1use std::collections::BTreeMap;
11use std::collections::HashMap;
12use std::io;
13use std::mem;
14use std::ops;
15use std::path::Path;
16use std::path::PathBuf;
17use std::sync::Arc;
18use std::sync::Mutex;
19
20use vlqencoding::VLQDecode;
21use vlqencoding::VLQEncode;
22
23use crate::errors::IoResultExt;
24use crate::errors::ResultExt;
25use crate::lock::ScopedDirLock;
26use crate::lock::READER_LOCK_OPTS;
27use crate::log;
28use crate::log::GenericPath;
29use crate::log::LogMetadata;
30use crate::repair::OpenOptionsOutput;
31use crate::repair::OpenOptionsRepair;
32use crate::repair::RepairMessage;
33use crate::utils;
34use crate::utils::rand_u64;
35
/// Options describing how to open a [`MultiLog`]: which [`log::Log`]s it
/// contains and how each of them is opened.
#[derive(Clone, Default)]
pub struct OpenOptions {
    /// `(name, options)` for each individual Log. The name is also used as
    /// the sub-directory (inside the MultiLog directory) holding that Log.
    name_open_options: Vec<(&'static str, log::OpenOptions)>,

    /// When set, MultiMeta is loaded from the "multimeta" file instead of the
    /// "multimetalog" Log, and MultiMeta updates are not appended to that Log.
    // NOTE(review): "leacy" looks like a misspelling of "legacy". The name is
    // kept because other code in this file refers to the field.
    leacy_multimeta_source: bool,
}
47
/// A group of [`log::Log`]s living under one directory, whose metadata is
/// stored together in a single [`MultiMeta`].
pub struct MultiLog {
    /// Directory containing the individual Logs, the "multimeta" file and
    /// the "multimetalog" Log.
    path: PathBuf,

    /// In-memory metadata of all Logs. Each entry is shared (via `Arc`) with
    /// the corresponding opened Log.
    multimeta: MultiMeta,

    /// Opened Logs, in the order given by the open options. Emptied by
    /// `detach_logs`.
    logs: Vec<log::Log>,

    /// Log used to persist serialized [`MultiMeta`] entries.
    multimeta_log: log::Log,

    /// Copied from [`OpenOptions`]: when set, the "multimeta" file is used
    /// as the MultiMeta source and the multimeta Log is not written.
    leacy_multimeta_source: bool,

    /// Lock taken with `READER_LOCK_OPTS` when the MultiLog was opened and
    /// held for as long as this struct is alive.
    // NOTE(review): presumably signals to other processes that the
    // directory is in use by a reader; confirm against `crate::lock`.
    reader_lock: ScopedDirLock,
}
86
/// The single key every multimeta Log entry is indexed under (see the
/// "reverse" index function in `multi_meta_log_open_options`).
const INDEX_REVERSE_KEY: &[u8] = b"r";

/// Index id passed to `Log::lookup` to query the "reverse" index of the
/// multimeta Log.
// NOTE(review): presumably 0 because "reverse" is the first (and only) index
// defined for the multimeta Log; confirm against `log::OpenOptions::index`.
const INDEX_REVERSE: usize = 0;
92
/// Metadata of all Logs in a [`MultiLog`], plus a version.
#[derive(Debug)]
pub struct MultiMeta {
    /// Per-Log metadata keyed by Log name. The `Arc<Mutex<_>>` values are
    /// shared with the opened Logs, so updates here are visible to them.
    metas: BTreeMap<String, Arc<Mutex<LogMetadata>>>,

