uring_file/uring.rs
1//! Core io_uring wrapper providing async file operations.
2//!
3//! This module implements a thread-safe, async-compatible wrapper around Linux's io_uring interface. It uses a dedicated submission thread and completion thread to manage the ring, allowing multiple async tasks to share a single ring instance.
4//!
5//! # Architecture
6//!
7//! ```text
8//! ┌─────────────┐ ┌───────────────────┐ ┌────────────────┐
9//! │ Async Tasks │────▶│ Submission Thread │────▶│ io_uring │
10//! │ (callers) │ │ (batches SQEs) │ │ (kernel space) │
11//! └─────────────┘ └───────────────────┘ └───────┬────────┘
12//! ▲ │
13//! │ ┌───────────────────┐ │
14//! └────────────│ Completion Thread │◀────────────┘
15//! │ (polls CQEs) │
16//! └───────────────────┘
17//! ```
18//!
19//! # Thread Safety
20//!
21//! The `Uring` struct is `Clone + Send + Sync`. Cloning is cheap (just clones the internal channel sender). All operations are thread-safe.
22
23use crate::metadata::Metadata;
24use dashmap::DashMap;
25use io_uring::IoUring;
26use io_uring::cqueue::Entry as CEntry;
27use io_uring::opcode;
28use io_uring::squeue::Entry as SEntry;
29use io_uring::types;
30use std::cmp::min;
31use std::collections::VecDeque;
32use std::ffi::CString;
33use std::io;
34use std::mem::MaybeUninit;
35use std::os::fd::AsRawFd;
36use std::os::fd::FromRawFd;
37use std::os::fd::IntoRawFd;
38use std::os::fd::OwnedFd;
39use std::os::fd::RawFd;
40use std::os::unix::ffi::OsStrExt;
41use std::path::Path;
42use std::sync::Arc;
43use std::sync::atomic::AtomicU32;
44use std::sync::atomic::Ordering;
45use std::thread;
46use tokio::sync::oneshot;
47
48// ============================================================================
49// Constants
50// ============================================================================
51
/// Maximum length for a single io_uring read/write operation.
///
/// io_uring (like `read(2)`/`write(2)`) reports the transferred size as an i32,
/// and the kernel additionally caps one transfer at MAX_RW_COUNT, which is
/// INT_MAX rounded down to the page size — i.e. 2 GiB minus one 4 KiB page.
/// That is where the `- 4096` comes from.
pub const URING_LEN_MAX: u64 = 2 * 1024 * 1024 * 1024 - 4096;

/// Maximum number of files that can be registered with a single Uring instance.
/// This is the size of the sparse fixed-file table allocated in `Uring::new`.
const MAX_REGISTERED_FILES: u32 = 4096;
59
60// ============================================================================
61// Path Conversion
62// ============================================================================
63
/// Convert a path to a CString for use with io_uring operations.
///
/// Fails with `InvalidInput` if the path contains an interior null byte,
/// since a null would truncate the C string handed to the kernel.
fn path_to_cstring(path: &Path) -> io::Result<CString> {
    let bytes = path.as_os_str().as_bytes();
    match CString::new(bytes) {
        Ok(cstr) => Ok(cstr),
        Err(e) => Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            format!("path contains null byte at position {}", e.nul_position()),
        )),
    }
}
73
74// ============================================================================
75// Buffer Traits
76// ============================================================================
77
78// We define custom buffer traits rather than using `Vec<u8>` or `Box<[u8]>` directly because:
79//
80// 1. **Aligned allocations**: O_DIRECT requires sector-aligned buffers (typically 512 or 4096 bytes).
81// `Vec<u8>` only guarantees pointer alignment, not allocation alignment. Custom allocators
82// can provide properly aligned buffers that implement these traits.
83//
84// 2. **Buffer pools**: High-performance applications reuse buffers to avoid allocation overhead.
85// Pool-managed buffers can implement these traits directly without conversion.
86//
87// 3. **Specialized memory**: GPU memory, mmap'd regions, or other exotic buffer types can
88// participate in io_uring operations by implementing these traits.
89//
90// 4. **Zero-copy**: Accepting generic buffers avoids the need to copy data into/out of
91// a library-owned buffer type.
92//
93// The traits are `unsafe` because implementors must guarantee pointer stability across moves,
94// which is automatically true for heap allocations but NOT for stack arrays.
95
/// A buffer that can be used for io_uring write operations.
///
/// # Safety
///
/// Implementors must guarantee that:
/// - The pointer returned by `as_ptr()` remains valid and at a stable address until the I/O operation completes, even if `self` is moved.
/// - This is automatically satisfied for heap-allocated buffers (`Vec<u8>`, `Box<[u8]>`, etc.) but NOT for stack-allocated arrays.
pub unsafe trait IoBuf: Send + 'static {
    /// Returns a pointer to the buffer's data.
    fn as_ptr(&self) -> *const u8;
    /// Returns the number of initialized bytes in the buffer (the amount written by `write_at`).
    fn len(&self) -> usize;
    /// Returns true if the buffer is empty (i.e. `len() == 0`).
    fn is_empty(&self) -> bool {
        self.len() == 0
    }
}
113
/// A buffer that can be used for io_uring read operations.
///
/// # Safety
///
/// Implementors must guarantee that:
/// - The pointer returned by `as_mut_ptr()` remains valid and at a stable address until the I/O operation completes, even if `self` is moved.
/// - This is automatically satisfied for heap-allocated buffers (`Vec<u8>`, `Box<[u8]>`, etc.) but NOT for stack-allocated arrays.
pub unsafe trait IoBufMut: Send + 'static {
    /// Returns a mutable pointer to the buffer's data. The kernel writes read data through this pointer.
    fn as_mut_ptr(&mut self) -> *mut u8;
    /// Returns the buffer's total capacity (maximum bytes that can be read into it).
    fn capacity(&self) -> usize;
}
127
128// Implementations for Vec<u8>
129unsafe impl IoBuf for Vec<u8> {
130 fn as_ptr(&self) -> *const u8 {
131 Vec::as_ptr(self)
132 }
133
134 fn len(&self) -> usize {
135 Vec::len(self)
136 }
137}
138
139unsafe impl IoBufMut for Vec<u8> {
140 fn as_mut_ptr(&mut self) -> *mut u8 {
141 Vec::as_mut_ptr(self)
142 }
143
144 fn capacity(&self) -> usize {
145 Vec::capacity(self)
146 }
147}
148
149// Implementations for Box<[u8]>
150unsafe impl IoBuf for Box<[u8]> {
151 fn as_ptr(&self) -> *const u8 {
152 <[u8]>::as_ptr(self)
153 }
154
155 fn len(&self) -> usize {
156 <[u8]>::len(self)
157 }
158}
159
160unsafe impl IoBufMut for Box<[u8]> {
161 fn as_mut_ptr(&mut self) -> *mut u8 {
162 <[u8]>::as_mut_ptr(self)
163 }
164
165 fn capacity(&self) -> usize {
166 // Box<[u8]> has fixed size, capacity == len
167 <[u8]>::len(self)
168 }
169}
170
171// ============================================================================
172// File Target Types
173// ============================================================================
174
/// Internal representation of a file target - either a raw fd or a registered file index.
#[derive(Clone, Copy)]
#[doc(hidden)]
pub enum Target {
    /// Unregistered file: a plain file descriptor.
    Fd(RawFd),
    /// Registered file: index into the ring's fixed-file table, plus the raw
    /// fd kept as a fallback for opcodes that don't support fixed files
    /// (see `build_op_fd_only!`).
    Fixed { index: u32, raw_fd: RawFd },
}
182
/// Trait for types that can be used as file targets in io_uring operations.
///
/// This is implemented for all types that implement `AsRawFd` (using unregistered fds) and for `RegisteredFile` (using registered file indices for better performance).
pub trait UringTarget {
    #[doc(hidden)]
    // `uring_identity` lets RegisteredFile verify it is used with the ring
    // that created it; plain fds ignore it.
    fn as_target(&self, uring_identity: &Arc<()>) -> Target;
}
190
191impl<T: AsRawFd> UringTarget for T {
192 fn as_target(&self, _uring_identity: &Arc<()>) -> Target {
193 Target::Fd(self.as_raw_fd())
194 }
195}
196
/// A file registered with a specific `Uring` instance for optimized I/O.
///
/// Registered files avoid the overhead of fd lookup on each operation. Create one via [`Uring::register`].
///
/// # Performance
///
/// Using registered files can significantly reduce per-operation overhead, especially for high-frequency I/O patterns. The kernel maintains a pre-validated reference to the file, avoiding repeated fd table lookups.
///
/// # Kernel Requirements
///
/// Requires Linux 5.12+ for sparse file registration.
pub struct RegisteredFile {
    // Index into the ring's fixed-file table.
    index: u32,
    // Kept for opcodes that don't support fixed-file indices (e.g. Statx).
    raw_fd: RawFd,
    // Identity of the owning Uring; checked in `as_target` to catch misuse.
    // NOTE(review): dropping a RegisteredFile does not appear to unregister
    // its slot, and slots are never reused — confirm this is intended.
    uring_identity: Arc<()>,
}
213
214impl UringTarget for RegisteredFile {
215 fn as_target(&self, uring_identity: &Arc<()>) -> Target {
216 assert!(
217 Arc::ptr_eq(&self.uring_identity, uring_identity),
218 "RegisteredFile used with wrong Uring instance"
219 );
220 Target::Fixed {
221 index: self.index,
222 raw_fd: self.raw_fd,
223 }
224 }
225}
226
227// ============================================================================
228// Internal Request Types (just pointers, caller owns the buffer)
229// ============================================================================
230
/// In-flight read: a raw pointer/length pair into a caller-owned buffer.
struct ReadRequest {
    target: Target,
    // Destination the kernel writes into; owned by the caller's future.
    buf_ptr: *mut u8,
    buf_len: u32,
    offset: u64,
}

