// armdb 0.1.13
//
// Sharded bitcask key-value storage optimized for NVMe.
// Documentation
/// Slot status `0b00`: the slot has never been written. Statuses are packed
/// into the low 2 bits of the meta word.
pub const STATUS_FREE: u8 = 0b00;
/// Slot status `0b01`: the slot holds a valid key-value pair.
pub const STATUS_OCCUPIED: u8 = 0b01;
/// Slot status `0b10`: the slot used to hold a pair that has since been deleted.
pub const STATUS_DELETED: u8 = 0b10;

// Backward-compatible aliases (v1 names). Each alias carries its own doc
// comment and `#[allow(dead_code)]`, because doc comments and attributes
// attach only to the single item that immediately follows them.
/// Backward-compatible alias (v1 name) for `STATUS_FREE`.
#[allow(dead_code)]
pub const SLOT_FREE: u8 = STATUS_FREE;
/// Backward-compatible alias (v1 name) for `STATUS_OCCUPIED`.
#[allow(dead_code)]
pub const SLOT_OCCUPIED: u8 = STATUS_OCCUPIED;
/// Backward-compatible alias (v1 name) for `STATUS_DELETED`.
#[allow(dead_code)]
pub const SLOT_DELETED: u8 = STATUS_DELETED;

/// Selects the 2 status bits at the bottom of a meta word.
const STATUS_MASK: u32 = 0b11;
/// How far the version is shifted left so it sits above the status bits.
const VERSION_SHIFT: u32 = 2;
/// Selects the 30 version bits, i.e. everything that is not a status bit.
const VERSION_MASK: u32 = !STATUS_MASK;

/// Warning threshold: 31/32 of the 2^30 version space (~97%, `0x3E00_0000`).
/// At this point we log/metric that a hot slot is approaching version wrap.
pub const VERSION_WARN_THRESHOLD: u32 = 31 << 25;

/// Largest value a 30-bit version can hold (2^30 - 1, `0x3FFF_FFFF`).
pub(crate) const VERSION_MAX: u32 = u32::MAX >> 2;

/// Byte length of the slot header: a 4-byte meta word followed by a 4-byte
/// CRC32, 8 bytes in total.
pub const SLOT_HEADER_SIZE: usize = 4 + 4;

/// Combine a 2-bit `status` and a 30-bit `version` into one meta word.
///
/// The version lands in the upper 30 bits and the status in the lower 2.
/// Any `status` bits above the low two are masked off. A `version` larger
/// than `VERSION_MAX` trips a debug assertion; when debug assertions are
/// disabled its excess high bits are simply shifted out.
#[inline]
pub fn pack_meta(status: u8, version: u32) -> u32 {
    debug_assert!(
        version <= VERSION_MAX,
        "version {version} exceeds 30-bit max {VERSION_MAX}"
    );
    let version_bits = version << VERSION_SHIFT;
    let status_bits = u32::from(status) & STATUS_MASK;
    version_bits | status_bits
}

/// Return the status stored in the low 2 bits of a packed meta word.
#[inline]
pub fn status_of(meta: u32) -> u8 {
    let status_bits = meta & STATUS_MASK;
    // The mask keeps only the two low bits, so narrowing to `u8` loses nothing.
    status_bits as u8
}

/// Return the version stored in the upper 30 bits of a packed meta word.
#[inline]
pub fn version_of(meta: u32) -> u32 {
    // Shifting the status bits off the low end leaves just the version.
    meta.wrapping_shr(VERSION_SHIFT)
}

/// Advance the version in `meta` by one, leaving the status bits as they are.
///
/// One version step equals `1 << VERSION_SHIFT` in meta-word terms, so the
/// addition never touches the two status bits. Stepping past the largest
/// 30-bit version overflows the `u32`; `wrapping_add` discards the carry, so
/// the version wraps back to 0.
#[inline]
pub fn bump_version(meta: u32) -> u32 {
    let one_version_step: u32 = 1 << VERSION_SHIFT;
    meta.wrapping_add(one_version_step)
}

