connection_utils/utils/
divide_channel.rs

1use std::{pin::Pin, sync::Arc};
2
3use anyhow::Result;
4use tokio_util::codec::Framed;
5use serde::{Serialize, Deserialize};
6use cs_utils::{random_str, random_number};
7use futures::{StreamExt, stream::{SplitStream, SplitSink}, SinkExt};
8use tokio::{io::{duplex, WriteHalf, ReadHalf, AsyncReadExt, AsyncWriteExt, split, AsyncRead, AsyncWrite}, sync::{Mutex, watch::Receiver}};
9
10use crate::{codecs::GenericCodec, Channel, TransportChannel, create_framed_stream};
11
12#[derive(Serialize, Deserialize, Debug)]
13pub enum LayerMessage {
14    Data(u8, Vec<u8>),
15    Close(u8),
16}
17
18async fn forward_reads<TAsyncDuplex: AsyncRead + AsyncWrite + Send + Unpin + ?Sized + 'static>(
19    on_channel1_closed: Receiver<bool>,
20    on_channel2_closed: Receiver<bool>,
21    mut channel: SplitStream<Framed<Pin<Box<TAsyncDuplex>>, GenericCodec<LayerMessage>>>,
22    mut child1: WriteHalf<Pin<Box<dyn Channel>>>,
23    mut child2: WriteHalf<Pin<Box<dyn Channel>>>,
24)-> Result<()> {
25    let mut child1 = Pin::new(&mut child1);
26    let mut child2 = Pin::new(&mut child2);
27
28    loop {
29        let message = match channel.next().await {
30            Some(item) => item?,
31            None => {
32                // TODO: error out?
33
34                return Ok(());
35            },
36        };
37
38        match message {
39            LayerMessage::Data(id, data) => {
40                let child_channel = match id {
41                    0 => &mut child1,
42                    1 => &mut child2,
43                    _ => panic!("Unknown channel number: {}.", id),
44                };
45        
46                let on_channel_closed = match id {
47                    0 => &on_channel1_closed,
48                    1 => &on_channel2_closed,
49                    _ => panic!("Unknown channel number: {}.", id),
50                };
51        
52                if data.len() == 0 && *on_channel_closed.borrow()  {
53                    continue;
54                }
55        
56                let result = child_channel.write_all(&data[..]).await;
57        
58                if result.is_err() {
59                    // println!("[{}][forward_reads]> failed to write {} bytes to channel {}", label, data.len(), id);
60                }
61            },
62            LayerMessage::Close(id) => {
63                let child_channel = match id {
64                    0 => &mut child1,
65                    1 => &mut child2,
66                    _ => panic!("Unknown channel number: {}.", id),
67                };
68
69                child_channel.shutdown().await?;
70            },
71        };
72    }
73}
74
75async fn read_channel<TAsyncDuplex: AsyncRead + AsyncWrite + Send + Unpin + ?Sized + 'static>(
76    on_channel_closed: Receiver<bool>,
77    channel_number: u8,
78    mut channel: ReadHalf<Pin<Box<dyn Channel>>>,
79    target_channel: Arc<Mutex<SplitSink<Framed<Pin<Box<TAsyncDuplex>>, GenericCodec<LayerMessage>>, LayerMessage>>>,
80    _buffer_size: u32,
81) -> Result<()> {
82    let mut buf = [0; 4_096];
83
84    loop {
85        let bytes_read = channel.read(&mut buf).await?;
86        let data = &buf[..bytes_read];
87
88        let is_closed = *on_channel_closed.borrow();
89
90        {
91            let mut lock = target_channel.lock().await;
92
93            if data.len() == 0 && is_closed {
94                let _result = lock.send(LayerMessage::Close(channel_number)).await;
95
96                return Ok(());
97            }
98
99            // send channel data message
100            lock.send(LayerMessage::Data(channel_number, data.to_vec())).await?;
101        }
102    }
103}
104
105async fn forward_writes<TAsyncDuplex: AsyncRead + AsyncWrite + Send + Unpin + ?Sized + 'static>(
106    on_channel1_closed: Receiver<bool>,
107    on_channel2_closed: Receiver<bool>,
108    channel: SplitSink<Framed<Pin<Box<TAsyncDuplex>>, GenericCodec<LayerMessage>>, LayerMessage>,
109    child1: ReadHalf<Pin<Box<dyn Channel>>>,
110    child2: ReadHalf<Pin<Box<dyn Channel>>>,
111    buffer_size: u32,
112) -> Result<()> {
113    let channel = Arc::new(Mutex::new(channel));
114
115    tokio::try_join!(
116        Box::pin(read_channel(on_channel1_closed, 0, child1, Arc::clone(&channel), buffer_size)),
117        Box::pin(read_channel(on_channel2_closed, 1, child2, Arc::clone(&channel), buffer_size)),
118    )?;
119
120    return Ok(());
121}
122
123async fn forward<TAsyncDuplex: AsyncRead + AsyncWrite + Send + Unpin + ?Sized + 'static>(
124    channel: Box<TAsyncDuplex>,
125    child1: Box<dyn Channel>,
126    child2: Box<dyn Channel>,
127    buffer_size: u32,
128) {
129    let (
130        sink,
131        source,
132    ) = create_framed_stream::<LayerMessage, _>(channel).split();
133
134    let on_channel1_closed1 = child1.on_close();
135    let on_channel2_closed1 = child2.on_close();
136
137    let on_channel1_closed2 = child1.on_close();
138    let on_channel2_closed2 = child2.on_close();
139
140    let (child1_read, child1_write) = split(Pin::new(child1));
141    let (child2_read, child2_write) = split(Pin::new(child2));
142
143    tokio::join!(
144        Box::pin(async move {
145            match forward_reads(on_channel1_closed1, on_channel2_closed1, source, child1_write, child2_write).await {
146                Ok(_) => {
147                    // println!("[{}][forward][forward_reads]> succeed", label1);
148                },
149                Err(_error) => {
150                    // println!("[{}][forward][forward_reads]> failed: {}", label1, error);
151
152                    // panic!("[{}][forward][forward_reads]> failed {}", label1, error);
153                },
154            };
155        }),
156        Box::pin(async move {
157            match forward_writes(on_channel1_closed2, on_channel2_closed2, sink, child1_read, child2_read, buffer_size).await {
158                Ok(_) => {
159                    // println!("[{}][forward][forward_writes]> succeed", label2);
160                },
161                Err(_error) => {
162                    // println!("[{}][forward][forward_writes]> failed {}", label2, error);
163
164                    // panic!("[{}][forward][forward_writes]> failed {}", label2, error);
165                },
166            };
167        }),
168    );
169}
170
171fn divide_impl<TAsyncDuplex: AsyncRead + AsyncWrite + Send + Unpin + ?Sized + 'static>(
172    id: u16,
173    label: String,
174    buffer_size: u32,
175    stream: Box<TAsyncDuplex>,
176) -> (Box<dyn Channel>, Box<dyn Channel>) {
177    let (child1_sink, child1_source) = duplex(buffer_size as usize);
178    let (child2_sink, child2_source) = duplex(buffer_size as usize);
179
180    let (
181        child_channel1,
182        child_channel2,
183    ) = TransportChannel::new_pair(
184        id,
185        format!("{}-1", label),
186        (Box::new(child1_sink), Box::new(child1_source)),
187        buffer_size,
188    );
189
190    let (
191        forward_channel1,
192        forward_channel2,
193    ) = TransportChannel::new_pair(
194        id,
195        format!("{}-2", label),
196        (Box::new(child2_sink), Box::new(child2_source)),
197        buffer_size,
198    );
199
200   tokio::spawn(
201        forward(
202            stream,
203            child_channel1,
204            forward_channel1,
205            buffer_size,
206        ),
207    );
208
209    return (child_channel2, forward_channel2);
210}
211
212/// Divides a channel into two.
213pub fn divide_channel(
214    channel: Box<dyn Channel>,
215) -> (Box<dyn Channel>, Box<dyn Channel>) {
216    let id = channel.id();
217    let label = channel.label().to_string();
218    let buffer_size = channel.buffer_size();
219
220    return divide_impl(id, label, buffer_size, channel);
221}
222
223/// Divides a stream into two. Returns two channels since channels are more powerful while
224/// still implement the same traits as the streams do.
225pub fn divide_stream<TAsyncDuplex: AsyncRead + AsyncWrite + Send + Unpin + ?Sized + 'static>(
226    stream: Box<TAsyncDuplex>,
227) -> (Box<dyn Channel>, Box<dyn Channel>) {
228    let id = random_number(0..=u16::MAX);
229    let label = format!("stream-{}", random_str(4));
230
231    return divide_impl(id, label, 4_096, stream);
232}
233
234#[cfg(test)]
235mod tests {
236    use rstest::rstest;
237    use tokio::io::{AsyncReadExt, AsyncWriteExt};
238    use cs_utils::{futures::wait_random, traits::Random};
239    
240    use crate::test::{test_stream, TestOptions};
241    use crate::mocks::{channel_mock_pair, ChannelMockOptions};
242
243    use super::{divide_channel, divide_stream};
244
245    mod divide_channel {
246        use super::*;
247
248        #[rstest]
249        #[case(128)]
250        #[case(256)]
251        #[case(512)]
252        #[case(1_024)]
253        #[case(2_048)]
254        #[case(4_096)]
255        #[case(8_192)]
256        #[case(16_384)]
257        #[tokio::test]
258        async fn transfers_data(
259            #[case] test_data_size: usize,
260        ) {
261            let (local_channel, remote_channel) = channel_mock_pair(
262                ChannelMockOptions::random().with_label("local"),
263                ChannelMockOptions::random().with_label("remote"),
264            );
265
266            let (local_channel1, local_channel2) = divide_channel(local_channel);
267            let (remote_channel1, remote_channel2) = divide_channel(remote_channel);
268
269            tokio::try_join!(
270                tokio::spawn(async move {
271                    return test_stream(
272                        local_channel1,
273                        remote_channel1,
274                        TestOptions::random()
275                            .with_data_len(test_data_size),
276                    ).await;
277                }),
278                tokio::spawn(async move {
279                    return test_stream(
280                        local_channel2,
281                        remote_channel2,
282                        TestOptions::random()
283                            .with_data_len(test_data_size),
284                    ).await;
285                }),
286            ).unwrap();
287        }
288
289        #[rstest]
290        #[case(128)]
291        #[case(256)]
292        #[case(512)]
293        #[case(1_024)]
294        #[case(2_048)]
295        #[case(4_096)]
296        #[case(8_192)]
297        #[case(16_384)]
298        #[tokio::test]
299        async fn transfers_data_if_second_channel_is_not_used(
300            #[case] test_data_size: usize,
301        ) {
302            let (local_channel, remote_channel) = channel_mock_pair(
303                ChannelMockOptions::random().with_label("local"),
304                ChannelMockOptions::random().with_label("remote"),
305            );
306
307            let (local_channel1, _local_channel2) = divide_channel(local_channel);
308            let (remote_channel1, _remote_channel2) = divide_channel(remote_channel);
309
310            test_stream(
311                local_channel1,
312                remote_channel1,
313                TestOptions::random()
314                    .with_data_len(test_data_size),
315            ).await;
316        }
317
318        #[rstest]
319        #[case(128)]
320        #[case(256)]
321        #[case(512)]
322        #[case(1_024)]
323        #[case(2_048)]
324        #[case(4_096)]
325        #[case(8_192)]
326        #[case(16_384)]
327        #[tokio::test]
328        async fn transfers_data_if_first_channel_is_not_used(
329            #[case] test_data_size: usize,
330        ) {
331            let (local_channel, remote_channel) = channel_mock_pair(
332                ChannelMockOptions::random().with_label("local"),
333                ChannelMockOptions::random().with_label("remote"),
334            );
335
336            let (_local_channel1, local_channel2) = divide_channel(local_channel);
337            let (_remote_channel1, remote_channel2) = divide_channel(remote_channel);
338
339            test_stream(
340                local_channel2,
341                remote_channel2,
342                TestOptions::random()
343                    .with_data_len(test_data_size),
344            ).await;
345        }
346
347        #[rstest]
348        #[case(128)]
349        #[case(256)]
350        #[case(512)]
351        #[case(1_024)]
352        #[case(2_048)]
353        #[case(4_096)]
354        #[case(8_192)]
355        #[case(16_384)]
356        #[tokio::test]
357        async fn transfers_data_if_second_channel_dropped(
358            #[case] test_data_size: usize,
359        ) {
360            let (local_channel, remote_channel) = channel_mock_pair(
361                ChannelMockOptions::random().with_label("local"),
362                ChannelMockOptions::random().with_label("remote"),
363            );
364
365            let (local_channel1, local_channel2) = divide_channel(local_channel);
366            let (remote_channel1, remote_channel2) = divide_channel(remote_channel);
367
368            drop(local_channel2);
369            drop(remote_channel2);
370
371            test_stream(
372                local_channel1,
373                remote_channel1,
374                TestOptions::random()
375                    .with_data_len(test_data_size),
376            ).await;
377        }
378
379        #[rstest]
380        #[case(128)]
381        #[case(256)]
382        #[case(512)]
383        #[case(1_024)]
384        #[case(2_048)]
385        #[case(4_096)]
386        #[case(8_192)]
387        #[case(16_384)]
388        #[tokio::test]
389        async fn transfers_data_if_first_channel_dropped(
390            #[case] test_data_size: usize,
391        ) {
392            let (local_channel, remote_channel) = channel_mock_pair(
393                ChannelMockOptions::random().with_label("local"),
394                ChannelMockOptions::random().with_label("remote"),
395            );
396
397            let (local_channel1, local_channel2) = divide_channel(local_channel);
398            let (remote_channel1, remote_channel2) = divide_channel(remote_channel);
399
400            drop(local_channel1);
401            drop(remote_channel1);
402
403            test_stream(
404                local_channel2,
405                remote_channel2,
406                TestOptions::random()
407                    .with_data_len(test_data_size),
408            ).await;
409        }
410
411        #[rstest]
412        #[case(128)]
413        #[case(256)]
414        #[case(512)]
415        #[case(1_024)]
416        #[case(2_048)]
417        #[case(4_096)]
418        #[case(8_192)]
419        #[case(16_384)]
420        #[tokio::test]
421        async fn transfers_data_if_second_channel_shut_down(
422            #[case] test_data_size: usize,
423        ) {
424            let (local_channel, remote_channel) = channel_mock_pair(
425                ChannelMockOptions::random().with_label("local"),
426                ChannelMockOptions::random().with_label("remote"),
427            );
428
429            let (local_channel1, mut local_channel2) = divide_channel(local_channel);
430            let (remote_channel1, mut remote_channel2) = divide_channel(remote_channel);
431
432            local_channel2.shutdown().await.unwrap();
433            remote_channel2.shutdown().await.unwrap();
434
435            test_stream(
436                local_channel1,
437                remote_channel1,
438                TestOptions::random()
439                    .with_data_len(test_data_size),
440            ).await;
441        }
442
443        #[rstest]
444        #[case(512)]
445        #[case(1_024)]
446        #[case(2_048)]
447        #[case(4_096)]
448        #[case(8_192)]
449        #[case(16_384)]
450        #[case(32_768)]
451        #[tokio::test]
452        async fn transfers_data_if_second_channel_shut_down_during_transfer(
453            #[case] test_data_size: usize,
454        ) {
455            let (local_channel, remote_channel) = channel_mock_pair(
456                ChannelMockOptions::random().with_label("local"),
457                ChannelMockOptions::random().with_label("remote"),
458            );
459
460            let (local_channel1, local_channel2) = divide_channel(local_channel);
461            let (remote_channel1, remote_channel2) = divide_channel(remote_channel);
462
463            tokio::join!(
464                Box::pin(async move {
465                    test_stream(
466                        local_channel1,
467                        remote_channel1,
468                        TestOptions::random()
469                            .with_data_len(test_data_size),
470                    ).await;
471                }),
472                Box::pin(async move {
473                    // transfer a bit of data
474                    let (mut local_channel2, mut remote_channel2) = test_stream(
475                        local_channel2,
476                        remote_channel2,
477                        TestOptions::random()
478                            .with_data_len(128),
479                    ).await;
480
481                    // shutdown the second channel
482                    tokio::join!(
483                        Box::pin(async move {
484                            wait_random(1..5).await;
485                            local_channel2.shutdown().await.unwrap();
486                        }),
487                        Box::pin(async move {
488                            wait_random(1..5).await;
489                            remote_channel2.shutdown().await.unwrap();
490                        }),
491                    );
492                }),
493            );
494        }
495
496        #[rstest]
497        #[case(128)]
498        #[case(256)]
499        #[case(512)]
500        #[case(1_024)]
501        #[case(2_048)]
502        #[case(4_096)]
503        #[case(8_192)]
504        #[case(16_384)]
505        #[case(32_768)]
506        #[tokio::test]
507        async fn transfers_data_if_first_channel_shut_down(
508            #[case] test_data_size: usize,
509        ) {
510            let (local_channel, remote_channel) = channel_mock_pair(
511                ChannelMockOptions::random().with_label("local"),
512                ChannelMockOptions::random().with_label("remote"),
513            );
514
515            let (mut local_channel1, local_channel2) = divide_channel(local_channel);
516            let (mut remote_channel1, remote_channel2) = divide_channel(remote_channel);
517
518            local_channel1.shutdown().await.unwrap();
519            remote_channel1.shutdown().await.unwrap();
520
521            test_stream(
522                local_channel2,
523                remote_channel2,
524                TestOptions::random()
525                    .with_data_len(test_data_size),
526            ).await;
527        }
528
529        #[rstest]
530        #[case(512)]
531        #[case(1_024)]
532        #[case(2_048)]
533        #[case(4_096)]
534        #[case(8_192)]
535        #[case(16_384)]
536        #[case(32_768)]
537        #[tokio::test]
538        async fn transfers_data_if_first_channel_shut_down_during_transfer(
539            #[case] test_data_size: usize,
540        ) {
541            let (local_channel, remote_channel) = channel_mock_pair(
542                ChannelMockOptions::random().with_label("local"),
543                ChannelMockOptions::random().with_label("remote"),
544            );
545
546            let (local_channel1, local_channel2) = divide_channel(local_channel);
547            let (remote_channel1,remote_channel2) = divide_channel(remote_channel);
548
549            tokio::join!(
550                Box::pin(async move {
551                    // transfer a bit of data
552                    let (mut local_channel1, mut remote_channel1) = test_stream(
553                        local_channel1,
554                        remote_channel1,
555                        TestOptions::random()
556                            .with_data_len(128),
557                    ).await;
558
559                    // shutdown the first channel
560                    tokio::join!(
561                        Box::pin(async move {
562                            wait_random(1..5).await;
563                            local_channel1.shutdown().await.unwrap();
564                        }),
565                        Box::pin(async move {
566                            wait_random(1..5).await;
567                            remote_channel1.shutdown().await.unwrap();
568                        }),
569                    );
570                }),
571                Box::pin(async move {
572                    test_stream(
573                        local_channel2,
574                        remote_channel2,
575                        TestOptions::random()
576                            .with_data_len(test_data_size),
577                    ).await;
578                }),
579            );
580        }
581
582        mod shutdown {
583            use super::*;
584
585            #[rstest]
586            #[case(128)]
587            #[case(256)]
588            #[case(512)]
589            #[case(1_024)]
590            #[case(2_048)]
591            #[case(4_096)]
592            #[case(8_192)]
593            #[case(16_384)]
594            #[tokio::test]
595            async fn first_channel_closes_channel_if_remote_counterpart_is_closed(
596                #[case] test_data_size: usize,
597            ) {
598                let (local_channel, remote_channel) = channel_mock_pair(
599                    ChannelMockOptions::random().with_label("local"),
600                    ChannelMockOptions::random().with_label("remote"),
601                );
602
603                let (local_channel1, local_channel2) = divide_channel(local_channel);
604                let (remote_channel1, remote_channel2) = divide_channel(remote_channel);
605
606                let (
607                    mut local_channel1,
608                    mut remote_channel1,
609                ) = test_stream(
610                    local_channel1,
611                    remote_channel1,
612                    TestOptions::random()
613                        .with_data_len(test_data_size),
614                ).await;
615
616                tokio::join!(
617                    // test that the second channel is not affected by shutting down of the first one
618                    Box::pin(async move {
619                        wait_random(1..=10).await;
620
621                        return test_stream(
622                            local_channel2,
623                            remote_channel2,
624                            TestOptions::random()
625                                .with_data_len(2 * test_data_size),
626                        ).await
627                    }),
628                    // transfer data in the first channel and then shut it down
629                    // after shutting down, test that the remote counterpart receives `EOF` message
630                    Box::pin(async move {
631                        tokio::join!(
632                            Box::pin(async move {
633                                wait_random(5..=25).await;
634                
635                                local_channel1.shutdown().await.unwrap();
636
637                                assert!(
638                                    local_channel1.write("test".as_bytes()).await.is_err(),
639                                    "Must fail to write to closed local channel.",
640                                );
641
642                                assert!(
643                                    local_channel1.is_closed(),
644                                    "Channel must be closed.",
645                                );
646                            }),
647                            Box::pin(async move {
648                                let mut buf = [0; 4096];
649                
650                                let bytes_read = remote_channel1.read(&mut buf)
651                                    .await.unwrap();
652                
653                                assert_eq!(
654                                    bytes_read,
655                                    0,
656                                    "Must be 0 bytes read if remote channel is closed.",
657                                );
658
659                                assert!(
660                                    remote_channel1.is_closed(),
661                                    "Channel must be closed.",
662                                );
663                            }),
664                        );
665                    }),
666                );
667            }
668
669            #[rstest]
670            #[case(128)]
671            #[case(256)]
672            #[case(512)]
673            #[case(1_024)]
674            #[case(2_048)]
675            #[case(4_096)]
676            #[case(8_192)]
677            #[case(16_384)]
678            #[tokio::test]
679            async fn second_channel_closes_channel_if_remote_counterpart_is_closed(
680                #[case] test_data_size: usize,
681            ) {
682                let (local_channel, remote_channel) = channel_mock_pair(
683                    ChannelMockOptions::random().with_label("local"),
684                    ChannelMockOptions::random().with_label("remote"),
685                );
686
687                let (local_channel1, local_channel2) = divide_channel(local_channel);
688                let (remote_channel1, remote_channel2) = divide_channel(remote_channel);
689
690                let (
691                    mut local_channel2,
692                    mut remote_channel2,
693                ) = test_stream(
694                    local_channel2,
695                    remote_channel2,
696                    TestOptions::random()
697                        .with_data_len(test_data_size),
698                ).await;
699
700                tokio::join!(
701                    // test that the first channel is not affected by shutting down of the first one
702                    Box::pin(async move {
703                        wait_random(1..=10).await;
704
705                        return test_stream(
706                            local_channel1,
707                            remote_channel1,
708                            TestOptions::random()
709                                .with_data_len(2 * test_data_size),
710                        ).await
711                    }),
712                    // transfer data in the second channel and then shut it down
713                    // after shutting down, test that the remote counterpart receives `EOF` message
714                    Box::pin(async move {
715                        tokio::join!(
716                            Box::pin(async move {
717                                wait_random(5..=25).await;
718                
719                                local_channel2.shutdown().await.unwrap();
720
721                                assert!(
722                                    local_channel2.write("test".as_bytes()).await.is_err(),
723                                    "Must fail to write to closed local channel.",
724                                );
725
726                                assert!(
727                                    local_channel2.is_closed(),
728                                    "Channel must be closed.",
729                                );
730                            }),
731                            Box::pin(async move {
732                                let mut buf = [0; 4096];
733                
734                                let bytes_read = remote_channel2.read(&mut buf)
735                                    .await.unwrap();
736                
737                                assert_eq!(
738                                    bytes_read,
739                                    0,
740                                    "Must be 0 bytes read if remote channel is closed.",
741                                );
742
743                                assert!(
744                                    remote_channel2.is_closed(),
745                                    "Channel must be closed.",
746                                );
747                            }),
748                        );
749                    }),
750                );
751            }
752
753            #[rstest]
754            #[case(128)]
755            #[case(256)]
756            #[case(512)]
757            #[case(1_024)]
758            #[case(2_048)]
759            #[case(4_096)]
760            #[case(8_192)]
761            #[case(16_384)]
762            #[tokio::test]
763            async fn both_channels_close_channel_if_remote_counterpart_is_closed(
764                #[case] test_data_size: usize,
765            ) {
766                let (local_channel, remote_channel) = channel_mock_pair(
767                    ChannelMockOptions::random().with_label("local"),
768                    ChannelMockOptions::random().with_label("remote"),
769                );
770
771                let (local_channel1, local_channel2) = divide_channel(local_channel);
772                let (remote_channel1, remote_channel2) = divide_channel(remote_channel);
773
774                tokio::join!(
775                    // transfer data in the first channel and then shut it down
776                    // after shutting down, test that the remote counterpart receives `EOF` message
777                    Box::pin(async move {
778                        wait_random(1..=25).await;
779
780                        let (
781                            mut local_channel1,
782                            mut remote_channel1,
783                        ) = test_stream(
784                            local_channel1,
785                            remote_channel1,
786                            TestOptions::random()
787                                .with_data_len(test_data_size),
788                        ).await;
789
790                        tokio::join!(
791                            Box::pin(async move {
792                                wait_random(1..=25).await;
793                
794                                local_channel1.shutdown().await.unwrap();
795
796                                assert!(
797                                    local_channel1.write(cs_utils::random_str_rg(16..64).as_bytes()).await.is_err(),
798                                    "Must fail to write to closed local channel.",
799                                );
800
801                                assert!(
802                                    local_channel1.write("test".as_bytes()).await.is_err(),
803                                    "Must fail to write to closed local channel.",
804                                );
805
806                                assert!(
807                                    local_channel1.is_closed(),
808                                    "Channel must be closed.",
809                                );
810                            }),
811                            Box::pin(async move {
812                                let mut buf = [0; 4096];
813                
814                                let bytes_read = remote_channel1.read(&mut buf)
815                                    .await.unwrap();
816                
817                                assert_eq!(
818                                    bytes_read,
819                                    0,
820                                    "Must be 0 bytes read if remote channel is closed.",
821                                );
822
823                                assert!(
824                                    remote_channel1.is_closed(),
825                                    "Channel must be closed.",
826                                );
827                            }),
828                        );
829                    }),
830                    // transfer data in the second channel and then shut it down
831                    // after shutting down, test that the remote counterpart receives `EOF` message
832                    Box::pin(async move {
833                        wait_random(1..=25).await;
834
835                        let (
836                            mut local_channel2,
837                            mut remote_channel2,
838                        ) = test_stream(
839                            local_channel2,
840                            remote_channel2,
841                            TestOptions::random()
842                                .with_data_len(test_data_size),
843                        ).await;
844
845                        tokio::join!(
846                            Box::pin(async move {
847                                wait_random(1..=25).await;
848                
849                                local_channel2.shutdown().await.unwrap();
850
851                                assert!(
852                                    local_channel2.write("test".as_bytes()).await.is_err(),
853                                    "Must fail to write to closed local channel.",
854                                );
855
856                                assert!(
857                                    local_channel2.is_closed(),
858                                    "Channel must be closed.",
859                                );
860                            }),
861                            Box::pin(async move {
862                                let mut buf = [0; 4096];
863                
864                                let bytes_read = remote_channel2.read(&mut buf)
865                                    .await.unwrap();
866                
867                                assert_eq!(
868                                    bytes_read,
869                                    0,
870                                    "Must be 0 bytes read if remote channel is closed.",
871                                );
872
873                                assert!(
874                                    remote_channel2.is_closed(),
875                                    "Channel must be closed.",
876                                );
877                            }),
878                        );
879                    }),
880                );
881            }
882        }
883    }
884
885    mod divide_stream {
886        use super::*;
887
888        #[rstest]
889        #[case(128)]
890        #[case(256)]
891        #[case(512)]
892        #[case(1_024)]
893        #[case(2_048)]
894        #[case(4_096)]
895        #[case(8_192)]
896        #[case(16_384)]
897        #[tokio::test]
898        async fn transfers_data(
899            #[case] test_data_size: usize,
900        ) {
901            let (local_channel, remote_channel) = channel_mock_pair(
902                ChannelMockOptions::random().with_label("local"),
903                ChannelMockOptions::random().with_label("remote"),
904            );
905
906            let (local_channel1, local_channel2) = divide_stream(local_channel);
907            let (remote_channel1, remote_channel2) = divide_stream(remote_channel);
908
909            tokio::try_join!(
910                tokio::spawn(async move {
911                    return test_stream(
912                        local_channel1,
913                        remote_channel1,
914                        TestOptions::random()
915                            .with_data_len(test_data_size),
916                    ).await;
917                }),
918                tokio::spawn(async move {
919                    return test_stream(
920                        local_channel2,
921                        remote_channel2,
922                        TestOptions::random()
923                            .with_data_len(test_data_size),
924                    ).await;
925                }),
926            ).unwrap();
927        }
928
929        #[rstest]
930        #[case(128)]
931        #[case(256)]
932        #[case(512)]
933        #[case(1_024)]
934        #[case(2_048)]
935        #[case(4_096)]
936        #[case(8_192)]
937        #[case(16_384)]
938        #[tokio::test]
939        async fn transfers_data_if_second_channel_is_not_used(
940            #[case] test_data_size: usize,
941        ) {
942            let (local_channel, remote_channel) = channel_mock_pair(
943                ChannelMockOptions::random().with_label("local"),
944                ChannelMockOptions::random().with_label("remote"),
945            );
946
947            let (local_channel1, _local_channel2) = divide_stream(local_channel);
948            let (remote_channel1, _remote_channel2) = divide_stream(remote_channel);
949
950            test_stream(
951                local_channel1,
952                remote_channel1,
953                TestOptions::random()
954                    .with_data_len(test_data_size),
955            ).await;
956        }
957
958        #[rstest]
959        #[case(128)]
960        #[case(256)]
961        #[case(512)]
962        #[case(1_024)]
963        #[case(2_048)]
964        #[case(4_096)]
965        #[case(8_192)]
966        #[case(16_384)]
967        #[tokio::test]
968        async fn transfers_data_if_first_channel_is_not_used(
969            #[case] test_data_size: usize,
970        ) {
971            let (local_channel, remote_channel) = channel_mock_pair(
972                ChannelMockOptions::random().with_label("local"),
973                ChannelMockOptions::random().with_label("remote"),
974            );
975
976            let (_local_channel1, local_channel2) = divide_stream(local_channel);
977            let (_remote_channel1, remote_channel2) = divide_stream(remote_channel);
978
979            test_stream(
980                local_channel2,
981                remote_channel2,
982                TestOptions::random()
983                    .with_data_len(test_data_size),
984            ).await;
985        }
986
987        #[rstest]
988        #[case(128)]
989        #[case(256)]
990        #[case(512)]
991        #[case(1_024)]
992        #[case(2_048)]
993        #[case(4_096)]
994        #[case(8_192)]
995        #[case(16_384)]
996        #[tokio::test]
997        async fn transfers_data_if_second_channel_dropped(
998            #[case] test_data_size: usize,
999        ) {
1000            let (local_channel, remote_channel) = channel_mock_pair(
1001                ChannelMockOptions::random().with_label("local"),
1002                ChannelMockOptions::random().with_label("remote"),
1003            );
1004
1005            let (local_channel1, local_channel2) = divide_stream(local_channel);
1006            let (remote_channel1, remote_channel2) = divide_stream(remote_channel);
1007
1008            drop(local_channel2);
1009            drop(remote_channel2);
1010
1011            test_stream(
1012                local_channel1,
1013                remote_channel1,
1014                TestOptions::random()
1015                    .with_data_len(test_data_size),
1016            ).await;
1017        }
1018
1019        #[rstest]
1020        #[case(128)]
1021        #[case(256)]
1022        #[case(512)]
1023        #[case(1_024)]
1024        #[case(2_048)]
1025        #[case(4_096)]
1026        #[case(8_192)]
1027        #[case(16_384)]
1028        #[tokio::test]
1029        async fn transfers_data_if_first_channel_dropped(
1030            #[case] test_data_size: usize,
1031        ) {
1032            let (local_channel, remote_channel) = channel_mock_pair(
1033                ChannelMockOptions::random().with_label("local"),
1034                ChannelMockOptions::random().with_label("remote"),
1035            );
1036
1037            let (local_channel1, local_channel2) = divide_stream(local_channel);
1038            let (remote_channel1, remote_channel2) = divide_stream(remote_channel);
1039
1040            drop(local_channel1);
1041            drop(remote_channel1);
1042
1043            test_stream(
1044                local_channel2,
1045                remote_channel2,
1046                TestOptions::random()
1047                    .with_data_len(test_data_size),
1048            ).await;
1049        }
1050
1051        #[rstest]
1052        #[case(128)]
1053        #[case(256)]
1054        #[case(512)]
1055        #[case(1_024)]
1056        #[case(2_048)]
1057        #[case(4_096)]
1058        #[case(8_192)]
1059        #[case(16_384)]
1060        #[tokio::test]
1061        async fn transfers_data_if_second_channel_shut_down(
1062            #[case] test_data_size: usize,
1063        ) {
1064            let (local_channel, remote_channel) = channel_mock_pair(
1065                ChannelMockOptions::random().with_label("local"),
1066                ChannelMockOptions::random().with_label("remote"),
1067            );
1068
1069            let (local_channel1, mut local_channel2) = divide_stream(local_channel);
1070            let (remote_channel1, mut remote_channel2) = divide_stream(remote_channel);
1071
1072            local_channel2.shutdown().await.unwrap();
1073            remote_channel2.shutdown().await.unwrap();
1074
1075            test_stream(
1076                local_channel1,
1077                remote_channel1,
1078                TestOptions::random()
1079                    .with_data_len(test_data_size),
1080            ).await;
1081        }
1082
1083        #[rstest]
1084        #[case(512)]
1085        #[case(1_024)]
1086        #[case(2_048)]
1087        #[case(4_096)]
1088        #[case(8_192)]
1089        #[case(16_384)]
1090        #[case(32_768)]
1091        #[tokio::test]
1092        async fn transfers_data_if_second_channel_shut_down_during_transfer(
1093            #[case] test_data_size: usize,
1094        ) {
1095            let (local_channel, remote_channel) = channel_mock_pair(
1096                ChannelMockOptions::random().with_label("local"),
1097                ChannelMockOptions::random().with_label("remote"),
1098            );
1099
1100            let (local_channel1, local_channel2) = divide_stream(local_channel);
1101            let (remote_channel1, remote_channel2) = divide_stream(remote_channel);
1102
1103            tokio::join!(
1104                Box::pin(async move {
1105                    test_stream(
1106                        local_channel1,
1107                        remote_channel1,
1108                        TestOptions::random()
1109                            .with_data_len(test_data_size),
1110                    ).await;
1111                }),
1112                Box::pin(async move {
1113                    // transfer a bit of data
1114                    let (mut local_channel2, mut remote_channel2) = test_stream(
1115                        local_channel2,
1116                        remote_channel2,
1117                        TestOptions::random()
1118                            .with_data_len(128),
1119                    ).await;
1120
1121                    // shutdown the second channel
1122                    tokio::join!(
1123                        Box::pin(async move {
1124                            wait_random(1..5).await;
1125                            local_channel2.shutdown().await.unwrap();
1126                        }),
1127                        Box::pin(async move {
1128                            wait_random(1..5).await;
1129                            remote_channel2.shutdown().await.unwrap();
1130                        }),
1131                    );
1132                }),
1133            );
1134        }
1135
1136        #[rstest]
1137        #[case(128)]
1138        #[case(256)]
1139        #[case(512)]
1140        #[case(1_024)]
1141        #[case(2_048)]
1142        #[case(4_096)]
1143        #[case(8_192)]
1144        #[case(16_384)]
1145        #[case(32_768)]
1146        #[tokio::test]
1147        async fn transfers_data_if_first_channel_shut_down(
1148            #[case] test_data_size: usize,
1149        ) {
1150            let (local_channel, remote_channel) = channel_mock_pair(
1151                ChannelMockOptions::random().with_label("local"),
1152                ChannelMockOptions::random().with_label("remote"),
1153            );
1154
1155            let (mut local_channel1, local_channel2) = divide_stream(local_channel);
1156            let (mut remote_channel1, remote_channel2) = divide_stream(remote_channel);
1157
1158            local_channel1.shutdown().await.unwrap();
1159            remote_channel1.shutdown().await.unwrap();
1160
1161            test_stream(
1162                local_channel2,
1163                remote_channel2,
1164                TestOptions::random()
1165                    .with_data_len(test_data_size),
1166            ).await;
1167        }
1168
1169        #[rstest]
1170        #[case(512)]
1171        #[case(1_024)]
1172        #[case(2_048)]
1173        #[case(4_096)]
1174        #[case(8_192)]
1175        #[case(16_384)]
1176        #[case(32_768)]
1177        #[tokio::test]
1178        async fn transfers_data_if_first_channel_shut_down_during_transfer(
1179            #[case] test_data_size: usize,
1180        ) {
1181            let (local_channel, remote_channel) = channel_mock_pair(
1182                ChannelMockOptions::random().with_label("local"),
1183                ChannelMockOptions::random().with_label("remote"),
1184            );
1185
1186            let (local_channel1, local_channel2) = divide_stream(local_channel);
1187            let (remote_channel1,remote_channel2) = divide_stream(remote_channel);
1188
1189            tokio::join!(
1190                Box::pin(async move {
1191                    // transfer a bit of data
1192                    let (mut local_channel1, mut remote_channel1) = test_stream(
1193                        local_channel1,
1194                        remote_channel1,
1195                        TestOptions::random()
1196                            .with_data_len(128),
1197                    ).await;
1198
1199                    // shutdown the first channel
1200                    tokio::join!(
1201                        Box::pin(async move {
1202                            wait_random(1..5).await;
1203                            local_channel1.shutdown().await.unwrap();
1204                        }),
1205                        Box::pin(async move {
1206                            wait_random(1..5).await;
1207                            remote_channel1.shutdown().await.unwrap();
1208                        }),
1209                    );
1210                }),
1211                Box::pin(async move {
1212                    test_stream(
1213                        local_channel2,
1214                        remote_channel2,
1215                        TestOptions::random()
1216                            .with_data_len(test_data_size),
1217                    ).await;
1218                }),
1219            );
1220        }
1221
1222        mod shutdown {
1223            use super::*;
1224
1225            #[rstest]
1226            #[case(128)]
1227            #[case(256)]
1228            #[case(512)]
1229            #[case(1_024)]
1230            #[case(2_048)]
1231            #[case(4_096)]
1232            #[case(8_192)]
1233            #[case(16_384)]
1234            #[tokio::test]
1235            async fn first_channel_closes_channel_if_remote_counterpart_is_closed(
1236                #[case] test_data_size: usize,
1237            ) {
1238                let (local_channel, remote_channel) = channel_mock_pair(
1239                    ChannelMockOptions::random().with_label("local"),
1240                    ChannelMockOptions::random().with_label("remote"),
1241                );
1242
1243                let (local_channel1, local_channel2) = divide_stream(local_channel);
1244                let (remote_channel1, remote_channel2) = divide_stream(remote_channel);
1245
1246                let (
1247                    mut local_channel1,
1248                    mut remote_channel1,
1249                ) = test_stream(
1250                    local_channel1,
1251                    remote_channel1,
1252                    TestOptions::random()
1253                        .with_data_len(test_data_size),
1254                ).await;
1255
1256                tokio::join!(
1257                    // test that the second channel is not affected by shutting down of the first one
1258                    Box::pin(async move {
1259                        wait_random(1..=10).await;
1260
1261                        return test_stream(
1262                            local_channel2,
1263                            remote_channel2,
1264                            TestOptions::random()
1265                                .with_data_len(2 * test_data_size),
1266                        ).await
1267                    }),
1268                    // transfer data in the first channel and then shut it down
1269                    // after shutting down, test that the remote counterpart receives `EOF` message
1270                    Box::pin(async move {
1271                        tokio::join!(
1272                            Box::pin(async move {
1273                                wait_random(5..=25).await;
1274                
1275                                local_channel1.shutdown().await.unwrap();
1276
1277                                assert!(
1278                                    local_channel1.write("test".as_bytes()).await.is_err(),
1279                                    "Must fail to write to closed local channel.",
1280                                );
1281
1282                                assert!(
1283                                    local_channel1.is_closed(),
1284                                    "Channel must be closed.",
1285                                );
1286                            }),
1287                            Box::pin(async move {
1288                                let mut buf = [0; 4096];
1289                
1290                                let bytes_read = remote_channel1.read(&mut buf)
1291                                    .await.unwrap();
1292                
1293                                assert_eq!(
1294                                    bytes_read,
1295                                    0,
1296                                    "Must be 0 bytes read if remote channel is closed.",
1297                                );
1298
1299                                assert!(
1300                                    remote_channel1.is_closed(),
1301                                    "Channel must be closed.",
1302                                );
1303                            }),
1304                        );
1305                    }),
1306                );
1307            }
1308
1309            #[rstest]
1310            #[case(128)]
1311            #[case(256)]
1312            #[case(512)]
1313            #[case(1_024)]
1314            #[case(2_048)]
1315            #[case(4_096)]
1316            #[case(8_192)]
1317            #[case(16_384)]
1318            #[tokio::test]
1319            async fn second_channel_closes_channel_if_remote_counterpart_is_closed(
1320                #[case] test_data_size: usize,
1321            ) {
1322                let (local_channel, remote_channel) = channel_mock_pair(
1323                    ChannelMockOptions::random().with_label("local"),
1324                    ChannelMockOptions::random().with_label("remote"),
1325                );
1326
1327                let (local_channel1, local_channel2) = divide_stream(local_channel);
1328                let (remote_channel1, remote_channel2) = divide_stream(remote_channel);
1329
1330                let (
1331                    mut local_channel2,
1332                    mut remote_channel2,
1333                ) = test_stream(
1334                    local_channel2,
1335                    remote_channel2,
1336                    TestOptions::random()
1337                        .with_data_len(test_data_size),
1338                ).await;
1339
1340                tokio::join!(
1341                    // test that the first channel is not affected by shutting down of the first one
1342                    Box::pin(async move {
1343                        wait_random(1..=10).await;
1344
1345                        return test_stream(
1346                            local_channel1,
1347                            remote_channel1,
1348                            TestOptions::random()
1349                                .with_data_len(2 * test_data_size),
1350                        ).await
1351                    }),
1352                    // transfer data in the second channel and then shut it down
1353                    // after shutting down, test that the remote counterpart receives `EOF` message
1354                    Box::pin(async move {
1355                        tokio::join!(
1356                            Box::pin(async move {
1357                                wait_random(5..=25).await;
1358                
1359                                local_channel2.shutdown().await.unwrap();
1360
1361                                assert!(
1362                                    local_channel2.write("test".as_bytes()).await.is_err(),
1363                                    "Must fail to write to closed local channel.",
1364                                );
1365
1366                                assert!(
1367                                    local_channel2.is_closed(),
1368                                    "Channel must be closed.",
1369                                );
1370                            }),
1371                            Box::pin(async move {
1372                                let mut buf = [0; 4096];
1373                
1374                                let bytes_read = remote_channel2.read(&mut buf)
1375                                    .await.unwrap();
1376                
1377                                assert_eq!(
1378                                    bytes_read,
1379                                    0,
1380                                    "Must be 0 bytes read if remote channel is closed.",
1381                                );
1382
1383                                assert!(
1384                                    remote_channel2.is_closed(),
1385                                    "Channel must be closed.",
1386                                );
1387                            }),
1388                        );
1389                    }),
1390                );
1391            }
1392
1393            #[rstest]
1394            #[case(128)]
1395            #[case(256)]
1396            #[case(512)]
1397            #[case(1_024)]
1398            #[case(2_048)]
1399            #[case(4_096)]
1400            #[case(8_192)]
1401            #[case(16_384)]
1402            #[tokio::test]
1403            async fn both_channels_close_channel_if_remote_counterpart_is_closed(
1404                #[case] test_data_size: usize,
1405            ) {
1406                let (local_channel, remote_channel) = channel_mock_pair(
1407                    ChannelMockOptions::random().with_label("local"),
1408                    ChannelMockOptions::random().with_label("remote"),
1409                );
1410
1411                let (local_channel1, local_channel2) = divide_stream(local_channel);
1412                let (remote_channel1, remote_channel2) = divide_stream(remote_channel);
1413
1414                tokio::join!(
1415                    // transfer data in the first channel and then shut it down
1416                    // after shutting down, test that the remote counterpart receives `EOF` message
1417                    Box::pin(async move {
1418                        wait_random(1..=25).await;
1419
1420                        let (
1421                            mut local_channel1,
1422                            mut remote_channel1,
1423                        ) = test_stream(
1424                            local_channel1,
1425                            remote_channel1,
1426                            TestOptions::random()
1427                                .with_data_len(test_data_size),
1428                        ).await;
1429
1430                        tokio::join!(
1431                            Box::pin(async move {
1432                                wait_random(1..=25).await;
1433                
1434                                local_channel1.shutdown().await.unwrap();
1435
1436                                assert!(
1437                                    local_channel1.write(cs_utils::random_str_rg(16..64).as_bytes()).await.is_err(),
1438                                    "Must fail to write to closed local channel.",
1439                                );
1440
1441                                assert!(
1442                                    local_channel1.write("test".as_bytes()).await.is_err(),
1443                                    "Must fail to write to closed local channel.",
1444                                );
1445
1446                                assert!(
1447                                    local_channel1.is_closed(),
1448                                    "Channel must be closed.",
1449                                );
1450                            }),
1451                            Box::pin(async move {
1452                                let mut buf = [0; 4096];
1453                
1454                                let bytes_read = remote_channel1.read(&mut buf)
1455                                    .await.unwrap();
1456                
1457                                assert_eq!(
1458                                    bytes_read,
1459                                    0,
1460                                    "Must be 0 bytes read if remote channel is closed.",
1461                                );
1462
1463                                assert!(
1464                                    remote_channel1.is_closed(),
1465                                    "Channel must be closed.",
1466                                );
1467                            }),
1468                        );
1469                    }),
1470                    // transfer data in the second channel and then shut it down
1471                    // after shutting down, test that the remote counterpart receives `EOF` message
1472                    Box::pin(async move {
1473                        wait_random(1..=25).await;
1474
1475                        let (
1476                            mut local_channel2,
1477                            mut remote_channel2,
1478                        ) = test_stream(
1479                            local_channel2,
1480                            remote_channel2,
1481                            TestOptions::random()
1482                                .with_data_len(test_data_size),
1483                        ).await;
1484
1485                        tokio::join!(
1486                            Box::pin(async move {
1487                                wait_random(1..=25).await;
1488                
1489                                local_channel2.shutdown().await.unwrap();
1490
1491                                assert!(
1492                                    local_channel2.write("test".as_bytes()).await.is_err(),
1493                                    "Must fail to write to closed local channel.",
1494                                );
1495
1496                                assert!(
1497                                    local_channel2.is_closed(),
1498                                    "Channel must be closed.",
1499                                );
1500                            }),
1501                            Box::pin(async move {
1502                                let mut buf = [0; 4096];
1503                
1504                                let bytes_read = remote_channel2.read(&mut buf)
1505                                    .await.unwrap();
1506                
1507                                assert_eq!(
1508                                    bytes_read,
1509                                    0,
1510                                    "Must be 0 bytes read if remote channel is closed.",
1511                                );
1512
1513                                assert!(
1514                                    remote_channel2.is_closed(),
1515                                    "Channel must be closed.",
1516                                );
1517                            }),
1518                        );
1519                    }),
1520                );
1521            }
1522        }
1523    }
1524}