apache-avro 0.16.0

A library for working with Apache Avro in Rust
Documentation
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Logic handling writing in Avro format at user level.
use crate::{
    encode::{encode, encode_internal, encode_to_vec},
    rabin::Rabin,
    schema::{AvroSchema, ResolvedOwnedSchema, ResolvedSchema, Schema},
    ser::Serializer,
    types::Value,
    AvroResult, Codec, Error,
};
use serde::Serialize;
use std::{collections::HashMap, convert::TryFrom, io::Write, marker::PhantomData};

/// Default `Writer::block_size`: once the buffered, encoded values reach this many bytes they
/// are flushed to the underlying writer as one block.
const DEFAULT_BLOCK_SIZE: usize = 16000;
/// Magic bytes that open every object container file header: ASCII `Obj` followed by the byte 1.
const AVRO_OBJECT_HEADER: &[u8] = b"Obj\x01";

/// Main interface for writing Avro formatted values.
///
/// Appended values are validated against `schema`, encoded into an in-memory buffer and written
/// to `writer` in blocks (compressed with `codec`), preceded by a header carrying the schema,
/// the codec, any user metadata and the sync marker.
#[derive(typed_builder::TypedBuilder)]
pub struct Writer<'a, W> {
    /// Schema every appended value is validated and encoded against; serialized into the header.
    schema: &'a Schema,
    /// Destination for the header and the data blocks.
    writer: W,
    /// Resolution of the named types of `schema`. Not settable through the builder; when it is
    /// still `None` at append time it is computed lazily (and resolution errors are reported).
    #[builder(default, setter(skip))]
    resolved_schema: Option<ResolvedSchema<'a>>,
    /// Compression codec applied to every block and recorded in the header.
    #[builder(default = Codec::Null)]
    codec: Codec,
    /// Buffered byte count at which an append triggers an automatic flush.
    #[builder(default = DEFAULT_BLOCK_SIZE)]
    block_size: usize,
    /// Encoded values that have not yet been written to `writer`.
    #[builder(default = Vec::with_capacity(block_size), setter(skip))]
    buffer: Vec<u8>,
    /// Serde serializer used by `append_ser` to turn values into `Value`s.
    #[builder(default, setter(skip))]
    serializer: Serializer,
    /// Number of values currently held in `buffer`.
    #[builder(default = 0, setter(skip))]
    num_values: usize,
    /// 16-byte sync marker written at the end of the header and after every block; randomly
    /// generated unless provided.
    #[builder(default = generate_sync_marker())]
    marker: [u8; 16],
    /// Whether the header has been written already (or is assumed present when appending to
    /// existing data).
    #[builder(default = false, setter(skip))]
    has_header: bool,
    /// Extra key/value pairs added to the header metadata.
    #[builder(default)]
    user_metadata: HashMap<String, Value>,
}

impl<'a, W: Write> Writer<'a, W> {
    /// Creates a `Writer` given a `Schema` and something implementing the `io::Write` trait to write
    /// to.
    /// No compression `Codec` will be used.
    pub fn new(schema: &'a Schema, writer: W) -> Self {
        Writer::with_codec(schema, writer, Codec::Null)
    }

    /// Creates a `Writer` with a specific `Codec` given a `Schema` and something implementing the
    /// `io::Write` trait to write to.
    pub fn with_codec(schema: &'a Schema, writer: W, codec: Codec) -> Self {
        let mut w = Self::builder()
            .schema(schema)
            .writer(writer)
            .codec(codec)
            .build();
        w.resolved_schema = ResolvedSchema::try_from(schema).ok();
        w
    }

    /// Creates a `Writer` with a specific `Codec` given a `Schema` and something implementing the
    /// `io::Write` trait to write to.
    /// If the `schema` is incomplete, i.e. contains `Schema::Ref`s then all dependencies must
    /// be provided in `schemata`.
    pub fn with_schemata(
        schema: &'a Schema,
        schemata: Vec<&'a Schema>,
        writer: W,
        codec: Codec,
    ) -> Self {
        let mut w = Self::builder()
            .schema(schema)
            .writer(writer)
            .codec(codec)
            .build();
        w.resolved_schema = ResolvedSchema::try_from(schemata).ok();
        w
    }

    /// Creates a `Writer` that will append values to already populated
    /// `std::io::Write` using the provided `marker`
    /// No compression `Codec` will be used.
    pub fn append_to(schema: &'a Schema, writer: W, marker: [u8; 16]) -> Self {
        Writer::append_to_with_codec(schema, writer, Codec::Null, marker)
    }

    /// Creates a `Writer` that will append values to already populated
    /// `std::io::Write` using the provided `marker`
    pub fn append_to_with_codec(
        schema: &'a Schema,
        writer: W,
        codec: Codec,
        marker: [u8; 16],
    ) -> Self {
        let mut w = Self::builder()
            .schema(schema)
            .writer(writer)
            .codec(codec)
            .marker(marker)
            .build();
        w.has_header = true;
        w.resolved_schema = ResolvedSchema::try_from(schema).ok();
        w
    }

    /// Creates a `Writer` that will append values to already populated
    /// `std::io::Write` using the provided `marker`
    pub fn append_to_with_codec_schemata(
        schema: &'a Schema,
        schemata: Vec<&'a Schema>,
        writer: W,
        codec: Codec,
        marker: [u8; 16],
    ) -> Self {
        let mut w = Self::builder()
            .schema(schema)
            .writer(writer)
            .codec(codec)
            .marker(marker)
            .build();
        w.has_header = true;
        w.resolved_schema = ResolvedSchema::try_from(schemata).ok();
        w
    }

