disintegrate_serde/serde/
avro.rs1use std::marker::PhantomData;
3
4use super::Error;
5use apache_avro::{from_value, Codec, Reader, Schema, Writer};
6use serde::{Deserialize, Serialize};
7
8use crate::serde::{Deserializer, Serializer};
9
10#[derive(Debug, Clone)]
12pub struct Avro<I, O> {
13 schema: Schema,
14 input: PhantomData<I>,
15 output: PhantomData<O>,
16}
17
18impl<I, O> Avro<I, O> {
19 pub fn new(schema: &str) -> Self {
29 let schema = Schema::parse_str(schema).unwrap();
30 Self {
31 schema,
32 input: PhantomData,
33 output: PhantomData,
34 }
35 }
36}
37
38impl<I, O> Serializer<I> for Avro<I, O>
39where
40 O: From<I> + Serialize,
41{
42 fn serialize(&self, value: I) -> Vec<u8> {
52 let target = O::from(value);
53 let mut writer = Writer::with_codec(&self.schema, Vec::new(), Codec::Deflate);
54 writer
55 .append_ser(target)
56 .expect("avro serialization should not fail");
57 writer.into_inner().expect("encoded avro should not fail")
58 }
59}
60
61impl<I, O> Deserializer<I> for Avro<I, O>
62where
63 I: TryFrom<O>,
64 for<'d> O: Deserialize<'d>,
65{
66 fn deserialize(&self, data: Vec<u8>) -> Result<I, Error> {
76 let mut reader = Reader::new(&data[..]).map_err(|e| Error::Deserialization(Box::new(e)))?;
77 let value = reader
78 .next()
79 .expect("at least one value should be present")
80 .map_err(|e| Error::Deserialization(Box::new(e)))?;
81 let target: O = from_value(&value).map_err(|e| Error::Deserialization(Box::new(e)))?;
82 I::try_from(target).map_err(|_| Error::Conversion)
83 }
84}
85
#[cfg(test)]
mod tests {
    use super::*;
    use std::convert::TryFrom;

    /// Caller-facing type: what goes into `serialize` and comes out of `deserialize`.
    #[derive(Clone, Debug, PartialEq)]
    struct InputData {
        value: u32,
    }

    /// Encoded representation: the number is carried as a string.
    #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
    struct SerializedData {
        value: String,
    }

    /// Record schema with a single string field, matching `SerializedData`.
    const TEST_SCHEMA: &str = r#"
    {
        "type": "record",
        "name": "TestRecord",
        "fields": [
            { "name": "value", "type": "string" }
        ]
    }
    "#;

    /// Failure raised when the encoded string is not a valid `u32`.
    #[derive(Debug, PartialEq)]
    enum ConversionError {
        InvalidValue,
    }

    impl TryFrom<SerializedData> for InputData {
        type Error = ConversionError;

        fn try_from(data: SerializedData) -> Result<Self, Self::Error> {
            match data.value.parse::<u32>() {
                Ok(value) => Ok(InputData { value }),
                Err(_) => Err(ConversionError::InvalidValue),
            }
        }
    }

    impl From<InputData> for SerializedData {
        fn from(data: InputData) -> Self {
            let value = data.value.to_string();
            SerializedData { value }
        }
    }

    /// A value survives a serialize → deserialize round trip unchanged.
    #[test]
    fn it_serializes_and_deserializes_avro_data() {
        let serde = Avro::<InputData, SerializedData>::new(TEST_SCHEMA);
        let original = InputData { value: 42 };

        let bytes = serde.serialize(original.clone());
        let round_tripped: InputData = serde.deserialize(bytes).unwrap();

        assert_eq!(round_tripped, original);
    }
}