apache_avro/
writer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Logic handling writing in Avro format at user level.
19use crate::{
20    encode::{encode, encode_internal, encode_to_vec},
21    rabin::Rabin,
22    schema::{AvroSchema, ResolvedOwnedSchema, ResolvedSchema, Schema},
23    ser::Serializer,
24    types::Value,
25    AvroResult, Codec, Error,
26};
27use serde::Serialize;
28use std::{collections::HashMap, io::Write, marker::PhantomData};
29
30const DEFAULT_BLOCK_SIZE: usize = 16000;
31const AVRO_OBJECT_HEADER: &[u8] = b"Obj\x01";
32
33/// Main interface for writing Avro formatted values.
34#[derive(typed_builder::TypedBuilder)]
35pub struct Writer<'a, W> {
36    schema: &'a Schema,
37    writer: W,
38    #[builder(default, setter(skip))]
39    resolved_schema: Option<ResolvedSchema<'a>>,
40    #[builder(default = Codec::Null)]
41    codec: Codec,
42    #[builder(default = DEFAULT_BLOCK_SIZE)]
43    block_size: usize,
44    #[builder(default = Vec::with_capacity(block_size), setter(skip))]
45    buffer: Vec<u8>,
46    #[builder(default, setter(skip))]
47    serializer: Serializer,
48    #[builder(default = 0, setter(skip))]
49    num_values: usize,
50    #[builder(default = generate_sync_marker())]
51    marker: [u8; 16],
52    #[builder(default = false, setter(skip))]
53    has_header: bool,
54    #[builder(default)]
55    user_metadata: HashMap<String, Value>,
56}
57
58impl<'a, W: Write> Writer<'a, W> {
59    /// Creates a `Writer` given a `Schema` and something implementing the `io::Write` trait to write
60    /// to.
61    /// No compression `Codec` will be used.
62    pub fn new(schema: &'a Schema, writer: W) -> Self {
63        Writer::with_codec(schema, writer, Codec::Null)
64    }
65
66    /// Creates a `Writer` with a specific `Codec` given a `Schema` and something implementing the
67    /// `io::Write` trait to write to.
68    pub fn with_codec(schema: &'a Schema, writer: W, codec: Codec) -> Self {
69        let mut w = Self::builder()
70            .schema(schema)
71            .writer(writer)
72            .codec(codec)
73            .build();
74        w.resolved_schema = ResolvedSchema::try_from(schema).ok();
75        w
76    }
77
78    /// Creates a `Writer` with a specific `Codec` given a `Schema` and something implementing the
79    /// `io::Write` trait to write to.
80    /// If the `schema` is incomplete, i.e. contains `Schema::Ref`s then all dependencies must
81    /// be provided in `schemata`.
82    pub fn with_schemata(
83        schema: &'a Schema,
84        schemata: Vec<&'a Schema>,
85        writer: W,
86        codec: Codec,
87    ) -> Self {
88        let mut w = Self::builder()
89            .schema(schema)
90            .writer(writer)
91            .codec(codec)
92            .build();
93        w.resolved_schema = ResolvedSchema::try_from(schemata).ok();
94        w
95    }
96
97    /// Creates a `Writer` that will append values to already populated
98    /// `std::io::Write` using the provided `marker`
99    /// No compression `Codec` will be used.
100    pub fn append_to(schema: &'a Schema, writer: W, marker: [u8; 16]) -> Self {
101        Writer::append_to_with_codec(schema, writer, Codec::Null, marker)
102    }
103
104    /// Creates a `Writer` that will append values to already populated
105    /// `std::io::Write` using the provided `marker`
106    pub fn append_to_with_codec(
107        schema: &'a Schema,
108        writer: W,
109        codec: Codec,
110        marker: [u8; 16],
111    ) -> Self {
112        let mut w = Self::builder()
113            .schema(schema)
114            .writer(writer)
115            .codec(codec)
116            .marker(marker)
117            .build();
118        w.has_header = true;
119        w.resolved_schema = ResolvedSchema::try_from(schema).ok();
120        w
121    }
122
123    /// Creates a `Writer` that will append values to already populated
124    /// `std::io::Write` using the provided `marker`
125    pub fn append_to_with_codec_schemata(
126        schema: &'a Schema,
127        schemata: Vec<&'a Schema>,
128        writer: W,
129        codec: Codec,
130        marker: [u8; 16],
131    ) -> Self {
132        let mut w = Self::builder()
133            .schema(schema)
134            .writer(writer)
135            .codec(codec)
136            .marker(marker)
137            .build();
138        w.has_header = true;
139        w.resolved_schema = ResolvedSchema::try_from(schemata).ok();
140        w
141    }
142
143    /// Get a reference to the `Schema` associated to a `Writer`.
144    pub fn schema(&self) -> &'a Schema {
145        self.schema
146    }
147
148    /// Append a compatible value (implementing the `ToAvro` trait) to a `Writer`, also performing
149    /// schema validation.
150    ///
151    /// Return the number of bytes written (it might be 0, see below).
152    ///
153    /// **NOTE** This function is not guaranteed to perform any actual write, since it relies on
154    /// internal buffering for performance reasons. If you want to be sure the value has been
155    /// written, then call [`flush`](struct.Writer.html#method.flush).
156    pub fn append<T: Into<Value>>(&mut self, value: T) -> AvroResult<usize> {
157        let n = self.maybe_write_header()?;
158
159        let avro = value.into();
160        self.append_value_ref(&avro).map(|m| m + n)
161    }
162
163    /// Append a compatible value to a `Writer`, also performing schema validation.
164    ///
165    /// Return the number of bytes written (it might be 0, see below).
166    ///
167    /// **NOTE** This function is not guaranteed to perform any actual write, since it relies on
168    /// internal buffering for performance reasons. If you want to be sure the value has been
169    /// written, then call [`flush`](struct.Writer.html#method.flush).
170    pub fn append_value_ref(&mut self, value: &Value) -> AvroResult<usize> {
171        let n = self.maybe_write_header()?;
172
173        // Lazy init for users using the builder pattern with error throwing
174        match self.resolved_schema {
175            Some(ref rs) => {
176                write_value_ref_resolved(self.schema, rs, value, &mut self.buffer)?;
177                self.num_values += 1;
178
179                if self.buffer.len() >= self.block_size {
180                    return self.flush().map(|b| b + n);
181                }
182
183                Ok(n)
184            }
185            None => {
186                let rs = ResolvedSchema::try_from(self.schema)?;
187                self.resolved_schema = Some(rs);
188                self.append_value_ref(value)
189            }
190        }
191    }
192
193    /// Append anything implementing the `Serialize` trait to a `Writer` for
194    /// [`serde`](https://docs.serde.rs/serde/index.html) compatibility, also performing schema
195    /// validation.
196    ///
197    /// Return the number of bytes written.
198    ///
199    /// **NOTE** This function is not guaranteed to perform any actual write, since it relies on
200    /// internal buffering for performance reasons. If you want to be sure the value has been
201    /// written, then call [`flush`](struct.Writer.html#method.flush).
202    pub fn append_ser<S: Serialize>(&mut self, value: S) -> AvroResult<usize> {
203        let avro_value = value.serialize(&mut self.serializer)?;
204        self.append(avro_value)
205    }
206
207    /// Extend a `Writer` with an `Iterator` of compatible values (implementing the `ToAvro`
208    /// trait), also performing schema validation.
209    ///
210    /// Return the number of bytes written.
211    ///
212    /// **NOTE** This function forces the written data to be flushed (an implicit
213    /// call to [`flush`](struct.Writer.html#method.flush) is performed).
214    pub fn extend<I, T: Into<Value>>(&mut self, values: I) -> AvroResult<usize>
215    where
216        I: IntoIterator<Item = T>,
217    {
218        /*
219        https://github.com/rust-lang/rfcs/issues/811 :(
220        let mut stream = values
221            .filter_map(|value| value.serialize(&mut self.serializer).ok())
222            .map(|value| value.encode(self.schema))
223            .collect::<Option<Vec<_>>>()
224            .ok_or_else(|| err_msg("value does not match given schema"))?
225            .into_iter()
226            .fold(Vec::new(), |mut acc, stream| {
227                num_values += 1;
228                acc.extend(stream); acc
229            });
230        */
231
232        let mut num_bytes = 0;
233        for value in values {
234            num_bytes += self.append(value)?;
235        }
236        num_bytes += self.flush()?;
237
238        Ok(num_bytes)
239    }
240
241    /// Extend a `Writer` with an `Iterator` of anything implementing the `Serialize` trait for
242    /// [`serde`](https://docs.serde.rs/serde/index.html) compatibility, also performing schema
243    /// validation.
244    ///
245    /// Return the number of bytes written.
246    ///
247    /// **NOTE** This function forces the written data to be flushed (an implicit
248    /// call to [`flush`](struct.Writer.html#method.flush) is performed).
249    pub fn extend_ser<I, T: Serialize>(&mut self, values: I) -> AvroResult<usize>
250    where
251        I: IntoIterator<Item = T>,
252    {
253        /*
254        https://github.com/rust-lang/rfcs/issues/811 :(
255        let mut stream = values
256            .filter_map(|value| value.serialize(&mut self.serializer).ok())
257            .map(|value| value.encode(self.schema))
258            .collect::<Option<Vec<_>>>()
259            .ok_or_else(|| err_msg("value does not match given schema"))?
260            .into_iter()
261            .fold(Vec::new(), |mut acc, stream| {
262                num_values += 1;
263                acc.extend(stream); acc
264            });
265        */
266
267        let mut num_bytes = 0;
268        for value in values {
269            num_bytes += self.append_ser(value)?;
270        }
271        num_bytes += self.flush()?;
272
273        Ok(num_bytes)
274    }
275
276    /// Extend a `Writer` by appending each `Value` from a slice, while also performing schema
277    /// validation on each value appended.
278    ///
279    /// Return the number of bytes written.
280    ///
281    /// **NOTE** This function forces the written data to be flushed (an implicit
282    /// call to [`flush`](struct.Writer.html#method.flush) is performed).
283    pub fn extend_from_slice(&mut self, values: &[Value]) -> AvroResult<usize> {
284        let mut num_bytes = 0;
285        for value in values {
286            num_bytes += self.append_value_ref(value)?;
287        }
288        num_bytes += self.flush()?;
289
290        Ok(num_bytes)
291    }
292
293    /// Flush the content appended to a `Writer`. Call this function to make sure all the content
294    /// has been written before releasing the `Writer`.
295    ///
296    /// Return the number of bytes written.
297    pub fn flush(&mut self) -> AvroResult<usize> {
298        if self.num_values == 0 {
299            return Ok(0);
300        }
301
302        self.codec.compress(&mut self.buffer)?;
303
304        let num_values = self.num_values;
305        let stream_len = self.buffer.len();
306
307        let num_bytes = self.append_raw(&num_values.into(), &Schema::Long)?
308            + self.append_raw(&stream_len.into(), &Schema::Long)?
309            + self
310                .writer
311                .write(self.buffer.as_ref())
312                .map_err(Error::WriteBytes)?
313            + self.append_marker()?;
314
315        self.buffer.clear();
316        self.num_values = 0;
317
318        Ok(num_bytes)
319    }
320
321    /// Return what the `Writer` is writing to, consuming the `Writer` itself.
322    ///
323    /// **NOTE** This function forces the written data to be flushed (an implicit
324    /// call to [`flush`](struct.Writer.html#method.flush) is performed).
325    pub fn into_inner(mut self) -> AvroResult<W> {
326        self.maybe_write_header()?;
327        self.flush()?;
328        Ok(self.writer)
329    }
330
331    /// Generate and append synchronization marker to the payload.
332    fn append_marker(&mut self) -> AvroResult<usize> {
333        // using .writer.write directly to avoid mutable borrow of self
334        // with ref borrowing of self.marker
335        self.writer.write(&self.marker).map_err(Error::WriteMarker)
336    }
337
338    /// Append a raw Avro Value to the payload avoiding to encode it again.
339    fn append_raw(&mut self, value: &Value, schema: &Schema) -> AvroResult<usize> {
340        self.append_bytes(encode_to_vec(value, schema)?.as_ref())
341    }
342
343    /// Append pure bytes to the payload.
344    fn append_bytes(&mut self, bytes: &[u8]) -> AvroResult<usize> {
345        self.writer.write(bytes).map_err(Error::WriteBytes)
346    }
347
348    /// Adds custom metadata to the file.
349    /// This method could be used only before adding the first record to the writer.
350    pub fn add_user_metadata<T: AsRef<[u8]>>(&mut self, key: String, value: T) -> AvroResult<()> {
351        if !self.has_header {
352            if key.starts_with("avro.") {
353                return Err(Error::InvalidMetadataKey(key));
354            }
355            self.user_metadata
356                .insert(key, Value::Bytes(value.as_ref().to_vec()));
357            Ok(())
358        } else {
359            Err(Error::FileHeaderAlreadyWritten)
360        }
361    }
362
363    /// Create an Avro header based on schema, codec and sync marker.
364    fn header(&self) -> Result<Vec<u8>, Error> {
365        let schema_bytes = serde_json::to_string(self.schema)
366            .map_err(Error::ConvertJsonToString)?
367            .into_bytes();
368
369        let mut metadata = HashMap::with_capacity(2);
370        metadata.insert("avro.schema", Value::Bytes(schema_bytes));
371        metadata.insert("avro.codec", self.codec.into());
372
373        for (k, v) in &self.user_metadata {
374            metadata.insert(k.as_str(), v.clone());
375        }
376
377        let mut header = Vec::new();
378        header.extend_from_slice(AVRO_OBJECT_HEADER);
379        encode(&metadata.into(), &Schema::map(Schema::Bytes), &mut header)?;
380        header.extend_from_slice(&self.marker);
381
382        Ok(header)
383    }
384
385    fn maybe_write_header(&mut self) -> AvroResult<usize> {
386        if !self.has_header {
387            let header = self.header()?;
388            let n = self.append_bytes(header.as_ref())?;
389            self.has_header = true;
390            Ok(n)
391        } else {
392            Ok(0)
393        }
394    }
395}
396
397/// Encode a compatible value (implementing the `ToAvro` trait) into Avro format, also performing
398/// schema validation.
399///
400/// This is an internal function which gets the bytes buffer where to write as parameter instead of
401/// creating a new one like `to_avro_datum`.
402fn write_avro_datum<T: Into<Value>>(
403    schema: &Schema,
404    value: T,
405    buffer: &mut Vec<u8>,
406) -> Result<(), Error> {
407    let avro = value.into();
408    if !avro.validate(schema) {
409        return Err(Error::Validation);
410    }
411    encode(&avro, schema, buffer)?;
412    Ok(())
413}
414
415fn write_avro_datum_schemata<T: Into<Value>>(
416    schema: &Schema,
417    schemata: Vec<&Schema>,
418    value: T,
419    buffer: &mut Vec<u8>,
420) -> AvroResult<()> {
421    let avro = value.into();
422    let rs = ResolvedSchema::try_from(schemata)?;
423    let names = rs.get_names();
424    let enclosing_namespace = schema.namespace();
425    if let Some(_err) = avro.validate_internal(schema, names, &enclosing_namespace) {
426        return Err(Error::Validation);
427    }
428    encode_internal(&avro, schema, names, &enclosing_namespace, buffer)
429}
430
431/// Writer that encodes messages according to the single object encoding v1 spec
432/// Uses an API similar to the current File Writer
433/// Writes all object bytes at once, and drains internal buffer
434pub struct GenericSingleObjectWriter {
435    buffer: Vec<u8>,
436    resolved: ResolvedOwnedSchema,
437}
438
439impl GenericSingleObjectWriter {
440    pub fn new_with_capacity(
441        schema: &Schema,
442        initial_buffer_cap: usize,
443    ) -> AvroResult<GenericSingleObjectWriter> {
444        let fingerprint = schema.fingerprint::<Rabin>();
445        let mut buffer = Vec::with_capacity(initial_buffer_cap);
446        let header = [
447            0xC3,
448            0x01,
449            fingerprint.bytes[0],
450            fingerprint.bytes[1],
451            fingerprint.bytes[2],
452            fingerprint.bytes[3],
453            fingerprint.bytes[4],
454            fingerprint.bytes[5],
455            fingerprint.bytes[6],
456            fingerprint.bytes[7],
457        ];
458        buffer.extend_from_slice(&header);
459
460        Ok(GenericSingleObjectWriter {
461            buffer,
462            resolved: ResolvedOwnedSchema::try_from(schema.clone())?,
463        })
464    }
465
466    /// Write the referenced Value to the provided Write object. Returns a result with the number of bytes written including the header
467    pub fn write_value_ref<W: Write>(&mut self, v: &Value, writer: &mut W) -> AvroResult<usize> {
468        if self.buffer.len() != 10 {
469            Err(Error::IllegalSingleObjectWriterState)
470        } else {
471            write_value_ref_owned_resolved(&self.resolved, v, &mut self.buffer)?;
472            writer.write_all(&self.buffer).map_err(Error::WriteBytes)?;
473            let len = self.buffer.len();
474            self.buffer.truncate(10);
475            Ok(len)
476        }
477    }
478
479    /// Write the Value to the provided Write object. Returns a result with the number of bytes written including the header
480    pub fn write_value<W: Write>(&mut self, v: Value, writer: &mut W) -> AvroResult<usize> {
481        self.write_value_ref(&v, writer)
482    }
483}
484
485/// Writer that encodes messages according to the single object encoding v1 spec
486pub struct SpecificSingleObjectWriter<T>
487where
488    T: AvroSchema,
489{
490    inner: GenericSingleObjectWriter,
491    _model: PhantomData<T>,
492}
493
494impl<T> SpecificSingleObjectWriter<T>
495where
496    T: AvroSchema,
497{
498    pub fn with_capacity(buffer_cap: usize) -> AvroResult<SpecificSingleObjectWriter<T>> {
499        let schema = T::get_schema();
500        Ok(SpecificSingleObjectWriter {
501            inner: GenericSingleObjectWriter::new_with_capacity(&schema, buffer_cap)?,
502            _model: PhantomData,
503        })
504    }
505}
506
507impl<T> SpecificSingleObjectWriter<T>
508where
509    T: AvroSchema + Into<Value>,
510{
511    /// Write the `Into<Value>` to the provided Write object. Returns a result with the number
512    /// of bytes written including the header
513    pub fn write_value<W: Write>(&mut self, data: T, writer: &mut W) -> AvroResult<usize> {
514        let v: Value = data.into();
515        self.inner.write_value_ref(&v, writer)
516    }
517}
518
519impl<T> SpecificSingleObjectWriter<T>
520where
521    T: AvroSchema + Serialize,
522{
523    /// Write the referenced Serialize object to the provided Write object. Returns a result with
524    /// the number of bytes written including the header
525    pub fn write_ref<W: Write>(&mut self, data: &T, writer: &mut W) -> AvroResult<usize> {
526        let mut serializer = Serializer::default();
527        let v = data.serialize(&mut serializer)?;
528        self.inner.write_value_ref(&v, writer)
529    }
530
531    /// Write the Serialize object to the provided Write object. Returns a result with the number
532    /// of bytes written including the header
533    pub fn write<W: Write>(&mut self, data: T, writer: &mut W) -> AvroResult<usize> {
534        self.write_ref(&data, writer)
535    }
536}
537
538fn write_value_ref_resolved(
539    schema: &Schema,
540    resolved_schema: &ResolvedSchema,
541    value: &Value,
542    buffer: &mut Vec<u8>,
543) -> AvroResult<()> {
544    match value.validate_internal(schema, resolved_schema.get_names(), &schema.namespace()) {
545        Some(reason) => Err(Error::ValidationWithReason {
546            value: value.clone(),
547            schema: schema.clone(),
548            reason,
549        }),
550        None => encode_internal(
551            value,
552            schema,
553            resolved_schema.get_names(),
554            &schema.namespace(),
555            buffer,
556        ),
557    }
558}
559
560fn write_value_ref_owned_resolved(
561    resolved_schema: &ResolvedOwnedSchema,
562    value: &Value,
563    buffer: &mut Vec<u8>,
564) -> AvroResult<()> {
565    let root_schema = resolved_schema.get_root_schema();
566    if let Some(reason) = value.validate_internal(
567        root_schema,
568        resolved_schema.get_names(),
569        &root_schema.namespace(),
570    ) {
571        return Err(Error::ValidationWithReason {
572            value: value.clone(),
573            schema: root_schema.clone(),
574            reason,
575        });
576    }
577    encode_internal(
578        value,
579        root_schema,
580        resolved_schema.get_names(),
581        &root_schema.namespace(),
582        buffer,
583    )?;
584    Ok(())
585}
586
587/// Encode a compatible value (implementing the `ToAvro` trait) into Avro format, also
588/// performing schema validation.
589///
590/// **NOTE** This function has a quite small niche of usage and does NOT generate headers and sync
591/// markers; use [`Writer`](struct.Writer.html) to be fully Avro-compatible if you don't know what
592/// you are doing, instead.
593pub fn to_avro_datum<T: Into<Value>>(schema: &Schema, value: T) -> AvroResult<Vec<u8>> {
594    let mut buffer = Vec::new();
595    write_avro_datum(schema, value, &mut buffer)?;
596    Ok(buffer)
597}
598
599/// Encode a compatible value (implementing the `ToAvro` trait) into Avro format, also
600/// performing schema validation.
601/// If the provided `schema` is incomplete then its dependencies must be
602/// provided in `schemata`
603pub fn to_avro_datum_schemata<T: Into<Value>>(
604    schema: &Schema,
605    schemata: Vec<&Schema>,
606    value: T,
607) -> AvroResult<Vec<u8>> {
608    let mut buffer = Vec::new();
609    write_avro_datum_schemata(schema, schemata, value, &mut buffer)?;
610    Ok(buffer)
611}
612
613#[cfg(not(target_arch = "wasm32"))]
614fn generate_sync_marker() -> [u8; 16] {
615    let mut marker = [0_u8; 16];
616    std::iter::repeat_with(rand::random)
617        .take(16)
618        .enumerate()
619        .for_each(|(i, n)| marker[i] = n);
620    marker
621}
622
623#[cfg(target_arch = "wasm32")]
624fn generate_sync_marker() -> [u8; 16] {
625    let mut marker = [0_u8; 16];
626    std::iter::repeat_with(quad_rand::rand)
627        .take(4)
628        .flat_map(|i| i.to_be_bytes())
629        .enumerate()
630        .for_each(|(i, n)| marker[i] = n);
631    marker
632}
633
634#[cfg(test)]
635mod tests {
636    use super::*;
637    use crate::{
638        decimal::Decimal,
639        duration::{Days, Duration, Millis, Months},
640        schema::{DecimalSchema, FixedSchema, Name},
641        types::Record,
642        util::zig_i64,
643        Reader,
644    };
645    use pretty_assertions::assert_eq;
646    use serde::{Deserialize, Serialize};
647
648    use apache_avro_test_helper::TestResult;
649
650    const AVRO_OBJECT_HEADER_LEN: usize = AVRO_OBJECT_HEADER.len();
651
652    const SCHEMA: &str = r#"
653    {
654      "type": "record",
655      "name": "test",
656      "fields": [
657        {
658          "name": "a",
659          "type": "long",
660          "default": 42
661        },
662        {
663          "name": "b",
664          "type": "string"
665        }
666      ]
667    }
668    "#;
669    const UNION_SCHEMA: &str = r#"["null", "long"]"#;
670
671    #[test]
672    fn test_to_avro_datum() -> TestResult {
673        let schema = Schema::parse_str(SCHEMA)?;
674        let mut record = Record::new(&schema).unwrap();
675        record.put("a", 27i64);
676        record.put("b", "foo");
677
678        let mut expected = Vec::new();
679        zig_i64(27, &mut expected);
680        zig_i64(3, &mut expected);
681        expected.extend([b'f', b'o', b'o']);
682
683        assert_eq!(to_avro_datum(&schema, record)?, expected);
684
685        Ok(())
686    }
687
688    #[test]
689    fn test_union_not_null() -> TestResult {
690        let schema = Schema::parse_str(UNION_SCHEMA)?;
691        let union = Value::Union(1, Box::new(Value::Long(3)));
692
693        let mut expected = Vec::new();
694        zig_i64(1, &mut expected);
695        zig_i64(3, &mut expected);
696
697        assert_eq!(to_avro_datum(&schema, union)?, expected);
698
699        Ok(())
700    }
701
702    #[test]
703    fn test_union_null() -> TestResult {
704        let schema = Schema::parse_str(UNION_SCHEMA)?;
705        let union = Value::Union(0, Box::new(Value::Null));
706
707        let mut expected = Vec::new();
708        zig_i64(0, &mut expected);
709
710        assert_eq!(to_avro_datum(&schema, union)?, expected);
711
712        Ok(())
713    }
714
715    fn logical_type_test<T: Into<Value> + Clone>(
716        schema_str: &'static str,
717
718        expected_schema: &Schema,
719        value: Value,
720
721        raw_schema: &Schema,
722        raw_value: T,
723    ) -> TestResult {
724        let schema = Schema::parse_str(schema_str)?;
725        assert_eq!(&schema, expected_schema);
726        // The serialized format should be the same as the schema.
727        let ser = to_avro_datum(&schema, value.clone())?;
728        let raw_ser = to_avro_datum(raw_schema, raw_value)?;
729        assert_eq!(ser, raw_ser);
730
731        // Should deserialize from the schema into the logical type.
732        let mut r = ser.as_slice();
733        let de = crate::from_avro_datum(&schema, &mut r, None)?;
734        assert_eq!(de, value);
735        Ok(())
736    }
737
738    #[test]
739    fn date() -> TestResult {
740        logical_type_test(
741            r#"{"type": "int", "logicalType": "date"}"#,
742            &Schema::Date,
743            Value::Date(1_i32),
744            &Schema::Int,
745            1_i32,
746        )
747    }
748
749    #[test]
750    fn time_millis() -> TestResult {
751        logical_type_test(
752            r#"{"type": "int", "logicalType": "time-millis"}"#,
753            &Schema::TimeMillis,
754            Value::TimeMillis(1_i32),
755            &Schema::Int,
756            1_i32,
757        )
758    }
759
760    #[test]
761    fn time_micros() -> TestResult {
762        logical_type_test(
763            r#"{"type": "long", "logicalType": "time-micros"}"#,
764            &Schema::TimeMicros,
765            Value::TimeMicros(1_i64),
766            &Schema::Long,
767            1_i64,
768        )
769    }
770
771    #[test]
772    fn timestamp_millis() -> TestResult {
773        logical_type_test(
774            r#"{"type": "long", "logicalType": "timestamp-millis"}"#,
775            &Schema::TimestampMillis,
776            Value::TimestampMillis(1_i64),
777            &Schema::Long,
778            1_i64,
779        )
780    }
781
782    #[test]
783    fn timestamp_micros() -> TestResult {
784        logical_type_test(
785            r#"{"type": "long", "logicalType": "timestamp-micros"}"#,
786            &Schema::TimestampMicros,
787            Value::TimestampMicros(1_i64),
788            &Schema::Long,
789            1_i64,
790        )
791    }
792
793    #[test]
794    fn decimal_fixed() -> TestResult {
795        let size = 30;
796        let inner = Schema::Fixed(FixedSchema {
797            name: Name::new("decimal")?,
798            aliases: None,
799            doc: None,
800            size,
801            default: None,
802            attributes: Default::default(),
803        });
804        let value = vec![0u8; size];
805        logical_type_test(
806            r#"{"type": {"type": "fixed", "size": 30, "name": "decimal"}, "logicalType": "decimal", "precision": 20, "scale": 5}"#,
807            &Schema::Decimal(DecimalSchema {
808                precision: 20,
809                scale: 5,
810                inner: Box::new(inner.clone()),
811            }),
812            Value::Decimal(Decimal::from(value.clone())),
813            &inner,
814            Value::Fixed(size, value),
815        )
816    }
817
818    #[test]
819    fn decimal_bytes() -> TestResult {
820        let inner = Schema::Bytes;
821        let value = vec![0u8; 10];
822        logical_type_test(
823            r#"{"type": "bytes", "logicalType": "decimal", "precision": 4, "scale": 3}"#,
824            &Schema::Decimal(DecimalSchema {
825                precision: 4,
826                scale: 3,
827                inner: Box::new(inner.clone()),
828            }),
829            Value::Decimal(Decimal::from(value.clone())),
830            &inner,
831            value,
832        )
833    }
834
835    #[test]
836    fn duration() -> TestResult {
837        let inner = Schema::Fixed(FixedSchema {
838            name: Name::new("duration")?,
839            aliases: None,
840            doc: None,
841            size: 12,
842            default: None,
843            attributes: Default::default(),
844        });
845        let value = Value::Duration(Duration::new(
846            Months::new(256),
847            Days::new(512),
848            Millis::new(1024),
849        ));
850        logical_type_test(
851            r#"{"type": {"type": "fixed", "name": "duration", "size": 12}, "logicalType": "duration"}"#,
852            &Schema::Duration,
853            value,
854            &inner,
855            Value::Fixed(12, vec![0, 1, 0, 0, 0, 2, 0, 0, 0, 4, 0, 0]),
856        )
857    }
858
859    #[test]
860    fn test_writer_append() -> TestResult {
861        let schema = Schema::parse_str(SCHEMA)?;
862        let mut writer = Writer::new(&schema, Vec::new());
863
864        let mut record = Record::new(&schema).unwrap();
865        record.put("a", 27i64);
866        record.put("b", "foo");
867
868        let n1 = writer.append(record.clone())?;
869        let n2 = writer.append(record.clone())?;
870        let n3 = writer.flush()?;
871        let result = writer.into_inner()?;
872
873        assert_eq!(n1 + n2 + n3, result.len());
874
875        let mut data = Vec::new();
876        zig_i64(27, &mut data);
877        zig_i64(3, &mut data);
878        data.extend(b"foo");
879        data.extend(data.clone());
880
881        // starts with magic
882        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
883        // ends with data and sync marker
884        let last_data_byte = result.len() - 16;
885        assert_eq!(
886            &result[last_data_byte - data.len()..last_data_byte],
887            data.as_slice()
888        );
889
890        Ok(())
891    }
892
893    #[test]
894    fn test_writer_extend() -> TestResult {
895        let schema = Schema::parse_str(SCHEMA)?;
896        let mut writer = Writer::new(&schema, Vec::new());
897
898        let mut record = Record::new(&schema).unwrap();
899        record.put("a", 27i64);
900        record.put("b", "foo");
901        let record_copy = record.clone();
902        let records = vec![record, record_copy];
903
904        let n1 = writer.extend(records)?;
905        let n2 = writer.flush()?;
906        let result = writer.into_inner()?;
907
908        assert_eq!(n1 + n2, result.len());
909
910        let mut data = Vec::new();
911        zig_i64(27, &mut data);
912        zig_i64(3, &mut data);
913        data.extend(b"foo");
914        data.extend(data.clone());
915
916        // starts with magic
917        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
918        // ends with data and sync marker
919        let last_data_byte = result.len() - 16;
920        assert_eq!(
921            &result[last_data_byte - data.len()..last_data_byte],
922            data.as_slice()
923        );
924
925        Ok(())
926    }
927
928    #[derive(Debug, Clone, Deserialize, Serialize)]
929    struct TestSerdeSerialize {
930        a: i64,
931        b: String,
932    }
933
934    #[test]
935    fn test_writer_append_ser() -> TestResult {
936        let schema = Schema::parse_str(SCHEMA)?;
937        let mut writer = Writer::new(&schema, Vec::new());
938
939        let record = TestSerdeSerialize {
940            a: 27,
941            b: "foo".to_owned(),
942        };
943
944        let n1 = writer.append_ser(record)?;
945        let n2 = writer.flush()?;
946        let result = writer.into_inner()?;
947
948        assert_eq!(n1 + n2, result.len());
949
950        let mut data = Vec::new();
951        zig_i64(27, &mut data);
952        zig_i64(3, &mut data);
953        data.extend(b"foo");
954
955        // starts with magic
956        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
957        // ends with data and sync marker
958        let last_data_byte = result.len() - 16;
959        assert_eq!(
960            &result[last_data_byte - data.len()..last_data_byte],
961            data.as_slice()
962        );
963
964        Ok(())
965    }
966
967    #[test]
968    fn test_writer_extend_ser() -> TestResult {
969        let schema = Schema::parse_str(SCHEMA)?;
970        let mut writer = Writer::new(&schema, Vec::new());
971
972        let record = TestSerdeSerialize {
973            a: 27,
974            b: "foo".to_owned(),
975        };
976        let record_copy = record.clone();
977        let records = vec![record, record_copy];
978
979        let n1 = writer.extend_ser(records)?;
980        let n2 = writer.flush()?;
981        let result = writer.into_inner()?;
982
983        assert_eq!(n1 + n2, result.len());
984
985        let mut data = Vec::new();
986        zig_i64(27, &mut data);
987        zig_i64(3, &mut data);
988        data.extend(b"foo");
989        data.extend(data.clone());
990
991        // starts with magic
992        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
993        // ends with data and sync marker
994        let last_data_byte = result.len() - 16;
995        assert_eq!(
996            &result[last_data_byte - data.len()..last_data_byte],
997            data.as_slice()
998        );
999
1000        Ok(())
1001    }
1002
1003    fn make_writer_with_codec(schema: &Schema) -> Writer<'_, Vec<u8>> {
1004        Writer::with_codec(schema, Vec::new(), Codec::Deflate)
1005    }
1006
1007    fn make_writer_with_builder(schema: &Schema) -> Writer<'_, Vec<u8>> {
1008        Writer::builder()
1009            .writer(Vec::new())
1010            .schema(schema)
1011            .codec(Codec::Deflate)
1012            .block_size(100)
1013            .build()
1014    }
1015
1016    fn check_writer(mut writer: Writer<'_, Vec<u8>>, schema: &Schema) -> TestResult {
1017        let mut record = Record::new(schema).unwrap();
1018        record.put("a", 27i64);
1019        record.put("b", "foo");
1020
1021        let n1 = writer.append(record.clone())?;
1022        let n2 = writer.append(record.clone())?;
1023        let n3 = writer.flush()?;
1024        let result = writer.into_inner()?;
1025
1026        assert_eq!(n1 + n2 + n3, result.len());
1027
1028        let mut data = Vec::new();
1029        zig_i64(27, &mut data);
1030        zig_i64(3, &mut data);
1031        data.extend(b"foo");
1032        data.extend(data.clone());
1033        Codec::Deflate.compress(&mut data)?;
1034
1035        // starts with magic
1036        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1037        // ends with data and sync marker
1038        let last_data_byte = result.len() - 16;
1039        assert_eq!(
1040            &result[last_data_byte - data.len()..last_data_byte],
1041            data.as_slice()
1042        );
1043
1044        Ok(())
1045    }
1046
1047    #[test]
1048    fn test_writer_with_codec() -> TestResult {
1049        let schema = Schema::parse_str(SCHEMA)?;
1050        let writer = make_writer_with_codec(&schema);
1051        check_writer(writer, &schema)
1052    }
1053
1054    #[test]
1055    fn test_writer_with_builder() -> TestResult {
1056        let schema = Schema::parse_str(SCHEMA)?;
1057        let writer = make_writer_with_builder(&schema);
1058        check_writer(writer, &schema)
1059    }
1060
1061    #[test]
1062    fn test_logical_writer() -> TestResult {
1063        const LOGICAL_TYPE_SCHEMA: &str = r#"
1064        {
1065          "type": "record",
1066          "name": "logical_type_test",
1067          "fields": [
1068            {
1069              "name": "a",
1070              "type": [
1071                "null",
1072                {
1073                  "type": "long",
1074                  "logicalType": "timestamp-micros"
1075                }
1076              ]
1077            }
1078          ]
1079        }
1080        "#;
1081        let codec = Codec::Deflate;
1082        let schema = Schema::parse_str(LOGICAL_TYPE_SCHEMA)?;
1083        let mut writer = Writer::builder()
1084            .schema(&schema)
1085            .codec(codec)
1086            .writer(Vec::new())
1087            .build();
1088
1089        let mut record1 = Record::new(&schema).unwrap();
1090        record1.put(
1091            "a",
1092            Value::Union(1, Box::new(Value::TimestampMicros(1234_i64))),
1093        );
1094
1095        let mut record2 = Record::new(&schema).unwrap();
1096        record2.put("a", Value::Union(0, Box::new(Value::Null)));
1097
1098        let n1 = writer.append(record1)?;
1099        let n2 = writer.append(record2)?;
1100        let n3 = writer.flush()?;
1101        let result = writer.into_inner()?;
1102
1103        assert_eq!(n1 + n2 + n3, result.len());
1104
1105        let mut data = Vec::new();
1106        // byte indicating not null
1107        zig_i64(1, &mut data);
1108        zig_i64(1234, &mut data);
1109
1110        // byte indicating null
1111        zig_i64(0, &mut data);
1112        codec.compress(&mut data)?;
1113
1114        // starts with magic
1115        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1116        // ends with data and sync marker
1117        let last_data_byte = result.len() - 16;
1118        assert_eq!(
1119            &result[last_data_byte - data.len()..last_data_byte],
1120            data.as_slice()
1121        );
1122
1123        Ok(())
1124    }
1125
1126    #[test]
1127    fn test_avro_3405_writer_add_metadata_success() -> TestResult {
1128        let schema = Schema::parse_str(SCHEMA)?;
1129        let mut writer = Writer::new(&schema, Vec::new());
1130
1131        writer.add_user_metadata("stringKey".to_string(), String::from("stringValue"))?;
1132        writer.add_user_metadata("strKey".to_string(), "strValue")?;
1133        writer.add_user_metadata("bytesKey".to_string(), b"bytesValue")?;
1134        writer.add_user_metadata("vecKey".to_string(), vec![1, 2, 3])?;
1135
1136        let mut record = Record::new(&schema).unwrap();
1137        record.put("a", 27i64);
1138        record.put("b", "foo");
1139
1140        writer.append(record.clone())?;
1141        writer.append(record.clone())?;
1142        writer.flush()?;
1143        let result = writer.into_inner()?;
1144
1145        assert_eq!(result.len(), 260);
1146
1147        Ok(())
1148    }
1149
1150    #[test]
1151    fn test_avro_3881_metadata_empty_body() -> TestResult {
1152        let schema = Schema::parse_str(SCHEMA)?;
1153        let mut writer = Writer::new(&schema, Vec::new());
1154        writer.add_user_metadata("a".to_string(), "b")?;
1155        let result = writer.into_inner()?;
1156
1157        let reader = Reader::with_schema(&schema, &result[..])?;
1158        let mut expected = HashMap::new();
1159        expected.insert("a".to_string(), vec![b'b']);
1160        assert_eq!(reader.user_metadata(), &expected);
1161        assert_eq!(reader.into_iter().count(), 0);
1162
1163        Ok(())
1164    }
1165
1166    #[test]
1167    fn test_avro_3405_writer_add_metadata_failure() -> TestResult {
1168        let schema = Schema::parse_str(SCHEMA)?;
1169        let mut writer = Writer::new(&schema, Vec::new());
1170
1171        let mut record = Record::new(&schema).unwrap();
1172        record.put("a", 27i64);
1173        record.put("b", "foo");
1174        writer.append(record.clone())?;
1175
1176        match writer.add_user_metadata("stringKey".to_string(), String::from("value2")) {
1177            Err(e @ Error::FileHeaderAlreadyWritten) => {
1178                assert_eq!(e.to_string(), "The file metadata is already flushed.")
1179            }
1180            Err(e) => panic!("Unexpected error occurred while writing user metadata: {e:?}"),
1181            Ok(_) => panic!("Expected an error that metadata cannot be added after adding data"),
1182        }
1183
1184        Ok(())
1185    }
1186
1187    #[test]
1188    fn test_avro_3405_writer_add_metadata_reserved_prefix_failure() -> TestResult {
1189        let schema = Schema::parse_str(SCHEMA)?;
1190        let mut writer = Writer::new(&schema, Vec::new());
1191
1192        let key = "avro.stringKey".to_string();
1193        match writer.add_user_metadata(key.clone(), "value") {
1194            Err(ref e @ Error::InvalidMetadataKey(_)) => {
1195                assert_eq!(e.to_string(), format!("Metadata keys starting with 'avro.' are reserved for internal usage: {key}."))
1196            }
1197            Err(e) => panic!(
1198                "Unexpected error occurred while writing user metadata with reserved prefix ('avro.'): {e:?}"
1199            ),
1200            Ok(_) => panic!("Expected an error that the metadata key cannot be prefixed with 'avro.'"),
1201        }
1202
1203        Ok(())
1204    }
1205
1206    #[test]
1207    fn test_avro_3405_writer_add_metadata_with_builder_api_success() -> TestResult {
1208        let schema = Schema::parse_str(SCHEMA)?;
1209
1210        let mut user_meta_data: HashMap<String, Value> = HashMap::new();
1211        user_meta_data.insert(
1212            "stringKey".to_string(),
1213            Value::String("stringValue".to_string()),
1214        );
1215        user_meta_data.insert("bytesKey".to_string(), Value::Bytes(b"bytesValue".to_vec()));
1216        user_meta_data.insert("vecKey".to_string(), Value::Bytes(vec![1, 2, 3]));
1217
1218        let writer: Writer<'_, Vec<u8>> = Writer::builder()
1219            .writer(Vec::new())
1220            .schema(&schema)
1221            .user_metadata(user_meta_data.clone())
1222            .build();
1223
1224        assert_eq!(writer.user_metadata, user_meta_data);
1225
1226        Ok(())
1227    }
1228
1229    #[derive(Serialize, Clone)]
1230    struct TestSingleObjectWriter {
1231        a: i64,
1232        b: f64,
1233        c: Vec<String>,
1234    }
1235
1236    impl AvroSchema for TestSingleObjectWriter {
1237        fn get_schema() -> Schema {
1238            let schema = r#"
1239            {
1240                "type":"record",
1241                "name":"TestSingleObjectWrtierSerialize",
1242                "fields":[
1243                    {
1244                        "name":"a",
1245                        "type":"long"
1246                    },
1247                    {
1248                        "name":"b",
1249                        "type":"double"
1250                    },
1251                    {
1252                        "name":"c",
1253                        "type":{
1254                            "type":"array",
1255                            "items":"string"
1256                        }
1257                    }
1258                ]
1259            }
1260            "#;
1261            Schema::parse_str(schema).unwrap()
1262        }
1263    }
1264
1265    impl From<TestSingleObjectWriter> for Value {
1266        fn from(obj: TestSingleObjectWriter) -> Value {
1267            Value::Record(vec![
1268                ("a".into(), obj.a.into()),
1269                ("b".into(), obj.b.into()),
1270                (
1271                    "c".into(),
1272                    Value::Array(obj.c.into_iter().map(|s| s.into()).collect()),
1273                ),
1274            ])
1275        }
1276    }
1277
1278    #[test]
1279    fn test_single_object_writer() -> TestResult {
1280        let mut buf: Vec<u8> = Vec::new();
1281        let obj = TestSingleObjectWriter {
1282            a: 300,
1283            b: 34.555,
1284            c: vec!["cat".into(), "dog".into()],
1285        };
1286        let mut writer = GenericSingleObjectWriter::new_with_capacity(
1287            &TestSingleObjectWriter::get_schema(),
1288            1024,
1289        )
1290        .expect("Should resolve schema");
1291        let value = obj.into();
1292        let written_bytes = writer
1293            .write_value_ref(&value, &mut buf)
1294            .expect("Error serializing properly");
1295
1296        assert!(buf.len() > 10, "no bytes written");
1297        assert_eq!(buf.len(), written_bytes);
1298        assert_eq!(buf[0], 0xC3);
1299        assert_eq!(buf[1], 0x01);
1300        assert_eq!(
1301            &buf[2..10],
1302            &TestSingleObjectWriter::get_schema()
1303                .fingerprint::<Rabin>()
1304                .bytes[..]
1305        );
1306        let mut msg_binary = Vec::new();
1307        encode(
1308            &value,
1309            &TestSingleObjectWriter::get_schema(),
1310            &mut msg_binary,
1311        )
1312        .expect("encode should have failed by here as a dependency of any writing");
1313        assert_eq!(&buf[10..], &msg_binary[..]);
1314
1315        Ok(())
1316    }
1317
1318    #[test]
1319    fn test_writer_parity() -> TestResult {
1320        let obj1 = TestSingleObjectWriter {
1321            a: 300,
1322            b: 34.555,
1323            c: vec!["cat".into(), "dog".into()],
1324        };
1325
1326        let mut buf1: Vec<u8> = Vec::new();
1327        let mut buf2: Vec<u8> = Vec::new();
1328        let mut buf3: Vec<u8> = Vec::new();
1329
1330        let mut generic_writer = GenericSingleObjectWriter::new_with_capacity(
1331            &TestSingleObjectWriter::get_schema(),
1332            1024,
1333        )
1334        .expect("Should resolve schema");
1335        let mut specific_writer =
1336            SpecificSingleObjectWriter::<TestSingleObjectWriter>::with_capacity(1024)
1337                .expect("Resolved should pass");
1338        specific_writer
1339            .write(obj1.clone(), &mut buf1)
1340            .expect("Serialization expected");
1341        specific_writer
1342            .write_value(obj1.clone(), &mut buf2)
1343            .expect("Serialization expected");
1344        generic_writer
1345            .write_value(obj1.into(), &mut buf3)
1346            .expect("Serialization expected");
1347        assert_eq!(buf1, buf2);
1348        assert_eq!(buf1, buf3);
1349
1350        Ok(())
1351    }
1352
1353    #[test]
1354    fn avro_3894_take_aliases_into_account_when_serializing() -> TestResult {
1355        const SCHEMA: &str = r#"
1356  {
1357      "type": "record",
1358      "name": "Conference",
1359      "fields": [
1360          {"type": "string", "name": "name"},
1361          {"type": ["null", "long"], "name": "date", "aliases" : [ "time2", "time" ]}
1362      ]
1363  }"#;
1364
1365        #[derive(Debug, PartialEq, Eq, Clone, Serialize)]
1366        pub struct Conference {
1367            pub name: String,
1368            pub time: Option<i64>,
1369        }
1370
1371        let conf = Conference {
1372            name: "RustConf".to_string(),
1373            time: Some(1234567890),
1374        };
1375
1376        let schema = Schema::parse_str(SCHEMA)?;
1377        let mut writer = Writer::new(&schema, Vec::new());
1378
1379        let bytes = writer.append_ser(conf)?;
1380
1381        assert_eq!(198, bytes);
1382
1383        Ok(())
1384    }
1385
1386    #[test]
1387    fn avro_4014_validation_returns_a_detailed_error() -> TestResult {
1388        const SCHEMA: &str = r#"
1389  {
1390      "type": "record",
1391      "name": "Conference",
1392      "fields": [
1393          {"type": "string", "name": "name"},
1394          {"type": ["null", "long"], "name": "date", "aliases" : [ "time2", "time" ]}
1395      ]
1396  }"#;
1397
1398        #[derive(Debug, PartialEq, Clone, Serialize)]
1399        pub struct Conference {
1400            pub name: String,
1401            pub time: Option<f64>, // wrong type: f64 instead of i64
1402        }
1403
1404        let conf = Conference {
1405            name: "RustConf".to_string(),
1406            time: Some(12345678.90),
1407        };
1408
1409        let schema = Schema::parse_str(SCHEMA)?;
1410        let mut writer = Writer::new(&schema, Vec::new());
1411
1412        match writer.append_ser(conf) {
1413            Ok(bytes) => panic!("Expected an error, but got {} bytes written", bytes),
1414            Err(e) => {
1415                assert_eq!(
1416                    e.to_string(),
1417                    r#"Value Record([("name", String("RustConf")), ("time", Union(1, Double(12345678.9)))]) does not match schema Record(RecordSchema { name: Name { name: "Conference", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "name", doc: None, aliases: None, default: None, schema: String, order: Ascending, position: 0, custom_attributes: {} }, RecordField { name: "date", doc: None, aliases: Some(["time2", "time"]), default: None, schema: Union(UnionSchema { schemas: [Null, Long], variant_index: {Null: 0, Long: 1} }), order: Ascending, position: 1, custom_attributes: {} }], lookup: {"date": 1, "name": 0, "time": 1, "time2": 1}, attributes: {} }): Reason: Unsupported value-schema combination! Value: Double(12345678.9), schema: Long"#
1418                );
1419            }
1420        }
1421        Ok(())
1422    }
1423}