1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134
//! A channel that exchanges values of arbitrary type with a remote endpoint
//! and is primarily used as the initial channel after [establishing a connection](crate::Connect)
//! with a remote endpoint.
//!
//! Each value is serialized into binary format before sending and deserialized
//! after it has been received.
//!
//! The sender and receiver of this channel cannot be sent to a remote endpoint.
//! However, you can send (objects containing) senders and receivers of other
//! channel types (for example [mpsc](super::mpsc), [oneshot](super::oneshot) or
//! [watch](super::watch)) via this channel to a remote endpoint.
//!
//! # Example
//!
//! In the following example the client sends a number over a base channel to the server.
//! The server converts it into a string and sends it back over another base channel.
//! The base channels have been obtained using a [connect function](crate::Connect).
//!
//! ```
//! use remoc::prelude::*;
//!
//! // This would be run on the client.
//! async fn client(mut tx: rch::base::Sender<u16>, mut rx: rch::base::Receiver<String>) {
//! tx.send(1).await.unwrap();
//! assert_eq!(rx.recv().await.unwrap(), Some("1".to_string()));
//!
//! tx.send(2).await.unwrap();
//! assert_eq!(rx.recv().await.unwrap(), Some("2".to_string()));
//!
//! tx.send(3).await.unwrap();
//! assert_eq!(rx.recv().await.unwrap(), Some("3".to_string()));
//! }
//!
//! // This would be run on the server.
//! async fn server(mut tx: rch::base::Sender<String>, mut rx: rch::base::Receiver<u16>) {
//! while let Some(number) = rx.recv().await.unwrap() {
//! tx.send(number.to_string()).await.unwrap();
//! }
//! }
//! # tokio_test::block_on(remoc::doctest::client_server_bidir(client, server));
//! ```
//!
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use std::{error::Error, fmt};
mod io;
mod receiver;
mod sender;
pub use receiver::{PortDeserializer, Receiver, RecvError};
pub use sender::{Closed, PortSerializer, SendError, SendErrorKind, Sender};
use crate::{chmux, codec, RemoteSend};
/// Chunk queue length for big data (de-)serialization.
const BIG_DATA_CHUNK_QUEUE: usize = 32;
/// Limit for counting big data instances.
const BIG_DATA_LIMIT: i8 = 16;
/// Creating the remote channel failed.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ConnectError {
/// The connect request failed.
Connect(chmux::ConnectError),
/// Listening for the remote connect request failed.
Listen(chmux::ListenerError),
/// The remote endpoint did not send a connect request.
NoConnectRequest,
}
impl fmt::Display for ConnectError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
ConnectError::Connect(err) => write!(f, "connect error: {err}"),
ConnectError::Listen(err) => write!(f, "listen error: {err}"),
ConnectError::NoConnectRequest => write!(f, "no connect request received"),
}
}
}
impl Error for ConnectError {}
impl From<chmux::ConnectError> for ConnectError {
fn from(err: chmux::ConnectError) -> Self {
Self::Connect(err)
}
}
impl From<chmux::ListenerError> for ConnectError {
fn from(err: chmux::ListenerError) -> Self {
Self::Listen(err)
}
}
/// Create a remote channel over an existing [chmux] connection.
///
/// This will send a connect request over the client and accept
/// one connection request from the listener.
///
/// Other connections may coexist on the chmux connection.
pub async fn connect<Tx, Rx, Codec>(
client: &chmux::Client, listener: &mut chmux::Listener,
) -> Result<(Sender<Tx, Codec>, Receiver<Rx, Codec>), ConnectError>
where
Tx: RemoteSend,
Rx: RemoteSend,
Codec: codec::Codec,
{
let (client_sr, listener_sr) = tokio::join!(client.connect(), listener.accept());
let (raw_sender, _) = client_sr?;
let (_, raw_receiver) = listener_sr?.ok_or(ConnectError::NoConnectRequest)?;
Ok((Sender::new(raw_sender), Receiver::new(raw_receiver)))
}
/// Extensions for base channels.
pub trait BaseExt<T, Codec> {
/// Sets the maximum item size for the channel.
fn with_max_item_size(self, max_item_size: usize) -> (Sender<T, Codec>, Receiver<T, Codec>);
}
impl<T, Codec> BaseExt<T, Codec> for (Sender<T, Codec>, Receiver<T, Codec>)
where
T: Serialize + DeserializeOwned + Send + 'static,
Codec: codec::Codec,
{
fn with_max_item_size(self, max_item_size: usize) -> (Sender<T, Codec>, Receiver<T, Codec>) {
let (mut tx, mut rx) = self;
tx.set_max_item_size(max_item_size);
rx.set_max_item_size(max_item_size);
(tx, rx)
}
}