1#![allow(unsafe_code)]
9#![allow(non_camel_case_types)]
10#![allow(dead_code)]
11#![allow(missing_docs)]
15
16use crate::PipelineError;
17use core::mem;
18use core::ptr;
19
/// `io_uring_params.features` bit: the SQ and CQ rings live in one shared mapping.
const IORING_FEAT_SINGLE_MMAP: u32 = 0b01;
/// `io_uring_params.flags` bit: request a kernel submission-queue polling thread.
const IORING_SETUP_SQPOLL: u32 = 0b10;
/// `io_uring_enter` flag: wake the SQ polling thread if it has gone idle.
const IORING_ENTER_SQ_WAKEUP: u32 = 0b10;
/// SQ ring `flags` bit set by the kernel when the SQ polling thread needs a wakeup.
const IORING_SQ_NEED_WAKEUP: u32 = 0b01;

// mmap offsets selecting which kernel region of the ring fd is mapped.
const IORING_OFF_SQ_RING: u64 = 0x0000_0000;
const IORING_OFF_CQ_RING: u64 = 0x0800_0000;
const IORING_OFF_SQES: u64 = 0x1000_0000;

// `io_uring_register` opcodes used by this module.
const IORING_REGISTER_BUFFERS: u32 = 0;
const IORING_REGISTER_FILES: u32 = 2;

/// SQE `flags` bit: treat the SQE's `fd` as a slot in the registered file table.
pub const IOSQE_FIXED_FILE: u8 = 0b1;
36
/// Byte offsets of the submission-queue ring fields inside the SQ ring
/// mapping, filled in by the kernel during `io_uring_setup`.
///
/// Mirrors `struct io_sqring_offsets` from `<linux/io_uring.h>`; field order
/// and widths are ABI and must not change.
#[repr(C)]
#[derive(Debug, Default, Clone, Copy)]
pub struct io_sqring_offsets {
    pub head: u32,
    pub tail: u32,
    pub ring_mask: u32,
    pub ring_entries: u32,
    /// Offset of the kernel-written SQ flags word (e.g. the need-wakeup bit).
    pub flags: u32,
    pub dropped: u32,
    /// Offset of the `u32` index array that maps ring slots to SQE indices.
    pub array: u32,
    pub resv1: u32,
    pub resv2: u64,
}

// The kernel writes this struct by value inside `io_uring_params`; a layout
// mismatch would silently misplace every offset, so pin the ABI at compile time.
const _: () = assert!(core::mem::size_of::<io_sqring_offsets>() == 40);
const _: () = assert!(core::mem::align_of::<io_sqring_offsets>() == 8);
52
/// Byte offsets of the completion-queue ring fields inside the CQ ring
/// mapping, filled in by the kernel during `io_uring_setup`.
///
/// Mirrors `struct io_cqring_offsets` from `<linux/io_uring.h>`; field order
/// and widths are ABI and must not change.
#[repr(C)]
#[derive(Debug, Default, Clone, Copy)]
pub struct io_cqring_offsets {
    pub head: u32,
    pub tail: u32,
    pub ring_mask: u32,
    pub ring_entries: u32,
    pub overflow: u32,
    /// Offset of the first `io_uring_cqe` in the CQ ring mapping.
    pub cqes: u32,
    pub flags: u32,
    pub resv1: u32,
    pub resv2: u64,
}

