use std::fmt;
use std::fs;
use std::io;
use std::path::Path;
use std::path::PathBuf;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::SeqCst;
use minibytes::Bytes;
use once_cell::sync::OnceCell;
use tracing::debug;
use tracing::debug_span;
use tracing::trace;
use crate::errors::IoResultExt;
use crate::errors::ResultExt;
use crate::lock::ScopedDirLock;
use crate::lock::READER_LOCK_OPTS;
use crate::log;
use crate::log::FlushFilterContext;
use crate::log::FlushFilterFunc;
use crate::log::FlushFilterOutput;
use crate::log::IndexDef;
use crate::log::Log;
use crate::repair::OpenOptionsOutput;
use crate::repair::OpenOptionsRepair;
use crate::repair::RepairMessage;
use crate::utils;
/// A set of [`Log`]s stored in numbered sub-directories of `dir`, where only
/// the newest log is written to and older logs are rotated out.
pub struct RotateLog {
    /// Directory holding the logs; `None` for an in-memory `RotateLog`.
    dir: Option<PathBuf>,
    /// Options this `RotateLog` was opened with.
    open_options: OpenOptions,
    /// Logs ordered newest first. Index 0 is the writable log and is always
    /// initialized; older entries may be empty cells loaded on demand.
    logs: Vec<OnceCell<Log>>,
    /// Number of leading entries of `logs` that are usable. Lowered when an
    /// older log fails to load.
    logs_len: AtomicUsize,
    /// Id (directory name) of `logs[0]`.
    latest: u8,
    /// Reader lock held while this `RotateLog` is alive; `None` in memory.
    reader_lock: Option<ScopedDirLock>,
    /// Test-only hook invoked in `sync` right after the writable log is flushed.
    #[cfg(test)]
    hook_after_log_sync: Option<Box<dyn Fn()>>,
}
/// Name of the file (inside the rotate directory) storing the id of the newest log.
const LATEST_FILE: &str = "latest";
/// Builder-style options for opening a [`RotateLog`].
#[derive(Clone)]
pub struct OpenOptions {
    /// Size at which `sync` rotates to a new log.
    pub(crate) max_bytes_per_log: u64,
    /// Maximum number of logs (writable + older) kept around.
    pub(crate) max_log_count: u8,
    /// Options used for each individual [`Log`].
    pub(crate) log_open_options: log::OpenOptions,
    /// When set, `append` triggers `sync` once the in-memory buffer reaches this many bytes.
    pub(crate) auto_sync_threshold: Option<u64>,
}
impl OpenOptions {
    /// Creates options with the defaults: keep at most 2 logs, rotate a log
    /// once its flushed size reaches 2,000,000,000 bytes, no auto-sync
    /// threshold, and per-log options from `log::OpenOptions::new()`.
    #[allow(clippy::new_without_default)]
    pub fn new() -> Self {
        let max_log_count = 2;
        let max_bytes_per_log = 2_000_000_000;
        Self {
            max_bytes_per_log,
            max_log_count,
            log_open_options: log::OpenOptions::new(),
            auto_sync_threshold: None,
        }
    }

    /// Sets the maximum number of logs (the writable one plus older ones) to keep.
    ///
    /// # Panics
    /// Panics if `count` is 0.
    pub fn max_log_count(mut self, count: u8) -> Self {
        assert!(count >= 1);
        self.max_log_count = count;
        self
    }

    /// Sets the size at which `sync` rotates to a new log. The value is
    /// compared against the size returned by flushing the writable log.
    ///
    /// # Panics
    /// Panics if `bytes` is 0.
    pub fn max_bytes_per_log(mut self, bytes: u64) -> Self {
        assert!(bytes > 0);
        self.max_bytes_per_log = bytes;
        self
    }

    /// Sets the checksum type used by each underlying log.
    pub fn checksum_type(mut self, checksum_type: log::ChecksumType) -> Self {
        self.log_open_options = self.log_open_options.checksum_type(checksum_type);
        self
    }

    /// Sets whether `open` may create the directory and logs when they are
    /// missing or unreadable.
    pub fn create(mut self, create: bool) -> Self {
        self.log_open_options = self.log_open_options.create(create);
        self
    }

    /// Adds an index named `name` computed by `func` to each underlying log.
    pub fn index(mut self, name: &'static str, func: fn(&[u8]) -> Vec<log::IndexOutput>) -> Self {
        self.log_open_options = self.log_open_options.index(name, func);
        self
    }

    /// Sets the index definitions of each underlying log.
    pub fn index_defs(mut self, index_defs: Vec<IndexDef>) -> Self {
        self.log_open_options = self.log_open_options.index_defs(index_defs);
        self
    }

    /// Sets the filter applied to dirty entries when `sync` finds that another
    /// writer moved `latest` (see `RotateLog::sync`). Also required by
    /// `RotateLog::lookup_latest`.
    pub fn flush_filter(mut self, flush_filter: Option<FlushFilterFunc>) -> Self {
        self.log_open_options = self.log_open_options.flush_filter(flush_filter);
        self
    }

    /// Sets (or clears with `None`) the in-memory buffer size at which
    /// `RotateLog::append` automatically calls `sync`.
    pub fn auto_sync_threshold(mut self, threshold: impl Into<Option<u64>>) -> Self {
        self.auto_sync_threshold = threshold.into();
        self
    }

    /// Opens the `RotateLog` stored in `dir`.
    ///
    /// The fast path reads `latest` and its logs without the write lock. If
    /// that fails and `create` is set, the directory is created and, under
    /// the directory lock:
    /// - if `latest` is readable but its logs are not, a new empty log
    ///   `latest + 1` is created and made latest;
    /// - if `latest` does not exist, log 0 is created;
    /// - other read errors are returned (marked as corruption for invalid data).
    ///
    /// # Errors
    /// Fails if the logs cannot be read and `create` is not set, or if
    /// creating or locking fails.
    pub fn open(&self, dir: impl AsRef<Path>) -> crate::Result<RotateLog> {
        let dir = dir.as_ref();
        let result: crate::Result<_> = (|| {
            // Stored in the returned RotateLog, so it is held for its lifetime.
            let reader_lock = ScopedDirLock::new_with_options(dir, &READER_LOCK_OPTS)?;
            let span = debug_span!("RotateLog::open", dir = &dir.to_string_lossy().as_ref());
            let _guard = span.enter();
            let latest_and_log = read_latest_and_logs(dir, self);
            let (latest, logs) = match latest_and_log {
                Ok((latest, logs)) => (latest, logs),
                Err(e) => {
                    if !self.log_open_options.create {
                        return Err(e)
                            .context("not creating new logs since OpenOption::create is not set");
                    } else {
                        utils::mkdir_p(dir)?;
                        // Re-read under the write lock: another writer may have
                        // created or fixed the logs since the fast path failed.
                        let lock = ScopedDirLock::new(dir)?;
                        match read_latest_raw(dir) {
                            Ok(latest) => match read_logs(dir, self, latest) {
                                Ok(logs) => (latest, logs),
                                Err(err) => {
                                    // The latest log is unreadable: start a fresh
                                    // one after it instead of overwriting it.
                                    let latest = latest.wrapping_add(1);
                                    match create_empty_log(Some(dir), self, latest, &lock) {
                                        Ok(new_log) => {
                                            // Prefer a full view that includes the
                                            // older logs; fall back to the new log alone.
                                            if let Ok(logs) = read_logs(dir, self, latest) {
                                                (latest, logs)
                                            } else {
                                                (latest, vec![create_log_cell(new_log)])
                                            }
                                        }
                                        Err(new_log_err) => {
                                            let msg = "cannot create new empty log after failing to read existing logs";
                                            return Err(new_log_err.message(msg).source(err));
                                        }
                                    }
                                }
                            },
                            Err(err) => {
                                if err.kind() == io::ErrorKind::NotFound {
                                    // Brand new RotateLog: start at id 0.
                                    let latest = 0;
                                    let new_log = create_empty_log(Some(dir), self, latest, &lock)?;
                                    (latest, vec![create_log_cell(new_log)])
                                } else {
                                    let corrupted = err.kind() == io::ErrorKind::InvalidData;
                                    let mut result = Err(err).context(dir, "cannot read 'latest'");
                                    if corrupted {
                                        result = result.corruption();
                                    }
                                    return result;
                                }
                            }
                        }
                    }
                }
            };
            let logs_len = AtomicUsize::new(logs.len());
            Ok(RotateLog {
                dir: Some(dir.into()),
                open_options: self.clone(),
                logs,
                logs_len,
                latest,
                reader_lock: Some(reader_lock),
                #[cfg(test)]
                hook_after_log_sync: None,
            })
        })();
        result.context(|| format!("in rotate::OpenOptions::open({:?})", dir))
    }

