nodedb_bridge/eventfd.rs
1// SPDX-License-Identifier: BUSL-1.1
2
3//! Cross-runtime wake signaling via Linux eventfd.
4//!
5//! eventfd is the only safe primitive for waking across Tokio and Glommio/monoio:
6//!
7//! - Both runtimes can poll a file descriptor.
8//! - No `Send` requirement on the waker itself — just read/write an fd.
9//! - Coalescing: multiple writes produce a single readable event.
10//!
11//! ## Usage
12//!
13//! Two EventFd instances per bridge channel:
14//!
15//! - `producer_wake`: Consumer writes → Producer reads (queue was full, now has space)
16//! - `consumer_wake`: Producer writes → Consumer reads (queue was empty, now has data)
17//!
18//! The runtime-specific integration (registering the fd with epoll/io_uring) is
19//! done by the caller. This module only provides raw fd-based signaling.
20
21use std::io;
22use std::os::unix::io::{AsRawFd, FromRawFd, OwnedFd, RawFd};
23
24/// A cross-runtime wake signal backed by a Linux eventfd.
25///
26/// Write to signal, read to consume. Multiple signals coalesce into one.
27/// The fd can be registered with any event loop (epoll, io_uring, kqueue fallback).
28pub struct EventFd {
29 fd: OwnedFd,
30}
31
32impl EventFd {
33 /// Create a new eventfd in semaphore mode.
34 ///
35 /// `EFD_NONBLOCK` ensures reads/writes never block the calling thread.
36 /// `EFD_CLOEXEC` prevents fd leaks across fork/exec.
37 pub fn new() -> io::Result<Self> {
38 // SAFETY: eventfd2 is a standard Linux syscall. Flags are valid.
39 let fd = unsafe { libc::eventfd(0, libc::EFD_NONBLOCK | libc::EFD_CLOEXEC) };
40 if fd < 0 {
41 return Err(io::Error::last_os_error());
42 }
43 // SAFETY: fd is a valid file descriptor returned by eventfd().
44 let fd = unsafe { OwnedFd::from_raw_fd(fd) };
45 Ok(Self { fd })
46 }
47
48 /// Signal the other side to wake up.
49 ///
50 /// Writes 1 to the eventfd counter. Multiple writes accumulate but
51 /// a single read clears all pending signals.
52 pub fn notify(&self) -> io::Result<()> {
53 let val: u64 = 1;
54 // SAFETY: writing 8 bytes to a valid eventfd.
55 let ret = unsafe {
56 libc::write(
57 self.fd.as_raw_fd(),
58 &val as *const u64 as *const libc::c_void,
59 8,
60 )
61 };
62 if ret < 0 {
63 Err(io::Error::last_os_error())
64 } else {
65 Ok(())
66 }
67 }
68
69 /// Consume the pending signal count, returning the accumulated value.
70 ///
71 /// Returns `Ok(0)` if no signal was pending (EAGAIN on non-blocking fd).
72 /// Returns `Ok(n)` where n is the accumulated signal count.
73 pub fn try_read(&self) -> io::Result<u64> {
74 let mut val: u64 = 0;
75 // SAFETY: reading 8 bytes from a valid eventfd.
76 let ret = unsafe {
77 libc::read(
78 self.fd.as_raw_fd(),
79 &mut val as *mut u64 as *mut libc::c_void,
80 8,
81 )
82 };
83 if ret < 0 {
84 let err = io::Error::last_os_error();
85 if err.kind() == io::ErrorKind::WouldBlock {
86 Ok(0)
87 } else {
88 Err(err)
89 }
90 } else {
91 Ok(val)
92 }
93 }
94
95 /// Get the raw file descriptor for registration with an event loop.
96 ///
97 /// The caller can register this fd with:
98 /// - Tokio: `AsyncFd::new()`
99 /// - Glommio: `GlommioDma::from_raw_fd()` or similar
100 /// - io_uring: `IORING_OP_READ` on the fd
101 pub fn as_fd(&self) -> RawFd {
102 self.fd.as_raw_fd()
103 }
104}
105
106// SAFETY: eventfd is a kernel object. The fd can be shared across threads.
107// Writes are atomic (8-byte writes to eventfd are guaranteed atomic by Linux).
108unsafe impl Send for EventFd {}
109unsafe impl Sync for EventFd {}
110
111/// A pair of eventfds for bidirectional wake signaling across the bridge.
112///
113/// ```text
114/// Producer (Tokio) Consumer (TPC)
115/// │ │
116/// │── notify(consumer_wake) ──→│ "queue has data"
117/// │ │
118/// │←── notify(producer_wake) ──│ "queue has space"
119/// ```
120pub struct WakePair {
121 /// Producer reads this to know the consumer freed space.
122 pub producer_wake: EventFd,
123 /// Consumer reads this to know the producer enqueued data.
124 pub consumer_wake: EventFd,
125}
126
127impl WakePair {
128 /// Create a new wake pair.
129 pub fn new() -> io::Result<Self> {
130 Ok(Self {
131 producer_wake: EventFd::new()?,
132 consumer_wake: EventFd::new()?,
133 })
134 }
135}
136
137#[cfg(test)]
138mod tests {
139 use super::*;
140
141 #[test]
142 fn notify_and_read() {
143 let efd = EventFd::new().unwrap();
144
145 // No pending signal.
146 assert_eq!(efd.try_read().unwrap(), 0);
147
148 // Signal once.
149 efd.notify().unwrap();
150 assert_eq!(efd.try_read().unwrap(), 1);
151
152 // Consumed — nothing pending.
153 assert_eq!(efd.try_read().unwrap(), 0);
154 }
155
156 #[test]
157 fn multiple_notifies_accumulate() {
158 let efd = EventFd::new().unwrap();
159
160 efd.notify().unwrap();
161 efd.notify().unwrap();
162 efd.notify().unwrap();
163
164 // Single read returns accumulated count.
165 assert_eq!(efd.try_read().unwrap(), 3);
166 assert_eq!(efd.try_read().unwrap(), 0);
167 }
168
169 #[test]
170 fn wake_pair_bidirectional() {
171 let pair = WakePair::new().unwrap();
172
173 // Producer signals consumer.
174 pair.consumer_wake.notify().unwrap();
175 assert_eq!(pair.consumer_wake.try_read().unwrap(), 1);
176
177 // Consumer signals producer.
178 pair.producer_wake.notify().unwrap();
179 assert_eq!(pair.producer_wake.try_read().unwrap(), 1);
180 }
181
182 #[test]
183 fn fd_is_valid() {
184 let efd = EventFd::new().unwrap();
185 assert!(efd.as_fd() >= 0);
186 }
187}