1pub mod client;
2pub mod consumer;
3pub mod notifier;
4pub mod stream;
5mod types;
6
7pub mod prelude {
9 pub use conjure_object::BearerToken;
10 pub use conjure_object::ResourceIdentifier;
11 pub use nominal_api::tonic::google::protobuf::Timestamp;
12 pub use nominal_api::tonic::io::nominal::scout::api::proto::points::PointsType;
13 pub use nominal_api::tonic::io::nominal::scout::api::proto::DoublePoint;
14 pub use nominal_api::tonic::io::nominal::scout::api::proto::DoublePoints;
15 pub use nominal_api::tonic::io::nominal::scout::api::proto::StringPoint;
16 pub use nominal_api::tonic::io::nominal::scout::api::proto::StringPoints;
17 pub use nominal_api::tonic::io::nominal::scout::api::proto::WriteRequest;
18 pub use nominal_api::tonic::io::nominal::scout::api::proto::WriteRequestNominal;
19
20 pub use crate::client::PRODUCTION_STREAMING_CLIENT;
21 pub use crate::client::STAGING_STREAMING_CLIENT;
22 pub use crate::consumer::NominalCoreConsumer;
23 pub use crate::stream::NominalDatasetStream;
24 pub use crate::stream::NominalDatasourceStream;
25 pub use crate::stream::NominalStreamOpts;
26 pub use crate::types::ChannelDescriptor;
27}
28
29#[cfg(test)]
30mod tests {
31 use std::collections::HashMap;
32 use std::sync::Arc;
33 use std::sync::Mutex;
34 use std::thread;
35 use std::time::Duration;
36 use std::time::UNIX_EPOCH;
37
38 use nominal_api::tonic::io::nominal::scout::api::proto::IntegerPoint;
39
40 use crate::consumer::ConsumerResult;
41 use crate::consumer::WriteRequestConsumer;
42 use crate::prelude::*;
43 use crate::types::IntoTimestamp;
44
45 #[derive(Debug)]
46 struct TestDatasourceStream {
47 requests: Mutex<Vec<WriteRequestNominal>>,
48 }
49
50 impl WriteRequestConsumer for Arc<TestDatasourceStream> {
51 fn consume(&self, request: &WriteRequestNominal) -> ConsumerResult<()> {
52 self.requests.lock().unwrap().push(request.clone());
53 Ok(())
54 }
55 }
56
57 fn create_test_stream() -> (Arc<TestDatasourceStream>, NominalDatasetStream) {
58 let test_consumer = Arc::new(TestDatasourceStream {
59 requests: Mutex::new(vec![]),
60 });
61 let stream = NominalDatasetStream::new_with_consumer(
62 test_consumer.clone(),
63 NominalStreamOpts {
64 max_points_per_record: 1000,
65 max_request_delay: Duration::from_millis(100),
66 max_buffered_requests: 2,
67 request_dispatcher_tasks: 4,
68 },
69 );
70
71 (test_consumer, stream)
72 }
73
74 #[test]
75 fn test_stream() {
76 let (test_consumer, stream) = create_test_stream();
77
78 for batch in 0..5 {
79 let mut points = Vec::new();
80 for i in 0..1000 {
81 let start_time = UNIX_EPOCH.elapsed().unwrap();
82 points.push(DoublePoint {
83 timestamp: Some(Timestamp {
84 seconds: start_time.as_secs() as i64,
85 nanos: start_time.subsec_nanos() as i32 + i,
86 }),
87 value: (i % 50) as f64,
88 });
89 }
90
91 stream.enqueue(
92 &ChannelDescriptor::with_tags("channel_1", [("batch_id", batch.to_string())]),
93 points,
94 );
95 }
96
97 drop(stream); let requests = test_consumer.requests.lock().unwrap();
100
101 assert_eq!(requests.len(), 5);
104 let series = requests.first().unwrap().series.first().unwrap();
105 if let Some(PointsType::DoublePoints(points)) =
106 series.points.as_ref().unwrap().points_type.as_ref()
107 {
108 assert_eq!(points.points.len(), 1000);
109 } else {
110 panic!("unexpected data type");
111 }
112 }
113
114 #[test]
115 fn test_stream_types() {
116 let (test_consumer, stream) = create_test_stream();
117
118 for batch in 0..5 {
119 let mut doubles = Vec::new();
120 let mut strings = Vec::new();
121 let mut ints = Vec::new();
122 for i in 0..1000 {
123 let start_time = UNIX_EPOCH.elapsed().unwrap();
124 doubles.push(DoublePoint {
125 timestamp: Some(start_time.into_timestamp()),
126 value: (i % 50) as f64,
127 });
128 strings.push(StringPoint {
129 timestamp: Some(start_time.into_timestamp()),
130 value: format!("{}", i % 50),
131 });
132 ints.push(IntegerPoint {
133 timestamp: Some(start_time.into_timestamp()),
134 value: i % 50,
135 })
136 }
137
138 stream.enqueue(
139 &ChannelDescriptor::with_tags("double", [("batch_id", batch.to_string())]),
140 doubles,
141 );
142 stream.enqueue(
143 &ChannelDescriptor::with_tags("string", [("batch_id", batch.to_string())]),
144 strings,
145 );
146 stream.enqueue(
147 &ChannelDescriptor::with_tags("int", [("batch_id", batch.to_string())]),
148 ints,
149 );
150 }
151
152 drop(stream); let requests = test_consumer.requests.lock().unwrap();
155
156 assert_eq!(requests.len(), 15);
159
160 let r = requests
161 .iter()
162 .flat_map(|r| r.series.clone())
163 .map(|s| {
164 (
165 s.channel.unwrap().name,
166 s.points.unwrap().points_type.unwrap(),
167 )
168 })
169 .collect::<HashMap<_, _>>();
170 let PointsType::DoublePoints(dp) = r.get("double").unwrap() else {
171 panic!("invalid double points type");
172 };
173
174 let PointsType::IntegerPoints(ip) = r.get("int").unwrap() else {
175 panic!("invalid int points type");
176 };
177
178 let PointsType::StringPoints(sp) = r.get("string").unwrap() else {
179 panic!("invalid string points type");
180 };
181
182 assert_eq!(dp.points.len(), 1000);
184 assert_eq!(sp.points.len(), 1000);
185 assert_eq!(ip.points.len(), 1000);
186 }
187
188 #[test_log::test]
189 fn test_writer() {
190 let (test_consumer, stream) = create_test_stream();
191
192 let cd = ChannelDescriptor::new("channel_1");
193 let mut writer = stream.double_writer(&cd);
194
195 for i in 0..5000 {
196 let start_time = UNIX_EPOCH.elapsed().unwrap();
197 let value = i % 50;
198 writer.push(start_time, value as f64);
199 }
200
201 drop(writer); drop(stream); let requests = test_consumer.requests.lock().unwrap();
205
206 assert_eq!(requests.len(), 5);
207 let series = requests.first().unwrap().series.first().unwrap();
208 if let Some(PointsType::DoublePoints(points)) =
209 series.points.as_ref().unwrap().points_type.as_ref()
210 {
211 assert_eq!(points.points.len(), 1000);
212 } else {
213 panic!("unexpected data type");
214 }
215 }
216
217 #[test_log::test]
218 fn test_time_flush() {
219 let (test_consumer, stream) = create_test_stream();
220
221 let cd = ChannelDescriptor::new("channel_1");
222 let mut writer = stream.double_writer(&cd);
223
224 writer.push(UNIX_EPOCH.elapsed().unwrap(), 1.0);
225 thread::sleep(Duration::from_millis(101));
226 writer.push(UNIX_EPOCH.elapsed().unwrap(), 2.0); thread::sleep(Duration::from_millis(101));
228 writer.push(UNIX_EPOCH.elapsed().unwrap(), 3.0); drop(writer);
231 drop(stream);
232
233 let requests = test_consumer.requests.lock().unwrap();
234 dbg!(&requests);
235 assert_eq!(requests.len(), 2);
236 }
237
238 #[test_log::test]
239 fn test_writer_types() {
240 let (test_consumer, stream) = create_test_stream();
241
242 let cd1 = ChannelDescriptor::new("double");
243 let cd2 = ChannelDescriptor::new("string");
244 let cd3 = ChannelDescriptor::new("int");
245 let mut double_writer = stream.double_writer(&cd1);
246 let mut string_writer = stream.string_writer(&cd2);
247 let mut integer_writer = stream.integer_writer(&cd3);
248
249 for i in 0..5000 {
250 let start_time = UNIX_EPOCH.elapsed().unwrap();
251 let value = i % 50;
252 double_writer.push(start_time, value as f64);
253 string_writer.push(start_time, format!("{}", value));
254 integer_writer.push(start_time, value);
255 }
256
257 drop(double_writer);
258 drop(string_writer);
259 drop(integer_writer);
260 drop(stream);
261
262 let requests = test_consumer.requests.lock().unwrap();
263
264 assert_eq!(requests.len(), 15);
265
266 let r = requests
267 .iter()
268 .flat_map(|r| r.series.clone())
269 .map(|s| {
270 (
271 s.channel.unwrap().name,
272 s.points.unwrap().points_type.unwrap(),
273 )
274 })
275 .collect::<HashMap<_, _>>();
276
277 let PointsType::DoublePoints(dp) = r.get("double").unwrap() else {
278 panic!("invalid double points type");
279 };
280
281 let PointsType::IntegerPoints(ip) = r.get("int").unwrap() else {
282 panic!("invalid int points type");
283 };
284
285 let PointsType::StringPoints(sp) = r.get("string").unwrap() else {
286 panic!("invalid string points type");
287 };
288
289 assert_eq!(dp.points.len(), 1000);
291 assert_eq!(sp.points.len(), 1000);
292 assert_eq!(ip.points.len(), 1000);
293 }
294}