    /// Get a reference to the `Schema` associated to a `Writer`.
    pub fn schema(&self) -> &'a Schema {
        self.schema
    }

    /// Append a compatible value (implementing the `ToAvro` trait) to a `Writer`, also performing
    /// schema validation.
    ///
    /// Return the number of bytes written (it might be 0, see below).
    ///
    /// **NOTE** This function is not guaranteed to perform any actual write, since it relies on
    /// internal buffering for performance reasons. If you want to be sure the value has been
    /// written, then call [`flush`](struct.Writer.html#method.flush).
    pub fn append<T: Into<Value>>(&mut self, value: T) -> AvroResult<usize> {
        let n = self.maybe_write_header()?;

        let avro = value.into();
        self.append_value_ref(&avro).map(|m| m + n)
    }

    /// Append a compatible value to a `Writer`, also performing schema validation.
    ///
    /// Return the number of bytes written (it might be 0, see below).
    ///
    /// **NOTE** This function is not guaranteed to perform any actual write, since it relies on
    /// internal buffering for performance reasons. If you want to be sure the value has been
    /// written, then call [`flush`](struct.Writer.html#method.flush).
    pub fn append_value_ref(&mut self, value: &Value) -> AvroResult<usize> {
        let n = self.maybe_write_header()?;

        // Lazy init for users using the builder pattern with error throwing
        match self.resolved_schema {
            Some(ref rs) => {
                write_value_ref_resolved(self.schema, rs, value, &mut self.buffer)?;
                self.num_values += 1;

                if self.buffer.len() >= self.block_size {
                    return self.flush().map(|b| b + n);
                }

                Ok(n)
            }
            None => {
                let rs = ResolvedSchema::try_from(self.schema)?;
                self.resolved_schema = Some(rs);
                self.append_value_ref(value)
            }
        }
    }

    /// Append anything implementing the `Serialize` trait to a `Writer` for
    /// [`serde`](https://docs.serde.rs/serde/index.html) compatibility, also performing schema
    /// validation.
    ///
    /// Return the number of bytes written.
    ///
    /// **NOTE** This function is not guaranteed to perform any actual write, since it relies on
    /// internal buffering for performance reasons. If you want to be sure the value has been
    /// written, then call [`flush`](struct.Writer.html#method.flush).
    pub fn append_ser<S: Serialize>(&mut self, value: S) -> AvroResult<usize> {
        let avro_value = value.serialize(&mut self.serializer)?;
        self.append(avro_value)
    }

    /// Extend a `Writer` with an `Iterator` of compatible values (implementing the `ToAvro`
    /// trait), also performing schema validation.
    ///
    /// Return the number of bytes written.
    ///
    /// **NOTE** This function forces the written data to be flushed (an implicit
    /// call to [`flush`](struct.Writer.html#method.flush) is performed).
    pub fn extend<I, T: Into<Value>>(&mut self, values: I) -> AvroResult<usize>
    where
        I: IntoIterator<Item = T>,
    {
        /*
        https://github.com/rust-lang/rfcs/issues/811 :(
        let mut stream = values
            .filter_map(|value| value.serialize(&mut self.serializer).ok())
            .map(|value| value.encode(self.schema))
            .collect::<Option<Vec<_>>>()
            .ok_or_else(|| err_msg("value does not match given schema"))?
            .into_iter()
            .fold(Vec::new(), |mut acc, stream| {
                num_values += 1;
                acc.extend(stream); acc
            });
        */

        let mut num_bytes = 0;
        for value in values {
            num_bytes += self.append(value)?;
        }
        num_bytes += self.flush()?;

        Ok(num_bytes)
    }

    /// Extend a `Writer` with an `Iterator` of anything implementing the `Serialize` trait for
    /// [`serde`](https://docs.serde.rs/serde/index.html) compatibility, also performing schema
    /// validation.
    ///
    /// Return the number of bytes written.
    ///
    /// **NOTE** This function forces the written data to be flushed (an implicit
    /// call to [`flush`](struct.Writer.html#method.flush) is performed).
    pub fn extend_ser<I, T: Serialize>(&mut self, values: I) -> AvroResult<usize>
    where
        I: IntoIterator<Item = T>,
    {
        /*
        https://github.com/rust-lang/rfcs/issues/811 :(
        let mut stream = values
            .filter_map(|value| value.serialize(&mut self.serializer).ok())
            .map(|value| value.encode(self.schema))
            .collect::<Option<Vec<_>>>()
            .ok_or_else(|| err_msg("value does not match given schema"))?
            .into_iter()
            .fold(Vec::new(), |mut acc, stream| {
                num_values += 1;
                acc.extend(stream); acc
            });
        */

        let mut num_bytes = 0;
        for value in values {
            num_bytes += self.append_ser(value)?;
        }
        num_bytes += self.flush()?;

        Ok(num_bytes)
    }

    /// Extend a `Writer` by appending each `Value` from a slice, while also performing schema
    /// validation on each value appended.
    ///
    /// Return the number of bytes written.
    ///
    /// **NOTE** This function forces the written data to be flushed (an implicit
    /// call to [`flush`](struct.Writer.html#method.flush) is performed).
    pub fn extend_from_slice(&mut self, values: &[Value]) -> AvroResult<usize> {
        let mut num_bytes = 0;
        for value in values {
            num_bytes += self.append_value_ref(value)?;
        }
        num_bytes += self.flush()?;

        Ok(num_bytes)
    }

    /// Flush the content appended to a `Writer`. Call this function to make sure all the content
    /// has been written before releasing the `Writer`.
    ///
    /// Return the number of bytes written.
    pub fn flush(&mut self) -> AvroResult<usize> {
        if self.num_values == 0 {
            return Ok(0);
        }

        self.codec.compress(&mut self.buffer)?;

        let num_values = self.num_values;
        let stream_len = self.buffer.len();

        let num_bytes = self.append_raw(&num_values.into(), &Schema::Long)?
            + self.append_raw(&stream_len.into(), &Schema::Long)?
            + self
                .writer
                .write(self.buffer.as_ref())
                .map_err(Error::WriteBytes)?
            + self.append_marker()?;

        self.buffer.clear();
        self.num_values = 0;

        Ok(num_bytes)
    }

    /// Return what the `Writer` is writing to, consuming the `Writer` itself.
    ///
    /// **NOTE** This function forces the written data to be flushed (an implicit
    /// call to [`flush`](struct.Writer.html#method.flush) is performed).
    pub fn into_inner(mut self) -> AvroResult<W> {
        self.flush()?;
        Ok(self.writer)
    }

    /// Generate and append synchronization marker to the payload.
    fn append_marker(&mut self) -> AvroResult<usize> {
        // using .writer.write directly to avoid mutable borrow of self
        // with ref borrowing of self.marker
        self.writer.write(&self.marker).map_err(Error::WriteMarker)
    }

    /// Append a raw Avro Value to the payload avoiding to encode it again.
    fn append_raw(&mut self, value: &Value, schema: &Schema) -> AvroResult<usize> {
        self.append_bytes(encode_to_vec(value, schema)?.as_ref())
    }

    /// Append pure bytes to the payload.
    fn append_bytes(&mut self, bytes: &[u8]) -> AvroResult<usize> {
        self.writer.write(bytes).map_err(Error::WriteBytes)
    }

    /// Adds custom metadata to the file.
    /// This method could be used only before adding the first record to the writer.
    pub fn add_user_metadata<T: AsRef<[u8]>>(&mut self, key: String, value: T) -> AvroResult<()> {
        if !self.has_header {
            if key.starts_with("avro.") {
                return Err(Error::InvalidMetadataKey(key));
            }
            self.user_metadata
                .insert(key, Value::Bytes(value.as_ref().to_vec()));
            Ok(())
        } else {
            Err(Error::FileHeaderAlreadyWritten)
        }
    }

    /// Create an Avro header based on schema, codec and sync marker.
    fn header(&self) -> Result<Vec<u8>, Error> {
        let schema_bytes = serde_json::to_string(self.schema)
            .map_err(Error::ConvertJsonToString)?
            .into_bytes();

        let mut metadata = HashMap::with_capacity(2);
        metadata.insert("avro.schema", Value::Bytes(schema_bytes));
        metadata.insert("avro.codec", self.codec.into());

        for (k, v) in &self.user_metadata {
            metadata.insert(k.as_str(), v.clone());
        }

        let mut header = Vec::new();
        header.extend_from_slice(AVRO_OBJECT_HEADER);
        encode(
            &metadata.into(),
            &Schema::Map(Box::new(Schema::Bytes)),
            &mut header,
        )?;
        header.extend_from_slice(&self.marker);

        Ok(header)
    }

    fn maybe_write_header(&mut self) -> AvroResult<usize> {
        if !self.has_header {
            let header = self.header()?;
            let n = self.append_bytes(header.as_ref())?;
            self.has_header = true;
            Ok(n)
        } else {
            Ok(0)
        }
    }
}

