Skip to main content

kevy_sys/
lib.rs

1//! kevy-sys — kevy's network-boundary layer.
2//!
3//! One of kevy's three OS-boundary crates (alongside the publishable
4//! [`kevy-uring`](https://crates.io/crates/kevy-uring) and
5//! [`kevy-madvise`](https://crates.io/crates/kevy-madvise)). This is the
6//! server-internal piece — hand-curated to the exact subset of sockets and the
7//! readiness poller (kqueue on macOS, epoll on Linux) that kevy's server
8//! needs. Every binding is declared by hand with `unsafe extern "C"`
9//! (no `libc` crate, no third-party dep). On Linux these symbols resolve
10//! through glibc, on macOS through libSystem — both already linked by
11//! `std`, so we add zero dependencies.
12//!
13//! The poller here is *readiness*-based. The *completion*-based io_uring
14//! engine has moved to its own crate, [`kevy-uring`]; either can back
15//! the reactor on top ([kevy-net]), which exposes only a byte-level
16//! service contract.
17//!
18//! Part of the [kevy] key–value server; not a generic OS-binding library.
19//!
20//! [`kevy-uring`]: https://crates.io/crates/kevy-uring
21//!
22//! # Safety
23//!
24//! `unsafe` is confined to the private `ffi` module's `extern "C"` declarations
25//! and the thin wrappers that call them. The bindings match the platform libc
26//! ABI (socklen_t = `u32`; `struct sockaddr_in`, `kevent`, and `epoll_event`
27//! laid out per-OS/arch). All raw fds are owned by RAII types ([`Socket`],
28//! [`Poller`], [`Waker`]) that close on drop, and errors are read via
29//! `std::io::Error::last_os_error()`. The public API is safe.
30//!
31//! [kevy]: https://crates.io/crates/kevy
32//! [kevy-net]: https://crates.io/crates/kevy-net
33//!
34//! # Example
35//!
36//! ```no_run
37//! use kevy_sys::{Poller, tcp_listen};
38//!
39//! # fn main() -> std::io::Result<()> {
40//! let listener = tcp_listen([127, 0, 0, 1], 6379, 1024)?;
41//! listener.set_nonblocking()?;
42//!
43//! let poller = Poller::new()?;
44//! poller.add(listener.raw(), /* read */ true, /* write */ false)?;
45//!
46//! let mut events = Vec::new();
47//! poller.wait(&mut events, Some(1000))?; // block up to 1s
48//! for ev in &events {
49//!     if ev.fd == listener.raw() && ev.readable {
50//!         let conn = listener.accept()?;
51//!         conn.set_nodelay()?;
52//!         // ... read/write `conn` ...
53//!     }
54//! }
55//! # Ok(())
56//! # }
57//! ```
58
59use core::ffi::{c_int, c_void};
60use core::mem::size_of;
61use core::ptr;
62use std::io;
63
/// Hand-declared libc entry points and kernel structs. Everything in here is
/// `unsafe` to call; the wrappers in the parent module uphold the contracts.
mod ffi {
    use core::ffi::{c_int, c_void};

    /// C `socklen_t`, which is `u32` on both Linux and macOS.
    pub type SockLen = u32;

    // Plain descriptor calls, shared by sockets and the self-pipe waker.
    unsafe extern "C" {
        pub fn pipe(fds: *mut c_int) -> c_int;
        pub fn read(fd: c_int, buf: *mut c_void, count: usize) -> isize;
        pub fn write(fd: c_int, buf: *const c_void, count: usize) -> isize;
        pub fn close(fd: c_int) -> c_int;
        // Variadic in C; we only ever pass a single int arg (F_GETFL/F_SETFL).
        pub fn fcntl(fd: c_int, cmd: c_int, ...) -> c_int;
    }

    // Socket creation, configuration and accept.
    unsafe extern "C" {
        pub fn socket(domain: c_int, ty: c_int, protocol: c_int) -> c_int;
        pub fn setsockopt(
            fd: c_int,
            level: c_int,
            optname: c_int,
            optval: *const c_void,
            optlen: SockLen,
        ) -> c_int;
        pub fn bind(fd: c_int, addr: *const c_void, addrlen: SockLen) -> c_int;
        pub fn listen(fd: c_int, backlog: c_int) -> c_int;
        pub fn accept(fd: c_int, addr: *mut c_void, addrlen: *mut SockLen) -> c_int;
        pub fn getsockname(fd: c_int, addr: *mut c_void, addrlen: *mut SockLen) -> c_int;
    }

