nodedb_bridge/eventfd.rs
1//! Cross-runtime wake signaling via Linux eventfd.
2//!
3//! eventfd is the only safe primitive for waking across Tokio and Glommio/monoio:
4//!
5//! - Both runtimes can poll a file descriptor.
6//! - No `Send` requirement on the waker itself — just read/write an fd.
7//! - Coalescing: multiple writes produce a single readable event.
8//!
9//! ## Usage
10//!
11//! Two EventFd instances per bridge channel:
12//!
13//! - `producer_wake`: Consumer writes → Producer reads (queue was full, now has space)
14//! - `consumer_wake`: Producer writes → Consumer reads (queue was empty, now has data)
15//!
16//! The runtime-specific integration (registering the fd with epoll/io_uring) is
17//! done by the caller. This module only provides raw fd-based signaling.
18
19use std::io;
20use std::os::unix::io::{AsRawFd, FromRawFd, OwnedFd, RawFd};
21
/// A cross-runtime wake signal backed by a Linux eventfd.
///
/// Write to signal, read to consume. Multiple signals coalesce into one
/// readable event. The descriptor can be registered with any fd-based event
/// loop (for example epoll or io_uring).
///
/// The descriptor is closed when the value is dropped, because it is held as
/// an [`OwnedFd`].
#[derive(Debug)]
pub struct EventFd {
    /// The eventfd descriptor; owned exclusively by this value.
    fd: OwnedFd,
}
29
30impl EventFd {
31 /// Create a new eventfd in semaphore mode.
32 ///
33 /// `EFD_NONBLOCK` ensures reads/writes never block the calling thread.
34 /// `EFD_CLOEXEC` prevents fd leaks across fork/exec.
35 pub fn new() -> io::Result<Self> {
36 // SAFETY: eventfd2 is a standard Linux syscall. Flags are valid.
37 let fd = unsafe { libc::eventfd(0, libc::EFD_NONBLOCK | libc::EFD_CLOEXEC) };
38 if fd < 0 {
39 return Err(io::Error::last_os_error());
40 }
41 // SAFETY: fd is a valid file descriptor returned by eventfd().
42 let fd = unsafe { OwnedFd::from_raw_fd(fd) };
43 Ok(Self { fd })
44 }
45
46 /// Signal the other side to wake up.
47 ///
48 /// Writes 1 to the eventfd counter. Multiple writes accumulate but
49 /// a single read clears all pending signals.
50 pub fn notify(&self) -> io::Result<()> {
51 let val: u64 = 1;
52 // SAFETY: writing 8 bytes to a valid eventfd.
53 let ret = unsafe {
54 libc::write(
55 self.fd.as_raw_fd(),
56 &val as *const u64 as *const libc::c_void,
57 8,
58 )
59 };
60 if ret < 0 {
61 Err(io::Error::last_os_error())
62 } else {
63 Ok(())
64 }
65 }
66
67 /// Consume the pending signal count, returning the accumulated value.
68 ///
69 /// Returns `Ok(0)` if no signal was pending (EAGAIN on non-blocking fd).
70 /// Returns `Ok(n)` where n is the accumulated signal count.
71 pub fn try_read(&self) -> io::Result<u64> {
72 let mut val: u64 = 0;
73 // SAFETY: reading 8 bytes from a valid eventfd.
74 let ret = unsafe {
75 libc::read(
76 self.fd.as_raw_fd(),
77 &mut val as *mut u64 as *mut libc::c_void,
78 8,
79 )
80 };
81 if ret < 0 {
82 let err = io::Error::last_os_error();
83 if err.kind() == io::ErrorKind::WouldBlock {
84 Ok(0)
85 } else {
86 Err(err)
87 }
88 } else {
89 Ok(val)
90 }
91 }
92
93 /// Get the raw file descriptor for registration with an event loop.
94 ///
95 /// The caller can register this fd with:
96 /// - Tokio: `AsyncFd::new()`
97 /// - Glommio: `GlommioDma::from_raw_fd()` or similar
98 /// - io_uring: `IORING_OP_READ` on the fd
99 pub fn as_fd(&self) -> RawFd {
100 self.fd.as_raw_fd()
101 }
102}
103
104// SAFETY: eventfd is a kernel object. The fd can be shared across threads.
105// Writes are atomic (8-byte writes to eventfd are guaranteed atomic by Linux).
106unsafe impl Send for EventFd {}
107unsafe impl Sync for EventFd {}
108
109/// A pair of eventfds for bidirectional wake signaling across the bridge.
110///
111/// ```text
112/// Producer (Tokio) Consumer (TPC)
113/// │ │
114/// │── notify(consumer_wake) ──→│ "queue has data"
115/// │ │
116/// │←── notify(producer_wake) ──│ "queue has space"
117/// ```
118pub struct WakePair {
119 /// Producer reads this to know the consumer freed space.
120 pub producer_wake: EventFd,
121 /// Consumer reads this to know the producer enqueued data.
122 pub consumer_wake: EventFd,
123}
124
125impl WakePair {
126 /// Create a new wake pair.
127 pub fn new() -> io::Result<Self> {
128 Ok(Self {
129 producer_wake: EventFd::new()?,
130 consumer_wake: EventFd::new()?,
131 })
132 }
133}
134
#[cfg(test)]
mod tests {
    use super::*;

    /// One notify yields exactly one pending signal, and reading clears it.
    #[test]
    fn notify_and_read() {
        let signal = EventFd::new().unwrap();

        // Fresh eventfd: nothing to consume yet.
        let before = signal.try_read().unwrap();
        assert_eq!(before, 0);

        // A single notification is observed as a count of one.
        signal.notify().unwrap();
        let pending = signal.try_read().unwrap();
        assert_eq!(pending, 1);

        // The read above drained the counter.
        let after = signal.try_read().unwrap();
        assert_eq!(after, 0);
    }

    /// Several notifies are folded into one read carrying their total.
    #[test]
    fn multiple_notifies_accumulate() {
        let signal = EventFd::new().unwrap();

        for _ in 0..3 {
            signal.notify().unwrap();
        }

        // One read reports all three, and leaves nothing behind.
        assert_eq!(signal.try_read().unwrap(), 3);
        assert_eq!(signal.try_read().unwrap(), 0);
    }

    /// Each half of a pair carries its own independent signal.
    #[test]
    fn wake_pair_bidirectional() {
        let WakePair {
            producer_wake,
            consumer_wake,
        } = WakePair::new().unwrap();

        // Producer -> consumer direction.
        consumer_wake.notify().unwrap();
        assert_eq!(consumer_wake.try_read().unwrap(), 1);

        // Consumer -> producer direction.
        producer_wake.notify().unwrap();
        assert_eq!(producer_wake.try_read().unwrap(), 1);
    }

    /// The exposed raw descriptor is a non-negative fd number.
    #[test]
    fn fd_is_valid() {
        let signal = EventFd::new().unwrap();
        let raw = signal.as_fd();
        assert!(raw >= 0);
    }
}