mod api;
mod backing;
pub mod config;
pub mod core;
pub mod matching;
pub mod message;
pub mod mstr;
pub mod stubs;
pub mod switchboard;
pub mod typed_endpoints;
pub mod typed_handler;
pub mod typed_router;
pub(crate) mod external;
use std::{
any::Any,
cell::{Cell, RefCell},
rc::Rc,
};
use nautilus_core::UUID4;
#[cfg(feature = "defi")]
use nautilus_model::defi::{Block, Pool, PoolFeeCollect, PoolFlash, PoolLiquidityUpdate, PoolSwap};
use nautilus_model::{
data::{
Bar, FundingRateUpdate, GreeksData, IndexPriceUpdate, MarkPriceUpdate, OrderBookDeltas,
OrderBookDepth10, QuoteTick, TradeTick,
option_chain::{OptionChainSlice, OptionGreeks},
},
events::{AccountState, OrderEventAny, PortfolioSnapshot, PositionEvent},
instruments::InstrumentAny,
orderbook::OrderBook,
};
use smallvec::SmallVec;
#[cfg(feature = "live")]
pub use self::backing::{
MessageBusExternalIngress, MessageBusExternalReceiver, external_io_from_backing,
};
pub use self::{
api::*,
backing::{
MessageBusBacking, MessageBusBackingFactory, MessageBusExternalEgress,
external_egress_from_backing,
},
config::MessageBusConfig,
core::{MessageBus, Subscription},
message::{BusMessage, BusPayloadCategory, BusPayloadType},
mstr::{Endpoint, MStr, Pattern, Topic},
switchboard::MessagingSwitchboard,
typed_endpoints::{EndpointMap, IntoEndpointMap},
typed_handler::{
CallbackHandler, Handler, IntoHandler, ShareableMessageHandler, TypedHandler,
TypedIntoHandler,
},
typed_router::{TopicRouter, TypedSubscription},
};
use crate::timer::TimeEvent;
pub(super) const HANDLER_BUFFER_CAP: usize = 64;
thread_local! {
pub(super) static MESSAGE_BUS: RefCell<Option<Rc<RefCell<MessageBus>>>> = const { RefCell::new(None) };
pub(super) static HAS_EXTERNAL_EGRESS: Cell<bool> = const { Cell::new(false) };
pub(super) static SUPPRESS_EXTERNAL_DEPTH: Cell<u32> = const { Cell::new(0) };
pub(super) static ANY_HANDLERS: RefCell<SmallVec<[ShareableMessageHandler; HANDLER_BUFFER_CAP]>> =
RefCell::new(SmallVec::new());
pub(super) static DELTAS_HANDLERS: RefCell<SmallVec<[TypedHandler<OrderBookDeltas>; HANDLER_BUFFER_CAP]>> =
RefCell::new(SmallVec::new());
pub(super) static DEPTH10_HANDLERS: RefCell<SmallVec<[TypedHandler<OrderBookDepth10>; HANDLER_BUFFER_CAP]>> =
RefCell::new(SmallVec::new());
pub(super) static BOOK_HANDLERS: RefCell<SmallVec<[TypedHandler<OrderBook>; HANDLER_BUFFER_CAP]>> =
RefCell::new(SmallVec::new());
pub(super) static QUOTE_HANDLERS: RefCell<SmallVec<[TypedHandler<QuoteTick>; HANDLER_BUFFER_CAP]>> =
RefCell::new(SmallVec::new());
pub(super) static TRADE_HANDLERS: RefCell<SmallVec<[TypedHandler<TradeTick>; HANDLER_BUFFER_CAP]>> =
RefCell::new(SmallVec::new());
pub(super) static BAR_HANDLERS: RefCell<SmallVec<[TypedHandler<Bar>; HANDLER_BUFFER_CAP]>> =
RefCell::new(SmallVec::new());
pub(super) static MARK_PRICE_HANDLERS: RefCell<SmallVec<[TypedHandler<MarkPriceUpdate>; HANDLER_BUFFER_CAP]>> =
RefCell::new(SmallVec::new());
pub(super) static INDEX_PRICE_HANDLERS: RefCell<SmallVec<[TypedHandler<IndexPriceUpdate>; HANDLER_BUFFER_CAP]>> =
RefCell::new(SmallVec::new());
pub(super) static FUNDING_RATE_HANDLERS: RefCell<SmallVec<[TypedHandler<FundingRateUpdate>; HANDLER_BUFFER_CAP]>> =
RefCell::new(SmallVec::new());
pub(super) static GREEKS_HANDLERS: RefCell<SmallVec<[TypedHandler<GreeksData>; HANDLER_BUFFER_CAP]>> =
RefCell::new(SmallVec::new());
pub(super) static OPTION_GREEKS_HANDLERS: RefCell<SmallVec<[TypedHandler<OptionGreeks>; HANDLER_BUFFER_CAP]>> =
RefCell::new(SmallVec::new());
pub(super) static OPTION_CHAIN_HANDLERS: RefCell<SmallVec<[TypedHandler<OptionChainSlice>; HANDLER_BUFFER_CAP]>> =
RefCell::new(SmallVec::new());
pub(super) static ACCOUNT_STATE_HANDLERS: RefCell<SmallVec<[TypedHandler<AccountState>; HANDLER_BUFFER_CAP]>> =
RefCell::new(SmallVec::new());
pub(super) static PORTFOLIO_SNAPSHOT_HANDLERS: RefCell<SmallVec<[TypedHandler<PortfolioSnapshot>; HANDLER_BUFFER_CAP]>> =
RefCell::new(SmallVec::new());
pub(super) static ORDER_EVENT_HANDLERS: RefCell<SmallVec<[TypedHandler<OrderEventAny>; HANDLER_BUFFER_CAP]>> =
RefCell::new(SmallVec::new());
pub(super) static POSITION_EVENT_HANDLERS: RefCell<SmallVec<[TypedHandler<PositionEvent>; HANDLER_BUFFER_CAP]>> =
RefCell::new(SmallVec::new());
pub(super) static INSTRUMENT_HANDLERS: RefCell<SmallVec<[TypedHandler<InstrumentAny>; HANDLER_BUFFER_CAP]>> =
RefCell::new(SmallVec::new());
#[cfg(feature = "defi")]
pub(super) static DEFI_BLOCK_HANDLERS: RefCell<SmallVec<[TypedHandler<Block>; HANDLER_BUFFER_CAP]>> =
RefCell::new(SmallVec::new());
#[cfg(feature = "defi")]
pub(super) static DEFI_POOL_HANDLERS: RefCell<SmallVec<[TypedHandler<Pool>; HANDLER_BUFFER_CAP]>> =
RefCell::new(SmallVec::new());
#[cfg(feature = "defi")]
pub(super) static DEFI_SWAP_HANDLERS: RefCell<SmallVec<[TypedHandler<PoolSwap>; HANDLER_BUFFER_CAP]>> =
RefCell::new(SmallVec::new());
#[cfg(feature = "defi")]
pub(super) static DEFI_LIQUIDITY_HANDLERS: RefCell<SmallVec<[TypedHandler<PoolLiquidityUpdate>; HANDLER_BUFFER_CAP]>> =
RefCell::new(SmallVec::new());
#[cfg(feature = "defi")]
pub(super) static DEFI_COLLECT_HANDLERS: RefCell<SmallVec<[TypedHandler<PoolFeeCollect>; HANDLER_BUFFER_CAP]>> =
RefCell::new(SmallVec::new());
#[cfg(feature = "defi")]
pub(super) static DEFI_FLASH_HANDLERS: RefCell<SmallVec<[TypedHandler<PoolFlash>; HANDLER_BUFFER_CAP]>> =
RefCell::new(SmallVec::new());
}
#[derive(Debug)]
pub struct SuppressExternalGuard;
impl SuppressExternalGuard {
#[must_use]
pub fn new() -> Self {
SUPPRESS_EXTERNAL_DEPTH.with(|depth| depth.set(depth.get().saturating_add(1)));
Self
}
}
impl Default for SuppressExternalGuard {
fn default() -> Self {
Self::new()
}
}
impl Drop for SuppressExternalGuard {
fn drop(&mut self) {
SUPPRESS_EXTERNAL_DEPTH.with(|depth| depth.set(depth.get().saturating_sub(1)));
}
}
pub fn set_message_bus(msgbus: Rc<RefCell<MessageBus>>) {
HAS_EXTERNAL_EGRESS.with(|flag| flag.set(msgbus.borrow().has_external_egress()));
MESSAGE_BUS.with(|bus| {
*bus.borrow_mut() = Some(msgbus);
});
}
pub fn get_message_bus() -> Rc<RefCell<MessageBus>> {
MESSAGE_BUS.with(|bus| {
let mut slot = bus.borrow_mut();
let rc = slot.get_or_insert_with(|| Rc::new(RefCell::new(MessageBus::default())));
rc.clone()
})
}
pub fn try_get_message_bus() -> Option<Rc<RefCell<MessageBus>>> {
MESSAGE_BUS.with(|bus| bus.borrow().clone())
}
pub trait BusTap: 'static {
fn on_publish(&self, topic: MStr<Topic>, message: &dyn Any);
fn on_send(&self, endpoint: MStr<Endpoint>, message: &dyn Any);
fn on_response(&self, _correlation_id: &UUID4, _message: &dyn Any) {}
}
thread_local! {
pub(super) static BUS_TAP: RefCell<Option<Rc<dyn BusTap>>> = const { RefCell::new(None) };
}
pub fn set_bus_tap(tap: Rc<dyn BusTap>) {
BUS_TAP.with(|slot| {
*slot.borrow_mut() = Some(tap);
});
}
pub fn clear_bus_tap() {
BUS_TAP.with(|slot| {
*slot.borrow_mut() = None;
});
}
#[inline]
pub(super) fn dispatch_tap_publish(topic: MStr<Topic>, message: &dyn Any) {
let tap = BUS_TAP.with(|slot| slot.borrow().clone());
if let Some(tap) = tap {
tap.on_publish(topic, message);
}
}
#[inline]
pub(super) fn dispatch_tap_send(endpoint: MStr<Endpoint>, message: &dyn Any) {
let tap = BUS_TAP.with(|slot| slot.borrow().clone());
if let Some(tap) = tap {
tap.on_send(endpoint, message);
}
}
#[inline]
pub(super) fn dispatch_tap_response(correlation_id: &UUID4, message: &dyn Any) {
let tap = BUS_TAP.with(|slot| slot.borrow().clone());
if let Some(tap) = tap {
tap.on_response(correlation_id, message);
}
}
#[inline]
pub(crate) fn dispatch_tap_time_event(event: &TimeEvent) {
dispatch_tap_publish(MessagingSwitchboard::time_event_topic(), event);
}