use std::{any::Any, cell::RefCell, thread::LocalKey};
use nautilus_core::UUID4;
#[cfg(feature = "defi")]
use nautilus_model::defi::{
Block, DefiData, Pool, PoolFeeCollect, PoolFlash, PoolLiquidityUpdate, PoolSwap,
};
use nautilus_model::{
data::{
Bar, CustomData, Data, FundingRateUpdate, GreeksData, IndexPriceUpdate, MarkPriceUpdate,
OrderBookDeltas, OrderBookDepth10, QuoteTick, TradeTick,
option_chain::{OptionChainSlice, OptionGreeks},
},
events::{AccountState, OrderEventAny, PortfolioSnapshot, PositionEvent},
instruments::InstrumentAny,
orderbook::OrderBook,
orders::OrderAny,
position::Position,
};
use smallvec::SmallVec;
use ustr::Ustr;
pub use super::external::republish_external_message;
use super::{
ACCOUNT_STATE_HANDLERS, ANY_HANDLERS, BAR_HANDLERS, BOOK_HANDLERS, BusPayloadType,
DELTAS_HANDLERS, DEPTH10_HANDLERS, FUNDING_RATE_HANDLERS, GREEKS_HANDLERS, HANDLER_BUFFER_CAP,
INDEX_PRICE_HANDLERS, INSTRUMENT_HANDLERS, MARK_PRICE_HANDLERS, OPTION_CHAIN_HANDLERS,
OPTION_GREEKS_HANDLERS, ORDER_EVENT_HANDLERS, PORTFOLIO_SNAPSHOT_HANDLERS,
POSITION_EVENT_HANDLERS, QUOTE_HANDLERS, TRADE_HANDLERS,
core::{MessageBus, Subscription},
dispatch_tap_publish, dispatch_tap_response, dispatch_tap_send,
external::forward_to_external_egress,
get_message_bus,
matching::is_matching_backtracking,
mstr::{Endpoint, MStr, Pattern, Topic},
try_get_message_bus,
typed_handler::{ShareableMessageHandler, TypedHandler, TypedIntoHandler},
};
#[cfg(feature = "defi")]
use super::{
DEFI_BLOCK_HANDLERS, DEFI_COLLECT_HANDLERS, DEFI_FLASH_HANDLERS, DEFI_LIQUIDITY_HANDLERS,
DEFI_POOL_HANDLERS, DEFI_SWAP_HANDLERS,
};
use crate::messages::{
data::{DataCommand, DataResponse},
execution::{ExecutionReport, TradingCommand},
};
/// Registers `handler` under `endpoint` in the message bus `endpoints` map.
///
/// A debug log line naming the endpoint and the handler ID is emitted before the insert.
pub fn register_any(endpoint: MStr<Endpoint>, handler: ShareableMessageHandler) {
    log::debug!(
        "Registering endpoint '{endpoint}' with handler ID {}",
        handler.0.id(),
    );

    let bus = get_message_bus();
    bus.borrow_mut().endpoints.insert(endpoint, handler);
}
pub fn register_response_handler(correlation_id: &UUID4, handler: ShareableMessageHandler) {
if let Err(e) = get_message_bus()
.borrow_mut()
.register_response_handler(correlation_id, handler)
{
log::error!("Failed to register request handler: {e}");
}
}
/// Registers `handler` for `endpoint` in the bus's `endpoints_quotes` registry.
pub fn register_quote_endpoint(endpoint: MStr<Endpoint>, handler: TypedHandler<QuoteTick>) {
    let bus = get_message_bus();
    bus.borrow_mut().endpoints_quotes.register(endpoint, handler);
}
/// Returns whether `endpoint` is registered in the bus's `endpoints_quotes` registry.
#[must_use]
pub fn has_quote_endpoint(endpoint: MStr<Endpoint>) -> bool {
    let bus_rc = get_message_bus();
    let bus = bus_rc.borrow();
    bus.endpoints_quotes.is_registered(endpoint)
}
/// Registers `handler` for `endpoint` in the bus's `endpoints_trades` registry.
pub fn register_trade_endpoint(endpoint: MStr<Endpoint>, handler: TypedHandler<TradeTick>) {
    let bus = get_message_bus();
    bus.borrow_mut().endpoints_trades.register(endpoint, handler);
}
/// Registers `handler` for `endpoint` in the bus's `endpoints_bars` registry.
pub fn register_bar_endpoint(endpoint: MStr<Endpoint>, handler: TypedHandler<Bar>) {
    let bus = get_message_bus();
    bus.borrow_mut().endpoints_bars.register(endpoint, handler);
}
/// Registers `handler` for `endpoint` in the bus's `endpoints_order_events` registry.
pub fn register_order_event_endpoint(
    endpoint: MStr<Endpoint>,
    handler: TypedIntoHandler<OrderEventAny>,
) {
    let bus = get_message_bus();
    bus.borrow_mut()
        .endpoints_order_events
        .register(endpoint, handler);
}
/// Registers `handler` for `endpoint` in the bus's `endpoints_account_state` registry.
pub fn register_account_state_endpoint(
    endpoint: MStr<Endpoint>,
    handler: TypedHandler<AccountState>,
) {
    let bus = get_message_bus();
    bus.borrow_mut()
        .endpoints_account_state
        .register(endpoint, handler);
}
/// Registers `handler` for `endpoint` in the bus's `endpoints_trading_commands` registry.
pub fn register_trading_command_endpoint(
    endpoint: MStr<Endpoint>,
    handler: TypedIntoHandler<TradingCommand>,
) {
    let bus = get_message_bus();
    bus.borrow_mut()
        .endpoints_trading_commands
        .register(endpoint, handler);
}
/// Registers `handler` for `endpoint` in the bus's `endpoints_data_commands` registry.
pub fn register_data_command_endpoint(
    endpoint: MStr<Endpoint>,
    handler: TypedIntoHandler<DataCommand>,
) {
    let bus = get_message_bus();
    bus.borrow_mut()
        .endpoints_data_commands
        .register(endpoint, handler);
}
/// Registers `handler` for `endpoint` in the bus's `endpoints_data_responses` registry.
pub fn register_data_response_endpoint(
    endpoint: MStr<Endpoint>,
    handler: TypedIntoHandler<DataResponse>,
) {
    let bus = get_message_bus();
    bus.borrow_mut()
        .endpoints_data_responses
        .register(endpoint, handler);
}
/// Registers `handler` for `endpoint` in the bus's `endpoints_exec_reports` registry.
pub fn register_execution_report_endpoint(
    endpoint: MStr<Endpoint>,
    handler: TypedIntoHandler<ExecutionReport>,
) {
    let bus = get_message_bus();
    bus.borrow_mut()
        .endpoints_exec_reports
        .register(endpoint, handler);
}
/// Registers `handler` for `endpoint` in the bus's `endpoints_data` registry.
pub fn register_data_endpoint(endpoint: MStr<Endpoint>, handler: TypedIntoHandler<Data>) {
    let bus = get_message_bus();
    bus.borrow_mut().endpoints_data.register(endpoint, handler);
}
/// Registers `handler` for `endpoint` in the bus's `endpoints_defi_data` registry.
#[cfg(feature = "defi")]
pub fn register_defi_data_endpoint(endpoint: MStr<Endpoint>, handler: TypedIntoHandler<DefiData>) {
    let bus = get_message_bus();
    bus.borrow_mut()
        .endpoints_defi_data
        .register(endpoint, handler);
}
/// Removes `endpoint` from the message bus `endpoints` map via `shift_remove`.
///
/// A debug log line naming the endpoint is emitted first.
pub fn deregister_any(endpoint: MStr<Endpoint>) {
    log::debug!("Deregistering endpoint '{endpoint}'");

    let bus = get_message_bus();
    bus.borrow_mut().endpoints.shift_remove(&endpoint);
}
/// Returns whether the message bus has a handler registered for the named `endpoint`.
///
/// The name is converted to an `MStr<Endpoint>` through `Ustr` before the lookup.
#[must_use]
pub fn has_endpoint(endpoint: &str) -> bool {
    let key: MStr<Endpoint> = Ustr::from(endpoint).into();

    let bus_rc = get_message_bus();
    let bus = bus_rc.borrow();
    bus.get_endpoint(key).is_some()
}
/// Subscribes `handler` to `pattern` on the untyped (`Any`) side of the message bus.
///
/// If an equal subscription is already present a warning is logged and nothing changes.
/// Otherwise the subscription is appended (and the list re-sorted) for every topic already
/// tracked in `topics` that matches `pattern`, and then recorded in `subscriptions`.
pub fn subscribe_any(
    pattern: MStr<Pattern>,
    handler: ShareableMessageHandler,
    priority: Option<u32>,
) {
    let bus_rc = get_message_bus();
    let mut bus = bus_rc.borrow_mut();

    let sub = Subscription::new(pattern, handler, priority);
    log::debug!(
        "Subscribing {:?} for pattern '{}'",
        sub.handler,
        sub.pattern
    );

    // Duplicate subscriptions are rejected up front
    if bus.subscriptions.contains(&sub) {
        log::warn!("{sub:?} already exists");
        return;
    }

    // Attach the new subscription to every known topic it matches
    for (topic, subs) in &mut bus.topics {
        if !is_matching_backtracking(*topic, sub.pattern) {
            continue;
        }
        subs.push(sub.clone());
        subs.sort();
        log::debug!("Added subscription for '{topic}'");
    }

    bus.subscriptions.insert(sub);
}
/// Subscribes `handler` to `pattern` on the bus's `router_instruments`.
///
/// When `priority` is `None`, a priority of `0` is used.
pub fn subscribe_instruments(
    pattern: MStr<Pattern>,
    handler: TypedHandler<InstrumentAny>,
    priority: Option<u32>,
) {
    let priority = priority.unwrap_or(0);
    let bus = get_message_bus();
    bus.borrow_mut()
        .router_instruments
        .subscribe(pattern, handler, priority);
}
/// Subscribes `handler` to `pattern` for instrument close messages.
///
/// This is a thin wrapper that forwards all arguments unchanged to [`subscribe_any`].
pub fn subscribe_instrument_close(
pattern: MStr<Pattern>,
handler: ShareableMessageHandler,
priority: Option<u32>,
) {
subscribe_any(pattern, handler, priority);
}
/// Subscribes `handler` to `pattern` on the bus's `router_deltas`.
///
/// When `priority` is `None`, a priority of `0` is used.
pub fn subscribe_book_deltas(
    pattern: MStr<Pattern>,
    handler: TypedHandler<OrderBookDeltas>,
    priority: Option<u32>,
) {
    let priority = priority.unwrap_or(0);
    let bus = get_message_bus();
    bus.borrow_mut()
        .router_deltas
        .subscribe(pattern, handler, priority);
}
/// Subscribes `handler` to `pattern` on the bus's `router_depth10`.
///
/// When `priority` is `None`, a priority of `0` is used.
pub fn subscribe_book_depth10(
    pattern: MStr<Pattern>,
    handler: TypedHandler<OrderBookDepth10>,
    priority: Option<u32>,
) {
    let priority = priority.unwrap_or(0);
    let bus = get_message_bus();
    bus.borrow_mut()
        .router_depth10
        .subscribe(pattern, handler, priority);
}
/// Subscribes `handler` to `pattern` on the bus's `router_book_snapshots`.
///
/// When `priority` is `None`, a priority of `0` is used.
pub fn subscribe_book_snapshots(
    pattern: MStr<Pattern>,
    handler: TypedHandler<OrderBook>,
    priority: Option<u32>,
) {
    let priority = priority.unwrap_or(0);
    let bus = get_message_bus();
    bus.borrow_mut()
        .router_book_snapshots
        .subscribe(pattern, handler, priority);
}
/// Subscribes `handler` to `pattern` on the bus's `router_quotes`.
///
/// When `priority` is `None`, a priority of `0` is used.
pub fn subscribe_quotes(
    pattern: MStr<Pattern>,
    handler: TypedHandler<QuoteTick>,
    priority: Option<u32>,
) {
    let priority = priority.unwrap_or(0);
    let bus = get_message_bus();
    bus.borrow_mut()
        .router_quotes
        .subscribe(pattern, handler, priority);
}
/// Subscribes `handler` to `pattern` on the bus's `router_trades`.
///
/// When `priority` is `None`, a priority of `0` is used.
pub fn subscribe_trades(
    pattern: MStr<Pattern>,
    handler: TypedHandler<TradeTick>,
    priority: Option<u32>,
) {
    let priority = priority.unwrap_or(0);
    let bus = get_message_bus();
    bus.borrow_mut()
        .router_trades
        .subscribe(pattern, handler, priority);
}
/// Subscribes `handler` to `pattern` on the bus's `router_bars`.
///
/// When `priority` is `None`, a priority of `0` is used.
pub fn subscribe_bars(pattern: MStr<Pattern>, handler: TypedHandler<Bar>, priority: Option<u32>) {
    let priority = priority.unwrap_or(0);
    let bus = get_message_bus();
    bus.borrow_mut()
        .router_bars
        .subscribe(pattern, handler, priority);
}
/// Subscribes `handler` to `pattern` on the bus's `router_mark_prices`.
///
/// When `priority` is `None`, a priority of `0` is used.
pub fn subscribe_mark_prices(
    pattern: MStr<Pattern>,
    handler: TypedHandler<MarkPriceUpdate>,
    priority: Option<u32>,
) {
    let priority = priority.unwrap_or(0);
    let bus = get_message_bus();
    bus.borrow_mut()
        .router_mark_prices
        .subscribe(pattern, handler, priority);
}
/// Subscribes `handler` to `pattern` on the bus's `router_index_prices`.
///
/// When `priority` is `None`, a priority of `0` is used.
pub fn subscribe_index_prices(
    pattern: MStr<Pattern>,
    handler: TypedHandler<IndexPriceUpdate>,
    priority: Option<u32>,
) {
    let priority = priority.unwrap_or(0);
    let bus = get_message_bus();
    bus.borrow_mut()
        .router_index_prices
        .subscribe(pattern, handler, priority);
}
/// Subscribes `handler` to `pattern` on the bus's `router_funding_rates`.
///
/// When `priority` is `None`, a priority of `0` is used.
pub fn subscribe_funding_rates(
    pattern: MStr<Pattern>,
    handler: TypedHandler<FundingRateUpdate>,
    priority: Option<u32>,
) {
    let priority = priority.unwrap_or(0);
    let bus = get_message_bus();
    bus.borrow_mut()
        .router_funding_rates
        .subscribe(pattern, handler, priority);
}
/// Subscribes `handler` to `pattern` on the bus's `router_greeks`.
///
/// When `priority` is `None`, a priority of `0` is used.
pub fn subscribe_greeks(
    pattern: MStr<Pattern>,
    handler: TypedHandler<GreeksData>,
    priority: Option<u32>,
) {
    let priority = priority.unwrap_or(0);
    let bus = get_message_bus();
    bus.borrow_mut()
        .router_greeks
        .subscribe(pattern, handler, priority);
}
/// Subscribes `handler` to `pattern` on the bus's `router_option_greeks`.
///
/// When `priority` is `None`, a priority of `0` is used.
pub fn subscribe_option_greeks(
    pattern: MStr<Pattern>,
    handler: TypedHandler<OptionGreeks>,
    priority: Option<u32>,
) {
    let priority = priority.unwrap_or(0);
    let bus = get_message_bus();
    bus.borrow_mut()
        .router_option_greeks
        .subscribe(pattern, handler, priority);
}
/// Subscribes `handler` to `pattern` on the bus's `router_option_chain`.
///
/// When `priority` is `None`, a priority of `0` is used.
pub fn subscribe_option_chain(
    pattern: MStr<Pattern>,
    handler: TypedHandler<OptionChainSlice>,
    priority: Option<u32>,
) {
    let priority = priority.unwrap_or(0);
    let bus = get_message_bus();
    bus.borrow_mut()
        .router_option_chain
        .subscribe(pattern, handler, priority);
}
/// Subscribes `handler` to `pattern` on the bus's `router_order_events`.
///
/// When `priority` is `None`, a priority of `0` is used.
pub fn subscribe_order_events(
    pattern: MStr<Pattern>,
    handler: TypedHandler<OrderEventAny>,
    priority: Option<u32>,
) {
    let priority = priority.unwrap_or(0);
    let bus = get_message_bus();
    bus.borrow_mut()
        .router_order_events
        .subscribe(pattern, handler, priority);
}
/// Subscribes `handler` to `pattern` on the bus's `router_position_events`.
///
/// When `priority` is `None`, a priority of `0` is used.
pub fn subscribe_position_events(
    pattern: MStr<Pattern>,
    handler: TypedHandler<PositionEvent>,
    priority: Option<u32>,
) {
    let priority = priority.unwrap_or(0);
    let bus = get_message_bus();
    bus.borrow_mut()
        .router_position_events
        .subscribe(pattern, handler, priority);
}
/// Subscribes `handler` to `pattern` on the bus's `router_positions`.
///
/// When `priority` is `None`, a priority of `0` is used.
pub fn subscribe_positions(
    pattern: MStr<Pattern>,
    handler: TypedHandler<Position>,
    priority: Option<u32>,
) {
    let priority = priority.unwrap_or(0);
    let bus = get_message_bus();
    bus.borrow_mut()
        .router_positions
        .subscribe(pattern, handler, priority);
}
/// Subscribes `handler` to `pattern` on the bus's `router_account_state`.
///
/// When `priority` is `None`, a priority of `0` is used.
pub fn subscribe_account_state(
    pattern: MStr<Pattern>,
    handler: TypedHandler<AccountState>,
    priority: Option<u32>,
) {
    let priority = priority.unwrap_or(0);
    let bus = get_message_bus();
    bus.borrow_mut()
        .router_account_state
        .subscribe(pattern, handler, priority);
}
/// Subscribes `handler` to `pattern` on the bus's `router_portfolio`.
///
/// When `priority` is `None`, a priority of `0` is used.
pub fn subscribe_portfolio_snapshot(
    pattern: MStr<Pattern>,
    handler: TypedHandler<PortfolioSnapshot>,
    priority: Option<u32>,
) {
    let priority = priority.unwrap_or(0);
    let bus = get_message_bus();
    bus.borrow_mut()
        .router_portfolio
        .subscribe(pattern, handler, priority);
}
/// Subscribes `handler` to `pattern` on the bus's `router_defi_blocks`.
///
/// When `priority` is `None`, a priority of `0` is used.
#[cfg(feature = "defi")]
pub fn subscribe_defi_blocks(
    pattern: MStr<Pattern>,
    handler: TypedHandler<Block>,
    priority: Option<u32>,
) {
    let priority = priority.unwrap_or(0);
    let bus = get_message_bus();
    bus.borrow_mut()
        .router_defi_blocks
        .subscribe(pattern, handler, priority);
}
/// Subscribes `handler` to `pattern` on the bus's `router_defi_pools`.
///
/// When `priority` is `None`, a priority of `0` is used.
#[cfg(feature = "defi")]
pub fn subscribe_defi_pools(
    pattern: MStr<Pattern>,
    handler: TypedHandler<Pool>,
    priority: Option<u32>,
) {
    let priority = priority.unwrap_or(0);
    let bus = get_message_bus();
    bus.borrow_mut()
        .router_defi_pools
        .subscribe(pattern, handler, priority);
}
/// Subscribes `handler` to `pattern` on the bus's `router_defi_swaps`.
///
/// When `priority` is `None`, a priority of `0` is used.
#[cfg(feature = "defi")]
pub fn subscribe_defi_swaps(
    pattern: MStr<Pattern>,
    handler: TypedHandler<PoolSwap>,
    priority: Option<u32>,
) {
    let priority = priority.unwrap_or(0);
    let bus = get_message_bus();
    bus.borrow_mut()
        .router_defi_swaps
        .subscribe(pattern, handler, priority);
}
/// Subscribes `handler` to `pattern` on the bus's `router_defi_liquidity`.
///
/// When `priority` is `None`, a priority of `0` is used.
#[cfg(feature = "defi")]
pub fn subscribe_defi_liquidity(
    pattern: MStr<Pattern>,
    handler: TypedHandler<PoolLiquidityUpdate>,
    priority: Option<u32>,
) {
    let priority = priority.unwrap_or(0);
    let bus = get_message_bus();
    bus.borrow_mut()
        .router_defi_liquidity
        .subscribe(pattern, handler, priority);
}
/// Subscribes `handler` to `pattern` on the bus's `router_defi_collects`.
///
/// When `priority` is `None`, a priority of `0` is used.
#[cfg(feature = "defi")]
pub fn subscribe_defi_collects(
    pattern: MStr<Pattern>,
    handler: TypedHandler<PoolFeeCollect>,
    priority: Option<u32>,
) {
    let priority = priority.unwrap_or(0);
    let bus = get_message_bus();
    bus.borrow_mut()
        .router_defi_collects
        .subscribe(pattern, handler, priority);
}
/// Subscribes `handler` to `pattern` on the bus's `router_defi_flash`.
///
/// When `priority` is `None`, a priority of `0` is used.
#[cfg(feature = "defi")]
pub fn subscribe_defi_flash(
    pattern: MStr<Pattern>,
    handler: TypedHandler<PoolFlash>,
    priority: Option<u32>,
) {
    let priority = priority.unwrap_or(0);
    let bus = get_message_bus();
    bus.borrow_mut()
        .router_defi_flash
        .subscribe(pattern, handler, priority);
}
/// Unsubscribes `handler` from `pattern` on the bus's `router_instruments`.
pub fn unsubscribe_instruments(pattern: MStr<Pattern>, handler: &TypedHandler<InstrumentAny>) {
    let bus = get_message_bus();
    bus.borrow_mut()
        .router_instruments
        .unsubscribe(pattern, handler);
}
/// Unsubscribes `handler` from `pattern` for instrument close messages.
///
/// This is a thin wrapper that forwards both arguments unchanged to [`unsubscribe_any`].
pub fn unsubscribe_instrument_close(pattern: MStr<Pattern>, handler: &ShareableMessageHandler) {
unsubscribe_any(pattern, handler);
}
/// Unsubscribes `handler` from `pattern` on the bus's `router_deltas`.
pub fn unsubscribe_book_deltas(pattern: MStr<Pattern>, handler: &TypedHandler<OrderBookDeltas>) {
    let bus = get_message_bus();
    bus.borrow_mut().router_deltas.unsubscribe(pattern, handler);
}
/// Unsubscribes `handler` from `pattern` on the bus's `router_depth10`.
pub fn unsubscribe_book_depth10(pattern: MStr<Pattern>, handler: &TypedHandler<OrderBookDepth10>) {
    let bus = get_message_bus();
    bus.borrow_mut()
        .router_depth10
        .unsubscribe(pattern, handler);
}
/// Unsubscribes `handler` from `pattern` on the bus's `router_book_snapshots`.
pub fn unsubscribe_book_snapshots(pattern: MStr<Pattern>, handler: &TypedHandler<OrderBook>) {
    let bus = get_message_bus();
    bus.borrow_mut()
        .router_book_snapshots
        .unsubscribe(pattern, handler);
}
/// Unsubscribes `handler` from `pattern` on the bus's `router_quotes`.
pub fn unsubscribe_quotes(pattern: MStr<Pattern>, handler: &TypedHandler<QuoteTick>) {
    let bus = get_message_bus();
    bus.borrow_mut().router_quotes.unsubscribe(pattern, handler);
}
/// Unsubscribes `handler` from `pattern` on the bus's `router_trades`.
pub fn unsubscribe_trades(pattern: MStr<Pattern>, handler: &TypedHandler<TradeTick>) {
    let bus = get_message_bus();
    bus.borrow_mut().router_trades.unsubscribe(pattern, handler);
}
/// Unsubscribes `handler` from `pattern` on the bus's `router_bars`.
pub fn unsubscribe_bars(pattern: MStr<Pattern>, handler: &TypedHandler<Bar>) {
    let bus = get_message_bus();
    bus.borrow_mut().router_bars.unsubscribe(pattern, handler);
}
/// Unsubscribes `handler` from `pattern` on the bus's `router_mark_prices`.
pub fn unsubscribe_mark_prices(pattern: MStr<Pattern>, handler: &TypedHandler<MarkPriceUpdate>) {
    let bus = get_message_bus();
    bus.borrow_mut()
        .router_mark_prices
        .unsubscribe(pattern, handler);
}
/// Unsubscribes `handler` from `pattern` on the bus's `router_index_prices`.
pub fn unsubscribe_index_prices(pattern: MStr<Pattern>, handler: &TypedHandler<IndexPriceUpdate>) {
    let bus = get_message_bus();
    bus.borrow_mut()
        .router_index_prices
        .unsubscribe(pattern, handler);
}
/// Unsubscribes `handler` from `pattern` on the bus's `router_funding_rates`.
pub fn unsubscribe_funding_rates(
    pattern: MStr<Pattern>,
    handler: &TypedHandler<FundingRateUpdate>,
) {
    let bus = get_message_bus();
    bus.borrow_mut()
        .router_funding_rates
        .unsubscribe(pattern, handler);
}
/// Unsubscribes `handler` from `pattern` on the bus's `router_account_state`.
pub fn unsubscribe_account_state(pattern: MStr<Pattern>, handler: &TypedHandler<AccountState>) {
    let bus = get_message_bus();
    bus.borrow_mut()
        .router_account_state
        .unsubscribe(pattern, handler);
}
/// Unsubscribes `handler` from `pattern` on the bus's `router_portfolio`.
pub fn unsubscribe_portfolio_snapshot(
    pattern: MStr<Pattern>,
    handler: &TypedHandler<PortfolioSnapshot>,
) {
    let bus = get_message_bus();
    bus.borrow_mut()
        .router_portfolio
        .unsubscribe(pattern, handler);
}
/// Unsubscribes `handler` from `pattern` on the bus's `router_order_events`.
pub fn unsubscribe_order_events(pattern: MStr<Pattern>, handler: &TypedHandler<OrderEventAny>) {
    let bus = get_message_bus();
    bus.borrow_mut()
        .router_order_events
        .unsubscribe(pattern, handler);
}
/// Unsubscribes `handler` from `pattern` on the bus's `router_position_events`.
pub fn unsubscribe_position_events(pattern: MStr<Pattern>, handler: &TypedHandler<PositionEvent>) {
    let bus = get_message_bus();
    bus.borrow_mut()
        .router_position_events
        .unsubscribe(pattern, handler);
}
/// Removes the handler identified by `handler_id` for `pattern` from the bus's
/// `router_order_events`.
pub fn remove_order_event_handler(pattern: MStr<Pattern>, handler_id: Ustr) {
    let bus = get_message_bus();
    bus.borrow_mut()
        .router_order_events
        .remove_handler(pattern, handler_id);
}
/// Removes the handler identified by `handler_id` for `pattern` from the bus's
/// `router_position_events`.
pub fn remove_position_event_handler(pattern: MStr<Pattern>, handler_id: Ustr) {
    let bus = get_message_bus();
    bus.borrow_mut()
        .router_position_events
        .remove_handler(pattern, handler_id);
}
/// Unsubscribes `handler` from `pattern` on the bus's `router_orders`.
pub fn unsubscribe_orders(pattern: MStr<Pattern>, handler: &TypedHandler<OrderAny>) {
    let bus = get_message_bus();
    bus.borrow_mut().router_orders.unsubscribe(pattern, handler);
}
/// Unsubscribes `handler` from `pattern` on the bus's `router_positions`.
pub fn unsubscribe_positions(pattern: MStr<Pattern>, handler: &TypedHandler<Position>) {
    let bus = get_message_bus();
    bus.borrow_mut()
        .router_positions
        .unsubscribe(pattern, handler);
}
/// Unsubscribes `handler` from `pattern` on the bus's `router_greeks`.
pub fn unsubscribe_greeks(pattern: MStr<Pattern>, handler: &TypedHandler<GreeksData>) {
    let bus = get_message_bus();
    bus.borrow_mut().router_greeks.unsubscribe(pattern, handler);
}
/// Unsubscribes `handler` from `pattern` on the bus's `router_option_greeks`.
pub fn unsubscribe_option_greeks(pattern: MStr<Pattern>, handler: &TypedHandler<OptionGreeks>) {
    let bus = get_message_bus();
    bus.borrow_mut()
        .router_option_greeks
        .unsubscribe(pattern, handler);
}
/// Unsubscribes `handler` from `pattern` on the bus's `router_option_chain`.
pub fn unsubscribe_option_chain(pattern: MStr<Pattern>, handler: &TypedHandler<OptionChainSlice>) {
    let bus = get_message_bus();
    bus.borrow_mut()
        .router_option_chain
        .unsubscribe(pattern, handler);
}
/// Unsubscribes `handler` from `pattern` on the bus's `router_defi_blocks`.
#[cfg(feature = "defi")]
pub fn unsubscribe_defi_blocks(pattern: MStr<Pattern>, handler: &TypedHandler<Block>) {
    let bus = get_message_bus();
    bus.borrow_mut()
        .router_defi_blocks
        .unsubscribe(pattern, handler);
}
/// Unsubscribes `handler` from `pattern` on the bus's `router_defi_pools`.
#[cfg(feature = "defi")]
pub fn unsubscribe_defi_pools(pattern: MStr<Pattern>, handler: &TypedHandler<Pool>) {
    let bus = get_message_bus();
    bus.borrow_mut()
        .router_defi_pools
        .unsubscribe(pattern, handler);
}
/// Unsubscribes `handler` from `pattern` on the bus's `router_defi_swaps`.
#[cfg(feature = "defi")]
pub fn unsubscribe_defi_swaps(pattern: MStr<Pattern>, handler: &TypedHandler<PoolSwap>) {
    let bus = get_message_bus();
    bus.borrow_mut()
        .router_defi_swaps
        .unsubscribe(pattern, handler);
}
/// Unsubscribes `handler` from `pattern` on the bus's `router_defi_liquidity`.
#[cfg(feature = "defi")]
pub fn unsubscribe_defi_liquidity(
    pattern: MStr<Pattern>,
    handler: &TypedHandler<PoolLiquidityUpdate>,
) {
    let bus = get_message_bus();
    bus.borrow_mut()
        .router_defi_liquidity
        .unsubscribe(pattern, handler);
}
/// Unsubscribes `handler` from `pattern` on the bus's `router_defi_collects`.
#[cfg(feature = "defi")]
pub fn unsubscribe_defi_collects(pattern: MStr<Pattern>, handler: &TypedHandler<PoolFeeCollect>) {
    let bus = get_message_bus();
    bus.borrow_mut()
        .router_defi_collects
        .unsubscribe(pattern, handler);
}
/// Unsubscribes `handler` from `pattern` on the bus's `router_defi_flash`.
#[cfg(feature = "defi")]
pub fn unsubscribe_defi_flash(pattern: MStr<Pattern>, handler: &TypedHandler<PoolFlash>) {
    let bus = get_message_bus();
    bus.borrow_mut()
        .router_defi_flash
        .unsubscribe(pattern, handler);
}
/// Removes the untyped subscription of `handler` for `pattern` from the message bus.
///
/// Subscriptions are matched on `pattern` together with the handler's ID. Matching entries
/// are dropped from every per-topic list in `topics` and from `subscriptions`. Whether
/// anything was removed (judged by the size of `subscriptions`) is reported at debug level.
pub fn unsubscribe_any(pattern: MStr<Pattern>, handler: &ShareableMessageHandler) {
    log::debug!("Unsubscribing {handler:?} from pattern '{pattern}'");

    let handler_id = handler.0.id();
    let bus_rc = get_message_bus();
    let mut bus = bus_rc.borrow_mut();

    let count_before = bus.subscriptions.len();

    // Keep everything that differs in pattern or in handler ID
    for subs in bus.topics.values_mut() {
        subs.retain(|s| s.pattern != pattern || s.handler_id != handler_id);
    }
    bus.subscriptions
        .retain(|s| s.pattern != pattern || s.handler_id != handler_id);

    if bus.subscriptions.len() < count_before {
        log::debug!("Handler for pattern '{pattern}' was removed");
    } else {
        log::debug!("No matching handler for pattern '{pattern}' was found");
    }
}
/// Returns whether the bus's `subscriptions` contains a subscription equal to one built
/// from `pattern` and `handler` (with no explicit priority).
pub fn is_subscribed_any<T: AsRef<str>>(pattern: T, handler: ShareableMessageHandler) -> bool {
    let pattern = MStr::from(pattern.as_ref());
    let probe = Subscription::new(pattern, handler, None);

    let bus_rc = get_message_bus();
    let bus = bus_rc.borrow();
    bus.subscriptions.contains(&probe)
}
/// Returns the result of `MessageBus::subscriptions_count` for `topic` on the global bus.
///
/// # Errors
///
/// Propagates any error returned by `subscriptions_count`.
pub fn subscriptions_count_any<S: AsRef<str>>(topic: S) -> anyhow::Result<usize> {
    let bus_rc = get_message_bus();
    let bus = bus_rc.borrow();
    bus.subscriptions_count(topic)
}
/// Returns the subscriber count reported by the bus's `router_deltas` for `topic`.
pub fn subscriber_count_deltas(topic: MStr<Topic>) -> usize {
    let bus_rc = get_message_bus();
    let bus = bus_rc.borrow();
    bus.router_deltas.subscriber_count(topic)
}
/// Returns the subscriber count reported by the bus's `router_depth10` for `topic`.
pub fn subscriber_count_depth10(topic: MStr<Topic>) -> usize {
    let bus_rc = get_message_bus();
    let bus = bus_rc.borrow();
    bus.router_depth10.subscriber_count(topic)
}
/// Returns the subscriber count reported by the bus's `router_book_snapshots` for `topic`.
pub fn subscriber_count_book_snapshots(topic: MStr<Topic>) -> usize {
    let bus_rc = get_message_bus();
    let bus = bus_rc.borrow();
    bus.router_book_snapshots.subscriber_count(topic)
}
/// Returns the exact subscriber count reported by the bus's `router_quotes` for `topic`.
pub fn exact_subscriber_count_quotes(topic: MStr<Topic>) -> usize {
    let bus_rc = get_message_bus();
    let bus = bus_rc.borrow();
    bus.router_quotes.exact_subscriber_count(topic)
}
/// Returns the exact subscriber count reported by the bus's `router_trades` for `topic`.
pub fn exact_subscriber_count_trades(topic: MStr<Topic>) -> usize {
    let bus_rc = get_message_bus();
    let bus = bus_rc.borrow();
    bus.router_trades.exact_subscriber_count(topic)
}
/// Returns the exact subscriber count reported by the bus's `router_mark_prices` for `topic`.
pub fn exact_subscriber_count_mark_prices(topic: MStr<Topic>) -> usize {
    let bus_rc = get_message_bus();
    let bus = bus_rc.borrow();
    bus.router_mark_prices.exact_subscriber_count(topic)
}
/// Returns the exact subscriber count reported by the bus's `router_index_prices` for `topic`.
pub fn exact_subscriber_count_index_prices(topic: MStr<Topic>) -> usize {
    let bus_rc = get_message_bus();
    let bus = bus_rc.borrow();
    bus.router_index_prices.exact_subscriber_count(topic)
}
/// Returns the exact subscriber count reported by the bus's `router_funding_rates` for `topic`.
pub fn exact_subscriber_count_funding_rates(topic: MStr<Topic>) -> usize {
    let bus_rc = get_message_bus();
    let bus = bus_rc.borrow();
    bus.router_funding_rates.exact_subscriber_count(topic)
}
/// Returns the exact subscriber count reported by the bus's `router_option_greeks` for `topic`.
pub fn exact_subscriber_count_option_greeks(topic: MStr<Topic>) -> usize {
    let bus_rc = get_message_bus();
    let bus = bus_rc.borrow();
    bus.router_option_greeks.exact_subscriber_count(topic)
}
/// Returns the exact subscriber count reported by the bus's `router_bars` for `topic`.
pub fn exact_subscriber_count_bars(topic: MStr<Topic>) -> usize {
    let bus_rc = get_message_bus();
    let bus = bus_rc.borrow();
    bus.router_bars.exact_subscriber_count(topic)
}
/// Publishes an untyped `message` on `topic`.
///
/// Steps, in order:
/// 1. The bus tap is notified via `dispatch_tap_publish`.
/// 2. The thread-local `ANY_HANDLERS` buffer is taken, filled with the handlers matching
///    `topic`, and the publish counter is incremented, all under a mutable bus borrow.
/// 3. After that borrow has been released, each matched handler is invoked.
/// 4. The buffer is cleared and stored back into `ANY_HANDLERS`.
/// 5. If `message` is a `CustomData`, it is forwarded to external egress with a
///    `BusPayloadType::Custom` payload type built from its data type name.
pub fn publish_any(topic: MStr<Topic>, message: &dyn Any) {
    dispatch_tap_publish(topic, message);

    let mut matched = ANY_HANDLERS.with_borrow_mut(std::mem::take);

    {
        let bus_rc = get_message_bus();
        let mut bus = bus_rc.borrow_mut();
        bus.fill_matching_any_handlers(topic, &mut matched);
        bus.increment_pub_count();
    }

    // The bus borrow is released here, before any handler runs
    for subscriber in &matched {
        subscriber.0.handle(message);
    }

    matched.clear();
    ANY_HANDLERS.with_borrow_mut(|slot| *slot = matched);

    if let Some(custom) = message.downcast_ref::<CustomData>() {
        let payload_type = BusPayloadType::Custom(Ustr::from(custom.data.type_name()));
        forward_to_external_egress(topic, payload_type, custom);
    }
}
/// Attempts to publish an untyped `message` on `topic` without panicking on an unavailable bus.
///
/// Returns `false` when `try_get_message_bus` yields no bus, or when the bus cannot be
/// mutably borrowed (checked once before the tap is notified and again afterwards).
/// Returns `true` once the matched handlers have been invoked.
///
/// Unlike [`publish_any`], this function contains no external egress forwarding step.
pub fn try_publish_any(topic: MStr<Topic>, message: &dyn Any) -> bool {
    let bus_rc = match try_get_message_bus() {
        Some(rc) => rc,
        None => return false,
    };

    // Probe the borrow first so the tap is not notified for a publish that cannot proceed
    if bus_rc.try_borrow_mut().is_err() {
        return false;
    }

    dispatch_tap_publish(topic, message);

    let mut matched = {
        let mut bus = match bus_rc.try_borrow_mut() {
            Ok(guard) => guard,
            Err(_) => return false,
        };
        let mut buffer = ANY_HANDLERS.with_borrow_mut(std::mem::take);
        bus.fill_matching_any_handlers(topic, &mut buffer);
        bus.increment_pub_count();
        buffer
    };

    // The bus borrow is released here, before any handler runs
    for subscriber in &matched {
        subscriber.0.handle(message);
    }

    matched.clear();
    ANY_HANDLERS.with_borrow_mut(|slot| *slot = matched);

    true
}
/// Publishes `instrument` on `topic` to the handlers matched by `router_instruments`, then
/// forwards it to external egress as `BusPayloadType::Instrument`.
pub fn publish_instrument(topic: MStr<Topic>, instrument: &InstrumentAny) {
    publish_typed(
        topic,
        &INSTRUMENT_HANDLERS,
        |msgbus, matched| {
            msgbus
                .router_instruments
                .fill_matching_handlers(topic, matched);
        },
        instrument,
    );

    forward_to_external_egress(topic, BusPayloadType::Instrument, instrument);
}
/// Publishes `deltas` on `topic` to the handlers matched by `router_deltas`, then forwards
/// it to external egress as `BusPayloadType::OrderBookDeltas`.
pub fn publish_deltas(topic: MStr<Topic>, deltas: &OrderBookDeltas) {
    publish_typed(
        topic,
        &DELTAS_HANDLERS,
        |msgbus, matched| {
            msgbus.router_deltas.fill_matching_handlers(topic, matched);
        },
        deltas,
    );

    forward_to_external_egress(topic, BusPayloadType::OrderBookDeltas, deltas);
}
/// Publishes `depth` on `topic` to the handlers matched by `router_depth10`, then forwards
/// it to external egress as `BusPayloadType::OrderBookDepth10`.
pub fn publish_depth10(topic: MStr<Topic>, depth: &OrderBookDepth10) {
    publish_typed(
        topic,
        &DEPTH10_HANDLERS,
        |msgbus, matched| {
            msgbus.router_depth10.fill_matching_handlers(topic, matched);
        },
        depth,
    );

    forward_to_external_egress(topic, BusPayloadType::OrderBookDepth10, depth);
}
/// Publishes `book` on `topic` to the handlers matched by `router_book_snapshots`.
///
/// No external egress forwarding is performed by this function.
pub fn publish_book(topic: MStr<Topic>, book: &OrderBook) {
    publish_typed(
        topic,
        &BOOK_HANDLERS,
        |msgbus, matched| {
            msgbus
                .router_book_snapshots
                .fill_matching_handlers(topic, matched);
        },
        book,
    );
}
pub fn publish_quote(topic: MStr<Topic>, quote: &QuoteTick) {
publish_typed(
topic,
"E_HANDLERS,
|bus, h| bus.router_quotes.fill_matching_handlers(topic, h),
quote,
);
forward_to_external_egress(topic, BusPayloadType::QuoteTick, quote);
}
/// Publishes `trade` on `topic` to the handlers matched by `router_trades`, then forwards
/// it to external egress as `BusPayloadType::TradeTick`.
pub fn publish_trade(topic: MStr<Topic>, trade: &TradeTick) {
    publish_typed(
        topic,
        &TRADE_HANDLERS,
        |msgbus, matched| {
            msgbus.router_trades.fill_matching_handlers(topic, matched);
        },
        trade,
    );

    forward_to_external_egress(topic, BusPayloadType::TradeTick, trade);
}
/// Publishes `bar` on `topic` to the handlers matched by `router_bars`, then forwards it
/// to external egress as `BusPayloadType::Bar`.
pub fn publish_bar(topic: MStr<Topic>, bar: &Bar) {
    publish_typed(
        topic,
        &BAR_HANDLERS,
        |msgbus, matched| {
            msgbus.router_bars.fill_matching_handlers(topic, matched);
        },
        bar,
    );

    forward_to_external_egress(topic, BusPayloadType::Bar, bar);
}
/// Publishes `mark_price` on `topic` to the handlers matched by `router_mark_prices`, then
/// forwards it to external egress as `BusPayloadType::MarkPriceUpdate`.
pub fn publish_mark_price(topic: MStr<Topic>, mark_price: &MarkPriceUpdate) {
    publish_typed(
        topic,
        &MARK_PRICE_HANDLERS,
        |msgbus, matched| {
            msgbus
                .router_mark_prices
                .fill_matching_handlers(topic, matched);
        },
        mark_price,
    );

    forward_to_external_egress(topic, BusPayloadType::MarkPriceUpdate, mark_price);
}
/// Publishes `index_price` on `topic` to the handlers matched by `router_index_prices`,
/// then forwards it to external egress as `BusPayloadType::IndexPriceUpdate`.
pub fn publish_index_price(topic: MStr<Topic>, index_price: &IndexPriceUpdate) {
    publish_typed(
        topic,
        &INDEX_PRICE_HANDLERS,
        |msgbus, matched| {
            msgbus
                .router_index_prices
                .fill_matching_handlers(topic, matched);
        },
        index_price,
    );

    forward_to_external_egress(topic, BusPayloadType::IndexPriceUpdate, index_price);
}
/// Publishes `funding_rate` on `topic` to the handlers matched by `router_funding_rates`,
/// then forwards it to external egress as `BusPayloadType::FundingRateUpdate`.
pub fn publish_funding_rate(topic: MStr<Topic>, funding_rate: &FundingRateUpdate) {
    publish_typed(
        topic,
        &FUNDING_RATE_HANDLERS,
        |msgbus, matched| {
            msgbus
                .router_funding_rates
                .fill_matching_handlers(topic, matched);
        },
        funding_rate,
    );

    forward_to_external_egress(topic, BusPayloadType::FundingRateUpdate, funding_rate);
}
/// Publishes `greeks` on `topic` to the handlers matched by `router_greeks`.
///
/// No external egress forwarding is performed by this function.
pub fn publish_greeks(topic: MStr<Topic>, greeks: &GreeksData) {
    publish_typed(
        topic,
        &GREEKS_HANDLERS,
        |msgbus, matched| {
            msgbus.router_greeks.fill_matching_handlers(topic, matched);
        },
        greeks,
    );
}
/// Publishes `option_greeks` on `topic` to the handlers matched by `router_option_greeks`,
/// then forwards it to external egress as `BusPayloadType::OptionGreeks`.
pub fn publish_option_greeks(topic: MStr<Topic>, option_greeks: &OptionGreeks) {
    publish_typed(
        topic,
        &OPTION_GREEKS_HANDLERS,
        |msgbus, matched| {
            msgbus
                .router_option_greeks
                .fill_matching_handlers(topic, matched);
        },
        option_greeks,
    );

    forward_to_external_egress(topic, BusPayloadType::OptionGreeks, option_greeks);
}
/// Publishes `slice` on `topic` to the handlers matched by `router_option_chain`.
///
/// No external egress forwarding is performed by this function.
pub fn publish_option_chain(topic: MStr<Topic>, slice: &OptionChainSlice) {
    publish_typed(
        topic,
        &OPTION_CHAIN_HANDLERS,
        |msgbus, matched| {
            msgbus
                .router_option_chain
                .fill_matching_handlers(topic, matched);
        },
        slice,
    );
}
/// Publishes `state` on `topic` to the handlers matched by `router_account_state`, then
/// forwards it to external egress as `BusPayloadType::AccountState`.
pub fn publish_account_state(topic: MStr<Topic>, state: &AccountState) {
    publish_typed(
        topic,
        &ACCOUNT_STATE_HANDLERS,
        |msgbus, matched| {
            msgbus
                .router_account_state
                .fill_matching_handlers(topic, matched);
        },
        state,
    );

    forward_to_external_egress(topic, BusPayloadType::AccountState, state);
}
/// Publishes `snapshot` on `topic` to the handlers matched by `router_portfolio`, then
/// forwards it to external egress as `BusPayloadType::PortfolioSnapshot`.
pub fn publish_portfolio_snapshot(topic: MStr<Topic>, snapshot: &PortfolioSnapshot) {
    publish_typed(
        topic,
        &PORTFOLIO_SNAPSHOT_HANDLERS,
        |msgbus, matched| {
            msgbus
                .router_portfolio
                .fill_matching_handlers(topic, matched);
        },
        snapshot,
    );

    forward_to_external_egress(topic, BusPayloadType::PortfolioSnapshot, snapshot);
}
/// Publishes `event` on `topic` to the handlers matched by `router_order_events`, then
/// forwards it to external egress as `BusPayloadType::OrderEvent`.
pub fn publish_order_event(topic: MStr<Topic>, event: &OrderEventAny) {
    publish_typed(
        topic,
        &ORDER_EVENT_HANDLERS,
        |msgbus, matched| {
            msgbus
                .router_order_events
                .fill_matching_handlers(topic, matched);
        },
        event,
    );

    forward_to_external_egress(topic, BusPayloadType::OrderEvent, event);
}
/// Publishes `event` on `topic` to the handlers matched by `router_position_events`, then
/// forwards it to external egress as `BusPayloadType::PositionEvent`.
pub fn publish_position_event(topic: MStr<Topic>, event: &PositionEvent) {
    publish_typed(
        topic,
        &POSITION_EVENT_HANDLERS,
        |msgbus, matched| {
            msgbus
                .router_position_events
                .fill_matching_handlers(topic, matched);
        },
        event,
    );

    forward_to_external_egress(topic, BusPayloadType::PositionEvent, event);
}
/// Publishes `block` on `topic` to the handlers matched by `router_defi_blocks`, then
/// forwards it to external egress as `BusPayloadType::Block`.
#[cfg(feature = "defi")]
pub fn publish_defi_block(topic: MStr<Topic>, block: &Block) {
    publish_typed(
        topic,
        &DEFI_BLOCK_HANDLERS,
        |msgbus, matched| {
            msgbus
                .router_defi_blocks
                .fill_matching_handlers(topic, matched);
        },
        block,
    );

    forward_to_external_egress(topic, BusPayloadType::Block, block);
}
/// Publishes `pool` on `topic` to the handlers matched by `router_defi_pools`, then
/// forwards it to external egress as `BusPayloadType::Pool`.
#[cfg(feature = "defi")]
pub fn publish_defi_pool(topic: MStr<Topic>, pool: &Pool) {
    publish_typed(
        topic,
        &DEFI_POOL_HANDLERS,
        |msgbus, matched| {
            msgbus
                .router_defi_pools
                .fill_matching_handlers(topic, matched);
        },
        pool,
    );

    forward_to_external_egress(topic, BusPayloadType::Pool, pool);
}
/// Publishes `swap` on `topic` to the handlers matched by `router_defi_swaps`.
///
/// No external egress forwarding is performed by this function.
// NOTE(review): every other `publish_defi_*` function in this file is followed by a
// `forward_to_external_egress` call; confirm the omission here is intentional.
#[cfg(feature = "defi")]
pub fn publish_defi_swap(topic: MStr<Topic>, swap: &PoolSwap) {
    publish_typed(
        topic,
        &DEFI_SWAP_HANDLERS,
        |msgbus, matched| {
            msgbus
                .router_defi_swaps
                .fill_matching_handlers(topic, matched);
        },
        swap,
    );
}
/// Publishes `update` on `topic` to the handlers matched by `router_defi_liquidity`, then
/// forwards it to external egress as `BusPayloadType::PoolLiquidityUpdate`.
#[cfg(feature = "defi")]
pub fn publish_defi_liquidity(topic: MStr<Topic>, update: &PoolLiquidityUpdate) {
    publish_typed(
        topic,
        &DEFI_LIQUIDITY_HANDLERS,
        |msgbus, matched| {
            msgbus
                .router_defi_liquidity
                .fill_matching_handlers(topic, matched);
        },
        update,
    );

    forward_to_external_egress(topic, BusPayloadType::PoolLiquidityUpdate, update);
}
/// Publishes `collect` on `topic` to the handlers matched by `router_defi_collects`, then
/// forwards it to external egress as `BusPayloadType::PoolFeeCollect`.
#[cfg(feature = "defi")]
pub fn publish_defi_collect(topic: MStr<Topic>, collect: &PoolFeeCollect) {
    publish_typed(
        topic,
        &DEFI_COLLECT_HANDLERS,
        |msgbus, matched| {
            msgbus
                .router_defi_collects
                .fill_matching_handlers(topic, matched);
        },
        collect,
    );

    forward_to_external_egress(topic, BusPayloadType::PoolFeeCollect, collect);
}
/// Publishes `flash` on `topic` to the handlers matched by `router_defi_flash`, then
/// forwards it to external egress as `BusPayloadType::PoolFlash`.
#[cfg(feature = "defi")]
pub fn publish_defi_flash(topic: MStr<Topic>, flash: &PoolFlash) {
    publish_typed(
        topic,
        &DEFI_FLASH_HANDLERS,
        |msgbus, matched| {
            msgbus
                .router_defi_flash
                .fill_matching_handlers(topic, matched);
        },
        flash,
    );

    forward_to_external_egress(topic, BusPayloadType::PoolFlash, flash);
}
/// Shared implementation behind the typed `publish_*` functions.
///
/// - `tls` is the thread-local scratch buffer for handlers of `T`; it is taken for the
///   duration of the call and handed back (cleared) at the end.
/// - `fill_fn` receives the mutably borrowed bus and the buffer, and is expected to push
///   the handlers that should receive `message`.
///
/// The bus tap is notified first. The publish counter is incremented under the same
/// mutable borrow used by `fill_fn`, and that borrow is released before any handler runs.
#[inline]
fn publish_typed<T: 'static>(
    topic: MStr<Topic>,
    tls: &'static LocalKey<RefCell<SmallVec<[TypedHandler<T>; HANDLER_BUFFER_CAP]>>>,
    fill_fn: impl FnOnce(&mut MessageBus, &mut SmallVec<[TypedHandler<T>; HANDLER_BUFFER_CAP]>),
    message: &T,
) {
    dispatch_tap_publish(topic, message);

    // Take the buffer out of thread-local storage, leaving a default value behind
    let mut matched = tls.take();
    let bus_rc = get_message_bus();

    {
        let mut bus = bus_rc.borrow_mut();
        fill_fn(&mut bus, &mut matched);
        bus.increment_pub_count();
    }

    // The bus borrow is released here, before any handler runs
    for subscriber in &matched {
        subscriber.handle(message);
    }

    matched.clear();
    tls.set(matched);
}
/// Sends an untyped `message` to the handler registered for `endpoint`.
///
/// The bus tap is notified first. The handler is looked up and cloned under a mutable bus
/// borrow, where the sent counter is incremented only if a handler exists. The handler is
/// invoked after that borrow is released. A missing endpoint is logged at error level.
pub fn send_any(endpoint: MStr<Endpoint>, message: &dyn Any) {
    dispatch_tap_send(endpoint, message);

    let target = {
        let bus_rc = get_message_bus();
        let mut bus = bus_rc.borrow_mut();
        let found = bus.get_endpoint(endpoint).cloned();
        if found.is_some() {
            bus.increment_sent_count();
        }
        found
    };

    match target {
        Some(handler) => {
            handler.0.handle(message);
        }
        None => {
            log::error!("send_any: no registered endpoint '{endpoint}'");
        }
    }
}
/// Sends a concretely typed `message` to the untyped handler registered for `endpoint`.
///
/// The bus tap is notified first. The handler is looked up and cloned under a mutable bus
/// borrow, where the sent counter is incremented only if a handler exists. The handler is
/// invoked after that borrow is released. A missing endpoint is logged at error level.
pub fn send_any_value<T: 'static>(endpoint: MStr<Endpoint>, message: &T) {
    dispatch_tap_send(endpoint, message);

    let target = {
        let bus_rc = get_message_bus();
        let mut bus = bus_rc.borrow_mut();
        let found = bus.get_endpoint(endpoint).cloned();
        if found.is_some() {
            bus.increment_sent_count();
        }
        found
    };

    match target {
        Some(handler) => {
            handler.0.handle(message);
        }
        None => {
            log::error!("send_any_value: no registered endpoint '{endpoint}'");
        }
    }
}
/// Delivers `message` to the response handler registered for `correlation_id`.
///
/// The bus tap is notified first. The handler is looked up and cloned under a mutable bus
/// borrow and invoked after that borrow is released, with the inner response of whichever
/// `DataResponse` variant was received.
///
/// The response counter is incremented only when a handler is found, mirroring how the
/// `send_*` functions in this module only count messages that have a registered target.
/// When no handler is registered an error is logged and nothing is delivered.
pub fn send_response(correlation_id: &UUID4, message: &DataResponse) {
    dispatch_tap_response(correlation_id, message);

    let handler = {
        let bus = get_message_bus();
        let mut bus = bus.borrow_mut();
        let handler = bus.get_response_handler(correlation_id).cloned();
        // Only count responses that can actually be delivered
        if handler.is_some() {
            bus.increment_res_count();
        }
        handler
    };

    let Some(handler) = handler else {
        log::error!("send_response: handler not found for correlation_id '{correlation_id}'");
        return;
    };

    match message {
        DataResponse::Data(resp) => handler.0.handle(resp),
        // Boxed variant: pass the inner response rather than the box
        DataResponse::Instrument(resp) => handler.0.handle(resp.as_ref()),
        DataResponse::Instruments(resp) => handler.0.handle(resp),
        DataResponse::Book(resp) => handler.0.handle(resp),
        DataResponse::BookDeltas(resp) => handler.0.handle(resp),
        DataResponse::BookDepth(resp) => handler.0.handle(resp),
        DataResponse::Quotes(resp) => handler.0.handle(resp),
        DataResponse::Trades(resp) => handler.0.handle(resp),
        DataResponse::FundingRates(resp) => handler.0.handle(resp),
        DataResponse::ForwardPrices(resp) => handler.0.handle(resp),
        DataResponse::Bars(resp) => handler.0.handle(resp),
    }
}
/// Sends `quote` to the handler registered for `endpoint` in `endpoints_quotes`.
///
/// Delegates to `send_endpoint_ref`, passing `"send_quote"` as the name used in its
/// missing-endpoint error log.
pub fn send_quote(endpoint: MStr<Endpoint>, quote: &QuoteTick) {
    send_endpoint_ref(
        endpoint,
        quote,
        |msgbus| msgbus.endpoints_quotes.get(endpoint),
        "send_quote",
    );
}
/// Sends `trade` to the handler registered for `endpoint` in `endpoints_trades`.
///
/// Delegates to `send_endpoint_ref`, passing `"send_trade"` as the name used in its
/// missing-endpoint error log.
pub fn send_trade(endpoint: MStr<Endpoint>, trade: &TradeTick) {
    send_endpoint_ref(
        endpoint,
        trade,
        |msgbus| msgbus.endpoints_trades.get(endpoint),
        "send_trade",
    );
}
/// Sends `bar` to the handler registered for `endpoint` in `endpoints_bars`.
///
/// Delegates to `send_endpoint_ref`, passing `"send_bar"` as the name used in its
/// missing-endpoint error log.
pub fn send_bar(endpoint: MStr<Endpoint>, bar: &Bar) {
    send_endpoint_ref(
        endpoint,
        bar,
        |msgbus| msgbus.endpoints_bars.get(endpoint),
        "send_bar",
    );
}
/// Sends the owned `event` to the handler registered for `endpoint` in
/// `endpoints_order_events`.
///
/// Delegates to `send_endpoint_owned`, passing `"send_order_event"` as the name used in
/// the missing-endpoint error log.
pub fn send_order_event(endpoint: MStr<Endpoint>, event: OrderEventAny) {
    send_endpoint_owned(
        endpoint,
        event,
        |msgbus| msgbus.endpoints_order_events.get(endpoint),
        "send_order_event",
    );
}
/// Sends `state` to the handler registered for `endpoint` in `endpoints_account_state`.
///
/// Delegates to `send_endpoint_ref`, passing `"send_account_state"` as the name used in
/// its missing-endpoint error log.
pub fn send_account_state(endpoint: MStr<Endpoint>, state: &AccountState) {
    send_endpoint_ref(
        endpoint,
        state,
        |msgbus| msgbus.endpoints_account_state.get(endpoint),
        "send_account_state",
    );
}
/// Sends the owned `command` to the handler registered for `endpoint` in
/// `endpoints_trading_commands`.
///
/// Delegates to `send_endpoint_owned`, passing `"send_trading_command"` as the name used
/// in the missing-endpoint error log.
pub fn send_trading_command(endpoint: MStr<Endpoint>, command: TradingCommand) {
    send_endpoint_owned(
        endpoint,
        command,
        |msgbus| msgbus.endpoints_trading_commands.get(endpoint),
        "send_trading_command",
    );
}
/// Sends the owned `command` to the handler registered for `endpoint` in
/// `endpoints_data_commands`.
///
/// Whether the command is a request (per `data_command_is_request`) is determined before
/// the command is moved, and passed on so the request counter can be incremented as well.
pub fn send_data_command(endpoint: MStr<Endpoint>, command: DataCommand) {
    let counts_as_request = data_command_is_request(&command);

    send_endpoint_owned_counted(
        endpoint,
        command,
        |msgbus| msgbus.endpoints_data_commands.get(endpoint),
        "send_data_command",
        counts_as_request,
    );
}
/// Sends the owned `response` to the handler registered for `endpoint` in
/// `endpoints_data_responses`.
///
/// Delegates to `send_endpoint_owned`, passing `"send_data_response"` as the name used in
/// the missing-endpoint error log.
pub fn send_data_response(endpoint: MStr<Endpoint>, response: DataResponse) {
    send_endpoint_owned(
        endpoint,
        response,
        |msgbus| msgbus.endpoints_data_responses.get(endpoint),
        "send_data_response",
    );
}
/// Sends the owned `report` to the handler registered for `endpoint` in
/// `endpoints_exec_reports`.
///
/// Delegates to `send_endpoint_owned`, passing `"send_execution_report"` as the name used
/// in the missing-endpoint error log.
pub fn send_execution_report(endpoint: MStr<Endpoint>, report: ExecutionReport) {
    send_endpoint_owned(
        endpoint,
        report,
        |msgbus| msgbus.endpoints_exec_reports.get(endpoint),
        "send_execution_report",
    );
}
/// Sends the owned `data` to the handler registered for `endpoint` in `endpoints_data`.
///
/// Delegates to `send_endpoint_owned`, passing `"send_data"` as the name used in the
/// missing-endpoint error log.
pub fn send_data(endpoint: MStr<Endpoint>, data: Data) {
    send_endpoint_owned(
        endpoint,
        data,
        |msgbus| msgbus.endpoints_data.get(endpoint),
        "send_data",
    );
}
/// Sends the owned `data` to the handler registered for `endpoint` in
/// `endpoints_defi_data`.
///
/// Delegates to `send_endpoint_owned`, passing `"send_defi_data"` as the name used in the
/// missing-endpoint error log.
#[cfg(feature = "defi")]
pub fn send_defi_data(endpoint: MStr<Endpoint>, data: DefiData) {
    send_endpoint_owned(
        endpoint,
        data,
        |msgbus| msgbus.endpoints_defi_data.get(endpoint),
        "send_defi_data",
    );
}
/// Shared implementation for sending a borrowed `message` to a typed endpoint handler.
///
/// - `get_handler` selects the handler from the borrowed bus; the result is cloned so it
///   can be invoked after the bus borrow has been released.
/// - `fn_name` is the caller's name, used as the prefix of the missing-endpoint error log.
///
/// The bus tap is notified first. The sent counter is incremented only when a handler is
/// found.
#[inline]
fn send_endpoint_ref<T: 'static, F>(
    endpoint: MStr<Endpoint>,
    message: &T,
    get_handler: F,
    fn_name: &str,
) where
    F: FnOnce(&MessageBus) -> Option<&TypedHandler<T>>,
{
    dispatch_tap_send(endpoint, message);

    let target = {
        let bus_rc = get_message_bus();
        let mut bus = bus_rc.borrow_mut();
        let found = get_handler(&bus).cloned();
        if found.is_some() {
            bus.increment_sent_count();
        }
        found
    };

    match target {
        Some(handler) => {
            handler.handle(message);
        }
        None => {
            log::error!("{fn_name}: no registered endpoint '{endpoint}'");
        }
    }
}
/// Sends an owned `message` to a typed endpoint handler without counting it as a request.
///
/// Forwards all arguments to `send_endpoint_owned_counted` with `count_request` set to
/// `false`.
#[inline]
fn send_endpoint_owned<T: 'static, F>(
endpoint: MStr<Endpoint>,
message: T,
get_handler: F,
fn_name: &str,
) where
F: FnOnce(&MessageBus) -> Option<&TypedIntoHandler<T>>,
{
send_endpoint_owned_counted(endpoint, message, get_handler, fn_name, false);
}
/// Shared implementation for sending an owned `message` to a typed endpoint handler.
///
/// - `get_handler` selects the handler from the borrowed bus; the result is cloned so it
///   can be invoked after the bus borrow has been released.
/// - `fn_name` is the caller's name, used as the prefix of the missing-endpoint error log.
/// - `count_request` additionally increments the request counter when a handler is found.
///
/// The bus tap is notified (with a reference to `message`) before the lookup. The sent
/// counter is incremented only when a handler is found. `message` is moved into the
/// handler on delivery.
#[inline]
fn send_endpoint_owned_counted<T: 'static, F>(
    endpoint: MStr<Endpoint>,
    message: T,
    get_handler: F,
    fn_name: &str,
    count_request: bool,
) where
    F: FnOnce(&MessageBus) -> Option<&TypedIntoHandler<T>>,
{
    dispatch_tap_send(endpoint, &message);

    let target = {
        let bus_rc = get_message_bus();
        let mut bus = bus_rc.borrow_mut();
        let found = get_handler(&bus).cloned();
        if found.is_some() {
            bus.increment_sent_count();
            if count_request {
                bus.increment_req_count();
            }
        }
        found
    };

    match target {
        Some(handler) => {
            handler.handle(message);
        }
        None => {
            log::error!("{fn_name}: no registered endpoint '{endpoint}'");
        }
    }
}
/// Returns `true` when `command` is a `DataCommand::Request`, or, with the `defi` feature
/// enabled, a `DataCommand::DefiRequest`. Every other variant yields `false`.
#[inline]
fn data_command_is_request(command: &DataCommand) -> bool {
    let is_request = matches!(command, DataCommand::Request(_));

    #[cfg(feature = "defi")]
    let is_request = is_request || matches!(command, DataCommand::DefiRequest(_));

    is_request
}
#[cfg(test)]
mod tests {
#[cfg(feature = "defi")]
use std::sync::Arc;
use std::{
cell::{Cell, RefCell},
rc::Rc,
thread,
};
#[cfg(feature = "defi")]
use alloy_primitives::{U256, address};
use bytes::Bytes;
use nautilus_core::{UUID4, UnixNanos};
#[cfg(feature = "defi")]
use nautilus_model::defi::{
AmmType, Chain, Dex, DexType, PoolIdentifier, PoolLiquidityUpdateType, Token,
};
#[cfg(any(feature = "sbe", feature = "capnp"))]
use nautilus_model::{data::OptionGreekValues, enums::GreeksConvention};
use nautilus_model::{
data::{
Bar, FundingRateUpdate, IndexPriceUpdate, MarkPriceUpdate, OptionGreeks,
OrderBookDelta, OrderBookDeltas, OrderBookDepth10, QuoteTick, TradeTick,
stubs::{stub_custom_data, stub_deltas, stub_depth10},
},
enums::{AccountType, OrderSide, PositionSide},
events::{OrderEventAny, PositionEvent, PositionOpened, order::spec::OrderDeniedSpec},
identifiers::{
AccountId, ClientId, ClientOrderId, InstrumentId, PositionId, StrategyId, TraderId,
},
instruments::{InstrumentAny, stubs::audusd_sim},
types::{Currency, Price, Quantity},
};
#[cfg(feature = "sbe")]
use nautilus_serialization::sbe::FromSbe;
#[cfg(feature = "capnp")]
use nautilus_serialization::{capnp::FromCapnp, market_capnp};
use rstest::rstest;
use rust_decimal::Decimal;
use super::*;
use crate::{
enums::SerializationEncoding,
messages::{
data::{
DataCommand, DataResponse, QuotesResponse, RequestCommand, RequestQuotes,
SubscribeCommand, SubscribeQuotes,
},
execution::{CancelAllOrders, TradingCommand},
},
msgbus::{
BusMessage, BusTap, MessageBusConfig, MessageBusExternalEgress, SuppressExternalGuard,
clear_bus_tap, set_bus_tap, set_message_bus, stubs::get_call_check_handler,
},
};
/// Fields of a `BusMessage` recorded by `CapturingExternalEgress::publish`.
#[derive(Debug)]
struct CapturedEgressMessage {
/// Message topic rendered to a `String` at capture time.
topic: String,
/// Encoding reported by the published message.
encoding: SerializationEncoding,
/// Payload type reported by the published message.
payload_type: BusPayloadType,
/// Payload bytes carried by the published message.
payload: Bytes,
}
/// Test double for `MessageBusExternalEgress` that records every published
/// message instead of sending it anywhere.
struct CapturingExternalEgress {
/// Shared log of captured messages; the test keeps a second handle to it.
publications: Rc<RefCell<Vec<CapturedEgressMessage>>>,
/// Set to `true` once `close` has been called.
closed: Cell<bool>,
}
impl CapturingExternalEgress {
    /// Creates an open egress together with a shared handle to its capture log.
    ///
    /// The returned `Rc` points at the same vector the egress pushes into, so a
    /// test can inspect captured messages after handing the egress to the bus.
    fn new() -> (Self, Rc<RefCell<Vec<CapturedEgressMessage>>>) {
        let shared_log = Rc::new(RefCell::new(Vec::new()));
        let egress = Self {
            publications: Rc::clone(&shared_log),
            closed: Cell::new(false),
        };
        (egress, shared_log)
    }
}
impl MessageBusExternalEgress for CapturingExternalEgress {
    /// Reports whether `close` has been called on this egress.
    fn is_closed(&self) -> bool {
        self.closed.get()
    }

