bitcasky 0.1.2

Bitcasky is a Rust implementation of the Bitcask key-value store.
Documentation
use std::{ops::Deref, ptr};

use byteorder::{ByteOrder, LittleEndian};
use bytes::{Buf, Bytes};
use crc::{Crc, CRC_32_CKSUM};

use super::{
    Formatter, FormatterError, MergeMeta, Result, RowHeader, RowHintHeader, RowMeta, RowToWrite,
};

// Widths of the fixed-size fields, tied to the integer types this module
// writes: the CRC is written as a `u32`, every other header field as a `u64`.
const CRC_SIZE: usize = std::mem::size_of::<u32>();
const TSTAMP_SIZE: usize = std::mem::size_of::<u64>();
const KEY_SIZE_SIZE: usize = std::mem::size_of::<u64>();
const VALUE_SIZE_SIZE: usize = std::mem::size_of::<u64>();

// Data-file row header: crc | timestamp | key size | value size | key bytes...
// Each offset is the previous field's offset plus that field's width.
const DATA_FILE_TSTAMP_OFFSET: usize = CRC_SIZE;
const DATA_FILE_KEY_SIZE_OFFSET: usize = DATA_FILE_TSTAMP_OFFSET + TSTAMP_SIZE;
const DATA_FILE_VALUE_SIZE_OFFSET: usize = DATA_FILE_KEY_SIZE_OFFSET + KEY_SIZE_SIZE;
const DATA_FILE_KEY_OFFSET: usize = DATA_FILE_VALUE_SIZE_OFFSET + VALUE_SIZE_SIZE;

// Hint-file header: timestamp | key size | row offset | row size | key bytes...
// The timestamp sits at offset 0, so the key size starts right after it.
const ROW_OFFSET_SIZE: usize = std::mem::size_of::<u64>();
const ROW_SIZE_SIZE: usize = std::mem::size_of::<u64>();
const HINT_FILE_KEY_SIZE_OFFSET: usize = TSTAMP_SIZE;
const HINT_FILE_ROW_OFFSET_OFFSET: usize = HINT_FILE_KEY_SIZE_OFFSET + KEY_SIZE_SIZE;
const HINT_FILE_ROW_SIZE_OFFSET: usize = HINT_FILE_ROW_OFFSET_OFFSET + ROW_OFFSET_SIZE;
const HINT_FILE_KEY_OFFSET: usize = HINT_FILE_ROW_SIZE_OFFSET + ROW_SIZE_SIZE;
// The key begins immediately after the fixed header, so both values coincide.
const HINT_FILE_HEADER_SIZE: usize = HINT_FILE_KEY_OFFSET;

// The merge meta consists of a single `u32` storage id.
const MERGE_META_FILE_SIZE: usize = std::mem::size_of::<u32>();

/// Stateless formatter for the version-1 on-disk layouts.
///
/// As written by the `Formatter` implementation in this file:
///
/// * a data-file row is `crc (u32) | expire timestamp (u64) | key size (u64) |
///   value size (u64) | key bytes | value bytes`, header integers little-endian;
/// * a hint-file row is `expire timestamp (u64) | key size (u64) |
///   row offset (u64) | row size (u64) | key bytes`, header integers little-endian;
/// * the merge meta is a single big-endian `u32` storage id.
#[derive(Debug, Clone, Copy, PartialEq, Default)]
pub struct FormatterV1 {}

impl FormatterV1 {
    fn gen_crc<V: Deref<Target = [u8]>>(&self, meta: &RowMeta, key: &[u8], value: &V) -> u32 {
        let crc32 = Crc::<u32>::new(&CRC_32_CKSUM);
        let mut ck = crc32.digest();
        ck.update(&meta.expire_timestamp.to_be_bytes());
        ck.update(&meta.key_size.to_be_bytes());
        ck.update(&value.len().to_be_bytes());
        ck.update(key.as_ref());
        ck.update(value);
        ck.finalize()
    }

    fn gen_crc_by_kv_bytes(&self, meta: &RowMeta, kv: &[u8]) -> u32 {
        let crc32 = Crc::<u32>::new(&CRC_32_CKSUM);
        let mut ck = crc32.digest();
        ck.update(&meta.expire_timestamp.to_be_bytes());
        ck.update(&meta.key_size.to_be_bytes());
        ck.update(&meta.value_size.to_be_bytes());
        ck.update(kv);
        ck.finalize()
    }
}

impl Formatter for FormatterV1 {
    /// Number of bytes occupied by the fixed row header (everything before
    /// the key bytes).
    fn row_header_size(&self) -> usize {
        DATA_FILE_KEY_OFFSET
    }

    /// Total encoded size of `row`: fixed header plus key bytes plus value bytes.
    fn net_row_size<K, V>(&self, row: &RowToWrite<K, V>) -> usize
    where
        K: AsRef<[u8]>,
        V: Deref<Target = [u8]>,
    {
        let key_len = row.key.as_ref().len();
        let value_len = row.value.len();
        self.row_header_size() + key_len + value_len
    }

