//! tempest-io 0.0.1
//!
//! TempestDB I/O Layer
//! Documentation
use std::{io, path::Path};

use crate::{Io, IoBuf, IoBufMut, OpHandle, OpenOptions, Statx};

/// Drives `io` by polling repeatedly until `target` has a completion, then
/// returns the result carried by that completion.
///
/// Callers are expected to have exactly one operation in flight: `target`.
///
/// # Errors
///
/// Returns the error from `io.poll()` if polling fails, or the error carried
/// by the completion of `target`.
///
/// # Panics
///
/// Panics if, after a poll, `target` has no completion yet but the set of
/// completions held by `io` is non-empty, i.e. another operation finished
/// first.
pub(crate) fn poll_until(io: &mut impl Io, target: OpHandle) -> io::Result<u32> {
    loop {
        io.poll()?;
        match io.get_cqe(target) {
            // Our operation finished: hand its result back to the caller.
            Some(outcome) => break outcome,
            // Not finished yet: nothing else may have completed in the meantime.
            None => assert!(
                io.completions().is_empty(),
                "unexpected completion for a different handle while waiting for {}",
                target.0
            ),
        }
    }
}

/// Hands out the next operation handle: wraps the current value of `*n` in an
/// [`OpHandle`] and advances the counter by one.
fn h(n: &mut u64) -> OpHandle {
    let current = *n;
    *n = current + 1;
    OpHandle(current)
}

/// Round-trips a small payload through a file: open, write, fsync, read back
/// and close.
///
/// Each operation gets a fresh handle from `h` and is driven to completion
/// with [`poll_until`] before the next one is submitted.
///
/// # Panics
///
/// Panics if any step fails, if the write is short, or if the bytes read back
/// differ from `b"hello tempest"`.
pub(super) fn test_open_write_read<I: Io>(io: &mut I, path: &Path) {
    let mut next_id = 0u64;
    let payload = bytes::Bytes::from_static(b"hello tempest");

    // Open for reading and writing, creating the file if necessary.
    let open_op = h(&mut next_id);
    io.open(
        path,
        OpenOptions::new().read(true).write(true).create(true),
        open_op,
    )
    .unwrap();
    let raw_fd = poll_until(io, open_op).unwrap();
    // SAFETY: `raw_fd` is the unwrapped result of the open completion above.
    let fd = unsafe { I::into_fd(raw_fd) };

    // Write the whole payload at offset 0 and check nothing was truncated.
    let write_op = h(&mut next_id);
    let pending_write = io.write_at(fd, payload.clone(), 0, write_op).unwrap();
    let written = poll_until(io, write_op).unwrap();
    assert_eq!(written as usize, payload.len());
    pending_write.complete();

    // Flush to storage before reading back.
    let sync_op = h(&mut next_id);
    io.fsync(fd, sync_op).unwrap();
    poll_until(io, sync_op).unwrap();

    // Read into a zeroed buffer sized exactly like the payload.
    let read_op = h(&mut next_id);
    let pending_read = io
        .read_at(fd, bytes::BytesMut::zeroed(payload.len()), 0, read_op)
        .unwrap();
    let read_len = poll_until(io, read_op).unwrap();
    // SAFETY: `read_len` is the unwrapped result of the completion of this read.
    let filled = unsafe { pending_read.complete(read_len) };
    assert_eq!(filled.as_ref(), b"hello tempest");

    // Release the descriptor.
    let close_op = h(&mut next_id);
    io.close(fd, close_op).unwrap();
    poll_until(io, close_op).unwrap();
}

/// Writes and then reads five bytes through a 5-byte slice of a 4096-byte
/// buffer, checking that the full buffer is handed back after each operation.
///
/// # Panics
///
/// Panics if any step fails, if the read does not return 5 bytes, if the
/// bytes read are not `b"hello"`, or if the reclaimed buffer's capacity is no
/// longer 4096.
pub(super) fn test_write_read_slice<I: Io>(io: &mut I, path: &Path) {
    let mut next_id = 0u64;

    // Open for reading and writing, creating the file if necessary.
    let open_op = h(&mut next_id);
    io.open(
        path,
        OpenOptions::new().read(true).write(true).create(true),
        open_op,
    )
    .unwrap();
    let raw_fd = poll_until(io, open_op).unwrap();
    // SAFETY: `raw_fd` is the unwrapped result of the open completion above.
    let fd = unsafe { I::into_fd(raw_fd) };

    // Big backing buffer; only its first 5 bytes carry the message.
    let mut arena = bytes::BytesMut::zeroed(4096);
    arena[..5].copy_from_slice(b"hello");

    // Submit just the 5-byte window for writing.
    let write_op = h(&mut next_id);
    let pending_write = io.write_at(fd, arena.slice(0..5), 0, write_op).unwrap();
    poll_until(io, write_op).unwrap();
    let arena = pending_write.complete().into_inner();

    // Unwrapping the slice must give back the whole 4096-byte buffer.
    assert_eq!(arena.capacity(), 4096);

    // Read back through the same 5-byte window.
    let read_op = h(&mut next_id);
    let pending_read = io.read_at(fd, arena.slice(0..5), 0, read_op).unwrap();
    let read_len = poll_until(io, read_op).unwrap();
    assert_eq!(read_len, 5);
    // SAFETY: `read_len` is the unwrapped result of the completion of this read.
    let window = unsafe { pending_read.complete(read_len) };
    let arena = window.into_inner();

    assert_eq!(&arena[..5], b"hello");
    // The backing buffer is still intact after the read.
    assert_eq!(arena.capacity(), 4096);

    // Release the descriptor.
    let close_op = h(&mut next_id);
    io.close(fd, close_op).unwrap();
    poll_until(io, close_op).unwrap();
}

