1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
use async_trait::async_trait;
use avro_rs::{from_value, Codec, Reader, Schema, Writer};
use pipebase::{
common::{ConfigInto, FromConfig, FromPath},
map::Map,
};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
/// Compression choices that can appear in the serializer configuration.
///
/// `get_codec` translates each variant into the avro `Codec` variant of the
/// same name.
#[derive(Deserialize)]
enum Compression {
/// Translated to `Codec::Null`.
Null,
/// Translated to `Codec::Deflate`.
Deflate,
/// Translated to `Codec::Snappy`.
Snappy,
}
/// Converts the configured [`Compression`] choice into the avro `Codec`
/// variant carrying the same name.
fn get_codec(compression: Compression) -> Codec {
    // One arm per variant and no wildcard, so adding a `Compression` variant
    // forces this mapping to be revisited at compile time.
    match compression {
        Compression::Snappy => Codec::Snappy,
        Compression::Deflate => Codec::Deflate,
        Compression::Null => Codec::Null,
    }
}
/// Deserializable configuration from which an [`AvroSer`] is built.
#[derive(Deserialize)]
pub struct AvroSerConfig {
/// Compression choice; converted to an avro `Codec` via `get_codec` when
/// the serializer is constructed.
compression: Compression,
/// Avro schema text; parsed with `Schema::parse_str` when the serializer
/// is constructed.
schema: String,
}
// Empty impl: nothing is overridden, so `AvroSerConfig` relies entirely on
// whatever `FromPath` provides by default.
impl FromPath for AvroSerConfig {}
// Empty impl: marks `AvroSerConfig` as the configuration that converts into
// `AvroSer`, using only what `ConfigInto` provides by default.
impl ConfigInto<AvroSer> for AvroSerConfig {}
/// Mapper that encodes a `Vec<T>` of serializable items into avro bytes.
pub struct AvroSer {
/// Codec handed to the avro `Writer` on every `map` call.
codec: Codec,
/// Parsed schema handed to the avro `Writer` on every `map` call.
schema: Schema,
}
#[async_trait]
impl FromConfig<AvroSerConfig> for AvroSer {
async fn from_config(config: AvroSerConfig) -> anyhow::Result<Self> {
Ok(AvroSer {
codec: get_codec(config.compression),
schema: Schema::parse_str(&config.schema)?,
})
}
}
impl AvroSer {
    /// Encodes `items` through an avro `Writer` backed by an in-memory buffer.
    ///
    /// Each item is appended with `Writer::append_ser` under the supplied
    /// `schema` and `codec`; the buffer recovered from `Writer::into_inner`
    /// is the return value.
    ///
    /// # Errors
    ///
    /// Stops at the first item whose append fails and returns that error;
    /// an error from `Writer::into_inner` is propagated as well.
    fn serialize<T: Serialize>(
        items: Vec<T>,
        schema: &Schema,
        codec: Codec,
    ) -> anyhow::Result<Vec<u8>> {
        let mut out = Writer::with_codec(schema, Vec::new(), codec);
        // The byte count reported by each append is not needed here.
        items
            .into_iter()
            .try_for_each(|item| out.append_ser(item).map(|_written| ()))?;
        let encoded = out.into_inner()?;
        Ok(encoded)
    }
}
#[async_trait]
impl<T> Map<Vec<T>, Vec<u8>, AvroSerConfig> for AvroSer
where
    T: Serialize + Send + 'static,
{
    /// Encodes one batch of items into avro bytes using the schema and codec
    /// stored on this mapper.
    ///
    /// # Errors
    ///
    /// Returns whatever error `AvroSer::serialize` reports for the batch.
    async fn map(&mut self, data: Vec<T>) -> anyhow::Result<Vec<u8>> {
        Self::serialize(data, &self.schema, self.codec)
    }
}
/// Configuration for [`AvroDeser`]; it has no fields.
#[derive(Deserialize)]
pub struct AvroDeserConfig {}
#[async_trait]
impl FromPath for AvroDeserConfig {
    /// Returns an empty [`AvroDeserConfig`].
    ///
    /// The `_path` argument is never read: the config has no fields, so the
    /// value is constructed directly and this implementation always
    /// returns `Ok`.
    async fn from_path<P>(_path: P) -> anyhow::Result<Self>
    where
        P: AsRef<std::path::Path> + Send,
    {
        Ok(Self {})
    }
}
// Empty impl: marks `AvroDeserConfig` as the configuration that converts into
// `AvroDeser`, using only what `ConfigInto` provides by default.
impl ConfigInto<AvroDeser> for AvroDeserConfig {}
/// Mapper that decodes avro bytes into a `Vec<T>`; it holds no state.
pub struct AvroDeser {}
#[async_trait]
impl FromConfig<AvroDeserConfig> for AvroDeser {
    /// Builds an [`AvroDeser`].
    ///
    /// The configuration carries no fields and is ignored; this
    /// implementation always returns `Ok`.
    async fn from_config(_config: AvroDeserConfig) -> anyhow::Result<Self> {
        Ok(Self {})
    }
}
impl AvroDeser {
    /// Decodes every value yielded by an avro `Reader` over `bytes` into a `T`.
    ///
    /// Values are converted with `from_value` and collected in the order the
    /// reader yields them.
    ///
    /// # Errors
    ///
    /// Fails if `Reader::new` rejects `bytes`, or at the first value that the
    /// reader cannot produce or that cannot be converted into `T`.
    ///
    /// NOTE(review): `bytes` is handed to `Reader::new` unchecked; confirm how
    /// the reader treats an empty slice if empty batches can reach this point.
    fn deserialize<T: DeserializeOwned>(bytes: &[u8]) -> anyhow::Result<Vec<T>> {
        Reader::new(bytes)?
            .map(|record| -> anyhow::Result<T> { Ok(from_value::<T>(&record?)?) })
            // Collecting into a `Result` stops at the first `Err`.
            .collect()
    }
}
#[async_trait]
impl<T> Map<Vec<u8>, Vec<T>, AvroDeserConfig> for AvroDeser
where
    T: DeserializeOwned,
{
    /// Decodes one avro-encoded buffer into the items it contains.
    ///
    /// # Errors
    ///
    /// Returns whatever error `AvroDeser::deserialize` reports for the buffer.
    async fn map(&mut self, data: Vec<u8>) -> anyhow::Result<Vec<T>> {
        Self::deserialize::<T>(data.as_slice())
    }
}