use crate::Decimal;
use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
use std::sync::{Arc, RwLock};
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};
/// Event counter stored in a single [`AtomicU64`].
///
/// Every operation takes `&self` and uses `Ordering::Relaxed`, so a counter can
/// be bumped from several threads without additional locking. Additions wrap
/// around on overflow, as `AtomicU64::fetch_add` does.
#[derive(Debug, Default)]
pub struct Counter(AtomicU64);

impl Counter {
    /// Creates a counter that starts at zero.
    #[must_use]
    pub fn new() -> Self {
        Self::with_value(0)
    }

    /// Creates a counter that starts at `value`.
    #[must_use]
    pub fn with_value(value: u64) -> Self {
        let cell = AtomicU64::new(value);
        Self(cell)
    }

    /// Adds one to the counter.
    pub fn increment(&self) {
        self.add(1);
    }

    /// Adds `n` to the counter.
    pub fn add(&self, n: u64) {
        let Self(cell) = self;
        // The previous value returned by `fetch_add` is intentionally unused.
        let _previous = cell.fetch_add(n, Ordering::Relaxed);
    }

    /// Returns the current count.
    #[must_use]
    pub fn get(&self) -> u64 {
        let Self(cell) = self;
        cell.load(Ordering::Relaxed)
    }

    /// Sets the count back to zero.
    pub fn reset(&self) {
        let Self(cell) = self;
        cell.store(0, Ordering::Relaxed);
    }
}
/// Signed gauge stored in a single [`AtomicI64`].
///
/// Unlike [`Counter`], the value can move in both directions and can be set
/// directly. Every operation takes `&self` and uses `Ordering::Relaxed`;
/// additions and subtractions wrap around on overflow, as the underlying
/// atomic operations do.
#[derive(Debug, Default)]
pub struct Gauge(AtomicI64);

impl Gauge {
    /// Creates a gauge that starts at zero.
    #[must_use]
    pub fn new() -> Self {
        Self::with_value(0)
    }

    /// Creates a gauge that starts at `value`.
    #[must_use]
    pub fn with_value(value: i64) -> Self {
        let cell = AtomicI64::new(value);
        Self(cell)
    }

    /// Overwrites the gauge with `value`.
    pub fn set(&self, value: i64) {
        let Self(cell) = self;
        cell.store(value, Ordering::Relaxed);
    }

    /// Returns the current value.
    #[must_use]
    pub fn get(&self) -> i64 {
        let Self(cell) = self;
        cell.load(Ordering::Relaxed)
    }

    /// Raises the gauge by one.
    pub fn increment(&self) {
        self.add(1);
    }

    /// Lowers the gauge by one.
    pub fn decrement(&self) {
        self.sub(1);
    }

    /// Raises the gauge by `n` (lowers it when `n` is negative).
    pub fn add(&self, n: i64) {
        let Self(cell) = self;
        let _previous = cell.fetch_add(n, Ordering::Relaxed);
    }

    /// Lowers the gauge by `n` (raises it when `n` is negative).
    pub fn sub(&self, n: i64) {
        let Self(cell) = self;
        // Kept as `fetch_sub` rather than `add(-n)` so `i64::MIN` is handled
        // by the atomic's wrapping arithmetic instead of a negation.
        let _previous = cell.fetch_sub(n, Ordering::Relaxed);
    }
}
/// Point-in-time copy of the values tracked by `LiveMetrics`.
///
/// `LiveMetrics::snapshot` fills every field; the derived rates are computed
/// there and stored here as plain values.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct MetricsSnapshot {
    /// Time the snapshot was taken, as supplied by the caller of `snapshot`.
    pub timestamp: u64,
    /// Total quotes recorded.
    pub quotes_generated: u64,
    /// Total orders recorded as submitted.
    pub orders_submitted: u64,
    /// Total orders recorded as filled.
    pub orders_filled: u64,
    /// Total orders recorded as cancelled.
    pub orders_cancelled: u64,
    /// Total orders recorded as rejected.
    pub orders_rejected: u64,
    /// Total partial fills recorded.
    pub partial_fills: u64,
    /// Value of the open-orders gauge.
    pub open_orders: i64,
    /// Last position value stored.
    pub current_position: Decimal,
    /// `realized_pnl + unrealized_pnl` at snapshot time.
    pub current_pnl: Decimal,
    /// Last unrealized PnL value stored.
    pub unrealized_pnl: Decimal,
    /// Last realized PnL value stored.
    pub realized_pnl: Decimal,
    /// `orders_filled / quotes_generated`, or zero when no quotes were recorded.
    pub fill_rate: Decimal,
    /// `orders_cancelled / orders_submitted`, or zero when nothing was submitted.
    pub cancel_rate: Decimal,
    /// `quotes_generated` divided by uptime in seconds, or zero at zero uptime.
    pub quotes_per_second: Decimal,
    /// `orders_filled` divided by uptime in seconds, or zero at zero uptime.
    pub fills_per_second: Decimal,
    /// Snapshot time minus start time (saturating at zero).
    pub uptime_ms: u64,
    /// Timestamp passed with the most recently recorded quote (0 if none).
    pub last_quote_time: u64,
    /// Timestamp passed with the most recently recorded fill (0 if none).
    pub last_fill_time: u64,
}

