commonware-runtime 2026.4.0

Execute asynchronous tasks with a configurable scheduler.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
//! Waiter identity and lifecycle state for tracked io_uring requests.
//!
//! This module manages waiter IDs, request lifecycle transitions, and
//! outstanding-operation tracking. It is the source of truth for which logical
//! requests are still tracked and whether each currently has an operation SQE
//! outstanding.

use super::{request::Request, Tick, UserData};
use io_uring::squeue::Entry as SqueueEntry;
use tracing::warn;

/// Stable waiter identity packed into SQE/CQE `user_data`.
///
/// Layout:
/// - bits 0..31: slot index
/// - bits 32..62: generation (31 bits, wraps at 2^31)
/// - bit 63: reserved as cancel-tag in completion `user_data`
///
/// The generation counter detects stale CQEs that arrive after a slot has been
/// recycled. In normal (non-cancel) operation this cannot happen: a slot is
/// only freed after its CQE is processed, so the slot cannot be reused before
/// the CQE is consumed. With cancellation, the original op CQE can arrive
/// before the cancel CQE. When this happens the slot is freed and may be
/// recycled while the cancel CQE is still pending. The generation check
/// discards that stale cancel CQE. The 31-bit generation wraps after ~2 billion
/// reuses of the same slot, but cancellation is run synchronously on the kernel
/// side (a CQE is always generated by the time the cancel request has been
/// submitted, see
/// <https://man7.org/linux/man-pages/man3/io_uring_prep_cancel.3.html#NOTES>),
/// so a wrap-around collision is not feasible in practice.
// Newtype over the packed 64-bit `UserData` word; all packing/unpacking logic
// lives on `impl WaiterId` below.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct WaiterId(UserData);

impl WaiterId {
    /// Number of low-order bits reserved for the waiter slot index.
    const INDEX_BITS: u32 = 32;
    /// Number of bits reserved for the generation field.
    const GENERATION_BITS: u32 = 31;
    /// Bitmask that extracts the waiter slot index from packed user data.
    const INDEX_MASK: UserData = (1u64 << Self::INDEX_BITS) - 1;
    /// Bitmask that extracts the 31-bit generation from packed user data.
    const GENERATION_MASK: UserData = (1u64 << Self::GENERATION_BITS) - 1;
    /// High-bit tag used to mark cancellation CQE user data.
    const CANCEL_TAG: UserData = 1u64 << 63;

    /// Build a waiter id from slot index and generation components.
    ///
    /// The generation is masked to [`Self::GENERATION_BITS`] bits before being
    /// packed above the index.
    pub const fn new(index: u32, generation: u32) -> Self {
        let packed_generation =
            (generation as UserData & Self::GENERATION_MASK) << Self::INDEX_BITS;
        Self(packed_generation | index as UserData)
    }

    /// Return the slot index component of this waiter id.
    pub const fn index(self) -> u32 {
        (self.0 & Self::INDEX_MASK) as u32
    }

    /// Return the generation component of this waiter id.
    const fn generation(self) -> u32 {
        let shifted = self.0 >> Self::INDEX_BITS;
        (shifted & Self::GENERATION_MASK) as u32
    }

    /// Return the waiter id for the same slot with incremented generation.
    ///
    /// The generation wraps back to zero after reaching the mask.
    const fn next_generation(self) -> Self {
        let bumped = self.generation().wrapping_add(1) & (Self::GENERATION_MASK as u32);
        Self::new(self.index(), bumped)
    }

    /// Encode this waiter id as `user_data` for the operation SQE/CQE.
    ///
    /// Contains only the packed identity (slot + generation); the cancel-tag
    /// bit is clear.
    pub const fn user_data(self) -> UserData {
        self.0
    }

    /// Encode this waiter id as `user_data` for the cancel SQE/CQE.
    ///
    /// Identical to [`Self::user_data`] except the high cancel-tag bit is set,
    /// so completion handling can tell cancel CQEs apart from operation CQEs.
    pub const fn cancel_user_data(self) -> UserData {
        self.0 | Self::CANCEL_TAG
    }

    /// Decode `user_data` into waiter identity and cancel-tag state.
    ///
    /// The returned id always has the cancel-tag bit stripped; the boolean
    /// reports whether that bit was set in the input.
    const fn from_user_data(user_data: UserData) -> (Self, bool) {
        let id = Self(user_data & !Self::CANCEL_TAG);
        (id, (user_data & Self::CANCEL_TAG) != 0)
    }
}

/// Lifecycle state of a tracked request.
///
/// The state only moves one way: `Active` -> `CancelRequested`. A canceled
/// waiter is never reactivated; it leaves the table when it completes.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum WaiterState {
    /// Request is still tracked and has not transitioned to cancellation.
    Active {
        /// Absolute wheel tick by which the request must complete.
        ///
        /// If completion has not been observed by this tick, cancellation is
        /// requested. `None` means this request has no timeout deadline.
        target_tick: Option<Tick>,
    },
    /// Cancellation was requested.
    ///
    /// If the request still has an operation SQE in flight, the loop stages an
    /// async cancel. If the request is only parked in the ready queue, the loop
    /// completes it locally with timeout when that entry is revisited.
    CancelRequested,
}

