Skip to main content

rustrade_data/streams/
consumer.rs

1use crate::{
2    Identifier, MarketStream,
3    error::DataError,
4    event::MarketEvent,
5    exchange::StreamSelector,
6    instrument::InstrumentData,
7    streams::{
8        reconnect,
9        reconnect::stream::{
10            ReconnectingStream, ReconnectionBackoffPolicy, init_reconnecting_stream,
11        },
12    },
13    subscription::{Subscription, SubscriptionKind, display_subscriptions_without_exchange},
14};
15use derive_more::Constructor;
16use futures::Stream;
17use rustrade_instrument::exchange::ExchangeId;
18use serde::{Deserialize, Serialize};
19use std::fmt::Display;
20use tracing::info;
21
22/// Default [`ReconnectionBackoffPolicy`] for a [`reconnecting`](`ReconnectingStream`) [`MarketStream`].
23pub const STREAM_RECONNECTION_POLICY: ReconnectionBackoffPolicy = ReconnectionBackoffPolicy {
24    backoff_ms_initial: 125,
25    backoff_multiplier: 2,
26    backoff_ms_max: 60000,
27};
28
29/// Convenient type alias for a [`MarketEvent`] [`Result`] consumed via a
30/// [`reconnecting`](`ReconnectingStream`) [`MarketStream`].
31pub type MarketStreamResult<InstrumentKey, Kind> =
32    reconnect::Event<ExchangeId, Result<MarketEvent<InstrumentKey, Kind>, DataError>>;
33
34/// Convenient type alias for a [`MarketEvent`] consumed via a
35/// [`reconnecting`](`ReconnectingStream`) [`MarketStream`].
36pub type MarketStreamEvent<InstrumentKey, Kind> =
37    reconnect::Event<ExchangeId, MarketEvent<InstrumentKey, Kind>>;
38
39/// Initialises a [`reconnecting`](`ReconnectingStream`) [`MarketStream`] using a collection of
40/// [`Subscription`]s.
41///
42/// The provided [`ReconnectionBackoffPolicy`] dictates how the exponential backoff scales
43/// between reconnections.
44///
45/// The `subscriber` is cloned into the reconnect closure, so authenticated subscribers
46/// will have their credentials available on reconnection.
47pub async fn init_market_stream<Exchange, Instrument, Kind>(
48    policy: ReconnectionBackoffPolicy,
49    subscriber: Exchange::Subscriber,
50    subscriptions: Vec<Subscription<Exchange, Instrument, Kind>>,
51) -> Result<impl Stream<Item = MarketStreamResult<Instrument::Key, Kind::Event>>, DataError>
52where
53    Exchange: StreamSelector<Instrument, Kind>,
54    Instrument: InstrumentData + Display,
55    Kind: SubscriptionKind + Display,
56    Subscription<Exchange, Instrument, Kind>:
57        Identifier<Exchange::Channel> + Identifier<Exchange::Market>,
58{
59    // Determine ExchangeId associated with these Subscriptions
60    let exchange = Exchange::ID;
61
62    // Determine StreamKey for use in logging
63    let stream_key = subscriptions
64        .first()
65        .map(|sub| StreamKey::new("market_stream", exchange, Some(sub.kind.as_str())))
66        .ok_or(DataError::SubscriptionsEmpty)?;
67
68    info!(
69        %exchange,
70        subscriptions = %display_subscriptions_without_exchange(&subscriptions),
71        ?policy,
72        ?stream_key,
73        "MarketStream with auto reconnect initialising"
74    );
75
76    Ok(
77        init_reconnecting_stream(move || {
78            let subscriber = subscriber.clone();
79            let subscriptions = subscriptions.clone();
80            async move {
81                Exchange::Stream::init::<Exchange::SnapFetcher>(&subscriber, &subscriptions).await
82            }
83        })
84        .await?
85        .with_reconnect_backoff(policy, stream_key)
86        .with_termination_on_error(|error| error.is_terminal(), stream_key)
87        .with_reconnection_events(exchange),
88    )
89}
90
91#[derive(
92    Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Deserialize, Serialize, Constructor,
93)]
94pub struct StreamKey<Kind = &'static str> {
95    pub stream: &'static str,
96    pub exchange: ExchangeId,
97    pub kind: Option<Kind>,
98}
99
100impl StreamKey {
101    pub fn new_general(stream: &'static str, exchange: ExchangeId) -> Self {
102        Self::new(stream, exchange, None)
103    }
104}
105
106impl std::fmt::Debug for StreamKey {
107    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
108        match self.kind {
109            None => write!(f, "{}-{}", self.stream, self.exchange),
110            Some(kind) => write!(f, "{}-{}-{}", self.stream, self.exchange, kind),
111        }
112    }
113}