use crate::{config::markets::market_config::DataTypesSection, sources::ExchangeEvent};
use std::collections::HashMap;
use tokio::sync::broadcast;
/// Default channel capacity: 8192.
///
/// NOTE(review): the constant is not referenced in the code visible here; it
/// is presumably what callers pass as the `capacity` argument of
/// `TopicRegistry::from_config` / `TopicPublisher::new` — confirm at the call
/// sites.
pub const DEFAULT_CHANNEL_CAPACITY: usize = 8 * 1024;
/// Envelope sent through a topic's broadcast channel.
///
/// The type is `Clone` because it is the item type of a
/// `tokio::sync::broadcast` channel, which hands every receiver its own copy.
#[derive(Debug, Clone)]
pub struct TopicMessage {
    /// Topic name the message is tagged with.
    pub topic: String,
    /// NOTE(review): presumably the receive timestamp in nanoseconds; the
    /// clock and epoch are not visible in this file — confirm with the producer.
    pub received_at_ns: u64,
    /// Name of the exchange the event came from (the tests use `"bybit"`).
    pub exchange: String,
    /// The exchange event being delivered.
    pub payload: ExchangeEvent,
}
/// Sending side of a single named topic.
///
/// Pairs the topic name with the `broadcast::Sender` used both to publish
/// messages and to mint new receivers.
pub struct TopicPublisher {
    /// Name of the topic, as given at construction time.
    topic: String,
    /// Broadcast sender that fans messages out to every subscribed receiver.
    tx: broadcast::Sender<TopicMessage>,
}
impl TopicPublisher {
    /// Creates a publisher for `topic` backed by a fresh broadcast channel.
    ///
    /// `capacity` is forwarded to `tokio::sync::broadcast::channel`. That
    /// function panics when given `0`, so a zero capacity (e.g. from a
    /// misconfigured deployment) is raised to `1` instead of aborting.
    ///
    /// The receiver returned by `broadcast::channel` is discarded, so a new
    /// publisher starts out with no subscribers.
    pub fn new(topic: impl Into<String>, capacity: usize) -> Self {
        // tokio asserts `capacity > 0`; clamp rather than panic.
        let (tx, _) = broadcast::channel(capacity.max(1));
        Self {
            topic: topic.into(),
            tx,
        }
    }

    /// Returns the topic name this publisher was created with.
    pub fn topic(&self) -> &str {
        self.topic.as_str()
    }

    /// Sends `msg` to every current subscriber.
    ///
    /// Returns the value reported by `broadcast::Sender::send` on success
    /// (the number of receivers the message was handed to).
    ///
    /// # Errors
    ///
    /// Returns the boxed `SendError` from tokio when the send fails, which
    /// tokio reports when there are no active receivers. The error carries the
    /// unsent message.
    pub fn send(
        &self,
        msg: TopicMessage,
    ) -> Result<usize, Box<tokio::sync::broadcast::error::SendError<TopicMessage>>> {
        self.tx.send(msg).map_err(Box::new)
    }

    /// Creates a new receiver attached to this topic.
    pub fn subscribe(&self) -> broadcast::Receiver<TopicMessage> {
        self.tx.subscribe()
    }

