1use std::collections::BTreeMap;
11use std::collections::HashMap;
12use std::io;
13use std::mem;
14use std::ops;
15use std::path::Path;
16use std::path::PathBuf;
17use std::sync::Arc;
18use std::sync::Mutex;
19
20use vlqencoding::VLQDecode;
21use vlqencoding::VLQEncode;
22
23use crate::errors::IoResultExt;
24use crate::errors::ResultExt;
25use crate::lock::ScopedDirLock;
26use crate::lock::READER_LOCK_OPTS;
27use crate::log;
28use crate::log::GenericPath;
29use crate::log::LogMetadata;
30use crate::repair::OpenOptionsOutput;
31use crate::repair::OpenOptionsRepair;
32use crate::repair::RepairMessage;
33use crate::utils;
34use crate::utils::rand_u64;
35
36#[derive(Clone, Default)]
38pub struct OpenOptions {
39 name_open_options: Vec<(&'static str, log::OpenOptions)>,
41
42 leacy_multimeta_source: bool,
46}
47
48pub struct MultiLog {
65 path: PathBuf,
68
69 multimeta: MultiMeta,
71
72 logs: Vec<log::Log>,
74
75 multimeta_log: log::Log,
77
78 leacy_multimeta_source: bool,
82
83 reader_lock: ScopedDirLock,
85}
86
87const INDEX_REVERSE_KEY: &[u8] = b"r";
89
90const INDEX_REVERSE: usize = 0;
92
93#[derive(Debug)]
94pub struct MultiMeta {
95 metas: BTreeMap<String, Arc<Mutex<LogMetadata>>>,
96
97 version: (u64, u64),
101}
102
103impl OpenOptions {
104 pub fn from_name_opts(name_opts: Vec<(&'static str, log::OpenOptions)>) -> Self {
106 for (name, _) in &name_opts {
108 if name == &"multimeta" {
109 panic!("MultiLog: cannot use 'multimeta' as Log name");
110 } else if name.contains('/') || name.contains('\\') {
111 panic!("MultiLog: cannot use '/' or '\\' in Log name");
112 }
113 }
114 Self {
115 name_open_options: name_opts,
116 leacy_multimeta_source: false,
117 }
118 }
119
120 pub fn open(&self, path: &Path) -> crate::Result<MultiLog> {
125 let result: crate::Result<_> = (|| {
126 let reader_lock = ScopedDirLock::new_with_options(path, &READER_LOCK_OPTS)?;
127
128 let meta_log_path = multi_meta_log_path(&path);
131 let meta_path = multi_meta_path(path);
132 let mut multimeta_log = multi_meta_log_open_options().open(&meta_log_path)?;
133 let multimeta_log_is_empty = multimeta_log.iter().next().is_none();
134
135 let mut multimeta = MultiMeta::default();
137 if multimeta_log_is_empty || self.leacy_multimeta_source {
138 multimeta.read_file(&meta_path)?;
141 } else {
142 multimeta.read_log(&multimeta_log)?;
144 apply_legacy_meta_if_it_is_newer(&meta_path, &mut multimeta);
145 }
146
147 let locked = if !multimeta_log_is_empty
148 && self
149 .name_open_options
150 .iter()
151 .all(|(name, _)| multimeta.metas.contains_key(AsRef::<str>::as_ref(name)))
152 {
153 None
155 } else {
156 utils::mkdir_p(path)?;
158 Some(LockGuard(ScopedDirLock::new(path)?))
159 };
160
161 let mut logs = Vec::with_capacity(self.name_open_options.len());
162 for (name, opts) in self.name_open_options.iter() {
163 let fspath = path.join(name);
164 let name_ref: &str = name;
165 if !multimeta.metas.contains_key(name_ref) {
166 utils::mkdir_p(&fspath)?;
168 let meta = log::Log::load_or_create_meta(&fspath.as_path().into(), true)?;
169 let meta = Arc::new(Mutex::new(meta));
170 multimeta.metas.insert(name.to_string(), meta);
171 }
172 let path = GenericPath::SharedMeta {
173 path: Box::new(fspath.as_path().into()),
174 meta: multimeta.metas[name_ref].clone(),
175 };
176 let log = opts.open(path)?;
177 logs.push(log);
178 }
179
180 if let Some(locked) = locked.as_ref() {
181 if !self.leacy_multimeta_source {
182 multimeta.write_log(&mut multimeta_log, locked)?;
183 }
184 multimeta.write_file(&meta_path)?;
185 }
186
187 Ok(MultiLog {
188 path: path.to_path_buf(),
189 logs,
190 multimeta,
191 multimeta_log,
192 leacy_multimeta_source: self.leacy_multimeta_source,
193 reader_lock,
194 })
195 })();
196
197 result.context("in multi::OpenOptions::open")
198 }
199}
200
201impl MultiLog {
202 pub fn lock(&mut self) -> crate::Result<LockGuard> {
210 let result: crate::Result<_> = (|| {
211 let lock = LockGuard(ScopedDirLock::new(&self.path)?);
212 self.read_meta(&lock)?;
213 Ok(lock)
214 })();
215 result.context("in MultiLog::lock")
216 }
217
218 pub fn write_meta(&mut self, lock: &LockGuard) -> crate::Result<()> {
223 if lock.0.path() != self.path {
224 let msg = format!(
225 "Invalid lock used to write_meta (Lock path = {:?}, MultiLog path = {:?})",
226 lock.0.path(),
227 &self.path
228 );
229 return Err(crate::Error::programming(msg));
230 }
231 let result: crate::Result<_> = (|| {
232 self.multimeta.bump_version();
233 if !self.leacy_multimeta_source {
234 self.multimeta.write_log(&mut self.multimeta_log, lock)?;
236 }
237
238 let meta_path = multi_meta_path(&self.path);
240 self.multimeta.write_file(&meta_path)?;
241
242 Ok(())
243 })();
244 result.context("in MultiLog::write_meta")
245 }
246
247 pub fn version(&self) -> (u64, u64) {
256 self.multimeta.version
257 }
258
259 fn read_meta(&mut self, lock: &LockGuard) -> crate::Result<()> {
264 debug_assert_eq!(lock.0.path(), &self.path);
265 (|| -> crate::Result<()> {
266 let meta_path = multi_meta_path(&self.path);
267 if self.leacy_multimeta_source {
268 self.multimeta.read_file(&meta_path)?;
269 } else {
270 self.multimeta_log.clear_dirty()?;
271 self.multimeta_log.sync()?;
272 self.multimeta.read_log(&self.multimeta_log)?;
273 apply_legacy_meta_if_it_is_newer(&meta_path, &mut self.multimeta);
274 }
275 Ok(())
276 })()
277 .context("reloading multimeta")
278 }
279
280 pub fn detach_logs(&mut self) -> Vec<log::Log> {
288 let mut result = Vec::new();
289 mem::swap(&mut result, &mut self.logs);
290 result
291 }
292
293 fn sync(&mut self) -> crate::Result<()> {
302 let lock = self.lock()?;
303 for log in self.logs.iter_mut() {
304 log.sync()?;
305 }
306 self.write_meta(&lock)?;
307 Ok(())
308 }
309}
310
311fn apply_legacy_meta_if_it_is_newer(meta_path: &Path, multimeta: &mut MultiMeta) {
312 let mut maybe_new_multimeta = MultiMeta::default();
315 if maybe_new_multimeta.read_file(meta_path).is_ok() {
316 if maybe_new_multimeta.metas.iter().all(|(k, v)| {
317 v.lock().unwrap().primary_len
318 >= match multimeta.metas.get(k) {
319 None => 0,
320 Some(v) => v.lock().unwrap().primary_len,
321 }
322 }) {
323 for (k, v) in multimeta.metas.iter() {
326 let mut current = v.lock().unwrap();
327 if let Some(newer) = maybe_new_multimeta.metas.remove(k) {
328 let newer = newer.lock().unwrap();
329 current.primary_len = newer.primary_len;
330 current.indexes = newer.indexes.clone();
331 }
332 }
333 }
334 }
335}
336
337fn multi_meta_log_open_options() -> log::OpenOptions {
338 log::OpenOptions::new()
339 .index("reverse", |_data| -> Vec<_> {
340 vec![log::IndexOutput::Owned(
342 INDEX_REVERSE_KEY.to_vec().into_boxed_slice(),
343 )]
344 })
345 .create(true)
346}
347
348pub struct LockGuard(ScopedDirLock);
350
351impl ops::Index<usize> for MultiLog {
352 type Output = log::Log;
353 fn index(&self, index: usize) -> &Self::Output {
354 &self.logs[index]
355 }
356}
357
358impl ops::IndexMut<usize> for MultiLog {
359 fn index_mut(&mut self, index: usize) -> &mut Self::Output {
360 &mut self.logs[index]
361 }
362}
363
364impl OpenOptionsRepair for OpenOptions {
365 fn open_options_repair(&self, path: impl AsRef<Path>) -> crate::Result<String> {
366 let path = path.as_ref();
367 let lock = LockGuard(ScopedDirLock::new(path)?);
368 let mut out = RepairMessage::new(path);
369
370 let mpath = multi_meta_log_path(path);
372 out += "Repairing MultiMeta Log:\n";
373 out += &indent(&multi_meta_log_open_options().open_options_repair(&mpath)?);
374
375 let mut repaired_log_metas = HashMap::new();
377 for (name, opts) in self.name_open_options.iter() {
378 let fspath = path.join(name);
379 if !fspath.exists() {
380 out += &format!("Skipping non-existed Log {}\n", name);
381 continue;
382 }
383 out += &format!("Repairing Log {}\n", name);
384 out += &indent(&opts.open_options_repair(&fspath)?);
385 let log = opts.open(&fspath)?;
386 let len = log.meta.primary_len;
387 out += &format!("Log {} has valid length {} after repair\n", name, len);
388 repaired_log_metas.insert(*name, log.meta);
389 }
390
391 let mut mlog = multi_meta_log_open_options()
393 .open(&mpath)
394 .context("repair cannot open MultiMeta Log after repairing it")?;
395 let mut selected_meta = None;
396 let mut invalid_count = 0;
397 for entry in mlog.lookup(INDEX_REVERSE, INDEX_REVERSE_KEY)? {
398 if let Ok(data) = entry {
401 let mut mmeta = MultiMeta::default();
402 if mmeta.read(data).is_ok() {
403 if mmeta.metas.iter().all(|(name, meta)| {
405 let len_required = meta.lock().unwrap().primary_len;
406 let len_provided = repaired_log_metas
407 .get(name.as_str())
408 .map(|m| m.primary_len)
409 .unwrap_or_default();
410 len_required <= len_provided
411 }) {
412 if invalid_count > 0 {
413 let mmeta_desc = mmeta
415 .metas
416 .iter()
417 .map(|(name, meta)| {
418 format!("{}: {}", name, meta.lock().unwrap().primary_len)
419 })
420 .collect::<Vec<_>>()
421 .join(", ");
422 out += &format!(
423 "Found valid MultiMeta after {} invalid entries: {}\n",
424 invalid_count, mmeta_desc
425 );
426 }
427 selected_meta = Some(mmeta);
428 break;
429 } else {
430 invalid_count += 1;
431 }
432 }
433 }
434 }
435
436 if selected_meta.is_none() {
437 let mut mmeta = MultiMeta::default();
439 if mmeta.read_file(&multi_meta_path(path)).is_ok() {
440 selected_meta = Some(mmeta);
441 }
442 }
443
444 let selected_meta = match selected_meta {
445 None => {
446 return Err(crate::Error::corruption(
447 &mpath,
448 "repair cannot find valid MultiMeta",
449 ))
450 .context(|| format!("Repair log:\n{}", indent(out.as_str())));
451 }
452 Some(meta) => meta,
453 };
454
455 let mut should_write_new_meta_entry = invalid_count > 0;
456 for (name, log_meta) in selected_meta.metas.iter() {
457 let mut log_meta = log_meta.lock().unwrap();
458 let should_invalidate_indexes = match repaired_log_metas.get(name.as_str()) {
459 None => true,
460 Some(repaired_log_meta) => &*log_meta != repaired_log_meta,
461 };
462 if should_invalidate_indexes {
463 out += &format!("Invalidated indexes in log '{}'\n", name);
464 log_meta.indexes.clear();
465 should_write_new_meta_entry = true;
466 }
467 }
468
469 if should_write_new_meta_entry {
470 selected_meta
471 .write_log(&mut mlog, &lock)
472 .context("repair cannot write MultiMeta log")?;
473 selected_meta
474 .write_file(multi_meta_path(path))
475 .context("repair cannot write valid MultiMeta file")?;
476 out += "Write valid MultiMeta\n";
477 } else {
478 out += "MultiMeta is valid\n";
479 }
480
481 Ok(out.into_string())
482 }
483}
484
485impl OpenOptionsOutput for OpenOptions {
486 type Output = MultiLog;
487
488 fn open_path(&self, path: &Path) -> crate::Result<Self::Output> {
489 self.open(path)
490 }
491}
492
493fn multi_meta_path(dir: &Path) -> PathBuf {
494 dir.join("multimeta")
495}
496
497fn multi_meta_log_path(dir: &Path) -> PathBuf {
498 dir.join("multimetalog")
499}
500
501fn indent(s: &str) -> String {
503 s.lines()
504 .map(|l| format!(" {}\n", l))
505 .collect::<Vec<_>>()
506 .concat()
507}
508
509impl Default for MultiMeta {
510 fn default() -> Self {
511 Self {
512 metas: Default::default(),
513 version: (rand_u64(), 0),
514 }
515 }
516}
517
518impl MultiMeta {
519 fn read(&mut self, mut reader: impl io::Read) -> io::Result<()> {
522 let format_version: usize = reader.read_vlq()?;
523 if format_version != 0 {
524 return Err(io::Error::new(
525 io::ErrorKind::Other,
526 format!("MultiMeta format {} is unsupported", format_version),
527 ));
528 }
529 let count: usize = reader.read_vlq()?;
530 for _ in 0..count {
531 let name_len = reader.read_vlq()?;
532 let mut name_buf = vec![0; name_len];
533 reader.read_exact(&mut name_buf)?;
534 let name = String::from_utf8(name_buf)
535 .map_err(|_| io::Error::new(io::ErrorKind::Other, "Log name is not utf-8"))?;
536 let meta = LogMetadata::read(&mut reader)?;
537 self.metas
538 .entry(name.to_string())
539 .and_modify(|e| {
540 let mut e = e.lock().unwrap();
541 let truncated = e.primary_len > meta.primary_len && e.epoch == meta.epoch;
542 *e = meta.clone();
543 if truncated {
545 e.epoch = e.epoch.wrapping_add(1);
546 }
547 })
548 .or_insert_with(|| Arc::new(Mutex::new(meta.clone())));
549 }
550 let version_major: u64 = reader.read_vlq().unwrap_or_else(|_| rand_u64());
551 let version_minor: u64 = reader.read_vlq().unwrap_or_default();
552 self.version = (version_major, version_minor);
553 Ok(())
554 }
555
556 fn write(&self, mut writer: impl io::Write) -> io::Result<()> {
558 let version = 0;
559 writer.write_vlq(version)?;
560 writer.write_vlq(self.metas.len())?;
561 for (name, meta) in self.metas.iter() {
562 writer.write_vlq(name.len())?;
563 writer.write_all(name.as_bytes())?;
564 meta.lock().unwrap().write(&mut writer)?;
565 }
566 writer.write_vlq(self.version.0)?;
567 writer.write_vlq(self.version.1)?;
568 Ok(())
569 }
570
571 fn read_file<P: AsRef<Path>>(&mut self, path: P) -> crate::Result<()> {
574 let path = path.as_ref();
575 match utils::atomic_read(path) {
576 Ok(buf) => self.read(&buf[..]),
577 Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(()),
578 Err(e) => Err(e),
579 }
580 .context(path, "when decoding MultiMeta")
581 }
582
583 fn write_file<P: AsRef<Path>>(&self, path: P) -> crate::Result<()> {
585 let mut buf = Vec::new();
586 self.write(&mut buf).infallible()?;
587 utils::atomic_write(path, &buf, false)?;
588 Ok(())
589 }
590
591 fn read_log(&mut self, log: &log::Log) -> crate::Result<()> {
593 if let Some(last_entry) = log.lookup(INDEX_REVERSE, INDEX_REVERSE_KEY)?.next() {
594 let data = last_entry?;
595 self.read(data).context(
596 log.path().as_opt_path().unwrap_or_else(|| Path::new("")),
597 "when decoding MutltiMeta",
598 )?;
599 }
600 Ok(())
601 }
602
603 fn write_log(&self, log: &mut log::Log, _lock: &LockGuard) -> crate::Result<()> {
605 let mut data = Vec::new();
606 self.write(&mut data).infallible()?;
607 log.clear_dirty()?;
609 log.sync()?;
610 if let Some(Ok(last_data)) = log.lookup(INDEX_REVERSE, INDEX_REVERSE_KEY)?.next() {
611 if last_data == &data {
612 return Ok(());
614 }
615 }
616 log.append(&data)?;
617 log.sync()?;
618 Ok(())
619 }
620
621 fn bump_version(&mut self) {
623 self.version.1 += 1;
624 }
625}
626
627#[cfg(test)]
628mod tests {
629 use log::tests::pwrite;
630 use quickcheck::quickcheck;
631
632 use super::*;
633
634 fn simple_open_opts() -> OpenOptions {
635 OpenOptions::from_name_opts(vec![
636 ("a", log::OpenOptions::new()),
637 ("b", log::OpenOptions::new()),
638 ])
639 }
640
641 fn simple_multilog(path: &Path) -> MultiLog {
643 let mopts = simple_open_opts();
644 mopts.open(path).unwrap()
645 }
646
647 fn index_open_opts() -> OpenOptions {
648 fn index_func(bytes: &[u8]) -> Vec<log::IndexOutput> {
649 (0..bytes.len() as u64)
650 .map(|i| log::IndexOutput::Reference(i..i + 1))
651 .collect()
652 }
653 let index_def = log::IndexDef::new("x", index_func).lag_threshold(0);
654 OpenOptions::from_name_opts(vec![(
655 "a",
656 log::OpenOptions::new().index_defs(vec![index_def]),
657 )])
658 }
659
660 #[test]
661 fn test_individual_log_can_be_opened_directly() {
662 let dir = tempfile::tempdir().unwrap();
663 let path = dir.path();
664 let mut mlog = simple_multilog(path);
665
666 log::OpenOptions::new().open(path.join("a")).unwrap();
667 log::OpenOptions::new().open(path.join("b")).unwrap();
668
669 mlog[0].append(b"1").unwrap();
671 mlog[0].flush().unwrap();
672 log::OpenOptions::new().open(path.join("a")).unwrap();
673 }
674
675 #[test]
676 fn test_individual_log_flushes_are_invisible() {
677 let dir = tempfile::tempdir().unwrap();
678 let path = dir.path();
679 let mut mlog = simple_multilog(path);
680
681 mlog[0].append(b"2").unwrap();
685 mlog[0].sync().unwrap();
686 mlog[0].append(b"3").unwrap();
687 mlog[0].append(b"4").unwrap();
688
689 mlog[1].append(b"y").unwrap();
690 mlog[1].sync().unwrap();
691 mlog[1].append(b"z").unwrap();
692 mlog[1].sync().unwrap();
693
694 assert_eq!(mlog[0].iter().count(), 3);
695 assert_eq!(mlog[1].iter().count(), 2);
696
697 let mlog2 = simple_multilog(path);
700 assert_eq!(mlog2[0].iter().count(), 0);
701 assert_eq!(mlog2[1].iter().count(), 0);
702
703 mlog.sync().unwrap();
706 assert_eq!(mlog[0].iter().count(), 2);
707 assert_eq!(mlog[1].iter().count(), 0);
708
709 let mlog2 = simple_multilog(path);
710 assert_eq!(mlog2[0].iter().count(), 2);
711 assert_eq!(mlog2[1].iter().count(), 0);
712 }
713
714 #[test]
715 fn test_version() {
716 let dir = tempfile::tempdir().unwrap();
717 let path = dir.path();
718 let mut mlog1 = simple_multilog(&path.join("1"));
719 let mut mlog2 = simple_multilog(&path.join("2"));
720
721 let v1 = mlog1.version();
723 let v2 = mlog2.version();
724 assert!(v1.1 == 0);
725 assert!(v2.1 == 0);
726 assert_ne!(v1, v2);
727
728 mlog1.sync().unwrap();
730 mlog2.sync().unwrap();
731 let v3 = mlog1.version();
732 let v4 = mlog2.version();
733 assert_eq!(v3.0, v1.0);
734 assert_eq!(v4.0, v2.0);
735 assert!(v3 > v1);
736 assert!(v4 > v2);
737
738 let mlog1 = simple_multilog(&path.join("1"));
740 let mlog2 = simple_multilog(&path.join("2"));
741 let v5 = mlog1.version();
742 let v6 = mlog2.version();
743 assert_eq!(v5, v3);
744 assert_eq!(v6, v4);
745 }
746
747 #[test]
748 fn test_detach_logs() {
749 let dir = tempfile::tempdir().unwrap();
750 let path = dir.path();
751 let mut mlog = simple_multilog(path);
752 let mut logs = mlog.detach_logs();
753 logs[0].append(b"0").unwrap();
754 logs[1].append(b"1").unwrap();
755
756 let lock = mlog.lock().unwrap();
758 logs[0].sync().unwrap();
759 logs[1].sync().unwrap();
760 mlog.write_meta(&lock).unwrap();
761 drop(lock);
762
763 let mlog2 = simple_multilog(path);
764 assert_eq!(mlog2[0].iter().count(), 1);
765 assert_eq!(mlog2[1].iter().count(), 1);
766 }
767
768 #[test]
769 fn test_new_index_built_only_once() {
770 let dir = tempfile::tempdir().unwrap();
771 let path = dir.path();
772 let mopts = OpenOptions::from_name_opts(vec![("a", log::OpenOptions::new())]);
773 let mut mlog = mopts.open(path).unwrap();
774 mlog[0].append(b"0").unwrap();
775 mlog.sync().unwrap();
776
777 let index_def =
779 log::IndexDef::new("i", |_| vec![log::IndexOutput::Reference(0..1)]).lag_threshold(0);
780 let mopts = OpenOptions::from_name_opts(vec![(
781 "a",
782 log::OpenOptions::new().index_defs(vec![index_def.clone()]),
783 )]);
784 let index_size = || {
785 path.join("a")
786 .join(index_def.filename())
787 .metadata()
788 .map(|m| m.len())
789 .unwrap_or_default()
790 };
791
792 assert_eq!(index_size(), 0);
793
794 let _mlog = mopts.open(path).unwrap();
796 assert_eq!(index_size(), 36);
797
798 let mut mlog = mopts.open(path).unwrap();
800 assert_eq!(index_size(), 36);
801
802 let lock = LockGuard(ScopedDirLock::new(path).unwrap());
804 mlog.multimeta.metas["a"].lock().unwrap().epoch ^= 1;
805 mlog.multimeta
806 .write_log(&mut mlog.multimeta_log, &lock)
807 .unwrap();
808 mlog.multimeta.write_file(&multi_meta_path(path)).unwrap();
809 drop(lock);
810
811 let _mlog = mopts.open(path).unwrap();
813 assert_eq!(index_size(), 71);
814 }
815
816 #[test]
817 fn test_wrong_locks_cause_errors() {
818 let dir = tempfile::tempdir().unwrap();
819 let path = dir.path();
820 let mut mlog1 = simple_multilog(&path.join("1"));
821 let mut mlog2 = simple_multilog(&path.join("2"));
822
823 let lock1 = mlog1.lock().unwrap();
824 let lock2 = mlog2.lock().unwrap();
825 assert!(mlog1.write_meta(&lock2).is_err());
826 assert!(mlog2.write_meta(&lock1).is_err());
827 }
828
829 fn repair_output(opts: &OpenOptions, path: &Path) -> String {
830 let out = opts.open_options_repair(path).unwrap();
831 filter_repair_output(out)
832 }
833
834 fn filter_repair_output(out: String) -> String {
835 out.lines()
837 .filter(|l| {
838 !l.contains("bytes in log")
839 && !l.contains("Backed up")
840 && !l.contains("Processing")
841 && !l.contains("date -d")
842 })
843 .collect::<Vec<_>>()
844 .join("\n")
845 }
846
847 #[test]
848 fn test_repair() {
849 let dir = tempfile::tempdir().unwrap();
850 let path = dir.path();
851 let opts = simple_open_opts();
852 let mut mlog = opts.open(&path).unwrap();
853 let mut logs = mlog.detach_logs();
854
855 const N: usize = 12;
857 for i in 0..10u32 {
858 let lock = mlog.lock().unwrap();
859 for _ in 0..N {
860 logs[0].append(i.to_be_bytes()).unwrap();
861 logs[1].append(i.to_be_bytes()).unwrap();
862 logs[0].sync().unwrap();
863 }
864 logs[1].sync().unwrap();
865 mlog.write_meta(&lock).unwrap();
866 }
867
868 let repair = || repair_output(&opts, path);
869
870 let verify = || {
872 let mlog = opts.open(&path).unwrap();
873 assert_eq!(mlog.logs[0].iter().count() % N, 0);
874 assert_eq!(mlog.logs[1].iter().count() % N, 0);
875 };
876
877 let s1 = repair();
879 assert_eq!(
880 &s1,
881 r#"Repairing MultiMeta Log:
882 Index "reverse" passed integrity check
883Repairing Log a
884Log a has valid length 1212 after repair
885Repairing Log b
886Log b has valid length 1212 after repair
887MultiMeta is valid"#
888 );
889
890 let s2 = filter_repair_output(std::fs::read_to_string(path.join("repair.log")).unwrap());
892 assert_eq!(&s1, s2.trim_end());
893
894 pwrite(&path.join("a").join("log"), 1000, b"ff");
897 assert_eq!(
898 repair(),
899 r#"Repairing MultiMeta Log:
900 Index "reverse" passed integrity check
901Repairing Log a
902 Reset log size to 992
903Log a has valid length 992 after repair
904Repairing Log b
905Log b has valid length 1212 after repair
906Found valid MultiMeta after 2 invalid entries: a: 972, b: 972
907Invalidated indexes in log 'a'
908Invalidated indexes in log 'b'
909Write valid MultiMeta"#
910 );
911 verify();
912
913 assert_eq!(
914 repair(),
915 r#"Repairing MultiMeta Log:
916 Index "reverse" passed integrity check
917Repairing Log a
918Log a has valid length 992 after repair
919Repairing Log b
920Log b has valid length 1212 after repair
921Invalidated indexes in log 'a'
922Invalidated indexes in log 'b'
923Write valid MultiMeta"#
924 );
925 }
926
927 #[test]
928 fn test_repair_broken_index() {
929 let dir = tempfile::tempdir().unwrap();
931 let path = dir.path();
932 let opts = index_open_opts();
933 let mut mlog = opts.open(&path).unwrap();
934 let mut logs = mlog.detach_logs();
935
936 let repair = || repair_output(&opts, path);
937 let file_size = |path| std::fs::metadata(path).unwrap().len();
938
939 let meta_path = multi_meta_path(path);
940 let meta_log_path = multi_meta_log_path(path).join("log");
941 let index_path = path.join("a").join("index2-x");
942
943 let mut meta_log_sizes = Vec::new();
946 let mut index_sizes = Vec::new();
947 for data in [b"abcd", b"abce", b"acde", b"bcde"] {
948 let lock = mlog.lock().unwrap();
949 logs[0].append(data).unwrap();
950 logs[0].sync().unwrap();
951 mlog.write_meta(&lock).unwrap();
952 meta_log_sizes.push(file_size(&meta_log_path));
953 index_sizes.push(file_size(&index_path));
954 }
955 drop(mlog);
956 drop(logs);
957
958 pwrite(&index_path, -4, b"ffff");
963 pwrite(&meta_log_path, (meta_log_sizes[1] - 5) as _, b"xxxxx");
964 std::fs::remove_file(&meta_path).unwrap();
965
966 let index_len_before = file_size(&index_path);
967 assert_eq!(
968 repair(),
969 r#"Repairing MultiMeta Log:
970 Reset log size to 111
971 Rebuilt index "reverse"
972Repairing Log a
973 Rebuilt index "x"
974Log a has valid length 52 after repair
975Invalidated indexes in log 'a'
976Write valid MultiMeta"#
977 );
978
979 let index_len_after = file_size(&index_path);
981 assert!(index_len_before > index_len_after);
982
983 opts.open(path).map(|_| 1).unwrap();
985 }
986
987 #[test]
988 fn test_mixed_old_new_read_writes() {
989 let dir = tempfile::tempdir().unwrap();
990 let path = dir.path();
991
992 let mut mlog_new = simple_open_opts().open(&path).unwrap();
993 let mut logs_new = mlog_new.detach_logs();
994
995 let mut mlog_old = {
996 let mut opts = simple_open_opts();
997 opts.leacy_multimeta_source = true;
998 opts.open(&path).unwrap()
999 };
1000 let mut logs_old = mlog_old.detach_logs();
1001
1002 const N: usize = 2;
1004 for i in 0..N {
1005 for (mlog, logs, j) in [
1006 (&mut mlog_new, &mut logs_new, 0u8),
1007 (&mut mlog_old, &mut logs_old, 1u8),
1008 ] {
1009 let lock = mlog.lock().unwrap();
1010 logs[0].append(&[i as u8, j]).unwrap();
1011 logs[0].sync().unwrap();
1012 mlog.write_meta(&lock).unwrap();
1013 }
1014 }
1015
1016 let mlog = simple_open_opts().open(&path).unwrap();
1018 assert_eq!(
1019 mlog.logs[0].iter().map(|e| e.unwrap()).collect::<Vec<_>>(),
1020 [[0, 0], [0, 1], [1, 0], [1, 1]],
1021 );
1022 }
1023
1024 quickcheck! {
1025 fn test_roundtrip_multimeta(name_len_list: Vec<(String, u64)>, version: (u64, u64)) -> bool {
1026 let metas = name_len_list
1027 .into_iter()
1028 .map(|(name, len)| {
1029 let meta = LogMetadata::new_with_primary_len(len);
1030 (name, Arc::new(Mutex::new(meta)))
1031 })
1032 .collect();
1033 let meta = MultiMeta { metas, version, ..Default::default() };
1034 let mut buf = Vec::new();
1035 meta.write(&mut buf).unwrap();
1036 let mut meta2 = MultiMeta::default();
1037 meta2.read(&buf[..]).unwrap();
1038 let mut buf2 = Vec::new();
1039 meta2.write(&mut buf2).unwrap();
1040 assert_eq!(buf2, buf);
1041 buf2 == buf
1042 }
1043
1044 fn test_roundtrip_multilog(list_a: Vec<Vec<u8>>, list_b: Vec<Vec<u8>>) -> bool {
1045 let dir = tempfile::tempdir().unwrap();
1046 let mut mlog = simple_multilog(dir.path());
1047 for a in &list_a {
1048 mlog[0].append(a).unwrap();
1049 }
1050 for b in &list_b {
1051 mlog[1].append(b).unwrap();
1052 }
1053 mlog.sync().unwrap();
1054
1055 let mlog_read = simple_multilog(dir.path());
1056 let list_a_read: Vec<Vec<u8>> = mlog_read[0].iter().map(|e| e.unwrap().to_vec()).collect();
1057 let list_b_read: Vec<Vec<u8>> = mlog_read[1].iter().map(|e| e.unwrap().to_vec()).collect();
1058
1059 list_a == list_a_read && list_b == list_b_read
1060 }
1061 }
1062}