use std::{
any::Any,
sync::{
Arc,
atomic::{AtomicU64, Ordering},
},
time::Duration,
};
use ahash::AHashSet;
use nautilus_core::UnixNanos;
use nautilus_system::event_store::DataMarkerConfig;
use crate::{
Topic,
markers::{
CursorState, DataMarkerExtractorRegistry, HiFiMarker, MarkerMsg, MarkerWriter,
StreamDictEntry,
},
};
/// Observes published data messages and records data markers for them.
///
/// Every accepted message advances a per-stream cursor. The cursor state is written out as
/// a snapshot marker when an entry is submitted or when the safety-flush interval elapses.
/// Messages whose identifier is configured as high fidelity additionally produce one
/// [`HiFiMarker`] each. All dictionary entries and markers go through a [`MarkerWriter`].
#[derive(Debug)]
pub struct DataMarkerCapture {
    /// Per-stream cursor. It is advanced on every accepted message and is the source of
    /// snapshots and of newly allocated stream dictionary entries.
    cursor: CursorState,
    /// Looks up the extractor that reads identifier, timestamps and fingerprint for a message type.
    registry: DataMarkerExtractorRegistry,
    /// Destination for stream dictionary entries and markers.
    writer: MarkerWriter,
    /// Shared counter that is only read here; its value is stamped into markers as `event_seq_before`.
    submit_counter: Arc<AtomicU64>,
    /// Sequence number of the most recently submitted marker (0 before any).
    /// HiFi and snapshot markers share this sequence.
    marker_seq: u64,
    /// Identifiers that receive a per-record [`HiFiMarker`].
    hifi: AHashSet<String>,
    /// Timestamp of the last emitted snapshot; starts at `UnixNanos::default()`.
    last_flush: UnixNanos,
    /// Minimum time since `last_flush` before a safety flush emits a snapshot.
    safety_flush_interval: Duration,
}
impl DataMarkerCapture {
#[must_use]
pub fn new(
registry: DataMarkerExtractorRegistry,
writer: MarkerWriter,
submit_counter: Arc<AtomicU64>,
config: &DataMarkerConfig,
) -> Self {
Self {
cursor: CursorState::new(),
registry,
writer,
submit_counter,
marker_seq: 0,
hifi: config.high_fidelity.iter().cloned().collect(),
last_flush: UnixNanos::default(),
safety_flush_interval: config.safety_flush_interval,
}
}
pub fn observe_publish(&mut self, topic: Topic, message: &dyn Any, _now: UnixNanos) {
let Some(extractor) = self.registry.lookup(message) else {
return;
};
let event_seq_before = self.submit_counter.load(Ordering::Acquire);
let Some(identifier) = extractor.identifier(message) else {
return;
};
let Some((ts_event, ts_init)) = extractor.timestamps(message) else {
return;
};
let data_class = extractor.data_class();
let record_fingerprint = if self.hifi.contains(identifier.as_str()) {
let Some(record_fingerprint) = extractor.fingerprint(message) else {
return;
};
Some(record_fingerprint)
} else {
None
};
let (slot, same_ts_ordinal) = self.cursor.advance(topic, data_class, &identifier, ts_init);
self.drain_dict_entries();
if let Some(record_fingerprint) = record_fingerprint {
let marker_seq = self.marker_seq + 1;
let marker = HiFiMarker {
marker_seq,
event_seq_before,
slot,
ts_event,
ts_init,
same_ts_ordinal,
record_fingerprint,
};
self.submit_marker(MarkerMsg::HiFi(marker), marker_seq);
}
}
pub fn on_entry_submitted(&mut self, now: UnixNanos) {
self.flush_snapshot(now);
}
pub fn maybe_safety_flush(&mut self, now: UnixNanos) {
if now
.duration_since(&self.last_flush)
.is_some_and(|elapsed| elapsed >= duration_nanos_saturating(self.safety_flush_interval))
{
self.flush_snapshot(now);
}
}
pub fn close(self) {
self.writer.close();
}
fn drain_dict_entries(&mut self) {
for entry in self.cursor.take_new_dict_entries() {
self.submit_dict(entry);
}
}
fn submit_dict(&self, entry: StreamDictEntry) {
let _ = self.writer.put_dict(entry);
}
fn flush_snapshot(&mut self, now: UnixNanos) {
let marker_seq = self.marker_seq + 1;
let event_seq_before = self.submit_counter.load(Ordering::Acquire);
if let Some(snapshot) = self
.cursor
.build_snapshot(marker_seq, event_seq_before, now)
{
self.submit_marker(MarkerMsg::Snapshot(snapshot), marker_seq);
self.last_flush = now;
}
}
fn submit_marker(&mut self, msg: MarkerMsg, marker_seq: u64) {
let _ = self.writer.submit(msg, marker_seq);
self.marker_seq = marker_seq;
}
}
/// Returns `duration` as whole nanoseconds, clamped to `u64::MAX` when it does not fit in a `u64`.
fn duration_nanos_saturating(duration: Duration) -> u64 {
    let total_nanos: u128 = duration.as_nanos();
    total_nanos.try_into().unwrap_or(u64::MAX)
}
#[cfg(test)]
mod tests {
    use std::{
        any::Any,
        sync::{
            Arc,
            atomic::{AtomicU64, Ordering},
        },
        time::Duration,
    };

