nodedb_bridge/waker.rs
1//! Cross-runtime waker integration.
2//!
3//! The hardest part of the bridge: when the SPSC queue transitions between
4//! states (full→not-full, empty→not-empty), the sleeping side must be woken
5//! across the runtime boundary.
6//!
7//! ## Problem
8//!
9//! - Tokio wakers are `Send + Sync` (they post to Tokio's work-stealing scheduler).
10//! - Glommio/monoio wakers are `!Send` (they are core-local eventfd or io_uring CQE).
11//!
12//! We cannot store a `!Send` Glommio waker inside the shared ring buffer state
13//! (which is `Send + Sync`).
14//!
15//! ## Solution
16//!
17//! Use an **eventfd** as the cross-runtime signal. Both runtimes can poll an fd:
18//!
19//! - Producer (Tokio) writes 1 to eventfd when it pushes to a previously-empty queue.
20//! - Consumer (TPC) reads from eventfd to wake up.
21//! - Consumer (TPC) writes 1 to a *separate* eventfd when it pops from a previously-full queue.
22//! - Producer (Tokio) reads from that eventfd to wake up.
23//!
24//! This avoids storing any `!Send` waker in shared state. The eventfd is an OS
25//! primitive that both runtimes can integrate with their respective event loops.
26
27use std::sync::atomic::{AtomicBool, Ordering};
28
/// Coalescing wake flag used to tell the other side of the queue to wake up.
///
/// One side calls [`WakeSignal::notify`] after changing the queue state; the
/// other side calls [`WakeSignal::take`] to consume the pending notification.
/// Any number of notifications issued before a `take` collapse into one.
///
/// In production this will wrap an `eventfd` file descriptor. The current
/// implementation is only an atomic flag: nothing here blocks or parks a
/// thread, so the waiting side must check the flag itself via
/// [`WakeSignal::take`] or [`WakeSignal::is_signaled`].
#[derive(Debug)]
pub struct WakeSignal {
    /// Atomic flag: `true` means a wake notification is pending.
    signaled: AtomicBool,
}

impl WakeSignal {
    /// Creates a wake signal with no notification pending.
    ///
    /// This is a `const fn`, so a signal can also live in a `static`.
    #[must_use]
    pub const fn new() -> Self {
        Self {
            signaled: AtomicBool::new(false),
        }
    }

    /// Signals the other side that it should wake up and check the queue.
    ///
    /// The flag is stored with `Release` ordering, so writes made by this
    /// thread before the call are visible to a thread that later observes the
    /// flag through [`WakeSignal::take`] or [`WakeSignal::is_signaled`].
    /// Calling this while a notification is already pending has no further
    /// effect.
    pub fn notify(&self) {
        self.signaled.store(true, Ordering::Release);
    }

    /// Checks whether a notification is pending and clears it.
    ///
    /// Returns `true` if there was a pending notification. The check and the
    /// clear are a single atomic swap, so a notification cannot be observed
    /// by two `take` calls. The swap uses `Acquire` ordering to pair with the
    /// `Release` store in [`WakeSignal::notify`].
    pub fn take(&self) -> bool {
        self.signaled.swap(false, Ordering::Acquire)
    }

    /// Checks whether a notification is pending without clearing it.
    ///
    /// Uses an `Acquire` load, pairing with the `Release` store in
    /// [`WakeSignal::notify`].
    #[must_use]
    pub fn is_signaled(&self) -> bool {
        self.signaled.load(Ordering::Acquire)
    }
}
62
63impl Default for WakeSignal {
64 fn default() -> Self {
65 Self::new()
66 }
67}
68
#[cfg(test)]
mod tests {
    use super::WakeSignal;

    /// A notification is visible after `notify` and consumed by one `take`.
    #[test]
    fn signal_notify_and_take() {
        let wake = WakeSignal::new();

        // A fresh signal has nothing pending.
        assert!(!wake.is_signaled());
        assert!(!wake.take());

        // After notifying, the flag is observable and `take` reports it.
        wake.notify();
        assert!(wake.is_signaled());
        assert!(wake.take());

        // `take` consumed the notification, so nothing is pending any more.
        assert!(!wake.is_signaled());
        assert!(!wake.take());
    }

    /// Several `notify` calls before a `take` count as one notification.
    #[test]
    fn multiple_notifies_coalesce() {
        let wake = WakeSignal::new();

        for _ in 0..3 {
            wake.notify();
        }

        // The first `take` clears every pending notification at once.
        assert!(wake.take());
        assert!(!wake.take());
    }
}
101}