    /// Returns the number of receivers currently attached to this topic.
    pub fn receiver_count(&self) -> usize {
        self.tx.receiver_count()
    }
}
/// Collection of [`TopicPublisher`]s keyed by topic name.
pub struct TopicRegistry {
    /// Maps each topic name to the publisher that owns its channel.
    publishers: HashMap<String, TopicPublisher>,
}
impl TopicRegistry {
    /// Builds a registry containing one publisher per enabled data type.
    ///
    /// Topic names are derived from `symbol` as follows:
    ///
    /// | data type       | topic                          |
    /// |-----------------|--------------------------------|
    /// | `orderbook`     | `orderbook.<depth>.<symbol>`   |
    /// | `trades`        | `trade.all.<symbol>`           |
    /// | `liquidations`  | `liquidation.all.<symbol>`     |
    /// | `funding_rates` | `funding.all.<symbol>`         |
    /// | `open_interest` | `open_interest.all.<symbol>`   |
    ///
    /// `exchange` only appears as a field on the `topic_registry.created` log
    /// event; it is not part of any topic name. `capacity` is passed unchanged
    /// to every [`TopicPublisher::new`] call.
    pub fn from_config(
        exchange: &str,
        symbol: &str,
        datatypes: &DataTypesSection,
        capacity: usize,
    ) -> Self {
        // Only the orderbook topic embeds a configured value (its depth).
        let orderbook_prefix = format!("orderbook.{}", datatypes.orderbook.depth);

        // (enabled flag, topic prefix) for every supported data type.
        let feeds = [
            (datatypes.orderbook.enabled, orderbook_prefix.as_str()),
            (datatypes.trades.enabled, "trade.all"),
            (datatypes.liquidations.enabled, "liquidation.all"),
            (datatypes.funding_rates.enabled, "funding.all"),
            (datatypes.open_interest.enabled, "open_interest.all"),
        ];

        let mut publishers = HashMap::new();
        for &(enabled, prefix) in feeds.iter() {
            if !enabled {
                continue;
            }
            let name = format!("{}.{}", prefix, symbol);
            publishers.insert(name.clone(), TopicPublisher::new(name, capacity));
        }

        tracing::info!(
            exchange = exchange,
            symbol = symbol,
            topics = ?publishers.keys().collect::<Vec<_>>(),
            "topic_registry.created"
        );

        Self { publishers }
    }

    /// Looks up the publisher registered under `topic`, if any.
    pub fn get(&self, topic: &str) -> Option<&TopicPublisher> {
        self.publishers.get(topic)
    }

    /// Returns the names of all registered topics (in the map's iteration
    /// order, which is unspecified).
    pub fn topics(&self) -> Vec<&str> {
        self.publishers.keys().map(String::as_str).collect()
    }

    /// Number of registered topics.
    pub fn len(&self) -> usize {
        self.publishers.len()
    }

    /// Returns `true` when no topic is registered.
    pub fn is_empty(&self) -> bool {
        self.publishers.is_empty()
    }

    /// Creates a receiver for `topic`, or returns `None` if the topic is not
    /// registered.
    pub fn subscribe(&self, topic: &str) -> Option<broadcast::Receiver<TopicMessage>> {
        self.get(topic).map(TopicPublisher::subscribe)
    }

    /// Creates one receiver per registered topic, each paired with an owned
    /// copy of its topic name.
    pub fn subscribe_all(&self) -> Vec<(String, broadcast::Receiver<TopicMessage>)> {
        let mut receivers = Vec::with_capacity(self.publishers.len());
        for (name, publisher) in &self.publishers {
            receivers.push((name.clone(), publisher.subscribe()));
        }
        receivers
    }

