//! mindb 0.1.2
//!
//! Lightweight embedded key–value store with write-ahead log and zstd compression.
//!
//! Documentation
#![allow(dead_code)]

use std::fs::{File, OpenOptions};
use std::io::{self, Write};
use std::os::unix::fs::OpenOptionsExt;
use std::os::unix::io::{AsRawFd, RawFd};
use std::path::PathBuf;
use std::ptr::NonNull;
use std::sync::atomic::{AtomicU64, Ordering};

use parking_lot::Mutex;
use xxhash_rust::xxh3::xxh3_64;

// Capacity of the in-memory staging ring buffer (4 MiB).
const RING_SIZE: usize = 4 * 1024 * 1024;
// Block alignment used both for frame padding and for the O_DIRECT allocation.
const ALIGNMENT: usize = 4096;
// Per-frame header bytes: payload length (4) + header length (4) + checksum (4),
// matching the layout written in `RingBuffer::write_frame`.
const FRAME_HEADER_LEN: usize = 12;

/// Maximum payload size that can fit in a single WAL frame.
/// Public so higher layers can split payloads on entry boundaries.
pub const WAL_MAX_PAYLOAD: usize = RING_SIZE - FRAME_HEADER_LEN;

/// Describes a WAL append and the logical byte range that was reserved for it.
#[derive(Debug, Clone, Copy)]
pub struct WalTicket {
    /// Logical stream offset at which the frame was reserved (monotonically
    /// increasing across the life of the log, not a physical file position
    /// modulo the ring).
    pub offset: u64,
    /// Payload length in bytes — excludes the frame header and any
    /// alignment padding added to the frame.
    pub len: u32,
}

/// Options used when constructing a [`Wal`].
#[derive(Debug, Clone)]
pub struct WalOptions {
    /// Filesystem path of the log file to create or open.
    pub path: PathBuf,
    /// When true the file is opened with `O_DIRECT` and chunks are written
    /// with raw `write(2)` calls, bypassing the page cache.
    pub direct_io: bool,
    /// Capacity of the in-memory staging ring, in bytes.
    pub ring_bytes: usize,
}

impl Default for WalOptions {
    /// Default configuration: a `wal.log` file in the current directory,
    /// direct I/O enabled, and a ring sized to [`RING_SIZE`].
    fn default() -> Self {
        let path = PathBuf::from("wal.log");
        Self {
            path,
            ring_bytes: RING_SIZE,
            direct_io: true,
        }
    }
}

/// Write-ahead log backed by a 4 MiB ring buffer and O_DIRECT writes.
pub struct Wal {
    /// Underlying log file; locked so only one flusher writes at a time.
    file: Mutex<File>,
    /// Raw descriptor cached for `fdatasync` and raw `write(2)` calls
    /// without taking the file mutex.
    fd: RawFd,
    /// In-memory staging ring holding frames not yet written to the file.
    buffer: Mutex<RingBuffer>,
    /// Bytes written to the file so far. NOTE(review): this advances when a
    /// chunk is flushed, BEFORE any fdatasync — it counts flushed bytes, not
    /// fsync-durable bytes; confirm callers of `committed_bytes` expect that.
    committed: AtomicU64,
    /// Whether chunks go through `write(2)` on the O_DIRECT descriptor.
    direct_io: bool,
    /// Ring capacity in bytes (mirrors `WalOptions::ring_bytes`).
    ring_len: usize,
}

impl Wal {
    /// Opens a new WAL instance at the configured path.
    ///
    /// The file is created if missing and opened in append mode; when
    /// `options.direct_io` is set it is additionally opened with `O_DIRECT`.
    /// The logical stream offset resumes at the current file length.
    ///
    /// # Errors
    /// Returns any I/O error from opening the file, reading its metadata, or
    /// allocating the aligned ring buffer.
    pub fn open(options: WalOptions) -> io::Result<Self> {
        let mut open_options = OpenOptions::new();
        open_options
            .create(true)
            .write(true)
            .read(true)
            .append(true);

        if options.direct_io {
            open_options.custom_flags(libc::O_DIRECT);
        }

        let file = open_options.open(&options.path)?;
        let fd = file.as_raw_fd();
        // Resume the logical offset where the existing file ends so tickets
        // handed out after a reopen do not overlap previously written frames.
        let start_offset = file.metadata()?.len();

        Ok(Self {
            file: Mutex::new(file),
            fd,
            buffer: Mutex::new(RingBuffer::new(options.ring_bytes, start_offset)?),
            committed: AtomicU64::new(start_offset),
            direct_io: options.direct_io,
            ring_len: options.ring_bytes,
        })
    }

