// nodedb_bridge/waker.rs

// SPDX-License-Identifier: BUSL-1.1

//! Cross-runtime waker integration.
//!
//! This is the hardest part of the bridge: when the SPSC queue transitions
//! between states (full→not-full, empty→not-empty), the sleeping side must be
//! woken across the runtime boundary.
//!
//! ## Problem
//!
//! - Tokio wakers are `Send + Sync` (they post to Tokio's work-stealing scheduler).
//! - Glommio/monoio wakers are `!Send` (they are tied to a core-local eventfd or
//!   io_uring CQE).
//!
//! We cannot store a `!Send` Glommio/monoio waker inside the shared ring buffer
//! state (which is `Send + Sync`).
//!
//! ## Solution
//!
//! Use an **eventfd** as the cross-runtime signal. Both runtimes can poll an fd:
//!
//! - Producer (Tokio) writes 1 to an eventfd when it pushes to a previously-empty queue.
//! - Consumer (thread-per-core, TPC) reads from that eventfd to wake up.
//! - Consumer (TPC) writes 1 to a *separate* eventfd when it pops from a previously-full queue.
//! - Producer (Tokio) reads from that second eventfd to wake up.
//!
//! This avoids storing any `!Send` waker in shared state. The eventfd is an OS
//! primitive that both runtimes can integrate with their respective event loops.

use std::sync::atomic::{AtomicBool, Ordering};

/// Wake-up flag that one side of the bridge raises to tell the other side to
/// re-check the queue.
///
/// In production this will wrap an `eventfd` file descriptor. The current
/// implementation is a single atomic flag: it never blocks or parks a thread,
/// so the waiting side has to poll `take` or `is_signaled` itself.
///
/// Notifications coalesce: the flag is a boolean, so any number of `notify`
/// calls made before a `take` collapse into one pending wake-up.
#[derive(Debug)]
pub struct WakeSignal {
    /// `true` while a wake notification is pending and has not been taken yet.
    signaled: AtomicBool,
}

40impl WakeSignal {
41    /// Create a new wake signal.
42    pub fn new() -> Self {
43        Self {
44            signaled: AtomicBool::new(false),
45        }
46    }
47
48    /// Signal the other side that it should wake up and check the queue.
49    pub fn notify(&self) {
50        self.signaled.store(true, Ordering::Release);
51    }
52
53    /// Check if a notification is pending and clear it.
54    /// Returns `true` if there was a pending notification.
55    pub fn take(&self) -> bool {
56        self.signaled.swap(false, Ordering::Acquire)
57    }
58
59    /// Check if a notification is pending without clearing it.
60    pub fn is_signaled(&self) -> bool {
61        self.signaled.load(Ordering::Acquire)
62    }
63}
64
65impl Default for WakeSignal {
66    fn default() -> Self {
67        Self::new()
68    }
69}
70
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh signal is idle; `notify` sets the flag and `take` reports it
    /// exactly once and clears it.
    #[test]
    fn signal_notify_and_take() {
        let signal = WakeSignal::new();

        assert!(!signal.is_signaled());
        assert!(!signal.take());

        signal.notify();
        assert!(signal.is_signaled());
        assert!(signal.take());

        // take() should have cleared it.
        assert!(!signal.is_signaled());
        assert!(!signal.take());
    }

    /// Repeated notifications collapse into one pending wake-up.
    #[test]
    fn multiple_notifies_coalesce() {
        let signal = WakeSignal::new();

        for _ in 0..3 {
            signal.notify();
        }

        // Single take() clears all pending notifications.
        assert!(signal.take());
        assert!(!signal.take());
    }

    /// `Default` must start unsignaled.
    #[test]
    fn default_starts_unsignaled() {
        let signal = WakeSignal::default();

        assert!(!signal.is_signaled());
        assert!(!signal.take());
    }
}