use super::{data_block::DataBlock, util::bytes::BytesChunkIter, DataFile};
use crate::error::Error;
use std::collections::{HashSet, VecDeque};
impl DataFile {
    /// Writes `bytes` into the file one chunk at a time and returns the freshly
    /// generated block id together with the [`DataBlock`] describing the layout.
    ///
    /// The payload is split by [`BytesChunkIter`] using the chunk size reported by
    /// `self.options_store`. For every chunk the destination is the next offset
    /// popped from the front of `new_free_chunk_offsets`; once that queue is empty
    /// the chunk is placed at `*offset`, which is then advanced by one chunk size.
    ///
    /// Bookkeeping is applied to the caller-supplied values:
    /// * `offset` - position used for chunks that do not reuse a free chunk.
    /// * `new_free_bytes` - reduced by one chunk size per reused free chunk.
    /// * `new_used_bytes` - increased by one chunk size per written chunk.
    /// * `used_free_chunks` - receives every free-chunk offset that was reused.
    /// * `new_free_chunk_offsets` - queue of reusable offsets, consumed front first.
    ///
    /// When the iterator reports padding for a chunk, that amount is stored in the
    /// block's `padding` field and that many zero bytes are written directly after
    /// the chunk's data.
    ///
    /// # Errors
    ///
    /// Propagates any error from `write_chunk` or from the final `flush`. The
    /// caller-supplied values may already have been partially updated by then.
    pub async fn attempt_write(
        &mut self,
        bytes: &[u8],
        offset: &mut usize,
        new_free_bytes: &mut usize,
        new_used_bytes: &mut usize,
        used_free_chunks: &mut HashSet<usize>,
        new_free_chunk_offsets: &mut VecDeque<usize>,
    ) -> Result<(String, DataBlock), Error> {
        let id = DataBlock::generate_id();
        let mut data_block = DataBlock::default();
        let chunk_size = self.options_store.chunk_size();
        let mut chunks = BytesChunkIter::new(bytes, chunk_size);

        while let Some((chunk, padding)) = chunks.next() {
            // Decide where this chunk goes: reuse a free chunk if one is queued,
            // otherwise fall back to the caller's running offset.
            let (target, reused_free_chunk) = match new_free_chunk_offsets.pop_front() {
                Some(free_chunk_offset) => {
                    used_free_chunks.insert(free_chunk_offset);
                    (free_chunk_offset, true)
                }
                None => (*offset, false),
            };
            data_block.chunk_offsets.push_back(target);

            // Record the padding before touching the file, then write the data
            // followed by the zero fill (if any).
            if let Some(padding) = padding {
                data_block.padding = padding;
            }
            self.write_chunk(target, chunk).await?;
            if let Some(padding) = padding {
                self.write_chunk(target + chunk.len(), &vec![0; padding])
                    .await?;
            }

            // Accounting happens only after the chunk was written successfully.
            if reused_free_chunk {
                *new_free_bytes -= chunk_size;
            } else {
                *offset += chunk_size;
            }
            *new_used_bytes += chunk_size;
        }

        self.flush().await?;
        Ok((id, data_block))
    }

    /// Stores `bytes` as a new data block and returns the id of that block.
    ///
    /// Steps, in order:
    /// 1. `wal_checkpoint_handler` is awaited.
    /// 2. [`attempt_write`](Self::attempt_write) runs against copies of the
    ///    current size, free/used byte counters and free-chunk queue.
    /// 3. A `DataFileLog::Write` record is appended to the WAL.
    /// 4. Only then are the copied values stored back on `self` and the block
    ///    registered in `self.data_blocks`.
    ///
    /// # Errors
    ///
    /// If the write attempt or the WAL append fails, `recover` is awaited and the
    /// original error is returned (WAL failures are wrapped in `Error::Walr`).
    /// Should `recover` itself fail, its error is returned instead.
    pub async fn write<Bytes: AsRef<[u8]>>(&mut self, bytes: Bytes) -> Result<String, Error> {
        self.wal_checkpoint_handler().await?;

        // Work on scratch copies so `self` is only updated once everything succeeded.
        let mut new_size = self.size;
        let mut new_free_bytes = self.free_bytes;
        let mut new_used_bytes = self.used_bytes;
        let mut used_free_chunks = HashSet::new();
        let mut new_free_chunk_offsets = self.free_chunk_offsets.clone();

        let attempt = self
            .attempt_write(
                bytes.as_ref(),
                &mut new_size,
                &mut new_free_bytes,
                &mut new_used_bytes,
                &mut used_free_chunks,
                &mut new_free_chunk_offsets,
            )
            .await;
        let (data_block_id, data_block) = match attempt {
            Ok(written) => written,
            Err(err) => {
                self.recover().await?;
                return Err(err);
            }
        };

        let log_entry = super::wal::DataFileLog::Write {
            new_block: data_block.clone(),
            data_block_id: data_block_id.clone(),
            used_free_chunks,
        };
        if let Err(err) = self.wal.log(&log_entry).await {
            self.recover().await?;
            return Err(Error::Walr(err));
        }

        // Commit the scratch bookkeeping now that the WAL entry exists.
        self.size = new_size;
        self.free_bytes = new_free_bytes;
        self.used_bytes = new_used_bytes;
        self.free_chunk_offsets = new_free_chunk_offsets;
        self.data_blocks.insert(data_block_id.clone(), data_block);
        Ok(data_block_id)
    }
}