1use std::{
2 ffi::CStr,
3 io, mem,
4 ops::{Deref, DerefMut},
5 os::unix::io::RawFd,
6 ptr,
7};
8
9use super::registrar::{UringFd, UringReadBuf, UringWriteBuf};
10
11use nix::sys::socket::{SockaddrLike, SockaddrStorage};
12pub use nix::{
13 fcntl::{FallocateFlags, OFlag, PosixFadviseAdvice},
14 poll::PollFlags,
15 sys::{
16 epoll::{EpollEvent, EpollOp},
17 mman::MmapAdvise,
18 socket::{MsgFlags, SockFlag},
19 stat::Mode,
20 },
21};
22
23use super::Personality;
24use crate::{sys::Statx, uring_sys};
25
/// A borrowed submission queue entry: a thin, typed wrapper around one raw
/// `io_uring_sqe` slot, exposing `prep_*` methods that fill it in.
pub struct SQE<'a> {
    // Exclusive borrow of a single entry in the ring's submission queue.
    sqe: &'a mut uring_sys::io_uring_sqe,
}
33
impl<'a> SQE<'a> {
    /// Wraps an exclusive reference to a raw submission queue entry.
    pub(crate) fn new(sqe: &'a mut uring_sys::io_uring_sqe) -> SQE<'a> {
        SQE { sqe }
    }

    /// Returns the `user_data` value stored in this entry. The kernel echoes
    /// this value back in the matching completion event.
    #[inline]
    pub fn user_data(&self) -> u64 {
        self.sqe.user_data
    }

    /// Stores `user_data` in this entry.
    ///
    /// # Safety
    ///
    /// Callers elsewhere may interpret `user_data` as something meaningful
    /// (e.g. a pointer used to route the completion); storing an arbitrary
    /// value must not violate those expectations.
    pub unsafe fn set_user_data(&mut self, user_data: u64) {
        self.sqe.user_data = user_data as _;
    }

    /// Returns the submission flags currently set on this entry.
    /// Bits not named in [`SubmissionFlags`] are retained as-is.
    #[inline]
    pub fn flags(&self) -> SubmissionFlags {
        SubmissionFlags::from_bits_retain(self.sqe.flags as _)
    }

    /// Replaces the entry's flag byte entirely, unlike [`SQE::set_flags`],
    /// which ORs new flags into the existing ones.
    pub fn overwrite_flags(&mut self, flags: SubmissionFlags) {
        self.sqe.flags = flags.bits() as _;
    }

    // Marks the entry's fd field as an index into the registered-files table.
    #[inline]
    pub(crate) fn set_fixed_file(&mut self) {
        self.set_flags(SubmissionFlags::FIXED_FILE);
    }

    /// ORs `flags` into the entry's submission flags.
    #[inline]
    pub fn set_flags(&mut self, flags: SubmissionFlags) {
        self.sqe.flags |= flags.bits();
    }

    /// Associates this entry with a registered [`Personality`] (by id).
    #[inline]
    pub fn set_personality(&mut self, personality: Personality) {
        // `personality` lives in a union shared with `buf_index`; writing a
        // union field is safe in Rust (only reading one requires `unsafe`).
        self.sqe.buf_index.buf_index.personality = personality.id;
    }

    /// Prepares a read from `fd` at `offset` into `buf`; the concrete opcode
    /// is chosen by the [`UringReadBuf`] implementation.
    ///
    /// # Safety
    ///
    /// The buffer must remain valid until the kernel completes the operation.
    #[inline]
    pub unsafe fn prep_read(&mut self, fd: impl UringFd, buf: impl UringReadBuf, offset: u64) {
        buf.prep_read(fd, self, offset);
    }

    /// Prepares a vectored read (`readv`) from `fd` at `offset`.
    ///
    /// # Safety
    ///
    /// Both the iovec array and the buffers it points to must remain valid
    /// until the kernel completes the operation.
    #[inline]
    pub unsafe fn prep_read_vectored(
        &mut self,
        fd: impl UringFd,
        bufs: &mut [io::IoSliceMut<'_>],
        offset: u64,
    ) {
        let len = bufs.len();
        let addr = bufs.as_mut_ptr();
        uring_sys::io_uring_prep_readv(self.sqe, fd.as_raw_fd(), addr as _, len as _, offset as _);
        fd.update_sqe(self);
    }

    /// Prepares a read into a pre-registered buffer identified by `buf_index`.
    ///
    /// # Safety
    ///
    /// `buf` must be (part of) the registered buffer at `buf_index` and must
    /// remain valid until the kernel completes the operation.
    #[inline]
    pub unsafe fn prep_read_fixed(
        &mut self,
        fd: impl UringFd,
        buf: &mut [u8],
        offset: u64,
        buf_index: u32,
    ) {
        let len = buf.len();
        let addr = buf.as_mut_ptr();
        uring_sys::io_uring_prep_read_fixed(
            self.sqe,
            fd.as_raw_fd(),
            addr as _,
            len as _,
            offset as _,
            buf_index as _,
        );
        fd.update_sqe(self);
    }

    /// Prepares a write to `fd` at `offset` from `buf`; the concrete opcode
    /// is chosen by the [`UringWriteBuf`] implementation.
    ///
    /// # Safety
    ///
    /// The buffer must remain valid until the kernel completes the operation.
    #[inline]
    pub unsafe fn prep_write(&mut self, fd: impl UringFd, buf: impl UringWriteBuf, offset: u64) {
        buf.prep_write(fd, self, offset)
    }

    /// Prepares a vectored write (`writev`) to `fd` at `offset`.
    ///
    /// # Safety
    ///
    /// Both the iovec array and the buffers it points to must remain valid
    /// until the kernel completes the operation.
    #[inline]
    pub unsafe fn prep_write_vectored(
        &mut self,
        fd: impl UringFd,
        bufs: &[io::IoSlice<'_>],
        offset: u64,
    ) {
        let len = bufs.len();
        let addr = bufs.as_ptr();
        uring_sys::io_uring_prep_writev(self.sqe, fd.as_raw_fd(), addr as _, len as _, offset as _);
        fd.update_sqe(self);
    }

    /// Prepares a write from a pre-registered buffer identified by `buf_index`.
    ///
    /// NOTE(review): `buf_index` is `usize` here but `u32` in
    /// [`SQE::prep_read_fixed`] — confirm whether unifying the two is intended.
    ///
    /// # Safety
    ///
    /// `buf` must be (part of) the registered buffer at `buf_index` and must
    /// remain valid until the kernel completes the operation.
    #[inline]
    pub unsafe fn prep_write_fixed(
        &mut self,
        fd: impl UringFd,
        buf: &[u8],
        offset: u64,
        buf_index: usize,
    ) {
        let len = buf.len();
        let addr = buf.as_ptr();
        uring_sys::io_uring_prep_write_fixed(
            self.sqe,
            fd.as_raw_fd(),
            addr as _,
            len as _,
            offset as _,
            buf_index as _,
        );
        fd.update_sqe(self);
    }

    /// Prepares an fsync of `fd`; pass [`FsyncFlags::FSYNC_DATASYNC`] for
    /// fdatasync-like behavior.
    ///
    /// # Safety
    ///
    /// `fd` must remain open until the kernel completes the operation.
    #[inline]
    pub unsafe fn prep_fsync(&mut self, fd: impl UringFd, flags: FsyncFlags) {
        uring_sys::io_uring_prep_fsync(self.sqe, fd.as_raw_fd(), flags.bits() as _);
        fd.update_sqe(self);
    }

    /// Prepares a splice of `count` bytes from `fd_in` to `fd_out`.
    ///
    /// # Safety
    ///
    /// Both fds must remain open until the kernel completes the operation.
    #[inline]
    pub unsafe fn prep_splice(
        &mut self,
        fd_in: RawFd,
        off_in: i64,
        fd_out: RawFd,
        off_out: i64,
        count: u32,
        flags: SpliceFlags,
    ) {
        uring_sys::io_uring_prep_splice(
            self.sqe,
            fd_in,
            off_in,
            fd_out,
            off_out,
            count,
            flags.bits(),
        );
    }

    /// Prepares a `recv` on socket `fd` into `buf`.
    ///
    /// # Safety
    ///
    /// `buf` must remain valid until the kernel completes the operation.
    #[inline]
    pub unsafe fn prep_recv(&mut self, fd: impl UringFd, buf: &mut [u8], flags: MsgFlags) {
        let data = buf.as_mut_ptr() as *mut libc::c_void;
        let len = buf.len();
        uring_sys::io_uring_prep_recv(self.sqe, fd.as_raw_fd(), data, len, flags.bits());
        fd.update_sqe(self);
    }

    /// Prepares a `send` on socket `fd` from `buf`.
    ///
    /// # Safety
    ///
    /// `buf` must remain valid until the kernel completes the operation.
    #[inline]
    pub unsafe fn prep_send(&mut self, fd: impl UringFd, buf: &[u8], flags: MsgFlags) {
        // The sys binding takes *mut even though the buffer is only read.
        let data = buf.as_ptr() as *const libc::c_void as *mut libc::c_void;
        let len = buf.len();
        uring_sys::io_uring_prep_send(self.sqe, fd.as_raw_fd(), data, len, flags.bits());
        fd.update_sqe(self);
    }

    /// Prepares a `recvmsg` on socket `fd`.
    ///
    /// # Safety
    ///
    /// `msg` and everything it points to (iovecs, buffers, control data)
    /// must remain valid until the kernel completes the operation.
    pub unsafe fn prep_recvmsg(
        &mut self,
        fd: impl UringFd,
        msg: *mut libc::msghdr,
        flags: MsgFlags,
    ) {
        uring_sys::io_uring_prep_recvmsg(self.sqe, fd.as_raw_fd(), msg, flags.bits() as _);
        fd.update_sqe(self);
    }

    /// Prepares a `sendmsg` on socket `fd`.
    ///
    /// # Safety
    ///
    /// `msg` and everything it points to (iovecs, buffers, control data)
    /// must remain valid until the kernel completes the operation.
    pub unsafe fn prep_sendmsg(
        &mut self,
        fd: impl UringFd,
        msg: *mut libc::msghdr,
        flags: MsgFlags,
    ) {
        uring_sys::io_uring_prep_sendmsg(self.sqe, fd.as_raw_fd(), msg, flags.bits() as _);
        fd.update_sqe(self);
    }

    /// Prepares an `fallocate` of `size` bytes at `offset` on `fd`.
    ///
    /// # Safety
    ///
    /// `fd` must remain open until the kernel completes the operation.
    #[inline]
    pub unsafe fn prep_fallocate(
        &mut self,
        fd: impl UringFd,
        offset: u64,
        size: u64,
        flags: FallocateFlags,
    ) {
        // Note the sys-call argument order: mode (flags), then offset, then len.
        uring_sys::io_uring_prep_fallocate(
            self.sqe,
            fd.as_raw_fd(),
            flags.bits() as _,
            offset as _,
            size as _,
        );
        fd.update_sqe(self);
    }

    /// Prepares a `statx` of `path` relative to `dirfd`, writing into `buf`.
    ///
    /// NOTE(review): unlike the read/write preps, `dirfd` is not passed
    /// through `fd.update_sqe` here — confirm registered files are
    /// intentionally unsupported for this opcode.
    ///
    /// # Safety
    ///
    /// `path` and `buf` must remain valid until the kernel completes the
    /// operation.
    #[inline]
    pub unsafe fn prep_statx(
        &mut self,
        dirfd: impl UringFd,
        path: &CStr,
        flags: StatxFlags,
        mask: StatxMode,
        buf: &mut Statx,
    ) {
        uring_sys::io_uring_prep_statx(
            self.sqe,
            dirfd.as_raw_fd(),
            path.as_ptr() as _,
            flags.bits() as _,
            mask.bits() as _,
            buf as _,
        );
    }

    /// Prepares an `openat` of `path` relative to `fd`.
    ///
    /// # Safety
    ///
    /// `path` must remain valid until the kernel completes the operation.
    #[inline]
    pub unsafe fn prep_openat(&mut self, fd: impl UringFd, path: &CStr, flags: OFlag, mode: Mode) {
        uring_sys::io_uring_prep_openat(
            self.sqe,
            fd.as_raw_fd(),
            path.as_ptr() as _,
            flags.bits(),
            mode.bits(),
        );
    }

    /// Prepares a `close` of `fd`.
    ///
    /// # Safety
    ///
    /// The caller must not use `fd` after the operation completes.
    #[inline]
    pub unsafe fn prep_close(&mut self, fd: impl UringFd) {
        uring_sys::io_uring_prep_close(self.sqe, fd.as_raw_fd());
    }

    /// Prepares a timeout; `ts` and `events` are passed straight through as
    /// the timespec and completion-count arguments of the timeout opcode.
    ///
    /// # Safety
    ///
    /// `ts` must remain valid until the kernel completes the operation.
    #[inline]
    pub unsafe fn prep_timeout(
        &mut self,
        ts: &uring_sys::__kernel_timespec,
        events: u32,
        flags: TimeoutFlags,
    ) {
        uring_sys::io_uring_prep_timeout(
            self.sqe,
            // Cast away const: the sys binding takes *mut but only reads ts.
            ts as *const _ as *mut _,
            events as _,
            flags.bits() as _,
        );
    }

    /// Prepares removal of a previously submitted timeout, identified by the
    /// `user_data` it was submitted with.
    ///
    /// # Safety
    ///
    /// `user_data` must identify the intended timeout submission.
    #[inline]
    pub unsafe fn prep_timeout_remove(&mut self, user_data: u64) {
        uring_sys::io_uring_prep_timeout_remove(self.sqe, user_data as _, 0);
    }

    /// Prepares a timeout linked to the previous SQE in the chain.
    ///
    /// # Safety
    ///
    /// `ts` must remain valid until the kernel completes the operation.
    #[inline]
    pub unsafe fn prep_link_timeout(&mut self, ts: &uring_sys::__kernel_timespec) {
        uring_sys::io_uring_prep_link_timeout(self.sqe, ts as *const _ as *mut _, 0);
    }

    /// Prepares a one-shot poll on `fd` for the events in `poll_flags`.
    ///
    /// # Safety
    ///
    /// `fd` must remain open until the kernel completes the operation.
    #[inline]
    pub unsafe fn prep_poll_add(&mut self, fd: impl UringFd, poll_flags: PollFlags) {
        uring_sys::io_uring_prep_poll_add(self.sqe, fd.as_raw_fd(), poll_flags.bits());
        fd.update_sqe(self);
    }

    /// Prepares removal of a pending poll, identified by the `user_data` it
    /// was submitted with.
    ///
    /// # Safety
    ///
    /// `user_data` must identify the intended poll submission.
    #[inline]
    pub unsafe fn prep_poll_remove(&mut self, user_data: u64) {
        uring_sys::io_uring_prep_poll_remove(self.sqe, user_data as _)
    }

    /// Prepares a `connect` of socket `fd` to `socket_addr`.
    ///
    /// # Safety
    ///
    /// `socket_addr` must remain valid until the kernel completes the
    /// operation.
    #[inline]
    pub unsafe fn prep_connect(&mut self, fd: impl UringFd, socket_addr: &SockaddrStorage) {
        let addr = socket_addr.as_ptr();
        let len = socket_addr.len();
        uring_sys::io_uring_prep_connect(self.sqe, fd.as_raw_fd(), addr as *mut _, len);
        fd.update_sqe(self);
    }

    /// Prepares an `accept` on listening socket `fd`. If `accept` is `Some`,
    /// the kernel writes the peer address (and its length) into it; `None`
    /// passes null pointers and discards the address.
    ///
    /// # Safety
    ///
    /// The `accept` storage, if provided, must remain valid until the kernel
    /// completes the operation.
    #[inline]
    pub unsafe fn prep_accept(
        &mut self,
        fd: impl UringFd,
        accept: Option<&mut SockAddrStorage>,
        flags: SockFlag,
    ) {
        let (addr, len) = match accept {
            Some(accept) => (
                accept.storage.as_mut_ptr() as *mut _,
                &mut accept.len as *mut _ as *mut _,
            ),
            None => (std::ptr::null_mut(), std::ptr::null_mut()),
        };
        uring_sys::io_uring_prep_accept(self.sqe, fd.as_raw_fd(), addr, len, flags.bits());
        fd.update_sqe(self);
    }

    /// Prepares a `posix_fadvise` for `len` bytes at `off` on `fd`.
    ///
    /// # Safety
    ///
    /// `fd` must remain open until the kernel completes the operation.
    #[inline]
    pub unsafe fn prep_fadvise(
        &mut self,
        fd: impl UringFd,
        off: u64,
        len: u64,
        advice: PosixFadviseAdvice,
    ) {
        use PosixFadviseAdvice::*;
        // Map the nix enum back to the raw libc constant the kernel expects.
        let advice = match advice {
            POSIX_FADV_NORMAL => libc::POSIX_FADV_NORMAL,
            POSIX_FADV_SEQUENTIAL => libc::POSIX_FADV_SEQUENTIAL,
            POSIX_FADV_RANDOM => libc::POSIX_FADV_RANDOM,
            POSIX_FADV_NOREUSE => libc::POSIX_FADV_NOREUSE,
            POSIX_FADV_WILLNEED => libc::POSIX_FADV_WILLNEED,
            POSIX_FADV_DONTNEED => libc::POSIX_FADV_DONTNEED,
            // The nix enum is non-exhaustive; any new variant would panic here.
            _ => unreachable!(),
        };
        uring_sys::io_uring_prep_fadvise(self.sqe, fd.as_raw_fd(), off as _, len as _, advice);
        fd.update_sqe(self);
    }

    /// Prepares a `madvise` for the memory range covered by `data`.
    ///
    /// # Safety
    ///
    /// `data` must remain valid (mapped) until the kernel completes the
    /// operation.
    #[inline]
    pub unsafe fn prep_madvise(&mut self, data: &mut [u8], advice: MmapAdvise) {
        use MmapAdvise::*;
        // Map the nix enum back to the raw libc constant the kernel expects.
        let advice = match advice {
            MADV_NORMAL => libc::MADV_NORMAL,
            MADV_RANDOM => libc::MADV_RANDOM,
            MADV_SEQUENTIAL => libc::MADV_SEQUENTIAL,
            MADV_WILLNEED => libc::MADV_WILLNEED,
            MADV_DONTNEED => libc::MADV_DONTNEED,
            MADV_REMOVE => libc::MADV_REMOVE,
            MADV_DONTFORK => libc::MADV_DONTFORK,
            MADV_DOFORK => libc::MADV_DOFORK,
            MADV_HWPOISON => libc::MADV_HWPOISON,
            MADV_MERGEABLE => libc::MADV_MERGEABLE,
            MADV_UNMERGEABLE => libc::MADV_UNMERGEABLE,
            MADV_SOFT_OFFLINE => libc::MADV_SOFT_OFFLINE,
            MADV_HUGEPAGE => libc::MADV_HUGEPAGE,
            MADV_NOHUGEPAGE => libc::MADV_NOHUGEPAGE,
            MADV_DONTDUMP => libc::MADV_DONTDUMP,
            MADV_DODUMP => libc::MADV_DODUMP,
            MADV_FREE => libc::MADV_FREE,
            // The nix enum is non-exhaustive; any new variant would panic here.
            _ => unreachable!(),
        };
        uring_sys::io_uring_prep_madvise(
            self.sqe,
            data.as_mut_ptr() as *mut _,
            data.len() as _,
            advice,
        );
    }

    /// Prepares an `epoll_ctl` operation on `epoll_fd` for `fd`.
    ///
    /// # Safety
    ///
    /// `event`, if provided, must remain valid until the kernel completes
    /// the operation.
    #[inline]
    pub unsafe fn prep_epoll_ctl(
        &mut self,
        epoll_fd: RawFd,
        op: EpollOp,
        fd: RawFd,
        event: Option<&mut EpollEvent>,
    ) {
        // Map the nix enum back to the raw libc constant the kernel expects.
        let op = match op {
            EpollOp::EpollCtlAdd => libc::EPOLL_CTL_ADD,
            EpollOp::EpollCtlDel => libc::EPOLL_CTL_DEL,
            EpollOp::EpollCtlMod => libc::EPOLL_CTL_MOD,
            // The nix enum is non-exhaustive; any new variant would panic here.
            _ => unreachable!(),
        };
        let event = event.map_or(ptr::null_mut(), |event| event as *mut EpollEvent as *mut _);
        uring_sys::io_uring_prep_epoll_ctl(self.sqe, epoll_fd, fd, op, event);
    }

    /// Prepares an update of the registered-files table with `files`,
    /// starting at `offset`.
    ///
    /// # Safety
    ///
    /// `files` must remain valid until the kernel completes the operation.
    #[inline]
    pub unsafe fn prep_files_update(&mut self, files: &[RawFd], offset: u32) {
        // Cast away const: the sys binding takes *mut but only reads the fds.
        let addr = files.as_ptr() as *mut RawFd;
        let len = files.len() as u32;
        uring_sys::io_uring_prep_files_update(self.sqe, addr, len, offset as _);
    }

    /// Prepares registration of `count` equally sized buffers carved out of
    /// `buffers`, under buffer group `group`, starting at buffer id `index`.
    ///
    /// # Safety
    ///
    /// `buffers` must remain valid until the kernel consumes the provided
    /// buffers. NOTE(review): `count == 0` divides by zero below, and a
    /// `buffers.len()` that is not a multiple of `count` truncates — confirm
    /// callers uphold both.
    pub unsafe fn prep_provide_buffers(
        &mut self,
        buffers: &mut [u8],
        count: u32,
        group: BufferGroupId,
        index: u32,
    ) {
        let addr = buffers.as_mut_ptr() as *mut libc::c_void;
        // Per-buffer length: the slice is split evenly into `count` buffers.
        let len = buffers.len() as u32 / count;
        uring_sys::io_uring_prep_provide_buffers(
            self.sqe,
            addr,
            len as _,
            count as _,
            group.id as _,
            index as _,
        );
    }

    /// Prepares removal of `count` buffers from buffer group `id`.
    ///
    /// # Safety
    ///
    /// `id` must identify a buffer group previously provided to the kernel.
    pub unsafe fn prep_remove_buffers(&mut self, count: u32, id: BufferGroupId) {
        uring_sys::io_uring_prep_remove_buffers(self.sqe, count as _, id.id as _);
    }

    /// Prepares cancellation of an in-flight request, identified by the
    /// `user_data` it was submitted with.
    ///
    /// # Safety
    ///
    /// `user_data` must identify the intended submission.
    #[inline]
    pub unsafe fn prep_cancel(&mut self, user_data: u64, flags: i32) {
        uring_sys::io_uring_prep_cancel(self.sqe, user_data as _, flags);
    }

    /// Prepares a no-op entry (completes immediately with no effect).
    ///
    /// # Safety
    ///
    /// Overwrites whatever was previously prepared in this entry.
    #[inline]
    pub unsafe fn prep_nop(&mut self) {
        uring_sys::io_uring_prep_nop(self.sqe);
    }

    /// Zeroes the entry.
    pub fn clear(&mut self) {
        // An all-zero `io_uring_sqe` is taken to be a valid value for this
        // plain C struct (zeroed opcode/flags/pointers).
        *self.sqe = unsafe { mem::zeroed() };
    }

    /// Read-only access to the underlying raw entry.
    pub fn raw(&self) -> &uring_sys::io_uring_sqe {
        self.sqe
    }

    /// Mutable access to the underlying raw entry.
    ///
    /// # Safety
    ///
    /// Arbitrary mutation can break the invariants the typed `prep_*`
    /// methods establish.
    pub unsafe fn raw_mut(&mut self) -> &mut uring_sys::io_uring_sqe {
        self.sqe
    }
}
510
// SAFETY(review): `SQE` only wraps `&mut io_uring_sqe`, which appears to be
// plain FFI data with no thread-affine state — confirm `io_uring_sqe`
// contains nothing that is non-Send/non-Sync before relying on these.
unsafe impl<'a> Send for SQE<'a> {}
unsafe impl<'a> Sync for SQE<'a> {}
513
/// Caller-owned storage for a socket address filled in by the kernel
/// (used as the out-parameter of [`SQE::prep_accept`]).
#[derive(Debug)]
pub struct SockAddrStorage {
    // Backing bytes for a `sockaddr_storage`, written by the kernel.
    storage: mem::MaybeUninit<nix::sys::socket::sockaddr_storage>,
    // In/out address length in bytes; starts at the full storage size.
    len: usize,
}
519
520impl SockAddrStorage {
521 pub fn uninit() -> Self {
522 let storage = mem::MaybeUninit::uninit();
523 let len = mem::size_of::<nix::sys::socket::sockaddr_storage>();
524 SockAddrStorage { storage, len }
525 }
526}
527
/// Identifier for a group of kernel-provided buffers
/// (see [`SQE::prep_provide_buffers`] / [`SQE::prep_remove_buffers`]).
#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct BufferGroupId {
    pub id: u32,
}
532
533bitflags::bitflags! {
534 #[derive(Debug, Clone, Copy)]
536 pub struct SubmissionFlags: u8 {
537 const FIXED_FILE = 1 << 0; const IO_DRAIN = 1 << 1; const IO_LINK = 1 << 2; const IO_HARDLINK = 1 << 3;
548 const ASYNC = 1 << 4;
549 const BUFFER_SELECT = 1 << 5;
550 }
551}
552
bitflags::bitflags! {
    /// Flags for [`SQE::prep_fsync`].
    pub struct FsyncFlags: u32 {
        /// Sync only data (fdatasync-style), per the flag's name.
        const FSYNC_DATASYNC = 1 << 0;
    }
}
559
bitflags::bitflags! {
    /// `AT_*` path-resolution flags for [`SQE::prep_statx`].
    pub struct StatxFlags: i32 {
        const AT_STATX_SYNC_AS_STAT = 0;
        const AT_SYMLINK_NOFOLLOW = 1 << 10;
        const AT_NO_AUTOMOUNT = 1 << 11;
        const AT_EMPTY_PATH = 1 << 12;
        const AT_STATX_FORCE_SYNC = 1 << 13;
        const AT_STATX_DONT_SYNC = 1 << 14;
    }
}
570
bitflags::bitflags! {
    /// `STATX_*` field-request mask for [`SQE::prep_statx`].
    pub struct StatxMode: i32 {
        const STATX_TYPE = 1 << 0;
        const STATX_MODE = 1 << 1;
        const STATX_NLINK = 1 << 2;
        const STATX_UID = 1 << 3;
        const STATX_GID = 1 << 4;
        const STATX_ATIME = 1 << 5;
        const STATX_MTIME = 1 << 6;
        const STATX_CTIME = 1 << 7;
        const STATX_INO = 1 << 8;
        const STATX_SIZE = 1 << 9;
        const STATX_BLOCKS = 1 << 10;
        const STATX_BTIME = 1 << 11;
    }
}
587
bitflags::bitflags! {
    /// Flags for [`SQE::prep_timeout`].
    pub struct TimeoutFlags: u32 {
        /// Treat the timespec as an absolute time, per the flag's name.
        const TIMEOUT_ABS = 1 << 0;
    }
}
593
bitflags::bitflags! {
    /// Flags for [`SQE::prep_splice`].
    pub struct SpliceFlags: u32 {
        /// Interpret `fd_in` as a registered-file index, per the flag's name.
        const F_FD_IN_FIXED = 1 << 31;
    }
}
599
/// An iterator over a fixed budget of submission queue entries taken from
/// one ring.
pub struct SQEs<'ring> {
    // The ring the entries are taken from.
    sq: &'ring mut uring_sys::io_uring,
    // Total number of entries this view may hand out.
    count: u32,
    // Number of entries handed out so far; never exceeds `count`.
    consumed: u32,
}
606
607impl<'ring> SQEs<'ring> {
608 pub(crate) fn new(sq: &'ring mut uring_sys::io_uring, count: u32) -> SQEs<'ring> {
609 SQEs {
610 sq,
611 count,
612 consumed: 0,
613 }
614 }
615
616 pub fn hard_linked(&mut self) -> HardLinked<'ring, '_> {
622 HardLinked { sqes: self }
623 }
624
625 pub fn soft_linked(&mut self) -> SoftLinked<'ring, '_> {
631 SoftLinked { sqes: self }
632 }
633
634 pub fn remaining(&self) -> u32 {
636 self.count - self.consumed
637 }
638
639 fn consume(&mut self) -> Option<SQE<'ring>> {
640 if self.consumed < self.count {
641 unsafe {
642 let sqe = uring_sys::io_uring_get_sqe(self.sq);
643 uring_sys::io_uring_prep_nop(sqe);
644 self.consumed += 1;
645 Some(SQE { sqe: &mut *sqe })
646 }
647 } else {
648 None
649 }
650 }
651}
652
653impl<'ring> Iterator for SQEs<'ring> {
654 type Item = SQE<'ring>;
655
656 fn next(&mut self) -> Option<SQE<'ring>> {
657 self.consume()
658 }
659}
660
/// Adaptor over [`SQEs`] that hard-links every yielded entry to the next one.
pub struct HardLinked<'ring, 'a> {
    sqes: &'a mut SQEs<'ring>,
}
665
666impl<'ring> HardLinked<'ring, '_> {
667 pub fn terminate(self) -> Option<SQE<'ring>> {
668 self.sqes.consume()
669 }
670}
671
672impl<'ring> Iterator for HardLinked<'ring, '_> {
673 type Item = HardLinkedSQE<'ring>;
674
675 fn next(&mut self) -> Option<Self::Item> {
676 let is_final = self.sqes.remaining() == 1;
677 self.sqes
678 .consume()
679 .map(|sqe| HardLinkedSQE { sqe, is_final })
680 }
681}
682
/// An entry handed out by [`HardLinked`]; sets `IO_HARDLINK` on drop unless
/// it is the final entry of the chain.
pub struct HardLinkedSQE<'ring> {
    sqe: SQE<'ring>,
    // True for the last entry of the budget, which must stay unlinked.
    is_final: bool,
}
687
688impl<'ring> Deref for HardLinkedSQE<'ring> {
689 type Target = SQE<'ring>;
690
691 fn deref(&self) -> &SQE<'ring> {
692 &self.sqe
693 }
694}
695
696impl<'ring> DerefMut for HardLinkedSQE<'ring> {
697 fn deref_mut(&mut self) -> &mut SQE<'ring> {
698 &mut self.sqe
699 }
700}
701
702impl<'ring> Drop for HardLinkedSQE<'ring> {
703 fn drop(&mut self) {
704 if !self.is_final {
705 self.sqe.set_flags(SubmissionFlags::IO_HARDLINK);
706 }
707 }
708}
709
/// Adaptor over [`SQEs`] that soft-links every yielded entry to the next one.
pub struct SoftLinked<'ring, 'a> {
    sqes: &'a mut SQEs<'ring>,
}
714
715impl<'ring> SoftLinked<'ring, '_> {
716 pub fn terminate(self) -> Option<SQE<'ring>> {
717 self.sqes.consume()
718 }
719}
720
721impl<'ring> Iterator for SoftLinked<'ring, '_> {
722 type Item = SoftLinkedSQE<'ring>;
723
724 fn next(&mut self) -> Option<Self::Item> {
725 let is_final = self.sqes.remaining() == 1;
726 self.sqes
727 .consume()
728 .map(|sqe| SoftLinkedSQE { sqe, is_final })
729 }
730}
731
/// An entry handed out by [`SoftLinked`]; sets `IO_LINK` on drop unless it
/// is the final entry of the chain.
pub struct SoftLinkedSQE<'ring> {
    sqe: SQE<'ring>,
    // True for the last entry of the budget, which must stay unlinked.
    is_final: bool,
}
736
737impl<'ring> Deref for SoftLinkedSQE<'ring> {
738 type Target = SQE<'ring>;
739
740 fn deref(&self) -> &SQE<'ring> {
741 &self.sqe
742 }
743}
744
745impl<'ring> DerefMut for SoftLinkedSQE<'ring> {
746 fn deref_mut(&mut self) -> &mut SQE<'ring> {
747 &mut self.sqe
748 }
749}
750
751impl<'ring> Drop for SoftLinkedSQE<'ring> {
752 fn drop(&mut self) {
753 if !self.is_final {
754 self.sqe.set_flags(SubmissionFlags::IO_LINK);
755 }
756 }
757}