cs_utils/utils/futures/
generic_codec.rs1use serde::{Serialize, de::DeserializeOwned};
2use tokio_util::codec::LengthDelimitedCodec;
3use std::{marker::PhantomData, fmt::Debug};
4
5mod encoder;
6mod decoder;
7
8#[derive(Debug, Clone)]
48pub struct GenericCodec<T: Serialize + DeserializeOwned> {
49 length_delimited_codec: LengthDelimitedCodec,
50 _phantom: PhantomData<T>,
51}
52
53impl<T: Serialize + DeserializeOwned> GenericCodec<T> {
54 pub fn new() -> Self {
55 return GenericCodec {
56 length_delimited_codec: LengthDelimitedCodec::new(),
57 _phantom: PhantomData,
58 };
59 }
60}
61
62impl<T: Serialize + DeserializeOwned> Default for GenericCodec<T> {
63 fn default() -> Self {
64 return GenericCodec::new();
65 }
66}
67
68#[cfg(test)]
69mod tests {
70 mod object {
71 use futures::{StreamExt, SinkExt};
72 use rstest::rstest;
73 use tokio::io::duplex;
74 use tokio_util::codec::Framed;
75 use tokio::try_join;
76 use serde::{Serialize, Deserialize};
77
78 use crate::{random_number, random_str, random_bool};
79 use crate::traits::Random;
80 use crate::futures::wait_random;
81
82 use crate::futures::GenericCodec;
83
84 #[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
85 pub struct TestStruct {
86 pub id: u64,
87 pub name: String,
88 pub is_flag: bool,
89 }
90
91 impl Random for TestStruct {
92 fn random() -> Self {
93 return TestStruct {
94 id: random_number(0..=u64::MAX),
95 name: random_str(32),
96 is_flag: random_bool(),
97 };
98 }
99 }
100
101 #[rstest]
102 #[case(true)]
103 #[case(false)]
104 #[tokio::test]
105 async fn can_send_receive_messages_bidirectional(
106 #[case] is_forward_direction: bool,
107 ) {
108 use crate::test::random_vec_rg;
109 use crate::swap;
110
111 let (stream1, stream2) = duplex(4096);
112
113 let stream1_messages = random_vec_rg(4..8);
114 let stream1_messages1 = stream1_messages.clone();
115 let stream1_messages2 = stream1_messages.clone();
116
117 let stream2_messages = random_vec_rg(4..8);
118 let stream2_messages1 = stream2_messages.clone();
119 let stream2_messages2 = stream2_messages.clone();
120
121 let (stream1, stream2) = if !is_forward_direction {
123 swap(stream1, stream2)
124 } else {
125 (stream1, stream2)
126 };
127
128 try_join!(
129 tokio::spawn(async move {
130 let framed_stream = Framed::new(
131 stream1,
132 GenericCodec::<TestStruct>::new(),
133 );
134 let (
135 mut sink,
136 mut source,
137 ) = framed_stream.split();
138
139 try_join!(
140 tokio::spawn(async move {
141 for message in stream1_messages1 {
142 sink
143 .send(message).await
144 .expect("Cannot send message.");
145
146 wait_random(5..25).await;
147 }
148 }),
149 tokio::spawn(async move {
150 let mut received_messages = vec![];
151
152 while let Some(maybe_message) = source.next().await {
153 let message = maybe_message
154 .expect("Failed to unwrap the message.");
155
156 received_messages.push(message);
157
158 wait_random(5..25).await;
159
160 if received_messages.len() == stream2_messages2.len() {
162 break;
163 }
164 }
165
166 assert_eq!(
167 received_messages,
168 stream2_messages2,
169 "Sent and received messages must match.",
170 );
171 }),
172 ).unwrap();
173 }),
174 tokio::spawn(async move {
175 let framed_stream = Framed::new(
176 stream2,
177 GenericCodec::<TestStruct>::new(),
178 );
179 let (
180 mut sink,
181 mut source,
182 ) = framed_stream.split();
183
184 try_join!(
185 tokio::spawn(async move {
186 for message in stream2_messages1 {
187 sink
188 .send(message).await
189 .expect("Cannot send message.");
190
191 wait_random(5..25).await;
192 }
193 }),
194 tokio::spawn(async move {
195 let mut received_messages = vec![];
196
197 while let Some(maybe_message) = source.next().await {
198 let message = maybe_message
199 .expect("Failed to unwrap the message.");
200
201 received_messages.push(message);
202
203 wait_random(5..25).await;
204
205 if received_messages.len() == stream1_messages2.len() {
207 break;
208 }
209 }
210
211 assert_eq!(
212 received_messages,
213 stream1_messages2,
214 "Sent and received messages must match.",
215 );
216 }),
217 ).unwrap();
218 }),
219 ).unwrap();
220 }
221 }
222
223 mod string {
224 use futures::{StreamExt, SinkExt};
225 use rstest::rstest;
226 use tokio::io::duplex;
227 use tokio_util::codec::Framed;
228 use tokio::try_join;
229
230 use crate::random_str_rg;
231 use crate::traits::Random;
232 use crate::futures::wait_random;
233
234 use crate::futures::GenericCodec;
235
236 #[rstest]
237 #[case(true)]
238 #[case(false)]
239 #[tokio::test]
240 async fn can_send_receive_messages_bidirectional(
241 #[case] is_forward_direction: bool,
242 ) {
243 use crate::test::random_vec_rg;
244 use crate::swap;
245
246 impl Random for String {
247 fn random() -> String {
248 return random_str_rg(8..32);
249 }
250 }
251
252 let (stream1, stream2) = duplex(4096);
253
254 let stream1_messages = random_vec_rg(4..8);
255 let stream1_messages1 = stream1_messages.clone();
256 let stream1_messages2 = stream1_messages.clone();
257
258 let stream2_messages = random_vec_rg(4..8);
259
260 let stream2_messages1 = stream2_messages.clone();
261 let stream2_messages2 = stream2_messages.clone();
262
263 let (stream1, stream2) = if !is_forward_direction {
265 swap(stream1, stream2)
266 } else {
267 (stream1, stream2)
268 };
269
270 try_join!(
271 tokio::spawn(async move {
272 let framed_stream = Framed::new(
273 stream1,
274 GenericCodec::<String>::new(),
275 );
276 let (
277 mut sink,
278 mut source,
279 ) = framed_stream.split();
280
281 try_join!(
282 tokio::spawn(async move {
283 for message in stream1_messages1 {
284 sink
285 .send(message).await
286 .expect("Cannot send message.");
287
288 wait_random(5..25).await;
289 }
290 }),
291 tokio::spawn(async move {
292 let mut received_messages = vec![];
293
294 while let Some(maybe_message) = source.next().await {
295 let message = maybe_message
296 .expect("Failed to unwrap the message.");
297
298 received_messages.push(message);
299
300 wait_random(5..25).await;
301
302 if received_messages.len() == stream2_messages2.len() {
304 break;
305 }
306 }
307
308 assert_eq!(
309 received_messages,
310 stream2_messages2,
311 "Sent and received messages must match.",
312 );
313 }),
314 ).unwrap();
315 }),
316 tokio::spawn(async move {
317 let framed_stream = Framed::new(
318 stream2,
319 GenericCodec::<String>::new(),
320 );
321 let (
322 mut sink,
323 mut source,
324 ) = framed_stream.split();
325
326 try_join!(
327 tokio::spawn(async move {
328 for message in stream2_messages1 {
329 sink
330 .send(message).await
331 .expect("Cannot send message.");
332
333 wait_random(5..25).await;
334 }
335 }),
336 tokio::spawn(async move {
337 let mut received_messages = vec![];
338
339 while let Some(maybe_message) = source.next().await {
340 let message = maybe_message
341 .expect("Failed to unwrap the message.");
342
343 received_messages.push(message);
344
345 wait_random(5..25).await;
346
347 if received_messages.len() == stream1_messages2.len() {
349 break;
350 }
351 }
352
353 assert_eq!(
354 received_messages,
355 stream1_messages2,
356 "Sent and received messages must match.",
357 );
358 }),
359 ).unwrap();
360 }),
361 ).unwrap();
362 }
363 }
364}