rustrade_integration/stream/data.rs
1use chrono::{DateTime, Utc};
2use futures::Stream;
3use serde::{Deserialize, Serialize};
4
5/// Generic `DataStream`.
6///
7/// Defines how to initialise the `DataStream`, and what the stream contains.
8pub trait DataStream<Args> {
9 /// Stream::Item type yielded by the stream.
10 type Item;
11
12 /// Connection error type if initialisation fails.
13 type Error;
14
15 /// Initialise the `DataStream`.
16 fn init(
17 args: Args,
18 ) -> impl Future<Output = Result<impl Stream<Item = Self::Item> + Send, Self::Error>> + Send;
19}
20
21/// Configuration arguments for initialising a data stream.
22///
23/// This struct encapsulates all parameters required to initialise a data stream, including
24/// the streaming mode (live or historical), subscriptions, server configuration, and timeout settings.
25#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Deserialize, Serialize)]
26pub struct DataArgs<Mode, Subs, Config> {
27 /// The streaming mode (eg/ [`Live`] or [`Historical`]).
28 pub mode: Mode,
29
30 /// Subscriptions defining what data to stream.
31 pub subscriptions: Subs,
32
33 /// Configuration required for the `DataStream` source (eg/ credentials, urls, timeouts, etc.).
34 pub config: Config,
35}
36
37impl<Subs, Config> DataArgs<Live, Subs, Config> {
38 /// Construct [`DataArgs`] for a live data stream.
39 ///
40 /// # Arguments
41 /// * `subscriptions` - The subscriptions defining what data to stream
42 /// * `config` - Server-specific configuration for the stream connection
43 pub fn live(subscriptions: Subs, config: Config) -> Self {
44 Self {
45 mode: Live,
46 subscriptions,
47 config,
48 }
49 }
50}
51
52impl<Subs, Config> DataArgs<Historical, Subs, Config> {
53 /// Construct [`DataArgs`] for a historical data stream.
54 ///
55 /// # Arguments
56 /// * `historical` - Time range specification for the historical data
57 /// * `subscriptions` - The subscriptions defining what data to stream
58 /// * `config` - Server-specific configuration for the stream connection
59 pub fn historical(historical: Historical, subscriptions: Subs, config: Config) -> Self {
60 Self {
61 mode: historical,
62 subscriptions,
63 config,
64 }
65 }
66}
67
68/// Live [`DataStream`] kind.
69///
70/// Live `DataStream`s are real-time.
71#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Deserialize, Serialize)]
72pub struct Live;
73
74/// Historical [`DataStream`] kind.
75///
76/// Historical `DataStream`s replay past events within a specified time range.
77#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Deserialize, Serialize)]
78pub struct Historical {
79 /// Start timestamp of the historical data range.
80 pub start: DateTime<Utc>,
81
82 /// Optional end timestamp of the historical data range.
83 ///
84 /// If `None`, the stream continues until the present or until all available historical
85 /// data is consumed.
86 pub end: Option<DateTime<Utc>>,
87}
88
89#[cfg(test)]
90#[allow(clippy::unwrap_used)] // Test code: panics on bad input are acceptable
91mod tests {
92 use super::*;
93
94 #[test]
95 fn test_data_args_live() {
96 let args = DataArgs::live(vec!["sub1", "sub2"], "config");
97 assert_eq!(args.mode, Live);
98 assert_eq!(args.subscriptions, vec!["sub1", "sub2"]);
99 assert_eq!(args.config, "config");
100 }
101
102 #[test]
103 fn test_data_args_historical() {
104 let start = Utc::now();
105 let historical = Historical { start, end: None };
106 let args = DataArgs::historical(historical, vec!["sub"], "cfg");
107 assert_eq!(args.mode.start, start);
108 assert!(args.mode.end.is_none());
109 assert_eq!(args.subscriptions, vec!["sub"]);
110 }
111}