uring_file/
uring.rs

1//! Core io_uring wrapper providing async file operations.
2//!
3//! This module implements a thread-safe, async-compatible wrapper around Linux's io_uring interface. It uses a dedicated submission thread and completion thread to manage the ring, allowing multiple async tasks to share a single ring instance.
4//!
5//! # Architecture
6//!
7//! ```text
8//! ┌─────────────┐     ┌───────────────────┐     ┌────────────────┐
9//! │ Async Tasks │────▶│ Submission Thread │────▶│   io_uring     │
10//! │  (callers)  │     │  (batches SQEs)   │     │ (kernel space) │
11//! └─────────────┘     └───────────────────┘     └───────┬────────┘
12//!        ▲                                              │
13//!        │            ┌───────────────────┐             │
14//!        └────────────│ Completion Thread │◀────────────┘
15//!                     │   (polls CQEs)    │
16//!                     └───────────────────┘
17//! ```
18//!
19//! # Thread Safety
20//!
21//! The `Uring` struct is `Clone + Send + Sync`. Cloning is cheap (just clones the internal channel sender). All operations are thread-safe.
22
23use crate::metadata::Metadata;
24use dashmap::DashMap;
25use io_uring::IoUring;
26use io_uring::cqueue::Entry as CEntry;
27use io_uring::opcode;
28use io_uring::squeue::Entry as SEntry;
29use io_uring::types;
30use std::cmp::min;
31use std::collections::VecDeque;
32use std::ffi::CString;
33use std::io;
34use std::mem::MaybeUninit;
35use std::os::fd::AsRawFd;
36use std::os::fd::FromRawFd;
37use std::os::fd::IntoRawFd;
38use std::os::fd::OwnedFd;
39use std::os::fd::RawFd;
40use std::os::unix::ffi::OsStrExt;
41use std::path::Path;
42use std::sync::Arc;
43use std::sync::atomic::AtomicU32;
44use std::sync::atomic::Ordering;
45use std::thread;
46use tokio::sync::oneshot;
47
48// ============================================================================
49// Constants
50// ============================================================================
51
/// Maximum length for a single io_uring read/write operation.
///
/// io_uring uses i32 for return values, limiting single operations to ~2GB. The extra 4096-byte
/// reduction matches Linux's per-call transfer cap `MAX_RW_COUNT` (`INT_MAX` rounded down to the
/// page size, i.e. 0x7ffff000 on 4 KiB-page systems — see read(2)).
pub const URING_LEN_MAX: u64 = 2 * 1024 * 1024 * 1024 - 4096;

/// Maximum number of files that can be registered with a single Uring instance.
///
/// This is the size of the sparse fixed-file table pre-allocated at ring creation.
const MAX_REGISTERED_FILES: u32 = 4096;
59
60// ============================================================================
61// Path Conversion
62// ============================================================================
63
/// Convert a path to a CString for use with io_uring operations.
///
/// Returns `InvalidInput` if the path contains an interior NUL byte, since C strings cannot
/// represent it.
fn path_to_cstring(path: &Path) -> io::Result<CString> {
  let raw_bytes = path.as_os_str().as_bytes();
  match CString::new(raw_bytes) {
    Ok(cstr) => Ok(cstr),
    Err(nul_err) => Err(io::Error::new(
      io::ErrorKind::InvalidInput,
      format!("path contains null byte at position {}", nul_err.nul_position()),
    )),
  }
}
73
74// ============================================================================
75// Buffer Traits
76// ============================================================================
77
78// We define custom buffer traits rather than using `Vec<u8>` or `Box<[u8]>` directly because:
79//
80// 1. **Aligned allocations**: O_DIRECT requires sector-aligned buffers (typically 512 or 4096 bytes).
81//    `Vec<u8>` only guarantees pointer alignment, not allocation alignment. Custom allocators
82//    can provide properly aligned buffers that implement these traits.
83//
84// 2. **Buffer pools**: High-performance applications reuse buffers to avoid allocation overhead.
85//    Pool-managed buffers can implement these traits directly without conversion.
86//
87// 3. **Specialized memory**: GPU memory, mmap'd regions, or other exotic buffer types can
88//    participate in io_uring operations by implementing these traits.
89//
90// 4. **Zero-copy**: Accepting generic buffers avoids the need to copy data into/out of
91//    a library-owned buffer type.
92//
93// The traits are `unsafe` because implementors must guarantee pointer stability across moves,
94// which is automatically true for heap allocations but NOT for stack arrays.
95
/// A buffer that can be used for io_uring write operations.
///
/// # Safety
///
/// Implementors must guarantee that:
/// - The pointer returned by `as_ptr()` remains valid and at a stable address until the I/O operation completes, even if `self` is moved.
/// - This is automatically satisfied for heap-allocated buffers (`Vec<u8>`, `Box<[u8]>`, etc.) but NOT for stack-allocated arrays.
pub unsafe trait IoBuf: Send + 'static {
  /// Returns a pointer to the buffer's data.
  fn as_ptr(&self) -> *const u8;
  /// Returns the number of initialized bytes in the buffer.
  ///
  /// This is the number of bytes a write operation will submit to the kernel.
  fn len(&self) -> usize;
  /// Returns true if the buffer is empty (i.e. `len() == 0`).
  fn is_empty(&self) -> bool {
    self.len() == 0
  }
}
113
/// A buffer that can be used for io_uring read operations.
///
/// # Safety
///
/// Implementors must guarantee that:
/// - The pointer returned by `as_mut_ptr()` remains valid and at a stable address until the I/O operation completes, even if `self` is moved.
/// - This is automatically satisfied for heap-allocated buffers (`Vec<u8>`, `Box<[u8]>`, etc.) but NOT for stack-allocated arrays.
pub unsafe trait IoBufMut: Send + 'static {
  /// Returns a mutable pointer to the buffer's data.
  fn as_mut_ptr(&mut self) -> *mut u8;
  /// Returns the buffer's total capacity (maximum bytes that can be read into it).
  ///
  /// Note: this may exceed the number of currently-initialized bytes.
  fn capacity(&self) -> usize;
}
127
128// Implementations for Vec<u8>
129unsafe impl IoBuf for Vec<u8> {
130  fn as_ptr(&self) -> *const u8 {
131    Vec::as_ptr(self)
132  }
133
134  fn len(&self) -> usize {
135    Vec::len(self)
136  }
137}
138
139unsafe impl IoBufMut for Vec<u8> {
140  fn as_mut_ptr(&mut self) -> *mut u8 {
141    Vec::as_mut_ptr(self)
142  }
143
144  fn capacity(&self) -> usize {
145    Vec::capacity(self)
146  }
147}
148
149// Implementations for Box<[u8]>
150unsafe impl IoBuf for Box<[u8]> {
151  fn as_ptr(&self) -> *const u8 {
152    <[u8]>::as_ptr(self)
153  }
154
155  fn len(&self) -> usize {
156    <[u8]>::len(self)
157  }
158}
159
160unsafe impl IoBufMut for Box<[u8]> {
161  fn as_mut_ptr(&mut self) -> *mut u8 {
162    <[u8]>::as_mut_ptr(self)
163  }
164
165  fn capacity(&self) -> usize {
166    // Box<[u8]> has fixed size, capacity == len
167    <[u8]>::len(self)
168  }
169}
170
171// ============================================================================
172// File Target Types
173// ============================================================================
174
/// Internal representation of a file target - either a raw fd or a registered file index.
#[derive(Clone, Copy)]
#[doc(hidden)]
pub enum Target {
  /// An unregistered raw file descriptor.
  Fd(RawFd),
  /// A registered (fixed) file: `index` into the ring's fixed-file table, plus the original
  /// raw fd kept for opcodes that don't support fixed files (see `build_op_fd_only!`).
  Fixed { index: u32, raw_fd: RawFd },
}
182
/// Trait for types that can be used as file targets in io_uring operations.
///
/// This is implemented for all types that implement `AsRawFd` (using unregistered fds) and for `RegisteredFile` (using registered file indices for better performance).
pub trait UringTarget {
  #[doc(hidden)]
  /// Resolve this value to a concrete [`Target`]. `uring_identity` identifies the calling
  /// `Uring` instance so registered files can verify they belong to it.
  fn as_target(&self, uring_identity: &Arc<()>) -> Target;
}
190
191impl<T: AsRawFd> UringTarget for T {
192  fn as_target(&self, _uring_identity: &Arc<()>) -> Target {
193    Target::Fd(self.as_raw_fd())
194  }
195}
196
/// A file registered with a specific `Uring` instance for optimized I/O.
///
/// Registered files avoid the overhead of fd lookup on each operation. Create one via [`Uring::register`].
///
/// # Performance
///
/// Using registered files can significantly reduce per-operation overhead, especially for high-frequency I/O patterns. The kernel maintains a pre-validated reference to the file, avoiding repeated fd table lookups.
///
/// # Kernel Requirements
///
/// Requires Linux 5.12+ for sparse file registration.
pub struct RegisteredFile {
  /// Slot index in the owning ring's fixed-file table.
  index: u32,
  /// Original descriptor, kept for opcodes that don't support fixed files.
  raw_fd: RawFd,
  /// Identity token of the owning ring; checked in `as_target` to catch cross-ring misuse.
  uring_identity: Arc<()>,
}
213
214impl UringTarget for RegisteredFile {
215  fn as_target(&self, uring_identity: &Arc<()>) -> Target {
216    assert!(
217      Arc::ptr_eq(&self.uring_identity, uring_identity),
218      "RegisteredFile used with wrong Uring instance"
219    );
220    Target::Fixed {
221      index: self.index,
222      raw_fd: self.raw_fd,
223    }
224  }
225}
226
227// ============================================================================
228// Internal Request Types (just pointers, caller owns the buffer)
229// ============================================================================
230
/// A positioned read into a caller-owned buffer.
struct ReadRequest {
  target: Target,
  // Destination pointer; the allocation stays owned by the awaiting caller's future.
  buf_ptr: *mut u8,
  // Number of bytes to read (a single op is capped at ~2GB, see URING_LEN_MAX).
  buf_len: u32,
  // Absolute file offset to read from.
  offset: u64,
}