    use nautilus_core::{UnixNanos, time::get_atomic_clock_static};
    use nautilus_system::event_store::DataMarkerClass;
    use rstest::rstest;

    use super::{
        super::test_support::{SharedMemoryMarker, SharedMemoryMarkerState},
        *,
    };
    use crate::{
        Topic,
        manifest::RunStatus,
        markers::{
            DataClass, DataCursorSnapshot, DataMarkerExtractor, DataMarkerExtractorRegistry,
            HiFiMarker, MarkerBackend, MarkerManifest, MarkerWriter, MarkerWriterConfig,
            StreamDictEntry,
        },
    };

    const ETH: &str = "ETHUSDT.BINANCE";
    const BTC: &str = "BTCUSDT.BINANCE";

    /// Minimal quote payload understood by [`TestQuoteExtractor`].
    #[derive(Debug)]
    struct TestQuote {
        identifier: &'static str,
        ts_event: UnixNanos,
        ts_init: UnixNanos,
        fingerprint: [u8; 32],
    }

    /// A message type that has no registered extractor.
    #[derive(Debug)]
    struct IgnoredMessage;

    /// Extractor that reads every marker field straight off a [`TestQuote`].
    #[derive(Debug)]
    struct TestQuoteExtractor;

    impl DataMarkerExtractor for TestQuoteExtractor {
        fn data_class(&self) -> DataClass {
            DataClass::Quote
        }

        fn identifier(&self, msg: &dyn Any) -> Option<String> {
            let quote = msg.downcast_ref::<TestQuote>()?;
            Some(quote.identifier.to_string())
        }

        fn timestamps(&self, msg: &dyn Any) -> Option<(UnixNanos, UnixNanos)> {
            let quote = msg.downcast_ref::<TestQuote>()?;
            Some((quote.ts_event, quote.ts_init))
        }

        fn fingerprint(&self, msg: &dyn Any) -> Option<[u8; 32]> {
            let quote = msg.downcast_ref::<TestQuote>()?;
            Some(quote.fingerprint)
        }
    }

    /// Manifest for a freshly opened run with quote markers enabled.
    fn manifest() -> MarkerManifest {
        MarkerManifest {
            run_id: "1700000000-phase7".to_string(),
            enabled_classes: vec![DataClass::Quote],
            high_fidelity: true,
            snapshot_count: 0,
            hifi_count: 0,
            gap_count: 0,
            dict_count: 0,
            status: RunStatus::Running,
        }
    }

    /// Registry that only knows how to read [`TestQuote`].
    fn registry() -> DataMarkerExtractorRegistry {
        let mut extractors = DataMarkerExtractorRegistry::new();
        extractors.register::<TestQuote>(Box::new(TestQuoteExtractor));
        extractors
    }

    /// Quote-only marker config with the given high-fidelity list and flush interval.
    fn config(high_fidelity: Vec<String>, safety_flush_interval: Duration) -> DataMarkerConfig {
        DataMarkerConfig {
            classes: vec![DataMarkerClass::Quote],
            high_fidelity,
            safety_flush_interval,
            channel_capacity: 100,
        }
    }

