rustrade_data/subscriber/
mod.rs1use self::{
2 mapper::{SubscriptionMapper, WebSocketSubMapper},
3 validator::SubscriptionValidator,
4};
5use crate::{
6 Identifier,
7 exchange::Connector,
8 instrument::InstrumentData,
9 subscription::{Map, Subscription, SubscriptionKind, SubscriptionMeta},
10};
11use futures::SinkExt;
12use rustrade_integration::{
13 error::SocketError,
14 protocol::websocket::{WebSocket, WsMessage, connect},
15};
16use serde::{Deserialize, Serialize};
17use std::{fmt::Debug, future::Future};
18use tracing::debug;
19
20pub mod mapper;
23
24pub mod validator;
27
28pub trait Subscriber: Clone + Send + Sync {
34 type SubMapper: SubscriptionMapper;
35
36 fn subscribe<Exchange, Instrument, Kind>(
37 &self,
38 subscriptions: &[Subscription<Exchange, Instrument, Kind>],
39 ) -> impl Future<Output = Result<Subscribed<Instrument::Key>, SocketError>> + Send
40 where
41 Exchange: Connector + Send + Sync,
42 Kind: SubscriptionKind + Send + Sync,
43 Instrument: InstrumentData,
44 Subscription<Exchange, Instrument, Kind>:
45 Identifier<Exchange::Channel> + Identifier<Exchange::Market>;
46}
47
48#[derive(Debug)]
49pub struct Subscribed<InstrumentKey> {
50 pub websocket: WebSocket,
51 pub map: Map<InstrumentKey>,
52 pub buffered_websocket_events: Vec<WsMessage>,
53}
54
55#[derive(
59 Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Default, Deserialize, Serialize,
60)]
61pub struct WebSocketSubscriber;
62
63impl Subscriber for WebSocketSubscriber {
64 type SubMapper = WebSocketSubMapper;
65
66 async fn subscribe<Exchange, Instrument, Kind>(
67 &self,
68 subscriptions: &[Subscription<Exchange, Instrument, Kind>],
69 ) -> Result<Subscribed<Instrument::Key>, SocketError>
70 where
71 Exchange: Connector + Send + Sync,
72 Kind: SubscriptionKind + Send + Sync,
73 Instrument: InstrumentData,
74 Subscription<Exchange, Instrument, Kind>:
75 Identifier<Exchange::Channel> + Identifier<Exchange::Market>,
76 {
77 let exchange = Exchange::ID;
79 let url = Exchange::url()?;
80 debug!(%exchange, %url, ?subscriptions, "subscribing to WebSocket");
81
82 let mut websocket = connect(url).await?;
84 debug!(%exchange, ?subscriptions, "connected to WebSocket");
85
86 let SubscriptionMeta {
88 instrument_map,
89 ws_subscriptions,
90 } = Self::SubMapper::map::<Exchange, Instrument, Kind>(subscriptions);
91
92 for subscription in ws_subscriptions {
94 debug!(%exchange, payload = ?subscription, "sending exchange subscription");
95 websocket
96 .send(subscription)
97 .await
98 .map_err(|error| SocketError::WebSocket(Box::new(error)))?;
99 }
100
101 let (map, buffered_websocket_events) = Exchange::SubValidator::validate::<
103 Exchange,
104 Instrument::Key,
105 Kind,
106 >(instrument_map, &mut websocket)
107 .await?;
108
109 debug!(%exchange, "successfully initialised WebSocket stream with confirmed Subscriptions");
110 Ok(Subscribed {
111 websocket,
112 map,
113 buffered_websocket_events,
114 })
115 }
116}