use super::{data_block::DataBlock, DataFile};
use crate::{error::Error, options::store::OptionsStore};
use serde::{Deserialize, Serialize};
use std::{
collections::{HashMap, HashSet, VecDeque},
ops::{Deref, DerefMut},
path::PathBuf,
sync::Arc,
};
use tokio::sync::Mutex;
use walr::{options::WALOptionsBuilder, ReplayHandlerResult, WAL};
/// One record in a data file's write-ahead log.
///
/// Each variant describes a mutation of [`DataFileState`]; `replay_handler`
/// applies the records in order on top of the last checkpoint.
///
/// NOTE(review): these records are persisted through serde. If the WAL uses an
/// index-based format (e.g. bincode), reordering variants or struct-variant
/// fields would make existing log files unreadable — confirm the format before
/// reordering anything here.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum DataFileLog {
    /// Releases the block with this id: its size moves from `used_bytes` to
    /// `free_bytes` and its chunk offsets are appended to the free list.
    /// Ids that are not present are ignored on replay.
    Free(String),
    /// Resets the whole state to `DataFileState::default()`.
    Clear,
    /// Stores a single block.
    Write {
        /// The block being written; its size is added to `used_bytes`.
        new_block: DataBlock,
        /// Id the block is stored under (an existing entry with this id is replaced).
        data_block_id: String,
        /// Offsets of previously freed chunks reused by this write. On replay they
        /// are removed from the free list and `len() * chunk_size` is subtracted
        /// from `free_bytes`.
        used_free_chunks: HashSet<usize>,
    },
    /// Batch form of `Write`: stores several blocks at once.
    WriteMany {
        /// `(id, block)` pairs inserted in order; each block's size is added to `used_bytes`.
        new_blocks: Vec<(String, DataBlock)>,
        /// Freed chunk offsets reused by the whole batch (handled as in `Write`).
        used_free_chunks: HashSet<usize>,
    },
    /// Replaces the state after a compaction: `free_bytes` becomes 0 and the
    /// free-chunk list is cleared.
    Compact {
        /// New value for `used_bytes`.
        new_size: usize,
        /// Complete block map after compaction; replaces the previous map.
        new_data_blocks: HashMap<String, DataBlock>,
    },
    /// Batch form of `Free`: releases each listed id in order.
    FreeMany(Vec<String>),
}
/// Space-usage bookkeeping for a data file.
///
/// This is the checkpoint type of the data file WAL: it is written out by
/// `DataFile::wal_checkpoint_handler` and rebuilt by replaying [`DataFileLog`]
/// records on top of it. `Default` (all zeros / empty) is the state a `Clear`
/// record resets to.
///
/// NOTE(review): persisted through serde; with an index-based format, field
/// order is part of the on-disk layout — confirm before reordering.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct DataFileState {
    /// Bytes held by freed chunks; grows when blocks are freed and shrinks by
    /// `chunk_size` for every free chunk a write reuses.
    pub free_bytes: usize,
    /// Bytes occupied by live data blocks (set directly by compaction).
    pub used_bytes: usize,
    /// Live data blocks keyed by block id.
    pub data_blocks: HashMap<String, DataBlock>,
    /// Offsets of freed chunks, in the order they were freed; writes remove the
    /// offsets they reuse.
    pub free_chunk_offsets: VecDeque<usize>,
}
/// Context handed to the WAL replay callback: the shared options store, from
/// which block sizes and the chunk size are read while replaying.
pub type DataFileReplayContext = Arc<OptionsStore>;
/// Write-ahead log for a data file: a newtype over `walr::WAL` specialised to
/// [`DataFileLog`] records, [`DataFileState`] checkpoints and an
/// [`OptionsStore`] replay context. Derefs to the inner `WAL`.
#[derive(Debug)]
pub struct DataFileWAL(WAL<DataFileLog, DataFileState, DataFileReplayContext>);
impl Deref for DataFileWAL {
type Target = WAL<DataFileLog, DataFileState, DataFileReplayContext>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl DerefMut for DataFileWAL {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
fn replay_handler(
logs: Vec<DataFileLog>,
checkpoint: Arc<Mutex<DataFileState>>,
context: Option<DataFileReplayContext>,
) -> ReplayHandlerResult {
Box::pin(async move {
let mut checkpoint = checkpoint.lock().await;
let options_store = context.unwrap();
for log in logs {
match log {
DataFileLog::Clear => {
*checkpoint = DataFileState::default();
}
DataFileLog::Write {
new_block,
data_block_id,
used_free_chunks,
} => {
if !used_free_chunks.is_empty() {
checkpoint.free_chunk_offsets = checkpoint
.free_chunk_offsets
.iter()
.filter_map(|offset| {
if used_free_chunks.contains(offset) {
None
} else {
Some(*offset)
}
})
.collect();
}
checkpoint.used_bytes += new_block.size(&options_store);
checkpoint.free_bytes -= used_free_chunks.len() * options_store.chunk_size();
checkpoint.data_blocks.insert(data_block_id, new_block);
}
DataFileLog::WriteMany {
new_blocks,
used_free_chunks,
} => {
if !used_free_chunks.is_empty() {
checkpoint.free_chunk_offsets = checkpoint
.free_chunk_offsets
.iter()
.filter_map(|offset| {
if used_free_chunks.contains(offset) {
None
} else {
Some(*offset)
}
})
.collect();
}
checkpoint.free_bytes -= used_free_chunks.len() * options_store.chunk_size();
for (id, new_block) in new_blocks {
checkpoint.used_bytes += new_block.size(&options_store);
checkpoint.data_blocks.insert(id, new_block);
}
}
DataFileLog::Free(id) => {
if let Some(mut data_block) = checkpoint.data_blocks.remove(&id) {
let size = data_block.size(&options_store);
checkpoint.used_bytes -= size;
checkpoint.free_bytes += size;
checkpoint
.free_chunk_offsets
.append(&mut data_block.chunk_offsets);
}
}
DataFileLog::FreeMany(ids) => {
for id in ids {
if let Some(mut data_block) = checkpoint.data_blocks.remove(&id) {
let size = data_block.size(&options_store);
checkpoint.used_bytes -= size;
checkpoint.free_bytes += size;
checkpoint
.free_chunk_offsets
.append(&mut data_block.chunk_offsets);
}
}
}
DataFileLog::Compact {
new_size,
new_data_blocks,
} => {
checkpoint.used_bytes = new_size;
checkpoint.free_bytes = 0;
checkpoint.data_blocks = new_data_blocks;
checkpoint.free_chunk_offsets.clear();
}
}
}
Ok(())
})
}
impl DataFileWAL {
    /// Opens the data file's write-ahead log stored at `data_path`.
    ///
    /// `replay_handler` is registered as the replay callback. The WAL's
    /// `max_size` is set to room for 1 000 entries of
    /// `size_of::<DataFileLog>() + 2` bytes each.
    ///
    /// NOTE(review): `size_of::<DataFileLog>()` is the in-memory size of the
    /// enum, not its serialized size; variants carrying collections can
    /// serialize much larger, so this is only a rough budget — confirm the
    /// units `max_size` expects.
    ///
    /// # Errors
    ///
    /// Propagates any failure from `WAL::open`, converted into [`Error`].
    pub async fn open(data_path: PathBuf) -> Result<Self, Error> {
        let bytes_per_entry = size_of::<DataFileLog>() + 2;
        let wal_options = WALOptionsBuilder::new()
            .max_size(bytes_per_entry * 1_000)
            .build();
        let wal = WAL::open(data_path, Box::new(replay_handler), Some(wal_options)).await?;
        Ok(Self(wal))
    }
}
impl DataFile {
    /// Checkpoints the data file's bookkeeping into its WAL when
    /// `should_checkpoint` says one is due; otherwise does nothing.
    ///
    /// The block map and free-chunk list are cloned into a fresh
    /// [`DataFileState`] snapshot that is passed to the WAL by reference.
    ///
    /// # Errors
    ///
    /// Returns any error produced by the WAL checkpoint.
    pub async fn wal_checkpoint_handler(&mut self) -> Result<(), Error> {
        if !self.wal.should_checkpoint() {
            return Ok(());
        }

        let snapshot = DataFileState {
            free_bytes: self.free_bytes,
            used_bytes: self.used_bytes,
            data_blocks: self.data_blocks.clone(),
            free_chunk_offsets: self.free_chunk_offsets.clone(),
        };
        self.wal.checkpoint(&snapshot).await?;
        Ok(())
    }
}