Skip to main content

kevy_sys/
lib.rs

1//! kevy-sys — kevy's network-boundary layer.
2//!
3//! One of kevy's three OS-boundary crates (alongside the publishable
4//! [`kevy-uring`](https://crates.io/crates/kevy-uring) and
5//! [`kevy-madvise`](https://crates.io/crates/kevy-madvise)). This is the
6//! server-internal piece — hand-curated to the exact subset of sockets and the
7//! readiness poller (kqueue on macOS, epoll on Linux) that kevy's server
8//! needs. Every binding is declared by hand with `unsafe extern "C"`
9//! (no `libc` crate, no third-party dep). On Linux these symbols resolve
10//! through glibc, on macOS through libSystem — both already linked by
11//! `std`, so we add zero dependencies.
12//!
13//! The poller here is *readiness*-based. The *completion*-based io_uring
14//! engine has moved to its own crate, [`kevy-uring`]; either can back
15//! the reactor on top ([kevy-net]), which exposes only a byte-level
16//! service contract.
17//!
18//! Part of the [kevy] key–value server; not a generic OS-binding library.
19//!
20//! [`kevy-uring`]: https://crates.io/crates/kevy-uring
21//!
22//! # Safety
23//!
24//! `unsafe` is confined to the private `ffi` module's `extern "C"` declarations
25//! and the thin wrappers that call them. The bindings match the platform libc
26//! ABI (socklen_t = `u32`; `struct sockaddr_in`, `kevent`, and `epoll_event`
27//! laid out per-OS/arch). All raw fds are owned by RAII types ([`Socket`],
28//! [`Poller`], [`Waker`]) that close on drop, and errors are read via
29//! `std::io::Error::last_os_error()`. The public API is safe.
30//!
31//! [kevy]: https://crates.io/crates/kevy
32//! [kevy-net]: https://crates.io/crates/kevy-net
33//!
34//! # Example
35//!
36//! ```no_run
37//! use kevy_sys::{Poller, tcp_listen};
38//!
39//! # fn main() -> std::io::Result<()> {
40//! let listener = tcp_listen([127, 0, 0, 1], 6379, 1024)?;
41//! listener.set_nonblocking()?;
42//!
43//! let poller = Poller::new()?;
44//! poller.add(listener.raw(), /* read */ true, /* write */ false)?;
45//!
46//! let mut events = Vec::new();
47//! poller.wait(&mut events, Some(1000))?; // block up to 1s
48//! for ev in &events {
49//!     if ev.fd == listener.raw() && ev.readable {
50//!         let conn = listener.accept()?;
51//!         conn.set_nodelay()?;
52//!         // ... read/write `conn` ...
53//!     }
54//! }
55//! # Ok(())
56//! # }
57//! ```
58
59use core::ffi::{c_int, c_void};
60use core::mem::size_of;
61use core::ptr;
62use std::io;
63
64pub(crate) mod ffi;
65
66#[cfg(any(target_os = "macos", target_os = "ios"))]
67mod poller_kq;
68#[cfg(target_os = "linux")]
69mod poller_ep;
70
71#[cfg(any(target_os = "macos", target_os = "ios"))]
72pub use poller_kq::Poller;
73#[cfg(target_os = "linux")]
74pub use poller_ep::Poller;
75
76// ---- constants -------------------------------------------------------------
77
// Hand-declared libc constants (this crate does not depend on `libc`).
// The first group has the same numeric value on the Linux and macOS/iOS
// targets handled below; the cfg-gated groups differ per OS.
// NOTE(review): the Linux values look like the generic (x86/ARM) ABI numbers;
// some Linux architectures define different values — confirm the set of
// supported targets before building elsewhere.

