barter_data/exchange/bitfinex/
validator.rs

1use super::subscription::{BitfinexPlatformEvent, BitfinexSubResponse};
2use crate::{
3    Identifier,
4    exchange::{Connector, ExchangeSub},
5    subscriber::validator::SubscriptionValidator,
6    subscription::{Map, SubscriptionKind},
7};
8use async_trait::async_trait;
9use barter_integration::{
10    Validator,
11    error::SocketError,
12    protocol::{
13        StreamParser,
14        websocket::{WebSocket, WebSocketParser, WsMessage},
15    },
16    subscription::SubscriptionId,
17};
18use futures::StreamExt;
19use serde::{Deserialize, Serialize};
20use smol_str::ToSmolStr;
21use tracing::debug;
22
23/// [`Bitfinex`](super::Bitfinex) specific [`SubscriptionValidator`].
24///
25/// ### Notes
26/// - Required because Bitfinex has a non-self-describing data format after subscriptions have been
27///   validated.
28/// - The [`BitfinexChannelId`](super::subscription::BitfinexChannelId) is used to identify the
29///   [`Subscription`](crate::subscription::Subscription) associated with incoming
30///   events, rather than a `String` channel-market identifier.
31/// - Therefore the [`SubscriptionId`] format must change during [`BitfinexWebSocketSubValidator::validate`]
32///   to use the [`BitfinexChannelId`](super::subscription::BitfinexChannelId)
33///   (see module level "SubscriptionId" documentation notes for more details).
34#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Deserialize, Serialize)]
35pub struct BitfinexWebSocketSubValidator;
36
37#[async_trait]
38impl SubscriptionValidator for BitfinexWebSocketSubValidator {
39    type Parser = WebSocketParser;
40
41    async fn validate<Exchange, Instrument, Kind>(
42        mut instrument_map: Map<Instrument>,
43        websocket: &mut WebSocket,
44    ) -> Result<(Map<Instrument>, Vec<WsMessage>), SocketError>
45    where
46        Exchange: Connector + Send,
47        Instrument: Send,
48        Kind: SubscriptionKind + Send,
49    {
50        // Establish exchange specific subscription validation parameters
51        let timeout = Exchange::subscription_timeout();
52        let expected_responses = Exchange::expected_responses(&instrument_map);
53
54        // Parameter to keep track of successful Subscription outcomes
55        // '--> Bitfinex sends snapshots as the first message, so count them also
56        let mut success_responses = 0usize;
57        let mut init_snapshots_received = 0usize;
58
59        // Buffer any active Subscription market events that are received during validation
60        let mut buff_active_subscription_events = Vec::new();
61
62        loop {
63            // Break if all Subscriptions were a success
64            if success_responses == expected_responses
65                && init_snapshots_received == expected_responses
66            {
67                debug!(exchange = %Exchange::ID, "validated exchange WebSocket subscriptions");
68                break Ok((instrument_map, buff_active_subscription_events));
69            }
70
71            tokio::select! {
72                // If timeout reached, return SubscribeError
73                _ = tokio::time::sleep(timeout) => {
74                    break Err(SocketError::Subscribe(
75                        format!("subscription validation timeout reached: {timeout:?}")
76                    ))
77                },
78                // Parse incoming messages and determine subscription outcomes
79                message = websocket.next() => {
80                    let response = match message {
81                        Some(response) => response,
82                        None => break Err(SocketError::Subscribe("WebSocket stream terminated unexpectedly".to_string()))
83                    };
84
85                    match Self::Parser::parse::<BitfinexPlatformEvent>(response) {
86                        Some(Ok(response)) => match response.validate() {
87                            // Bitfinex server is online
88                            Ok(BitfinexPlatformEvent::PlatformStatus(status)) => {
89                                debug!(
90                                    exchange = %Exchange::ID,
91                                    %success_responses,
92                                    %expected_responses,
93                                    payload = ?status,
94                                    "received Bitfinex platform status",
95                                );
96                            }
97
98                            // Subscription success
99                            Ok(BitfinexPlatformEvent::Subscribed(response)) => {
100                                // Determine SubscriptionId associated with the success response
101                                let BitfinexSubResponse { channel, market, channel_id } = &response;
102                                let subscription_id = ExchangeSub::from((channel, market)).id();
103
104                                // Replace SubscriptionId with SubscriptionId(channel_id)
105                                if let Some(subscription) = instrument_map.0.remove(&subscription_id) {
106                                    success_responses += 1;
107                                    instrument_map.0.insert(SubscriptionId(channel_id.0.to_smolstr()), subscription);
108
109                                    debug!(
110                                        exchange = %Exchange::ID,
111                                        %success_responses,
112                                        %expected_responses,
113                                        payload = ?response,
114                                        "received valid Ok subscription response",
115                                    );
116                                }
117                            }
118
119                            // Subscription failure
120                            Err(err) => break Err(err),
121
122                            // Not reachable after BitfinexPlatformEvent validate()
123                            Ok(BitfinexPlatformEvent::Error(error)) => panic!("{error:?}"),
124                        }
125                        Some(Err(SocketError::Deserialise { error: _, payload })) if success_responses >= 1 => {
126                            // Already active Bitfinex subscriptions will send initial snapshots
127                            init_snapshots_received += 1;
128                            buff_active_subscription_events.push(WsMessage::text(payload));
129                            continue
130                        }
131                        Some(Err(SocketError::Terminated(close_frame))) => {
132                            break Err(SocketError::Subscribe(
133                                format!("received WebSocket CloseFrame: {close_frame}")
134                            ))
135                        }
136                        _ => {
137                            // Pings, Pongs, Frames, etc.
138                            continue
139                        }
140                    }
141                }
142            }
143        }
144    }
145}