impl Default for MetricsSnapshot {
    /// Returns a snapshot with every count, time and decimal set to zero.
    fn default() -> Self {
        let zero = Decimal::ZERO;
        Self {
            // Identification and timing.
            timestamp: 0,
            uptime_ms: 0,
            last_quote_time: 0,
            last_fill_time: 0,
            // Event totals.
            quotes_generated: 0,
            orders_submitted: 0,
            orders_filled: 0,
            orders_cancelled: 0,
            orders_rejected: 0,
            partial_fills: 0,
            open_orders: 0,
            // Position and PnL.
            current_position: zero,
            current_pnl: zero,
            unrealized_pnl: zero,
            realized_pnl: zero,
            // Derived rates.
            fill_rate: zero,
            cancel_rate: zero,
            quotes_per_second: zero,
            fills_per_second: zero,
        }
    }
}

impl MetricsSnapshot {
    /// Returns `true` when at least one quote or one submitted order was recorded.
    #[must_use]
    pub fn has_activity(&self) -> bool {
        let idle = self.quotes_generated == 0 && self.orders_submitted == 0;
        !idle
    }

    /// Returns `orders_rejected / orders_submitted`, or zero when no orders
    /// were submitted.
    #[must_use]
    pub fn rejection_rate(&self) -> Decimal {
        match self.orders_submitted {
            0 => Decimal::ZERO,
            submitted => Decimal::from(self.orders_rejected) / Decimal::from(submitted),
        }
    }

    /// Returns `partial_fills / orders_filled`, or zero when no orders were
    /// filled.
    #[must_use]
    pub fn partial_fill_rate(&self) -> Decimal {
        match self.orders_filled {
            0 => Decimal::ZERO,
            filled => Decimal::from(self.partial_fills) / Decimal::from(filled),
        }
    }
}
/// Live, concurrently updatable trading metrics.
///
/// Event totals, the open-order gauge and the timestamps are atomics accessed
/// with `Ordering::Relaxed`; position and PnL values live behind `RwLock`s.
/// Every method takes `&self`, so one instance can be shared between threads
/// (for example through [`SharedLiveMetrics`]).
///
/// # Lock poisoning
///
/// The guarded values are plain `Copy` decimals, so a panic in another thread
/// cannot leave them half-updated. Poisoned locks are therefore recovered
/// rather than ignored: updates keep being applied and reads keep returning
/// the last stored value instead of silently degrading to zero.
#[derive(Debug)]
pub struct LiveMetrics {
    /// Reference time that uptime is measured from.
    start_time: AtomicU64,
    quotes_generated: Counter,
    orders_submitted: Counter,
    orders_filled: Counter,
    orders_cancelled: Counter,
    orders_rejected: Counter,
    partial_fills: Counter,
    open_orders: Gauge,
    position: Arc<RwLock<Decimal>>,
    realized_pnl: Arc<RwLock<Decimal>>,
    unrealized_pnl: Arc<RwLock<Decimal>>,
    /// Timestamp passed with the most recent quote record (0 if none).
    last_quote_time: AtomicU64,
    /// Timestamp passed with the most recent fill record (0 if none).
    last_fill_time: AtomicU64,
}

