1use std::{pin::Pin, sync::Arc};
2
3use anyhow::Result;
4use tokio_util::codec::Framed;
5use serde::{Serialize, Deserialize};
6use cs_utils::{random_str, random_number};
7use futures::{StreamExt, stream::{SplitStream, SplitSink}, SinkExt};
8use tokio::{io::{duplex, WriteHalf, ReadHalf, AsyncReadExt, AsyncWriteExt, split, AsyncRead, AsyncWrite}, sync::{Mutex, watch::Receiver}};
9
10use crate::{codecs::GenericCodec, Channel, TransportChannel, create_framed_stream};
11
12#[derive(Serialize, Deserialize, Debug)]
13pub enum LayerMessage {
14 Data(u8, Vec<u8>),
15 Close(u8),
16}
17
18async fn forward_reads<TAsyncDuplex: AsyncRead + AsyncWrite + Send + Unpin + ?Sized + 'static>(
19 on_channel1_closed: Receiver<bool>,
20 on_channel2_closed: Receiver<bool>,
21 mut channel: SplitStream<Framed<Pin<Box<TAsyncDuplex>>, GenericCodec<LayerMessage>>>,
22 mut child1: WriteHalf<Pin<Box<dyn Channel>>>,
23 mut child2: WriteHalf<Pin<Box<dyn Channel>>>,
24)-> Result<()> {
25 let mut child1 = Pin::new(&mut child1);
26 let mut child2 = Pin::new(&mut child2);
27
28 loop {
29 let message = match channel.next().await {
30 Some(item) => item?,
31 None => {
32 return Ok(());
35 },
36 };
37
38 match message {
39 LayerMessage::Data(id, data) => {
40 let child_channel = match id {
41 0 => &mut child1,
42 1 => &mut child2,
43 _ => panic!("Unknown channel number: {}.", id),
44 };
45
46 let on_channel_closed = match id {
47 0 => &on_channel1_closed,
48 1 => &on_channel2_closed,
49 _ => panic!("Unknown channel number: {}.", id),
50 };
51
52 if data.len() == 0 && *on_channel_closed.borrow() {
53 continue;
54 }
55
56 let result = child_channel.write_all(&data[..]).await;
57
58 if result.is_err() {
59 }
61 },
62 LayerMessage::Close(id) => {
63 let child_channel = match id {
64 0 => &mut child1,
65 1 => &mut child2,
66 _ => panic!("Unknown channel number: {}.", id),
67 };
68
69 child_channel.shutdown().await?;
70 },
71 };
72 }
73}
74
75async fn read_channel<TAsyncDuplex: AsyncRead + AsyncWrite + Send + Unpin + ?Sized + 'static>(
76 on_channel_closed: Receiver<bool>,
77 channel_number: u8,
78 mut channel: ReadHalf<Pin<Box<dyn Channel>>>,
79 target_channel: Arc<Mutex<SplitSink<Framed<Pin<Box<TAsyncDuplex>>, GenericCodec<LayerMessage>>, LayerMessage>>>,
80 _buffer_size: u32,
81) -> Result<()> {
82 let mut buf = [0; 4_096];
83
84 loop {
85 let bytes_read = channel.read(&mut buf).await?;
86 let data = &buf[..bytes_read];
87
88 let is_closed = *on_channel_closed.borrow();
89
90 {
91 let mut lock = target_channel.lock().await;
92
93 if data.len() == 0 && is_closed {
94 let _result = lock.send(LayerMessage::Close(channel_number)).await;
95
96 return Ok(());
97 }
98
99 lock.send(LayerMessage::Data(channel_number, data.to_vec())).await?;
101 }
102 }
103}
104
105async fn forward_writes<TAsyncDuplex: AsyncRead + AsyncWrite + Send + Unpin + ?Sized + 'static>(
106 on_channel1_closed: Receiver<bool>,
107 on_channel2_closed: Receiver<bool>,
108 channel: SplitSink<Framed<Pin<Box<TAsyncDuplex>>, GenericCodec<LayerMessage>>, LayerMessage>,
109 child1: ReadHalf<Pin<Box<dyn Channel>>>,
110 child2: ReadHalf<Pin<Box<dyn Channel>>>,
111 buffer_size: u32,
112) -> Result<()> {
113 let channel = Arc::new(Mutex::new(channel));
114
115 tokio::try_join!(
116 Box::pin(read_channel(on_channel1_closed, 0, child1, Arc::clone(&channel), buffer_size)),
117 Box::pin(read_channel(on_channel2_closed, 1, child2, Arc::clone(&channel), buffer_size)),
118 )?;
119
120 return Ok(());
121}
122
123async fn forward<TAsyncDuplex: AsyncRead + AsyncWrite + Send + Unpin + ?Sized + 'static>(
124 channel: Box<TAsyncDuplex>,
125 child1: Box<dyn Channel>,
126 child2: Box<dyn Channel>,
127 buffer_size: u32,
128) {
129 let (
130 sink,
131 source,
132 ) = create_framed_stream::<LayerMessage, _>(channel).split();
133
134 let on_channel1_closed1 = child1.on_close();
135 let on_channel2_closed1 = child2.on_close();
136
137 let on_channel1_closed2 = child1.on_close();
138 let on_channel2_closed2 = child2.on_close();
139
140 let (child1_read, child1_write) = split(Pin::new(child1));
141 let (child2_read, child2_write) = split(Pin::new(child2));
142
143 tokio::join!(
144 Box::pin(async move {
145 match forward_reads(on_channel1_closed1, on_channel2_closed1, source, child1_write, child2_write).await {
146 Ok(_) => {
147 },
149 Err(_error) => {
150 },
154 };
155 }),
156 Box::pin(async move {
157 match forward_writes(on_channel1_closed2, on_channel2_closed2, sink, child1_read, child2_read, buffer_size).await {
158 Ok(_) => {
159 },
161 Err(_error) => {
162 },
166 };
167 }),
168 );
169}
170
171fn divide_impl<TAsyncDuplex: AsyncRead + AsyncWrite + Send + Unpin + ?Sized + 'static>(
172 id: u16,
173 label: String,
174 buffer_size: u32,
175 stream: Box<TAsyncDuplex>,
176) -> (Box<dyn Channel>, Box<dyn Channel>) {
177 let (child1_sink, child1_source) = duplex(buffer_size as usize);
178 let (child2_sink, child2_source) = duplex(buffer_size as usize);
179
180 let (
181 child_channel1,
182 child_channel2,
183 ) = TransportChannel::new_pair(
184 id,
185 format!("{}-1", label),
186 (Box::new(child1_sink), Box::new(child1_source)),
187 buffer_size,
188 );
189
190 let (
191 forward_channel1,
192 forward_channel2,
193 ) = TransportChannel::new_pair(
194 id,
195 format!("{}-2", label),
196 (Box::new(child2_sink), Box::new(child2_source)),
197 buffer_size,
198 );
199
200 tokio::spawn(
201 forward(
202 stream,
203 child_channel1,
204 forward_channel1,
205 buffer_size,
206 ),
207 );
208
209 return (child_channel2, forward_channel2);
210}
211
212pub fn divide_channel(
214 channel: Box<dyn Channel>,
215) -> (Box<dyn Channel>, Box<dyn Channel>) {
216 let id = channel.id();
217 let label = channel.label().to_string();
218 let buffer_size = channel.buffer_size();
219
220 return divide_impl(id, label, buffer_size, channel);
221}
222
223pub fn divide_stream<TAsyncDuplex: AsyncRead + AsyncWrite + Send + Unpin + ?Sized + 'static>(
226 stream: Box<TAsyncDuplex>,
227) -> (Box<dyn Channel>, Box<dyn Channel>) {
228 let id = random_number(0..=u16::MAX);
229 let label = format!("stream-{}", random_str(4));
230
231 return divide_impl(id, label, 4_096, stream);
232}
233
234#[cfg(test)]
235mod tests {
236 use rstest::rstest;
237 use tokio::io::{AsyncReadExt, AsyncWriteExt};
238 use cs_utils::{futures::wait_random, traits::Random};
239
240 use crate::test::{test_stream, TestOptions};
241 use crate::mocks::{channel_mock_pair, ChannelMockOptions};
242
243 use super::{divide_channel, divide_stream};
244
245 mod divide_channel {
246 use super::*;
247
248 #[rstest]
249 #[case(128)]
250 #[case(256)]
251 #[case(512)]
252 #[case(1_024)]
253 #[case(2_048)]
254 #[case(4_096)]
255 #[case(8_192)]
256 #[case(16_384)]
257 #[tokio::test]
258 async fn transfers_data(
259 #[case] test_data_size: usize,
260 ) {
261 let (local_channel, remote_channel) = channel_mock_pair(
262 ChannelMockOptions::random().with_label("local"),
263 ChannelMockOptions::random().with_label("remote"),
264 );
265
266 let (local_channel1, local_channel2) = divide_channel(local_channel);
267 let (remote_channel1, remote_channel2) = divide_channel(remote_channel);
268
269 tokio::try_join!(
270 tokio::spawn(async move {
271 return test_stream(
272 local_channel1,
273 remote_channel1,
274 TestOptions::random()
275 .with_data_len(test_data_size),
276 ).await;
277 }),
278 tokio::spawn(async move {
279 return test_stream(
280 local_channel2,
281 remote_channel2,
282 TestOptions::random()
283 .with_data_len(test_data_size),
284 ).await;
285 }),
286 ).unwrap();
287 }
288
289 #[rstest]
290 #[case(128)]
291 #[case(256)]
292 #[case(512)]
293 #[case(1_024)]
294 #[case(2_048)]
295 #[case(4_096)]
296 #[case(8_192)]
297 #[case(16_384)]
298 #[tokio::test]
299 async fn transfers_data_if_second_channel_is_not_used(
300 #[case] test_data_size: usize,
301 ) {
302 let (local_channel, remote_channel) = channel_mock_pair(
303 ChannelMockOptions::random().with_label("local"),
304 ChannelMockOptions::random().with_label("remote"),
305 );
306
307 let (local_channel1, _local_channel2) = divide_channel(local_channel);
308 let (remote_channel1, _remote_channel2) = divide_channel(remote_channel);
309
310 test_stream(
311 local_channel1,
312 remote_channel1,
313 TestOptions::random()
314 .with_data_len(test_data_size),
315 ).await;
316 }
317
318 #[rstest]
319 #[case(128)]
320 #[case(256)]
321 #[case(512)]
322 #[case(1_024)]
323 #[case(2_048)]
324 #[case(4_096)]
325 #[case(8_192)]
326 #[case(16_384)]
327 #[tokio::test]
328 async fn transfers_data_if_first_channel_is_not_used(
329 #[case] test_data_size: usize,
330 ) {
331 let (local_channel, remote_channel) = channel_mock_pair(
332 ChannelMockOptions::random().with_label("local"),
333 ChannelMockOptions::random().with_label("remote"),
334 );
335
336 let (_local_channel1, local_channel2) = divide_channel(local_channel);
337 let (_remote_channel1, remote_channel2) = divide_channel(remote_channel);
338
339 test_stream(
340 local_channel2,
341 remote_channel2,
342 TestOptions::random()
343 .with_data_len(test_data_size),
344 ).await;
345 }
346
347 #[rstest]
348 #[case(128)]
349 #[case(256)]
350 #[case(512)]
351 #[case(1_024)]
352 #[case(2_048)]
353 #[case(4_096)]
354 #[case(8_192)]
355 #[case(16_384)]
356 #[tokio::test]
357 async fn transfers_data_if_second_channel_dropped(
358 #[case] test_data_size: usize,
359 ) {
360 let (local_channel, remote_channel) = channel_mock_pair(
361 ChannelMockOptions::random().with_label("local"),
362 ChannelMockOptions::random().with_label("remote"),
363 );
364
365 let (local_channel1, local_channel2) = divide_channel(local_channel);
366 let (remote_channel1, remote_channel2) = divide_channel(remote_channel);
367
368 drop(local_channel2);
369 drop(remote_channel2);
370
371 test_stream(
372 local_channel1,
373 remote_channel1,
374 TestOptions::random()
375 .with_data_len(test_data_size),
376 ).await;
377 }
378
379 #[rstest]
380 #[case(128)]
381 #[case(256)]
382 #[case(512)]
383 #[case(1_024)]
384 #[case(2_048)]
385 #[case(4_096)]
386 #[case(8_192)]
387 #[case(16_384)]
388 #[tokio::test]
389 async fn transfers_data_if_first_channel_dropped(
390 #[case] test_data_size: usize,
391 ) {
392 let (local_channel, remote_channel) = channel_mock_pair(
393 ChannelMockOptions::random().with_label("local"),
394 ChannelMockOptions::random().with_label("remote"),
395 );
396
397 let (local_channel1, local_channel2) = divide_channel(local_channel);
398 let (remote_channel1, remote_channel2) = divide_channel(remote_channel);
399
400 drop(local_channel1);
401 drop(remote_channel1);
402
403 test_stream(
404 local_channel2,
405 remote_channel2,
406 TestOptions::random()
407 .with_data_len(test_data_size),
408 ).await;
409 }
410
411 #[rstest]
412 #[case(128)]
413 #[case(256)]
414 #[case(512)]
415 #[case(1_024)]
416 #[case(2_048)]
417 #[case(4_096)]
418 #[case(8_192)]
419 #[case(16_384)]
420 #[tokio::test]
421 async fn transfers_data_if_second_channel_shut_down(
422 #[case] test_data_size: usize,
423 ) {
424 let (local_channel, remote_channel) = channel_mock_pair(
425 ChannelMockOptions::random().with_label("local"),
426 ChannelMockOptions::random().with_label("remote"),
427 );
428
429 let (local_channel1, mut local_channel2) = divide_channel(local_channel);
430 let (remote_channel1, mut remote_channel2) = divide_channel(remote_channel);
431
432 local_channel2.shutdown().await.unwrap();
433 remote_channel2.shutdown().await.unwrap();
434
435 test_stream(
436 local_channel1,
437 remote_channel1,
438 TestOptions::random()
439 .with_data_len(test_data_size),
440 ).await;
441 }
442
443 #[rstest]
444 #[case(512)]
445 #[case(1_024)]
446 #[case(2_048)]
447 #[case(4_096)]
448 #[case(8_192)]
449 #[case(16_384)]
450 #[case(32_768)]
451 #[tokio::test]
452 async fn transfers_data_if_second_channel_shut_down_during_transfer(
453 #[case] test_data_size: usize,
454 ) {
455 let (local_channel, remote_channel) = channel_mock_pair(
456 ChannelMockOptions::random().with_label("local"),
457 ChannelMockOptions::random().with_label("remote"),
458 );
459
460 let (local_channel1, local_channel2) = divide_channel(local_channel);
461 let (remote_channel1, remote_channel2) = divide_channel(remote_channel);
462
463 tokio::join!(
464 Box::pin(async move {
465 test_stream(
466 local_channel1,
467 remote_channel1,
468 TestOptions::random()
469 .with_data_len(test_data_size),
470 ).await;
471 }),
472 Box::pin(async move {
473 let (mut local_channel2, mut remote_channel2) = test_stream(
475 local_channel2,
476 remote_channel2,
477 TestOptions::random()
478 .with_data_len(128),
479 ).await;
480
481 tokio::join!(
483 Box::pin(async move {
484 wait_random(1..5).await;
485 local_channel2.shutdown().await.unwrap();
486 }),
487 Box::pin(async move {
488 wait_random(1..5).await;
489 remote_channel2.shutdown().await.unwrap();
490 }),
491 );
492 }),
493 );
494 }
495
496 #[rstest]
497 #[case(128)]
498 #[case(256)]
499 #[case(512)]
500 #[case(1_024)]
501 #[case(2_048)]
502 #[case(4_096)]
503 #[case(8_192)]
504 #[case(16_384)]
505 #[case(32_768)]
506 #[tokio::test]
507 async fn transfers_data_if_first_channel_shut_down(
508 #[case] test_data_size: usize,
509 ) {
510 let (local_channel, remote_channel) = channel_mock_pair(
511 ChannelMockOptions::random().with_label("local"),
512 ChannelMockOptions::random().with_label("remote"),
513 );
514
515 let (mut local_channel1, local_channel2) = divide_channel(local_channel);
516 let (mut remote_channel1, remote_channel2) = divide_channel(remote_channel);
517
518 local_channel1.shutdown().await.unwrap();
519 remote_channel1.shutdown().await.unwrap();
520
521 test_stream(
522 local_channel2,
523 remote_channel2,
524 TestOptions::random()
525 .with_data_len(test_data_size),
526 ).await;
527 }
528
529 #[rstest]
530 #[case(512)]
531 #[case(1_024)]
532 #[case(2_048)]
533 #[case(4_096)]
534 #[case(8_192)]
535 #[case(16_384)]
536 #[case(32_768)]
537 #[tokio::test]
538 async fn transfers_data_if_first_channel_shut_down_during_transfer(
539 #[case] test_data_size: usize,
540 ) {
541 let (local_channel, remote_channel) = channel_mock_pair(
542 ChannelMockOptions::random().with_label("local"),
543 ChannelMockOptions::random().with_label("remote"),
544 );
545
546 let (local_channel1, local_channel2) = divide_channel(local_channel);
547 let (remote_channel1,remote_channel2) = divide_channel(remote_channel);
548
549 tokio::join!(
550 Box::pin(async move {
551 let (mut local_channel1, mut remote_channel1) = test_stream(
553 local_channel1,
554 remote_channel1,
555 TestOptions::random()
556 .with_data_len(128),
557 ).await;
558
559 tokio::join!(
561 Box::pin(async move {
562 wait_random(1..5).await;
563 local_channel1.shutdown().await.unwrap();
564 }),
565 Box::pin(async move {
566 wait_random(1..5).await;
567 remote_channel1.shutdown().await.unwrap();
568 }),
569 );
570 }),
571 Box::pin(async move {
572 test_stream(
573 local_channel2,
574 remote_channel2,
575 TestOptions::random()
576 .with_data_len(test_data_size),
577 ).await;
578 }),
579 );
580 }
581
582 mod shutdown {
583 use super::*;
584
585 #[rstest]
586 #[case(128)]
587 #[case(256)]
588 #[case(512)]
589 #[case(1_024)]
590 #[case(2_048)]
591 #[case(4_096)]
592 #[case(8_192)]
593 #[case(16_384)]
594 #[tokio::test]
595 async fn first_channel_closes_channel_if_remote_counterpart_is_closed(
596 #[case] test_data_size: usize,
597 ) {
598 let (local_channel, remote_channel) = channel_mock_pair(
599 ChannelMockOptions::random().with_label("local"),
600 ChannelMockOptions::random().with_label("remote"),
601 );
602
603 let (local_channel1, local_channel2) = divide_channel(local_channel);
604 let (remote_channel1, remote_channel2) = divide_channel(remote_channel);
605
606 let (
607 mut local_channel1,
608 mut remote_channel1,
609 ) = test_stream(
610 local_channel1,
611 remote_channel1,
612 TestOptions::random()
613 .with_data_len(test_data_size),
614 ).await;
615
616 tokio::join!(
617 Box::pin(async move {
619 wait_random(1..=10).await;
620
621 return test_stream(
622 local_channel2,
623 remote_channel2,
624 TestOptions::random()
625 .with_data_len(2 * test_data_size),
626 ).await
627 }),
628 Box::pin(async move {
631 tokio::join!(
632 Box::pin(async move {
633 wait_random(5..=25).await;
634
635 local_channel1.shutdown().await.unwrap();
636
637 assert!(
638 local_channel1.write("test".as_bytes()).await.is_err(),
639 "Must fail to write to closed local channel.",
640 );
641
642 assert!(
643 local_channel1.is_closed(),
644 "Channel must be closed.",
645 );
646 }),
647 Box::pin(async move {
648 let mut buf = [0; 4096];
649
650 let bytes_read = remote_channel1.read(&mut buf)
651 .await.unwrap();
652
653 assert_eq!(
654 bytes_read,
655 0,
656 "Must be 0 bytes read if remote channel is closed.",
657 );
658
659 assert!(
660 remote_channel1.is_closed(),
661 "Channel must be closed.",
662 );
663 }),
664 );
665 }),
666 );
667 }
668
669 #[rstest]
670 #[case(128)]
671 #[case(256)]
672 #[case(512)]
673 #[case(1_024)]
674 #[case(2_048)]
675 #[case(4_096)]
676 #[case(8_192)]
677 #[case(16_384)]
678 #[tokio::test]
679 async fn second_channel_closes_channel_if_remote_counterpart_is_closed(
680 #[case] test_data_size: usize,
681 ) {
682 let (local_channel, remote_channel) = channel_mock_pair(
683 ChannelMockOptions::random().with_label("local"),
684 ChannelMockOptions::random().with_label("remote"),
685 );
686
687 let (local_channel1, local_channel2) = divide_channel(local_channel);
688 let (remote_channel1, remote_channel2) = divide_channel(remote_channel);
689
690 let (
691 mut local_channel2,
692 mut remote_channel2,
693 ) = test_stream(
694 local_channel2,
695 remote_channel2,
696 TestOptions::random()
697 .with_data_len(test_data_size),
698 ).await;
699
700 tokio::join!(
701 Box::pin(async move {
703 wait_random(1..=10).await;
704
705 return test_stream(
706 local_channel1,
707 remote_channel1,
708 TestOptions::random()
709 .with_data_len(2 * test_data_size),
710 ).await
711 }),
712 Box::pin(async move {
715 tokio::join!(
716 Box::pin(async move {
717 wait_random(5..=25).await;
718
719 local_channel2.shutdown().await.unwrap();
720
721 assert!(
722 local_channel2.write("test".as_bytes()).await.is_err(),
723 "Must fail to write to closed local channel.",
724 );
725
726 assert!(
727 local_channel2.is_closed(),
728 "Channel must be closed.",
729 );
730 }),
731 Box::pin(async move {
732 let mut buf = [0; 4096];
733
734 let bytes_read = remote_channel2.read(&mut buf)
735 .await.unwrap();
736
737 assert_eq!(
738 bytes_read,
739 0,
740 "Must be 0 bytes read if remote channel is closed.",
741 );
742
743 assert!(
744 remote_channel2.is_closed(),
745 "Channel must be closed.",
746 );
747 }),
748 );
749 }),
750 );
751 }
752
753 #[rstest]
754 #[case(128)]
755 #[case(256)]
756 #[case(512)]
757 #[case(1_024)]
758 #[case(2_048)]
759 #[case(4_096)]
760 #[case(8_192)]
761 #[case(16_384)]
762 #[tokio::test]
763 async fn both_channels_close_channel_if_remote_counterpart_is_closed(
764 #[case] test_data_size: usize,
765 ) {
766 let (local_channel, remote_channel) = channel_mock_pair(
767 ChannelMockOptions::random().with_label("local"),
768 ChannelMockOptions::random().with_label("remote"),
769 );
770
771 let (local_channel1, local_channel2) = divide_channel(local_channel);
772 let (remote_channel1, remote_channel2) = divide_channel(remote_channel);
773
774 tokio::join!(
775 Box::pin(async move {
778 wait_random(1..=25).await;
779
780 let (
781 mut local_channel1,
782 mut remote_channel1,
783 ) = test_stream(
784 local_channel1,
785 remote_channel1,
786 TestOptions::random()
787 .with_data_len(test_data_size),
788 ).await;
789
790 tokio::join!(
791 Box::pin(async move {
792 wait_random(1..=25).await;
793
794 local_channel1.shutdown().await.unwrap();
795
796 assert!(
797 local_channel1.write(cs_utils::random_str_rg(16..64).as_bytes()).await.is_err(),
798 "Must fail to write to closed local channel.",
799 );
800
801 assert!(
802 local_channel1.write("test".as_bytes()).await.is_err(),
803 "Must fail to write to closed local channel.",
804 );
805
806 assert!(
807 local_channel1.is_closed(),
808 "Channel must be closed.",
809 );
810 }),
811 Box::pin(async move {
812 let mut buf = [0; 4096];
813
814 let bytes_read = remote_channel1.read(&mut buf)
815 .await.unwrap();
816
817 assert_eq!(
818 bytes_read,
819 0,
820 "Must be 0 bytes read if remote channel is closed.",
821 );
822
823 assert!(
824 remote_channel1.is_closed(),
825 "Channel must be closed.",
826 );
827 }),
828 );
829 }),
830 Box::pin(async move {
833 wait_random(1..=25).await;
834
835 let (
836 mut local_channel2,
837 mut remote_channel2,
838 ) = test_stream(
839 local_channel2,
840 remote_channel2,
841 TestOptions::random()
842 .with_data_len(test_data_size),
843 ).await;
844
845 tokio::join!(
846 Box::pin(async move {
847 wait_random(1..=25).await;
848
849 local_channel2.shutdown().await.unwrap();
850
851 assert!(
852 local_channel2.write("test".as_bytes()).await.is_err(),
853 "Must fail to write to closed local channel.",
854 );
855
856 assert!(
857 local_channel2.is_closed(),
858 "Channel must be closed.",
859 );
860 }),
861 Box::pin(async move {
862 let mut buf = [0; 4096];
863
864 let bytes_read = remote_channel2.read(&mut buf)
865 .await.unwrap();
866
867 assert_eq!(
868 bytes_read,
869 0,
870 "Must be 0 bytes read if remote channel is closed.",
871 );
872
873 assert!(
874 remote_channel2.is_closed(),
875 "Channel must be closed.",
876 );
877 }),
878 );
879 }),
880 );
881 }
882 }
883 }
884
885 mod divide_stream {
886 use super::*;
887
888 #[rstest]
889 #[case(128)]
890 #[case(256)]
891 #[case(512)]
892 #[case(1_024)]
893 #[case(2_048)]
894 #[case(4_096)]
895 #[case(8_192)]
896 #[case(16_384)]
897 #[tokio::test]
898 async fn transfers_data(
899 #[case] test_data_size: usize,
900 ) {
901 let (local_channel, remote_channel) = channel_mock_pair(
902 ChannelMockOptions::random().with_label("local"),
903 ChannelMockOptions::random().with_label("remote"),
904 );
905
906 let (local_channel1, local_channel2) = divide_stream(local_channel);
907 let (remote_channel1, remote_channel2) = divide_stream(remote_channel);
908
909 tokio::try_join!(
910 tokio::spawn(async move {
911 return test_stream(
912 local_channel1,
913 remote_channel1,
914 TestOptions::random()
915 .with_data_len(test_data_size),
916 ).await;
917 }),
918 tokio::spawn(async move {
919 return test_stream(
920 local_channel2,
921 remote_channel2,
922 TestOptions::random()
923 .with_data_len(test_data_size),
924 ).await;
925 }),
926 ).unwrap();
927 }
928
929 #[rstest]
930 #[case(128)]
931 #[case(256)]
932 #[case(512)]
933 #[case(1_024)]
934 #[case(2_048)]
935 #[case(4_096)]
936 #[case(8_192)]
937 #[case(16_384)]
938 #[tokio::test]
939 async fn transfers_data_if_second_channel_is_not_used(
940 #[case] test_data_size: usize,
941 ) {
942 let (local_channel, remote_channel) = channel_mock_pair(
943 ChannelMockOptions::random().with_label("local"),
944 ChannelMockOptions::random().with_label("remote"),
945 );
946
947 let (local_channel1, _local_channel2) = divide_stream(local_channel);
948 let (remote_channel1, _remote_channel2) = divide_stream(remote_channel);
949
950 test_stream(
951 local_channel1,
952 remote_channel1,
953 TestOptions::random()
954 .with_data_len(test_data_size),
955 ).await;
956 }
957
958 #[rstest]
959 #[case(128)]
960 #[case(256)]
961 #[case(512)]
962 #[case(1_024)]
963 #[case(2_048)]
964 #[case(4_096)]
965 #[case(8_192)]
966 #[case(16_384)]
967 #[tokio::test]
968 async fn transfers_data_if_first_channel_is_not_used(
969 #[case] test_data_size: usize,
970 ) {
971 let (local_channel, remote_channel) = channel_mock_pair(
972 ChannelMockOptions::random().with_label("local"),
973 ChannelMockOptions::random().with_label("remote"),
974 );
975
976 let (_local_channel1, local_channel2) = divide_stream(local_channel);
977 let (_remote_channel1, remote_channel2) = divide_stream(remote_channel);
978
979 test_stream(
980 local_channel2,
981 remote_channel2,
982 TestOptions::random()
983 .with_data_len(test_data_size),
984 ).await;
985 }
986
987 #[rstest]
988 #[case(128)]
989 #[case(256)]
990 #[case(512)]
991 #[case(1_024)]
992 #[case(2_048)]
993 #[case(4_096)]
994 #[case(8_192)]
995 #[case(16_384)]
996 #[tokio::test]
997 async fn transfers_data_if_second_channel_dropped(
998 #[case] test_data_size: usize,
999 ) {
1000 let (local_channel, remote_channel) = channel_mock_pair(
1001 ChannelMockOptions::random().with_label("local"),
1002 ChannelMockOptions::random().with_label("remote"),
1003 );
1004
1005 let (local_channel1, local_channel2) = divide_stream(local_channel);
1006 let (remote_channel1, remote_channel2) = divide_stream(remote_channel);
1007
1008 drop(local_channel2);
1009 drop(remote_channel2);
1010
1011 test_stream(
1012 local_channel1,
1013 remote_channel1,
1014 TestOptions::random()
1015 .with_data_len(test_data_size),
1016 ).await;
1017 }
1018
1019 #[rstest]
1020 #[case(128)]
1021 #[case(256)]
1022 #[case(512)]
1023 #[case(1_024)]
1024 #[case(2_048)]
1025 #[case(4_096)]
1026 #[case(8_192)]
1027 #[case(16_384)]
1028 #[tokio::test]
1029 async fn transfers_data_if_first_channel_dropped(
1030 #[case] test_data_size: usize,
1031 ) {
1032 let (local_channel, remote_channel) = channel_mock_pair(
1033 ChannelMockOptions::random().with_label("local"),
1034 ChannelMockOptions::random().with_label("remote"),
1035 );
1036
1037 let (local_channel1, local_channel2) = divide_stream(local_channel);
1038 let (remote_channel1, remote_channel2) = divide_stream(remote_channel);
1039
1040 drop(local_channel1);
1041 drop(remote_channel1);
1042
1043 test_stream(
1044 local_channel2,
1045 remote_channel2,
1046 TestOptions::random()
1047 .with_data_len(test_data_size),
1048 ).await;
1049 }
1050
1051 #[rstest]
1052 #[case(128)]
1053 #[case(256)]
1054 #[case(512)]
1055 #[case(1_024)]
1056 #[case(2_048)]
1057 #[case(4_096)]
1058 #[case(8_192)]
1059 #[case(16_384)]
1060 #[tokio::test]
1061 async fn transfers_data_if_second_channel_shut_down(
1062 #[case] test_data_size: usize,
1063 ) {
1064 let (local_channel, remote_channel) = channel_mock_pair(
1065 ChannelMockOptions::random().with_label("local"),
1066 ChannelMockOptions::random().with_label("remote"),
1067 );
1068
1069 let (local_channel1, mut local_channel2) = divide_stream(local_channel);
1070 let (remote_channel1, mut remote_channel2) = divide_stream(remote_channel);
1071
1072 local_channel2.shutdown().await.unwrap();
1073 remote_channel2.shutdown().await.unwrap();
1074
1075 test_stream(
1076 local_channel1,
1077 remote_channel1,
1078 TestOptions::random()
1079 .with_data_len(test_data_size),
1080 ).await;
1081 }
1082
1083 #[rstest]
1084 #[case(512)]
1085 #[case(1_024)]
1086 #[case(2_048)]
1087 #[case(4_096)]
1088 #[case(8_192)]
1089 #[case(16_384)]
1090 #[case(32_768)]
1091 #[tokio::test]
1092 async fn transfers_data_if_second_channel_shut_down_during_transfer(
1093 #[case] test_data_size: usize,
1094 ) {
1095 let (local_channel, remote_channel) = channel_mock_pair(
1096 ChannelMockOptions::random().with_label("local"),
1097 ChannelMockOptions::random().with_label("remote"),
1098 );
1099
1100 let (local_channel1, local_channel2) = divide_stream(local_channel);
1101 let (remote_channel1, remote_channel2) = divide_stream(remote_channel);
1102
1103 tokio::join!(
1104 Box::pin(async move {
1105 test_stream(
1106 local_channel1,
1107 remote_channel1,
1108 TestOptions::random()
1109 .with_data_len(test_data_size),
1110 ).await;
1111 }),
1112 Box::pin(async move {
1113 let (mut local_channel2, mut remote_channel2) = test_stream(
1115 local_channel2,
1116 remote_channel2,
1117 TestOptions::random()
1118 .with_data_len(128),
1119 ).await;
1120
1121 tokio::join!(
1123 Box::pin(async move {
1124 wait_random(1..5).await;
1125 local_channel2.shutdown().await.unwrap();
1126 }),
1127 Box::pin(async move {
1128 wait_random(1..5).await;
1129 remote_channel2.shutdown().await.unwrap();
1130 }),
1131 );
1132 }),
1133 );
1134 }
1135
1136 #[rstest]
1137 #[case(128)]
1138 #[case(256)]
1139 #[case(512)]
1140 #[case(1_024)]
1141 #[case(2_048)]
1142 #[case(4_096)]
1143 #[case(8_192)]
1144 #[case(16_384)]
1145 #[case(32_768)]
1146 #[tokio::test]
1147 async fn transfers_data_if_first_channel_shut_down(
1148 #[case] test_data_size: usize,
1149 ) {
1150 let (local_channel, remote_channel) = channel_mock_pair(
1151 ChannelMockOptions::random().with_label("local"),
1152 ChannelMockOptions::random().with_label("remote"),
1153 );
1154
1155 let (mut local_channel1, local_channel2) = divide_stream(local_channel);
1156 let (mut remote_channel1, remote_channel2) = divide_stream(remote_channel);
1157
1158 local_channel1.shutdown().await.unwrap();
1159 remote_channel1.shutdown().await.unwrap();
1160
1161 test_stream(
1162 local_channel2,
1163 remote_channel2,
1164 TestOptions::random()
1165 .with_data_len(test_data_size),
1166 ).await;
1167 }
1168
1169 #[rstest]
1170 #[case(512)]
1171 #[case(1_024)]
1172 #[case(2_048)]
1173 #[case(4_096)]
1174 #[case(8_192)]
1175 #[case(16_384)]
1176 #[case(32_768)]
1177 #[tokio::test]
1178 async fn transfers_data_if_first_channel_shut_down_during_transfer(
1179 #[case] test_data_size: usize,
1180 ) {
1181 let (local_channel, remote_channel) = channel_mock_pair(
1182 ChannelMockOptions::random().with_label("local"),
1183 ChannelMockOptions::random().with_label("remote"),
1184 );
1185
1186 let (local_channel1, local_channel2) = divide_stream(local_channel);
1187 let (remote_channel1,remote_channel2) = divide_stream(remote_channel);
1188
1189 tokio::join!(
1190 Box::pin(async move {
1191 let (mut local_channel1, mut remote_channel1) = test_stream(
1193 local_channel1,
1194 remote_channel1,
1195 TestOptions::random()
1196 .with_data_len(128),
1197 ).await;
1198
1199 tokio::join!(
1201 Box::pin(async move {
1202 wait_random(1..5).await;
1203 local_channel1.shutdown().await.unwrap();
1204 }),
1205 Box::pin(async move {
1206 wait_random(1..5).await;
1207 remote_channel1.shutdown().await.unwrap();
1208 }),
1209 );
1210 }),
1211 Box::pin(async move {
1212 test_stream(
1213 local_channel2,
1214 remote_channel2,
1215 TestOptions::random()
1216 .with_data_len(test_data_size),
1217 ).await;
1218 }),
1219 );
1220 }
1221
1222 mod shutdown {
1223 use super::*;
1224
1225 #[rstest]
1226 #[case(128)]
1227 #[case(256)]
1228 #[case(512)]
1229 #[case(1_024)]
1230 #[case(2_048)]
1231 #[case(4_096)]
1232 #[case(8_192)]
1233 #[case(16_384)]
1234 #[tokio::test]
1235 async fn first_channel_closes_channel_if_remote_counterpart_is_closed(
1236 #[case] test_data_size: usize,
1237 ) {
1238 let (local_channel, remote_channel) = channel_mock_pair(
1239 ChannelMockOptions::random().with_label("local"),
1240 ChannelMockOptions::random().with_label("remote"),
1241 );
1242
1243 let (local_channel1, local_channel2) = divide_stream(local_channel);
1244 let (remote_channel1, remote_channel2) = divide_stream(remote_channel);
1245
1246 let (
1247 mut local_channel1,
1248 mut remote_channel1,
1249 ) = test_stream(
1250 local_channel1,
1251 remote_channel1,
1252 TestOptions::random()
1253 .with_data_len(test_data_size),
1254 ).await;
1255
1256 tokio::join!(
1257 Box::pin(async move {
1259 wait_random(1..=10).await;
1260
1261 return test_stream(
1262 local_channel2,
1263 remote_channel2,
1264 TestOptions::random()
1265 .with_data_len(2 * test_data_size),
1266 ).await
1267 }),
1268 Box::pin(async move {
1271 tokio::join!(
1272 Box::pin(async move {
1273 wait_random(5..=25).await;
1274
1275 local_channel1.shutdown().await.unwrap();
1276
1277 assert!(
1278 local_channel1.write("test".as_bytes()).await.is_err(),
1279 "Must fail to write to closed local channel.",
1280 );
1281
1282 assert!(
1283 local_channel1.is_closed(),
1284 "Channel must be closed.",
1285 );
1286 }),
1287 Box::pin(async move {
1288 let mut buf = [0; 4096];
1289
1290 let bytes_read = remote_channel1.read(&mut buf)
1291 .await.unwrap();
1292
1293 assert_eq!(
1294 bytes_read,
1295 0,
1296 "Must be 0 bytes read if remote channel is closed.",
1297 );
1298
1299 assert!(
1300 remote_channel1.is_closed(),
1301 "Channel must be closed.",
1302 );
1303 }),
1304 );
1305 }),
1306 );
1307 }
1308
1309 #[rstest]
1310 #[case(128)]
1311 #[case(256)]
1312 #[case(512)]
1313 #[case(1_024)]
1314 #[case(2_048)]
1315 #[case(4_096)]
1316 #[case(8_192)]
1317 #[case(16_384)]
1318 #[tokio::test]
1319 async fn second_channel_closes_channel_if_remote_counterpart_is_closed(
1320 #[case] test_data_size: usize,
1321 ) {
1322 let (local_channel, remote_channel) = channel_mock_pair(
1323 ChannelMockOptions::random().with_label("local"),
1324 ChannelMockOptions::random().with_label("remote"),
1325 );
1326
1327 let (local_channel1, local_channel2) = divide_stream(local_channel);
1328 let (remote_channel1, remote_channel2) = divide_stream(remote_channel);
1329
1330 let (
1331 mut local_channel2,
1332 mut remote_channel2,
1333 ) = test_stream(
1334 local_channel2,
1335 remote_channel2,
1336 TestOptions::random()
1337 .with_data_len(test_data_size),
1338 ).await;
1339
1340 tokio::join!(
1341 Box::pin(async move {
1343 wait_random(1..=10).await;
1344
1345 return test_stream(
1346 local_channel1,
1347 remote_channel1,
1348 TestOptions::random()
1349 .with_data_len(2 * test_data_size),
1350 ).await
1351 }),
1352 Box::pin(async move {
1355 tokio::join!(
1356 Box::pin(async move {
1357 wait_random(5..=25).await;
1358
1359 local_channel2.shutdown().await.unwrap();
1360
1361 assert!(
1362 local_channel2.write("test".as_bytes()).await.is_err(),
1363 "Must fail to write to closed local channel.",
1364 );
1365
1366 assert!(
1367 local_channel2.is_closed(),
1368 "Channel must be closed.",
1369 );
1370 }),
1371 Box::pin(async move {
1372 let mut buf = [0; 4096];
1373
1374 let bytes_read = remote_channel2.read(&mut buf)
1375 .await.unwrap();
1376
1377 assert_eq!(
1378 bytes_read,
1379 0,
1380 "Must be 0 bytes read if remote channel is closed.",
1381 );
1382
1383 assert!(
1384 remote_channel2.is_closed(),
1385 "Channel must be closed.",
1386 );
1387 }),
1388 );
1389 }),
1390 );
1391 }
1392
1393 #[rstest]
1394 #[case(128)]
1395 #[case(256)]
1396 #[case(512)]
1397 #[case(1_024)]
1398 #[case(2_048)]
1399 #[case(4_096)]
1400 #[case(8_192)]
1401 #[case(16_384)]
1402 #[tokio::test]
1403 async fn both_channels_close_channel_if_remote_counterpart_is_closed(
1404 #[case] test_data_size: usize,
1405 ) {
1406 let (local_channel, remote_channel) = channel_mock_pair(
1407 ChannelMockOptions::random().with_label("local"),
1408 ChannelMockOptions::random().with_label("remote"),
1409 );
1410
1411 let (local_channel1, local_channel2) = divide_stream(local_channel);
1412 let (remote_channel1, remote_channel2) = divide_stream(remote_channel);
1413
1414 tokio::join!(
1415 Box::pin(async move {
1418 wait_random(1..=25).await;
1419
1420 let (
1421 mut local_channel1,
1422 mut remote_channel1,
1423 ) = test_stream(
1424 local_channel1,
1425 remote_channel1,
1426 TestOptions::random()
1427 .with_data_len(test_data_size),
1428 ).await;
1429
1430 tokio::join!(
1431 Box::pin(async move {
1432 wait_random(1..=25).await;
1433
1434 local_channel1.shutdown().await.unwrap();
1435
1436 assert!(
1437 local_channel1.write(cs_utils::random_str_rg(16..64).as_bytes()).await.is_err(),
1438 "Must fail to write to closed local channel.",
1439 );
1440
1441 assert!(
1442 local_channel1.write("test".as_bytes()).await.is_err(),
1443 "Must fail to write to closed local channel.",
1444 );
1445
1446 assert!(
1447 local_channel1.is_closed(),
1448 "Channel must be closed.",
1449 );
1450 }),
1451 Box::pin(async move {
1452 let mut buf = [0; 4096];
1453
1454 let bytes_read = remote_channel1.read(&mut buf)
1455 .await.unwrap();
1456
1457 assert_eq!(
1458 bytes_read,
1459 0,
1460 "Must be 0 bytes read if remote channel is closed.",
1461 );
1462
1463 assert!(
1464 remote_channel1.is_closed(),
1465 "Channel must be closed.",
1466 );
1467 }),
1468 );
1469 }),
1470 Box::pin(async move {
1473 wait_random(1..=25).await;
1474
1475 let (
1476 mut local_channel2,
1477 mut remote_channel2,
1478 ) = test_stream(
1479 local_channel2,
1480 remote_channel2,
1481 TestOptions::random()
1482 .with_data_len(test_data_size),
1483 ).await;
1484
1485 tokio::join!(
1486 Box::pin(async move {
1487 wait_random(1..=25).await;
1488
1489 local_channel2.shutdown().await.unwrap();
1490
1491 assert!(
1492 local_channel2.write("test".as_bytes()).await.is_err(),
1493 "Must fail to write to closed local channel.",
1494 );
1495
1496 assert!(
1497 local_channel2.is_closed(),
1498 "Channel must be closed.",
1499 );
1500 }),
1501 Box::pin(async move {
1502 let mut buf = [0; 4096];
1503
1504 let bytes_read = remote_channel2.read(&mut buf)
1505 .await.unwrap();
1506
1507 assert_eq!(
1508 bytes_read,
1509 0,
1510 "Must be 0 bytes read if remote channel is closed.",
1511 );
1512
1513 assert!(
1514 remote_channel2.is_closed(),
1515 "Channel must be closed.",
1516 );
1517 }),
1518 );
1519 }),
1520 );
1521 }
1522 }
1523 }
1524}