use arbitrary::{Arbitrary, Unstructured};
use log::debug;
use mkit::{self, thread};
use std::{
ffi, fs, mem, ops, path,
sync::{Arc, RwLock},
vec,
};
use crate::{entry, journal, journal::Journal, state, writer, Error, Result};
/// Default value of [`Config::journal_limit`], as installed by [`Config::new`].
///
/// NOTE(review): 1024 * 1024 * 1024 is presumably a size in bytes (1 GiB) —
/// confirm the unit against the code that enforces the limit.
pub const JOURNAL_LIMIT: usize = 1024 * 1024 * 1024;
/// NOTE(review): not referenced in this part of the file; presumably a
/// buffer or batching size used elsewhere in the crate — confirm against its users.
pub const SYNC_BUFFER: usize = 1024;
/// Configuration handed to [`Wal::create`] and [`Wal::load`].
#[derive(Debug, Clone)]
pub struct Config {
/// Name of the log; passed to the journal routines together with `dir`.
pub name: String,
/// Directory that is scanned for existing journal files and in which new
/// journals are started.
pub dir: ffi::OsString,
/// Journal limit, [`JOURNAL_LIMIT`] by default.
/// NOTE(review): presumably the size at which a journal is rotated —
/// confirm in the writer.
pub journal_limit: usize,
/// `true` by default.
/// NOTE(review): presumably whether writes are fsync-ed — confirm in the writer.
pub fsync: bool,
}
/// Randomised [`Config`] values for fuzzing / property tests.
impl Arbitrary for Config {
    /// Draws `name`, `journal_limit` and `fsync` from `u` (in that order) and
    /// points `dir` at a freshly generated temporary-directory path.
    ///
    /// # Panics
    ///
    /// Panics if the temporary directory cannot be created.
    fn arbitrary(u: &mut Unstructured) -> arbitrary::Result<Self> {
        // Candidate journal limits, one of which is picked at random.
        const LIMITS: [usize; 4] = [100, 1000, 10_000, 1_000_000];

        let name = u.arbitrary::<String>()?;
        // NOTE(review): the `TempDir` guard is a temporary that is dropped at
        // the end of this statement; presumably that removes the directory
        // again, so only the unique path survives — confirm this is intended.
        let dir = tempfile::tempdir()
            .unwrap()
            .path()
            .as_os_str()
            .to_os_string();
        let journal_limit = *u.choose(&LIMITS)?;
        let fsync = u.arbitrary::<bool>()?;

        Ok(Config {
            name,
            dir,
            journal_limit,
            fsync,
        })
    }
}
impl Config {
    /// Builds a configuration for the log `name` kept under directory `dir`.
    ///
    /// `journal_limit` starts out as [`JOURNAL_LIMIT`] and `fsync` as `true`;
    /// use the setters to override them.
    pub fn new(name: &str, dir: &ffi::OsStr) -> Config {
        let (name, dir) = (String::from(name), dir.to_owned());
        Config {
            name,
            dir,
            journal_limit: JOURNAL_LIMIT,
            fsync: true,
        }
    }

    /// Overrides `journal_limit`; returns `self` so calls can be chained.
    pub fn set_journal_limit(&mut self, journal_limit: usize) -> &mut Self {
        self.journal_limit = journal_limit;
        self
    }