    /// Appends bytes to the WAL, returning the logical offset used for the entry.
    ///
    /// Blocks (flushing as needed) until the ring has room for the frame.
    ///
    /// # Errors
    /// Returns `InvalidInput` when the frame (header + payload, padded to
    /// [`ALIGNMENT`]) can never fit in the ring, or any I/O error raised
    /// while flushing to reclaim space.
    pub fn append(&self, payload: &[u8]) -> io::Result<WalTicket> {
        // Size the frame once, including header and alignment padding. The
        // guard must use the ALIGNED size: with a ring that is not a multiple
        // of ALIGNMENT, a payload could pass a raw-size check while its
        // padded frame exceeds the ring, making the flush/retry loop below
        // spin forever.
        let frame_len = align_up(payload.len() + FRAME_HEADER_LEN, ALIGNMENT);
        if frame_len > self.ring_len {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "payload larger than WAL ring",
            ));
        }

        loop {
            {
                let mut ring = self.buffer.lock();
                if ring.available() >= frame_len {
                    let offset = ring.reserve(frame_len);
                    ring.write_frame(offset, payload);
                    return Ok(WalTicket {
                        offset,
                        len: payload.len() as u32,
                    });
                }
            } // drop the ring lock before flush(), which re-acquires it

            // Not enough capacity – flush to reclaim space and retry.
            self.flush()?;
        }
    }

    /// Flushes the in-memory ring buffer to disk without forcing durability.
    ///
    /// # Errors
    /// Propagates any I/O error from the underlying chunk writes.
    pub fn flush(&self) -> io::Result<()> {
        // Lock order: ring buffer before file, so concurrent flushers cannot
        // deadlock against each other.
        let mut ring = self.buffer.lock();
        let pending = ring.pending() as usize;
        if pending == 0 {
            return Ok(());
        }
        let mut file = self.file.lock();

        // Drain at most the bytes that were pending when the lock was taken.
        // A wrapped ring exposes its data as up to two contiguous chunks, so
        // this loop may iterate twice.
        let mut drained = 0usize;
        while drained < pending {
            let chunk = ring.chunk();
            if chunk.is_empty() {
                break;
            }
            self.write_chunk(&mut file, chunk)?;
            let chunk_len = chunk.len();
            drained += chunk_len;
            ring.consume(chunk_len as u64);
        }

        Ok(())
    }

    /// Issues `fdatasync` ensuring that previously flushed bytes are durable on disk.
    ///
    /// # Errors
    /// Returns the OS error when `fdatasync` fails.
    pub fn sync(&self) -> io::Result<()> {
        // SAFETY: `self.fd` was taken from the `File` stored in `self.file`,
        // which lives as long as `self`, so the descriptor is still valid.
        let res = unsafe { libc::fdatasync(self.fd) };
        if res != 0 {
            return Err(io::Error::last_os_error());
        }
        Ok(())
    }

    /// Commits all pending bytes to disk, issuing fdatasync.
    pub fn commit(&self) -> io::Result<()> {
        self.flush()?;
        self.sync()
    }

    /// Writes one contiguous chunk to the file and advances the flushed-byte
    /// counter on success.
    fn write_chunk(&self, file: &mut File, chunk: &[u8]) -> io::Result<()> {
        if chunk.is_empty() {
            return Ok(());
        }

        if self.direct_io {
            write_all_direct(self.fd, chunk)?;
        } else {
            file.write_all(chunk)?;
        }

        self.committed
            .fetch_add(chunk.len() as u64, Ordering::Release);
        Ok(())
    }

    /// Returns the number of bytes flushed to the file so far.
    ///
    /// NOTE: the counter advances when a chunk is written, before any
    /// `fdatasync` — call [`Wal::sync`] (or [`Wal::commit`]) to make those
    /// bytes durable.
    pub fn committed_bytes(&self) -> u64 {
        self.committed.load(Ordering::Acquire)
    }

    /// Returns the maximum payload allowed in a single WAL frame.
    ///
    /// NOTE(review): ignores alignment padding — for a `ring_bytes` that is
    /// not a multiple of [`ALIGNMENT`] this slightly overstates the limit
    /// enforced by `append`; confirm whether non-default ring sizes are used.
    pub fn max_payload(&self) -> usize {
        self.ring_len.saturating_sub(FRAME_HEADER_LEN)
    }
}

/// Ensures the provided length is aligned to the block boundary.
///
/// Returns the smallest multiple of `align` that is greater than or equal to
/// `len`. Callers in this module pass the power-of-two `ALIGNMENT`; `align`
/// must be non-zero.
fn align_up(len: usize, align: usize) -> usize {
    match len % align {
        0 => len,
        rem => len + align - rem,
    }
}