    /// Serializes `row` into the front of `bs` and returns the number of
    /// bytes the encoded row occupies.
    ///
    /// # Panics
    ///
    /// Panics if `bs` is too short to hold the header, key and value.
    fn encode_row<K, V>(&self, row: &RowToWrite<K, V>, bs: &mut [u8]) -> usize
    where
        K: AsRef<[u8]>,
        V: Deref<Target = [u8]>,
    {
        let meta = &row.meta;
        let key: &[u8] = row.key.as_ref();
        let checksum = self.gen_crc(meta, key, &row.value);

        // Fixed header; all integers are little-endian.
        LittleEndian::write_u32(bs, checksum);
        LittleEndian::write_u64(&mut bs[DATA_FILE_TSTAMP_OFFSET..], meta.expire_timestamp);
        LittleEndian::write_u64(&mut bs[DATA_FILE_KEY_SIZE_OFFSET..], meta.key_size as u64);
        LittleEndian::write_u64(
            &mut bs[DATA_FILE_VALUE_SIZE_OFFSET..],
            meta.value_size as u64,
        );

        // Variable part: key bytes, then value bytes directly after them.
        copy_memory(key, &mut bs[DATA_FILE_KEY_OFFSET..]);
        let value_offset = DATA_FILE_KEY_OFFSET + key.len();
        copy_memory(&row.value, &mut bs[value_offset..]);

        self.net_row_size(row)
    }

    /// Parses the fixed row header found at the start of `bs`.
    ///
    /// # Panics
    ///
    /// Panics if `bs` is shorter than the row header.
    fn decode_row_header(&self, bs: &[u8]) -> RowHeader {
        let u64_field =
            |start: usize, width: usize| LittleEndian::read_u64(&bs[start..start + width]);

        let crc = LittleEndian::read_u32(&bs[..DATA_FILE_TSTAMP_OFFSET]);
        let expire_timestamp = u64_field(DATA_FILE_TSTAMP_OFFSET, TSTAMP_SIZE);
        let key_size = u64_field(DATA_FILE_KEY_SIZE_OFFSET, KEY_SIZE_SIZE) as usize;
        let value_size = u64_field(DATA_FILE_VALUE_SIZE_OFFSET, VALUE_SIZE_SIZE) as usize;

        RowHeader {
            crc,
            meta: RowMeta {
                expire_timestamp,
                key_size,
                value_size,
            },
        }
    }

    /// Recomputes the checksum over `header.meta` and `kv` and compares it
    /// with the checksum stored in `header`.
    ///
    /// `kv` is expected to be the key bytes immediately followed by the value
    /// bytes.
    ///
    /// # Errors
    ///
    /// Returns `FormatterError::CrcCheckFailed` when the two checksums differ.
    fn validate_key_value(&self, header: &RowHeader, kv: &[u8]) -> Result<()> {
        let actual_crc = self.gen_crc_by_kv_bytes(&header.meta, kv);
        if header.crc == actual_crc {
            Ok(())
        } else {
            Err(FormatterError::CrcCheckFailed {
                expected_crc: header.crc,
                actual_crc,
            })
        }
    }

    /// Serializes `hint` into the front of `output` and returns the number of
    /// bytes written (fixed hint header plus key bytes).
    ///
    /// # Panics
    ///
    /// Panics if `output` is too short to hold the hint header and the key.
    fn encode_row_hint(&self, hint: &super::RowHint, output: &mut [u8]) -> usize {
        let header = &hint.header;

        // (offset, value) for each little-endian u64 header field, in file order.
        let fields: [(usize, u64); 4] = [
            (0, header.expire_timestamp),
            (HINT_FILE_KEY_SIZE_OFFSET, header.key_size as u64),
            (HINT_FILE_ROW_OFFSET_OFFSET, header.row_offset as u64),
            (HINT_FILE_ROW_SIZE_OFFSET, header.row_size as u64),
        ];
        for &(offset, value) in fields.iter() {
            LittleEndian::write_u64(&mut output[offset..], value);
        }

        let key: &[u8] = &hint.key;
        copy_memory(key, &mut output[HINT_FILE_KEY_OFFSET..]);
        HINT_FILE_HEADER_SIZE + key.len()
    }

    /// Number of bytes occupied by the fixed hint header (everything before
    /// the key bytes).
    fn row_hint_header_size(&self) -> usize {
        HINT_FILE_HEADER_SIZE
    }

