atelier_data 0.0.15

Data Artifacts and I/O for the atelier-rs engine
//! Topic-keyed broadcast channels for raw event publishing.
//!
//! The `TopicPublisher` is the output interface of the
//! `DataWorker` (see `super::data_worker::DataWorker`).  Each exchange
//! subscription maps to exactly one topic, and the `DataWorker`
//! publishes raw (un-normalised) events through the matching publisher.
//!
//! # Topic naming convention
//!
//! ```text
//! {datatype}.{qualifier}.{symbol}
//! ```
//!
//! Examples:
//! - `orderbook.50.BTCUSDT`
//! - `trade.all.BTCUSDT`
//! - `liquidation.all.BTCUSDT`
//! - `funding.all.BTCUSDT`
//! - `open_interest.all.BTCUSDT`
//!
//! # Back-pressure
//!
//! The underlying `tokio::sync::broadcast` channel has a bounded
//! capacity and sending never blocks.  If **any** receiver falls further
//! behind than the channel can retain, the oldest messages it has not
//! yet read are dropped and its next receive reports
//! `RecvError::Lagged` — this is acceptable because the `data_worker`'s
//! contract is *best-effort ingestion*, not guaranteed delivery.
//! Guaranteed delivery is the downstream consumer's responsibility.

use crate::{config::markets::market_config::DataTypesSection, sources::ExchangeEvent};
use std::collections::HashMap;
use tokio::sync::broadcast;

/// Default broadcast channel capacity per topic.
///
/// Counted in messages, not bytes: a capacity value is what
/// [`TopicPublisher::new`] forwards to `broadcast::channel`.
///
/// NOTE(review): nothing in this module reads the constant directly;
/// presumably callers pass it as the `capacity` argument of
/// [`TopicRegistry::from_config`] — confirm against the `DataWorker`.
pub const DEFAULT_CHANNEL_CAPACITY: usize = 8192;

// ─────────────────────────────────────────────────────────────────────────────
// TopicMessage
// ─────────────────────────────────────────────────────────────────────────────

/// Envelope wrapping a raw exchange event with ingestion metadata.
///
/// Published into broadcast channels by the `DataWorker`.  Downstream
/// consumers receive clones of this struct, which is why the type
/// derives `Clone`: a `broadcast::Receiver` hands every subscriber its
/// own copy of each message.
#[derive(Debug, Clone)]
pub struct TopicMessage {
    /// Canonical topic name (e.g. `"orderbook.50.BTCUSDT"`).
    ///
    /// NOTE(review): [`TopicRegistry::publish`] routes on its own `topic`
    /// argument and never compares it with this field, so keeping the
    /// two in sync is the caller's responsibility.
    pub topic: String,
    /// Wall-clock nanosecond timestamp when the frame was received
    /// by the `DataWorker` (before any processing).
    ///
    /// NOTE(review): the epoch is not stated in this module — presumably
    /// the Unix epoch; confirm against the `DataWorker`.
    pub received_at_ns: u64,
    /// Exchange that produced this event.
    pub exchange: String,
    /// The raw decoded event — no normalisation, no delta application.
    pub payload: ExchangeEvent,
}

// ─────────────────────────────────────────────────────────────────────────────
// TopicPublisher
// ─────────────────────────────────────────────────────────────────────────────

/// A single named broadcast channel.
///
/// Holds the `broadcast::Sender` for one topic.  Subscribers obtain a
/// `broadcast::Receiver` by calling [`subscribe()`](Self::subscribe).
pub struct TopicPublisher {
    /// Topic name supplied at construction; exposed read-only through
    /// `topic()`.
    topic: String,
    /// Sending half of the topic's broadcast channel.  Only the sender is
    /// stored: receivers are created on demand via `subscribe()`.
    tx: broadcast::Sender<TopicMessage>,
}

impl TopicPublisher {
    /// Build a publisher bound to `topic`, backed by a fresh broadcast
    /// channel created with the given `capacity`.
    ///
    /// The receiver half returned by `broadcast::channel` is discarded
    /// immediately, so a new publisher starts with no subscribers.
    ///
    /// # Panics
    ///
    /// Propagates the panic raised by `tokio::sync::broadcast::channel`
    /// when `capacity` is `0` or exceeds the maximum tokio supports.
    pub fn new(topic: impl Into<String>, capacity: usize) -> Self {
        let name: String = topic.into();
        let (sender, _) = broadcast::channel(capacity);
        Self {
            topic: name,
            tx: sender,
        }
    }

    /// Name of the topic this publisher writes to.
    pub fn topic(&self) -> &str {
        self.topic.as_str()
    }

