nominal_streaming/
lib.rs

1pub mod client;
2pub mod consumer;
3pub mod notifier;
4pub mod stream;
5
6/// This includes the most common types in this crate, re-exported for your convenience.
7pub mod prelude {
8    pub use conjure_object::BearerToken;
9    pub use conjure_object::ResourceIdentifier;
10    pub use nominal_api::tonic::google::protobuf::Timestamp;
11    pub use nominal_api::tonic::io::nominal::scout::api::proto::points::PointsType;
12    pub use nominal_api::tonic::io::nominal::scout::api::proto::DoublePoint;
13    pub use nominal_api::tonic::io::nominal::scout::api::proto::DoublePoints;
14    pub use nominal_api::tonic::io::nominal::scout::api::proto::StringPoint;
15    pub use nominal_api::tonic::io::nominal::scout::api::proto::StringPoints;
16    pub use nominal_api::tonic::io::nominal::scout::api::proto::WriteRequest;
17    pub use nominal_api::tonic::io::nominal::scout::api::proto::WriteRequestNominal;
18
19    pub use crate::client::PRODUCTION_STREAMING_CLIENT;
20    pub use crate::client::STAGING_STREAMING_CLIENT;
21    pub use crate::consumer::NominalCoreConsumer;
22    pub use crate::stream::ChannelDescriptor;
23    pub use crate::stream::NominalDatasetStream;
24    pub use crate::stream::NominalDatasourceStream;
25    pub use crate::stream::NominalStreamOpts;
26}
27
#[cfg(test)]
mod tests {
    use std::sync::Mutex;
    use std::time::Duration;
    use std::time::UNIX_EPOCH;

    use crate::consumer::ConsumerResult;
    use crate::consumer::WriteRequestConsumer;
    use crate::prelude::*;

    /// Test double that records every request passed to `consume`, so the test can inspect
    /// what the stream produced.
    #[derive(Debug)]
    struct TestDatasourceStream {
        requests: Mutex<Vec<WriteRequestNominal>>,
    }

    // Implemented for a shared reference so the test can keep its own handle to the recorded
    // requests after handing the consumer to the stream.
    impl WriteRequestConsumer for &TestDatasourceStream {
        /// Stores a clone of `request` and always reports success.
        fn consume(&self, request: &WriteRequestNominal) -> ConsumerResult<()> {
            self.requests.lock().unwrap().push(request.clone());
            Ok(())
        }
    }

    /// Builds the `Timestamp` that lies `offset_nanos` nanoseconds after `since_epoch`.
    ///
    /// The offset is added to the `Duration` *before* splitting it into seconds and
    /// nanoseconds, so any overflow of the sub-second part carries into `seconds` and `nanos`
    /// always stays within `0..=999_999_999`.
    fn timestamp_after(since_epoch: Duration, offset_nanos: u32) -> Timestamp {
        let shifted = since_epoch + Duration::from_nanos(u64::from(offset_nanos));
        Timestamp {
            seconds: shifted.as_secs() as i64,
            // `subsec_nanos` is always below 1_000_000_000, so it fits in an `i32`.
            nanos: shifted.subsec_nanos() as i32,
        }
    }

    /// Enqueues five batches of 1000 double points on one channel (each batch tagged with its
    /// own `batch_id`) and checks that the consumer received five requests, the first of which
    /// carries 1000 double points.
    #[test]
    fn test_stream() {
        // Leak the consumer so the reference handed to the stream is not tied to a local's
        // lifetime; the test reads the recorded requests through the same reference afterwards.
        let test_consumer = Box::leak(Box::new(TestDatasourceStream {
            requests: Mutex::new(vec![]),
        }));
        let stream = NominalDatasetStream::new_with_consumer(
            &*test_consumer,
            NominalStreamOpts {
                max_points_per_record: 1000,
                max_request_delay: Default::default(),
                max_buffered_requests: 2,
                request_dispatcher_tasks: 4,
            },
        );

        for batch in 0..5 {
            let mut points = Vec::with_capacity(1000);
            for i in 0..1000u32 {
                let start_time = UNIX_EPOCH.elapsed().unwrap();
                points.push(DoublePoint {
                    // Offset each point by `i` nanoseconds from the current wall-clock time.
                    timestamp: Some(timestamp_after(start_time, i)),
                    value: f64::from(i % 50),
                });
            }

            stream.enqueue(
                &ChannelDescriptor::with_tags("channel_1", [("batch_id", batch.to_string())]),
                points,
            );
        }

        drop(stream); // wait for points to flush

        let requests = test_consumer.requests.lock().unwrap();

        assert_eq!(requests.len(), 5);
        let series = requests.first().unwrap().series.first().unwrap();
        match series.points.as_ref().unwrap().points_type.as_ref() {
            Some(PointsType::DoublePoints(points)) => assert_eq!(points.points.len(), 1000),
            _ => panic!("unexpected data type"),
        }
    }
}