Skip to main content

nodedb_bridge/
eventfd.rs

1// SPDX-License-Identifier: BUSL-1.1
2
3//! Cross-runtime wake signaling via Linux eventfd.
4//!
5//! eventfd is the only safe primitive for waking across Tokio and Glommio/monoio:
6//!
7//! - Both runtimes can poll a file descriptor.
8//! - No `Send` requirement on the waker itself — just read/write an fd.
9//! - Coalescing: multiple writes produce a single readable event.
10//!
11//! ## Usage
12//!
13//! Two EventFd instances per bridge channel:
14//!
15//! - `producer_wake`: Consumer writes → Producer reads (queue was full, now has space)
16//! - `consumer_wake`: Producer writes → Consumer reads (queue was empty, now has data)
17//!
18//! The runtime-specific integration (registering the fd with epoll/io_uring) is
19//! done by the caller. This module only provides raw fd-based signaling.
20
21use std::io;
22use std::os::unix::io::{AsRawFd, FromRawFd, OwnedFd, RawFd};
23
/// A cross-runtime wake signal backed by a Linux eventfd.
///
/// Write to signal, read to consume. Multiple signals coalesce into one.
/// The fd can be registered with any event loop that polls file descriptors
/// (e.g. epoll or io_uring).
///
/// NOTE(review): earlier docs mentioned a "kqueue fallback"; nothing in this
/// type provides one (it always calls `eventfd`), so platforms without
/// eventfd would need a separate implementation — confirm intent.
///
/// The descriptor is owned by this value and is closed when it is dropped.
#[derive(Debug)]
pub struct EventFd {
    /// The owned eventfd descriptor; `OwnedFd` closes it on drop.
    fd: OwnedFd,
}
31
32impl EventFd {
33    /// Create a new eventfd in semaphore mode.
34    ///
35    /// `EFD_NONBLOCK` ensures reads/writes never block the calling thread.
36    /// `EFD_CLOEXEC` prevents fd leaks across fork/exec.
37    pub fn new() -> io::Result<Self> {
38        // SAFETY: eventfd2 is a standard Linux syscall. Flags are valid.
39        let fd = unsafe { libc::eventfd(0, libc::EFD_NONBLOCK | libc::EFD_CLOEXEC) };
40        if fd < 0 {
41            return Err(io::Error::last_os_error());
42        }
43        // SAFETY: fd is a valid file descriptor returned by eventfd().
44        let fd = unsafe { OwnedFd::from_raw_fd(fd) };
45        Ok(Self { fd })
46    }
47
48    /// Signal the other side to wake up.
49    ///
50    /// Writes 1 to the eventfd counter. Multiple writes accumulate but
51    /// a single read clears all pending signals.
52    pub fn notify(&self) -> io::Result<()> {
53        let val: u64 = 1;
54        // SAFETY: writing 8 bytes to a valid eventfd.
55        let ret = unsafe {
56            libc::write(
57                self.fd.as_raw_fd(),
58                &val as *const u64 as *const libc::c_void,
59                8,
60            )
61        };
62        if ret < 0 {
63            Err(io::Error::last_os_error())
64        } else {
65            Ok(())
66        }
67    }
68
69    /// Consume the pending signal count, returning the accumulated value.
70    ///
71    /// Returns `Ok(0)` if no signal was pending (EAGAIN on non-blocking fd).
72    /// Returns `Ok(n)` where n is the accumulated signal count.
73    pub fn try_read(&self) -> io::Result<u64> {
74        let mut val: u64 = 0;
75        // SAFETY: reading 8 bytes from a valid eventfd.
76        let ret = unsafe {
77            libc::read(
78                self.fd.as_raw_fd(),
79                &mut val as *mut u64 as *mut libc::c_void,
80                8,
81            )
82        };
83        if ret < 0 {
84            let err = io::Error::last_os_error();
85            if err.kind() == io::ErrorKind::WouldBlock {
86                Ok(0)
87            } else {
88                Err(err)
89            }
90        } else {
91            Ok(val)
92        }
93    }
94
95    /// Get the raw file descriptor for registration with an event loop.
96    ///
97    /// The caller can register this fd with:
98    /// - Tokio: `AsyncFd::new()`
99    /// - Glommio: `GlommioDma::from_raw_fd()` or similar
100    /// - io_uring: `IORING_OP_READ` on the fd
101    pub fn as_fd(&self) -> RawFd {
102        self.fd.as_raw_fd()
103    }
104}
105
106// SAFETY: eventfd is a kernel object. The fd can be shared across threads.
107// Writes are atomic (8-byte writes to eventfd are guaranteed atomic by Linux).
108unsafe impl Send for EventFd {}
109unsafe impl Sync for EventFd {}
110
111/// A pair of eventfds for bidirectional wake signaling across the bridge.
112///
113/// ```text
114/// Producer (Tokio)          Consumer (TPC)
115///    │                          │
116///    │── notify(consumer_wake) ──→│  "queue has data"
117///    │                          │
118///    │←── notify(producer_wake) ──│  "queue has space"
119/// ```
120pub struct WakePair {
121    /// Producer reads this to know the consumer freed space.
122    pub producer_wake: EventFd,
123    /// Consumer reads this to know the producer enqueued data.
124    pub consumer_wake: EventFd,
125}
126
127impl WakePair {
128    /// Create a new wake pair.
129    pub fn new() -> io::Result<Self> {
130        Ok(Self {
131            producer_wake: EventFd::new()?,
132            consumer_wake: EventFd::new()?,
133        })
134    }
135}
136
#[cfg(test)]
mod tests {
    use super::*;

    /// Read the current counter value of `signal`, panicking on I/O errors.
    fn drain(signal: &EventFd) -> u64 {
        signal.try_read().unwrap()
    }

    /// A fresh eventfd reads 0, one notify reads back as 1, and the read resets it.
    #[test]
    fn notify_and_read() {
        let signal = EventFd::new().unwrap();

        // Nothing signalled yet.
        assert_eq!(drain(&signal), 0);

        // One notify shows up as a count of one.
        signal.notify().unwrap();
        assert_eq!(drain(&signal), 1);

        // The previous read consumed it.
        assert_eq!(drain(&signal), 0);
    }

    /// Several notifies before a read are summed into one value.
    #[test]
    fn multiple_notifies_accumulate() {
        let signal = EventFd::new().unwrap();

        for _ in 0..3 {
            signal.notify().unwrap();
        }

        // One read returns the total and clears the counter.
        assert_eq!(drain(&signal), 3);
        assert_eq!(drain(&signal), 0);
    }

    /// Each half of a `WakePair` can be signalled and read.
    #[test]
    fn wake_pair_bidirectional() {
        let WakePair {
            producer_wake,
            consumer_wake,
        } = WakePair::new().unwrap();

        // Producer -> consumer direction.
        consumer_wake.notify().unwrap();
        assert_eq!(drain(&consumer_wake), 1);

        // Consumer -> producer direction.
        producer_wake.notify().unwrap();
        assert_eq!(drain(&producer_wake), 1);
    }

    /// The exposed raw descriptor is a non-negative fd number.
    #[test]
    fn fd_is_valid() {
        let signal = EventFd::new().unwrap();
        let raw = signal.as_fd();
        assert!(raw >= 0);
    }
}