nominal_streaming/
lib.rs

1pub mod client;
2pub mod consumer;
3pub mod notifier;
4pub mod stream;
5mod types;
6pub mod upload;
7
8/// This includes the most common types in this crate, re-exported for your convenience.
9pub mod prelude {
10    pub use conjure_object::BearerToken;
11    pub use conjure_object::ResourceIdentifier;
12    pub use nominal_api::tonic::google::protobuf::Timestamp;
13    pub use nominal_api::tonic::io::nominal::scout::api::proto::points::PointsType;
14    pub use nominal_api::tonic::io::nominal::scout::api::proto::DoublePoint;
15    pub use nominal_api::tonic::io::nominal::scout::api::proto::DoublePoints;
16    pub use nominal_api::tonic::io::nominal::scout::api::proto::IntegerPoint;
17    pub use nominal_api::tonic::io::nominal::scout::api::proto::IntegerPoints;
18    pub use nominal_api::tonic::io::nominal::scout::api::proto::StringPoint;
19    pub use nominal_api::tonic::io::nominal::scout::api::proto::StringPoints;
20    pub use nominal_api::tonic::io::nominal::scout::api::proto::WriteRequest;
21    pub use nominal_api::tonic::io::nominal::scout::api::proto::WriteRequestNominal;
22
23    pub use crate::consumer::NominalCoreConsumer;
24    pub use crate::stream::NominalDatasetStream;
25    pub use crate::stream::NominalDatasourceStream;
26    pub use crate::stream::NominalStreamOpts;
27    pub use crate::types::AuthProvider;
28    pub use crate::types::ChannelDescriptor;
29}
30
31#[cfg(test)]
32mod tests {
33    use std::collections::HashMap;
34    use std::sync::Arc;
35    use std::sync::Mutex;
36    use std::thread;
37    use std::time::Duration;
38    use std::time::UNIX_EPOCH;
39
40    use nominal_api::tonic::io::nominal::scout::api::proto::IntegerPoint;
41
42    use crate::client::PRODUCTION_API_URL;
43    use crate::consumer::ConsumerResult;
44    use crate::consumer::WriteRequestConsumer;
45    use crate::prelude::*;
46    use crate::types::IntoTimestamp;
47
48    #[derive(Debug)]
49    struct TestDatasourceStream {
50        requests: Mutex<Vec<WriteRequestNominal>>,
51    }
52
53    impl WriteRequestConsumer for Arc<TestDatasourceStream> {
54        fn consume(&self, request: &WriteRequestNominal) -> ConsumerResult<()> {
55            self.requests.lock().unwrap().push(request.clone());
56            Ok(())
57        }
58    }
59
60    fn create_test_stream() -> (Arc<TestDatasourceStream>, NominalDatasetStream) {
61        let test_consumer = Arc::new(TestDatasourceStream {
62            requests: Mutex::new(vec![]),
63        });
64        let stream = NominalDatasetStream::new_with_consumer(
65            test_consumer.clone(),
66            NominalStreamOpts {
67                max_points_per_record: 1000,
68                max_request_delay: Duration::from_millis(100),
69                max_buffered_requests: 2,
70                request_dispatcher_tasks: 4,
71                base_api_url: PRODUCTION_API_URL.to_string(),
72            },
73        );
74
75        (test_consumer, stream)
76    }
77
78    #[test]
79    fn test_stream() {
80        let (test_consumer, stream) = create_test_stream();
81
82        for batch in 0..5 {
83            let mut points = Vec::new();
84            for i in 0..1000 {
85                let start_time = UNIX_EPOCH.elapsed().unwrap();
86                points.push(DoublePoint {
87                    timestamp: Some(Timestamp {
88                        seconds: start_time.as_secs() as i64,
89                        nanos: start_time.subsec_nanos() as i32 + i,
90                    }),
91                    value: (i % 50) as f64,
92                });
93            }
94
95            stream.enqueue(
96                &ChannelDescriptor::with_tags("channel_1", [("batch_id", batch.to_string())]),
97                points,
98            );
99        }
100
101        drop(stream); // wait for points to flush
102
103        let requests = test_consumer.requests.lock().unwrap();
104
105        // validate that the requests were flushed based on the max_records value, not the
106        // max request delay
107        assert_eq!(requests.len(), 5);
108        let series = requests.first().unwrap().series.first().unwrap();
109        if let Some(PointsType::DoublePoints(points)) =
110            series.points.as_ref().unwrap().points_type.as_ref()
111        {
112            assert_eq!(points.points.len(), 1000);
113        } else {
114            panic!("unexpected data type");
115        }
116    }
117
118    #[test]
119    fn test_stream_types() {
120        let (test_consumer, stream) = create_test_stream();
121
122        for batch in 0..5 {
123            let mut doubles = Vec::new();
124            let mut strings = Vec::new();
125            let mut ints = Vec::new();
126            for i in 0..1000 {
127                let start_time = UNIX_EPOCH.elapsed().unwrap();
128                doubles.push(DoublePoint {
129                    timestamp: Some(start_time.into_timestamp()),
130                    value: (i % 50) as f64,
131                });
132                strings.push(StringPoint {
133                    timestamp: Some(start_time.into_timestamp()),
134                    value: format!("{}", i % 50),
135                });
136                ints.push(IntegerPoint {
137                    timestamp: Some(start_time.into_timestamp()),
138                    value: i % 50,
139                })
140            }
141
142            stream.enqueue(
143                &ChannelDescriptor::with_tags("double", [("batch_id", batch.to_string())]),
144                doubles,
145            );
146            stream.enqueue(
147                &ChannelDescriptor::with_tags("string", [("batch_id", batch.to_string())]),
148                strings,
149            );
150            stream.enqueue(
151                &ChannelDescriptor::with_tags("int", [("batch_id", batch.to_string())]),
152                ints,
153            );
154        }
155
156        drop(stream); // wait for points to flush
157
158        let requests = test_consumer.requests.lock().unwrap();
159
160        // validate that the requests were flushed based on the max_records value, not the
161        // max request delay
162        assert_eq!(requests.len(), 15);
163
164        let r = requests
165            .iter()
166            .flat_map(|r| r.series.clone())
167            .map(|s| {
168                (
169                    s.channel.unwrap().name,
170                    s.points.unwrap().points_type.unwrap(),
171                )
172            })
173            .collect::<HashMap<_, _>>();
174        let PointsType::DoublePoints(dp) = r.get("double").unwrap() else {
175            panic!("invalid double points type");
176        };
177
178        let PointsType::IntegerPoints(ip) = r.get("int").unwrap() else {
179            panic!("invalid int points type");
180        };
181
182        let PointsType::StringPoints(sp) = r.get("string").unwrap() else {
183            panic!("invalid string points type");
184        };
185
186        // collect() overwrites into a single request
187        assert_eq!(dp.points.len(), 1000);
188        assert_eq!(sp.points.len(), 1000);
189        assert_eq!(ip.points.len(), 1000);
190    }
191
192    #[test_log::test]
193    fn test_writer() {
194        let (test_consumer, stream) = create_test_stream();
195
196        let cd = ChannelDescriptor::new("channel_1");
197        let mut writer = stream.double_writer(&cd);
198
199        for i in 0..5000 {
200            let start_time = UNIX_EPOCH.elapsed().unwrap();
201            let value = i % 50;
202            writer.push(start_time, value as f64);
203        }
204
205        drop(writer); // flush points to stream
206        drop(stream); // flush stream to nominal
207
208        let requests = test_consumer.requests.lock().unwrap();
209
210        assert_eq!(requests.len(), 5);
211        let series = requests.first().unwrap().series.first().unwrap();
212        if let Some(PointsType::DoublePoints(points)) =
213            series.points.as_ref().unwrap().points_type.as_ref()
214        {
215            assert_eq!(points.points.len(), 1000);
216        } else {
217            panic!("unexpected data type");
218        }
219    }
220
221    #[test_log::test]
222    fn test_time_flush() {
223        let (test_consumer, stream) = create_test_stream();
224
225        let cd = ChannelDescriptor::new("channel_1");
226        let mut writer = stream.double_writer(&cd);
227
228        writer.push(UNIX_EPOCH.elapsed().unwrap(), 1.0);
229        thread::sleep(Duration::from_millis(101));
230        writer.push(UNIX_EPOCH.elapsed().unwrap(), 2.0); // first flush
231        thread::sleep(Duration::from_millis(101));
232        writer.push(UNIX_EPOCH.elapsed().unwrap(), 3.0); // second flush
233
234        drop(writer);
235        drop(stream);
236
237        let requests = test_consumer.requests.lock().unwrap();
238        dbg!(&requests);
239        assert_eq!(requests.len(), 2);
240    }
241
242    #[test_log::test]
243    fn test_writer_types() {
244        let (test_consumer, stream) = create_test_stream();
245
246        let cd1 = ChannelDescriptor::new("double");
247        let cd2 = ChannelDescriptor::new("string");
248        let cd3 = ChannelDescriptor::new("int");
249        let mut double_writer = stream.double_writer(&cd1);
250        let mut string_writer = stream.string_writer(&cd2);
251        let mut integer_writer = stream.integer_writer(&cd3);
252
253        for i in 0..5000 {
254            let start_time = UNIX_EPOCH.elapsed().unwrap();
255            let value = i % 50;
256            double_writer.push(start_time, value as f64);
257            string_writer.push(start_time, format!("{}", value));
258            integer_writer.push(start_time, value);
259        }
260
261        drop(double_writer);
262        drop(string_writer);
263        drop(integer_writer);
264        drop(stream);
265
266        let requests = test_consumer.requests.lock().unwrap();
267
268        assert_eq!(requests.len(), 15);
269
270        let r = requests
271            .iter()
272            .flat_map(|r| r.series.clone())
273            .map(|s| {
274                (
275                    s.channel.unwrap().name,
276                    s.points.unwrap().points_type.unwrap(),
277                )
278            })
279            .collect::<HashMap<_, _>>();
280
281        let PointsType::DoublePoints(dp) = r.get("double").unwrap() else {
282            panic!("invalid double points type");
283        };
284
285        let PointsType::IntegerPoints(ip) = r.get("int").unwrap() else {
286            panic!("invalid int points type");
287        };
288
289        let PointsType::StringPoints(sp) = r.get("string").unwrap() else {
290            panic!("invalid string points type");
291        };
292
293        // collect() overwrites into a single request
294        assert_eq!(dp.points.len(), 1000);
295        assert_eq!(sp.points.len(), 1000);
296        assert_eq!(ip.points.len(), 1000);
297    }
298}