use std::io;
use std::fmt;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::{AsyncRead, AsyncWrite};
/// A single stream value assembled from two independent halves: one that is
/// read from and one that is written to.
///
/// Both halves are stored as boxed trait objects, so the concrete reader and
/// writer types are erased once the stream has been constructed.
pub struct DuplexStream {
/// Half that serves the `AsyncRead` implementation of this type.
reader: Box<dyn AsyncRead + Unpin + Send + 'static>,
/// Half that serves the `AsyncWrite` implementation of this type
/// (write, flush and shutdown).
writer: Box<dyn AsyncWrite + Unpin + Send + 'static>,
}
impl DuplexStream {
    /// Builds a duplex stream out of a separate `reader` and `writer`.
    ///
    /// Each half is moved onto the heap behind a trait object, and the
    /// resulting `DuplexStream` is itself returned boxed. The return type is
    /// opaque: callers only see a value that is
    /// `AsyncRead + AsyncWrite + Unpin + Send + 'static`.
    pub fn new(
        reader: impl AsyncRead + Unpin + Send + 'static,
        writer: impl AsyncWrite + Unpin + Send + 'static,
    ) -> Box<impl AsyncRead + AsyncWrite + Unpin + Send + 'static> {
        // Erase the concrete types first so the struct literal below can use
        // field-init shorthand.
        let reader: Box<dyn AsyncRead + Unpin + Send + 'static> = Box::new(reader);
        let writer: Box<dyn AsyncWrite + Unpin + Send + 'static> = Box::new(writer);

        Box::new(Self { reader, writer })
    }
}
/// Reads are served entirely by the `reader` half.
impl AsyncRead for DuplexStream {
    /// Forwards the poll to the wrapped reader and returns its result as-is.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut tokio::io::ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        // `DuplexStream` only holds boxes, so it is `Unpin` and the pin can be
        // unwrapped to reach the field directly.
        let this = self.get_mut();
        let reader = Pin::new(&mut this.reader);

        AsyncRead::poll_read(reader, cx, buf)
    }
}
/// Writes, flushes and shutdowns are served entirely by the `writer` half.
///
/// The vectored-write methods are forwarded as well. Without that, the trait's
/// default implementations would be used: the wrapper would always report that
/// vectored writes are unsupported, and a vectored write would only hand the
/// first non-empty slice to the wrapped writer.
impl AsyncWrite for DuplexStream {
    /// Forwards a write of `buf` to the wrapped writer.
    ///
    /// Returns whatever the wrapped writer returns: the number of bytes
    /// accepted, an I/O error, or `Poll::Pending`.
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<Result<usize, io::Error>> {
        // `DuplexStream` only holds boxes, so it is `Unpin` and the pin can be
        // unwrapped to reach the field directly.
        let this = self.get_mut();
        AsyncWrite::poll_write(Pin::new(&mut this.writer), cx, buf)
    }

    /// Forwards a vectored write of `bufs` to the wrapped writer so that its
    /// own vectored implementation (if any) is used.
    fn poll_write_vectored(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        bufs: &[io::IoSlice<'_>],
    ) -> Poll<Result<usize, io::Error>> {
        let this = self.get_mut();
        AsyncWrite::poll_write_vectored(Pin::new(&mut this.writer), cx, bufs)
    }

    /// Reports whether the wrapped writer has an efficient vectored write.
    fn is_write_vectored(&self) -> bool {
        self.writer.is_write_vectored()
    }

    /// Forwards a flush request to the wrapped writer.
    fn poll_flush(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Result<(), io::Error>> {
        let this = self.get_mut();
        AsyncWrite::poll_flush(Pin::new(&mut this.writer), cx)
    }

    /// Forwards a shutdown request to the wrapped writer.
    ///
    /// Only the writer half is involved; the reader half is not touched here.
    fn poll_shutdown(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Result<(), io::Error>> {
        let this = self.get_mut();
        AsyncWrite::poll_shutdown(Pin::new(&mut this.writer), cx)
    }
}
/// Minimal `Debug` output: only the type name is printed, no fields are listed.
impl fmt::Debug for DuplexStream {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // No `.field(..)` calls are made, so the builder emits just the name.
        let mut repr = f.debug_struct("DuplexStream");
        repr.finish()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        mocks::{channel_mock_pair, ChannelMockOptions},
        test::{test_async_stream, TestOptions},
    };
    use cs_utils::traits::Random;
    use rstest::rstest;
    use tokio::io::split;

    mod data_transfer {
        use super::*;

        // Runs the shared async-stream test helper over two `DuplexStream`s,
        // once per payload size listed in the `#[case]` attributes.
        // NOTE(review): presumably `channel_mock_pair` returns two connected
        // in-memory endpoints - confirm against `crate::mocks`.
        #[rstest]
        #[case(128)]
        #[case(256)]
        #[case(512)]
        #[case(1_024)]
        #[case(2_048)]
        #[case(4_096)]
        #[case(8_192)]
        #[case(16_384)]
        #[case(32_768)]
        #[tokio::test]
        async fn transfers_binary_data(#[case] test_data_size: usize) {
            let (left_mock, right_mock) = channel_mock_pair(
                ChannelMockOptions::random(),
                ChannelMockOptions::random(),
            );

            // Split each mock into read/write halves, then glue the halves
            // back together through `DuplexStream`.
            let (left_reader, left_writer) = split(left_mock);
            let (right_reader, right_writer) = split(right_mock);
            let left_stream = DuplexStream::new(left_reader, left_writer);
            let right_stream = DuplexStream::new(right_reader, right_writer);

            let options = TestOptions::random().with_data_len(test_data_size);
            test_async_stream(left_stream, right_stream, options).await;
        }
    }
}