1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
//! SQE preparation helpers: each `prep_*` method queues one io_uring
//! submission queue entry (SQE) into the SQ ring. Split out of
//! [`crate::ring`] so that file stays under the 500-LOC house rule. Each
//! helper returns `false` if the SQ is full; the caller is expected to submit
//! and retry.
use core::ptr;
use crate::ffi::{
IORING_ACCEPT_MULTISHOT, IORING_OP_ACCEPT, IORING_OP_ASYNC_CANCEL, IORING_OP_NOP,
IORING_OP_READ, IORING_OP_RECV, IORING_OP_TIMEOUT, IORING_OP_WRITE, IORING_OP_WRITEV,
IORING_RECV_MULTISHOT, IOSQE_BUFFER_SELECT, IOSQE_FIXED_FILE, Iovec, SOCK_CLOEXEC,
SOCK_NONBLOCK,
};
use crate::layout::{IoUringSqe, KernelTimespec};
use crate::ring::IoUring;
impl IoUring {
/// Queue a `read(fd)` of `len` bytes into `buf`, tagged with `user_data`.
/// Returns `false` if the SQ is full.
///
/// # Safety
/// `buf` must point to `len` writable bytes and stay valid until the matching
/// completion is reaped.
pub unsafe fn prep_read(&mut self, fd: i32, buf: *mut u8, len: u32, user_data: u64) -> bool {
let Some(idx) = self.reserve() else {
return false;
};
// SAFETY: `idx` is a freshly reserved, in-bounds SQE slot we own alone.
unsafe {
ptr::write(
self.sqes_ptr().add(idx),
IoUringSqe::new(IORING_OP_READ, fd, buf as u64, len, user_data),
);
}
true
}
/// Queue a `write(fd)` of `len` bytes from `buf`, tagged with `user_data`.
/// Returns `false` if the SQ is full.
///
/// # Safety
/// `buf` must point to `len` readable bytes and stay valid until the matching
/// completion is reaped.
pub unsafe fn prep_write(&mut self, fd: i32, buf: *const u8, len: u32, user_data: u64) -> bool {
let Some(idx) = self.reserve() else {
return false;
};
// SAFETY: `idx` is a freshly reserved, in-bounds SQE slot we own alone.
unsafe {
ptr::write(
self.sqes_ptr().add(idx),
IoUringSqe::new(IORING_OP_WRITE, fd, buf as u64, len, user_data),
);
}
true
}
/// Queue a `writev(fd, iov, iovcnt)`. L1 (2026-06-21): the reactor's
/// reply path uses this to fuse [header iovec, value-borrow iovec,
/// CRLF iovec] into one syscall — the per-GET memcpy of the value
/// into the conn output buffer is avoided.
///
/// # Safety
/// `iov` must point to `iovcnt` valid `Iovec` entries, each `iov_base`
/// pointing to a readable byte range of length `iov_len`. The kernel
/// reads the iovec array AND each base asynchronously — both must
/// stay valid until the matching completion is reaped (the reactor
/// parks them in the conn's pending-writev state and drops on CQE).
pub unsafe fn prep_writev(
&mut self,
fd: i32,
iov: *const Iovec,
iovcnt: u32,
user_data: u64,
) -> bool {
let Some(idx) = self.reserve() else {
return false;
};
// SAFETY: `idx` is a freshly reserved, in-bounds SQE slot we own
// alone. addr = iov pointer; len = iovcnt; off field (unused here)
// stays 0.
unsafe {
ptr::write(
self.sqes_ptr().add(idx),
IoUringSqe::new(IORING_OP_WRITEV, fd, iov as u64, iovcnt, user_data),
);
}
true
}
/// Queue a **multishot** `recv(fd)` that draws its destination buffer from
/// the provided-buffer group `bgid` (see [`IoUring::register_buf_ring`]): one
/// SQE re-fires a completion per arrival, the kernel picking + reporting a
/// buffer id each time, until it terminates (error / `ENOBUFS`, signalled by
/// [`crate::Completion::has_more`] returning `false`). No per-recv SQE, no read
/// buffer to keep alive. Returns `false` if the SQ is full.
pub fn prep_recv_multishot(&mut self, fd: i32, bgid: u16, user_data: u64) -> bool {
let Some(idx) = self.reserve() else {
return false;
};
// SAFETY: `idx` is a freshly reserved, in-bounds SQE slot we own alone.
unsafe {
let sqe = self.sqes_ptr().add(idx);
// addr/len 0: the buffer comes from the group, not from us.
ptr::write(sqe, IoUringSqe::new(IORING_OP_RECV, fd, 0, 0, user_data));
(*sqe).ioprio = IORING_RECV_MULTISHOT;
(*sqe).flags = IOSQE_BUFFER_SELECT;
// `buf_index` aliases `buf_group` in the kernel ABI.
(*sqe).buf_index = bgid;
}
true
}
/// Same as [`Self::prep_write`] but addresses the destination by
/// registered-files **slot index** instead of raw fd. Sets
/// `IOSQE_FIXED_FILE`; the kernel skips its per-op `fget`/`fput`. Caller
/// must have populated `slot` via
/// [`crate::IoUring::update_file_slot`].
///
/// # Safety
/// Same as `prep_write`.
pub unsafe fn prep_write_fixed(
&mut self,
slot: u32,
buf: *const u8,
len: u32,
user_data: u64,
) -> bool {
let Some(idx) = self.reserve() else {
return false;
};
// SAFETY: `idx` is a freshly reserved, in-bounds SQE slot we own alone.
unsafe {
let sqe = self.sqes_ptr().add(idx);
ptr::write(
sqe,
IoUringSqe::new(IORING_OP_WRITE, slot as i32, buf as u64, len, user_data),
);
(*sqe).flags = IOSQE_FIXED_FILE;
}
true
}
/// Same as [`Self::prep_recv_multishot`] but addresses the source by
/// registered-files **slot index** instead of raw fd. Sets
/// `IOSQE_FIXED_FILE | IOSQE_BUFFER_SELECT`; the kernel skips its
/// per-op `fget`/`fput`. Caller must have populated `slot` via
/// [`crate::IoUring::update_file_slot`].
pub fn prep_recv_multishot_fixed(
&mut self,
slot: u32,
bgid: u16,
user_data: u64,
) -> bool {
let Some(idx) = self.reserve() else {
return false;
};
// SAFETY: `idx` is a freshly reserved, in-bounds SQE slot we own alone.
unsafe {
let sqe = self.sqes_ptr().add(idx);
ptr::write(
sqe,
IoUringSqe::new(IORING_OP_RECV, slot as i32, 0, 0, user_data),
);
(*sqe).ioprio = IORING_RECV_MULTISHOT;
(*sqe).flags = IOSQE_BUFFER_SELECT | IOSQE_FIXED_FILE;
(*sqe).buf_index = bgid;
}
true
}
/// Queue an `accept` on `listen_fd`; the accepted fd arrives as the
/// completion's `res` (already `O_NONBLOCK | O_CLOEXEC`). Returns `false` if
/// the SQ is full.
pub fn prep_accept(&mut self, listen_fd: i32, user_data: u64) -> bool {
let Some(idx) = self.reserve() else {
return false;
};
// SAFETY: `idx` is a freshly reserved, in-bounds SQE slot we own alone.
unsafe {
let sqe = self.sqes_ptr().add(idx);
ptr::write(
sqe,
IoUringSqe::new(IORING_OP_ACCEPT, listen_fd, 0, 0, user_data),
);
(*sqe).rw_flags = SOCK_NONBLOCK | SOCK_CLOEXEC;
}
true
}
/// Queue a **multishot** accept on `listen_fd` (Linux 5.19+). The kernel
/// keeps one SQE armed across many connections — each new fd arrives as
/// its own CQE with `IORING_CQE_F_MORE` set in `flags` while still armed.
/// When `F_MORE` is clear the multishot has terminated and userland must
/// re-arm via this fn (or fall back to [`Self::prep_accept`]). Caller
/// must keep `user_data` stable across the run of one multishot — each
/// CQE replays the same tag.
///
/// B4 (2026-06-20): replaces the one-SQE-per-accept call site in
/// `kevy_rt::uring_reactor`. At -c1 (one persistent conn) zero
/// difference; under high-conn-churn workloads cuts an SQE + an
/// `arm_conns`-loop trip per accept.
pub fn prep_accept_multishot(&mut self, listen_fd: i32, user_data: u64) -> bool {
let Some(idx) = self.reserve() else {
return false;
};
// SAFETY: `idx` is a freshly reserved, in-bounds SQE slot we own alone.
unsafe {
let sqe = self.sqes_ptr().add(idx);
ptr::write(
sqe,
IoUringSqe::new(IORING_OP_ACCEPT, listen_fd, 0, 0, user_data),
);
(*sqe).rw_flags = SOCK_NONBLOCK | SOCK_CLOEXEC;
(*sqe).ioprio = IORING_ACCEPT_MULTISHOT;
}
true
}
/// Queue a relative timeout: the completion (res = `-ETIME`) arrives once
/// `ts` elapses, or earlier with res = 0 / `-ECANCELED` if the ring shuts
/// down. Bounds a blocking [`IoUring::submit_and_wait`] the way a poller's
/// wait-timeout would. Returns `false` if the SQ is full.
///
/// # Safety
/// `ts` must stay valid (not moved or dropped) until the matching
/// completion is reaped — the kernel reads it asynchronously.
pub unsafe fn prep_timeout(&mut self, ts: *const KernelTimespec, user_data: u64) -> bool {
let Some(idx) = self.reserve() else {
return false;
};
// SAFETY: `idx` is a freshly reserved, in-bounds SQE slot we own alone.
unsafe {
let sqe = self.sqes_ptr().add(idx);
// addr = timespec ptr, len = 1 (one timespec), off = 0 (pure
// timeout — no completion-count trigger), rw_flags = 0 (relative).
ptr::write(
sqe,
IoUringSqe::new(IORING_OP_TIMEOUT, -1, ts as u64, 1, user_data),
);
}
true
}
/// Queue a no-op tagged with `user_data` (used to prove the round-trip).
/// Returns `false` if the SQ is full.
pub fn prep_nop(&mut self, user_data: u64) -> bool {
let Some(idx) = self.reserve() else {
return false;
};
// SAFETY: `idx` is a freshly reserved, in-bounds SQE slot we own alone.
unsafe {
ptr::write(
self.sqes_ptr().add(idx),
IoUringSqe::new(IORING_OP_NOP, -1, 0, 0, user_data),
);
}
true
}
/// Queue an `IORING_OP_ASYNC_CANCEL` SQE targeting a previously-armed
/// SQE whose `user_data == target`. Used by v1.29 B2-alt to cancel
/// an in-flight multishot recv before switching the conn to single-
/// shot `prep_read` for big-arg ingest.
///
/// The kernel emits two CQEs:
/// - one for THIS cancel SQE, tagged `user_data`: `res = 0` on
/// success, `-ENOENT` if no matching target found (already
/// completed / never existed), `-EALREADY` if the target had
/// already started executing.
/// - one for the target SQE, tagged with the target's user_data:
/// `res = -ECANCELED` (`-125`) when cancellation succeeded.
///
/// Caller MUST be prepared to reap both CQEs in either order; the
/// kernel does not guarantee ordering between the cancel-result CQE
/// and the target-cancelled CQE.
///
/// Returns `false` if the SQ is full.
pub fn prep_cancel(&mut self, target: u64, user_data: u64) -> bool {
let Some(idx) = self.reserve() else {
return false;
};
// SAFETY: `idx` is a freshly reserved, in-bounds SQE slot we own alone.
// fd is unused for ASYNC_CANCEL; addr carries the target user_data.
unsafe {
ptr::write(
self.sqes_ptr().add(idx),
IoUringSqe::new(IORING_OP_ASYNC_CANCEL, -1, target, 0, user_data),
);
}
true
}
}