Skip to main content

kevy_uring/
prep.rs

1//! SQE preparation helpers — `prep_*` queue one io_uring submission entry
2//! into the SQ ring. Split out of [`crate::ring`] so that file stays under
3//! the 500-LOC house rule. Each helper returns `false` if the SQ is full;
4//! the caller is expected to submit and retry.
5
6use core::ptr;
7
8use crate::ffi::{
9    IORING_ACCEPT_MULTISHOT, IORING_OP_ACCEPT, IORING_OP_ASYNC_CANCEL, IORING_OP_NOP,
10    IORING_OP_READ, IORING_OP_RECV, IORING_OP_TIMEOUT, IORING_OP_WRITE, IORING_OP_WRITEV,
11    IORING_RECV_MULTISHOT, IOSQE_BUFFER_SELECT, IOSQE_FIXED_FILE, Iovec, SOCK_CLOEXEC,
12    SOCK_NONBLOCK,
13};
14use crate::layout::{IoUringSqe, KernelTimespec};
15use crate::ring::IoUring;
16
17impl IoUring {
18    /// Queue a `read(fd)` of `len` bytes into `buf`, tagged with `user_data`.
19    /// Returns `false` if the SQ is full.
20    ///
21    /// # Safety
22    /// `buf` must point to `len` writable bytes and stay valid until the matching
23    /// completion is reaped.
24    pub unsafe fn prep_read(&mut self, fd: i32, buf: *mut u8, len: u32, user_data: u64) -> bool {
25        let Some(idx) = self.reserve() else {
26            return false;
27        };
28        // SAFETY: `idx` is a freshly reserved, in-bounds SQE slot we own alone.
29        unsafe {
30            ptr::write(
31                self.sqes_ptr().add(idx),
32                IoUringSqe::new(IORING_OP_READ, fd, buf as u64, len, user_data),
33            );
34        }
35        true
36    }
37
38    /// Queue a `write(fd)` of `len` bytes from `buf`, tagged with `user_data`.
39    /// Returns `false` if the SQ is full.
40    ///
41    /// # Safety
42    /// `buf` must point to `len` readable bytes and stay valid until the matching
43    /// completion is reaped.
44    pub unsafe fn prep_write(&mut self, fd: i32, buf: *const u8, len: u32, user_data: u64) -> bool {
45        let Some(idx) = self.reserve() else {
46            return false;
47        };
48        // SAFETY: `idx` is a freshly reserved, in-bounds SQE slot we own alone.
49        unsafe {
50            ptr::write(
51                self.sqes_ptr().add(idx),
52                IoUringSqe::new(IORING_OP_WRITE, fd, buf as u64, len, user_data),
53            );
54        }
55        true
56    }
57
58    /// Queue a `writev(fd, iov, iovcnt)`. L1 (2026-06-21): the reactor's
59    /// reply path uses this to fuse [header iovec, value-borrow iovec,
60    /// CRLF iovec] into one syscall — the per-GET memcpy of the value
61    /// into the conn output buffer is avoided.
62    ///
63    /// # Safety
64    /// `iov` must point to `iovcnt` valid `Iovec` entries, each `iov_base`
65    /// pointing to a readable byte range of length `iov_len`. The kernel
66    /// reads the iovec array AND each base asynchronously — both must
67    /// stay valid until the matching completion is reaped (the reactor
68    /// parks them in the conn's pending-writev state and drops on CQE).
69    pub unsafe fn prep_writev(
70        &mut self,
71        fd: i32,
72        iov: *const Iovec,
73        iovcnt: u32,
74        user_data: u64,
75    ) -> bool {
76        let Some(idx) = self.reserve() else {
77            return false;
78        };
79        // SAFETY: `idx` is a freshly reserved, in-bounds SQE slot we own
80        // alone. addr = iov pointer; len = iovcnt; off field (unused here)
81        // stays 0.
82        unsafe {
83            ptr::write(
84                self.sqes_ptr().add(idx),
85                IoUringSqe::new(IORING_OP_WRITEV, fd, iov as u64, iovcnt, user_data),
86            );
87        }
88        true
89    }
90
91    /// Queue a **multishot** `recv(fd)` that draws its destination buffer from
92    /// the provided-buffer group `bgid` (see [`IoUring::register_buf_ring`]): one
93    /// SQE re-fires a completion per arrival, the kernel picking + reporting a
94    /// buffer id each time, until it terminates (error / `ENOBUFS`, signalled by
95    /// [`crate::Completion::has_more`] returning `false`). No per-recv SQE, no read
96    /// buffer to keep alive. Returns `false` if the SQ is full.
97    pub fn prep_recv_multishot(&mut self, fd: i32, bgid: u16, user_data: u64) -> bool {
98        let Some(idx) = self.reserve() else {
99            return false;
100        };
101        // SAFETY: `idx` is a freshly reserved, in-bounds SQE slot we own alone.
102        unsafe {
103            let sqe = self.sqes_ptr().add(idx);
104            // addr/len 0: the buffer comes from the group, not from us.
105            ptr::write(sqe, IoUringSqe::new(IORING_OP_RECV, fd, 0, 0, user_data));
106            (*sqe).ioprio = IORING_RECV_MULTISHOT;
107            (*sqe).flags = IOSQE_BUFFER_SELECT;
108            // `buf_index` aliases `buf_group` in the kernel ABI.
109            (*sqe).buf_index = bgid;
110        }
111        true
112    }
113
114    /// Same as [`Self::prep_write`] but addresses the destination by
115    /// registered-files **slot index** instead of raw fd. Sets
116    /// `IOSQE_FIXED_FILE`; the kernel skips its per-op `fget`/`fput`. Caller
117    /// must have populated `slot` via
118    /// [`crate::IoUring::update_file_slot`].
119    ///
120    /// # Safety
121    /// Same as `prep_write`.
122    pub unsafe fn prep_write_fixed(
123        &mut self,
124        slot: u32,
125        buf: *const u8,
126        len: u32,
127        user_data: u64,
128    ) -> bool {
129        let Some(idx) = self.reserve() else {
130            return false;
131        };
132        // SAFETY: `idx` is a freshly reserved, in-bounds SQE slot we own alone.
133        unsafe {
134            let sqe = self.sqes_ptr().add(idx);
135            ptr::write(
136                sqe,
137                IoUringSqe::new(IORING_OP_WRITE, slot as i32, buf as u64, len, user_data),
138            );
139            (*sqe).flags = IOSQE_FIXED_FILE;
140        }
141        true
142    }
143
144    /// Same as [`Self::prep_recv_multishot`] but addresses the source by
145    /// registered-files **slot index** instead of raw fd. Sets
146    /// `IOSQE_FIXED_FILE | IOSQE_BUFFER_SELECT`; the kernel skips its
147    /// per-op `fget`/`fput`. Caller must have populated `slot` via
148    /// [`crate::IoUring::update_file_slot`].
149    pub fn prep_recv_multishot_fixed(
150        &mut self,
151        slot: u32,
152        bgid: u16,
153        user_data: u64,
154    ) -> bool {
155        let Some(idx) = self.reserve() else {
156            return false;
157        };
158        // SAFETY: `idx` is a freshly reserved, in-bounds SQE slot we own alone.
159        unsafe {
160            let sqe = self.sqes_ptr().add(idx);
161            ptr::write(
162                sqe,
163                IoUringSqe::new(IORING_OP_RECV, slot as i32, 0, 0, user_data),
164            );
165            (*sqe).ioprio = IORING_RECV_MULTISHOT;
166            (*sqe).flags = IOSQE_BUFFER_SELECT | IOSQE_FIXED_FILE;
167            (*sqe).buf_index = bgid;
168        }
169        true
170    }
171
172    /// Queue an `accept` on `listen_fd`; the accepted fd arrives as the
173    /// completion's `res` (already `O_NONBLOCK | O_CLOEXEC`). Returns `false` if
174    /// the SQ is full.
175    pub fn prep_accept(&mut self, listen_fd: i32, user_data: u64) -> bool {
176        let Some(idx) = self.reserve() else {
177            return false;
178        };
179        // SAFETY: `idx` is a freshly reserved, in-bounds SQE slot we own alone.
180        unsafe {
181            let sqe = self.sqes_ptr().add(idx);
182            ptr::write(
183                sqe,
184                IoUringSqe::new(IORING_OP_ACCEPT, listen_fd, 0, 0, user_data),
185            );
186            (*sqe).rw_flags = SOCK_NONBLOCK | SOCK_CLOEXEC;
187        }
188        true
189    }
190
191    /// Queue a **multishot** accept on `listen_fd` (Linux 5.19+). The kernel
192    /// keeps one SQE armed across many connections — each new fd arrives as
193    /// its own CQE with `IORING_CQE_F_MORE` set in `flags` while still armed.
194    /// When `F_MORE` is clear the multishot has terminated and userland must
195    /// re-arm via this fn (or fall back to [`Self::prep_accept`]). Caller
196    /// must keep `user_data` stable across the run of one multishot — each
197    /// CQE replays the same tag.
198    ///
199    /// B4 (2026-06-20): replaces the one-SQE-per-accept call site in
200    /// `kevy_rt::uring_reactor`. At -c1 (one persistent conn) zero
201    /// difference; under high-conn-churn workloads cuts an SQE + an
202    /// `arm_conns`-loop trip per accept.
203    pub fn prep_accept_multishot(&mut self, listen_fd: i32, user_data: u64) -> bool {
204        let Some(idx) = self.reserve() else {
205            return false;
206        };
207        // SAFETY: `idx` is a freshly reserved, in-bounds SQE slot we own alone.
208        unsafe {
209            let sqe = self.sqes_ptr().add(idx);
210            ptr::write(
211                sqe,
212                IoUringSqe::new(IORING_OP_ACCEPT, listen_fd, 0, 0, user_data),
213            );
214            (*sqe).rw_flags = SOCK_NONBLOCK | SOCK_CLOEXEC;
215            (*sqe).ioprio = IORING_ACCEPT_MULTISHOT;
216        }
217        true
218    }
219
220    /// Queue a relative timeout: the completion (res = `-ETIME`) arrives once
221    /// `ts` elapses, or earlier with res = 0 / `-ECANCELED` if the ring shuts
222    /// down. Bounds a blocking [`IoUring::submit_and_wait`] the way a poller's
223    /// wait-timeout would. Returns `false` if the SQ is full.
224    ///
225    /// # Safety
226    /// `ts` must stay valid (not moved or dropped) until the matching
227    /// completion is reaped — the kernel reads it asynchronously.
228    pub unsafe fn prep_timeout(&mut self, ts: *const KernelTimespec, user_data: u64) -> bool {
229        let Some(idx) = self.reserve() else {
230            return false;
231        };
232        // SAFETY: `idx` is a freshly reserved, in-bounds SQE slot we own alone.
233        unsafe {
234            let sqe = self.sqes_ptr().add(idx);
235            // addr = timespec ptr, len = 1 (one timespec), off = 0 (pure
236            // timeout — no completion-count trigger), rw_flags = 0 (relative).
237            ptr::write(
238                sqe,
239                IoUringSqe::new(IORING_OP_TIMEOUT, -1, ts as u64, 1, user_data),
240            );
241        }
242        true
243    }
244
245    /// Queue a no-op tagged with `user_data` (used to prove the round-trip).
246    /// Returns `false` if the SQ is full.
247    pub fn prep_nop(&mut self, user_data: u64) -> bool {
248        let Some(idx) = self.reserve() else {
249            return false;
250        };
251        // SAFETY: `idx` is a freshly reserved, in-bounds SQE slot we own alone.
252        unsafe {
253            ptr::write(
254                self.sqes_ptr().add(idx),
255                IoUringSqe::new(IORING_OP_NOP, -1, 0, 0, user_data),
256            );
257        }
258        true
259    }
260
261    /// Queue an `IORING_OP_ASYNC_CANCEL` SQE targeting a previously-armed
262    /// SQE whose `user_data == target`. Used by v1.29 B2-alt to cancel
263    /// an in-flight multishot recv before switching the conn to single-
264    /// shot `prep_read` for big-arg ingest.
265    ///
266    /// The kernel emits two CQEs:
267    /// - one for THIS cancel SQE, tagged `user_data`: `res = 0` on
268    ///   success, `-ENOENT` if no matching target found (already
269    ///   completed / never existed), `-EALREADY` if the target had
270    ///   already started executing.
271    /// - one for the target SQE, tagged with the target's user_data:
272    ///   `res = -ECANCELED` (`-125`) when cancellation succeeded.
273    ///
274    /// Caller MUST be prepared to reap both CQEs in either order; the
275    /// kernel does not guarantee ordering between the cancel-result CQE
276    /// and the target-cancelled CQE.
277    ///
278    /// Returns `false` if the SQ is full.
279    pub fn prep_cancel(&mut self, target: u64, user_data: u64) -> bool {
280        let Some(idx) = self.reserve() else {
281            return false;
282        };
283        // SAFETY: `idx` is a freshly reserved, in-bounds SQE slot we own alone.
284        // fd is unused for ASYNC_CANCEL; addr carries the target user_data.
285        unsafe {
286            ptr::write(
287                self.sqes_ptr().add(idx),
288                IoUringSqe::new(IORING_OP_ASYNC_CANCEL, -1, target, 0, user_data),
289            );
290        }
291        true
292    }
293}