1use crate::CoreError;
13use crate::error::syscall_ret;
14use std::io::Error as IoError;
15use std::time::Duration;
16
17#[inline(always)]
18fn errno() -> i32 {
19 IoError::last_os_error().raw_os_error().unwrap_or(0)
20}
21
22pub struct Fd(RawFd);
27
28use std::os::unix::io::{AsRawFd, RawFd};
29
30impl AsRawFd for Fd {
31 fn as_raw_fd(&self) -> RawFd {
32 self.0
33 }
34}
35
36impl Fd {
37 #[inline(always)]
42 pub(crate) fn new(fd: RawFd, op: &'static str) -> Result<Self, CoreError> {
43 if fd < 0 {
44 Err(CoreError::sys(errno(), op))
45 } else {
46 Ok(Self(fd))
47 }
48 }
49
50 #[inline(always)]
57 pub unsafe fn from_owned_raw_fd(fd: RawFd, op: &'static str) -> Result<Self, CoreError> {
58 Self::new(fd, op)
59 }
60
61 pub fn eventfd(init: u32) -> Result<Self, CoreError> {
63 let fd = unsafe { libc::eventfd(init, libc::EFD_CLOEXEC | libc::EFD_NONBLOCK) };
64 syscall_ret(fd, "eventfd")?;
65 Self::new(fd, "eventfd")
66 }
67
68 pub fn timerfd() -> Result<Self, CoreError> {
70 let fd = unsafe {
71 libc::timerfd_create(
72 libc::CLOCK_MONOTONIC,
73 libc::TFD_CLOEXEC | libc::TFD_NONBLOCK,
74 )
75 };
76 syscall_ret(fd, "timerfd_create")?;
77 Self::new(fd, "timerfd_create")
78 }
79
80 #[inline(always)]
85 pub(crate) fn raw(&self) -> RawFd {
86 self.0
87 }
88
89 pub fn dup2(&self, target: RawFd) -> Result<(), CoreError> {
91 loop {
92 let r = unsafe { libc::dup2(self.0, target) };
93 if r < 0 {
94 let e = errno();
95 if e == libc::EINTR {
96 continue;
97 }
98 return syscall_ret(r, "dup2");
99 }
100 return Ok(());
101 }
102 }
103
104 pub fn set_nonblock(&self) -> Result<(), CoreError> {
106 let flags = unsafe { libc::fcntl(self.0, libc::F_GETFL) };
107 syscall_ret(flags, "fcntl(F_GETFL)")?;
108 let r = unsafe { libc::fcntl(self.0, libc::F_SETFL, flags | libc::O_NONBLOCK) };
109 syscall_ret(r, "fcntl(F_SETFL)")
110 }
111
112 pub fn set_cloexec(&self) -> Result<(), CoreError> {
114 let flags = unsafe { libc::fcntl(self.0, libc::F_GETFD) };
115 syscall_ret(flags, "fcntl(F_GETFD)")?;
116 let r = unsafe { libc::fcntl(self.0, libc::F_SETFD, flags | libc::FD_CLOEXEC) };
117 syscall_ret(r, "fcntl(F_SETFD)")
118 }
119
120 pub fn read_slice(&self, buf: &mut [u8]) -> Result<Option<usize>, CoreError> {
124 self.read_raw(buf.as_mut_ptr(), buf.len())
125 }
126
127 pub fn seek_set(&self, offset: i64) -> Result<u64, CoreError> {
129 loop {
130 let pos = unsafe { libc::lseek(self.0, offset as libc::off_t, libc::SEEK_SET) };
131 if pos < 0 {
132 let e = errno();
133 if e == libc::EINTR {
134 continue;
135 }
136 return Err(CoreError::sys(e, "lseek"));
137 }
138 return Ok(pos as u64);
139 }
140 }
141
142 pub fn write_slice(&self, buf: &[u8]) -> Result<Option<usize>, CoreError> {
146 self.write_raw(buf.as_ptr(), buf.len())
147 }
148
149 pub fn read_u64(&self) -> Result<Option<u64>, CoreError> {
153 let mut bytes = [0u8; std::mem::size_of::<u64>()];
154 match self.read_slice(&mut bytes)? {
155 Some(n) if n == bytes.len() => Ok(Some(u64::from_ne_bytes(bytes))),
156 Some(_) => Err(CoreError::sys(libc::EIO, "read_u64")),
157 None => Ok(None),
158 }
159 }
160
161 pub fn write_u64(&self, value: u64) -> Result<Option<usize>, CoreError> {
165 self.write_slice(&value.to_ne_bytes())
166 }
167
168 pub fn set_timer_oneshot(&self, delay: Option<Duration>) -> Result<(), CoreError> {
173 let mut spec: libc::itimerspec = unsafe { std::mem::zeroed() };
174 if let Some(delay) = delay {
175 let delay = delay.max(Duration::from_nanos(1));
176 spec.it_value.tv_sec = delay.as_secs() as libc::time_t;
177 spec.it_value.tv_nsec = delay.subsec_nanos() as libc::c_long;
178 }
179
180 let ret = unsafe { libc::timerfd_settime(self.raw(), 0, &spec, std::ptr::null_mut()) };
181 syscall_ret(ret, "timerfd_settime")
182 }
183
184 pub(crate) fn read_raw(&self, buf: *mut u8, count: usize) -> Result<Option<usize>, CoreError> {
191 loop {
192 let n = unsafe { libc::read(self.0, buf as *mut libc::c_void, count) };
193 if n < 0 {
194 let e = errno();
195 if e == libc::EINTR {
196 continue;
197 }
198 if e == libc::EAGAIN || e == libc::EWOULDBLOCK {
199 return Ok(None);
200 }
201 return Err(CoreError::sys(e, "read"));
202 }
203 return Ok(Some(n as usize));
204 }
205 }
206
207 pub(crate) fn write_raw(
214 &self,
215 buf: *const u8,
216 count: usize,
217 ) -> Result<Option<usize>, CoreError> {
218 loop {
219 let n = unsafe { libc::write(self.0, buf as *const libc::c_void, count) };
220 if n < 0 {
221 let e = errno();
222 if e == libc::EINTR {
223 continue;
224 }
225 if e == libc::EAGAIN || e == libc::EWOULDBLOCK {
226 return Ok(None);
227 }
228 return Err(CoreError::sys(e, "write"));
229 }
230 return Ok(Some(n as usize));
231 }
232 }
233}
234
235impl Drop for Fd {
236 fn drop(&mut self) {
237 if self.0 >= 0 {
238 unsafe {
239 libc::close(self.0);
240 }
241 }
242 }
243}
244
245#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
247pub struct Token(u64);
248
249#[allow(dead_code)]
250impl Token {
251 #[inline(always)]
252 pub(crate) fn new(val: u64) -> Self {
253 Self(val)
254 }
255
256 #[inline(always)]
257 pub(crate) fn val(&self) -> u64 {
258 self.0
259 }
260}
261
262#[derive(Clone, Copy, Debug)]
264pub struct Event {
265 pub token: Token,
267 pub readable: bool,
269 pub priority: bool,
271 pub writable: bool,
273 pub error: bool,
278}
279
280pub struct Reactor {
310 epfd: RawFd,
311 next_token: u64,
312 events_buf: Vec<libc::epoll_event>,
313 signalfd: Option<Fd>,
314 signalfd_previous_mask: Option<libc::sigset_t>,
315 sigchld_token: Option<Token>,
317 inotify_token: Option<Token>,
319}
320
321impl Reactor {
322 pub fn new() -> Result<Self, CoreError> {
327 let epfd = unsafe { libc::epoll_create1(libc::EPOLL_CLOEXEC) };
328 syscall_ret(epfd, "epoll_create1")?;
329 Ok(Self {
330 epfd,
331 next_token: 1,
332 events_buf: Vec::with_capacity(64),
333 signalfd: None,
334 signalfd_previous_mask: None,
335 sigchld_token: None,
336 inotify_token: None,
337 })
338 }
339
340 pub fn setup_inotify(&mut self) -> Result<(Fd, Token), CoreError> {
345 let fd = unsafe { libc::inotify_init1(libc::IN_CLOEXEC | libc::IN_NONBLOCK) };
346 syscall_ret(fd, "inotify_init1")?;
347
348 let fd_obj = Fd::new(fd, "inotify")?;
349 let token = self.add(&fd_obj, true, false)?;
350 self.inotify_token = Some(token);
351
352 Ok((fd_obj, token))
353 }
354
355 pub fn setup_signalfd(&mut self) -> Result<Token, CoreError> {
363 if self.signalfd.is_some() {
364 return Err(CoreError::sys(
365 libc::EINVAL,
366 "setup_signalfd already initialized",
367 ));
368 }
369
370 let mut mask: libc::sigset_t = unsafe { std::mem::zeroed() };
371 unsafe { libc::sigemptyset(&mut mask) };
372 unsafe { libc::sigaddset(&mut mask, libc::SIGCHLD) };
373
374 let mut previous_mask: libc::sigset_t = unsafe { std::mem::zeroed() };
375 let r = unsafe { libc::pthread_sigmask(libc::SIG_BLOCK, &mask, &mut previous_mask) };
376 if r != 0 {
377 return Err(CoreError::sys(r, "pthread_sigmask(SIG_BLOCK)"));
378 }
379
380 let sfd = unsafe { libc::signalfd(-1, &mask, libc::SFD_NONBLOCK | libc::SFD_CLOEXEC) };
381 if let Err(err) = syscall_ret(sfd, "signalfd") {
382 let _ = unsafe {
383 libc::pthread_sigmask(libc::SIG_SETMASK, &previous_mask, std::ptr::null_mut())
384 };
385 return Err(err);
386 }
387
388 let fd = Fd::new(sfd, "signalfd")?;
389 let token = match self.add(&fd, true, false) {
390 Ok(token) => token,
391 Err(err) => {
392 let _ = unsafe {
393 libc::pthread_sigmask(libc::SIG_SETMASK, &previous_mask, std::ptr::null_mut())
394 };
395 return Err(err);
396 }
397 };
398
399 self.signalfd = Some(fd);
400 self.signalfd_previous_mask = Some(previous_mask);
401 self.sigchld_token = Some(token);
402
403 Ok(token)
404 }
405
406 pub fn drain_signalfd(&self) -> Result<(), CoreError> {
408 if let Some(fd) = &self.signalfd {
409 let mut buf = [0u8; std::mem::size_of::<libc::signalfd_siginfo>()];
410 loop {
411 match fd.read_slice(&mut buf) {
412 Ok(Some(n)) if n < buf.len() => break,
413 Ok(Some(_)) => continue,
414 Ok(None) => break,
415 Err(e) => return Err(e),
416 }
417 }
418 }
419 Ok(())
420 }
421
422 #[inline(always)]
427 pub fn add(&mut self, fd: &Fd, readable: bool, writable: bool) -> Result<Token, CoreError> {
428 let token = Token(self.next_token);
429 self.next_token += 1;
430 self.add_with_token(fd.raw(), token, readable, writable, false)?;
431 Ok(token)
432 }
433
434 #[inline(always)]
436 pub fn add_priority(&mut self, fd: &Fd) -> Result<Token, CoreError> {
437 let token = Token(self.next_token);
438 self.next_token += 1;
439 self.add_with_token(fd.raw(), token, false, false, true)?;
440 Ok(token)
441 }
442
443 #[inline(always)]
444 pub(crate) fn add_with_token(
445 &mut self,
446 raw_fd: RawFd,
447 token: Token,
448 readable: bool,
449 writable: bool,
450 priority: bool,
451 ) -> Result<(), CoreError> {
452 let mut events = libc::EPOLLET as u32;
453 if readable {
454 events |= libc::EPOLLIN as u32;
455 }
456 if writable {
457 events |= libc::EPOLLOUT as u32;
458 }
459 if priority {
460 events |= libc::EPOLLPRI as u32;
461 }
462 let mut ev = libc::epoll_event {
463 events,
464 u64: token.0,
465 };
466 let r = unsafe { libc::epoll_ctl(self.epfd, libc::EPOLL_CTL_ADD, raw_fd, &mut ev) };
467 syscall_ret(r, "epoll_ctl_add")?;
468 Ok(())
469 }
470
471 #[inline(always)]
473 pub fn del(&self, fd: &Fd) -> Result<(), CoreError> {
474 self.del_raw(fd.raw())
475 }
476
477 #[inline(always)]
482 pub(crate) fn del_raw(&self, raw: RawFd) -> Result<(), CoreError> {
483 loop {
484 let ret = unsafe {
485 libc::epoll_ctl(self.epfd, libc::EPOLL_CTL_DEL, raw, std::ptr::null_mut())
486 };
487 if ret == -1 {
488 let e = errno();
489 if e == libc::EINTR {
490 continue;
491 }
492 return Err(CoreError::sys(e, "epoll_ctl_del"));
493 }
494 return Ok(());
495 }
496 }
497
498 #[inline(always)]
505 pub fn wait(
506 &mut self,
507 buffer: &mut Vec<Event>,
508 max_events: usize,
509 timeout: i32,
510 ) -> Result<usize, CoreError> {
511 buffer.clear();
512
513 if max_events == 0 {
514 return Ok(0);
515 }
516
517 if buffer.capacity() < max_events {
519 buffer.reserve(max_events.saturating_sub(buffer.len()));
520 }
521
522 if self.events_buf.capacity() < max_events {
523 self.events_buf
524 .reserve(max_events.saturating_sub(self.events_buf.len()));
525 }
526
527 let n = unsafe {
528 libc::epoll_wait(
529 self.epfd,
530 self.events_buf.as_mut_ptr(),
531 max_events as i32,
532 timeout,
533 )
534 };
535
536 if n > 0 {
537 unsafe {
538 self.events_buf.set_len(n as usize);
539 }
540 for i in 0..n as usize {
541 let ev = self.events_buf[i];
542 let is_read = (ev.events & libc::EPOLLIN as u32) != 0;
543 let is_priority = (ev.events & libc::EPOLLPRI as u32) != 0;
544 let is_write = (ev.events & libc::EPOLLOUT as u32) != 0;
545 let is_err = (ev.events & (libc::EPOLLERR | libc::EPOLLHUP) as u32) != 0;
546
547 buffer.push(Event {
548 token: Token(ev.u64),
549 readable: is_read || is_err,
550 priority: is_priority || is_err,
551 writable: is_write || is_err,
552 error: is_err,
553 });
554 }
555 return Ok(n as usize);
556 }
557
558 if n < 0 {
559 let e = errno();
560 if e == libc::EINTR {
561 return Ok(0);
562 }
563 return Err(CoreError::sys(e, "epoll_wait"));
564 }
565 Ok(0)
566 }
567
568 #[allow(dead_code)]
572 pub(crate) fn fd(&self) -> RawFd {
573 self.epfd
574 }
575}
576
577impl Drop for Reactor {
578 fn drop(&mut self) {
579 if let Some(mask) = self.signalfd_previous_mask.take() {
580 let _ =
581 unsafe { libc::pthread_sigmask(libc::SIG_SETMASK, &mask, std::ptr::null_mut()) };
582 }
583 if self.epfd >= 0 {
584 unsafe {
585 libc::close(self.epfd);
586 }
587 }
588 }
589}