use chrono::{DateTime, Utc};
use serde::de::{self};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use crate::utils::{sign_extend, BitReader, BitWriter};
const DEFAULT_CAPACITY: u32 = 8192;
const DEFAULT_INTERVAL_SECS: u64 = 60;
const SERIALIZATION_VERSION: u8 = 1;
#[derive(PartialEq, Clone, Copy)]
pub(crate) struct TrackedSeq {
pub(crate) seq: u64,
pub(crate) ts: DateTime<Utc>,
}
#[allow(dead_code)]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum FindOption {
RoundUp,
RoundDown,
}
#[derive(Debug, Clone)]
pub(crate) struct SequenceTracker {
sequence_numbers: Vec<u64>,
timestamps: Vec<i64>,
capacity: u32,
interval_secs: u64,
last_recorded_ts: Option<i64>,
}
impl PartialEq for SequenceTracker {
fn eq(&self, other: &Self) -> bool {
self.sequence_numbers == other.sequence_numbers
&& self.timestamps == other.timestamps
&& self.capacity == other.capacity
&& self.interval_secs == other.interval_secs
}
}
#[allow(dead_code)]
impl SequenceTracker {
pub(crate) fn new() -> Self {
Self::with_config(DEFAULT_CAPACITY, DEFAULT_INTERVAL_SECS)
}
fn with_config(capacity: u32, interval_secs: u64) -> Self {
Self {
sequence_numbers: Vec::with_capacity(capacity as usize),
timestamps: Vec::with_capacity(capacity as usize),
capacity,
interval_secs,
last_recorded_ts: None,
}
}
pub(crate) fn insert(&mut self, seq: TrackedSeq) {
let ts_secs = seq.ts.timestamp();
if let Some(last_ts) = self.last_recorded_ts {
if (ts_secs - last_ts) < self.interval_secs as i64 {
return;
}
}
if let Some(last_seq) = self.sequence_numbers.last() {
assert!(
seq.seq >= *last_seq,
"Sequence numbers must be monotonic. seq: {}, last_seq: {}",
seq.seq,
*last_seq
);
}
if let Some(last_ts) = self.timestamps.last() {
assert!(
ts_secs >= *last_ts,
"Timestamps must be monotonic. ts_secs: {}, last_ts: {}",
ts_secs,
*last_ts
);
}
self.sequence_numbers.push(seq.seq);
self.timestamps.push(ts_secs);
self.last_recorded_ts = Some(ts_secs);
if self.sequence_numbers.len() > self.capacity as usize {
self.downsample();
}
}
pub(crate) fn extend_from(&mut self, other: &SequenceTracker) {
if other.sequence_numbers.is_empty() {
return;
}
for (seq, &ts_secs) in other.sequence_numbers.iter().zip(other.timestamps.iter()) {
if let Some(ts) = DateTime::from_timestamp(ts_secs, 0) {
self.insert(TrackedSeq { seq: *seq, ts });
}
}
}
fn downsample(&mut self) {
let mut new_seqs = Vec::with_capacity(self.capacity as usize);
let mut new_timestamps = Vec::with_capacity(self.capacity as usize);
for i in (0..self.sequence_numbers.len()).step_by(2) {
new_seqs.push(self.sequence_numbers[i]);
new_timestamps.push(self.timestamps[i]);
}
self.sequence_numbers = new_seqs;
self.timestamps = new_timestamps;
}
pub(crate) fn find_ts(&self, seq: u64, find_opt: FindOption) -> Option<DateTime<Utc>> {
if self.sequence_numbers.is_empty() {
return None;
}
match self.sequence_numbers.binary_search(&seq) {
Ok(idx) => DateTime::from_timestamp(self.timestamps[idx], 0),
Err(idx) => match find_opt {
FindOption::RoundUp if idx < self.sequence_numbers.len() => {
DateTime::from_timestamp(self.timestamps[idx], 0)
}
FindOption::RoundDown if idx > 0 => {
DateTime::from_timestamp(self.timestamps[idx - 1], 0)
}
_ => None,
},
}
}
pub(crate) fn find_seq(&self, ts: DateTime<Utc>, find_opt: FindOption) -> Option<u64> {
let ts_secs = ts.timestamp();
if self.timestamps.is_empty() {
return None;
}
match self.timestamps.binary_search(&ts_secs) {
Ok(idx) => Some(self.sequence_numbers[idx]),
Err(idx) => match find_opt {
FindOption::RoundUp if idx < self.timestamps.len() => {
Some(self.sequence_numbers[idx])
}
FindOption::RoundDown if idx > 0 => Some(self.sequence_numbers[idx - 1]),
_ => None,
},
}
}
pub(crate) fn to_bytes(&self) -> Vec<u8> {
encode_sequence_tracker(self)
}
pub(crate) fn from_bytes(bytes: &[u8]) -> Result<Self, String> {
decode_sequence_tracker(bytes)
}
}
impl Serialize for SequenceTracker {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let encoded = encode_sequence_tracker(self);
serializer.serialize_bytes(&encoded)
}
}
impl<'de> Deserialize<'de> for SequenceTracker {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
let buf: Vec<u8> = Vec::deserialize(deserializer)?;
decode_sequence_tracker(&buf).map_err(<D::Error as de::Error>::custom)
}
}
fn encode_sequence_tracker(tracker: &SequenceTracker) -> Vec<u8> {
let mut result = Vec::new();
result.push(SERIALIZATION_VERSION);
let len = tracker.sequence_numbers.len() as u32;
result.extend_from_slice(&len.to_le_bytes());
let encoded_seqs = encode_sequence_numbers(&tracker.sequence_numbers);
result.extend_from_slice(&encoded_seqs);
let encoded_timestamps = encode_timestamps(&tracker.timestamps);
result.extend_from_slice(&encoded_timestamps);
result
}
fn decode_sequence_tracker(buf: &[u8]) -> Result<SequenceTracker, String> {
if buf.is_empty() {
return Err("Empty buffer".to_string());
}
let mut offset = 0;
let version = buf[offset];
if version != SERIALIZATION_VERSION {
return Err(format!("Unsupported version: {}", version));
}
offset += 1;
if buf.len() < offset + 4 {
return Err("Buffer too small for length".to_string());
}
let len_bytes: [u8; 4] = buf[offset..offset + 4]
.try_into()
.map_err(|_| "Buffer too small for length".to_string())?;
let len = u32::from_le_bytes(len_bytes) as usize;
offset += 4;
let (encoded_seq_len, sequence_numbers) = decode_sequence_numbers_with_length(&buf[offset..])?;
if sequence_numbers.len() != len {
return Err(format!(
"Expected {} sequence numbers, got {}",
len,
sequence_numbers.len()
));
}
offset += encoded_seq_len;
let timestamps = decode_timestamps(&buf[offset..])?;
if timestamps.len() != len {
return Err(format!(
"Expected {} timestamps, got {}",
len,
timestamps.len()
));
}
let mut tracker = SequenceTracker::with_config(DEFAULT_CAPACITY, DEFAULT_INTERVAL_SECS);
tracker.sequence_numbers = sequence_numbers;
tracker.timestamps = timestamps;
Ok(tracker)
}
fn encode_gorilla_i64(values: &[i64]) -> Vec<u8> {
let mut w = BitWriter::new();
w.push32(values.len() as u32, 32);
if !values.is_empty() {
w.push64(values[0] as u64, 64);
let mut prev_val = values[0];
let mut prev_delta: i64 = 0;
for &val in &values[1..] {
let delta = val - prev_val;
let dod = delta - prev_delta;
match dod {
0 => w.push(false),
-63..=63 => {
w.push32(0b10, 2);
w.push32((dod as u32) & 0x7F, 7);
}
-255..=255 => {
w.push32(0b110, 3);
w.push32((dod as u32) & 0x1FF, 9);
}
-2047..=2047 => {
w.push32(0b1110, 4);
w.push32((dod as u32) & 0xFFF, 12);
}
_ => {
w.push32(0b1111, 4);
w.push32(dod as u32, 32);
}
}
prev_val = val;
prev_delta = delta;
}
}
w.finish()
}
fn decode_gorilla_i64_with_length(buf: &[u8]) -> Result<(usize, Vec<i64>), String> {
let mut values = Vec::new();
let mut reader = BitReader::new(buf);
let mut bits_read = 0usize;
let mut remaining = reader.read32(32).ok_or("Expected count first")?;
bits_read += 32;
if remaining > 0 {
let first_bits = reader
.read64(64)
.ok_or("Unexpected EOF: first value should be 64 bit")?;
let first = first_bits as i64;
bits_read += 64;
remaining -= 1;
values.push(first);
let mut prev_val = first;
let mut prev_delta: i64 = 0;
while remaining > 0 {
let prefix = reader.read_bit().ok_or("Unexpected EOF reading prefix")?;
bits_read += 1;
let dod = if !prefix {
0i64
} else {
let mut count = 1;
while count < 4 {
bits_read += 1;
match reader.read_bit() {
Some(true) => count += 1,
Some(false) => break,
None => return Err("Unexpected EOF decoding Gorilla prefix".to_string()),
}
}
let bits = GORILLA_PREFIX_BYTES[count];
let raw = reader
.read32(bits)
.ok_or_else(|| format!("Unexpected EOF reading {}-bit delta-of-delta", bits))?;
bits_read += bits as usize;
sign_extend(raw, bits) as i64
};
let delta = prev_delta + dod;
let val = prev_val + delta;
values.push(val);
prev_delta = delta;
prev_val = val;
remaining -= 1;
}
}
let bytes_consumed = bits_read.div_ceil(8);
Ok((bytes_consumed, values))
}
fn encode_sequence_numbers(sequences: &[u64]) -> Vec<u8> {
let as_i64: Vec<i64> = sequences.iter().map(|&x| x as i64).collect();
encode_gorilla_i64(&as_i64)
}
fn decode_sequence_numbers_with_length(buf: &[u8]) -> Result<(usize, Vec<u64>), String> {
let (bytes_consumed, values) = decode_gorilla_i64_with_length(buf)?;
let sequences: Vec<u64> = values.into_iter().map(|v| v as u64).collect();
Ok((bytes_consumed, sequences))
}
fn encode_timestamps(timestamps: &[i64]) -> Vec<u8> {
encode_gorilla_i64(timestamps)
}
fn decode_timestamps(buf: &[u8]) -> Result<Vec<i64>, String> {
let (_, timestamps) = decode_gorilla_i64_with_length(buf)?;
Ok(timestamps)
}
const GORILLA_PREFIX_BYTES: [u8; 5] = [0, 7, 9, 12, 32];
#[cfg(test)]
mod tests {
use super::*;
use rstest::*;
#[test]
fn should_track_sequence_timestamp_pairs() {
let mut tracker = SequenceTracker::with_config(10, 1);
tracker.insert(TrackedSeq {
seq: 100,
ts: DateTime::from_timestamp(1000, 0).unwrap(),
});
tracker.insert(TrackedSeq {
seq: 200,
ts: DateTime::from_timestamp(2000, 0).unwrap(),
});
tracker.insert(TrackedSeq {
seq: 300,
ts: DateTime::from_timestamp(3000, 0).unwrap(),
});
assert_eq!(tracker.sequence_numbers.len(), 3);
assert_eq!(
tracker.find_seq(
DateTime::from_timestamp(2000, 0).unwrap(),
FindOption::RoundDown
),
Some(200)
);
assert_eq!(
tracker.find_ts(200, FindOption::RoundDown),
DateTime::from_timestamp(2000, 0)
);
}
#[test]
fn should_respect_reporting_interval() {
let mut tracker = SequenceTracker::with_config(10, 60);
tracker.insert(TrackedSeq {
seq: 100,
ts: DateTime::from_timestamp(1000, 0).unwrap(),
});
tracker.insert(TrackedSeq {
seq: 200,
ts: DateTime::from_timestamp(1030, 0).unwrap(),
});
tracker.insert(TrackedSeq {
seq: 300,
ts: DateTime::from_timestamp(1060, 0).unwrap(),
});
assert_eq!(tracker.sequence_numbers.len(), 2);
assert_eq!(tracker.sequence_numbers, vec![100, 300]);
}
#[test]
fn should_downsample_when_exceeding_capacity() {
let mut tracker = SequenceTracker::with_config(4, 1);
for i in 0..6 {
tracker.insert(TrackedSeq {
seq: (i * 100) as u64,
ts: DateTime::from_timestamp((i * 1000) as i64, 0).unwrap(),
});
}
assert!(tracker.sequence_numbers.len() <= 4);
assert_eq!(tracker.sequence_numbers[0], 0);
}
#[test]
fn extend_from_non_empty_tracker() {
let mut tracker1 = SequenceTracker::with_config(10, 1);
tracker1.insert(TrackedSeq {
seq: 100,
ts: DateTime::from_timestamp(1000, 0).unwrap(),
});
tracker1.insert(TrackedSeq {
seq: 200,
ts: DateTime::from_timestamp(2000, 0).unwrap(),
});
let mut tracker2 = SequenceTracker::with_config(10, 1);
tracker2.insert(TrackedSeq {
seq: 300,
ts: DateTime::from_timestamp(3000, 0).unwrap(),
});
tracker2.insert(TrackedSeq {
seq: 400,
ts: DateTime::from_timestamp(4000, 0).unwrap(),
});
tracker1.extend_from(&tracker2);
assert_eq!(tracker1.sequence_numbers.len(), 4);
assert_eq!(tracker1.sequence_numbers, vec![100, 200, 300, 400]);
assert_eq!(
tracker1.find_ts(300, FindOption::RoundDown),
DateTime::from_timestamp(3000, 0)
);
assert_eq!(
tracker1.find_ts(400, FindOption::RoundDown),
DateTime::from_timestamp(4000, 0)
);
}
#[test]
fn extend_empty_tracker() {
let mut tracker1 = SequenceTracker::with_config(10, 1);
let mut tracker2 = SequenceTracker::with_config(10, 1);
tracker2.insert(TrackedSeq {
seq: 100,
ts: DateTime::from_timestamp(1000, 0).unwrap(),
});
tracker2.insert(TrackedSeq {
seq: 200,
ts: DateTime::from_timestamp(2000, 0).unwrap(),
});
tracker1.extend_from(&tracker2);
assert_eq!(tracker1.sequence_numbers.len(), 2);
assert_eq!(tracker1.sequence_numbers, vec![100, 200]);
assert_eq!(
tracker1.find_ts(100, FindOption::RoundDown),
DateTime::from_timestamp(1000, 0)
);
assert_eq!(
tracker1.find_ts(200, FindOption::RoundDown),
DateTime::from_timestamp(2000, 0)
);
}
#[test]
fn extend_from_empty_tracker() {
let mut tracker1 = SequenceTracker::with_config(10, 1);
tracker1.insert(TrackedSeq {
seq: 100,
ts: DateTime::from_timestamp(1000, 0).unwrap(),
});
let tracker2 = SequenceTracker::with_config(10, 1);
let initial_len = tracker1.sequence_numbers.len();
tracker1.extend_from(&tracker2);
assert_eq!(tracker1.sequence_numbers.len(), initial_len);
assert_eq!(tracker1.sequence_numbers, vec![100]);
}
#[test]
fn extend_respects_interval_filtering() {
let mut tracker1 = SequenceTracker::with_config(10, 60);
tracker1.insert(TrackedSeq {
seq: 100,
ts: DateTime::from_timestamp(1000, 0).unwrap(),
});
let mut tracker2 = SequenceTracker::with_config(10, 1);
tracker2.insert(TrackedSeq {
seq: 200,
ts: DateTime::from_timestamp(1030, 0).unwrap(), });
tracker2.insert(TrackedSeq {
seq: 300,
ts: DateTime::from_timestamp(1060, 0).unwrap(), });
tracker2.insert(TrackedSeq {
seq: 400,
ts: DateTime::from_timestamp(1110, 0).unwrap(), });
tracker2.insert(TrackedSeq {
seq: 500,
ts: DateTime::from_timestamp(1120, 0).unwrap(), });
tracker1.extend_from(&tracker2);
assert_eq!(tracker1.sequence_numbers.len(), 3); assert_eq!(tracker1.sequence_numbers, vec![100, 300, 500]);
}
#[test]
fn extend_triggers_downsampling() {
let mut tracker1 = SequenceTracker::with_config(4, 1);
for i in 0..4 {
tracker1.insert(TrackedSeq {
seq: (i * 100) as u64,
ts: DateTime::from_timestamp((i * 1000) as i64, 0).unwrap(),
});
}
let mut tracker2 = SequenceTracker::with_config(10, 1);
for i in 4..7 {
tracker2.insert(TrackedSeq {
seq: (i * 100) as u64,
ts: DateTime::from_timestamp((i * 1000) as i64, 0).unwrap(),
});
}
tracker1.extend_from(&tracker2);
assert!(tracker1.sequence_numbers.len() <= 4);
assert_eq!(tracker1.sequence_numbers[0], 0);
}
#[rstest]
#[case::exact_match(8, FindOption::RoundDown, Some(200))]
#[case::exact_match_round_up(8, FindOption::RoundUp, Some(200))]
#[case::round_up_to_next(6, FindOption::RoundUp, Some(200))]
#[case::round_down_to_previous(6, FindOption::RoundDown, Some(100))]
#[case::before_first_seq(0, FindOption::RoundDown, None)]
#[case::after_last_seq(16, FindOption::RoundUp, None)]
#[case::at_first_seq(4, FindOption::RoundDown, Some(100))]
#[case::at_last_seq(12, FindOption::RoundUp, Some(300))]
fn test_find_timestamp_by_seq(
#[case] seq: u64,
#[case] find_opt: FindOption,
#[case] expected_ts: Option<i64>,
) {
let mut tracker = SequenceTracker::with_config(4, 1);
if seq != 1 || find_opt != FindOption::RoundDown {
tracker.insert(TrackedSeq {
seq: 4,
ts: DateTime::from_timestamp(100, 0).unwrap(),
});
tracker.insert(TrackedSeq {
seq: 8,
ts: DateTime::from_timestamp(200, 0).unwrap(),
});
tracker.insert(TrackedSeq {
seq: 12,
ts: DateTime::from_timestamp(300, 0).unwrap(),
});
}
let result = tracker.find_ts(seq, find_opt);
assert_eq!(
result,
expected_ts.and_then(|ts| DateTime::from_timestamp(ts, 0))
);
}
#[rstest]
#[case::exact_match(200, FindOption::RoundDown, Some(8))]
#[case::exact_match_round_up(200, FindOption::RoundUp, Some(8))]
#[case::round_up_to_next(150, FindOption::RoundUp, Some(8))]
#[case::round_down_to_previous(150, FindOption::RoundDown, Some(4))]
#[case::before_first_ts(50, FindOption::RoundDown, None)]
#[case::after_last_ts(350, FindOption::RoundUp, None)]
#[case::at_first_ts(100, FindOption::RoundDown, Some(4))]
#[case::at_last_ts(300, FindOption::RoundUp, Some(12))]
#[case::empty_tier(1, FindOption::RoundDown, None)]
fn test_find_seq_by_timestamp(
#[case] ts: i64,
#[case] find_opt: FindOption,
#[case] expected: Option<u64>,
) {
let mut tracker = SequenceTracker::with_config(4, 1);
if ts != 1 || find_opt != FindOption::RoundDown {
tracker.insert(TrackedSeq {
seq: 4,
ts: DateTime::from_timestamp(100, 0).unwrap(),
});
tracker.insert(TrackedSeq {
seq: 8,
ts: DateTime::from_timestamp(200, 0).unwrap(),
});
tracker.insert(TrackedSeq {
seq: 12,
ts: DateTime::from_timestamp(300, 0).unwrap(),
});
}
let result = tracker.find_seq(DateTime::from_timestamp(ts, 0).unwrap(), find_opt);
assert_eq!(result, expected);
}
#[test]
fn test_serialize_deserialize_round_trip() {
let mut tracker = SequenceTracker::with_config(10, 1);
for i in 0..5 {
tracker.insert(TrackedSeq {
seq: (i * 100) as u64,
ts: DateTime::from_timestamp((i * 1000) as i64, 0).unwrap(),
});
}
let encoded = encode_sequence_tracker(&tracker);
let decoded = decode_sequence_tracker(&encoded).unwrap();
assert_eq!(decoded.sequence_numbers, tracker.sequence_numbers);
assert_eq!(decoded.timestamps, tracker.timestamps);
}
#[rstest]
#[case::empty_sequences(vec![])]
#[case::single_sequence(vec![1000])]
#[case::multiple_sequences(vec![100, 200, 300, 400, 500])]
#[case::large_gaps(vec![100, 10000, 20000, 30000])]
#[case::small_increments(vec![100, 101, 102, 103, 104])]
#[case::max_u64(vec![u64::MAX])]
#[case::above_i64_max(vec![i64::MAX as u64 + 1])]
#[case::u64_values_that_become_negative(vec![u64::MAX - 100, u64::MAX - 50, u64::MAX])]
#[case::large_u64_with_small_deltas(vec![u64::MAX - 10, u64::MAX - 9, u64::MAX - 8, u64::MAX - 7])]
#[case::wraparound_sequence(vec![u64::MAX - 1, u64::MAX, 0, 1, 2])]
fn test_encode_decode_sequence_numbers(#[case] sequences: Vec<u64>) {
let encoded = encode_sequence_numbers(&sequences);
let (_, decoded) = decode_sequence_numbers_with_length(&encoded).unwrap();
assert_eq!(decoded, sequences);
}
#[rstest]
#[case::empty_timestamps(vec![])]
#[case::single_timestamp(vec![1234567890])]
#[case::multiple_timestamps(vec![100, 200, 300, 400, 500])]
#[case::zero_dod(vec![1000, 1000, 1000, 1000])] #[case::small_dod(vec![1000, 1064, 1128, 1192])] #[case::medium_dod(vec![1000, 1256, 1512, 1768])] #[case::large_dod(vec![1000, 3048, 5096, 7144])] #[case::huge_dod(vec![1000, 1000000, 2000000, 3000000])] #[case::negative_dod(vec![1000, 937, 874, 811])] #[case::negative_timestamps(vec![-1000, -500, 0, 500, 1000])] #[case::mixed_dod(vec![1000, 1000, 1032, 1128, 1512, 50000])] #[case::mixed_dod_boundaries(vec![1000, 1000, 1028, 1255, 2047, 1000000])] #[case::large_numbers(vec![i64::MAX - 1000, i64::MAX - 500, i64::MAX])]
fn test_encode_decode_timestamps(#[case] timestamps: Vec<i64>) {
let encoded = encode_timestamps(×tamps);
let decoded = decode_timestamps(&encoded).unwrap();
assert_eq!(decoded, timestamps);
}
#[test]
fn test_empty_tracker_queries() {
let tracker = SequenceTracker::new();
assert_eq!(
tracker.find_seq(
DateTime::from_timestamp(100, 0).unwrap(),
FindOption::RoundDown
),
None
);
assert_eq!(tracker.find_ts(100, FindOption::RoundUp), None);
assert!(tracker.sequence_numbers.is_empty());
}
#[test]
fn test_serialize_deserialize_round_trip_proptest() {
use proptest::collection::vec;
use proptest::prelude::*;
let tracker_strategy = vec(any::<(u32, u32)>(), 0..=20).prop_map(|pairs| {
let mut tracker = SequenceTracker::with_config(100, 1);
let base_seq = 0u64;
let base_timestamp = 1_600_000_000i64;
let mut current_seq = base_seq;
let mut current_ts = base_timestamp;
for (seq_delta, ts_delta) in pairs {
let safe_seq_delta = (seq_delta % (u32::MAX / 2)) as u64;
let safe_ts_delta = (ts_delta % (u32::MAX / 2)) as i64;
current_seq = current_seq.saturating_add(safe_seq_delta);
current_ts = current_ts.saturating_add(safe_ts_delta);
tracker.insert(TrackedSeq {
seq: current_seq,
ts: DateTime::from_timestamp(current_ts, 0).unwrap(),
});
}
tracker
});
proptest!(|(tracker in tracker_strategy)| {
let encoded = encode_sequence_tracker(&tracker);
let decoded = decode_sequence_tracker(&encoded).unwrap();
assert_eq!(decoded.sequence_numbers, tracker.sequence_numbers);
assert_eq!(decoded.timestamps, tracker.timestamps);
});
}
#[test]
fn admin_seq_tracker_seq_to_ts_rounding() -> Result<(), crate::Error> {
use crate::admin::AdminBuilder;
use crate::config::{CheckpointOptions, CheckpointScope, Settings};
use crate::db::Db;
use object_store::memory::InMemory;
use object_store::path::Path;
use object_store::ObjectStore;
use std::sync::Arc;
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let path = Path::from("/tmp/test_admin_seq_tracker_seq_to_ts");
let admin = AdminBuilder::new(path.clone(), object_store.clone()).build();
let db = Db::builder(path.clone(), object_store.clone())
.with_settings(Settings::default())
.build()
.await?;
db.put(b"k1", b"v1").await?;
db.put(b"k2", b"v2").await?;
db.put(b"k3", b"v3").await?;
db.create_checkpoint(CheckpointScope::All, &CheckpointOptions::default())
.await?;
db.close().await?;
let ts_first = admin.get_timestamp_for_sequence(0, true).await?;
assert!(ts_first.is_some());
let ts_after_last = admin.get_timestamp_for_sequence(u64::MAX, true).await?;
assert!(ts_after_last.is_none());
Ok::<_, crate::Error>(())
})?;
Ok(())
}
#[test]
fn admin_seq_tracker_ts_to_seq_rounding() -> Result<(), crate::Error> {
use crate::admin::AdminBuilder;
use crate::config::{CheckpointOptions, CheckpointScope, Settings};
use crate::db::Db;
use chrono::{TimeZone, Utc};
use object_store::memory::InMemory;
use object_store::path::Path;
use object_store::ObjectStore;
use std::sync::Arc;
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let path = Path::from("/tmp/test_admin_seq_tracker_ts_to_seq");
let admin = AdminBuilder::new(path.clone(), object_store.clone()).build();
let db = Db::builder(path.clone(), object_store.clone())
.with_settings(Settings::default())
.build()
.await?;
db.put(b"a", b"1").await?;
db.put(b"b", b"2").await?;
db.create_checkpoint(CheckpointScope::All, &CheckpointOptions::default())
.await?;
db.close().await?;
let long_ago = Utc.timestamp_opt(1, 0).single().unwrap();
let seq_before_first = admin.get_sequence_for_timestamp(long_ago, false).await?;
assert!(
seq_before_first.is_none(),
"round-down before first ts should yield None"
);
let now = Utc::now();
let seq_latest = admin.get_sequence_for_timestamp(now, false).await?;
assert!(
seq_latest.is_some(),
"round-down at/after last ts should yield Some(seq)"
);
Ok::<_, crate::Error>(())
})?;
Ok(())
}
}