chunked_transfer_encoding 0.1.0

Chunked transfer encoding
Documentation
use core::{
    fmt,
    pin::Pin,
    task::{Context, Poll},
};

use futures_util::{stream::FusedStream, Stream};
use pin_project_lite::pin_project;

use crate::utils;

//
pin_project! {
    pub struct ChunkedStream<St> {
        #[pin]
        inner: St,
        is_final: bool,
    }
}

impl<St> fmt::Display for ChunkedStream<St>
where
    St: fmt::Debug,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("ChunkedStream")
            .field("inner", &self.inner)
            .field("is_final", &self.is_final)
            .finish()
    }
}

//
impl<St> ChunkedStream<St> {
    pub fn new(inner: St) -> Self {
        Self {
            inner,
            is_final: false,
        }
    }
}

//
impl<T, St> FusedStream for ChunkedStream<St>
where
    T: Into<Vec<u8>>,
    St: Stream<Item = T> + Send + 'static,
    St: FusedStream,
{
    fn is_terminated(&self) -> bool {
        self.inner.is_terminated()
    }
}

//
impl<T, St> Stream for ChunkedStream<St>
where
    T: Into<Vec<u8>>,
    St: Stream<Item = T> + Send + 'static,
{
    type Item = Vec<u8>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.project();

        if *this.is_final {
            return Poll::Ready(None);
        }

        match this.inner.poll_next(cx) {
            Poll::Ready(Some(item)) => {
                let mut bytes: Vec<u8> = item.into();

                if bytes.is_empty() {
                    *this.is_final = true;
                }

                let chunk_size = utils::chunk_size(&bytes);
                let chunk_size_len = chunk_size.as_bytes().len();
                for c in chunk_size.as_bytes().iter().rev() {
                    bytes.insert(0, *c);
                }
                bytes.insert(chunk_size_len, b'\r');
                bytes.insert(chunk_size_len + 1, b'\n');

                bytes.extend_from_slice(b"\r\n");

                Poll::Ready(Some(bytes))
            }
            Poll::Ready(None) => {
                *this.is_final = true;

                let bytes = b"0\r\n\r\n".to_vec();

                Poll::Ready(Some(bytes))
            }
            Poll::Pending => Poll::Pending,
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    use futures_util::{stream, StreamExt as _};

    #[test]
    fn test_chunked_stream() {
        futures_executor::block_on(async {
            //
            let mut st =
                ChunkedStream::new(stream::iter(vec!["Wiki", "pedia ", "in \r\n\r\nchunks."]));

            assert_eq!(st.next().await, Some(b"4\r\nWiki\r\n".to_vec()));
            assert_eq!(st.next().await, Some(b"6\r\npedia \r\n".to_vec()));
            assert_eq!(
                st.next().await,
                Some(b"E\r\nin \r\n\r\nchunks.\r\n".to_vec())
            );
            assert_eq!(st.next().await, Some(b"0\r\n\r\n".to_vec()));
            assert_eq!(st.next().await, None);

            //
            let mut st = ChunkedStream::new(stream::iter(vec![
                "Wiki",
                "pedia ",
                "in \r\n\r\nchunks.",
                "",
            ]));

            assert_eq!(st.next().await, Some(b"4\r\nWiki\r\n".to_vec()));
            assert_eq!(st.next().await, Some(b"6\r\npedia \r\n".to_vec()));
            assert_eq!(
                st.next().await,
                Some(b"E\r\nin \r\n\r\nchunks.\r\n".to_vec())
            );
            assert_eq!(st.next().await, Some(b"0\r\n\r\n".to_vec()));
            assert_eq!(st.next().await, None);
        })
    }
}