1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
use async_trait::async_trait;
use avro_rs::{from_value, Codec, Reader, Schema, Writer};
use pipebase::{
    common::{ConfigInto, FromConfig, FromPath},
    map::Map,
};
use serde::{de::DeserializeOwned, Deserialize, Serialize};

/// Compression setting read from the serializer configuration.
///
/// Each variant is translated to the same-named `avro_rs::Codec` variant by
/// `get_codec`.
#[derive(Deserialize)]
enum Compression {
    /// Translated to `Codec::Null`.
    Null,
    /// Translated to `Codec::Deflate`.
    Deflate,
    /// Translated to `Codec::Snappy`.
    Snappy,
}

fn get_codec(compression: Compression) -> Codec {
    match compression {
        Compression::Null => Codec::Null,
        Compression::Deflate => Codec::Deflate,
        Compression::Snappy => Codec::Snappy,
    }
}

/// Deserializable configuration from which an `AvroSer` is built.
#[derive(Deserialize)]
pub struct AvroSerConfig {
    /// Compression choice; converted to an `avro_rs` codec via `get_codec`
    /// when the serializer is constructed.
    compression: Compression,
    /// Avro schema definition text; handed to `Schema::parse_str` when the
    /// serializer is constructed.
    schema: String,
}

// Empty impl: every `FromPath` item comes from the trait's provided defaults.
impl FromPath for AvroSerConfig {}

// Empty impl: marks `AvroSerConfig` as the config that converts into `AvroSer`,
// using only the trait's provided defaults.
impl ConfigInto<AvroSer> for AvroSerConfig {}

/// Mapper that encodes a batch of serializable items into Avro bytes.
pub struct AvroSer {
    /// Codec passed to the `avro_rs` writer on every `map` call.
    codec: Codec,
    /// Parsed schema the writer is created with on every `map` call.
    schema: Schema,
}

#[async_trait]
impl FromConfig<AvroSerConfig> for AvroSer {
    async fn from_config(config: AvroSerConfig) -> anyhow::Result<Self> {
        Ok(AvroSer {
            codec: get_codec(config.compression),
            schema: Schema::parse_str(&config.schema)?,
        })
    }
}

impl AvroSer {
    /// Writes every element of `items` through an `avro_rs::Writer` created
    /// with `schema` and `codec`, and returns the writer's output buffer.
    ///
    /// # Errors
    ///
    /// Stops at the first item that `append_ser` refuses and returns that
    /// error; also fails if recovering the buffer via `into_inner` fails.
    fn serialize<T: Serialize>(
        items: Vec<T>,
        schema: &Schema,
        codec: Codec,
    ) -> anyhow::Result<Vec<u8>> {
        let mut writer = Writer::with_codec(schema, Vec::new(), codec);
        // `append_ser` reports a byte count we do not need; discard it so the
        // fold only carries success/failure.
        items
            .into_iter()
            .try_for_each(|item| writer.append_ser(item).map(|_| ()))?;
        let encoded = writer.into_inner()?;
        Ok(encoded)
    }
}

#[async_trait]
impl<T> Map<Vec<T>, Vec<u8>, AvroSerConfig> for AvroSer
where
    T: Serialize + Send + 'static,
{
    /// Encodes the incoming batch with this mapper's schema and codec.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `AvroSer::serialize`.
    async fn map(&mut self, data: Vec<T>) -> anyhow::Result<Vec<u8>> {
        Self::serialize(data, &self.schema, self.codec)
    }
}

/// Configuration for `AvroDeser`; it currently carries no settings.
#[derive(Deserialize)]
pub struct AvroDeserConfig {}

#[async_trait]
impl FromPath for AvroDeserConfig {
    /// Returns an empty `AvroDeserConfig` without touching `_path`.
    ///
    /// The config has no fields, so nothing is read from the given location;
    /// this implementation never returns an error.
    async fn from_path<P>(_path: P) -> anyhow::Result<Self>
    where
        P: AsRef<std::path::Path> + Send,
    {
        let config = AvroDeserConfig {};
        Ok(config)
    }
}

// Empty impl: marks `AvroDeserConfig` as the config that converts into
// `AvroDeser`, using only the trait's provided defaults.
impl ConfigInto<AvroDeser> for AvroDeserConfig {}

/// Stateless mapper that decodes Avro bytes into a batch of deserialized items.
pub struct AvroDeser {}

#[async_trait]
impl FromConfig<AvroDeserConfig> for AvroDeser {
    /// Builds an `AvroDeser`; the (empty) configuration is ignored and this
    /// implementation never returns an error.
    async fn from_config(_config: AvroDeserConfig) -> anyhow::Result<Self> {
        let deserializer = AvroDeser {};
        Ok(deserializer)
    }
}

impl AvroDeser {
    /// Iterates the values produced by an `avro_rs::Reader` over `bytes` and
    /// converts each one into a `T` with `from_value`.
    ///
    /// Only the bytes are given to `Reader::new`; no schema is supplied here.
    ///
    /// # Errors
    ///
    /// Fails if the reader cannot be created, and otherwise stops at the first
    /// value that cannot be read or cannot be converted into `T`.
    fn deserialize<T: DeserializeOwned>(bytes: &[u8]) -> anyhow::Result<Vec<T>> {
        Reader::new(bytes)?
            .map(|entry| -> anyhow::Result<T> {
                let value = entry?;
                Ok(from_value::<T>(&value)?)
            })
            .collect()
    }
}

#[async_trait]
impl<T> Map<Vec<u8>, Vec<T>, AvroDeserConfig> for AvroDeser
where
    T: DeserializeOwned,
{
    /// Decodes the incoming byte buffer into a vector of `T`.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `AvroDeser::deserialize`.
    async fn map(&mut self, data: Vec<u8>) -> anyhow::Result<Vec<T>> {
        Self::deserialize::<T>(data.as_slice())
    }
}