connection_utils/utils/
divide_into_framed_streams.rs

1use std::pin::Pin;
2
3use tokio_util::codec::Framed;
4use tokio::io::{AsyncRead, AsyncWrite};
5use serde::{Serialize, de::DeserializeOwned};
6
7use crate::{divide_stream, Channel, codecs::GenericCodec, types::TFramedAsyncDuplex};
8
9/// Creates a new framed stream out of a channel.
10pub fn divide_into_framed_streams<
11    T: Serialize + DeserializeOwned,
12    TAsyncDuplex: AsyncRead + AsyncWrite + Send + Unpin + ?Sized + 'static,
13>(
14    stream: Box<TAsyncDuplex>,
15) -> (TFramedAsyncDuplex<T, dyn Channel>, TFramedAsyncDuplex<T, dyn Channel>) {
16    let (channel1, channel2) = divide_stream(stream);
17
18    let channel1 = Framed::new(
19        Pin::new(channel1),
20        GenericCodec::<T>::new(),
21    );
22
23    let channel2 = Framed::new(
24        Pin::new(channel2),
25        GenericCodec::<T>::new(),
26    );
27
28    return (channel1, channel2);
29}
30
#[cfg(test)]
mod tests {
    use cs_utils::{futures::wait_random, traits::Random};
    use rstest::rstest;

    use crate::{
        mocks::{channel_mock_pair, ChannelMockOptions},
        test::{test_framed_stream, TestOptions, TestStreamMessage},
    };

    use super::divide_into_framed_streams;

    /// Divides both ends of a mocked transport pair into framed channels and
    /// runs `test_framed_stream` over each matching pair of channels, once per
    /// `data_len` case.
    #[rstest]
    #[case(64)]
    #[case(128)]
    #[case(256)]
    #[case(512)]
    #[tokio::test]
    async fn divides_stream_into_framed_channel(#[case] data_len: usize) {
        // Both ends of the mock transport are configured with random options.
        let (transport1, transport2) = channel_mock_pair(
            ChannelMockOptions::random(),
            ChannelMockOptions::random(),
        );

        // Every transport end yields two framed channels.
        let (first_a, second_a) =
            divide_into_framed_streams::<TestStreamMessage, _>(Box::new(transport1));
        let (first_b, second_b) =
            divide_into_framed_streams::<TestStreamMessage, _>(Box::new(transport2));

        let first_options = TestOptions::random().with_data_len(data_len);
        let second_options = TestOptions::random().with_data_len(data_len);

        // The first channels of both transport ends are exercised together.
        let first_run = Box::pin(async move {
            wait_random(0..=50).await;

            test_framed_stream(first_a, first_b, first_options).await;
        });

        // Likewise for the second channels of both transport ends.
        let second_run = Box::pin(async move {
            wait_random(0..=50).await;

            test_framed_stream(second_a, second_b, second_options).await;
        });

        // Drive both checks concurrently on the same task.
        tokio::join!(first_run, second_run);
    }
}