connection_utils/codecs/
generic_codec.rs

1use serde::{Serialize, de::DeserializeOwned};
2use tokio_util::codec::LengthDelimitedCodec;
3use std::{marker::PhantomData, fmt::Debug};
4
5mod encoder;
6mod decoder;
7
8/// Codec to pass over any struct that implements `Serialize` and `DeserializeOwned` traits.
9/// 
10/// ### Examples
11/// 
12/// ```
13/// use cs_utils::{random_number, random_str, random_bool};
14/// use tokio_util::codec::Framed;
15/// use futures::{StreamExt, SinkExt};
16/// use tokio::{io::duplex, try_join};
17/// use serde::{Serialize, Deserialize};
18/// use connection_utils::codecs::GenericCodec;
19/// 
20/// #[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
21/// struct TestStruct {
22///     id: u16,
23///     name: String,
24///     is_flag: bool,
25/// }
26/// 
27/// println!("\n🏷️  Generic codec example:");
28/// println!("\n⚠️ Note: the \"async\" feature must be enabled!\n");
29///
30/// let (stream, _) = duplex(4096);
31///
32/// let message = TestStruct {
33///     id: random_number(0..=u16::MAX),
34///     name: random_str(8),
35///     is_flag: random_bool(),
36/// };
37/// 
38/// let framed_stream = Framed::new(
39///     stream,
40///     GenericCodec::<TestStruct>::new(),
41///  );
42/// 
43/// let (mut sink, mut source) = framed_stream.split();
44/// 
45/// // send/receive the framed data using the sink/source
46/// // see "generic_codec" example for more details
47/// ```
48#[derive(Debug, Clone)]
49pub struct GenericCodec<T: Serialize + DeserializeOwned> {
50    length_delimited_codec: LengthDelimitedCodec,
51    _phantom: PhantomData<T>,
52}
53
54impl<T: Serialize + DeserializeOwned> GenericCodec<T> {
55    pub fn new() -> Self {
56        return GenericCodec {
57            length_delimited_codec: LengthDelimitedCodec::new(),
58            _phantom: PhantomData,
59        };
60    }
61}
62
63impl<T: Serialize + DeserializeOwned> Default for GenericCodec<T> {
64    fn default() -> Self {
65        return GenericCodec::new();
66    }
67}
68
#[cfg(test)]
mod tests {
    use cs_utils::{
        random_number,
        random_str,
        random_bool,
        random_str_rg,
        futures::wait_random,
        traits::Random,
        test::random_vec_rg,
        swap,
    };

    /// Round-trip tests with a user-defined struct as the message type.
    mod object {
        use rstest::rstest;
        use tokio::try_join;
        use tokio::io::{duplex, DuplexStream};
        use tokio_util::codec::Framed;
        use futures::{StreamExt, SinkExt};
        use serde::{Serialize, Deserialize};

        use super::*;
        use crate::codecs::GenericCodec;

        #[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
        pub struct TestStruct {
            pub id: u64,
            pub name: String,
            pub is_flag: bool,
        }

        impl Random for TestStruct {
            fn random() -> Self {
                TestStruct {
                    id: random_number(0..=u64::MAX),
                    name: random_str(32),
                    is_flag: random_bool(),
                }
            }
        }

        /// Drives one end of the duplex connection.
        ///
        /// Frames `stream` with a `GenericCodec<TestStruct>`, then concurrently
        /// sends every message from `outgoing` and reads messages until as many
        /// as `expected` holds have arrived, asserting that the received
        /// sequence equals `expected`. Panics (failing the test) on any
        /// send/receive error or on a mismatch.
        async fn exchange_messages(
            stream: DuplexStream,
            outgoing: Vec<TestStruct>,
            expected: Vec<TestStruct>,
        ) {
            let (mut sink, mut source) = Framed::new(
                stream,
                GenericCodec::<TestStruct>::new(),
            ).split();

            try_join!(
                // sender half
                tokio::spawn(async move {
                    for message in outgoing {
                        sink
                            .send(message).await
                            .expect("Cannot send message.");

                        wait_random(5..25).await;
                    }
                }),
                // receiver half
                tokio::spawn(async move {
                    let mut received_messages = vec![];

                    while let Some(maybe_message) = source.next().await {
                        let message = maybe_message
                            .expect("Failed to unwrap the message.");

                        received_messages.push(message);

                        wait_random(5..25).await;

                        // stop once everything expected has arrived,
                        // otherwise the test never completes
                        if received_messages.len() == expected.len() {
                            break;
                        }
                    }

                    assert_eq!(
                        received_messages,
                        expected,
                        "Sent and received messages must match.",
                    );
                }),
            ).unwrap();
        }

