//! Initial connection functions.
use bytes::Bytes;
use futures::{future::BoxFuture, Future, FutureExt, Sink, Stream, TryStreamExt};
use std::{
convert::TryInto,
error::Error,
fmt, io,
pin::Pin,
task::{Context, Poll},
};
use tokio::io::{AsyncRead, AsyncWrite, BufReader, BufWriter};
use tokio_util::codec::LengthDelimitedCodec;
use crate::{
chmux::{ChMux, ChMuxError},
codec,
rch::base,
RemoteSend,
};
/// Error that occurred while establishing a connection over a physical transport.
///
/// The two type parameters are the error types of the transport sink and of the
/// transport stream; they are forwarded unchanged to [`ChMuxError`].
///
/// Values of this type are created from a [`ChMuxError`] or a
/// [`base::ConnectError`] through the `From` implementations in this module.
#[cfg_attr(docsrs, doc(cfg(feature = "rch")))]
#[derive(Debug, Clone)]
pub enum ConnectError<TransportSinkError, TransportStreamError> {
/// Establishing the [chmux](crate::chmux) connection failed.
ChMux(ChMuxError<TransportSinkError, TransportStreamError>),
/// Opening the initial [remote](crate::rch::base) channel failed.
RemoteConnect(base::ConnectError),
}
/// Human-readable rendering: a short description of the stage that failed,
/// followed by the underlying cause.
impl<TransportSinkError, TransportStreamError> fmt::Display
    for ConnectError<TransportSinkError, TransportStreamError>
where
    TransportSinkError: fmt::Display,
    TransportStreamError: fmt::Display,
{
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            // The chmux connection itself could not be established.
            Self::ChMux(err) => f.write_fmt(format_args!("chmux error: {err}")),
            // The chmux connection was up, but opening the base channel failed.
            Self::RemoteConnect(err) => f.write_fmt(format_args!("channel connect failed: {err}")),
        }
    }
}
/// Makes [`ConnectError`] usable as a standard error type whenever both
/// transport error types are standard errors themselves.
impl<TransportSinkError, TransportStreamError> Error
    for ConnectError<TransportSinkError, TransportStreamError>
