1pub mod client;
2pub mod consumer;
3pub mod notifier;
4pub mod stream;
5
6pub use stream::NominalDatasourceStream;
7
8pub mod api {
9 pub use conjure_object::BearerToken;
10 pub use conjure_object::ResourceIdentifier;
11 pub use nominal_api::tonic::google::protobuf::Timestamp;
12 pub use nominal_api::tonic::io::nominal::scout::api::proto::points::PointsType;
13 pub use nominal_api::tonic::io::nominal::scout::api::proto::DoublePoint;
14 pub use nominal_api::tonic::io::nominal::scout::api::proto::DoublePoints;
15 pub use nominal_api::tonic::io::nominal::scout::api::proto::StringPoint;
16 pub use nominal_api::tonic::io::nominal::scout::api::proto::StringPoints;
17 pub use nominal_api::tonic::io::nominal::scout::api::proto::WriteRequest;
18 pub use nominal_api::tonic::io::nominal::scout::api::proto::WriteRequestNominal;
19}
20
21#[cfg(test)]
22mod tests {
23 use std::collections::HashMap;
24 use std::sync::Mutex;
25 use std::time::UNIX_EPOCH;
26
27 use crate::api::*;
28 use crate::consumer::ConsumerResult;
29 use crate::consumer::WriteRequestConsumer;
30 use crate::stream::NominalStreamOpts;
31 use crate::NominalDatasourceStream;
32
33 #[derive(Debug)]
34 struct TestDatasourceStream {
35 requests: Mutex<Vec<WriteRequestNominal>>,
36 }
37
38 impl WriteRequestConsumer for &TestDatasourceStream {
39 fn consume(&self, request: &WriteRequestNominal) -> ConsumerResult<()> {
40 self.requests.lock().unwrap().push(request.clone());
41 Ok(())
42 }
43 }
44
45 #[test]
46 fn test_stream() {
47 let test_consumer = Box::new(TestDatasourceStream {
48 requests: Mutex::new(vec![]),
49 });
50 let test_consumer = Box::leak(test_consumer);
51 let stream = NominalDatasourceStream::new_with_consumer(
52 &*test_consumer,
53 NominalStreamOpts {
54 max_points_per_record: 1000,
55 max_request_delay: Default::default(),
56 max_buffered_requests: 2,
57 request_dispatcher_tasks: 4,
58 },
59 );
60
61 for batch in 0..5 {
62 let mut points = Vec::new();
63 for i in 0..1000 {
64 let start_time = UNIX_EPOCH.elapsed().unwrap();
65 points.push(DoublePoint {
66 timestamp: Some(Timestamp {
67 seconds: start_time.as_secs() as i64,
68 nanos: start_time.subsec_nanos() as i32 + i,
69 }),
70 value: (i % 50) as f64,
71 });
72 }
73
74 stream.enqueue(
75 "channel_1".into(),
76 HashMap::from_iter([("batch_id".to_string(), format!("{batch}"))].into_iter()),
77 points,
78 );
79 }
80
81 drop(stream); let requests = test_consumer.requests.lock().unwrap();
84
85 assert_eq!(requests.len(), 5);
86 let series = requests.first().unwrap().series.first().unwrap();
87 if let Some(PointsType::DoublePoints(points)) =
88 series.points.as_ref().unwrap().points_type.as_ref()
89 {
90 assert_eq!(points.points.len(), 1000);
91 } else {
92 panic!("unexpected data type");
93 }
94 }
95}