1use super::super::book::BinanceLevel;
2use crate::{
3 Identifier, SnapshotFetcher,
4 books::OrderBook,
5 error::DataError,
6 event::{MarketEvent, MarketIter},
7 exchange::{
8 Connector,
9 binance::{
10 book::l2::{BinanceOrderBookL2Meta, BinanceOrderBookL2Snapshot},
11 market::BinanceMarket,
12 spot::BinanceSpot,
13 },
14 },
15 instrument::InstrumentData,
16 subscription::{
17 Map, Subscription,
18 book::{OrderBookEvent, OrderBooksL2},
19 },
20 transformer::ExchangeTransformer,
21};
22use chrono::{DateTime, Utc};
23use futures_util::future::try_join_all;
24use rustrade_instrument::exchange::ExchangeId;
25use rustrade_integration::{
26 Transformer, error::SocketError, protocol::websocket::WsMessage, subscription::SubscriptionId,
27};
28use serde::{Deserialize, Serialize};
29use std::future::Future;
30use tokio::sync::mpsc::UnboundedSender;
31
/// Base URL of the Binance Spot HTTP endpoint queried for L2 order book snapshots.
///
/// Query parameters (`symbol`, `limit`) are appended by the snapshot fetcher in this module.
pub const HTTP_BOOK_L2_SNAPSHOT_URL_BINANCE_SPOT: &str = "https://api.binance.com/api/v3/depth";
36
/// Stateless marker type implementing [`SnapshotFetcher`] for [`BinanceSpot`] [`OrderBooksL2`]
/// subscriptions.
///
/// It carries no data; all behaviour lives in the `SnapshotFetcher` implementation.
#[derive(Debug)]
pub struct BinanceSpotOrderBooksL2SnapshotFetcher;
39
40impl SnapshotFetcher<BinanceSpot, OrderBooksL2> for BinanceSpotOrderBooksL2SnapshotFetcher {
41 fn fetch_snapshots<Instrument>(
42 subscriptions: &[Subscription<BinanceSpot, Instrument, OrderBooksL2>],
43 ) -> impl Future<Output = Result<Vec<MarketEvent<Instrument::Key, OrderBookEvent>>, SocketError>>
44 + Send
45 where
46 Instrument: InstrumentData,
47 Subscription<BinanceSpot, Instrument, OrderBooksL2>: Identifier<BinanceMarket>,
48 {
49 let l2_snapshot_futures = subscriptions.iter().map(|subscription| {
50 let market = subscription.id();
52 let snapshot_url = format!(
53 "{}?symbol={}&limit=100",
54 HTTP_BOOK_L2_SNAPSHOT_URL_BINANCE_SPOT, market.0,
55 );
56
57 async move {
58 let snapshot = reqwest::get(snapshot_url)
60 .await
61 .map_err(SocketError::Http)?
62 .json::<BinanceOrderBookL2Snapshot>()
63 .await
64 .map_err(SocketError::Http)?;
65
66 Ok(MarketEvent::from((
67 ExchangeId::BinanceSpot,
68 subscription.instrument.key().clone(),
69 snapshot,
70 )))
71 }
72 });
73
74 try_join_all(l2_snapshot_futures)
75 }
76}
77
/// Transformer that converts [`BinanceSpotOrderBookL2Update`]s into
/// [`MarketEvent<InstrumentKey, OrderBookEvent>`](MarketEvent)s.
///
/// Each subscription is associated with its instrument key and a
/// [`BinanceSpotOrderBookL2Sequencer`] used to validate the update sequence.
#[derive(Debug)]
pub struct BinanceSpotOrderBooksL2Transformer<InstrumentKey> {
    /// Per-subscription metadata: the instrument key plus the sequencer state for its book.
    instrument_map: Map<BinanceOrderBookL2Meta<InstrumentKey, BinanceSpotOrderBookL2Sequencer>>,
}
82
83impl<InstrumentKey> ExchangeTransformer<BinanceSpot, InstrumentKey, OrderBooksL2>
84 for BinanceSpotOrderBooksL2Transformer<InstrumentKey>
85where
86 InstrumentKey: Clone + PartialEq + Send + Sync,
87{
88 async fn init(
89 instrument_map: Map<InstrumentKey>,
90 initial_snapshots: &[MarketEvent<InstrumentKey, OrderBookEvent>],
91 _: UnboundedSender<WsMessage>,
92 ) -> Result<Self, DataError> {
93 let instrument_map = instrument_map
94 .0
95 .into_iter()
96 .map(|(sub_id, instrument_key)| {
97 let snapshot = initial_snapshots
98 .iter()
99 .find(|snapshot| snapshot.instrument == instrument_key)
100 .ok_or_else(|| DataError::InitialSnapshotMissing(sub_id.clone()))?;
101
102 let OrderBookEvent::Snapshot(snapshot) = &snapshot.kind else {
103 return Err(DataError::InitialSnapshotInvalid(String::from(
104 "expected OrderBookEvent::Snapshot but found OrderBookEvent::Update",
105 )));
106 };
107
108 let book_meta = BinanceOrderBookL2Meta::new(
109 instrument_key,
110 BinanceSpotOrderBookL2Sequencer::new(snapshot.sequence()),
111 );
112
113 Ok((sub_id, book_meta))
114 })
115 .collect::<Result<Map<_>, _>>()?;
116
117 Ok(Self { instrument_map })
118 }
119}
120
121impl<InstrumentKey> Transformer for BinanceSpotOrderBooksL2Transformer<InstrumentKey>
122where
123 InstrumentKey: Clone,
124{
125 type Error = DataError;
126 type Input = BinanceSpotOrderBookL2Update;
127 type Output = MarketEvent<InstrumentKey, OrderBookEvent>;
128 type OutputIter = Vec<Result<Self::Output, Self::Error>>;
129
130 fn transform(&mut self, input: Self::Input) -> Self::OutputIter {
131 let subscription_id = match input.id() {
133 Some(subscription_id) => subscription_id,
134 None => return vec![],
135 };
136
137 let instrument = match self.instrument_map.find_mut(&subscription_id) {
139 Ok(instrument) => instrument,
140 Err(unidentifiable) => return vec![Err(DataError::from(unidentifiable))],
141 };
142
143 let valid_update = match instrument.sequencer.validate_sequence(input) {
145 Ok(Some(valid_update)) => valid_update,
146 Ok(None) => return vec![],
147 Err(error) => return vec![Err(error)],
148 };
149
150 MarketIter::<InstrumentKey, OrderBookEvent>::from((
151 BinanceSpot::ID,
152 instrument.key.clone(),
153 valid_update,
154 ))
155 .0
156 }
157}
158
/// Sequencing state used to validate the ordering of [`BinanceSpotOrderBookL2Update`]s for a
/// single order book.
#[derive(Debug)]
pub struct BinanceSpotOrderBookL2Sequencer {
    /// Number of updates that have passed validation so far (0 until the first is accepted).
    pub updates_processed: u64,
    /// `last_update_id` of the most recently accepted update, or the value passed to `new`
    /// if no update has been accepted yet.
    pub last_update_id: u64,
    /// Value `last_update_id` held before the most recently accepted update was applied.
    pub prev_last_update_id: u64,
}
189
190impl BinanceSpotOrderBookL2Sequencer {
191 pub fn new(last_update_id: u64) -> Self {
193 Self {
194 updates_processed: 0,
195 last_update_id,
196 prev_last_update_id: last_update_id,
197 }
198 }
199
200 pub fn validate_sequence(
204 &mut self,
205 update: BinanceSpotOrderBookL2Update,
206 ) -> Result<Option<BinanceSpotOrderBookL2Update>, DataError> {
207 if update.last_update_id <= self.last_update_id {
209 return Ok(None);
210 }
211
212 if self.is_first_update() {
213 self.validate_first_update(&update)?;
215 } else {
216 self.validate_next_update(&update)?;
218 }
219
220 self.updates_processed += 1;
222 self.prev_last_update_id = self.last_update_id;
223 self.last_update_id = update.last_update_id;
224
225 Ok(Some(update))
226 }
227
228 pub fn is_first_update(&self) -> bool {
233 self.updates_processed == 0
234 }
235
236 pub fn validate_first_update(
241 &self,
242 update: &BinanceSpotOrderBookL2Update,
243 ) -> Result<(), DataError> {
244 let expected_next_id = self.last_update_id + 1;
245 if update.first_update_id <= expected_next_id && update.last_update_id >= expected_next_id {
246 Ok(())
247 } else {
248 Err(DataError::InvalidSequence {
249 prev_last_update_id: self.last_update_id,
250 first_update_id: update.first_update_id,
251 })
252 }
253 }
254
255 pub fn validate_next_update(
261 &self,
262 update: &BinanceSpotOrderBookL2Update,
263 ) -> Result<(), DataError> {
264 let expected_next_id = self.last_update_id + 1;
265 if update.first_update_id == expected_next_id {
266 Ok(())
267 } else {
268 Err(DataError::InvalidSequence {
269 prev_last_update_id: self.last_update_id,
270 first_update_id: update.first_update_id,
271 })
272 }
273 }
274}
275
/// Binance Spot L2 order book update message.
///
/// Example payload (taken from this module's deserialisation test):
/// ```json
/// {
///     "e":"depthUpdate",
///     "E":1671656397761,
///     "s":"ETHUSDT",
///     "U":22611425143,
///     "u":22611425151,
///     "b":[
///         ["1209.67000000","85.48210000"],
///         ["1209.66000000","20.68790000"]
///     ],
///     "a":[]
/// }
/// ```
#[derive(Clone, PartialEq, PartialOrd, Debug, Deserialize, Serialize)]
pub struct BinanceSpotOrderBookL2Update {
    /// Subscription identifier derived from the `s` (symbol) field by a custom deserialiser;
    /// eg/ `"ETHUSDT"` deserialises to `"@depth@100ms|ETHUSDT"`.
    #[serde(
        alias = "s",
        deserialize_with = "super::super::book::l2::de_ob_l2_subscription_id"
    )]
    pub subscription_id: SubscriptionId,
    /// Exchange timestamp, deserialised from the `E` field (epoch milliseconds).
    #[serde(
        alias = "E",
        deserialize_with = "rustrade_integration::serde::de::de_u64_epoch_ms_as_datetime_utc"
    )]
    pub time_exchange: DateTime<Utc>,
    /// First update id carried by this message (`U`).
    #[serde(alias = "U")]
    pub first_update_id: u64,
    /// Last update id carried by this message (`u`).
    #[serde(alias = "u")]
    pub last_update_id: u64,
    /// Bid levels included in this update (`b`).
    #[serde(alias = "b")]
    pub bids: Vec<BinanceLevel>,
    /// Ask levels included in this update (`a`).
    #[serde(alias = "a")]
    pub asks: Vec<BinanceLevel>,
}
315
316impl Identifier<Option<SubscriptionId>> for BinanceSpotOrderBookL2Update {
317 fn id(&self) -> Option<SubscriptionId> {
318 Some(self.subscription_id.clone())
319 }
320}
321
322impl<InstrumentKey> From<(ExchangeId, InstrumentKey, BinanceSpotOrderBookL2Update)>
323 for MarketIter<InstrumentKey, OrderBookEvent>
324{
325 fn from(
326 (exchange_id, instrument, update): (
327 ExchangeId,
328 InstrumentKey,
329 BinanceSpotOrderBookL2Update,
330 ),
331 ) -> Self {
332 Self(vec![Ok(MarketEvent {
333 time_exchange: update.time_exchange,
334 time_received: Utc::now(),
335 exchange: exchange_id,
336 instrument,
337 kind: OrderBookEvent::Update(OrderBook::new(
338 update.last_update_id,
339 None,
340 update.bids,
341 update.asks,
342 )),
343 })])
344 }
345}
346
#[cfg(test)]
// Unwrapping is acceptable in tests: a panic is the desired failure mode.
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use crate::books::Level;
    use rust_decimal_macros::dec;

    /// A raw `depthUpdate` payload deserialises into the expected update.
    #[test]
    fn test_de_binance_spot_order_book_l2_update() {
        let input = r#"
            {
                "e":"depthUpdate",
                "E":1671656397761,
                "s":"ETHUSDT",
                "U":22611425143,
                "u":22611425151,
                "b":[
                    ["1209.67000000","85.48210000"],
                    ["1209.66000000","20.68790000"]
                ],
                "a":[]
            }
        "#;

        assert_eq!(
            serde_json::from_str::<BinanceSpotOrderBookL2Update>(input).unwrap(),
            BinanceSpotOrderBookL2Update {
                subscription_id: SubscriptionId::from("@depth@100ms|ETHUSDT"),
                time_exchange: DateTime::from_timestamp_millis(1671656397761).unwrap(),
                first_update_id: 22611425143,
                last_update_id: 22611425151,
                bids: vec![
                    BinanceLevel {
                        price: dec!(1209.67000000),
                        amount: dec!(85.48210000)
                    },
                    BinanceLevel {
                        price: dec!(1209.66000000),
                        amount: dec!(20.68790000)
                    },
                ],
                asks: vec![]
            }
        );
    }

    /// `is_first_update` is driven solely by `updates_processed`.
    #[test]
    fn test_sequencer_is_first_update() {
        struct TestCase {
            input: BinanceSpotOrderBookL2Sequencer,
            expected: bool,
        }

        let tests = vec![
            TestCase {
                // TC0: freshly constructed sequencer has processed no updates
                input: BinanceSpotOrderBookL2Sequencer::new(10),
                expected: true,
            },
            TestCase {
                // TC1: sequencer that has already processed updates
                input: BinanceSpotOrderBookL2Sequencer {
                    updates_processed: 10,
                    last_update_id: 100,
                    prev_last_update_id: 90,
                },
                expected: false,
            },
        ];

        for (index, test) in tests.into_iter().enumerate() {
            assert_eq!(
                test.input.is_first_update(),
                test.expected,
                "TC{} failed",
                index
            );
        }
    }

    /// The first update must satisfy `U <= last_update_id + 1 <= u`.
    #[test]
    fn test_sequencer_validate_first_update() {
        struct TestCase {
            sequencer: BinanceSpotOrderBookL2Sequencer,
            input: BinanceSpotOrderBookL2Update,
            expected: Result<(), DataError>,
        }

        let tests = vec![
            TestCase {
                // TC0: valid first update, U <= last_update_id + 1 <= u
                sequencer: BinanceSpotOrderBookL2Sequencer {
                    updates_processed: 0,
                    last_update_id: 100,
                    prev_last_update_id: 90,
                },
                input: BinanceSpotOrderBookL2Update {
                    subscription_id: SubscriptionId::from("subscription_id"),
                    time_exchange: Default::default(),
                    first_update_id: 100,
                    last_update_id: 110,
                    bids: vec![],
                    asks: vec![],
                },
                expected: Ok(()),
            },
            TestCase {
                // TC1: invalid, only U violates: U > last_update_id + 1 while u is ahead
                sequencer: BinanceSpotOrderBookL2Sequencer {
                    updates_processed: 0,
                    last_update_id: 100,
                    prev_last_update_id: 90,
                },
                input: BinanceSpotOrderBookL2Update {
                    subscription_id: SubscriptionId::from("subscription_id"),
                    time_exchange: Default::default(),
                    first_update_id: 102,
                    last_update_id: 110,
                    bids: vec![],
                    asks: vec![],
                },
                expected: Err(DataError::InvalidSequence {
                    prev_last_update_id: 100,
                    first_update_id: 102,
                }),
            },
            TestCase {
                // TC2: invalid, only u violates: u < last_update_id + 1 while U is behind
                sequencer: BinanceSpotOrderBookL2Sequencer {
                    updates_processed: 0,
                    last_update_id: 100,
                    prev_last_update_id: 90,
                },
                input: BinanceSpotOrderBookL2Update {
                    subscription_id: SubscriptionId::from("subscription_id"),
                    time_exchange: Default::default(),
                    first_update_id: 90,
                    last_update_id: 95,
                    bids: vec![],
                    asks: vec![],
                },
                expected: Err(DataError::InvalidSequence {
                    prev_last_update_id: 100,
                    first_update_id: 90,
                }),
            },
            TestCase {
                // TC3: invalid, both violate: U > last_update_id + 1 and u < last_update_id + 1
                sequencer: BinanceSpotOrderBookL2Sequencer {
                    updates_processed: 0,
                    last_update_id: 100,
                    prev_last_update_id: 90,
                },
                input: BinanceSpotOrderBookL2Update {
                    subscription_id: SubscriptionId::from("subscription_id"),
                    time_exchange: Default::default(),
                    first_update_id: 110,
                    last_update_id: 90,
                    bids: vec![],
                    asks: vec![],
                },
                expected: Err(DataError::InvalidSequence {
                    prev_last_update_id: 100,
                    first_update_id: 110,
                }),
            },
        ];

        for (index, test) in tests.into_iter().enumerate() {
            let actual = test.sequencer.validate_first_update(&test.input);
            match (actual, test.expected) {
                (Ok(_), Ok(_)) => {
                    // Test passed
                }
                (Err(actual), Err(expected)) => {
                    assert_eq!(actual, expected, "TC{index} error variant mismatch");
                }
                (actual, expected) => {
                    panic!(
                        "TC{index} failed because actual != expected. \nActual: {actual:?}\nExpected: {expected:?}\n"
                    );
                }
            }
        }
    }

    /// Subsequent updates must satisfy `U == last_update_id + 1`.
    #[test]
    fn test_sequencer_validate_next_update() {
        struct TestCase {
            sequencer: BinanceSpotOrderBookL2Sequencer,
            input: BinanceSpotOrderBookL2Update,
            expected: Result<(), DataError>,
        }

        let tests = vec![
            TestCase {
                // TC0: valid next update, U == last_update_id + 1
                sequencer: BinanceSpotOrderBookL2Sequencer {
                    updates_processed: 100,
                    last_update_id: 100,
                    prev_last_update_id: 100,
                },
                input: BinanceSpotOrderBookL2Update {
                    subscription_id: SubscriptionId::from("subscription_id"),
                    time_exchange: Default::default(),
                    first_update_id: 101,
                    last_update_id: 110,
                    bids: vec![],
                    asks: vec![],
                },
                expected: Ok(()),
            },
            TestCase {
                // TC1: invalid next update, U != last_update_id + 1
                sequencer: BinanceSpotOrderBookL2Sequencer {
                    updates_processed: 100,
                    last_update_id: 100,
                    prev_last_update_id: 90,
                },
                input: BinanceSpotOrderBookL2Update {
                    subscription_id: SubscriptionId::from("subscription_id"),
                    time_exchange: Default::default(),
                    first_update_id: 120,
                    last_update_id: 130,
                    bids: vec![],
                    asks: vec![],
                },
                expected: Err(DataError::InvalidSequence {
                    prev_last_update_id: 100,
                    first_update_id: 120,
                }),
            },
        ];

        for (index, test) in tests.into_iter().enumerate() {
            let actual = test.sequencer.validate_next_update(&test.input);
            match (actual, test.expected) {
                (Ok(_), Ok(_)) => {
                    // Test passed
                }
                (Err(actual), Err(expected)) => {
                    assert_eq!(actual, expected, "TC{index} error variant mismatch");
                }
                (actual, expected) => {
                    panic!(
                        "TC{index} failed because actual != expected. \nActual: {actual:?}\nExpected: {expected:?}\n"
                    );
                }
            }
        }
    }

    /// `validate_sequence` drops stale updates, advances state on accepted updates and leaves
    /// state untouched when a gap is detected.
    #[test]
    fn test_sequencer_validate_sequence_state_transitions() {
        fn update(first_update_id: u64, last_update_id: u64) -> BinanceSpotOrderBookL2Update {
            BinanceSpotOrderBookL2Update {
                subscription_id: SubscriptionId::from("subscription_id"),
                time_exchange: Default::default(),
                first_update_id,
                last_update_id,
                bids: vec![],
                asks: vec![],
            }
        }

        fn state(sequencer: &BinanceSpotOrderBookL2Sequencer) -> (u64, u64, u64) {
            (
                sequencer.updates_processed,
                sequencer.prev_last_update_id,
                sequencer.last_update_id,
            )
        }

        let mut sequencer = BinanceSpotOrderBookL2Sequencer::new(100);

        // Stale update (u <= last_update_id) is dropped without touching state
        assert_eq!(sequencer.validate_sequence(update(95, 100)).unwrap(), None);
        assert_eq!(state(&sequencer), (0, 100, 100));

        // First update overlapping the seed id is accepted
        assert_eq!(
            sequencer.validate_sequence(update(99, 105)).unwrap(),
            Some(update(99, 105))
        );
        assert_eq!(state(&sequencer), (1, 100, 105));

        // Contiguous next update is accepted
        assert_eq!(
            sequencer.validate_sequence(update(106, 110)).unwrap(),
            Some(update(106, 110))
        );
        assert_eq!(state(&sequencer), (2, 105, 110));

        // Gap (U != last_update_id + 1) is rejected and state is left unchanged
        assert_eq!(
            sequencer.validate_sequence(update(112, 115)).unwrap_err(),
            DataError::InvalidSequence {
                prev_last_update_id: 110,
                first_update_id: 112,
            }
        );
        assert_eq!(state(&sequencer), (2, 105, 110));
    }

    /// Sequenced updates applied to an `OrderBook` yield the expected book.
    #[test]
    fn test_update_rustrade_order_book_with_sequenced_updates() {
        struct TestCase {
            sequencer: BinanceSpotOrderBookL2Sequencer,
            book: OrderBook,
            input_update: BinanceSpotOrderBookL2Update,
            expected: OrderBook,
        }

        let tests = vec![
            TestCase {
                // TC0: stale update (u <= last_update_id) is dropped, book unchanged
                sequencer: BinanceSpotOrderBookL2Sequencer {
                    updates_processed: 100,
                    last_update_id: 100,
                    prev_last_update_id: 0,
                },
                book: OrderBook::new(100, None, vec![Level::new(50, 1)], vec![Level::new(100, 1)]),
                input_update: BinanceSpotOrderBookL2Update {
                    subscription_id: SubscriptionId::from("subscription_id"),
                    time_exchange: Default::default(),
                    first_update_id: 0,
                    last_update_id: 100,
                    bids: vec![],
                    asks: vec![],
                },
                expected: OrderBook::new(
                    100,
                    None,
                    vec![Level::new(50, 1)],
                    vec![Level::new(100, 1)],
                ),
            },
            TestCase {
                // TC1: valid update with sorted snapshot generated
                sequencer: BinanceSpotOrderBookL2Sequencer {
                    updates_processed: 100,
                    last_update_id: 100,
                    prev_last_update_id: 100,
                },
                book: OrderBook::new(
                    100,
                    None,
                    vec![Level::new(80, 1), Level::new(100, 1), Level::new(90, 1)],
                    vec![Level::new(150, 1), Level::new(110, 1), Level::new(120, 1)],
                ),
                input_update: BinanceSpotOrderBookL2Update {
                    subscription_id: SubscriptionId::from("subscription_id"),
                    time_exchange: Default::default(),
                    first_update_id: 101,
                    last_update_id: 110,
                    bids: vec![
                        // Level exists & new amount is 0
                        BinanceLevel {
                            price: dec!(80),
                            amount: dec!(0),
                        },
                        // Level exists & new amount is > 0
                        BinanceLevel {
                            price: dec!(90),
                            amount: dec!(10),
                        },
                    ],
                    asks: vec![
                        // Level does not exist & new amount is > 0
                        BinanceLevel {
                            price: dec!(200),
                            amount: dec!(1),
                        },
                        // Level does not exist & new amount is 0
                        BinanceLevel {
                            price: dec!(500),
                            amount: dec!(0),
                        },
                    ],
                },
                expected: OrderBook::new(
                    110,
                    None,
                    vec![Level::new(100, 1), Level::new(90, 10)],
                    vec![
                        Level::new(110, 1),
                        Level::new(120, 1),
                        Level::new(150, 1),
                        Level::new(200, 1),
                    ],
                ),
            },
        ];

        for (index, mut test) in tests.into_iter().enumerate() {
            if let Some(valid_update) = test.sequencer.validate_sequence(test.input_update).unwrap()
            {
                let rustrade_update = OrderBookEvent::Update(OrderBook::new(
                    valid_update.last_update_id,
                    None,
                    valid_update.bids,
                    valid_update.asks,
                ));

                test.book.update(&rustrade_update);
            }

            assert_eq!(test.book, test.expected, "TC{index} failed");
        }
    }
}