    /// `struct timespec` — used by kqueue's `kevent` timeout (macOS only).
    #[cfg(any(target_os = "macos", target_os = "ios"))]
    #[repr(C)]
    pub struct Timespec {
        pub tv_sec: isize,
        pub tv_nsec: isize,
    }

    /// `struct kevent` as passed to and filled in by `kevent`.
    #[cfg(any(target_os = "macos", target_os = "ios"))]
    #[repr(C)]
    pub struct Kevent {
        pub ident: usize,
        pub filter: i16,
        pub flags: u16,
        pub fflags: u32,
        pub data: isize,
        pub udata: usize, // really `void*`; we never use it, keep it integral (Send)
    }

    #[cfg(any(target_os = "macos", target_os = "ios"))]
    unsafe extern "C" {
        pub fn kqueue() -> c_int;
        pub fn kevent(
            kq: c_int,
            changelist: *const Kevent,
            nchanges: c_int,
            eventlist: *mut Kevent,
            nevents: c_int,
            timeout: *const Timespec,
        ) -> c_int;
    }

    // `struct epoll_event` is `__attribute__((packed))` only on x86_64; on every
    // other arch it is naturally aligned (8-byte `data` after 4-byte `events`,
    // with 4 bytes of padding). Match the kernel ABI exactly.
    #[cfg(target_os = "linux")]
    #[repr(C)]
    #[cfg_attr(target_arch = "x86_64", repr(packed))]
    pub struct EpollEvent {
        pub events: u32,
        pub data: u64,
    }

    #[cfg(target_os = "linux")]
    unsafe extern "C" {
        pub fn epoll_create1(flags: c_int) -> c_int;
        pub fn epoll_ctl(epfd: c_int, op: c_int, fd: c_int, event: *mut EpollEvent) -> c_int;
        pub fn epoll_wait(
            epfd: c_int,
            events: *mut EpollEvent,
            maxevents: c_int,
            timeout: c_int,
        ) -> c_int;
    }
}
145
146// ---- constants -------------------------------------------------------------
147
// Values that are the same on Linux and macOS/iOS.
const AF_INET: c_int = 2; // IPv4 address family
const SOCK_STREAM: c_int = 1; // stream (TCP) socket type
const IPPROTO_TCP: c_int = 6; // option level for TCP_NODELAY
const TCP_NODELAY: c_int = 1;
const F_GETFL: c_int = 3; // fcntl: read file status flags
const F_SETFL: c_int = 4; // fcntl: write file status flags

/// Socket-level option numbers and the `O_NONBLOCK` bit as defined on Linux.
#[cfg(target_os = "linux")]
mod os_consts {
    use super::c_int;

    pub const SOL_SOCKET: c_int = 1;
    pub const SO_REUSEADDR: c_int = 2;
    pub const SO_REUSEPORT: c_int = 15;
    pub const O_NONBLOCK: c_int = 0x800;
}

/// Socket-level option numbers and the `O_NONBLOCK` bit as defined on macOS/iOS.
#[cfg(any(target_os = "macos", target_os = "ios"))]
mod os_consts {
    use super::c_int;

    pub const SOL_SOCKET: c_int = 0xffff;
    pub const SO_REUSEADDR: c_int = 0x0004;
    pub const SO_REUSEPORT: c_int = 0x0200;
    pub const O_NONBLOCK: c_int = 0x0004;
}

