use std::collections::VecDeque;
use crate::Decimal;
use crate::types::error::{MMError, MMResult};
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};
/// Side a trade is attributed to when order flow is aggregated.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum TradeSide {
    /// The trade counts towards buy volume.
    Buy,
    /// The trade counts towards sell volume.
    Sell,
}

impl TradeSide {
    /// Returns `true` when this side is [`TradeSide::Buy`].
    #[must_use]
    pub fn is_buy(&self) -> bool {
        *self == Self::Buy
    }

    /// Returns `true` when this side is [`TradeSide::Sell`].
    #[must_use]
    pub fn is_sell(&self) -> bool {
        *self == Self::Sell
    }
}
/// A single trade observation that can be fed into `OrderFlowAnalyzer`.
#[derive(Debug, Clone, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct Trade {
/// Price of the trade.
pub price: Decimal,
/// Size (quantity) of the trade.
pub size: Decimal,
/// Side the trade is attributed to.
pub side: TradeSide,
/// Time of the trade. `OrderFlowAnalyzer` compares this value against a window expressed
/// in milliseconds, so it is presumably a millisecond timestamp on the same clock as the
/// `current_time` arguments (confirm with callers).
pub timestamp: u64,
}
impl Trade {
#[must_use]
pub fn new(price: Decimal, size: Decimal, side: TradeSide, timestamp: u64) -> Self {
Self {
price,
size,
side,
timestamp,
}
}
#[must_use]
pub fn notional(&self) -> Decimal {
self.price * self.size
}
#[must_use]
pub fn is_buy(&self) -> bool {
self.side.is_buy()
}
#[must_use]
pub fn is_sell(&self) -> bool {
self.side.is_sell()
}
}
/// Aggregated buy/sell statistics for one time window, as produced by
/// `OrderFlowAnalyzer::get_stats`.
#[derive(Debug, Clone, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct OrderFlowStats {
/// Sum of the sizes of buy trades in the window.
pub buy_volume: Decimal,
/// Sum of the sizes of sell trades in the window.
pub sell_volume: Decimal,
/// Number of buy trades in the window.
pub buy_count: u64,
/// Number of sell trades in the window.
pub sell_count: u64,
/// `net_flow` divided by total volume; zero when the window holds no volume.
/// Positive values mean buy volume dominates, negative values mean sell volume dominates.
pub imbalance: Decimal,
/// Buy volume minus sell volume.
pub net_flow: Decimal,
/// Volume-weighted average price of buy trades; `None` when buy volume is not positive.
pub buy_vwap: Option<Decimal>,
/// Volume-weighted average price of sell trades; `None` when sell volume is not positive.
pub sell_vwap: Option<Decimal>,
/// Sum of `price * size` over all buy and sell trades in the window.
pub total_notional: Decimal,
/// Inclusive lower bound of the window (same unit as trade timestamps).
pub window_start: u64,
/// Inclusive upper bound of the window (same unit as trade timestamps).
pub window_end: u64,
}
impl OrderFlowStats {
    /// Returns the combined volume, `buy_volume + sell_volume`.
    #[must_use]
    pub fn total_volume(&self) -> Decimal {
        let Self {
            buy_volume,
            sell_volume,
            ..
        } = self;
        *buy_volume + *sell_volume
    }

    /// Returns the combined number of trades, `buy_count + sell_count`.
    #[must_use]
    pub fn total_count(&self) -> u64 {
        let Self {
            buy_count,
            sell_count,
            ..
        } = self;
        *buy_count + *sell_count
    }

    /// Returns `true` when the imbalance is strictly greater than zero.
    #[must_use]
    pub fn is_bullish(&self) -> bool {
        Decimal::ZERO < self.imbalance
    }

    /// Returns `true` when the imbalance is strictly less than zero.
    #[must_use]
    pub fn is_bearish(&self) -> bool {
        Decimal::ZERO > self.imbalance
    }

