logo
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
//! A channel that exchanges values of arbitrary type with a remote endpoint
//! and is primarily used as the initial channel after [establishing a connection](crate::Connect)
//! with a remote endpoint.
//!
//! Each value is serialized into binary format before sending and deserialized
//! after it has been received.
//!
//! The sender and receiver of this channel cannot be sent to a remote endpoint.
//! However, you can send (objects containing) senders and receivers of other
//! channel types (for example [mpsc](super::mpsc), [oneshot](super::oneshot) or
//! [watch](super::watch)) via this channel to a remote endpoint.
//!
//! # Example
//!
//! In the following example the client sends a number over a base channel to the server.
//! The server converts it into a string and sends it back over another base channel.
//! The base channels have been obtained using a [connect function](crate::Connect).
//!
//! ```
//! use remoc::prelude::*;
//!
//! // This would be run on the client.
//! async fn client(mut tx: rch::base::Sender<u16>, mut rx: rch::base::Receiver<String>) {
//!     tx.send(1).await.unwrap();
//!     assert_eq!(rx.recv().await.unwrap(), Some("1".to_string()));
//!
//!     tx.send(2).await.unwrap();
//!     assert_eq!(rx.recv().await.unwrap(), Some("2".to_string()));
//!
//!     tx.send(3).await.unwrap();
//!     assert_eq!(rx.recv().await.unwrap(), Some("3".to_string()));
//! }
//!
//! // This would be run on the server.
//! async fn server(mut tx: rch::base::Sender<String>, mut rx: rch::base::Receiver<u16>) {
//!     while let Some(number) = rx.recv().await.unwrap() {
//!         tx.send(number.to_string()).await.unwrap();
//!     }
//! }
//! # tokio_test::block_on(remoc::doctest::client_server_bidir(client, server));
//! ```
//!

use serde::{Deserialize, Serialize};
use std::{error::Error, fmt};

mod io;
mod receiver;
mod sender;

pub use receiver::{PortDeserializer, Receiver, RecvError};
pub use sender::{Closed, PortSerializer, SendError, SendErrorKind, Sender};

use crate::{chmux, codec, RemoteSend};

/// Chunk queue length for big data (de-)serialization.
const BIG_DATA_CHUNK_QUEUE: usize = 32;

/// Limit for counting big data instances.
const BIG_DATA_LIMIT: i8 = 16;

/// Creating the remote channel failed.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ConnectError {
    /// The connect request failed.
    Connect(chmux::ConnectError),
    /// Listening for the remote connect request failed.
    Listen(chmux::ListenerError),
    /// The remote endpoint did not send a connect request.
    NoConnectRequest,
}

impl fmt::Display for ConnectError {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            ConnectError::Connect(err) => write!(f, "connect error: {}", err),
            ConnectError::Listen(err) => write!(f, "listen error: {}", err),
            ConnectError::NoConnectRequest => write!(f, "no connect request received"),
        }
    }
}

impl Error for ConnectError {}

impl From<chmux::ConnectError> for ConnectError {
    fn from(err: chmux::ConnectError) -> Self {
        Self::Connect(err)
    }
}

impl From<chmux::ListenerError> for ConnectError {
    fn from(err: chmux::ListenerError) -> Self {
        Self::Listen(err)
    }
}

/// Create a remote channel over an existing [chmux] connection.
///
/// This will send a connect request over the client and accept
/// one connection request from the listener.
///
/// Other connections may coexist on the chmux connection.
pub async fn connect<Tx, Rx, Codec>(
    client: &chmux::Client, listener: &mut chmux::Listener,
) -> Result<(Sender<Tx, Codec>, Receiver<Rx, Codec>), ConnectError>
where
    Tx: RemoteSend,
    Rx: RemoteSend,
    Codec: codec::Codec,
{
    let (client_sr, listener_sr) = tokio::join!(client.connect(), listener.accept());
    let (raw_sender, _) = client_sr?;
    let (_, raw_receiver) = listener_sr?.ok_or(ConnectError::NoConnectRequest)?;
    Ok((Sender::new(raw_sender), Receiver::new(raw_receiver)))
}