coreshift_core/reactor/mod.rs
1// This Source Code Form is subject to the terms of the Mozilla Public
2// License, v. 2.0. If a copy of the MPL was not distributed with this
3// file, You can obtain one at https://mozilla.org/MPL/2.0/
4
5//! Asynchronous event reactor.
6//!
7//! This module provides a lightweight wrapper around Linux `epoll` for
8//! multiplexing I/O events. It is optimized for edge-triggered monitoring.
9//! It is intentionally explicit about Linux readiness semantics rather than
10//! hiding them behind a higher-level async runtime abstraction.
11
12use crate::CoreError;
13use crate::error::syscall_ret;
14use std::io::Error as IoError;
15use std::time::Duration;
16
17#[inline(always)]
18fn errno() -> i32 {
19 IoError::last_os_error().raw_os_error().unwrap_or(0)
20}
21
22/// An owned file descriptor that closes on drop.
23///
24/// `Fd` is move-only. Constructing one from a raw descriptor transfers close
25/// ownership to `Fd`; do not also close the raw descriptor elsewhere.
26///
27/// ### Fork Safety
28/// `Fd` instances created by Core usually have `O_CLOEXEC` set. If the process
29/// forks, the descriptor will be inherited by the child but will be closed
30/// automatically upon `exec`. Callers that need a descriptor to survive `exec`
31/// must clear the flag manually.
32pub struct Fd(RawFd);
33
34use std::os::unix::io::{AsRawFd, RawFd};
35
36impl AsRawFd for Fd {
37 fn as_raw_fd(&self) -> RawFd {
38 self.0
39 }
40}
41
42impl Fd {
43 /// Wrap a raw file descriptor.
44 ///
45 /// # Errors
46 /// Returns a [`CoreError`] if the descriptor is negative.
47 #[inline(always)]
48 pub(crate) fn new(fd: RawFd, op: &'static str) -> Result<Self, CoreError> {
49 if fd < 0 {
50 Err(CoreError::sys(errno(), op))
51 } else {
52 Ok(Self(fd))
53 }
54 }
55
56 /// Wrap an owned raw file descriptor.
57 ///
58 /// # Safety
59 /// The caller must guarantee `fd` is valid, open, and uniquely owned by the
60 /// returned `Fd`. Passing a borrowed fd, or closing `fd` after this call,
61 /// can cause double-close or use-after-close bugs.
62 #[inline(always)]
63 pub unsafe fn from_owned_raw_fd(fd: RawFd, op: &'static str) -> Result<Self, CoreError> {
64 Self::new(fd, op)
65 }
66
67 /// Create a non-blocking `eventfd` with `EFD_CLOEXEC`.
68 ///
69 /// The descriptor is created with `FD_CLOEXEC` set.
70 ///
71 /// ### Errors
72 /// - `EINVAL`: `init` is invalid.
73 /// - `EMFILE`: Process limit on open file descriptors hit.
74 /// - `ENFILE`: System-wide limit on open files hit.
75 pub fn eventfd(init: u32) -> Result<Self, CoreError> {
76 let fd = unsafe { libc::eventfd(init, libc::EFD_CLOEXEC | libc::EFD_NONBLOCK) };
77 syscall_ret(fd, "eventfd")?;
78 Self::new(fd, "eventfd")
79 }
80
81 /// Create a non-blocking `timerfd` using `CLOCK_MONOTONIC` with `TFD_CLOEXEC`.
82 ///
83 /// The descriptor is created with `FD_CLOEXEC` set.
84 ///
85 /// ### Errors
86 /// - `EMFILE`: Process limit on open file descriptors hit.
87 /// - `ENFILE`: System-wide limit on open files hit.
88 /// - `ENOMEM`: Insufficient kernel memory.
89 pub fn timerfd() -> Result<Self, CoreError> {
90 let fd = unsafe {
91 libc::timerfd_create(
92 libc::CLOCK_MONOTONIC,
93 libc::TFD_CLOEXEC | libc::TFD_NONBLOCK,
94 )
95 };
96 syscall_ret(fd, "timerfd_create")?;
97 Self::new(fd, "timerfd_create")
98 }
99
100 /// Access the underlying raw file descriptor.
101 ///
102 /// NOTE: This is an escape hatch for low-level interactions. Prefer using
103 /// the safe methods on `Fd` or implementing `AsRawFd`.
104 #[inline(always)]
105 pub(crate) fn raw(&self) -> RawFd {
106 self.0
107 }
108
109 /// Perform a `dup2` syscall.
110 ///
111 /// ### Errors
112 /// - `EBADF`: The source or target file descriptor is invalid.
113 /// - `EMFILE`: The target descriptor exceeds the process limit.
114 pub fn dup2(&self, target: RawFd) -> Result<(), CoreError> {
115 loop {
116 let r = unsafe { libc::dup2(self.0, target) };
117 if r < 0 {
118 let e = errno();
119 if e == libc::EINTR {
120 continue;
121 }
122 return syscall_ret(r, "dup2");
123 }
124 return Ok(());
125 }
126 }
127
128 /// Set the `O_NONBLOCK` flag on the descriptor.
129 ///
130 /// ### Errors
131 /// - `EBADF`: The file descriptor is invalid.
132 pub fn set_nonblock(&self) -> Result<(), CoreError> {
133 let flags = unsafe { libc::fcntl(self.0, libc::F_GETFL) };
134 syscall_ret(flags, "fcntl(F_GETFL)")?;
135 let r = unsafe { libc::fcntl(self.0, libc::F_SETFL, flags | libc::O_NONBLOCK) };
136 syscall_ret(r, "fcntl(F_SETFL)")
137 }
138
139 /// Set the `FD_CLOEXEC` flag on the descriptor.
140 ///
141 /// ### Errors
142 /// - `EBADF`: The file descriptor is invalid.
143 pub fn set_cloexec(&self) -> Result<(), CoreError> {
144 let flags = unsafe { libc::fcntl(self.0, libc::F_GETFD) };
145 syscall_ret(flags, "fcntl(F_GETFD)")?;
146 let r = unsafe { libc::fcntl(self.0, libc::F_SETFD, flags | libc::FD_CLOEXEC) };
147 syscall_ret(r, "fcntl(F_SETFD)")
148 }
149
150 /// Read bytes into a mutable slice.
151 ///
152 /// Returns `Ok(None)` if the operation would block (`EAGAIN`).
153 ///
154 /// ### Edge Cases
155 /// - **Zero-length read**: Returns `Ok(Some(0))` immediately.
156 /// - **Partial read**: Returns the number of bytes actually read.
157 ///
158 /// ### Errors
159 /// - `EBADF`: The file descriptor is invalid or not open for reading.
160 /// - `EFAULT`: `buf` points outside the process's address space.
161 /// - `EIO`: Low-level I/O error.
162 pub fn read_slice(&self, buf: &mut [u8]) -> Result<Option<usize>, CoreError> {
163 self.read_raw(buf.as_mut_ptr(), buf.len())
164 }
165
166 /// Seek to an absolute file offset.
167 ///
168 /// ### Errors
169 /// - `EBADF`: The file descriptor is not seekable.
170 /// - `EINVAL`: `offset` is invalid.
171 /// - `EOVERFLOW`: The resulting offset exceeds the off_t range.
172 pub fn seek_set(&self, offset: i64) -> Result<u64, CoreError> {
173 loop {
174 let pos = unsafe { libc::lseek(self.0, offset as libc::off_t, libc::SEEK_SET) };
175 if pos < 0 {
176 let e = errno();
177 if e == libc::EINTR {
178 continue;
179 }
180 return Err(CoreError::sys(e, "lseek"));
181 }
182 return Ok(pos as u64);
183 }
184 }
185
186 /// Write bytes from a slice.
187 ///
188 /// Returns `Ok(None)` if the operation would block (`EAGAIN`).
189 ///
190 /// ### Edge Cases
191 /// - **Zero-length write**: Returns `Ok(Some(0))` immediately.
192 /// - **Partial write**: Returns the number of bytes actually written.
193 ///
194 /// ### Errors
195 /// - `EBADF`: The file descriptor is invalid or not open for writing.
196 /// - `EFAULT`: `buf` points outside the process's address space.
197 /// - `EPIPE`: The reading end of a pipe or socket was closed.
198 pub fn write_slice(&self, buf: &[u8]) -> Result<Option<usize>, CoreError> {
199 self.write_raw(buf.as_ptr(), buf.len())
200 }
201
202 /// Read a native-endian `u64`.
203 ///
204 /// Returns `Ok(None)` if the operation would block (`EAGAIN`).
205 pub fn read_u64(&self) -> Result<Option<u64>, CoreError> {
206 let mut bytes = [0u8; std::mem::size_of::<u64>()];
207 match self.read_slice(&mut bytes)? {
208 Some(n) if n == bytes.len() => Ok(Some(u64::from_ne_bytes(bytes))),
209 Some(_) => Err(CoreError::sys(libc::EIO, "read_u64")),
210 None => Ok(None),
211 }
212 }
213
214 /// Write a native-endian `u64`.
215 ///
216 /// Returns `Ok(None)` if the operation would block (`EAGAIN`).
217 pub fn write_u64(&self, value: u64) -> Result<Option<usize>, CoreError> {
218 self.write_slice(&value.to_ne_bytes())
219 }
220
221 /// Arm or disarm a one-shot `timerfd`.
222 ///
223 /// Passing `None` disarms the timer. Zero durations are rounded up to one
224 /// nanosecond so the timer still expires.
225 ///
226 /// ### Errors
227 /// - `EBADF`: The file descriptor is invalid.
228 /// - `EINVAL`: The duration is invalid or not supported by the kernel.
229 pub fn set_timer_oneshot(&self, delay: Option<Duration>) -> Result<(), CoreError> {
230 let mut spec: libc::itimerspec = unsafe { std::mem::zeroed() };
231 if let Some(delay) = delay {
232 let delay = delay.max(Duration::from_nanos(1));
233 spec.it_value.tv_sec = delay.as_secs() as libc::time_t;
234 spec.it_value.tv_nsec = delay.subsec_nanos() as libc::c_long;
235 }
236
237 let ret = unsafe { libc::timerfd_settime(self.raw(), 0, &spec, std::ptr::null_mut()) };
238 syscall_ret(ret, "timerfd_settime")
239 }
240
241 /// Read bytes into a raw buffer.
242 ///
243 /// Internal callers must ensure `buf` points to a valid writable region of
244 /// at least `count` bytes.
245 ///
246 /// Returns `Ok(None)` if the operation would block (`EAGAIN`).
247 pub(crate) fn read_raw(&self, buf: *mut u8, count: usize) -> Result<Option<usize>, CoreError> {
248 loop {
249 let n = unsafe { libc::read(self.0, buf as *mut libc::c_void, count) };
250 if n < 0 {
251 let e = errno();
252 if e == libc::EINTR {
253 continue;
254 }
255 if e == libc::EAGAIN || e == libc::EWOULDBLOCK {
256 return Ok(None);
257 }
258 return Err(CoreError::sys(e, "read"));
259 }
260 return Ok(Some(n as usize));
261 }
262 }
263
264 /// Write bytes from a raw buffer.
265 ///
266 /// Internal callers must ensure `buf` points to a valid readable region of
267 /// at least `count` bytes.
268 ///
269 /// Returns `Ok(None)` if the operation would block (`EAGAIN`).
270 pub(crate) fn write_raw(
271 &self,
272 buf: *const u8,
273 count: usize,
274 ) -> Result<Option<usize>, CoreError> {
275 loop {
276 let n = unsafe { libc::write(self.0, buf as *const libc::c_void, count) };
277 if n < 0 {
278 let e = errno();
279 if e == libc::EINTR {
280 continue;
281 }
282 if e == libc::EAGAIN || e == libc::EWOULDBLOCK {
283 return Ok(None);
284 }
285 return Err(CoreError::sys(e, "write"));
286 }
287 return Ok(Some(n as usize));
288 }
289 }
290}
291
292impl Drop for Fd {
293 fn drop(&mut self) {
294 if self.0 >= 0 {
295 unsafe {
296 libc::close(self.0);
297 }
298 }
299 }
300}
301
302/// An opaque token representing a registered file descriptor.
303#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
304pub struct Token(u64);
305
306#[allow(dead_code)]
307impl Token {
308 #[inline(always)]
309 pub(crate) fn new(val: u64) -> Self {
310 Self(val)
311 }
312
313 #[inline(always)]
314 pub(crate) fn val(&self) -> u64 {
315 self.0
316 }
317}
318
319/// A readiness event generated by the reactor.
320#[derive(Clone, Copy, Debug)]
321pub struct Event {
322 /// Token associated with the ready descriptor.
323 pub token: Token,
324 /// Descriptor is ready for reading (`EPOLLIN`).
325 pub readable: bool,
326 /// Descriptor has priority data or an exceptional condition (`EPOLLPRI`).
327 pub priority: bool,
328 /// Descriptor is ready for writing (`EPOLLOUT`).
329 pub writable: bool,
330 /// Indicates an error condition (`EPOLLERR`).
331 ///
332 /// NOTE: For edge-triggered readiness, an error condition often means both
333 /// readable and writable are set to ensure the handler drains the FD.
334 pub error: bool,
335 /// Indicates a remote hangup (`EPOLLHUP`).
336 pub hangup: bool,
337}
338
339const _: () = assert!(std::mem::size_of::<Event>() == 16);
340const _: () = assert!(std::mem::align_of::<Event>() == 8);
341
342/// A lightweight epoll reactor using edge-triggered monitoring (EPOLLET).
343///
344/// ### Edge-Triggered Contract
345/// Because this reactor uses EPOLLET, all handlers MUST drain their respective
346/// read or write sources until they receive an `EAGAIN` / `EWOULDBLOCK` error
347/// (represented as `Ok(None)` in the `Fd` helpers).
348///
349/// Failure to drain a source will result in missing future readiness events
350/// for that file descriptor until it is re-registered or another event occurs.
351///
352/// ### Fork Safety
353/// The `Reactor` owns an `epoll` descriptor which is `O_CLOEXEC`. After an
354/// `exec` call in a child process, the reactor and all its registrations are
355/// lost. If the child continues without `exec`, it shares the same epoll
356/// instance, which is generally unsafe and requires careful coordination.
357///
358/// # Example
359/// ```no_run
360/// # use coreshift_core::reactor::{Reactor, Fd, Event};
361/// # fn example(fd: Fd) -> Result<(), Box<dyn std::error::Error>> {
362/// let mut reactor = Reactor::new()?;
363/// let token = reactor.add(&fd, true, false)?;
364///
365/// let mut events = Vec::new();
366/// loop {
367/// reactor.wait(&mut events, 64, -1)?;
368/// for ev in &events {
369/// if ev.token == token {
370/// // Drain fd...
371/// }
372/// }
373/// }
374/// # Ok(())
375/// # }
376/// ```
377pub struct Reactor {
378 epfd: RawFd,
379 next_token: u64,
380 events_buf: Vec<libc::epoll_event>,
381 signalfd: Option<Fd>,
382 signalfd_previous_mask: Option<libc::sigset_t>,
383 /// Token for the signalfd (if initialized).
384 sigchld_token: Option<Token>,
385 /// Token for the inotify fd (if initialized).
386 inotify_token: Option<Token>,
387}
388
389impl Reactor {
390 /// Create a new epoll reactor.
391 ///
392 /// ### Errors
393 /// - `EMFILE`: Process limit on open file descriptors hit.
394 /// - `ENFILE`: System-wide limit on open files hit.
395 /// - `ENOMEM`: Insufficient kernel memory.
396 pub fn new() -> Result<Self, CoreError> {
397 let epfd = unsafe { libc::epoll_create1(libc::EPOLL_CLOEXEC) };
398 syscall_ret(epfd, "epoll_create1")?;
399 Ok(Self {
400 epfd,
401 next_token: 1,
402 events_buf: Vec::with_capacity(64),
403 signalfd: None,
404 signalfd_previous_mask: None,
405 sigchld_token: None,
406 inotify_token: None,
407 })
408 }
409
410 /// Initialize inotify and add it to the reactor.
411 ///
412 /// ### Errors
413 /// - `EMFILE`: Process limit on open file descriptors hit.
414 /// - `ENFILE`: System-wide limit on open files hit.
415 /// - `ENOMEM`: Insufficient kernel memory.
416 /// - `EPERM`: Permission denied to create inotify instance.
417 pub fn setup_inotify(&mut self) -> Result<(Fd, Token), CoreError> {
418 let fd = unsafe { libc::inotify_init1(libc::IN_CLOEXEC | libc::IN_NONBLOCK) };
419 syscall_ret(fd, "inotify_init1")?;
420
421 let fd_obj = Fd::new(fd, "inotify")?;
422 let token = self.add(&fd_obj, true, false)?;
423 self.inotify_token = Some(token);
424
425 Ok((fd_obj, token))
426 }
427
428 /// Initialize signalfd for SIGCHLD and add it to the reactor.
429 ///
430 /// The previous current-thread signal mask is restored when the reactor is
431 /// dropped.
432 ///
433 /// ### Errors
434 /// - `EBADF`: The provided file descriptor is invalid.
435 /// - `EINVAL`: Signal mask is invalid or already set up.
436 /// - `EMFILE`: Process limit on open file descriptors hit.
437 pub fn setup_signalfd(&mut self) -> Result<Token, CoreError> {
438 if self.signalfd.is_some() {
439 return Err(CoreError::sys(
440 libc::EINVAL,
441 "setup_signalfd already initialized",
442 ));
443 }
444
445 let mut mask: libc::sigset_t = unsafe { std::mem::zeroed() };
446 unsafe { libc::sigemptyset(&mut mask) };
447 unsafe { libc::sigaddset(&mut mask, libc::SIGCHLD) };
448
449 let mut previous_mask: libc::sigset_t = unsafe { std::mem::zeroed() };
450 let r = unsafe { libc::pthread_sigmask(libc::SIG_BLOCK, &mask, &mut previous_mask) };
451 if r != 0 {
452 return Err(CoreError::sys(r, "pthread_sigmask(SIG_BLOCK)"));
453 }
454
455 let sfd = unsafe { libc::signalfd(-1, &mask, libc::SFD_NONBLOCK | libc::SFD_CLOEXEC) };
456 if let Err(err) = syscall_ret(sfd, "signalfd") {
457 let _ = unsafe {
458 libc::pthread_sigmask(libc::SIG_SETMASK, &previous_mask, std::ptr::null_mut())
459 };
460 return Err(err);
461 }
462
463 let fd = Fd::new(sfd, "signalfd")?;
464 let token = match self.add(&fd, true, false) {
465 Ok(token) => token,
466 Err(err) => {
467 let _ = unsafe {
468 libc::pthread_sigmask(libc::SIG_SETMASK, &previous_mask, std::ptr::null_mut())
469 };
470 return Err(err);
471 }
472 };
473
474 self.signalfd = Some(fd);
475 self.signalfd_previous_mask = Some(previous_mask);
476 self.sigchld_token = Some(token);
477
478 Ok(token)
479 }
480
481 /// Drain the internal signalfd buffer.
482 pub fn drain_signalfd(&self) -> Result<(), CoreError> {
483 if let Some(fd) = &self.signalfd {
484 let mut buf = [0u8; std::mem::size_of::<libc::signalfd_siginfo>()];
485 loop {
486 match fd.read_slice(&mut buf) {
487 Ok(Some(n)) if n < buf.len() => break,
488 Ok(Some(_)) => continue,
489 Ok(None) => break,
490 Err(e) => return Err(e),
491 }
492 }
493 }
494 Ok(())
495 }
496
497 /// Register a file descriptor with the reactor.
498 ///
499 /// This assigns a new unique token for the descriptor and enables
500 /// edge-triggered monitoring.
501 #[inline(always)]
502 pub fn add(&mut self, fd: &Fd, readable: bool, writable: bool) -> Result<Token, CoreError> {
503 let token = Token(self.next_token);
504 self.next_token += 1;
505 self.add_with_token(fd.raw(), token, readable, writable, false)?;
506 Ok(token)
507 }
508
509 /// Register a file descriptor for priority readiness (EPOLLPRI).
510 #[inline(always)]
511 pub fn add_priority(&mut self, fd: &Fd) -> Result<Token, CoreError> {
512 let token = Token(self.next_token);
513 self.next_token += 1;
514 self.add_with_token(fd.raw(), token, false, false, true)?;
515 Ok(token)
516 }
517
518 /// Register a file descriptor with custom epoll flags.
519 ///
520 /// This allows registration with flags like `EPOLLONESHOT` or explicit
521 /// control over `EPOLLET`.
522 ///
523 /// # Example
524 /// ```no_run
525 /// # use coreshift_core::reactor::{Reactor, Fd};
526 /// let mut reactor = Reactor::new().unwrap();
527 /// let fd = Fd::eventfd(0).unwrap();
528 /// reactor.add_with_flags(&fd, (libc::EPOLLIN | libc::EPOLLONESHOT) as u32).unwrap();
529 /// ```
530 #[inline(always)]
531 pub fn add_with_flags(&mut self, fd: &Fd, flags: u32) -> Result<Token, CoreError> {
532 let token = Token(self.next_token);
533 self.next_token += 1;
534 let mut ev = libc::epoll_event {
535 events: flags,
536 u64: token.0,
537 };
538 let r = unsafe { libc::epoll_ctl(self.epfd, libc::EPOLL_CTL_ADD, fd.raw(), &mut ev) };
539 syscall_ret(r, "epoll_ctl_add")?;
540 Ok(token)
541 }
542
543 #[inline(always)]
544 pub(crate) fn add_with_token(
545 &mut self,
546 raw_fd: RawFd,
547 token: Token,
548 readable: bool,
549 writable: bool,
550 priority: bool,
551 ) -> Result<(), CoreError> {
552 let mut events = libc::EPOLLET as u32;
553 if readable {
554 events |= libc::EPOLLIN as u32;
555 }
556 if writable {
557 events |= libc::EPOLLOUT as u32;
558 }
559 if priority {
560 events |= libc::EPOLLPRI as u32;
561 }
562 let mut ev = libc::epoll_event {
563 events,
564 u64: token.0,
565 };
566 let r = unsafe { libc::epoll_ctl(self.epfd, libc::EPOLL_CTL_ADD, raw_fd, &mut ev) };
567 syscall_ret(r, "epoll_ctl_add")?;
568 Ok(())
569 }
570
571 /// Remove a file descriptor from the reactor.
572 #[inline(always)]
573 pub fn del(&self, fd: &Fd) -> Result<(), CoreError> {
574 self.del_raw(fd.raw())
575 }
576
577 /// Remove a raw descriptor from the reactor.
578 ///
579 /// NOTE: This is an escape hatch for low-level interactions. Prefer using
580 /// [`del`](Self::del).
581 #[inline(always)]
582 pub(crate) fn del_raw(&self, raw: RawFd) -> Result<(), CoreError> {
583 loop {
584 let ret = unsafe {
585 libc::epoll_ctl(self.epfd, libc::EPOLL_CTL_DEL, raw, std::ptr::null_mut())
586 };
587 if ret == -1 {
588 let e = errno();
589 if e == libc::EINTR {
590 continue;
591 }
592 return Err(CoreError::sys(e, "epoll_ctl_del"));
593 }
594 return Ok(());
595 }
596 }
597
598 /// Wait for events.
599 ///
600 /// This function blocks until at least one event is ready or the timeout
601 /// expires. Ready events are appended to the `buffer`.
602 ///
603 /// ### Timeout Contract
604 /// - `-1`: Block indefinitely until an event occurs or a signal interrupts.
605 /// - `0`: Return immediately, even if no events are ready.
606 /// - `> 0`: Wait for up to the specified number of milliseconds.
607 ///
608 /// Returns the number of events received.
609 #[inline(always)]
610 pub fn wait(
611 &mut self,
612 buffer: &mut Vec<Event>,
613 max_events: usize,
614 timeout: i32,
615 ) -> Result<usize, CoreError> {
616 buffer.clear();
617
618 if max_events == 0 {
619 return Ok(0);
620 }
621
622 // Ensure buffer has enough capacity
623 if buffer.capacity() < max_events {
624 buffer.reserve(max_events.saturating_sub(buffer.len()));
625 }
626
627 if self.events_buf.capacity() < max_events {
628 self.events_buf
629 .reserve(max_events.saturating_sub(self.events_buf.len()));
630 }
631
632 let n = unsafe {
633 libc::epoll_wait(
634 self.epfd,
635 self.events_buf.as_mut_ptr(),
636 max_events as i32,
637 timeout,
638 )
639 };
640
641 if n > 0 {
642 unsafe {
643 self.events_buf.set_len(n as usize);
644 }
645 for i in 0..n as usize {
646 let ev = self.events_buf[i];
647 let is_read = (ev.events & libc::EPOLLIN as u32) != 0;
648 let is_priority = (ev.events & libc::EPOLLPRI as u32) != 0;
649 let is_write = (ev.events & libc::EPOLLOUT as u32) != 0;
650 let is_err = (ev.events & libc::EPOLLERR as u32) != 0;
651 let is_hup = (ev.events & libc::EPOLLHUP as u32) != 0;
652
653 buffer.push(Event {
654 token: Token(ev.u64),
655 readable: is_read || is_err,
656 priority: is_priority || is_err,
657 writable: is_write || is_err,
658 error: is_err,
659 hangup: is_hup,
660 });
661 }
662 return Ok(n as usize);
663 }
664
665 if n < 0 {
666 let e = errno();
667 if e == libc::EINTR {
668 return Ok(0);
669 }
670 return Err(CoreError::sys(e, "epoll_wait"));
671 }
672 Ok(0)
673 }
674
675 /// Return the raw epoll file descriptor.
676 ///
677 /// NOTE: This is an escape hatch for low-level interactions.
678 #[allow(dead_code)]
679 pub(crate) fn fd(&self) -> RawFd {
680 self.epfd
681 }
682}
683
684impl Drop for Reactor {
685 fn drop(&mut self) {
686 if let Some(mask) = self.signalfd_previous_mask.take() {
687 let _ =
688 unsafe { libc::pthread_sigmask(libc::SIG_SETMASK, &mask, std::ptr::null_mut()) };
689 }
690 if self.epfd >= 0 {
691 unsafe {
692 libc::close(self.epfd);
693 }
694 }
695 }
696}