// block-db 0.2.0
//
// Local, multi-threaded, durable byte DB.
// Documentation
// Authors: Robert Lopez

use super::{
    assert::{assert_read_is, assert_read_is_none},
    state::ChunkCount,
    TestDB,
};
use crate::{batch::BatchResult, BlockKey, size::ByteSize, tests::util::size_bytes};

impl TestDB {
    /// Runs one batch of writes and frees against the wrapped `block_db` and
    /// checks the outcome against the bookkeeping held in `test_state`.
    ///
    /// Steps, in order:
    /// 1. Sum the expected freed byte count from the payloads in `frees`
    ///    (each sized with `size_bytes` and `self.chunk_size`).
    /// 2. Call `block_db.batch` with the writes and the keys to free.
    /// 3. Compare the reported `freed_bytes` with the expectation.
    /// 4. Update `test_state` (data file count, DB-wide and per-data-file
    ///    byte sizes, per-data-file chunk counts) for every write, then for
    ///    every free.
    /// 5. Run the read assertions for freed and newly written keys, then
    ///    `test_state.assert_against_block_db`.
    ///
    /// # Parameters
    /// * `writes` - payloads to write; one new `BlockKey` is expected per entry.
    /// * `frees` - keys to free, each paired with the bytes used to size the
    ///   freed block.
    /// * `new_data_files` - amount added to `test_state.data_file_count`.
    ///
    /// # Returns
    /// The `new_block_keys` reported by `block_db.batch`.
    ///
    /// # Errors
    /// Returns `Err(String)` when `block_db.batch` fails, when the reported
    /// freed byte count differs from the expected one, when the number of
    /// returned keys differs from the number of writes, or when
    /// `assert_against_block_db` fails.
    ///
    /// NOTE(review): `assert_read_is_none!` / `assert_read_is!` are defined
    /// elsewhere; presumably they read the key back and fail on a mismatch —
    /// confirm against the macro definitions.
    pub async fn batch<Bytes: AsRef<[u8]> + Clone>(
        &mut self,
        writes: Vec<Bytes>,
        frees: Vec<(&BlockKey, Bytes)>,
        new_data_files: usize,
    ) -> Result<Vec<BlockKey>, String> {
        let expected_freed_bytes = frees
            .iter()
            .map(|(_, b)| size_bytes(b.as_ref(), self.chunk_size))
            .sum::<usize>();

        let BatchResult {
            freed_bytes,
            new_block_keys,
        } = self
            .block_db
            .batch(
                writes.clone(),
                // Only the keys go to the DB; copying the references avoids
                // cloning every payload in `frees`.
                frees.iter().map(|(key, _)| *key).collect(),
            )
            .await
            .map_err(|err| format!("Failed to batch \n {err:?}"))?;

        if freed_bytes != expected_freed_bytes {
            return Err(format!(
                "{freed_bytes} != {expected_freed_bytes} \n Invalid freed bytes count"
            ));
        }

        // The loops below pair writes with keys by position; report a count
        // mismatch as an error instead of panicking on an out-of-range index.
        if new_block_keys.len() != writes.len() {
            return Err(format!(
                "{} != {} \n Invalid new block key count",
                new_block_keys.len(),
                writes.len()
            ));
        }

        self.test_state.data_file_count += new_data_files;

        let ByteSize {
            used_bytes: block_db_used_bytes,
            free_bytes: block_db_free_bytes,
        } = &mut self.test_state.block_db_byte_size;

        for (bytes, block_key) in writes.iter().zip(&new_block_keys) {
            let data_file_id = &block_key.data_file_id;

            let ByteSize {
                used_bytes: data_file_used_bytes,
                free_bytes: data_file_free_bytes,
            } = self
                .test_state
                .data_file_byte_size_map
                .entry(data_file_id.to_string())
                .or_default();

            let ChunkCount { used, free } = self
                .test_state
                .data_file_chunk_count_map
                .entry(data_file_id.to_string())
                .or_default();

            let bytes_size = size_bytes(bytes.as_ref(), self.chunk_size);
            let introduced_chunks = bytes_size / self.chunk_size;

            // The model takes chunks from the data file's free count first;
            // whatever is left over counts as newly introduced chunks.
            let reused_chunks = introduced_chunks.min(*free);
            let reused_bytes = reused_chunks * self.chunk_size;

            // Reused plus newly introduced bytes together cover every
            // introduced chunk.
            let introduced_bytes = introduced_chunks * self.chunk_size;

            *block_db_used_bytes += introduced_bytes;
            *block_db_free_bytes -= reused_bytes;

            *data_file_used_bytes += introduced_bytes;
            *data_file_free_bytes -= reused_bytes;

            *free -= reused_chunks;
            *used += introduced_chunks;
        }

        for (block_key, freeing_bytes) in frees {
            let data_file_id = &block_key.data_file_id;

            let ByteSize {
                used_bytes: data_file_used_bytes,
                free_bytes: data_file_free_bytes,
            } = self
                .test_state
                .data_file_byte_size_map
                .entry(data_file_id.to_string())
                .or_default();

            let ChunkCount { used, free } = self
                .test_state
                .data_file_chunk_count_map
                .entry(data_file_id.to_string())
                .or_default();

            let released_bytes = size_bytes(freeing_bytes.as_ref(), self.chunk_size);
            let released_chunks = released_bytes / self.chunk_size;

            // Move the released chunks and bytes from the used side to the
            // free side at every level of the model.
            *used -= released_chunks;
            *free += released_chunks;

            *block_db_free_bytes += released_bytes;
            *block_db_used_bytes -= released_bytes;

            *data_file_free_bytes += released_bytes;
            *data_file_used_bytes -= released_bytes;

            assert_read_is_none!(&self.block_db, block_key);
        }

        for (index, new_block_key) in new_block_keys.iter().enumerate() {
            assert_read_is!(&self.block_db, new_block_key, writes[index]);
        }

        self.test_state
            .assert_against_block_db(&self.block_db)
            .await?;

        Ok(new_block_keys)
    }
}