use std::{
fs::{File, create_dir_all, read_dir, remove_file},
io::{BufReader, Read, Seek, Write},
path::PathBuf,
};
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
use crc::Crc;
use tracing::instrument;
use crate::database::{
config::wal_config::{WALConfig, WALVariant},
wal::{MAGIC_NUMBER, WAL, WALIterator, errors::WALError},
};
/// Returns how many bytes must be added to `value` to reach the next multiple
/// of eight (zero when `value` is already a multiple of eight).
fn eight_align_addition(value: u64) -> u64 {
    match value % 8 {
        0 => 0,
        remainder => 8 - remainder,
    }
}
/// Write-ahead log stored as a directory of segment files named
/// `<start offset>.wal`, where only the newest segment is appended to.
pub struct DefaultWAL {
    /// Configuration this WAL was opened with (a clone of the caller's value).
    pub config: WALConfig,
    /// Handle of the segment currently being appended to, if one is open.
    active_log: Option<File>,
    /// Number of appends since the last sync; only advanced under `GroupSync`.
    counter: u64,
    /// Logical offset at which the next entry will be written; it is also the
    /// LSN stored in that entry's header.
    curr_offset: u64,
    /// Baseline for the rotation size check in `append_log`; `rotate` sets it
    /// to the offset at which the new segment starts.
    last_rotation_offset: u64,
    /// CRC-32 calculator used to checksum entry payloads.
    crc_computer: Crc<u32>,
}
impl DefaultWAL {
    /// Opens the WAL stored in `config.wal_dir`, creating the directory and an
    /// initial `0.wal` segment when nothing exists yet.
    ///
    /// When segments already exist they are replayed from the oldest one to
    /// find the end of the last valid entry; the newest segment is truncated
    /// to that point and reopened for appending.
    ///
    /// # Errors
    /// Returns I/O errors from the file system, `WALError::InvalidFileName`
    /// for a file in the directory whose stem is not a `u64`, and
    /// `WALError::CorruptedEntry` when the first invalid entry is located in
    /// a segment other than the newest one.
    ///
    /// # Panics
    /// Panics if `config.variant` is not `WALVariant::Default`.
    #[instrument(name = "Default WAL new", skip(config))]
    pub fn new(config: &WALConfig) -> Result<Self, WALError> {
        assert_eq!(config.variant, WALVariant::Default);
        if !config.wal_dir.exists() {
            create_dir_all(&config.wal_dir)?;
        }
        let mut wal = Self {
            config: config.clone(),
            active_log: None,
            counter: 0,
            curr_offset: 0,
            last_rotation_offset: 0,
            crc_computer: Crc::<u32>::new(&crc::CRC_32_CKSUM),
        };
        let files = wal.get_all_files()?;
        let (current_offset, active_log_file) = if files.is_empty() {
            let new_file = File::options()
                .create(true)
                .append(true)
                .open(config.wal_dir.join("0.wal"))?;
            (0, new_file)
        } else {
            wal.recover_existing_segments(&files)?
        };
        wal.active_log = Some(active_log_file);
        wal.curr_offset = current_offset;
        Ok(wal)
    }

