use std::{fmt::Debug, pin::Pin};
use futures::{SinkExt, StreamExt};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_util::codec::{Framed, Encoder, Decoder};
use cs_utils::{futures::wait_random, traits::Random, test::random_vec};
pub use crate::test::TestOptions;
pub type TFramedAsyncDuplex<T, K> = Framed<Pin<Box<K>>, T>;
pub async fn test_framed_stream<
TMessage: PartialEq + Clone + Debug + Random,
TError: Debug,
TEncoder: Encoder<TMessage> + Decoder<Item = TMessage, Error = TError>,
TAsyncDuplex: AsyncRead + AsyncWrite + ?Sized,
>(
stream1: TFramedAsyncDuplex<TEncoder, TAsyncDuplex>,
stream2: TFramedAsyncDuplex<TEncoder, TAsyncDuplex>,
options: TestOptions,
) -> (TFramedAsyncDuplex<TEncoder, TAsyncDuplex>, TFramedAsyncDuplex<TEncoder, TAsyncDuplex>) where <TEncoder as Encoder<TMessage>>::Error: Debug,
{
let (stream2, stream1) = run_framed_stream_test(stream2, stream1, options.clone()).await;
return run_framed_stream_test(stream1, stream2, options.clone()).await;
}
async fn run_framed_stream_test<
TMessage: PartialEq + Clone + Debug + Random,
TError: Debug,
TEncoder: Encoder<TMessage> + Decoder<Item = TMessage, Error = TError>,
TAsyncDuplex: AsyncRead + AsyncWrite + ?Sized,
>(
mut stream1: TFramedAsyncDuplex<TEncoder, TAsyncDuplex>,
mut stream2: TFramedAsyncDuplex<TEncoder, TAsyncDuplex>,
options: TestOptions,
) -> (TFramedAsyncDuplex<TEncoder, TAsyncDuplex>, TFramedAsyncDuplex<TEncoder, TAsyncDuplex>) where <TEncoder as Encoder<TMessage>>::Error: Debug,
{
let data_len = options.data_len();
let data = random_vec::<TMessage>(data_len as u32);
let latency_range1 = options.latency_range();
let latency_range2 = options.latency_range();
let ((stream1, data), (stream2, received_data)) = tokio::join!(
Box::pin(async move {
for message in &data {
wait_random(latency_range1.clone()).await;
stream1.send(message.clone()).await.unwrap();
}
return (stream1, data);
}),
Box::pin(async move {
let mut received_data = vec![];
loop {
wait_random(latency_range2.clone()).await;
let message = stream2.next().await
.expect("Stream closed.")
.expect("Cannot read stream message.");
received_data.push(message);
if received_data.len() == data_len as usize {
break;
}
}
return (stream2, received_data);
}),
);
assert_eq!(
data,
received_data,
"Sent and received data must match.",
);
return (stream1, stream2);
}