barter_data/subscriber/
validator.rs

1use crate::{
2    exchange::Connector,
3    subscription::{Map, SubscriptionKind},
4};
5use async_trait::async_trait;
6use barter_integration::{
7    Validator,
8    error::SocketError,
9    protocol::{
10        StreamParser,
11        websocket::{WebSocket, WebSocketParser, WsMessage},
12    },
13};
14use futures::StreamExt;
15use serde::{Deserialize, Serialize};
16use tracing::debug;
17
18/// Defines how to validate that actioned market data
19/// [`Subscription`](crate::subscription::Subscription)s were accepted by the exchange.
20#[async_trait]
21pub trait SubscriptionValidator {
22    type Parser: StreamParser;
23
24    async fn validate<Exchange, InstrumentKey, Kind>(
25        instrument_map: Map<InstrumentKey>,
26        websocket: &mut WebSocket,
27    ) -> Result<(Map<InstrumentKey>, Vec<WsMessage>), SocketError>
28    where
29        Exchange: Connector + Send,
30        InstrumentKey: Send,
31        Kind: SubscriptionKind + Send;
32}
33
34/// Standard [`SubscriptionValidator`] for [`WebSocket`]s suitable for most exchanges.
35#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Deserialize, Serialize)]
36pub struct WebSocketSubValidator;
37
38#[async_trait]
39impl SubscriptionValidator for WebSocketSubValidator {
40    type Parser = WebSocketParser;
41
42    async fn validate<Exchange, Instrument, Kind>(
43        instrument_map: Map<Instrument>,
44        websocket: &mut WebSocket,
45    ) -> Result<(Map<Instrument>, Vec<WsMessage>), SocketError>
46    where
47        Exchange: Connector + Send,
48        Instrument: Send,
49        Kind: SubscriptionKind + Send,
50    {
51        // Establish exchange specific subscription validation parameters
52        let timeout = Exchange::subscription_timeout();
53        let expected_responses = Exchange::expected_responses(&instrument_map);
54
55        // Parameter to keep track of successful Subscription outcomes
56        let mut success_responses = 0usize;
57
58        // Buffer any active Subscription market events that are received during validation
59        let mut buff_active_subscription_events = Vec::new();
60
61        loop {
62            // Break if all Subscriptions were a success
63            if success_responses == expected_responses {
64                debug!(exchange = %Exchange::ID, "validated exchange WebSocket subscriptions");
65                break Ok((instrument_map, buff_active_subscription_events));
66            }
67
68            tokio::select! {
69                // If timeout reached, return SubscribeError
70                _ = tokio::time::sleep(timeout) => {
71                    break Err(SocketError::Subscribe(
72                        format!("subscription validation timeout reached: {timeout:?}")
73                    ))
74                },
75                // Parse incoming messages and determine subscription outcomes
76                message = websocket.next() => {
77                    let response = match message {
78                        Some(response) => response,
79                        None => break Err(SocketError::Subscribe("WebSocket stream terminated unexpectedly".to_string()))
80                    };
81
82                    match Self::Parser::parse::<Exchange::SubResponse>(response) {
83                        Some(Ok(response)) => match response.validate() {
84                            // Subscription success
85                            Ok(response) => {
86                                success_responses += 1;
87                                debug!(
88                                    exchange = %Exchange::ID,
89                                    %success_responses,
90                                    %expected_responses,
91                                    payload = ?response,
92                                    "received valid Ok subscription response",
93                                );
94                            }
95
96                            // Subscription failure
97                            Err(err) => break Err(err)
98                        }
99                        Some(Err(SocketError::Deserialise { error: _, payload })) => {
100                            // Most likely already active subscription payload, so add to market
101                            // event buffer for post validation processing
102                            buff_active_subscription_events.push(WsMessage::text(payload));
103                            continue
104                        }
105                        Some(Err(SocketError::Terminated(close_frame))) => {
106                            break Err(SocketError::Subscribe(
107                                format!("received WebSocket CloseFrame: {close_frame}")
108                            ))
109                        }
110                        _ => {
111                            // Pings, Pongs, Frames, etc.
112                            continue
113                        }
114                    }
115                }
116            }
117        }
118    }
119}