noq 0.18.0

General purpose QUIC transport protocol implementation
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
use std::future::Future;
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, ready};
use std::time::Duration;

use proto::{
    ClosePathError, ClosedPath, PathError, PathEvent, PathId, PathStats, PathStatus,
    SetPathStatusError, TransportErrorCode,
};
use tokio::sync::watch;
use tokio_stream::{Stream, wrappers::WatchStream};

use crate::connection::ConnectionRef;
use crate::{Runtime, WeakConnectionHandle};

/// Future produced by [`crate::Connection::open_path`]
pub struct OpenPath(OpenPathInner);

enum OpenPathInner {
    /// Opening a path in underway
    ///
    /// This might fail later on.
    Ongoing {
        opened: WatchStream<Result<(), PathError>>,
        path_id: PathId,
        conn: ConnectionRef,
    },
    /// Opening a path failed immediately
    Rejected {
        /// The error that occurred
        err: PathError,
    },
    /// The path is already open
    Ready {
        path_id: PathId,
        conn: ConnectionRef,
    },
}

impl OpenPath {
    pub(crate) fn new(
        path_id: PathId,
        opened: watch::Receiver<Result<(), PathError>>,
        conn: ConnectionRef,
    ) -> Self {
        Self(OpenPathInner::Ongoing {
            opened: WatchStream::from_changes(opened),
            path_id,
            conn,
        })
    }

    pub(crate) fn ready(path_id: PathId, conn: ConnectionRef) -> Self {
        Self(OpenPathInner::Ready { path_id, conn })
    }

    pub(crate) fn rejected(err: PathError) -> Self {
        Self(OpenPathInner::Rejected { err })
    }

    /// Returns the path ID of the new path being opened.
    ///
    /// If an error occurred before a path ID was allocated, `None` is returned.  In this
    /// case the future is ready and polling it will immediately yield the error.
    ///
    /// The returned value remains the same for the entire lifetime of this future.
    pub fn path_id(&self) -> Option<PathId> {
        match self.0 {
            OpenPathInner::Ongoing { path_id, .. } => Some(path_id),
            OpenPathInner::Rejected { .. } => None,
            OpenPathInner::Ready { path_id, .. } => Some(path_id),
        }
    }
}

impl Future for OpenPath {
    type Output = Result<Path, PathError>;
    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        match self.get_mut().0 {
            OpenPathInner::Ongoing {
                ref mut opened,
                path_id,
                ref mut conn,
            } => match ready!(Pin::new(opened).poll_next(ctx)) {
                Some(value) => {
                    Poll::Ready(value.map(|_| Path::new_unchecked(conn.clone(), path_id)))
                }
                None => {
                    // This only happens if receiving a notification change failed, this means the
                    // sender was dropped. This generally should not happen so we use a transient
                    // error
                    Poll::Ready(Err(PathError::ValidationFailed))
                }
            },
            OpenPathInner::Ready {
                path_id,
                ref mut conn,
            } => Poll::Ready(Ok(Path::new_unchecked(conn.clone(), path_id))),
            OpenPathInner::Rejected { err } => Poll::Ready(Err(err)),
        }
    }
}

/// An open network transmission within a multipath-enabled connection.
///
/// As long as a [`Path`] or [`WeakPathHandle`] is alive, it is ensured that the
/// [`PathStats`] for this path are not dropped even after the path is abandoned.
///
/// [`WeakPathHandle`]: crate::path::WeakPathHandle
#[derive(Debug)]
pub struct Path {
    id: PathId,
    conn: ConnectionRef,
}

impl Clone for Path {
    fn clone(&self) -> Self {
        self.conn
            .lock_without_waking("Path::clone")
            .increment_path_refs(self.id);
        Self {
            id: self.id,
            conn: self.conn.clone(),
        }
    }
}

impl Drop for Path {
    fn drop(&mut self) {
        let mut state = self.conn.lock_without_waking("Path::drop");
        state.decrement_path_refs(self.id);
    }
}

impl Path {
    /// Returns a [`Path`] for a path id, after checking that the path is not closed.
    pub(crate) fn new(conn: &ConnectionRef, id: PathId) -> Option<Self> {
        {
            let mut state = conn.lock_without_waking("Path::new");
            // TODO(flub): Using this to know if the path still exists is... hacky.
            state.inner.path_status(id).ok()?;
            state.increment_path_refs(id);
        }
        Some(Self {
            id,
            conn: conn.clone(),
        })
    }

