connection_utils/test/
test_framed_stream.rs1use std::{fmt::Debug, pin::Pin};
2use futures::{SinkExt, StreamExt};
3use tokio::io::{AsyncRead, AsyncWrite};
4use tokio_util::codec::{Framed, Encoder, Decoder};
5use cs_utils::{futures::wait_random, traits::Random, test::random_vec};
6
7pub use crate::test::TestOptions;
8
9pub type TFramedAsyncDuplex<T, K> = Framed<Pin<Box<K>>, T>;
10
11pub async fn test_framed_stream<
14 TMessage: PartialEq + Clone + Debug + Random,
15 TError: Debug,
16 TEncoder: Encoder<TMessage> + Decoder<Item = TMessage, Error = TError>,
17 TAsyncDuplex: AsyncRead + AsyncWrite + ?Sized,
18>(
19 stream1: TFramedAsyncDuplex<TEncoder, TAsyncDuplex>,
20 stream2: TFramedAsyncDuplex<TEncoder, TAsyncDuplex>,
21 options: TestOptions,
22) -> (TFramedAsyncDuplex<TEncoder, TAsyncDuplex>, TFramedAsyncDuplex<TEncoder, TAsyncDuplex>) where <TEncoder as Encoder<TMessage>>::Error: Debug,
23{
24 let (stream2, stream1) = run_framed_stream_test(stream2, stream1, options.clone()).await;
26 return run_framed_stream_test(stream1, stream2, options.clone()).await;
28}
29
30async fn run_framed_stream_test<
31 TMessage: PartialEq + Clone + Debug + Random,
32 TError: Debug,
33 TEncoder: Encoder<TMessage> + Decoder<Item = TMessage, Error = TError>,
34 TAsyncDuplex: AsyncRead + AsyncWrite + ?Sized,
35>(
36 mut stream1: TFramedAsyncDuplex<TEncoder, TAsyncDuplex>,
37 mut stream2: TFramedAsyncDuplex<TEncoder, TAsyncDuplex>,
38 options: TestOptions,
39) -> (TFramedAsyncDuplex<TEncoder, TAsyncDuplex>, TFramedAsyncDuplex<TEncoder, TAsyncDuplex>) where <TEncoder as Encoder<TMessage>>::Error: Debug,
40{
41 let data_len = options.data_len();
42 let data = random_vec::<TMessage>(data_len as u32);
43
44 let latency_range1 = options.latency_range();
45 let latency_range2 = options.latency_range();
46
47 let ((stream1, data), (stream2, received_data)) = tokio::join!(
48 Box::pin(async move {
49 for message in &data {
50 wait_random(latency_range1.clone()).await;
51
52 stream1.send(message.clone()).await.unwrap();
53 }
54
55 return (stream1, data);
56 }),
57 Box::pin(async move {
58 let mut received_data = vec![];
59
60 loop {
61 wait_random(latency_range2.clone()).await;
62
63 let message = stream2.next().await
64 .expect("Stream closed.")
65 .expect("Cannot read stream message.");
66
67 received_data.push(message);
68
69 if received_data.len() == data_len as usize {
70 break;
71 }
72 }
73
74 return (stream2, received_data);
75 }),
76 );
77
78 assert_eq!(
79 data,
80 received_data,
81 "Sent and received data must match.",
82 );
83
84 return (stream1, stream2);
85}