rustrade_data/streams/
consumer.rs1use crate::{
2 Identifier, MarketStream,
3 error::DataError,
4 event::MarketEvent,
5 exchange::StreamSelector,
6 instrument::InstrumentData,
7 streams::{
8 reconnect,
9 reconnect::stream::{
10 ReconnectingStream, ReconnectionBackoffPolicy, init_reconnecting_stream,
11 },
12 },
13 subscription::{Subscription, SubscriptionKind, display_subscriptions_without_exchange},
14};
15use derive_more::Constructor;
16use futures::Stream;
17use rustrade_instrument::exchange::ExchangeId;
18use serde::{Deserialize, Serialize};
19use std::fmt::Display;
20use tracing::info;
21
22pub const STREAM_RECONNECTION_POLICY: ReconnectionBackoffPolicy = ReconnectionBackoffPolicy {
24 backoff_ms_initial: 125,
25 backoff_multiplier: 2,
26 backoff_ms_max: 60000,
27};
28
29pub type MarketStreamResult<InstrumentKey, Kind> =
32 reconnect::Event<ExchangeId, Result<MarketEvent<InstrumentKey, Kind>, DataError>>;
33
34pub type MarketStreamEvent<InstrumentKey, Kind> =
37 reconnect::Event<ExchangeId, MarketEvent<InstrumentKey, Kind>>;
38
39pub async fn init_market_stream<Exchange, Instrument, Kind>(
48 policy: ReconnectionBackoffPolicy,
49 subscriber: Exchange::Subscriber,
50 subscriptions: Vec<Subscription<Exchange, Instrument, Kind>>,
51) -> Result<impl Stream<Item = MarketStreamResult<Instrument::Key, Kind::Event>>, DataError>
52where
53 Exchange: StreamSelector<Instrument, Kind>,
54 Instrument: InstrumentData + Display,
55 Kind: SubscriptionKind + Display,
56 Subscription<Exchange, Instrument, Kind>:
57 Identifier<Exchange::Channel> + Identifier<Exchange::Market>,
58{
59 let exchange = Exchange::ID;
61
62 let stream_key = subscriptions
64 .first()
65 .map(|sub| StreamKey::new("market_stream", exchange, Some(sub.kind.as_str())))
66 .ok_or(DataError::SubscriptionsEmpty)?;
67
68 info!(
69 %exchange,
70 subscriptions = %display_subscriptions_without_exchange(&subscriptions),
71 ?policy,
72 ?stream_key,
73 "MarketStream with auto reconnect initialising"
74 );
75
76 Ok(
77 init_reconnecting_stream(move || {
78 let subscriber = subscriber.clone();
79 let subscriptions = subscriptions.clone();
80 async move {
81 Exchange::Stream::init::<Exchange::SnapFetcher>(&subscriber, &subscriptions).await
82 }
83 })
84 .await?
85 .with_reconnect_backoff(policy, stream_key)
86 .with_termination_on_error(|error| error.is_terminal(), stream_key)
87 .with_reconnection_events(exchange),
88 )
89}
90
91#[derive(
92 Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Deserialize, Serialize, Constructor,
93)]
94pub struct StreamKey<Kind = &'static str> {
95 pub stream: &'static str,
96 pub exchange: ExchangeId,
97 pub kind: Option<Kind>,
98}
99
100impl StreamKey {
101 pub fn new_general(stream: &'static str, exchange: ExchangeId) -> Self {
102 Self::new(stream, exchange, None)
103 }
104}
105
106impl std::fmt::Debug for StreamKey {
107 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
108 match self.kind {
109 None => write!(f, "{}-{}", self.stream, self.exchange),
110 Some(kind) => write!(f, "{}-{}-{}", self.stream, self.exchange, kind),
111 }
112 }
113}