//! A channel that exchanges values of arbitrary type with a remote endpoint
//! and is primarily used as the initial channel after [establishing a connection](crate::Connect)
//! with a remote endpoint.
//!
//! Each value is serialized into binary format before sending and deserialized
//! after it has been received.
//!
//! The sender and receiver of this channel cannot be sent to a remote endpoint.
//! However, you can send (objects containing) senders and receivers of other
//! channel types (for example [mpsc](super::mpsc), [oneshot](super::oneshot) or
//! [watch](super::watch)) via this channel to a remote endpoint.
//!
//! # Example
//!
//! In the following example the client sends a number over a base channel to the server.
//! The server converts it into a string and sends it back over another base channel.
//! The base channels have been obtained using a [connect function](crate::Connect).
//!
//! ```
//! use remoc::prelude::*;
//!
//! // This would be run on the client.
//! async fn client(mut tx: rch::base::Sender<u16>, mut rx: rch::base::Receiver<String>) {
//!     tx.send(1).await.unwrap();
//!     assert_eq!(rx.recv().await.unwrap(), Some("1".to_string()));
//!
//!     tx.send(2).await.unwrap();
//!     assert_eq!(rx.recv().await.unwrap(), Some("2".to_string()));
//!
//!     tx.send(3).await.unwrap();
//!     assert_eq!(rx.recv().await.unwrap(), Some("3".to_string()));
//! }
//!
//! // This would be run on the server.
//! async fn server(mut tx: rch::base::Sender<String>, mut rx: rch::base::Receiver<u16>) {
//!     while let Some(number) = rx.recv().await.unwrap() {
//!         tx.send(number.to_string()).await.unwrap();
//!     }
//! }
//! # tokio_test::block_on(remoc::doctest::client_server_bidir(client, server));
//! ```
//!
use serde::{Deserialize, Serialize};
use std::{error::Error, fmt};
mod io;
mod receiver;
mod sender;
pub use receiver::{PortDeserializer, Receiver, RecvError};
pub use sender::{Closed, PortSerializer, SendError, SendErrorKind, Sender};
use crate::{chmux, codec, RemoteSend};
/// Chunk queue length for big data (de-)serialization.
///
/// Not referenced in the visible part of this module; presumably read by the
/// `receiver` and `sender` submodules — TODO confirm.
const BIG_DATA_CHUNK_QUEUE: usize = 32;
/// Limit for counting big data instances.
///
/// NOTE(review): stored as a signed 8-bit integer; the reason for the signed
/// type is not visible here — check the use sites before changing it.
const BIG_DATA_LIMIT: i8 = 16;
/// Error describing why a remote channel could not be created.
///
/// This is the error type returned by [`connect`].
///
/// NOTE(review): the type derives `Serialize`/`Deserialize`; with index-based
/// codecs, reordering the variants would presumably change the serialized
/// form — confirm before reordering.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub enum ConnectError {
    /// The outgoing connect request failed; carries the chmux connect error.
    Connect(chmux::ConnectError),
    /// Listening for the incoming connect request failed; carries the chmux
    /// listener error.
    Listen(chmux::ListenerError),
    /// No connect request was received from the remote endpoint.
    NoConnectRequest,
}
impl fmt::Display for ConnectError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
ConnectError::Connect(err) => write!(f, "connect error: {}", err),
ConnectError::Listen(err) => write!(f, "listen error: {}", err),
ConnectError::NoConnectRequest => write!(f, "no connect request received"),
}
}
}
// Marker implementation: all `Error` methods keep their defaults, so `source()`
// yields `None`; a wrapped chmux error is surfaced through the `Display` text only.
impl Error for ConnectError {}
impl From<chmux::ConnectError> for ConnectError {
fn from(err: chmux::ConnectError) -> Self {
Self::Connect(err)
}
}
impl From<chmux::ListenerError> for ConnectError {
fn from(err: chmux::ListenerError) -> Self {
Self::Listen(err)
}
}
/// Establishes a remote channel on top of an existing [chmux] connection.
///
/// A connect request is issued through `client` while one incoming connect
/// request is awaited on `listener`; both futures are polled together via
/// `tokio::join!`. The sending half obtained from the outgoing connection backs
/// the returned [`Sender`], and the receiving half of the accepted connection
/// backs the returned [`Receiver`]. The two remaining halves are discarded.
///
/// Other connections may coexist on the chmux connection.
///
/// # Errors
///
/// * [`ConnectError::Connect`] if the outgoing connect request fails.
/// * [`ConnectError::Listen`] if accepting on the listener fails.
/// * [`ConnectError::NoConnectRequest`] if the listener yields no request.
///
/// If both directions fail, the connect error is the one reported.
pub async fn connect<Tx, Rx, Codec>(
    client: &chmux::Client, listener: &mut chmux::Listener,
) -> Result<(Sender<Tx, Codec>, Receiver<Rx, Codec>), ConnectError>
where
    Tx: RemoteSend,
    Rx: RemoteSend,
    Codec: codec::Codec,
{
    let (outgoing, incoming) = tokio::join!(client.connect(), listener.accept());

    // The outgoing result is inspected first so that its error takes precedence.
    let (raw_tx, _) = outgoing?;
    let (_, raw_rx) = match incoming? {
        Some(accepted) => accepted,
        None => return Err(ConnectError::NoConnectRequest),
    };

    Ok((Sender::new(raw_tx), Receiver::new(raw_rx)))
}
