1pub mod client;
2pub mod consumer;
3pub mod notifier;
4pub mod stream;
5
6pub mod prelude {
8 pub use conjure_object::BearerToken;
9 pub use conjure_object::ResourceIdentifier;
10 pub use nominal_api::tonic::google::protobuf::Timestamp;
11 pub use nominal_api::tonic::io::nominal::scout::api::proto::points::PointsType;
12 pub use nominal_api::tonic::io::nominal::scout::api::proto::DoublePoint;
13 pub use nominal_api::tonic::io::nominal::scout::api::proto::DoublePoints;
14 pub use nominal_api::tonic::io::nominal::scout::api::proto::StringPoint;
15 pub use nominal_api::tonic::io::nominal::scout::api::proto::StringPoints;
16 pub use nominal_api::tonic::io::nominal::scout::api::proto::WriteRequest;
17 pub use nominal_api::tonic::io::nominal::scout::api::proto::WriteRequestNominal;
18
19 pub use crate::client::PRODUCTION_STREAMING_CLIENT;
20 pub use crate::client::STAGING_STREAMING_CLIENT;
21 pub use crate::consumer::NominalCoreConsumer;
22 pub use crate::stream::ChannelDescriptor;
23 pub use crate::stream::NominalDatasourceStream;
24 pub use crate::stream::NominalStreamOpts;
25}
26
#[cfg(test)]
mod tests {
    use std::sync::Mutex;
    use std::time::Duration;
    use std::time::UNIX_EPOCH;

    use crate::consumer::ConsumerResult;
    use crate::consumer::WriteRequestConsumer;
    use crate::prelude::*;

    /// Test double that records every write request handed to it.
    #[derive(Debug)]
    struct TestDatasourceStream {
        /// Clones of the requests received so far, in the order `consume` was called.
        requests: Mutex<Vec<WriteRequestNominal>>,
    }

    // Implemented on a shared reference so the test can hand the consumer to
    // the stream and still inspect the recorded requests afterwards.
    impl WriteRequestConsumer for &TestDatasourceStream {
        /// Stores a clone of `request` and always reports success.
        fn consume(&self, request: &WriteRequestNominal) -> ConsumerResult<()> {
            // A poisoned mutex means another thread already panicked; failing
            // loudly here is the desired behavior in a test.
            self.requests.lock().unwrap().push(request.clone());
            Ok(())
        }
    }

    /// Enqueues five batches of 1000 points and checks that the consumer saw
    /// five requests, the first of which carries 1000 double points.
    #[test]
    fn test_stream() {
        let test_consumer = Box::new(TestDatasourceStream {
            requests: Mutex::new(vec![]),
        });
        // NOTE(review): leaked to obtain a reference that outlives the stream —
        // presumably the stream needs a `'static` consumer; confirm against
        // `new_with_consumer`. Leaking is acceptable in a short-lived test.
        let test_consumer = Box::leak(test_consumer);
        let stream = NominalDatasourceStream::new_with_consumer(
            &*test_consumer,
            NominalStreamOpts {
                max_points_per_record: 1000,
                max_request_delay: Default::default(),
                max_buffered_requests: 2,
                request_dispatcher_tasks: 4,
            },
        );

        for batch in 0..5 {
            let mut points = Vec::with_capacity(1000);
            for i in 0..1000u64 {
                // Offset each point by `i` nanoseconds using `Duration`
                // arithmetic so that any carry lands in `seconds`. Adding `i`
                // directly to `subsec_nanos()` could push `nanos` past
                // 999_999_999, which is outside the valid `Timestamp` range.
                let point_time = UNIX_EPOCH.elapsed().unwrap() + Duration::from_nanos(i);
                points.push(DoublePoint {
                    timestamp: Some(Timestamp {
                        // Seconds since the epoch fit comfortably in `i64`.
                        seconds: point_time.as_secs() as i64,
                        // `subsec_nanos()` is always < 1_000_000_000, so it fits in `i32`.
                        nanos: point_time.subsec_nanos() as i32,
                    }),
                    value: (i % 50) as f64,
                });
            }

            stream.enqueue(
                &ChannelDescriptor::new("channel_1", [("batch_id", batch.to_string())]),
                points,
            );
        }

        // The assertions below rely on dropping the stream to deliver all
        // buffered points to the consumer before the requests are inspected.
        drop(stream);

        let requests = test_consumer.requests.lock().unwrap();

        assert_eq!(requests.len(), 5);
        let series = requests.first().unwrap().series.first().unwrap();
        if let Some(PointsType::DoublePoints(points)) =
            series.points.as_ref().unwrap().points_type.as_ref()
        {
            assert_eq!(points.points.len(), 1000);
        } else {
            panic!("unexpected data type");
        }
    }
}