use std::{num::NonZeroUsize, sync::OnceLock};
use ahash::AHashMap;
use nautilus_model::{
data::{BarType, DataType},
identifiers::{ClientOrderId, InstrumentId, OptionSeriesId, PositionId, StrategyId, Venue},
};
use super::mstr::{Endpoint, MStr, Pattern, Topic};
use crate::msgbus::get_message_bus;
/// Topic name literal `"CLOSE"`.
// NOTE(review): no use of this constant is visible in this file — confirm its consumers.
pub const CLOSE_TOPIC: &str = "CLOSE";
// Lazily initialized endpoint/topic names. Each cell is filled on first access by the
// matching `MessagingSwitchboard` associated function (e.g. `data_engine_execute`).
//
// Data engine endpoints.
static DATA_QUEUE_COMMAND_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
static DATA_EXECUTE_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
static DATA_PROCESS_ANY_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
static DATA_PROCESS_DATA_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
// Only compiled when the `defi` feature is enabled.
#[cfg(feature = "defi")]
static DATA_PROCESS_DEFI_DATA_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
static DATA_RESPONSE_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
// Execution engine endpoints.
static EXEC_QUEUE_COMMAND_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
static EXEC_EXECUTE_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
static EXEC_PROCESS_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
static EXEC_RECONCILE_REPORT_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
// Risk engine endpoints.
static RISK_EXECUTE_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
static RISK_QUEUE_EXECUTE_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
static RISK_PROCESS_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
// Order emulator and portfolio endpoints.
static ORDER_EMULATOR_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
static PORTFOLIO_ACCOUNT_ENDPOINT: OnceLock<MStr<Endpoint>> = OnceLock::new();
// System-wide shutdown command topic (a `Topic`, not an `Endpoint`).
static SHUTDOWN_SYSTEM_TOPIC: OnceLock<MStr<Topic>> = OnceLock::new();
// Generates the `MessagingSwitchboard` struct, its `Default` impl and its accessor methods.
//
// Each macro entry has the shape
//
//     field: KeyType,
//     method(args...) -> key_expr,
//     "format string", format_args...;
//
// and expands to one cache field (`AHashMap<KeyType, MStr<Topic>>`) plus one `&mut self`
// accessor that returns the cached topic for `key_expr`, formatting and caching it on
// first use.
macro_rules! define_switchboard {
    ($(
        $field:ident: $key_ty:ty,
        $method:ident($($arg_name:ident: $arg_ty:ty),*) -> $key_expr:expr,
        $val_fmt:expr,
        $($val_args:expr),*
    );* $(;)?) => {
        /// Caches message bus topic and pattern strings so each is formatted only once per key.
        #[derive(Clone, Debug)]
        pub struct MessagingSwitchboard {
            // One topic cache per macro entry.
            $(
                $field: AHashMap<$key_ty, MStr<Topic>>,
            )*
            /// Cached `data.instrument.{venue}.*` patterns keyed by venue.
            instruments_patterns: AHashMap<Venue, MStr<Pattern>>,
            /// Cached signal topics keyed by the signal name as passed by the caller.
            signal_topics: AHashMap<String, MStr<Topic>>,
            /// Cached signal patterns keyed by the signal name as passed by the caller.
            signal_patterns: AHashMap<String, MStr<Pattern>>,
            /// DeFi switchboard state, present only with the `defi` feature.
            #[cfg(feature = "defi")]
            pub(crate) defi: crate::defi::switchboard::DefiSwitchboard,
        }

        impl Default for MessagingSwitchboard {
            /// Creates a switchboard with all caches empty.
            fn default() -> Self {
                Self {
                    $(
                        $field: AHashMap::new(),
                    )*
                    instruments_patterns: AHashMap::new(),
                    signal_topics: AHashMap::new(),
                    signal_patterns: AHashMap::new(),
                    #[cfg(feature = "defi")]
                    defi: crate::defi::switchboard::DefiSwitchboard::default(),
                }
            }
        }

        impl MessagingSwitchboard {
            /// Returns the `DataEngine.queue_execute` endpoint name.
            #[inline]
            #[must_use]
            pub fn data_engine_queue_execute() -> MStr<Endpoint> {
                *DATA_QUEUE_COMMAND_ENDPOINT.get_or_init(|| "DataEngine.queue_execute".into())
            }

            /// Returns the `DataEngine.execute` endpoint name.
            #[inline]
            #[must_use]
            pub fn data_engine_execute() -> MStr<Endpoint> {
                *DATA_EXECUTE_ENDPOINT.get_or_init(|| "DataEngine.execute".into())
            }

            /// Returns the `DataEngine.process` endpoint name.
            #[inline]
            #[must_use]
            pub fn data_engine_process() -> MStr<Endpoint> {
                *DATA_PROCESS_ANY_ENDPOINT.get_or_init(|| "DataEngine.process".into())
            }

            /// Returns the `DataEngine.process_data` endpoint name.
            #[inline]
            #[must_use]
            pub fn data_engine_process_data() -> MStr<Endpoint> {
                *DATA_PROCESS_DATA_ENDPOINT.get_or_init(|| "DataEngine.process_data".into())
            }

            /// Returns the `DataEngine.process_defi_data` endpoint name (`defi` feature only).
            #[cfg(feature = "defi")]
            #[inline]
            #[must_use]
            pub fn data_engine_process_defi_data() -> MStr<Endpoint> {
                *DATA_PROCESS_DEFI_DATA_ENDPOINT
                    .get_or_init(|| "DataEngine.process_defi_data".into())
            }

            /// Returns the `DataEngine.response` endpoint name.
            #[inline]
            #[must_use]
            pub fn data_engine_response() -> MStr<Endpoint> {
                *DATA_RESPONSE_ENDPOINT.get_or_init(|| "DataEngine.response".into())
            }

            /// Returns the `ExecEngine.execute` endpoint name.
            #[inline]
            #[must_use]
            pub fn exec_engine_execute() -> MStr<Endpoint> {
                *EXEC_EXECUTE_ENDPOINT.get_or_init(|| "ExecEngine.execute".into())
            }

            /// Returns the `ExecEngine.queue_execute` endpoint name.
            #[inline]
            #[must_use]
            pub fn exec_engine_queue_execute() -> MStr<Endpoint> {
                *EXEC_QUEUE_COMMAND_ENDPOINT.get_or_init(|| "ExecEngine.queue_execute".into())
            }

            /// Returns the `ExecEngine.process` endpoint name.
            #[inline]
            #[must_use]
            pub fn exec_engine_process() -> MStr<Endpoint> {
                *EXEC_PROCESS_ENDPOINT.get_or_init(|| "ExecEngine.process".into())
            }

            /// Returns the `ExecEngine.reconcile_execution_report` endpoint name.
            #[inline]
            #[must_use]
            pub fn exec_engine_reconcile_execution_report() -> MStr<Endpoint> {
                *EXEC_RECONCILE_REPORT_ENDPOINT
                    .get_or_init(|| "ExecEngine.reconcile_execution_report".into())
            }

            /// Returns the `RiskEngine.execute` endpoint name.
            #[inline]
            #[must_use]
            pub fn risk_engine_execute() -> MStr<Endpoint> {
                *RISK_EXECUTE_ENDPOINT.get_or_init(|| "RiskEngine.execute".into())
            }

            /// Returns the `RiskEngine.queue_execute` endpoint name.
            #[inline]
            #[must_use]
            pub fn risk_engine_queue_execute() -> MStr<Endpoint> {
                *RISK_QUEUE_EXECUTE_ENDPOINT.get_or_init(|| "RiskEngine.queue_execute".into())
            }

            /// Returns the `RiskEngine.process` endpoint name.
            #[inline]
            #[must_use]
            pub fn risk_engine_process() -> MStr<Endpoint> {
                *RISK_PROCESS_ENDPOINT.get_or_init(|| "RiskEngine.process".into())
            }

            /// Returns the `OrderEmulator.execute` endpoint name.
            #[inline]
            #[must_use]
            pub fn order_emulator_execute() -> MStr<Endpoint> {
                *ORDER_EMULATOR_ENDPOINT.get_or_init(|| "OrderEmulator.execute".into())
            }

            /// Returns the `Portfolio.update_account` endpoint name.
            #[inline]
            #[must_use]
            pub fn portfolio_update_account() -> MStr<Endpoint> {
                *PORTFOLIO_ACCOUNT_ENDPOINT.get_or_init(|| "Portfolio.update_account".into())
            }

            /// Returns the `commands.system.shutdown` topic.
            #[inline]
            #[must_use]
            pub fn shutdown_system_topic() -> MStr<Topic> {
                *SHUTDOWN_SYSTEM_TOPIC.get_or_init(|| "commands.system.shutdown".into())
            }

            /// Returns the cached `data.instrument.{venue}.*` pattern for `venue`,
            /// creating and caching it on first use.
            #[must_use]
            pub fn instruments_pattern(&mut self, venue: Venue) -> MStr<Pattern> {
                *self
                    .instruments_patterns
                    .entry(venue)
                    .or_insert_with(|| format!("data.instrument.{venue}.*").into())
            }

            /// Returns the cached `data.Signal{TitleCasedName}` topic for `name`,
            /// creating and caching it on first use.
            ///
            /// The cache is keyed by `name` exactly as passed in (before title-casing).
            #[must_use]
            pub fn signal_topic(&mut self, name: &str) -> MStr<Topic> {
                // Borrowed lookup first: a cache hit must not allocate a `String` key.
                if let Some(topic) = self.signal_topics.get(name) {
                    return *topic;
                }

                let topic: MStr<Topic> = format!(
                    "data.Signal{}",
                    nautilus_core::string::conversions::title_case(name)
                )
                .into();
                self.signal_topics.insert(name.to_string(), topic);
                topic
            }

            /// Returns the cached `data.Signal{TitleCasedName}*` pattern for `name`,
            /// creating and caching it on first use.
            ///
            /// The cache is keyed by `name` exactly as passed in (before title-casing).
            #[must_use]
            pub fn signal_pattern(&mut self, name: &str) -> MStr<Pattern> {
                // Borrowed lookup first: a cache hit must not allocate a `String` key.
                if let Some(pattern) = self.signal_patterns.get(name) {
                    return *pattern;
                }

                let pattern: MStr<Pattern> = format!(
                    "data.Signal{}*",
                    nautilus_core::string::conversions::title_case(name)
                )
                .into();
                self.signal_patterns.insert(name.to_string(), pattern);
                pattern
            }

            $(
                /// Returns the cached topic for the given key, formatting and caching it
                /// on first use.
                #[must_use]
                pub fn $method(&mut self, $($arg_name: $arg_ty),*) -> MStr<Topic> {
                    let key = $key_expr;
                    *self.$field
                        .entry(key)
                        .or_insert_with(|| format!($val_fmt, $($val_args),*).into())
                }
            )*
        }
    };
}
// Topic table for `MessagingSwitchboard`. Each entry is three lines:
//   1. cache field name and its key type,
//   2. accessor method signature and the expression used as the cache key,
//   3. topic format string followed by its arguments.
define_switchboard! {
// Custom data, keyed by a clone of the borrowed `DataType`.
custom_topics: DataType,
get_custom_topic(data_type: &DataType) -> data_type.clone(),
"data.{}", data_type.topic();
// Instrument definitions: per venue and per instrument.
instruments_topics: Venue,
get_instruments_topic(venue: Venue) -> venue,
"data.instrument.{}", venue;
instrument_topics: InstrumentId,
get_instrument_topic(instrument_id: InstrumentId) -> instrument_id,
"data.instrument.{}.{}", instrument_id.venue, instrument_id.symbol;
// Order book data.
book_deltas_topics: InstrumentId,
get_book_deltas_topic(instrument_id: InstrumentId) -> instrument_id,
"data.book.deltas.{}.{}", instrument_id.venue, instrument_id.symbol;
book_depth10_topics: InstrumentId,
get_book_depth10_topic(instrument_id: InstrumentId) -> instrument_id,
"data.book.depth10.{}.{}", instrument_id.venue, instrument_id.symbol;
// Snapshots are keyed by instrument AND interval, so each interval gets its own topic.
book_snapshots_topics: (InstrumentId, NonZeroUsize),
get_book_snapshots_topic(instrument_id: InstrumentId, interval_ms: NonZeroUsize) -> (instrument_id, interval_ms),
"data.book.snapshots.{}.{}.{}", instrument_id.venue, instrument_id.symbol, interval_ms;
// Quotes, trades and bars.
quote_topics: InstrumentId,
get_quotes_topic(instrument_id: InstrumentId) -> instrument_id,
"data.quotes.{}.{}", instrument_id.venue, instrument_id.symbol;
trade_topics: InstrumentId,
get_trades_topic(instrument_id: InstrumentId) -> instrument_id,
"data.trades.{}.{}", instrument_id.venue, instrument_id.symbol;
bar_topics: BarType,
get_bars_topic(bar_type: BarType) -> bar_type,
"data.bars.{}", bar_type;
// Derived prices and funding.
mark_price_topics: InstrumentId,
get_mark_price_topic(instrument_id: InstrumentId) -> instrument_id,
"data.mark_prices.{}.{}", instrument_id.venue, instrument_id.symbol;
index_price_topics: InstrumentId,
get_index_price_topic(instrument_id: InstrumentId) -> instrument_id,
"data.index_prices.{}.{}", instrument_id.venue, instrument_id.symbol;
funding_rate_topics: InstrumentId,
get_funding_rate_topic(instrument_id: InstrumentId) -> instrument_id,
"data.funding_rates.{}.{}", instrument_id.venue, instrument_id.symbol;
// Instrument status and close.
instrument_status_topics: InstrumentId,
get_instrument_status_topic(instrument_id: InstrumentId) -> instrument_id,
"data.status.{}.{}", instrument_id.venue, instrument_id.symbol;
instrument_close_topics: InstrumentId,
get_instrument_close_topic(instrument_id: InstrumentId) -> instrument_id,
"data.close.{}.{}", instrument_id.venue, instrument_id.symbol;
// Options data.
option_greeks_topics: InstrumentId,
get_option_greeks_topic(instrument_id: InstrumentId) -> instrument_id,
"data.option_greeks.{}.{}", instrument_id.venue, instrument_id.symbol;
option_chain_topics: OptionSeriesId,
get_option_chain_topic(series_id: OptionSeriesId) -> series_id,
"data.option_chain.{}", series_id;
// Order fill/cancel events; these format the whole `InstrumentId` rather than venue and symbol separately.
order_fills_topics: InstrumentId,
get_order_fills_topic(instrument_id: InstrumentId) -> instrument_id,
"events.fills.{}", instrument_id;
order_cancels_topics: InstrumentId,
get_order_cancels_topic(instrument_id: InstrumentId) -> instrument_id,
"events.cancels.{}", instrument_id;
// Snapshots. Note the prefixes differ in number: `order.` (singular) vs `positions.` (plural).
order_snapshots_topics: ClientOrderId,
get_order_snapshots_topic(client_order_id: ClientOrderId) -> client_order_id,
"order.snapshots.{}", client_order_id;
positions_snapshots_topics: PositionId,
get_positions_snapshots_topic(position_id: PositionId) -> position_id,
"positions.snapshots.{}", position_id;
// Per-strategy order and position events.
event_orders_topics: StrategyId,
get_event_orders_topic(strategy_id: StrategyId) -> strategy_id,
"events.order.{}", strategy_id;
event_positions_topics: StrategyId,
get_event_positions_topic(strategy_id: StrategyId) -> strategy_id,
"events.position.{}", strategy_id;
}
// Generates free functions that forward to the same-named `MessagingSwitchboard` method
// on the `switchboard` of the message bus returned by `get_message_bus()`.
//
// NOTE(review): `borrow_mut()` suggests the bus sits behind a `RefCell`; if so, each
// generated function panics when the bus is already borrowed — confirm against callers.
macro_rules! define_wrappers {
    ($($method:ident($($arg_name:ident: $arg_ty:ty),*) -> $ret:ty),* $(,)?) => {
        $(
            /// Forwards to the same-named method on the message bus switchboard.
            #[must_use]
            pub fn $method($($arg_name: $arg_ty),*) -> $ret {
                // Bind the handle and the guard separately so the guard is released
                // before the handle when the function returns.
                let msgbus = get_message_bus();
                let mut msgbus_ref = msgbus.borrow_mut();
                msgbus_ref.switchboard.$method($($arg_name),*)
            }
        )*
    }
}
// Free-function wrappers, one per topic accessor on `MessagingSwitchboard`.
// Each signature here must match the accessor of the same name generated by
// `define_switchboard!`, since the wrapper forwards its arguments unchanged.
define_wrappers! {
get_custom_topic(data_type: &DataType) -> MStr<Topic>,
get_instruments_topic(venue: Venue) -> MStr<Topic>,
get_instrument_topic(instrument_id: InstrumentId) -> MStr<Topic>,
get_book_deltas_topic(instrument_id: InstrumentId) -> MStr<Topic>,
get_book_depth10_topic(instrument_id: InstrumentId) -> MStr<Topic>,
get_book_snapshots_topic(instrument_id: InstrumentId, interval_ms: NonZeroUsize) -> MStr<Topic>,
get_quotes_topic(instrument_id: InstrumentId) -> MStr<Topic>,
get_trades_topic(instrument_id: InstrumentId) -> MStr<Topic>,
get_bars_topic(bar_type: BarType) -> MStr<Topic>,
get_mark_price_topic(instrument_id: InstrumentId) -> MStr<Topic>,
get_index_price_topic(instrument_id: InstrumentId) -> MStr<Topic>,
get_funding_rate_topic(instrument_id: InstrumentId) -> MStr<Topic>,
get_instrument_status_topic(instrument_id: InstrumentId) -> MStr<Topic>,
get_instrument_close_topic(instrument_id: InstrumentId) -> MStr<Topic>,
get_option_greeks_topic(instrument_id: InstrumentId) -> MStr<Topic>,
get_option_chain_topic(series_id: OptionSeriesId) -> MStr<Topic>,
get_order_fills_topic(instrument_id: InstrumentId) -> MStr<Topic>,
get_order_cancels_topic(instrument_id: InstrumentId) -> MStr<Topic>,
get_order_snapshots_topic(client_order_id: ClientOrderId) -> MStr<Topic>,
get_positions_snapshots_topic(position_id: PositionId) -> MStr<Topic>,
get_event_orders_topic(strategy_id: StrategyId) -> MStr<Topic>,
get_event_positions_topic(strategy_id: StrategyId) -> MStr<Topic>,
}
/// Returns the instruments pattern for `venue` from the message bus switchboard.
///
/// Forwards to `MessagingSwitchboard::instruments_pattern` on the bus returned by
/// `get_message_bus()`.
#[must_use]
pub fn get_instruments_pattern(venue: Venue) -> MStr<Pattern> {
    // Keep the guard in its own binding so it is released before the bus handle.
    let msgbus = get_message_bus();
    let mut msgbus_ref = msgbus.borrow_mut();
    msgbus_ref.switchboard.instruments_pattern(venue)
}
/// Returns the signal topic for `name` from the message bus switchboard.
///
/// Forwards to `MessagingSwitchboard::signal_topic` on the bus returned by
/// `get_message_bus()`.
#[must_use]
pub fn get_signal_topic(name: &str) -> MStr<Topic> {
    // Keep the guard in its own binding so it is released before the bus handle.
    let msgbus = get_message_bus();
    let mut msgbus_ref = msgbus.borrow_mut();
    msgbus_ref.switchboard.signal_topic(name)
}
/// Returns the signal pattern for `name` from the message bus switchboard.
///
/// Forwards to `MessagingSwitchboard::signal_pattern` on the bus returned by
/// `get_message_bus()`.
#[must_use]
pub fn get_signal_pattern(name: &str) -> MStr<Pattern> {
    // Keep the guard in its own binding so it is released before the bus handle.
    let msgbus = get_message_bus();
    let mut msgbus_ref = msgbus.borrow_mut();
    msgbus_ref.switchboard.signal_pattern(name)
}
#[cfg(test)]
mod tests {
    use nautilus_model::{
        data::{BarType, DataType},
        identifiers::{InstrumentId, Venue},
    };
    use rstest::*;

