1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
//! # WebTransport Connection
//!
//! [`Connection`] provides an essential building block for managing WebTransport
//! connections. It allows you to initiate, accept, and control data *streams*, send and receive
//! *datagrams*, monitor connection status, and interact with various aspects of your WebTransport
//! communication.
//!
//! WebTransport exchanges data either via [*streams*](crate#streams) or [*datagrams*](crate#datagrams).
//!
//! ## Streams
//! WebTransport streams provide a lightweight, ordered byte-stream abstraction.
//!
//! There are two fundamental types of streams:
//!  - *Unidirectional* streams carry data in a single direction, from the stream initiator to its peer.
//!  - *Bidirectional* streams allow for data to be sent in both directions.
//!
//! Both server and client endpoints have the capability to create an arbitrary number of streams to
//! operate concurrently.
//!
//! Each stream can be independently cancelled by either side.
//!
//! ### Examples
//! #### Open a stream
//! ```no_run
//! # use anyhow::Result;
//! # async fn foo(connection: wtransport::Connection) -> Result<()> {
//! use wtransport::Connection;
//!
//! // Open a bi-directional stream
//! let (mut send_stream, mut recv_stream) = connection.open_bi().await?.await?;
//!
//! // Send data on the stream
//! send_stream.write_all(b"Hello, wtransport!").await?;
//!
//! // Receive data from the stream
//! let mut buffer = vec![0; 1024];
//! let bytes_read = recv_stream.read(&mut buffer).await?;
//!
//! // Open a uni-directional stream (can only send data)
//! let mut send_stream = connection.open_uni().await?.await?;
//!
//! // Send data on the stream
//! send_stream.write_all(b"Hello, wtransport!").await?;
//! # Ok(())
//! # }
//! ```
//!
//! #### Accept a stream
//! ```no_run
//! # use anyhow::Result;
//! # async fn foo(connection: wtransport::Connection) -> Result<()> {
//! use wtransport::Connection;
//!
//! // Wait for the peer to open a bi-directional stream
//! let (mut send_stream, mut recv_stream) = connection.accept_bi().await?;
//!
//! // Can send and receive data on peer's stream
//! send_stream.write_all(b"Hello, wtransport!").await?;
//! let mut buffer = vec![0; 1024];
//! let bytes_read = recv_stream.read(&mut buffer).await?;
//!
//! // Wait for the peer to open a uni-directional stream (can only receive data)
//! let mut recv_stream = connection.accept_uni().await?;
//!
//! // Receive data on the stream
//! let bytes_read = recv_stream.read(&mut buffer).await?;
//! # Ok(())
//! # }
//! ```
//!
//! ## Datagrams
//! WebTransport datagrams are similar to UDP datagrams but come with an
//! added layer of security through *encryption* and *congestion control*.
//! Datagrams can arrive out of order or might not arrive at all, offering
//! flexibility in data exchange scenarios.
//!
//! Unlike streams, which operate as byte-stream abstractions, WebTransport
//! datagrams act more like messages.
//!
//! ### Examples
//! ```no_run
//! # use anyhow::Result;
//! # async fn foo(connection: wtransport::Connection) -> Result<()> {
//! use wtransport::Connection;
//!
//! // Send datagram message
//! connection.send_datagram(b"Hello, wtransport!")?;
//!
//! // Receive a datagram message
//! let message = connection.receive_datagram().await?;
//! # Ok(())
//! # }
//! ```

use crate::datagram::Datagram;
use crate::driver::utils::varint_w2q;
use crate::driver::Driver;
use crate::error::ConnectionError;
use crate::error::ExportKeyingMaterialError;
use crate::error::SendDatagramError;
use crate::stream::OpeningBiStream;
use crate::stream::OpeningUniStream;
use crate::stream::RecvStream;
use crate::stream::SendStream;
use crate::tls::Certificate;
use crate::tls::CertificateChain;
use crate::tls::HandshakeData;
use crate::SessionId;
use crate::VarInt;
use std::net::SocketAddr;
use std::time::Duration;

