remoc/rch/base/mod.rs
1//! A channel that exchanges values of arbitrary type with a remote endpoint
2//! and is primarily used as the initial channel after [establishing a connection](crate::Connect)
3//! with a remote endpoint.
4//!
5//! Each value is serialized into binary format before sending and deserialized
6//! after it has been received.
7//!
8//! The sender and receiver of this channel cannot be sent to a remote endpoint.
9//! However, you can send (objects containing) senders and receivers of other
10//! channel types (for example [mpsc](super::mpsc), [oneshot](super::oneshot) or
11//! [watch](super::watch)) via this channel to a remote endpoint.
12//!
13//! # Example
14//!
15//! In the following example the client sends a number over a base channel to the server.
16//! The server converts it into a string and sends it back over another base channel.
17//! The base channels have been obtained using a [connect function](crate::Connect).
18//!
19//! ```
20//! use remoc::prelude::*;
21//!
22//! // This would be run on the client.
23//! async fn client(mut tx: rch::base::Sender<u16>, mut rx: rch::base::Receiver<String>) {
24//! tx.send(1).await.unwrap();
25//! assert_eq!(rx.recv().await.unwrap(), Some("1".to_string()));
26//!
27//! tx.send(2).await.unwrap();
28//! assert_eq!(rx.recv().await.unwrap(), Some("2".to_string()));
29//!
30//! tx.send(3).await.unwrap();
31//! assert_eq!(rx.recv().await.unwrap(), Some("3".to_string()));
32//! }
33//!
34//! // This would be run on the server.
35//! async fn server(mut tx: rch::base::Sender<String>, mut rx: rch::base::Receiver<u16>) {
36//! while let Some(number) = rx.recv().await.unwrap() {
37//! tx.send(number.to_string()).await.unwrap();
38//! }
39//! }
40//! # tokio_test::block_on(remoc::doctest::client_server_bidir(client, server));
41//! ```
42//!
43
44use serde::{Deserialize, Serialize, de::DeserializeOwned};
45use std::{error::Error, fmt};
46
47mod io;
48mod receiver;
49mod sender;
50
51pub use receiver::{PortDeserializer, Receiver, RecvError};
52pub use sender::{Closed, PortSerializer, SendError, SendErrorKind, Sender};
53
54use crate::{RemoteSend, chmux, codec};
55
/// Chunk queue length for big data (de-)serialization.
///
/// NOTE(review): not referenced in the part of this module shown here;
/// presumably consumed by the `receiver`/`sender` submodules. Confirm there
/// whether the length counts chunks or bytes.
const BIG_DATA_CHUNK_QUEUE: usize = 32;

/// Limit for counting big data instances.
///
/// NOTE(review): declared as `i8`, so the counter it bounds is presumably
/// signed as well; verify against the use sites in the submodules.
const BIG_DATA_LIMIT: i8 = 16;
61
/// Creating the remote channel failed.
///
/// Returned by [`connect`]. The `Connect` and `Listen` variants wrap the
/// underlying [chmux] errors and can be built from them via `From`, which is
/// what allows `connect` to apply the `?` operator to both chmux results.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ConnectError {
    /// The connect request failed.
    Connect(chmux::ConnectError),
    /// Listening for the remote connect request failed.
    Listen(chmux::ListenerError),
    /// The remote endpoint did not send a connect request.
    ///
    /// Reported by `connect` when accepting from the listener yields `None`.
    NoConnectRequest,
}
72
73impl fmt::Display for ConnectError {
74 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
75 match self {
76 ConnectError::Connect(err) => write!(f, "connect error: {err}"),
77 ConnectError::Listen(err) => write!(f, "listen error: {err}"),
78 ConnectError::NoConnectRequest => write!(f, "no connect request received"),
79 }
80 }
81}
82
// No overrides: the default `Error` methods are used, so `source()` returns
// `None` and the wrapped chmux errors are surfaced only through `Display`.
impl Error for ConnectError {}
84
85impl From<chmux::ConnectError> for ConnectError {
86 fn from(err: chmux::ConnectError) -> Self {
87 Self::Connect(err)
88 }
89}
90
91impl From<chmux::ListenerError> for ConnectError {
92 fn from(err: chmux::ListenerError) -> Self {
93 Self::Listen(err)
94 }
95}
96
97/// Create a remote channel over an existing [chmux] connection.
98///
99/// This will send a connect request over the client and accept
100/// one connection request from the listener.
101///
102/// Other connections may coexist on the chmux connection.
103pub async fn connect<Tx, Rx, Codec>(
104 client: &chmux::Client, listener: &mut chmux::Listener,
105) -> Result<(Sender<Tx, Codec>, Receiver<Rx, Codec>), ConnectError>
106where
107 Tx: RemoteSend,
108 Rx: RemoteSend,
109 Codec: codec::Codec,
110{
111 let (client_sr, listener_sr) = tokio::join!(client.connect(), listener.accept());
112 let (raw_sender, _) = client_sr?;
113 let (_, raw_receiver) = listener_sr?.ok_or(ConnectError::NoConnectRequest)?;
114 Ok((Sender::new(raw_sender), Receiver::new(raw_receiver)))
115}
116
/// Extensions for base channels.
///
/// Implemented for a `(Sender, Receiver)` pair so that a setting can be
/// applied to both ends of the pair in a single chained call.
pub trait BaseExt<T, Codec> {
    /// Sets the maximum item size for the channel.
    ///
    /// Consumes the implementor and returns the sender/receiver pair.
    ///
    /// NOTE(review): `max_item_size` is presumably a size in bytes; confirm
    /// against `Sender::set_max_item_size` / `Receiver::set_max_item_size`.
    fn with_max_item_size(self, max_item_size: usize) -> (Sender<T, Codec>, Receiver<T, Codec>);
}
122
123impl<T, Codec> BaseExt<T, Codec> for (Sender<T, Codec>, Receiver<T, Codec>)
124where
125 T: Serialize + DeserializeOwned + Send + 'static,
126 Codec: codec::Codec,
127{
128 fn with_max_item_size(self, max_item_size: usize) -> (Sender<T, Codec>, Receiver<T, Codec>) {
129 let (mut tx, mut rx) = self;
130 tx.set_max_item_size(max_item_size);
131 rx.set_max_item_size(max_item_size);
132 (tx, rx)
133 }
134}