    use super::*;
    use crate::msgbus::matching::is_matching_backtracking;

    /// A fresh switchboard with every cache empty.
    #[fixture]
    fn switchboard() -> MessagingSwitchboard {
        MessagingSwitchboard::default()
    }

    /// The instrument used by most tests: symbol `ESZ24` on venue `XCME`.
    #[fixture]
    fn instrument_id() -> InstrumentId {
        InstrumentId::from("ESZ24.XCME")
    }

    #[rstest]
    fn test_get_custom_topic(mut switchboard: MessagingSwitchboard) {
        let data_type = DataType::new("ExampleDataType", None, None);

        let topic = switchboard.get_custom_topic(&data_type);

        let expected: MStr<Topic> = "data.ExampleDataType".into();
        assert_eq!(topic, expected);
        assert!(switchboard.custom_topics.contains_key(&data_type));
    }

    #[rstest]
    fn test_get_instrument_topic(
        mut switchboard: MessagingSwitchboard,
        instrument_id: InstrumentId,
    ) {
        let topic = switchboard.get_instrument_topic(instrument_id);

        let expected: MStr<Topic> = "data.instrument.XCME.ESZ24".into();
        assert_eq!(topic, expected);
        assert!(switchboard.instrument_topics.contains_key(&instrument_id));
    }

