1use crate::fs::OpenOptions;
2use crate::runtime::driver::op::Op;
3
4use std::io;
5use std::io::ErrorKind;
6use std::os::fd::OwnedFd;
7use std::path::Path;
8
/// Size, in bytes, of the small "probe" read.
///
/// Used as the minimum buffer length before a probe read is attempted, as the
/// amount of extra capacity reserved when the buffer is full, and as the length
/// of the temporary array in `small_probe_read`.
const PROBE_SIZE: usize = 32;

/// [`PROBE_SIZE`] expressed as a `u32`, the type `op_read` takes for a read length.
const PROBE_SIZE_U32: u32 = PROBE_SIZE as u32;

/// Upper bound, in bytes, on the length requested by a single read (64 MiB).
///
/// `read_to_end_uring` converts the clamped length to `u32`, so this value must
/// never exceed `u32::MAX`.
const MAX_READ_SIZE: usize = 64 * 1024 * 1024;

// Compile-time guards: both sizes are narrowed to `u32` before being handed to
// the read operation, so reject any future edit that would make that lossy.
const _: () = assert!(PROBE_SIZE <= u32::MAX as usize);
const _: () = assert!(MAX_READ_SIZE <= u32::MAX as usize);
18
19pub(crate) async fn read_uring(path: &Path) -> io::Result<Vec<u8>> {
20 let file = OpenOptions::new().read(true).open(path).await?;
21
22 let size_hint: Option<usize> = file.metadata().await.map(|m| m.len() as usize).ok();
24
25 let fd: OwnedFd = file
26 .try_into_std()
27 .expect("unexpected in-flight operation detected")
28 .into();
29
30 let mut buf = Vec::new();
31
32 if let Some(size_hint) = size_hint {
33 buf.try_reserve(size_hint)?;
34 }
35
36 read_to_end_uring(fd, buf).await
37}
38
39async fn read_to_end_uring(mut fd: OwnedFd, mut buf: Vec<u8>) -> io::Result<Vec<u8>> {
40 let mut offset = 0;
41 let start_cap = buf.capacity();
42
43 loop {
44 if buf.len() == buf.capacity() && buf.capacity() == start_cap && buf.len() >= PROBE_SIZE {
45 let (r_fd, r_buf, is_eof) = small_probe_read(fd, buf, &mut offset).await?;
50
51 if is_eof {
52 return Ok(r_buf);
53 }
54
55 buf = r_buf;
56 fd = r_fd;
57 }
58
59 if buf.len() == buf.capacity() {
61 buf.try_reserve(PROBE_SIZE)?;
62 }
63
64 let buf_len = usize::min(buf.spare_capacity_mut().len(), MAX_READ_SIZE);
66
67 let read_len = u32::try_from(buf_len).expect("buf_len must always fit in u32");
70
71 let (r_fd, r_buf, is_eof) = op_read(fd, buf, &mut offset, read_len).await?;
73
74 if is_eof {
75 return Ok(r_buf);
76 }
77
78 fd = r_fd;
79 buf = r_buf;
80 }
81}
82
83async fn small_probe_read(
84 fd: OwnedFd,
85 mut buf: Vec<u8>,
86 offset: &mut u64,
87) -> io::Result<(OwnedFd, Vec<u8>, bool)> {
88 let read_len = PROBE_SIZE_U32;
89
90 let mut temp_arr = [0; PROBE_SIZE];
91 let back_bytes_len = buf.len() - PROBE_SIZE;
93
94 temp_arr.copy_from_slice(&buf[back_bytes_len..]);
95
96 buf.truncate(back_bytes_len);
99
100 let (r_fd, mut r_buf, is_eof) = op_read(fd, buf, offset, read_len).await?;
101 r_buf.try_reserve(PROBE_SIZE)?;
104 r_buf.splice(back_bytes_len..back_bytes_len, temp_arr);
105
106 Ok((r_fd, r_buf, is_eof))
107}
108
109async fn op_read(
113 mut fd: OwnedFd,
114 mut buf: Vec<u8>,
115 offset: &mut u64,
116 read_len: u32,
117) -> io::Result<(OwnedFd, Vec<u8>, bool)> {
118 loop {
119 let (res, r_fd, r_buf) = Op::read_at(fd, buf, read_len as usize, *offset).await;
120
121 match res {
122 Err(e) if e.kind() == ErrorKind::Interrupted => {
123 buf = r_buf;
124 fd = r_fd;
125 }
126 Err(e) => return Err(e),
127 Ok(size_read) => {
128 *offset += size_read as u64;
129
130 return Ok((r_fd, r_buf, size_read == 0));
131 }
132 }
133 }
134}