1use super::super::book::BinanceLevel;
2use crate::{
3 Identifier, SnapshotFetcher,
4 books::OrderBook,
5 error::DataError,
6 event::{MarketEvent, MarketIter},
7 exchange::{
8 Connector,
9 binance::{
10 book::l2::{BinanceOrderBookL2Meta, BinanceOrderBookL2Snapshot},
11 market::BinanceMarket,
12 spot::BinanceSpot,
13 },
14 },
15 instrument::InstrumentData,
16 subscription::{
17 Map, Subscription,
18 book::{OrderBookEvent, OrderBooksL2},
19 },
20 transformer::ExchangeTransformer,
21};
22use async_trait::async_trait;
23use barter_instrument::exchange::ExchangeId;
24use barter_integration::{
25 Transformer, error::SocketError, protocol::websocket::WsMessage, subscription::SubscriptionId,
26};
27use chrono::{DateTime, Utc};
28use futures_util::future::try_join_all;
29use serde::{Deserialize, Serialize};
30use std::future::Future;
31use tokio::sync::mpsc::UnboundedSender;
32
/// Base URL used to request a Binance Spot L2 order book snapshot over HTTP.
///
/// The snapshot fetcher in this module appends the `symbol` and `limit` query
/// parameters to this URL before issuing the GET request.
pub const HTTP_BOOK_L2_SNAPSHOT_URL_BINANCE_SPOT: &str = "https://api.binance.com/api/v3/depth";
37
/// Unit type that implements [`SnapshotFetcher`] for [`BinanceSpot`] [`OrderBooksL2`]
/// subscriptions.
#[derive(Debug)]
pub struct BinanceSpotOrderBooksL2SnapshotFetcher;
40
41impl SnapshotFetcher<BinanceSpot, OrderBooksL2> for BinanceSpotOrderBooksL2SnapshotFetcher {
42 fn fetch_snapshots<Instrument>(
43 subscriptions: &[Subscription<BinanceSpot, Instrument, OrderBooksL2>],
44 ) -> impl Future<Output = Result<Vec<MarketEvent<Instrument::Key, OrderBookEvent>>, SocketError>>
45 + Send
46 where
47 Instrument: InstrumentData,
48 Subscription<BinanceSpot, Instrument, OrderBooksL2>: Identifier<BinanceMarket>,
49 {
50 let l2_snapshot_futures = subscriptions.iter().map(|subscription| {
51 let market = subscription.id();
53 let snapshot_url = format!(
54 "{}?symbol={}&limit=100",
55 HTTP_BOOK_L2_SNAPSHOT_URL_BINANCE_SPOT, market.0,
56 );
57
58 async move {
59 let snapshot = reqwest::get(snapshot_url)
61 .await
62 .map_err(SocketError::Http)?
63 .json::<BinanceOrderBookL2Snapshot>()
64 .await
65 .map_err(SocketError::Http)?;
66
67 Ok(MarketEvent::from((
68 ExchangeId::BinanceSpot,
69 subscription.instrument.key().clone(),
70 snapshot,
71 )))
72 }
73 });
74
75 try_join_all(l2_snapshot_futures)
76 }
77}
78
/// Transformer that turns [`BinanceSpotOrderBookL2Update`]s into [`OrderBookEvent`]
/// market events.
///
/// For every subscription it keeps the instrument key together with the
/// [`BinanceSpotOrderBookL2Sequencer`] used to validate the ordering of updates.
#[derive(Debug)]
pub struct BinanceSpotOrderBooksL2Transformer<InstrumentKey> {
    /// Per-subscription metadata, looked up by [`SubscriptionId`] when an update arrives.
    instrument_map: Map<BinanceOrderBookL2Meta<InstrumentKey, BinanceSpotOrderBookL2Sequencer>>,
}
83
84#[async_trait]
85impl<InstrumentKey> ExchangeTransformer<BinanceSpot, InstrumentKey, OrderBooksL2>
86 for BinanceSpotOrderBooksL2Transformer<InstrumentKey>
87where
88 InstrumentKey: Clone + PartialEq + Send + Sync,
89{
90 async fn init(
91 instrument_map: Map<InstrumentKey>,
92 initial_snapshots: &[MarketEvent<InstrumentKey, OrderBookEvent>],
93 _: UnboundedSender<WsMessage>,
94 ) -> Result<Self, DataError> {
95 let instrument_map = instrument_map
96 .0
97 .into_iter()
98 .map(|(sub_id, instrument_key)| {
99 let snapshot = initial_snapshots
100 .iter()
101 .find(|snapshot| snapshot.instrument == instrument_key)
102 .ok_or_else(|| DataError::InitialSnapshotMissing(sub_id.clone()))?;
103
104 let OrderBookEvent::Snapshot(snapshot) = &snapshot.kind else {
105 return Err(DataError::InitialSnapshotInvalid(String::from(
106 "expected OrderBookEvent::Snapshot but found OrderBookEvent::Update",
107 )));
108 };
109
110 let book_meta = BinanceOrderBookL2Meta::new(
111 instrument_key,
112 BinanceSpotOrderBookL2Sequencer::new(snapshot.sequence()),
113 );
114
115 Ok((sub_id, book_meta))
116 })
117 .collect::<Result<Map<_>, _>>()?;
118
119 Ok(Self { instrument_map })
120 }
121}
122
123impl<InstrumentKey> Transformer for BinanceSpotOrderBooksL2Transformer<InstrumentKey>
124where
125 InstrumentKey: Clone,
126{
127 type Error = DataError;
128 type Input = BinanceSpotOrderBookL2Update;
129 type Output = MarketEvent<InstrumentKey, OrderBookEvent>;
130 type OutputIter = Vec<Result<Self::Output, Self::Error>>;
131
132 fn transform(&mut self, input: Self::Input) -> Self::OutputIter {
133 let subscription_id = match input.id() {
135 Some(subscription_id) => subscription_id,
136 None => return vec![],
137 };
138
139 let instrument = match self.instrument_map.find_mut(&subscription_id) {
141 Ok(instrument) => instrument,
142 Err(unidentifiable) => return vec![Err(DataError::from(unidentifiable))],
143 };
144
145 let valid_update = match instrument.sequencer.validate_sequence(input) {
147 Ok(Some(valid_update)) => valid_update,
148 Ok(None) => return vec![],
149 Err(error) => return vec![Err(error)],
150 };
151
152 MarketIter::<InstrumentKey, OrderBookEvent>::from((
153 BinanceSpot::ID,
154 instrument.key.clone(),
155 valid_update,
156 ))
157 .0
158 }
159}
160
/// Tracks the update ids of a Binance Spot L2 order book stream so incoming
/// [`BinanceSpotOrderBookL2Update`]s can be checked for correct ordering.
#[derive(Debug)]
pub struct BinanceSpotOrderBookL2Sequencer {
    /// Number of updates accepted so far; zero means no update has been accepted yet.
    pub updates_processed: u64,
    /// `last_update_id` of the most recently accepted update (initially the id passed
    /// to `new`).
    pub last_update_id: u64,
    /// Value `last_update_id` held before the most recently accepted update.
    pub prev_last_update_id: u64,
}
191
192impl BinanceSpotOrderBookL2Sequencer {
193 pub fn new(last_update_id: u64) -> Self {
195 Self {
196 updates_processed: 0,
197 last_update_id,
198 prev_last_update_id: last_update_id,
199 }
200 }
201
202 pub fn validate_sequence(
206 &mut self,
207 update: BinanceSpotOrderBookL2Update,
208 ) -> Result<Option<BinanceSpotOrderBookL2Update>, DataError> {
209 if update.last_update_id <= self.last_update_id {
211 return Ok(None);
212 }
213
214 if self.is_first_update() {
215 self.validate_first_update(&update)?;
217 } else {
218 self.validate_next_update(&update)?;
220 }
221
222 self.updates_processed += 1;
224 self.prev_last_update_id = self.last_update_id;
225 self.last_update_id = update.last_update_id;
226
227 Ok(Some(update))
228 }
229
230 pub fn is_first_update(&self) -> bool {
235 self.updates_processed == 0
236 }
237
238 pub fn validate_first_update(
243 &self,
244 update: &BinanceSpotOrderBookL2Update,
245 ) -> Result<(), DataError> {
246 let expected_next_id = self.last_update_id + 1;
247 if update.first_update_id <= expected_next_id && update.last_update_id >= expected_next_id {
248 Ok(())
249 } else {
250 Err(DataError::InvalidSequence {
251 prev_last_update_id: self.last_update_id,
252 first_update_id: update.first_update_id,
253 })
254 }
255 }
256
257 pub fn validate_next_update(
263 &self,
264 update: &BinanceSpotOrderBookL2Update,
265 ) -> Result<(), DataError> {
266 let expected_next_id = self.last_update_id + 1;
267 if update.first_update_id == expected_next_id {
268 Ok(())
269 } else {
270 Err(DataError::InvalidSequence {
271 prev_last_update_id: self.last_update_id,
272 first_update_id: update.first_update_id,
273 })
274 }
275 }
276}
277
/// Binance Spot L2 order book update message.
///
/// Each field accepts the short key shown in its `alias` attribute when deserialising
/// (see the `depthUpdate` payload used in this module's tests).
#[derive(Clone, PartialEq, PartialOrd, Debug, Deserialize, Serialize)]
pub struct BinanceSpotOrderBookL2Update {
    /// Subscription id derived from the `s` key by `de_ob_l2_subscription_id`.
    #[serde(
        alias = "s",
        deserialize_with = "super::super::book::l2::de_ob_l2_subscription_id"
    )]
    pub subscription_id: SubscriptionId,
    /// Exchange timestamp, read from the `E` key as epoch milliseconds.
    #[serde(
        alias = "E",
        deserialize_with = "barter_integration::de::de_u64_epoch_ms_as_datetime_utc"
    )]
    pub time_exchange: DateTime<Utc>,
    /// First update id covered by this message (`U` key).
    #[serde(alias = "U")]
    pub first_update_id: u64,
    /// Last update id covered by this message (`u` key).
    #[serde(alias = "u")]
    pub last_update_id: u64,
    /// Bid levels carried by this message (`b` key).
    #[serde(alias = "b")]
    pub bids: Vec<BinanceLevel>,
    /// Ask levels carried by this message (`a` key).
    #[serde(alias = "a")]
    pub asks: Vec<BinanceLevel>,
}
317
318impl Identifier<Option<SubscriptionId>> for BinanceSpotOrderBookL2Update {
319 fn id(&self) -> Option<SubscriptionId> {
320 Some(self.subscription_id.clone())
321 }
322}
323
324impl<InstrumentKey> From<(ExchangeId, InstrumentKey, BinanceSpotOrderBookL2Update)>
325 for MarketIter<InstrumentKey, OrderBookEvent>
326{
327 fn from(
328 (exchange_id, instrument, update): (
329 ExchangeId,
330 InstrumentKey,
331 BinanceSpotOrderBookL2Update,
332 ),
333 ) -> Self {
334 Self(vec![Ok(MarketEvent {
335 time_exchange: update.time_exchange,
336 time_received: Utc::now(),
337 exchange: exchange_id,
338 instrument,
339 kind: OrderBookEvent::Update(OrderBook::new(
340 update.last_update_id,
341 None,
342 update.bids,
343 update.asks,
344 )),
345 })])
346 }
347}
348
349#[cfg(test)]
350mod tests {
351 use super::*;
352 use crate::books::Level;
353 use rust_decimal_macros::dec;
354
355 #[test]
356 fn test_de_binance_spot_order_book_l2_update() {
357 let input = r#"
358 {
359 "e":"depthUpdate",
360 "E":1671656397761,
361 "s":"ETHUSDT",
362 "U":22611425143,
363 "u":22611425151,
364 "b":[
365 ["1209.67000000","85.48210000"],
366 ["1209.66000000","20.68790000"]
367 ],
368 "a":[]
369 }
370 "#;
371
372 assert_eq!(
373 serde_json::from_str::<BinanceSpotOrderBookL2Update>(input).unwrap(),
374 BinanceSpotOrderBookL2Update {
375 subscription_id: SubscriptionId::from("@depth@100ms|ETHUSDT"),
376 time_exchange: DateTime::from_timestamp_millis(1671656397761).unwrap(),
377 first_update_id: 22611425143,
378 last_update_id: 22611425151,
379 bids: vec![
380 BinanceLevel {
381 price: dec!(1209.67000000),
382 amount: dec!(85.48210000)
383 },
384 BinanceLevel {
385 price: dec!(1209.66000000),
386 amount: dec!(20.68790000)
387 },
388 ],
389 asks: vec![]
390 }
391 );
392 }
393
394 #[test]
395 fn test_sequencer_is_first_update() {
396 struct TestCase {
397 input: BinanceSpotOrderBookL2Sequencer,
398 expected: bool,
399 }
400
401 let tests = vec![
402 TestCase {
403 input: BinanceSpotOrderBookL2Sequencer::new(10),
405 expected: true,
406 },
407 TestCase {
408 input: BinanceSpotOrderBookL2Sequencer {
410 updates_processed: 10,
411 last_update_id: 100,
412 prev_last_update_id: 90,
413 },
414 expected: false,
415 },
416 ];
417
418 for (index, test) in tests.into_iter().enumerate() {
419 assert_eq!(
420 test.input.is_first_update(),
421 test.expected,
422 "TC{} failed",
423 index
424 );
425 }
426 }
427
428 #[test]
429 fn test_sequencer_validate_first_update() {
430 struct TestCase {
431 sequencer: BinanceSpotOrderBookL2Sequencer,
432 input: BinanceSpotOrderBookL2Update,
433 expected: Result<(), DataError>,
434 }
435
436 let tests = vec![
437 TestCase {
438 sequencer: BinanceSpotOrderBookL2Sequencer {
440 updates_processed: 0,
441 last_update_id: 100,
442 prev_last_update_id: 90,
443 },
444 input: BinanceSpotOrderBookL2Update {
445 subscription_id: SubscriptionId::from("subscription_id"),
446 time_exchange: Default::default(),
447 first_update_id: 100,
448 last_update_id: 110,
449 bids: vec![],
450 asks: vec![],
451 },
452 expected: Ok(()),
453 },
454 TestCase {
455 sequencer: BinanceSpotOrderBookL2Sequencer {
457 updates_processed: 0,
458 last_update_id: 100,
459 prev_last_update_id: 90,
460 },
461 input: BinanceSpotOrderBookL2Update {
462 subscription_id: SubscriptionId::from("subscription_id"),
463 time_exchange: Default::default(),
464 first_update_id: 102,
465 last_update_id: 90,
466 bids: vec![],
467 asks: vec![],
468 },
469 expected: Err(DataError::InvalidSequence {
470 prev_last_update_id: 100,
471 first_update_id: 102,
472 }),
473 },
474 TestCase {
475 sequencer: BinanceSpotOrderBookL2Sequencer {
477 updates_processed: 0,
478 last_update_id: 100,
479 prev_last_update_id: 90,
480 },
481 input: BinanceSpotOrderBookL2Update {
482 subscription_id: SubscriptionId::from("subscription_id"),
483 time_exchange: Default::default(),
484 first_update_id: 110,
485 last_update_id: 90,
486 bids: vec![],
487 asks: vec![],
488 },
489 expected: Err(DataError::InvalidSequence {
490 prev_last_update_id: 100,
491 first_update_id: 110,
492 }),
493 },
494 TestCase {
495 sequencer: BinanceSpotOrderBookL2Sequencer {
497 updates_processed: 0,
498 last_update_id: 100,
499 prev_last_update_id: 90,
500 },
501 input: BinanceSpotOrderBookL2Update {
502 subscription_id: SubscriptionId::from("subscription_id"),
503 time_exchange: Default::default(),
504 first_update_id: 110,
505 last_update_id: 90,
506 bids: vec![],
507 asks: vec![],
508 },
509 expected: Err(DataError::InvalidSequence {
510 prev_last_update_id: 100,
511 first_update_id: 110,
512 }),
513 },
514 ];
515
516 for (index, test) in tests.into_iter().enumerate() {
517 let actual = test.sequencer.validate_first_update(&test.input);
518 match (actual, test.expected) {
519 (Ok(actual), Ok(expected)) => {
520 assert_eq!(actual, expected, "TC{} failed", index)
521 }
522 (Err(_), Err(_)) => {
523 }
525 (actual, expected) => {
526 panic!(
528 "TC{index} failed because actual != expected. \nActual: {actual:?}\nExpected: {expected:?}\n"
529 );
530 }
531 }
532 }
533 }
534
535 #[test]
536 fn test_sequencer_validate_next_update() {
537 struct TestCase {
538 sequencer: BinanceSpotOrderBookL2Sequencer,
539 input: BinanceSpotOrderBookL2Update,
540 expected: Result<(), DataError>,
541 }
542
543 let tests = vec![
544 TestCase {
545 sequencer: BinanceSpotOrderBookL2Sequencer {
547 updates_processed: 100,
548 last_update_id: 100,
549 prev_last_update_id: 100,
550 },
551 input: BinanceSpotOrderBookL2Update {
552 subscription_id: SubscriptionId::from("subscription_id"),
553 time_exchange: Default::default(),
554 first_update_id: 101,
555 last_update_id: 110,
556 bids: vec![],
557 asks: vec![],
558 },
559 expected: Ok(()),
560 },
561 TestCase {
562 sequencer: BinanceSpotOrderBookL2Sequencer {
564 updates_processed: 100,
565 last_update_id: 100,
566 prev_last_update_id: 90,
567 },
568 input: BinanceSpotOrderBookL2Update {
569 subscription_id: SubscriptionId::from("subscription_id"),
570 time_exchange: Default::default(),
571 first_update_id: 120,
572 last_update_id: 130,
573 bids: vec![],
574 asks: vec![],
575 },
576 expected: Err(DataError::InvalidSequence {
577 prev_last_update_id: 100,
578 first_update_id: 120,
579 }),
580 },
581 ];
582
583 for (index, test) in tests.into_iter().enumerate() {
584 let actual = test.sequencer.validate_next_update(&test.input);
585 match (actual, test.expected) {
586 (Ok(actual), Ok(expected)) => {
587 assert_eq!(actual, expected, "TC{} failed", index)
588 }
589 (Err(_), Err(_)) => {
590 }
592 (actual, expected) => {
593 panic!(
595 "TC{index} failed because actual != expected. \nActual: {actual:?}\nExpected: {expected:?}\n"
596 );
597 }
598 }
599 }
600 }
601
602 #[test]
603 fn test_update_barter_order_book_with_sequenced_updates() {
604 struct TestCase {
605 sequencer: BinanceSpotOrderBookL2Sequencer,
606 book: OrderBook,
607 input_update: BinanceSpotOrderBookL2Update,
608 expected: OrderBook,
609 }
610
611 let tests = vec![
612 TestCase {
613 sequencer: BinanceSpotOrderBookL2Sequencer {
615 updates_processed: 100,
616 last_update_id: 100,
617 prev_last_update_id: 0,
618 },
619 book: OrderBook::new(100, None, vec![Level::new(50, 1)], vec![Level::new(100, 1)]),
620 input_update: BinanceSpotOrderBookL2Update {
621 subscription_id: SubscriptionId::from("subscription_id"),
622 time_exchange: Default::default(),
623 first_update_id: 0,
624 last_update_id: 100, bids: vec![],
626 asks: vec![],
627 },
628 expected: OrderBook::new(
629 100,
630 None,
631 vec![Level::new(50, 1)],
632 vec![Level::new(100, 1)],
633 ),
634 },
635 TestCase {
636 sequencer: BinanceSpotOrderBookL2Sequencer {
638 updates_processed: 100,
639 last_update_id: 100,
640 prev_last_update_id: 100,
641 },
642 book: OrderBook::new(
643 100,
644 None,
645 vec![Level::new(80, 1), Level::new(100, 1), Level::new(90, 1)],
646 vec![Level::new(150, 1), Level::new(110, 1), Level::new(120, 1)],
647 ),
648 input_update: BinanceSpotOrderBookL2Update {
649 subscription_id: SubscriptionId::from("subscription_id"),
650 time_exchange: Default::default(),
651 first_update_id: 101,
652 last_update_id: 110,
653 bids: vec![
654 BinanceLevel {
656 price: dec!(80),
657 amount: dec!(0),
658 },
659 BinanceLevel {
661 price: dec!(90),
662 amount: dec!(10),
663 },
664 ],
665 asks: vec![
666 BinanceLevel {
668 price: dec!(200),
669 amount: dec!(1),
670 },
671 BinanceLevel {
673 price: dec!(500),
674 amount: dec!(0),
675 },
676 ],
677 },
678 expected: OrderBook::new(
679 110,
680 None,
681 vec![Level::new(100, 1), Level::new(90, 10)],
682 vec![
683 Level::new(110, 1),
684 Level::new(120, 1),
685 Level::new(150, 1),
686 Level::new(200, 1),
687 ],
688 ),
689 },
690 ];
691
692 for (index, mut test) in tests.into_iter().enumerate() {
693 if let Some(valid_update) = test.sequencer.validate_sequence(test.input_update).unwrap()
694 {
695 let barter_update = OrderBookEvent::Update(OrderBook::new(
696 valid_update.last_update_id,
697 None,
698 valid_update.bids,
699 valid_update.asks,
700 ));
701
702 test.book.update(&barter_update);
703 }
704
705 assert_eq!(test.book, test.expected, "TC{index} failed");
706 }
707 }
708}