connection_utils/codecs/
generic_codec.rs1use serde::{Serialize, de::DeserializeOwned};
2use tokio_util::codec::LengthDelimitedCodec;
3use std::{marker::PhantomData, fmt::Debug};
4
5mod encoder;
6mod decoder;
7
8#[derive(Debug, Clone)]
49pub struct GenericCodec<T: Serialize + DeserializeOwned> {
50 length_delimited_codec: LengthDelimitedCodec,
51 _phantom: PhantomData<T>,
52}
53
54impl<T: Serialize + DeserializeOwned> GenericCodec<T> {
55 pub fn new() -> Self {
56 return GenericCodec {
57 length_delimited_codec: LengthDelimitedCodec::new(),
58 _phantom: PhantomData,
59 };
60 }
61}
62
63impl<T: Serialize + DeserializeOwned> Default for GenericCodec<T> {
64 fn default() -> Self {
65 return GenericCodec::new();
66 }
67}
68
69#[cfg(test)]
70mod tests {
71 use cs_utils::{random_number, random_str, random_bool, random_str_rg, futures::wait_random, traits::Random, test::random_vec_rg, swap};
72
73 mod object {
74 use rstest::rstest;
75 use tokio::try_join;
76 use tokio::io::duplex;
77 use tokio_util::codec::Framed;
78 use futures::{StreamExt, SinkExt};
79 use serde::{Serialize, Deserialize};
80
81 use super::*;
82 use crate::codecs::GenericCodec;
83
84 #[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
85 pub struct TestStruct {
86 pub id: u64,
87 pub name: String,
88 pub is_flag: bool,
89 }
90
91 impl Random for TestStruct {
92 fn random() -> Self {
93 return TestStruct {
94 id: random_number(0..=u64::MAX),
95 name: random_str(32),
96 is_flag: random_bool(),
97 };
98 }
99 }
100
101 #[rstest]
102 #[case(true)]
103 #[case(false)]
104 #[tokio::test]
105 async fn can_send_receive_messages_bidirectional(
106 #[case] is_forward_direction: bool,
107 ) {
108 let (stream1, stream2) = duplex(4096);
109
110 let stream1_messages = random_vec_rg(4..8);
111 let stream1_messages1 = stream1_messages.clone();
112 let stream1_messages2 = stream1_messages.clone();
113
114 let stream2_messages = random_vec_rg(4..8);
115 let stream2_messages1 = stream2_messages.clone();
116 let stream2_messages2 = stream2_messages.clone();
117
118 let (stream1, stream2) = if !is_forward_direction {
120 swap(stream1, stream2)
121 } else {
122 (stream1, stream2)
123 };
124
125 try_join!(
126 tokio::spawn(async move {
127 let framed_stream = Framed::new(
128 stream1,
129 GenericCodec::<TestStruct>::new(),
130 );
131 let (
132 mut sink,
133 mut source,
134 ) = framed_stream.split();
135
136 try_join!(
137 tokio::spawn(async move {
138 for message in stream1_messages1 {
139 sink
140 .send(message).await
141 .expect("Cannot send message.");
142
143 wait_random(5..25).await;
144 }
145 }),
146 tokio::spawn(async move {
147 let mut received_messages = vec![];
148
149 while let Some(maybe_message) = source.next().await {
150 let message = maybe_message
151 .expect("Failed to unwrap the message.");
152
153 received_messages.push(message);
154
155 wait_random(5..25).await;
156
157 if received_messages.len() == stream2_messages2.len() {
159 break;
160 }
161 }
162
163 assert_eq!(
164 received_messages,
165 stream2_messages2,
166 "Sent and received messages must match.",
167 );
168 }),
169 ).unwrap();
170 }),
171 tokio::spawn(async move {
172 let framed_stream = Framed::new(
173 stream2,
174 GenericCodec::<TestStruct>::new(),
175 );
176 let (
177 mut sink,
178 mut source,
179 ) = framed_stream.split();
180
181 try_join!(
182 tokio::spawn(async move {
183 for message in stream2_messages1 {
184 sink
185 .send(message).await
186 .expect("Cannot send message.");
187
188 wait_random(5..25).await;
189 }
190 }),
191 tokio::spawn(async move {
192 let mut received_messages = vec![];
193
194 while let Some(maybe_message) = source.next().await {
195 let message = maybe_message
196 .expect("Failed to unwrap the message.");
197
198 received_messages.push(message);
199
200 wait_random(5..25).await;
201
202 if received_messages.len() == stream1_messages2.len() {
204 break;
205 }
206 }
207
208 assert_eq!(
209 received_messages,
210 stream1_messages2,
211 "Sent and received messages must match.",
212 );
213 }),
214 ).unwrap();
215 }),
216 ).unwrap();
217 }
218 }
219
220 mod string {
221 use rstest::rstest;
222 use tokio::try_join;
223 use tokio::io::duplex;
224 use tokio_util::codec::Framed;
225 use futures::{StreamExt, SinkExt};
226
227 use super::*;
228 use crate::codecs::GenericCodec;
229
230 #[rstest]
231 #[case(true)]
232 #[case(false)]
233 #[tokio::test]
234 async fn can_send_receive_messages_bidirectional(
235 #[case] is_forward_direction: bool,
236 ) {
237 let (stream1, stream2) = duplex(4096);
238
239 let mut stream1_messages = vec![];
240 for _ in 0..random_number(4..8) {
241 stream1_messages.push(random_str_rg(8..32));
242 }
243
244 let mut stream2_messages = vec![];
245 for _ in 0..random_number(4..8) {
246 stream2_messages.push(random_str_rg(8..32));
247 }
248
249 let stream1_messages1 = stream1_messages.clone();
250 let stream1_messages2 = stream1_messages.clone();
251
252 let stream2_messages1 = stream2_messages.clone();
253 let stream2_messages2 = stream2_messages.clone();
254
255 let (stream1, stream2) = if !is_forward_direction {
257 swap(stream1, stream2)
258 } else {
259 (stream1, stream2)
260 };
261
262 try_join!(
263 tokio::spawn(async move {
264 let framed_stream = Framed::new(
265 stream1,
266 GenericCodec::<String>::new(),
267 );
268 let (
269 mut sink,
270 mut source,
271 ) = framed_stream.split();
272
273 try_join!(
274 tokio::spawn(async move {
275 for message in stream1_messages1 {
276 sink
277 .send(message).await
278 .expect("Cannot send message.");
279
280 wait_random(5..25).await;
281 }
282 }),
283 tokio::spawn(async move {
284 let mut received_messages = vec![];
285
286 while let Some(maybe_message) = source.next().await {
287 let message = maybe_message
288 .expect("Failed to unwrap the message.");
289
290 received_messages.push(message);
291
292 wait_random(5..25).await;
293
294 if received_messages.len() == stream2_messages2.len() {
296 break;
297 }
298 }
299
300 assert_eq!(
301 received_messages,
302 stream2_messages2,
303 "Sent and received messages must match.",
304 );
305 }),
306 ).unwrap();
307 }),
308 tokio::spawn(async move {
309 let framed_stream = Framed::new(
310 stream2,
311 GenericCodec::<String>::new(),
312 );
313 let (
314 mut sink,
315 mut source,
316 ) = framed_stream.split();
317
318 try_join!(
319 tokio::spawn(async move {
320 for message in stream2_messages1 {
321 sink
322 .send(message).await
323 .expect("Cannot send message.");
324
325 wait_random(5..25).await;
326 }
327 }),
328 tokio::spawn(async move {
329 let mut received_messages = vec![];
330
331 while let Some(maybe_message) = source.next().await {
332 let message = maybe_message
333 .expect("Failed to unwrap the message.");
334
335 received_messages.push(message);
336
337 wait_random(5..25).await;
338
339 if received_messages.len() == stream1_messages2.len() {
341 break;
342 }
343 }
344
345 assert_eq!(
346 received_messages,
347 stream1_messages2,
348 "Sent and received messages must match.",
349 );
350 }),
351 ).unwrap();
352 }),
353 ).unwrap();
354 }
355 }
356}