impl LiveMetrics {
    /// Creates an empty metrics set whose uptime is measured from `start_time`.
    #[must_use]
    pub fn new(start_time: u64) -> Self {
        Self {
            start_time: AtomicU64::new(start_time),
            quotes_generated: Counter::new(),
            orders_submitted: Counter::new(),
            orders_filled: Counter::new(),
            orders_cancelled: Counter::new(),
            orders_rejected: Counter::new(),
            partial_fills: Counter::new(),
            open_orders: Gauge::new(),
            position: Arc::new(RwLock::new(Decimal::ZERO)),
            realized_pnl: Arc::new(RwLock::new(Decimal::ZERO)),
            unrealized_pnl: Arc::new(RwLock::new(Decimal::ZERO)),
            last_quote_time: AtomicU64::new(0),
            last_fill_time: AtomicU64::new(0),
        }
    }

    /// Copies the value out of `cell`, recovering the guard if the lock is poisoned.
    fn read_cell<T: Copy>(cell: &RwLock<T>) -> T {
        *cell
            .read()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }

    /// Acquires a write guard on `cell`, recovering the guard if the lock is poisoned.
    fn write_cell<T>(cell: &RwLock<T>) -> std::sync::RwLockWriteGuard<'_, T> {
        cell.write()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }

    /// Returns the start time that uptime is measured from.
    #[must_use]
    pub fn start_time(&self) -> u64 {
        self.start_time.load(Ordering::Relaxed)
    }

    /// Records one generated quote and stores `timestamp` as the last quote time.
    pub fn record_quote(&self, timestamp: u64) {
        self.quotes_generated.increment();
        self.last_quote_time.store(timestamp, Ordering::Relaxed);
    }

    /// Records one submitted order.
    pub fn record_order_submitted(&self) {
        self.orders_submitted.increment();
    }

    /// Records one filled order and stores `timestamp` as the last fill time.
    pub fn record_order_filled(&self, timestamp: u64) {
        self.orders_filled.increment();
        self.last_fill_time.store(timestamp, Ordering::Relaxed);
    }

    /// Records one cancelled order.
    pub fn record_order_cancelled(&self) {
        self.orders_cancelled.increment();
    }

    /// Records one rejected order.
    pub fn record_order_rejected(&self) {
        self.orders_rejected.increment();
    }

    /// Records one partial fill.
    pub fn record_partial_fill(&self) {
        self.partial_fills.increment();
    }

    /// Records `count` generated quotes at once and stores `timestamp` as the
    /// last quote time.
    pub fn record_quotes(&self, count: u64, timestamp: u64) {
        self.quotes_generated.add(count);
        self.last_quote_time.store(timestamp, Ordering::Relaxed);
    }

    /// Overwrites the open-order gauge with `count`.
    pub fn set_open_orders(&self, count: i64) {
        self.open_orders.set(count);
    }

    /// Raises the open-order gauge by one.
    pub fn increment_open_orders(&self) {
        self.open_orders.increment();
    }

    /// Lowers the open-order gauge by one.
    pub fn decrement_open_orders(&self) {
        self.open_orders.decrement();
    }

    /// Returns the current value of the open-order gauge.
    #[must_use]
    pub fn get_open_orders(&self) -> i64 {
        self.open_orders.get()
    }

    /// Replaces the stored position with `position`.
    pub fn update_position(&self, position: Decimal) {
        *Self::write_cell(&self.position) = position;
    }

    /// Replaces both the realized and the unrealized PnL.
    ///
    /// The two values are written under separate locks, one after the other.
    pub fn update_pnl(&self, realized: Decimal, unrealized: Decimal) {
        self.update_realized_pnl(realized);
        self.update_unrealized_pnl(unrealized);
    }

    /// Replaces the stored realized PnL with `realized`.
    pub fn update_realized_pnl(&self, realized: Decimal) {
        *Self::write_cell(&self.realized_pnl) = realized;
    }

    /// Replaces the stored unrealized PnL with `unrealized`.
    pub fn update_unrealized_pnl(&self, unrealized: Decimal) {
        *Self::write_cell(&self.unrealized_pnl) = unrealized;
    }

    /// Adds `amount` to the stored realized PnL.
    pub fn add_realized_pnl(&self, amount: Decimal) {
        // NOTE(review): if `Decimal`'s `+=` can panic (e.g. on overflow), the
        // panic poisons this lock; later calls recover the guard, so the
        // stored value stays readable and writable. Confirm `+=` semantics.
        *Self::write_cell(&self.realized_pnl) += amount;
    }

    /// Returns the stored position.
    #[must_use]
    pub fn get_position(&self) -> Decimal {
        Self::read_cell(&self.position)
    }

    /// Returns the stored realized PnL.
    #[must_use]
    pub fn get_realized_pnl(&self) -> Decimal {
        Self::read_cell(&self.realized_pnl)
    }

    /// Returns the stored unrealized PnL.
    #[must_use]
    pub fn get_unrealized_pnl(&self) -> Decimal {
        Self::read_cell(&self.unrealized_pnl)
    }

    /// Builds a [`MetricsSnapshot`] stamped with `current_time`.
    ///
    /// Derived values:
    /// - `uptime_ms` is `current_time - start_time`, saturating at zero;
    /// - `fill_rate` is `orders_filled / quotes_generated`;
    /// - `cancel_rate` is `orders_cancelled / orders_submitted`;
    /// - the per-second rates divide by `uptime_ms / 1000`;
    /// - `current_pnl` is realized plus unrealized PnL.
    ///
    /// Any rate whose denominator is zero is reported as zero. Fields are read
    /// one at a time, so a snapshot taken while other threads are updating may
    /// combine values from slightly different moments.
    #[must_use]
    pub fn snapshot(&self, current_time: u64) -> MetricsSnapshot {
        let start = self.start_time.load(Ordering::Relaxed);
        let uptime_ms = current_time.saturating_sub(start);

        let quotes_generated = self.quotes_generated.get();
        let orders_submitted = self.orders_submitted.get();
        let orders_filled = self.orders_filled.get();
        let orders_cancelled = self.orders_cancelled.get();

        let fill_rate = if quotes_generated > 0 {
            Decimal::from(orders_filled) / Decimal::from(quotes_generated)
        } else {
            Decimal::ZERO
        };
        let cancel_rate = if orders_submitted > 0 {
            Decimal::from(orders_cancelled) / Decimal::from(orders_submitted)
        } else {
            Decimal::ZERO
        };
        let (quotes_per_second, fills_per_second) = if uptime_ms > 0 {
            let seconds = Decimal::from(uptime_ms) / Decimal::from(1000u64);
            (
                Decimal::from(quotes_generated) / seconds,
                Decimal::from(orders_filled) / seconds,
            )
        } else {
            (Decimal::ZERO, Decimal::ZERO)
        };

        let position = self.get_position();
        let realized = self.get_realized_pnl();
        let unrealized = self.get_unrealized_pnl();

        MetricsSnapshot {
            timestamp: current_time,
            quotes_generated,
            orders_submitted,
            orders_filled,
            orders_cancelled,
            orders_rejected: self.orders_rejected.get(),
            partial_fills: self.partial_fills.get(),
            open_orders: self.open_orders.get(),
            current_position: position,
            current_pnl: realized + unrealized,
            unrealized_pnl: unrealized,
            realized_pnl: realized,
            fill_rate,
            cancel_rate,
            quotes_per_second,
            fills_per_second,
            uptime_ms,
            last_quote_time: self.last_quote_time.load(Ordering::Relaxed),
            last_fill_time: self.last_fill_time.load(Ordering::Relaxed),
        }
    }

    /// Clears every metric and restarts uptime measurement at `new_start_time`.
    ///
    /// Fields are cleared one after another, not as a single atomic step.
    pub fn reset(&self, new_start_time: u64) {
        self.start_time.store(new_start_time, Ordering::Relaxed);
        self.quotes_generated.reset();
        self.orders_submitted.reset();
        self.orders_filled.reset();
        self.orders_cancelled.reset();
        self.orders_rejected.reset();
        self.partial_fills.reset();
        self.open_orders.set(0);
        *Self::write_cell(&self.position) = Decimal::ZERO;
        *Self::write_cell(&self.realized_pnl) = Decimal::ZERO;
        *Self::write_cell(&self.unrealized_pnl) = Decimal::ZERO;
        self.last_quote_time.store(0, Ordering::Relaxed);
        self.last_fill_time.store(0, Ordering::Relaxed);
    }

    /// Returns the total number of quotes recorded.
    #[must_use]
    pub fn total_quotes(&self) -> u64 {
        self.quotes_generated.get()
    }

    /// Returns the total number of orders recorded as submitted.
    #[must_use]
    pub fn total_orders_submitted(&self) -> u64 {
        self.orders_submitted.get()
    }

    /// Returns the total number of orders recorded as filled.
    #[must_use]
    pub fn total_orders_filled(&self) -> u64 {
        self.orders_filled.get()
    }

    /// Returns the total number of orders recorded as cancelled.
    #[must_use]
    pub fn total_orders_cancelled(&self) -> u64 {
        self.orders_cancelled.get()
    }

    /// Returns the total number of orders recorded as rejected.
    #[must_use]
    pub fn total_orders_rejected(&self) -> u64 {
        self.orders_rejected.get()
    }

    /// Returns the total number of partial fills recorded.
    #[must_use]
    pub fn total_partial_fills(&self) -> u64 {
        self.partial_fills.get()
    }
}

