1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
//! Initial connection functions.

use bytes::Bytes;
use futures::{future::BoxFuture, Future, FutureExt, Sink, Stream, TryStreamExt};
use std::{
    convert::TryInto,
    error::Error,
    fmt, io,
    pin::Pin,
    task::{Context, Poll},
};
use tokio::io::{AsyncRead, AsyncWrite, BufReader, BufWriter};
use tokio_util::codec::LengthDelimitedCodec;

use crate::{
    chmux::{ChMux, ChMuxError},
    codec,
    rch::base,
    RemoteSend,
};

/// Error that occurred while establishing a connection over a physical transport.
///
/// Both type parameters are passed on unchanged to the wrapped [`ChMuxError`].
#[cfg_attr(docsrs, doc(cfg(feature = "rch")))]
#[derive(Debug, Clone)]
pub enum ConnectError<TransportSinkError, TransportStreamError> {
    /// Establishing the [chmux](crate::chmux) connection failed.
    ChMux(ChMuxError<TransportSinkError, TransportStreamError>),
    /// Opening the initial [remote](crate::rch::base) channel failed.
    RemoteConnect(base::ConnectError),
}

// Renders a short prefix naming the stage that failed, followed by the wrapped error.
impl<TransportSinkError: fmt::Display, TransportStreamError: fmt::Display> fmt::Display
    for ConnectError<TransportSinkError, TransportStreamError>
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::ChMux(err) => f.write_fmt(format_args!("chmux error: {err}")),
            Self::RemoteConnect(err) => f.write_fmt(format_args!("channel connect failed: {err}")),
        }
    }
}

// Marker implementation: no trait method is overridden, so `source()` keeps its default.
impl<TransportSinkError: Error, TransportStreamError: Error> Error
    for ConnectError<TransportSinkError, TransportStreamError>
{
}

impl<TransportSinkError, TransportStreamError> From<ChMuxError<TransportSinkError, TransportStreamError>>
    for ConnectError<TransportSinkError, TransportStreamError>
{
    fn from(err: ChMuxError<TransportSinkError, TransportStreamError>) -> Self {
        Self::ChMux(err)
    }
}

impl<TransportSinkError, TransportStreamError> From<base::ConnectError>
    for ConnectError<TransportSinkError, TransportStreamError>
{
    fn from(err: base::ConnectError) -> Self {
        Self::RemoteConnect(err)
    }
}

