1use std::collections::HashSet;
2
3use tokio::{
4 sync::{broadcast, oneshot},
5 task::JoinHandle,
6};
7
8use bat_markets_core::{
9 Balance, ErrorKind, Execution, FundingRate, InstrumentId, Kline, KlineInterval, Liquidation,
10 MarkPrice, OpenInterest, Order, OrderBookDelta, Position, PrivateLaneEvent, PublicLaneEvent,
11 Result, Ticker, TradeTick, WatchOrderBookRequest,
12};
13#[cfg(test)]
14use bat_markets_core::{BookTop, WatchFastFeedRequest};
15
16use crate::{
17 client::BatMarkets,
18 subscriptions::{PrivateSubscriptionLease, PublicSubscriptionLease},
19};
20
/// Wraps a borrowed [`BatMarkets`] client in a [`PublicLaneClient`] view.
///
/// The returned view holds the same borrow as `inner`; no other state is created.
pub(crate) const fn public_lane(inner: &BatMarkets) -> PublicLaneClient<'_> {
    PublicLaneClient { inner }
}
24
/// Wraps a borrowed [`BatMarkets`] client in a [`PrivateLaneClient`] view.
///
/// The returned view holds the same borrow as `inner`; no other state is created.
pub(crate) const fn private_lane(inner: &BatMarkets) -> PrivateLaneClient<'_> {
    PrivateLaneClient { inner }
}
28
/// Description of the public streams requested for a set of instruments.
///
/// This is the value handed to the public subscription hub's `acquire` by the
/// `watch_*` methods in this file; each of them turns on exactly one flag (or
/// supplies one kline interval) and leaves the rest off.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct PublicSubscription {
    /// Instruments the subscription applies to.
    pub instrument_ids: Vec<InstrumentId>,
    /// Set by `watch_ticker`.
    pub ticker: bool,
    /// Set by `watch_trades`.
    pub trades: bool,
    /// Never set by the `watch_*` methods visible in this file.
    pub book_top: bool,
    /// Set by `watch_order_book`.
    pub order_book: bool,
    /// Set by `watch_mark_prices`.
    pub mark_price: bool,
    /// Set by `watch_funding_rates`.
    pub funding_rate: bool,
    /// Set by `watch_open_interest`.
    pub open_interest: bool,
    /// Set by `watch_liquidations`.
    pub liquidations: bool,
    /// Kline intervals to stream; `watch_ohlcv` supplies a single entry.
    pub kline_intervals: Vec<Box<str>>,
}
53
/// Identifies a route for public streams.
///
/// The derived `PartialOrd`/`Ord` follow declaration order, so reordering the
/// variants changes comparisons.
/// NOTE(review): nothing in this part of the file uses the type; the exact
/// meaning of each route should be confirmed at its call sites.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub(crate) enum PublicStreamRoute {
    Default,
    BinanceMarket,
    BinancePublic,
}
64
65#[derive(Clone, Debug, PartialEq, Eq)]
67pub(crate) struct WatchInstrumentsRequest {
68 pub instrument_ids: Vec<InstrumentId>,
70}
71
72impl WatchInstrumentsRequest {
73 #[must_use]
75 pub fn for_instrument(instrument_id: InstrumentId) -> Self {
76 Self {
77 instrument_ids: vec![instrument_id],
78 }
79 }
80
81 #[must_use]
83 pub fn for_instruments(instrument_ids: Vec<InstrumentId>) -> Self {
84 Self { instrument_ids }
85 }
86}
87
88#[derive(Clone, Debug, PartialEq, Eq)]
90pub(crate) struct WatchOhlcvRequest {
91 pub instrument_ids: Vec<InstrumentId>,
93 pub interval: Box<str>,
95}
96
97impl WatchOhlcvRequest {
98 #[must_use]
100 pub fn for_instrument(instrument_id: InstrumentId, interval: impl Into<Box<str>>) -> Self {
101 Self {
102 instrument_ids: vec![instrument_id],
103 interval: normalize_interval_box(interval.into()),
104 }
105 }
106
107 #[must_use]
109 pub fn for_instruments(
110 instrument_ids: Vec<InstrumentId>,
111 interval: impl Into<Box<str>>,
112 ) -> Self {
113 Self {
114 instrument_ids,
115 interval: normalize_interval_box(interval.into()),
116 }
117 }
118
119 fn into_public_subscription(self) -> PublicSubscription {
120 PublicSubscription {
121 instrument_ids: self.instrument_ids,
122 ticker: false,
123 trades: false,
124 book_top: false,
125 order_book: false,
126 mark_price: false,
127 funding_rate: false,
128 open_interest: false,
129 liquidations: false,
130 kline_intervals: vec![self.interval],
131 }
132 }
133}
134
135#[derive(Clone, Debug)]
136enum InstrumentFilter {
137 Single(InstrumentId),
138 Many(HashSet<InstrumentId>),
139}
140
141impl InstrumentFilter {
142 fn from_vec(instrument_ids: Vec<InstrumentId>) -> Self {
143 match instrument_ids.as_slice() {
144 [instrument_id] => Self::Single(instrument_id.clone()),
145 _ => Self::Many(instrument_ids.into_iter().collect()),
146 }
147 }
148
149 #[cfg(test)]
150 fn from_slice(instrument_ids: &[InstrumentId]) -> Self {
151 match instrument_ids {
152 [instrument_id] => Self::Single(instrument_id.clone()),
153 _ => Self::Many(instrument_ids.iter().cloned().collect()),
154 }
155 }
156
157 fn contains(&self, instrument_id: &InstrumentId) -> bool {
158 match self {
159 Self::Single(candidate) => candidate == instrument_id,
160 Self::Many(candidates) => candidates.contains(instrument_id),
161 }
162 }
163}
164
165pub(crate) struct LiveStreamHandle {
167 pub(crate) _shutdown: oneshot::Sender<()>,
168 pub(crate) join: JoinHandle<Result<()>>,
169}
170
171impl LiveStreamHandle {
172 pub fn abort(&self) {
174 self.join.abort();
175 }
176}
177
178async fn recv_public_event(
179 receiver: &mut broadcast::Receiver<PublicLaneEvent>,
180 label: &str,
181) -> Result<PublicLaneEvent> {
182 loop {
183 match receiver.recv().await {
184 Ok(event) => return Ok(event),
185 Err(broadcast::error::RecvError::Lagged(_)) => continue,
186 Err(error) => {
187 return Err(bat_markets_core::MarketError::new(
188 ErrorKind::TransportError,
189 format!("{label} receive failed: {error}"),
190 ));
191 }
192 }
193 }
194}
195
196async fn recv_private_event(
197 receiver: &mut broadcast::Receiver<PrivateLaneEvent>,
198 label: &str,
199) -> Result<PrivateLaneEvent> {
200 loop {
201 match receiver.recv().await {
202 Ok(event) => return Ok(event),
203 Err(broadcast::error::RecvError::Lagged(_)) => continue,
204 Err(error) => {
205 return Err(bat_markets_core::MarketError::new(
206 ErrorKind::TransportError,
207 format!("{label} receive failed: {error}"),
208 ));
209 }
210 }
211 }
212}
213
214pub(crate) struct OhlcvUpdates<'a> {
216 inner: &'a BatMarkets,
217 receiver: broadcast::Receiver<PublicLaneEvent>,
218 instrument_ids: InstrumentFilter,
219 interval: Box<str>,
220}
221
222impl<'a> OhlcvUpdates<'a> {
223 fn new(
224 inner: &'a BatMarkets,
225 receiver: broadcast::Receiver<PublicLaneEvent>,
226 request: WatchOhlcvRequest,
227 ) -> Self {
228 Self {
229 inner,
230 receiver,
231 instrument_ids: InstrumentFilter::from_vec(request.instrument_ids),
232 interval: request.interval,
233 }
234 }
235
236 pub async fn recv(&mut self) -> Result<Kline> {
238 let requested_interval = parse_watch_interval(self.interval.as_ref())?;
239 loop {
240 let event = recv_public_event(&mut self.receiver, "ohlcv subscription").await?;
241
242 let PublicLaneEvent::Kline(kline) = event else {
243 continue;
244 };
245 let Some(incoming_interval) = KlineInterval::parse(kline.interval.as_ref()) else {
246 continue;
247 };
248 if !self.instrument_ids.contains(&kline.instrument_id)
249 || incoming_interval != requested_interval
250 {
251 continue;
252 }
253
254 let spec = self
255 .inner
256 .adapter
257 .as_adapter()
258 .resolve_instrument(&kline.instrument_id)
259 .ok_or_else(|| {
260 bat_markets_core::MarketError::new(
261 ErrorKind::Unsupported,
262 format!(
263 "unknown instrument {} for kline update",
264 kline.instrument_id
265 ),
266 )
267 })?;
268 let mut unified = kline.to_unified(&spec);
269 unified.interval = Box::<str>::from(requested_interval);
270 return Ok(unified);
271 }
272 }
273}
274
275pub struct OhlcvWatch<'a> {
277 updates: OhlcvUpdates<'a>,
278 _lease: PublicSubscriptionLease,
279}
280
281impl<'a> OhlcvWatch<'a> {
282 pub async fn recv(&mut self) -> Result<Kline> {
284 self.updates.recv().await
285 }
286
287 pub async fn shutdown(self) -> Result<()> {
289 Ok(())
290 }
291
292 pub fn abort(&self) {}
297
298 pub async fn wait(self) -> Result<()> {
300 Ok(())
301 }
302}
303
304pub(crate) struct TickerUpdates<'a> {
306 inner: &'a BatMarkets,
307 receiver: broadcast::Receiver<PublicLaneEvent>,
308 instrument_ids: InstrumentFilter,
309}
310
311impl<'a> TickerUpdates<'a> {
312 fn new(
313 inner: &'a BatMarkets,
314 receiver: broadcast::Receiver<PublicLaneEvent>,
315 request: WatchInstrumentsRequest,
316 ) -> Self {
317 Self {
318 inner,
319 receiver,
320 instrument_ids: InstrumentFilter::from_vec(request.instrument_ids),
321 }
322 }
323
324 pub async fn recv(&mut self) -> Result<Ticker> {
326 loop {
327 let event = recv_public_event(&mut self.receiver, "ticker subscription").await?;
328
329 let PublicLaneEvent::Ticker(ticker) = event else {
330 continue;
331 };
332 if !self.instrument_ids.contains(&ticker.instrument_id) {
333 continue;
334 }
335
336 let spec = resolve_public_spec(self.inner, &ticker.instrument_id, "ticker")?;
337 return Ok(ticker.to_unified(&spec));
338 }
339 }
340}
341
342pub struct TickerWatch<'a> {
344 updates: TickerUpdates<'a>,
345 _lease: PublicSubscriptionLease,
346}
347
348impl<'a> TickerWatch<'a> {
349 pub async fn recv(&mut self) -> Result<Ticker> {
351 self.updates.recv().await
352 }
353
354 pub async fn shutdown(self) -> Result<()> {
356 Ok(())
357 }
358
359 pub fn abort(&self) {}
361
362 pub async fn wait(self) -> Result<()> {
364 Ok(())
365 }
366}
367
368pub(crate) struct TradeUpdates<'a> {
370 inner: &'a BatMarkets,
371 receiver: broadcast::Receiver<PublicLaneEvent>,
372 instrument_ids: InstrumentFilter,
373}
374
375impl<'a> TradeUpdates<'a> {
376 fn new(
377 inner: &'a BatMarkets,
378 receiver: broadcast::Receiver<PublicLaneEvent>,
379 request: WatchInstrumentsRequest,
380 ) -> Self {
381 Self {
382 inner,
383 receiver,
384 instrument_ids: InstrumentFilter::from_vec(request.instrument_ids),
385 }
386 }
387
388 pub async fn recv(&mut self) -> Result<TradeTick> {
390 loop {
391 let event = recv_public_event(&mut self.receiver, "trade subscription").await?;
392
393 let PublicLaneEvent::Trade(trade) = event else {
394 continue;
395 };
396 if !self.instrument_ids.contains(&trade.instrument_id) {
397 continue;
398 }
399
400 let spec = resolve_public_spec(self.inner, &trade.instrument_id, "trade")?;
401 return Ok(trade.to_unified(&spec));
402 }
403 }
404}
405
406pub struct TradesWatch<'a> {
408 updates: TradeUpdates<'a>,
409 _lease: PublicSubscriptionLease,
410}
411
412impl<'a> TradesWatch<'a> {
413 pub async fn recv(&mut self) -> Result<TradeTick> {
415 self.updates.recv().await
416 }
417
418 pub async fn shutdown(self) -> Result<()> {
420 Ok(())
421 }
422
423 pub fn abort(&self) {}
425
426 pub async fn wait(self) -> Result<()> {
428 Ok(())
429 }
430}
431
/// Test-only filtered stream of top-of-book updates for a set of instruments.
#[cfg(test)]
pub(crate) struct BookTopUpdates<'a> {
    inner: &'a BatMarkets,
    receiver: broadcast::Receiver<PublicLaneEvent>,
    instrument_ids: InstrumentFilter,
}