    /// Replays every segment in `files` (sorted by start offset, non-empty),
    /// trims the newest segment after the last valid entry and returns the
    /// recovered end offset together with an append handle on that segment.
    fn recover_existing_segments(
        &mut self,
        files: &[(u64, PathBuf)],
    ) -> Result<(u64, File), WALError> {
        let first_segment = File::options().read(true).open(&files[0].1)?;
        let mut curr_offset = files[0].0;
        // The iterator treats element 0 of its path list as the segment that
        // is already open and moves on to element 1 at end of file, so the
        // full list must be passed. Passing only the tail made recovery skip
        // the second segment and then truncate it to zero bytes.
        let wal_iterator = DefaultWALIterator::new(
            files.iter().map(|(_, path)| path.clone()).collect(),
            Some(BufReader::new(first_segment)),
            self.crc_computer.clone(),
            self.config.wal_max_payload_len_in_bytes,
        );
        for log in wal_iterator {
            match log {
                Ok((lsn, payload)) => {
                    curr_offset = lsn + Self::get_entry_size(payload.len() as u64);
                }
                Err(e) => {
                    eprintln!(
                        "Trimming the wal at {} because of error {:?}",
                        curr_offset, e
                    );
                    break;
                }
            }
        }

        // Locate the segment that contains `curr_offset`: either the one that
        // starts exactly there or the one just before the insertion point.
        let file_index = match files.binary_search_by_key(&curr_offset, |(start, _)| *start) {
            Ok(exact) => exact,
            // An insertion point of 0 means the recovered offset lies before
            // the first segment, which only a damaged header can produce.
            Err(insertion_point) => insertion_point
                .checked_sub(1)
                .ok_or(WALError::CorruptedEntry(curr_offset))?,
        };
        if file_index != files.len() - 1 {
            eprintln!(
                "Can't start DB, corruption found other than last file in WAL file {:?}",
                files[file_index].1
            );
            return Err(WALError::CorruptedEntry(curr_offset));
        }

        let (segment_start, segment_path) = &files[file_index];
        assert!(curr_offset >= *segment_start);
        // Drop everything after the last valid entry and make that durable
        // before any new entry is appended.
        let trimmed = File::options().write(true).open(segment_path)?;
        trimmed.set_len(curr_offset - *segment_start)?;
        trimmed.sync_all()?;
        drop(trimmed);

        let active_file = File::options().append(true).open(segment_path)?;
        // The rotation check measures the size of the active segment, so the
        // baseline is the segment's start offset, as it is after `rotate`.
        self.last_rotation_offset = *segment_start;
        Ok((curr_offset, active_file))
    }

    /// Returns the on-disk size of an entry carrying `payload_len` payload
    /// bytes: 8 (LSN) + 4 (CRC) + 8 (payload length) + payload + 8 (magic
    /// number), rounded up to a multiple of eight.
    pub fn get_entry_size(payload_len: u64) -> u64 {
        let data_len = 8 + 4 + 8 + payload_len + 8;
        data_len + eight_align_addition(data_len)
    }

    /// Starts a new segment named `<curr_offset>.wal` and makes it the active
    /// log. Unless the sync variant is `NoSync`, both the outgoing and the new
    /// segment are synced.
    ///
    /// # Errors
    /// Returns the underlying I/O error; this includes the case where the new
    /// segment file already exists.
    #[instrument(name = "Default WAL Rotate", skip(self))]
    pub fn rotate(&mut self) -> Result<(), WALError> {
        use crate::database::config::wal_config::WALSyncVariant;

        let sync_required = !matches!(self.config.wal_sync_variant, WALSyncVariant::NoSync);
        // Sync the outgoing segment through a borrow so that a failure leaves
        // `active_log` in place instead of dropping the handle.
        if sync_required {
            if let Some(outgoing) = self.active_log.as_ref() {
                outgoing.sync_data()?;
            }
        }
        let new_log_file_path = self
            .config
            .wal_dir
            .join(format!("{}.wal", self.curr_offset));
        let new_log_file = File::options()
            .create_new(true)
            .append(true)
            .open(new_log_file_path)?;
        if sync_required {
            new_log_file.sync_data()?;
        }
        // Replacing the option drops (closes) the previous segment handle.
        self.active_log = Some(new_log_file);
        self.last_rotation_offset = self.curr_offset;
        Ok(())
    }

