1use serde::{Serialize, de::DeserializeOwned};
2use tokio_util::codec::LinesCodec;
3use std::{marker::PhantomData, fmt::Debug};
4
5mod encoder;
6mod decoder;
7
8#[derive(Debug, Clone)]
9pub struct GenericLinesCodec<T: Serialize + DeserializeOwned> {
10 length_delimited_codec: LinesCodec,
11 _phantom: PhantomData<T>,
12}
13
14impl<T: Serialize + DeserializeOwned> GenericLinesCodec<T> {
15 pub fn new() -> Self {
16 return GenericLinesCodec {
17 length_delimited_codec: LinesCodec::new(),
18 _phantom: PhantomData,
19 };
20 }
21}
22
23impl<T: Serialize + DeserializeOwned> Default for GenericLinesCodec<T> {
24 fn default() -> Self {
25 return GenericLinesCodec::new();
26 }
27}
28
29#[cfg(test)]
30mod tests {
31 use super::GenericLinesCodec;
32
33 mod object {
34 use futures::{StreamExt, SinkExt};
35 use rstest::rstest;
36 use tokio::io::duplex;
37 use tokio_util::codec::Framed;
38 use tokio::try_join;
39 use serde::{Serialize, Deserialize};
40
41 use crate::{swap, random_number, random_str, random_bool};
42 use crate::futures::wait_random;
43
44 use super::GenericLinesCodec;
45
46 #[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
47 pub struct TestStruct {
48 pub id: u64,
49 pub name: String,
50 pub is_flag: bool,
51 }
52
53 fn random_test_struct(data_len: usize) -> TestStruct {
54 return TestStruct {
55 id: random_number(0..=u64::MAX),
56 name: random_str(data_len),
57 is_flag: random_bool(),
58 };
59 }
60
61 fn random_test_structs(count: usize, data_len: usize) -> Vec<TestStruct> {
62 return (0..count).map(|_| random_test_struct(data_len)).collect();
63 }
64
65 #[rstest]
66 #[case(1, true)]
67 #[case(2, true)]
68 #[case(3, true)]
69 #[case(4, true)]
70 #[case(5, true)]
71 #[case(6, true)]
72 #[case(7, true)]
73 #[case(8, true)]
74 #[case(1, false)]
75 #[case(2, false)]
76 #[case(3, false)]
77 #[case(4, false)]
78 #[case(5, false)]
79 #[case(6, false)]
80 #[case(7, false)]
81 #[case(8, false)]
82 #[tokio::test]
83 async fn can_send_receive_messages_bidirectional(
84 #[case] data_len: usize,
85 #[case] is_forward_direction: bool,
86 ) {
87 let data_len = data_len * data_len * data_len * 1024;
88 let (stream1, stream2) = duplex(random_number(256..=4096));
89
90 let stream1_messages = random_test_structs(random_number(4..8), data_len);
91 let stream1_messages1 = stream1_messages.clone();
92 let stream1_messages2 = stream1_messages.clone();
93
94 let stream2_messages = random_test_structs(random_number(4..8), data_len);
95 let stream2_messages1 = stream2_messages.clone();
96 let stream2_messages2 = stream2_messages.clone();
97
98 let (stream1, stream2) = if !is_forward_direction {
100 swap(stream1, stream2)
101 } else {
102 (stream1, stream2)
103 };
104
105 try_join!(
106 tokio::spawn(async move {
107 let framed_stream = Framed::new(
108 stream1,
109 GenericLinesCodec::<TestStruct>::new(),
110 );
111 let (
112 mut sink,
113 mut source,
114 ) = framed_stream.split();
115
116 try_join!(
117 tokio::spawn(async move {
118 for message in stream1_messages1 {
119 sink
120 .send(message).await
121 .expect("Cannot send message.");
122
123 wait_random(5..25).await;
124 }
125 }),
126 tokio::spawn(async move {
127 let mut received_messages = vec![];
128
129 while let Some(maybe_message) = source.next().await {
130 let message = maybe_message
131 .expect("Failed to unwrap the message.");
132
133 received_messages.push(message);
134
135 wait_random(5..25).await;
136
137 if received_messages.len() == stream2_messages2.len() {
139 break;
140 }
141 }
142
143 assert_eq!(
144 received_messages,
145 stream2_messages2,
146 "Sent and received messages must match.",
147 );
148 }),
149 ).unwrap();
150 }),
151 tokio::spawn(async move {
152 let framed_stream = Framed::new(
153 stream2,
154 GenericLinesCodec::<TestStruct>::new(),
155 );
156 let (
157 mut sink,
158 mut source,
159 ) = framed_stream.split();
160
161 try_join!(
162 tokio::spawn(async move {
163 for message in stream2_messages1 {
164 sink
165 .send(message).await
166 .expect("Cannot send message.");
167
168 wait_random(5..25).await;
169 }
170 }),
171 tokio::spawn(async move {
172 let mut received_messages = vec![];
173
174 while let Some(maybe_message) = source.next().await {
175 let message = maybe_message
176 .expect("Failed to unwrap the message.");
177
178 received_messages.push(message);
179
180 wait_random(5..25).await;
181
182 if received_messages.len() == stream1_messages2.len() {
184 break;
185 }
186 }
187
188 assert_eq!(
189 received_messages,
190 stream1_messages2,
191 "Sent and received messages must match.",
192 );
193 }),
194 ).unwrap();
195 }),
196 ).unwrap();
197 }
198 }
199
200 mod string {
201 use futures::{StreamExt, SinkExt};
202 use rstest::rstest;
203 use tokio::io::duplex;
204 use tokio_util::codec::Framed;
205 use tokio::try_join;
206
207 use crate::{swap, random_number, random_str_rg};
208 use crate::futures::wait_random;
209
210 use super::GenericLinesCodec;
211
212 fn random_strings(count: usize, data_len: usize) -> Vec<String> {
213 return (0..count)
214 .map(|_| {
215 return format!(
216 "{}\n{}",
217 random_str_rg(1..=data_len/2),
218 random_str_rg(1..=data_len/2)
219 );
220 })
221 .collect();
222 }
223
224 #[rstest]
225 #[case(1, true)]
226 #[case(2, true)]
227 #[case(3, true)]
228 #[case(4, true)]
229 #[case(5, true)]
230 #[case(6, true)]
231 #[case(7, true)]
232 #[case(8, true)]
233 #[case(1, false)]
234 #[case(2, false)]
235 #[case(3, false)]
236 #[case(4, false)]
237 #[case(5, false)]
238 #[case(6, false)]
239 #[case(7, false)]
240 #[case(8, false)]
241 #[tokio::test]
242 async fn can_send_receive_messages_bidirectional(
243 #[case] data_len: usize,
244 #[case] is_forward_direction: bool,
245 ) {
246 let data_len = data_len * data_len * 1024;
247 let (stream1, stream2) = duplex(random_number(256..=1024));
248
249 let stream1_messages = random_strings(random_number(4..8), data_len);
250 let stream1_messages1 = stream1_messages.clone();
251 let stream1_messages2 = stream1_messages.clone();
252
253 let stream2_messages = random_strings(random_number(4..8), data_len);
254
255 let stream2_messages1 = stream2_messages.clone();
256 let stream2_messages2 = stream2_messages.clone();
257
258 let (stream1, stream2) = if !is_forward_direction {
260 swap(stream1, stream2)
261 } else {
262 (stream1, stream2)
263 };
264
265 try_join!(
266 tokio::spawn(async move {
267 let framed_stream = Framed::new(
268 stream1,
269 GenericLinesCodec::<String>::new(),
270 );
271 let (
272 mut sink,
273 mut source,
274 ) = framed_stream.split();
275
276 try_join!(
277 tokio::spawn(async move {
278 for message in stream1_messages1 {
279 sink
280 .send(message).await
281 .expect("Cannot send message.");
282
283 wait_random(5..25).await;
284 }
285 }),
286 tokio::spawn(async move {
287 let mut received_messages = vec![];
288
289 while let Some(maybe_message) = source.next().await {
290 let message = maybe_message
291 .expect("Failed to unwrap the message.");
292
293 received_messages.push(message);
294
295 wait_random(5..25).await;
296
297 if received_messages.len() == stream2_messages2.len() {
299 break;
300 }
301 }
302
303 assert_eq!(
304 received_messages,
305 stream2_messages2,
306 "Sent and received messages must match.",
307 );
308 }),
309 ).unwrap();
310 }),
311 tokio::spawn(async move {
312 let framed_stream = Framed::new(
313 stream2,
314 GenericLinesCodec::<String>::new(),
315 );
316 let (
317 mut sink,
318 mut source,
319 ) = framed_stream.split();
320
321 try_join!(
322 tokio::spawn(async move {
323 for message in stream2_messages1 {
324 sink
325 .send(message).await
326 .expect("Cannot send message.");
327
328 wait_random(5..25).await;
329 }
330 }),
331 tokio::spawn(async move {
332 let mut received_messages = vec![];
333
334 while let Some(maybe_message) = source.next().await {
335 let message = maybe_message
336 .expect("Failed to unwrap the message.");
337
338 received_messages.push(message);
339
340 wait_random(5..25).await;
341
342 if received_messages.len() == stream1_messages2.len() {
344 break;
345 }
346 }
347
348 assert_eq!(
349 received_messages,
350 stream1_messages2,
351 "Sent and received messages must match.",
352 );
353 }),
354 ).unwrap();
355 }),
356 ).unwrap();
357 }
358 }
359}