    #[rstest]
    fn test_get_book_deltas_topic(
        mut switchboard: MessagingSwitchboard,
        instrument_id: InstrumentId,
    ) {
        let topic = switchboard.get_book_deltas_topic(instrument_id);

        let expected: MStr<Topic> = "data.book.deltas.XCME.ESZ24".into();
        assert_eq!(topic, expected);
        assert!(switchboard.book_deltas_topics.contains_key(&instrument_id));
    }

    #[rstest]
    fn test_get_book_depth10_topic(
        mut switchboard: MessagingSwitchboard,
        instrument_id: InstrumentId,
    ) {
        let topic = switchboard.get_book_depth10_topic(instrument_id);

        let expected: MStr<Topic> = "data.book.depth10.XCME.ESZ24".into();
        assert_eq!(topic, expected);
        assert!(switchboard.book_depth10_topics.contains_key(&instrument_id));
    }

    #[rstest]
    fn test_get_book_snapshots_topic(
        mut switchboard: MessagingSwitchboard,
        instrument_id: InstrumentId,
    ) {
        let interval_ms = NonZeroUsize::new(1000).unwrap();

        let topic = switchboard.get_book_snapshots_topic(instrument_id, interval_ms);

        let expected: MStr<Topic> = "data.book.snapshots.XCME.ESZ24.1000".into();
        assert_eq!(topic, expected);

        // The cache key is the (instrument, interval) pair.
        let cache_key = (instrument_id, interval_ms);
        assert!(switchboard.book_snapshots_topics.contains_key(&cache_key));
    }

