noq-proto 0.17.0

State machine for the QUIC transport protocol
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
use identity_hash::{IdentityHashable, IntMap};

use crate::{
    Instant,
    connection::qlog::{QlogSink, QlogSinkWithTime},
};

use super::PathId;

/// Identifies a single timer tracked by the [`TimerTable`].
///
/// A timer is either connection-wide ([`ConnTimer`]) or belongs to one specific path, in
/// which case it is addressed by the path's [`PathId`] together with a [`PathTimer`].
#[derive(Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash)]
pub(crate) enum Timer {
    /// Per connection timers.
    Conn(ConnTimer),
    /// Per path timers.
    PerPath(PathId, PathTimer),
}

/// Timers that exist once per connection.
///
/// The explicit discriminants are used as indices into the connection timer array of the
/// [`TimerTable`] (via `timer as usize`), so they must stay dense and start at zero.
#[derive(Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash)]
pub(crate) enum ConnTimer {
    /// When to close the connection after no activity
    Idle = 0,
    /// When the close timer expires, the connection has been gracefully terminated.
    Close = 1,
    /// When keys are discarded because they should not be needed anymore
    KeyDiscard = 2,
    /// When to send a `PING` frame to keep the connection alive
    KeepAlive = 3,
    /// When to invalidate old CID and proactively push new one via NEW_CONNECTION_ID frame
    PushNewCid = 4,
    /// Grace period after the remote abandoned the last path.
    ///
    /// If no new path is opened before this fires, close the connection.
    /// See <https://www.ietf.org/archive/id/draft-ietf-quic-multipath-21.html#section-3.4-8>
    NoAvailablePath = 5,
    /// When to retry NAT traversal probes.
    ///
    /// Fires at initial PTO intervals to retransmit probes that got no PATH_RESPONSE.
    NatTraversalProbeRetry = 6,
}

impl ConnTimer {
    /// Every variant, listed in discriminant order.
    ///
    /// The length of this array sizes the connection timer storage of [`TimerTable`], and
    /// iterating it visits the timers in the order in which they are checked for expiry.
    /// A new variant must be added here as well.
    const VALUES: [Self; 7] = [
        Self::Idle,
        Self::Close,
        Self::KeyDiscard,
        Self::KeepAlive,
        Self::PushNewCid,
        Self::NoAvailablePath,
        Self::NatTraversalProbeRetry,
    ];
}

/// Timers that exist once per path.
///
/// The explicit discriminants are used as indices into the per-path timer array (via
/// `timer as usize`), so they must stay dense and start at zero.
#[derive(Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash)]
pub(crate) enum PathTimer {
    /// When to send an ack-eliciting probe packet or declare unacked packets lost
    LossDetection = 0,
    /// When to abandon a path after no activity
    PathIdle = 1,
    /// When to give up on validating a new path from RFC9000 migration
    PathValidationFailed = 2,
    /// When to resend an on-path path challenge deemed lost
    PathChallengeLost = 3,
    /// When to abandon a path due to failed validation.
    ///
    /// There are two situations in which we give up a path from validation.
    /// 1. When opening a path we validate it according to RFC9000 §8.2 as required by the
    ///    multipath spec. This timer is armed to time-bound that validation.
    /// 2. When validating an already opened multipath path for various reasons in which we
    ///    expect the path to either work or not work and want to respond correspondingly in
    ///    a timely manner.
    AbandonFromValidation = 4,
    /// When to send a `PING` frame to keep the path alive
    PathKeepAlive = 5,
    /// When pacing will allow us to send a packet
    Pacing = 6,
    /// When to send an immediate ACK if there are unacked ack-eliciting packets of the peer
    MaxAckDelay = 7,
    /// When to clean up state for an abandoned path
    PathDrained = 8,
}

impl PathTimer {
    /// Every variant, listed in discriminant order.
    ///
    /// The length of this array sizes the per-path timer storage, and iterating it visits
    /// the timers in the order in which they are checked for expiry. A new variant must be
    /// added here as well.
    pub(super) const VALUES: [Self; 9] = [
        Self::LossDetection,
        Self::PathIdle,
        Self::PathValidationFailed,
        Self::PathChallengeLost,
        Self::AbandonFromValidation,
        Self::PathKeepAlive,
        Self::Pacing,
        Self::MaxAckDelay,
        Self::PathDrained,
    ];
}

