1use super::super::book::BinanceLevel;
2use crate::{
3 Identifier, SnapshotFetcher,
4 books::OrderBook,
5 error::DataError,
6 event::{MarketEvent, MarketIter},
7 exchange::{
8 Connector,
9 binance::{
10 book::l2::{BinanceOrderBookL2Meta, BinanceOrderBookL2Snapshot},
11 futures::BinanceFuturesUsd,
12 market::BinanceMarket,
13 },
14 },
15 instrument::InstrumentData,
16 subscription::{
17 Map, Subscription,
18 book::{OrderBookEvent, OrderBooksL2},
19 },
20 transformer::ExchangeTransformer,
21};
22use async_trait::async_trait;
23use chrono::{DateTime, Utc};
24use futures_util::future::try_join_all;
25use rustrade_instrument::exchange::ExchangeId;
26use rustrade_integration::{
27 Transformer, error::SocketError, protocol::websocket::WsMessage, subscription::SubscriptionId,
28};
29use serde::{Deserialize, Serialize};
30use std::future::Future;
31use tokio::sync::mpsc::UnboundedSender;
32
/// Base HTTP URL queried for the initial L2 order book snapshot of a
/// [`BinanceFuturesUsd`] market.
///
/// The `symbol` and `limit` query parameters are appended by the snapshot fetcher.
pub const HTTP_BOOK_L2_SNAPSHOT_URL_BINANCE_FUTURES_USD: &str =
    "https://fapi.binance.com/fapi/v1/depth";
