Skip to main content

rustrade_data/exchange/bitfinex/
validator.rs

1use super::subscription::{BitfinexPlatformEvent, BitfinexSubResponse};
2use crate::{
3    Identifier,
4    exchange::{Connector, ExchangeSub},
5    subscriber::validator::SubscriptionValidator,
6    subscription::{Map, SubscriptionKind},
7};
8use futures::StreamExt;
9use rustrade_integration::{
10    Validator,
11    error::SocketError,
12    protocol::{
13        StreamParser,
14        websocket::{WebSocket, WebSocketSerdeParser, WsMessage},
15    },
16    subscription::SubscriptionId,
17};
18use serde::{Deserialize, Serialize};
19use smol_str::ToSmolStr;
20use tracing::debug;
21
22/// [`Bitfinex`](super::Bitfinex) specific [`SubscriptionValidator`].
23///
24/// ### Notes
25/// - Required because Bitfinex has a non-self-describing data format after subscriptions have been
26///   validated.
27/// - The [`BitfinexChannelId`](super::subscription::BitfinexChannelId) is used to identify the
28///   [`Subscription`](crate::subscription::Subscription) associated with incoming
29///   events, rather than a `String` channel-market identifier.
30/// - Therefore the [`SubscriptionId`] format must change during [`BitfinexWebSocketSubValidator::validate`]
31///   to use the [`BitfinexChannelId`](super::subscription::BitfinexChannelId)
32///   (see module level "SubscriptionId" documentation notes for more details).
33#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Deserialize, Serialize)]
34pub struct BitfinexWebSocketSubValidator;
35
36impl SubscriptionValidator for BitfinexWebSocketSubValidator {
37    type Parser = WebSocketSerdeParser;
38
39    async fn validate<Exchange, Instrument, Kind>(
40        mut instrument_map: Map<Instrument>,
41        websocket: &mut WebSocket,
42    ) -> Result<(Map<Instrument>, Vec<WsMessage>), SocketError>
43    where
44        Exchange: Connector + Send,
45        Instrument: Send,
46        Kind: SubscriptionKind + Send,
47    {
48        // Establish exchange specific subscription validation parameters
49        let timeout = Exchange::subscription_timeout();
50        let expected_responses = Exchange::expected_responses(&instrument_map);
51
52        // Parameter to keep track of successful Subscription outcomes
53        // '--> Bitfinex sends snapshots as the first message, so count them also
54        let mut success_responses = 0usize;
55        let mut init_snapshots_received = 0usize;
56
57        // Buffer any active Subscription market events that are received during validation
58        let mut buff_active_subscription_events = Vec::new();
59
60        loop {
61            // Break if all Subscriptions were a success
62            if success_responses == expected_responses
63                && init_snapshots_received == expected_responses
64            {
65                debug!(exchange = %Exchange::ID, "validated exchange WebSocket subscriptions");
66                break Ok((instrument_map, buff_active_subscription_events));
67            }
68
69            tokio::select! {
70                // If timeout reached, return SubscribeError
71                _ = tokio::time::sleep(timeout) => {
72                    break Err(SocketError::Subscribe(
73                        format!("subscription validation timeout reached: {timeout:?}")
74                    ))
75                },
76                // Parse incoming messages and determine subscription outcomes
77                message = websocket.next() => {
78                    let response = match message {
79                        Some(response) => response,
80                        None => break Err(SocketError::Subscribe("WebSocket stream terminated unexpectedly".to_string()))
81                    };
82
83                    match <WebSocketSerdeParser as StreamParser<BitfinexPlatformEvent>>::parse(response) {
84                        Some(Ok(response)) => match response.validate() {
85                            // Bitfinex server is online
86                            Ok(BitfinexPlatformEvent::PlatformStatus(status)) => {
87                                debug!(
88                                    exchange = %Exchange::ID,
89                                    %success_responses,
90                                    %expected_responses,
91                                    payload = ?status,
92                                    "received Bitfinex platform status",
93                                );
94                            }
95
96                            // Subscription success
97                            Ok(BitfinexPlatformEvent::Subscribed(response)) => {
98                                // Determine SubscriptionId associated with the success response
99                                let BitfinexSubResponse { channel, market, channel_id } = &response;
100                                let subscription_id = ExchangeSub::from((channel, market)).id();
101
102                                // Replace SubscriptionId with SubscriptionId(channel_id)
103                                if let Some(subscription) = instrument_map.0.remove(&subscription_id) {
104                                    success_responses += 1;
105                                    instrument_map.0.insert(SubscriptionId(channel_id.0.to_smolstr()), subscription);
106
107                                    debug!(
108                                        exchange = %Exchange::ID,
109                                        %success_responses,
110                                        %expected_responses,
111                                        payload = ?response,
112                                        "received valid Ok subscription response",
113                                    );
114                                }
115                            }
116
117                            // Subscription failure
118                            Err(err) => break Err(err),
119
120                            // Not reachable after BitfinexPlatformEvent validate()
121                            Ok(BitfinexPlatformEvent::Error(error)) => panic!("{error:?}"),
122                        }
123                        Some(Err(SocketError::Deserialise { error: _, payload })) if success_responses >= 1 => {
124                            // Already active Bitfinex subscriptions will send initial snapshots
125                            init_snapshots_received += 1;
126                            buff_active_subscription_events.push(WsMessage::text(payload));
127                            continue
128                        }
129                        Some(Err(SocketError::Terminated(close_frame))) => {
130                            break Err(SocketError::Subscribe(
131                                format!("received WebSocket CloseFrame: {close_frame}")
132                            ))
133                        }
134                        _ => {
135                            // Pings, Pongs, Frames, etc.
136                            continue
137                        }
138                    }
139                }
140            }
141        }
142    }
143}