// Pin the kernel ABI layout so drift is a compile error, not memory corruption.
const _: () = assert!(core::mem::size_of::<io_cqring_offsets>() == 40);
const _: () = assert!(core::mem::align_of::<io_cqring_offsets>() == 8);
66
67#[repr(C)]
68#[derive(Debug, Default, Clone, Copy)]
69pub struct io_uring_params {
70 pub sq_entries: u32,
71 pub cq_entries: u32,
72 pub flags: u32,
73 pub sq_thread_cpu: u32,
74 pub sq_thread_idle: u32,
75 pub features: u32,
76 pub wq_fd: u32,
77 pub resv: [u32; 3],
78 pub sq_off: io_sqring_offsets,
79 pub cq_off: io_cqring_offsets,
80}
81
/// One 64-byte submission queue entry, mirroring `struct io_uring_sqe` from
/// `<linux/io_uring.h>`. Several kernel fields are unions; this struct exposes
/// a single name per slot. Field order and widths are ABI and must not change.
#[repr(C)]
#[derive(Debug, Default, Clone, Copy)]
pub struct io_uring_sqe {
    pub opcode: u8,
    pub flags: u8,
    pub ioprio: u16,
    pub fd: i32,
    /// Byte offset 8: the kernel's `off`/`addr2` slot (for example a file
    /// offset). Despite the name, this is NOT the completion tag; that is
    /// `user_data` further down.
    pub user_data_or_off: u64,
    pub addr: u64,
    pub len: u32,
    /// Per-opcode flags union (e.g. `rw_flags`, `fsync_flags`).
    pub op_flags: u32,
    /// Opaque tag echoed back in the matching `io_uring_cqe::user_data`.
    pub user_data: u64,
    pub buf_index: u16,
    pub personality: u16,
    pub file_index: i32,
    pub addr3: u64,
    pub __pad2: [u64; 1],
}

// SQEs are indexed as an array in kernel-shared memory; the stride must be
// exactly the kernel's 64 bytes.
const _: () = assert!(core::mem::size_of::<io_uring_sqe>() == 64);
const _: () = assert!(core::mem::align_of::<io_uring_sqe>() == 8);
100
/// One completion queue entry, mirroring the base 16-byte
/// `struct io_uring_cqe` from `<linux/io_uring.h>`.
#[repr(C)]
#[derive(Debug, Default, Clone, Copy)]
pub struct io_uring_cqe {
    /// Tag copied from the originating SQE's `user_data`.
    pub user_data: u64,
    /// Operation result: non-negative on success, negated errno on failure.
    pub res: i32,
    pub flags: u32,
}

