Skip to main content

digdigdig3_core/core/websocket/
stream_spec.rs

1//! StreamSpec — internal subscription specification for UniversalWsTransport.
2//!
3//! Replaces SubscriptionRequest inside the framework layer. SubscriptionRequest
4//! is kept for the public WebSocketConnector trait (backward compat).
5
6use crate::core::types::{
7    AccountType, ExchangeId, OwnedSymbolInput, StreamType, SubscriptionRequest, Symbol,
8    WebSocketError, WebSocketResult,
9};
10use crate::core::utils::symbol_normalizer::NormalizerError;
11
12use super::stream_kind::{KlineInterval, StreamKind};
13
14/// Internal subscription specification used by UniversalWsTransport.
15///
16/// Converted from SubscriptionRequest at subscribe() time.
17///
18/// `symbol` holds either a raw exchange-native string (e.g. `"BTCUSDT"` for
19/// Binance) or a canonical [`Symbol`] that is resolved to exchange-native via
20/// [`StreamSpec::resolve_symbol`] before building the subscribe frame.
21#[derive(Debug, Clone, PartialEq, Eq, Hash)]
22pub struct StreamSpec {
23    pub kind: StreamKind,
24    /// Symbol input — raw exchange-native or canonical (resolved on use).
25    pub symbol: OwnedSymbolInput,
26    pub account_type: AccountType,
27    /// Depth hint for orderbook channels. None = exchange default.
28    pub depth: Option<u32>,
29    /// Speed hint in ms. None = exchange default.
30    pub speed_ms: Option<u32>,
31}
32
33impl StreamSpec {
34    /// Resolve `symbol` to an exchange-native owned [`String`].
35    ///
36    /// - `OwnedSymbolInput::Raw(s)` → clones `s`.
37    /// - `OwnedSymbolInput::Canonical(sym)` → normalizes via [`SymbolNormalizer`].
38    pub fn resolve_symbol(
39        &self,
40        exchange: ExchangeId,
41        account_type: AccountType,
42    ) -> Result<String, NormalizerError> {
43        self.symbol.resolve(exchange, account_type)
44    }
45}
46
47impl TryFrom<SubscriptionRequest> for StreamSpec {
48    type Error = WebSocketError;
49
50    fn try_from(req: SubscriptionRequest) -> WebSocketResult<Self> {
51        let kind = StreamKind::try_from(req.stream_type)?;
52        // Prefer the explicit raw string if the caller set it via Symbol::with_raw.
53        // Fall back to base+quote concat as a last-resort default; callers that
54        // need correct per-exchange format must call SymbolNormalizer::to_exchange
55        // before building the SubscriptionRequest.
56        let raw = req
57            .symbol
58            .raw()
59            .map(|r| r.to_string())
60            .unwrap_or_else(|| req.symbol.to_concat());
61        let symbol = OwnedSymbolInput::Raw(raw);
62        Ok(Self {
63            kind,
64            symbol,
65            account_type: req.account_type,
66            depth: req.depth,
67            speed_ms: req.update_speed_ms,
68        })
69    }
70}
71
72impl From<StreamSpec> for SubscriptionRequest {
73    fn from(spec: StreamSpec) -> Self {
74        let stream_type = StreamType::from(spec.kind);
75        // Reconstruct a Symbol from the OwnedSymbolInput for the public boundary.
76        // Canonical variant: use base+quote directly. Raw: store as raw field.
77        let symbol = match &spec.symbol {
78            OwnedSymbolInput::Raw(s) => Symbol::with_raw("", "", s.clone()),
79            OwnedSymbolInput::Canonical(sym) => sym.clone(),
80        };
81        SubscriptionRequest {
82            symbol,
83            stream_type,
84            account_type: spec.account_type,
85            depth: spec.depth,
86            update_speed_ms: spec.speed_ms,
87        }
88    }
89}
90
91// ─────────────────────────────────────────────────────────────────────────────
92// StreamType → StreamKind (lossless)
93// ─────────────────────────────────────────────────────────────────────────────
94
95impl TryFrom<StreamType> for StreamKind {
96    type Error = WebSocketError;
97
98    fn try_from(st: StreamType) -> WebSocketResult<Self> {
99        Ok(match st {
100            StreamType::Ticker => StreamKind::Ticker,
101            StreamType::Trade => StreamKind::Trade,
102            StreamType::Orderbook => StreamKind::Orderbook,
103            StreamType::OrderbookDelta => StreamKind::OrderbookDelta,
104            StreamType::OrderbookL3 => StreamKind::OrderbookL3,
105            StreamType::Kline { interval } => StreamKind::Kline {
106                interval: KlineInterval::new(interval),
107            },
108            StreamType::MarkPrice => StreamKind::MarkPrice,
109            StreamType::FundingRate => StreamKind::FundingRate,
110            StreamType::Liquidation => StreamKind::Liquidation,
111            StreamType::OpenInterest => StreamKind::OpenInterest,
112            StreamType::LongShortRatio => StreamKind::LongShortRatio,
113            StreamType::AggTrade => StreamKind::AggTrade,
114            StreamType::CompositeIndex => StreamKind::CompositeIndex,
115            StreamType::MarkPriceKline { interval } => StreamKind::MarkPriceKline {
116                interval: KlineInterval::new(interval),
117            },
118            StreamType::IndexPriceKline { interval } => StreamKind::IndexPriceKline {
119                interval: KlineInterval::new(interval),
120            },
121            StreamType::PremiumIndexKline { interval } => StreamKind::PremiumIndexKline {
122                interval: KlineInterval::new(interval),
123            },
124            StreamType::IndexPrice => StreamKind::IndexPrice,
125            StreamType::HistoricalVolatility => StreamKind::HistoricalVolatility,
126            StreamType::InsuranceFund => StreamKind::InsuranceFund,
127            StreamType::Basis => StreamKind::Basis,
128            StreamType::OptionGreeks => StreamKind::OptionGreeks,
129            StreamType::VolatilityIndex => StreamKind::VolatilityIndex,
130            StreamType::BlockTrade => StreamKind::BlockTrade,
131            StreamType::AuctionEvent => StreamKind::AuctionEvent,
132            StreamType::MarketWarning => StreamKind::MarketWarning,
133            StreamType::SettlementEvent => StreamKind::SettlementEvent,
134            StreamType::RiskLimit => StreamKind::RiskLimit,
135            StreamType::PredictedFunding => StreamKind::PredictedFunding,
136            StreamType::FundingSettlement => StreamKind::FundingSettlement,
137            StreamType::OrderUpdate => StreamKind::OrderUpdate,
138            StreamType::BalanceUpdate => StreamKind::BalanceUpdate,
139            StreamType::PositionUpdate => StreamKind::PositionUpdate,
140        })
141    }
142}
143
144// ─────────────────────────────────────────────────────────────────────────────
145// StreamKind → StreamType (lossless round-trip)
146// ─────────────────────────────────────────────────────────────────────────────
147
148impl From<StreamKind> for StreamType {
149    fn from(kind: StreamKind) -> Self {
150        match kind {
151            StreamKind::Ticker => StreamType::Ticker,
152            StreamKind::Trade => StreamType::Trade,
153            StreamKind::Orderbook => StreamType::Orderbook,
154            StreamKind::OrderbookDelta => StreamType::OrderbookDelta,
155            StreamKind::OrderbookL3 => StreamType::OrderbookL3,
156            StreamKind::Kline { interval } => StreamType::Kline {
157                interval: interval.0,
158            },
159            StreamKind::MarkPrice => StreamType::MarkPrice,
160            StreamKind::FundingRate => StreamType::FundingRate,
161            StreamKind::Liquidation => StreamType::Liquidation,
162            StreamKind::OpenInterest => StreamType::OpenInterest,
163            StreamKind::LongShortRatio => StreamType::LongShortRatio,
164            StreamKind::AggTrade => StreamType::AggTrade,
165            StreamKind::CompositeIndex => StreamType::CompositeIndex,
166            StreamKind::MarkPriceKline { interval } => StreamType::MarkPriceKline {
167                interval: interval.0,
168            },
169            StreamKind::IndexPriceKline { interval } => StreamType::IndexPriceKline {
170                interval: interval.0,
171            },
172            StreamKind::PremiumIndexKline { interval } => StreamType::PremiumIndexKline {
173                interval: interval.0,
174            },
175            StreamKind::IndexPrice => StreamType::IndexPrice,
176            StreamKind::HistoricalVolatility => StreamType::HistoricalVolatility,
177            StreamKind::InsuranceFund => StreamType::InsuranceFund,
178            StreamKind::Basis => StreamType::Basis,
179            StreamKind::OptionGreeks => StreamType::OptionGreeks,
180            StreamKind::VolatilityIndex => StreamType::VolatilityIndex,
181            StreamKind::BlockTrade => StreamType::BlockTrade,
182            StreamKind::AuctionEvent => StreamType::AuctionEvent,
183            StreamKind::MarketWarning => StreamType::MarketWarning,
184            StreamKind::SettlementEvent => StreamType::SettlementEvent,
185            StreamKind::RiskLimit => StreamType::RiskLimit,
186            StreamKind::PredictedFunding => StreamType::PredictedFunding,
187            StreamKind::FundingSettlement => StreamType::FundingSettlement,
188            StreamKind::OrderUpdate => StreamType::OrderUpdate,
189            StreamKind::BalanceUpdate => StreamType::BalanceUpdate,
190            StreamKind::PositionUpdate => StreamType::PositionUpdate,
191        }
192    }
193}