/// Return `meta` with its status replaced by `new_status`; the version bits
/// are carried over unchanged. Only the low 2 bits of `new_status` are used.
#[inline]
pub fn with_status(meta: u32, new_status: u8) -> u32 {
    let version_bits = meta & VERSION_MASK;
    let status_bits = u32::from(new_status) & STATUS_MASK;
    version_bits | status_bits
}

/// Ring comparison: does `new_version` come after `old_version` in the
/// 30-bit version space?
///
/// Returns `true` when `new_version` is between 1 and 2^29 - 1 steps ahead of
/// `old_version` (modulo 2^30). Equal versions are not newer, and a forward
/// distance of 2^29 or more is ambiguous, so it is treated as stale.
#[inline]
pub fn is_newer(new_version: u32, old_version: u32) -> bool {
    // Forward distance from old to new, reduced to the 30-bit ring.
    let forward_distance = new_version.wrapping_sub(old_version) & VERSION_MAX;
    (1..(1 << 29)).contains(&forward_distance)
}

/// Total on-disk size of a slot holding a key of `key_len` bytes and a value
/// of `value_len` bytes: header plus payload, rounded up to a multiple of 8.
///
/// The lengths are summed with plain `usize` arithmetic, so callers are
/// expected to pass lengths nowhere near `usize::MAX`.
pub const fn slot_size(key_len: usize, value_len: usize) -> usize {
    const ALIGN_MASK: usize = 8 - 1;
    let unaligned = SLOT_HEADER_SIZE + key_len + value_len;
    // Adding the mask and then clearing the low bits rounds up to 8.
    (unaligned + ALIGN_MASK) & !ALIGN_MASK
}

/// CRC32 over the little-endian meta word, then the key, then the value.
///
/// This is the checksum stored in OCCUPIED slots. DELETED slots keep whatever
/// CRC was there before; it is stale on purpose and never checked on read.
pub fn compute_crc(meta: u32, key: &[u8], value: &[u8]) -> u32 {
    let meta_bytes = meta.to_le_bytes();
    let mut hasher = crc32fast::Hasher::new();
    // Feed the parts in the same order they are laid out for hashing.
    for part in [&meta_bytes[..], key, value] {
        hasher.update(part);
    }
    hasher.finalize()
}

/// Write a complete slot for `key`/`value` into the front of `buf`.
///
/// Layout: `[meta LE u32] [crc32 LE u32] [key] [value] [0-pad to align]`.
/// Exactly `slot_size(key.len(), value.len())` bytes are written; anything in
/// `buf` past that point is left untouched.
///
/// # Panics
/// If `buf.len() < slot_size(key.len(), value.len())`.
pub fn serialize_slot(buf: &mut [u8], meta: u32, key: &[u8], value: &[u8]) {
    let size = slot_size(key.len(), value.len());
    assert!(
        buf.len() >= size,
        "buffer too small: {} < {}",
        buf.len(),
        size
    );

    // Carve the slot into its fields so each one is written exactly once.
    let (header, payload) = buf[..size].split_at_mut(SLOT_HEADER_SIZE);
    let (meta_field, crc_field) = header.split_at_mut(4);
    let (key_field, rest) = payload.split_at_mut(key.len());
    let (value_field, padding) = rest.split_at_mut(value.len());

    meta_field.copy_from_slice(&meta.to_le_bytes());
    crc_field.copy_from_slice(&compute_crc(meta, key, value).to_le_bytes());
    key_field.copy_from_slice(key);
    value_field.copy_from_slice(value);
    // Alignment padding is always zeroed, whatever the buffer held before.
    padding.fill(0);
}

/// Decode the meta word from the first 4 bytes of a slot buffer
/// (little-endian).
///
/// The caller must pass at least 4 bytes. `read_slot` guarantees this through
/// its size check before calling, and buffers filled by `serialize_slot` are
/// always at least a full header long.
///
/// # Panics
/// If `buf.len() < 4`.
#[inline]
pub fn meta_of(buf: &[u8]) -> u32 {
    debug_assert!(
        buf.len() >= 4,
        "meta_of requires buf.len() >= 4, got {}",
        buf.len()
    );
    let meta_bytes: [u8; 4] = buf[..4].try_into().expect("slot buffer >= 4 bytes");
    u32::from_le_bytes(meta_bytes)
}

