// connection-utils 0.8.0
//
// Connection related utilities.
// Documentation
use serde::{Serialize, de::DeserializeOwned};
use tokio_util::codec::LengthDelimitedCodec;
use std::{marker::PhantomData, fmt::Debug};

mod encoder;
mod decoder;

/// Codec for passing any value whose type implements the `Serialize` and
/// `DeserializeOwned` traits.
///
/// The codec owns a [`LengthDelimitedCodec`] and is parameterised by the message
/// type `T`; no value of `T` is stored inside the codec itself.
///
/// ### Examples
///
/// ```
/// use cs_utils::{random_number, random_str, random_bool};
/// use tokio_util::codec::Framed;
/// use futures::{StreamExt, SinkExt};
/// use tokio::{io::duplex, try_join};
/// use serde::{Serialize, Deserialize};
/// use connection_utils::codecs::GenericCodec;
///
/// #[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
/// struct TestStruct {
///     id: u16,
///     name: String,
///     is_flag: bool,
/// }
///
/// println!("\n🏷️  Generic codec example:");
/// println!("\n⚠️ Note: the \"async\" feature must be enabled!\n");
///
/// let (stream, _) = duplex(4096);
///
/// let message = TestStruct {
///     id: random_number(0..=u16::MAX),
///     name: random_str(8),
///     is_flag: random_bool(),
/// };
///
/// let framed_stream = Framed::new(
///     stream,
///     GenericCodec::<TestStruct>::new(),
/// );
///
/// let (mut sink, mut source) = framed_stream.split();
///
/// // send/receive the framed data using the sink/source
/// // see "generic_codec" example for more details
/// ```
#[derive(Clone, Debug)]
pub struct GenericCodec<T>
where
    T: Serialize + DeserializeOwned,
{
    // Wrapped length-delimited codec. NOTE(review): presumably driven by the
    // `encoder`/`decoder` submodules, which are not visible from this file.
    length_delimited_codec: LengthDelimitedCodec,
    // Zero-sized marker that ties the codec to its message type `T`.
    _phantom: PhantomData<T>,
}

impl<T> GenericCodec<T>
where
    T: Serialize + DeserializeOwned,
{
    /// Creates a codec for messages of type `T`, backed by a
    /// `LengthDelimitedCodec` built with `LengthDelimitedCodec::new()`.
    pub fn new() -> Self {
        Self {
            length_delimited_codec: LengthDelimitedCodec::new(),
            _phantom: PhantomData,
        }
    }
}

impl<T> Default for GenericCodec<T>
where
    T: Serialize + DeserializeOwned,
{
    /// Equivalent to [`GenericCodec::new`].
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(test)]
mod tests {
    use cs_utils::{random_number, random_str, random_bool, random_str_rg, futures::wait_random, traits::Random, test::random_vec_rg, swap};

    mod object {
        use rstest::rstest;
        use tokio::try_join;
        use tokio::io::duplex;
        use tokio_util::codec::Framed;
        use futures::{StreamExt, SinkExt};
        use serde::{Serialize, Deserialize};

        use super::*;
        use crate::codecs::GenericCodec;

        /// Sample message type sent through `GenericCodec` by the tests in this module.
        ///
        /// It derives `Serialize`/`Deserialize` to satisfy the codec's trait bounds, and
        /// `Debug`/`Clone`/`PartialEq` so that sent and received messages can be copied
        /// and compared with `assert_eq!`.
        #[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
        pub struct TestStruct {
            pub id: u64,
            pub name: String,
            pub is_flag: bool,
        }

        impl Random for TestStruct {
            /// Builds a `TestStruct` whose fields all come from the `cs_utils`
            /// random helpers: `id` from the full `u64` range, `name` from
            /// `random_str(32)` and `is_flag` from `random_bool()`.
            fn random() -> Self {
                let id = random_number(0..=u64::MAX);
                let name = random_str(32);
                let is_flag = random_bool();

                Self { id, name, is_flag }
            }
        }