/// Keeps track of the nearest timeout for each `Timer`
///
/// The [`TimerTable`] is advanced with [`TimerTable::expire_before`].
#[derive(Debug, Clone, Default)]
pub(crate) struct TimerTable {
    /// Expiry of each connection-level timer, indexed by `ConnTimer as usize`.
    ///
    /// `None` means the timer is not armed.
    generic: [Option<Instant>; ConnTimer::VALUES.len()],
    /// Timers of each path, keyed by the path's id.
    ///
    /// An entry is created the first time a timer is set for a path. Entries without any
    /// armed timer are pruned while expiring path timers.
    path_timers: SmallMap<PathId, PathTimerTable, STACK_TIMERS>,
}

/// For how many paths we keep the timers on the stack, before spilling onto the heap.
///
/// This is the inline capacity of the map holding the per-path timers in [`TimerTable`].
const STACK_TIMERS: usize = 4;

/// Works like a `HashMap` but stores up to `SIZE` items on the stack.
///
/// Entries live in the inline `stack` slots while there is room. Once every slot is
/// occupied, further keys spill into the lazily allocated `heap` map. A key is stored in
/// at most one of the two places.
#[derive(Debug, Clone)]
struct SmallMap<K, V, const SIZE: usize> {
    /// Inline storage. `None` marks a free slot; free slots can appear at any position
    /// once entries have been removed.
    stack: [Option<(K, V)>; SIZE],
    /// Overflow storage, allocated the first time an insert finds all inline slots taken.
    heap: Option<IntMap<K, V>>,
}

impl<K, V, const SIZE: usize> Default for SmallMap<K, V, SIZE> {
    /// Creates an empty map: all inline slots free and no heap map allocated.
    fn default() -> Self {
        Self {
            stack: std::array::from_fn(|_| None),
            heap: None,
        }
    }
}

