nodedb_bridge/waker.rs
1// SPDX-License-Identifier: BUSL-1.1
2
3//! Cross-runtime waker integration.
4//!
5//! The hardest part of the bridge: when the SPSC queue transitions between
6//! states (full→not-full, empty→not-empty), the sleeping side must be woken
7//! across the runtime boundary.
8//!
9//! ## Problem
10//!
11//! - Tokio wakers are `Send + Sync` (they post to Tokio's work-stealing scheduler).
12//! - Glommio/monoio wakers are `!Send` (they are core-local eventfd or io_uring CQE).
13//!
14//! We cannot store a `!Send` Glommio waker inside the shared ring buffer state
15//! (which is `Send + Sync`).
16//!
17//! ## Solution
18//!
19//! Use an **eventfd** as the cross-runtime signal. Both runtimes can poll an fd:
20//!
21//! - Producer (Tokio) writes 1 to eventfd when it pushes to a previously-empty queue.
22//! - Consumer (TPC) reads from eventfd to wake up.
23//! - Consumer (TPC) writes 1 to a *separate* eventfd when it pops from a previously-full queue.
24//! - Producer (Tokio) reads from that eventfd to wake up.
25//!
26//! This avoids storing any `!Send` waker in shared state. The eventfd is an OS
27//! primitive that both runtimes can integrate with their respective event loops.
28
29use std::sync::atomic::{AtomicBool, Ordering};
30
/// Coalescing wake-up flag shared between the two sides of the bridge.
///
/// One side calls [`WakeSignal::notify`] after it changes the queue state;
/// the other side observes the flag through [`WakeSignal::take`] or
/// [`WakeSignal::is_signaled`] and then re-checks the queue.
///
/// In production this will wrap an `eventfd` file descriptor. The current
/// implementation is only an atomic flag: nothing here blocks or parks a
/// thread, so the waiting side has to poll the flag itself.
///
/// Notifications are not counted. Any number of `notify()` calls that happen
/// before a `take()` collapse into a single pending notification.
#[derive(Debug)]
pub struct WakeSignal {
    /// `true` while a notification is pending and has not been taken yet.
    signaled: AtomicBool,
}

impl WakeSignal {
    /// Create a new wake signal with no notification pending.
    #[must_use]
    pub fn new() -> Self {
        Self {
            signaled: AtomicBool::new(false),
        }
    }

    /// Signal the other side that it should wake up and check the queue.
    ///
    /// The `Release` store pairs with the `Acquire` operations in
    /// [`WakeSignal::take`] and [`WakeSignal::is_signaled`], so writes made
    /// before this call are visible to a thread that observes the flag as set.
    /// Calling this while a notification is already pending has no further
    /// effect.
    pub fn notify(&self) {
        self.signaled.store(true, Ordering::Release);
    }

    /// Check if a notification is pending and clear it.
    ///
    /// Returns `true` if there was a pending notification. The check and the
    /// clear are a single atomic swap, so a given notification is reported to
    /// at most one caller.
    pub fn take(&self) -> bool {
        self.signaled.swap(false, Ordering::Acquire)
    }

    /// Check if a notification is pending without clearing it.
    ///
    /// The answer is only a snapshot: another thread may set or clear the
    /// flag immediately after this returns.
    #[must_use]
    pub fn is_signaled(&self) -> bool {
        self.signaled.load(Ordering::Acquire)
    }
}

impl Default for WakeSignal {
    /// Equivalent to [`WakeSignal::new`]: no notification pending.
    fn default() -> Self {
        Self::new()
    }
}
70
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh signal is idle; one `notify()` is seen by exactly one `take()`.
    #[test]
    fn signal_notify_and_take() {
        let wake = WakeSignal::new();

        // Nothing has been signaled yet.
        let pending_at_start = wake.is_signaled();
        assert!(!pending_at_start);
        let taken_at_start = wake.take();
        assert!(!taken_at_start);

        wake.notify();
        let pending_after_notify = wake.is_signaled();
        assert!(pending_after_notify);
        let taken_after_notify = wake.take();
        assert!(taken_after_notify);

        // The successful take() must have reset the flag.
        let pending_after_take = wake.is_signaled();
        assert!(!pending_after_take);
        let taken_again = wake.take();
        assert!(!taken_again);
    }

    /// Repeated notifications collapse into a single pending wake-up.
    #[test]
    fn multiple_notifies_coalesce() {
        let wake = WakeSignal::new();

        for _ in 0..3 {
            wake.notify();
        }

        // The first take() consumes everything; the second finds nothing.
        let first = wake.take();
        assert!(first);
        let second = wake.take();
        assert!(!second);
    }
}
103}