use std::pin::Pin;
use tokio_util::codec::Framed;
use tokio::io::{AsyncRead, AsyncWrite};
use serde::{Serialize, de::DeserializeOwned};
use crate::{divide_stream, Channel, codecs::GenericCodec, types::TFramedAsyncDuplex};
pub fn divide_into_framed_streams<
T: Serialize + DeserializeOwned,
TAsyncDuplex: AsyncRead + AsyncWrite + Send + Unpin + ?Sized + 'static,
>(
stream: Box<TAsyncDuplex>,
) -> (TFramedAsyncDuplex<T, dyn Channel>, TFramedAsyncDuplex<T, dyn Channel>) {
let (channel1, channel2) = divide_stream(stream);
let channel1 = Framed::new(
Pin::new(channel1),
GenericCodec::<T>::new(),
);
let channel2 = Framed::new(
Pin::new(channel2),
GenericCodec::<T>::new(),
);
return (channel1, channel2);
}
#[cfg(test)]
mod tests {
use rstest::rstest;
use cs_utils::{traits::Random, futures::wait_random};
use crate::{test::{TestStreamMessage, test_framed_stream, TestOptions}, mocks::{channel_mock_pair, ChannelMockOptions}};
use super::divide_into_framed_streams;
#[rstest]
#[case(64)]
#[case(128)]
#[case(256)]
#[case(512)]
#[tokio::test]
async fn divides_stream_into_framed_channel(
#[case] data_len: usize,
) {
let options1 = ChannelMockOptions::random();
let options2 = ChannelMockOptions::random();
let (transport1, transport2) = channel_mock_pair(options1, options2);
let (
local_channel1,
remote_channel1,
) = divide_into_framed_streams::<TestStreamMessage, _>(Box::new(transport1));
let (
local_channel2,
remote_channel2,
) = divide_into_framed_streams::<TestStreamMessage, _>(Box::new(transport2));
let options1 = TestOptions::random().with_data_len(data_len);
let options2 = TestOptions::random().with_data_len(data_len);
tokio::join!(
Box::pin(async move {
wait_random(0..=50).await;
test_framed_stream(local_channel1, local_channel2, options1).await;
}),
Box::pin(async move {
wait_random(0..=50).await;
test_framed_stream(remote_channel1, remote_channel2, options2).await;
}),
);
}
}