use serde::{Serialize, de::DeserializeOwned};
use tokio_util::codec::LengthDelimitedCodec;
use std::{marker::PhantomData, fmt::Debug};
mod encoder;
mod decoder;
/// Codec for messages of type `T`, layered on a [`LengthDelimitedCodec`].
///
/// `T` is never stored; it only fixes which message type a given codec
/// instance is associated with. The actual encode/decode logic is not part
/// of this definition (presumably it lives in the sibling `encoder` and
/// `decoder` modules — confirm there).
pub struct GenericCodec<T: Serialize + DeserializeOwned> {
    /// Underlying length-delimited framing codec.
    length_delimited_codec: LengthDelimitedCodec,
    /// Zero-sized marker binding the codec to `T`.
    _phantom: PhantomData<T>,
}

// `Clone` and `Debug` are implemented by hand on purpose: `#[derive(..)]`
// would add `T: Clone` / `T: Debug` bounds to the generated impls even though
// no value of `T` is held, needlessly restricting which message types yield a
// clonable / printable codec.
impl<T: Serialize + DeserializeOwned> Clone for GenericCodec<T> {
    fn clone(&self) -> Self {
        Self {
            length_delimited_codec: self.length_delimited_codec.clone(),
            _phantom: PhantomData,
        }
    }
}

