use std::fmt;
use std::io::ErrorKind;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Poll, Context};
use futures::{FutureExt, Future, ready};
use jsonrpc_core_client::RpcError;
use super::RpcChannelsServiceClient;
use connection_utils::{Channel, TDataReader};
mod remote_rpc_channel_event;
pub use remote_rpc_channel_event::RemoteRpcChannelEvent;
mod async_read;
mod async_write;
pub struct RpcChannel {
id: u16,
label: String,
data_source: TDataReader,
rpc_client: Arc<RpcChannelsServiceClient>,
write_async_future: Option<Pin<Box<dyn Future<Output = Result<usize, RpcError>> + Send>>>,
}
impl fmt::Debug for RpcChannel {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
return f.debug_struct("RpcChannel")
.field("id", &self.id)
.field("label", &self.label)
.finish();
}
}
impl RpcChannel {
pub fn new(
mut receiver: RemoteRpcChannelEvent,
rpc_client: Arc<RpcChannelsServiceClient>,
) -> Self {
let data_source = receiver.on_data().expect("Cannot get on_data source.");
return RpcChannel {
rpc_client,
id: receiver.id(),
label: receiver.label().clone(),
data_source,
write_async_future: None,
};
}
fn poll_async_write_future(
mut self: std::pin::Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<usize, std::io::Error>> {
let fut = self.write_async_future
.as_mut()
.expect("Cannot get mutable reference of the async write future.");
let res = ready!(fut.poll_unpin(cx));
self.write_async_future.take().unwrap();
return match res {
Ok(bytes_sent) => Poll::Ready(Ok(bytes_sent)),
Err(rpc_error) => Poll::Ready(Err(from_rpc_error(rpc_error))),
};
}
}
fn from_rpc_error(rpc_error: RpcError) -> std::io::Error {
let err: Box<dyn std::error::Error> = Box::new(rpc_error).into();
let res = std::io::Error::new(
ErrorKind::Other,
err.to_string(),
);
return res;
}
impl Channel for RpcChannel {
fn id(&self) -> u16 {
return self.id;
}
fn label(&self) -> &String {
return &self.label;
}
}
#[cfg(test)]
pub mod tests {
use tokio::io::AsyncReadExt;
pub use super::RemoteRpcChannelEvent;
pub async fn assert_rpc_channel_receiver_receives_data(
mut channel: RemoteRpcChannelEvent,
test_data: String,
) {
let mut on_data = channel.on_data().unwrap();
let mut received_data = String::new();
let mut data = [0; 1024];
loop {
let bytes_read = on_data
.read(&mut data).await
.expect("Cannot receive message.");
let message_str = std::str::from_utf8(&data[..bytes_read])
.expect("Cannot parse UTF8 message.")
.to_string();
received_data = format!("{}{}", &received_data, message_str);
if received_data.len() == test_data.len() {
assert_eq!(
received_data,
test_data,
"Sent and received data must match.",
);
break;
}
}
}
mod data_transfer {
pub use super::RemoteRpcChannelEvent;
mod receive_data {
use rstest::rstest;
use tokio::try_join;
use cs_utils::{random_number, random_str, random_str_rg};
pub use super::RemoteRpcChannelEvent;
#[rstest]
#[case::size_8_32(8, 32)]
#[case::size_128_512(128, 512)]
#[case::size_2048_4096(2048, 4096)]
#[case::size_4096_8192(4096, 8192)]
#[case::size_8192_16384(8192, 16384)]
#[tokio::test]
async fn receives_data(
#[case] str_min_size: usize,
#[case] str_max_size: usize,
) {
use tokio::io::{duplex, AsyncWriteExt};
let channel_id = random_number(0..u16::MAX);
let label = format!("rpc-channel-#{}", random_str(4));
let (mut on_data_sink, data_duplex_stream) = duplex(str_max_size + 1);
let channel = RemoteRpcChannelEvent::new(
channel_id,
label.clone(),
data_duplex_stream,
);
let test_data = vec![
random_str_rg(str_min_size..=str_max_size),
random_str_rg(str_min_size..=str_max_size),
random_str_rg(str_min_size..=str_max_size),
random_str_rg(str_min_size..=str_max_size),
random_str_rg(str_min_size..=str_max_size),
random_str_rg(str_min_size..=str_max_size),
random_str_rg(str_min_size..=str_max_size),
].join("");
let data_to_send = test_data.clone();
try_join!(
tokio::spawn(async move {
let mut i = 0;
let data = data_to_send.as_bytes().to_vec();
while i < data_to_send.len() {
let message_len = random_number(str_min_size..=str_max_size) / 2;
let message_len = if i + message_len < data.len() {
i + message_len
} else {
data.len()
};
let bytes_sent = on_data_sink
.write(&data[i..message_len]).await
.expect("Cannot send a message.");
assert!(
bytes_sent > 0,
"No bytes sent.",
);
i += bytes_sent as usize;
}
data_to_send
}),
tokio::spawn(async move {
crate::multiplexed_connection::rpc::tests::assert_rpc_channel_receiver_receives_data(channel, test_data).await;
}),
).unwrap();
}
}
}
}