/// IPv4 address family: passed to `socket` and stored in `sockaddr_in`.
const AF_INET: c_int = 2;
/// Unix-domain address family: passed to `socket` and stored in `sockaddr_un`.
const AF_UNIX: c_int = 1;
/// Stream (connection-oriented) socket type.
const SOCK_STREAM: c_int = 1;
/// `setsockopt` level for TCP options.
const IPPROTO_TCP: c_int = 6;
/// TCP option set by `Socket::set_nodelay`.
const TCP_NODELAY: c_int = 1;
/// `fcntl` command: read the file status flags.
const F_GETFL: c_int = 3;
/// `fcntl` command: write the file status flags.
const F_SETFL: c_int = 4;

// Linux values.
#[cfg(target_os = "linux")]
const SOL_SOCKET: c_int = 1;
#[cfg(target_os = "linux")]
const SO_REUSEADDR: c_int = 2;
#[cfg(target_os = "linux")]
const SO_REUSEPORT: c_int = 15;
#[cfg(target_os = "linux")]
const O_NONBLOCK: c_int = 0x800;

// macOS / iOS values.
#[cfg(any(target_os = "macos", target_os = "ios"))]
const SOL_SOCKET: c_int = 0xffff;
#[cfg(any(target_os = "macos", target_os = "ios"))]
const SO_REUSEADDR: c_int = 0x0004;
#[cfg(any(target_os = "macos", target_os = "ios"))]
const SO_REUSEPORT: c_int = 0x0200;
#[cfg(any(target_os = "macos", target_os = "ios"))]
const O_NONBLOCK: c_int = 0x0004;
103
104// ---- sockaddr_in -----------------------------------------------------------
105
/// Linux layout of `struct sockaddr_in`: a 16-bit family followed by the
/// port, the IPv4 address and eight padding bytes.
///
/// `sin_port` is stored big-endian and `sin_addr` holds the four address
/// octets in memory order (see `SockaddrIn::new`).
#[cfg(target_os = "linux")]
#[repr(C)]
struct SockaddrIn {
    sin_family: u16,
    sin_port: u16,
    sin_addr: u32,
    sin_zero: [u8; 8],
}