impl<K, V, const SIZE: usize> SmallMap<K, V, SIZE>
where
    K: std::cmp::Eq + std::hash::Hash + IdentityHashable,
{
    /// Inserts `value` under `key`, returning the value previously stored for `key`, if any.
    ///
    /// Inline slots are preferred: a key that currently lives on the heap is moved into a
    /// free inline slot when one is available.
    fn insert(&mut self, key: K, value: V) -> Option<V> {
        // The key may already sit in a slot *behind* a free one (removals leave holes), so
        // every occupied slot has to be checked before a free slot is claimed. Claiming
        // the first free slot right away would store the key twice.
        if let Some((_, existing)) = self.stack.iter_mut().flatten().find(|(k, _)| *k == key) {
            return Some(std::mem::replace(existing, value));
        }

        if let Some(free) = self.stack.iter_mut().find(|slot| slot.is_none()) {
            // The key moves to the stack, make sure it does not linger on the heap.
            let previous = self.heap.as_mut().and_then(|h| h.remove(&key));
            *free = Some((key, value));
            return previous;
        }

        // No space on the stack, use the heap
        self.heap.get_or_insert_default().insert(key, value)
    }

    /// Removes `key` from the map, returning its value if it was present.
    #[cfg(test)]
    fn remove(&mut self, key: &K) -> Option<V> {
        let occupied = self
            .stack
            .iter_mut()
            .find(|slot| matches!(slot, Some((k, _)) if k == key));

        match occupied {
            Some(slot) => slot.take().map(|(_, v)| v),
            None => self.heap.as_mut()?.remove(key),
        }
    }

    /// Returns a reference to the value stored for `key`.
    fn get(&self, key: &K) -> Option<&V> {
        self.stack
            .iter()
            .flatten()
            .find(|(k, _)| k == key)
            .map(|(_, v)| v)
            .or_else(|| self.heap.as_ref()?.get(key))
    }

    /// Returns a mutable reference to the value stored for `key`.
    fn get_mut(&mut self, key: &K) -> Option<&mut V> {
        // Borrow both fields separately so the heap stays usable as fallback.
        let Self { stack, heap } = self;
        let on_stack = stack
            .iter_mut()
            .flatten()
            .find(|(k, _)| k == key)
            .map(|(_, v)| v);

        match on_stack {
            Some(value) => Some(value),
            None => heap.as_mut()?.get_mut(key),
        }
    }

    /// Iterates over all entries: inline slots in slot order first, then the heap entries.
    #[cfg(test)]
    fn iter(&self) -> impl Iterator<Item = (&K, &V)> {
        let inline = self.stack.iter().flatten().map(|(k, v)| (k, v));
        let spilled = self.heap.iter().flatten();
        inline.chain(spilled)
    }

    /// Iterates over all values: inline slots in slot order first, then the heap entries.
    fn values(&self) -> impl Iterator<Item = &V> {
        let inline = self.stack.iter().flatten().map(|(_, v)| v);
        let spilled = self.heap.iter().flat_map(|h| h.values());
        inline.chain(spilled)
    }

    /// Iterates over all entries with mutable access to the values, in the same order as
    /// the shared iteration.
    fn iter_mut(&mut self) -> impl Iterator<Item = (&K, &mut V)> {
        let inline = self.stack.iter_mut().flatten().map(|(k, v)| (&*k, v));
        let spilled = self.heap.iter_mut().flatten();
        inline.chain(spilled)
    }

    /// Keeps only the entries for which `f` returns `true`.
    ///
    /// `f` is called for the inline slots in slot order first, then for the heap entries.
    fn retain<F>(&mut self, mut f: F)
    where
        F: FnMut(&K, &mut V) -> bool,
    {
        for slot in self.stack.iter_mut() {
            let keep = match slot.as_mut() {
                Some((key, value)) => f(key, value),
                None => true,
            };
            if !keep {
                *slot = None;
            }
        }

        if let Some(heap) = self.heap.as_mut() {
            heap.retain(f);
        }
    }

    /// Removes all entries and releases the heap map.
    fn clear(&mut self) {
        self.stack.fill_with(|| None);
        self.heap = None;
    }
}

/// Expiry times of all timers belonging to a single path.
///
/// Slots are indexed by the [`PathTimer`] discriminant; `None` means the timer is not armed.
#[derive(Debug, Clone, Copy, Default)]
struct PathTimerTable {
    timers: [Option<Instant>; PathTimer::VALUES.len()],
}

impl PathTimerTable {
    /// Arms `timer` to expire at `time`, overwriting any earlier expiry.
    fn set(&mut self, timer: PathTimer, time: Instant) {
        let slot = &mut self.timers[timer as usize];
        *slot = Some(time);
    }

    /// Returns when `timer` expires, or `None` if it is not armed.
    fn get(&self, timer: PathTimer) -> Option<Instant> {
        let index = timer as usize;
        self.timers[index]
    }

    /// Disarms `timer`.
    fn stop(&mut self, timer: PathTimer) {
        let slot = &mut self.timers[timer as usize];
        *slot = None;
    }

    /// Remove the next timer up until `now`, including it
    ///
    /// Timers are checked in the order of [`PathTimer::VALUES`], not by expiry time, and
    /// at most one timer is removed per call.
    fn expire_before(&mut self, now: Instant) -> Option<(PathTimer, Instant)> {
        let due = PathTimer::VALUES.iter().copied().find(|&timer| {
            matches!(self.timers[timer as usize], Some(deadline) if deadline <= now)
        })?;

        self.timers[due as usize]
            .take()
            .map(|deadline| (due, deadline))
    }
}

impl TimerTable {
    /// Sets the timer unconditionally.
    ///
    /// If the timer is already set, this will change the timer's value.
    pub(super) fn set(&mut self, timer: Timer, time: Instant, qlog: QlogSinkWithTime<'_>) {
        match timer {
            Timer::Conn(timer) => {
                self.generic[timer as usize] = Some(time);
            }
            Timer::PerPath(path_id, timer) => match self.path_timers.get_mut(&path_id) {
                None => {
                    let mut table = PathTimerTable::default();
                    table.set(timer, time);
                    self.path_timers.insert(path_id, table);
                }
                Some(table) => {
                    table.set(timer, time);
                }
            },
        }
        qlog.emit_timer_set(timer, time);
    }