        /// Both ends send a random batch of structs to each other and verify
        /// what they receive; the case flag decides which end of the duplex
        /// pair plays which role.
        #[rstest]
        #[case(true)]
        #[case(false)]
        #[tokio::test]
        async fn can_send_receive_messages_bidirectional(
            #[case] is_forward_direction: bool,
        ) {
            let (stream1, stream2) = duplex(4096);

            let stream1_messages = random_vec_rg(4..8);
            let stream2_messages = random_vec_rg(4..8);

            // maybe swap the direction
            let (stream1, stream2) = if is_forward_direction {
                (stream1, stream2)
            } else {
                swap(stream1, stream2)
            };

            try_join!(
                tokio::spawn(exchange_messages(
                    stream1,
                    stream1_messages.clone(),
                    stream2_messages.clone(),
                )),
                tokio::spawn(exchange_messages(
                    stream2,
                    stream2_messages,
                    stream1_messages,
                )),
            ).unwrap();
        }
    }

    /// Round-trip tests with plain `String` messages.
    mod string {
        use rstest::rstest;
        use tokio::try_join;
        use tokio::io::{duplex, DuplexStream};
        use tokio_util::codec::Framed;
        use futures::{StreamExt, SinkExt};

        use super::*;
        use crate::codecs::GenericCodec;

        /// Builds a randomly sized batch of random strings to send.
        fn random_messages() -> Vec<String> {
            (0..random_number(4..8))
                .map(|_| random_str_rg(8..32))
                .collect()
        }

        /// Drives one end of the duplex connection.
        ///
        /// Frames `stream` with a `GenericCodec<String>`, then concurrently
        /// sends every message from `outgoing` and reads messages until as many
        /// as `expected` holds have arrived, asserting that the received
        /// sequence equals `expected`. Panics (failing the test) on any
        /// send/receive error or on a mismatch.
        async fn exchange_messages(
            stream: DuplexStream,
            outgoing: Vec<String>,
            expected: Vec<String>,
        ) {
            let (mut sink, mut source) = Framed::new(
                stream,
                GenericCodec::<String>::new(),
            ).split();

            try_join!(
                // sender half
                tokio::spawn(async move {
                    for message in outgoing {
                        sink
                            .send(message).await
                            .expect("Cannot send message.");

                        wait_random(5..25).await;
                    }
                }),
                // receiver half
                tokio::spawn(async move {
                    let mut received_messages = vec![];

                    while let Some(maybe_message) = source.next().await {
                        let message = maybe_message
                            .expect("Failed to unwrap the message.");

                        received_messages.push(message);

                        wait_random(5..25).await;

                        // stop once everything expected has arrived,
                        // otherwise the test never completes
                        if received_messages.len() == expected.len() {
                            break;
                        }
                    }

                    assert_eq!(
                        received_messages,
                        expected,
                        "Sent and received messages must match.",
                    );
                }),
            ).unwrap();
        }

        /// Both ends send a random batch of strings to each other and verify
        /// what they receive; the case flag decides which end of the duplex
        /// pair plays which role.
        #[rstest]
        #[case(true)]
        #[case(false)]
        #[tokio::test]
        async fn can_send_receive_messages_bidirectional(
            #[case] is_forward_direction: bool,
        ) {
            let (stream1, stream2) = duplex(4096);

            let stream1_messages = random_messages();
            let stream2_messages = random_messages();

            // maybe swap the direction
            let (stream1, stream2) = if is_forward_direction {
                (stream1, stream2)
            } else {
                swap(stream1, stream2)
            };

            try_join!(
                tokio::spawn(exchange_messages(
                    stream1,
                    stream1_messages.clone(),
                    stream2_messages.clone(),
                )),
                tokio::spawn(exchange_messages(
                    stream2,
                    stream2_messages,
                    stream1_messages,
                )),
            ).unwrap();
        }
    }
}