cs_utils/utils/futures/test/
test_async_stream.rs

1use std::pin::Pin;
2use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, AsyncReadExt, split, WriteHalf, ReadHalf};
3
4use crate::random_number;
5
6pub async fn test_async_stream_sends_data<TAsyncDuplex: AsyncRead + AsyncWrite + Send + ?Sized + 'static>(
7    mut channel: Pin<Box<TAsyncDuplex>>,
8    test_data: String,
9) -> Pin<Box<TAsyncDuplex>> {
10    let mut i = 0;
11    let data = test_data.as_bytes().to_vec();
12    let test_data_len = test_data.len();
13
14    while i < test_data.len() {
15        let message_len = random_number(8..=(test_data_len / 10));
16        
17        let message_len = if i + message_len < data.len() {
18            i + message_len
19        } else {
20            data.len()
21        };
22
23        let bytes_sent = channel
24            .write(&data[i..message_len]).await
25            .expect("Cannot send a message.");
26
27        assert!(
28            bytes_sent > 0,
29            "No bytes sent.",
30        );
31            
32        i += bytes_sent as usize;
33    }
34
35    return channel;
36}
37
38pub async fn test_async_stream_receives_data<TAsyncDuplex: AsyncRead + AsyncWrite + Send + ?Sized + 'static>(
39    mut channel: Pin<Box<TAsyncDuplex>>,
40    test_data: String,
41) -> Pin<Box<TAsyncDuplex>> {
42    let mut received_data = String::new();
43
44    let mut data = [0; 1024];
45
46    loop {
47        let bytes_read = channel
48            .read(&mut data).await
49            .expect("Cannot receive message.");
50
51        let message_str = std::str::from_utf8(&data[..bytes_read])
52            .expect("Cannot parse UTF8 message.")
53            .to_string();
54
55        received_data = format!("{}{}", &received_data, message_str);
56
57        if received_data.len() == test_data.len() {
58            assert_eq!(
59                received_data,
60                test_data,
61                "Sent and received data must match.",
62            );
63
64            break;
65        }
66    }
67
68    return channel;
69}
70
71pub async fn test_async_stream_data_transfer<TAsyncDuplex: AsyncRead + AsyncWrite + Send + ?Sized + 'static>(
72    channel1: Pin<Box<TAsyncDuplex>>,
73    channel2: Pin<Box<TAsyncDuplex>>,
74    test_data: String,
75) -> (Pin<Box<TAsyncDuplex>>, Pin<Box<TAsyncDuplex>>) {
76    let test_data1 = test_data.clone();
77    let test_data2 = test_data.clone();
78
79    return tokio::try_join!(
80        tokio::spawn(test_async_stream_sends_data(channel1, test_data1)),
81        tokio::spawn(test_async_stream_receives_data(channel2, test_data2)),
82    ).unwrap();
83}
84
85pub async fn test_async_half_sends_data<TAsyncDuplex: AsyncRead + AsyncWrite + Send + Unpin + 'static>(
86    mut channel: WriteHalf<TAsyncDuplex>,
87    test_data: String,
88) {
89    let mut i = 0;
90    let data = test_data.as_bytes().to_vec();
91    let test_data_len = test_data.len();
92
93    while i < test_data.len() {
94        let message_len = random_number(8..=(test_data_len / 10));
95        
96        let message_len = if i + message_len < data.len() {
97            i + message_len
98        } else {
99            data.len()
100        };
101
102        let bytes_sent = channel
103            .write(&data[i..message_len]).await
104            .expect("Cannot send a message.");
105
106        assert!(
107            bytes_sent > 0,
108            "No bytes sent.",
109        );
110            
111        i += bytes_sent as usize;
112    }
113}
114
115pub async fn test_async_half_receives_data<TAsyncDuplex: AsyncRead + AsyncWrite + Send + Unpin + 'static>(
116    mut channel: ReadHalf<TAsyncDuplex>,
117    test_data: String,
118) {
119    let mut received_data = String::new();
120
121    let mut data = [0; 1024];
122
123    loop {
124        let bytes_read = channel
125            .read(&mut data).await
126            .expect("Cannot receive message.");
127
128        let message_str = std::str::from_utf8(&data[..bytes_read])
129            .expect("Cannot parse UTF8 message.")
130            .to_string();
131
132        received_data = format!("{}{}", &received_data, message_str);
133
134        if received_data.len() == test_data.len() {
135            assert_eq!(
136                received_data,
137                test_data,
138                "Sent and received data must match.",
139            );
140
141            break;
142        }
143    }
144}
145
146pub async fn test_async_stream_duplex<TAsyncDuplex: AsyncRead + AsyncWrite + Send + ?Sized + 'static>(
147    channel1: Pin<Box<TAsyncDuplex>>,
148    channel2: Pin<Box<TAsyncDuplex>>,
149    test_data: String,
150) {
151
152    let (channel1_source, channel1_sink) = split(channel1);
153    let (channel2_source, channel2_sink) = split(channel2);
154
155    let reversed_test_data: String = test_data
156        .chars()
157        .rev()
158        .collect();
159
160    tokio::try_join!(
161        tokio::spawn(test_async_half_sends_data(channel2_sink, test_data.clone())),
162        tokio::spawn(test_async_half_receives_data(channel1_source, test_data.clone())),
163        // for other direction, use reversed data string to send something different than `test_data`
164        tokio::spawn(test_async_half_sends_data(channel1_sink, reversed_test_data.clone())),
165        tokio::spawn(test_async_half_receives_data(channel2_source, reversed_test_data.clone())),
166    ).unwrap();
167}
168
169/// Test an `AsyncRead + AsyncWrite` stream data transfer
170/// 
171/// ### Examples
172/// 
173/// ```
174/// use tokio::io::duplex;
175/// 
176/// #[tokio::main]
177/// async fn main() {
178///     // either both `async` and `test` features must be enabled or the `all` one
179///     #[cfg(any(all(feature = "async", feature = "test"), feature = "all"))]
180///     {
181///         use cs_utils::{futures::test::test_async_stream, random_str_rg};
182///         // create stream to test
183///         let (channel1, channel2) = duplex(4096);
184///         
185///         // create test data
186///         let test_data = random_str_rg(1024..=25600);
187///         
188///         // test data transfer
189///         test_async_stream(
190///             Box::new(channel1),
191///             Box::new(channel2),
192///             test_data,
193///         ).await;
194///         
195///         println!("👌 data transfer succeeded");
196///     }
197/// }
198/// ```
199pub async fn test_async_stream<TAsyncDuplex: AsyncRead + AsyncWrite + Send + Unpin + ?Sized + 'static>(
200    channel1: Box<TAsyncDuplex>,
201    channel2: Box<TAsyncDuplex>,
202    test_data: String,
203) {
204    return test_async_stream_pinned(
205        Pin::new(channel1),
206        Pin::new(channel2),
207        test_data,
208    ).await;
209}
210
211pub async fn test_async_stream_pinned<TAsyncDuplex: AsyncRead + AsyncWrite + Send + ?Sized + 'static>(
212    channel1: Pin<Box<TAsyncDuplex>>,
213    channel2: Pin<Box<TAsyncDuplex>>,
214    test_data: String,
215) {
216    // test `channel1` to `channel2` direction
217    let (channel1, channel2) = test_async_stream_data_transfer(
218        channel1,
219        channel2,
220        test_data.clone(),
221    ).await;
222
223    // test `channel2` to `channel1` direction
224    let (channel2, channel1) = test_async_stream_data_transfer(
225        channel2,
226        channel1,
227        test_data.clone(),
228    ).await;
229
230    // test bidirectional data transfer
231    test_async_stream_duplex(
232        channel1,
233        channel2,
234        test_data,
235    ).await;
236}