mace-kv 0.0.34

A fast, cross-platform embedded key-value storage engine with ACID, MVCC, and flash-optimized storage
Documentation
use crate::Options;
use crate::types::traits::IAsSlice;

use crate::io::{self, GatherIO, IoVec};
use std::fmt::Debug;
use std::path::PathBuf;

/// A plain `(len, seq)` pair of `u32` values.
///
/// NOTE(review): presumably mirrors the `len` / `seq` fields of `Reloc` — confirm against callers.
#[derive(Clone, Copy)]
pub struct LenSeq {
    /// length component
    pub len: u32,
    /// sequence component
    pub seq: u32,
}

impl LenSeq {
    /// Builds a `LenSeq` from its two components; callable in const context.
    pub const fn new(len: u32, seq: u32) -> Self {
        LenSeq { len, seq }
    }
}

/// Relocated location of a frame: its offset in the page file, its length, its slot in the
/// relocation table and the page checksum.
///
/// The struct is `#[repr(C, packed(1))]`, so there is no padding and fields may be unaligned:
/// copy fields out by value rather than borrowing them.
///
/// NOTE(review): `off` is a `usize`, so the size of this struct follows the target's pointer
/// width — confirm that is intended wherever the raw bytes are stored or exchanged.
#[derive(Clone, Copy, Debug)]
#[repr(C, packed(1))]
pub struct Reloc {
    /// frame offset in page file
    pub(crate) off: usize,
    /// frame length including header (excluding refcnt)
    pub(crate) len: u32,
    /// index in relocation table
    pub(crate) seq: u32,
    /// checksum of page
    pub(crate) crc: u32,
}

/// Associates a logical address (`key`) with the place it was relocated to (`val`).
///
/// Packed with `#[repr(C, packed(1))]`: no padding between `key` and `val`, and fields may be
/// unaligned, so read them by value instead of taking references.
#[derive(Debug, Clone, Copy)]
#[repr(C, packed(1))]
pub struct AddrPair {
    /// logical address
    pub(crate) key: u64,
    /// relocated address
    pub(crate) val: Reloc,
}

impl AddrPair {
    pub const LEN: usize = size_of::<Self>();
    pub fn new(key: u64, off: usize, len: u32, seq: u32, crc: u32) -> Self {
        Self {
            key,
            val: Reloc { off, len, seq, crc },
        }
    }
}

// Empty impl: `AddrPair` relies entirely on the methods `IAsSlice` provides by default.
// NOTE(review): presumably this exposes the packed struct as raw bytes — confirm in `types::traits`.
impl IAsSlice for AddrPair {}

/// One page-table record: a page id together with the address recorded for that page.
///
/// Packed with `#[repr(C, packed(1))]`, so the two `u64` fields are laid out back to back and
/// may be unaligned; read them by value.
#[derive(Debug, Clone, Copy)]
#[repr(C, packed(1))]
pub struct MapEntry {
    /// id of the page this entry describes
    pub page_id: u64,
    // NULL_ADDR for delete mark
    /// address recorded for the page; `NULL_ADDR` marks the entry as deleted
    pub page_addr: u64,
}

// Empty impl: `MapEntry` relies entirely on the methods `IAsSlice` provides by default.
// NOTE(review): presumably this exposes the packed struct as raw bytes — confirm in `types::traits`.
impl IAsSlice for MapEntry {}

/// A pair of `u64` bounds, `lo` and `hi`, stored without padding.
///
/// NOTE(review): the `Debug` output uses square brackets, which suggests both ends are
/// inclusive — confirm with the code that consumes intervals.
#[derive(Clone, Copy)]
#[repr(C, packed(1))]
pub struct Interval {
    /// lower bound
    pub lo: u64,
    /// upper bound
    pub hi: u64,
}

impl Debug for Interval {
    /// Renders the interval as `[lo, hi]`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Copy the fields out first: borrowing a field of a packed struct is not allowed.
        let (lo, hi) = (self.lo, self.hi);
        write!(f, "[{lo}, {hi}]")
    }
}

impl Interval {
    /// Size of one `Interval` in bytes, as reported by `size_of`.
    pub const LEN: usize = size_of::<Interval>();

    /// Builds an interval from its two bounds; no ordering between `lo` and `hi` is checked.
    pub const fn new(lo: u64, hi: u64) -> Self {
        Interval { lo, hi }
    }
}