    /// Broadcast `msg` to every current subscriber.
    ///
    /// # Errors
    ///
    /// `Ok(n)` carries the number of receivers subscribed at send time.
    /// `Err` means there were no receivers at all; the boxed
    /// `SendError` hands the undelivered message back to the caller.
    /// For best-effort ingestion that outcome is expected and harmless.
    pub fn send(
        &self,
        msg: TopicMessage,
    ) -> Result<usize, Box<tokio::sync::broadcast::error::SendError<TopicMessage>>> {
        // Box the error so the `Result` stays small even though the
        // error owns the whole message.
        self.tx.send(msg).map_err(Box::new)
    }

    /// Attach a new receiver to this topic.
    ///
    /// Only messages published **after** this call are visible to the
    /// returned receiver.
    pub fn subscribe(&self) -> broadcast::Receiver<TopicMessage> {
        let sender = &self.tx;
        sender.subscribe()
    }

    /// How many receivers are currently attached to this topic.
    pub fn receiver_count(&self) -> usize {
        let sender = &self.tx;
        sender.receiver_count()
    }
}

// ─────────────────────────────────────────────────────────────────────────────
// TopicRegistry
// ─────────────────────────────────────────────────────────────────────────────

/// Collection of [`TopicPublisher`]s keyed by topic name.
///
/// Built at `DataWorker` startup from the configured exchange, symbol,
/// and enabled data types.
pub struct TopicRegistry {
    /// Publishers indexed by topic name.  `from_config` inserts each
    /// publisher under the same string it was constructed with, and no
    /// method in this module adds or removes entries afterwards.
    publishers: HashMap<String, TopicPublisher>,
}

impl TopicRegistry {
    /// Assemble the registry for one exchange/symbol pair.
    ///
    /// Every data type whose `enabled` flag is set in `datatypes` gets
    /// exactly one [`TopicPublisher`], named
    /// `{datatype}.{qualifier}.{symbol}` and created with `capacity`.
    /// `exchange` is used only for the creation log line.
    ///
    /// # Panics
    ///
    /// Panics if at least one data type is enabled and `capacity` is
    /// rejected by [`TopicPublisher::new`] (e.g. `0`).
    pub fn from_config(
        exchange: &str,
        symbol: &str,
        datatypes: &DataTypesSection,
        capacity: usize,
    ) -> Self {
        // First work out which topic names are wanted…
        let mut topic_names: Vec<String> = Vec::new();

        if datatypes.orderbook.enabled {
            topic_names.push(format!(
                "orderbook.{}.{}",
                datatypes.orderbook.depth, symbol
            ));
        }
        if datatypes.trades.enabled {
            topic_names.push(format!("trade.all.{}", symbol));
        }
        if datatypes.liquidations.enabled {
            topic_names.push(format!("liquidation.all.{}", symbol));
        }
        if datatypes.funding_rates.enabled {
            topic_names.push(format!("funding.all.{}", symbol));
        }
        if datatypes.open_interest.enabled {
            topic_names.push(format!("open_interest.all.{}", symbol));
        }

        // …then create one publisher per name, keyed by that same name.
        let publishers: HashMap<String, TopicPublisher> = topic_names
            .into_iter()
            .map(|name| (name.clone(), TopicPublisher::new(name, capacity)))
            .collect();

        tracing::info!(
            exchange = exchange,
            symbol = symbol,
            topics = ?publishers.keys().collect::<Vec<_>>(),
            "topic_registry.created"
        );

        Self { publishers }
    }

    /// Fetch the publisher registered under `topic`, if any.
    pub fn get(&self, topic: &str) -> Option<&TopicPublisher> {
        self.publishers.get(topic)
    }

    /// Names of all registered topics, in the map's (arbitrary) order.
    pub fn topics(&self) -> Vec<&str> {
        self.publishers.keys().map(String::as_str).collect()
    }

    /// Count of registered topics.
    pub fn len(&self) -> usize {
        self.publishers.len()
    }

    /// `true` when no topic is registered.
    pub fn is_empty(&self) -> bool {
        self.publishers.is_empty()
    }

    /// Attach a receiver to one topic.
    ///
    /// Yields `None` when `topic` was never registered.
    pub fn subscribe(&self, topic: &str) -> Option<broadcast::Receiver<TopicMessage>> {
        self.get(topic).map(TopicPublisher::subscribe)
    }

    /// Attach a receiver to every registered topic.
    ///
    /// The result pairs each topic name with its new receiver.
    pub fn subscribe_all(&self) -> Vec<(String, broadcast::Receiver<TopicMessage>)> {
        let mut receivers = Vec::with_capacity(self.publishers.len());
        for (name, publisher) in &self.publishers {
            receivers.push((name.clone(), publisher.subscribe()));
        }
        receivers
    }

