use super::{error::Error, BlockDB};
use serde::{Deserialize, Serialize};
use std::{
collections::HashSet,
ops::{Deref, DerefMut},
path::PathBuf,
sync::Arc,
};
use tokio::sync::Mutex;
use walr::{options::WALOptionsBuilder, ReplayHandlerResult, WAL};
#[derive(Debug, Serialize, Deserialize)]
pub enum BlockDBLog {
AddDataFile(String),
AddDataFiles(HashSet<String>),
DeleteDataFile(String),
DeleteDataFiles,
}
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct BlockDBState {
pub valid_data_files: HashSet<String>,
}
#[derive(Debug)]
pub struct BlockDBWAL(WAL<BlockDBLog, BlockDBState, ()>);
impl Deref for BlockDBWAL {
type Target = WAL<BlockDBLog, BlockDBState, ()>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl DerefMut for BlockDBWAL {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
fn replay_handler(
logs: Vec<BlockDBLog>,
checkpoint: Arc<Mutex<BlockDBState>>,
_context: Option<()>,
) -> ReplayHandlerResult {
Box::pin(async move {
let mut checkpoint = checkpoint.lock().await;
for log in logs {
match log {
BlockDBLog::AddDataFile(id) => {
checkpoint.valid_data_files.insert(id);
}
BlockDBLog::AddDataFiles(ids) => {
checkpoint.valid_data_files.extend(ids);
}
BlockDBLog::DeleteDataFile(ref id) => {
checkpoint.valid_data_files.remove(id);
}
BlockDBLog::DeleteDataFiles => {
checkpoint.valid_data_files.clear();
}
}
}
Ok(())
})
}
impl BlockDBWAL {
pub async fn open(data_path: PathBuf) -> Result<Self, Error> {
Ok(Self(
WAL::open(
data_path,
Box::new(replay_handler),
Some(
WALOptionsBuilder::new()
.max_size((size_of::<BlockDBLog>() + 2) * 1_000)
.build(),
),
)
.await?,
))
}
}
impl BlockDB {
pub async fn wal_checkpoint_handler(
&self,
wal_writer: &mut WAL<BlockDBLog, BlockDBState, ()>,
) -> Result<(), Error> {
if wal_writer.should_checkpoint() {
wal_writer
.checkpoint(&BlockDBState {
valid_data_files: self.data_files.read().await.keys().cloned().collect(),
})
.await?;
}
Ok(())
}
}