mod entry;
mod file;
#[cfg(feature = "async-std")]
use async_std::{
fs,
path::{Path, PathBuf},
prelude::*,
};
pub use entry::Entry;
pub use file::WalFile;
#[cfg(feature = "tokio")]
use std::path::{Path, PathBuf};
use std::{convert::Infallible, ffi::OsStr, fmt::Display, io};
#[cfg(feature = "tokio")]
use tokio::fs;
// Test builds only: print a diagnostic line to stderr.
// The caller's format string `$s` is prefixed with "[{}:{}] ", which is filled
// from `file!()` and `line!()`; the remaining arguments are forwarded as the
// format arguments of `$s`.
#[cfg(test)]
macro_rules! trace {
($s:expr $(, $opt:expr)*) => {
eprintln!(concat!("[{}:{}] ", $s), file!(), line!(), $($opt),*)
};
}
// Non-test builds: tracing is disabled.
// The macro accepts the same arguments as the test variant but expands to a
// bare string-literal statement; the `$opt` arguments do not appear in the
// expansion, so they are never evaluated.
#[cfg(not(test))]
macro_rules! trace {
($s:expr $(, $opt:expr)*) => {
concat!("[{}:{}] ", $s);
};
}
/// Errors reported by the write-ahead log.
///
/// `EE` is the entry-specific error type (`Wal::push` returns
/// `Error<E::Error>` for the pushed `Entry` type `E`).
#[derive(Debug)]
pub enum Error<EE: std::error::Error> {
/// An underlying I/O operation failed (see the `From<io::Error>` impl).
Io(io::Error),
/// The path handed to `Wal::open` is not a directory.
NotADirectory,
/// `Wal::open` found a file whose name cannot be parsed as a `u64` index.
NotAFile,
/// Displayed as "Invalid WAL entry (ACK)".
/// Not raised in this module; presumably produced by `WalFile` — TODO confirm.
InvalidAck,
/// Displayed as "Invalid WAL entry (Entry)".
/// Not raised in this module; presumably produced by `WalFile` — TODO confirm.
InvalidEntry,
/// Displayed as "Invalid WAL File".
/// Not raised in this module; presumably produced by `WalFile` — TODO confirm.
InvalidFile,
/// `Wal::push` needed a new file but more than `max_chunks` files exist.
SizeExceeded,
/// `Wal::ack` was called with an id that is not below the current read
/// index, or that is below the ack index of the write file.
InvalidAckId {
/// The id that was passed to `ack`.
ack_id: u64,
/// The read index at the time of the call.
read_index: u64,
/// The ack index of the write file at the time of the call.
write_file_ack: u64,
},
/// A seek found no file whose first index is at or below the target.
InvalidIndex,
/// Produced by `match_error` in place of an `Entry` error when converting
/// between two different entry error types.
IncompatibleError,
/// An error of the entry type itself.
Entry(EE),
}
impl<EE: std::error::Error> Display for Error<EE> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Error::Io(e) => e.fmt(f),
Error::NotADirectory => write!(f, "Not a directory"),
Error::NotAFile => write!(f, "Not a file"),
Error::InvalidAck => write!(f, "Invalid WAL entry (ACK)"),
Error::InvalidEntry => write!(f, "Invalid WAL entry (Entry)"),
Error::InvalidFile => write!(f, "Invalid WAL File"),
Error::SizeExceeded => write!(f, "WAL Size Exceeded"),
Error::InvalidAckId{ ack_id, read_index, write_file_ack } => write!(f, "Invalid Ack Index {ack_id}, current read index: {read_index} write_file_ack: {write_file_ack}"),
Error::InvalidIndex => write!(f, "Invalid Index"),
Error::IncompatibleError => write!(f, "Incompatible error"),
Error::Entry(e) => write!(f, "Entry Error: {e}"),
}
}
}
// Marker impl: relies on the `Debug` derive and the `Display` impl and keeps
// every default method of `std::error::Error`.
impl<EE: std::error::Error> std::error::Error for Error<EE> {}
impl<EE: std::error::Error> From<io::Error> for Error<EE> {
fn from(e: io::Error) -> Self {
Error::Io(e)
}
}
pub(crate) fn match_error<EE1: std::error::Error, EE2: std::error::Error>(
e: Error<EE1>,
) -> Error<EE2> {
match e {
Error::Io(e) => Error::Io(e),
Error::NotADirectory => Error::NotADirectory,
Error::NotAFile => Error::NotAFile,
Error::InvalidAck => Error::InvalidAck,
Error::InvalidEntry => Error::InvalidEntry,
Error::InvalidFile => Error::InvalidFile,
Error::SizeExceeded => Error::SizeExceeded,
Error::InvalidAckId {
ack_id,
read_index,
write_file_ack,
} => Error::InvalidAckId {
ack_id,
read_index,
write_file_ack,
},
Error::InvalidIndex => Error::InvalidIndex,
Error::IncompatibleError => Error::IncompatibleError,
Error::Entry(_) => Error::IncompatibleError,
}
}
/// Result type of the operations that cannot fail with an entry-specific
/// error: the entry error slot of [`Error`] is fixed to `Infallible`.
pub type Result<T> = std::result::Result<T, Error<Infallible>>;
/// A write-ahead log stored as a directory of numbered files.
pub struct Wal {
/// Directory the log lives in; new files are created inside it.
dir: PathBuf,
/// `(first index, path)` of every known file. `open` sorts this list and
/// `push` appends to it, and the last element is the current write file.
files: Vec<(u64, PathBuf)>,
/// File entries are currently read from when reads are not served by
/// `write_file`; `None` otherwise.
read_file: Option<WalFile>,
/// File that receives all pushed entries.
write_file: WalFile,
/// Threshold compared against `write_file.size()` after each push; once it
/// is exceeded `push` starts a new file.
chunk_size: u64,
/// `push` refuses to start a new file (`Error::SizeExceeded`) once
/// `files.len()` is greater than this value.
max_chunks: usize,
}
impl Wal {
/// Opens the log stored in the directory `path`.
///
/// Every regular file in the directory must be named after a `u64` index.
/// The file with the highest index becomes the write file and the read
/// position is restored from it. An empty directory gets a fresh file for
/// index 0.
///
/// `chunk_size` and `max_chunks` are stored and later used by `push`.
///
/// # Errors
///
/// * [`Error::NotADirectory`] if `path` is not a directory,
/// * [`Error::NotAFile`] if a file name cannot be parsed as a `u64`,
/// * [`Error::Io`] for failing file system operations,
/// * whatever `WalFile::open` or the initial seek report.
pub async fn open<P>(path: P, chunk_size: u64, max_chunks: usize) -> Result<Self>
where
    P: AsRef<Path>,
{
    let root = path.as_ref();
    if !fs::metadata(root).await?.is_dir() {
        return Err(Error::NotADirectory);
    }
    let dir = root.to_path_buf();

    // Collect `(first index, path)` for every regular file in the directory.
    let mut files = Vec::new();
    let mut listing = fs::read_dir(root).await?;
    while let Some(entry) = next_dir_entry(&mut listing).await {
        let candidate = entry?.path();
        if !fs::metadata(&candidate).await?.is_file() {
            continue;
        }
        let first_idx: u64 = candidate
            .file_name()
            .and_then(OsStr::to_str)
            .and_then(|name| name.parse().ok())
            .ok_or(Error::NotAFile)?;
        files.push((first_idx, candidate));
    }
    files.sort();

    match files.last() {
        // Nothing on disk yet: start with a file for index 0.
        None => {
            let mut first = dir.clone();
            first.push(Self::format_file_name(0));
            let write_file = WalFile::open(&first).await?;
            files.push((0, first));
            Ok(Self {
                dir,
                files,
                read_file: None,
                write_file,
                chunk_size,
                max_chunks,
            })
        }
        // Resume: the newest file is written to, reading continues where
        // that file says it left off.
        Some((_, newest)) => {
            let write_file = WalFile::open(newest).await?;
            trace!("Opening WRITE file: {:?}", write_file);
            let resume_at = write_file.next_idx_to_read;
            let mut wal = Self {
                dir,
                files,
                read_file: None,
                write_file,
                chunk_size,
                max_chunks,
            };
            wal.seek_to(resume_at).await?;
            Ok(wal)
        }
    }
}
/// Appends `data` to the log and returns the index assigned to it.
///
/// If the write file is larger than `chunk_size` after the append, a new file
/// named after the next index to write is opened and becomes the write file.
/// The read/write/ack positions are carried over to it. The previous write
/// file is kept as the read file unless a read file is already open.
///
/// # Errors
///
/// * the error reported by `WalFile::push` if the append itself fails,
/// * [`Error::SizeExceeded`] if a new file is needed while more than
///   `max_chunks` files exist,
/// * errors from opening the new file or persisting its ack state.
///
/// NOTE(review): the rotation errors are reported *after* the entry has been
/// appended, so an `Err` from those paths does not mean the entry was not
/// written — confirm that callers expect this.
pub async fn push<E>(&mut self, data: E) -> std::result::Result<u64, Error<E::Error>>
where
    E: Entry,
{
    let idx = self.write_file.push(data).await?;
    if self.write_file.size() <= self.chunk_size {
        return Ok(idx);
    }
    trace!(
        "Current file exceeds max size with {} > {}",
        self.write_file.size(),
        self.chunk_size
    );
    if self.files.len() > self.max_chunks {
        return Err(Error::SizeExceeded);
    }

    let first_idx = self.write_file.next_idx_to_write;
    let mut path = self.dir.clone();
    path.push(Self::format_file_name(first_idx));

    // Open the new file BEFORE touching any bookkeeping: if this fails the
    // file list must not contain an entry for a file we never opened.
    let mut next_wal = WalFile::open(&path).await.map_err(match_error)?;
    next_wal.next_idx_to_read = self.write_file.next_idx_to_read;
    next_wal.next_idx_to_write = first_idx;
    next_wal.ack_idx = self.write_file.ack_idx;
    next_wal.ack_written = self.write_file.ack_written;

    self.files.push((first_idx, path));
    let previous = std::mem::replace(&mut self.write_file, next_wal);
    // Keep the old write file readable before running the fallible
    // `preserve_ack`, so an error there cannot drop it and strand its
    // unread entries.
    if self.read_file.is_none() {
        self.read_file = Some(previous);
    }
    self.write_file.preserve_ack().await.map_err(match_error)?;
    Ok(idx)
}
/// Returns the next unread entry as `(index, value)`, or `None` when there is
/// nothing left to read.
///
/// While a separate read file is open, entries are taken from it. Once it is
/// exhausted, the known files (all but the last, which is the write file) are
/// searched for one whose first index is at or past the read position, and
/// reading continues there. When no such file exists the read position is
/// handed over to the write file and the entry is popped from it.
///
/// # Errors
///
/// Errors from the underlying files; entry-specific errors are converted
/// with `match_error`.
pub async fn pop<E>(&mut self) -> Result<Option<(u64, E::Output)>>
where
    E: Entry,
{
    'scan: while let Some(reader) = self.read_file.as_mut() {
        trace!("Read file exists: {:?}", reader);
        if let Some(found) = reader.pop::<E>().await.map_err(match_error)? {
            trace!(" We found an entry: {}", found.0);
            return Ok(Some(found));
        }
        trace!(" We are exhausted.");
        // Look for a follow-up file among everything but the write file.
        if let Some((_, older)) = self.files.split_last() {
            for (first_idx, path) in older {
                trace!(
                    " testing next file with {} >= {}",
                    *first_idx,
                    reader.next_idx_to_read
                );
                if *first_idx >= reader.next_idx_to_read {
                    *reader = WalFile::open(path).await?;
                    continue 'scan;
                }
            }
        }
        // No follow-up file: fall through to the write file.
        break;
    }
    trace!("read_file => None");
    if let Some(exhausted) = self.read_file.take() {
        trace!("read_file.next_idx: {}", exhausted.next_idx_to_read);
        self.write_file.next_idx_to_read = exhausted.next_idx_to_read;
    }
    self.write_file.pop::<E>().await.map_err(match_error)
}
pub async fn ack(&mut self, id: u64) -> Result<()> {
trace!("ACKing {}", id);
if self.read_idx() <= id || self.write_file.ack_idx > id {
trace!(
"read_idx: {}, write_file.ack_idx: {}",
self.read_idx(),
self.write_file.ack_idx
);
return Err(Error::InvalidAckId {
ack_id: id,
read_index: self.read_idx(),
write_file_ack: self.write_file.ack_idx,
});
}
self.write_file.ack(id);
let mut files = self.files.iter();
let mut to_delete = None;
let mut cnt = 0;
if let Some(mut this) = files.next() {
trace!(" First file to check: {}", this.0);
loop {
let last = this;
if let Some(next) = files.next() {
this = next;
if this.0 > id {
break;
};
} else {
break;
}
cnt += 1;
to_delete = Some(last.0);
}
}
if let Some(to_delete) = to_delete {
trace!(" Deleting Wal File up to id: {}", to_delete);
let mut files = Vec::with_capacity(self.files.len() - cnt);
std::mem::swap(&mut self.files, &mut files);
let files = files.into_iter();
for (id, f) in files {
if id <= to_delete {
trace!(" Deleting Wal File@{} {:?}", id, f.to_string_lossy());
fs::remove_file(f).await?;
} else {
self.files.push((id, f))
}
}
}
Ok(())
}
/// Moves the read position back to the index right after the write file's
/// ack index (`ack_idx + 1`), so entries read since then are returned again
/// by `pop`.
pub async fn revert(&mut self) -> Result<()> {
    trace!("Reverting to {}", self.write_file.ack_idx + 1);
    let target = self.write_file.ack_idx + 1;
    self.seek_to(target).await
}
pub async fn close(mut self) -> Result<()> {
self.preserve_ack().await
}
/// Asks the write file to persist its ack state, then closes the read file
/// if one is open.
///
/// Note that the read file is taken out of the log by this call; afterwards
/// reads are served from the write file.
pub async fn preserve_ack(&mut self) -> Result<()> {
    self.write_file.preserve_ack().await?;
    match self.read_file.take() {
        Some(reader) => {
            reader.close().await?;
            Ok(())
        }
        None => Ok(()),
    }
}
/// File name for the file whose first entry is `idx`: the decimal index,
/// zero-padded to 20 digits (the width of `u64::MAX`), so that names sort in
/// index order.
fn format_file_name(idx: u64) -> String {
    format!("{idx:020}")
}
async fn seek_to(&mut self, idx: u64) -> Result<()> {
trace!("Seeking to: {} in {:?}", idx, self.files);
if let Some((write_idx, _)) = self.files.last() {
if idx >= *write_idx {
trace!("Seeking in write file: {:?}", self.write_file);
self.read_file = None;
return self.write_file.seek_to(idx).await;
}
};
let mut i = self.files.iter().rev().skip_while(|(i, _)| {
trace!(" testing if {} > {} == {}", *i, idx, *i > idx);
*i > idx
});
if let Some((_, f)) = i.next() {
trace!("Seek picked: {} {:?}", idx, f);
let mut read_file = WalFile::open(f).await?;
trace!("Seeking in write read file: {:?}", read_file);
read_file.seek_to(idx).await?;
self.read_file = Some(read_file);
Ok(())
} else {
Err(Error::InvalidIndex)
}
}
/// Index of the next entry to be read: taken from the read file when one is
/// open, from the write file otherwise.
fn read_idx(&self) -> u64 {
    self.read_file
        .as_ref()
        .map_or(self.write_file.next_idx_to_read, |reader| {
            reader.next_idx_to_read
        })
}
}
// async-std flavour of the directory iteration helper used by `Wal::open`:
// yields the next directory entry, or `None` once the listing is exhausted.
// Exists so that `open` can use one call shape for both runtimes.
#[cfg(feature = "async-std")]
async fn next_dir_entry(rd: &mut fs::ReadDir) -> Option<io::Result<fs::DirEntry>> {
rd.next().await
}
// tokio flavour of the directory iteration helper used by `Wal::open`.
// tokio reports `Result<Option<DirEntry>>`; this reshapes it into the
// `Option<Result<DirEntry>>` form that the async-std variant returns, so
// `open` can use one call shape for both runtimes.
#[cfg(feature = "tokio")]
async fn next_dir_entry(rd: &mut fs::ReadDir) -> Option<io::Result<fs::DirEntry>> {
    match rd.next_entry().await {
        Ok(Some(entry)) => Some(Ok(entry)),
        Ok(None) => None,
        Err(e) => Some(Err(e)),
    }
}
// End-to-end tests running against a temporary directory.
#[cfg(test)]
mod test {
use super::*;
use tempfile::Builder as TempDirBuilder;
// Push/pop/ack/revert across several open/close cycles of the same directory.
#[cfg_attr(feature = "async-std", async_std::test)]
#[cfg_attr(feature = "tokio", tokio::test)]
async fn wal() -> Result<()> {
let temp_dir = TempDirBuilder::new().prefix("tremor-wal").tempdir()?;
let path = temp_dir.path().to_path_buf();
{
// Session 1: push one entry, read it back, close WITHOUT acking it.
let mut w = Wal::open(&path, 50, 10).await?;
assert_eq!(w.push(b"1".to_vec()).await?, 1);
assert_eq!(w.pop::<Vec<u8>>().await?, Some((1, b"1".to_vec())));
w.close().await?;
}
{
// Session 2: the un-acked entry 1 is handed out again after reopening.
// Ack it, then push and read entry 2 without acking.
let mut w = Wal::open(&path, 50, 10).await?;
assert_eq!(w.pop::<Vec<u8>>().await?, Some((1, b"1".to_vec())));
w.ack(1).await?;
assert_eq!(w.push(b"22".to_vec()).await?, 2);
assert_eq!(w.pop::<Vec<u8>>().await?, Some((2, b"22".to_vec())));
w.close().await?;
}
{
// Session 3: entry 2 is replayed; `revert` makes it come out once more.
// Then ack 2 and push/pop/ack entry 3.
let mut w = Wal::open(&path, 50, 10).await?;
assert_eq!(w.pop::<Vec<u8>>().await?, Some((2, b"22".to_vec())));
w.revert().await?;
assert_eq!(w.pop::<Vec<u8>>().await?, Some((2, b"22".to_vec())));
w.ack(2).await?;
assert_eq!(w.push(b"333".to_vec()).await?, 3);
assert_eq!(w.pop::<Vec<u8>>().await?, Some((3, b"333".to_vec())));
w.ack(3).await?;
w.close().await?;
}
// Final reopen: everything has been acked, so nothing is left to read.
let mut w = Wal::open(&path, 50, 10).await?;
assert_eq!(w.pop::<Vec<u8>>().await?, None);
temp_dir.close()?;
Ok(())
}
// Push/pop/ack with entries larger than the chunk size.
#[cfg_attr(feature = "async-std", async_std::test)]
#[cfg_attr(feature = "tokio", tokio::test)]
async fn ack() -> Result<()> {
let temp_dir = TempDirBuilder::new().prefix("tremor-wal").tempdir()?;
let path = temp_dir.path().to_path_buf();
let mut w = Wal::open(&path, 128, 10).await?;
// A fresh log has nothing to read.
assert_eq!(w.pop::<Vec<u8>>().await?, None);
// 721 bytes of payload against a chunk size of 128: presumably every push
// pushes the write file over the limit and starts a new file.
let data = [b'A'; 721];
assert_eq!(1, w.push(data.to_vec()).await?);
assert_eq!(w.pop::<Vec<u8>>().await?, Some((1, data.to_vec())));
w.ack(1).await?;
assert_eq!(2, w.push(data.to_vec()).await?);
assert_eq!(w.pop::<Vec<u8>>().await?, Some((2, data.to_vec())));
// Reading past the last entry yields `None` and must not break later acks.
assert_eq!(w.pop::<Vec<u8>>().await?, None);
w.ack(2).await?;
assert_eq!(3, w.push(data.to_vec()).await?);
assert_eq!(w.pop::<Vec<u8>>().await?, Some((3, data.to_vec())));
w.ack(3).await?;
Ok(())
}
}