Skip to main content

tempest_io/
lib.rs

//! Async I/O abstraction for Tempest's tick-based event loop.
//!
//! Provides three feature-gated backends:
//! - [`LinuxIo`] - backed by `io_uring`, for production use (`linux` feature).
//! - [`VirtualIo`] - in-memory, for testing and DST (`virtual` feature).
//! - [`WasmIo`] - enabled by the `wasm` feature.
6
7use std::{
8    io,
9    mem::ManuallyDrop,
10    path::{Path, PathBuf},
11};
12
13pub mod buf;
14pub use buf::*;
15
16#[cfg(feature = "virtual")]
17pub mod virtual_io;
18#[cfg(feature = "virtual")]
19pub use virtual_io::{VirtualIo, VirtualIoConfig};
20
21#[cfg(feature = "wasm")]
22pub mod wasm_io;
23#[cfg(feature = "wasm")]
24pub use wasm_io::{WasmIo, WasmIoConfig};
25
26#[cfg(feature = "linux")]
27pub mod linux_io;
28#[cfg(feature = "linux")]
29pub use linux_io::{LinuxIo, LinuxIoConfig};
30
31#[cfg(test)]
32mod tests;
33
/// Flags describing how a file should be opened via [`Io::open`].
///
/// Every flag defaults to `false`. Values are built fluently, e.g.
/// `OpenOptions::new().read(true).write(true).create(true)`.
///
/// The per-field notes below describe the mapping used by the Linux backend
/// (see `to_libc_flags`); other backends interpret the same fields themselves.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct OpenOptions {
    /// Request read access. Together with `write` selects
    /// `O_RDONLY` / `O_WRONLY` / `O_RDWR`.
    pub read: bool,
    /// Request write access.
    pub write: bool,
    /// Create the file if it does not exist (`O_CREAT`).
    pub create: bool,
    /// Create the file and fail if it already exists (`O_CREAT | O_EXCL`).
    pub create_new: bool,
    /// Truncate the file on open (`O_TRUNC`).
    pub truncate: bool,
    /// Open for direct I/O (`O_DIRECT`); see [`Io::block_size`] for the
    /// alignment this imposes on buffers, offsets and lengths.
    pub direct: bool,
    /// Open with synchronized data writes (`O_DSYNC`).
    pub dsync: bool,
}

impl OpenOptions {
    /// Returns options with every flag cleared.
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the `read` flag and returns the updated options.
    pub fn read(mut self, read: bool) -> Self {
        self.read = read;
        self
    }

    /// Sets the `write` flag and returns the updated options.
    pub fn write(mut self, write: bool) -> Self {
        self.write = write;
        self
    }

    /// Sets the `create` flag and returns the updated options.
    pub fn create(mut self, create: bool) -> Self {
        self.create = create;
        self
    }

    /// Sets the `create_new` flag and returns the updated options.
    pub fn create_new(mut self, create_new: bool) -> Self {
        self.create_new = create_new;
        self
    }

    /// Sets the `truncate` flag and returns the updated options.
    pub fn truncate(mut self, truncate: bool) -> Self {
        self.truncate = truncate;
        self
    }

    /// Sets the `direct` flag and returns the updated options.
    pub fn direct(mut self, direct: bool) -> Self {
        self.direct = direct;
        self
    }

    /// Sets the `dsync` flag and returns the updated options.
    pub fn dsync(mut self, dsync: bool) -> Self {
        self.dsync = dsync;
        self
    }

