// nominal_streaming/consumer.rs

1use std::error::Error;
2use std::fmt::Debug;
3use std::fmt::Formatter;
4use std::path::PathBuf;
5use std::sync::Arc;
6use std::sync::LazyLock;
7
8use apache_avro::types::Record;
9use apache_avro::types::Value;
10use conjure_object::ResourceIdentifier;
11use nominal_api::tonic::google::protobuf::Timestamp;
12use nominal_api::tonic::io::nominal::scout::api::proto::array_points::ArrayType;
13use nominal_api::tonic::io::nominal::scout::api::proto::points::PointsType;
14use nominal_api::tonic::io::nominal::scout::api::proto::ArrayPoints;
15use nominal_api::tonic::io::nominal::scout::api::proto::DoublePoints;
16use nominal_api::tonic::io::nominal::scout::api::proto::IntegerPoints;
17use nominal_api::tonic::io::nominal::scout::api::proto::Points;
18use nominal_api::tonic::io::nominal::scout::api::proto::Series;
19use nominal_api::tonic::io::nominal::scout::api::proto::StringPoints;
20use nominal_api::tonic::io::nominal::scout::api::proto::StructPoints;
21use nominal_api::tonic::io::nominal::scout::api::proto::Uint64Points;
22use nominal_api::tonic::io::nominal::scout::api::proto::WriteRequestNominal;
23use parking_lot::Mutex;
24use prost::Message;
25use tracing::warn;
26
27use crate::client::NominalApiClients;
28use crate::client::{self};
29use crate::listener::NominalStreamListener;
30use crate::types::AuthProvider;
31
/// Errors a [`WriteRequestConsumer`] can return while handling a write request.
#[derive(Debug, thiserror::Error)]
pub enum ConsumerError {
    /// An underlying I/O operation failed.
    #[error("io error: {0}")]
    IoError(#[from] std::io::Error),
    /// The Avro library reported an error; boxed to keep the enum small.
    #[error("avro error: {0}")]
    AvroError(#[from] Box<apache_avro::Error>),
    /// The auth provider did not supply a token.
    #[error("No auth token provided. Please make sure you're authenticated.")]
    MissingTokenError,
    /// Sending a request failed; carries a textual rendering of the cause.
    #[error("request error: {0}")]
    RequestError(String),
    /// Any other boxed error raised while consuming a request.
    #[error("consumer error occurred: {0}")]
    GenericConsumerError(#[from] Box<dyn Error + Send + Sync>),
}
45
/// Shorthand for results whose error type is [`ConsumerError`].
pub type ConsumerResult<T> = Result<T, ConsumerError>;
47
/// A sink for [`WriteRequestNominal`] batches.
///
/// Implementors must also be `Send + Sync + Debug`, per the supertrait bounds.
pub trait WriteRequestConsumer: Send + Sync + Debug {
    /// Handles one write request, reporting failures as a [`ConsumerError`].
    fn consume(&self, request: &WriteRequestNominal) -> ConsumerResult<()>;
}
51
/// Consumer that encodes write requests and sends them through
/// [`NominalApiClients`] for a single data source.
#[derive(Clone)]
pub struct NominalCoreConsumer<A: AuthProvider> {
    /// Client used to send encoded write requests.
    client: NominalApiClients,
    /// Tokio runtime handle on which the async send is driven to completion.
    handle: tokio::runtime::Handle,
    /// Source of the auth token attached to each request.
    auth_provider: A,
    /// Resource identifier of the data source the requests are written to.
    data_source_rid: ResourceIdentifier,
}
59
60impl<A: AuthProvider> NominalCoreConsumer<A> {
61    pub fn new(
62        client: NominalApiClients,
63        handle: tokio::runtime::Handle,
64        auth_provider: A,
65        data_source_rid: ResourceIdentifier,
66    ) -> Self {
67        Self {
68            client,
69            handle,
70            auth_provider,
71            data_source_rid,
72        }
73    }
74}
75
76impl<T: AuthProvider> Debug for NominalCoreConsumer<T> {
77    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
78        f.debug_struct("NominalCoreConsumer")
79            .field("client", &self.client)
80            .field("data_source_rid", &self.data_source_rid)
81            .finish()
82    }
83}
84
85impl<T: AuthProvider + 'static> WriteRequestConsumer for NominalCoreConsumer<T> {
86    fn consume(&self, request: &WriteRequestNominal) -> ConsumerResult<()> {
87        let token = self
88            .auth_provider
89            .token()
90            .ok_or(ConsumerError::MissingTokenError)?;
91        let write_request =
92            client::encode_request(request.encode_to_vec(), &token, &self.data_source_rid)?;
93        self.handle.block_on(async {
94            self.client
95                .send(write_request)
96                .await
97                .map_err(|e| ConsumerError::RequestError(format!("{e:?}")))
98        })?;
99        Ok(())
100    }
101}
102
/// File-name prefix used by `AvroFileConsumer::new` when the caller passes `None`.
const DEFAULT_FILE_PREFIX: &str = "nominal_stream";
104
/// JSON text of the Avro schema for one series record (`io.nominal.ingest.AvroStream`).
///
/// Each record holds a `channel` name, an array of `timestamps`, an array of
/// `values`, and a `tags` map. The `values` items are a union whose branch
/// order is `double`, `string`, `long`, `DoubleArray`, `StringArray`,
/// `JsonStruct` (indices 0 through 5); code that builds `Value::Union` by
/// index depends on that order.
pub static CORE_SCHEMA_STR: &str = r#"{
  "type": "record",
  "name": "AvroStream",
  "namespace": "io.nominal.ingest",
  "fields": [
      {
          "name": "channel",
          "type": "string",
          "doc": "Channel/series name (e.g., 'vehicle_id', 'col_1', 'temperature')"
      },
      {
          "name": "timestamps",
          "type": {"type": "array", "items": "long"},
          "doc": "Array of Unix timestamps in nanoseconds"
      },
      {
          "name": "values",
          "type": {"type": "array", "items": [
              "double",
              "string",
              "long",
              {"type": "record", "name": "DoubleArray", "fields": [{"name": "items", "type": {"type": "array", "items": "double"}}]},
              {"type": "record", "name": "StringArray", "fields": [{"name": "items", "type": {"type": "array", "items": "string"}}]},
              {"type": "record", "name": "JsonStruct", "fields": [{"name": "json", "type": "string"}]}
          ]},
          "doc": "Array of values. Can be doubles, longs, strings, arrays, or JSON structs"
      },
      {
          "name": "tags",
          "type": {"type": "map", "values": "string"},
          "default": {},
          "doc": "Key-value metadata tags"
      }
  ]
}
"#;
141
142pub static CORE_AVRO_SCHEMA: LazyLock<apache_avro::Schema> = LazyLock::new(|| {
143    let json = serde_json::from_str(CORE_SCHEMA_STR).expect("Failed to parse JSON schema");
144    apache_avro::Schema::parse(&json).expect("Failed to parse Avro schema")
145});
146
/// Consumer that appends series to an Avro file written with [`CORE_AVRO_SCHEMA`].
///
/// The writer sits behind `Arc<Mutex<..>>`, so clones of this consumer share
/// the same underlying writer.
#[derive(Clone)]
pub struct AvroFileConsumer {
    /// Shared Avro writer over the output file.
    writer: Arc<Mutex<apache_avro::Writer<'static, std::fs::File>>>,
    /// Path of the output file; the only field shown in `Debug` output.
    path: PathBuf,
}
152
153impl Debug for AvroFileConsumer {
154    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
155        f.debug_struct("AvroFileConsumer")
156            .field("path", &self.path)
157            .finish()
158    }
159}
160
161impl AvroFileConsumer {
162    pub fn new(
163        directory: impl Into<PathBuf>,
164        file_prefix: Option<String>,
165    ) -> std::io::Result<Self> {
166        let datetime = chrono::Utc::now().format("%Y%m%d_%H%M%S").to_string();
167        let prefix = file_prefix.unwrap_or_else(|| DEFAULT_FILE_PREFIX.to_string());
168        let filename = format!("{prefix}_{datetime}.avro");
169        let directory = directory.into();
170        let full_path = directory.join(&filename);
171
172        Self::new_with_full_path(full_path)
173    }
174
175    pub fn new_with_full_path(file_path: impl Into<PathBuf>) -> std::io::Result<Self> {
176        let path = file_path.into();
177        std::fs::create_dir_all(path.parent().unwrap_or(&path))?;
178        let file = std::fs::OpenOptions::new()
179            .create(true)
180            .truncate(false)
181            .write(true)
182            .open(&path)?;
183
184        let writer = apache_avro::Writer::builder()
185            .schema(&CORE_AVRO_SCHEMA)
186            .writer(file)
187            .codec(apache_avro::Codec::Snappy)
188            .build();
189
190        Ok(Self {
191            writer: Arc::new(Mutex::new(writer)),
192            path,
193        })
194    }
195
196    fn append_series(&self, series: &[Series]) -> ConsumerResult<()> {
197        let mut records: Vec<Record> = Vec::new();
198        for series in series {
199            let (timestamps, values) = points_to_avro(series.points.as_ref());
200
201            let mut record = Record::new(&CORE_AVRO_SCHEMA).expect("Failed to create Avro record");
202
203            record.put(
204                "channel",
205                series
206                    .channel
207                    .as_ref()
208                    .map(|c| c.name.clone())
209                    .unwrap_or("values".to_string()),
210            );
211            record.put("timestamps", Value::Array(timestamps));
212            record.put("values", Value::Array(values));
213            record.put("tags", series.tags.clone());
214
215            records.push(record);
216        }
217
218        self.writer
219            .lock()
220            .extend(records)
221            .map_err(|e| ConsumerError::AvroError(Box::new(e)))?;
222
223        Ok(())
224    }
225}
226
227fn points_to_avro(points: Option<&Points>) -> (Vec<Value>, Vec<Value>) {
228    let Some(Points {
229        points_type: Some(points),
230    }) = points
231    else {
232        return (Vec::new(), Vec::new());
233    };
234
235    match points {
236        PointsType::DoublePoints(DoublePoints { points }) => points
237            .iter()
238            .map(|point| {
239                (
240                    convert_timestamp_to_nanoseconds(point.timestamp.unwrap()),
241                    Value::Union(0, Box::new(Value::Double(point.value))),
242                )
243            })
244            .collect(),
245        PointsType::StringPoints(StringPoints { points }) => points
246            .iter()
247            .map(|point| {
248                (
249                    convert_timestamp_to_nanoseconds(point.timestamp.unwrap()),
250                    Value::Union(1, Box::new(Value::String(point.value.clone()))),
251                )
252            })
253            .collect(),
254        PointsType::IntegerPoints(IntegerPoints { points }) => points
255            .iter()
256            .map(|point| {
257                (
258                    convert_timestamp_to_nanoseconds(point.timestamp.unwrap()),
259                    Value::Union(2, Box::new(Value::Long(point.value))),
260                )
261            })
262            .collect(),
263        PointsType::ArrayPoints(ArrayPoints { array_type }) => match array_type {
264            Some(ArrayType::DoubleArrayPoints(points)) => points
265                .points
266                .iter()
267                .map(|point| {
268                    let array_values: Vec<Value> =
269                        point.value.iter().map(|v| Value::Double(*v)).collect();
270                    let record =
271                        Value::Record(vec![("items".to_string(), Value::Array(array_values))]);
272                    (
273                        convert_timestamp_to_nanoseconds(point.timestamp.unwrap()),
274                        Value::Union(3, Box::new(record)),
275                    )
276                })
277                .collect(),
278            Some(ArrayType::StringArrayPoints(points)) => points
279                .points
280                .iter()
281                .map(|point| {
282                    let array_values: Vec<Value> = point
283                        .value
284                        .iter()
285                        .map(|v| Value::String(v.clone()))
286                        .collect();
287                    let record =
288                        Value::Record(vec![("items".to_string(), Value::Array(array_values))]);
289                    (
290                        convert_timestamp_to_nanoseconds(point.timestamp.unwrap()),
291                        Value::Union(4, Box::new(record)),
292                    )
293                })
294                .collect(),
295            None => (Vec::new(), Vec::new()),
296        },
297        PointsType::StructPoints(StructPoints { points }) => points
298            .iter()
299            .map(|point| {
300                let record = Value::Record(vec![(
301                    "json".to_string(),
302                    Value::String(point.json_string.clone()),
303                )]);
304                (
305                    convert_timestamp_to_nanoseconds(point.timestamp.unwrap()),
306                    Value::Union(5, Box::new(record)),
307                )
308            })
309            .collect(),
310        PointsType::Uint64Points(Uint64Points { points }) => points
311            .iter()
312            .map(|point| {
313                (
314                    convert_timestamp_to_nanoseconds(point.timestamp.unwrap()),
315                    Value::Union(2, Box::new(Value::Long(point.value as i64))),
316                )
317            })
318            .collect(),
319    }
320}
321
322fn convert_timestamp_to_nanoseconds(timestamp: Timestamp) -> Value {
323    Value::Long(timestamp.seconds * 1_000_000_000 + timestamp.nanos as i64)
324}
325
326impl WriteRequestConsumer for AvroFileConsumer {
327    fn consume(&self, request: &WriteRequestNominal) -> ConsumerResult<()> {
328        self.append_series(&request.series)?;
329        Ok(())
330    }
331}
332
/// Consumer that tries `primary` first and hands the request to `fallback`
/// when the primary fails.
#[derive(Clone)]
pub struct RequestConsumerWithFallback<P, F>
where
    P: WriteRequestConsumer,
    F: WriteRequestConsumer,
{
    /// Consumer tried first for every request.
    primary: P,
    /// Consumer used only after the primary returns an error.
    fallback: F,
}
342
343impl<P, F> RequestConsumerWithFallback<P, F>
344where
345    P: WriteRequestConsumer,
346    F: WriteRequestConsumer,
347{
348    pub fn new(primary: P, fallback: F) -> Self {
349        Self { primary, fallback }
350    }
351}
352
353impl<P, F> Debug for RequestConsumerWithFallback<P, F>
354where
355    F: Send + Sync + WriteRequestConsumer,
356    P: Send + Sync + WriteRequestConsumer,
357{
358    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
359        f.debug_struct("RequestConsumerWithFallback")
360            .field("primary", &self.primary)
361            .field("fallback", &self.fallback)
362            .finish()
363    }
364}
365
/// Consumer that forwards every request to both `primary` and `secondary`.
#[derive(Debug, Clone)]
pub struct DualWriteRequestConsumer<P, S>
where
    P: WriteRequestConsumer,
    S: WriteRequestConsumer,
{
    /// Consumer invoked first.
    primary: P,
    /// Consumer invoked second, whether or not the primary succeeded.
    secondary: S,
}
375
376impl<P, S> DualWriteRequestConsumer<P, S>
377where
378    P: WriteRequestConsumer,
379    S: WriteRequestConsumer,
380{
381    pub fn new(primary: P, secondary: S) -> Self {
382        Self { primary, secondary }
383    }
384}
385
386impl<P, S> WriteRequestConsumer for DualWriteRequestConsumer<P, S>
387where
388    P: WriteRequestConsumer + Send + Sync,
389    S: WriteRequestConsumer + Send + Sync,
390{
391    fn consume(&self, request: &WriteRequestNominal) -> ConsumerResult<()> {
392        let primary_result = self.primary.consume(request);
393        let secondary_result = self.secondary.consume(request);
394        if let Err(e) = &primary_result {
395            warn!("Sending request to primary consumer failed: {:?}", e);
396        }
397        if let Err(e) = &secondary_result {
398            warn!("Sending request to secondary consumer failed: {:?}", e);
399        }
400
401        // If either failed, return the error
402        primary_result.and(secondary_result)
403    }
404}
405
406impl<P, F> WriteRequestConsumer for RequestConsumerWithFallback<P, F>
407where
408    P: WriteRequestConsumer + Send + Sync,
409    F: WriteRequestConsumer + Send + Sync,
410{
411    fn consume(&self, request: &WriteRequestNominal) -> ConsumerResult<()> {
412        if let Err(e) = self.primary.consume(request) {
413            warn!("Sending request to primary consumer failed. Attempting fallback.");
414            let fallback_result = self.fallback.consume(request);
415            // we want to notify the caller about the missing token error as it is a user error
416            // todo: get rid of this once we figure out why the auth handle blocks in connect
417            if let ConsumerError::MissingTokenError = e {
418                return Err(ConsumerError::MissingTokenError);
419            }
420            return fallback_result;
421        }
422        Ok(())
423    }
424}
425
/// Consumer wrapper that reports the outcome of every request to a set of
/// stream listeners.
#[derive(Debug, Clone)]
pub struct ListeningWriteRequestConsumer<C>
where
    C: WriteRequestConsumer,
{
    /// Consumer that actually handles the request.
    consumer: C,
    /// Listeners notified after each request succeeds or fails.
    listeners: Vec<Arc<dyn NominalStreamListener>>,
}
434
435impl<C> ListeningWriteRequestConsumer<C>
436where
437    C: WriteRequestConsumer,
438{
439    pub fn new(consumer: C, listeners: Vec<Arc<dyn NominalStreamListener>>) -> Self {
440        Self {
441            consumer,
442            listeners,
443        }
444    }
445}
446
447impl<C> WriteRequestConsumer for ListeningWriteRequestConsumer<C>
448where
449    C: WriteRequestConsumer + Send + Sync,
450{
451    fn consume(&self, request: &WriteRequestNominal) -> ConsumerResult<()> {
452        match self.consumer.consume(request) {
453            Ok(_) => {
454                self.listeners.on_success(request);
455                Ok(())
456            }
457            Err(e) => {
458                self.listeners.on_error(&e, request);
459                Err(e)
460            }
461        }
462    }
463}
464
465#[cfg(test)]
466mod tests {
467    use std::collections::HashMap;
468
469    use apache_avro::Reader;
470    use nominal_api::tonic::google::protobuf::Timestamp;
471    use nominal_api::tonic::io::nominal::scout::api::proto::array_points::ArrayType;
472    use nominal_api::tonic::io::nominal::scout::api::proto::Channel;
473    use nominal_api::tonic::io::nominal::scout::api::proto::DoubleArrayPoint;
474    use nominal_api::tonic::io::nominal::scout::api::proto::StringArrayPoint;
475    use tempfile::NamedTempFile;
476
477    use super::*;
478
479    fn make_timestamp(secs: i64, nanos: i32) -> Option<Timestamp> {
480        Some(Timestamp {
481            seconds: secs,
482            nanos,
483        })
484    }
485
486    fn make_series(name: &str, points: Points) -> Series {
487        Series {
488            channel: Some(Channel {
489                name: name.to_string(),
490            }),
491            tags: HashMap::new(),
492            points: Some(points),
493        }
494    }
495
496    #[test]
497    fn test_avro_file_with_all_value_types() {
498        let tmp_file = NamedTempFile::new().unwrap();
499        let path: PathBuf = tmp_file.path().to_path_buf();
500
501        // Create consumer and write all types
502        {
503            let consumer = AvroFileConsumer::new_with_full_path(&path).unwrap();
504
505            // Create series with each type
506            let double_series = make_series(
507                "doubles",
508                Points {
509                    points_type: Some(PointsType::DoublePoints(DoublePoints {
510                        points: vec![
511                            nominal_api::tonic::io::nominal::scout::api::proto::DoublePoint {
512                                timestamp: make_timestamp(1000, 0),
513                                value: 1.5,
514                            },
515                            nominal_api::tonic::io::nominal::scout::api::proto::DoublePoint {
516                                timestamp: make_timestamp(1001, 0),
517                                value: 2.5,
518                            },
519                        ],
520                    })),
521                },
522            );
523
524            let long_series = make_series(
525                "longs",
526                Points {
527                    points_type: Some(PointsType::IntegerPoints(IntegerPoints {
528                        points: vec![
529                            nominal_api::tonic::io::nominal::scout::api::proto::IntegerPoint {
530                                timestamp: make_timestamp(1000, 0),
531                                value: 42,
532                            },
533                            nominal_api::tonic::io::nominal::scout::api::proto::IntegerPoint {
534                                timestamp: make_timestamp(1001, 0),
535                                value: -100,
536                            },
537                        ],
538                    })),
539                },
540            );
541
542            let string_series = make_series(
543                "strings",
544                Points {
545                    points_type: Some(PointsType::StringPoints(StringPoints {
546                        points: vec![
547                            nominal_api::tonic::io::nominal::scout::api::proto::StringPoint {
548                                timestamp: make_timestamp(1000, 0),
549                                value: "hello".to_string(),
550                            },
551                            nominal_api::tonic::io::nominal::scout::api::proto::StringPoint {
552                                timestamp: make_timestamp(1001, 0),
553                                value: "world".to_string(),
554                            },
555                        ],
556                    })),
557                },
558            );
559
560            let double_array_series = make_series(
561                "double_arrays",
562                Points {
563                    points_type: Some(PointsType::ArrayPoints(ArrayPoints {
564                        array_type: Some(ArrayType::DoubleArrayPoints(
565                            nominal_api::tonic::io::nominal::scout::api::proto::DoubleArrayPoints {
566                                points: vec![
567                                    DoubleArrayPoint {
568                                        timestamp: make_timestamp(1000, 0),
569                                        value: vec![1.0, 2.0, 3.0],
570                                    },
571                                    DoubleArrayPoint {
572                                        timestamp: make_timestamp(1001, 0),
573                                        value: vec![4.0, 5.0],
574                                    },
575                                ],
576                            },
577                        )),
578                    })),
579                },
580            );
581
582            let string_array_series = make_series(
583                "string_arrays",
584                Points {
585                    points_type: Some(PointsType::ArrayPoints(ArrayPoints {
586                        array_type: Some(ArrayType::StringArrayPoints(
587                            nominal_api::tonic::io::nominal::scout::api::proto::StringArrayPoints {
588                                points: vec![
589                                    StringArrayPoint {
590                                        timestamp: make_timestamp(1000, 0),
591                                        value: vec!["a".to_string(), "b".to_string()],
592                                    },
593                                    StringArrayPoint {
594                                        timestamp: make_timestamp(1001, 0),
595                                        value: vec![
596                                            "c".to_string(),
597                                            "d".to_string(),
598                                            "e".to_string(),
599                                        ],
600                                    },
601                                ],
602                            },
603                        )),
604                    })),
605                },
606            );
607
608            let struct_series = make_series(
609                "structs",
610                Points {
611                    points_type: Some(PointsType::StructPoints(StructPoints {
612                        points: vec![
613                            nominal_api::tonic::io::nominal::scout::api::proto::StructPoint {
614                                timestamp: make_timestamp(1000, 0),
615                                json_string: r#"{"key": "value"}"#.to_string(),
616                            },
617                            nominal_api::tonic::io::nominal::scout::api::proto::StructPoint {
618                                timestamp: make_timestamp(1001, 0),
619                                json_string: r#"{"count": 42}"#.to_string(),
620                            },
621                        ],
622                    })),
623                },
624            );
625
626            let uint64_series = make_series(
627                "uint64s",
628                Points {
629                    points_type: Some(PointsType::Uint64Points(Uint64Points {
630                        points: vec![
631                            nominal_api::tonic::io::nominal::scout::api::proto::Uint64Point {
632                                timestamp: make_timestamp(1000, 0),
633                                value: u64::MAX,
634                            },
635                            nominal_api::tonic::io::nominal::scout::api::proto::Uint64Point {
636                                timestamp: make_timestamp(1001, 0),
637                                value: 12345678901234567890,
638                            },
639                        ],
640                    })),
641                },
642            );
643
644            let request = WriteRequestNominal {
645                series: vec![
646                    double_series,
647                    long_series,
648                    string_series,
649                    double_array_series,
650                    string_array_series,
651                    struct_series,
652                    uint64_series,
653                ],
654            };
655
656            consumer.consume(&request).unwrap();
657
658            // Flush the writer by dropping it
659            drop(consumer);
660        }
661
662        // Read back the file and verify
663        let file = std::fs::File::open(&path).unwrap();
664        let reader = Reader::new(file).unwrap();
665
666        let records: Vec<_> = reader.map(|r| r.unwrap()).collect();
667        assert_eq!(records.len(), 7, "Expected 7 series records");
668
669        // Verify each record has the expected channel name and value types
670        let channels: Vec<String> = records
671            .iter()
672            .filter_map(|r| {
673                if let Value::Record(fields) = r {
674                    fields.iter().find_map(|(name, value)| {
675                        if name == "channel" {
676                            if let Value::String(s) = value {
677                                Some(s.clone())
678                            } else {
679                                None
680                            }
681                        } else {
682                            None
683                        }
684                    })
685                } else {
686                    None
687                }
688            })
689            .collect();
690
691        assert!(channels.contains(&"doubles".to_string()));
692        assert!(channels.contains(&"longs".to_string()));
693        assert!(channels.contains(&"strings".to_string()));
694        assert!(channels.contains(&"double_arrays".to_string()));
695        assert!(channels.contains(&"string_arrays".to_string()));
696        assert!(channels.contains(&"structs".to_string()));
697        assert!(channels.contains(&"uint64s".to_string()));
698
699        // Verify specific value types by checking the union discriminants
700        for record in &records {
701            if let Value::Record(fields) = record {
702                let channel = fields.iter().find_map(|(name, value)| {
703                    if name == "channel" {
704                        if let Value::String(s) = value {
705                            Some(s.clone())
706                        } else {
707                            None
708                        }
709                    } else {
710                        None
711                    }
712                });
713
714                let values =
715                    fields.iter().find_map(
716                        |(name, value)| {
717                            if name == "values" {
718                                Some(value)
719                            } else {
720                                None
721                            }
722                        },
723                    );
724
725                if let (Some(channel), Some(Value::Array(values))) = (channel, values) {
726                    assert_eq!(values.len(), 2, "Channel {} should have 2 values", channel);
727
728                    match channel.as_str() {
729                        "doubles" => {
730                            assert_eq!(values[0], Value::Union(0, Box::new(Value::Double(1.5))));
731                            assert_eq!(values[1], Value::Union(0, Box::new(Value::Double(2.5))));
732                        }
733                        "strings" => {
734                            assert_eq!(
735                                values[0],
736                                Value::Union(1, Box::new(Value::String("hello".to_string())))
737                            );
738                            assert_eq!(
739                                values[1],
740                                Value::Union(1, Box::new(Value::String("world".to_string())))
741                            );
742                        }
743                        "longs" => {
744                            assert_eq!(values[0], Value::Union(2, Box::new(Value::Long(42))));
745                            assert_eq!(values[1], Value::Union(2, Box::new(Value::Long(-100))));
746                        }
747                        "double_arrays" => {
748                            assert_eq!(
749                                values[0],
750                                Value::Union(
751                                    3,
752                                    Box::new(Value::Record(vec![(
753                                        "items".to_string(),
754                                        Value::Array(vec![
755                                            Value::Double(1.0),
756                                            Value::Double(2.0),
757                                            Value::Double(3.0)
758                                        ])
759                                    )]))
760                                )
761                            );
762                            assert_eq!(
763                                values[1],
764                                Value::Union(
765                                    3,
766                                    Box::new(Value::Record(vec![(
767                                        "items".to_string(),
768                                        Value::Array(vec![Value::Double(4.0), Value::Double(5.0)])
769                                    )]))
770                                )
771                            );
772                        }
773                        "string_arrays" => {
774                            assert_eq!(
775                                values[0],
776                                Value::Union(
777                                    4,
778                                    Box::new(Value::Record(vec![(
779                                        "items".to_string(),
780                                        Value::Array(vec![
781                                            Value::String("a".to_string()),
782                                            Value::String("b".to_string())
783                                        ])
784                                    )]))
785                                )
786                            );
787                            assert_eq!(
788                                values[1],
789                                Value::Union(
790                                    4,
791                                    Box::new(Value::Record(vec![(
792                                        "items".to_string(),
793                                        Value::Array(vec![
794                                            Value::String("c".to_string()),
795                                            Value::String("d".to_string()),
796                                            Value::String("e".to_string())
797                                        ])
798                                    )]))
799                                )
800                            );
801                        }
802                        "structs" => {
803                            assert_eq!(
804                                values[0],
805                                Value::Union(
806                                    5,
807                                    Box::new(Value::Record(vec![(
808                                        "json".to_string(),
809                                        Value::String(r#"{"key": "value"}"#.to_string())
810                                    )]))
811                                )
812                            );
813                            assert_eq!(
814                                values[1],
815                                Value::Union(
816                                    5,
817                                    Box::new(Value::Record(vec![(
818                                        "json".to_string(),
819                                        Value::String(r#"{"count": 42}"#.to_string())
820                                    )]))
821                                )
822                            );
823                        }
824                        "uint64s" => {
825                            // u64::MAX as i64 is -1, 12345678901234567890u64 as i64 is negative
826                            assert_eq!(
827                                values[0],
828                                Value::Union(2, Box::new(Value::Long(u64::MAX as i64)))
829                            );
830                            assert_eq!(
831                                values[1],
832                                Value::Union(
833                                    2,
834                                    Box::new(Value::Long(12345678901234567890u64 as i64))
835                                )
836                            );
837                        }
838                        _ => panic!("Unexpected channel: {}", channel),
839                    }
840                }
841            }
842        }
843    }
844}