impl Default for LiveMetrics {
    /// Equivalent to `LiveMetrics::new(0)`.
    fn default() -> Self {
        Self::new(0)
    }
}
/// Reference-counted handle to a [`LiveMetrics`] instance, for sharing one set
/// of metrics between owners (the tests below clone it into spawned threads).
pub type SharedLiveMetrics = Arc<LiveMetrics>;
// Unit tests for `Counter`, `Gauge`, `MetricsSnapshot` and `LiveMetrics`.
#[cfg(test)]
mod tests {
use super::*;
use crate::dec;
use std::thread;
// --- Counter ---
// A new counter starts at zero.
#[test]
fn test_counter_new() {
let counter = Counter::new();
assert_eq!(counter.get(), 0);
}
// `with_value` seeds the counter with the given value.
#[test]
fn test_counter_with_value() {
let counter = Counter::with_value(10);
assert_eq!(counter.get(), 10);
}
// Each `increment` adds exactly one.
#[test]
fn test_counter_increment() {
let counter = Counter::new();
counter.increment();
counter.increment();
assert_eq!(counter.get(), 2);
}
// `add` accumulates across calls.
#[test]
fn test_counter_add() {
let counter = Counter::new();
counter.add(5);
counter.add(3);
assert_eq!(counter.get(), 8);
}
// `reset` returns the counter to zero.
#[test]
fn test_counter_reset() {
let counter = Counter::new();
counter.add(10);
counter.reset();
assert_eq!(counter.get(), 0);
}
// --- Gauge ---
// A new gauge starts at zero.
#[test]
fn test_gauge_new() {
let gauge = Gauge::new();
assert_eq!(gauge.get(), 0);
}
// `with_value` seeds the gauge with the given value.
#[test]
fn test_gauge_with_value() {
let gauge = Gauge::with_value(10);
assert_eq!(gauge.get(), 10);
}
// `set` overwrites the current value.
#[test]
fn test_gauge_set() {
let gauge = Gauge::new();
gauge.set(42);
assert_eq!(gauge.get(), 42);
}
// `increment` and `decrement` move the gauge by one in each direction.
#[test]
fn test_gauge_increment_decrement() {
let gauge = Gauge::new();
gauge.increment();
gauge.increment();
assert_eq!(gauge.get(), 2);
gauge.decrement();
assert_eq!(gauge.get(), 1);
}
// `add` and `sub` move the gauge by arbitrary amounts.
#[test]
fn test_gauge_add_sub() {
let gauge = Gauge::new();
gauge.add(10);
assert_eq!(gauge.get(), 10);
gauge.sub(3);
assert_eq!(gauge.get(), 7);
}
// The gauge is signed: decrementing from zero yields -1.
#[test]
fn test_gauge_negative() {
let gauge = Gauge::new();
gauge.decrement();
assert_eq!(gauge.get(), -1);
}
// --- MetricsSnapshot ---
// The default snapshot is all zeros and reports no activity.
#[test]
fn test_metrics_snapshot_default() {
let snapshot = MetricsSnapshot::default();
assert_eq!(snapshot.timestamp, 0);
assert_eq!(snapshot.quotes_generated, 0);
assert!(!snapshot.has_activity());
}
// A single recorded quote is enough to count as activity.
#[test]
fn test_metrics_snapshot_has_activity() {
let snapshot = MetricsSnapshot {
quotes_generated: 1,
..Default::default()
};
assert!(snapshot.has_activity());
}
// 5 rejections out of 100 submissions give a rejection rate of 0.05.
#[test]
fn test_metrics_snapshot_rejection_rate() {
let snapshot = MetricsSnapshot {
orders_submitted: 100,
orders_rejected: 5,
..Default::default()
};
assert_eq!(snapshot.rejection_rate(), dec!(0.05));
}
// 20 partial fills out of 100 filled orders give a partial-fill rate of 0.2.
#[test]
fn test_metrics_snapshot_partial_fill_rate() {
let snapshot = MetricsSnapshot {
orders_filled: 100,
partial_fills: 20,
..Default::default()
};
assert_eq!(snapshot.partial_fill_rate(), dec!(0.2));
}
// --- LiveMetrics ---
// `new` stores the start time and starts with zero quotes.
#[test]
fn test_live_metrics_new() {
let metrics = LiveMetrics::new(1000);
assert_eq!(metrics.start_time(), 1000);
assert_eq!(metrics.total_quotes(), 0);
}
// Each `record_quote` call adds one to the quote total.
#[test]
fn test_live_metrics_record_quote() {
let metrics = LiveMetrics::new(1000);
metrics.record_quote(1001);
metrics.record_quote(1002);
assert_eq!(metrics.total_quotes(), 2);
}
// Submitted, filled, cancelled and rejected orders are tallied independently.
#[test]
fn test_live_metrics_record_orders() {
let metrics = LiveMetrics::new(1000);
metrics.record_order_submitted();
metrics.record_order_submitted();
metrics.record_order_filled(1001);
metrics.record_order_cancelled();
metrics.record_order_rejected();
assert_eq!(metrics.total_orders_submitted(), 2);
assert_eq!(metrics.total_orders_filled(), 1);
assert_eq!(metrics.total_orders_cancelled(), 1);
assert_eq!(metrics.total_orders_rejected(), 1);
}
// The open-order gauge supports increment, decrement and direct set.
#[test]
fn test_live_metrics_open_orders() {
let metrics = LiveMetrics::new(1000);
metrics.increment_open_orders();
metrics.increment_open_orders();
assert_eq!(metrics.get_open_orders(), 2);
metrics.decrement_open_orders();
assert_eq!(metrics.get_open_orders(), 1);
metrics.set_open_orders(10);
assert_eq!(metrics.get_open_orders(), 10);
}
// `update_position` stores the value returned by `get_position`.
#[test]
fn test_live_metrics_position() {
let metrics = LiveMetrics::new(1000);
metrics.update_position(dec!(100.5));
assert_eq!(metrics.get_position(), dec!(100.5));
}
// `update_pnl` stores realized and unrealized PnL separately.
#[test]
fn test_live_metrics_pnl() {
let metrics = LiveMetrics::new(1000);
metrics.update_pnl(dec!(100.0), dec!(50.0));
assert_eq!(metrics.get_realized_pnl(), dec!(100.0));
assert_eq!(metrics.get_unrealized_pnl(), dec!(50.0));
}
// `add_realized_pnl` accumulates: 50 + 30 = 80.
#[test]
fn test_live_metrics_add_realized_pnl() {
let metrics = LiveMetrics::new(1000);
metrics.add_realized_pnl(dec!(50.0));
metrics.add_realized_pnl(dec!(30.0));
assert_eq!(metrics.get_realized_pnl(), dec!(80.0));
}
// A snapshot copies counts, position, PnL and last-event times, sums the PnL
// parts into `current_pnl`, and derives uptime as 2000 - 1000.
#[test]
fn test_live_metrics_snapshot() {
let metrics = LiveMetrics::new(1000);
metrics.record_quote(1001);
metrics.record_quote(1002);
metrics.record_order_submitted();
metrics.record_order_filled(1003);
metrics.update_position(dec!(10.0));
metrics.update_pnl(dec!(100.0), dec!(50.0));
let snapshot = metrics.snapshot(2000);
assert_eq!(snapshot.timestamp, 2000);
assert_eq!(snapshot.quotes_generated, 2);
assert_eq!(snapshot.orders_submitted, 1);
assert_eq!(snapshot.orders_filled, 1);
assert_eq!(snapshot.current_position, dec!(10.0));
assert_eq!(snapshot.realized_pnl, dec!(100.0));
assert_eq!(snapshot.unrealized_pnl, dec!(50.0));
assert_eq!(snapshot.current_pnl, dec!(150.0));
assert_eq!(snapshot.uptime_ms, 1000);
assert_eq!(snapshot.last_quote_time, 1002);
assert_eq!(snapshot.last_fill_time, 1003);
}
// 10 quotes and 5 fills over 10_000 ms of uptime: fill rate 0.5 (fills per
// quote), 1 quote per second and 0.5 fills per second.
#[test]
fn test_live_metrics_snapshot_rates() {
let metrics = LiveMetrics::new(0);
metrics.record_quotes(10, 10000);
for _ in 0..5 {
metrics.record_order_filled(10000);
}
let snapshot = metrics.snapshot(10000);
assert_eq!(snapshot.fill_rate, dec!(0.5));
assert_eq!(snapshot.quotes_per_second, dec!(1.0));
assert_eq!(snapshot.fills_per_second, dec!(0.5));
}
// `reset` installs the new start time and clears counts and position.
#[test]
fn test_live_metrics_reset() {
let metrics = LiveMetrics::new(1000);
metrics.record_quote(1001);
metrics.record_order_submitted();
metrics.update_position(dec!(100.0));
metrics.reset(2000);
assert_eq!(metrics.start_time(), 2000);
assert_eq!(metrics.total_quotes(), 0);
assert_eq!(metrics.total_orders_submitted(), 0);
assert_eq!(metrics.get_position(), Decimal::ZERO);
}
// 10 threads x 100 iterations must yield exactly 1000 of each event, i.e. no
// increments are lost under concurrent use.
#[test]
fn test_live_metrics_thread_safety() {
let metrics = Arc::new(LiveMetrics::new(0));
let mut handles = vec![];
for _ in 0..10 {
let m = Arc::clone(&metrics);
handles.push(thread::spawn(move || {
for _ in 0..100 {
m.record_quote(1);
m.record_order_submitted();
}
}));
}
for handle in handles {
handle.join().unwrap();
}
assert_eq!(metrics.total_quotes(), 1000);
assert_eq!(metrics.total_orders_submitted(), 1000);
}
// Ten threads add 1..=10 to realized PnL concurrently; the total must be 55.
#[test]
fn test_live_metrics_concurrent_pnl_updates() {
let metrics = Arc::new(LiveMetrics::new(0));
let mut handles = vec![];
for i in 0..10 {
let m = Arc::clone(&metrics);
handles.push(thread::spawn(move || {
m.add_realized_pnl(Decimal::from(i + 1));
}));
}
for handle in handles {
handle.join().unwrap();
}
assert_eq!(metrics.get_realized_pnl(), dec!(55));
}
// The `SharedLiveMetrics` alias is usable exactly like `Arc<LiveMetrics>`.
#[test]
fn test_shared_live_metrics() {
let metrics: SharedLiveMetrics = Arc::new(LiveMetrics::new(0));
metrics.record_quote(1);
assert_eq!(metrics.total_quotes(), 1);
}
// With the `serde` feature enabled, a snapshot round-trips through JSON.
#[cfg(feature = "serde")]
#[test]
fn test_metrics_snapshot_serialization() {
let snapshot = MetricsSnapshot {
timestamp: 1000,
quotes_generated: 100,
orders_filled: 50,
current_pnl: dec!(1000.0),
..Default::default()
};
let json = serde_json::to_string(&snapshot).unwrap();
let deserialized: MetricsSnapshot = serde_json::from_str(&json).unwrap();
assert_eq!(deserialized.timestamp, 1000);
assert_eq!(deserialized.quotes_generated, 100);
assert_eq!(deserialized.orders_filled, 50);
}
}