1use std::io;
2use std::mem;
3use std::ffi::CStr;
4use std::ops::{Deref, DerefMut};
5use std::os::unix::io::RawFd;
6use std::ptr;
7use std::slice;
8
9use crate::registrar::{UringFd, UringReadBuf, UringWriteBuf};
10
11pub use nix::fcntl::{OFlag, FallocateFlags, PosixFadviseAdvice};
12pub use nix::poll::PollFlags;
13pub use nix::sys::epoll::{EpollOp, EpollEvent};
14pub use nix::sys::mman::MmapAdvise;
15pub use nix::sys::stat::Mode;
16pub use nix::sys::socket::{SockAddr, SockFlag, MsgFlags};
17
18use crate::Personality;
19
20pub struct SQE<'a> {
25 sqe: &'a mut uring_sys::io_uring_sqe,
26}
27
28impl<'a> SQE<'a> {
29 pub(crate) fn new(sqe: &'a mut uring_sys::io_uring_sqe) -> SQE<'a> {
30 SQE { sqe }
31 }
32
33 #[inline]
35 pub fn user_data(&self) -> u64 {
36 self.sqe.user_data as u64
37 }
38
39 pub unsafe fn set_user_data(&mut self, user_data: u64) {
74 self.sqe.user_data = user_data as _;
75 }
76
77 #[inline]
79 pub fn flags(&self) -> SubmissionFlags {
80 unsafe { SubmissionFlags::from_bits_unchecked(self.sqe.flags as _) }
81 }
82
83 pub fn overwrite_flags(&mut self, flags: SubmissionFlags) {
85 self.sqe.flags = flags.bits() as _;
86 }
87
88 #[inline]
90 pub(crate) fn set_fixed_file(&mut self) {
91 self.set_flags(SubmissionFlags::FIXED_FILE);
92 }
93
94 #[inline]
96 pub fn set_flags(&mut self, flags: SubmissionFlags) {
97 self.sqe.flags |= flags.bits();
98 }
99
100 #[inline]
102 pub fn set_personality(&mut self, personality: Personality) {
103 self.sqe.buf_index.buf_index.personality = personality.id;
104 }
105
106 #[inline]
111 pub unsafe fn prep_read(
112 &mut self,
113 fd: impl UringFd,
114 buf: impl UringReadBuf,
115 offset: u64,
116 ) {
117 buf.prep_read(fd, self, offset);
118 }
119
120 #[inline]
122 pub unsafe fn prep_read_vectored(
123 &mut self,
124 fd: impl UringFd,
125 bufs: &mut [io::IoSliceMut<'_>],
126 offset: u64,
127 ) {
128 let len = bufs.len();
129 let addr = bufs.as_mut_ptr();
130 uring_sys::io_uring_prep_readv(self.sqe, fd.as_raw_fd(), addr as _, len as _, offset as _);
131 fd.update_sqe(self);
132 }
133
134 #[inline]
136 pub unsafe fn prep_read_fixed(
137 &mut self,
138 fd: impl UringFd,
139 buf: &mut [u8],
140 offset: u64,
141 buf_index: u32,
142 ) {
143 let len = buf.len();
144 let addr = buf.as_mut_ptr();
145 uring_sys::io_uring_prep_read_fixed(self.sqe,
146 fd.as_raw_fd(),
147 addr as _,
148 len as _,
149 offset as _,
150 buf_index as _);
151 fd.update_sqe(self);
152 }
153
154 #[inline]
159 pub unsafe fn prep_write(
160 &mut self,
161 fd: impl UringFd,
162 buf: impl UringWriteBuf,
163 offset: u64,
164 ) {
165 buf.prep_write(fd, self, offset)
166 }
167
168 #[inline]
170 pub unsafe fn prep_write_vectored(
171 &mut self,
172 fd: impl UringFd,
173 bufs: &[io::IoSlice<'_>],
174 offset: u64,
175 ) {
176 let len = bufs.len();
177 let addr = bufs.as_ptr();
178 uring_sys::io_uring_prep_writev(self.sqe,
179 fd.as_raw_fd(),
180 addr as _,
181 len as _,
182 offset as _);
183 fd.update_sqe(self);
184 }
185
186 #[inline]
188 pub unsafe fn prep_write_fixed(
189 &mut self,
190 fd: impl UringFd,
191 buf: &[u8],
192 offset: u64,
193 buf_index: usize,
194 ) {
195 let len = buf.len();
196 let addr = buf.as_ptr();
197 uring_sys::io_uring_prep_write_fixed(self.sqe,
198 fd.as_raw_fd(),
199 addr as _,
200 len as _,
201 offset as _,
202 buf_index as _);
203 fd.update_sqe(self);
204 }
205
206 #[inline]
208 pub unsafe fn prep_fsync(&mut self, fd: impl UringFd, flags: FsyncFlags) {
209 uring_sys::io_uring_prep_fsync(self.sqe, fd.as_raw_fd(), flags.bits() as _);
210 fd.update_sqe(self);
211 }
212
213 #[inline]
215 pub unsafe fn prep_splice(
216 &mut self,
217 fd_in: RawFd,
218 off_in: i64,
219 fd_out: RawFd,
220 off_out: i64,
221 count: u32,
222 flags: SpliceFlags,
223 ) {
224 uring_sys::io_uring_prep_splice(self.sqe, fd_in, off_in, fd_out, off_out, count, flags.bits());
225 }
226
227 #[inline]
229 pub unsafe fn prep_recv(&mut self, fd: impl UringFd, buf: &mut [u8], flags: MsgFlags) {
230 let data = buf.as_mut_ptr() as *mut libc::c_void;
231 let len = buf.len();
232 uring_sys::io_uring_prep_send(self.sqe, fd.as_raw_fd(), data, len, flags.bits());
233 fd.update_sqe(self);
234 }
235
236 #[inline]
238 pub unsafe fn prep_send(&mut self, fd: impl UringFd, buf: &[u8], flags: MsgFlags) {
239 let data = buf.as_ptr() as *const libc::c_void as *mut libc::c_void;
240 let len = buf.len();
241 uring_sys::io_uring_prep_send(self.sqe, fd.as_raw_fd(), data, len, flags.bits());
242 fd.update_sqe(self);
243 }
244
245 pub unsafe fn prep_recvmsg(&mut self, fd: impl UringFd, msg: *mut libc::msghdr, flags: MsgFlags) {
247 uring_sys::io_uring_prep_recvmsg(self.sqe, fd.as_raw_fd(), msg, flags.bits() as _);
248 fd.update_sqe(self);
249 }
250
251 pub unsafe fn prep_sendmsg(&mut self, fd: impl UringFd, msg: *mut libc::msghdr, flags: MsgFlags) {
253 uring_sys::io_uring_prep_sendmsg(self.sqe, fd.as_raw_fd(), msg, flags.bits() as _);
254 fd.update_sqe(self);
255 }
256
257 #[inline]
259 pub unsafe fn prep_fallocate(&mut self, fd: impl UringFd,
260 offset: u64, size: u64,
261 flags: FallocateFlags) {
262 uring_sys::io_uring_prep_fallocate(self.sqe, fd.as_raw_fd(),
263 flags.bits() as _,
264 offset as _,
265 size as _);
266 fd.update_sqe(self);
267 }
268
269 #[inline]
271 pub unsafe fn prep_statx(
272 &mut self,
273 dirfd: impl UringFd,
274 path: &CStr,
275 flags: StatxFlags,
276 mask: StatxMode,
277 buf: &mut libc::statx,
278 ) {
279 uring_sys::io_uring_prep_statx(self.sqe, dirfd.as_raw_fd(), path.as_ptr() as _,
280 flags.bits() as _, mask.bits() as _,
281 buf as _);
282 }
283
284 #[inline]
286 pub unsafe fn prep_openat(
287 &mut self,
288 fd: impl UringFd,
289 path: &CStr,
290 flags: OFlag,
291 mode: Mode,
292 ) {
293 uring_sys::io_uring_prep_openat(self.sqe, fd.as_raw_fd(), path.as_ptr() as _, flags.bits(), mode.bits());
294 }
295
296 #[inline]
300 pub unsafe fn prep_close(&mut self, fd: impl UringFd) {
301 uring_sys::io_uring_prep_close(self.sqe, fd.as_raw_fd());
302 }
303
304
305 #[inline]
327 pub unsafe fn prep_timeout(&mut self, ts: &uring_sys::__kernel_timespec, events: u32, flags: TimeoutFlags) {
328 uring_sys::io_uring_prep_timeout(self.sqe,
329 ts as *const _ as *mut _,
330 events as _,
331 flags.bits() as _);
332 }
333
334 #[inline]
335 pub unsafe fn prep_timeout_remove(&mut self, user_data: u64) {
336 uring_sys::io_uring_prep_timeout_remove(self.sqe, user_data as _, 0);
337 }
338
339 #[inline]
340 pub unsafe fn prep_link_timeout(&mut self, ts: &uring_sys::__kernel_timespec) {
341 uring_sys::io_uring_prep_link_timeout(self.sqe, ts as *const _ as *mut _, 0);
342 }
343
344 #[inline]
345 pub unsafe fn prep_poll_add(&mut self, fd: impl UringFd, poll_flags: PollFlags) {
346 uring_sys::io_uring_prep_poll_add(self.sqe, fd.as_raw_fd(), poll_flags.bits());
347 fd.update_sqe(self);
348 }
349
350 #[inline]
351 pub unsafe fn prep_poll_remove(&mut self, user_data: u64) {
352 uring_sys::io_uring_prep_poll_remove(self.sqe, user_data as _)
353 }
354
355 #[inline]
356 pub unsafe fn prep_connect(&mut self, fd: impl UringFd, socket_addr: &SockAddr) {
357 let (addr, len) = socket_addr.as_ffi_pair();
358 uring_sys::io_uring_prep_connect(self.sqe, fd.as_raw_fd(), addr as *const _ as *mut _, len);
359 fd.update_sqe(self);
360 }
361
362 #[inline]
363 pub unsafe fn prep_accept(&mut self, fd: impl UringFd, accept: Option<&mut SockAddrStorage>, flags: SockFlag) {
364 let (addr, len) = match accept {
365 Some(accept) => (accept.storage.as_mut_ptr() as *mut _, &mut accept.len as *mut _ as *mut _),
366 None => (std::ptr::null_mut(), std::ptr::null_mut())
367 };
368 uring_sys::io_uring_prep_accept(self.sqe, fd.as_raw_fd(), addr, len, flags.bits());
369 fd.update_sqe(self);
370 }
371
372 #[inline]
373 pub unsafe fn prep_fadvise(&mut self, fd: impl UringFd, off: u64, len: u64, advice: PosixFadviseAdvice) {
374 use PosixFadviseAdvice::*;
375 let advice = match advice {
376 POSIX_FADV_NORMAL => libc::POSIX_FADV_NORMAL,
377 POSIX_FADV_SEQUENTIAL => libc::POSIX_FADV_SEQUENTIAL,
378 POSIX_FADV_RANDOM => libc::POSIX_FADV_RANDOM,
379 POSIX_FADV_NOREUSE => libc::POSIX_FADV_NOREUSE,
380 POSIX_FADV_WILLNEED => libc::POSIX_FADV_WILLNEED,
381 POSIX_FADV_DONTNEED => libc::POSIX_FADV_DONTNEED,
382 };
383 uring_sys::io_uring_prep_fadvise(self.sqe, fd.as_raw_fd(), off as _, len as _, advice);
384 fd.update_sqe(self);
385 }
386
387 #[inline]
388 pub unsafe fn prep_madvise(&mut self, data: &mut [u8], advice: MmapAdvise) {
389 use MmapAdvise::*;
390 let advice = match advice {
391 MADV_NORMAL => libc::MADV_NORMAL,
392 MADV_RANDOM => libc::MADV_RANDOM,
393 MADV_SEQUENTIAL => libc::MADV_SEQUENTIAL,
394 MADV_WILLNEED => libc::MADV_WILLNEED,
395 MADV_DONTNEED => libc::MADV_DONTNEED,
396 MADV_REMOVE => libc::MADV_REMOVE,
397 MADV_DONTFORK => libc::MADV_DONTFORK,
398 MADV_DOFORK => libc::MADV_DOFORK,
399 MADV_HWPOISON => libc::MADV_HWPOISON,
400 MADV_MERGEABLE => libc::MADV_MERGEABLE,
401 MADV_UNMERGEABLE => libc::MADV_UNMERGEABLE,
402 MADV_SOFT_OFFLINE => libc::MADV_SOFT_OFFLINE,
403 MADV_HUGEPAGE => libc::MADV_HUGEPAGE,
404 MADV_NOHUGEPAGE => libc::MADV_NOHUGEPAGE,
405 MADV_DONTDUMP => libc::MADV_DONTDUMP,
406 MADV_DODUMP => libc::MADV_DODUMP,
407 MADV_FREE => libc::MADV_FREE,
408 };
409 uring_sys::io_uring_prep_madvise(self.sqe, data.as_mut_ptr() as *mut _, data.len() as _, advice);
410 }
411
412 #[inline]
413 pub unsafe fn prep_epoll_ctl(&mut self, epoll_fd: RawFd, op: EpollOp, fd: RawFd, event: Option<&mut EpollEvent>) {
414 let op = match op {
415 EpollOp::EpollCtlAdd => libc::EPOLL_CTL_ADD,
416 EpollOp::EpollCtlDel => libc::EPOLL_CTL_DEL,
417 EpollOp::EpollCtlMod => libc::EPOLL_CTL_MOD,
418 };
419 let event = event.map_or(ptr::null_mut(), |event| event as *mut EpollEvent as *mut _);
420 uring_sys::io_uring_prep_epoll_ctl(self.sqe, epoll_fd, fd, op, event);
421 }
422
423 #[inline]
424 pub unsafe fn prep_files_update(&mut self, files: &[RawFd], offset: u32) {
425 let addr = files.as_ptr() as *mut RawFd;
426 let len = files.len() as u32;
427 uring_sys::io_uring_prep_files_update(self.sqe, addr, len, offset as _);
428 }
429
430 pub unsafe fn prep_provide_buffers(&mut self,
431 buffers: &mut [u8],
432 count: u32,
433 group: BufferGroupId,
434 index: u32,
435 ) {
436 let addr = buffers.as_mut_ptr() as *mut libc::c_void;
437 let len = buffers.len() as u32 / count;
438 uring_sys::io_uring_prep_provide_buffers(self.sqe, addr, len as _, count as _, group.id as _, index as _);
439 }
440
441 pub unsafe fn prep_remove_buffers(&mut self, count: u32, id: BufferGroupId) {
442 uring_sys::io_uring_prep_remove_buffers(self.sqe, count as _, id.id as _);
443 }
444
445 #[inline]
446 pub unsafe fn prep_cancel(&mut self, user_data: u64, flags: i32) {
447 uring_sys::io_uring_prep_cancel(self.sqe, user_data as _, flags);
448 }
449
450
451 #[inline]
469 pub unsafe fn prep_nop(&mut self) {
470 uring_sys::io_uring_prep_nop(self.sqe);
471 }
472
473 pub fn clear(&mut self) {
492 *self.sqe = unsafe { mem::zeroed() };
493 }
494
495 pub fn raw(&self) -> &uring_sys::io_uring_sqe {
515 &self.sqe
516 }
517
518 pub unsafe fn raw_mut(&mut self) -> &mut uring_sys::io_uring_sqe {
519 &mut self.sqe
520 }
521}
522
523unsafe impl<'a> Send for SQE<'a> { }
524unsafe impl<'a> Sync for SQE<'a> { }
525
526pub struct SockAddrStorage {
527 storage: mem::MaybeUninit<nix::sys::socket::sockaddr_storage>,
528 len: usize,
529}
530
531impl SockAddrStorage {
532 pub fn uninit() -> Self {
533 let storage = mem::MaybeUninit::uninit();
534 let len = mem::size_of::<nix::sys::socket::sockaddr_storage>();
535 SockAddrStorage {
536 storage,
537 len
538 }
539 }
540
541 pub unsafe fn as_socket_addr(&self) -> io::Result<SockAddr> {
542 let storage = &*self.storage.as_ptr();
543 nix::sys::socket::sockaddr_storage_to_addr(storage, self.len).map_err(|e| {
544 let err_no = e.as_errno();
545 match err_no {
546 Some(err_no) => io::Error::from_raw_os_error(err_no as _),
547 None => io::Error::new(io::ErrorKind::Other, "Unknown error")
548 }
549 })
550 }
551}
552
553#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
554pub struct BufferGroupId {
555 pub id: u32,
556}
557
558bitflags::bitflags! {
559 pub struct SubmissionFlags: u8 {
561 const FIXED_FILE = 1 << 0; const IO_DRAIN = 1 << 1; const IO_LINK = 1 << 2; const IO_HARDLINK = 1 << 3;
572 const ASYNC = 1 << 4;
573 const BUFFER_SELECT = 1 << 5;
574 }
575}
576
577bitflags::bitflags! {
578 pub struct FsyncFlags: u32 {
579 const FSYNC_DATASYNC = 1 << 0;
581 }
582}
583
584bitflags::bitflags! {
585 pub struct StatxFlags: i32 {
586 const AT_STATX_SYNC_AS_STAT = 0;
587 const AT_SYMLINK_NOFOLLOW = 1 << 10;
588 const AT_NO_AUTOMOUNT = 1 << 11;
589 const AT_EMPTY_PATH = 1 << 12;
590 const AT_STATX_FORCE_SYNC = 1 << 13;
591 const AT_STATX_DONT_SYNC = 1 << 14;
592 }
593}
594
595bitflags::bitflags! {
596 pub struct StatxMode: i32 {
597 const STATX_TYPE = 1 << 0;
598 const STATX_MODE = 1 << 1;
599 const STATX_NLINK = 1 << 2;
600 const STATX_UID = 1 << 3;
601 const STATX_GID = 1 << 4;
602 const STATX_ATIME = 1 << 5;
603 const STATX_MTIME = 1 << 6;
604 const STATX_CTIME = 1 << 7;
605 const STATX_INO = 1 << 8;
606 const STATX_SIZE = 1 << 9;
607 const STATX_BLOCKS = 1 << 10;
608 const STATX_BTIME = 1 << 11;
609 }
610}
611
612bitflags::bitflags! {
613 pub struct TimeoutFlags: u32 {
614 const TIMEOUT_ABS = 1 << 0;
615 }
616}
617
618bitflags::bitflags! {
619 pub struct SpliceFlags: u32 {
620 const F_FD_IN_FIXED = 1 << 31;
621 }
622}
623
624pub struct SQEs<'ring> {
626 sqes: slice::IterMut<'ring, uring_sys::io_uring_sqe>,
627}
628
629impl<'ring> SQEs<'ring> {
630 pub(crate) fn new(slice: &'ring mut [uring_sys::io_uring_sqe]) -> SQEs<'ring> {
631 SQEs {
632 sqes: slice.iter_mut(),
633 }
634 }
635
636 pub fn single(&mut self) -> Option<SQE<'ring>> {
639 let mut next = None;
640 while let Some(sqe) = self.consume() { next = Some(sqe) }
641 next
642 }
643
644 pub fn hard_linked(&mut self) -> HardLinked<'ring, '_> {
649 HardLinked { sqes: self }
650 }
651
652 pub fn soft_linked(&mut self) -> SoftLinked<'ring, '_> {
657 SoftLinked { sqes: self }
658 }
659
660 pub fn remaining(&self) -> u32 {
662 self.sqes.len() as u32
663 }
664
665 fn consume(&mut self) -> Option<SQE<'ring>> {
666 self.sqes.next().map(|sqe| {
667 unsafe { uring_sys::io_uring_prep_nop(sqe) }
668 SQE { sqe }
669 })
670 }
671}
672
673impl<'ring> Iterator for SQEs<'ring> {
674 type Item = SQE<'ring>;
675
676 fn next(&mut self) -> Option<SQE<'ring>> {
677 self.consume()
678 }
679}
680
681pub struct HardLinked<'ring, 'a> {
683 sqes: &'a mut SQEs<'ring>,
684}
685
686impl<'ring> HardLinked<'ring, '_> {
687 pub fn terminate(self) -> Option<SQE<'ring>> {
688 self.sqes.consume()
689 }
690}
691
692impl<'ring> Iterator for HardLinked<'ring, '_> {
693 type Item = HardLinkedSQE<'ring>;
694
695 fn next(&mut self) -> Option<Self::Item> {
696 let is_final = self.sqes.remaining() == 1;
697 self.sqes.consume().map(|sqe| HardLinkedSQE { sqe, is_final })
698 }
699}
700
701pub struct HardLinkedSQE<'ring> {
702 sqe: SQE<'ring>,
703 is_final: bool,
704}
705
706impl<'ring> Deref for HardLinkedSQE<'ring> {
707 type Target = SQE<'ring>;
708
709 fn deref(&self) -> &SQE<'ring> {
710 &self.sqe
711 }
712}
713
714impl<'ring> DerefMut for HardLinkedSQE<'ring> {
715 fn deref_mut(&mut self) -> &mut SQE<'ring> {
716 &mut self.sqe
717 }
718}
719
720impl<'ring> Drop for HardLinkedSQE<'ring> {
721 fn drop(&mut self) {
722 if !self.is_final {
723 self.sqe.set_flags(SubmissionFlags::IO_HARDLINK);
724 }
725 }
726}
727
728pub struct SoftLinked<'ring, 'a> {
730 sqes: &'a mut SQEs<'ring>,
731}
732
733impl<'ring> SoftLinked<'ring, '_> {
734 pub fn terminate(self) -> Option<SQE<'ring>> {
735 self.sqes.consume()
736 }
737}
738
739impl<'ring> Iterator for SoftLinked<'ring, '_> {
740 type Item = SoftLinkedSQE<'ring>;
741
742 fn next(&mut self) -> Option<Self::Item> {
743 let is_final = self.sqes.remaining() == 1;
744 self.sqes.consume().map(|sqe| SoftLinkedSQE { sqe, is_final })
745 }
746}
747
748pub struct SoftLinkedSQE<'ring> {
749 sqe: SQE<'ring>,
750 is_final: bool,
751}
752
753impl<'ring> Deref for SoftLinkedSQE<'ring> {
754 type Target = SQE<'ring>;
755
756 fn deref(&self) -> &SQE<'ring> {
757 &self.sqe
758 }
759}
760
761impl<'ring> DerefMut for SoftLinkedSQE<'ring> {
762 fn deref_mut(&mut self) -> &mut SQE<'ring> {
763 &mut self.sqe
764 }
765}
766
767impl<'ring> Drop for SoftLinkedSQE<'ring> {
768 fn drop(&mut self) {
769 if !self.is_final {
770 self.sqe.set_flags(SubmissionFlags::IO_LINK);
771 }
772 }
773}