where
    TransportStreamError: Error,
    TransportSinkError: Error,
{
    // Intentionally empty: every provided method of `Error` keeps its default.
}
impl<TransportSinkError, TransportStreamError> From<ChMuxError<TransportSinkError, TransportStreamError>>
for ConnectError<TransportSinkError, TransportStreamError>
{
fn from(err: ChMuxError<TransportSinkError, TransportStreamError>) -> Self {
Self::ChMux(err)
}
}
impl<TransportSinkError, TransportStreamError> From<base::ConnectError>
for ConnectError<TransportSinkError, TransportStreamError>
{
fn from(err: base::ConnectError) -> Self {
Self::RemoteConnect(err)
}
}
/// Methods for establishing a connection over a physical transport.
///
/// You must poll the returned [Connect] future or spawn it onto a task for the connection to work.
///
/// # Physical transport
///
/// All functionality in Remoc requires that a connection over a physical
/// transport is established.
/// The underlying transport can either be of packet type (implementing [Sink] and [Stream])
/// or a socket-like object (implementing [AsyncRead] and [AsyncWrite]).
/// In both cases it must be ordered and reliable.
/// That means that all packets must arrive in the order they have been sent
/// and no packets may be lost.
/// The maximum packet size can be limited, see [the configuration](crate::Cfg) for that.
///
/// [TCP] is an example of an underlying transport that is suitable.
/// But there are many more candidates, for example, [UNIX domain sockets],
/// [pipes between processes], [serial links], [Bluetooth L2CAP streams], etc.
///
/// The [connect functions](Connect) are used to establish a
/// [base channel connection](crate::rch::base) over a physical transport.
/// Then, additional channels can be opened by sending either the sender or receiver
/// half of them over the established base channel or another connected channel.
/// See the examples in the [remote channel module](crate::rch) for details.
///
/// [Sink]: futures::Sink
/// [Stream]: futures::Stream
/// [AsyncRead]: tokio::io::AsyncRead
/// [AsyncWrite]: tokio::io::AsyncWrite
/// [TCP]: https://docs.rs/tokio/1.12.0/tokio/net/struct.TcpStream.html
/// [UNIX domain sockets]: https://docs.rs/tokio/1.12.0/tokio/net/struct.UnixStream.html
/// [pipes between processes]: https://docs.rs/tokio/1.12.0/tokio/process/struct.Child.html
/// [serial links]: https://docs.rs/tokio-serial/5.4.1/tokio_serial/
/// [Bluetooth L2CAP streams]: https://docs.rs/bluer/0.10.4/bluer/l2cap/struct.Stream.html
///
/// # Convenience functions
///
/// Methods from the [ConnectExt](crate::ConnectExt) trait can be used on the return values
/// of all connect methods.
/// They streamline connection handling when a single value, such as an [RTC](crate::rtc) client,
/// should be exchanged over the connection and the flexibility of a base channel is not necessary.
///
/// # Example
///
/// In the following example the server listens on TCP port 9875 and the client connects to it.
/// Then both ends establish a Remoc connection using [Connect::io] over the TCP connection.
/// The connection dispatchers are spawned onto new tasks and the `client` and `server` functions
/// are called with the established [base channel](crate::rch::base).
///
/// ```
/// use std::net::Ipv4Addr;
/// use tokio::net::{TcpStream, TcpListener};
/// use remoc::prelude::*;
///
/// #[tokio::main]
/// async fn main() {
///     // For demonstration we run both client and server in
///     // the same process. In real life connect_client() and
///     // connect_server() would run on different machines.
///     futures::join!(connect_client(), connect_server());
/// }
///
/// // This would be run on the client.
/// async fn connect_client() {
///     // Wait for server to be ready.
///     tokio::time::sleep(std::time::Duration::from_secs(1)).await;
///
///     // Establish TCP connection.
///     let socket = TcpStream::connect((Ipv4Addr::LOCALHOST, 9875)).await.unwrap();
///     let (socket_rx, socket_tx) = socket.into_split();
///
///     // Establish Remoc connection over TCP.
///     let (conn, tx, rx) =
///         remoc::Connect::io(remoc::Cfg::default(), socket_rx, socket_tx).await.unwrap();
///     tokio::spawn(conn);
///
///     // Run client.
///     client(tx, rx).await;
/// }
///
/// // This would be run on the server.
/// async fn connect_server() {
///     // Listen for incoming TCP connection.
///     let listener = TcpListener::bind((Ipv4Addr::LOCALHOST, 9875)).await.unwrap();
///     let (socket, _) = listener.accept().await.unwrap();
///     let (socket_rx, socket_tx) = socket.into_split();
///
///     // Establish Remoc connection over TCP.
///     let (conn, tx, rx) =
///         remoc::Connect::io(remoc::Cfg::default(), socket_rx, socket_tx).await.unwrap();
///     tokio::spawn(conn);
///
///     // Run server.
///     server(tx, rx).await;
/// }
///
/// // This would be run on the client.
/// async fn client(mut tx: rch::base::Sender<u16>, mut rx: rch::base::Receiver<String>) {
///     tx.send(1).await.unwrap();
///     assert_eq!(rx.recv().await.unwrap(), Some("1".to_string()));
/// }
///
/// // This would be run on the server.
/// async fn server(mut tx: rch::base::Sender<String>, mut rx: rch::base::Receiver<u16>) {
///     while let Some(number) = rx.recv().await.unwrap() {
///         tx.send(number.to_string()).await.unwrap();
///     }
/// }
/// ```
#[cfg_attr(docsrs, doc(cfg(feature = "rch")))]
#[must_use = "You must poll or spawn the Connect future for the connection to work."]
pub struct Connect<'transport, TransportSinkError, TransportStreamError>(
// Boxed future that is polled by the `Future` implementation of `Connect`;
// `Connect::framed` stores the boxed `mux.run()` future here.
BoxFuture<'transport, Result<(), ChMuxError<TransportSinkError, TransportStreamError>>>,
);
impl<'transport, TransportSinkError, TransportStreamError>
    Connect<'transport, TransportSinkError, TransportStreamError>
{
    /// Establishes a connection over a framed transport (a [sink](Sink) and a [stream](Stream)
    /// of binary data) and returns a remote [sender](base::Sender) and [receiver](base::Receiver).
    ///
    /// A [chmux](crate::chmux) connection is set up over the transport and a remote channel
    /// is opened over it.
    ///
    /// You must poll the returned [Connect] future or spawn it for the connection to work.
    ///
    /// # Errors
    /// Returns [`ConnectError::ChMux`] if creating the chmux connection fails or if the
    /// connection dispatcher terminates with an error while the remote channel is being opened.
    /// Returns [`ConnectError::RemoteConnect`] if opening the remote channel fails.
    ///
    /// # Panics
    /// Panics if the chmux configuration is invalid.
    pub async fn framed<TransportSink, TransportStream, Tx, Rx, Codec>(
        cfg: crate::Cfg, transport_sink: TransportSink, transport_stream: TransportStream,
    ) -> Result<
        (
            Connect<'transport, TransportSinkError, TransportStreamError>,
            base::Sender<Tx, Codec>,
            base::Receiver<Rx, Codec>,
        ),
        ConnectError<TransportSinkError, TransportStreamError>,
    >
    where
        TransportSink: Sink<Bytes, Error = TransportSinkError> + Send + Sync + Unpin + 'transport,
        TransportSinkError: Error + Send + Sync + 'static,
        TransportStream: Stream<Item = Result<Bytes, TransportStreamError>> + Send + Sync + Unpin + 'transport,
        TransportStreamError: Error + Send + Sync + 'static,
        Tx: RemoteSend,
        Rx: RemoteSend,
        Codec: codec::Codec,
    {
        let (mux, client, mut listener) = ChMux::new(cfg, transport_sink, transport_stream).await?;

        // The dispatcher has to be polled while the base channel is being opened,
        // so it is driven concurrently with `base::connect` below.
        let mut dispatcher = Self(mux.run().boxed());

        tokio::select! {
            // Poll the dispatcher first so that its failure is reported with priority.
            biased;

            // Only a dispatcher *error* ends the connection attempt here.
            Err(failure) = &mut dispatcher => Err(failure.into()),

            // Channel setup finished: hand the dispatcher back to the caller on success.
            outcome = base::connect(&client, &mut listener) => {
                outcome.map(|(tx, rx)| (dispatcher, tx, rx)).map_err(Into::into)
            }
        }
    }
}
impl<'transport> Connect<'transport, io::Error, io::Error> {
    /// Establishes a connection over an IO transport (an [AsyncRead] and [AsyncWrite]) and
    /// returns a remote [sender](base::Sender) and [receiver](base::Receiver).
    ///
    /// A [chmux](crate::chmux) connection is established over the transport and a remote channel
    /// is opened. Since the transport is unframed, each chmux packet is prefixed with a
    /// 4-byte little-endian length header.
    ///
    /// This method performs no buffering of reads and writes and thus may exhibit suboptimal
    /// performance if the underlying reader and writer are unbuffered.
    /// In this case use [io_buffered](Self::io_buffered) instead.
    ///
    /// You must poll the returned [Connect] future or spawn it for the connection to work.
    ///
    /// # Panics
    /// Panics if the chmux configuration is invalid or if the configured maximum
    /// frame length cannot be converted to `usize`.
    pub async fn io<Read, Write, Tx, Rx, Codec>(
        cfg: crate::Cfg, input: Read, output: Write,
    ) -> Result<
        (Connect<'transport, io::Error, io::Error>, base::Sender<Tx, Codec>, base::Receiver<Rx, Codec>),
        ConnectError<io::Error, io::Error>,
    >
    where
        Read: AsyncRead + Send + Sync + Unpin + 'transport,
        Write: AsyncWrite + Send + Sync + Unpin + 'transport,
        Tx: RemoteSend,
        Rx: RemoteSend,
        Codec: codec::Codec,
    {
        // Received frames are limited to the maximum frame length from the configuration.
        let recv_limit: usize = cfg.max_frame_length().try_into().unwrap();

        // Receiving side: split the byte stream into frames and freeze each
        // frame into an immutable `Bytes` value.
        let framed_input = LengthDelimitedCodec::builder()
            .little_endian()
            .length_field_length(4)
            .max_frame_length(recv_limit)
            .new_read(input);
        let transport_stream = framed_input.map_ok(|frame| frame.freeze());

        // Sending side: same header layout, with `u32::MAX` as the frame length limit.
        let transport_sink = LengthDelimitedCodec::builder()
            .little_endian()
            .length_field_length(4)
            .max_frame_length(u32::MAX as _)
            .new_write(output);

        Self::framed(cfg, transport_sink, transport_stream).await
    }

    /// Establishes a buffered connection over an IO transport (an [AsyncRead] and [AsyncWrite])
    /// and returns a remote [sender](base::Sender) and [receiver](base::Receiver).
    ///
    /// A [chmux](crate::chmux) connection is established over the transport and a remote channel
    /// is opened. Since the transport is unframed, each chmux packet is prefixed with a
    /// length header.
    ///
    /// This method performs internal buffering of reads and writes: `input` is wrapped in a
    /// [BufReader] and `output` in a [BufWriter], each with a capacity of `buffer` bytes,
    /// before the connection is established via [io](Self::io).
    ///
    /// You must poll the returned [Connect] future or spawn it for the connection to work.
    ///
    /// # Panics
    /// Panics if the chmux configuration is invalid.
    pub async fn io_buffered<Read, Write, Tx, Rx, Codec>(
        cfg: crate::Cfg, input: Read, output: Write, buffer: usize,
    ) -> Result<
        (Connect<'transport, io::Error, io::Error>, base::Sender<Tx, Codec>, base::Receiver<Rx, Codec>),
        ConnectError<io::Error, io::Error>,
    >
    where
        Read: AsyncRead + Send + Sync + Unpin + 'transport,
        Write: AsyncWrite + Send + Sync + Unpin + 'transport,
        Tx: RemoteSend,
        Rx: RemoteSend,
        Codec: codec::Codec,
    {
        Self::io(
            cfg,
            BufReader::with_capacity(buffer, input),
            BufWriter::with_capacity(buffer, output),
        )
        .await
    }
}
impl<'transport, TransportSinkError, TransportStreamError> Future
    for Connect<'transport, TransportSinkError, TransportStreamError>
{
    /// Result of connection after it has been terminated.
    type Output = Result<(), ChMuxError<TransportSinkError, TransportStreamError>>;

    /// Polling this future runs the dispatcher for this connection.
    ///
    /// The call is forwarded to the boxed future stored inside [`Connect`].
    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        // `Connect` only holds a pinned box, so it is `Unpin` and a plain
        // mutable reference can be obtained from the pin.
        let this = self.get_mut();
        this.0.as_mut().poll(cx)
    }
}