1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
//! A channel that exchanges values of arbitrary type with a remote endpoint
//! and is primarily used as the initial channel after [establishing a connection](crate::Connect)
//! with a remote endpoint.
//!
//! Each value is serialized into binary format before sending and deserialized
//! after it has been received.
//!
//! The sender and receiver of this channel cannot be sent to a remote endpoint.
//! However, you can send (objects containing) senders and receivers of other
//! channel types (for example [mpsc](super::mpsc), [oneshot](super::oneshot) or
//! [watch](super::watch)) via this channel to a remote endpoint.
//!
//! # Example
//!
//! In the following example the client sends a number over a base channel to the server.
//! The server converts it into a string and sends it back over another base channel.
//! The base channels have been obtained using a [connect function](crate::Connect).
//!
//! ```
//! use remoc::prelude::*;
//!
//! // This would be run on the client.
//! async fn client(mut tx: rch::base::Sender<u16>, mut rx: rch::base::Receiver<String>) {
//!     tx.send(1).await.unwrap();
//!     assert_eq!(rx.recv().await.unwrap(), Some("1".to_string()));
//!
//!     tx.send(2).await.unwrap();
//!     assert_eq!(rx.recv().await.unwrap(), Some("2".to_string()));
//!
//!     tx.send(3).await.unwrap();
//!     assert_eq!(rx.recv().await.unwrap(), Some("3".to_string()));
//! }
//!
//! // This would be run on the server.
//! async fn server(mut tx: rch::base::Sender<String>, mut rx: rch::base::Receiver<u16>) {
//!     while let Some(number) = rx.recv().await.unwrap() {
//!         tx.send(number.to_string()).await.unwrap();
//!     }
//! }
//! # tokio_test::block_on(remoc::doctest::client_server_bidir(client, server));
//! ```
//!

use serde::{de::DeserializeOwned, Deserialize, Serialize};
use std::{error::Error, fmt};

mod io;
mod receiver;
mod sender;

pub use receiver::{PortDeserializer, Receiver, RecvError};
pub use sender::{Closed, PortSerializer, SendError, SendErrorKind, Sender};

use crate::{chmux, codec, RemoteSend};

/// Chunk queue length for big data (de-)serialization.
///
/// Not referenced in this part of the file; being module-private, it is only
/// reachable from this module and its submodules (`io`, `receiver`, `sender`).
// NOTE(review): presumably the capacity (in chunks) of an internal queue — confirm at the use sites.
const BIG_DATA_CHUNK_QUEUE: usize = 32;

/// Limit for counting big data instances.
///
/// Not referenced in this part of the file; being module-private, it is only
/// reachable from this module and its submodules (`io`, `receiver`, `sender`).
// NOTE(review): the signed `i8` type suggests the counter may move in both directions — confirm at the use sites.
const BIG_DATA_LIMIT: i8 = 16;

/// Creating the remote channel failed.
///
/// This is the error type returned by [`connect`]. The wrapped chmux errors
/// can be converted into it via `From`, which is what allows `connect` to use
/// the `?` operator on them.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ConnectError {
    /// The connect request failed.
    ///
    /// Carries the error reported by the chmux client for the outgoing
    /// connect request.
    Connect(chmux::ConnectError),
    /// Listening for the remote connect request failed.
    ///
    /// Carries the error reported by the chmux listener while accepting
    /// the incoming connect request.
    Listen(chmux::ListenerError),
    /// The remote endpoint did not send a connect request.
    ///
    /// Produced by [`connect`] when the listener's accept call completes
    /// successfully but yields no request (`None`).
    NoConnectRequest,
}

impl fmt::Display for ConnectError {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            ConnectError::Connect(err) => write!(f, "connect error: {err}"),
            ConnectError::Listen(err) => write!(f, "listen error: {err}"),
            ConnectError::NoConnectRequest => write!(f, "no connect request received"),
        }
    }
}

// Relies entirely on the default `Error` methods: no `source()` is provided,
// the wrapped chmux error is instead included in the `Display` output.
impl Error for ConnectError {}

impl From<chmux::ConnectError> for ConnectError {
    fn from(err: chmux::ConnectError) -> Self {
        Self::Connect(err)
    }
}

impl From<chmux::ListenerError> for ConnectError {
    fn from(err: chmux::ListenerError) -> Self {
        Self::Listen(err)
    }
}

/// Establishes a base channel on top of an existing [chmux] connection.
///
/// A connect request is issued through `client` while, concurrently, one
/// incoming connect request is accepted from `listener`. The sending half of
/// the outgoing connection and the receiving half of the accepted connection
/// are wrapped into the returned [`Sender`] and [`Receiver`]; the other two
/// halves are dropped.
///
/// Other connections may coexist on the chmux connection.
///
/// # Errors
///
/// * [`ConnectError::Connect`] if the outgoing connect request fails.
/// * [`ConnectError::Listen`] if accepting the incoming request fails.
/// * [`ConnectError::NoConnectRequest`] if the listener yields no request.
///
/// Both operations are always awaited to completion before any error is
/// reported; a connect failure takes precedence over a listener failure.
pub async fn connect<Tx, Rx, Codec>(
    client: &chmux::Client, listener: &mut chmux::Listener,
) -> Result<(Sender<Tx, Codec>, Receiver<Rx, Codec>), ConnectError>
where
    Tx: RemoteSend,
    Rx: RemoteSend,
    Codec: codec::Codec,
{
    // Run the outgoing request and the accept side by side.
    let (outgoing, incoming) = tokio::join!(client.connect(), listener.accept());

    // Keep only the sending half of the connection we initiated.
    let raw_tx = outgoing.map(|(tx, _)| tx)?;

    // Keep only the receiving half of the connection the remote initiated.
    let raw_rx = match incoming? {
        Some((_, rx)) => rx,
        None => return Err(ConnectError::NoConnectRequest),
    };

    Ok((Sender::new(raw_tx), Receiver::new(raw_rx)))
}

/// Extensions for base channels.
///
/// This module implements the trait for `(Sender<T, Codec>, Receiver<T, Codec>)`
/// tuples, so the method can be chained directly onto a sender/receiver pair.
pub trait BaseExt<T, Codec> {
    /// Sets the maximum item size for the channel.
    ///
    /// Consumes `self` and returns the sender/receiver pair of the channel.
    // NOTE(review): `max_item_size` is presumably measured in bytes of the serialized item — confirm against `Sender::set_max_item_size`.
    fn with_max_item_size(self, max_item_size: usize) -> (Sender<T, Codec>, Receiver<T, Codec>);
}

/// Applies channel-wide settings to a sender/receiver pair in one call.
impl<T, Codec> BaseExt<T, Codec> for (Sender<T, Codec>, Receiver<T, Codec>)
where
    T: Serialize + DeserializeOwned + Send + 'static,
    Codec: codec::Codec,
{
    /// Applies `max_item_size` to the sender and then to the receiver of the
    /// pair, and hands the updated pair back.
    fn with_max_item_size(mut self, max_item_size: usize) -> (Sender<T, Codec>, Receiver<T, Codec>) {
        // Field 0 is the sender, field 1 the receiver; both get the same limit.
        self.0.set_max_item_size(max_item_size);
        self.1.set_max_item_size(max_item_size);
        self
    }
}