use std::path::PathBuf;
use std::{
cmp::Ordering,
collections::BTreeMap,
env,
sync::{
atomic::{AtomicU64, AtomicUsize, Ordering as AtomicOrdering},
Arc, OnceLock,
},
time::{Duration, Instant, SystemTime, UNIX_EPOCH},
};
use anyhow::{anyhow, Context, Result};
use chrono::{Datelike, Utc};
use dashmap::DashMap;
use dotenvy::dotenv;
use futures_util::StreamExt;
use jsonl_logger::jsonl_logger::ModuleLogger;
use serde::{Deserialize, Serialize};
use sqlx::sqlite::SqlitePool;
const SOURCE_FILE: &str = "a0_limit_order_book";
static MODULE_LOGGER: OnceLock<ModuleLogger> = OnceLock::new();
pub fn init_module_logger(broker_name: &str) {
let logger_file_name = format!("ORDER_MANAGER_{}", broker_name);
let _ = MODULE_LOGGER.set(ModuleLogger::new(&logger_file_name, "A0_LIMIT_ORDER_BOOK", SOURCE_FILE));
}
fn module_logger() -> &'static ModuleLogger {
MODULE_LOGGER.get_or_init(|| ModuleLogger::new("ORDER_MANAGER", "A0_LIMIT_ORDER_BOOK", SOURCE_FILE))
}
#[cfg(test)]
const TEST_STALE_THRESHOLD_SECS: u64 = 5;
#[cfg(test)]
const TEST_MEMORY_EVICTION_THRESHOLD_SECS: u64 = 300;
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct MarketWindowConfig {
pub minutes_before_open: i64,
pub minutes_after_close: i64,
}
pub struct OrderBookConfig {
pub stale_threshold_secs: u64,
pub processing_market_window: MarketWindowConfig,
pub memory_eviction_threshold_secs: u64,
}
pub struct BrokerConfig {
pub broker_name: String,
pub health_port: u16,
}
pub fn load_order_book_config() -> OrderBookConfig {
dotenv().ok();
OrderBookConfig {
stale_threshold_secs: 5,
processing_market_window: MarketWindowConfig {
minutes_before_open: 2,
minutes_after_close: 0,
},
memory_eviction_threshold_secs: 300,
}
}
fn get_nats_url_from_env() -> Result<String> {
dotenv().ok();
let port =
env::var("NATS_PORT").map_err(|_| anyhow!("NATS_PORT not set in .env or environment"))?;
Ok(format!("nats://127.0.0.1:{}", port))
}
const NATS_RECEIVE_SUBJECT: &str = "marketdepth.data";
#[allow(dead_code)]
const ORDERBOOK_BUFFER_TIME_MS: u64 = 10;
#[allow(dead_code)]
const ORDERBOOK_CHANNEL_BUFFER_SIZE: usize = 5_000;
#[allow(dead_code)]
const ORDERBOOK_BATCH_SIZE: usize = 500;
#[allow(dead_code)]
const ORDERBOOK_BACKPRESSURE_TIMEOUT_MS: u64 = 10;
#[allow(dead_code)]
static LAST_BACKPRESSURE_WARNING: AtomicU64 = AtomicU64::new(0);
#[allow(dead_code)]
static DROPPED_MESSAGE_COUNT: AtomicU64 = AtomicU64::new(0);
#[allow(dead_code)]
static INTERVAL_DROPPED_MESSAGE_COUNT: AtomicU64 = AtomicU64::new(0);
#[derive(Debug)]
struct MetricsDatabase {
pool: SqlitePool,
}
impl MetricsDatabase {
pub async fn initialize(
#[allow(unused_variables)] logger: &jsonl_logger::jsonl_logger::Logger,
broker_name: &str,
) -> Result<Self> {
dotenv().ok();
let project_dir =
env::var("PROJECT_DIRECTORY").context("PROJECT_DIRECTORY must be set in .env file")?;
let now = Utc::now();
let date_folder = format!("{:04}_{:02}_{:02}", now.year(), now.month(), now.day());
let db_dir = PathBuf::from(&project_dir)
.join("_LOGS_DIRECTORY")
.join(date_folder)
.join("METRICS");
tokio::fs::create_dir_all(&db_dir)
.await
.context("Failed to create daily metrics directory")?;
let db_path = db_dir.join(format!("limit_order_book_{}.db", broker_name));
let connection_options = sqlx::sqlite::SqliteConnectOptions::new()
.filename(&db_path)
.create_if_missing(true)
.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
.synchronous(sqlx::sqlite::SqliteSynchronous::Normal)
.pragma("busy_timeout", "30000");
let pool = sqlx::sqlite::SqlitePoolOptions::new()
.max_connections(5)
.connect_with(connection_options)
.await
.context("Failed to establish database connection")?;
sqlx::query(
r#"CREATE TABLE IF NOT EXISTS "limit_order_book_metrics" (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL,
lifetime_received_count INTEGER NOT NULL,
interval_counts REAL NOT NULL,
total_processing_time_ms REAL NOT NULL
)"#,
)
.execute(&pool)
.await
.context("Failed to create metrics table")?;
Ok(Self { pool })
}
#[allow(dead_code)]
pub async fn new_with_pool(
pool: SqlitePool,
#[allow(unused_variables)] logger: jsonl_logger::jsonl_logger::Logger,
) -> Result<Self> {
sqlx::query(
r#"CREATE TABLE IF NOT EXISTS "limit_order_book_metrics" (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL,
lifetime_received_count INTEGER NOT NULL,
interval_counts REAL NOT NULL,
total_processing_time_ms REAL NOT NULL
)"#,
)
.execute(&pool)
.await
.context("Failed to create metrics table")?;
Ok(Self { pool })
}
pub async fn log_metrics(
&self,
lifetime_received_count: usize,
interval_counts: f64,
total_processing_time: f64,
) -> Result<()> {
let timestamp = chrono::Utc::now()
.format("%Y-%m-%dT%H:%M:%S%.3fZ")
.to_string();
sqlx::query(
r#"INSERT INTO "limit_order_book_metrics" (timestamp, lifetime_received_count, interval_counts, total_processing_time_ms)
VALUES (?1, ?2, ?3, ?4)"#,
)
.bind(×tamp)
.bind(lifetime_received_count as i64)
.bind(interval_counts)
.bind(total_processing_time)
.execute(&self.pool)
.await
.context("Failed to log metrics")?;
Ok(())
}
}
#[cfg(test)]
mod metrics_database_log_metrics_tests {
use super::*;
use jsonl_logger::jsonl_logger::init;
use serial_test::serial;
use sqlx::Row;
#[tokio::test]
#[serial]
async fn test_log_metrics() {
let pool = SqlitePool::connect("sqlite::memory:")
.await
.expect("Failed to create in-memory database");
let _logger = init().expect("Failed to init jsonl_logger");
sqlx::query(
r#"CREATE TABLE IF NOT EXISTS "limit_order_book_metrics" (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL,
lifetime_received_count INTEGER NOT NULL,
interval_counts REAL NOT NULL,
total_processing_time_ms REAL NOT NULL
)"#,
)
.execute(&pool)
.await
.expect("Failed to create metrics table");
let metrics_db = MetricsDatabase { pool };
metrics_db.log_metrics(100, 200.0, 500.0).await.unwrap();
let row = sqlx::query(
r#"SELECT lifetime_received_count, interval_counts, total_processing_time_ms
FROM "limit_order_book_metrics" LIMIT 1"#,
)
.fetch_one(&metrics_db.pool)
.await
.unwrap();
assert_eq!(row.get::<i64, _>("lifetime_received_count"), 100);
assert_eq!(row.get::<f64, _>("interval_counts"), 200.0);
assert_eq!(row.get::<f64, _>("total_processing_time_ms"), 500.0);
metrics_db.log_metrics(0, 0.0, 0.0).await.unwrap();
let count: i64 = sqlx::query_scalar(r#"SELECT COUNT(*) FROM "limit_order_book_metrics""#)
.fetch_one(&metrics_db.pool)
.await
.unwrap();
assert_eq!(count, 2, "Both rows should be persisted");
let pool2 = SqlitePool::connect("sqlite::memory:")
.await
.expect("Failed to create in-memory database");
let metrics_db2 =
MetricsDatabase::new_with_pool(pool2, init().expect("Failed to init logger").clone())
.await
.expect("Failed to create metrics database");
metrics_db2.log_metrics(50, 100.0, 250.0).await.unwrap();
let count2: i64 = sqlx::query_scalar(r#"SELECT COUNT(*) FROM "limit_order_book_metrics""#)
.fetch_one(&metrics_db2.pool)
.await
.unwrap();
assert_eq!(count2, 1, "One row should be persisted");
}
}
#[derive(Debug, Serialize, Deserialize, Default, Clone)]
pub struct MarketDepthData {
pub exch_timestamp: u64,
pub unified_symbol: String,
pub bids: Vec<(f64, i32)>,
pub asks: Vec<(f64, i32)>,
}
#[derive(Debug, Clone, Copy, PartialEq)]
pub(crate) struct OrderedF64(f64);
impl Eq for OrderedF64 {}
impl PartialOrd for OrderedF64 {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl Ord for OrderedF64 {
fn cmp(&self, other: &Self) -> Ordering {
self.0.total_cmp(&other.0)
}
}
#[derive(Debug, Clone)]
pub struct OrderBook {
pub(crate) bid_order_book: BTreeMap<OrderedF64, i32>,
pub(crate) ask_order_book: BTreeMap<OrderedF64, i32>,
last_updated: Instant,
exch_timestamp: u64,
}
impl Default for OrderBook {
fn default() -> Self {
Self {
bid_order_book: BTreeMap::new(),
ask_order_book: BTreeMap::new(),
last_updated: Instant::now(),
exch_timestamp: 0,
}
}
}
impl OrderBook {
pub fn new() -> Self {
Self::default()
}
#[inline]
pub fn last_updated(&self) -> Instant {
self.last_updated
}
#[inline]
pub fn set_last_updated(&mut self, t: Instant) {
self.last_updated = t;
}
#[inline(always)]
pub fn update(&mut self, data: &MarketDepthData) -> bool {
if self.exch_timestamp > data.exch_timestamp {
return false;
}
self.bid_order_book.clear();
self.ask_order_book.clear();
if !data.bids.is_empty() {
data.bids
.iter()
.filter(|(p, q)| p.is_finite() && *p > 0.0 && *q > 0)
.for_each(|(p, q)| {
self.bid_order_book.insert(OrderedF64(*p), *q);
});
}
if !data.asks.is_empty() {
data.asks
.iter()
.filter(|(p, q)| p.is_finite() && *p > 0.0 && *q > 0)
.for_each(|(p, q)| {
self.ask_order_book.insert(OrderedF64(*p), *q);
});
}
self.last_updated = Instant::now();
self.exch_timestamp = data.exch_timestamp;
true
}
pub fn get_exch_timestamp(&self) -> u64 {
self.exch_timestamp
}
}
#[cfg(test)]
mod order_book_update_tests {
use super::*;
use serial_test::serial;
fn create_test_market_depth(
base_bid_price: f64,
base_bid_size: i32,
base_ask_price: f64,
base_ask_size: i32,
levels: usize,
price_step: f64,
size_step: i32,
timestamp: u64,
symbol: Option<&str>,
) -> MarketDepthData {
let bids = (0..levels)
.map(|i| {
(
base_bid_price - (i as f64 * price_step),
base_bid_size + (i as i32 * size_step),
)
})
.collect();
let asks = (0..levels)
.map(|i| {
(
base_ask_price + (i as f64 * price_step),
base_ask_size + (i as i32 * size_step),
)
})
.collect();
MarketDepthData {
bids,
asks,
unified_symbol: symbol.unwrap_or("BTCUSD").to_string(),
exch_timestamp: timestamp,
}
}
#[tokio::test]
#[serial]
async fn test_update() {
let mut order_book = OrderBook::new();
let initial_depth = create_test_market_depth(100.0, 10, 110.0, 15, 5, 1.0, 5, 1000, None);
assert!(
order_book.update(&initial_depth),
"Initial update should succeed"
);
assert_eq!(
order_book.bid_order_book.len(),
5,
"Should have 5 bid levels"
);
assert_eq!(
order_book.ask_order_book.len(),
5,
"Should have 5 ask levels"
);
assert_eq!(order_book.get_exch_timestamp(), 1000);
let older_depth = create_test_market_depth(101.0, 11, 111.0, 16, 5, 1.0, 5, 999, None);
assert!(
!order_book.update(&older_depth),
"Stale update should return false (skipped)"
);
assert_eq!(
order_book.bid_order_book.get(&OrderedF64(100.0)),
Some(&10),
"Original bid should remain unchanged after stale update"
);
assert_eq!(
order_book.ask_order_book.get(&OrderedF64(110.0)),
Some(&15),
"Original ask should remain unchanged after stale update"
);
assert_eq!(
order_book.get_exch_timestamp(),
1000,
"Timestamp must not regress"
);
for i in 0..5 {
let expected_bid_price = 100.0 - (i as f64 * 1.0);
let expected_bid_size = 10 + (i as i32 * 5);
assert_eq!(
order_book
.bid_order_book
.get(&OrderedF64(expected_bid_price)),
Some(&expected_bid_size),
"Bid level {} should remain unchanged",
i
);
}
let mut fresh_book = OrderBook::new();
let market_depth = MarketDepthData {
unified_symbol: "BTCUSD".to_string(),
bids: vec![
(100.0, 10), (99.0, 0), (98.0, 15), (97.0, -5), (0.0, 20), ],
asks: vec![
(101.0, 5), (102.0, 0), (103.0, 10), (0.0, 15), ],
exch_timestamp: 2000,
};
assert!(fresh_book.update(&market_depth));
assert_eq!(fresh_book.bid_order_book.len(), 2, "Only 2 valid bids");
assert_eq!(fresh_book.ask_order_book.len(), 2, "Only 2 valid asks");
assert_eq!(fresh_book.bid_order_book.get(&OrderedF64(100.0)), Some(&10));
assert_eq!(fresh_book.bid_order_book.get(&OrderedF64(98.0)), Some(&15));
assert_eq!(fresh_book.ask_order_book.get(&OrderedF64(101.0)), Some(&5));
assert_eq!(fresh_book.ask_order_book.get(&OrderedF64(103.0)), Some(&10));
assert_eq!(fresh_book.bid_order_book.get(&OrderedF64(99.0)), None);
assert_eq!(fresh_book.bid_order_book.get(&OrderedF64(97.0)), None);
assert_eq!(fresh_book.bid_order_book.get(&OrderedF64(0.0)), None);
assert_eq!(fresh_book.ask_order_book.get(&OrderedF64(102.0)), None);
assert_eq!(fresh_book.ask_order_book.get(&OrderedF64(0.0)), None);
assert_eq!(fresh_book.get_exch_timestamp(), 2000);
}
}
#[derive(Debug)]
pub struct OrderBookStore {
books: DashMap<String, OrderBook>,
metrics: MetricsTracker,
#[allow(dead_code)]
logger: jsonl_logger::jsonl_logger::Logger,
stale_threshold_secs: u64,
memory_eviction_threshold_secs: u64,
}
impl OrderBookStore {
pub async fn new(
logger: jsonl_logger::jsonl_logger::Logger,
stale_threshold_secs: u64,
memory_eviction_threshold_secs: u64,
broker_name: &str,
) -> Result<Self> {
let broker_name_owned = broker_name.to_string();
let (metric_tx, mut metric_rx) = tokio::sync::mpsc::unbounded_channel::<MetricBatch>();
let logger_db = logger.clone();
tokio::spawn(async move {
match MetricsDatabase::initialize(&logger_db, &broker_name_owned).await {
Ok(db) => {
while let Some(batch) = metric_rx.recv().await {
let _ = db
.log_metrics(
batch.lifetime_received_count,
batch.interval_counts,
batch.total_processing_time_ms,
)
.await;
}
}
Err(e) => {
let _ = &logger_db;
module_logger().error(
&format!(
"Metrics database initialization failed — metrics will be lost: {}",
e
),
None,
);
}
}
});
Ok(Self {
books: DashMap::new(),
metrics: MetricsTracker::new(logger.clone(), metric_tx),
logger,
stale_threshold_secs,
memory_eviction_threshold_secs,
})
}
#[allow(dead_code)]
pub fn logger(&self) -> &jsonl_logger::jsonl_logger::Logger {
&self.logger
}
#[inline]
pub fn update_book(&self, data: &MarketDepthData) -> Result<Duration> {
let start = Instant::now();
let was_updated = {
let mut entry = self.books.entry(data.unified_symbol.clone()).or_default();
entry.update(data)
};
if was_updated {
self.metrics.record_update(start.elapsed()); }
Ok(start.elapsed())
}
#[allow(dead_code)]
fn check_stale_data(&self, symbol: &str, last_updated: Instant) -> Result<()> {
let age = Instant::now().duration_since(last_updated);
if age > Duration::from_secs(self.stale_threshold_secs) {
return Err(anyhow::anyhow!(
"Stale order book data for {} (age: {:.2}s > threshold: {}s)",
symbol,
age.as_secs_f64(),
self.stale_threshold_secs
));
}
Ok(())
}
#[inline]
fn get_fresh_book(
&self,
symbol: &str,
) -> Option<dashmap::mapref::one::Ref<'_, String, OrderBook>> {
self.books.get(symbol).filter(|book| {
book.last_updated().elapsed() <= Duration::from_secs(self.stale_threshold_secs)
})
}
pub fn get_symbol_timestamp(&self, symbol: &str) -> Result<Option<u64>> {
Ok(self
.get_fresh_book(symbol)
.map(|book| book.get_exch_timestamp()))
}
pub fn remove_stale_books(&self, max_age: Duration) -> usize {
let now = Instant::now();
let removed_count = AtomicUsize::new(0);
self.books.retain(|_, book| {
let is_fresh = now.duration_since(book.last_updated()) < max_age;
if !is_fresh {
removed_count.fetch_add(1, AtomicOrdering::Relaxed);
}
is_fresh
});
removed_count.load(AtomicOrdering::Relaxed)
}
pub fn get_order_book(&self, symbol: &str) -> Result<OrderBook> {
self.get_fresh_book(symbol)
.map(|book| book.clone())
.ok_or_else(|| anyhow::anyhow!("Stale order book data for {}", symbol))
}
pub fn does_symbol_exist(&self, symbol: &str) -> Result<bool> {
Ok(self.get_fresh_book(symbol).is_some())
}
#[must_use]
pub fn get_all_symbols(&self) -> Vec<String> {
self.books
.iter()
.filter(|entry| {
entry.value().last_updated().elapsed()
<= Duration::from_secs(self.stale_threshold_secs)
})
.map(|entry| entry.key().clone())
.collect()
}
#[inline]
pub fn get_best_ask(&self, symbol: &str) -> Result<Option<f64>> {
Ok(self.get_fresh_book(symbol).and_then(|book| {
book.ask_order_book
.iter()
.next()
.map(|(OrderedF64(p), _)| *p)
}))
}
#[inline]
pub fn get_best_bid(&self, symbol: &str) -> Result<Option<f64>> {
Ok(self.get_fresh_book(symbol).and_then(|book| {
book.bid_order_book
.iter()
.next_back()
.map(|(OrderedF64(p), _)| *p)
}))
}
pub fn calculate_total_ask_value(&self, symbol: &str) -> Result<Option<f64>> {
Ok(self.get_fresh_book(symbol).map(|book| {
book.ask_order_book
.iter()
.map(|(OrderedF64(p), s)| p * *s as f64)
.sum()
}))
}
pub fn calculate_total_bid_value(&self, symbol: &str) -> Result<Option<f64>> {
Ok(self.get_fresh_book(symbol).map(|book| {
book.bid_order_book
.iter()
.map(|(OrderedF64(p), s)| p * *s as f64)
.sum()
}))
}
pub fn calculate_total_ask_quantity(&self, symbol: &str) -> Result<Option<i64>> {
Ok(self
.get_fresh_book(symbol)
.map(|book| book.ask_order_book.values().map(|&q| q as i64).sum()))
}
pub fn calculate_total_bid_quantity(&self, symbol: &str) -> Result<Option<i64>> {
Ok(self
.get_fresh_book(symbol)
.map(|book| book.bid_order_book.values().map(|&q| q as i64).sum()))
}
#[inline]
pub fn calculate_mid_price(&self, symbol: &str) -> Result<Option<f64>> {
Ok(self.get_fresh_book(symbol).and_then(|book| {
let bid = book
.bid_order_book
.iter()
.next_back()
.map(|(OrderedF64(p), _)| *p);
let ask = book
.ask_order_book
.iter()
.next()
.map(|(OrderedF64(p), _)| *p);
match (bid, ask) {
(Some(b), Some(a)) => Some((b + a) / 2.0),
_ => None,
}
}))
}
pub fn calculate_spread(&self, symbol: &str) -> Result<Option<f64>> {
Ok(self.get_fresh_book(symbol).and_then(|book| {
let bid = book
.bid_order_book
.iter()
.next_back()
.map(|(OrderedF64(p), _)| *p);
let ask = book
.ask_order_book
.iter()
.next()
.map(|(OrderedF64(p), _)| *p);
match (bid, ask) {
(Some(b), Some(a)) => Some((a - b).abs()),
_ => None,
}
}))
}
pub fn calculate_order_book_liquidity(&self, symbol: &str) -> Result<Option<f64>> {
Ok(self.get_fresh_book(symbol).map(|book| {
let bid_liquidity: f64 = book
.bid_order_book
.iter()
.map(|(OrderedF64(p), s)| p * *s as f64)
.sum();
let ask_liquidity: f64 = book
.ask_order_book
.iter()
.map(|(OrderedF64(p), s)| p * *s as f64)
.sum();
bid_liquidity + ask_liquidity
}))
}
pub fn check_quantity_availability_with_price(
&self,
symbol: &str,
price: f64,
quantity: usize,
is_bid: bool,
) -> Result<Option<bool>> {
Ok(self.get_fresh_book(symbol).map(|book| {
let order_book = if is_bid {
&book.bid_order_book
} else {
&book.ask_order_book
};
let mut accumulated = 0usize;
let mut available = false;
if is_bid {
for (OrderedF64(_p), size) in order_book
.iter()
.rev()
.take_while(|(OrderedF64(p), _)| *p >= price)
{
accumulated += (*size).max(0) as usize;
if accumulated >= quantity {
available = true;
break;
}
}
} else {
for (OrderedF64(_p), size) in order_book
.iter()
.take_while(|(OrderedF64(p), _)| *p <= price)
{
accumulated += (*size).max(0) as usize;
if accumulated >= quantity {
available = true;
break;
}
}
}
available
}))
}
pub fn calculate_market_impact(
&self,
symbol: &str,
quantity: usize,
is_buy_order: bool,
) -> Result<Option<(f64, f64, usize, bool)>> {
Ok(self.get_fresh_book(symbol).and_then(|book| {
let mut remaining = quantity;
let mut total_cost = 0.0;
let mut impact_price = 0.0;
let mut executed = 0;
if is_buy_order {
for (OrderedF64(price), size) in book.ask_order_book.iter() {
let available = (*size).max(0) as usize;
let fill = remaining.min(available);
total_cost += fill as f64 * price;
remaining -= fill;
executed += fill;
impact_price = *price;
if remaining == 0 {
break;
}
}
} else {
for (OrderedF64(price), size) in book.bid_order_book.iter().rev() {
let available = (*size).max(0) as usize;
let fill = remaining.min(available);
total_cost += fill as f64 * price;
remaining -= fill;
executed += fill;
impact_price = *price;
if remaining == 0 {
break;
}
}
}
if executed > 0 {
let avg_price = total_cost / executed as f64;
let fully_filled = executed >= quantity;
Some((avg_price, impact_price, executed, fully_filled))
} else {
None
}
}))
}
pub fn calculate_depth_at_price(
&self,
symbol: &str,
price: f64,
is_bid: bool,
) -> Result<Option<usize>> {
Ok(self.get_fresh_book(symbol).map(|book| {
let order_book = if is_bid {
&book.bid_order_book
} else {
&book.ask_order_book
};
if is_bid {
order_book
.iter()
.rev()
.take_while(|(OrderedF64(p), _)| *p >= price)
.map(|(_, s)| (*s).max(0) as usize)
.sum()
} else {
order_book
.iter()
.take_while(|(OrderedF64(p), _)| *p <= price)
.map(|(_, s)| (*s).max(0) as usize)
.sum()
}
}))
}
pub async fn report_metrics(&self) -> Result<()> {
self.metrics.report_and_reset().await
}
#[cfg(test)]
pub fn set_last_updated_for_test(&self, symbol: &str, instant: Instant) {
if let Some(mut book) = self.books.get_mut(symbol) {
book.set_last_updated(instant);
}
}
#[cfg(test)]
pub async fn new_with_existing_books(
logger: jsonl_logger::jsonl_logger::Logger,
stale_threshold_secs: u64,
books: DashMap<String, OrderBook>,
broker_name: &str,
) -> Result<Self> {
let broker_name_owned = broker_name.to_string();
let (metric_tx, mut metric_rx) = tokio::sync::mpsc::unbounded_channel::<MetricBatch>();
let logger_db = logger.clone();
tokio::spawn(async move {
match MetricsDatabase::initialize(&logger_db, &broker_name_owned).await {
Ok(db) => {
while let Some(batch) = metric_rx.recv().await {
let _ = db
.log_metrics(
batch.lifetime_received_count,
batch.interval_counts,
batch.total_processing_time_ms,
)
.await;
}
}
Err(e) => {
let _ = &logger_db;
module_logger().error(
&format!(
"Metrics database initialization failed — metrics will be lost: {}",
e
),
None,
);
}
}
});
Ok(Self {
books,
metrics: MetricsTracker::new(logger.clone(), metric_tx),
logger,
stale_threshold_secs,
memory_eviction_threshold_secs: TEST_MEMORY_EVICTION_THRESHOLD_SECS,
})
}
pub fn memory_eviction_threshold(&self) -> Duration {
Duration::from_secs(self.memory_eviction_threshold_secs)
}
}
#[cfg(test)]
mod order_book_store_shared {
use super::*;
use jsonl_logger::jsonl_logger::init;
pub async fn setup_test_order_book() -> OrderBookStore {
dotenv().ok();
let logger = init().expect("Failed to init jsonl_logger").clone();
let (metric_tx, mut metric_rx) = tokio::sync::mpsc::unbounded_channel::<MetricBatch>();
tokio::spawn(async move {
let pool = SqlitePool::connect("sqlite::memory:")
.await
.expect("Failed to create test database");
let metrics_db = MetricsDatabase { pool };
while let Some(batch) = metric_rx.recv().await {
let _ = metrics_db
.log_metrics(
batch.lifetime_received_count,
batch.interval_counts,
batch.total_processing_time_ms,
)
.await;
}
});
OrderBookStore {
books: DashMap::new(),
metrics: MetricsTracker::new(logger.clone(), metric_tx),
logger,
stale_threshold_secs: TEST_STALE_THRESHOLD_SECS,
memory_eviction_threshold_secs: TEST_MEMORY_EVICTION_THRESHOLD_SECS,
}
}
pub fn create_test_market_data(symbol: &str, timestamp: u64) -> MarketDepthData {
MarketDepthData {
unified_symbol: symbol.to_string(),
bids: vec![(100.0, 10), (99.0, 20), (98.0, 15), (97.0, 25), (96.0, 30)],
asks: vec![
(101.0, 5),
(102.0, 15),
(103.0, 10),
(104.0, 20),
(105.0, 25),
],
exch_timestamp: timestamp,
}
}
pub fn create_test_market_data_now(symbol: &str) -> MarketDepthData {
MarketDepthData {
unified_symbol: symbol.to_string(),
bids: vec![(100.0, 10), (99.0, 20), (98.0, 15), (97.0, 25), (96.0, 30)],
asks: vec![
(101.0, 5),
(102.0, 15),
(103.0, 10),
(104.0, 20),
(105.0, 25),
],
exch_timestamp: SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis() as u64,
}
}
}
#[cfg(test)]
mod check_stale_data_tests {
use super::order_book_store_shared::*;
use super::*;
use serial_test::serial;
#[tokio::test]
#[serial]
async fn test_check_stale_data() {
let store = setup_test_order_book().await;
let symbol = "BTCUSD";
let fresh_data = create_test_market_data(symbol, 1000);
store.update_book(&fresh_data).unwrap();
{
let book = store.books.get(symbol).unwrap();
assert!(store.check_stale_data(symbol, book.last_updated()).is_ok());
}
store.set_last_updated_for_test(
symbol,
Instant::now() - Duration::from_secs(TEST_STALE_THRESHOLD_SECS + 1),
);
{
let book = store.books.get(symbol).unwrap();
let result = store.check_stale_data(symbol, book.last_updated());
assert!(result.is_err());
assert!(result.unwrap_err().to_string().contains("Stale"));
}
let old_ts = Instant::now() - Duration::from_secs(TEST_STALE_THRESHOLD_SECS + 1);
let result = store.check_stale_data("ANY_SYMBOL", old_ts);
assert!(result.is_err());
assert!(result.unwrap_err().to_string().contains("Stale"));
}
}
#[cfg(test)]
mod update_book_tests {
use super::order_book_store_shared::*;
use serial_test::serial;
#[tokio::test]
#[serial]
async fn test_update_book() {
let store = setup_test_order_book().await;
let initial = create_test_market_data("BTC", 1000);
store.update_book(&initial).unwrap();
assert_eq!(store.get_symbol_timestamp("BTC").unwrap(), Some(1000));
let older = create_test_market_data("BTC", 999);
store.update_book(&older).unwrap();
assert_eq!(store.get_symbol_timestamp("BTC").unwrap(), Some(1000));
let newer = create_test_market_data("BTC", 1001);
store.update_book(&newer).unwrap();
assert_eq!(store.get_symbol_timestamp("BTC").unwrap(), Some(1001));
}
}
#[cfg(test)]
mod get_symbol_timestamp_tests {
use super::order_book_store_shared::*;
use serial_test::serial;
#[tokio::test]
#[serial]
async fn test_get_symbol_timestamp() {
let store = setup_test_order_book().await;
let data = create_test_market_data("BTC", 1000);
store.update_book(&data).unwrap();
assert_eq!(store.get_symbol_timestamp("BTC").unwrap(), Some(1000));
assert_eq!(store.get_symbol_timestamp("ETH").unwrap(), None);
}
}
#[cfg(test)]
mod remove_stale_books_tests {
use super::order_book_store_shared::*;
use super::*;
use serial_test::serial;
#[tokio::test]
#[serial]
async fn test_remove_stale_books() {
let store = setup_test_order_book().await;
let symbol1 = "BTCUSD".to_string();
let symbol2 = "ETHUSD".to_string();
let data1 = MarketDepthData {
unified_symbol: symbol1.clone(),
bids: vec![(50000.0, 10), (49900.0, 20)],
asks: vec![(51000.0, 5), (51100.0, 15)],
exch_timestamp: 1000,
};
let data2 = MarketDepthData {
unified_symbol: symbol2.clone(),
bids: vec![(3000.0, 20), (2900.0, 30)],
asks: vec![(3200.0, 10), (3300.0, 20)],
exch_timestamp: 1000,
};
store.update_book(&data1).unwrap();
store.update_book(&data2).unwrap();
store.set_last_updated_for_test(&symbol1, Instant::now() - Duration::from_secs(10));
store.set_last_updated_for_test(&symbol2, Instant::now() - Duration::from_secs(1));
assert_eq!(store.books.len(), 2);
store.remove_stale_books(Duration::from_secs(5));
assert_eq!(store.books.len(), 1);
assert!(store.books.get(&symbol2).is_some());
assert!(store.books.get(&symbol1).is_none());
}
}
#[cfg(test)]
mod does_symbol_exist_tests {
use super::order_book_store_shared::*;
use serial_test::serial;
#[tokio::test]
#[serial]
async fn test_does_symbol_exist() {
let store = setup_test_order_book().await;
let data = create_test_market_data("BTC", 1000);
store.update_book(&data).unwrap();
assert!(store.does_symbol_exist("BTC").unwrap());
assert!(!store.does_symbol_exist("ETH").unwrap());
}
}
#[cfg(test)]
mod get_best_bid_ask_tests {
use super::order_book_store_shared::*;
use serial_test::serial;
#[tokio::test]
#[serial]
async fn test_get_best_bid_ask() {
let store = setup_test_order_book().await;
let data = create_test_market_data("BTC", 1000);
store.update_book(&data).unwrap();
assert_eq!(store.get_best_bid("BTC").unwrap(), Some(100.0));
assert_eq!(store.get_best_ask("BTC").unwrap(), Some(101.0));
assert_eq!(store.get_best_bid("ETH").unwrap(), None);
assert_eq!(store.get_best_ask("ETH").unwrap(), None);
}
}
#[cfg(test)]
mod calculate_mid_price_tests {
use super::order_book_store_shared::*;
use serial_test::serial;
#[tokio::test]
#[serial]
async fn test_calculate_mid_price() {
let store = setup_test_order_book().await;
let data = create_test_market_data("BTC", 1000);
store.update_book(&data).unwrap();
assert_eq!(store.calculate_mid_price("BTC").unwrap(), Some(100.5));
assert_eq!(store.calculate_mid_price("ETH").unwrap(), None);
}
}
#[cfg(test)]
mod calculate_spread_tests {
use super::order_book_store_shared::*;
use serial_test::serial;
#[tokio::test]
#[serial]
async fn test_calculate_spread() {
let store = setup_test_order_book().await;
let data = create_test_market_data("BTC", 1000);
store.update_book(&data).unwrap();
assert_eq!(store.calculate_spread("BTC").unwrap(), Some(1.0));
assert_eq!(store.calculate_spread("ETH").unwrap(), None);
}
}
#[cfg(test)]
mod calculate_total_quantities_tests {
use super::order_book_store_shared::*;
use serial_test::serial;
#[tokio::test]
#[serial]
async fn test_calculate_total_quantities() {
let store = setup_test_order_book().await;
let data = create_test_market_data("BTC", 1000);
store.update_book(&data).unwrap();
assert_eq!(
store.calculate_total_bid_quantity("BTC").unwrap(),
Some(100)
);
assert_eq!(store.calculate_total_ask_quantity("BTC").unwrap(), Some(75));
}
}
#[cfg(test)]
mod calculate_order_book_liquidity_tests {
use super::order_book_store_shared::*;
use serial_test::serial;
#[tokio::test]
#[serial]
async fn test_calculate_order_book_liquidity() {
let store = setup_test_order_book().await;
let data = create_test_market_data("BTC", 1000);
store.update_book(&data).unwrap();
let expected_bid = 100.0 * 10.0 + 99.0 * 20.0 + 98.0 * 15.0 + 97.0 * 25.0 + 96.0 * 30.0;
let expected_ask = 101.0 * 5.0 + 102.0 * 15.0 + 103.0 * 10.0 + 104.0 * 20.0 + 105.0 * 25.0;
let liquidity = store
.calculate_order_book_liquidity("BTC")
.unwrap()
.unwrap();
assert!((liquidity - (expected_bid + expected_ask)).abs() < 0.001);
assert_eq!(store.calculate_order_book_liquidity("ETH").unwrap(), None);
}
}
#[cfg(test)]
mod check_quantity_availability_with_price_tests {
use super::order_book_store_shared::*;
use serial_test::serial;
#[tokio::test]
#[serial]
async fn test_check_quantity_availability_with_price() {
let store = setup_test_order_book().await;
let data = create_test_market_data("BTC", 1000);
store.update_book(&data).unwrap();
assert_eq!(
store
.check_quantity_availability_with_price("BTC", 99.0, 30, true)
.unwrap(),
Some(true)
);
assert_eq!(
store
.check_quantity_availability_with_price("BTC", 102.0, 20, false)
.unwrap(),
Some(true)
);
assert_eq!(
store
.check_quantity_availability_with_price("BTC", 99.0, 30, true)
.unwrap(),
Some(true),
"Exact boundary: 30 units should be available at price >= 99.0"
);
assert_eq!(
store
.check_quantity_availability_with_price("BTC", 99.0, 31, true)
.unwrap(),
Some(false),
"Over boundary: 31 units should NOT be available at price >= 99.0"
);
assert_eq!(
store
.check_quantity_availability_with_price("MISSING", 100.0, 10, true)
.unwrap(),
None
);
}
}
#[cfg(test)]
mod calculate_market_impact_tests {
use super::order_book_store_shared::*;
use serial_test::serial;
#[tokio::test]
#[serial]
async fn test_calculate_market_impact() {
let store = setup_test_order_book().await;
let data = create_test_market_data("BTC", 1000);
store.update_book(&data).unwrap();
let (avg_price, impact_price, executed_qty, fully_filled) = store
.calculate_market_impact("BTC", 15, true)
.unwrap()
.unwrap();
assert!((avg_price - 101.67).abs() < 0.01);
assert_eq!(impact_price, 102.0);
assert_eq!(executed_qty, 15);
assert!(fully_filled, "Order should be fully filled");
let (avg_price, impact_price, executed_qty, fully_filled) = store
.calculate_market_impact("BTC", 20, false)
.unwrap()
.unwrap();
assert!((avg_price - 99.5).abs() < 0.01);
assert_eq!(impact_price, 99.0);
assert_eq!(executed_qty, 20);
assert!(fully_filled, "Order should be fully filled");
let (_avg, _impact, executed, fully_filled) = store
.calculate_market_impact("BTC", 200, true)
.unwrap()
.unwrap();
assert_eq!(executed, 75);
assert!(!fully_filled);
let empty_store = setup_test_order_book().await;
assert_eq!(
empty_store
.calculate_market_impact("BTC", 100, true)
.unwrap(),
None
);
assert_eq!(
store.calculate_market_impact("MISSING", 10, true).unwrap(),
None
);
}
}
#[cfg(test)]
mod calculate_depth_at_price_tests {
use super::order_book_store_shared::*;
use serial_test::serial;
#[tokio::test]
#[serial]
async fn test_calculate_depth_at_price() {
let store = setup_test_order_book().await;
let data = create_test_market_data("BTC", 1000);
store.update_book(&data).unwrap();
assert_eq!(
store.calculate_depth_at_price("BTC", 98.0, true).unwrap(),
Some(45)
);
assert_eq!(
store.calculate_depth_at_price("BTC", 103.0, false).unwrap(),
Some(30)
);
assert_eq!(
store.calculate_depth_at_price("BTC", 100.0, true).unwrap(),
Some(10)
);
assert_eq!(
store.calculate_depth_at_price("BTC", 95.0, true).unwrap(),
Some(100)
);
assert_eq!(
store.calculate_depth_at_price("ETH", 100.0, true).unwrap(),
None
);
}
}
#[cfg(test)]
mod floating_point_precision_tests {
use super::order_book_store_shared::*;
use super::*;
use serial_test::serial;
#[tokio::test]
#[serial]
async fn test_f64_precision() {
let store = setup_test_order_book().await;
let data = MarketDepthData {
unified_symbol: "BTCUSD".to_string(),
bids: vec![(10000.00, 10), (10000.01, 20), (10000.02, 15)],
asks: vec![(10001.00, 5), (10001.01, 15), (10001.02, 10)],
exch_timestamp: 1000,
};
store.update_book(&data).unwrap();
let result = store.calculate_market_impact("BTCUSD", 25, true).unwrap();
assert!(result.is_some());
let (avg_price, _impact_price, executed, fully_filled) = result.unwrap();
assert_eq!(executed, 25);
assert!(fully_filled);
let expected_avg = 10001.01;
let tolerance = 1e-6; assert!(
(avg_price - expected_avg).abs() < tolerance,
"avg_price {} differs from expected {} by more than {}",
avg_price,
expected_avg,
tolerance
);
let store2 = setup_test_order_book().await;
let data2 = MarketDepthData {
unified_symbol: "ETHUSD".to_string(),
bids: vec![(5000.125, 10)],
asks: vec![(5000.135, 10)],
exch_timestamp: 2000,
};
store2.update_book(&data2).unwrap();
let mid = store2.calculate_mid_price("ETHUSD").unwrap().unwrap();
let expected = (5000.125 + 5000.135) / 2.0;
assert!(
(mid - expected).abs() < 1e-9,
"Mid price precision error: {} vs {}",
mid,
expected
);
}
}
#[derive(Debug)]
pub struct MetricsTracker {
processed_count: AtomicUsize,
lifetime_received_count: AtomicUsize,
total_processing_time: AtomicU64,
#[allow(dead_code)]
logger: jsonl_logger::jsonl_logger::Logger,
metric_tx: tokio::sync::mpsc::UnboundedSender<MetricBatch>,
#[allow(dead_code)]
interval_start: Instant,
}
#[derive(Debug)]
struct MetricBatch {
lifetime_received_count: usize,
interval_counts: f64,
total_processing_time_ms: f64,
}
impl MetricsTracker {
fn new(
logger: jsonl_logger::jsonl_logger::Logger,
metric_tx: tokio::sync::mpsc::UnboundedSender<MetricBatch>,
) -> Self {
Self {
processed_count: AtomicUsize::new(0),
lifetime_received_count: AtomicUsize::new(0),
total_processing_time: AtomicU64::new(0),
logger,
metric_tx,
interval_start: Instant::now(),
}
}
pub fn record_received(&self) {
self.lifetime_received_count
.fetch_add(1, AtomicOrdering::Relaxed);
}
pub fn record_update(&self, duration: Duration) {
self.processed_count.fetch_add(1, AtomicOrdering::Relaxed);
self.total_processing_time
.fetch_add(duration.as_micros() as u64, AtomicOrdering::Relaxed);
}
pub async fn report_and_reset(&self) -> Result<()> {
let interval_counts = self.processed_count.swap(0, AtomicOrdering::Relaxed);
let total_micros = self.total_processing_time.swap(0, AtomicOrdering::Relaxed);
let total_ms = total_micros as f64 / 1000.0;
let lifetime_received_count = self.lifetime_received_count.load(AtomicOrdering::Relaxed);
let batch = MetricBatch {
lifetime_received_count,
interval_counts: interval_counts as f64,
total_processing_time_ms: total_ms,
};
let _ = self.metric_tx.send(batch);
Ok(())
}
}
pub async fn run_limit_order_book(
logger: jsonl_logger::jsonl_logger::Logger,
broker_name: &str,
) -> Result<Arc<OrderBookStore>> {
let _ = &logger;
init_module_logger(broker_name);
module_logger().info("Initializing limit order book system", None);
let order_book_config = load_order_book_config();
let stale_threshold = order_book_config.stale_threshold_secs;
let memory_eviction_threshold = order_book_config.memory_eviction_threshold_secs;
let order_books =
OrderBookStore::new(logger.clone(), stale_threshold, memory_eviction_threshold, broker_name).await?;
let order_books_arc = Arc::new(order_books);
Ok(order_books_arc)
}
fn process_message_batch(
batch: &mut [Vec<u8>],
order_books: &Arc<OrderBookStore>,
logger: &jsonl_logger::jsonl_logger::Logger,
) -> Result<()> {
let mut has_error = false;
for msg_bytes in batch {
match simd_json::serde::from_slice::<MarketDepthData>(msg_bytes.as_mut_slice()) {
Ok(data) => {
if let Err(e) = order_books.update_book(&data) {
let _ = logger;
module_logger().warn(&format!("Failed to update order book: {}", e), None);
has_error = true;
}
}
Err(e) => {
let _ = logger;
module_logger().warn(
&format!("Failed to deserialize message, skipping: {}", e),
None,
);
has_error = true;
continue;
}
}
}
if has_error {
Err(anyhow!("Some messages in batch failed to process"))
} else {
Ok(())
}
}
pub fn process_buffered_messages(
messages: &mut [Vec<u8>],
order_books: Arc<OrderBookStore>,
logger: &jsonl_logger::jsonl_logger::Logger,
) -> Result<()> {
let count = messages.len();
for _ in 0..count {
order_books.metrics.record_received();
}
process_message_batch(messages, &order_books, logger)
}
async fn process_nats_messages(
order_books: Arc<OrderBookStore>,
logger: &jsonl_logger::jsonl_logger::Logger,
) -> Result<()> {
let nats_url = get_nats_url_from_env()?;
let subject = NATS_RECEIVE_SUBJECT;
let client = async_nats::connect(&nats_url).await?;
let _ = logger;
module_logger().info(&format!("Connected to NATS server at {}", nats_url), None);
let mut sub = client.subscribe(subject).await?;
module_logger().info(&format!("Subscribed to {}", subject), None);
let (tx, mut rx) = tokio::sync::mpsc::channel::<Vec<u8>>(ORDERBOOK_CHANNEL_BUFFER_SIZE);
let order_books_processor = Arc::clone(&order_books);
let logger_processor = logger.clone();
tokio::spawn(async move {
let mut batch = Vec::with_capacity(ORDERBOOK_BATCH_SIZE);
let mut flush_deadline: Option<tokio::time::Instant> = None;
loop {
let sleep_fut = async {
match flush_deadline {
Some(dl) => tokio::time::sleep_until(dl).await,
None => std::future::pending::<()>().await,
}
};
tokio::select! {
Some(message) = rx.recv() => {
if flush_deadline.is_none() {
flush_deadline = Some(
tokio::time::Instant::now()
+ Duration::from_millis(ORDERBOOK_BUFFER_TIME_MS)
);
}
batch.push(message);
if batch.len() >= ORDERBOOK_BATCH_SIZE {
if let Err(e) = process_message_batch(&mut batch, &order_books_processor, &logger_processor) {
let _ = &logger_processor;
module_logger().error(&format!("Batch processing error: {}", e), None);
}
batch.clear();
flush_deadline = None;
}
}
_ = sleep_fut => {
if !batch.is_empty() {
if let Err(e) = process_message_batch(&mut batch, &order_books_processor, &logger_processor) {
let _ = &logger_processor;
module_logger().error(&format!("Batch processing error: {}", e), None);
}
batch.clear();
}
flush_deadline = None;
}
else => {
let _ = &logger_processor;
module_logger().info("Message processor shutting down gracefully", None);
if !batch.is_empty() {
let _ = process_message_batch(&mut batch, &order_books_processor, &logger_processor);
}
break;
}
}
}
});
let mut last_cleanup = Instant::now();
while let Some(msg) = sub.next().await {
order_books.metrics.record_received();
if last_cleanup.elapsed() > Duration::from_secs(60) {
let removed = order_books.remove_stale_books(order_books.memory_eviction_threshold());
if removed > 0 {
module_logger().info(&format!("Removed {} stale order books", removed), None);
}
let prev_interval_dropped =
INTERVAL_DROPPED_MESSAGE_COUNT.swap(0, AtomicOrdering::Relaxed);
if prev_interval_dropped > 0 {
module_logger().info(
&format!(
"Interval dropped messages reset: {} (last 60s)",
prev_interval_dropped
),
None,
);
}
last_cleanup = Instant::now();
}
let payload = msg.payload.to_vec();
match tx.try_send(payload) {
Ok(_) => {}
Err(tokio::sync::mpsc::error::TrySendError::Full(payload)) => {
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs();
let last_warn = LAST_BACKPRESSURE_WARNING.load(AtomicOrdering::Relaxed);
if now - last_warn >= 5 {
module_logger().warn("Channel backpressure - processor overloaded!", None);
LAST_BACKPRESSURE_WARNING.store(now, AtomicOrdering::Relaxed);
}
match tokio::time::timeout(
Duration::from_millis(ORDERBOOK_BACKPRESSURE_TIMEOUT_MS),
tx.send(payload),
)
.await
{
Ok(Ok(_)) => {}
Ok(Err(_)) => {
module_logger().error("Processor channel closed during backpressure", None);
break;
}
Err(_) => {
DROPPED_MESSAGE_COUNT.fetch_add(1, AtomicOrdering::Relaxed);
INTERVAL_DROPPED_MESSAGE_COUNT.fetch_add(1, AtomicOrdering::Relaxed);
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs();
let last_warn = LAST_BACKPRESSURE_WARNING.load(AtomicOrdering::Relaxed);
if now - last_warn >= 5 {
let total_dropped = DROPPED_MESSAGE_COUNT.load(AtomicOrdering::Relaxed);
let interval_dropped =
INTERVAL_DROPPED_MESSAGE_COUNT.load(AtomicOrdering::Relaxed);
module_logger().error(
&format!(
"Channel backpressure timeout - interval dropped: {}, total dropped: {}",
interval_dropped, total_dropped
),
None,
);
LAST_BACKPRESSURE_WARNING.store(now, AtomicOrdering::Relaxed);
}
}
}
}
Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
module_logger().error("Processor channel closed during backpressure retry", None);
break;
}
}
}
Ok(())
}
use trading_hours::trading_hours::processing_market_window;
#[allow(dead_code)]
pub struct HealthServerState {
pub logger: jsonl_logger::jsonl_logger::Logger,
pub order_books: Arc<OrderBookStore>,
pub processing_handle: Arc<tokio::sync::Mutex<Option<tokio::task::JoinHandle<Result<()>>>>>,
}
#[allow(dead_code)]
const HEALTH_ENDPOINT: &str = "/healthz";
#[allow(dead_code)]
const TRIGGER_ENDPOINT: &str = "/start";
async fn health_check() -> impl axum::response::IntoResponse {
(axum::http::StatusCode::OK, "OK")
}
async fn trigger_run_processing(
axum::extract::Extension(state): axum::extract::Extension<Arc<HealthServerState>>,
) -> &'static str {
let mut handle_guard = state.processing_handle.lock().await;
if let Some(handle) = handle_guard.take() {
if !handle.is_finished() {
let _ = &state.logger;
module_logger().warn(
"Aborting existing NATS processing task — starting fresh",
None,
);
handle.abort();
loop {
if handle.is_finished() {
break;
}
tokio::task::yield_now().await;
}
}
}
let _ = &state.logger;
module_logger().info("Triggering order book processing manually...", None);
let state_clone = state.clone();
let handle = tokio::spawn(async move {
let result =
process_nats_messages(state_clone.order_books.clone(), &state_clone.logger).await;
if let Err(e) = &result {
let _ = &state_clone.logger;
module_logger().error(&format!("Order book processing failed: {e}"), None);
}
result
});
*handle_guard = Some(handle);
"Processing started"
}
pub async fn run_health_server(
port: u16,
order_books: Arc<OrderBookStore>,
logger: &jsonl_logger::jsonl_logger::Logger,
processing_handle: Arc<tokio::sync::Mutex<Option<tokio::task::JoinHandle<Result<()>>>>>,
) -> Result<()> {
use axum::routing::{get, post};
use axum::Router;
use std::net::SocketAddr;
let state = Arc::new(HealthServerState {
logger: logger.clone(),
order_books,
processing_handle,
});
let app = Router::new()
.route(HEALTH_ENDPOINT, get(health_check))
.route(TRIGGER_ENDPOINT, post(trigger_run_processing))
.layer(axum::extract::Extension(state));
let addr = SocketAddr::from(([0, 0, 0, 0], port));
let listener = tokio::net::TcpListener::bind(addr).await?;
let _ = logger;
module_logger().info(
&format!(
"🏥 Health server listening on {} at {} and {}",
addr, HEALTH_ENDPOINT, TRIGGER_ENDPOINT
),
None,
);
axum::serve(listener, app.into_make_service()).await?;
Ok(())
}
pub async fn run_limit_order_book_full(config: BrokerConfig) -> Result<Arc<OrderBookStore>> {
dotenv().ok();
let logger = jsonl_logger::jsonl_logger::init()
.map_err(|e| anyhow!("Failed to initialize JSONL logger: {}", e))?
.clone();
init_module_logger(&config.broker_name);
module_logger().info("Initializing limit order book system (full mode)", None);
let order_book_config = load_order_book_config();
let stale_threshold = order_book_config.stale_threshold_secs;
let memory_eviction_threshold = order_book_config.memory_eviction_threshold_secs;
let market_window = order_book_config.processing_market_window.clone();
let order_books = Arc::new(
OrderBookStore::new(logger.clone(), stale_threshold, memory_eviction_threshold, &config.broker_name).await?,
);
let processing_handle: Arc<tokio::sync::Mutex<Option<tokio::task::JoinHandle<Result<()>>>>> =
Arc::new(tokio::sync::Mutex::new(None));
let health_logger = logger.clone();
let store_for_health = Arc::clone(&order_books);
let health_processing_handle = Arc::clone(&processing_handle);
tokio::spawn(async move {
if let Err(e) = run_health_server(
config.health_port,
store_for_health,
&health_logger,
health_processing_handle,
)
.await
{
let _ = &health_logger;
module_logger().error(&format!("Health server error: {}", e), None);
}
});
let store_for_metrics = Arc::clone(&order_books);
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(15));
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
interval.tick().await;
if let Err(e) = store_for_metrics.report_metrics().await {
let _ = &store_for_metrics.logger;
module_logger().error(&format!("Metrics reporting error: {}", e), None);
}
}
});
let nats_logger = logger.clone();
let store_for_nats = Arc::clone(&order_books);
let nats_processing_handle = Arc::clone(&processing_handle);
tokio::spawn(async move {
if let Err(e) = run_processing_with_market_window(
&nats_logger,
store_for_nats,
market_window,
nats_processing_handle,
)
.await
{
let _ = &nats_logger;
module_logger().error(&format!("NATS processing error: {}", e), None);
}
});
Ok(order_books)
}
async fn run_processing_with_market_window(
logger: &jsonl_logger::jsonl_logger::Logger,
order_books: Arc<OrderBookStore>,
market_window: MarketWindowConfig,
processing_handle: Arc<tokio::sync::Mutex<Option<tokio::task::JoinHandle<Result<()>>>>>,
) -> Result<()> {
let mut previous_market_status = None;
let _ = logger;
module_logger().info(
&format!(
"Market window config: {} min before open, {} min after close",
market_window.minutes_before_open, market_window.minutes_after_close
),
None,
);
loop {
let mw_result = tokio::time::timeout(
Duration::from_secs(15),
processing_market_window(
market_window.minutes_before_open,
market_window.minutes_after_close,
),
)
.await;
match mw_result {
Ok(Ok((true, info))) => {
if previous_market_status != Some(true) {
module_logger().info(&format!("🟢 MARKET STATUS CHANGED: OPEN — {info}"), None);
{
let mut guard = processing_handle.lock().await;
if let Some(ref existing) = *guard {
if !existing.is_finished() {
module_logger().warn(
"NATS processing already running — skipping duplicate start",
None,
);
} else {
let logger_clone = logger.clone();
let store_clone = Arc::clone(&order_books);
let handle = tokio::spawn(async move {
process_nats_messages(store_clone, &logger_clone).await
});
*guard = Some(handle);
}
} else {
let logger_clone = logger.clone();
let store_clone = Arc::clone(&order_books);
let handle = tokio::spawn(async move {
process_nats_messages(store_clone, &logger_clone).await
});
*guard = Some(handle);
}
}
previous_market_status = Some(true);
}
{
let mut guard = processing_handle.lock().await;
if let Some(handle) = guard.as_ref() {
if handle.is_finished() {
module_logger().warn("⚠️ NATS processing task died — restarting", None);
let logger_clone = logger.clone();
let store_clone = Arc::clone(&order_books);
let handle = tokio::spawn(async move {
process_nats_messages(store_clone, &logger_clone).await
});
*guard = Some(handle);
}
}
}
tokio::time::sleep(Duration::from_secs(30)).await;
}
Ok(Ok((false, info))) => {
if previous_market_status != Some(false) {
module_logger().warn(&format!("⏹️ MARKET STATUS CHANGED: CLOSED — {info}"), None);
if let Some(handle) = processing_handle.lock().await.take() {
handle.abort();
}
previous_market_status = Some(false);
}
tokio::time::sleep(Duration::from_secs(60)).await;
}
Ok(Err(e)) => {
module_logger().error(&format!("❌ Market window error: {e}"), None);
tokio::time::sleep(Duration::from_secs(60)).await;
}
Err(_) => {
module_logger().error(
"❌ processing_market_window timed out after 15 seconds — possible deadlock in trading_hours crate",
None,
);
tokio::time::sleep(Duration::from_secs(30)).await;
}
}
}
}
pub async fn print_best_ask_loop(
store: Arc<OrderBookStore>,
logger: jsonl_logger::jsonl_logger::Logger,
symbol: &str,
) {
tokio::time::sleep(Duration::from_secs(30)).await;
let mut interval = tokio::time::interval(Duration::from_secs(15));
let _ = &logger;
module_logger().info(
&format!("🚀 Starting best ask monitoring for {symbol}"),
None,
);
loop {
interval.tick().await;
match store.get_best_ask(symbol) {
Ok(Some(price)) => {
module_logger().info(&format!("📊 {symbol} - Best ask: {price:.2}"), None);
println!("📊 {symbol} - Best ask: {price:.2}");
}
Ok(None) => {
module_logger().warn(&format!("⚠️ {symbol} - No asks available"), None);
println!("📊 {symbol} - No asks available");
}
Err(e) => {
module_logger().warn(&format!("❌ {symbol} - Error fetching best ask: {e}"), None);
}
}
}
}
#[cfg(test)]
mod process_buffered_messages_tests {
use super::order_book_store_shared::*;
use super::*;
use serial_test::serial;
#[tokio::test]
#[serial]
async fn test_process_buffered_messages() {
let _logger_ref = {
let store = setup_test_order_book().await;
store.logger.clone()
};
let store = setup_test_order_book().await;
let logger = store.logger.clone();
let symbol = "BTCUSD";
let market_data1 = create_test_market_data_now(symbol);
let market_data2 = MarketDepthData {
exch_timestamp: market_data1.exch_timestamp + 1000,
..market_data1.clone()
};
let mut messages = vec![
serde_json::to_vec(&market_data1).unwrap(),
serde_json::to_vec(&market_data2).unwrap(),
];
let store_arc = Arc::new(store);
let result = process_buffered_messages(&mut messages, Arc::clone(&store_arc), &logger);
assert!(result.is_ok(), "Expected successful message processing");
if let Some(book) = store_arc.books.get(symbol) {
assert_eq!(
book.get_exch_timestamp(),
market_data2.exch_timestamp,
"Order book should hold the latest timestamp"
);
}
let store2 = setup_test_order_book().await;
let logger2 = store2.logger.clone();
let invalid_message = vec![1, 2, 3, 4];
let store2_arc = Arc::new(store2);
let result = process_buffered_messages(&mut [invalid_message], store2_arc, &logger2);
assert!(result.is_err(), "Expected deserialization error");
}
}
#[cfg(test)]
mod process_nats_messages_tests {
use super::order_book_store_shared::*;
use super::*;
use serial_test::serial;
#[tokio::test]
#[serial]
async fn test_process_nats_messages() {
let store = Arc::new(setup_test_order_book().await);
let logger = store.logger.clone();
let store_clone = Arc::clone(&store);
let handle = tokio::spawn(async move {
let _ = process_nats_messages(store_clone, &logger).await;
});
tokio::select! {
result = handle => {
assert!(result.is_ok(), "process_nats_messages task panicked");
}
_ = tokio::time::sleep(Duration::from_secs(2)) => {
}
}
}
}
#[cfg(test)]
mod timestamp_ordering_tests {
use super::order_book_store_shared::*;
use super::*;
use serial_test::serial;
#[tokio::test]
#[serial]
async fn test_timestamp_ordering() {
let store = setup_test_order_book().await;
let symbol = "BTCUSD";
let new_data = MarketDepthData {
exch_timestamp: 2000,
..create_test_market_data_now(symbol)
};
let old_data = MarketDepthData {
exch_timestamp: 1000,
..create_test_market_data_now(symbol)
};
store.update_book(&new_data).unwrap();
store.update_book(&old_data).unwrap();
assert_eq!(
store.get_symbol_timestamp(symbol).unwrap(),
Some(2000),
"Order book should maintain the newer timestamp"
);
}
}
#[cfg(test)]
mod concurrency_stress_tests {
use super::order_book_store_shared::*;
use super::*;
use serial_test::serial;
#[tokio::test]
#[serial]
async fn test_concurrency_stress() {
let store = Arc::new(setup_test_order_book().await);
let symbol = "BTCUSD";
let num_tasks = 10;
let updates_per_task = 100;
let mut handles = vec![];
for task_id in 0..num_tasks {
let store_clone = Arc::clone(&store);
let symbol_clone = symbol.to_string();
let handle = tokio::spawn(async move {
for update_id in 0..updates_per_task {
let base_ts = 1_000_000 + (task_id * updates_per_task + update_id) as u64;
let data = MarketDepthData {
unified_symbol: symbol_clone.clone(),
bids: vec![(100.0 + task_id as f64, 10), (99.0, 20)],
asks: vec![(101.0 + task_id as f64, 5), (102.0, 15)],
exch_timestamp: base_ts,
};
let _ = store_clone.update_book(&data);
}
});
handles.push(handle);
}
for handle in handles {
handle.await.expect("Task should not panic");
}
let _max_ts = 1_000_000 + (num_tasks * updates_per_task) as u64 - 1;
let final_ts = store.get_symbol_timestamp(symbol).unwrap();
assert!(
final_ts.is_some(),
"Symbol should exist after concurrent updates"
);
let final_ts = final_ts.unwrap();
assert!(
final_ts >= 1_000_000,
"Timestamp should be from our test data, got: {}",
final_ts
);
let store2 = Arc::new(setup_test_order_book().await);
let symbol2 = "ETHUSD";
let num_readers = 5;
let store_writer = Arc::clone(&store2);
let symbol_writer = symbol2.to_string();
let writer_handle = tokio::spawn(async move {
for i in 0..50 {
let data = MarketDepthData {
unified_symbol: symbol_writer.clone(),
bids: vec![(3000.0 + i as f64, 10)],
asks: vec![(3100.0 + i as f64, 5)],
exch_timestamp: 2_000_000 + i as u64,
};
let _ = store_writer.update_book(&data);
}
});
let mut reader_handles = vec![];
for _ in 0..num_readers {
let store_reader = Arc::clone(&store2);
let symbol_reader = symbol2.to_string();
let handle = tokio::spawn(async move {
for _ in 0..50 {
let _ = store_reader.get_best_bid(&symbol_reader);
let _ = store_reader.get_best_ask(&symbol_reader);
let _ = store_reader.calculate_mid_price(&symbol_reader);
}
});
reader_handles.push(handle);
}
writer_handle.await.expect("Writer task should not panic");
for handle in reader_handles {
handle.await.expect("Reader task should not panic");
}
}
}
#[cfg(test)]
mod calculate_total_value_tests {
use super::order_book_store_shared::*;
use serial_test::serial;
#[tokio::test]
#[serial]
async fn test_calculate_total_value() {
let store = setup_test_order_book().await;
let data = create_test_market_data("BTC", 1000);
store.update_book(&data).unwrap();
let total = store.calculate_total_ask_value("BTC").unwrap().unwrap();
assert!((total - 7770.0).abs() < 0.01);
let total = store.calculate_total_bid_value("BTC").unwrap().unwrap();
assert!((total - 9755.0).abs() < 0.01);
assert_eq!(store.calculate_total_ask_value("ETH").unwrap(), None);
assert_eq!(store.calculate_total_bid_value("ETH").unwrap(), None);
}
}
#[cfg(test)]
mod get_order_book_tests {
use super::order_book_store_shared::*;
use serial_test::serial;
#[tokio::test]
#[serial]
async fn test_get_order_book() {
let store = setup_test_order_book().await;
let data = create_test_market_data("BTC", 1000);
store.update_book(&data).unwrap();
let book = store.get_order_book("BTC").unwrap();
assert_eq!(book.get_exch_timestamp(), 1000);
let err = store.get_order_book("ETH").unwrap_err();
assert!(err.to_string().contains("Stale order book data for ETH"));
}
}
#[cfg(test)]
mod get_all_symbols_tests {
use super::order_book_store_shared::*;
use serial_test::serial;
#[tokio::test]
#[serial]
async fn test_get_all_symbols() {
let store = setup_test_order_book().await;
assert!(store.get_all_symbols().is_empty());
store
.update_book(&create_test_market_data("BTC", 1000))
.unwrap();
store
.update_book(&create_test_market_data("ETH", 1000))
.unwrap();
let symbols = store.get_all_symbols();
assert_eq!(symbols.len(), 2);
assert!(symbols.contains(&"BTC".to_string()));
assert!(symbols.contains(&"ETH".to_string()));
}
}
#[cfg(test)]
mod run_limit_order_book_tests {
use super::order_book_store_shared::*;
use super::*;
use jsonl_logger::jsonl_logger::init;
use serial_test::serial;
#[tokio::test]
#[serial]
async fn test_run_limit_order_book() {
let logger = init().expect("Failed to init jsonl_logger");
let store = run_limit_order_book(logger.clone(), "test").await;
assert!(store.is_ok());
let store = store.unwrap();
assert!(Arc::strong_count(&store) >= 1);
let data = create_test_market_data("TEST", 1000);
assert!(store.update_book(&data).is_ok());
}
}
#[cfg(test)]
mod report_metrics_tests {
use super::order_book_store_shared::*;
use serial_test::serial;
#[tokio::test]
#[serial]
async fn test_report_metrics() {
let store = setup_test_order_book().await;
let data = create_test_market_data("BTC", 1000);
store.update_book(&data).unwrap();
assert!(store.report_metrics().await.is_ok());
assert!(store.report_metrics().await.is_ok());
}
}