Skip to main content

coreshift_core/reactor/
mod.rs

1// This Source Code Form is subject to the terms of the Mozilla Public
2// License, v. 2.0. If a copy of the MPL was not distributed with this
3// file, You can obtain one at https://mozilla.org/MPL/2.0/
4
5//! Asynchronous event reactor.
6//!
7//! This module provides a lightweight wrapper around Linux `epoll` for
8//! multiplexing I/O events. It is optimized for edge-triggered monitoring.
9//! It is intentionally explicit about Linux readiness semantics rather than
10//! hiding them behind a higher-level async runtime abstraction.
11
12use crate::CoreError;
13use crate::error::syscall_ret;
14use std::io::Error as IoError;
15use std::time::Duration;
16
/// Fetch the raw code of the most recent OS error, as reported by
/// [`std::io::Error::last_os_error`].
///
/// Yields `0` when that error carries no raw OS code.
#[inline(always)]
fn errno() -> i32 {
    match IoError::last_os_error().raw_os_error() {
        Some(code) => code,
        None => 0,
    }
}
21
use std::os::unix::io::{AsRawFd, RawFd};

/// A file descriptor that this value owns and closes when dropped.
///
/// `Fd` cannot be copied or cloned. Building one from a raw descriptor hands
/// the responsibility for closing it to the `Fd`; the raw descriptor must not
/// be closed anywhere else afterwards.
///
/// ### Fork Safety
/// Descriptors created by Core normally carry `O_CLOEXEC`. A forked child
/// inherits them, but they are closed automatically on `exec`. A caller that
/// wants a descriptor to outlive `exec` has to clear the flag itself.
pub struct Fd(RawFd);