use os_consts::{O_NONBLOCK, SOL_SOCKET, SO_REUSEADDR, SO_REUSEPORT};
172
173// ---- sockaddr_in -----------------------------------------------------------
174
175#[cfg(target_os = "linux")]
176#[repr(C)]
177struct SockaddrIn {
178    sin_family: u16,
179    sin_port: u16,
180    sin_addr: u32,
181    sin_zero: [u8; 8],
182}
183
184#[cfg(any(target_os = "macos", target_os = "ios"))]
185#[repr(C)]
186struct SockaddrIn {
187    sin_len: u8,
188    sin_family: u8,
189    sin_port: u16,
190    sin_addr: u32,
191    sin_zero: [u8; 8],
192}
193
194impl SockaddrIn {
195    fn new(ip: [u8; 4], port: u16) -> Self {
196        #[cfg(target_os = "linux")]
197        return SockaddrIn {
198            sin_family: AF_INET as u16,
199            sin_port: port.to_be(),
200            sin_addr: u32::from_ne_bytes(ip),
201            sin_zero: [0; 8],
202        };
203        #[cfg(any(target_os = "macos", target_os = "ios"))]
204        return SockaddrIn {
205            sin_len: size_of::<SockaddrIn>() as u8,
206            sin_family: AF_INET as u8,
207            sin_port: port.to_be(),
208            sin_addr: u32::from_ne_bytes(ip),
209            sin_zero: [0; 8],
210        };
211    }
212
213    fn zeroed() -> Self {
214        unsafe { core::mem::zeroed() }
215    }
216}
217
218// ---- Socket ----------------------------------------------------------------
219
220/// An owned socket file descriptor. Closes itself on drop via our own `close`.
221pub struct Socket {
222    fd: c_int,
223}
224
225impl Socket {
226    /// The raw file descriptor. Borrowed — the `Socket` retains ownership.
227    #[inline]
228    pub fn raw(&self) -> i32 {
229        self.fd
230    }
231
232    /// Wrap an already-open fd (e.g. one accepted by io_uring) into an owning
233    /// `Socket` that closes it on drop.
234    ///
235    /// # Safety
236    /// `fd` must be a valid open descriptor whose ownership is transferred here.
237    #[inline]
238    pub unsafe fn from_raw_fd(fd: i32) -> Socket {
239        Socket { fd }
240    }
241
242    /// Accept one inbound connection. On a non-blocking listener with no pending
243    /// connection this returns `Err` with kind `WouldBlock`.
244    pub fn accept(&self) -> io::Result<Socket> {
245        let fd = unsafe { ffi::accept(self.fd, ptr::null_mut(), ptr::null_mut()) };
246        if fd < 0 {
247            return Err(io::Error::last_os_error());
248        }
249        Ok(Socket { fd })
250    }
251
252    /// Read into `buf`, returning the byte count (0 == EOF). Retries on EINTR.
253    /// On a non-blocking socket with no data, returns `WouldBlock`.
254    pub fn read(&self, buf: &mut [u8]) -> io::Result<usize> {
255        loop {
256            let n = unsafe { ffi::read(self.fd, buf.as_mut_ptr() as *mut c_void, buf.len()) };
257            if n < 0 {
258                let e = io::Error::last_os_error();
259                if e.kind() == io::ErrorKind::Interrupted {
260                    continue;
261                }
262                return Err(e);
263            }
264            return Ok(n as usize);
265        }
266    }
267
268    /// A single `write` syscall; may write fewer bytes than requested, or return
269    /// `WouldBlock` on a full non-blocking socket. Retries on EINTR.
270    pub fn write(&self, buf: &[u8]) -> io::Result<usize> {
271        loop {
272            let n = unsafe { ffi::write(self.fd, buf.as_ptr() as *const c_void, buf.len()) };
273            if n < 0 {
274                let e = io::Error::last_os_error();
275                if e.kind() == io::ErrorKind::Interrupted {
276                    continue;
277                }
278                return Err(e);
279            }
280            return Ok(n as usize);
281        }
282    }
283
284    /// Write the whole buffer (blocking-socket convenience).
285    pub fn write_all(&self, mut buf: &[u8]) -> io::Result<()> {
286        while !buf.is_empty() {
287            let n = self.write(buf)?;
288            if n == 0 {
289                return Err(io::Error::new(io::ErrorKind::WriteZero, "write returned 0"));
290            }
291            buf = &buf[n..];
292        }
293        Ok(())
294    }
295
296    /// Put the socket into non-blocking mode (`O_NONBLOCK`).
297    pub fn set_nonblocking(&self) -> io::Result<()> {
298        set_fd_nonblocking(self.fd)
299    }
300
301    /// Disable Nagle's algorithm (`TCP_NODELAY`) for low-latency replies.
302    pub fn set_nodelay(&self) -> io::Result<()> {
303        let one: c_int = 1;
304        let r = unsafe {
305            ffi::setsockopt(
306                self.fd,
307                IPPROTO_TCP,
308                TCP_NODELAY,
309                &one as *const c_int as *const c_void,
310                size_of::<c_int>() as u32,
311            )
312        };
313        if r < 0 {
314            return Err(io::Error::last_os_error());
315        }
316        Ok(())
317    }
318
319    /// The local port this socket is bound to (host byte order).
320    pub fn local_port(&self) -> io::Result<u16> {
321        let mut addr = SockaddrIn::zeroed();
322        let mut len = size_of::<SockaddrIn>() as u32;
323        let r = unsafe {
324            ffi::getsockname(
325                self.fd,
326                &mut addr as *mut SockaddrIn as *mut c_void,
327                &mut len,
328            )
329        };
330        if r < 0 {
331            return Err(io::Error::last_os_error());
332        }
333        Ok(u16::from_be(addr.sin_port))
334    }
335}
336
337impl Drop for Socket {
338    fn drop(&mut self) {
339        unsafe {
340            ffi::close(self.fd);
341        }
342    }
343}
344
345/// Set `O_NONBLOCK` on a raw fd (sockets and pipe ends alike).
346fn set_fd_nonblocking(fd: c_int) -> io::Result<()> {
347    let flags = unsafe { ffi::fcntl(fd, F_GETFL, 0) };
348    if flags < 0 {
349        return Err(io::Error::last_os_error());
350    }
351    if unsafe { ffi::fcntl(fd, F_SETFL, flags | O_NONBLOCK) } < 0 {
352        return Err(io::Error::last_os_error());
353    }
354    Ok(())
355}
356
357fn setsockopt_int(fd: c_int, level: c_int, name: c_int, val: c_int) -> io::Result<()> {
358    let r = unsafe {
359        ffi::setsockopt(
360            fd,
361            level,
362            name,
363            &val as *const c_int as *const c_void,
364            size_of::<c_int>() as u32,
365        )
366    };
367    if r < 0 {
368        return Err(io::Error::last_os_error());
369    }
370    Ok(())
371}
372
373fn listen_inner(ip: [u8; 4], port: u16, backlog: i32, reuseport: bool) -> io::Result<Socket> {
374    let fd = unsafe { ffi::socket(AF_INET, SOCK_STREAM, 0) };
375    if fd < 0 {
376        return Err(io::Error::last_os_error());
377    }
378    let sock = Socket { fd }; // owns fd: any early return closes it
379
380    setsockopt_int(fd, SOL_SOCKET, SO_REUSEADDR, 1)?;
381    if reuseport {
382        // Each thread-per-core shard opens its own listener on the same port;
383        // the kernel load-balances accepted connections across them.
384        setsockopt_int(fd, SOL_SOCKET, SO_REUSEPORT, 1)?;
385    }
386
387    let addr = SockaddrIn::new(ip, port);
388    let r = unsafe {
389        ffi::bind(
390            fd,
391            &addr as *const SockaddrIn as *const c_void,
392            size_of::<SockaddrIn>() as u32,
393        )
394    };
395    if r < 0 {
396        return Err(io::Error::last_os_error());
397    }
398    if unsafe { ffi::listen(fd, backlog) } < 0 {
399        return Err(io::Error::last_os_error());
400    }
401    Ok(sock)
402}
403
/// Create a blocking IPv4 TCP listener bound to `ip:port` with `SO_REUSEADDR`.
/// Pass `port == 0` to let the OS assign an ephemeral port.
///
/// `backlog` is passed to `listen` unchanged. `SO_REUSEPORT` is left unset;
/// use [`tcp_listen_reuseport`] for the variant that sets it.
///
/// # Errors
/// Returns the OS error from whichever of `socket`, `setsockopt`, `bind` or
/// `listen` failed first.
pub fn tcp_listen(ip: [u8; 4], port: u16, backlog: i32) -> io::Result<Socket> {
    listen_inner(ip, port, backlog, false)
}
409
/// Like [`tcp_listen`] but also sets `SO_REUSEPORT`, so multiple listeners can
/// share one port (one per thread-per-core shard).
///
/// # Errors
/// Returns the OS error from whichever of `socket`, `setsockopt`, `bind` or
/// `listen` failed first.
pub fn tcp_listen_reuseport(ip: [u8; 4], port: u16, backlog: i32) -> io::Result<Socket> {
    listen_inner(ip, port, backlog, true)
}
415
/// A self-pipe used to wake a [`Poller`] blocked in `wait` from another thread.
/// Register `read_fd()` in the poller for read-readiness; call `wake()` from any
/// thread to make the poll return; call `drain()` when the read end fires.
///
/// Both pipe ends are closed when the `Waker` is dropped.
pub struct Waker {
    // Read end of the pipe: polled for readiness and emptied by `drain`.
    read_fd: c_int,
    // Write end of the pipe: `wake` writes a single byte here.
    write_fd: c_int,
}
423
424/// Create a non-blocking self-pipe waker.
425pub fn waker() -> io::Result<Waker> {
426    let mut fds = [0 as c_int; 2];
427    if unsafe { ffi::pipe(fds.as_mut_ptr()) } < 0 {
428        return Err(io::Error::last_os_error());
429    }
430    let w = Waker {
431        read_fd: fds[0],
432        write_fd: fds[1],
433    };
434    set_fd_nonblocking(w.read_fd)?;
435    set_fd_nonblocking(w.write_fd)?;
436    Ok(w)
437}
438
439impl Waker {
440    /// The read end — register this in a [`Poller`] for read-readiness.
441    #[inline]
442    pub fn read_fd(&self) -> i32 {
443        self.read_fd
444    }
445
446    /// Signal the waker. A full pipe already means "pending", so EAGAIN is fine.
447    pub fn wake(&self) -> io::Result<()> {
448        let byte = [1u8];
449        loop {
450            let n = unsafe { ffi::write(self.write_fd, byte.as_ptr() as *const c_void, 1) };
451            if n < 0 {
452                let e = io::Error::last_os_error();
453                match e.kind() {
454                    io::ErrorKind::Interrupted => continue,
455                    io::ErrorKind::WouldBlock => return Ok(()),
456                    _ => return Err(e),
457                }
458            }
459            return Ok(());
460        }
461    }
462
463    /// Consume all pending wake bytes after the read end fires.
464    pub fn drain(&self) {
465        let mut buf = [0u8; 64];
466        loop {
467            let n = unsafe { ffi::read(self.read_fd, buf.as_mut_ptr() as *mut c_void, buf.len()) };
468            if n <= 0 {
469                break; // EAGAIN / EOF / error — nothing more to drain
470            }
471        }
472    }
473}
474
475impl Drop for Waker {
476    fn drop(&mut self) {
477        unsafe {
478            ffi::close(self.read_fd);
479            ffi::close(self.write_fd);
480        }
481    }
482}
483
// The pipe ends are plain fds with no aliasing; safe to move across threads.
// `Waker` stores only the two integer descriptors, and its `&self` methods do
// nothing with them except `read`/`write` syscalls, so sharing a `&Waker`
// between threads (as `wake()` from another thread requires) is sound too.
unsafe impl Send for Waker {}
unsafe impl Sync for Waker {}
487
488// ---- Poller ----------------------------------------------------------------
489
/// A readiness notification for one file descriptor.
///
/// Produced by `Poller::wait`, one per raw kernel event.
#[derive(Debug, Clone, Copy)]
pub struct Event {
    /// The descriptor the event refers to.
    pub fd: i32,
    /// The descriptor was reported ready for reading. The epoll backend also
    /// sets this on hang-up/error.
    pub readable: bool,
    /// The descriptor was reported ready for writing.
    pub writable: bool,
    /// Peer hang-up / error — the connection should be closed.
    pub hup: bool,
}
499
/// How many raw events to pull from the kernel per `wait` call.
///
/// Also the capacity of the scratch buffer each `wait` allocates, and the
/// `nevents`/`maxevents` value handed to the kernel.
const WAIT_CAPACITY: usize = 1024;
502
// kqueue filter ids and `kevent.flags` bits used by the macOS/iOS `Poller`.
// NOTE(review): values presumably mirror `<sys/event.h>` — confirm against the SDK.
#[cfg(any(target_os = "macos", target_os = "ios"))]
mod kq {
    /// Filter id for read readiness.
    pub const EVFILT_READ: i16 = -1;
    /// Filter id for write readiness.
    pub const EVFILT_WRITE: i16 = -2;
    /// Register the filter for the fd (used by `Poller::add`).
    pub const EV_ADD: u16 = 0x0001;
    /// Remove the filter for the fd (used by `Poller::delete`).
    pub const EV_DELETE: u16 = 0x0002;
    /// Let the filter deliver events.
    pub const EV_ENABLE: u16 = 0x0004;
    /// Keep the filter registered but stop it delivering events.
    pub const EV_DISABLE: u16 = 0x0008;
    /// Set by the kernel on a returned event; surfaced as `Event::hup`.
    pub const EV_EOF: u16 = 0x8000;
}
513
/// Level-triggered readiness poller (`EV_CLEAR` is never set, so a ready fd
/// keeps being reported until it is serviced). macOS: kqueue. Linux: epoll.
/// Same API on both.
#[cfg(any(target_os = "macos", target_os = "ios"))]
pub struct Poller {
    kq: c_int,
}

