// rustrade_data/subscriber/validator.rs

1use crate::{
2    exchange::Connector,
3    subscription::{Map, SubscriptionKind},
4};
5use futures::StreamExt;
6use rustrade_integration::{
7    Validator,
8    error::SocketError,
9    protocol::{
10        StreamParser,
11        websocket::{WebSocket, WebSocketSerdeParser, WsMessage},
12    },
13};
14use serde::{Deserialize, Serialize};
15use std::future::Future;
16use tracing::debug;
17
18/// Defines how to validate that actioned market data
19/// [`Subscription`](crate::subscription::Subscription)s were accepted by the exchange.
20pub trait SubscriptionValidator {
21    type Parser;
22
23    fn validate<Exchange, InstrumentKey, Kind>(
24        instrument_map: Map<InstrumentKey>,
25        websocket: &mut WebSocket,
26    ) -> impl Future<Output = Result<(Map<InstrumentKey>, Vec<WsMessage>), SocketError>> + Send
27    where
28        Exchange: Connector + Send,
29        InstrumentKey: Send,
30        Kind: SubscriptionKind + Send;
31}
32
33/// Standard [`SubscriptionValidator`] for [`WebSocket`]s suitable for most exchanges.
34#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Deserialize, Serialize)]
35pub struct WebSocketSubValidator;
36
37impl SubscriptionValidator for WebSocketSubValidator {
38    type Parser = WebSocketSerdeParser;
39
40    async fn validate<Exchange, Instrument, Kind>(
41        instrument_map: Map<Instrument>,
42        websocket: &mut WebSocket,
43    ) -> Result<(Map<Instrument>, Vec<WsMessage>), SocketError>
44    where
45        Exchange: Connector + Send,
46        Instrument: Send,
47        Kind: SubscriptionKind + Send,
48    {
49        // Establish exchange specific subscription validation parameters
50        let timeout = Exchange::subscription_timeout();
51        let expected_responses = Exchange::expected_responses(&instrument_map);
52
53        // Parameter to keep track of successful Subscription outcomes
54        let mut success_responses = 0usize;
55
56        // Buffer any active Subscription market events that are received during validation
57        let mut buff_active_subscription_events = Vec::new();
58
59        loop {
60            // Break if all Subscriptions were a success
61            if success_responses == expected_responses {
62                debug!(exchange = %Exchange::ID, "validated exchange WebSocket subscriptions");
63                break Ok((instrument_map, buff_active_subscription_events));
64            }
65
66            tokio::select! {
67                // If timeout reached, return SubscribeError
68                _ = tokio::time::sleep(timeout) => {
69                    break Err(SocketError::Subscribe(
70                        format!("subscription validation timeout reached: {timeout:?}")
71                    ))
72                },
73                // Parse incoming messages and determine subscription outcomes
74                message = websocket.next() => {
75                    let response = match message {
76                        Some(response) => response,
77                        None => break Err(SocketError::Subscribe("WebSocket stream terminated unexpectedly".to_string()))
78                    };
79
80                    match <WebSocketSerdeParser as StreamParser<Exchange::SubResponse>>::parse(response) {
81                        Some(Ok(response)) => match response.validate() {
82                            // Subscription success
83                            Ok(response) => {
84                                success_responses += 1;
85                                debug!(
86                                    exchange = %Exchange::ID,
87                                    %success_responses,
88                                    %expected_responses,
89                                    payload = ?response,
90                                    "received valid Ok subscription response",
91                                );
92                            }
93
94                            // Subscription failure
95                            Err(err) => break Err(err)
96                        }
97                        Some(Err(SocketError::Deserialise { error: _, payload })) => {
98                            // Most likely already active subscription payload, so add to market
99                            // event buffer for post validation processing
100                            buff_active_subscription_events.push(WsMessage::text(payload));
101                            continue
102                        }
103                        Some(Err(SocketError::Terminated(close_frame))) => {
104                            break Err(SocketError::Subscribe(
105                                format!("received WebSocket CloseFrame: {close_frame}")
106                            ))
107                        }
108                        _ => {
109                            // Pings, Pongs, Frames, etc.
110                            continue
111                        }
112                    }
113                }
114            }
115        }
116    }
117}