//! armdb 0.1.13
//!
//! Sharded bitcask key-value storage optimized for NVMe.
use zerocopy::{FromBytes, Immutable, IntoBytes, KnownLayout};

/// High bit of the on-disk `gsn` field. When set, the entry is a tombstone
/// (deletion marker); bits 0-62 carry the actual sequence number.
pub const TOMBSTONE_BIT: u64 = 1 << 63;

/// On-disk entry header. 16 bytes, 8-byte aligned, no padding.
///
/// Derives the `zerocopy` marker traits so a header can be read from / written
/// to raw byte buffers without copying or `unsafe`. Field order matters:
/// `#[repr(C)]` with the `u64` first guarantees the 16-byte, padding-free
/// layout asserted below.
#[derive(Debug, Clone, Copy, FromBytes, IntoBytes, KnownLayout, Immutable)]
#[repr(C)]
pub struct EntryHeader {
    /// Global Sequence Number. Bit 63 = tombstone flag, bits 0-62 = sequence.
    pub gsn: u64,
    /// CRC32 checksum over (gsn || value_len || key || value).
    pub crc32: u32,
    /// Length of the value in bytes. 0 for tombstones.
    pub value_len: u32,
}

// Compile-time guards: the on-disk format depends on this exact size and
// alignment; a field change that breaks them fails the build instead of
// silently corrupting files.
const _: () = assert!(size_of::<EntryHeader>() == 16);
const _: () = assert!(align_of::<EntryHeader>() == 8);

impl EntryHeader {
    /// Whether this entry marks a deletion (tombstone flag set in `gsn`).
    #[inline]
    pub fn is_tombstone(&self) -> bool {
        self.gsn & TOMBSTONE_BIT == TOMBSTONE_BIT
    }

    /// The sequence number with the tombstone flag masked off.
    #[inline]
    pub fn sequence(&self) -> u64 {
        // TOMBSTONE_BIT is 1 << 63, so TOMBSTONE_BIT - 1 == !TOMBSTONE_BIT:
        // a mask of the low 63 bits.
        self.gsn & (TOMBSTONE_BIT - 1)
    }
}

/// Return `gsn` with the tombstone flag set; the sequence bits are untouched.
#[inline]
pub fn make_tombstone_gsn(gsn: u64) -> u64 {
    TOMBSTONE_BIT | gsn
}

/// Compute CRC32 over gsn || value_len || key || value.
///
/// Integer fields are folded in as **little-endian** bytes so the checksum of
/// a given logical entry is identical on every architecture. The previous
/// `to_ne_bytes` made checksums written on a big-endian host unverifiable on a
/// little-endian one; on little-endian targets (the NVMe platforms this is
/// optimized for) this change is a no-op.
///
/// NOTE(review): the header struct itself is still serialized in native byte
/// order via `as_bytes()`; full cross-endian file portability would need the
/// same treatment there — confirm whether that is a goal.
pub fn compute_crc32(gsn: u64, value_len: u32, key: &[u8], value: &[u8]) -> u32 {
    let mut hasher = crc32fast::Hasher::new();
    hasher.update(&gsn.to_le_bytes());
    hasher.update(&value_len.to_le_bytes());
    hasher.update(key);
    hasher.update(value);
    hasher.finalize()
}

/// Compute the total on-disk size of an entry including padding to 8-byte alignment.
#[inline]
pub const fn entry_size(key_len: usize, value_len: u32) -> u64 {
    // Header + key + value, rounded up to the next multiple of 8.
    let unpadded = size_of::<EntryHeader>() + key_len + value_len as usize;
    ((unpadded + 7) & !7) as u64
}

