use crate::errors::Errors;
use crate::options::DharmaOpts;
use crate::storage::block::{create_blocks, write_block_to_disk, Block, Value};
use crate::storage::sorted_string_table_reader::SSTableReader;
use crate::traits::{ResourceKey, ResourceValue};
use std::fs::{remove_file, File};
use std::path::{Path, PathBuf};
/// File name of the write-ahead log. It is appended to `options.path`
/// (separated by `/`) to build the full path of the log file.
const WRITE_AHEAD_LOG_NAME: &str = "wal.log";
/// Write-ahead log stored as a single file named `WRITE_AHEAD_LOG_NAME`
/// inside the directory given by `options.path`.
pub struct WriteAheadLog {
/// Options the log was created with. Supplies the directory path and is
/// handed to the block helpers when records are appended.
options: DharmaOpts,
/// Handle to the log file that blocks are written to.
writer: File,
}
impl WriteAheadLog {
pub fn create(options: DharmaOpts) -> Result<WriteAheadLog, Errors> {
let path = format!("{0}/{1}", options.path, WRITE_AHEAD_LOG_NAME);
if !Path::new(&path).exists() {
let file_result = File::create(path);
if file_result.is_ok() {
let writer: File = file_result.unwrap();
return Ok(WriteAheadLog {
options: options.clone(),
writer,
});
}
return Err(Errors::WAL_LOG_CREATION_FAILED);
}
Err(Errors::DB_PATH_DIRTY)
}
/// Appends one key/value record to the write-ahead log.
///
/// The pair is wrapped in a `Value`, turned into blocks by `create_blocks`,
/// and every resulting block is written to the log file in order.
///
/// # Errors
///
/// Returns `Errors::WAL_WRITE_FAILED` on the first block that fails to be
/// written; the remaining blocks are not attempted.
pub fn append<K: ResourceKey, V: ResourceValue>(
    &mut self,
    key: K,
    value: V,
) -> Result<(), Errors> {
    let records = vec![Value::new(key, value)];
    let mut pending: Vec<Block> = Vec::new();
    create_blocks(&self.options, &records, &mut pending);
    for block in &pending {
        write_block_to_disk(&self.options, &mut self.writer, block)
            .map_err(|_| Errors::WAL_WRITE_FAILED)?;
    }
    Ok(())
}
/// Deletes the current log file and creates a fresh one with the same options.
///
/// The new log is returned to the caller; the fields of `self` are left
/// untouched.
///
/// # Errors
///
/// Returns `Errors::WAL_LOG_CREATION_FAILED` when the existing log file could
/// not be removed. Errors produced while creating the new log are passed
/// through unchanged.
pub fn reset(&mut self) -> Result<WriteAheadLog, Errors> {
    match self.cleanup() {
        Ok(()) => WriteAheadLog::create(self.options.clone()),
        Err(_) => Err(Errors::WAL_LOG_CREATION_FAILED),
    }
}
/// Removes the write-ahead log file from the directory in `options.path`.
///
/// # Errors
///
/// Returns `Errors::WAL_CLEANUP_FAILED` if the file could not be deleted.
pub fn cleanup(&mut self) -> Result<(), Errors> {
    let wal_path = format!("{0}/{1}", self.options.path, WRITE_AHEAD_LOG_NAME);
    remove_file(&wal_path).map_err(|_| Errors::WAL_CLEANUP_FAILED)
}
/// Reads every record stored in the write-ahead log under `options.path`,
/// then deletes the log file.
///
/// Records are returned as `(key, value)` pairs in the order they are read.
/// The log file is only removed after all records were decoded, so a failed
/// recovery leaves the file in place.
///
/// # Errors
///
/// Returns `Errors::WAL_BOOTSTRAP_FAILED` if the log cannot be opened, if a
/// record cannot be decoded, or if the log file cannot be removed afterwards.
/// Opening and decoding failures previously panicked via `unwrap()`.
pub fn recover<K: ResourceKey, V: ResourceValue>(
    options: DharmaOpts,
) -> Result<Vec<(K, V)>, Errors> {
    let path = format!("{0}/{1}", options.path, WRITE_AHEAD_LOG_NAME);
    // Opening the log is I/O and may legitimately fail (e.g. missing file),
    // so surface it as an error instead of panicking.
    let mut reader = SSTableReader::from(&PathBuf::from(&path), options.block_size_in_bytes)
        .map_err(|_| Errors::WAL_BOOTSTRAP_FAILED)?;
    let mut data = Vec::new();
    while reader.has_next() {
        let entry = reader.read();
        // An entry that cannot be decoded aborts recovery with an error.
        let record: Value<K, V> = entry
            .to_record::<K, V>()
            .map_err(|_| Errors::WAL_BOOTSTRAP_FAILED)?;
        data.push((record.key, record.value));
        reader.next();
    }
    remove_file(&path).map_err(|_| Errors::WAL_BOOTSTRAP_FAILED)?;
    Ok(data)
}
}