1pub mod client;
159pub mod consumer;
160pub mod listener;
161pub mod stream;
162pub mod types;
163pub mod upload;
164
165pub use nominal_api as api;
166
167pub mod prelude {
169 pub use conjure_object::BearerToken;
170 pub use conjure_object::ResourceIdentifier;
171 pub use nominal_api::tonic::google::protobuf::Timestamp;
172 pub use nominal_api::tonic::io::nominal::scout::api::proto::points::PointsType;
173 pub use nominal_api::tonic::io::nominal::scout::api::proto::DoublePoint;
174 pub use nominal_api::tonic::io::nominal::scout::api::proto::DoublePoints;
175 pub use nominal_api::tonic::io::nominal::scout::api::proto::IntegerPoint;
176 pub use nominal_api::tonic::io::nominal::scout::api::proto::IntegerPoints;
177 pub use nominal_api::tonic::io::nominal::scout::api::proto::StringPoint;
178 pub use nominal_api::tonic::io::nominal::scout::api::proto::StringPoints;
179 pub use nominal_api::tonic::io::nominal::scout::api::proto::StructPoint;
180 pub use nominal_api::tonic::io::nominal::scout::api::proto::StructPoints;
181 pub use nominal_api::tonic::io::nominal::scout::api::proto::Uint64Point;
182 pub use nominal_api::tonic::io::nominal::scout::api::proto::Uint64Points;
183 pub use nominal_api::tonic::io::nominal::scout::api::proto::WriteRequest;
184 pub use nominal_api::tonic::io::nominal::scout::api::proto::WriteRequestNominal;
185
186 pub use crate::consumer::NominalCoreConsumer;
187 pub use crate::stream::NominalDatasetStream;
188 #[expect(deprecated)]
189 pub use crate::stream::NominalDatasourceStream;
190 pub use crate::stream::NominalStreamOpts;
191 pub use crate::types::AuthProvider;
192 pub use crate::types::ChannelDescriptor;
193 pub use crate::types::IntoTimestamp;
194}
195
196#[cfg(test)]
197mod tests {
198 use std::collections::HashMap;
199 use std::sync::Arc;
200 use std::sync::Mutex;
201 use std::thread;
202 use std::time::Duration;
203 use std::time::UNIX_EPOCH;
204
205 use nominal_api::tonic::io::nominal::scout::api::proto::IntegerPoint;
206
207 use crate::client::PRODUCTION_API_URL;
208 use crate::consumer::ConsumerResult;
209 use crate::consumer::WriteRequestConsumer;
210 use crate::prelude::*;
211
212 #[derive(Debug)]
213 struct TestDatasourceStream {
214 requests: Mutex<Vec<WriteRequestNominal>>,
215 }
216
217 impl WriteRequestConsumer for Arc<TestDatasourceStream> {
218 fn consume(&self, request: &WriteRequestNominal) -> ConsumerResult<()> {
219 self.requests.lock().unwrap().push(request.clone());
220 Ok(())
221 }
222 }
223
224 fn create_test_stream() -> (Arc<TestDatasourceStream>, NominalDatasetStream) {
225 let test_consumer = Arc::new(TestDatasourceStream {
226 requests: Mutex::new(vec![]),
227 });
228 let stream = NominalDatasetStream::new_with_consumer(
229 test_consumer.clone(),
230 NominalStreamOpts {
231 max_points_per_record: 1000,
232 max_request_delay: Duration::from_millis(100),
233 max_buffered_requests: 2,
234 request_dispatcher_tasks: 4,
235 base_api_url: PRODUCTION_API_URL.to_string(),
236 },
237 );
238
239 (test_consumer, stream)
240 }
241
242 #[test_log::test]
243 fn test_stream() {
244 let (test_consumer, stream) = create_test_stream();
245
246 for batch in 0..5 {
247 let mut points = Vec::new();
248 for i in 0..1000 {
249 let start_time = UNIX_EPOCH.elapsed().unwrap();
250 points.push(DoublePoint {
251 timestamp: Some(Timestamp {
252 seconds: start_time.as_secs() as i64,
253 nanos: start_time.subsec_nanos() as i32 + i,
254 }),
255 value: (i % 50) as f64,
256 });
257 }
258
259 stream.enqueue(
260 &ChannelDescriptor::with_tags("channel_1", [("batch_id", batch.to_string())]),
261 points,
262 );
263 }
264
265 drop(stream); let requests = test_consumer.requests.lock().unwrap();
268
269 assert_eq!(requests.len(), 5);
272 let series = requests.first().unwrap().series.first().unwrap();
273 if let Some(PointsType::DoublePoints(points)) =
274 series.points.as_ref().unwrap().points_type.as_ref()
275 {
276 assert_eq!(points.points.len(), 1000);
277 } else {
278 panic!("unexpected data type");
279 }
280 }
281
282 #[test_log::test]
283 fn test_stream_types() {
284 let (test_consumer, stream) = create_test_stream();
285
286 for batch in 0..5 {
287 let mut doubles = Vec::new();
288 let mut strings = Vec::new();
289 let mut structs = Vec::new();
290 let mut ints = Vec::new();
291 let mut uints = Vec::new();
292 for i in 0..1000 {
293 let start_time = UNIX_EPOCH.elapsed().unwrap();
294 doubles.push(DoublePoint {
295 timestamp: Some(start_time.into_timestamp()),
296 value: (i % 50) as f64,
297 });
298 strings.push(StringPoint {
299 timestamp: Some(start_time.into_timestamp()),
300 value: format!("{}", i % 50),
301 });
302 structs.push(StructPoint {
303 timestamp: Some(start_time.into_timestamp()),
304 json_string: format!("{}", i % 50),
305 });
306 ints.push(IntegerPoint {
307 timestamp: Some(start_time.into_timestamp()),
308 value: i % 50,
309 });
310 uints.push(Uint64Point {
311 timestamp: Some(start_time.into_timestamp()),
312 value: (i % 50) as u64,
313 })
314 }
315
316 stream.enqueue(
317 &ChannelDescriptor::with_tags("double", [("batch_id", batch.to_string())]),
318 doubles,
319 );
320 stream.enqueue(
321 &ChannelDescriptor::with_tags("string", [("batch_id", batch.to_string())]),
322 strings,
323 );
324 stream.enqueue(
325 &ChannelDescriptor::with_tags("struct", [("batch_id", batch.to_string())]),
326 structs,
327 );
328 stream.enqueue(
329 &ChannelDescriptor::with_tags("int", [("batch_id", batch.to_string())]),
330 ints,
331 );
332 stream.enqueue(
333 &ChannelDescriptor::with_tags("uint64", [("batch_id", batch.to_string())]),
334 uints,
335 );
336 }
337
338 drop(stream); let requests = test_consumer.requests.lock().unwrap();
341
342 assert_eq!(requests.len(), 25);
345
346 let r = requests
347 .iter()
348 .flat_map(|r| r.series.clone())
349 .map(|s| {
350 (
351 s.channel.unwrap().name,
352 s.points.unwrap().points_type.unwrap(),
353 )
354 })
355 .collect::<HashMap<_, _>>();
356 let PointsType::DoublePoints(dp) = r.get("double").unwrap() else {
357 panic!("invalid double points type");
358 };
359
360 let PointsType::IntegerPoints(ip) = r.get("int").unwrap() else {
361 panic!("invalid int points type");
362 };
363
364 let PointsType::Uint64Points(up) = r.get("uint64").unwrap() else {
365 panic!("invalid uint64 points type");
366 };
367
368 let PointsType::StringPoints(sp) = r.get("string").unwrap() else {
369 panic!("invalid string points type");
370 };
371
372 let PointsType::StructPoints(stp) = r.get("struct").unwrap() else {
373 panic!("invalid struct points type");
374 };
375
376 assert_eq!(dp.points.len(), 1000);
378 assert_eq!(sp.points.len(), 1000);
379 assert_eq!(ip.points.len(), 1000);
380 assert_eq!(up.points.len(), 1000);
381 assert_eq!(stp.points.len(), 1000);
382 }
383
384 #[test_log::test]
385 fn test_writer() {
386 let (test_consumer, stream) = create_test_stream();
387
388 let cd = ChannelDescriptor::new("channel_1");
389 let mut writer = stream.double_writer(&cd);
390
391 for i in 0..5000 {
392 let start_time = UNIX_EPOCH.elapsed().unwrap();
393 let value = i % 50;
394 writer.push(start_time, value as f64);
395 }
396
397 drop(writer); drop(stream); let requests = test_consumer.requests.lock().unwrap();
401
402 assert_eq!(requests.len(), 5);
403 let series = requests.first().unwrap().series.first().unwrap();
404 if let Some(PointsType::DoublePoints(points)) =
405 series.points.as_ref().unwrap().points_type.as_ref()
406 {
407 assert_eq!(points.points.len(), 1000);
408 } else {
409 panic!("unexpected data type");
410 }
411 }
412
413 #[test_log::test]
414 fn test_time_flush() {
415 let (test_consumer, stream) = create_test_stream();
416
417 let cd = ChannelDescriptor::new("channel_1");
418 let mut writer = stream.double_writer(&cd);
419
420 writer.push(UNIX_EPOCH.elapsed().unwrap(), 1.0);
421 thread::sleep(Duration::from_millis(101));
422 writer.push(UNIX_EPOCH.elapsed().unwrap(), 2.0); thread::sleep(Duration::from_millis(101));
424 writer.push(UNIX_EPOCH.elapsed().unwrap(), 3.0); drop(writer);
427 drop(stream);
428
429 let requests = test_consumer.requests.lock().unwrap();
430 dbg!(&requests);
431 assert_eq!(requests.len(), 2);
432 }
433
434 #[test_log::test]
435 fn test_writer_types() {
436 let (test_consumer, stream) = create_test_stream();
437
438 let cd1 = ChannelDescriptor::new("double");
439 let cd2 = ChannelDescriptor::new("string");
440 let cd3 = ChannelDescriptor::new("int");
441 let cd4 = ChannelDescriptor::new("uint64");
442 let cd5 = ChannelDescriptor::new("struct");
443 let mut double_writer = stream.double_writer(&cd1);
444 let mut string_writer = stream.string_writer(&cd2);
445 let mut integer_writer = stream.integer_writer(&cd3);
446 let mut uint64_writer = stream.uint64_writer(&cd4);
447 let mut struct_writer = stream.struct_writer(&cd5);
448
449 for i in 0..5000 {
450 let start_time = UNIX_EPOCH.elapsed().unwrap();
451 let value = i % 50;
452 double_writer.push(start_time, value as f64);
453 string_writer.push(start_time, format!("{}", value));
454 integer_writer.push(start_time, value);
455 uint64_writer.push(start_time, value as u64);
456 struct_writer.push(start_time, format!("{}", value));
457 }
458
459 drop(double_writer);
460 drop(string_writer);
461 drop(integer_writer);
462 drop(uint64_writer);
463 drop(struct_writer);
464 drop(stream);
465
466 let requests = test_consumer.requests.lock().unwrap();
467
468 assert_eq!(requests.len(), 25);
469
470 let r = requests
471 .iter()
472 .flat_map(|r| r.series.clone())
473 .map(|s| {
474 (
475 s.channel.unwrap().name,
476 s.points.unwrap().points_type.unwrap(),
477 )
478 })
479 .collect::<HashMap<_, _>>();
480
481 let PointsType::DoublePoints(dp) = r.get("double").unwrap() else {
482 panic!("invalid double points type");
483 };
484
485 let PointsType::IntegerPoints(ip) = r.get("int").unwrap() else {
486 panic!("invalid int points type");
487 };
488
489 let PointsType::Uint64Points(up) = r.get("uint64").unwrap() else {
490 panic!("invalid uint64 points type");
491 };
492
493 let PointsType::StringPoints(sp) = r.get("string").unwrap() else {
494 panic!("invalid string points type");
495 };
496
497 let PointsType::StructPoints(stp) = r.get("struct").unwrap() else {
498 panic!("invalid struct points type");
499 };
500
501 assert_eq!(dp.points.len(), 1000);
503 assert_eq!(sp.points.len(), 1000);
504 assert_eq!(ip.points.len(), 1000);
505 assert_eq!(up.points.len(), 1000);
506 assert_eq!(stp.points.len(), 1000);
507 }
508}