Skip to main content

disintegrate_serde/serde/
avro.rs

//! A module for serializing and deserializing data using an Avro schema.
2use std::marker::PhantomData;
3
4use super::Error;
5use apache_avro::{from_value, Codec, DeflateSettings, Reader, Schema, Writer};
6use serde::{Deserialize, Serialize};
7
8use crate::serde::{Deserializer, Serializer};
9
10/// An Avro serialization and deserialization module.
11#[derive(Debug, Clone)]
12pub struct Avro<I, O> {
13    schema: Schema,
14    input: PhantomData<I>,
15    output: PhantomData<O>,
16}
17
18impl<I, O> Avro<I, O> {
19    /// Create a new instance of `Avro` with the specified Avro schema.
20    ///
21    /// # Arguments
22    ///
23    /// * `schema` - A string representing the Avro schema.
24    ///
25    /// # Returns
26    ///
27    /// A new `Avro` instance
28    pub fn new(schema: &str) -> Self {
29        let schema = Schema::parse_str(schema).unwrap();
30        Self {
31            schema,
32            input: PhantomData,
33            output: PhantomData,
34        }
35    }
36}
37
38impl<I, O> Serializer<I> for Avro<I, O>
39where
40    O: From<I> + Serialize,
41{
42    /// Serialize the given value to Avro format and return the serialized bytes.
43    ///
44    /// # Arguments
45    ///
46    /// * `value` - The value to be serialized.
47    ///
48    /// # Returns
49    ///
50    /// Serialized bytes representing the value in Avro format.
51    fn serialize(&self, value: I) -> Vec<u8> {
52        let target = O::from(value);
53        let mut writer = Writer::with_codec(
54            &self.schema,
55            Vec::new(),
56            Codec::Deflate(DeflateSettings::default()),
57        );
58        writer
59            .append_ser(target)
60            .expect("avro serialization should not fail");
61        writer.into_inner().expect("encoded avro should not fail")
62    }
63}
64
65impl<I, O> Deserializer<I> for Avro<I, O>
66where
67    I: TryFrom<O>,
68    for<'d> O: Deserialize<'d>,
69{
70    /// Deserialize the given Avro serialized bytes to produce a value of type `I`.
71    ///
72    /// # Arguments
73    ///
74    /// * `data` - The Avro serialized bytes to be deserialized.
75    ///
76    /// # Returns
77    ///
78    /// A `Result` containing the deserialized value on success, or an error on failure.
79    fn deserialize(&self, data: Vec<u8>) -> Result<I, Error> {
80        let mut reader = Reader::new(&data[..]).map_err(|e| Error::Deserialization(Box::new(e)))?;
81        let value = reader
82            .next()
83            .expect("at least one value should be present")
84            .map_err(|e| Error::Deserialization(Box::new(e)))?;
85        let target: O = from_value(&value).map_err(|e| Error::Deserialization(Box::new(e)))?;
86        I::try_from(target).map_err(|_| Error::Conversion)
87    }
88}
89
#[cfg(test)]
mod tests {
    use super::*;
    use std::convert::TryFrom;

    /// In-memory type used by the application side of the round trip.
    #[derive(Debug, PartialEq, Clone)]
    struct InputData {
        value: u32,
    }

    /// Encoded type written to Avro; stores the number as a string so that the
    /// conversion back to `InputData` can fail.
    #[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
    struct SerializedData {
        value: String,
    }

    /// Avro record schema matching `SerializedData`.
    const TEST_SCHEMA: &str = r#"
        {
            "type": "record",
            "name": "TestRecord",
            "fields": [
                { "name": "value", "type": "string" }
            ]
        }
    "#;

    /// Error returned when a `SerializedData` does not hold a valid `u32`.
    #[derive(Debug, PartialEq)]
    enum ConversionError {
        InvalidValue,
    }

    impl TryFrom<SerializedData> for InputData {
        type Error = ConversionError;

        fn try_from(data: SerializedData) -> Result<Self, Self::Error> {
            let input_value = data
                .value
                .parse::<u32>()
                .map_err(|_| ConversionError::InvalidValue)?;
            Ok(InputData { value: input_value })
        }
    }

    impl From<InputData> for SerializedData {
        fn from(data: InputData) -> Self {
            SerializedData {
                value: data.value.to_string(),
            }
        }
    }

    #[test]
    fn it_serializes_and_deserializes_avro_data() {
        // Create an instance of the Avro module with the test schema
        let avro = Avro::<InputData, SerializedData>::new(TEST_SCHEMA);

        let input = InputData { value: 42 };

        // Serialize the input data
        let serialized = avro.serialize(input.clone());

        // Deserialize the serialized data
        let deserialized: InputData = avro.deserialize(serialized).unwrap();

        // Ensure the deserialized data matches the original input
        assert_eq!(deserialized, input);
    }

    #[test]
    fn it_returns_a_deserialization_error_for_invalid_avro_data() {
        let avro = Avro::<InputData, SerializedData>::new(TEST_SCHEMA);

        // Bytes that were not produced by an Avro writer
        let result: Result<InputData, Error> =
            avro.deserialize(b"not an avro container".to_vec());

        assert!(matches!(result, Err(Error::Deserialization(_))));
    }

    #[test]
    fn it_returns_a_conversion_error_when_the_decoded_value_cannot_be_converted() {
        // Write a record whose `value` is not a number by using `SerializedData`
        // as both the input and the encoded type.
        let writer = Avro::<SerializedData, SerializedData>::new(TEST_SCHEMA);
        let serialized = writer.serialize(SerializedData {
            value: "not a number".to_string(),
        });

        // Decoding succeeds, but `InputData::try_from` rejects the string.
        let reader = Avro::<InputData, SerializedData>::new(TEST_SCHEMA);
        let result: Result<InputData, Error> = reader.deserialize(serialized);

        assert!(matches!(result, Err(Error::Conversion)));
    }
}