barter_data/subscriber/
mod.rs

1use self::{
2    mapper::{SubscriptionMapper, WebSocketSubMapper},
3    validator::SubscriptionValidator,
4};
5use crate::{
6    Identifier,
7    exchange::Connector,
8    instrument::InstrumentData,
9    subscription::{Map, Subscription, SubscriptionKind, SubscriptionMeta},
10};
11use async_trait::async_trait;
12use barter_integration::{
13    error::SocketError,
14    protocol::websocket::{WebSocket, WsMessage, connect},
15};
16use futures::SinkExt;
17use serde::{Deserialize, Serialize};
18use std::fmt::Debug;
19use tracing::debug;
20
/// [`SubscriptionMapper`] implementations defining how to map a
/// collection of Barter [`Subscription`]s into exchange-specific [`SubscriptionMeta`].
23pub mod mapper;
24
25/// [`SubscriptionValidator`] implementations defining how to
26/// validate actioned [`Subscription`]s were successful.
27pub mod validator;
28
29/// Defines how to connect to a socket and subscribe to market data streams.
30#[async_trait]
31pub trait Subscriber {
32    type SubMapper: SubscriptionMapper;
33
34    async fn subscribe<Exchange, Instrument, Kind>(
35        subscriptions: &[Subscription<Exchange, Instrument, Kind>],
36    ) -> Result<Subscribed<Instrument::Key>, SocketError>
37    where
38        Exchange: Connector + Send + Sync,
39        Kind: SubscriptionKind + Send + Sync,
40        Instrument: InstrumentData,
41        Subscription<Exchange, Instrument, Kind>:
42            Identifier<Exchange::Channel> + Identifier<Exchange::Market>;
43}
44
45#[derive(Debug)]
46pub struct Subscribed<InstrumentKey> {
47    pub websocket: WebSocket,
48    pub map: Map<InstrumentKey>,
49    pub buffered_websocket_events: Vec<WsMessage>,
50}
51
52/// Standard [`Subscriber`] for [`WebSocket`]s suitable for most exchanges.
53#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Deserialize, Serialize)]
54pub struct WebSocketSubscriber;
55
56#[async_trait]
57impl Subscriber for WebSocketSubscriber {
58    type SubMapper = WebSocketSubMapper;
59
60    async fn subscribe<Exchange, Instrument, Kind>(
61        subscriptions: &[Subscription<Exchange, Instrument, Kind>],
62    ) -> Result<Subscribed<Instrument::Key>, SocketError>
63    where
64        Exchange: Connector + Send + Sync,
65        Kind: SubscriptionKind + Send + Sync,
66        Instrument: InstrumentData,
67        Subscription<Exchange, Instrument, Kind>:
68            Identifier<Exchange::Channel> + Identifier<Exchange::Market>,
69    {
70        // Define variables for logging ergonomics
71        let exchange = Exchange::ID;
72        let url = Exchange::url()?;
73        debug!(%exchange, %url, ?subscriptions, "subscribing to WebSocket");
74
75        // Connect to execution
76        let mut websocket = connect(url).await?;
77        debug!(%exchange, ?subscriptions, "connected to WebSocket");
78
79        // Map &[Subscription<Exchange, Kind>] to SubscriptionMeta
80        let SubscriptionMeta {
81            instrument_map,
82            ws_subscriptions,
83        } = Self::SubMapper::map::<Exchange, Instrument, Kind>(subscriptions);
84
85        // Send Subscriptions over WebSocket
86        for subscription in ws_subscriptions {
87            debug!(%exchange, payload = ?subscription, "sending execution subscription");
88            websocket.send(subscription).await?;
89        }
90
91        // Validate Subscription responses
92        let (map, buffered_websocket_events) = Exchange::SubValidator::validate::<
93            Exchange,
94            Instrument::Key,
95            Kind,
96        >(instrument_map, &mut websocket)
97        .await?;
98
99        debug!(%exchange, "successfully initialised WebSocket stream with confirmed Subscriptions");
100        Ok(Subscribed {
101            websocket,
102            map,
103            buffered_websocket_events,
104        })
105    }
106}