1use super::super::book::BinanceLevel;
2use crate::{
3 Identifier, SnapshotFetcher,
4 books::OrderBook,
5 error::DataError,
6 event::{MarketEvent, MarketIter},
7 exchange::{
8 Connector,
9 binance::{
10 book::l2::{BinanceOrderBookL2Meta, BinanceOrderBookL2Snapshot},
11 futures::BinanceFuturesUsd,
12 market::BinanceMarket,
13 },
14 },
15 instrument::InstrumentData,
16 subscription::{
17 Map, Subscription,
18 book::{OrderBookEvent, OrderBooksL2},
19 },
20 transformer::ExchangeTransformer,
21};
22use async_trait::async_trait;
23use barter_instrument::exchange::ExchangeId;
24use barter_integration::{
25 Transformer, error::SocketError, protocol::websocket::WsMessage, subscription::SubscriptionId,
26};
27use chrono::{DateTime, Utc};
28use futures_util::future::try_join_all;
29use serde::{Deserialize, Serialize};
30use std::future::Future;
31use tokio::sync::mpsc::UnboundedSender;
32
/// Base HTTP URL requested for Binance USD futures L2 order book snapshots.
///
/// The snapshot fetcher in this file appends the `symbol` and `limit` query parameters.
pub const HTTP_BOOK_L2_SNAPSHOT_URL_BINANCE_FUTURES_USD: &str =
    "https://fapi.binance.com/fapi/v1/depth";