/// Creates a file, closes it, removes it, and checks that re-opening it
/// without `create` then completes with an error.
///
/// # Panics
///
/// Panics if creating, closing or removing the file fails, if submitting the
/// second open fails, or if the second open completes successfully.
pub(super) fn test_remove<I: Io>(io: &mut I, path: &Path) {
    let mut next_id = 0u64;

    // Bring the file into existence.
    let create_op = h(&mut next_id);
    io.open(path, OpenOptions::new().write(true).create(true), create_op)
        .unwrap();
    let raw_fd = poll_until(io, create_op).unwrap();
    // SAFETY: `raw_fd` is the unwrapped result of the open completion above.
    let fd = unsafe { I::into_fd(raw_fd) };

    // Close it again straight away; only its existence matters.
    let close_op = h(&mut next_id);
    io.close(fd, close_op).unwrap();
    poll_until(io, close_op).unwrap();

    // Delete the file.
    let remove_op = h(&mut next_id);
    io.remove(path, remove_op).unwrap();
    poll_until(io, remove_op).unwrap();

    // Submitting the open succeeds; the completion itself must report failure.
    let reopen_op = h(&mut next_id);
    io.open(path, OpenOptions::new().read(true), reopen_op)
        .unwrap();
    let reopen_outcome = poll_until(io, reopen_op);
    assert!(reopen_outcome.is_err());
}

/// Creates `dir` (including missing parents) via `create_dir_all`, then
/// creates `filepath` and writes a short payload to it to confirm the file is
/// usable.
///
/// # Panics
///
/// Panics if directory creation, open, write or close fails, or if the write
/// is short.
pub(super) fn test_create_dir_and_file<I: Io>(io: &mut I, dir: &Path, filepath: &Path) {
    let mut next_id = 0u64;
    let payload = bytes::Bytes::from_static(b"nested");

    // Make sure the (possibly nested) directory exists.
    io.create_dir_all(dir).unwrap();

    // Create the file for writing.
    let open_op = h(&mut next_id);
    io.open(
        filepath,
        OpenOptions::new().write(true).create(true),
        open_op,
    )
    .unwrap();
    let raw_fd = poll_until(io, open_op).unwrap();
    // SAFETY: `raw_fd` is the unwrapped result of the open completion above.
    let fd = unsafe { I::into_fd(raw_fd) };

    // A full-length write proves the file is actually usable.
    let write_op = h(&mut next_id);
    let pending_write = io.write_at(fd, payload.clone(), 0, write_op).unwrap();
    let written = poll_until(io, write_op).unwrap();
    assert_eq!(written as usize, payload.len());
    pending_write.complete();

    // Release the descriptor.
    let close_op = h(&mut next_id);
    io.close(fd, close_op).unwrap();
    poll_until(io, close_op).unwrap();
}

/// Writes a known payload to a file and checks that `fstat` reports a size
/// equal to the payload length.
///
/// # Panics
///
/// Panics if any operation fails or if the reported `stx_size` differs from
/// the number of bytes in the payload.
pub(super) fn test_fstat<I: Io>(io: &mut I, path: &Path) {
    let mut next_id = 0u64;
    let payload = bytes::Bytes::from_static(b"hello tempest");

    // Open for reading and writing, creating the file if necessary.
    let open_op = h(&mut next_id);
    io.open(
        path,
        OpenOptions::new().read(true).write(true).create(true),
        open_op,
    )
    .unwrap();
    let raw_fd = poll_until(io, open_op).unwrap();
    // SAFETY: `raw_fd` is the unwrapped result of the open completion above.
    let fd = unsafe { I::into_fd(raw_fd) };

    // Put a known number of bytes into the file.
    let write_op = h(&mut next_id);
    let pending_write = io.write_at(fd, payload.clone(), 0, write_op).unwrap();
    poll_until(io, write_op).unwrap();
    pending_write.complete();

    // Query the file's metadata and compare the reported size.
    let stat_op = h(&mut next_id);
    let pending_stat = io.fstat(fd, stat_op).unwrap();
    poll_until(io, stat_op).unwrap();
    let metadata = pending_stat.complete();
    let expected_size = payload.len() as u64;
    assert_eq!(metadata.stx_size(), expected_size);

    // Release the descriptor.
    let close_op = h(&mut next_id);
    io.close(fd, close_op).unwrap();
    poll_until(io, close_op).unwrap();
}

