1use std::pin::Pin;
2use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, AsyncReadExt, split, WriteHalf, ReadHalf};
3
4use cs_utils::{random_str, futures::wait_random};
5
6use crate::test::TestOptions;
7
8pub async fn test_async_stream_sends_data<TAsyncDuplex: AsyncRead + AsyncWrite + Send + ?Sized + 'static>(
9 mut channel: Pin<Box<TAsyncDuplex>>,
10 options: TestOptions,
11 test_data: String,
12) -> Pin<Box<TAsyncDuplex>> {
13 let mut i = 0;
14 let data = test_data.as_bytes().to_vec();
15
16 while i < test_data.len() {
17 wait_random(options.latency_range()).await;
18
19 let bytes_sent = channel
20 .write(&data[i..]).await
21 .expect("Cannot send a message.");
22
23 assert!(
24 bytes_sent > 0,
25 "No bytes sent.",
26 );
27
28 i += bytes_sent as usize;
29 }
30
31 return channel;
32}
33
34pub async fn test_async_stream_receives_data<TAsyncDuplex: AsyncRead + AsyncWrite + Send + ?Sized + 'static>(
35 mut channel: Pin<Box<TAsyncDuplex>>,
36 options: TestOptions,
37 test_data: String,
38) -> Pin<Box<TAsyncDuplex>> {
39 let mut received_data = String::new();
40
41 let mut data = [0; 4096];
42 loop {
43 wait_random(options.latency_range()).await;
44
45 let bytes_read = channel
46 .read(&mut data).await
47 .expect("Cannot receive message.");
48
49 let message_str = std::str::from_utf8(&data[..bytes_read])
50 .expect("Cannot parse UTF8 message.")
51 .to_string();
52
53 received_data = format!("{}{}", &received_data, message_str);
54
55 assert!(
56 test_data.starts_with(&received_data),
57 "Data corruption, received data is not subset of the test data.",
58 );
59
60 if received_data.len() == test_data.len() {
61 assert_eq!(
62 received_data,
63 test_data,
64 "Sent and received data must match.",
65 );
66
67 break;
68 }
69 }
70
71 return channel;
72}
73
74pub async fn test_async_stream_data_transfer<TAsyncDuplex: AsyncRead + AsyncWrite + Send + ?Sized + 'static>(
75 channel1: Pin<Box<TAsyncDuplex>>,
76 channel2: Pin<Box<TAsyncDuplex>>,
77 options: TestOptions,
78) -> (Pin<Box<TAsyncDuplex>>, Pin<Box<TAsyncDuplex>>) {
79 let test_data = random_str(options.data_len());
80
81 return tokio::try_join!(
82 tokio::spawn(test_async_stream_sends_data(channel1, options.clone(), test_data.clone())),
83 tokio::spawn(test_async_stream_receives_data(channel2, options.clone(), test_data.clone())),
84 ).unwrap();
85}
86
87pub async fn test_async_half_sends_data<TAsyncDuplex: AsyncRead + AsyncWrite + Send + Unpin + 'static>(
88 mut channel: WriteHalf<TAsyncDuplex>,
89 options: TestOptions,
90 test_data: String,
91) -> WriteHalf<TAsyncDuplex> {
92 let mut i = 0;
93 let data = test_data.as_bytes().to_vec();
94
95 while i < test_data.len() {
96 wait_random(options.latency_range()).await;
97
98 let bytes_sent = channel
99 .write(&data[i..]).await
100 .expect("Cannot send a message.");
101
102 assert!(
103 bytes_sent > 0,
104 "No bytes sent.",
105 );
106
107 i += bytes_sent as usize;
108 }
109
110 return channel;
111}
112
113pub async fn test_async_half_receives_data<TAsyncDuplex: AsyncRead + AsyncWrite + Send + Unpin + 'static>(
114 mut channel: ReadHalf<TAsyncDuplex>,
115 options: TestOptions,
116 test_data: String,
117) -> ReadHalf<TAsyncDuplex> {
118 let mut received_data = String::new();
119
120 let mut data = [0; 4096];
121
122 loop {
123 wait_random(options.latency_range()).await;
124
125 let bytes_read = channel
126 .read(&mut data).await
127 .expect("Cannot receive message.");
128
129 let message_str = std::str::from_utf8(&data[..bytes_read])
130 .expect("Cannot parse UTF8 message.")
131 .to_string();
132
133 received_data = format!("{}{}", &received_data, message_str);
134
135 assert!(
136 test_data.starts_with(&received_data),
137 "Data corruption, received data is not subset of the test data.",
138 );
139
140 if received_data.len() == test_data.len() {
141 assert_eq!(
142 received_data,
143 test_data,
144 "Sent and received data must match.",
145 );
146
147 break;
148 }
149 }
150
151 return channel;
152}
153
154pub async fn test_async_stream_duplex<TAsyncDuplex: AsyncRead + AsyncWrite + Send + ?Sized + 'static>(
155 channel1: Pin<Box<TAsyncDuplex>>,
156 channel2: Pin<Box<TAsyncDuplex>>,
157 options: TestOptions,
158) -> (Pin<Box<TAsyncDuplex>>, Pin<Box<TAsyncDuplex>>) {
159 let test_data = random_str(options.data_len());
160
161 let (channel1_source, channel1_sink) = split(channel1);
162 let (channel2_source, channel2_sink) = split(channel2);
163
164 let reversed_test_data: String = test_data
165 .chars()
166 .rev()
167 .collect();
168
169 let (
170 channel2_sink,
171 channel1_source,
172 channel1_sink,
173 channel2_source,
174 ) = tokio::try_join!(
175 tokio::spawn(test_async_half_sends_data(channel2_sink, options.clone(), test_data.clone())),
176 tokio::spawn(test_async_half_receives_data(channel1_source, options.clone(), test_data.clone())),
177 tokio::spawn(test_async_half_sends_data(channel1_sink, options.clone(), reversed_test_data.clone())),
179 tokio::spawn(test_async_half_receives_data(channel2_source, options.clone(), reversed_test_data.clone())),
180 ).unwrap();
181
182 let channel1 = channel1_source.unsplit(channel1_sink);
183 let channel2 = channel2_source.unsplit(channel2_sink);
184
185 return (channel1, channel2);
186}
187
188pub async fn test_async_stream<TAsyncDuplex: AsyncRead + AsyncWrite + Send + Unpin + ?Sized + 'static>(
222 channel1: Box<TAsyncDuplex>,
223 channel2: Box<TAsyncDuplex>,
224 options: TestOptions,
225) -> (Pin<Box<TAsyncDuplex>>, Pin<Box<TAsyncDuplex>>) {
226 return test_async_stream_pinned(
227 Pin::new(channel1),
228 Pin::new(channel2),
229 options,
230 ).await;
231}
232
233pub async fn test_async_stream_pinned<TAsyncDuplex: AsyncRead + AsyncWrite + Send + ?Sized + 'static>(
234 channel1: Pin<Box<TAsyncDuplex>>,
235 channel2: Pin<Box<TAsyncDuplex>>,
236 options: TestOptions,
237) -> (Pin<Box<TAsyncDuplex>>, Pin<Box<TAsyncDuplex>>) {
238 let (channel1, channel2) = test_async_stream_data_transfer(
240 channel1,
241 channel2,
242 options.clone()
243 ).await;
244
245 let (channel2, channel1) = test_async_stream_data_transfer(
247 channel2,
248 channel1,
249 options.clone(),
250 ).await;
251
252 return test_async_stream_duplex(
254 channel1,
255 channel2,
256 options,
257 ).await;
258}