Skip to main content

kevy_sys/
lib.rs

1//! kevy-sys — kevy's network-boundary layer.
2//!
3//! One of kevy's three OS-boundary crates (alongside the publishable
4//! [`kevy-uring`](https://crates.io/crates/kevy-uring) and
5//! [`kevy-madvise`](https://crates.io/crates/kevy-madvise)). This is the
6//! server-internal piece — hand-curated to the exact subset of sockets and the
7//! readiness poller (kqueue on macOS, epoll on Linux) that kevy's server
8//! needs. Every binding is declared by hand with `unsafe extern "C"`
9//! (no `libc` crate, no third-party dep). On Linux these symbols resolve
10//! through glibc, on macOS through libSystem — both already linked by
11//! `std`, so we add zero dependencies.
12//!
13//! The poller here is *readiness*-based. The *completion*-based io_uring
14//! engine has moved to its own crate, [`kevy-uring`]; either can back
15//! the reactor on top ([kevy-net]), which exposes only a byte-level
16//! service contract.
17//!
18//! Part of the [kevy] key–value server; not a generic OS-binding library.
19//!
20//! [`kevy-uring`]: https://crates.io/crates/kevy-uring
21//!
22//! # Safety
23//!
24//! `unsafe` is confined to the private `ffi` module's `extern "C"` declarations
25//! and the thin wrappers that call them. The bindings match the platform libc
26//! ABI (socklen_t = `u32`; `struct sockaddr_in`, `kevent`, and `epoll_event`
27//! laid out per-OS/arch). All raw fds are owned by RAII types ([`Socket`],
28//! [`Poller`], [`Waker`]) that close on drop, and errors are read via
29//! `std::io::Error::last_os_error()`. The public API is safe.
30//!
31//! [kevy]: https://crates.io/crates/kevy
32//! [kevy-net]: https://crates.io/crates/kevy-net
33//!
34//! # Example
35//!
36//! ```no_run
37//! use kevy_sys::{Poller, tcp_listen};
38//!
39//! # fn main() -> std::io::Result<()> {
40//! let listener = tcp_listen([127, 0, 0, 1], 6379, 1024)?;
41//! listener.set_nonblocking()?;
42//!
43//! let poller = Poller::new()?;
44//! poller.add(listener.raw(), /* read */ true, /* write */ false)?;
45//!
46//! let mut events = Vec::new();
47//! poller.wait(&mut events, Some(1000))?; // block up to 1s
48//! for ev in &events {
49//!     if ev.fd == listener.raw() && ev.readable {
50//!         let conn = listener.accept()?;
51//!         conn.set_nodelay()?;
52//!         // ... read/write `conn` ...
53//!     }
54//! }
55//! # Ok(())
56//! # }
57//! ```
58
59use core::ffi::{c_int, c_void};
60use core::mem::size_of;
61use core::ptr;
62use std::io;
63
64pub(crate) mod ffi;
65
66#[cfg(any(target_os = "macos", target_os = "ios"))]
67mod poller_kq;
68#[cfg(target_os = "linux")]
69mod poller_ep;
70
71#[cfg(any(target_os = "macos", target_os = "ios"))]
72pub use poller_kq::Poller;
73#[cfg(target_os = "linux")]
74pub use poller_ep::Poller;
75
76// ---- constants -------------------------------------------------------------
77
// Hand-declared libc constants. The first group uses one value for every
// target this file compiles on; the groups below are selected per-OS by `cfg`.

/// Address family for IPv4 sockets (passed to `socket`, stored in `sin_family`).
const AF_INET: c_int = 2;
/// Socket type passed to `socket` for a stream (TCP) socket.
const SOCK_STREAM: c_int = 1;
/// `setsockopt` level for TCP-specific options.
const IPPROTO_TCP: c_int = 6;
/// TCP option name used by `Socket::set_nodelay`.
const TCP_NODELAY: c_int = 1;
/// `fcntl` command: read the file status flags.
const F_GETFL: c_int = 3;
/// `fcntl` command: replace the file status flags.
const F_SETFL: c_int = 4;

// Linux values.
// NOTE(review): these look like the generic-architecture values (x86, ARM);
// a few Linux architectures number them differently — confirm the supported
// target list before building for anything else.
#[cfg(target_os = "linux")]
const SOL_SOCKET: c_int = 1;
#[cfg(target_os = "linux")]
const SO_REUSEADDR: c_int = 2;
#[cfg(target_os = "linux")]
const SO_REUSEPORT: c_int = 15;
#[cfg(target_os = "linux")]
const O_NONBLOCK: c_int = 0x800;