/// State for one tracked logical request.
///
/// Stored in `Waiters::entries`; a waiter always occupies the slot named by
/// `id.index()`.
struct Waiter {
    /// Stable identity of this waiter slot instance.
    ///
    /// Compared against caller-supplied ids to reject stale generations after
    /// slot reuse.
    id: WaiterId,
    /// Lifecycle state for the logical request stored in this slot.
    state: WaiterState,
    /// Whether the logical request currently has an operation SQE in flight.
    in_flight: bool,
    /// The active request state machine.
    request: Request,
}

/// Outcome produced when staging the next SQE for a waiter.
///
/// Only [`StageOutcome::Submit`] leaves the waiter tracked; the other two
/// variants are returned after the waiter has been removed from the table.
pub enum StageOutcome {
    /// The waiter was canceled while parked in the ready queue and should
    /// complete locally with timeout rather than issuing another SQE.
    Timeout(Request),
    /// The original caller dropped its wait handle before this SQE could be
    /// staged, so the waiter was retired locally.
    Orphaned {
        /// Active deadline tracking to remove from the timeout wheel, if any.
        target_tick: Option<Tick>,
    },
    /// The waiter is still active and produced an SQE for submission.
    Submit(SqueueEntry),
}

/// Outcome produced when handling an operation CQE for a waiter.
// `Complete` embeds a full `Request`, which is why the large-enum-variant lint
// is allowed here rather than boxing the payload.
#[allow(clippy::large_enum_variant)]
pub enum CompletionOutcome {
    /// The CQE belonged to an async cancel SQE and was handled internally.
    Cancel,
    /// The logical request needs another SQE and should be placed back in the
    /// ready queue.
    Requeue(WaiterId),
    /// The logical request completed and was removed from the waiter table.
    Complete {
        /// The completed request, ready to deliver its cached result.
        request: Request,
        /// Active deadline tracking to remove from the timeout wheel, when
        /// completion happened before cancellation was requested.
        target_tick: Option<Tick>,
    },
}

/// Tracks logical requests and the state needed to complete them.
///
/// Invariants maintained by `insert`/`take`: `len` equals the number of
/// occupied `entries` slots, and every id in `free` names an empty slot.
pub struct Waiters {
    /// Waiters indexed by slot index.
    ///
    /// Free slots have no waiter (`None`).
    entries: Vec<Option<Waiter>>,
    /// Stack of reusable waiter ids.
    ///
    /// Ids are pushed back with a bumped generation when a slot is freed.
    free: Vec<WaiterId>,
    /// Number of tracked waiters currently stored in `entries`.
    len: usize,
}

impl Waiters {
    /// Create an empty waiter set that can track at most `capacity` logical
    /// requests at once.
    pub fn new(capacity: usize) -> Self {
        let mut entries = Vec::with_capacity(capacity);
        entries.resize_with(capacity, || None);

        // Seed the free list in reverse so popping hands out slot 0 first.
        let free: Vec<WaiterId> = (0..capacity)
            .rev()
            .map(|index| {
                let index = u32::try_from(index).expect("slot index overflow");
                WaiterId::new(index, 0)
            })
            .collect();

        Self {
            entries,
            free,
            len: 0,
        }
    }

    /// Return the number of currently tracked waiters.
    pub const fn len(&self) -> usize {
        self.len
    }

    /// Return whether there are no tracked waiters.
    pub const fn is_empty(&self) -> bool {
        self.len == 0
    }

    /// Insert a request and return its assigned id.
    ///
    /// Panics if no free slot is available.
    pub fn insert(&mut self, request: Request, target_tick: Option<Tick>) -> WaiterId {
        let id = self
            .free
            .pop()
            .expect("waiters should not exceed configured capacity");
        let waiter = Waiter {
            id,
            state: WaiterState::Active { target_tick },
            in_flight: false,
            request,
        };
        let previous = self.entries[id.index() as usize].replace(waiter);
        assert!(previous.is_none(), "free slot should not contain waiter");
        self.len += 1;
        id
    }

    /// Remove the waiter stored at `index`, returning its owned request.
    ///
    /// The freed id goes back on the free list with a bumped generation so
    /// stale references to the old occupant can be detected.
    ///
    /// Panics if `index` is out of bounds or the slot is empty. Callers must
    /// already have validated that the slot still belongs to the expected
    /// waiter.
    fn take(&mut self, index: usize) -> Request {
        let waiter = self.entries[index].take().expect("tracked waiter missing");
        self.len -= 1;
        self.free.push(waiter.id.next_generation());
        waiter.request
    }

