eolify 0.4.0

High-performance line ending normalization for Rust.
Documentation
#![cfg(any(feature = "futures-io", feature = "tokio"))]

use std::{
    pin::{pin, Pin},
    task::{Context, Poll},
};

macro_rules! dual_test {
    ($name:ident, $body:block) => {
        mod $name {
            use super::*;
            use eolify::CRLF;

            #[cfg(feature = "futures-io")]
            #[async_std::test]
            async fn futures_io() {
                use eolify::FuturesIoExt;
                use futures_util::AsyncReadExt;

                $body
            }

            #[cfg(feature = "tokio")]
            #[tokio::test]
            async fn tokio() {
                use eolify::TokioExt;
                use tokio::io::AsyncReadExt;

                $body
            }
        }
    };
}

dual_test!(crlf_split_across_readers, {
    let readers = vec![b"foo\r".as_ref(), b"\nbar".as_ref()].into_iter();
    let test_reader = AsyncTestReader::new(readers);
    let mut nr = CRLF::wrap_async_reader_with_buffer_size(test_reader, 3);
    let mut out = Vec::new();
    nr.read_to_end(&mut out).await.unwrap();
    assert_eq!(out.as_slice(), b"foo\r\nbar");
});

dual_test!(crlf_split_across_three_reader, {
    let readers = vec![b"\r".as_ref(), b"".as_ref(), b"\n".as_ref()].into_iter();
    let test_reader = AsyncTestReader::new(readers);
    let mut nr = CRLF::wrap_async_reader_with_buffer_size(test_reader, 3);
    let mut out = Vec::new();
    nr.read_to_end(&mut out).await.unwrap();
    assert_eq!(out, b"\r\n".to_vec());
});

dual_test!(lone_lf_in_first_reader_converted_to_crlf, {
    let readers = vec![b"line1\n".as_ref(), b"line2".as_ref()].into_iter();
    let test_reader = AsyncTestReader::new(readers);
    let mut nr = CRLF::wrap_async_reader_with_buffer_size(test_reader, 4);
    let mut out = Vec::new();
    nr.read_to_end(&mut out).await.unwrap();
    assert_eq!(out, b"line1\r\nline2".to_vec());
});

dual_test!(multiple_crs_and_crlf_mixed_across_boundaries, {
    let readers = vec![b"\r".as_ref(), b"\r\n".as_ref()].into_iter();
    let test_reader = AsyncTestReader::new(readers);
    let mut nr = CRLF::wrap_async_reader_with_buffer_size(test_reader, 2);
    let mut out = Vec::new();
    nr.read_to_end(&mut out).await.unwrap();
    assert_eq!(out, b"\r\n\r\n".to_vec());
});

dual_test!(trailing_cr_at_eof_emits_crlf, {
    let readers = vec![b"foo\r".as_ref()].into_iter();
    let test_reader = AsyncTestReader::new(readers);
    let mut nr = CRLF::wrap_async_reader_with_buffer_size(test_reader, 4);
    let mut out = Vec::new();
    nr.read_to_end(&mut out).await.unwrap();
    assert_eq!(out, b"foo\r\n".to_vec());
});

pub struct AsyncTestReader<R, I> {
    readers: I,
    current: Option<R>,
}

impl<R, I: Iterator<Item = R>> AsyncTestReader<R, I> {
    pub fn new(mut readers: I) -> AsyncTestReader<R, I> {
        let current = readers.next();
        AsyncTestReader {
            readers: readers,
            current: current,
        }
    }
}

#[cfg(feature = "futures-io")]
impl<R: futures_io::AsyncRead + Unpin, I: Iterator<Item = R> + Unpin> futures_io::AsyncRead
    for AsyncTestReader<R, I>
{
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<std::io::Result<usize>> {
        let this = self.get_mut();
        loop {
            match this.current.as_mut() {
                Some(r) => {
                    let mut r = pin!(r);
                    match r.as_mut().poll_read(cx, buf) {
                        Poll::Ready(Ok(n)) => {
                            if n > 0 {
                                return Poll::Ready(Ok(n));
                            }
                        }
                        Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
                        Poll::Pending => return Poll::Pending,
                    }
                }
                None => return Poll::Ready(Ok(0)),
            }
            this.current = this.readers.next();
        }
    }
}
#[cfg(feature = "tokio")]
impl<R: tokio::io::AsyncRead + Unpin, I: Iterator<Item = R> + Unpin> tokio::io::AsyncRead
    for AsyncTestReader<R, I>
{
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut tokio::io::ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        let this = self.get_mut();
        loop {
            match this.current.as_mut() {
                Some(r) => {
                    let mut r = pin!(r);
                    let len = buf.filled().len();
                    match r.as_mut().poll_read(cx, buf) {
                        Poll::Ready(Ok(())) => {
                            if buf.filled().len() > len {
                                return Poll::Ready(Ok(()));
                            }
                        }
                        Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
                        Poll::Pending => return Poll::Pending,
                    }
                }
                None => return Poll::Ready(Ok(())),
            }
            this.current = this.readers.next();
        }
    }
}