impl AsRawFd for Fd {
    /// Borrow the wrapped descriptor number; ownership stays with the `Fd`.
    fn as_raw_fd(&self) -> RawFd {
        let Fd(raw) = self;
        *raw
    }
}
41
42impl Fd {
43    /// Wrap a raw file descriptor.
44    ///
45    /// # Errors
46    /// Returns a [`CoreError`] if the descriptor is negative.
47    #[inline(always)]
48    pub(crate) fn new(fd: RawFd, op: &'static str) -> Result<Self, CoreError> {
49        if fd < 0 {
50            Err(CoreError::sys(errno(), op))
51        } else {
52            Ok(Self(fd))
53        }
54    }
55
56    /// Wrap an owned raw file descriptor.
57    ///
58    /// # Safety
59    /// The caller must guarantee `fd` is valid, open, and uniquely owned by the
60    /// returned `Fd`. Passing a borrowed fd, or closing `fd` after this call,
61    /// can cause double-close or use-after-close bugs.
62    #[inline(always)]
63    pub unsafe fn from_owned_raw_fd(fd: RawFd, op: &'static str) -> Result<Self, CoreError> {
64        Self::new(fd, op)
65    }
66
67    /// Create a non-blocking `eventfd` with `EFD_CLOEXEC`.
68    ///
69    /// The descriptor is created with `FD_CLOEXEC` set.
70    ///
71    /// ### Errors
72    /// - `EINVAL`: `init` is invalid.
73    /// - `EMFILE`: Process limit on open file descriptors hit.
74    /// - `ENFILE`: System-wide limit on open files hit.
75    pub fn eventfd(init: u32) -> Result<Self, CoreError> {
76        let fd = unsafe { libc::eventfd(init, libc::EFD_CLOEXEC | libc::EFD_NONBLOCK) };
77        syscall_ret(fd, "eventfd")?;
78        Self::new(fd, "eventfd")
79    }
80
81    /// Create a non-blocking `timerfd` using `CLOCK_MONOTONIC` with `TFD_CLOEXEC`.
82    ///
83    /// The descriptor is created with `FD_CLOEXEC` set.
84    ///
85    /// ### Errors
86    /// - `EMFILE`: Process limit on open file descriptors hit.
87    /// - `ENFILE`: System-wide limit on open files hit.
88    /// - `ENOMEM`: Insufficient kernel memory.
89    pub fn timerfd() -> Result<Self, CoreError> {
90        let fd = unsafe {
91            libc::timerfd_create(
92                libc::CLOCK_MONOTONIC,
93                libc::TFD_CLOEXEC | libc::TFD_NONBLOCK,
94            )
95        };
96        syscall_ret(fd, "timerfd_create")?;
97        Self::new(fd, "timerfd_create")
98    }
99
100    /// Access the underlying raw file descriptor.
101    ///
102    /// NOTE: This is an escape hatch for low-level interactions. Prefer using
103    /// the safe methods on `Fd` or implementing `AsRawFd`.
104    #[inline(always)]
105    pub(crate) fn raw(&self) -> RawFd {
106        self.0
107    }
108
109    /// Perform a `dup2` syscall.
110    ///
111    /// ### Errors
112    /// - `EBADF`: The source or target file descriptor is invalid.
113    /// - `EMFILE`: The target descriptor exceeds the process limit.
114    pub fn dup2(&self, target: RawFd) -> Result<(), CoreError> {
115        loop {
116            let r = unsafe { libc::dup2(self.0, target) };
117            if r < 0 {
118                let e = errno();
119                if e == libc::EINTR {
120                    continue;
121                }
122                return syscall_ret(r, "dup2");
123            }
124            return Ok(());
125        }
126    }
127
128    /// Set the `O_NONBLOCK` flag on the descriptor.
129    ///
130    /// ### Errors
131    /// - `EBADF`: The file descriptor is invalid.
132    pub fn set_nonblock(&self) -> Result<(), CoreError> {
133        let flags = unsafe { libc::fcntl(self.0, libc::F_GETFL) };
134        syscall_ret(flags, "fcntl(F_GETFL)")?;
135        let r = unsafe { libc::fcntl(self.0, libc::F_SETFL, flags | libc::O_NONBLOCK) };
136        syscall_ret(r, "fcntl(F_SETFL)")
137    }
138
139    /// Set the `FD_CLOEXEC` flag on the descriptor.
140    ///
141    /// ### Errors
142    /// - `EBADF`: The file descriptor is invalid.
143    pub fn set_cloexec(&self) -> Result<(), CoreError> {
144        let flags = unsafe { libc::fcntl(self.0, libc::F_GETFD) };
145        syscall_ret(flags, "fcntl(F_GETFD)")?;
146        let r = unsafe { libc::fcntl(self.0, libc::F_SETFD, flags | libc::FD_CLOEXEC) };
147        syscall_ret(r, "fcntl(F_SETFD)")
148    }
149
150    /// Read bytes into a mutable slice.
151    ///
152    /// Returns `Ok(None)` if the operation would block (`EAGAIN`).
153    ///
154    /// ### Edge Cases
155    /// - **Zero-length read**: Returns `Ok(Some(0))` immediately.
156    /// - **Partial read**: Returns the number of bytes actually read.
157    ///
158    /// ### Errors
159    /// - `EBADF`: The file descriptor is invalid or not open for reading.
160    /// - `EFAULT`: `buf` points outside the process's address space.
161    /// - `EIO`: Low-level I/O error.
162    pub fn read_slice(&self, buf: &mut [u8]) -> Result<Option<usize>, CoreError> {
163        self.read_raw(buf.as_mut_ptr(), buf.len())
164    }
165
166    /// Seek to an absolute file offset.
167    ///
168    /// ### Errors
169    /// - `EBADF`: The file descriptor is not seekable.
170    /// - `EINVAL`: `offset` is invalid.
171    /// - `EOVERFLOW`: The resulting offset exceeds the off_t range.
172    pub fn seek_set(&self, offset: i64) -> Result<u64, CoreError> {
173        loop {
174            let pos = unsafe { libc::lseek(self.0, offset as libc::off_t, libc::SEEK_SET) };
175            if pos < 0 {
176                let e = errno();
177                if e == libc::EINTR {
178                    continue;
179                }
180                return Err(CoreError::sys(e, "lseek"));
181            }
182            return Ok(pos as u64);
183        }
184    }
185
186    /// Write bytes from a slice.
187    ///
188    /// Returns `Ok(None)` if the operation would block (`EAGAIN`).
189    ///
190    /// ### Edge Cases
191    /// - **Zero-length write**: Returns `Ok(Some(0))` immediately.
192    /// - **Partial write**: Returns the number of bytes actually written.
193    ///
194    /// ### Errors
195    /// - `EBADF`: The file descriptor is invalid or not open for writing.
196    /// - `EFAULT`: `buf` points outside the process's address space.
197    /// - `EPIPE`: The reading end of a pipe or socket was closed.
198    pub fn write_slice(&self, buf: &[u8]) -> Result<Option<usize>, CoreError> {
199        self.write_raw(buf.as_ptr(), buf.len())
200    }
201
202    /// Read a native-endian `u64`.
203    ///
204    /// Returns `Ok(None)` if the operation would block (`EAGAIN`).
205    pub fn read_u64(&self) -> Result<Option<u64>, CoreError> {
206        let mut bytes = [0u8; std::mem::size_of::<u64>()];
207        match self.read_slice(&mut bytes)? {
208            Some(n) if n == bytes.len() => Ok(Some(u64::from_ne_bytes(bytes))),
209            Some(_) => Err(CoreError::sys(libc::EIO, "read_u64")),
210            None => Ok(None),
211        }
212    }
213
214    /// Write a native-endian `u64`.
215    ///
216    /// Returns `Ok(None)` if the operation would block (`EAGAIN`).
217    pub fn write_u64(&self, value: u64) -> Result<Option<usize>, CoreError> {
218        self.write_slice(&value.to_ne_bytes())
219    }
220
221    /// Arm or disarm a one-shot `timerfd`.
222    ///
223    /// Passing `None` disarms the timer. Zero durations are rounded up to one
224    /// nanosecond so the timer still expires.
225    ///
226    /// ### Errors
227    /// - `EBADF`: The file descriptor is invalid.
228    /// - `EINVAL`: The duration is invalid or not supported by the kernel.
229    pub fn set_timer_oneshot(&self, delay: Option<Duration>) -> Result<(), CoreError> {
230        let mut spec: libc::itimerspec = unsafe { std::mem::zeroed() };
231        if let Some(delay) = delay {
232            let delay = delay.max(Duration::from_nanos(1));
233            spec.it_value.tv_sec = delay.as_secs() as libc::time_t;
234            spec.it_value.tv_nsec = delay.subsec_nanos() as libc::c_long;
235        }
236
237        let ret = unsafe { libc::timerfd_settime(self.raw(), 0, &spec, std::ptr::null_mut()) };
238        syscall_ret(ret, "timerfd_settime")
239    }
240
241    /// Read bytes into a raw buffer.
242    ///
243    /// Internal callers must ensure `buf` points to a valid writable region of
244    /// at least `count` bytes.
245    ///
246    /// Returns `Ok(None)` if the operation would block (`EAGAIN`).
247    pub(crate) fn read_raw(&self, buf: *mut u8, count: usize) -> Result<Option<usize>, CoreError> {
248        loop {
249            let n = unsafe { libc::read(self.0, buf as *mut libc::c_void, count) };
250            if n < 0 {
251                let e = errno();
252                if e == libc::EINTR {
253                    continue;
254                }
255                if e == libc::EAGAIN || e == libc::EWOULDBLOCK {
256                    return Ok(None);
257                }
258                return Err(CoreError::sys(e, "read"));
259            }
260            return Ok(Some(n as usize));
261        }
262    }
263
264    /// Write bytes from a raw buffer.
265    ///
266    /// Internal callers must ensure `buf` points to a valid readable region of
267    /// at least `count` bytes.
268    ///
269    /// Returns `Ok(None)` if the operation would block (`EAGAIN`).
270    pub(crate) fn write_raw(
271        &self,
272        buf: *const u8,
273        count: usize,
274    ) -> Result<Option<usize>, CoreError> {
275        loop {
276            let n = unsafe { libc::write(self.0, buf as *const libc::c_void, count) };
277            if n < 0 {
278                let e = errno();
279                if e == libc::EINTR {
280                    continue;
281                }
282                if e == libc::EAGAIN || e == libc::EWOULDBLOCK {
283                    return Ok(None);
284                }
285                return Err(CoreError::sys(e, "write"));
286            }
287            return Ok(Some(n as usize));
288        }
289    }
290}
291
292impl Drop for Fd {
293    fn drop(&mut self) {
294        if self.0 >= 0 {
295            unsafe {
296                libc::close(self.0);
297            }
298        }
299    }
300}
301
/// Opaque identifier the reactor hands out for each registered descriptor.
///
/// The wrapped value is what gets stored in the epoll event's `u64` field and
/// comes back in [`Event::token`].
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct Token(u64);