/// Writes the entire buffer to `fd` using raw `write(2)` calls.
///
/// Retries short writes and `EINTR` interruptions; any other failure is
/// returned immediately.
///
/// # Errors
/// Returns the OS error from `write(2)`, or `WriteZero` if the kernel
/// reports zero bytes written for a non-empty remainder (which would
/// otherwise spin this loop forever).
fn write_all_direct(fd: RawFd, buf: &[u8]) -> io::Result<()> {
    let mut written = 0usize;
    while written < buf.len() {
        let ptr = unsafe { buf.as_ptr().add(written) as *const libc::c_void };
        let to_write = buf.len() - written;
        // SAFETY: `ptr` points `written` bytes into `buf` and exactly
        // `to_write` bytes remain, so the kernel never reads past the slice.
        let ret = unsafe { libc::write(fd, ptr, to_write) };
        if ret < 0 {
            let err = io::Error::last_os_error();
            // A signal can interrupt write(2) before any bytes transfer;
            // that is not a real failure — retry the call.
            if err.kind() == io::ErrorKind::Interrupted {
                continue;
            }
            return Err(err);
        }
        if ret == 0 {
            return Err(io::Error::new(
                io::ErrorKind::WriteZero,
                "write returned zero bytes",
            ));
        }
        written += ret as usize;
    }
    Ok(())
}

/// Fixed-capacity byte ring used to stage WAL frames before they hit disk.
///
/// `head` and `tail` are monotonically increasing LOGICAL stream offsets;
/// physical buffer positions are derived modulo `len`. `head - tail` is the
/// number of bytes staged but not yet consumed by a flush.
struct RingBuffer {
    // Aligned backing allocation of `len` bytes.
    storage: AlignedBuffer,
    // Logical offset one past the last reserved byte (write cursor).
    head: u64,
    // Logical offset of the oldest unconsumed byte (read cursor).
    tail: u64,
    // Ring capacity in bytes, kept as u64 for the modulo arithmetic.
    len: u64,
}

impl RingBuffer {
    /// Creates a ring of `len` bytes whose logical cursors start at
    /// `start_offset` (the current file length, so reserved offsets continue
    /// the on-disk stream).
    fn new(len: usize, start_offset: u64) -> io::Result<Self> {
        Ok(Self {
            storage: AlignedBuffer::new(len, ALIGNMENT)?,
            head: start_offset,
            tail: start_offset,
            len: len as u64,
        })
    }

    /// Reserves `size` bytes and returns the logical offset of the reservation.
    ///
    /// Does NOT check capacity — the caller must verify `available() >= size`
    /// first (as `Wal::append` does), or unconsumed data will be overwritten.
    fn reserve(&mut self, size: usize) -> u64 {
        let offset = self.head;
        self.head += size as u64;
        offset
    }

    /// Copies `data` into the ring starting at PHYSICAL index `start`,
    /// splitting the copy in two when it wraps past the end of the buffer.
    fn write_raw(&mut self, start: usize, data: &[u8]) {
        let end = start + data.len();

        if end <= self.capacity() {
            // Fits contiguously — single copy.
            unsafe {
                let dst = self.storage.as_mut_slice(start, data.len());
                dst.copy_from_slice(data);
            }
        } else {
            // Wraps: fill to the end of the buffer, then continue at index 0.
            let first = self.capacity() - start;
            unsafe {
                let dst = self.storage.as_mut_slice(start, first);
                dst.copy_from_slice(&data[..first]);
            }
            unsafe {
                let dst = self.storage.as_mut_slice(0, data.len() - first);
                dst.copy_from_slice(&data[first..]);
            }
        }
    }

    /// Serializes one frame (header + payload + zero padding up to the
    /// aligned frame size) at logical `offset`, which must have come from
    /// `reserve`.
    ///
    /// Header layout, little-endian u32s: payload length, header length
    /// (NOTE(review): stores the constant 12, not the padded frame length —
    /// confirm the reader expects that), then the low 32 bits of the
    /// payload's xxh3-64 checksum.
    fn write_frame(&mut self, offset: u64, payload: &[u8]) {
        let frame_len = align_up(payload.len() + FRAME_HEADER_LEN, ALIGNMENT);
        let start = (offset % self.len) as usize;
        // Scratch buffer starts zeroed, so the padding after the payload is
        // always zero bytes on disk.
        let mut frame = vec![0u8; frame_len];
        frame[..4].copy_from_slice(&(payload.len() as u32).to_le_bytes());
        frame[4..8].copy_from_slice(&(FRAME_HEADER_LEN as u32).to_le_bytes());
        frame[8..12].copy_from_slice(&(xxh3_64(payload) as u32).to_le_bytes());
        frame[FRAME_HEADER_LEN..FRAME_HEADER_LEN + payload.len()].copy_from_slice(payload);
        self.write_raw(start, &frame);
    }

