barter_data/subscriber/
validator.rs1use crate::{
2 exchange::Connector,
3 subscription::{Map, SubscriptionKind},
4};
5use async_trait::async_trait;
6use barter_integration::{
7 Validator,
8 error::SocketError,
9 protocol::{
10 StreamParser,
11 websocket::{WebSocket, WebSocketParser, WsMessage},
12 },
13};
14use futures::StreamExt;
15use serde::{Deserialize, Serialize};
16use tracing::debug;
17
18#[async_trait]
21pub trait SubscriptionValidator {
22 type Parser: StreamParser;
23
24 async fn validate<Exchange, InstrumentKey, Kind>(
25 instrument_map: Map<InstrumentKey>,
26 websocket: &mut WebSocket,
27 ) -> Result<(Map<InstrumentKey>, Vec<WsMessage>), SocketError>
28 where
29 Exchange: Connector + Send,
30 InstrumentKey: Send,
31 Kind: SubscriptionKind + Send;
32}
33
34#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Deserialize, Serialize)]
36pub struct WebSocketSubValidator;
37
38#[async_trait]
39impl SubscriptionValidator for WebSocketSubValidator {
40 type Parser = WebSocketParser;
41
42 async fn validate<Exchange, Instrument, Kind>(
43 instrument_map: Map<Instrument>,
44 websocket: &mut WebSocket,
45 ) -> Result<(Map<Instrument>, Vec<WsMessage>), SocketError>
46 where
47 Exchange: Connector + Send,
48 Instrument: Send,
49 Kind: SubscriptionKind + Send,
50 {
51 let timeout = Exchange::subscription_timeout();
53 let expected_responses = Exchange::expected_responses(&instrument_map);
54
55 let mut success_responses = 0usize;
57
58 let mut buff_active_subscription_events = Vec::new();
60
61 loop {
62 if success_responses == expected_responses {
64 debug!(exchange = %Exchange::ID, "validated exchange WebSocket subscriptions");
65 break Ok((instrument_map, buff_active_subscription_events));
66 }
67
68 tokio::select! {
69 _ = tokio::time::sleep(timeout) => {
71 break Err(SocketError::Subscribe(
72 format!("subscription validation timeout reached: {timeout:?}")
73 ))
74 },
75 message = websocket.next() => {
77 let response = match message {
78 Some(response) => response,
79 None => break Err(SocketError::Subscribe("WebSocket stream terminated unexpectedly".to_string()))
80 };
81
82 match Self::Parser::parse::<Exchange::SubResponse>(response) {
83 Some(Ok(response)) => match response.validate() {
84 Ok(response) => {
86 success_responses += 1;
87 debug!(
88 exchange = %Exchange::ID,
89 %success_responses,
90 %expected_responses,
91 payload = ?response,
92 "received valid Ok subscription response",
93 );
94 }
95
96 Err(err) => break Err(err)
98 }
99 Some(Err(SocketError::Deserialise { error: _, payload })) => {
100 buff_active_subscription_events.push(WsMessage::text(payload));
103 continue
104 }
105 Some(Err(SocketError::Terminated(close_frame))) => {
106 break Err(SocketError::Subscribe(
107 format!("received WebSocket CloseFrame: {close_frame}")
108 ))
109 }
110 _ => {
111 continue
113 }
114 }
115 }
116 }
117 }
118 }
119}