1pub mod client;
2pub mod consumer;
3pub mod notifier;
4pub mod stream;
5
6pub mod prelude {
8 pub use conjure_object::BearerToken;
9 pub use conjure_object::ResourceIdentifier;
10 pub use nominal_api::tonic::google::protobuf::Timestamp;
11 pub use nominal_api::tonic::io::nominal::scout::api::proto::points::PointsType;
12 pub use nominal_api::tonic::io::nominal::scout::api::proto::DoublePoint;
13 pub use nominal_api::tonic::io::nominal::scout::api::proto::DoublePoints;
14 pub use nominal_api::tonic::io::nominal::scout::api::proto::StringPoint;
15 pub use nominal_api::tonic::io::nominal::scout::api::proto::StringPoints;
16 pub use nominal_api::tonic::io::nominal::scout::api::proto::WriteRequest;
17 pub use nominal_api::tonic::io::nominal::scout::api::proto::WriteRequestNominal;
18
19 pub use crate::client::PRODUCTION_STREAMING_CLIENT;
20 pub use crate::client::STAGING_STREAMING_CLIENT;
21 pub use crate::consumer::NominalCoreConsumer;
22 pub use crate::stream::ChannelDescriptor;
23 pub use crate::stream::NominalDatasetStream;
24 pub use crate::stream::NominalDatasourceStream;
25 pub use crate::stream::NominalStreamOpts;
26}
27
28#[cfg(test)]
29mod tests {
30 use std::sync::Mutex;
31 use std::time::UNIX_EPOCH;
32
33 use crate::consumer::ConsumerResult;
34 use crate::consumer::WriteRequestConsumer;
35 use crate::prelude::*;
36
37 #[derive(Debug)]
38 struct TestDatasourceStream {
39 requests: Mutex<Vec<WriteRequestNominal>>,
40 }
41
42 impl WriteRequestConsumer for &TestDatasourceStream {
43 fn consume(&self, request: &WriteRequestNominal) -> ConsumerResult<()> {
44 self.requests.lock().unwrap().push(request.clone());
45 Ok(())
46 }
47 }
48
49 #[test]
50 fn test_stream() {
51 let test_consumer = Box::new(TestDatasourceStream {
52 requests: Mutex::new(vec![]),
53 });
54 let test_consumer = Box::leak(test_consumer);
55 let stream = NominalDatasetStream::new_with_consumer(
56 &*test_consumer,
57 NominalStreamOpts {
58 max_points_per_record: 1000,
59 max_request_delay: Default::default(),
60 max_buffered_requests: 2,
61 request_dispatcher_tasks: 4,
62 },
63 );
64
65 for batch in 0..5 {
66 let mut points = Vec::new();
67 for i in 0..1000 {
68 let start_time = UNIX_EPOCH.elapsed().unwrap();
69 points.push(DoublePoint {
70 timestamp: Some(Timestamp {
71 seconds: start_time.as_secs() as i64,
72 nanos: start_time.subsec_nanos() as i32 + i,
73 }),
74 value: (i % 50) as f64,
75 });
76 }
77
78 stream.enqueue(
79 &ChannelDescriptor::with_tags("channel_1", [("batch_id", batch.to_string())]),
80 points,
81 );
82 }
83
84 drop(stream); let requests = test_consumer.requests.lock().unwrap();
87
88 assert_eq!(requests.len(), 5);
89 let series = requests.first().unwrap().series.first().unwrap();
90 if let Some(PointsType::DoublePoints(points)) =
91 series.points.as_ref().unwrap().points_type.as_ref()
92 {
93 assert_eq!(points.points.len(), 1000);
94 } else {
95 panic!("unexpected data type");
96 }
97 }
98}