connection_utils/utils/
divide_into_framed_streams.rs1use std::pin::Pin;
2
3use tokio_util::codec::Framed;
4use tokio::io::{AsyncRead, AsyncWrite};
5use serde::{Serialize, de::DeserializeOwned};
6
7use crate::{divide_stream, Channel, codecs::GenericCodec, types::TFramedAsyncDuplex};
8
9pub fn divide_into_framed_streams<
11 T: Serialize + DeserializeOwned,
12 TAsyncDuplex: AsyncRead + AsyncWrite + Send + Unpin + ?Sized + 'static,
13>(
14 stream: Box<TAsyncDuplex>,
15) -> (TFramedAsyncDuplex<T, dyn Channel>, TFramedAsyncDuplex<T, dyn Channel>) {
16 let (channel1, channel2) = divide_stream(stream);
17
18 let channel1 = Framed::new(
19 Pin::new(channel1),
20 GenericCodec::<T>::new(),
21 );
22
23 let channel2 = Framed::new(
24 Pin::new(channel2),
25 GenericCodec::<T>::new(),
26 );
27
28 return (channel1, channel2);
29}
30
#[cfg(test)]
mod tests {
    use cs_utils::{futures::wait_random, traits::Random};
    use rstest::rstest;

    use crate::{
        mocks::{channel_mock_pair, ChannelMockOptions},
        test::{test_framed_stream, TestOptions, TestStreamMessage},
    };

    use super::divide_into_framed_streams;

    // Divides both ends of a mocked transport pair into framed channels and
    // runs the shared framed-stream test over the two resulting channel
    // pairs concurrently, for several payload sizes.
    #[rstest]
    #[case(64)]
    #[case(128)]
    #[case(256)]
    #[case(512)]
    #[tokio::test]
    async fn divides_stream_into_framed_channel(#[case] data_len: usize) {
        // Two connected mock transports, each configured with random options.
        let (transport1, transport2) = channel_mock_pair(
            ChannelMockOptions::random(),
            ChannelMockOptions::random(),
        );

        // Divide each side of the transport into its two framed channels.
        let (local_channel1, remote_channel1) =
            divide_into_framed_streams::<TestStreamMessage, _>(Box::new(transport1));
        let (local_channel2, remote_channel2) =
            divide_into_framed_streams::<TestStreamMessage, _>(Box::new(transport2));

        let first_pair_options = TestOptions::random().with_data_len(data_len);
        let second_pair_options = TestOptions::random().with_data_len(data_len);

        // Exercise both channel pairs at the same time; each task starts
        // after its own random delay.
        let first_pair_task = Box::pin(async move {
            wait_random(0..=50).await;

            test_framed_stream(local_channel1, local_channel2, first_pair_options).await;
        });
        let second_pair_task = Box::pin(async move {
            wait_random(0..=50).await;

            test_framed_stream(remote_channel1, remote_channel2, second_pair_options).await;
        });

        tokio::join!(first_pair_task, second_pair_task);
    }
}