use super::state::{BlockDBUpdateState, FailedBatchState};
use crate::{
batch::state::DataFileUpdateState,
data_file::{wal::DataFileLog, DataFile},
wal::BlockDBLog,
BlockDB, BlockKey,
};
use std::{
collections::{HashMap, HashSet},
sync::Arc,
};
use tokio::sync::RwLock;
impl BlockDB {
    /// Attempts to place every payload in `writes` into a data file and to free every
    /// block named in `frees`, recording the outcome in a [`BlockDBUpdateState`].
    ///
    /// The work happens in three phases:
    ///
    /// 1. Existing data files are visited in map iteration order. Each file receives
    ///    payloads until its tracked size reaches `max_file_size` or no payloads remain.
    /// 2. While payloads remain, a new data file is opened under
    ///    `<path>/data_files/<id>` and filled the same way. Each such file is recorded
    ///    as a `BlockDBLog::AddDataFile` entry and stored in `new_data_file_map`.
    /// 3. Each key in `frees` whose data file is present in `self.data_files` and whose
    ///    block is present in that file's tracked `new_data_blocks` is moved from the
    ///    used to the free accounting. Keys that match neither are skipped silently.
    ///
    /// # Errors
    ///
    /// As soon as a write attempt or `DataFile::open` fails, the accumulated state is
    /// handed to `BlockDBUpdateState::to_failed_state` together with the error and the
    /// result of that call is returned.
    pub(crate) async fn attempt_batch<Bytes: AsRef<[u8]>>(
        &mut self,
        writes: Vec<Bytes>,
        frees: Vec<&BlockKey>,
    ) -> Result<BlockDBUpdateState, FailedBatchState> {
        // Index of the next payload in `writes` that still has to be placed.
        let mut writes_index = 0;
        // Ids of data files whose tracked size reached `max_file_size` during this batch.
        let mut full_data_file_ids: HashSet<String> = HashSet::new();
        // Per data file: the free-chunk scratch value handed to `DataFile::attempt_write`.
        let mut used_free_chunk_map = HashMap::new();
        let mut block_db_update_state = BlockDBUpdateState::default();
        let max_file_size = self.options_store.max_file_size();

        // Writes as many pending payloads as possible into one data file.
        //
        // This stays a macro (rather than a helper fn) so the early `return` leaves
        // `attempt_batch` itself and so the field-level borrows of
        // `block_db_update_state` remain visible to the borrow checker.
        macro_rules! attempt_write {
            ($data_file_writer:expr, $data_file_id:expr) => {
                // Evaluate both arguments exactly once. Callers pass expressions such as
                // `data_file.write().await` and `id.clone()`; splicing them in at every
                // use site would re-acquire the lock and re-allocate the id each time.
                // `mut` is only required when the writer is a lock guard.
                #[allow(unused_mut)]
                let mut writer = $data_file_writer;
                let file_id: String = $data_file_id;

                let data_file_logs = block_db_update_state
                    .data_file_log_map
                    .entry(file_id.clone())
                    .or_default();
                let data_file_inverse_logs = block_db_update_state
                    .data_file_inverse_log_map
                    .entry(file_id.clone())
                    .or_default();
                let used_free_chunks = used_free_chunk_map.entry(file_id.clone()).or_default();
                // Running counters for this file, seeded from the file's current state the
                // first time the file is touched in this batch.
                let DataFileUpdateState {
                    new_size,
                    new_free_bytes,
                    new_used_bytes,
                    new_data_blocks,
                    new_free_chunk_offsets,
                } = block_db_update_state
                    .data_file_update_map
                    .entry(file_id.clone())
                    .or_insert_with(|| DataFileUpdateState::from(&*writer));

                while !full_data_file_ids.contains(&file_id) && writes_index < writes.len() {
                    match writer
                        .attempt_write(
                            writes[writes_index].as_ref(),
                            new_size,
                            new_free_bytes,
                            new_used_bytes,
                            used_free_chunks,
                            new_free_chunk_offsets,
                        )
                        .await
                    {
                        Ok((data_block_id, data_block)) => {
                            // Fullness is checked after the write, so a file may end up
                            // at or above `max_file_size` once this payload is placed.
                            if *new_size >= max_file_size {
                                full_data_file_ids.insert(file_id.clone());
                            }
                            data_file_logs.push(DataFileLog::Write {
                                new_block: data_block.clone(),
                                data_block_id: data_block_id.clone(),
                                used_free_chunks: used_free_chunks.clone(),
                            });
                            // The inverse of a write is freeing the block it created.
                            data_file_inverse_logs.push(DataFileLog::Free(data_block_id.clone()));
                            new_data_blocks.insert(data_block_id.clone(), data_block);
                            block_db_update_state.new_block_keys.push(BlockKey {
                                data_file_id: file_id.clone(),
                                data_block_id,
                            });
                            writes_index += 1;
                        }
                        Err(err) => {
                            return block_db_update_state.to_failed_state(err);
                        }
                    }
                }
            };
        }

        // Phase 1: fill existing data files. The read guard is a temporary of the `for`
        // expression and lives for the whole loop, so iterating by reference needs no
        // copy of the map. Map keys are unique, so a file visited here cannot already
        // be in `full_data_file_ids`.
        for (data_file_id, data_file) in self.data_files.read().await.iter() {
            if writes_index >= writes.len() {
                break;
            }
            attempt_write!(data_file.write().await, data_file_id.clone());
        }

        // Phase 2: open new data files until every payload has been placed.
        while writes_index < writes.len() {
            let new_data_file_id = DataFile::generate_id();
            let new_data_file_path = self.path.join(format!("data_files/{new_data_file_id}"));
            match DataFile::open(
                new_data_file_id.clone(),
                new_data_file_path,
                self.options_store.clone(),
            )
            .await
            {
                Ok(mut data_file) => {
                    // NOTE(review): if a write into the new file fails, the macro returns
                    // before the `AddDataFile` log below is recorded — confirm that the
                    // failure path does not need to know about this file.
                    attempt_write!(&mut data_file, new_data_file_id.clone());
                    block_db_update_state
                        .logs
                        .push(BlockDBLog::AddDataFile(new_data_file_id.clone()));
                    block_db_update_state
                        .new_data_file_map
                        .insert(new_data_file_id, Arc::new(RwLock::new(data_file)));
                }
                Err(err) => {
                    return block_db_update_state.to_failed_state(err);
                }
            }
        }

        // Phase 3: apply frees against the tracked per-file state.
        let data_files_reader = self.data_files.read().await;
        for BlockKey {
            data_file_id,
            data_block_id,
        } in frees
        {
            if let Some(data_file) = data_files_reader.get(data_file_id) {
                let update_map = &mut block_db_update_state.data_file_update_map;
                // Snapshot the file only if this batch has not touched it yet; doing so
                // unconditionally would lock the file and rebuild the snapshot for every
                // freed key just to throw it away.
                if !update_map.contains_key(data_file_id) {
                    let snapshot = DataFileUpdateState::from(&data_file.write().await);
                    update_map.insert(data_file_id.to_string(), snapshot);
                }
                let DataFileUpdateState {
                    new_free_bytes,
                    new_used_bytes,
                    new_data_blocks,
                    new_free_chunk_offsets,
                    ..
                } = update_map
                    .get_mut(data_file_id)
                    .expect("file state is tracked: it was present or inserted just above");

                // Unknown (or already freed) blocks are ignored.
                if let Some(mut data_block) = new_data_blocks.remove(data_block_id) {
                    let data_block_size = data_block.size(&self.options_store);
                    *new_free_bytes += data_block_size;
                    *new_used_bytes -= data_block_size;
                    // The block's chunks become available for reuse.
                    new_free_chunk_offsets.append(&mut data_block.chunk_offsets);
                    block_db_update_state.freed_bytes += data_block_size;
                    block_db_update_state
                        .data_file_log_map
                        .entry(data_file_id.to_string())
                        .or_default()
                        .push(DataFileLog::Free(data_block_id.to_string()));
                }
            }
        }

        Ok(block_db_update_state)
    }
}