// connection-utils 0.8.0
//
// Connection related utilities.
// Documentation
use std::io;
use std::fmt;
use std::pin::Pin;
use std::task::{Context, Poll};

use tokio::io::{AsyncRead, AsyncWrite};

/// A single `AsyncRead + AsyncWrite` object built from a separate reader and writer.
///
/// Read operations are delegated to `reader`; write, flush and shutdown
/// operations are delegated to `writer`.
pub struct DuplexStream {
    /// Half that serves every read.
    // A boxed trait object without an explicit lifetime bound defaults to `'static`.
    reader: Box<dyn AsyncRead + Unpin + Send>,
    /// Half that serves every write, flush and shutdown.
    writer: Box<dyn AsyncWrite + Unpin + Send>,
}

impl DuplexStream {
    /// Joins `reader` and `writer` into one boxed duplex stream.
    ///
    /// Each half is moved onto the heap as a trait object, and the resulting
    /// `DuplexStream` is itself boxed and returned behind an opaque
    /// `AsyncRead + AsyncWrite + Unpin + Send + 'static` type.
    pub fn new(
        reader: impl AsyncRead + Unpin + Send + 'static,
        writer: impl AsyncWrite + Unpin + Send + 'static,
    ) -> Box<impl AsyncRead + AsyncWrite + Unpin + Send + 'static> {
        let stream = Self {
            reader: Box::new(reader),
            writer: Box::new(writer),
        };

        Box::new(stream)
    }
}

impl AsyncRead for DuplexStream {
    /// Polls the wrapped reader half, filling `buf` with whatever it produces.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut tokio::io::ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        // Both fields are boxes, so `DuplexStream` is `Unpin` and the pin can
        // be safely unwrapped to reach the reader.
        let this = self.get_mut();

        Pin::new(&mut this.reader).poll_read(cx, buf)
    }
}

impl AsyncWrite for DuplexStream {
    /// Forwards the write of `buf` to the wrapped writer half.
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<Result<usize, io::Error>> {
        Pin::new(&mut self.writer).poll_write(cx, buf)
    }

    /// Forwards a vectored write to the wrapped writer half.
    ///
    /// Without this override the trait's default implementation would be
    /// used instead of the inner writer's own `poll_write_vectored`.
    fn poll_write_vectored(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        bufs: &[io::IoSlice<'_>],
    ) -> Poll<Result<usize, io::Error>> {
        Pin::new(&mut self.writer).poll_write_vectored(cx, bufs)
    }

    /// Reports whether the wrapped writer half has an efficient vectored write.
    fn is_write_vectored(&self) -> bool {
        self.writer.is_write_vectored()
    }

    /// Forwards the flush request to the wrapped writer half.
    fn poll_flush(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Result<(), io::Error>> {
        Pin::new(&mut self.writer).poll_flush(cx)
    }

    /// Forwards the shutdown request to the wrapped writer half.
    ///
    /// Only the writer is involved; the reader half is left untouched.
    fn poll_shutdown(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Result<(), io::Error>> {
        Pin::new(&mut self.writer).poll_shutdown(cx)
    }
}

impl fmt::Debug for DuplexStream {
    /// Formats the stream as its bare type name; the boxed halves are not printed.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut repr = f.debug_struct("DuplexStream");

        repr.finish()
    }
}

#[cfg(test)]
mod tests {
    use rstest::rstest;
    use tokio::io::split;
    use cs_utils::traits::Random;

    use super::*;
    use crate::{
        test::{TestOptions, test_async_stream},
        mocks::{ChannelMockOptions, channel_mock_pair},
    };

    mod data_transfer {
        use super::*;

        // Splits each end of a mock channel pair into read/write halves,
        // rejoins every pair of halves through `DuplexStream::new`, and hands
        // the two resulting streams to `test_async_stream` with the data
        // length taken from the current case.
        #[rstest]
        #[case(128)]
        #[case(256)]
        #[case(512)]
        #[case(1_024)]
        #[case(2_048)]
        #[case(4_096)]
        #[case(8_192)]
        #[case(16_384)]
        #[case(32_768)]
        #[tokio::test]
        async fn transfers_binary_data(
            #[case] test_data_size: usize,
        ) {
            let (left, right) = channel_mock_pair(
                ChannelMockOptions::random(),
                ChannelMockOptions::random(),
            );

            let (left_reader, left_writer) = split(left);
            let (right_reader, right_writer) = split(right);

            let left_stream = DuplexStream::new(left_reader, left_writer);
            let right_stream = DuplexStream::new(right_reader, right_writer);

            let options = TestOptions::random().with_data_len(test_data_size);

            test_async_stream(left_stream, right_stream, options).await;
        }
    }
}