    /// Request cancellation for an active waiter.
    ///
    /// Returns `true` when the waiter was successfully transitioned to
    /// cancel-requested. Returns `false` when the waiter id is stale, not
    /// present, or already cancel-requested.
    pub fn cancel(&mut self, waiter_id: WaiterId) -> bool {
        let index = waiter_id.index() as usize;
        let Some(waiter) = self.entries.get_mut(index).and_then(Option::as_mut) else {
            return false;
        };
        if waiter.id != waiter_id {
            // Slot was reused; the caller holds an id for an older generation.
            return false;
        }
        if matches!(waiter.state, WaiterState::CancelRequested) {
            return false;
        }
        waiter.state = WaiterState::CancelRequested;
        true
    }

    /// Stage the next SQE for a waiter.
    ///
    /// This either returns the next SQE to issue, or removes the waiter from
    /// the table and returns the local action that should happen instead.
    ///
    /// - [`StageOutcome::Submit`] leaves the waiter tracked and yields the next SQE.
    /// - [`StageOutcome::Timeout`] removes the waiter and completes it locally with
    ///   timeout.
    /// - [`StageOutcome::Orphaned`] removes the waiter because the caller dropped its
    ///   wait handle before restaging.
    ///
    /// When this returns [`StageOutcome::Submit`], the waiter is marked as having an
    /// operation SQE outstanding immediately, so [`Waiters::is_in_flight`] will return
    /// `true` for that waiter.
    ///
    /// Panics if `waiter_id` does not refer to a currently tracked waiter or if
    /// the waiter already has an operation SQE outstanding.
    pub fn stage(&mut self, waiter_id: WaiterId) -> StageOutcome {
        let index = waiter_id.index() as usize;
        let waiter = self
            .entries
            .get_mut(index)
            .and_then(Option::as_mut)
            .expect("stage called for untracked waiter");
        assert_eq!(waiter.id, waiter_id, "stage called with stale waiter id");

        let WaiterState::Active { target_tick } = waiter.state else {
            // Cancellation was requested while this waiter sat in the ready
            // queue; complete it locally with timeout instead of staging.
            return StageOutcome::Timeout(self.take(index));
        };

        if waiter.request.is_orphaned() {
            // The current request still owns all resources, but there is no
            // caller left to observe more progress, so retire it locally
            // instead of issuing another SQE.
            let _ = self.take(index);
            return StageOutcome::Orphaned { target_tick };
        }

        assert!(
            !waiter.in_flight,
            "stage called for waiter with op already in flight"
        );
        waiter.in_flight = true;
        StageOutcome::Submit(waiter.request.build_sqe(waiter_id))
    }

    /// Process one CQE for a waiter.
    ///
    /// Cancel CQEs are handled internally. Operation CQEs drive the request
    /// state machine and return a high-level loop action. If the current SQE
    /// completed but the original caller already dropped its wait handle, the
    /// waiter is retired locally instead of requeueing another SQE.
    ///
    /// Panics if a non-cancel CQE does not refer to a currently tracked waiter,
    /// if it uses a stale waiter generation, or if the waiter has no operation
    /// SQE outstanding.
    pub fn on_completion(&mut self, user_data: UserData, result: i32) -> CompletionOutcome {
        let (waiter_id, is_cancel) = WaiterId::from_user_data(user_data);
        let index = waiter_id.index() as usize;

        let Some(waiter) = self.entries.get_mut(index).and_then(Option::as_mut) else {
            assert!(is_cancel, "operation CQE for untracked waiter");
            return CompletionOutcome::Cancel;
        };
        if waiter.id != waiter_id {
            assert!(is_cancel, "operation CQE for stale waiter generation");
            return CompletionOutcome::Cancel;
        }

        if is_cancel {
            match result {
                // Cancellation successful.
                0 => {}
                // Cancellation is no longer possible at this stage; the
                // target operation CQE should follow shortly.
                r if r == -libc::EALREADY => {}
                // Not found can mean the target already completed (common
                // race) or stale/invalid user_data.
                r if r == -libc::ENOENT => {}
                r if r == -libc::EINVAL => {
                    panic!("async cancel SQE rejected by kernel: EINVAL")
                }
                _ => warn!(result, "unexpected async cancel CQE result"),
            }

            // Cancel CQEs acknowledge cancel requests but do not complete
            // waiters.
            return CompletionOutcome::Cancel;
        }

        // The operation CQE retires the currently in-flight SQE, regardless of
        // whether the request completes or is requeued for another one.
        assert!(waiter.in_flight);
        waiter.in_flight = false;

        let state = waiter.state;
        let finished = waiter.request.on_cqe(state, result);
        if !finished && !waiter.request.is_orphaned() {
            // More SQEs are needed and the caller is still waiting.
            return CompletionOutcome::Requeue(waiter_id);
        }

        // Either the request reached a terminal state, or the current SQE made
        // non-terminal progress for a caller that is already gone. In both
        // cases, remove the waiter now instead of requeueing another SQE.
        let target_tick = if let WaiterState::Active { target_tick } = state {
            target_tick
        } else {
            None
        };
        CompletionOutcome::Complete {
            request: self.take(index),
            target_tick,
        }
    }