    /// Quote with a fixed fingerprint of all `7`s.
    fn quote(identifier: &'static str, ts_event: u64, ts_init: u64) -> TestQuote {
        TestQuote {
            identifier,
            ts_event: ts_event.into(),
            ts_init: ts_init.into(),
            fingerprint: [7; 32],
        }
    }

    fn eth_topic() -> Topic {
        "data.quotes.BINANCE.ETHUSDT".into()
    }

    fn btc_topic() -> Topic {
        "data.quotes.BINANCE.BTCUSDT".into()
    }

    /// Opens an in-memory marker run and wires a capture to it.
    ///
    /// The writer flushes every message individually (`max_batch: 1`).
    fn open_capture(
        config: &DataMarkerConfig,
        submit_counter: Arc<AtomicU64>,
    ) -> (DataMarkerCapture, SharedMemoryMarkerState) {
        let (backend, store) = SharedMemoryMarker::new();
        store
            .lock()
            .expect("shared marker")
            .open_run(manifest())
            .expect("open marker run");

        let writer_config = MarkerWriterConfig {
            channel_capacity: 100,
            max_batch: 1,
            max_latency: Duration::from_millis(1),
        };
        let writer = MarkerWriter::spawn(Box::new(backend), get_atomic_clock_static(), writer_config)
            .expect("spawn writer");

        let capture = DataMarkerCapture::new(registry(), writer, submit_counter, config);
        (capture, store)
    }

    fn snapshots(store: &SharedMemoryMarkerState) -> Vec<DataCursorSnapshot> {
        let guard = store.lock().expect("shared marker");
        guard.scan_snapshots().expect("scan snapshots")
    }

    fn hifi(store: &SharedMemoryMarkerState) -> Vec<HiFiMarker> {
        let guard = store.lock().expect("shared marker");
        guard.scan_hifi().expect("scan hifi")
    }

    fn dict(store: &SharedMemoryMarkerState) -> Vec<StreamDictEntry> {
        let guard = store.lock().expect("shared marker");
        guard.scan_dict().expect("scan dict")
    }

    #[rstest]
    fn event_seq_before_tracks_submit_counter() {
        let counter = Arc::new(AtomicU64::new(5));
        let marker_config = config(vec![ETH.to_string()], Duration::from_secs(1));
        let (mut capture, store) = open_capture(&marker_config, Arc::clone(&counter));

        capture.observe_publish(eth_topic(), &quote(ETH, 10, 20), UnixNanos::from(20));
        counter.store(6, Ordering::Release);
        capture.on_entry_submitted(UnixNanos::from(30));
        capture.close();

        let written_snapshots = snapshots(&store);
        let written_hifi = hifi(&store);
        assert_eq!(written_hifi.len(), 1);
        assert_eq!(written_hifi[0].event_seq_before, 5);
        assert_eq!(written_snapshots.len(), 1);
        assert_eq!(written_snapshots[0].event_seq_before, 6);
    }

    #[rstest]
    fn snapshot_written_on_entry_boundary() {
        let counter = Arc::new(AtomicU64::new(1));
        let marker_config = config(Vec::new(), Duration::from_secs(1));
        let (mut capture, store) = open_capture(&marker_config, counter);

        capture.observe_publish(eth_topic(), &quote(ETH, 10, 20), UnixNanos::from(20));
        capture.on_entry_submitted(UnixNanos::from(30));
        capture.close();

        let written_snapshots = snapshots(&store);
        let written_dict = dict(&store);
        assert_eq!(written_snapshots.len(), 1);
        assert_eq!(written_snapshots[0].marker_seq, 1);
        assert_eq!(written_snapshots[0].event_seq_before, 1);
        assert_eq!(written_snapshots[0].advanced[0].count, 1);

        let expected_dict = vec![StreamDictEntry {
            slot: 0,
            data_cls: DataClass::Quote,
            identifier: ETH.to_string(),
        }];
        assert_eq!(written_dict, expected_dict);
    }

    #[rstest]
    fn safety_flush_emits_without_entry() {
        let counter = Arc::new(AtomicU64::new(0));
        let marker_config = config(Vec::new(), Duration::from_nanos(10));
        let (mut capture, store) = open_capture(&marker_config, counter);

        capture.observe_publish(eth_topic(), &quote(ETH, 10, 20), UnixNanos::from(20));
        capture.maybe_safety_flush(UnixNanos::from(30));
        capture.close();

        let written_snapshots = snapshots(&store);
        assert_eq!(written_snapshots.len(), 1);
        assert_eq!(written_snapshots[0].event_seq_before, 0);
        assert_eq!(written_snapshots[0].ts_init, UnixNanos::from(30));
    }

