// block-db 0.2.0
//
// Local, multi-threaded, durable byte DB.
// Documentation
// Authors: Robert Lopez

use super::state::{BlockDBUpdateState, FailedBatchState};
use crate::{
    batch::state::DataFileUpdateState,
    data_file::{wal::DataFileLog, DataFile},
    wal::BlockDBLog,
    BlockDB, BlockKey,
};
use std::{
    collections::{HashMap, HashSet},
    sync::Arc,
};
use tokio::sync::RwLock;

impl BlockDB {
    pub(crate) async fn attempt_batch<Bytes: AsRef<[u8]>>(
        &mut self,
        writes: Vec<Bytes>,
        frees: Vec<&BlockKey>,
    ) -> Result<BlockDBUpdateState, FailedBatchState> {
        let mut writes_index = 0;
        let mut full_data_file_ids: HashSet<String> = HashSet::new();
        let mut used_free_chunk_map = HashMap::new();
        let mut block_db_update_state = BlockDBUpdateState::default();
        let max_file_size = self.options_store.max_file_size();

        macro_rules! attempt_write {
            ($data_file_writer:expr, $data_file_id:expr) => {
                let data_file_logs = block_db_update_state
                    .data_file_log_map
                    .entry($data_file_id.clone())
                    .or_default();

                let data_file_inverse_logs = block_db_update_state
                    .data_file_inverse_log_map
                    .entry($data_file_id.clone())
                    .or_default();

                let used_free_chunks = used_free_chunk_map
                    .entry($data_file_id.clone())
                    .or_default();

                let DataFileUpdateState {
                    new_size,
                    new_free_bytes,
                    new_used_bytes,
                    new_data_blocks,
                    new_free_chunk_offsets,
                } = block_db_update_state
                    .data_file_update_map
                    .entry($data_file_id.clone())
                    .or_insert(DataFileUpdateState::from(&*$data_file_writer));

                while !full_data_file_ids.contains(&$data_file_id) && writes_index < writes.len() {
                    match $data_file_writer
                        .attempt_write(
                            writes[writes_index].as_ref(),
                            new_size,
                            new_free_bytes,
                            new_used_bytes,
                            used_free_chunks,
                            new_free_chunk_offsets,
                        )
                        .await
                    {
                        Ok((data_block_id, data_block)) => {
                            if *new_size >= max_file_size {
                                full_data_file_ids.insert($data_file_id.clone());
                            }

                            data_file_logs.push(DataFileLog::Write {
                                new_block: data_block.clone(),
                                data_block_id: data_block_id.clone(),
                                used_free_chunks: used_free_chunks.clone(),
                            });
                            data_file_inverse_logs.push(DataFileLog::Free(data_block_id.clone()));

                            new_data_blocks.insert(data_block_id.clone(), data_block);

                            block_db_update_state.new_block_keys.push(BlockKey {
                                data_file_id: $data_file_id.clone(),
                                data_block_id,
                            });

                            writes_index += 1;
                        }
                        Err(err) => {
                            return block_db_update_state.to_failed_state(err);
                        }
                    }
                }
            };
        }

        // 1: Write to non-full DataFile(s)
        for (data_file_id, data_file) in self.data_files.read().await.clone() {
            if writes_index >= writes.len() {
                break;
            }

            if full_data_file_ids.contains(&data_file_id) {
                continue;
            }

            attempt_write!(data_file.write().await, data_file_id);
        }

        // 2: Create new DataFile(s) to house the remaining writes
        while writes_index < writes.len() {
            let new_data_file_id = DataFile::generate_id();
            let new_data_file_path = self.path.join(format!("data_files/{new_data_file_id}"));

            match DataFile::open(
                new_data_file_id.clone(),
                new_data_file_path.clone(),
                self.options_store.clone(),
            )
            .await
            {
                Ok(mut data_file) => {
                    attempt_write!(&mut data_file, new_data_file_id.clone());

                    block_db_update_state
                        .logs
                        .push(BlockDBLog::AddDataFile(new_data_file_id.clone()));

                    block_db_update_state
                        .new_data_file_map
                        .insert(new_data_file_id, Arc::new(RwLock::new(data_file)));
                }
                Err(err) => {
                    return block_db_update_state.to_failed_state(err);
                }
            }
        }

        // 3: Process frees last to avoid loss of valid data on rollback
        let data_files_reader = self.data_files.read().await;

        for BlockKey {
            data_file_id,
            data_block_id,
        } in frees
        {
            if let Some(data_file) = data_files_reader.get(data_file_id) {
                let DataFileUpdateState {
                    new_free_bytes,
                    new_used_bytes,
                    new_data_blocks,
                    new_free_chunk_offsets,
                    ..
                } = block_db_update_state
                    .data_file_update_map
                    .entry(data_file_id.to_string())
                    .or_insert(DataFileUpdateState::from(&data_file.write().await));

                if let Some(mut data_block) = new_data_blocks.remove(data_block_id) {
                    let data_block_size = data_block.size(&self.options_store);

                    *new_free_bytes += data_block_size;
                    *new_used_bytes -= data_block_size;
                    new_free_chunk_offsets.append(&mut data_block.chunk_offsets);

                    block_db_update_state.freed_bytes += data_block_size;

                    block_db_update_state
                        .data_file_log_map
                        .entry(data_file_id.to_string())
                        .or_default()
                        .push(DataFileLog::Free(data_block_id.to_string()));
                }
            }
        }

        Ok(block_db_update_state)
    }
}