    /// Returns `buy_volume / sell_volume`, or `None` when sell volume is not positive
    /// (which also avoids a division by zero).
    #[must_use]
    pub fn volume_ratio(&self) -> Option<Decimal> {
        match self.sell_volume {
            sell if sell > Decimal::ZERO => Some(self.buy_volume / sell),
            _ => None,
        }
    }

    /// Returns `window_end - window_start`, clamped to zero if the bounds are inverted.
    #[must_use]
    pub fn window_duration_ms(&self) -> u64 {
        let Self {
            window_start,
            window_end,
            ..
        } = self;
        window_end.saturating_sub(*window_start)
    }
}
impl Default for OrderFlowStats {
fn default() -> Self {
Self {
buy_volume: Decimal::ZERO,
sell_volume: Decimal::ZERO,
buy_count: 0,
sell_count: 0,
imbalance: Decimal::ZERO,
net_flow: Decimal::ZERO,
buy_vwap: None,
sell_vwap: None,
total_notional: Decimal::ZERO,
window_start: 0,
window_end: 0,
}
}
}
/// Rolling-window order-flow analyzer: buffers recent trades and summarizes them into
/// `OrderFlowStats` on demand.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct OrderFlowAnalyzer {
/// Length of the look-back window in milliseconds.
window_ms: u64,
/// Buffered trades; new trades are pushed to the back and evicted from the front.
trades: VecDeque<Trade>,
/// Maximum number of trades kept in `trades`; the oldest entries are dropped once exceeded.
max_trades: usize,
}
impl OrderFlowAnalyzer {
pub const DEFAULT_MAX_TRADES: usize = 10_000;
#[must_use]
pub fn new(window_ms: u64) -> Self {
Self {
window_ms,
trades: VecDeque::new(),
max_trades: Self::DEFAULT_MAX_TRADES,
}
}
#[must_use]
pub fn with_max_trades(window_ms: u64, max_trades: usize) -> Self {
Self {
window_ms,
trades: VecDeque::new(),
max_trades,
}
}
#[must_use]
pub fn window_ms(&self) -> u64 {
self.window_ms
}
#[must_use]
pub fn trade_count(&self) -> usize {
self.trades.len()
}
pub fn add_trade(&mut self, trade: Trade) {
self.trades.push_back(trade);
while self.trades.len() > self.max_trades {
self.trades.pop_front();
}
}
pub fn add_trade_components(
&mut self,
price: Decimal,
size: Decimal,
side: TradeSide,
timestamp: u64,
) {
self.add_trade(Trade::new(price, size, side, timestamp));
}
#[must_use]
pub fn get_stats(&self, current_time: u64) -> OrderFlowStats {
let window_start = current_time.saturating_sub(self.window_ms);
let mut buy_volume = Decimal::ZERO;
let mut sell_volume = Decimal::ZERO;
let mut buy_count = 0u64;
let mut sell_count = 0u64;
let mut buy_notional = Decimal::ZERO;
let mut sell_notional = Decimal::ZERO;
for trade in &self.trades {
if trade.timestamp >= window_start && trade.timestamp <= current_time {
let notional = trade.notional();
match trade.side {
TradeSide::Buy => {
buy_volume += trade.size;
buy_notional += notional;
buy_count += 1;
}
TradeSide::Sell => {
sell_volume += trade.size;
sell_notional += notional;
sell_count += 1;
}
}
}
}
let total_volume = buy_volume + sell_volume;
let net_flow = buy_volume - sell_volume;
let imbalance = if total_volume > Decimal::ZERO {
net_flow / total_volume
} else {
Decimal::ZERO
};
let buy_vwap = if buy_volume > Decimal::ZERO {
Some(buy_notional / buy_volume)
} else {
None
};
let sell_vwap = if sell_volume > Decimal::ZERO {
Some(sell_notional / sell_volume)
} else {
None
};
OrderFlowStats {
buy_volume,
sell_volume,
buy_count,
sell_count,
imbalance,
net_flow,
buy_vwap,
sell_vwap,
total_notional: buy_notional + sell_notional,
window_start,
window_end: current_time,
}
}
#[must_use]
pub fn get_imbalance(&self, current_time: u64) -> Decimal {
self.get_stats(current_time).imbalance
}
#[must_use]
pub fn is_bullish(&self, threshold: Decimal, current_time: u64) -> bool {
self.get_imbalance(current_time) >= threshold
}
#[must_use]
pub fn is_bearish(&self, threshold: Decimal, current_time: u64) -> bool {
self.get_imbalance(current_time) <= threshold
}
pub fn cleanup(&mut self, current_time: u64) {
let window_start = current_time.saturating_sub(self.window_ms);
while let Some(trade) = self.trades.front() {
if trade.timestamp < window_start {
self.trades.pop_front();
} else {
break;
}
}
}
#[must_use]
pub fn trade_intensity(&self, current_time: u64) -> Decimal {
let stats = self.get_stats(current_time);
let window_seconds = Decimal::from(self.window_ms) / Decimal::from(1000);
if window_seconds > Decimal::ZERO {
Decimal::from(stats.total_count()) / window_seconds
} else {
Decimal::ZERO
}
}
#[must_use]
pub fn volume_intensity(&self, current_time: u64) -> Decimal {
let stats = self.get_stats(current_time);
let window_seconds = Decimal::from(self.window_ms) / Decimal::from(1000);
if window_seconds > Decimal::ZERO {
stats.total_volume() / window_seconds
} else {
Decimal::ZERO
}
}
pub fn clear(&mut self) {
self.trades.clear();
}
#[must_use]
pub fn last_trade(&self) -> Option<&Trade> {
self.trades.back()
}
#[must_use]
pub fn first_trade(&self) -> Option<&Trade> {
self.trades.front()
}
}
/// Builder for `OrderFlowAnalyzer` that validates its settings in `build`.
#[derive(Debug, Clone)]
pub struct OrderFlowAnalyzerBuilder {
/// Window length in milliseconds to configure on the analyzer.
window_ms: u64,
/// Maximum number of buffered trades to configure on the analyzer.
max_trades: usize,
}
impl OrderFlowAnalyzerBuilder {
#[must_use]
pub fn new() -> Self {
Self {
window_ms: 5000,
max_trades: OrderFlowAnalyzer::DEFAULT_MAX_TRADES,
}
}
#[must_use]
pub fn window_ms(mut self, window_ms: u64) -> Self {
self.window_ms = window_ms;
self
}
#[must_use]
pub fn max_trades(mut self, max_trades: usize) -> Self {
self.max_trades = max_trades;
self
}
pub fn build(self) -> MMResult<OrderFlowAnalyzer> {
if self.window_ms == 0 {
return Err(MMError::InvalidConfiguration(
"window_ms must be positive".to_string(),
));
}
if self.max_trades == 0 {
return Err(MMError::InvalidConfiguration(
"max_trades must be positive".to_string(),
));
}
Ok(OrderFlowAnalyzer {
window_ms: self.window_ms,
trades: VecDeque::new(),
max_trades: self.max_trades,
})
}
}
impl Default for OrderFlowAnalyzerBuilder {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::dec;

