Skip to main content

nodedb_bridge/
eventfd.rs

1//! Cross-runtime wake signaling via Linux eventfd.
2//!
3//! eventfd is the only safe primitive for waking across Tokio and Glommio/monoio:
4//!
5//! - Both runtimes can poll a file descriptor.
6//! - No `Send` requirement on the waker itself — just read/write an fd.
7//! - Coalescing: multiple writes produce a single readable event.
8//!
9//! ## Usage
10//!
11//! Two EventFd instances per bridge channel:
12//!
13//! - `producer_wake`: Consumer writes → Producer reads (queue was full, now has space)
14//! - `consumer_wake`: Producer writes → Consumer reads (queue was empty, now has data)
15//!
16//! The runtime-specific integration (registering the fd with epoll/io_uring) is
17//! done by the caller. This module only provides raw fd-based signaling.
18
19use std::io;
20use std::os::unix::io::{AsRawFd, FromRawFd, OwnedFd, RawFd};
21
/// A cross-runtime wake signal backed by a Linux eventfd.
///
/// Write to signal, read to consume. Signals that arrive before a read
/// coalesce into a single readable event. eventfd is Linux-specific, so the
/// descriptor is meant for fd-based event loops on Linux (for example epoll
/// or io_uring).
///
/// The descriptor is held as an [`OwnedFd`], so it is closed when the
/// `EventFd` is dropped.
#[derive(Debug)]
pub struct EventFd {
    /// The owned eventfd descriptor; closed on drop.
    fd: OwnedFd,
}
29
30impl EventFd {
31    /// Create a new eventfd in semaphore mode.
32    ///
33    /// `EFD_NONBLOCK` ensures reads/writes never block the calling thread.
34    /// `EFD_CLOEXEC` prevents fd leaks across fork/exec.
35    pub fn new() -> io::Result<Self> {
36        // SAFETY: eventfd2 is a standard Linux syscall. Flags are valid.
37        let fd = unsafe { libc::eventfd(0, libc::EFD_NONBLOCK | libc::EFD_CLOEXEC) };
38        if fd < 0 {
39            return Err(io::Error::last_os_error());
40        }
41        // SAFETY: fd is a valid file descriptor returned by eventfd().
42        let fd = unsafe { OwnedFd::from_raw_fd(fd) };
43        Ok(Self { fd })
44    }
45
46    /// Signal the other side to wake up.
47    ///
48    /// Writes 1 to the eventfd counter. Multiple writes accumulate but
49    /// a single read clears all pending signals.
50    pub fn notify(&self) -> io::Result<()> {
51        let val: u64 = 1;
52        // SAFETY: writing 8 bytes to a valid eventfd.
53        let ret = unsafe {
54            libc::write(
55                self.fd.as_raw_fd(),
56                &val as *const u64 as *const libc::c_void,
57                8,
58            )
59        };
60        if ret < 0 {
61            Err(io::Error::last_os_error())
62        } else {
63            Ok(())
64        }
65    }
66
67    /// Consume the pending signal count, returning the accumulated value.
68    ///
69    /// Returns `Ok(0)` if no signal was pending (EAGAIN on non-blocking fd).
70    /// Returns `Ok(n)` where n is the accumulated signal count.
71    pub fn try_read(&self) -> io::Result<u64> {
72        let mut val: u64 = 0;
73        // SAFETY: reading 8 bytes from a valid eventfd.
74        let ret = unsafe {
75            libc::read(
76                self.fd.as_raw_fd(),
77                &mut val as *mut u64 as *mut libc::c_void,
78                8,
79            )
80        };
81        if ret < 0 {
82            let err = io::Error::last_os_error();
83            if err.kind() == io::ErrorKind::WouldBlock {
84                Ok(0)
85            } else {
86                Err(err)
87            }
88        } else {
89            Ok(val)
90        }
91    }
92
93    /// Get the raw file descriptor for registration with an event loop.
94    ///
95    /// The caller can register this fd with:
96    /// - Tokio: `AsyncFd::new()`
97    /// - Glommio: `GlommioDma::from_raw_fd()` or similar
98    /// - io_uring: `IORING_OP_READ` on the fd
99    pub fn as_fd(&self) -> RawFd {
100        self.fd.as_raw_fd()
101    }
102}
103
104// SAFETY: eventfd is a kernel object. The fd can be shared across threads.
105// Writes are atomic (8-byte writes to eventfd are guaranteed atomic by Linux).
106unsafe impl Send for EventFd {}
107unsafe impl Sync for EventFd {}
108
109/// A pair of eventfds for bidirectional wake signaling across the bridge.
110///
111/// ```text
112/// Producer (Tokio)          Consumer (TPC)
113///    │                          │
114///    │── notify(consumer_wake) ──→│  "queue has data"
115///    │                          │
116///    │←── notify(producer_wake) ──│  "queue has space"
117/// ```
118pub struct WakePair {
119    /// Producer reads this to know the consumer freed space.
120    pub producer_wake: EventFd,
121    /// Consumer reads this to know the producer enqueued data.
122    pub consumer_wake: EventFd,
123}
124
125impl WakePair {
126    /// Create a new wake pair.
127    pub fn new() -> io::Result<Self> {
128        Ok(Self {
129            producer_wake: EventFd::new()?,
130            consumer_wake: EventFd::new()?,
131        })
132    }
133}
134
#[cfg(test)]
mod tests {
    use super::*;

    /// Reads the counter once, panicking on any I/O error.
    fn drain(signal: &EventFd) -> u64 {
        signal.try_read().unwrap()
    }

    /// A fresh eventfd is empty, yields 1 after one notify, then is empty again.
    #[test]
    fn notify_and_read() {
        let signal = EventFd::new().unwrap();
        assert_eq!(drain(&signal), 0);

        signal.notify().unwrap();
        assert_eq!(drain(&signal), 1);

        // The read above reset the counter.
        assert_eq!(drain(&signal), 0);
    }

    /// Three notifies are returned as a single accumulated count.
    #[test]
    fn multiple_notifies_accumulate() {
        let signal = EventFd::new().unwrap();
        for _ in 0..3 {
            signal.notify().unwrap();
        }

        assert_eq!(drain(&signal), 3);
        assert_eq!(drain(&signal), 0);
    }

    /// Each half of a `WakePair` signals independently of the other.
    #[test]
    fn wake_pair_bidirectional() {
        let WakePair {
            producer_wake,
            consumer_wake,
        } = WakePair::new().unwrap();

        // Producer -> consumer direction.
        consumer_wake.notify().unwrap();
        assert_eq!(drain(&consumer_wake), 1);

        // Consumer -> producer direction.
        producer_wake.notify().unwrap();
        assert_eq!(drain(&producer_wake), 1);
    }

    /// The exposed raw descriptor is a non-negative fd number.
    #[test]
    fn fd_is_valid() {
        let signal = EventFd::new().unwrap();
        let raw = signal.as_fd();
        assert!(raw >= 0);
    }
}