1use super::super::book::BinanceLevel;
2use crate::{
3 Identifier, SnapshotFetcher,
4 books::OrderBook,
5 error::DataError,
6 event::{MarketEvent, MarketIter},
7 exchange::{
8 Connector,
9 binance::{
10 book::l2::{BinanceOrderBookL2Meta, BinanceOrderBookL2Snapshot},
11 market::BinanceMarket,
12 spot::BinanceSpot,
13 },
14 },
15 instrument::InstrumentData,
16 subscription::{
17 Map, Subscription,
18 book::{OrderBookEvent, OrderBooksL2},
19 },
20 transformer::ExchangeTransformer,
21};
22use async_trait::async_trait;
23use chrono::{DateTime, Utc};
24use futures_util::future::try_join_all;
25use rustrade_instrument::exchange::ExchangeId;
26use rustrade_integration::{
27 Transformer, error::SocketError, protocol::websocket::WsMessage, subscription::SubscriptionId,
28};
29use serde::{Deserialize, Serialize};
30use std::future::Future;
31use tokio::sync::mpsc::UnboundedSender;
32
/// HTTP endpoint queried for Binance Spot L2 order book snapshots.
///
/// The snapshot fetcher in this module appends `?symbol=<market>&limit=100` to this URL.
pub const HTTP_BOOK_L2_SNAPSHOT_URL_BINANCE_SPOT: &str = "https://api.binance.com/api/v3/depth";
37
38#[derive(Debug)]
39pub struct BinanceSpotOrderBooksL2SnapshotFetcher;
40
41impl SnapshotFetcher<BinanceSpot, OrderBooksL2> for BinanceSpotOrderBooksL2SnapshotFetcher {
42 fn fetch_snapshots<Instrument>(
43 subscriptions: &[Subscription<BinanceSpot, Instrument, OrderBooksL2>],
44 ) -> impl Future<Output = Result<Vec<MarketEvent<Instrument::Key, OrderBookEvent>>, SocketError>>
45 + Send
46 where
47 Instrument: InstrumentData,
48 Subscription<BinanceSpot, Instrument, OrderBooksL2>: Identifier<BinanceMarket>,
49 {
50 let l2_snapshot_futures = subscriptions.iter().map(|subscription| {
51 let market = subscription.id();
53 let snapshot_url = format!(
54 "{}?symbol={}&limit=100",
55 HTTP_BOOK_L2_SNAPSHOT_URL_BINANCE_SPOT, market.0,
56 );
57
58 async move {
59 let snapshot = reqwest::get(snapshot_url)
61 .await
62 .map_err(SocketError::Http)?
63 .json::<BinanceOrderBookL2Snapshot>()
64 .await
65 .map_err(SocketError::Http)?;
66
67 Ok(MarketEvent::from((
68 ExchangeId::BinanceSpot,
69 subscription.instrument.key().clone(),
70 snapshot,
71 )))
72 }
73 });
74
75 try_join_all(l2_snapshot_futures)
76 }
77}
78
79#[derive(Debug)]
80pub struct BinanceSpotOrderBooksL2Transformer<InstrumentKey> {
81 instrument_map: Map<BinanceOrderBookL2Meta<InstrumentKey, BinanceSpotOrderBookL2Sequencer>>,
82}
83
84#[async_trait]
85impl<InstrumentKey> ExchangeTransformer<BinanceSpot, InstrumentKey, OrderBooksL2>
86 for BinanceSpotOrderBooksL2Transformer<InstrumentKey>
87where
88 InstrumentKey: Clone + PartialEq + Send + Sync,
89{
90 async fn init(
91 instrument_map: Map<InstrumentKey>,
92 initial_snapshots: &[MarketEvent<InstrumentKey, OrderBookEvent>],
93 _: UnboundedSender<WsMessage>,
94 ) -> Result<Self, DataError> {
95 let instrument_map = instrument_map
96 .0
97 .into_iter()
98 .map(|(sub_id, instrument_key)| {
99 let snapshot = initial_snapshots
100 .iter()
101 .find(|snapshot| snapshot.instrument == instrument_key)
102 .ok_or_else(|| DataError::InitialSnapshotMissing(sub_id.clone()))?;
103
104 let OrderBookEvent::Snapshot(snapshot) = &snapshot.kind else {
105 return Err(DataError::InitialSnapshotInvalid(String::from(
106 "expected OrderBookEvent::Snapshot but found OrderBookEvent::Update",
107 )));
108 };
109
110 let book_meta = BinanceOrderBookL2Meta::new(
111 instrument_key,
112 BinanceSpotOrderBookL2Sequencer::new(snapshot.sequence()),
113 );
114
115 Ok((sub_id, book_meta))
116 })
117 .collect::<Result<Map<_>, _>>()?;
118
119 Ok(Self { instrument_map })
120 }
121}
122
123impl<InstrumentKey> Transformer for BinanceSpotOrderBooksL2Transformer<InstrumentKey>
124where
125 InstrumentKey: Clone,
126{
127 type Error = DataError;
128 type Input = BinanceSpotOrderBookL2Update;
129 type Output = MarketEvent<InstrumentKey, OrderBookEvent>;
130 type OutputIter = Vec<Result<Self::Output, Self::Error>>;
131
132 fn transform(&mut self, input: Self::Input) -> Self::OutputIter {
133 let subscription_id = match input.id() {
135 Some(subscription_id) => subscription_id,
136 None => return vec![],
137 };
138
139 let instrument = match self.instrument_map.find_mut(&subscription_id) {
141 Ok(instrument) => instrument,
142 Err(unidentifiable) => return vec![Err(DataError::from(unidentifiable))],
143 };
144
145 let valid_update = match instrument.sequencer.validate_sequence(input) {
147 Ok(Some(valid_update)) => valid_update,
148 Ok(None) => return vec![],
149 Err(error) => return vec![Err(error)],
150 };
151
152 MarketIter::<InstrumentKey, OrderBookEvent>::from((
153 BinanceSpot::ID,
154 instrument.key.clone(),
155 valid_update,
156 ))
157 .0
158 }
159}
160
161#[derive(Debug)]
186pub struct BinanceSpotOrderBookL2Sequencer {
187 pub updates_processed: u64,
188 pub last_update_id: u64,
189 pub prev_last_update_id: u64,
190}
191
192impl BinanceSpotOrderBookL2Sequencer {
193 pub fn new(last_update_id: u64) -> Self {
195 Self {
196 updates_processed: 0,
197 last_update_id,
198 prev_last_update_id: last_update_id,
199 }
200 }
201
202 pub fn validate_sequence(
206 &mut self,
207 update: BinanceSpotOrderBookL2Update,
208 ) -> Result<Option<BinanceSpotOrderBookL2Update>, DataError> {
209 if update.last_update_id <= self.last_update_id {
211 return Ok(None);
212 }
213
214 if self.is_first_update() {
215 self.validate_first_update(&update)?;
217 } else {
218 self.validate_next_update(&update)?;
220 }
221
222 self.updates_processed += 1;
224 self.prev_last_update_id = self.last_update_id;
225 self.last_update_id = update.last_update_id;
226
227 Ok(Some(update))
228 }
229
230 pub fn is_first_update(&self) -> bool {
235 self.updates_processed == 0
236 }
237
238 pub fn validate_first_update(
243 &self,
244 update: &BinanceSpotOrderBookL2Update,
245 ) -> Result<(), DataError> {
246 let expected_next_id = self.last_update_id + 1;
247 if update.first_update_id <= expected_next_id && update.last_update_id >= expected_next_id {
248 Ok(())
249 } else {
250 Err(DataError::InvalidSequence {
251 prev_last_update_id: self.last_update_id,
252 first_update_id: update.first_update_id,
253 })
254 }
255 }
256
257 pub fn validate_next_update(
263 &self,
264 update: &BinanceSpotOrderBookL2Update,
265 ) -> Result<(), DataError> {
266 let expected_next_id = self.last_update_id + 1;
267 if update.first_update_id == expected_next_id {
268 Ok(())
269 } else {
270 Err(DataError::InvalidSequence {
271 prev_last_update_id: self.last_update_id,
272 first_update_id: update.first_update_id,
273 })
274 }
275 }
276}
277
278#[derive(Clone, PartialEq, PartialOrd, Debug, Deserialize, Serialize)]
297pub struct BinanceSpotOrderBookL2Update {
298 #[serde(
299 alias = "s",
300 deserialize_with = "super::super::book::l2::de_ob_l2_subscription_id"
301 )]
302 pub subscription_id: SubscriptionId,
303 #[serde(
304 alias = "E",
305 deserialize_with = "rustrade_integration::serde::de::de_u64_epoch_ms_as_datetime_utc"
306 )]
307 pub time_exchange: DateTime<Utc>,
308 #[serde(alias = "U")]
309 pub first_update_id: u64,
310 #[serde(alias = "u")]
311 pub last_update_id: u64,
312 #[serde(alias = "b")]
313 pub bids: Vec<BinanceLevel>,
314 #[serde(alias = "a")]
315 pub asks: Vec<BinanceLevel>,
316}
317
318impl Identifier<Option<SubscriptionId>> for BinanceSpotOrderBookL2Update {
319 fn id(&self) -> Option<SubscriptionId> {
320 Some(self.subscription_id.clone())
321 }
322}
323
324impl<InstrumentKey> From<(ExchangeId, InstrumentKey, BinanceSpotOrderBookL2Update)>
325 for MarketIter<InstrumentKey, OrderBookEvent>
326{
327 fn from(
328 (exchange_id, instrument, update): (
329 ExchangeId,
330 InstrumentKey,
331 BinanceSpotOrderBookL2Update,
332 ),
333 ) -> Self {
334 Self(vec![Ok(MarketEvent {
335 time_exchange: update.time_exchange,
336 time_received: Utc::now(),
337 exchange: exchange_id,
338 instrument,
339 kind: OrderBookEvent::Update(OrderBook::new(
340 update.last_update_id,
341 None,
342 update.bids,
343 update.asks,
344 )),
345 })])
346 }
347}
348
#[cfg(test)]
// Test code unwraps freely: a panic is the desired failure mode.
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use crate::books::Level;
    use rust_decimal_macros::dec;

    /// Builds an update with the given id range and no bid or ask levels.
    fn empty_update(first_update_id: u64, last_update_id: u64) -> BinanceSpotOrderBookL2Update {
        BinanceSpotOrderBookL2Update {
            subscription_id: SubscriptionId::from("subscription_id"),
            time_exchange: Default::default(),
            first_update_id,
            last_update_id,
            bids: vec![],
            asks: vec![],
        }
    }

    /// Builds a sequencer in an arbitrary state.
    fn sequencer(
        updates_processed: u64,
        last_update_id: u64,
        prev_last_update_id: u64,
    ) -> BinanceSpotOrderBookL2Sequencer {
        BinanceSpotOrderBookL2Sequencer {
            updates_processed,
            last_update_id,
            prev_last_update_id,
        }
    }

    #[test]
    fn test_de_binance_spot_order_book_l2_update() {
        let input = r#"
            {
                "e":"depthUpdate",
                "E":1671656397761,
                "s":"ETHUSDT",
                "U":22611425143,
                "u":22611425151,
                "b":[
                    ["1209.67000000","85.48210000"],
                    ["1209.66000000","20.68790000"]
                ],
                "a":[]
            }
        "#;

        assert_eq!(
            serde_json::from_str::<BinanceSpotOrderBookL2Update>(input).unwrap(),
            BinanceSpotOrderBookL2Update {
                subscription_id: SubscriptionId::from("@depth@100ms|ETHUSDT"),
                time_exchange: DateTime::from_timestamp_millis(1671656397761).unwrap(),
                first_update_id: 22611425143,
                last_update_id: 22611425151,
                bids: vec![
                    BinanceLevel {
                        price: dec!(1209.67000000),
                        amount: dec!(85.48210000)
                    },
                    BinanceLevel {
                        price: dec!(1209.66000000),
                        amount: dec!(20.68790000)
                    },
                ],
                asks: vec![]
            }
        );
    }

    #[test]
    fn test_sequencer_is_first_update() {
        struct TestCase {
            input: BinanceSpotOrderBookL2Sequencer,
            expected: bool,
        }

        let tests = vec![
            // TC0: freshly constructed sequencer has processed nothing
            TestCase {
                input: BinanceSpotOrderBookL2Sequencer::new(10),
                expected: true,
            },
            // TC1: sequencer that has already processed updates
            TestCase {
                input: sequencer(10, 100, 90),
                expected: false,
            },
        ];

        for (index, test) in tests.into_iter().enumerate() {
            assert_eq!(
                test.input.is_first_update(),
                test.expected,
                "TC{} failed",
                index
            );
        }
    }

    #[test]
    fn test_sequencer_validate_first_update() {
        struct TestCase {
            sequencer: BinanceSpotOrderBookL2Sequencer,
            input: BinanceSpotOrderBookL2Update,
            expected: Result<(), DataError>,
        }

        let tests = vec![
            // TC0: valid, U <= lastUpdateId+1 AND u >= lastUpdateId+1
            TestCase {
                sequencer: sequencer(0, 100, 90),
                input: empty_update(100, 110),
                expected: Ok(()),
            },
            // TC1: invalid, only U > lastUpdateId+1 is violated
            TestCase {
                sequencer: sequencer(0, 100, 90),
                input: empty_update(102, 110),
                expected: Err(DataError::InvalidSequence {
                    prev_last_update_id: 100,
                    first_update_id: 102,
                }),
            },
            // TC2: invalid, only u < lastUpdateId+1 is violated
            TestCase {
                sequencer: sequencer(0, 100, 90),
                input: empty_update(90, 95),
                expected: Err(DataError::InvalidSequence {
                    prev_last_update_id: 100,
                    first_update_id: 90,
                }),
            },
            // TC3: invalid, both U > lastUpdateId+1 and u < lastUpdateId+1
            TestCase {
                sequencer: sequencer(0, 100, 90),
                input: empty_update(110, 90),
                expected: Err(DataError::InvalidSequence {
                    prev_last_update_id: 100,
                    first_update_id: 110,
                }),
            },
        ];

        for (index, test) in tests.into_iter().enumerate() {
            let actual = test.sequencer.validate_first_update(&test.input);
            assert_eq!(actual, test.expected, "TC{index} failed");
        }
    }

    #[test]
    fn test_sequencer_validate_next_update() {
        struct TestCase {
            sequencer: BinanceSpotOrderBookL2Sequencer,
            input: BinanceSpotOrderBookL2Update,
            expected: Result<(), DataError>,
        }

        let tests = vec![
            // TC0: valid, U == lastUpdateId+1
            TestCase {
                sequencer: sequencer(100, 100, 100),
                input: empty_update(101, 110),
                expected: Ok(()),
            },
            // TC1: invalid, U != lastUpdateId+1
            TestCase {
                sequencer: sequencer(100, 100, 90),
                input: empty_update(120, 130),
                expected: Err(DataError::InvalidSequence {
                    prev_last_update_id: 100,
                    first_update_id: 120,
                }),
            },
        ];

        for (index, test) in tests.into_iter().enumerate() {
            let actual = test.sequencer.validate_next_update(&test.input);
            assert_eq!(actual, test.expected, "TC{index} failed");
        }
    }

    #[test]
    fn test_update_rustrade_order_book_with_sequenced_updates() {
        struct TestCase {
            sequencer: BinanceSpotOrderBookL2Sequencer,
            book: OrderBook,
            input_update: BinanceSpotOrderBookL2Update,
            expected: OrderBook,
        }

        let tests = vec![
            // TC0: stale update (u <= lastUpdateId) is dropped, book is left untouched
            TestCase {
                sequencer: sequencer(100, 100, 0),
                book: OrderBook::new(100, None, vec![Level::new(50, 1)], vec![Level::new(100, 1)]),
                input_update: empty_update(0, 100),
                expected: OrderBook::new(
                    100,
                    None,
                    vec![Level::new(50, 1)],
                    vec![Level::new(100, 1)],
                ),
            },
            // TC1: valid update is applied to the book
            TestCase {
                sequencer: sequencer(100, 100, 100),
                book: OrderBook::new(
                    100,
                    None,
                    vec![Level::new(80, 1), Level::new(100, 1), Level::new(90, 1)],
                    vec![Level::new(150, 1), Level::new(110, 1), Level::new(120, 1)],
                ),
                input_update: BinanceSpotOrderBookL2Update {
                    subscription_id: SubscriptionId::from("subscription_id"),
                    time_exchange: Default::default(),
                    first_update_id: 101,
                    last_update_id: 110,
                    bids: vec![
                        // Zero amount at an existing price
                        BinanceLevel {
                            price: dec!(80),
                            amount: dec!(0),
                        },
                        // New amount at an existing price
                        BinanceLevel {
                            price: dec!(90),
                            amount: dec!(10),
                        },
                    ],
                    asks: vec![
                        // Non-zero amount at a price not yet in the book
                        BinanceLevel {
                            price: dec!(200),
                            amount: dec!(1),
                        },
                        // Zero amount at a price not in the book
                        BinanceLevel {
                            price: dec!(500),
                            amount: dec!(0),
                        },
                    ],
                },
                expected: OrderBook::new(
                    110,
                    None,
                    vec![Level::new(100, 1), Level::new(90, 10)],
                    vec![
                        Level::new(110, 1),
                        Level::new(120, 1),
                        Level::new(150, 1),
                        Level::new(200, 1),
                    ],
                ),
            },
        ];

        for (index, mut test) in tests.into_iter().enumerate() {
            if let Some(valid_update) = test.sequencer.validate_sequence(test.input_update).unwrap()
            {
                let rustrade_update = OrderBookEvent::Update(OrderBook::new(
                    valid_update.last_update_id,
                    None,
                    valid_update.bids,
                    valid_update.asks,
                ));

                test.book.update(&rustrade_update);
            }

            assert_eq!(test.book, test.expected, "TC{index} failed");
        }
    }
}