/// Serialize a complete entry (header + key + value + padding) into a `Vec<u8>`.
pub fn serialize_entry(gsn: u64, key: &[u8], value: &[u8], tombstone: bool) -> Vec<u8> {
    let actual_gsn = if tombstone {
        make_tombstone_gsn(gsn)
    } else {
        gsn
    };
    let value_len = value.len() as u32;
    let crc = compute_crc32(actual_gsn, value_len, key, value);

    let total = entry_size(key.len(), value_len) as usize;
    let mut buf = vec![0u8; total];

    let header = EntryHeader {
        gsn: actual_gsn,
        crc32: crc,
        value_len,
    };

    buf[..16].copy_from_slice(header.as_bytes());
    buf[16..16 + key.len()].copy_from_slice(key);
    buf[16 + key.len()..16 + key.len() + value.len()].copy_from_slice(value);
    // Remaining bytes are already zeroed (padding).

    buf
}

#[cfg(test)]
mod tests {
    use super::*;
    use zerocopy::FromBytes;

    #[test]
    fn test_tombstone_encoding() {
        let seq = 123u64;
        let flagged = make_tombstone_gsn(seq);
        assert_ne!(flagged, seq);
        assert_eq!(flagged & TOMBSTONE_BIT, TOMBSTONE_BIT);

        let header = EntryHeader {
            gsn: flagged,
            crc32: 0,
            value_len: 0,
        };
        assert!(header.is_tombstone());
        assert_eq!(header.sequence(), seq);
    }

    #[test]
    fn test_sequence_preserves_value() {
        // A plain (non-tombstone) GSN must round-trip through `sequence`.
        let header = EntryHeader {
            gsn: 42,
            crc32: 0,
            value_len: 0,
        };
        assert!(!header.is_tombstone());
        assert_eq!(header.sequence(), 42);
    }

    #[test]
    fn test_entry_size_alignment() {
        // Every (key_len, value_len) combination must land on an 8-byte boundary.
        for kl in 0..64usize {
            for vl in 0..64u32 {
                assert_eq!(entry_size(kl, vl) % 8, 0, "key_len={kl}, val_len={vl}");
            }
        }
    }

    #[test]
    fn test_entry_size_exact() {
        // 16-byte header + 8-byte key, no value: already aligned.
        assert_eq!(entry_size(8, 0), 24);
        // 16 + 8 + 8 = 32: already aligned.
        assert_eq!(entry_size(8, 8), 32);
        // 16 + 16 + 1 = 33: rounds up to 40.
        assert_eq!(entry_size(16, 1), 40);
    }

    #[test]
    fn test_serialize_deserialize() {
        let key: &[u8] = b"test_key";
        let value: &[u8] = b"test_value";
        let gsn = 42u64;

        let encoded = serialize_entry(gsn, key, value, false);
        let header =
            EntryHeader::read_from_bytes(&encoded[..16]).expect("failed to parse header");

        assert_eq!(header.gsn, 42);
        assert_eq!(header.value_len, value.len() as u32);
        assert_eq!(
            header.crc32,
            compute_crc32(gsn, value.len() as u32, key, value)
        );
    }

    #[test]
    fn test_serialize_tombstone() {
        let gsn = 99u64;
        let encoded = serialize_entry(gsn, b"tombkey", b"", true);
        let header =
            EntryHeader::read_from_bytes(&encoded[..16]).expect("failed to parse header");

        assert!(header.is_tombstone());
        assert_eq!(header.sequence(), gsn);
    }

    #[test]
    fn test_crc_detects_corruption() {
        let key = b"mykey";
        let value = b"myvalue";

        let mut encoded = serialize_entry(7, key, value, false);
        let header =
            EntryHeader::read_from_bytes(&encoded[..16]).expect("failed to parse header");
        let stored_crc = header.crc32;

        // Corrupt one byte of the value region, which starts after header + key.
        let val_start = 16 + key.len();
        encoded[val_start] ^= 0xFF;

        // A CRC recomputed over the corrupted payload must not match.
        let recomputed = compute_crc32(
            header.gsn,
            header.value_len,
            key,
            &encoded[val_start..val_start + value.len()],
        );
        assert_ne!(recomputed, stored_crc);
    }
}
}