// SAFETY: The pointer is to heap-allocated memory owned by the caller's future, which awaits completion. The pointer is only dereferenced by the kernel, not by our threads.
unsafe impl Send for ReadRequest {}
unsafe impl Sync for ReadRequest {}

/// In-flight write: a raw pointer/length pair into a caller-owned buffer.
struct WriteRequest {
    target: Target,
    // Source the kernel reads from; owned by the caller's future.
    buf_ptr: *const u8,
    buf_len: u32,
    offset: u64,
}

// SAFETY: The pointer is to heap-allocated memory owned by the caller's future, which awaits completion. The pointer is only dereferenced by the kernel, not by our threads.
unsafe impl Send for WriteRequest {}
unsafe impl Sync for WriteRequest {}
252
/// fsync/fdatasync request.
struct SyncRequest {
    target: Target,
    // true => FDATASYNC semantics (see the Fsync opcode construction).
    datasync: bool,
}

/// statx-by-fd request.
struct StatxRequest {
    target: Target,
    /// Caller-allocated buffer for statx result. We use libc::statx for the actual storage, cast to types::statx* for the opcode.
    statx_buf: Box<MaybeUninit<libc::statx>>,
}

/// fallocate request.
struct FallocateRequest {
    target: Target,
    offset: u64,
    len: u64,
    // Raw fallocate(2) mode flags, passed through unchanged.
    mode: i32,
}

/// posix_fadvise request.
struct FadviseRequest {
    target: Target,
    offset: u64,
    len: i64,
    // Raw POSIX_FADV_* advice value, passed through unchanged.
    advice: i32,
}

/// ftruncate request.
struct FtruncateRequest {
    target: Target,
    len: u64,
}
282
/// openat request. Paths are owned `CString`s so the bytes stay valid until the kernel completes the operation.
struct OpenAtRequest {
    /// Directory fd for relative paths, or AT_FDCWD for current directory.
    dir_fd: RawFd,
    /// Path to open. Owned to ensure validity until completion.
    path: CString,
    flags: i32,
    mode: u32,
}

/// statx-by-path request.
struct StatxPathRequest {
    /// Directory fd for relative paths, or AT_FDCWD for current directory.
    dir_fd: RawFd,
    /// Path to stat. Owned to ensure validity until completion.
    path: CString,
    flags: i32,
    // Kernel-filled statx output; read back in `handle_completion`.
    statx_buf: Box<MaybeUninit<libc::statx>>,
}

/// close request.
struct CloseRequest {
    fd: RawFd,
}

/// renameat2 request.
struct RenameAtRequest {
    old_dir_fd: RawFd,
    old_path: CString,
    new_dir_fd: RawFd,
    new_path: CString,
    flags: u32,
}

/// unlinkat request.
struct UnlinkAtRequest {
    dir_fd: RawFd,
    path: CString,
    // e.g. AT_REMOVEDIR to remove a directory.
    flags: i32,
}

/// mkdirat request.
struct MkdirAtRequest {
    dir_fd: RawFd,
    path: CString,
    mode: u32,
}

/// symlinkat request.
struct SymlinkAtRequest {
    new_dir_fd: RawFd,
    // What the symlink points to.
    target: CString,
    // Where the symlink is created.
    link_path: CString,
}

/// linkat request.
struct LinkAtRequest {
    old_dir_fd: RawFd,
    old_path: CString,
    new_dir_fd: RawFd,
    new_path: CString,
    flags: i32,
}
338
339// ============================================================================
340// Response Types
341// ============================================================================
342
/// Result of a read operation: the buffer and actual bytes read.
pub struct ReadResult<B> {
    /// The buffer containing the data read.
    pub buf: B,
    /// Number of bytes actually read (may be less than buffer capacity at EOF). Limited to ~2GB per operation.
    pub bytes_read: u32,
}