38
/// Stateless [`SnapshotFetcher`] that retrieves the initial [`OrderBooksL2`] snapshots
/// for [`BinanceFuturesUsd`] subscriptions over HTTP.
#[derive(Debug)]
pub struct BinanceFuturesUsdOrderBooksL2SnapshotFetcher;
41
42impl SnapshotFetcher<BinanceFuturesUsd, OrderBooksL2>
43 for BinanceFuturesUsdOrderBooksL2SnapshotFetcher
44{
45 fn fetch_snapshots<Instrument>(
46 subscriptions: &[Subscription<BinanceFuturesUsd, Instrument, OrderBooksL2>],
47 ) -> impl Future<Output = Result<Vec<MarketEvent<Instrument::Key, OrderBookEvent>>, SocketError>>
48 + Send
49 where
50 Instrument: InstrumentData,
51 Subscription<BinanceFuturesUsd, Instrument, OrderBooksL2>: Identifier<BinanceMarket>,
52 {
53 let l2_snapshot_futures = subscriptions.iter().map(|sub| {
54 let market = sub.id();
56 let snapshot_url = format!(
57 "{}?symbol={}&limit=100",
58 HTTP_BOOK_L2_SNAPSHOT_URL_BINANCE_FUTURES_USD,
59 market.as_ref(),
60 );
61
62 async move {
63 let snapshot = reqwest::get(snapshot_url)
65 .await
66 .map_err(SocketError::Http)?
67 .json::<BinanceOrderBookL2Snapshot>()
68 .await
69 .map_err(SocketError::Http)?;
70
71 Ok(MarketEvent::from((
72 ExchangeId::BinanceFuturesUsd,
73 sub.instrument.key().clone(),
74 snapshot,
75 )))
76 }
77 });
78
79 try_join_all(l2_snapshot_futures)
80 }
81}
82
83#[derive(Debug)]
84pub struct BinanceFuturesUsdOrderBooksL2Transformer<InstrumentKey> {
85 instrument_map:
86 Map<BinanceOrderBookL2Meta<InstrumentKey, BinanceFuturesUsdOrderBookL2Sequencer>>,
87}
88
89#[async_trait]
90impl<InstrumentKey> ExchangeTransformer<BinanceFuturesUsd, InstrumentKey, OrderBooksL2>
91 for BinanceFuturesUsdOrderBooksL2Transformer<InstrumentKey>
92where
93 InstrumentKey: Clone + PartialEq + Send + Sync,
94{
95 async fn init(
96 instrument_map: Map<InstrumentKey>,
97 initial_snapshots: &[MarketEvent<InstrumentKey, OrderBookEvent>],
98 _: UnboundedSender<WsMessage>,
99 ) -> Result<Self, DataError> {
100 let instrument_map = instrument_map
101 .0
102 .into_iter()
103 .map(|(sub_id, instrument_key)| {
104 let snapshot = initial_snapshots
105 .iter()
106 .find(|snapshot| snapshot.instrument == instrument_key)
107 .ok_or_else(|| DataError::InitialSnapshotMissing(sub_id.clone()))?;
108
109 let OrderBookEvent::Snapshot(snapshot) = &snapshot.kind else {
110 return Err(DataError::InitialSnapshotInvalid(String::from(
111 "expected OrderBookEvent::Snapshot but found OrderBookEvent::Update",
112 )));
113 };
114
115 let sequencer = BinanceFuturesUsdOrderBookL2Sequencer {
116 updates_processed: 0,
117 last_update_id: snapshot.sequence(),
118 };
119
120 Ok((
121 sub_id,
122 BinanceOrderBookL2Meta::new(instrument_key, sequencer),
123 ))
124 })
125 .collect::<Result<Map<_>, _>>()?;
126
127 Ok(Self { instrument_map })
128 }
129}
130
131impl<InstrumentKey> Transformer for BinanceFuturesUsdOrderBooksL2Transformer<InstrumentKey>
132where
133 InstrumentKey: Clone,
134{
135 type Error = DataError;
136 type Input = BinanceFuturesOrderBookL2Update;
137 type Output = MarketEvent<InstrumentKey, OrderBookEvent>;
138 type OutputIter = Vec<Result<Self::Output, Self::Error>>;
139
140 fn transform(&mut self, input: Self::Input) -> Self::OutputIter {
141 let subscription_id = match input.id() {
143 Some(subscription_id) => subscription_id,
144 None => return vec![],
145 };
146
147 let instrument = match self.instrument_map.find_mut(&subscription_id) {
149 Ok(instrument) => instrument,
150 Err(unidentifiable) => return vec![Err(DataError::from(unidentifiable))],
151 };
152
153 let valid_update = match instrument.sequencer.validate_sequence(input) {
155 Ok(Some(valid_update)) => valid_update,
156 Ok(None) => return vec![],
157 Err(error) => return vec![Err(error)],
158 };
159
160 MarketIter::<InstrumentKey, OrderBookEvent>::from((
161 BinanceFuturesUsd::ID,
162 instrument.key.clone(),
163 valid_update,
164 ))
165 .0
166 }
167}
168
/// Sequencing state used to validate a stream of [`BinanceFuturesOrderBookL2Update`]s
/// against the snapshot it follows.
#[derive(Debug)]
pub struct BinanceFuturesUsdOrderBookL2Sequencer {
    /// Number of updates accepted so far; `0` means the next accepted update is the first.
    pub updates_processed: u64,
    /// Last update id of the snapshot, or of the most recently accepted update.
    pub last_update_id: u64,
}
199
200impl BinanceFuturesUsdOrderBookL2Sequencer {
201 pub fn new(last_update_id: u64) -> Self {
203 Self {
204 updates_processed: 0,
205 last_update_id,
206 }
207 }
208
209 pub fn validate_sequence(
213 &mut self,
214 update: BinanceFuturesOrderBookL2Update,
215 ) -> Result<Option<BinanceFuturesOrderBookL2Update>, DataError> {
216 if update.last_update_id < self.last_update_id {
218 return Ok(None);
219 }
220
221 if self.is_first_update() {
222 self.validate_first_update(&update)?;
224 } else {
225 self.validate_next_update(&update)?;
227 }
228
229 self.updates_processed += 1;
231 self.last_update_id = update.last_update_id;
232
233 Ok(Some(update))
234 }
235
236 pub fn is_first_update(&self) -> bool {
241 self.updates_processed == 0
242 }
243
244 pub fn validate_first_update(
249 &self,
250 update: &BinanceFuturesOrderBookL2Update,
251 ) -> Result<(), DataError> {
252 if update.first_update_id <= self.last_update_id
253 && update.last_update_id >= self.last_update_id
254 {
255 Ok(())
256 } else {
257 Err(DataError::InvalidSequence {
258 prev_last_update_id: self.last_update_id,
259 first_update_id: update.first_update_id,
260 })
261 }
262 }
263
264 pub fn validate_next_update(
270 &self,
271 update: &BinanceFuturesOrderBookL2Update,
272 ) -> Result<(), DataError> {
273 if update.prev_last_update_id == self.last_update_id {
274 Ok(())
275 } else {
276 Err(DataError::InvalidSequence {
277 prev_last_update_id: self.last_update_id,
278 first_update_id: update.first_update_id,
279 })
280 }
281 }
282}
283
284#[derive(Clone, PartialEq, PartialOrd, Debug, Deserialize, Serialize)]
306pub struct BinanceFuturesOrderBookL2Update {
307 #[serde(
308 alias = "s",
309 deserialize_with = "super::super::book::l2::de_ob_l2_subscription_id"
310 )]
311 pub subscription_id: SubscriptionId,
312 #[serde(
313 alias = "E",
314 deserialize_with = "rustrade_integration::serde::de::de_u64_epoch_ms_as_datetime_utc"
315 )]
316 pub time_exchange: DateTime<Utc>,
317 #[serde(
318 alias = "T",
319 deserialize_with = "rustrade_integration::serde::de::de_u64_epoch_ms_as_datetime_utc"
320 )]
321 pub time_engine: DateTime<Utc>,
322 #[serde(alias = "U")]
323 pub first_update_id: u64,
324 #[serde(alias = "u")]
325 pub last_update_id: u64,
326 #[serde(alias = "pu")]
327 pub prev_last_update_id: u64,
328 #[serde(alias = "b")]
329 pub bids: Vec<BinanceLevel>,
330 #[serde(alias = "a")]
331 pub asks: Vec<BinanceLevel>,
332}
333
334impl Identifier<Option<SubscriptionId>> for BinanceFuturesOrderBookL2Update {
335 fn id(&self) -> Option<SubscriptionId> {
336 Some(self.subscription_id.clone())
337 }
338}
339
340impl<InstrumentKey> From<(ExchangeId, InstrumentKey, BinanceFuturesOrderBookL2Update)>
341 for MarketIter<InstrumentKey, OrderBookEvent>
342{
343 fn from(
344 (exchange, instrument, update): (
345 ExchangeId,
346 InstrumentKey,
347 BinanceFuturesOrderBookL2Update,
348 ),
349 ) -> Self {
350 Self(vec![Ok(MarketEvent {
351 time_exchange: update.time_exchange,
352 time_received: Utc::now(),
353 exchange,
354 instrument,
355 kind: OrderBookEvent::Update(OrderBook::new(
356 update.last_update_id,
357 Some(update.time_engine),
358 update.bids,
359 update.asks,
360 )),
361 })])
362 }
363}
364
#[cfg(test)]
// Test-only code: `unwrap` is acceptable here because a panic simply fails the test.
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use crate::books::Level;
    use rust_decimal_macros::dec;

    /// Builds a sequencer in an arbitrary state.
    fn sequencer(
        updates_processed: u64,
        last_update_id: u64,
    ) -> BinanceFuturesUsdOrderBookL2Sequencer {
        BinanceFuturesUsdOrderBookL2Sequencer {
            updates_processed,
            last_update_id,
        }
    }

    /// Builds an update without any levels, carrying only the given sequence ids.
    fn empty_update(
        first_update_id: u64,
        last_update_id: u64,
        prev_last_update_id: u64,
    ) -> BinanceFuturesOrderBookL2Update {
        BinanceFuturesOrderBookL2Update {
            subscription_id: SubscriptionId::from("subscription_id"),
            time_exchange: Default::default(),
            time_engine: Default::default(),
            first_update_id,
            last_update_id,
            prev_last_update_id,
            bids: vec![],
            asks: vec![],
        }
    }

    /// Shorthand for the expected `InvalidSequence` validation failure.
    fn invalid_sequence(prev_last_update_id: u64, first_update_id: u64) -> Result<(), DataError> {
        Err(DataError::InvalidSequence {
            prev_last_update_id,
            first_update_id,
        })
    }

    /// Asserts that a validation outcome matches the expectation for test case `index`.
    fn assert_validation(
        index: usize,
        actual: Result<(), DataError>,
        expected: Result<(), DataError>,
    ) {
        match (actual, expected) {
            (Ok(_), Ok(_)) => {}
            (Err(actual), Err(expected)) => {
                assert_eq!(actual, expected, "TC{index} error variant mismatch");
            }
            (actual, expected) => {
                panic!(
                    "TC{index} failed because actual != expected. \nActual: {actual:?}\nExpected: {expected:?}\n"
                );
            }
        }
    }

    #[test]
    fn test_de_binance_futures_order_book_l2_update() {
        let input = r#"
            {
                "e": "depthUpdate",
                "E": 1571889248277,
                "T": 1571889248276,
                "s": "BTCUSDT",
                "U": 157,
                "u": 160,
                "pu": 149,
                "b": [
                    [
                        "0.0024",
                        "10"
                    ]
                ],
                "a": [
                    [
                        "0.0026",
                        "100"
                    ]
                ]
            }
        "#;

        let expected = BinanceFuturesOrderBookL2Update {
            subscription_id: SubscriptionId::from("@depth@100ms|BTCUSDT"),
            time_exchange: DateTime::from_timestamp_millis(1571889248277).unwrap(),
            time_engine: DateTime::from_timestamp_millis(1571889248276).unwrap(),
            first_update_id: 157,
            last_update_id: 160,
            prev_last_update_id: 149,
            bids: vec![BinanceLevel {
                price: dec!(0.0024),
                amount: dec!(10.0),
            }],
            asks: vec![BinanceLevel {
                price: dec!(0.0026),
                amount: dec!(100.0),
            }],
        };

        let actual = serde_json::from_str::<BinanceFuturesOrderBookL2Update>(input).unwrap();

        assert_eq!(actual, expected);
    }

    #[test]
    fn test_sequencer_is_first_update() {
        let cases = vec![
            // TC0: freshly constructed sequencer has processed nothing.
            (BinanceFuturesUsdOrderBookL2Sequencer::new(10), true),
            // TC1: sequencer that has already processed updates.
            (sequencer(10, 100), false),
        ];

        for (index, (sequencer, expected)) in cases.into_iter().enumerate() {
            assert_eq!(
                sequencer.is_first_update(),
                expected,
                "TC{} failed",
                index
            );
        }
    }

    #[test]
    fn test_sequencer_validate_first_update() {
        let cases = vec![
            // TC0: update straddles the snapshot id.
            (sequencer(0, 100), empty_update(100, 110, 90), Ok(())),
            // TC1: update ends before the snapshot id.
            (
                sequencer(0, 100),
                empty_update(100, 90, 90),
                invalid_sequence(100, 100),
            ),
            // TC2: update starts after the snapshot id.
            (
                sequencer(0, 100),
                empty_update(110, 120, 90),
                invalid_sequence(100, 110),
            ),
            // TC3: update both starts after and ends before the snapshot id.
            (
                sequencer(0, 100),
                empty_update(110, 90, 90),
                invalid_sequence(100, 110),
            ),
        ];

        for (index, (updater, input, expected)) in cases.into_iter().enumerate() {
            assert_validation(index, updater.validate_first_update(&input), expected);
        }
    }

    #[test]
    fn test_sequencer_validate_next_update() {
        let cases = vec![
            // TC0: update chains onto the previous last update id.
            (sequencer(100, 100), empty_update(101, 110, 100), Ok(())),
            // TC1: update does not chain onto the previous last update id.
            (
                sequencer(100, 100),
                empty_update(100, 90, 90),
                invalid_sequence(100, 100),
            ),
        ];

        for (index, (updater, input, expected)) in cases.into_iter().enumerate() {
            assert_validation(index, updater.validate_next_update(&input), expected);
        }
    }

    #[test]
    fn test_update_rustrade_order_book_with_sequenced_updates() {
        struct TestCase {
            sequencer: BinanceFuturesUsdOrderBookL2Sequencer,
            book: OrderBook,
            input_update: BinanceFuturesOrderBookL2Update,
            expected: OrderBook,
        }

        let cases = vec![
            // TC0: stale update is dropped, so the book is unchanged.
            TestCase {
                sequencer: sequencer(100, 100),
                book: OrderBook::new(0, None, vec![Level::new(50, 1)], vec![Level::new(100, 1)]),
                input_update: empty_update(0, 0, 0),
                expected: OrderBook::new(
                    0,
                    None,
                    vec![Level::new(50, 1)],
                    vec![Level::new(100, 1)],
                ),
            },
            // TC1: valid next update is applied to the book.
            TestCase {
                sequencer: sequencer(100, 100),
                book: OrderBook::new(
                    100,
                    None,
                    vec![Level::new(80, 1), Level::new(100, 1), Level::new(90, 1)],
                    vec![Level::new(150, 1), Level::new(110, 1), Level::new(120, 1)],
                ),
                input_update: BinanceFuturesOrderBookL2Update {
                    bids: vec![
                        BinanceLevel {
                            price: dec!(80),
                            amount: dec!(0),
                        },
                        BinanceLevel {
                            price: dec!(90),
                            amount: dec!(10),
                        },
                    ],
                    asks: vec![
                        BinanceLevel {
                            price: dec!(200),
                            amount: dec!(1),
                        },
                        BinanceLevel {
                            price: dec!(500),
                            amount: dec!(0),
                        },
                    ],
                    ..empty_update(101, 110, 100)
                },
                expected: OrderBook::new(
                    110,
                    None,
                    vec![Level::new(100, 1), Level::new(90, 10)],
                    vec![
                        Level::new(110, 1),
                        Level::new(120, 1),
                        Level::new(150, 1),
                        Level::new(200, 1),
                    ],
                ),
            },
        ];

        for (index, mut case) in cases.into_iter().enumerate() {
            let validated = case.sequencer.validate_sequence(case.input_update).unwrap();

            if let Some(valid_update) = validated {
                let rustrade_update = OrderBookEvent::Update(OrderBook::new(
                    valid_update.last_update_id,
                    None,
                    valid_update.bids,
                    valid_update.asks,
                ));

                case.book.update(&rustrade_update);
            }

            assert_eq!(case.book, case.expected, "TC{index} failed");
        }
    }
}