use kimberlite_query::TimestampResolution;
use kimberlite_types::Offset;
/// In-memory list of `(offset, timestamp)` pairs used to map a timestamp to a log offset.
///
/// [`TimestampIndex::insert`] only ever appends, ignores offsets that do not advance past
/// the last stored offset, and never stores a timestamp lower than the previous one, so
/// `entries` stays ordered by both fields. [`TimestampIndex::resolve`] relies on that
/// ordering for its binary search.
#[derive(Debug, Default, Clone)]
pub(crate) struct TimestampIndex {
// `(offset, timestamp)` pairs in insertion order. The timestamp is named `wall_ns` at the
// insertion site, so it is presumably wall-clock nanoseconds — confirm with callers.
entries: Vec<(Offset, i64)>,
}
impl TimestampIndex {
    /// Creates an empty index.
    pub(crate) fn new() -> Self {
        Self::default()
    }

    /// Appends an `(offset, timestamp)` pair to the index.
    ///
    /// * `offset` must be greater than the last stored offset; otherwise the call is a
    ///   silent no-op (this makes re-inserting an already indexed offset harmless).
    /// * `wall_ns` is stored as-is when it is greater than the previous timestamp. If it
    ///   is equal or lower (e.g. the clock stepped backwards), the stored value is the
    ///   previous timestamp plus one, so timestamps never go backwards.
    ///
    /// Timestamps are strictly increasing except in one corner case: once a stored
    /// timestamp has reached `i64::MAX` there is no larger value left, so later entries
    /// are stored with `i64::MAX` as well. `resolve` still returns the newest of those.
    pub(crate) fn insert(&mut self, offset: Offset, wall_ns: i64) {
        let stored_ns = match self.entries.last() {
            None => wall_ns,
            Some(&(prev_off, prev_ns)) => {
                if offset <= prev_off {
                    return;
                }
                // Equivalent to "use wall_ns if it moved forward, else prev_ns + 1";
                // `saturating_add` pins the bump at i64::MAX instead of overflowing.
                wall_ns.max(prev_ns.saturating_add(1))
            }
        };
        // Timestamps must advance strictly, except when the previous one is already
        // i64::MAX and the bump above saturated. Without that exception debug builds
        // would panic here while release builds would store the entry.
        debug_assert!(
            self.entries
                .last()
                .is_none_or(|&(off, ns)| offset > off && (stored_ns > ns || ns == i64::MAX)),
            "TimestampIndex invariant broken: inserting ({offset:?}, {stored_ns}) after \
             last {:?}",
            self.entries.last()
        );
        self.entries.push((offset, stored_ns));
    }

    /// Maps `target_ns` to the offset of the newest entry whose timestamp is `<= target_ns`.
    ///
    /// Returns:
    /// * `TimestampResolution::LogEmpty` when nothing has been inserted;
    /// * `TimestampResolution::BeforeRetentionHorizon` when `target_ns` is lower than the
    ///   first stored timestamp, with that timestamp reported as `horizon_ns`;
    /// * `TimestampResolution::Offset` otherwise. A target beyond the last stored
    ///   timestamp resolves to the last stored offset.
    pub(crate) fn resolve(&self, target_ns: i64) -> TimestampResolution {
        let Some(&(_, earliest_ns)) = self.entries.first() else {
            return TimestampResolution::LogEmpty;
        };
        if target_ns < earliest_ns {
            return TimestampResolution::BeforeRetentionHorizon {
                horizon_ns: earliest_ns,
            };
        }
        // Timestamps are non-decreasing, so the entries with `ns <= target_ns` form a
        // prefix; `partition_point` binary-searches for the end of that prefix.
        let idx = self.entries.partition_point(|&(_, ns)| ns <= target_ns);
        // The first entry satisfies the predicate (checked above), so the prefix is
        // non-empty.
        debug_assert!(idx >= 1);
        let (offset, _) = self.entries[idx.saturating_sub(1)];
        TimestampResolution::Offset(offset)
    }

    /// Returns the first stored `(offset, timestamp)` pair, if any (test-only helper).
    #[cfg(test)]
    pub(crate) fn earliest(&self) -> Option<(Offset, i64)> {
        self.entries.first().copied()
    }