38
/// Unit type implementing [`SnapshotFetcher`] for [`BinanceFuturesUsd`] [`OrderBooksL2`]
/// subscriptions by requesting the initial order book snapshots over HTTP.
#[derive(Debug)]
pub struct BinanceFuturesUsdOrderBooksL2SnapshotFetcher;
41
42impl SnapshotFetcher<BinanceFuturesUsd, OrderBooksL2>
43 for BinanceFuturesUsdOrderBooksL2SnapshotFetcher
44{
45 fn fetch_snapshots<Instrument>(
46 subscriptions: &[Subscription<BinanceFuturesUsd, Instrument, OrderBooksL2>],
47 ) -> impl Future<Output = Result<Vec<MarketEvent<Instrument::Key, OrderBookEvent>>, SocketError>>
48 + Send
49 where
50 Instrument: InstrumentData,
51 Subscription<BinanceFuturesUsd, Instrument, OrderBooksL2>: Identifier<BinanceMarket>,
52 {
53 let l2_snapshot_futures = subscriptions.iter().map(|sub| {
54 let market = sub.id();
56 let snapshot_url = format!(
57 "{}?symbol={}&limit=100",
58 HTTP_BOOK_L2_SNAPSHOT_URL_BINANCE_FUTURES_USD,
59 market.as_ref(),
60 );
61
62 async move {
63 let snapshot = reqwest::get(snapshot_url)
65 .await
66 .map_err(SocketError::Http)?
67 .json::<BinanceOrderBookL2Snapshot>()
68 .await
69 .map_err(SocketError::Http)?;
70
71 Ok(MarketEvent::from((
72 ExchangeId::BinanceFuturesUsd,
73 sub.instrument.key().clone(),
74 snapshot,
75 )))
76 }
77 });
78
79 try_join_all(l2_snapshot_futures)
80 }
81}
82
/// Transformer turning [`BinanceFuturesOrderBookL2Update`]s into [`OrderBookEvent`]
/// [`MarketEvent`]s, validating the update sequence of every subscription along the way.
#[derive(Debug)]
pub struct BinanceFuturesUsdOrderBooksL2Transformer<InstrumentKey> {
    /// Per-subscription metadata: the instrument key paired with its
    /// [`BinanceFuturesUsdOrderBookL2Sequencer`].
    instrument_map:
        Map<BinanceOrderBookL2Meta<InstrumentKey, BinanceFuturesUsdOrderBookL2Sequencer>>,
}
88
89#[async_trait]
90impl<InstrumentKey> ExchangeTransformer<BinanceFuturesUsd, InstrumentKey, OrderBooksL2>
91 for BinanceFuturesUsdOrderBooksL2Transformer<InstrumentKey>
92where
93 InstrumentKey: Clone + PartialEq + Send + Sync,
94{
95 async fn init(
96 instrument_map: Map<InstrumentKey>,
97 initial_snapshots: &[MarketEvent<InstrumentKey, OrderBookEvent>],
98 _: UnboundedSender<WsMessage>,
99 ) -> Result<Self, DataError> {
100 let instrument_map = instrument_map
101 .0
102 .into_iter()
103 .map(|(sub_id, instrument_key)| {
104 let snapshot = initial_snapshots
105 .iter()
106 .find(|snapshot| snapshot.instrument == instrument_key)
107 .ok_or_else(|| DataError::InitialSnapshotMissing(sub_id.clone()))?;
108
109 let OrderBookEvent::Snapshot(snapshot) = &snapshot.kind else {
110 return Err(DataError::InitialSnapshotInvalid(String::from(
111 "expected OrderBookEvent::Snapshot but found OrderBookEvent::Update",
112 )));
113 };
114
115 let sequencer = BinanceFuturesUsdOrderBookL2Sequencer {
116 updates_processed: 0,
117 last_update_id: snapshot.sequence,
118 };
119
120 Ok((
121 sub_id,
122 BinanceOrderBookL2Meta::new(instrument_key, sequencer),
123 ))
124 })
125 .collect::<Result<Map<_>, _>>()?;
126
127 Ok(Self { instrument_map })
128 }
129}
130
131impl<InstrumentKey> Transformer for BinanceFuturesUsdOrderBooksL2Transformer<InstrumentKey>
132where
133 InstrumentKey: Clone,
134{
135 type Error = DataError;
136 type Input = BinanceFuturesOrderBookL2Update;
137 type Output = MarketEvent<InstrumentKey, OrderBookEvent>;
138 type OutputIter = Vec<Result<Self::Output, Self::Error>>;
139
140 fn transform(&mut self, input: Self::Input) -> Self::OutputIter {
141 let subscription_id = match input.id() {
143 Some(subscription_id) => subscription_id,
144 None => return vec![],
145 };
146
147 let instrument = match self.instrument_map.find_mut(&subscription_id) {
149 Ok(instrument) => instrument,
150 Err(unidentifiable) => return vec![Err(DataError::from(unidentifiable))],
151 };
152
153 let valid_update = match instrument.sequencer.validate_sequence(input) {
155 Ok(Some(valid_update)) => valid_update,
156 Ok(None) => return vec![],
157 Err(error) => return vec![Err(error)],
158 };
159
160 MarketIter::<InstrumentKey, OrderBookEvent>::from((
161 BinanceFuturesUsd::ID,
162 instrument.key.clone(),
163 valid_update,
164 ))
165 .0
166 }
167}
168
169#[derive(Debug)]
195pub struct BinanceFuturesUsdOrderBookL2Sequencer {
196 pub updates_processed: u64,
197 pub last_update_id: u64,
198}
199
200impl BinanceFuturesUsdOrderBookL2Sequencer {
201 pub fn new(last_update_id: u64) -> Self {
203 Self {
204 updates_processed: 0,
205 last_update_id,
206 }
207 }
208
209 pub fn validate_sequence(
213 &mut self,
214 update: BinanceFuturesOrderBookL2Update,
215 ) -> Result<Option<BinanceFuturesOrderBookL2Update>, DataError> {
216 if update.last_update_id < self.last_update_id {
218 return Ok(None);
219 }
220
221 if self.is_first_update() {
222 self.validate_first_update(&update)?;
224 } else {
225 self.validate_next_update(&update)?;
227 }
228
229 self.updates_processed += 1;
231 self.last_update_id = update.last_update_id;
232
233 Ok(Some(update))
234 }
235
236 pub fn is_first_update(&self) -> bool {
241 self.updates_processed == 0
242 }
243
244 pub fn validate_first_update(
249 &self,
250 update: &BinanceFuturesOrderBookL2Update,
251 ) -> Result<(), DataError> {
252 if update.first_update_id <= self.last_update_id
253 && update.last_update_id >= self.last_update_id
254 {
255 Ok(())
256 } else {
257 Err(DataError::InvalidSequence {
258 prev_last_update_id: self.last_update_id,
259 first_update_id: update.first_update_id,
260 })
261 }
262 }
263
264 pub fn validate_next_update(
270 &self,
271 update: &BinanceFuturesOrderBookL2Update,
272 ) -> Result<(), DataError> {
273 if update.prev_last_update_id == self.last_update_id {
274 Ok(())
275 } else {
276 Err(DataError::InvalidSequence {
277 prev_last_update_id: self.last_update_id,
278 first_update_id: update.first_update_id,
279 })
280 }
281 }
282}
283
/// Binance futures L2 order book update message (a `depthUpdate` payload in the test fixture
/// of this file).
///
/// Fields are read from the short JSON keys given by each `alias`.
#[derive(Clone, PartialEq, PartialOrd, Debug, Deserialize, Serialize)]
pub struct BinanceFuturesOrderBookL2Update {
    /// Subscription identifier derived from the `"s"` field by `de_ob_l2_subscription_id`.
    #[serde(
        alias = "s",
        deserialize_with = "super::super::book::l2::de_ob_l2_subscription_id"
    )]
    pub subscription_id: SubscriptionId,
    /// `"E"` field, deserialised from epoch milliseconds; used as the event's `time_exchange`.
    #[serde(
        alias = "E",
        deserialize_with = "barter_integration::de::de_u64_epoch_ms_as_datetime_utc"
    )]
    pub time_exchange: DateTime<Utc>,
    /// `"T"` field, deserialised from epoch milliseconds; passed to the `OrderBook` as its
    /// engine time.
    #[serde(
        alias = "T",
        deserialize_with = "barter_integration::de::de_u64_epoch_ms_as_datetime_utc"
    )]
    pub time_engine: DateTime<Utc>,
    /// `"U"` field: first update id covered by this message.
    #[serde(alias = "U")]
    pub first_update_id: u64,
    /// `"u"` field: last update id covered by this message.
    #[serde(alias = "u")]
    pub last_update_id: u64,
    /// `"pu"` field: compared by the sequencer against the previously accepted
    /// `last_update_id`.
    #[serde(alias = "pu")]
    pub prev_last_update_id: u64,
    /// `"b"` field: bid levels carried by this update.
    #[serde(alias = "b")]
    pub bids: Vec<BinanceLevel>,
    /// `"a"` field: ask levels carried by this update.
    #[serde(alias = "a")]
    pub asks: Vec<BinanceLevel>,
}
333
334impl Identifier<Option<SubscriptionId>> for BinanceFuturesOrderBookL2Update {
335 fn id(&self) -> Option<SubscriptionId> {
336 Some(self.subscription_id.clone())
337 }
338}
339
340impl<InstrumentKey> From<(ExchangeId, InstrumentKey, BinanceFuturesOrderBookL2Update)>
341 for MarketIter<InstrumentKey, OrderBookEvent>
342{
343 fn from(
344 (exchange, instrument, update): (
345 ExchangeId,
346 InstrumentKey,
347 BinanceFuturesOrderBookL2Update,
348 ),
349 ) -> Self {
350 Self(vec![Ok(MarketEvent {
351 time_exchange: update.time_exchange,
352 time_received: Utc::now(),
353 exchange,
354 instrument,
355 kind: OrderBookEvent::Update(OrderBook::new(
356 update.last_update_id,
357 Some(update.time_engine),
358 update.bids,
359 update.asks,
360 )),
361 })])
362 }
363}
364
#[cfg(test)]
mod tests {
    use super::*;
    use crate::books::Level;
    use rust_decimal_macros::dec;

