tempest-kv 0.0.2

Key-Value storage layer for TempestDB
Documentation
use std::path::PathBuf;

use bytes::BytesMut;
use tempest_io::{Io, IoBuf, OpenOptions, Statx};
use tempest_rt::{close_file, open_file, read_exact, stat_file};
use zerocopy::Ref;

use crate::{
    base::Comparer,
    sst::{SST_MAGICNUM, SstFooter, SstReadError, bloom::BloomFilter, index::BlockIndex},
};

pub(crate) struct SstHandle<I: Io, C: Comparer> {
    pub(super) fd: I::Fd,
    pub(super) bloom_filter: BloomFilter,
    pub(super) block_index: BlockIndex<C>,
}

impl<I: Io, C: Comparer> SstHandle<I, C> {
    pub(crate) async fn close(self) -> Result<(), SstReadError> {
        close_file::<I>(self.fd).await?;
        Ok(())
    }
}

#[instrument(level = "debug")]
pub(crate) async fn load<I: Io, C: Comparer>(
    path: PathBuf,
) -> Result<SstHandle<I, C>, SstReadError> {
    let fd = open_file::<I>(path, OpenOptions::new().read(true)).await?;
    let statx = stat_file::<I>(fd).await?;
    let file_size = statx.stx_size();
    debug!(file_size, "opened SST file");

    const FOOTER_SIZE: usize = size_of::<SstFooter>();
    let footer_offset = file_size - FOOTER_SIZE as u64;

    // PERF: we can initialize a small fixed size buffer here, which saves us from doing a heap
    // allocation. the array gets stored in the future, which is fine for small buffers
    let (result, s) = read_exact::<_, I>(fd, [0u8; FOOTER_SIZE], footer_offset).await;
    result?;
    let footer = Ref::<_, SstFooter>::from_bytes(s.as_ref())
        .expect("footer buf should have read exactly size_of(footer) bytes");
    // TODO: validate checksum
    if footer.magic != *SST_MAGICNUM {
        close_file::<I>(fd).await.unwrap();
        return Err(SstReadError::InvalidMagic);
    }
    debug!(?footer, "read SST footer");

    // -- read bloom filter --
    // TODO: validate checksum
    let bloom_size = footer.bloom_size.get() as usize;
    let bloom_offset = footer.bloom_offset.get();
    let bloom_buf = BytesMut::with_capacity(bloom_size);
    let (result, s) = read_exact::<_, I>(fd, bloom_buf.slice(..bloom_size), bloom_offset).await;
    result?;
    let bloom_bits = s.into_inner().freeze();
    assert_eq!(bloom_bits.len(), bloom_size);
    let bloom_filter = BloomFilter::from_parts(bloom_bits, footer.bloom_footer.clone());
    debug!(size = bloom_size, "read bloom filter");

    // -- read index --
    // TODO: validate checksum
    let index_size = footer.index_size.get() as usize;
    let index_offset = footer.index_offset.get();
    let index_buf = BytesMut::with_capacity(index_size);
    let (result, s) = read_exact::<_, I>(fd, index_buf.slice(..index_size), index_offset).await;
    result?;
    let index_raw = s.into_inner().freeze();
    assert_eq!(index_raw.len(), index_size);
    let block_index = BlockIndex::new(index_raw);
    debug!(size = index_size, "read block index");

    debug!("SST loaded");
    Ok(SstHandle {
        fd,
        bloom_filter,
        block_index,
    })
}