    /// `(major, minor)` version. The major part starts as a random number;
    /// the minor part starts at 0 and is incremented by `bump_version`.
    version: (u64, u64),
}
102
103impl OpenOptions {
104 pub fn from_name_opts(name_opts: Vec<(&'static str, log::OpenOptions)>) -> Self {
106 for (name, _) in &name_opts {
108 if name == &"multimeta" {
109 panic!("MultiLog: cannot use 'multimeta' as Log name");
110 } else if name.contains('/') || name.contains('\\') {
111 panic!("MultiLog: cannot use '/' or '\\' in Log name");
112 }
113 }
114 Self {
115 name_open_options: name_opts,
116 leacy_multimeta_source: false,
117 }
118 }
119
120 pub fn open(&self, path: &Path) -> crate::Result<MultiLog> {
125 let result: crate::Result<_> = (|| {
126 let reader_lock = ScopedDirLock::new_with_options(path, &READER_LOCK_OPTS)?;
127
128 let meta_log_path = multi_meta_log_path(path);
131 let meta_path = multi_meta_path(path);
132 let mut multimeta_log = multi_meta_log_open_options().open(meta_log_path)?;
133 let multimeta_log_is_empty = multimeta_log.iter().next().is_none();
134
135 let mut multimeta = MultiMeta::default();
137 if multimeta_log_is_empty || self.leacy_multimeta_source {
138 multimeta.read_file(&meta_path)?;
141 } else {
142 multimeta.read_log(&multimeta_log)?;
144 apply_legacy_meta_if_it_is_newer(&meta_path, &mut multimeta);
145 }
146
147 let locked = if !multimeta_log_is_empty
148 && self
149 .name_open_options
150 .iter()
151 .all(|(name, _)| multimeta.metas.contains_key(AsRef::<str>::as_ref(name)))
152 {
153 None
155 } else {
156 utils::mkdir_p(path)?;
158 Some(LockGuard(ScopedDirLock::new(path)?))
159 };
160
161 let mut logs = Vec::with_capacity(self.name_open_options.len());
162 for (name, opts) in self.name_open_options.iter() {
163 let fspath = path.join(name);
164 let name_ref: &str = name;
165 if !multimeta.metas.contains_key(name_ref) {
166 utils::mkdir_p(&fspath)?;
168 let meta = log::Log::load_or_create_meta(&fspath.as_path().into(), true)?;
169 let meta = Arc::new(Mutex::new(meta));
170 multimeta.metas.insert(name.to_string(), meta);
171 }
172 let path = GenericPath::SharedMeta {
173 path: Box::new(fspath.as_path().into()),
174 meta: multimeta.metas[name_ref].clone(),
175 };
176 let log = opts.open(path)?;
177 logs.push(log);
178 }
179
180 if let Some(locked) = locked.as_ref() {
181 if !self.leacy_multimeta_source {
182 multimeta.write_log(&mut multimeta_log, locked)?;
183 }
184 multimeta.write_file(&meta_path)?;
185 }
186
187 Ok(MultiLog {
188 path: path.to_path_buf(),
189 logs,
190 multimeta,
191 multimeta_log,
192 leacy_multimeta_source: self.leacy_multimeta_source,
193 reader_lock,
194 })
195 })();
196
197 result.context("in multi::OpenOptions::open")
198 }
199}
200
201impl MultiLog {
202 pub fn lock(&mut self) -> crate::Result<LockGuard> {
210 let result: crate::Result<_> = (|| {
211 let lock = LockGuard(ScopedDirLock::new(&self.path)?);
212 self.read_meta(&lock)?;
213 Ok(lock)
214 })();
215 result.context("in MultiLog::lock")
216 }
217
218 pub fn write_meta(&mut self, lock: &LockGuard) -> crate::Result<()> {
223 if lock.0.path() != self.path {
224 let msg = format!(
225 "Invalid lock used to write_meta (Lock path = {:?}, MultiLog path = {:?})",
226 lock.0.path(),
227 &self.path
228 );
229 return Err(crate::Error::programming(msg));
230 }
231 let result: crate::Result<_> = (|| {
232 self.multimeta.bump_version();
233 if !self.leacy_multimeta_source {
234 self.multimeta.write_log(&mut self.multimeta_log, lock)?;
236 }
237
238 let meta_path = multi_meta_path(&self.path);
240 self.multimeta.write_file(meta_path)?;
241
242 Ok(())
243 })();
244 result.context("in MultiLog::write_meta")
245 }
246
247 pub fn version(&self) -> (u64, u64) {
257 self.multimeta.version
258 }
259
260 fn read_meta(&mut self, lock: &LockGuard) -> crate::Result<()> {
265 debug_assert_eq!(lock.0.path(), &self.path);
266 (|| -> crate::Result<()> {
267 let meta_path = multi_meta_path(&self.path);
268 if self.leacy_multimeta_source {
269 self.multimeta.read_file(&meta_path)?;
270 } else {
271 self.multimeta_log.clear_dirty()?;
272 self.multimeta_log.sync()?;
273 self.multimeta.read_log(&self.multimeta_log)?;
274 apply_legacy_meta_if_it_is_newer(&meta_path, &mut self.multimeta);
275 }
276 Ok(())
277 })()
278 .context("reloading multimeta")
279 }
280
281 pub fn detach_logs(&mut self) -> Vec<log::Log> {
289 let mut result = Vec::new();
290 mem::swap(&mut result, &mut self.logs);
291 result
292 }
293
294 fn sync(&mut self) -> crate::Result<()> {
303 let lock = self.lock()?;
304 for log in self.logs.iter_mut() {
305 log.sync()?;
306 }
307 self.write_meta(&lock)?;
308 Ok(())
309 }
310}
311
312fn apply_legacy_meta_if_it_is_newer(meta_path: &Path, multimeta: &mut MultiMeta) {
313 let mut maybe_new_multimeta = MultiMeta::default();
316 if maybe_new_multimeta.read_file(meta_path).is_ok() {
317 if maybe_new_multimeta.metas.iter().all(|(k, v)| {
318 v.lock().unwrap().primary_len
319 >= match multimeta.metas.get(k) {
320 None => 0,
321 Some(v) => v.lock().unwrap().primary_len,
322 }
323 }) {
324 for (k, v) in multimeta.metas.iter() {
327 let mut current = v.lock().unwrap();
328 if let Some(newer) = maybe_new_multimeta.metas.remove(k) {
329 let newer = newer.lock().unwrap();
330 current.primary_len = newer.primary_len;
331 current.indexes = newer.indexes.clone();
332 }
333 }
334 }
335 }
336}
337
338fn multi_meta_log_open_options() -> log::OpenOptions {
339 log::OpenOptions::new()
340 .index("reverse", |_data| -> Vec<_> {
341 vec![log::IndexOutput::Owned(
343 INDEX_REVERSE_KEY.to_vec().into_boxed_slice(),
344 )]
345 })
346 .create(true)
347}
348
/// Wrapper around a [`ScopedDirLock`] taken on a [`MultiLog`] directory.
/// Functions that write the MultiMeta take a reference to it as evidence
/// that the caller holds the lock.
pub struct LockGuard(ScopedDirLock);
351
352impl ops::Index<usize> for MultiLog {
353 type Output = log::Log;
354 fn index(&self, index: usize) -> &Self::Output {
355 &self.logs[index]
356 }
357}
358
359impl ops::IndexMut<usize> for MultiLog {
360 fn index_mut(&mut self, index: usize) -> &mut Self::Output {
361 &mut self.logs[index]
362 }
363}
364
365impl OpenOptionsRepair for OpenOptions {
366 fn open_options_repair(&self, path: impl AsRef<Path>) -> crate::Result<String> {
367 let path = path.as_ref();
368 let lock = LockGuard(ScopedDirLock::new(path)?);
369 let mut out = RepairMessage::new(path);
370
371 let mpath = multi_meta_log_path(path);
373 out += "Repairing MultiMeta Log:\n";
374 out += &indent(&multi_meta_log_open_options().open_options_repair(&mpath)?);
375
376 let mut repaired_log_metas = HashMap::new();
378 for (name, opts) in self.name_open_options.iter() {
379 let fspath = path.join(name);
380 if !fspath.exists() {
381 out += &format!("Skipping non-existed Log {}\n", name);
382 continue;
383 }
384 out += &format!("Repairing Log {}\n", name);
385 out += &indent(&opts.open_options_repair(&fspath)?);
386 let log = opts.open(&fspath)?;
387 let len = log.meta.primary_len;
388 out += &format!("Log {} has valid length {} after repair\n", name, len);
389 repaired_log_metas.insert(*name, log.meta);
390 }
391
392 let mut mlog = multi_meta_log_open_options()
394 .open(&mpath)
395 .context("repair cannot open MultiMeta Log after repairing it")?;
396 let mut selected_meta = None;
397 let mut invalid_count = 0;
398 for entry in mlog.lookup(INDEX_REVERSE, INDEX_REVERSE_KEY)? {
399 if let Ok(data) = entry {
402 let mut mmeta = MultiMeta::default();
403 if mmeta.read(data).is_ok() {
404 if mmeta.metas.iter().all(|(name, meta)| {
406 let len_required = meta.lock().unwrap().primary_len;
407 let len_provided = repaired_log_metas
408 .get(name.as_str())
409 .map(|m| m.primary_len)
410 .unwrap_or_default();
411 len_required <= len_provided
412 }) {
413 if invalid_count > 0 {
414 let mmeta_desc = mmeta
416 .metas
417 .iter()
418 .map(|(name, meta)| {
419 format!("{}: {}", name, meta.lock().unwrap().primary_len)
420 })
421 .collect::<Vec<_>>()
422 .join(", ");
423 out += &format!(
424 "Found valid MultiMeta after {} invalid entries: {}\n",
425 invalid_count, mmeta_desc
426 );
427 }
428 selected_meta = Some(mmeta);
429 break;
430 } else {
431 invalid_count += 1;
432 }
433 }
434 }
435 }
436
437 if selected_meta.is_none() {
438 let mut mmeta = MultiMeta::default();
440 if mmeta.read_file(&multi_meta_path(path)).is_ok() {
441 selected_meta = Some(mmeta);
442 }
443 }
444
445 let selected_meta = match selected_meta {
446 None => {
447 return Err(crate::Error::corruption(
448 &mpath,
449 "repair cannot find valid MultiMeta",
450 ))
451 .context(|| format!("Repair log:\n{}", indent(out.as_str())));
452 }
453 Some(meta) => meta,
454 };
455
456 let mut should_write_new_meta_entry = invalid_count > 0;
457 for (name, log_meta) in selected_meta.metas.iter() {
458 let mut log_meta = log_meta.lock().unwrap();
459 let should_invalidate_indexes = match repaired_log_metas.get(name.as_str()) {
460 None => true,
461 Some(repaired_log_meta) => &*log_meta != repaired_log_meta,
462 };
463 if should_invalidate_indexes {
464 out += &format!("Invalidated indexes in log '{}'\n", name);
465 log_meta.indexes.clear();
466 should_write_new_meta_entry = true;
467 }
468 }
469
470 if should_write_new_meta_entry {
471 selected_meta
472 .write_log(&mut mlog, &lock)
473 .context("repair cannot write MultiMeta log")?;
474 selected_meta
475 .write_file(multi_meta_path(path))
476 .context("repair cannot write valid MultiMeta file")?;
477 out += "Write valid MultiMeta\n";
478 } else {
479 out += "MultiMeta is valid\n";
480 }
481
482 Ok(out.into_string())
483 }
484}
485
486impl OpenOptionsOutput for OpenOptions {
487 type Output = MultiLog;
488
489 fn open_path(&self, path: &Path) -> crate::Result<Self::Output> {
490 self.open(path)
491 }
492}
493
/// Path of the "multimeta" file inside the MultiLog directory `dir`.
fn multi_meta_path(dir: &Path) -> PathBuf {
    let mut file_path = dir.to_path_buf();
    file_path.push("multimeta");
    file_path
}
497
/// Path of the "multimetalog" Log directory inside the MultiLog directory
/// `dir`.
fn multi_meta_log_path(dir: &Path) -> PathBuf {
    let mut log_path = dir.to_path_buf();
    log_path.push("multimetalog");
    log_path
}
501
/// Prefix every line of `s` with the indentation and terminate it with
/// `'\n'`.
///
/// Returns an empty string for empty input. A trailing newline in `s` does
/// not produce an extra (indented) empty line.
fn indent(s: &str) -> String {
    // Build the result in a single buffer instead of allocating one String
    // per line plus an intermediate Vec.
    let mut indented = String::with_capacity(s.len());
    for line in s.lines() {
        indented.push(' ');
        indented.push_str(line);
        indented.push('\n');
    }
    indented
}
509
510impl Default for MultiMeta {
511 fn default() -> Self {
512 Self {
513 metas: Default::default(),
514 version: (rand_u64(), 0),
515 }
516 }
517}
518
519impl MultiMeta {
520 fn read(&mut self, mut reader: impl io::Read) -> io::Result<()> {
523 let format_version: usize = reader.read_vlq()?;
524 if format_version != 0 {
525 return Err(io::Error::new(
526 io::ErrorKind::Other,
527 format!("MultiMeta format {} is unsupported", format_version),
528 ));
529 }
530 let count: usize = reader.read_vlq()?;
531 for _ in 0..count {
532 let name_len = reader.read_vlq()?;
533 let mut name_buf = vec![0; name_len];
534 reader.read_exact(&mut name_buf)?;
535 let name = String::from_utf8(name_buf)
536 .map_err(|_| io::Error::new(io::ErrorKind::Other, "Log name is not utf-8"))?;
537 let meta = LogMetadata::read(&mut reader)?;
538 self.metas
539 .entry(name.to_string())
540 .and_modify(|e| {
541 let mut e = e.lock().unwrap();
542 let truncated = e.primary_len > meta.primary_len && e.epoch == meta.epoch;
543 *e = meta.clone();
544 if truncated {
546 e.epoch = e.epoch.wrapping_add(1);
547 }
548 })
549 .or_insert_with(|| Arc::new(Mutex::new(meta.clone())));
550 }
551 let version_major: u64 = reader.read_vlq().unwrap_or_else(|_| rand_u64());
552 let version_minor: u64 = reader.read_vlq().unwrap_or_default();
553 self.version = (version_major, version_minor);
554 Ok(())
555 }
556
557 fn write(&self, mut writer: impl io::Write) -> io::Result<()> {
559 let version = 0;
560 writer.write_vlq(version)?;
561 writer.write_vlq(self.metas.len())?;
562 for (name, meta) in self.metas.iter() {
563 writer.write_vlq(name.len())?;
564 writer.write_all(name.as_bytes())?;
565 meta.lock().unwrap().write(&mut writer)?;
566 }
567 writer.write_vlq(self.version.0)?;
568 writer.write_vlq(self.version.1)?;
569 Ok(())
570 }
571
572 fn read_file<P: AsRef<Path>>(&mut self, path: P) -> crate::Result<()> {
575 let path = path.as_ref();
576 match utils::atomic_read(path) {
577 Ok(buf) => self.read(&buf[..]),
578 Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(()),
579 Err(e) => Err(e),
580 }
581 .context(path, "when decoding MultiMeta")
582 }
583
584 fn write_file<P: AsRef<Path>>(&self, path: P) -> crate::Result<()> {
586 let mut buf = Vec::new();
587 self.write(&mut buf).infallible()?;
588 utils::atomic_write(path, &buf, false)?;
589 Ok(())
590 }
591
592 fn read_log(&mut self, log: &log::Log) -> crate::Result<()> {
594 if let Some(last_entry) = log.lookup(INDEX_REVERSE, INDEX_REVERSE_KEY)?.next() {
595 let data = last_entry?;
596 self.read(data).context(
597 log.path().as_opt_path().unwrap_or_else(|| Path::new("")),
598 "when decoding MutltiMeta",
599 )?;
600 }
601 Ok(())
602 }
603
604 fn write_log(&self, log: &mut log::Log, _lock: &LockGuard) -> crate::Result<()> {
606 let mut data = Vec::new();
607 self.write(&mut data).infallible()?;
608 log.clear_dirty()?;
610 log.sync()?;
611 if let Some(Ok(last_data)) = log.lookup(INDEX_REVERSE, INDEX_REVERSE_KEY)?.next() {
612 if last_data == &data {
613 return Ok(());
615 }
616 }
617 log.append(&data)?;
618 log.sync()?;
619 Ok(())
620 }
621
622 fn bump_version(&mut self) {
624 self.version.1 += 1;
625 }
626}
627
628#[cfg(test)]
629mod tests {
630 use log::tests::pwrite;
631 use quickcheck::quickcheck;
632
633 use super::*;
634
635 fn simple_open_opts() -> OpenOptions {
636 OpenOptions::from_name_opts(vec![
637 ("a", log::OpenOptions::new()),
638 ("b", log::OpenOptions::new()),
639 ])
640 }
641
642 fn simple_multilog(path: &Path) -> MultiLog {
644 let mopts = simple_open_opts();
645 mopts.open(path).unwrap()
646 }
647
648 fn index_open_opts() -> OpenOptions {
649 fn index_func(bytes: &[u8]) -> Vec<log::IndexOutput> {
650 (0..bytes.len() as u64)
651 .map(|i| log::IndexOutput::Reference(i..i + 1))
652 .collect()
653 }
654 let index_def = log::IndexDef::new("x", index_func).lag_threshold(0);
655 OpenOptions::from_name_opts(vec![(
656 "a",
657 log::OpenOptions::new().index_defs(vec![index_def]),
658 )])
659 }
660
/// Logs managed by a MultiLog can also be opened as standalone Logs, both
/// right after creation and after one of them has been flushed.
#[test]
fn test_individual_log_can_be_opened_directly() {
    let dir = tempfile::tempdir().unwrap();
    let root = dir.path();
    let mut mlog = simple_multilog(root);

    for name in ["a", "b"] {
        log::OpenOptions::new().open(root.join(name)).unwrap();
    }

    mlog[0].append(b"1").unwrap();
    mlog[0].flush().unwrap();
    log::OpenOptions::new().open(root.join("a")).unwrap();
}
675
/// Syncing individual Logs does not publish their entries: other MultiLog
/// instances only see what `MultiLog::sync` wrote.
#[test]
fn test_individual_log_flushes_are_invisible() {
    let dir = tempfile::tempdir().unwrap();
    let path = dir.path();
    let mut mlog = simple_multilog(path);

    // Log 0: one entry synced individually, two entries still pending.
    mlog[0].append(b"2").unwrap();
    mlog[0].sync().unwrap();
    mlog[0].append(b"3").unwrap();
    mlog[0].append(b"4").unwrap();

    // Log 1: two entries, both synced individually, nothing pending.
    mlog[1].append(b"y").unwrap();
    mlog[1].sync().unwrap();
    mlog[1].append(b"z").unwrap();
    mlog[1].sync().unwrap();

    // The writing handle sees everything it appended.
    assert_eq!(mlog[0].iter().count(), 3);
    assert_eq!(mlog[1].iter().count(), 2);

    // A newly opened MultiLog sees none of it.
    let mlog2 = simple_multilog(path);
    assert_eq!(mlog2[0].iter().count(), 0);
    assert_eq!(mlog2[1].iter().count(), 0);

    // After `MultiLog::sync`, only the entries that were still pending
    // (two in Log 0, none in Log 1) are expected to remain.
    mlog.sync().unwrap();
    assert_eq!(mlog[0].iter().count(), 2);
    assert_eq!(mlog[1].iter().count(), 0);

    // And that is also what a newly opened MultiLog sees.
    let mlog2 = simple_multilog(path);
    assert_eq!(mlog2[0].iter().count(), 2);
    assert_eq!(mlog2[1].iter().count(), 0);
}
714
/// Versions start at minor 0 with distinct majors, grow on sync while the
/// major stays the same, and survive reopening.
#[test]
fn test_version() {
    let dir = tempfile::tempdir().unwrap();
    let path = dir.path();
    let mut mlog1 = simple_multilog(&path.join("1"));
    let mut mlog2 = simple_multilog(&path.join("2"));

    // Fresh MultiLogs: minor version is 0, and the two versions differ
    // (the major part is random).
    let v1 = mlog1.version();
    let v2 = mlog2.version();
    assert!(v1.1 == 0);
    assert!(v2.1 == 0);
    assert_ne!(v1, v2);

    // Sync keeps the major version and increases the version.
    mlog1.sync().unwrap();
    mlog2.sync().unwrap();
    let v3 = mlog1.version();
    let v4 = mlog2.version();
    assert_eq!(v3.0, v1.0);
    assert_eq!(v4.0, v2.0);
    assert!(v3 > v1);
    assert!(v4 > v2);

    // Reopening yields the version that was last written.
    let mlog1 = simple_multilog(&path.join("1"));
    let mlog2 = simple_multilog(&path.join("2"));
    let v5 = mlog1.version();
    let v6 = mlog2.version();
    assert_eq!(v5, v3);
    assert_eq!(v6, v4);
}
747
/// Detached Logs can be written independently; their changes become visible
/// once they are synced under the lock and the MultiMeta is written.
#[test]
fn test_detach_logs() {
    let dir = tempfile::tempdir().unwrap();
    let path = dir.path();
    let mut mlog = simple_multilog(path);
    let mut logs = mlog.detach_logs();
    logs[0].append(b"0").unwrap();
    logs[1].append(b"1").unwrap();

    // Sync the detached Logs while holding the lock, then publish.
    let lock = mlog.lock().unwrap();
    logs[0].sync().unwrap();
    logs[1].sync().unwrap();
    mlog.write_meta(&lock).unwrap();
    drop(lock);

    let mlog2 = simple_multilog(path);
    assert_eq!(mlog2[0].iter().count(), 1);
    assert_eq!(mlog2[1].iter().count(), 1);
}
768
/// A newly defined index is built when the MultiLog is first opened with
/// it, is not rebuilt on the next open, and grows again after the Log's
/// epoch is changed.
#[test]
fn test_new_index_built_only_once() {
    let dir = tempfile::tempdir().unwrap();
    let path = dir.path();
    // Start without any index and write one entry.
    let mopts = OpenOptions::from_name_opts(vec![("a", log::OpenOptions::new())]);
    let mut mlog = mopts.open(path).unwrap();
    mlog[0].append(b"0").unwrap();
    mlog.sync().unwrap();

    // Now define an index "i" on the same Log.
    let index_def =
        log::IndexDef::new("i", |_| vec![log::IndexOutput::Reference(0..1)]).lag_threshold(0);
    let mopts = OpenOptions::from_name_opts(vec![(
        "a",
        log::OpenOptions::new().index_defs(vec![index_def.clone()]),
    )]);
    // Size of the index file on disk; 0 if it does not exist.
    let index_size = || {
        path.join("a")
            .join(index_def.filename())
            .metadata()
            .map(|m| m.len())
            .unwrap_or_default()
    };

    assert_eq!(index_size(), 0);

    // First open with the index: the index file is written.
    let _mlog = mopts.open(path).unwrap();
    assert_eq!(index_size(), 36);

    // Second open: the index file does not grow.
    let mut mlog = mopts.open(path).unwrap();
    assert_eq!(index_size(), 36);

    // Flip the epoch of Log "a" and persist the MultiMeta directly.
    let lock = LockGuard(ScopedDirLock::new(path).unwrap());
    mlog.multimeta.metas["a"].lock().unwrap().epoch ^= 1;
    mlog.multimeta
        .write_log(&mut mlog.multimeta_log, &lock)
        .unwrap();
    mlog.multimeta.write_file(multi_meta_path(path)).unwrap();
    drop(lock);

    // With a different epoch, opening grows the index file again.
    let _mlog = mopts.open(path).unwrap();
    assert_eq!(index_size(), 71);
}
816
/// `write_meta` rejects a lock taken on a different MultiLog directory.
#[test]
fn test_wrong_locks_cause_errors() {
    let dir = tempfile::tempdir().unwrap();
    let root = dir.path();
    let mut first = simple_multilog(&root.join("1"));
    let mut second = simple_multilog(&root.join("2"));

    let first_lock = first.lock().unwrap();
    let second_lock = second.lock().unwrap();
    assert!(first.write_meta(&second_lock).is_err());
    assert!(second.write_meta(&first_lock).is_err());
}
829
830 fn repair_output(opts: &OpenOptions, path: &Path) -> String {
831 let out = opts.open_options_repair(path).unwrap();
832 filter_repair_output(out)
833 }
834
/// Drop the lines of a repair report that contain any of a fixed set of
/// markers, and join the remaining lines with `'\n'` (no trailing newline).
fn filter_repair_output(out: String) -> String {
    const NOISE_MARKERS: [&str; 4] = ["bytes in log", "Backed up", "Processing", "date -d"];

    let kept: Vec<&str> = out
        .lines()
        .filter(|line| !NOISE_MARKERS.iter().any(|marker| line.contains(*marker)))
        .collect();
    kept.join("\n")
}
847
/// Repair of a healthy MultiLog reports no change; after corrupting one
/// Log, repair falls back to an older MultiMeta entry that the repaired
/// Logs still satisfy.
#[test]
fn test_repair() {
    let dir = tempfile::tempdir().unwrap();
    let path = dir.path();
    let opts = simple_open_opts();
    let mut mlog = opts.open(path).unwrap();
    let mut logs = mlog.detach_logs();

    // Ten rounds; each round appends N entries to both Logs and writes the
    // MultiMeta once. Every valid state therefore has a multiple of N
    // entries per Log.
    const N: usize = 12;
    for i in 0..10u32 {
        let lock = mlog.lock().unwrap();
        for _ in 0..N {
            logs[0].append(i.to_be_bytes()).unwrap();
            logs[1].append(i.to_be_bytes()).unwrap();
            logs[0].sync().unwrap();
        }
        logs[1].sync().unwrap();
        mlog.write_meta(&lock).unwrap();
    }

    let repair = || repair_output(&opts, path);

    // The MultiLog can be opened and both Logs hold whole rounds only.
    let verify = || {
        let mlog = opts.open(path).unwrap();
        assert_eq!(mlog.logs[0].iter().count() % N, 0);
        assert_eq!(mlog.logs[1].iter().count() % N, 0);
    };

    // Repairing a healthy MultiLog changes nothing.
    let s1 = repair();
    assert_eq!(
        &s1,
        r#"Repairing MultiMeta Log:
 Index "reverse" passed integrity check
Repairing Log a
Log a has valid length 1212 after repair
Repairing Log b
Log b has valid length 1212 after repair
MultiMeta is valid"#
    );

    // The same report is also stored in "repair.log".
    let s2 = filter_repair_output(std::fs::read_to_string(path.join("repair.log")).unwrap());
    assert_eq!(&s1, s2.trim_end());

    // Corrupt Log "a" at offset 1000. Repair truncates it and selects an
    // older MultiMeta entry.
    pwrite(&path.join("a").join("log"), 1000, b"ff");
    assert_eq!(
        repair(),
        r#"Repairing MultiMeta Log:
 Index "reverse" passed integrity check
Repairing Log a
 Reset log size to 992
Log a has valid length 992 after repair
Repairing Log b
Log b has valid length 1212 after repair
Found valid MultiMeta after 2 invalid entries: a: 972, b: 972
Invalidated indexes in log 'a'
Invalidated indexes in log 'b'
Write valid MultiMeta"#
    );
    verify();

    // Repairing again: the newest MultiMeta entry is now acceptable, but
    // index metadata is still invalidated because the Logs' own metadata
    // differs from it.
    assert_eq!(
        repair(),
        r#"Repairing MultiMeta Log:
 Index "reverse" passed integrity check
Repairing Log a
Log a has valid length 992 after repair
Repairing Log b
Log b has valid length 1212 after repair
Invalidated indexes in log 'a'
Invalidated indexes in log 'b'
Write valid MultiMeta"#
    );
}
927
/// Repair copes with a corrupted index, a corrupted multimeta Log and a
/// missing "multimeta" file at the same time.
#[test]
fn test_repair_broken_index() {
    let dir = tempfile::tempdir().unwrap();
    let path = dir.path();
    let opts = index_open_opts();
    let mut mlog = opts.open(path).unwrap();
    let mut logs = mlog.detach_logs();

    let repair = || repair_output(&opts, path);
    let file_size = |path| std::fs::metadata(path).unwrap().len();

    let meta_path = multi_meta_path(path);
    let meta_log_path = multi_meta_log_path(path).join("log");
    let index_path = path.join("a").join("index2-x");

    // Write four entries, one MultiMeta write each, recording the file
    // sizes after every write.
    let mut meta_log_sizes = Vec::new();
    let mut index_sizes = Vec::new();
    for data in [b"abcd", b"abce", b"acde", b"bcde"] {
        let lock = mlog.lock().unwrap();
        logs[0].append(data).unwrap();
        logs[0].sync().unwrap();
        mlog.write_meta(&lock).unwrap();
        meta_log_sizes.push(file_size(&meta_log_path));
        index_sizes.push(file_size(&index_path));
    }
    drop(mlog);
    drop(logs);

    // Break things: overwrite the last 4 bytes of the index, overwrite the
    // 5 bytes ending at the multimeta Log size recorded after the second
    // write, and delete the "multimeta" file.
    pwrite(&index_path, -4, b"ffff");
    pwrite(&meta_log_path, (meta_log_sizes[1] - 5) as _, b"xxxxx");
    std::fs::remove_file(meta_path).unwrap();

    let index_len_before = file_size(&index_path);
    assert_eq!(
        repair(),
        r#"Repairing MultiMeta Log:
 Reset log size to 111
 Rebuilt index "reverse"
Repairing Log a
 Rebuilt index "x"
Log a has valid length 52 after repair
Invalidated indexes in log 'a'
Write valid MultiMeta"#
    );

    // The rebuilt index file is smaller than the broken one.
    let index_len_after = file_size(&index_path);
    assert!(index_len_before > index_len_after);

    // The repaired MultiLog can be opened.
    opts.open(path).map(|_| 1).unwrap();
}
987
/// A MultiLog using the multimeta Log and one using the legacy "multimeta"
/// file can alternate writes on the same directory without losing entries.
#[test]
fn test_mixed_old_new_read_writes() {
    let dir = tempfile::tempdir().unwrap();
    let path = dir.path();

    // Writer using the multimeta Log as the MultiMeta source.
    let mut mlog_new = simple_open_opts().open(path).unwrap();
    let mut logs_new = mlog_new.detach_logs();

    // Writer using the legacy "multimeta" file as the MultiMeta source.
    let mut mlog_old = {
        let mut opts = simple_open_opts();
        opts.leacy_multimeta_source = true;
        opts.open(path).unwrap()
    };
    let mut logs_old = mlog_old.detach_logs();

    // Alternate: in each round the new writer appends `[i, 0]`, then the
    // old writer appends `[i, 1]`.
    const N: usize = 2;
    for i in 0..N {
        for (mlog, logs, j) in [
            (&mut mlog_new, &mut logs_new, 0u8),
            (&mut mlog_old, &mut logs_old, 1u8),
        ] {
            let lock = mlog.lock().unwrap();
            logs[0].append([i as u8, j]).unwrap();
            logs[0].sync().unwrap();
            mlog.write_meta(&lock).unwrap();
        }
    }

    // All entries are present, in write order.
    let mlog = simple_open_opts().open(path).unwrap();
    assert_eq!(
        mlog.logs[0].iter().map(|e| e.unwrap()).collect::<Vec<_>>(),
        [[0, 0], [0, 1], [1, 0], [1, 1]],
    );
}
1024
quickcheck! {
    // Serializing a MultiMeta, reading it into a fresh one and serializing
    // again yields the same bytes.
    fn test_roundtrip_multimeta(name_len_list: Vec<(String, u64)>, version: (u64, u64)) -> bool {
        let metas = name_len_list
            .into_iter()
            .map(|(name, len)| {
                let meta = LogMetadata::new_with_primary_len(len);
                (name, Arc::new(Mutex::new(meta)))
            })
            .collect();
        let meta = MultiMeta { metas, version, ..Default::default() };
        let mut buf = Vec::new();
        meta.write(&mut buf).unwrap();
        let mut meta2 = MultiMeta::default();
        meta2.read(&buf[..]).unwrap();
        let mut buf2 = Vec::new();
        meta2.write(&mut buf2).unwrap();
        // Assert first so that a failure prints both buffers.
        assert_eq!(buf2, buf);
        buf2 == buf
    }

    // Entries appended to both Logs and synced through the MultiLog are
    // read back unchanged, in order, by a newly opened MultiLog.
    fn test_roundtrip_multilog(list_a: Vec<Vec<u8>>, list_b: Vec<Vec<u8>>) -> bool {
        let dir = tempfile::tempdir().unwrap();
        let mut mlog = simple_multilog(dir.path());
        for a in &list_a {
            mlog[0].append(a).unwrap();
        }
        for b in &list_b {
            mlog[1].append(b).unwrap();
        }
        mlog.sync().unwrap();

        let mlog_read = simple_multilog(dir.path());
        let list_a_read: Vec<Vec<u8>> = mlog_read[0].iter().map(|e| e.unwrap().to_vec()).collect();
        let list_b_read: Vec<Vec<u8>> = mlog_read[1].iter().map(|e| e.unwrap().to_vec()).collect();

        list_a == list_a_read && list_b == list_b_read
    }
}
1063}