/// Result of a write operation: the buffer and actual bytes written.
pub struct WriteResult<B> {
    /// The original buffer (returned for reuse).
    pub buf: B,
    /// Number of bytes actually written (may be less than buffer size for non-regular files). Limited to ~2GB per operation.
    pub bytes_written: u32,
}
358
359// ============================================================================
360// Request Enum
361// ============================================================================
362
/// One queued operation: the request payload plus the oneshot sender used to
/// deliver its result back to the awaiting task. Sent from async callers to
/// the submission thread, then parked in `pending` until a CQE arrives.
enum Message {
    Read {
        req: ReadRequest,
        res: oneshot::Sender<io::Result<u32>>,
    },
    Write {
        req: WriteRequest,
        res: oneshot::Sender<io::Result<u32>>,
    },
    Sync {
        req: SyncRequest,
        res: oneshot::Sender<io::Result<()>>,
    },
    Statx {
        req: StatxRequest,
        res: oneshot::Sender<io::Result<Metadata>>,
    },
    Fallocate {
        req: FallocateRequest,
        res: oneshot::Sender<io::Result<()>>,
    },
    Fadvise {
        req: FadviseRequest,
        res: oneshot::Sender<io::Result<()>>,
    },
    Ftruncate {
        req: FtruncateRequest,
        res: oneshot::Sender<io::Result<()>>,
    },
    OpenAt {
        req: OpenAtRequest,
        res: oneshot::Sender<io::Result<OwnedFd>>,
    },
    StatxPath {
        req: StatxPathRequest,
        res: oneshot::Sender<io::Result<Metadata>>,
    },
    Close {
        req: CloseRequest,
        res: oneshot::Sender<io::Result<()>>,
    },
    RenameAt {
        req: RenameAtRequest,
        res: oneshot::Sender<io::Result<()>>,
    },
    UnlinkAt {
        req: UnlinkAtRequest,
        res: oneshot::Sender<io::Result<()>>,
    },
    MkdirAt {
        req: MkdirAtRequest,
        res: oneshot::Sender<io::Result<()>>,
    },
    SymlinkAt {
        req: SymlinkAtRequest,
        res: oneshot::Sender<io::Result<()>>,
    },
    LinkAt {
        req: LinkAtRequest,
        res: oneshot::Sender<io::Result<()>>,
    },
}
425
426// ============================================================================
427// Uring Configuration
428// ============================================================================
429
/// Default ring size for io_uring (16384 entries).
///
/// This is a conservative default that works in most environments including containers
/// and memory-constrained systems. The kernel will further clamp this if needed via
/// `IORING_SETUP_CLAMP` (enabled unconditionally in `Uring::new`).
pub const DEFAULT_RING_SIZE: u32 = 16384;
436
/// Configuration options for io_uring initialization.
///
/// These are advanced options that affect io_uring behavior. Most users should use `UringCfg::default()`. Incorrect configuration may cause `EINVAL` errors or degraded performance.
///
/// # Kernel Requirements
///
/// Some options require specific kernel versions or capabilities:
/// - `coop_taskrun`: Linux 5.19+
/// - `defer_taskrun`: Linux 6.1+
/// - `sqpoll`: Requires `CAP_SYS_NICE` capability
/// - `iopoll`: Only works with O_DIRECT files on supported filesystems
#[derive(Clone, Debug)]
pub struct UringCfg {
    /// Size of the io_uring submission/completion queues (number of entries).
    ///
    /// Larger values allow more operations to be batched but consume more memory.
    /// The kernel will clamp this to the maximum supported size via `IORING_SETUP_CLAMP`.
    ///
    /// If you encounter `ENOMEM` errors during initialization, try reducing this value.
    /// Defaults to [`DEFAULT_RING_SIZE`] (16384 entries).
    pub ring_size: u32,

    /// Enable cooperative task running (Linux 5.19+). When enabled, the kernel will only process completions when the application explicitly asks for them, reducing overhead.
    pub coop_taskrun: bool,

    /// Enable deferred task running (Linux 6.1+). Similar to `coop_taskrun` but with additional deferral. Requires `coop_taskrun` to also be set.
    pub defer_taskrun: bool,

    /// Enable I/O polling mode. When enabled, the kernel will poll for completions instead of using interrupts. Only works with `O_DIRECT` files on supported filesystems. Can provide lower latency but uses more CPU.
    pub iopoll: bool,

