Skip to main content

coreshift_core/reactor/
mod.rs

1// This Source Code Form is subject to the terms of the Mozilla Public
2// License, v. 2.0. If a copy of the MPL was not distributed with this
3// file, You can obtain one at https://mozilla.org/MPL/2.0/
4
5//! Asynchronous event reactor.
6//!
7//! This module provides a lightweight wrapper around Linux `epoll` for
8//! multiplexing I/O events. It is optimized for edge-triggered monitoring.
9//! It is intentionally explicit about Linux readiness semantics rather than
10//! hiding them behind a higher-level async runtime abstraction.
11
12use crate::CoreError;
13use crate::error::syscall_ret;
14use std::io::Error as IoError;
15use std::time::Duration;
16
/// Fetch the calling thread's most recent OS error code (`errno`).
///
/// Falls back to `0` when the OS error carries no raw code.
#[inline(always)]
fn errno() -> i32 {
    match IoError::last_os_error().raw_os_error() {
        Some(code) => code,
        None => 0,
    }
}
21
/// An owned file descriptor that closes on drop.
///
/// `Fd` is move-only. Constructing one from a raw descriptor transfers close
/// ownership to `Fd`; do not also close the raw descriptor elsewhere.
///
/// The single field is the raw descriptor number; the `Drop` impl closes it
/// when it is non-negative.
///
/// ### Fork Safety
/// `Fd` instances created by Core usually have `O_CLOEXEC` set. If the process
/// forks, the descriptor will be inherited by the child but will be closed
/// automatically upon `exec`. Callers that need a descriptor to survive `exec`
/// must clear the flag manually.
pub struct Fd(RawFd);
33
34use std::os::unix::io::{AsRawFd, RawFd};
35
impl AsRawFd for Fd {
    /// Return the wrapped descriptor number without giving up ownership.
    ///
    /// The `Fd` still closes the descriptor on drop, so the returned value
    /// must not be closed by the caller.
    fn as_raw_fd(&self) -> RawFd {
        self.0
    }
}
41
42impl Fd {
43    /// Wrap a raw file descriptor.
44    ///
45    /// # Errors
46    /// Returns a [`CoreError`] if the descriptor is negative.
47    #[inline(always)]
48    pub(crate) fn new(fd: RawFd, op: &'static str) -> Result<Self, CoreError> {
49        if fd < 0 {
50            Err(CoreError::sys(errno(), op))
51        } else {
52            Ok(Self(fd))
53        }
54    }
55
    /// Wrap an owned raw file descriptor.
    ///
    /// `op` is a static label attached to the error if wrapping fails.
    ///
    /// # Safety
    /// The caller must guarantee `fd` is valid, open, and uniquely owned by the
    /// returned `Fd`. Passing a borrowed fd, or closing `fd` after this call,
    /// can cause double-close or use-after-close bugs.
    ///
    /// # Errors
    /// Forwards the result of `Fd::new`, which fails when `fd` is negative.
    #[inline(always)]
    pub unsafe fn from_owned_raw_fd(fd: RawFd, op: &'static str) -> Result<Self, CoreError> {
        Self::new(fd, op)
    }
66
67    /// Create a non-blocking `eventfd` with `EFD_CLOEXEC`.
68    ///
69    /// The descriptor is created with `FD_CLOEXEC` set.
70    ///
71    /// ### Errors
72    /// - `EINVAL`: `init` is invalid.
73    /// - `EMFILE`: Process limit on open file descriptors hit.
74    /// - `ENFILE`: System-wide limit on open files hit.
75    pub fn eventfd(init: u32) -> Result<Self, CoreError> {
76        let fd = unsafe { libc::eventfd(init, libc::EFD_CLOEXEC | libc::EFD_NONBLOCK) };
77        syscall_ret(fd, "eventfd")?;
78        Self::new(fd, "eventfd")
79    }
80
81    /// Create a non-blocking `timerfd` using `CLOCK_MONOTONIC` with `TFD_CLOEXEC`.
82    ///
83    /// The descriptor is created with `FD_CLOEXEC` set.
84    ///
85    /// ### Errors
86    /// - `EMFILE`: Process limit on open file descriptors hit.
87    /// - `ENFILE`: System-wide limit on open files hit.
88    /// - `ENOMEM`: Insufficient kernel memory.
89    pub fn timerfd() -> Result<Self, CoreError> {
90        let fd = unsafe {
91            libc::timerfd_create(
92                libc::CLOCK_MONOTONIC,
93                libc::TFD_CLOEXEC | libc::TFD_NONBLOCK,
94            )
95        };
96        syscall_ret(fd, "timerfd_create")?;
97        Self::new(fd, "timerfd_create")
98    }
99
    /// Access the underlying raw file descriptor.
    ///
    /// Ownership stays with `self`; the returned number must not be closed by
    /// the caller.
    ///
    /// NOTE: This is an escape hatch for low-level interactions. Prefer using
    /// the safe methods on `Fd` or implementing `AsRawFd`.
    #[inline(always)]
    pub(crate) fn raw(&self) -> RawFd {
        self.0
    }
108
109    /// Perform a `dup2` syscall.
110    ///
111    /// ### Errors
112    /// - `EBADF`: The source or target file descriptor is invalid.
113    /// - `EMFILE`: The target descriptor exceeds the process limit.
114    pub fn dup2(&self, target: RawFd) -> Result<(), CoreError> {
115        loop {
116            let r = unsafe { libc::dup2(self.0, target) };
117            if r < 0 {
118                let e = errno();
119                if e == libc::EINTR {
120                    continue;
121                }
122                return syscall_ret(r, "dup2");
123            }
124            return Ok(());
125        }
126    }
127
128    /// Set the `O_NONBLOCK` flag on the descriptor.
129    ///
130    /// ### Errors
131    /// - `EBADF`: The file descriptor is invalid.
132    pub fn set_nonblock(&self) -> Result<(), CoreError> {
133        let flags = unsafe { libc::fcntl(self.0, libc::F_GETFL) };
134        syscall_ret(flags, "fcntl(F_GETFL)")?;
135        let r = unsafe { libc::fcntl(self.0, libc::F_SETFL, flags | libc::O_NONBLOCK) };
136        syscall_ret(r, "fcntl(F_SETFL)")
137    }
138
139    /// Set the `FD_CLOEXEC` flag on the descriptor.
140    ///
141    /// ### Errors
142    /// - `EBADF`: The file descriptor is invalid.
143    pub fn set_cloexec(&self) -> Result<(), CoreError> {
144        let flags = unsafe { libc::fcntl(self.0, libc::F_GETFD) };
145        syscall_ret(flags, "fcntl(F_GETFD)")?;
146        let r = unsafe { libc::fcntl(self.0, libc::F_SETFD, flags | libc::FD_CLOEXEC) };
147        syscall_ret(r, "fcntl(F_SETFD)")
148    }
149
    /// Read bytes into a mutable slice.
    ///
    /// Forwards the slice's pointer and length to `read_raw`, which issues a
    /// single `read(2)` (retried on `EINTR`).
    ///
    /// Returns `Ok(None)` if the operation would block (`EAGAIN`).
    ///
    /// ### Edge Cases
    /// - **Zero-length read**: The call is still forwarded to `read(2)` with a
    ///   count of zero; a successful call yields `Ok(Some(0))`.
    /// - **End of file**: `read(2)` reports end-of-file as a zero-byte read, so
    ///   `Ok(Some(0))` for a non-empty `buf` means there is nothing more to read.
    /// - **Partial read**: Returns the number of bytes actually read.
    ///
    /// ### Errors
    /// - `EBADF`: The file descriptor is invalid or not open for reading.
    /// - `EFAULT`: `buf` points outside the process's address space.
    /// - `EIO`: Low-level I/O error.
    pub fn read_slice(&self, buf: &mut [u8]) -> Result<Option<usize>, CoreError> {
        self.read_raw(buf.as_mut_ptr(), buf.len())
    }
165
166    /// Seek to an absolute file offset.
167    ///
168    /// ### Errors
169    /// - `EBADF`: The file descriptor is not seekable.
170    /// - `EINVAL`: `offset` is invalid.
171    /// - `EOVERFLOW`: The resulting offset exceeds the off_t range.
172    pub fn seek_set(&self, offset: i64) -> Result<u64, CoreError> {
173        loop {
174            let pos = unsafe { libc::lseek(self.0, offset as libc::off_t, libc::SEEK_SET) };
175            if pos < 0 {
176                let e = errno();
177                if e == libc::EINTR {
178                    continue;
179                }
180                return Err(CoreError::sys(e, "lseek"));
181            }
182            return Ok(pos as u64);
183        }
184    }
185
    /// Write bytes from a slice.
    ///
    /// Forwards the slice's pointer and length to `write_raw`, which issues a
    /// single `write(2)` (retried on `EINTR`). It does not loop to finish a
    /// partial write; callers must resubmit the remainder themselves.
    ///
    /// Returns `Ok(None)` if the operation would block (`EAGAIN`).
    ///
    /// ### Edge Cases
    /// - **Zero-length write**: The call is still forwarded to `write(2)` with a
    ///   count of zero; a successful call yields `Ok(Some(0))`.
    /// - **Partial write**: Returns the number of bytes actually written.
    ///
    /// ### Errors
    /// - `EBADF`: The file descriptor is invalid or not open for writing.
    /// - `EFAULT`: `buf` points outside the process's address space.
    /// - `EPIPE`: The reading end of a pipe or socket was closed.
    pub fn write_slice(&self, buf: &[u8]) -> Result<Option<usize>, CoreError> {
        self.write_raw(buf.as_ptr(), buf.len())
    }
201
202    /// Read a native-endian `u64`, blocking until data is available.
203    ///
204    /// Unlike `read_u64`, this never returns `Ok(None)` — it retries on `EINTR`
205    /// and returns `Err` only on a hard I/O failure. Intended for blocking
206    /// eventfds used as inter-thread notification primitives.
207    pub fn read_u64_blocking(&self) -> Result<u64, CoreError> {
208        let mut bytes = [0u8; std::mem::size_of::<u64>()];
209        loop {
210            let n = unsafe {
211                libc::read(self.0, bytes.as_mut_ptr() as *mut libc::c_void, bytes.len())
212            };
213            if n == bytes.len() as isize {
214                return Ok(u64::from_ne_bytes(bytes));
215            }
216            if n < 0 {
217                let e = errno();
218                if e == libc::EINTR { continue; }
219                return Err(CoreError::sys(e, "read_u64_blocking"));
220            }
221            return Err(CoreError::sys(libc::EIO, "read_u64_blocking:short_read"));
222        }
223    }
224
225    /// Read a native-endian `u64`.
226    ///
227    /// Returns `Ok(None)` if the operation would block (`EAGAIN`).
228    pub fn read_u64(&self) -> Result<Option<u64>, CoreError> {
229        let mut bytes = [0u8; std::mem::size_of::<u64>()];
230        match self.read_slice(&mut bytes)? {
231            Some(n) if n == bytes.len() => Ok(Some(u64::from_ne_bytes(bytes))),
232            Some(_) => Err(CoreError::sys(libc::EIO, "read_u64")),
233            None => Ok(None),
234        }
235    }
236
    /// Write a native-endian `u64`.
    ///
    /// Passes the eight native-endian bytes of `value` to `write_slice` and
    /// returns its result unchanged, i.e. the number of bytes written.
    ///
    /// Returns `Ok(None)` if the operation would block (`EAGAIN`).
    pub fn write_u64(&self, value: u64) -> Result<Option<usize>, CoreError> {
        self.write_slice(&value.to_ne_bytes())
    }
243
244    /// Arm or disarm a one-shot `timerfd`.
245    ///
246    /// Passing `None` disarms the timer. Zero durations are rounded up to one
247    /// nanosecond so the timer still expires.
248    ///
249    /// ### Errors
250    /// - `EBADF`: The file descriptor is invalid.
251    /// - `EINVAL`: The duration is invalid or not supported by the kernel.
252    pub fn set_timer_oneshot(&self, delay: Option<Duration>) -> Result<(), CoreError> {
253        let mut spec: libc::itimerspec = unsafe { std::mem::zeroed() };
254        if let Some(delay) = delay {
255            let delay = delay.max(Duration::from_nanos(1));
256            spec.it_value.tv_sec = delay.as_secs() as libc::time_t;
257            spec.it_value.tv_nsec = delay.subsec_nanos() as libc::c_long;
258        }
259
260        let ret = unsafe { libc::timerfd_settime(self.raw(), 0, &spec, std::ptr::null_mut()) };
261        syscall_ret(ret, "timerfd_settime")
262    }
263
264    /// Read bytes into a raw buffer.
265    ///
266    /// Internal callers must ensure `buf` points to a valid writable region of
267    /// at least `count` bytes.
268    ///
269    /// Returns `Ok(None)` if the operation would block (`EAGAIN`).
270    pub(crate) fn read_raw(&self, buf: *mut u8, count: usize) -> Result<Option<usize>, CoreError> {
271        loop {
272            let n = unsafe { libc::read(self.0, buf as *mut libc::c_void, count) };
273            if n < 0 {
274                let e = errno();
275                if e == libc::EINTR {
276                    continue;
277                }
278                if e == libc::EAGAIN || e == libc::EWOULDBLOCK {
279                    return Ok(None);
280                }
281                return Err(CoreError::sys(e, "read"));
282            }
283            return Ok(Some(n as usize));
284        }
285    }
286
287    /// Write bytes from a raw buffer.
288    ///
289    /// Internal callers must ensure `buf` points to a valid readable region of
290    /// at least `count` bytes.
291    ///
292    /// Returns `Ok(None)` if the operation would block (`EAGAIN`).
293    pub(crate) fn write_raw(
294        &self,
295        buf: *const u8,
296        count: usize,
297    ) -> Result<Option<usize>, CoreError> {
298        loop {
299            let n = unsafe { libc::write(self.0, buf as *const libc::c_void, count) };
300            if n < 0 {
301                let e = errno();
302                if e == libc::EINTR {
303                    continue;
304                }
305                if e == libc::EAGAIN || e == libc::EWOULDBLOCK {
306                    return Ok(None);
307                }
308                return Err(CoreError::sys(e, "write"));
309            }
310            return Ok(Some(n as usize));
311        }
312    }
313}
314
315impl Drop for Fd {
316    fn drop(&mut self) {
317        if self.0 >= 0 {
318            unsafe {
319                libc::close(self.0);
320            }
321        }
322    }
323}
324
/// Opaque identifier the reactor hands out for each registration.
///
/// The wrapped `u64` is what gets stored in the `epoll_event` user-data field
/// and is echoed back in every [`Event`] for that registration.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct Token(u64);

