barter_data/streams/
mod.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
use self::builder::{multi::MultiStreamBuilder, StreamBuilder};
use crate::{exchange::ExchangeId, subscription::SubscriptionKind};
use fnv::FnvHashMap;
use futures::Stream;
use futures_util::stream::select_all;
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;

/// Defines the [`StreamBuilder`] and [`MultiStreamBuilder`] APIs for ergonomically initialising
/// [`MarketStream`](super::MarketStream) [`Streams`].
pub mod builder;

/// Central consumer loop functionality used by the [`StreamBuilder`] to
/// drive a re-connecting [`MarketStream`](super::MarketStream).
pub mod consumer;

/// Defines a [`ReconnectingStream`] and associated logic for generating an auto reconnecting
/// `Stream`.
pub mod reconnect;

/// Ergonomic collection of exchange [`MarketEvent<T>`](crate::event::MarketEvent) receivers.
#[derive(Debug)]
pub struct Streams<T> {
    pub streams: FnvHashMap<ExchangeId, mpsc::UnboundedReceiver<T>>,
}

impl<T> Streams<T> {
    /// Construct a [`StreamBuilder`] for configuring new
    /// [`MarketEvent<SubscriptionKind::Event>`](crate::event::MarketEvent) [`Streams`].
    pub fn builder<InstrumentKey, Kind>() -> StreamBuilder<InstrumentKey, Kind>
    where
        Kind: SubscriptionKind,
    {
        StreamBuilder::<InstrumentKey, Kind>::new()
    }

    /// Construct a [`MultiStreamBuilder`] for configuring new
    /// [`MarketEvent<T>`](crate::event::MarketEvent) [`Streams`].
    pub fn builder_multi() -> MultiStreamBuilder<T> {
        MultiStreamBuilder::<T>::new()
    }

    /// Remove an exchange [`mpsc::UnboundedReceiver`] from the [`Streams`] `HashMap`.
    pub fn select(&mut self, exchange: ExchangeId) -> Option<impl Stream<Item = T>> {
        self.streams
            .remove(&exchange)
            .map(UnboundedReceiverStream::new)
    }

    /// Select and merge every exchange `Stream` using [`select_all`].
    pub fn select_all(self) -> impl Stream<Item = T> {
        let all = self.streams.into_values().map(UnboundedReceiverStream::new);

        select_all(all)
    }
}