// tempest-kv 0.0.2
//
// Key-value storage layer for TempestDB.
// Documentation
use std::{mem::replace, path::PathBuf};

use bytes::{Bytes, BytesMut};
use serde::{Deserialize, Serialize};
use tempest_core::utils::{ByteSize, HexU64};
use tempest_io::{Io, OpenOptions};
use tempest_rt::{close_file, open_file, write_exact};
use zerocopy::IntoBytes;

use crate::{
    StorageError,
    base::{Comparer, SeqNum},
    config::SstWriteConfig,
    iterator::StorageIterator,
    sst::{
        SstFooter,
        block::{BlockBuilder, BlockBuilderStatus},
        bloom::BloomFilterBuilder,
        index::IndexBuilder,
    },
};

/// Summary statistics collected while writing a single SST file.
///
/// The `Debug` output renders `file_size` in human-readable form and shows the key and
/// sequence-number ranges as closed intervals.
#[derive(derive_more::Debug, Clone, Serialize, Deserialize)]
#[debug(
    "SstWriteStats {{ file_size: {:?}, entry_count: {}, key_range: [{:?}, {:?}], seqnum_range: [{}, {}] }}",
    ByteSize(*file_size), entry_count, min_key, max_key, min_seqnum.get(), max_seqnum.get(),
)]
pub(crate) struct SstWriteStats {
    /// Total number of bytes written to the file (blocks, bloom filter, index and footer).
    pub(crate) file_size: u64,
    /// Number of entries written.
    pub(crate) entry_count: u64,
    /// Key of the first entry written (assumes the source yields keys in sorted order — TODO
    /// confirm).
    pub(crate) min_key: Bytes,
    /// Key of the last entry written (same sorted-order assumption as `min_key`).
    pub(crate) max_key: Bytes,
    /// Smallest sequence number seen across all written entries.
    pub(crate) min_seqnum: SeqNum,
    /// Largest sequence number seen across all written entries.
    pub(crate) max_seqnum: SeqNum,
}

impl Default for SstWriteStats {
    fn default() -> Self {
        Self {
            file_size: 0,
            entry_count: 0,
            min_key: Bytes::new(),
            max_key: Bytes::new(),
            // NB: will automatically fall back to the minimum on the first write
            min_seqnum: SeqNum::MAX,
            // NB: will automatically fall back to the maximum on the first write
            max_seqnum: SeqNum::ZERO,
        }
    }
}

