use std::fmt;
use std::ops::BitOr;
use std::collections::HashMap;
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct SymbolId {
pub venue: String,
pub symbol: String,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Side {
Bid,
Ask,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BookAction {
Upsert,
Delete,
}
#[derive(Debug, Clone)]
pub struct BookUpdate {
pub symbol: SymbolId,
pub side: Side,
pub level: u16,
pub price: i64,
pub size: i64,
pub action: BookAction,
pub sequence: u64,
pub ts_exchange_ns: u64,
pub ts_recv_ns: u64,
}
#[derive(Debug, Clone)]
pub struct TradePrint {
pub symbol: SymbolId,
pub price: i64,
pub size: i64,
pub aggressor_side: Side,
pub sequence: u64,
pub ts_exchange_ns: u64,
pub ts_recv_ns: u64,
}
#[derive(Debug, Clone, Default)]
pub struct AnalyticsSnapshot {
pub delta: i64,
pub cumulative_delta: i64,
pub buy_volume: i64,
pub sell_volume: i64,
pub last_price: i64,
pub point_of_control: i64,
pub value_area_low: i64,
pub value_area_high: i64,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SignalState {
Neutral,
LongBias,
ShortBias,
Blocked,
}
#[derive(Debug, Clone)]
pub struct SignalSnapshot {
pub module_id: &'static str,
pub state: SignalState,
pub confidence_bps: u16,
pub quality_flags: u32,
pub reason: String,
}
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
pub struct DataQualityFlags(u32);
impl DataQualityFlags {
pub const NONE: Self = Self(0);
pub const STALE_FEED: Self = Self(1 << 0);
pub const SEQUENCE_GAP: Self = Self(1 << 1);
pub const CLOCK_SKEW: Self = Self(1 << 2);
pub const DEPTH_TRUNCATED: Self = Self(1 << 3);
pub const OUT_OF_ORDER: Self = Self(1 << 4);
pub const ADAPTER_DEGRADED: Self = Self(1 << 5);
pub fn bits(self) -> u32 {
self.0
}
pub fn from_bits_truncate(bits: u32) -> Self {
Self(bits)
}
pub fn intersects(self, other: Self) -> bool {
(self.0 & other.0) != 0
}
}
impl fmt::Debug for DataQualityFlags {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "DataQualityFlags({:#x})", self.0)
}
}
impl BitOr for DataQualityFlags {
type Output = Self;
fn bitor(self, rhs: Self) -> Self::Output {
Self(self.0 | rhs.0)
}
}
#[derive(Debug, Default)]
pub struct AnalyticsAccumulator {
snapshot: AnalyticsSnapshot,
volume_profile: HashMap<i64, i64>,
}
impl AnalyticsAccumulator {
pub fn on_trade(&mut self, trade: &TradePrint) {
self.snapshot.last_price = trade.price;
*self.volume_profile.entry(trade.price).or_insert(0) += trade.size;
match trade.aggressor_side {
Side::Bid => {
self.snapshot.sell_volume += trade.size;
self.snapshot.delta -= trade.size;
self.snapshot.cumulative_delta -= trade.size;
}
Side::Ask => {
self.snapshot.buy_volume += trade.size;
self.snapshot.delta += trade.size;
self.snapshot.cumulative_delta += trade.size;
}
}
self.recompute_profile_levels();
}
pub fn reset_session_delta(&mut self) {
self.snapshot.delta = 0;
self.snapshot.buy_volume = 0;
self.snapshot.sell_volume = 0;
}
pub fn reset_session(&mut self) {
self.snapshot = AnalyticsSnapshot::default();
self.volume_profile.clear();
}
pub fn snapshot(&self) -> AnalyticsSnapshot {
self.snapshot.clone()
}
fn recompute_profile_levels(&mut self) {
if self.volume_profile.is_empty() {
return;
}
let mut prices: Vec<i64> = self.volume_profile.keys().copied().collect();
prices.sort_unstable();
let total_volume: i64 = self.volume_profile.values().sum();
if total_volume <= 0 {
return;
}
let mut poc_price = prices[0];
let mut poc_volume = self.volume_profile[&poc_price];
for p in &prices {
let v = self.volume_profile[p];
if v > poc_volume || (v == poc_volume && *p > poc_price) {
poc_price = *p;
poc_volume = v;
}
}
self.snapshot.point_of_control = poc_price;
let target = ((total_volume as f64) * 0.70).ceil() as i64;
let mut covered = poc_volume;
let mut low = poc_price;
let mut high = poc_price;
let poc_idx = prices.iter().position(|p| *p == poc_price).unwrap_or(0);
let mut left: isize = poc_idx as isize - 1;
let mut right: usize = poc_idx + 1;
while covered < target && (left >= 0 || right < prices.len()) {
let left_vol = if left >= 0 {
self.volume_profile[&prices[left as usize]]
} else {
-1
};
let right_vol = if right < prices.len() {
self.volume_profile[&prices[right]]
} else {
-1
};
if right_vol > left_vol {
covered += right_vol.max(0);
high = prices[right];
right += 1;
} else {
covered += left_vol.max(0);
low = prices[left as usize];
left -= 1;
}
}
self.snapshot.value_area_low = low;
self.snapshot.value_area_high = high;
}
}
#[cfg(test)]
mod tests {
use super::*;
fn symbol() -> SymbolId {
SymbolId {
venue: "CME".to_string(),
symbol: "ESM6".to_string(),
}
}
#[test]
fn tracks_delta_and_cumulative_delta() {
let mut acc = AnalyticsAccumulator::default();
acc.on_trade(&TradePrint {
symbol: symbol(),
price: 100,
size: 5,
aggressor_side: Side::Ask,
sequence: 1,
ts_exchange_ns: 0,
ts_recv_ns: 0,
});
acc.on_trade(&TradePrint {
symbol: symbol(),
price: 99,
size: 2,
aggressor_side: Side::Bid,
sequence: 2,
ts_exchange_ns: 0,
ts_recv_ns: 0,
});
let snap = acc.snapshot();
assert_eq!(snap.delta, 3);
assert_eq!(snap.cumulative_delta, 3);
assert_eq!(snap.buy_volume, 5);
assert_eq!(snap.sell_volume, 2);
assert_eq!(snap.last_price, 99);
assert_eq!(snap.point_of_control, 100);
assert_eq!(snap.value_area_low, 100);
assert_eq!(snap.value_area_high, 100);
acc.reset_session_delta();
let reset = acc.snapshot();
assert_eq!(reset.delta, 0);
assert_eq!(reset.buy_volume, 0);
assert_eq!(reset.sell_volume, 0);
assert_eq!(reset.cumulative_delta, 3);
}
#[test]
fn tracks_poc_and_value_area() {
let mut acc = AnalyticsAccumulator::default();
let s = symbol();
let prints = [
(100, 5, Side::Ask),
(101, 7, Side::Ask),
(99, 3, Side::Bid),
(102, 2, Side::Ask),
(101, 5, Side::Bid),
];
for (i, (price, size, side)) in prints.iter().enumerate() {
acc.on_trade(&TradePrint {
symbol: s.clone(),
price: *price,
size: *size,
aggressor_side: *side,
sequence: i as u64 + 1,
ts_exchange_ns: 0,
ts_recv_ns: 0,
});
}
let snap = acc.snapshot();
assert_eq!(snap.point_of_control, 101);
assert!(snap.value_area_low <= snap.point_of_control);
assert!(snap.value_area_high >= snap.point_of_control);
}
#[test]
fn full_session_reset_clears_profile_and_cumulative() {
let mut acc = AnalyticsAccumulator::default();
acc.on_trade(&TradePrint {
symbol: symbol(),
price: 101,
size: 4,
aggressor_side: Side::Ask,
sequence: 1,
ts_exchange_ns: 0,
ts_recv_ns: 0,
});
acc.reset_session();
let snap = acc.snapshot();
assert_eq!(snap.delta, 0);
assert_eq!(snap.cumulative_delta, 0);
assert_eq!(snap.buy_volume, 0);
assert_eq!(snap.sell_volume, 0);
assert_eq!(snap.point_of_control, 0);
assert_eq!(snap.value_area_low, 0);
assert_eq!(snap.value_area_high, 0);
}
}