prosto 0.6.4

Compress prost! messages with zstd, optional tokio channels support
Documentation
use crate::compress::*;
use crate::Error;
use futures::prelude::*;
use futures::stream;
use std::io::Cursor;

type Result<M> = std::result::Result<M, Error>;

pub struct Compressor;

impl Compressor {
    pub fn build_stream<M: prost::Message, S: Stream<Item = M>>(
        in_stream: S,
        level: i32,
        chunk_size: usize,
    ) -> Result<impl Stream<Item = Result<Vec<u8>>>> {
        let mut buf = vec![];
        buf.reserve(chunk_size);
        let buf = Cursor::new(buf);
        let encoder = ProstEncoder::new(buf, level)?;

        let in_stream = in_stream.map(Some).chain(stream::once(async { None }));
        let stream = in_stream.scan(Some(encoder), move |encoder, message| {
            if let Some(message) = message {
                future::ready(Some(Self::write_produce(
                    encoder, &message, level, chunk_size,
                )))
            } else {
                future::ready(Some(Self::finish(encoder.take().unwrap())))
            }
        });
        let stream = stream
            .try_filter_map(|maybe| async { Ok(maybe) })
            .into_stream();
        Ok(stream)
    }

    #[tracing::instrument(level = "trace", skip(encoder, message))]
    fn write_produce<M: prost::Message>(
        encoder: &mut Option<ProstEncoder<Cursor<Vec<u8>>>>,
        message: &M,
        level: i32,
        chunk_size: usize,
    ) -> Result<Option<Vec<u8>>> {
        encoder.as_mut().unwrap().write(message)?;
        Self::maybe_produce(encoder, level, chunk_size)
    }

    #[tracing::instrument(level = "trace", skip(encoder))]
    fn maybe_produce(
        encoder: &mut Option<ProstEncoder<Cursor<Vec<u8>>>>,
        level: i32,
        chunk_size: usize,
    ) -> Result<Option<Vec<u8>>> {
        let compressed_len = encoder.as_ref().unwrap().get_ref().position() as usize;
        if compressed_len > chunk_size {
            tracing::trace!(compressed_len, "producing chunk");
            let compressed = encoder.take().unwrap().finish()?.into_inner();

            let buf = Vec::with_capacity(chunk_size);
            let buf = Cursor::new(buf);
            encoder.replace(ProstEncoder::new(buf, level)?);

            tracing::trace!(len = compressed.len(), "produced chunk");
            Ok(Some(compressed))
        } else {
            tracing::trace!(compressed_len, "not enough bytes for a chunk");
            Ok(None)
        }
    }

    #[tracing::instrument(level = "trace", skip(encoder))]
    fn finish(mut encoder: ProstEncoder<Cursor<Vec<u8>>>) -> Result<Option<Vec<u8>>> {
        encoder.flush().unwrap();
        tracing::trace!(position = encoder.get_ref().position(), "encoder finish");
        let cursor = encoder.finish()?;
        let position = cursor.position() as usize;
        tracing::trace!(position = position, "after encoder finish");
        if position != 0 {
            let mut compressed = cursor.into_inner();
            compressed.truncate(position);
            tracing::trace!(len = compressed.len(), "produced final chunk");
            Ok(Some(compressed))
        } else {
            Ok(None)
        }
    }
}