barter_data/streams/
consumer.rs

1use crate::{
2    Identifier, MarketStream,
3    error::DataError,
4    event::MarketEvent,
5    exchange::StreamSelector,
6    instrument::InstrumentData,
7    streams::{
8        reconnect,
9        reconnect::stream::{
10            ReconnectingStream, ReconnectionBackoffPolicy, init_reconnecting_stream,
11        },
12    },
13    subscription::{Subscription, SubscriptionKind, display_subscriptions_without_exchange},
14};
15use barter_instrument::exchange::ExchangeId;
16use derive_more::Constructor;
17use futures::Stream;
18use serde::{Deserialize, Serialize};
19use std::fmt::Display;
20use tracing::info;
21
22/// Default [`ReconnectionBackoffPolicy`] for a [`reconnecting`](`ReconnectingStream`) [`MarketStream`].
23pub const STREAM_RECONNECTION_POLICY: ReconnectionBackoffPolicy = ReconnectionBackoffPolicy {
24    backoff_ms_initial: 125,
25    backoff_multiplier: 2,
26    backoff_ms_max: 60000,
27};
28
29/// Convenient type alias for a [`MarketEvent`] [`Result`] consumed via a
30/// [`reconnecting`](`ReconnectingStream`) [`MarketStream`].
31pub type MarketStreamResult<InstrumentKey, Kind> =
32    reconnect::Event<ExchangeId, Result<MarketEvent<InstrumentKey, Kind>, DataError>>;
33
34/// Convenient type alias for a [`MarketEvent`] consumed via a
35/// [`reconnecting`](`ReconnectingStream`) [`MarketStream`].
36pub type MarketStreamEvent<InstrumentKey, Kind> =
37    reconnect::Event<ExchangeId, MarketEvent<InstrumentKey, Kind>>;
38
39/// Initialises a [`reconnecting`](`ReconnectingStream`) [`MarketStream`] using a collection of
40/// [`Subscription`]s.
41///
42/// The provided [`ReconnectionBackoffPolicy`] dictates how the exponential backoff scales
43/// between reconnections.
44pub async fn init_market_stream<Exchange, Instrument, Kind>(
45    policy: ReconnectionBackoffPolicy,
46    subscriptions: Vec<Subscription<Exchange, Instrument, Kind>>,
47) -> Result<impl Stream<Item = MarketStreamResult<Instrument::Key, Kind::Event>>, DataError>
48where
49    Exchange: StreamSelector<Instrument, Kind>,
50    Instrument: InstrumentData + Display,
51    Kind: SubscriptionKind + Display,
52    Subscription<Exchange, Instrument, Kind>:
53        Identifier<Exchange::Channel> + Identifier<Exchange::Market>,
54{
55    // Determine ExchangeId associated with these Subscriptions
56    let exchange = Exchange::ID;
57
58    // Determine StreamKey for use in logging
59    let stream_key = subscriptions
60        .first()
61        .map(|sub| StreamKey::new("market_stream", exchange, Some(sub.kind.as_str())))
62        .ok_or(DataError::SubscriptionsEmpty)?;
63
64    info!(
65        %exchange,
66        subscriptions = %display_subscriptions_without_exchange(&subscriptions),
67        ?policy,
68        ?stream_key,
69        "MarketStream with auto reconnect initialising"
70    );
71
72    Ok(init_reconnecting_stream(move || {
73        let subscriptions = subscriptions.clone();
74        async move { Exchange::Stream::init::<Exchange::SnapFetcher>(&subscriptions).await }
75    })
76    .await?
77    .with_reconnect_backoff(policy, stream_key)
78    .with_termination_on_error(|error| error.is_terminal(), stream_key)
79    .with_reconnection_events(exchange))
80}
81
82#[derive(
83    Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Deserialize, Serialize, Constructor,
84)]
85pub struct StreamKey<Kind = &'static str> {
86    pub stream: &'static str,
87    pub exchange: ExchangeId,
88    pub kind: Option<Kind>,
89}
90
91impl StreamKey {
92    pub fn new_general(stream: &'static str, exchange: ExchangeId) -> Self {
93        Self::new(stream, exchange, None)
94    }
95}
96
97impl std::fmt::Debug for StreamKey {
98    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
99        match self.kind {
100            None => write!(f, "{}-{}", self.stream, self.exchange),
101            Some(kind) => write!(f, "{}-{}-{}", self.stream, self.exchange, kind),
102        }
103    }
104}