#[allow(dead_code)]
impl Token {
    /// Build a token from its raw numeric value.
    #[inline(always)]
    pub(crate) fn new(val: u64) -> Self {
        Token(val)
    }

    /// Return the raw numeric value of the token.
    #[inline(always)]
    pub(crate) fn val(&self) -> u64 {
        let Token(inner) = *self;
        inner
    }
}
341
/// A readiness event generated by the reactor.
///
/// Each value is decoded from one `epoll_event` returned by `epoll_wait`.
#[derive(Clone, Copy, Debug)]
pub struct Event {
    /// Token associated with the ready descriptor.
    pub token: Token,
    /// Descriptor is ready for reading (`EPOLLIN`).
    ///
    /// Also set by `Reactor::wait` whenever `EPOLLERR` is reported.
    pub readable: bool,
    /// Descriptor has priority data or an exceptional condition (`EPOLLPRI`).
    ///
    /// Also set by `Reactor::wait` whenever `EPOLLERR` is reported.
    pub priority: bool,
    /// Descriptor is ready for writing (`EPOLLOUT`).
    ///
    /// Also set by `Reactor::wait` whenever `EPOLLERR` is reported.
    pub writable: bool,
    /// Indicates an error condition (`EPOLLERR`).
    ///
    /// NOTE: For edge-triggered readiness, an error condition often means both
    /// readable and writable are set to ensure the handler drains the FD.
    pub error: bool,
    /// Indicates a remote hangup (`EPOLLHUP`).
    pub hangup: bool,
}