/// Methods for establishing a connection over a physical transport.
///
/// You must poll the returned [Connect] future or spawn it onto a task for the connection to work.
///
/// # Physical transport
///
/// All functionality in Remoc requires that a connection over a physical
/// transport is established.
/// The underlying transport can either be of packet type (implementing [Sink] and [Stream])
/// or a socket-like object (implementing [AsyncRead] and [AsyncWrite]).
/// In both cases it must be ordered and reliable.
/// That means that all packets must arrive in the order they have been sent
/// and no packets must be lost.
/// The maximum packet size can be limited, see [the configuration](crate::Cfg) for that.
///
/// [TCP] is an example of an underlying transport that is suitable.
/// But there are many more candidates, for example, [UNIX domain sockets],
/// [pipes between processes], [serial links], [Bluetooth L2CAP streams], etc.
///
/// The [connect functions](Connect) are used to establish a
/// [base channel connection](crate::rch::base) over a physical transport.
/// Then, additional channels can be opened by sending either the sender or receiver
/// half of them over the established base channel or another connected channel.
/// See the examples in the [remote channel module](crate::rch) for details.
///
/// [Sink]: futures::Sink
/// [Stream]: futures::Stream
/// [AsyncRead]: tokio::io::AsyncRead
/// [AsyncWrite]: tokio::io::AsyncWrite
/// [TCP]: https://docs.rs/tokio/1.12.0/tokio/net/struct.TcpStream.html
/// [UNIX domain sockets]: https://docs.rs/tokio/1.12.0/tokio/net/struct.UnixStream.html
/// [pipes between processes]: https://docs.rs/tokio/1.12.0/tokio/process/struct.Child.html
/// [serial links]: https://docs.rs/tokio-serial/5.4.1/tokio_serial/
/// [Bluetooth L2CAP streams]: https://docs.rs/bluer/0.10.4/bluer/l2cap/struct.Stream.html
///
/// # Convenience functions
///
/// Methods from the [ConnectExt](crate::ConnectExt) trait can be used on the return values
/// of all connect methods.
/// They streamline connection handling when a single value, such as an [RTC](crate::rtc) client,
/// should be exchanged over the connection and the flexibility of a base channel is not necessary.
///
/// # Example
///
/// In the following example the server listens on TCP port 9875 and the client connects to it.
/// Then both ends establish a Remoc connection using [Connect::io] over the TCP connection.
/// The connection dispatchers are spawned onto new tasks and the `client` and `server` functions
/// are called with the established [base channel](crate::rch::base).
///
/// ```
/// use std::net::Ipv4Addr;
/// use tokio::net::{TcpStream, TcpListener};
/// use remoc::prelude::*;
///
/// #[tokio::main]
/// async fn main() {
///     // For demonstration we run both client and server in
///     // the same process. In real life connect_client() and
///     // connect_server() would run on different machines.
///     futures::join!(connect_client(), connect_server());
/// }
///
/// // This would be run on the client.
/// async fn connect_client() {
///     // Wait for server to be ready.
///     tokio::time::sleep(std::time::Duration::from_secs(1)).await;
///
///     // Establish TCP connection.
///     let socket = TcpStream::connect((Ipv4Addr::LOCALHOST, 9875)).await.unwrap();
///     let (socket_rx, socket_tx) = socket.into_split();
///
///     // Establish Remoc connection over TCP.
///     let (conn, tx, rx) =
///         remoc::Connect::io(remoc::Cfg::default(), socket_rx, socket_tx).await.unwrap();
///     tokio::spawn(conn);
///
///     // Run client.
///     client(tx, rx).await;
/// }
///
/// // This would be run on the server.
/// async fn connect_server() {
///     // Listen for incoming TCP connection.
///     let listener = TcpListener::bind((Ipv4Addr::LOCALHOST, 9875)).await.unwrap();
///     let (socket, _) = listener.accept().await.unwrap();
///     let (socket_rx, socket_tx) = socket.into_split();
///
///     // Establish Remoc connection over TCP.
///     let (conn, tx, rx) =
///         remoc::Connect::io(remoc::Cfg::default(), socket_rx, socket_tx).await.unwrap();
///     tokio::spawn(conn);
///
///     // Run server.
///     server(tx, rx).await;
/// }
///
/// // This would be run on the client.
/// async fn client(mut tx: rch::base::Sender<u16>, mut rx: rch::base::Receiver<String>) {
///     tx.send(1).await.unwrap();
///     assert_eq!(rx.recv().await.unwrap(), Some("1".to_string()));
/// }
///
/// // This would be run on the server.
/// async fn server(mut tx: rch::base::Sender<String>, mut rx: rch::base::Receiver<u16>) {
///     while let Some(number) = rx.recv().await.unwrap() {
///         tx.send(number.to_string()).await.unwrap();
///     }
/// }
/// ```
#[cfg_attr(docsrs, doc(cfg(feature = "rch")))]
#[must_use = "You must poll or spawn the Connect future for the connection to work."]
pub struct Connect<'transport, TransportSinkError, TransportStreamError>(
    // Boxed future that the `Future` implementation of this type polls;
    // `framed` stores the chmux dispatcher (`mux.run()`) in it.
    BoxFuture<'transport, Result<(), ChMuxError<TransportSinkError, TransportStreamError>>>,
);

impl<'transport, TransportSinkError, TransportStreamError>
    Connect<'transport, TransportSinkError, TransportStreamError>
{
    /// Connects over a framed transport, i.e. a [sink](Sink) and a [stream](Stream) of binary data,
    /// and hands back a remote [sender](base::Sender) and [receiver](base::Receiver).
    ///
    /// A [chmux](crate::chmux) connection is set up over the transport first; a remote channel
    /// is then opened over it.
    ///
    /// The returned [Connect] future must be polled or spawned, otherwise the connection will not work.
    ///
    /// # Errors
    /// Yields [`ConnectError::ChMux`] when chmux fails and [`ConnectError::RemoteConnect`]
    /// when the remote channel cannot be opened.
    ///
    /// # Panics
    /// Panics if the chmux configuration is invalid.
    pub async fn framed<TransportSink, TransportStream, Tx, Rx, Codec>(
        cfg: crate::Cfg, transport_sink: TransportSink, transport_stream: TransportStream,
    ) -> Result<
        (
            Connect<'transport, TransportSinkError, TransportStreamError>,
            base::Sender<Tx, Codec>,
            base::Receiver<Rx, Codec>,
        ),
        ConnectError<TransportSinkError, TransportStreamError>,
    >
    where
        TransportSink: Sink<Bytes, Error = TransportSinkError> + Send + Sync + Unpin + 'transport,
        TransportSinkError: Error + Send + Sync + 'static,
        TransportStream: Stream<Item = Result<Bytes, TransportStreamError>> + Send + Sync + Unpin + 'transport,
        TransportStreamError: Error + Send + Sync + 'static,
        Tx: RemoteSend,
        Rx: RemoteSend,
        Codec: codec::Codec,
    {
        let (mux, client, mut listener) = ChMux::new(cfg, transport_sink, transport_stream).await?;
        let dispatcher = mux.run().boxed();
        let mut connection = Self(dispatcher);

        // The dispatcher is polled alongside the base channel connect. `biased` checks it
        // first, and only a dispatcher *error* ends the attempt early; the pattern does not
        // match a dispatcher that finished with `Ok(())`.
        tokio::select! {
            biased;
            Err(err) = &mut connection => Err(ConnectError::ChMux(err)),
            result = base::connect(&client, &mut listener) => {
                let (tx, rx) = result?;
                Ok((connection, tx, rx))
            }
        }
    }
}