/// Encode a compatible value (implementing the `ToAvro` trait) into Avro format, also performing
/// schema validation.
///
/// This is an internal function which gets the bytes buffer where to write as parameter instead of
/// creating a new one like `to_avro_datum`.
fn write_avro_datum<T: Into<Value>>(
    schema: &Schema,
    value: T,
    buffer: &mut Vec<u8>,
) -> Result<(), Error> {
    let avro = value.into();
    if !avro.validate(schema) {
        return Err(Error::Validation);
    }
    encode(&avro, schema, buffer)?;
    Ok(())
}

fn write_avro_datum_schemata<T: Into<Value>>(
    schema: &Schema,
    schemata: Vec<&Schema>,
    value: T,
    buffer: &mut Vec<u8>,
) -> AvroResult<()> {
    let avro = value.into();
    let rs = ResolvedSchema::try_from(schemata)?;
    let names = rs.get_names();
    let enclosing_namespace = schema.namespace();
    if let Some(_err) = avro.validate_internal(schema, names, &enclosing_namespace) {
        return Err(Error::Validation);
    }
    encode_internal(&avro, schema, names, &enclosing_namespace, buffer)
}

/// Writer that encodes messages according to the single object encoding v1 spec.
/// Uses an API similar to the container-file `Writer`.
/// Writes all object bytes at once, then drains its internal buffer back to the header.
pub struct GenericSingleObjectWriter {
    /// Scratch space that starts with the 10-byte single-object header (`0xC3 0x01` followed by
    /// the schema's 8-byte Rabin fingerprint); each encoded message is appended after it and
    /// truncated away again after a successful write.
    buffer: Vec<u8>,
    /// Owned resolution of the writer schema; supplies the root schema and the named types used
    /// for validation and encoding.
    resolved: ResolvedOwnedSchema,
}

impl GenericSingleObjectWriter {
    pub fn new_with_capacity(
        schema: &Schema,
        initial_buffer_cap: usize,
    ) -> AvroResult<GenericSingleObjectWriter> {
        let fingerprint = schema.fingerprint::<Rabin>();
        let mut buffer = Vec::with_capacity(initial_buffer_cap);
        let header = [
            0xC3,
            0x01,
            fingerprint.bytes[0],
            fingerprint.bytes[1],
            fingerprint.bytes[2],
            fingerprint.bytes[3],
            fingerprint.bytes[4],
            fingerprint.bytes[5],
            fingerprint.bytes[6],
            fingerprint.bytes[7],
        ];
        buffer.extend_from_slice(&header);

        Ok(GenericSingleObjectWriter {
            buffer,
            resolved: ResolvedOwnedSchema::try_from(schema.clone())?,
        })
    }