// Empty impl: `Interval` relies entirely on the methods `IAsSlice` provides by default.
// NOTE(review): presumably this exposes the packed struct as raw bytes — confirm in `types::traits`.
impl IAsSlice for Interval {}

/// File writer that collects buffers into a queue and hands them to the file in one
/// `writev` call when flushed.
pub struct GatherWriter {
    /// path the file was opened from; also used in error log messages
    path: PathBuf,
    /// the opened file all writes go to
    file: io::File,
    /// buffers queued since the last flush
    queue: Vec<IoVec>,
    /// total byte length of the buffers currently in `queue`
    queued_len: usize,
    /// number of queued buffers at which `queue()` flushes before accepting another one
    max_iovcnt: usize,
}

// SAFETY: NOTE(review): nothing in this file states why this is sound. Presumably `IoVec` holds
// raw pointers (which makes the struct `!Send` by default) and callers keep every queued buffer
// alive until it is flushed, wherever the writer is moved — confirm against `io::IoVec` and the
// call sites.
unsafe impl Send for GatherWriter {}

impl GatherWriter {
    /// max iovec count is limited to 1024 in Linux
    pub(crate) const MAX_IOVCNT: usize = 1024;
    /// Batch size substituted when the requested count is `>= MAX_IOVCNT`.
    pub(crate) const DEFAULT_IOVCNT: usize = 64;

    /// Opens `path` with write + append access and `create` set; `trunc` is forwarded to the
    /// open options.
    ///
    /// # Panics
    /// Logs the error and panics when the file can't be opened.
    fn open(path: &PathBuf, trunc: bool) -> io::File {
        io::File::options()
            .write(true)
            .append(true)
            .trunc(trunc)
            .create(true)
            .open(path)
            .inspect_err(|x| {
                log::error!("can't open {path:?}, {x}");
            })
            .unwrap()
    }

    /// Shared constructor behind [`Self::trunc`] and [`Self::append`].
    ///
    /// A `max_iovcnt` that is `>= MAX_IOVCNT` is replaced by `DEFAULT_IOVCNT`.
    fn create(path: &PathBuf, max_iovcnt: usize, trunc: bool) -> Self {
        // Resolve the effective limit before sizing the queue: the queue never grows past this
        // limit, so reserving the raw request would over-allocate (or overflow the capacity)
        // when the caller passes an oversized value.
        let max_iovcnt = if max_iovcnt >= Self::MAX_IOVCNT {
            Self::DEFAULT_IOVCNT
        } else {
            max_iovcnt
        };
        Self {
            path: path.clone(),
            file: Self::open(path, trunc),
            queue: Vec::with_capacity(max_iovcnt),
            queued_len: 0,
            max_iovcnt,
        }
    }

    /// Creates a writer on `path` with the truncate option set.
    ///
    /// # Panics
    /// Panics when the file can't be opened.
    pub fn trunc(path: &PathBuf, max_iovcnt: usize) -> Self {
        Self::create(path, max_iovcnt, true)
    }

    /// Creates a writer on `path` without the truncate option.
    ///
    /// # Panics
    /// Panics when the file can't be opened.
    pub fn append(path: &PathBuf, max_iovcnt: usize) -> Self {
        Self::create(path, max_iovcnt, false)
    }

    /// Adds `data` to the pending batch, flushing the batch first when it already holds
    /// `max_iovcnt` entries.
    ///
    /// NOTE(review): `data` is converted into an `IoVec` and nothing here ties its lifetime to
    /// the writer; presumably the caller must keep the buffer alive and unchanged until the
    /// next flush — confirm against `io::IoVec`.
    pub fn queue(&mut self, data: &[u8]) {
        if self.queue.len() >= self.max_iovcnt {
            self.flush();
        }
        self.queue.push(data.into());
        self.queued_len += data.len();
    }

    /// Returns `true` when no buffer is waiting to be flushed.
    #[allow(unused)]
    pub fn is_empty(&self) -> bool {
        self.queue.is_empty()
    }