/// A WebTransport session connection.
///
/// A `Connection` is the handle through which the streams and datagrams of a
/// single WebTransport session are opened, accepted, sent and received.
///
/// For more details, see the [module documentation](crate::connection).
#[derive(Debug)]
pub struct Connection {
    // Underlying QUIC connection; queried directly for transport-level state
    // (close/closed, remote address, stable id, RTT, TLS-related data).
    quic_connection: quinn::Connection,
    // Driver through which the streams and datagrams of this session are
    // opened, accepted, sent and received.
    driver: Driver,
    // Identifier of this WebTransport session; passed to every driver call.
    session_id: SessionId,
}

impl Connection {
    /// Assembles a [`Connection`] from its already-built parts.
    ///
    /// The arguments are stored as-is: `quic_connection` is the underlying QUIC
    /// connection, `driver` is the driver used for stream and datagram operations,
    /// and `session_id` identifies the WebTransport session this handle refers to.
    /// No I/O or validation is performed here.
    pub(crate) fn new(
        quic_connection: quinn::Connection,
        driver: Driver,
        session_id: SessionId,
    ) -> Self {
        Self {
            quic_connection,
            driver,
            session_id,
        }
    }

    /// Waits for the remote peer to open a unidirectional stream and accepts it.
    ///
    /// The future resolves with the next incoming unidirectional stream of this
    /// session, wrapped in a [`RecvStream`] from which the peer's data can be read.
    ///
    /// # Errors
    ///
    /// Returns a [`ConnectionError`] when the driver reports a failure while
    /// waiting for the stream.
    ///
    /// # Cancel safety
    ///
    /// This method is cancel safe.
    pub async fn accept_uni(&self) -> Result<RecvStream, ConnectionError> {
        match self.driver.accept_uni(self.session_id).await {
            Ok(incoming) => Ok(RecvStream::new(incoming.into_stream())),
            Err(driver_error) => Err(ConnectionError::with_driver_error(
                driver_error,
                &self.quic_connection,
            )),
        }
    }

    /// Waits for the remote peer to open a bidirectional stream and accepts it.
    ///
    /// The future resolves with the next incoming bidirectional stream of this
    /// session as a pair: a [`SendStream`] for writing to the peer and a
    /// [`RecvStream`] for reading what the peer sends.
    ///
    /// # Errors
    ///
    /// Returns a [`ConnectionError`] when the driver reports a failure while
    /// waiting for the stream.
    ///
    /// # Cancel safety
    ///
    /// This method is cancel safe.
    pub async fn accept_bi(&self) -> Result<(SendStream, RecvStream), ConnectionError> {
        match self.driver.accept_bi(self.session_id).await {
            Ok(incoming) => {
                let halves = incoming.into_stream();
                Ok((SendStream::new(halves.0), RecvStream::new(halves.1)))
            }
            Err(driver_error) => Err(ConnectionError::with_driver_error(
                driver_error,
                &self.quic_connection,
            )),
        }
    }

    /// Starts opening a new unidirectional stream towards the remote peer.
    ///
    /// # Asynchronous Behavior
    ///
    /// Opening a stream is a two-step process, each step with its own `await`:
    ///
    /// 1. Awaiting this method covers the initial phase, which may have to wait for
    ///    the flow controller so that resources are allocated properly.
    ///    Cancelling at this point is safe.
    ///
    /// 2. Awaiting the returned [`OpeningUniStream`] initializes the WebTransport
    ///    stream itself. Dropping that future before it completes may cause the
    ///    stream to be closed during initialization.
    ///
    /// # Errors
    ///
    /// Returns a [`ConnectionError`] when the driver reports a failure during the
    /// first phase.
    ///
    /// # Example
    ///
    /// ```no_run
    /// # use wtransport::Connection;
    /// # use anyhow::Result;
    /// # async fn run(connection: Connection) -> Result<()> {
    /// let send_stream = connection.open_uni().await?.await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn open_uni(&self) -> Result<OpeningUniStream, ConnectionError> {
        let opening = self.driver.open_uni(self.session_id).await;

