Skip to main content

coreshift_core/reactor/
mod.rs

1// This Source Code Form is subject to the terms of the Mozilla Public
2// License, v. 2.0. If a copy of the MPL was not distributed with this
3// file, You can obtain one at https://mozilla.org/MPL/2.0/
4
5//! Asynchronous event reactor.
6//!
7//! This module provides a lightweight wrapper around Linux `epoll` for
8//! multiplexing I/O events. It is optimized for edge-triggered monitoring.
9//! It is intentionally explicit about Linux readiness semantics rather than
10//! hiding them behind a higher-level async runtime abstraction.
11
12use crate::CoreError;
13use crate::error::syscall_ret;
14use std::io::Error as IoError;
15use std::time::Duration;
16
/// Fetch the calling thread's most recent OS error code.
///
/// Yields `0` when the last OS error carries no raw code.
#[inline(always)]
fn errno() -> i32 {
    let last = IoError::last_os_error();
    last.raw_os_error().unwrap_or_default()
}
21
/// An owned file descriptor that closes on drop.
///
/// `Fd` is move-only: it is neither `Clone` nor `Copy`. Constructing one from
/// a raw descriptor transfers close ownership to `Fd`; do not also close the
/// raw descriptor elsewhere.
///
/// The `Debug` representation prints the raw descriptor number, matching the
/// other public reactor types which are all `Debug`.
#[derive(Debug)]
pub struct Fd(RawFd);
27
28use std::os::unix::io::{AsRawFd, RawFd};
29
30impl AsRawFd for Fd {
31    fn as_raw_fd(&self) -> RawFd {
32        self.0
33    }
34}
35
36impl Fd {
37    /// Wrap a raw file descriptor.
38    ///
39    /// # Errors
40    /// Returns a [`CoreError`] if the descriptor is negative.
41    #[inline(always)]
42    pub(crate) fn new(fd: RawFd, op: &'static str) -> Result<Self, CoreError> {
43        if fd < 0 {
44            Err(CoreError::sys(errno(), op))
45        } else {
46            Ok(Self(fd))
47        }
48    }
49
50    /// Wrap an owned raw file descriptor.
51    ///
52    /// # Safety
53    /// The caller must guarantee `fd` is valid, open, and uniquely owned by the
54    /// returned `Fd`. Passing a borrowed fd, or closing `fd` after this call,
55    /// can cause double-close or use-after-close bugs.
56    #[inline(always)]
57    pub unsafe fn from_owned_raw_fd(fd: RawFd, op: &'static str) -> Result<Self, CoreError> {
58        Self::new(fd, op)
59    }
60
61    /// Create a non-blocking `eventfd`.
62    pub fn eventfd(init: u32) -> Result<Self, CoreError> {
63        let fd = unsafe { libc::eventfd(init, libc::EFD_CLOEXEC | libc::EFD_NONBLOCK) };
64        syscall_ret(fd, "eventfd")?;
65        Self::new(fd, "eventfd")
66    }
67
68    /// Create a non-blocking `timerfd` using `CLOCK_MONOTONIC`.
69    pub fn timerfd() -> Result<Self, CoreError> {
70        let fd = unsafe {
71            libc::timerfd_create(
72                libc::CLOCK_MONOTONIC,
73                libc::TFD_CLOEXEC | libc::TFD_NONBLOCK,
74            )
75        };
76        syscall_ret(fd, "timerfd_create")?;
77        Self::new(fd, "timerfd_create")
78    }
79
80    /// Access the underlying raw file descriptor.
81    ///
82    /// NOTE: This is an escape hatch for low-level interactions. Prefer using
83    /// the safe methods on `Fd` or implementing `AsRawFd`.
84    #[inline(always)]
85    pub(crate) fn raw(&self) -> RawFd {
86        self.0
87    }
88
89    /// Perform a `dup2` syscall.
90    pub fn dup2(&self, target: RawFd) -> Result<(), CoreError> {
91        loop {
92            let r = unsafe { libc::dup2(self.0, target) };
93            if r < 0 {
94                let e = errno();
95                if e == libc::EINTR {
96                    continue;
97                }
98                return syscall_ret(r, "dup2");
99            }
100            return Ok(());
101        }
102    }
103
104    /// Set the `O_NONBLOCK` flag on the descriptor.
105    pub fn set_nonblock(&self) -> Result<(), CoreError> {
106        let flags = unsafe { libc::fcntl(self.0, libc::F_GETFL) };
107        syscall_ret(flags, "fcntl(F_GETFL)")?;
108        let r = unsafe { libc::fcntl(self.0, libc::F_SETFL, flags | libc::O_NONBLOCK) };
109        syscall_ret(r, "fcntl(F_SETFL)")
110    }
111
112    /// Set the `FD_CLOEXEC` flag on the descriptor.
113    pub fn set_cloexec(&self) -> Result<(), CoreError> {
114        let flags = unsafe { libc::fcntl(self.0, libc::F_GETFD) };
115        syscall_ret(flags, "fcntl(F_GETFD)")?;
116        let r = unsafe { libc::fcntl(self.0, libc::F_SETFD, flags | libc::FD_CLOEXEC) };
117        syscall_ret(r, "fcntl(F_SETFD)")
118    }
119
120    /// Read bytes into a mutable slice.
121    ///
122    /// Returns `Ok(None)` if the operation would block (`EAGAIN`).
123    pub fn read_slice(&self, buf: &mut [u8]) -> Result<Option<usize>, CoreError> {
124        self.read_raw(buf.as_mut_ptr(), buf.len())
125    }
126
127    /// Seek to an absolute file offset.
128    pub fn seek_set(&self, offset: i64) -> Result<u64, CoreError> {
129        loop {
130            let pos = unsafe { libc::lseek(self.0, offset as libc::off_t, libc::SEEK_SET) };
131            if pos < 0 {
132                let e = errno();
133                if e == libc::EINTR {
134                    continue;
135                }
136                return Err(CoreError::sys(e, "lseek"));
137            }
138            return Ok(pos as u64);
139        }
140    }
141
142    /// Write bytes from a slice.
143    ///
144    /// Returns `Ok(None)` if the operation would block (`EAGAIN`).
145    pub fn write_slice(&self, buf: &[u8]) -> Result<Option<usize>, CoreError> {
146        self.write_raw(buf.as_ptr(), buf.len())
147    }
148
149    /// Read a native-endian `u64`.
150    ///
151    /// Returns `Ok(None)` if the operation would block (`EAGAIN`).
152    pub fn read_u64(&self) -> Result<Option<u64>, CoreError> {
153        let mut bytes = [0u8; std::mem::size_of::<u64>()];
154        match self.read_slice(&mut bytes)? {
155            Some(n) if n == bytes.len() => Ok(Some(u64::from_ne_bytes(bytes))),
156            Some(_) => Err(CoreError::sys(libc::EIO, "read_u64")),
157            None => Ok(None),
158        }
159    }
160
161    /// Write a native-endian `u64`.
162    ///
163    /// Returns `Ok(None)` if the operation would block (`EAGAIN`).
164    pub fn write_u64(&self, value: u64) -> Result<Option<usize>, CoreError> {
165        self.write_slice(&value.to_ne_bytes())
166    }
167
168    /// Arm or disarm a one-shot `timerfd`.
169    ///
170    /// Passing `None` disarms the timer. Zero durations are rounded up to one
171    /// nanosecond so the timer still expires.
172    pub fn set_timer_oneshot(&self, delay: Option<Duration>) -> Result<(), CoreError> {
173        let mut spec: libc::itimerspec = unsafe { std::mem::zeroed() };
174        if let Some(delay) = delay {
175            let delay = delay.max(Duration::from_nanos(1));
176            spec.it_value.tv_sec = delay.as_secs() as libc::time_t;
177            spec.it_value.tv_nsec = delay.subsec_nanos() as libc::c_long;
178        }
179
180        let ret = unsafe { libc::timerfd_settime(self.raw(), 0, &spec, std::ptr::null_mut()) };
181        syscall_ret(ret, "timerfd_settime")
182    }
183
184    /// Read bytes into a raw buffer.
185    ///
186    /// Internal callers must ensure `buf` points to a valid writable region of
187    /// at least `count` bytes.
188    ///
189    /// Returns `Ok(None)` if the operation would block (`EAGAIN`).
190    pub(crate) fn read_raw(&self, buf: *mut u8, count: usize) -> Result<Option<usize>, CoreError> {
191        loop {
192            let n = unsafe { libc::read(self.0, buf as *mut libc::c_void, count) };
193            if n < 0 {
194                let e = errno();
195                if e == libc::EINTR {
196                    continue;
197                }
198                if e == libc::EAGAIN || e == libc::EWOULDBLOCK {
199                    return Ok(None);
200                }
201                return Err(CoreError::sys(e, "read"));
202            }
203            return Ok(Some(n as usize));
204        }
205    }
206
207    /// Write bytes from a raw buffer.
208    ///
209    /// Internal callers must ensure `buf` points to a valid readable region of
210    /// at least `count` bytes.
211    ///
212    /// Returns `Ok(None)` if the operation would block (`EAGAIN`).
213    pub(crate) fn write_raw(
214        &self,
215        buf: *const u8,
216        count: usize,
217    ) -> Result<Option<usize>, CoreError> {
218        loop {
219            let n = unsafe { libc::write(self.0, buf as *const libc::c_void, count) };
220            if n < 0 {
221                let e = errno();
222                if e == libc::EINTR {
223                    continue;
224                }
225                if e == libc::EAGAIN || e == libc::EWOULDBLOCK {
226                    return Ok(None);
227                }
228                return Err(CoreError::sys(e, "write"));
229            }
230            return Ok(Some(n as usize));
231        }
232    }
233}
234
235impl Drop for Fd {
236    fn drop(&mut self) {
237        if self.0 >= 0 {
238            unsafe {
239                libc::close(self.0);
240            }
241        }
242    }
243}
244
/// Opaque identifier the reactor associates with a registered descriptor.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct Token(u64);