// SAFETY: The pointer is to heap-allocated memory owned by the caller's future, which awaits completion. The pointer is only dereferenced by the kernel, not by our threads.
unsafe impl Send for ReadRequest {}
unsafe impl Sync for ReadRequest {}

/// A positioned write from a caller-owned buffer.
struct WriteRequest {
  target: Target,
  // Source pointer; the allocation stays owned by the awaiting caller's future.
  buf_ptr: *const u8,
  // Number of bytes to write (a single op is capped at ~2GB, see URING_LEN_MAX).
  buf_len: u32,
  // Absolute file offset to write at.
  offset: u64,
}

// SAFETY: The pointer is to heap-allocated memory owned by the caller's future, which awaits completion. The pointer is only dereferenced by the kernel, not by our threads.
unsafe impl Send for WriteRequest {}
unsafe impl Sync for WriteRequest {}
252
/// fsync/fdatasync an open file.
struct SyncRequest {
  target: Target,
  /// When true, the DATASYNC flag is set on the fsync opcode (data-only sync).
  datasync: bool,
}

/// statx on an already-open file (submitted with AT_EMPTY_PATH).
struct StatxRequest {
  target: Target,
  /// Caller-allocated buffer for statx result. We use libc::statx for the actual storage, cast to types::statx* for the opcode.
  statx_buf: Box<MaybeUninit<libc::statx>>,
}

/// fallocate on an open file.
struct FallocateRequest {
  target: Target,
  /// Byte offset where the allocation/operation starts.
  offset: u64,
  /// Number of bytes the operation covers.
  len: u64,
  /// Mode bits passed straight through to the fallocate opcode.
  mode: i32,
}

/// fadvise on an open file.
struct FadviseRequest {
  target: Target,
  /// Byte offset where the advised region starts.
  offset: u64,
  /// Length of the advised region.
  len: i64,
  /// Advice value passed straight through to the fadvise opcode.
  advice: i32,
}

/// ftruncate an open file.
struct FtruncateRequest {
  target: Target,
  /// New file length in bytes.
  len: u64,
}

/// openat: open a file relative to a directory fd.
struct OpenAtRequest {
  /// Directory fd for relative paths, or AT_FDCWD for current directory.
  dir_fd: RawFd,
  /// Path to open. Owned to ensure validity until completion.
  path: CString,
  /// Open flags passed straight through to the openat opcode.
  flags: i32,
  /// File mode bits, used when the open creates a file.
  mode: u32,
}

/// statx by path, relative to a directory fd.
struct StatxPathRequest {
  /// Directory fd for relative paths, or AT_FDCWD for current directory.
  dir_fd: RawFd,
  /// Path to stat. Owned to ensure validity until completion.
  path: CString,
  /// AT_* flags passed straight through to the statx opcode.
  flags: i32,
  /// Caller-allocated buffer for the statx result.
  statx_buf: Box<MaybeUninit<libc::statx>>,
}

/// close a raw file descriptor.
struct CloseRequest {
  fd: RawFd,
}

/// renameat: rename/move a path.
struct RenameAtRequest {
  old_dir_fd: RawFd,
  old_path: CString,
  new_dir_fd: RawFd,
  new_path: CString,
  /// Flags passed straight through to the renameat opcode.
  flags: u32,
}

/// unlinkat: remove a path.
struct UnlinkAtRequest {
  dir_fd: RawFd,
  path: CString,
  /// AT_* flags passed straight through to the unlinkat opcode.
  flags: i32,
}

/// mkdirat: create a directory.
struct MkdirAtRequest {
  dir_fd: RawFd,
  path: CString,
  /// Permission bits for the new directory.
  mode: u32,
}

/// symlinkat: create a symbolic link.
struct SymlinkAtRequest {
  new_dir_fd: RawFd,
  /// Path the new symlink will point at.
  target: CString,
  /// Location of the symlink itself, relative to `new_dir_fd`.
  link_path: CString,
}

/// linkat: create a hard link.
struct LinkAtRequest {
  old_dir_fd: RawFd,
  old_path: CString,
  new_dir_fd: RawFd,
  new_path: CString,
  /// Flags passed straight through to the linkat opcode.
  flags: i32,
}
338
339// ============================================================================
340// Response Types
341// ============================================================================
342
/// Result of a read operation: the buffer and actual bytes read.
///
/// The buffer is handed back so the caller can reuse it without reallocating.
pub struct ReadResult<B> {
  /// The buffer containing the data read.
  pub buf: B,
  /// Number of bytes actually read (may be less than buffer capacity at EOF). Limited to ~2GB per operation.
  pub bytes_read: u32,
}