    /// Creates a `RotateLog` with a single in-memory log and no directory.
    /// Such a log never rotates, and `sync` on it is a no-op.
    pub fn create_in_memory(&self) -> crate::Result<RotateLog> {
        let result: crate::Result<_> = (|| {
            let cell = create_log_cell(self.log_open_options.open(())?);
            let mut logs = Vec::with_capacity(1);
            logs.push(cell);
            let logs_len = AtomicUsize::new(logs.len());
            Ok(RotateLog {
                dir: None,
                open_options: self.clone(),
                logs,
                logs_len,
                latest: 0,
                reader_lock: None,
                #[cfg(test)]
                hook_after_log_sync: None,
            })
        })();
        result.context("in rotate::OpenOptions::create_in_memory")
    }

    /// Repairs every numbered log directory in `dir` and fixes the `latest`
    /// file, returning a human-readable report.
    ///
    /// Per-log repair failures are reported in the message, not returned.
    /// If `latest` is missing, non-UTF-8, unparsable or empty, it is rewritten
    /// with a guess computed from the existing log ids.
    ///
    /// # Errors
    /// Fails if the lock cannot be taken, `dir` cannot be listed, `latest`
    /// cannot be rewritten, or `latest` fails to read for another reason.
    pub fn repair(&self, dir: impl AsRef<Path>) -> crate::Result<String> {
        let dir = dir.as_ref();
        (|| -> crate::Result<_> {
            let _lock = ScopedDirLock::new(dir)?;
            let mut message = RepairMessage::new(dir);
            message += &format!("Processing RotateLog: {:?}\n", dir);
            let read_dir = dir.read_dir().context(dir, "cannot readdir")?;
            let mut ids = Vec::new();
            for entry in read_dir {
                let entry = entry.context(dir, "cannot readdir")?;
                let name = entry.file_name();
                if let Some(name) = name.to_str() {
                    if let Ok(id) = name.parse::<u8>() {
                        ids.push(id);
                    }
                }
            }
            ids.sort_unstable();
            // `parse::<u8>` accepts non-canonical names ("01", "+1"), so two
            // directories can map to the same id; repair and count each id once.
            ids.dedup();
            for &id in &ids {
                let name = id.to_string();
                message += &format!("Attempt to repair log {:?}\n", name);
                match self.log_open_options.repair(&dir.join(name)) {
                    Ok(log) => message += &log,
                    Err(err) => message += &format!("Failed: {}\n", err),
                }
            }
            let latest_path = dir.join(LATEST_FILE);
            match read_latest_raw(dir) {
                Ok(latest) => message += &format!("Latest = {}\n", latest),
                Err(err) => match err.kind() {
                    io::ErrorKind::NotFound
                    | io::ErrorKind::InvalidData
                    | io::ErrorKind::UnexpectedEof => {
                        let latest = guess_latest(ids);
                        let content = latest.to_string();
                        let fsync = false;
                        utils::atomic_write(&latest_path, content, fsync)?;
                        message += &format!("Reset latest to {}\n", latest);
                    }
                    _ => return Err(err).context(&latest_path, "cannot read or parse"),
                },
            };
            Ok(message.into_string())
        })()
        .context(|| format!("in rotate::OpenOptions::repair({:?})", dir))
    }
}
impl OpenOptionsRepair for OpenOptions {
    /// Trait entry point for generic repair code; forwards to the inherent
    /// [`OpenOptions::repair`].
    fn open_options_repair(&self, dir: impl AsRef<Path>) -> crate::Result<String> {
        let target: &Path = dir.as_ref();
        Self::repair(self, target)
    }
}
impl OpenOptionsOutput for OpenOptions {
type Output = RotateLog;
fn open_path(&self, path: &Path) -> crate::Result<Self::Output> {
self.open(path)
}
}
impl fmt::Debug for OpenOptions {
    /// Renders every option in a single `OpenOptions { … }` line.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(
            f,
            "OpenOptions {{ max_bytes_per_log: {}, max_log_count: {}, auto_sync_threshold: {:?}, log_open_options: {:?} }}",
            self.max_bytes_per_log,
            self.max_log_count,
            self.auto_sync_threshold,
            self.log_open_options,
        )
    }
}
impl RotateLog {
pub fn append(&mut self, data: impl AsRef<[u8]>) -> crate::Result<()> {
(|| -> crate::Result<_> {
let threshold = self.open_options.auto_sync_threshold;
let log = self.writable_log();
log.append(data)?;
if let Some(threshold) = threshold {
if log.mem_buf.len() as u64 >= threshold {
self.sync()
.context("sync triggered by auto_sync_threshold")?;
}
}
Ok(())
})()
.context("in RotateLog::append")
}
pub fn lookup(
&self,
index_id: usize,
key: impl Into<Bytes>,
) -> crate::Result<RotateLogLookupIter> {
let key = key.into();
let result: crate::Result<_> = (|| {
Ok(RotateLogLookupIter {
inner_iter: self.logs[0].get().unwrap().lookup(index_id, &key)?,
end: false,
log_rotate: self,
log_index: 0,
index_id,
key: key.clone(),
})
})();
result
.context(|| format!("in RotateLog::lookup({}, {:?})", index_id, key.as_ref()))
.context(|| format!(" RotateLog.dir = {:?}", self.dir))
}
pub fn slice_to_bytes(&self, slice: &[u8]) -> Bytes {
for log in &self.logs {
if let Some(log) = log.get() {
if log.disk_buf.range_of_slice(slice).is_some() {
return log.slice_to_bytes(slice);
}
}
}
Bytes::copy_from_slice(slice)
}
pub fn lookup_latest(
&self,
index_id: usize,
key: impl AsRef<[u8]>,
) -> crate::Result<log::LogLookupIter> {
let key = key.as_ref();
assert!(
self.open_options.log_open_options.flush_filter.is_some(),
"programming error: flush_filter should also be set"
);
self.logs[0]
.get()
.unwrap()
.lookup(index_id, key)
.context(|| format!("in RotateLog::lookup_latest({}, {:?})", index_id, key))
.context(|| format!(" RotateLog.dir = {:?}", self.dir))
}
pub fn sync(&mut self) -> crate::Result<u8> {
let result: crate::Result<_> = (|| {
let span = debug_span!("RotateLog::sync", latest = self.latest as u32);
if let Some(dir) = &self.dir {
span.record("dir", &dir.to_string_lossy().as_ref());
}
let _guard = span.enter();
if self.dir.is_none() {
return Ok(0);
}
if self.writable_log().iter_dirty().next().is_none() {
if let Ok(latest) = read_latest(self.dir.as_ref().unwrap()) {
if latest != self.latest {
self.set_logs(read_logs(
self.dir.as_ref().unwrap(),
&self.open_options,
latest,
)?);
self.latest = latest;
}
self.writable_log().sync()?;
} else {
}
} else {
let dir = self.dir.clone().unwrap();
let lock = ScopedDirLock::new(&dir)?;
let latest = read_latest(self.dir.as_ref().unwrap())?;
if latest != self.latest {
let mut new_logs =
read_logs(self.dir.as_ref().unwrap(), &self.open_options, latest)?;
if let Some(filter) = self.open_options.log_open_options.flush_filter {
let log = new_logs[0].get_mut().unwrap();
for entry in self.writable_log().iter_dirty() {
let content = entry?;
let context = FlushFilterContext { log };
match filter(&context, content).map_err(|err| {
crate::Error::wrap(err, "failed to run filter function")
})? {
FlushFilterOutput::Drop => {}
FlushFilterOutput::Keep => log.append(content)?,
FlushFilterOutput::Replace(content) => log.append(content)?,
}
}
} else {
let log = new_logs[0].get_mut().unwrap();
for entry in self.writable_log().iter_dirty() {
let bytes = entry?;
log.append(bytes)?;
}
}
self.set_logs(new_logs);
self.latest = latest;
}
let size = self.writable_log().flush()?;
#[cfg(test)]
if let Some(func) = self.hook_after_log_sync.as_ref() {
func();
}
if size >= self.open_options.max_bytes_per_log {
self.writable_log().finalize_indexes(&lock)?;
self.rotate_internal(&lock)?;
}
}
Ok(self.latest)
})();
result
.context("in RotateLog::sync")
.context(|| format!(" RotateLog.dir = {:?}", self.dir))
}
pub fn remove_old_logs(&mut self) -> crate::Result<()> {
if let Some(dir) = &self.dir {
let lock = ScopedDirLock::new(dir)?;
let latest = read_latest(dir)?;
if latest == self.latest {
self.try_remove_old_logs(&lock);
}
}
Ok(())
}
fn rotate_internal(&mut self, lock: &ScopedDirLock) -> crate::Result<()> {
let span = debug_span!("RotateLog::rotate", latest = self.latest as u32);
if let Some(dir) = &self.dir {
span.record("dir", &dir.to_string_lossy().as_ref());
}
let _guard = span.enter();
let next = self.latest.wrapping_add(1);
let log = create_empty_log(
Some(self.dir.as_ref().unwrap()),
&self.open_options,
next,
&lock,
)?;
if self.logs.len() >= self.open_options.max_log_count as usize {
self.logs.pop();
}
self.logs.insert(0, create_log_cell(log));
self.logs_len = AtomicUsize::new(self.logs.len());
self.latest = next;
self.try_remove_old_logs(lock);
Ok(())
}
pub fn flush(&mut self) -> crate::Result<u8> {
self.sync()
}
fn set_logs(&mut self, logs: Vec<OnceCell<Log>>) {
self.logs_len = AtomicUsize::new(logs.len());
self.logs = logs;
}
#[allow(clippy::nonminimal_bool)]
fn try_remove_old_logs(&self, _lock: &ScopedDirLock) {
if let Ok(read_dir) = self.dir.as_ref().unwrap().read_dir() {
let latest = self.latest;
let earliest = latest.wrapping_sub(self.open_options.max_log_count - 1);
for entry in read_dir {
if let Ok(entry) = entry {
let name = entry.file_name();
debug!("Inspecting {:?} for rotate log removal", name);
if let Some(name) = name.to_str() {
if let Ok(id) = name.parse::<u8>() {
if (latest >= earliest && (id > latest || id < earliest))
|| (latest < earliest && (id > latest && id < earliest))
{
match fs::remove_file(entry.path().join(log::META_FILE)) {
Ok(()) => {}
Err(e) if e.kind() == io::ErrorKind::NotFound => {
}
Err(e) => {
debug!(
"Error removing rotate log meta: {:?} {:?}",
name, e
);
continue;
}
}
let res = fs::remove_dir_all(entry.path());
match res {
Ok(_) => debug!("Removed rotate log: {:?}", name),
Err(err) => {
debug!("Error removing rotate log directory: {:?}", err)
}
};
} else {
debug!(
"Not removing rotate log: {:?} (latest: {:?}, earliest: {:?})",
name, latest, earliest
);
}
}
}
}
}
}
}
fn writable_log(&mut self) -> &mut Log {
self.logs[0].get_mut().unwrap()
}
fn load_log(&self, index: usize) -> crate::Result<Option<&Log>> {
if index >= self.logs_len.load(SeqCst) {
return Ok(None);
}
match self.logs.get(index) {
Some(cell) => {
let id = self.latest.wrapping_sub(index as u8);
if let Some(dir) = &self.dir {
let log = cell.get_or_try_init(|| {
let mut open_options = self.open_options.log_open_options.clone();
if index > 0 {
open_options = open_options.with_zero_index_lag();
}
let log = load_log(&dir, id, open_options);
trace!(
name = "RotateLog::load_log",
index = index,
success = log.is_ok()
);
log
});
match log {
Ok(log) => Ok(Some(log)),
Err(err) => {
self.logs_len.store(index, SeqCst);
Err(err)
}
}
} else {
Ok(cell.get())
}
}
None => unreachable!(),
}
}
pub fn iter(&self) -> impl Iterator<Item = crate::Result<&[u8]>> {
let logs = self.logs();
logs.into_iter().rev().flat_map(|log| log.iter())
}
pub fn iter_dirty(&self) -> impl Iterator<Item = crate::Result<&[u8]>> {
self.logs[0].get().unwrap().iter_dirty()
}
}
/// Wraps an already-opened `Log` in an initialized `OnceCell`.
fn create_log_cell(log: Log) -> OnceCell<Log> {
    let initialized: OnceCell<Log> = OnceCell::new();
    initialized
        .set(log)
        .expect("cell is empty so cell.set cannot fail");
    initialized
}
/// Opens the existing log `dir/<id>`; never creates it.
fn load_log(dir: &Path, id: u8, open_options: log::OpenOptions) -> crate::Result<Log> {
    let log_path = dir.join(id.to_string());
    open_options.create(false).open(&log_path)
}
/// Low-level extension methods for [`RotateLog`].
pub trait RotateLowLevelExt {
    /// Returns the loadable logs, newest first, stopping at the first log
    /// that is missing or fails to load.
    fn logs(&self) -> Vec<&Log>;

