tempest_io/lib.rs
1//! Async I/O abstraction for Tempest's tick-based event loop.
2//!
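//! Provides the following backends, each gated behind its own Cargo feature:
//! - [`LinuxIo`] - backed by `io_uring`, for production use (`linux` feature).
//! - [`VirtualIo`] - in-memory, for testing and DST (`virtual` feature).
//! - [`WasmIo`] - enabled by the `wasm` feature.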
6
7use std::{
8 io,
9 mem::ManuallyDrop,
10 path::{Path, PathBuf},
11};
12
13pub mod buf;
14pub use buf::*;
15
16#[cfg(feature = "virtual")]
17pub mod virtual_io;
18#[cfg(feature = "virtual")]
19pub use virtual_io::{VirtualIo, VirtualIoConfig};
20
21#[cfg(feature = "wasm")]
22pub mod wasm_io;
23#[cfg(feature = "wasm")]
24pub use wasm_io::{WasmIo, WasmIoConfig};
25
26#[cfg(feature = "linux")]
27pub mod linux_io;
28#[cfg(feature = "linux")]
29pub use linux_io::{LinuxIo, LinuxIoConfig};
30
31#[cfg(test)]
32mod tests;
33
/// Flags describing how a file should be opened by [`Io::open`].
///
/// Start from [`OpenOptions::new`] (every flag `false`) and chain the setters:
/// each setter consumes the value and returns the updated one.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct OpenOptions {
    /// Request read access.
    pub read: bool,
    /// Request write access.
    pub write: bool,
    /// Create the file if it is missing (`O_CREAT`).
    pub create: bool,
    /// Create the file and fail if it already exists (`O_CREAT | O_EXCL`).
    pub create_new: bool,
    /// Truncate the file on open (`O_TRUNC`).
    pub truncate: bool,
    /// Open with `O_DIRECT`.
    pub direct: bool,
    /// Open with `O_DSYNC`.
    pub dsync: bool,
}

impl OpenOptions {
    /// Returns options with every flag cleared.
    #[must_use]
    pub fn new() -> Self {
        Default::default()
    }

    /// Sets the [`read`](Self::read) flag.
    #[must_use]
    pub fn read(mut self, read: bool) -> Self {
        self.read = read;
        self
    }

    /// Sets the [`write`](Self::write) flag.
    #[must_use]
    pub fn write(mut self, write: bool) -> Self {
        self.write = write;
        self
    }

    /// Sets the [`create`](Self::create) flag.
    #[must_use]
    pub fn create(mut self, create: bool) -> Self {
        self.create = create;
        self
    }

    /// Sets the [`create_new`](Self::create_new) flag.
    #[must_use]
    pub fn create_new(mut self, create_new: bool) -> Self {
        self.create_new = create_new;
        self
    }

    /// Sets the [`truncate`](Self::truncate) flag.
    #[must_use]
    pub fn truncate(mut self, truncate: bool) -> Self {
        self.truncate = truncate;
        self
    }

    /// Sets the [`direct`](Self::direct) flag.
    #[must_use]
    pub fn direct(mut self, direct: bool) -> Self {
        self.direct = direct;
        self
    }

    /// Sets the [`dsync`](Self::dsync) flag.
    #[must_use]
    pub fn dsync(mut self, dsync: bool) -> Self {
        self.dsync = dsync;
        self
    }

    /// Translates these options into the `flags` argument of `open(2)`.
    ///
    /// The access mode is `O_RDWR` when both `read` and `write` are set,
    /// `O_WRONLY` when only `write` is set, and `O_RDONLY` otherwise
    /// (including when neither flag is set).
    #[cfg(feature = "linux")]
    #[must_use]
    pub const fn to_libc_flags(&self) -> i32 {
        let mut flags = match (self.read, self.write) {
            (true, true) => libc::O_RDWR,
            (false, true) => libc::O_WRONLY,
            // Covers read-only and the "nothing requested" case alike.
            _ => libc::O_RDONLY,
        };

        if self.create {
            flags |= libc::O_CREAT;
        }
        if self.truncate {
            flags |= libc::O_TRUNC;
        }
        if self.create_new {
            // O_EXCL is only meaningful together with O_CREAT, so set both.
            flags |= libc::O_CREAT | libc::O_EXCL;
        }
        if self.direct {
            flags |= libc::O_DIRECT;
        }
        if self.dsync {
            flags |= libc::O_DSYNC;
        }

        flags
    }
}
112
/// Identifier attached to a submitted operation.
///
/// The same value is reported back alongside the operation's result, which lets
/// the submitter find its completion again.
#[derive(Clone, Copy, Debug, Hash, Eq, PartialEq)]
pub struct OpHandle(pub u64);
115
116/// Owned handle to an in-flight read operation.
117///
118/// Holds the destination buffer until the kernel completes the read.
119/// Must be consumed via [`complete`](Self::complete) once the
120/// corresponding [`OpHandle`] appears in [`Io::poll`] - dropping it
121/// before completion is a bug.
122pub struct ReadHandle<B: IoBufMut> {
123 buf: B,
124}
125
126impl<B: IoBufMut> ReadHandle<B> {
127 #[cfg(any(feature = "virtual", feature = "linux", feature = "wasm"))]
128 pub(crate) fn new(buf: B) -> Self {
129 Self { buf }
130 }
131
132 /// Mark this read operation as completed, returning the inner buffer.
133 ///
134 /// # Safety
135 ///
136 /// `bytes_read` must be the result value from the CQE corresponding to
137 /// this handle's [`OpHandle`]. Passing an arbitrary value will mark
138 /// uninitialized memory as initialized, potentially resulting in UB.
139 pub unsafe fn complete(self, bytes_read: u32) -> B {
140 let mut this = ManuallyDrop::new(self);
141 // SAFETY: caller must ensure that `buf` is valid up to `bytes_read`
142 unsafe { this.buf.set_init(bytes_read as usize) };
143 // SAFETY: we are consuming self via manually drop
144 unsafe { std::ptr::read(&this.buf) }
145 }
146}
147
148impl<B: IoBufMut> Drop for ReadHandle<B> {
149 fn drop(&mut self) {
150 panic!("ReadHandle dropped before completion");
151 }
152}
153
154/// Owned handle to an in-flight write operation.
155///
156/// Holds the source buffer until the kernel completes the write.
157/// Must be consumed via [`complete`](Self::complete) once the
158/// corresponding [`OpHandle`] appears in [`Io::poll`] - dropping it
159/// before completion is a bug.
160pub struct WriteHandle<B: IoBuf> {
161 buf: B,
162}
163
164impl<B: IoBuf> WriteHandle<B> {
165 #[cfg(any(feature = "virtual", feature = "linux", feature = "wasm"))]
166 pub(crate) fn new(buf: B) -> Self {
167 Self { buf }
168 }
169
170 /// Mark this read operation as completed, returning the inner buffer.
171 pub fn complete(self) -> B {
172 let this = ManuallyDrop::new(self);
173 // SAFETY: we are consuming self via manually drop
174 unsafe { std::ptr::read(&this.buf) }
175 }
176}
177
178impl<B: IoBuf> Drop for WriteHandle<B> {
179 fn drop(&mut self) {
180 panic!("WriteHandle dropped before completion");
181 }
182}
183
/// One entry of a directory listing, as returned by [`Io::list_dir`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DirEntry {
    /// Path of the entry.
    pub path: PathBuf,
    /// `true` when the entry is a directory.
    pub is_dir: bool,
}
188
/// File metadata produced by an fstat operation.
pub trait Statx: Sized {
    /// Returns the file size, mirroring the `stx_size` field of `statx(2)`.
    fn stx_size(&self) -> u64;
}

/// Owned handle to an in-flight fstat operation.
///
/// Holds the [`statx`](libc::statx) buffer until the kernel completes the fstat.
/// Must be consumed via [`complete`](Self::complete) once the
/// corresponding [`OpHandle`] appears in [`Io::poll`] - dropping it
/// before completion is a bug and panics.
///
/// # Implementation
///
/// Implementors of [`Io`] must make sure the boxed `statx` value is filled in
/// before the operation is reported as complete.
pub struct FstatHandle<S: Statx> {
    statx: Box<S>,
}

impl<S: Statx> FstatHandle<S> {
    /// Wraps the boxed `statx` buffer for the lifetime of a submitted fstat.
    #[cfg(any(feature = "virtual", feature = "linux", feature = "wasm"))]
    pub(crate) fn new(statx: Box<S>) -> Self {
        Self { statx }
    }

    /// Marks this fstat operation as completed, returning the metadata value.
    pub fn complete(self) -> S {
        // `ManuallyDrop` keeps the panicking `Drop` impl from running.
        let this = ManuallyDrop::new(self);
        // SAFETY: `this` is never dropped, so the `Box` read here is the only
        // owner of the allocation; unboxing it below frees the allocation.
        let statx: Box<S> = unsafe { std::ptr::read(&this.statx) };
        *statx
    }
}

impl<S: Statx> Drop for FstatHandle<S> {
    /// Always panics; [`FstatHandle::complete`] is the only way out that skips this.
    fn drop(&mut self) {
        panic!("FstatHandle dropped before completion");
    }
}
229
230/// Abstraction over async I/O backends.
231///
232/// All operations are split into two phases: submission and completion.
233/// The caller supplies an [`OpHandle`] (generated by the runtime) when
234/// submitting an operation; that same handle appears in [`completions`](Self::completions)
235/// once the operation finishes. Call [`poll`](Self::poll) once per tick to
236/// drain completed CQEs, then route each handle to the state machine that
237/// submitted it via [`get_cqe`](Self::get_cqe).
238///
239/// # Path contract
240///
241/// Paths passed to [`open`](Self::open) must be consistent across calls -
242/// the same file must always be referred to by the same path. Implementations
243/// are not required to normalize paths, so `"./foo"`, `"foo"`, and
244/// `"bar/../foo"` may be treated as distinct files. Callers should
245/// use absolute canonical paths to avoid ambiguity.
246///
247/// # Safety
248///
249/// All methods taking [`Self::Fd`] require a valid open file descriptor
250/// obtained from a completed [`open`](Self::open) CQE. Using a closed
251/// or invalid fd is undefined behavior.
252pub trait Io: 'static {
253 /// Returns the block size required for O_DIRECT I/O.
254 /// Buffer addresses, offsets, and lengths must all be aligned to this.
255 fn block_size(&self) -> usize;
256
257 /// Returns the current duration since the epoch start.
258 fn now(&self) -> std::time::Duration;
259
260 /// The file descriptor type for this backend.
261 type Fd: Copy;
262
263 /// Converts a raw CQE result from a completed [`open`] into an `Fd`.
264 ///
265 /// # Safety
266 ///
267 /// `result` must come from a successful [`open`] CQE. Calling this with
268 /// an arbitrary `u32` is undefined behavior.
269 ///
270 /// [`open`]: Self::open
271 unsafe fn into_fd(result: u32) -> Self::Fd;
272
273 type RegisteredBuf: IoBufMut;
274
275 /// Obtain a buffer that has been registered with this ring, which are guranteed to be properly
276 /// aligned to [`block_size`] for drive reads for files opened with the [`direct`] option.
277 ///
278 /// [`block_size`]: Self::block_size
279 /// [`direct`]: OpenOptions::direct
280 fn acquire_buf(&mut self) -> Option<Self::RegisteredBuf>;
281
282 /// Release a buffer that has been registered with this ring back to it, for reuse.
283 /// The buffer being released must come from a prior call to [`acquire_buf`].
284 ///
285 /// # Panics
286 ///
287 /// This function may panic when `buf` was not obtained through [`acquire_buf`].
288 ///
289 /// [`acquire_buf`]: Self::acquire_buf
290 fn release_buf(&mut self, buf: Self::RegisteredBuf);
291
292 type Statx: Statx;
293
294 /// Submits an fstat SQE. On completion, call [`FstatHandle::complete`] to
295 /// retrieve the populated [`Statx`] value.
296 fn fstat(&mut self, fd: Self::Fd, handle: OpHandle) -> io::Result<FstatHandle<Self::Statx>>;
297
298 /// Submits an open SQE. On completion, pass the CQE result to
299 /// [`into_fd`](Self::into_fd) to obtain the file descriptor.
300 fn open(&mut self, path: &Path, opts: OpenOptions, handle: OpHandle) -> io::Result<()>;
301
302 /// Submits a close SQE. `fd` must not be used after this call.
303 fn close(&mut self, fd: Self::Fd, handle: OpHandle) -> io::Result<()>;
304
305 /// Submits a read SQE. Returns a [`ReadHandle`] that owns `buf` until
306 /// the corresponding [`OpHandle`] appears in [`completions`](Self::completions) -
307 /// call [`ReadHandle::complete`] with the CQE result to get the buffer back.
308 fn read_at<B: IoBufMut>(
309 &mut self,
310 fd: Self::Fd,
311 buf: B,
312 offset: u64,
313 handle: OpHandle,
314 ) -> Result<ReadHandle<B>, (io::Error, B)>;
315
316 /// Submits a write SQE. Returns a [`WriteHandle`] that owns `buf` until
317 /// the corresponding [`OpHandle`] appears in [`completions`](Self::completions) -
318 /// call [`WriteHandle::complete`] to get the buffer back.
319 fn write_at<B: IoBuf>(
320 &mut self,
321 fd: Self::Fd,
322 buf: B,
323 offset: u64,
324 handle: OpHandle,
325 ) -> Result<WriteHandle<B>, (io::Error, B)>;
326
327 /// Submits an fsync SQE, ensuring all pending writes to `fd` are
328 /// durable before the corresponding CQE arrives.
329 fn fsync(&mut self, fd: Self::Fd, handle: OpHandle) -> io::Result<()>;
330
331 /// Submits a rename SQE, atomically moving `from` to `to`.
332 /// If a file already exists at `to`, it is replaced.
333 /// The `from` and `to` paths must not be used after this call until
334 /// the corresponding handle appears in [`completions`](Self::completions).
335 fn rename(&mut self, from: &Path, to: &Path, handle: OpHandle) -> io::Result<()>;
336
337 /// Submits an unlink SQE, removing the file at `path`.
338 /// The CQE carries an error if the file does not exist.
339 /// `path` must not be used after this call until the corresponding
340 /// handle appears in [`completions`](Self::completions).
341 fn remove(&mut self, path: &Path, handle: OpHandle) -> io::Result<()>;
342
343 /// Drains all available CQEs into internal storage without blocking.
344 /// Call once per tick before driving any state machines.
345 fn poll(&mut self) -> io::Result<()>;
346
347 /// Returns the number of submitted operations whose CQE has not arrived yet.
348 fn in_flight(&self) -> usize;
349
350 /// Blocks until at least one CQE arrives, then drains all available CQEs
351 /// into internal storage. Call only when [`in_flight`](Self::in_flight) > 0
352 /// and the wake set is empty - i.e. nothing can make progress without I/O.
353 fn park(&mut self) -> io::Result<()>;
354
355 /// Returns the completions collected during the last [`poll`](Self::poll).
356 /// After all state machines have ticked this must be empty - a leftover
357 /// entry is a logic error (leaked operation or lost CQE).
358 fn completions(&mut self) -> &mut Completions;
359
360 /// Extract and remove the CQE for `handle` from this tick's completions.
361 /// Returns `None` if the CQE has not yet arrived.
362 fn get_cqe(&mut self, handle: OpHandle) -> Option<io::Result<u32>> {
363 let completions = self.completions();
364 let idx = completions.iter().position(|(h, _)| *h == handle)?;
365 let (_, result) = completions.swap_remove(idx);
366 Some(result)
367 }
368
369 /// Returns the direct children of `path` as a list of [`DirEntry`] values.
370 ///
371 /// This is a blocking operation and is **not** submitted to the ring.
372 /// It is intended for startup and recovery only - never call this on
373 /// the hot path. Implementations may issue blocking syscalls internally.
374 fn list_dir(&self, path: &Path) -> io::Result<Vec<DirEntry>>;
375
376 /// Recursively creates `path` and all missing parent directories.
377 ///
378 /// This is a blocking operation and is **not** submitted to the ring.
379 /// It is intended for startup and recovery only - never call this on
380 /// the hot path. Implementations may issue blocking syscalls internally.
381 /// No-ops if the directory already exists.
382 fn create_dir_all(&self, path: &Path) -> io::Result<()>;
383}
384
385/// Completions collected by [`Io::poll`] for the current tick.
386///
387/// Drained entry-by-entry via [`Io::get_cqe`] as state machines tick.
388/// After all state machines have ticked this must be empty - a leftover
389/// entry means an [`OpHandle`] was never claimed, which is a logic error
390/// (leaked operation or lost CQE).
391pub type Completions = Vec<(OpHandle, io::Result<u32>)>;