/// Result of a write operation: the buffer and actual bytes written.
///
/// The buffer is handed back so the caller can reuse it without reallocating.
pub struct WriteResult<B> {
  /// The original buffer (returned for reuse).
  pub buf: B,
  /// Number of bytes actually written (may be less than buffer size for non-regular files). Limited to ~2GB per operation.
  pub bytes_written: u32,
}
358
359// ============================================================================
360// Request Enum
361// ============================================================================
362
/// A queued operation: the request payload plus the oneshot channel used to deliver its
/// result back to the awaiting caller.
///
/// One variant per supported io_uring opcode. The submission thread turns each variant into
/// an SQE; the completion thread consumes the matching CQE and fires `res`.
enum Message {
  Read {
    req: ReadRequest,
    res: oneshot::Sender<io::Result<u32>>,
  },
  Write {
    req: WriteRequest,
    res: oneshot::Sender<io::Result<u32>>,
  },
  Sync {
    req: SyncRequest,
    res: oneshot::Sender<io::Result<()>>,
  },
  Statx {
    req: StatxRequest,
    res: oneshot::Sender<io::Result<Metadata>>,
  },
  Fallocate {
    req: FallocateRequest,
    res: oneshot::Sender<io::Result<()>>,
  },
  Fadvise {
    req: FadviseRequest,
    res: oneshot::Sender<io::Result<()>>,
  },
  Ftruncate {
    req: FtruncateRequest,
    res: oneshot::Sender<io::Result<()>>,
  },
  OpenAt {
    req: OpenAtRequest,
    res: oneshot::Sender<io::Result<OwnedFd>>,
  },
  StatxPath {
    req: StatxPathRequest,
    res: oneshot::Sender<io::Result<Metadata>>,
  },
  Close {
    req: CloseRequest,
    res: oneshot::Sender<io::Result<()>>,
  },
  RenameAt {
    req: RenameAtRequest,
    res: oneshot::Sender<io::Result<()>>,
  },
  UnlinkAt {
    req: UnlinkAtRequest,
    res: oneshot::Sender<io::Result<()>>,
  },
  MkdirAt {
    req: MkdirAtRequest,
    res: oneshot::Sender<io::Result<()>>,
  },
  SymlinkAt {
    req: SymlinkAtRequest,
    res: oneshot::Sender<io::Result<()>>,
  },
  LinkAt {
    req: LinkAtRequest,
    res: oneshot::Sender<io::Result<()>>,
  },
}
425
426// ============================================================================
427// Uring Configuration
428// ============================================================================
429
/// Default ring size for io_uring (16384 entries).
///
/// This is a conservative default that works in most environments including containers
/// and memory-constrained systems. The kernel will further clamp this if needed via
/// `IORING_SETUP_CLAMP`. Used by [`UringCfg::default`].
pub const DEFAULT_RING_SIZE: u32 = 16384;
436
/// Configuration options for io_uring initialization.
///
/// These are advanced options that affect io_uring behavior. Most users should use `UringCfg::default()`. Incorrect configuration may cause `EINVAL` errors or degraded performance.
///
/// # Kernel Requirements
///
/// Some options require specific kernel versions or capabilities:
/// - `coop_taskrun`: Linux 5.19+
/// - `defer_taskrun`: Linux 6.1+
/// - `sqpoll`: Requires `CAP_SYS_NICE` capability
/// - `iopoll`: Only works with O_DIRECT files on supported filesystems
#[derive(Clone, Debug)]
pub struct UringCfg {
  /// Size of the io_uring submission/completion queues (number of entries).
  ///
  /// Larger values allow more operations to be batched but consume more memory.
  /// The kernel will clamp this to the maximum supported size via `IORING_SETUP_CLAMP`.
  ///
  /// If you encounter `ENOMEM` errors during initialization, try reducing this value.
  /// Defaults to [`DEFAULT_RING_SIZE`] (16384 entries).
  pub ring_size: u32,

  /// Enable cooperative task running (Linux 5.19+). When enabled, the kernel will only process completions when the application explicitly asks for them, reducing overhead.
  pub coop_taskrun: bool,

  /// Enable deferred task running (Linux 6.1+). Similar to `coop_taskrun` but with additional deferral. Requires `coop_taskrun` to also be set.
  pub defer_taskrun: bool,

  /// Enable I/O polling mode. When enabled, the kernel will poll for completions instead of using interrupts. Only works with `O_DIRECT` files on supported filesystems. Can provide lower latency but uses more CPU.
  pub iopoll: bool,

