block-db 0.2.0

Local, multi-threaded, durable byte DB.
Documentation
// Authors: Robert Lopez

use crate::{data_file::wal::DataFileLog, error::Error, BlockDB, ConfirmDestructiveAction};
use serde::{Deserialize, Serialize};
use std::{
    collections::{HashMap, VecDeque},
    path::PathBuf,
};
use tokio::fs::rename;

/// Action that resolves a corruption of a `BlockDB`.
///
/// Carried inside `Error::Corrupted` and handed back to `BlockDB::uncorrupt`,
/// which performs the steps the variant describes. The type derives
/// `Serialize`/`Deserialize`, so an action can be stored and replayed later.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum UncorruptAction {
    /// Truncate a `DataFile` to zero bytes and reset its in-memory
    /// bookkeeping (size, free/used byte counts, data blocks and free chunk
    /// offsets).
    Clear {
        /// ID of the `DataFile` to clear.
        data_file_id: String,
    },
    /// Run the `BlockDB` recovery routine (`recover`).
    Recover,
    /// Unwind a failed batch: delete the listed `DataFile`s, write the
    /// inverse logs to the WAL of each remaining `DataFile`, then run the
    /// recovery routine.
    BadBatchUnwind {
        /// IDs of the `DataFile`s to delete.
        data_file_ids_to_delete: Vec<String>,
        /// Inverse WAL logs to write, keyed by `DataFile` ID.
        data_file_inverse_log_map: HashMap<String, Vec<DataFileLog>>,
    },
    /// Finish a `DataFile` move by renaming `temp_path` to `path`.
    DataFileCompactMove {
        /// Destination of the rename.
        path: PathBuf,
        /// Source of the rename.
        temp_path: PathBuf,
    },
}

impl BlockDB {
    /// Attempts to resolve a `BlockDB` corruption state.
    ///
    /// Used in response to an `Error::Corrupted`, this method performs the
    /// recovery steps described by the `UncorruptAction` carried in that
    /// error:
    ///
    /// - `Clear`: truncates the named `DataFile` to zero bytes and resets its
    ///   in-memory bookkeeping. An ID with no matching `DataFile` is a no-op.
    /// - `Recover`: runs `recover`.
    /// - `BadBatchUnwind`: deletes the listed `DataFile`s, writes each
    ///   inverse log list to the WAL of its `DataFile`, then runs `recover`.
    /// - `DataFileCompactMove`: renames `temp_path` to `path`.
    ///
    /// # Errors
    ///
    /// The only error this method can return is another `Error::Corrupted`,
    /// indicating that the corruption has not yet been resolved and recovery
    /// should be retried. The returned action describes all work that is
    /// still outstanding, *including the step that just failed*, so passing
    /// it back to `uncorrupt` resumes where this attempt stopped. This allows
    /// multiple safe attempts to uncorrupt the database.
    ///
    /// If specific `DataFile`s remain deadlocked — even after ensuring
    /// filesystem and hardware stability — it likely indicates unrecoverable
    /// corruption in the WAL or binary file contents.
    ///
    /// # Example
    ///
    /// ```ignore
    /// let mut block_db = BlockDB::open("./data", None).await?;
    ///
    /// // ...
    ///
    /// if let Err(err) = block_db
    ///     .batch(vec![b"Hello", b"World"], vec![])
    ///     .await
    /// {
    ///     if let Error::Corrupted { action, .. } = err {
    ///         if let Err(Error::Corrupted { action, .. }) = block_db.uncorrupt(action).await {
    ///             // Store this action to process after ensuring stability
    ///         }
    ///     }
    /// }
    /// ```
    pub async fn uncorrupt(&mut self, action: UncorruptAction) -> Result<(), Error> {
        match action {
            UncorruptAction::Clear { data_file_id } => {
                if let Some(data_file) = self.data_files.read().await.get(&data_file_id) {
                    let mut data_file_writer = data_file.write().await;
                    data_file_writer
                        .truncate(0)
                        .await
                        .map_err(|err| Error::Corrupted {
                            err: Box::new(err),
                            action: UncorruptAction::Clear { data_file_id },
                        })?;

                    // Only reset the in-memory view once the on-disk
                    // truncation has succeeded.
                    data_file_writer.size = 0;
                    data_file_writer.free_bytes = 0;
                    data_file_writer.used_bytes = 0;
                    data_file_writer.data_blocks = HashMap::new();
                    data_file_writer.free_chunk_offsets = VecDeque::new();
                }
            }
            UncorruptAction::Recover => {
                self.recover().await.map_err(|err| Error::Corrupted {
                    err: Box::new(err),
                    action: UncorruptAction::Recover,
                })?;
            }
            UncorruptAction::BadBatchUnwind {
                data_file_ids_to_delete,
                data_file_inverse_log_map,
            } => {
                let mut pending_deletes = VecDeque::from(data_file_ids_to_delete);

                // Peek instead of pop: the ID only leaves the queue after its
                // deletion succeeded, so a failed deletion is still part of
                // the retry action returned to the caller.
                while let Some(data_file_id) = pending_deletes.front().cloned() {
                    if let Err(err) = self
                        .delete_data_file(data_file_id, ConfirmDestructiveAction::IKnowWhatImDoing)
                        .await
                    {
                        return Err(Error::Corrupted {
                            err: Box::new(err),
                            action: UncorruptAction::BadBatchUnwind {
                                data_file_ids_to_delete: pending_deletes.into(),
                                data_file_inverse_log_map,
                            },
                        });
                    }

                    pending_deletes.pop_front();
                }

                let data_files_reader = self.data_files.read().await;
                let mut pending_inverse_logs: VecDeque<(String, Vec<DataFileLog>)> =
                    data_file_inverse_log_map.into_iter().collect();

                while let Some((data_file_id, data_file_logs)) = pending_inverse_logs.pop_front() {
                    match data_files_reader.get(&data_file_id) {
                        Some(data_file) => {
                            // Bind the result so the write guard is released
                            // before the error path is built.
                            let log_result =
                                data_file.write().await.wal.log_many(&data_file_logs).await;

                            if let Err(err) = log_result {
                                // Put the failed entry back so the retry
                                // action still carries its inverse logs.
                                pending_inverse_logs.push_front((data_file_id, data_file_logs));

                                return Err(Error::Corrupted {
                                    err: Box::new(Error::Walr(err)),
                                    action: UncorruptAction::BadBatchUnwind {
                                        // Every deletion has already succeeded.
                                        data_file_ids_to_delete: vec![],
                                        data_file_inverse_log_map: pending_inverse_logs
                                            .into_iter()
                                            .collect(),
                                    },
                                });
                            }
                        }
                        None => {
                            // An unknown ID is a caller bug: loud in debug
                            // builds, skipped in release builds.
                            debug_assert!(
                                false,
                                "BlockDB::uncorrupt was passed a non-existent DataFile ID {data_file_id}"
                            );
                        }
                    }
                }

                // Release the read lock on the data files before `recover` runs.
                drop(data_files_reader);
                self.recover().await.map_err(|err| Error::Corrupted {
                    err: Box::new(err),
                    action: UncorruptAction::Recover,
                })?;
            }
            UncorruptAction::DataFileCompactMove { path, temp_path } => {
                rename(&temp_path, &path)
                    .await
                    .map_err(|err| Error::Corrupted {
                        err: Box::new(Error::IO(err)),
                        action: UncorruptAction::DataFileCompactMove { path, temp_path },
                    })?;
            }
        }

        Ok(())
    }
}