digdigdig3 0.2.2

Unified async Rust API for 44 exchange connectors — crypto, stocks, forex. REST + WebSocket.
Documentation
//! TopicRegistry — maps (StreamKind, AccountType) → (TopicPattern, ParserFn).
//!
//! ## Matching strategy
//!
//! A single `*` wildcard that covers zero or more contiguous characters
//! (the wildcard may match the empty string).
//! Examples:
//!   - `*@trade`         — Binance trades
//!   - `publicTrade.*`   — Bybit trades
//!   - `market.*.depth`  — HTX depth topics

use std::collections::HashMap;

use serde_json::Value;

use crate::core::types::{AccountType, StreamEvent, WebSocketError, WebSocketResult};

use super::stream_kind::StreamKind;

// ─────────────────────────────────────────────────────────────────────────────
// TopicKey
// ─────────────────────────────────────────────────────────────────────────────

/// The raw topic string extracted from a frame, stored verbatim
/// (e.g. "btcusdt@trade", "trades.BTC-USDT-SWAP").
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct TopicKey(pub String);

impl TopicKey {
    /// Wraps anything convertible into a `String` as a topic key.
    pub fn new(s: impl Into<String>) -> Self {
        let raw: String = s.into();
        TopicKey(raw)
    }

    /// Borrows the underlying topic string.
    pub fn as_str(&self) -> &str {
        self.0.as_str()
    }
}

impl std::fmt::Display for TopicKey {
    /// Writes the topic string unchanged; formatter width/padding flags are not applied.
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        formatter.write_str(self.as_str())
    }
}

// ─────────────────────────────────────────────────────────────────────────────
// TopicPattern
// ─────────────────────────────────────────────────────────────────────────────

/// A pattern used to register parsers.  Supports a single `*` wildcard.
///
/// Examples:
///   `"*@trade"`          — matches any Binance trade topic
///   `"publicTrade.*"`    — matches any Bybit trade topic
///   `"market.*.depth"`   — matches any HTX depth topic with symbol in segment 2
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct TopicPattern(pub String);

impl TopicPattern {
    pub fn new(s: impl Into<String>) -> Self {
        Self(s.into())
    }

    /// Returns true if this pattern matches the given key.
    /// At most one `*` wildcard is supported.
    pub fn matches(&self, key: &TopicKey) -> bool {
        topic_pattern_matches(&self.0, &key.0)
    }
}

impl std::fmt::Display for TopicPattern {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(&self.0)
    }
}

/// Tests `key` against `pattern`, where `pattern` may contain one `*` wildcard.
///
/// * Without a `*`, the pattern matches only a key that is exactly equal to it.
/// * With a `*`, the text before the star must be a prefix of `key`, and the
///   text after the star must be a suffix of whatever remains once that prefix
///   has been removed. The wildcard may therefore match the empty string, and
///   the prefix and suffix can never overlap inside `key`.
///
/// Only the first `*` acts as a wildcard; any further `*` is compared literally
/// as part of the suffix.
pub fn topic_pattern_matches(pattern: &str, key: &str) -> bool {
    match pattern.split_once('*') {
        None => pattern == key,
        Some((prefix, suffix)) => match key.strip_prefix(prefix) {
            // `ends_with` already implies `rest.len() >= suffix.len()`, and an
            // empty suffix matches any remainder, so no extra checks are needed.
            Some(rest) => rest.ends_with(suffix),
            None => false,
        },
    }
}

// ─────────────────────────────────────────────────────────────────────────────
// ParserFn
// ─────────────────────────────────────────────────────────────────────────────

/// Parse a raw JSON frame into a StreamEvent.
/// Receives the full frame Value so parsers can read any field.
///
/// This is a plain function pointer, not a boxed closure, so parsers cannot
/// capture state.
pub type ParserFn = fn(&Value) -> WebSocketResult<StreamEvent>;

// ─────────────────────────────────────────────────────────────────────────────
// RegistryKey / RegistryEntry
// ─────────────────────────────────────────────────────────────────────────────