  /// Enable submission queue polling with the given idle timeout in milliseconds. When enabled, a kernel thread will poll the submission queue, eliminating the need for system calls to submit I/O. The thread will go to sleep after being idle for the specified duration. **Requires `CAP_SYS_NICE` capability.**
  pub sqpoll: Option<u32>,
}
471
472impl Default for UringCfg {
473  fn default() -> Self {
474    Self {
475      ring_size: DEFAULT_RING_SIZE,
476      coop_taskrun: false,
477      defer_taskrun: false,
478      iopoll: false,
479      sqpoll: None,
480    }
481  }
482}
483
484// ============================================================================
485// Uring Core
486// ============================================================================
487
488/// Handle to a shared io_uring instance.
489///
490/// This is the main entry point for performing async I/O operations via io_uring. It is cheap to clone (just clones an `Arc` internally) and safe to share across threads and async tasks.
491///
492/// # Example
493///
494/// ```ignore
495/// use uring_file::uring::{Uring, UringCfg};
496/// use std::fs::File;
497///
498/// #[tokio::main]
499/// async fn main() -> std::io::Result<()> {
500///     let uring = Uring::new(UringCfg::default())?;
501///     let file = File::open("test.txt")?;
502///     
503///     // Read with library-allocated buffer
504///     let result = uring.read_at(&file, 0, 1024).await?;
505///     
506///     // Read into user-provided buffer (zero-copy for custom allocators)
507///     let buf = vec![0u8; 1024];
508///     let result = uring.read_into(&file, 0, buf).await?;
509///     
510///     // Register file for reduced per-operation overhead
511///     let registered = uring.register(&file)?;
512///     let result = uring.read_at(&registered, 0, 1024).await?;
513///     
514///     println!("Read {} bytes", result.bytes_read);
515///     Ok(())
516/// }
517/// ```
518///
519/// # Architecture Notes
520///
521/// Internally, `Uring` spawns two background threads:
522/// 1. **Submission thread**: Receives requests via a channel, batches them, and submits them to the kernel.
523/// 2. **Completion thread**: Polls the completion queue and dispatches results back to waiting async tasks.
524///
525/// This design allows for efficient batching of submissions while maintaining a simple async API.
526// Sources:
527// - Example: https://github1s.com/tokio-rs/io-uring/blob/HEAD/examples/tcp_echo.rs
528// - liburing docs: https://unixism.net/loti/ref-liburing/completion.html
529// - Quick high-level overview: https://man.archlinux.org/man/io_uring.7.en
530// - io_uring walkthrough: https://unixism.net/2020/04/io-uring-by-example-part-1-introduction/
531// - Multithreading:
532//   - https://github.com/axboe/liburing/issues/109#issuecomment-1114213402
533//   - https://github.com/axboe/liburing/issues/109#issuecomment-1166378978
534//   - https://github.com/axboe/liburing/issues/109#issuecomment-614911522
535//   - https://github.com/axboe/liburing/issues/125
536//   - https://github.com/axboe/liburing/issues/127
537//   - https://github.com/axboe/liburing/issues/129
538//   - https://github.com/axboe/liburing/issues/571#issuecomment-1106480309
539// - Kernel poller: https://unixism.net/loti/tutorial/sq_poll.html
#[derive(Clone)]
pub struct Uring {
  // We don't use std::sync::mpsc::Sender as it is not Sync, so it's really complicated to use from any async function.
  sender: crossbeam_channel::Sender<Message>,
  // Shared handle to the ring; the submission/completion threads hold clones of this Arc.
  ring: Arc<IoUring<SEntry, CEntry>>,
  // Next free slot in the fixed-file table — presumably consumed by `register()`; body not shown here.
  next_file_slot: Arc<AtomicU32>,
  // Per-ring identity token; RegisteredFile::as_target compares against it to reject cross-ring use.
  identity: Arc<()>,
}
548
/// Helper to build a submission entry for either Fd or Fixed target.
///
/// Binds `$fd` to `types::Fd` for raw descriptors or `types::Fixed` for registered-file
/// indices, then evaluates `$op` with that binding.
macro_rules! build_op {
  ($target:expr, | $fd:ident | $op:expr) => {
    match $target {
      Target::Fd(raw) => {
        let $fd = types::Fd(raw);
        $op
      }
      Target::Fixed { index, .. } => {
        let $fd = types::Fixed(index);
        $op
      }
    }
  };
}
564
/// Helper to build a submission entry that only supports Fd (not Fixed).
///
/// For `Fixed` targets this falls back to the stored raw fd, since some opcodes cannot
/// address registered-file indices.
macro_rules! build_op_fd_only {
  ($target:expr, | $fd:ident | $op:expr) => {
    match $target {
      Target::Fd(raw) => {
        let $fd = types::Fd(raw);
        $op
      }
      Target::Fixed { raw_fd, .. } => {
        let $fd = types::Fd(raw_fd);
        $op
      }
    }
  };
}
580
581/// Process a completion entry and dispatch the result.
582fn handle_completion(msg: Message, result: i32) {
583  let result: io::Result<i32> = if result < 0 {
584    Err(io::Error::from_raw_os_error(-result))
585  } else {
586    Ok(result)
587  };
588
589  match msg {
590    Message::Read { res, .. } => {
591      let _ = res.send(result.map(|n| n as u32));
592    }
593    Message::Write { res, .. } => {
594      let _ = res.send(result.map(|n| n as u32));
595    }
596    Message::Sync { res, .. } => {
597      let _ = res.send(result.map(|_| ()));
598    }
599    Message::Statx { req, res } => {
600      let outcome = result.map(|_| {
601        // SAFETY: The kernel has initialized the statx buffer
602        let statx = unsafe { (*req.statx_buf).assume_init() };
603        Metadata(statx)
604      });
605      let _ = res.send(outcome);
606    }
607    Message::Fallocate { res, .. } => {
608      let _ = res.send(result.map(|_| ()));
609    }
610    Message::Fadvise { res, .. } => {
611      let _ = res.send(result.map(|_| ()));
612    }
613    Message::Ftruncate { res, .. } => {
614      let _ = res.send(result.map(|_| ()));
615    }
616    Message::OpenAt { res, .. } => {
617      let outcome = result.map(|fd| {
618        // SAFETY: The kernel returns a valid fd on success
619        unsafe { OwnedFd::from_raw_fd(fd) }
620      });
621      let _ = res.send(outcome);
622    }
623    Message::StatxPath { req, res } => {
624      let outcome = result.map(|_| {
625        // SAFETY: The kernel has initialized the statx buffer
626        let statx = unsafe { (*req.statx_buf).assume_init() };
627        Metadata(statx)
628      });
629      let _ = res.send(outcome);
630    }
631    Message::Close { res, .. } => {
632      let _ = res.send(result.map(|_| ()));
633    }
634    Message::RenameAt { res, .. } => {
635      let _ = res.send(result.map(|_| ()));
636    }
637    Message::UnlinkAt { res, .. } => {
638      let _ = res.send(result.map(|_| ()));
639    }
640    Message::MkdirAt { res, .. } => {
641      let _ = res.send(result.map(|_| ()));
642    }
643    Message::SymlinkAt { res, .. } => {
644      let _ = res.send(result.map(|_| ()));
645    }
646    Message::LinkAt { res, .. } => {
647      let _ = res.send(result.map(|_| ()));
648    }
649  }
650}
651
652impl Uring {
653  /// Create a new io_uring instance with the given configuration.
654  ///
655  /// This spawns two background threads for submission and completion handling. The threads will automatically stop when all `Uring` handles are dropped.
656  ///
657  /// # Errors
658  ///
659  /// Returns an error if the io_uring cannot be created (e.g., kernel too old, resource limits exceeded, or insufficient permissions).
660  pub fn new(cfg: UringCfg) -> io::Result<Self> {
661    let (sender, receiver) = crossbeam_channel::unbounded::<Message>();
662    let pending: Arc<DashMap<u64, Message>> = Default::default();
663
664    let ring = {
665      let mut builder = IoUring::<SEntry, CEntry>::builder();
666      // Use IORING_SETUP_CLAMP to let the kernel reduce the ring size if our requested size exceeds system limits. This is safer than failing outright.
667      builder.setup_clamp();
668      if cfg.coop_taskrun {
669        builder.setup_coop_taskrun();
670      };
671      if cfg.defer_taskrun {
672        builder.setup_defer_taskrun();
673      };
674      if cfg.iopoll {
675        builder.setup_iopoll();
676      }
677      if let Some(sqpoll) = cfg.sqpoll {
678        builder.setup_sqpoll(sqpoll);
679      };
680      builder.build(cfg.ring_size)?
681    };
682
683    // Pre-allocate sparse file table for registration (Linux 5.12+). If this fails, file registration won't work but unregistered fds will still function.
684    let _ = ring.submitter().register_files_sparse(MAX_REGISTERED_FILES);
685
686    let ring = Arc::new(ring);
687
688    // Submission thread.
689    thread::spawn({
690      let pending = pending.clone();
691      let ring = ring.clone();
692      // This is outside the loop to avoid reallocation each time.
693      let mut msgbuf = VecDeque::new();
694      move || {
695        // SAFETY: We ensure that the submission queue is only accessed from this single thread. The completion queue is accessed from a separate thread.
696        let mut submission = unsafe { ring.submission_shared() };
697        let mut next_id = 0u64;
698
699        // If this loop exits, it means we've dropped all `Uring` handles and can safely stop.
700        while let Ok(init_msg) = receiver.recv() {
701          // Process multiple messages at once to avoid too many io_uring submits.
702          msgbuf.push_back(init_msg);
703          while let Ok(msg) = receiver.try_recv() {
704            msgbuf.push_back(msg);
705          }
706
707          // How the io_uring submission queue works:
708          // - The buffer is shared between the kernel and userspace.
709          // - There are atomic head and tail indices that allow them to be shared mutably between kernel and userspace safely.
710          // - The Rust library we're using abstracts over this by caching the head and tail as local values. Once we've made our inserts, we update the atomic tail and then tell the kernel to consume some of the queue. When we update the atomic tail, we also check the atomic head and update our local cached value; some entries may have been consumed by the kernel in some other thread since we last checked and we may actually have more free space than we thought.
711          while let Some(msg) = msgbuf.pop_front() {
712            let id = next_id;
713            next_id = next_id.wrapping_add(1);
714
715            let submission_entry = match &msg {
716              Message::Read { req, .. } => {
717                build_op!(req.target, |fd| opcode::Read::new(
718                  fd,
719                  req.buf_ptr,
720                  req.buf_len
721                )
722                .offset(req.offset)
723                .build()
724                .user_data(id))
725              }
726              Message::Write { req, .. } => {
727                build_op!(req.target, |fd| opcode::Write::new(
728                  fd,
729                  req.buf_ptr,
730                  req.buf_len
731                )
732                .offset(req.offset)
733                .build()
734                .user_data(id))
735              }
736              Message::Sync { req, .. } => {
737                build_op!(req.target, |fd| {
738                  let mut fsync = opcode::Fsync::new(fd);
739                  if req.datasync {
740                    fsync = fsync.flags(types::FsyncFlags::DATASYNC);
741                  }
742                  fsync.build().user_data(id)
743                })
744              }
745              Message::Statx { req, .. } => {
746                const STATX_BASIC_STATS: u32 = 0x000007ff; // Request all basic stat fields
747                const AT_EMPTY_PATH: i32 = 0x1000; // Interpret fd as the file itself, not a directory
748                static EMPTY_PATH: &std::ffi::CStr = c""; // Empty path since we use AT_EMPTY_PATH
749
750                // Cast libc::statx* to types::statx* - the opcode uses an opaque type but the kernel writes the actual statx struct
751                let statx_ptr = req.statx_buf.as_ptr() as *mut types::statx;
752
753                // Note: Statx doesn't support Fixed in the io-uring crate, so we fall back to raw fd
754                build_op_fd_only!(req.target, |fd| opcode::Statx::new(
755                  fd,
756                  EMPTY_PATH.as_ptr(),
757                  statx_ptr
758                )
759                .flags(AT_EMPTY_PATH)
760                .mask(STATX_BASIC_STATS)
761                .build()
762                .user_data(id))
763              }
764              Message::Fallocate { req, .. } => {
765                build_op!(req.target, |fd| opcode::Fallocate::new(fd, req.len)
766                  .offset(req.offset)
767                  .mode(req.mode)
768                  .build()
769                  .user_data(id))
770              }
771              Message::Fadvise { req, .. } => {
772                build_op!(req.target, |fd| opcode::Fadvise::new(
773                  fd, req.len, req.advice
774                )
775                .offset(req.offset)
776                .build()
777                .user_data(id))
778              }
779              Message::Ftruncate { req, .. } => {
780                build_op!(req.target, |fd| opcode::Ftruncate::new(fd, req.len)
781                  .build()
782                  .user_data(id))
783              }
784              Message::OpenAt { req, .. } => {
785                opcode::OpenAt::new(types::Fd(req.dir_fd), req.path.as_ptr())
786                  .flags(req.flags)
787                  .mode(req.mode)
788                  .build()
789                  .user_data(id)
790              }
791              Message::StatxPath { req, .. } => {
792                const STATX_BASIC_STATS: u32 = 0x000007ff;
793                let statx_ptr = req.statx_buf.as_ptr() as *mut types::statx;
794
795                opcode::Statx::new(types::Fd(req.dir_fd), req.path.as_ptr(), statx_ptr)
796                  .flags(req.flags)
797                  .mask(STATX_BASIC_STATS)
798                  .build()
799                  .user_data(id)
800              }
801              Message::Close { req, .. } => {
802                opcode::Close::new(types::Fd(req.fd)).build().user_data(id)
803              }
804              Message::RenameAt { req, .. } => opcode::RenameAt::new(
805                types::Fd(req.old_dir_fd),
806                req.old_path.as_ptr(),
807                types::Fd(req.new_dir_fd),
808                req.new_path.as_ptr(),
809              )
810              .flags(req.flags)
811              .build()
812              .user_data(id),
813              Message::UnlinkAt { req, .. } => {
814                opcode::UnlinkAt::new(types::Fd(req.dir_fd), req.path.as_ptr())
815                  .flags(req.flags)
816                  .build()
817                  .user_data(id)
818              }
819              Message::MkdirAt { req, .. } => {
820                opcode::MkDirAt::new(types::Fd(req.dir_fd), req.path.as_ptr())
821                  .mode(req.mode)
822                  .build()
823                  .user_data(id)
824              }
825              Message::SymlinkAt { req, .. } => opcode::SymlinkAt::new(
826                types::Fd(req.new_dir_fd),
827                req.target.as_ptr(),
828                req.link_path.as_ptr(),
829              )
830              .build()
831              .user_data(id),
832              Message::LinkAt { req, .. } => opcode::LinkAt::new(
833                types::Fd(req.old_dir_fd),
834                req.old_path.as_ptr(),
835                types::Fd(req.new_dir_fd),
836                req.new_path.as_ptr(),
837              )
838              .flags(req.flags)
839              .build()
840              .user_data(id),
841            };
842
843            // Insert before submitting so the completion handler can find it.
844            pending.insert(id, msg);
845
846            if submission.is_full() {
847              submission.sync();
848              ring.submit_and_wait(1).unwrap();
849            }
850
851            // SAFETY: The submission entry references memory owned by the caller's future, which is awaiting completion.
852            unsafe {
853              submission.push(&submission_entry).unwrap();
854            };
855          }
856
857          submission.sync();
858          // This is still necessary even with sqpoll, as our kernel thread may have gone to sleep.
859          ring.submit().unwrap();
860        }
861      }
862    });
863
864    // Completion thread.
865    thread::spawn({
866      let pending = pending.clone();
867      let ring = ring.clone();
868      move || {
869        // SAFETY: We ensure that the completion queue is only accessed from this single thread. The submission queue is accessed from a separate thread.
870        let mut completion = unsafe { ring.completion_shared() };
871
872        // TODO: Stop this loop if all `Uring` handles have been dropped and there are no pending requests. Currently this thread runs forever.
873        loop {
874          let Some(e) = completion.next() else {
875            // No completions available, wait for one.
876            ring.submit_and_wait(1).unwrap();
877            completion.sync();
878            continue;
879          };
880
881          let id = e.user_data();
882          let (_, req) = pending
883            .remove(&id)
884            .expect("completion for unknown request id");
885          handle_completion(req, e.result());
886        }
887      }
888    });
889
890    Ok(Self {
891      sender,
892      ring,
893      next_file_slot: Arc::new(AtomicU32::new(0)),
894      identity: Arc::new(()),
895    })
896  }
897
898  /// Register a file for optimized I/O operations.
899  ///
900  /// Registered files use kernel-side file references, avoiding fd table lookups on each operation. This can significantly improve performance for high-frequency I/O.
901  ///
902  /// # Errors
903  ///
904  /// Returns an error if the maximum number of registered files has been reached, or if file registration fails (e.g., kernel too old).
905  ///
906  /// # Kernel Requirements
907  ///
908  /// Requires Linux 5.12+ for sparse file registration.
909  pub fn register(&self, file: &impl AsRawFd) -> io::Result<RegisteredFile> {
910    let raw_fd = file.as_raw_fd();
911    let slot = self.next_file_slot.fetch_add(1, Ordering::SeqCst);
912    if slot >= MAX_REGISTERED_FILES {
913      return Err(io::Error::new(
914        io::ErrorKind::Other,
915        "maximum registered files exceeded",
916      ));
917    }
918
919    self
920      .ring
921      .submitter()
922      .register_files_update(slot, &[raw_fd])?;
923
924    Ok(RegisteredFile {
925      index: slot,
926      raw_fd,
927      uring_identity: self.identity.clone(),
928    })
929  }
930
  /// Send a message to the submission thread.
  ///
  /// Panics if the submission thread has exited (its channel receiver was dropped); every async operation on this ring depends on that thread staying alive, so this is an unrecoverable invariant violation.
  fn send(&self, msg: Message) {
    self.sender.send(msg).expect("uring submission thread dead");
  }
935
936  /// Read into a user-provided buffer. This is the primitive read operation that accepts any buffer type implementing [`IoBufMut`].
937  ///
938  /// The buffer is returned along with the number of bytes read. This allows buffer reuse and supports custom allocators (e.g., aligned buffers for O_DIRECT).
939  pub async fn read_into_at<B: IoBufMut>(
940    &self,
941    file: &impl UringTarget,
942    offset: impl TryInto<u64>,
943    mut buf: B,
944  ) -> io::Result<ReadResult<B>> {
945    let offset: u64 = offset
946      .try_into()
947      .map_err(|_| io::Error::other("offset exceeds u64::MAX"))?;
948    let target = file.as_target(&self.identity);
949    let ptr = buf.as_mut_ptr();
950    let cap = buf.capacity();
951    let (tx, rx) = oneshot::channel();
952    self.send(Message::Read {
953      req: ReadRequest {
954        target,
955        buf_ptr: ptr,
956        buf_len: cap.try_into().unwrap(),
957        offset,
958      },
959      res: tx,
960    });
961    let bytes_read = rx.await.expect("uring completion channel dropped")?;
962    Ok(ReadResult { buf, bytes_read })
963  }
964
965  /// Read from a file at the specified offset, allocating a buffer internally.
966  ///
967  /// This is a convenience wrapper around [`read_into_at`](Self::read_into_at) that allocates a `Vec<u8>`. For zero-copy or custom allocators, use `read_into_at` directly.
968  pub async fn read_at(
969    &self,
970    file: &impl UringTarget,
971    offset: impl TryInto<u64>,
972    len: impl TryInto<u32>,
973  ) -> io::Result<ReadResult<Vec<u8>>> {
974    let len: u32 = len
975      .try_into()
976      .map_err(|_| io::Error::other("len exceeds u32::MAX"))?;
977    let buf = vec![0u8; len as usize];
978    self.read_into_at(file, offset, buf).await
979  }
980
981  /// Write a buffer to a file at the specified offset. Accepts any buffer type implementing [`IoBuf`].
982  ///
983  /// The buffer is returned along with the number of bytes written. This allows buffer reuse and supports custom allocators.
984  pub async fn write_at<B: IoBuf>(
985    &self,
986    file: &impl UringTarget,
987    offset: impl TryInto<u64>,
988    buf: B,
989  ) -> io::Result<WriteResult<B>> {
990    let offset: u64 = offset
991      .try_into()
992      .map_err(|_| io::Error::other("offset exceeds u64::MAX"))?;
993    let target = file.as_target(&self.identity);
994    let ptr = buf.as_ptr();
995    let len = buf.len();
996    let (tx, rx) = oneshot::channel();
997    self.send(Message::Write {
998      req: WriteRequest {
999        target,
1000        buf_ptr: ptr,
1001        buf_len: len.try_into().unwrap(),
1002        offset,
1003      },
1004      res: tx,
1005    });
1006    let bytes_written = rx.await.expect("uring completion channel dropped")?;
1007    Ok(WriteResult { buf, bytes_written })
1008  }
1009
1010  /// Read exactly `len` bytes from a file at the specified offset.
1011  ///
1012  /// Returns an error if fewer bytes are available (e.g., at EOF). This is useful when you know the exact size of data to read and want to fail rather than handle partial reads.
1013  pub async fn read_exact_at(
1014    &self,
1015    file: &impl UringTarget,
1016    offset: impl TryInto<u64>,
1017    len: impl TryInto<u32>,
1018  ) -> io::Result<Vec<u8>> {
1019    let len: u32 = len
1020      .try_into()
1021      .map_err(|_| io::Error::other("len exceeds u32::MAX"))?;
1022    let result = self.read_at(file, offset, len).await?;
1023    if result.bytes_read < len {
1024      return Err(io::Error::new(
1025        io::ErrorKind::UnexpectedEof,
1026        format!("expected {} bytes, got {}", len, result.bytes_read),
1027      ));
1028    }
1029    Ok(result.buf)
1030  }
1031
1032  /// Write all bytes to a file at the specified offset.
1033  ///
1034  /// Loops on short writes until all data is written. Returns an error if a write returns 0 bytes (indicating the write cannot proceed).
1035  pub async fn write_all_at(
1036    &self,
1037    file: &impl UringTarget,
1038    offset: impl TryInto<u64>,
1039    data: &[u8],
1040  ) -> io::Result<()> {
1041    let mut offset: u64 = offset
1042      .try_into()
1043      .map_err(|_| io::Error::other("offset exceeds u64::MAX"))?;
1044    let mut written = 0usize;
1045    while written < data.len() {
1046      let remaining = data.len() - written;
1047      let chunk_size = min(remaining, URING_LEN_MAX as usize);
1048      let chunk = data[written..written + chunk_size].to_vec();
1049      let result = self.write_at(file, offset, chunk).await?;
1050      if result.bytes_written == 0 {
1051        return Err(io::Error::new(
1052          io::ErrorKind::WriteZero,
1053          "failed to write any bytes",
1054        ));
1055      }
1056      written += result.bytes_written as usize;
1057      offset += result.bytes_written as u64;
1058    }
1059    Ok(())
1060  }
1061
1062  /// Synchronize file data and metadata to disk (fsync). This ensures that all data and metadata modifications are flushed to the underlying storage device. Even when using direct I/O, this is necessary to ensure the device itself has flushed any internal caches.
1063  ///
1064  /// **Note on ordering**: io_uring does not guarantee ordering between operations. If you need to ensure writes complete before fsync, you should await the write first, then call fsync.
1065  pub async fn sync(&self, file: &impl UringTarget) -> io::Result<()> {
1066    let target = file.as_target(&self.identity);
1067    let (tx, rx) = oneshot::channel();
1068    self.send(Message::Sync {
1069      req: SyncRequest {
1070        target,
1071        datasync: false,
1072      },
1073      res: tx,
1074    });
1075    rx.await.expect("uring completion channel dropped")
1076  }
1077
1078  /// Synchronize file data to disk (fdatasync). Like [`sync`](Self::sync), but only flushes data, not metadata (unless the metadata is required to retrieve the data). This can be faster than a full fsync.
1079  pub async fn datasync(&self, file: &impl UringTarget) -> io::Result<()> {
1080    let target = file.as_target(&self.identity);
1081    let (tx, rx) = oneshot::channel();
1082    self.send(Message::Sync {
1083      req: SyncRequest {
1084        target,
1085        datasync: true,
1086      },
1087      res: tx,
1088    });
1089    rx.await.expect("uring completion channel dropped")
1090  }
1091
1092  /// Get file status information (statx). This is the io_uring equivalent of `fstat`/`statx`. It returns metadata about the file including size, permissions, timestamps, etc. Requires Linux 5.6+. On older kernels, this will fail with `EINVAL`.
1093  pub async fn statx(&self, file: &impl UringTarget) -> io::Result<Metadata> {
1094    let target = file.as_target(&self.identity);
1095    let statx_buf = Box::new(MaybeUninit::<libc::statx>::uninit());
1096    let (tx, rx) = oneshot::channel();
1097    self.send(Message::Statx {
1098      req: StatxRequest { target, statx_buf },
1099      res: tx,
1100    });
1101    rx.await.expect("uring completion channel dropped")
1102  }
1103
1104  /// Pre-allocate or deallocate space for a file (fallocate). This can be used to pre-allocate space to avoid fragmentation, punch holes in sparse files, or zero-fill regions. Use `libc::FALLOC_FL_*` constants for mode flags. Requires Linux 5.6+.
1105  pub async fn fallocate(
1106    &self,
1107    file: &impl UringTarget,
1108    offset: impl TryInto<u64>,
1109    len: impl TryInto<u64>,
1110    mode: i32,
1111  ) -> io::Result<()> {
1112    let offset: u64 = offset
1113      .try_into()
1114      .map_err(|_| io::Error::other("offset exceeds u64::MAX"))?;
1115    let len: u64 = len
1116      .try_into()
1117      .map_err(|_| io::Error::other("len exceeds u64::MAX"))?;
1118    let target = file.as_target(&self.identity);
1119    let (tx, rx) = oneshot::channel();
1120    self.send(Message::Fallocate {
1121      req: FallocateRequest {
1122        target,
1123        offset,
1124        len,
1125        mode,
1126      },
1127      res: tx,
1128    });
1129    rx.await.expect("uring completion channel dropped")
1130  }
1131
1132  /// Advise the kernel about expected file access patterns (fadvise). This is a hint to the kernel about how you intend to access a file region. The kernel may use this to optimize readahead, caching, etc. Use `libc::POSIX_FADV_*` constants for advice values. Requires Linux 5.6+.
1133  pub async fn fadvise(
1134    &self,
1135    file: &impl UringTarget,
1136    offset: impl TryInto<u64>,
1137    len: impl TryInto<u64>,
1138    advice: i32,
1139  ) -> io::Result<()> {
1140    let offset: u64 = offset
1141      .try_into()
1142      .map_err(|_| io::Error::other("offset exceeds u64::MAX"))?;
1143    let len: u64 = len
1144      .try_into()
1145      .map_err(|_| io::Error::other("len exceeds u64::MAX"))?;
1146    let len: i64 = len
1147      .try_into()
1148      .map_err(|_| io::Error::other("len exceeds i64::MAX"))?;
1149    let target = file.as_target(&self.identity);
1150    let (tx, rx) = oneshot::channel();
1151    self.send(Message::Fadvise {
1152      req: FadviseRequest {
1153        target,
1154        offset,
1155        len,
1156        advice,
1157      },
1158      res: tx,
1159    });
1160    rx.await.expect("uring completion channel dropped")
1161  }
1162
1163  /// Truncate a file to a specified length (ftruncate). If the file is larger than the specified length, the extra data is lost. If the file is smaller, it is extended and the extended part reads as zeros. Requires Linux 6.9+. On older kernels, this will fail with `EINVAL`.
1164  pub async fn ftruncate(&self, file: &impl UringTarget, len: impl TryInto<u64>) -> io::Result<()> {
1165    let len: u64 = len
1166      .try_into()
1167      .map_err(|_| io::Error::other("len exceeds u64::MAX"))?;
1168    let target = file.as_target(&self.identity);
1169    let (tx, rx) = oneshot::channel();
1170    self.send(Message::Ftruncate {
1171      req: FtruncateRequest { target, len },
1172      res: tx,
1173    });
1174    rx.await.expect("uring completion channel dropped")
1175  }
1176
1177  /// Open a file asynchronously. This is the io_uring equivalent of `open(2)`/`openat(2)`. Requires Linux 5.6+.
1178  ///
1179  /// # Arguments
1180  ///
1181  /// * `path` - Path to open (any `CStr`-like type: `&CStr`, `CString`, `c"literal"`).
1182  /// * `flags` - Open flags from `libc` (e.g., `libc::O_RDONLY`, `libc::O_RDWR | libc::O_CREAT`).
1183  /// * `mode` - File mode for creation (only used with `O_CREAT`).
1184  ///
1185  /// # Returns
1186  ///
1187  /// Returns an `OwnedFd` on success. The fd is automatically closed when dropped.
1188  pub async fn open(&self, path: impl AsRef<Path>, flags: i32, mode: u32) -> io::Result<OwnedFd> {
1189    self.open_at(libc::AT_FDCWD, path, flags, mode).await
1190  }
1191
1192  /// Open a file relative to a directory fd. This is useful for safe path traversal and avoiding TOCTOU races. Requires Linux 5.6+.
1193  ///
1194  /// # Arguments
1195  ///
1196  /// * `dir_fd` - Directory fd for relative paths, or `libc::AT_FDCWD` for current directory.
1197  /// * `path` - Path to open relative to `dir_fd`.
1198  /// * `flags` - Open flags.
1199  /// * `mode` - File mode for creation.
1200  pub async fn open_at(
1201    &self,
1202    dir_fd: RawFd,
1203    path: impl AsRef<Path>,
1204    flags: i32,
1205    mode: u32,
1206  ) -> io::Result<OwnedFd> {
1207    let path = path_to_cstring(path.as_ref())?;
1208    let (tx, rx) = oneshot::channel();
1209    self.send(Message::OpenAt {
1210      req: OpenAtRequest {
1211        dir_fd,
1212        path,
1213        flags,
1214        mode,
1215      },
1216      res: tx,
1217    });
1218    rx.await.expect("uring completion channel dropped")
1219  }
1220
1221  /// Get file metadata by path without opening the file. This is the io_uring equivalent of `stat(2)`/`statx(2)`. Requires Linux 5.6+.
1222  ///
1223  /// Unlike `statx()` which operates on an open fd, this method stats the path directly.
1224  pub async fn statx_path(&self, path: impl AsRef<Path>) -> io::Result<Metadata> {
1225    self.statx_at(libc::AT_FDCWD, path, 0).await
1226  }
1227
1228  /// Get file metadata relative to a directory fd. This allows safe path traversal and additional flags for controlling symlink behavior. Requires Linux 5.6+.
1229  ///
1230  /// # Arguments
1231  ///
1232  /// * `dir_fd` - Directory fd for relative paths, or `libc::AT_FDCWD` for current directory.
1233  /// * `path` - Path to stat relative to `dir_fd`.
1234  /// * `flags` - Flags from `libc` (e.g., `libc::AT_SYMLINK_NOFOLLOW` to not follow symlinks).
1235  pub async fn statx_at(
1236    &self,
1237    dir_fd: RawFd,
1238    path: impl AsRef<Path>,
1239    flags: i32,
1240  ) -> io::Result<Metadata> {
1241    let path = path_to_cstring(path.as_ref())?;
1242    let statx_buf = Box::new(MaybeUninit::<libc::statx>::uninit());
1243    let (tx, rx) = oneshot::channel();
1244    self.send(Message::StatxPath {
1245      req: StatxPathRequest {
1246        dir_fd,
1247        path,
1248        flags,
1249        statx_buf,
1250      },
1251      res: tx,
1252    });
1253    rx.await.expect("uring completion channel dropped")
1254  }
1255
1256  /// Close a file descriptor asynchronously. This is the io_uring equivalent of `close(2)`. Requires Linux 5.6+.
1257  ///
1258  /// Takes ownership of the fd to prevent the automatic synchronous close on drop. This is useful when you want to:
1259  /// - Handle close errors (which are silently ignored by `OwnedFd::drop`)
1260  /// - Batch close operations with other io_uring operations
1261  /// - Avoid blocking the async runtime on close
1262  pub async fn close(&self, fd: impl IntoRawFd) -> io::Result<()> {
1263    let raw_fd = fd.into_raw_fd();
1264    let (tx, rx) = oneshot::channel();
1265    self.send(Message::Close {
1266      req: CloseRequest { fd: raw_fd },
1267      res: tx,
1268    });
1269    rx.await.expect("uring completion channel dropped")
1270  }
1271
1272  /// Rename a file asynchronously. This is the io_uring equivalent of `rename(2)`. Requires Linux 5.11+.
1273  pub async fn rename(
1274    &self,
1275    old_path: impl AsRef<Path>,
1276    new_path: impl AsRef<Path>,
1277  ) -> io::Result<()> {
1278    self
1279      .rename_at(libc::AT_FDCWD, old_path, libc::AT_FDCWD, new_path, 0)
1280      .await
1281  }
1282
1283  /// Rename a file relative to directory fds. This is the io_uring equivalent of `renameat2(2)`. Requires Linux 5.11+.
1284  ///
1285  /// Flags can include `libc::RENAME_NOREPLACE`, `libc::RENAME_EXCHANGE`, etc.
1286  pub async fn rename_at(
1287    &self,
1288    old_dir_fd: RawFd,
1289    old_path: impl AsRef<Path>,
1290    new_dir_fd: RawFd,
1291    new_path: impl AsRef<Path>,
1292    flags: u32,
1293  ) -> io::Result<()> {
1294    let old_path = path_to_cstring(old_path.as_ref())?;
1295    let new_path = path_to_cstring(new_path.as_ref())?;
1296    let (tx, rx) = oneshot::channel();
1297    self.send(Message::RenameAt {
1298      req: RenameAtRequest {
1299        old_dir_fd,
1300        old_path,
1301        new_dir_fd,
1302        new_path,
1303        flags,
1304      },
1305      res: tx,
1306    });
1307    rx.await.expect("uring completion channel dropped")
1308  }
1309
1310  /// Delete a file or empty directory. This is the io_uring equivalent of `unlink(2)`. Requires Linux 5.11+.
1311  pub async fn unlink(&self, path: impl AsRef<Path>) -> io::Result<()> {
1312    self.unlink_at(libc::AT_FDCWD, path, 0).await
1313  }
1314
1315  /// Delete a directory. This is the io_uring equivalent of `rmdir(2)`. Requires Linux 5.11+.
1316  pub async fn rmdir(&self, path: impl AsRef<Path>) -> io::Result<()> {
1317    self
1318      .unlink_at(libc::AT_FDCWD, path, libc::AT_REMOVEDIR)
1319      .await
1320  }
1321
1322  /// Delete a file or directory relative to a directory fd. This is the io_uring equivalent of `unlinkat(2)`. Requires Linux 5.11+.
1323  ///
1324  /// Use `libc::AT_REMOVEDIR` flag to remove directories.
1325  pub async fn unlink_at(
1326    &self,
1327    dir_fd: RawFd,
1328    path: impl AsRef<Path>,
1329    flags: i32,
1330  ) -> io::Result<()> {
1331    let path = path_to_cstring(path.as_ref())?;
1332    let (tx, rx) = oneshot::channel();
1333    self.send(Message::UnlinkAt {
1334      req: UnlinkAtRequest {
1335        dir_fd,
1336        path,
1337        flags,
1338      },
1339      res: tx,
1340    });
1341    rx.await.expect("uring completion channel dropped")
1342  }
1343
1344  /// Create a directory. This is the io_uring equivalent of `mkdir(2)`. Requires Linux 5.15+.
1345  pub async fn mkdir(&self, path: impl AsRef<Path>, mode: u32) -> io::Result<()> {
1346    self.mkdir_at(libc::AT_FDCWD, path, mode).await
1347  }
1348
1349  /// Create a directory relative to a directory fd. This is the io_uring equivalent of `mkdirat(2)`. Requires Linux 5.15+.
1350  pub async fn mkdir_at(&self, dir_fd: RawFd, path: impl AsRef<Path>, mode: u32) -> io::Result<()> {
1351    let path = path_to_cstring(path.as_ref())?;
1352    let (tx, rx) = oneshot::channel();
1353    self.send(Message::MkdirAt {
1354      req: MkdirAtRequest { dir_fd, path, mode },
1355      res: tx,
1356    });
1357    rx.await.expect("uring completion channel dropped")
1358  }
1359
1360  /// Create a symbolic link. This is the io_uring equivalent of `symlink(2)`. Requires Linux 5.11+.
1361  pub async fn symlink(
1362    &self,
1363    target: impl AsRef<Path>,
1364    link_path: impl AsRef<Path>,
1365  ) -> io::Result<()> {
1366    self.symlink_at(target, libc::AT_FDCWD, link_path).await
1367  }
1368
1369  /// Create a symbolic link relative to a directory fd. This is the io_uring equivalent of `symlinkat(2)`. Requires Linux 5.11+.
1370  pub async fn symlink_at(
1371    &self,
1372    target: impl AsRef<Path>,
1373    new_dir_fd: RawFd,
1374    link_path: impl AsRef<Path>,
1375  ) -> io::Result<()> {
1376    let target = path_to_cstring(target.as_ref())?;
1377    let link_path = path_to_cstring(link_path.as_ref())?;
1378    let (tx, rx) = oneshot::channel();
1379    self.send(Message::SymlinkAt {
1380      req: SymlinkAtRequest {
1381        new_dir_fd,
1382        target,
1383        link_path,
1384      },
1385      res: tx,
1386    });
1387    rx.await.expect("uring completion channel dropped")
1388  }
1389
1390  /// Create a hard link. This is the io_uring equivalent of `link(2)`. Requires Linux 5.11+.
1391  pub async fn hard_link(
1392    &self,
1393    original: impl AsRef<Path>,
1394    link: impl AsRef<Path>,
1395  ) -> io::Result<()> {
1396    self
1397      .hard_link_at(libc::AT_FDCWD, original, libc::AT_FDCWD, link, 0)
1398      .await
1399  }
1400
1401  /// Create a hard link relative to directory fds. This is the io_uring equivalent of `linkat(2)`. Requires Linux 5.11+.
1402  pub async fn hard_link_at(
1403    &self,
1404    old_dir_fd: RawFd,
1405    original: impl AsRef<Path>,
1406    new_dir_fd: RawFd,
1407    link: impl AsRef<Path>,
1408    flags: i32,
1409  ) -> io::Result<()> {
1410    let old_path = path_to_cstring(original.as_ref())?;
1411    let new_path = path_to_cstring(link.as_ref())?;
1412    let (tx, rx) = oneshot::channel();
1413    self.send(Message::LinkAt {
1414      req: LinkAtRequest {
1415        old_dir_fd,
1416        old_path,
1417        new_dir_fd,
1418        new_path,
1419        flags,
1420      },
1421      res: tx,
1422    });
1423    rx.await.expect("uring completion channel dropped")
1424  }
1425}