// block-db 0.2.0
//
// Local, multi-threaded, durable byte DB.
// Authors: Robert Lopez

use super::{data_block::DataBlock, DataFile};
use crate::{error::Error, options::store::OptionsStore};
use serde::{Deserialize, Serialize};
use std::{
    collections::{HashMap, HashSet, VecDeque},
    ops::{Deref, DerefMut},
    path::PathBuf,
    sync::Arc,
};
use tokio::sync::Mutex;
use walr::{options::WALOptionsBuilder, ReplayHandlerResult, WAL};

/// One entry in a data file's write-ahead log.
///
/// Each variant describes a mutation that `replay_handler` re-applies to a
/// [`DataFileState`] when the WAL is replayed.
///
/// NOTE(review): depending on the serde format walr uses, variant order may be
/// part of the on-disk encoding — do not reorder or insert variants mid-list
/// without confirming.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum DataFileLog {
    /// Releases the data block with this id: its size moves from used to free
    /// bytes and its chunk offsets are appended to the free-chunk list.
    /// Ids not present in the block map are ignored.
    Free(String),
    /// Resets the state to `DataFileState::default()`.
    Clear,
    /// Records a single newly written block.
    Write {
        /// The block being stored; its size is added to used bytes.
        new_block: DataBlock,
        /// Id under which `new_block` is inserted into the block map.
        data_block_id: String,
        /// Free-chunk offsets reused by this write. They are removed from the
        /// free-chunk list and `len * chunk_size` bytes are subtracted from
        /// free bytes.
        used_free_chunks: HashSet<usize>,
    },
    /// Records several blocks written in one operation.
    WriteMany {
        /// `(id, block)` pairs inserted into the block map; each block's size
        /// is added to used bytes.
        new_blocks: Vec<(String, DataBlock)>,
        /// Free-chunk offsets reused across all of `new_blocks`, handled the
        /// same way as in `Write`.
        used_free_chunks: HashSet<usize>,
    },
    /// Replaces the state after compaction: used bytes become `new_size`, free
    /// bytes become 0, the block map is replaced wholesale, and the free-chunk
    /// list is cleared.
    Compact {
        /// New value for used bytes.
        new_size: usize,
        /// Block map that replaces the current one.
        new_data_blocks: HashMap<String, DataBlock>,
    },
    /// Same as `Free`, applied to each id in order; unknown ids are skipped.
    FreeMany(Vec<String>),
}

/// In-memory bookkeeping for a data file. It is rebuilt from the WAL by
/// `replay_handler` and snapshotted as a checkpoint by
/// `DataFile::wal_checkpoint_handler`.
///
/// NOTE(review): field order may matter to the checkpoint's serialized form,
/// depending on the serde format walr uses — confirm before reordering.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct DataFileState {
    /// Bytes of freed space tracked for reuse. Grows by a block's size when it
    /// is freed and shrinks by `chunk_size` per reused free chunk.
    pub free_bytes: usize,
    /// Bytes occupied by live data blocks: the sum of `DataBlock::size`, or
    /// `new_size` after a compaction.
    pub used_bytes: usize,
    /// Live data blocks keyed by data block id.
    pub data_blocks: HashMap<String, DataBlock>,
    /// Offsets of freed chunks that are available for reuse.
    pub free_chunk_offsets: VecDeque<usize>,
}

/// Context handed to `replay_handler`, which reads chunk and block sizes from
/// this options store while replaying logs.
pub type DataFileReplayContext = Arc<OptionsStore>;

/// Newtype around walr's [`WAL`], specialized to data-file logs
/// ([`DataFileLog`]), checkpoint state ([`DataFileState`]) and replay context
/// ([`DataFileReplayContext`]). It derefs to the inner `WAL`.
#[derive(Debug)]
pub struct DataFileWAL(WAL<DataFileLog, DataFileState, DataFileReplayContext>);

