use std::time::{Duration, SystemTime, UNIX_EPOCH};
use anyhow::{Context, Result, bail};
use kafka_protocol::protocol::VersionRange;
/// Picks the highest version of an API that both the broker and this client support.
///
/// The upper bound of `local_range` is first limited to `cap`; the resulting range is
/// then intersected with `broker_range`, and the top of that intersection is returned.
///
/// # Errors
///
/// Returns an error naming `api_key`, the broker range, and the capped local range
/// when the intersection is empty.
pub fn select_api_version(
    api_key: i16,
    broker_range: VersionRange,
    local_range: VersionRange,
    cap: i16,
) -> Result<i16> {
    // Never advertise anything above the caller-imposed ceiling.
    let capped_local = VersionRange {
        min: local_range.min,
        max: i16::min(local_range.max, cap),
    };

    let shared = broker_range.intersect(&capped_local);
    if !shared.is_empty() {
        return Ok(shared.max);
    }

    bail!(
        "no shared version for API key {}: broker={}, local={}",
        api_key,
        broker_range,
        capped_local
    )
}
/// Converts `duration` into a whole number of milliseconds stored in an `i32`.
///
/// # Errors
///
/// Returns an error when the millisecond count does not fit in an `i32`.
pub fn duration_to_i32_ms(duration: Duration) -> Result<i32> {
    let total_ms = duration.as_millis();
    // Checked narrowing: an oversized value becomes an error rather than wrapping.
    let fitted = i32::try_from(total_ms);
    fitted.context("duration does not fit in Kafka's i32 millisecond field")
}
/// Returns the current system time as milliseconds elapsed since the Unix epoch.
///
/// # Errors
///
/// Returns an error when the system clock reads earlier than the Unix epoch, or when
/// the millisecond count does not fit in an `i64`.
pub fn now_unix_ms() -> Result<i64> {
    let wall_clock = SystemTime::now();
    let since_epoch = wall_clock
        .duration_since(UNIX_EPOCH)
        .context("system clock is before the unix epoch")?;

    // `as_millis` yields a `u128`; narrow it with a checked conversion.
    let millis = since_epoch.as_millis();
    i64::try_from(millis).context("system time does not fit in i64 milliseconds")
}
#[cfg(test)]
mod tests {
    use super::*;
    use kafka_protocol::messages::ProduceRequest;
    use kafka_protocol::protocol::{Message, Request};

    /// With a broker range topping out at 13 and a cap of 12, the negotiated
    /// version is expected to be the cap.
    ///
    /// NOTE(review): the expected value assumes `ProduceRequest::VERSIONS.max` in the
    /// linked `kafka_protocol` release is at least 12 — confirm when bumping the crate.
    #[test]
    fn version_negotiation_respects_cap() {
        let broker = VersionRange { min: 3, max: 13 };
        let local = ProduceRequest::VERSIONS;
        let cap = 12;

        let picked = select_api_version(ProduceRequest::KEY, broker, local, cap).unwrap();

        assert_eq!(picked, 12);
    }
}