1use super::super::book::BinanceLevel;
2use crate::{
3 Identifier, SnapshotFetcher,
4 books::OrderBook,
5 error::DataError,
6 event::{MarketEvent, MarketIter},
7 exchange::{
8 Connector,
9 binance::{
10 book::l2::{BinanceOrderBookL2Meta, BinanceOrderBookL2Snapshot},
11 futures::BinanceFuturesUsd,
12 market::BinanceMarket,
13 },
14 },
15 instrument::InstrumentData,
16 subscription::{
17 Map, Subscription,
18 book::{OrderBookEvent, OrderBooksL2},
19 },
20 transformer::ExchangeTransformer,
21};
22use chrono::{DateTime, Utc};
23use futures_util::future::try_join_all;
24use rustrade_instrument::exchange::ExchangeId;
25use rustrade_integration::{
26 Transformer, error::SocketError, protocol::websocket::WsMessage, subscription::SubscriptionId,
27};
28use serde::{Deserialize, Serialize};
29use std::future::Future;
30use tokio::sync::mpsc::UnboundedSender;
31
32pub const HTTP_BOOK_L2_SNAPSHOT_URL_BINANCE_FUTURES_USD: &str =
36 "https://fapi.binance.com/fapi/v1/depth";
37
38#[derive(Debug)]
39pub struct BinanceFuturesUsdOrderBooksL2SnapshotFetcher;
40
41impl SnapshotFetcher<BinanceFuturesUsd, OrderBooksL2>
42 for BinanceFuturesUsdOrderBooksL2SnapshotFetcher
43{
44 fn fetch_snapshots<Instrument>(
45 subscriptions: &[Subscription<BinanceFuturesUsd, Instrument, OrderBooksL2>],
46 ) -> impl Future<Output = Result<Vec<MarketEvent<Instrument::Key, OrderBookEvent>>, SocketError>>
47 + Send
48 where
49 Instrument: InstrumentData,
50 Subscription<BinanceFuturesUsd, Instrument, OrderBooksL2>: Identifier<BinanceMarket>,
51 {
52 let l2_snapshot_futures = subscriptions.iter().map(|sub| {
53 let market = sub.id();
55 let snapshot_url = format!(
56 "{}?symbol={}&limit=100",
57 HTTP_BOOK_L2_SNAPSHOT_URL_BINANCE_FUTURES_USD,
58 market.as_ref(),
59 );
60
61 async move {
62 let snapshot = reqwest::get(snapshot_url)
64 .await
65 .map_err(SocketError::Http)?
66 .json::<BinanceOrderBookL2Snapshot>()
67 .await
68 .map_err(SocketError::Http)?;
69
70 Ok(MarketEvent::from((
71 ExchangeId::BinanceFuturesUsd,
72 sub.instrument.key().clone(),
73 snapshot,
74 )))
75 }
76 });
77
78 try_join_all(l2_snapshot_futures)
79 }
80}
81
82#[derive(Debug)]
83pub struct BinanceFuturesUsdOrderBooksL2Transformer<InstrumentKey> {
84 instrument_map:
85 Map<BinanceOrderBookL2Meta<InstrumentKey, BinanceFuturesUsdOrderBookL2Sequencer>>,
86}
87
88impl<InstrumentKey> ExchangeTransformer<BinanceFuturesUsd, InstrumentKey, OrderBooksL2>
89 for BinanceFuturesUsdOrderBooksL2Transformer<InstrumentKey>
90where
91 InstrumentKey: Clone + PartialEq + Send + Sync,
92{
93 async fn init(
94 instrument_map: Map<InstrumentKey>,
95 initial_snapshots: &[MarketEvent<InstrumentKey, OrderBookEvent>],
96 _: UnboundedSender<WsMessage>,
97 ) -> Result<Self, DataError> {
98 let instrument_map = instrument_map
99 .0
100 .into_iter()
101 .map(|(sub_id, instrument_key)| {
102 let snapshot = initial_snapshots
103 .iter()
104 .find(|snapshot| snapshot.instrument == instrument_key)
105 .ok_or_else(|| DataError::InitialSnapshotMissing(sub_id.clone()))?;
106
107 let OrderBookEvent::Snapshot(snapshot) = &snapshot.kind else {
108 return Err(DataError::InitialSnapshotInvalid(String::from(
109 "expected OrderBookEvent::Snapshot but found OrderBookEvent::Update",
110 )));
111 };
112
113 let sequencer = BinanceFuturesUsdOrderBookL2Sequencer {
114 updates_processed: 0,
115 last_update_id: snapshot.sequence(),
116 };
117
118 Ok((
119 sub_id,
120 BinanceOrderBookL2Meta::new(instrument_key, sequencer),
121 ))
122 })
123 .collect::<Result<Map<_>, _>>()?;
124
125 Ok(Self { instrument_map })
126 }
127}
128
129impl<InstrumentKey> Transformer for BinanceFuturesUsdOrderBooksL2Transformer<InstrumentKey>
130where
131 InstrumentKey: Clone,
132{
133 type Error = DataError;
134 type Input = BinanceFuturesOrderBookL2Update;
135 type Output = MarketEvent<InstrumentKey, OrderBookEvent>;
136 type OutputIter = Vec<Result<Self::Output, Self::Error>>;
137
138 fn transform(&mut self, input: Self::Input) -> Self::OutputIter {
139 let subscription_id = match input.id() {
141 Some(subscription_id) => subscription_id,
142 None => return vec![],
143 };
144
145 let instrument = match self.instrument_map.find_mut(&subscription_id) {
147 Ok(instrument) => instrument,
148 Err(unidentifiable) => return vec![Err(DataError::from(unidentifiable))],
149 };
150
151 let valid_update = match instrument.sequencer.validate_sequence(input) {
153 Ok(Some(valid_update)) => valid_update,
154 Ok(None) => return vec![],
155 Err(error) => return vec![Err(error)],
156 };
157
158 MarketIter::<InstrumentKey, OrderBookEvent>::from((
159 BinanceFuturesUsd::ID,
160 instrument.key.clone(),
161 valid_update,
162 ))
163 .0
164 }
165}
166
167#[derive(Debug)]
193pub struct BinanceFuturesUsdOrderBookL2Sequencer {
194 pub updates_processed: u64,
195 pub last_update_id: u64,
196}
197
198impl BinanceFuturesUsdOrderBookL2Sequencer {
199 pub fn new(last_update_id: u64) -> Self {
201 Self {
202 updates_processed: 0,
203 last_update_id,
204 }
205 }
206
207 pub fn validate_sequence(
211 &mut self,
212 update: BinanceFuturesOrderBookL2Update,
213 ) -> Result<Option<BinanceFuturesOrderBookL2Update>, DataError> {
214 if update.last_update_id < self.last_update_id {
216 return Ok(None);
217 }
218
219 if self.is_first_update() {
220 self.validate_first_update(&update)?;
222 } else {
223 self.validate_next_update(&update)?;
225 }
226
227 self.updates_processed += 1;
229 self.last_update_id = update.last_update_id;
230
231 Ok(Some(update))
232 }
233
234 pub fn is_first_update(&self) -> bool {
239 self.updates_processed == 0
240 }
241
242 pub fn validate_first_update(
247 &self,
248 update: &BinanceFuturesOrderBookL2Update,
249 ) -> Result<(), DataError> {
250 if update.first_update_id <= self.last_update_id
251 && update.last_update_id >= self.last_update_id
252 {
253 Ok(())
254 } else {
255 Err(DataError::InvalidSequence {
256 prev_last_update_id: self.last_update_id,
257 first_update_id: update.first_update_id,
258 })
259 }
260 }
261
262 pub fn validate_next_update(
268 &self,
269 update: &BinanceFuturesOrderBookL2Update,
270 ) -> Result<(), DataError> {
271 if update.prev_last_update_id == self.last_update_id {
272 Ok(())
273 } else {
274 Err(DataError::InvalidSequence {
275 prev_last_update_id: self.last_update_id,
276 first_update_id: update.first_update_id,
277 })
278 }
279 }
280}
281
282#[derive(Clone, PartialEq, PartialOrd, Debug, Deserialize, Serialize)]
304pub struct BinanceFuturesOrderBookL2Update {
305 #[serde(
306 alias = "s",
307 deserialize_with = "super::super::book::l2::de_ob_l2_subscription_id"
308 )]
309 pub subscription_id: SubscriptionId,
310 #[serde(
311 alias = "E",
312 deserialize_with = "rustrade_integration::serde::de::de_u64_epoch_ms_as_datetime_utc"
313 )]
314 pub time_exchange: DateTime<Utc>,
315 #[serde(
316 alias = "T",
317 deserialize_with = "rustrade_integration::serde::de::de_u64_epoch_ms_as_datetime_utc"
318 )]
319 pub time_engine: DateTime<Utc>,
320 #[serde(alias = "U")]
321 pub first_update_id: u64,
322 #[serde(alias = "u")]
323 pub last_update_id: u64,
324 #[serde(alias = "pu")]
325 pub prev_last_update_id: u64,
326 #[serde(alias = "b")]
327 pub bids: Vec<BinanceLevel>,
328 #[serde(alias = "a")]
329 pub asks: Vec<BinanceLevel>,
330}
331
332impl Identifier<Option<SubscriptionId>> for BinanceFuturesOrderBookL2Update {
333 fn id(&self) -> Option<SubscriptionId> {
334 Some(self.subscription_id.clone())
335 }
336}
337
338impl<InstrumentKey> From<(ExchangeId, InstrumentKey, BinanceFuturesOrderBookL2Update)>
339 for MarketIter<InstrumentKey, OrderBookEvent>
340{
341 fn from(
342 (exchange, instrument, update): (
343 ExchangeId,
344 InstrumentKey,
345 BinanceFuturesOrderBookL2Update,
346 ),
347 ) -> Self {
348 Self(vec![Ok(MarketEvent {
349 time_exchange: update.time_exchange,
350 time_received: Utc::now(),
351 exchange,
352 instrument,
353 kind: OrderBookEvent::Update(OrderBook::new(
354 update.last_update_id,
355 Some(update.time_engine),
356 update.bids,
357 update.asks,
358 )),
359 })])
360 }
361}
362
363#[cfg(test)]
364#[allow(clippy::unwrap_used)] mod tests {
366 use super::*;
367 use crate::books::Level;
368 use rust_decimal_macros::dec;
369
370 #[test]
371 fn test_de_binance_futures_order_book_l2_update() {
372 let input = r#"
373 {
374 "e": "depthUpdate",
375 "E": 1571889248277,
376 "T": 1571889248276,
377 "s": "BTCUSDT",
378 "U": 157,
379 "u": 160,
380 "pu": 149,
381 "b": [
382 [
383 "0.0024",
384 "10"
385 ]
386 ],
387 "a": [
388 [
389 "0.0026",
390 "100"
391 ]
392 ]
393 }
394 "#;
395
396 assert_eq!(
397 serde_json::from_str::<BinanceFuturesOrderBookL2Update>(input).unwrap(),
398 BinanceFuturesOrderBookL2Update {
399 subscription_id: SubscriptionId::from("@depth@100ms|BTCUSDT"),
400 time_exchange: DateTime::from_timestamp_millis(1571889248277).unwrap(),
401 time_engine: DateTime::from_timestamp_millis(1571889248276).unwrap(),
402 first_update_id: 157,
403 last_update_id: 160,
404 prev_last_update_id: 149,
405 bids: vec![BinanceLevel {
406 price: dec!(0.0024),
407 amount: dec!(10.0)
408 },],
409 asks: vec![BinanceLevel {
410 price: dec!(0.0026),
411 amount: dec!(100.0)
412 },]
413 }
414 );
415 }
416
417 #[test]
418 fn test_sequencer_is_first_update() {
419 struct TestCase {
420 sequencer: BinanceFuturesUsdOrderBookL2Sequencer,
421 expected: bool,
422 }
423
424 let tests = vec![
425 TestCase {
426 sequencer: BinanceFuturesUsdOrderBookL2Sequencer::new(10),
428 expected: true,
429 },
430 TestCase {
431 sequencer: BinanceFuturesUsdOrderBookL2Sequencer {
433 updates_processed: 10,
434 last_update_id: 100,
435 },
436 expected: false,
437 },
438 ];
439
440 for (index, test) in tests.into_iter().enumerate() {
441 assert_eq!(
442 test.sequencer.is_first_update(),
443 test.expected,
444 "TC{} failed",
445 index
446 );
447 }
448 }
449
450 #[test]
451 fn test_sequencer_validate_first_update() {
452 struct TestCase {
453 updater: BinanceFuturesUsdOrderBookL2Sequencer,
454 input: BinanceFuturesOrderBookL2Update,
455 expected: Result<(), DataError>,
456 }
457
458 let tests = vec![
459 TestCase {
460 updater: BinanceFuturesUsdOrderBookL2Sequencer {
462 updates_processed: 0,
463 last_update_id: 100,
464 },
465 input: BinanceFuturesOrderBookL2Update {
466 subscription_id: SubscriptionId::from("subscription_id"),
467 time_exchange: Default::default(),
468 time_engine: Default::default(),
469 first_update_id: 100,
470 last_update_id: 110,
471 prev_last_update_id: 90,
472 bids: vec![],
473 asks: vec![],
474 },
475 expected: Ok(()),
476 },
477 TestCase {
478 updater: BinanceFuturesUsdOrderBookL2Sequencer {
480 updates_processed: 0,
481 last_update_id: 100,
482 },
483 input: BinanceFuturesOrderBookL2Update {
484 subscription_id: SubscriptionId::from("subscription_id"),
485 time_exchange: Default::default(),
486 time_engine: Default::default(),
487 first_update_id: 100,
488 last_update_id: 90,
489 prev_last_update_id: 90,
490 bids: vec![],
491 asks: vec![],
492 },
493 expected: Err(DataError::InvalidSequence {
494 prev_last_update_id: 100,
495 first_update_id: 100,
496 }),
497 },
498 TestCase {
499 updater: BinanceFuturesUsdOrderBookL2Sequencer {
501 updates_processed: 0,
502 last_update_id: 100,
503 },
504 input: BinanceFuturesOrderBookL2Update {
505 subscription_id: SubscriptionId::from("subscription_id"),
506 time_exchange: Default::default(),
507 time_engine: Default::default(),
508 first_update_id: 110,
509 last_update_id: 120,
510 prev_last_update_id: 90,
511 bids: vec![],
512 asks: vec![],
513 },
514 expected: Err(DataError::InvalidSequence {
515 prev_last_update_id: 100,
516 first_update_id: 110,
517 }),
518 },
519 TestCase {
520 updater: BinanceFuturesUsdOrderBookL2Sequencer {
522 updates_processed: 0,
523 last_update_id: 100,
524 },
525 input: BinanceFuturesOrderBookL2Update {
526 subscription_id: SubscriptionId::from("subscription_id"),
527 time_exchange: Default::default(),
528 time_engine: Default::default(),
529 first_update_id: 110,
530 last_update_id: 90,
531 prev_last_update_id: 90,
532 bids: vec![],
533 asks: vec![],
534 },
535 expected: Err(DataError::InvalidSequence {
536 prev_last_update_id: 100,
537 first_update_id: 110,
538 }),
539 },
540 ];
541
542 for (index, test) in tests.into_iter().enumerate() {
543 let actual = test.updater.validate_first_update(&test.input);
544 match (actual, test.expected) {
545 (Ok(_), Ok(_)) => {
546 }
548 (Err(actual), Err(expected)) => {
549 assert_eq!(actual, expected, "TC{index} error variant mismatch");
550 }
551 (actual, expected) => {
552 panic!(
554 "TC{index} failed because actual != expected. \nActual: {actual:?}\nExpected: {expected:?}\n"
555 );
556 }
557 }
558 }
559 }
560
561 #[test]
562 fn test_sequencer_validate_next_update() {
563 struct TestCase {
564 updater: BinanceFuturesUsdOrderBookL2Sequencer,
565 input: BinanceFuturesOrderBookL2Update,
566 expected: Result<(), DataError>,
567 }
568
569 let tests = vec![
570 TestCase {
571 updater: BinanceFuturesUsdOrderBookL2Sequencer {
573 updates_processed: 100,
574 last_update_id: 100,
575 },
576 input: BinanceFuturesOrderBookL2Update {
577 subscription_id: SubscriptionId::from("subscription_id"),
578 time_exchange: Default::default(),
579 time_engine: Default::default(),
580 first_update_id: 101,
581 last_update_id: 110,
582 prev_last_update_id: 100,
583 bids: vec![],
584 asks: vec![],
585 },
586 expected: Ok(()),
587 },
588 TestCase {
589 updater: BinanceFuturesUsdOrderBookL2Sequencer {
591 updates_processed: 100,
592 last_update_id: 100,
593 },
594 input: BinanceFuturesOrderBookL2Update {
595 subscription_id: SubscriptionId::from("subscription_id"),
596 time_exchange: Default::default(),
597 time_engine: Default::default(),
598 first_update_id: 100,
599 last_update_id: 90,
600 prev_last_update_id: 90,
601 bids: vec![],
602 asks: vec![],
603 },
604 expected: Err(DataError::InvalidSequence {
605 prev_last_update_id: 100,
606 first_update_id: 100,
607 }),
608 },
609 ];
610
611 for (index, test) in tests.into_iter().enumerate() {
612 let actual = test.updater.validate_next_update(&test.input);
613 match (actual, test.expected) {
614 (Ok(_), Ok(_)) => {
615 }
617 (Err(actual), Err(expected)) => {
618 assert_eq!(actual, expected, "TC{index} error variant mismatch");
619 }
620 (actual, expected) => {
621 panic!(
623 "TC{index} failed because actual != expected. \nActual: {actual:?}\nExpected: {expected:?}\n"
624 );
625 }
626 }
627 }
628 }
629
630 #[test]
631 fn test_update_rustrade_order_book_with_sequenced_updates() {
632 struct TestCase {
633 sequencer: BinanceFuturesUsdOrderBookL2Sequencer,
634 book: OrderBook,
635 input_update: BinanceFuturesOrderBookL2Update,
636 expected: OrderBook,
637 }
638
639 let tests = vec![
640 TestCase {
641 sequencer: BinanceFuturesUsdOrderBookL2Sequencer {
643 updates_processed: 100,
644 last_update_id: 100,
645 },
646 book: OrderBook::new(0, None, vec![Level::new(50, 1)], vec![Level::new(100, 1)]),
647 input_update: BinanceFuturesOrderBookL2Update {
648 subscription_id: SubscriptionId::from("subscription_id"),
649 time_exchange: Default::default(),
650 time_engine: Default::default(),
651 first_update_id: 0,
652 last_update_id: 0,
653 prev_last_update_id: 0,
654 bids: vec![],
655 asks: vec![],
656 },
657 expected: OrderBook::new(
658 0,
659 None,
660 vec![Level::new(50, 1)],
661 vec![Level::new(100, 1)],
662 ),
663 },
664 TestCase {
665 sequencer: BinanceFuturesUsdOrderBookL2Sequencer {
667 updates_processed: 100,
668 last_update_id: 100,
669 },
670 book: OrderBook::new(
671 100,
672 None,
673 vec![Level::new(80, 1), Level::new(100, 1), Level::new(90, 1)],
674 vec![Level::new(150, 1), Level::new(110, 1), Level::new(120, 1)],
675 ),
676 input_update: BinanceFuturesOrderBookL2Update {
677 subscription_id: SubscriptionId::from("subscription_id"),
678 time_exchange: Default::default(),
679 time_engine: Default::default(),
680 first_update_id: 101,
681 last_update_id: 110,
682 prev_last_update_id: 100,
683 bids: vec![
684 BinanceLevel {
686 price: dec!(80),
687 amount: dec!(0),
688 },
689 BinanceLevel {
691 price: dec!(90),
692 amount: dec!(10),
693 },
694 ],
695 asks: vec![
696 BinanceLevel {
698 price: dec!(200),
699 amount: dec!(1),
700 },
701 BinanceLevel {
703 price: dec!(500),
704 amount: dec!(0),
705 },
706 ],
707 },
708 expected: OrderBook::new(
709 110,
710 None,
711 vec![Level::new(100, 1), Level::new(90, 10)],
712 vec![
713 Level::new(110, 1),
714 Level::new(120, 1),
715 Level::new(150, 1),
716 Level::new(200, 1),
717 ],
718 ),
719 },
720 ];
721
722 for (index, mut test) in tests.into_iter().enumerate() {
723 if let Some(valid_update) = test.sequencer.validate_sequence(test.input_update).unwrap()
724 {
725 let rustrade_update = OrderBookEvent::Update(OrderBook::new(
726 valid_update.last_update_id,
727 None,
728 valid_update.bids,
729 valid_update.asks,
730 ));
731
732 test.book.update(&rustrade_update);
733 }
734
735 assert_eq!(test.book, test.expected, "TC{index} failed");
736 }
737 }
738}