use std::{cell::RefCell, fmt::Debug, sync::Arc};
use crate::{
messages::{data::DataCommand, execution::TradingCommand},
msgbus::{self, MessagingSwitchboard},
timer::TimeEventHandler,
};
pub trait DataCommandSender {
fn execute(&self, command: DataCommand);
}
#[derive(Debug)]
pub struct SyncDataCommandSender;
impl DataCommandSender for SyncDataCommandSender {
fn execute(&self, command: DataCommand) {
DATA_CMD_QUEUE.with(|q| q.borrow_mut().push(command));
}
}
pub fn drain_data_cmd_queue() {
DATA_CMD_QUEUE.with(|q| {
let commands: Vec<DataCommand> = q.borrow_mut().drain(..).collect();
let endpoint = MessagingSwitchboard::data_engine_execute();
for cmd in commands {
msgbus::send_data_command(endpoint, cmd);
}
});
}
pub fn data_cmd_queue_is_empty() -> bool {
DATA_CMD_QUEUE.with(|q| q.borrow().is_empty())
}
#[must_use]
pub fn get_data_cmd_sender() -> Arc<dyn DataCommandSender> {
DATA_CMD_SENDER.with(|sender| {
sender
.borrow()
.as_ref()
.expect("Data command sender should be initialized by runner")
.clone()
})
}
pub fn set_data_cmd_sender(sender: Arc<dyn DataCommandSender>) {
DATA_CMD_SENDER.with(|s| {
let mut slot = s.borrow_mut();
assert!(slot.is_none(), "Data command sender can only be set once");
*slot = Some(sender);
});
}
pub fn replace_data_cmd_sender(sender: Arc<dyn DataCommandSender>) {
DATA_CMD_SENDER.with(|s| {
*s.borrow_mut() = Some(sender);
});
}
pub trait TimeEventSender: Debug + Send + Sync {
fn send(&self, handler: TimeEventHandler);
}
#[must_use]
pub fn get_time_event_sender() -> Arc<dyn TimeEventSender> {
TIME_EVENT_SENDER.with(|sender| {
sender
.borrow()
.as_ref()
.expect("Time event sender should be initialized by runner")
.clone()
})
}
#[must_use]
pub fn try_get_time_event_sender() -> Option<Arc<dyn TimeEventSender>> {
TIME_EVENT_SENDER.with(|sender| sender.borrow().as_ref().cloned())
}
pub fn set_time_event_sender(sender: Arc<dyn TimeEventSender>) {
TIME_EVENT_SENDER.with(|s| {
let mut slot = s.borrow_mut();
assert!(slot.is_none(), "Time event sender can only be set once");
*slot = Some(sender);
});
}
pub fn replace_time_event_sender(sender: Arc<dyn TimeEventSender>) {
TIME_EVENT_SENDER.with(|s| {
*s.borrow_mut() = Some(sender);
});
}
pub trait TradingCommandSender {
fn execute(&self, command: TradingCommand);
}
#[derive(Debug)]
pub struct SyncTradingCommandSender;
impl TradingCommandSender for SyncTradingCommandSender {
fn execute(&self, command: TradingCommand) {
TRADING_CMD_QUEUE.with(|q| q.borrow_mut().push(command));
}
}
pub fn drain_trading_cmd_queue() {
TRADING_CMD_QUEUE.with(|q| {
let commands: Vec<TradingCommand> = q.borrow_mut().drain(..).collect();
let endpoint = MessagingSwitchboard::exec_engine_execute();
for cmd in commands {
msgbus::send_trading_command(endpoint, cmd);
}
});
}
pub fn trading_cmd_queue_is_empty() -> bool {
TRADING_CMD_QUEUE.with(|q| q.borrow().is_empty())
}
#[must_use]
pub fn get_trading_cmd_sender() -> Arc<dyn TradingCommandSender> {
EXEC_CMD_SENDER.with(|sender| {
sender
.borrow()
.as_ref()
.expect("Trading command sender should be initialized by runner")
.clone()
})
}
#[must_use]
pub fn try_get_trading_cmd_sender() -> Option<Arc<dyn TradingCommandSender>> {
EXEC_CMD_SENDER.with(|sender| sender.borrow().as_ref().cloned())
}
pub fn set_exec_cmd_sender(sender: Arc<dyn TradingCommandSender>) {
EXEC_CMD_SENDER.with(|s| {
let mut slot = s.borrow_mut();
assert!(
slot.is_none(),
"Trading command sender can only be set once"
);
*slot = Some(sender);
});
}
pub fn replace_exec_cmd_sender(sender: Arc<dyn TradingCommandSender>) {
EXEC_CMD_SENDER.with(|s| {
*s.borrow_mut() = Some(sender);
});
}
thread_local! {
static TIME_EVENT_SENDER: RefCell<Option<Arc<dyn TimeEventSender>>> = const { RefCell::new(None) };
static DATA_CMD_SENDER: RefCell<Option<Arc<dyn DataCommandSender>>> = const { RefCell::new(None) };
static EXEC_CMD_SENDER: RefCell<Option<Arc<dyn TradingCommandSender>>> = const { RefCell::new(None) };
static DATA_CMD_QUEUE: RefCell<Vec<DataCommand>> = const { RefCell::new(Vec::new()) };
static TRADING_CMD_QUEUE: RefCell<Vec<TradingCommand>> = const { RefCell::new(Vec::new()) };
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use rstest::rstest;
use super::*;
#[derive(Debug)]
struct NoopTimeEventSender;
impl TimeEventSender for NoopTimeEventSender {
fn send(&self, _handler: TimeEventHandler) {}
}
#[rstest]
fn test_replace_data_cmd_sender_overwrites_previous() {
std::thread::spawn(|| {
replace_data_cmd_sender(Arc::new(SyncDataCommandSender));
replace_data_cmd_sender(Arc::new(SyncDataCommandSender));
let _sender = get_data_cmd_sender();
})
.join()
.unwrap();
}
#[rstest]
fn test_replace_exec_cmd_sender_overwrites_previous() {
std::thread::spawn(|| {
replace_exec_cmd_sender(Arc::new(SyncTradingCommandSender));
replace_exec_cmd_sender(Arc::new(SyncTradingCommandSender));
let _sender = get_trading_cmd_sender();
})
.join()
.unwrap();
}
#[rstest]
fn test_replace_time_event_sender_overwrites_previous() {
std::thread::spawn(|| {
replace_time_event_sender(Arc::new(NoopTimeEventSender));
replace_time_event_sender(Arc::new(NoopTimeEventSender));
let _sender = get_time_event_sender();
})
.join()
.unwrap();
}
#[rstest]
fn test_set_data_cmd_sender_panics_on_double_set() {
let result = std::thread::spawn(|| {
set_data_cmd_sender(Arc::new(SyncDataCommandSender));
set_data_cmd_sender(Arc::new(SyncDataCommandSender));
})
.join();
assert!(result.is_err());
}
#[rstest]
fn test_set_exec_cmd_sender_panics_on_double_set() {
let result = std::thread::spawn(|| {
set_exec_cmd_sender(Arc::new(SyncTradingCommandSender));
set_exec_cmd_sender(Arc::new(SyncTradingCommandSender));
})
.join();
assert!(result.is_err());
}
#[rstest]
fn test_set_time_event_sender_panics_on_double_set() {
let result = std::thread::spawn(|| {
set_time_event_sender(Arc::new(NoopTimeEventSender));
set_time_event_sender(Arc::new(NoopTimeEventSender));
})
.join();
assert!(result.is_err());
}
#[rstest]
fn test_try_get_time_event_sender_returns_none_when_unset() {
let result = std::thread::spawn(try_get_time_event_sender)
.join()
.unwrap();
assert!(result.is_none());
}
#[rstest]
fn test_try_get_trading_cmd_sender_returns_none_when_unset() {
let is_none = std::thread::spawn(|| try_get_trading_cmd_sender().is_none())
.join()
.unwrap();
assert!(is_none);
}
}