    /// Returns the number of stored entries (test-only helper).
    #[cfg(test)]
    pub(crate) fn len(&self) -> usize {
        self.entries.len()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an index by inserting the `(raw offset, timestamp)` pairs in the given order.
    fn index_of(pairs: &[(u64, i64)]) -> TimestampIndex {
        let mut index = TimestampIndex::new();
        for &(raw_offset, ns) in pairs {
            index.insert(Offset::new(raw_offset), ns);
        }
        index
    }

    /// Shorthand for the `Offset` resolution pointing at `raw_offset`.
    fn at(raw_offset: u64) -> TimestampResolution {
        TimestampResolution::Offset(Offset::new(raw_offset))
    }

    /// Resolving against an index with no entries reports an empty log.
    #[test]
    fn empty_index_resolves_to_log_empty() {
        let resolved = TimestampIndex::new().resolve(1_700_000_000_000_000_000);
        assert_eq!(resolved, TimestampResolution::LogEmpty);
    }

    /// A target older than the first entry reports that entry's timestamp as the horizon.
    #[test]
    fn target_before_earliest_resolves_to_retention_horizon() {
        let index = index_of(&[
            (1, 1_700_000_000_000_000_000),
            (2, 1_700_000_001_000_000_000),
        ]);
        let resolved = index.resolve(1_699_000_000_000_000_000);
        let TimestampResolution::BeforeRetentionHorizon { horizon_ns } = resolved else {
            panic!("expected BeforeRetentionHorizon, got {resolved:?}");
        };
        assert_eq!(horizon_ns, 1_700_000_000_000_000_000);
    }

    /// A target equal to a stored timestamp resolves to that entry's offset.
    #[test]
    fn exact_match_returns_matching_offset() {
        let index = index_of(&[(10, 1_000), (20, 2_000), (30, 3_000)]);
        assert_eq!(index.resolve(2_000), at(20));
    }

    /// A target between two stored timestamps resolves to the earlier entry.
    #[test]
    fn between_entries_returns_floor() {
        let index = index_of(&[(10, 1_000), (20, 2_000), (30, 3_000)]);
        assert_eq!(index.resolve(2_500), at(20));
    }

    /// A target past every stored timestamp resolves to the last entry.
    #[test]
    fn future_timestamp_returns_latest_offset() {
        let index = index_of(&[(10, 1_000), (20, 2_000)]);
        assert_eq!(index.resolve(i64::MAX), at(20));
    }

    /// A timestamp lower than its predecessor is stored as predecessor + 1.
    #[test]
    fn clock_skew_clamps_to_monotonic() {
        let index = index_of(&[(1, 5_000), (2, 4_000)]);
        assert_eq!(index.len(), 2);

        let expectations = [
            (
                4_000,
                TimestampResolution::BeforeRetentionHorizon { horizon_ns: 5_000 },
            ),
            (5_000, at(1)),
            (5_001, at(2)),
        ];
        for (target_ns, expected) in expectations {
            assert_eq!(index.resolve(target_ns), expected);
        }
    }

    /// Re-inserting the last offset leaves the index untouched.
    #[test]
    fn duplicate_offset_is_noop() {
        let index = index_of(&[(5, 1_000), (5, 2_000)]);
        assert_eq!(index.len(), 1);
        assert_eq!(index.earliest(), Some((Offset::new(5), 1_000)));
    }

    /// For strictly increasing inserts, every stored timestamp resolves to its own offset
    /// and a target below the first timestamp reports the horizon.
    #[test]
    fn proptest_resolve_returns_consistent_offset_for_monotonic_inserts() {
        use proptest::prelude::*;

        const BASE_NS: i64 = 1_000_000;
        const STEP_NS: i64 = 1_000;

        proptest!(|(count in 1usize..50)| {
            let samples: Vec<(u64, i64)> = (0..count)
                .map(|i| ((i as u64) + 1, BASE_NS + (i as i64) * STEP_NS))
                .collect();
            let index = index_of(&samples);

            for &(raw_offset, ns) in &samples {
                prop_assert_eq!(index.resolve(ns), at(raw_offset));
            }

            let below = index.resolve(0);
            if let TimestampResolution::BeforeRetentionHorizon { horizon_ns } = below {
                prop_assert_eq!(horizon_ns, BASE_NS);
            } else {
                prop_assert!(false, "expected horizon, got {:?}", below);
            }
        });
    }
}