rustrade_data/subscriber/
validator.rs1use crate::{
2 exchange::Connector,
3 subscription::{Map, SubscriptionKind},
4};
5use futures::StreamExt;
6use rustrade_integration::{
7 Validator,
8 error::SocketError,
9 protocol::{
10 StreamParser,
11 websocket::{WebSocket, WebSocketSerdeParser, WsMessage},
12 },
13};
14use serde::{Deserialize, Serialize};
15use std::future::Future;
16use tracing::debug;
17
18pub trait SubscriptionValidator {
21 type Parser;
22
23 fn validate<Exchange, InstrumentKey, Kind>(
24 instrument_map: Map<InstrumentKey>,
25 websocket: &mut WebSocket,
26 ) -> impl Future<Output = Result<(Map<InstrumentKey>, Vec<WsMessage>), SocketError>> + Send
27 where
28 Exchange: Connector + Send,
29 InstrumentKey: Send,
30 Kind: SubscriptionKind + Send;
31}
32
33#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Deserialize, Serialize)]
35pub struct WebSocketSubValidator;
36
37impl SubscriptionValidator for WebSocketSubValidator {
38 type Parser = WebSocketSerdeParser;
39
40 async fn validate<Exchange, Instrument, Kind>(
41 instrument_map: Map<Instrument>,
42 websocket: &mut WebSocket,
43 ) -> Result<(Map<Instrument>, Vec<WsMessage>), SocketError>
44 where
45 Exchange: Connector + Send,
46 Instrument: Send,
47 Kind: SubscriptionKind + Send,
48 {
49 let timeout = Exchange::subscription_timeout();
51 let expected_responses = Exchange::expected_responses(&instrument_map);
52
53 let mut success_responses = 0usize;
55
56 let mut buff_active_subscription_events = Vec::new();
58
59 loop {
60 if success_responses == expected_responses {
62 debug!(exchange = %Exchange::ID, "validated exchange WebSocket subscriptions");
63 break Ok((instrument_map, buff_active_subscription_events));
64 }
65
66 tokio::select! {
67 _ = tokio::time::sleep(timeout) => {
69 break Err(SocketError::Subscribe(
70 format!("subscription validation timeout reached: {timeout:?}")
71 ))
72 },
73 message = websocket.next() => {
75 let response = match message {
76 Some(response) => response,
77 None => break Err(SocketError::Subscribe("WebSocket stream terminated unexpectedly".to_string()))
78 };
79
80 match <WebSocketSerdeParser as StreamParser<Exchange::SubResponse>>::parse(response) {
81 Some(Ok(response)) => match response.validate() {
82 Ok(response) => {
84 success_responses += 1;
85 debug!(
86 exchange = %Exchange::ID,
87 %success_responses,
88 %expected_responses,
89 payload = ?response,
90 "received valid Ok subscription response",
91 );
92 }
93
94 Err(err) => break Err(err)
96 }
97 Some(Err(SocketError::Deserialise { error: _, payload })) => {
98 buff_active_subscription_events.push(WsMessage::text(payload));
101 continue
102 }
103 Some(Err(SocketError::Terminated(close_frame))) => {
104 break Err(SocketError::Subscribe(
105 format!("received WebSocket CloseFrame: {close_frame}")
106 ))
107 }
108 _ => {
109 continue
111 }
112 }
113 }
114 }
115 }
116 }
117}