use std::fmt::Display;
use nautilus_core::UnixNanos;
use serde::{Deserialize, Serialize};
use crate::wire;
// Domain prefixes: each `compute_*_hash` function below feeds its own
// constant into the hasher before any record field. The byte strings are
// hash input, so editing one changes every digest produced with it.
/// Prefix hashed first by [`compute_marker_hash`].
const MARKER_HASH_DOMAIN: &[u8] = b"nautilus-event-store/marker/v1";
/// Prefix hashed first by [`compute_hifi_hash`].
const HIFI_HASH_DOMAIN: &[u8] = b"nautilus-event-store/hifi/v1";
/// Prefix hashed first by [`compute_dict_hash`].
const DICT_HASH_DOMAIN: &[u8] = b"nautilus-event-store/dict/v1";
/// Prefix hashed first by [`compute_gap_hash`].
const GAP_HASH_DOMAIN: &[u8] = b"nautilus-event-store/gap/v1";
/// Data class tag stored in [`StreamDictEntry::data_cls`].
///
/// The textual form of each variant is defined by [`DataClass::as_str`] and
/// parsed back by the `FromStr` implementation.
///
/// NOTE(review): the variant names suggest market-data record kinds (order
/// book deltas, depth-10 book snapshots, quotes, trades, bars) — confirm
/// against the code that populates the stream dictionary.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum DataClass {
/// Textual name: `"BookDeltas"`.
BookDeltas,
/// Textual name: `"BookDepth10"`.
BookDepth10,
/// Textual name: `"Quote"`.
Quote,
/// Textual name: `"Trade"`.
Trade,
/// Textual name: `"Bar"`.
Bar,
}
impl DataClass {
    /// Returns the canonical textual name of this data class.
    ///
    /// The name is spelled exactly like the variant identifier. It is the
    /// text written by the `Display` implementation, accepted by the
    /// `FromStr` implementation, and hashed by `compute_dict_hash`, so
    /// changing any of these strings changes dictionary hashes.
    #[must_use]
    pub const fn as_str(self) -> &'static str {
        // Exhaustive on purpose: adding a variant must fail to compile here
        // until it is given a name.
        match self {
            Self::Bar => "Bar",
            Self::BookDeltas => "BookDeltas",
            Self::BookDepth10 => "BookDepth10",
            Self::Quote => "Quote",
            Self::Trade => "Trade",
        }
    }
}
impl Display for DataClass {
    /// Writes the canonical name returned by [`DataClass::as_str`].
    ///
    /// The name is emitted through [`std::fmt::Formatter::pad`], so the
    /// width, fill, alignment and precision flags of the format spec are
    /// honoured (for example `{:>12}` right-aligns the name). With a plain
    /// `{}` the output is exactly the `as_str` text.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // `write_str` would bypass the format spec and silently ignore
        // padding requests; `pad` applies them like `str`'s own Display.
        f.pad(self.as_str())
    }
}
impl std::str::FromStr for DataClass {
    type Err = String;

    /// Parses the exact, case-sensitive name of a variant.
    ///
    /// # Errors
    ///
    /// Returns a message that quotes the rejected input when `s` is not one
    /// of the five known names.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        // Resolve the name first, then attach the error text in one place.
        let parsed = match s {
            "Bar" => Some(Self::Bar),
            "BookDeltas" => Some(Self::BookDeltas),
            "BookDepth10" => Some(Self::BookDepth10),
            "Quote" => Some(Self::Quote),
            "Trade" => Some(Self::Trade),
            _ => None,
        };