        #[rstest]
        #[case(true)]
        #[case(false)]
        #[tokio::test]
        async fn can_send_receive_messages_bidirectional(
            #[case] is_forward_direction: bool,
        ) {
            let (stream1, stream2) = duplex(4096);
            
            let stream1_messages = random_vec_rg(4..8);
            let stream1_messages1 = stream1_messages.clone();
            let stream1_messages2 = stream1_messages.clone();

            let stream2_messages = random_vec_rg(4..8);
            let stream2_messages1 = stream2_messages.clone();
            let stream2_messages2 = stream2_messages.clone();

            // maybe swap the direction
            let (stream1, stream2) = if !is_forward_direction {
                swap(stream1, stream2)
            } else {
                (stream1, stream2)
            };

            try_join!(
                tokio::spawn(async move {
                    let framed_stream = Framed::new(
                        stream1,
                        GenericCodec::<TestStruct>::new(),
                    );
                    let (
                        mut sink,
                        mut source,
                    ) = framed_stream.split();

                    try_join!(
                        tokio::spawn(async move {
                            for message in stream1_messages1 {
                                sink
                                    .send(message).await
                                    .expect("Cannot send message.");
            
                                wait_random(5..25).await;
                            }
                        }),
                        tokio::spawn(async move {
                            let mut received_messages = vec![];
            
                            while let Some(maybe_message) = source.next().await {
                                let message = maybe_message
                                    .expect("Failed to unwrap the message.");
            
                                received_messages.push(message);
                                
                                wait_random(5..25).await;

                                // otherwise the test never completes
                                if received_messages.len() == stream2_messages2.len() {
                                    break;
                                }
                            }
            
                            assert_eq!(
                                received_messages,
                                stream2_messages2,
                                "Sent and received messages must match.",
                            );
                        }),
                    ).unwrap();
                }),
                tokio::spawn(async move {
                    let framed_stream = Framed::new(
                        stream2,
                        GenericCodec::<TestStruct>::new(),
                    );
                    let (
                        mut sink,
                        mut source,
                    ) = framed_stream.split();

                    try_join!(
                        tokio::spawn(async move {
                            for message in stream2_messages1 {
                                sink
                                    .send(message).await
                                    .expect("Cannot send message.");
            
                                wait_random(5..25).await;
                            }
                        }),
                        tokio::spawn(async move {
                            let mut received_messages = vec![];
            
                            while let Some(maybe_message) = source.next().await {
                                let message = maybe_message
                                    .expect("Failed to unwrap the message.");
            
                                received_messages.push(message);
                                
                                wait_random(5..25).await;

                                // otherwise the test never completes
                                if received_messages.len() == stream1_messages2.len() {
                                    break;
                                }
                            }
            
                            assert_eq!(
                                received_messages,
                                stream1_messages2,
                                "Sent and received messages must match.",
                            );
                        }),
                    ).unwrap();
                }),
            ).unwrap();
        }
    }

    mod string {
        use rstest::rstest;
        use tokio::try_join;
        use tokio::io::duplex;
        use tokio_util::codec::Framed;
        use futures::{StreamExt, SinkExt};

        use super::*;
        use crate::codecs::GenericCodec;

        #[rstest]
        #[case(true)]
        #[case(false)]
        #[tokio::test]
        async fn can_send_receive_messages_bidirectional(
            #[case] is_forward_direction: bool,
        ) {
            let (stream1, stream2) = duplex(4096);

            let mut stream1_messages = vec![];
            for _ in 0..random_number(4..8) {
                stream1_messages.push(random_str_rg(8..32));
            }

            let mut stream2_messages = vec![];
            for _ in 0..random_number(4..8) {
                stream2_messages.push(random_str_rg(8..32));
            }
            
            let stream1_messages1 = stream1_messages.clone();
            let stream1_messages2 = stream1_messages.clone();

            let stream2_messages1 = stream2_messages.clone();
            let stream2_messages2 = stream2_messages.clone();

            // maybe swap the direction
            let (stream1, stream2) = if !is_forward_direction {
                swap(stream1, stream2)
            } else {
                (stream1, stream2)
            };

            try_join!(
                tokio::spawn(async move {
                    let framed_stream = Framed::new(
                        stream1,
                        GenericCodec::<String>::new(),
                    );
                    let (
                        mut sink,
                        mut source,
                    ) = framed_stream.split();

                    try_join!(
                        tokio::spawn(async move {
                            for message in stream1_messages1 {
                                sink
                                    .send(message).await
                                    .expect("Cannot send message.");
            
                                wait_random(5..25).await;
                            }
                        }),
                        tokio::spawn(async move {
                            let mut received_messages = vec![];
            
                            while let Some(maybe_message) = source.next().await {
                                let message = maybe_message
                                    .expect("Failed to unwrap the message.");
            
                                received_messages.push(message);
                                
                                wait_random(5..25).await;

                                // otherwise the test never completes
                                if received_messages.len() == stream2_messages2.len() {
                                    break;
                                }
                            }
            
                            assert_eq!(
                                received_messages,
                                stream2_messages2,
                                "Sent and received messages must match.",
                            );
                        }),
                    ).unwrap();
                }),
                tokio::spawn(async move {
                    let framed_stream = Framed::new(
                        stream2,
                        GenericCodec::<String>::new(),
                    );
                    let (
                        mut sink,
                        mut source,
                    ) = framed_stream.split();

                    try_join!(
                        tokio::spawn(async move {
                            for message in stream2_messages1 {
                                sink
                                    .send(message).await
                                    .expect("Cannot send message.");
            
                                wait_random(5..25).await;
                            }
                        }),
                        tokio::spawn(async move {
                            let mut received_messages = vec![];
            
                            while let Some(maybe_message) = source.next().await {
                                let message = maybe_message
                                    .expect("Failed to unwrap the message.");
            
                                received_messages.push(message);
                                
                                wait_random(5..25).await;

                                // otherwise the test never completes
                                if received_messages.len() == stream1_messages2.len() {
                                    break;
                                }
                            }
            
                            assert_eq!(
                                received_messages,
                                stream1_messages2,
                                "Sent and received messages must match.",
                            );
                        }),
                    ).unwrap();
                }),
            ).unwrap();
        }
    }
}