cs_utils/utils/futures/
generic_codec.rs

1use serde::{Serialize, de::DeserializeOwned};
2use tokio_util::codec::LengthDelimitedCodec;
3use std::{marker::PhantomData, fmt::Debug};
4
5mod encoder;
6mod decoder;
7
8/// Codec to pass over any struct that implements `Serialize` and `DeserializeOwned` traits.
9/// 
10/// ### Examples
11/// 
12/// ```
13/// use cs_utils::{random_number, random_str, random_bool, futures::GenericCodec};
14/// use futures::{StreamExt, SinkExt};
15/// use serde::{Serialize, Deserialize};
16/// use tokio::{io::duplex, try_join};
17/// use tokio_util::codec::Framed;
18/// 
19/// #[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
20/// struct TestStruct {
21///     id: u16,
22///     name: String,
23///     is_flag: bool,
24/// }
25/// 
26/// println!("\n🏷️  Generic codec example:");
27/// println!("\n⚠️ Note: the \"async\" feature must be enabled!\n");
28///
29/// let (stream, _) = duplex(4096);
30///
31/// let message = TestStruct {
32///     id: random_number(0..=u16::MAX),
33///     name: random_str(8),
34///     is_flag: random_bool(),
35/// };
36/// 
37/// let framed_stream = Framed::new(
38///     stream,
39///     GenericCodec::<TestStruct>::new(),
40///  );
41/// 
42/// let (mut sink, mut source) = framed_stream.split();
43/// 
44/// // send/receive the framed data using the sink/source
45/// // see "generic_codec" example for more details
46/// ```
47#[derive(Debug, Clone)]
48pub struct GenericCodec<T: Serialize + DeserializeOwned> {
49    length_delimited_codec: LengthDelimitedCodec,
50    _phantom: PhantomData<T>,
51}
52
53impl<T: Serialize + DeserializeOwned> GenericCodec<T> {
54    pub fn new() -> Self {
55        return GenericCodec {
56            length_delimited_codec: LengthDelimitedCodec::new(),
57            _phantom: PhantomData,
58        };
59    }
60}
61
62impl<T: Serialize + DeserializeOwned> Default for GenericCodec<T> {
63    fn default() -> Self {
64        return GenericCodec::new();
65    }
66}
67
68#[cfg(test)]
69mod tests {
70    mod object {
71        use futures::{StreamExt, SinkExt};
72        use rstest::rstest;
73        use tokio::io::duplex;
74        use tokio_util::codec::Framed;
75        use tokio::try_join;
76        use serde::{Serialize, Deserialize};
77
78        use crate::{random_number, random_str, random_bool};
79        use crate::traits::Random;
80        use crate::futures::wait_random;
81
82        use crate::futures::GenericCodec;
83
84        #[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
85        pub struct TestStruct {
86            pub id: u64,
87            pub name: String,
88            pub is_flag: bool,
89        }
90
91        impl Random for TestStruct {
92            fn random() -> Self {
93                return TestStruct {
94                    id: random_number(0..=u64::MAX),
95                    name: random_str(32),
96                    is_flag: random_bool(),
97                };
98            }
99        }
100
101        #[rstest]
102        #[case(true)]
103        #[case(false)]
104        #[tokio::test]
105        async fn can_send_receive_messages_bidirectional(
106            #[case] is_forward_direction: bool,
107        ) {
108            use crate::test::random_vec_rg;
109            use crate::swap;
110
111            let (stream1, stream2) = duplex(4096);
112            
113            let stream1_messages = random_vec_rg(4..8);
114            let stream1_messages1 = stream1_messages.clone();
115            let stream1_messages2 = stream1_messages.clone();
116
117            let stream2_messages = random_vec_rg(4..8);
118            let stream2_messages1 = stream2_messages.clone();
119            let stream2_messages2 = stream2_messages.clone();
120
121            // maybe swap the direction
122            let (stream1, stream2) = if !is_forward_direction {
123                swap(stream1, stream2)
124            } else {
125                (stream1, stream2)
126            };
127
128            try_join!(
129                tokio::spawn(async move {
130                    let framed_stream = Framed::new(
131                        stream1,
132                        GenericCodec::<TestStruct>::new(),
133                    );
134                    let (
135                        mut sink,
136                        mut source,
137                    ) = framed_stream.split();
138
139                    try_join!(
140                        tokio::spawn(async move {
141                            for message in stream1_messages1 {
142                                sink
143                                    .send(message).await
144                                    .expect("Cannot send message.");
145            
146                                wait_random(5..25).await;
147                            }
148                        }),
149                        tokio::spawn(async move {
150                            let mut received_messages = vec![];
151            
152                            while let Some(maybe_message) = source.next().await {
153                                let message = maybe_message
154                                    .expect("Failed to unwrap the message.");
155            
156                                received_messages.push(message);
157                                
158                                wait_random(5..25).await;
159
160                                // otherwise the test never completes
161                                if received_messages.len() == stream2_messages2.len() {
162                                    break;
163                                }
164                            }
165            
166                            assert_eq!(
167                                received_messages,
168                                stream2_messages2,
169                                "Sent and received messages must match.",
170                            );
171                        }),
172                    ).unwrap();
173                }),
174                tokio::spawn(async move {
175                    let framed_stream = Framed::new(
176                        stream2,
177                        GenericCodec::<TestStruct>::new(),
178                    );
179                    let (
180                        mut sink,
181                        mut source,
182                    ) = framed_stream.split();
183
184                    try_join!(
185                        tokio::spawn(async move {
186                            for message in stream2_messages1 {
187                                sink
188                                    .send(message).await
189                                    .expect("Cannot send message.");
190            
191                                wait_random(5..25).await;
192                            }
193                        }),
194                        tokio::spawn(async move {
195                            let mut received_messages = vec![];
196            
197                            while let Some(maybe_message) = source.next().await {
198                                let message = maybe_message
199                                    .expect("Failed to unwrap the message.");
200            
201                                received_messages.push(message);
202                                
203                                wait_random(5..25).await;
204
205                                // otherwise the test never completes
206                                if received_messages.len() == stream1_messages2.len() {
207                                    break;
208                                }
209                            }
210            
211                            assert_eq!(
212                                received_messages,
213                                stream1_messages2,
214                                "Sent and received messages must match.",
215                            );
216                        }),
217                    ).unwrap();
218                }),
219            ).unwrap();
220        }
221    }
222
223    mod string {
224        use futures::{StreamExt, SinkExt};
225        use rstest::rstest;
226        use tokio::io::duplex;
227        use tokio_util::codec::Framed;
228        use tokio::try_join;
229
230        use crate::random_str_rg;
231        use crate::traits::Random;
232        use crate::futures::wait_random;
233        
234        use crate::futures::GenericCodec;
235
236        #[rstest]
237        #[case(true)]
238        #[case(false)]
239        #[tokio::test]
240        async fn can_send_receive_messages_bidirectional(
241            #[case] is_forward_direction: bool,
242        ) {
243            use crate::test::random_vec_rg;
244            use crate::swap;
245
246            impl Random for String {
247                fn random() -> String {
248                    return random_str_rg(8..32);
249                }
250            }
251
252            let (stream1, stream2) = duplex(4096);
253            
254            let stream1_messages = random_vec_rg(4..8);
255            let stream1_messages1 = stream1_messages.clone();
256            let stream1_messages2 = stream1_messages.clone();
257
258            let stream2_messages = random_vec_rg(4..8);
259
260            let stream2_messages1 = stream2_messages.clone();
261            let stream2_messages2 = stream2_messages.clone();
262
263            // maybe swap the direction
264            let (stream1, stream2) = if !is_forward_direction {
265                swap(stream1, stream2)
266            } else {
267                (stream1, stream2)
268            };
269
270            try_join!(
271                tokio::spawn(async move {
272                    let framed_stream = Framed::new(
273                        stream1,
274                        GenericCodec::<String>::new(),
275                    );
276                    let (
277                        mut sink,
278                        mut source,
279                    ) = framed_stream.split();
280
281                    try_join!(
282                        tokio::spawn(async move {
283                            for message in stream1_messages1 {
284                                sink
285                                    .send(message).await
286                                    .expect("Cannot send message.");
287            
288                                wait_random(5..25).await;
289                            }
290                        }),
291                        tokio::spawn(async move {
292                            let mut received_messages = vec![];
293            
294                            while let Some(maybe_message) = source.next().await {
295                                let message = maybe_message
296                                    .expect("Failed to unwrap the message.");
297            
298                                received_messages.push(message);
299                                
300                                wait_random(5..25).await;
301
302                                // otherwise the test never completes
303                                if received_messages.len() == stream2_messages2.len() {
304                                    break;
305                                }
306                            }
307            
308                            assert_eq!(
309                                received_messages,
310                                stream2_messages2,
311                                "Sent and received messages must match.",
312                            );
313                        }),
314                    ).unwrap();
315                }),
316                tokio::spawn(async move {
317                    let framed_stream = Framed::new(
318                        stream2,
319                        GenericCodec::<String>::new(),
320                    );
321                    let (
322                        mut sink,
323                        mut source,
324                    ) = framed_stream.split();
325
326                    try_join!(
327                        tokio::spawn(async move {
328                            for message in stream2_messages1 {
329                                sink
330                                    .send(message).await
331                                    .expect("Cannot send message.");
332            
333                                wait_random(5..25).await;
334                            }
335                        }),
336                        tokio::spawn(async move {
337                            let mut received_messages = vec![];
338            
339                            while let Some(maybe_message) = source.next().await {
340                                let message = maybe_message
341                                    .expect("Failed to unwrap the message.");
342            
343                                received_messages.push(message);
344                                
345                                wait_random(5..25).await;
346
347                                // otherwise the test never completes
348                                if received_messages.len() == stream1_messages2.len() {
349                                    break;
350                                }
351                            }
352            
353                            assert_eq!(
354                                received_messages,
355                                stream1_messages2,
356                                "Sent and received messages must match.",
357                            );
358                        }),
359                    ).unwrap();
360                }),
361            ).unwrap();
362        }
363    }
364}