        opening.map_err(|driver_error| {
            ConnectionError::with_driver_error(driver_error, &self.quic_connection)
        })
    }

    /// Starts opening a new bidirectional stream towards the remote peer.
    ///
    /// # Asynchronous Behavior
    ///
    /// Opening a stream is a two-step process, each step with its own `await`:
    ///
    /// 1. Awaiting this method covers the initial phase, which may have to wait for
    ///    the flow controller so that resources are allocated properly.
    ///    Cancelling at this point is safe.
    ///
    /// 2. Awaiting the returned [`OpeningBiStream`] initializes the WebTransport
    ///    stream itself. Dropping that future before it completes may cause the
    ///    stream to be closed during initialization.
    ///
    /// # Errors
    ///
    /// Returns a [`ConnectionError`] when the driver reports a failure during the
    /// first phase.
    ///
    /// # Example
    ///
    /// ```no_run
    /// # use wtransport::Connection;
    /// # use anyhow::Result;
    /// # async fn run(connection: Connection) -> Result<()> {
    /// let (send_stream, recv_stream) = connection.open_bi().await?.await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn open_bi(&self) -> Result<OpeningBiStream, ConnectionError> {
        let opening = self.driver.open_bi(self.session_id).await;

        opening.map_err(|driver_error| {
            ConnectionError::with_driver_error(driver_error, &self.quic_connection)
        })
    }

    /// Waits for the next application datagram sent by the remote peer.
    ///
    /// The future resolves once a datagram addressed to this session is
    /// available, yielding it as a [`Datagram`].
    ///
    /// # Errors
    ///
    /// Returns a [`ConnectionError`] when the driver reports a failure while
    /// waiting for a datagram.
    ///
    /// # Example
    ///
    /// ```no_run
    /// # use wtransport::Connection;
    /// # use anyhow::Result;
    /// # async fn run(connection: Connection) -> Result<()> {
    /// let datagram = connection.receive_datagram().await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn receive_datagram(&self) -> Result<Datagram, ConnectionError> {
        match self.driver.receive_datagram(self.session_id).await {
            Ok(datagram) => Ok(datagram),
            Err(driver_error) => Err(ConnectionError::with_driver_error(
                driver_error,
                &self.quic_connection,
            )),
        }
    }

    /// Sends an application datagram to the remote peer over this connection.
    ///
    /// `payload` may be any value that can be viewed as a byte slice
    /// (`AsRef<[u8]>`), e.g. `&[u8]`, `Vec<u8>` or a byte-string literal.
    ///
    /// # Errors
    ///
    /// Returns a [`SendDatagramError`] when the driver cannot send the datagram.
    ///
    /// # Example
    ///
    /// ```no_run
    /// # use wtransport::Connection;
    /// # use anyhow::Result;
    /// # async fn run(connection: Connection) -> Result<()> {
    /// connection.send_datagram(b"Hello, wtransport!")?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn send_datagram<D>(&self, payload: D) -> Result<(), SendDatagramError>
    where
        D: AsRef<[u8]>,
    {
        let bytes: &[u8] = payload.as_ref();
        self.driver.send_datagram(self.session_id, bytes)
    }

    /// Closes the connection immediately.
    ///
    /// `error_code` is converted to its QUIC representation and handed, together
    /// with `reason`, to the underlying QUIC connection's `close`.
    pub fn close(&self, error_code: VarInt, reason: &[u8]) {
        let quic_error_code = varint_w2q(error_code);
        self.quic_connection.close(quic_error_code, reason);
    }

    /// Waits until the connection is closed, for whatever reason.
    ///
    /// Resolves with the cause reported by the underlying QUIC connection,
    /// converted into a [`ConnectionError`].
    pub async fn closed(&self) -> ConnectionError {
        let quic_reason = self.quic_connection.closed().await;
        quic_reason.into()
    }

    /// Returns the WebTransport session identifier.
    ///
    /// This is the identifier the connection was created with; it is the value
    /// passed to the driver for every stream and datagram operation.
    #[inline(always)]
    pub fn session_id(&self) -> SessionId {
        self.session_id
    }

    /// Returns the peer's UDP address, as reported by the underlying QUIC connection.
    ///
    /// **Note**: as QUIC supports migration, the remote address may change
    /// during the connection. Furthermore, when IPv6 support is enabled, IPv4
    /// addresses may be mapped to IPv6.
    #[inline(always)]
    pub fn remote_address(&self) -> SocketAddr {
        self.quic_connection.remote_address()
    }

    /// Returns a stable identifier for this connection.
    ///
    /// Peer addresses and connection IDs can change, but this value will remain
    /// fixed for the lifetime of the connection. The value is the one reported by
    /// the underlying QUIC connection.
    #[inline(always)]
    pub fn stable_id(&self) -> usize {
        self.quic_connection.stable_id()
    }

    /// Computes the maximum size of datagrams that may be passed to
    /// [`send_datagram`](Self::send_datagram).
    ///
    /// The value is the QUIC-level datagram limit minus the size of the
    /// WebTransport datagram header for this session.
    ///
    /// Returns `None` if datagrams are unsupported by the peer or disabled locally,
    /// or if the QUIC-level limit is too small to hold the datagram header (in
    /// which case no payload, not even an empty one, can be sent).
    ///
    /// This may change over the lifetime of a connection according to variation in the path MTU
    /// estimate. The peer can also enforce an arbitrarily small fixed limit, but if the peer's
    /// limit is large this is guaranteed to be a little over a kilobyte at minimum.
    ///
    /// Not necessarily the maximum size of received datagrams.
    #[inline(always)]
    pub fn max_datagram_size(&self) -> Option<usize> {
        // `checked_sub` rather than `-`: the peer may enforce a limit smaller than
        // the header, and a plain subtraction would then underflow (panic in debug
        // builds, wrap around to a huge bogus size in release builds).
        self.quic_connection
            .max_datagram_size()
            .and_then(|quic_max_size| {
                quic_max_size.checked_sub(Datagram::header_size(self.session_id))
            })
    }

    /// Returns the current best estimate of this connection's latency
    /// (round-trip-time), as reported by the underlying QUIC connection.
    #[inline(always)]
    pub fn rtt(&self) -> Duration {
        self.quic_connection.rtt()
    }

    /// Derives keying material from this connection's TLS session secrets.
    ///
    /// If both peers invoke this method with identical `label` and `context`
    /// arguments and `output` buffers of the same length, both obtain the same
    /// byte sequence in `output`. Those bytes are cryptographically strong and
    /// pseudorandom, and are suitable for use as keying material.
    ///
    /// See [RFC5705](https://tools.ietf.org/html/rfc5705) for more information.
    ///
    /// # Errors
    ///
    /// Returns [`ExportKeyingMaterialError`] when the underlying QUIC connection
    /// fails to export the keying material.
    pub fn export_keying_material(
        &self,
        output: &mut [u8],
        label: &[u8],
        context: &[u8],
    ) -> Result<(), ExportKeyingMaterialError> {
        // The explicit type pins the QUIC-level error being discarded, so a change
        // of the underlying error type is caught at compile time.
        let exported: Result<(), quinn::crypto::ExportKeyingMaterialError> = self
            .quic_connection
            .export_keying_material(output, label, context);

        exported.map_err(|_| ExportKeyingMaterialError)
    }

    /// Returns the peer's identity as a certificate chain, if available.
    ///
    /// When the underlying QUIC connection exposes a peer identity, each of its
    /// certificates is converted and the result is returned as a
    /// [`CertificateChain`]. Otherwise `None` is returned.
    ///
    /// # Panics
    ///
    /// Panics if the peer identity is not a vector of rustls certificates, or if
    /// one of the certificates cannot be built from its DER bytes.
    pub fn peer_identity(&self) -> Option<CertificateChain> {
        let identity = self.quic_connection.peer_identity()?;

        let certificates = identity
            .downcast::<Vec<rustls::Certificate>>()
            .expect("rustls certificate vector");

        Some(
            certificates
                .into_iter()
                .map(|certificate| Certificate::from_der(certificate.0).expect("valid certificate"))
                .collect(),
        )
    }

    /// Retrieves the handshake data associated with the connection.
    ///
    /// The returned [`HandshakeData`] carries the negotiated ALPN protocol and the
    /// server name, as taken from the underlying QUIC connection's handshake data.
    ///
    /// # Panics
    ///
    /// Panics if the underlying QUIC connection has no handshake data available,
    /// or if that data is not rustls handshake data.
    pub fn handshake_data(&self) -> HandshakeData {
        let raw = self
            .quic_connection
            .handshake_data()
            .expect("fully established connection");

        let details = raw
            .downcast::<quinn::crypto::rustls::HandshakeData>()
            .expect("valid downcast");

        HandshakeData {
            alpn: details.protocol,
            server_name: details.server_name,
        }
    }
}