    /// Publishes `msg` on `topic`.
    ///
    /// Returns the count reported by the publisher's `send`. A failed send
    /// (tokio reports this when the topic has no active receivers) is treated
    /// as best-effort: it is logged at trace level and reported as `Ok(0)`.
    ///
    /// Routing uses the `topic` argument only; `msg.topic` is not compared
    /// against it.
    ///
    /// # Errors
    ///
    /// Returns [`PublishError::UnknownTopic`] when `topic` is not registered.
    pub fn publish(&self, topic: &str, msg: TopicMessage) -> Result<usize, PublishError> {
        let publisher = match self.publishers.get(topic) {
            Some(publisher) => publisher,
            None => return Err(PublishError::UnknownTopic(topic.to_string())),
        };

        let delivered = publisher.send(msg).unwrap_or_else(|_| {
            tracing::trace!(topic = topic, "topic_registry.no_receivers");
            0
        });
        Ok(delivered)
    }
}
/// Errors returned when publishing to a topic.
///
/// Derives `PartialEq`/`Eq` so callers and tests can compare errors directly
/// instead of only checking `is_err()`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PublishError {
    /// The requested topic is not registered; carries the requested name.
    UnknownTopic(String),
}
/// Renders the error as a short human-readable message.
impl std::fmt::Display for PublishError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            // Produces exactly `unknown topic: <name>`.
            Self::UnknownTopic(name) => {
                f.write_str("unknown topic: ")?;
                f.write_str(name)
            }
        }
    }
}
// Marker impl so `PublishError` can be used wherever a `std::error::Error` is
// expected. The body is empty, so every trait method keeps its default.
impl std::error::Error for PublishError {}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::markets::market_config::{
        DataTypesSection, FeedToggle, OrderbookConfig,
    };
    use crate::sources::bybit::events::BybitWssEvent;
    use crate::sources::bybit::responses::BybitTradeData;

    /// Topic used by the publish/subscribe tests.
    const TRADE_TOPIC: &str = "trade.all.BTCUSDT";

    /// Config with orderbook (depth 50), trades and liquidations enabled;
    /// funding rates and open interest disabled.
    fn sample_datatypes() -> DataTypesSection {
        DataTypesSection {
            orderbook: OrderbookConfig {
                enabled: true,
                depth: 50,
            },
            trades: FeedToggle { enabled: true },
            liquidations: FeedToggle { enabled: true },
            funding_rates: FeedToggle { enabled: false },
            open_interest: FeedToggle { enabled: false },
        }
    }

    /// Arbitrary trade event used as a message payload.
    fn dummy_event() -> ExchangeEvent {
        ExchangeEvent::Bybit(BybitWssEvent::TradeData(BybitTradeData {
            trade_ts: 1_700_000_000_000,
            symbol: "BTCUSDT".into(),
            side: "Buy".into(),
            amount: "0.001".into(),
            price: "42000.0".into(),
            direction: "PlusTick".into(),
            trade_id: "test-001".into(),
            block_trade: false,
            rpi_trade: false,
            sequence: 1,
        }))
    }

    /// Registry built from `sample_datatypes()` for `BTCUSDT` with a small
    /// channel capacity.
    fn sample_registry() -> TopicRegistry {
        TopicRegistry::from_config("bybit", "BTCUSDT", &sample_datatypes(), 64)
    }

    /// Builds a message tagged with `topic` and the given timestamp.
    fn sample_message(topic: &str, received_at_ns: u64) -> TopicMessage {
        TopicMessage {
            topic: topic.into(),
            received_at_ns,
            exchange: "bybit".into(),
            payload: dummy_event(),
        }
    }

    #[test]
    fn registry_creates_topics_for_enabled_datatypes() {
        let registry = sample_registry();
        assert_eq!(registry.len(), 3);
        assert!(!registry.is_empty());
        assert!(registry.get("orderbook.50.BTCUSDT").is_some());
        assert!(registry.get("trade.all.BTCUSDT").is_some());
        assert!(registry.get("liquidation.all.BTCUSDT").is_some());
        assert!(registry.get("funding.all.BTCUSDT").is_none());
        assert!(registry.get("open_interest.all.BTCUSDT").is_none());
    }

    #[test]
    fn subscribe_and_publish() {
        let registry = sample_registry();
        let mut rx = registry.subscribe(TRADE_TOPIC).unwrap();

        let n = registry
            .publish(TRADE_TOPIC, sample_message(TRADE_TOPIC, 123_456_789))
            .unwrap();
        assert_eq!(n, 1);

        let received = rx.try_recv().unwrap();
        assert_eq!(received.topic, "trade.all.BTCUSDT");
        assert_eq!(received.received_at_ns, 123_456_789);
        assert_eq!(received.exchange, "bybit");
    }

    #[test]
    fn publish_to_unknown_topic_returns_error() {
        let registry = sample_registry();
        // Check the variant, its payload and its rendering, not just `is_err()`.
        match registry.publish("nonexistent", sample_message("nonexistent", 0)) {
            Err(PublishError::UnknownTopic(name)) => {
                assert_eq!(name, "nonexistent");
                assert_eq!(
                    PublishError::UnknownTopic(name).to_string(),
                    "unknown topic: nonexistent"
                );
            }
            other => panic!("expected UnknownTopic error, got {:?}", other),
        }
    }

    #[test]
    fn publish_without_receivers_returns_zero() {
        let registry = sample_registry();
        // No receiver was ever subscribed, so the send fails internally and
        // `publish` reports it as zero deliveries rather than an error.
        let n = registry
            .publish(TRADE_TOPIC, sample_message(TRADE_TOPIC, 1))
            .unwrap();
        assert_eq!(n, 0);
    }

    #[test]
    fn subscribe_all_yields_one_receiver_per_topic() {
        let registry = sample_registry();
        let receivers = registry.subscribe_all();
        assert_eq!(receivers.len(), registry.len());

        for (name, _rx) in &receivers {
            let publisher = registry.get(name).expect("subscribed topic must be registered");
            assert_eq!(publisher.topic(), name.as_str());
            assert_eq!(publisher.receiver_count(), 1);
        }
    }
}