Skip to main content

bat_markets/
stream.rs

1use std::collections::HashSet;
2
3use tokio::{
4    sync::{broadcast, oneshot},
5    task::JoinHandle,
6};
7
8use bat_markets_core::{
9    Balance, ErrorKind, Execution, FundingRate, InstrumentId, Kline, KlineInterval, Liquidation,
10    MarkPrice, OpenInterest, Order, OrderBookDelta, Position, PrivateLaneEvent, PublicLaneEvent,
11    Result, Ticker, TradeTick, WatchOrderBookRequest,
12};
13#[cfg(test)]
14use bat_markets_core::{BookTop, WatchFastFeedRequest};
15
16use crate::{
17    client::BatMarkets,
18    subscriptions::{PrivateSubscriptionLease, PublicSubscriptionLease},
19};
20
21pub(crate) const fn public_lane(inner: &BatMarkets) -> PublicLaneClient<'_> {
22    PublicLaneClient { inner }
23}
24
25pub(crate) const fn private_lane(inner: &BatMarkets) -> PrivateLaneClient<'_> {
26    PrivateLaneClient { inner }
27}
28
29/// Public market-data subscription plan for live websocket runners.
30#[derive(Clone, Debug, PartialEq, Eq)]
31pub(crate) struct PublicSubscription {
32    /// Instruments included in the public subscription.
33    pub instrument_ids: Vec<InstrumentId>,
34    /// Subscribe to ticker updates.
35    pub ticker: bool,
36    /// Subscribe to public trade updates.
37    pub trades: bool,
38    /// Subscribe to top-of-book updates.
39    pub book_top: bool,
40    /// Subscribe to full order-book delta updates.
41    pub order_book: bool,
42    /// Subscribe to mark-price updates.
43    pub mark_price: bool,
44    /// Subscribe to funding-rate updates.
45    pub funding_rate: bool,
46    /// Subscribe to open-interest updates.
47    pub open_interest: bool,
48    /// Subscribe to liquidation updates.
49    pub liquidations: bool,
50    /// OHLCV intervals to subscribe to.
51    pub kline_intervals: Vec<Box<str>>,
52}
53
54/// Venue-specific public websocket route for shared live subscriptions.
55#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
56pub(crate) enum PublicStreamRoute {
57    /// One public websocket carries every public topic for this venue.
58    Default,
59    /// Binance USD-M market-data websocket.
60    BinanceMarket,
61    /// Binance USD-M public-data websocket.
62    BinancePublic,
63}
64
65/// Typed market-data watch request for one or many instruments.
66#[derive(Clone, Debug, PartialEq, Eq)]
67pub(crate) struct WatchInstrumentsRequest {
68    /// Instruments to watch.
69    pub instrument_ids: Vec<InstrumentId>,
70}
71
72impl WatchInstrumentsRequest {
73    /// Build a watch request for a single instrument.
74    #[must_use]
75    pub fn for_instrument(instrument_id: InstrumentId) -> Self {
76        Self {
77            instrument_ids: vec![instrument_id],
78        }
79    }
80
81    /// Build a watch request for one or many instruments.
82    #[must_use]
83    pub fn for_instruments(instrument_ids: Vec<InstrumentId>) -> Self {
84        Self { instrument_ids }
85    }
86}
87
88/// Typed OHLCV watch request for one or many instruments.
89#[derive(Clone, Debug, PartialEq, Eq)]
90pub(crate) struct WatchOhlcvRequest {
91    /// Instruments to watch.
92    pub instrument_ids: Vec<InstrumentId>,
93    /// OHLCV interval in unified notation, such as `1m`, `5m`, or `1h`.
94    pub interval: Box<str>,
95}
96
97impl WatchOhlcvRequest {
98    /// Build an OHLCV watch request for a single instrument and interval.
99    #[must_use]
100    pub fn for_instrument(instrument_id: InstrumentId, interval: impl Into<Box<str>>) -> Self {
101        Self {
102            instrument_ids: vec![instrument_id],
103            interval: normalize_interval_box(interval.into()),
104        }
105    }
106
107    /// Build an OHLCV watch request for one or many instruments and one interval.
108    #[must_use]
109    pub fn for_instruments(
110        instrument_ids: Vec<InstrumentId>,
111        interval: impl Into<Box<str>>,
112    ) -> Self {
113        Self {
114            instrument_ids,
115            interval: normalize_interval_box(interval.into()),
116        }
117    }
118
119    fn into_public_subscription(self) -> PublicSubscription {
120        PublicSubscription {
121            instrument_ids: self.instrument_ids,
122            ticker: false,
123            trades: false,
124            book_top: false,
125            order_book: false,
126            mark_price: false,
127            funding_rate: false,
128            open_interest: false,
129            liquidations: false,
130            kline_intervals: vec![self.interval],
131        }
132    }
133}
134
135#[derive(Clone, Debug)]
136enum InstrumentFilter {
137    Single(InstrumentId),
138    Many(HashSet<InstrumentId>),
139}
140
141impl InstrumentFilter {
142    fn from_vec(instrument_ids: Vec<InstrumentId>) -> Self {
143        match instrument_ids.as_slice() {
144            [instrument_id] => Self::Single(instrument_id.clone()),
145            _ => Self::Many(instrument_ids.into_iter().collect()),
146        }
147    }
148
149    #[cfg(test)]
150    fn from_slice(instrument_ids: &[InstrumentId]) -> Self {
151        match instrument_ids {
152            [instrument_id] => Self::Single(instrument_id.clone()),
153            _ => Self::Many(instrument_ids.iter().cloned().collect()),
154        }
155    }
156
157    fn contains(&self, instrument_id: &InstrumentId) -> bool {
158        match self {
159            Self::Single(candidate) => candidate == instrument_id,
160            Self::Many(candidates) => candidates.contains(instrument_id),
161        }
162    }
163}
164
165/// Handle for a running live stream task.
166pub(crate) struct LiveStreamHandle {
167    pub(crate) _shutdown: oneshot::Sender<()>,
168    pub(crate) join: JoinHandle<Result<()>>,
169}
170
171impl LiveStreamHandle {
172    /// Abort the stream task immediately.
173    pub fn abort(&self) {
174        self.join.abort();
175    }
176}
177
178async fn recv_public_event(
179    receiver: &mut broadcast::Receiver<PublicLaneEvent>,
180    label: &str,
181) -> Result<PublicLaneEvent> {
182    loop {
183        match receiver.recv().await {
184            Ok(event) => return Ok(event),
185            Err(broadcast::error::RecvError::Lagged(_)) => continue,
186            Err(error) => {
187                return Err(bat_markets_core::MarketError::new(
188                    ErrorKind::TransportError,
189                    format!("{label} receive failed: {error}"),
190                ));
191            }
192        }
193    }
194}
195
196async fn recv_private_event(
197    receiver: &mut broadcast::Receiver<PrivateLaneEvent>,
198    label: &str,
199) -> Result<PrivateLaneEvent> {
200    loop {
201        match receiver.recv().await {
202            Ok(event) => return Ok(event),
203            Err(broadcast::error::RecvError::Lagged(_)) => continue,
204            Err(error) => {
205                return Err(bat_markets_core::MarketError::new(
206                    ErrorKind::TransportError,
207                    format!("{label} receive failed: {error}"),
208                ));
209            }
210        }
211    }
212}
213
214/// Typed OHLCV subscription receiver over the shared public event bus.
215pub(crate) struct OhlcvUpdates<'a> {
216    inner: &'a BatMarkets,
217    receiver: broadcast::Receiver<PublicLaneEvent>,
218    instrument_ids: InstrumentFilter,
219    interval: Box<str>,
220}
221
222impl<'a> OhlcvUpdates<'a> {
223    fn new(
224        inner: &'a BatMarkets,
225        receiver: broadcast::Receiver<PublicLaneEvent>,
226        request: WatchOhlcvRequest,
227    ) -> Self {
228        Self {
229            inner,
230            receiver,
231            instrument_ids: InstrumentFilter::from_vec(request.instrument_ids),
232            interval: request.interval,
233        }
234    }
235
236    /// Wait for the next matching OHLCV candle.
237    pub async fn recv(&mut self) -> Result<Kline> {
238        let requested_interval = parse_watch_interval(self.interval.as_ref())?;
239        loop {
240            let event = recv_public_event(&mut self.receiver, "ohlcv subscription").await?;
241
242            let PublicLaneEvent::Kline(kline) = event else {
243                continue;
244            };
245            let Some(incoming_interval) = KlineInterval::parse(kline.interval.as_ref()) else {
246                continue;
247            };
248            if !self.instrument_ids.contains(&kline.instrument_id)
249                || incoming_interval != requested_interval
250            {
251                continue;
252            }
253
254            let spec = self
255                .inner
256                .adapter
257                .as_adapter()
258                .resolve_instrument(&kline.instrument_id)
259                .ok_or_else(|| {
260                    bat_markets_core::MarketError::new(
261                        ErrorKind::Unsupported,
262                        format!(
263                            "unknown instrument {} for kline update",
264                            kline.instrument_id
265                        ),
266                    )
267                })?;
268            let mut unified = kline.to_unified(&spec);
269            unified.interval = Box::<str>::from(requested_interval);
270            return Ok(unified);
271        }
272    }
273}
274
275/// Live OHLCV watcher with typed updates and stream lifecycle control.
276pub struct OhlcvWatch<'a> {
277    updates: OhlcvUpdates<'a>,
278    _lease: PublicSubscriptionLease,
279}
280
281impl<'a> OhlcvWatch<'a> {
282    /// Wait for the next matching OHLCV candle from the live watcher.
283    pub async fn recv(&mut self) -> Result<Kline> {
284        self.updates.recv().await
285    }
286
287    /// Release the shared subscription lease.
288    pub async fn shutdown(self) -> Result<()> {
289        Ok(())
290    }
291
292    /// No-op for shared-hub watchers.
293    ///
294    /// The underlying socket is owned by the subscription hub and may be shared
295    /// with other watchers.
296    pub fn abort(&self) {}
297
298    /// Release the shared subscription lease.
299    pub async fn wait(self) -> Result<()> {
300        Ok(())
301    }
302}
303
304/// Typed ticker subscription receiver over the shared public event bus.
305pub(crate) struct TickerUpdates<'a> {
306    inner: &'a BatMarkets,
307    receiver: broadcast::Receiver<PublicLaneEvent>,
308    instrument_ids: InstrumentFilter,
309}
310
311impl<'a> TickerUpdates<'a> {
312    fn new(
313        inner: &'a BatMarkets,
314        receiver: broadcast::Receiver<PublicLaneEvent>,
315        request: WatchInstrumentsRequest,
316    ) -> Self {
317        Self {
318            inner,
319            receiver,
320            instrument_ids: InstrumentFilter::from_vec(request.instrument_ids),
321        }
322    }
323
324    /// Wait for the next matching ticker update.
325    pub async fn recv(&mut self) -> Result<Ticker> {
326        loop {
327            let event = recv_public_event(&mut self.receiver, "ticker subscription").await?;
328
329            let PublicLaneEvent::Ticker(ticker) = event else {
330                continue;
331            };
332            if !self.instrument_ids.contains(&ticker.instrument_id) {
333                continue;
334            }
335
336            let spec = resolve_public_spec(self.inner, &ticker.instrument_id, "ticker")?;
337            return Ok(ticker.to_unified(&spec));
338        }
339    }
340}
341
342/// Live ticker watcher with typed updates and stream lifecycle control.
343pub struct TickerWatch<'a> {
344    updates: TickerUpdates<'a>,
345    _lease: PublicSubscriptionLease,
346}
347
348impl<'a> TickerWatch<'a> {
349    /// Wait for the next matching ticker update from the live watcher.
350    pub async fn recv(&mut self) -> Result<Ticker> {
351        self.updates.recv().await
352    }
353
354    /// Release the shared subscription lease.
355    pub async fn shutdown(self) -> Result<()> {
356        Ok(())
357    }
358
359    /// No-op for shared-hub watchers.
360    pub fn abort(&self) {}
361
362    /// Release the shared subscription lease.
363    pub async fn wait(self) -> Result<()> {
364        Ok(())
365    }
366}
367
368/// Typed trade subscription receiver over the shared public event bus.
369pub(crate) struct TradeUpdates<'a> {
370    inner: &'a BatMarkets,
371    receiver: broadcast::Receiver<PublicLaneEvent>,
372    instrument_ids: InstrumentFilter,
373}
374
375impl<'a> TradeUpdates<'a> {
376    fn new(
377        inner: &'a BatMarkets,
378        receiver: broadcast::Receiver<PublicLaneEvent>,
379        request: WatchInstrumentsRequest,
380    ) -> Self {
381        Self {
382            inner,
383            receiver,
384            instrument_ids: InstrumentFilter::from_vec(request.instrument_ids),
385        }
386    }
387
388    /// Wait for the next matching public trade update.
389    pub async fn recv(&mut self) -> Result<TradeTick> {
390        loop {
391            let event = recv_public_event(&mut self.receiver, "trade subscription").await?;
392
393            let PublicLaneEvent::Trade(trade) = event else {
394                continue;
395            };
396            if !self.instrument_ids.contains(&trade.instrument_id) {
397                continue;
398            }
399
400            let spec = resolve_public_spec(self.inner, &trade.instrument_id, "trade")?;
401            return Ok(trade.to_unified(&spec));
402        }
403    }
404}
405
406/// Live trades watcher with typed updates and stream lifecycle control.
407pub struct TradesWatch<'a> {
408    updates: TradeUpdates<'a>,
409    _lease: PublicSubscriptionLease,
410}
411
412impl<'a> TradesWatch<'a> {
413    /// Wait for the next matching public trade update from the live watcher.
414    pub async fn recv(&mut self) -> Result<TradeTick> {
415        self.updates.recv().await
416    }
417
418    /// Release the shared subscription lease.
419    pub async fn shutdown(self) -> Result<()> {
420        Ok(())
421    }
422
423    /// No-op for shared-hub watchers.
424    pub fn abort(&self) {}
425
426    /// Release the shared subscription lease.
427    pub async fn wait(self) -> Result<()> {
428        Ok(())
429    }
430}
431
432/// Typed top-of-book subscription receiver over the shared public event bus.
433#[cfg(test)]
434pub(crate) struct BookTopUpdates<'a> {
435    inner: &'a BatMarkets,
436    receiver: broadcast::Receiver<PublicLaneEvent>,
437    instrument_ids: InstrumentFilter,
438}
439
440#[cfg(test)]
441impl<'a> BookTopUpdates<'a> {
442    fn new(
443        inner: &'a BatMarkets,
444        receiver: broadcast::Receiver<PublicLaneEvent>,
445        request: WatchInstrumentsRequest,
446    ) -> Self {
447        Self {
448            inner,
449            receiver,
450            instrument_ids: InstrumentFilter::from_vec(request.instrument_ids),
451        }
452    }
453
454    /// Wait for the next matching top-of-book update.
455    pub async fn recv(&mut self) -> Result<BookTop> {
456        loop {
457            let event = recv_public_event(&mut self.receiver, "book_top subscription").await?;
458
459            let PublicLaneEvent::BookTop(book_top) = event else {
460                continue;
461            };
462            if !self.instrument_ids.contains(&book_top.instrument_id) {
463                continue;
464            }
465
466            let spec = resolve_public_spec(self.inner, &book_top.instrument_id, "book_top")?;
467            return Ok(book_top.to_unified(&spec));
468        }
469    }
470}
471
472/// Typed mark-price subscription receiver over the shared public event bus.
473pub(crate) struct MarkPriceUpdates<'a> {
474    inner: &'a BatMarkets,
475    receiver: broadcast::Receiver<PublicLaneEvent>,
476    instrument_ids: InstrumentFilter,
477}
478
479impl<'a> MarkPriceUpdates<'a> {
480    fn new(
481        inner: &'a BatMarkets,
482        receiver: broadcast::Receiver<PublicLaneEvent>,
483        request: WatchInstrumentsRequest,
484    ) -> Self {
485        Self {
486            inner,
487            receiver,
488            instrument_ids: InstrumentFilter::from_vec(request.instrument_ids),
489        }
490    }
491
492    /// Wait for the next matching mark-price update.
493    pub async fn recv(&mut self) -> Result<MarkPrice> {
494        loop {
495            let event = recv_public_event(&mut self.receiver, "mark_price subscription").await?;
496
497            let PublicLaneEvent::MarkPrice(mark_price) = event else {
498                continue;
499            };
500            if !self.instrument_ids.contains(&mark_price.instrument_id) {
501                continue;
502            }
503
504            let spec = resolve_public_spec(self.inner, &mark_price.instrument_id, "mark_price")?;
505            return Ok(mark_price.to_unified(&spec));
506        }
507    }
508}
509
510/// Live mark-price watcher with typed updates and stream lifecycle control.
511pub struct MarkPriceWatch<'a> {
512    updates: MarkPriceUpdates<'a>,
513    _lease: PublicSubscriptionLease,
514}
515
516impl<'a> MarkPriceWatch<'a> {
517    /// Wait for the next matching mark-price update from the live watcher.
518    pub async fn recv(&mut self) -> Result<MarkPrice> {
519        self.updates.recv().await
520    }
521
522    /// Release the shared subscription lease.
523    pub async fn shutdown(self) -> Result<()> {
524        Ok(())
525    }
526}
527
528/// Typed funding-rate subscription receiver over the shared public event bus.
529pub(crate) struct FundingRateUpdates {
530    receiver: broadcast::Receiver<PublicLaneEvent>,
531    instrument_ids: InstrumentFilter,
532}
533
534impl FundingRateUpdates {
535    fn new(
536        receiver: broadcast::Receiver<PublicLaneEvent>,
537        request: WatchInstrumentsRequest,
538    ) -> Self {
539        Self {
540            receiver,
541            instrument_ids: InstrumentFilter::from_vec(request.instrument_ids),
542        }
543    }
544
545    /// Wait for the next matching funding-rate update.
546    pub async fn recv(&mut self) -> Result<FundingRate> {
547        loop {
548            let event = recv_public_event(&mut self.receiver, "funding_rate subscription").await?;
549
550            let PublicLaneEvent::FundingRate(funding_rate) = event else {
551                continue;
552            };
553            if !self.instrument_ids.contains(&funding_rate.instrument_id) {
554                continue;
555            }
556            return Ok(funding_rate);
557        }
558    }
559}
560
561/// Live funding-rate watcher with typed updates and stream lifecycle control.
562pub struct FundingRateWatch<'a> {
563    updates: FundingRateUpdates,
564    _lease: PublicSubscriptionLease,
565    _marker: std::marker::PhantomData<&'a BatMarkets>,
566}
567
568impl<'a> FundingRateWatch<'a> {
569    /// Wait for the next matching funding-rate update from the live watcher.
570    pub async fn recv(&mut self) -> Result<FundingRate> {
571        self.updates.recv().await
572    }
573
574    /// Release the shared subscription lease.
575    pub async fn shutdown(self) -> Result<()> {
576        Ok(())
577    }
578}
579
580/// Typed open-interest subscription receiver over the shared public event bus.
581pub(crate) struct OpenInterestUpdates {
582    receiver: broadcast::Receiver<PublicLaneEvent>,
583    instrument_ids: InstrumentFilter,
584}
585
586impl OpenInterestUpdates {
587    fn new(
588        receiver: broadcast::Receiver<PublicLaneEvent>,
589        request: WatchInstrumentsRequest,
590    ) -> Self {
591        Self {
592            receiver,
593            instrument_ids: InstrumentFilter::from_vec(request.instrument_ids),
594        }
595    }
596
597    /// Wait for the next matching open-interest update.
598    pub async fn recv(&mut self) -> Result<OpenInterest> {
599        loop {
600            let event = recv_public_event(&mut self.receiver, "open_interest subscription").await?;
601
602            let PublicLaneEvent::OpenInterest(open_interest) = event else {
603                continue;
604            };
605            if !self.instrument_ids.contains(&open_interest.instrument_id) {
606                continue;
607            }
608            return Ok(open_interest);
609        }
610    }
611}
612
613/// Live open-interest watcher with typed updates and stream lifecycle control.
614pub struct OpenInterestWatch<'a> {
615    updates: OpenInterestUpdates,
616    _lease: PublicSubscriptionLease,
617    _marker: std::marker::PhantomData<&'a BatMarkets>,
618}
619
620impl<'a> OpenInterestWatch<'a> {
621    /// Wait for the next matching open-interest update from the live watcher.
622    pub async fn recv(&mut self) -> Result<OpenInterest> {
623        self.updates.recv().await
624    }
625
626    /// Release the shared subscription lease.
627    pub async fn shutdown(self) -> Result<()> {
628        Ok(())
629    }
630}
631
632/// Typed liquidation subscription receiver over the shared public event bus.
633pub(crate) struct LiquidationUpdates<'a> {
634    inner: &'a BatMarkets,
635    receiver: broadcast::Receiver<PublicLaneEvent>,
636    instrument_ids: InstrumentFilter,
637}
638
639impl<'a> LiquidationUpdates<'a> {
640    fn new(
641        inner: &'a BatMarkets,
642        receiver: broadcast::Receiver<PublicLaneEvent>,
643        request: WatchInstrumentsRequest,
644    ) -> Self {
645        Self {
646            inner,
647            receiver,
648            instrument_ids: InstrumentFilter::from_vec(request.instrument_ids),
649        }
650    }
651
652    /// Wait for the next matching liquidation update.
653    pub async fn recv(&mut self) -> Result<Liquidation> {
654        loop {
655            let event = recv_public_event(&mut self.receiver, "liquidation subscription").await?;
656
657            let PublicLaneEvent::Liquidation(liquidation) = event else {
658                continue;
659            };
660            if !self.instrument_ids.contains(&liquidation.instrument_id) {
661                continue;
662            }
663
664            let spec = resolve_public_spec(self.inner, &liquidation.instrument_id, "liquidation")?;
665            return Ok(liquidation.to_unified(&spec));
666        }
667    }
668}
669
670/// Live liquidation watcher with typed updates and stream lifecycle control.
671pub struct LiquidationWatch<'a> {
672    updates: LiquidationUpdates<'a>,
673    _lease: PublicSubscriptionLease,
674}
675
676impl<'a> LiquidationWatch<'a> {
677    /// Wait for the next matching liquidation update from the live watcher.
678    pub async fn recv(&mut self) -> Result<Liquidation> {
679        self.updates.recv().await
680    }
681
682    /// Release the shared subscription lease.
683    pub async fn shutdown(self) -> Result<()> {
684        Ok(())
685    }
686}
687
688/// Typed order-book delta subscription receiver over the shared public event bus.
689pub(crate) struct OrderBookUpdates<'a> {
690    inner: &'a BatMarkets,
691    receiver: broadcast::Receiver<PublicLaneEvent>,
692    instrument_id: InstrumentId,
693}
694
695impl<'a> OrderBookUpdates<'a> {
696    fn new(
697        inner: &'a BatMarkets,
698        receiver: broadcast::Receiver<PublicLaneEvent>,
699        request: WatchOrderBookRequest,
700    ) -> Self {
701        Self {
702            inner,
703            receiver,
704            instrument_id: request.instrument_id,
705        }
706    }
707
708    /// Wait for the next matching order-book delta.
709    pub async fn recv(&mut self) -> Result<OrderBookDelta> {
710        loop {
711            let event = recv_public_event(&mut self.receiver, "order_book subscription").await?;
712
713            let PublicLaneEvent::OrderBookDelta(delta) = event else {
714                continue;
715            };
716            if delta.instrument_id != self.instrument_id {
717                continue;
718            }
719
720            let spec = resolve_public_spec(self.inner, &delta.instrument_id, "order_book")?;
721            return Ok(delta.to_unified(&spec));
722        }
723    }
724}
725
726/// Live order-book watcher with typed updates and stream lifecycle control.
727pub struct OrderBookWatch<'a> {
728    updates: OrderBookUpdates<'a>,
729    _lease: PublicSubscriptionLease,
730}
731
732impl<'a> OrderBookWatch<'a> {
733    /// Wait for the next matching order-book delta from the live watcher.
734    pub async fn recv(&mut self) -> Result<OrderBookDelta> {
735        self.updates.recv().await
736    }
737
738    /// Release the shared subscription lease.
739    pub async fn shutdown(self) -> Result<()> {
740        Ok(())
741    }
742}
743
744/// Compact fast-feed receiver for frontend fanout.
745#[cfg(test)]
746pub(crate) struct FastFeedUpdates {
747    receiver: broadcast::Receiver<PublicLaneEvent>,
748    request: WatchFastFeedRequest,
749    instrument_ids: InstrumentFilter,
750}
751
752#[cfg(test)]
753impl FastFeedUpdates {
754    fn new(receiver: broadcast::Receiver<PublicLaneEvent>, request: WatchFastFeedRequest) -> Self {
755        Self {
756            receiver,
757            instrument_ids: InstrumentFilter::from_slice(&request.instrument_ids),
758            request,
759        }
760    }
761
762    /// Wait for the next public-lane event enabled by the fast-feed request.
763    pub async fn recv(&mut self) -> Result<PublicLaneEvent> {
764        loop {
765            let event = recv_public_event(&mut self.receiver, "fast feed").await?;
766
767            let instrument_id = match &event {
768                PublicLaneEvent::Ticker(event) => {
769                    if !self.request.ticker {
770                        continue;
771                    }
772                    &event.instrument_id
773                }
774                PublicLaneEvent::Trade(event) => {
775                    if !self.request.trades {
776                        continue;
777                    }
778                    &event.instrument_id
779                }
780                PublicLaneEvent::BookTop(event) => {
781                    if !self.request.book_top {
782                        continue;
783                    }
784                    &event.instrument_id
785                }
786                PublicLaneEvent::MarkPrice(event) => {
787                    if !self.request.mark_price {
788                        continue;
789                    }
790                    &event.instrument_id
791                }
792                PublicLaneEvent::FundingRate(event) => {
793                    if !self.request.funding_rate {
794                        continue;
795                    }
796                    &event.instrument_id
797                }
798                PublicLaneEvent::OpenInterest(event) => {
799                    if !self.request.open_interest {
800                        continue;
801                    }
802                    &event.instrument_id
803                }
804                PublicLaneEvent::Liquidation(event) => {
805                    if !self.request.liquidations {
806                        continue;
807                    }
808                    &event.instrument_id
809                }
810                _ => continue,
811            };
812
813            if self.instrument_ids.contains(instrument_id) {
814                return Ok(event);
815            }
816        }
817    }
818}
819
820/// Typed order subscription receiver over the shared private event bus.
821pub(crate) struct OrderUpdates {
822    receiver: broadcast::Receiver<PrivateLaneEvent>,
823}
824
825impl OrderUpdates {
826    fn new(receiver: broadcast::Receiver<PrivateLaneEvent>) -> Self {
827        Self { receiver }
828    }
829
830    /// Wait for the next private order update.
831    pub async fn recv(&mut self) -> Result<Order> {
832        loop {
833            let event = recv_private_event(&mut self.receiver, "order subscription").await?;
834            if let PrivateLaneEvent::Order(order) = event {
835                return Ok(order);
836            }
837        }
838    }
839}
840
841/// Live private order watcher with typed updates and stream lifecycle control.
842pub struct OrdersWatch<'a> {
843    updates: OrderUpdates,
844    _lease: PrivateSubscriptionLease,
845    _marker: std::marker::PhantomData<&'a BatMarkets>,
846}
847
848impl<'a> OrdersWatch<'a> {
849    /// Wait for the next private order update from the live watcher.
850    pub async fn recv(&mut self) -> Result<Order> {
851        self.updates.recv().await
852    }
853
854    /// Release the shared private-stream lease.
855    pub async fn shutdown(self) -> Result<()> {
856        Ok(())
857    }
858}
859
860/// Typed execution subscription receiver over the shared private event bus.
861pub(crate) struct ExecutionUpdates {
862    receiver: broadcast::Receiver<PrivateLaneEvent>,
863}
864
865impl ExecutionUpdates {
866    fn new(receiver: broadcast::Receiver<PrivateLaneEvent>) -> Self {
867        Self { receiver }
868    }
869
870    /// Wait for the next private execution update.
871    pub async fn recv(&mut self) -> Result<Execution> {
872        loop {
873            let event = recv_private_event(&mut self.receiver, "execution subscription").await?;
874            if let PrivateLaneEvent::Execution(execution) = event {
875                return Ok(execution);
876            }
877        }
878    }
879}
880
881/// Live private execution watcher with typed updates and stream lifecycle control.
882pub struct ExecutionsWatch<'a> {
883    updates: ExecutionUpdates,
884    _lease: PrivateSubscriptionLease,
885    _marker: std::marker::PhantomData<&'a BatMarkets>,
886}
887
888impl<'a> ExecutionsWatch<'a> {
889    /// Wait for the next private execution update from the live watcher.
890    pub async fn recv(&mut self) -> Result<Execution> {
891        self.updates.recv().await
892    }
893
894    /// Release the shared private-stream lease.
895    pub async fn shutdown(self) -> Result<()> {
896        Ok(())
897    }
898}
899
900/// Typed position subscription receiver over the shared private event bus.
901pub(crate) struct PositionUpdates {
902    receiver: broadcast::Receiver<PrivateLaneEvent>,
903}
904
905impl PositionUpdates {
906    fn new(receiver: broadcast::Receiver<PrivateLaneEvent>) -> Self {
907        Self { receiver }
908    }
909
910    /// Wait for the next private position update.
911    pub async fn recv(&mut self) -> Result<Position> {
912        loop {
913            let event = recv_private_event(&mut self.receiver, "position subscription").await?;
914            if let PrivateLaneEvent::Position(position) = event {
915                return Ok(position);
916            }
917        }
918    }
919}
920
921/// Live private position watcher with typed updates and stream lifecycle control.
922pub struct PositionsWatch<'a> {
923    updates: PositionUpdates,
924    _lease: PrivateSubscriptionLease,
925    _marker: std::marker::PhantomData<&'a BatMarkets>,
926}
927
928impl<'a> PositionsWatch<'a> {
929    /// Wait for the next private position update from the live watcher.
930    pub async fn recv(&mut self) -> Result<Position> {
931        self.updates.recv().await
932    }
933
934    /// Release the shared private-stream lease.
935    pub async fn shutdown(self) -> Result<()> {
936        Ok(())
937    }
938}
939
940/// Typed balance subscription receiver over the shared private event bus.
941pub(crate) struct BalanceUpdates {
942    receiver: broadcast::Receiver<PrivateLaneEvent>,
943}
944
945impl BalanceUpdates {
946    fn new(receiver: broadcast::Receiver<PrivateLaneEvent>) -> Self {
947        Self { receiver }
948    }
949
950    /// Wait for the next private balance update.
951    pub async fn recv(&mut self) -> Result<Balance> {
952        loop {
953            let event = recv_private_event(&mut self.receiver, "balance subscription").await?;
954            if let PrivateLaneEvent::Balance(balance) = event {
955                return Ok(balance);
956            }
957        }
958    }
959}
960
961/// Live private balance watcher with typed updates and stream lifecycle control.
962pub struct BalancesWatch<'a> {
963    updates: BalanceUpdates,
964    _lease: PrivateSubscriptionLease,
965    _marker: std::marker::PhantomData<&'a BatMarkets>,
966}
967
968impl<'a> BalancesWatch<'a> {
969    /// Wait for the next private balance update from the live watcher.
970    pub async fn recv(&mut self) -> Result<Balance> {
971        self.updates.recv().await
972    }
973
974    /// Release the shared private-stream lease.
975    pub async fn shutdown(self) -> Result<()> {
976        Ok(())
977    }
978}
979
980fn normalize_interval_box(interval: Box<str>) -> Box<str> {
981    KlineInterval::parse(interval.as_ref())
982        .map(Into::into)
983        .unwrap_or(interval)
984}
985
986fn resolve_public_spec(
987    inner: &BatMarkets,
988    instrument_id: &InstrumentId,
989    event_name: &str,
990) -> Result<bat_markets_core::InstrumentSpec> {
991    inner
992        .adapter
993        .as_adapter()
994        .resolve_instrument(instrument_id)
995        .ok_or_else(|| {
996            bat_markets_core::MarketError::new(
997                ErrorKind::Unsupported,
998                format!("unknown instrument {instrument_id} for {event_name} update"),
999            )
1000        })
1001}
1002
1003fn parse_watch_interval(raw: &str) -> Result<KlineInterval> {
1004    KlineInterval::parse(raw).ok_or_else(|| {
1005        bat_markets_core::MarketError::new(
1006            ErrorKind::Unsupported,
1007            format!("unsupported OHLCV interval '{raw}'"),
1008        )
1009    })
1010}
1011
1012/// Public market-data lane ingestion.
1013pub(crate) struct PublicLaneClient<'a> {
1014    inner: &'a BatMarkets,
1015}
1016
1017impl<'a> PublicLaneClient<'a> {
1018    /// Decode a venue public websocket payload, publish events, and update state.
1019    ///
1020    /// This is primarily for fixtures, replay tools, and custom transports. Live
1021    /// applications usually prefer `watch_*` methods.
1022    #[cfg(test)]
1023    pub fn ingest_json(&self, payload: &str) -> Result<Vec<PublicLaneEvent>> {
1024        let events = self.inner.adapter.as_adapter().parse_public(payload)?;
1025        self.inner.shared.apply_public_events(&events)?;
1026        Ok(events)
1027    }
1028
1029    /// Subscribe to fast public-lane events emitted by fixture ingest or live runtime.
1030    #[must_use]
1031    pub fn subscribe(&self) -> broadcast::Receiver<PublicLaneEvent> {
1032        self.inner.shared.subscribe_public_events()
1033    }
1034
1035    /// Subscribe to typed ticker updates already flowing through the public lane.
1036    #[must_use]
1037    pub fn subscribe_ticker(&self, request: WatchInstrumentsRequest) -> TickerUpdates<'a> {
1038        TickerUpdates::new(self.inner, self.subscribe(), request)
1039    }
1040
1041    /// Subscribe to typed trade updates already flowing through the public lane.
1042    #[must_use]
1043    pub fn subscribe_trades(&self, request: WatchInstrumentsRequest) -> TradeUpdates<'a> {
1044        TradeUpdates::new(self.inner, self.subscribe(), request)
1045    }
1046
1047    /// Subscribe to typed top-of-book updates already flowing through the public lane.
1048    #[must_use]
1049    #[cfg(test)]
1050    pub fn subscribe_book_top(&self, request: WatchInstrumentsRequest) -> BookTopUpdates<'a> {
1051        BookTopUpdates::new(self.inner, self.subscribe(), request)
1052    }
1053
1054    /// Subscribe to typed mark-price updates already flowing through the public lane.
1055    #[must_use]
1056    pub fn subscribe_mark_prices(&self, request: WatchInstrumentsRequest) -> MarkPriceUpdates<'a> {
1057        MarkPriceUpdates::new(self.inner, self.subscribe(), request)
1058    }
1059
1060    /// Subscribe to typed funding-rate updates already flowing through the public lane.
1061    #[must_use]
1062    pub fn subscribe_funding_rates(&self, request: WatchInstrumentsRequest) -> FundingRateUpdates {
1063        FundingRateUpdates::new(self.subscribe(), request)
1064    }
1065
1066    /// Subscribe to typed open-interest updates already flowing through the public lane.
1067    #[must_use]
1068    pub fn subscribe_open_interest(&self, request: WatchInstrumentsRequest) -> OpenInterestUpdates {
1069        OpenInterestUpdates::new(self.subscribe(), request)
1070    }
1071
1072    /// Subscribe to typed order-book deltas already flowing through the public lane.
1073    #[must_use]
1074    pub fn subscribe_order_book(&self, request: WatchOrderBookRequest) -> OrderBookUpdates<'a> {
1075        OrderBookUpdates::new(self.inner, self.subscribe(), request)
1076    }
1077
1078    /// Subscribe to typed liquidation updates already flowing through the public lane.
1079    #[must_use]
1080    pub fn subscribe_liquidations(
1081        &self,
1082        request: WatchInstrumentsRequest,
1083    ) -> LiquidationUpdates<'a> {
1084        LiquidationUpdates::new(self.inner, self.subscribe(), request)
1085    }
1086
1087    /// Subscribe to typed OHLCV updates already flowing through the public lane.
1088    #[must_use]
1089    pub fn subscribe_ohlcv(&self, request: WatchOhlcvRequest) -> OhlcvUpdates<'a> {
1090        OhlcvUpdates::new(self.inner, self.subscribe(), request)
1091    }
1092
1093    /// Subscribe to a compact multi-topic fast feed already flowing through the public lane.
1094    #[must_use]
1095    #[cfg(test)]
1096    pub fn subscribe_fast(&self, request: WatchFastFeedRequest) -> FastFeedUpdates {
1097        FastFeedUpdates::new(self.subscribe(), request)
1098    }
1099
1100    /// Spawn a reconnecting live public-stream runner.
1101    ///
1102    /// Prefer `watch_*` / `subscribe_fast(...)` in applications so the subscription hub can
1103    /// preserve shared subscriptions and avoid accidental duplicate sockets.
1104    /// Spawn a reconnecting live ticker watcher for one or many instruments.
1105    pub async fn watch_ticker(&self, request: WatchInstrumentsRequest) -> Result<TickerWatch<'a>> {
1106        let updates = self.subscribe_ticker(request.clone());
1107        let lease = self
1108            .inner
1109            .subscription_hubs
1110            .public
1111            .acquire(PublicSubscription {
1112                instrument_ids: request.instrument_ids,
1113                ticker: true,
1114                trades: false,
1115                book_top: false,
1116                order_book: false,
1117                mark_price: false,
1118                funding_rate: false,
1119                open_interest: false,
1120                liquidations: false,
1121                kline_intervals: Vec::new(),
1122            })
1123            .await?;
1124        Ok(TickerWatch {
1125            updates,
1126            _lease: lease,
1127        })
1128    }
1129
1130    /// Spawn a reconnecting live trade watcher for one or many instruments.
1131    pub async fn watch_trades(&self, request: WatchInstrumentsRequest) -> Result<TradesWatch<'a>> {
1132        let updates = self.subscribe_trades(request.clone());
1133        let lease = self
1134            .inner
1135            .subscription_hubs
1136            .public
1137            .acquire(PublicSubscription {
1138                instrument_ids: request.instrument_ids,
1139                ticker: false,
1140                trades: true,
1141                book_top: false,
1142                order_book: false,
1143                mark_price: false,
1144                funding_rate: false,
1145                open_interest: false,
1146                liquidations: false,
1147                kline_intervals: Vec::new(),
1148            })
1149            .await?;
1150        Ok(TradesWatch {
1151            updates,
1152            _lease: lease,
1153        })
1154    }
1155
1156    /// Spawn a reconnecting live mark-price watcher for one or many instruments.
1157    pub async fn watch_mark_prices(
1158        &self,
1159        request: WatchInstrumentsRequest,
1160    ) -> Result<MarkPriceWatch<'a>> {
1161        let updates = self.subscribe_mark_prices(request.clone());
1162        let lease = self
1163            .inner
1164            .subscription_hubs
1165            .public
1166            .acquire(PublicSubscription {
1167                instrument_ids: request.instrument_ids,
1168                ticker: false,
1169                trades: false,
1170                book_top: false,
1171                order_book: false,
1172                mark_price: true,
1173                funding_rate: false,
1174                open_interest: false,
1175                liquidations: false,
1176                kline_intervals: Vec::new(),
1177            })
1178            .await?;
1179        Ok(MarkPriceWatch {
1180            updates,
1181            _lease: lease,
1182        })
1183    }
1184
1185    /// Spawn a reconnecting live funding-rate watcher for one or many instruments.
1186    pub async fn watch_funding_rates(
1187        &self,
1188        request: WatchInstrumentsRequest,
1189    ) -> Result<FundingRateWatch<'a>> {
1190        let updates = self.subscribe_funding_rates(request.clone());
1191        let lease = self
1192            .inner
1193            .subscription_hubs
1194            .public
1195            .acquire(PublicSubscription {
1196                instrument_ids: request.instrument_ids,
1197                ticker: false,
1198                trades: false,
1199                book_top: false,
1200                order_book: false,
1201                mark_price: false,
1202                funding_rate: true,
1203                open_interest: false,
1204                liquidations: false,
1205                kline_intervals: Vec::new(),
1206            })
1207            .await?;
1208        Ok(FundingRateWatch {
1209            updates,
1210            _lease: lease,
1211            _marker: std::marker::PhantomData,
1212        })
1213    }
1214
1215    /// Spawn a reconnecting live open-interest watcher for one or many instruments.
1216    pub async fn watch_open_interest(
1217        &self,
1218        request: WatchInstrumentsRequest,
1219    ) -> Result<OpenInterestWatch<'a>> {
1220        let updates = self.subscribe_open_interest(request.clone());
1221        let lease = self
1222            .inner
1223            .subscription_hubs
1224            .public
1225            .acquire(PublicSubscription {
1226                instrument_ids: request.instrument_ids,
1227                ticker: false,
1228                trades: false,
1229                book_top: false,
1230                order_book: false,
1231                mark_price: false,
1232                funding_rate: false,
1233                open_interest: true,
1234                liquidations: false,
1235                kline_intervals: Vec::new(),
1236            })
1237            .await?;
1238        Ok(OpenInterestWatch {
1239            updates,
1240            _lease: lease,
1241            _marker: std::marker::PhantomData,
1242        })
1243    }
1244
1245    /// Spawn a reconnecting focused order-book watcher.
1246    pub async fn watch_order_book(
1247        &self,
1248        request: WatchOrderBookRequest,
1249    ) -> Result<OrderBookWatch<'a>> {
1250        let updates = self.subscribe_order_book(request.clone());
1251        let lease = self
1252            .inner
1253            .subscription_hubs
1254            .public
1255            .acquire(PublicSubscription {
1256                instrument_ids: vec![request.instrument_id],
1257                ticker: false,
1258                trades: false,
1259                book_top: false,
1260                order_book: true,
1261                mark_price: false,
1262                funding_rate: false,
1263                open_interest: false,
1264                liquidations: false,
1265                kline_intervals: Vec::new(),
1266            })
1267            .await?;
1268        Ok(OrderBookWatch {
1269            updates,
1270            _lease: lease,
1271        })
1272    }
1273
1274    /// Spawn a reconnecting liquidation watcher for one or many instruments.
1275    pub async fn watch_liquidations(
1276        &self,
1277        request: WatchInstrumentsRequest,
1278    ) -> Result<LiquidationWatch<'a>> {
1279        let updates = self.subscribe_liquidations(request.clone());
1280        let lease = self
1281            .inner
1282            .subscription_hubs
1283            .public
1284            .acquire(PublicSubscription {
1285                instrument_ids: request.instrument_ids,
1286                ticker: false,
1287                trades: false,
1288                book_top: false,
1289                order_book: false,
1290                mark_price: false,
1291                funding_rate: false,
1292                open_interest: false,
1293                liquidations: true,
1294                kline_intervals: Vec::new(),
1295            })
1296            .await?;
1297        Ok(LiquidationWatch {
1298            updates,
1299            _lease: lease,
1300        })
1301    }
1302
1303    /// Spawn a reconnecting live OHLCV watcher for one or many instruments.
1304    ///
1305    /// Intervals are accepted in canonical notation such as `1m`, `5m`, `1h`,
1306    /// `1d`, `1w`, and `1M`.
1307    ///
1308    /// ```no_run
1309    /// use bat_markets::{
1310    ///     BatMarkets,
1311    ///     errors::Result,
1312    ///     types::{InstrumentId, Product, Venue},
1313    /// };
1314    ///
1315    /// # #[tokio::main]
1316    /// # async fn main() -> Result<()> {
1317    /// let client = BatMarkets::builder()
1318    ///     .venue(Venue::Bybit)
1319    ///     .product(Product::LinearUsdt)
1320    ///     .build_live()
1321    ///     .await?;
1322    ///
1323    /// let mut watch = client
1324    ///     .watch_ohlcv_for_symbols(
1325    ///         vec![
1326    ///             InstrumentId::from("BTC/USDT:USDT"),
1327    ///             InstrumentId::from("ETH/USDT:USDT"),
1328    ///         ],
1329    ///         "1m",
1330    ///     )
1331    ///     .await?;
1332    ///
1333    /// let candle = watch.recv().await?;
1334    /// println!("{} {}", candle.instrument_id, candle.close);
1335    /// watch.shutdown().await?;
1336    /// # Ok(())
1337    /// # }
1338    /// ```
1339    pub async fn watch_ohlcv(&self, request: WatchOhlcvRequest) -> Result<OhlcvWatch<'a>> {
1340        let updates = self.subscribe_ohlcv(request.clone());
1341        let lease = self
1342            .inner
1343            .subscription_hubs
1344            .public
1345            .acquire(request.into_public_subscription())
1346            .await?;
1347        Ok(OhlcvWatch {
1348            updates,
1349            _lease: lease,
1350        })
1351    }
1352
1353    /// Alias with plural naming for consistency with multi-symbol streams.
1354    pub async fn watch_tickers(&self, request: WatchInstrumentsRequest) -> Result<TickerWatch<'a>> {
1355        self.watch_ticker(request).await
1356    }
1357}
1358
1359/// Private state-lane ingestion.
1360pub(crate) struct PrivateLaneClient<'a> {
1361    inner: &'a BatMarkets,
1362}
1363
1364impl<'a> PrivateLaneClient<'a> {
1365    /// Decode a venue private websocket payload, publish events, and update state.
1366    ///
1367    /// This is primarily for fixtures, replay tools, and custom transports. Live
1368    /// applications usually prefer `watch_*` methods.
1369    #[cfg(test)]
1370    pub fn ingest_json(&self, payload: &str) -> Result<Vec<PrivateLaneEvent>> {
1371        let events = self.inner.adapter.as_adapter().parse_private(payload)?;
1372        self.inner.shared.apply_private_events(&events);
1373        Ok(events)
1374    }
1375
1376    /// Subscribe to fast private-lane events emitted by fixture ingest or live runtime.
1377    #[must_use]
1378    pub fn subscribe(&self) -> broadcast::Receiver<PrivateLaneEvent> {
1379        self.inner.shared.subscribe_private_events()
1380    }
1381
1382    /// Subscribe to typed order updates already flowing through the private lane.
1383    #[must_use]
1384    pub fn subscribe_orders(&self) -> OrderUpdates {
1385        OrderUpdates::new(self.subscribe())
1386    }
1387
1388    /// Subscribe to typed execution updates already flowing through the private lane.
1389    #[must_use]
1390    pub fn subscribe_executions(&self) -> ExecutionUpdates {
1391        ExecutionUpdates::new(self.subscribe())
1392    }
1393
1394    /// Subscribe to typed position updates already flowing through the private lane.
1395    #[must_use]
1396    pub fn subscribe_positions(&self) -> PositionUpdates {
1397        PositionUpdates::new(self.subscribe())
1398    }
1399
1400    /// Subscribe to typed balance updates already flowing through the private lane.
1401    #[must_use]
1402    pub fn subscribe_balances(&self) -> BalanceUpdates {
1403        BalanceUpdates::new(self.subscribe())
1404    }
1405
1406    /// Spawn a reconnecting live private-stream runner.
1407    ///
1408    /// Prefer `watch_*` in applications so the private subscription hub preserves one shared
1409    /// account stream.
1410    /// Acquire the shared private stream and receive typed order updates.
1411    pub async fn watch_orders(&self) -> Result<OrdersWatch<'a>> {
1412        let updates = self.subscribe_orders();
1413        let lease = self.inner.subscription_hubs.private.acquire().await?;
1414        Ok(OrdersWatch {
1415            updates,
1416            _lease: lease,
1417            _marker: std::marker::PhantomData,
1418        })
1419    }
1420
1421    /// Acquire the shared private stream and receive typed execution updates.
1422    pub async fn watch_executions(&self) -> Result<ExecutionsWatch<'a>> {
1423        let updates = self.subscribe_executions();
1424        let lease = self.inner.subscription_hubs.private.acquire().await?;
1425        Ok(ExecutionsWatch {
1426            updates,
1427            _lease: lease,
1428            _marker: std::marker::PhantomData,
1429        })
1430    }
1431
1432    /// Acquire the shared private stream and receive typed position updates.
1433    pub async fn watch_positions(&self) -> Result<PositionsWatch<'a>> {
1434        let updates = self.subscribe_positions();
1435        let lease = self.inner.subscription_hubs.private.acquire().await?;
1436        Ok(PositionsWatch {
1437            updates,
1438            _lease: lease,
1439            _marker: std::marker::PhantomData,
1440        })
1441    }
1442
1443    /// Acquire the shared private stream and receive typed balance updates.
1444    pub async fn watch_balances(&self) -> Result<BalancesWatch<'a>> {
1445        let updates = self.subscribe_balances();
1446        let lease = self.inner.subscription_hubs.private.acquire().await?;
1447        Ok(BalancesWatch {
1448            updates,
1449            _lease: lease,
1450            _marker: std::marker::PhantomData,
1451        })
1452    }
1453}
1454
1455#[cfg(test)]
1456mod tests {
1457    use tokio::sync::broadcast;
1458    use tokio::time::{Duration, timeout};
1459
1460    use bat_markets_core::{InstrumentId, Product, PublicLaneEvent, Venue};
1461
1462    use crate::BatMarketsBuilder;
1463    use bat_markets_core::{WatchFastFeedRequest, WatchOrderBookRequest};
1464
1465    use super::{
1466        InstrumentFilter, WatchInstrumentsRequest, WatchOhlcvRequest, private_lane, public_lane,
1467        recv_public_event,
1468    };
1469
1470    macro_rules! fixture {
1471        ($venue:literal, $file:literal) => {
1472            include_str!(concat!(
1473                env!("CARGO_MANIFEST_DIR"),
1474                "/../../fixtures/",
1475                $venue,
1476                "/",
1477                $file
1478            ))
1479        };
1480    }
1481
1482    const BINANCE_PUBLIC_TRADE: &str = fixture!("binance", "public_trade.json");
1483    const BINANCE_PUBLIC_TICKER: &str = fixture!("binance", "public_ticker.json");
1484    const BINANCE_PUBLIC_BOOK_TICKER: &str = fixture!("binance", "public_book_ticker.json");
1485    const BINANCE_PUBLIC_KLINE: &str = fixture!("binance", "public_kline.json");
1486    const BINANCE_PUBLIC_MARK_PRICE: &str = fixture!("binance", "public_mark_price.json");
1487    const BINANCE_PUBLIC_LIQUIDATION: &str = fixture!("binance", "public_liquidation.json");
1488    const BYBIT_PUBLIC_ORDERBOOK: &str = fixture!("bybit", "public_orderbook.json");
1489    const BYBIT_PRIVATE_ORDER: &str = fixture!("bybit", "private_order.json");
1490    const BYBIT_PRIVATE_EXECUTION: &str = fixture!("bybit", "private_execution.json");
1491    const BYBIT_PRIVATE_WALLET: &str = fixture!("bybit", "private_wallet.json");
1492
1493    #[test]
1494    fn instrument_filter_uses_single_fast_path_and_many_membership() {
1495        let btc = InstrumentId::from("BTC/USDT:USDT");
1496        let eth = InstrumentId::from("ETH/USDT:USDT");
1497        let sol = InstrumentId::from("SOL/USDT:USDT");
1498
1499        let single = InstrumentFilter::from_vec(vec![btc.clone()]);
1500        assert!(matches!(single, InstrumentFilter::Single(_)));
1501        assert!(single.contains(&btc));
1502        assert!(!single.contains(&eth));
1503
1504        let many = InstrumentFilter::from_vec(vec![btc.clone(), eth.clone()]);
1505        assert!(matches!(many, InstrumentFilter::Many(_)));
1506        assert!(many.contains(&btc));
1507        assert!(many.contains(&eth));
1508        assert!(!many.contains(&sol));
1509    }
1510
1511    #[tokio::test]
1512    async fn public_subscribe_receives_fixture_ingest_events() {
1513        let client = BatMarketsBuilder::default()
1514            .venue(Venue::Binance)
1515            .product(Product::LinearUsdt)
1516            .build()
1517            .expect("fixture client should build");
1518        let mut receiver = public_lane(&client).subscribe();
1519
1520        let events = public_lane(&client)
1521            .ingest_json(BINANCE_PUBLIC_TRADE)
1522            .expect("fixture payload should parse");
1523        assert!(!events.is_empty());
1524
1525        let received = timeout(Duration::from_secs(1), receiver.recv())
1526            .await
1527            .expect("public event should arrive")
1528            .expect("receiver should stay open");
1529
1530        assert!(matches!(received, PublicLaneEvent::Trade(_)));
1531    }
1532
1533    #[tokio::test]
1534    async fn subscribe_ticker_receives_typed_updates() {
1535        let client = BatMarketsBuilder::default()
1536            .venue(Venue::Binance)
1537            .product(Product::LinearUsdt)
1538            .build()
1539            .expect("fixture client should build");
1540        let mut updates = public_lane(&client).subscribe_ticker(
1541            WatchInstrumentsRequest::for_instrument(InstrumentId::from("BTC/USDT:USDT")),
1542        );
1543
1544        public_lane(&client)
1545            .ingest_json(BINANCE_PUBLIC_TICKER)
1546            .expect("fixture ticker should parse");
1547
1548        let received = timeout(Duration::from_secs(1), updates.recv())
1549            .await
1550            .expect("typed ticker should arrive")
1551            .expect("typed ticker should parse");
1552
1553        assert_eq!(received.instrument_id, InstrumentId::from("BTC/USDT:USDT"));
1554        assert_eq!(received.last_price.to_string(), "70100.50");
1555    }
1556
1557    #[tokio::test]
1558    async fn subscribe_trades_receives_typed_updates() {
1559        let client = BatMarketsBuilder::default()
1560            .venue(Venue::Binance)
1561            .product(Product::LinearUsdt)
1562            .build()
1563            .expect("fixture client should build");
1564        let mut updates = public_lane(&client).subscribe_trades(
1565            WatchInstrumentsRequest::for_instrument(InstrumentId::from("BTC/USDT:USDT")),
1566        );
1567
1568        public_lane(&client)
1569            .ingest_json(BINANCE_PUBLIC_TRADE)
1570            .expect("fixture trade should parse");
1571
1572        let received = timeout(Duration::from_secs(1), updates.recv())
1573            .await
1574            .expect("typed trade should arrive")
1575            .expect("typed trade should parse");
1576
1577        assert_eq!(received.instrument_id, InstrumentId::from("BTC/USDT:USDT"));
1578        assert!(!received.trade_id.as_ref().is_empty());
1579    }
1580
1581    #[tokio::test]
1582    async fn recv_public_event_skips_lagged_updates() {
1583        let client = BatMarketsBuilder::default()
1584            .venue(Venue::Binance)
1585            .product(Product::LinearUsdt)
1586            .build()
1587            .expect("fixture client should build");
1588        let sample_event = public_lane(&client)
1589            .ingest_json(BINANCE_PUBLIC_TICKER)
1590            .expect("fixture ticker should parse")
1591            .into_iter()
1592            .next()
1593            .expect("ticker fixture should emit one event");
1594
1595        let (tx, mut receiver) = broadcast::channel(2);
1596        tx.send(sample_event.clone())
1597            .expect("first event should enter channel");
1598        tx.send(sample_event.clone())
1599            .expect("second event should enter channel");
1600        tx.send(sample_event)
1601            .expect("third event should enter channel and force lag");
1602
1603        let received = timeout(
1604            Duration::from_secs(1),
1605            recv_public_event(&mut receiver, "public test subscription"),
1606        )
1607        .await
1608        .expect("lagged public receive should still complete")
1609        .expect("lagged public receive should retry and succeed");
1610
1611        assert!(matches!(received, PublicLaneEvent::Ticker(_)));
1612    }
1613
1614    #[tokio::test]
1615    async fn subscribe_book_top_receives_typed_updates() {
1616        let client = BatMarketsBuilder::default()
1617            .venue(Venue::Binance)
1618            .product(Product::LinearUsdt)
1619            .build()
1620            .expect("fixture client should build");
1621        let mut updates = public_lane(&client).subscribe_book_top(
1622            WatchInstrumentsRequest::for_instrument(InstrumentId::from("BTC/USDT:USDT")),
1623        );
1624
1625        public_lane(&client)
1626            .ingest_json(BINANCE_PUBLIC_BOOK_TICKER)
1627            .expect("fixture book ticker should parse");
1628
1629        let received = timeout(Duration::from_secs(1), updates.recv())
1630            .await
1631            .expect("typed book top should arrive")
1632            .expect("typed book top should parse");
1633
1634        assert_eq!(received.instrument_id, InstrumentId::from("BTC/USDT:USDT"));
1635        assert_eq!(received.bid.price.to_string(), "70100.90");
1636        assert_eq!(received.ask.price.to_string(), "70101.10");
1637    }
1638
1639    #[tokio::test]
1640    async fn subscribe_ohlcv_receives_typed_kline_updates() {
1641        let client = BatMarketsBuilder::default()
1642            .venue(Venue::Binance)
1643            .product(Product::LinearUsdt)
1644            .build()
1645            .expect("fixture client should build");
1646        let mut updates = public_lane(&client).subscribe_ohlcv(WatchOhlcvRequest::for_instrument(
1647            InstrumentId::from("BTC/USDT:USDT"),
1648            "1m",
1649        ));
1650
1651        public_lane(&client)
1652            .ingest_json(BINANCE_PUBLIC_KLINE)
1653            .expect("fixture kline should parse");
1654
1655        let received = timeout(Duration::from_secs(1), updates.recv())
1656            .await
1657            .expect("typed kline should arrive")
1658            .expect("typed kline should parse");
1659
1660        assert_eq!(received.instrument_id, InstrumentId::from("BTC/USDT:USDT"));
1661        assert_eq!(received.interval.as_ref(), "1m");
1662    }
1663
1664    #[tokio::test]
1665    async fn subscribe_ohlcv_filters_symbols_before_yielding() {
1666        let client = BatMarketsBuilder::default()
1667            .venue(Venue::Binance)
1668            .product(Product::LinearUsdt)
1669            .build()
1670            .expect("fixture client should build");
1671        let mut updates = public_lane(&client).subscribe_ohlcv(WatchOhlcvRequest::for_instrument(
1672            InstrumentId::from("BTC/USDT:USDT"),
1673            "1m",
1674        ));
1675
1676        public_lane(&client)
1677            .ingest_json(
1678                r#"{
1679                    "e":"kline",
1680                    "E":1710000002000,
1681                    "s":"ETHUSDT",
1682                    "k":{
1683                        "i":"1m",
1684                        "t":1710000000000,
1685                        "T":1710000059999,
1686                        "o":"3200.00",
1687                        "h":"3210.00",
1688                        "l":"3195.00",
1689                        "c":"3205.00",
1690                        "v":"42.0",
1691                        "x":false
1692                    }
1693                }"#,
1694            )
1695            .expect("eth kline should parse");
1696        public_lane(&client)
1697            .ingest_json(BINANCE_PUBLIC_KLINE)
1698            .expect("btc kline should parse");
1699
1700        let received = timeout(Duration::from_secs(1), updates.recv())
1701            .await
1702            .expect("filtered btc kline should arrive")
1703            .expect("filtered btc kline should parse");
1704
1705        assert_eq!(received.instrument_id, InstrumentId::from("BTC/USDT:USDT"));
1706    }
1707
1708    #[tokio::test]
1709    async fn subscribe_mark_prices_receives_typed_updates() {
1710        let client = BatMarketsBuilder::default()
1711            .venue(Venue::Binance)
1712            .product(Product::LinearUsdt)
1713            .build()
1714            .expect("fixture client should build");
1715        let mut updates = public_lane(&client).subscribe_mark_prices(
1716            WatchInstrumentsRequest::for_instrument(InstrumentId::from("BTC/USDT:USDT")),
1717        );
1718
1719        public_lane(&client)
1720            .ingest_json(BINANCE_PUBLIC_MARK_PRICE)
1721            .expect("fixture mark price should parse");
1722
1723        let received = timeout(Duration::from_secs(1), updates.recv())
1724            .await
1725            .expect("typed mark price should arrive")
1726            .expect("typed mark price should parse");
1727
1728        assert_eq!(received.instrument_id, InstrumentId::from("BTC/USDT:USDT"));
1729    }
1730
1731    #[tokio::test]
1732    async fn subscribe_fast_filters_multi_topic_updates() {
1733        let client = BatMarketsBuilder::default()
1734            .venue(Venue::Binance)
1735            .product(Product::LinearUsdt)
1736            .build()
1737            .expect("fixture client should build");
1738        let mut updates = public_lane(&client).subscribe_fast(WatchFastFeedRequest {
1739            instrument_ids: vec![InstrumentId::from("BTC/USDT:USDT")],
1740            ticker: false,
1741            trades: false,
1742            book_top: false,
1743            mark_price: true,
1744            funding_rate: false,
1745            open_interest: false,
1746            liquidations: false,
1747        });
1748
1749        public_lane(&client)
1750            .ingest_json(BINANCE_PUBLIC_TICKER)
1751            .expect("fixture ticker should parse");
1752        public_lane(&client)
1753            .ingest_json(BINANCE_PUBLIC_MARK_PRICE)
1754            .expect("fixture mark price should parse");
1755
1756        let received = timeout(Duration::from_secs(1), updates.recv())
1757            .await
1758            .expect("fast feed mark price should arrive")
1759            .expect("fast feed event should parse");
1760
1761        assert!(matches!(received, PublicLaneEvent::MarkPrice(_)));
1762    }
1763
1764    #[tokio::test]
1765    async fn subscribe_order_book_receives_typed_updates() {
1766        let client = BatMarketsBuilder::default()
1767            .venue(Venue::Bybit)
1768            .product(Product::LinearUsdt)
1769            .build()
1770            .expect("fixture client should build");
1771        let mut updates = public_lane(&client).subscribe_order_book(WatchOrderBookRequest::new(
1772            InstrumentId::from("BTC/USDT:USDT"),
1773            Some(50),
1774        ));
1775
1776        public_lane(&client)
1777            .ingest_json(BYBIT_PUBLIC_ORDERBOOK)
1778            .expect("fixture orderbook should parse");
1779
1780        let received = timeout(Duration::from_secs(1), updates.recv())
1781            .await
1782            .expect("typed order book should arrive")
1783            .expect("typed order book should parse");
1784
1785        assert_eq!(received.instrument_id, InstrumentId::from("BTC/USDT:USDT"));
1786        assert!(!received.bids.is_empty());
1787        assert!(!received.asks.is_empty());
1788    }
1789
1790    #[tokio::test]
1791    async fn subscribe_liquidations_receives_typed_updates() {
1792        let client = BatMarketsBuilder::default()
1793            .venue(Venue::Binance)
1794            .product(Product::LinearUsdt)
1795            .build()
1796            .expect("fixture client should build");
1797        let mut updates = public_lane(&client).subscribe_liquidations(
1798            WatchInstrumentsRequest::for_instrument(InstrumentId::from("BTC/USDT:USDT")),
1799        );
1800
1801        public_lane(&client)
1802            .ingest_json(BINANCE_PUBLIC_LIQUIDATION)
1803            .expect("fixture liquidation should parse");
1804
1805        let received = timeout(Duration::from_secs(1), updates.recv())
1806            .await
1807            .expect("typed liquidation should arrive")
1808            .expect("typed liquidation should parse");
1809
1810        assert_eq!(received.instrument_id, InstrumentId::from("BTC/USDT:USDT"));
1811        assert_eq!(received.side, bat_markets_core::Side::Sell);
1812        assert_eq!(received.quantity.to_string(), "0.014");
1813    }
1814
1815    #[tokio::test]
1816    async fn private_subscribe_order_execution_and_balance_receive_fixture_updates() {
1817        let client = BatMarketsBuilder::default()
1818            .venue(Venue::Bybit)
1819            .product(Product::LinearUsdt)
1820            .build()
1821            .expect("fixture client should build");
1822        let mut orders = private_lane(&client).subscribe_orders();
1823        let mut executions = private_lane(&client).subscribe_executions();
1824        let mut balances = private_lane(&client).subscribe_balances();
1825
1826        private_lane(&client)
1827            .ingest_json(BYBIT_PRIVATE_ORDER)
1828            .expect("fixture private order should parse");
1829        private_lane(&client)
1830            .ingest_json(BYBIT_PRIVATE_EXECUTION)
1831            .expect("fixture private execution should parse");
1832        private_lane(&client)
1833            .ingest_json(BYBIT_PRIVATE_WALLET)
1834            .expect("fixture private wallet should parse");
1835
1836        let order = timeout(Duration::from_secs(1), orders.recv())
1837            .await
1838            .expect("order update should arrive")
1839            .expect("order update should parse");
1840        let execution = timeout(Duration::from_secs(1), executions.recv())
1841            .await
1842            .expect("execution update should arrive")
1843            .expect("execution update should parse");
1844        let balance = timeout(Duration::from_secs(1), balances.recv())
1845            .await
1846            .expect("balance update should arrive")
1847            .expect("balance update should parse");
1848
1849        assert_eq!(order.instrument_id, InstrumentId::from("BTC/USDT:USDT"));
1850        assert_eq!(execution.instrument_id, InstrumentId::from("BTC/USDT:USDT"));
1851        assert_eq!(balance.asset.to_string(), "USDT");
1852    }
1853}