// nominal_streaming/lib.rs

1pub mod client;
2pub mod consumer;
3pub mod notifier;
4pub mod stream;
5mod types;
6pub mod upload;
7
8/// This includes the most common types in this crate, re-exported for your convenience.
9pub mod prelude {
10    pub use conjure_object::BearerToken;
11    pub use conjure_object::ResourceIdentifier;
12    pub use nominal_api::tonic::google::protobuf::Timestamp;
13    pub use nominal_api::tonic::io::nominal::scout::api::proto::points::PointsType;
14    pub use nominal_api::tonic::io::nominal::scout::api::proto::DoublePoint;
15    pub use nominal_api::tonic::io::nominal::scout::api::proto::DoublePoints;
16    pub use nominal_api::tonic::io::nominal::scout::api::proto::IntegerPoint;
17    pub use nominal_api::tonic::io::nominal::scout::api::proto::IntegerPoints;
18    pub use nominal_api::tonic::io::nominal::scout::api::proto::StringPoint;
19    pub use nominal_api::tonic::io::nominal::scout::api::proto::StringPoints;
20    pub use nominal_api::tonic::io::nominal::scout::api::proto::WriteRequest;
21    pub use nominal_api::tonic::io::nominal::scout::api::proto::WriteRequestNominal;
22
23    pub use crate::consumer::NominalCoreConsumer;
24    pub use crate::stream::NominalDatasetStream;
25    pub use crate::stream::NominalDatasourceStream;
26    pub use crate::stream::NominalStreamOpts;
27    pub use crate::types::ChannelDescriptor;
28}
29
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;
    use std::sync::Mutex;
    use std::thread;
    use std::time::Duration;
    use std::time::UNIX_EPOCH;

    use nominal_api::tonic::io::nominal::scout::api::proto::IntegerPoint;

    use crate::client::PRODUCTION_API_URL;
    use crate::consumer::ConsumerResult;
    use crate::consumer::WriteRequestConsumer;
    use crate::prelude::*;
    use crate::types::IntoTimestamp;

    /// In-memory consumer that records every request handed to it so that tests can inspect
    /// what the stream produced.
    #[derive(Debug)]
    struct TestDatasourceStream {
        requests: Mutex<Vec<WriteRequestNominal>>,
    }

    impl WriteRequestConsumer for Arc<TestDatasourceStream> {
        /// Stores a clone of `request` in the shared list; always returns `Ok(())`.
        fn consume(&self, request: &WriteRequestNominal) -> ConsumerResult<()> {
            self.requests.lock().unwrap().push(request.clone());
            Ok(())
        }
    }

    /// Builds a stream wired to a fresh recording consumer.
    ///
    /// Returns the consumer handle (to read back the recorded requests) together with the
    /// stream. The options used here (1000 points per record, 100 ms max request delay) are
    /// what the assertions in the tests below rely on.
    fn create_test_stream() -> (Arc<TestDatasourceStream>, NominalDatasetStream) {
        let test_consumer = Arc::new(TestDatasourceStream {
            requests: Mutex::new(vec![]),
        });
        let stream = NominalDatasetStream::new_with_consumer(
            test_consumer.clone(),
            NominalStreamOpts {
                max_points_per_record: 1000,
                max_request_delay: Duration::from_millis(100),
                max_buffered_requests: 2,
                request_dispatcher_tasks: 4,
                base_api_url: PRODUCTION_API_URL.to_string(),
            },
        );

        (test_consumer, stream)
    }

    /// Asserts that the first series of the first recorded request carries exactly
    /// `expected_len` double points.
    ///
    /// Panics if there is no request or series, or if the series is not of the double type.
    #[track_caller]
    fn assert_first_series_double_len(requests: &[WriteRequestNominal], expected_len: usize) {
        let series = requests.first().unwrap().series.first().unwrap();
        let Some(PointsType::DoublePoints(points)) =
            series.points.as_ref().unwrap().points_type.as_ref()
        else {
            panic!("unexpected data type");
        };
        assert_eq!(points.points.len(), expected_len);
    }

    /// Asserts that the "double", "string" and "int" channels each carry the matching point
    /// type with exactly `expected_len` points.
    ///
    /// Series are keyed by channel name, so when several series share a name only the last
    /// one collected is checked.
    #[track_caller]
    fn assert_typed_channel_lens(requests: &[WriteRequestNominal], expected_len: usize) {
        let by_channel = requests
            .iter()
            .flat_map(|request| request.series.clone())
            .map(|series| {
                (
                    series.channel.unwrap().name,
                    series.points.unwrap().points_type.unwrap(),
                )
            })
            .collect::<HashMap<_, _>>();

        let PointsType::DoublePoints(doubles) = by_channel.get("double").unwrap() else {
            panic!("invalid double points type");
        };

        let PointsType::IntegerPoints(ints) = by_channel.get("int").unwrap() else {
            panic!("invalid int points type");
        };

        let PointsType::StringPoints(strings) = by_channel.get("string").unwrap() else {
            panic!("invalid string points type");
        };

        // collect() overwrites into a single request
        assert_eq!(doubles.points.len(), expected_len);
        assert_eq!(strings.points.len(), expected_len);
        assert_eq!(ints.points.len(), expected_len);
    }

    /// Enqueues five 1000-point batches on differently tagged channels and expects five
    /// requests of 1000 points each.
    #[test]
    fn test_stream() {
        let (test_consumer, stream) = create_test_stream();

        for batch in 0..5 {
            let mut points = Vec::with_capacity(1000);
            for i in 0..1000u64 {
                // Offset each point by `i` nanoseconds on the `Duration` itself so that a
                // carry past 999_999_999 ns rolls into the seconds field instead of producing
                // an out-of-range `nanos` value.
                let point_time = UNIX_EPOCH.elapsed().unwrap() + Duration::from_nanos(i);
                points.push(DoublePoint {
                    timestamp: Some(Timestamp {
                        // Lossless for any realistic wall-clock time; `subsec_nanos` is
                        // always below 1_000_000_000 and therefore fits in an `i32`.
                        seconds: point_time.as_secs() as i64,
                        nanos: point_time.subsec_nanos() as i32,
                    }),
                    value: (i % 50) as f64,
                });
            }

            stream.enqueue(
                &ChannelDescriptor::with_tags("channel_1", [("batch_id", batch.to_string())]),
                points,
            );
        }

        drop(stream); // wait for points to flush

        let requests = test_consumer.requests.lock().unwrap();

        // validate that the requests were flushed based on the `max_points_per_record` value,
        // not the max request delay
        assert_eq!(requests.len(), 5);
        assert_first_series_double_len(&requests, 1000);
    }

    /// Same as `test_stream`, but enqueues double, string and integer batches side by side and
    /// expects one request per type per batch.
    #[test]
    fn test_stream_types() {
        let (test_consumer, stream) = create_test_stream();

        for batch in 0..5 {
            let mut doubles = Vec::with_capacity(1000);
            let mut strings = Vec::with_capacity(1000);
            let mut ints = Vec::with_capacity(1000);
            for i in 0..1000 {
                let start_time = UNIX_EPOCH.elapsed().unwrap();
                doubles.push(DoublePoint {
                    timestamp: Some(start_time.into_timestamp()),
                    value: (i % 50) as f64,
                });
                strings.push(StringPoint {
                    timestamp: Some(start_time.into_timestamp()),
                    value: (i % 50).to_string(),
                });
                ints.push(IntegerPoint {
                    timestamp: Some(start_time.into_timestamp()),
                    value: i % 50,
                });
            }

            stream.enqueue(
                &ChannelDescriptor::with_tags("double", [("batch_id", batch.to_string())]),
                doubles,
            );
            stream.enqueue(
                &ChannelDescriptor::with_tags("string", [("batch_id", batch.to_string())]),
                strings,
            );
            stream.enqueue(
                &ChannelDescriptor::with_tags("int", [("batch_id", batch.to_string())]),
                ints,
            );
        }

        drop(stream); // wait for points to flush

        let requests = test_consumer.requests.lock().unwrap();

        // validate that the requests were flushed based on the `max_points_per_record` value,
        // not the max request delay
        assert_eq!(requests.len(), 15);
        assert_typed_channel_lens(&requests, 1000);
    }

    /// Pushes 5000 points through a double writer and expects five requests of 1000 points.
    #[test_log::test]
    fn test_writer() {
        let (test_consumer, stream) = create_test_stream();

        let cd = ChannelDescriptor::new("channel_1");
        let mut writer = stream.double_writer(&cd);

        for i in 0..5000 {
            let start_time = UNIX_EPOCH.elapsed().unwrap();
            let value = i % 50;
            writer.push(start_time, value as f64);
        }

        drop(writer); // flush points to stream
        drop(stream); // flush stream to nominal

        let requests = test_consumer.requests.lock().unwrap();

        assert_eq!(requests.len(), 5);
        assert_first_series_double_len(&requests, 1000);
    }

    /// Pushes three points separated by sleeps just longer than the configured 100 ms
    /// `max_request_delay` and expects exactly two requests.
    #[test_log::test]
    fn test_time_flush() {
        let (test_consumer, stream) = create_test_stream();

        let cd = ChannelDescriptor::new("channel_1");
        let mut writer = stream.double_writer(&cd);

        writer.push(UNIX_EPOCH.elapsed().unwrap(), 1.0);
        thread::sleep(Duration::from_millis(101));
        writer.push(UNIX_EPOCH.elapsed().unwrap(), 2.0); // first flush
        thread::sleep(Duration::from_millis(101));
        writer.push(UNIX_EPOCH.elapsed().unwrap(), 3.0); // second flush

        drop(writer);
        drop(stream);

        let requests = test_consumer.requests.lock().unwrap();
        // Dump the recorded requests only when the count is wrong, instead of on every run.
        assert_eq!(requests.len(), 2, "recorded requests: {requests:#?}");
    }

    /// Pushes 5000 points through each of a double, string and integer writer and expects
    /// fifteen requests, with 1000 points per channel in the last series seen for it.
    #[test_log::test]
    fn test_writer_types() {
        let (test_consumer, stream) = create_test_stream();

        let cd1 = ChannelDescriptor::new("double");
        let cd2 = ChannelDescriptor::new("string");
        let cd3 = ChannelDescriptor::new("int");
        let mut double_writer = stream.double_writer(&cd1);
        let mut string_writer = stream.string_writer(&cd2);
        let mut integer_writer = stream.integer_writer(&cd3);

        for i in 0..5000 {
            let start_time = UNIX_EPOCH.elapsed().unwrap();
            let value = i % 50;
            double_writer.push(start_time, value as f64);
            string_writer.push(start_time, value.to_string());
            integer_writer.push(start_time, value);
        }

        drop(double_writer);
        drop(string_writer);
        drop(integer_writer);
        drop(stream);

        let requests = test_consumer.requests.lock().unwrap();

        assert_eq!(requests.len(), 15);
        assert_typed_channel_lens(&requests, 1000);
    }
}