rustrade_data/exchange/bitfinex/validator.rs
1use super::subscription::{BitfinexPlatformEvent, BitfinexSubResponse};
2use crate::{
3 Identifier,
4 exchange::{Connector, ExchangeSub},
5 subscriber::validator::SubscriptionValidator,
6 subscription::{Map, SubscriptionKind},
7};
8use futures::StreamExt;
9use rustrade_integration::{
10 Validator,
11 error::SocketError,
12 protocol::{
13 StreamParser,
14 websocket::{WebSocket, WebSocketSerdeParser, WsMessage},
15 },
16 subscription::SubscriptionId,
17};
18use serde::{Deserialize, Serialize};
19use smol_str::ToSmolStr;
20use tracing::debug;
21
22/// [`Bitfinex`](super::Bitfinex) specific [`SubscriptionValidator`].
23///
24/// ### Notes
25/// - Required because Bitfinex has a non-self-describing data format after subscriptions have been
26/// validated.
27/// - The [`BitfinexChannelId`](super::subscription::BitfinexChannelId) is used to identify the
28/// [`Subscription`](crate::subscription::Subscription) associated with incoming
29/// events, rather than a `String` channel-market identifier.
30/// - Therefore the [`SubscriptionId`] format must change during [`BitfinexWebSocketSubValidator::validate`]
31/// to use the [`BitfinexChannelId`](super::subscription::BitfinexChannelId)
32/// (see module level "SubscriptionId" documentation notes for more details).
33#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Deserialize, Serialize)]
34pub struct BitfinexWebSocketSubValidator;
35
36impl SubscriptionValidator for BitfinexWebSocketSubValidator {
37 type Parser = WebSocketSerdeParser;
38
39 async fn validate<Exchange, Instrument, Kind>(
40 mut instrument_map: Map<Instrument>,
41 websocket: &mut WebSocket,
42 ) -> Result<(Map<Instrument>, Vec<WsMessage>), SocketError>
43 where
44 Exchange: Connector + Send,
45 Instrument: Send,
46 Kind: SubscriptionKind + Send,
47 {
48 // Establish exchange specific subscription validation parameters
49 let timeout = Exchange::subscription_timeout();
50 let expected_responses = Exchange::expected_responses(&instrument_map);
51
52 // Parameter to keep track of successful Subscription outcomes
53 // '--> Bitfinex sends snapshots as the first message, so count them also
54 let mut success_responses = 0usize;
55 let mut init_snapshots_received = 0usize;
56
57 // Buffer any active Subscription market events that are received during validation
58 let mut buff_active_subscription_events = Vec::new();
59
60 loop {
61 // Break if all Subscriptions were a success
62 if success_responses == expected_responses
63 && init_snapshots_received == expected_responses
64 {
65 debug!(exchange = %Exchange::ID, "validated exchange WebSocket subscriptions");
66 break Ok((instrument_map, buff_active_subscription_events));
67 }
68
69 tokio::select! {
70 // If timeout reached, return SubscribeError
71 _ = tokio::time::sleep(timeout) => {
72 break Err(SocketError::Subscribe(
73 format!("subscription validation timeout reached: {timeout:?}")
74 ))
75 },
76 // Parse incoming messages and determine subscription outcomes
77 message = websocket.next() => {
78 let response = match message {
79 Some(response) => response,
80 None => break Err(SocketError::Subscribe("WebSocket stream terminated unexpectedly".to_string()))
81 };
82
83 match <WebSocketSerdeParser as StreamParser<BitfinexPlatformEvent>>::parse(response) {
84 Some(Ok(response)) => match response.validate() {
85 // Bitfinex server is online
86 Ok(BitfinexPlatformEvent::PlatformStatus(status)) => {
87 debug!(
88 exchange = %Exchange::ID,
89 %success_responses,
90 %expected_responses,
91 payload = ?status,
92 "received Bitfinex platform status",
93 );
94 }
95
96 // Subscription success
97 Ok(BitfinexPlatformEvent::Subscribed(response)) => {
98 // Determine SubscriptionId associated with the success response
99 let BitfinexSubResponse { channel, market, channel_id } = &response;
100 let subscription_id = ExchangeSub::from((channel, market)).id();
101
102 // Replace SubscriptionId with SubscriptionId(channel_id)
103 if let Some(subscription) = instrument_map.0.remove(&subscription_id) {
104 success_responses += 1;
105 instrument_map.0.insert(SubscriptionId(channel_id.0.to_smolstr()), subscription);
106
107 debug!(
108 exchange = %Exchange::ID,
109 %success_responses,
110 %expected_responses,
111 payload = ?response,
112 "received valid Ok subscription response",
113 );
114 }
115 }
116
117 // Subscription failure
118 Err(err) => break Err(err),
119
120 // Not reachable after BitfinexPlatformEvent validate()
121 Ok(BitfinexPlatformEvent::Error(error)) => panic!("{error:?}"),
122 }
123 Some(Err(SocketError::Deserialise { error: _, payload })) if success_responses >= 1 => {
124 // Already active Bitfinex subscriptions will send initial snapshots
125 init_snapshots_received += 1;
126 buff_active_subscription_events.push(WsMessage::text(payload));
127 continue
128 }
129 Some(Err(SocketError::Terminated(close_frame))) => {
130 break Err(SocketError::Subscribe(
131 format!("received WebSocket CloseFrame: {close_frame}")
132 ))
133 }
134 _ => {
135 // Pings, Pongs, Frames, etc.
136 continue
137 }
138 }
139 }
140 }
141 }
142 }
143}