use super::{
assert::{assert_read_is, assert_read_is_none},
state::ChunkCount,
TestDB,
};
use crate::{batch::BatchResult, BlockKey, size::ByteSize, tests::util::size_bytes};
impl TestDB {
pub async fn batch<Bytes: AsRef<[u8]> + Clone>(
&mut self,
writes: Vec<Bytes>,
frees: Vec<(&BlockKey, Bytes)>,
new_data_files: usize,
) -> Result<Vec<BlockKey>, String> {
let expected_freed_bytes = frees
.iter()
.map(|(_, b)| size_bytes(&b.as_ref(), self.chunk_size))
.sum::<usize>();
let BatchResult {
freed_bytes,
new_block_keys,
} = self
.block_db
.batch(
writes.clone(),
frees.clone().into_iter().map(|(k, _)| k).collect(),
)
.await
.map_err(|err| format!("Failed to batch \n {err:?}"))?;
if freed_bytes != expected_freed_bytes {
return Err(format!(
"{freed_bytes} != {expected_freed_bytes} \n Invalid freed bytes count"
));
}
self.test_state.data_file_count += new_data_files;
let ByteSize {
used_bytes: block_db_used_bytes,
free_bytes: block_db_free_bytes,
} = &mut self.test_state.block_db_byte_size;
for (index, bytes) in writes.iter().enumerate() {
let block_key = new_block_keys[index].clone();
let data_file_id = block_key.data_file_id;
let ByteSize {
used_bytes: data_file_used_bytes,
free_bytes: data_file_free_bytes,
} = self
.test_state
.data_file_byte_size_map
.entry(data_file_id.to_string())
.or_default();
let ChunkCount { used, free } = self
.test_state
.data_file_chunk_count_map
.entry(data_file_id.to_string())
.or_default();
let bytes_size = size_bytes(bytes.as_ref(), self.chunk_size);
let introduced_chunks = bytes_size / self.chunk_size;
let previous_free_chunks = *free;
let new_free_chunks =
(previous_free_chunks as i64 - introduced_chunks as i64).max(0) as usize;
let used_free_chunks = previous_free_chunks - new_free_chunks;
let used_free_bytes = used_free_chunks * self.chunk_size;
let new_introduced_chunks = introduced_chunks - used_free_chunks;
let new_introduced_bytes = new_introduced_chunks * self.chunk_size;
*block_db_used_bytes += new_introduced_bytes + used_free_bytes;
*block_db_free_bytes -= used_free_bytes;
*data_file_used_bytes += new_introduced_bytes + used_free_bytes;
*data_file_free_bytes -= used_free_bytes;
*free = new_free_chunks;
*used += new_introduced_chunks + used_free_chunks;
}
for (block_key, freeing_bytes) in frees {
let data_file_id = &block_key.data_file_id;
let ByteSize {
used_bytes: data_file_used_bytes,
free_bytes: data_file_free_bytes,
} = self
.test_state
.data_file_byte_size_map
.entry(data_file_id.to_string())
.or_default();
let ChunkCount { used, free } = self
.test_state
.data_file_chunk_count_map
.entry(data_file_id.to_string())
.or_default();
let freed_bytes = size_bytes(freeing_bytes.as_ref(), self.chunk_size);
let freed_chunks = freed_bytes / self.chunk_size;
*used -= freed_chunks;
*free += freed_chunks;
*block_db_free_bytes += freed_bytes;
*block_db_used_bytes -= freed_bytes;
*data_file_free_bytes += freed_bytes;
*data_file_used_bytes -= freed_bytes;
assert_read_is_none!(&self.block_db, block_key);
}
for (index, new_block_key) in new_block_keys.iter().enumerate() {
assert_read_is!(&self.block_db, new_block_key, writes[index]);
}
self.test_state
.assert_against_block_db(&self.block_db)
.await?;
Ok(new_block_keys)
}
}