#[allow(dead_code)]
impl Token {
    /// Build a token from its raw numeric value.
    #[inline(always)]
    pub(crate) fn new(val: u64) -> Self {
        Token(val)
    }

    /// Return the raw numeric value carried by this token.
    #[inline(always)]
    pub(crate) fn val(&self) -> u64 {
        let Token(inner) = *self;
        inner
    }
}
318
319/// A readiness event generated by the reactor.
320#[derive(Clone, Copy, Debug)]
321pub struct Event {
322    /// Token associated with the ready descriptor.
323    pub token: Token,
324    /// Descriptor is ready for reading (`EPOLLIN`).
325    pub readable: bool,
326    /// Descriptor has priority data or an exceptional condition (`EPOLLPRI`).
327    pub priority: bool,
328    /// Descriptor is ready for writing (`EPOLLOUT`).
329    pub writable: bool,
330    /// Indicates an error condition (`EPOLLERR`).
331    ///
332    /// NOTE: For edge-triggered readiness, an error condition often means both
333    /// readable and writable are set to ensure the handler drains the FD.
334    pub error: bool,
335    /// Indicates a remote hangup (`EPOLLHUP`).
336    pub hangup: bool,
337}
338
339const _: () = assert!(std::mem::size_of::<Event>() == 16);
340const _: () = assert!(std::mem::align_of::<Event>() == 8);
341
342/// A lightweight epoll reactor using edge-triggered monitoring (EPOLLET).
343///
344/// ### Edge-Triggered Contract
345/// Because this reactor uses EPOLLET, all handlers MUST drain their respective
346/// read or write sources until they receive an `EAGAIN` / `EWOULDBLOCK` error
347/// (represented as `Ok(None)` in the `Fd` helpers).
348///
349/// Failure to drain a source will result in missing future readiness events
350/// for that file descriptor until it is re-registered or another event occurs.
351///
352/// ### Fork Safety
353/// The `Reactor` owns an `epoll` descriptor which is `O_CLOEXEC`. After an
354/// `exec` call in a child process, the reactor and all its registrations are
355/// lost. If the child continues without `exec`, it shares the same epoll
356/// instance, which is generally unsafe and requires careful coordination.
357///
358/// # Example
359/// ```no_run
360/// # use coreshift_core::reactor::{Reactor, Fd, Event};
361/// # fn example(fd: Fd) -> Result<(), Box<dyn std::error::Error>> {
362/// let mut reactor = Reactor::new()?;
363/// let token = reactor.add(&fd, true, false)?;
364///
365/// let mut events = Vec::new();
366/// loop {
367///     reactor.wait(&mut events, 64, -1)?;
368///     for ev in &events {
369///         if ev.token == token {
370///             // Drain fd...
371///         }
372///     }
373/// }
374/// # Ok(())
375/// # }
376/// ```
377pub struct Reactor {
378    epfd: RawFd,
379    next_token: u64,
380    events_buf: Vec<libc::epoll_event>,
381    signalfd: Option<Fd>,
382    signalfd_previous_mask: Option<libc::sigset_t>,
383    /// Token for the signalfd (if initialized).
384    sigchld_token: Option<Token>,
385    /// Token for the inotify fd (if initialized).
386    inotify_token: Option<Token>,
387}
388
389impl Reactor {
390    /// Create a new epoll reactor.
391    ///
392    /// ### Errors
393    /// - `EMFILE`: Process limit on open file descriptors hit.
394    /// - `ENFILE`: System-wide limit on open files hit.
395    /// - `ENOMEM`: Insufficient kernel memory.
396    pub fn new() -> Result<Self, CoreError> {
397        let epfd = unsafe { libc::epoll_create1(libc::EPOLL_CLOEXEC) };
398        syscall_ret(epfd, "epoll_create1")?;
399        Ok(Self {
400            epfd,
401            next_token: 1,
402            events_buf: Vec::with_capacity(64),
403            signalfd: None,
404            signalfd_previous_mask: None,
405            sigchld_token: None,
406            inotify_token: None,
407        })
408    }
409
410    /// Initialize inotify and add it to the reactor.
411    ///
412    /// ### Errors
413    /// - `EMFILE`: Process limit on open file descriptors hit.
414    /// - `ENFILE`: System-wide limit on open files hit.
415    /// - `ENOMEM`: Insufficient kernel memory.
416    /// - `EPERM`: Permission denied to create inotify instance.
417    pub fn setup_inotify(&mut self) -> Result<(Fd, Token), CoreError> {
418        let fd = unsafe { libc::inotify_init1(libc::IN_CLOEXEC | libc::IN_NONBLOCK) };
419        syscall_ret(fd, "inotify_init1")?;
420
421        let fd_obj = Fd::new(fd, "inotify")?;
422        let token = self.add(&fd_obj, true, false)?;
423        self.inotify_token = Some(token);
424
425        Ok((fd_obj, token))
426    }
427
428    /// Initialize signalfd for SIGCHLD and add it to the reactor.
429    ///
430    /// The previous current-thread signal mask is restored when the reactor is
431    /// dropped.
432    ///
433    /// ### Errors
434    /// - `EBADF`: The provided file descriptor is invalid.
435    /// - `EINVAL`: Signal mask is invalid or already set up.
436    /// - `EMFILE`: Process limit on open file descriptors hit.
437    pub fn setup_signalfd(&mut self) -> Result<Token, CoreError> {
438        if self.signalfd.is_some() {
439            return Err(CoreError::sys(
440                libc::EINVAL,
441                "setup_signalfd already initialized",
442            ));
443        }
444
445        let mut mask: libc::sigset_t = unsafe { std::mem::zeroed() };
446        unsafe { libc::sigemptyset(&mut mask) };
447        unsafe { libc::sigaddset(&mut mask, libc::SIGCHLD) };
448
449        let mut previous_mask: libc::sigset_t = unsafe { std::mem::zeroed() };
450        let r = unsafe { libc::pthread_sigmask(libc::SIG_BLOCK, &mask, &mut previous_mask) };
451        if r != 0 {
452            return Err(CoreError::sys(r, "pthread_sigmask(SIG_BLOCK)"));
453        }
454
455        let sfd = unsafe { libc::signalfd(-1, &mask, libc::SFD_NONBLOCK | libc::SFD_CLOEXEC) };
456        if let Err(err) = syscall_ret(sfd, "signalfd") {
457            let _ = unsafe {
458                libc::pthread_sigmask(libc::SIG_SETMASK, &previous_mask, std::ptr::null_mut())
459            };
460            return Err(err);
461        }
462
463        let fd = Fd::new(sfd, "signalfd")?;
464        let token = match self.add(&fd, true, false) {
465            Ok(token) => token,
466            Err(err) => {
467                let _ = unsafe {
468                    libc::pthread_sigmask(libc::SIG_SETMASK, &previous_mask, std::ptr::null_mut())
469                };
470                return Err(err);
471            }
472        };
473
474        self.signalfd = Some(fd);
475        self.signalfd_previous_mask = Some(previous_mask);
476        self.sigchld_token = Some(token);
477
478        Ok(token)
479    }
480
481    /// Drain the internal signalfd buffer.
482    pub fn drain_signalfd(&self) -> Result<(), CoreError> {
483        if let Some(fd) = &self.signalfd {
484            let mut buf = [0u8; std::mem::size_of::<libc::signalfd_siginfo>()];
485            loop {
486                match fd.read_slice(&mut buf) {
487                    Ok(Some(n)) if n < buf.len() => break,
488                    Ok(Some(_)) => continue,
489                    Ok(None) => break,
490                    Err(e) => return Err(e),
491                }
492            }
493        }
494        Ok(())
495    }
496
497    /// Register a file descriptor with the reactor.
498    ///
499    /// This assigns a new unique token for the descriptor and enables
500    /// edge-triggered monitoring.
501    #[inline(always)]
502    pub fn add(&mut self, fd: &Fd, readable: bool, writable: bool) -> Result<Token, CoreError> {
503        let token = Token(self.next_token);
504        self.next_token += 1;
505        self.add_with_token(fd.raw(), token, readable, writable, false)?;
506        Ok(token)
507    }
508
509    /// Register a file descriptor for priority readiness (EPOLLPRI).
510    #[inline(always)]
511    pub fn add_priority(&mut self, fd: &Fd) -> Result<Token, CoreError> {
512        let token = Token(self.next_token);
513        self.next_token += 1;
514        self.add_with_token(fd.raw(), token, false, false, true)?;
515        Ok(token)
516    }
517
518    /// Register a file descriptor with custom epoll flags.
519    ///
520    /// This allows registration with flags like `EPOLLONESHOT` or explicit
521    /// control over `EPOLLET`.
522    ///
523    /// # Example
524    /// ```no_run
525    /// # use coreshift_core::reactor::{Reactor, Fd};
526    /// let mut reactor = Reactor::new().unwrap();
527    /// let fd = Fd::eventfd(0).unwrap();
528    /// reactor.add_with_flags(&fd, (libc::EPOLLIN | libc::EPOLLONESHOT) as u32).unwrap();
529    /// ```
530    #[inline(always)]
531    pub fn add_with_flags(&mut self, fd: &Fd, flags: u32) -> Result<Token, CoreError> {
532        let token = Token(self.next_token);
533        self.next_token += 1;
534        let mut ev = libc::epoll_event {
535            events: flags,
536            u64: token.0,
537        };
538        let r = unsafe { libc::epoll_ctl(self.epfd, libc::EPOLL_CTL_ADD, fd.raw(), &mut ev) };
539        syscall_ret(r, "epoll_ctl_add")?;
540        Ok(token)
541    }
542
543    #[inline(always)]
544    pub(crate) fn add_with_token(
545        &mut self,
546        raw_fd: RawFd,
547        token: Token,
548        readable: bool,
549        writable: bool,
550        priority: bool,
551    ) -> Result<(), CoreError> {
552        let mut events = libc::EPOLLET as u32;
553        if readable {
554            events |= libc::EPOLLIN as u32;
555        }
556        if writable {
557            events |= libc::EPOLLOUT as u32;
558        }
559        if priority {
560            events |= libc::EPOLLPRI as u32;
561        }
562        let mut ev = libc::epoll_event {
563            events,
564            u64: token.0,
565        };
566        let r = unsafe { libc::epoll_ctl(self.epfd, libc::EPOLL_CTL_ADD, raw_fd, &mut ev) };
567        syscall_ret(r, "epoll_ctl_add")?;
568        Ok(())
569    }
570
571    /// Remove a file descriptor from the reactor.
572    #[inline(always)]
573    pub fn del(&self, fd: &Fd) -> Result<(), CoreError> {
574        self.del_raw(fd.raw())
575    }
576
577    /// Remove a raw descriptor from the reactor.
578    ///
579    /// NOTE: This is an escape hatch for low-level interactions. Prefer using
580    /// [`del`](Self::del).
581    #[inline(always)]
582    pub(crate) fn del_raw(&self, raw: RawFd) -> Result<(), CoreError> {
583        loop {
584            let ret = unsafe {
585                libc::epoll_ctl(self.epfd, libc::EPOLL_CTL_DEL, raw, std::ptr::null_mut())
586            };
587            if ret == -1 {
588                let e = errno();
589                if e == libc::EINTR {
590                    continue;
591                }
592                return Err(CoreError::sys(e, "epoll_ctl_del"));
593            }
594            return Ok(());
595        }
596    }
597
598    /// Wait for events.
599    ///
600    /// This function blocks until at least one event is ready or the timeout
601    /// expires. Ready events are appended to the `buffer`.
602    ///
603    /// ### Timeout Contract
604    /// - `-1`: Block indefinitely until an event occurs or a signal interrupts.
605    /// - `0`: Return immediately, even if no events are ready.
606    /// - `> 0`: Wait for up to the specified number of milliseconds.
607    ///
608    /// Returns the number of events received.
609    #[inline(always)]
610    pub fn wait(
611        &mut self,
612        buffer: &mut Vec<Event>,
613        max_events: usize,
614        timeout: i32,
615    ) -> Result<usize, CoreError> {
616        buffer.clear();
617
618        if max_events == 0 {
619            return Ok(0);
620        }
621
622        // Ensure buffer has enough capacity
623        if buffer.capacity() < max_events {
624            buffer.reserve(max_events.saturating_sub(buffer.len()));
625        }
626
627        if self.events_buf.capacity() < max_events {
628            self.events_buf
629                .reserve(max_events.saturating_sub(self.events_buf.len()));
630        }
631
632        let n = unsafe {
633            libc::epoll_wait(
634                self.epfd,
635                self.events_buf.as_mut_ptr(),
636                max_events as i32,
637                timeout,
638            )
639        };
640
641        if n > 0 {
642            unsafe {
643                self.events_buf.set_len(n as usize);
644            }
645            for i in 0..n as usize {
646                let ev = self.events_buf[i];
647                let is_read = (ev.events & libc::EPOLLIN as u32) != 0;
648                let is_priority = (ev.events & libc::EPOLLPRI as u32) != 0;
649                let is_write = (ev.events & libc::EPOLLOUT as u32) != 0;
650                let is_err = (ev.events & libc::EPOLLERR as u32) != 0;
651                let is_hup = (ev.events & libc::EPOLLHUP as u32) != 0;
652
653                buffer.push(Event {
654                    token: Token(ev.u64),
655                    readable: is_read || is_err,
656                    priority: is_priority || is_err,
657                    writable: is_write || is_err,
658                    error: is_err,
659                    hangup: is_hup,
660                });
661            }
662            return Ok(n as usize);
663        }
664
665        if n < 0 {
666            let e = errno();
667            if e == libc::EINTR {
668                return Ok(0);
669            }
670            return Err(CoreError::sys(e, "epoll_wait"));
671        }
672        Ok(0)
673    }
674
675    /// Return the raw epoll file descriptor.
676    ///
677    /// NOTE: This is an escape hatch for low-level interactions.
678    #[allow(dead_code)]
679    pub(crate) fn fd(&self) -> RawFd {
680        self.epfd
681    }
682}
683
684impl Drop for Reactor {
685    fn drop(&mut self) {
686        if let Some(mask) = self.signalfd_previous_mask.take() {
687            let _ =
688                unsafe { libc::pthread_sigmask(libc::SIG_SETMASK, &mask, std::ptr::null_mut()) };
689        }
690        if self.epfd >= 0 {
691            unsafe {
692                libc::close(self.epfd);
693            }
694        }
695    }
696}