    /// Lists every non-directory entry of the WAL directory as
    /// `(start offset, path)`, sorted by start offset. The start offset is
    /// the file stem parsed as `u64`.
    ///
    /// # Errors
    /// Returns `WALError::InvalidFileName` for any file whose stem is missing,
    /// not valid UTF-8 or not a `u64`, and I/O errors from reading the
    /// directory.
    fn get_all_files(&self) -> Result<Vec<(u64, PathBuf)>, WALError> {
        let mut files = Vec::new();
        for dir_entry in read_dir(&self.config.wal_dir)? {
            let file_path = dir_entry?.path();
            if file_path.is_dir() {
                continue;
            }
            let file_offset = file_path
                .file_stem()
                .and_then(|stem| stem.to_str())
                .and_then(|stem| stem.parse::<u64>().ok())
                .ok_or_else(|| WALError::InvalidFileName(file_path.clone()))?;
            files.push((file_offset, file_path));
        }
        files.sort_by_key(|(offset, _)| *offset);
        Ok(files)
    }
}
impl WAL for DefaultWAL {
#[instrument(name = "Default WAL Append Log", skip(self))]
fn append_log(&mut self, payload: &[u8]) -> Result<u64, WALError> {
if payload.len() as u64 > self.config.wal_max_payload_len_in_bytes {
return Err(WALError::PayloadLengthOutOfBound(payload.len() as u64));
}
if self.active_log.is_none()
|| self.curr_offset - self.last_rotation_offset > self.config.wal_file_size_in_bytes
{
self.rotate()?;
}
let mut local_buff = Vec::with_capacity(8 + 4 + 8 + payload.len() + 8 + 8); local_buff.write_u64::<BigEndian>(self.curr_offset)?;
local_buff.write_u32::<BigEndian>(self.crc_computer.checksum(&payload))?;
local_buff.write_u64::<BigEndian>(payload.len() as u64)?;
local_buff.write_all(&payload)?;
local_buff.write_u64::<BigEndian>(MAGIC_NUMBER)?;
while local_buff.len() % 8 != 0 {
local_buff.write_u8(0)?;
}
let active_log = self.active_log.as_mut().unwrap();
active_log.write_all(&local_buff)?;
self.curr_offset += local_buff.len() as u64;
match self.config.wal_sync_variant {
crate::database::config::wal_config::WALSyncVariant::NoSync => {}
crate::database::config::wal_config::WALSyncVariant::Always => {
active_log.sync_data()?;
}
crate::database::config::wal_config::WALSyncVariant::GroupSync(group_size) => {
self.counter += 1;
if self.counter >= group_size {
self.counter = 0;
active_log.sync_data()?;
}
}
}
Ok(self.curr_offset)
}
#[instrument(name = "Default WAL Read", skip(self))]
fn read(&self, offset: u64) -> Result<Box<dyn WALIterator>, WALError> {
let files = self.get_all_files()?;
if files.len() == 0 || offset >= self.curr_offset {
return Ok(Box::new(DefaultWALIterator::new(
vec![],
None,
self.crc_computer.clone(),
self.config.wal_max_payload_len_in_bytes,
)));
}
let mut file_index = 0;
while file_index < files.len() && offset >= files[file_index].0 {
file_index += 1;
}
if file_index == 0 {
return Err(WALError::OffsetUnderflow);
}
let offset_inside_file = offset - files[file_index - 1].0;
let files_to_be_included = files
.into_iter()
.skip(file_index - 1) .map(|item| item.1)
.collect::<Vec<PathBuf>>();
let mut active_file = File::options().read(true).open(&files_to_be_included[0])?;
active_file.seek(std::io::SeekFrom::Start(offset_inside_file))?;
return Ok(Box::new(DefaultWALIterator::new(
files_to_be_included,
Some(BufReader::new(active_file)),
self.crc_computer.clone(),
self.config.wal_max_payload_len_in_bytes,
)));
}
fn flush_wal(&mut self, offset: u64) -> Result<(), WALError> {
let max_offset_to_be_flushed = offset - 1;
let files = self.get_all_files()?;
let mut file_index = 0;
while file_index < files.len() && max_offset_to_be_flushed > files[file_index].0 {
file_index += 1;
}
if file_index <= 1 {
return Ok(());
}
for i in 0..file_index - 1 {
remove_file(&files[i].1)?;
}
return Ok(());
}
fn get_offset(&self) -> u64 {
self.curr_offset
}
}
/// Iterator over WAL entries that reads the currently open segment and then
/// continues through the remaining segment files in order.
pub struct DefaultWALIterator {
// Segment paths; `files[index]` is the segment that is currently open, and
// later elements are opened one by one as each segment reaches end of file.
files: Vec<PathBuf>,
// Reader over the current segment; `None` when there is nothing (left) to read.
active_file: Option<BufReader<File>>,
// First error encountered; once set it is returned again on every later call.
error: Option<WALError>,
// CRC calculator used to validate each entry's payload checksum.
checksum_algo: Crc<u32>,
// Position of the current segment inside `files`.
index: usize,
// Entries declaring a payload longer than this are rejected.
wal_max_payload_len_in_bytes: u64,
}
impl DefaultWALIterator {
pub fn new(
files: Vec<PathBuf>,
active_file: Option<BufReader<File>>,
checksum_algo: Crc<u32>,
wal_max_payload_len_in_bytes: u64,
) -> Self {
Self {
files,
active_file,
checksum_algo,
index: 0,
error: None,
wal_max_payload_len_in_bytes,
}
}
fn read_record(&mut self) -> Result<Option<(u64, Vec<u8>)>, WALError> {
if self.error.is_some() {
return Err(self.error.clone().unwrap());
}
let active_file = self.active_file.as_mut().unwrap();
let lsn = match active_file.read_u64::<BigEndian>() {
Ok(v) => v,
Err(e) => {
match e.kind() {
std::io::ErrorKind::UnexpectedEof => {
self.index += 1;
if self.index >= self.files.len() {
self.active_file = None;
return Ok(None);
} else {
let next_file =
File::options().read(true).open(&self.files[self.index])?;
self.active_file = Some(BufReader::new(next_file));
}
return self.read_record();
}
_ => {}
}
return Err(e.into());
}
};
let checksum = active_file.read_u32::<BigEndian>()?;
let payload_len = active_file.read_u64::<BigEndian>()?;
if payload_len > self.wal_max_payload_len_in_bytes {
return Err(WALError::PayloadLengthOutOfBound(lsn));
}
let mut payload = vec![0; payload_len as usize];
active_file.read_exact(&mut payload)?;
let magic_number = active_file.read_u64::<BigEndian>()?;
let bytes_read = 8 + 4 + 8 + payload_len + 8;
let pad = eight_align_addition(bytes_read) as i64;
if pad > 0 {
let mut skip = [0u8; 8];
active_file.read_exact(&mut skip[..pad as usize])?;
}
if magic_number != MAGIC_NUMBER || checksum != self.checksum_algo.checksum(&payload) {
let error = WALError::CorruptedEntry(lsn);
return Err(error);
}
return Ok(Some((lsn, payload)));
}
}
impl Iterator for DefaultWALIterator {
    type Item = Result<(u64, Vec<u8>), WALError>;

    /// Yields the next `(lsn, payload)` pair.
    ///
    /// Returns `None` when no segment is open. After the first failure the
    /// same error is yielded again on every subsequent call, so callers are
    /// expected to stop at the first `Err`.
    fn next(&mut self) -> Option<Self::Item> {
        // No open segment means the iterator is finished.
        self.active_file.as_ref()?;
        if let Some(previous) = self.error.as_ref() {
            return Some(Err(previous.clone()));
        }
        match self.read_record() {
            Ok(record) => record.map(Ok),
            Err(failure) => {
                // Remember the failure so later calls keep reporting it.
                self.error = Some(failure.clone());
                Some(Err(failure))
            }
        }
    }
}
// Empty impl: nothing is overridden here, which lets `DefaultWALIterator` be
// returned as `Box<dyn WALIterator>`.
impl WALIterator for DefaultWALIterator {}