1use serde::{Serialize, de::DeserializeOwned};
2use tokio_util::codec::LinesCodec;
3use std::{marker::PhantomData, fmt::Debug};
4
5mod encoder;
6mod decoder;
7
8#[derive(Debug, Clone)]
9pub struct GenericLinesCodec<T: Serialize + DeserializeOwned> {
10 length_delimited_codec: LinesCodec,
11 _phantom: PhantomData<T>,
12}
13
14impl<T: Serialize + DeserializeOwned> GenericLinesCodec<T> {
15 pub fn new() -> Self {
16 return GenericLinesCodec {
17 length_delimited_codec: LinesCodec::new(),
18 _phantom: PhantomData,
19 };
20 }
21}
22
23impl<T: Serialize + DeserializeOwned> Default for GenericLinesCodec<T> {
24 fn default() -> Self {
25 return GenericLinesCodec::new();
26 }
27}
28
29#[cfg(test)]
30mod tests {
31 use cs_utils::{swap, random_number, random_str, random_str_rg, random_bool, futures::wait_random};
32
33 use super::*;
34
35 mod object {
36 use rstest::rstest;
37 use tokio::try_join;
38 use tokio::io::duplex;
39 use tokio_util::codec::Framed;
40 use futures::{StreamExt, SinkExt};
41 use serde::{Serialize, Deserialize};
42
43 use super::*;
44
45 #[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
46 pub struct TestStruct {
47 pub id: u64,
48 pub name: String,
49 pub is_flag: bool,
50 }
51
52 fn random_test_struct(data_len: usize) -> TestStruct {
53 return TestStruct {
54 id: random_number(0..=u64::MAX),
55 name: random_str(data_len),
56 is_flag: random_bool(),
57 };
58 }
59
60 fn random_test_structs(count: usize, data_len: usize) -> Vec<TestStruct> {
61 return (0..count).map(|_| random_test_struct(data_len)).collect();
62 }
63
64 #[rstest]
65 #[case(1, true)]
66 #[case(2, true)]
67 #[case(3, true)]
68 #[case(4, true)]
69 #[case(5, true)]
70 #[case(6, true)]
71 #[case(7, true)]
72 #[case(8, true)]
73 #[case(1, false)]
74 #[case(2, false)]
75 #[case(3, false)]
76 #[case(4, false)]
77 #[case(5, false)]
78 #[case(6, false)]
79 #[case(7, false)]
80 #[case(8, false)]
81 #[tokio::test]
82 async fn can_send_receive_messages_bidirectional(
83 #[case] data_len: usize,
84 #[case] is_forward_direction: bool,
85 ) {
86 let data_len = data_len * data_len * data_len * 1024;
87 let (stream1, stream2) = duplex(random_number(256..=4096));
88
89 let stream1_messages = random_test_structs(random_number(4..8), data_len);
90 let stream1_messages1 = stream1_messages.clone();
91 let stream1_messages2 = stream1_messages.clone();
92
93 let stream2_messages = random_test_structs(random_number(4..8), data_len);
94 let stream2_messages1 = stream2_messages.clone();
95 let stream2_messages2 = stream2_messages.clone();
96
97 let (stream1, stream2) = if !is_forward_direction {
99 swap(stream1, stream2)
100 } else {
101 (stream1, stream2)
102 };
103
104 try_join!(
105 tokio::spawn(async move {
106 let framed_stream = Framed::new(
107 stream1,
108 GenericLinesCodec::<TestStruct>::new(),
109 );
110 let (
111 mut sink,
112 mut source,
113 ) = framed_stream.split();
114
115 try_join!(
116 tokio::spawn(async move {
117 for message in stream1_messages1 {
118 sink
119 .send(message).await
120 .expect("Cannot send message.");
121
122 wait_random(5..25).await;
123 }
124 }),
125 tokio::spawn(async move {
126 let mut received_messages = vec![];
127
128 while let Some(maybe_message) = source.next().await {
129 let message = maybe_message
130 .expect("Failed to unwrap the message.");
131
132 received_messages.push(message);
133
134 wait_random(5..25).await;
135
136 if received_messages.len() == stream2_messages2.len() {
138 break;
139 }
140 }
141
142 assert_eq!(
143 received_messages,
144 stream2_messages2,
145 "Sent and received messages must match.",
146 );
147 }),
148 ).unwrap();
149 }),
150 tokio::spawn(async move {
151 let framed_stream = Framed::new(
152 stream2,
153 GenericLinesCodec::<TestStruct>::new(),
154 );
155 let (
156 mut sink,
157 mut source,
158 ) = framed_stream.split();
159
160 try_join!(
161 tokio::spawn(async move {
162 for message in stream2_messages1 {
163 sink
164 .send(message).await
165 .expect("Cannot send message.");
166
167 wait_random(5..25).await;
168 }
169 }),
170 tokio::spawn(async move {
171 let mut received_messages = vec![];
172
173 while let Some(maybe_message) = source.next().await {
174 let message = maybe_message
175 .expect("Failed to unwrap the message.");
176
177 received_messages.push(message);
178
179 wait_random(5..25).await;
180
181 if received_messages.len() == stream1_messages2.len() {
183 break;
184 }
185 }
186
187 assert_eq!(
188 received_messages,
189 stream1_messages2,
190 "Sent and received messages must match.",
191 );
192 }),
193 ).unwrap();
194 }),
195 ).unwrap();
196 }
197 }
198
199 mod string {
200 use rstest::rstest;
201 use tokio::try_join;
202 use tokio::io::duplex;
203 use tokio_util::codec::Framed;
204 use futures::{StreamExt, SinkExt};
205
206 use super::*;
207
208 fn random_strings(count: usize, data_len: usize) -> Vec<String> {
209 return (0..count)
210 .map(|_| {
211 return format!(
212 "{}\n{}",
213 random_str_rg(1..=data_len/2),
214 random_str_rg(1..=data_len/2)
215 );
216 })
217 .collect();
218 }
219
220 #[rstest]
221 #[case(1, true)]
222 #[case(2, true)]
223 #[case(3, true)]
224 #[case(4, true)]
225 #[case(5, true)]
226 #[case(6, true)]
227 #[case(7, true)]
228 #[case(8, true)]
229 #[case(1, false)]
230 #[case(2, false)]
231 #[case(3, false)]
232 #[case(4, false)]
233 #[case(5, false)]
234 #[case(6, false)]
235 #[case(7, false)]
236 #[case(8, false)]
237 #[tokio::test]
238 async fn can_send_receive_messages_bidirectional(
239 #[case] data_len: usize,
240 #[case] is_forward_direction: bool,
241 ) {
242 let data_len = data_len * data_len * 1024;
243 let (stream1, stream2) = duplex(random_number(256..=1024));
244
245 let stream1_messages = random_strings(random_number(4..8), data_len);
246 let stream1_messages1 = stream1_messages.clone();
247 let stream1_messages2 = stream1_messages.clone();
248
249 let stream2_messages = random_strings(random_number(4..8), data_len);
250
251 let stream2_messages1 = stream2_messages.clone();
252 let stream2_messages2 = stream2_messages.clone();
253
254 let (stream1, stream2) = if !is_forward_direction {
256 swap(stream1, stream2)
257 } else {
258 (stream1, stream2)
259 };
260
261 try_join!(
262 tokio::spawn(async move {
263 let framed_stream = Framed::new(
264 stream1,
265 GenericLinesCodec::<String>::new(),
266 );
267 let (
268 mut sink,
269 mut source,
270 ) = framed_stream.split();
271
272 try_join!(
273 tokio::spawn(async move {
274 for message in stream1_messages1 {
275 sink
276 .send(message).await
277 .expect("Cannot send message.");
278
279 wait_random(5..25).await;
280 }
281 }),
282 tokio::spawn(async move {
283 let mut received_messages = vec![];
284
285 while let Some(maybe_message) = source.next().await {
286 let message = maybe_message
287 .expect("Failed to unwrap the message.");
288
289 received_messages.push(message);
290
291 wait_random(5..25).await;
292
293 if received_messages.len() == stream2_messages2.len() {
295 break;
296 }
297 }
298
299 assert_eq!(
300 received_messages,
301 stream2_messages2,
302 "Sent and received messages must match.",
303 );
304 }),
305 ).unwrap();
306 }),
307 tokio::spawn(async move {
308 let framed_stream = Framed::new(
309 stream2,
310 GenericLinesCodec::<String>::new(),
311 );
312 let (
313 mut sink,
314 mut source,
315 ) = framed_stream.split();
316
317 try_join!(
318 tokio::spawn(async move {
319 for message in stream2_messages1 {
320 sink
321 .send(message).await
322 .expect("Cannot send message.");
323
324 wait_random(5..25).await;
325 }
326 }),
327 tokio::spawn(async move {
328 let mut received_messages = vec![];
329
330 while let Some(maybe_message) = source.next().await {
331 let message = maybe_message
332 .expect("Failed to unwrap the message.");
333
334 received_messages.push(message);
335
336 wait_random(5..25).await;
337
338 if received_messages.len() == stream1_messages2.len() {
340 break;
341 }
342 }
343
344 assert_eq!(
345 received_messages,
346 stream1_messages2,
347 "Sent and received messages must match.",
348 );
349 }),
350 ).unwrap();
351 }),
352 ).unwrap();
353 }
354 }
355}