    /// Translates these options into the `open(2)` flag word.
    ///
    /// The access mode is `O_RDWR` when both `read` and `write` are set,
    /// `O_WRONLY` when only `write` is set, and `O_RDONLY` otherwise
    /// (including when neither flag is set).
    #[cfg(feature = "linux")]
    pub const fn to_libc_flags(&self) -> i32 {
        let mut flags = match (self.read, self.write) {
            (true, true) => libc::O_RDWR,
            (false, true) => libc::O_WRONLY,
            _ => libc::O_RDONLY,
        };

        if self.create {
            flags |= libc::O_CREAT;
        }
        if self.truncate {
            flags |= libc::O_TRUNC;
        }
        if self.create_new {
            // `create_new` implies creation, so O_CREAT is set even when
            // `create` itself is false.
            flags |= libc::O_CREAT | libc::O_EXCL;
        }
        if self.direct {
            flags |= libc::O_DIRECT;
        }
        if self.dsync {
            flags |= libc::O_DSYNC;
        }

        flags
    }
}
112
/// Identifier that ties a submitted operation to its completion.
///
/// The caller passes an `OpHandle` to each submission method on [`Io`]; the
/// same value is later paired with the operation's result in
/// [`Completions`] and is the key accepted by [`Io::get_cqe`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct OpHandle(pub u64);
115
116/// Owned handle to an in-flight read operation.
117///
118/// Holds the destination buffer until the kernel completes the read.
119/// Must be consumed via [`complete`](Self::complete) once the
120/// corresponding [`OpHandle`] appears in [`Io::poll`] - dropping it
121/// before completion is a bug.
122pub struct ReadHandle<B: IoBufMut> {
123    buf: B,
124}
125
126impl<B: IoBufMut> ReadHandle<B> {
127    #[cfg(any(feature = "virtual", feature = "linux", feature = "wasm"))]
128    pub(crate) fn new(buf: B) -> Self {
129        Self { buf }
130    }
131
132    /// Mark this read operation as completed, returning the inner buffer.
133    ///
134    /// # Safety
135    ///
136    /// `bytes_read` must be the result value from the CQE corresponding to
137    /// this handle's [`OpHandle`]. Passing an arbitrary value will mark
138    /// uninitialized memory as initialized, potentially resulting in UB.
139    pub unsafe fn complete(self, bytes_read: u32) -> B {
140        let mut this = ManuallyDrop::new(self);
141        // SAFETY: caller must ensure that `buf` is valid up to `bytes_read`
142        unsafe { this.buf.set_init(bytes_read as usize) };
143        // SAFETY: we are consuming self via manually drop
144        unsafe { std::ptr::read(&this.buf) }
145    }
146}
147
148impl<B: IoBufMut> Drop for ReadHandle<B> {
149    fn drop(&mut self) {
150        panic!("ReadHandle dropped before completion");
151    }
152}
153
154/// Owned handle to an in-flight write operation.
155///
156/// Holds the source buffer until the kernel completes the write.
157/// Must be consumed via [`complete`](Self::complete) once the
158/// corresponding [`OpHandle`] appears in [`Io::poll`] - dropping it
159/// before completion is a bug.
160pub struct WriteHandle<B: IoBuf> {
161    buf: B,
162}
163
164impl<B: IoBuf> WriteHandle<B> {
165    #[cfg(any(feature = "virtual", feature = "linux", feature = "wasm"))]
166    pub(crate) fn new(buf: B) -> Self {
167        Self { buf }
168    }
169
170    /// Mark this read operation as completed, returning the inner buffer.
171    pub fn complete(self) -> B {
172        let this = ManuallyDrop::new(self);
173        // SAFETY: we are consuming self via manually drop
174        unsafe { std::ptr::read(&this.buf) }
175    }
176}
177
178impl<B: IoBuf> Drop for WriteHandle<B> {
179    fn drop(&mut self) {
180        panic!("WriteHandle dropped before completion");
181    }
182}
183
/// One child of a directory, as returned by [`Io::list_dir`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DirEntry {
    /// Path of the entry. NOTE(review): whether this is absolute or relative
    /// to the listed directory is decided by the backend - confirm there.
    pub path: PathBuf,
    /// `true` when the entry is itself a directory.
    pub is_dir: bool,
}
188
/// Backend-specific result of an fstat operation (see [`Io::fstat`]).
pub trait Statx: Sized {
    /// Returns the size field of the stat result.
    ///
    /// NOTE(review): presumably the file size in bytes, mirroring `stx_size`
    /// of Linux `statx` - confirm against the backend implementations.
    fn stx_size(&self) -> u64;
}
192
193/// Owned handle to an in-flight fstat operation.
194///
195/// Holds the [`statx`](libc::statx) buffer until the kernel completes the write.
196/// Must be consumed via [`complete`](Self::complete) once the
197/// corresponding [`OpHandle`] appears in [`Io::poll`] - dropping it
198/// before completion is a bug.
199///
200/// # Implementation
201///
202/// Implementor of [`Io`] must ensure to fill the [statx](Self::statx) fields.
203pub struct FstatHandle<S: Statx> {
204    statx: Box<S>,
205}
206
207impl<S: Statx> FstatHandle<S> {
208    #[cfg(any(feature = "virtual", feature = "linux", feature = "wasm"))]
209    pub(crate) fn new(statx: Box<S>) -> Self {
210        Self { statx }
211    }
212
213    /// Mark this read operation as completed, returning the inner buffer.
214    pub fn complete(self) -> S {
215        let this = ManuallyDrop::new(self);
216        // explicitly move the Box out and let it drop
217        // SAFETY: we are consuming this via manually drop so we have to ensure statx is
218        // deallocated correctly and we do not leak any memory by moving it out so it drops
219        let statx = unsafe { std::ptr::read(&this.statx as *const Box<S>) };
220        *statx
221    }
222}
223
224impl<S: Statx> Drop for FstatHandle<S> {
225    fn drop(&mut self) {
226        panic!("StatHandle dropped before completion");
227    }
228}
229
230/// Abstraction over async I/O backends.
231///
232/// All operations are split into two phases: submission and completion.
233/// The caller supplies an [`OpHandle`] (generated by the runtime) when
234/// submitting an operation; that same handle appears in [`completions`](Self::completions)
235/// once the operation finishes. Call [`poll`](Self::poll) once per tick to
236/// drain completed CQEs, then route each handle to the state machine that
237/// submitted it via [`get_cqe`](Self::get_cqe).
238///
239/// # Path contract
240///
241/// Paths passed to [`open`](Self::open) must be consistent across calls -
242/// the same file must always be referred to by the same path. Implementations
243/// are not required to normalize paths, so `"./foo"`, `"foo"`, and
244/// `"bar/../foo"` may be treated as distinct files. Callers should
245/// use absolute canonical paths to avoid ambiguity.
246///
247/// # Safety
248///
249/// All methods taking [`Self::Fd`] require a valid open file descriptor
250/// obtained from a completed [`open`](Self::open) CQE. Using a closed
251/// or invalid fd is undefined behavior.
252pub trait Io: 'static {
253    /// Returns the block size required for O_DIRECT I/O.
254    /// Buffer addresses, offsets, and lengths must all be aligned to this.
255    fn block_size(&self) -> usize;
256
257    /// Returns the current duration since the epoch start.
258    fn now(&self) -> std::time::Duration;
259
260    /// The file descriptor type for this backend.
261    type Fd: Copy;
262
263    /// Converts a raw CQE result from a completed [`open`] into an `Fd`.
264    ///
265    /// # Safety
266    ///
267    /// `result` must come from a successful [`open`] CQE. Calling this with
268    /// an arbitrary `u32` is undefined behavior.
269    ///
270    /// [`open`]: Self::open
271    unsafe fn into_fd(result: u32) -> Self::Fd;
272
273    type RegisteredBuf: IoBufMut;
274
275    /// Obtain a buffer that has been registered with this ring, which are guranteed to be properly
276    /// aligned to [`block_size`] for drive reads for files opened with the [`direct`] option.
277    ///
278    /// [`block_size`]: Self::block_size
279    /// [`direct`]: OpenOptions::direct
280    fn acquire_buf(&mut self) -> Option<Self::RegisteredBuf>;
281
282    /// Release a buffer that has been registered with this ring back to it, for reuse.
283    /// The buffer being released must come from a prior call to [`acquire_buf`].
284    ///
285    /// # Panics
286    ///
287    /// This function may panic when `buf` was not obtained through [`acquire_buf`].
288    ///
289    /// [`acquire_buf`]: Self::acquire_buf
290    fn release_buf(&mut self, buf: Self::RegisteredBuf);
291
292    type Statx: Statx;
293
294    /// Submits an fstat SQE. On completion, call [`FstatHandle::complete`] to
295    /// retrieve the populated [`Statx`] value.
296    fn fstat(&mut self, fd: Self::Fd, handle: OpHandle) -> io::Result<FstatHandle<Self::Statx>>;
297
298    /// Submits an open SQE. On completion, pass the CQE result to
299    /// [`into_fd`](Self::into_fd) to obtain the file descriptor.
300    fn open(&mut self, path: &Path, opts: OpenOptions, handle: OpHandle) -> io::Result<()>;
301
302    /// Submits a close SQE. `fd` must not be used after this call.
303    fn close(&mut self, fd: Self::Fd, handle: OpHandle) -> io::Result<()>;
304
305    /// Submits a read SQE. Returns a [`ReadHandle`] that owns `buf` until
306    /// the corresponding [`OpHandle`] appears in [`completions`](Self::completions) -
307    /// call [`ReadHandle::complete`] with the CQE result to get the buffer back.
308    fn read_at<B: IoBufMut>(
309        &mut self,
310        fd: Self::Fd,
311        buf: B,
312        offset: u64,
313        handle: OpHandle,
314    ) -> Result<ReadHandle<B>, (io::Error, B)>;
315
316    /// Submits a write SQE. Returns a [`WriteHandle`] that owns `buf` until
317    /// the corresponding [`OpHandle`] appears in [`completions`](Self::completions) -
318    /// call [`WriteHandle::complete`] to get the buffer back.
319    fn write_at<B: IoBuf>(
320        &mut self,
321        fd: Self::Fd,
322        buf: B,
323        offset: u64,
324        handle: OpHandle,
325    ) -> Result<WriteHandle<B>, (io::Error, B)>;
326
327    /// Submits an fsync SQE, ensuring all pending writes to `fd` are
328    /// durable before the corresponding CQE arrives.
329    fn fsync(&mut self, fd: Self::Fd, handle: OpHandle) -> io::Result<()>;
330
331    /// Submits a rename SQE, atomically moving `from` to `to`.
332    /// If a file already exists at `to`, it is replaced.
333    /// The `from` and `to` paths must not be used after this call until
334    /// the corresponding handle appears in [`completions`](Self::completions).
335    fn rename(&mut self, from: &Path, to: &Path, handle: OpHandle) -> io::Result<()>;
336
337    /// Submits an unlink SQE, removing the file at `path`.
338    /// The CQE carries an error if the file does not exist.
339    /// `path` must not be used after this call until the corresponding
340    /// handle appears in [`completions`](Self::completions).
341    fn remove(&mut self, path: &Path, handle: OpHandle) -> io::Result<()>;
342
343    /// Drains all available CQEs into internal storage without blocking.
344    /// Call once per tick before driving any state machines.
345    fn poll(&mut self) -> io::Result<()>;
346
347    /// Returns the number of submitted operations whose CQE has not arrived yet.
348    fn in_flight(&self) -> usize;
349
350    /// Blocks until at least one CQE arrives, then drains all available CQEs
351    /// into internal storage. Call only when [`in_flight`](Self::in_flight) > 0
352    /// and the wake set is empty - i.e. nothing can make progress without I/O.
353    fn park(&mut self) -> io::Result<()>;
354
355    /// Returns the completions collected during the last [`poll`](Self::poll).
356    /// After all state machines have ticked this must be empty - a leftover
357    /// entry is a logic error (leaked operation or lost CQE).
358    fn completions(&mut self) -> &mut Completions;
359
360    /// Extract and remove the CQE for `handle` from this tick's completions.
361    /// Returns `None` if the CQE has not yet arrived.
362    fn get_cqe(&mut self, handle: OpHandle) -> Option<io::Result<u32>> {
363        let completions = self.completions();
364        let idx = completions.iter().position(|(h, _)| *h == handle)?;
365        let (_, result) = completions.swap_remove(idx);
366        Some(result)
367    }
368
369    /// Returns the direct children of `path` as a list of [`DirEntry`] values.
370    ///
371    /// This is a blocking operation and is **not** submitted to the ring.
372    /// It is intended for startup and recovery only - never call this on
373    /// the hot path. Implementations may issue blocking syscalls internally.
374    fn list_dir(&self, path: &Path) -> io::Result<Vec<DirEntry>>;
375
376    /// Recursively creates `path` and all missing parent directories.
377    ///
378    /// This is a blocking operation and is **not** submitted to the ring.
379    /// It is intended for startup and recovery only - never call this on
380    /// the hot path. Implementations may issue blocking syscalls internally.
381    /// No-ops if the directory already exists.
382    fn create_dir_all(&self, path: &Path) -> io::Result<()>;
383}
384
/// Completions collected by [`Io::poll`] for the current tick.
///
/// Each element pairs the [`OpHandle`] supplied at submission with the
/// operation's result as an `io::Result<u32>`.
///
/// Drained entry-by-entry via [`Io::get_cqe`] as state machines tick.
/// After all state machines have ticked this must be empty - a leftover
/// entry means an [`OpHandle`] was never claimed, which is a logic error
/// (leaked operation or lost CQE).
pub type Completions = Vec<(OpHandle, io::Result<u32>)>;