1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
use super::subscription::{BitfinexPlatformEvent, BitfinexSubResponse};
use crate::{
    exchange::{Connector, ExchangeSub},
    subscriber::validator::SubscriptionValidator,
    subscription::{Map, SubKind},
    Identifier,
};
use async_trait::async_trait;
use barter_integration::{
    error::SocketError,
    model::{instrument::Instrument, SubscriptionId},
    protocol::{
        websocket::{WebSocket, WebSocketParser},
        StreamParser,
    },
    Validator,
};
use futures::StreamExt;
use serde::{Deserialize, Serialize};
use tracing::debug;

/// [`SubscriptionValidator`] implementation specific to [`Bitfinex`](super::Bitfinex).
///
/// ### Notes
/// - Bitfinex needs its own validator because, once subscriptions have been validated, its
///   data format is no longer self-describing.
/// - Incoming events are matched to their
///   [`Subscription`](crate::subscription::Subscription) via the
///   [`BitfinexChannelId`](super::subscription::BitfinexChannelId) instead of a `String`
///   channel-market identifier.
/// - Consequently, [`BitfinexWebSocketSubValidator::validate`] rewrites each [`SubscriptionId`]
///   so that it is built from the
///   [`BitfinexChannelId`](super::subscription::BitfinexChannelId)
///   (see module level "SubscriptionId" documentation notes for more details).
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct BitfinexWebSocketSubValidator;

#[async_trait]
impl SubscriptionValidator for BitfinexWebSocketSubValidator {
    type Parser = WebSocketParser;

    /// Validate the Bitfinex subscription responses arriving on `websocket`.
    ///
    /// For every successful subscription response whose channel-market [`SubscriptionId`] is
    /// present in `map`, the entry is re-keyed to a [`SubscriptionId`] built from the
    /// `channel_id` of the response. The loop finishes successfully once both the number of
    /// successful responses and the number of counted initial snapshots equal
    /// `Exchange::expected_responses(&map)`.
    ///
    /// # Errors
    /// Returns a [`SocketError::Subscribe`] if:
    /// - the `Exchange::subscription_timeout()` elapses before validation completes (the timeout
    ///   covers the whole validation, it is not restarted by incoming messages),
    /// - the WebSocket stream ends,
    /// - a WebSocket `CloseFrame` is received,
    /// - an error event is still present after `BitfinexPlatformEvent::validate()`.
    ///
    /// Any error returned by `BitfinexPlatformEvent::validate()` is propagated unchanged.
    async fn validate<Exchange, Kind>(
        mut map: Map<Instrument>,
        websocket: &mut WebSocket,
    ) -> Result<Map<Instrument>, SocketError>
    where
        Exchange: Connector + Send,
        Kind: SubKind + Send,
    {
        // Establish exchange specific subscription validation parameters
        let timeout = Exchange::subscription_timeout();
        let expected_responses = Exchange::expected_responses(&map);

        // Fix the deadline once, up front. Creating a fresh `sleep(timeout)` on every loop
        // iteration would restart the timer whenever any message (ping, heartbeat, market data)
        // arrives, so a chatty socket could keep validation alive indefinitely.
        let deadline = tokio::time::Instant::now() + timeout;

        // Parameters to keep track of successful Subscription outcomes
        // '--> Bitfinex sends snapshots as the first message, so count them also
        let mut success_responses = 0usize;
        let mut init_snapshots_received = 0usize;

        loop {
            // Break if all Subscriptions were a success
            // NOTE(review): every undeserialisable payload received after the first success is
            // counted as a snapshot, so this strict equality may never hold if more such
            // payloads than expected arrive before the final confirmation. In that case the
            // deadline below ends validation with an error - confirm whether `>=` is wanted.
            if success_responses == expected_responses
                && init_snapshots_received == expected_responses
            {
                debug!(exchange = %Exchange::ID, "validated exchange WebSocket subscriptions");
                break Ok(map);
            }

            tokio::select! {
                // If the overall deadline is reached, return SubscribeError
                _ = tokio::time::sleep_until(deadline) => {
                    break Err(SocketError::Subscribe(
                        format!("subscription validation timeout reached: {:?}", timeout)
                    ))
                },
                // Parse incoming messages and determine subscription outcomes
                message = websocket.next() => {
                    let response = match message {
                        Some(response) => response,
                        None => break Err(SocketError::Subscribe("WebSocket stream terminated unexpectedly".to_string()))
                    };

                    match Self::Parser::parse::<BitfinexPlatformEvent>(response) {
                        Some(Ok(response)) => match response.validate() {
                            // Bitfinex server is online
                            Ok(BitfinexPlatformEvent::PlatformStatus(status)) => {
                                debug!(
                                    exchange = %Exchange::ID,
                                    %success_responses,
                                    %expected_responses,
                                    payload = ?status,
                                    "received Bitfinex platform status",
                                );
                            }

                            // Subscription success
                            Ok(BitfinexPlatformEvent::Subscribed(response)) => {
                                // Determine SubscriptionId associated with the success response
                                let BitfinexSubResponse { channel, market, channel_id } = &response;
                                let subscription_id = ExchangeSub::from((channel, market)).id();

                                // Replace SubscriptionId with SubscriptionId(channel_id)
                                // '--> responses with no matching entry in the map are ignored
                                //      and do not count towards the success total
                                if let Some(subscription) = map.0.remove(&subscription_id) {
                                    success_responses += 1;
                                    map.0.insert(SubscriptionId(channel_id.0.to_string()), subscription);

                                    debug!(
                                        exchange = %Exchange::ID,
                                        %success_responses,
                                        %expected_responses,
                                        payload = ?response,
                                        "received valid Ok subscription response",
                                    );
                                }
                            }

                            // Subscription failure
                            Err(err) => break Err(err),

                            // Not expected after BitfinexPlatformEvent validate(); surface it to
                            // the caller as an error rather than panicking inside library code
                            Ok(BitfinexPlatformEvent::Error(error)) => {
                                break Err(SocketError::Subscribe(
                                    format!("received unexpected Bitfinex error event: {error:?}")
                                ))
                            }
                        }
                        Some(Err(SocketError::Deserialise { error, payload })) if success_responses >= 1 => {
                            // Already active Bitfinex subscriptions will send initial snapshots
                            init_snapshots_received += 1;
                            debug!(
                                exchange = %Exchange::ID,
                                ?error,
                                %success_responses,
                                %expected_responses,
                                %payload,
                                "failed to deserialise non SubResponse payload"
                            );
                            continue
                        }
                        Some(Err(SocketError::Terminated(close_frame))) => {
                            break Err(SocketError::Subscribe(
                                format!("received WebSocket CloseFrame: {close_frame}")
                            ))
                        }
                        _ => {
                            // Pings, Pongs, Frames, etc.
                            continue
                        }
                    }
                }
            }
        }
    }
}