disintegrate_serde/serde/
avro.rs1use std::marker::PhantomData;
3
4use super::Error;
5use apache_avro::{from_value, Codec, DeflateSettings, Reader, Schema, Writer};
6use serde::{Deserialize, Serialize};
7
8use crate::serde::{Deserializer, Serializer};
9
10#[derive(Debug, Clone)]
12pub struct Avro<I, O> {
13 schema: Schema,
14 input: PhantomData<I>,
15 output: PhantomData<O>,
16}
17
18impl<I, O> Avro<I, O> {
19 pub fn new(schema: &str) -> Self {
29 let schema = Schema::parse_str(schema).unwrap();
30 Self {
31 schema,
32 input: PhantomData,
33 output: PhantomData,
34 }
35 }
36}
37
38impl<I, O> Serializer<I> for Avro<I, O>
39where
40 O: From<I> + Serialize,
41{
42 fn serialize(&self, value: I) -> Vec<u8> {
52 let target = O::from(value);
53 let mut writer = Writer::with_codec(
54 &self.schema,
55 Vec::new(),
56 Codec::Deflate(DeflateSettings::default()),
57 );
58 writer
59 .append_ser(target)
60 .expect("avro serialization should not fail");
61 writer.into_inner().expect("encoded avro should not fail")
62 }
63}
64
65impl<I, O> Deserializer<I> for Avro<I, O>
66where
67 I: TryFrom<O>,
68 for<'d> O: Deserialize<'d>,
69{
70 fn deserialize(&self, data: Vec<u8>) -> Result<I, Error> {
80 let mut reader = Reader::new(&data[..]).map_err(|e| Error::Deserialization(Box::new(e)))?;
81 let value = reader
82 .next()
83 .expect("at least one value should be present")
84 .map_err(|e| Error::Deserialization(Box::new(e)))?;
85 let target: O = from_value(&value).map_err(|e| Error::Deserialization(Box::new(e)))?;
86 I::try_from(target).map_err(|_| Error::Conversion)
87 }
88}
89
90#[cfg(test)]
91mod tests {
92 use super::*;
93 use std::convert::TryFrom;
94
95 #[derive(Debug, PartialEq, Clone)]
96 struct InputData {
97 value: u32,
98 }
99
100 #[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
101 struct SerializedData {
102 value: String,
103 }
104
105 const TEST_SCHEMA: &str = r#"
106 {
107 "type": "record",
108 "name": "TestRecord",
109 "fields": [
110 { "name": "value", "type": "string" }
111 ]
112 }
113 "#;
114
115 #[derive(Debug, PartialEq)]
116 enum ConversionError {
117 InvalidValue,
118 }
119
120 impl TryFrom<SerializedData> for InputData {
121 type Error = ConversionError;
122
123 fn try_from(data: SerializedData) -> Result<Self, Self::Error> {
124 let input_value = data
125 .value
126 .parse::<u32>()
127 .map_err(|_| ConversionError::InvalidValue)?;
128 Ok(InputData { value: input_value })
129 }
130 }
131
132 impl From<InputData> for SerializedData {
133 fn from(data: InputData) -> Self {
134 SerializedData {
135 value: data.value.to_string(),
136 }
137 }
138 }
139
140 #[test]
141 fn it_serializes_and_deserializes_avro_data() {
142 let avro = Avro::<InputData, SerializedData>::new(TEST_SCHEMA);
144
145 let input = InputData { value: 42 };
146
147 let serialized = avro.serialize(input.clone());
149
150 let deserialized: InputData = avro.deserialize(serialized).unwrap();
152
153 assert_eq!(deserialized, input);
155 }
156}