    /// Builds a 5000 ms analyzer and feeds it `(price, size, side, timestamp)` tuples in order.
    fn analyzer_with(trades: &[(Decimal, Decimal, TradeSide, u64)]) -> OrderFlowAnalyzer {
        let mut analyzer = OrderFlowAnalyzer::new(5000);
        for &(price, size, side, timestamp) in trades {
            analyzer.add_trade(Trade::new(price, size, side, timestamp));
        }
        analyzer
    }

    #[test]
    fn test_trade_creation() {
        let trade = Trade::new(dec!(100.0), dec!(10.0), TradeSide::Buy, 1000);

        // Fields are stored verbatim.
        assert_eq!(trade.price, dec!(100.0));
        assert_eq!(trade.size, dec!(10.0));
        assert_eq!(trade.side, TradeSide::Buy);
        assert_eq!(trade.timestamp, 1000);

        // Derived accessors.
        assert_eq!(trade.notional(), dec!(1000.0));
        assert!(trade.is_buy());
        assert!(!trade.is_sell());
    }

    #[test]
    fn test_trade_side() {
        let (buy, sell) = (TradeSide::Buy, TradeSide::Sell);

        assert!(buy.is_buy());
        assert!(!buy.is_sell());
        assert!(!sell.is_buy());
        assert!(sell.is_sell());
    }

