use core::{
fmt,
pin::Pin,
task::{Context, Poll},
};
use std::io::{Error, ErrorKind, Result};
use brotli2::{
raw::{CoStatus, Compress, CompressOp, DeStatus, Decompress},
CompressParams,
};
use bytes::{Bytes, BytesMut};
use futures::{ready, stream::Stream};
use pin_project::unsafe_project;
/// Stream adaptor that wraps an inner stream of byte chunks and yields
/// brotli-compressed chunks produced from them.
#[unsafe_project(Unpin)]
pub struct BrotliEncoder<S: Stream<Item = Result<Bytes>>> {
// The wrapped source stream; marked `#[pin]` because it is polled through the projection.
#[pin]
inner: S,
// Set once `inner` has reported end of stream; after that `poll_next` returns `None`.
flush: bool,
// Compressor state that is carried from one poll to the next.
compress: Compress,
}
/// Stream adaptor that wraps an inner stream of brotli-compressed byte chunks
/// and yields the decompressed chunks.
#[unsafe_project(Unpin)]
pub struct BrotliDecoder<S: Stream<Item = Result<Bytes>>> {
// The wrapped source stream; marked `#[pin]` because it is polled through the projection.
#[pin]
inner: S,
// Set once `inner` has reported end of stream; after that `poll_next` returns `None`.
flush: bool,
// Decompressor state that is carried from one poll to the next.
decompress: Decompress,
}
impl<S: Stream<Item = Result<Bytes>>> BrotliEncoder<S> {
    /// Creates an encoder that compresses the chunks produced by `stream`.
    ///
    /// `level` is forwarded unchanged to [`CompressParams::quality`]; every
    /// other compression parameter keeps the value `CompressParams::new()`
    /// gives it.
    pub fn new(stream: S, level: u32) -> BrotliEncoder<S> {
        let mut params = CompressParams::new();
        params.quality(level);
        // `from_params` takes the parameters by shared reference.
        BrotliEncoder::from_params(stream, &params)
    }

    /// Creates an encoder that compresses the chunks produced by `stream`
    /// using the supplied compression parameters.
    pub fn from_params(stream: S, params: &CompressParams) -> BrotliEncoder<S> {
        let mut compress = Compress::new();
        compress.set_params(params);
        BrotliEncoder {
            inner: stream,
            // Becomes `true` only after the inner stream has been exhausted.
            flush: false,
            compress,
        }
    }

    /// Returns a shared reference to the wrapped stream.
    pub fn get_ref(&self) -> &S {
        &self.inner
    }

    /// Returns a mutable reference to the wrapped stream.
    pub fn get_mut(&mut self) -> &mut S {
        &mut self.inner
    }

    /// Returns a pinned mutable reference to the wrapped stream.
    pub fn get_pin_mut<'a>(self: Pin<&'a mut Self>) -> Pin<&'a mut S> {
        self.project().inner
    }

    /// Consumes the encoder and returns the wrapped stream.
    pub fn into_inner(self) -> S {
        self.inner
    }
}
impl<S: Stream<Item = Result<Bytes>>> BrotliDecoder<S> {
    /// Wraps `stream` in a decoder with a fresh decompression state.
    pub fn new(stream: S) -> Self {
        let decompress = Decompress::new();
        Self {
            inner: stream,
            flush: false,
            decompress,
        }
    }

    /// Borrows the wrapped stream.
    pub fn get_ref(&self) -> &S {
        let inner = &self.inner;
        inner
    }

    /// Mutably borrows the wrapped stream.
    pub fn get_mut(&mut self) -> &mut S {
        let inner = &mut self.inner;
        inner
    }

    /// Borrows the wrapped stream as a pinned mutable reference.
    pub fn get_pin_mut<'a>(self: Pin<&'a mut Self>) -> Pin<&'a mut S> {
        let projection = self.project();
        projection.inner
    }

    /// Unwraps the decoder, handing back the wrapped stream.
    pub fn into_inner(self) -> S {
        let Self { inner, .. } = self;
        inner
    }
}
impl<S: Stream<Item = Result<Bytes>>> Stream for BrotliEncoder<S> {
    type Item = Result<Bytes>;

