// nominal_streaming/consumer.rs

use std::error::Error;
use std::fmt::Debug;
use std::fmt::Formatter;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::LazyLock;

use apache_avro::types::Record;
use apache_avro::types::Value;
use conjure_object::ResourceIdentifier;
use nominal_api::tonic::google::protobuf::Timestamp;
use nominal_api::tonic::io::nominal::scout::api::proto::array_points::ArrayType;
use nominal_api::tonic::io::nominal::scout::api::proto::points::PointsType;
use nominal_api::tonic::io::nominal::scout::api::proto::ArrayPoints;
use nominal_api::tonic::io::nominal::scout::api::proto::DoublePoints;
use nominal_api::tonic::io::nominal::scout::api::proto::IntegerPoints;
use nominal_api::tonic::io::nominal::scout::api::proto::Points;
use nominal_api::tonic::io::nominal::scout::api::proto::Series;
use nominal_api::tonic::io::nominal::scout::api::proto::StringPoints;
use nominal_api::tonic::io::nominal::scout::api::proto::StructPoints;
use nominal_api::tonic::io::nominal::scout::api::proto::Uint64Points;
use nominal_api::tonic::io::nominal::scout::api::proto::WriteRequestNominal;
use parking_lot::Mutex;
use prost::Message;
use tracing::warn;

use crate::client::NominalApiClients;
use crate::client::{self};
use crate::listener::NominalStreamListener;
use crate::types::AuthProvider;

#[derive(Debug, thiserror::Error)]
pub enum ConsumerError {
    #[error("io error: {0}")]
    IoError(#[from] std::io::Error),
    #[error("avro error: {0}")]
    AvroError(#[from] Box<apache_avro::Error>),
    #[error("No auth token provided. Please make sure you're authenticated.")]
    MissingTokenError,
    #[error("request error: {0}")]
    RequestError(String),
    #[error("consumer error occurred: {0}")]
    GenericConsumerError(#[from] Box<dyn Error + Send + Sync>),
}

pub type ConsumerResult<T> = Result<T, ConsumerError>;

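/// A sink for fully-formed [`WriteRequestNominal`] batches.
///
/// A minimal sketch of a custom implementation (illustrative only, not part
/// of this crate's API):
///
/// ```ignore
/// #[derive(Debug)]
/// struct LoggingConsumer;
///
/// impl WriteRequestConsumer for LoggingConsumer {
///     fn consume(&self, request: &WriteRequestNominal) -> ConsumerResult<()> {
///         tracing::info!("received {} series", request.series.len());
///         Ok(())
///     }
/// }
/// ```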
pub trait WriteRequestConsumer: Send + Sync + Debug {
    fn consume(&self, request: &WriteRequestNominal) -> ConsumerResult<()>;
}
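/// Sends write requests to the Nominal API, blocking the calling thread on
/// the provided tokio runtime handle until each request completes.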
#[derive(Clone)]
pub struct NominalCoreConsumer<A: AuthProvider> {
    client: NominalApiClients,
    handle: tokio::runtime::Handle,
    auth_provider: A,
    data_source_rid: ResourceIdentifier,
}

impl<A: AuthProvider> NominalCoreConsumer<A> {
    pub fn new(
        client: NominalApiClients,
        handle: tokio::runtime::Handle,
        auth_provider: A,
        data_source_rid: ResourceIdentifier,
    ) -> Self {
        Self {
            client,
            handle,
            auth_provider,
            data_source_rid,
        }
    }
}

impl<T: AuthProvider> Debug for NominalCoreConsumer<T> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("NominalCoreConsumer")
            .field("client", &self.client)
            .field("data_source_rid", &self.data_source_rid)
            .finish()
    }
}

impl<T: AuthProvider + 'static> WriteRequestConsumer for NominalCoreConsumer<T> {
    fn consume(&self, request: &WriteRequestNominal) -> ConsumerResult<()> {
        let token = self
            .auth_provider
            .token()
            .ok_or(ConsumerError::MissingTokenError)?;
        let write_request =
            client::encode_request(request.encode_to_vec(), &token, &self.data_source_rid)?;
        self.handle.block_on(async {
            self.client
                .send(write_request)
                .await
                .map_err(|e| ConsumerError::RequestError(format!("{e:?}")))
        })?;
        Ok(())
    }
}

const DEFAULT_FILE_PREFIX: &str = "nominal_stream";

pub const DATASET_RID_METADATA_KEY: &str = "nominal.dataset_rid";

pub static CORE_SCHEMA_STR: &str = r#"{
  "type": "record",
  "name": "AvroStream",
  "namespace": "io.nominal.ingest",
  "fields": [
      {
          "name": "channel",
          "type": "string",
          "doc": "Channel/series name (e.g., 'vehicle_id', 'col_1', 'temperature')"
      },
      {
          "name": "timestamps",
          "type": {"type": "array", "items": "long"},
          "doc": "Array of Unix timestamps in nanoseconds"
      },
      {
          "name": "values",
          "type": {"type": "array", "items": [
              "double",
              "string",
              "long",
              {"type": "record", "name": "DoubleArray", "fields": [{"name": "items", "type": {"type": "array", "items": "double"}}]},
              {"type": "record", "name": "StringArray", "fields": [{"name": "items", "type": {"type": "array", "items": "string"}}]},
              {"type": "record", "name": "JsonStruct", "fields": [{"name": "json", "type": "string"}]}
          ]},
          "doc": "Array of values. Can be doubles, longs, strings, arrays, or JSON structs"
      },
      {
          "name": "tags",
          "type": {"type": "map", "values": "string"},
          "default": {},
          "doc": "Key-value metadata tags"
      }
  ]
}
"#;

pub static CORE_AVRO_SCHEMA: LazyLock<apache_avro::Schema> = LazyLock::new(|| {
    let json = serde_json::from_str(CORE_SCHEMA_STR).expect("Failed to parse JSON schema");
    apache_avro::Schema::parse(&json).expect("Failed to parse Avro schema")
});

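/// Writes incoming series to a local avro container file using
/// [`CORE_AVRO_SCHEMA`]. The writer is shared behind a mutex, so clones of
/// this consumer append to the same file.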
#[derive(Clone)]
pub struct AvroFileConsumer {
    writer: Arc<Mutex<apache_avro::Writer<'static, std::fs::File>>>,
    path: PathBuf,
}

impl Debug for AvroFileConsumer {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("AvroFileConsumer")
            .field("path", &self.path)
            .finish()
    }
}

impl AvroFileConsumer {
    pub fn new(
        directory: impl Into<PathBuf>,
        file_prefix: Option<String>,
        dataset_rid: Option<ResourceIdentifier>,
    ) -> std::io::Result<Self> {
        let datetime = chrono::Utc::now().format("%Y%m%d_%H%M%S").to_string();
        let prefix = file_prefix.unwrap_or_else(|| DEFAULT_FILE_PREFIX.to_string());
        let filename = format!("{prefix}_{datetime}.avro");
        let directory = directory.into();
        let full_path = directory.join(&filename);

        Self::new_with_full_path(full_path, true, dataset_rid)
    }

    /// Opens `file_path` for writing and wraps it in an avro `Writer`.
    ///
    /// If `overwrite` is true and the path already exists, its prior contents
    /// are discarded. Truncation is required when reusing a path: the avro
    /// container format is single-header-and-blocks, so opening a longer
    /// existing file without truncating would leave leftover bytes from the
    /// previous run past the new content's end and produce a corrupt reader
    /// stream.
    ///
    /// If `overwrite` is false and the path already exists, an
    /// `io::ErrorKind::AlreadyExists` error is returned and no file is
    /// touched. This is the safe choice when the caller does not want to
    /// silently destroy prior data.
    ///
    /// If `dataset_rid` is provided, it is written to the avro file's user
    /// metadata under the `nominal.dataset_rid` key so downstream readers
    /// can identify the dataset the file belongs to.
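    ///
    /// # Example
    ///
    /// An illustrative sketch (the path is a placeholder):
    ///
    /// ```ignore
    /// let consumer = AvroFileConsumer::new_with_full_path("/tmp/stream.avro", true, None)?;
    /// consumer.consume(&request)?; // appends one avro record per series
    /// ```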
    pub fn new_with_full_path(
        file_path: impl Into<PathBuf>,
        overwrite: bool,
        dataset_rid: Option<ResourceIdentifier>,
    ) -> std::io::Result<Self> {
        let path = file_path.into();
        std::fs::create_dir_all(path.parent().unwrap_or(&path))?;
        let mut options = std::fs::OpenOptions::new();
        options.write(true);
        if overwrite {
            options.create(true).truncate(true);
        } else {
            options.create_new(true);
        }
        let file = options.open(&path)?;

        let mut writer = apache_avro::Writer::builder()
            .schema(&CORE_AVRO_SCHEMA)
            .writer(file)
            .codec(apache_avro::Codec::Snappy)
            .build();

        if let Some(rid) = dataset_rid {
            writer
                .add_user_metadata(DATASET_RID_METADATA_KEY.to_string(), rid.to_string())
                .map_err(|e| {
                    std::io::Error::other(format!("failed to write avro metadata: {e}"))
                })?;
        }

        Ok(Self {
            writer: Arc::new(Mutex::new(writer)),
            path,
        })
    }

    fn append_series(&self, series: &[Series]) -> ConsumerResult<()> {
        let mut records: Vec<Record> = Vec::new();
        for series in series {
            let (timestamps, values) = points_to_avro(series.points.as_ref());

            let mut record = Record::new(&CORE_AVRO_SCHEMA).expect("Failed to create Avro record");

            record.put(
                "channel",
                series
                    .channel
                    .as_ref()
                    .map(|c| c.name.clone())
                    .unwrap_or_else(|| "values".to_string()),
            );
            record.put("timestamps", Value::Array(timestamps));
            record.put("values", Value::Array(values));
            record.put("tags", series.tags.clone());

            records.push(record);
        }

        self.writer
            .lock()
            .extend(records)
            .map_err(|e| ConsumerError::AvroError(Box::new(e)))?;

        Ok(())
    }
}

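/// Flattens a `Points` message into parallel `(timestamps, values)` vectors
/// ready for the `timestamps`/`values` avro arrays. Points with a missing
/// `timestamp` panic on the `unwrap` below; the producer is assumed to
/// always populate it.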
fn points_to_avro(points: Option<&Points>) -> (Vec<Value>, Vec<Value>) {
    let Some(Points {
        points_type: Some(points),
    }) = points
    else {
        return (Vec::new(), Vec::new());
    };

    match points {
        PointsType::DoublePoints(DoublePoints { points }) => points
            .iter()
            .map(|point| {
                (
                    convert_timestamp_to_nanoseconds(point.timestamp.unwrap()),
                    Value::Union(0, Box::new(Value::Double(point.value))),
                )
            })
            .collect(),
        PointsType::StringPoints(StringPoints { points }) => points
            .iter()
            .map(|point| {
                (
                    convert_timestamp_to_nanoseconds(point.timestamp.unwrap()),
                    Value::Union(1, Box::new(Value::String(point.value.clone()))),
                )
            })
            .collect(),
        PointsType::IntegerPoints(IntegerPoints { points }) => points
            .iter()
            .map(|point| {
                (
                    convert_timestamp_to_nanoseconds(point.timestamp.unwrap()),
                    Value::Union(2, Box::new(Value::Long(point.value))),
                )
            })
            .collect(),
        PointsType::ArrayPoints(ArrayPoints { array_type }) => match array_type {
            Some(ArrayType::DoubleArrayPoints(points)) => points
                .points
                .iter()
                .map(|point| {
                    let array_values: Vec<Value> =
                        point.value.iter().map(|v| Value::Double(*v)).collect();
                    let record =
                        Value::Record(vec![("items".to_string(), Value::Array(array_values))]);
                    (
                        convert_timestamp_to_nanoseconds(point.timestamp.unwrap()),
                        Value::Union(3, Box::new(record)),
                    )
                })
                .collect(),
            Some(ArrayType::StringArrayPoints(points)) => points
                .points
                .iter()
                .map(|point| {
                    let array_values: Vec<Value> = point
                        .value
                        .iter()
                        .map(|v| Value::String(v.clone()))
                        .collect();
                    let record =
                        Value::Record(vec![("items".to_string(), Value::Array(array_values))]);
                    (
                        convert_timestamp_to_nanoseconds(point.timestamp.unwrap()),
                        Value::Union(4, Box::new(record)),
                    )
                })
                .collect(),
            None => (Vec::new(), Vec::new()),
        },
        PointsType::StructPoints(StructPoints { points }) => points
            .iter()
            .map(|point| {
                let record = Value::Record(vec![(
                    "json".to_string(),
                    Value::String(point.json_string.clone()),
                )]);
                (
                    convert_timestamp_to_nanoseconds(point.timestamp.unwrap()),
                    Value::Union(5, Box::new(record)),
                )
            })
            .collect(),
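        // Stored in the `long` branch (index 2) by reinterpreting the u64
        // bits as i64; values above i64::MAX read back negative unless the
        // reader casts them back to u64.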
        PointsType::Uint64Points(Uint64Points { points }) => points
            .iter()
            .map(|point| {
                (
                    convert_timestamp_to_nanoseconds(point.timestamp.unwrap()),
                    Value::Union(2, Box::new(Value::Long(point.value as i64))),
                )
            })
            .collect(),
    }
}

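/// Collapses a protobuf [`Timestamp`] into a single avro `long` of Unix
/// nanoseconds, e.g. `Timestamp { seconds: 1, nanos: 500 }` becomes
/// `1_000_000_500`.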
fn convert_timestamp_to_nanoseconds(timestamp: Timestamp) -> Value {
    Value::Long(timestamp.seconds * 1_000_000_000 + timestamp.nanos as i64)
}

impl WriteRequestConsumer for AvroFileConsumer {
    fn consume(&self, request: &WriteRequestNominal) -> ConsumerResult<()> {
        self.append_series(&request.series)?;
        Ok(())
    }
}

impl Drop for AvroFileConsumer {
    /// Defensive flush-on-drop. In normal operation, records reach disk via
    /// `append_series` → `apache_avro::Writer::extend`, which flushes at the
    /// end of every call. But `apache_avro::Writer` itself does not flush on
    /// drop, so any code path that bypasses `extend` (e.g. a direct
    /// `Writer::append`, or a future writer call that forgets to flush) would
    /// silently lose buffered records when the consumer goes out of scope.
    /// This impl makes that failure mode impossible regardless of how the
    /// inner writer is driven.
    fn drop(&mut self) {
        if let Err(e) = self.writer.lock().flush() {
            warn!(
                "failed to flush avro writer for {:?} on drop: {e:?}",
                self.path
            );
        }
    }
}

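/// Routes every request to `primary` and consults `fallback` only when the
/// primary fails. A typical pairing (illustrative only) is a network
/// consumer backed by a local avro file:
///
/// ```ignore
/// let consumer = RequestConsumerWithFallback::new(core_consumer, avro_consumer);
/// ```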
#[derive(Clone)]
pub struct RequestConsumerWithFallback<P, F>
where
    P: WriteRequestConsumer,
    F: WriteRequestConsumer,
{
    primary: P,
    fallback: F,
}

impl<P, F> RequestConsumerWithFallback<P, F>
where
    P: WriteRequestConsumer,
    F: WriteRequestConsumer,
{
    pub fn new(primary: P, fallback: F) -> Self {
        Self { primary, fallback }
    }
}

impl<P, F> Debug for RequestConsumerWithFallback<P, F>
where
    F: Send + Sync + WriteRequestConsumer,
    P: Send + Sync + WriteRequestConsumer,
{
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("RequestConsumerWithFallback")
            .field("primary", &self.primary)
            .field("fallback", &self.fallback)
            .finish()
    }
}

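/// Fans every request out to both `primary` and `secondary`, regardless of
/// whether the other succeeds. Contrast with [`RequestConsumerWithFallback`],
/// which only invokes its fallback after a primary failure.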
#[derive(Debug, Clone)]
pub struct DualWriteRequestConsumer<P, S>
where
    P: WriteRequestConsumer,
    S: WriteRequestConsumer,
{
    primary: P,
    secondary: S,
}

impl<P, S> DualWriteRequestConsumer<P, S>
where
    P: WriteRequestConsumer,
    S: WriteRequestConsumer,
{
    pub fn new(primary: P, secondary: S) -> Self {
        Self { primary, secondary }
    }
}

impl<P, S> WriteRequestConsumer for DualWriteRequestConsumer<P, S>
where
    P: WriteRequestConsumer + Send + Sync,
    S: WriteRequestConsumer + Send + Sync,
{
    fn consume(&self, request: &WriteRequestNominal) -> ConsumerResult<()> {
        let primary_result = self.primary.consume(request);
        let secondary_result = self.secondary.consume(request);
        if let Err(e) = &primary_result {
            warn!("Sending request to primary consumer failed: {:?}", e);
        }
        if let Err(e) = &secondary_result {
            warn!("Sending request to secondary consumer failed: {:?}", e);
        }

        // If either write failed, surface an error; the primary's error
        // takes precedence when both fail.
        primary_result.and(secondary_result)
    }
}

impl<P, F> WriteRequestConsumer for RequestConsumerWithFallback<P, F>
where
    P: WriteRequestConsumer + Send + Sync,
    F: WriteRequestConsumer + Send + Sync,
{
    fn consume(&self, request: &WriteRequestNominal) -> ConsumerResult<()> {
        if let Err(e) = self.primary.consume(request) {
            warn!("Sending request to primary consumer failed: {e:?}. Attempting fallback.");
            let fallback_result = self.fallback.consume(request);
            // we want to notify the caller about the missing token error as it is a user error
            // todo: get rid of this once we figure out why the auth handle blocks in connect
            if let ConsumerError::MissingTokenError = e {
                return Err(ConsumerError::MissingTokenError);
            }
            return fallback_result;
        }
        Ok(())
    }
}

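/// Wraps an inner consumer and notifies the registered
/// [`NominalStreamListener`]s after every `consume` call, on both the
/// success and the error path.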
#[derive(Debug, Clone)]
pub struct ListeningWriteRequestConsumer<C>
where
    C: WriteRequestConsumer,
{
    consumer: C,
    listeners: Vec<Arc<dyn NominalStreamListener>>,
}

impl<C> ListeningWriteRequestConsumer<C>
where
    C: WriteRequestConsumer,
{
    pub fn new(consumer: C, listeners: Vec<Arc<dyn NominalStreamListener>>) -> Self {
        Self {
            consumer,
            listeners,
        }
    }
}

impl<C> WriteRequestConsumer for ListeningWriteRequestConsumer<C>
where
    C: WriteRequestConsumer + Send + Sync,
{
    fn consume(&self, request: &WriteRequestNominal) -> ConsumerResult<()> {
        match self.consumer.consume(request) {
            Ok(_) => {
                self.listeners.on_success(request);
                Ok(())
            }
            Err(e) => {
                self.listeners.on_error(&e, request);
                Err(e)
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use apache_avro::Reader;
    use nominal_api::tonic::google::protobuf::Timestamp;
    use nominal_api::tonic::io::nominal::scout::api::proto::array_points::ArrayType;
    use nominal_api::tonic::io::nominal::scout::api::proto::Channel;
    use nominal_api::tonic::io::nominal::scout::api::proto::DoubleArrayPoint;
    use nominal_api::tonic::io::nominal::scout::api::proto::StringArrayPoint;
    use tempfile::NamedTempFile;

    use super::*;

    fn make_timestamp(secs: i64, nanos: i32) -> Option<Timestamp> {
        Some(Timestamp {
            seconds: secs,
            nanos,
        })
    }

    fn make_series(name: &str, points: Points) -> Series {
        Series {
            channel: Some(Channel {
                name: name.to_string(),
            }),
            tags: HashMap::new(),
            points: Some(points),
        }
    }

    #[test]
    fn test_avro_file_with_all_value_types() {
        let tmp_file = NamedTempFile::new().unwrap();
        let path: PathBuf = tmp_file.path().to_path_buf();

        // Create consumer and write all types
        {
            let consumer = AvroFileConsumer::new_with_full_path(&path, true, None).unwrap();

            // Create series with each type
            let double_series = make_series(
                "doubles",
                Points {
                    points_type: Some(PointsType::DoublePoints(DoublePoints {
                        points: vec![
                            nominal_api::tonic::io::nominal::scout::api::proto::DoublePoint {
                                timestamp: make_timestamp(1000, 0),
                                value: 1.5,
                            },
                            nominal_api::tonic::io::nominal::scout::api::proto::DoublePoint {
                                timestamp: make_timestamp(1001, 0),
                                value: 2.5,
                            },
                        ],
                    })),
                },
            );

            let long_series = make_series(
                "longs",
                Points {
                    points_type: Some(PointsType::IntegerPoints(IntegerPoints {
                        points: vec![
                            nominal_api::tonic::io::nominal::scout::api::proto::IntegerPoint {
                                timestamp: make_timestamp(1000, 0),
                                value: 42,
                            },
                            nominal_api::tonic::io::nominal::scout::api::proto::IntegerPoint {
                                timestamp: make_timestamp(1001, 0),
                                value: -100,
                            },
                        ],
                    })),
                },
            );

            let string_series = make_series(
                "strings",
                Points {
                    points_type: Some(PointsType::StringPoints(StringPoints {
                        points: vec![
                            nominal_api::tonic::io::nominal::scout::api::proto::StringPoint {
                                timestamp: make_timestamp(1000, 0),
                                value: "hello".to_string(),
                            },
                            nominal_api::tonic::io::nominal::scout::api::proto::StringPoint {
                                timestamp: make_timestamp(1001, 0),
                                value: "world".to_string(),
                            },
                        ],
                    })),
                },
            );

            let double_array_series = make_series(
                "double_arrays",
                Points {
                    points_type: Some(PointsType::ArrayPoints(ArrayPoints {
                        array_type: Some(ArrayType::DoubleArrayPoints(
                            nominal_api::tonic::io::nominal::scout::api::proto::DoubleArrayPoints {
                                points: vec![
                                    DoubleArrayPoint {
                                        timestamp: make_timestamp(1000, 0),
                                        value: vec![1.0, 2.0, 3.0],
                                    },
                                    DoubleArrayPoint {
                                        timestamp: make_timestamp(1001, 0),
                                        value: vec![4.0, 5.0],
                                    },
                                ],
                            },
                        )),
                    })),
                },
            );

            let string_array_series = make_series(
                "string_arrays",
                Points {
                    points_type: Some(PointsType::ArrayPoints(ArrayPoints {
                        array_type: Some(ArrayType::StringArrayPoints(
                            nominal_api::tonic::io::nominal::scout::api::proto::StringArrayPoints {
                                points: vec![
                                    StringArrayPoint {
                                        timestamp: make_timestamp(1000, 0),
                                        value: vec!["a".to_string(), "b".to_string()],
                                    },
                                    StringArrayPoint {
                                        timestamp: make_timestamp(1001, 0),
                                        value: vec![
                                            "c".to_string(),
                                            "d".to_string(),
                                            "e".to_string(),
                                        ],
                                    },
                                ],
                            },
                        )),
                    })),
                },
            );

            let struct_series = make_series(
                "structs",
                Points {
                    points_type: Some(PointsType::StructPoints(StructPoints {
                        points: vec![
                            nominal_api::tonic::io::nominal::scout::api::proto::StructPoint {
                                timestamp: make_timestamp(1000, 0),
                                json_string: r#"{"key": "value"}"#.to_string(),
                            },
                            nominal_api::tonic::io::nominal::scout::api::proto::StructPoint {
                                timestamp: make_timestamp(1001, 0),
                                json_string: r#"{"count": 42}"#.to_string(),
                            },
                        ],
                    })),
                },
            );

            let uint64_series = make_series(
                "uint64s",
                Points {
                    points_type: Some(PointsType::Uint64Points(Uint64Points {
                        points: vec![
                            nominal_api::tonic::io::nominal::scout::api::proto::Uint64Point {
                                timestamp: make_timestamp(1000, 0),
                                value: u64::MAX,
                            },
                            nominal_api::tonic::io::nominal::scout::api::proto::Uint64Point {
                                timestamp: make_timestamp(1001, 0),
                                value: 12345678901234567890,
                            },
                        ],
                    })),
                },
            );

            let request = WriteRequestNominal {
                series: vec![
                    double_series,
                    long_series,
                    string_series,
                    double_array_series,
                    string_array_series,
                    struct_series,
                    uint64_series,
                ],
                session_name: None,
            };

            consumer.consume(&request).unwrap();

            // Flush the writer by dropping it
            drop(consumer);
        }

        // Read back the file and verify
        let file = std::fs::File::open(&path).unwrap();
        let reader = Reader::new(file).unwrap();

        let records: Vec<_> = reader.map(|r| r.unwrap()).collect();
        assert_eq!(records.len(), 7, "Expected 7 series records");

        // Verify each record has the expected channel name and value types
        let channels: Vec<String> = records
            .iter()
            .filter_map(|r| {
                if let Value::Record(fields) = r {
                    fields.iter().find_map(|(name, value)| {
                        if name == "channel" {
                            if let Value::String(s) = value {
                                Some(s.clone())
                            } else {
                                None
                            }
                        } else {
                            None
                        }
                    })
                } else {
                    None
                }
            })
            .collect();

        assert!(channels.contains(&"doubles".to_string()));
        assert!(channels.contains(&"longs".to_string()));
        assert!(channels.contains(&"strings".to_string()));
        assert!(channels.contains(&"double_arrays".to_string()));
        assert!(channels.contains(&"string_arrays".to_string()));
        assert!(channels.contains(&"structs".to_string()));
        assert!(channels.contains(&"uint64s".to_string()));

        // Verify specific value types by checking the union discriminants
        for record in &records {
            if let Value::Record(fields) = record {
                let channel = fields.iter().find_map(|(name, value)| {
                    if name == "channel" {
                        if let Value::String(s) = value {
                            Some(s.clone())
                        } else {
                            None
                        }
                    } else {
                        None
                    }
                });

                let values =
                    fields.iter().find_map(
                        |(name, value)| {
                            if name == "values" {
                                Some(value)
                            } else {
                                None
                            }
                        },
                    );

                if let (Some(channel), Some(Value::Array(values))) = (channel, values) {
                    assert_eq!(values.len(), 2, "Channel {} should have 2 values", channel);

                    match channel.as_str() {
                        "doubles" => {
                            assert_eq!(values[0], Value::Union(0, Box::new(Value::Double(1.5))));
                            assert_eq!(values[1], Value::Union(0, Box::new(Value::Double(2.5))));
                        }
                        "strings" => {
                            assert_eq!(
                                values[0],
                                Value::Union(1, Box::new(Value::String("hello".to_string())))
                            );
                            assert_eq!(
                                values[1],
                                Value::Union(1, Box::new(Value::String("world".to_string())))
                            );
                        }
                        "longs" => {
                            assert_eq!(values[0], Value::Union(2, Box::new(Value::Long(42))));
                            assert_eq!(values[1], Value::Union(2, Box::new(Value::Long(-100))));
                        }
                        "double_arrays" => {
                            assert_eq!(
                                values[0],
                                Value::Union(
                                    3,
                                    Box::new(Value::Record(vec![(
                                        "items".to_string(),
                                        Value::Array(vec![
                                            Value::Double(1.0),
                                            Value::Double(2.0),
                                            Value::Double(3.0)
                                        ])
                                    )]))
                                )
                            );
                            assert_eq!(
                                values[1],
                                Value::Union(
                                    3,
                                    Box::new(Value::Record(vec![(
                                        "items".to_string(),
                                        Value::Array(vec![Value::Double(4.0), Value::Double(5.0)])
                                    )]))
                                )
                            );
                        }
                        "string_arrays" => {
                            assert_eq!(
                                values[0],
                                Value::Union(
                                    4,
                                    Box::new(Value::Record(vec![(
                                        "items".to_string(),
                                        Value::Array(vec![
                                            Value::String("a".to_string()),
                                            Value::String("b".to_string())
                                        ])
                                    )]))
                                )
                            );
                            assert_eq!(
                                values[1],
                                Value::Union(
                                    4,
                                    Box::new(Value::Record(vec![(
                                        "items".to_string(),
                                        Value::Array(vec![
                                            Value::String("c".to_string()),
                                            Value::String("d".to_string()),
                                            Value::String("e".to_string())
                                        ])
                                    )]))
                                )
                            );
                        }
                        "structs" => {
                            assert_eq!(
                                values[0],
                                Value::Union(
                                    5,
                                    Box::new(Value::Record(vec![(
                                        "json".to_string(),
                                        Value::String(r#"{"key": "value"}"#.to_string())
                                    )]))
                                )
                            );
                            assert_eq!(
                                values[1],
                                Value::Union(
                                    5,
                                    Box::new(Value::Record(vec![(
                                        "json".to_string(),
                                        Value::String(r#"{"count": 42}"#.to_string())
                                    )]))
                                )
                            );
                        }
                        "uint64s" => {
                            // u64::MAX as i64 is -1, 12345678901234567890u64 as i64 is negative
                            assert_eq!(
                                values[0],
                                Value::Union(2, Box::new(Value::Long(u64::MAX as i64)))
                            );
                            assert_eq!(
                                values[1],
                                Value::Union(
                                    2,
                                    Box::new(Value::Long(12345678901234567890u64 as i64))
                                )
                            );
                        }
                        _ => panic!("Unexpected channel: {}", channel),
                    }
                }
            }
        }
    }

    #[test]
    fn reopening_path_with_overwrite_truncates_to_valid_avro_file() {
        // Write 500 points, then re-open the same path with overwrite=true
        // and write 5 points. Both passes must produce a file that reads back
        // cleanly with the expected point count, AND the second write must
        // shrink the file at the filesystem level — not just produce a
        // readable record count. Without truncate, the second pass would
        // overwrite from offset 0 and leave the tail of the longer first
        // file intact, corrupting the reader stream.
        let tmp_file = NamedTempFile::new().unwrap();
        let path: PathBuf = tmp_file.path().to_path_buf();

        write_integer_points(&path, 500);
        assert_eq!(read_integer_point_count(&path), 500);
        let first_size = std::fs::metadata(&path).unwrap().len();

        write_integer_points(&path, 5);
        assert_eq!(read_integer_point_count(&path), 5);
        let second_size = std::fs::metadata(&path).unwrap().len();

        assert!(
            second_size < first_size,
            "second write should shrink the file (first: {first_size} bytes, second: {second_size} bytes)"
        );
    }

    #[test]
    fn dropping_consumer_flushes_buffered_records() {
        // Defensive test against future misuse of avro api (writing without flushing).
        // Current stream implementation uses .extend(), which flushes internally.
        let tmp_file = NamedTempFile::new().unwrap();
        let path: PathBuf = tmp_file.path().to_path_buf();

        {
            let consumer = AvroFileConsumer::new_with_full_path(&path, true, None).unwrap();

            let mut record = Record::new(&CORE_AVRO_SCHEMA).expect("Failed to create Avro record");
            record.put("channel", "ch".to_string());
            record.put("timestamps", Value::Array(vec![Value::Long(0)]));
            record.put(
                "values",
                Value::Array(vec![Value::Union(2, Box::new(Value::Long(42)))]),
            );
            record.put("tags", HashMap::<String, String>::new());

            consumer.writer.lock().append(record).unwrap();
            // consumer drops here — the only thing that can land the buffered
            // record on disk is a flush from the Drop impl.
        }

        assert_eq!(
            read_integer_point_count(&path),
            1,
            "expected the buffered point to land on disk after the consumer dropped"
        );
    }

    #[test]
    fn new_with_full_path_errors_when_overwrite_false_and_path_exists() {
        // Pre-create a file at the target path; opening with overwrite=false
        // must fail rather than silently destroying the existing data.
        let tmp_file = NamedTempFile::new().unwrap();
        let path: PathBuf = tmp_file.path().to_path_buf();
        std::fs::write(&path, b"prior content").unwrap();

        let err = AvroFileConsumer::new_with_full_path(&path, false, None)
            .expect_err("expected AlreadyExists when overwrite=false and file exists");
        assert_eq!(err.kind(), std::io::ErrorKind::AlreadyExists);

        // Pre-existing bytes must be untouched.
        assert_eq!(std::fs::read(&path).unwrap(), b"prior content");
    }

    #[test]
    fn new_with_full_path_succeeds_when_overwrite_false_and_path_missing() {
        // overwrite=false should still create a brand-new file; the guard is
        // only against clobbering existing content.
        let tmp_dir = tempfile::tempdir().unwrap();
        let path = tmp_dir.path().join("fresh.avro");

        write_integer_points_with(&path, 3, false, None);
        assert_eq!(read_integer_point_count(&path), 3);
    }

    #[test]
    fn writes_dataset_rid_to_avro_user_metadata() {
        let tmp_file = NamedTempFile::new().unwrap();
        let path: PathBuf = tmp_file.path().to_path_buf();
        let rid = ResourceIdentifier::new("ri.catalog.main.dataset.abc123").unwrap();

        write_integer_points_with(&path, 1, true, Some(rid.clone()));

        let stored = read_dataset_rid_metadata(&path).expect("dataset_rid metadata missing");
        assert_eq!(stored, rid.to_string());
    }

    #[test]
    fn omits_dataset_rid_metadata_when_none() {
        let tmp_file = NamedTempFile::new().unwrap();
        let path: PathBuf = tmp_file.path().to_path_buf();

        write_integer_points_with(&path, 1, true, None);

        assert!(read_dataset_rid_metadata(&path).is_none());
    }

    fn write_integer_points(path: &PathBuf, count: i64) {
        write_integer_points_with(path, count, true, None);
    }

    fn write_integer_points_with(
        path: &PathBuf,
        count: i64,
        overwrite: bool,
        dataset_rid: Option<ResourceIdentifier>,
    ) {
        let points = (0..count)
            .map(
                |i| nominal_api::tonic::io::nominal::scout::api::proto::IntegerPoint {
                    timestamp: make_timestamp(i, 0),
                    value: i,
                },
            )
            .collect();
        let consumer = AvroFileConsumer::new_with_full_path(path, overwrite, dataset_rid).unwrap();
        consumer
            .append_series(&[make_series(
                "ch",
                Points {
                    points_type: Some(PointsType::IntegerPoints(IntegerPoints { points })),
                },
            )])
            .unwrap();
        // Consumer drops at end of scope, flushing the avro writer to disk.
    }

    fn read_integer_point_count(path: &PathBuf) -> usize {
        let reader = Reader::new(std::fs::File::open(path).unwrap()).unwrap();
        let mut total = 0;
        for record in reader {
            let Value::Record(fields) = record.unwrap() else {
                panic!("expected Record");
            };
            let timestamps = fields
                .iter()
                .find(|(name, _)| name == "timestamps")
                .map(|(_, v)| v)
                .unwrap();
            if let Value::Array(arr) = timestamps {
                total += arr.len();
            }
        }
        total
    }

    fn read_dataset_rid_metadata(path: &PathBuf) -> Option<String> {
        let file = std::fs::File::open(path).unwrap();
        let reader = Reader::new(file).unwrap();
        reader
            .user_metadata()
            .get(DATASET_RID_METADATA_KEY)
            .map(|bytes| String::from_utf8(bytes.clone()).unwrap())
    }
}