//! kafkit-client 0.1.7
//!
//! A pure-Rust client for Kafka 4.0+.
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use anyhow::{Context, Result, bail};
use kafka_protocol::protocol::VersionRange;

/// Chooses the version to speak for `api_key`.
///
/// The upper bound of `local_range` is first clamped to `cap`; the clamped
/// range is then intersected with `broker_range`, and the `max` of that
/// intersection is returned.
///
/// # Errors
///
/// Returns an error naming `api_key`, the broker range, and the clamped local
/// range when the intersection is empty.
pub fn select_api_version(
    api_key: i16,
    broker_range: VersionRange,
    local_range: VersionRange,
    cap: i16,
) -> Result<i16> {
    // Shadow the caller's range with the clamped one so the error message
    // below reports the range that was actually used for negotiation.
    let capped_max = local_range.max.min(cap);
    let local_range = VersionRange {
        min: local_range.min,
        max: capped_max,
    };

    let shared = broker_range.intersect(&local_range);
    if !shared.is_empty() {
        return Ok(shared.max);
    }

    bail!(
        "no shared version for API key {}: broker={}, local={}",
        api_key,
        broker_range,
        local_range
    )
}

/// Converts `duration` to whole milliseconds as an `i32`.
///
/// Any sub-millisecond remainder is discarded by `Duration::as_millis`.
///
/// # Errors
///
/// Returns an error when the millisecond count does not fit in an `i32`.
pub fn duration_to_i32_ms(duration: Duration) -> Result<i32> {
    let millis = duration.as_millis();
    let narrowed = i32::try_from(millis);
    narrowed.context("duration does not fit in Kafka's i32 millisecond field")
}

/// Returns the current system time as whole milliseconds since the Unix epoch.
///
/// # Errors
///
/// Returns an error when the system clock reads earlier than the Unix epoch,
/// or when the millisecond count does not fit in an `i64`.
pub fn now_unix_ms() -> Result<i64> {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .context("system clock is before the unix epoch")?;

    // `as_millis` yields a `u128`, so narrow it with a checked conversion.
    let millis = since_epoch.as_millis();
    i64::try_from(millis).context("system time does not fit in i64 milliseconds")
}

#[cfg(test)]
mod tests {
    use super::*;
    use kafka_protocol::protocol::{Message, Request};

    /// A cap below the broker's maximum must bound the negotiated version.
    #[test]
    fn version_negotiation_respects_cap() {
        // Broker advertises up to 13, but the caller caps negotiation at 12.
        // NOTE(review): assumes the crate's ProduceRequest::VERSIONS reaches at
        // least version 12 — confirm when bumping kafka_protocol.
        let api_key = kafka_protocol::messages::ProduceRequest::KEY;
        let broker_range = VersionRange { min: 3, max: 13 };
        let local_range = kafka_protocol::messages::ProduceRequest::VERSIONS;
        let cap = 12;

        let negotiated = select_api_version(api_key, broker_range, local_range, cap).unwrap();

        assert_eq!(negotiated, 12);
    }
}