coreshift_core/reactor/mod.rs
1// This Source Code Form is subject to the terms of the Mozilla Public
2// License, v. 2.0. If a copy of the MPL was not distributed with this
3// file, You can obtain one at https://mozilla.org/MPL/2.0/
4
5//! Asynchronous event reactor.
6//!
7//! This module provides a lightweight wrapper around Linux `epoll` for
8//! multiplexing I/O events. It is optimized for edge-triggered monitoring.
9//! It is intentionally explicit about Linux readiness semantics rather than
10//! hiding them behind a higher-level async runtime abstraction.
11
12use crate::CoreError;
13use crate::error::syscall_ret;
14use std::io::Error as IoError;
15use std::time::Duration;
16
/// Fetch the most recent OS error code reported for the calling thread.
///
/// Falls back to `0` when the last OS error carries no raw code.
#[inline(always)]
fn errno() -> i32 {
    match IoError::last_os_error().raw_os_error() {
        Some(code) => code,
        None => 0,
    }
}
21
/// An owned file descriptor that closes on drop.
///
/// `Fd` is move-only and wraps a single [`RawFd`]. Constructing one from a
/// raw descriptor transfers close ownership to `Fd`; do not also close the
/// raw descriptor elsewhere, or the descriptor number may be closed twice.
///
/// ### Fork Safety
/// `Fd` instances created by Core usually have `O_CLOEXEC` set (the `eventfd`
/// and `timerfd` constructors request it explicitly). If the process forks,
/// the descriptor will be inherited by the child but will be closed
/// automatically upon `exec`. Callers that need a descriptor to survive `exec`
/// must clear the flag manually.
pub struct Fd(RawFd);
33
34use std::os::unix::io::{AsRawFd, RawFd};
35
36impl AsRawFd for Fd {
37 fn as_raw_fd(&self) -> RawFd {
38 self.0
39 }
40}
41
42impl Fd {
43 /// Wrap a raw file descriptor.
44 ///
45 /// # Errors
46 /// Returns a [`CoreError`] if the descriptor is negative.
47 #[inline(always)]
48 pub(crate) fn new(fd: RawFd, op: &'static str) -> Result<Self, CoreError> {
49 if fd < 0 {
50 Err(CoreError::sys(errno(), op))
51 } else {
52 Ok(Self(fd))
53 }
54 }
55
56 /// Wrap an owned raw file descriptor.
57 ///
58 /// # Safety
59 /// The caller must guarantee `fd` is valid, open, and uniquely owned by the
60 /// returned `Fd`. Passing a borrowed fd, or closing `fd` after this call,
61 /// can cause double-close or use-after-close bugs.
62 #[inline(always)]
63 pub unsafe fn from_owned_raw_fd(fd: RawFd, op: &'static str) -> Result<Self, CoreError> {
64 Self::new(fd, op)
65 }
66
67 /// Create a non-blocking `eventfd` with `EFD_CLOEXEC`.
68 ///
69 /// The descriptor is created with `FD_CLOEXEC` set.
70 ///
71 /// ### Errors
72 /// - `EINVAL`: `init` is invalid.
73 /// - `EMFILE`: Process limit on open file descriptors hit.
74 /// - `ENFILE`: System-wide limit on open files hit.
75 pub fn eventfd(init: u32) -> Result<Self, CoreError> {
76 let fd = unsafe { libc::eventfd(init, libc::EFD_CLOEXEC | libc::EFD_NONBLOCK) };
77 syscall_ret(fd, "eventfd")?;
78 Self::new(fd, "eventfd")
79 }
80
81 /// Create a non-blocking `timerfd` using `CLOCK_MONOTONIC` with `TFD_CLOEXEC`.
82 ///
83 /// The descriptor is created with `FD_CLOEXEC` set.
84 ///
85 /// ### Errors
86 /// - `EMFILE`: Process limit on open file descriptors hit.
87 /// - `ENFILE`: System-wide limit on open files hit.
88 /// - `ENOMEM`: Insufficient kernel memory.
89 pub fn timerfd() -> Result<Self, CoreError> {
90 let fd = unsafe {
91 libc::timerfd_create(
92 libc::CLOCK_MONOTONIC,
93 libc::TFD_CLOEXEC | libc::TFD_NONBLOCK,
94 )
95 };
96 syscall_ret(fd, "timerfd_create")?;
97 Self::new(fd, "timerfd_create")
98 }
99
100 /// Access the underlying raw file descriptor.
101 ///
102 /// NOTE: This is an escape hatch for low-level interactions. Prefer using
103 /// the safe methods on `Fd` or implementing `AsRawFd`.
104 #[inline(always)]
105 pub(crate) fn raw(&self) -> RawFd {
106 self.0
107 }
108
109 /// Perform a `dup2` syscall.
110 ///
111 /// ### Errors
112 /// - `EBADF`: The source or target file descriptor is invalid.
113 /// - `EMFILE`: The target descriptor exceeds the process limit.
114 pub fn dup2(&self, target: RawFd) -> Result<(), CoreError> {
115 loop {
116 let r = unsafe { libc::dup2(self.0, target) };
117 if r < 0 {
118 let e = errno();
119 if e == libc::EINTR {
120 continue;
121 }
122 return syscall_ret(r, "dup2");
123 }
124 return Ok(());
125 }
126 }
127
128 /// Set the `O_NONBLOCK` flag on the descriptor.
129 ///
130 /// ### Errors
131 /// - `EBADF`: The file descriptor is invalid.
132 pub fn set_nonblock(&self) -> Result<(), CoreError> {
133 let flags = unsafe { libc::fcntl(self.0, libc::F_GETFL) };
134 syscall_ret(flags, "fcntl(F_GETFL)")?;
135 let r = unsafe { libc::fcntl(self.0, libc::F_SETFL, flags | libc::O_NONBLOCK) };
136 syscall_ret(r, "fcntl(F_SETFL)")
137 }
138
139 /// Set the `FD_CLOEXEC` flag on the descriptor.
140 ///
141 /// ### Errors
142 /// - `EBADF`: The file descriptor is invalid.
143 pub fn set_cloexec(&self) -> Result<(), CoreError> {
144 let flags = unsafe { libc::fcntl(self.0, libc::F_GETFD) };
145 syscall_ret(flags, "fcntl(F_GETFD)")?;
146 let r = unsafe { libc::fcntl(self.0, libc::F_SETFD, flags | libc::FD_CLOEXEC) };
147 syscall_ret(r, "fcntl(F_SETFD)")
148 }
149
150 /// Read bytes into a mutable slice.
151 ///
152 /// Returns `Ok(None)` if the operation would block (`EAGAIN`).
153 ///
154 /// ### Edge Cases
155 /// - **Zero-length read**: Returns `Ok(Some(0))` immediately.
156 /// - **Partial read**: Returns the number of bytes actually read.
157 ///
158 /// ### Errors
159 /// - `EBADF`: The file descriptor is invalid or not open for reading.
160 /// - `EFAULT`: `buf` points outside the process's address space.
161 /// - `EIO`: Low-level I/O error.
162 pub fn read_slice(&self, buf: &mut [u8]) -> Result<Option<usize>, CoreError> {
163 self.read_raw(buf.as_mut_ptr(), buf.len())
164 }
165
166 /// Seek to an absolute file offset.
167 ///
168 /// ### Errors
169 /// - `EBADF`: The file descriptor is not seekable.
170 /// - `EINVAL`: `offset` is invalid.
171 /// - `EOVERFLOW`: The resulting offset exceeds the off_t range.
172 pub fn seek_set(&self, offset: i64) -> Result<u64, CoreError> {
173 loop {
174 let pos = unsafe { libc::lseek(self.0, offset as libc::off_t, libc::SEEK_SET) };
175 if pos < 0 {
176 let e = errno();
177 if e == libc::EINTR {
178 continue;
179 }
180 return Err(CoreError::sys(e, "lseek"));
181 }
182 return Ok(pos as u64);
183 }
184 }
185
186 /// Write bytes from a slice.
187 ///
188 /// Returns `Ok(None)` if the operation would block (`EAGAIN`).
189 ///
190 /// ### Edge Cases
191 /// - **Zero-length write**: Returns `Ok(Some(0))` immediately.
192 /// - **Partial write**: Returns the number of bytes actually written.
193 ///
194 /// ### Errors
195 /// - `EBADF`: The file descriptor is invalid or not open for writing.
196 /// - `EFAULT`: `buf` points outside the process's address space.
197 /// - `EPIPE`: The reading end of a pipe or socket was closed.
198 pub fn write_slice(&self, buf: &[u8]) -> Result<Option<usize>, CoreError> {
199 self.write_raw(buf.as_ptr(), buf.len())
200 }
201
202 /// Read a native-endian `u64`, blocking until data is available.
203 ///
204 /// Unlike `read_u64`, this never returns `Ok(None)` — it retries on `EINTR`
205 /// and returns `Err` only on a hard I/O failure. Intended for blocking
206 /// eventfds used as inter-thread notification primitives.
207 pub fn read_u64_blocking(&self) -> Result<u64, CoreError> {
208 let mut bytes = [0u8; std::mem::size_of::<u64>()];
209 loop {
210 let n = unsafe {
211 libc::read(self.0, bytes.as_mut_ptr() as *mut libc::c_void, bytes.len())
212 };
213 if n == bytes.len() as isize {
214 return Ok(u64::from_ne_bytes(bytes));
215 }
216 if n < 0 {
217 let e = errno();
218 if e == libc::EINTR { continue; }
219 return Err(CoreError::sys(e, "read_u64_blocking"));
220 }
221 return Err(CoreError::sys(libc::EIO, "read_u64_blocking:short_read"));
222 }
223 }
224
225 /// Read a native-endian `u64`.
226 ///
227 /// Returns `Ok(None)` if the operation would block (`EAGAIN`).
228 pub fn read_u64(&self) -> Result<Option<u64>, CoreError> {
229 let mut bytes = [0u8; std::mem::size_of::<u64>()];
230 match self.read_slice(&mut bytes)? {
231 Some(n) if n == bytes.len() => Ok(Some(u64::from_ne_bytes(bytes))),
232 Some(_) => Err(CoreError::sys(libc::EIO, "read_u64")),
233 None => Ok(None),
234 }
235 }
236
237 /// Write a native-endian `u64`.
238 ///
239 /// Returns `Ok(None)` if the operation would block (`EAGAIN`).
240 pub fn write_u64(&self, value: u64) -> Result<Option<usize>, CoreError> {
241 self.write_slice(&value.to_ne_bytes())
242 }
243
244 /// Arm or disarm a one-shot `timerfd`.
245 ///
246 /// Passing `None` disarms the timer. Zero durations are rounded up to one
247 /// nanosecond so the timer still expires.
248 ///
249 /// ### Errors
250 /// - `EBADF`: The file descriptor is invalid.
251 /// - `EINVAL`: The duration is invalid or not supported by the kernel.
252 pub fn set_timer_oneshot(&self, delay: Option<Duration>) -> Result<(), CoreError> {
253 let mut spec: libc::itimerspec = unsafe { std::mem::zeroed() };
254 if let Some(delay) = delay {
255 let delay = delay.max(Duration::from_nanos(1));
256 spec.it_value.tv_sec = delay.as_secs() as libc::time_t;
257 spec.it_value.tv_nsec = delay.subsec_nanos() as libc::c_long;
258 }
259
260 let ret = unsafe { libc::timerfd_settime(self.raw(), 0, &spec, std::ptr::null_mut()) };
261 syscall_ret(ret, "timerfd_settime")
262 }
263
264 /// Read bytes into a raw buffer.
265 ///
266 /// Internal callers must ensure `buf` points to a valid writable region of
267 /// at least `count` bytes.
268 ///
269 /// Returns `Ok(None)` if the operation would block (`EAGAIN`).
270 pub(crate) fn read_raw(&self, buf: *mut u8, count: usize) -> Result<Option<usize>, CoreError> {
271 loop {
272 let n = unsafe { libc::read(self.0, buf as *mut libc::c_void, count) };
273 if n < 0 {
274 let e = errno();
275 if e == libc::EINTR {
276 continue;
277 }
278 if e == libc::EAGAIN || e == libc::EWOULDBLOCK {
279 return Ok(None);
280 }
281 return Err(CoreError::sys(e, "read"));
282 }
283 return Ok(Some(n as usize));
284 }
285 }
286
287 /// Write bytes from a raw buffer.
288 ///
289 /// Internal callers must ensure `buf` points to a valid readable region of
290 /// at least `count` bytes.
291 ///
292 /// Returns `Ok(None)` if the operation would block (`EAGAIN`).
293 pub(crate) fn write_raw(
294 &self,
295 buf: *const u8,
296 count: usize,
297 ) -> Result<Option<usize>, CoreError> {
298 loop {
299 let n = unsafe { libc::write(self.0, buf as *const libc::c_void, count) };
300 if n < 0 {
301 let e = errno();
302 if e == libc::EINTR {
303 continue;
304 }
305 if e == libc::EAGAIN || e == libc::EWOULDBLOCK {
306 return Ok(None);
307 }
308 return Err(CoreError::sys(e, "write"));
309 }
310 return Ok(Some(n as usize));
311 }
312 }
313}
314
315impl Drop for Fd {
316 fn drop(&mut self) {
317 if self.0 >= 0 {
318 unsafe {
319 libc::close(self.0);
320 }
321 }
322 }
323}
324
/// An opaque token representing a registered file descriptor.
///
/// Tokens are plain `u64` identifiers; they are cheap to copy, compare, and
/// hash.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct Token(u64);