    /// Write the referenced Value to the provided Write object. Returns a result with the number of bytes written including the header
    pub fn write_value_ref<W: Write>(&mut self, v: &Value, writer: &mut W) -> AvroResult<usize> {
        if self.buffer.len() != 10 {
            Err(Error::IllegalSingleObjectWriterState)
        } else {
            write_value_ref_owned_resolved(&self.resolved, v, &mut self.buffer)?;
            writer.write_all(&self.buffer).map_err(Error::WriteBytes)?;
            let len = self.buffer.len();
            self.buffer.truncate(10);
            Ok(len)
        }
    }

    /// Write the Value to the provided Write object. Returns a result with the number of bytes written including the header
    pub fn write_value<W: Write>(&mut self, v: Value, writer: &mut W) -> AvroResult<usize> {
        self.write_value_ref(&v, writer)
    }
}

/// Writer that encodes messages according to the single object encoding v1 spec, for values of
/// the Rust type `T` whose schema is provided by its `AvroSchema` implementation.
pub struct SpecificSingleObjectWriter<T>
where
    T: AvroSchema,
{
    /// Schema-level writer that performs the actual validation, encoding and writing.
    inner: GenericSingleObjectWriter,
    /// Ties this writer to `T` without storing a `T`.
    _model: PhantomData<T>,
}

impl<T> SpecificSingleObjectWriter<T>
where
    T: AvroSchema,
{
    /// Builds a writer for the schema returned by `T::get_schema()`, reserving `buffer_cap`
    /// bytes for encoded messages.
    ///
    /// # Errors
    /// Propagates the error raised when the schema cannot be resolved.
    pub fn with_capacity(buffer_cap: usize) -> AvroResult<SpecificSingleObjectWriter<T>> {
        let inner = GenericSingleObjectWriter::new_with_capacity(&T::get_schema(), buffer_cap)?;
        Ok(SpecificSingleObjectWriter {
            inner,
            _model: PhantomData,
        })
    }
}

impl<T> SpecificSingleObjectWriter<T>
where
    T: AvroSchema + Into<Value>,
{
    /// Converts `data` into a [`Value`] and writes it to `writer` as a single-object message.
    /// Returns the number of bytes written, header included.
    pub fn write_value<W: Write>(&mut self, data: T, writer: &mut W) -> AvroResult<usize> {
        let converted: Value = data.into();
        self.inner.write_value_ref(&converted, writer)
    }
}

impl<T> SpecificSingleObjectWriter<T>
where
    T: AvroSchema + Serialize,
{
    /// Serializes `data` through serde into a [`Value`] and writes it to `writer` as a
    /// single-object message. Returns the number of bytes written, header included.
    pub fn write_ref<W: Write>(&mut self, data: &T, writer: &mut W) -> AvroResult<usize> {
        let serialized = data.serialize(&mut Serializer::default())?;
        self.inner.write_value_ref(&serialized, writer)
    }

    /// By-value convenience wrapper around `write_ref`. Returns the number of bytes written,
    /// header included.
    pub fn write<W: Write>(&mut self, data: T, writer: &mut W) -> AvroResult<usize> {
        let borrowed = &data;
        self.write_ref(borrowed, writer)
    }
}

fn write_value_ref_resolved(
    schema: &Schema,
    resolved_schema: &ResolvedSchema,
    value: &Value,
    buffer: &mut Vec<u8>,
) -> AvroResult<()> {
    match value.validate_internal(schema, resolved_schema.get_names(), &schema.namespace()) {
        Some(err) => Err(Error::ValidationWithReason(err)),
        None => encode_internal(
            value,
            schema,
            resolved_schema.get_names(),
            &schema.namespace(),
            buffer,
        ),
    }
}

fn write_value_ref_owned_resolved(
    resolved_schema: &ResolvedOwnedSchema,
    value: &Value,
    buffer: &mut Vec<u8>,
) -> AvroResult<()> {
    let root_schema = resolved_schema.get_root_schema();
    if let Some(err) = value.validate_internal(
        root_schema,
        resolved_schema.get_names(),
        &root_schema.namespace(),
    ) {
        return Err(Error::ValidationWithReason(err));
    }
    encode_internal(
        value,
        root_schema,
        resolved_schema.get_names(),
        &root_schema.namespace(),
        buffer,
    )?;
    Ok(())
}

/// Encode a compatible value (anything implementing `Into<Value>`) into Avro format, also
/// performing schema validation.
///
/// **NOTE** This function has a quite small niche of usage and does NOT generate headers and sync
/// markers; use [`Writer`](struct.Writer.html) to be fully Avro-compatible if you don't know what
/// you are doing, instead.
pub fn to_avro_datum<T: Into<Value>>(schema: &Schema, value: T) -> AvroResult<Vec<u8>> {
    let mut encoded = Vec::new();
    write_avro_datum(schema, value, &mut encoded).map(|()| encoded)
}

/// Encode a compatible value (anything implementing `Into<Value>`) into Avro format, also
/// performing schema validation.
/// If the provided `schema` is incomplete then its dependencies must be
/// provided in `schemata`.
pub fn to_avro_datum_schemata<T: Into<Value>>(
    schema: &Schema,
    schemata: Vec<&Schema>,
    value: T,
) -> AvroResult<Vec<u8>> {
    let mut encoded = Vec::new();
    write_avro_datum_schemata(schema, schemata, value, &mut encoded).map(|()| encoded)
}

#[cfg(not(target_arch = "wasm32"))]
/// Generates a random 16-byte sync marker, drawing one random byte per position in order.
fn generate_sync_marker() -> [u8; 16] {
    let mut marker = [0_u8; 16];
    for byte in marker.iter_mut() {
        *byte = rand::random();
    }
    marker
}