    /// Rotates to a new empty writable log regardless of size, then reloads
    /// the log list. No-op for in-memory logs.
    fn force_rotate(&mut self) -> crate::Result<()>;
}
impl RotateLowLevelExt for RotateLog {
    /// Loads logs in order (0 = newest) until one is absent or fails.
    fn logs(&self) -> Vec<&Log> {
        let mut loaded = Vec::new();
        for index in 0.. {
            match self.load_log(index) {
                Ok(Some(log)) => loaded.push(log),
                _ => break,
            }
        }
        loaded
    }

    /// Under the directory lock, refreshes `latest` from disk, rotates, and
    /// re-reads the log list for the new latest id.
    fn force_rotate(&mut self) -> crate::Result<()> {
        let dir = match self.dir.clone() {
            Some(dir) => dir,
            None => return Ok(()),
        };
        let lock = ScopedDirLock::new(&dir)?;
        self.latest = read_latest(&dir)?;
        self.rotate_internal(&lock)?;
        let logs = read_logs(&dir, &self.open_options, self.latest)?;
        self.set_logs(logs);
        Ok(())
    }
}
/// Iterator returned by [`RotateLog::lookup`], walking logs newest to oldest.
pub struct RotateLogLookupIter<'a> {
    /// Lookup iterator over the log currently being searched.
    inner_iter: log::LogLookupIter<'a>,
    /// Set once iteration is finished (exhausted, error, or unloadable log).
    end: bool,
    /// The `RotateLog` being searched.
    log_rotate: &'a RotateLog,
    /// Index (0 = newest) of the log `inner_iter` belongs to.
    log_index: usize,
    /// Index id passed to each log's `lookup`.
    index_id: usize,
    /// Key being looked up.
    key: Bytes,
}
impl<'a> Iterator for RotateLogLookupIter<'a> {
    type Item = crate::Result<&'a [u8]>;