    /// Enable submission queue polling with the given idle timeout in milliseconds. When enabled, a kernel thread will poll the submission queue, eliminating the need for system calls to submit I/O. The thread will go to sleep after being idle for the specified duration. **Requires `CAP_SYS_NICE` capability.**
    pub sqpoll: Option<u32>,
}
471
472impl Default for UringCfg {
473 fn default() -> Self {
474 Self {
475 ring_size: DEFAULT_RING_SIZE,
476 coop_taskrun: false,
477 defer_taskrun: false,
478 iopoll: false,
479 sqpoll: None,
480 }
481 }
482}
483
484// ============================================================================
485// Uring Core
486// ============================================================================
487
488/// Handle to a shared io_uring instance.
489///
490/// This is the main entry point for performing async I/O operations via io_uring. It is cheap to clone (just clones an `Arc` internally) and safe to share across threads and async tasks.
491///
492/// # Example
493///
494/// ```ignore
495/// use uring_file::uring::{Uring, UringCfg};
496/// use std::fs::File;
497///
498/// #[tokio::main]
499/// async fn main() -> std::io::Result<()> {
500/// let uring = Uring::new(UringCfg::default())?;
501/// let file = File::open("test.txt")?;
502///
503/// // Read with library-allocated buffer
504/// let result = uring.read_at(&file, 0, 1024).await?;
505///
/// // Read into a user-provided buffer (zero-copy for custom allocators)
/// let buf = vec![0u8; 1024];
/// let result = uring.read_into_at(&file, 0, buf).await?;
///
/// // Register file for reduced per-operation overhead
/// let registered = uring.register(&file)?;
/// let result = uring.read_at(&registered, 0, 1024).await?;
513///
514/// println!("Read {} bytes", result.bytes_read);
515/// Ok(())
516/// }
517/// ```
518///
519/// # Architecture Notes
520///
521/// Internally, `Uring` spawns two background threads:
522/// 1. **Submission thread**: Receives requests via a channel, batches them, and submits them to the kernel.
523/// 2. **Completion thread**: Polls the completion queue and dispatches results back to waiting async tasks.
524///
525/// This design allows for efficient batching of submissions while maintaining a simple async API.
526// Sources:
527// - Example: https://github1s.com/tokio-rs/io-uring/blob/HEAD/examples/tcp_echo.rs
528// - liburing docs: https://unixism.net/loti/ref-liburing/completion.html
529// - Quick high-level overview: https://man.archlinux.org/man/io_uring.7.en
530// - io_uring walkthrough: https://unixism.net/2020/04/io-uring-by-example-part-1-introduction/
531// - Multithreading:
532// - https://github.com/axboe/liburing/issues/109#issuecomment-1114213402
533// - https://github.com/axboe/liburing/issues/109#issuecomment-1166378978
534// - https://github.com/axboe/liburing/issues/109#issuecomment-614911522
535// - https://github.com/axboe/liburing/issues/125
536// - https://github.com/axboe/liburing/issues/127
537// - https://github.com/axboe/liburing/issues/129
538// - https://github.com/axboe/liburing/issues/571#issuecomment-1106480309
539// - Kernel poller: https://unixism.net/loti/tutorial/sq_poll.html
#[derive(Clone)]
pub struct Uring {
    // We don't use std::sync::mpsc::Sender as it is not Sync, so it's really complicated to use from any async function.
    // Cloning this sender is what makes `Uring::clone` cheap.
    sender: crossbeam_channel::Sender<Message>,
    // Shared with the submission and completion threads.
    ring: Arc<IoUring<SEntry, CEntry>>,
    // Next free slot in the fixed-file table; monotonically increasing, slots are not reused.
    next_file_slot: Arc<AtomicU32>,
    // Unique per-instance token; RegisteredFile holds a clone and `as_target`
    // compares by pointer to reject files registered on a different ring.
    identity: Arc<()>,
}
548
549/// Helper to build a submission entry for either Fd or Fixed target.
550macro_rules! build_op {
551 ($target:expr, | $fd:ident | $op:expr) => {
552 match $target {
553 Target::Fd(raw) => {
554 let $fd = types::Fd(raw);
555 $op
556 }
557 Target::Fixed { index, .. } => {
558 let $fd = types::Fixed(index);
559 $op
560 }
561 }
562 };
563}
564
/// Helper to build a submission entry that only supports Fd (not Fixed).
///
/// For opcodes that cannot take a fixed-file index (e.g. Statx), a registered
/// target falls back to the raw fd stored alongside its index.
macro_rules! build_op_fd_only {
    ($target:expr, | $fd:ident | $op:expr) => {
        match $target {
            Target::Fd(raw) => {
                let $fd = types::Fd(raw);
                $op
            }
            Target::Fixed { raw_fd, .. } => {
                let $fd = types::Fd(raw_fd);
                $op
            }
        }
    };
}
580
581/// Process a completion entry and dispatch the result.
582fn handle_completion(msg: Message, result: i32) {
583 let result: io::Result<i32> = if result < 0 {
584 Err(io::Error::from_raw_os_error(-result))
585 } else {
586 Ok(result)
587 };
588
589 match msg {
590 Message::Read { res, .. } => {
591 let _ = res.send(result.map(|n| n as u32));
592 }
593 Message::Write { res, .. } => {
594 let _ = res.send(result.map(|n| n as u32));
595 }
596 Message::Sync { res, .. } => {
597 let _ = res.send(result.map(|_| ()));
598 }
599 Message::Statx { req, res } => {
600 let outcome = result.map(|_| {
601 // SAFETY: The kernel has initialized the statx buffer
602 let statx = unsafe { (*req.statx_buf).assume_init() };
603 Metadata(statx)
604 });
605 let _ = res.send(outcome);
606 }
607 Message::Fallocate { res, .. } => {
608 let _ = res.send(result.map(|_| ()));
609 }
610 Message::Fadvise { res, .. } => {
611 let _ = res.send(result.map(|_| ()));
612 }
613 Message::Ftruncate { res, .. } => {
614 let _ = res.send(result.map(|_| ()));
615 }
616 Message::OpenAt { res, .. } => {
617 let outcome = result.map(|fd| {
618 // SAFETY: The kernel returns a valid fd on success
619 unsafe { OwnedFd::from_raw_fd(fd) }
620 });
621 let _ = res.send(outcome);
622 }
623 Message::StatxPath { req, res } => {
624 let outcome = result.map(|_| {
625 // SAFETY: The kernel has initialized the statx buffer
626 let statx = unsafe { (*req.statx_buf).assume_init() };
627 Metadata(statx)
628 });
629 let _ = res.send(outcome);
630 }
631 Message::Close { res, .. } => {
632 let _ = res.send(result.map(|_| ()));
633 }
634 Message::RenameAt { res, .. } => {
635 let _ = res.send(result.map(|_| ()));
636 }
637 Message::UnlinkAt { res, .. } => {
638 let _ = res.send(result.map(|_| ()));
639 }
640 Message::MkdirAt { res, .. } => {
641 let _ = res.send(result.map(|_| ()));
642 }
643 Message::SymlinkAt { res, .. } => {
644 let _ = res.send(result.map(|_| ()));
645 }
646 Message::LinkAt { res, .. } => {
647 let _ = res.send(result.map(|_| ()));
648 }
649 }
650}
651
652impl Uring {
653 /// Create a new io_uring instance with the given configuration.
654 ///
655 /// This spawns two background threads for submission and completion handling. The threads will automatically stop when all `Uring` handles are dropped.
656 ///
657 /// # Errors
658 ///
659 /// Returns an error if the io_uring cannot be created (e.g., kernel too old, resource limits exceeded, or insufficient permissions).
660 pub fn new(cfg: UringCfg) -> io::Result<Self> {
661 let (sender, receiver) = crossbeam_channel::unbounded::<Message>();
662 let pending: Arc<DashMap<u64, Message>> = Default::default();
663
664 let ring = {
665 let mut builder = IoUring::<SEntry, CEntry>::builder();
666 // Use IORING_SETUP_CLAMP to let the kernel reduce the ring size if our requested size exceeds system limits. This is safer than failing outright.
667 builder.setup_clamp();
668 if cfg.coop_taskrun {
669 builder.setup_coop_taskrun();
670 };
671 if cfg.defer_taskrun {
672 builder.setup_defer_taskrun();
673 };
674 if cfg.iopoll {
675 builder.setup_iopoll();
676 }
677 if let Some(sqpoll) = cfg.sqpoll {
678 builder.setup_sqpoll(sqpoll);
679 };
680 builder.build(cfg.ring_size)?
681 };
682
683 // Pre-allocate sparse file table for registration (Linux 5.12+). If this fails, file registration won't work but unregistered fds will still function.
684 let _ = ring.submitter().register_files_sparse(MAX_REGISTERED_FILES);
685
686 let ring = Arc::new(ring);
687
688 // Submission thread.
689 thread::spawn({
690 let pending = pending.clone();
691 let ring = ring.clone();
692 // This is outside the loop to avoid reallocation each time.
693 let mut msgbuf = VecDeque::new();
694 move || {
695 // SAFETY: We ensure that the submission queue is only accessed from this single thread. The completion queue is accessed from a separate thread.
696 let mut submission = unsafe { ring.submission_shared() };
697 let mut next_id = 0u64;
698
699 // If this loop exits, it means we've dropped all `Uring` handles and can safely stop.
700 while let Ok(init_msg) = receiver.recv() {
701 // Process multiple messages at once to avoid too many io_uring submits.
702 msgbuf.push_back(init_msg);
703 while let Ok(msg) = receiver.try_recv() {
704 msgbuf.push_back(msg);
705 }
706
707 // How the io_uring submission queue works:
708 // - The buffer is shared between the kernel and userspace.
709 // - There are atomic head and tail indices that allow them to be shared mutably between kernel and userspace safely.
710 // - The Rust library we're using abstracts over this by caching the head and tail as local values. Once we've made our inserts, we update the atomic tail and then tell the kernel to consume some of the queue. When we update the atomic tail, we also check the atomic head and update our local cached value; some entries may have been consumed by the kernel in some other thread since we last checked and we may actually have more free space than we thought.
711 while let Some(msg) = msgbuf.pop_front() {
712 let id = next_id;
713 next_id = next_id.wrapping_add(1);
714
715 let submission_entry = match &msg {
716 Message::Read { req, .. } => {
717 build_op!(req.target, |fd| opcode::Read::new(
718 fd,
719 req.buf_ptr,
720 req.buf_len
721 )
722 .offset(req.offset)
723 .build()
724 .user_data(id))
725 }
726 Message::Write { req, .. } => {
727 build_op!(req.target, |fd| opcode::Write::new(
728 fd,
729 req.buf_ptr,
730 req.buf_len
731 )
732 .offset(req.offset)
733 .build()
734 .user_data(id))
735 }
736 Message::Sync { req, .. } => {
737 build_op!(req.target, |fd| {
738 let mut fsync = opcode::Fsync::new(fd);
739 if req.datasync {
740 fsync = fsync.flags(types::FsyncFlags::DATASYNC);
741 }
742 fsync.build().user_data(id)
743 })
744 }
745 Message::Statx { req, .. } => {
746 const STATX_BASIC_STATS: u32 = 0x000007ff; // Request all basic stat fields
747 const AT_EMPTY_PATH: i32 = 0x1000; // Interpret fd as the file itself, not a directory
748 static EMPTY_PATH: &std::ffi::CStr = c""; // Empty path since we use AT_EMPTY_PATH
749
750 // Cast libc::statx* to types::statx* - the opcode uses an opaque type but the kernel writes the actual statx struct
751 let statx_ptr = req.statx_buf.as_ptr() as *mut types::statx;
752
753 // Note: Statx doesn't support Fixed in the io-uring crate, so we fall back to raw fd
754 build_op_fd_only!(req.target, |fd| opcode::Statx::new(
755 fd,
756 EMPTY_PATH.as_ptr(),
757 statx_ptr
758 )
759 .flags(AT_EMPTY_PATH)
760 .mask(STATX_BASIC_STATS)
761 .build()
762 .user_data(id))
763 }
764 Message::Fallocate { req, .. } => {
765 build_op!(req.target, |fd| opcode::Fallocate::new(fd, req.len)
766 .offset(req.offset)
767 .mode(req.mode)
768 .build()
769 .user_data(id))
770 }
771 Message::Fadvise { req, .. } => {
772 build_op!(req.target, |fd| opcode::Fadvise::new(
773 fd, req.len, req.advice
774 )
775 .offset(req.offset)
776 .build()
777 .user_data(id))
778 }
779 Message::Ftruncate { req, .. } => {
780 build_op!(req.target, |fd| opcode::Ftruncate::new(fd, req.len)
781 .build()
782 .user_data(id))
783 }
784 Message::OpenAt { req, .. } => {
785 opcode::OpenAt::new(types::Fd(req.dir_fd), req.path.as_ptr())
786 .flags(req.flags)
787 .mode(req.mode)
788 .build()
789 .user_data(id)
790 }
791 Message::StatxPath { req, .. } => {
792 const STATX_BASIC_STATS: u32 = 0x000007ff;
793 let statx_ptr = req.statx_buf.as_ptr() as *mut types::statx;
794
795 opcode::Statx::new(types::Fd(req.dir_fd), req.path.as_ptr(), statx_ptr)
796 .flags(req.flags)
797 .mask(STATX_BASIC_STATS)
798 .build()
799 .user_data(id)
800 }
801 Message::Close { req, .. } => {
802 opcode::Close::new(types::Fd(req.fd)).build().user_data(id)
803 }
804 Message::RenameAt { req, .. } => opcode::RenameAt::new(
805 types::Fd(req.old_dir_fd),
806 req.old_path.as_ptr(),
807 types::Fd(req.new_dir_fd),
808 req.new_path.as_ptr(),
809 )
810 .flags(req.flags)
811 .build()
812 .user_data(id),
813 Message::UnlinkAt { req, .. } => {
814 opcode::UnlinkAt::new(types::Fd(req.dir_fd), req.path.as_ptr())
815 .flags(req.flags)
816 .build()
817 .user_data(id)
818 }
819 Message::MkdirAt { req, .. } => {
820 opcode::MkDirAt::new(types::Fd(req.dir_fd), req.path.as_ptr())
821 .mode(req.mode)
822 .build()
823 .user_data(id)
824 }
825 Message::SymlinkAt { req, .. } => opcode::SymlinkAt::new(
826 types::Fd(req.new_dir_fd),
827 req.target.as_ptr(),
828 req.link_path.as_ptr(),
829 )
830 .build()
831 .user_data(id),
832 Message::LinkAt { req, .. } => opcode::LinkAt::new(
833 types::Fd(req.old_dir_fd),
834 req.old_path.as_ptr(),
835 types::Fd(req.new_dir_fd),
836 req.new_path.as_ptr(),
837 )
838 .flags(req.flags)
839 .build()
840 .user_data(id),
841 };
842
843 // Insert before submitting so the completion handler can find it.
844 pending.insert(id, msg);
845
846 if submission.is_full() {
847 submission.sync();
848 ring.submit_and_wait(1).unwrap();
849 }
850
851 // SAFETY: The submission entry references memory owned by the caller's future, which is awaiting completion.
852 unsafe {
853 submission.push(&submission_entry).unwrap();
854 };
855 }
856
857 submission.sync();
858 // This is still necessary even with sqpoll, as our kernel thread may have gone to sleep.
859 ring.submit().unwrap();
860 }
861 }
862 });
863
864 // Completion thread.
865 thread::spawn({
866 let pending = pending.clone();
867 let ring = ring.clone();
868 move || {
869 // SAFETY: We ensure that the completion queue is only accessed from this single thread. The submission queue is accessed from a separate thread.
870 let mut completion = unsafe { ring.completion_shared() };
871
872 // TODO: Stop this loop if all `Uring` handles have been dropped and there are no pending requests. Currently this thread runs forever.
873 loop {
874 let Some(e) = completion.next() else {
875 // No completions available, wait for one.
876 ring.submit_and_wait(1).unwrap();
877 completion.sync();
878 continue;
879 };
880
881 let id = e.user_data();
882 let (_, req) = pending
883 .remove(&id)
884 .expect("completion for unknown request id");
885 handle_completion(req, e.result());
886 }
887 }
888 });
889
890 Ok(Self {
891 sender,
892 ring,
893 next_file_slot: Arc::new(AtomicU32::new(0)),
894 identity: Arc::new(()),
895 })
896 }
897
898 /// Register a file for optimized I/O operations.
899 ///
900 /// Registered files use kernel-side file references, avoiding fd table lookups on each operation. This can significantly improve performance for high-frequency I/O.
901 ///
902 /// # Errors
903 ///
904 /// Returns an error if the maximum number of registered files has been reached, or if file registration fails (e.g., kernel too old).
905 ///
906 /// # Kernel Requirements
907 ///
908 /// Requires Linux 5.12+ for sparse file registration.
909 pub fn register(&self, file: &impl AsRawFd) -> io::Result<RegisteredFile> {
910 let raw_fd = file.as_raw_fd();
911 let slot = self.next_file_slot.fetch_add(1, Ordering::SeqCst);
912 if slot >= MAX_REGISTERED_FILES {
913 return Err(io::Error::new(
914 io::ErrorKind::Other,
915 "maximum registered files exceeded",
916 ));
917 }
918
919 self
920 .ring
921 .submitter()
922 .register_files_update(slot, &[raw_fd])?;
923
924 Ok(RegisteredFile {
925 index: slot,
926 raw_fd,
927 uring_identity: self.identity.clone(),
928 })
929 }
930
    /// Send a message to the submission thread.
    ///
    /// The `expect` is justified: the submission thread only exits once every
    /// sender has been dropped, which cannot happen while `self` holds one.
    fn send(&self, msg: Message) {
        self.sender.send(msg).expect("uring submission thread dead");
    }
935
936 /// Read into a user-provided buffer. This is the primitive read operation that accepts any buffer type implementing [`IoBufMut`].
937 ///
938 /// The buffer is returned along with the number of bytes read. This allows buffer reuse and supports custom allocators (e.g., aligned buffers for O_DIRECT).
939 pub async fn read_into_at<B: IoBufMut>(
940 &self,
941 file: &impl UringTarget,
942 offset: impl TryInto<u64>,
943 mut buf: B,
944 ) -> io::Result<ReadResult<B>> {
945 let offset: u64 = offset
946 .try_into()
947 .map_err(|_| io::Error::other("offset exceeds u64::MAX"))?;
948 let target = file.as_target(&self.identity);
949 let ptr = buf.as_mut_ptr();
950 let cap = buf.capacity();
951 let (tx, rx) = oneshot::channel();
952 self.send(Message::Read {
953 req: ReadRequest {
954 target,
955 buf_ptr: ptr,
956 buf_len: cap.try_into().unwrap(),
957 offset,
958 },
959 res: tx,
960 });
961 let bytes_read = rx.await.expect("uring completion channel dropped")?;
962 Ok(ReadResult { buf, bytes_read })
963 }
964
965 /// Read from a file at the specified offset, allocating a buffer internally.
966 ///
967 /// This is a convenience wrapper around [`read_into_at`](Self::read_into_at) that allocates a `Vec<u8>`. For zero-copy or custom allocators, use `read_into_at` directly.
968 pub async fn read_at(
969 &self,
970 file: &impl UringTarget,
971 offset: impl TryInto<u64>,
972 len: impl TryInto<u32>,
973 ) -> io::Result<ReadResult<Vec<u8>>> {
974 let len: u32 = len
975 .try_into()
976 .map_err(|_| io::Error::other("len exceeds u32::MAX"))?;
977 let buf = vec![0u8; len as usize];
978 self.read_into_at(file, offset, buf).await
979 }
980
981 /// Write a buffer to a file at the specified offset. Accepts any buffer type implementing [`IoBuf`].
982 ///
983 /// The buffer is returned along with the number of bytes written. This allows buffer reuse and supports custom allocators.
984 pub async fn write_at<B: IoBuf>(
985 &self,
986 file: &impl UringTarget,
987 offset: impl TryInto<u64>,
988 buf: B,
989 ) -> io::Result<WriteResult<B>> {
990 let offset: u64 = offset
991 .try_into()
992 .map_err(|_| io::Error::other("offset exceeds u64::MAX"))?;
993 let target = file.as_target(&self.identity);
994 let ptr = buf.as_ptr();
995 let len = buf.len();
996 let (tx, rx) = oneshot::channel();
997 self.send(Message::Write {
998 req: WriteRequest {
999 target,
1000 buf_ptr: ptr,
1001 buf_len: len.try_into().unwrap(),
1002 offset,
1003 },
1004 res: tx,
1005 });
1006 let bytes_written = rx.await.expect("uring completion channel dropped")?;
1007 Ok(WriteResult { buf, bytes_written })
1008 }
1009
1010 /// Read exactly `len` bytes from a file at the specified offset.
1011 ///
1012 /// Returns an error if fewer bytes are available (e.g., at EOF). This is useful when you know the exact size of data to read and want to fail rather than handle partial reads.
1013 pub async fn read_exact_at(
1014 &self,
1015 file: &impl UringTarget,
1016 offset: impl TryInto<u64>,
1017 len: impl TryInto<u32>,
1018 ) -> io::Result<Vec<u8>> {
1019 let len: u32 = len
1020 .try_into()
1021 .map_err(|_| io::Error::other("len exceeds u32::MAX"))?;
1022 let result = self.read_at(file, offset, len).await?;
1023 if result.bytes_read < len {
1024 return Err(io::Error::new(
1025 io::ErrorKind::UnexpectedEof,
1026 format!("expected {} bytes, got {}", len, result.bytes_read),
1027 ));
1028 }
1029 Ok(result.buf)
1030 }
1031
1032 /// Write all bytes to a file at the specified offset.
1033 ///
1034 /// Loops on short writes until all data is written. Returns an error if a write returns 0 bytes (indicating the write cannot proceed).
1035 pub async fn write_all_at(
1036 &self,
1037 file: &impl UringTarget,
1038 offset: impl TryInto<u64>,
1039 data: &[u8],
1040 ) -> io::Result<()> {
1041 let mut offset: u64 = offset
1042 .try_into()
1043 .map_err(|_| io::Error::other("offset exceeds u64::MAX"))?;
1044 let mut written = 0usize;
1045 while written < data.len() {
1046 let remaining = data.len() - written;
1047 let chunk_size = min(remaining, URING_LEN_MAX as usize);
1048 let chunk = data[written..written + chunk_size].to_vec();
1049 let result = self.write_at(file, offset, chunk).await?;
1050 if result.bytes_written == 0 {
1051 return Err(io::Error::new(
1052 io::ErrorKind::WriteZero,
1053 "failed to write any bytes",
1054 ));
1055 }
1056 written += result.bytes_written as usize;
1057 offset += result.bytes_written as u64;
1058 }
1059 Ok(())
1060 }
1061
1062 /// Synchronize file data and metadata to disk (fsync). This ensures that all data and metadata modifications are flushed to the underlying storage device. Even when using direct I/O, this is necessary to ensure the device itself has flushed any internal caches.
1063 ///
1064 /// **Note on ordering**: io_uring does not guarantee ordering between operations. If you need to ensure writes complete before fsync, you should await the write first, then call fsync.
1065 pub async fn sync(&self, file: &impl UringTarget) -> io::Result<()> {
1066 let target = file.as_target(&self.identity);
1067 let (tx, rx) = oneshot::channel();
1068 self.send(Message::Sync {
1069 req: SyncRequest {
1070 target,
1071 datasync: false,
1072 },
1073 res: tx,
1074 });
1075 rx.await.expect("uring completion channel dropped")
1076 }
1077
1078 /// Synchronize file data to disk (fdatasync). Like [`sync`](Self::sync), but only flushes data, not metadata (unless the metadata is required to retrieve the data). This can be faster than a full fsync.
1079 pub async fn datasync(&self, file: &impl UringTarget) -> io::Result<()> {
1080 let target = file.as_target(&self.identity);
1081 let (tx, rx) = oneshot::channel();
1082 self.send(Message::Sync {
1083 req: SyncRequest {
1084 target,
1085 datasync: true,
1086 },
1087 res: tx,
1088 });
1089 rx.await.expect("uring completion channel dropped")
1090 }
1091
1092 /// Get file status information (statx). This is the io_uring equivalent of `fstat`/`statx`. It returns metadata about the file including size, permissions, timestamps, etc. Requires Linux 5.6+. On older kernels, this will fail with `EINVAL`.
1093 pub async fn statx(&self, file: &impl UringTarget) -> io::Result<Metadata> {
1094 let target = file.as_target(&self.identity);
1095 let statx_buf = Box::new(MaybeUninit::<libc::statx>::uninit());
1096 let (tx, rx) = oneshot::channel();
1097 self.send(Message::Statx {
1098 req: StatxRequest { target, statx_buf },
1099 res: tx,
1100 });
1101 rx.await.expect("uring completion channel dropped")
1102 }
1103
1104 /// Pre-allocate or deallocate space for a file (fallocate). This can be used to pre-allocate space to avoid fragmentation, punch holes in sparse files, or zero-fill regions. Use `libc::FALLOC_FL_*` constants for mode flags. Requires Linux 5.6+.
1105 pub async fn fallocate(
1106 &self,
1107 file: &impl UringTarget,
1108 offset: impl TryInto<u64>,
1109 len: impl TryInto<u64>,
1110 mode: i32,
1111 ) -> io::Result<()> {
1112 let offset: u64 = offset
1113 .try_into()
1114 .map_err(|_| io::Error::other("offset exceeds u64::MAX"))?;
1115 let len: u64 = len
1116 .try_into()
1117 .map_err(|_| io::Error::other("len exceeds u64::MAX"))?;
1118 let target = file.as_target(&self.identity);
1119 let (tx, rx) = oneshot::channel();
1120 self.send(Message::Fallocate {
1121 req: FallocateRequest {
1122 target,
1123 offset,
1124 len,
1125 mode,
1126 },
1127 res: tx,
1128 });
1129 rx.await.expect("uring completion channel dropped")
1130 }
1131
1132 /// Advise the kernel about expected file access patterns (fadvise). This is a hint to the kernel about how you intend to access a file region. The kernel may use this to optimize readahead, caching, etc. Use `libc::POSIX_FADV_*` constants for advice values. Requires Linux 5.6+.
1133 pub async fn fadvise(
1134 &self,
1135 file: &impl UringTarget,
1136 offset: impl TryInto<u64>,
1137 len: impl TryInto<u64>,
1138 advice: i32,
1139 ) -> io::Result<()> {
1140 let offset: u64 = offset
1141 .try_into()
1142 .map_err(|_| io::Error::other("offset exceeds u64::MAX"))?;
1143 let len: u64 = len
1144 .try_into()
1145 .map_err(|_| io::Error::other("len exceeds u64::MAX"))?;
1146 let len: i64 = len
1147 .try_into()
1148 .map_err(|_| io::Error::other("len exceeds i64::MAX"))?;
1149 let target = file.as_target(&self.identity);
1150 let (tx, rx) = oneshot::channel();
1151 self.send(Message::Fadvise {
1152 req: FadviseRequest {
1153 target,
1154 offset,
1155 len,
1156 advice,
1157 },
1158 res: tx,
1159 });
1160 rx.await.expect("uring completion channel dropped")
1161 }
1162
1163 /// Truncate a file to a specified length (ftruncate). If the file is larger than the specified length, the extra data is lost. If the file is smaller, it is extended and the extended part reads as zeros. Requires Linux 6.9+. On older kernels, this will fail with `EINVAL`.
1164 pub async fn ftruncate(&self, file: &impl UringTarget, len: impl TryInto<u64>) -> io::Result<()> {
1165 let len: u64 = len
1166 .try_into()
1167 .map_err(|_| io::Error::other("len exceeds u64::MAX"))?;
1168 let target = file.as_target(&self.identity);
1169 let (tx, rx) = oneshot::channel();
1170 self.send(Message::Ftruncate {
1171 req: FtruncateRequest { target, len },
1172 res: tx,
1173 });
1174 rx.await.expect("uring completion channel dropped")
1175 }
1176
1177 /// Open a file asynchronously. This is the io_uring equivalent of `open(2)`/`openat(2)`. Requires Linux 5.6+.
1178 ///
1179 /// # Arguments
1180 ///
1181 /// * `path` - Path to open (any `CStr`-like type: `&CStr`, `CString`, `c"literal"`).
1182 /// * `flags` - Open flags from `libc` (e.g., `libc::O_RDONLY`, `libc::O_RDWR | libc::O_CREAT`).
1183 /// * `mode` - File mode for creation (only used with `O_CREAT`).
1184 ///
1185 /// # Returns
1186 ///
1187 /// Returns an `OwnedFd` on success. The fd is automatically closed when dropped.
1188 pub async fn open(&self, path: impl AsRef<Path>, flags: i32, mode: u32) -> io::Result<OwnedFd> {
1189 self.open_at(libc::AT_FDCWD, path, flags, mode).await
1190 }
1191
1192 /// Open a file relative to a directory fd. This is useful for safe path traversal and avoiding TOCTOU races. Requires Linux 5.6+.
1193 ///
1194 /// # Arguments
1195 ///
1196 /// * `dir_fd` - Directory fd for relative paths, or `libc::AT_FDCWD` for current directory.
1197 /// * `path` - Path to open relative to `dir_fd`.
1198 /// * `flags` - Open flags.
1199 /// * `mode` - File mode for creation.
1200 pub async fn open_at(
1201 &self,
1202 dir_fd: RawFd,
1203 path: impl AsRef<Path>,
1204 flags: i32,
1205 mode: u32,
1206 ) -> io::Result<OwnedFd> {
1207 let path = path_to_cstring(path.as_ref())?;
1208 let (tx, rx) = oneshot::channel();
1209 self.send(Message::OpenAt {
1210 req: OpenAtRequest {
1211 dir_fd,
1212 path,
1213 flags,
1214 mode,
1215 },
1216 res: tx,
1217 });
1218 rx.await.expect("uring completion channel dropped")
1219 }
1220
1221 /// Get file metadata by path without opening the file. This is the io_uring equivalent of `stat(2)`/`statx(2)`. Requires Linux 5.6+.
1222 ///
1223 /// Unlike `statx()` which operates on an open fd, this method stats the path directly.
1224 pub async fn statx_path(&self, path: impl AsRef<Path>) -> io::Result<Metadata> {
1225 self.statx_at(libc::AT_FDCWD, path, 0).await
1226 }
1227
1228 /// Get file metadata relative to a directory fd. This allows safe path traversal and additional flags for controlling symlink behavior. Requires Linux 5.6+.
1229 ///
1230 /// # Arguments
1231 ///
1232 /// * `dir_fd` - Directory fd for relative paths, or `libc::AT_FDCWD` for current directory.
1233 /// * `path` - Path to stat relative to `dir_fd`.
1234 /// * `flags` - Flags from `libc` (e.g., `libc::AT_SYMLINK_NOFOLLOW` to not follow symlinks).
1235 pub async fn statx_at(
1236 &self,
1237 dir_fd: RawFd,
1238 path: impl AsRef<Path>,
1239 flags: i32,
1240 ) -> io::Result<Metadata> {
1241 let path = path_to_cstring(path.as_ref())?;
1242 let statx_buf = Box::new(MaybeUninit::<libc::statx>::uninit());
1243 let (tx, rx) = oneshot::channel();
1244 self.send(Message::StatxPath {
1245 req: StatxPathRequest {
1246 dir_fd,
1247 path,
1248 flags,
1249 statx_buf,
1250 },
1251 res: tx,
1252 });
1253 rx.await.expect("uring completion channel dropped")
1254 }
1255
1256 /// Close a file descriptor asynchronously. This is the io_uring equivalent of `close(2)`. Requires Linux 5.6+.
1257 ///
1258 /// Takes ownership of the fd to prevent the automatic synchronous close on drop. This is useful when you want to:
1259 /// - Handle close errors (which are silently ignored by `OwnedFd::drop`)
1260 /// - Batch close operations with other io_uring operations
1261 /// - Avoid blocking the async runtime on close
1262 pub async fn close(&self, fd: impl IntoRawFd) -> io::Result<()> {
1263 let raw_fd = fd.into_raw_fd();
1264 let (tx, rx) = oneshot::channel();
1265 self.send(Message::Close {
1266 req: CloseRequest { fd: raw_fd },
1267 res: tx,
1268 });
1269 rx.await.expect("uring completion channel dropped")
1270 }
1271
1272 /// Rename a file asynchronously. This is the io_uring equivalent of `rename(2)`. Requires Linux 5.11+.
1273 pub async fn rename(
1274 &self,
1275 old_path: impl AsRef<Path>,
1276 new_path: impl AsRef<Path>,
1277 ) -> io::Result<()> {
1278 self
1279 .rename_at(libc::AT_FDCWD, old_path, libc::AT_FDCWD, new_path, 0)
1280 .await
1281 }
1282
1283 /// Rename a file relative to directory fds. This is the io_uring equivalent of `renameat2(2)`. Requires Linux 5.11+.
1284 ///
1285 /// Flags can include `libc::RENAME_NOREPLACE`, `libc::RENAME_EXCHANGE`, etc.
1286 pub async fn rename_at(
1287 &self,
1288 old_dir_fd: RawFd,
1289 old_path: impl AsRef<Path>,
1290 new_dir_fd: RawFd,
1291 new_path: impl AsRef<Path>,
1292 flags: u32,
1293 ) -> io::Result<()> {
1294 let old_path = path_to_cstring(old_path.as_ref())?;
1295 let new_path = path_to_cstring(new_path.as_ref())?;
1296 let (tx, rx) = oneshot::channel();
1297 self.send(Message::RenameAt {
1298 req: RenameAtRequest {
1299 old_dir_fd,
1300 old_path,
1301 new_dir_fd,
1302 new_path,
1303 flags,
1304 },
1305 res: tx,
1306 });
1307 rx.await.expect("uring completion channel dropped")
1308 }
1309
1310 /// Delete a file or empty directory. This is the io_uring equivalent of `unlink(2)`. Requires Linux 5.11+.
1311 pub async fn unlink(&self, path: impl AsRef<Path>) -> io::Result<()> {
1312 self.unlink_at(libc::AT_FDCWD, path, 0).await
1313 }
1314
1315 /// Delete a directory. This is the io_uring equivalent of `rmdir(2)`. Requires Linux 5.11+.
1316 pub async fn rmdir(&self, path: impl AsRef<Path>) -> io::Result<()> {
1317 self
1318 .unlink_at(libc::AT_FDCWD, path, libc::AT_REMOVEDIR)
1319 .await
1320 }
1321
1322 /// Delete a file or directory relative to a directory fd. This is the io_uring equivalent of `unlinkat(2)`. Requires Linux 5.11+.
1323 ///
1324 /// Use `libc::AT_REMOVEDIR` flag to remove directories.
1325 pub async fn unlink_at(
1326 &self,
1327 dir_fd: RawFd,
1328 path: impl AsRef<Path>,
1329 flags: i32,
1330 ) -> io::Result<()> {
1331 let path = path_to_cstring(path.as_ref())?;
1332 let (tx, rx) = oneshot::channel();
1333 self.send(Message::UnlinkAt {
1334 req: UnlinkAtRequest {
1335 dir_fd,
1336 path,
1337 flags,
1338 },
1339 res: tx,
1340 });
1341 rx.await.expect("uring completion channel dropped")
1342 }
1343
1344 /// Create a directory. This is the io_uring equivalent of `mkdir(2)`. Requires Linux 5.15+.
1345 pub async fn mkdir(&self, path: impl AsRef<Path>, mode: u32) -> io::Result<()> {
1346 self.mkdir_at(libc::AT_FDCWD, path, mode).await
1347 }
1348
1349 /// Create a directory relative to a directory fd. This is the io_uring equivalent of `mkdirat(2)`. Requires Linux 5.15+.
1350 pub async fn mkdir_at(&self, dir_fd: RawFd, path: impl AsRef<Path>, mode: u32) -> io::Result<()> {
1351 let path = path_to_cstring(path.as_ref())?;
1352 let (tx, rx) = oneshot::channel();
1353 self.send(Message::MkdirAt {
1354 req: MkdirAtRequest { dir_fd, path, mode },
1355 res: tx,
1356 });
1357 rx.await.expect("uring completion channel dropped")
1358 }
1359
1360 /// Create a symbolic link. This is the io_uring equivalent of `symlink(2)`. Requires Linux 5.11+.
1361 pub async fn symlink(
1362 &self,
1363 target: impl AsRef<Path>,
1364 link_path: impl AsRef<Path>,
1365 ) -> io::Result<()> {
1366 self.symlink_at(target, libc::AT_FDCWD, link_path).await
1367 }
1368
1369 /// Create a symbolic link relative to a directory fd. This is the io_uring equivalent of `symlinkat(2)`. Requires Linux 5.11+.
1370 pub async fn symlink_at(
1371 &self,
1372 target: impl AsRef<Path>,
1373 new_dir_fd: RawFd,
1374 link_path: impl AsRef<Path>,
1375 ) -> io::Result<()> {
1376 let target = path_to_cstring(target.as_ref())?;
1377 let link_path = path_to_cstring(link_path.as_ref())?;
1378 let (tx, rx) = oneshot::channel();
1379 self.send(Message::SymlinkAt {
1380 req: SymlinkAtRequest {
1381 new_dir_fd,
1382 target,
1383 link_path,
1384 },
1385 res: tx,
1386 });
1387 rx.await.expect("uring completion channel dropped")
1388 }
1389
1390 /// Create a hard link. This is the io_uring equivalent of `link(2)`. Requires Linux 5.11+.
1391 pub async fn hard_link(
1392 &self,
1393 original: impl AsRef<Path>,
1394 link: impl AsRef<Path>,
1395 ) -> io::Result<()> {
1396 self
1397 .hard_link_at(libc::AT_FDCWD, original, libc::AT_FDCWD, link, 0)
1398 .await
1399 }
1400
1401 /// Create a hard link relative to directory fds. This is the io_uring equivalent of `linkat(2)`. Requires Linux 5.11+.
1402 pub async fn hard_link_at(
1403 &self,
1404 old_dir_fd: RawFd,
1405 original: impl AsRef<Path>,
1406 new_dir_fd: RawFd,
1407 link: impl AsRef<Path>,
1408 flags: i32,
1409 ) -> io::Result<()> {
1410 let old_path = path_to_cstring(original.as_ref())?;
1411 let new_path = path_to_cstring(link.as_ref())?;
1412 let (tx, rx) = oneshot::channel();
1413 self.send(Message::LinkAt {
1414 req: LinkAtRequest {
1415 old_dir_fd,
1416 old_path,
1417 new_dir_fd,
1418 new_path,
1419 flags,
1420 },
1421 res: tx,
1422 });
1423 rx.await.expect("uring completion channel dropped")
1424 }
1425}