    /// Returns a [`Path`] for a path id without checking if the path exists or is closed.
    fn new_unchecked(conn: ConnectionRef, id: PathId) -> Self {
        conn.lock_without_waking("Path::new_unchecked")
            .increment_path_refs(id);
        Self { id, conn }
    }

    /// Returns a [`WeakPathHandle`] for this path.
    ///
    /// Holding a [`WeakPathHandle`] does not keep a connection alive, but ensures that the
    /// path's stats are not dropped until the underlying connection is dropped, even if the
    /// path is abandoned.
    pub fn weak_handle(&self) -> WeakPathHandle {
        self.conn
            .lock_without_waking("Path::weak_handle")
            .increment_path_refs(self.id);
        WeakPathHandle {
            id: self.id,
            conn: self.conn.weak_handle(),
        }
    }

    /// The [`PathId`] of this path.
    pub fn id(&self) -> PathId {
        self.id
    }

    /// The current local [`PathStatus`] of this path.
    pub fn status(&self) -> Result<PathStatus, ClosedPath> {
        self.conn
            .lock_without_waking("path status")
            .inner
            .path_status(self.id)
    }

    /// Sets the [`PathStatus`] of this path.
    pub fn set_status(&self, status: PathStatus) -> Result<(), SetPathStatusError> {
        self.conn
            .lock_and_wake("set path status")
            .inner
            .set_path_status(self.id, status)?;
        Ok(())
    }

    /// Returns the [`PathStats`] for this path.
    pub fn stats(&self) -> PathStats {
        // The `expect` is safe:
        // - `Path` can only be created for non-closed paths.
        // - `Path` and its clones or `WeakPathHandle`s all increment the connection state's `path_ref`
        //   reference counter
        // - As long as a path is not abandoned, its stats are available from `proto::Connection`
        // - If a path is abandoned, the `crate::Connection` stores the final stats as long as
        //   the path's refcount is not 0
        // - Therefore, we always get stats here.
        self.conn
            .lock_without_waking("Path::stats")
            .path_stats(self.id)
            .expect("either path stats or discarded path stats are always set as long as Path is not dropped")
    }

    /// Closes this path.
    ///
    /// The path is immediately considered closed by the local endpoint. Once the state is removed,
    /// after a short period of time for any in-flight packets, a [`PathEvent::Abandoned`] is
    /// returned.
    pub fn close(&self) -> Result<(), ClosePathError> {
        let mut state = self.conn.lock_and_wake("close_path");
        state.inner.close_path(
            crate::Instant::now(),
            self.id,
            TransportErrorCode::APPLICATION_ABANDON_PATH.into(),
        )
    }

    /// Sets the max idle timeout for a specific path
    ///
    /// See [`TransportConfig::default_path_max_idle_timeout`] for details.
    ///
    /// Returns the previous value of the setting.
    ///
    /// [`TransportConfig::default_path_max_idle_timeout`]: crate::TransportConfig::default_path_max_idle_timeout
    pub fn set_max_idle_timeout(
        &self,
        timeout: Option<Duration>,
    ) -> Result<Option<Duration>, ClosedPath> {
        let mut state = self.conn.lock_and_wake("path_set_max_idle_timeout");
        let now = state.runtime.now();
        state.inner.set_path_max_idle_timeout(now, self.id, timeout)
    }

    /// Sets the keep_alive_interval for a specific path
    ///
    /// See [`TransportConfig::default_path_keep_alive_interval`] for details.
    ///
    /// Returns the previous value of the setting.
    ///
    /// [`TransportConfig::default_path_keep_alive_interval`]: crate::TransportConfig::default_path_keep_alive_interval
    pub fn set_keep_alive_interval(
        &self,
        interval: Option<Duration>,
    ) -> Result<Option<Duration>, ClosedPath> {
        let mut state = self.conn.lock_and_wake("path_set_keep_alive_interval");
        state.inner.set_path_keep_alive_interval(self.id, interval)
    }

    /// Track changes on our external address as reported by the peer.
    ///
    /// If the address-discovery extension is not negotiated, the stream will never return.
    pub fn observed_external_addr(&self) -> Result<AddressDiscovery, ClosedPath> {
        let state = self.conn.lock_without_waking("per_path_observed_address");
        let path_events = state.path_events.subscribe();
        let initial_value = state.inner.path_observed_address(self.id)?;
        Ok(AddressDiscovery::new(
            self.id,
            path_events,
            initial_value,
            state.runtime.clone(),
        ))
    }

