connection_utils/test/
test_framed_stream.rs

1use std::{fmt::Debug, pin::Pin};
2use futures::{SinkExt, StreamExt};
3use tokio::io::{AsyncRead, AsyncWrite};
4use tokio_util::codec::{Framed, Encoder, Decoder};
5use cs_utils::{futures::wait_random, traits::Random, test::random_vec};
6
7pub use crate::test::TestOptions;
8
9pub type TFramedAsyncDuplex<T, K> = Framed<Pin<Box<K>>, T>;
10
11/// Test data transfer in a framed stream. First passes data in `forward` direction and validates that all
12/// data trassfered without issues. Then make another pass in `backward` direction with the same validation.
13pub async fn test_framed_stream<
14    TMessage: PartialEq + Clone + Debug + Random,
15    TError: Debug,
16    TEncoder: Encoder<TMessage> + Decoder<Item = TMessage, Error = TError>,
17    TAsyncDuplex: AsyncRead + AsyncWrite + ?Sized,
18>(
19    stream1: TFramedAsyncDuplex<TEncoder, TAsyncDuplex>,
20    stream2: TFramedAsyncDuplex<TEncoder, TAsyncDuplex>,
21    options: TestOptions,
22) -> (TFramedAsyncDuplex<TEncoder, TAsyncDuplex>, TFramedAsyncDuplex<TEncoder, TAsyncDuplex>) where <TEncoder as Encoder<TMessage>>::Error: Debug,
23{
24    // test in backward direction
25    let (stream2, stream1) = run_framed_stream_test(stream2, stream1, options.clone()).await;
26    // test in forward direction
27    return run_framed_stream_test(stream1, stream2, options.clone()).await;
28}
29
30async fn run_framed_stream_test<
31    TMessage: PartialEq + Clone + Debug + Random,
32    TError: Debug,
33    TEncoder: Encoder<TMessage> + Decoder<Item = TMessage, Error = TError>,
34    TAsyncDuplex: AsyncRead + AsyncWrite + ?Sized,
35>(
36    mut stream1: TFramedAsyncDuplex<TEncoder, TAsyncDuplex>,
37    mut stream2: TFramedAsyncDuplex<TEncoder, TAsyncDuplex>,
38    options: TestOptions,
39) -> (TFramedAsyncDuplex<TEncoder, TAsyncDuplex>, TFramedAsyncDuplex<TEncoder, TAsyncDuplex>) where <TEncoder as Encoder<TMessage>>::Error: Debug,
40{
41    let data_len = options.data_len();
42    let data = random_vec::<TMessage>(data_len as u32);
43
44    let latency_range1 = options.latency_range();
45    let latency_range2 = options.latency_range();
46
47    let ((stream1, data), (stream2, received_data)) = tokio::join!(
48        Box::pin(async move {
49            for message in &data {
50                wait_random(latency_range1.clone()).await;
51    
52                stream1.send(message.clone()).await.unwrap();
53            }
54    
55            return (stream1, data);
56        }),
57        Box::pin(async move {
58            let mut received_data = vec![];
59    
60            loop {
61                wait_random(latency_range2.clone()).await;
62    
63                let message = stream2.next().await
64                    .expect("Stream closed.")
65                    .expect("Cannot read stream message.");
66    
67                received_data.push(message);
68    
69                if received_data.len() == data_len as usize {
70                    break;
71                }
72            }
73    
74            return (stream2, received_data);
75        }),
76    );
77
78    assert_eq!(
79        data,
80        received_data,
81        "Sent and received data must match.",
82    );
83
84    return (stream1, stream2);
85}