usrbio/
ring.rs

1use crate::{job::ReadResult, *};
2use std::{ops::Deref, os::fd::AsRawFd, path::Path};
3
/// Settings used to create and configure a [`Ring`].
///
/// Start from [`RingConfig::default`] and override the individual fields that matter.
#[derive(Clone, Debug)]
pub struct RingConfig {
    /// How many entries the ring buffer holds.
    pub entries: usize,
    /// Size of the I/O buffer, in bytes.
    pub buf_size: usize,
    /// Maximum depth of concurrent I/O operations.
    pub io_depth: i32,
    /// Timeout applied to I/O operations.
    pub timeout: i32,
    /// NUMA node to use; `-1` disables NUMA awareness.
    pub numa: i32,
    /// Extra flags forwarded when the ring is configured.
    pub flags: u64,
    /// Block size used for I/O operations.
    pub block_size: usize,
}
23
24impl Default for RingConfig {
25    fn default() -> Self {
26        Self {
27            entries: 1024,
28            buf_size: 4 << 20,
29            io_depth: 0,
30            timeout: 0,
31            numa: -1,
32            flags: 0,
33            block_size: 0,
34        }
35    }
36}
37
38/// An I/O ring that manages asynchronous read and write operations.
39/// Provides facilities for both single and batch I/O operations with configurable parameters.
40pub struct Ring {
41    config: RingConfig,
42    read_ior: Ior,
43    write_ior: Ior,
44    iov: Iov,
45    cqes: Vec<hf3fs_cqe>,
46}
47
48impl Ring {
49    /// Creates a new I/O ring with the specified configuration and mount point.
50    ///
51    /// # Arguments
52    /// * `config` - Ring configuration parameters
53    /// * `mount_point` - Mount point path for the filesystem
54    ///
55    /// # Returns
56    /// * `Result<Self>` - A new Ring instance or an error
57    pub fn create(config: &RingConfig, mount_point: &Path) -> Result<Self> {
58        let read_ior = Ior::create(
59            mount_point,
60            config.entries as _,
61            true,
62            config.io_depth,
63            config.timeout,
64            config.numa,
65            config.flags,
66        )?;
67        let write_ior = Ior::create(
68            mount_point,
69            config.entries as _,
70            false,
71            config.io_depth,
72            config.timeout,
73            config.numa,
74            config.flags,
75        )?;
76        let iov = Iov::create(mount_point, config.buf_size, config.block_size, config.numa)?;
77
78        Ok(Self {
79            config: config.clone(),
80            read_ior,
81            write_ior,
82            iov,
83            cqes: vec![hf3fs_cqe::default(); config.entries],
84        })
85    }
86
87    /// Performs a single read operation.
88    ///
89    /// # Arguments
90    /// * `file` - The file to read from
91    /// * `offset` - Starting position for the read
92    /// * `length` - Number of bytes to read
93    ///
94    /// # Returns
95    /// * `Result<&[u8]>` - The read data as a byte slice or an error
96    pub fn read(&mut self, file: &File, offset: u64, length: usize) -> Result<&[u8]> {
97        let results = self.batch_read(&[(file, offset, length)])?;
98        match &results[0] {
99            e if e.ret < 0 => Err(Error::ReadFailed(e.ret as i32)),
100            r => Ok(r.buf),
101        }
102    }
103
104    /// Performs multiple read operations in a batch.
105    ///
106    /// # Arguments
107    /// * `jobs` - Slice of read operations to perform
108    ///
109    /// # Returns
110    /// * `Result<Vec<ReadResult<'_>>>` - Vector of read results or an error
111    ///
112    /// # Errors
113    /// * Returns error if number of jobs exceeds ring entries
114    /// * Returns error if total read length exceeds buffer size
115    pub fn batch_read(&mut self, jobs: &[impl ReadJob]) -> Result<Vec<ReadResult<'_>>> {
116        let count = jobs.len();
117        if count > self.config.entries {
118            return Err(Error::InsufficientEntriesLength);
119        }
120
121        let total_len = jobs.iter().map(|job| job.length()).sum::<usize>();
122        if total_len > self.config.buf_size {
123            return Err(Error::InsufficientBufferLength);
124        }
125
126        let mut buf = unsafe { std::slice::from_raw_parts(self.iov.base, self.config.buf_size) };
127        let mut submitted = 0;
128        let mut results = Vec::with_capacity(count);
129        for (idx, job) in jobs.iter().enumerate() {
130            if job.length() == 0 {
131                results.push(ReadResult { ret: 0, buf: &[] });
132                continue;
133            }
134
135            let ret = unsafe {
136                hf3fs_prep_io(
137                    self.read_ior.deref(),
138                    self.iov.deref(),
139                    true,
140                    buf.as_ptr() as _,
141                    job.file().as_raw_fd(),
142                    job.offset() as usize,
143                    job.length() as u64,
144                    idx as _,
145                )
146            };
147            if ret < 0 {
148                return Err(Error::PrepareIOFailed(-ret));
149            }
150            results.push(ReadResult {
151                ret: 0,
152                buf: &buf[..job.length()],
153            });
154            buf = &buf[job.length()..];
155            submitted += 1;
156        }
157
158        if submitted == 0 {
159            return Ok(results);
160        }
161
162        let ret = unsafe { hf3fs_submit_ios(self.read_ior.deref()) };
163        if ret != 0 {
164            return Err(Error::SubmitIOsFailed(-ret));
165        }
166
167        let ret = unsafe {
168            self.cqes.set_len(submitted);
169            hf3fs_wait_for_ios(
170                self.read_ior.deref(),
171                self.cqes.as_mut_ptr(),
172                submitted as _,
173                submitted as _,
174                std::ptr::null_mut(),
175            )
176        };
177        if ret < 0 {
178            return Err(Error::WaitForIOsFailed(-ret));
179        }
180
181        for cqe in &self.cqes {
182            let result = &mut results[cqe.userdata as usize];
183            result.ret = cqe.result;
184            result.buf = &result.buf[..std::cmp::max(0, cqe.result) as usize];
185        }
186        Ok(results)
187    }
188
189    /// Performs a single write operation.
190    ///
191    /// # Arguments
192    /// * `file` - The file to write to
193    /// * `buf` - Data to write
194    /// * `offset` - Starting position for the write
195    ///
196    /// # Returns
197    /// * `Result<usize>` - Number of bytes written or an error
198    pub fn write(&mut self, file: &File, buf: &[u8], offset: u64) -> Result<usize> {
199        let results = self.batch_write(&[(file, buf, offset)])?;
200        match results[0] {
201            e if e < 0 => Err(Error::WriteFailed(-e as i32)),
202            r => Ok(r as usize),
203        }
204    }
205
206    /// Performs multiple write operations in a batch.
207    ///
208    /// # Arguments
209    /// * `jobs` - Slice of write operations to perform
210    ///
211    /// # Returns
212    /// * `Result<Vec<i64>>` - Vector of write results (bytes written) or an error
213    ///
214    /// # Errors
215    /// * Returns error if number of jobs exceeds ring entries
216    /// * Returns error if total write length exceeds buffer size
217    pub fn batch_write(&mut self, jobs: &[impl WriteJob]) -> Result<Vec<i64>> {
218        let count = jobs.len();
219        if count > self.config.entries {
220            return Err(Error::InsufficientEntriesLength);
221        }
222
223        let total_len = jobs.iter().map(|job| job.data().len()).sum::<usize>();
224        if total_len > self.config.buf_size {
225            return Err(Error::InsufficientBufferLength);
226        }
227
228        let mut buf =
229            unsafe { std::slice::from_raw_parts_mut(self.iov.base, self.config.buf_size) };
230        let mut submitted = 0;
231        let mut results = vec![0; count];
232        for (idx, job) in jobs.iter().enumerate() {
233            if job.data().is_empty() {
234                continue;
235            }
236
237            let len = job.data().len();
238            buf[..len].copy_from_slice(job.data());
239
240            let ret = unsafe {
241                hf3fs_prep_io(
242                    self.write_ior.deref(),
243                    self.iov.deref(),
244                    false,
245                    buf.as_ptr() as _,
246                    job.file().as_raw_fd(),
247                    job.offset() as usize,
248                    len as u64,
249                    idx as _,
250                )
251            };
252            if ret < 0 {
253                return Err(Error::PrepareIOFailed(-ret));
254            }
255
256            buf = &mut buf[len..];
257            submitted += 1;
258        }
259
260        if submitted == 0 {
261            return Ok(results);
262        }
263
264        let ret = unsafe { hf3fs_submit_ios(self.write_ior.deref()) };
265        if ret != 0 {
266            return Err(Error::SubmitIOsFailed(-ret));
267        }
268
269        let ret = unsafe {
270            self.cqes.set_len(submitted);
271            hf3fs_wait_for_ios(
272                self.write_ior.deref(),
273                self.cqes.as_mut_ptr(),
274                submitted as _,
275                submitted as _,
276                std::ptr::null_mut(),
277            )
278        };
279        if ret < 0 {
280            return Err(Error::WaitForIOsFailed(-ret));
281        }
282
283        for cqe in &self.cqes {
284            results[cqe.userdata as usize] = cqe.result;
285        }
286        Ok(results)
287    }
288}