// block-db 0.2.0
//
// Local, multi-threaded, durable byte DB.
// Documentation
// Authors: Robert Lopez

pub(crate) mod attempt_batch;
pub(crate) mod state;

use crate::{
    data_file::DataFile, error::Error, uncorrupt::UncorruptAction, wal::BlockDBLog, BlockDB,
    BlockKey,
};
use state::{BlockDBUpdateState, DataFileUpdateState, FailedBatchState};
use std::sync::Arc;
use tokio::sync::RwLock;

/// Value returned by a successful [`BlockDB::batch`] call.
#[derive(Debug, Clone)]
pub struct BatchResult {
    /// Total number of bytes freed by the batch's frees.
    pub freed_bytes: usize,
    /// Keys of the newly written `DataBlock`s, in the same order the
    /// writes were provided.
    pub new_block_keys: Vec<BlockKey>,
}

impl BlockDB {
    /// Performs a batch of writes followed by a batch of frees,
    /// combined into a single atomic operation.
    ///
    /// Returns a [`BatchResult`] holding:
    /// - `new_block_keys`: the keys for the newly written `DataBlock`s,
    ///   in the same order they were provided.
    /// - `freed_bytes`: the total number of bytes freed by the batch delete.
    ///
    /// Errors:
    /// - A failure while appending to the `BlockDB` WAL is propagated as-is
    ///   through `?`.
    /// - A failure reported by `attempt_batch`, or any failure while logging
    ///   to / undoing / recovering a `DataFile` WAL, is returned as
    ///   `Error::Corrupted` carrying an `UncorruptAction::BadBatchUnwind`.
    ///
    /// ---
    /// - **Atomic**
    /// - **Corruptible**
    ///
    /// ---
    /// Example
    /// ```
    /// let mut block_db = BlockDB::open("./data", None).await?;
    ///
    /// let block_key = block_db.write(b"Shark").await?;
    ///
    /// let BatchResult {
    ///     freed_bytes,
    ///     new_block_keys,
    /// } = block_db
    ///         .batch(
    ///             vec![b"Hello", b"World"],
    ///             vec![&block_key]
    ///         ).await?;
    /// ```
    pub async fn batch<Bytes: AsRef<[u8]>>(
        &mut self,
        writes: Vec<Bytes>,
        frees: Vec<&BlockKey>,
    ) -> Result<BatchResult, Error> {
        // Early-returns `Error::Corrupted` wrapping `$err`, attaching the
        // information `BadBatchUnwind` carries: the ids of data files to
        // delete and the per-data-file inverse logs.
        macro_rules! return_corrupted_error {
            ($err:expr, $data_file_ids_to_delete:expr, $data_file_inverse_log_map:expr) => {
                return Err(Error::Corrupted {
                    err: Box::new($err),
                    action: UncorruptAction::BadBatchUnwind {
                        data_file_ids_to_delete: $data_file_ids_to_delete,
                        data_file_inverse_log_map: $data_file_inverse_log_map,
                    },
                });
            };
        }

        match self.attempt_batch(writes, frees).await {
            Ok(BlockDBUpdateState {
                logs,
                freed_bytes,
                new_block_keys,
                new_data_file_map,
                data_file_log_map,
                data_file_update_map,
                data_file_inverse_log_map,
            }) => {
                // Step 1: append the DB-level logs to the `BlockDB` WAL.
                // NOTE(review): on failure this returns via `?` and drops
                // `data_file_inverse_log_map` without reporting a
                // `Corrupted` error — confirm no unwind is required here.
                self.wal.write().await.log_many(&logs).await?;

                // Ids of the data files added by this batch; they are handed
                // to `BadBatchUnwind` as `data_file_ids_to_delete` if a later
                // step fails.
                let added_data_file_ids = logs
                    .iter()
                    .filter_map(|log| {
                        if let BlockDBLog::AddDataFile(data_file_id) = log {
                            Some(data_file_id.to_string())
                        } else {
                            // This branch is only reached for non-`AddDataFile`
                            // logs, so the assertion always fails here in
                            // debug builds; release builds skip the entry.
                            debug_assert!(
                                matches!(log, BlockDBLog::AddDataFile(_)),
                                "BlockDBUpdateState.logs had a invalid Log {log:?}"
                            );
                            None
                        }
                    })
                    .collect::<Vec<String>>();

                // Step 2: register the new data files in the in-memory map.
                // The write guard is held for the rest of this arm.
                let mut data_files_writer = self.data_files.write().await;
                data_files_writer.extend(new_data_file_map);

                // Data files whose WAL has already been logged to, tracked so
                // they can be undone if a later data file fails to log.
                let mut logged_data_files: Vec<Arc<RwLock<DataFile>>> = vec![];

                // Step 3: append each data file's logs to that file's WAL.
                // Ids not present in `data_files_writer` are silently skipped.
                for (data_file_id, data_file_logs) in data_file_log_map {
                    if let Some(data_file) = data_files_writer.get(&data_file_id) {
                        if let Err(err) =
                            data_file.write().await.wal.log_many(&data_file_logs).await
                        {
                            // Logging failed: undo the WAL of every data file
                            // logged so far and run its `recover`. The first
                            // failure of either step aborts with `Corrupted`.
                            for logged_data_file in logged_data_files {
                                let mut logged_data_file_writer = logged_data_file.write().await;

                                if let Err(err) = logged_data_file_writer.wal.undo().await {
                                    return_corrupted_error!(
                                        Error::Walr(err),
                                        added_data_file_ids,
                                        data_file_inverse_log_map
                                    );
                                } else if let Err(err) = logged_data_file_writer.recover().await {
                                    return_corrupted_error!(
                                        err,
                                        added_data_file_ids,
                                        data_file_inverse_log_map
                                    );
                                }
                            }

                            // Even when every undo/recover succeeded, the
                            // original logging error is still reported as
                            // `Corrupted`.
                            return_corrupted_error!(
                                Error::Walr(err),
                                added_data_file_ids,
                                data_file_inverse_log_map
                            );
                        }

                        logged_data_files.push(data_file.clone());
                    }
                }

                // Step 4: all WALs are logged; overwrite each data file's
                // in-memory bookkeeping with the values computed by
                // `attempt_batch`. Unknown ids are silently skipped.
                for (
                    data_file_id,
                    DataFileUpdateState {
                        new_size,
                        new_free_bytes,
                        new_used_bytes,
                        new_data_blocks,
                        new_free_chunk_offsets,
                    },
                ) in data_file_update_map
                {
                    if let Some(data_file) = data_files_writer.get(&data_file_id) {
                        let mut data_file_writer = data_file.write().await;
                        data_file_writer.size = new_size;
                        data_file_writer.free_bytes = new_free_bytes;
                        data_file_writer.used_bytes = new_used_bytes;
                        data_file_writer.data_blocks = new_data_blocks;
                        data_file_writer.free_chunk_offsets = new_free_chunk_offsets;
                    }
                }

                Ok(BatchResult {
                    freed_bytes,
                    new_block_keys,
                })
            }
            // `attempt_batch` itself failed: report `Corrupted` with the
            // inverse logs it returned and no data file ids to delete.
            Err(FailedBatchState {
                err,
                data_file_inverse_log_map,
            }) => {
                return_corrupted_error!(err, vec![], data_file_inverse_log_map);
            }
        }
    }
}