// tokio/fs/read_uring.rs

1use crate::fs::OpenOptions;
2use crate::runtime::driver::op::Op;
3
4use std::io;
5use std::io::ErrorKind;
6use std::os::fd::OwnedFd;
7use std::path::Path;
8
// This algorithm is inspired by the Rust standard library's `read_to_end`
// (version 1.90.0):
// https://doc.rust-lang.org/1.90.0/src/std/io/mod.rs.html#409

/// Size in bytes of the small probe read used to detect EOF when the buffer
/// may be an exact fit, so that its capacity does not have to grow.
const PROBE_SIZE: usize = 32;
/// `PROBE_SIZE` expressed as the `u32` read length passed to `op_read`.
const PROBE_SIZE_U32: u32 = PROBE_SIZE as u32;

/// Maximum number of bytes requested by a single io_uring read submission
/// (64 MiB).
///
/// Read lengths are passed as `u32`, so this must not exceed `u32::MAX`; the
/// compile-time assertion below enforces that instead of relying on a comment.
const MAX_READ_SIZE: usize = 64 * 1024 * 1024;

// Compile-time guarantees for the `u32` conversions of these sizes.
const _: () = assert!(PROBE_SIZE as u64 <= u32::MAX as u64);
const _: () = assert!(MAX_READ_SIZE as u64 <= u32::MAX as u64);
18
19pub(crate) async fn read_uring(path: &Path) -> io::Result<Vec<u8>> {
20    let file = OpenOptions::new().read(true).open(path).await?;
21
22    // TODO: use io uring in the future to obtain metadata
23    let size_hint: Option<usize> = file.metadata().await.map(|m| m.len() as usize).ok();
24
25    let fd: OwnedFd = file
26        .try_into_std()
27        .expect("unexpected in-flight operation detected")
28        .into();
29
30    let mut buf = Vec::new();
31
32    if let Some(size_hint) = size_hint {
33        buf.try_reserve(size_hint)?;
34    }
35
36    read_to_end_uring(fd, buf).await
37}
38
39async fn read_to_end_uring(mut fd: OwnedFd, mut buf: Vec<u8>) -> io::Result<Vec<u8>> {
40    let mut offset = 0;
41    let start_cap = buf.capacity();
42
43    loop {
44        if buf.len() == buf.capacity() && buf.capacity() == start_cap && buf.len() >= PROBE_SIZE {
45            // The buffer might be an exact fit. Let's read into a probe buffer
46            // and see if it returns `Ok(0)`. If so, we've avoided an
47            // unnecessary increasing of the capacity. But if not, append the
48            // probe buffer to the primary buffer and let its capacity grow.
49            let (r_fd, r_buf, is_eof) = small_probe_read(fd, buf, &mut offset).await?;
50
51            if is_eof {
52                return Ok(r_buf);
53            }
54
55            buf = r_buf;
56            fd = r_fd;
57        }
58
59        // buf is full, need more capacity
60        if buf.len() == buf.capacity() {
61            buf.try_reserve(PROBE_SIZE)?;
62        }
63
64        // prepare the spare capacity to be read into
65        let buf_len = usize::min(buf.spare_capacity_mut().len(), MAX_READ_SIZE);
66
67        // buf_len cannot be greater than u32::MAX because MAX_READ_SIZE
68        // is less than u32::MAX
69        let read_len = u32::try_from(buf_len).expect("buf_len must always fit in u32");
70
71        // read into spare capacity
72        let (r_fd, r_buf, is_eof) = op_read(fd, buf, &mut offset, read_len).await?;
73
74        if is_eof {
75            return Ok(r_buf);
76        }
77
78        fd = r_fd;
79        buf = r_buf;
80    }
81}
82
83async fn small_probe_read(
84    fd: OwnedFd,
85    mut buf: Vec<u8>,
86    offset: &mut u64,
87) -> io::Result<(OwnedFd, Vec<u8>, bool)> {
88    let read_len = PROBE_SIZE_U32;
89
90    let mut temp_arr = [0; PROBE_SIZE];
91    // we don't call this function if the buffer's length < PROBE_SIZE
92    let back_bytes_len = buf.len() - PROBE_SIZE;
93
94    temp_arr.copy_from_slice(&buf[back_bytes_len..]);
95
96    // We're decreasing the length of the buffer and len is greater
97    // than PROBE_SIZE. So we can read into the discarded length
98    buf.truncate(back_bytes_len);
99
100    let (r_fd, mut r_buf, is_eof) = op_read(fd, buf, offset, read_len).await?;
101    // If `size_read` returns zero due to reasons such as the buffer's exact fit,
102    // then this `try_reserve` does not perform allocation.
103    r_buf.try_reserve(PROBE_SIZE)?;
104    r_buf.splice(back_bytes_len..back_bytes_len, temp_arr);
105
106    Ok((r_fd, r_buf, is_eof))
107}
108
109// Takes a length to read and returns a single read in the buffer
110//
111// Returns the file descriptor, buffer and EOF reached or not
112async fn op_read(
113    mut fd: OwnedFd,
114    mut buf: Vec<u8>,
115    offset: &mut u64,
116    read_len: u32,
117) -> io::Result<(OwnedFd, Vec<u8>, bool)> {
118    loop {
119        let (res, r_fd, r_buf) = Op::read_at(fd, buf, read_len as usize, *offset).await;
120
121        match res {
122            Err(e) if e.kind() == ErrorKind::Interrupted => {
123                buf = r_buf;
124                fd = r_fd;
125            }
126            Err(e) => return Err(e),
127            Ok(size_read) => {
128                *offset += size_read as u64;
129
130                return Ok((r_fd, r_buf, size_read == 0));
131            }
132        }
133    }
134}