/// Decode and validate the slot at the front of `buf`.
///
/// Returns `Some((meta, key, value))` only when the slot is OCCUPIED and its
/// stored CRC matches the one recomputed from the meta word, key and value.
/// Returns `None` when `buf` is shorter than the slot, when the slot is FREE
/// or DELETED, or when the CRC does not match.
pub fn read_slot(buf: &[u8], key_len: usize, value_len: usize) -> Option<(u32, &[u8], &[u8])> {
    // A buffer shorter than the aligned slot cannot contain this record.
    let slot = buf.get(..slot_size(key_len, value_len))?;

    let meta = meta_of(slot);
    if status_of(meta) != STATUS_OCCUPIED {
        // FREE and DELETED slots are rejected before the CRC is looked at.
        return None;
    }

    let (header, payload) = slot.split_at(SLOT_HEADER_SIZE);
    let crc_field: [u8; 4] = header[4..]
        .try_into()
        .expect("buf already checked >= SLOT_HEADER_SIZE");
    let stored_crc = u32::from_le_bytes(crc_field);

    let (key, rest) = payload.split_at(key_len);
    let value = &rest[..value_len];

    if compute_crc(meta, key, value) == stored_crc {
        Some((meta, key, value))
    } else {
        None
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Serialize `key`/`value` into a fresh buffer sized exactly for the slot.
    fn encode(meta: u32, key: &[u8], value: &[u8]) -> Vec<u8> {
        let mut slot = vec![0u8; slot_size(key.len(), value.len())];
        serialize_slot(&mut slot, meta, key, value);
        slot
    }

    #[test]
    fn test_slot_size_alignment() {
        // (key_len, value_len, expected): 8-byte header plus payload, rounded up to 8.
        let cases: [(usize, usize, usize); 6] = [
            (0, 0, 8),   // header alone, already aligned
            (4, 4, 16),  // 16, already aligned
            (8, 8, 24),  // 24, already aligned
            (1, 0, 16),  // 9 -> 16
            (3, 2, 16),  // 13 -> 16
            (10, 7, 32), // 25 -> 32
        ];
        for &(key_len, value_len, expected) in &cases {
            assert_eq!(slot_size(key_len, value_len), expected);
        }

        // Sweep of small lengths: the result is always a multiple of 8.
        for key_len in 0..64 {
            for value_len in 0..64 {
                assert_eq!(slot_size(key_len, value_len) % 8, 0);
            }
        }
    }

    #[test]
    fn test_serialize_read_roundtrip() {
        let (key, value) = (b"hello", b"world!!!");
        let meta = pack_meta(STATUS_OCCUPIED, 0);
        let slot = encode(meta, key, value);

        let (read_meta, read_key, read_value) =
            read_slot(&slot, key.len(), value.len()).expect("validation should pass");
        assert_eq!(read_meta, meta);
        assert_eq!(read_key, key);
        assert_eq!(read_value, value);
    }

    #[test]
    fn test_corrupted_slot_fails_validation() {
        let (key, value) = (b"abc", b"defgh");
        let mut slot = encode(pack_meta(STATUS_OCCUPIED, 1), key, value);

        // Flip every bit of the first value byte.
        let first_value_byte = SLOT_HEADER_SIZE + key.len();
        slot[first_value_byte] ^= 0xFF;

        assert!(
            read_slot(&slot, key.len(), value.len()).is_none(),
            "corrupted slot should fail validation"
        );
    }

    #[test]
    fn test_crc_covers_meta() {
        // Same key and value, only the version inside the meta word differs.
        let crc_for_version =
            |version: u32| compute_crc(pack_meta(STATUS_OCCUPIED, version), b"k", b"v");
        assert_ne!(
            crc_for_version(1),
            crc_for_version(2),
            "different meta must produce different CRC"
        );
    }

    #[test]
    fn test_serialize_read_roundtrip_with_meta() {
        let (key, value) = (b"hello", b"world!!!");
        let meta = pack_meta(STATUS_OCCUPIED, 123);
        let slot = encode(meta, key, value);

        let (read_meta, read_key, read_value) =
            read_slot(&slot, key.len(), value.len()).expect("valid slot should read back");
        assert_eq!(read_meta, meta);
        assert_eq!(read_key, key);
        assert_eq!(read_value, value);
    }

    #[test]
    fn test_read_slot_returns_none_for_deleted() {
        let mut slot = encode(pack_meta(STATUS_OCCUPIED, 1), b"abcd", b"1234");
        // Overwrite only the meta word with a DELETED status. The CRC field stays
        // stale on purpose; read_slot must return None without checking it.
        let tombstone = pack_meta(STATUS_DELETED, 2).to_le_bytes();
        slot[..4].copy_from_slice(&tombstone);
        assert!(read_slot(&slot, 4, 4).is_none());
    }

    #[test]
    fn test_read_slot_returns_none_for_free() {
        // An all-zero buffer decodes to status FREE.
        let zeroed = vec![0u8; slot_size(4, 4)];
        assert!(read_slot(&zeroed, 4, 4).is_none());
    }

    #[test]
    fn test_pack_meta_roundtrip() {
        let packed = pack_meta(STATUS_OCCUPIED, 0x12345);
        assert_eq!(status_of(packed), STATUS_OCCUPIED);
        assert_eq!(version_of(packed), 0x12345);
    }

    #[test]
    fn test_pack_meta_max_version() {
        // Largest 30-bit value, spelled out independently of the crate constant.
        let largest_version = u32::MAX >> 2;
        let packed = pack_meta(STATUS_DELETED, largest_version);
        assert_eq!(status_of(packed), STATUS_DELETED);
        assert_eq!(version_of(packed), largest_version);
    }

    #[test]
    fn test_bump_version_preserves_status() {
        let bumped = bump_version(pack_meta(STATUS_OCCUPIED, 5));
        assert_eq!(status_of(bumped), STATUS_OCCUPIED);
        assert_eq!(version_of(bumped), 6);
    }

    #[test]
    fn test_bump_version_wraps_at_2_pow_30() {
        let at_max = pack_meta(STATUS_OCCUPIED, (1 << 30) - 1);
        let bumped = bump_version(at_max);
        assert_eq!(
            version_of(bumped),
            0,
            "version should wrap from 2^30-1 to 0"
        );
        assert_eq!(status_of(bumped), STATUS_OCCUPIED);
    }

    #[test]
    fn test_with_status_preserves_version() {
        let original = pack_meta(STATUS_OCCUPIED, 42);
        let tombstoned = with_status(original, STATUS_DELETED);
        assert_eq!(status_of(tombstoned), STATUS_DELETED);
        assert_eq!(version_of(tombstoned), 42);
    }

    #[test]
    fn test_is_newer_basic() {
        assert!(is_newer(5, 3));
        assert!(!is_newer(3, 5));
        assert!(!is_newer(7, 7), "equal is not newer");
    }

    #[test]
    fn test_is_newer_wrap_near_max() {
        let ring_size: u32 = 1 << 30;
        // old = 2^30-1, new = 0: one step forward across the wrap -> newer.
        assert!(is_newer(0, ring_size - 1));
        // old = 2^30-3, new = 5: 3 steps to the wrap plus 5 more = 8 -> newer.
        assert!(is_newer(5, ring_size - 3));
    }

    #[test]
    fn test_is_newer_boundary() {
        let half_ring: u32 = 1 << 29;
        // A distance of exactly 2^29 is ambiguous -> NOT newer (window is strictly <).
        assert!(!is_newer(half_ring, 0));
        // One below the boundary -> newer.
        assert!(is_newer(half_ring - 1, 0));
    }

    #[test]
    fn test_is_newer_too_far() {
        // Forward ring-distance from old = 2^29-5 to new = 0 is 2^29+5, which is
        // outside the 2^29 tolerance window -> stale (not newer).
        let old_version: u32 = (1 << 29) - 5;
        assert!(!is_newer(0, old_version));
    }

    #[test]
    fn test_is_newer_max_forward_distance() {
        // new = VERSION_MAX = 2^30-1, old = 0: distance 2^30-1 > 2^29 -> NOT newer.
        // This is the largest forward distance in the ring; it must count as stale.
        assert!(!is_newer(VERSION_MAX, 0));
    }
}