// connection-utils 0.8.0
//
// Connection related utilities.
// Documentation
use std::pin::Pin;

use tokio_util::codec::Framed;
use tokio::io::{AsyncRead, AsyncWrite};
use serde::{Serialize, de::DeserializeOwned};

use crate::{divide_stream, Channel, codecs::GenericCodec, types::TFramedAsyncDuplex};

/// Divides a boxed duplex stream in two and wraps each half in a framed transport.
///
/// The `stream` is passed to `divide_stream`; each of the two channels it returns
/// is pinned and wrapped in a `Framed` driven by its own `GenericCodec::<T>`
/// instance. The framed channels are returned in the same order in which
/// `divide_stream` produced the underlying channels.
pub fn divide_into_framed_streams<
    T: Serialize + DeserializeOwned,
    TAsyncDuplex: AsyncRead + AsyncWrite + Send + Unpin + ?Sized + 'static,
>(
    stream: Box<TAsyncDuplex>,
) -> (TFramedAsyncDuplex<T, dyn Channel>, TFramedAsyncDuplex<T, dyn Channel>) {
    let (first, second) = divide_stream(stream);

    // Tuple fields are evaluated left to right, so `first` is framed before `second`.
    (
        Framed::new(Pin::new(first), GenericCodec::<T>::new()),
        Framed::new(Pin::new(second), GenericCodec::<T>::new()),
    )
}

#[cfg(test)]
mod tests {
    use rstest::rstest;
    use cs_utils::{traits::Random, futures::wait_random};

    use crate::{
        mocks::{channel_mock_pair, ChannelMockOptions},
        test::{test_framed_stream, TestOptions, TestStreamMessage},
    };

    use super::divide_into_framed_streams;

    /// Divides both ends of a mock channel pair into framed channels and runs
    /// `test_framed_stream` concurrently on the two first channels and on the
    /// two second channels, once per `data_len` case.
    #[rstest]
    #[case(64)]
    #[case(128)]
    #[case(256)]
    #[case(512)]
    #[tokio::test]
    async fn divides_stream_into_framed_channel(
        #[case] data_len: usize,
    ) {
        let (transport_a, transport_b) = channel_mock_pair(
            ChannelMockOptions::random(),
            ChannelMockOptions::random(),
        );

        // Each transport end is divided into two framed channels.
        let (first_on_a, second_on_a) =
            divide_into_framed_streams::<TestStreamMessage, _>(Box::new(transport_a));
        let (first_on_b, second_on_b) =
            divide_into_framed_streams::<TestStreamMessage, _>(Box::new(transport_b));

        let first_options = TestOptions::random().with_data_len(data_len);
        let second_options = TestOptions::random().with_data_len(data_len);

        // The first channels of both ends are exercised against each other.
        let first_pair_check = Box::pin(async move {
            wait_random(0..=50).await;

            test_framed_stream(first_on_a, first_on_b, first_options).await;
        });

        // Likewise for the second channels of both ends.
        let second_pair_check = Box::pin(async move {
            wait_random(0..=50).await;

            test_framed_stream(second_on_a, second_on_b, second_options).await;
        });

        tokio::join!(first_pair_check, second_pair_check);
    }
}