    #[test]
    fn test_empty_analyzer() {
        let stats = analyzer_with(&[]).get_stats(10_000);

        assert_eq!(stats.buy_volume, Decimal::ZERO);
        assert_eq!(stats.sell_volume, Decimal::ZERO);
        assert_eq!(stats.buy_count, 0);
        assert_eq!(stats.sell_count, 0);
        assert_eq!(stats.imbalance, Decimal::ZERO);
        assert_eq!(stats.net_flow, Decimal::ZERO);
        assert!(stats.buy_vwap.is_none());
        assert!(stats.sell_vwap.is_none());
    }

    #[test]
    fn test_single_buy_trade() {
        let analyzer = analyzer_with(&[(dec!(100.0), dec!(10.0), TradeSide::Buy, 1000)]);
        let stats = analyzer.get_stats(3000);

        assert_eq!(stats.buy_volume, dec!(10.0));
        assert_eq!(stats.sell_volume, Decimal::ZERO);
        assert_eq!(stats.buy_count, 1);
        assert_eq!(stats.sell_count, 0);
        assert_eq!(stats.imbalance, Decimal::ONE);
        assert_eq!(stats.net_flow, dec!(10.0));
        assert_eq!(stats.buy_vwap, Some(dec!(100.0)));
        assert!(stats.sell_vwap.is_none());
    }

    #[test]
    fn test_single_sell_trade() {
        let analyzer = analyzer_with(&[(dec!(100.0), dec!(10.0), TradeSide::Sell, 1000)]);
        let stats = analyzer.get_stats(3000);

        assert_eq!(stats.buy_volume, Decimal::ZERO);
        assert_eq!(stats.sell_volume, dec!(10.0));
        assert_eq!(stats.imbalance, -Decimal::ONE);
        assert_eq!(stats.net_flow, dec!(-10.0));
        assert!(stats.buy_vwap.is_none());
        assert_eq!(stats.sell_vwap, Some(dec!(100.0)));
    }

    #[test]
    fn test_balanced_flow() {
        let analyzer = analyzer_with(&[
            (dec!(100.0), dec!(10.0), TradeSide::Buy, 1000),
            (dec!(100.0), dec!(10.0), TradeSide::Sell, 2000),
        ]);
        let stats = analyzer.get_stats(3000);

        assert_eq!(stats.buy_volume, dec!(10.0));
        assert_eq!(stats.sell_volume, dec!(10.0));
        assert_eq!(stats.imbalance, Decimal::ZERO);
        assert_eq!(stats.net_flow, Decimal::ZERO);
    }

    #[test]
    fn test_imbalanced_flow_bullish() {
        let analyzer = analyzer_with(&[
            (dec!(100.0), dec!(15.0), TradeSide::Buy, 1000),
            (dec!(100.0), dec!(5.0), TradeSide::Sell, 2000),
        ]);
        let stats = analyzer.get_stats(3000);

        assert_eq!(stats.imbalance, dec!(0.5));
        assert_eq!(stats.net_flow, dec!(10.0));
        assert!(stats.is_bullish());
        assert!(!stats.is_bearish());
    }