/// Writes every entry yielded by `source` into a new SST file at `path`.
///
/// File layout, in order:
/// 1. the data blocks;
/// 2. the bloom filter bits;
/// 3. the index;
/// 4. an [`SstFooter`], which records the offset and size of the bloom filter and of the index.
///
/// The file is created if it is missing and truncated if it already exists.
///
/// * `entry_estimate` – expected number of entries; used only to size the bloom filter.
/// * `config` – block target size, block restart interval and bloom false-positive rate.
/// * `source` – the entries to write. Keys are recorded in the order they are yielded
///   (presumably sorted according to `C` — TODO confirm callers guarantee this).
///
/// Returns the [`SstWriteStats`] collected while writing.
///
/// # Errors
///
/// Returns an error if any of these fail:
/// * opening the file;
/// * advancing `source`;
/// * any write;
/// * closing the file.
///
/// Once the file has been opened it is always closed, including on failure. A partially
/// written file is left on disk.
///
/// # Panics
///
/// Panics if a single data block, the bloom filter, or the index is 4GiB or larger, because
/// their sizes are stored as `u32`.
#[instrument(skip(config, source), level = "debug")]
pub(crate) async fn write<I, C, S>(
    path: PathBuf,
    entry_estimate: u64,
    config: SstWriteConfig,
    mut source: S,
) -> Result<SstWriteStats, StorageError>
where
    I: Io,
    C: Comparer,
    S: StorageIterator<I, C>,
{
    // -- create file --
    let fd = open_file::<I>(
        path,
        OpenOptions::new().write(true).create(true).truncate(true),
    )
    .await?;

    // NB: all writing happens inside this block so that an error anywhere below (from the
    // source or from a write) still reaches `close_file`; a bare `?` in the function body
    // would return early and leak the descriptor.
    let result = async {
        let mut filepos = 0u64;

        // -- set up builders and begin stat tracking --
        let mut stats = SstWriteStats::default();

        let mut block_builder =
            BlockBuilder::new(config.block_target_size, config.block_restart_interval);
        let mut bloom_filter_builder =
            BloomFilterBuilder::new(entry_estimate as usize, config.bloom_false_positive_rate);
        let mut index_builder = IndexBuilder::new();

        // key of the most recent entry added to the current block; it is pushed into the
        // index as that block's key when the block is flushed
        let mut last_key = None;
        let mut block_count = 0u64;
        let mut total_blocks_size = 0u64;

        // -- write blocks --
        let mut exhausted = false;
        while !exhausted {
            let flush = match source.next().await? {
                Some((key, value)) => {
                    let status = block_builder.write_entry(key.key(), key.trailer(), value);

                    if stats.entry_count == 0 {
                        stats.min_key = key.key().clone();
                    }
                    stats.max_key = key.key().clone();
                    stats.min_seqnum = stats.min_seqnum.min(key.trailer().seqnum());
                    stats.max_seqnum = stats.max_seqnum.max(key.trailer().seqnum());
                    stats.entry_count += 1;

                    let (prefix, _) = key.split_up();
                    bloom_filter_builder.insert(prefix);

                    last_key = Some(key.clone());

                    status == BlockBuilderStatus::Full
                }
                // NB: most of the time the source is exhausted in the middle of a block, so
                // whatever the builder still holds must be flushed one last time
                None => {
                    exhausted = true;
                    block_builder.get_status() != BlockBuilderStatus::Empty
                }
            };
            if !flush {
                continue;
            }

            let block = replace(
                &mut block_builder,
                BlockBuilder::new(config.block_target_size, config.block_restart_interval),
            )
            .finalize();
            let offset = filepos;
            // NB: the index stores block lengths as `u32`; refuse to truncate silently
            let block_len: u32 = block.len().try_into().expect("block size exceeds 4GiB");
            let block_size = u64::from(block_len);
            filepos += block_size;
            total_blocks_size += block_size;
            index_builder.push(
                &last_key.take().expect("a non-empty block has a last key"),
                offset,
                block_len,
            );
            trace!(block = block_count, size = ?ByteSize(block_size), offset = ?HexU64(offset), "flushed block");
            block_count += 1;
            write_exact::<_, I>(fd, block, offset).await.0?;
        }
        debug!(block_count, size = ?ByteSize(total_blocks_size), "wrote all blocks");

        let bloom_filter = bloom_filter_builder.finalize();
        let bloom_footer = bloom_filter.footer();
        let bloom_bits = bloom_filter.bits().clone();

        let index = index_builder.finalize();

        // -- write sst bloom filter --
        let bloom_offset = filepos;
        let bloom_size = bloom_bits.len();
        filepos += bloom_size as u64;
        write_exact::<_, I>(fd, bloom_bits, bloom_offset).await.0?;
        debug!(size = ?ByteSize(bloom_size as u64), offset = ?HexU64(bloom_offset), "wrote bloom filter");

        // -- write sst index --
        let index_offset = filepos;
        let index_size = index.len();
        filepos += index_size as u64;
        write_exact::<_, I>(fd, index, index_offset).await.0?;
        debug!(size=?ByteSize(index_size as u64), offset=?HexU64(index_offset), "wrote index");

        // -- write sst footer --
        let footer = SstFooter::new(
            bloom_offset,
            bloom_size.try_into().expect("bloom size exceeds 4GiB"),
            bloom_footer,
            index_offset,
            index_size.try_into().expect("index size exceeds 4GiB"),
        );
        let footer_offset = filepos;
        let footer_bytes = footer.as_bytes();
        let footer_size = footer_bytes.len();
        let mut footer_buf = BytesMut::with_capacity(footer_size);
        footer_buf.extend_from_slice(footer_bytes);
        filepos += footer_size as u64;
        write_exact::<_, I>(fd, footer_buf, footer_offset).await.0?;
        debug!(offset = ?HexU64(footer_offset), "wrote sst footer");

        stats.file_size = filepos;

        // NB: the turbofish pins the block's error type so `?` inside it can convert into it
        Ok::<SstWriteStats, StorageError>(stats)
    }
    .await;

    // Always close the descriptor. If writing failed, surface that error in preference to a
    // secondary close error.
    let close_result = close_file::<I>(fd).await;
    let stats = result?;
    close_result?;

    debug!(?stats, "finished writing sst");
    Ok(stats)
}