barter_data/exchange/bitfinex/validator.rs
1use super::subscription::{BitfinexPlatformEvent, BitfinexSubResponse};
2use crate::{
3 Identifier,
4 exchange::{Connector, ExchangeSub},
5 subscriber::validator::SubscriptionValidator,
6 subscription::{Map, SubscriptionKind},
7};
8use async_trait::async_trait;
9use barter_integration::{
10 Validator,
11 error::SocketError,
12 protocol::{
13 StreamParser,
14 websocket::{WebSocket, WebSocketParser, WsMessage},
15 },
16 subscription::SubscriptionId,
17};
18use futures::StreamExt;
19use serde::{Deserialize, Serialize};
20use smol_str::ToSmolStr;
21use tracing::debug;
22
23/// [`Bitfinex`](super::Bitfinex) specific [`SubscriptionValidator`].
24///
25/// ### Notes
26/// - Required because Bitfinex has a non-self-describing data format after subscriptions have been
27/// validated.
28/// - The [`BitfinexChannelId`](super::subscription::BitfinexChannelId) is used to identify the
29/// [`Subscription`](crate::subscription::Subscription) associated with incoming
30/// events, rather than a `String` channel-market identifier.
31/// - Therefore the [`SubscriptionId`] format must change during [`BitfinexWebSocketSubValidator::validate`]
32/// to use the [`BitfinexChannelId`](super::subscription::BitfinexChannelId)
33/// (see module level "SubscriptionId" documentation notes for more details).
34#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Deserialize, Serialize)]
35pub struct BitfinexWebSocketSubValidator;
36
37#[async_trait]
38impl SubscriptionValidator for BitfinexWebSocketSubValidator {
39 type Parser = WebSocketParser;
40
41 async fn validate<Exchange, Instrument, Kind>(
42 mut instrument_map: Map<Instrument>,
43 websocket: &mut WebSocket,
44 ) -> Result<(Map<Instrument>, Vec<WsMessage>), SocketError>
45 where
46 Exchange: Connector + Send,
47 Instrument: Send,
48 Kind: SubscriptionKind + Send,
49 {
50 // Establish exchange specific subscription validation parameters
51 let timeout = Exchange::subscription_timeout();
52 let expected_responses = Exchange::expected_responses(&instrument_map);
53
54 // Parameter to keep track of successful Subscription outcomes
55 // '--> Bitfinex sends snapshots as the first message, so count them also
56 let mut success_responses = 0usize;
57 let mut init_snapshots_received = 0usize;
58
59 // Buffer any active Subscription market events that are received during validation
60 let mut buff_active_subscription_events = Vec::new();
61
62 loop {
63 // Break if all Subscriptions were a success
64 if success_responses == expected_responses
65 && init_snapshots_received == expected_responses
66 {
67 debug!(exchange = %Exchange::ID, "validated exchange WebSocket subscriptions");
68 break Ok((instrument_map, buff_active_subscription_events));
69 }
70
71 tokio::select! {
72 // If timeout reached, return SubscribeError
73 _ = tokio::time::sleep(timeout) => {
74 break Err(SocketError::Subscribe(
75 format!("subscription validation timeout reached: {timeout:?}")
76 ))
77 },
78 // Parse incoming messages and determine subscription outcomes
79 message = websocket.next() => {
80 let response = match message {
81 Some(response) => response,
82 None => break Err(SocketError::Subscribe("WebSocket stream terminated unexpectedly".to_string()))
83 };
84
85 match Self::Parser::parse::<BitfinexPlatformEvent>(response) {
86 Some(Ok(response)) => match response.validate() {
87 // Bitfinex server is online
88 Ok(BitfinexPlatformEvent::PlatformStatus(status)) => {
89 debug!(
90 exchange = %Exchange::ID,
91 %success_responses,
92 %expected_responses,
93 payload = ?status,
94 "received Bitfinex platform status",
95 );
96 }
97
98 // Subscription success
99 Ok(BitfinexPlatformEvent::Subscribed(response)) => {
100 // Determine SubscriptionId associated with the success response
101 let BitfinexSubResponse { channel, market, channel_id } = &response;
102 let subscription_id = ExchangeSub::from((channel, market)).id();
103
104 // Replace SubscriptionId with SubscriptionId(channel_id)
105 if let Some(subscription) = instrument_map.0.remove(&subscription_id) {
106 success_responses += 1;
107 instrument_map.0.insert(SubscriptionId(channel_id.0.to_smolstr()), subscription);
108
109 debug!(
110 exchange = %Exchange::ID,
111 %success_responses,
112 %expected_responses,
113 payload = ?response,
114 "received valid Ok subscription response",
115 );
116 }
117 }
118
119 // Subscription failure
120 Err(err) => break Err(err),
121
122 // Not reachable after BitfinexPlatformEvent validate()
123 Ok(BitfinexPlatformEvent::Error(error)) => panic!("{error:?}"),
124 }
125 Some(Err(SocketError::Deserialise { error: _, payload })) if success_responses >= 1 => {
126 // Already active Bitfinex subscriptions will send initial snapshots
127 init_snapshots_received += 1;
128 buff_active_subscription_events.push(WsMessage::text(payload));
129 continue
130 }
131 Some(Err(SocketError::Terminated(close_frame))) => {
132 break Err(SocketError::Subscribe(
133 format!("received WebSocket CloseFrame: {close_frame}")
134 ))
135 }
136 _ => {
137 // Pings, Pongs, Frames, etc.
138 continue
139 }
140 }
141 }
142 }
143 }
144 }
145}