Skip to main content

rustrade_data/subscriber/
mod.rs

1use self::{
2    mapper::{SubscriptionMapper, WebSocketSubMapper},
3    validator::SubscriptionValidator,
4};
5use crate::{
6    Identifier,
7    exchange::Connector,
8    instrument::InstrumentData,
9    subscription::{Map, Subscription, SubscriptionKind, SubscriptionMeta},
10};
11use futures::SinkExt;
12use rustrade_integration::{
13    error::SocketError,
14    protocol::websocket::{WebSocket, WsMessage, connect},
15};
16use serde::{Deserialize, Serialize};
17use std::{fmt::Debug, future::Future};
18use tracing::debug;
19
20/// [`SubscriptionMapper`] implementations defining how to map a
21/// collection of Barter [`Subscription`]s into exchange specific [`SubscriptionMeta`].
22pub mod mapper;
23
24/// [`SubscriptionValidator`] implementations defining how to
25/// validate actioned [`Subscription`]s were successful.
26pub mod validator;
27
28/// Defines how to connect to a socket and subscribe to market data streams.
29///
30/// Subscribers may carry state such as authentication credentials.
31/// The trait requires `Clone` to support reconnection (subscribers are cloned
32/// into the reconnect closure).
33pub trait Subscriber: Clone + Send + Sync {
34    type SubMapper: SubscriptionMapper;
35
36    fn subscribe<Exchange, Instrument, Kind>(
37        &self,
38        subscriptions: &[Subscription<Exchange, Instrument, Kind>],
39    ) -> impl Future<Output = Result<Subscribed<Instrument::Key>, SocketError>> + Send
40    where
41        Exchange: Connector + Send + Sync,
42        Kind: SubscriptionKind + Send + Sync,
43        Instrument: InstrumentData,
44        Subscription<Exchange, Instrument, Kind>:
45            Identifier<Exchange::Channel> + Identifier<Exchange::Market>;
46}
47
48#[derive(Debug)]
49pub struct Subscribed<InstrumentKey> {
50    pub websocket: WebSocket,
51    pub map: Map<InstrumentKey>,
52    pub buffered_websocket_events: Vec<WsMessage>,
53}
54
55/// Standard [`Subscriber`] for [`WebSocket`]s suitable for most exchanges.
56///
57/// This is a stateless subscriber for unauthenticated market data streams.
58#[derive(
59    Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Default, Deserialize, Serialize,
60)]
61pub struct WebSocketSubscriber;
62
63impl Subscriber for WebSocketSubscriber {
64    type SubMapper = WebSocketSubMapper;
65
66    async fn subscribe<Exchange, Instrument, Kind>(
67        &self,
68        subscriptions: &[Subscription<Exchange, Instrument, Kind>],
69    ) -> Result<Subscribed<Instrument::Key>, SocketError>
70    where
71        Exchange: Connector + Send + Sync,
72        Kind: SubscriptionKind + Send + Sync,
73        Instrument: InstrumentData,
74        Subscription<Exchange, Instrument, Kind>:
75            Identifier<Exchange::Channel> + Identifier<Exchange::Market>,
76    {
77        // Define variables for logging ergonomics
78        let exchange = Exchange::ID;
79        let url = Exchange::url()?;
80        debug!(%exchange, %url, ?subscriptions, "subscribing to WebSocket");
81
82        // Connect to exchange
83        let mut websocket = connect(url).await?;
84        debug!(%exchange, ?subscriptions, "connected to WebSocket");
85
86        // Map &[Subscription<Exchange, Kind>] to SubscriptionMeta
87        let SubscriptionMeta {
88            instrument_map,
89            ws_subscriptions,
90        } = Self::SubMapper::map::<Exchange, Instrument, Kind>(subscriptions);
91
92        // Send Subscriptions over WebSocket
93        for subscription in ws_subscriptions {
94            debug!(%exchange, payload = ?subscription, "sending exchange subscription");
95            websocket
96                .send(subscription)
97                .await
98                .map_err(|error| SocketError::WebSocket(Box::new(error)))?;
99        }
100
101        // Validate Subscription responses
102        let (map, buffered_websocket_events) = Exchange::SubValidator::validate::<
103            Exchange,
104            Instrument::Key,
105            Kind,
106        >(instrument_map, &mut websocket)
107        .await?;
108
109        debug!(%exchange, "successfully initialised WebSocket stream with confirmed Subscriptions");
110        Ok(Subscribed {
111            websocket,
112            map,
113            buffered_websocket_events,
114        })
115    }
116}