#[cfg(any(target_os = "macos", target_os = "ios"))]
impl Poller {
    /// Create a new kqueue instance.
    ///
    /// # Errors
    /// Returns the OS error if `kqueue` fails.
    pub fn new() -> io::Result<Self> {
        // SAFETY: `kqueue` takes no arguments.
        let kq = unsafe { ffi::kqueue() };
        if kq < 0 {
            return Err(io::Error::last_os_error());
        }
        Ok(Poller { kq })
    }

    /// Submit a single change (`filter` + `flags`) for `fd` without pulling
    /// any events.
    fn change(&self, fd: i32, filter: i16, flags: u16) -> io::Result<()> {
        let kev = ffi::Kevent {
            ident: fd as usize,
            filter,
            flags,
            fflags: 0,
            data: 0,
            udata: 0,
        };
        // SAFETY: the changelist is the single `kev` above; the event list is
        // null with length 0, so nothing is written back.
        let r = unsafe { ffi::kevent(self.kq, &kev, 1, ptr::null_mut(), 0, ptr::null()) };
        if r < 0 {
            return Err(io::Error::last_os_error());
        }
        Ok(())
    }

    /// Register `fd`, enabling the read/write filters per the interest flags.
    ///
    /// Both filters are always added; the one that is not wanted is added
    /// disabled so that [`Poller::modify`] can later enable it.
    pub fn add(&self, fd: i32, read: bool, write: bool) -> io::Result<()> {
        let r = if read { kq::EV_ENABLE } else { kq::EV_DISABLE };
        let w = if write { kq::EV_ENABLE } else { kq::EV_DISABLE };
        self.change(fd, kq::EVFILT_READ, kq::EV_ADD | r)?;
        self.change(fd, kq::EVFILT_WRITE, kq::EV_ADD | w)?;
        Ok(())
    }