// macOS / iOS (BSD-derived) values for the same four names.
#[cfg(any(target_os = "macos", target_os = "ios"))]
const SOL_SOCKET: c_int = 0xffff;
#[cfg(any(target_os = "macos", target_os = "ios"))]
const SO_REUSEADDR: c_int = 0x0004;
#[cfg(any(target_os = "macos", target_os = "ios"))]
const SO_REUSEPORT: c_int = 0x0200;
#[cfg(any(target_os = "macos", target_os = "ios"))]
const O_NONBLOCK: c_int = 0x0004;
102
103// ---- sockaddr_in -----------------------------------------------------------
104
/// IPv4 socket address in the Linux layout: a 16-bit family, then port and
/// address, padded to 16 bytes. `#[repr(C)]` keeps the field order and
/// padding exactly as declared so the struct can be handed to `bind` and
/// `getsockname` by pointer.
#[cfg(target_os = "linux")]
#[repr(C)]
struct SockaddrIn {
    sin_family: u16,
    // Stored big-endian: `new` writes `port.to_be()`, `local_port` reads it
    // back with `u16::from_be`.
    sin_port: u16,
    // The four address octets in memory order (`u32::from_ne_bytes(ip)`).
    sin_addr: u32,
    sin_zero: [u8; 8],
}

