prosto 0.6.4

Compress prost! messages with zstd, optional tokio channels support
Documentation
use crate::decompress::*;
use crate::Error;
use crate::StreamError;
use futures::prelude::*;
use futures::stream;

type Result<M> = std::result::Result<M, Error>;

pub struct Decompressor;

impl Decompressor {
    pub fn stream<
        M: prost::Message + std::default::Default,
        In: AsRef<[u8]>,
        S: Stream<Item = In>,
    >(
        in_stream: S,
    ) -> impl Stream<Item = Result<M>> {
        in_stream
            .map(|compressed| ProstDecoder::new_decompressed(compressed.as_ref()))
            .map_ok(stream::iter)
            .try_flatten()
            .into_stream()
    }

    pub fn try_stream<
        M: prost::Message + std::default::Default,
        In: AsRef<[u8]>,
        E: std::fmt::Debug,
        S: TryStream<Ok = In, Error = E>,
    >(
        in_stream: S,
    ) -> impl TryStream<Ok = M, Error = StreamError<E>> {
        in_stream
            .map_err(StreamError::Other)
            .and_then(|compressed| async move {
                let iter = ProstDecoder::new_decompressed(compressed.as_ref())
                    .map_err(StreamError::Prosto)?;
                Ok(iter)
            })
            .map_ok(stream::iter)
            .map_ok(|iter| {
                iter.map(|r| match r {
                    Ok(msg) => Ok(msg),
                    Err(e) => Err(StreamError::Prosto(e)),
                })
            })
            .try_flatten()
    }
}