use super::{wal::DataFileLog, DataFile};
use crate::{error::Error, uncorrupt::UncorruptAction};
use std::collections::{HashMap, VecDeque};
use tokio::{
fs::{remove_file, rename, OpenOptions},
io::AsyncWriteExt,
};
impl DataFile {
pub async fn compact(&mut self) -> Result<usize, Error> {
if self.free_bytes == 0 {
return Ok(0);
}
self.wal_checkpoint_handler().await?;
let path = self.path.clone();
let temp_path = self.directory.join("temp-df");
let mut new_size = 0;
let mut temp_file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.truncate(true)
.open(&temp_path)
.await?;
let mut new_data_blocks = HashMap::new();
for (id, mut data_block) in self.data_blocks.clone() {
let chunks = data_block.chunk_offsets.len();
let mut new_chunk_offsets = VecDeque::with_capacity(chunks);
for chunk_offset in data_block.chunk_offsets.iter() {
temp_file
.write_all(&self.read_chunk_with_padding(*chunk_offset).await?)
.await?;
new_chunk_offsets.push_back(new_size);
new_size += self.options_store.chunk_size();
}
if !new_chunk_offsets.is_empty() {
data_block.chunk_offsets = new_chunk_offsets;
new_data_blocks.insert(id, data_block);
}
}
self.wal
.log(&DataFileLog::Compact {
new_size,
new_data_blocks: new_data_blocks.clone(),
})
.await?;
if let Err(err) = rename(&temp_path, &path).await {
self.wal.undo().await.map_err(|err| Error::Corrupted {
err: Box::new(Error::Walr(err)),
action: UncorruptAction::DataFileCompactMove {
path,
temp_path: temp_path.clone(),
},
})?;
remove_file(temp_path).await?;
return Err(Error::IO(err));
}
let freed_bytes = self.free_chunk_offsets.len() * self.options_store.chunk_size();
self.file = temp_file;
self.size = new_size;
self.used_bytes = new_size;
self.free_bytes = 0;
self.data_blocks = new_data_blocks;
self.free_chunk_offsets.clear();
Ok(freed_bytes)
}
}