use std::{any::Any, cell::RefCell, thread::LocalKey};
use nautilus_core::UUID4;
#[cfg(feature = "defi")]
use nautilus_model::defi::{
Block, DefiData, Pool, PoolFeeCollect, PoolFlash, PoolLiquidityUpdate, PoolSwap,
};
use nautilus_model::{
data::{
Bar, Data, FundingRateUpdate, GreeksData, IndexPriceUpdate, MarkPriceUpdate,
OrderBookDeltas, OrderBookDepth10, QuoteTick, TradeTick,
option_chain::{OptionChainSlice, OptionGreeks},
},
events::{AccountState, OrderEventAny, PositionEvent},
orderbook::OrderBook,
orders::OrderAny,
position::Position,
};
use smallvec::SmallVec;
use ustr::Ustr;
use super::{
ACCOUNT_STATE_HANDLERS, ANY_HANDLERS, BAR_HANDLERS, BOOK_HANDLERS, DELTAS_HANDLERS,
DEPTH10_HANDLERS, FUNDING_RATE_HANDLERS, GREEKS_HANDLERS, HANDLER_BUFFER_CAP,
INDEX_PRICE_HANDLERS, MARK_PRICE_HANDLERS, OPTION_CHAIN_HANDLERS, OPTION_GREEKS_HANDLERS,
ORDER_EVENT_HANDLERS, POSITION_EVENT_HANDLERS, QUOTE_HANDLERS, TRADE_HANDLERS,
core::{MessageBus, Subscription},
get_message_bus,
matching::is_matching_backtracking,
mstr::{Endpoint, MStr, Pattern, Topic},
typed_handler::{ShareableMessageHandler, TypedHandler, TypedIntoHandler},
};
#[cfg(feature = "defi")]
use super::{
DEFI_BLOCK_HANDLERS, DEFI_COLLECT_HANDLERS, DEFI_FLASH_HANDLERS, DEFI_LIQUIDITY_HANDLERS,
DEFI_POOL_HANDLERS, DEFI_SWAP_HANDLERS,
};
use crate::messages::{
data::{DataCommand, DataResponse},
execution::{ExecutionReport, TradingCommand},
};
/// Registers `handler` under `endpoint` in the message bus's untyped `endpoints` map.
///
/// Emits a debug log line with the endpoint and the handler ID before inserting.
pub fn register_any(endpoint: MStr<Endpoint>, handler: ShareableMessageHandler) {
    log::debug!(
        "Registering endpoint '{endpoint}' with handler ID {}",
        handler.0.id(),
    );

    let bus = get_message_bus();
    let mut guard = bus.borrow_mut();
    guard.endpoints.insert(endpoint, handler);
}
/// Registers `handler` as the response handler for `correlation_id`.
///
/// A registration failure reported by the message bus is logged at error level
/// and otherwise ignored.
pub fn register_response_handler(correlation_id: &UUID4, handler: ShareableMessageHandler) {
    let bus = get_message_bus();
    let outcome = bus
        .borrow_mut()
        .register_response_handler(correlation_id, handler);

    if let Err(e) = outcome {
        log::error!("Failed to register request handler: {e}");
    }
}
/// Registers `handler` under `endpoint` in the message bus's `endpoints_quotes` registry.
pub fn register_quote_endpoint(endpoint: MStr<Endpoint>, handler: TypedHandler<QuoteTick>) {
    let bus = get_message_bus();
    let mut guard = bus.borrow_mut();
    guard.endpoints_quotes.register(endpoint, handler);
}
/// Registers `handler` under `endpoint` in the message bus's `endpoints_trades` registry.
pub fn register_trade_endpoint(endpoint: MStr<Endpoint>, handler: TypedHandler<TradeTick>) {
    let bus = get_message_bus();
    let mut guard = bus.borrow_mut();
    guard.endpoints_trades.register(endpoint, handler);
}
/// Registers `handler` under `endpoint` in the message bus's `endpoints_bars` registry.
pub fn register_bar_endpoint(endpoint: MStr<Endpoint>, handler: TypedHandler<Bar>) {
    let bus = get_message_bus();
    let mut guard = bus.borrow_mut();
    guard.endpoints_bars.register(endpoint, handler);
}
/// Registers the ownership-taking `handler` under `endpoint` in the message bus's
/// `endpoints_order_events` registry.
pub fn register_order_event_endpoint(
    endpoint: MStr<Endpoint>,
    handler: TypedIntoHandler<OrderEventAny>,
) {
    let bus = get_message_bus();
    let mut guard = bus.borrow_mut();
    guard.endpoints_order_events.register(endpoint, handler);
}
/// Registers `handler` under `endpoint` in the message bus's `endpoints_account_state`
/// registry.
pub fn register_account_state_endpoint(
    endpoint: MStr<Endpoint>,
    handler: TypedHandler<AccountState>,
) {
    let bus = get_message_bus();
    let mut guard = bus.borrow_mut();
    guard.endpoints_account_state.register(endpoint, handler);
}
/// Registers the ownership-taking `handler` under `endpoint` in the message bus's
/// `endpoints_trading_commands` registry.
pub fn register_trading_command_endpoint(
    endpoint: MStr<Endpoint>,
    handler: TypedIntoHandler<TradingCommand>,
) {
    let bus = get_message_bus();
    let mut guard = bus.borrow_mut();
    guard.endpoints_trading_commands.register(endpoint, handler);
}
/// Registers the ownership-taking `handler` under `endpoint` in the message bus's
/// `endpoints_data_commands` registry.
pub fn register_data_command_endpoint(
    endpoint: MStr<Endpoint>,
    handler: TypedIntoHandler<DataCommand>,
) {
    let bus = get_message_bus();
    let mut guard = bus.borrow_mut();
    guard.endpoints_data_commands.register(endpoint, handler);
}
/// Registers the ownership-taking `handler` under `endpoint` in the message bus's
/// `endpoints_data_responses` registry.
pub fn register_data_response_endpoint(
    endpoint: MStr<Endpoint>,
    handler: TypedIntoHandler<DataResponse>,
) {
    let bus = get_message_bus();
    let mut guard = bus.borrow_mut();
    guard.endpoints_data_responses.register(endpoint, handler);
}
/// Registers the ownership-taking `handler` under `endpoint` in the message bus's
/// `endpoints_exec_reports` registry.
pub fn register_execution_report_endpoint(
    endpoint: MStr<Endpoint>,
    handler: TypedIntoHandler<ExecutionReport>,
) {
    let bus = get_message_bus();
    let mut guard = bus.borrow_mut();
    guard.endpoints_exec_reports.register(endpoint, handler);
}
/// Registers the ownership-taking `handler` under `endpoint` in the message bus's
/// `endpoints_data` registry.
pub fn register_data_endpoint(endpoint: MStr<Endpoint>, handler: TypedIntoHandler<Data>) {
    let bus = get_message_bus();
    let mut guard = bus.borrow_mut();
    guard.endpoints_data.register(endpoint, handler);
}
/// Registers the ownership-taking `handler` under `endpoint` in the message bus's
/// `endpoints_defi_data` registry (only built with the `defi` feature).
#[cfg(feature = "defi")]
pub fn register_defi_data_endpoint(endpoint: MStr<Endpoint>, handler: TypedIntoHandler<DefiData>) {
    let bus = get_message_bus();
    let mut guard = bus.borrow_mut();
    guard.endpoints_defi_data.register(endpoint, handler);
}
/// Removes `endpoint` from the message bus's untyped `endpoints` map.
///
/// Emits a debug log line first; the result of the removal is discarded.
pub fn deregister_any(endpoint: MStr<Endpoint>) {
    log::debug!("Deregistering endpoint '{endpoint}'");

    let bus = get_message_bus();
    let mut guard = bus.borrow_mut();
    guard.endpoints.shift_remove(&endpoint);
}
/// Returns `true` when the message bus's `get_endpoint` lookup finds a handler
/// for the endpoint named `endpoint`.
#[must_use]
pub fn has_endpoint(endpoint: &str) -> bool {
    let bus = get_message_bus();
    let guard = bus.borrow();
    let key: MStr<Endpoint> = Ustr::from(endpoint).into();
    guard.get_endpoint(key).is_some()
}
/// Subscribes the untyped `handler` to `pattern` with the given optional `priority`.
///
/// If an equal subscription is already present, a warning is logged and nothing
/// changes. Otherwise the subscription is appended (and the list re-sorted) for
/// every existing topic that matches `pattern`, then stored in the bus's
/// `subscriptions` set.
pub fn subscribe_any(
    pattern: MStr<Pattern>,
    handler: ShareableMessageHandler,
    priority: Option<u8>,
) {
    let bus = get_message_bus();
    let mut guard = bus.borrow_mut();
    let subscription = Subscription::new(pattern, handler, priority);

    log::debug!(
        "Subscribing {:?} for pattern '{}'",
        subscription.handler,
        subscription.pattern
    );

    if guard.subscriptions.contains(&subscription) {
        log::warn!("{sub:?} already exists", sub = subscription);
        return;
    }

    // Attach the new subscription to every topic already known to the bus
    // whose name matches the pattern.
    for (topic, topic_subs) in &mut guard.topics {
        if !is_matching_backtracking(*topic, subscription.pattern) {
            continue;
        }
        topic_subs.push(subscription.clone());
        topic_subs.sort();
        log::debug!("Added subscription for '{topic}'");
    }

    guard.subscriptions.insert(subscription);
}
/// Subscribes `handler` to `pattern` for instruments by delegating to [`subscribe_any`].
pub fn subscribe_instruments(
    pattern: MStr<Pattern>,
    handler: ShareableMessageHandler,
    priority: Option<u8>,
) {
    subscribe_any(pattern, handler, priority)
}
/// Subscribes `handler` to `pattern` for instrument close messages by delegating
/// to [`subscribe_any`].
pub fn subscribe_instrument_close(
    pattern: MStr<Pattern>,
    handler: ShareableMessageHandler,
    priority: Option<u8>,
) {
    subscribe_any(pattern, handler, priority)
}
/// Subscribes `handler` to `pattern` on the message bus's `router_deltas`.
///
/// A `priority` of `None` is passed to the router as `0`.
pub fn subscribe_book_deltas(
    pattern: MStr<Pattern>,
    handler: TypedHandler<OrderBookDeltas>,
    priority: Option<u8>,
) {
    let priority = priority.unwrap_or_default();
    let bus = get_message_bus();
    let mut guard = bus.borrow_mut();
    guard.router_deltas.subscribe(pattern, handler, priority);
}
/// Subscribes `handler` to `pattern` on the message bus's `router_depth10`.
///
/// A `priority` of `None` is passed to the router as `0`.
pub fn subscribe_book_depth10(
    pattern: MStr<Pattern>,
    handler: TypedHandler<OrderBookDepth10>,
    priority: Option<u8>,
) {
    let priority = priority.unwrap_or_default();
    let bus = get_message_bus();
    let mut guard = bus.borrow_mut();
    guard.router_depth10.subscribe(pattern, handler, priority);
}
/// Subscribes `handler` to `pattern` on the message bus's `router_book_snapshots`.
///
/// A `priority` of `None` is passed to the router as `0`.
pub fn subscribe_book_snapshots(
    pattern: MStr<Pattern>,
    handler: TypedHandler<OrderBook>,
    priority: Option<u8>,
) {
    let priority = priority.unwrap_or_default();
    let bus = get_message_bus();
    let mut guard = bus.borrow_mut();
    guard.router_book_snapshots.subscribe(pattern, handler, priority);
}
/// Subscribes `handler` to `pattern` on the message bus's `router_quotes`.
///
/// A `priority` of `None` is passed to the router as `0`.
pub fn subscribe_quotes(
    pattern: MStr<Pattern>,
    handler: TypedHandler<QuoteTick>,
    priority: Option<u8>,
) {
    let priority = priority.unwrap_or_default();
    let bus = get_message_bus();
    let mut guard = bus.borrow_mut();
    guard.router_quotes.subscribe(pattern, handler, priority);
}
/// Subscribes `handler` to `pattern` on the message bus's `router_trades`.
///
/// A `priority` of `None` is passed to the router as `0`.
pub fn subscribe_trades(
    pattern: MStr<Pattern>,
    handler: TypedHandler<TradeTick>,
    priority: Option<u8>,
) {
    let priority = priority.unwrap_or_default();
    let bus = get_message_bus();
    let mut guard = bus.borrow_mut();
    guard.router_trades.subscribe(pattern, handler, priority);
}
/// Subscribes `handler` to `pattern` on the message bus's `router_bars`.
///
/// A `priority` of `None` is passed to the router as `0`.
pub fn subscribe_bars(pattern: MStr<Pattern>, handler: TypedHandler<Bar>, priority: Option<u8>) {
    let priority = priority.unwrap_or_default();
    let bus = get_message_bus();
    let mut guard = bus.borrow_mut();
    guard.router_bars.subscribe(pattern, handler, priority);
}
/// Subscribes `handler` to `pattern` on the message bus's `router_mark_prices`.
///
/// A `priority` of `None` is passed to the router as `0`.
pub fn subscribe_mark_prices(
    pattern: MStr<Pattern>,
    handler: TypedHandler<MarkPriceUpdate>,
    priority: Option<u8>,
) {
    let priority = priority.unwrap_or_default();
    let bus = get_message_bus();
    let mut guard = bus.borrow_mut();
    guard.router_mark_prices.subscribe(pattern, handler, priority);
}
/// Subscribes `handler` to `pattern` on the message bus's `router_index_prices`.
///
/// A `priority` of `None` is passed to the router as `0`.
pub fn subscribe_index_prices(
    pattern: MStr<Pattern>,
    handler: TypedHandler<IndexPriceUpdate>,
    priority: Option<u8>,
) {
    let priority = priority.unwrap_or_default();
    let bus = get_message_bus();
    let mut guard = bus.borrow_mut();
    guard.router_index_prices.subscribe(pattern, handler, priority);
}
/// Subscribes `handler` to `pattern` on the message bus's `router_funding_rates`.
///
/// A `priority` of `None` is passed to the router as `0`.
pub fn subscribe_funding_rates(
    pattern: MStr<Pattern>,
    handler: TypedHandler<FundingRateUpdate>,
    priority: Option<u8>,
) {
    let priority = priority.unwrap_or_default();
    let bus = get_message_bus();
    let mut guard = bus.borrow_mut();
    guard.router_funding_rates.subscribe(pattern, handler, priority);
}
/// Subscribes `handler` to `pattern` on the message bus's `router_greeks`.
///
/// A `priority` of `None` is passed to the router as `0`.
pub fn subscribe_greeks(
    pattern: MStr<Pattern>,
    handler: TypedHandler<GreeksData>,
    priority: Option<u8>,
) {
    let priority = priority.unwrap_or_default();
    let bus = get_message_bus();
    let mut guard = bus.borrow_mut();
    guard.router_greeks.subscribe(pattern, handler, priority);
}
/// Subscribes `handler` to `pattern` on the message bus's `router_option_greeks`.
///
/// A `priority` of `None` is passed to the router as `0`.
pub fn subscribe_option_greeks(
    pattern: MStr<Pattern>,
    handler: TypedHandler<OptionGreeks>,
    priority: Option<u8>,
) {
    let priority = priority.unwrap_or_default();
    let bus = get_message_bus();
    let mut guard = bus.borrow_mut();
    guard.router_option_greeks.subscribe(pattern, handler, priority);
}
/// Subscribes `handler` to `pattern` on the message bus's `router_option_chain`.
///
/// A `priority` of `None` is passed to the router as `0`.
pub fn subscribe_option_chain(
    pattern: MStr<Pattern>,
    handler: TypedHandler<OptionChainSlice>,
    priority: Option<u8>,
) {
    let priority = priority.unwrap_or_default();
    let bus = get_message_bus();
    let mut guard = bus.borrow_mut();
    guard.router_option_chain.subscribe(pattern, handler, priority);
}
/// Subscribes `handler` to `pattern` on the message bus's `router_order_events`.
///
/// A `priority` of `None` is passed to the router as `0`.
pub fn subscribe_order_events(
    pattern: MStr<Pattern>,
    handler: TypedHandler<OrderEventAny>,
    priority: Option<u8>,
) {
    let priority = priority.unwrap_or_default();
    let bus = get_message_bus();
    let mut guard = bus.borrow_mut();
    guard.router_order_events.subscribe(pattern, handler, priority);
}
/// Subscribes `handler` to `pattern` on the message bus's `router_position_events`.
///
/// A `priority` of `None` is passed to the router as `0`.
pub fn subscribe_position_events(
    pattern: MStr<Pattern>,
    handler: TypedHandler<PositionEvent>,
    priority: Option<u8>,
) {
    let priority = priority.unwrap_or_default();
    let bus = get_message_bus();
    let mut guard = bus.borrow_mut();
    guard.router_position_events.subscribe(pattern, handler, priority);
}
/// Subscribes `handler` to `pattern` on the message bus's `router_account_state`.
///
/// A `priority` of `None` is passed to the router as `0`.
pub fn subscribe_account_state(
    pattern: MStr<Pattern>,
    handler: TypedHandler<AccountState>,
    priority: Option<u8>,
) {
    let priority = priority.unwrap_or_default();
    let bus = get_message_bus();
    let mut guard = bus.borrow_mut();
    guard.router_account_state.subscribe(pattern, handler, priority);
}
/// Subscribes `handler` to `pattern` on the message bus's `router_positions`.
///
/// A `priority` of `None` is passed to the router as `0`.
pub fn subscribe_positions(
    pattern: MStr<Pattern>,
    handler: TypedHandler<Position>,
    priority: Option<u8>,
) {
    let priority = priority.unwrap_or_default();
    let bus = get_message_bus();
    let mut guard = bus.borrow_mut();
    guard.router_positions.subscribe(pattern, handler, priority);
}
/// Subscribes `handler` to `pattern` on the message bus's `router_defi_blocks`
/// (only built with the `defi` feature).
///
/// A `priority` of `None` is passed to the router as `0`.
#[cfg(feature = "defi")]
pub fn subscribe_defi_blocks(
    pattern: MStr<Pattern>,
    handler: TypedHandler<Block>,
    priority: Option<u8>,
) {
    let priority = priority.unwrap_or_default();
    let bus = get_message_bus();
    let mut guard = bus.borrow_mut();
    guard.router_defi_blocks.subscribe(pattern, handler, priority);
}
/// Subscribes `handler` to `pattern` on the message bus's `router_defi_pools`
/// (only built with the `defi` feature).
///
/// A `priority` of `None` is passed to the router as `0`.
#[cfg(feature = "defi")]
pub fn subscribe_defi_pools(
    pattern: MStr<Pattern>,
    handler: TypedHandler<Pool>,
    priority: Option<u8>,
) {
    let priority = priority.unwrap_or_default();
    let bus = get_message_bus();
    let mut guard = bus.borrow_mut();
    guard.router_defi_pools.subscribe(pattern, handler, priority);
}
/// Subscribes `handler` to `pattern` on the message bus's `router_defi_swaps`
/// (only built with the `defi` feature).
///
/// A `priority` of `None` is passed to the router as `0`.
#[cfg(feature = "defi")]
pub fn subscribe_defi_swaps(
    pattern: MStr<Pattern>,
    handler: TypedHandler<PoolSwap>,
    priority: Option<u8>,
) {
    let priority = priority.unwrap_or_default();
    let bus = get_message_bus();
    let mut guard = bus.borrow_mut();
    guard.router_defi_swaps.subscribe(pattern, handler, priority);
}
/// Subscribes `handler` to `pattern` on the message bus's `router_defi_liquidity`
/// (only built with the `defi` feature).
///
/// A `priority` of `None` is passed to the router as `0`.
#[cfg(feature = "defi")]
pub fn subscribe_defi_liquidity(
    pattern: MStr<Pattern>,
    handler: TypedHandler<PoolLiquidityUpdate>,
    priority: Option<u8>,
) {
    let priority = priority.unwrap_or_default();
    let bus = get_message_bus();
    let mut guard = bus.borrow_mut();
    guard.router_defi_liquidity.subscribe(pattern, handler, priority);
}
/// Subscribes `handler` to `pattern` on the message bus's `router_defi_collects`
/// (only built with the `defi` feature).
///
/// A `priority` of `None` is passed to the router as `0`.
#[cfg(feature = "defi")]
pub fn subscribe_defi_collects(
    pattern: MStr<Pattern>,
    handler: TypedHandler<PoolFeeCollect>,
    priority: Option<u8>,
) {
    let priority = priority.unwrap_or_default();
    let bus = get_message_bus();
    let mut guard = bus.borrow_mut();
    guard.router_defi_collects.subscribe(pattern, handler, priority);
}
/// Subscribes `handler` to `pattern` on the message bus's `router_defi_flash`
/// (only built with the `defi` feature).
///
/// A `priority` of `None` is passed to the router as `0`.
#[cfg(feature = "defi")]
pub fn subscribe_defi_flash(
    pattern: MStr<Pattern>,
    handler: TypedHandler<PoolFlash>,
    priority: Option<u8>,
) {
    let priority = priority.unwrap_or_default();
    let bus = get_message_bus();
    let mut guard = bus.borrow_mut();
    guard.router_defi_flash.subscribe(pattern, handler, priority);
}
/// Unsubscribes `handler` from `pattern` for instruments by delegating to
/// [`unsubscribe_any`].
pub fn unsubscribe_instruments(pattern: MStr<Pattern>, handler: &ShareableMessageHandler) {
    unsubscribe_any(pattern, handler)
}
/// Unsubscribes `handler` from `pattern` for instrument close messages by
/// delegating to [`unsubscribe_any`].
pub fn unsubscribe_instrument_close(pattern: MStr<Pattern>, handler: &ShareableMessageHandler) {
    unsubscribe_any(pattern, handler)
}
/// Unsubscribes `handler` from `pattern` on the message bus's `router_deltas`.
pub fn unsubscribe_book_deltas(pattern: MStr<Pattern>, handler: &TypedHandler<OrderBookDeltas>) {
    let bus = get_message_bus();
    let mut guard = bus.borrow_mut();
    guard.router_deltas.unsubscribe(pattern, handler);
}
/// Unsubscribes `handler` from `pattern` on the message bus's `router_depth10`.
pub fn unsubscribe_book_depth10(pattern: MStr<Pattern>, handler: &TypedHandler<OrderBookDepth10>) {
    let bus = get_message_bus();
    let mut guard = bus.borrow_mut();
    guard.router_depth10.unsubscribe(pattern, handler);
}
/// Unsubscribes `handler` from `pattern` on the message bus's `router_book_snapshots`.
pub fn unsubscribe_book_snapshots(pattern: MStr<Pattern>, handler: &TypedHandler<OrderBook>) {
    let bus = get_message_bus();
    let mut guard = bus.borrow_mut();
    guard.router_book_snapshots.unsubscribe(pattern, handler);
}
/// Unsubscribes `handler` from `pattern` on the message bus's `router_quotes`.
pub fn unsubscribe_quotes(pattern: MStr<Pattern>, handler: &TypedHandler<QuoteTick>) {
    let bus = get_message_bus();
    let mut guard = bus.borrow_mut();
    guard.router_quotes.unsubscribe(pattern, handler);
}
/// Unsubscribes `handler` from `pattern` on the message bus's `router_trades`.
pub fn unsubscribe_trades(pattern: MStr<Pattern>, handler: &TypedHandler<TradeTick>) {
    let bus = get_message_bus();
    let mut guard = bus.borrow_mut();
    guard.router_trades.unsubscribe(pattern, handler);
}
/// Unsubscribes `handler` from `pattern` on the message bus's `router_bars`.
pub fn unsubscribe_bars(pattern: MStr<Pattern>, handler: &TypedHandler<Bar>) {
    let bus = get_message_bus();
    let mut guard = bus.borrow_mut();
    guard.router_bars.unsubscribe(pattern, handler);
}
/// Unsubscribes `handler` from `pattern` on the message bus's `router_mark_prices`.
pub fn unsubscribe_mark_prices(pattern: MStr<Pattern>, handler: &TypedHandler<MarkPriceUpdate>) {
    let bus = get_message_bus();
    let mut guard = bus.borrow_mut();
    guard.router_mark_prices.unsubscribe(pattern, handler);
}
/// Unsubscribes `handler` from `pattern` on the message bus's `router_index_prices`.
pub fn unsubscribe_index_prices(pattern: MStr<Pattern>, handler: &TypedHandler<IndexPriceUpdate>) {
    let bus = get_message_bus();
    let mut guard = bus.borrow_mut();
    guard.router_index_prices.unsubscribe(pattern, handler);
}
/// Unsubscribes `handler` from `pattern` on the message bus's `router_funding_rates`.
pub fn unsubscribe_funding_rates(
    pattern: MStr<Pattern>,
    handler: &TypedHandler<FundingRateUpdate>,
) {
    let bus = get_message_bus();
    let mut guard = bus.borrow_mut();
    guard.router_funding_rates.unsubscribe(pattern, handler);
}
/// Unsubscribes `handler` from `pattern` on the message bus's `router_account_state`.
pub fn unsubscribe_account_state(pattern: MStr<Pattern>, handler: &TypedHandler<AccountState>) {
    let bus = get_message_bus();
    let mut guard = bus.borrow_mut();
    guard.router_account_state.unsubscribe(pattern, handler);
}
/// Unsubscribes `handler` from `pattern` on the message bus's `router_order_events`.
pub fn unsubscribe_order_events(pattern: MStr<Pattern>, handler: &TypedHandler<OrderEventAny>) {
    let bus = get_message_bus();
    let mut guard = bus.borrow_mut();
    guard.router_order_events.unsubscribe(pattern, handler);
}
/// Unsubscribes `handler` from `pattern` on the message bus's `router_position_events`.
pub fn unsubscribe_position_events(pattern: MStr<Pattern>, handler: &TypedHandler<PositionEvent>) {
    let bus = get_message_bus();
    let mut guard = bus.borrow_mut();
    guard.router_position_events.unsubscribe(pattern, handler);
}
/// Asks the message bus's `router_order_events` to remove the handler identified
/// by `handler_id` for `pattern`.
pub fn remove_order_event_handler(pattern: MStr<Pattern>, handler_id: Ustr) {
    let bus = get_message_bus();
    let mut guard = bus.borrow_mut();
    guard.router_order_events.remove_handler(pattern, handler_id);
}
/// Asks the message bus's `router_position_events` to remove the handler identified
/// by `handler_id` for `pattern`.
pub fn remove_position_event_handler(pattern: MStr<Pattern>, handler_id: Ustr) {
    let bus = get_message_bus();
    let mut guard = bus.borrow_mut();
    guard.router_position_events.remove_handler(pattern, handler_id);
}
/// Unsubscribes `handler` from `pattern` on the message bus's `router_orders`.
pub fn unsubscribe_orders(pattern: MStr<Pattern>, handler: &TypedHandler<OrderAny>) {
    let bus = get_message_bus();
    let mut guard = bus.borrow_mut();
    guard.router_orders.unsubscribe(pattern, handler);
}
/// Unsubscribes `handler` from `pattern` on the message bus's `router_positions`.
pub fn unsubscribe_positions(pattern: MStr<Pattern>, handler: &TypedHandler<Position>) {
    let bus = get_message_bus();
    let mut guard = bus.borrow_mut();
    guard.router_positions.unsubscribe(pattern, handler);
}
/// Unsubscribes `handler` from `pattern` on the message bus's `router_greeks`.
pub fn unsubscribe_greeks(pattern: MStr<Pattern>, handler: &TypedHandler<GreeksData>) {
    let bus = get_message_bus();
    let mut guard = bus.borrow_mut();
    guard.router_greeks.unsubscribe(pattern, handler);
}
/// Unsubscribes `handler` from `pattern` on the message bus's `router_option_greeks`.
pub fn unsubscribe_option_greeks(pattern: MStr<Pattern>, handler: &TypedHandler<OptionGreeks>) {
    let bus = get_message_bus();
    let mut guard = bus.borrow_mut();
    guard.router_option_greeks.unsubscribe(pattern, handler);
}
/// Unsubscribes `handler` from `pattern` on the message bus's `router_option_chain`.
pub fn unsubscribe_option_chain(pattern: MStr<Pattern>, handler: &TypedHandler<OptionChainSlice>) {
    let bus = get_message_bus();
    let mut guard = bus.borrow_mut();
    guard.router_option_chain.unsubscribe(pattern, handler);
}
/// Unsubscribes `handler` from `pattern` on the message bus's `router_defi_blocks`
/// (only built with the `defi` feature).
#[cfg(feature = "defi")]
pub fn unsubscribe_defi_blocks(pattern: MStr<Pattern>, handler: &TypedHandler<Block>) {
    let bus = get_message_bus();
    let mut guard = bus.borrow_mut();
    guard.router_defi_blocks.unsubscribe(pattern, handler);
}
/// Unsubscribes `handler` from `pattern` on the message bus's `router_defi_pools`
/// (only built with the `defi` feature).
#[cfg(feature = "defi")]
pub fn unsubscribe_defi_pools(pattern: MStr<Pattern>, handler: &TypedHandler<Pool>) {
    let bus = get_message_bus();
    let mut guard = bus.borrow_mut();
    guard.router_defi_pools.unsubscribe(pattern, handler);
}
/// Unsubscribes `handler` from `pattern` on the message bus's `router_defi_swaps`
/// (only built with the `defi` feature).
#[cfg(feature = "defi")]
pub fn unsubscribe_defi_swaps(pattern: MStr<Pattern>, handler: &TypedHandler<PoolSwap>) {
    let bus = get_message_bus();
    let mut guard = bus.borrow_mut();
    guard.router_defi_swaps.unsubscribe(pattern, handler);
}
/// Unsubscribes `handler` from `pattern` on the message bus's `router_defi_liquidity`
/// (only built with the `defi` feature).
#[cfg(feature = "defi")]
pub fn unsubscribe_defi_liquidity(
    pattern: MStr<Pattern>,
    handler: &TypedHandler<PoolLiquidityUpdate>,
) {
    let bus = get_message_bus();
    let mut guard = bus.borrow_mut();
    guard.router_defi_liquidity.unsubscribe(pattern, handler);
}
/// Unsubscribes `handler` from `pattern` on the message bus's `router_defi_collects`
/// (only built with the `defi` feature).
#[cfg(feature = "defi")]
pub fn unsubscribe_defi_collects(pattern: MStr<Pattern>, handler: &TypedHandler<PoolFeeCollect>) {
    let bus = get_message_bus();
    let mut guard = bus.borrow_mut();
    guard.router_defi_collects.unsubscribe(pattern, handler);
}
/// Unsubscribes `handler` from `pattern` on the message bus's `router_defi_flash`
/// (only built with the `defi` feature).
#[cfg(feature = "defi")]
pub fn unsubscribe_defi_flash(pattern: MStr<Pattern>, handler: &TypedHandler<PoolFlash>) {
    let bus = get_message_bus();
    let mut guard = bus.borrow_mut();
    guard.router_defi_flash.unsubscribe(pattern, handler);
}
pub fn unsubscribe_any(pattern: MStr<Pattern>, handler: &ShareableMessageHandler) {
log::debug!("Unsubscribing {handler:?} from pattern '{pattern}'");
let handler_id = handler.0.id();
let bus_rc = get_message_bus();
let mut bus = bus_rc.borrow_mut();
let count_before = bus.subscriptions.len();
bus.topics.values_mut().for_each(|subs| {
subs.retain(|s| !(s.pattern == pattern && s.handler_id == handler_id));
});
bus.subscriptions
.retain(|s| !(s.pattern == pattern && s.handler_id == handler_id));
let removed = bus.subscriptions.len() < count_before;
if removed {
log::debug!("Handler for pattern '{pattern}' was removed");
} else {
log::debug!("No matching handler for pattern '{pattern}' was found");
}
}
/// Returns `true` when the bus's `subscriptions` set contains a subscription equal
/// to one built from `pattern` and `handler` (with no priority).
pub fn is_subscribed_any<T: AsRef<str>>(pattern: T, handler: ShareableMessageHandler) -> bool {
    let probe = Subscription::new(MStr::from(pattern.as_ref()), handler, None);
    let bus = get_message_bus();
    let guard = bus.borrow();
    guard.subscriptions.contains(&probe)
}
/// Returns the value of the message bus's `subscriptions_count` for `topic`.
pub fn subscriptions_count_any<S: AsRef<str>>(topic: S) -> usize {
    let bus = get_message_bus();
    let guard = bus.borrow();
    guard.subscriptions_count(topic)
}
/// Returns `router_deltas`'s subscriber count for `topic`.
pub fn subscriber_count_deltas(topic: MStr<Topic>) -> usize {
    let bus = get_message_bus();
    let guard = bus.borrow();
    guard.router_deltas.subscriber_count(topic)
}
/// Returns `router_depth10`'s subscriber count for `topic`.
pub fn subscriber_count_depth10(topic: MStr<Topic>) -> usize {
    let bus = get_message_bus();
    let guard = bus.borrow();
    guard.router_depth10.subscriber_count(topic)
}
/// Returns `router_book_snapshots`'s subscriber count for `topic`.
pub fn subscriber_count_book_snapshots(topic: MStr<Topic>) -> usize {
    let bus = get_message_bus();
    let guard = bus.borrow();
    guard.router_book_snapshots.subscriber_count(topic)
}
/// Returns `router_bars`'s `exact_subscriber_count` for `topic`.
pub fn exact_subscriber_count_bars(topic: MStr<Topic>) -> usize {
    let bus = get_message_bus();
    let guard = bus.borrow();
    guard.router_bars.exact_subscriber_count(topic)
}
/// Publishes the untyped `message` to every handler the bus matches for `topic`.
///
/// The thread-local `ANY_HANDLERS` buffer is taken out, filled by the bus, used
/// for dispatch, then cleared and stored back. The bus borrow ends before any
/// handler is invoked.
pub fn publish_any(topic: MStr<Topic>, message: &dyn Any) {
    let mut matched = ANY_HANDLERS.with_borrow_mut(std::mem::take);

    {
        let bus = get_message_bus();
        let mut guard = bus.borrow_mut();
        guard.fill_matching_any_handlers(topic, &mut matched);
    }

    for target in &matched {
        target.0.handle(message);
    }

    matched.clear();
    ANY_HANDLERS.with_borrow_mut(|slot| *slot = matched);
}
/// Publishes `deltas` to the handlers that `router_deltas` matches for `topic`.
pub fn publish_deltas(topic: MStr<Topic>, deltas: &OrderBookDeltas) {
    publish_typed(
        &DELTAS_HANDLERS,
        |msgbus, matched| msgbus.router_deltas.fill_matching_handlers(topic, matched),
        deltas,
    );
}
/// Publishes `depth` to the handlers that `router_depth10` matches for `topic`.
pub fn publish_depth10(topic: MStr<Topic>, depth: &OrderBookDepth10) {
    publish_typed(
        &DEPTH10_HANDLERS,
        |msgbus, matched| msgbus.router_depth10.fill_matching_handlers(topic, matched),
        depth,
    );
}
/// Publishes `book` to the handlers that `router_book_snapshots` matches for `topic`.
pub fn publish_book(topic: MStr<Topic>, book: &OrderBook) {
    publish_typed(
        &BOOK_HANDLERS,
        |msgbus, matched| {
            msgbus
                .router_book_snapshots
                .fill_matching_handlers(topic, matched)
        },
        book,
    );
}
pub fn publish_quote(topic: MStr<Topic>, quote: &QuoteTick) {
publish_typed(
"E_HANDLERS,
|bus, h| bus.router_quotes.fill_matching_handlers(topic, h),
quote,
);
}
/// Publishes `trade` to the handlers that `router_trades` matches for `topic`.
pub fn publish_trade(topic: MStr<Topic>, trade: &TradeTick) {
    publish_typed(
        &TRADE_HANDLERS,
        |msgbus, matched| msgbus.router_trades.fill_matching_handlers(topic, matched),
        trade,
    );
}
/// Publishes `bar` to the handlers that `router_bars` matches for `topic`.
pub fn publish_bar(topic: MStr<Topic>, bar: &Bar) {
    publish_typed(
        &BAR_HANDLERS,
        |msgbus, matched| msgbus.router_bars.fill_matching_handlers(topic, matched),
        bar,
    );
}
/// Publishes `mark_price` to the handlers that `router_mark_prices` matches for `topic`.
pub fn publish_mark_price(topic: MStr<Topic>, mark_price: &MarkPriceUpdate) {
    publish_typed(
        &MARK_PRICE_HANDLERS,
        |msgbus, matched| {
            msgbus
                .router_mark_prices
                .fill_matching_handlers(topic, matched)
        },
        mark_price,
    );
}
/// Publishes `index_price` to the handlers that `router_index_prices` matches for `topic`.
pub fn publish_index_price(topic: MStr<Topic>, index_price: &IndexPriceUpdate) {
    publish_typed(
        &INDEX_PRICE_HANDLERS,
        |msgbus, matched| {
            msgbus
                .router_index_prices
                .fill_matching_handlers(topic, matched)
        },
        index_price,
    );
}
/// Publishes `funding_rate` to the handlers that `router_funding_rates` matches for `topic`.
pub fn publish_funding_rate(topic: MStr<Topic>, funding_rate: &FundingRateUpdate) {
    publish_typed(
        &FUNDING_RATE_HANDLERS,
        |msgbus, matched| {
            msgbus
                .router_funding_rates
                .fill_matching_handlers(topic, matched)
        },
        funding_rate,
    );
}
/// Publishes `greeks` to the handlers that `router_greeks` matches for `topic`.
pub fn publish_greeks(topic: MStr<Topic>, greeks: &GreeksData) {
    publish_typed(
        &GREEKS_HANDLERS,
        |msgbus, matched| msgbus.router_greeks.fill_matching_handlers(topic, matched),
        greeks,
    );
}
/// Publishes `option_greeks` to the handlers that `router_option_greeks` matches
/// for `topic`.
pub fn publish_option_greeks(topic: MStr<Topic>, option_greeks: &OptionGreeks) {
    publish_typed(
        &OPTION_GREEKS_HANDLERS,
        |msgbus, matched| {
            msgbus
                .router_option_greeks
                .fill_matching_handlers(topic, matched)
        },
        option_greeks,
    );
}
/// Publishes `slice` to the handlers that `router_option_chain` matches for `topic`.
pub fn publish_option_chain(topic: MStr<Topic>, slice: &OptionChainSlice) {
    publish_typed(
        &OPTION_CHAIN_HANDLERS,
        |msgbus, matched| {
            msgbus
                .router_option_chain
                .fill_matching_handlers(topic, matched)
        },
        slice,
    );
}
/// Publishes `state` to the handlers that `router_account_state` matches for `topic`.
pub fn publish_account_state(topic: MStr<Topic>, state: &AccountState) {
    publish_typed(
        &ACCOUNT_STATE_HANDLERS,
        |msgbus, matched| {
            msgbus
                .router_account_state
                .fill_matching_handlers(topic, matched)
        },
        state,
    );
}
/// Publishes `event` to the handlers that `router_order_events` matches for `topic`.
pub fn publish_order_event(topic: MStr<Topic>, event: &OrderEventAny) {
    publish_typed(
        &ORDER_EVENT_HANDLERS,
        |msgbus, matched| {
            msgbus
                .router_order_events
                .fill_matching_handlers(topic, matched)
        },
        event,
    );
}
/// Publishes `event` to the handlers that `router_position_events` matches for `topic`.
pub fn publish_position_event(topic: MStr<Topic>, event: &PositionEvent) {
    publish_typed(
        &POSITION_EVENT_HANDLERS,
        |msgbus, matched| {
            msgbus
                .router_position_events
                .fill_matching_handlers(topic, matched)
        },
        event,
    );
}
/// Publishes `block` to the handlers that `router_defi_blocks` matches for `topic`
/// (only built with the `defi` feature).
#[cfg(feature = "defi")]
pub fn publish_defi_block(topic: MStr<Topic>, block: &Block) {
    publish_typed(
        &DEFI_BLOCK_HANDLERS,
        |msgbus, matched| {
            msgbus
                .router_defi_blocks
                .fill_matching_handlers(topic, matched)
        },
        block,
    );
}
/// Publishes `pool` to the handlers that `router_defi_pools` matches for `topic`
/// (only built with the `defi` feature).
#[cfg(feature = "defi")]
pub fn publish_defi_pool(topic: MStr<Topic>, pool: &Pool) {
    publish_typed(
        &DEFI_POOL_HANDLERS,
        |msgbus, matched| {
            msgbus
                .router_defi_pools
                .fill_matching_handlers(topic, matched)
        },
        pool,
    );
}
/// Publishes `swap` to the handlers that `router_defi_swaps` matches for `topic`
/// (only built with the `defi` feature).
#[cfg(feature = "defi")]
pub fn publish_defi_swap(topic: MStr<Topic>, swap: &PoolSwap) {
    publish_typed(
        &DEFI_SWAP_HANDLERS,
        |msgbus, matched| {
            msgbus
                .router_defi_swaps
                .fill_matching_handlers(topic, matched)
        },
        swap,
    );
}
/// Publishes `update` to the handlers that `router_defi_liquidity` matches for `topic`
/// (only built with the `defi` feature).
#[cfg(feature = "defi")]
pub fn publish_defi_liquidity(topic: MStr<Topic>, update: &PoolLiquidityUpdate) {
    publish_typed(
        &DEFI_LIQUIDITY_HANDLERS,
        |msgbus, matched| {
            msgbus
                .router_defi_liquidity
                .fill_matching_handlers(topic, matched)
        },
        update,
    );
}
/// Publishes `collect` to the handlers that `router_defi_collects` matches for `topic`
/// (only built with the `defi` feature).
#[cfg(feature = "defi")]
pub fn publish_defi_collect(topic: MStr<Topic>, collect: &PoolFeeCollect) {
    publish_typed(
        &DEFI_COLLECT_HANDLERS,
        |msgbus, matched| {
            msgbus
                .router_defi_collects
                .fill_matching_handlers(topic, matched)
        },
        collect,
    );
}
/// Publishes `flash` to the handlers that `router_defi_flash` matches for `topic`
/// (only built with the `defi` feature).
#[cfg(feature = "defi")]
pub fn publish_defi_flash(topic: MStr<Topic>, flash: &PoolFlash) {
    publish_typed(
        &DEFI_FLASH_HANDLERS,
        |msgbus, matched| {
            msgbus
                .router_defi_flash
                .fill_matching_handlers(topic, matched)
        },
        flash,
    );
}
/// Shared dispatch routine behind the typed `publish_*` functions.
///
/// Takes the handler buffer out of the thread-local `tls`, lets `fill_fn` populate
/// it from the mutably borrowed message bus, invokes each collected handler with
/// `message`, then clears the buffer and stores it back in `tls`.
///
/// The bus borrow is released before any handler runs.
#[inline]
fn publish_typed<T: 'static>(
    tls: &'static LocalKey<RefCell<SmallVec<[TypedHandler<T>; HANDLER_BUFFER_CAP]>>>,
    fill_fn: impl FnOnce(&mut MessageBus, &mut SmallVec<[TypedHandler<T>; HANDLER_BUFFER_CAP]>),
    message: &T,
) {
    let mut matched = tls.with_borrow_mut(std::mem::take);
    let bus = get_message_bus();

    {
        // Scope the mutable borrow to the fill step only.
        let mut guard = bus.borrow_mut();
        fill_fn(&mut guard, &mut matched);
    }

    for target in &matched {
        target.handle(message);
    }

    matched.clear();
    tls.with_borrow_mut(|slot| *slot = matched);
}
/// Sends the untyped `message` to the handler registered for `endpoint`.
///
/// The handler is cloned out of the bus so the borrow has ended before it runs.
/// An error is logged when no handler is registered.
pub fn send_any(endpoint: MStr<Endpoint>, message: &dyn Any) {
    let found = get_message_bus().borrow().get_endpoint(endpoint).cloned();

    match found {
        Some(target) => {
            target.0.handle(message);
        }
        None => {
            log::error!("send_any: no registered endpoint '{endpoint}'");
        }
    }
}
/// Sends a reference to the concrete `message` to the untyped handler registered
/// for `endpoint`.
///
/// The handler is cloned out of the bus so the borrow has ended before it runs.
/// An error is logged when no handler is registered.
pub fn send_any_value<T: 'static>(endpoint: MStr<Endpoint>, message: &T) {
    let found = get_message_bus().borrow().get_endpoint(endpoint).cloned();

    match found {
        Some(target) => {
            target.0.handle(message);
        }
        None => {
            log::error!("send_any_value: no registered endpoint '{endpoint}'");
        }
    }
}
/// Delivers the payload of `message` to the response handler registered for
/// `correlation_id`.
///
/// The handler receives the inner response of whichever `DataResponse` variant
/// was supplied. An error is logged when no handler is found.
pub fn send_response(correlation_id: &UUID4, message: &DataResponse) {
    let found = get_message_bus()
        .borrow()
        .get_response_handler(correlation_id)
        .cloned();

    let Some(handler) = found else {
        log::error!("send_response: handler not found for correlation_id '{correlation_id}'");
        return;
    };

    let target = &handler.0;
    match message {
        DataResponse::Data(resp) => target.handle(resp),
        // The instrument response is unwrapped with `as_ref()` before dispatch.
        DataResponse::Instrument(resp) => target.handle(resp.as_ref()),
        DataResponse::Instruments(resp) => target.handle(resp),
        DataResponse::Book(resp) => target.handle(resp),
        DataResponse::Quotes(resp) => target.handle(resp),
        DataResponse::Trades(resp) => target.handle(resp),
        DataResponse::FundingRates(resp) => target.handle(resp),
        DataResponse::ForwardPrices(resp) => target.handle(resp),
        DataResponse::Bars(resp) => target.handle(resp),
    }
}
/// Sends `quote` by reference to the handler registered for `endpoint` in
/// `endpoints_quotes`.
pub fn send_quote(endpoint: MStr<Endpoint>, quote: &QuoteTick) {
    send_endpoint_ref(
        endpoint,
        quote,
        |msgbus| msgbus.endpoints_quotes.get(endpoint),
        "send_quote",
    );
}
/// Sends `trade` by reference to the handler registered for `endpoint` in
/// `endpoints_trades`.
pub fn send_trade(endpoint: MStr<Endpoint>, trade: &TradeTick) {
    send_endpoint_ref(
        endpoint,
        trade,
        |msgbus| msgbus.endpoints_trades.get(endpoint),
        "send_trade",
    );
}
/// Sends `bar` by reference to the handler registered for `endpoint` in
/// `endpoints_bars`.
pub fn send_bar(endpoint: MStr<Endpoint>, bar: &Bar) {
    send_endpoint_ref(
        endpoint,
        bar,
        |msgbus| msgbus.endpoints_bars.get(endpoint),
        "send_bar",
    );
}
/// Sends `event` by value to the handler registered for `endpoint` in
/// `endpoints_order_events`.
pub fn send_order_event(endpoint: MStr<Endpoint>, event: OrderEventAny) {
    send_endpoint_owned(
        endpoint,
        event,
        |msgbus| msgbus.endpoints_order_events.get(endpoint),
        "send_order_event",
    );
}
/// Sends `state` by reference to the handler registered for `endpoint` in
/// `endpoints_account_state`.
pub fn send_account_state(endpoint: MStr<Endpoint>, state: &AccountState) {
    send_endpoint_ref(
        endpoint,
        state,
        |msgbus| msgbus.endpoints_account_state.get(endpoint),
        "send_account_state",
    );
}
/// Sends `command` by value to the handler registered for `endpoint` in
/// `endpoints_trading_commands`.
pub fn send_trading_command(endpoint: MStr<Endpoint>, command: TradingCommand) {
    send_endpoint_owned(
        endpoint,
        command,
        |msgbus| msgbus.endpoints_trading_commands.get(endpoint),
        "send_trading_command",
    );
}
/// Sends `command` by value to the handler registered for `endpoint` in
/// `endpoints_data_commands`.
pub fn send_data_command(endpoint: MStr<Endpoint>, command: DataCommand) {
    send_endpoint_owned(
        endpoint,
        command,
        |msgbus| msgbus.endpoints_data_commands.get(endpoint),
        "send_data_command",
    );
}
/// Sends `response` by value to the handler registered for `endpoint` in
/// `endpoints_data_responses`.
pub fn send_data_response(endpoint: MStr<Endpoint>, response: DataResponse) {
    send_endpoint_owned(
        endpoint,
        response,
        |msgbus| msgbus.endpoints_data_responses.get(endpoint),
        "send_data_response",
    );
}
/// Sends `report` by value to the handler registered for `endpoint` in
/// `endpoints_exec_reports`.
pub fn send_execution_report(endpoint: MStr<Endpoint>, report: ExecutionReport) {
    send_endpoint_owned(
        endpoint,
        report,
        |msgbus| msgbus.endpoints_exec_reports.get(endpoint),
        "send_execution_report",
    );
}
/// Sends `data` by value to the handler registered for `endpoint` in `endpoints_data`.
pub fn send_data(endpoint: MStr<Endpoint>, data: Data) {
    send_endpoint_owned(
        endpoint,
        data,
        |msgbus| msgbus.endpoints_data.get(endpoint),
        "send_data",
    );
}
/// Sends `data` by value to the handler registered for `endpoint` in
/// `endpoints_defi_data` (only built with the `defi` feature).
#[cfg(feature = "defi")]
pub fn send_defi_data(endpoint: MStr<Endpoint>, data: DefiData) {
    send_endpoint_owned(
        endpoint,
        data,
        |msgbus| msgbus.endpoints_defi_data.get(endpoint),
        "send_defi_data",
    );
}
/// Shared routine behind the by-reference `send_*` functions.
///
/// `get_handler` looks the handler up on the borrowed bus; the handler is cloned
/// and the borrow released before `message` is handed over. When no handler is
/// found, an error naming `fn_name` and `endpoint` is logged.
#[inline]
fn send_endpoint_ref<T: 'static, F>(
    endpoint: MStr<Endpoint>,
    message: &T,
    get_handler: F,
    fn_name: &str,
) where
    F: FnOnce(&MessageBus) -> Option<&TypedHandler<T>>,
{
    let bus = get_message_bus();
    let guard = bus.borrow();
    let found = get_handler(&guard).cloned();
    // Release the bus before dispatch so the handler may borrow it again.
    drop(guard);
    drop(bus);

    match found {
        Some(target) => {
            target.handle(message);
        }
        None => {
            log::error!("{fn_name}: no registered endpoint '{endpoint}'");
        }
    }
}
/// Shared routine behind the by-value `send_*` functions.
///
/// `get_handler` looks the handler up on the borrowed bus; the handler is cloned
/// and the borrow released before ownership of `message` is handed over. When no
/// handler is found, an error naming `fn_name` and `endpoint` is logged.
#[inline]
fn send_endpoint_owned<T: 'static, F>(
    endpoint: MStr<Endpoint>,
    message: T,
    get_handler: F,
    fn_name: &str,
) where
    F: FnOnce(&MessageBus) -> Option<&TypedIntoHandler<T>>,
{
    let bus = get_message_bus();
    let guard = bus.borrow();
    let found = get_handler(&guard).cloned();
    // Release the bus before dispatch so the handler may borrow it again.
    drop(guard);
    drop(bus);

    match found {
        Some(target) => {
            target.handle(message);
        }
        None => {
            log::error!("{fn_name}: no registered endpoint '{endpoint}'");
        }
    }
}
#[cfg(test)]
mod tests {
use std::{cell::RefCell, rc::Rc};
use nautilus_core::UUID4;
use nautilus_model::{
data::{Bar, OrderBookDelta, OrderBookDeltas, QuoteTick, TradeTick},
enums::OrderSide,
events::OrderDenied,
identifiers::{ClientId, ClientOrderId, InstrumentId, StrategyId, TraderId},
};
use rstest::rstest;
use super::*;
use crate::messages::{
data::{DataCommand, SubscribeCommand, SubscribeQuotes},
execution::{CancelAllOrders, TradingCommand},
};
#[rstest]
fn test_typed_quote_publish_subscribe_integration() {
    // A wildcard quote subscriber must receive every quote published on a matching topic.
    let _msgbus = get_message_bus();
    let received = Rc::new(RefCell::new(Vec::new()));
    let received_clone = received.clone();
    let handler = TypedHandler::from(move |quote: &QuoteTick| {
        received_clone.borrow_mut().push(*quote);
    });
    subscribe_quotes("data.quotes.*".into(), handler, None);
    let quote = QuoteTick::default();
    publish_quote("data.quotes.TEST".into(), &quote);
    publish_quote("data.quotes.TEST".into(), &quote);
    assert_eq!(received.borrow().len(), 2);
}
#[rstest]
fn test_typed_trade_publish_subscribe_integration() {
    // A wildcard trade subscriber receives the single trade published on a matching topic.
    let _msgbus = get_message_bus();
    let captured = Rc::new(RefCell::new(Vec::new()));
    let sink = Rc::clone(&captured);

    subscribe_trades(
        "data.trades.*".into(),
        TypedHandler::from(move |trade: &TradeTick| {
            sink.borrow_mut().push(*trade);
        }),
        None,
    );

    let tick = TradeTick::default();
    publish_trade("data.trades.TEST".into(), &tick);

    assert_eq!(captured.borrow().len(), 1);
}
#[rstest]
fn test_typed_bar_publish_subscribe_integration() {
    // A wildcard bar subscriber receives the single bar published on a matching topic.
    let _msgbus = get_message_bus();
    let captured = Rc::new(RefCell::new(Vec::new()));
    let sink = Rc::clone(&captured);

    subscribe_bars(
        "data.bars.*".into(),
        TypedHandler::from(move |bar: &Bar| {
            sink.borrow_mut().push(*bar);
        }),
        None,
    );

    let sample = Bar::default();
    publish_bar("data.bars.TEST".into(), &sample);

    assert_eq!(captured.borrow().len(), 1);
}
#[rstest]
fn test_typed_deltas_publish_subscribe_integration() {
    // A wildcard deltas subscriber receives the single batch published on a matching topic.
    let _msgbus = get_message_bus();
    let captured = Rc::new(RefCell::new(Vec::new()));
    let sink = Rc::clone(&captured);

    subscribe_book_deltas(
        "data.book.deltas.*".into(),
        TypedHandler::from(move |deltas: &OrderBookDeltas| {
            sink.borrow_mut().push(deltas.clone());
        }),
        None,
    );

    let instrument_id = InstrumentId::from("TEST.VENUE");
    let clear_delta = OrderBookDelta::clear(instrument_id, 0, 1.into(), 2.into());
    let batch = OrderBookDeltas::new(instrument_id, vec![clear_delta]);
    publish_deltas("data.book.deltas.TEST".into(), &batch);

    assert_eq!(captured.borrow().len(), 1);
}
#[rstest]
fn test_typed_unsubscribe_stops_delivery() {
let _msgbus = get_message_bus();
let received = Rc::new(RefCell::new(Vec::new()));
let received_clone = received.clone();
let handler = TypedHandler::from_with_id("unsub-test", move |quote: &QuoteTick| {
received_clone.borrow_mut().push(*quote);
});
subscribe_quotes("data.quotes.UNSUB".into(), handler.clone(), None);
let quote = QuoteTick::default();
publish_quote("data.quotes.UNSUB".into(), "e);
assert_eq!(received.borrow().len(), 1);
unsubscribe_quotes("data.quotes.UNSUB".into(), &handler);
publish_quote("data.quotes.UNSUB".into(), "e);
assert_eq!(received.borrow().len(), 1);
}
#[rstest]
fn test_typed_wildcard_pattern_matching() {
let _msgbus = get_message_bus();
let received = Rc::new(RefCell::new(Vec::new()));
let received_clone = received.clone();
let handler = TypedHandler::from(move |quote: &QuoteTick| {
received_clone.borrow_mut().push(*quote);
});
subscribe_quotes("data.quotes.WILD.*".into(), handler, None);
let quote = QuoteTick::default();
publish_quote("data.quotes.WILD.AAPL".into(), "e);
publish_quote("data.quotes.WILD.MSFT".into(), "e);
publish_quote("data.quotes.OTHER.AAPL".into(), "e);
assert_eq!(received.borrow().len(), 2);
}
#[rstest]
fn test_typed_priority_ordering() {
let _msgbus = get_message_bus();
let order = Rc::new(RefCell::new(Vec::new()));
let order1 = order.clone();
let handler_low = TypedHandler::from_with_id("low-priority", move |_: &QuoteTick| {
order1.borrow_mut().push("low");
});
let order2 = order.clone();
let handler_high = TypedHandler::from_with_id("high-priority", move |_: &QuoteTick| {
order2.borrow_mut().push("high");
});
subscribe_quotes("data.quotes.PRIO.*".into(), handler_low, Some(1));
subscribe_quotes("data.quotes.PRIO.*".into(), handler_high, Some(10));
let quote = QuoteTick::default();
publish_quote("data.quotes.PRIO.TEST".into(), "e);
assert_eq!(*order.borrow(), vec!["high", "low"]);
}
#[rstest]
fn test_typed_routing_isolation() {
let _msgbus = get_message_bus();
let quote_received = Rc::new(RefCell::new(false));
let trade_received = Rc::new(RefCell::new(false));
let qr = quote_received.clone();
let quote_handler = TypedHandler::from(move |_: &QuoteTick| {
*qr.borrow_mut() = true;
});
let tr = trade_received.clone();
let trade_handler = TypedHandler::from(move |_: &TradeTick| {
*tr.borrow_mut() = true;
});
subscribe_quotes("data.iso.*".into(), quote_handler, None);
subscribe_trades("data.iso.*".into(), trade_handler, None);
let quote = QuoteTick::default();
publish_quote("data.iso.TEST".into(), "e);
assert!(*quote_received.borrow());
assert!(!*trade_received.borrow());
}
#[rstest]
fn test_send_data_allows_reentrant_topic_access() {
    use crate::msgbus::switchboard::get_quotes_topic;

    // The data endpoint handler calls `get_quotes_topic` while `send_data` is dispatching.
    let _msgbus = get_message_bus();
    let reached = Rc::new(RefCell::new(false));
    let reached_flag = Rc::clone(&reached);

    let handler = TypedIntoHandler::from(move |data: Data| {
        let _topic = get_quotes_topic(data.instrument_id());
        *reached_flag.borrow_mut() = true;
    });

    let endpoint: MStr<Endpoint> = "ReentrantTest.data".into();
    register_data_endpoint(endpoint, handler);

    send_data(endpoint, Data::Quote(QuoteTick::default()));

    assert!(*reached.borrow());
}
#[rstest]
fn test_send_trading_command_allows_reentrant_topic_access() {
    use nautilus_model::{
        enums::OrderSide,
        identifiers::{StrategyId, TraderId},
    };
    use crate::{
        messages::execution::{TradingCommand, cancel::CancelAllOrders},
        msgbus::switchboard::get_trades_topic,
    };

    // The command endpoint handler calls `get_trades_topic` while
    // `send_trading_command` is dispatching.
    let _msgbus = get_message_bus();
    let reached = Rc::new(RefCell::new(false));
    let reached_flag = Rc::clone(&reached);

    let handler = TypedIntoHandler::from(move |cmd: TradingCommand| {
        let _topic = get_trades_topic(cmd.instrument_id());
        *reached_flag.borrow_mut() = true;
    });

    let endpoint: MStr<Endpoint> = "ReentrantTest.tradingCmd".into();
    register_trading_command_endpoint(endpoint, handler);

    let cancel_all = CancelAllOrders::new(
        TraderId::new("TESTER-001"),
        None,
        StrategyId::new("S-001"),
        InstrumentId::from("TEST.VENUE"),
        OrderSide::NoOrderSide,
        UUID4::new(),
        0.into(),
        None,
    );
    send_trading_command(endpoint, TradingCommand::CancelAllOrders(cancel_all));

    assert!(*reached.borrow());
}
/// Checks that an account-state endpoint handler can call
/// `get_quotes_topic` while it is being invoked through
/// `send_account_state`.
///
/// The test passes when the handler completes and sets its flag.
#[rstest]
fn test_send_account_state_allows_reentrant_topic_access() {
    use nautilus_model::{enums::AccountType, identifiers::AccountId, types::Currency};

    use crate::msgbus::switchboard::get_quotes_topic;

    let _msgbus = get_message_bus();
    let handler_ran = Rc::new(RefCell::new(false));

    let handler = {
        let flag = Rc::clone(&handler_ran);
        TypedHandler::from(move |_state: &AccountState| {
            // Topic lookup from inside the handler is the access under test.
            let _topic = get_quotes_topic(InstrumentId::from("TEST.VENUE"));
            *flag.borrow_mut() = true;
        })
    };
    let endpoint: MStr<Endpoint> = "ReentrantTest.accountState".into();
    register_account_state_endpoint(endpoint, handler);

    // Account state with empty vectors; only delivery to the handler matters.
    let account_state = AccountState::new(
        AccountId::new("SIM-001"),
        AccountType::Cash,
        vec![],
        vec![],
        true,
        UUID4::new(),
        0.into(),
        0.into(),
        Some(Currency::USD()),
    );
    send_account_state(endpoint, &account_state);

    assert!(*handler_ran.borrow());
}
/// Checks that an order-event endpoint handler can call `get_quotes_topic`
/// while it is being invoked through `send_order_event`.
///
/// An `OrderDenied` event is sent to the endpoint; the test passes when the
/// handler completes and sets its flag.
#[rstest]
fn test_send_order_event_allows_reentrant_topic_access() {
    use nautilus_model::{
        events::OrderDenied,
        identifiers::{ClientOrderId, StrategyId, TraderId},
    };

    use crate::msgbus::switchboard::get_quotes_topic;

    let _msgbus = get_message_bus();
    let handler_ran = Rc::new(RefCell::new(false));

    let handler = {
        let flag = Rc::clone(&handler_ran);
        TypedIntoHandler::from(move |_event: OrderEventAny| {
            // Topic lookup from inside the handler is the access under test.
            let _topic = get_quotes_topic(InstrumentId::from("TEST.VENUE"));
            *flag.borrow_mut() = true;
        })
    };
    let endpoint: MStr<Endpoint> = "ReentrantTest.orderEvent".into();
    register_order_event_endpoint(endpoint, handler);

    let denied = OrderDenied::new(
        TraderId::new("TESTER-001"),
        StrategyId::new("S-001"),
        InstrumentId::from("TEST.VENUE"),
        ClientOrderId::new("O-001"),
        "test denied".into(),
        UUID4::new(),
        0.into(),
        0.into(),
    );
    send_order_event(endpoint, OrderEventAny::Denied(denied));

    assert!(*handler_ran.borrow());
}
/// Checks that a data-command endpoint handler can call `get_trades_topic`
/// while it is being invoked through `send_data_command`.
///
/// A quotes subscription command is sent to the endpoint; the test passes
/// when the handler completes and sets its flag.
#[rstest]
fn test_send_data_command_allows_reentrant_topic_access() {
    use nautilus_model::identifiers::ClientId;

    use crate::{
        messages::data::{DataCommand, SubscribeCommand, SubscribeQuotes},
        msgbus::switchboard::get_trades_topic,
    };

    let _msgbus = get_message_bus();
    let handler_ran = Rc::new(RefCell::new(false));
    let endpoint: MStr<Endpoint> = "ReentrantTest.dataCmd".into();

    let handler_flag = Rc::clone(&handler_ran);
    register_data_command_endpoint(
        endpoint,
        TypedIntoHandler::from(move |_cmd: DataCommand| {
            // Topic lookup from inside the handler is the access under test.
            let _topic = get_trades_topic(InstrumentId::from("TEST.VENUE"));
            *handler_flag.borrow_mut() = true;
        }),
    );

    let subscribe_quotes_cmd = SubscribeQuotes::new(
        InstrumentId::from("TEST.VENUE"),
        Some(ClientId::new("SIM")),
        None,
        UUID4::new(),
        0.into(),
        None,
        None,
    );
    send_data_command(
        endpoint,
        DataCommand::Subscribe(SubscribeCommand::Quotes(subscribe_quotes_cmd)),
    );

    assert!(*handler_ran.borrow());
}
/// Checks that a data-response endpoint handler can call `get_quotes_topic`
/// while it is being invoked through `send_data_response`.
///
/// An empty `QuotesResponse` is sent to the endpoint; the test passes when
/// the handler completes and sets its flag.
#[rstest]
fn test_send_data_response_allows_reentrant_topic_access() {
    use nautilus_model::identifiers::ClientId;

    use crate::{
        messages::data::{DataResponse, QuotesResponse},
        msgbus::switchboard::get_quotes_topic,
    };

    let _msgbus = get_message_bus();
    let handler_ran = Rc::new(RefCell::new(false));

    let handler = {
        let flag = Rc::clone(&handler_ran);
        TypedIntoHandler::from(move |_resp: DataResponse| {
            // Topic lookup from inside the handler is the access under test.
            let _topic = get_quotes_topic(InstrumentId::from("TEST.VENUE"));
            *flag.borrow_mut() = true;
        })
    };
    let endpoint: MStr<Endpoint> = "ReentrantTest.dataResp".into();
    register_data_response_endpoint(endpoint, handler);

    let quotes_response = QuotesResponse {
        correlation_id: UUID4::new(),
        client_id: ClientId::new("SIM"),
        instrument_id: InstrumentId::from("TEST.VENUE"),
        data: vec![],
        start: None,
        end: None,
        ts_init: 0.into(),
        params: None,
    };
    send_data_response(endpoint, DataResponse::Quotes(quotes_response));

    assert!(*handler_ran.borrow());
}
/// Checks that an execution-report endpoint handler can call
/// `get_trades_topic` while it is being invoked through
/// `send_execution_report`.
///
/// A boxed `ExecutionMassStatus` report is sent to the endpoint; the test
/// passes when the handler completes and sets its flag.
#[rstest]
fn test_send_execution_report_allows_reentrant_topic_access() {
    use nautilus_model::{
        identifiers::{AccountId, ClientId, Venue},
        reports::ExecutionMassStatus,
    };

    use crate::{messages::execution::ExecutionReport, msgbus::switchboard::get_trades_topic};

    let _msgbus = get_message_bus();
    let handler_ran = Rc::new(RefCell::new(false));
    let endpoint: MStr<Endpoint> = "ReentrantTest.execReport".into();

    let handler_flag = Rc::clone(&handler_ran);
    let handler = TypedIntoHandler::from(move |_report: ExecutionReport| {
        // Topic lookup from inside the handler is the access under test.
        let _topic = get_trades_topic(InstrumentId::from("TEST.VENUE"));
        *handler_flag.borrow_mut() = true;
    });
    register_execution_report_endpoint(endpoint, handler);

    let mass_status = ExecutionMassStatus::new(
        ClientId::new("SIM"),
        AccountId::new("SIM-001"),
        Venue::new("TEST"),
        0.into(),
        None,
    );
    send_execution_report(endpoint, ExecutionReport::MassStatus(Box::new(mass_status)));

    assert!(*handler_ran.borrow());
}
/// Verifies that an order-event handler can dispatch a trading command to
/// another endpoint from inside its own callback.
///
/// Two endpoints are registered: a trading-command endpoint that only
/// records receipt, and an order-event endpoint whose handler sends a
/// `CancelAllOrders` command to the first. Both flags must be set after one
/// order event is sent.
#[rstest]
fn test_order_event_handler_can_send_trading_command() {
    let _msgbus = get_message_bus();

    let command_sent = Rc::new(RefCell::new(false));
    let cmd_received = Rc::new(RefCell::new(false));

    // Inner endpoint: records that the trading command arrived.
    let cmd_endpoint: MStr<Endpoint> = "ReentrantTest.execCmd".into();
    let received_flag = Rc::clone(&cmd_received);
    register_trading_command_endpoint(
        cmd_endpoint,
        TypedIntoHandler::from(move |_cmd: TradingCommand| {
            *received_flag.borrow_mut() = true;
        }),
    );

    // Outer endpoint: sends a command to the inner endpoint while handling
    // the order event, then marks itself as having run.
    let event_endpoint: MStr<Endpoint> = "ReentrantTest.orderEvt".into();
    let sent_flag = Rc::clone(&command_sent);
    let event_handler = TypedIntoHandler::from(move |_event: OrderEventAny| {
        let cancel_all = CancelAllOrders::new(
            TraderId::new("TESTER-001"),
            None,
            StrategyId::new("S-001"),
            InstrumentId::from("TEST.VENUE"),
            OrderSide::Buy,
            UUID4::new(),
            0.into(),
            None,
        );
        send_trading_command(cmd_endpoint, TradingCommand::CancelAllOrders(cancel_all));
        *sent_flag.borrow_mut() = true;
    });
    register_order_event_endpoint(event_endpoint, event_handler);

    let denied = OrderDenied::new(
        TraderId::new("TESTER-001"),
        StrategyId::new("S-001"),
        InstrumentId::from("TEST.VENUE"),
        ClientOrderId::new("O-001"),
        "Test denial".into(),
        UUID4::new(),
        0.into(),
        0.into(),
    );
    send_order_event(event_endpoint, OrderEventAny::Denied(denied));

    assert!(
        *command_sent.borrow(),
        "Order event handler should have run"
    );
    assert!(
        *cmd_received.borrow(),
        "Trading command should have been received"
    );
}
/// Verifies that a data handler can dispatch a data command to another
/// endpoint from inside its own callback.
///
/// Two endpoints are registered: a data-command endpoint that only records
/// receipt, and a data endpoint whose handler sends a quotes subscription
/// command to the first. Both flags must be set after one quote is sent.
#[rstest]
fn test_data_handler_can_send_data_command() {
    let _msgbus = get_message_bus();

    let command_sent = Rc::new(RefCell::new(false));
    let cmd_received = Rc::new(RefCell::new(false));

    // Inner endpoint: records that the data command arrived.
    let cmd_endpoint: MStr<Endpoint> = "ReentrantTest.dataCmd2".into();
    let received_flag = Rc::clone(&cmd_received);
    register_data_command_endpoint(
        cmd_endpoint,
        TypedIntoHandler::from(move |_cmd: DataCommand| {
            *received_flag.borrow_mut() = true;
        }),
    );

    // Outer endpoint: sends a command to the inner endpoint while handling
    // the data, then marks itself as having run.
    let data_endpoint: MStr<Endpoint> = "ReentrantTest.data2".into();
    let sent_flag = Rc::clone(&command_sent);
    let data_handler = TypedIntoHandler::from(move |_data: Data| {
        let subscribe_quotes_cmd = SubscribeQuotes::new(
            InstrumentId::from("TEST.VENUE"),
            Some(ClientId::new("SIM")),
            None,
            UUID4::new(),
            0.into(),
            None,
            None,
        );
        send_data_command(
            cmd_endpoint,
            DataCommand::Subscribe(SubscribeCommand::Quotes(subscribe_quotes_cmd)),
        );
        *sent_flag.borrow_mut() = true;
    });
    register_data_endpoint(data_endpoint, data_handler);

    send_data(data_endpoint, Data::Quote(QuoteTick::default()));

    assert!(*command_sent.borrow(), "Data handler should have run");
    assert!(
        *cmd_received.borrow(),
        "Data command should have been received"
    );
}
/// Verifies that a trading-command handler can dispatch an order event to
/// another endpoint from inside its own callback.
///
/// Two endpoints are registered: an order-event endpoint that only records
/// receipt, and a trading-command endpoint whose handler sends an
/// `OrderDenied` event to the first. Both flags must be set after one
/// `CancelAllOrders` command is sent.
#[rstest]
fn test_trading_command_handler_can_send_order_event() {
    let _msgbus = get_message_bus();

    let event_sent = Rc::new(RefCell::new(false));
    let evt_received = Rc::new(RefCell::new(false));

    // Inner endpoint: records that the order event arrived.
    let evt_endpoint: MStr<Endpoint> = "ReentrantTest.orderEvt2".into();
    let received_flag = Rc::clone(&evt_received);
    register_order_event_endpoint(
        evt_endpoint,
        TypedIntoHandler::from(move |_event: OrderEventAny| {
            *received_flag.borrow_mut() = true;
        }),
    );

    // Outer endpoint: sends an event to the inner endpoint while handling
    // the command, then marks itself as having run.
    let cmd_endpoint: MStr<Endpoint> = "ReentrantTest.execCmd2".into();
    let sent_flag = Rc::clone(&event_sent);
    let cmd_handler = TypedIntoHandler::from(move |_cmd: TradingCommand| {
        let denied = OrderDenied::new(
            TraderId::new("TESTER-001"),
            StrategyId::new("S-001"),
            InstrumentId::from("TEST.VENUE"),
            ClientOrderId::new("O-001"),
            "Test denial".into(),
            UUID4::new(),
            0.into(),
            0.into(),
        );
        send_order_event(evt_endpoint, OrderEventAny::Denied(denied));
        *sent_flag.borrow_mut() = true;
    });
    register_trading_command_endpoint(cmd_endpoint, cmd_handler);

    let cancel_all = CancelAllOrders::new(
        TraderId::new("TESTER-001"),
        None,
        StrategyId::new("S-001"),
        InstrumentId::from("TEST.VENUE"),
        OrderSide::Buy,
        UUID4::new(),
        0.into(),
        None,
    );
    send_trading_command(cmd_endpoint, TradingCommand::CancelAllOrders(cancel_all));

    assert!(
        *event_sent.borrow(),
        "Trading command handler should have run"
    );
    assert!(
        *evt_received.borrow(),
        "Order event should have been received"
    );
}
/// Verifies a three-level chain of sends made from inside handlers.
///
/// The initial order-event handler sends a trading command; the
/// trading-command handler sends a second order event to a final endpoint.
/// The two intermediate handlers each bump a shared counter, and the final
/// handler sets a flag. After one initial event the counter must be 2 and
/// the flag must be set.
#[rstest]
fn test_nested_reentrant_calls() {
    let _msgbus = get_message_bus();

    let call_depth = Rc::new(RefCell::new(0u32));
    let final_received = Rc::new(RefCell::new(false));

    // Level 3: final order-event endpoint, only records receipt.
    let final_evt_endpoint: MStr<Endpoint> = "ReentrantTest.finalEvt".into();
    let final_flag = Rc::clone(&final_received);
    register_order_event_endpoint(
        final_evt_endpoint,
        TypedIntoHandler::from(move |_event: OrderEventAny| {
            *final_flag.borrow_mut() = true;
        }),
    );

    // Level 2: trading-command endpoint, counts itself and forwards an
    // order event to the final endpoint.
    let mid_cmd_endpoint: MStr<Endpoint> = "ReentrantTest.midCmd".into();
    let mid_depth = Rc::clone(&call_depth);
    let mid_cmd_handler = TypedIntoHandler::from(move |_cmd: TradingCommand| {
        *mid_depth.borrow_mut() += 1;
        let nested_denied = OrderDenied::new(
            TraderId::new("TESTER-001"),
            StrategyId::new("S-001"),
            InstrumentId::from("TEST.VENUE"),
            ClientOrderId::new("O-002"),
            "Nested denial".into(),
            UUID4::new(),
            0.into(),
            0.into(),
        );
        send_order_event(final_evt_endpoint, OrderEventAny::Denied(nested_denied));
    });
    register_trading_command_endpoint(mid_cmd_endpoint, mid_cmd_handler);

    // Level 1: initial order-event endpoint, counts itself and forwards a
    // trading command to the middle endpoint.
    let init_evt_endpoint: MStr<Endpoint> = "ReentrantTest.initEvt".into();
    let init_depth = Rc::clone(&call_depth);
    let init_evt_handler = TypedIntoHandler::from(move |_event: OrderEventAny| {
        *init_depth.borrow_mut() += 1;
        let cancel_all = CancelAllOrders::new(
            TraderId::new("TESTER-001"),
            None,
            StrategyId::new("S-001"),
            InstrumentId::from("TEST.VENUE"),
            OrderSide::Buy,
            UUID4::new(),
            0.into(),
            None,
        );
        send_trading_command(mid_cmd_endpoint, TradingCommand::CancelAllOrders(cancel_all));
    });
    register_order_event_endpoint(init_evt_endpoint, init_evt_handler);

    // Kick off the chain with a single event at level 1.
    let initial_denied = OrderDenied::new(
        TraderId::new("TESTER-001"),
        StrategyId::new("S-001"),
        InstrumentId::from("TEST.VENUE"),
        ClientOrderId::new("O-001"),
        "Initial denial".into(),
        UUID4::new(),
        0.into(),
        0.into(),
    );
    send_order_event(init_evt_endpoint, OrderEventAny::Denied(initial_denied));

    assert_eq!(
        *call_depth.borrow(),
        2,
        "Both intermediate handlers should have run"
    );
    assert!(
        *final_received.borrow(),
        "Final event handler should have received the event"
    );
}
}