remoc/rch/base/
mod.rs

1//! A channel that exchanges values of arbitrary type with a remote endpoint
2//! and is primarily used as the initial channel after [establishing a connection](crate::Connect)
3//! with a remote endpoint.
4//!
5//! Each value is serialized into binary format before sending and deserialized
6//! after it has been received.
7//!
8//! The sender and receiver of this channel cannot be sent to a remote endpoint.
9//! However, you can send (objects containing) senders and receivers of other
10//! channel types (for example [mpsc](super::mpsc), [oneshot](super::oneshot) or
11//! [watch](super::watch)) via this channel to a remote endpoint.
12//!
13//! # Example
14//!
15//! In the following example the client sends a number over a base channel to the server.
16//! The server converts it into a string and sends it back over another base channel.
17//! The base channels have been obtained using a [connect function](crate::Connect).
18//!
19//! ```
20//! use remoc::prelude::*;
21//!
22//! // This would be run on the client.
23//! async fn client(mut tx: rch::base::Sender<u16>, mut rx: rch::base::Receiver<String>) {
24//!     tx.send(1).await.unwrap();
25//!     assert_eq!(rx.recv().await.unwrap(), Some("1".to_string()));
26//!
27//!     tx.send(2).await.unwrap();
28//!     assert_eq!(rx.recv().await.unwrap(), Some("2".to_string()));
29//!
30//!     tx.send(3).await.unwrap();
31//!     assert_eq!(rx.recv().await.unwrap(), Some("3".to_string()));
32//! }
33//!
34//! // This would be run on the server.
35//! async fn server(mut tx: rch::base::Sender<String>, mut rx: rch::base::Receiver<u16>) {
36//!     while let Some(number) = rx.recv().await.unwrap() {
37//!         tx.send(number.to_string()).await.unwrap();
38//!     }
39//! }
40//! # tokio_test::block_on(remoc::doctest::client_server_bidir(client, server));
41//! ```
42//!
43
44use serde::{Deserialize, Serialize, de::DeserializeOwned};
45use std::{error::Error, fmt};
46
47mod io;
48mod receiver;
49mod sender;
50
51pub use receiver::{PortDeserializer, Receiver, RecvError};
52pub use sender::{Closed, PortSerializer, SendError, SendErrorKind, Sender};
53
54use crate::{RemoteSend, chmux, codec};
55
/// Chunk queue length for big data (de-)serialization.
///
/// NOTE(review): the constant is not referenced in the visible part of this
/// module; presumably it is consumed by the `sender` / `receiver` submodules
/// and counts queued chunks — confirm before changing the value.
const BIG_DATA_CHUNK_QUEUE: usize = 32;
58
/// Limit for counting big data instances.
///
/// NOTE(review): declared as `i8`, so whatever counter it bounds is presumably
/// a signed 8-bit value; the counting logic is not visible in this part of the
/// module — confirm against the `sender` / `receiver` submodules.
const BIG_DATA_LIMIT: i8 = 16;
61
/// Creating the remote channel failed.
///
/// This is the error type returned by [`connect`]. Each variant identifies
/// which step of the channel setup went wrong. `From` conversions from the
/// wrapped [chmux] error types exist, so those errors can be propagated
/// with `?`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ConnectError {
    /// The connect request failed.
    ///
    /// Holds the error reported by the [chmux] client.
    Connect(chmux::ConnectError),
    /// Listening for the remote connect request failed.
    ///
    /// Holds the error reported by the [chmux] listener.
    Listen(chmux::ListenerError),
    /// The remote endpoint did not send a connect request.
    ///
    /// [`connect`] reports this when accepting from the listener succeeds
    /// but yields no connection.
    NoConnectRequest,
}
72
73impl fmt::Display for ConnectError {
74    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
75        match self {
76            ConnectError::Connect(err) => write!(f, "connect error: {err}"),
77            ConnectError::Listen(err) => write!(f, "listen error: {err}"),
78            ConnectError::NoConnectRequest => write!(f, "no connect request received"),
79        }
80    }
81}
82
// Marks `ConnectError` as a standard error type. The body is empty, so every
// `Error` method keeps its default implementation; the wrapped chmux error is
// exposed through the `Display` text rather than through `source()`.
impl Error for ConnectError {}
84
85impl From<chmux::ConnectError> for ConnectError {
86    fn from(err: chmux::ConnectError) -> Self {
87        Self::Connect(err)
88    }
89}
90
91impl From<chmux::ListenerError> for ConnectError {
92    fn from(err: chmux::ListenerError) -> Self {
93        Self::Listen(err)
94    }
95}
96
97/// Create a remote channel over an existing [chmux] connection.
98///
99/// This will send a connect request over the client and accept
100/// one connection request from the listener.
101///
102/// Other connections may coexist on the chmux connection.
103pub async fn connect<Tx, Rx, Codec>(
104    client: &chmux::Client, listener: &mut chmux::Listener,
105) -> Result<(Sender<Tx, Codec>, Receiver<Rx, Codec>), ConnectError>
106where
107    Tx: RemoteSend,
108    Rx: RemoteSend,
109    Codec: codec::Codec,
110{
111    let (client_sr, listener_sr) = tokio::join!(client.connect(), listener.accept());
112    let (raw_sender, _) = client_sr?;
113    let (_, raw_receiver) = listener_sr?.ok_or(ConnectError::NoConnectRequest)?;
114    Ok((Sender::new(raw_sender), Receiver::new(raw_receiver)))
115}
116
/// Extensions for base channels.
///
/// The generic parameters mirror those of the [`Sender`] / [`Receiver`] pair
/// that the method returns: `T` is the item type and `Codec` the codec.
pub trait BaseExt<T, Codec> {
    /// Sets the maximum item size for the channel.
    ///
    /// Takes `self` by value and returns the sender/receiver pair, so the call
    /// can be chained onto the expression that produced the channel.
    ///
    /// NOTE(review): the unit of `max_item_size` is presumably bytes — confirm
    /// against `Sender::set_max_item_size` / `Receiver::set_max_item_size`.
    fn with_max_item_size(self, max_item_size: usize) -> (Sender<T, Codec>, Receiver<T, Codec>);
}
122
123impl<T, Codec> BaseExt<T, Codec> for (Sender<T, Codec>, Receiver<T, Codec>)
124where
125    T: Serialize + DeserializeOwned + Send + 'static,
126    Codec: codec::Codec,
127{
128    fn with_max_item_size(self, max_item_size: usize) -> (Sender<T, Codec>, Receiver<T, Codec>) {
129        let (mut tx, mut rx) = self;
130        tx.set_max_item_size(max_item_size);
131        rx.set_max_item_size(max_item_size);
132        (tx, rx)
133    }
134}