barter_data/subscriber/
mod.rs1use self::{
2 mapper::{SubscriptionMapper, WebSocketSubMapper},
3 validator::SubscriptionValidator,
4};
5use crate::{
6 Identifier,
7 exchange::Connector,
8 instrument::InstrumentData,
9 subscription::{Map, Subscription, SubscriptionKind, SubscriptionMeta},
10};
11use async_trait::async_trait;
12use barter_integration::{
13 error::SocketError,
14 protocol::websocket::{WebSocket, WsMessage, connect},
15};
16use futures::SinkExt;
17use serde::{Deserialize, Serialize};
18use std::fmt::Debug;
19use tracing::debug;
20
21pub mod mapper;
24
25pub mod validator;
28
29#[async_trait]
31pub trait Subscriber {
32 type SubMapper: SubscriptionMapper;
33
34 async fn subscribe<Exchange, Instrument, Kind>(
35 subscriptions: &[Subscription<Exchange, Instrument, Kind>],
36 ) -> Result<Subscribed<Instrument::Key>, SocketError>
37 where
38 Exchange: Connector + Send + Sync,
39 Kind: SubscriptionKind + Send + Sync,
40 Instrument: InstrumentData,
41 Subscription<Exchange, Instrument, Kind>:
42 Identifier<Exchange::Channel> + Identifier<Exchange::Market>;
43}
44
45#[derive(Debug)]
46pub struct Subscribed<InstrumentKey> {
47 pub websocket: WebSocket,
48 pub map: Map<InstrumentKey>,
49 pub buffered_websocket_events: Vec<WsMessage>,
50}
51
52#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Deserialize, Serialize)]
54pub struct WebSocketSubscriber;
55
56#[async_trait]
57impl Subscriber for WebSocketSubscriber {
58 type SubMapper = WebSocketSubMapper;
59
60 async fn subscribe<Exchange, Instrument, Kind>(
61 subscriptions: &[Subscription<Exchange, Instrument, Kind>],
62 ) -> Result<Subscribed<Instrument::Key>, SocketError>
63 where
64 Exchange: Connector + Send + Sync,
65 Kind: SubscriptionKind + Send + Sync,
66 Instrument: InstrumentData,
67 Subscription<Exchange, Instrument, Kind>:
68 Identifier<Exchange::Channel> + Identifier<Exchange::Market>,
69 {
70 let exchange = Exchange::ID;
72 let url = Exchange::url()?;
73 debug!(%exchange, %url, ?subscriptions, "subscribing to WebSocket");
74
75 let mut websocket = connect(url).await?;
77 debug!(%exchange, ?subscriptions, "connected to WebSocket");
78
79 let SubscriptionMeta {
81 instrument_map,
82 ws_subscriptions,
83 } = Self::SubMapper::map::<Exchange, Instrument, Kind>(subscriptions);
84
85 for subscription in ws_subscriptions {
87 debug!(%exchange, payload = ?subscription, "sending exchange subscription");
88 websocket
89 .send(subscription)
90 .await
91 .map_err(|error| SocketError::WebSocket(Box::new(error)))?;
92 }
93
94 let (map, buffered_websocket_events) = Exchange::SubValidator::validate::<
96 Exchange,
97 Instrument::Key,
98 Kind,
99 >(instrument_map, &mut websocket)
100 .await?;
101
102 debug!(%exchange, "successfully initialised WebSocket stream with confirmed Subscriptions");
103 Ok(Subscribed {
104 websocket,
105 map,
106 buffered_websocket_events,
107 })
108 }
109}