/// IPv4 socket address in the macOS/iOS layout: the first two bytes are a
/// length byte followed by an 8-bit family, instead of Linux's single 16-bit
/// family. The remaining fields are the same as in the Linux variant.
#[cfg(any(target_os = "macos", target_os = "ios"))]
#[repr(C)]
struct SockaddrIn {
    // Set by `new` to `size_of::<SockaddrIn>()`.
    sin_len: u8,
    sin_family: u8,
    // Stored big-endian, as in the Linux variant.
    sin_port: u16,
    // The four address octets in memory order.
    sin_addr: u32,
    sin_zero: [u8; 8],
}
123
124impl SockaddrIn {
125    fn new(ip: [u8; 4], port: u16) -> Self {
126        #[cfg(target_os = "linux")]
127        return SockaddrIn {
128            sin_family: AF_INET as u16,
129            sin_port: port.to_be(),
130            sin_addr: u32::from_ne_bytes(ip),
131            sin_zero: [0; 8],
132        };
133        #[cfg(any(target_os = "macos", target_os = "ios"))]
134        return SockaddrIn {
135            sin_len: size_of::<SockaddrIn>() as u8,
136            sin_family: AF_INET as u8,
137            sin_port: port.to_be(),
138            sin_addr: u32::from_ne_bytes(ip),
139            sin_zero: [0; 8],
140        };
141    }
142
143    fn zeroed() -> Self {
144        unsafe { core::mem::zeroed() }
145    }
146}
147
148// ---- Socket ----------------------------------------------------------------
149
/// An owned socket file descriptor. Closes itself on drop via our own `close`.
///
/// Values are produced by the listener constructors in this file, by
/// [`Socket::accept`], or by [`Socket::from_raw_fd`].
pub struct Socket {
    // The descriptor this value owns; handed to `close` exactly once, in `Drop`.
    fd: c_int,
}
154
155impl Socket {
156    /// The raw file descriptor. Borrowed — the `Socket` retains ownership.
157    #[inline]
158    pub fn raw(&self) -> i32 {
159        self.fd
160    }
161
162    /// Wrap an already-open fd (e.g. one accepted by io_uring) into an owning
163    /// `Socket` that closes it on drop.
164    ///
165    /// # Safety
166    /// `fd` must be a valid open descriptor whose ownership is transferred here.
167    #[inline]
168    pub unsafe fn from_raw_fd(fd: i32) -> Socket {
169        Socket { fd }
170    }
171
172    /// Accept one inbound connection. On a non-blocking listener with no pending
173    /// connection this returns `Err` with kind `WouldBlock`.
174    pub fn accept(&self) -> io::Result<Socket> {
175        let fd = unsafe { ffi::accept(self.fd, ptr::null_mut(), ptr::null_mut()) };
176        if fd < 0 {
177            return Err(io::Error::last_os_error());
178        }
179        Ok(Socket { fd })
180    }
181
182    /// Read into `buf`, returning the byte count (0 == EOF). Retries on EINTR.
183    /// On a non-blocking socket with no data, returns `WouldBlock`.
184    pub fn read(&self, buf: &mut [u8]) -> io::Result<usize> {
185        loop {
186            let n = unsafe { ffi::read(self.fd, buf.as_mut_ptr() as *mut c_void, buf.len()) };
187            if n < 0 {
188                let e = io::Error::last_os_error();
189                if e.kind() == io::ErrorKind::Interrupted {
190                    continue;
191                }
192                return Err(e);
193            }
194            return Ok(n as usize);
195        }
196    }
197
198    /// A single `write` syscall; may write fewer bytes than requested, or return
199    /// `WouldBlock` on a full non-blocking socket. Retries on EINTR.
200    pub fn write(&self, buf: &[u8]) -> io::Result<usize> {
201        loop {
202            let n = unsafe { ffi::write(self.fd, buf.as_ptr() as *const c_void, buf.len()) };
203            if n < 0 {
204                let e = io::Error::last_os_error();
205                if e.kind() == io::ErrorKind::Interrupted {
206                    continue;
207                }
208                return Err(e);
209            }
210            return Ok(n as usize);
211        }
212    }
213
214    /// Write the whole buffer (blocking-socket convenience).
215    pub fn write_all(&self, mut buf: &[u8]) -> io::Result<()> {
216        while !buf.is_empty() {
217            let n = self.write(buf)?;
218            if n == 0 {
219                return Err(io::Error::new(io::ErrorKind::WriteZero, "write returned 0"));
220            }
221            buf = &buf[n..];
222        }
223        Ok(())
224    }
225
226    /// Put the socket into non-blocking mode (`O_NONBLOCK`).
227    pub fn set_nonblocking(&self) -> io::Result<()> {
228        set_fd_nonblocking(self.fd)
229    }
230
231    /// Disable Nagle's algorithm (`TCP_NODELAY`) for low-latency replies.
232    pub fn set_nodelay(&self) -> io::Result<()> {
233        let one: c_int = 1;
234        let r = unsafe {
235            ffi::setsockopt(
236                self.fd,
237                IPPROTO_TCP,
238                TCP_NODELAY,
239                &one as *const c_int as *const c_void,
240                size_of::<c_int>() as u32,
241            )
242        };
243        if r < 0 {
244            return Err(io::Error::last_os_error());
245        }
246        Ok(())
247    }
248
249    /// The local port this socket is bound to (host byte order).
250    pub fn local_port(&self) -> io::Result<u16> {
251        let mut addr = SockaddrIn::zeroed();
252        let mut len = size_of::<SockaddrIn>() as u32;
253        let r = unsafe {
254            ffi::getsockname(
255                self.fd,
256                &mut addr as *mut SockaddrIn as *mut c_void,
257                &mut len,
258            )
259        };
260        if r < 0 {
261            return Err(io::Error::last_os_error());
262        }
263        Ok(u16::from_be(addr.sin_port))
264    }
265}
266
267impl Drop for Socket {
268    fn drop(&mut self) {
269        unsafe {
270            ffi::close(self.fd);
271        }
272    }
273}
274
275/// Set `O_NONBLOCK` on a raw fd (sockets and pipe ends alike).
276fn set_fd_nonblocking(fd: c_int) -> io::Result<()> {
277    let flags = unsafe { ffi::fcntl(fd, F_GETFL, 0) };
278    if flags < 0 {
279        return Err(io::Error::last_os_error());
280    }
281    if unsafe { ffi::fcntl(fd, F_SETFL, flags | O_NONBLOCK) } < 0 {
282        return Err(io::Error::last_os_error());
283    }
284    Ok(())
285}
286
287fn setsockopt_int(fd: c_int, level: c_int, name: c_int, val: c_int) -> io::Result<()> {
288    let r = unsafe {
289        ffi::setsockopt(
290            fd,
291            level,
292            name,
293            &val as *const c_int as *const c_void,
294            size_of::<c_int>() as u32,
295        )
296    };
297    if r < 0 {
298        return Err(io::Error::last_os_error());
299    }
300    Ok(())
301}
302
303fn listen_inner(ip: [u8; 4], port: u16, backlog: i32, reuseport: bool) -> io::Result<Socket> {
304    let fd = unsafe { ffi::socket(AF_INET, SOCK_STREAM, 0) };
305    if fd < 0 {
306        return Err(io::Error::last_os_error());
307    }
308    let sock = Socket { fd }; // owns fd: any early return closes it
309
310    setsockopt_int(fd, SOL_SOCKET, SO_REUSEADDR, 1)?;
311    if reuseport {
312        // Each thread-per-core shard opens its own listener on the same port;
313        // the kernel load-balances accepted connections across them.
314        setsockopt_int(fd, SOL_SOCKET, SO_REUSEPORT, 1)?;
315    }
316
317    let addr = SockaddrIn::new(ip, port);
318    let r = unsafe {
319        ffi::bind(
320            fd,
321            &addr as *const SockaddrIn as *const c_void,
322            size_of::<SockaddrIn>() as u32,
323        )
324    };
325    if r < 0 {
326        return Err(io::Error::last_os_error());
327    }
328    if unsafe { ffi::listen(fd, backlog) } < 0 {
329        return Err(io::Error::last_os_error());
330    }
331    Ok(sock)
332}
333
334/// Create a blocking IPv4 TCP listener bound to `ip:port` with `SO_REUSEADDR`.
335/// Pass `port == 0` to let the OS assign an ephemeral port.
336pub fn tcp_listen(ip: [u8; 4], port: u16, backlog: i32) -> io::Result<Socket> {
337    listen_inner(ip, port, backlog, false)
338}
339
340/// Like [`tcp_listen`] but also sets `SO_REUSEPORT`, so multiple listeners can
341/// share one port (one per thread-per-core shard).
342pub fn tcp_listen_reuseport(ip: [u8; 4], port: u16, backlog: i32) -> io::Result<Socket> {
343    listen_inner(ip, port, backlog, true)
344}
345
/// A self-pipe used to wake a [`Poller`] blocked in `wait` from another thread.
/// Register `read_fd()` in the poller for read-readiness; call `wake()` from any
/// thread to make the poll return; call `drain()` when the read end fires.
///
/// Both pipe ends are owned by this value and closed when it is dropped.
pub struct Waker {
    // Read end of the pipe: registered with the poller and emptied by `drain`.
    read_fd: c_int,
    // Write end of the pipe: `wake` writes a single byte here.
    write_fd: c_int,
}
353
354/// Create a non-blocking self-pipe waker.
355pub fn waker() -> io::Result<Waker> {
356    let mut fds = [0 as c_int; 2];
357    if unsafe { ffi::pipe(fds.as_mut_ptr()) } < 0 {
358        return Err(io::Error::last_os_error());
359    }
360    let w = Waker {
361        read_fd: fds[0],
362        write_fd: fds[1],
363    };
364    set_fd_nonblocking(w.read_fd)?;
365    set_fd_nonblocking(w.write_fd)?;
366    Ok(w)
367}
368
369impl Waker {
370    /// The read end — register this in a [`Poller`] for read-readiness.
371    #[inline]
372    pub fn read_fd(&self) -> i32 {
373        self.read_fd
374    }
375
376    /// Signal the waker. A full pipe already means "pending", so EAGAIN is fine.
377    pub fn wake(&self) -> io::Result<()> {
378        let byte = [1u8];
379        loop {
380            let n = unsafe { ffi::write(self.write_fd, byte.as_ptr() as *const c_void, 1) };
381            if n < 0 {
382                let e = io::Error::last_os_error();
383                match e.kind() {
384                    io::ErrorKind::Interrupted => continue,
385                    io::ErrorKind::WouldBlock => return Ok(()),
386                    _ => return Err(e),
387                }
388            }
389            return Ok(());
390        }
391    }
392
393    /// Consume all pending wake bytes after the read end fires.
394    pub fn drain(&self) {
395        let mut buf = [0u8; 64];
396        loop {
397            let n = unsafe { ffi::read(self.read_fd, buf.as_mut_ptr() as *mut c_void, buf.len()) };
398            if n <= 0 {
399                break; // EAGAIN / EOF / error — nothing more to drain
400            }
401        }
402    }
403}
404
405impl Drop for Waker {
406    fn drop(&mut self) {
407        unsafe {
408            ffi::close(self.read_fd);
409            ffi::close(self.write_fd);
410        }
411    }
412}
413
// The pipe ends are plain fds with no aliasing; safe to move across threads.
// Sharing by reference is fine too: every method takes `&self` and only
// issues `read`/`write` on the descriptors, with no state kept in the struct.
// NOTE(review): both fields are `c_int`, so `Waker` would already be
// `Send + Sync` through the auto traits; these impls look redundant and
// serve as an explicit statement of intent — confirm before removing.
unsafe impl Send for Waker {}
unsafe impl Sync for Waker {}
417
418// ---- Poller ----------------------------------------------------------------
419
/// A readiness notification for one file descriptor.
///
/// Plain `Copy` data. Nothing in this file constructs it — presumably the
/// platform `Poller` fills these in during `wait` (see the crate-level
/// example); confirm against the poller modules.
#[derive(Debug, Clone, Copy)]
pub struct Event {
    /// The descriptor this notification is about; compare against
    /// `Socket::raw()` / `Waker::read_fd()` to find its owner.
    pub fd: i32,
    /// The descriptor is reported ready for reading.
    pub readable: bool,
    /// The descriptor is reported ready for writing.
    pub writable: bool,
    /// Peer hang-up / error — the connection should be closed.
    pub hup: bool,
}
429
/// How many raw events to pull from the kernel per `wait` call.
// Not referenced in this file; presumably read by the platform poller
// modules (`poller_kq` / `poller_ep`) — confirm there.
const WAIT_CAPACITY: usize = 1024;
432
433
434
435#[cfg(test)]
436mod tests;