Skip to main content

tokio/io/uring/
read.rs

1use crate::io::blocking::Buf;
2use crate::io::uring::utils::{ArcFd, UringFd};
3use crate::runtime::driver::op::{CancelData, Cancellable, Completable, CqeResult, Op};
4
5use io_uring::{opcode, types};
6use std::fmt;
7use std::io::{self, Error};
8use std::os::fd::OwnedFd;
9
/// Trait for buffers that can be used with io-uring read operations.
///
/// A read uses the two methods as a pair: `uring_read_prepare` hands out the
/// raw destination that is placed in the SQE, and `uring_read_complete`
/// records how many bytes ended up there once the operation has completed.
pub(crate) trait ReadBuffer: Send + 'static {
    /// Prepare the buffer for a read operation of at most `max_len` bytes.
    ///
    /// Returns a pointer and length for the io-uring SQE.
    fn uring_read_prepare(&mut self, max_len: usize) -> (*mut u8, u32);

    /// Complete a read of `n` bytes.
    ///
    /// # Safety
    ///
    /// The caller must ensure the kernel wrote exactly `n` bytes
    /// into the buffer at the pointer returned by `uring_read_prepare`.
    unsafe fn uring_read_complete(&mut self, n: u32);
}
24
25impl ReadBuffer for Vec<u8> {
26    fn uring_read_prepare(&mut self, max_len: usize) -> (*mut u8, u32) {
27        assert!(self.spare_capacity_mut().len() >= max_len);
28        let ptr = self.spare_capacity_mut().as_mut_ptr().cast();
29        (ptr, max_len as u32)
30    }
31
32    unsafe fn uring_read_complete(&mut self, n: u32) {
33        // SAFETY: the kernel wrote `n` bytes into spare capacity starting
34        // at the old self.len(), so self.len() + n bytes are now initialized.
35        unsafe { self.set_len(self.len() + n as usize) };
36    }
37}
38
39impl ReadBuffer for Buf {
40    fn uring_read_prepare(&mut self, max_len: usize) -> (*mut u8, u32) {
41        self.prepare_uring_read(max_len)
42    }
43
44    unsafe fn uring_read_complete(&mut self, n: u32) {
45        // SAFETY: caller guarantees kernel wrote exactly n bytes.
46        unsafe { self.complete_uring_read(n as usize) };
47    }
48}
49
/// State of a single io-uring read: the descriptor being read from and the
/// destination buffer.
///
/// Both are stored together so they can be moved into an `Op` and handed
/// back to the caller when the operation completes.
pub(crate) struct Read<B, F = ArcFd> {
    // Descriptor the read is issued against (`ArcFd` unless overridden).
    fd: F,
    // Buffer receiving the data.
    buf: B,
}
54
55impl<B: fmt::Debug, F> fmt::Debug for Read<B, F> {
56    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
57        f.debug_struct("Read")
58            .field("buf", &self.buf)
59            .finish_non_exhaustive()
60    }
61}
62
63impl<B: ReadBuffer, F: UringFd> Completable for Read<B, F> {
64    type Output = (io::Result<u32>, F, B);
65
66    fn complete(self, cqe: CqeResult) -> Self::Output {
67        let mut buf = self.buf;
68        if let Ok(len) = cqe.result {
69            // SAFETY: kernel wrote exactly `len` bytes into the prepared buffer.
70            unsafe { buf.uring_read_complete(len) };
71        }
72        (cqe.result, self.fd, buf)
73    }
74
75    fn complete_with_error(self, err: Error) -> Self::Output {
76        (Err(err), self.fd, self.buf)
77    }
78}
79
// Cancellation support for reads into a `Vec<u8>` issued on an `OwnedFd`.
impl Cancellable for Read<Vec<u8>, OwnedFd> {
    /// Moves the whole operation state (descriptor and buffer) into
    /// `CancelData::ReadVec`.
    // NOTE(review): presumably this keeps the fd and buffer alive until the
    // kernel is done with the cancelled operation — confirm against the driver.
    fn cancel(self) -> CancelData {
        CancelData::ReadVec(self)
    }
}
85
// Cancellation support for reads into a `Buf` issued on an `ArcFd`.
impl Cancellable for Read<Buf, ArcFd> {
    /// Moves the whole operation state (descriptor and buffer) into
    /// `CancelData::ReadBuf`.
    // NOTE(review): presumably this keeps the fd and buffer alive until the
    // kernel is done with the cancelled operation — confirm against the driver.
    fn cancel(self) -> CancelData {
        CancelData::ReadBuf(self)
    }
}
91
92impl<B, F> Op<Read<B, F>>
93where
94    B: ReadBuffer + fmt::Debug,
95    F: UringFd,
96    Read<B, F>: Cancellable,
97{
98    /// Submit a read operation via io-uring.
99    ///
100    /// `max_len` is the maximum number of bytes to read.
101    /// `offset` is the file offset; use `u64::MAX` for the current cursor.
102    pub(crate) fn read_at(fd: F, mut buf: B, max_len: usize, offset: u64) -> Self {
103        let (ptr, len) = buf.uring_read_prepare(max_len);
104
105        let sqe = opcode::Read::new(types::Fd(UringFd::as_raw_fd(&fd)), ptr, len)
106            .offset(offset)
107            .build();
108
109        // SAFETY: `fd` and `buf`, which owns the heap buffer, are moved into `Read`,
110        // which is held by the `Op` for the entire duration of the io-uring operation.
111        // The buffer pointer remains valid because Vec heap data doesn't move.
112        unsafe { Op::new(sqe, Read { fd, buf }) }
113    }
114}