use ahash::AHashMap;
use nautilus_core::UnixNanos;
use crate::{
entry::Topic,
markers::{DataClass, DataCursorSnapshot, StreamCursor, StreamDictEntry, StreamSlot},
};
#[derive(Debug, Default)]
pub struct CursorState {
slots: AHashMap<Topic, StreamSlot>,
dict: Vec<StreamDictEntry>,
cursors: Vec<Cursor>,
dirty: Vec<bool>,
}
#[derive(Debug, Default)]
struct Cursor {
ts_init_hi: UnixNanos,
count: u64,
last_ts_init: UnixNanos,
same_ts_count: u32,
}
impl CursorState {
#[must_use]
pub fn new() -> Self {
Self::default()
}
pub fn advance(
&mut self,
topic: Topic,
data_cls: DataClass,
identifier: &str,
ts_init: UnixNanos,
) -> (StreamSlot, u32) {
let slot = if let Some(&slot) = self.slots.get(&topic) {
slot
} else {
let slot = u32::try_from(self.cursors.len()).expect("stream slot count fits u32");
self.cursors.push(Cursor::default());
self.dirty.push(false);
self.dict.push(StreamDictEntry {
slot,
data_cls,
identifier: identifier.to_string(),
});
self.slots.insert(topic, slot);
slot
};
let cursor = &mut self.cursors[slot as usize];
let same_ts_ordinal = if cursor.count == 0 || ts_init != cursor.last_ts_init {
cursor.last_ts_init = ts_init;
cursor.same_ts_count = 0;
0
} else {
cursor.same_ts_count += 1;
cursor.same_ts_count
};
cursor.count += 1;
if ts_init > cursor.ts_init_hi {
cursor.ts_init_hi = ts_init;
}
self.dirty[slot as usize] = true;
(slot, same_ts_ordinal)
}
pub fn take_new_dict_entries(&mut self) -> Vec<StreamDictEntry> {
std::mem::take(&mut self.dict)
}
pub fn build_snapshot(
&mut self,
marker_seq: u64,
event_seq_before: u64,
ts_init: UnixNanos,
) -> Option<DataCursorSnapshot> {
let mut advanced = Vec::new();
for (slot, (cursor, dirty)) in (0_u32..).zip(self.cursors.iter().zip(self.dirty.iter_mut()))
{
if *dirty {
*dirty = false;
advanced.push(StreamCursor {
slot,
ts_init_hi: cursor.ts_init_hi,
count: cursor.count,
});
}
}
if advanced.is_empty() {
None
} else {
Some(DataCursorSnapshot {
marker_seq,
event_seq_before,
ts_init,
advanced,
})
}
}
}
#[cfg(test)]
mod tests {
use rstest::rstest;
use super::*;
#[rstest]
fn advance_assigns_slot_lazily_and_writes_dict() {
let mut state = CursorState::new();
let quotes: Topic = "data.quotes.BINANCE.ETHUSDT".into();
let trades: Topic = "data.trades.BINANCE.BTCUSDT".into();
let (s0, _) = state.advance(
quotes,
DataClass::Quote,
"ETHUSDT.BINANCE",
UnixNanos::from(1),
);
let (s1, _) = state.advance(
trades,
DataClass::Trade,
"BTCUSDT.BINANCE",
UnixNanos::from(2),
);
let (s0_again, _) = state.advance(
quotes,
DataClass::Quote,
"ETHUSDT.BINANCE",
UnixNanos::from(3),
);
let dict = state.take_new_dict_entries();
assert_eq!(s0, 0);
assert_eq!(s1, 1);
assert_eq!(s0_again, 0);
assert_eq!(
dict,
vec![
StreamDictEntry {
slot: 0,
data_cls: DataClass::Quote,
identifier: "ETHUSDT.BINANCE".to_string(),
},
StreamDictEntry {
slot: 1,
data_cls: DataClass::Trade,
identifier: "BTCUSDT.BINANCE".to_string(),
},
]
);
assert!(state.take_new_dict_entries().is_empty());
}
#[rstest]
fn advance_updates_count_and_ts_hi() {
let mut state = CursorState::new();
let quotes: Topic = "data.quotes.BINANCE.ETHUSDT".into();
let (slot, _) = state.advance(
quotes,
DataClass::Quote,
"ETHUSDT.BINANCE",
UnixNanos::from(100),
);
state.advance(
quotes,
DataClass::Quote,
"ETHUSDT.BINANCE",
UnixNanos::from(200),
);
state.advance(
quotes,
DataClass::Quote,
"ETHUSDT.BINANCE",
UnixNanos::from(150),
);
let cursor = &state.cursors[slot as usize];
assert_eq!(cursor.count, 3);
assert_eq!(cursor.ts_init_hi, UnixNanos::from(200));
assert!(state.dirty[slot as usize]);
}
#[rstest]
fn build_snapshot_includes_only_dirty_and_clears() {
let mut state = CursorState::new();
let a: Topic = "data.quotes.BINANCE.A".into();
let b: Topic = "data.quotes.BINANCE.B".into();
let c: Topic = "data.quotes.BINANCE.C".into();
let (sa, _) = state.advance(a, DataClass::Quote, "A.BINANCE", UnixNanos::from(10));
state.advance(b, DataClass::Quote, "B.BINANCE", UnixNanos::from(20));
let (sc, _) = state.advance(c, DataClass::Quote, "C.BINANCE", UnixNanos::from(30));
let first = state
.build_snapshot(1, 0, UnixNanos::from(30))
.expect("snapshot");
state.advance(a, DataClass::Quote, "A.BINANCE", UnixNanos::from(40));
state.advance(c, DataClass::Quote, "C.BINANCE", UnixNanos::from(50));
let second = state
.build_snapshot(2, 7, UnixNanos::from(50))
.expect("snapshot");
let third = state.build_snapshot(3, 7, UnixNanos::from(60));
assert_eq!(first.advanced.len(), 3);
assert_eq!(second.marker_seq, 2);
assert_eq!(second.event_seq_before, 7);
assert_eq!(
second.advanced,
vec![
StreamCursor {
slot: sa,
ts_init_hi: UnixNanos::from(40),
count: 2,
},
StreamCursor {
slot: sc,
ts_init_hi: UnixNanos::from(50),
count: 2,
},
]
);
assert!(third.is_none());
}
#[rstest]
#[case::repeats_then_advance(vec![100, 100, 100, 200, 200], vec![0, 1, 2, 0, 1])]
#[case::non_monotonic_resets_each_change(vec![100, 200, 100], vec![0, 0, 0])]
fn same_ts_ordinal_tracks_contiguous_runs(
#[case] timestamps: Vec<u64>,
#[case] expected: Vec<u32>,
) {
let mut state = CursorState::new();
let trades: Topic = "data.trades.BINANCE.ETHUSDT".into();
let ordinals: Vec<u32> = timestamps
.into_iter()
.map(|ts| {
let (_, ordinal) = state.advance(
trades,
DataClass::Trade,
"ETHUSDT.BINANCE",
UnixNanos::from(ts),
);
ordinal
})
.collect();
assert_eq!(ordinals, expected);
}
}