armdb 0.1.14

sharded bitcask key-value storage optimized for NVMe
Documentation
#[cfg(target_os = "linux")]
use rustix_uring::{IoUring, cqueue, opcode, squeue, types};

#[cfg(target_os = "linux")]
use crate::error::{DbError, DbResult};

/// io_uring-based writer. Submits writes and fsyncs via the kernel's submission queue.
/// No internal buffering — callers provide the data buffer directly.
///
/// Only compiled on Linux targets.
#[cfg(target_os = "linux")]
pub struct UringWriter {
    /// Ring whose submission and completion queues carry every request this writer issues.
    ring: IoUring<squeue::Entry, cqueue::Entry>,
    /// Raw descriptor of the file to operate on; `None` until a file has been set.
    /// NOTE(review): nothing visible here closes the descriptor, so the caller
    /// presumably keeps ownership of it — confirm against the call sites.
    fd: Option<std::os::unix::io::RawFd>,
}

#[cfg(target_os = "linux")]
impl UringWriter {
    pub fn new() -> DbResult<Self> {
        let ring = IoUring::builder()
            .setup_sqpoll(2000)
            // TODO:
            // .setup_sqpoll_cpu(cpu_id)
            .build(256)
            .map_err(|e| DbError::Io(e.into()))?;

        Ok(Self { ring, fd: None })
    }

    pub fn set_file(&mut self, fd: std::os::unix::io::RawFd) {
        self.fd = Some(fd);
    }

    /// Write data to the file at the given offset. Blocks until completion.
    pub fn write_at(&mut self, data: &[u8], file_offset: u64) -> DbResult<()> {
        if data.is_empty() {
            return Ok(());
        }
        let fd = self.fd.expect("No file set in UringWriter");

        let write_e = opcode::Write::new(types::Fd(fd), data.as_ptr(), data.len() as u32)
            .offset(file_offset)
            .build()
            .user_data(1);

        unsafe {
            if self.ring.submission().push(&write_e).is_err() {
                self.ring.submit().map_err(|e| DbError::Io(e.into()))?;
                self.ring.submission().push(&write_e).expect("Queue full");
            }
        }

        self.ring
            .submit_and_wait(1)
            .map_err(|e| DbError::Io(e.into()))?;
        let mut cq = self.ring.completion();
        for cqe in &mut cq {
            if let Err(err) = cqe.result() {
                return Err(DbError::Io(err.into()));
            }
        }

        Ok(())
    }

    pub fn fsync(&mut self) -> DbResult<()> {
        let fd = self.fd.expect("No file set in UringWriter");

        let fsync_e = opcode::Fsync::new(types::Fd(fd)).build().user_data(2);

        unsafe {
            if self.ring.submission().push(&fsync_e).is_err() {
                self.ring.submit().map_err(|e| DbError::Io(e.into()))?;
                self.ring.submission().push(&fsync_e).expect("Queue full");
            }
        }

        self.ring
            .submit_and_wait(1)
            .map_err(|e| DbError::Io(e.into()))?;
        let mut cq = self.ring.completion();
        for cqe in &mut cq {
            if let Err(err) = cqe.result() {
                return Err(DbError::Io(err.into()));
            }
        }

        Ok(())
    }
}