    /// Pulls one chunk from the inner stream, compresses it, and yields
    /// whatever output the compressor produced for it.
    ///
    /// When the inner stream ends, the compressor is driven with
    /// `CompressOp::Finish` and the remaining output is yielded as a last
    /// chunk; every later poll yields `None`.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> {
        const OUTPUT_BUFFER_SIZE: usize = 8_000;

        let this = self.project();

        // The final chunk has already been handed out.
        if *this.flush {
            return Poll::Ready(None);
        }

        let chunk = match ready!(this.inner.poll_next(cx)) {
            Some(item) => item?,
            None => {
                // Inner stream exhausted: finish with an empty input.
                *this.flush = true;
                Bytes::new()
            }
        };
        let finishing = *this.flush;

        let mut encoded = BytesMut::with_capacity(OUTPUT_BUFFER_SIZE);
        let mut pending: &[u8] = chunk.as_ref();
        // The output slice is empty; produced bytes are collected through
        // `take_output` below instead.
        let mut sink: &mut [u8] = &mut [];

        loop {
            let op = if finishing {
                CompressOp::Finish
            } else {
                CompressOp::Process
            };
            let status = this.compress.compress(op, &mut pending, &mut sink)?;

            while let Some(piece) = this.compress.take_output(None) {
                encoded.extend_from_slice(piece);
            }

            if let CoStatus::Finished = status {
                break;
            }
        }

        Poll::Ready(Some(Ok(encoded.freeze())))
    }
}
impl<S: Stream<Item = Result<Bytes>>> Stream for BrotliDecoder<S> {
    type Item = Result<Bytes>;

    /// Pulls one chunk from the inner stream, decompresses it, and yields
    /// whatever output the decompressor produced for it.
    ///
    /// If the inner stream ends while the decompressor still asks for more
    /// input, an `UnexpectedEof` error is yielded. Once the inner stream has
    /// ended, every later poll yields `None`.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> {
        const OUTPUT_BUFFER_SIZE: usize = 8_000;

        let this = self.project();

        // End of the inner stream was already observed on an earlier poll.
        if *this.flush {
            return Poll::Ready(None);
        }

        let chunk = match ready!(this.inner.poll_next(cx)) {
            Some(item) => item?,
            None => {
                // Inner stream exhausted: run the decompressor once more on empty input.
                *this.flush = true;
                Bytes::new()
            }
        };
        let at_eof = *this.flush;

        let mut decoded = BytesMut::with_capacity(OUTPUT_BUFFER_SIZE);
        let mut pending: &[u8] = chunk.as_ref();
        // The output slice is empty; produced bytes are collected through
        // `take_output` below instead.
        let mut sink: &mut [u8] = &mut [];

        loop {
            let status = this.decompress.decompress(&mut pending, &mut sink)?;

            while let Some(piece) = this.decompress.take_output(None) {
                decoded.extend_from_slice(piece);
            }

            match status {
                DeStatus::NeedOutput => continue,
                DeStatus::Finished => break,
                DeStatus::NeedInput if at_eof => {
                    let eof = Error::new(ErrorKind::UnexpectedEof, "reached unexpected EOF");
                    return Poll::Ready(Some(Err(eof)));
                }
                DeStatus::NeedInput => break,
            }
        }

        Poll::Ready(Some(Ok(decoded.freeze())))
    }
}
impl<S> fmt::Debug for BrotliEncoder<S>
where
    S: Stream<Item = Result<Bytes>> + fmt::Debug,
{
    /// Formats the encoder as a struct; the compressor state is shown as a
    /// fixed placeholder string.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("BrotliEncoder");
        out.field("inner", &self.inner);
        out.field("flush", &self.flush);
        out.field("compress", &"<no debug>");
        out.finish()
    }
}
impl<S> fmt::Debug for BrotliDecoder<S>
where
    S: Stream<Item = Result<Bytes>> + fmt::Debug,
{
    /// Formats the decoder as a struct; the decompressor state is shown as a
    /// fixed placeholder string.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("BrotliDecoder");
        out.field("inner", &self.inner);
        out.field("flush", &self.flush);
        out.field("decompress", &"<no debug>");
        out.finish()
    }
}