/// Registry key: (stream kind, account type).
///
/// Used as the `HashMap` key of the registry's primary map, hence the
/// `Eq` + `Hash` derives.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct RegistryKey {
    /// Which stream this registration is for.
    pub kind: StreamKind,
    /// Which account type this registration is for.
    pub account_type: AccountType,
}

/// One registered entry: the wire topic pattern + parser function.
#[derive(Clone)]
pub struct RegistryEntry {
    /// Pattern that an incoming topic key is tested against.
    pub pattern: TopicPattern,
    /// Parser returned when a topic key matches `pattern`.
    pub parser: ParserFn,
}

// ─────────────────────────────────────────────────────────────────────────────
// TopicRegistry
// ─────────────────────────────────────────────────────────────────────────────

/// Maps (StreamKind, AccountType) → (TopicPattern, ParserFn).
///
/// Two views of the registrations are kept: a keyed map used for capability
/// queries, and a flat (TopicPattern, ParserFn) list that is scanned for every
/// incoming frame — O(patterns), typically 5-40 patterns per exchange.
///
/// Immutable after construction — built once via `TopicRegistryBuilder`.
pub struct TopicRegistry {
    /// Keyed view: the pattern + parser stored for each (kind, account).
    entries: HashMap<RegistryKey, RegistryEntry>,

    /// Flat view used for frame dispatch, scanned front to back.
    /// Filled once by the builder and never mutated afterwards.
    dispatch: Vec<(TopicPattern, ParserFn)>,
}

impl TopicRegistry {
    /// Starts an empty builder.
    pub fn builder() -> TopicRegistryBuilder {
        TopicRegistryBuilder::default()
    }

    /// Finds the parser for an incoming frame's topic key.
    ///
    /// The dispatch list is scanned in order and the parser of the first
    /// matching pattern is returned; `None` means no pattern matched.
    /// O(patterns).
    pub fn dispatch(&self, key: &TopicKey) -> Option<ParserFn> {
        self.dispatch
            .iter()
            .find(|(pattern, _)| pattern.matches(key))
            .map(|(_, parser)| *parser)
    }

    /// Returns true if (kind, account) has a registered parser.
    pub fn supports(&self, kind: &StreamKind, account: AccountType) -> bool {
        // The map is keyed by an owned `RegistryKey`, so the kind is cloned
        // to build a temporary lookup key.
        self.entries.contains_key(&RegistryKey {
            kind: kind.clone(),
            account_type: account,
        })
    }

    /// Iterates over every (kind, account) pair that has a registered entry.
    /// The order follows the underlying `HashMap` and is therefore unspecified.
    pub fn native_pairs(&self) -> impl Iterator<Item = (&StreamKind, AccountType)> + '_ {
        self.entries
            .keys()
            .map(|RegistryKey { kind, account_type }| (kind, *account_type))
    }

    /// Exposes the keyed entries as-is (for capability introspection).
    pub fn entries(&self) -> &HashMap<RegistryKey, RegistryEntry> {
        &self.entries
    }
}

// ─────────────────────────────────────────────────────────────────────────────
// TopicRegistryBuilder
// ─────────────────────────────────────────────────────────────────────────────

/// Collects registrations in call order; `build` turns them into a `TopicRegistry`.
#[derive(Default)]
pub struct TopicRegistryBuilder {
    /// Registrations in the order `register` was called.
    entries: Vec<(RegistryKey, RegistryEntry)>,
}

impl TopicRegistryBuilder {
    /// Adds one (kind, account_type, pattern, parser) registration and returns
    /// the builder so calls can be chained.
    pub fn register(
        mut self,
        kind: StreamKind,
        account_type: AccountType,
        pattern: impl Into<String>,
        parser: ParserFn,
    ) -> Self {
        self.entries.push((
            RegistryKey { kind, account_type },
            RegistryEntry {
                pattern: TopicPattern::new(pattern),
                parser,
            },
        ));
        self
    }

