cs_utils/utils/futures/
generic_lines_codec.rs

1use serde::{Serialize, de::DeserializeOwned};
2use tokio_util::codec::LinesCodec;
3use std::{marker::PhantomData, fmt::Debug};
4
5mod encoder;
6mod decoder;
7
8#[derive(Debug, Clone)]
9pub struct GenericLinesCodec<T: Serialize + DeserializeOwned> {
10    length_delimited_codec: LinesCodec,
11    _phantom: PhantomData<T>,
12}
13
14impl<T: Serialize + DeserializeOwned> GenericLinesCodec<T> {
15    pub fn new() -> Self {
16        return GenericLinesCodec {
17            length_delimited_codec: LinesCodec::new(),
18            _phantom: PhantomData,
19        };
20    }
21}
22
23impl<T: Serialize + DeserializeOwned> Default for GenericLinesCodec<T> {
24    fn default() -> Self {
25        return GenericLinesCodec::new();
26    }
27}
28
29#[cfg(test)]
30mod tests {
31    use super::GenericLinesCodec;
32
33    mod object {
34        use futures::{StreamExt, SinkExt};
35        use rstest::rstest;
36        use tokio::io::duplex;
37        use tokio_util::codec::Framed;
38        use tokio::try_join;
39        use serde::{Serialize, Deserialize};
40
41        use crate::{swap, random_number, random_str, random_bool};
42        use crate::futures::wait_random;
43
44        use super::GenericLinesCodec;
45
46        #[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
47        pub struct TestStruct {
48            pub id: u64,
49            pub name: String,
50            pub is_flag: bool,
51        }
52
53        fn random_test_struct(data_len: usize) -> TestStruct {
54            return TestStruct {
55                id: random_number(0..=u64::MAX),
56                name: random_str(data_len),
57                is_flag: random_bool(),
58            };
59        }
60
61        fn random_test_structs(count: usize, data_len: usize) -> Vec<TestStruct> {
62            return (0..count).map(|_| random_test_struct(data_len)).collect();
63        }
64
65        #[rstest]
66        #[case(1, true)]
67        #[case(2, true)]
68        #[case(3, true)]
69        #[case(4, true)]
70        #[case(5, true)]
71        #[case(6, true)]
72        #[case(7, true)]
73        #[case(8, true)]
74        #[case(1, false)]
75        #[case(2, false)]
76        #[case(3, false)]
77        #[case(4, false)]
78        #[case(5, false)]
79        #[case(6, false)]
80        #[case(7, false)]
81        #[case(8, false)]
82        #[tokio::test]
83        async fn can_send_receive_messages_bidirectional(
84            #[case] data_len: usize,
85            #[case] is_forward_direction: bool,
86        ) {
87            let data_len = data_len * data_len * data_len * 1024;
88            let (stream1, stream2) = duplex(random_number(256..=4096));
89            
90            let stream1_messages = random_test_structs(random_number(4..8), data_len);
91            let stream1_messages1 = stream1_messages.clone();
92            let stream1_messages2 = stream1_messages.clone();
93
94            let stream2_messages = random_test_structs(random_number(4..8), data_len);
95            let stream2_messages1 = stream2_messages.clone();
96            let stream2_messages2 = stream2_messages.clone();
97
98            // maybe swap the direction
99            let (stream1, stream2) = if !is_forward_direction {
100                swap(stream1, stream2)
101            } else {
102                (stream1, stream2)
103            };
104
105            try_join!(
106                tokio::spawn(async move {
107                    let framed_stream = Framed::new(
108                        stream1,
109                        GenericLinesCodec::<TestStruct>::new(),
110                    );
111                    let (
112                        mut sink,
113                        mut source,
114                    ) = framed_stream.split();
115
116                    try_join!(
117                        tokio::spawn(async move {
118                            for message in stream1_messages1 {
119                                sink
120                                    .send(message).await
121                                    .expect("Cannot send message.");
122            
123                                wait_random(5..25).await;
124                            }
125                        }),
126                        tokio::spawn(async move {
127                            let mut received_messages = vec![];
128            
129                            while let Some(maybe_message) = source.next().await {
130                                let message = maybe_message
131                                    .expect("Failed to unwrap the message.");
132            
133                                received_messages.push(message);
134                                
135                                wait_random(5..25).await;
136
137                                // otherwise the test never completes
138                                if received_messages.len() == stream2_messages2.len() {
139                                    break;
140                                }
141                            }
142            
143                            assert_eq!(
144                                received_messages,
145                                stream2_messages2,
146                                "Sent and received messages must match.",
147                            );
148                        }),
149                    ).unwrap();
150                }),
151                tokio::spawn(async move {
152                    let framed_stream = Framed::new(
153                        stream2,
154                        GenericLinesCodec::<TestStruct>::new(),
155                    );
156                    let (
157                        mut sink,
158                        mut source,
159                    ) = framed_stream.split();
160
161                    try_join!(
162                        tokio::spawn(async move {
163                            for message in stream2_messages1 {
164                                sink
165                                    .send(message).await
166                                    .expect("Cannot send message.");
167            
168                                wait_random(5..25).await;
169                            }
170                        }),
171                        tokio::spawn(async move {
172                            let mut received_messages = vec![];
173            
174                            while let Some(maybe_message) = source.next().await {
175                                let message = maybe_message
176                                    .expect("Failed to unwrap the message.");
177            
178                                received_messages.push(message);
179                                
180                                wait_random(5..25).await;
181
182                                // otherwise the test never completes
183                                if received_messages.len() == stream1_messages2.len() {
184                                    break;
185                                }
186                            }
187            
188                            assert_eq!(
189                                received_messages,
190                                stream1_messages2,
191                                "Sent and received messages must match.",
192                            );
193                        }),
194                    ).unwrap();
195                }),
196            ).unwrap();
197        }
198    }
199
200    mod string {
201        use futures::{StreamExt, SinkExt};
202        use rstest::rstest;
203        use tokio::io::duplex;
204        use tokio_util::codec::Framed;
205        use tokio::try_join;
206
207        use crate::{swap, random_number, random_str_rg};
208        use crate::futures::wait_random;
209
210        use super::GenericLinesCodec;
211
212        fn random_strings(count: usize, data_len: usize) -> Vec<String> {
213            return (0..count)
214                .map(|_| {
215                    return format!(
216                        "{}\n{}",
217                        random_str_rg(1..=data_len/2),
218                        random_str_rg(1..=data_len/2)
219                    );
220                })
221                .collect();
222        }
223
224        #[rstest]
225        #[case(1, true)]
226        #[case(2, true)]
227        #[case(3, true)]
228        #[case(4, true)]
229        #[case(5, true)]
230        #[case(6, true)]
231        #[case(7, true)]
232        #[case(8, true)]
233        #[case(1, false)]
234        #[case(2, false)]
235        #[case(3, false)]
236        #[case(4, false)]
237        #[case(5, false)]
238        #[case(6, false)]
239        #[case(7, false)]
240        #[case(8, false)]
241        #[tokio::test]
242        async fn can_send_receive_messages_bidirectional(
243            #[case] data_len: usize,
244            #[case] is_forward_direction: bool,
245        ) {
246            let data_len = data_len * data_len * 1024;
247            let (stream1, stream2) = duplex(random_number(256..=1024));
248            
249            let stream1_messages = random_strings(random_number(4..8), data_len);
250            let stream1_messages1 = stream1_messages.clone();
251            let stream1_messages2 = stream1_messages.clone();
252
253            let stream2_messages = random_strings(random_number(4..8), data_len);
254
255            let stream2_messages1 = stream2_messages.clone();
256            let stream2_messages2 = stream2_messages.clone();
257
258            // maybe swap the direction
259            let (stream1, stream2) = if !is_forward_direction {
260                swap(stream1, stream2)
261            } else {
262                (stream1, stream2)
263            };
264
265            try_join!(
266                tokio::spawn(async move {
267                    let framed_stream = Framed::new(
268                        stream1,
269                        GenericLinesCodec::<String>::new(),
270                    );
271                    let (
272                        mut sink,
273                        mut source,
274                    ) = framed_stream.split();
275
276                    try_join!(
277                        tokio::spawn(async move {
278                            for message in stream1_messages1 {
279                                sink
280                                    .send(message).await
281                                    .expect("Cannot send message.");
282            
283                                wait_random(5..25).await;
284                            }
285                        }),
286                        tokio::spawn(async move {
287                            let mut received_messages = vec![];
288            
289                            while let Some(maybe_message) = source.next().await {
290                                let message = maybe_message
291                                    .expect("Failed to unwrap the message.");
292            
293                                received_messages.push(message);
294                                
295                                wait_random(5..25).await;
296
297                                // otherwise the test never completes
298                                if received_messages.len() == stream2_messages2.len() {
299                                    break;
300                                }
301                            }
302            
303                            assert_eq!(
304                                received_messages,
305                                stream2_messages2,
306                                "Sent and received messages must match.",
307                            );
308                        }),
309                    ).unwrap();
310                }),
311                tokio::spawn(async move {
312                    let framed_stream = Framed::new(
313                        stream2,
314                        GenericLinesCodec::<String>::new(),
315                    );
316                    let (
317                        mut sink,
318                        mut source,
319                    ) = framed_stream.split();
320
321                    try_join!(
322                        tokio::spawn(async move {
323                            for message in stream2_messages1 {
324                                sink
325                                    .send(message).await
326                                    .expect("Cannot send message.");
327            
328                                wait_random(5..25).await;
329                            }
330                        }),
331                        tokio::spawn(async move {
332                            let mut received_messages = vec![];
333            
334                            while let Some(maybe_message) = source.next().await {
335                                let message = maybe_message
336                                    .expect("Failed to unwrap the message.");
337            
338                                received_messages.push(message);
339                                
340                                wait_random(5..25).await;
341
342                                // otherwise the test never completes
343                                if received_messages.len() == stream1_messages2.len() {
344                                    break;
345                                }
346                            }
347            
348                            assert_eq!(
349                                received_messages,
350                                stream1_messages2,
351                                "Sent and received messages must match.",
352                            );
353                        }),
354                    ).unwrap();
355                }),
356            ).unwrap();
357        }
358    }
359}