block-db 0.2.0

Local, multi-threaded, durable byte DB.
Documentation
// Authors: Robert Lopez

use super::{error::Error, BlockDB};
use serde::{Deserialize, Serialize};
use std::{
    collections::HashSet,
    ops::{Deref, DerefMut},
    path::PathBuf,
    sync::Arc,
};
use tokio::sync::Mutex;
use walr::{options::WALOptionsBuilder, ReplayHandlerResult, WAL};

#[derive(Debug, Serialize, Deserialize)]
pub enum BlockDBLog {
    AddDataFile(String),
    AddDataFiles(HashSet<String>),
    DeleteDataFile(String),
    DeleteDataFiles,
}

#[derive(Debug, Default, Serialize, Deserialize)]
pub struct BlockDBState {
    pub valid_data_files: HashSet<String>,
}

#[derive(Debug)]
pub struct BlockDBWAL(WAL<BlockDBLog, BlockDBState, ()>);

impl Deref for BlockDBWAL {
    type Target = WAL<BlockDBLog, BlockDBState, ()>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl DerefMut for BlockDBWAL {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}

fn replay_handler(
    logs: Vec<BlockDBLog>,
    checkpoint: Arc<Mutex<BlockDBState>>,
    _context: Option<()>,
) -> ReplayHandlerResult {
    Box::pin(async move {
        let mut checkpoint = checkpoint.lock().await;

        for log in logs {
            match log {
                BlockDBLog::AddDataFile(id) => {
                    checkpoint.valid_data_files.insert(id);
                }
                BlockDBLog::AddDataFiles(ids) => {
                    checkpoint.valid_data_files.extend(ids);
                }
                BlockDBLog::DeleteDataFile(ref id) => {
                    checkpoint.valid_data_files.remove(id);
                }
                BlockDBLog::DeleteDataFiles => {
                    checkpoint.valid_data_files.clear();
                }
            }
        }

        Ok(())
    })
}

impl BlockDBWAL {
    pub async fn open(data_path: PathBuf) -> Result<Self, Error> {
        Ok(Self(
            WAL::open(
                data_path,
                Box::new(replay_handler),
                Some(
                    WALOptionsBuilder::new()
                        .max_size((size_of::<BlockDBLog>() + 2) * 1_000)
                        .build(),
                ),
            )
            .await?,
        ))
    }
}

impl BlockDB {
    pub async fn wal_checkpoint_handler(
        &self,
        wal_writer: &mut WAL<BlockDBLog, BlockDBState, ()>,
    ) -> Result<(), Error> {
        if wal_writer.should_checkpoint() {
            wal_writer
                .checkpoint(&BlockDBState {
                    valid_data_files: self.data_files.read().await.keys().cloned().collect(),
                })
                .await?;
        }

        Ok(())
    }
}