use crate::error::Error;
use crate::reader::LogReadMemo;
use crate::utils::{bin_to_u32, bin_to_u64, crc, u32_to_bin, u64_to_bin};
use memmap2::{Mmap, MmapMut, MmapOptions};
use std::collections::VecDeque;
use std::fs::{File, OpenOptions};
use std::io::{Read, Write};
use std::{env, fs};
use tracing::{debug, info, warn};
/// Magic bytes every WAL file header starts with.
const MAGIC_NO_WAL: &[u8] = b"HQL_WAL";
/// Smallest WAL file size (in bytes) accepted by `WalFile::new`: 8 KiB.
const MIN_WAL_SIZE: u32 = 8 * 1024;
/// A single WAL record, borrowed directly from a memory map.
///
/// On disk a record is `log_id (8) | crc (4) | data_len (4) | data (data_len)`.
#[derive(Debug)]
pub struct WalRecord<'a> {
    log_id: u64,
    crc: &'a [u8],
    data: &'a [u8],
}
impl WalRecord<'_> {
    /// Total on-disk size of the record: the 16-byte record header plus the payload.
    #[inline]
    fn len(&self) -> u32 {
        const RECORD_HEADER_LEN: u32 = 8 + 4 + 4;
        debug_assert!(self.data.len() < u32::MAX as usize);
        let payload_len = self.data.len() as u32;
        RECORD_HEADER_LEN + payload_len
    }
}
/// Metadata and (optional) memory maps of a single on-disk WAL file.
///
/// The file starts with a 32-byte header (magic, version, id range, data offsets),
/// followed by the records. Consecutive records are separated by one byte.
#[derive(Debug)]
pub struct WalFile {
/// Header format version; only version 1 is supported.
pub version: u8,
/// Sequence number of this file; it is also encoded in the file name.
pub wal_no: u64,
/// Full path of the backing file.
pub path: String,
/// Log id of the first record stored in this file.
pub id_from: u64,
/// Log id of the last record stored in this file.
pub id_until: u64,
/// Byte offset of the first record, `None` while the file holds no records.
pub data_start: Option<u32>,
/// Byte offset just past the last record's payload (the next record is written at
/// `data_end + 1`), `None` while the file holds no records.
pub data_end: Option<u32>,
/// Total size of the file in bytes.
len_max: u32,
/// Read-only mapping; preferred by reads when present.
mmap: Option<Mmap>,
/// Writable mapping; required for appends and header updates.
mmap_mut: Option<MmapMut>,
}
impl Drop for WalFile {
fn drop(&mut self) {
if self.mmap_mut.is_some() {
let mut buf = Vec::with_capacity(32);
self.update_header(&mut buf).unwrap();
self.flush().unwrap();
}
}
}
impl WalFile {
/// Walks all records of this file, verifying ordering and CRCs, and recovers valid
/// records that were written behind the recorded data block but never made it into
/// the header.
///
/// With the `auto-heal` feature a bad record truncates the data block in front of it
/// (and zeroes its log id); without it, a bad CRC is returned as `Error::Integrity`.
/// Recovered records extend `id_until` / `data_end` (and `data_start` / `id_from` for
/// a previously empty file), and the header is rewritten after each one.
///
/// `buf` must be empty; it is used as scratch space for encoding.
#[tracing::instrument(skip_all)]
fn check_repair_data_integrity(&mut self, buf: &mut Vec<u8>) -> Result<(), Error> {
    // Size of a record header: log id (8) + crc (4) + data length (4).
    const RECORD_HEADER_LEN: u32 = 8 + 4 + 4;
    debug_assert!(buf.is_empty());
    debug_assert!(self.mmap_mut.is_some());
    debug_assert!(self.id_from <= self.id_until);
    debug_assert_eq!(self.data_start.is_some(), self.data_end.is_some());
    info!(
        "Starting detailed WalFile integrity check for {}",
        self.path
    );
    // `append_log` writes the first record directly behind the file header, so a file
    // without a recorded data block is scanned starting exactly at `offset_logs()`.
    let mut offset = self.offset_logs() as u32;
    let mut id_before: Option<u64> = None;

    // Phase 1: verify the data block the header claims to contain.
    if let Some(start) = self.data_start {
        offset = start;
        let data_end = self.data_end.unwrap();
        loop {
            let record = self.read_record_unchecked(offset)?;
            if let Some(id_before) = id_before
                && record.log_id != id_before + 1
            {
                return Err(Error::Integrity(
                    format!(
                        "WAL record incorrect ordering, missing Log ID {}",
                        id_before + 1
                    )
                    .into(),
                ));
            }
            if record.crc != crc!(record.data) || record.data.is_empty() {
                #[cfg(feature = "auto-heal")]
                {
                    match id_before {
                        Some(id) => {
                            warn!(
                                "Bad CRC CHKSUM for WAL ID {}. Trying automatic healing. \
                                Rolling back latest log ID to {}.",
                                record.log_id, id
                            );
                            self.id_until = id;
                            // The previous record ends one byte in front of this one.
                            self.data_end = Some(offset - 1);
                        }
                        None => {
                            warn!(
                                "Bad CRC CHKSUM for WAL ID {}. Trying automatic healing. \
                                The first ID in this log is bad - resetting data block.",
                                record.log_id
                            );
                            self.data_start = None;
                            self.data_end = None;
                            self.id_until = self.id_from;
                        }
                    };
                    // Zero the bad record's log id so later scans treat it as "no record".
                    let mmap = self.mmap_mut.as_mut().unwrap();
                    u64_to_bin(0, buf)?;
                    let idx = offset as usize;
                    (&mut mmap[idx..idx + 8]).write_all(buf)?;
                    return Ok(());
                }
                #[cfg(not(feature = "auto-heal"))]
                return Err(Error::Integrity("Invalid CRC for WAL Record".into()));
            }
            id_before = Some(record.log_id);
            offset += record.len() + 1;
            if offset > data_end || record.log_id == self.id_until {
                debug!("Reached data_end in data integrity check");
                break;
            }
        }
    }

    // Phase 2: look for valid records behind the recorded data block.
    let mut recovered = 0;
    loop {
        // Stop once a record header plus at least one payload byte no longer fits in
        // front of the end of the file. Widened to u64 so the sum cannot overflow.
        if u64::from(offset) + u64::from(RECORD_HEADER_LEN) >= u64::from(self.len_max) {
            debug!("Reached the end of the WAL file");
            break;
        }
        if let Ok(record) = self.read_record_unchecked(offset) {
            if record.log_id > 0 {
                if let Some(id_before) = id_before
                    && record.log_id != id_before + 1
                {
                    warn!(
                        "Mismatch in Log ID in unexpected data for Log ID {} after already \
                        recovered {} logs - ignoring entry\nexpected Log ID: {} / found: {}\
                        \nread offset: {}\n{:?}",
                        record.log_id,
                        recovered,
                        id_before + 1,
                        record.log_id,
                        offset,
                        record
                    );
                    break;
                }
                id_before = Some(record.log_id);
                if record.crc != crc!(record.data) {
                    warn!(
                        "Mismatch in CRC in unexpected data section after already recovered {} \
                        logs - ignoring entry with log id: {}, expected crc: {:?}, \
                        actual crc: {:?} - \nread offset: {} / {:?}",
                        recovered,
                        record.log_id,
                        record.crc,
                        crc!(record.data),
                        offset,
                        self
                    );
                    break;
                }
                warn!(
                    "Found unexpected, valid WAL record with Log ID {}",
                    record.log_id
                );
                // Copy what is needed out of `record` before `self` is mutated.
                let record_len = record.len();
                let log_id = record.log_id;
                self.id_until = log_id;
                if self.data_start.is_none() {
                    // First record of an otherwise empty file: like `append_log`, it
                    // also defines the file's `id_from`.
                    self.id_from = log_id;
                    self.data_start = Some(offset);
                }
                self.data_end = Some(offset + record_len);
                buf.clear();
                self.update_header(buf)?;
                recovered += 1;
                offset += record_len + 1;
                if offset >= self.len_max {
                    debug!("Reached end of file in unexpected data section");
                    break;
                }
            } else {
                info!("No unexpected additional WAL records found");
                break;
            }
        } else {
            info!("No unexpected additional data found in {}", self.path);
            break;
        }
    }
    if recovered > 0 {
        info!(
            "Successfully recovered {} orphaned WAL file records: {:?}",
            recovered, self
        );
    }
    Ok(())
}
/// Returns `true` if a record with `data_len` payload bytes still fits into this file.
///
/// The check is conservative: besides the 16-byte record header it always reserves the
/// one-byte separator that precedes every record after the first. The arithmetic is
/// done in `u64`, so a huge `data_len` cannot wrap around and wrongly report free space.
#[inline]
pub fn has_space(&self, data_len: u32) -> bool {
    let used = u64::from(self.data_end.unwrap_or_else(|| self.offset_logs() as u32));
    let needed = used + 1 + 8 + 4 + 4 + u64::from(data_len);
    u64::from(self.len_max) > needed
}
/// Bytes between the end of the stored data (or the end of the header when the file
/// holds no records yet) and the end of the file.
#[inline]
pub fn space_left(&self) -> u32 {
    let used = match self.data_end {
        Some(end) => end,
        None => self.offset_logs() as u32,
    };
    self.len_max - used
}
/// Appends one record with log id `id` and payload `data` to the writable map.
///
/// The first record is placed directly behind the file header; it sets `id_from` and
/// rewrites the header immediately. Every later record starts one byte past the current
/// `data_end`, and only the in-memory `id_until` / `data_end` are updated for it.
///
/// `buf` must be empty and the file must be mapped writable with enough space
/// (see `has_space`).
///
/// # Errors
/// Propagates encoding and write errors, including a checksum that does not fit the
/// 4-byte CRC field.
#[tracing::instrument(level = "debug", skip_all)]
#[inline]
pub fn append_log(&mut self, id: u64, data: &[u8], buf: &mut Vec<u8>) -> Result<(), Error> {
    debug_assert!(buf.is_empty());
    debug_assert!(self.mmap_mut.is_some());
    debug_assert!(data.len() <= u32::MAX as usize);
    debug_assert!(self.has_space(data.len() as u32));
    debug_assert_eq!(self.data_start.is_some(), self.data_end.is_some());
    let (start, update_header) = if self.data_start.is_none() {
        self.id_from = id;
        let start = self.offset_logs();
        self.data_start = Some(start as u32);
        (start, true)
    } else {
        (self.data_end.unwrap() as usize + 1, false)
    };
    // Record layout: log_id (8) | crc (4) | data_len (4) | data.
    let crc_at = start + 8;
    let len_at = crc_at + 4;
    let data_at = len_at + 4;
    let mmap = self.mmap_mut.as_mut().unwrap();
    let crc = crc!(&data);
    // The CRC field is exactly 4 bytes wide (reads decode it as `head[8..12]`). The
    // target slice is bounded to that width, so an oversized checksum makes `write_all`
    // fail instead of silently spilling into the length field.
    (&mut mmap[crc_at..len_at]).write_all(crc.as_slice())?;
    let len = data.len() as u32;
    u32_to_bin(len, buf)?;
    (&mut mmap[len_at..data_at]).write_all(buf)?;
    (&mut mmap[data_at..]).write_all(data)?;
    // The log id is written last. Presumably this is so an interrupted append leaves a
    // zero id, which the recovery scan treats as "no record". TODO confirm intent.
    buf.clear();
    u64_to_bin(id, buf)?;
    (&mut mmap[start..crc_at]).write_all(buf)?;
    self.id_until = id;
    self.data_end = Some((data_at + data.len()) as u32);
    debug_assert!(self.data_start.is_some());
    debug_assert!(self.data_end.is_some());
    debug_assert!(self.data_end.unwrap() <= self.len_max);
    if update_header {
        buf.clear();
        self.update_header(buf)?;
    }
    Ok(())
}
/// Copies all metadata of this WAL file, but none of its memory maps.
#[inline]
pub fn clone_no_mmap(&self) -> Self {
    // The maps are intentionally left out: the copy is a metadata snapshot only.
    Self {
        mmap: None,
        mmap_mut: None,
        path: self.path.to_owned(),
        version: self.version,
        wal_no: self.wal_no,
        len_max: self.len_max,
        id_from: self.id_from,
        id_until: self.id_until,
        data_start: self.data_start,
        data_end: self.data_end,
    }
}
/// Overwrites the log id range and data offsets with those of `other`, which must
/// describe the same file. The maps, `version`, `wal_no` and `len_max` are untouched.
#[inline]
pub fn clone_from_no_mmap(&mut self, other: &Self) {
    debug_assert_eq!(self.path, other.path);
    (self.id_from, self.id_until) = (other.id_from, other.id_until);
    (self.data_start, self.data_end) = (other.data_start, other.data_end);
}
#[tracing::instrument(level = "debug", skip_all)]
#[inline]
pub fn read_logs(
&self,
id_from: u64,
id_until: u64,
memo: &mut Option<LogReadMemo>,
buf: &mut Vec<(u64, Vec<u8>)>,
) -> Result<u32, Error> {
debug_assert!(buf.is_empty());
debug_assert!(id_until >= id_from);
debug_assert!(self.data_start.is_some());
debug_assert!(self.data_end.is_some());
if self.id_from > id_from {
return Err(Error::Generic(
format!(
"`id_from` is below threshold - id_from: {id_from} / self.id_from: {}",
self.id_from
)
.into(),
));
}
if self.id_until < id_until {
return Err(Error::Generic("`id_until` is above threshold".into()));
}
if id_until < id_from {
return Err(Error::Generic(
"`id_until` cannot be smaller than `id_from`".into(),
));
}
let data_start = self.data_start.unwrap();
let data_end = self.data_end.unwrap();
let mut idx = data_start;
let mut offset = 0;
if let Some(memo) = memo
&& memo.last_wal_no == self.wal_no
&& memo.last_log_id < id_from
{
idx = memo.data_end + 1;
debug!("LogReadMemo match, shifting start idx to: {}", idx);
}
loop {
let record = self.read_record_unchecked(idx)?;
if record.log_id == id_from {
offset = idx;
}
if record.log_id >= id_from {
if record.log_id <= id_until {
debug_assert!(idx + record.len() <= data_end);
if record.crc != crc!(record.data) {
return Err(Error::Integrity("Invalid CRC for WAL Record".into()));
}
buf.push((record.log_id, record.data.to_vec()))
} else {
break;
}
}
if record.log_id >= id_until {
debug_assert!(record.data.len() < u32::MAX as usize);
match self.version {
1 => {
*memo = Some(LogReadMemo {
last_wal_no: self.wal_no,
last_log_id: record.log_id,
data_end: idx + 8 + 4 + 4 + record.data.len() as u32,
});
}
_ => unreachable!(),
}
break;
}
idx += record.len() + 1;
debug_assert!(idx - 1 <= data_end);
}
debug_assert_eq!(
id_until + 1 - id_from,
buf.len() as u64,
"expected id_from {id_from} until {id_until}: {:?}",
self
);
Ok(offset)
}
#[inline(always)]
fn read_record_unchecked(&self, offset: u32) -> Result<WalRecord<'_>, Error> {
debug_assert!(offset + 8 + 4 + 4 < self.len_max);
let head = self.read_bytes(offset, offset + 8 + 4 + 4)?;
let log_id = bin_to_u64(&head[..8])?;
let crc = &head[8..12];
let data_from = offset + 8 + 4 + 4;
let data_len = bin_to_u32(&head[12..16])?;
if data_len == 0 {
return Err(Error::Integrity(
format!("Attempt to read non-existent data of length 0\n{self:?}").into(),
));
}
let data = self.read_bytes(data_from, data_from + data_len)?;
Ok(WalRecord { log_id, crc, data })
}
#[inline(always)]
fn read_bytes(&self, from: u32, until: u32) -> Result<&[u8], Error> {
debug_assert!(from < until, "from < until -> {from} < {until}");
if let Some(mmap) = &self.mmap {
Ok(&mmap[from as usize..until as usize])
} else if let Some(mmap) = &self.mmap_mut {
Ok(&mmap[from as usize..until as usize])
} else {
Err(Error::Generic("No mmap exists".into()))
}
}
#[inline]
pub fn flush(&mut self) -> Result<(), Error> {
debug_assert!(self.mmap_mut.is_some());
self.mmap_mut.as_mut().unwrap().flush()?;
Ok(())
}
#[inline]
pub fn flush_async(&mut self) -> Result<(), Error> {
debug_assert!(self.mmap_mut.is_some());
self.mmap_mut.as_mut().unwrap().flush_async()?;
Ok(())
}
#[inline]
pub fn update_header(&mut self, buf: &mut Vec<u8>) -> Result<(), Error> {
debug_assert!(self.mmap_mut.is_some());
debug_assert!(buf.is_empty());
self.build_header(buf)?;
let mmap = self.mmap_mut.as_mut().unwrap();
(&mut mmap[..buf.len()]).write_all(buf)?;
Ok(())
}
/// Maps the file read-only (with `populate()`, and a sequential-access hint on Unix).
/// Does nothing if a read-only map already exists.
#[inline]
pub fn mmap(&mut self) -> Result<(), Error> {
    if self.mmap.is_none() {
        let file = OpenOptions::new()
            .create(false)
            .write(false)
            .read(true)
            .open(&self.path)?;
        // SAFETY / NOTE(review): mapping is unsafe because the map is invalidated if
        // another process truncates or modifies the file; this assumes exclusive use
        // of the WAL directory — confirm.
        let map = unsafe { MmapOptions::new().populate().map(&file)? };
        #[cfg(unix)]
        map.advise(memmap2::Advice::Sequential)?;
        self.mmap = Some(map);
    }
    Ok(())
}
/// Releases the read-only map, if any; the writable map is left untouched.
#[inline]
pub fn mmap_drop(&mut self) {
    drop(self.mmap.take());
}
/// Maps the file read-write (with `populate()`, and a sequential-access hint on Unix).
/// Does nothing if a writable map already exists.
#[inline]
pub fn mmap_mut(&mut self) -> Result<(), Error> {
    if self.mmap_mut.is_none() {
        let file = OpenOptions::new()
            .create(false)
            .read(true)
            .write(true)
            .open(&self.path)?;
        // SAFETY / NOTE(review): mapping is unsafe because the map is invalidated if
        // another process truncates or modifies the file; this assumes exclusive use
        // of the WAL directory — confirm.
        let map = unsafe { MmapOptions::new().populate().map_mut(&file)? };
        #[cfg(unix)]
        map.advise(memmap2::Advice::Sequential)?;
        self.mmap_mut = Some(map);
    }
    Ok(())
}
/// Describes WAL file number `wal_no` inside `base_path`, covering log ids
/// `id_from..=id_until` with a total size of `wal_size` bytes. Nothing is created on
/// disk; use `create_file` for that.
///
/// # Errors
/// `Error::Generic` if `wal_size` is below `MIN_WAL_SIZE`.
#[inline]
pub fn new(
    wal_no: u64,
    base_path: &str,
    id_from: u64,
    id_until: u64,
    wal_size: u32,
) -> Result<Self, Error> {
    debug_assert!(!base_path.is_empty());
    debug_assert!(id_from <= id_until);
    // No debug assertion on `wal_size`: an undersized value is reported as an `Err` in
    // every build profile rather than panicking in debug builds.
    if wal_size < MIN_WAL_SIZE {
        return Err(Error::Generic(
            format!("min allowed `wal_size` is {MIN_WAL_SIZE}").into(),
        ));
    }
    let path = Self::build_full_path(base_path, wal_no);
    Ok(Self {
        wal_no,
        path,
        version: 1,
        id_from,
        id_until,
        data_start: None,
        data_end: None,
        len_max: wal_size,
        mmap: None,
        mmap_mut: None,
    })
}
/// Creates the backing file (failing if it already exists), sizes it to `len_max`
/// bytes and writes the initial header, then starts an async flush.
///
/// `buf` must be empty; on return it holds the encoded header.
#[inline]
#[tracing::instrument(skip_all)]
pub fn create_file(&mut self, buf: &mut Vec<u8>) -> Result<(), Error> {
    debug_assert!(buf.is_empty());
    let file = File::create_new(&self.path)?;
    file.set_len(u64::from(self.len_max))?;
    self.build_header(buf)?;
    let header_len = buf.len();
    // SAFETY / NOTE(review): the file was just created by this call; this assumes no
    // other process touches it while mapped — confirm.
    let mut map = unsafe { MmapOptions::new().map_mut(&file)? };
    map[..header_len].copy_from_slice(buf);
    map.flush_async()?;
    Ok(())
}
#[tracing::instrument(level = "debug", skip_all)]
#[inline]
pub fn read_from_file(path_full: String) -> Result<Self, Error> {
let Some((_, fname)) = path_full.rsplit_once('/') else {
return Err(Error::InvalidPath("Invalid file path"));
};
let Some((num, ending)) = fname.split_once('.') else {
return Err(Error::InvalidPath("Invalid file path"));
};
if ending != "wal" {
return Err(Error::InvalidFileName);
}
let wal_no = num.parse::<u64>()?;
let mut buf = vec![0; 32];
let mut file = File::open(&path_full)?;
let len_max = file.metadata()?.len();
debug_assert!(len_max < u32::MAX as u64);
file.read_exact(&mut buf)?;
if buf[..7].iter().as_slice() != MAGIC_NO_WAL {
return Err(Error::FileCorrupted("Invalid WAL file magic number".into()));
}
if buf[7..8] != [1u8] {
return Err(Error::FileCorrupted("Invalid WAL file version".into()));
}
let id_from = bin_to_u64(&buf[8..16])?;
let id_until = bin_to_u64(&buf[16..24])?;
let data_start = bin_to_u32(&buf[24..28])?;
let data_end = bin_to_u32(&buf[28..32])?;
Ok(Self {
version: 1,
wal_no,
path: path_full,
id_from,
id_until,
data_start: if data_start == 0 {
None
} else {
Some(data_start)
},
data_end: if data_end == 0 { None } else { Some(data_end) },
len_max: len_max as u32,
mmap: None,
mmap_mut: None,
})
}
/// Encodes the file header into `buf` (which must be empty):
/// magic (7) | version (1) | id_from (8) | id_until (8) | data_start (4) | data_end (4).
/// Absent data offsets are encoded as 0.
#[inline]
fn build_header(&self, buf: &mut Vec<u8>) -> Result<(), Error> {
    debug_assert!(buf.is_empty());
    buf.extend_from_slice(MAGIC_NO_WAL);
    match self.version {
        1 => {
            buf.push(self.version);
            for id in [self.id_from, self.id_until] {
                u64_to_bin(id, buf)?;
            }
            for data_offset in [self.data_start, self.data_end] {
                u32_to_bin(data_offset.unwrap_or(0), buf)?;
            }
        }
        _ => unreachable!(),
    }
    Ok(())
}
/// Byte offset at which record data begins, i.e. the size of the file header.
#[inline]
pub fn offset_logs(&self) -> usize {
    // magic (7) + version (1) + id_from (8) + id_until (8) + data_start (4) + data_end (4)
    const HEADER_LEN_V1: usize = 7 + 1 + 8 + 8 + 4 + 4;
    if self.version == 1 {
        HEADER_LEN_V1
    } else {
        unreachable!()
    }
}
/// Builds `<base_path>/<wal_no>.wal`, zero-padding `wal_no` to at least 16 digits so
/// that the lexicographic sort of file names in `WalFileSet::read` matches numeric
/// order.
#[inline]
fn build_full_path(base_path: &str, wal_no: u64) -> String {
    debug_assert!(!base_path.is_empty());
    // `{:016}` pads to 16 digits but never truncates or panics: numbers with more
    // digits are written in full.
    let path = format!("{base_path}/{wal_no:016}.wal");
    #[cfg(debug_assertions)]
    {
        let (base, name) = path.rsplit_once('/').unwrap();
        debug_assert_eq!(base, base_path);
        let name = name.strip_suffix(".wal").unwrap();
        debug_assert!(name.len() >= 16);
    }
    path
}
}
/// The ordered collection of WAL files inside one directory.
#[derive(Debug)]
pub struct WalFileSet {
/// Index into `files` of the active file (the one `active()` returns).
pub active: Option<usize>,
/// Directory containing the `.wal` files.
pub base_path: String,
/// WAL files in ascending `wal_no` order.
pub files: VecDeque<WalFile>,
}
impl WalFileSet {
/// Mutable access to the active WAL file.
///
/// # Panics
/// Panics if `active` is `None` or not a valid index into `files`.
#[inline]
pub fn active(&mut self) -> &mut WalFile {
    debug_assert!(self.active.is_some());
    let idx = self.active.unwrap();
    debug_assert!(
        self.files.len() > idx,
        "self.files.len(): {}, self.active: {}",
        self.files.len(),
        idx
    );
    self.files.get_mut(idx).unwrap()
}
/// Creates the next WAL file on disk (numbered one past the last file, or 1 for an
/// empty set) and appends it to `files`. It is not mapped. It only becomes the active
/// file when it is file number 1.
///
/// `buf` must be empty; on return it holds the new file's header.
#[tracing::instrument(level = "debug", skip_all)]
#[inline]
pub fn add_file(&mut self, wal_size: u32, buf: &mut Vec<u8>) -> Result<&WalFile, Error> {
    let wal_no = match self.files.back() {
        Some(last) => last.wal_no + 1,
        None => 1,
    };
    let mut wal = WalFile::new(wal_no, &self.base_path, 0, 0, wal_size)?;
    wal.create_file(buf)?;
    let is_first_file = wal_no == 1;
    self.files.push_back(wal);
    if is_first_file {
        self.active = Some(0);
    }
    Ok(self.files.back().unwrap())
}
#[tracing::instrument(skip_all)]
pub fn check_integrity(
&mut self,
buf: &mut Vec<u8>,
wal_deep_integrity_check: bool,
) -> Result<(), Error> {
if self.files.is_empty() {
return Ok(());
}
let mut iter = self.files.iter();
let first = iter.next().unwrap();
let mut wal_no = first.wal_no;
let mut until = first.id_until;
if first.id_from > until {
return Err(Error::FileCorrupted(
"`id_from` cannot be greater than `id_until`".into(),
));
}
for wal in iter {
if wal.data_end.unwrap_or(0) > wal.len_max {
return Err(Error::Integrity(
"WAL data offset bigger than file size".into(),
));
}
if wal_no + 1 != wal.wal_no {
return Err(Error::Integrity(
format!("Missing wal file no {}", wal_no + 1).into(),
));
}
if wal.id_from == 0 && wal.id_until == 0 {
break;
}
if wal.data_start.is_some() != wal.data_end.is_some() {
return Err(Error::Integrity(
"Invalid `data_start` / `data_end` values for WAL with data".into(),
));
}
if let Some(data_start) = wal.data_start {
debug_assert!(wal.data_end.is_some());
let data_end = wal.data_end.unwrap();
if data_start < wal.offset_logs() as u32 {
return Err(Error::Integrity(
"`data_start` cannot be smaller than header offset".into(),
));
}
if data_start == data_end || data_start + 8 + 4 + 4 > data_end {
return Err(Error::Integrity(
"`data_start` does not match `data_end` - data cannot fit".into(),
));
}
}
if until + 1 != wal.id_from {
return Err(Error::Integrity(
format!("Missing logs between IDs {} and {}", until + 1, wal.id_from).into(),
));
}
wal_no = wal.wal_no;
until = wal.id_until;
}
let check = env::var("HQL_CHECK_WAL_INTEGRITY")
.as_deref()
.unwrap_or("false")
.parse::<bool>()
.expect("Cannot parse HQL_CHECK_WAL_INTEGRITY as bool");
if wal_deep_integrity_check || check {
let active = self.active();
if active.mmap_mut.is_none() {
active.mmap_mut()?;
}
active.check_repair_data_integrity(buf)?;
}
Ok(())
}
#[inline]
pub fn clone_no_map(&self) -> Self {
let mut slf = Self {
active: self.active,
base_path: self.base_path.clone(),
files: VecDeque::with_capacity(self.files.len()),
};
for file in &self.files {
slf.files.push_back(file.clone_no_mmap());
}
slf
}
#[inline]
pub fn clone_files_from_no_mmap(&mut self, other: &VecDeque<WalFile>) {
self.files
.retain(|f| other.iter().any(|upd| upd.wal_no == f.wal_no));
self.files.iter_mut().enumerate().for_each(|(i, f)| {
let upd = &other[i];
debug_assert_eq!(f.wal_no, upd.wal_no);
f.id_from = upd.id_from;
f.id_until = upd.id_until;
f.data_start = upd.data_start;
f.data_end = upd.data_end;
});
for upd in other.iter().skip(self.files.len()) {
self.files.push_back(upd.clone_no_mmap());
}
}
#[tracing::instrument(level = "debug", skip_all)]
pub fn read(base_path: String, wal_size: u32) -> Result<WalFileSet, Error> {
let mut file_names = Vec::with_capacity(2);
for entry in fs::read_dir(&base_path)? {
let entry = entry?.file_name();
let fname = entry.to_str().unwrap_or_default();
if fname.ends_with(".wal") {
file_names.push(fname.to_string());
}
}
file_names.sort();
let mut files = VecDeque::with_capacity(file_names.len());
for name in file_names {
let path_full = format!("{base_path}/{name}");
if let Ok(wal) = WalFile::read_from_file(path_full) {
files.push_back(wal);
}
}
let active = if files.is_empty() {
let mut wal = WalFile::new(1, &base_path, 0, 0, wal_size)?;
let mut buf = Vec::with_capacity(28);
wal.create_file(&mut buf)?;
files.push_back(wal);
0
} else {
files.len() - 1
};
Ok(Self {
active: Some(active),
base_path,
files,
})
}
/// Seals the active file and switches to a freshly created successor.
///
/// The active file's header is written, an async flush is started, and its writable
/// map is released. The new file is then created, mapped writable, made active, and
/// given `id_from == id_until ==` one past the sealed file's `id_until`.
///
/// `buf` must be empty.
#[tracing::instrument(level = "debug", skip_all)]
#[inline]
pub fn roll_over(&mut self, wal_size: u32, buf: &mut Vec<u8>) -> Result<(), Error> {
    debug_assert!(buf.is_empty());
    debug_assert!(!self.files.is_empty());
    debug_assert!(self.files.back().unwrap().mmap_mut.is_some());
    let sealed = self.active();
    sealed.update_header(buf)?;
    sealed.flush_async()?;
    sealed.mmap_mut = None;
    let last_id = sealed.id_until;

    buf.clear();
    self.add_file(wal_size, buf)?;
    self.active = Some(self.files.len() - 1);
    let next = self.active();
    next.mmap_mut()?;
    let first_id = last_id + 1;
    next.id_from = first_id;
    next.id_until = first_id;
    Ok(())
}
/// Deletes logs from one end of the WAL file set.
///
/// * Front purge, taken when `id_from` is at or below the first file's `id_from`.
///   Whole files whose `id_until` is below `id_until` are deleted from disk. Then the
///   new first file's data block is moved to start at the record with log id
///   `id_until`, so that log itself is kept.
/// * Back truncation, taken otherwise when `id_until` is at or above the last file's
///   `id_until`. Files whose `id_from` is above `id_from` are deleted from disk. Then
///   the record `id_from` and everything after it are cut from the new last file. If
///   that record was the file's first, its data block is cleared and `id_until` is set
///   to `id_from`.
///
/// If every file was deleted, a fresh file is created and mapped writable. Afterwards
/// the last file is the active one. Header fields are changed only in memory here.
///
/// `buf` and `buf_logs` must be empty; `buf_logs` serves as scratch space for `read_logs`.
#[tracing::instrument(level = "debug", skip_all)]
#[inline]
pub fn shift_delete_logs(
&mut self,
id_from: u64,
id_until: u64,
wal_size: u32,
buf: &mut Vec<u8>,
buf_logs: &mut Vec<(u64, Vec<u8>)>,
) -> Result<(), Error> {
debug_assert!(buf.is_empty());
debug_assert!(buf_logs.is_empty());
debug_assert!(!self.files.is_empty());
// `files` is non-empty, so both comparisons are between two `Some` values.
let purge_front = Some(id_from) <= self.files.front().map(|f| f.id_from);
let truncate_back = Some(id_until) >= self.files.back().map(|f| f.id_until);
let mut memo: Option<LogReadMemo> = None;
if purge_front {
// Drop whole files that end before `id_until`.
while !self.files.is_empty() && self.files.front().unwrap().id_until < id_until {
let file = self.files.pop_front().unwrap();
fs::remove_file(&file.path)?;
}
if self.files.is_empty() {
self.add_file(wal_size, buf)?;
self.files.front_mut().unwrap().mmap_mut()?;
}
let front = self.files.front_mut().unwrap();
if front.id_from < id_until && front.data_end.is_some() {
debug_assert!(
front.id_until >= id_until,
"id_until: {id_until}, front: {front:?}"
);
if front.mmap_mut.is_none() {
front.mmap_mut()?;
}
// `read_logs` returns the offset of record `id_until`; the data now starts there.
let offset = front.read_logs(id_until, id_until, &mut memo, buf_logs)?;
front.id_from = id_until;
front.data_start = Some(offset);
}
} else if truncate_back {
// Drop whole files that start after `id_from`.
while !self.files.is_empty() && self.files.back().unwrap().id_from > id_from {
let file = self.files.pop_back().unwrap();
fs::remove_file(&file.path)?;
}
if self.files.is_empty() {
self.add_file(wal_size, buf)?;
self.files.front_mut().unwrap().mmap_mut()?;
}
let back = self.files.back_mut().unwrap();
if back.id_from == id_from {
// The cut starts at this file's first record: the file keeps no data.
back.id_until = id_from;
back.data_start = None;
back.data_end = None;
} else if back.id_until >= id_from {
if back.mmap_mut.is_none() {
back.mmap_mut()?;
}
// Record `id_from` starts at `offset`; the one-byte separator in front of it
// means the previous record ends at `offset - 1`.
let offset = back.read_logs(id_from, id_from, &mut memo, buf_logs)?;
back.id_until = id_from - 1;
back.data_end = Some(offset - 1);
}
}
self.active = Some(self.files.len() - 1);
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::fs;
/// Root directory for test artifacts.
const PATH: &str = "test_data";
/// WAL file size used throughout the tests: 2 MiB.
const MB2: u32 = 2 * 1024 * 1024;
/// Appends three logs, persists the header, and reads several id ranges back from disk.
#[test]
fn append_read_logs() -> Result<(), Error> {
    let base_path = format!("{}/append_logs", PATH);
    let _ = fs::remove_dir_all(&base_path);
    fs::create_dir_all(&base_path)?;
    let mut memo: Option<LogReadMemo> = None;
    let mut buf = Vec::with_capacity(32);
    let mut wal = WalFile::new(1, &base_path, 0, 0, MB2).unwrap();
    wal.create_file(&mut buf)?;
    wal.mmap_mut()?;
    let payloads: [&[u8]; 3] = [b"Hello World", b"I am Batman!", b"... and not the Joker!"];
    assert!(wal.data_start.is_none());
    assert!(wal.data_end.is_none());

    // The first record starts right behind the header; each later one follows its
    // predecessor after a one-byte separator.
    let start = wal.offset_logs() as u32;
    let mut end = start;
    for (i, &payload) in payloads.iter().enumerate() {
        let id = i as u64 + 1;
        assert!(wal.has_space(payload.len() as u32));
        buf.clear();
        wal.append_log(id, payload, &mut buf)?;
        let record_len = 8 + 4 + 4 + payload.len() as u32;
        end = if i == 0 { start + record_len } else { end + 1 + record_len };
        assert_eq!(wal.data_start.unwrap(), start);
        assert_eq!(wal.data_end.unwrap(), end);
        assert_eq!(wal.id_from, 1);
        assert_eq!(wal.id_until, id);
    }

    buf.clear();
    wal.update_header(&mut buf)?;
    wal.flush()?;
    let mut wal_disk = WalFile::read_from_file(wal.path.clone())?;
    wal_disk.mmap()?;
    assert_eq!(wal_disk.data_start.unwrap(), start);
    assert_eq!(wal_disk.data_end.unwrap(), end);
    assert_eq!(wal_disk.id_from, 1);
    assert_eq!(wal_disk.id_until, 3);

    // Each (from, until) range must yield exactly the matching payloads, in order.
    let mut logs = Vec::with_capacity(3);
    for (from, until) in [(1u64, 3u64), (2, 3), (2, 2)] {
        buf.clear();
        logs.clear();
        wal_disk.read_logs(from, until, &mut memo, &mut logs)?;
        assert_eq!(logs.len() as u64, until - from + 1);
        for ((id, data), expected_id) in logs.iter().zip(from..=until) {
            assert_eq!(*id, expected_id);
            assert_eq!(*data, payloads[expected_id as usize - 1]);
        }
    }

    // Ranges reaching outside the stored ids are rejected.
    buf.clear();
    logs.clear();
    assert!(wal_disk.read_logs(1, 4, &mut memo, &mut logs).is_err());
    assert!(wal_disk.read_logs(0, 2, &mut memo, &mut logs).is_err());
    Ok(())
}
/// Round-trips a header through disk and checks `add_file`'s file numbering.
#[test]
fn convert_wal_header() -> Result<(), Error> {
    let base_path = format!("{}/convert_wal_header", PATH);
    let _ = fs::remove_dir_all(&base_path);
    fs::create_dir_all(&base_path)?;
    let path_h1 = format!("{base_path}/0000000000000001.wal");
    let path_h2 = format!("{base_path}/0000000000000002.wal");
    let _ = fs::remove_file(&path_h1);

    let mut wal = WalFile::new(1, &base_path, 23, 1337, MB2).unwrap();
    assert!(wal.data_start.is_none());
    assert!(wal.data_end.is_none());
    let mut buf = Vec::with_capacity(28);
    wal.create_file(&mut buf)?;
    assert!(wal.data_start.is_none());
    assert!(wal.data_end.is_none());

    // Everything written into the header must come back unchanged.
    let wal_disk = WalFile::read_from_file(path_h1.clone())?;
    assert_eq!(wal.version, wal_disk.version);
    assert_eq!(wal.path, wal_disk.path);
    assert_eq!(wal.wal_no, wal_disk.wal_no);
    assert_eq!(wal.id_from, wal_disk.id_from);
    assert_eq!(wal.id_until, wal_disk.id_until);
    assert_eq!(wal.data_start, wal_disk.data_start);
    assert_eq!(wal.data_end, wal_disk.data_end);
    assert_eq!(wal.len_max, wal_disk.len_max);

    // Starting from an empty directory, `add_file` creates file 1, then file 2.
    for path in [&path_h1, &path_h2] {
        let _ = fs::remove_file(path);
    }
    let mut set = WalFileSet {
        active: None,
        base_path,
        files: Default::default(),
    };
    buf.clear();
    set.add_file(MB2, &mut buf).unwrap();
    assert!(fs::exists(&path_h1)?);
    assert!(!fs::exists(&path_h2)?);
    buf.clear();
    set.add_file(MB2, &mut buf).unwrap();
    assert!(fs::exists(&path_h2)?);
    Ok(())
}
/// Checks that `check_integrity` accepts a gap-free chain of files and rejects missing
/// file numbers and missing log ids.
#[test]
fn integrity_check() -> Result<(), Error> {
    /// An unmapped WAL file description whose data block spans bytes 32..64.
    fn wal(wal_no: u64, id_from: u64, id_until: u64) -> WalFile {
        WalFile {
            version: 1,
            wal_no,
            path: "".to_string(),
            id_from,
            id_until,
            data_start: Some(32),
            data_end: Some(64),
            len_max: MB2,
            mmap: None,
            mmap_mut: None,
        }
    }
    /// Wraps `files` into a set whose last file is the active one.
    fn set_of(base_path: &str, files: Vec<WalFile>) -> WalFileSet {
        let active = files.len() - 1;
        WalFileSet {
            active: Some(active),
            base_path: base_path.to_string(),
            files: VecDeque::from(files),
        }
    }

    let mut buf = Vec::new();
    let base_path = format!("{}/integrity_check", PATH);
    let _ = fs::remove_dir_all(&base_path);
    fs::create_dir_all(&base_path)?;

    // A consistent chain passes, also after a fresh file has been appended.
    let mut set = set_of(&base_path, vec![wal(1, 1, 10), wal(2, 11, 17), wal(3, 18, 33)]);
    set.check_integrity(&mut buf, false).unwrap();
    assert_eq!(set.active().wal_no, 3);
    let mut buf = Vec::with_capacity(28);
    set.add_file(MB2, &mut buf)?;
    set.check_integrity(&mut buf, false).unwrap();
    assert_eq!(set.active().wal_no, 3);

    // Missing file no. 2, missing file no. 3, and a gap between log ids 17 and 19.
    let broken = [
        vec![wal(1, 1, 10), wal(3, 18, 33)],
        vec![wal(1, 1, 10), wal(2, 11, 17), wal(4, 18, 33)],
        vec![wal(1, 1, 10), wal(2, 11, 17), wal(3, 19, 33)],
    ];
    for files in broken {
        let mut set = set_of(&base_path, files);
        assert!(set.check_integrity(&mut buf, false).is_err());
    }
    Ok(())
}
/// Spreads five logs over two files and purges increasingly large front ranges.
#[test]
fn roll_over_purge_front() -> Result<(), Error> {
    let base_path = format!("{}/roll_over_purge_front", PATH);
    let _ = fs::remove_dir_all(&base_path);
    fs::create_dir_all(&base_path)?;
    let mut buf = Vec::with_capacity(8);
    let mut wal = WalFileSet {
        active: None,
        base_path,
        files: Default::default(),
    };
    wal.add_file(MB2, &mut buf)?;
    wal.active().mmap_mut()?;
    let logs: [&[u8]; 5] = [
        b"Hello World",
        b"I am Batman!",
        b"... and not the Joker!",
        b"I like Harley Quinn",
        b"... a lot",
    ];

    // Logs 1 and 2 go into the first file ...
    for (id, data) in (1u64..).zip(logs[..2].iter().copied()) {
        buf.clear();
        wal.active().append_log(id, data, &mut buf)?;
    }
    assert_eq!(wal.files.len(), 1);
    assert_eq!(wal.active().wal_no, 1);
    buf.clear();
    wal.roll_over(MB2, &mut buf)?;
    assert_eq!(wal.files.len(), 2);
    assert_eq!(wal.active().wal_no, 2);
    assert!(wal.files.front().unwrap().mmap_mut.is_none());
    assert!(wal.active().mmap_mut.is_some());
    // ... and logs 3 to 5 into the second one.
    for (id, data) in (3u64..).zip(logs[2..].iter().copied()) {
        buf.clear();
        wal.active().append_log(id, data, &mut buf)?;
    }
    let mut buf_logs = Vec::with_capacity(1);

    // `id_until` 1 is the first stored log: nothing changes.
    buf.clear();
    wal.shift_delete_logs(0, 1, MB2, &mut buf, &mut buf_logs)?;
    let front = wal.files.front().unwrap();
    assert_eq!(front.id_from, 1);
    assert_eq!(front.data_start.unwrap() as usize, front.offset_logs());

    // `id_until` 2: the first file now starts behind log 1.
    buf.clear();
    buf_logs.clear();
    wal.shift_delete_logs(0, 2, MB2, &mut buf, &mut buf_logs)?;
    let front = wal.files.front().unwrap();
    assert_eq!(front.id_from, 2);
    let start = 8 + 4 + 4 + logs[0].len() + front.offset_logs() + 1;
    assert_eq!(front.data_start.unwrap() as usize, start);

    // `id_until` 3: the first file is removed entirely.
    buf.clear();
    buf_logs.clear();
    wal.shift_delete_logs(0, 3, MB2, &mut buf, &mut buf_logs)?;
    assert_eq!(wal.files.len(), 1);
    let front = wal.files.front().unwrap();
    assert_eq!(front.id_from, 3);
    assert_eq!(front.data_start.unwrap() as usize, front.offset_logs());
    Ok(())
}
/// Spreads five logs over two files and truncates the back step by step.
#[test]
fn roll_over_truncate_end() -> Result<(), Error> {
    let base_path = format!("{}/roll_over_truncate_end", PATH);
    let _ = fs::remove_dir_all(&base_path);
    fs::create_dir_all(&base_path)?;
    let mut buf = Vec::with_capacity(8);
    let mut wal = WalFileSet {
        active: None,
        base_path,
        files: Default::default(),
    };
    wal.add_file(MB2, &mut buf)?;
    wal.active().mmap_mut()?;
    let logs: [&[u8]; 5] = [
        b"Hello World",
        b"I am Batman!",
        b"... and not the Joker!",
        b"I like Harley Quinn",
        b"... a lot",
    ];
    // On-disk size of the record holding `logs[i]`: 16-byte record header + payload.
    let record_len = |i: usize| 8 + 4 + 4 + logs[i].len();

    for (id, data) in (1u64..).zip(logs[..2].iter().copied()) {
        buf.clear();
        wal.active().append_log(id, data, &mut buf)?;
    }
    assert_eq!(wal.files.len(), 1);
    assert_eq!(wal.active().wal_no, 1);
    buf.clear();
    wal.roll_over(MB2, &mut buf)?;
    assert_eq!(wal.files.len(), 2);
    assert_eq!(wal.active().wal_no, 2);
    assert!(wal.files.front().unwrap().mmap_mut.is_none());
    assert!(wal.active().mmap_mut.is_some());
    for (id, data) in (3u64..).zip(logs[2..].iter().copied()) {
        buf.clear();
        wal.active().append_log(id, data, &mut buf)?;
    }
    let mut buf_logs = Vec::with_capacity(1);

    // Truncating from 6, 5 and 4 keeps 3, 2 and 1 records of the second file
    // (logs 3, 4, ...), each pair separated by one byte.
    for (from, kept) in [(6u64, 3usize), (5, 2), (4, 1)] {
        buf.clear();
        buf_logs.clear();
        wal.shift_delete_logs(from, u64::MAX, MB2, &mut buf, &mut buf_logs)?;
        assert_eq!(wal.files.front().unwrap().id_from, 1);
        let back = wal.files.back().unwrap();
        assert_eq!(back.id_from, 3);
        assert_eq!(back.id_until, from - 1);
        let data_end =
            back.offset_logs() + (2..2 + kept).map(record_len).sum::<usize>() + (kept - 1);
        assert_eq!(back.data_end, Some(data_end as u32));
    }

    // Truncating from log 3, the second file's first log, leaves that file without data.
    buf.clear();
    buf_logs.clear();
    wal.shift_delete_logs(3, u64::MAX, MB2, &mut buf, &mut buf_logs)?;
    assert_eq!(wal.files.front().unwrap().id_from, 1);
    let back = wal.files.back().unwrap();
    assert_eq!(back.id_from, 3);
    assert_eq!(back.id_until, 3);
    assert_eq!(back.data_start, None);
    assert_eq!(back.data_end, None);

    // Truncating from log 2 removes the second file and cuts the first down to log 1.
    buf.clear();
    buf_logs.clear();
    wal.shift_delete_logs(2, u64::MAX, MB2, &mut buf, &mut buf_logs)?;
    assert_eq!(wal.files.len(), 1);
    assert_eq!(wal.files.front().unwrap().id_from, 1);
    let back = wal.files.back().unwrap();
    assert_eq!(back.id_from, 1);
    assert_eq!(back.id_until, 1);
    let data_end = back.offset_logs() + record_len(0);
    assert_eq!(back.data_end, Some(data_end as u32));

    // Deleting from log 1 leaves a single file without data.
    buf.clear();
    buf_logs.clear();
    wal.shift_delete_logs(1, u64::MAX, MB2, &mut buf, &mut buf_logs)?;
    assert_eq!(wal.files.len(), 1);
    let back = wal.files.back().unwrap();
    assert_eq!(back.data_start, None);
    assert_eq!(back.data_end, None);
    Ok(())
}
}