1pub mod client;
159pub mod consumer;
160pub mod listener;
161pub mod stream;
162pub mod types;
163pub mod upload;
164
165pub use nominal_api as api;
166
167pub mod prelude {
169 pub use conjure_object::BearerToken;
170 pub use conjure_object::ResourceIdentifier;
171 pub use nominal_api::tonic::google::protobuf::Timestamp;
172 pub use nominal_api::tonic::io::nominal::scout::api::proto::array_points::ArrayType;
173 pub use nominal_api::tonic::io::nominal::scout::api::proto::points::PointsType;
174 pub use nominal_api::tonic::io::nominal::scout::api::proto::ArrayPoints;
175 pub use nominal_api::tonic::io::nominal::scout::api::proto::DoubleArrayPoint;
176 pub use nominal_api::tonic::io::nominal::scout::api::proto::DoubleArrayPoints;
177 pub use nominal_api::tonic::io::nominal::scout::api::proto::DoublePoint;
178 pub use nominal_api::tonic::io::nominal::scout::api::proto::DoublePoints;
179 pub use nominal_api::tonic::io::nominal::scout::api::proto::IntegerPoint;
180 pub use nominal_api::tonic::io::nominal::scout::api::proto::IntegerPoints;
181 pub use nominal_api::tonic::io::nominal::scout::api::proto::StringArrayPoint;
182 pub use nominal_api::tonic::io::nominal::scout::api::proto::StringArrayPoints;
183 pub use nominal_api::tonic::io::nominal::scout::api::proto::StringPoint;
184 pub use nominal_api::tonic::io::nominal::scout::api::proto::StringPoints;
185 pub use nominal_api::tonic::io::nominal::scout::api::proto::StructPoint;
186 pub use nominal_api::tonic::io::nominal::scout::api::proto::StructPoints;
187 pub use nominal_api::tonic::io::nominal::scout::api::proto::Uint64Point;
188 pub use nominal_api::tonic::io::nominal::scout::api::proto::Uint64Points;
189 pub use nominal_api::tonic::io::nominal::scout::api::proto::WriteRequest;
190 pub use nominal_api::tonic::io::nominal::scout::api::proto::WriteRequestNominal;
191
192 pub use crate::consumer::NominalCoreConsumer;
193 pub use crate::stream::NominalDatasetStream;
194 #[expect(deprecated)]
195 pub use crate::stream::NominalDatasourceStream;
196 pub use crate::stream::NominalStreamOpts;
197 pub use crate::types::AuthProvider;
198 pub use crate::types::ChannelDescriptor;
199 pub use crate::types::IntoTimestamp;
200}
201
202#[cfg(test)]
203mod tests {
204 use std::collections::HashMap;
205 use std::sync::Arc;
206 use std::sync::Mutex;
207 use std::thread;
208 use std::time::Duration;
209 use std::time::UNIX_EPOCH;
210
211 use nominal_api::tonic::io::nominal::scout::api::proto::array_points::ArrayType;
212 use nominal_api::tonic::io::nominal::scout::api::proto::ArrayPoints;
213 use nominal_api::tonic::io::nominal::scout::api::proto::IntegerPoint;
214
215 use crate::client::PRODUCTION_API_URL;
216 use crate::consumer::ConsumerResult;
217 use crate::consumer::WriteRequestConsumer;
218 use crate::prelude::*;
219
220 #[derive(Debug)]
221 struct TestDatasourceStream {
222 requests: Mutex<Vec<WriteRequestNominal>>,
223 }
224
225 impl WriteRequestConsumer for Arc<TestDatasourceStream> {
226 fn consume(&self, request: &WriteRequestNominal) -> ConsumerResult<()> {
227 self.requests.lock().unwrap().push(request.clone());
228 Ok(())
229 }
230 }
231
232 fn create_test_stream() -> (Arc<TestDatasourceStream>, NominalDatasetStream) {
233 let test_consumer = Arc::new(TestDatasourceStream {
234 requests: Mutex::new(vec![]),
235 });
236 let stream = NominalDatasetStream::new_with_consumer(
237 test_consumer.clone(),
238 NominalStreamOpts {
239 max_points_per_record: 1000,
240 max_request_delay: Duration::from_millis(100),
241 max_buffered_requests: 2,
242 request_dispatcher_tasks: 4,
243 base_api_url: PRODUCTION_API_URL.to_string(),
244 },
245 );
246
247 (test_consumer, stream)
248 }
249
250 #[test_log::test]
251 fn test_stream() {
252 let (test_consumer, stream) = create_test_stream();
253
254 for batch in 0..5 {
255 let mut points = Vec::new();
256 for i in 0..1000 {
257 let start_time = UNIX_EPOCH.elapsed().unwrap();
258 points.push(DoublePoint {
259 timestamp: Some(Timestamp {
260 seconds: start_time.as_secs() as i64,
261 nanos: start_time.subsec_nanos() as i32 + i,
262 }),
263 value: (i % 50) as f64,
264 });
265 }
266
267 stream.enqueue(
268 &ChannelDescriptor::with_tags("channel_1", [("batch_id", batch.to_string())]),
269 points,
270 );
271 }
272
273 drop(stream); let requests = test_consumer.requests.lock().unwrap();
276
277 assert_eq!(requests.len(), 5);
280 let series = requests.first().unwrap().series.first().unwrap();
281 if let Some(PointsType::DoublePoints(points)) =
282 series.points.as_ref().unwrap().points_type.as_ref()
283 {
284 assert_eq!(points.points.len(), 1000);
285 } else {
286 panic!("unexpected data type");
287 }
288 }
289
290 #[test_log::test]
291 fn test_stream_types() {
292 let (test_consumer, stream) = create_test_stream();
293
294 for batch in 0..5 {
295 let mut doubles = Vec::new();
296 let mut strings = Vec::new();
297 let mut structs = Vec::new();
298 let mut ints = Vec::new();
299 let mut uints = Vec::new();
300 let mut double_arrays = Vec::new();
301 let mut string_arrays = Vec::new();
302 for i in 0..1000 {
303 let start_time = UNIX_EPOCH.elapsed().unwrap();
304 doubles.push(DoublePoint {
305 timestamp: Some(start_time.into_timestamp()),
306 value: (i % 50) as f64,
307 });
308 strings.push(StringPoint {
309 timestamp: Some(start_time.into_timestamp()),
310 value: format!("{}", i % 50),
311 });
312 structs.push(StructPoint {
313 timestamp: Some(start_time.into_timestamp()),
314 json_string: format!("{{\"v\":{}}}", i % 50),
315 });
316 ints.push(IntegerPoint {
317 timestamp: Some(start_time.into_timestamp()),
318 value: i % 50,
319 });
320 uints.push(Uint64Point {
321 timestamp: Some(start_time.into_timestamp()),
322 value: (i % 50) as u64,
323 });
324 double_arrays.push(DoubleArrayPoint {
325 timestamp: Some(start_time.into_timestamp()),
326 value: vec![(i % 50) as f64, ((i + 1) % 50) as f64],
327 });
328 string_arrays.push(StringArrayPoint {
329 timestamp: Some(start_time.into_timestamp()),
330 value: vec![format!("{}", i % 50)],
331 });
332 }
333
334 stream.enqueue(
335 &ChannelDescriptor::with_tags("double", [("batch_id", batch.to_string())]),
336 doubles,
337 );
338 stream.enqueue(
339 &ChannelDescriptor::with_tags("string", [("batch_id", batch.to_string())]),
340 strings,
341 );
342 stream.enqueue(
343 &ChannelDescriptor::with_tags("struct", [("batch_id", batch.to_string())]),
344 structs,
345 );
346 stream.enqueue(
347 &ChannelDescriptor::with_tags("int", [("batch_id", batch.to_string())]),
348 ints,
349 );
350 stream.enqueue(
351 &ChannelDescriptor::with_tags("uint64", [("batch_id", batch.to_string())]),
352 uints,
353 );
354 stream.enqueue(
355 &ChannelDescriptor::with_tags("double_array", [("batch_id", batch.to_string())]),
356 double_arrays,
357 );
358 stream.enqueue(
359 &ChannelDescriptor::with_tags("string_array", [("batch_id", batch.to_string())]),
360 string_arrays,
361 );
362 }
363
364 drop(stream); let requests = test_consumer.requests.lock().unwrap();
367
368 assert_eq!(requests.len(), 35);
371
372 let r = requests
373 .iter()
374 .flat_map(|r| r.series.clone())
375 .map(|s| {
376 (
377 s.channel.unwrap().name,
378 s.points.unwrap().points_type.unwrap(),
379 )
380 })
381 .collect::<HashMap<_, _>>();
382 let PointsType::DoublePoints(dp) = r.get("double").unwrap() else {
383 panic!("invalid double points type");
384 };
385
386 let PointsType::IntegerPoints(ip) = r.get("int").unwrap() else {
387 panic!("invalid int points type");
388 };
389
390 let PointsType::Uint64Points(up) = r.get("uint64").unwrap() else {
391 panic!("invalid uint64 points type");
392 };
393
394 let PointsType::StringPoints(sp) = r.get("string").unwrap() else {
395 panic!("invalid string points type");
396 };
397
398 let PointsType::StructPoints(stp) = r.get("struct").unwrap() else {
399 panic!("invalid struct points type");
400 };
401
402 let PointsType::ArrayPoints(ArrayPoints {
403 array_type: Some(ArrayType::DoubleArrayPoints(dap)),
404 }) = r.get("double_array").unwrap()
405 else {
406 panic!("invalid double array points type");
407 };
408
409 let PointsType::ArrayPoints(ArrayPoints {
410 array_type: Some(ArrayType::StringArrayPoints(sap)),
411 }) = r.get("string_array").unwrap()
412 else {
413 panic!("invalid string array points type");
414 };
415
416 assert_eq!(dp.points.len(), 1000);
418 assert_eq!(sp.points.len(), 1000);
419 assert_eq!(ip.points.len(), 1000);
420 assert_eq!(up.points.len(), 1000);
421 assert_eq!(stp.points.len(), 1000);
422 assert_eq!(dap.points.len(), 1000);
423 assert_eq!(sap.points.len(), 1000);
424 }
425
426 #[test_log::test]
427 #[should_panic(expected = "mismatched types")]
428 fn test_mismatched_array_types_panics() {
429 let (_test_consumer, stream) = create_test_stream();
436 let stream = std::mem::ManuallyDrop::new(stream);
437 let cd = ChannelDescriptor::new("mixed_array");
438
439 let ts = UNIX_EPOCH.elapsed().unwrap().into_timestamp();
440
441 stream.enqueue(
442 &cd,
443 vec![DoubleArrayPoint {
444 timestamp: Some(ts),
445 value: vec![1.0, 2.0],
446 }],
447 );
448 stream.enqueue(
449 &cd,
450 vec![StringArrayPoint {
451 timestamp: Some(ts),
452 value: vec!["a".into()],
453 }],
454 );
455 }
456
457 #[test_log::test]
458 fn test_writer() {
459 let (test_consumer, stream) = create_test_stream();
460
461 let cd = ChannelDescriptor::new("channel_1");
462 let mut writer = stream.double_writer(cd);
463
464 for i in 0..5000 {
465 let start_time = UNIX_EPOCH.elapsed().unwrap();
466 let value = i % 50;
467 writer.push(start_time, value as f64);
468 }
469
470 drop(writer); drop(stream); let requests = test_consumer.requests.lock().unwrap();
474
475 assert_eq!(requests.len(), 5);
476 let series = requests.first().unwrap().series.first().unwrap();
477 if let Some(PointsType::DoublePoints(points)) =
478 series.points.as_ref().unwrap().points_type.as_ref()
479 {
480 assert_eq!(points.points.len(), 1000);
481 } else {
482 panic!("unexpected data type");
483 }
484 }
485
486 #[test_log::test]
487 fn test_time_flush() {
488 let (test_consumer, stream) = create_test_stream();
489
490 let cd = ChannelDescriptor::new("channel_1");
491 let mut writer = stream.double_writer(cd);
492
493 writer.push(UNIX_EPOCH.elapsed().unwrap(), 1.0);
494 thread::sleep(Duration::from_millis(101));
495 writer.push(UNIX_EPOCH.elapsed().unwrap(), 2.0); thread::sleep(Duration::from_millis(101));
497 writer.push(UNIX_EPOCH.elapsed().unwrap(), 3.0); drop(writer);
500 drop(stream);
501
502 let requests = test_consumer.requests.lock().unwrap();
503 dbg!(&requests);
504 assert_eq!(requests.len(), 2);
505 }
506
507 #[test_log::test]
508 fn test_writer_types() {
509 let (test_consumer, stream) = create_test_stream();
510
511 let cd1 = ChannelDescriptor::new("double");
512 let cd2 = ChannelDescriptor::new("string");
513 let cd3 = ChannelDescriptor::new("int");
514 let cd4 = ChannelDescriptor::new("uint64");
515 let cd5 = ChannelDescriptor::new("struct");
516 let cd6 = ChannelDescriptor::new("double_array");
517 let cd7 = ChannelDescriptor::new("string_array");
518 let mut double_writer = stream.double_writer(cd1);
519 let mut string_writer = stream.string_writer(cd2);
520 let mut integer_writer = stream.integer_writer(cd3);
521 let mut uint64_writer = stream.uint64_writer(cd4);
522 let mut struct_writer = stream.struct_writer(cd5);
523 let mut double_array_writer = stream.double_array_writer(cd6);
524 let mut string_array_writer = stream.string_array_writer(cd7);
525
526 for i in 0..5000 {
527 let start_time = UNIX_EPOCH.elapsed().unwrap();
528 let value = i % 50;
529 double_writer.push(start_time, value as f64);
530 string_writer.push(start_time, format!("{}", value));
531 integer_writer.push(start_time, value);
532 uint64_writer.push(start_time, value as u64);
533 struct_writer.push(start_time, format!("{{\"v\":{}}}", value));
534 double_array_writer.push(start_time, vec![value as f64, (value + 1) as f64]);
535 string_array_writer.push(start_time, vec![format!("{}", value)]);
536 }
537
538 drop(double_writer);
539 drop(string_writer);
540 drop(integer_writer);
541 drop(uint64_writer);
542 drop(struct_writer);
543 drop(double_array_writer);
544 drop(string_array_writer);
545 drop(stream);
546
547 let requests = test_consumer.requests.lock().unwrap();
548
549 assert_eq!(requests.len(), 35);
550
551 let r = requests
552 .iter()
553 .flat_map(|r| r.series.clone())
554 .map(|s| {
555 (
556 s.channel.unwrap().name,
557 s.points.unwrap().points_type.unwrap(),
558 )
559 })
560 .collect::<HashMap<_, _>>();
561
562 let PointsType::DoublePoints(dp) = r.get("double").unwrap() else {
563 panic!("invalid double points type");
564 };
565
566 let PointsType::IntegerPoints(ip) = r.get("int").unwrap() else {
567 panic!("invalid int points type");
568 };
569
570 let PointsType::Uint64Points(up) = r.get("uint64").unwrap() else {
571 panic!("invalid uint64 points type");
572 };
573
574 let PointsType::StringPoints(sp) = r.get("string").unwrap() else {
575 panic!("invalid string points type");
576 };
577
578 let PointsType::StructPoints(stp) = r.get("struct").unwrap() else {
579 panic!("invalid struct points type");
580 };
581
582 let PointsType::ArrayPoints(ArrayPoints {
583 array_type: Some(ArrayType::DoubleArrayPoints(dap)),
584 }) = r.get("double_array").unwrap()
585 else {
586 panic!("invalid double array points type");
587 };
588
589 let PointsType::ArrayPoints(ArrayPoints {
590 array_type: Some(ArrayType::StringArrayPoints(sap)),
591 }) = r.get("string_array").unwrap()
592 else {
593 panic!("invalid string array points type");
594 };
595
596 assert_eq!(dp.points.len(), 1000);
598 assert_eq!(sp.points.len(), 1000);
599 assert_eq!(ip.points.len(), 1000);
600 assert_eq!(up.points.len(), 1000);
601 assert_eq!(stp.points.len(), 1000);
602 assert_eq!(dap.points.len(), 1000);
603 assert_eq!(sap.points.len(), 1000);
604 }
605}