1use crate::{job::ReadResult, *};
2use std::{ops::Deref, os::fd::AsRawFd, path::Path};
3
4#[derive(Debug, Clone)]
7pub struct RingConfig {
8 pub entries: usize,
10 pub buf_size: usize,
12 pub io_depth: i32,
14 pub timeout: i32,
16 pub numa: i32,
18 pub flags: u64,
20 pub block_size: usize,
22}
23
24impl Default for RingConfig {
25 fn default() -> Self {
26 Self {
27 entries: 1024,
28 buf_size: 4 << 20,
29 io_depth: 0,
30 timeout: 0,
31 numa: -1,
32 flags: 0,
33 block_size: 0,
34 }
35 }
36}
37
38pub struct Ring {
41 config: RingConfig,
42 read_ior: Ior,
43 write_ior: Ior,
44 iov: Iov,
45 cqes: Vec<hf3fs_cqe>,
46}
47
48impl Ring {
49 pub fn create(config: &RingConfig, mount_point: &Path) -> Result<Self> {
58 let read_ior = Ior::create(
59 mount_point,
60 config.entries as _,
61 true,
62 config.io_depth,
63 config.timeout,
64 config.numa,
65 config.flags,
66 )?;
67 let write_ior = Ior::create(
68 mount_point,
69 config.entries as _,
70 false,
71 config.io_depth,
72 config.timeout,
73 config.numa,
74 config.flags,
75 )?;
76 let iov = Iov::create(mount_point, config.buf_size, config.block_size, config.numa)?;
77
78 Ok(Self {
79 config: config.clone(),
80 read_ior,
81 write_ior,
82 iov,
83 cqes: vec![hf3fs_cqe::default(); config.entries],
84 })
85 }
86
87 pub fn read(&mut self, file: &File, offset: u64, length: usize) -> Result<&[u8]> {
97 let results = self.batch_read(&[(file, offset, length)])?;
98 match &results[0] {
99 e if e.ret < 0 => Err(Error::ReadFailed(e.ret as i32)),
100 r => Ok(r.buf),
101 }
102 }
103
104 pub fn batch_read(&mut self, jobs: &[impl ReadJob]) -> Result<Vec<ReadResult<'_>>> {
116 let count = jobs.len();
117 if count > self.config.entries {
118 return Err(Error::InsufficientEntriesLength);
119 }
120
121 let total_len = jobs.iter().map(|job| job.length()).sum::<usize>();
122 if total_len > self.config.buf_size {
123 return Err(Error::InsufficientBufferLength);
124 }
125
126 let mut buf = unsafe { std::slice::from_raw_parts(self.iov.base, self.config.buf_size) };
127 let mut submitted = 0;
128 let mut results = Vec::with_capacity(count);
129 for (idx, job) in jobs.iter().enumerate() {
130 if job.length() == 0 {
131 results.push(ReadResult { ret: 0, buf: &[] });
132 continue;
133 }
134
135 let ret = unsafe {
136 hf3fs_prep_io(
137 self.read_ior.deref(),
138 self.iov.deref(),
139 true,
140 buf.as_ptr() as _,
141 job.file().as_raw_fd(),
142 job.offset() as usize,
143 job.length() as u64,
144 idx as _,
145 )
146 };
147 if ret < 0 {
148 return Err(Error::PrepareIOFailed(-ret));
149 }
150 results.push(ReadResult {
151 ret: 0,
152 buf: &buf[..job.length()],
153 });
154 buf = &buf[job.length()..];
155 submitted += 1;
156 }
157
158 if submitted == 0 {
159 return Ok(results);
160 }
161
162 let ret = unsafe { hf3fs_submit_ios(self.read_ior.deref()) };
163 if ret != 0 {
164 return Err(Error::SubmitIOsFailed(-ret));
165 }
166
167 let ret = unsafe {
168 self.cqes.set_len(submitted);
169 hf3fs_wait_for_ios(
170 self.read_ior.deref(),
171 self.cqes.as_mut_ptr(),
172 submitted as _,
173 submitted as _,
174 std::ptr::null_mut(),
175 )
176 };
177 if ret < 0 {
178 return Err(Error::WaitForIOsFailed(-ret));
179 }
180
181 for cqe in &self.cqes {
182 let result = &mut results[cqe.userdata as usize];
183 result.ret = cqe.result;
184 result.buf = &result.buf[..std::cmp::max(0, cqe.result) as usize];
185 }
186 Ok(results)
187 }
188
189 pub fn write(&mut self, file: &File, buf: &[u8], offset: u64) -> Result<usize> {
199 let results = self.batch_write(&[(file, buf, offset)])?;
200 match results[0] {
201 e if e < 0 => Err(Error::WriteFailed(-e as i32)),
202 r => Ok(r as usize),
203 }
204 }
205
206 pub fn batch_write(&mut self, jobs: &[impl WriteJob]) -> Result<Vec<i64>> {
218 let count = jobs.len();
219 if count > self.config.entries {
220 return Err(Error::InsufficientEntriesLength);
221 }
222
223 let total_len = jobs.iter().map(|job| job.data().len()).sum::<usize>();
224 if total_len > self.config.buf_size {
225 return Err(Error::InsufficientBufferLength);
226 }
227
228 let mut buf =
229 unsafe { std::slice::from_raw_parts_mut(self.iov.base, self.config.buf_size) };
230 let mut submitted = 0;
231 let mut results = vec![0; count];
232 for (idx, job) in jobs.iter().enumerate() {
233 if job.data().is_empty() {
234 continue;
235 }
236
237 let len = job.data().len();
238 buf[..len].copy_from_slice(job.data());
239
240 let ret = unsafe {
241 hf3fs_prep_io(
242 self.write_ior.deref(),
243 self.iov.deref(),
244 false,
245 buf.as_ptr() as _,
246 job.file().as_raw_fd(),
247 job.offset() as usize,
248 len as u64,
249 idx as _,
250 )
251 };
252 if ret < 0 {
253 return Err(Error::PrepareIOFailed(-ret));
254 }
255
256 buf = &mut buf[len..];
257 submitted += 1;
258 }
259
260 if submitted == 0 {
261 return Ok(results);
262 }
263
264 let ret = unsafe { hf3fs_submit_ios(self.write_ior.deref()) };
265 if ret != 0 {
266 return Err(Error::SubmitIOsFailed(-ret));
267 }
268
269 let ret = unsafe {
270 self.cqes.set_len(submitted);
271 hf3fs_wait_for_ios(
272 self.write_ior.deref(),
273 self.cqes.as_mut_ptr(),
274 submitted as _,
275 submitted as _,
276 std::ptr::null_mut(),
277 )
278 };
279 if ret < 0 {
280 return Err(Error::WaitForIOsFailed(-ret));
281 }
282
283 for cqe in &self.cqes {
284 results[cqe.userdata as usize] = cqe.result;
285 }
286 Ok(results)
287 }
288}