    /// Builds an update carrying only the given sequence ids (no levels, default timestamps).
    fn update_with_ids(
        first_update_id: u64,
        last_update_id: u64,
        prev_last_update_id: u64,
    ) -> BinanceFuturesOrderBookL2Update {
        BinanceFuturesOrderBookL2Update {
            subscription_id: SubscriptionId::from("subscription_id"),
            time_exchange: Default::default(),
            time_engine: Default::default(),
            first_update_id,
            last_update_id,
            prev_last_update_id,
            bids: vec![],
            asks: vec![],
        }
    }

    /// Builds the expected `InvalidSequence` failure for a validation test case.
    fn invalid_sequence(prev_last_update_id: u64, first_update_id: u64) -> Result<(), DataError> {
        Err(DataError::InvalidSequence {
            prev_last_update_id,
            first_update_id,
        })
    }

    /// Panics unless `actual` and `expected` are both `Ok` or both `Err`.
    fn assert_same_outcome(
        index: usize,
        actual: Result<(), DataError>,
        expected: Result<(), DataError>,
    ) {
        if actual.is_ok() != expected.is_ok() {
            panic!(
                "TC{index} failed because actual != expected. \nActual: {actual:?}\nExpected: {expected:?}\n"
            );
        }
    }

    #[test]
    fn test_de_binance_futures_order_book_l2_update() {
        let input = r#"
            {
                "e": "depthUpdate",
                "E": 1571889248277,
                "T": 1571889248276,
                "s": "BTCUSDT",
                "U": 157,
                "u": 160,
                "pu": 149,
                "b": [
                    [
                        "0.0024",
                        "10"
                    ]
                ],
                "a": [
                    [
                        "0.0026",
                        "100"
                    ]
                ]
            }
        "#;

        let actual = serde_json::from_str::<BinanceFuturesOrderBookL2Update>(input).unwrap();

        let expected = BinanceFuturesOrderBookL2Update {
            subscription_id: SubscriptionId::from("@depth@100ms|BTCUSDT"),
            time_exchange: DateTime::from_timestamp_millis(1571889248277).unwrap(),
            time_engine: DateTime::from_timestamp_millis(1571889248276).unwrap(),
            first_update_id: 157,
            last_update_id: 160,
            prev_last_update_id: 149,
            bids: vec![BinanceLevel {
                price: dec!(0.0024),
                amount: dec!(10.0),
            }],
            asks: vec![BinanceLevel {
                price: dec!(0.0026),
                amount: dec!(100.0),
            }],
        };

        assert_eq!(actual, expected);
    }