    #[rstest]
    fn test_get_quotes_topic(mut switchboard: MessagingSwitchboard, instrument_id: InstrumentId) {
        let topic = switchboard.get_quotes_topic(instrument_id);

        let expected: MStr<Topic> = "data.quotes.XCME.ESZ24".into();
        assert_eq!(topic, expected);
        assert!(switchboard.quote_topics.contains_key(&instrument_id));
    }

    #[rstest]
    fn test_get_trades_topic(mut switchboard: MessagingSwitchboard, instrument_id: InstrumentId) {
        let topic = switchboard.get_trades_topic(instrument_id);

        let expected: MStr<Topic> = "data.trades.XCME.ESZ24".into();
        assert_eq!(topic, expected);
        assert!(switchboard.trade_topics.contains_key(&instrument_id));
    }

    #[rstest]
    fn test_get_bars_topic(mut switchboard: MessagingSwitchboard) {
        let bar_type = BarType::from("ESZ24.XCME-1-MINUTE-LAST-INTERNAL");

        let topic = switchboard.get_bars_topic(bar_type);

        let expected: MStr<Topic> = format!("data.bars.{bar_type}").into();
        assert_eq!(topic, expected);
        assert!(switchboard.bar_topics.contains_key(&bar_type));
    }

    #[rstest]
    fn test_get_order_snapshots_topic(mut switchboard: MessagingSwitchboard) {
        let client_order_id = ClientOrderId::from("O-123456789");

        let topic = switchboard.get_order_snapshots_topic(client_order_id);

        let expected: MStr<Topic> = format!("order.snapshots.{client_order_id}").into();
        assert_eq!(topic, expected);
        assert!(
            switchboard
                .order_snapshots_topics
                .contains_key(&client_order_id)
        );
    }

    #[rstest]
    fn test_instruments_pattern_matches_instrument_topic(
        mut switchboard: MessagingSwitchboard,
        instrument_id: InstrumentId,
    ) {
        let pattern = switchboard.instruments_pattern(instrument_id.venue);
        let topic = switchboard.get_instrument_topic(instrument_id);

        assert_eq!(pattern.as_ref(), "data.instrument.XCME.*");
        assert!(is_matching_backtracking(topic, pattern));
    }

    #[rstest]
    fn test_instruments_pattern_does_not_match_other_venue(mut switchboard: MessagingSwitchboard) {
        // Pattern and topic deliberately use different venues.
        let binance_pattern = switchboard.instruments_pattern(Venue::from("BINANCE"));
        let xcme_topic = switchboard.get_instrument_topic(InstrumentId::from("ESZ24.XCME"));

        assert!(!is_matching_backtracking(xcme_topic, binance_pattern));
    }
}