    /// Send `msg` through the publisher registered under `topic`.
    ///
    /// Routing uses the `topic` argument only; `msg.topic` is not
    /// inspected.
    ///
    /// # Errors
    ///
    /// Returns [`PublishError::UnknownTopic`] when `topic` is not
    /// registered — a programming error, since callers should publish
    /// only to topics created at startup.  A topic with no subscribers
    /// is *not* an error: it is traced and reported as `Ok(0)`.
    pub fn publish(&self, topic: &str, msg: TopicMessage) -> Result<usize, PublishError> {
        let publisher = match self.publishers.get(topic) {
            Some(found) => found,
            None => return Err(PublishError::UnknownTopic(topic.to_string())),
        };

        // The only failure mode of a broadcast send is "nobody is
        // listening", which best-effort ingestion tolerates.
        let delivered = publisher.send(msg).unwrap_or_else(|_| {
            tracing::trace!(topic = topic, "topic_registry.no_receivers");
            0
        });

        Ok(delivered)
    }
}

/// Error from [`TopicRegistry::publish()`].
///
/// Derives `PartialEq`/`Eq` so callers and tests can compare a result
/// against a specific variant instead of only checking `is_err()`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PublishError {
    /// Attempted to publish to a topic that was never registered.
    /// Carries the offending topic name.
    UnknownTopic(String),
}

impl std::fmt::Display for PublishError {
    /// Renders a short human-readable message, e.g.
    /// `unknown topic: trade.all.BTCUSDT`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::UnknownTopic(t) => write!(f, "unknown topic: {}", t),
        }
    }
}

// No underlying cause to expose, so the default `Error` methods suffice.
impl std::error::Error for PublishError {}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::markets::market_config::{
        DataTypesSection, FeedToggle, OrderbookConfig,
    };
    use crate::sources::bybit::events::BybitWssEvent;
    use crate::sources::bybit::responses::BybitTradeData;

    /// Config with orderbook (depth 50), trades and liquidations on;
    /// funding rates and open interest off.
    fn sample_datatypes() -> DataTypesSection {
        let on = || FeedToggle { enabled: true };
        let off = || FeedToggle { enabled: false };

        DataTypesSection {
            orderbook: OrderbookConfig {
                enabled: true,
                depth: 50,
            },
            trades: on(),
            liquidations: on(),
            funding_rates: off(),
            open_interest: off(),
        }
    }

    /// A minimal Bybit trade wrapped as an `ExchangeEvent`.
    fn dummy_event() -> ExchangeEvent {
        let trade = BybitTradeData {
            trade_ts: 1_700_000_000_000,
            symbol: "BTCUSDT".into(),
            side: "Buy".into(),
            amount: "0.001".into(),
            price: "42000.0".into(),
            direction: "PlusTick".into(),
            trade_id: "test-001".into(),
            block_trade: false,
            rpi_trade: false,
            sequence: 1,
        };
        ExchangeEvent::Bybit(BybitWssEvent::TradeData(trade))
    }

    /// Registry shared by every test: bybit / BTCUSDT, capacity 64.
    fn sample_registry() -> TopicRegistry {
        TopicRegistry::from_config("bybit", "BTCUSDT", &sample_datatypes(), 64)
    }

    /// Envelope for `topic` carrying the dummy trade event.
    fn message_for(topic: &str, received_at_ns: u64) -> TopicMessage {
        TopicMessage {
            topic: topic.into(),
            received_at_ns,
            exchange: "bybit".into(),
            payload: dummy_event(),
        }
    }

    #[test]
    fn registry_creates_topics_for_enabled_datatypes() {
        let registry = sample_registry();

        assert_eq!(registry.len(), 3);

        // Enabled feeds are present…
        assert!(registry.get("orderbook.50.BTCUSDT").is_some());
        assert!(registry.get("trade.all.BTCUSDT").is_some());
        assert!(registry.get("liquidation.all.BTCUSDT").is_some());

        // …disabled feeds are not.
        assert!(registry.get("funding.all.BTCUSDT").is_none());
        assert!(registry.get("open_interest.all.BTCUSDT").is_none());
    }

    #[test]
    fn subscribe_and_publish() {
        let registry = sample_registry();
        let mut rx = registry.subscribe("trade.all.BTCUSDT").unwrap();

        let delivered = registry
            .publish(
                "trade.all.BTCUSDT",
                message_for("trade.all.BTCUSDT", 123_456_789),
            )
            .unwrap();
        assert_eq!(delivered, 1);

        let received = rx.try_recv().unwrap();
        assert_eq!(received.topic, "trade.all.BTCUSDT");
        assert_eq!(received.received_at_ns, 123_456_789);
    }

    #[test]
    fn publish_to_unknown_topic_returns_error() {
        let registry = sample_registry();

        let outcome = registry.publish("nonexistent", message_for("nonexistent", 0));
        assert!(outcome.is_err());
    }
}