    #[test]
    fn test_sequencer_is_first_update() {
        // (sequencer under test, expected `is_first_update` result)
        let cases = vec![
            // TC0: freshly constructed sequencer has processed nothing
            (BinanceFuturesUsdOrderBookL2Sequencer::new(10), true),
            // TC1: sequencer that has already processed updates
            (
                BinanceFuturesUsdOrderBookL2Sequencer {
                    updates_processed: 10,
                    last_update_id: 100,
                },
                false,
            ),
        ];

        for (index, (sequencer, expected)) in cases.into_iter().enumerate() {
            assert_eq!(sequencer.is_first_update(), expected, "TC{} failed", index);
        }
    }

    #[test]
    fn test_sequencer_validate_first_update() {
        // Every case runs against a sequencer seeded at id 100 with no updates processed.
        let cases: Vec<(BinanceFuturesOrderBookL2Update, Result<(), DataError>)> = vec![
            // TC0: first_update_id <= 100 <= last_update_id
            (update_with_ids(100, 110, 90), Ok(())),
            // TC1: last_update_id is below the seeded id
            (update_with_ids(100, 90, 90), invalid_sequence(100, 100)),
            // TC2: first_update_id is above the seeded id
            (update_with_ids(110, 120, 90), invalid_sequence(100, 110)),
            // TC3: both bounds miss the seeded id
            (update_with_ids(110, 90, 90), invalid_sequence(100, 110)),
        ];

        for (index, (input, expected)) in cases.into_iter().enumerate() {
            let updater = BinanceFuturesUsdOrderBookL2Sequencer {
                updates_processed: 0,
                last_update_id: 100,
            };

            let actual = updater.validate_first_update(&input);
            assert_same_outcome(index, actual, expected);
        }
    }