#[cfg(target_arch = "wasm32")]
/// Generates a 16-byte sync marker from four `quad_rand` words, each stored big-endian.
fn generate_sync_marker() -> [u8; 16] {
    let mut marker = [0_u8; 16];
    for word in marker.chunks_exact_mut(4) {
        word.copy_from_slice(&quad_rand::rand().to_be_bytes());
    }
    marker
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        decimal::Decimal,
        duration::{Days, Duration, Millis, Months},
        schema::{DecimalSchema, FixedSchema, Name},
        types::Record,
        util::zig_i64,
    };
    use pretty_assertions::assert_eq;
    use serde::{Deserialize, Serialize};

    use apache_avro_test_helper::TestResult;

    const AVRO_OBJECT_HEADER_LEN: usize = AVRO_OBJECT_HEADER.len();

    const SCHEMA: &str = r#"
    {
      "type": "record",
      "name": "test",
      "fields": [
        {
          "name": "a",
          "type": "long",
          "default": 42
        },
        {
          "name": "b",
          "type": "string"
        }
      ]
    }
    "#;
    const UNION_SCHEMA: &str = r#"["null", "long"]"#;

    #[test]
    fn test_to_avro_datum() -> TestResult {
        let schema = Schema::parse_str(SCHEMA)?;
        let mut record = Record::new(&schema).unwrap();
        record.put("a", 27i64);
        record.put("b", "foo");

        let mut expected = Vec::new();
        zig_i64(27, &mut expected);
        zig_i64(3, &mut expected);
        expected.extend([b'f', b'o', b'o']);

        assert_eq!(to_avro_datum(&schema, record)?, expected);

        Ok(())
    }

    #[test]
    fn test_union_not_null() -> TestResult {
        let schema = Schema::parse_str(UNION_SCHEMA)?;
        let union = Value::Union(1, Box::new(Value::Long(3)));

        let mut expected = Vec::new();
        zig_i64(1, &mut expected);
        zig_i64(3, &mut expected);

        assert_eq!(to_avro_datum(&schema, union)?, expected);

        Ok(())
    }

    #[test]
    fn test_union_null() -> TestResult {
        let schema = Schema::parse_str(UNION_SCHEMA)?;
        let union = Value::Union(0, Box::new(Value::Null));

        let mut expected = Vec::new();
        zig_i64(0, &mut expected);

        assert_eq!(to_avro_datum(&schema, union)?, expected);

        Ok(())
    }

    fn logical_type_test<T: Into<Value> + Clone>(
        schema_str: &'static str,

        expected_schema: &Schema,
        value: Value,

        raw_schema: &Schema,
        raw_value: T,
    ) -> TestResult {
        let schema = Schema::parse_str(schema_str)?;
        assert_eq!(&schema, expected_schema);
        // The serialized format should be the same as the schema.
        let ser = to_avro_datum(&schema, value.clone())?;
        let raw_ser = to_avro_datum(raw_schema, raw_value)?;
        assert_eq!(ser, raw_ser);

        // Should deserialize from the schema into the logical type.
        let mut r = ser.as_slice();
        let de = crate::from_avro_datum(&schema, &mut r, None)?;
        assert_eq!(de, value);
        Ok(())
    }

    #[test]
    fn date() -> TestResult {
        logical_type_test(
            r#"{"type": "int", "logicalType": "date"}"#,
            &Schema::Date,
            Value::Date(1_i32),
            &Schema::Int,
            1_i32,
        )
    }

    #[test]
    fn time_millis() -> TestResult {
        logical_type_test(
            r#"{"type": "int", "logicalType": "time-millis"}"#,
            &Schema::TimeMillis,
            Value::TimeMillis(1_i32),
            &Schema::Int,
            1_i32,
        )
    }

    #[test]
    fn time_micros() -> TestResult {
        logical_type_test(
            r#"{"type": "long", "logicalType": "time-micros"}"#,
            &Schema::TimeMicros,
            Value::TimeMicros(1_i64),
            &Schema::Long,
            1_i64,
        )
    }

    #[test]
    fn timestamp_millis() -> TestResult {
        logical_type_test(
            r#"{"type": "long", "logicalType": "timestamp-millis"}"#,
            &Schema::TimestampMillis,
            Value::TimestampMillis(1_i64),
            &Schema::Long,
            1_i64,
        )
    }

    #[test]
    fn timestamp_micros() -> TestResult {
        logical_type_test(
            r#"{"type": "long", "logicalType": "timestamp-micros"}"#,
            &Schema::TimestampMicros,
            Value::TimestampMicros(1_i64),
            &Schema::Long,
            1_i64,
        )
    }

    #[test]
    fn decimal_fixed() -> TestResult {
        let size = 30;
        let inner = Schema::Fixed(FixedSchema {
            name: Name::new("decimal")?,
            aliases: None,
            doc: None,
            size,
            attributes: Default::default(),
        });
        let value = vec![0u8; size];
        logical_type_test(
            r#"{"type": {"type": "fixed", "size": 30, "name": "decimal"}, "logicalType": "decimal", "precision": 20, "scale": 5}"#,
            &Schema::Decimal(DecimalSchema {
                precision: 20,
                scale: 5,
                inner: Box::new(inner.clone()),
            }),
            Value::Decimal(Decimal::from(value.clone())),
            &inner,
            Value::Fixed(size, value),
        )
    }

    #[test]
    fn decimal_bytes() -> TestResult {
        let inner = Schema::Bytes;
        let value = vec![0u8; 10];
        logical_type_test(
            r#"{"type": "bytes", "logicalType": "decimal", "precision": 4, "scale": 3}"#,
            &Schema::Decimal(DecimalSchema {
                precision: 4,
                scale: 3,
                inner: Box::new(inner.clone()),
            }),
            Value::Decimal(Decimal::from(value.clone())),
            &inner,
            value,
        )
    }

    #[test]
    fn duration() -> TestResult {
        let inner = Schema::Fixed(FixedSchema {
            name: Name::new("duration")?,
            aliases: None,
            doc: None,
            size: 12,
            attributes: Default::default(),
        });
        let value = Value::Duration(Duration::new(
            Months::new(256),
            Days::new(512),
            Millis::new(1024),
        ));
        logical_type_test(
            r#"{"type": {"type": "fixed", "name": "duration", "size": 12}, "logicalType": "duration"}"#,
            &Schema::Duration,
            value,
            &inner,
            Value::Fixed(12, vec![0, 1, 0, 0, 0, 2, 0, 0, 0, 4, 0, 0]),
        )
    }

    #[test]
    fn test_writer_append() -> TestResult {
        let schema = Schema::parse_str(SCHEMA)?;
        let mut writer = Writer::new(&schema, Vec::new());

        let mut record = Record::new(&schema).unwrap();
        record.put("a", 27i64);
        record.put("b", "foo");

        let n1 = writer.append(record.clone())?;
        let n2 = writer.append(record.clone())?;
        let n3 = writer.flush()?;
        let result = writer.into_inner()?;

        assert_eq!(n1 + n2 + n3, result.len());

        let mut data = Vec::new();
        zig_i64(27, &mut data);
        zig_i64(3, &mut data);
        data.extend(b"foo");
        data.extend(data.clone());

        // starts with magic
        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
        // ends with data and sync marker
        let last_data_byte = result.len() - 16;
        assert_eq!(
            &result[last_data_byte - data.len()..last_data_byte],
            data.as_slice()
        );

        Ok(())
    }

    #[test]
    fn test_writer_extend() -> TestResult {
        let schema = Schema::parse_str(SCHEMA)?;
        let mut writer = Writer::new(&schema, Vec::new());

        let mut record = Record::new(&schema).unwrap();
        record.put("a", 27i64);
        record.put("b", "foo");
        let record_copy = record.clone();
        let records = vec![record, record_copy];

        let n1 = writer.extend(records.into_iter())?;
        let n2 = writer.flush()?;
        let result = writer.into_inner()?;

        assert_eq!(n1 + n2, result.len());

        let mut data = Vec::new();
        zig_i64(27, &mut data);
        zig_i64(3, &mut data);
        data.extend(b"foo");
        data.extend(data.clone());

        // starts with magic
        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
        // ends with data and sync marker
        let last_data_byte = result.len() - 16;
        assert_eq!(
            &result[last_data_byte - data.len()..last_data_byte],
            data.as_slice()
        );

        Ok(())
    }

    #[derive(Debug, Clone, Deserialize, Serialize)]
    struct TestSerdeSerialize {
        a: i64,
        b: String,
    }

    #[test]
    fn test_writer_append_ser() -> TestResult {
        let schema = Schema::parse_str(SCHEMA)?;
        let mut writer = Writer::new(&schema, Vec::new());

        let record = TestSerdeSerialize {
            a: 27,
            b: "foo".to_owned(),
        };

        let n1 = writer.append_ser(record)?;
        let n2 = writer.flush()?;
        let result = writer.into_inner()?;

        assert_eq!(n1 + n2, result.len());

        let mut data = Vec::new();
        zig_i64(27, &mut data);
        zig_i64(3, &mut data);
        data.extend(b"foo");

        // starts with magic
        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
        // ends with data and sync marker
        let last_data_byte = result.len() - 16;
        assert_eq!(
            &result[last_data_byte - data.len()..last_data_byte],
            data.as_slice()
        );

        Ok(())
    }

    #[test]
    fn test_writer_extend_ser() -> TestResult {
        // Writes two identical serde-serialized records through a single
        // `extend_ser` call and checks they end up back to back in the last block.
        let schema = Schema::parse_str(SCHEMA)?;
        let mut writer = Writer::new(&schema, Vec::new());

        let template = TestSerdeSerialize {
            a: 27,
            b: "foo".to_owned(),
        };
        let records = vec![template.clone(), template];

        let extended = writer.extend_ser(records.into_iter())?;
        let flushed = writer.flush()?;
        let bytes = writer.into_inner()?;
        assert_eq!(extended + flushed, bytes.len());

        let mut expected = Vec::new();
        zig_i64(27, &mut expected);
        zig_i64(3, &mut expected);
        expected.extend(b"foo");
        // The second record encodes to exactly the same bytes as the first.
        expected.extend_from_within(..);

        // The container starts with the Avro magic bytes...
        assert_eq!(&bytes[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
        // ...and ends with the data block followed by the 16-byte sync marker.
        let sync_marker_len = 16;
        let data_end = bytes.len() - sync_marker_len;
        let data_start = data_end - expected.len();
        assert_eq!(&bytes[data_start..data_end], expected.as_slice());

        Ok(())
    }

    /// Builds a Deflate-compressing `Writer` over an in-memory buffer using the
    /// `Writer::with_codec` constructor.
    fn make_writer_with_codec(schema: &Schema) -> Writer<'_, Vec<u8>> {
        let sink: Vec<u8> = Vec::new();
        Writer::with_codec(schema, sink, Codec::Deflate)
    }

    /// Builds a Deflate-compressing `Writer` over an in-memory buffer through
    /// the typed builder, using a block size of 100 bytes (well below
    /// `DEFAULT_BLOCK_SIZE`).
    fn make_writer_with_builder(schema: &Schema) -> Writer<'_, Vec<u8>> {
        let block_size: usize = 100;
        Writer::builder()
            .schema(schema)
            .writer(Vec::new())
            .block_size(block_size)
            .codec(Codec::Deflate)
            .build()
    }

    /// Appends the record `{a: 27, b: "foo"}` twice to `writer` and flushes it.
    ///
    /// Checks that the byte counts returned by `append`/`flush` sum to the
    /// output size, and that the final block equals both encoded records
    /// compressed with Deflate. It therefore assumes `writer` was configured
    /// with `Codec::Deflate`.
    fn check_writer(mut writer: Writer<'_, Vec<u8>>, schema: &Schema) -> TestResult {
        let mut record = Record::new(schema).unwrap();
        record.put("a", 27i64);
        record.put("b", "foo");

        let mut total = 0;
        total += writer.append(record.clone())?;
        total += writer.append(record)?;
        total += writer.flush()?;
        let bytes = writer.into_inner()?;
        assert_eq!(total, bytes.len());

        // Two identical encoded records, compressed as a single block.
        let mut expected = Vec::new();
        zig_i64(27, &mut expected);
        zig_i64(3, &mut expected);
        expected.extend(b"foo");
        expected.extend_from_within(..);
        Codec::Deflate.compress(&mut expected)?;

        // The container starts with the Avro magic bytes...
        assert_eq!(&bytes[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
        // ...and ends with the compressed block followed by the 16-byte sync marker.
        let sync_marker_len = 16;
        let data_end = bytes.len() - sync_marker_len;
        let data_start = data_end - expected.len();
        assert_eq!(&bytes[data_start..data_end], expected.as_slice());

        Ok(())
    }

    #[test]
    fn test_writer_with_codec() -> TestResult {
        // Exercises the `Writer::with_codec` construction path.
        let schema = Schema::parse_str(SCHEMA)?;
        check_writer(make_writer_with_codec(&schema), &schema)
    }

    #[test]
    fn test_writer_with_builder() -> TestResult {
        // Exercises the typed-builder construction path.
        let schema = Schema::parse_str(SCHEMA)?;
        check_writer(make_writer_with_builder(&schema), &schema)
    }

    #[test]
    fn test_logical_writer() -> TestResult {
        // Field `a` is a union of `null` (branch 0) and a `long` annotated with
        // the `timestamp-micros` logical type (branch 1).
        const LOGICAL_TYPE_SCHEMA: &str = r#"
        {
          "type": "record",
          "name": "logical_type_test",
          "fields": [
            {
              "name": "a",
              "type": [
                "null",
                {
                  "type": "long",
                  "logicalType": "timestamp-micros"
                }
              ]
            }
          ]
        }
        "#;
        let codec = Codec::Deflate;
        let schema = Schema::parse_str(LOGICAL_TYPE_SCHEMA)?;
        let mut writer = Writer::builder()
            .writer(Vec::new())
            .schema(&schema)
            .codec(codec)
            .build();

        let mut timestamp_record = Record::new(&schema).unwrap();
        timestamp_record.put(
            "a",
            Value::Union(1, Box::new(Value::TimestampMicros(1234_i64))),
        );
        let mut null_record = Record::new(&schema).unwrap();
        null_record.put("a", Value::Union(0, Box::new(Value::Null)));

        let written = [
            writer.append(timestamp_record)?,
            writer.append(null_record)?,
            writer.flush()?,
        ]
        .iter()
        .sum::<usize>();
        let bytes = writer.into_inner()?;
        assert_eq!(written, bytes.len());

        let mut expected = Vec::new();
        // First record: union branch index 1, then the long value.
        zig_i64(1, &mut expected);
        zig_i64(1234, &mut expected);
        // Second record: union branch index 0 (null) carries no payload.
        zig_i64(0, &mut expected);
        codec.compress(&mut expected)?;

        // The container starts with the Avro magic bytes...
        assert_eq!(&bytes[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
        // ...and ends with the compressed block followed by the 16-byte sync marker.
        let data_end = bytes.len() - 16;
        let data_start = data_end - expected.len();
        assert_eq!(&bytes[data_start..data_end], expected.as_slice());

        Ok(())
    }

    #[test]
    fn test_avro_3405_writer_add_metadata_success() -> TestResult {
        let schema = Schema::parse_str(SCHEMA)?;
        let mut writer = Writer::new(&schema, Vec::new());

        // User metadata is registered before any data is appended. Each call
        // passes a different value type: String, &str, byte-string literal, Vec.
        writer.add_user_metadata("stringKey".to_string(), String::from("stringValue"))?;
        writer.add_user_metadata("strKey".to_string(), "strValue")?;
        writer.add_user_metadata("bytesKey".to_string(), b"bytesValue")?;
        writer.add_user_metadata("vecKey".to_string(), vec![1, 2, 3])?;

        let mut record = Record::new(&schema).unwrap();
        record.put("a", 27i64);
        record.put("b", "foo");

        for _ in 0..2 {
            writer.append(record.clone())?;
        }
        writer.flush()?;
        let output = writer.into_inner()?;

        // Exact container size, including the four user metadata entries.
        assert_eq!(output.len(), 260);

        Ok(())
    }

    #[test]
    fn test_avro_3405_writer_add_metadata_failure() -> TestResult {
        let schema = Schema::parse_str(SCHEMA)?;
        let mut writer = Writer::new(&schema, Vec::new());

        let mut record = Record::new(&schema).unwrap();
        record.put("a", 27i64);
        record.put("b", "foo");
        // Once data has been appended, the header is considered written and
        // further metadata must be rejected.
        writer.append(record.clone())?;

        let outcome = writer.add_user_metadata("stringKey".to_string(), String::from("value2"));
        match outcome {
            Ok(_) => panic!("Expected an error that metadata cannot be added after adding data"),
            Err(e @ Error::FileHeaderAlreadyWritten) => {
                assert_eq!(e.to_string(), "The file metadata is already flushed.")
            }
            Err(e) => panic!("Unexpected error occurred while writing user metadata: {e:?}"),
        }

        Ok(())
    }

    #[test]
    fn test_avro_3405_writer_add_metadata_reserved_prefix_failure() -> TestResult {
        let schema = Schema::parse_str(SCHEMA)?;
        let mut writer = Writer::new(&schema, Vec::new());

        // Keys in the `avro.` namespace must be refused for user metadata.
        let key = "avro.stringKey".to_string();
        let expected_message = format!(
            "Metadata keys starting with 'avro.' are reserved for internal usage: {key}."
        );

        match writer.add_user_metadata(key.clone(), "value") {
            Ok(_) => panic!("Expected an error that the metadata key cannot be prefixed with 'avro.'"),
            Err(ref e @ Error::InvalidMetadataKey(_)) => {
                assert_eq!(e.to_string(), expected_message)
            }
            Err(e) => panic!(
                "Unexpected error occurred while writing user metadata with reserved prefix ('avro.'): {e:?}"
            ),
        }

        Ok(())
    }

    #[test]
    fn test_avro_3405_writer_add_metadata_with_builder_api_success() -> TestResult {
        let schema = Schema::parse_str(SCHEMA)?;

        // Metadata supplied through the builder should be stored verbatim.
        let user_meta_data: HashMap<String, Value> = vec![
            ("stringKey", Value::String("stringValue".to_string())),
            ("bytesKey", Value::Bytes(b"bytesValue".to_vec())),
            ("vecKey", Value::Bytes(vec![1, 2, 3])),
        ]
        .into_iter()
        .map(|(key, value)| (key.to_string(), value))
        .collect();

        let writer: Writer<'_, Vec<u8>> = Writer::builder()
            .schema(&schema)
            .writer(Vec::new())
            .user_metadata(user_meta_data.clone())
            .build();

        assert_eq!(writer.user_metadata, user_meta_data);

        Ok(())
    }

    /// Fixture for the single-object writer tests.
    ///
    /// Its schema comes from the `AvroSchema` impl below, and its `Value`
    /// form comes from the `From` impl below. Field order matches the schema.
    #[derive(Clone, Serialize)]
    struct TestSingleObjectWriter {
        a: i64,
        b: f64,
        c: Vec<String>,
    }

    impl AvroSchema for TestSingleObjectWriter {
        /// Parses the hand-written record schema for this fixture:
        /// `a: long`, `b: double`, `c: array<string>`.
        ///
        /// The JSON is re-parsed on every call; a parse failure panics.
        fn get_schema() -> Schema {
            const RAW_SCHEMA: &str = r#"
            {
                "type":"record",
                "name":"TestSingleObjectWrtierSerialize",
                "fields":[
                    {
                        "name":"a",
                        "type":"long"
                    },
                    {
                        "name":"b",
                        "type":"double"
                    },
                    {
                        "name":"c",
                        "type":{
                            "type":"array",
                            "items":"string"
                        }
                    }
                ]
            }
            "#;
            Schema::parse_str(RAW_SCHEMA).unwrap()
        }
    }

    impl From<TestSingleObjectWriter> for Value {
        fn from(obj: TestSingleObjectWriter) -> Value {
            Value::Record(vec![
                ("a".into(), obj.a.into()),
                ("b".into(), obj.b.into()),
                (
                    "c".into(),
                    Value::Array(obj.c.into_iter().map(|s| s.into()).collect()),
                ),
            ])
        }
    }

    #[test]
    fn test_single_object_writer() -> TestResult {
        // Parse the schema once and reuse it. `get_schema()` re-parses the JSON
        // on every call, and the writer, the fingerprint check and the reference
        // encoding must all agree on the same schema anyway.
        let schema = TestSingleObjectWriter::get_schema();

        let obj = TestSingleObjectWriter {
            a: 300,
            b: 34.555,
            c: vec!["cat".into(), "dog".into()],
        };
        let mut writer = GenericSingleObjectWriter::new_with_capacity(&schema, 1024)
            .expect("Should resolve schema");
        let value: Value = obj.into();

        let mut buf: Vec<u8> = Vec::new();
        let written_bytes = writer
            .write_value_ref(&value, &mut buf)
            .expect("Error serializing properly");

        // Single-object encoding layout: 2-byte marker (0xC3 0x01), then the
        // 8-byte Rabin fingerprint of the schema, then the binary-encoded body.
        assert!(buf.len() > 10, "no bytes written");
        assert_eq!(buf.len(), written_bytes);
        assert_eq!(buf[0], 0xC3);
        assert_eq!(buf[1], 0x01);
        assert_eq!(&buf[2..10], &schema.fingerprint::<Rabin>().bytes[..]);

        // The body must match a plain `encode` of the same value.
        let mut msg_binary = Vec::new();
        encode(&value, &schema, &mut msg_binary)
            .expect("encoding a value the single-object writer already accepted should succeed");
        assert_eq!(&buf[10..], &msg_binary[..]);

        Ok(())
    }

    #[test]
    fn test_writer_parity() -> TestResult {
        // The specific writer's `write` and `write_value`, and the generic
        // writer's `write_value`, must emit identical bytes for the same object.
        let obj1 = TestSingleObjectWriter {
            a: 300,
            b: 34.555,
            c: vec!["cat".into(), "dog".into()],
        };

        let mut generic_writer = GenericSingleObjectWriter::new_with_capacity(
            &TestSingleObjectWriter::get_schema(),
            1024,
        )
        .expect("Should resolve schema");
        let mut specific_writer =
            SpecificSingleObjectWriter::<TestSingleObjectWriter>::with_capacity(1024)
                .expect("Resolved should pass");

        let mut via_serde: Vec<u8> = Vec::new();
        specific_writer
            .write(obj1.clone(), &mut via_serde)
            .expect("Serialization expected");

        let mut via_specific_value: Vec<u8> = Vec::new();
        specific_writer
            .write_value(obj1.clone(), &mut via_specific_value)
            .expect("Serialization expected");

        let mut via_generic_value: Vec<u8> = Vec::new();
        generic_writer
            .write_value(obj1.into(), &mut via_generic_value)
            .expect("Serialization expected");

        assert_eq!(via_serde, via_specific_value);
        assert_eq!(via_serde, via_generic_value);

        Ok(())
    }
}