// connection-utils 0.8.0
//
// Connection related utilities.
// Documentation
use std::{fmt::Debug, pin::Pin};
use futures::{SinkExt, StreamExt};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_util::codec::{Framed, Encoder, Decoder};
use cs_utils::{futures::wait_random, traits::Random, test::random_vec};

pub use crate::test::TestOptions;

/// A `Framed` transport whose underlying I/O object is a pinned, boxed value of type `K`
/// and whose codec is `T`.
pub type TFramedAsyncDuplex<T, K> = Framed<Pin<Box<K>>, T>;

/// Test data transfer in a framed stream. First passes data in `forward` direction and validates that all
/// data trassfered without issues. Then make another pass in `backward` direction with the same validation.
pub async fn test_framed_stream<
    TMessage: PartialEq + Clone + Debug + Random,
    TError: Debug,
    TEncoder: Encoder<TMessage> + Decoder<Item = TMessage, Error = TError>,
    TAsyncDuplex: AsyncRead + AsyncWrite + ?Sized,
>(
    stream1: TFramedAsyncDuplex<TEncoder, TAsyncDuplex>,
    stream2: TFramedAsyncDuplex<TEncoder, TAsyncDuplex>,
    options: TestOptions,
) -> (TFramedAsyncDuplex<TEncoder, TAsyncDuplex>, TFramedAsyncDuplex<TEncoder, TAsyncDuplex>) where <TEncoder as Encoder<TMessage>>::Error: Debug,
{
    // test in backward direction
    let (stream2, stream1) = run_framed_stream_test(stream2, stream1, options.clone()).await;
    // test in forward direction
    return run_framed_stream_test(stream1, stream2, options.clone()).await;
}

async fn run_framed_stream_test<
    TMessage: PartialEq + Clone + Debug + Random,
    TError: Debug,
    TEncoder: Encoder<TMessage> + Decoder<Item = TMessage, Error = TError>,
    TAsyncDuplex: AsyncRead + AsyncWrite + ?Sized,
>(
    mut stream1: TFramedAsyncDuplex<TEncoder, TAsyncDuplex>,
    mut stream2: TFramedAsyncDuplex<TEncoder, TAsyncDuplex>,
    options: TestOptions,
) -> (TFramedAsyncDuplex<TEncoder, TAsyncDuplex>, TFramedAsyncDuplex<TEncoder, TAsyncDuplex>) where <TEncoder as Encoder<TMessage>>::Error: Debug,
{
    let data_len = options.data_len();
    let data = random_vec::<TMessage>(data_len as u32);

    let latency_range1 = options.latency_range();
    let latency_range2 = options.latency_range();

    let ((stream1, data), (stream2, received_data)) = tokio::join!(
        Box::pin(async move {
            for message in &data {
                wait_random(latency_range1.clone()).await;
    
                stream1.send(message.clone()).await.unwrap();
            }
    
            return (stream1, data);
        }),
        Box::pin(async move {
            let mut received_data = vec![];
    
            loop {
                wait_random(latency_range2.clone()).await;
    
                let message = stream2.next().await
                    .expect("Stream closed.")
                    .expect("Cannot read stream message.");
    
                received_data.push(message);
    
                if received_data.len() == data_len as usize {
                    break;
                }
            }
    
            return (stream2, received_data);
        }),
    );

    assert_eq!(
        data,
        received_data,
        "Sent and received data must match.",
    );

    return (stream1, stream2);
}