    /// Bytes that can be reserved without overwriting unconsumed data.
    fn available(&self) -> usize {
        (self.len - self.pending()) as usize
    }

    /// Bytes reserved but not yet consumed by a flush.
    fn pending(&self) -> u64 {
        self.head - self.tail
    }

    /// Total ring capacity in bytes.
    fn capacity(&self) -> usize {
        self.len as usize
    }

    /// Returns the longest CONTIGUOUS run of pending bytes starting at the
    /// tail. A wrapped ring needs a second call (after `consume`) to see the
    /// remainder at the front of the buffer.
    fn chunk(&self) -> &[u8] {
        let pending = self.pending();
        if pending == 0 {
            return &[];
        }

        let tail = (self.tail % self.len) as usize;
        let contiguous = (self.capacity() - tail).min(pending as usize);
        // SAFETY: tail + contiguous <= capacity by construction.
        // NOTE(review): assumes the pending region was fully written by
        // write_frame — the allocation itself is uninitialized; verify.
        unsafe { self.storage.as_slice(tail, contiguous) }
    }

    /// Advances the read cursor after `len` bytes were written out, clamping
    /// to `head` so over-consumption cannot corrupt the cursors.
    fn consume(&mut self, len: u64) {
        self.tail += len;
        if self.tail > self.head {
            self.tail = self.head;
        }
    }
}

/// Heap allocation of `len` bytes aligned to `alignment`, suitable for
/// O_DIRECT I/O. Backed by `posix_memalign` and released with `libc::free`.
/// NOTE(review): the allocator does not initialize the memory.
struct AlignedBuffer {
    // Start of the allocation; never null (checked at construction).
    ptr: NonNull<u8>,
    // Size of the allocation in bytes.
    len: usize,
    // Alignment the allocation was requested with.
    alignment: usize,
}

// SAFETY: the buffer exclusively owns its allocation, so moving it between
// threads is sound. NOTE(review): `Sync` relies on callers serializing
// access externally (the ring lives inside a `Mutex` in `Wal`) — confirm no
// other unsynchronized shared use exists.
unsafe impl Send for AlignedBuffer {}
unsafe impl Sync for AlignedBuffer {}

impl AlignedBuffer {
    /// Allocates `len` bytes aligned to `alignment` via `posix_memalign`.
    ///
    /// The memory is zero-filled after allocation: `posix_memalign` returns
    /// uninitialized bytes, and `as_slice` may expose any in-bounds region,
    /// so reading must always be defined behavior.
    ///
    /// # Errors
    /// Maps the `posix_memalign` error code (`EINVAL` for a bad alignment,
    /// `ENOMEM` on exhaustion) to an `io::Error`.
    fn new(len: usize, alignment: usize) -> io::Result<Self> {
        let mut ptr = std::ptr::null_mut();
        let res = unsafe { libc::posix_memalign(&mut ptr, alignment, len) };
        if res != 0 {
            // posix_memalign reports errors via its return value, not errno.
            return Err(io::Error::from_raw_os_error(res));
        }
        let ptr = NonNull::new(ptr as *mut u8).expect("posix_memalign returned null");
        // SAFETY: `ptr` was just allocated with capacity `len`, so it is
        // valid for `len` writable bytes. Zero-fill to avoid ever handing
        // out uninitialized memory through `as_slice`.
        unsafe { std::ptr::write_bytes(ptr.as_ptr(), 0, len) };
        Ok(Self {
            ptr,
            len,
            alignment,
        })
    }

    /// Borrows `len` bytes starting at `start` as an immutable slice.
    ///
    /// # Safety
    /// `start + len` must not exceed the allocation (checked only in debug
    /// builds) and no mutable borrow of the same region may be live.
    unsafe fn as_slice(&self, start: usize, len: usize) -> &[u8] {
        debug_assert!(start + len <= self.len);
        unsafe { std::slice::from_raw_parts(self.ptr.as_ptr().add(start), len) }
    }

    /// Borrows `len` bytes starting at `start` as a mutable slice.
    ///
    /// # Safety
    /// Same bounds requirement as [`Self::as_slice`], and the region must
    /// not be aliased by any other live borrow.
    unsafe fn as_mut_slice(&mut self, start: usize, len: usize) -> &mut [u8] {
        debug_assert!(start + len <= self.len);
        unsafe { std::slice::from_raw_parts_mut(self.ptr.as_ptr().add(start), len) }
    }
}

impl Drop for AlignedBuffer {
    fn drop(&mut self) {
        // SAFETY: `ptr` came from `posix_memalign` (never null, never freed
        // elsewhere), and `free` is the matching deallocator; Drop runs at
        // most once, so there is no double free.
        unsafe {
            libc::free(self.ptr.as_ptr() as *mut libc::c_void);
        }
    }
}