connection_utils/test/
test_async_stream.rs

1use std::pin::Pin;
2use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, AsyncReadExt, split, WriteHalf, ReadHalf};
3
4use cs_utils::{random_str, futures::wait_random};
5
6use crate::test::TestOptions;
7
8pub async fn test_async_stream_sends_data<TAsyncDuplex: AsyncRead + AsyncWrite + Send + ?Sized + 'static>(
9    mut channel: Pin<Box<TAsyncDuplex>>,
10    options: TestOptions,
11    test_data: String,
12) -> Pin<Box<TAsyncDuplex>> {
13    let mut i = 0;
14    let data = test_data.as_bytes().to_vec();
15
16    while i < test_data.len() {
17        wait_random(options.latency_range()).await;
18       
19        let bytes_sent = channel
20            .write(&data[i..]).await
21            .expect("Cannot send a message.");
22
23        assert!(
24            bytes_sent > 0,
25            "No bytes sent.",
26        );
27            
28        i += bytes_sent as usize;
29    }
30
31    return channel;
32}
33
34pub async fn test_async_stream_receives_data<TAsyncDuplex: AsyncRead + AsyncWrite + Send + ?Sized + 'static>(
35    mut channel: Pin<Box<TAsyncDuplex>>,
36    options: TestOptions,
37    test_data: String,
38) -> Pin<Box<TAsyncDuplex>> {
39    let mut received_data = String::new();
40
41    let mut data = [0; 4096];
42    loop {
43        wait_random(options.latency_range()).await;
44
45        let bytes_read = channel
46            .read(&mut data).await
47            .expect("Cannot receive message.");
48
49        let message_str = std::str::from_utf8(&data[..bytes_read])
50            .expect("Cannot parse UTF8 message.")
51            .to_string();
52
53        received_data = format!("{}{}", &received_data, message_str);
54
55        assert!(
56            test_data.starts_with(&received_data),
57            "Data corruption, received data is not subset of the test data.",
58        );
59
60        if received_data.len() == test_data.len() {
61            assert_eq!(
62                received_data,
63                test_data,
64                "Sent and received data must match.",
65            );
66
67            break;
68        }
69    }
70
71    return channel;
72}
73
74pub async fn test_async_stream_data_transfer<TAsyncDuplex: AsyncRead + AsyncWrite + Send + ?Sized + 'static>(
75    channel1: Pin<Box<TAsyncDuplex>>,
76    channel2: Pin<Box<TAsyncDuplex>>,
77    options: TestOptions,
78) -> (Pin<Box<TAsyncDuplex>>, Pin<Box<TAsyncDuplex>>) {
79    let test_data = random_str(options.data_len());
80
81    return tokio::try_join!(
82        tokio::spawn(test_async_stream_sends_data(channel1, options.clone(), test_data.clone())),
83        tokio::spawn(test_async_stream_receives_data(channel2, options.clone(), test_data.clone())),
84    ).unwrap();
85}
86
87pub async fn test_async_half_sends_data<TAsyncDuplex: AsyncRead + AsyncWrite + Send + Unpin + 'static>(
88    mut channel: WriteHalf<TAsyncDuplex>,
89    options: TestOptions,
90    test_data: String,
91) -> WriteHalf<TAsyncDuplex> {
92    let mut i = 0;
93    let data = test_data.as_bytes().to_vec();
94
95    while i < test_data.len() {
96        wait_random(options.latency_range()).await;
97
98        let bytes_sent = channel
99            .write(&data[i..]).await
100            .expect("Cannot send a message.");
101
102        assert!(
103            bytes_sent > 0,
104            "No bytes sent.",
105        );
106            
107        i += bytes_sent as usize;
108    }
109
110    return channel;
111}
112
113pub async fn test_async_half_receives_data<TAsyncDuplex: AsyncRead + AsyncWrite + Send + Unpin + 'static>(
114    mut channel: ReadHalf<TAsyncDuplex>,
115    options: TestOptions,
116    test_data: String,
117) -> ReadHalf<TAsyncDuplex> {
118    let mut received_data = String::new();
119
120    let mut data = [0; 4096];
121
122    loop {
123        wait_random(options.latency_range()).await;
124
125        let bytes_read = channel
126            .read(&mut data).await
127            .expect("Cannot receive message.");
128
129        let message_str = std::str::from_utf8(&data[..bytes_read])
130            .expect("Cannot parse UTF8 message.")
131            .to_string();
132
133        received_data = format!("{}{}", &received_data, message_str);
134
135        assert!(
136            test_data.starts_with(&received_data),
137            "Data corruption, received data is not subset of the test data.",
138        );
139
140        if received_data.len() == test_data.len() {
141            assert_eq!(
142                received_data,
143                test_data,
144                "Sent and received data must match.",
145            );
146
147            break;
148        }
149    }
150
151    return channel;
152}
153
154pub async fn test_async_stream_duplex<TAsyncDuplex: AsyncRead + AsyncWrite + Send + ?Sized + 'static>(
155    channel1: Pin<Box<TAsyncDuplex>>,
156    channel2: Pin<Box<TAsyncDuplex>>,
157    options: TestOptions,
158) -> (Pin<Box<TAsyncDuplex>>, Pin<Box<TAsyncDuplex>>) {
159    let test_data = random_str(options.data_len());
160
161    let (channel1_source, channel1_sink) = split(channel1);
162    let (channel2_source, channel2_sink) = split(channel2);
163
164    let reversed_test_data: String = test_data
165        .chars()
166        .rev()
167        .collect();
168
169    let (
170        channel2_sink,
171        channel1_source,
172        channel1_sink,
173        channel2_source,
174    ) = tokio::try_join!(
175        tokio::spawn(test_async_half_sends_data(channel2_sink, options.clone(), test_data.clone())),
176        tokio::spawn(test_async_half_receives_data(channel1_source, options.clone(), test_data.clone())),
177        // for other direction, use reversed data string to send something different than `test_data`
178        tokio::spawn(test_async_half_sends_data(channel1_sink, options.clone(), reversed_test_data.clone())),
179        tokio::spawn(test_async_half_receives_data(channel2_source, options.clone(), reversed_test_data.clone())),
180    ).unwrap();
181
182    let channel1 = channel1_source.unsplit(channel1_sink);
183    let channel2 = channel2_source.unsplit(channel2_sink);
184
185    return (channel1, channel2);
186}
187
188/// Test an `AsyncRead + AsyncWrite` stream data transfer
189/// 
190/// ### Examples
191/// 
192/// ```
193/// use tokio::io::duplex;
194/// 
195/// #[tokio::main]
196/// async fn main() {
197///     // either `test` or the `all` feature must be enabled
198///     #[cfg(any(feature = "test", feature = "all"))]
199///     {
200///         use connection_utils::test::{test_async_stream, TestOptions};
201///         use cs_utils::random_str_rg;
202///
203///         // create stream to test
204///         let (channel1, channel2) = duplex(4096);
205///         
206///         // create test data
207///         let options = TestOptions::default()
208///             .with_data_len(4096);
209///         
210///         // test data transfer
211///         test_async_stream(
212///             Box::new(channel1),
213///             Box::new(channel2),
214///             options,
215///         ).await;
216///         
217///         println!("👌 data transfer succeeded");
218///     }
219/// }
220/// ```
221pub async fn test_async_stream<TAsyncDuplex: AsyncRead + AsyncWrite + Send + Unpin + ?Sized + 'static>(
222    channel1: Box<TAsyncDuplex>,
223    channel2: Box<TAsyncDuplex>,
224    options: TestOptions,
225) -> (Pin<Box<TAsyncDuplex>>, Pin<Box<TAsyncDuplex>>) {
226    return test_async_stream_pinned(
227        Pin::new(channel1),
228        Pin::new(channel2),
229        options,
230    ).await;
231}
232
233pub async fn test_async_stream_pinned<TAsyncDuplex: AsyncRead + AsyncWrite + Send + ?Sized + 'static>(
234    channel1: Pin<Box<TAsyncDuplex>>,
235    channel2: Pin<Box<TAsyncDuplex>>,
236    options: TestOptions,
237) -> (Pin<Box<TAsyncDuplex>>, Pin<Box<TAsyncDuplex>>) {
238    // test `channel1` to `channel2` direction
239    let (channel1, channel2) = test_async_stream_data_transfer(
240        channel1,
241        channel2,
242        options.clone()
243    ).await;
244
245    // test `channel2` to `channel1` direction
246    let (channel2, channel1) = test_async_stream_data_transfer(
247        channel2,
248        channel1,
249        options.clone(),
250    ).await;
251
252    // test bidirectional data transfer
253    return test_async_stream_duplex(
254        channel1,
255        channel2,
256        options,
257    ).await;
258}