// CQEs are indexed as an array in the CQ ring mapping; pin the 16-byte stride.
const _: () = assert!(core::mem::size_of::<io_uring_cqe>() == 16);
const _: () = assert!(core::mem::align_of::<io_uring_cqe>() == 8);
108
109pub struct IoUringState {
121 ring_fd: i32,
122 sq_ring_ptr: *mut libc::c_void,
123 sq_ring_size: usize,
124 cq_ring_ptr: *mut libc::c_void,
125 cq_ring_size: usize,
126 sqes_ptr: *mut libc::c_void,
127 sqes_size: usize,
128 params: io_uring_params,
129}
130
131unsafe impl Send for IoUringState {}
133unsafe impl Sync for IoUringState {}
134
135impl IoUringState {
136 pub fn new(entries: u32) -> Result<Self, PipelineError> {
148 let mut params: io_uring_params = unsafe { mem::zeroed() };
150
151 params.flags |= IORING_SETUP_SQPOLL;
156 params.sq_thread_idle = 2000;
157
158 let ring_fd = unsafe {
160 libc::syscall(
161 libc::SYS_io_uring_setup,
162 entries,
163 &mut params as *mut io_uring_params,
164 )
165 };
166
167 if ring_fd < 0 {
168 return Err(PipelineError::IoUringSyscall {
169 syscall: "io_uring_setup",
170 errno: val_to_err(),
171 fix: "check kernel version (>= 5.1 required), CAP_SYS_ADMIN for SQPOLL on < 5.13, and nofile ulimit",
172 });
173 }
174
175 let ring_fd = syscall_result_i32(
176 ring_fd,
177 "io_uring_setup",
178 "io_uring_setup returned an fd outside i32; check libc/kernel ABI bindings",
179 )?;
180
181 let sq_ring_size = kernel_ring_span_usize(
182 params.sq_off.array,
183 params.sq_entries,
184 mem::size_of::<u32>(),
185 "SQ ring",
186 )?;
187 let cq_ring_size = kernel_ring_span_usize(
188 params.cq_off.cqes,
189 params.cq_entries,
190 mem::size_of::<io_uring_cqe>(),
191 "CQ ring",
192 )?;
193
194 let (sq_size, cq_size) = if (params.features & IORING_FEAT_SINGLE_MMAP) != 0 {
195 let max_size = core::cmp::max(sq_ring_size, cq_ring_size);
196 (max_size, max_size)
197 } else {
198 (sq_ring_size, cq_ring_size)
199 };
200
201 let sq_ring_ptr = unsafe {
203 libc::mmap(
204 ptr::null_mut(),
205 sq_size,
206 libc::PROT_READ | libc::PROT_WRITE,
207 libc::MAP_SHARED | libc::MAP_POPULATE,
208 ring_fd,
209 IORING_OFF_SQ_RING as libc::off_t,
210 )
211 };
212
213 if sq_ring_ptr == libc::MAP_FAILED {
214 let err = val_to_err();
215 unsafe {
218 libc::close(ring_fd);
219 }
220 return Err(PipelineError::IoUringSyscall {
221 syscall: "mmap(sq_ring)",
222 errno: err,
223 fix: "check /proc/sys/vm/max_map_count and process memory limits",
224 });
225 }
226
227 let cq_ring_ptr = if (params.features & IORING_FEAT_SINGLE_MMAP) != 0 {
228 sq_ring_ptr
229 } else {
230 let ptr = unsafe {
233 libc::mmap(
234 ptr::null_mut(),
235 cq_size,
236 libc::PROT_READ | libc::PROT_WRITE,
237 libc::MAP_SHARED | libc::MAP_POPULATE,
238 ring_fd,
239 IORING_OFF_CQ_RING as libc::off_t,
240 )
241 };
242 if ptr == libc::MAP_FAILED {
243 let err = val_to_err();
244 unsafe {
247 libc::munmap(sq_ring_ptr, sq_size);
248 libc::close(ring_fd);
249 }
250 return Err(PipelineError::IoUringSyscall {
251 syscall: "mmap(cq_ring)",
252 errno: err,
253 fix: "check /proc/sys/vm/max_map_count and process memory limits",
254 });
255 }
256 ptr
257 };
258
259 let sqes_size = kernel_record_span_usize(
260 params.sq_entries,
261 mem::size_of::<io_uring_sqe>(),
262 "SQE table",
263 )?;
264 let sqes_ptr = unsafe {
266 libc::mmap(
267 ptr::null_mut(),
268 sqes_size,
269 libc::PROT_READ | libc::PROT_WRITE,
270 libc::MAP_SHARED | libc::MAP_POPULATE,
271 ring_fd,
272 IORING_OFF_SQES as libc::off_t,
273 )
274 };
275
276 if sqes_ptr == libc::MAP_FAILED {
277 let err = val_to_err();
278 unsafe {
281 if (params.features & IORING_FEAT_SINGLE_MMAP) == 0 {
282 libc::munmap(cq_ring_ptr, cq_size);
283 }
284 libc::munmap(sq_ring_ptr, sq_size);
285 libc::close(ring_fd);
286 }
287 return Err(PipelineError::IoUringSyscall {
288 syscall: "mmap(sqes)",
289 errno: err,
290 fix: "check /proc/sys/vm/max_map_count and process memory limits",
291 });
292 }
293
294 Ok(Self {
295 ring_fd,
296 sq_ring_ptr,
297 sq_ring_size: sq_size,
298 cq_ring_ptr,
299 cq_ring_size: cq_size,
300 sqes_ptr,
301 sqes_size,
302 params,
303 })
304 }
305
306 pub fn enter(
314 &self,
315 to_submit: u32,
316 min_complete: u32,
317 flags: u32,
318 ) -> Result<i32, PipelineError> {
319 let res = unsafe {
322 libc::syscall(
323 libc::SYS_io_uring_enter,
324 self.ring_fd,
325 to_submit,
326 min_complete,
327 flags,
328 ptr::null::<libc::sigset_t>(),
329 0, )
331 };
332 if res < 0 {
333 Err(PipelineError::IoUringSyscall {
334 syscall: "io_uring_enter",
335 errno: val_to_err(),
336 fix: "retry on EINTR/EBUSY; check SQPOLL thread health via /proc/<pid>/task/ on ENXIO",
337 })
338 } else {
339 syscall_result_i32(
340 res,
341 "io_uring_enter",
342 "io_uring_enter returned a completion count outside i32; check libc/kernel ABI bindings",
343 )
344 }
345 }
346
347 #[must_use]
349 pub fn uses_sqpoll(&self) -> bool {
350 (self.params.flags & IORING_SETUP_SQPOLL) != 0
351 }
352
353 #[must_use]
355 pub fn sq_needs_wakeup(&self) -> bool {
356 unsafe {
359 let flags = (*(self.sq_ring_ptr.add(kernel_offset_usize_or_panic(
360 self.params.sq_off.flags,
361 "SQ flags offset",
362 )) as *const core::sync::atomic::AtomicU32))
363 .load(core::sync::atomic::Ordering::Acquire);
364 (flags & IORING_SQ_NEED_WAKEUP) != 0
365 }
366 }
367
368 pub fn wake_sqpoll(&self) -> Result<i32, PipelineError> {
370 self.enter(0, 0, IORING_ENTER_SQ_WAKEUP)
371 }
372
373 pub fn get_sqe(&mut self) -> Option<&mut io_uring_sqe> {
375 unsafe {
377 let head = (*(self.sq_ring_ptr.add(kernel_offset_usize_or_panic(
378 self.params.sq_off.head,
379 "SQ head offset",
380 )) as *const core::sync::atomic::AtomicU32))
381 .load(core::sync::atomic::Ordering::Acquire);
382 let tail_ptr = self.sq_ring_ptr.add(kernel_offset_usize_or_panic(
383 self.params.sq_off.tail,
384 "SQ tail offset",
385 )) as *const core::sync::atomic::AtomicU32;
386 let tail = (*tail_ptr).load(core::sync::atomic::Ordering::Relaxed);
387 let ring_entries = *(self.sq_ring_ptr.add(kernel_offset_usize_or_panic(
388 self.params.sq_off.ring_entries,
389 "SQ ring_entries offset",
390 )) as *const u32);
391
392 if tail.wrapping_sub(head) < ring_entries {
393 let ring_mask = *(self.sq_ring_ptr.add(kernel_offset_usize_or_panic(
394 self.params.sq_off.ring_mask,
395 "SQ ring_mask offset",
396 )) as *const u32);
397 let idx = tail & ring_mask;
398 let sqes = self.sqes_ptr as *mut io_uring_sqe;
399 Some(&mut *sqes.add(kernel_offset_usize_or_panic(idx, "SQE index")))
400 } else {
401 None
402 }
403 }
404 }
405
406 pub fn commit_sqe(&mut self) {
408 unsafe {
410 let tail_ptr = self.sq_ring_ptr.add(kernel_offset_usize_or_panic(
411 self.params.sq_off.tail,
412 "SQ tail offset",
413 )) as *const core::sync::atomic::AtomicU32;
414 let tail = (*tail_ptr).load(core::sync::atomic::Ordering::Relaxed);
415 let array_ptr = self.sq_ring_ptr.add(kernel_offset_usize_or_panic(
416 self.params.sq_off.array,
417 "SQ array offset",
418 )) as *mut u32;
419 let ring_mask = *(self.sq_ring_ptr.add(kernel_offset_usize_or_panic(
420 self.params.sq_off.ring_mask,
421 "SQ ring_mask offset",
422 )) as *const u32);
423 let idx = tail & ring_mask;
424
425 *array_ptr.add(kernel_offset_usize_or_panic(idx, "SQ array index")) = idx;
426 (*(tail_ptr as *mut core::sync::atomic::AtomicU32))
427 .store(tail.wrapping_add(1), core::sync::atomic::Ordering::Release);
428 }
429 }
430
431 pub fn peek_cqe(&mut self) -> Option<&io_uring_cqe> {
433 unsafe {
435 let head_ptr = self.cq_ring_ptr.add(kernel_offset_usize_or_panic(
436 self.params.cq_off.head,
437 "CQ head offset",
438 )) as *const core::sync::atomic::AtomicU32;
439 let head = (*head_ptr).load(core::sync::atomic::Ordering::Relaxed);
440 let tail = (*(self.cq_ring_ptr.add(kernel_offset_usize_or_panic(
441 self.params.cq_off.tail,
442 "CQ tail offset",
443 )) as *const core::sync::atomic::AtomicU32))
444 .load(core::sync::atomic::Ordering::Acquire);
445
446 if head != tail {
447 let ring_mask = *(self.cq_ring_ptr.add(kernel_offset_usize_or_panic(
448 self.params.cq_off.ring_mask,
449 "CQ ring_mask offset",
450 )) as *const u32);
451 let idx = head & ring_mask;
452 let cqes = self.cq_ring_ptr.add(kernel_offset_usize_or_panic(
453 self.params.cq_off.cqes,
454 "CQ CQE base offset",
455 )) as *const io_uring_cqe;
456 Some(&*cqes.add(kernel_offset_usize_or_panic(idx, "CQE index")))
457 } else {
458 None
459 }
460 }
461 }
462
463 pub fn register_buffers(
475 &self,
476 iovecs: &[crate::uring::stream::Iovec],
477 ) -> Result<(), PipelineError> {
478 let res = unsafe {
480 libc::syscall(
481 libc::SYS_io_uring_register,
482 self.ring_fd,
483 IORING_REGISTER_BUFFERS,
484 iovecs.as_ptr() as *const core::ffi::c_void,
485 slice_len_u32(iovecs.len(), "registered buffer count")?,
486 )
487 };
488 if res < 0 {
489 Err(PipelineError::IoUringSyscall {
490 syscall: "io_uring_register(BUFFERS)",
491 errno: val_to_err(),
492 fix: "check /proc/sys/vm/max_user_watches; EOPNOTSUPP means kernel < 5.1",
493 })
494 } else {
495 Ok(())
496 }
497 }
498
499 pub fn register_files(&self, fds: &[i32]) -> Result<(), PipelineError> {
508 let res = unsafe {
510 libc::syscall(
511 libc::SYS_io_uring_register,
512 self.ring_fd,
513 IORING_REGISTER_FILES,
514 fds.as_ptr() as *const core::ffi::c_void,
515 slice_len_u32(fds.len(), "registered file count")?,
516 )
517 };
518 if res < 0 {
519 Err(PipelineError::IoUringSyscall {
520 syscall: "io_uring_register(FILES)",
521 errno: val_to_err(),
522 fix: "ensure every fd is still open; ENOMEM means lower the fd set size",
523 })
524 } else {
525 Ok(())
526 }
527 }
528
529 pub fn advance_cq(&mut self) {
531 unsafe {
533 let head_ptr = self.cq_ring_ptr.add(kernel_offset_usize_or_panic(
534 self.params.cq_off.head,
535 "CQ head offset",
536 )) as *mut core::sync::atomic::AtomicU32;
537 let head = (*head_ptr).load(core::sync::atomic::Ordering::Relaxed);
538 (*head_ptr).store(head.wrapping_add(1), core::sync::atomic::Ordering::Release);
539 }
540 }
541}
542
543impl Drop for IoUringState {
544 fn drop(&mut self) {
545 unsafe {
547 libc::munmap(self.sqes_ptr, self.sqes_size);
548 if self.sq_ring_ptr != self.cq_ring_ptr {
549 libc::munmap(self.cq_ring_ptr, self.cq_ring_size);
550 }
551 libc::munmap(self.sq_ring_ptr, self.sq_ring_size);
552 libc::close(self.ring_fd);
553 }
554 }
555}
556
557fn val_to_err() -> i32 {
558 unsafe { *libc::__errno_location() }
561}
562
563fn syscall_result_i32(
564 value: libc::c_long,
565 syscall: &'static str,
566 fix: &'static str,
567) -> Result<i32, PipelineError> {
568 i32::try_from(value).map_err(|_| PipelineError::IoUringSyscall {
569 syscall,
570 errno: libc::EOVERFLOW,
571 fix,
572 })
573}
574
575fn kernel_ring_span_usize(
576 base_offset: u32,
577 entries: u32,
578 record_bytes: usize,
579 label: &'static str,
580) -> Result<usize, PipelineError> {
581 let record_bytes = u32::try_from(record_bytes).map_err(|_| PipelineError::IoUringSyscall {
582 syscall: "io_uring_setup",
583 errno: libc::EOVERFLOW,
584 fix: match label {
585 "SQ ring" => {
586 "SQ ring record width cannot fit u32; use a supported kernel/userspace ABI"
587 }
588 "CQ ring" => {
589 "CQ ring record width cannot fit u32; use a supported kernel/userspace ABI"
590 }
591 _ => "io_uring record width cannot fit u32; use a supported kernel/userspace ABI",
592 },
593 })?;
594 let payload = vyre_driver::accounting::checked_mul_u32_value(
595 entries,
596 record_bytes,
597 PipelineError::IoUringSyscall {
598 syscall: "io_uring_setup",
599 errno: libc::EOVERFLOW,
600 fix: match label {
601 "SQ ring" => "SQ ring mmap size overflowed u32; reduce requested entries",
602 "CQ ring" => "CQ ring mmap size overflowed u32; reduce requested entries",
603 _ => "io_uring mmap size overflowed u32; reduce requested entries",
604 },
605 },
606 )?;
607 let bytes = vyre_driver::accounting::checked_add_u32_value(
608 base_offset,
609 payload,
610 PipelineError::IoUringSyscall {
611 syscall: "io_uring_setup",
612 errno: libc::EOVERFLOW,
613 fix: match label {
614 "SQ ring" => "SQ ring mmap span overflowed u32; reduce requested entries",
615 "CQ ring" => "CQ ring mmap span overflowed u32; reduce requested entries",
616 _ => "io_uring mmap span overflowed u32; reduce requested entries",
617 },
618 },
619 )?;
620 usize::try_from(bytes).map_err(|_| PipelineError::IoUringSyscall {
621 syscall: "io_uring_setup",
622 errno: libc::EOVERFLOW,
623 fix: match label {
624 "SQ ring" => "SQ ring mmap span cannot fit host usize; reduce requested entries",
625 "CQ ring" => "CQ ring mmap span cannot fit host usize; reduce requested entries",
626 _ => "io_uring mmap span cannot fit host usize; reduce requested entries",
627 },
628 })
629}
630
631fn kernel_record_span_usize(
632 entries: u32,
633 record_bytes: usize,
634 label: &'static str,
635) -> Result<usize, PipelineError> {
636 let entries = usize::try_from(entries).map_err(|_| PipelineError::IoUringSyscall {
637 syscall: "io_uring_setup",
638 errno: libc::EOVERFLOW,
639 fix: match label {
640 "SQE table" => "SQE entry count cannot fit host usize; reduce requested entries",
641 _ => "io_uring entry count cannot fit host usize; reduce requested entries",
642 },
643 })?;
644 vyre_driver::accounting::checked_mul_usize_lazy(entries, record_bytes, || {
645 PipelineError::IoUringSyscall {
646 syscall: "io_uring_setup",
647 errno: libc::EOVERFLOW,
648 fix: match label {
649 "SQE table" => "SQE table mmap size overflowed usize; reduce requested entries",
650 _ => "io_uring record mmap size overflowed usize; reduce requested entries",
651 },
652 }
653 })
654}
655
/// Widens a kernel-reported `u32` offset or index to `usize`.
///
/// # Panics
///
/// Panics, naming `label`, if `value` cannot be represented as `usize`
/// (only possible on targets where `usize` is narrower than 32 bits).
fn kernel_offset_usize_or_panic(value: u32, label: &'static str) -> usize {
    match usize::try_from(value) {
        Ok(widened) => widened,
        Err(source) => panic!(
            "io_uring {label} value {value} cannot fit usize: {source}. Fix: use a supported kernel/userspace ABI or reduce ring entries."
        ),
    }
}
663
664fn slice_len_u32(value: usize, label: &'static str) -> Result<u32, PipelineError> {
665 u32::try_from(value).map_err(|_| PipelineError::IoUringSyscall {
666 syscall: "io_uring_register",
667 errno: libc::EOVERFLOW,
668 fix: match label {
669 "registered buffer count" => {
670 "registered buffer count cannot fit u32; split fixed-buffer registration"
671 }
672 "registered file count" => {
673 "registered file count cannot fit u32; split fixed-file registration"
674 }
675 _ => "io_uring registration count cannot fit u32; split registration",
676 },
677 })
678}