    #[test]
    fn test_imbalanced_flow_bearish() {
        let analyzer = analyzer_with(&[
            (dec!(100.0), dec!(5.0), TradeSide::Buy, 1000),
            (dec!(100.0), dec!(15.0), TradeSide::Sell, 2000),
        ]);
        let stats = analyzer.get_stats(3000);

        assert_eq!(stats.imbalance, dec!(-0.5));
        assert_eq!(stats.net_flow, dec!(-10.0));
        assert!(!stats.is_bullish());
        assert!(stats.is_bearish());
    }

    #[test]
    fn test_window_expiration() {
        // At t = 10_000 the window starts at 5000, so only the second trade is counted.
        let analyzer = analyzer_with(&[
            (dec!(100.0), dec!(10.0), TradeSide::Buy, 1000),
            (dec!(100.0), dec!(5.0), TradeSide::Sell, 8000),
        ]);
        let stats = analyzer.get_stats(10_000);

        assert_eq!(stats.buy_volume, Decimal::ZERO);
        assert_eq!(stats.sell_volume, dec!(5.0));
        assert_eq!(stats.buy_count, 0);
        assert_eq!(stats.sell_count, 1);
    }

    #[test]
    fn test_vwap_calculation() {
        let analyzer = analyzer_with(&[
            (dec!(100.0), dec!(10.0), TradeSide::Buy, 1000),
            (dec!(102.0), dec!(20.0), TradeSide::Buy, 2000),
            (dec!(99.0), dec!(5.0), TradeSide::Sell, 3000),
            (dec!(101.0), dec!(15.0), TradeSide::Sell, 4000),
        ]);
        let stats = analyzer.get_stats(5000);

        // Buy side: (100 * 10 + 102 * 20) / 30 = 3040 / 30.
        let expected_buy_vwap = dec!(3040) / dec!(30);
        assert_eq!(stats.buy_vwap, Some(expected_buy_vwap));
        assert_eq!(stats.sell_vwap, Some(dec!(100.5)));
    }

    #[test]
    fn test_cleanup() {
        let mut analyzer = analyzer_with(&[
            (dec!(100.0), dec!(10.0), TradeSide::Buy, 1000),
            (dec!(100.0), dec!(10.0), TradeSide::Buy, 2000),
            (dec!(100.0), dec!(10.0), TradeSide::Buy, 8000),
        ]);
        assert_eq!(analyzer.trade_count(), 3);

        analyzer.cleanup(10_000);
        assert_eq!(analyzer.trade_count(), 1);
    }

    #[test]
    fn test_is_bullish_bearish() {
        let analyzer = analyzer_with(&[
            (dec!(100.0), dec!(20.0), TradeSide::Buy, 1000),
            (dec!(100.0), dec!(5.0), TradeSide::Sell, 2000),
        ]);

        assert!(analyzer.is_bullish(dec!(0.5), 3000));
        assert!(!analyzer.is_bullish(dec!(0.7), 3000));
        assert!(!analyzer.is_bearish(dec!(-0.5), 3000));
    }

    #[test]
    fn test_trade_intensity() {
        let analyzer = analyzer_with(&[
            (dec!(100.0), dec!(10.0), TradeSide::Buy, 1000),
            (dec!(100.0), dec!(10.0), TradeSide::Buy, 2000),
            (dec!(100.0), dec!(10.0), TradeSide::Sell, 3000),
            (dec!(100.0), dec!(10.0), TradeSide::Sell, 4000),
        ]);

        let intensity = analyzer.trade_intensity(5000);
        assert_eq!(intensity, dec!(0.8));
    }

    #[test]
    fn test_volume_intensity() {
        let analyzer = analyzer_with(&[
            (dec!(100.0), dec!(10.0), TradeSide::Buy, 1000),
            (dec!(100.0), dec!(15.0), TradeSide::Sell, 2000),
        ]);

        let intensity = analyzer.volume_intensity(5000);
        assert_eq!(intensity, dec!(5));
    }

    #[test]
    fn test_add_trade_components() {
        let mut analyzer = OrderFlowAnalyzer::new(5000);
        analyzer.add_trade_components(dec!(100.0), dec!(10.0), TradeSide::Buy, 1000);

        let stats = analyzer.get_stats(2000);
        assert_eq!(stats.buy_volume, dec!(10.0));
    }