    /// Parses the fixed hint header found at the start of `header_bs`.
    ///
    /// # Panics
    ///
    /// Panics if `header_bs` is shorter than the hint header.
    fn decode_row_hint_header(&self, header_bs: &[u8]) -> RowHintHeader {
        let u64_field = |from: usize, to: usize| LittleEndian::read_u64(&header_bs[from..to]);

        let expire_timestamp = u64_field(0, TSTAMP_SIZE);
        let key_size = u64_field(HINT_FILE_KEY_SIZE_OFFSET, HINT_FILE_ROW_OFFSET_OFFSET) as usize;
        let row_offset =
            u64_field(HINT_FILE_ROW_OFFSET_OFFSET, HINT_FILE_ROW_SIZE_OFFSET) as usize;
        let row_size = u64_field(HINT_FILE_ROW_SIZE_OFFSET, HINT_FILE_KEY_OFFSET) as usize;

        RowHintHeader {
            expire_timestamp,
            key_size,
            row_offset,
            row_size,
        }
    }

    /// Number of bytes produced by [`Self::encode_merge_meta`].
    fn merge_meta_size(&self) -> usize {
        MERGE_META_FILE_SIZE
    }

    /// Serializes `meta` as the big-endian bytes of its `known_max_storage_id`.
    fn encode_merge_meta(&self, meta: &MergeMeta) -> Bytes {
        let id_bytes = meta.known_max_storage_id.to_be_bytes();
        Bytes::copy_from_slice(&id_bytes)
    }

    /// Reads a big-endian `u32` storage id from the front of `meta`.
    ///
    /// # Panics
    ///
    /// Panics if `meta` holds fewer than four bytes (behaviour of
    /// `Buf::get_u32`).
    fn decode_merge_meta(&self, mut meta: Bytes) -> MergeMeta {
        MergeMeta {
            known_max_storage_id: meta.get_u32(),
        }
    }
}

/// Copies all of `src` into the first `src.len()` bytes of `dst`.
///
/// Bytes of `dst` beyond `src.len()` are left untouched.
///
/// # Panics
///
/// Panics if `dst` is shorter than `src`.
fn copy_memory(src: &[u8], dst: &mut [u8]) {
    let len_src = src.len();
    assert!(dst.len() >= len_src);
    // `copy_from_slice` is the safe equivalent of the raw non-overlapping
    // copy; it requires equal lengths, hence the explicit prefix slice.
    dst[..len_src].copy_from_slice(src);
}

#[cfg(test)]
mod tests {
    use crate::formatter::RowHint;

    use super::*;

    use test_log::test;

    /// The merge meta must encode to exactly `merge_meta_size()` bytes and
    /// decode back to an equal value.
    #[test]
    fn test_encode_decode_merge_meta() {
        let merge_meta = MergeMeta {
            known_max_storage_id: 123,
        };

        let formatter = FormatterV1 {};
        let bytes = formatter.encode_merge_meta(&merge_meta);
        assert_eq!(formatter.merge_meta_size(), bytes.len());
        assert_eq!(merge_meta, formatter.decode_merge_meta(bytes));
    }

    /// A hint must round-trip through its header, report the number of bytes
    /// it wrote, and place the key directly after the fixed header.
    #[test]
    fn test_encode_decode_row_hint() {
        let k = b"Hello".to_vec();
        let hint = RowHint {
            header: RowHintHeader {
                expire_timestamp: 12345,
                key_size: k.len(),
                row_offset: 56789,
                row_size: 12345,
            },
            key: k,
        };

        let formatter = FormatterV1 {};
        let mut bs: Vec<u8> = vec![0_u8; 2048];
        let written = formatter.encode_row_hint(&hint, &mut bs);

        let header_size = formatter.row_hint_header_size();
        assert_eq!(header_size + hint.key.len(), written);
        assert_eq!(hint.header, formatter.decode_row_hint_header(&bs));
        assert_eq!(&hint.key[..], &bs[header_size..written]);
    }

    /// A row must round-trip through its header, report its encoded size,
    /// store key and value back to back, and pass (or, once corrupted, fail)
    /// CRC validation.
    #[test]
    fn test_encode_decode_row() {
        let k = b"Hello".to_vec();
        let v = b"World".to_vec();
        let row = RowToWrite {
            meta: RowMeta {
                expire_timestamp: 12345,
                key_size: k.len(),
                value_size: v.len(),
            },
            key: k,
            value: v,
        };

        let formatter = FormatterV1 {};
        let mut bs: Vec<u8> = vec![0_u8; 2048];

        let written = formatter.encode_row(&row, &mut bs);
        assert_eq!(formatter.net_row_size(&row), written);

        let header = formatter.decode_row_header(&bs);
        assert_eq!(row.meta, header.meta);

        let kv = &bs[formatter.row_header_size()..written];
        assert_eq!([&row.key[..], &row.value[..]].concat(), kv);
        assert!(formatter.validate_key_value(&header, kv).is_ok());

        // Flipping a payload byte must be caught by the checksum.
        let mut corrupted = kv.to_vec();
        corrupted[0] ^= 0xFF;
        assert!(formatter.validate_key_value(&header, &corrupted).is_err());
    }
}