use std::collections::BTreeMap;
use std::collections::HashMap;
use std::io;
use std::mem;
use std::ops;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::Mutex;
use vlqencoding::VLQDecode;
use vlqencoding::VLQEncode;
use crate::errors::IoResultExt;
use crate::errors::ResultExt;
use crate::lock::ScopedDirLock;
use crate::lock::READER_LOCK_OPTS;
use crate::log;
use crate::log::GenericPath;
use crate::log::LogMetadata;
use crate::repair::OpenOptionsOutput;
use crate::repair::OpenOptionsRepair;
use crate::repair::RepairMessage;
use crate::utils;
use crate::utils::rand_u64;
#[derive(Clone, Default)]
pub struct OpenOptions {
name_open_options: Vec<(&'static str, log::OpenOptions)>,
leacy_multimeta_source: bool,
}
pub struct MultiLog {
path: PathBuf,
multimeta: MultiMeta,
logs: Vec<log::Log>,
multimeta_log: log::Log,
leacy_multimeta_source: bool,
reader_lock: ScopedDirLock,
}
const INDEX_REVERSE_KEY: &[u8] = b"r";
const INDEX_REVERSE: usize = 0;
#[derive(Debug)]
pub struct MultiMeta {
metas: BTreeMap<String, Arc<Mutex<LogMetadata>>>,
version: (u64, u64),
}
impl OpenOptions {
pub fn from_name_opts(name_opts: Vec<(&'static str, log::OpenOptions)>) -> Self {
for (name, _) in &name_opts {
if name == &"multimeta" {
panic!("MultiLog: cannot use 'multimeta' as Log name");
} else if name.contains('/') || name.contains('\\') {
panic!("MultiLog: cannot use '/' or '\\' in Log name");
}
}
Self {
name_open_options: name_opts,
leacy_multimeta_source: false,
}
}
pub fn open(&self, path: &Path) -> crate::Result<MultiLog> {
let result: crate::Result<_> = (|| {
let reader_lock = ScopedDirLock::new_with_options(path, &READER_LOCK_OPTS)?;
let meta_log_path = multi_meta_log_path(path);
let meta_path = multi_meta_path(path);
let mut multimeta_log = multi_meta_log_open_options().open(meta_log_path)?;
let multimeta_log_is_empty = multimeta_log.iter().next().is_none();
let mut multimeta = MultiMeta::default();
if multimeta_log_is_empty || self.leacy_multimeta_source {
multimeta.read_file(&meta_path)?;
} else {
multimeta.read_log(&multimeta_log)?;
apply_legacy_meta_if_it_is_newer(&meta_path, &mut multimeta);
}
let locked = if !multimeta_log_is_empty
&& self
.name_open_options
.iter()
.all(|(name, _)| multimeta.metas.contains_key(AsRef::<str>::as_ref(name)))
{
None
} else {
utils::mkdir_p(path)?;
Some(LockGuard(ScopedDirLock::new(path)?))
};
let mut logs = Vec::with_capacity(self.name_open_options.len());
for (name, opts) in self.name_open_options.iter() {
let fspath = path.join(name);
let name_ref: &str = name;
if !multimeta.metas.contains_key(name_ref) {
utils::mkdir_p(&fspath)?;
let meta = log::Log::load_or_create_meta(&fspath.as_path().into(), true)?;
let meta = Arc::new(Mutex::new(meta));
multimeta.metas.insert(name.to_string(), meta);
}
let path = GenericPath::SharedMeta {
path: Box::new(fspath.as_path().into()),
meta: multimeta.metas[name_ref].clone(),
};
let log = opts.open(path)?;
logs.push(log);
}
if let Some(locked) = locked.as_ref() {
if !self.leacy_multimeta_source {
multimeta.write_log(&mut multimeta_log, locked)?;
}
multimeta.write_file(&meta_path)?;
}
Ok(MultiLog {
path: path.to_path_buf(),
logs,
multimeta,
multimeta_log,
leacy_multimeta_source: self.leacy_multimeta_source,
reader_lock,
})
})();
result.context("in multi::OpenOptions::open")
}
}
impl MultiLog {
pub fn lock(&mut self) -> crate::Result<LockGuard> {
let result: crate::Result<_> = (|| {
let lock = LockGuard(ScopedDirLock::new(&self.path)?);
self.read_meta(&lock)?;
Ok(lock)
})();
result.context("in MultiLog::lock")
}
pub fn write_meta(&mut self, lock: &LockGuard) -> crate::Result<()> {
if lock.0.path() != self.path {
let msg = format!(
"Invalid lock used to write_meta (Lock path = {:?}, MultiLog path = {:?})",
lock.0.path(),
&self.path
);
return Err(crate::Error::programming(msg));
}
let result: crate::Result<_> = (|| {
self.multimeta.bump_version();
if !self.leacy_multimeta_source {
self.multimeta.write_log(&mut self.multimeta_log, lock)?;
}
let meta_path = multi_meta_path(&self.path);
self.multimeta.write_file(meta_path)?;
Ok(())
})();
result.context("in MultiLog::write_meta")
}
pub fn version(&self) -> (u64, u64) {
self.multimeta.version
}
fn read_meta(&mut self, lock: &LockGuard) -> crate::Result<()> {
debug_assert_eq!(lock.0.path(), &self.path);
(|| -> crate::Result<()> {
let meta_path = multi_meta_path(&self.path);
if self.leacy_multimeta_source {
self.multimeta.read_file(&meta_path)?;
} else {
self.multimeta_log.clear_dirty()?;
self.multimeta_log.sync()?;
self.multimeta.read_log(&self.multimeta_log)?;
apply_legacy_meta_if_it_is_newer(&meta_path, &mut self.multimeta);
}
Ok(())
})()
.context("reloading multimeta")
}
pub fn detach_logs(&mut self) -> Vec<log::Log> {
let mut result = Vec::new();
mem::swap(&mut result, &mut self.logs);
result
}
fn sync(&mut self) -> crate::Result<()> {
let lock = self.lock()?;
for log in self.logs.iter_mut() {
log.sync()?;
}
self.write_meta(&lock)?;
Ok(())
}
}
fn apply_legacy_meta_if_it_is_newer(meta_path: &Path, multimeta: &mut MultiMeta) {
let mut maybe_new_multimeta = MultiMeta::default();
if maybe_new_multimeta.read_file(meta_path).is_ok() {
if maybe_new_multimeta.metas.iter().all(|(k, v)| {
v.lock().unwrap().primary_len
>= match multimeta.metas.get(k) {
None => 0,
Some(v) => v.lock().unwrap().primary_len,
}
}) {
for (k, v) in multimeta.metas.iter() {
let mut current = v.lock().unwrap();
if let Some(newer) = maybe_new_multimeta.metas.remove(k) {
let newer = newer.lock().unwrap();
current.primary_len = newer.primary_len;
current.indexes = newer.indexes.clone();
}
}
}
}
}
fn multi_meta_log_open_options() -> log::OpenOptions {
log::OpenOptions::new()
.index("reverse", |_data| -> Vec<_> {
vec![log::IndexOutput::Owned(
INDEX_REVERSE_KEY.to_vec().into_boxed_slice(),
)]
})
.create(true)
}
pub struct LockGuard(ScopedDirLock);
impl ops::Index<usize> for MultiLog {
type Output = log::Log;
fn index(&self, index: usize) -> &Self::Output {
&self.logs[index]
}
}
impl ops::IndexMut<usize> for MultiLog {
fn index_mut(&mut self, index: usize) -> &mut Self::Output {
&mut self.logs[index]
}
}
impl OpenOptionsRepair for OpenOptions {
fn open_options_repair(&self, path: impl AsRef<Path>) -> crate::Result<String> {
let path = path.as_ref();
let lock = LockGuard(ScopedDirLock::new(path)?);
let mut out = RepairMessage::new(path);
let mpath = multi_meta_log_path(path);
out += "Repairing MultiMeta Log:\n";
out += &indent(&multi_meta_log_open_options().open_options_repair(&mpath)?);
let mut repaired_log_metas = HashMap::new();
for (name, opts) in self.name_open_options.iter() {
let fspath = path.join(name);
if !fspath.exists() {
out += &format!("Skipping non-existed Log {}\n", name);
continue;
}
out += &format!("Repairing Log {}\n", name);
out += &indent(&opts.open_options_repair(&fspath)?);
let log = opts.open(&fspath)?;
let len = log.meta.primary_len;
out += &format!("Log {} has valid length {} after repair\n", name, len);
repaired_log_metas.insert(*name, log.meta);
}
let mut mlog = multi_meta_log_open_options()
.open(&mpath)
.context("repair cannot open MultiMeta Log after repairing it")?;
let mut selected_meta = None;
let mut invalid_count = 0;
for entry in mlog.lookup(INDEX_REVERSE, INDEX_REVERSE_KEY)? {
if let Ok(data) = entry {
let mut mmeta = MultiMeta::default();
if mmeta.read(data).is_ok() {
if mmeta.metas.iter().all(|(name, meta)| {
let len_required = meta.lock().unwrap().primary_len;
let len_provided = repaired_log_metas
.get(name.as_str())
.map(|m| m.primary_len)
.unwrap_or_default();
len_required <= len_provided
}) {
if invalid_count > 0 {
let mmeta_desc = mmeta
.metas
.iter()
.map(|(name, meta)| {
format!("{}: {}", name, meta.lock().unwrap().primary_len)
})
.collect::<Vec<_>>()
.join(", ");
out += &format!(
"Found valid MultiMeta after {} invalid entries: {}\n",
invalid_count, mmeta_desc
);
}
selected_meta = Some(mmeta);
break;
} else {
invalid_count += 1;
}
}
}
}
if selected_meta.is_none() {
let mut mmeta = MultiMeta::default();
if mmeta.read_file(&multi_meta_path(path)).is_ok() {
selected_meta = Some(mmeta);
}
}
let selected_meta = match selected_meta {
None => {
return Err(crate::Error::corruption(
&mpath,
"repair cannot find valid MultiMeta",
))
.context(|| format!("Repair log:\n{}", indent(out.as_str())));
}
Some(meta) => meta,
};
let mut should_write_new_meta_entry = invalid_count > 0;
for (name, log_meta) in selected_meta.metas.iter() {
let mut log_meta = log_meta.lock().unwrap();
let should_invalidate_indexes = match repaired_log_metas.get(name.as_str()) {
None => true,
Some(repaired_log_meta) => &*log_meta != repaired_log_meta,
};
if should_invalidate_indexes {
out += &format!("Invalidated indexes in log '{}'\n", name);
log_meta.indexes.clear();
should_write_new_meta_entry = true;
}
}
if should_write_new_meta_entry {
selected_meta
.write_log(&mut mlog, &lock)
.context("repair cannot write MultiMeta log")?;
selected_meta
.write_file(multi_meta_path(path))
.context("repair cannot write valid MultiMeta file")?;
out += "Write valid MultiMeta\n";
} else {
out += "MultiMeta is valid\n";
}
Ok(out.into_string())
}
}
impl OpenOptionsOutput for OpenOptions {
type Output = MultiLog;
fn open_path(&self, path: &Path) -> crate::Result<Self::Output> {
self.open(path)
}
}
fn multi_meta_path(dir: &Path) -> PathBuf {
dir.join("multimeta")
}
fn multi_meta_log_path(dir: &Path) -> PathBuf {
dir.join("multimetalog")
}
fn indent(s: &str) -> String {
s.lines()
.map(|l| format!(" {}\n", l))
.collect::<Vec<_>>()
.concat()
}
impl Default for MultiMeta {
fn default() -> Self {
Self {
metas: Default::default(),
version: (rand_u64(), 0),
}
}
}
impl MultiMeta {
fn read(&mut self, mut reader: impl io::Read) -> io::Result<()> {
let format_version: usize = reader.read_vlq()?;
if format_version != 0 {
return Err(io::Error::new(
io::ErrorKind::Other,
format!("MultiMeta format {} is unsupported", format_version),
));
}
let count: usize = reader.read_vlq()?;
for _ in 0..count {
let name_len = reader.read_vlq()?;
let mut name_buf = vec![0; name_len];
reader.read_exact(&mut name_buf)?;
let name = String::from_utf8(name_buf)
.map_err(|_| io::Error::new(io::ErrorKind::Other, "Log name is not utf-8"))?;
let meta = LogMetadata::read(&mut reader)?;
self.metas
.entry(name.to_string())
.and_modify(|e| {
let mut e = e.lock().unwrap();
let truncated = e.primary_len > meta.primary_len && e.epoch == meta.epoch;
*e = meta.clone();
if truncated {
e.epoch = e.epoch.wrapping_add(1);
}
})
.or_insert_with(|| Arc::new(Mutex::new(meta.clone())));
}
let version_major: u64 = reader.read_vlq().unwrap_or_else(|_| rand_u64());
let version_minor: u64 = reader.read_vlq().unwrap_or_default();
self.version = (version_major, version_minor);
Ok(())
}
fn write(&self, mut writer: impl io::Write) -> io::Result<()> {
let version = 0;
writer.write_vlq(version)?;
writer.write_vlq(self.metas.len())?;
for (name, meta) in self.metas.iter() {
writer.write_vlq(name.len())?;
writer.write_all(name.as_bytes())?;
meta.lock().unwrap().write(&mut writer)?;
}
writer.write_vlq(self.version.0)?;
writer.write_vlq(self.version.1)?;
Ok(())
}
fn read_file<P: AsRef<Path>>(&mut self, path: P) -> crate::Result<()> {
let path = path.as_ref();
match utils::atomic_read(path) {
Ok(buf) => self.read(&buf[..]),
Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(()),
Err(e) => Err(e),
}
.context(path, "when decoding MultiMeta")
}
fn write_file<P: AsRef<Path>>(&self, path: P) -> crate::Result<()> {
let mut buf = Vec::new();
self.write(&mut buf).infallible()?;
utils::atomic_write(path, &buf, false)?;
Ok(())
}
fn read_log(&mut self, log: &log::Log) -> crate::Result<()> {
if let Some(last_entry) = log.lookup(INDEX_REVERSE, INDEX_REVERSE_KEY)?.next() {
let data = last_entry?;
self.read(data).context(
log.path().as_opt_path().unwrap_or_else(|| Path::new("")),
"when decoding MutltiMeta",
)?;
}
Ok(())
}
fn write_log(&self, log: &mut log::Log, _lock: &LockGuard) -> crate::Result<()> {
let mut data = Vec::new();
self.write(&mut data).infallible()?;
log.clear_dirty()?;
log.sync()?;
if let Some(Ok(last_data)) = log.lookup(INDEX_REVERSE, INDEX_REVERSE_KEY)?.next() {
if last_data == &data {
return Ok(());
}
}
log.append(&data)?;
log.sync()?;
Ok(())
}
fn bump_version(&mut self) {
self.version.1 += 1;
}
}
#[cfg(test)]
mod tests {
use log::tests::pwrite;
use quickcheck::quickcheck;
use super::*;
fn simple_open_opts() -> OpenOptions {
OpenOptions::from_name_opts(vec![
("a", log::OpenOptions::new()),
("b", log::OpenOptions::new()),
])
}
fn simple_multilog(path: &Path) -> MultiLog {
let mopts = simple_open_opts();
mopts.open(path).unwrap()
}
fn index_open_opts() -> OpenOptions {
fn index_func(bytes: &[u8]) -> Vec<log::IndexOutput> {
(0..bytes.len() as u64)
.map(|i| log::IndexOutput::Reference(i..i + 1))
.collect()
}
let index_def = log::IndexDef::new("x", index_func).lag_threshold(0);
OpenOptions::from_name_opts(vec![(
"a",
log::OpenOptions::new().index_defs(vec![index_def]),
)])
}
#[test]
fn test_individual_log_can_be_opened_directly() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path();
let mut mlog = simple_multilog(path);
log::OpenOptions::new().open(path.join("a")).unwrap();
log::OpenOptions::new().open(path.join("b")).unwrap();
mlog[0].append(b"1").unwrap();
mlog[0].flush().unwrap();
log::OpenOptions::new().open(path.join("a")).unwrap();
}
#[test]
fn test_individual_log_flushes_are_invisible() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path();
let mut mlog = simple_multilog(path);
mlog[0].append(b"2").unwrap();
mlog[0].sync().unwrap();
mlog[0].append(b"3").unwrap();
mlog[0].append(b"4").unwrap();
mlog[1].append(b"y").unwrap();
mlog[1].sync().unwrap();
mlog[1].append(b"z").unwrap();
mlog[1].sync().unwrap();
assert_eq!(mlog[0].iter().count(), 3);
assert_eq!(mlog[1].iter().count(), 2);
let mlog2 = simple_multilog(path);
assert_eq!(mlog2[0].iter().count(), 0);
assert_eq!(mlog2[1].iter().count(), 0);
mlog.sync().unwrap();
assert_eq!(mlog[0].iter().count(), 2);
assert_eq!(mlog[1].iter().count(), 0);
let mlog2 = simple_multilog(path);
assert_eq!(mlog2[0].iter().count(), 2);
assert_eq!(mlog2[1].iter().count(), 0);
}
#[test]
fn test_version() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path();
let mut mlog1 = simple_multilog(&path.join("1"));
let mut mlog2 = simple_multilog(&path.join("2"));
let v1 = mlog1.version();
let v2 = mlog2.version();
assert!(v1.1 == 0);
assert!(v2.1 == 0);
assert_ne!(v1, v2);
mlog1.sync().unwrap();
mlog2.sync().unwrap();
let v3 = mlog1.version();
let v4 = mlog2.version();
assert_eq!(v3.0, v1.0);
assert_eq!(v4.0, v2.0);
assert!(v3 > v1);
assert!(v4 > v2);
let mlog1 = simple_multilog(&path.join("1"));
let mlog2 = simple_multilog(&path.join("2"));
let v5 = mlog1.version();
let v6 = mlog2.version();
assert_eq!(v5, v3);
assert_eq!(v6, v4);
}
#[test]
fn test_detach_logs() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path();
let mut mlog = simple_multilog(path);
let mut logs = mlog.detach_logs();
logs[0].append(b"0").unwrap();
logs[1].append(b"1").unwrap();
let lock = mlog.lock().unwrap();
logs[0].sync().unwrap();
logs[1].sync().unwrap();
mlog.write_meta(&lock).unwrap();
drop(lock);
let mlog2 = simple_multilog(path);
assert_eq!(mlog2[0].iter().count(), 1);
assert_eq!(mlog2[1].iter().count(), 1);
}
#[test]
fn test_new_index_built_only_once() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path();
let mopts = OpenOptions::from_name_opts(vec![("a", log::OpenOptions::new())]);
let mut mlog = mopts.open(path).unwrap();
mlog[0].append(b"0").unwrap();
mlog.sync().unwrap();
let index_def =
log::IndexDef::new("i", |_| vec![log::IndexOutput::Reference(0..1)]).lag_threshold(0);
let mopts = OpenOptions::from_name_opts(vec![(
"a",
log::OpenOptions::new().index_defs(vec![index_def.clone()]),
)]);
let index_size = || {
path.join("a")
.join(index_def.filename())
.metadata()
.map(|m| m.len())
.unwrap_or_default()
};
assert_eq!(index_size(), 0);
let _mlog = mopts.open(path).unwrap();
assert_eq!(index_size(), 36);
let mut mlog = mopts.open(path).unwrap();
assert_eq!(index_size(), 36);
let lock = LockGuard(ScopedDirLock::new(path).unwrap());
mlog.multimeta.metas["a"].lock().unwrap().epoch ^= 1;
mlog.multimeta
.write_log(&mut mlog.multimeta_log, &lock)
.unwrap();
mlog.multimeta.write_file(multi_meta_path(path)).unwrap();
drop(lock);
let _mlog = mopts.open(path).unwrap();
assert_eq!(index_size(), 71);
}
#[test]
fn test_wrong_locks_cause_errors() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path();
let mut mlog1 = simple_multilog(&path.join("1"));
let mut mlog2 = simple_multilog(&path.join("2"));
let lock1 = mlog1.lock().unwrap();
let lock2 = mlog2.lock().unwrap();
assert!(mlog1.write_meta(&lock2).is_err());
assert!(mlog2.write_meta(&lock1).is_err());
}
fn repair_output(opts: &OpenOptions, path: &Path) -> String {
let out = opts.open_options_repair(path).unwrap();
filter_repair_output(out)
}
fn filter_repair_output(out: String) -> String {
out.lines()
.filter(|l| {
!l.contains("bytes in log")
&& !l.contains("Backed up")
&& !l.contains("Processing")
&& !l.contains("date -d")
})
.collect::<Vec<_>>()
.join("\n")
}
#[test]
fn test_repair() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path();
let opts = simple_open_opts();
let mut mlog = opts.open(path).unwrap();
let mut logs = mlog.detach_logs();
const N: usize = 12;
for i in 0..10u32 {
let lock = mlog.lock().unwrap();
for _ in 0..N {
logs[0].append(i.to_be_bytes()).unwrap();
logs[1].append(i.to_be_bytes()).unwrap();
logs[0].sync().unwrap();
}
logs[1].sync().unwrap();
mlog.write_meta(&lock).unwrap();
}
let repair = || repair_output(&opts, path);
let verify = || {
let mlog = opts.open(path).unwrap();
assert_eq!(mlog.logs[0].iter().count() % N, 0);
assert_eq!(mlog.logs[1].iter().count() % N, 0);
};
let s1 = repair();
assert_eq!(
&s1,
r#"Repairing MultiMeta Log:
Index "reverse" passed integrity check
Repairing Log a
Log a has valid length 1212 after repair
Repairing Log b
Log b has valid length 1212 after repair
MultiMeta is valid"#
);
let s2 = filter_repair_output(std::fs::read_to_string(path.join("repair.log")).unwrap());
assert_eq!(&s1, s2.trim_end());
pwrite(&path.join("a").join("log"), 1000, b"ff");
assert_eq!(
repair(),
r#"Repairing MultiMeta Log:
Index "reverse" passed integrity check
Repairing Log a
Reset log size to 992
Log a has valid length 992 after repair
Repairing Log b
Log b has valid length 1212 after repair
Found valid MultiMeta after 2 invalid entries: a: 972, b: 972
Invalidated indexes in log 'a'
Invalidated indexes in log 'b'
Write valid MultiMeta"#
);
verify();
assert_eq!(
repair(),
r#"Repairing MultiMeta Log:
Index "reverse" passed integrity check
Repairing Log a
Log a has valid length 992 after repair
Repairing Log b
Log b has valid length 1212 after repair
Invalidated indexes in log 'a'
Invalidated indexes in log 'b'
Write valid MultiMeta"#
);
}
#[test]
fn test_repair_broken_index() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path();
let opts = index_open_opts();
let mut mlog = opts.open(path).unwrap();
let mut logs = mlog.detach_logs();
let repair = || repair_output(&opts, path);
let file_size = |path| std::fs::metadata(path).unwrap().len();
let meta_path = multi_meta_path(path);
let meta_log_path = multi_meta_log_path(path).join("log");
let index_path = path.join("a").join("index2-x");
let mut meta_log_sizes = Vec::new();
let mut index_sizes = Vec::new();
for data in [b"abcd", b"abce", b"acde", b"bcde"] {
let lock = mlog.lock().unwrap();
logs[0].append(data).unwrap();
logs[0].sync().unwrap();
mlog.write_meta(&lock).unwrap();
meta_log_sizes.push(file_size(&meta_log_path));
index_sizes.push(file_size(&index_path));
}
drop(mlog);
drop(logs);
pwrite(&index_path, -4, b"ffff");
pwrite(&meta_log_path, (meta_log_sizes[1] - 5) as _, b"xxxxx");
std::fs::remove_file(meta_path).unwrap();
let index_len_before = file_size(&index_path);
assert_eq!(
repair(),
r#"Repairing MultiMeta Log:
Reset log size to 111
Rebuilt index "reverse"
Repairing Log a
Rebuilt index "x"
Log a has valid length 52 after repair
Invalidated indexes in log 'a'
Write valid MultiMeta"#
);
let index_len_after = file_size(&index_path);
assert!(index_len_before > index_len_after);
opts.open(path).map(|_| 1).unwrap();
}
#[test]
fn test_mixed_old_new_read_writes() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path();
let mut mlog_new = simple_open_opts().open(path).unwrap();
let mut logs_new = mlog_new.detach_logs();
let mut mlog_old = {
let mut opts = simple_open_opts();
opts.leacy_multimeta_source = true;
opts.open(path).unwrap()
};
let mut logs_old = mlog_old.detach_logs();
const N: usize = 2;
for i in 0..N {
for (mlog, logs, j) in [
(&mut mlog_new, &mut logs_new, 0u8),
(&mut mlog_old, &mut logs_old, 1u8),
] {
let lock = mlog.lock().unwrap();
logs[0].append([i as u8, j]).unwrap();
logs[0].sync().unwrap();
mlog.write_meta(&lock).unwrap();
}
}
let mlog = simple_open_opts().open(path).unwrap();
assert_eq!(
mlog.logs[0].iter().map(|e| e.unwrap()).collect::<Vec<_>>(),
[[0, 0], [0, 1], [1, 0], [1, 1]],
);
}
quickcheck! {
fn test_roundtrip_multimeta(name_len_list: Vec<(String, u64)>, version: (u64, u64)) -> bool {
let metas = name_len_list
.into_iter()
.map(|(name, len)| {
let meta = LogMetadata::new_with_primary_len(len);
(name, Arc::new(Mutex::new(meta)))
})
.collect();
let meta = MultiMeta { metas, version, ..Default::default() };
let mut buf = Vec::new();
meta.write(&mut buf).unwrap();
let mut meta2 = MultiMeta::default();
meta2.read(&buf[..]).unwrap();
let mut buf2 = Vec::new();
meta2.write(&mut buf2).unwrap();
assert_eq!(buf2, buf);
buf2 == buf
}
fn test_roundtrip_multilog(list_a: Vec<Vec<u8>>, list_b: Vec<Vec<u8>>) -> bool {
let dir = tempfile::tempdir().unwrap();
let mut mlog = simple_multilog(dir.path());
for a in &list_a {
mlog[0].append(a).unwrap();
}
for b in &list_b {
mlog[1].append(b).unwrap();
}
mlog.sync().unwrap();
let mlog_read = simple_multilog(dir.path());
let list_a_read: Vec<Vec<u8>> = mlog_read[0].iter().map(|e| e.unwrap().to_vec()).collect();
let list_b_read: Vec<Vec<u8>> = mlog_read[1].iter().map(|e| e.unwrap().to_vec()).collect();
list_a == list_a_read && list_b == list_b_read
}
}
}