barter_data/subscriber/
mod.rs

1use self::{
2    mapper::{SubscriptionMapper, WebSocketSubMapper},
3    validator::SubscriptionValidator,
4};
5use crate::{
6    Identifier,
7    exchange::Connector,
8    instrument::InstrumentData,
9    subscription::{Map, Subscription, SubscriptionKind, SubscriptionMeta},
10};
11use async_trait::async_trait;
12use barter_integration::{
13    error::SocketError,
14    protocol::websocket::{WebSocket, WsMessage, connect},
15};
16use futures::SinkExt;
17use serde::{Deserialize, Serialize};
18use std::fmt::Debug;
19use tracing::debug;
20
21/// [`SubscriptionMapper`] implementations defining how to map a
22/// collection of Barter [`Subscription`]s into exchange specific [`SubscriptionMeta`].
23pub mod mapper;
24
25/// [`SubscriptionValidator`] implementations defining how to
26/// validate actioned [`Subscription`]s were successful.
27pub mod validator;
28
29/// Defines how to connect to a socket and subscribe to market data streams.
30#[async_trait]
31pub trait Subscriber {
32    type SubMapper: SubscriptionMapper;
33
34    async fn subscribe<Exchange, Instrument, Kind>(
35        subscriptions: &[Subscription<Exchange, Instrument, Kind>],
36    ) -> Result<Subscribed<Instrument::Key>, SocketError>
37    where
38        Exchange: Connector + Send + Sync,
39        Kind: SubscriptionKind + Send + Sync,
40        Instrument: InstrumentData,
41        Subscription<Exchange, Instrument, Kind>:
42            Identifier<Exchange::Channel> + Identifier<Exchange::Market>;
43}
44
45#[derive(Debug)]
46pub struct Subscribed<InstrumentKey> {
47    pub websocket: WebSocket,
48    pub map: Map<InstrumentKey>,
49    pub buffered_websocket_events: Vec<WsMessage>,
50}
51
52/// Standard [`Subscriber`] for [`WebSocket`]s suitable for most exchanges.
53#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Deserialize, Serialize)]
54pub struct WebSocketSubscriber;
55
56#[async_trait]
57impl Subscriber for WebSocketSubscriber {
58    type SubMapper = WebSocketSubMapper;
59
60    async fn subscribe<Exchange, Instrument, Kind>(
61        subscriptions: &[Subscription<Exchange, Instrument, Kind>],
62    ) -> Result<Subscribed<Instrument::Key>, SocketError>
63    where
64        Exchange: Connector + Send + Sync,
65        Kind: SubscriptionKind + Send + Sync,
66        Instrument: InstrumentData,
67        Subscription<Exchange, Instrument, Kind>:
68            Identifier<Exchange::Channel> + Identifier<Exchange::Market>,
69    {
70        // Define variables for logging ergonomics
71        let exchange = Exchange::ID;
72        let url = Exchange::url()?;
73        debug!(%exchange, %url, ?subscriptions, "subscribing to WebSocket");
74
75        // Connect to exchange
76        let mut websocket = connect(url).await?;
77        debug!(%exchange, ?subscriptions, "connected to WebSocket");
78
79        // Map &[Subscription<Exchange, Kind>] to SubscriptionMeta
80        let SubscriptionMeta {
81            instrument_map,
82            ws_subscriptions,
83        } = Self::SubMapper::map::<Exchange, Instrument, Kind>(subscriptions);
84
85        // Send Subscriptions over WebSocket
86        for subscription in ws_subscriptions {
87            debug!(%exchange, payload = ?subscription, "sending exchange subscription");
88            websocket
89                .send(subscription)
90                .await
91                .map_err(|error| SocketError::WebSocket(Box::new(error)))?;
92        }
93
94        // Validate Subscription responses
95        let (map, buffered_websocket_events) = Exchange::SubValidator::validate::<
96            Exchange,
97            Instrument::Key,
98            Kind,
99        >(instrument_map, &mut websocket)
100        .await?;
101
102        debug!(%exchange, "successfully initialised WebSocket stream with confirmed Subscriptions");
103        Ok(Subscribed {
104            websocket,
105            map,
106            buffered_websocket_events,
107        })
108    }
109}