impl Deref for DataFileWAL {
    type Target = WAL<DataFileLog, DataFileState, DataFileReplayContext>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl DerefMut for DataFileWAL {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}

fn replay_handler(
    logs: Vec<DataFileLog>,
    checkpoint: Arc<Mutex<DataFileState>>,
    context: Option<DataFileReplayContext>,
) -> ReplayHandlerResult {
    Box::pin(async move {
        let mut checkpoint = checkpoint.lock().await;
        let options_store = context.unwrap();

        for log in logs {
            match log {
                DataFileLog::Clear => {
                    *checkpoint = DataFileState::default();
                }
                DataFileLog::Write {
                    new_block,
                    data_block_id,
                    used_free_chunks,
                } => {
                    if !used_free_chunks.is_empty() {
                        checkpoint.free_chunk_offsets = checkpoint
                            .free_chunk_offsets
                            .iter()
                            .filter_map(|offset| {
                                if used_free_chunks.contains(offset) {
                                    None
                                } else {
                                    Some(*offset)
                                }
                            })
                            .collect();
                    }

                    checkpoint.used_bytes += new_block.size(&options_store);
                    checkpoint.free_bytes -= used_free_chunks.len() * options_store.chunk_size();
                    checkpoint.data_blocks.insert(data_block_id, new_block);
                }
                DataFileLog::WriteMany {
                    new_blocks,
                    used_free_chunks,
                } => {
                    if !used_free_chunks.is_empty() {
                        checkpoint.free_chunk_offsets = checkpoint
                            .free_chunk_offsets
                            .iter()
                            .filter_map(|offset| {
                                if used_free_chunks.contains(offset) {
                                    None
                                } else {
                                    Some(*offset)
                                }
                            })
                            .collect();
                    }

                    checkpoint.free_bytes -= used_free_chunks.len() * options_store.chunk_size();

                    for (id, new_block) in new_blocks {
                        checkpoint.used_bytes += new_block.size(&options_store);
                        checkpoint.data_blocks.insert(id, new_block);
                    }
                }
                DataFileLog::Free(id) => {
                    if let Some(mut data_block) = checkpoint.data_blocks.remove(&id) {
                        let size = data_block.size(&options_store);

                        checkpoint.used_bytes -= size;
                        checkpoint.free_bytes += size;
                        checkpoint
                            .free_chunk_offsets
                            .append(&mut data_block.chunk_offsets);
                    }
                }
                DataFileLog::FreeMany(ids) => {
                    for id in ids {
                        if let Some(mut data_block) = checkpoint.data_blocks.remove(&id) {
                            let size = data_block.size(&options_store);

                            checkpoint.used_bytes -= size;
                            checkpoint.free_bytes += size;
                            checkpoint
                                .free_chunk_offsets
                                .append(&mut data_block.chunk_offsets);
                        }
                    }
                }
                DataFileLog::Compact {
                    new_size,
                    new_data_blocks,
                } => {
                    checkpoint.used_bytes = new_size;
                    checkpoint.free_bytes = 0;
                    checkpoint.data_blocks = new_data_blocks;
                    checkpoint.free_chunk_offsets.clear();
                }
            }
        }

        Ok(())
    })
}

impl DataFileWAL {
    /// Opens the data file's write-ahead log stored at `data_path`, replaying
    /// existing logs through `replay_handler`.
    ///
    /// # Errors
    ///
    /// Propagates any error from `WAL::open`, converted into [`Error`].
    pub async fn open(data_path: PathBuf) -> Result<Self, Error> {
        // Capacity heuristic: room for this many log entries before the WAL
        // reports it should checkpoint (presumably a byte budget — confirm
        // against walr's `max_size` docs).
        const LOG_ENTRY_BUDGET: usize = 1_000;
        // Extra bytes budgeted per entry on top of the enum's size.
        const PER_ENTRY_OVERHEAD: usize = 2;

        // NOTE(review): `size_of` measures only the inline size of
        // `DataFileLog`; heap-owned contents (`HashMap`, `Vec`, `String`) of a
        // serialized entry are not counted.
        let max_size = (size_of::<DataFileLog>() + PER_ENTRY_OVERHEAD) * LOG_ENTRY_BUDGET;
        let wal_options = WALOptionsBuilder::new().max_size(max_size).build();

        let inner = WAL::open(data_path, Box::new(replay_handler), Some(wal_options)).await?;

        Ok(Self(inner))
    }
}

impl DataFile {
    /// Writes a checkpoint of this data file's in-memory state to the WAL when
    /// `should_checkpoint()` says one is due; otherwise does nothing.
    ///
    /// The block map and free-chunk list are cloned into an owned
    /// [`DataFileState`] snapshot, which is passed to `checkpoint` by reference.
    ///
    /// # Errors
    ///
    /// Propagates any error from the WAL's `checkpoint`, converted into [`Error`].
    pub async fn wal_checkpoint_handler(&mut self) -> Result<(), Error> {
        if !self.wal.should_checkpoint() {
            return Ok(());
        }

        let snapshot = DataFileState {
            free_bytes: self.free_bytes,
            used_bytes: self.used_bytes,
            data_blocks: self.data_blocks.clone(),
            free_chunk_offsets: self.free_chunk_offsets.clone(),
        };
        self.wal.checkpoint(&snapshot).await?;

        Ok(())
    }
}