1use std::error::Error;
2use std::fmt::Debug;
3use std::fmt::Formatter;
4use std::path::PathBuf;
5use std::sync::Arc;
6use std::sync::LazyLock;
7
8use apache_avro::types::Record;
9use apache_avro::types::Value;
10use conjure_object::ResourceIdentifier;
11use nominal_api::tonic::google::protobuf::Timestamp;
12use nominal_api::tonic::io::nominal::scout::api::proto::array_points::ArrayType;
13use nominal_api::tonic::io::nominal::scout::api::proto::points::PointsType;
14use nominal_api::tonic::io::nominal::scout::api::proto::ArrayPoints;
15use nominal_api::tonic::io::nominal::scout::api::proto::DoublePoints;
16use nominal_api::tonic::io::nominal::scout::api::proto::IntegerPoints;
17use nominal_api::tonic::io::nominal::scout::api::proto::Points;
18use nominal_api::tonic::io::nominal::scout::api::proto::Series;
19use nominal_api::tonic::io::nominal::scout::api::proto::StringPoints;
20use nominal_api::tonic::io::nominal::scout::api::proto::StructPoints;
21use nominal_api::tonic::io::nominal::scout::api::proto::Uint64Points;
22use nominal_api::tonic::io::nominal::scout::api::proto::WriteRequestNominal;
23use parking_lot::Mutex;
24use prost::Message;
25use tracing::warn;
26
27use crate::client::NominalApiClients;
28use crate::client::{self};
29use crate::listener::NominalStreamListener;
30use crate::types::AuthProvider;
31
/// Errors that can occur while consuming [`WriteRequestNominal`] payloads.
#[derive(Debug, thiserror::Error)]
pub enum ConsumerError {
    /// Filesystem or other I/O failure (e.g. while creating the Avro file).
    #[error("io error: {0}")]
    IoError(#[from] std::io::Error),
    /// Failure from the `apache_avro` writer; boxed to keep the enum small.
    #[error("avro error: {0}")]
    AvroError(#[from] Box<apache_avro::Error>),
    /// The configured auth provider returned no token.
    #[error("No auth token provided. Please make sure you're authenticated.")]
    MissingTokenError,
    /// Network/API failure, stringified from the underlying client error.
    #[error("request error: {0}")]
    RequestError(String),
    /// Catch-all for other consumer failures.
    #[error("consumer error occurred: {0}")]
    GenericConsumerError(#[from] Box<dyn Error + Send + Sync>),
}
45
/// Convenience alias for results returned by consumers.
pub type ConsumerResult<T> = Result<T, ConsumerError>;
47
/// Sink for write requests; implementations must be shareable across threads.
pub trait WriteRequestConsumer: Send + Sync + Debug {
    /// Delivers one request; returns `Err` if it could not be persisted/sent.
    fn consume(&self, request: &WriteRequestNominal) -> ConsumerResult<()>;
}
51
/// [`WriteRequestConsumer`] that forwards requests to the Nominal API.
#[derive(Clone)]
pub struct NominalCoreConsumer<A: AuthProvider> {
    // API client bundle used to send encoded write requests.
    client: NominalApiClients,
    // Tokio runtime handle used to block on the async send.
    handle: tokio::runtime::Handle,
    // Supplies the bearer token for each request.
    auth_provider: A,
    // Target data source for all writes from this consumer.
    data_source_rid: ResourceIdentifier,
}
59
impl<A: AuthProvider> NominalCoreConsumer<A> {
    /// Builds a consumer that writes to `data_source_rid` via `client`,
    /// authenticating each request with `auth_provider` and running the async
    /// send on `handle`.
    pub fn new(
        client: NominalApiClients,
        handle: tokio::runtime::Handle,
        auth_provider: A,
        data_source_rid: ResourceIdentifier,
    ) -> Self {
        Self {
            client,
            handle,
            auth_provider,
            data_source_rid,
        }
    }
}
75
76impl<T: AuthProvider> Debug for NominalCoreConsumer<T> {
77 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
78 f.debug_struct("NominalCoreConsumer")
79 .field("client", &self.client)
80 .field("data_source_rid", &self.data_source_rid)
81 .finish()
82 }
83}
84
impl<T: AuthProvider + 'static> WriteRequestConsumer for NominalCoreConsumer<T> {
    /// Protobuf-encodes `request` and sends it to the Nominal API.
    ///
    /// # Errors
    /// - [`ConsumerError::MissingTokenError`] if the auth provider has no token.
    /// - [`ConsumerError::RequestError`] if the API call fails.
    fn consume(&self, request: &WriteRequestNominal) -> ConsumerResult<()> {
        let token = self
            .auth_provider
            .token()
            .ok_or(ConsumerError::MissingTokenError)?;
        let write_request =
            client::encode_request(request.encode_to_vec(), &token, &self.data_source_rid)?;
        // NOTE(review): `block_on` will panic if `consume` is invoked from
        // within the same runtime's async context — presumably callers run
        // this on a dedicated thread; confirm.
        self.handle.block_on(async {
            self.client
                .send(write_request)
                .await
                .map_err(|e| ConsumerError::RequestError(format!("{e:?}")))
        })?;
        Ok(())
    }
}
102
// Filename prefix used when the caller does not supply one.
const DEFAULT_FILE_PREFIX: &str = "nominal_stream";
104
/// Avro schema (JSON) for the streaming record format: one record per series
/// with parallel `timestamps`/`values` arrays. The `values` union order
/// (double, string, long, DoubleArray, StringArray, JsonStruct) must stay in
/// sync with the union indices used in `points_to_avro`.
pub static CORE_SCHEMA_STR: &str = r#"{
    "type": "record",
    "name": "AvroStream",
    "namespace": "io.nominal.ingest",
    "fields": [
        {
            "name": "channel",
            "type": "string",
            "doc": "Channel/series name (e.g., 'vehicle_id', 'col_1', 'temperature')"
        },
        {
            "name": "timestamps",
            "type": {"type": "array", "items": "long"},
            "doc": "Array of Unix timestamps in nanoseconds"
        },
        {
            "name": "values",
            "type": {"type": "array", "items": [
                "double",
                "string",
                "long",
                {"type": "record", "name": "DoubleArray", "fields": [{"name": "items", "type": {"type": "array", "items": "double"}}]},
                {"type": "record", "name": "StringArray", "fields": [{"name": "items", "type": {"type": "array", "items": "string"}}]},
                {"type": "record", "name": "JsonStruct", "fields": [{"name": "json", "type": "string"}]}
            ]},
            "doc": "Array of values. Can be doubles, longs, strings, arrays, or JSON structs"
        },
        {
            "name": "tags",
            "type": {"type": "map", "values": "string"},
            "default": {},
            "doc": "Key-value metadata tags"
        }
    ]
}
"#;
141
142pub static CORE_AVRO_SCHEMA: LazyLock<apache_avro::Schema> = LazyLock::new(|| {
143 let json = serde_json::from_str(CORE_SCHEMA_STR).expect("Failed to parse JSON schema");
144 apache_avro::Schema::parse(&json).expect("Failed to parse Avro schema")
145});
146
/// [`WriteRequestConsumer`] that appends incoming series to a local Avro file.
#[derive(Clone)]
pub struct AvroFileConsumer {
    // Shared, mutex-guarded Avro writer; 'static because the schema it
    // borrows is the global `CORE_AVRO_SCHEMA` `LazyLock`.
    writer: Arc<Mutex<apache_avro::Writer<'static, std::fs::File>>>,
    // Path of the file being written (kept for `Debug` output).
    path: PathBuf,
}
152
153impl Debug for AvroFileConsumer {
154 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
155 f.debug_struct("AvroFileConsumer")
156 .field("path", &self.path)
157 .finish()
158 }
159}
160
161impl AvroFileConsumer {
162 pub fn new(
163 directory: impl Into<PathBuf>,
164 file_prefix: Option<String>,
165 ) -> std::io::Result<Self> {
166 let datetime = chrono::Utc::now().format("%Y%m%d_%H%M%S").to_string();
167 let prefix = file_prefix.unwrap_or_else(|| DEFAULT_FILE_PREFIX.to_string());
168 let filename = format!("{prefix}_{datetime}.avro");
169 let directory = directory.into();
170 let full_path = directory.join(&filename);
171
172 Self::new_with_full_path(full_path)
173 }
174
175 pub fn new_with_full_path(file_path: impl Into<PathBuf>) -> std::io::Result<Self> {
176 let path = file_path.into();
177 std::fs::create_dir_all(path.parent().unwrap_or(&path))?;
178 let file = std::fs::OpenOptions::new()
179 .create(true)
180 .truncate(false)
181 .write(true)
182 .open(&path)?;
183
184 let writer = apache_avro::Writer::builder()
185 .schema(&CORE_AVRO_SCHEMA)
186 .writer(file)
187 .codec(apache_avro::Codec::Snappy)
188 .build();
189
190 Ok(Self {
191 writer: Arc::new(Mutex::new(writer)),
192 path,
193 })
194 }
195
196 fn append_series(&self, series: &[Series]) -> ConsumerResult<()> {
197 let mut records: Vec<Record> = Vec::new();
198 for series in series {
199 let (timestamps, values) = points_to_avro(series.points.as_ref());
200
201 let mut record = Record::new(&CORE_AVRO_SCHEMA).expect("Failed to create Avro record");
202
203 record.put(
204 "channel",
205 series
206 .channel
207 .as_ref()
208 .map(|c| c.name.clone())
209 .unwrap_or("values".to_string()),
210 );
211 record.put("timestamps", Value::Array(timestamps));
212 record.put("values", Value::Array(values));
213 record.put("tags", series.tags.clone());
214
215 records.push(record);
216 }
217
218 self.writer
219 .lock()
220 .extend(records)
221 .map_err(|e| ConsumerError::AvroError(Box::new(e)))?;
222
223 Ok(())
224 }
225}
226
/// Flattens a protobuf `Points` payload into two parallel vectors: Avro
/// timestamps (nanosecond `Long`s) and Avro union values.
///
/// The union indices must stay in sync with the `values` union order in
/// `CORE_SCHEMA_STR`: 0 = double, 1 = string, 2 = long, 3 = DoubleArray,
/// 4 = StringArray, 5 = JsonStruct.
///
/// Returns two empty vectors when `points`, its inner `points_type`, or an
/// `ArrayPoints`' `array_type` is absent.
fn points_to_avro(points: Option<&Points>) -> (Vec<Value>, Vec<Value>) {
    let Some(Points {
        points_type: Some(points),
    }) = points
    else {
        return (Vec::new(), Vec::new());
    };

    match points {
        PointsType::DoublePoints(DoublePoints { points }) => points
            .iter()
            .map(|point| {
                (
                    // NOTE(review): `unwrap` panics if a point arrives without
                    // a timestamp — TODO confirm upstream always sets it.
                    convert_timestamp_to_nanoseconds(point.timestamp.unwrap()),
                    Value::Union(0, Box::new(Value::Double(point.value))),
                )
            })
            .collect(),
        PointsType::StringPoints(StringPoints { points }) => points
            .iter()
            .map(|point| {
                (
                    convert_timestamp_to_nanoseconds(point.timestamp.unwrap()),
                    Value::Union(1, Box::new(Value::String(point.value.clone()))),
                )
            })
            .collect(),
        PointsType::IntegerPoints(IntegerPoints { points }) => points
            .iter()
            .map(|point| {
                (
                    convert_timestamp_to_nanoseconds(point.timestamp.unwrap()),
                    Value::Union(2, Box::new(Value::Long(point.value))),
                )
            })
            .collect(),
        PointsType::ArrayPoints(ArrayPoints { array_type }) => match array_type {
            Some(ArrayType::DoubleArrayPoints(points)) => points
                .points
                .iter()
                .map(|point| {
                    // Wrap the array in the schema's `DoubleArray` record.
                    let array_values: Vec<Value> =
                        point.value.iter().map(|v| Value::Double(*v)).collect();
                    let record =
                        Value::Record(vec![("items".to_string(), Value::Array(array_values))]);
                    (
                        convert_timestamp_to_nanoseconds(point.timestamp.unwrap()),
                        Value::Union(3, Box::new(record)),
                    )
                })
                .collect(),
            Some(ArrayType::StringArrayPoints(points)) => points
                .points
                .iter()
                .map(|point| {
                    // Wrap the array in the schema's `StringArray` record.
                    let array_values: Vec<Value> = point
                        .value
                        .iter()
                        .map(|v| Value::String(v.clone()))
                        .collect();
                    let record =
                        Value::Record(vec![("items".to_string(), Value::Array(array_values))]);
                    (
                        convert_timestamp_to_nanoseconds(point.timestamp.unwrap()),
                        Value::Union(4, Box::new(record)),
                    )
                })
                .collect(),
            None => (Vec::new(), Vec::new()),
        },
        PointsType::StructPoints(StructPoints { points }) => points
            .iter()
            .map(|point| {
                // Structs are carried as their raw JSON string in the
                // schema's `JsonStruct` record.
                let record = Value::Record(vec![(
                    "json".to_string(),
                    Value::String(point.json_string.clone()),
                )]);
                (
                    convert_timestamp_to_nanoseconds(point.timestamp.unwrap()),
                    Value::Union(5, Box::new(record)),
                )
            })
            .collect(),
        PointsType::Uint64Points(Uint64Points { points }) => points
            .iter()
            .map(|point| {
                (
                    convert_timestamp_to_nanoseconds(point.timestamp.unwrap()),
                    // Reuses the long (index 2) branch; values above i64::MAX
                    // wrap when cast (u64::MAX becomes -1).
                    Value::Union(2, Box::new(Value::Long(point.value as i64))),
                )
            })
            .collect(),
    }
}
321
322fn convert_timestamp_to_nanoseconds(timestamp: Timestamp) -> Value {
323 Value::Long(timestamp.seconds * 1_000_000_000 + timestamp.nanos as i64)
324}
325
326impl WriteRequestConsumer for AvroFileConsumer {
327 fn consume(&self, request: &WriteRequestNominal) -> ConsumerResult<()> {
328 self.append_series(&request.series)?;
329 Ok(())
330 }
331}
332
/// Consumer that tries `primary` first and falls back to `fallback` on error.
#[derive(Clone)]
pub struct RequestConsumerWithFallback<P, F>
where
    P: WriteRequestConsumer,
    F: WriteRequestConsumer,
{
    // Preferred destination (e.g. the live API).
    primary: P,
    // Used only when the primary fails (e.g. a local Avro file).
    fallback: F,
}
342
impl<P, F> RequestConsumerWithFallback<P, F>
where
    P: WriteRequestConsumer,
    F: WriteRequestConsumer,
{
    /// Creates a consumer that prefers `primary` and uses `fallback` only
    /// when the primary fails.
    pub fn new(primary: P, fallback: F) -> Self {
        Self { primary, fallback }
    }
}
352
353impl<P, F> Debug for RequestConsumerWithFallback<P, F>
354where
355 F: Send + Sync + WriteRequestConsumer,
356 P: Send + Sync + WriteRequestConsumer,
357{
358 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
359 f.debug_struct("RequestConsumerWithFallback")
360 .field("primary", &self.primary)
361 .field("fallback", &self.fallback)
362 .finish()
363 }
364}
365
/// Consumer that always writes to both `primary` and `secondary`.
#[derive(Debug, Clone)]
pub struct DualWriteRequestConsumer<P, S>
where
    P: WriteRequestConsumer,
    S: WriteRequestConsumer,
{
    primary: P,
    secondary: S,
}
375
impl<P, S> DualWriteRequestConsumer<P, S>
where
    P: WriteRequestConsumer,
    S: WriteRequestConsumer,
{
    /// Creates a consumer that delivers every request to both `primary` and
    /// `secondary`.
    pub fn new(primary: P, secondary: S) -> Self {
        Self { primary, secondary }
    }
}
385
386impl<P, S> WriteRequestConsumer for DualWriteRequestConsumer<P, S>
387where
388 P: WriteRequestConsumer + Send + Sync,
389 S: WriteRequestConsumer + Send + Sync,
390{
391 fn consume(&self, request: &WriteRequestNominal) -> ConsumerResult<()> {
392 let primary_result = self.primary.consume(request);
393 let secondary_result = self.secondary.consume(request);
394 if let Err(e) = &primary_result {
395 warn!("Sending request to primary consumer failed: {:?}", e);
396 }
397 if let Err(e) = &secondary_result {
398 warn!("Sending request to secondary consumer failed: {:?}", e);
399 }
400
401 primary_result.and(secondary_result)
403 }
404}
405
406impl<P, F> WriteRequestConsumer for RequestConsumerWithFallback<P, F>
407where
408 P: WriteRequestConsumer + Send + Sync,
409 F: WriteRequestConsumer + Send + Sync,
410{
411 fn consume(&self, request: &WriteRequestNominal) -> ConsumerResult<()> {
412 if let Err(e) = self.primary.consume(request) {
413 warn!("Sending request to primary consumer failed. Attempting fallback.");
414 let fallback_result = self.fallback.consume(request);
415 if let ConsumerError::MissingTokenError = e {
418 return Err(ConsumerError::MissingTokenError);
419 }
420 return fallback_result;
421 }
422 Ok(())
423 }
424}
425
/// Consumer wrapper that notifies listeners after each delegated attempt.
#[derive(Debug, Clone)]
pub struct ListeningWriteRequestConsumer<C>
where
    C: WriteRequestConsumer,
{
    // Inner consumer that actually handles the request.
    consumer: C,
    // Observers notified of every success or failure.
    listeners: Vec<Arc<dyn NominalStreamListener>>,
}
434
impl<C> ListeningWriteRequestConsumer<C>
where
    C: WriteRequestConsumer,
{
    /// Wraps `consumer`, notifying `listeners` after every consume attempt.
    pub fn new(consumer: C, listeners: Vec<Arc<dyn NominalStreamListener>>) -> Self {
        Self {
            consumer,
            listeners,
        }
    }
}
446
impl<C> WriteRequestConsumer for ListeningWriteRequestConsumer<C>
where
    C: WriteRequestConsumer + Send + Sync,
{
    /// Delegates to the inner consumer, then notifies listeners of the
    /// outcome. The inner result (or error) is passed through unchanged.
    fn consume(&self, request: &WriteRequestNominal) -> ConsumerResult<()> {
        match self.consumer.consume(request) {
            Ok(_) => {
                // NOTE(review): `on_success`/`on_error` are invoked on the Vec
                // itself — presumably a listener trait is implemented for
                // collections elsewhere in the crate; confirm.
                self.listeners.on_success(request);
                Ok(())
            }
            Err(e) => {
                self.listeners.on_error(&e, request);
                Err(e)
            }
        }
    }
}
464
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use apache_avro::Reader;
    use nominal_api::tonic::google::protobuf::Timestamp;
    use nominal_api::tonic::io::nominal::scout::api::proto::array_points::ArrayType;
    use nominal_api::tonic::io::nominal::scout::api::proto::Channel;
    use nominal_api::tonic::io::nominal::scout::api::proto::DoubleArrayPoint;
    use nominal_api::tonic::io::nominal::scout::api::proto::StringArrayPoint;
    use tempfile::NamedTempFile;

    use super::*;

    // Helper: wraps seconds/nanos in an optional protobuf `Timestamp`.
    fn make_timestamp(secs: i64, nanos: i32) -> Option<Timestamp> {
        Some(Timestamp {
            seconds: secs,
            nanos,
        })
    }

    // Helper: builds a `Series` with the given channel name and no tags.
    fn make_series(name: &str, points: Points) -> Series {
        Series {
            channel: Some(Channel {
                name: name.to_string(),
            }),
            tags: HashMap::new(),
            points: Some(points),
        }
    }

    // Round-trip test: writes one series per supported point type through
    // `AvroFileConsumer`, then reads the Avro container back and verifies the
    // channel names and the exact union encoding of every value.
    #[test]
    fn test_avro_file_with_all_value_types() {
        let tmp_file = NamedTempFile::new().unwrap();
        let path: PathBuf = tmp_file.path().to_path_buf();

        // Scope the consumer so it is dropped (flushing the writer) before
        // the file is reopened for reading.
        {
            let consumer = AvroFileConsumer::new_with_full_path(&path).unwrap();

            let double_series = make_series(
                "doubles",
                Points {
                    points_type: Some(PointsType::DoublePoints(DoublePoints {
                        points: vec![
                            nominal_api::tonic::io::nominal::scout::api::proto::DoublePoint {
                                timestamp: make_timestamp(1000, 0),
                                value: 1.5,
                            },
                            nominal_api::tonic::io::nominal::scout::api::proto::DoublePoint {
                                timestamp: make_timestamp(1001, 0),
                                value: 2.5,
                            },
                        ],
                    })),
                },
            );

            let long_series = make_series(
                "longs",
                Points {
                    points_type: Some(PointsType::IntegerPoints(IntegerPoints {
                        points: vec![
                            nominal_api::tonic::io::nominal::scout::api::proto::IntegerPoint {
                                timestamp: make_timestamp(1000, 0),
                                value: 42,
                            },
                            nominal_api::tonic::io::nominal::scout::api::proto::IntegerPoint {
                                timestamp: make_timestamp(1001, 0),
                                value: -100,
                            },
                        ],
                    })),
                },
            );

            let string_series = make_series(
                "strings",
                Points {
                    points_type: Some(PointsType::StringPoints(StringPoints {
                        points: vec![
                            nominal_api::tonic::io::nominal::scout::api::proto::StringPoint {
                                timestamp: make_timestamp(1000, 0),
                                value: "hello".to_string(),
                            },
                            nominal_api::tonic::io::nominal::scout::api::proto::StringPoint {
                                timestamp: make_timestamp(1001, 0),
                                value: "world".to_string(),
                            },
                        ],
                    })),
                },
            );

            let double_array_series = make_series(
                "double_arrays",
                Points {
                    points_type: Some(PointsType::ArrayPoints(ArrayPoints {
                        array_type: Some(ArrayType::DoubleArrayPoints(
                            nominal_api::tonic::io::nominal::scout::api::proto::DoubleArrayPoints {
                                points: vec![
                                    DoubleArrayPoint {
                                        timestamp: make_timestamp(1000, 0),
                                        value: vec![1.0, 2.0, 3.0],
                                    },
                                    DoubleArrayPoint {
                                        timestamp: make_timestamp(1001, 0),
                                        value: vec![4.0, 5.0],
                                    },
                                ],
                            },
                        )),
                    })),
                },
            );

            let string_array_series = make_series(
                "string_arrays",
                Points {
                    points_type: Some(PointsType::ArrayPoints(ArrayPoints {
                        array_type: Some(ArrayType::StringArrayPoints(
                            nominal_api::tonic::io::nominal::scout::api::proto::StringArrayPoints {
                                points: vec![
                                    StringArrayPoint {
                                        timestamp: make_timestamp(1000, 0),
                                        value: vec!["a".to_string(), "b".to_string()],
                                    },
                                    StringArrayPoint {
                                        timestamp: make_timestamp(1001, 0),
                                        value: vec![
                                            "c".to_string(),
                                            "d".to_string(),
                                            "e".to_string(),
                                        ],
                                    },
                                ],
                            },
                        )),
                    })),
                },
            );

            let struct_series = make_series(
                "structs",
                Points {
                    points_type: Some(PointsType::StructPoints(StructPoints {
                        points: vec![
                            nominal_api::tonic::io::nominal::scout::api::proto::StructPoint {
                                timestamp: make_timestamp(1000, 0),
                                json_string: r#"{"key": "value"}"#.to_string(),
                            },
                            nominal_api::tonic::io::nominal::scout::api::proto::StructPoint {
                                timestamp: make_timestamp(1001, 0),
                                json_string: r#"{"count": 42}"#.to_string(),
                            },
                        ],
                    })),
                },
            );

            let uint64_series = make_series(
                "uint64s",
                Points {
                    points_type: Some(PointsType::Uint64Points(Uint64Points {
                        points: vec![
                            nominal_api::tonic::io::nominal::scout::api::proto::Uint64Point {
                                timestamp: make_timestamp(1000, 0),
                                value: u64::MAX,
                            },
                            nominal_api::tonic::io::nominal::scout::api::proto::Uint64Point {
                                timestamp: make_timestamp(1001, 0),
                                value: 12345678901234567890,
                            },
                        ],
                    })),
                },
            );

            let request = WriteRequestNominal {
                series: vec![
                    double_series,
                    long_series,
                    string_series,
                    double_array_series,
                    string_array_series,
                    struct_series,
                    uint64_series,
                ],
            };

            consumer.consume(&request).unwrap();

            drop(consumer);
        }

        let file = std::fs::File::open(&path).unwrap();
        let reader = Reader::new(file).unwrap();

        let records: Vec<_> = reader.map(|r| r.unwrap()).collect();
        assert_eq!(records.len(), 7, "Expected 7 series records");

        // Extract the "channel" string field from every record.
        let channels: Vec<String> = records
            .iter()
            .filter_map(|r| {
                if let Value::Record(fields) = r {
                    fields.iter().find_map(|(name, value)| {
                        if name == "channel" {
                            if let Value::String(s) = value {
                                Some(s.clone())
                            } else {
                                None
                            }
                        } else {
                            None
                        }
                    })
                } else {
                    None
                }
            })
            .collect();

        assert!(channels.contains(&"doubles".to_string()));
        assert!(channels.contains(&"longs".to_string()));
        assert!(channels.contains(&"strings".to_string()));
        assert!(channels.contains(&"double_arrays".to_string()));
        assert!(channels.contains(&"string_arrays".to_string()));
        assert!(channels.contains(&"structs".to_string()));
        assert!(channels.contains(&"uint64s".to_string()));

        // Check each record's union-encoded values against the union indices
        // written by `points_to_avro`.
        for record in &records {
            if let Value::Record(fields) = record {
                let channel = fields.iter().find_map(|(name, value)| {
                    if name == "channel" {
                        if let Value::String(s) = value {
                            Some(s.clone())
                        } else {
                            None
                        }
                    } else {
                        None
                    }
                });

                let values =
                    fields.iter().find_map(
                        |(name, value)| {
                            if name == "values" {
                                Some(value)
                            } else {
                                None
                            }
                        },
                    );

                if let (Some(channel), Some(Value::Array(values))) = (channel, values) {
                    assert_eq!(values.len(), 2, "Channel {} should have 2 values", channel);

                    match channel.as_str() {
                        "doubles" => {
                            assert_eq!(values[0], Value::Union(0, Box::new(Value::Double(1.5))));
                            assert_eq!(values[1], Value::Union(0, Box::new(Value::Double(2.5))));
                        }
                        "strings" => {
                            assert_eq!(
                                values[0],
                                Value::Union(1, Box::new(Value::String("hello".to_string())))
                            );
                            assert_eq!(
                                values[1],
                                Value::Union(1, Box::new(Value::String("world".to_string())))
                            );
                        }
                        "longs" => {
                            assert_eq!(values[0], Value::Union(2, Box::new(Value::Long(42))));
                            assert_eq!(values[1], Value::Union(2, Box::new(Value::Long(-100))));
                        }
                        "double_arrays" => {
                            assert_eq!(
                                values[0],
                                Value::Union(
                                    3,
                                    Box::new(Value::Record(vec![(
                                        "items".to_string(),
                                        Value::Array(vec![
                                            Value::Double(1.0),
                                            Value::Double(2.0),
                                            Value::Double(3.0)
                                        ])
                                    )]))
                                )
                            );
                            assert_eq!(
                                values[1],
                                Value::Union(
                                    3,
                                    Box::new(Value::Record(vec![(
                                        "items".to_string(),
                                        Value::Array(vec![Value::Double(4.0), Value::Double(5.0)])
                                    )]))
                                )
                            );
                        }
                        "string_arrays" => {
                            assert_eq!(
                                values[0],
                                Value::Union(
                                    4,
                                    Box::new(Value::Record(vec![(
                                        "items".to_string(),
                                        Value::Array(vec![
                                            Value::String("a".to_string()),
                                            Value::String("b".to_string())
                                        ])
                                    )]))
                                )
                            );
                            assert_eq!(
                                values[1],
                                Value::Union(
                                    4,
                                    Box::new(Value::Record(vec![(
                                        "items".to_string(),
                                        Value::Array(vec![
                                            Value::String("c".to_string()),
                                            Value::String("d".to_string()),
                                            Value::String("e".to_string())
                                        ])
                                    )]))
                                )
                            );
                        }
                        "structs" => {
                            assert_eq!(
                                values[0],
                                Value::Union(
                                    5,
                                    Box::new(Value::Record(vec![(
                                        "json".to_string(),
                                        Value::String(r#"{"key": "value"}"#.to_string())
                                    )]))
                                )
                            );
                            assert_eq!(
                                values[1],
                                Value::Union(
                                    5,
                                    Box::new(Value::Record(vec![(
                                        "json".to_string(),
                                        Value::String(r#"{"count": 42}"#.to_string())
                                    )]))
                                )
                            );
                        }
                        "uint64s" => {
                            // u64 values round-trip through the long branch,
                            // wrapping on the i64 cast.
                            assert_eq!(
                                values[0],
                                Value::Union(2, Box::new(Value::Long(u64::MAX as i64)))
                            );
                            assert_eq!(
                                values[1],
                                Value::Union(
                                    2,
                                    Box::new(Value::Long(12345678901234567890u64 as i64))
                                )
                            );
                        }
                        _ => panic!("Unexpected channel: {}", channel),
                    }
                }
            }
        }
    }
}