    #[rstest]
    fn safety_flush_waits_for_interval_after_entry_boundary() {
        let counter = Arc::new(AtomicU64::new(1));
        let marker_config = config(Vec::new(), Duration::from_nanos(10));
        let (mut capture, store) = open_capture(&marker_config, counter);

        capture.observe_publish(eth_topic(), &quote(ETH, 10, 20), UnixNanos::from(20));
        capture.on_entry_submitted(UnixNanos::from(100));
        capture.observe_publish(eth_topic(), &quote(ETH, 30, 40), UnixNanos::from(40));
        // 9ns after the entry boundary: still inside the 10ns interval.
        capture.maybe_safety_flush(UnixNanos::from(109));
        // Exactly 10ns after the boundary: the interval has elapsed.
        capture.maybe_safety_flush(UnixNanos::from(110));
        capture.close();

        let written_snapshots = snapshots(&store);
        assert_eq!(written_snapshots.len(), 2);
        assert_eq!(written_snapshots[0].ts_init, UnixNanos::from(100));
        assert_eq!(written_snapshots[0].advanced[0].count, 1);
        assert_eq!(written_snapshots[1].ts_init, UnixNanos::from(110));
        assert_eq!(written_snapshots[1].advanced[0].count, 2);
    }

    #[rstest]
    fn hifi_marker_emitted_for_configured_instrument() {
        let counter = Arc::new(AtomicU64::new(2));
        let marker_config = config(vec![ETH.to_string()], Duration::from_secs(1));
        let (mut capture, store) = open_capture(&marker_config, counter);

        capture.observe_publish(eth_topic(), &quote(ETH, 10, 20), UnixNanos::from(20));
        capture.observe_publish(btc_topic(), &quote(BTC, 30, 40), UnixNanos::from(40));
        capture.close();

        let markers = hifi(&store);
        assert_eq!(markers.len(), 1);
        let marker = &markers[0];
        assert_eq!(marker.marker_seq, 1);
        assert_eq!(marker.event_seq_before, 2);
        assert_eq!(marker.slot, 0);
        assert_eq!(marker.ts_event, UnixNanos::from(10));
        assert_eq!(marker.ts_init, UnixNanos::from(20));
        assert_eq!(marker.same_ts_ordinal, 0);
        assert_eq!(marker.record_fingerprint, [7; 32]);
    }

    #[rstest]
    fn hifi_same_ts_ordinal_increments() {
        let counter = Arc::new(AtomicU64::new(0));
        let marker_config = config(vec![ETH.to_string()], Duration::from_secs(1));
        let (mut capture, store) = open_capture(&marker_config, counter);

        // Two records sharing ts_init = 20 must receive consecutive ordinals.
        capture.observe_publish(eth_topic(), &quote(ETH, 10, 20), UnixNanos::from(20));
        capture.observe_publish(eth_topic(), &quote(ETH, 11, 20), UnixNanos::from(20));
        capture.close();

        let markers = hifi(&store);
        assert_eq!(markers.len(), 2);

        let ordinals: Vec<_> = markers.iter().map(|m| m.same_ts_ordinal).collect();
        assert_eq!(ordinals, vec![0, 1]);

        let sequence: Vec<_> = markers.iter().map(|m| m.marker_seq).collect();
        assert_eq!(sequence, vec![1, 2]);
    }

    #[rstest]
    fn unregistered_type_is_ignored() {
        let counter = Arc::new(AtomicU64::new(0));
        let marker_config = config(Vec::new(), Duration::from_nanos(1));
        let (mut capture, store) = open_capture(&marker_config, counter);

        capture.observe_publish(eth_topic(), &IgnoredMessage, UnixNanos::from(20));
        capture.maybe_safety_flush(UnixNanos::from(30));
        capture.close();

        assert!(snapshots(&store).is_empty());
        assert!(hifi(&store).is_empty());
        assert!(dict(&store).is_empty());
    }
}