use std::error::Error;
use std::fmt::Debug;
use std::fmt::Formatter;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::LazyLock;

use apache_avro::types::Record;
use apache_avro::types::Value;
use conjure_object::ResourceIdentifier;
use nominal_api::tonic::google::protobuf::Timestamp;
use nominal_api::tonic::io::nominal::scout::api::proto::array_points::ArrayType;
use nominal_api::tonic::io::nominal::scout::api::proto::points::PointsType;
use nominal_api::tonic::io::nominal::scout::api::proto::ArrayPoints;
use nominal_api::tonic::io::nominal::scout::api::proto::DoublePoints;
use nominal_api::tonic::io::nominal::scout::api::proto::IntegerPoints;
use nominal_api::tonic::io::nominal::scout::api::proto::Points;
use nominal_api::tonic::io::nominal::scout::api::proto::Series;
use nominal_api::tonic::io::nominal::scout::api::proto::StringPoints;
use nominal_api::tonic::io::nominal::scout::api::proto::StructPoints;
use nominal_api::tonic::io::nominal::scout::api::proto::Uint64Points;
use nominal_api::tonic::io::nominal::scout::api::proto::WriteRequestNominal;
use parking_lot::Mutex;
use prost::Message;
use tracing::warn;

use crate::client::NominalApiClients;
use crate::client::{self};
use crate::listener::NominalStreamListener;
use crate::types::AuthProvider;

#[derive(Debug, thiserror::Error)]
pub enum ConsumerError {
    #[error("io error: {0}")]
    IoError(#[from] std::io::Error),
    #[error("avro error: {0}")]
    AvroError(#[from] Box<apache_avro::Error>),
    #[error("No auth token provided. Please make sure you're authenticated.")]
    MissingTokenError,
    #[error("request error: {0}")]
    RequestError(String),
    #[error("consumer error occurred: {0}")]
    GenericConsumerError(#[from] Box<dyn Error + Send + Sync>),
}

pub type ConsumerResult<T> = Result<T, ConsumerError>;

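/// A sink for [`WriteRequestNominal`] batches produced by the streaming pipeline.
///
/// Implementations in this module send requests to the Nominal API
/// ([`NominalCoreConsumer`]), persist them to a local Avro file
/// ([`AvroFileConsumer`]), or compose other consumers
/// ([`DualWriteRequestConsumer`], [`RequestConsumerWithFallback`],
/// [`ListeningWriteRequestConsumer`]).
///
/// A minimal sketch of a custom implementation (the `CountingConsumer` type is
/// hypothetical, not part of this crate):
///
/// ```ignore
/// use std::sync::atomic::{AtomicUsize, Ordering};
///
/// #[derive(Debug, Default)]
/// struct CountingConsumer {
///     seen: AtomicUsize,
/// }
///
/// impl WriteRequestConsumer for CountingConsumer {
///     fn consume(&self, request: &WriteRequestNominal) -> ConsumerResult<()> {
///         // Count series instead of forwarding them anywhere.
///         self.seen.fetch_add(request.series.len(), Ordering::Relaxed);
///         Ok(())
///     }
/// }
/// ```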
pub trait WriteRequestConsumer: Send + Sync + Debug {
    fn consume(&self, request: &WriteRequestNominal) -> ConsumerResult<()>;
}

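/// A [`WriteRequestConsumer`] that forwards each request to the Nominal API.
///
/// Every call to `consume` fetches a token from the [`AuthProvider`],
/// protobuf-encodes the request, and blocks on the async send using the
/// supplied Tokio runtime handle.
///
/// A construction sketch (the `clients` and `auth_provider` values are
/// placeholders for application-specific setup):
///
/// ```ignore
/// let runtime = tokio::runtime::Runtime::new()?;
/// let consumer = NominalCoreConsumer::new(
///     clients,                  // NominalApiClients
///     runtime.handle().clone(), // handle used for the blocking send
///     auth_provider,            // any AuthProvider implementation
///     ResourceIdentifier::new("ri.catalog.main.dataset.abc123")?,
/// );
/// ```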
#[derive(Clone)]
pub struct NominalCoreConsumer<A: AuthProvider> {
    client: NominalApiClients,
    handle: tokio::runtime::Handle,
    auth_provider: A,
    data_source_rid: ResourceIdentifier,
}

impl<A: AuthProvider> NominalCoreConsumer<A> {
    pub fn new(
        client: NominalApiClients,
        handle: tokio::runtime::Handle,
        auth_provider: A,
        data_source_rid: ResourceIdentifier,
    ) -> Self {
        Self {
            client,
            handle,
            auth_provider,
            data_source_rid,
        }
    }
}

impl<T: AuthProvider> Debug for NominalCoreConsumer<T> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("NominalCoreConsumer")
            .field("client", &self.client)
            .field("data_source_rid", &self.data_source_rid)
            .finish()
    }
}

impl<T: AuthProvider + 'static> WriteRequestConsumer for NominalCoreConsumer<T> {
    fn consume(&self, request: &WriteRequestNominal) -> ConsumerResult<()> {
        let token = self
            .auth_provider
            .token()
            .ok_or(ConsumerError::MissingTokenError)?;
        let write_request =
            client::encode_request(request.encode_to_vec(), &token, &self.data_source_rid)?;
        self.handle.block_on(async {
            self.client
                .send(write_request)
                .await
                .map_err(|e| ConsumerError::RequestError(format!("{e:?}")))
        })?;
        Ok(())
    }
}

const DEFAULT_FILE_PREFIX: &str = "nominal_stream";

pub const DATASET_RID_METADATA_KEY: &str = "nominal.dataset_rid";

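/// Avro schema used for locally persisted streams: one record per series, with
/// parallel `timestamps` (Unix nanoseconds as longs) and `values` arrays plus a
/// string-to-string `tags` map. The `values` union covers doubles, strings,
/// longs, double/string arrays, and JSON-encoded structs.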
pub static CORE_SCHEMA_STR: &str = r#"{
    "type": "record",
    "name": "AvroStream",
    "namespace": "io.nominal.ingest",
    "fields": [
        {
            "name": "channel",
            "type": "string",
            "doc": "Channel/series name (e.g., 'vehicle_id', 'col_1', 'temperature')"
        },
        {
            "name": "timestamps",
            "type": {"type": "array", "items": "long"},
            "doc": "Array of Unix timestamps in nanoseconds"
        },
        {
            "name": "values",
            "type": {"type": "array", "items": [
                "double",
                "string",
                "long",
                {"type": "record", "name": "DoubleArray", "fields": [{"name": "items", "type": {"type": "array", "items": "double"}}]},
                {"type": "record", "name": "StringArray", "fields": [{"name": "items", "type": {"type": "array", "items": "string"}}]},
                {"type": "record", "name": "JsonStruct", "fields": [{"name": "json", "type": "string"}]}
            ]},
            "doc": "Array of values. Can be doubles, longs, strings, arrays, or JSON structs"
        },
        {
            "name": "tags",
            "type": {"type": "map", "values": "string"},
            "default": {},
            "doc": "Key-value metadata tags"
        }
    ]
}
"#;

pub static CORE_AVRO_SCHEMA: LazyLock<apache_avro::Schema> = LazyLock::new(|| {
    let json = serde_json::from_str(CORE_SCHEMA_STR).expect("Failed to parse JSON schema");
    apache_avro::Schema::parse(&json).expect("Failed to parse Avro schema")
});

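/// A [`WriteRequestConsumer`] that appends every request to a single
/// Snappy-compressed Avro file using [`CORE_AVRO_SCHEMA`]. The underlying
/// writer is flushed when the consumer is dropped.
///
/// A usage sketch (the directory and prefix are illustrative):
///
/// ```ignore
/// let consumer = AvroFileConsumer::new("/tmp/streams", Some("my_stream".to_string()), None)?;
/// consumer.consume(&request)?;
/// drop(consumer); // flushes buffered records to disk
/// ```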
#[derive(Clone)]
pub struct AvroFileConsumer {
    writer: Arc<Mutex<apache_avro::Writer<'static, std::fs::File>>>,
    path: PathBuf,
}

impl Debug for AvroFileConsumer {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("AvroFileConsumer")
            .field("path", &self.path)
            .finish()
    }
}

impl AvroFileConsumer {
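    /// Creates a consumer that writes to `<directory>/<prefix>_<UTC timestamp>.avro`,
    /// overwriting any existing file at that path. The prefix defaults to
    /// `"nominal_stream"` when `file_prefix` is `None`.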
    pub fn new(
        directory: impl Into<PathBuf>,
        file_prefix: Option<String>,
        dataset_rid: Option<ResourceIdentifier>,
    ) -> std::io::Result<Self> {
        let datetime = chrono::Utc::now().format("%Y%m%d_%H%M%S").to_string();
        let prefix = file_prefix.unwrap_or_else(|| DEFAULT_FILE_PREFIX.to_string());
        let filename = format!("{prefix}_{datetime}.avro");
        let directory = directory.into();
        let full_path = directory.join(&filename);

        Self::new_with_full_path(full_path, true, dataset_rid)
    }

    pub fn new_with_full_path(
        file_path: impl Into<PathBuf>,
        overwrite: bool,
        dataset_rid: Option<ResourceIdentifier>,
    ) -> std::io::Result<Self> {
        let path = file_path.into();
        std::fs::create_dir_all(path.parent().unwrap_or(&path))?;
        let mut options = std::fs::OpenOptions::new();
        options.write(true);
        if overwrite {
            options.create(true).truncate(true);
        } else {
            options.create_new(true);
        }
        let file = options.open(&path)?;

        let mut writer = apache_avro::Writer::builder()
            .schema(&CORE_AVRO_SCHEMA)
            .writer(file)
            .codec(apache_avro::Codec::Snappy)
            .build();

        if let Some(rid) = dataset_rid {
            writer
                .add_user_metadata(DATASET_RID_METADATA_KEY.to_string(), rid.to_string())
                .map_err(|e| {
                    std::io::Error::other(format!("failed to write avro metadata: {e}"))
                })?;
        }

        Ok(Self {
            writer: Arc::new(Mutex::new(writer)),
            path,
        })
    }

    fn append_series(&self, series: &[Series]) -> ConsumerResult<()> {
        let mut records: Vec<Record> = Vec::new();
        for series in series {
            let (timestamps, values) = points_to_avro(series.points.as_ref());

            let mut record = Record::new(&CORE_AVRO_SCHEMA).expect("Failed to create Avro record");

            record.put(
                "channel",
                series
                    .channel
                    .as_ref()
                    .map(|c| c.name.clone())
                    .unwrap_or_else(|| "values".to_string()),
            );
            record.put("timestamps", Value::Array(timestamps));
            record.put("values", Value::Array(values));
            record.put("tags", series.tags.clone());

            records.push(record);
        }

        self.writer
            .lock()
            .extend(records)
            .map_err(|e| ConsumerError::AvroError(Box::new(e)))?;

        Ok(())
    }
}

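/// Converts a protobuf point set into parallel `(timestamps, values)` Avro arrays.
/// Union indices follow the order of the `values` union in `CORE_SCHEMA_STR`:
/// 0 = double, 1 = string, 2 = long, 3 = DoubleArray, 4 = StringArray, 5 = JsonStruct.
/// Uint64 points are written as longs (union index 2), so values above `i64::MAX` wrap.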
fn points_to_avro(points: Option<&Points>) -> (Vec<Value>, Vec<Value>) {
    let Some(Points {
        points_type: Some(points),
    }) = points
    else {
        return (Vec::new(), Vec::new());
    };

    match points {
        PointsType::DoublePoints(DoublePoints { points }) => points
            .iter()
            .map(|point| {
                (
                    convert_timestamp_to_nanoseconds(point.timestamp.unwrap()),
                    Value::Union(0, Box::new(Value::Double(point.value))),
                )
            })
            .collect(),
        PointsType::StringPoints(StringPoints { points }) => points
            .iter()
            .map(|point| {
                (
                    convert_timestamp_to_nanoseconds(point.timestamp.unwrap()),
                    Value::Union(1, Box::new(Value::String(point.value.clone()))),
                )
            })
            .collect(),
        PointsType::IntegerPoints(IntegerPoints { points }) => points
            .iter()
            .map(|point| {
                (
                    convert_timestamp_to_nanoseconds(point.timestamp.unwrap()),
                    Value::Union(2, Box::new(Value::Long(point.value))),
                )
            })
            .collect(),
        PointsType::ArrayPoints(ArrayPoints { array_type }) => match array_type {
            Some(ArrayType::DoubleArrayPoints(points)) => points
                .points
                .iter()
                .map(|point| {
                    let array_values: Vec<Value> =
                        point.value.iter().map(|v| Value::Double(*v)).collect();
                    let record =
                        Value::Record(vec![("items".to_string(), Value::Array(array_values))]);
                    (
                        convert_timestamp_to_nanoseconds(point.timestamp.unwrap()),
                        Value::Union(3, Box::new(record)),
                    )
                })
                .collect(),
            Some(ArrayType::StringArrayPoints(points)) => points
                .points
                .iter()
                .map(|point| {
                    let array_values: Vec<Value> = point
                        .value
                        .iter()
                        .map(|v| Value::String(v.clone()))
                        .collect();
                    let record =
                        Value::Record(vec![("items".to_string(), Value::Array(array_values))]);
                    (
                        convert_timestamp_to_nanoseconds(point.timestamp.unwrap()),
                        Value::Union(4, Box::new(record)),
                    )
                })
                .collect(),
            None => (Vec::new(), Vec::new()),
        },
        PointsType::StructPoints(StructPoints { points }) => points
            .iter()
            .map(|point| {
                let record = Value::Record(vec![(
                    "json".to_string(),
                    Value::String(point.json_string.clone()),
                )]);
                (
                    convert_timestamp_to_nanoseconds(point.timestamp.unwrap()),
                    Value::Union(5, Box::new(record)),
                )
            })
            .collect(),
        PointsType::Uint64Points(Uint64Points { points }) => points
            .iter()
            .map(|point| {
                (
                    convert_timestamp_to_nanoseconds(point.timestamp.unwrap()),
                    Value::Union(2, Box::new(Value::Long(point.value as i64))),
                )
            })
            .collect(),
    }
}

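/// Flattens a protobuf `Timestamp` into a single Avro long of Unix nanoseconds;
/// for example, `Timestamp { seconds: 1, nanos: 500 }` becomes `Value::Long(1_000_000_500)`.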
fn convert_timestamp_to_nanoseconds(timestamp: Timestamp) -> Value {
    Value::Long(timestamp.seconds * 1_000_000_000 + timestamp.nanos as i64)
}

impl WriteRequestConsumer for AvroFileConsumer {
    fn consume(&self, request: &WriteRequestNominal) -> ConsumerResult<()> {
        self.append_series(&request.series)?;
        Ok(())
    }
}

impl Drop for AvroFileConsumer {
    fn drop(&mut self) {
        if let Err(e) = self.writer.lock().flush() {
            warn!(
                "failed to flush avro writer for {:?} on drop: {e:?}",
                self.path
            );
        }
    }
}

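/// Routes each request to `primary` and only invokes `fallback` when the primary
/// consumer fails. If the primary failure was [`ConsumerError::MissingTokenError`],
/// that error is returned even when the fallback succeeds; otherwise the fallback's
/// result is returned.
///
/// A composition sketch (the `core_consumer` value is a placeholder for a consumer
/// built elsewhere):
///
/// ```ignore
/// let consumer = RequestConsumerWithFallback::new(
///     core_consumer,
///     AvroFileConsumer::new("/tmp/streams", None, None)?,
/// );
/// consumer.consume(&request)?;
/// ```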
#[derive(Clone)]
pub struct RequestConsumerWithFallback<P, F>
where
    P: WriteRequestConsumer,
    F: WriteRequestConsumer,
{
    primary: P,
    fallback: F,
}

impl<P, F> RequestConsumerWithFallback<P, F>
where
    P: WriteRequestConsumer,
    F: WriteRequestConsumer,
{
    pub fn new(primary: P, fallback: F) -> Self {
        Self { primary, fallback }
    }
}

impl<P, F> Debug for RequestConsumerWithFallback<P, F>
where
    F: Send + Sync + WriteRequestConsumer,
    P: Send + Sync + WriteRequestConsumer,
{
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("RequestConsumerWithFallback")
            .field("primary", &self.primary)
            .field("fallback", &self.fallback)
            .finish()
    }
}

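/// Sends every request to both consumers, logging whichever side fails. The result
/// is `Ok` only when both succeed; when both fail, the primary error is returned.
///
/// A sketch of the intended pairing (both consumer values are placeholders):
///
/// ```ignore
/// // Stream to the API and mirror every request to a local Avro file.
/// let consumer = DualWriteRequestConsumer::new(core_consumer, avro_consumer);
/// ```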
#[derive(Debug, Clone)]
pub struct DualWriteRequestConsumer<P, S>
where
    P: WriteRequestConsumer,
    S: WriteRequestConsumer,
{
    primary: P,
    secondary: S,
}

impl<P, S> DualWriteRequestConsumer<P, S>
where
    P: WriteRequestConsumer,
    S: WriteRequestConsumer,
{
    pub fn new(primary: P, secondary: S) -> Self {
        Self { primary, secondary }
    }
}

impl<P, S> WriteRequestConsumer for DualWriteRequestConsumer<P, S>
where
    P: WriteRequestConsumer + Send + Sync,
    S: WriteRequestConsumer + Send + Sync,
{
    fn consume(&self, request: &WriteRequestNominal) -> ConsumerResult<()> {
        let primary_result = self.primary.consume(request);
        let secondary_result = self.secondary.consume(request);
        if let Err(e) = &primary_result {
            warn!("Sending request to primary consumer failed: {:?}", e);
        }
        if let Err(e) = &secondary_result {
            warn!("Sending request to secondary consumer failed: {:?}", e);
        }

        primary_result.and(secondary_result)
    }
}

impl<P, F> WriteRequestConsumer for RequestConsumerWithFallback<P, F>
where
    P: WriteRequestConsumer + Send + Sync,
    F: WriteRequestConsumer + Send + Sync,
{
    fn consume(&self, request: &WriteRequestNominal) -> ConsumerResult<()> {
        if let Err(e) = self.primary.consume(request) {
            warn!("Sending request to primary consumer failed: {e:?}. Attempting fallback.");
            let fallback_result = self.fallback.consume(request);
            if let ConsumerError::MissingTokenError = e {
                return Err(ConsumerError::MissingTokenError);
            }
            return fallback_result;
        }
        Ok(())
    }
}

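/// Wraps another consumer and notifies every registered [`NominalStreamListener`]
/// after each attempt: `on_success` when the inner consumer accepts the request,
/// `on_error` (with the error) when it fails. The inner result is returned
/// unchanged either way.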
#[derive(Debug, Clone)]
pub struct ListeningWriteRequestConsumer<C>
where
    C: WriteRequestConsumer,
{
    consumer: C,
    listeners: Vec<Arc<dyn NominalStreamListener>>,
}

impl<C> ListeningWriteRequestConsumer<C>
where
    C: WriteRequestConsumer,
{
    pub fn new(consumer: C, listeners: Vec<Arc<dyn NominalStreamListener>>) -> Self {
        Self {
            consumer,
            listeners,
        }
    }
}

impl<C> WriteRequestConsumer for ListeningWriteRequestConsumer<C>
where
    C: WriteRequestConsumer + Send + Sync,
{
    fn consume(&self, request: &WriteRequestNominal) -> ConsumerResult<()> {
        match self.consumer.consume(request) {
            Ok(_) => {
                self.listeners.on_success(request);
                Ok(())
            }
            Err(e) => {
                self.listeners.on_error(&e, request);
                Err(e)
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use apache_avro::Reader;
    use nominal_api::tonic::google::protobuf::Timestamp;
    use nominal_api::tonic::io::nominal::scout::api::proto::array_points::ArrayType;
    use nominal_api::tonic::io::nominal::scout::api::proto::Channel;
    use nominal_api::tonic::io::nominal::scout::api::proto::DoubleArrayPoint;
    use nominal_api::tonic::io::nominal::scout::api::proto::StringArrayPoint;
    use tempfile::NamedTempFile;

    use super::*;

    fn make_timestamp(secs: i64, nanos: i32) -> Option<Timestamp> {
        Some(Timestamp {
            seconds: secs,
            nanos,
        })
    }

    fn make_series(name: &str, points: Points) -> Series {
        Series {
            channel: Some(Channel {
                name: name.to_string(),
            }),
            tags: HashMap::new(),
            points: Some(points),
        }
    }

    #[test]
    fn test_avro_file_with_all_value_types() {
        let tmp_file = NamedTempFile::new().unwrap();
        let path: PathBuf = tmp_file.path().to_path_buf();

        {
            let consumer = AvroFileConsumer::new_with_full_path(&path, true, None).unwrap();

            let double_series = make_series(
                "doubles",
                Points {
                    points_type: Some(PointsType::DoublePoints(DoublePoints {
                        points: vec![
                            nominal_api::tonic::io::nominal::scout::api::proto::DoublePoint {
                                timestamp: make_timestamp(1000, 0),
                                value: 1.5,
                            },
                            nominal_api::tonic::io::nominal::scout::api::proto::DoublePoint {
                                timestamp: make_timestamp(1001, 0),
                                value: 2.5,
                            },
                        ],
                    })),
                },
            );

            let long_series = make_series(
                "longs",
                Points {
                    points_type: Some(PointsType::IntegerPoints(IntegerPoints {
                        points: vec![
                            nominal_api::tonic::io::nominal::scout::api::proto::IntegerPoint {
                                timestamp: make_timestamp(1000, 0),
                                value: 42,
                            },
                            nominal_api::tonic::io::nominal::scout::api::proto::IntegerPoint {
                                timestamp: make_timestamp(1001, 0),
                                value: -100,
                            },
                        ],
                    })),
                },
            );

            let string_series = make_series(
                "strings",
                Points {
                    points_type: Some(PointsType::StringPoints(StringPoints {
                        points: vec![
                            nominal_api::tonic::io::nominal::scout::api::proto::StringPoint {
                                timestamp: make_timestamp(1000, 0),
                                value: "hello".to_string(),
                            },
                            nominal_api::tonic::io::nominal::scout::api::proto::StringPoint {
                                timestamp: make_timestamp(1001, 0),
                                value: "world".to_string(),
                            },
                        ],
                    })),
                },
            );

            let double_array_series = make_series(
                "double_arrays",
                Points {
                    points_type: Some(PointsType::ArrayPoints(ArrayPoints {
                        array_type: Some(ArrayType::DoubleArrayPoints(
                            nominal_api::tonic::io::nominal::scout::api::proto::DoubleArrayPoints {
                                points: vec![
                                    DoubleArrayPoint {
                                        timestamp: make_timestamp(1000, 0),
                                        value: vec![1.0, 2.0, 3.0],
                                    },
                                    DoubleArrayPoint {
                                        timestamp: make_timestamp(1001, 0),
                                        value: vec![4.0, 5.0],
                                    },
                                ],
                            },
                        )),
                    })),
                },
            );

            let string_array_series = make_series(
                "string_arrays",
                Points {
                    points_type: Some(PointsType::ArrayPoints(ArrayPoints {
                        array_type: Some(ArrayType::StringArrayPoints(
                            nominal_api::tonic::io::nominal::scout::api::proto::StringArrayPoints {
                                points: vec![
                                    StringArrayPoint {
                                        timestamp: make_timestamp(1000, 0),
                                        value: vec!["a".to_string(), "b".to_string()],
                                    },
                                    StringArrayPoint {
                                        timestamp: make_timestamp(1001, 0),
                                        value: vec![
                                            "c".to_string(),
                                            "d".to_string(),
                                            "e".to_string(),
                                        ],
                                    },
                                ],
                            },
                        )),
                    })),
                },
            );

            let struct_series = make_series(
                "structs",
                Points {
                    points_type: Some(PointsType::StructPoints(StructPoints {
                        points: vec![
                            nominal_api::tonic::io::nominal::scout::api::proto::StructPoint {
                                timestamp: make_timestamp(1000, 0),
                                json_string: r#"{"key": "value"}"#.to_string(),
                            },
                            nominal_api::tonic::io::nominal::scout::api::proto::StructPoint {
                                timestamp: make_timestamp(1001, 0),
                                json_string: r#"{"count": 42}"#.to_string(),
                            },
                        ],
                    })),
                },
            );

            let uint64_series = make_series(
                "uint64s",
                Points {
                    points_type: Some(PointsType::Uint64Points(Uint64Points {
                        points: vec![
                            nominal_api::tonic::io::nominal::scout::api::proto::Uint64Point {
                                timestamp: make_timestamp(1000, 0),
                                value: u64::MAX,
                            },
                            nominal_api::tonic::io::nominal::scout::api::proto::Uint64Point {
                                timestamp: make_timestamp(1001, 0),
                                value: 12345678901234567890,
                            },
                        ],
                    })),
                },
            );

            let request = WriteRequestNominal {
                series: vec![
                    double_series,
                    long_series,
                    string_series,
                    double_array_series,
                    string_array_series,
                    struct_series,
                    uint64_series,
                ],
                session_name: None,
            };

            consumer.consume(&request).unwrap();

            drop(consumer);
        }

        let file = std::fs::File::open(&path).unwrap();
        let reader = Reader::new(file).unwrap();

        let records: Vec<_> = reader.map(|r| r.unwrap()).collect();
        assert_eq!(records.len(), 7, "Expected 7 series records");

        let channels: Vec<String> = records
            .iter()
            .filter_map(|r| {
                if let Value::Record(fields) = r {
                    fields.iter().find_map(|(name, value)| {
                        if name == "channel" {
                            if let Value::String(s) = value {
                                Some(s.clone())
                            } else {
                                None
                            }
                        } else {
                            None
                        }
                    })
                } else {
                    None
                }
            })
            .collect();

        assert!(channels.contains(&"doubles".to_string()));
        assert!(channels.contains(&"longs".to_string()));
        assert!(channels.contains(&"strings".to_string()));
        assert!(channels.contains(&"double_arrays".to_string()));
        assert!(channels.contains(&"string_arrays".to_string()));
        assert!(channels.contains(&"structs".to_string()));
        assert!(channels.contains(&"uint64s".to_string()));

        for record in &records {
            if let Value::Record(fields) = record {
                let channel = fields.iter().find_map(|(name, value)| {
                    if name == "channel" {
                        if let Value::String(s) = value {
                            Some(s.clone())
                        } else {
                            None
                        }
                    } else {
                        None
                    }
                });

                let values = fields.iter().find_map(|(name, value)| {
                    if name == "values" {
                        Some(value)
                    } else {
                        None
                    }
                });

                if let (Some(channel), Some(Value::Array(values))) = (channel, values) {
                    assert_eq!(values.len(), 2, "Channel {} should have 2 values", channel);

                    match channel.as_str() {
                        "doubles" => {
                            assert_eq!(values[0], Value::Union(0, Box::new(Value::Double(1.5))));
                            assert_eq!(values[1], Value::Union(0, Box::new(Value::Double(2.5))));
                        }
                        "strings" => {
                            assert_eq!(
                                values[0],
                                Value::Union(1, Box::new(Value::String("hello".to_string())))
                            );
                            assert_eq!(
                                values[1],
                                Value::Union(1, Box::new(Value::String("world".to_string())))
                            );
                        }
                        "longs" => {
                            assert_eq!(values[0], Value::Union(2, Box::new(Value::Long(42))));
                            assert_eq!(values[1], Value::Union(2, Box::new(Value::Long(-100))));
                        }
                        "double_arrays" => {
                            assert_eq!(
                                values[0],
                                Value::Union(
                                    3,
                                    Box::new(Value::Record(vec![(
                                        "items".to_string(),
                                        Value::Array(vec![
                                            Value::Double(1.0),
                                            Value::Double(2.0),
                                            Value::Double(3.0)
                                        ])
                                    )]))
                                )
                            );
                            assert_eq!(
                                values[1],
                                Value::Union(
                                    3,
                                    Box::new(Value::Record(vec![(
                                        "items".to_string(),
                                        Value::Array(vec![Value::Double(4.0), Value::Double(5.0)])
                                    )]))
                                )
                            );
                        }
                        "string_arrays" => {
                            assert_eq!(
                                values[0],
                                Value::Union(
                                    4,
                                    Box::new(Value::Record(vec![(
                                        "items".to_string(),
                                        Value::Array(vec![
                                            Value::String("a".to_string()),
                                            Value::String("b".to_string())
                                        ])
                                    )]))
                                )
                            );
                            assert_eq!(
                                values[1],
                                Value::Union(
                                    4,
                                    Box::new(Value::Record(vec![(
                                        "items".to_string(),
                                        Value::Array(vec![
                                            Value::String("c".to_string()),
                                            Value::String("d".to_string()),
                                            Value::String("e".to_string())
                                        ])
                                    )]))
                                )
                            );
                        }
                        "structs" => {
                            assert_eq!(
                                values[0],
                                Value::Union(
                                    5,
                                    Box::new(Value::Record(vec![(
                                        "json".to_string(),
                                        Value::String(r#"{"key": "value"}"#.to_string())
                                    )]))
                                )
                            );
                            assert_eq!(
                                values[1],
                                Value::Union(
                                    5,
                                    Box::new(Value::Record(vec![(
                                        "json".to_string(),
                                        Value::String(r#"{"count": 42}"#.to_string())
                                    )]))
                                )
                            );
                        }
                        "uint64s" => {
                            assert_eq!(
                                values[0],
                                Value::Union(2, Box::new(Value::Long(u64::MAX as i64)))
                            );
                            assert_eq!(
                                values[1],
                                Value::Union(
                                    2,
                                    Box::new(Value::Long(12345678901234567890u64 as i64))
                                )
                            );
                        }
                        _ => panic!("Unexpected channel: {}", channel),
                    }
                }
            }
        }
    }

    #[test]
    fn reopening_path_with_overwrite_truncates_to_valid_avro_file() {
        let tmp_file = NamedTempFile::new().unwrap();
        let path: PathBuf = tmp_file.path().to_path_buf();

        write_integer_points(&path, 500);
        assert_eq!(read_integer_point_count(&path), 500);
        let first_size = std::fs::metadata(&path).unwrap().len();

        write_integer_points(&path, 5);
        assert_eq!(read_integer_point_count(&path), 5);
        let second_size = std::fs::metadata(&path).unwrap().len();

        assert!(
            second_size < first_size,
            "second write should shrink the file (first: {first_size} bytes, second: {second_size} bytes)"
        );
    }

    #[test]
    fn dropping_consumer_flushes_buffered_records() {
        let tmp_file = NamedTempFile::new().unwrap();
        let path: PathBuf = tmp_file.path().to_path_buf();

        {
            let consumer = AvroFileConsumer::new_with_full_path(&path, true, None).unwrap();

            let mut record = Record::new(&CORE_AVRO_SCHEMA).expect("Failed to create Avro record");
            record.put("channel", "ch".to_string());
            record.put("timestamps", Value::Array(vec![Value::Long(0)]));
            record.put(
                "values",
                Value::Array(vec![Value::Union(2, Box::new(Value::Long(42)))]),
            );
            record.put("tags", HashMap::<String, String>::new());

            consumer.writer.lock().append(record).unwrap();
        }

        assert_eq!(
            read_integer_point_count(&path),
            1,
            "expected the buffered point to land on disk after the consumer dropped"
        );
    }

    #[test]
    fn new_with_full_path_errors_when_overwrite_false_and_path_exists() {
        let tmp_file = NamedTempFile::new().unwrap();
        let path: PathBuf = tmp_file.path().to_path_buf();
        std::fs::write(&path, b"prior content").unwrap();

        let err = AvroFileConsumer::new_with_full_path(&path, false, None)
            .expect_err("expected AlreadyExists when overwrite=false and file exists");
        assert_eq!(err.kind(), std::io::ErrorKind::AlreadyExists);

        assert_eq!(std::fs::read(&path).unwrap(), b"prior content");
    }

    #[test]
    fn new_with_full_path_succeeds_when_overwrite_false_and_path_missing() {
        let tmp_dir = tempfile::tempdir().unwrap();
        let path = tmp_dir.path().join("fresh.avro");

        write_integer_points_with(&path, 3, false, None);
        assert_eq!(read_integer_point_count(&path), 3);
    }

    #[test]
    fn writes_dataset_rid_to_avro_user_metadata() {
        let tmp_file = NamedTempFile::new().unwrap();
        let path: PathBuf = tmp_file.path().to_path_buf();
        let rid = ResourceIdentifier::new("ri.catalog.main.dataset.abc123").unwrap();

        write_integer_points_with(&path, 1, true, Some(rid.clone()));

        let stored = read_dataset_rid_metadata(&path).expect("dataset_rid metadata missing");
        assert_eq!(stored, rid.to_string());
    }

    #[test]
    fn omits_dataset_rid_metadata_when_none() {
        let tmp_file = NamedTempFile::new().unwrap();
        let path: PathBuf = tmp_file.path().to_path_buf();

        write_integer_points_with(&path, 1, true, None);

        assert!(read_dataset_rid_metadata(&path).is_none());
    }

    fn write_integer_points(path: &PathBuf, count: i64) {
        write_integer_points_with(path, count, true, None);
    }

    fn write_integer_points_with(
        path: &PathBuf,
        count: i64,
        overwrite: bool,
        dataset_rid: Option<ResourceIdentifier>,
    ) {
        let points = (0..count)
            .map(
                |i| nominal_api::tonic::io::nominal::scout::api::proto::IntegerPoint {
                    timestamp: make_timestamp(i, 0),
                    value: i,
                },
            )
            .collect();
        let consumer = AvroFileConsumer::new_with_full_path(path, overwrite, dataset_rid).unwrap();
        consumer
            .append_series(&[make_series(
                "ch",
                Points {
                    points_type: Some(PointsType::IntegerPoints(IntegerPoints { points })),
                },
            )])
            .unwrap();
    }

    fn read_integer_point_count(path: &PathBuf) -> usize {
        let reader = Reader::new(std::fs::File::open(path).unwrap()).unwrap();
        let mut total = 0;
        for record in reader {
            let Value::Record(fields) = record.unwrap() else {
                panic!("expected Record");
            };
            let timestamps = fields
                .iter()
                .find(|(name, _)| name == "timestamps")
                .map(|(_, v)| v)
                .unwrap();
            if let Value::Array(arr) = timestamps {
                total += arr.len();
            }
        }
        total
    }

    fn read_dataset_rid_metadata(path: &PathBuf) -> Option<String> {
        let file = std::fs::File::open(path).unwrap();
        let reader = Reader::new(file).unwrap();
        reader
            .user_metadata()
            .get(DATASET_RID_METADATA_KEY)
            .map(|bytes| String::from_utf8(bytes.clone()).unwrap())
    }
}