    /// The peer's UDP address for this path.
    pub fn remote_address(&self) -> Result<SocketAddr, ClosedPath> {
        let state = self.conn.lock_without_waking("per_path_remote_address");
        Ok(state.inner.network_path(self.id)?.remote())
    }

    /// Ping the remote endpoint over this path.
    pub fn ping(&self) -> Result<(), ClosedPath> {
        let mut state = self.conn.lock_and_wake("ping");
        state.inner.ping_path(self.id)
    }
}

impl PartialEq for Path {
    fn eq(&self, other: &Self) -> bool {
        self.id == other.id && self.conn.stable_id() == other.conn.stable_id()
    }
}

/// Weak handle for a [`Path`] that does not keep the connection alive.
///
/// As long as a [`WeakPathHandle`] for a path exists, that path's final stats will not be dropped even if
/// the path was abandoned.
///
/// The [`WeakPathHandle`] can be upgraded to a [`Path`] as long as its [`Connection`] has not been dropped.
///
/// [`Connection`]: crate::Connection
#[derive(Debug)]
pub struct WeakPathHandle {
    id: PathId,
    conn: WeakConnectionHandle,
}

impl Clone for WeakPathHandle {
    fn clone(&self) -> Self {
        if let Some(conn) = self.conn.upgrade_to_ref() {
            conn.lock_without_waking("WeakPathHandle::clone")
                .increment_path_refs(self.id);
        }
        Self {
            id: self.id,
            conn: self.conn.clone(),
        }
    }
}

impl PartialEq for WeakPathHandle {
    fn eq(&self, other: &Self) -> bool {
        self.id == other.id && self.conn.is_same_connection(&other.conn)
    }
}

impl Eq for WeakPathHandle {}

impl Drop for WeakPathHandle {
    fn drop(&mut self) {
        if let Some(conn) = self.conn.upgrade_to_ref() {
            conn.lock_without_waking("WeakPathHandle::drop")
                .decrement_path_refs(self.id);
        }
    }
}

impl WeakPathHandle {
    /// Returns the [`PathId`] of this path.
    pub fn id(&self) -> PathId {
        self.id
    }

    /// Upgrades to a [`Path`].
    ///
    /// Returns `None` if the connection was dropped.
    pub fn upgrade(&self) -> Option<Path> {
        let conn = self.conn.upgrade_to_ref()?;
        Some(Path::new_unchecked(conn, self.id))
    }
}

/// Stream produced by [`Path::observed_external_addr`]
///
/// This will always return the external address most recently reported by the remote over this
/// path. If the extension is not negotiated, this stream will never return.
// TODO(@divma): provide a way to check if the extension is negotiated.
pub struct AddressDiscovery {
    watcher: WatchStream<SocketAddr>,
}

impl AddressDiscovery {
    pub(super) fn new(
        path_id: PathId,
        mut path_events: tokio::sync::broadcast::Receiver<PathEvent>,
        initial_value: Option<SocketAddr>,
        runtime: Arc<dyn Runtime>,
    ) -> Self {
        let (tx, rx) = watch::channel(initial_value.unwrap_or_else(||
                // if the dummy value is used, it will be ignored
                SocketAddr::new([0, 0, 0, 0].into(), 0)));
        let filter = async move {
            loop {
                match path_events.recv().await {
                    Ok(PathEvent::ObservedAddr { id, addr: observed }) if id == path_id => {
                        tx.send_if_modified(|addr| {
                            let old = std::mem::replace(addr, observed);
                            old != *addr
                        });
                    }
                    Ok(PathEvent::Discarded { id, .. }) if id == path_id => {
                        // If the path is closed, terminate the stream
                        break;
                    }
                    Ok(_) => {
                        // ignore any other event
                    }
                    Err(_) => {
                        // A lagged error should never happen since this (detached) task is
                        // constantly reading from the channel. Therefore, if an error does happen,
                        // the stream can terminate
                        break;
                    }
                }
            }
        };

        let watcher = if initial_value.is_some() {
            WatchStream::new(rx)
        } else {
            WatchStream::from_changes(rx)
        };

        runtime.spawn(Box::pin(filter));
        // TODO(@divma): check if there's a way to ensure the future ends. AbortHandle is not an
        // option
        Self { watcher }
    }
}

impl Stream for AddressDiscovery {
    type Item = SocketAddr;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        Pin::new(&mut self.watcher).poll_next(cx)
    }
}