    /// Change the read/write interest of an already-registered `fd`.
    pub fn modify(&self, fd: i32, read: bool, write: bool) -> io::Result<()> {
        self.change(
            fd,
            kq::EVFILT_READ,
            if read { kq::EV_ENABLE } else { kq::EV_DISABLE },
        )?;
        self.change(
            fd,
            kq::EVFILT_WRITE,
            if write { kq::EV_ENABLE } else { kq::EV_DISABLE },
        )?;
        Ok(())
    }

    /// Best-effort deregistration of both filters. Errors from the kernel are
    /// deliberately ignored, so this always returns `Ok`.
    pub fn delete(&self, fd: i32) -> io::Result<()> {
        let _ = self.change(fd, kq::EVFILT_READ, kq::EV_DELETE);
        let _ = self.change(fd, kq::EVFILT_WRITE, kq::EV_DELETE);
        Ok(())
    }

    /// Wait for readiness, filling `out`. `timeout_ms == None` blocks forever.
    ///
    /// A negative `timeout_ms` also blocks forever. That is what the epoll
    /// backend does with a negative timeout, whereas a negative `timespec`
    /// handed to `kevent` is expected to be rejected as invalid.
    ///
    /// Returns the number of events pushed into `out` (which is cleared
    /// first). An interrupted wait (EINTR) returns `Ok(0)`.
    pub fn wait(&self, out: &mut Vec<Event>, timeout_ms: Option<i32>) -> io::Result<usize> {
        out.clear();
        let mut raw: Vec<ffi::Kevent> = Vec::with_capacity(WAIT_CAPACITY);

        // `ts` must stay alive until `kevent` returns because `ts_ptr` points
        // into it.
        let ts = timeout_ms.filter(|ms| *ms >= 0).map(|ms| ffi::Timespec {
            tv_sec: (ms / 1000) as isize,
            tv_nsec: ((ms % 1000) * 1_000_000) as isize,
        });
        let ts_ptr = match ts.as_ref() {
            Some(t) => t as *const ffi::Timespec,
            None => ptr::null(),
        };

        // SAFETY: `raw` has room for `WAIT_CAPACITY` entries, which is the
        // limit passed as `nevents`; `ts_ptr` is null or points at `ts`.
        let n = unsafe {
            ffi::kevent(
                self.kq,
                ptr::null(),
                0,
                raw.as_mut_ptr(),
                WAIT_CAPACITY as c_int,
                ts_ptr,
            )
        };
        if n < 0 {
            let e = io::Error::last_os_error();
            if e.kind() == io::ErrorKind::Interrupted {
                return Ok(0);
            }
            return Err(e);
        }
        // SAFETY: the kernel initialised the first `n` entries and
        // `n <= WAIT_CAPACITY`, the buffer's capacity.
        unsafe { raw.set_len(n as usize) };

        // Each kevent carries exactly one filter, so an fd that is both
        // readable and writable shows up as two `Event`s.
        out.extend(raw.iter().map(|kev| Event {
            fd: kev.ident as i32,
            readable: kev.filter == kq::EVFILT_READ,
            writable: kev.filter == kq::EVFILT_WRITE,
            hup: kev.flags & kq::EV_EOF != 0,
        }));
        Ok(out.len())
    }
}
621
// epoll flags, control ops and event bits used by the Linux `Poller`.
// NOTE(review): values presumably mirror `<sys/epoll.h>` — confirm against the headers.
#[cfg(target_os = "linux")]
mod ep {
    /// Flag for `epoll_create1`: close the epoll fd on exec.
    pub const EPOLL_CLOEXEC: super::c_int = 0x80000;
    /// `epoll_ctl` op: register a new fd.
    pub const EPOLL_CTL_ADD: super::c_int = 1;
    /// `epoll_ctl` op: deregister an fd.
    pub const EPOLL_CTL_DEL: super::c_int = 2;
    /// `epoll_ctl` op: change the event mask of a registered fd.
    pub const EPOLL_CTL_MOD: super::c_int = 3;
    /// Read readiness.
    pub const EPOLLIN: u32 = 0x001;
    /// Write readiness.
    pub const EPOLLOUT: u32 = 0x004;
    /// Error condition; reported as both `readable` and `hup`.
    pub const EPOLLERR: u32 = 0x008;
    /// Hang-up; reported as both `readable` and `hup`.
    pub const EPOLLHUP: u32 = 0x010;
    /// Peer closed its write half; always requested, reported as `hup`.
    pub const EPOLLRDHUP: u32 = 0x2000;
}
634
635#[cfg(target_os = "linux")]
636pub struct Poller {
637    epfd: c_int,
638}
639
640#[cfg(target_os = "linux")]
641impl Poller {
642    pub fn new() -> io::Result<Self> {
643        let epfd = unsafe { ffi::epoll_create1(ep::EPOLL_CLOEXEC) };
644        if epfd < 0 {
645            return Err(io::Error::last_os_error());
646        }
647        Ok(Poller { epfd })
648    }
649
650    fn mask(read: bool, write: bool) -> u32 {
651        let mut m = ep::EPOLLRDHUP;
652        if read {
653            m |= ep::EPOLLIN;
654        }
655        if write {
656            m |= ep::EPOLLOUT;
657        }
658        m
659    }
660
661    fn ctl(&self, op: c_int, fd: i32, read: bool, write: bool) -> io::Result<()> {
662        let mut ev = ffi::EpollEvent {
663            events: Self::mask(read, write),
664            data: fd as u64,
665        };
666        let r = unsafe { ffi::epoll_ctl(self.epfd, op, fd, &mut ev) };
667        if r < 0 {
668            return Err(io::Error::last_os_error());
669        }
670        Ok(())
671    }
672
673    pub fn add(&self, fd: i32, read: bool, write: bool) -> io::Result<()> {
674        self.ctl(ep::EPOLL_CTL_ADD, fd, read, write)
675    }
676
677    pub fn modify(&self, fd: i32, read: bool, write: bool) -> io::Result<()> {
678        self.ctl(ep::EPOLL_CTL_MOD, fd, read, write)
679    }
680
681    pub fn delete(&self, fd: i32) -> io::Result<()> {
682        let r = unsafe { ffi::epoll_ctl(self.epfd, ep::EPOLL_CTL_DEL, fd, ptr::null_mut()) };
683        if r < 0 {
684            return Err(io::Error::last_os_error());
685        }
686        Ok(())
687    }
688
689    pub fn wait(&self, out: &mut Vec<Event>, timeout_ms: Option<i32>) -> io::Result<usize> {
690        out.clear();
691        let mut raw: Vec<ffi::EpollEvent> = Vec::with_capacity(WAIT_CAPACITY);
692        let n = unsafe {
693            ffi::epoll_wait(
694                self.epfd,
695                raw.as_mut_ptr(),
696                WAIT_CAPACITY as c_int,
697                timeout_ms.unwrap_or(-1),
698            )
699        };
700        if n < 0 {
701            let e = io::Error::last_os_error();
702            if e.kind() == io::ErrorKind::Interrupted {
703                return Ok(0);
704            }
705            return Err(e);
706        }
707        unsafe { raw.set_len(n as usize) };
708        for ev in &raw {
709            let flags = ev.events; // copy out (struct may be packed on x86_64)
710            let fd = ev.data as i32;
711            let hup = flags & (ep::EPOLLHUP | ep::EPOLLERR | ep::EPOLLRDHUP) != 0;
712            out.push(Event {
713                fd,
714                readable: flags & (ep::EPOLLIN | ep::EPOLLHUP | ep::EPOLLERR) != 0,
715                writable: flags & ep::EPOLLOUT != 0,
716                hup,
717            });
718        }
719        Ok(out.len())
720    }
721}
722
723#[cfg(any(target_os = "linux", target_os = "macos", target_os = "ios"))]
724impl Drop for Poller {
725    fn drop(&mut self) {
726        #[cfg(target_os = "linux")]
727        let fd = self.epfd;
728        #[cfg(any(target_os = "macos", target_os = "ios"))]
729        let fd = self.kq;
730        unsafe {
731            ffi::close(fd);
732        }
733    }
734}
735
// Loopback smoke tests for the listener, poller and waker. Every listener
// binds port 0 so the OS picks a free port.
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::{Read, Write};

    /// A blocking listener accepts one connection and echoes a single byte
    /// back to a `std::net::TcpStream` client.
    #[test]
    fn listen_accept_roundtrip() {
        let listener = tcp_listen([127, 0, 0, 1], 0, 16).unwrap();
        let port = listener.local_port().unwrap();
        assert_ne!(port, 0);

        // Server side runs on its own thread because `accept` blocks.
        let server = std::thread::spawn(move || {
            let conn = listener.accept().unwrap();
            let mut b = [0u8; 1];
            assert_eq!(conn.read(&mut b).unwrap(), 1);
            conn.write_all(&b).unwrap();
        });

        let mut client = std::net::TcpStream::connect(("127.0.0.1", port)).unwrap();
        client.write_all(b"Z").unwrap();
        let mut got = [0u8; 1];
        assert_eq!(client.read(&mut got).unwrap(), 1);
        assert_eq!(&got, b"Z");

        server.join().unwrap();
    }

    /// A pending connection makes a registered non-blocking listener show up
    /// as readable in `Poller::wait`.
    #[test]
    fn poller_signals_listener_readable() {
        let listener = tcp_listen([127, 0, 0, 1], 0, 16).unwrap();
        listener.set_nonblocking().unwrap();
        let port = listener.local_port().unwrap();

        let poller = Poller::new().unwrap();
        poller.add(listener.raw(), true, false).unwrap();

        // Kept alive (not dropped) so the connection stays pending.
        let _client = std::net::TcpStream::connect(("127.0.0.1", port)).unwrap();

        let mut events = Vec::new();
        let n = poller.wait(&mut events, Some(2000)).unwrap();
        assert!(n >= 1, "expected a readiness event");
        assert!(events.iter().any(|e| e.fd == listener.raw() && e.readable));

        // Non-blocking accept should now succeed.
        listener.accept().unwrap();
    }

    /// `wake()` from another thread makes the waker's read end readable.
    #[test]
    fn waker_wakes_poller() {
        let w = std::sync::Arc::new(waker().unwrap());
        let poller = Poller::new().unwrap();
        poller.add(w.read_fd(), true, false).unwrap();

        // The spawned thread is not joined; a failed `wake` would surface as
        // the wait below timing out with no events.
        let w2 = w.clone();
        std::thread::spawn(move || w2.wake().unwrap());

        let mut events = Vec::new();
        let n = poller.wait(&mut events, Some(2000)).unwrap();
        assert!(n >= 1, "waker should have woken the poller");
        assert!(events.iter().any(|e| e.fd == w.read_fd() && e.readable));
        w.drain();
    }

    /// Two `SO_REUSEPORT` listeners can bind the same port.
    #[test]
    fn reuseport_allows_shared_port() {
        let l1 = tcp_listen_reuseport([127, 0, 0, 1], 0, 16).unwrap();
        let port = l1.local_port().unwrap();
        // A second listener on the SAME port succeeds only because of SO_REUSEPORT.
        let l2 = tcp_listen_reuseport([127, 0, 0, 1], port, 16).unwrap();
        assert_eq!(l2.local_port().unwrap(), port);
    }
}
803        // A second listener on the SAME port succeeds only because of SO_REUSEPORT.
804        let l2 = tcp_listen_reuseport([127, 0, 0, 1], port, 16).unwrap();
805        assert_eq!(l2.local_port().unwrap(), port);
806    }
807}