#[cfg(test)]
impl<'a> BookTopUpdates<'a> {
    /// Binds `receiver` to the instruments named in `request`.
    fn new(
        inner: &'a BatMarkets,
        receiver: broadcast::Receiver<PublicLaneEvent>,
        request: WatchInstrumentsRequest,
    ) -> Self {
        let instrument_ids = InstrumentFilter::from_vec(request.instrument_ids);
        Self {
            inner,
            receiver,
            instrument_ids,
        }
    }

    /// Waits for the next top-of-book update belonging to a requested instrument.
    ///
    /// # Errors
    ///
    /// Fails when the channel receive fails or the instrument cannot be resolved.
    pub async fn recv(&mut self) -> Result<BookTop> {
        loop {
            match recv_public_event(&mut self.receiver, "book_top subscription").await? {
                PublicLaneEvent::BookTop(book_top)
                    if self.instrument_ids.contains(&book_top.instrument_id) =>
                {
                    let spec =
                        resolve_public_spec(self.inner, &book_top.instrument_id, "book_top")?;
                    return Ok(book_top.to_unified(&spec));
                }
                // Other event kinds and other instruments are skipped.
                _ => {}
            }
        }
    }
}
471
472pub(crate) struct MarkPriceUpdates<'a> {
474 inner: &'a BatMarkets,
475 receiver: broadcast::Receiver<PublicLaneEvent>,
476 instrument_ids: InstrumentFilter,
477}
478
479impl<'a> MarkPriceUpdates<'a> {
480 fn new(
481 inner: &'a BatMarkets,
482 receiver: broadcast::Receiver<PublicLaneEvent>,
483 request: WatchInstrumentsRequest,
484 ) -> Self {
485 Self {
486 inner,
487 receiver,
488 instrument_ids: InstrumentFilter::from_vec(request.instrument_ids),
489 }
490 }
491
492 pub async fn recv(&mut self) -> Result<MarkPrice> {
494 loop {
495 let event = recv_public_event(&mut self.receiver, "mark_price subscription").await?;
496
497 let PublicLaneEvent::MarkPrice(mark_price) = event else {
498 continue;
499 };
500 if !self.instrument_ids.contains(&mark_price.instrument_id) {
501 continue;
502 }
503
504 let spec = resolve_public_spec(self.inner, &mark_price.instrument_id, "mark_price")?;
505 return Ok(mark_price.to_unified(&spec));
506 }
507 }
508}
509
510pub struct MarkPriceWatch<'a> {
512 updates: MarkPriceUpdates<'a>,
513 _lease: PublicSubscriptionLease,
514}
515
516impl<'a> MarkPriceWatch<'a> {
517 pub async fn recv(&mut self) -> Result<MarkPrice> {
519 self.updates.recv().await
520 }
521
522 pub async fn shutdown(self) -> Result<()> {
524 Ok(())
525 }
526}
527
528pub(crate) struct FundingRateUpdates {
530 receiver: broadcast::Receiver<PublicLaneEvent>,
531 instrument_ids: InstrumentFilter,
532}
533
534impl FundingRateUpdates {
535 fn new(
536 receiver: broadcast::Receiver<PublicLaneEvent>,
537 request: WatchInstrumentsRequest,
538 ) -> Self {
539 Self {
540 receiver,
541 instrument_ids: InstrumentFilter::from_vec(request.instrument_ids),
542 }
543 }
544
545 pub async fn recv(&mut self) -> Result<FundingRate> {
547 loop {
548 let event = recv_public_event(&mut self.receiver, "funding_rate subscription").await?;
549
550 let PublicLaneEvent::FundingRate(funding_rate) = event else {
551 continue;
552 };
553 if !self.instrument_ids.contains(&funding_rate.instrument_id) {
554 continue;
555 }
556 return Ok(funding_rate);
557 }
558 }
559}
560
561pub struct FundingRateWatch<'a> {
563 updates: FundingRateUpdates,
564 _lease: PublicSubscriptionLease,
565 _marker: std::marker::PhantomData<&'a BatMarkets>,
566}
567
568impl<'a> FundingRateWatch<'a> {
569 pub async fn recv(&mut self) -> Result<FundingRate> {
571 self.updates.recv().await
572 }
573
574 pub async fn shutdown(self) -> Result<()> {
576 Ok(())
577 }
578}
579
580pub(crate) struct OpenInterestUpdates {
582 receiver: broadcast::Receiver<PublicLaneEvent>,
583 instrument_ids: InstrumentFilter,
584}
585
586impl OpenInterestUpdates {
587 fn new(
588 receiver: broadcast::Receiver<PublicLaneEvent>,
589 request: WatchInstrumentsRequest,
590 ) -> Self {
591 Self {
592 receiver,
593 instrument_ids: InstrumentFilter::from_vec(request.instrument_ids),
594 }
595 }
596
597 pub async fn recv(&mut self) -> Result<OpenInterest> {
599 loop {
600 let event = recv_public_event(&mut self.receiver, "open_interest subscription").await?;
601
602 let PublicLaneEvent::OpenInterest(open_interest) = event else {
603 continue;
604 };
605 if !self.instrument_ids.contains(&open_interest.instrument_id) {
606 continue;
607 }
608 return Ok(open_interest);
609 }
610 }
611}
612
613pub struct OpenInterestWatch<'a> {
615 updates: OpenInterestUpdates,
616 _lease: PublicSubscriptionLease,
617 _marker: std::marker::PhantomData<&'a BatMarkets>,
618}
619
620impl<'a> OpenInterestWatch<'a> {
621 pub async fn recv(&mut self) -> Result<OpenInterest> {
623 self.updates.recv().await
624 }
625
626 pub async fn shutdown(self) -> Result<()> {
628 Ok(())
629 }
630}
631
632pub(crate) struct LiquidationUpdates<'a> {
634 inner: &'a BatMarkets,
635 receiver: broadcast::Receiver<PublicLaneEvent>,
636 instrument_ids: InstrumentFilter,
637}
638
639impl<'a> LiquidationUpdates<'a> {
640 fn new(
641 inner: &'a BatMarkets,
642 receiver: broadcast::Receiver<PublicLaneEvent>,
643 request: WatchInstrumentsRequest,
644 ) -> Self {
645 Self {
646 inner,
647 receiver,
648 instrument_ids: InstrumentFilter::from_vec(request.instrument_ids),
649 }
650 }
651
652 pub async fn recv(&mut self) -> Result<Liquidation> {
654 loop {
655 let event = recv_public_event(&mut self.receiver, "liquidation subscription").await?;
656
657 let PublicLaneEvent::Liquidation(liquidation) = event else {
658 continue;
659 };
660 if !self.instrument_ids.contains(&liquidation.instrument_id) {
661 continue;
662 }
663
664 let spec = resolve_public_spec(self.inner, &liquidation.instrument_id, "liquidation")?;
665 return Ok(liquidation.to_unified(&spec));
666 }
667 }
668}
669
670pub struct LiquidationWatch<'a> {
672 updates: LiquidationUpdates<'a>,
673 _lease: PublicSubscriptionLease,
674}
675
676impl<'a> LiquidationWatch<'a> {
677 pub async fn recv(&mut self) -> Result<Liquidation> {
679 self.updates.recv().await
680 }
681
682 pub async fn shutdown(self) -> Result<()> {
684 Ok(())
685 }
686}
687
688pub(crate) struct OrderBookUpdates<'a> {
690 inner: &'a BatMarkets,
691 receiver: broadcast::Receiver<PublicLaneEvent>,
692 instrument_id: InstrumentId,
693}
694
695impl<'a> OrderBookUpdates<'a> {
696 fn new(
697 inner: &'a BatMarkets,
698 receiver: broadcast::Receiver<PublicLaneEvent>,
699 request: WatchOrderBookRequest,
700 ) -> Self {
701 Self {
702 inner,
703 receiver,
704 instrument_id: request.instrument_id,
705 }
706 }
707
708 pub async fn recv(&mut self) -> Result<OrderBookDelta> {
710 loop {
711 let event = recv_public_event(&mut self.receiver, "order_book subscription").await?;
712
713 let PublicLaneEvent::OrderBookDelta(delta) = event else {
714 continue;
715 };
716 if delta.instrument_id != self.instrument_id {
717 continue;
718 }
719
720 let spec = resolve_public_spec(self.inner, &delta.instrument_id, "order_book")?;
721 return Ok(delta.to_unified(&spec));
722 }
723 }
724}
725
726pub struct OrderBookWatch<'a> {
728 updates: OrderBookUpdates<'a>,
729 _lease: PublicSubscriptionLease,
730}
731
732impl<'a> OrderBookWatch<'a> {
733 pub async fn recv(&mut self) -> Result<OrderBookDelta> {
735 self.updates.recv().await
736 }
737
738 pub async fn shutdown(self) -> Result<()> {
740 Ok(())
741 }
742}
743
/// Test-only stream of raw public events, filtered by the event kinds enabled
/// in a [`WatchFastFeedRequest`] and by instrument.
#[cfg(test)]
pub(crate) struct FastFeedUpdates {
    receiver: broadcast::Receiver<PublicLaneEvent>,
    request: WatchFastFeedRequest,
    instrument_ids: InstrumentFilter,
}

