use crate::orderbook::OrderBook;
use crate::orderbook::error::ManagerError;
use crate::orderbook::mass_cancel::MassCancelResult;
use crate::orderbook::trade::{TradeEvent, TradeListener, TradeResult};
use pricelevel::{Hash32, Side};
use std::collections::HashMap;
use std::sync::Arc;
use tracing::{error, info};
/// Common interface for a collection of order books addressed by symbol.
///
/// `T` is the type parameter forwarded to `OrderBook<T>`; it must be
/// `Clone + Send + Sync + Default + 'static`.
pub trait BookManager<T>
where
T: Clone + Send + Sync + Default + 'static,
{
/// Registers an order book under `symbol`.
///
/// The trait does not define what happens when `symbol` is already
/// registered. The implementations in this file insert into a `HashMap`,
/// so a book previously stored under the same symbol is replaced.
fn add_book(&mut self, symbol: &str);
/// Returns a shared reference to the book for `symbol`, or `None` if there is none.
fn get_book(&self, symbol: &str) -> Option<&OrderBook<T>>;
/// Returns a mutable reference to the book for `symbol`, or `None` if there is none.
fn get_book_mut(&mut self, symbol: &str) -> Option<&mut OrderBook<T>>;
/// Returns the symbols of the managed books as owned strings.
///
/// No ordering is promised by the signature; the implementations in this
/// file return `HashMap` key order, which is unspecified.
fn symbols(&self) -> Vec<String>;
/// Removes the book for `symbol` and hands it back to the caller, or
/// returns `None` if there is none.
fn remove_book(&mut self, symbol: &str) -> Option<OrderBook<T>>;
/// Reports whether a book is registered under `symbol`.
fn has_book(&self, symbol: &str) -> bool;
/// Returns the number of managed books.
fn book_count(&self) -> usize;
}
/// [`BookManager`] implementation whose trade events travel over a
/// `std::sync::mpsc` channel.
///
/// Each book created by `add_book` is given a listener that sends a
/// `TradeEvent` into the channel; `start_trade_processor` spawns the thread
/// that drains it.
pub struct BookManagerStd<T>
where
T: Clone + Send + Sync + Default + 'static,
{
/// Managed order books, keyed by symbol.
books: HashMap<String, OrderBook<T>>,
/// Sending half of the trade-event channel; cloned into the listener of
/// every book created by `add_book`.
trade_sender: std::sync::mpsc::Sender<TradeEvent>,
/// Receiving half of the channel. `Some` until `start_trade_processor`
/// takes it, `None` afterwards.
trade_receiver: Option<std::sync::mpsc::Receiver<TradeEvent>>,
}
impl<T> BookManagerStd<T>
where
    T: Clone + Send + Sync + Default + 'static,
{
    /// Creates a manager with no books and a fresh trade-event channel.
    ///
    /// The receiving half of the channel is stored until
    /// [`start_trade_processor`](Self::start_trade_processor) claims it.
    pub fn new() -> Self {
        let (trade_sender, rx) = std::sync::mpsc::channel();
        Self {
            books: HashMap::new(),
            trade_sender,
            trade_receiver: Some(rx),
        }
    }

    /// Spawns the thread that consumes trade events.
    ///
    /// The thread blocks on the channel and logs every event it receives.
    /// It exits once the channel is disconnected, i.e. when every sender
    /// (the one held by this manager and the clones moved into listeners by
    /// `add_book`) has been dropped.
    ///
    /// # Errors
    ///
    /// Returns [`ManagerError::ProcessorAlreadyStarted`] when the receiver
    /// has already been taken by an earlier call.
    pub fn start_trade_processor(&mut self) -> Result<std::thread::JoinHandle<()>, ManagerError> {
        let events = match self.trade_receiver.take() {
            Some(rx) => rx,
            None => return Err(ManagerError::ProcessorAlreadyStarted),
        };

        let worker = std::thread::spawn(move || {
            info!("Trade processor started");
            // `iter` blocks like `recv` and ends when the channel disconnects.
            for trade_event in events.iter() {
                Self::process_trade_event(trade_event);
            }
            info!("Trade processor stopped");
        });
        Ok(worker)
    }

    /// Logs a trade event at `info` level: one summary line followed by one
    /// line per individual trade.
    ///
    /// The executed quantity is unwrapped with a default of 0, so a quantity
    /// that cannot be obtained is reported as 0.
    fn process_trade_event(event: TradeEvent) {
        let outcome = &event.trade_result.match_result;
        info!(
            "Processing trade for {}: {} trades, executed quantity: {}",
            event.symbol,
            outcome.trades().as_vec().len(),
            outcome.executed_quantity().unwrap_or(0)
        );
        outcome.trades().as_vec().iter().for_each(|trade| {
            info!(
                "  Trade: {} units at price {} (ID: {})",
                trade.quantity(),
                trade.price(),
                trade.trade_id()
            );
        });
    }
}
impl<T> BookManagerStd<T>
where
T: Clone + Send + Sync + Default + 'static,
{
#[must_use]
pub fn cancel_all_across_books(&self) -> HashMap<String, MassCancelResult> {
self.books
.iter()
.map(|(symbol, book)| (symbol.clone(), book.cancel_all_orders()))
.collect()
}
#[must_use]
pub fn cancel_by_user_across_books(
&self,
user_id: Hash32,
) -> HashMap<String, MassCancelResult> {
self.books
.iter()
.map(|(symbol, book)| (symbol.clone(), book.cancel_orders_by_user(user_id)))
.collect()
}
#[must_use]
pub fn cancel_by_side_across_books(&self, side: Side) -> HashMap<String, MassCancelResult> {
self.books
.iter()
.map(|(symbol, book)| (symbol.clone(), book.cancel_orders_by_side(side)))
.collect()
}
}
impl<T> BookManager<T> for BookManagerStd<T>
where
T: Clone + Send + Sync + Default + 'static,
{
fn add_book(&mut self, symbol: &str) {
let sender = self.trade_sender.clone();
let symbol_clone = symbol.to_string();
let trade_listener: TradeListener = Arc::new(move |trade_result: &TradeResult| {
let trade_event = TradeEvent {
symbol: trade_result.symbol.clone(),
trade_result: trade_result.clone(),
timestamp: crate::current_time_millis(),
};
if let Err(e) = sender.send(trade_event) {
error!("Failed to send trade event for {}: {}", symbol_clone, e);
}
});
let book = OrderBook::with_trade_listener(symbol, trade_listener);
self.books.insert(symbol.to_string(), book);
info!("Added order book for symbol: {}", symbol);
}
fn get_book(&self, symbol: &str) -> Option<&OrderBook<T>> {
self.books.get(symbol)
}
fn get_book_mut(&mut self, symbol: &str) -> Option<&mut OrderBook<T>> {
self.books.get_mut(symbol)
}
fn symbols(&self) -> Vec<String> {
self.books.keys().cloned().collect()
}
fn remove_book(&mut self, symbol: &str) -> Option<OrderBook<T>> {
let result = self.books.remove(symbol);
if result.is_some() {
info!("Removed order book for symbol: {}", symbol);
}
result
}
fn has_book(&self, symbol: &str) -> bool {
self.books.contains_key(symbol)
}
fn book_count(&self) -> usize {
self.books.len()
}
}
/// `Default` for [`BookManagerStd`] delegates to `BookManagerStd::new`.
impl<T> Default for BookManagerStd<T>
where
T: Clone + Send + Sync + Default + 'static,
{
/// Returns the same value as `Self::new()`.
fn default() -> Self {
Self::new()
}
}
/// [`BookManager`] implementation whose trade events travel over a Tokio
/// unbounded mpsc channel.
///
/// Each book created by `add_book` is given a listener that sends a
/// `TradeEvent` into the channel; `start_trade_processor` spawns the Tokio
/// task that drains it.
pub struct BookManagerTokio<T>
where
T: Clone + Send + Sync + Default + 'static,
{
/// Managed order books, keyed by symbol.
books: HashMap<String, OrderBook<T>>,
/// Sending half of the trade-event channel; cloned into the listener of
/// every book created by `add_book`.
trade_sender: tokio::sync::mpsc::UnboundedSender<TradeEvent>,
/// Receiving half of the channel. `Some` until `start_trade_processor`
/// takes it, `None` afterwards.
trade_receiver: Option<tokio::sync::mpsc::UnboundedReceiver<TradeEvent>>,
}
impl<T> BookManagerTokio<T>
where
    T: Clone + Send + Sync + Default + 'static,
{
    /// Creates a manager with no books and a fresh unbounded trade-event channel.
    ///
    /// The receiving half of the channel is stored until
    /// [`start_trade_processor`](Self::start_trade_processor) claims it.
    pub fn new() -> Self {
        let (trade_sender, rx) = tokio::sync::mpsc::unbounded_channel();
        Self {
            books: HashMap::new(),
            trade_sender,
            trade_receiver: Some(rx),
        }
    }

    /// Spawns the Tokio task that consumes trade events.
    ///
    /// The task awaits the channel and logs every event it receives. It
    /// finishes once `recv` yields `None`, which happens after every sender
    /// (the one held by this manager and the clones moved into listeners by
    /// `add_book`) has been dropped.
    ///
    /// # Errors
    ///
    /// Returns [`ManagerError::ProcessorAlreadyStarted`] when the receiver
    /// has already been taken by an earlier call.
    ///
    /// # Panics
    ///
    /// `tokio::spawn` panics when called outside a Tokio runtime, so this
    /// method must be invoked from within one.
    pub fn start_trade_processor(&mut self) -> Result<tokio::task::JoinHandle<()>, ManagerError> {
        let mut events = match self.trade_receiver.take() {
            Some(rx) => rx,
            None => return Err(ManagerError::ProcessorAlreadyStarted),
        };

        let task = tokio::spawn(async move {
            info!("Trade processor started (Tokio)");
            while let Some(trade_event) = events.recv().await {
                Self::process_trade_event(trade_event);
            }
            info!("Trade processor stopped (Tokio)");
        });
        Ok(task)
    }

    /// Logs a trade event at `info` level: one summary line followed by one
    /// line per individual trade.
    ///
    /// The executed quantity is unwrapped with a default of 0, so a quantity
    /// that cannot be obtained is reported as 0.
    fn process_trade_event(event: TradeEvent) {
        let outcome = &event.trade_result.match_result;
        info!(
            "Processing trade for {}: {} trades, executed quantity: {}",
            event.symbol,
            outcome.trades().as_vec().len(),
            outcome.executed_quantity().unwrap_or(0)
        );
        outcome.trades().as_vec().iter().for_each(|trade| {
            info!(
                "  Trade: {} units at price {} (ID: {})",
                trade.quantity(),
                trade.price(),
                trade.trade_id()
            );
        });
    }
}
impl<T> BookManagerTokio<T>
where
T: Clone + Send + Sync + Default + 'static,
{
#[must_use]
pub fn cancel_all_across_books(&self) -> HashMap<String, MassCancelResult> {
self.books
.iter()
.map(|(symbol, book)| (symbol.clone(), book.cancel_all_orders()))
.collect()
}
#[must_use]
pub fn cancel_by_user_across_books(
&self,
user_id: Hash32,
) -> HashMap<String, MassCancelResult> {
self.books
.iter()
.map(|(symbol, book)| (symbol.clone(), book.cancel_orders_by_user(user_id)))
.collect()
}
#[must_use]
pub fn cancel_by_side_across_books(&self, side: Side) -> HashMap<String, MassCancelResult> {
self.books
.iter()
.map(|(symbol, book)| (symbol.clone(), book.cancel_orders_by_side(side)))
.collect()
}
}
impl<T> BookManager<T> for BookManagerTokio<T>
where
T: Clone + Send + Sync + Default + 'static,
{
fn add_book(&mut self, symbol: &str) {
let sender = self.trade_sender.clone();
let symbol_clone = symbol.to_string();
let trade_listener: TradeListener = Arc::new(move |trade_result: &TradeResult| {
let trade_event = TradeEvent {
symbol: trade_result.symbol.clone(),
trade_result: trade_result.clone(),
timestamp: crate::current_time_millis(),
};
if let Err(e) = sender.send(trade_event) {
error!("Failed to send trade event for {}: {}", symbol_clone, e);
}
});
let book = OrderBook::with_trade_listener(symbol, trade_listener);
self.books.insert(symbol.to_string(), book);
info!("Added order book for symbol: {}", symbol);
}
fn get_book(&self, symbol: &str) -> Option<&OrderBook<T>> {
self.books.get(symbol)
}
fn get_book_mut(&mut self, symbol: &str) -> Option<&mut OrderBook<T>> {
self.books.get_mut(symbol)
}
fn symbols(&self) -> Vec<String> {
self.books.keys().cloned().collect()
}
fn remove_book(&mut self, symbol: &str) -> Option<OrderBook<T>> {
let result = self.books.remove(symbol);
if result.is_some() {
info!("Removed order book for symbol: {}", symbol);
}
result
}
fn has_book(&self, symbol: &str) -> bool {
self.books.contains_key(symbol)
}
fn book_count(&self) -> usize {
self.books.len()
}
}
/// `Default` for [`BookManagerTokio`] delegates to `BookManagerTokio::new`.
impl<T> Default for BookManagerTokio<T>
where
T: Clone + Send + Sync + Default + 'static,
{
/// Returns the same value as `Self::new()`.
fn default() -> Self {
Self::new()
}
}