nominal_streaming/
lib.rs

1pub mod client;
2pub mod consumer;
3pub mod notifier;
4pub mod stream;
5mod types;
6
7/// This includes the most common types in this crate, re-exported for your convenience.
8pub mod prelude {
9    pub use conjure_object::BearerToken;
10    pub use conjure_object::ResourceIdentifier;
11    pub use nominal_api::tonic::google::protobuf::Timestamp;
12    pub use nominal_api::tonic::io::nominal::scout::api::proto::points::PointsType;
13    pub use nominal_api::tonic::io::nominal::scout::api::proto::DoublePoint;
14    pub use nominal_api::tonic::io::nominal::scout::api::proto::DoublePoints;
15    pub use nominal_api::tonic::io::nominal::scout::api::proto::StringPoint;
16    pub use nominal_api::tonic::io::nominal::scout::api::proto::StringPoints;
17    pub use nominal_api::tonic::io::nominal::scout::api::proto::WriteRequest;
18    pub use nominal_api::tonic::io::nominal::scout::api::proto::WriteRequestNominal;
19
20    pub use crate::client::PRODUCTION_STREAMING_CLIENT;
21    pub use crate::client::STAGING_STREAMING_CLIENT;
22    pub use crate::consumer::NominalCoreConsumer;
23    pub use crate::stream::NominalDatasetStream;
24    pub use crate::stream::NominalDatasourceStream;
25    pub use crate::stream::NominalStreamOpts;
26    pub use crate::types::ChannelDescriptor;
27}
28
29#[cfg(test)]
30mod tests {
31    use std::collections::HashMap;
32    use std::sync::Arc;
33    use std::sync::Mutex;
34    use std::thread;
35    use std::time::Duration;
36    use std::time::UNIX_EPOCH;
37
38    use nominal_api::tonic::io::nominal::scout::api::proto::IntegerPoint;
39
40    use crate::consumer::ConsumerResult;
41    use crate::consumer::WriteRequestConsumer;
42    use crate::prelude::*;
43    use crate::types::IntoTimestamp;
44
45    #[derive(Debug)]
46    struct TestDatasourceStream {
47        requests: Mutex<Vec<WriteRequestNominal>>,
48    }
49
50    impl WriteRequestConsumer for Arc<TestDatasourceStream> {
51        fn consume(&self, request: &WriteRequestNominal) -> ConsumerResult<()> {
52            self.requests.lock().unwrap().push(request.clone());
53            Ok(())
54        }
55    }
56
57    fn create_test_stream() -> (Arc<TestDatasourceStream>, NominalDatasetStream) {
58        let test_consumer = Arc::new(TestDatasourceStream {
59            requests: Mutex::new(vec![]),
60        });
61        let stream = NominalDatasetStream::new_with_consumer(
62            test_consumer.clone(),
63            NominalStreamOpts {
64                max_points_per_record: 1000,
65                max_request_delay: Duration::from_millis(100),
66                max_buffered_requests: 2,
67                request_dispatcher_tasks: 4,
68            },
69        );
70
71        (test_consumer, stream)
72    }
73
74    #[test]
75    fn test_stream() {
76        let (test_consumer, stream) = create_test_stream();
77
78        for batch in 0..5 {
79            let mut points = Vec::new();
80            for i in 0..1000 {
81                let start_time = UNIX_EPOCH.elapsed().unwrap();
82                points.push(DoublePoint {
83                    timestamp: Some(Timestamp {
84                        seconds: start_time.as_secs() as i64,
85                        nanos: start_time.subsec_nanos() as i32 + i,
86                    }),
87                    value: (i % 50) as f64,
88                });
89            }
90
91            stream.enqueue(
92                &ChannelDescriptor::with_tags("channel_1", [("batch_id", batch.to_string())]),
93                points,
94            );
95        }
96
97        drop(stream); // wait for points to flush
98
99        let requests = test_consumer.requests.lock().unwrap();
100
101        // validate that the requests were flushed based on the max_records value, not the
102        // max request delay
103        assert_eq!(requests.len(), 5);
104        let series = requests.first().unwrap().series.first().unwrap();
105        if let Some(PointsType::DoublePoints(points)) =
106            series.points.as_ref().unwrap().points_type.as_ref()
107        {
108            assert_eq!(points.points.len(), 1000);
109        } else {
110            panic!("unexpected data type");
111        }
112    }
113
114    #[test]
115    fn test_stream_types() {
116        let (test_consumer, stream) = create_test_stream();
117
118        for batch in 0..5 {
119            let mut doubles = Vec::new();
120            let mut strings = Vec::new();
121            let mut ints = Vec::new();
122            for i in 0..1000 {
123                let start_time = UNIX_EPOCH.elapsed().unwrap();
124                doubles.push(DoublePoint {
125                    timestamp: Some(start_time.into_timestamp()),
126                    value: (i % 50) as f64,
127                });
128                strings.push(StringPoint {
129                    timestamp: Some(start_time.into_timestamp()),
130                    value: format!("{}", i % 50),
131                });
132                ints.push(IntegerPoint {
133                    timestamp: Some(start_time.into_timestamp()),
134                    value: i % 50,
135                })
136            }
137
138            stream.enqueue(
139                &ChannelDescriptor::with_tags("double", [("batch_id", batch.to_string())]),
140                doubles,
141            );
142            stream.enqueue(
143                &ChannelDescriptor::with_tags("string", [("batch_id", batch.to_string())]),
144                strings,
145            );
146            stream.enqueue(
147                &ChannelDescriptor::with_tags("int", [("batch_id", batch.to_string())]),
148                ints,
149            );
150        }
151
152        drop(stream); // wait for points to flush
153
154        let requests = test_consumer.requests.lock().unwrap();
155
156        // validate that the requests were flushed based on the max_records value, not the
157        // max request delay
158        assert_eq!(requests.len(), 15);
159
160        let r = requests
161            .iter()
162            .flat_map(|r| r.series.clone())
163            .map(|s| {
164                (
165                    s.channel.unwrap().name,
166                    s.points.unwrap().points_type.unwrap(),
167                )
168            })
169            .collect::<HashMap<_, _>>();
170        let PointsType::DoublePoints(dp) = r.get("double").unwrap() else {
171            panic!("invalid double points type");
172        };
173
174        let PointsType::IntegerPoints(ip) = r.get("int").unwrap() else {
175            panic!("invalid int points type");
176        };
177
178        let PointsType::StringPoints(sp) = r.get("string").unwrap() else {
179            panic!("invalid string points type");
180        };
181
182        // collect() overwrites into a single request
183        assert_eq!(dp.points.len(), 1000);
184        assert_eq!(sp.points.len(), 1000);
185        assert_eq!(ip.points.len(), 1000);
186    }
187
188    #[test_log::test]
189    fn test_writer() {
190        let (test_consumer, stream) = create_test_stream();
191
192        let cd = ChannelDescriptor::new("channel_1");
193        let mut writer = stream.double_writer(&cd);
194
195        for i in 0..5000 {
196            let start_time = UNIX_EPOCH.elapsed().unwrap();
197            let value = i % 50;
198            writer.push(start_time, value as f64);
199        }
200
201        drop(writer); // flush points to stream
202        drop(stream); // flush stream to nominal
203
204        let requests = test_consumer.requests.lock().unwrap();
205
206        assert_eq!(requests.len(), 5);
207        let series = requests.first().unwrap().series.first().unwrap();
208        if let Some(PointsType::DoublePoints(points)) =
209            series.points.as_ref().unwrap().points_type.as_ref()
210        {
211            assert_eq!(points.points.len(), 1000);
212        } else {
213            panic!("unexpected data type");
214        }
215    }
216
217    #[test_log::test]
218    fn test_time_flush() {
219        let (test_consumer, stream) = create_test_stream();
220
221        let cd = ChannelDescriptor::new("channel_1");
222        let mut writer = stream.double_writer(&cd);
223
224        writer.push(UNIX_EPOCH.elapsed().unwrap(), 1.0);
225        thread::sleep(Duration::from_millis(101));
226        writer.push(UNIX_EPOCH.elapsed().unwrap(), 2.0); // first flush
227        thread::sleep(Duration::from_millis(101));
228        writer.push(UNIX_EPOCH.elapsed().unwrap(), 3.0); // second flush
229
230        drop(writer);
231        drop(stream);
232
233        let requests = test_consumer.requests.lock().unwrap();
234        dbg!(&requests);
235        assert_eq!(requests.len(), 2);
236    }
237
238    #[test_log::test]
239    fn test_writer_types() {
240        let (test_consumer, stream) = create_test_stream();
241
242        let cd1 = ChannelDescriptor::new("double");
243        let cd2 = ChannelDescriptor::new("string");
244        let cd3 = ChannelDescriptor::new("int");
245        let mut double_writer = stream.double_writer(&cd1);
246        let mut string_writer = stream.string_writer(&cd2);
247        let mut integer_writer = stream.integer_writer(&cd3);
248
249        for i in 0..5000 {
250            let start_time = UNIX_EPOCH.elapsed().unwrap();
251            let value = i % 50;
252            double_writer.push(start_time, value as f64);
253            string_writer.push(start_time, format!("{}", value));
254            integer_writer.push(start_time, value);
255        }
256
257        drop(double_writer);
258        drop(string_writer);
259        drop(integer_writer);
260        drop(stream);
261
262        let requests = test_consumer.requests.lock().unwrap();
263
264        assert_eq!(requests.len(), 15);
265
266        let r = requests
267            .iter()
268            .flat_map(|r| r.series.clone())
269            .map(|s| {
270                (
271                    s.channel.unwrap().name,
272                    s.points.unwrap().points_type.unwrap(),
273                )
274            })
275            .collect::<HashMap<_, _>>();
276
277        let PointsType::DoublePoints(dp) = r.get("double").unwrap() else {
278            panic!("invalid double points type");
279        };
280
281        let PointsType::IntegerPoints(ip) = r.get("int").unwrap() else {
282            panic!("invalid int points type");
283        };
284
285        let PointsType::StringPoints(sp) = r.get("string").unwrap() else {
286            panic!("invalid string points type");
287        };
288
289        // collect() overwrites into a single request
290        assert_eq!(dp.points.len(), 1000);
291        assert_eq!(sp.points.len(), 1000);
292        assert_eq!(ip.points.len(), 1000);
293    }
294}