#[cfg(test)]
impl FastFeedUpdates {
    /// Binds `receiver` to `request`, building the instrument filter from a
    /// copy of the request's ids so the request itself can be kept.
    fn new(receiver: broadcast::Receiver<PublicLaneEvent>, request: WatchFastFeedRequest) -> Self {
        let instrument_ids = InstrumentFilter::from_slice(&request.instrument_ids);
        Self {
            receiver,
            request,
            instrument_ids,
        }
    }

    /// Waits for the next event whose kind is enabled in the request and whose
    /// instrument passes the filter. The event is returned unconverted.
    ///
    /// # Errors
    ///
    /// Fails when the channel receive fails.
    pub async fn recv(&mut self) -> Result<PublicLaneEvent> {
        loop {
            let event = recv_public_event(&mut self.receiver, "fast feed").await?;

            // Pair each supported event kind with its request flag and instrument.
            let (enabled, instrument_id) = match &event {
                PublicLaneEvent::Ticker(payload) => (self.request.ticker, &payload.instrument_id),
                PublicLaneEvent::Trade(payload) => (self.request.trades, &payload.instrument_id),
                PublicLaneEvent::BookTop(payload) => {
                    (self.request.book_top, &payload.instrument_id)
                }
                PublicLaneEvent::MarkPrice(payload) => {
                    (self.request.mark_price, &payload.instrument_id)
                }
                PublicLaneEvent::FundingRate(payload) => {
                    (self.request.funding_rate, &payload.instrument_id)
                }
                PublicLaneEvent::OpenInterest(payload) => {
                    (self.request.open_interest, &payload.instrument_id)
                }
                PublicLaneEvent::Liquidation(payload) => {
                    (self.request.liquidations, &payload.instrument_id)
                }
                // Every other event kind is never delivered on the fast feed.
                _ => continue,
            };

            if enabled && self.instrument_ids.contains(instrument_id) {
                return Ok(event);
            }
        }
    }
}
819
820pub(crate) struct OrderUpdates {
822 receiver: broadcast::Receiver<PrivateLaneEvent>,
823}
824
825impl OrderUpdates {
826 fn new(receiver: broadcast::Receiver<PrivateLaneEvent>) -> Self {
827 Self { receiver }
828 }
829
830 pub async fn recv(&mut self) -> Result<Order> {
832 loop {
833 let event = recv_private_event(&mut self.receiver, "order subscription").await?;
834 if let PrivateLaneEvent::Order(order) = event {
835 return Ok(order);
836 }
837 }
838 }
839}
840
841pub struct OrdersWatch<'a> {
843 updates: OrderUpdates,
844 _lease: PrivateSubscriptionLease,
845 _marker: std::marker::PhantomData<&'a BatMarkets>,
846}
847
848impl<'a> OrdersWatch<'a> {
849 pub async fn recv(&mut self) -> Result<Order> {
851 self.updates.recv().await
852 }
853
854 pub async fn shutdown(self) -> Result<()> {
856 Ok(())
857 }
858}
859
860pub(crate) struct ExecutionUpdates {
862 receiver: broadcast::Receiver<PrivateLaneEvent>,
863}
864
865impl ExecutionUpdates {
866 fn new(receiver: broadcast::Receiver<PrivateLaneEvent>) -> Self {
867 Self { receiver }
868 }
869
870 pub async fn recv(&mut self) -> Result<Execution> {
872 loop {
873 let event = recv_private_event(&mut self.receiver, "execution subscription").await?;
874 if let PrivateLaneEvent::Execution(execution) = event {
875 return Ok(execution);
876 }
877 }
878 }
879}
880
881pub struct ExecutionsWatch<'a> {
883 updates: ExecutionUpdates,
884 _lease: PrivateSubscriptionLease,
885 _marker: std::marker::PhantomData<&'a BatMarkets>,
886}
887
888impl<'a> ExecutionsWatch<'a> {
889 pub async fn recv(&mut self) -> Result<Execution> {
891 self.updates.recv().await
892 }
893
894 pub async fn shutdown(self) -> Result<()> {
896 Ok(())
897 }
898}
899
900pub(crate) struct PositionUpdates {
902 receiver: broadcast::Receiver<PrivateLaneEvent>,
903}
904
905impl PositionUpdates {
906 fn new(receiver: broadcast::Receiver<PrivateLaneEvent>) -> Self {
907 Self { receiver }
908 }
909
910 pub async fn recv(&mut self) -> Result<Position> {
912 loop {
913 let event = recv_private_event(&mut self.receiver, "position subscription").await?;
914 if let PrivateLaneEvent::Position(position) = event {
915 return Ok(position);
916 }
917 }
918 }
919}
920
921pub struct PositionsWatch<'a> {
923 updates: PositionUpdates,
924 _lease: PrivateSubscriptionLease,
925 _marker: std::marker::PhantomData<&'a BatMarkets>,
926}
927
928impl<'a> PositionsWatch<'a> {
929 pub async fn recv(&mut self) -> Result<Position> {
931 self.updates.recv().await
932 }
933
934 pub async fn shutdown(self) -> Result<()> {
936 Ok(())
937 }
938}
939
940pub(crate) struct BalanceUpdates {
942 receiver: broadcast::Receiver<PrivateLaneEvent>,
943}
944
945impl BalanceUpdates {
946 fn new(receiver: broadcast::Receiver<PrivateLaneEvent>) -> Self {
947 Self { receiver }
948 }
949
950 pub async fn recv(&mut self) -> Result<Balance> {
952 loop {
953 let event = recv_private_event(&mut self.receiver, "balance subscription").await?;
954 if let PrivateLaneEvent::Balance(balance) = event {
955 return Ok(balance);
956 }
957 }
958 }
959}
960
961pub struct BalancesWatch<'a> {
963 updates: BalanceUpdates,
964 _lease: PrivateSubscriptionLease,
965 _marker: std::marker::PhantomData<&'a BatMarkets>,
966}
967
968impl<'a> BalancesWatch<'a> {
969 pub async fn recv(&mut self) -> Result<Balance> {
971 self.updates.recv().await
972 }
973
974 pub async fn shutdown(self) -> Result<()> {
976 Ok(())
977 }
978}
979
980fn normalize_interval_box(interval: Box<str>) -> Box<str> {
981 KlineInterval::parse(interval.as_ref())
982 .map(Into::into)
983 .unwrap_or(interval)
984}
985
986fn resolve_public_spec(
987 inner: &BatMarkets,
988 instrument_id: &InstrumentId,
989 event_name: &str,
990) -> Result<bat_markets_core::InstrumentSpec> {
991 inner
992 .adapter
993 .as_adapter()
994 .resolve_instrument(instrument_id)
995 .ok_or_else(|| {
996 bat_markets_core::MarketError::new(
997 ErrorKind::Unsupported,
998 format!("unknown instrument {instrument_id} for {event_name} update"),
999 )
1000 })
1001}
1002
1003fn parse_watch_interval(raw: &str) -> Result<KlineInterval> {
1004 KlineInterval::parse(raw).ok_or_else(|| {
1005 bat_markets_core::MarketError::new(
1006 ErrorKind::Unsupported,
1007 format!("unsupported OHLCV interval '{raw}'"),
1008 )
1009 })
1010}
1011
1012pub(crate) struct PublicLaneClient<'a> {
1014 inner: &'a BatMarkets,
1015}
1016
1017impl<'a> PublicLaneClient<'a> {
1018 #[cfg(test)]
1023 pub fn ingest_json(&self, payload: &str) -> Result<Vec<PublicLaneEvent>> {
1024 let events = self.inner.adapter.as_adapter().parse_public(payload)?;
1025 self.inner.shared.apply_public_events(&events)?;
1026 Ok(events)
1027 }
1028
1029 #[must_use]
1031 pub fn subscribe(&self) -> broadcast::Receiver<PublicLaneEvent> {
1032 self.inner.shared.subscribe_public_events()
1033 }
1034
1035 #[must_use]
1037 pub fn subscribe_ticker(&self, request: WatchInstrumentsRequest) -> TickerUpdates<'a> {
1038 TickerUpdates::new(self.inner, self.subscribe(), request)
1039 }
1040
1041 #[must_use]
1043 pub fn subscribe_trades(&self, request: WatchInstrumentsRequest) -> TradeUpdates<'a> {
1044 TradeUpdates::new(self.inner, self.subscribe(), request)
1045 }
1046
1047 #[must_use]
1049 #[cfg(test)]
1050 pub fn subscribe_book_top(&self, request: WatchInstrumentsRequest) -> BookTopUpdates<'a> {
1051 BookTopUpdates::new(self.inner, self.subscribe(), request)
1052 }
1053
1054 #[must_use]
1056 pub fn subscribe_mark_prices(&self, request: WatchInstrumentsRequest) -> MarkPriceUpdates<'a> {
1057 MarkPriceUpdates::new(self.inner, self.subscribe(), request)
1058 }
1059
1060 #[must_use]
1062 pub fn subscribe_funding_rates(&self, request: WatchInstrumentsRequest) -> FundingRateUpdates {
1063 FundingRateUpdates::new(self.subscribe(), request)
1064 }
1065
1066 #[must_use]
1068 pub fn subscribe_open_interest(&self, request: WatchInstrumentsRequest) -> OpenInterestUpdates {
1069 OpenInterestUpdates::new(self.subscribe(), request)
1070 }
1071
1072 #[must_use]
1074 pub fn subscribe_order_book(&self, request: WatchOrderBookRequest) -> OrderBookUpdates<'a> {
1075 OrderBookUpdates::new(self.inner, self.subscribe(), request)
1076 }
1077
1078 #[must_use]
1080 pub fn subscribe_liquidations(
1081 &self,
1082 request: WatchInstrumentsRequest,
1083 ) -> LiquidationUpdates<'a> {
1084 LiquidationUpdates::new(self.inner, self.subscribe(), request)
1085 }
1086
1087 #[must_use]
1089 pub fn subscribe_ohlcv(&self, request: WatchOhlcvRequest) -> OhlcvUpdates<'a> {
1090 OhlcvUpdates::new(self.inner, self.subscribe(), request)
1091 }
1092
1093 #[must_use]
1095 #[cfg(test)]
1096 pub fn subscribe_fast(&self, request: WatchFastFeedRequest) -> FastFeedUpdates {
1097 FastFeedUpdates::new(self.subscribe(), request)
1098 }
1099
1100 pub async fn watch_ticker(&self, request: WatchInstrumentsRequest) -> Result<TickerWatch<'a>> {
1106 let updates = self.subscribe_ticker(request.clone());
1107 let lease = self
1108 .inner
1109 .subscription_hubs
1110 .public
1111 .acquire(PublicSubscription {
1112 instrument_ids: request.instrument_ids,
1113 ticker: true,
1114 trades: false,
1115 book_top: false,
1116 order_book: false,
1117 mark_price: false,
1118 funding_rate: false,
1119 open_interest: false,
1120 liquidations: false,
1121 kline_intervals: Vec::new(),
1122 })
1123 .await?;
1124 Ok(TickerWatch {
1125 updates,
1126 _lease: lease,
1127 })
1128 }
1129
1130 pub async fn watch_trades(&self, request: WatchInstrumentsRequest) -> Result<TradesWatch<'a>> {
1132 let updates = self.subscribe_trades(request.clone());
1133 let lease = self
1134 .inner
1135 .subscription_hubs
1136 .public
1137 .acquire(PublicSubscription {
1138 instrument_ids: request.instrument_ids,
1139 ticker: false,
1140 trades: true,
1141 book_top: false,
1142 order_book: false,
1143 mark_price: false,
1144 funding_rate: false,
1145 open_interest: false,
1146 liquidations: false,
1147 kline_intervals: Vec::new(),
1148 })
1149 .await?;
1150 Ok(TradesWatch {
1151 updates,
1152 _lease: lease,
1153 })
1154 }
1155
1156 pub async fn watch_mark_prices(
1158 &self,
1159 request: WatchInstrumentsRequest,
1160 ) -> Result<MarkPriceWatch<'a>> {
1161 let updates = self.subscribe_mark_prices(request.clone());
1162 let lease = self
1163 .inner
1164 .subscription_hubs
1165 .public
1166 .acquire(PublicSubscription {
1167 instrument_ids: request.instrument_ids,
1168 ticker: false,
1169 trades: false,
1170 book_top: false,
1171 order_book: false,
1172 mark_price: true,
1173 funding_rate: false,
1174 open_interest: false,
1175 liquidations: false,
1176 kline_intervals: Vec::new(),
1177 })
1178 .await?;
1179 Ok(MarkPriceWatch {
1180 updates,
1181 _lease: lease,
1182 })
1183 }
1184
1185 pub async fn watch_funding_rates(
1187 &self,
1188 request: WatchInstrumentsRequest,
1189 ) -> Result<FundingRateWatch<'a>> {
1190 let updates = self.subscribe_funding_rates(request.clone());
1191 let lease = self
1192 .inner
1193 .subscription_hubs
1194 .public
1195 .acquire(PublicSubscription {
1196 instrument_ids: request.instrument_ids,
1197 ticker: false,
1198 trades: false,
1199 book_top: false,
1200 order_book: false,
1201 mark_price: false,
1202 funding_rate: true,
1203 open_interest: false,
1204 liquidations: false,
1205 kline_intervals: Vec::new(),
1206 })
1207 .await?;
1208 Ok(FundingRateWatch {
1209 updates,
1210 _lease: lease,
1211 _marker: std::marker::PhantomData,
1212 })
1213 }
1214
1215 pub async fn watch_open_interest(
1217 &self,
1218 request: WatchInstrumentsRequest,
1219 ) -> Result<OpenInterestWatch<'a>> {
1220 let updates = self.subscribe_open_interest(request.clone());
1221 let lease = self
1222 .inner
1223 .subscription_hubs
1224 .public
1225 .acquire(PublicSubscription {
1226 instrument_ids: request.instrument_ids,
1227 ticker: false,
1228 trades: false,
1229 book_top: false,
1230 order_book: false,
1231 mark_price: false,
1232 funding_rate: false,
1233 open_interest: true,
1234 liquidations: false,
1235 kline_intervals: Vec::new(),
1236 })
1237 .await?;
1238 Ok(OpenInterestWatch {
1239 updates,
1240 _lease: lease,
1241 _marker: std::marker::PhantomData,
1242 })
1243 }
1244
1245 pub async fn watch_order_book(
1247 &self,
1248 request: WatchOrderBookRequest,
1249 ) -> Result<OrderBookWatch<'a>> {
1250 let updates = self.subscribe_order_book(request.clone());
1251 let lease = self
1252 .inner
1253 .subscription_hubs
1254 .public
1255 .acquire(PublicSubscription {
1256 instrument_ids: vec![request.instrument_id],
1257 ticker: false,
1258 trades: false,
1259 book_top: false,
1260 order_book: true,
1261 mark_price: false,
1262 funding_rate: false,
1263 open_interest: false,
1264 liquidations: false,
1265 kline_intervals: Vec::new(),
1266 })
1267 .await?;
1268 Ok(OrderBookWatch {
1269 updates,
1270 _lease: lease,
1271 })
1272 }
1273
1274 pub async fn watch_liquidations(
1276 &self,
1277 request: WatchInstrumentsRequest,
1278 ) -> Result<LiquidationWatch<'a>> {
1279 let updates = self.subscribe_liquidations(request.clone());
1280 let lease = self
1281 .inner
1282 .subscription_hubs
1283 .public
1284 .acquire(PublicSubscription {
1285 instrument_ids: request.instrument_ids,
1286 ticker: false,
1287 trades: false,
1288 book_top: false,
1289 order_book: false,
1290 mark_price: false,
1291 funding_rate: false,
1292 open_interest: false,
1293 liquidations: true,
1294 kline_intervals: Vec::new(),
1295 })
1296 .await?;
1297 Ok(LiquidationWatch {
1298 updates,
1299 _lease: lease,
1300 })
1301 }
1302
1303 pub async fn watch_ohlcv(&self, request: WatchOhlcvRequest) -> Result<OhlcvWatch<'a>> {
1340 let updates = self.subscribe_ohlcv(request.clone());
1341 let lease = self
1342 .inner
1343 .subscription_hubs
1344 .public
1345 .acquire(request.into_public_subscription())
1346 .await?;
1347 Ok(OhlcvWatch {
1348 updates,
1349 _lease: lease,
1350 })
1351 }
1352
1353 pub async fn watch_tickers(&self, request: WatchInstrumentsRequest) -> Result<TickerWatch<'a>> {
1355 self.watch_ticker(request).await
1356 }
1357}
1358
1359pub(crate) struct PrivateLaneClient<'a> {
1361 inner: &'a BatMarkets,
1362}
1363
1364impl<'a> PrivateLaneClient<'a> {
1365 #[cfg(test)]
1370 pub fn ingest_json(&self, payload: &str) -> Result<Vec<PrivateLaneEvent>> {
1371 let events = self.inner.adapter.as_adapter().parse_private(payload)?;
1372 self.inner.shared.apply_private_events(&events);
1373 Ok(events)
1374 }
1375
1376 #[must_use]
1378 pub fn subscribe(&self) -> broadcast::Receiver<PrivateLaneEvent> {
1379 self.inner.shared.subscribe_private_events()
1380 }
1381
1382 #[must_use]
1384 pub fn subscribe_orders(&self) -> OrderUpdates {
1385 OrderUpdates::new(self.subscribe())
1386 }
1387
1388 #[must_use]
1390 pub fn subscribe_executions(&self) -> ExecutionUpdates {
1391 ExecutionUpdates::new(self.subscribe())
1392 }
1393
1394 #[must_use]
1396 pub fn subscribe_positions(&self) -> PositionUpdates {
1397 PositionUpdates::new(self.subscribe())
1398 }
1399
1400 #[must_use]
1402 pub fn subscribe_balances(&self) -> BalanceUpdates {
1403 BalanceUpdates::new(self.subscribe())
1404 }
1405
1406 pub async fn watch_orders(&self) -> Result<OrdersWatch<'a>> {
1412 let updates = self.subscribe_orders();
1413 let lease = self.inner.subscription_hubs.private.acquire().await?;
1414 Ok(OrdersWatch {
1415 updates,
1416 _lease: lease,
1417 _marker: std::marker::PhantomData,
1418 })
1419 }
1420
1421 pub async fn watch_executions(&self) -> Result<ExecutionsWatch<'a>> {
1423 let updates = self.subscribe_executions();
1424 let lease = self.inner.subscription_hubs.private.acquire().await?;
1425 Ok(ExecutionsWatch {
1426 updates,
1427 _lease: lease,
1428 _marker: std::marker::PhantomData,
1429 })
1430 }
1431
1432 pub async fn watch_positions(&self) -> Result<PositionsWatch<'a>> {
1434 let updates = self.subscribe_positions();
1435 let lease = self.inner.subscription_hubs.private.acquire().await?;
1436 Ok(PositionsWatch {
1437 updates,
1438 _lease: lease,
1439 _marker: std::marker::PhantomData,
1440 })
1441 }
1442
1443 pub async fn watch_balances(&self) -> Result<BalancesWatch<'a>> {
1445 let updates = self.subscribe_balances();
1446 let lease = self.inner.subscription_hubs.private.acquire().await?;
1447 Ok(BalancesWatch {
1448 updates,
1449 _lease: lease,
1450 _marker: std::marker::PhantomData,
1451 })
1452 }
1453}
1454
1455#[cfg(test)]
1456mod tests {
1457 use tokio::sync::broadcast;
1458 use tokio::time::{Duration, timeout};
1459
1460 use bat_markets_core::{InstrumentId, Product, PublicLaneEvent, Venue};
1461
1462 use crate::BatMarketsBuilder;
1463 use bat_markets_core::{WatchFastFeedRequest, WatchOrderBookRequest};
1464
1465 use super::{
1466 InstrumentFilter, WatchInstrumentsRequest, WatchOhlcvRequest, private_lane, public_lane,
1467 recv_public_event,
1468 };
1469
1470 macro_rules! fixture {
1471 ($venue:literal, $file:literal) => {
1472 include_str!(concat!(
1473 env!("CARGO_MANIFEST_DIR"),
1474 "/../../fixtures/",
1475 $venue,
1476 "/",
1477 $file
1478 ))
1479 };
1480 }
1481
1482 const BINANCE_PUBLIC_TRADE: &str = fixture!("binance", "public_trade.json");
1483 const BINANCE_PUBLIC_TICKER: &str = fixture!("binance", "public_ticker.json");
1484 const BINANCE_PUBLIC_BOOK_TICKER: &str = fixture!("binance", "public_book_ticker.json");
1485 const BINANCE_PUBLIC_KLINE: &str = fixture!("binance", "public_kline.json");
1486 const BINANCE_PUBLIC_MARK_PRICE: &str = fixture!("binance", "public_mark_price.json");
1487 const BINANCE_PUBLIC_LIQUIDATION: &str = fixture!("binance", "public_liquidation.json");
1488 const BYBIT_PUBLIC_ORDERBOOK: &str = fixture!("bybit", "public_orderbook.json");
1489 const BYBIT_PRIVATE_ORDER: &str = fixture!("bybit", "private_order.json");
1490 const BYBIT_PRIVATE_EXECUTION: &str = fixture!("bybit", "private_execution.json");
1491 const BYBIT_PRIVATE_WALLET: &str = fixture!("bybit", "private_wallet.json");
1492
1493 #[test]
1494 fn instrument_filter_uses_single_fast_path_and_many_membership() {
1495 let btc = InstrumentId::from("BTC/USDT:USDT");
1496 let eth = InstrumentId::from("ETH/USDT:USDT");
1497 let sol = InstrumentId::from("SOL/USDT:USDT");
1498
1499 let single = InstrumentFilter::from_vec(vec![btc.clone()]);
1500 assert!(matches!(single, InstrumentFilter::Single(_)));
1501 assert!(single.contains(&btc));
1502 assert!(!single.contains(ð));
1503
1504 let many = InstrumentFilter::from_vec(vec![btc.clone(), eth.clone()]);
1505 assert!(matches!(many, InstrumentFilter::Many(_)));
1506 assert!(many.contains(&btc));
1507 assert!(many.contains(ð));
1508 assert!(!many.contains(&sol));
1509 }
1510
1511 #[tokio::test]
1512 async fn public_subscribe_receives_fixture_ingest_events() {
1513 let client = BatMarketsBuilder::default()
1514 .venue(Venue::Binance)
1515 .product(Product::LinearUsdt)
1516 .build()
1517 .expect("fixture client should build");
1518 let mut receiver = public_lane(&client).subscribe();
1519
1520 let events = public_lane(&client)
1521 .ingest_json(BINANCE_PUBLIC_TRADE)
1522 .expect("fixture payload should parse");
1523 assert!(!events.is_empty());
1524
1525 let received = timeout(Duration::from_secs(1), receiver.recv())
1526 .await
1527 .expect("public event should arrive")
1528 .expect("receiver should stay open");
1529
1530 assert!(matches!(received, PublicLaneEvent::Trade(_)));
1531 }
1532
1533 #[tokio::test]
1534 async fn subscribe_ticker_receives_typed_updates() {
1535 let client = BatMarketsBuilder::default()
1536 .venue(Venue::Binance)
1537 .product(Product::LinearUsdt)
1538 .build()
1539 .expect("fixture client should build");
1540 let mut updates = public_lane(&client).subscribe_ticker(
1541 WatchInstrumentsRequest::for_instrument(InstrumentId::from("BTC/USDT:USDT")),
1542 );
1543
1544 public_lane(&client)
1545 .ingest_json(BINANCE_PUBLIC_TICKER)
1546 .expect("fixture ticker should parse");
1547
1548 let received = timeout(Duration::from_secs(1), updates.recv())
1549 .await
1550 .expect("typed ticker should arrive")
1551 .expect("typed ticker should parse");
1552
1553 assert_eq!(received.instrument_id, InstrumentId::from("BTC/USDT:USDT"));
1554 assert_eq!(received.last_price.to_string(), "70100.50");
1555 }
1556
1557 #[tokio::test]
1558 async fn subscribe_trades_receives_typed_updates() {
1559 let client = BatMarketsBuilder::default()
1560 .venue(Venue::Binance)
1561 .product(Product::LinearUsdt)
1562 .build()
1563 .expect("fixture client should build");
1564 let mut updates = public_lane(&client).subscribe_trades(
1565 WatchInstrumentsRequest::for_instrument(InstrumentId::from("BTC/USDT:USDT")),
1566 );
1567
1568 public_lane(&client)
1569 .ingest_json(BINANCE_PUBLIC_TRADE)
1570 .expect("fixture trade should parse");
1571
1572 let received = timeout(Duration::from_secs(1), updates.recv())
1573 .await
1574 .expect("typed trade should arrive")
1575 .expect("typed trade should parse");
1576
1577 assert_eq!(received.instrument_id, InstrumentId::from("BTC/USDT:USDT"));
1578 assert!(!received.trade_id.as_ref().is_empty());
1579 }
1580
1581 #[tokio::test]
1582 async fn recv_public_event_skips_lagged_updates() {
1583 let client = BatMarketsBuilder::default()
1584 .venue(Venue::Binance)
1585 .product(Product::LinearUsdt)
1586 .build()
1587 .expect("fixture client should build");
1588 let sample_event = public_lane(&client)
1589 .ingest_json(BINANCE_PUBLIC_TICKER)
1590 .expect("fixture ticker should parse")
1591 .into_iter()
1592 .next()
1593 .expect("ticker fixture should emit one event");
1594
1595 let (tx, mut receiver) = broadcast::channel(2);
1596 tx.send(sample_event.clone())
1597 .expect("first event should enter channel");
1598 tx.send(sample_event.clone())
1599 .expect("second event should enter channel");
1600 tx.send(sample_event)
1601 .expect("third event should enter channel and force lag");
1602
1603 let received = timeout(
1604 Duration::from_secs(1),
1605 recv_public_event(&mut receiver, "public test subscription"),
1606 )
1607 .await
1608 .expect("lagged public receive should still complete")
1609 .expect("lagged public receive should retry and succeed");
1610
1611 assert!(matches!(received, PublicLaneEvent::Ticker(_)));
1612 }
1613
1614 #[tokio::test]
1615 async fn subscribe_book_top_receives_typed_updates() {
1616 let client = BatMarketsBuilder::default()
1617 .venue(Venue::Binance)
1618 .product(Product::LinearUsdt)
1619 .build()
1620 .expect("fixture client should build");
1621 let mut updates = public_lane(&client).subscribe_book_top(
1622 WatchInstrumentsRequest::for_instrument(InstrumentId::from("BTC/USDT:USDT")),
1623 );
1624
1625 public_lane(&client)
1626 .ingest_json(BINANCE_PUBLIC_BOOK_TICKER)
1627 .expect("fixture book ticker should parse");
1628
1629 let received = timeout(Duration::from_secs(1), updates.recv())
1630 .await
1631 .expect("typed book top should arrive")
1632 .expect("typed book top should parse");
1633
1634 assert_eq!(received.instrument_id, InstrumentId::from("BTC/USDT:USDT"));
1635 assert_eq!(received.bid.price.to_string(), "70100.90");
1636 assert_eq!(received.ask.price.to_string(), "70101.10");
1637 }
1638
1639 #[tokio::test]
1640 async fn subscribe_ohlcv_receives_typed_kline_updates() {
1641 let client = BatMarketsBuilder::default()
1642 .venue(Venue::Binance)
1643 .product(Product::LinearUsdt)
1644 .build()
1645 .expect("fixture client should build");
1646 let mut updates = public_lane(&client).subscribe_ohlcv(WatchOhlcvRequest::for_instrument(
1647 InstrumentId::from("BTC/USDT:USDT"),
1648 "1m",
1649 ));
1650
1651 public_lane(&client)
1652 .ingest_json(BINANCE_PUBLIC_KLINE)
1653 .expect("fixture kline should parse");
1654
1655 let received = timeout(Duration::from_secs(1), updates.recv())
1656 .await
1657 .expect("typed kline should arrive")
1658 .expect("typed kline should parse");
1659
1660 assert_eq!(received.instrument_id, InstrumentId::from("BTC/USDT:USDT"));
1661 assert_eq!(received.interval.as_ref(), "1m");
1662 }
1663
1664 #[tokio::test]
1665 async fn subscribe_ohlcv_filters_symbols_before_yielding() {
1666 let client = BatMarketsBuilder::default()
1667 .venue(Venue::Binance)
1668 .product(Product::LinearUsdt)
1669 .build()
1670 .expect("fixture client should build");
1671 let mut updates = public_lane(&client).subscribe_ohlcv(WatchOhlcvRequest::for_instrument(
1672 InstrumentId::from("BTC/USDT:USDT"),
1673 "1m",
1674 ));
1675
1676 public_lane(&client)
1677 .ingest_json(
1678 r#"{
1679 "e":"kline",
1680 "E":1710000002000,
1681 "s":"ETHUSDT",
1682 "k":{
1683 "i":"1m",
1684 "t":1710000000000,
1685 "T":1710000059999,
1686 "o":"3200.00",
1687 "h":"3210.00",
1688 "l":"3195.00",
1689 "c":"3205.00",
1690 "v":"42.0",
1691 "x":false
1692 }
1693 }"#,
1694 )
1695 .expect("eth kline should parse");
1696 public_lane(&client)
1697 .ingest_json(BINANCE_PUBLIC_KLINE)
1698 .expect("btc kline should parse");
1699
1700 let received = timeout(Duration::from_secs(1), updates.recv())
1701 .await
1702 .expect("filtered btc kline should arrive")
1703 .expect("filtered btc kline should parse");
1704
1705 assert_eq!(received.instrument_id, InstrumentId::from("BTC/USDT:USDT"));
1706 }
1707
1708 #[tokio::test]
1709 async fn subscribe_mark_prices_receives_typed_updates() {
1710 let client = BatMarketsBuilder::default()
1711 .venue(Venue::Binance)
1712 .product(Product::LinearUsdt)
1713 .build()
1714 .expect("fixture client should build");
1715 let mut updates = public_lane(&client).subscribe_mark_prices(
1716 WatchInstrumentsRequest::for_instrument(InstrumentId::from("BTC/USDT:USDT")),
1717 );
1718
1719 public_lane(&client)
1720 .ingest_json(BINANCE_PUBLIC_MARK_PRICE)
1721 .expect("fixture mark price should parse");
1722
1723 let received = timeout(Duration::from_secs(1), updates.recv())
1724 .await
1725 .expect("typed mark price should arrive")
1726 .expect("typed mark price should parse");
1727
1728 assert_eq!(received.instrument_id, InstrumentId::from("BTC/USDT:USDT"));
1729 }
1730
1731 #[tokio::test]
1732 async fn subscribe_fast_filters_multi_topic_updates() {
1733 let client = BatMarketsBuilder::default()
1734 .venue(Venue::Binance)
1735 .product(Product::LinearUsdt)
1736 .build()
1737 .expect("fixture client should build");
1738 let mut updates = public_lane(&client).subscribe_fast(WatchFastFeedRequest {
1739 instrument_ids: vec![InstrumentId::from("BTC/USDT:USDT")],
1740 ticker: false,
1741 trades: false,
1742 book_top: false,
1743 mark_price: true,
1744 funding_rate: false,
1745 open_interest: false,
1746 liquidations: false,
1747 });
1748
1749 public_lane(&client)
1750 .ingest_json(BINANCE_PUBLIC_TICKER)
1751 .expect("fixture ticker should parse");
1752 public_lane(&client)
1753 .ingest_json(BINANCE_PUBLIC_MARK_PRICE)
1754 .expect("fixture mark price should parse");
1755
1756 let received = timeout(Duration::from_secs(1), updates.recv())
1757 .await
1758 .expect("fast feed mark price should arrive")
1759 .expect("fast feed event should parse");
1760
1761 assert!(matches!(received, PublicLaneEvent::MarkPrice(_)));
1762 }
1763
1764 #[tokio::test]
1765 async fn subscribe_order_book_receives_typed_updates() {
1766 let client = BatMarketsBuilder::default()
1767 .venue(Venue::Bybit)
1768 .product(Product::LinearUsdt)
1769 .build()
1770 .expect("fixture client should build");
1771 let mut updates = public_lane(&client).subscribe_order_book(WatchOrderBookRequest::new(
1772 InstrumentId::from("BTC/USDT:USDT"),
1773 Some(50),
1774 ));
1775
1776 public_lane(&client)
1777 .ingest_json(BYBIT_PUBLIC_ORDERBOOK)
1778 .expect("fixture orderbook should parse");
1779
1780 let received = timeout(Duration::from_secs(1), updates.recv())
1781 .await
1782 .expect("typed order book should arrive")
1783 .expect("typed order book should parse");
1784
1785 assert_eq!(received.instrument_id, InstrumentId::from("BTC/USDT:USDT"));
1786 assert!(!received.bids.is_empty());
1787 assert!(!received.asks.is_empty());
1788 }
1789
1790 #[tokio::test]
1791 async fn subscribe_liquidations_receives_typed_updates() {
1792 let client = BatMarketsBuilder::default()
1793 .venue(Venue::Binance)
1794 .product(Product::LinearUsdt)
1795 .build()
1796 .expect("fixture client should build");
1797 let mut updates = public_lane(&client).subscribe_liquidations(
1798 WatchInstrumentsRequest::for_instrument(InstrumentId::from("BTC/USDT:USDT")),
1799 );
1800
1801 public_lane(&client)
1802 .ingest_json(BINANCE_PUBLIC_LIQUIDATION)
1803 .expect("fixture liquidation should parse");
1804
1805 let received = timeout(Duration::from_secs(1), updates.recv())
1806 .await
1807 .expect("typed liquidation should arrive")
1808 .expect("typed liquidation should parse");
1809
1810 assert_eq!(received.instrument_id, InstrumentId::from("BTC/USDT:USDT"));
1811 assert_eq!(received.side, bat_markets_core::Side::Sell);
1812 assert_eq!(received.quantity.to_string(), "0.014");
1813 }
1814
1815 #[tokio::test]
1816 async fn private_subscribe_order_execution_and_balance_receive_fixture_updates() {
1817 let client = BatMarketsBuilder::default()
1818 .venue(Venue::Bybit)
1819 .product(Product::LinearUsdt)
1820 .build()
1821 .expect("fixture client should build");
1822 let mut orders = private_lane(&client).subscribe_orders();
1823 let mut executions = private_lane(&client).subscribe_executions();
1824 let mut balances = private_lane(&client).subscribe_balances();
1825
1826 private_lane(&client)
1827 .ingest_json(BYBIT_PRIVATE_ORDER)
1828 .expect("fixture private order should parse");
1829 private_lane(&client)
1830 .ingest_json(BYBIT_PRIVATE_EXECUTION)
1831 .expect("fixture private execution should parse");
1832 private_lane(&client)
1833 .ingest_json(BYBIT_PRIVATE_WALLET)
1834 .expect("fixture private wallet should parse");
1835
1836 let order = timeout(Duration::from_secs(1), orders.recv())
1837 .await
1838 .expect("order update should arrive")
1839 .expect("order update should parse");
1840 let execution = timeout(Duration::from_secs(1), executions.recv())
1841 .await
1842 .expect("execution update should arrive")
1843 .expect("execution update should parse");
1844 let balance = timeout(Duration::from_secs(1), balances.recv())
1845 .await
1846 .expect("balance update should arrive")
1847 .expect("balance update should parse");
1848
1849 assert_eq!(order.instrument_id, InstrumentId::from("BTC/USDT:USDT"));
1850 assert_eq!(execution.instrument_id, InstrumentId::from("BTC/USDT:USDT"));
1851 assert_eq!(balance.asset.to_string(), "USDT");
1852 }
1853}