tempest-io 0.0.1

TempestDB I/O Layer
Documentation
//! Async I/O abstraction for Tempest's tick-based event loop.
//!
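//! Provides the following backends, each gated behind a Cargo feature:
//! - [`LinuxIo`] - backed by `io_uring`, for production use (`linux` feature).
//! - [`VirtualIo`] - in-memory, for testing and DST (`virtual` feature).
//! - [`WasmIo`] - enabled by the `wasm` feature.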

use std::{
    io,
    mem::ManuallyDrop,
    path::{Path, PathBuf},
};

pub mod buf;
pub use buf::*;

#[cfg(feature = "virtual")]
pub mod virtual_io;
#[cfg(feature = "virtual")]
pub use virtual_io::{VirtualIo, VirtualIoConfig};

#[cfg(feature = "wasm")]
pub mod wasm_io;
#[cfg(feature = "wasm")]
pub use wasm_io::{WasmIo, WasmIoConfig};

#[cfg(feature = "linux")]
pub mod linux_io;
#[cfg(feature = "linux")]
pub use linux_io::{LinuxIo, LinuxIoConfig};

#[cfg(test)]
mod tests;

/// Flags describing how a file should be opened via [`Io::open`].
///
/// Every flag defaults to `false`. Build a value fluently by chaining the
/// setter methods, each of which consumes and returns the options:
/// `OpenOptions::new().read(true).write(true).create(true)`.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct OpenOptions {
    /// Request read access. When neither `read` nor `write` is set, the
    /// libc flag conversion falls back to read-only.
    pub read: bool,
    /// Request write access.
    pub write: bool,
    /// Create the file if it is missing (`O_CREAT` in the libc conversion).
    pub create: bool,
    /// Create the file and fail if it already exists
    /// (`O_CREAT | O_EXCL` in the libc conversion).
    pub create_new: bool,
    /// Truncate the file on open (`O_TRUNC` in the libc conversion).
    pub truncate: bool,
    /// Open for direct I/O (`O_DIRECT` in the libc conversion).
    pub direct: bool,
    /// Open with synchronized data writes (`O_DSYNC` in the libc conversion).
    pub dsync: bool,
}

impl OpenOptions {
    /// Creates a set of options with every flag cleared.
    pub fn new() -> Self {
        Default::default()
    }

    /// Sets the `read` flag.
    pub fn read(mut self, read: bool) -> Self {
        self.read = read;
        self
    }

    /// Sets the `write` flag.
    pub fn write(mut self, write: bool) -> Self {
        self.write = write;
        self
    }

    /// Sets the `create` flag.
    pub fn create(mut self, create: bool) -> Self {
        self.create = create;
        self
    }

    /// Sets the `create_new` flag.
    pub fn create_new(mut self, create_new: bool) -> Self {
        self.create_new = create_new;
        self
    }

    /// Sets the `truncate` flag.
    pub fn truncate(mut self, truncate: bool) -> Self {
        self.truncate = truncate;
        self
    }

    /// Sets the `direct` flag.
    pub fn direct(mut self, direct: bool) -> Self {
        self.direct = direct;
        self
    }

    /// Sets the `dsync` flag.
    pub fn dsync(mut self, dsync: bool) -> Self {
        self.dsync = dsync;
        self
    }

    /// Converts these options into the `open(2)`-style flag bitmask built
    /// from `libc` constants.
    ///
    /// The access mode is `O_RDWR` when both `read` and `write` are set,
    /// `O_WRONLY` when only `write` is set, and `O_RDONLY` otherwise
    /// (including when neither is set). Each remaining flag ORs in its
    /// corresponding `O_*` constant.
    #[cfg(feature = "linux")]
    pub const fn to_libc_flags(&self) -> i32 {
        let mut flags = match (self.read, self.write) {
            (true, true) => libc::O_RDWR,
            (false, true) => libc::O_WRONLY,
            _ => libc::O_RDONLY,
        };

        if self.create {
            flags |= libc::O_CREAT;
        }
        if self.truncate {
            flags |= libc::O_TRUNC;
        }
        if self.create_new {
            // `O_EXCL` is only meaningful together with `O_CREAT`, so both
            // are set regardless of the `create` flag.
            flags |= libc::O_CREAT | libc::O_EXCL;
        }
        if self.direct {
            flags |= libc::O_DIRECT;
        }
        if self.dsync {
            flags |= libc::O_DSYNC;
        }

        flags
    }
}

/// Caller-supplied identifier for a submitted I/O operation.
///
/// The handle is passed to the submission methods of [`Io`] and is reported
/// back next to the operation's result in [`Completions`], which is how a
/// completion is matched to the operation that produced it.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct OpHandle(pub u64);

/// Ownership token for a read that has been submitted but not yet completed.
///
/// The destination buffer lives inside this handle for as long as the kernel
/// may still be filling it. When the matching [`OpHandle`] shows up in
/// [`Io::poll`], hand the handle to [`complete`](Self::complete) to get the
/// buffer back. Letting the handle go out of scope without completing it is
/// a bug and triggers a panic.
pub struct ReadHandle<B: IoBufMut> {
    buf: B,
}

impl<B: IoBufMut> ReadHandle<B> {
    /// Wraps `buf` for the duration of an in-flight read.
    #[cfg(any(feature = "virtual", feature = "linux", feature = "wasm"))]
    pub(crate) fn new(buf: B) -> Self {
        ReadHandle { buf }
    }

    /// Finishes the read and hands the destination buffer back to the caller,
    /// with its first `bytes_read` bytes marked as initialized.
    ///
    /// # Safety
    ///
    /// `bytes_read` must be the result value of the CQE that belongs to this
    /// handle's [`OpHandle`]. Any other value marks uninitialized memory as
    /// initialized, which may lead to undefined behavior.
    pub unsafe fn complete(self, bytes_read: u32) -> B {
        let initialized = bytes_read as usize;
        // Suppress the panicking destructor: the handle is being consumed
        // legitimately.
        let mut guard = ManuallyDrop::new(self);
        // SAFETY: the caller guarantees the buffer is valid up to `bytes_read`.
        unsafe {
            guard.buf.set_init(initialized);
        }
        let slot: *const B = &guard.buf;
        // SAFETY: `guard` is never dropped, so this bitwise move is the only
        // remaining owner of the buffer.
        unsafe { slot.read() }
    }
}

impl<B: IoBufMut> Drop for ReadHandle<B> {
    /// Always panics: reaching this destructor means the handle was discarded
    /// without going through [`ReadHandle::complete`].
    fn drop(&mut self) {
        panic!("ReadHandle dropped before completion");
    }
}

/// Ownership token for a write that has been submitted but not yet completed.
///
/// The source buffer lives inside this handle for as long as the kernel may
/// still be reading from it. When the matching [`OpHandle`] shows up in
/// [`Io::poll`], call [`complete`](Self::complete) to get the buffer back.
/// Letting the handle go out of scope without completing it is a bug and
/// triggers a panic.
pub struct WriteHandle<B: IoBuf> {
    buf: B,
}

impl<B: IoBuf> WriteHandle<B> {
    /// Wraps `buf` for the duration of an in-flight write.
    #[cfg(any(feature = "virtual", feature = "linux", feature = "wasm"))]
    pub(crate) fn new(buf: B) -> Self {
        WriteHandle { buf }
    }

    /// Finishes the write and hands the source buffer back to the caller.
    pub fn complete(self) -> B {
        // Suppress the panicking destructor: the handle is being consumed
        // legitimately.
        let guard = ManuallyDrop::new(self);
        let slot: *const B = &guard.buf;
        // SAFETY: `guard` is never dropped, so this bitwise move is the only
        // remaining owner of the buffer.
        unsafe { slot.read() }
    }
}

impl<B: IoBuf> Drop for WriteHandle<B> {
    /// Always panics: reaching this destructor means the handle was discarded
    /// without going through [`WriteHandle::complete`].
    fn drop(&mut self) {
        panic!("WriteHandle dropped before completion");
    }
}

/// A single directory child as returned by [`Io::list_dir`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DirEntry {
    /// Path of the entry as reported by the backend.
    pub path: PathBuf,
    /// `true` when the entry is a directory.
    pub is_dir: bool,
}

/// File metadata record filled in by an fstat operation (see [`Io::fstat`]
/// and [`FstatHandle`]).
pub trait Statx: Sized {
    /// Returns the `stx_size` value of this metadata record.
    // NOTE(review): presumably the file size in bytes, mirroring the
    // `stx_size` field of Linux `statx` - confirm against the backends.
    fn stx_size(&self) -> u64;
}

/// Owned handle to an in-flight fstat operation.
///
/// Holds the boxed [`Statx`] buffer until the kernel completes the fstat.
/// Must be consumed via [`complete`](Self::complete) once the
/// corresponding [`OpHandle`] appears in [`Io::poll`] - dropping it
/// before completion is a bug.
///
/// # Implementation
///
/// Implementors of [`Io`] must ensure the `statx` buffer is filled in before
/// the operation is reported as complete.
pub struct FstatHandle<S: Statx> {
    statx: Box<S>,
}

impl<S: Statx> FstatHandle<S> {
    /// Wraps the boxed `statx` buffer for the duration of an in-flight fstat.
    #[cfg(any(feature = "virtual", feature = "linux", feature = "wasm"))]
    pub(crate) fn new(statx: Box<S>) -> Self {
        Self { statx }
    }

    /// Mark this fstat operation as completed, returning the [`Statx`] value.
    pub fn complete(self) -> S {
        // Suppress the panicking destructor: the handle is being consumed
        // legitimately.
        let this = ManuallyDrop::new(self);
        // SAFETY: `this` is never dropped, so this bitwise move is the only
        // remaining owner of the box. Moving it out (rather than forgetting
        // it) lets the heap allocation be freed when the value is unboxed.
        let statx = unsafe { std::ptr::read(&this.statx as *const Box<S>) };
        *statx
    }
}

impl<S: Statx> Drop for FstatHandle<S> {
    /// Always panics: reaching this destructor means the handle was discarded
    /// without going through [`FstatHandle::complete`].
    fn drop(&mut self) {
        panic!("FstatHandle dropped before completion");
    }
}

/// Backend-agnostic interface for asynchronous, completion-based I/O.
///
/// Every operation has a submission phase and a completion phase. When
/// submitting, the caller passes an [`OpHandle`] (generated by the runtime);
/// once the operation finishes, that same handle shows up in
/// [`completions`](Self::completions). Invoke [`poll`](Self::poll) once per
/// tick to drain finished CQEs, then hand each handle to the state machine
/// that submitted it through [`get_cqe`](Self::get_cqe).
///
/// # Path contract
///
/// A given file must always be named by the same path in every call to
/// [`open`](Self::open). Implementations are free not to normalize paths, so
/// `"./foo"`, `"foo"`, and `"bar/../foo"` may be treated as three different
/// files. Use absolute canonical paths to avoid ambiguity.
///
/// # Safety
///
/// Every method that takes a [`Self::Fd`] requires a valid, open file
/// descriptor obtained from a completed [`open`](Self::open) CQE. Passing a
/// closed or invalid fd is undefined behavior.
pub trait Io: 'static {
    /// Block size required for O_DIRECT I/O. Buffer addresses, offsets, and
    /// lengths must all be aligned to this value.
    fn block_size(&self) -> usize;

    /// Current duration elapsed since the epoch start.
    fn now(&self) -> std::time::Duration;

    /// File descriptor type used by this backend.
    type Fd: Copy;

    /// Turns the raw CQE result of a completed [`open`] into an `Fd`.
    ///
    /// # Safety
    ///
    /// `result` must originate from a successful [`open`] CQE. Supplying an
    /// arbitrary `u32` is undefined behavior.
    ///
    /// [`open`]: Self::open
    unsafe fn into_fd(result: u32) -> Self::Fd;

    /// Buffer type handed out by [`acquire_buf`](Self::acquire_buf).
    type RegisteredBuf: IoBufMut;

    /// Obtains a buffer that has been registered with this ring. Such buffers
    /// are guaranteed to be properly aligned to [`block_size`], as needed for
    /// reads on files opened with the [`direct`] option.
    ///
    /// [`block_size`]: Self::block_size
    /// [`direct`]: OpenOptions::direct
    fn acquire_buf(&mut self) -> Option<Self::RegisteredBuf>;

    /// Returns a registered buffer to the ring so it can be reused.
    /// `buf` must have been obtained from an earlier [`acquire_buf`] call.
    ///
    /// # Panics
    ///
    /// May panic when `buf` did not come from [`acquire_buf`].
    ///
    /// [`acquire_buf`]: Self::acquire_buf
    fn release_buf(&mut self, buf: Self::RegisteredBuf);

    /// Metadata type produced by [`fstat`](Self::fstat) on this backend.
    type Statx: Statx;

    /// Submits an fstat SQE. Once it completes, call
    /// [`FstatHandle::complete`] to obtain the populated [`Statx`] value.
    fn fstat(&mut self, fd: Self::Fd, handle: OpHandle) -> io::Result<FstatHandle<Self::Statx>>;

    /// Submits an open SQE. Once it completes, feed the CQE result to
    /// [`into_fd`](Self::into_fd) to obtain the file descriptor.
    fn open(&mut self, path: &Path, opts: OpenOptions, handle: OpHandle) -> io::Result<()>;

    /// Submits a close SQE. `fd` must not be used after this call.
    fn close(&mut self, fd: Self::Fd, handle: OpHandle) -> io::Result<()>;

    /// Submits a read SQE. The returned [`ReadHandle`] owns `buf` until the
    /// matching [`OpHandle`] appears in [`completions`](Self::completions);
    /// pass the CQE result to [`ReadHandle::complete`] to recover the buffer.
    /// On submission failure the buffer is handed back with the error.
    fn read_at<B: IoBufMut>(
        &mut self,
        fd: Self::Fd,
        buf: B,
        offset: u64,
        handle: OpHandle,
    ) -> Result<ReadHandle<B>, (io::Error, B)>;

    /// Submits a write SQE. The returned [`WriteHandle`] owns `buf` until the
    /// matching [`OpHandle`] appears in [`completions`](Self::completions);
    /// call [`WriteHandle::complete`] to recover the buffer.
    /// On submission failure the buffer is handed back with the error.
    fn write_at<B: IoBuf>(
        &mut self,
        fd: Self::Fd,
        buf: B,
        offset: u64,
        handle: OpHandle,
    ) -> Result<WriteHandle<B>, (io::Error, B)>;

    /// Submits an fsync SQE: all pending writes to `fd` are durable by the
    /// time the corresponding CQE arrives.
    fn fsync(&mut self, fd: Self::Fd, handle: OpHandle) -> io::Result<()>;

    /// Submits a rename SQE that atomically moves `from` to `to`, replacing
    /// any file that already exists at `to`.
    /// Neither path may be used after this call until the corresponding
    /// handle appears in [`completions`](Self::completions).
    fn rename(&mut self, from: &Path, to: &Path, handle: OpHandle) -> io::Result<()>;

    /// Submits an unlink SQE that removes the file at `path`.
    /// The CQE carries an error when the file does not exist.
    /// `path` may not be used after this call until the corresponding
    /// handle appears in [`completions`](Self::completions).
    fn remove(&mut self, path: &Path, handle: OpHandle) -> io::Result<()>;

    /// Moves every currently available CQE into internal storage without
    /// blocking. Call once per tick, before driving any state machine.
    fn poll(&mut self) -> io::Result<()>;

    /// Number of submitted operations whose CQE has not arrived yet.
    fn in_flight(&self) -> usize;

    /// Blocks until at least one CQE arrives, then moves every available CQE
    /// into internal storage. Only call this when
    /// [`in_flight`](Self::in_flight) > 0 and the wake set is empty - that
    /// is, when nothing can make progress without I/O.
    fn park(&mut self) -> io::Result<()>;

    /// Completions gathered by the most recent [`poll`](Self::poll).
    /// Once every state machine has ticked this must be empty; an entry left
    /// behind is a logic error (leaked operation or lost CQE).
    fn completions(&mut self) -> &mut Completions;

    /// Removes and returns the CQE recorded for `handle` in this tick's
    /// completions, or `None` when that CQE has not arrived yet.
    ///
    /// The entry is taken out with `swap_remove`, so the order of the
    /// remaining completions is not preserved.
    fn get_cqe(&mut self, handle: OpHandle) -> Option<io::Result<u32>> {
        let pending = self.completions();
        let slot = pending.iter().position(|entry| entry.0 == handle);
        slot.map(|idx| pending.swap_remove(idx).1)
    }

    /// Lists the direct children of `path` as [`DirEntry`] values.
    ///
    /// This call is blocking and is **not** submitted to the ring. It exists
    /// for startup and recovery only and must never run on the hot path.
    /// Implementations may issue blocking syscalls internally.
    fn list_dir(&self, path: &Path) -> io::Result<Vec<DirEntry>>;

    /// Creates `path` together with any missing parent directories.
    ///
    /// This call is blocking and is **not** submitted to the ring. It exists
    /// for startup and recovery only and must never run on the hot path.
    /// Implementations may issue blocking syscalls internally.
    /// Does nothing when the directory already exists.
    fn create_dir_all(&self, path: &Path) -> io::Result<()>;
}

/// Completions collected by [`Io::poll`] for the current tick.
///
/// Each entry pairs the [`OpHandle`] supplied at submission with the result
/// of that operation: either the raw `u32` CQE result or an I/O error.
///
/// Drained entry-by-entry via [`Io::get_cqe`] as state machines tick.
/// After all state machines have ticked this must be empty - a leftover
/// entry means an [`OpHandle`] was never claimed, which is a logic error
/// (leaked operation or lost CQE).
pub type Completions = Vec<(OpHandle, io::Result<u32>)>;