#[allow(dead_code)]
impl Token {
    /// Build a token from its raw numeric value.
    #[inline(always)]
    pub(crate) fn new(val: u64) -> Self {
        Token(val)
    }

    /// Raw numeric value carried by this token.
    #[inline(always)]
    pub(crate) fn val(&self) -> u64 {
        let Token(raw) = *self;
        raw
    }
}
261
262/// A readiness event generated by the reactor.
263#[derive(Clone, Copy, Debug)]
264pub struct Event {
265    /// Token associated with the ready descriptor.
266    pub token: Token,
267    /// Descriptor is ready for reading.
268    pub readable: bool,
269    /// Descriptor has priority data or an exceptional condition (EPOLLPRI).
270    pub priority: bool,
271    /// Descriptor is ready for writing.
272    pub writable: bool,
273    /// Indicates an error or hangup (EPOLLERR | EPOLLHUP).
274    ///
275    /// NOTE: For edge-triggered readiness, an error condition often means both
276    /// readable and writable are set to ensure the handler drains the FD.
277    pub error: bool,
278}
279
280/// A lightweight epoll reactor using edge-triggered monitoring (EPOLLET).
281///
282/// ### Edge-Triggered Contract
283/// Because this reactor uses EPOLLET, all handlers MUST drain their respective
284/// read or write sources until they receive an `EAGAIN` / `EWOULDBLOCK` error
285/// (represented as `Ok(None)` in the `Fd` helpers).
286///
287/// Failure to drain a source will result in missing future readiness events
288/// for that file descriptor until it is re-registered or another event occurs.
289///
290/// # Example
291/// ```no_run
292/// # use coreshift_core::reactor::{Reactor, Fd, Event};
293/// # fn example(fd: Fd) -> Result<(), Box<dyn std::error::Error>> {
294/// let mut reactor = Reactor::new()?;
295/// let token = reactor.add(&fd, true, false)?;
296///
297/// let mut events = Vec::new();
298/// loop {
299///     reactor.wait(&mut events, 64, -1)?;
300///     for ev in &events {
301///         if ev.token == token {
302///             // Drain fd...
303///         }
304///     }
305/// }
306/// # Ok(())
307/// # }
308/// ```
309pub struct Reactor {
310    epfd: RawFd,
311    next_token: u64,
312    events_buf: Vec<libc::epoll_event>,
313    signalfd: Option<Fd>,
314    signalfd_previous_mask: Option<libc::sigset_t>,
315    /// Token for the signalfd (if initialized).
316    sigchld_token: Option<Token>,
317    /// Token for the inotify fd (if initialized).
318    inotify_token: Option<Token>,
319}
320
321impl Reactor {
322    /// Create a new epoll reactor.
323    ///
324    /// # Errors
325    /// Returns [`CoreError`] if `epoll_create1` fails.
326    pub fn new() -> Result<Self, CoreError> {
327        let epfd = unsafe { libc::epoll_create1(libc::EPOLL_CLOEXEC) };
328        syscall_ret(epfd, "epoll_create1")?;
329        Ok(Self {
330            epfd,
331            next_token: 1,
332            events_buf: Vec::with_capacity(64),
333            signalfd: None,
334            signalfd_previous_mask: None,
335            sigchld_token: None,
336            inotify_token: None,
337        })
338    }
339
340    /// Initialize inotify and add it to the reactor.
341    ///
342    /// # Errors
343    /// Returns [`CoreError`] if `inotify_init1` or `epoll_ctl` fails.
344    pub fn setup_inotify(&mut self) -> Result<(Fd, Token), CoreError> {
345        let fd = unsafe { libc::inotify_init1(libc::IN_CLOEXEC | libc::IN_NONBLOCK) };
346        syscall_ret(fd, "inotify_init1")?;
347
348        let fd_obj = Fd::new(fd, "inotify")?;
349        let token = self.add(&fd_obj, true, false)?;
350        self.inotify_token = Some(token);
351
352        Ok((fd_obj, token))
353    }
354
355    /// Initialize signalfd for SIGCHLD and add it to the reactor.
356    ///
357    /// # Errors
358    /// Returns [`CoreError`] if `pthread_sigmask`, `signalfd`, or `epoll_ctl` fails.
359    ///
360    /// The previous current-thread signal mask is restored when the reactor is
361    /// dropped.
362    pub fn setup_signalfd(&mut self) -> Result<Token, CoreError> {
363        if self.signalfd.is_some() {
364            return Err(CoreError::sys(
365                libc::EINVAL,
366                "setup_signalfd already initialized",
367            ));
368        }
369
370        let mut mask: libc::sigset_t = unsafe { std::mem::zeroed() };
371        unsafe { libc::sigemptyset(&mut mask) };
372        unsafe { libc::sigaddset(&mut mask, libc::SIGCHLD) };
373
374        let mut previous_mask: libc::sigset_t = unsafe { std::mem::zeroed() };
375        let r = unsafe { libc::pthread_sigmask(libc::SIG_BLOCK, &mask, &mut previous_mask) };
376        if r != 0 {
377            return Err(CoreError::sys(r, "pthread_sigmask(SIG_BLOCK)"));
378        }
379
380        let sfd = unsafe { libc::signalfd(-1, &mask, libc::SFD_NONBLOCK | libc::SFD_CLOEXEC) };
381        if let Err(err) = syscall_ret(sfd, "signalfd") {
382            let _ = unsafe {
383                libc::pthread_sigmask(libc::SIG_SETMASK, &previous_mask, std::ptr::null_mut())
384            };
385            return Err(err);
386        }
387
388        let fd = Fd::new(sfd, "signalfd")?;
389        let token = match self.add(&fd, true, false) {
390            Ok(token) => token,
391            Err(err) => {
392                let _ = unsafe {
393                    libc::pthread_sigmask(libc::SIG_SETMASK, &previous_mask, std::ptr::null_mut())
394                };
395                return Err(err);
396            }
397        };
398
399        self.signalfd = Some(fd);
400        self.signalfd_previous_mask = Some(previous_mask);
401        self.sigchld_token = Some(token);
402
403        Ok(token)
404    }
405
406    /// Drain the internal signalfd buffer.
407    pub fn drain_signalfd(&self) -> Result<(), CoreError> {
408        if let Some(fd) = &self.signalfd {
409            let mut buf = [0u8; std::mem::size_of::<libc::signalfd_siginfo>()];
410            loop {
411                match fd.read_slice(&mut buf) {
412                    Ok(Some(n)) if n < buf.len() => break,
413                    Ok(Some(_)) => continue,
414                    Ok(None) => break,
415                    Err(e) => return Err(e),
416                }
417            }
418        }
419        Ok(())
420    }
421
422    /// Register a file descriptor with the reactor.
423    ///
424    /// This assigns a new unique token for the descriptor and enables
425    /// edge-triggered monitoring.
426    #[inline(always)]
427    pub fn add(&mut self, fd: &Fd, readable: bool, writable: bool) -> Result<Token, CoreError> {
428        let token = Token(self.next_token);
429        self.next_token += 1;
430        self.add_with_token(fd.raw(), token, readable, writable, false)?;
431        Ok(token)
432    }
433
434    /// Register a file descriptor for priority readiness (EPOLLPRI).
435    #[inline(always)]
436    pub fn add_priority(&mut self, fd: &Fd) -> Result<Token, CoreError> {
437        let token = Token(self.next_token);
438        self.next_token += 1;
439        self.add_with_token(fd.raw(), token, false, false, true)?;
440        Ok(token)
441    }
442
443    #[inline(always)]
444    pub(crate) fn add_with_token(
445        &mut self,
446        raw_fd: RawFd,
447        token: Token,
448        readable: bool,
449        writable: bool,
450        priority: bool,
451    ) -> Result<(), CoreError> {
452        let mut events = libc::EPOLLET as u32;
453        if readable {
454            events |= libc::EPOLLIN as u32;
455        }
456        if writable {
457            events |= libc::EPOLLOUT as u32;
458        }
459        if priority {
460            events |= libc::EPOLLPRI as u32;
461        }
462        let mut ev = libc::epoll_event {
463            events,
464            u64: token.0,
465        };
466        let r = unsafe { libc::epoll_ctl(self.epfd, libc::EPOLL_CTL_ADD, raw_fd, &mut ev) };
467        syscall_ret(r, "epoll_ctl_add")?;
468        Ok(())
469    }
470
471    /// Remove a file descriptor from the reactor.
472    #[inline(always)]
473    pub fn del(&self, fd: &Fd) -> Result<(), CoreError> {
474        self.del_raw(fd.raw())
475    }
476
477    /// Remove a raw descriptor from the reactor.
478    ///
479    /// NOTE: This is an escape hatch for low-level interactions. Prefer using
480    /// [`del`](Self::del).
481    #[inline(always)]
482    pub(crate) fn del_raw(&self, raw: RawFd) -> Result<(), CoreError> {
483        loop {
484            let ret = unsafe {
485                libc::epoll_ctl(self.epfd, libc::EPOLL_CTL_DEL, raw, std::ptr::null_mut())
486            };
487            if ret == -1 {
488                let e = errno();
489                if e == libc::EINTR {
490                    continue;
491                }
492                return Err(CoreError::sys(e, "epoll_ctl_del"));
493            }
494            return Ok(());
495        }
496    }
497
498    /// Wait for events.
499    ///
500    /// This function blocks until at least one event is ready or the timeout
501    /// expires. Ready events are appended to the `buffer`.
502    ///
503    /// Returns the number of events received.
504    #[inline(always)]
505    pub fn wait(
506        &mut self,
507        buffer: &mut Vec<Event>,
508        max_events: usize,
509        timeout: i32,
510    ) -> Result<usize, CoreError> {
511        buffer.clear();
512
513        if max_events == 0 {
514            return Ok(0);
515        }
516
517        // Ensure buffer has enough capacity
518        if buffer.capacity() < max_events {
519            buffer.reserve(max_events.saturating_sub(buffer.len()));
520        }
521
522        if self.events_buf.capacity() < max_events {
523            self.events_buf
524                .reserve(max_events.saturating_sub(self.events_buf.len()));
525        }
526
527        let n = unsafe {
528            libc::epoll_wait(
529                self.epfd,
530                self.events_buf.as_mut_ptr(),
531                max_events as i32,
532                timeout,
533            )
534        };
535
536        if n > 0 {
537            unsafe {
538                self.events_buf.set_len(n as usize);
539            }
540            for i in 0..n as usize {
541                let ev = self.events_buf[i];
542                let is_read = (ev.events & libc::EPOLLIN as u32) != 0;
543                let is_priority = (ev.events & libc::EPOLLPRI as u32) != 0;
544                let is_write = (ev.events & libc::EPOLLOUT as u32) != 0;
545                let is_err = (ev.events & (libc::EPOLLERR | libc::EPOLLHUP) as u32) != 0;
546
547                buffer.push(Event {
548                    token: Token(ev.u64),
549                    readable: is_read || is_err,
550                    priority: is_priority || is_err,
551                    writable: is_write || is_err,
552                    error: is_err,
553                });
554            }
555            return Ok(n as usize);
556        }
557
558        if n < 0 {
559            let e = errno();
560            if e == libc::EINTR {
561                return Ok(0);
562            }
563            return Err(CoreError::sys(e, "epoll_wait"));
564        }
565        Ok(0)
566    }
567
568    /// Return the raw epoll file descriptor.
569    ///
570    /// NOTE: This is an escape hatch for low-level interactions.
571    #[allow(dead_code)]
572    pub(crate) fn fd(&self) -> RawFd {
573        self.epfd
574    }
575}
576
577impl Drop for Reactor {
578    fn drop(&mut self) {
579        if let Some(mask) = self.signalfd_previous_mask.take() {
580            let _ =
581                unsafe { libc::pthread_sigmask(libc::SIG_SETMASK, &mask, std::ptr::null_mut()) };
582        }
583        if self.epfd >= 0 {
584            unsafe {
585                libc::close(self.epfd);
586            }
587        }
588    }
589}