cs_utils/utils/futures/test/
test_async_stream.rs1use std::pin::Pin;
2use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, AsyncReadExt, split, WriteHalf, ReadHalf};
3
4use crate::random_number;
5
6pub async fn test_async_stream_sends_data<TAsyncDuplex: AsyncRead + AsyncWrite + Send + ?Sized + 'static>(
7 mut channel: Pin<Box<TAsyncDuplex>>,
8 test_data: String,
9) -> Pin<Box<TAsyncDuplex>> {
10 let mut i = 0;
11 let data = test_data.as_bytes().to_vec();
12 let test_data_len = test_data.len();
13
14 while i < test_data.len() {
15 let message_len = random_number(8..=(test_data_len / 10));
16
17 let message_len = if i + message_len < data.len() {
18 i + message_len
19 } else {
20 data.len()
21 };
22
23 let bytes_sent = channel
24 .write(&data[i..message_len]).await
25 .expect("Cannot send a message.");
26
27 assert!(
28 bytes_sent > 0,
29 "No bytes sent.",
30 );
31
32 i += bytes_sent as usize;
33 }
34
35 return channel;
36}
37
38pub async fn test_async_stream_receives_data<TAsyncDuplex: AsyncRead + AsyncWrite + Send + ?Sized + 'static>(
39 mut channel: Pin<Box<TAsyncDuplex>>,
40 test_data: String,
41) -> Pin<Box<TAsyncDuplex>> {
42 let mut received_data = String::new();
43
44 let mut data = [0; 1024];
45
46 loop {
47 let bytes_read = channel
48 .read(&mut data).await
49 .expect("Cannot receive message.");
50
51 let message_str = std::str::from_utf8(&data[..bytes_read])
52 .expect("Cannot parse UTF8 message.")
53 .to_string();
54
55 received_data = format!("{}{}", &received_data, message_str);
56
57 if received_data.len() == test_data.len() {
58 assert_eq!(
59 received_data,
60 test_data,
61 "Sent and received data must match.",
62 );
63
64 break;
65 }
66 }
67
68 return channel;
69}
70
71pub async fn test_async_stream_data_transfer<TAsyncDuplex: AsyncRead + AsyncWrite + Send + ?Sized + 'static>(
72 channel1: Pin<Box<TAsyncDuplex>>,
73 channel2: Pin<Box<TAsyncDuplex>>,
74 test_data: String,
75) -> (Pin<Box<TAsyncDuplex>>, Pin<Box<TAsyncDuplex>>) {
76 let test_data1 = test_data.clone();
77 let test_data2 = test_data.clone();
78
79 return tokio::try_join!(
80 tokio::spawn(test_async_stream_sends_data(channel1, test_data1)),
81 tokio::spawn(test_async_stream_receives_data(channel2, test_data2)),
82 ).unwrap();
83}
84
85pub async fn test_async_half_sends_data<TAsyncDuplex: AsyncRead + AsyncWrite + Send + Unpin + 'static>(
86 mut channel: WriteHalf<TAsyncDuplex>,
87 test_data: String,
88) {
89 let mut i = 0;
90 let data = test_data.as_bytes().to_vec();
91 let test_data_len = test_data.len();
92
93 while i < test_data.len() {
94 let message_len = random_number(8..=(test_data_len / 10));
95
96 let message_len = if i + message_len < data.len() {
97 i + message_len
98 } else {
99 data.len()
100 };
101
102 let bytes_sent = channel
103 .write(&data[i..message_len]).await
104 .expect("Cannot send a message.");
105
106 assert!(
107 bytes_sent > 0,
108 "No bytes sent.",
109 );
110
111 i += bytes_sent as usize;
112 }
113}
114
115pub async fn test_async_half_receives_data<TAsyncDuplex: AsyncRead + AsyncWrite + Send + Unpin + 'static>(
116 mut channel: ReadHalf<TAsyncDuplex>,
117 test_data: String,
118) {
119 let mut received_data = String::new();
120
121 let mut data = [0; 1024];
122
123 loop {
124 let bytes_read = channel
125 .read(&mut data).await
126 .expect("Cannot receive message.");
127
128 let message_str = std::str::from_utf8(&data[..bytes_read])
129 .expect("Cannot parse UTF8 message.")
130 .to_string();
131
132 received_data = format!("{}{}", &received_data, message_str);
133
134 if received_data.len() == test_data.len() {
135 assert_eq!(
136 received_data,
137 test_data,
138 "Sent and received data must match.",
139 );
140
141 break;
142 }
143 }
144}
145
146pub async fn test_async_stream_duplex<TAsyncDuplex: AsyncRead + AsyncWrite + Send + ?Sized + 'static>(
147 channel1: Pin<Box<TAsyncDuplex>>,
148 channel2: Pin<Box<TAsyncDuplex>>,
149 test_data: String,
150) {
151
152 let (channel1_source, channel1_sink) = split(channel1);
153 let (channel2_source, channel2_sink) = split(channel2);
154
155 let reversed_test_data: String = test_data
156 .chars()
157 .rev()
158 .collect();
159
160 tokio::try_join!(
161 tokio::spawn(test_async_half_sends_data(channel2_sink, test_data.clone())),
162 tokio::spawn(test_async_half_receives_data(channel1_source, test_data.clone())),
163 tokio::spawn(test_async_half_sends_data(channel1_sink, reversed_test_data.clone())),
165 tokio::spawn(test_async_half_receives_data(channel2_source, reversed_test_data.clone())),
166 ).unwrap();
167}
168
169pub async fn test_async_stream<TAsyncDuplex: AsyncRead + AsyncWrite + Send + Unpin + ?Sized + 'static>(
200 channel1: Box<TAsyncDuplex>,
201 channel2: Box<TAsyncDuplex>,
202 test_data: String,
203) {
204 return test_async_stream_pinned(
205 Pin::new(channel1),
206 Pin::new(channel2),
207 test_data,
208 ).await;
209}
210
211pub async fn test_async_stream_pinned<TAsyncDuplex: AsyncRead + AsyncWrite + Send + ?Sized + 'static>(
212 channel1: Pin<Box<TAsyncDuplex>>,
213 channel2: Pin<Box<TAsyncDuplex>>,
214 test_data: String,
215) {
216 let (channel1, channel2) = test_async_stream_data_transfer(
218 channel1,
219 channel2,
220 test_data.clone(),
221 ).await;
222
223 let (channel2, channel1) = test_async_stream_data_transfer(
225 channel2,
226 channel1,
227 test_data.clone(),
228 ).await;
229
230 test_async_stream_duplex(
232 channel1,
233 channel2,
234 test_data,
235 ).await;
236}