pub mod batch;
pub mod error;
pub mod options;
pub mod size;
pub mod uncorrupt;
pub(crate) mod clear_data_file;
pub(crate) mod compact_data_file;
pub(crate) mod data_file;
pub(crate) mod delete;
pub(crate) mod delete_data_file;
pub(crate) mod free;
pub(crate) mod read;
pub(crate) mod recover;
pub(crate) mod recover_data_file;
pub(crate) mod util;
pub(crate) mod wal;
pub(crate) mod write;
#[cfg(test)]
mod tests;
use data_file::DataFile;
use error::Error;
use options::{store::OptionsStore, BlockDBOptions};
use serde::{Deserialize, Serialize};
use std::{
collections::HashMap,
path::{Path, PathBuf},
sync::Arc,
};
use tokio::sync::RwLock;
use util::fs::{create_directory, delete_directory, delete_temp_files, DirectoryIter};
use wal::{BlockDBState, BlockDBWAL};
/// Pair of string identifiers: one for a data file and one for a data block.
///
/// NOTE(review): presumably this addresses a single block inside a single
/// data file — confirm against the read/write modules, which are not visible
/// here.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub struct BlockKey {
/// Identifier of the data file.
pub data_file_id: String,
/// Identifier of the data block.
pub data_block_id: String,
}
/// Single-variant marker enum.
///
/// NOTE(review): judging by its name, callers are presumably required to pass
/// this value explicitly to acknowledge a destructive operation — the call
/// sites are not visible in this file, so confirm there.
///
/// The common traits are derived so the value can be logged, compared, and
/// passed by value without ceremony.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConfirmDestructiveAction {
/// The only variant; naming it at the call site is the acknowledgement.
IKnowWhatImDoing,
}
/// Handle to a block database rooted at a directory on disk.
///
/// Every field is wrapped in an [`Arc`], so the derived `Clone` produces a
/// handle that shares the same underlying state rather than copying it.
#[derive(Debug, Clone)]
pub struct BlockDB {
/// Root directory of the database, as given to `BlockDB::open`.
pub path: Arc<PathBuf>,
/// Options store opened from the `options_store` subdirectory of `path`.
pub options_store: Arc<OptionsStore>,
/// Write-ahead log opened from the `wal` entry under `path`, behind an
/// async read/write lock.
pub(crate) wal: Arc<RwLock<BlockDBWAL>>,
/// Open data files keyed by their directory name under `path/data_files`.
/// The map and each individual data file have their own async read/write
/// lock.
pub(crate) data_files: Arc<RwLock<HashMap<String, Arc<RwLock<DataFile>>>>>,
}
impl BlockDB {
pub async fn open<P: AsRef<Path>>(
path: P,
options: Option<BlockDBOptions>,
) -> Result<Self, Error> {
let path = path.as_ref().to_path_buf();
create_directory(&path).await?;
let mut wal = BlockDBWAL::open(path.join("wal")).await?;
let BlockDBState { valid_data_files } = wal.replay(None).await?;
let options_store_path = path.join("options_store");
create_directory(&options_store_path).await?;
delete_temp_files(&options_store_path).await?;
let options_store = Arc::new(OptionsStore::open(options_store_path, options).await?);
let data_files_path = path.join("data_files");
create_directory(&data_files_path).await?;
let mut data_files = HashMap::new();
let mut data_files_directory_iter = DirectoryIter::new(&data_files_path).await?;
while let Some((directory_name, directory_path)) = data_files_directory_iter.next().await? {
if valid_data_files.contains(&directory_name) {
delete_temp_files(&directory_path).await?;
let data_file = DataFile::open(
directory_name.clone(),
directory_path,
options_store.clone(),
)
.await?;
data_files.insert(directory_name, Arc::new(RwLock::new(data_file)));
} else {
delete_directory(&directory_path).await?;
}
}
Ok(Self {
wal: Arc::new(RwLock::new(wal)),
path: Arc::new(path),
data_files: Arc::new(RwLock::new(data_files)),
options_store,
})
}
}