    /// Records the topic, encoding, payload type and payload of `message`.
    fn publish(&self, message: BusMessage) {
        let captured = CapturedEgressMessage {
            topic: message.topic.to_string(),
            encoding: message.encoding,
            payload_type: message.payload_type,
            payload: message.payload,
        };
        self.publications.borrow_mut().push(captured);
    }

    /// Marks the egress as closed.
    fn close(&mut self) {
        self.closed.set(true);
    }
}
/// Installs a fresh default message bus with a capturing external egress that
/// uses `encoding`, and returns the shared log of captured messages.
fn install_capturing_external_egress(
    encoding: SerializationEncoding,
) -> Rc<RefCell<Vec<CapturedEgressMessage>>> {
    let bus = Rc::new(RefCell::new(MessageBus::default()));
    set_message_bus(Rc::clone(&bus));

    let (egress, captured) = CapturingExternalEgress::new();
    bus.borrow_mut()
        .set_external_egress(Box::new(egress), encoding);

    captured
}
/// Installs a fresh default message bus with a capturing external egress
/// configured from `config`, and returns the shared log of captured messages.
///
/// # Panics
///
/// Panics if the bus rejects `config`.
fn install_capturing_external_egress_config(
    config: &MessageBusConfig,
) -> Rc<RefCell<Vec<CapturedEgressMessage>>> {
    let bus = Rc::new(RefCell::new(MessageBus::default()));
    set_message_bus(Rc::clone(&bus));

    let (egress, captured) = CapturingExternalEgress::new();
    bus.borrow_mut()
        .set_external_egress_config(Box::new(egress), config)
        .expect("message bus config must be valid");

    captured
}
/// Disposes the currently installed message bus and replaces it with a fresh
/// default one.
fn reset_message_bus() {
    {
        // Scoped so the handle to the old bus is released before the swap.
        let current = get_message_bus();
        current.borrow_mut().dispose();
    }
    let fresh = MessageBus::default();
    set_message_bus(Rc::new(RefCell::new(fresh)));
}
#[rstest]
#[case(SerializationEncoding::MsgPack)]
#[case(SerializationEncoding::Json)]
fn publish_quote_forwards_decodable_payload_to_external_egress(
#[case] encoding: SerializationEncoding,
) {
let publications = install_capturing_external_egress(encoding);
let quote = QuoteTick::default();
publish_quote("data.quotes.TEST".into(), "e);
let publications = publications.borrow();
assert_eq!(publications.len(), 1);
assert_eq!(publications[0].topic, "data.quotes.TEST");
let decoded: QuoteTick = match encoding {
SerializationEncoding::MsgPack => rmp_serde::from_slice(&publications[0].payload)
.expect("MsgPack payload must decode as QuoteTick"),
SerializationEncoding::Json => serde_json::from_slice(&publications[0].payload)
.expect("JSON payload must decode as QuoteTick"),
SerializationEncoding::Sbe | SerializationEncoding::Capnp => {
unreachable!("schema encodings are tested separately")
}
};
let payload_value: serde_json::Value = match encoding {
SerializationEncoding::MsgPack => rmp_serde::from_slice(&publications[0].payload)
.expect("MsgPack payload must decode as a value"),
SerializationEncoding::Json => serde_json::from_slice(&publications[0].payload)
.expect("JSON payload must decode as a value"),
SerializationEncoding::Sbe | SerializationEncoding::Capnp => {
unreachable!("schema encodings are tested separately")
}
};
assert_eq!(
payload_value.get("type").and_then(|value| value.as_str()),
Some("QuoteTick")
);
assert_eq!(decoded, quote);
drop(publications);
reset_message_bus();
}
fn assert_quote_round_trips(encoding: SerializationEncoding) {
let publications = install_capturing_external_egress(encoding);
let quote = QuoteTick::default();
publish_quote("data.quotes.TEST".into(), "e);
let bus_message = {
let publications = publications.borrow();
assert_eq!(publications.len(), 1);
assert_eq!(publications[0].payload_type, BusPayloadType::QuoteTick);
assert_eq!(publications[0].encoding, encoding);
BusMessage::with_str_topic(
publications[0].topic.clone(),
publications[0].payload_type,
publications[0].payload.clone(),
publications[0].encoding,
)
};
publications.borrow_mut().clear();
let received = Rc::new(RefCell::new(Vec::<QuoteTick>::new()));
let received_handler = received.clone();
let handler = TypedHandler::from(move |quote: &QuoteTick| {
received_handler.borrow_mut().push(*quote);
});
subscribe_quotes("data.quotes.*".into(), handler, None);
get_message_bus()
.borrow_mut()
.add_streaming_type(BusPayloadType::QuoteTick);
republish_external_message(&bus_message).unwrap();
assert_eq!(*received.borrow(), vec![quote]);
assert!(
publications.borrow().is_empty(),
"republished message must not be forwarded back out externally"
);
reset_message_bus();
}
/// Runs the quote round-trip check for the JSON and MsgPack encodings.
#[rstest]
#[case(SerializationEncoding::Json)]
#[case(SerializationEncoding::MsgPack)]
fn republish_external_message_round_trips_quote(#[case] encoding: SerializationEncoding) {
assert_quote_round_trips(encoding);
}
/// Runs the quote round-trip check for the SBE encoding (requires the `sbe` feature).
#[cfg(feature = "sbe")]
#[rstest]
fn republish_external_message_round_trips_quote_sbe() {
assert_quote_round_trips(SerializationEncoding::Sbe);
}
/// Runs the quote round-trip check for the Cap'n Proto encoding (requires the `capnp` feature).
#[cfg(feature = "capnp")]
#[rstest]
fn republish_external_message_round_trips_quote_capnp() {
assert_quote_round_trips(SerializationEncoding::Capnp);
}
/// Generic external round-trip check for a typed publish/subscribe pair.
///
/// Publishes `value` on `topic` with a capturing egress installed, rebuilds a
/// `BusMessage` from the single captured publication, subscribes a recording
/// handler on the same topic, republishes the message, and then checks that:
///
/// - exactly one message was captured, with the expected `payload_type` and
///   `encoding`;
/// - exactly one message reached the subscriber, which `assert_received`
///   compares against the original `value`;
/// - nothing was forwarded to the external egress during the republish.
fn assert_typed_external_round_trips<T>(
    encoding: SerializationEncoding,
    payload_type: BusPayloadType,
    topic: &str,
    value: T,
    publish: fn(MStr<Topic>, &T),
    subscribe: fn(MStr<Pattern>, TypedHandler<T>, Option<u32>),
    assert_received: impl Fn(&T, &T),
) where
    T: Clone + 'static,
{
    let captured = install_capturing_external_egress(encoding);
    publish(topic.into(), &value);

    // Rebuild an inbound message from what the egress captured.
    let inbound = {
        let outbound = captured.borrow();
        assert_eq!(outbound.len(), 1);
        let first = &outbound[0];
        assert_eq!(first.payload_type, payload_type);
        assert_eq!(first.encoding, encoding);
        BusMessage::with_str_topic(
            first.topic.clone(),
            first.payload_type,
            first.payload.clone(),
            first.encoding,
        )
    };
    // Start from an empty log so any re-forwarding would be visible below.
    captured.borrow_mut().clear();

    let sink = Rc::new(RefCell::new(Vec::<T>::new()));
    let sink_writer = Rc::clone(&sink);
    let recorder = TypedHandler::from(move |message: &T| {
        sink_writer.borrow_mut().push(message.clone());
    });
    subscribe(topic.into(), recorder, None);

    get_message_bus()
        .borrow_mut()
        .add_streaming_type(payload_type);
    republish_external_message(&inbound).unwrap();

    let delivered = sink.borrow();
    assert_eq!(delivered.len(), 1);
    assert_received(&delivered[0], &value);
    assert!(
        captured.borrow().is_empty(),
        "republished message must not be forwarded back out externally"
    );
    reset_message_bus();
}
/// Comparator that asserts `actual == expected` via `assert_eq!`.
///
/// # Panics
///
/// Panics when the two values are not equal.
fn assert_eq_ref<T: PartialEq + std::fmt::Debug>(actual: &T, expected: &T) {
    assert_eq!(actual, expected);
}
/// Comparator that asserts two values are equal after converting both to
/// `serde_json::Value`.
///
/// # Panics
///
/// Panics when either value fails to serialize or the JSON values differ.
fn assert_json_value_eq<T: serde::Serialize>(actual: &T, expected: &T) {
    let actual_json = serde_json::to_value(actual).expect("actual value must serialize");
    let expected_json = serde_json::to_value(expected).expect("expected value must serialize");
    assert_eq!(actual_json, expected_json);
}
/// Comparator for `OrderBookDepth10` that checks the header fields and, for
/// each paired bid and ask level, only the `side`, `price` and `size`.
fn assert_depth10_market_eq(actual: &OrderBookDepth10, expected: &OrderBookDepth10) {
    assert_eq!(actual.instrument_id, expected.instrument_id);
    assert_eq!(actual.bid_counts, expected.bid_counts);
    assert_eq!(actual.ask_counts, expected.ask_counts);
    assert_eq!(actual.flags, expected.flags);
    assert_eq!(actual.sequence, expected.sequence);
    assert_eq!(actual.ts_event, expected.ts_event);
    assert_eq!(actual.ts_init, expected.ts_init);

    // Bids are compared first, then asks, pairing levels positionally.
    let bid_pairs = actual.bids.iter().zip(expected.bids.iter());
    let ask_pairs = actual.asks.iter().zip(expected.asks.iter());
    for (got, want) in bid_pairs.chain(ask_pairs) {
        assert_eq!(got.side, want.side);
        assert_eq!(got.price, want.price);
        assert_eq!(got.size, want.size);
    }
}
/// Builds a fixed `MarkPriceUpdate` fixture for `AUDUSD.SIM`.
fn mark_price_update() -> MarkPriceUpdate {
    let instrument_id = InstrumentId::from("AUDUSD.SIM");
    let value = Price::from("1.00010");
    let (first_ts, second_ts) = (UnixNanos::from(1), UnixNanos::from(2));
    MarkPriceUpdate::new(instrument_id, value, first_ts, second_ts)
}
/// Builds a fixed `IndexPriceUpdate` fixture for `AUDUSD.SIM`.
fn index_price_update() -> IndexPriceUpdate {
    let instrument_id = InstrumentId::from("AUDUSD.SIM");
    let value = Price::from("1.00020");
    let (first_ts, second_ts) = (UnixNanos::from(3), UnixNanos::from(4));
    IndexPriceUpdate::new(instrument_id, value, first_ts, second_ts)
}
/// Builds a fixed `FundingRateUpdate` fixture for `AUDUSD.SIM` with every
/// optional argument populated.
fn funding_rate_update() -> FundingRateUpdate {
    let instrument_id = InstrumentId::from("AUDUSD.SIM");
    // `Decimal::new(1, 4)` is the value 0.0001.
    let rate = Decimal::new(1, 4);
    let (first_ts, second_ts) = (UnixNanos::from(6), UnixNanos::from(7));
    FundingRateUpdate::new(
        instrument_id,
        rate,
        Some(480),
        Some(UnixNanos::from(5)),
        first_ts,
        second_ts,
    )
}
/// Builds an `OptionGreeks` fixture that mixes `Some(0.0)`, `None` and
/// non-zero optional fields.
#[cfg(any(feature = "sbe", feature = "capnp"))]
fn option_greeks() -> OptionGreeks {
    let greeks = OptionGreekValues {
        delta: 0.525,
        gamma: 0.00032,
        vega: 12.25,
        theta: -0.72,
        rho: 0.18,
    };
    let instrument_id = InstrumentId::from("BTC-30JUN23-40000-C.DERIBIT");

    OptionGreeks {
        instrument_id,
        convention: GreeksConvention::PriceAdjusted,
        greeks,
        mark_iv: Some(0.0),
        bid_iv: None,
        ask_iv: Some(0.54),
        underlying_price: Some(41_500.25),
        open_interest: Some(0.0),
        ts_event: UnixNanos::from(20),
        ts_init: UnixNanos::from(21),
    }
}
/// Builds a `PortfolioSnapshot` fixture for cash account `SIM-001` whose five
/// collection arguments are all empty.
fn portfolio_snapshot() -> PortfolioSnapshot {
    let account_id = AccountId::from("SIM-001");
    let base_currency = Some(Currency::USD());
    PortfolioSnapshot::new(
        account_id,
        AccountType::Cash,
        base_currency,
        Vec::new(),
        Vec::new(),
        Vec::new(),
        Vec::new(),
        Vec::new(),
        UUID4::new(),
        UnixNanos::from(8),
        UnixNanos::from(9),
    )
}
/// Builds a `PositionEvent::PositionOpened` fixture: a long 100-unit
/// `AUDUSD.SIM` position opened by a buy order, with a fresh event ID.
fn position_event() -> PositionEvent {
    let opened = PositionOpened {
        trader_id: TraderId::from("TRADER-001"),
        strategy_id: StrategyId::from("S-001"),
        instrument_id: InstrumentId::from("AUDUSD.SIM"),
        position_id: PositionId::from("P-001"),
        account_id: AccountId::from("SIM-001"),
        opening_order_id: ClientOrderId::from("O-19700101-000000-001-001-1"),
        entry: OrderSide::Buy,
        side: PositionSide::Long,
        signed_qty: 100.0,
        quantity: Quantity::from("100"),
        last_qty: Quantity::from("100"),
        last_px: Price::from("1.00000"),
        currency: Currency::USD(),
        avg_px_open: 1.0,
        event_id: UUID4::new(),
        ts_event: UnixNanos::from(10),
        ts_init: UnixNanos::from(11),
    };
    PositionEvent::PositionOpened(opened)
}
/// Returns a shared copy of the chain registered under chain ID 42161.
///
/// # Panics
///
/// Panics if no chain is registered for that ID.
#[cfg(feature = "defi")]
fn defi_chain() -> Arc<Chain> {
    let arbitrum = Chain::from_chain_id(42161).expect("Arbitrum chain must be registered");
    Arc::new(arbitrum.clone())
}
/// Builds a shared Uniswap V3 `Dex` fixture on the chain registered under
/// chain ID 42161.
///
/// # Panics
///
/// Panics if no chain is registered for that ID.
#[cfg(feature = "defi")]
fn defi_dex() -> Arc<Dex> {
    let arbitrum = Chain::from_chain_id(42161).expect("Arbitrum chain must be registered");
    let dex = Dex::new(
        arbitrum.clone(),
        DexType::UniswapV3,
        "0x1F98431c8aD98523631AE4a59f267346ea31F984",
        0,
        AmmType::CLAMM,
        "PoolCreated",
        "Swap",
        "Mint",
        "Burn",
        "Collect",
    );
    Arc::new(dex)
}
/// Builds a RAIN/WETH `Pool` fixture on the chain and DEX produced by
/// `defi_chain` and `defi_dex`.
#[cfg(feature = "defi")]
fn defi_pool() -> Pool {
    let chain = defi_chain();
    let dex = defi_dex();

    let rain_token = Token::new(
        chain.clone(),
        address!("0x25118290e6A5f4139381D072181157035864099d"),
        "RAIN".to_string(),
        "RAIN".to_string(),
        18,
    );
    let weth_token = Token::new(
        chain.clone(),
        address!("0x82aF49447D8a07e3bd95BD0d56f35241523fBab1"),
        "Wrapped Ether".to_string(),
        "WETH".to_string(),
        18,
    );

    // The pool identifier is derived from the same address the pool is built with.
    let pool_address = address!("0xd13040d4fe917EE704158CfCB3338dCd2838B245");
    let pool_identifier = PoolIdentifier::from_address(pool_address);

    Pool::new(
        chain,
        dex,
        pool_address,
        pool_identifier,
        0,
        rain_token,
        weth_token,
        Some(3000),
        Some(60),
        UnixNanos::from(12),
    )
}
/// Builds a fixed `Block` fixture from two hash strings, the number 100, a
/// `Ustr` address and two gas figures; the final optional argument is `None`.
#[cfg(feature = "defi")]
fn defi_block() -> Block {
    let first_hash =
        String::from("0x0000000000000000000000000000000000000000000000000000000000000100");
    let second_hash =
        String::from("0x0000000000000000000000000000000000000000000000000000000000000099");
    let address = Ustr::from("0x0000000000000000000000000000000000000001");
    Block::new(
        first_hash,
        second_hash,
        100,
        address,
        30_000_000,
        21_000,
        UnixNanos::from(13),
        None,
    )
}
#[cfg(feature = "defi")]
/// Returns the fixed transaction hash string shared by the DeFi fixtures.
fn defi_transaction_hash() -> String {
    String::from("0x1aa3506e78dd6e7e53986fa310c7ef1b7825042e19693c04eb56b2404067407b")
}
/// Builds a `PoolLiquidityUpdate` fixture of type `Mint` for the pool returned
/// by `defi_pool`, using the shared fixture transaction hash.
#[cfg(feature = "defi")]
fn defi_liquidity_update() -> PoolLiquidityUpdate {
    let pool = defi_pool();
    let chain = pool.chain.clone();
    let dex = pool.dex.clone();
    let tx_hash = defi_transaction_hash();
    PoolLiquidityUpdate::new(
        chain,
        dex,
        pool.instrument_id,
        pool.pool_identifier,
        PoolLiquidityUpdateType::Mint,
        100_000,
        tx_hash,
        0,
        1,
        None,
        address!("0x5E325eDA8064b456f4781070C0738d849c824258"),
        100,
        U256::from(10),
        U256::from(20),
        -120,
        120,
        UnixNanos::from(14),
        UnixNanos::from(15),
    )
}
/// Builds a `PoolFeeCollect` fixture for the pool returned by `defi_pool`,
/// using the shared fixture transaction hash.
#[cfg(feature = "defi")]
fn defi_collect() -> PoolFeeCollect {
    let pool = defi_pool();
    let chain = pool.chain.clone();
    let dex = pool.dex.clone();
    let tx_hash = defi_transaction_hash();
    PoolFeeCollect::new(
        chain,
        dex,
        pool.instrument_id,
        pool.pool_identifier,
        100_000,
        tx_hash,
        0,
        2,
        address!("0x5E325eDA8064b456f4781070C0738d849c824258"),
        10,
        20,
        -120,
        120,
        UnixNanos::from(16),
        UnixNanos::from(17),
    )
}
/// Builds a `PoolFlash` fixture for the pool returned by `defi_pool`, using
/// the shared fixture transaction hash and the same address for both address
/// arguments.
#[cfg(feature = "defi")]
fn defi_flash() -> PoolFlash {
    let pool = defi_pool();
    let chain = pool.chain.clone();
    let dex = pool.dex.clone();
    let tx_hash = defi_transaction_hash();
    // Both address arguments take the same value.
    let party = address!("0x1aa3506e78dd6e7e53986fa310c7ef1b7825042e");
    PoolFlash::new(
        chain,
        dex,
        pool.instrument_id,
        pool.pool_identifier,
        100_000,
        tx_hash,
        0,
        3,
        UnixNanos::from(18),
        UnixNanos::from(19),
        party,
        party,
        U256::from(100),
        U256::from(200),
        U256::from(101),
        U256::from(202),
    )
}
/// Runs `assert_typed_external_round_trips` for each typed payload listed
/// below using the given `encoding`.
///
/// Each call pairs a payload type with its topic, a fixture value, the
/// matching publish/subscribe functions, and the comparator used on the
/// received value. Quotes are not in this list.
fn assert_publishable_json_msgpack_round_trips(encoding: SerializationEncoding) {
// Instrument: compared through its JSON representation.
assert_typed_external_round_trips(
encoding,
BusPayloadType::Instrument,
"data.instruments.AUDUSD.SIM",
InstrumentAny::CurrencyPair(audusd_sim()),
publish_instrument,
subscribe_instruments,
assert_json_value_eq,
);
// Order book deltas.
assert_typed_external_round_trips(
encoding,
BusPayloadType::OrderBookDeltas,
"data.book.deltas.AAPL.XNAS",
stub_deltas(),
publish_deltas,
subscribe_book_deltas,
assert_eq_ref,
);
// Order book depth: compared with the field-level depth comparator.
assert_typed_external_round_trips(
encoding,
BusPayloadType::OrderBookDepth10,
"data.book.depth10.AAPL.XNAS",
stub_depth10(),
publish_depth10,
subscribe_book_depth10,
assert_depth10_market_eq,
);
// Trades.
assert_typed_external_round_trips(
encoding,
BusPayloadType::TradeTick,
"data.trades.AUDUSD.SIM",
TradeTick::default(),
publish_trade,
subscribe_trades,
assert_eq_ref,
);
// Bars.
assert_typed_external_round_trips(
encoding,
BusPayloadType::Bar,
"data.bars.AUDUSD.SIM",
Bar::default(),
publish_bar,
subscribe_bars,
assert_eq_ref,
);
// Mark prices.
assert_typed_external_round_trips(
encoding,
BusPayloadType::MarkPriceUpdate,
"data.mark_prices.AUDUSD.SIM",
mark_price_update(),
publish_mark_price,
subscribe_mark_prices,
assert_eq_ref,
);
// Index prices.
assert_typed_external_round_trips(
encoding,
BusPayloadType::IndexPriceUpdate,
"data.index_prices.AUDUSD.SIM",
index_price_update(),
publish_index_price,
subscribe_index_prices,
assert_eq_ref,
);
// Funding rates.
assert_typed_external_round_trips(
encoding,
BusPayloadType::FundingRateUpdate,
"data.funding_rates.AUDUSD.SIM",
funding_rate_update(),
publish_funding_rate,
subscribe_funding_rates,
assert_eq_ref,
);
// Option greeks: uses the default value here.
assert_typed_external_round_trips(
encoding,
BusPayloadType::OptionGreeks,
"data.option_greeks.AUDUSD.SIM",
OptionGreeks::default(),
publish_option_greeks,
subscribe_option_greeks,
assert_eq_ref,
);
// Account state.
assert_typed_external_round_trips(
encoding,
BusPayloadType::AccountState,
"events.account.SIM-001",
nautilus_model::events::account::stubs::cash_account_state(),
publish_account_state,
subscribe_account_state,
assert_eq_ref,
);
// Portfolio snapshot.
assert_typed_external_round_trips(
encoding,
BusPayloadType::PortfolioSnapshot,
"events.portfolio.SIM-001",
portfolio_snapshot(),
publish_portfolio_snapshot,
subscribe_portfolio_snapshot,
assert_eq_ref,
);
// Order events.
assert_typed_external_round_trips(
encoding,
BusPayloadType::OrderEvent,
"events.orders.SIM-001",
OrderEventAny::Denied(OrderDeniedSpec::builder().build()),
publish_order_event,
subscribe_order_events,
assert_eq_ref,
);
// Position events: compared through their JSON representation.
assert_typed_external_round_trips(
encoding,
BusPayloadType::PositionEvent,
"events.positions.SIM-001",
position_event(),
publish_position_event,
subscribe_position_events,
assert_json_value_eq,
);
}
/// Runs `assert_typed_external_round_trips` for each DeFi payload listed
/// below using the given `encoding` (requires the `defi` feature).
///
/// Every payload here is compared through its JSON representation.
#[cfg(feature = "defi")]
fn assert_publishable_defi_json_msgpack_round_trips(encoding: SerializationEncoding) {
// Blocks.
assert_typed_external_round_trips(
encoding,
BusPayloadType::Block,
"data.defi.blocks.ARBITRUM",
defi_block(),
publish_defi_block,
subscribe_defi_blocks,
assert_json_value_eq,
);
// Pools.
assert_typed_external_round_trips(
encoding,
BusPayloadType::Pool,
"data.defi.pools.RAIN-WETH",
defi_pool(),
publish_defi_pool,
subscribe_defi_pools,
assert_json_value_eq,
);
// Liquidity updates.
assert_typed_external_round_trips(
encoding,
BusPayloadType::PoolLiquidityUpdate,
"data.defi.liquidity.RAIN-WETH",
defi_liquidity_update(),
publish_defi_liquidity,
subscribe_defi_liquidity,
assert_json_value_eq,
);
// Fee collects.
assert_typed_external_round_trips(
encoding,
BusPayloadType::PoolFeeCollect,
"data.defi.collects.RAIN-WETH",
defi_collect(),
publish_defi_collect,
subscribe_defi_collects,
assert_json_value_eq,
);
// Flash events.
assert_typed_external_round_trips(
encoding,
BusPayloadType::PoolFlash,
"data.defi.flash.RAIN-WETH",
defi_flash(),
publish_defi_flash,
subscribe_defi_flash,
assert_json_value_eq,
);
}
/// Runs the typed-payload round-trip suite for the JSON and MsgPack encodings.
#[rstest]
#[case(SerializationEncoding::Json)]
#[case(SerializationEncoding::MsgPack)]
fn republish_external_message_round_trips_publishable_json_msgpack(
#[case] encoding: SerializationEncoding,
) {
assert_publishable_json_msgpack_round_trips(encoding);
}
/// Runs the DeFi round-trip suite for the JSON and MsgPack encodings
/// (requires the `defi` feature).
#[cfg(feature = "defi")]
#[rstest]
#[case(SerializationEncoding::Json)]
#[case(SerializationEncoding::MsgPack)]
fn republish_external_message_round_trips_publishable_defi_json_msgpack(
#[case] encoding: SerializationEncoding,
) {
assert_publishable_defi_json_msgpack_round_trips(encoding);
}
fn assert_custom_data_round_trips(encoding: SerializationEncoding) {
let publications = install_capturing_external_egress(encoding);
let custom = stub_custom_data(100, 42, None, Some("stub-id".to_string()));
publish_any("data.custom.StubCustomData".into(), &custom);
let bus_message = {
let publications = publications.borrow();
assert_eq!(publications.len(), 1);
assert_eq!(
publications[0].payload_type,
BusPayloadType::Custom(Ustr::from("StubCustomData"))
);
assert_eq!(publications[0].encoding, encoding);
BusMessage::with_str_topic(
publications[0].topic.clone(),
publications[0].payload_type,
publications[0].payload.clone(),
publications[0].encoding,
)
};
publications.borrow_mut().clear();
let received = Rc::new(RefCell::new(Vec::<CustomData>::new()));
let received_handler = received.clone();
subscribe_any(
"data.custom.StubCustomData".into(),
ShareableMessageHandler::from_typed(move |message: &CustomData| {
received_handler.borrow_mut().push(message.clone());
}),
None,
);
get_message_bus()
.borrow_mut()
.add_streaming_type(BusPayloadType::Custom(Ustr::from("StubCustomData")));
republish_external_message(&bus_message).unwrap();
assert_eq!(*received.borrow(), vec![custom]);
assert!(
publications.borrow().is_empty(),
"republished message must not be forwarded back out externally"
);
reset_message_bus();
}
/// Runs the custom-data round-trip check for the JSON and MsgPack encodings.
#[rstest]
#[case(SerializationEncoding::Json)]
#[case(SerializationEncoding::MsgPack)]
fn republish_external_message_round_trips_custom_data(#[case] encoding: SerializationEncoding) {
assert_custom_data_round_trips(encoding);
}
/// Verifies that republishing a custom payload named `UnregisteredCustomData`
/// returns `Ok` for both the JSON and MsgPack encodings.
#[rstest]
#[case(SerializationEncoding::Json)]
#[case(SerializationEncoding::MsgPack)]
fn republish_external_message_skips_unregistered_custom_payload(
    #[case] encoding: SerializationEncoding,
) {
    let envelope = serde_json::json!({
        "type": "UnregisteredCustomData",
        "data_type": {
            "type_name": "UnregisteredCustomData",
            "metadata": {},
        },
        "payload": {
            "value": 1,
        },
    });

    // Encode the same envelope in whichever wire format is under test.
    let encoded = match encoding {
        SerializationEncoding::Json => {
            serde_json::to_vec(&envelope).expect("JSON envelope must serialize")
        }
        SerializationEncoding::MsgPack => {
            rmp_serde::to_vec_named(&envelope).expect("MsgPack envelope must serialize")
        }
        SerializationEncoding::Sbe | SerializationEncoding::Capnp => {
            unreachable!("schema encodings do not support custom payloads")
        }
    };

    let custom_type = BusPayloadType::Custom(Ustr::from("UnregisteredCustomData"));
    let inbound = BusMessage::with_str_topic(
        "data.custom.UnregisteredCustomData",
        custom_type,
        Bytes::from(encoded),
        encoding,
    );

    // Allow the type for streaming so the republish call proceeds past that check.
    get_message_bus()
        .borrow_mut()
        .add_streaming_type(custom_type);
    republish_external_message(&inbound).unwrap();
    reset_message_bus();
}
/// Verifies that republishing a custom message with a default (empty) type
/// name and an empty payload returns `Ok`.
#[rstest]
fn republish_external_message_skips_untyped_custom_payload() {
    let untyped = BusPayloadType::Custom(Ustr::default());
    let inbound = BusMessage::with_str_topic(
        "events/control",
        untyped,
        Bytes::new(),
        SerializationEncoding::Json,
    );

    republish_external_message(&inbound).unwrap();
    reset_message_bus();
}
#[rstest]
fn republish_external_message_skips_unregistered_streaming_type_before_decode() {
let received = Rc::new(RefCell::new(Vec::<QuoteTick>::new()));
let received_handler = received.clone();
let handler = TypedHandler::from(move |quote: &QuoteTick| {
received_handler.borrow_mut().push(*quote);
});
subscribe_quotes("data.quotes.*".into(), handler, None);
let message = BusMessage::with_str_topic(
"data.quotes.AUDUSD.SIM",
BusPayloadType::QuoteTick,
Bytes::from_static(b"not-json"),
SerializationEncoding::Json,
);
republish_external_message(&message).unwrap();
assert!(received.borrow().is_empty());
reset_message_bus();
}
/// Runs `assert_typed_external_round_trips` for each market-data payload
/// listed below using the given `encoding`.
///
/// Compiled when either the `sbe` or the `capnp` feature is enabled; the
/// callers pass the matching schema encoding.
#[cfg(any(feature = "sbe", feature = "capnp"))]
fn assert_market_data_binary_round_trips(encoding: SerializationEncoding) {
// Order book deltas.
assert_typed_external_round_trips(
encoding,
BusPayloadType::OrderBookDeltas,
"data.book.deltas.AAPL.XNAS",
stub_deltas(),
publish_deltas,
subscribe_book_deltas,
assert_eq_ref,
);
// Order book depth: compared with the field-level depth comparator.
assert_typed_external_round_trips(
encoding,
BusPayloadType::OrderBookDepth10,
"data.book.depth10.AAPL.XNAS",
stub_depth10(),
publish_depth10,
subscribe_book_depth10,
assert_depth10_market_eq,
);
// Quotes.
assert_typed_external_round_trips(
encoding,
BusPayloadType::QuoteTick,
"data.quotes.AUDUSD.SIM",
QuoteTick::default(),
publish_quote,
subscribe_quotes,
assert_eq_ref,
);
// Trades.
assert_typed_external_round_trips(
encoding,
BusPayloadType::TradeTick,
"data.trades.AUDUSD.SIM",
TradeTick::default(),
publish_trade,
subscribe_trades,
assert_eq_ref,
);
// Bars.
assert_typed_external_round_trips(
encoding,
BusPayloadType::Bar,
"data.bars.AUDUSD.SIM",
Bar::default(),
publish_bar,
subscribe_bars,
assert_eq_ref,
);
// Mark prices.
assert_typed_external_round_trips(
encoding,
BusPayloadType::MarkPriceUpdate,
"data.mark_prices.AUDUSD.SIM",
mark_price_update(),
publish_mark_price,
subscribe_mark_prices,
assert_eq_ref,
);
// Index prices.
assert_typed_external_round_trips(
encoding,
BusPayloadType::IndexPriceUpdate,
"data.index_prices.AUDUSD.SIM",
index_price_update(),
publish_index_price,
subscribe_index_prices,
assert_eq_ref,
);
// Funding rates.
assert_typed_external_round_trips(
encoding,
BusPayloadType::FundingRateUpdate,
"data.funding_rates.AUDUSD.SIM",
funding_rate_update(),
publish_funding_rate,
subscribe_funding_rates,
assert_eq_ref,
);
// Option greeks: uses the populated fixture rather than the default value.
assert_typed_external_round_trips(
encoding,
BusPayloadType::OptionGreeks,
"data.option_greeks.BTC-30JUN23-40000-C.DERIBIT",
option_greeks(),
publish_option_greeks,
subscribe_option_greeks,
assert_eq_ref,
);
}
/// Runs the market-data round-trip suite for the SBE encoding (requires the `sbe` feature).
#[cfg(feature = "sbe")]
#[rstest]
fn republish_external_message_round_trips_market_data_sbe() {
assert_market_data_binary_round_trips(SerializationEncoding::Sbe);
}
/// Runs the market-data round-trip suite for the Cap'n Proto encoding (requires the `capnp` feature).
#[cfg(feature = "capnp")]
#[rstest]
fn republish_external_message_round_trips_market_data_capnp() {
assert_market_data_binary_round_trips(SerializationEncoding::Capnp);
}
#[rstest]
#[case(BusPayloadType::AccountState)]
fn republish_external_message_skips_unsupported_binary_payload(
#[case] payload_type: BusPayloadType,
) {
let received = Rc::new(RefCell::new(Vec::<serde_json::Value>::new()));
let account_received = received.clone();
subscribe_account_state(
"events.unsupported.*".into(),
TypedHandler::from(move |state: &AccountState| {
account_received
.borrow_mut()
.push(serde_json::to_value(state).unwrap());
}),
None,
);
for encoding in [SerializationEncoding::Sbe, SerializationEncoding::Capnp] {
let message = BusMessage::with_str_topic(
"events.unsupported.payload",
payload_type,
Bytes::from_static(b"malformed unsupported payload"),
encoding,
);
get_message_bus()
.borrow_mut()
.add_streaming_type(payload_type);
republish_external_message(&message).unwrap();
}
assert!(received.borrow().is_empty());
reset_message_bus();
}
#[rstest]
fn republish_external_message_errors_for_malformed_supported_payload() {
let message = BusMessage::with_str_topic(
"data.quotes.AUDUSD.SIM",
BusPayloadType::QuoteTick,
Bytes::from_static(b"not-json"),
SerializationEncoding::Json,
);
get_message_bus()
.borrow_mut()
.add_streaming_type(BusPayloadType::QuoteTick);
let error = republish_external_message(&message).unwrap_err();
assert!(
error
.to_string()
.contains("failed to decode JSON QuoteTick"),
"{error:?}"
);
reset_message_bus();
}
/// Verifies that a quote published with the SBE encoding is forwarded to the
/// external egress as a payload that decodes back into the same `QuoteTick`.
#[cfg(feature = "sbe")]
#[rstest]
fn publish_quote_sbe_forwards_decodable_payload_to_external_egress() {
    let publications = install_capturing_external_egress(SerializationEncoding::Sbe);
    let quote = QuoteTick::default();
    // The quote is passed by reference (restored from a garbled `&quote`).
    publish_quote("data.quotes.TEST".into(), &quote);
    let publications = publications.borrow();
    assert_eq!(publications.len(), 1);
    assert_eq!(publications[0].topic, "data.quotes.TEST");
    assert_eq!(
        QuoteTick::from_sbe(&publications[0].payload)
            .expect("SBE payload must decode as QuoteTick"),
        quote
    );
    // Release the capture-log borrow before the bus is reset.
    drop(publications);
    reset_message_bus();
}
/// Verifies that option greeks published with the SBE encoding are forwarded
/// to the external egress as a payload that decodes back into the same value.
#[cfg(feature = "sbe")]
#[rstest]
fn publish_option_greeks_sbe_forwards_decodable_payload_to_external_egress() {
    let captured = install_capturing_external_egress(SerializationEncoding::Sbe);
    let greeks = option_greeks();
    publish_option_greeks("data.option_greeks.TEST".into(), &greeks);

    let outbound = captured.borrow();
    assert_eq!(outbound.len(), 1);
    assert_eq!(outbound[0].topic, "data.option_greeks.TEST");
    assert_eq!(
        OptionGreeks::from_sbe(&outbound[0].payload)
            .expect("SBE payload must decode as OptionGreeks"),
        greeks
    );

    // Release the capture-log borrow before the bus is reset.
    drop(outbound);
    reset_message_bus();
}
#[cfg(not(feature = "sbe"))]
#[rstest]
fn publish_quote_sbe_without_feature_drops_payload() {
let publications = install_capturing_external_egress(SerializationEncoding::Sbe);
let quote = QuoteTick::default();
publish_quote("data.quotes.TEST".into(), "e);
assert!(publications.borrow().is_empty());
reset_message_bus();
}
/// Verifies that a quote published with the Cap'n Proto encoding is forwarded
/// to the external egress as a payload that decodes back into the same
/// `QuoteTick`.
#[cfg(feature = "capnp")]
#[rstest]
fn publish_quote_capnp_forwards_decodable_payload_to_external_egress() {
    let publications = install_capturing_external_egress(SerializationEncoding::Capnp);
    let quote = QuoteTick::default();
    // The quote is passed by reference (restored from a garbled `&quote`).
    publish_quote("data.quotes.TEST".into(), &quote);
    let publications = publications.borrow();
    assert_eq!(publications.len(), 1);
    assert_eq!(publications[0].topic, "data.quotes.TEST");
    // Read the captured bytes as a Cap'n Proto message, then decode its root.
    let reader = capnp::serialize::read_message(
        &mut &publications[0].payload[..],
        capnp::message::ReaderOptions::new(),
    )
    .expect("Cap'n Proto payload must be readable");
    let root = reader
        .get_root::<market_capnp::quote_tick::Reader>()
        .expect("Cap'n Proto payload must have a QuoteTick root");
    let decoded =
        QuoteTick::from_capnp(root).expect("Cap'n Proto payload must decode as QuoteTick");
    assert_eq!(decoded, quote);
    // Release the capture-log borrow before the bus is reset.
    drop(publications);
    reset_message_bus();
}
/// Verifies that option greeks published with the Cap'n Proto encoding are
/// forwarded to the external egress as a payload that decodes back into the
/// same value.
#[cfg(feature = "capnp")]
#[rstest]
fn publish_option_greeks_capnp_forwards_decodable_payload_to_external_egress() {
    let captured = install_capturing_external_egress(SerializationEncoding::Capnp);
    let greeks = option_greeks();
    publish_option_greeks("data.option_greeks.TEST".into(), &greeks);

    let outbound = captured.borrow();
    assert_eq!(outbound.len(), 1);
    assert_eq!(outbound[0].topic, "data.option_greeks.TEST");

    // Read the captured bytes as a Cap'n Proto message, then decode its root.
    let message_reader = capnp::serialize::read_message(
        &mut &outbound[0].payload[..],
        capnp::message::ReaderOptions::new(),
    )
    .expect("Cap'n Proto payload must be readable");
    let greeks_root = message_reader
        .get_root::<market_capnp::option_greeks::Reader>()
        .expect("Cap'n Proto payload must have an OptionGreeks root");
    let round_tripped = OptionGreeks::from_capnp(greeks_root)
        .expect("Cap'n Proto payload must decode as OptionGreeks");
    assert_eq!(round_tripped, greeks);

    // Release the capture-log borrow before the bus is reset.
    drop(outbound);
    reset_message_bus();
}
#[cfg(not(feature = "capnp"))]
#[rstest]
fn publish_quote_capnp_without_feature_drops_payload() {
let publications = install_capturing_external_egress(SerializationEncoding::Capnp);
let quote = QuoteTick::default();
publish_quote("data.quotes.TEST".into(), "e);
assert!(publications.borrow().is_empty());
reset_message_bus();
}
#[rstest]
fn publish_quote_external_egress_respects_filter_and_suppress_guard() {
let publications = install_capturing_external_egress(SerializationEncoding::MsgPack);
let quote = QuoteTick::default();
get_message_bus()
.borrow_mut()
.set_types_filter(vec!["QuoteTick".to_string()]);
publish_quote("data.quotes.FILTERED".into(), "e);
get_message_bus().borrow_mut().set_types_filter(Vec::new());
{
let _guard = SuppressExternalGuard::new();
publish_quote("data.quotes.SUPPRESSED".into(), "e);
}
publish_quote("data.quotes.PUBLISHED".into(), "e);
let publications = publications.borrow();
assert_eq!(publications.len(), 1);
assert_eq!(publications[0].topic, "data.quotes.PUBLISHED");
drop(publications);
reset_message_bus();
}
#[rstest]
fn publish_quote_uses_market_data_encoding_override() {
let publications = install_capturing_external_egress_config(&MessageBusConfig {
encoding: SerializationEncoding::Json,
encoding_market_data: Some(SerializationEncoding::MsgPack),
..Default::default()
});
let quote = QuoteTick::default();
publish_quote("data.quotes.TEST".into(), "e);
let publications = publications.borrow();
assert_eq!(publications.len(), 1);
assert_eq!(publications[0].encoding, SerializationEncoding::MsgPack);
assert_eq!(
rmp_serde::from_slice::<QuoteTick>(&publications[0].payload)
.expect("MsgPack payload must decode as QuoteTick"),
quote
);
drop(publications);
reset_message_bus();
}
/// Verifies that custom data published through `publish_any` is forwarded as
/// a JSON envelope and that the types filter stops a second publish.
///
/// The envelope must expose the type name under `type` and
/// `data_type.type_name`, the identifier under `data_type.identifier`, and the
/// stub value under `payload.value`.
#[rstest]
fn publish_custom_data_forwards_envelope_to_external_egress_and_respects_filter() {
    let captured = install_capturing_external_egress(SerializationEncoding::Json);
    let custom = stub_custom_data(100, 42, None, Some("stub-id".to_string()));

    // First publish is unfiltered; the second happens after the type is filtered.
    publish_any("data.custom.StubCustomData".into(), &custom);
    get_message_bus()
        .borrow_mut()
        .set_types_filter(vec!["StubCustomData".to_string()]);
    publish_any("data.custom.FILTERED".into(), &custom);

    let outbound = captured.borrow();
    assert_eq!(outbound.len(), 1);
    assert_eq!(outbound[0].topic, "data.custom.StubCustomData");

    let envelope: serde_json::Value = serde_json::from_slice(&outbound[0].payload)
        .expect("JSON payload must decode as a CustomData envelope");

    let type_tag = envelope.get("type").and_then(|value| value.as_str());
    assert_eq!(type_tag, Some("StubCustomData"));

    let type_name = envelope
        .pointer("/data_type/type_name")
        .and_then(|value| value.as_str());
    assert_eq!(type_name, Some("StubCustomData"));

    let identifier = envelope
        .pointer("/data_type/identifier")
        .and_then(|value| value.as_str());
    assert_eq!(identifier, Some("stub-id"));

    let stub_value = envelope
        .pointer("/payload/value")
        .and_then(serde_json::Value::as_i64);
    assert_eq!(stub_value, Some(42));

    // Release the capture-log borrow before the bus is reset.
    drop(outbound);
    reset_message_bus();
}
/// Verifies that a typed quote subscriber on a wildcard pattern receives every
/// published quote and that the bus publish counter advances once per publish.
#[rstest]
fn test_typed_quote_publish_subscribe_integration() {
    let msgbus = get_message_bus();
    // Baseline, so the assertion below is relative to the starting count.
    let pub_count = msgbus.borrow().pub_count();
    let received = Rc::new(RefCell::new(Vec::new()));
    let received_clone = received.clone();
    let handler = TypedHandler::from(move |quote: &QuoteTick| {
        received_clone.borrow_mut().push(*quote);
    });
    subscribe_quotes("data.quotes.*".into(), handler, None);
    let quote = QuoteTick::default();
    // The quote is passed by reference (restored from a garbled `&quote`).
    publish_quote("data.quotes.TEST".into(), &quote);
    publish_quote("data.quotes.TEST".into(), &quote);
    assert_eq!(received.borrow().len(), 2);
    assert_eq!(msgbus.borrow().pub_count(), pub_count + 2);
}
/// Checks that a trade published on `data.trades.TEST` is delivered exactly once to a
/// typed handler subscribed to the `data.trades.*` pattern.
#[rstest]
fn test_typed_trade_publish_subscribe_integration() {
    let _msgbus = get_message_bus();

    let seen = Rc::new(RefCell::new(Vec::new()));
    let sink = Rc::clone(&seen);
    let on_trade = TypedHandler::from(move |tick: &TradeTick| {
        sink.borrow_mut().push(*tick);
    });
    subscribe_trades("data.trades.*".into(), on_trade, None);

    let tick = TradeTick::default();
    publish_trade("data.trades.TEST".into(), &tick);

    let delivered = seen.borrow().len();
    assert_eq!(delivered, 1);
}
/// Checks that a bar published on `data.bars.TEST` is delivered exactly once to a typed
/// handler subscribed to the `data.bars.*` pattern.
#[rstest]
fn test_typed_bar_publish_subscribe_integration() {
    let _msgbus = get_message_bus();

    let seen = Rc::new(RefCell::new(Vec::new()));
    let sink = Rc::clone(&seen);
    let on_bar = TypedHandler::from(move |incoming: &Bar| {
        sink.borrow_mut().push(*incoming);
    });
    subscribe_bars("data.bars.*".into(), on_bar, None);

    let sample = Bar::default();
    publish_bar("data.bars.TEST".into(), &sample);

    let delivered = seen.borrow().len();
    assert_eq!(delivered, 1);
}
/// Checks that an `OrderBookDeltas` batch published on `data.book.deltas.TEST` is
/// delivered exactly once to a typed handler subscribed to `data.book.deltas.*`.
#[rstest]
fn test_typed_deltas_publish_subscribe_integration() {
    let _msgbus = get_message_bus();

    let seen = Rc::new(RefCell::new(Vec::new()));
    let sink = Rc::clone(&seen);
    let on_deltas = TypedHandler::from(move |batch: &OrderBookDeltas| {
        // The handler only gets a reference, so keep an owned copy for inspection.
        sink.borrow_mut().push(batch.clone());
    });
    subscribe_book_deltas("data.book.deltas.*".into(), on_deltas, None);

    // Build a single-delta batch (one `clear` delta) for TEST.VENUE.
    let venue_instrument = InstrumentId::from("TEST.VENUE");
    let clear_delta = OrderBookDelta::clear(venue_instrument, 0, 1.into(), 2.into());
    let batch = OrderBookDeltas::new(venue_instrument, vec![clear_delta]);
    publish_deltas("data.book.deltas.TEST".into(), &batch);

    let delivered = seen.borrow().len();
    assert_eq!(delivered, 1);
}
#[rstest]
fn test_typed_unsubscribe_stops_delivery() {
let _msgbus = get_message_bus();
let received = Rc::new(RefCell::new(Vec::new()));
let received_clone = received.clone();
let handler = TypedHandler::from_with_id("unsub-test", move |quote: &QuoteTick| {
received_clone.borrow_mut().push(*quote);
});
subscribe_quotes("data.quotes.UNSUB".into(), handler.clone(), None);
let quote = QuoteTick::default();
publish_quote("data.quotes.UNSUB".into(), "e);
assert_eq!(received.borrow().len(), 1);
unsubscribe_quotes("data.quotes.UNSUB".into(), &handler);
publish_quote("data.quotes.UNSUB".into(), "e);
assert_eq!(received.borrow().len(), 1);
}
#[rstest]
fn test_typed_wildcard_pattern_matching() {
let _msgbus = get_message_bus();
let received = Rc::new(RefCell::new(Vec::new()));
let received_clone = received.clone();
let handler = TypedHandler::from(move |quote: &QuoteTick| {
received_clone.borrow_mut().push(*quote);
});
subscribe_quotes("data.quotes.WILD.*".into(), handler, None);
let quote = QuoteTick::default();
publish_quote("data.quotes.WILD.AAPL".into(), "e);
publish_quote("data.quotes.WILD.MSFT".into(), "e);
publish_quote("data.quotes.OTHER.AAPL".into(), "e);
assert_eq!(received.borrow().len(), 2);
}
#[rstest]
fn test_typed_priority_ordering() {
let _msgbus = get_message_bus();
let order = Rc::new(RefCell::new(Vec::new()));
let order1 = order.clone();
let handler_low = TypedHandler::from_with_id("low-priority", move |_: &QuoteTick| {
order1.borrow_mut().push("low");
});
let order2 = order.clone();
let handler_high = TypedHandler::from_with_id("high-priority", move |_: &QuoteTick| {
order2.borrow_mut().push("high");
});
subscribe_quotes("data.quotes.PRIO.*".into(), handler_low, Some(1));
subscribe_quotes("data.quotes.PRIO.*".into(), handler_high, Some(10));
let quote = QuoteTick::default();
publish_quote("data.quotes.PRIO.TEST".into(), "e);
assert_eq!(*order.borrow(), vec!["high", "low"]);
}
#[rstest]
fn test_typed_routing_isolation() {
let _msgbus = get_message_bus();
let quote_received = Rc::new(RefCell::new(false));
let trade_received = Rc::new(RefCell::new(false));
let qr = quote_received.clone();
let quote_handler = TypedHandler::from(move |_: &QuoteTick| {
*qr.borrow_mut() = true;
});
let tr = trade_received.clone();
let trade_handler = TypedHandler::from(move |_: &TradeTick| {
*tr.borrow_mut() = true;
});
subscribe_quotes("data.iso.*".into(), quote_handler, None);
subscribe_trades("data.iso.*".into(), trade_handler, None);
let quote = QuoteTick::default();
publish_quote("data.iso.TEST".into(), "e);
assert!(*quote_received.borrow());
assert!(!*trade_received.borrow());
}
/// Registers a data endpoint whose handler calls `get_quotes_topic` while it is being
/// dispatched, then sends a quote to that endpoint and checks the handler completed.
#[rstest]
fn test_send_data_allows_reentrant_topic_access() {
    use crate::msgbus::switchboard::get_quotes_topic;

    let _msgbus = get_message_bus();

    let lookup_done = Rc::new(RefCell::new(false));
    let flag = Rc::clone(&lookup_done);
    let on_data = TypedIntoHandler::from(move |payload: Data| {
        // Topic lookup happens from inside the handler, i.e. during `send_data`.
        let _topic = get_quotes_topic(payload.instrument_id());
        *flag.borrow_mut() = true;
    });

    let target: MStr<Endpoint> = "ReentrantTest.data".into();
    register_data_endpoint(target, on_data);

    send_data(target, Data::Quote(QuoteTick::default()));

    assert!(*lookup_done.borrow());
}
/// Registers a trading-command endpoint whose handler calls `get_trades_topic` while it is
/// being dispatched, then sends a `CancelAllOrders` command and checks the handler completed.
#[rstest]
fn test_send_trading_command_allows_reentrant_topic_access() {
    use nautilus_model::{
        enums::OrderSide,
        identifiers::{StrategyId, TraderId},
    };

    use crate::{
        messages::execution::{TradingCommand, cancel::CancelAllOrders},
        msgbus::switchboard::get_trades_topic,
    };

    let _msgbus = get_message_bus();

    let lookup_done = Rc::new(RefCell::new(false));
    let flag = Rc::clone(&lookup_done);
    let on_command = TypedIntoHandler::from(move |incoming: TradingCommand| {
        // Topic lookup happens from inside the handler, i.e. during `send_trading_command`.
        let _topic = get_trades_topic(incoming.instrument_id());
        *flag.borrow_mut() = true;
    });

    let target: MStr<Endpoint> = "ReentrantTest.tradingCmd".into();
    register_trading_command_endpoint(target, on_command);

    let cancel_all = CancelAllOrders::new(
        TraderId::new("TESTER-001"),
        None,
        StrategyId::new("S-001"),
        InstrumentId::from("TEST.VENUE"),
        OrderSide::NoOrderSide,
        UUID4::new(),
        0.into(),
        None,
        None,
    );
    send_trading_command(target, TradingCommand::CancelAllOrders(cancel_all));

    assert!(*lookup_done.borrow());
}
/// Registers an account-state endpoint whose handler calls `get_quotes_topic` while it is
/// being dispatched, then sends an `AccountState` and checks the handler completed.
#[rstest]
fn test_send_account_state_allows_reentrant_topic_access() {
    use nautilus_model::{enums::AccountType, identifiers::AccountId, types::Currency};

    use crate::msgbus::switchboard::get_quotes_topic;

    let _msgbus = get_message_bus();

    let lookup_done = Rc::new(RefCell::new(false));
    let flag = Rc::clone(&lookup_done);
    let on_state = TypedHandler::from(move |_state: &AccountState| {
        // Topic lookup happens from inside the handler, i.e. during `send_account_state`.
        let _topic = get_quotes_topic(InstrumentId::from("TEST.VENUE"));
        *flag.borrow_mut() = true;
    });

    let target: MStr<Endpoint> = "ReentrantTest.accountState".into();
    register_account_state_endpoint(target, on_state);

    let snapshot = AccountState::new(
        AccountId::new("SIM-001"),
        AccountType::Cash,
        vec![],
        vec![],
        true,
        UUID4::new(),
        0.into(),
        0.into(),
        Some(Currency::USD()),
    );
    send_account_state(target, &snapshot);

    assert!(*lookup_done.borrow());
}
/// Registers an order-event endpoint whose handler calls `get_quotes_topic` while it is
/// being dispatched, then sends an `OrderEventAny::Denied` and checks the handler completed.
#[rstest]
fn test_send_order_event_allows_reentrant_topic_access() {
    use crate::msgbus::switchboard::get_quotes_topic;

    let _msgbus = get_message_bus();

    let lookup_done = Rc::new(RefCell::new(false));
    let flag = Rc::clone(&lookup_done);
    let on_event = TypedIntoHandler::from(move |_event: OrderEventAny| {
        // Topic lookup happens from inside the handler, i.e. during `send_order_event`.
        let _topic = get_quotes_topic(InstrumentId::from("TEST.VENUE"));
        *flag.borrow_mut() = true;
    });

    let target: MStr<Endpoint> = "ReentrantTest.orderEvent".into();
    register_order_event_endpoint(target, on_event);

    let denied = OrderDeniedSpec::builder().build();
    send_order_event(target, OrderEventAny::Denied(denied));

    assert!(*lookup_done.borrow());
}
/// Registers a data-command endpoint whose handler calls `get_trades_topic` while it is
/// being dispatched, and checks the bus counters across two sends:
///
/// - a `Subscribe` command raises `sent_count` by one and leaves `req_count` unchanged;
/// - a `Request` command raises `sent_count` again and raises `req_count` by one.
#[rstest]
fn test_send_data_command_allows_reentrant_topic_access() {
    use crate::msgbus::switchboard::get_trades_topic;

    let bus = get_message_bus();
    // Baselines, so the assertions below are relative to this test's own sends.
    let sent_before = bus.borrow().sent_count();
    let req_before = bus.borrow().req_count();

    let lookup_done = Rc::new(RefCell::new(false));
    let flag = Rc::clone(&lookup_done);
    let on_command = TypedIntoHandler::from(move |_cmd: DataCommand| {
        let _topic = get_trades_topic(InstrumentId::from("TEST.VENUE"));
        *flag.borrow_mut() = true;
    });

    let target: MStr<Endpoint> = "ReentrantTest.dataCmd".into();
    register_data_command_endpoint(target, on_command);

    // Subscribe command: counted as sent, not as a request.
    let subscribe_cmd = SubscribeQuotes::new(
        InstrumentId::from("TEST.VENUE"),
        Some(ClientId::new("SIM")),
        None,
        UUID4::new(),
        0.into(),
        None,
        None,
    );
    send_data_command(
        target,
        DataCommand::Subscribe(SubscribeCommand::Quotes(subscribe_cmd)),
    );
    assert!(*lookup_done.borrow());
    assert_eq!(bus.borrow().sent_count(), sent_before + 1);
    assert_eq!(bus.borrow().req_count(), req_before);

    // Request command: counted as sent and as a request.
    let request_cmd = RequestQuotes::new(
        InstrumentId::from("TEST.VENUE"),
        None,
        None,
        None,
        Some(ClientId::new("SIM")),
        UUID4::new(),
        0.into(),
        None,
    );
    send_data_command(
        target,
        DataCommand::Request(RequestCommand::Quotes(request_cmd)),
    );
    assert_eq!(bus.borrow().sent_count(), sent_before + 2);
    assert_eq!(bus.borrow().req_count(), req_before + 1);
}
/// Sends a `Request` data command to `Missing.dataCmd`, an endpoint this test never
/// registers, and checks that neither `sent_count` nor `req_count` changes.
#[rstest]
fn test_send_data_request_without_endpoint_does_not_increment_counts() {
    let bus = get_message_bus();
    let sent_before = bus.borrow().sent_count();
    let req_before = bus.borrow().req_count();

    let request_cmd = RequestQuotes::new(
        InstrumentId::from("MISSING.VENUE"),
        None,
        None,
        None,
        Some(ClientId::new("SIM")),
        UUID4::new(),
        0.into(),
        None,
    );
    send_data_command(
        "Missing.dataCmd".into(),
        DataCommand::Request(RequestCommand::Quotes(request_cmd)),
    );

    let sent_after = bus.borrow().sent_count();
    let req_after = bus.borrow().req_count();
    assert_eq!(sent_after, sent_before);
    assert_eq!(req_after, req_before);
}
/// Registers a data-response endpoint whose handler calls `get_quotes_topic` while it is
/// being dispatched, then sends an empty `QuotesResponse` and checks the handler completed.
#[rstest]
fn test_send_data_response_allows_reentrant_topic_access() {
    use nautilus_model::identifiers::ClientId;

    use crate::{
        messages::data::{DataResponse, QuotesResponse},
        msgbus::switchboard::get_quotes_topic,
    };

    let _msgbus = get_message_bus();

    let lookup_done = Rc::new(RefCell::new(false));
    let flag = Rc::clone(&lookup_done);
    let on_response = TypedIntoHandler::from(move |_resp: DataResponse| {
        // Topic lookup happens from inside the handler, i.e. during `send_data_response`.
        let _topic = get_quotes_topic(InstrumentId::from("TEST.VENUE"));
        *flag.borrow_mut() = true;
    });

    let target: MStr<Endpoint> = "ReentrantTest.dataResp".into();
    register_data_response_endpoint(target, on_response);

    let quotes = QuotesResponse {
        correlation_id: UUID4::new(),
        client_id: ClientId::new("SIM"),
        instrument_id: InstrumentId::from("TEST.VENUE"),
        data: vec![],
        start: None,
        end: None,
        ts_init: 0.into(),
        params: None,
    };
    send_data_response(target, DataResponse::Quotes(quotes));

    assert!(*lookup_done.borrow());
}
/// Calls `send_response` with a fresh correlation ID (no response handler is registered
/// for it in this test) and checks that the bus `res_count` grows by one.
#[rstest]
fn test_send_response_increments_response_count() {
    use nautilus_model::identifiers::ClientId;

    use crate::messages::data::{DataResponse, QuotesResponse};

    let bus = get_message_bus();
    let res_before = bus.borrow().res_count();

    let quotes = QuotesResponse {
        correlation_id: UUID4::new(),
        client_id: ClientId::new("SIM"),
        instrument_id: InstrumentId::from("TEST.VENUE"),
        data: vec![],
        start: None,
        end: None,
        ts_init: 0.into(),
        params: None,
    };
    let response = DataResponse::Quotes(quotes);
    send_response(&UUID4::new(), &response);

    let res_after = bus.borrow().res_count();
    assert_eq!(res_after, res_before + 1);
}
/// Registers an execution-report endpoint whose handler calls `get_trades_topic` while it
/// is being dispatched, then sends a `MassStatus` report and checks the handler completed.
#[rstest]
fn test_send_execution_report_allows_reentrant_topic_access() {
    use nautilus_model::{
        identifiers::{AccountId, ClientId, Venue},
        reports::ExecutionMassStatus,
    };

    use crate::{messages::execution::ExecutionReport, msgbus::switchboard::get_trades_topic};

    let _msgbus = get_message_bus();

    let lookup_done = Rc::new(RefCell::new(false));
    let flag = Rc::clone(&lookup_done);
    let on_report = TypedIntoHandler::from(move |_report: ExecutionReport| {
        // Topic lookup happens from inside the handler, i.e. during `send_execution_report`.
        let _topic = get_trades_topic(InstrumentId::from("TEST.VENUE"));
        *flag.borrow_mut() = true;
    });

    let target: MStr<Endpoint> = "ReentrantTest.execReport".into();
    register_execution_report_endpoint(target, on_report);

    let mass_status = ExecutionMassStatus::new(
        ClientId::new("SIM"),
        AccountId::new("SIM-001"),
        Venue::new("TEST"),
        0.into(),
        None,
    );
    send_execution_report(target, ExecutionReport::MassStatus(Box::new(mass_status)));

    assert!(*lookup_done.borrow());
}
/// Checks that an order-event handler can itself call `send_trading_command` while it is
/// being dispatched: the event handler must run to completion and the trading-command
/// handler it targets must receive the command.
#[rstest]
fn test_order_event_handler_can_send_trading_command() {
    let _msgbus = get_message_bus();

    let outer_ran = Rc::new(RefCell::new(false));
    let inner_ran = Rc::new(RefCell::new(false));

    // Inner hop: the trading-command endpoint only records that it was reached.
    let inner_flag = Rc::clone(&inner_ran);
    let command_sink = TypedIntoHandler::from(move |_cmd: TradingCommand| {
        *inner_flag.borrow_mut() = true;
    });
    let command_target: MStr<Endpoint> = "ReentrantTest.execCmd".into();
    register_trading_command_endpoint(command_target, command_sink);

    // Outer hop: the order-event handler sends a command from inside its own dispatch.
    let outer_flag = Rc::clone(&outer_ran);
    let event_sink = TypedIntoHandler::from(move |_event: OrderEventAny| {
        let cancel_all = CancelAllOrders::new(
            TraderId::new("TESTER-001"),
            None,
            StrategyId::new("S-001"),
            InstrumentId::from("TEST.VENUE"),
            OrderSide::Buy,
            UUID4::new(),
            0.into(),
            None,
            None,
        );
        send_trading_command(command_target, TradingCommand::CancelAllOrders(cancel_all));
        *outer_flag.borrow_mut() = true;
    });
    let event_target: MStr<Endpoint> = "ReentrantTest.orderEvt".into();
    register_order_event_endpoint(event_target, event_sink);

    let denied = OrderDeniedSpec::builder().build();
    send_order_event(event_target, OrderEventAny::Denied(denied));

    assert!(*outer_ran.borrow(), "Order event handler should have run");
    assert!(
        *inner_ran.borrow(),
        "Trading command should have been received"
    );
}
/// Checks that a data handler can itself call `send_data_command` while it is being
/// dispatched: the data handler must run to completion and the data-command handler it
/// targets must receive the command.
#[rstest]
fn test_data_handler_can_send_data_command() {
    let _msgbus = get_message_bus();

    let outer_ran = Rc::new(RefCell::new(false));
    let inner_ran = Rc::new(RefCell::new(false));

    // Inner hop: the data-command endpoint only records that it was reached.
    let inner_flag = Rc::clone(&inner_ran);
    let command_sink = TypedIntoHandler::from(move |_cmd: DataCommand| {
        *inner_flag.borrow_mut() = true;
    });
    let command_target: MStr<Endpoint> = "ReentrantTest.dataCmd2".into();
    register_data_command_endpoint(command_target, command_sink);

    // Outer hop: the data handler sends a subscribe command from inside its own dispatch.
    let outer_flag = Rc::clone(&outer_ran);
    let data_sink = TypedIntoHandler::from(move |_data: Data| {
        let subscribe_cmd = SubscribeQuotes::new(
            InstrumentId::from("TEST.VENUE"),
            Some(ClientId::new("SIM")),
            None,
            UUID4::new(),
            0.into(),
            None,
            None,
        );
        send_data_command(
            command_target,
            DataCommand::Subscribe(SubscribeCommand::Quotes(subscribe_cmd)),
        );
        *outer_flag.borrow_mut() = true;
    });
    let data_target: MStr<Endpoint> = "ReentrantTest.data2".into();
    register_data_endpoint(data_target, data_sink);

    send_data(data_target, Data::Quote(QuoteTick::default()));

    assert!(*outer_ran.borrow(), "Data handler should have run");
    assert!(
        *inner_ran.borrow(),
        "Data command should have been received"
    );
}
/// Checks that a trading-command handler can itself call `send_order_event` while it is
/// being dispatched: the command handler must run to completion and the order-event
/// handler it targets must receive the event.
#[rstest]
fn test_trading_command_handler_can_send_order_event() {
    let _msgbus = get_message_bus();

    let outer_ran = Rc::new(RefCell::new(false));
    let inner_ran = Rc::new(RefCell::new(false));

    // Inner hop: the order-event endpoint only records that it was reached.
    let inner_flag = Rc::clone(&inner_ran);
    let event_sink = TypedIntoHandler::from(move |_event: OrderEventAny| {
        *inner_flag.borrow_mut() = true;
    });
    let event_target: MStr<Endpoint> = "ReentrantTest.orderEvt2".into();
    register_order_event_endpoint(event_target, event_sink);

    // Outer hop: the command handler sends an order event from inside its own dispatch.
    let outer_flag = Rc::clone(&outer_ran);
    let command_sink = TypedIntoHandler::from(move |_cmd: TradingCommand| {
        let denied = OrderDeniedSpec::builder().build();
        send_order_event(event_target, OrderEventAny::Denied(denied));
        *outer_flag.borrow_mut() = true;
    });
    let command_target: MStr<Endpoint> = "ReentrantTest.execCmd2".into();
    register_trading_command_endpoint(command_target, command_sink);

    let cancel_all = CancelAllOrders::new(
        TraderId::new("TESTER-001"),
        None,
        StrategyId::new("S-001"),
        InstrumentId::from("TEST.VENUE"),
        OrderSide::Buy,
        UUID4::new(),
        0.into(),
        None,
        None,
    );
    send_trading_command(command_target, TradingCommand::CancelAllOrders(cancel_all));

    assert!(
        *outer_ran.borrow(),
        "Trading command handler should have run"
    );
    assert!(
        *inner_ran.borrow(),
        "Order event should have been received"
    );
}
/// Exercises a three-hop chain of sends made from inside handlers:
/// order event -> trading command -> order event.
///
/// The two intermediate handlers each bump a shared counter, and the final handler sets a
/// flag; the test expects the counter to reach 2 and the flag to be set.
#[rstest]
fn test_nested_reentrant_calls() {
    let _msgbus = get_message_bus();

    let hops = Rc::new(RefCell::new(0u32));
    let reached_end = Rc::new(RefCell::new(false));

    // Hop 3 (last): records that the chain completed.
    let end_flag = Rc::clone(&reached_end);
    let last_handler = TypedIntoHandler::from(move |_event: OrderEventAny| {
        *end_flag.borrow_mut() = true;
    });
    let last_target: MStr<Endpoint> = "ReentrantTest.finalEvt".into();
    register_order_event_endpoint(last_target, last_handler);

    // Hop 2: counts itself, then forwards an order event to the last hop.
    let hops_in_middle = Rc::clone(&hops);
    let middle_handler = TypedIntoHandler::from(move |_cmd: TradingCommand| {
        *hops_in_middle.borrow_mut() += 1;
        let denied = OrderDeniedSpec::builder().build();
        send_order_event(last_target, OrderEventAny::Denied(denied));
    });
    let middle_target: MStr<Endpoint> = "ReentrantTest.midCmd".into();
    register_trading_command_endpoint(middle_target, middle_handler);

    // Hop 1: counts itself, then forwards a trading command to the middle hop.
    let hops_in_first = Rc::clone(&hops);
    let first_handler = TypedIntoHandler::from(move |_event: OrderEventAny| {
        *hops_in_first.borrow_mut() += 1;
        let cancel_all = CancelAllOrders::new(
            TraderId::new("TESTER-001"),
            None,
            StrategyId::new("S-001"),
            InstrumentId::from("TEST.VENUE"),
            OrderSide::Buy,
            UUID4::new(),
            0.into(),
            None,
            None,
        );
        send_trading_command(middle_target, TradingCommand::CancelAllOrders(cancel_all));
    });
    let first_target: MStr<Endpoint> = "ReentrantTest.initEvt".into();
    register_order_event_endpoint(first_target, first_handler);

    // Kick off the chain.
    let denied = OrderDeniedSpec::builder().build();
    send_order_event(first_target, OrderEventAny::Denied(denied));

    assert_eq!(
        *hops.borrow(),
        2,
        "Both intermediate handlers should have run"
    );
    assert!(
        *reached_end.borrow(),
        "Final event handler should have received the event"
    );
}
/// Test `BusTap` that records every publish, send and response callback it receives so
/// tests can assert on what the bus dispatched.
#[derive(Default)]
struct RecordingTap {
    /// One `(topic, payload TypeId)` entry per `on_publish` call, in call order.
    publishes: RefCell<Vec<(String, std::any::TypeId)>>,
    /// One `(endpoint, payload TypeId)` entry per `on_send` call, in call order.
    sends: RefCell<Vec<(String, std::any::TypeId)>>,
    /// One `(correlation ID, payload TypeId)` entry per `on_response` call, in call order.
    responses: RefCell<Vec<(UUID4, std::any::TypeId)>>,
}

impl RecordingTap {
    /// Returns the topics of all recorded publishes, in call order.
    fn publish_topics(&self) -> Vec<String> {
        let recorded = self.publishes.borrow();
        recorded.iter().map(|entry| entry.0.clone()).collect()
    }

    /// Returns the endpoints of all recorded sends, in call order.
    fn send_endpoints(&self) -> Vec<String> {
        let recorded = self.sends.borrow();
        recorded.iter().map(|entry| entry.0.clone()).collect()
    }

    /// Returns the correlation IDs of all recorded responses, in call order.
    fn response_correlation_ids(&self) -> Vec<UUID4> {
        let recorded = self.responses.borrow();
        recorded.iter().map(|entry| entry.0).collect()
    }
}

impl BusTap for RecordingTap {
    fn on_publish(&self, topic: MStr<Topic>, message: &dyn std::any::Any) {
        let entry = (topic.to_string(), message.type_id());
        self.publishes.borrow_mut().push(entry);
    }

    fn on_send(&self, endpoint: MStr<Endpoint>, message: &dyn std::any::Any) {
        let entry = (endpoint.to_string(), message.type_id());
        self.sends.borrow_mut().push(entry);
    }

    fn on_response(&self, correlation_id: &UUID4, message: &dyn std::any::Any) {
        let entry = (*correlation_id, message.type_id());
        self.responses.borrow_mut().push(entry);
    }
}
/// Installs a fresh `MessageBus` and a `RecordingTap`, subscribes an any-handler to a
/// topic, and calls `try_publish_any` on it.
///
/// Expects the call to return `true`, the handler to have been called, the publish counter
/// to grow by one, and the tap to have recorded exactly that topic.
#[rstest]
fn try_publish_any_dispatches_handler_and_tap() {
    let bus = Rc::new(RefCell::new(MessageBus::default()));
    set_message_bus(Rc::clone(&bus));

    // Clear any previously installed tap before installing the recorder.
    clear_bus_tap();
    let recorder = Rc::new(RecordingTap::default());
    set_bus_tap(recorder.clone());

    let topic = "data.any.try.test";
    let (handler, checker) = get_call_check_handler(None);
    let published_before = bus.borrow().pub_count();
    subscribe_any(topic.into(), handler, None);

    let payload: u32 = 42;
    let was_published = try_publish_any(topic.into(), &payload);
    // Remove the tap before asserting so a failed assertion cannot leave it installed.
    clear_bus_tap();

    assert!(was_published);
    assert!(checker.was_called());
    assert_eq!(bus.borrow().pub_count(), published_before + 1);
    assert_eq!(recorder.publish_topics(), vec![topic]);
}
/// Calls `try_publish_any` from a freshly spawned thread, which does nothing else first,
/// and expects it to return `false`.
///
/// NOTE(review): presumably the bus is thread-local, so the new thread has none registered
/// — confirm against `try_get_message_bus`.
#[rstest]
fn try_publish_any_without_registered_bus_returns_false() {
    let worker = thread::spawn(|| {
        let payload: u32 = 42;
        try_publish_any("data.any.no-bus.test".into(), &payload)
    });
    let was_published = worker.join().expect("thread should join");

    assert!(!was_published);
}
/// Calls `try_publish_any` while the test itself holds a mutable borrow of the bus.
///
/// Expects the call to return `false`, the publish counter to stay at zero, and the
/// installed `RecordingTap` to have recorded nothing.
#[rstest]
fn try_publish_any_with_borrowed_bus_returns_false_without_tap() {
    let bus = Rc::new(RefCell::new(MessageBus::default()));
    set_message_bus(Rc::clone(&bus));

    clear_bus_tap();
    let recorder = Rc::new(RecordingTap::default());
    set_bus_tap(recorder.clone());

    let was_published = {
        // Hold the exclusive borrow only for the duration of the publish attempt.
        let _exclusive = bus.borrow_mut();
        let payload: u32 = 42;
        try_publish_any("data.any.borrowed.test".into(), &payload)
    };
    clear_bus_tap();

    assert!(!was_published);
    assert_eq!(bus.borrow().pub_count(), 0);
    assert!(recorder.publish_topics().is_empty());
}
#[rstest]
fn set_bus_tap_then_publish_typed_invokes_tap() {
let _msgbus = get_message_bus();
let tap = Rc::new(RecordingTap::default());
set_bus_tap(tap.clone());
let quote = QuoteTick::default();
publish_quote("data.quotes.tap.test".into(), "e);
clear_bus_tap();
assert_eq!(tap.publish_topics(), vec!["data.quotes.tap.test"]);
}
/// Installs a `RecordingTap`, publishes a `u32` through `publish_any`, and checks that the
/// tap recorded exactly that topic.
#[rstest]
fn set_bus_tap_then_publish_any_invokes_tap() {
    let _msgbus = get_message_bus();

    let recorder = Rc::new(RecordingTap::default());
    set_bus_tap(recorder.clone());

    let value: u32 = 42;
    publish_any("data.any.tap.test".into(), &value);
    // Remove the tap before asserting so a failed assertion cannot leave it installed.
    clear_bus_tap();

    let topics = recorder.publish_topics();
    assert_eq!(topics, vec!["data.any.tap.test"]);
}
/// Installs a `RecordingTap`, sends a `u32` through `send_any_value`, and checks that the
/// tap recorded exactly that endpoint. No handler is registered for the endpoint here.
#[rstest]
fn set_bus_tap_then_send_any_value_invokes_tap() {
    let _msgbus = get_message_bus();

    let recorder = Rc::new(RecordingTap::default());
    set_bus_tap(recorder.clone());

    let value: u32 = 7;
    send_any_value("endpoint.send.any.value.test".into(), &value);
    // Remove the tap before asserting so a failed assertion cannot leave it installed.
    clear_bus_tap();

    let endpoints = recorder.send_endpoints();
    assert_eq!(endpoints, vec!["endpoint.send.any.value.test"]);
}
/// Installs a `RecordingTap`, sends an owned `TradingCommand::CancelAllOrders` through
/// `send_trading_command`, and checks that the tap recorded exactly that endpoint.
#[rstest]
fn set_bus_tap_then_send_endpoint_owned_invokes_tap() {
    let _msgbus = get_message_bus();

    let recorder = Rc::new(RecordingTap::default());
    set_bus_tap(recorder.clone());

    let command = TradingCommand::CancelAllOrders(CancelAllOrders::new(
        TraderId::from("TRADER-001"),
        Some(ClientId::from("BINANCE")),
        StrategyId::from("S-001"),
        InstrumentId::from("ETHUSDT-PERP.BINANCE"),
        OrderSide::Buy,
        UUID4::new(),
        nautilus_core::UnixNanos::from(1),
        None,
        None,
    ));
    send_trading_command("endpoint.send.trading.command.test".into(), command);
    // Remove the tap before asserting so a failed assertion cannot leave it installed.
    clear_bus_tap();

    let endpoints = recorder.send_endpoints();
    assert_eq!(endpoints, vec!["endpoint.send.trading.command.test"]);
}
#[rstest]
fn set_bus_tap_then_send_endpoint_ref_invokes_tap() {
let _msgbus = get_message_bus();
let tap = Rc::new(RecordingTap::default());
set_bus_tap(tap.clone());
let quote = QuoteTick::default();
send_quote("endpoint.send.quote.test".into(), "e);
clear_bus_tap();
assert_eq!(tap.send_endpoints(), vec!["endpoint.send.quote.test"]);
}
/// Checks that `has_quote_endpoint` reports `false` before a quote endpoint is registered
/// and `true` after `register_quote_endpoint` has been called for it.
#[rstest]
fn has_quote_endpoint_returns_registration_state() {
    let _msgbus = get_message_bus();
    let target: MStr<Endpoint> = "endpoint.has.quote.registered".into();

    let registered_before = has_quote_endpoint(target);
    assert!(!registered_before);

    // The endpoint doubles as the handler ID.
    let noop_handler = TypedHandler::from_with_id(target, |_quote: &QuoteTick| {});
    register_quote_endpoint(target, noop_handler);

    let registered_after = has_quote_endpoint(target);
    assert!(registered_after);
}
/// Installs a `RecordingTap`, registers a response handler for a correlation ID, and
/// sends a response for it; the handler must be invoked and the tap must have recorded
/// exactly that correlation ID.
#[rstest]
fn set_bus_tap_then_send_response_invokes_tap() {
    let _msgbus = get_message_bus();

    let recorder = Rc::new(RecordingTap::default());
    set_bus_tap(recorder.clone());

    let request_id = UUID4::new();
    let handler_ran = Rc::new(RefCell::new(false));
    let ran_flag = Rc::clone(&handler_ran);
    let on_response = ShareableMessageHandler::from_typed(move |_resp: &QuotesResponse| {
        *ran_flag.borrow_mut() = true;
    });
    register_response_handler(&request_id, on_response);

    let quotes = QuotesResponse {
        correlation_id: request_id,
        client_id: ClientId::new("SIM"),
        instrument_id: InstrumentId::from("TEST.VENUE"),
        data: vec![],
        start: None,
        end: None,
        ts_init: 0.into(),
        params: None,
    };
    let response = DataResponse::Quotes(quotes);
    send_response(&request_id, &response);
    // Remove the tap before asserting so a failed assertion cannot leave it installed.
    clear_bus_tap();

    assert!(*handler_ran.borrow());
    assert_eq!(recorder.response_correlation_ids(), vec![request_id]);
}
#[rstest]
fn clear_bus_tap_prevents_subsequent_dispatches_from_invoking_tap() {
let _msgbus = get_message_bus();
let tap = Rc::new(RecordingTap::default());
set_bus_tap(tap.clone());
clear_bus_tap();
let quote = QuoteTick::default();
publish_quote("data.quotes.after.clear".into(), "e);
send_quote("endpoint.send.after.clear".into(), "e);
let correlation_id = UUID4::new();
register_response_handler(
&correlation_id,
ShareableMessageHandler::from_typed(|_resp: &QuotesResponse| {}),
);
let response = DataResponse::Quotes(QuotesResponse {
correlation_id,
client_id: ClientId::new("SIM"),
instrument_id: InstrumentId::from("TEST.VENUE"),
data: vec![],
start: None,
end: None,
ts_init: 0.into(),
params: None,
});
send_response(&correlation_id, &response);
assert!(tap.publish_topics().is_empty());
assert!(tap.send_endpoints().is_empty());
assert!(tap.response_correlation_ids().is_empty());
}
#[rstest]
fn dispatch_with_no_tap_installed_is_a_noop() {
let _msgbus = get_message_bus();
let quote = QuoteTick::default();
publish_quote("data.quotes.no.tap".into(), "e);
send_quote("endpoint.no.tap".into(), "e);
}
/// Test tap whose `on_publish` installs a different tap (`NoopTap`) while it is itself
/// being invoked; `on_send` does nothing.
struct ReinstallTap;

impl BusTap for ReinstallTap {
    fn on_publish(&self, _: MStr<Topic>, _: &dyn std::any::Any) {
        // Swap the installed tap from inside the callback.
        let replacement = Rc::new(NoopTap);
        set_bus_tap(replacement);
    }

    fn on_send(&self, _: MStr<Endpoint>, _: &dyn std::any::Any) {}
}
/// Test tap that ignores every publish and send callback.
struct NoopTap;

impl BusTap for NoopTap {
    fn on_publish(&self, _: MStr<Topic>, _: &dyn std::any::Any) {}

    fn on_send(&self, _: MStr<Endpoint>, _: &dyn std::any::Any) {}
}
#[rstest]
fn reentrant_set_bus_tap_during_dispatch_does_not_panic() {
let _msgbus = get_message_bus();
set_bus_tap(Rc::new(ReinstallTap));
let quote = QuoteTick::default();
publish_quote("data.quotes.reentrant".into(), "e);
clear_bus_tap();
}
}