    pub(super) fn get(&self, timer: Timer) -> Option<Instant> {
        match timer {
            Timer::Conn(timer) => self.generic[timer as usize],
            Timer::PerPath(path_id, timer) => self.path_timers.get(&path_id)?.get(timer),
        }
    }

    pub(super) fn set_or_stop(
        &mut self,
        timer: Timer,
        time: Option<Instant>,
        qlog: QlogSinkWithTime<'_>,
    ) {
        match time {
            Some(time) => self.set(timer, time, qlog),
            None => self.stop(timer, qlog),
        }
    }

    pub(super) fn stop(&mut self, timer: Timer, qlog: QlogSinkWithTime<'_>) {
        match timer {
            Timer::Conn(timer) => {
                self.generic[timer as usize] = None;
            }
            Timer::PerPath(path_id, timer) => {
                if let Some(e) = self.path_timers.get_mut(&path_id) {
                    e.stop(timer);
                }
            }
        }
        qlog.emit_timer_stop(timer);
    }

    /// Stops all per-path timers
    pub(super) fn stop_per_path(&mut self, path_id: PathId, qlog: QlogSinkWithTime<'_>) {
        for timer in PathTimer::VALUES {
            if let Some(e) = self.path_timers.get_mut(&path_id) {
                e.stop(timer);
                qlog.emit_timer_stop(Timer::PerPath(path_id, timer));
            }
        }
    }

    /// Get the next queued timeout
    pub(super) fn peek(&mut self) -> Option<Instant> {
        // TODO: this is currently linear in the number of paths

        let min_generic = self.generic.iter().filter_map(|&x| x).min();
        let min_path = self
            .path_timers
            .values()
            .flat_map(|p| p.timers.iter().filter_map(|&x| x))
            .min();

        match (min_generic, min_path) {
            (None, None) => None,
            (Some(val), None) => Some(val),
            (Some(a), Some(b)) => Some(a.min(b)),
            (None, Some(val)) => Some(val),
        }
    }

    /// Remove the next timer up until `now`, including it
    pub(super) fn expire_before(
        &mut self,
        now: Instant,
        qlog: &QlogSink,
    ) -> Option<(Timer, Instant)> {
        let (timer, instant) = self.expire_before_inner(now)?;
        qlog.with_time(now).emit_timer_expire(timer);
        Some((timer, instant))
    }

    fn expire_before_inner(&mut self, now: Instant) -> Option<(Timer, Instant)> {
        // TODO: this is currently linear in the number of paths

        for timer in ConnTimer::VALUES {
            if self.generic[timer as usize].is_some()
                && self.generic[timer as usize].expect("checked") <= now
            {
                return self.generic[timer as usize]
                    .take()
                    .map(|time| (Timer::Conn(timer), time));
            }
        }

        let mut res = None;
        for (path_id, timers) in self.path_timers.iter_mut() {
            if let Some((timer, time)) = timers.expire_before(now) {
                res = Some((Timer::PerPath(*path_id, timer), time));
                break;
            }
        }

        // clear out old timers
        self.path_timers
            .retain(|_path_id, timers| timers.timers.iter().any(|t| t.is_some()));
        res
    }

    pub(super) fn reset(&mut self) {
        for timer in ConnTimer::VALUES {
            self.generic[timer as usize] = None;
        }
        self.path_timers.clear();
    }

    #[cfg(test)]
    pub(super) fn values(&self) -> Vec<(Timer, Instant)> {
        let mut values = Vec::new();

        for timer in ConnTimer::VALUES {
            if let Some(time) = self.generic[timer as usize] {
                values.push((Timer::Conn(timer), time));
            }
        }

        for timer in PathTimer::VALUES {
            for (path_id, timers) in self.path_timers.iter() {
                if let Some(time) = timers.timers[timer as usize] {
                    values.push((Timer::PerPath(*path_id, timer), time));
                }
            }
        }

        values
    }
}

