use crate::Error;
use std::io::Write;
#[cfg(feature = "enable-async")]
pub use stream::*;
use zstd::stream::Encoder;
#[cfg(feature = "enable-async")]
pub mod stream;
/// Local alias for the zstd streaming [`Encoder`] that compresses into a writer `W`.
type VecEncoder<'a, W> = Encoder<'a, W>;
/// Encoder that stages length-delimited protobuf messages in memory and
/// writes them through a zstd encoder into the writer `W`.
pub struct ProstEncoder<'a, W: Write> {
/// Length-delimited message bytes that have not yet been handed to `inner`.
protobuf: Vec<u8>,
/// zstd encoder wrapping the destination writer.
inner: VecEncoder<'a, W>,
}
impl<'a, W: Write> ProstEncoder<'a, W> {
    /// Creates an encoder that compresses protobuf messages into `writer`.
    ///
    /// `level` is forwarded unchanged to the zstd encoder.
    ///
    /// # Errors
    ///
    /// Returns [`Error::Zstd`] if the zstd encoder cannot be constructed.
    pub fn new(writer: W, level: i32) -> Result<Self, Error> {
        let inner = VecEncoder::new(writer, level).map_err(Error::Zstd)?;
        let protobuf = Self::new_protobuf();
        Ok(Self { inner, protobuf })
    }

    /// Writes any staged messages to the zstd encoder, finishes the zstd
    /// stream and returns the underlying writer.
    ///
    /// # Errors
    ///
    /// Returns [`Error::Zstd`] if writing the staged bytes or finishing the
    /// zstd stream fails.
    #[tracing::instrument(level = "trace", skip(self))]
    pub fn finish(mut self) -> Result<W, Error> {
        tracing::trace!("finish");
        // Messages that never reached the flush threshold are still staged.
        if !self.protobuf.is_empty() {
            self.flush()?;
        }
        self.inner.finish().map_err(Error::Zstd)
    }

    /// Returns a shared reference to the underlying writer.
    pub fn get_ref(&self) -> &W {
        self.inner.get_ref()
    }

    /// Returns a mutable reference to the underlying writer.
    pub fn get_mut(&mut self) -> &mut W {
        self.inner.get_mut()
    }

    /// Appends `message`, length-delimited, to the staging buffer.
    ///
    /// The buffer is handed to the zstd encoder via [`Self::flush`] once it
    /// reaches the encoder's recommended input size.
    ///
    /// Returns `message.encoded_len()`, i.e. the size of the message body
    /// without its length prefix.
    ///
    /// # Errors
    ///
    /// Returns an error if the message cannot be encoded into the buffer or
    /// if the triggered flush fails.
    #[tracing::instrument(level = "trace", skip(self, message))]
    pub fn write<M: prost::Message>(&mut self, message: &M) -> Result<usize, Error> {
        let encoded_len = message.encoded_len();
        tracing::trace!(encoded_len, "writing message to the internal buffer");
        // `encode_length_delimited` writes the length prefix itself, so no
        // separate delimiter encoding is needed here.
        message.encode_length_delimited(&mut self.protobuf)?;
        if self.protobuf.len() >= VecEncoder::<W>::recommended_input_size() {
            self.flush()?;
        }
        Ok(encoded_len)
    }

    /// Writes the staged bytes to the zstd encoder and empties the staging
    /// buffer.
    ///
    /// This does not flush the zstd encoder itself.
    ///
    /// # Errors
    ///
    /// Returns [`Error::Zstd`] if the write fails; the staged bytes are kept
    /// in that case.
    #[tracing::instrument(level = "trace", skip(self))]
    pub fn flush(&mut self) -> Result<(), Error> {
        tracing::trace!(len = self.protobuf.len(), "flush protobuf");
        self.inner.write_all(&self.protobuf).map_err(Error::Zstd)?;
        // Keep the allocation for the next batch instead of reallocating.
        self.protobuf.clear();
        Ok(())
    }

    /// Flushes the zstd encoder.
    ///
    /// Bytes still held in the staging buffer are not written by this call.
    ///
    /// # Errors
    ///
    /// Returns [`Error::Zstd`] if the encoder's flush fails.
    #[tracing::instrument(level = "trace", skip(self))]
    pub(crate) fn flush_inner(&mut self) -> Result<(), Error> {
        tracing::trace!("flush inner");
        self.inner.flush().map_err(Error::Zstd)?;
        Ok(())
    }

    /// Allocates an empty staging buffer with capacity for one recommended
    /// zstd input block.
    fn new_protobuf() -> Vec<u8> {
        Vec::with_capacity(VecEncoder::<W>::recommended_input_size())
    }
}