impl<'transport> Connect<'transport, io::Error, io::Error> {
    /// Connects over an IO transport (an [AsyncRead] and an [AsyncWrite]) and
    /// hands back a remote [sender](base::Sender) and [receiver](base::Receiver).
    ///
    /// A [chmux](crate::chmux) connection is established over the transport and a remote channel is opened.
    /// Because the transport is unframed, every chmux packet is sent with a length header in front of it.
    ///
    /// This method does not buffer reads and writes, so performance may be suboptimal
    /// if the underlying reader and writer are unbuffered.
    /// Use [io_buffered](Self::io_buffered) in that case.
    ///
    /// The returned [Connect] future must be polled or spawned, otherwise the connection will not work.
    ///
    /// # Panics
    /// Panics if the chmux configuration is invalid.
    pub async fn io<Read, Write, Tx, Rx, Codec>(
        cfg: crate::Cfg, input: Read, output: Write,
    ) -> Result<
        (Connect<'transport, io::Error, io::Error>, base::Sender<Tx, Codec>, base::Receiver<Rx, Codec>),
        ConnectError<io::Error, io::Error>,
    >
    where
        Read: AsyncRead + Send + Sync + Unpin + 'transport,
        Write: AsyncWrite + Send + Sync + Unpin + 'transport,
        Tx: RemoteSend,
        Rx: RemoteSend,
        Codec: codec::Codec,
    {
        // Incoming frames are capped at the configured maximum frame length;
        // the conversion panics if that value does not fit into `usize`.
        let recv_frame_limit: usize = cfg.max_frame_length().try_into().unwrap();

        // Both directions share the framing: a 4-byte little-endian length field.
        let mut framing = LengthDelimitedCodec::builder();
        framing.little_endian().length_field_length(4);

        // Writer side is built first with the largest length a 4-byte field can express,
        // then the same builder is re-limited for the reader side.
        let transport_sink = framing.max_frame_length(u32::MAX as _).new_write(output);
        let transport_stream =
            framing.max_frame_length(recv_frame_limit).new_read(input).map_ok(|frame| frame.freeze());

        Self::framed(cfg, transport_sink, transport_stream).await
    }

    /// Connects over an IO transport (an [AsyncRead] and an [AsyncWrite]) with buffering and
    /// hands back a remote [sender](base::Sender) and [receiver](base::Receiver).
    ///
    /// A [chmux](crate::chmux) connection is established over the transport and a remote channel is opened.
    /// Because the transport is unframed, every chmux packet is sent with a length header in front of it.
    ///
    /// Reads and writes are buffered internally; `buffer` is used as the capacity of
    /// both the read buffer and the write buffer.
    ///
    /// The returned [Connect] future must be polled or spawned, otherwise the connection will not work.
    ///
    /// # Panics
    /// Panics if the chmux configuration is invalid.
    pub async fn io_buffered<Read, Write, Tx, Rx, Codec>(
        cfg: crate::Cfg, input: Read, output: Write, buffer: usize,
    ) -> Result<
        (Connect<'transport, io::Error, io::Error>, base::Sender<Tx, Codec>, base::Receiver<Rx, Codec>),
        ConnectError<io::Error, io::Error>,
    >
    where
        Read: AsyncRead + Send + Sync + Unpin + 'transport,
        Write: AsyncWrite + Send + Sync + Unpin + 'transport,
        Tx: RemoteSend,
        Rx: RemoteSend,
        Codec: codec::Codec,
    {
        // Wrap both halves and delegate to the unbuffered variant.
        Self::io(cfg, BufReader::with_capacity(buffer, input), BufWriter::with_capacity(buffer, output)).await
    }
}

impl<'transport, TransportSinkError, TransportStreamError> Future
    for Connect<'transport, TransportSinkError, TransportStreamError>
{
    /// Outcome of the connection once it has terminated.
    type Output = Result<(), ChMuxError<TransportSinkError, TransportStreamError>>;

    /// Polling this future runs the dispatcher for this connection.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // `Connect` only holds a pinned box and is therefore `Unpin`,
        // so the outer pin can be taken apart to reach the inner future.
        let this = self.get_mut();
        this.0.as_mut().poll(cx)
    }
}