/// Unit tests for the timer table and its small-map storage.
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use crate::connection::qlog::QlogSink;

    use super::*;

    /// Connection timers that are already due are returned one per call, in the order of
    /// the `ConnTimer` variants.
    #[test]
    fn timer_table() {
        let mut timers = TimerTable::default();
        let sec = Duration::from_secs(1);
        // Offset into the future so that subtracting from `now` below cannot underflow.
        let now = Instant::now() + Duration::from_secs(10);
        timers.set(
            Timer::Conn(ConnTimer::Idle),
            now - 3 * sec,
            QlogSink::default().with_time(now),
        );
        timers.set(
            Timer::Conn(ConnTimer::Close),
            now - 2 * sec,
            QlogSink::default().with_time(now),
        );

        // `peek` reports the earliest expiry without removing it.
        assert_eq!(timers.peek(), Some(now - 3 * sec));
        assert_eq!(
            timers.expire_before(now, &QlogSink::default()),
            Some((Timer::Conn(ConnTimer::Idle), now - 3 * sec))
        );
        assert_eq!(
            timers.expire_before(now, &QlogSink::default()),
            Some((Timer::Conn(ConnTimer::Close), now - 2 * sec))
        );
        // Both timers have been consumed.
        assert_eq!(timers.expire_before(now, &QlogSink::default()), None);
    }

    /// Exercises `SmallMap` with two inline slots: inline inserts, replacement, slot
    /// reuse after removal, spilling to the heap, `retain` and `clear`.
    #[test]
    fn test_small_map() {
        let mut map = SmallMap::<usize, usize, 2>::default();

        // inserts only on the stack
        assert_eq!(map.insert(1, 1), None);
        assert!(map.heap.is_none());
        assert_eq!(map.insert(2, 2), None);
        assert!(map.heap.is_none());

        // replace on the stack
        assert_eq!(map.insert(1, 2), Some(1));

        // removing key 1 frees the first slot, which key 3 then reuses
        assert_eq!(map.remove(&1), Some(2));
        assert_eq!(map.insert(3, 3), None);
        assert!(map.heap.is_none());

        // spill
        assert_eq!(map.insert(4, 4), None);
        assert!(map.heap.is_some());

        // iteration yields the inline slots in slot order, then the heap entries
        assert_eq!(
            map.iter()
                .map(|(&a, &b)| (a, b))
                .collect::<Vec<(usize, usize)>>(),
            vec![(3, 3), (2, 2), (4, 4)]
        );
        // `iter` and `iter_mut` must agree
        assert_eq!(
            map.iter()
                .map(|(a, b)| (*a, *b))
                .collect::<Vec<(usize, usize)>>(),
            map.iter_mut()
                .map(|(a, b)| (*a, *b))
                .collect::<Vec<(usize, usize)>>(),
        );

        assert_eq!(map.heap.as_ref().unwrap().len(), 1);

        // both inline slots are taken, so all ten new keys end up on the heap
        for i in 0..10 {
            map.insert(10 + i, 10 + i);
        }
        assert_eq!(map.heap.as_ref().unwrap().len(), 11);
        // drop the ten keys again, only key 4 remains on the heap
        map.retain(|k, _v| *k < 10);

        assert_eq!(map.heap.as_ref().unwrap().len(), 1);

        assert_eq!(
            map.iter()
                .map(|(&a, &b)| (a, b))
                .collect::<Vec<(usize, usize)>>(),
            vec![(3, 3), (2, 2), (4, 4)]
        );

        assert_eq!(
            map.iter()
                .map(|(a, b)| (*a, *b))
                .collect::<Vec<(usize, usize)>>(),
            map.iter_mut()
                .map(|(a, b)| (*a, *b))
                .collect::<Vec<(usize, usize)>>(),
        );

        // `clear` empties the inline slots and releases the heap map
        map.clear();
        assert_eq!(map.iter().collect::<Vec<_>>(), Vec::new());
        assert!(map.heap.is_none());
    }
}