nominal_streaming/
lib.rs

1pub mod client;
2pub mod consumer;
3pub mod notifier;
4pub mod stream;
5
6pub use stream::NominalDatasourceStream;
7
8pub mod api {
9    pub use conjure_object::BearerToken;
10    pub use conjure_object::ResourceIdentifier;
11    pub use nominal_api::tonic::google::protobuf::Timestamp;
12    pub use nominal_api::tonic::io::nominal::scout::api::proto::points::PointsType;
13    pub use nominal_api::tonic::io::nominal::scout::api::proto::DoublePoint;
14    pub use nominal_api::tonic::io::nominal::scout::api::proto::DoublePoints;
15    pub use nominal_api::tonic::io::nominal::scout::api::proto::StringPoint;
16    pub use nominal_api::tonic::io::nominal::scout::api::proto::StringPoints;
17    pub use nominal_api::tonic::io::nominal::scout::api::proto::WriteRequest;
18    pub use nominal_api::tonic::io::nominal::scout::api::proto::WriteRequestNominal;
19}
20
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Mutex;
    use std::time::UNIX_EPOCH;

    use crate::api::*;
    use crate::consumer::ConsumerResult;
    use crate::consumer::WriteRequestConsumer;
    use crate::stream::NominalStreamOpts;
    use crate::NominalDatasourceStream;

    /// Test double that records every `WriteRequestNominal` it is asked to
    /// consume, so the test can inspect what the stream dispatched.
    #[derive(Debug)]
    struct TestDatasourceStream {
        /// Requests received so far, in the order `consume` was called.
        requests: Mutex<Vec<WriteRequestNominal>>,
    }

    // Implemented on a shared reference so the test can hand the consumer to
    // the stream and still read `requests` from the same value afterwards.
    impl WriteRequestConsumer for &TestDatasourceStream {
        /// Stores a clone of `request` and always reports success.
        fn consume(&self, request: &WriteRequestNominal) -> ConsumerResult<()> {
            self.requests.lock().unwrap().push(request.clone());
            Ok(())
        }
    }

    /// Enqueues five batches of 1000 double points and checks that the
    /// consumer received five requests, the first of which carries 1000
    /// double points.
    #[test]
    fn test_stream() {
        // Leak the consumer so a `'static` shared reference can be given to
        // the stream while the test keeps its own copy of that reference.
        let test_consumer: &'static TestDatasourceStream =
            Box::leak(Box::new(TestDatasourceStream {
                requests: Mutex::new(vec![]),
            }));
        let stream = NominalDatasourceStream::new_with_consumer(
            test_consumer,
            NominalStreamOpts {
                max_points_per_record: 1000,
                max_request_delay: Default::default(),
                max_buffered_requests: 2,
                request_dispatcher_tasks: 4,
            },
        );

        for batch in 0..5 {
            let points: Vec<DoublePoint> = (0..1000u32)
                .map(|i| {
                    // Add the per-point offset as a `Duration` so that a
                    // nanosecond carry rolls over into `seconds`; adding `i`
                    // directly to `subsec_nanos()` could push `nanos` past
                    // 999_999_999, which is outside the valid Timestamp range.
                    let point_time = UNIX_EPOCH
                        .elapsed()
                        .expect("system clock is set before the Unix epoch")
                        + std::time::Duration::from_nanos(u64::from(i));
                    DoublePoint {
                        timestamp: Some(Timestamp {
                            seconds: i64::try_from(point_time.as_secs())
                                .expect("seconds since the Unix epoch fit in i64"),
                            nanos: i32::try_from(point_time.subsec_nanos())
                                .expect("sub-second nanos are below 1e9 and fit in i32"),
                        }),
                        value: f64::from(i % 50),
                    }
                })
                .collect();

            stream.enqueue(
                "channel_1".into(),
                HashMap::from_iter([("batch_id".to_string(), batch.to_string())]),
                points,
            );
        }

        drop(stream); // wait for points to flush

        let requests = test_consumer.requests.lock().unwrap();

        assert_eq!(requests.len(), 5);
        let series = requests.first().unwrap().series.first().unwrap();
        match series.points.as_ref().unwrap().points_type.as_ref() {
            Some(PointsType::DoublePoints(points)) => {
                assert_eq!(points.points.len(), 1000);
            }
            _ => panic!("unexpected data type"),
        }
    }
}