    /// Return whether a waiter currently has an operation SQE in flight.
    pub fn is_in_flight(&self, waiter_id: WaiterId) -> bool {
        match self.entries.get(waiter_id.index() as usize) {
            Some(Some(waiter)) => waiter.id == waiter_id && waiter.in_flight,
            _ => false,
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        iouring::request::{ReadAtRequest, RecvRequest, Request, SendRequest, SyncRequest},
        IoBuf, IoBufMut, IoBufs,
    };
    use commonware_utils::channel::oneshot;
    use std::{
        os::fd::{FromRawFd, IntoRawFd},
        panic::{catch_unwind, AssertUnwindSafe},
        sync::Arc,
    };

    /// Build a `Sync` request backed by a socket fd so waiter tests can
    /// exercise slot lifecycle without touching the filesystem.
    fn make_sync_request() -> (Request, oneshot::Receiver<std::io::Result<()>>) {
        let (local, _peer) =
            std::os::unix::net::UnixStream::pair().expect("failed to create unix socket pair");
        let (sender, receiver) = oneshot::channel();
        // SAFETY: `local` is a valid fd whose ownership is transferred to the
        // File.
        let file = unsafe { std::fs::File::from_raw_fd(local.into_raw_fd()) };
        let sync = SyncRequest {
            file: Arc::new(file),
            result: None,
            sender,
        };
        (Request::Sync(sync), receiver)
    }

    /// Read the tracked lifecycle state for `waiter_id`, if the id is still
    /// current for its slot.
    fn waiter_state(waiters: &Waiters, waiter_id: WaiterId) -> Option<WaiterState> {
        let index = waiter_id.index() as usize;
        match waiters.entries.get(index)?.as_ref() {
            Some(slot) if slot.id == waiter_id => Some(slot.state),
            _ => None,
        }
    }

    /// Validate `waiter_id` against the table and forcibly remove its slot,
    /// returning the owned request.
    fn remove_waiter(waiters: &mut Waiters, waiter_id: WaiterId) -> Request {
        let index = waiter_id.index() as usize;
        match waiters.entries.get(index).and_then(Option::as_ref) {
            Some(occupant) => assert_eq!(
                occupant.id, waiter_id,
                "remove_waiter called with stale waiter id"
            ),
            None => panic!("remove_waiter called for untracked waiter"),
        }
        waiters.take(index)
    }

    #[test]
    fn test_waiter_id_encoding_and_generation_wrap() {
        // Verify waiter ids round-trip through user_data encoding and wrap their generation field
        // without corrupting the slot index bits.
        // `new` masks the generation to 31 bits, so overshooting the mask by 5
        // lands on generation 4.
        let wrapped = WaiterId::new(7, (WaiterId::GENERATION_MASK as u32).wrapping_add(5));
        assert_eq!(wrapped.generation(), 4);

        // Incrementing the maximum generation wraps back to zero.
        let max = WaiterId::new(7, WaiterId::GENERATION_MASK as u32);
        assert_eq!(max.next_generation().generation(), 0);

        let waiter_id = WaiterId::new(7, 3);
        assert_eq!(waiter_id.index(), 7);
        assert_eq!(waiter_id.generation(), 3);

        // Operation user_data decodes with the cancel tag clear.
        let (decoded_op, is_cancel_op) = WaiterId::from_user_data(waiter_id.user_data());
        assert_eq!(decoded_op, waiter_id);
        assert!(!is_cancel_op);

        // Cancel user_data decodes to the same identity with the tag set.
        let (decoded_cancel, is_cancel) = WaiterId::from_user_data(waiter_id.cancel_user_data());
        assert_eq!(decoded_cancel, waiter_id);
        assert!(is_cancel);
    }

    #[test]
    fn test_waiters_lifecycle_and_slot_reuse() {
        // Verify waiter insertion, completion, removal, and slot reuse all preserve generations.
        let mut waiters = Waiters::new(3);
        assert_eq!(waiters.entries.len(), 3);
        assert_eq!(waiters.len(), 0);
        assert!(waiters.is_empty());

        // Populate two slots so the test can later free and reuse one of them.
        let (req0, _rx0) = make_sync_request();
        let (req1, _rx1) = make_sync_request();
        let id0 = waiters.insert(req0, Some(5));
        let id1 = waiters.insert(req1, Some(9));
        // The free list hands out slots in ascending index order.
        assert_eq!((id0.index(), id1.index()), (0, 1));
        assert_eq!(waiters.len(), 2);

        // A stale operation CQE should panic because only cancel CQEs are
        // expected to arrive after slot reuse.
        let stale = WaiterId::new(id1.index(), id1.generation().wrapping_add(1));
        let stale_completion = catch_unwind(AssertUnwindSafe(|| {
            let _ = waiters.on_completion(stale.user_data(), 0);
        }));
        assert!(stale_completion.is_err());

        // Complete id1. Its deadline tick is reported so the caller can drop
        // it from the timeout wheel.
        assert!(matches!(waiters.stage(id1), StageOutcome::Submit(_)));
        assert!(matches!(
            waiters.on_completion(id1.user_data(), 0),
            CompletionOutcome::Complete {
                target_tick: Some(9),
                ..
            }
        ));
        assert_eq!(waiters.len(), 1);

        // Next allocation reuses the freed slot with incremented generation.
        let (req2, _rx2) = make_sync_request();
        let id2 = waiters.insert(req2, Some(11));
        assert_eq!(id2.index(), id1.index());
        assert_eq!(
            id2.generation(),
            id1.generation().wrapping_add(1) & (WaiterId::GENERATION_MASK as u32)
        );

        // All live waiters should still complete and remove cleanly after slot reuse.
        assert!(matches!(waiters.stage(id0), StageOutcome::Submit(_)));
        let _ = waiters.on_completion(id0.user_data(), 0);
        assert!(matches!(waiters.stage(id2), StageOutcome::Submit(_)));
        let _ = waiters.on_completion(id2.user_data(), 0);
        assert!(waiters.is_empty());
    }

    #[test]
    fn test_waiters_cancel_paths() {
        // Verify cancel requests transition waiter state, ignore cancel CQEs for completion, and
        // discard late cancel CQEs once the original operation has already completed.
        let mut waiters = Waiters::new(3);

        let (req, _rx) = make_sync_request();
        let waiter_id = waiters.insert(req, Some(2));

        // Cancel with a stale generation must be rejected.
        let stale = WaiterId::new(waiter_id.index(), waiter_id.generation().wrapping_add(1));
        assert!(!waiters.cancel(stale));

        assert!(matches!(waiters.stage(waiter_id), StageOutcome::Submit(_)));
        assert!(
            waiters.cancel(waiter_id),
            "cancel should transition active waiter"
        );

        // Cancel CQE does not complete the waiter.
        assert!(matches!(
            waiters.on_completion(waiter_id.cancel_user_data(), -libc::ECANCELED),
            CompletionOutcome::Cancel
        ));

        // Op CQE completes the waiter. `target_tick` is `None` because the
        // waiter had already transitioned to cancel-requested.
        assert!(matches!(
            waiters.on_completion(waiter_id.user_data(), 0),
            CompletionOutcome::Complete {
                target_tick: None,
                ..
            }
        ));
        assert!(waiters.is_empty());

        // Late cancel CQE for the already-completed waiter should be ignored.
        assert!(matches!(
            waiters.on_completion(waiter_id.cancel_user_data(), -libc::ECANCELED),
            CompletionOutcome::Cancel
        ));
        // An op CQE for an empty slot is an invariant violation and must
        // panic rather than be silently dropped.
        let missing_op_cqe = catch_unwind(AssertUnwindSafe(|| {
            let _ = waiters.on_completion(0, 1);
        }));
        assert!(missing_op_cqe.is_err());
    }

    #[test]
    fn test_waiters_track_in_flight_state() {
        // Verify `stage` tracks a staged operation and that the bit is
        // cleared again when the matching op CQE is processed.
        let mut waiters = Waiters::new(1);
        let (req, _rx) = make_sync_request();
        let waiter_id = waiters.insert(req, Some(4));

        // Freshly inserted waiters have nothing outstanding until staged.
        assert!(!waiters.is_in_flight(waiter_id));
        assert!(matches!(waiters.stage(waiter_id), StageOutcome::Submit(_)));
        assert!(waiters.is_in_flight(waiter_id));

        // Completion removes the waiter, so the in-flight query falls back to
        // `false` for its (now empty) slot.
        assert!(matches!(
            waiters.on_completion(waiter_id.user_data(), 0),
            CompletionOutcome::Complete { .. }
        ));
        assert!(!waiters.is_in_flight(waiter_id));
    }

    #[test]
    fn test_waiters_reject_stale_in_flight_queries() {
        // Verify stale waiter ids cannot observe in-flight state after
        // their slot has been recycled to a new generation.
        let mut waiters = Waiters::new(1);
        let (req0, _rx0) = make_sync_request();
        let stale_id = waiters.insert(req0, Some(1));
        let _ = remove_waiter(&mut waiters, stale_id);

        // With capacity 1 the next insert must reuse the same slot, but with
        // a bumped generation, so the ids differ.
        let (req1, _rx1) = make_sync_request();
        let active_id = waiters.insert(req1, Some(2));
        assert_ne!(active_id, stale_id);

        // The stale id matches the slot index but not the generation.
        assert!(!waiters.is_in_flight(stale_id));
        assert!(matches!(waiters.stage(active_id), StageOutcome::Submit(_)));
        assert!(waiters.is_in_flight(active_id));
    }

    #[test]
    fn test_waiters_stage_panics_for_out_of_range_and_empty_slots() {
        // Verify `stage` treats impossible waiter ids as invariant failures,
        // while the tolerant query/cancel paths still reject them cleanly.
        let mut waiters = Waiters::new(1);
        // Index 7 is beyond the single-slot table.
        let out_of_range = WaiterId::new(7, 0);
        assert!(!waiters.cancel(out_of_range));
        let out_of_range_stage = catch_unwind(AssertUnwindSafe(|| {
            let _ = waiters.stage(out_of_range);
        }));
        assert!(out_of_range_stage.is_err());

        // Index 0 is in range but was never inserted, so the slot is empty.
        let empty_slot = WaiterId::new(0, 0);
        assert!(!waiters.cancel(empty_slot));
        let empty_slot_stage = catch_unwind(AssertUnwindSafe(|| {
            let _ = waiters.stage(empty_slot);
        }));
        assert!(empty_slot_stage.is_err());
    }

    #[test]
    fn test_waiters_cancel_and_in_flight_reject_out_of_range_and_empty_slots() {
        // Verify cancel and in-flight tracking reject waiter ids that point
        // outside the table or at currently empty slots.
        let mut waiters = Waiters::new(1);
        // Index 7 is beyond the single-slot table.
        let out_of_range = WaiterId::new(7, 0);
        assert!(!waiters.cancel(out_of_range));
        assert!(!waiters.is_in_flight(out_of_range));

        // Index 0 is in range but holds no waiter.
        let empty_slot = WaiterId::new(0, 0);
        assert!(!waiters.cancel(empty_slot));
        assert!(!waiters.is_in_flight(empty_slot));
    }

    #[test]
    fn test_waiters_cancel_stage_only_when_in_flight() {
        // Verify timeout processing can distinguish between:
        // - a waiter whose current SQE is still in flight
        // - a waiter that is only parked in the ready queue
        let mut waiters = Waiters::new(2);

        // First build a waiter that still has an operation SQE outstanding.
        let (active_req, _active_rx) = make_sync_request();
        let active = waiters.insert(active_req, Some(2));
        assert!(matches!(waiters.stage(active), StageOutcome::Submit(_)));
        // Canceling does not clear the in-flight bit: the op CQE is still owed.
        assert!(waiters.cancel(active));
        assert!(waiters.is_in_flight(active));
        let active_state = waiter_state(&waiters, active).expect("active waiter missing");
        assert!(matches!(active_state, WaiterState::CancelRequested));

        // Then build a waiter that has been canceled before any SQE was staged.
        let (ready_req, _ready_rx) = make_sync_request();
        let ready = waiters.insert(ready_req, Some(3));
        assert!(waiters.cancel(ready));
        assert!(!waiters.is_in_flight(ready));
        let ready_state = waiter_state(&waiters, ready).expect("ready waiter missing");
        assert!(matches!(ready_state, WaiterState::CancelRequested));
    }

    #[test]
    fn test_waiters_stage_orphans_closed_requests() {
        // Verify closed send and read-at requests are removed locally before
        // their first SQE is ever staged.
        {
            // Send request orphaned before first submit: the receiver is
            // dropped before `stage` is ever called.
            let mut waiters = Waiters::new(1);
            let (tx, rx) = oneshot::channel();
            drop(rx);
            let waiter_id = waiters.insert(
                Request::Send(SendRequest {
                    fd: Arc::new(std::os::unix::net::UnixStream::pair().unwrap().0.into()),
                    write: IoBufs::from(IoBuf::from(b"hello")).into(),
                    deadline: None,
                    result: None,
                    sender: tx,
                }),
                Some(7),
            );

            // The deadline tick is handed back so the caller can purge the
            // timeout wheel entry.
            match waiters.stage(waiter_id) {
                StageOutcome::Orphaned {
                    target_tick: Some(7),
                } => {}
                _ => panic!("closed send waiter should be orphaned before staging"),
            }
            assert!(waiters.is_empty());
        }

        {
            // Read-at request orphaned before first submit.
            let mut waiters = Waiters::new(1);
            let (sock_left, _sock_right) =
                std::os::unix::net::UnixStream::pair().expect("failed to create unix socket pair");
            // SAFETY: sock_left is a valid fd that we own.
            let file = unsafe { std::fs::File::from_raw_fd(sock_left.into_raw_fd()) };
            let (tx, rx) = oneshot::channel();
            drop(rx);
            let waiter_id = waiters.insert(
                Request::ReadAt(ReadAtRequest {
                    file: Arc::new(file),
                    offset: 0,
                    len: 8,
                    read: 0,
                    buf: IoBufMut::with_capacity(8),
                    result: None,
                    sender: tx,
                }),
                Some(8),
            );

            match waiters.stage(waiter_id) {
                StageOutcome::Orphaned {
                    target_tick: Some(8),
                } => {}
                _ => panic!("closed read waiter should be orphaned before staging"),
            }
            assert!(waiters.is_empty());
        }
    }

    #[test]
    fn test_waiters_orphan_closed_requests_after_nonterminal_completion() {
        // Retryable and partial-progress CQEs for send, recv, and read-at must
        // drop the waiter outright once the receiving side is gone, rather
        // than requeueing the request for another submit.
        {
            // Send: retryable CQE (-EAGAIN) arriving after the receiver dropped.
            let mut tracked = Waiters::new(1);
            let (done_tx, done_rx) = oneshot::channel();
            let id = tracked.insert(
                Request::Send(SendRequest {
                    fd: Arc::new(std::os::unix::net::UnixStream::pair().unwrap().0.into()),
                    write: IoBufs::from(IoBuf::from(b"hello")).into(),
                    deadline: None,
                    result: None,
                    sender: done_tx,
                }),
                Some(5),
            );
            assert!(matches!(tracked.stage(id), StageOutcome::Submit(_)));
            drop(done_rx);

            match tracked.on_completion(id.user_data(), -libc::EAGAIN) {
                CompletionOutcome::Complete {
                    request,
                    target_tick: Some(5),
                } => request.complete(),
                _ => panic!("closed send waiter should be orphaned after retry CQE"),
            }
            assert!(tracked.is_empty());
        }

        {
            // Send: partial-progress CQE (2 of 5 bytes) with the receiver dropped.
            let mut tracked = Waiters::new(1);
            let (done_tx, done_rx) = oneshot::channel();
            let id = tracked.insert(
                Request::Send(SendRequest {
                    fd: Arc::new(std::os::unix::net::UnixStream::pair().unwrap().0.into()),
                    write: IoBufs::from(IoBuf::from(b"hello")).into(),
                    deadline: None,
                    result: None,
                    sender: done_tx,
                }),
                Some(5),
            );
            assert!(matches!(tracked.stage(id), StageOutcome::Submit(_)));
            drop(done_rx);

            match tracked.on_completion(id.user_data(), 2) {
                CompletionOutcome::Complete {
                    request,
                    target_tick: Some(5),
                } => request.complete(),
                _ => panic!("closed send waiter should be orphaned after partial CQE"),
            }
            assert!(tracked.is_empty());
        }

        {
            // Exact recv: retryable CQE (-EAGAIN) with the receiver dropped.
            let mut tracked = Waiters::new(1);
            let (done_tx, done_rx) = oneshot::channel();
            let id = tracked.insert(
                Request::Recv(RecvRequest {
                    fd: Arc::new(std::os::unix::net::UnixStream::pair().unwrap().0.into()),
                    buf: IoBufMut::with_capacity(5),
                    offset: 0,
                    len: 5,
                    exact: true,
                    deadline: None,
                    result: None,
                    sender: done_tx,
                }),
                Some(6),
            );
            assert!(matches!(tracked.stage(id), StageOutcome::Submit(_)));
            drop(done_rx);

            match tracked.on_completion(id.user_data(), -libc::EAGAIN) {
                CompletionOutcome::Complete {
                    request,
                    target_tick: Some(6),
                } => request.complete(),
                _ => panic!("closed recv waiter should be orphaned after retry CQE"),
            }
            assert!(tracked.is_empty());
        }

        {
            // Exact recv: partial-progress CQE (3 of 5 bytes) with the receiver dropped.
            let mut tracked = Waiters::new(1);
            let (done_tx, done_rx) = oneshot::channel();
            let id = tracked.insert(
                Request::Recv(RecvRequest {
                    fd: Arc::new(std::os::unix::net::UnixStream::pair().unwrap().0.into()),
                    buf: IoBufMut::with_capacity(5),
                    offset: 0,
                    len: 5,
                    exact: true,
                    deadline: None,
                    result: None,
                    sender: done_tx,
                }),
                Some(6),
            );
            assert!(matches!(tracked.stage(id), StageOutcome::Submit(_)));
            drop(done_rx);

            match tracked.on_completion(id.user_data(), 3) {
                CompletionOutcome::Complete {
                    request,
                    target_tick: Some(6),
                } => request.complete(),
                _ => panic!("closed recv waiter should be orphaned after partial CQE"),
            }
            assert!(tracked.is_empty());
        }

        {
            // Read-at: partial-progress CQE (3 of 8 bytes) with the receiver dropped.
            let mut tracked = Waiters::new(1);
            let (local, _remote) =
                std::os::unix::net::UnixStream::pair().expect("failed to create unix socket pair");
            // SAFETY: local is a valid fd that we own.
            let file = unsafe { std::fs::File::from_raw_fd(local.into_raw_fd()) };
            let (done_tx, done_rx) = oneshot::channel();
            let id = tracked.insert(
                Request::ReadAt(ReadAtRequest {
                    file: Arc::new(file),
                    offset: 0,
                    len: 8,
                    read: 0,
                    buf: IoBufMut::with_capacity(8),
                    result: None,
                    sender: done_tx,
                }),
                Some(9),
            );
            assert!(matches!(tracked.stage(id), StageOutcome::Submit(_)));
            drop(done_rx);

            match tracked.on_completion(id.user_data(), 3) {
                CompletionOutcome::Complete {
                    request,
                    target_tick: Some(9),
                } => request.complete(),
                _ => panic!("closed read waiter should be orphaned after partial CQE"),
            }
            assert!(tracked.is_empty());
        }
    }

    #[test]
    fn test_waiters_accept_expected_cancel_cqe_results() {
        // The kernel may answer an async-cancel SQE with 0, -EALREADY, or
        // -ENOENT; each of these must keep the waiter tracked so the original
        // operation CQE can retire it later.
        for code in [0, -libc::EALREADY, -libc::ENOENT] {
            let mut tracked = Waiters::new(1);
            let (request, _rx) = make_sync_request();
            let id = tracked.insert(request, Some(2));
            assert!(matches!(tracked.stage(id), StageOutcome::Submit(_)));
            assert!(tracked.cancel(id));

            assert!(matches!(
                tracked.on_completion(id.cancel_user_data(), code),
                CompletionOutcome::Cancel
            ));
            let observed = waiter_state(&tracked, id).expect("waiter should remain tracked");
            assert!(matches!(observed, WaiterState::CancelRequested));
        }
    }

    #[test]
    fn test_waiters_tolerate_unexpected_negative_cancel_result() {
        // A cancel CQE carrying an unanticipated negative errno must be
        // ignored, leaving the waiter's state untouched.
        let mut tracked = Waiters::new(1);
        let (request, _rx) = make_sync_request();
        let id = tracked.insert(request, Some(2));
        assert!(matches!(tracked.stage(id), StageOutcome::Submit(_)));
        assert!(tracked.cancel(id));

        assert!(matches!(
            tracked.on_completion(id.cancel_user_data(), -libc::EPERM),
            CompletionOutcome::Cancel
        ));
        let observed = waiter_state(&tracked, id).expect("waiter should remain tracked");
        assert!(matches!(observed, WaiterState::CancelRequested));
    }

    #[test]
    fn test_waiters_cancel_cqe_einval_panics() {
        // An `EINVAL` cancel CQE means the kernel rejected our async cancel
        // SQE outright — a broken invariant that must panic.
        let mut tracked = Waiters::new(1);
        let (request, _rx) = make_sync_request();
        let id = tracked.insert(request, Some(2));
        assert!(matches!(tracked.stage(id), StageOutcome::Submit(_)));
        assert!(tracked.cancel(id));

        let outcome = catch_unwind(AssertUnwindSafe(|| {
            let _ = tracked.on_completion(id.cancel_user_data(), -libc::EINVAL);
        }));
        assert!(outcome.is_err());
    }

    #[test]
    fn test_waiters_stale_ids_cannot_remove_reused_slots() {
        // A stale waiter id must neither observe state in, nor remove, a slot
        // that has since been handed to a new waiter.
        let mut tracked = Waiters::new(1);
        let (first, _rx0) = make_sync_request();
        let stale_id = tracked.insert(first, Some(1));
        let _ = remove_waiter(&mut tracked, stale_id);

        let (second, _rx1) = make_sync_request();
        let fresh_id = tracked.insert(second, Some(2));
        assert_ne!(fresh_id, stale_id);
        assert!(waiter_state(&tracked, stale_id).is_none());
        let outcome = catch_unwind(AssertUnwindSafe(|| {
            let _ = remove_waiter(&mut tracked, stale_id);
        }));
        assert!(outcome.is_err());
    }

    #[test]
    fn test_waiters_insert_and_cancel_invariants() {
        // Capacity is a hard limit, and cancel works for waiters inserted with
        // or without a deadline — but only once per waiter.
        let mut tracked = Waiters::new(2);

        // Filling both slots and then inserting a third must panic.
        let (first, _rx0) = make_sync_request();
        let (second, _rx1) = make_sync_request();
        let _ = tracked.insert(first, None);
        let _ = tracked.insert(second, None);
        let overflow = catch_unwind(AssertUnwindSafe(|| {
            let (third, _rx2) = make_sync_request();
            let _ = tracked.insert(third, None);
        }));
        assert!(overflow.is_err());

        // A waiter inserted without a deadline can still be cancelled.
        let mut tracked = Waiters::new(2);
        let (request, _rx) = make_sync_request();
        let undated = tracked.insert(request, None);
        assert!(
            tracked.cancel(undated),
            "cancel should support active waiter without deadline"
        );

        // A second cancel of the same waiter must be a no-op.
        let (request, _rx) = make_sync_request();
        let live = tracked.insert(request, Some(3));
        assert!(tracked.cancel(live));
        assert!(!tracked.cancel(live));
    }
}