nominal_streaming/
lib.rs

1pub mod client;
2pub mod consumer;
3pub mod notifier;
4pub mod stream;
5
6/// This includes the most common types in this crate, re-exported for your convenience.
7pub mod prelude {
8    pub use conjure_object::BearerToken;
9    pub use conjure_object::ResourceIdentifier;
10    pub use nominal_api::tonic::google::protobuf::Timestamp;
11    pub use nominal_api::tonic::io::nominal::scout::api::proto::points::PointsType;
12    pub use nominal_api::tonic::io::nominal::scout::api::proto::DoublePoint;
13    pub use nominal_api::tonic::io::nominal::scout::api::proto::DoublePoints;
14    pub use nominal_api::tonic::io::nominal::scout::api::proto::StringPoint;
15    pub use nominal_api::tonic::io::nominal::scout::api::proto::StringPoints;
16    pub use nominal_api::tonic::io::nominal::scout::api::proto::WriteRequest;
17    pub use nominal_api::tonic::io::nominal::scout::api::proto::WriteRequestNominal;
18
19    pub use crate::client::PRODUCTION_STREAMING_CLIENT;
20    pub use crate::client::STAGING_STREAMING_CLIENT;
21    pub use crate::consumer::NominalCoreConsumer;
22    pub use crate::stream::ChannelDescriptor;
23    pub use crate::stream::NominalDatasourceStream;
24    pub use crate::stream::NominalStreamOpts;
25}
26
#[cfg(test)]
mod tests {
    use std::sync::Mutex;
    use std::time::Duration;
    use std::time::UNIX_EPOCH;

    use crate::consumer::ConsumerResult;
    use crate::consumer::WriteRequestConsumer;
    use crate::prelude::*;

    /// Test double that stores a clone of every request passed to `consume`,
    /// so the test can inspect what the stream emitted.
    #[derive(Debug)]
    struct TestDatasourceStream {
        requests: Mutex<Vec<WriteRequestNominal>>,
    }

    impl WriteRequestConsumer for &TestDatasourceStream {
        /// Records `request` and always reports success.
        fn consume(&self, request: &WriteRequestNominal) -> ConsumerResult<()> {
            self.requests.lock().unwrap().push(request.clone());
            Ok(())
        }
    }

    /// Enqueues five batches of 1000 points each and checks that five requests
    /// reach the consumer, the first of which carries 1000 double points.
    #[test]
    fn test_stream() {
        // Leaked on purpose so the same consumer can be handed to the stream and
        // still be inspected after the stream is dropped (presumably the stream
        // needs a `'static` consumer -- confirm against `new_with_consumer`).
        let test_consumer: &'static TestDatasourceStream =
            Box::leak(Box::new(TestDatasourceStream {
                requests: Mutex::new(vec![]),
            }));
        let stream = NominalDatasourceStream::new_with_consumer(
            test_consumer,
            NominalStreamOpts {
                max_points_per_record: 1000,
                max_request_delay: Default::default(),
                max_buffered_requests: 2,
                request_dispatcher_tasks: 4,
            },
        );

        for batch in 0..5 {
            let points: Vec<DoublePoint> = (0..1000u32)
                .map(|i| {
                    // Add the per-point offset to the `Duration` rather than to the
                    // raw nanos field: the addition carries into the seconds part,
                    // so `nanos` always stays within 0..=999_999_999 as a protobuf
                    // `Timestamp` requires.
                    let point_time =
                        UNIX_EPOCH.elapsed().unwrap() + Duration::from_nanos(u64::from(i));
                    DoublePoint {
                        timestamp: Some(Timestamp {
                            seconds: point_time.as_secs() as i64,
                            // `subsec_nanos` is always below 1_000_000_000, so this
                            // cast is lossless.
                            nanos: point_time.subsec_nanos() as i32,
                        }),
                        value: f64::from(i % 50),
                    }
                })
                .collect();

            stream.enqueue(
                &ChannelDescriptor::new("channel_1", [("batch_id", batch.to_string())]),
                points,
            );
        }

        drop(stream); // wait for points to flush

        let requests = test_consumer.requests.lock().unwrap();

        assert_eq!(requests.len(), 5);
        let series = requests.first().unwrap().series.first().unwrap();
        match series.points.as_ref().unwrap().points_type.as_ref() {
            Some(PointsType::DoublePoints(points)) => assert_eq!(points.points.len(), 1000),
            _ => panic!("unexpected data type"),
        }
    }
}
97}