Skip to main content

rustrade_integration/stream/
data.rs

1use chrono::{DateTime, Utc};
2use futures::Stream;
3use serde::{Deserialize, Serialize};
4
5/// Generic `DataStream`.
6///
7/// Defines how to initialise the `DataStream`, and what the stream contains.
8pub trait DataStream<Args> {
9    /// Stream::Item type yielded by the stream.
10    type Item;
11
12    /// Connection error type if initialisation fails.
13    type Error;
14
15    /// Initialise the `DataStream`.
16    fn init(
17        args: Args,
18    ) -> impl Future<Output = Result<impl Stream<Item = Self::Item> + Send, Self::Error>> + Send;
19}
20
21/// Configuration arguments for initialising a data stream.
22///
23/// This struct encapsulates all parameters required to initialise a data stream, including
24/// the streaming mode (live or historical), subscriptions, server configuration, and timeout settings.
25#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Deserialize, Serialize)]
26pub struct DataArgs<Mode, Subs, Config> {
27    /// The streaming mode (eg/ [`Live`] or [`Historical`]).
28    pub mode: Mode,
29
30    /// Subscriptions defining what data to stream.
31    pub subscriptions: Subs,
32
33    /// Configuration required for the `DataStream` source (eg/ credentials, urls, timeouts, etc.).
34    pub config: Config,
35}
36
37impl<Subs, Config> DataArgs<Live, Subs, Config> {
38    /// Construct [`DataArgs`] for a live data stream.
39    ///
40    /// # Arguments
41    /// * `subscriptions` - The subscriptions defining what data to stream
42    /// * `config` - Server-specific configuration for the stream connection
43    pub fn live(subscriptions: Subs, config: Config) -> Self {
44        Self {
45            mode: Live,
46            subscriptions,
47            config,
48        }
49    }
50}
51
52impl<Subs, Config> DataArgs<Historical, Subs, Config> {
53    /// Construct [`DataArgs`] for a historical data stream.
54    ///
55    /// # Arguments
56    /// * `historical` - Time range specification for the historical data
57    /// * `subscriptions` - The subscriptions defining what data to stream
58    /// * `config` - Server-specific configuration for the stream connection
59    pub fn historical(historical: Historical, subscriptions: Subs, config: Config) -> Self {
60        Self {
61            mode: historical,
62            subscriptions,
63            config,
64        }
65    }
66}
67
68/// Live [`DataStream`] kind.
69///
70/// Live `DataStream`s are real-time.
71#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Deserialize, Serialize)]
72pub struct Live;
73
74/// Historical [`DataStream`] kind.
75///
76/// Historical `DataStream`s replay past events within a specified time range.
77#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Deserialize, Serialize)]
78pub struct Historical {
79    /// Start timestamp of the historical data range.
80    pub start: DateTime<Utc>,
81
82    /// Optional end timestamp of the historical data range.
83    ///
84    /// If `None`, the stream continues until the present or until all available historical
85    /// data is consumed.
86    pub end: Option<DateTime<Utc>>,
87}
88
#[cfg(test)]
#[allow(clippy::unwrap_used)] // Test code: panics on bad input are acceptable
mod tests {
    use super::*;

    /// `DataArgs::live` tags the arguments with `Live` and keeps both inputs intact.
    #[test]
    fn test_data_args_live() {
        let args = DataArgs::live(vec!["sub1", "sub2"], "config");
        assert_eq!(args.mode, Live);
        assert_eq!(args.subscriptions, vec!["sub1", "sub2"]);
        assert_eq!(args.config, "config");
    }

    /// `DataArgs::historical` stores the open-ended range and keeps both inputs intact.
    #[test]
    fn test_data_args_historical() {
        let start = Utc::now();
        let historical = Historical { start, end: None };
        let args = DataArgs::historical(historical, vec!["sub"], "cfg");
        // `Historical` is `Copy`, so the original value is still usable for comparison.
        assert_eq!(args.mode, historical);
        assert_eq!(args.mode.start, start);
        assert!(args.mode.end.is_none());
        assert_eq!(args.subscriptions, vec!["sub"]);
        // Previously unchecked: a constructor that dropped or swapped `config` would pass.
        assert_eq!(args.config, "cfg");
    }

    /// `DataArgs::historical` preserves an explicit end timestamp.
    #[test]
    fn test_data_args_historical_bounded() {
        let start = Utc::now();
        // Reusing `start` as the end keeps the test independent of any duration API.
        let historical = Historical {
            start,
            end: Some(start),
        };
        let args = DataArgs::historical(historical, vec!["sub"], "cfg");
        assert_eq!(args.mode.start, start);
        assert_eq!(args.mode.end, Some(start));
        assert_eq!(args.subscriptions, vec!["sub"]);
        assert_eq!(args.config, "cfg");
    }
}