use std::{
any::{Any, TypeId},
cell::RefCell,
collections::HashMap,
hash::{Hash, Hasher},
rc::Rc,
};
use ahash::{AHashMap, AHashSet};
use indexmap::IndexMap;
use nautilus_core::{UUID4, correctness::FAILED};
use nautilus_model::{
data::{
Bar, Data, FundingRateUpdate, GreeksData, IndexPriceUpdate, MarkPriceUpdate,
OrderBookDeltas, OrderBookDepth10, QuoteTick, TradeTick,
option_chain::{OptionChainSlice, OptionGreeks},
},
events::{AccountState, OrderEventAny, PositionEvent},
identifiers::TraderId,
orderbook::OrderBook,
orders::OrderAny,
position::Position,
};
use smallvec::SmallVec;
use ustr::Ustr;
use super::{
ShareableMessageHandler,
matching::is_matching_backtracking,
mstr::{Endpoint, MStr, Pattern, Topic},
set_message_bus,
switchboard::MessagingSwitchboard,
typed_endpoints::{EndpointMap, IntoEndpointMap},
typed_router::TopicRouter,
};
use crate::messages::{
data::{DataCommand, DataResponse},
execution::{ExecutionReport, TradingCommand},
};
#[derive(Clone, Debug)]
pub struct Subscription {
pub handler: ShareableMessageHandler,
pub handler_id: Ustr,
pub pattern: MStr<Pattern>,
pub priority: u8,
}
impl Subscription {
#[must_use]
pub fn new(
pattern: MStr<Pattern>,
handler: ShareableMessageHandler,
priority: Option<u8>,
) -> Self {
Self {
handler_id: handler.0.id(),
pattern,
handler,
priority: priority.unwrap_or(0),
}
}
}
impl PartialEq<Self> for Subscription {
fn eq(&self, other: &Self) -> bool {
self.pattern == other.pattern && self.handler_id == other.handler_id
}
}
impl Eq for Subscription {}
impl PartialOrd for Subscription {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
impl Ord for Subscription {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
other
.priority
.cmp(&self.priority)
.then_with(|| self.pattern.cmp(&other.pattern))
.then_with(|| self.handler_id.cmp(&other.handler_id))
}
}
impl Hash for Subscription {
fn hash<H: Hasher>(&self, state: &mut H) {
self.pattern.hash(state);
self.handler_id.hash(state);
}
}
#[derive(Debug)]
pub struct MessageBus {
pub trader_id: TraderId,
pub instance_id: UUID4,
pub name: String,
pub has_backing: bool,
pub(crate) switchboard: MessagingSwitchboard,
pub(crate) subscriptions: AHashSet<Subscription>,
pub(crate) topics: IndexMap<MStr<Topic>, Vec<Subscription>>,
pub(crate) endpoints: IndexMap<MStr<Endpoint>, ShareableMessageHandler>,
pub(crate) correlation_index: AHashMap<UUID4, ShareableMessageHandler>,
pub(crate) router_quotes: TopicRouter<QuoteTick>,
pub(crate) router_trades: TopicRouter<TradeTick>,
pub(crate) router_bars: TopicRouter<Bar>,
pub(crate) router_deltas: TopicRouter<OrderBookDeltas>,
pub(crate) router_depth10: TopicRouter<OrderBookDepth10>,
pub(crate) router_book_snapshots: TopicRouter<OrderBook>,
pub(crate) router_mark_prices: TopicRouter<MarkPriceUpdate>,
pub(crate) router_index_prices: TopicRouter<IndexPriceUpdate>,
pub(crate) router_funding_rates: TopicRouter<FundingRateUpdate>,
pub(crate) router_order_events: TopicRouter<OrderEventAny>,
pub(crate) router_position_events: TopicRouter<PositionEvent>,
pub(crate) router_account_state: TopicRouter<AccountState>,
pub(crate) router_orders: TopicRouter<OrderAny>,
pub(crate) router_positions: TopicRouter<Position>,
pub(crate) router_greeks: TopicRouter<GreeksData>,
pub(crate) router_option_greeks: TopicRouter<OptionGreeks>,
pub(crate) router_option_chain: TopicRouter<OptionChainSlice>,
#[cfg(feature = "defi")]
pub(crate) router_defi_blocks: TopicRouter<nautilus_model::defi::Block>, #[cfg(feature = "defi")]
pub(crate) router_defi_pools: TopicRouter<nautilus_model::defi::Pool>, #[cfg(feature = "defi")]
pub(crate) router_defi_swaps: TopicRouter<nautilus_model::defi::PoolSwap>, #[cfg(feature = "defi")]
pub(crate) router_defi_liquidity: TopicRouter<nautilus_model::defi::PoolLiquidityUpdate>, #[cfg(feature = "defi")]
pub(crate) router_defi_collects: TopicRouter<nautilus_model::defi::PoolFeeCollect>, #[cfg(feature = "defi")]
pub(crate) router_defi_flash: TopicRouter<nautilus_model::defi::PoolFlash>, #[cfg(feature = "defi")]
pub(crate) endpoints_defi_data: IntoEndpointMap<nautilus_model::defi::DefiData>, pub(crate) endpoints_quotes: EndpointMap<QuoteTick>,
pub(crate) endpoints_trades: EndpointMap<TradeTick>,
pub(crate) endpoints_bars: EndpointMap<Bar>,
pub(crate) endpoints_account_state: EndpointMap<AccountState>,
pub(crate) endpoints_trading_commands: IntoEndpointMap<TradingCommand>,
pub(crate) endpoints_data_commands: IntoEndpointMap<DataCommand>,
pub(crate) endpoints_data_responses: IntoEndpointMap<DataResponse>,
pub(crate) endpoints_exec_reports: IntoEndpointMap<ExecutionReport>,
pub(crate) endpoints_order_events: IntoEndpointMap<OrderEventAny>,
pub(crate) endpoints_data: IntoEndpointMap<Data>,
routers_typed: AHashMap<TypeId, Box<dyn Any>>,
endpoints_typed: AHashMap<TypeId, Box<dyn Any>>,
}
impl Default for MessageBus {
fn default() -> Self {
Self::new(TraderId::from("TRADER-001"), UUID4::new(), None, None)
}
}
impl MessageBus {
#[must_use]
pub fn new(
trader_id: TraderId,
instance_id: UUID4,
name: Option<String>,
_config: Option<HashMap<String, serde_json::Value>>,
) -> Self {
Self {
trader_id,
instance_id,
name: name.unwrap_or(stringify!(MessageBus).to_owned()),
switchboard: MessagingSwitchboard::default(),
subscriptions: AHashSet::new(),
topics: IndexMap::new(),
endpoints: IndexMap::new(),
correlation_index: AHashMap::new(),
has_backing: false,
router_quotes: TopicRouter::new(),
router_trades: TopicRouter::new(),
router_bars: TopicRouter::new(),
router_deltas: TopicRouter::new(),
router_depth10: TopicRouter::new(),
router_book_snapshots: TopicRouter::new(),
router_mark_prices: TopicRouter::new(),
router_index_prices: TopicRouter::new(),
router_funding_rates: TopicRouter::new(),
router_order_events: TopicRouter::new(),
router_position_events: TopicRouter::new(),
router_account_state: TopicRouter::new(),
router_orders: TopicRouter::new(),
router_positions: TopicRouter::new(),
router_greeks: TopicRouter::new(),
router_option_greeks: TopicRouter::new(),
router_option_chain: TopicRouter::new(),
#[cfg(feature = "defi")]
router_defi_blocks: TopicRouter::new(),
#[cfg(feature = "defi")]
router_defi_pools: TopicRouter::new(),
#[cfg(feature = "defi")]
router_defi_swaps: TopicRouter::new(),
#[cfg(feature = "defi")]
router_defi_liquidity: TopicRouter::new(),
#[cfg(feature = "defi")]
router_defi_collects: TopicRouter::new(),
#[cfg(feature = "defi")]
router_defi_flash: TopicRouter::new(),
#[cfg(feature = "defi")]
endpoints_defi_data: IntoEndpointMap::new(),
endpoints_quotes: EndpointMap::new(),
endpoints_trades: EndpointMap::new(),
endpoints_bars: EndpointMap::new(),
endpoints_account_state: EndpointMap::new(),
endpoints_trading_commands: IntoEndpointMap::new(),
endpoints_data_commands: IntoEndpointMap::new(),
endpoints_data_responses: IntoEndpointMap::new(),
endpoints_exec_reports: IntoEndpointMap::new(),
endpoints_order_events: IntoEndpointMap::new(),
endpoints_data: IntoEndpointMap::new(),
routers_typed: AHashMap::new(),
endpoints_typed: AHashMap::new(),
}
}
pub fn register_message_bus(self) -> Rc<RefCell<Self>> {
let msgbus = Rc::new(RefCell::new(self));
set_message_bus(msgbus.clone());
msgbus
}
pub fn router<T: 'static>(&mut self) -> &mut TopicRouter<T> {
self.routers_typed
.entry(TypeId::of::<T>())
.or_insert_with(|| Box::new(TopicRouter::<T>::new()))
.downcast_mut::<TopicRouter<T>>()
.expect("TopicRouter type mismatch - this is a bug")
}
pub fn endpoint_map<T: 'static>(&mut self) -> &mut EndpointMap<T> {
self.endpoints_typed
.entry(TypeId::of::<T>())
.or_insert_with(|| Box::new(EndpointMap::<T>::new()))
.downcast_mut::<EndpointMap<T>>()
.expect("EndpointMap type mismatch - this is a bug")
}
pub fn dispose(&mut self) {
self.subscriptions.clear();
self.topics.clear();
self.endpoints.clear();
self.correlation_index.clear();
self.router_quotes.clear();
self.router_trades.clear();
self.router_bars.clear();
self.router_deltas.clear();
self.router_depth10.clear();
self.router_book_snapshots.clear();
self.router_mark_prices.clear();
self.router_index_prices.clear();
self.router_funding_rates.clear();
self.router_order_events.clear();
self.router_position_events.clear();
self.router_account_state.clear();
self.router_orders.clear();
self.router_positions.clear();
self.router_greeks.clear();
self.router_option_greeks.clear();
self.router_option_chain.clear();
#[cfg(feature = "defi")]
{
self.router_defi_blocks.clear();
self.router_defi_pools.clear();
self.router_defi_swaps.clear();
self.router_defi_liquidity.clear();
self.router_defi_collects.clear();
self.router_defi_flash.clear();
self.endpoints_defi_data.clear();
}
self.endpoints_quotes.clear();
self.endpoints_trades.clear();
self.endpoints_bars.clear();
self.endpoints_account_state.clear();
self.endpoints_trading_commands.clear();
self.endpoints_data_commands.clear();
self.endpoints_data_responses.clear();
self.endpoints_exec_reports.clear();
self.endpoints_order_events.clear();
self.endpoints_data.clear();
self.routers_typed.clear();
self.endpoints_typed.clear();
}
#[must_use]
pub fn mem_address(&self) -> String {
format!("{self:p}")
}
#[must_use]
pub fn switchboard(&self) -> &MessagingSwitchboard {
&self.switchboard
}
#[must_use]
pub fn endpoints(&self) -> Vec<&str> {
self.endpoints.iter().map(|e| e.0.as_str()).collect()
}
#[must_use]
pub fn patterns(&self) -> Vec<&str> {
self.subscriptions
.iter()
.map(|s| s.pattern.as_str())
.collect()
}
pub fn has_subscribers<T: AsRef<str>>(&self, topic: T) -> bool {
self.subscriptions_count(topic) > 0
}
#[must_use]
pub fn subscriptions_count<T: AsRef<str>>(&self, topic: T) -> usize {
let topic = MStr::<Topic>::topic(topic).expect(FAILED);
self.topics
.get(&topic)
.map_or_else(|| self.find_topic_matches(topic).len(), |subs| subs.len())
}
#[must_use]
pub fn subscriptions(&self) -> Vec<&Subscription> {
self.subscriptions.iter().collect()
}
#[must_use]
pub fn subscription_handler_ids(&self) -> Vec<&str> {
self.subscriptions
.iter()
.map(|s| s.handler_id.as_str())
.collect()
}
#[must_use]
pub fn is_registered<T: Into<MStr<Endpoint>>>(&self, endpoint: T) -> bool {
let endpoint: MStr<Endpoint> = endpoint.into();
self.endpoints.contains_key(&endpoint)
}
#[must_use]
pub fn is_subscribed<T: AsRef<str>>(
&self,
pattern: T,
handler: ShareableMessageHandler,
) -> bool {
let pattern = MStr::<Pattern>::pattern(pattern);
let sub = Subscription::new(pattern, handler, None);
self.subscriptions.contains(&sub)
}
pub const fn close(&self) -> anyhow::Result<()> {
Ok(())
}
#[must_use]
pub fn get_endpoint(&self, endpoint: MStr<Endpoint>) -> Option<&ShareableMessageHandler> {
self.endpoints.get(&endpoint)
}
#[must_use]
pub fn get_response_handler(&self, correlation_id: &UUID4) -> Option<&ShareableMessageHandler> {
self.correlation_index.get(correlation_id)
}
pub(crate) fn find_topic_matches(&self, topic: MStr<Topic>) -> Vec<Subscription> {
self.subscriptions
.iter()
.filter_map(|sub| {
if is_matching_backtracking(topic, sub.pattern) {
Some(sub.clone())
} else {
None
}
})
.collect()
}
#[must_use]
pub fn matching_subscriptions<T: Into<MStr<Topic>>>(&mut self, topic: T) -> Vec<Subscription> {
self.inner_matching_subscriptions(topic.into())
}
pub(crate) fn inner_matching_subscriptions(&mut self, topic: MStr<Topic>) -> Vec<Subscription> {
self.topics.get(&topic).cloned().unwrap_or_else(|| {
let mut matches = self.find_topic_matches(topic);
matches.sort();
self.topics.insert(topic, matches.clone());
matches
})
}
pub(crate) fn fill_matching_any_handlers(
&mut self,
topic: MStr<Topic>,
buf: &mut SmallVec<[ShareableMessageHandler; 64]>,
) {
if let Some(subs) = self.topics.get(&topic) {
for sub in subs {
buf.push(sub.handler.clone());
}
} else {
let mut matches = self.find_topic_matches(topic);
matches.sort();
for sub in &matches {
buf.push(sub.handler.clone());
}
self.topics.insert(topic, matches);
}
}
pub fn register_response_handler(
&mut self,
correlation_id: &UUID4,
handler: ShareableMessageHandler,
) -> anyhow::Result<()> {
if self.correlation_index.contains_key(correlation_id) {
anyhow::bail!("Correlation ID <{correlation_id}> already has a registered handler");
}
self.correlation_index.insert(*correlation_id, handler);
Ok(())
}
}
#[cfg(test)]
mod tests {
use rand::{RngExt, SeedableRng, rngs::StdRng};
use rstest::rstest;
use ustr::Ustr;
use super::*;
use crate::msgbus::{
self, ShareableMessageHandler, get_message_bus,
matching::is_matching_backtracking,
stubs::{get_call_check_handler, get_stub_shareable_handler},
subscriptions_count_any,
};
#[rstest]
fn test_new() {
let trader_id = TraderId::default();
let msgbus = MessageBus::new(trader_id, UUID4::new(), None, None);
assert_eq!(msgbus.trader_id, trader_id);
assert_eq!(msgbus.name, stringify!(MessageBus));
}
#[rstest]
fn test_endpoints_when_no_endpoints() {
let msgbus = get_message_bus();
assert!(msgbus.borrow().endpoints().is_empty());
}
#[rstest]
fn test_topics_when_no_subscriptions() {
let msgbus = get_message_bus();
assert!(msgbus.borrow().patterns().is_empty());
assert!(!msgbus.borrow().has_subscribers("my-topic"));
}
#[rstest]
fn test_is_subscribed_when_no_subscriptions() {
let msgbus = get_message_bus();
let handler = get_stub_shareable_handler(None);
assert!(!msgbus.borrow().is_subscribed("my-topic", handler));
}
#[rstest]
fn test_get_response_handler_when_no_handler() {
let msgbus = get_message_bus();
let msgbus_ref = msgbus.borrow();
let handler = msgbus_ref.get_response_handler(&UUID4::new());
assert!(handler.is_none());
}
#[rstest]
fn test_get_response_handler_when_already_registered() {
let msgbus = get_message_bus();
let mut msgbus_ref = msgbus.borrow_mut();
let handler = get_stub_shareable_handler(None);
let request_id = UUID4::new();
msgbus_ref
.register_response_handler(&request_id, handler.clone())
.unwrap();
let result = msgbus_ref.register_response_handler(&request_id, handler);
assert!(result.is_err());
}
#[rstest]
fn test_get_response_handler_when_registered() {
let msgbus = get_message_bus();
let mut msgbus_ref = msgbus.borrow_mut();
let handler = get_stub_shareable_handler(None);
let request_id = UUID4::new();
msgbus_ref
.register_response_handler(&request_id, handler)
.unwrap();
let handler = msgbus_ref.get_response_handler(&request_id).unwrap();
assert_eq!(handler.id(), handler.id());
}
#[rstest]
fn test_is_registered_when_no_registrations() {
let msgbus = get_message_bus();
assert!(!msgbus.borrow().is_registered("MyEndpoint"));
}
#[rstest]
fn test_register_endpoint() {
let msgbus = get_message_bus();
let endpoint = "MyEndpoint".into();
let handler = get_stub_shareable_handler(None);
msgbus::register_any(endpoint, handler);
assert_eq!(msgbus.borrow().endpoints(), vec![endpoint.to_string()]);
assert!(msgbus.borrow().get_endpoint(endpoint).is_some());
}
#[rstest]
fn test_endpoint_send() {
let msgbus = get_message_bus();
let endpoint = "MyEndpoint".into();
let (handler, checker) = get_call_check_handler(None);
msgbus::register_any(endpoint, handler);
assert!(msgbus.borrow().get_endpoint(endpoint).is_some());
assert!(!checker.was_called());
msgbus::send_any(endpoint, &"Test Message");
assert!(checker.was_called());
}
#[rstest]
fn test_deregsiter_endpoint() {
let msgbus = get_message_bus();
let endpoint = "MyEndpoint".into();
let handler = get_stub_shareable_handler(None);
msgbus::register_any(endpoint, handler);
msgbus::deregister_any(endpoint);
assert!(msgbus.borrow().endpoints().is_empty());
}
#[rstest]
fn test_subscribe() {
let msgbus = get_message_bus();
let topic = "my-topic";
let handler = get_stub_shareable_handler(None);
msgbus::subscribe_any(topic.into(), handler, Some(1));
assert!(msgbus.borrow().has_subscribers(topic));
assert_eq!(msgbus.borrow().patterns(), vec![topic]);
}
#[rstest]
fn test_unsubscribe() {
let msgbus = get_message_bus();
let topic = "my-topic";
let handler = get_stub_shareable_handler(None);
msgbus::subscribe_any(topic.into(), handler.clone(), None);
msgbus::unsubscribe_any(topic.into(), &handler);
assert!(!msgbus.borrow().has_subscribers(topic));
assert!(msgbus.borrow().patterns().is_empty());
}
#[rstest]
fn test_matching_subscriptions() {
let msgbus = get_message_bus();
let pattern = "my-pattern";
let handler_id1 = Ustr::from("1");
let handler1 = get_stub_shareable_handler(Some(handler_id1));
let handler_id2 = Ustr::from("2");
let handler2 = get_stub_shareable_handler(Some(handler_id2));
let handler_id3 = Ustr::from("3");
let handler3 = get_stub_shareable_handler(Some(handler_id3));
let handler_id4 = Ustr::from("4");
let handler4 = get_stub_shareable_handler(Some(handler_id4));
msgbus::subscribe_any(pattern.into(), handler1, None);
msgbus::subscribe_any(pattern.into(), handler2, None);
msgbus::subscribe_any(pattern.into(), handler3, Some(1));
msgbus::subscribe_any(pattern.into(), handler4, Some(2));
assert_eq!(
msgbus.borrow().patterns(),
vec![pattern, pattern, pattern, pattern]
);
assert_eq!(subscriptions_count_any(pattern), 4);
let topic = pattern;
let subs = msgbus.borrow_mut().matching_subscriptions(topic);
assert_eq!(subs.len(), 4);
assert_eq!(subs[0].handler_id, handler_id4);
assert_eq!(subs[1].handler_id, handler_id3);
assert_eq!(subs[2].handler_id, handler_id1);
assert_eq!(subs[3].handler_id, handler_id2);
}
#[rstest]
fn test_subscription_pattern_matching() {
let msgbus = get_message_bus();
let handler1 = get_stub_shareable_handler(Some(Ustr::from("1")));
let handler2 = get_stub_shareable_handler(Some(Ustr::from("2")));
let handler3 = get_stub_shareable_handler(Some(Ustr::from("3")));
msgbus::subscribe_any("data.quotes.*".into(), handler1, None);
msgbus::subscribe_any("data.trades.*".into(), handler2, None);
msgbus::subscribe_any("data.*.BINANCE.*".into(), handler3, None);
assert_eq!(msgbus.borrow().subscriptions().len(), 3);
let topic = "data.quotes.BINANCE.ETHUSDT";
assert_eq!(msgbus.borrow().find_topic_matches(topic.into()).len(), 2);
let matches = msgbus.borrow_mut().matching_subscriptions(topic);
assert_eq!(matches.len(), 2);
assert_eq!(matches[0].handler_id, Ustr::from("3"));
assert_eq!(matches[1].handler_id, Ustr::from("1"));
}
struct SimpleSubscriptionModel {
subscriptions: Vec<(String, String)>,
}
impl SimpleSubscriptionModel {
fn new() -> Self {
Self {
subscriptions: Vec::new(),
}
}
fn subscribe(&mut self, pattern: &str, handler_id: &str) {
let subscription = (pattern.to_string(), handler_id.to_string());
if !self.subscriptions.contains(&subscription) {
self.subscriptions.push(subscription);
}
}
fn unsubscribe(&mut self, pattern: &str, handler_id: &str) -> bool {
let subscription = (pattern.to_string(), handler_id.to_string());
if let Some(idx) = self.subscriptions.iter().position(|s| s == &subscription) {
self.subscriptions.remove(idx);
true
} else {
false
}
}
fn is_subscribed(&self, pattern: &str, handler_id: &str) -> bool {
self.subscriptions
.contains(&(pattern.to_string(), handler_id.to_string()))
}
fn matching_subscriptions(&self, topic: &str) -> Vec<(String, String)> {
let topic = topic.into();
self.subscriptions
.iter()
.filter(|(pat, _)| is_matching_backtracking(topic, pat.into()))
.map(|(pat, id)| (pat.clone(), id.clone()))
.collect()
}
fn subscription_count(&self) -> usize {
self.subscriptions.len()
}
}
#[rstest]
fn subscription_model_fuzz_testing() {
let mut rng = StdRng::seed_from_u64(42);
let msgbus = get_message_bus();
let mut model = SimpleSubscriptionModel::new();
let mut handlers: Vec<(String, ShareableMessageHandler)> = Vec::new();
let patterns = generate_test_patterns(&mut rng);
let handler_ids: Vec<String> = (0..50).map(|i| format!("handler_{i}")).collect();
for id in &handler_ids {
let handler = get_stub_shareable_handler(Some(Ustr::from(id)));
handlers.push((id.clone(), handler));
}
let num_operations = 50_000;
for op_num in 0..num_operations {
let operation = rng.random_range(0..4);
match operation {
0 => {
let pattern_idx = rng.random_range(0..patterns.len());
let handler_idx = rng.random_range(0..handlers.len());
let pattern = &patterns[pattern_idx];
let (handler_id, handler) = &handlers[handler_idx];
model.subscribe(pattern, handler_id);
msgbus::subscribe_any(pattern.as_str().into(), handler.clone(), None);
assert_eq!(
model.subscription_count(),
msgbus.borrow().subscriptions().len()
);
assert!(
msgbus.borrow().is_subscribed(pattern, handler.clone()),
"Op {op_num}: is_subscribed should return true after subscribe"
);
}
1 => {
if model.subscription_count() > 0 {
let sub_idx = rng.random_range(0..model.subscription_count());
let (pattern, handler_id) = model.subscriptions[sub_idx].clone();
model.unsubscribe(&pattern, &handler_id);
let handler = handlers
.iter()
.find(|(id, _)| id == &handler_id)
.map(|(_, h)| h.clone())
.unwrap();
msgbus::unsubscribe_any(pattern.as_str().into(), &handler);
assert_eq!(
model.subscription_count(),
msgbus.borrow().subscriptions().len()
);
assert!(
!msgbus.borrow().is_subscribed(pattern, handler.clone()),
"Op {op_num}: is_subscribed should return false after unsubscribe"
);
}
}
2 => {
let pattern_idx = rng.random_range(0..patterns.len());
let handler_idx = rng.random_range(0..handlers.len());
let pattern = &patterns[pattern_idx];
let (handler_id, handler) = &handlers[handler_idx];
let expected = model.is_subscribed(pattern, handler_id);
let actual = msgbus.borrow().is_subscribed(pattern, handler.clone());
assert_eq!(
expected, actual,
"Op {op_num}: Subscription state mismatch for pattern '{pattern}', handler '{handler_id}': expected={expected}, actual={actual}"
);
}
3 => {
let topic = create_topic(&mut rng);
let actual_matches = msgbus.borrow_mut().matching_subscriptions(topic);
let expected_matches = model.matching_subscriptions(&topic);
assert_eq!(
expected_matches.len(),
actual_matches.len(),
"Op {}: Match count mismatch for topic '{}': expected={}, actual={}",
op_num,
topic,
expected_matches.len(),
actual_matches.len()
);
for sub in &actual_matches {
assert!(
expected_matches
.contains(&(sub.pattern.to_string(), sub.handler_id.to_string())),
"Op {}: Expected match not found: pattern='{}', handler_id='{}'",
op_num,
sub.pattern,
sub.handler_id
);
}
}
_ => unreachable!(),
}
}
}
fn generate_pattern_from_topic(topic: &str, rng: &mut StdRng) -> String {
let mut pattern = String::new();
for c in topic.chars() {
let val: f64 = rng.random();
if val < 0.1 {
pattern.push('*');
} else if val < 0.3 {
pattern.push('?');
} else if val >= 0.5 {
pattern.push(c);
}
}
pattern
}
fn generate_test_patterns(rng: &mut StdRng) -> Vec<String> {
let mut patterns = vec![
"data.*.*.*".to_string(),
"*.*.BINANCE.*".to_string(),
"events.order.*".to_string(),
"data.*.*.?USDT".to_string(),
"*.trades.*.BTC*".to_string(),
"*.*.*.*".to_string(),
];
for _ in 0..50 {
match rng.random_range(0..10) {
0..=1 => {
let idx = rng.random_range(0..patterns.len());
patterns.push(patterns[idx].clone());
}
_ => {
let topic = create_topic(rng);
let pattern = generate_pattern_from_topic(&topic, rng);
patterns.push(pattern);
}
}
}
patterns
}
fn create_topic(rng: &mut StdRng) -> Ustr {
let cat = ["data", "info", "order"];
let model = ["quotes", "trades", "orderbooks", "depths"];
let venue = ["BINANCE", "BYBIT", "OKX", "FTX", "KRAKEN"];
let instrument = ["BTCUSDT", "ETHUSDT", "SOLUSDT", "XRPUSDT", "DOGEUSDT"];
let cat = cat[rng.random_range(0..cat.len())];
let model = model[rng.random_range(0..model.len())];
let venue = venue[rng.random_range(0..venue.len())];
let instrument = instrument[rng.random_range(0..instrument.len())];
Ustr::from(&format!("{cat}.{model}.{venue}.{instrument}"))
}
}