Skip to main content

kevy_sys/
lib.rs

1//! kevy-sys — kevy's network-boundary layer.
2//!
3//! One of kevy's three OS-boundary crates (alongside the publishable
4//! [`kevy-uring`](https://crates.io/crates/kevy-uring) and
5//! [`kevy-madvise`](https://crates.io/crates/kevy-madvise)). This is the
6//! server-internal piece — hand-curated to the exact subset of sockets and the
7//! readiness poller (kqueue on macOS, epoll on Linux) that kevy's server
8//! needs. Every binding is declared by hand with `unsafe extern "C"`
9//! (no `libc` crate, no third-party dep). On Linux these symbols resolve
10//! through glibc, on macOS through libSystem — both already linked by
11//! `std`, so we add zero dependencies.
12//!
13//! The poller here is *readiness*-based. The *completion*-based io_uring
14//! engine has moved to its own crate, [`kevy-uring`]; either can back
15//! the reactor on top ([kevy-net]), which exposes only a byte-level
16//! service contract.
17//!
18//! Part of the [kevy] key–value server; not a generic OS-binding library.
19//!
20//! [`kevy-uring`]: https://crates.io/crates/kevy-uring
21//!
22//! # Safety
23//!
24//! `unsafe` is confined to the private `ffi` module's `extern "C"` declarations
25//! and the thin wrappers that call them. The bindings match the platform libc
26//! ABI (socklen_t = `u32`; `struct sockaddr_in`, `kevent`, and `epoll_event`
27//! laid out per-OS/arch). All raw fds are owned by RAII types ([`Socket`],
28//! [`Poller`], [`Waker`]) that close on drop, and errors are read via
29//! `std::io::Error::last_os_error()`. The public API is safe.
30//!
31//! [kevy]: https://crates.io/crates/kevy
32//! [kevy-net]: https://crates.io/crates/kevy-net
33//!
34//! # Example
35//!
36//! ```no_run
37//! use kevy_sys::{Poller, tcp_listen};
38//!
39//! # fn main() -> std::io::Result<()> {
40//! let listener = tcp_listen([127, 0, 0, 1], 6379, 1024)?;
41//! listener.set_nonblocking()?;
42//!
43//! let poller = Poller::new()?;
44//! poller.add(listener.raw(), /* read */ true, /* write */ false)?;
45//!
46//! let mut events = Vec::new();
47//! poller.wait(&mut events, Some(1000))?; // block up to 1s
48//! for ev in &events {
49//!     if ev.fd == listener.raw() && ev.readable {
50//!         let conn = listener.accept()?;
51//!         conn.set_nodelay()?;
52//!         // ... read/write `conn` ...
53//!     }
54//! }
55//! # Ok(())
56//! # }
57//! ```
58
59use core::ffi::{c_int, c_void};
60use core::mem::size_of;
61use core::ptr;
62use std::io;
63
64pub(crate) mod ffi;
65
66#[cfg(any(target_os = "macos", target_os = "ios"))]
67mod poller_kq;
68#[cfg(target_os = "linux")]
69mod poller_ep;
70
71#[cfg(any(target_os = "macos", target_os = "ios"))]
72pub use poller_kq::Poller;
73#[cfg(target_os = "linux")]
74pub use poller_ep::Poller;
75
76// ---- constants -------------------------------------------------------------
77
// Values that are the same on Linux and on the Apple platforms.
const AF_INET: c_int = 2; // address family used for `SockaddrIn`
const SOCK_STREAM: c_int = 1; // socket type passed to `socket`
const IPPROTO_TCP: c_int = 6; // option level for `TCP_NODELAY`
const TCP_NODELAY: c_int = 1;
const F_GETFL: c_int = 3; // `fcntl` command: read the status flags
const F_SETFL: c_int = 4; // `fcntl` command: write the status flags

// Linux values.
// NOTE(review): these are the generic Linux ABI numbers (x86, arm); a few
// other architectures use different ones — confirm before porting.
#[cfg(target_os = "linux")]
const SOL_SOCKET: c_int = 1;
#[cfg(target_os = "linux")]
const SO_REUSEADDR: c_int = 2;
#[cfg(target_os = "linux")]
const SO_REUSEPORT: c_int = 15;
#[cfg(target_os = "linux")]
const O_NONBLOCK: c_int = 0o4000; // == 0x800