    /// Consumes the builder and produces the registry.
    ///
    /// Every registration goes into the dispatch list in registration order,
    /// duplicates included — the same pattern may be registered for e.g. spot
    /// and futures, and the first match wins at dispatch time. The keyed map
    /// holds one entry per (kind, account): registering the same key again
    /// replaces the earlier entry there.
    pub fn build(self) -> TopicRegistry {
        // Snapshot the (pattern, parser) pairs before the entries are moved
        // into the map.
        let dispatch: Vec<(TopicPattern, ParserFn)> = self
            .entries
            .iter()
            .map(|(_, entry)| (entry.pattern.clone(), entry.parser))
            .collect();

        // Collecting inserts in iteration order, so a later registration for
        // the same key overwrites an earlier one.
        let entries: HashMap<RegistryKey, RegistryEntry> = self.entries.into_iter().collect();

        TopicRegistry { entries, dispatch }
    }
}

// ─────────────────────────────────────────────────────────────────────────────
// Error type helper (used to create WebSocketError in parser return)
// ─────────────────────────────────────────────────────────────────────────────

/// Builds the `WebSocketError::Parse` error that reports `field` as missing.
pub fn missing_field(field: &str) -> WebSocketError {
    let message = format!("missing field: {}", field);
    WebSocketError::Parse(message)
}

// ─────────────────────────────────────────────────────────────────────────────
// Tests
// ─────────────────────────────────────────────────────────────────────────────

#[cfg(test)]
mod tests {
    use super::*;

    /// Parser stand-in shared by the registry tests. It always fails, because
    /// only the lookup is under test, never the parsing itself.
    fn dummy_parser(_v: &Value) -> WebSocketResult<StreamEvent> {
        Err(WebSocketError::Parse("test".into()))
    }

    /// Registry holding a single spot trade registration under `*@trade`.
    fn spot_trade_registry() -> TopicRegistry {
        TopicRegistry::builder()
            .register(StreamKind::Trade, AccountType::Spot, "*@trade", dummy_parser)
            .build()
    }

    #[test]
    fn exact_match() {
        let pattern = "spot.trades";
        assert!(topic_pattern_matches(pattern, "spot.trades"));
        assert!(!topic_pattern_matches(pattern, "spot.trade"));
    }

    #[test]
    fn suffix_wildcard() {
        // Wildcard at the front, fixed suffix.
        let pattern = "*@trade";
        for topic in ["btcusdt@trade", "ethusdt@trade"] {
            assert!(topic_pattern_matches(pattern, topic));
        }
        assert!(!topic_pattern_matches(pattern, "btcusdt@kline_1m"));
    }

    #[test]
    fn prefix_wildcard() {
        // Fixed prefix, wildcard at the end.
        let pattern = "publicTrade.*";
        for topic in ["publicTrade.BTCUSDT", "publicTrade.ETHUSDT"] {
            assert!(topic_pattern_matches(pattern, topic));
        }
        assert!(!topic_pattern_matches(pattern, "orderbook.BTCUSDT"));
    }

    #[test]
    fn mid_wildcard() {
        // Wildcard between a fixed prefix and a fixed suffix.
        let pattern = "market.*.trade.detail";
        assert!(topic_pattern_matches(pattern, "market.BTC-USDT.trade.detail"));
        assert!(!topic_pattern_matches(pattern, "market.BTC-USDT.depth"));
    }

    #[test]
    fn topic_key_display() {
        let rendered = TopicKey::new("btcusdt@trade").to_string();
        assert_eq!(rendered, "btcusdt@trade");
    }

    #[test]
    fn registry_dispatch() {
        let registry = spot_trade_registry();

        let hit = registry.dispatch(&TopicKey::new("btcusdt@trade"));
        assert!(hit.is_some());

        let miss = registry.dispatch(&TopicKey::new("btcusdt@kline_1m"));
        assert!(miss.is_none());
    }

    #[test]
    fn registry_supports() {
        let registry = spot_trade_registry();

        assert!(registry.supports(&StreamKind::Trade, AccountType::Spot));
        assert!(!registry.supports(&StreamKind::Ticker, AccountType::Spot));
    }
}