barter_data/subscriber/
mod.rs1use self::{
2 mapper::{SubscriptionMapper, WebSocketSubMapper},
3 validator::SubscriptionValidator,
4};
5use crate::{
6 Identifier,
7 exchange::Connector,
8 instrument::InstrumentData,
9 subscription::{Map, Subscription, SubscriptionKind, SubscriptionMeta},
10};
11use async_trait::async_trait;
12use barter_integration::{
13 error::SocketError,
14 protocol::websocket::{WebSocket, WsMessage, connect},
15};
16use futures::SinkExt;
17use serde::{Deserialize, Serialize};
18use std::fmt::Debug;
19use tracing::debug;
20
21pub mod mapper;
24
25pub mod validator;
28
29#[async_trait]
31pub trait Subscriber {
32 type SubMapper: SubscriptionMapper;
33
34 async fn subscribe<Exchange, Instrument, Kind>(
35 subscriptions: &[Subscription<Exchange, Instrument, Kind>],
36 ) -> Result<Subscribed<Instrument::Key>, SocketError>
37 where
38 Exchange: Connector + Send + Sync,
39 Kind: SubscriptionKind + Send + Sync,
40 Instrument: InstrumentData,
41 Subscription<Exchange, Instrument, Kind>:
42 Identifier<Exchange::Channel> + Identifier<Exchange::Market>;
43}
44
45#[derive(Debug)]
46pub struct Subscribed<InstrumentKey> {
47 pub websocket: WebSocket,
48 pub map: Map<InstrumentKey>,
49 pub buffered_websocket_events: Vec<WsMessage>,
50}
51
52#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Deserialize, Serialize)]
54pub struct WebSocketSubscriber;
55
56#[async_trait]
57impl Subscriber for WebSocketSubscriber {
58 type SubMapper = WebSocketSubMapper;
59
60 async fn subscribe<Exchange, Instrument, Kind>(
61 subscriptions: &[Subscription<Exchange, Instrument, Kind>],
62 ) -> Result<Subscribed<Instrument::Key>, SocketError>
63 where
64 Exchange: Connector + Send + Sync,
65 Kind: SubscriptionKind + Send + Sync,
66 Instrument: InstrumentData,
67 Subscription<Exchange, Instrument, Kind>:
68 Identifier<Exchange::Channel> + Identifier<Exchange::Market>,
69 {
70 let exchange = Exchange::ID;
72 let url = Exchange::url()?;
73 debug!(%exchange, %url, ?subscriptions, "subscribing to WebSocket");
74
75 let mut websocket = connect(url).await?;
77 debug!(%exchange, ?subscriptions, "connected to WebSocket");
78
79 let SubscriptionMeta {
81 instrument_map,
82 ws_subscriptions,
83 } = Self::SubMapper::map::<Exchange, Instrument, Kind>(subscriptions);
84
85 for subscription in ws_subscriptions {
87 debug!(%exchange, payload = ?subscription, "sending execution subscription");
88 websocket.send(subscription).await?;
89 }
90
91 let (map, buffered_websocket_events) = Exchange::SubValidator::validate::<
93 Exchange,
94 Instrument::Key,
95 Kind,
96 >(instrument_map, &mut websocket)
97 .await?;
98
99 debug!(%exchange, "successfully initialised WebSocket stream with confirmed Subscriptions");
100 Ok(Subscribed {
101 websocket,
102 map,
103 buffered_websocket_events,
104 })
105 }
106}