// macOS / iOS values.
#[cfg(any(target_os = "macos", target_os = "ios"))]
const SOL_SOCKET: c_int = 0xffff;
#[cfg(any(target_os = "macos", target_os = "ios"))]
const SO_REUSEADDR: c_int = 0x4;
#[cfg(any(target_os = "macos", target_os = "ios"))]
const SO_REUSEPORT: c_int = 0x200;
#[cfg(any(target_os = "macos", target_os = "ios"))]
const O_NONBLOCK: c_int = 0x4;
102
103// ---- sockaddr_in -----------------------------------------------------------
104
/// Linux layout of `struct sockaddr_in`: a 16-bit family, then port, address
/// and eight bytes of padding. `sin_port` and `sin_addr` are kept in network
/// byte order.
#[cfg(target_os = "linux")]
#[repr(C)]
struct SockaddrIn {
    sin_family: u16,
    sin_port: u16,
    sin_addr: u32,
    sin_zero: [u8; 8],
}

/// macOS / iOS layout of `struct sockaddr_in`: a one-byte length and a
/// one-byte family come first, the remaining fields are as on Linux.
///
/// The `sin_*` names are taken verbatim from BSD's `<netinet/in.h>`; they are
/// the ABI's own spelling, so renaming them would only hide which libc struct
/// this mirrors.
#[allow(clippy::struct_field_names)]
#[cfg(any(target_os = "macos", target_os = "ios"))]
#[repr(C)]
struct SockaddrIn {
    sin_len: u8,
    sin_family: u8,
    sin_port: u16,
    sin_addr: u32,
    sin_zero: [u8; 8],
}
126
127impl SockaddrIn {
128    fn new(ip: [u8; 4], port: u16) -> Self {
129        #[cfg(target_os = "linux")]
130        return SockaddrIn {
131            sin_family: AF_INET as u16,
132            sin_port: port.to_be(),
133            sin_addr: u32::from_ne_bytes(ip),
134            sin_zero: [0; 8],
135        };
136        #[cfg(any(target_os = "macos", target_os = "ios"))]
137        return SockaddrIn {
138            sin_len: size_of::<SockaddrIn>() as u8,
139            sin_family: AF_INET as u8,
140            sin_port: port.to_be(),
141            sin_addr: u32::from_ne_bytes(ip),
142            sin_zero: [0; 8],
143        };
144    }
145
146    fn zeroed() -> Self {
147        unsafe { core::mem::zeroed() }
148    }
149}
150
151// ---- Socket ----------------------------------------------------------------
152
/// Owning wrapper around a socket file descriptor. Dropping the value closes
/// the descriptor through this crate's own `close` binding.
pub struct Socket {
    fd: c_int,
}
157
158impl Socket {
159    /// The raw file descriptor. Borrowed — the `Socket` retains ownership.
160    #[inline]
161    pub fn raw(&self) -> i32 {
162        self.fd
163    }
164
165    /// Wrap an already-open fd (e.g. one accepted by io_uring) into an owning
166    /// `Socket` that closes it on drop.
167    ///
168    /// # Safety
169    /// `fd` must be a valid open descriptor whose ownership is transferred here.
170    #[inline]
171    pub unsafe fn from_raw_fd(fd: i32) -> Socket {
172        Socket { fd }
173    }
174
175    /// Accept one inbound connection. On a non-blocking listener with no pending
176    /// connection this returns `Err` with kind `WouldBlock`.
177    pub fn accept(&self) -> io::Result<Socket> {
178        let fd = unsafe { ffi::accept(self.fd, ptr::null_mut(), ptr::null_mut()) };
179        if fd < 0 {
180            return Err(io::Error::last_os_error());
181        }
182        Ok(Socket { fd })
183    }
184
185    /// Read into `buf`, returning the byte count (0 == EOF). Retries on EINTR.
186    /// On a non-blocking socket with no data, returns `WouldBlock`.
187    pub fn read(&self, buf: &mut [u8]) -> io::Result<usize> {
188        loop {
189            let n = unsafe { ffi::read(self.fd, buf.as_mut_ptr().cast::<c_void>(), buf.len()) };
190            if n < 0 {
191                let e = io::Error::last_os_error();
192                if e.kind() == io::ErrorKind::Interrupted {
193                    continue;
194                }
195                return Err(e);
196            }
197            return Ok(n as usize);
198        }
199    }
200
201    /// A single `write` syscall; may write fewer bytes than requested, or return
202    /// `WouldBlock` on a full non-blocking socket. Retries on EINTR.
203    pub fn write(&self, buf: &[u8]) -> io::Result<usize> {
204        loop {
205            let n = unsafe { ffi::write(self.fd, buf.as_ptr().cast::<c_void>(), buf.len()) };
206            if n < 0 {
207                let e = io::Error::last_os_error();
208                if e.kind() == io::ErrorKind::Interrupted {
209                    continue;
210                }
211                return Err(e);
212            }
213            return Ok(n as usize);
214        }
215    }
216
217    /// Write the whole buffer (blocking-socket convenience).
218    pub fn write_all(&self, mut buf: &[u8]) -> io::Result<()> {
219        while !buf.is_empty() {
220            let n = self.write(buf)?;
221            if n == 0 {
222                return Err(io::Error::new(io::ErrorKind::WriteZero, "write returned 0"));
223            }
224            buf = &buf[n..];
225        }
226        Ok(())
227    }
228
229    /// Put the socket into non-blocking mode (`O_NONBLOCK`).
230    pub fn set_nonblocking(&self) -> io::Result<()> {
231        set_fd_nonblocking(self.fd)
232    }
233
234    /// Disable Nagle's algorithm (`TCP_NODELAY`) for low-latency replies.
235    pub fn set_nodelay(&self) -> io::Result<()> {
236        let one: c_int = 1;
237        let r = unsafe {
238            ffi::setsockopt(
239                self.fd,
240                IPPROTO_TCP,
241                TCP_NODELAY,
242                (&raw const one).cast::<c_void>(),
243                size_of::<c_int>() as u32,
244            )
245        };
246        if r < 0 {
247            return Err(io::Error::last_os_error());
248        }
249        Ok(())
250    }
251
252    /// The local port this socket is bound to (host byte order).
253    pub fn local_port(&self) -> io::Result<u16> {
254        let mut addr = SockaddrIn::zeroed();
255        let mut len = size_of::<SockaddrIn>() as u32;
256        let r = unsafe {
257            ffi::getsockname(
258                self.fd,
259                (&raw mut addr).cast::<c_void>(),
260                &raw mut len,
261            )
262        };
263        if r < 0 {
264            return Err(io::Error::last_os_error());
265        }
266        Ok(u16::from_be(addr.sin_port))
267    }
268
269    /// The peer's IPv4 address + port. Used by the replication
270    /// listener to record the (ip, port) of every connected replica
271    /// for `INFO replication` / `ROLE` reporting. IPv4 only —
272    /// matches the rest of kevy-sys, which has not yet grown IPv6.
273    pub fn peer_addr(&self) -> io::Result<(std::net::Ipv4Addr, u16)> {
274        let mut addr = SockaddrIn::zeroed();
275        let mut len = size_of::<SockaddrIn>() as u32;
276        let r = unsafe {
277            ffi::getpeername(
278                self.fd,
279                (&raw mut addr).cast::<c_void>(),
280                &raw mut len,
281            )
282        };
283        if r < 0 {
284            return Err(io::Error::last_os_error());
285        }
286        // `sin_addr` is stored in network byte order as a u32 of the
287        // packed octets — `u32::from_ne_bytes(ip)` on construction
288        // (see [`SockaddrIn::new_v4`]) is the inverse of the bytes
289        // we want here.
290        let octets = addr.sin_addr.to_ne_bytes();
291        Ok((std::net::Ipv4Addr::from(octets), u16::from_be(addr.sin_port)))
292    }
293}
294
295impl Drop for Socket {
296    fn drop(&mut self) {
297        unsafe {
298            ffi::close(self.fd);
299        }
300    }
301}
302
303/// Set `O_NONBLOCK` on a raw fd (sockets and pipe ends alike).
304fn set_fd_nonblocking(fd: c_int) -> io::Result<()> {
305    let flags = unsafe { ffi::fcntl(fd, F_GETFL, 0) };
306    if flags < 0 {
307        return Err(io::Error::last_os_error());
308    }
309    if unsafe { ffi::fcntl(fd, F_SETFL, flags | O_NONBLOCK) } < 0 {
310        return Err(io::Error::last_os_error());
311    }
312    Ok(())
313}
314
315fn setsockopt_int(fd: c_int, level: c_int, name: c_int, val: c_int) -> io::Result<()> {
316    let r = unsafe {
317        ffi::setsockopt(
318            fd,
319            level,
320            name,
321            (&raw const val).cast::<c_void>(),
322            size_of::<c_int>() as u32,
323        )
324    };
325    if r < 0 {
326        return Err(io::Error::last_os_error());
327    }
328    Ok(())
329}
330
331fn listen_inner(ip: [u8; 4], port: u16, backlog: i32, reuseport: bool) -> io::Result<Socket> {
332    let fd = unsafe { ffi::socket(AF_INET, SOCK_STREAM, 0) };
333    if fd < 0 {
334        return Err(io::Error::last_os_error());
335    }
336    let sock = Socket { fd }; // owns fd: any early return closes it
337
338    setsockopt_int(fd, SOL_SOCKET, SO_REUSEADDR, 1)?;
339    if reuseport {
340        // Each thread-per-core shard opens its own listener on the same port;
341        // the kernel load-balances accepted connections across them.
342        setsockopt_int(fd, SOL_SOCKET, SO_REUSEPORT, 1)?;
343    }
344
345    let addr = SockaddrIn::new(ip, port);
346    let r = unsafe {
347        ffi::bind(
348            fd,
349            (&raw const addr).cast::<c_void>(),
350            size_of::<SockaddrIn>() as u32,
351        )
352    };
353    if r < 0 {
354        return Err(io::Error::last_os_error());
355    }
356    if unsafe { ffi::listen(fd, backlog) } < 0 {
357        return Err(io::Error::last_os_error());
358    }
359    Ok(sock)
360}
361
362/// Create a blocking IPv4 TCP listener bound to `ip:port` with `SO_REUSEADDR`.
363/// Pass `port == 0` to let the OS assign an ephemeral port.
364pub fn tcp_listen(ip: [u8; 4], port: u16, backlog: i32) -> io::Result<Socket> {
365    listen_inner(ip, port, backlog, false)
366}
367
368/// Like [`tcp_listen`] but also sets `SO_REUSEPORT`, so multiple listeners can
369/// share one port (one per thread-per-core shard).
370pub fn tcp_listen_reuseport(ip: [u8; 4], port: u16, backlog: i32) -> io::Result<Socket> {
371    listen_inner(ip, port, backlog, true)
372}
373
/// Self-pipe for waking a [`Poller`] that is blocked in `wait` on another
/// thread.
///
/// Usage: register `read_fd()` with the poller for read-readiness, call
/// `wake()` from any thread to make the poll return, and call `drain()` once
/// the read end is reported ready.
pub struct Waker {
    read_fd: c_int,
    write_fd: c_int,
}
381
382/// Create a non-blocking self-pipe waker.
383pub fn waker() -> io::Result<Waker> {
384    let mut fds = [0 as c_int; 2];
385    if unsafe { ffi::pipe(fds.as_mut_ptr()) } < 0 {
386        return Err(io::Error::last_os_error());
387    }
388    let w = Waker {
389        read_fd: fds[0],
390        write_fd: fds[1],
391    };
392    set_fd_nonblocking(w.read_fd)?;
393    set_fd_nonblocking(w.write_fd)?;
394    Ok(w)
395}
396
397impl Waker {
398    /// The read end — register this in a [`Poller`] for read-readiness.
399    #[inline]
400    pub fn read_fd(&self) -> i32 {
401        self.read_fd
402    }
403
404    /// Signal the waker. A full pipe already means "pending", so EAGAIN is fine.
405    pub fn wake(&self) -> io::Result<()> {
406        let byte = [1u8];
407        loop {
408            let n = unsafe { ffi::write(self.write_fd, byte.as_ptr().cast::<c_void>(), 1) };
409            if n < 0 {
410                let e = io::Error::last_os_error();
411                match e.kind() {
412                    io::ErrorKind::Interrupted => continue,
413                    io::ErrorKind::WouldBlock => return Ok(()),
414                    _ => return Err(e),
415                }
416            }
417            return Ok(());
418        }
419    }
420
421    /// Consume all pending wake bytes after the read end fires.
422    pub fn drain(&self) {
423        let mut buf = [0u8; 64];
424        loop {
425            let n = unsafe { ffi::read(self.read_fd, buf.as_mut_ptr().cast::<c_void>(), buf.len()) };
426            if n <= 0 {
427                break; // EAGAIN / EOF / error — nothing more to drain
428            }
429        }
430    }
431}
432
433impl Drop for Waker {
434    fn drop(&mut self) {
435        unsafe {
436            ffi::close(self.read_fd);
437            ffi::close(self.write_fd);
438        }
439    }
440}
441
442// The pipe ends are plain fds with no aliasing; safe to move across threads.
443unsafe impl Send for Waker {}
444unsafe impl Sync for Waker {}
445
446// ---- Poller ----------------------------------------------------------------
447
/// A readiness notification for one file descriptor.
///
/// Plain data: it is `Copy`, and two events compare equal when all four
/// fields match.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Event {
    /// The descriptor this notification is about.
    pub fd: i32,
    /// Set when the descriptor is reported ready for reading.
    pub readable: bool,
    /// Set when the descriptor is reported ready for writing.
    pub writable: bool,
    /// Peer hang-up / error — the connection should be closed.
    pub hup: bool,
}
457
/// Upper bound on the number of raw kernel events fetched by one `wait` call.
const WAIT_CAPACITY: usize = 1024;
460
461
462
463#[cfg(test)]
464mod tests;