// macOS / iOS layout: a one-byte length and a one-byte family replace the
// 16-bit family used on Linux; the remaining fields are the same.
// Field names mirror BSD's `<netinet/in.h>` struct sockaddr_in — the `sin_*`
// prefix is the ABI; renaming would just obscure the libc binding.
#[allow(clippy::struct_field_names)]
#[cfg(any(target_os = "macos", target_os = "ios"))]
#[repr(C)]
struct SockaddrIn {
    sin_len: u8,
    sin_family: u8,
    sin_port: u16,
    sin_addr: u32,
    sin_zero: [u8; 8],
}
127
128impl SockaddrIn {
129    fn new(ip: [u8; 4], port: u16) -> Self {
130        #[cfg(target_os = "linux")]
131        return SockaddrIn {
132            sin_family: AF_INET as u16,
133            sin_port: port.to_be(),
134            sin_addr: u32::from_ne_bytes(ip),
135            sin_zero: [0; 8],
136        };
137        #[cfg(any(target_os = "macos", target_os = "ios"))]
138        return SockaddrIn {
139            sin_len: size_of::<SockaddrIn>() as u8,
140            sin_family: AF_INET as u8,
141            sin_port: port.to_be(),
142            sin_addr: u32::from_ne_bytes(ip),
143            sin_zero: [0; 8],
144        };
145    }
146
147    fn zeroed() -> Self {
148        unsafe { core::mem::zeroed() }
149    }
150}
151
152// ---- Socket ----------------------------------------------------------------
153
/// An owned socket file descriptor. Closes itself on drop via our own `close`.
///
/// Created by the listener constructors in this file, by [`Socket::accept`],
/// or by adopting an existing descriptor with [`Socket::from_raw_fd`].
pub struct Socket {
    // The descriptor this value owns; closed exactly once, in `Drop`.
    fd: c_int,
}
158
159impl Socket {
160    /// The raw file descriptor. Borrowed — the `Socket` retains ownership.
161    #[inline]
162    pub fn raw(&self) -> i32 {
163        self.fd
164    }
165
166    /// Wrap an already-open fd (e.g. one accepted by io_uring) into an owning
167    /// `Socket` that closes it on drop.
168    ///
169    /// # Safety
170    /// `fd` must be a valid open descriptor whose ownership is transferred here.
171    #[inline]
172    pub unsafe fn from_raw_fd(fd: i32) -> Socket {
173        Socket { fd }
174    }
175
176    /// Accept one inbound connection. On a non-blocking listener with no pending
177    /// connection this returns `Err` with kind `WouldBlock`.
178    pub fn accept(&self) -> io::Result<Socket> {
179        let fd = unsafe { ffi::accept(self.fd, ptr::null_mut(), ptr::null_mut()) };
180        if fd < 0 {
181            return Err(io::Error::last_os_error());
182        }
183        Ok(Socket { fd })
184    }
185
186    /// Read into `buf`, returning the byte count (0 == EOF). Retries on EINTR.
187    /// On a non-blocking socket with no data, returns `WouldBlock`.
188    pub fn read(&self, buf: &mut [u8]) -> io::Result<usize> {
189        loop {
190            let n = unsafe { ffi::read(self.fd, buf.as_mut_ptr().cast::<c_void>(), buf.len()) };
191            if n < 0 {
192                let e = io::Error::last_os_error();
193                if e.kind() == io::ErrorKind::Interrupted {
194                    continue;
195                }
196                return Err(e);
197            }
198            return Ok(n as usize);
199        }
200    }
201
202    /// A single `write` syscall; may write fewer bytes than requested, or return
203    /// `WouldBlock` on a full non-blocking socket. Retries on EINTR.
204    pub fn write(&self, buf: &[u8]) -> io::Result<usize> {
205        loop {
206            let n = unsafe { ffi::write(self.fd, buf.as_ptr().cast::<c_void>(), buf.len()) };
207            if n < 0 {
208                let e = io::Error::last_os_error();
209                if e.kind() == io::ErrorKind::Interrupted {
210                    continue;
211                }
212                return Err(e);
213            }
214            return Ok(n as usize);
215        }
216    }
217
218    /// Write the whole buffer (blocking-socket convenience).
219    pub fn write_all(&self, mut buf: &[u8]) -> io::Result<()> {
220        while !buf.is_empty() {
221            let n = self.write(buf)?;
222            if n == 0 {
223                return Err(io::Error::new(io::ErrorKind::WriteZero, "write returned 0"));
224            }
225            buf = &buf[n..];
226        }
227        Ok(())
228    }
229
230    /// Put the socket into non-blocking mode (`O_NONBLOCK`).
231    pub fn set_nonblocking(&self) -> io::Result<()> {
232        set_fd_nonblocking(self.fd)
233    }
234
235    /// Disable Nagle's algorithm (`TCP_NODELAY`) for low-latency replies.
236    pub fn set_nodelay(&self) -> io::Result<()> {
237        let one: c_int = 1;
238        let r = unsafe {
239            ffi::setsockopt(
240                self.fd,
241                IPPROTO_TCP,
242                TCP_NODELAY,
243                (&raw const one).cast::<c_void>(),
244                size_of::<c_int>() as u32,
245            )
246        };
247        if r < 0 {
248            return Err(io::Error::last_os_error());
249        }
250        Ok(())
251    }
252
253    /// The local port this socket is bound to (host byte order).
254    pub fn local_port(&self) -> io::Result<u16> {
255        let mut addr = SockaddrIn::zeroed();
256        let mut len = size_of::<SockaddrIn>() as u32;
257        let r = unsafe {
258            ffi::getsockname(
259                self.fd,
260                (&raw mut addr).cast::<c_void>(),
261                &raw mut len,
262            )
263        };
264        if r < 0 {
265            return Err(io::Error::last_os_error());
266        }
267        Ok(u16::from_be(addr.sin_port))
268    }
269
270    /// The peer's IPv4 address + port. Used by the replication
271    /// listener to record the (ip, port) of every connected replica
272    /// for `INFO replication` / `ROLE` reporting. IPv4 only —
273    /// matches the rest of kevy-sys, which has not yet grown IPv6.
274    pub fn peer_addr(&self) -> io::Result<(std::net::Ipv4Addr, u16)> {
275        let mut addr = SockaddrIn::zeroed();
276        let mut len = size_of::<SockaddrIn>() as u32;
277        let r = unsafe {
278            ffi::getpeername(
279                self.fd,
280                (&raw mut addr).cast::<c_void>(),
281                &raw mut len,
282            )
283        };
284        if r < 0 {
285            return Err(io::Error::last_os_error());
286        }
287        // `sin_addr` is stored in network byte order as a u32 of the
288        // packed octets — `u32::from_ne_bytes(ip)` on construction
289        // (see [`SockaddrIn::new_v4`]) is the inverse of the bytes
290        // we want here.
291        let octets = addr.sin_addr.to_ne_bytes();
292        Ok((std::net::Ipv4Addr::from(octets), u16::from_be(addr.sin_port)))
293    }
294}
295
296impl Drop for Socket {
297    fn drop(&mut self) {
298        unsafe {
299            ffi::close(self.fd);
300        }
301    }
302}
303
304/// Set `O_NONBLOCK` on a raw fd (sockets and pipe ends alike).
305fn set_fd_nonblocking(fd: c_int) -> io::Result<()> {
306    let flags = unsafe { ffi::fcntl(fd, F_GETFL, 0) };
307    if flags < 0 {
308        return Err(io::Error::last_os_error());
309    }
310    if unsafe { ffi::fcntl(fd, F_SETFL, flags | O_NONBLOCK) } < 0 {
311        return Err(io::Error::last_os_error());
312    }
313    Ok(())
314}
315
316fn setsockopt_int(fd: c_int, level: c_int, name: c_int, val: c_int) -> io::Result<()> {
317    let r = unsafe {
318        ffi::setsockopt(
319            fd,
320            level,
321            name,
322            (&raw const val).cast::<c_void>(),
323            size_of::<c_int>() as u32,
324        )
325    };
326    if r < 0 {
327        return Err(io::Error::last_os_error());
328    }
329    Ok(())
330}
331
332fn listen_inner(ip: [u8; 4], port: u16, backlog: i32, reuseport: bool) -> io::Result<Socket> {
333    let fd = unsafe { ffi::socket(AF_INET, SOCK_STREAM, 0) };
334    if fd < 0 {
335        return Err(io::Error::last_os_error());
336    }
337    let sock = Socket { fd }; // owns fd: any early return closes it
338
339    setsockopt_int(fd, SOL_SOCKET, SO_REUSEADDR, 1)?;
340    if reuseport {
341        // Each thread-per-core shard opens its own listener on the same port;
342        // the kernel load-balances accepted connections across them.
343        setsockopt_int(fd, SOL_SOCKET, SO_REUSEPORT, 1)?;
344    }
345
346    let addr = SockaddrIn::new(ip, port);
347    let r = unsafe {
348        ffi::bind(
349            fd,
350            (&raw const addr).cast::<c_void>(),
351            size_of::<SockaddrIn>() as u32,
352        )
353    };
354    if r < 0 {
355        return Err(io::Error::last_os_error());
356    }
357    if unsafe { ffi::listen(fd, backlog) } < 0 {
358        return Err(io::Error::last_os_error());
359    }
360    Ok(sock)
361}
362
363/// Create a blocking IPv4 TCP listener bound to `ip:port` with `SO_REUSEADDR`.
364/// Pass `port == 0` to let the OS assign an ephemeral port.
365pub fn tcp_listen(ip: [u8; 4], port: u16, backlog: i32) -> io::Result<Socket> {
366    listen_inner(ip, port, backlog, false)
367}
368
369/// Like [`tcp_listen`] but also sets `SO_REUSEPORT`, so multiple listeners can
370/// share one port (one per thread-per-core shard).
371pub fn tcp_listen_reuseport(ip: [u8; 4], port: u16, backlog: i32) -> io::Result<Socket> {
372    listen_inner(ip, port, backlog, true)
373}
374
375// ---- sockaddr_un (AF_UNIX) -------------------------------------------------
376
/// Unix-domain `sockaddr_un`. sun_path is 108 bytes on Linux + macOS BSD.
///
/// On Linux the header is a 16-bit family; on every other target it is a
/// one-byte length followed by a one-byte family (BSD layout).
// NOTE(review): Linux declares `sun_path[108]`, but macOS's `<sys/un.h>`
// appears to declare `sun_path[104]`, so this struct would be four bytes
// larger than the native one there — confirm the oversize is intentional
// before relying on the "108 on macOS" statement above.
#[repr(C)]
struct SockaddrUn {
    #[cfg(target_os = "linux")]
    sun_family: u16,
    #[cfg(not(target_os = "linux"))]
    sun_len: u8,
    #[cfg(not(target_os = "linux"))]
    sun_family: u8,
    // Socket path, zero-padded to the full array (see `SockaddrUn::new`).
    sun_path: [u8; 108],
}
388
389impl SockaddrUn {
390    fn new(path: &[u8]) -> io::Result<(Self, u32)> {
391        if path.is_empty() || path.len() >= 108 {
392            return Err(io::Error::new(
393                io::ErrorKind::InvalidInput,
394                "unix socket path must be 1..=107 bytes",
395            ));
396        }
397        let mut sun_path = [0u8; 108];
398        sun_path[..path.len()].copy_from_slice(path);
399        // The actual length passed to bind() is offset_of(sun_path) + strlen(path) + 1
400        // (for the NUL); using full struct size also works on Linux + BSD.
401        let sa = SockaddrUn {
402            #[cfg(target_os = "linux")]
403            sun_family: AF_UNIX as u16,
404            #[cfg(not(target_os = "linux"))]
405            sun_len: size_of::<SockaddrUn>() as u8,
406            #[cfg(not(target_os = "linux"))]
407            sun_family: AF_UNIX as u8,
408            sun_path,
409        };
410        Ok((sa, size_of::<SockaddrUn>() as u32))
411    }
412}
413
414/// Create a blocking AF_UNIX stream listener bound to `path`. Unlinks any
415/// existing file at the path first (mirroring valkey/redis's `unixsocket`
416/// option). UDS bypasses the TCP stack — useful when client+server are on
417/// the same host and the TCP loopback round-trip is the bench-shape floor.
418pub fn unix_listen(path: &[u8], backlog: i32) -> io::Result<Socket> {
419    // Best-effort unlink so subsequent bind doesn't EADDRINUSE on restart.
420    // Convert path to a NUL-terminated CString for libc::unlink.
421    if let Ok(c) = std::ffi::CString::new(path) {
422        unsafe {
423            ffi::unlink(c.as_ptr());
424        }
425    }
426
427    let fd = unsafe { ffi::socket(AF_UNIX, SOCK_STREAM, 0) };
428    if fd < 0 {
429        return Err(io::Error::last_os_error());
430    }
431    let sock = Socket { fd };
432
433    let (addr, len) = SockaddrUn::new(path)?;
434    let r = unsafe {
435        ffi::bind(
436            fd,
437            (&raw const addr).cast::<c_void>(),
438            len,
439        )
440    };
441    if r < 0 {
442        return Err(io::Error::last_os_error());
443    }
444    if unsafe { ffi::listen(fd, backlog) } < 0 {
445        return Err(io::Error::last_os_error());
446    }
447    // World-writable so clients with different uid can connect (redis SOP).
448    // Use libc::chmod via CString.
449    if let Ok(c) = std::ffi::CString::new(path) {
450        unsafe { ffi::chmod(c.as_ptr(), 0o777) };
451    }
452    Ok(sock)
453}
454
/// A self-pipe used to wake a [`Poller`] blocked in `wait` from another thread.
/// Register `read_fd()` in the poller for read-readiness; call `wake()` from any
/// thread to make the poll return; call `drain()` when the read end fires.
///
/// Both pipe ends are owned by this value and closed when it is dropped.
pub struct Waker {
    // Read end of the pipe: registered with the poller, emptied by `drain`.
    read_fd: c_int,
    // Write end of the pipe: `wake` writes a single byte here.
    write_fd: c_int,
}
462
463/// Create a non-blocking self-pipe waker.
464pub fn waker() -> io::Result<Waker> {
465    let mut fds = [0 as c_int; 2];
466    if unsafe { ffi::pipe(fds.as_mut_ptr()) } < 0 {
467        return Err(io::Error::last_os_error());
468    }
469    let w = Waker {
470        read_fd: fds[0],
471        write_fd: fds[1],
472    };
473    set_fd_nonblocking(w.read_fd)?;
474    set_fd_nonblocking(w.write_fd)?;
475    Ok(w)
476}
477
478impl Waker {
479    /// The read end — register this in a [`Poller`] for read-readiness.
480    #[inline]
481    pub fn read_fd(&self) -> i32 {
482        self.read_fd
483    }
484
485    /// Signal the waker. A full pipe already means "pending", so EAGAIN is fine.
486    pub fn wake(&self) -> io::Result<()> {
487        let byte = [1u8];
488        loop {
489            let n = unsafe { ffi::write(self.write_fd, byte.as_ptr().cast::<c_void>(), 1) };
490            if n < 0 {
491                let e = io::Error::last_os_error();
492                match e.kind() {
493                    io::ErrorKind::Interrupted => continue,
494                    io::ErrorKind::WouldBlock => return Ok(()),
495                    _ => return Err(e),
496                }
497            }
498            return Ok(());
499        }
500    }
501
502    /// Consume all pending wake bytes after the read end fires.
503    pub fn drain(&self) {
504        let mut buf = [0u8; 64];
505        loop {
506            let n = unsafe { ffi::read(self.read_fd, buf.as_mut_ptr().cast::<c_void>(), buf.len()) };
507            if n <= 0 {
508                break; // EAGAIN / EOF / error — nothing more to drain
509            }
510        }
511    }
512}
513
514impl Drop for Waker {
515    fn drop(&mut self) {
516        unsafe {
517            ffi::close(self.read_fd);
518            ffi::close(self.write_fd);
519        }
520    }
521}
522
// The pipe ends are plain fds with no aliasing; safe to move across threads.
// SAFETY: `Waker` holds only two `c_int` descriptors — no pointers and no
// interior mutability — so moving or sharing it between threads cannot alias
// memory.
// NOTE(review): with only integer fields the compiler should already derive
// both auto traits, so these explicit impls look redundant — confirm before
// removing them.
unsafe impl Send for Waker {}
unsafe impl Sync for Waker {}
526
527// ---- Poller ----------------------------------------------------------------
528
/// A readiness notification for one file descriptor.
///
/// Plain `Copy` data; the flags are independent of one another.
#[derive(Debug, Clone, Copy)]
pub struct Event {
    /// The descriptor this notification is about.
    pub fd: i32,
    /// The descriptor was reported ready for reading.
    pub readable: bool,
    /// The descriptor was reported ready for writing.
    pub writable: bool,
    /// Peer hang-up / error — the connection should be closed.
    pub hup: bool,
}
538
/// How many raw events to pull from the kernel per `wait` call.
// NOTE(review): not referenced in this file — presumably consumed by the
// per-OS poller modules; confirm there.
const WAIT_CAPACITY: usize = 1024;
541
542
543
544#[cfg(test)]
545mod tests;