impl<T: Serialize + DeserializeOwned> Debug for GenericCodec<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Same field layout the derived implementation would print.
        f.debug_struct("GenericCodec")
            .field("length_delimited_codec", &self.length_delimited_codec)
            .field("_phantom", &self._phantom)
            .finish()
    }
}
impl<T> GenericCodec<T>
where
    T: Serialize + DeserializeOwned,
{
    /// Creates a codec backed by a `LengthDelimitedCodec` built with
    /// `LengthDelimitedCodec::new()`.
    pub fn new() -> Self {
        let length_delimited_codec = LengthDelimitedCodec::new();

        Self {
            length_delimited_codec,
            _phantom: PhantomData,
        }
    }
}
impl<T> Default for GenericCodec<T>
where
    T: Serialize + DeserializeOwned,
{
    /// Returns the same codec as [`GenericCodec::new`].
    fn default() -> Self {
        Self::new()
    }
}
#[cfg(test)]
mod tests {
use cs_utils::{random_number, random_str, random_bool, random_str_rg, futures::wait_random, traits::Random, test::random_vec_rg, swap};
mod object {
use rstest::rstest;
use tokio::try_join;
use tokio::io::duplex;
use tokio_util::codec::Framed;
use futures::{StreamExt, SinkExt};
use serde::{Serialize, Deserialize};
use super::*;
use crate::codecs::GenericCodec;
/// Sample message type sent through the codec in the tests of this module.
#[derive(Debug, Clone, PartialEq)]
#[derive(Serialize, Deserialize)]
pub struct TestStruct {
    /// Numeric field of the sample message.
    pub id: u64,
    /// Text field of the sample message.
    pub name: String,
    /// Boolean field of the sample message.
    pub is_flag: bool,
}
impl Random for TestStruct {
    /// Builds a `TestStruct` with every field taken from the random helpers:
    /// `id` from the full `u64` range, `name` from `random_str(32)` and
    /// `is_flag` from `random_bool()`.
    fn random() -> Self {
        // Fields are generated in declaration order.
        let id = random_number(0..=u64::MAX);
        let name = random_str(32);
        let is_flag = random_bool();

        Self { id, name, is_flag }
    }
}
#[rstest]
#[case(true)]
#[case(false)]
#[tokio::test]
async fn can_send_receive_messages_bidirectional(
#[case] is_forward_direction: bool,
) {
let (stream1, stream2) = duplex(4096);
let stream1_messages = random_vec_rg(4..8);
let stream1_messages1 = stream1_messages.clone();
let stream1_messages2 = stream1_messages.clone();
let stream2_messages = random_vec_rg(4..8);
let stream2_messages1 = stream2_messages.clone();
let stream2_messages2 = stream2_messages.clone();
let (stream1, stream2) = if !is_forward_direction {
swap(stream1, stream2)
} else {
(stream1, stream2)
};
try_join!(
tokio::spawn(async move {
let framed_stream = Framed::new(
stream1,
GenericCodec::<TestStruct>::new(),
);
let (
mut sink,
mut source,
) = framed_stream.split();
try_join!(
tokio::spawn(async move {
for message in stream1_messages1 {
sink
.send(message).await
.expect("Cannot send message.");
wait_random(5..25).await;
}
}),
tokio::spawn(async move {
let mut received_messages = vec![];
while let Some(maybe_message) = source.next().await {
let message = maybe_message
.expect("Failed to unwrap the message.");
received_messages.push(message);
wait_random(5..25).await;
if received_messages.len() == stream2_messages2.len() {
break;
}
}
assert_eq!(
received_messages,
stream2_messages2,
"Sent and received messages must match.",
);
}),
).unwrap();
}),
tokio::spawn(async move {
let framed_stream = Framed::new(
stream2,
GenericCodec::<TestStruct>::new(),
);
let (
mut sink,
mut source,
) = framed_stream.split();
try_join!(
tokio::spawn(async move {
for message in stream2_messages1 {
sink
.send(message).await
.expect("Cannot send message.");
wait_random(5..25).await;
}
}),
tokio::spawn(async move {
let mut received_messages = vec![];
while let Some(maybe_message) = source.next().await {
let message = maybe_message
.expect("Failed to unwrap the message.");
received_messages.push(message);
wait_random(5..25).await;
if received_messages.len() == stream1_messages2.len() {
break;
}
}
assert_eq!(
received_messages,
stream1_messages2,
"Sent and received messages must match.",
);
}),
).unwrap();
}),
).unwrap();
}
}
mod string {
use rstest::rstest;
use tokio::try_join;
use tokio::io::duplex;
use tokio_util::codec::Framed;
use futures::{StreamExt, SinkExt};
use super::*;
use crate::codecs::GenericCodec;
#[rstest]
#[case(true)]
#[case(false)]
#[tokio::test]
async fn can_send_receive_messages_bidirectional(
#[case] is_forward_direction: bool,
) {
let (stream1, stream2) = duplex(4096);
let mut stream1_messages = vec![];
for _ in 0..random_number(4..8) {
stream1_messages.push(random_str_rg(8..32));
}
let mut stream2_messages = vec![];
for _ in 0..random_number(4..8) {
stream2_messages.push(random_str_rg(8..32));
}
let stream1_messages1 = stream1_messages.clone();
let stream1_messages2 = stream1_messages.clone();
let stream2_messages1 = stream2_messages.clone();
let stream2_messages2 = stream2_messages.clone();
let (stream1, stream2) = if !is_forward_direction {
swap(stream1, stream2)
} else {
(stream1, stream2)
};
try_join!(
tokio::spawn(async move {
let framed_stream = Framed::new(
stream1,
GenericCodec::<String>::new(),
);
let (
mut sink,
mut source,
) = framed_stream.split();
try_join!(
tokio::spawn(async move {
for message in stream1_messages1 {
sink
.send(message).await
.expect("Cannot send message.");
wait_random(5..25).await;
}
}),
tokio::spawn(async move {
let mut received_messages = vec![];
while let Some(maybe_message) = source.next().await {
let message = maybe_message
.expect("Failed to unwrap the message.");
received_messages.push(message);
wait_random(5..25).await;
if received_messages.len() == stream2_messages2.len() {
break;
}
}
assert_eq!(
received_messages,
stream2_messages2,
"Sent and received messages must match.",
);
}),
).unwrap();
}),
tokio::spawn(async move {
let framed_stream = Framed::new(
stream2,
GenericCodec::<String>::new(),
);
let (
mut sink,
mut source,
) = framed_stream.split();
try_join!(
tokio::spawn(async move {
for message in stream2_messages1 {
sink
.send(message).await
.expect("Cannot send message.");
wait_random(5..25).await;
}
}),
tokio::spawn(async move {
let mut received_messages = vec![];
while let Some(maybe_message) = source.next().await {
let message = maybe_message
.expect("Failed to unwrap the message.");
received_messages.push(message);
wait_random(5..25).await;
if received_messages.len() == stream1_messages2.len() {
break;
}
}
assert_eq!(
received_messages,
stream1_messages2,
"Sent and received messages must match.",
);
}),
).unwrap();
}),
).unwrap();
}
}
}