    #[test]
    fn test_max_trades_limit() {
        let mut analyzer = OrderFlowAnalyzer::with_max_trades(5000, 3);
        let fills = [
            (dec!(1.0), 1000),
            (dec!(2.0), 2000),
            (dec!(3.0), 3000),
            (dec!(4.0), 4000),
        ];
        for &(size, timestamp) in fills.iter() {
            analyzer.add_trade(Trade::new(dec!(100.0), size, TradeSide::Buy, timestamp));
        }

        // The oldest trade (size 1.0) was evicted, leaving 2.0 + 3.0 + 4.0.
        assert_eq!(analyzer.trade_count(), 3);
        let stats = analyzer.get_stats(5000);
        assert_eq!(stats.buy_volume, dec!(9.0));
    }

    #[test]
    fn test_builder_valid() {
        let built = OrderFlowAnalyzerBuilder::new()
            .window_ms(10_000)
            .max_trades(5000)
            .build();

        assert!(built.is_ok());
        assert_eq!(built.unwrap().window_ms(), 10_000);
    }

    #[test]
    fn test_builder_invalid_window() {
        let outcome = OrderFlowAnalyzerBuilder::new().window_ms(0).build();
        assert!(outcome.is_err());
    }

    #[test]
    fn test_builder_invalid_max_trades() {
        let outcome = OrderFlowAnalyzerBuilder::new().max_trades(0).build();
        assert!(outcome.is_err());
    }

    #[test]
    fn test_order_flow_stats_helpers() {
        let stats = OrderFlowStats {
            buy_volume: dec!(100.0),
            sell_volume: dec!(50.0),
            buy_count: 10,
            sell_count: 5,
            imbalance: dec!(0.333),
            net_flow: dec!(50.0),
            buy_vwap: Some(dec!(100.0)),
            sell_vwap: Some(dec!(99.0)),
            total_notional: dec!(15000.0),
            window_start: 1000,
            window_end: 6000,
        };

        assert_eq!(stats.total_volume(), dec!(150.0));
        assert_eq!(stats.total_count(), 15);
        assert!(stats.is_bullish());
        assert!(!stats.is_bearish());
        assert_eq!(stats.volume_ratio(), Some(dec!(2.0)));
        assert_eq!(stats.window_duration_ms(), 5000);
    }

    #[test]
    fn test_last_first_trade() {
        let mut analyzer = OrderFlowAnalyzer::new(5000);
        assert!(analyzer.last_trade().is_none());
        assert!(analyzer.first_trade().is_none());

        analyzer.add_trade(Trade::new(dec!(100.0), dec!(10.0), TradeSide::Buy, 1000));
        analyzer.add_trade(Trade::new(dec!(101.0), dec!(20.0), TradeSide::Sell, 2000));

        let oldest = analyzer.first_trade().unwrap();
        let newest = analyzer.last_trade().unwrap();
        assert_eq!(oldest.price, dec!(100.0));
        assert_eq!(newest.price, dec!(101.0));
    }

    #[test]
    fn test_clear() {
        let mut analyzer = analyzer_with(&[
            (dec!(100.0), dec!(10.0), TradeSide::Buy, 1000),
            (dec!(100.0), dec!(10.0), TradeSide::Sell, 2000),
        ]);
        assert_eq!(analyzer.trade_count(), 2);

        analyzer.clear();
        assert_eq!(analyzer.trade_count(), 0);
    }

    #[cfg(feature = "serde")]
    #[test]
    fn test_serialization() {
        let original = Trade::new(dec!(100.0), dec!(10.0), TradeSide::Buy, 1000);

        // Round-trip through JSON and expect an identical trade back.
        let json = serde_json::to_string(&original).unwrap();
        let restored: Trade = serde_json::from_str(&json).unwrap();
        assert_eq!(original, restored);
    }
}