    /// Overrides `fsync`; returns `self` so calls can be chained.
    pub fn set_fsync(&mut self, fsync: bool) -> &mut Self {
        self.fsync = fsync;
        self
    }
}
/// Handle to a write-ahead log, generic over a state type `S`
/// (`state::NoState` unless specified).
///
/// The handle can be cloned; clones share the writer thread handle and the
/// writer itself through `Arc`.
pub struct Wal<S = state::NoState> {
// Configuration this instance was created or loaded with.
config: Config,
// Request channel to the writer thread (used by `add_op`).
tx: thread::Tx<writer::Req, writer::Res>,
// Writer thread handle; joined by `close` once no other clone holds it.
t: Arc<RwLock<mkit::thread::Thread<writer::Req, writer::Res, Result<u64>>>>,
// Shared writer; read-locked by `range` to reach its journals.
w: Arc<RwLock<writer::Writer<S>>>,
}
impl<S> Clone for Wal<S> {
    /// Produces another handle onto the same log.
    ///
    /// The configuration and the request channel are cloned, while the thread
    /// handle and the writer are shared by bumping their `Arc` reference counts.
    fn clone(&self) -> Wal<S> {
        let Wal { config, tx, t, w } = self;
        Wal {
            config: config.clone(),
            tx: tx.clone(),
            t: Arc::clone(t),
            w: Arc::clone(w),
        }
    }
}
impl<S> Wal<S> {
pub fn create(config: Config, state: S) -> Result<Wal<S>>
where
S: state::State,
{
fs::create_dir_all(&config.dir).ok();
for item in err_at!(IOError, fs::read_dir(&config.dir))? {
let file_path: path::PathBuf = {
let file_name = err_at!(IOError, item)?.file_name();
[config.dir.clone(), file_name.clone()].iter().collect()
};
match Journal::<S>::load_cold(&config.name, file_path.as_ref()) {
Some(journal) => match journal.purge() {
Ok(_) => (),
Err(err) => {
debug!(target: "wral", "failed to purge {:?}, {}", file_path, err)
}
},
None => continue,
};
}
let num = 0;
let journal = Journal::start(&config.name, &config.dir, num, state)?;
debug!(target: "wral", "{:?}/{} created", &config.dir, &config.name);
let seqno = 1;
let (w, t, tx) = writer::Writer::start(config.clone(), vec![], journal, seqno);
let val = Wal {
config,
tx,
t: Arc::new(RwLock::new(t)),
w,
};
Ok(val)
}
/// Re-opens an existing log from the journal files found in `config.dir`.
///
/// Every directory entry accepted by `Journal::load` is collected together
/// with its last sequence number and state; entries that fail to load are
/// logged and skipped. The journals are ordered by last sequence number, and
/// a new journal is started whose number is one past that of the latest
/// journal, carrying over that journal's state. With no journals at all the
/// new journal gets number `1`, `S::default()` as state, and the writer
/// starts at sequence number `1`.
///
/// Unlike `create`, this does not create the directory.
///
/// # Errors
///
/// Returns `IOError` if the directory cannot be listed, and forwards any
/// error from `Journal::start`.
///
/// # Panics
///
/// Panics if `to_last_seqno()` on a successfully loaded journal cannot be
/// unwrapped.
pub fn load(config: Config) -> Result<Wal<S>>
where
    S: Default + state::State,
{
    let mut journals: Vec<(Journal<S>, u64, S)> = vec![];
    for item in err_at!(IOError, fs::read_dir(&config.dir))? {
        let file_path: path::PathBuf = err_at!(IOError, item)?.path();
        if let Some((journal, state)) = Journal::load(&config.name, file_path.as_ref()) {
            let seqno = journal.to_last_seqno().unwrap();
            journals.push((journal, seqno, state));
        } else {
            debug!(target: "wral", "failed to load {:?}", file_path);
        }
    }

    // Oldest first, so that the last element is the most recent journal.
    journals.sort_by_key(|(_, seqno, _)| *seqno);

    // Continue right after the most recent journal, or start from scratch.
    let (seqno, num, state) = match journals.last() {
        Some((latest, last_seqno, state)) => (
            *last_seqno + 1,
            latest.to_journal_number().saturating_add(1),
            state.clone(),
        ),
        None => (1, 1, S::default()),
    };
    let journal = Journal::start(&config.name, &config.dir, num, state)?;

    let n_batches: usize = journals.iter().map(|(j, _, _)| j.len_batches()).sum();
    debug!(
        target: "wral",
        "{:?}/{} loaded with {} journals, {} batches",
        config.dir, config.name, journals.len(), n_batches
    );

    let archived: Vec<Journal<S>> = journals.into_iter().map(|(j, _, _)| j).collect();
    let (w, t, tx) = writer::Writer::start(config.clone(), archived, journal, seqno);

    Ok(Wal {
        config,
        tx,
        t: Arc::new(RwLock::new(t)),
        w,
    })
}
/// Shuts this handle down and, if it is the last one, the log itself.
///
/// Returns `Ok(None)` when the thread handle or the writer is still shared
/// with other clones of this `Wal`; nothing is closed or purged in that case.
/// Otherwise the request channel is dropped, the writer thread is joined, and
/// the writer is purged (`purge == true`) or closed (`purge == false`); the
/// value it returns is handed back as `Ok(Some(_))`.
///
/// NOTE(review): the returned `u64` is presumably the last sequence number —
/// confirm against `Writer::close` / `Writer::purge`.
///
/// # Errors
///
/// Returns `IPCFail` if either lock is poisoned, and forwards errors from
/// joining the thread and from the writer's `close` / `purge`.
pub fn close(self, purge: bool) -> Result<Option<u64>> {
    // The thread can only be joined by its sole owner.
    let t = match Arc::try_unwrap(self.t) {
        Ok(t) => t,
        Err(_) => return Ok(None),
    };

    // Release our end of the request channel before waiting for the thread.
    mem::drop(self.tx);
    let handle = err_at!(IPCFail, t.into_inner())?;
    handle.join()??;

    // Same ownership rule for the writer itself.
    let w = match Arc::try_unwrap(self.w) {
        Ok(w) => w,
        Err(_) => return Ok(None),
    };
    let w = err_at!(IPCFail, w.into_inner())?;

    let seqno = if purge { w.purge()? } else { w.close()? };
    Ok(Some(seqno))
}
}
impl<S> Wal<S> {
    /// Appends `op` to the log and returns the sequence number reported back
    /// by the writer thread.
    ///
    /// The bytes are copied into an `AddEntry` request that is sent over the
    /// request channel; any error from that request is forwarded.
    pub fn add_op(&self, op: &[u8]) -> Result<u64> {
        let payload = op.to_vec();
        match self.tx.request(writer::Req::AddEntry { op: payload })? {
            writer::Res::Seqno(seqno) => Ok(seqno),
        }
    }
}
impl<S> Wal<S> {
pub fn iter(&self) -> Result<impl Iterator<Item = Result<entry::Entry>>> {
self.range(..)
}
/// Iterates over the entries selected by `range`.
///
/// `range` is first normalised to an inclusive `u64` range; if it cannot be
/// expressed that way the iterator is empty. Otherwise the writer is
/// read-locked and one journal reader is built for each of its older
/// journals, followed by one for its current journal, all restricted to the
/// same range. The lock is released before the iterator is returned.
///
/// # Errors
///
/// Returns `Fatal` if the writer lock is poisoned, and forwards any error
/// from building a journal reader.
pub fn range<R>(&self, range: R) -> Result<impl Iterator<Item = Result<entry::Entry>>>
where
    R: ops::RangeBounds<u64>,
{
    let mut readers = vec![];

    if let Some(bounds) = Self::range_bound_to_range_inclusive(range) {
        let guard = err_at!(Fatal, self.w.read())?;
        for older in guard.journals.iter() {
            readers.push(journal::RdJournal::from_journal(older, bounds.clone())?);
        }
        // The journal currently being written to comes last.
        readers.push(journal::RdJournal::from_journal(&guard.journal, bounds)?);
    }

    Ok(Iter {
        journal: None,
        journals: readers.into_iter(),
    })
}
/// Normalises any `u64` range into an equivalent inclusive range.
///
/// An unbounded start becomes `0` and an unbounded end becomes `u64::MAX`.
/// Returns `None` when an excluded bound cannot be turned into an included
/// one: a start excluded at `u64::MAX`, or an end excluded at `0`. The result
/// is not checked for `start <= end`, so it may describe an empty range.
fn range_bound_to_range_inclusive<R>(range: R) -> Option<ops::RangeInclusive<u64>>
where
    R: ops::RangeBounds<u64>,
{
    let first = match range.start_bound() {
        ops::Bound::Included(&at) => at,
        // Nothing lies strictly above u64::MAX.
        ops::Bound::Excluded(&after) => after.checked_add(1)?,
        ops::Bound::Unbounded => 0,
    };
    let last = match range.end_bound() {
        ops::Bound::Included(&at) => at,
        // Nothing lies strictly below 0.
        ops::Bound::Excluded(&before) => before.checked_sub(1)?,
        ops::Bound::Unbounded => u64::MAX,
    };
    Some(first..=last)
}
}
/// Iterator built by `Wal::range`: walks a list of journal readers one after
/// another and yields their items.
struct Iter {
// Reader currently being drained; `None` before the first call to `next`
// and once a call has run out of readers.
journal: Option<journal::RdJournal>,
// Readers that have not been started yet, in visiting order.
journals: vec::IntoIter<journal::RdJournal>,
}
impl Iterator for Iter {
    type Item = Result<entry::Entry>;

    /// Yields the next item of the current journal reader, moving on to the
    /// following reader whenever one runs dry; returns `None` once every
    /// reader is exhausted.
    fn next(&mut self) -> Option<Self::Item> {
        loop {
            // Resume the reader parked by the previous call, or start the next
            // one; with neither available the iteration is over.
            let mut current = match self.journal.take() {
                Some(reader) => reader,
                None => self.journals.next()?,
            };
            if let Some(item) = current.next() {
                // Park the reader so the next call continues from here.
                self.journal = Some(current);
                return Some(item);
            }
            // `current` is exhausted: drop it and try the following reader.
        }
    }
}
// Unit tests for this module; compiled only for test builds and kept in the
// separate file `wral_test.rs`.
#[cfg(test)]
#[path = "wral_test.rs"]
mod wral_test;