/// Creates three empty files in `dir` and checks that `list_dir` reports
/// exactly those three paths, none of them flagged as a directory.
///
/// The listing is sorted by path before comparison, so the order in which
/// `list_dir` returns entries does not matter.
///
/// # Panics
///
/// Panics if any operation fails or the listing does not match.
pub(super) fn test_list_dir<I: Io>(io: &mut I, dir: &Path) {
    // File names in the order they are expected after sorting by path.
    const NAMES: [&str; 3] = ["a.txt", "b.txt", "c.txt"];

    let mut next_id = 0u64;

    io.create_dir_all(dir).unwrap();

    // Create each file and close it again immediately.
    for name in NAMES {
        let file = dir.join(name);

        let open_op = h(&mut next_id);
        io.open(&file, OpenOptions::new().write(true).create(true), open_op)
            .unwrap();
        let raw_fd = poll_until(io, open_op).unwrap();
        // SAFETY: `raw_fd` is the unwrapped result of the open completion above.
        let fd = unsafe { I::into_fd(raw_fd) };

        let close_op = h(&mut next_id);
        io.close(fd, close_op).unwrap();
        poll_until(io, close_op).unwrap();
    }

    // List the directory and bring the entries into a deterministic order.
    let mut listing = io.list_dir(dir).unwrap();
    listing.sort_by(|lhs, rhs| lhs.path.cmp(&rhs.path));

    assert_eq!(listing.len(), 3);
    for (entry, name) in listing.iter().zip(NAMES) {
        assert_eq!(entry.path, dir.join(name));
    }
    assert!(listing.iter().all(|entry| !entry.is_dir));
}

/// Writes a known payload from a buffer obtained via `acquire_buf`, reads it
/// back into a second acquired buffer, and checks the bytes match.
///
/// Both buffers are returned with `release_buf` once their operation has
/// completed.
///
/// # Panics
///
/// Panics if no buffer can be acquired, if any operation fails, if the write
/// is short, or if the bytes read back differ from the payload.
pub(super) fn test_registered_buf_write_read<I: Io>(io: &mut I, path: &Path) {
    let mut n = 0u64;

    // -- open (read + write, create if missing) --
    let open_h = h(&mut n);
    io.open(
        path,
        OpenOptions::new().read(true).write(true).create(true),
        open_h,
    )
    .unwrap();
    let raw = poll_until(io, open_h).unwrap();
    // SAFETY: `raw` is the unwrapped result of the open completion above.
    let fd = unsafe { I::into_fd(raw) };

    // acquire a registered buffer, fill it with known data
    let mut buf = io.acquire_buf().expect("pool should have a free buffer");
    let data = b"registered buffer test";
    // SAFETY: `data` is a static byte string, so the source is valid for
    // `data.len()` bytes.
    // NOTE(review): this assumes the acquired buffer has room for at least
    // `data.len()` bytes behind `stable_mut_ptr()` — confirm against the pool's
    // buffer size. `set_init` is then told exactly how many bytes were written.
    unsafe {
        std::ptr::copy_nonoverlapping(data.as_ptr(), buf.stable_mut_ptr(), data.len());
        buf.set_init(data.len());
    }

    // write - LinuxIo dispatches to WriteFixed, VirtualIo to a normal write
    let write_h = h(&mut n);
    // On failure the error arrives paired with a second value (presumably the
    // buffer being handed back — verify); only the error is kept for `unwrap`.
    let write_handle = io
        .write_at(fd, buf, 0, write_h)
        .map_err(|(e, _)| e)
        .unwrap();
    let bytes_written = poll_until(io, write_h).unwrap();
    assert_eq!(bytes_written as usize, data.len());
    // Completing the write yields the buffer again so it can be released.
    let buf = write_handle.complete();
    io.release_buf(buf);

    // acquire a fresh buffer for reading
    let mut rbuf = io.acquire_buf().expect("pool should have a free buffer");
    // NOTE(review): presumably resets the buffer's initialised length so no
    // stale contents are counted — confirm against the buffer type.
    rbuf.clear();

    // read back - LinuxIo dispatches to ReadFixed, VirtualIo to a normal read
    let read_h = h(&mut n);
    let read_handle = io.read_at(fd, rbuf, 0, read_h).map_err(|(e, _)| e).unwrap();
    let bytes_read = poll_until(io, read_h).unwrap();
    // SAFETY: `bytes_read` is the unwrapped result of the completion of this read.
    let rbuf = unsafe { read_handle.complete(bytes_read) };

    // SAFETY (NOTE(review)): assumes the first `bytes_read` bytes behind
    // `stable_ptr()` are initialised after `complete(bytes_read)` — confirm
    // against the buffer type's contract.
    assert_eq!(
        unsafe { std::slice::from_raw_parts(rbuf.stable_ptr(), bytes_read as usize) },
        data.as_ref()
    );
    io.release_buf(rbuf);

    // -- close --
    let close_h = h(&mut n);
    io.close(fd, close_h).unwrap();
    poll_until(io, close_h).unwrap();
}

#[cfg(feature = "linux")]
mod linux;
#[cfg(feature = "virtual")]
mod virtual_io;