events 0.6.0

Async manual-reset and auto-reset events for multi-use signaling
// Intrusive doubly-linked waiter queue shared by all event types.
//
// # Design
//
// This module provides a FIFO doubly-linked list of waiter nodes used to
// park and wake futures that are waiting on an event. The list is intrusive:
// each node is embedded directly inside the wait future (behind an
// `UnsafeCell`) rather than being heap-allocated separately.
//
// # Ownership model
//
// The `WaiterList` does not own its nodes. Each `WaiterNode` is a field of
// the wait future that created it (e.g. `AutoResetWaitFuture`). The node is
// linked into the list when the future is first polled and unlinked either
// by the future's `Drop` impl or (for auto-reset events) by `set()`.
//
// Nodes are `!Unpin` (they contain a `PhantomPinned` field) because the list
// holds raw pointers to them. The containing future must be pinned before it
// is polled, which keeps the node's address stable for as long as the node is
// linked into the list.
//
// # Thread safety
//
// The list itself has no internal synchronization. Callers are responsible
// for ensuring exclusive access:
//
// * **Sync event variants** (`ManualResetEvent`, `AutoResetEvent`): all node
//   and list access is protected by the owning event's `Mutex`.
// * **Local event variants** (`LocalManualResetEvent`, `LocalAutoResetEvent`):
//   the containing types are `!Send`, so all access is confined to a single
//   thread. No lock is needed.
//
// `WaiterList` has an explicit `unsafe impl Send` because it contains raw
// pointers (`*mut WaiterNode`), which are `!Send` by default in Rust.
// Sending is safe because sync variants wrap the list in a `Mutex` (so
// pointers are never dereferenced without the lock), and local variants
// never actually send the list across threads (their container is `!Send`).
//
// # Node lifecycle
//
// 1. **Creation**: a `WaiterNode` is created (unlinked, `notified = false`,
//    `waker = None`) when the wait future is constructed via `event.wait()`.
//
// 2. **Registration**: on the first `poll()`, the future stores the caller's
//    waker in the node and pushes it onto the list via `push_back()`. The
//    future's `registered` flag is set to `true`.
//
// 3. **Re-poll**: if polled again before being woken, the future replaces
//    the stored waker with the (possibly new) one from the context. The
//    node stays in the list.
//
// 4. **Notification**: `set()` wakes one or more nodes depending on the
//    event type:
//    - **Auto-reset**: `pop_front()` removes one node, sets its `notified`
//      flag to `true`, takes the waker, and calls `wake()`. The node is no
//      longer in the list.
//    - **Manual-reset**: walks the list via `head()`, takes each node's
//      waker, and calls `wake()`. The nodes remain in the list; each future
//      removes itself on its next poll (which sees `is_set == true`).
//
// 5. **Completion**: when the woken future is polled again, it sees either
//    `notified == true` (auto-reset) or `is_set == true` (manual-reset),
//    unlinks itself if still registered, and returns `Poll::Ready`.
//
// 6. **Cancellation (Drop)**: if a future is dropped while registered:
//    - **Auto-reset, not notified**: simply removes the node from the list.
//    - **Auto-reset, notified**: the signal must not be lost, so the drop
//      impl forwards the notification to the next waiter (or re-sets the
//      `is_set` flag if no waiters remain).
//    - **Manual-reset**: simply removes the node from the list. No signal
//      forwarding is needed because `is_set` remains `true`.
//
// # The `notified` flag
//
// Used only by auto-reset events. When `set()` pops a node, it sets
// `notified = true` on that node before waking. This serves two purposes:
//
// * The future's `poll()` can detect that it was specifically chosen by
//   `set()` and should return `Ready`, even though the event's `is_set`
//   flag may already be `false` (consumed by another waiter).
// * The future's `Drop` knows to forward the signal if the future is
//   cancelled after being notified but before being polled to completion.
//
// # The `registered` flag
//
// Tracked on the future (not the node) to know whether the node is
// currently in the list. This avoids scanning the list to check membership
// and allows `Drop` to skip cleanup when the future was never polled or
// has already completed.
//
// # UnsafeCell for the node
//
// The node is wrapped in `UnsafeCell` inside the future because both
// `poll()` (via `Pin::get_unchecked_mut`) and the event's `set()` (via
// raw pointers from the list) need to access the same node. `UnsafeCell`
// is Rust's opt-in for mutation through aliased pointers, which makes this
// sound as long as accesses are serialized. They are, by the Mutex or
// single-thread invariant described above.
//
// # Re-entrancy in `set()`
//
// Waker `wake()` calls can be re-entrant: a waker may immediately poll
// the future or drop it, causing further list mutations. The `set()`
// implementations handle this by:
//
// * **Sync variants**: releasing the Mutex before calling `wake()`, then
//   re-acquiring it and rescanning from the list head. No node pointer
//   survives across the lock release.
// * **Local variants**: accessing the list only through raw pointers (not
//   `&mut WaiterList` references) and rescanning from head after each
//   wake. No reference or stored `next` pointer survives across `wake()`.

use std::marker::PhantomPinned;
use std::task::Waker;

pub(crate) struct WaiterNode {
    /// The waker to call when this waiter is selected for notification.
    /// Stored by `poll_wait()` and consumed by `set()`.
    pub(crate) waker: Option<Waker>,

    /// Intrusive doubly-linked list pointers, managed by [`WaiterList`].
    pub(crate) next: *mut Self,
    pub(crate) prev: *mut Self,

    /// Used only by auto-reset events. Set to `true` by `set()` after this
    /// node is popped from the waiter list and before its waker is called.
    /// The owning future checks this flag on its next poll to know that it
    /// should complete with `Ready`.
    pub(crate) notified: bool,

    _pinned: PhantomPinned,
}

impl WaiterNode {
    pub(crate) fn new() -> Self {
        Self {
            waker: None,
            next: std::ptr::null_mut(),
            prev: std::ptr::null_mut(),
            notified: false,
            _pinned: PhantomPinned,
        }
    }
}
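
// Illustrative sketch, not part of the crate: lifecycle step 3 in the module
// comment says that a re-polled future replaces the stored waker with the one
// from the current context. The helper name `refresh_waker` is hypothetical,
// and skipping the clone via `Waker::will_wake` is an assumed optimization;
// the real futures may simply overwrite the slot on every poll.
#[allow(dead_code)]
mod waker_refresh_sketch {
    use std::task::Waker;

    /// Stores a clone of `current` in `slot` unless the stored waker already
    /// reports that it would wake the same task. Returns `true` when the
    /// slot was written.
    pub fn refresh_waker(slot: &mut Option<Waker>, current: &Waker) -> bool {
        let up_to_date = slot
            .as_ref()
            .is_some_and(|stored| stored.will_wake(current));
        if !up_to_date {
            *slot = Some(current.clone());
        }
        !up_to_date
    }
}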

pub(crate) struct WaiterList {
    head: *mut WaiterNode,
    tail: *mut WaiterNode,
}

impl WaiterList {
    pub(crate) fn new() -> Self {
        Self {
            head: std::ptr::null_mut(),
            tail: std::ptr::null_mut(),
        }
    }

    #[cfg(test)]
    fn is_empty(&self) -> bool {
        self.head.is_null()
    }

    /// Appends a node to the back of the list (FIFO enqueue).
    ///
    /// # Safety
    ///
    /// * `node` must point to a valid, pinned `WaiterNode` that is not already
    ///   in any list.
    /// * The caller must ensure exclusive access to the list.
    pub(crate) unsafe fn push_back(&mut self, node: *mut WaiterNode) {
        // SAFETY: The caller guarantees that `node` is valid and that we have
        // exclusive access to the list and its nodes.
        unsafe {
            (*node).next = std::ptr::null_mut();
        }

        // SAFETY: Same as above.
        unsafe {
            (*node).prev = self.tail;
        }

        if self.tail.is_null() {
            self.head = node;
        } else {
            // SAFETY: `tail` is non-null, so it points to a valid node in the
            // list, and the caller guarantees exclusive access.
            unsafe {
                (*self.tail).next = node;
            }
        }

        self.tail = node;
    }

    /// Removes a specific node from the list.
    ///
    /// # Safety
    ///
    /// * `node` must point to a valid, pinned `WaiterNode` that is currently
    ///   in this list.
    /// * The caller must ensure exclusive access to the list.
    pub(crate) unsafe fn remove(&mut self, node: *mut WaiterNode) {
        // SAFETY: Caller guarantees node is valid and in the list.
        let prev = unsafe { (*node).prev };
        // SAFETY: Caller guarantees node is valid and in the list.
        let next = unsafe { (*node).next };

        if prev.is_null() {
            self.head = next;
        } else {
            // SAFETY: prev is a valid node in the list.
            unsafe {
                (*prev).next = next;
            }
        }

        if next.is_null() {
            self.tail = prev;
        } else {
            // SAFETY: next is a valid node in the list.
            unsafe {
                (*next).prev = prev;
            }
        }

        // Clear the removed node's links so that it holds no stale pointers
        // into the list.
        // SAFETY: The caller guarantees that `node` is valid and that we have
        // exclusive access to it.
        unsafe {
            (*node).next = std::ptr::null_mut();
        }

        // SAFETY: Same as above.
        unsafe {
            (*node).prev = std::ptr::null_mut();
        }
    }

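    /// Removes and returns the front node (FIFO dequeue), or returns `None`
    /// if the list is empty.
    ///
    /// # Safety
    ///
    /// * The caller must ensure exclusive access to the list.
    /// * Every node still linked into the list must be valid and pinned.
    ///
    /// A returned pointer is never null.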
    pub(crate) unsafe fn pop_front(&mut self) -> Option<*mut WaiterNode> {
        if self.head.is_null() {
            return None;
        }

        let node = self.head;

        // SAFETY: head is non-null, so it is a valid node in the list.
        let next = unsafe { (*node).next };

        self.head = next;

        if next.is_null() {
            self.tail = std::ptr::null_mut();
        } else {
            // SAFETY: next is a valid node in the list.
            unsafe {
                (*next).prev = std::ptr::null_mut();
            }
        }

        // Clear the removed node's links.
        // SAFETY: node is valid (we just read from it above).
        unsafe {
            (*node).next = std::ptr::null_mut();
        }

        // SAFETY: node is valid.
        unsafe {
            (*node).prev = std::ptr::null_mut();
        }

        Some(node)
    }

    /// Calls `f` with a pointer to each node, visiting nodes front-to-back.
    ///
    /// # Safety
    ///
    /// * The caller must ensure exclusive access to the list.
    /// * The callback must not add or remove nodes from the list.
    #[cfg(test)]
    pub(crate) unsafe fn for_each(&self, mut f: impl FnMut(*mut WaiterNode)) {
        let mut current = self.head;
        while !current.is_null() {
            // Read next before calling f so we do not hold a reference to
            // the current node across the callback.
            // SAFETY: current is non-null and in the list.
            let next = unsafe { (*current).next };
            f(current);
            current = next;
        }
    }

    /// Returns a pointer to the head node, or null if the list is empty.
    pub(crate) fn head(&self) -> *mut WaiterNode {
        self.head
    }
}
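
// Illustrative sketch, not part of the crate: the "Re-entrancy in `set()`"
// section of the module comment describes the sync variants as releasing the
// Mutex before each `wake()` call and then rescanning from the head. The
// `State` type and the `wake_all` function are hypothetical stand-ins, with a
// `Vec<Option<Waker>>` playing the role of the intrusive list so that the
// sketch stays free of raw pointers.
#[allow(dead_code)]
mod wake_outside_lock_sketch {
    use std::sync::Mutex;
    use std::task::Waker;

    /// Stand-in for an event's locked state; slot order models queue order.
    pub struct State {
        pub wakers: Vec<Option<Waker>>,
    }

    /// Wakes every stored waker without holding the lock across `wake()`.
    /// Returns the number of wakers that were called.
    pub fn wake_all(state: &Mutex<State>) -> usize {
        let mut woken = 0;
        loop {
            // Rescan from the front under the lock and take at most one waker.
            let mut guard = state.lock().unwrap();
            let next = guard.wakers.iter_mut().find_map(|slot| slot.take());
            // Release the lock first: `wake()` may re-enter and lock `state`.
            drop(guard);

            match next {
                Some(waker) => {
                    waker.wake();
                    woken += 1;
                }
                None => return woken,
            }
        }
    }
}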

// SAFETY: See the module-level comment for the full thread safety rationale.
// Raw pointers are `!Send` by default; this impl is sound because all pointer
// dereferences are serialized by external synchronization (Mutex or
// single-thread confinement).
unsafe impl Send for WaiterList {}
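
// Illustrative sketch, not part of the crate: a small stand-in for the `Send`
// argument above. `PtrHolder` and `bump_on_worker_thread` are hypothetical;
// the holder plays the role of `WaiterList`, and its pointer is dereferenced
// only while the `Mutex` guard is held.
#[allow(dead_code)]
mod send_rationale_sketch {
    use std::sync::{Arc, Mutex};
    use std::thread;

    /// Holds a raw pointer, so it is `!Send` unless we opt in.
    struct PtrHolder(*mut u32);

    // SAFETY: The pointer is only dereferenced while the surrounding `Mutex`
    // is locked, and the pointee outlives every such access.
    unsafe impl Send for PtrHolder {}

    /// Increments a heap value from a worker thread and returns the result.
    pub fn bump_on_worker_thread(start: u32) -> u32 {
        let raw = Box::into_raw(Box::new(start));
        let shared = Arc::new(Mutex::new(PtrHolder(raw)));

        let worker = {
            let shared = Arc::clone(&shared);
            thread::spawn(move || {
                let guard = shared.lock().unwrap();
                // SAFETY: `raw` is live and the lock serializes this access.
                unsafe { *guard.0 += 1 };
            })
        };
        worker.join().unwrap();

        let guard = shared.lock().unwrap();
        // SAFETY: The worker has finished, so this is the last access to the
        // pointer; rebuilding the box also frees the allocation.
        let value = unsafe { *Box::from_raw(guard.0) };
        value
    }
}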

#[cfg(test)]
#[cfg_attr(coverage_nightly, coverage(off))]
#[allow(
    clippy::undocumented_unsafe_blocks,
    clippy::multiple_unsafe_ops_per_block,
    clippy::indexing_slicing,
    reason = "test-only code with trivial safety invariants"
)]
mod tests {
    use super::*;

    #[test]
    fn push_back_and_pop_front_fifo_order() {
        let mut list = WaiterList::new();
        let mut a = WaiterNode::new();
        let mut b = WaiterNode::new();
        let mut c = WaiterNode::new();

        let pa: *mut WaiterNode = &raw mut a;
        let pb: *mut WaiterNode = &raw mut b;
        let pc: *mut WaiterNode = &raw mut c;

        unsafe {
            list.push_back(pa);
            list.push_back(pb);
            list.push_back(pc);
        }

        assert!(!list.is_empty());

        let first = unsafe { list.pop_front() };
        assert!(std::ptr::eq(first.unwrap(), pa));

        let second = unsafe { list.pop_front() };
        assert!(std::ptr::eq(second.unwrap(), pb));

        let third = unsafe { list.pop_front() };
        assert!(std::ptr::eq(third.unwrap(), pc));

        assert!(list.is_empty());
        assert!(unsafe { list.pop_front() }.is_none());
    }

    #[test]
    fn remove_middle_node() {
        let mut list = WaiterList::new();
        let mut a = WaiterNode::new();
        let mut b = WaiterNode::new();
        let mut c = WaiterNode::new();

        let pa: *mut WaiterNode = &raw mut a;
        let pb: *mut WaiterNode = &raw mut b;
        let pc: *mut WaiterNode = &raw mut c;

        unsafe {
            list.push_back(pa);
            list.push_back(pb);
            list.push_back(pc);
            list.remove(pb);
        }

        let first = unsafe { list.pop_front() };
        assert!(std::ptr::eq(first.unwrap(), pa));

        let second = unsafe { list.pop_front() };
        assert!(std::ptr::eq(second.unwrap(), pc));

        assert!(list.is_empty());
    }

    #[test]
    fn remove_head_node() {
        let mut list = WaiterList::new();
        let mut a = WaiterNode::new();
        let mut b = WaiterNode::new();

        let pa: *mut WaiterNode = &raw mut a;
        let pb: *mut WaiterNode = &raw mut b;

        unsafe {
            list.push_back(pa);
            list.push_back(pb);
            list.remove(pa);
        }

        let first = unsafe { list.pop_front() };
        assert!(std::ptr::eq(first.unwrap(), pb));
        assert!(list.is_empty());
    }

    #[test]
    fn remove_tail_node() {
        let mut list = WaiterList::new();
        let mut a = WaiterNode::new();
        let mut b = WaiterNode::new();

        let pa: *mut WaiterNode = &raw mut a;
        let pb: *mut WaiterNode = &raw mut b;

        unsafe {
            list.push_back(pa);
            list.push_back(pb);
            list.remove(pb);
        }

        let first = unsafe { list.pop_front() };
        assert!(std::ptr::eq(first.unwrap(), pa));
        assert!(list.is_empty());
    }

    #[test]
    fn remove_only_node() {
        let mut list = WaiterList::new();
        let mut a = WaiterNode::new();

        let pa: *mut WaiterNode = &raw mut a;

        unsafe {
            list.push_back(pa);
            list.remove(pa);
        }

        assert!(list.is_empty());
    }

    #[test]
    fn for_each_visits_all_nodes() {
        let mut list = WaiterList::new();
        let mut a = WaiterNode::new();
        let mut b = WaiterNode::new();

        let pa: *mut WaiterNode = &raw mut a;
        let pb: *mut WaiterNode = &raw mut b;

        unsafe {
            list.push_back(pa);
            list.push_back(pb);
        }

        let mut visited = Vec::new();
        unsafe {
            list.for_each(|node| visited.push(node));
        }

        assert_eq!(visited.len(), 2);
        assert!(std::ptr::eq(visited[0], pa));
        assert!(std::ptr::eq(visited[1], pb));
    }
}