1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407
//! # WebTransport Connection
//!
//! [`Connection`] provides an essential building block for managing WebTransport
//! connections. It allows you to initiate, accept, and control data *streams*, send and receive
//! *datagrams*, monitor connection status, and interact with various aspects of your WebTransport
//! communication.
//!
//! WebTransport exchanges data either via [*streams*](crate#streams) or [*datagrams*](crate#datagrams).
//!
//! ## Streams
//! WebTransport streams provide a lightweight, ordered byte-stream abstraction.
//!
//! There are two fundamental types of streams:
//! - *Unidirectional* streams carry data in a single direction, from the stream initiator to its peer.
//! - *Bidirectional* streams allow for data to be sent in both directions.
//!
//! Both server and client endpoints have the capability to create an arbitrary number of streams to
//! operate concurrently.
//!
//! Each stream can be independently cancelled by either side.
//!
//! ### Examples
//! #### Open a stream
//! ```no_run
//! # use anyhow::Result;
//! # async fn foo(connection: wtransport::Connection) -> Result<()> {
//! use wtransport::Connection;
//!
//! // Open a bi-directional stream
//! let (mut send_stream, mut recv_stream) = connection.open_bi().await?.await?;
//!
//! // Send data on the stream
//! send_stream.write_all(b"Hello, wtransport!").await?;
//!
//! // Receive data from the stream
//! let mut buffer = vec![0; 1024];
//! let bytes_read = recv_stream.read(&mut buffer).await?;
//!
//! // Open a uni-directional stream (can only send data)
//! let mut send_stream = connection.open_uni().await?.await?;
//!
//! // Send data on the stream
//! send_stream.write_all(b"Hello, wtransport!").await?;
//! # Ok(())
//! # }
//! ```
//!
//! #### Accept a stream
//! ```no_run
//! # use anyhow::Result;
//! # async fn foo(connection: wtransport::Connection) -> Result<()> {
//! use wtransport::Connection;
//!
//! // Wait for the peer to open a bi-directional stream
//! let (mut send_stream, mut recv_stream) = connection.accept_bi().await?;
//!
//! // Can send and receive data on peer's stream
//! send_stream.write_all(b"Hello, wtransport!").await?;
//! let mut buffer = vec![0; 1024];
//! let bytes_read = recv_stream.read(&mut buffer).await?;
//!
//! // Wait for the peer to open a uni-directional stream (can only receive data)
//! let mut recv_stream = connection.accept_uni().await?;
//!
//! // Receive data on the stream
//! let bytes_read = recv_stream.read(&mut buffer).await?;
//! # Ok(())
//! # }
//! ```
//!
//! ## Datagrams
//! WebTransport datagrams are similar to UDP datagrams but come with an
//! added layer of security through *encryption* and *congestion control*.
//! Datagrams can arrive out of order or might not arrive at all, offering
//! flexibility in data exchange scenarios.
//!
//! Unlike streams, which operate as byte-stream abstractions, WebTransport
//! datagrams act more like messages.
//!
//! ### Examples
//! ```no_run
//! # use anyhow::Result;
//! # async fn foo(connection: wtransport::Connection) -> Result<()> {
//! use wtransport::Connection;
//!
//! // Send datagram message
//! connection.send_datagram(b"Hello, wtransport!")?;
//!
//! // Receive a datagram message
//! let message = connection.receive_datagram().await?;
//! # Ok(())
//! # }
//! ```
use crate::datagram::Datagram;
use crate::driver::utils::varint_w2q;
use crate::driver::Driver;
use crate::error::ConnectionError;
use crate::error::ExportKeyingMaterialError;
use crate::error::SendDatagramError;
use crate::stream::OpeningBiStream;
use crate::stream::OpeningUniStream;
use crate::stream::RecvStream;
use crate::stream::SendStream;
use crate::tls::Certificate;
use crate::tls::CertificateChain;
use crate::tls::HandshakeData;
use crate::SessionId;
use crate::VarInt;
use std::net::SocketAddr;
use std::time::Duration;
/// A WebTransport session connection.
///
/// Bundles the underlying QUIC connection with the driver and the session
/// identifier that stream and datagram operations are routed through.
///
/// For more details, see the [module documentation](crate::connection).
#[derive(Debug)]
pub struct Connection {
/// Underlying QUIC connection; queried directly for connection-level
/// operations such as closing, remote address, RTT and TLS information.
quic_connection: quinn::Connection,
/// Driver used to accept/open streams and to send/receive datagrams.
driver: Driver,
/// Identifier of the WebTransport session, passed to every driver call.
session_id: SessionId,
}
impl Connection {
pub(crate) fn new(
quic_connection: quinn::Connection,
driver: Driver,
session_id: SessionId,
) -> Self {
Self {
quic_connection,
driver,
session_id,
}
}
/// Asynchronously accepts a unidirectional stream.
///
/// This method is used to accept incoming unidirectional streams that have been initiated
/// by the remote peer.
/// It waits for the next unidirectional stream to be available, then wraps it in a
/// [`RecvStream`] that can be used to read data from the stream.
///
/// # Cancel safety
///
/// This method is cancel safe.
pub async fn accept_uni(&self) -> Result<RecvStream, ConnectionError> {
let stream = self
.driver
.accept_uni(self.session_id)
.await
.map_err(|driver_error| {
ConnectionError::with_driver_error(driver_error, &self.quic_connection)
})?
.into_stream();
Ok(RecvStream::new(stream))
}
/// Asynchronously accepts a bidirectional stream.
///
/// This method is used to accept incoming bidirectional streams that have been initiated
/// by the remote peer.
/// It waits for the next bidirectional stream to be available, then wraps it in a
/// tuple containing a [`SendStream`] for sending data and a [`RecvStream`] for receiving
/// data on the stream.
///
/// # Cancel safety
///
/// This method is cancel safe.
pub async fn accept_bi(&self) -> Result<(SendStream, RecvStream), ConnectionError> {
let stream = self
.driver
.accept_bi(self.session_id)
.await
.map_err(|driver_error| {
ConnectionError::with_driver_error(driver_error, &self.quic_connection)
})?
.into_stream();
Ok((SendStream::new(stream.0), RecvStream::new(stream.1)))
}
/// Asynchronously opens a new unidirectional stream.
///
/// This method is used to initiate the opening of a new unidirectional stream.
///
/// # Asynchronous Behavior
///
/// This method is asynchronous and involves two `await` points:
///
/// 1. The first `await` occurs during the initial phase of opening the stream, which may involve awaiting
/// the flow controller. This wait is necessary to ensure proper resource allocation and flow control.
/// It is safe to cancel this `await` point if needed.
///
/// 2. The second `await` is internal to the returned [`OpeningUniStream`] object when it is used to initialize
/// the WebTransport stream. Cancelling this latter future before it completes may result in the stream
/// being closed during initialization.
///
/// # Example
///
/// ```no_run
/// # use wtransport::Connection;
/// # use anyhow::Result;
/// # async fn run(connection: Connection) -> Result<()> {
/// let send_stream = connection.open_uni().await?.await?;
/// # Ok(())
/// # }
/// ```
pub async fn open_uni(&self) -> Result<OpeningUniStream, ConnectionError> {
self.driver
.open_uni(self.session_id)
.await
.map_err(|driver_error| {
ConnectionError::with_driver_error(driver_error, &self.quic_connection)
})
}
/// Asynchronously opens a new bidirectional stream.
///
/// This method is used to initiate the opening of a new bidirectional stream.
///
/// # Asynchronous Behavior
///
/// This method is asynchronous and involves two `await` points:
///
/// 1. The first `await` occurs during the initial phase of opening the stream, which may involve awaiting
/// the flow controller. This wait is necessary to ensure proper resource allocation and flow control.
/// It is safe to cancel this `await` point if needed.
///
/// 2. The second `await` is internal to the returned [`OpeningBiStream`] object when it is used to initialize
/// the WebTransport stream. Cancelling this latter future before it completes may result in the stream
/// being closed during initialization.
///
/// # Example
///
/// ```no_run
/// # use wtransport::Connection;
/// # use anyhow::Result;
/// # async fn run(connection: Connection) -> Result<()> {
/// let (send_stream, recv_stream) = connection.open_bi().await?.await?;
/// # Ok(())
/// # }
/// ```
pub async fn open_bi(&self) -> Result<OpeningBiStream, ConnectionError> {
self.driver
.open_bi(self.session_id)
.await
.map_err(|driver_error| {
ConnectionError::with_driver_error(driver_error, &self.quic_connection)
})
}
/// Asynchronously receives an application datagram from the remote peer.
///
/// This method is used to receive an application datagram sent by the remote
/// peer over the connection.
/// It waits for a datagram to become available and returns the received [`Datagram`].
///
/// # Example
///
/// ```no_run
/// # use wtransport::Connection;
/// # use anyhow::Result;
/// # async fn run(connection: Connection) -> Result<()> {
/// let datagram = connection.receive_datagram().await?;
/// # Ok(())
/// # }
/// ```
pub async fn receive_datagram(&self) -> Result<Datagram, ConnectionError> {
self.driver
.receive_datagram(self.session_id)
.await
.map_err(|driver_error| {
ConnectionError::with_driver_error(driver_error, &self.quic_connection)
})
}
/// Sends an application datagram to the remote peer.
///
/// This method is used to send an application datagram to the remote peer
/// over the connection.
/// The datagram payload is provided as a reference to a slice of bytes.
///
/// # Example
///
/// ```no_run
/// # use wtransport::Connection;
/// # use anyhow::Result;
/// # async fn run(connection: Connection) -> Result<()> {
/// connection.send_datagram(b"Hello, wtransport!")?;
/// # Ok(())
/// # }
/// ```
pub fn send_datagram<D>(&self, payload: D) -> Result<(), SendDatagramError>
where
D: AsRef<[u8]>,
{
self.driver.send_datagram(self.session_id, payload.as_ref())
}
/// Closes the connection immediately.
pub fn close(&self, error_code: VarInt, reason: &[u8]) {
self.quic_connection.close(varint_w2q(error_code), reason);
}
/// Waits for the connection to be closed for any reason.
pub async fn closed(&self) -> ConnectionError {
self.quic_connection.closed().await.into()
}
/// Returns the WebTransport session identifier.
#[inline(always)]
pub fn session_id(&self) -> SessionId {
self.session_id
}
/// Returns the peer's UDP address.
///
/// **Note**: as QUIC supports migration, remote address may change
/// during connection. Furthermore, when IPv6 support is enabled, IPv4
/// addresses may be mapped to IPv6.
#[inline(always)]
pub fn remote_address(&self) -> SocketAddr {
self.quic_connection.remote_address()
}
/// A stable identifier for this connection.
///
/// Peer addresses and connection IDs can change, but this value will remain
/// fixed for the lifetime of the connection.
#[inline(always)]
pub fn stable_id(&self) -> usize {
self.quic_connection.stable_id()
}
/// Computes the maximum size of datagrams that may be passed to
/// [`send_datagram`](Self::send_datagram).
///
/// Returns `None` if datagrams are unsupported by the peer or disabled locally.
///
/// This may change over the lifetime of a connection according to variation in the path MTU
/// estimate. The peer can also enforce an arbitrarily small fixed limit, but if the peer's
/// limit is large this is guaranteed to be a little over a kilobyte at minimum.
///
/// Not necessarily the maximum size of received datagrams.
#[inline(always)]
pub fn max_datagram_size(&self) -> Option<usize> {
self.quic_connection
.max_datagram_size()
.map(|quic_max_size| quic_max_size - Datagram::header_size(self.session_id))
}
/// Current best estimate of this connection's latency (round-trip-time).
#[inline(always)]
pub fn rtt(&self) -> Duration {
self.quic_connection.rtt()
}
/// Derive keying material from this connection's TLS session secrets.
///
/// When both peers call this method with the same `label` and `context`
/// arguments and `output` buffers of equal length, they will get the
/// same sequence of bytes in `output`. These bytes are cryptographically
/// strong and pseudorandom, and are suitable for use as keying material.
///
/// See [RFC5705](https://tools.ietf.org/html/rfc5705) for more information.
pub fn export_keying_material(
&self,
output: &mut [u8],
label: &[u8],
context: &[u8],
) -> Result<(), ExportKeyingMaterialError> {
self.quic_connection
.export_keying_material(output, label, context)
.map_err(|_: quinn::crypto::ExportKeyingMaterialError| ExportKeyingMaterialError)
}
/// Returns the peer's identity as a certificate chain if available.
///
/// This function returns an `Option` containing a [`CertificateChain`]. If the peer's identity
/// is available, it is converted into a `CertificateChain` and returned. If the peer's identity
/// is not available, `None` is returned.
pub fn peer_identity(&self) -> Option<CertificateChain> {
self.quic_connection.peer_identity().map(|any| {
any.downcast::<Vec<rustls::Certificate>>()
.expect("rustls certificate vector")
.into_iter()
.map(|c| Certificate::from_der(c.0).expect("valid certificate"))
.collect()
})
}
/// Retrieves handshake data associated with the connection.
pub fn handshake_data(&self) -> HandshakeData {
let hd = self
.quic_connection
.handshake_data()
.expect("fully established connection")
.downcast::<quinn::crypto::rustls::HandshakeData>()
.expect("valid downcast");
HandshakeData {
alpn: hd.protocol,
server_name: hd.server_name,
}
}
}