Skip to main content

nodedb_bridge/
waker.rs

1//! Cross-runtime waker integration.
2//!
3//! The hardest part of the bridge: when the SPSC queue transitions between
4//! states (full→not-full, empty→not-empty), the sleeping side must be woken
5//! across the runtime boundary.
6//!
7//! ## Problem
8//!
9//! - Tokio wakers are `Send + Sync` (they post to Tokio's work-stealing scheduler).
10//! - Glommio/monoio wakers are `!Send` (they are core-local eventfd or io_uring CQE).
11//!
12//! We cannot store a `!Send` Glommio waker inside the shared ring buffer state
13//! (which is `Send + Sync`).
14//!
15//! ## Solution
16//!
17//! Use an **eventfd** as the cross-runtime signal. Both runtimes can poll an fd:
18//!
19//! - Producer (Tokio) writes 1 to eventfd when it pushes to a previously-empty queue.
20//! - Consumer (TPC) reads from eventfd to wake up.
21//! - Consumer (TPC) writes 1 to a *separate* eventfd when it pops from a previously-full queue.
22//! - Producer (Tokio) reads from that eventfd to wake up.
23//!
24//! This avoids storing any `!Send` waker in shared state. The eventfd is an OS
25//! primitive that both runtimes can integrate with their respective event loops.
26
27use std::sync::atomic::{AtomicBool, Ordering};
28
29/// Signal for waking the consumer when new data is available.
30///
31/// In production this will wrap an `eventfd` file descriptor.
32/// For testing without io_uring, it uses a simple atomic flag + thread parking.
33pub struct WakeSignal {
34    /// Atomic flag: true = there's a pending wake notification.
35    signaled: AtomicBool,
36}
37
38impl WakeSignal {
39    /// Create a new wake signal.
40    pub fn new() -> Self {
41        Self {
42            signaled: AtomicBool::new(false),
43        }
44    }
45
46    /// Signal the other side that it should wake up and check the queue.
47    pub fn notify(&self) {
48        self.signaled.store(true, Ordering::Release);
49    }
50
51    /// Check if a notification is pending and clear it.
52    /// Returns `true` if there was a pending notification.
53    pub fn take(&self) -> bool {
54        self.signaled.swap(false, Ordering::Acquire)
55    }
56
57    /// Check if a notification is pending without clearing it.
58    pub fn is_signaled(&self) -> bool {
59        self.signaled.load(Ordering::Acquire)
60    }
61}
62
63impl Default for WakeSignal {
64    fn default() -> Self {
65        Self::new()
66    }
67}
68
69#[cfg(test)]
70mod tests {
71    use super::*;
72
73    #[test]
74    fn signal_notify_and_take() {
75        let signal = WakeSignal::new();
76
77        assert!(!signal.is_signaled());
78        assert!(!signal.take());
79
80        signal.notify();
81        assert!(signal.is_signaled());
82        assert!(signal.take());
83
84        // take() should have cleared it.
85        assert!(!signal.is_signaled());
86        assert!(!signal.take());
87    }
88
89    #[test]
90    fn multiple_notifies_coalesce() {
91        let signal = WakeSignal::new();
92
93        signal.notify();
94        signal.notify();
95        signal.notify();
96
97        // Single take() clears all pending notifications.
98        assert!(signal.take());
99        assert!(!signal.take());
100    }
101}