        parsed.ok_or_else(|| format!("unknown DataClass, was `{s}`"))
    }
}
/// Integer type of the `slot` field on [`StreamCursor`], [`HiFiMarker`] and
/// [`StreamDictEntry`].
pub type StreamSlot = u32;
/// One per-slot entry of [`DataCursorSnapshot::advanced`].
///
/// All three fields are fed to [`compute_marker_hash`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct StreamCursor {
/// Slot this entry refers to.
pub slot: StreamSlot,
/// Timestamp, (de)serialized through `wire::nanos_as_u64`.
/// NOTE(review): the `_hi` suffix suggests the highest `ts_init` seen for
/// the slot — confirm with the writer.
#[serde(with = "wire::nanos_as_u64")]
pub ts_init_hi: UnixNanos,
/// Counter for the slot.
/// NOTE(review): presumably a record count — confirm what is counted.
pub count: u64,
}
/// Marker record carrying a list of [`StreamCursor`] entries; hashed by
/// [`compute_marker_hash`].
///
/// NOTE(review): field meanings are inferred from their names — confirm
/// against the code that produces these records.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct DataCursorSnapshot {
/// Sequence number of this marker.
pub marker_seq: u64,
/// NOTE(review): presumably the event sequence number in effect just
/// before this marker — confirm.
pub event_seq_before: u64,
/// Timestamp, (de)serialized through `wire::nanos_as_u64`.
#[serde(with = "wire::nanos_as_u64")]
pub ts_init: UnixNanos,
/// Cursor entries; both their number and their order feed the marker hash.
pub advanced: Vec<StreamCursor>,
}
/// Marker record for a single slot; hashed by [`compute_hifi_hash`].
///
/// NOTE(review): field meanings are inferred from their names — confirm
/// against the code that produces these records.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct HiFiMarker {
/// Sequence number of this marker.
pub marker_seq: u64,
/// NOTE(review): presumably the event sequence number in effect just
/// before this marker — confirm.
pub event_seq_before: u64,
/// Slot the marker refers to.
pub slot: StreamSlot,
/// Event timestamp, (de)serialized through `wire::nanos_as_u64`.
#[serde(with = "wire::nanos_as_u64")]
pub ts_event: UnixNanos,
/// Init timestamp, (de)serialized through `wire::nanos_as_u64`.
#[serde(with = "wire::nanos_as_u64")]
pub ts_init: UnixNanos,
/// NOTE(review): presumably disambiguates records that share a
/// timestamp — confirm.
pub same_ts_ordinal: u32,
/// 32-byte fingerprint of the record; how it is derived is not visible in
/// this file.
pub record_fingerprint: [u8; 32],
}
/// Reason attached to a [`MarkerGap`].
///
/// [`compute_gap_hash`] encodes the reason as a single byte: `0` for
/// `Overflow`, `1` for `WriterClosed`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum MarkerGapReason {
/// Hashed as byte `0`.
/// NOTE(review): presumably a queue or buffer overflowed — confirm.
Overflow,
/// Hashed as byte `1`.
/// NOTE(review): presumably the writer had shut down — confirm.
WriterClosed,
}
/// Record describing a span of marker sequence numbers together with a
/// [`MarkerGapReason`]; hashed by [`compute_gap_hash`].
///
/// NOTE(review): whether the bounds are inclusive is not visible in this
/// file — confirm with the producer.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct MarkerGap {
/// First bound of the span.
pub from_marker_seq: u64,
/// Second bound of the span.
pub to_marker_seq: u64,
/// Why the gap was recorded.
pub reason: MarkerGapReason,
}
/// Dictionary entry associating a slot with a data class and an identifier
/// string; hashed by [`compute_dict_hash`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct StreamDictEntry {
/// Slot described by this entry.
pub slot: StreamSlot,
/// Data class of the stream; hashed via its `as_str` name.
pub data_cls: DataClass,
/// Identifier text, hashed as UTF-8 bytes.
/// NOTE(review): the tests use values such as `ETHUSDT.BINANCE`, which
/// look like instrument ids — confirm the expected format.
pub identifier: String,
}
#[must_use]
pub fn compute_marker_hash(snapshot: &DataCursorSnapshot) -> [u8; 32] {
let mut hasher = blake3::Hasher::new();
hasher.update(MARKER_HASH_DOMAIN);
hasher.update(&snapshot.marker_seq.to_be_bytes());
hasher.update(&snapshot.event_seq_before.to_be_bytes());
hasher.update(&snapshot.ts_init.as_u64().to_be_bytes());
hasher.update(&(snapshot.advanced.len() as u64).to_be_bytes());
for cursor in &snapshot.advanced {
hasher.update(&cursor.slot.to_be_bytes());
hasher.update(&cursor.ts_init_hi.as_u64().to_be_bytes());
hasher.update(&cursor.count.to_be_bytes());
}
*hasher.finalize().as_bytes()
}
/// Computes the 32-byte BLAKE3 digest of a [`HiFiMarker`].
///
/// Hash input, in order: `HIFI_HASH_DOMAIN`, `marker_seq`,
/// `event_seq_before`, `slot`, `ts_event` as `u64`, `ts_init` as `u64`,
/// `same_ts_ordinal`, then the raw `record_fingerprint` bytes. Every integer
/// is encoded big-endian.
#[must_use]
pub fn compute_hifi_hash(marker: &HiFiMarker) -> [u8; 32] {
    // Encode the integer fields up front so the hashing order reads as a list.
    let marker_seq = marker.marker_seq.to_be_bytes();
    let event_seq_before = marker.event_seq_before.to_be_bytes();
    let slot = marker.slot.to_be_bytes();
    let ts_event = marker.ts_event.as_u64().to_be_bytes();
    let ts_init = marker.ts_init.as_u64().to_be_bytes();
    let same_ts_ordinal = marker.same_ts_ordinal.to_be_bytes();

    // The order of this table is the hash layout; do not reorder.
    let parts: [&[u8]; 8] = [
        HIFI_HASH_DOMAIN,
        &marker_seq,
        &event_seq_before,
        &slot,
        &ts_event,
        &ts_init,
        &same_ts_ordinal,
        &marker.record_fingerprint,
    ];

    let mut hasher = blake3::Hasher::new();
    for part in parts {
        hasher.update(part);
    }
    *hasher.finalize().as_bytes()
}
/// Computes the 32-byte BLAKE3 digest of a [`StreamDictEntry`].
///
/// Hash input, in order: `DICT_HASH_DOMAIN`, `slot` (big-endian), then the
/// data class name and the identifier, each preceded by its byte length as a
/// big-endian `u64`.
#[must_use]
pub fn compute_dict_hash(entry: &StreamDictEntry) -> [u8; 32] {
    let mut hasher = blake3::Hasher::new();
    hasher.update(DICT_HASH_DOMAIN);
    hasher.update(&entry.slot.to_be_bytes());

    // The length prefix keeps the boundary between the two strings
    // unambiguous: without it, ("ab", "c") and ("a", "bc") would hash alike.
    for text in [entry.data_cls.as_str(), entry.identifier.as_str()] {
        let length_prefix = (text.len() as u64).to_be_bytes();
        hasher.update(&length_prefix);
        hasher.update(text.as_bytes());
    }

    *hasher.finalize().as_bytes()
}
/// Computes the 32-byte BLAKE3 digest of a [`MarkerGap`].
///
/// Hash input, in order: `GAP_HASH_DOMAIN`, `from_marker_seq` and
/// `to_marker_seq` (both big-endian), then one byte for the reason: `0` for
/// `Overflow`, `1` for `WriterClosed`.
#[must_use]
pub fn compute_gap_hash(gap: &MarkerGap) -> [u8; 32] {
    // Explicit tags rather than an `as` cast, so reordering the enum cannot
    // silently change the digest.
    let reason_tag: u8 = match gap.reason {
        MarkerGapReason::WriterClosed => 1,
        MarkerGapReason::Overflow => 0,
    };

    let mut state = blake3::Hasher::new();
    state
        .update(GAP_HASH_DOMAIN)
        .update(&gap.from_marker_seq.to_be_bytes())
        .update(&gap.to_marker_seq.to_be_bytes())
        .update(&[reason_tag]);

    let digest = state.finalize();
    *digest.as_bytes()
}
#[cfg(test)]
mod tests {
use std::{fmt::Write, str::FromStr};
use proptest::{prelude::*, test_runner::Config as ProptestConfig};
use rstest::rstest;
use super::*;
// Every variant must agree across `as_str`, `Display` and `FromStr`.
#[rstest]
fn data_class_roundtrips_to_str() {
    for (variant, expected) in [
        (DataClass::BookDeltas, "BookDeltas"),
        (DataClass::BookDepth10, "BookDepth10"),
        (DataClass::Quote, "Quote"),
        (DataClass::Trade, "Trade"),
        (DataClass::Bar, "Bar"),
    ] {
        assert_eq!(variant.as_str(), expected, "as_str for {variant:?}");
        assert_eq!(variant.to_string(), expected, "Display for {variant:?}");

        let parsed = DataClass::from_str(expected).unwrap();
        assert_eq!(parsed, variant, "from_str for {expected}");
    }
}
// Fixture: a snapshot with two cursors. The golden marker hash is pinned to
// these exact values.
fn baseline_snapshot() -> DataCursorSnapshot {
    let first = StreamCursor {
        slot: 0,
        ts_init_hi: UnixNanos::from(1_700_000_000_000_000_001),
        count: 7,
    };
    let second = StreamCursor {
        slot: 1,
        ts_init_hi: UnixNanos::from(1_700_000_000_000_000_002),
        count: 3,
    };

    DataCursorSnapshot {
        marker_seq: 1,
        event_seq_before: 42,
        ts_init: UnixNanos::from(1_700_000_000_000_000_000),
        advanced: vec![first, second],
    }
}
// Fixture: the golden hifi hash is pinned to these exact values.
fn baseline_hifi() -> HiFiMarker {
    let ts_event = UnixNanos::from(1_700_000_000_000_000_000);
    let ts_init = UnixNanos::from(1_700_000_000_000_000_001);
    let record_fingerprint = [0xABu8; 32];

    HiFiMarker {
        marker_seq: 1,
        event_seq_before: 42,
        slot: 0,
        ts_event,
        ts_init,
        same_ts_ordinal: 0,
        record_fingerprint,
    }
}
// Fixture: the golden dict hash is pinned to these exact values.
fn baseline_dict() -> StreamDictEntry {
    let identifier = String::from("ETHUSDT.BINANCE");

    StreamDictEntry {
        slot: 3,
        data_cls: DataClass::Quote,
        identifier,
    }
}
// Fixture: the golden gap hash is pinned to these exact values.
fn baseline_gap() -> MarkerGap {
    let (from_marker_seq, to_marker_seq) = (5, 9);

    MarkerGap {
        from_marker_seq,
        to_marker_seq,
        reason: MarkerGapReason::Overflow,
    }
}
// Renders a 32-byte digest as 64 lowercase hex characters.
fn hex32(bytes: &[u8; 32]) -> String {
    bytes
        .iter()
        .fold(String::with_capacity(64), |mut hex, byte| {
            write!(hex, "{byte:02x}").expect("writing to a String is infallible");
            hex
        })
}
// Hashing the same snapshot twice must agree, and the digest must match the
// pinned golden value so accidental layout changes are caught.
#[rstest]
fn marker_hash_is_deterministic() {
    let snapshot = baseline_snapshot();
    let first = compute_marker_hash(&snapshot);
    let second = compute_marker_hash(&snapshot);
    assert_eq!(first, second);

    assert_eq!(
        hex32(&first),
        "898bc3efdaf0edd9167a38a1c3060c9b4dc051658ea2f6132004bed78a481c47",
        "marker hash wire format changed"
    );
}
// Hashing the same marker twice must agree, and the digest must match the
// pinned golden value so accidental layout changes are caught.
#[rstest]
fn hifi_hash_is_deterministic() {
    let fixture = baseline_hifi();
    let first = compute_hifi_hash(&fixture);
    let second = compute_hifi_hash(&fixture);
    assert_eq!(first, second);

    assert_eq!(
        hex32(&first),
        "06542408380d8815ef783b9dbde6b3e3ffdf05605bb17e83ad48474557457517",
        "hifi hash wire format changed"
    );
}
// Hashing the same entry twice must agree, and the digest must match the
// pinned golden value so accidental layout changes are caught.
#[rstest]
fn dict_hash_is_deterministic() {
    let fixture = baseline_dict();
    let first = compute_dict_hash(&fixture);
    let second = compute_dict_hash(&fixture);
    assert_eq!(first, second);

    assert_eq!(
        hex32(&first),
        "24e702c5ae20b832ad6907676919fa18a89b79e97dde9df7e1de454191f42fda",
        "dict hash wire format changed"
    );
}
// Hashing the same gap twice must agree, and the digest must match the
// pinned golden value so accidental layout changes are caught.
#[rstest]
fn gap_hash_is_deterministic() {
    let fixture = baseline_gap();
    let first = compute_gap_hash(&fixture);
    let second = compute_gap_hash(&fixture);
    assert_eq!(first, second);

    assert_eq!(
        hex32(&first),
        "ec1ae0ea813e9971155c6277e95c43de72da6f22ca1832f072aadd9b91f5a3ec",
        "gap hash wire format changed"
    );
}
// Each record type must survive a bincode encode/decode cycle unchanged.
#[rstest]
fn marker_record_bincode_roundtrip() {
    let config = bincode::config::standard();

    // Cursor snapshot.
    let snapshot = baseline_snapshot();
    let encoded = bincode::serde::encode_to_vec(&snapshot, config).unwrap();
    let (roundtripped, _): (DataCursorSnapshot, _) =
        bincode::serde::decode_from_slice(&encoded, config).unwrap();
    assert_eq!(snapshot, roundtripped);

    // High-fidelity marker.
    let marker = baseline_hifi();
    let encoded = bincode::serde::encode_to_vec(&marker, config).unwrap();
    let (roundtripped, _): (HiFiMarker, _) =
        bincode::serde::decode_from_slice(&encoded, config).unwrap();
    assert_eq!(marker, roundtripped);

    // Marker gap.
    let marker_gap = MarkerGap {
        from_marker_seq: 5,
        to_marker_seq: 10,
        reason: MarkerGapReason::Overflow,
    };
    let encoded = bincode::serde::encode_to_vec(&marker_gap, config).unwrap();
    let (roundtripped, _): (MarkerGap, _) =
        bincode::serde::decode_from_slice(&encoded, config).unwrap();
    assert_eq!(marker_gap, roundtripped);

    // Dictionary entry.
    let dict_entry = StreamDictEntry {
        slot: 2,
        data_cls: DataClass::Bar,
        identifier: String::from("BTCUSDT-PERP.BINANCE"),
    };
    let encoded = bincode::serde::encode_to_vec(&dict_entry, config).unwrap();
    let (roundtripped, _): (StreamDictEntry, _) =
        bincode::serde::decode_from_slice(&encoded, config).unwrap();
    assert_eq!(dict_entry, roundtripped);
}
#[rstest]
#[case::quote_lowercase("quote")]
#[case::empty("")]
#[case::trailing_s("Quotes")]
#[case::partial("BookDepth")]
fn data_class_from_str_rejects_unknown(#[case] input: &str) {
let err = DataClass::from_str(input).unwrap_err();
assert!(
err.contains(input),
"error should name the rejected input, was `{err}`"
);
}
// Changing any single field, the cursor count, or the cursor order must
// change the marker hash.
#[rstest]
#[case::marker_seq(|snap: &mut DataCursorSnapshot| snap.marker_seq = 99)]
#[case::event_seq_before(|snap: &mut DataCursorSnapshot| snap.event_seq_before = 99)]
#[case::ts_init(|snap: &mut DataCursorSnapshot| snap.ts_init = UnixNanos::from(1))]
#[case::cursor_slot(|snap: &mut DataCursorSnapshot| snap.advanced[0].slot = 256)]
#[case::cursor_ts_init_hi(|snap: &mut DataCursorSnapshot| snap.advanced[0].ts_init_hi = UnixNanos::from(1))]
#[case::cursor_count(|snap: &mut DataCursorSnapshot| snap.advanced[0].count = 999)]
#[case::extra_cursor(|snap: &mut DataCursorSnapshot| snap.advanced.push(StreamCursor { slot: 2, ts_init_hi: UnixNanos::from(1_700_000_000_000_000_003), count: 1 }))]
#[case::cursor_order(|snap: &mut DataCursorSnapshot| snap.advanced.reverse())]
fn every_marker_field_affects_hash(#[case] mutate: fn(&mut DataCursorSnapshot)) {
    let untouched = baseline_snapshot();
    let mut changed = baseline_snapshot();
    mutate(&mut changed);

    let before = compute_marker_hash(&untouched);
    let after = compute_marker_hash(&changed);
    assert_ne!(before, after);
}
// Changing any single field must change the hifi hash.
#[rstest]
#[case::marker_seq(|marker: &mut HiFiMarker| marker.marker_seq = 99)]
#[case::event_seq_before(|marker: &mut HiFiMarker| marker.event_seq_before = 99)]
#[case::slot(|marker: &mut HiFiMarker| marker.slot = 256)]
#[case::ts_event(|marker: &mut HiFiMarker| marker.ts_event = UnixNanos::from(1))]
#[case::ts_init(|marker: &mut HiFiMarker| marker.ts_init = UnixNanos::from(1))]
#[case::same_ts_ordinal(|marker: &mut HiFiMarker| marker.same_ts_ordinal = 256)]
#[case::fingerprint(|marker: &mut HiFiMarker| marker.record_fingerprint[0] ^= 0x01)]
fn every_hifi_field_affects_hash(#[case] mutate: fn(&mut HiFiMarker)) {
    let untouched = baseline_hifi();
    let mut changed = baseline_hifi();
    mutate(&mut changed);

    let before = compute_hifi_hash(&untouched);
    let after = compute_hifi_hash(&changed);
    assert_ne!(before, after);
}
// Changing any single field must change the dict hash.
#[rstest]
#[case::slot(|entry: &mut StreamDictEntry| entry.slot = 99)]
#[case::data_cls(|entry: &mut StreamDictEntry| entry.data_cls = DataClass::Trade)]
#[case::identifier(|entry: &mut StreamDictEntry| entry.identifier = "BTCUSDT.BINANCE".to_string())]
fn every_dict_field_affects_hash(#[case] mutate: fn(&mut StreamDictEntry)) {
    let untouched = baseline_dict();
    let mut changed = baseline_dict();
    mutate(&mut changed);

    let before = compute_dict_hash(&untouched);
    let after = compute_dict_hash(&changed);
    assert_ne!(before, after);
}
// Changing any single field must change the gap hash.
#[rstest]
#[case::from(|gap: &mut MarkerGap| gap.from_marker_seq = 99)]
#[case::to(|gap: &mut MarkerGap| gap.to_marker_seq = 99)]
#[case::reason(|gap: &mut MarkerGap| gap.reason = MarkerGapReason::WriterClosed)]
fn every_gap_field_affects_hash(#[case] mutate: fn(&mut MarkerGap)) {
    let untouched = baseline_gap();
    let mut changed = baseline_gap();
    mutate(&mut changed);

    let before = compute_gap_hash(&untouched);
    let after = compute_gap_hash(&changed);
    assert_ne!(before, after);
}
// A snapshot without cursors hashes deterministically and differently from
// the two-cursor baseline that shares its scalar fields.
#[rstest]
fn marker_hash_handles_empty_advanced() {
    let no_cursors = DataCursorSnapshot {
        marker_seq: 1,
        event_seq_before: 42,
        ts_init: UnixNanos::from(1_700_000_000_000_000_000),
        advanced: Vec::new(),
    };

    let digest = compute_marker_hash(&no_cursors);
    assert_eq!(digest, compute_marker_hash(&no_cursors));
    assert_ne!(digest, compute_marker_hash(&baseline_snapshot()));
}
proptest! {
    #![proptest_config(ProptestConfig { cases: 64, ..ProptestConfig::default() })]

    // Arbitrary snapshots (0 to 7 cursors) must survive a bincode roundtrip.
    #[rstest]
    fn prop_marker_snapshot_bincode_roundtrip(
        marker_seq in any::<u64>(),
        event_seq_before in any::<u64>(),
        ts_init in any::<u64>(),
        cursors in proptest::collection::vec((any::<u32>(), any::<u64>(), any::<u64>()), 0..8),
    ) {
        let advanced: Vec<StreamCursor> = cursors
            .into_iter()
            .map(|(slot, hi, count)| StreamCursor {
                slot,
                ts_init_hi: UnixNanos::from(hi),
                count,
            })
            .collect();
        let original = DataCursorSnapshot {
            marker_seq,
            event_seq_before,
            ts_init: UnixNanos::from(ts_init),
            advanced,
        };

        let config = bincode::config::standard();
        let encoded = bincode::serde::encode_to_vec(&original, config).expect("encode");
        let (roundtripped, _): (DataCursorSnapshot, _) =
            bincode::serde::decode_from_slice(&encoded, config).expect("decode");
        prop_assert_eq!(original, roundtripped);
    }

    // Arbitrary hifi markers must survive a bincode roundtrip.
    #[rstest]
    fn prop_hifi_marker_bincode_roundtrip(
        marker_seq in any::<u64>(),
        event_seq_before in any::<u64>(),
        slot in any::<u32>(),
        ts_event in any::<u64>(),
        ts_init in any::<u64>(),
        same_ts_ordinal in any::<u32>(),
        fingerprint in proptest::array::uniform32(any::<u8>()),
    ) {
        let original = HiFiMarker {
            marker_seq,
            event_seq_before,
            slot,
            ts_event: UnixNanos::from(ts_event),
            ts_init: UnixNanos::from(ts_init),
            same_ts_ordinal,
            record_fingerprint: fingerprint,
        };

        let config = bincode::config::standard();
        let encoded = bincode::serde::encode_to_vec(&original, config).expect("encode");
        let (roundtripped, _): (HiFiMarker, _) =
            bincode::serde::decode_from_slice(&encoded, config).expect("decode");
        prop_assert_eq!(original, roundtripped);
    }
}
}