    /// Returns the size reported by the underlying file.
    ///
    /// Buffers that are queued but not yet flushed have not been handed to the file, so they
    /// are not reflected here.
    ///
    /// # Panics
    /// Logs the error and panics when the size can't be read.
    pub fn pos(&self) -> u64 {
        self.file
            .size()
            .inspect_err(|x| {
                log::error!("can't get metadata of {:?}, {}", self.path, x);
            })
            .unwrap()
    }

    /// Writes `data` straight to the file, bypassing the queue.
    ///
    /// Pending queued buffers are not flushed first, so ordering relative to them is up to the
    /// caller.
    ///
    /// # Panics
    /// Logs the error and panics when the write fails.
    pub fn write(&mut self, data: &[u8]) {
        self.file
            .write(data)
            .inspect_err(|e| {
                log::error!("can't write: {:?}", e);
            })
            .expect("can't fail");
    }

    /// Hands every queued buffer to the file in a single `writev` call and empties the queue.
    ///
    /// Does nothing when the queue is empty, which avoids a pointless write call (this method
    /// also runs from `Drop`).
    ///
    /// # Panics
    /// Logs the error and panics when the write fails.
    pub fn flush(&mut self) {
        if self.queue.is_empty() {
            return;
        }
        let iov = self.queue.as_mut_slice();
        self.file
            .writev(iov, self.queued_len)
            .inspect_err(|x| log::error!("can't write iov, {x}"))
            .unwrap();
        self.queued_len = 0;
        self.queue.clear();
    }

    /// Calls `sync` on the underlying file. Queued buffers are not flushed by this call.
    ///
    /// # Panics
    /// Logs the error and panics when the sync fails.
    pub fn sync(&mut self) {
        self.file
            .sync()
            .inspect_err(|x| {
                log::error!("can't sync {:?}, {}", self.path, x);
            })
            .expect("can't fail");
    }

    /// Calls `sync_data` on the underlying file. Queued buffers are not flushed by this call.
    ///
    /// # Panics
    /// Logs the error and panics when the sync fails.
    pub fn sync_data(&mut self) {
        self.file
            .sync_data()
            .inspect_err(|x| {
                log::error!("can't sync data {:?}, {}", self.path, x);
            })
            .expect("can't fail");
    }
}

impl Drop for GatherWriter {
    /// Flushes the queue one last time when the writer goes away.
    ///
    /// `flush` unwraps write errors, so a failed write here panics; if that happens while
    /// another panic is already unwinding, the process aborts.
    fn drop(&mut self) {
        Self::flush(self);
    }
}

/// A location made of a file id and an offset.
///
/// Positions are ordered by `file_id` first and by `offset` second (see the `Ord` impl).
#[derive(Debug, Clone, Copy, Default)]
#[repr(C)]
pub struct Position {
    /// id of the file; the primary ordering key
    pub file_id: u64,
    /// offset paired with `file_id`; the secondary ordering key
    pub offset: u64,
}

/// One [`Position`] per slot, with `Options::MAX_CONCURRENT_WRITE` slots in total.
pub type GroupPositions = [Position; Options::MAX_CONCURRENT_WRITE as usize];

/// Returns a [`GroupPositions`] with every slot set to [`Position::MIN`].
pub const fn init_group_pos() -> GroupPositions {
    const SLOTS: usize = Options::MAX_CONCURRENT_WRITE as usize;
    [Position::MIN; SLOTS]
}

impl Position {
    pub const MIN: Self = Position::new(u64::MIN, u64::MIN);

    pub const fn new(id: u64, off: u64) -> Self {
        Self {
            file_id: id,
            offset: off,
        }
    }
}

impl Ord for Position {
    /// Compares by `file_id`; the `offset` only decides when the file ids are equal.
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.file_id
            .cmp(&other.file_id)
            .then_with(|| self.offset.cmp(&other.offset))
    }
}

impl PartialOrd for Position {
    /// Always `Some`: defers to the total order defined by `Ord::cmp`.
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        let ordering = Ord::cmp(self, other);
        Some(ordering)
    }
}

impl PartialEq for Position {
    /// Two positions are equal exactly when both `file_id` and `offset` match, which is the
    /// case in which `cmp` yields `Ordering::Equal`.
    fn eq(&self, other: &Self) -> bool {
        self.file_id == other.file_id && self.offset == other.offset
    }
}

// Marker impl: equality on `Position` compares two `u64` fields, so it is a full equivalence
// relation.
impl Eq for Position {}