disintegrate_serde/serde/
avro.rs

//! A module for serializing and deserializing data using an Avro schema.
2use std::marker::PhantomData;
3
4use super::Error;
5use apache_avro::{from_value, Codec, Reader, Schema, Writer};
6use serde::{Deserialize, Serialize};
7
8use crate::serde::{Deserializer, Serializer};
9
10/// An Avro serialization and deserialization module.
11#[derive(Debug, Clone)]
12pub struct Avro<I, O> {
13    schema: Schema,
14    input: PhantomData<I>,
15    output: PhantomData<O>,
16}
17
18impl<I, O> Avro<I, O> {
19    /// Create a new instance of `Avro` with the specified Avro schema.
20    ///
21    /// # Arguments
22    ///
23    /// * `schema` - A string representing the Avro schema.
24    ///
25    /// # Returns
26    ///
27    /// A new `Avro` instance
28    pub fn new(schema: &str) -> Self {
29        let schema = Schema::parse_str(schema).unwrap();
30        Self {
31            schema,
32            input: PhantomData,
33            output: PhantomData,
34        }
35    }
36}
37
38impl<I, O> Serializer<I> for Avro<I, O>
39where
40    O: From<I> + Serialize,
41{
42    /// Serialize the given value to Avro format and return the serialized bytes.
43    ///
44    /// # Arguments
45    ///
46    /// * `value` - The value to be serialized.
47    ///
48    /// # Returns
49    ///
50    /// Serialized bytes representing the value in Avro format.
51    fn serialize(&self, value: I) -> Vec<u8> {
52        let target = O::from(value);
53        let mut writer = Writer::with_codec(&self.schema, Vec::new(), Codec::Deflate);
54        writer
55            .append_ser(target)
56            .expect("avro serialization should not fail");
57        writer.into_inner().expect("encoded avro should not fail")
58    }
59}
60
61impl<I, O> Deserializer<I> for Avro<I, O>
62where
63    I: TryFrom<O>,
64    for<'d> O: Deserialize<'d>,
65{
66    /// Deserialize the given Avro serialized bytes to produce a value of type `I`.
67    ///
68    /// # Arguments
69    ///
70    /// * `data` - The Avro serialized bytes to be deserialized.
71    ///
72    /// # Returns
73    ///
74    /// A `Result` containing the deserialized value on success, or an error on failure.
75    fn deserialize(&self, data: Vec<u8>) -> Result<I, Error> {
76        let mut reader = Reader::new(&data[..]).map_err(|e| Error::Deserialization(Box::new(e)))?;
77        let value = reader
78            .next()
79            .expect("at least one value should be present")
80            .map_err(|e| Error::Deserialization(Box::new(e)))?;
81        let target: O = from_value(&value).map_err(|e| Error::Deserialization(Box::new(e)))?;
82        I::try_from(target).map_err(|_| Error::Conversion)
83    }
84}
85
#[cfg(test)]
mod tests {
    use super::*;
    use std::convert::TryFrom;

    /// Domain-side type used as `I` in the tests.
    #[derive(Debug, PartialEq, Clone)]
    struct InputData {
        value: u32,
    }

    /// Encoded-side type used as `O`; it matches `TEST_SCHEMA`.
    #[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
    struct SerializedData {
        value: String,
    }

    const TEST_SCHEMA: &str = r#"
        {
            "type": "record",
            "name": "TestRecord",
            "fields": [
                { "name": "value", "type": "string" }
            ]
        }
    "#;

    #[derive(Debug, PartialEq)]
    enum ConversionError {
        InvalidValue,
    }

    impl TryFrom<SerializedData> for InputData {
        type Error = ConversionError;

        fn try_from(data: SerializedData) -> Result<Self, Self::Error> {
            let input_value = data
                .value
                .parse::<u32>()
                .map_err(|_| ConversionError::InvalidValue)?;
            Ok(InputData { value: input_value })
        }
    }

    impl From<InputData> for SerializedData {
        fn from(data: InputData) -> Self {
            SerializedData {
                value: data.value.to_string(),
            }
        }
    }

    #[test]
    fn it_serializes_and_deserializes_avro_data() {
        // Create an instance of the Avro module with the test schema
        let avro = Avro::<InputData, SerializedData>::new(TEST_SCHEMA);

        let input = InputData { value: 42 };

        // Serialize the input data
        let serialized = avro.serialize(input.clone());

        // Deserialize the serialized data
        let deserialized: InputData = avro.deserialize(serialized).unwrap();

        // Ensure the deserialized data matches the original input
        assert_eq!(deserialized, input);
    }

    #[test]
    fn it_fails_to_deserialize_invalid_avro_data() {
        let avro = Avro::<InputData, SerializedData>::new(TEST_SCHEMA);

        // Bytes that do not form an Avro payload must yield an error.
        let result = avro.deserialize(b"not avro data".to_vec());

        assert!(matches!(result, Err(Error::Deserialization(_))));
    }

    #[test]
    fn it_reports_a_conversion_error_when_the_payload_cannot_be_converted() {
        // Write the encoded type directly so the payload holds a string that
        // is valid for the schema but cannot be parsed as a `u32`.
        let writer = Avro::<SerializedData, SerializedData>::new(TEST_SCHEMA);
        let serialized = writer.serialize(SerializedData {
            value: "not a number".to_string(),
        });

        let avro = Avro::<InputData, SerializedData>::new(TEST_SCHEMA);
        let result = avro.deserialize(serialized);

        assert!(matches!(result, Err(Error::Conversion)));
    }
}