pipejson 0.1.6

A pipebase plugin using json serde
Documentation
use async_trait::async_trait;
use pipebase::{
    common::{ConfigInto, FromConfig, FromPath, GroupAs, Pair},
    map::Map,
};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use std::path::Path;

#[derive(Deserialize)]
pub struct JsonSerConfig {}

#[async_trait]
impl FromPath for JsonSerConfig {
    async fn from_path<P>(_path: P) -> anyhow::Result<Self>
    where
        P: AsRef<Path> + Send,
    {
        Ok(JsonSerConfig {})
    }
}

#[async_trait]
impl ConfigInto<JsonSer> for JsonSerConfig {}

pub struct JsonSer {}

#[async_trait]
impl FromConfig<JsonSerConfig> for JsonSer {
    async fn from_config(_config: JsonSerConfig) -> anyhow::Result<Self> {
        Ok(JsonSer {})
    }
}

impl JsonSer {
    fn serialize<T: Serialize>(t: &T) -> anyhow::Result<Vec<u8>> {
        match serde_json::to_vec(t) {
            Ok(r) => Ok(r),
            Err(err) => Err(err.into()),
        }
    }
}

#[async_trait]
impl<T> Map<T, Vec<u8>, JsonSerConfig> for JsonSer
where
    T: Serialize + Send + Sync + 'static,
{
    async fn map(&mut self, t: T) -> anyhow::Result<Vec<u8>> {
        JsonSer::serialize(&t)
    }
}

#[derive(Deserialize)]
pub struct JsonDeserConfig {}

#[async_trait]
impl FromPath for JsonDeserConfig {
    async fn from_path<P>(_path: P) -> anyhow::Result<Self>
    where
        P: AsRef<Path> + Send,
    {
        Ok(JsonDeserConfig {})
    }
}

#[async_trait]
impl ConfigInto<JsonDeser> for JsonDeserConfig {}

pub struct JsonDeser {}

#[async_trait]
impl FromConfig<JsonDeserConfig> for JsonDeser {
    async fn from_config(_config: JsonDeserConfig) -> anyhow::Result<Self> {
        Ok(JsonDeser {})
    }
}

impl JsonDeser {
    fn deserialize<T: DeserializeOwned>(bytes: &[u8]) -> anyhow::Result<T> {
        let t: T = serde_json::from_slice::<T>(bytes)?;
        Ok(t)
    }
}

#[async_trait]
impl<T> Map<Vec<u8>, T, JsonDeserConfig> for JsonDeser
where
    T: DeserializeOwned + Sync,
{
    async fn map(&mut self, bytes: Vec<u8>) -> anyhow::Result<T> {
        JsonDeser::deserialize(bytes.as_slice())
    }
}

#[derive(Deserialize)]
pub struct JsonRecordSerConfig {}

#[async_trait]
impl FromPath for JsonRecordSerConfig {
    async fn from_path<P>(_path: P) -> anyhow::Result<Self>
    where
        P: AsRef<std::path::Path> + Send,
    {
        Ok(JsonRecordSerConfig {})
    }
}

impl ConfigInto<JsonRecordSer> for JsonRecordSerConfig {}

#[async_trait]
impl FromConfig<JsonRecordSerConfig> for JsonRecordSer {
    async fn from_config(_config: JsonRecordSerConfig) -> anyhow::Result<Self> {
        Ok(JsonRecordSer {})
    }
}

pub struct JsonRecordSer {}

impl JsonRecordSer {
    fn serialize<K, R>(record: &R) -> anyhow::Result<Pair<K, Vec<u8>>>
    where
        R: GroupAs<K> + Serialize,
    {
        let bytes = serde_json::to_vec(record)?;
        let key = record.group();
        Ok(Pair::new(key, bytes))
    }
}

#[async_trait]
impl<K, R> Map<R, Pair<K, Vec<u8>>, JsonRecordSerConfig> for JsonRecordSer
where
    R: GroupAs<K> + Serialize + Send + 'static,
{
    async fn map(&mut self, data: R) -> anyhow::Result<Pair<K, Vec<u8>>> {
        Self::serialize(&data)
    }
}