// Compile-time layout guards: the build fails if `Event` stops being exactly
// 16 bytes in size with 8-byte alignment.
const _: () = assert!(std::mem::size_of::<Event>() == 16);
const _: () = assert!(std::mem::align_of::<Event>() == 8);
364
/// A lightweight epoll reactor using edge-triggered monitoring (EPOLLET).
///
/// ### Edge-Triggered Contract
/// Because this reactor uses EPOLLET, all handlers MUST drain their respective
/// read or write sources until they receive an `EAGAIN` / `EWOULDBLOCK` error
/// (represented as `Ok(None)` in the `Fd` helpers).
///
/// Failure to drain a source will result in missing future readiness events
/// for that file descriptor until it is re-registered or another event occurs.
///
/// ### Fork Safety
/// The `Reactor` owns an `epoll` descriptor which is `O_CLOEXEC`. After an
/// `exec` call in a child process, the reactor and all its registrations are
/// lost. If the child continues without `exec`, it shares the same epoll
/// instance, which is generally unsafe and requires careful coordination.
///
/// # Example
/// ```no_run
/// # use coreshift_core::reactor::{Reactor, Fd, Event};
/// # fn example(fd: Fd) -> Result<(), Box<dyn std::error::Error>> {
/// let mut reactor = Reactor::new()?;
/// let token = reactor.add(&fd, true, false)?;
///
/// let mut events = Vec::new();
/// loop {
///     reactor.wait(&mut events, 64, -1)?;
///     for ev in &events {
///         if ev.token == token {
///             // Drain fd...
///         }
///     }
/// }
/// # Ok(())
/// # }
/// ```
pub struct Reactor {
    /// Raw epoll descriptor; closed when the reactor is dropped.
    epfd: RawFd,
    /// Value of the next token to hand out; starts at 1 and is incremented by
    /// each `add*` call.
    next_token: u64,
    /// Scratch buffer that `epoll_wait` fills before events are decoded.
    events_buf: Vec<libc::epoll_event>,
    /// The signalfd created by `setup_signalfd`, if any.
    signalfd: Option<Fd>,
    /// Signal mask saved by `setup_signalfd`; restored when the reactor is dropped.
    signalfd_previous_mask: Option<libc::sigset_t>,
    /// Token for the signalfd (if initialized).
    sigchld_token: Option<Token>,
    /// Token for the inotify fd (if initialized).
    inotify_token: Option<Token>,
}
411
412impl Reactor {
413    /// Create a new epoll reactor.
414    ///
415    /// ### Errors
416    /// - `EMFILE`: Process limit on open file descriptors hit.
417    /// - `ENFILE`: System-wide limit on open files hit.
418    /// - `ENOMEM`: Insufficient kernel memory.
419    pub fn new() -> Result<Self, CoreError> {
420        let epfd = unsafe { libc::epoll_create1(libc::EPOLL_CLOEXEC) };
421        syscall_ret(epfd, "epoll_create1")?;
422        Ok(Self {
423            epfd,
424            next_token: 1,
425            events_buf: Vec::with_capacity(64),
426            signalfd: None,
427            signalfd_previous_mask: None,
428            sigchld_token: None,
429            inotify_token: None,
430        })
431    }
432
433    /// Initialize inotify and add it to the reactor.
434    ///
435    /// ### Errors
436    /// - `EMFILE`: Process limit on open file descriptors hit.
437    /// - `ENFILE`: System-wide limit on open files hit.
438    /// - `ENOMEM`: Insufficient kernel memory.
439    /// - `EPERM`: Permission denied to create inotify instance.
440    pub fn setup_inotify(&mut self) -> Result<(Fd, Token), CoreError> {
441        let fd = unsafe { libc::inotify_init1(libc::IN_CLOEXEC | libc::IN_NONBLOCK) };
442        syscall_ret(fd, "inotify_init1")?;
443
444        let fd_obj = Fd::new(fd, "inotify")?;
445        let token = self.add(&fd_obj, true, false)?;
446        self.inotify_token = Some(token);
447
448        Ok((fd_obj, token))
449    }
450
    /// Initialize signalfd for SIGCHLD and add it to the reactor.
    ///
    /// `SIGCHLD` is added to the calling thread's blocked set, a non-blocking
    /// close-on-exec signalfd is created for it, and the descriptor is
    /// registered for read readiness. The reactor keeps ownership of the
    /// signalfd; only its token is returned.
    ///
    /// The previous current-thread signal mask is restored when the reactor is
    /// dropped. It is also restored immediately if creating or registering the
    /// signalfd fails.
    ///
    /// ### Errors
    /// - `EINVAL`: A signalfd has already been set up on this reactor.
    /// - `EMFILE`: Process limit on open file descriptors hit.
    /// - Any error returned by `pthread_sigmask`, `signalfd`, or the epoll
    ///   registration.
    pub fn setup_signalfd(&mut self) -> Result<Token, CoreError> {
        // Only one signalfd per reactor: a second call is rejected up front.
        if self.signalfd.is_some() {
            return Err(CoreError::sys(
                libc::EINVAL,
                "setup_signalfd already initialized",
            ));
        }

        // Build a signal set containing only SIGCHLD.
        let mut mask: libc::sigset_t = unsafe { std::mem::zeroed() };
        unsafe { libc::sigemptyset(&mut mask) };
        unsafe { libc::sigaddset(&mut mask, libc::SIGCHLD) };

        // Block SIGCHLD on this thread and capture the mask that was in effect
        // before, so it can be put back later. `pthread_sigmask` returns the
        // error number directly rather than through `errno`.
        let mut previous_mask: libc::sigset_t = unsafe { std::mem::zeroed() };
        let r = unsafe { libc::pthread_sigmask(libc::SIG_BLOCK, &mask, &mut previous_mask) };
        if r != 0 {
            return Err(CoreError::sys(r, "pthread_sigmask(SIG_BLOCK)"));
        }

        // `-1` asks the kernel for a brand-new signalfd.
        let sfd = unsafe { libc::signalfd(-1, &mask, libc::SFD_NONBLOCK | libc::SFD_CLOEXEC) };
        if let Err(err) = syscall_ret(sfd, "signalfd") {
            // Undo the mask change before reporting the failure.
            let _ = unsafe {
                libc::pthread_sigmask(libc::SIG_SETMASK, &previous_mask, std::ptr::null_mut())
            };
            return Err(err);
        }

        let fd = Fd::new(sfd, "signalfd")?;
        let token = match self.add(&fd, true, false) {
            Ok(token) => token,
            Err(err) => {
                // Registration failed: restore the mask. `fd` is dropped on
                // return, which closes the signalfd.
                let _ = unsafe {
                    libc::pthread_sigmask(libc::SIG_SETMASK, &previous_mask, std::ptr::null_mut())
                };
                return Err(err);
            }
        };

        // Commit state only once every step has succeeded.
        self.signalfd = Some(fd);
        self.signalfd_previous_mask = Some(previous_mask);
        self.sigchld_token = Some(token);

        Ok(token)
    }
503
504    /// Drain the internal signalfd buffer.
505    pub fn drain_signalfd(&self) -> Result<(), CoreError> {
506        if let Some(fd) = &self.signalfd {
507            let mut buf = [0u8; std::mem::size_of::<libc::signalfd_siginfo>()];
508            loop {
509                match fd.read_slice(&mut buf) {
510                    Ok(Some(n)) if n < buf.len() => break,
511                    Ok(Some(_)) => continue,
512                    Ok(None) => break,
513                    Err(e) => return Err(e),
514                }
515            }
516        }
517        Ok(())
518    }
519
520    /// Register a file descriptor with the reactor.
521    ///
522    /// This assigns a new unique token for the descriptor and enables
523    /// edge-triggered monitoring.
524    #[inline(always)]
525    pub fn add(&mut self, fd: &Fd, readable: bool, writable: bool) -> Result<Token, CoreError> {
526        let token = Token(self.next_token);
527        self.next_token += 1;
528        self.add_with_token(fd.raw(), token, readable, writable, false)?;
529        Ok(token)
530    }
531
532    /// Register a file descriptor for priority readiness (EPOLLPRI).
533    #[inline(always)]
534    pub fn add_priority(&mut self, fd: &Fd) -> Result<Token, CoreError> {
535        let token = Token(self.next_token);
536        self.next_token += 1;
537        self.add_with_token(fd.raw(), token, false, false, true)?;
538        Ok(token)
539    }
540
541    /// Register a file descriptor with custom epoll flags.
542    ///
543    /// This allows registration with flags like `EPOLLONESHOT` or explicit
544    /// control over `EPOLLET`.
545    ///
546    /// # Example
547    /// ```no_run
548    /// # use coreshift_core::reactor::{Reactor, Fd};
549    /// let mut reactor = Reactor::new().unwrap();
550    /// let fd = Fd::eventfd(0).unwrap();
551    /// reactor.add_with_flags(&fd, (libc::EPOLLIN | libc::EPOLLONESHOT) as u32).unwrap();
552    /// ```
553    #[inline(always)]
554    pub fn add_with_flags(&mut self, fd: &Fd, flags: u32) -> Result<Token, CoreError> {
555        let token = Token(self.next_token);
556        self.next_token += 1;
557        let mut ev = libc::epoll_event {
558            events: flags,
559            u64: token.0,
560        };
561        let r = unsafe { libc::epoll_ctl(self.epfd, libc::EPOLL_CTL_ADD, fd.raw(), &mut ev) };
562        syscall_ret(r, "epoll_ctl_add")?;
563        Ok(token)
564    }
565
566    #[inline(always)]
567    pub(crate) fn add_with_token(
568        &mut self,
569        raw_fd: RawFd,
570        token: Token,
571        readable: bool,
572        writable: bool,
573        priority: bool,
574    ) -> Result<(), CoreError> {
575        let mut events = libc::EPOLLET as u32;
576        if readable {
577            events |= libc::EPOLLIN as u32;
578        }
579        if writable {
580            events |= libc::EPOLLOUT as u32;
581        }
582        if priority {
583            events |= libc::EPOLLPRI as u32;
584        }
585        let mut ev = libc::epoll_event {
586            events,
587            u64: token.0,
588        };
589        let r = unsafe { libc::epoll_ctl(self.epfd, libc::EPOLL_CTL_ADD, raw_fd, &mut ev) };
590        syscall_ret(r, "epoll_ctl_add")?;
591        Ok(())
592    }
593
    /// Remove a file descriptor from the reactor.
    ///
    /// Forwards the descriptor's raw number to `del_raw`. The `Fd` itself is
    /// neither consumed nor closed.
    ///
    /// ### Errors
    /// Propagates any error from `epoll_ctl(EPOLL_CTL_DEL)`.
    #[inline(always)]
    pub fn del(&self, fd: &Fd) -> Result<(), CoreError> {
        self.del_raw(fd.raw())
    }
599
600    /// Remove a raw descriptor from the reactor.
601    ///
602    /// NOTE: This is an escape hatch for low-level interactions. Prefer using
603    /// [`del`](Self::del).
604    #[inline(always)]
605    pub(crate) fn del_raw(&self, raw: RawFd) -> Result<(), CoreError> {
606        loop {
607            let ret = unsafe {
608                libc::epoll_ctl(self.epfd, libc::EPOLL_CTL_DEL, raw, std::ptr::null_mut())
609            };
610            if ret == -1 {
611                let e = errno();
612                if e == libc::EINTR {
613                    continue;
614                }
615                return Err(CoreError::sys(e, "epoll_ctl_del"));
616            }
617            return Ok(());
618        }
619    }
620
621    /// Wait for events.
622    ///
623    /// This function blocks until at least one event is ready or the timeout
624    /// expires. Ready events are appended to the `buffer`.
625    ///
626    /// ### Timeout Contract
627    /// - `-1`: Block indefinitely until an event occurs or a signal interrupts.
628    /// - `0`: Return immediately, even if no events are ready.
629    /// - `> 0`: Wait for up to the specified number of milliseconds.
630    ///
631    /// Returns the number of events received.
632    #[inline(always)]
633    pub fn wait(
634        &mut self,
635        buffer: &mut Vec<Event>,
636        max_events: usize,
637        timeout: i32,
638    ) -> Result<usize, CoreError> {
639        buffer.clear();
640
641        if max_events == 0 {
642            return Ok(0);
643        }
644
645        // Ensure buffer has enough capacity
646        if buffer.capacity() < max_events {
647            buffer.reserve(max_events.saturating_sub(buffer.len()));
648        }
649
650        if self.events_buf.capacity() < max_events {
651            self.events_buf
652                .reserve(max_events.saturating_sub(self.events_buf.len()));
653        }
654
655        let n = unsafe {
656            libc::epoll_wait(
657                self.epfd,
658                self.events_buf.as_mut_ptr(),
659                max_events as i32,
660                timeout,
661            )
662        };
663
664        if n > 0 {
665            unsafe {
666                self.events_buf.set_len(n as usize);
667            }
668            for i in 0..n as usize {
669                let ev = self.events_buf[i];
670                let is_read = (ev.events & libc::EPOLLIN as u32) != 0;
671                let is_priority = (ev.events & libc::EPOLLPRI as u32) != 0;
672                let is_write = (ev.events & libc::EPOLLOUT as u32) != 0;
673                let is_err = (ev.events & libc::EPOLLERR as u32) != 0;
674                let is_hup = (ev.events & libc::EPOLLHUP as u32) != 0;
675
676                buffer.push(Event {
677                    token: Token(ev.u64),
678                    readable: is_read || is_err,
679                    priority: is_priority || is_err,
680                    writable: is_write || is_err,
681                    error: is_err,
682                    hangup: is_hup,
683                });
684            }
685            return Ok(n as usize);
686        }
687
688        if n < 0 {
689            let e = errno();
690            if e == libc::EINTR {
691                return Ok(0);
692            }
693            return Err(CoreError::sys(e, "epoll_wait"));
694        }
695        Ok(0)
696    }
697
    /// Return the raw epoll file descriptor.
    ///
    /// The reactor keeps ownership and closes the descriptor on drop, so the
    /// caller must not close it.
    ///
    /// NOTE: This is an escape hatch for low-level interactions.
    #[allow(dead_code)]
    pub(crate) fn fd(&self) -> RawFd {
        self.epfd
    }
705}
706
707impl Drop for Reactor {
708    fn drop(&mut self) {
709        if let Some(mask) = self.signalfd_previous_mask.take() {
710            let _ =
711                unsafe { libc::pthread_sigmask(libc::SIG_SETMASK, &mask, std::ptr::null_mut()) };
712        }
713        if self.epfd >= 0 {
714            unsafe {
715                libc::close(self.epfd);
716            }
717        }
718    }
719}