connection_utils/codecs/
generic_lines_codec.rs

1use serde::{Serialize, de::DeserializeOwned};
2use tokio_util::codec::LinesCodec;
3use std::{marker::PhantomData, fmt::Debug};
4
5mod encoder;
6mod decoder;
7
8#[derive(Debug, Clone)]
9pub struct GenericLinesCodec<T: Serialize + DeserializeOwned> {
10    length_delimited_codec: LinesCodec,
11    _phantom: PhantomData<T>,
12}
13
14impl<T: Serialize + DeserializeOwned> GenericLinesCodec<T> {
15    pub fn new() -> Self {
16        return GenericLinesCodec {
17            length_delimited_codec: LinesCodec::new(),
18            _phantom: PhantomData,
19        };
20    }
21}
22
23impl<T: Serialize + DeserializeOwned> Default for GenericLinesCodec<T> {
24    fn default() -> Self {
25        return GenericLinesCodec::new();
26    }
27}
28
29#[cfg(test)]
30mod tests {
31    use cs_utils::{swap, random_number, random_str, random_str_rg, random_bool, futures::wait_random};
32
33    use super::*;
34
35    mod object {
36        use rstest::rstest;
37        use tokio::try_join;
38        use tokio::io::duplex;
39        use tokio_util::codec::Framed;
40        use futures::{StreamExt, SinkExt};
41        use serde::{Serialize, Deserialize};
42
43        use super::*;
44
45        #[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
46        pub struct TestStruct {
47            pub id: u64,
48            pub name: String,
49            pub is_flag: bool,
50        }
51
52        fn random_test_struct(data_len: usize) -> TestStruct {
53            return TestStruct {
54                id: random_number(0..=u64::MAX),
55                name: random_str(data_len),
56                is_flag: random_bool(),
57            };
58        }
59
60        fn random_test_structs(count: usize, data_len: usize) -> Vec<TestStruct> {
61            return (0..count).map(|_| random_test_struct(data_len)).collect();
62        }
63
64        #[rstest]
65        #[case(1, true)]
66        #[case(2, true)]
67        #[case(3, true)]
68        #[case(4, true)]
69        #[case(5, true)]
70        #[case(6, true)]
71        #[case(7, true)]
72        #[case(8, true)]
73        #[case(1, false)]
74        #[case(2, false)]
75        #[case(3, false)]
76        #[case(4, false)]
77        #[case(5, false)]
78        #[case(6, false)]
79        #[case(7, false)]
80        #[case(8, false)]
81        #[tokio::test]
82        async fn can_send_receive_messages_bidirectional(
83            #[case] data_len: usize,
84            #[case] is_forward_direction: bool,
85        ) {
86            let data_len = data_len * data_len * data_len * 1024;
87            let (stream1, stream2) = duplex(random_number(256..=4096));
88            
89            let stream1_messages = random_test_structs(random_number(4..8), data_len);
90            let stream1_messages1 = stream1_messages.clone();
91            let stream1_messages2 = stream1_messages.clone();
92
93            let stream2_messages = random_test_structs(random_number(4..8), data_len);
94            let stream2_messages1 = stream2_messages.clone();
95            let stream2_messages2 = stream2_messages.clone();
96
97            // maybe swap the direction
98            let (stream1, stream2) = if !is_forward_direction {
99                swap(stream1, stream2)
100            } else {
101                (stream1, stream2)
102            };
103
104            try_join!(
105                tokio::spawn(async move {
106                    let framed_stream = Framed::new(
107                        stream1,
108                        GenericLinesCodec::<TestStruct>::new(),
109                    );
110                    let (
111                        mut sink,
112                        mut source,
113                    ) = framed_stream.split();
114
115                    try_join!(
116                        tokio::spawn(async move {
117                            for message in stream1_messages1 {
118                                sink
119                                    .send(message).await
120                                    .expect("Cannot send message.");
121            
122                                wait_random(5..25).await;
123                            }
124                        }),
125                        tokio::spawn(async move {
126                            let mut received_messages = vec![];
127            
128                            while let Some(maybe_message) = source.next().await {
129                                let message = maybe_message
130                                    .expect("Failed to unwrap the message.");
131            
132                                received_messages.push(message);
133                                
134                                wait_random(5..25).await;
135
136                                // otherwise the test never completes
137                                if received_messages.len() == stream2_messages2.len() {
138                                    break;
139                                }
140                            }
141            
142                            assert_eq!(
143                                received_messages,
144                                stream2_messages2,
145                                "Sent and received messages must match.",
146                            );
147                        }),
148                    ).unwrap();
149                }),
150                tokio::spawn(async move {
151                    let framed_stream = Framed::new(
152                        stream2,
153                        GenericLinesCodec::<TestStruct>::new(),
154                    );
155                    let (
156                        mut sink,
157                        mut source,
158                    ) = framed_stream.split();
159
160                    try_join!(
161                        tokio::spawn(async move {
162                            for message in stream2_messages1 {
163                                sink
164                                    .send(message).await
165                                    .expect("Cannot send message.");
166            
167                                wait_random(5..25).await;
168                            }
169                        }),
170                        tokio::spawn(async move {
171                            let mut received_messages = vec![];
172            
173                            while let Some(maybe_message) = source.next().await {
174                                let message = maybe_message
175                                    .expect("Failed to unwrap the message.");
176            
177                                received_messages.push(message);
178                                
179                                wait_random(5..25).await;
180
181                                // otherwise the test never completes
182                                if received_messages.len() == stream1_messages2.len() {
183                                    break;
184                                }
185                            }
186            
187                            assert_eq!(
188                                received_messages,
189                                stream1_messages2,
190                                "Sent and received messages must match.",
191                            );
192                        }),
193                    ).unwrap();
194                }),
195            ).unwrap();
196        }
197    }
198
199    mod string {
200        use rstest::rstest;
201        use tokio::try_join;
202        use tokio::io::duplex;
203        use tokio_util::codec::Framed;
204        use futures::{StreamExt, SinkExt};
205
206        use super::*;
207
208        fn random_strings(count: usize, data_len: usize) -> Vec<String> {
209            return (0..count)
210                .map(|_| {
211                    return format!(
212                        "{}\n{}",
213                        random_str_rg(1..=data_len/2),
214                        random_str_rg(1..=data_len/2)
215                    );
216                })
217                .collect();
218        }
219
220        #[rstest]
221        #[case(1, true)]
222        #[case(2, true)]
223        #[case(3, true)]
224        #[case(4, true)]
225        #[case(5, true)]
226        #[case(6, true)]
227        #[case(7, true)]
228        #[case(8, true)]
229        #[case(1, false)]
230        #[case(2, false)]
231        #[case(3, false)]
232        #[case(4, false)]
233        #[case(5, false)]
234        #[case(6, false)]
235        #[case(7, false)]
236        #[case(8, false)]
237        #[tokio::test]
238        async fn can_send_receive_messages_bidirectional(
239            #[case] data_len: usize,
240            #[case] is_forward_direction: bool,
241        ) {
242            let data_len = data_len * data_len * 1024;
243            let (stream1, stream2) = duplex(random_number(256..=1024));
244            
245            let stream1_messages = random_strings(random_number(4..8), data_len);
246            let stream1_messages1 = stream1_messages.clone();
247            let stream1_messages2 = stream1_messages.clone();
248
249            let stream2_messages = random_strings(random_number(4..8), data_len);
250
251            let stream2_messages1 = stream2_messages.clone();
252            let stream2_messages2 = stream2_messages.clone();
253
254            // maybe swap the direction
255            let (stream1, stream2) = if !is_forward_direction {
256                swap(stream1, stream2)
257            } else {
258                (stream1, stream2)
259            };
260
261            try_join!(
262                tokio::spawn(async move {
263                    let framed_stream = Framed::new(
264                        stream1,
265                        GenericLinesCodec::<String>::new(),
266                    );
267                    let (
268                        mut sink,
269                        mut source,
270                    ) = framed_stream.split();
271
272                    try_join!(
273                        tokio::spawn(async move {
274                            for message in stream1_messages1 {
275                                sink
276                                    .send(message).await
277                                    .expect("Cannot send message.");
278            
279                                wait_random(5..25).await;
280                            }
281                        }),
282                        tokio::spawn(async move {
283                            let mut received_messages = vec![];
284            
285                            while let Some(maybe_message) = source.next().await {
286                                let message = maybe_message
287                                    .expect("Failed to unwrap the message.");
288            
289                                received_messages.push(message);
290                                
291                                wait_random(5..25).await;
292
293                                // otherwise the test never completes
294                                if received_messages.len() == stream2_messages2.len() {
295                                    break;
296                                }
297                            }
298            
299                            assert_eq!(
300                                received_messages,
301                                stream2_messages2,
302                                "Sent and received messages must match.",
303                            );
304                        }),
305                    ).unwrap();
306                }),
307                tokio::spawn(async move {
308                    let framed_stream = Framed::new(
309                        stream2,
310                        GenericLinesCodec::<String>::new(),
311                    );
312                    let (
313                        mut sink,
314                        mut source,
315                    ) = framed_stream.split();
316
317                    try_join!(
318                        tokio::spawn(async move {
319                            for message in stream2_messages1 {
320                                sink
321                                    .send(message).await
322                                    .expect("Cannot send message.");
323            
324                                wait_random(5..25).await;
325                            }
326                        }),
327                        tokio::spawn(async move {
328                            let mut received_messages = vec![];
329            
330                            while let Some(maybe_message) = source.next().await {
331                                let message = maybe_message
332                                    .expect("Failed to unwrap the message.");
333            
334                                received_messages.push(message);
335                                
336                                wait_random(5..25).await;
337
338                                // otherwise the test never completes
339                                if received_messages.len() == stream1_messages2.len() {
340                                    break;
341                                }
342                            }
343            
344                            assert_eq!(
345                                received_messages,
346                                stream1_messages2,
347                                "Sent and received messages must match.",
348                            );
349                        }),
350                    ).unwrap();
351                }),
352            ).unwrap();
353        }
354    }
355}