    #[test]
    fn test_sequencer_validate_next_update() {
        // Every case runs against a sequencer at id 100 that has already processed updates.
        let cases: Vec<(BinanceFuturesOrderBookL2Update, Result<(), DataError>)> = vec![
            // TC0: prev_last_update_id matches the tracked id
            (update_with_ids(101, 110, 100), Ok(())),
            // TC1: prev_last_update_id does not match the tracked id
            (update_with_ids(100, 90, 90), invalid_sequence(100, 100)),
        ];

        for (index, (input, expected)) in cases.into_iter().enumerate() {
            let updater = BinanceFuturesUsdOrderBookL2Sequencer {
                updates_processed: 100,
                last_update_id: 100,
            };

            let actual = updater.validate_next_update(&input);
            assert_same_outcome(index, actual, expected);
        }
    }

    #[test]
    fn test_update_barter_order_book_with_sequenced_updates() {
        struct TestCase {
            sequencer: BinanceFuturesUsdOrderBookL2Sequencer,
            book: OrderBook,
            input_update: BinanceFuturesOrderBookL2Update,
            expected: OrderBook,
        }

        let tests = vec![
            // TC0: update ends below the tracked id, so it is dropped and the book is unchanged
            TestCase {
                sequencer: BinanceFuturesUsdOrderBookL2Sequencer {
                    updates_processed: 100,
                    last_update_id: 100,
                },
                book: OrderBook::new(0, None, vec![Level::new(50, 1)], vec![Level::new(100, 1)]),
                input_update: update_with_ids(0, 0, 0),
                expected: OrderBook::new(
                    0,
                    None,
                    vec![Level::new(50, 1)],
                    vec![Level::new(100, 1)],
                ),
            },
            // TC1: valid next update is applied to the book
            TestCase {
                sequencer: BinanceFuturesUsdOrderBookL2Sequencer {
                    updates_processed: 100,
                    last_update_id: 100,
                },
                book: OrderBook::new(
                    100,
                    None,
                    vec![Level::new(80, 1), Level::new(100, 1), Level::new(90, 1)],
                    vec![Level::new(150, 1), Level::new(110, 1), Level::new(120, 1)],
                ),
                input_update: BinanceFuturesOrderBookL2Update {
                    bids: vec![
                        BinanceLevel {
                            price: dec!(80),
                            amount: dec!(0),
                        },
                        BinanceLevel {
                            price: dec!(90),
                            amount: dec!(10),
                        },
                    ],
                    asks: vec![
                        BinanceLevel {
                            price: dec!(200),
                            amount: dec!(1),
                        },
                        BinanceLevel {
                            price: dec!(500),
                            amount: dec!(0),
                        },
                    ],
                    ..update_with_ids(101, 110, 100)
                },
                expected: OrderBook::new(
                    110,
                    None,
                    vec![Level::new(100, 1), Level::new(90, 10)],
                    vec![
                        Level::new(110, 1),
                        Level::new(120, 1),
                        Level::new(150, 1),
                        Level::new(200, 1),
                    ],
                ),
            },
        ];

        for (index, mut test) in tests.into_iter().enumerate() {
            let sequenced = test.sequencer.validate_sequence(test.input_update).unwrap();

            // Dropped updates leave the book as it was.
            if let Some(valid_update) = sequenced {
                let barter_update = OrderBookEvent::Update(OrderBook::new(
                    valid_update.last_update_id,
                    None,
                    valid_update.bids,
                    valid_update.asks,
                ));

                test.book.update(barter_update);
            }

            assert_eq!(test.book, test.expected, "TC{index} failed");
        }
    }
}