#[allow(dead_code)]
impl Token {
    /// Build a token from its raw numeric value.
    #[inline(always)]
    pub(crate) fn new(val: u64) -> Self {
        Token(val)
    }

    /// Return the raw numeric value carried by the token.
    #[inline(always)]
    pub(crate) fn val(&self) -> u64 {
        let Token(inner) = *self;
        inner
    }
}
341
342/// A readiness event generated by the reactor.
343#[derive(Clone, Copy, Debug)]
344pub struct Event {
345 /// Token associated with the ready descriptor.
346 pub token: Token,
347 /// Descriptor is ready for reading (`EPOLLIN`).
348 pub readable: bool,
349 /// Descriptor has priority data or an exceptional condition (`EPOLLPRI`).
350 pub priority: bool,
351 /// Descriptor is ready for writing (`EPOLLOUT`).
352 pub writable: bool,
353 /// Indicates an error condition (`EPOLLERR`).
354 ///
355 /// NOTE: For edge-triggered readiness, an error condition often means both
356 /// readable and writable are set to ensure the handler drains the FD.
357 pub error: bool,
358 /// Indicates a remote hangup (`EPOLLHUP`).
359 pub hangup: bool,
360}
361
362const _: () = assert!(std::mem::size_of::<Event>() == 16);
363const _: () = assert!(std::mem::align_of::<Event>() == 8);
364
/// A lightweight epoll reactor using edge-triggered monitoring (EPOLLET).
///
/// ### Edge-Triggered Contract
/// Because this reactor uses EPOLLET, all handlers MUST drain their respective
/// read or write sources until they receive an `EAGAIN` / `EWOULDBLOCK` error
/// (represented as `Ok(None)` in the `Fd` helpers).
///
/// Failure to drain a source will result in missing future readiness events
/// for that file descriptor until it is re-registered or another event occurs.
///
/// ### Fork Safety
/// The `Reactor` owns an `epoll` descriptor which is `O_CLOEXEC`. After an
/// `exec` call in a child process, the reactor and all its registrations are
/// lost. If the child continues without `exec`, it shares the same epoll
/// instance, which is generally unsafe and requires careful coordination.
///
/// # Example
/// ```no_run
/// # use coreshift_core::reactor::{Reactor, Fd, Event};
/// # fn example(fd: Fd) -> Result<(), Box<dyn std::error::Error>> {
/// let mut reactor = Reactor::new()?;
/// let token = reactor.add(&fd, true, false)?;
///
/// let mut events = Vec::new();
/// loop {
///     reactor.wait(&mut events, 64, -1)?;
///     for ev in &events {
///         if ev.token == token {
///             // Drain fd...
///         }
///     }
/// }
/// # Ok(())
/// # }
/// ```
pub struct Reactor {
    /// The epoll descriptor created in `new` and closed when the reactor is
    /// dropped.
    epfd: RawFd,
    /// Value of the next token to hand out; starts at 1 and is incremented
    /// on every registration attempt.
    next_token: u64,
    /// Scratch buffer that `wait` passes to `epoll_wait` before converting
    /// the raw entries into [`Event`]s.
    events_buf: Vec<libc::epoll_event>,
    /// The signalfd created by `setup_signalfd`, if any.
    signalfd: Option<Fd>,
    /// Signal mask that was in effect before `setup_signalfd` blocked
    /// SIGCHLD; restored when the reactor is dropped.
    signalfd_previous_mask: Option<libc::sigset_t>,
    /// Token for the signalfd (if initialized).
    sigchld_token: Option<Token>,
    /// Token for the inotify fd (if initialized).
    inotify_token: Option<Token>,
}
411
412impl Reactor {
413 /// Create a new epoll reactor.
414 ///
415 /// ### Errors
416 /// - `EMFILE`: Process limit on open file descriptors hit.
417 /// - `ENFILE`: System-wide limit on open files hit.
418 /// - `ENOMEM`: Insufficient kernel memory.
419 pub fn new() -> Result<Self, CoreError> {
420 let epfd = unsafe { libc::epoll_create1(libc::EPOLL_CLOEXEC) };
421 syscall_ret(epfd, "epoll_create1")?;
422 Ok(Self {
423 epfd,
424 next_token: 1,
425 events_buf: Vec::with_capacity(64),
426 signalfd: None,
427 signalfd_previous_mask: None,
428 sigchld_token: None,
429 inotify_token: None,
430 })
431 }
432
433 /// Initialize inotify and add it to the reactor.
434 ///
435 /// ### Errors
436 /// - `EMFILE`: Process limit on open file descriptors hit.
437 /// - `ENFILE`: System-wide limit on open files hit.
438 /// - `ENOMEM`: Insufficient kernel memory.
439 /// - `EPERM`: Permission denied to create inotify instance.
440 pub fn setup_inotify(&mut self) -> Result<(Fd, Token), CoreError> {
441 let fd = unsafe { libc::inotify_init1(libc::IN_CLOEXEC | libc::IN_NONBLOCK) };
442 syscall_ret(fd, "inotify_init1")?;
443
444 let fd_obj = Fd::new(fd, "inotify")?;
445 let token = self.add(&fd_obj, true, false)?;
446 self.inotify_token = Some(token);
447
448 Ok((fd_obj, token))
449 }
450
451 /// Initialize signalfd for SIGCHLD and add it to the reactor.
452 ///
453 /// The previous current-thread signal mask is restored when the reactor is
454 /// dropped.
455 ///
456 /// ### Errors
457 /// - `EBADF`: The provided file descriptor is invalid.
458 /// - `EINVAL`: Signal mask is invalid or already set up.
459 /// - `EMFILE`: Process limit on open file descriptors hit.
460 pub fn setup_signalfd(&mut self) -> Result<Token, CoreError> {
461 if self.signalfd.is_some() {
462 return Err(CoreError::sys(
463 libc::EINVAL,
464 "setup_signalfd already initialized",
465 ));
466 }
467
468 let mut mask: libc::sigset_t = unsafe { std::mem::zeroed() };
469 unsafe { libc::sigemptyset(&mut mask) };
470 unsafe { libc::sigaddset(&mut mask, libc::SIGCHLD) };
471
472 let mut previous_mask: libc::sigset_t = unsafe { std::mem::zeroed() };
473 let r = unsafe { libc::pthread_sigmask(libc::SIG_BLOCK, &mask, &mut previous_mask) };
474 if r != 0 {
475 return Err(CoreError::sys(r, "pthread_sigmask(SIG_BLOCK)"));
476 }
477
478 let sfd = unsafe { libc::signalfd(-1, &mask, libc::SFD_NONBLOCK | libc::SFD_CLOEXEC) };
479 if let Err(err) = syscall_ret(sfd, "signalfd") {
480 let _ = unsafe {
481 libc::pthread_sigmask(libc::SIG_SETMASK, &previous_mask, std::ptr::null_mut())
482 };
483 return Err(err);
484 }
485
486 let fd = Fd::new(sfd, "signalfd")?;
487 let token = match self.add(&fd, true, false) {
488 Ok(token) => token,
489 Err(err) => {
490 let _ = unsafe {
491 libc::pthread_sigmask(libc::SIG_SETMASK, &previous_mask, std::ptr::null_mut())
492 };
493 return Err(err);
494 }
495 };
496
497 self.signalfd = Some(fd);
498 self.signalfd_previous_mask = Some(previous_mask);
499 self.sigchld_token = Some(token);
500
501 Ok(token)
502 }
503
504 /// Drain the internal signalfd buffer.
505 pub fn drain_signalfd(&self) -> Result<(), CoreError> {
506 if let Some(fd) = &self.signalfd {
507 let mut buf = [0u8; std::mem::size_of::<libc::signalfd_siginfo>()];
508 loop {
509 match fd.read_slice(&mut buf) {
510 Ok(Some(n)) if n < buf.len() => break,
511 Ok(Some(_)) => continue,
512 Ok(None) => break,
513 Err(e) => return Err(e),
514 }
515 }
516 }
517 Ok(())
518 }
519
520 /// Register a file descriptor with the reactor.
521 ///
522 /// This assigns a new unique token for the descriptor and enables
523 /// edge-triggered monitoring.
524 #[inline(always)]
525 pub fn add(&mut self, fd: &Fd, readable: bool, writable: bool) -> Result<Token, CoreError> {
526 let token = Token(self.next_token);
527 self.next_token += 1;
528 self.add_with_token(fd.raw(), token, readable, writable, false)?;
529 Ok(token)
530 }
531
532 /// Register a file descriptor for priority readiness (EPOLLPRI).
533 #[inline(always)]
534 pub fn add_priority(&mut self, fd: &Fd) -> Result<Token, CoreError> {
535 let token = Token(self.next_token);
536 self.next_token += 1;
537 self.add_with_token(fd.raw(), token, false, false, true)?;
538 Ok(token)
539 }
540
541 /// Register a file descriptor with custom epoll flags.
542 ///
543 /// This allows registration with flags like `EPOLLONESHOT` or explicit
544 /// control over `EPOLLET`.
545 ///
546 /// # Example
547 /// ```no_run
548 /// # use coreshift_core::reactor::{Reactor, Fd};
549 /// let mut reactor = Reactor::new().unwrap();
550 /// let fd = Fd::eventfd(0).unwrap();
551 /// reactor.add_with_flags(&fd, (libc::EPOLLIN | libc::EPOLLONESHOT) as u32).unwrap();
552 /// ```
553 #[inline(always)]
554 pub fn add_with_flags(&mut self, fd: &Fd, flags: u32) -> Result<Token, CoreError> {
555 let token = Token(self.next_token);
556 self.next_token += 1;
557 let mut ev = libc::epoll_event {
558 events: flags,
559 u64: token.0,
560 };
561 let r = unsafe { libc::epoll_ctl(self.epfd, libc::EPOLL_CTL_ADD, fd.raw(), &mut ev) };
562 syscall_ret(r, "epoll_ctl_add")?;
563 Ok(token)
564 }
565
566 #[inline(always)]
567 pub(crate) fn add_with_token(
568 &mut self,
569 raw_fd: RawFd,
570 token: Token,
571 readable: bool,
572 writable: bool,
573 priority: bool,
574 ) -> Result<(), CoreError> {
575 let mut events = libc::EPOLLET as u32;
576 if readable {
577 events |= libc::EPOLLIN as u32;
578 }
579 if writable {
580 events |= libc::EPOLLOUT as u32;
581 }
582 if priority {
583 events |= libc::EPOLLPRI as u32;
584 }
585 let mut ev = libc::epoll_event {
586 events,
587 u64: token.0,
588 };
589 let r = unsafe { libc::epoll_ctl(self.epfd, libc::EPOLL_CTL_ADD, raw_fd, &mut ev) };
590 syscall_ret(r, "epoll_ctl_add")?;
591 Ok(())
592 }
593
594 /// Remove a file descriptor from the reactor.
595 #[inline(always)]
596 pub fn del(&self, fd: &Fd) -> Result<(), CoreError> {
597 self.del_raw(fd.raw())
598 }
599
600 /// Remove a raw descriptor from the reactor.
601 ///
602 /// NOTE: This is an escape hatch for low-level interactions. Prefer using
603 /// [`del`](Self::del).
604 #[inline(always)]
605 pub(crate) fn del_raw(&self, raw: RawFd) -> Result<(), CoreError> {
606 loop {
607 let ret = unsafe {
608 libc::epoll_ctl(self.epfd, libc::EPOLL_CTL_DEL, raw, std::ptr::null_mut())
609 };
610 if ret == -1 {
611 let e = errno();
612 if e == libc::EINTR {
613 continue;
614 }
615 return Err(CoreError::sys(e, "epoll_ctl_del"));
616 }
617 return Ok(());
618 }
619 }
620
621 /// Wait for events.
622 ///
623 /// This function blocks until at least one event is ready or the timeout
624 /// expires. Ready events are appended to the `buffer`.
625 ///
626 /// ### Timeout Contract
627 /// - `-1`: Block indefinitely until an event occurs or a signal interrupts.
628 /// - `0`: Return immediately, even if no events are ready.
629 /// - `> 0`: Wait for up to the specified number of milliseconds.
630 ///
631 /// Returns the number of events received.
632 #[inline(always)]
633 pub fn wait(
634 &mut self,
635 buffer: &mut Vec<Event>,
636 max_events: usize,
637 timeout: i32,
638 ) -> Result<usize, CoreError> {
639 buffer.clear();
640
641 if max_events == 0 {
642 return Ok(0);
643 }
644
645 // Ensure buffer has enough capacity
646 if buffer.capacity() < max_events {
647 buffer.reserve(max_events.saturating_sub(buffer.len()));
648 }
649
650 if self.events_buf.capacity() < max_events {
651 self.events_buf
652 .reserve(max_events.saturating_sub(self.events_buf.len()));
653 }
654
655 let n = unsafe {
656 libc::epoll_wait(
657 self.epfd,
658 self.events_buf.as_mut_ptr(),
659 max_events as i32,
660 timeout,
661 )
662 };
663
664 if n > 0 {
665 unsafe {
666 self.events_buf.set_len(n as usize);
667 }
668 for i in 0..n as usize {
669 let ev = self.events_buf[i];
670 let is_read = (ev.events & libc::EPOLLIN as u32) != 0;
671 let is_priority = (ev.events & libc::EPOLLPRI as u32) != 0;
672 let is_write = (ev.events & libc::EPOLLOUT as u32) != 0;
673 let is_err = (ev.events & libc::EPOLLERR as u32) != 0;
674 let is_hup = (ev.events & libc::EPOLLHUP as u32) != 0;
675
676 buffer.push(Event {
677 token: Token(ev.u64),
678 readable: is_read || is_err,
679 priority: is_priority || is_err,
680 writable: is_write || is_err,
681 error: is_err,
682 hangup: is_hup,
683 });
684 }
685 return Ok(n as usize);
686 }
687
688 if n < 0 {
689 let e = errno();
690 if e == libc::EINTR {
691 return Ok(0);
692 }
693 return Err(CoreError::sys(e, "epoll_wait"));
694 }
695 Ok(0)
696 }
697
698 /// Return the raw epoll file descriptor.
699 ///
700 /// NOTE: This is an escape hatch for low-level interactions.
701 #[allow(dead_code)]
702 pub(crate) fn fd(&self) -> RawFd {
703 self.epfd
704 }
705}
706
707impl Drop for Reactor {
708 fn drop(&mut self) {
709 if let Some(mask) = self.signalfd_previous_mask.take() {
710 let _ =
711 unsafe { libc::pthread_sigmask(libc::SIG_SETMASK, &mask, std::ptr::null_mut()) };
712 }
713 if self.epfd >= 0 {
714 unsafe {
715 libc::close(self.epfd);
716 }
717 }
718 }
719}