    /// Yields matches from the current log. When it is exhausted, moves on to
    /// the next older log. Stops at the last log or at a log that cannot be
    /// loaded (silently). Yields lookup errors once, then stops.
    fn next(&mut self) -> Option<Self::Item> {
        loop {
            if self.end {
                return None;
            }
            match self.inner_iter.next() {
                Some(Ok(slice)) => return Some(Ok(slice)),
                Some(Err(err)) => {
                    self.end = true;
                    return Some(Err(err));
                }
                None => {}
            }

            // Current log exhausted: advance to the next older one, if any.
            if self.log_index + 1 >= self.log_rotate.logs.len() {
                self.end = true;
                return None;
            }
            self.log_index += 1;
            let log = match self.log_rotate.load_log(self.log_index) {
                Ok(Some(log)) => log,
                Ok(None) | Err(_) => {
                    self.end = true;
                    return None;
                }
            };
            match log.lookup(self.index_id, &self.key) {
                Ok(iter) => self.inner_iter = iter,
                Err(err) => {
                    self.end = true;
                    return Some(Err(err));
                }
            }
        }
    }
}
/// Creates (or empties) log `latest` and points the `latest` file at it.
///
/// With `dir == None`, an in-memory log is returned instead. `_lock` is
/// unused; requiring it means callers must hold the directory lock.
fn create_empty_log(
    dir: Option<&Path>,
    open_options: &OpenOptions,
    latest: u8,
    _lock: &ScopedDirLock,
) -> crate::Result<Log> {
    let dir = match dir {
        None => return Ok(open_options.log_open_options.clone().open(())?),
        Some(dir) => dir,
    };
    let latest_path = dir.join(LATEST_FILE);
    let id_text = latest.to_string();
    let log_path = dir.join(&id_text);
    let opts = open_options.log_open_options.clone().create(true);
    opts.delete_content(&log_path)?;
    let log = opts.open(&log_path)?;
    // Only publish the new id after the log exists on disk.
    utils::atomic_write(&latest_path, id_text.as_bytes(), false)?;
    Ok(log)
}
/// Reads the `latest` file in `dir`, wrapping I/O errors with path context.
fn read_latest(dir: &Path) -> crate::Result<u8> {
    let raw = read_latest_raw(dir);
    raw.context(dir, "cannot read latest")
}
/// Reads and parses `dir/latest` as a `u8`.
///
/// Non-UTF-8 content or content that is not a `u8` integer yields an
/// `InvalidData` error; read errors are passed through unchanged.
fn read_latest_raw(dir: &Path) -> io::Result<u8> {
    let latest_path = dir.join(LATEST_FILE);
    let invalid = |msg: String| io::Error::new(io::ErrorKind::InvalidData, msg);
    let bytes = utils::atomic_read(&latest_path)?;
    let text: String = String::from_utf8(bytes)
        .map_err(|_| invalid(format!("{:?}: failed to read as utf8 string", latest_path)))?;
    text.parse::<u8>().map_err(|_| {
        invalid(format!(
            "{:?}: failed to parse {:?} as u8 integer",
            latest_path, text
        ))
    })
}
/// Loads log `latest` eagerly and reserves lazy (empty) cells for up to
/// `max_log_count - 1` older logs, stopping at the first missing directory.
fn read_logs(
    dir: &Path,
    open_options: &OpenOptions,
    latest: u8,
) -> crate::Result<Vec<OnceCell<Log>>> {
    let newest = load_log(dir, latest, open_options.log_open_options.clone())?;
    let mut logs = Vec::with_capacity(open_options.max_log_count as usize);
    logs.push(create_log_cell(newest));
    let older = (1..open_options.max_log_count)
        .map(|age| dir.join(latest.wrapping_sub(age).to_string()))
        .take_while(|path| path.is_dir())
        .map(|_| OnceCell::new());
    logs.extend(older);
    trace!(
        name = "RotateLog::read_logs",
        max_log_count = open_options.max_log_count,
        logs_len = logs.len()
    );
    Ok(logs)
}
/// Reads the `latest` id and then the logs starting from it.
fn read_latest_and_logs(
    dir: &Path,
    open_options: &OpenOptions,
) -> crate::Result<(u8, Vec<OnceCell<Log>>)> {
    let latest = read_latest(dir)?;
    let logs = read_logs(dir, open_options, latest)?;
    Ok((latest, logs))
}
/// Guesses the latest log id from the existing ids, accounting for u8 wrap-around.
///
/// Starting from the largest id, a contiguous run ending at 255 (255, 254, …)
/// is treated as older logs from before the counter wrapped, and skipped. The
/// first id that breaks the run is returned. Returns 0 if `ids` is empty or
/// every id from 255 down to 1 is present.
fn guess_latest(mut ids: Vec<u8>) -> u8 {
    ids.sort_unstable();
    let mut wrap_candidate: u8 = 255;
    while let Some(id) = ids.pop() {
        if id != wrap_candidate {
            return id;
        }
        wrap_candidate -= 1;
        if wrap_candidate == 0 {
            return 0;
        }
    }
    0
}
#[cfg(test)]
mod tests {
use log::IndexOutput;
use tempfile::tempdir;
use super::*;
#[test]
fn test_open() {
let dir = tempdir().unwrap();
let path = dir.path().join("rotate");
assert!(OpenOptions::new().create(false).open(&path).is_err());
assert!(OpenOptions::new().create(true).open(&path).is_ok());
assert!(
OpenOptions::new()
.checksum_type(log::ChecksumType::Xxhash64)
.create(false)
.open(&path)
.is_ok()
);
}
fn lookup<'a>(rotate: &'a RotateLog, key: &[u8]) -> Vec<&'a [u8]> {
let values = rotate
.lookup(0, key.to_vec())
.unwrap()
.collect::<crate::Result<Vec<&[u8]>>>()
.unwrap();
for value in &values {
let b1 = rotate.slice_to_bytes(value);
let b2 = rotate.slice_to_bytes(value);
if rotate
.iter_dirty()
.any(|i| i.unwrap().as_ptr() == value.as_ptr())
{
continue;
}
assert_eq!(
b1.as_ptr(),
b2.as_ptr(),
"slice_to_bytes should return zero-copy"
);
}
values
}
fn iter(rotate: &RotateLog) -> Vec<&[u8]> {
rotate
.iter()
.collect::<crate::Result<Vec<&[u8]>>>()
.unwrap()
}
#[test]
fn test_trivial_append_lookup() {
let dir = tempdir().unwrap();
let opts = OpenOptions::new()
.create(true)
.index_defs(vec![IndexDef::new("two-bytes", |_| {
vec![IndexOutput::Reference(0..2)]
})]);
let rotate = opts.clone().open(&dir).unwrap();
let rotate_mem = opts.clone().create_in_memory().unwrap();
for rotate in &mut [rotate, rotate_mem] {
rotate.append(b"aaa").unwrap();
rotate.append(b"abbb").unwrap();
rotate.append(b"abc").unwrap();
assert_eq!(lookup(&rotate, b"aa"), vec![b"aaa"]);
assert_eq!(lookup(&rotate, b"ab"), vec![&b"abc"[..], b"abbb"]);
assert_eq!(lookup(&rotate, b"ac"), Vec::<&[u8]>::new());
}
}
#[test]
fn test_simple_rotate() {
let dir = tempdir().unwrap();
let mut rotate = OpenOptions::new()
.create(true)
.max_bytes_per_log(100)
.max_log_count(2)
.index("first-byte", |_| vec![IndexOutput::Reference(0..1)])
.open(&dir)
.unwrap();
rotate.append(b"a").unwrap();
assert_eq!(rotate.sync().unwrap(), 0);
rotate.append(b"a").unwrap();
assert_eq!(rotate.sync().unwrap(), 0);
rotate.append(vec![b'b'; 100]).unwrap();
assert_eq!(rotate.sync().unwrap(), 1);
assert_eq!(lookup(&rotate, b"a").len(), 2);
rotate.append(vec![b'c'; 50]).unwrap();
assert_eq!(rotate.sync().unwrap(), 1);
rotate.append(vec![b'd'; 50]).unwrap();
assert_eq!(rotate.sync().unwrap(), 2);
assert_eq!(lookup(&rotate, b"a").len(), 0);
assert_eq!(lookup(&rotate, b"b").len(), 0);
assert_eq!(lookup(&rotate, b"c").len(), 1);
assert_eq!(lookup(&rotate, b"d").len(), 1);
assert!(!dir.path().join("0").exists());
}
#[test]
fn test_manual_remove_old_logs() {
let dir = tempdir().unwrap();
let dir = &dir;
let open = |n: u8| -> RotateLog {
OpenOptions::new()
.create(true)
.max_bytes_per_log(1)
.max_log_count(n)
.open(dir)
.unwrap()
};
let read_all =
|log: &RotateLog| -> Vec<Vec<u8>> { log.iter().map(|v| v.unwrap().to_vec()).collect() };
{
let mut rotate = open(5);
for i in 0..5 {
rotate.append(vec![i]).unwrap();
rotate.sync().unwrap();
}
}
{
let rotate = open(4);
assert_eq!(read_all(&rotate), [[2], [3], [4]]);
let rotate = open(3);
assert_eq!(read_all(&rotate), [[3], [4]]);
}
{
let mut rotate = open(3);
rotate.remove_old_logs().unwrap();
}
{
let rotate = open(4);
assert_eq!(read_all(&rotate), [[3], [4]]);
}
}
fn test_wrapping_rotate(max_log_count: u8) {
let dir = tempdir().unwrap();
let mut rotate = OpenOptions::new()
.create(true)
.max_bytes_per_log(10)
.max_log_count(max_log_count)
.open(&dir)
.unwrap();
let count = || {
fs::read_dir(&dir)
.unwrap()
.map(|entry| entry.unwrap().file_name().into_string().unwrap())
.filter(|name| name != "lock" && name != "rlock")
.count()
};
for i in 1..=(max_log_count - 1) {
rotate.append(b"abcdefghijklmn").unwrap();
assert_eq!(rotate.sync().unwrap(), i);
assert_eq!(count(), (i as usize) + 2);
}
for i in max_log_count..=255 {
rotate.append(b"abcdefghijklmn").unwrap();
assert_eq!(rotate.sync().unwrap(), i);
assert_eq!(count(), (max_log_count as usize) + 1);
}
for _ in 0..=max_log_count {
rotate.append(b"abcdefghijklmn").unwrap();
assert_eq!(count(), (max_log_count as usize) + 1);
}
}
#[test]
fn test_wrapping_rotate_10() {
test_wrapping_rotate(10)
}
#[test]
fn test_wrapping_rotate_255() {
test_wrapping_rotate(255)
}
#[test]
fn test_force_rotate() {
let dir = tempdir().unwrap();
let mut rotate = OpenOptions::new()
.create(true)
.max_bytes_per_log(1 << 30)
.max_log_count(3)
.open(&dir)
.unwrap();
use super::RotateLowLevelExt;
assert_eq!(rotate.logs().len(), 1);
rotate.force_rotate().unwrap();
assert_eq!(rotate.logs().len(), 2);
rotate.force_rotate().unwrap();
assert_eq!(rotate.logs().len(), 3);
rotate.force_rotate().unwrap();
assert_eq!(rotate.logs().len(), 3);
}
#[test]
fn test_lookup_rotated() {
let dir = tempdir().unwrap();
let open_opts = OpenOptions::new()
.create(true)
.max_bytes_per_log(1)
.max_log_count(3)
.index("first-byte", |_| vec![IndexOutput::Reference(0..1)]);
let mut rotate1 = open_opts.open(&dir).unwrap();
rotate1.append(b"a1").unwrap();
assert_eq!(rotate1.sync().unwrap(), 1);
rotate1.append(b"a2").unwrap();
assert_eq!(rotate1.sync().unwrap(), 2);
assert_eq!(lookup(&rotate1, b"a"), vec![b"a2", b"a1"]);
assert_eq!(iter(&rotate1), vec![b"a1", b"a2"]);
let rotate2 = open_opts.open(&dir).unwrap();
let mut rotate3 = open_opts.open(&dir).unwrap();
rotate3.append(b"a3").unwrap();
assert_eq!(rotate3.sync().unwrap(), 3);
assert_eq!(lookup(&rotate1, b"a"), vec![b"a2", b"a1"]);
assert_eq!(iter(&rotate1), vec![b"a1", b"a2"]);
assert_eq!(lookup(&rotate2, b"a"), vec![b"a2"]);
assert_eq!(iter(&rotate2), vec![b"a2"]);
}
#[test]
fn test_lookup_truncated_meta() {
let dir = tempdir().unwrap();
let open_opts = OpenOptions::new()
.create(true)
.max_bytes_per_log(1)
.max_log_count(3)
.index("first-byte", |_| vec![IndexOutput::Reference(0..1)]);
let mut rotate1 = open_opts.open(&dir).unwrap();
rotate1.append(b"a1").unwrap();
assert_eq!(rotate1.sync().unwrap(), 1);
rotate1.append(b"a2").unwrap();
assert_eq!(rotate1.sync().unwrap(), 2);
assert_eq!(lookup(&rotate1, b"a"), vec![b"a2", b"a1"]);
assert_eq!(iter(&rotate1), vec![b"a1", b"a2"]);
let rotate2 = open_opts.open(&dir).unwrap();
utils::atomic_write(dir.path().join("0").join(log::META_FILE), "", false).unwrap();
assert_eq!(lookup(&rotate1, b"a"), vec![b"a2", b"a1"]);
assert_eq!(iter(&rotate1), vec![b"a1", b"a2"]);
assert_eq!(lookup(&rotate2, b"a"), vec![b"a2"]);
assert_eq!(iter(&rotate2), vec![b"a2"]);
}
#[test]
fn test_concurrent_writes() {
let dir = tempdir().unwrap();
let mut rotate1 = OpenOptions::new()
.create(true)
.max_bytes_per_log(100)
.max_log_count(2)
.open(&dir)
.unwrap();
let mut rotate2 = OpenOptions::new()
.max_bytes_per_log(100)
.max_log_count(2)
.open(&dir)
.unwrap();
rotate1.append(vec![b'a'; 100]).unwrap();
assert_eq!(rotate1.sync().unwrap(), 1);
let size = |log_index: u64| {
dir.path()
.join(format!("{}", log_index))
.join(log::PRIMARY_FILE)
.metadata()
.unwrap()
.len()
};
let size1 = size(1);
rotate2.append(vec![b'b'; 100]).unwrap();
assert_eq!(rotate2.sync().unwrap(), 2);
#[cfg(unix)]
{
assert!(!dir.path().join("0").exists());
}
assert!(size(1) > size1 + 100);
assert!(size(2) > 0);
}
#[test]
fn test_flush_filter() {
let dir = tempdir().unwrap();
let read_log = |name: &str| -> Vec<Vec<u8>> {
let log = Log::open(dir.path().join(name), Vec::new()).unwrap();
log.iter().map(|v| v.unwrap().to_vec()).collect()
};
let mut rotate1 = OpenOptions::new()
.create(true)
.max_bytes_per_log(100)
.flush_filter(Some(|ctx, bytes| {
assert!(!ctx.log.iter().any(|x| x.unwrap() == b"aa"));
Ok(match bytes.len() {
1 => FlushFilterOutput::Replace(b"xx".to_vec()),
_ => FlushFilterOutput::Keep,
})
}))
.open(&dir)
.unwrap();
let mut rotate2 = OpenOptions::new()
.max_bytes_per_log(100)
.open(&dir)
.unwrap();
rotate2.append(vec![b'a'; 3]).unwrap();
rotate2.sync().unwrap();
rotate1.append(vec![b'a'; 1]).unwrap(); rotate1.append(vec![b'a'; 2]).unwrap();
assert_eq!(rotate1.sync().unwrap(), 0); assert_eq!(read_log("0"), vec![&b"aaa"[..], b"xx", b"aa"]);
rotate1.append(vec![b'a'; 1]).unwrap(); assert_eq!(rotate1.sync().unwrap(), 0); assert_eq!(read_log("0").last().unwrap(), b"a");
rotate1.append(vec![b'a'; 1]).unwrap(); rotate1.append(vec![b'a'; 2]).unwrap();
rotate2.append(vec![b'a'; 100]).unwrap(); assert_eq!(rotate2.sync().unwrap(), 1);
assert_eq!(rotate1.sync().unwrap(), 1); assert_eq!(read_log("1"), vec![b"xx", b"aa"]);
}
#[test]
fn test_lookup_latest() {
let dir = tempdir().unwrap();
let mut rotate = OpenOptions::new()
.create(true)
.max_bytes_per_log(100)
.flush_filter(Some(|_, _| panic!()))
.index("first-byte", |_| vec![IndexOutput::Reference(0..1)])
.open(&dir)
.unwrap();
rotate.append(vec![b'a'; 101]).unwrap();
rotate.sync().unwrap(); rotate.append(vec![b'b'; 10]).unwrap();
assert_eq!(rotate.lookup_latest(0, b"b").unwrap().count(), 1);
assert_eq!(rotate.lookup_latest(0, b"a").unwrap().count(), 0);
rotate.append(vec![b'c'; 101]).unwrap();
rotate.sync().unwrap();
rotate.append(vec![b'd'; 10]).unwrap();
rotate.sync().unwrap(); rotate.append(vec![b'e'; 10]).unwrap();
assert_eq!(rotate.lookup_latest(0, b"c").unwrap().count(), 0);
assert_eq!(rotate.lookup_latest(0, b"d").unwrap().count(), 1);
assert_eq!(rotate.lookup_latest(0, b"e").unwrap().count(), 1);
}
#[test]
#[should_panic]
fn test_lookup_latest_panic() {
let dir = tempdir().unwrap();
let rotate = OpenOptions::new()
.create(true)
.index("first-byte", |_| vec![IndexOutput::Reference(0..1)])
.open(&dir)
.unwrap();
rotate.lookup_latest(0, b"a").unwrap(); }
#[test]
fn test_iter() {
let dir = tempdir().unwrap();
let mut rotate = OpenOptions::new()
.create(true)
.max_bytes_per_log(100)
.open(&dir)
.unwrap();
let a = vec![b'a'; 101];
let b = vec![b'b'; 10];
rotate.append(a.clone()).unwrap();
assert_eq!(
rotate.iter_dirty().collect::<Result<Vec<_>, _>>().unwrap(),
vec![&a[..]]
);
rotate.sync().unwrap(); rotate.append(b.clone()).unwrap();
rotate.append(a.clone()).unwrap();
rotate.append(a.clone()).unwrap();
assert_eq!(
rotate.iter_dirty().collect::<Result<Vec<_>, _>>().unwrap(),
vec![&b[..], &a, &a]
);
assert_eq!(
rotate.iter().map(|e| e.unwrap()).collect::<Vec<&[u8]>>(),
vec![&a[..], &b, &a, &a],
);
rotate.sync().unwrap(); assert_eq!(
rotate.iter().map(|e| e.unwrap()).collect::<Vec<&[u8]>>(),
vec![&b[..], &a, &a],
);
}
#[test]
fn test_recover_from_empty_logs() {
let dir = tempdir().unwrap();
let rotate = OpenOptions::new().create(true).open(&dir).unwrap();
drop(rotate);
for dirent in fs::read_dir(&dir).unwrap() {
let dirent = dirent.unwrap();
let path = dirent.path();
if path.is_dir() {
fs::remove_dir_all(path).unwrap();
}
}
let _ = OpenOptions::new().create(true).open(&dir).unwrap();
}
#[test]
fn test_recover_from_occupied_logs() {
let dir = tempdir().unwrap();
{
let mut log = log::OpenOptions::new()
.create(true)
.open(&dir.path().join("1"))
.unwrap();
log.append(&[b'b'; 100][..]).unwrap();
log.append(&[b'c'; 100][..]).unwrap();
log.sync().unwrap();
}
let mut rotate = OpenOptions::new()
.create(true)
.max_bytes_per_log(100)
.max_log_count(3)
.open(&dir)
.unwrap();
for i in [1, 2] {
rotate.append(vec![b'a'; 101]).unwrap();
assert_eq!(rotate.sync().unwrap(), i); }
assert_eq!(
rotate.iter().map(|b| b.unwrap()[0]).collect::<Vec<_>>(),
vec![b'a'; 2]
);
}
#[test]
fn test_index_lag() {
let dir = tempdir().unwrap();
let opts = OpenOptions::new()
.create(true)
.index_defs(vec![
IndexDef::new("idx", |_| vec![IndexOutput::Reference(0..2)])
.lag_threshold(u64::max_value()),
])
.max_bytes_per_log(100)
.max_log_count(3);
let size = |name: &str| dir.path().join(name).metadata().unwrap().len();
let mut rotate = opts.clone().open(&dir).unwrap();
rotate.append(vec![b'x'; 200]).unwrap();
rotate.sync().unwrap();
rotate.append(vec![b'y'; 200]).unwrap();
rotate.sync().unwrap();
rotate.append(vec![b'z'; 10]).unwrap();
rotate.sync().unwrap();
assert!(size("0/index2-idx") > 0);
assert!(size("0/log") > 100);
assert!(size("1/index2-idx") > 0);
assert!(size("1/log") > 100);
assert_eq!(size("2/index2-idx"), 25);
assert!(size("2/log") < 100);
}
#[test]
fn test_sync_missing_latest() {
let dir = tempdir().unwrap();
let opts = OpenOptions::new()
.max_bytes_per_log(10000)
.max_log_count(10);
let mut rotate = opts.clone().create(true).open(&dir).unwrap();
rotate.append(vec![b'x'; 200]).unwrap();
rotate.sync().unwrap();
let mut rotate2 = opts.clone().open(&dir).unwrap();
fs::remove_file(dir.path().join(LATEST_FILE)).unwrap();
rotate2.sync().unwrap(); rotate2.append(vec![b'y'; 200]).unwrap();
rotate2.sync().unwrap_err(); }
#[test]
fn test_auto_sync_threshold() {
let dir = tempdir().unwrap();
let opts = OpenOptions::new().auto_sync_threshold(100).create(true);
let mut rotate = opts.clone().create(true).open(&dir).unwrap();
rotate.append(vec![b'x'; 50]).unwrap();
assert_eq!(rotate.logs()[0].iter_dirty().count(), 1);
rotate.append(vec![b'x'; 50]).unwrap(); assert_eq!(rotate.logs()[0].iter_dirty().count(), 0);
}
#[test]
fn test_auto_sync_threshold_with_racy_index_update_on_open() {
fn index_defs(lag_threshold: u64) -> Vec<IndexDef> {
let index_names = ["a"];
(0..index_names.len())
.map(|i| {
IndexDef::new(&index_names[i], |_| vec![IndexOutput::Reference(0..1)])
.lag_threshold(lag_threshold)
})
.collect()
}
fn open_opts(lag_threshold: u64) -> OpenOptions {
let index_defs = index_defs(lag_threshold);
OpenOptions::new()
.auto_sync_threshold(1000)
.max_bytes_per_log(400)
.max_log_count(10)
.create(true)
.index_defs(index_defs)
}
let dir = tempdir().unwrap();
let path = dir.path();
let data: &[u8] = &[b'x'; 100];
let n = 10;
for _i in 0..n {
let mut rotate1 = open_opts(300).open(path).unwrap();
rotate1.hook_after_log_sync = Some({
let path = path.to_path_buf();
Box::new(move || {
let rotate2 = open_opts(100).open(&path).unwrap();
let _all = rotate2.iter().collect::<Result<Vec<_>, _>>().unwrap();
})
});
rotate1.append(data).unwrap();
rotate1.sync().unwrap();
}
let rotate1 = open_opts(300).open(path).unwrap();
let mut count = 0;
for entry in rotate1.lookup(0, b"x" as &[u8]).unwrap() {
let entry = entry.unwrap();
assert_eq!(entry, data);
count += 1;
}
assert_eq!(count, n);
}
/// Adding a new index to existing logs must not build it for the older logs
/// at open time. Those index files (`<log dir>/index2-a`) should only appear
/// once a lookup iterator actually walks into those logs.
#[test]
fn test_reindex_old_logs() {
    let dir = tempdir().unwrap();
    let opts = OpenOptions::new()
        .max_bytes_per_log(10)
        .max_log_count(10)
        .create(true);
    // `opts` already has `create(true)`; clone it because it is reused below.
    let mut rotate = opts.clone().open(&dir).unwrap();
    // Write [0; 50] then [1; 50], syncing after each. Every entry exceeds
    // `max_bytes_per_log`, so presumably each ends up in its own log ("0", "1").
    for i in 0..2u8 {
        rotate.append(vec![i; 50]).unwrap();
        rotate.sync().unwrap();
    }

    // Reopen with a new index "a" keyed on the first byte of each entry.
    // `opts` is not used again, so it is consumed directly (no clone needed).
    let opts = opts.index("a", |_data| vec![IndexOutput::Reference(0..1)]);
    let rotate = opts.open(&dir).unwrap();
    assert!(!dir.path().join("1/index2-a").exists());
    assert!(!dir.path().join("0/index2-a").exists());

    // Creating the lookup iterator alone does not index the older logs...
    let mut iter = rotate.lookup(0, b"\x00".to_vec()).unwrap();
    assert!(!dir.path().join("1/index2-a").exists());

    // ...but pulling the first match does. `next()` replaces `nth(0)`
    // (clippy::iter_nth_zero); the two are equivalent.
    assert_eq!(iter.next().unwrap().unwrap(), &[0; 50][..]);
    assert!(dir.path().join("1/index2-a").exists());
    assert!(dir.path().join("0/index2-a").exists());
}
/// Covers two things:
/// - `guess_latest` picks the latest id, including wrapped-around id
///   sequences such as [0, 1, 2, 254, 255] -> 2.
/// - `repair` restores a usable `latest` file both when it contains garbage
///   and when it is missing.
#[test]
fn test_repair_latest() {
    assert_eq!(guess_latest(vec![]), 0);
    assert_eq!(guess_latest(vec![3, 4, 5]), 5);
    assert_eq!(guess_latest(vec![0, 1, 2, 254, 255]), 2);
    assert_eq!(guess_latest((0..=255).collect::<Vec<_>>()), 0);

    // Expected `repair` report for the logs written below, with "Processing"
    // lines removed. It is the same for both corruption scenarios, so it is
    // stated once.
    const EXPECTED_REPAIR_REPORT: &str = r#"Attempt to repair log "0"
Verified 1 entries, 223 bytes in log
Attempt to repair log "1"
Verified 1 entries, 223 bytes in log
Attempt to repair log "2"
Verified 0 entries, 12 bytes in log
Reset latest to 2"#;

    let dir = tempdir().unwrap();
    let opts = OpenOptions::new().max_bytes_per_log(100).max_log_count(10);
    let mut rotate = opts.clone().create(true).open(&dir).unwrap();
    // `sync` reports 1 and then 2 (presumably the new latest log id after
    // each rotation).
    for i in 1..=2 {
        rotate.append(vec![b'x'; 200]).unwrap();
        assert_eq!(rotate.sync().unwrap(), i);
    }

    // Run `repair` and return its report without the "Processing" lines,
    // which are excluded from every comparison.
    let repair_report = || {
        opts.repair(&dir)
            .unwrap()
            .lines()
            .filter(|line| !line.contains("Processing"))
            .collect::<Vec<_>>()
            .join("\n")
    };

    let latest_path = dir.path().join(LATEST_FILE);

    // Scenario 1: `latest` contains garbage.
    utils::atomic_write(&latest_path, "NaN", false).unwrap();
    assert!(opts.open(&dir).is_err());
    assert_eq!(repair_report(), EXPECTED_REPAIR_REPORT);
    opts.open(&dir).unwrap();

    // Scenario 2: `latest` is missing.
    fs::remove_file(&latest_path).unwrap();
    assert!(opts.open(&dir).is_err());
    assert_eq!(repair_report(), EXPECTED_REPAIR_REPORT);
    opts.open(&dir).unwrap();
}
/// A log with corrupted metadata should make `load_log` fail only once.
/// A second call succeeds, and iteration then yields only the entries
/// starting with bytes 2 and 3.
#[test]
fn test_load_broken_logs_once() {
    let dir = tempdir().unwrap();
    let open_opts = OpenOptions::new()
        .create(true)
        .max_log_count(10)
        .max_bytes_per_log(100);

    // Write [0; 200] .. [3; 200], syncing after each. Every entry exceeds
    // `max_bytes_per_log`, so presumably each sync rotates to a new log.
    let mut writer = open_opts.open(dir.path()).unwrap();
    for byte in 0..4 {
        writer.append(&[byte; 200][..]).unwrap();
        writer.sync().unwrap();
    }

    // Corrupt the metadata of the log stored in directory "1".
    utils::atomic_write(dir.path().join("1").join("meta"), "foo", false).unwrap();

    let reader = open_opts.open(dir.path()).unwrap();
    // The first load of slot 3 reports the broken log. The second load
    // succeeds, presumably because the broken log is not retried.
    assert!(reader.load_log(3).is_err());
    assert!(reader.load_log(3).is_ok());

    let first_bytes: Vec<u8> = reader.iter().map(|entry| entry.unwrap()[0]).collect();
    assert_eq!(first_bytes, [2, 3]);
}
/// Stress test: several threads append to, sync, and query the same rotated
/// log directory concurrently.
///
/// Thread `t` writes `WRITE_COUNT_PER_THREAD` two-byte entries `[t, seq]`.
/// It syncs every `t + 1` writes and after its last write. After each sync it
/// checks that every entry it can iterate is returned, with identical
/// content, by a lookup on every index. At the end, a fresh open must see
/// every entry written by every thread.
#[test]
fn test_multithread_sync() {
    use std::sync::Arc;
    use std::sync::Barrier;

    const THREAD_COUNT: u8 = if cfg!(debug_assertions) { 10 } else { 30 };
    const WRITE_COUNT_PER_THREAD: u8 = if cfg!(debug_assertions) { 10 } else { 50 };

    /// Index the whole entry by referencing its bytes in the log.
    fn index_ref(data: &[u8]) -> Vec<IndexOutput> {
        vec![IndexOutput::Reference(0..data.len() as u64)]
    }

    /// Index the whole entry by an owned copy of its bytes.
    fn index_copy(data: &[u8]) -> Vec<IndexOutput> {
        vec![IndexOutput::Owned(data.to_vec().into_boxed_slice())]
    }

    let dir = tempdir().unwrap();

    // Both index kinds, each with small, medium and large lag thresholds.
    let indexes = vec![
        IndexDef::new("key1", index_ref).lag_threshold(1),
        IndexDef::new("key2", index_ref).lag_threshold(50),
        IndexDef::new("key3", index_ref).lag_threshold(1000),
        IndexDef::new("key4", index_copy).lag_threshold(1),
        IndexDef::new("key5", index_copy).lag_threshold(50),
        IndexDef::new("key6", index_copy).lag_threshold(1000),
    ];
    let index_count = indexes.len();
    let open_opts = OpenOptions::new()
        .create(true)
        .max_log_count(200)
        .max_bytes_per_log(200)
        .index_defs(indexes);

    // Every thread waits on the barrier so they all start at about the same time.
    let start_line = Arc::new(Barrier::new(THREAD_COUNT as usize));
    let mut workers = Vec::with_capacity(THREAD_COUNT as usize);
    for thread_id in 0..THREAD_COUNT {
        let start_line = Arc::clone(&start_line);
        let thread_opts = open_opts.clone();
        let thread_dir = dir.path().to_path_buf();
        workers.push(std::thread::spawn(move || {
            start_line.wait();
            let mut writer = thread_opts.open(thread_dir).unwrap();
            for seq in 1..=WRITE_COUNT_PER_THREAD {
                let record = [thread_id, seq];
                writer.append(&record).unwrap();

                let is_last_write = seq == WRITE_COUNT_PER_THREAD;
                if seq % (thread_id + 1) != 0 && !is_last_write {
                    continue;
                }

                writer.sync().unwrap();
                for entry in writer.iter().map(|item| item.unwrap()) {
                    for index_id in 0..index_count {
                        let matches = writer.lookup(index_id, entry.to_vec()).unwrap();
                        for found in matches {
                            assert_eq!(found.unwrap(), entry);
                        }
                    }
                }
            }
        }));
    }
    for worker in workers {
        worker.join().expect("joined");
    }

    let reopened = open_opts.open(dir.path()).unwrap();
    let total = reopened.iter().count() as u64;
    assert_eq!(total, THREAD_COUNT as u64 * WRITE_COUNT_PER_THREAD as u64);
}
}