indexedlog/
multi.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 *
4 * This source code is licensed under the MIT license found in the
5 * LICENSE file in the root directory of this source tree.
6 */
7
8//! Atomic `sync` support for multiple [`Log`]s.
9
10use std::collections::BTreeMap;
11use std::collections::HashMap;
12use std::io;
13use std::mem;
14use std::ops;
15use std::path::Path;
16use std::path::PathBuf;
17use std::sync::Arc;
18use std::sync::Mutex;
19
20use vlqencoding::VLQDecode;
21use vlqencoding::VLQEncode;
22
23use crate::errors::IoResultExt;
24use crate::errors::ResultExt;
25use crate::lock::ScopedDirLock;
26use crate::lock::READER_LOCK_OPTS;
27use crate::log;
28use crate::log::GenericPath;
29use crate::log::LogMetadata;
30use crate::repair::OpenOptionsOutput;
31use crate::repair::OpenOptionsRepair;
32use crate::repair::RepairMessage;
33use crate::utils;
34use crate::utils::rand_u64;
35
36/// Options used to configure how a [`MultiLog`] is opened.
37#[derive(Clone, Default)]
38pub struct OpenOptions {
39    /// Name (subdirectory) of each Log, paired with the OpenOptions used to open it.
40    name_open_options: Vec<(&'static str, log::OpenOptions)>,
41
42    /// Whether to use the legacy MultiMeta source.
43    /// true: use the "multimeta" file; false: use the "multimetalog" Log.
44    /// For testing purposes only.
    // NOTE: the field name is a misspelling of "legacy". It is referenced
    // throughout this file, so a rename has to touch every use at once.
45    leacy_multimeta_source: bool,
46}
47
48/// A [`MultiLog`] contains multiple [`Log`]s with a central metadata file.
49///
50/// Metadata is "frozen" and changes to the metadata on the filesystem are not
51/// visible to Logs until [`MultiLog::lock`] gets called.  The only way to write
52/// the central metadata back to the filesystem is [`MultiLog::write_meta`].
53/// Note: [`MultiLog::sync`] calls the above functions and is another way to
54/// exchange data with the filesystem.
55///
56/// [`Log`]s will be accessible via indexing. For example, `multilog[0]`
57/// accesses the first [`Log`]. [`Log`]s can also be moved out of this
58/// struct by [`MultiLog::detach_logs`].
59///
60/// [`MultiLog`] ensures data consistency on disk, but not always in
61/// memory. If [`MultiLog::write_meta`] is not called, or is not
62/// successful, after [`Log::sync`] was called, the data in [`Log`] might
63/// be rewritten by other processes, breaking the [`Log`]!
64pub struct MultiLog {
65    /// Directory containing all the Logs.
66    /// Used to write metadata.
67    path: PathBuf,
68
69    /// Combined metadata from logs.
70    multimeta: MultiMeta,
71
72    /// Logs loaded by MultiLog.
73    logs: Vec<log::Log>,
74
75    /// Log used for `MultiMeta`. For data recovery.
76    multimeta_log: log::Log,
77
78    /// Whether to use the legacy MultiMeta source.
79    /// true: use the "multimeta" file; false: use the "multimetalog" Log.
80    /// For testing purposes only.
81    leacy_multimeta_source: bool,
82
83    /// Indicates an active reader. Destructive writes (repair) are unsafe.
84    reader_lock: ScopedDirLock,
85}
86
87/// Constant key used by the reverse index of the multimeta log.
/// The index function in [`multi_meta_log_open_options`] emits this same key
/// for every entry, so all entries are reachable through a single lookup.
88const INDEX_REVERSE_KEY: &[u8] = b"r";
89
90/// The reverse index is the first index. See [`multi_meta_log_open_options`].
91const INDEX_REVERSE: usize = 0;
92
/// Combined metadata of all Logs managed by a [`MultiLog`], plus a version.
93#[derive(Debug)]
94pub struct MultiMeta {
    /// Per-Log metadata keyed by Log name. Each value is handed to the
    /// corresponding Log as shared metadata when the MultiLog is opened.
95    metas: BTreeMap<String, Arc<Mutex<LogMetadata>>>,
96
97    /// The version. Updated on flush.
98    /// `(a, b)` is backwards compatible (only has appended content) with
99    /// `(c, d)` if `a == c` and `b >= d`.
100    version: (u64, u64),
101}
102
103impl OpenOptions {
104    /// Create [`OpenOptions`] from names and OpenOptions of [`Log`].
105    pub fn from_name_opts(name_opts: Vec<(&'static str, log::OpenOptions)>) -> Self {
106        // Sanity check.
107        for (name, _) in &name_opts {
108            if name == &"multimeta" {
109                panic!("MultiLog: cannot use 'multimeta' as Log name");
110            } else if name.contains('/') || name.contains('\\') {
111                panic!("MultiLog: cannot use '/' or '\\' in Log name");
112            }
113        }
114        Self {
115            name_open_options: name_opts,
116            leacy_multimeta_source: false,
117        }
118    }
119
120    /// Open [`MultiLog`] at the given directory.
121    ///
122    /// This ignores the `create` option per [`Log`]. [`Log`] and their metadata
123    /// are created on demand.
    ///
    /// Errors are returned with the context "in multi::OpenOptions::open".
124    pub fn open(&self, path: &Path) -> crate::Result<MultiLog> {
125        let result: crate::Result<_> = (|| {
            // Taken first and stored in the returned MultiLog.
126            let reader_lock = ScopedDirLock::new_with_options(path, &READER_LOCK_OPTS)?;
127
128            // The multimeta log contains the "MultiMeta" metadata about how to load other
129            // logs.
130            let meta_log_path = multi_meta_log_path(path);
131            let meta_path = multi_meta_path(path);
132            let mut multimeta_log = multi_meta_log_open_options().open(meta_log_path)?;
133            let multimeta_log_is_empty = multimeta_log.iter().next().is_none();
134
135            // Read multimeta from the multimeta log.
136            let mut multimeta = MultiMeta::default();
137            if multimeta_log_is_empty || self.leacy_multimeta_source {
138                // Previous versions of MultiLog use the "multimeta" file. Read it for
139                // compatibility.
140                multimeta.read_file(&meta_path)?;
141            } else {
142                // New version uses a Log for the "multimeta" data. It enables "repair()".
143                multimeta.read_log(&multimeta_log)?;
144                apply_legacy_meta_if_it_is_newer(&meta_path, &mut multimeta);
145            }
146
            // Only take the directory lock when something has to be written:
            // either the multimeta log is still empty, or a requested Log is
            // missing from the loaded MultiMeta.
147            let locked = if !multimeta_log_is_empty
148                && self
149                    .name_open_options
150                    .iter()
151                    .all(|(name, _)| multimeta.metas.contains_key(AsRef::<str>::as_ref(name)))
152            {
153                // Not using legacy format. All keys exist. No need to write files on disk.
154                None
155            } else {
156                // Need to create some Logs and rewrite the multimeta.
157                utils::mkdir_p(path)?;
158                Some(LockGuard(ScopedDirLock::new(path)?))
159            };
160
161            let mut logs = Vec::with_capacity(self.name_open_options.len());
162            for (name, opts) in self.name_open_options.iter() {
163                let fspath = path.join(name);
164                let name_ref: &str = name;
165                if !multimeta.metas.contains_key(name_ref) {
166                    // Create a new Log if it does not exist in MultiMeta.
167                    utils::mkdir_p(&fspath)?;
168                    let meta = log::Log::load_or_create_meta(&fspath.as_path().into(), true)?;
169                    let meta = Arc::new(Mutex::new(meta));
170                    multimeta.metas.insert(name.to_string(), meta);
171                }
                // The Log shares its metadata (same `Arc`) with the MultiMeta entry.
172                let path = GenericPath::SharedMeta {
173                    path: Box::new(fspath.as_path().into()),
174                    meta: multimeta.metas[name_ref].clone(),
175                };
176                let log = opts.open(path)?;
177                logs.push(log);
178            }
179
            // Persist the (possibly extended) MultiMeta if the lock was taken above.
180            if let Some(locked) = locked.as_ref() {
181                if !self.leacy_multimeta_source {
182                    multimeta.write_log(&mut multimeta_log, locked)?;
183                }
184                multimeta.write_file(&meta_path)?;
185            }
186
187            Ok(MultiLog {
188                path: path.to_path_buf(),
189                logs,
190                multimeta,
191                multimeta_log,
192                leacy_multimeta_source: self.leacy_multimeta_source,
193                reader_lock,
194            })
195        })();
196
197        result.context("in multi::OpenOptions::open")
198    }
199}
200
201impl MultiLog {
202    /// Lock the MultiLog directory for writing.
203    ///
204    /// After taking the lock, metadata will be reloaded so [`Log`]s can see the
205    /// latest metadata on disk and do `sync()` accordingly.
206    ///
207    /// Once everything is done, use [`MultiLog::write_meta`] to persistent the
208    /// changed metadata.
209    pub fn lock(&mut self) -> crate::Result<LockGuard> {
210        let result: crate::Result<_> = (|| {
211            let lock = LockGuard(ScopedDirLock::new(&self.path)?);
212            self.read_meta(&lock)?;
213            Ok(lock)
214        })();
215        result.context("in MultiLog::lock")
216    }
217
218    /// Write meta to disk so they become visible to other processes.
219    ///
220    /// A lock must be provided to prove that there is no race condition.
221    /// The lock is usually obtained via `lock()`.
222    pub fn write_meta(&mut self, lock: &LockGuard) -> crate::Result<()> {
223        if lock.0.path() != self.path {
224            let msg = format!(
225                "Invalid lock used to write_meta (Lock path = {:?}, MultiLog path = {:?})",
226                lock.0.path(),
227                &self.path
228            );
229            return Err(crate::Error::programming(msg));
230        }
231        let result: crate::Result<_> = (|| {
232            self.multimeta.bump_version();
233            if !self.leacy_multimeta_source {
234                // New MultiLog uses multimeta_log to track MultiMeta.
235                self.multimeta.write_log(&mut self.multimeta_log, lock)?;
236            }
237
238            // Legacy MultiLog uses multimeta file to track MultiMeta.
239            let meta_path = multi_meta_path(&self.path);
240            self.multimeta.write_file(meta_path)?;
241
242            Ok(())
243        })();
244        result.context("in MultiLog::write_meta")
245    }
246
247    /// Return the version in `(epoch, length)` form.
248    ///
249    /// Version gets updated on `write_meta`.
250    ///
251    /// The version is maintained exclusively by indexedlog and cannot be
252    /// changed directly via public APIs. Appending data bumps `length`.
253    /// Rewriting data changes `epoch`.
254    ///
255    /// See also [`crate::log::Log::version`].
256    pub fn version(&self) -> (u64, u64) {
257        self.multimeta.version
258    }
259
260    /// Reload meta from disk so they become visible to Logs.
261    ///
262    /// This is called automatically by `lock` so it's not part of the
263    /// public interface.
264    fn read_meta(&mut self, lock: &LockGuard) -> crate::Result<()> {
265        debug_assert_eq!(lock.0.path(), &self.path);
266        (|| -> crate::Result<()> {
267            let meta_path = multi_meta_path(&self.path);
268            if self.leacy_multimeta_source {
269                self.multimeta.read_file(&meta_path)?;
270            } else {
271                self.multimeta_log.clear_dirty()?;
272                self.multimeta_log.sync()?;
273                self.multimeta.read_log(&self.multimeta_log)?;
274                apply_legacy_meta_if_it_is_newer(&meta_path, &mut self.multimeta);
275            }
276            Ok(())
277        })()
278        .context("reloading multimeta")
279    }
280
281    /// Detach [`Log`]s from this [`MultiLog`].
282    ///
283    /// Once detached, [`Log`]s will no longer be available via indexing
284    /// like `multilog[0]`.
285    ///
286    /// This is useful for places where [`Log`]s are owned by other
287    /// structured, instead of being accessed via [`MultiLog`].
288    pub fn detach_logs(&mut self) -> Vec<log::Log> {
289        let mut result = Vec::new();
290        mem::swap(&mut result, &mut self.logs);
291        result
292    }
293
294    /// Sync all [`Log`]s. This is an atomic operation.
295    ///
296    /// This function simply calls [`MultiLog::lock`], [`Log::sync`] and
297    /// [`MultiLog::write_meta`]. For more advanced use-cases, call those
298    /// functions manually.
299    ///
300    /// This function should not be called if logs were detached.
301    /// This does not seem very useful practically. So it is private.
302    fn sync(&mut self) -> crate::Result<()> {
303        let lock = self.lock()?;
304        for log in self.logs.iter_mut() {
305            log.sync()?;
306        }
307        self.write_meta(&lock)?;
308        Ok(())
309    }
310}
311
312fn apply_legacy_meta_if_it_is_newer(meta_path: &Path, multimeta: &mut MultiMeta) {
313    // For safe migration. Also check the "multimeta" file.
314    // It can contain newer data if written by an older version.
315    let mut maybe_new_multimeta = MultiMeta::default();
316    if maybe_new_multimeta.read_file(meta_path).is_ok() {
317        if maybe_new_multimeta.metas.iter().all(|(k, v)| {
318            v.lock().unwrap().primary_len
319                >= match multimeta.metas.get(k) {
320                    None => 0,
321                    Some(v) => v.lock().unwrap().primary_len,
322                }
323        }) {
324            // Only update "primary_len" and "indexes" metadata in place.
325            // The "epoch" might contain changes that need to be preserved.
326            for (k, v) in multimeta.metas.iter() {
327                let mut current = v.lock().unwrap();
328                if let Some(newer) = maybe_new_multimeta.metas.remove(k) {
329                    let newer = newer.lock().unwrap();
330                    current.primary_len = newer.primary_len;
331                    current.indexes = newer.indexes.clone();
332                }
333            }
334        }
335    }
336}
337
338fn multi_meta_log_open_options() -> log::OpenOptions {
339    log::OpenOptions::new()
340        .index("reverse", |_data| -> Vec<_> {
341            // Reverse index so we can find the last entries quickly.
342            vec![log::IndexOutput::Owned(
343                INDEX_REVERSE_KEY.to_vec().into_boxed_slice(),
344            )]
345        })
346        .create(true)
347}
348
349/// Structure proving a lock was taken for [`MultiLog`].
///
/// Obtained from [`MultiLog::lock`] and required by [`MultiLog::write_meta`].
350pub struct LockGuard(ScopedDirLock);
351
352impl ops::Index<usize> for MultiLog {
353    type Output = log::Log;
354    fn index(&self, index: usize) -> &Self::Output {
355        &self.logs[index]
356    }
357}
358
359impl ops::IndexMut<usize> for MultiLog {
360    fn index_mut(&mut self, index: usize) -> &mut Self::Output {
361        &mut self.logs[index]
362    }
363}
364
365impl OpenOptionsRepair for OpenOptions {
    /// Repair the MultiLog at `path` and return a human-readable repair log.
    ///
    /// Steps: repair the multimeta Log, repair each configured Log that exists
    /// on disk, then select the newest MultiMeta entry whose recorded lengths
    /// are all covered by the repaired Logs. If an entry had to be skipped, or
    /// a Log's metadata differs from the selected entry, a new MultiMeta is
    /// written (with that Log's indexes cleared in the latter case).
366    fn open_options_repair(&self, path: impl AsRef<Path>) -> crate::Result<String> {
367        let path = path.as_ref();
368        let lock = LockGuard(ScopedDirLock::new(path)?);
369        let mut out = RepairMessage::new(path);
370
371        // First, repair the MultiMeta log.
372        let mpath = multi_meta_log_path(path);
373        out += "Repairing MultiMeta Log:\n";
374        out += &indent(&multi_meta_log_open_options().open_options_repair(&mpath)?);
375
376        // Then, repair each log.
377        let mut repaired_log_metas = HashMap::new();
378        for (name, opts) in self.name_open_options.iter() {
379            let fspath = path.join(name);
380            if !fspath.exists() {
381                out += &format!("Skipping non-existed Log {}\n", name);
382                continue;
383            }
384            out += &format!("Repairing Log {}\n", name);
385            out += &indent(&opts.open_options_repair(&fspath)?);
            // Re-open the repaired Log standalone to learn its valid length.
386            let log = opts.open(&fspath)?;
387            let len = log.meta.primary_len;
388            out += &format!("Log {} has valid length {} after repair\n", name, len);
389            repaired_log_metas.insert(*name, log.meta);
390        }
391
392        // Finally, figure out a good "multimeta" from the multimeta log.
393        let mut mlog = multi_meta_log_open_options()
394            .open(&mpath)
395            .context("repair cannot open MultiMeta Log after repairing it")?;
396        let mut selected_meta = None;
397        let mut invalid_count = 0;
398        for entry in mlog.lookup(INDEX_REVERSE, INDEX_REVERSE_KEY)? {
399            // The linked list in the index is in the reversed order.
400            // So the first entry contains the last root id.
            // Entries that cannot be read or decoded are skipped without
            // being counted in `invalid_count`.
401            if let Ok(data) = entry {
402                let mut mmeta = MultiMeta::default();
403                if mmeta.read(data).is_ok() {
404                    // Check if everything is okay.
                    // A Log that was not repaired counts as length 0.
405                    if mmeta.metas.iter().all(|(name, meta)| {
406                        let len_required = meta.lock().unwrap().primary_len;
407                        let len_provided = repaired_log_metas
408                            .get(name.as_str())
409                            .map(|m| m.primary_len)
410                            .unwrap_or_default();
411                        len_required <= len_provided
412                    }) {
413                        if invalid_count > 0 {
414                            // Write repair log.
415                            let mmeta_desc = mmeta
416                                .metas
417                                .iter()
418                                .map(|(name, meta)| {
419                                    format!("{}: {}", name, meta.lock().unwrap().primary_len)
420                                })
421                                .collect::<Vec<_>>()
422                                .join(", ");
423                            out += &format!(
424                                "Found valid MultiMeta after {} invalid entries: {}\n",
425                                invalid_count, mmeta_desc
426                            );
427                        }
428                        selected_meta = Some(mmeta);
429                        break;
430                    } else {
431                        invalid_count += 1;
432                    }
433                }
434            }
435        }
436
437        if selected_meta.is_none() {
438            // For legacy MultiLog, the MultiMeta is stored in the file.
            // NOTE: `read_file` also returns Ok when the file does not exist,
            // in which case the default (empty) MultiMeta gets selected.
439            let mut mmeta = MultiMeta::default();
440            if mmeta.read_file(&multi_meta_path(path)).is_ok() {
441                selected_meta = Some(mmeta);
442            }
443        }
444
445        let selected_meta = match selected_meta {
446            None => {
447                return Err(crate::Error::corruption(
448                    &mpath,
449                    "repair cannot find valid MultiMeta",
450                ))
451                .context(|| format!("Repair log:\n{}", indent(out.as_str())));
452            }
453            Some(meta) => meta,
454        };
455
        // Skipped entries mean the newest on-disk MultiMeta is not the selected
        // one, so the selected one has to be written again.
456        let mut should_write_new_meta_entry = invalid_count > 0;
457        for (name, log_meta) in selected_meta.metas.iter() {
458            let mut log_meta = log_meta.lock().unwrap();
            // Indexes are dropped when the Log was not repaired here, or when
            // its repaired metadata differs from the selected MultiMeta entry.
459            let should_invalidate_indexes = match repaired_log_metas.get(name.as_str()) {
460                None => true,
461                Some(repaired_log_meta) => &*log_meta != repaired_log_meta,
462            };
463            if should_invalidate_indexes {
464                out += &format!("Invalidated indexes in log '{}'\n", name);
465                log_meta.indexes.clear();
466                should_write_new_meta_entry = true;
467            }
468        }
469
470        if should_write_new_meta_entry {
471            selected_meta
472                .write_log(&mut mlog, &lock)
473                .context("repair cannot write MultiMeta log")?;
474            selected_meta
475                .write_file(multi_meta_path(path))
476                .context("repair cannot write valid MultiMeta file")?;
477            out += "Write valid MultiMeta\n";
478        } else {
479            out += "MultiMeta is valid\n";
480        }
481
482        Ok(out.into_string())
483    }
484}
485
486impl OpenOptionsOutput for OpenOptions {
487    type Output = MultiLog;
488
489    fn open_path(&self, path: &Path) -> crate::Result<Self::Output> {
490        self.open(path)
491    }
492}
493
/// Path of the legacy "multimeta" file inside the MultiLog directory `dir`.
fn multi_meta_path(dir: &Path) -> PathBuf {
    let mut file = dir.to_path_buf();
    file.push("multimeta");
    file
}
497
/// Path of the "multimetalog" Log directory inside the MultiLog directory `dir`.
fn multi_meta_log_path(dir: &Path) -> PathBuf {
    let mut log_dir = dir.to_path_buf();
    log_dir.push("multimetalog");
    log_dir
}
501
/// Prefix every line of `s` with 2 spaces and terminate each with `\n`.
fn indent(s: &str) -> String {
    let mut indented = String::new();
    for line in s.lines() {
        indented.push_str("  ");
        indented.push_str(line);
        indented.push('\n');
    }
    indented
}
509
510impl Default for MultiMeta {
511    fn default() -> Self {
512        Self {
513            metas: Default::default(),
514            version: (rand_u64(), 0),
515        }
516    }
517}
518
519impl MultiMeta {
520    /// Update self with content from a reader.
521    /// Metadata with existing keys are mutated in-place.
522    fn read(&mut self, mut reader: impl io::Read) -> io::Result<()> {
523        let format_version: usize = reader.read_vlq()?;
524        if format_version != 0 {
525            return Err(io::Error::new(
526                io::ErrorKind::Other,
527                format!("MultiMeta format {} is unsupported", format_version),
528            ));
529        }
530        let count: usize = reader.read_vlq()?;
531        for _ in 0..count {
532            let name_len = reader.read_vlq()?;
533            let mut name_buf = vec![0; name_len];
534            reader.read_exact(&mut name_buf)?;
535            let name = String::from_utf8(name_buf)
536                .map_err(|_| io::Error::new(io::ErrorKind::Other, "Log name is not utf-8"))?;
537            let meta = LogMetadata::read(&mut reader)?;
538            self.metas
539                .entry(name.to_string())
540                .and_modify(|e| {
541                    let mut e = e.lock().unwrap();
542                    let truncated = e.primary_len > meta.primary_len && e.epoch == meta.epoch;
543                    *e = meta.clone();
544                    // Force a different epoch for truncation.
545                    if truncated {
546                        e.epoch = e.epoch.wrapping_add(1);
547                    }
548                })
549                .or_insert_with(|| Arc::new(Mutex::new(meta.clone())));
550        }
551        let version_major: u64 = reader.read_vlq().unwrap_or_else(|_| rand_u64());
552        let version_minor: u64 = reader.read_vlq().unwrap_or_default();
553        self.version = (version_major, version_minor);
554        Ok(())
555    }
556
557    /// Write metadata to a writer.
558    fn write(&self, mut writer: impl io::Write) -> io::Result<()> {
559        let version = 0;
560        writer.write_vlq(version)?;
561        writer.write_vlq(self.metas.len())?;
562        for (name, meta) in self.metas.iter() {
563            writer.write_vlq(name.len())?;
564            writer.write_all(name.as_bytes())?;
565            meta.lock().unwrap().write(&mut writer)?;
566        }
567        writer.write_vlq(self.version.0)?;
568        writer.write_vlq(self.version.1)?;
569        Ok(())
570    }
571
572    /// Update self with metadata from a file (legacy, for backwards compatibility).
573    /// If the file does not exist, self is not updated.
574    fn read_file<P: AsRef<Path>>(&mut self, path: P) -> crate::Result<()> {
575        let path = path.as_ref();
576        match utils::atomic_read(path) {
577            Ok(buf) => self.read(&buf[..]),
578            Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(()),
579            Err(e) => Err(e),
580        }
581        .context(path, "when decoding MultiMeta")
582    }
583
584    /// Atomically write metadata to a file (legacy, for backwards compatibility).
585    fn write_file<P: AsRef<Path>>(&self, path: P) -> crate::Result<()> {
586        let mut buf = Vec::new();
587        self.write(&mut buf).infallible()?;
588        utils::atomic_write(path, &buf, false)?;
589        Ok(())
590    }
591
592    /// Update self from a [`log::Log`].
593    fn read_log(&mut self, log: &log::Log) -> crate::Result<()> {
594        if let Some(last_entry) = log.lookup(INDEX_REVERSE, INDEX_REVERSE_KEY)?.next() {
595            let data = last_entry?;
596            self.read(data).context(
597                log.path().as_opt_path().unwrap_or_else(|| Path::new("")),
598                "when decoding MutltiMeta",
599            )?;
600        }
601        Ok(())
602    }
603
604    /// Write metadata to a [`log::Log`] and persist to disk.
605    fn write_log(&self, log: &mut log::Log, _lock: &LockGuard) -> crate::Result<()> {
606        let mut data = Vec::new();
607        self.write(&mut data).infallible()?;
608        // Reload to check if the last entry is already up-to-date.
609        log.clear_dirty()?;
610        log.sync()?;
611        if let Some(Ok(last_data)) = log.lookup(INDEX_REVERSE, INDEX_REVERSE_KEY)?.next() {
612            if last_data == &data {
613                // log does not change. Do not write redundant data.
614                return Ok(());
615            }
616        }
617        log.append(&data)?;
618        log.sync()?;
619        Ok(())
620    }
621
622    /// Bump the version recorded in this [`MultiMeta`].
623    fn bump_version(&mut self) {
624        self.version.1 += 1;
625    }
626}
627
628#[cfg(test)]
629mod tests {
630    use log::tests::pwrite;
631    use quickcheck::quickcheck;
632
633    use super::*;
634
635    fn simple_open_opts() -> OpenOptions {
636        OpenOptions::from_name_opts(vec![
637            ("a", log::OpenOptions::new()),
638            ("b", log::OpenOptions::new()),
639        ])
640    }
641
642    /// Create a simple MultiLog containing Log 'a' and 'b' for testing.
643    fn simple_multilog(path: &Path) -> MultiLog {
644        let mopts = simple_open_opts();
645        mopts.open(path).unwrap()
646    }
647
648    fn index_open_opts() -> OpenOptions {
649        fn index_func(bytes: &[u8]) -> Vec<log::IndexOutput> {
650            (0..bytes.len() as u64)
651                .map(|i| log::IndexOutput::Reference(i..i + 1))
652                .collect()
653        }
654        let index_def = log::IndexDef::new("x", index_func).lag_threshold(0);
655        OpenOptions::from_name_opts(vec![(
656            "a",
657            log::OpenOptions::new().index_defs(vec![index_def]),
658        )])
659    }
660
661    #[test]
662    fn test_individual_log_can_be_opened_directly() {
663        let dir = tempfile::tempdir().unwrap();
664        let path = dir.path();
665        let mut mlog = simple_multilog(path);
666
667        log::OpenOptions::new().open(path.join("a")).unwrap();
668        log::OpenOptions::new().open(path.join("b")).unwrap();
669
670        // After flush - still readable.
671        mlog[0].append(b"1").unwrap();
672        mlog[0].flush().unwrap();
673        log::OpenOptions::new().open(path.join("a")).unwrap();
674    }
675
    // Data synced by an individual Log stays invisible to other MultiLog
    // instances until the MultiLog itself writes the multimeta.
676    #[test]
677    fn test_individual_log_flushes_are_invisible() {
678        let dir = tempfile::tempdir().unwrap();
679        let path = dir.path();
680        let mut mlog = simple_multilog(path);
681
682        // This is not a proper use of Log::sync, since
683        // it's not protected by a lock. But it demonstrates
684        // the properties.
        // Log 0: one synced entry ("2") plus two in-memory entries ("3", "4").
685        mlog[0].append(b"2").unwrap();
686        mlog[0].sync().unwrap();
687        mlog[0].append(b"3").unwrap();
688        mlog[0].append(b"4").unwrap();
689
        // Log 1: two entries, both synced, nothing left in memory.
690        mlog[1].append(b"y").unwrap();
691        mlog[1].sync().unwrap();
692        mlog[1].append(b"z").unwrap();
693        mlog[1].sync().unwrap();
694
695        assert_eq!(mlog[0].iter().count(), 3);
696        assert_eq!(mlog[1].iter().count(), 2);
697
698        // mlog changes are not written via MultiLog::write_meta.
699        // Therefore invisible to mlog2.
700        let mlog2 = simple_multilog(path);
701        assert_eq!(mlog2[0].iter().count(), 0);
702        assert_eq!(mlog2[1].iter().count(), 0);
703
704        // mlog.sync reloads multimeta. "Flushed" contents are dropped.
705        // But in-memory content is kept and written.
706        mlog.sync().unwrap();
707        assert_eq!(mlog[0].iter().count(), 2);
708        assert_eq!(mlog[1].iter().count(), 0);
709
        // A freshly opened MultiLog now sees what mlog.sync wrote.
710        let mlog2 = simple_multilog(path);
711        assert_eq!(mlog2[0].iter().count(), 2);
712        assert_eq!(mlog2[1].iter().count(), 0);
713    }
714
715    #[test]
716    fn test_version() {
717        let dir = tempfile::tempdir().unwrap();
718        let path = dir.path();
719        let mut mlog1 = simple_multilog(&path.join("1"));
720        let mut mlog2 = simple_multilog(&path.join("2"));
721
722        // Different logs have different versions.
723        let v1 = mlog1.version();
724        let v2 = mlog2.version();
725        assert!(v1.1 == 0);
726        assert!(v2.1 == 0);
727        assert_ne!(v1, v2);
728
729        // The second number of the version gets bumped on flush.
730        mlog1.sync().unwrap();
731        mlog2.sync().unwrap();
732        let v3 = mlog1.version();
733        let v4 = mlog2.version();
734        assert_eq!(v3.0, v1.0);
735        assert_eq!(v4.0, v2.0);
736        assert!(v3 > v1);
737        assert!(v4 > v2);
738
739        // Reopen preserves the versions.
740        let mlog1 = simple_multilog(&path.join("1"));
741        let mlog2 = simple_multilog(&path.join("2"));
742        let v5 = mlog1.version();
743        let v6 = mlog2.version();
744        assert_eq!(v5, v3);
745        assert_eq!(v6, v4);
746    }
747
748    #[test]
749    fn test_detach_logs() {
750        let dir = tempfile::tempdir().unwrap();
751        let path = dir.path();
752        let mut mlog = simple_multilog(path);
753        let mut logs = mlog.detach_logs();
754        logs[0].append(b"0").unwrap();
755        logs[1].append(b"1").unwrap();
756
757        // Although logs are detached. MultiLog can still update multimeta.
758        let lock = mlog.lock().unwrap();
759        logs[0].sync().unwrap();
760        logs[1].sync().unwrap();
761        mlog.write_meta(&lock).unwrap();
762        drop(lock);
763
764        let mlog2 = simple_multilog(path);
765        assert_eq!(mlog2[0].iter().count(), 1);
766        assert_eq!(mlog2[1].iter().count(), 1);
767    }
768
    // A newly defined index is built on the first open, reused by later opens,
    // and rebuilt once the multimeta and per-log metadata become incompatible.
769    #[test]
770    fn test_new_index_built_only_once() {
771        let dir = tempfile::tempdir().unwrap();
772        let path = dir.path();
        // Start with a Log "a" that has no index and contains one entry.
773        let mopts = OpenOptions::from_name_opts(vec![("a", log::OpenOptions::new())]);
774        let mut mlog = mopts.open(path).unwrap();
775        mlog[0].append(b"0").unwrap();
776        mlog.sync().unwrap();
777
778        // Reopen with an index newly defined.
779        let index_def =
780            log::IndexDef::new("i", |_| vec![log::IndexOutput::Reference(0..1)]).lag_threshold(0);
781        let mopts = OpenOptions::from_name_opts(vec![(
782            "a",
783            log::OpenOptions::new().index_defs(vec![index_def.clone()]),
784        )]);
        // Size of the on-disk index file; 0 if its metadata cannot be read.
785        let index_size = || {
786            path.join("a")
787                .join(index_def.filename())
788                .metadata()
789                .map(|m| m.len())
790                .unwrap_or_default()
791        };
792
793        assert_eq!(index_size(), 0);
794
795        // Open one time, index is built on demand.
796        let _mlog = mopts.open(path).unwrap();
797        assert_eq!(index_size(), 36);
798
799        // Open another time, index is reused.
800        let mut mlog = mopts.open(path).unwrap();
801        assert_eq!(index_size(), 36);
802
803        // Force updating epoch to make multimeta and per-log meta incompatible.
804        let lock = LockGuard(ScopedDirLock::new(path).unwrap());
805        mlog.multimeta.metas["a"].lock().unwrap().epoch ^= 1;
806        mlog.multimeta
807            .write_log(&mut mlog.multimeta_log, &lock)
808            .unwrap();
809        mlog.multimeta.write_file(multi_meta_path(path)).unwrap();
810        drop(lock);
811
812        // The index is rebuilt (appended) at open time because of incompatible meta.
813        let _mlog = mopts.open(path).unwrap();
814        assert_eq!(index_size(), 71);
815    }
816
817    #[test]
818    fn test_wrong_locks_cause_errors() {
819        let dir = tempfile::tempdir().unwrap();
820        let path = dir.path();
821        let mut mlog1 = simple_multilog(&path.join("1"));
822        let mut mlog2 = simple_multilog(&path.join("2"));
823
824        let lock1 = mlog1.lock().unwrap();
825        let lock2 = mlog2.lock().unwrap();
826        assert!(mlog1.write_meta(&lock2).is_err());
827        assert!(mlog2.write_meta(&lock1).is_err());
828    }
829
830    fn repair_output(opts: &OpenOptions, path: &Path) -> String {
831        let out = opts.open_options_repair(path).unwrap();
832        filter_repair_output(out)
833    }
834
835    fn filter_repair_output(out: String) -> String {
836        // Filter out dynamic content.
837        out.lines()
838            .filter(|l| {
839                !l.contains("bytes in log")
840                    && !l.contains("Backed up")
841                    && !l.contains("Processing")
842                    && !l.contains("date -d")
843            })
844            .collect::<Vec<_>>()
845            .join("\n")
846    }
847
848    #[test]
849    fn test_repair() {
850        let dir = tempfile::tempdir().unwrap();
851        let path = dir.path();
852        let opts = simple_open_opts();
853        let mut mlog = opts.open(path).unwrap();
854        let mut logs = mlog.detach_logs();
855
856        // Create 10 "multimeta"s. Each MultiMeta contains N entires for each log.
857        const N: usize = 12;
858        for i in 0..10u32 {
859            let lock = mlog.lock().unwrap();
860            for _ in 0..N {
861                logs[0].append(i.to_be_bytes()).unwrap();
862                logs[1].append(i.to_be_bytes()).unwrap();
863                logs[0].sync().unwrap();
864            }
865            logs[1].sync().unwrap();
866            mlog.write_meta(&lock).unwrap();
867        }
868
869        let repair = || repair_output(&opts, path);
870
871        // Check that both logs only have a multiple of N entries.
872        let verify = || {
873            let mlog = opts.open(path).unwrap();
874            assert_eq!(mlog.logs[0].iter().count() % N, 0);
875            assert_eq!(mlog.logs[1].iter().count() % N, 0);
876        };
877
878        // Valid MultiLog.
879        let s1 = repair();
880        assert_eq!(
881            &s1,
882            r#"Repairing MultiMeta Log:
883  Index "reverse" passed integrity check
884Repairing Log a
885Log a has valid length 1212 after repair
886Repairing Log b
887Log b has valid length 1212 after repair
888MultiMeta is valid"#
889        );
890
891        // Repair output is also written to "repair.log" file.
892        let s2 = filter_repair_output(std::fs::read_to_string(path.join("repair.log")).unwrap());
893        assert_eq!(&s1, s2.trim_end());
894
895        // Put bad data in the first log. The repair will pick a recent MultiMeta point and
896        // dropping some entries.
897        pwrite(&path.join("a").join("log"), 1000, b"ff");
898        assert_eq!(
899            repair(),
900            r#"Repairing MultiMeta Log:
901  Index "reverse" passed integrity check
902Repairing Log a
903  Reset log size to 992
904Log a has valid length 992 after repair
905Repairing Log b
906Log b has valid length 1212 after repair
907Found valid MultiMeta after 2 invalid entries: a: 972, b: 972
908Invalidated indexes in log 'a'
909Invalidated indexes in log 'b'
910Write valid MultiMeta"#
911        );
912        verify();
913
914        assert_eq!(
915            repair(),
916            r#"Repairing MultiMeta Log:
917  Index "reverse" passed integrity check
918Repairing Log a
919Log a has valid length 992 after repair
920Repairing Log b
921Log b has valid length 1212 after repair
922Invalidated indexes in log 'a'
923Invalidated indexes in log 'b'
924Write valid MultiMeta"#
925        );
926    }
927
928    #[test]
929    fn test_repair_broken_index() {
930        // Test repair where the logs are fine but the indexes are broken.
931        let dir = tempfile::tempdir().unwrap();
932        let path = dir.path();
933        let opts = index_open_opts();
934        let mut mlog = opts.open(path).unwrap();
935        let mut logs = mlog.detach_logs();
936
937        let repair = || repair_output(&opts, path);
938        let file_size = |path| std::fs::metadata(path).unwrap().len();
939
940        let meta_path = multi_meta_path(path);
941        let meta_log_path = multi_meta_log_path(path).join("log");
942        let index_path = path.join("a").join("index2-x");
943
944        // Write some data. Flush the "log" multiple times to cause index
945        // fragmentation so the rebuilt index would be shorter.
946        let mut meta_log_sizes = Vec::new();
947        let mut index_sizes = Vec::new();
948        for data in [b"abcd", b"abce", b"acde", b"bcde"] {
949            let lock = mlog.lock().unwrap();
950            logs[0].append(data).unwrap();
951            logs[0].sync().unwrap();
952            mlog.write_meta(&lock).unwrap();
953            meta_log_sizes.push(file_size(&meta_log_path));
954            index_sizes.push(file_size(&index_path));
955        }
956        drop(mlog);
957        drop(logs);
958
959        // Corrupt the index and the multimeta log so the repair
960        // logic would revert to a previous MultiMeta, and rebuild
961        // index. If it's not careful, MultiMeta can contain offsets
962        // to the index file that is no longer valid.
963        pwrite(&index_path, -4, b"ffff");
964        pwrite(&meta_log_path, (meta_log_sizes[1] - 5) as _, b"xxxxx");
965        std::fs::remove_file(meta_path).unwrap();
966
967        let index_len_before = file_size(&index_path);
968        assert_eq!(
969            repair(),
970            r#"Repairing MultiMeta Log:
971  Reset log size to 111
972  Rebuilt index "reverse"
973Repairing Log a
974  Rebuilt index "x"
975Log a has valid length 52 after repair
976Invalidated indexes in log 'a'
977Write valid MultiMeta"#
978        );
979
980        // Index should be rebuilt (shorter).
981        let index_len_after = file_size(&index_path);
982        assert!(index_len_before > index_len_after);
983
984        // The MultiLog can be opened fine.
985        opts.open(path).map(|_| 1).unwrap();
986    }
987
988    #[test]
989    fn test_mixed_old_new_read_writes() {
990        let dir = tempfile::tempdir().unwrap();
991        let path = dir.path();
992
993        let mut mlog_new = simple_open_opts().open(path).unwrap();
994        let mut logs_new = mlog_new.detach_logs();
995
996        let mut mlog_old = {
997            let mut opts = simple_open_opts();
998            opts.leacy_multimeta_source = true;
999            opts.open(path).unwrap()
1000        };
1001        let mut logs_old = mlog_old.detach_logs();
1002
1003        // Mixed writes from old and new mlogs.
1004        const N: usize = 2;
1005        for i in 0..N {
1006            for (mlog, logs, j) in [
1007                (&mut mlog_new, &mut logs_new, 0u8),
1008                (&mut mlog_old, &mut logs_old, 1u8),
1009            ] {
1010                let lock = mlog.lock().unwrap();
1011                logs[0].append([i as u8, j]).unwrap();
1012                logs[0].sync().unwrap();
1013                mlog.write_meta(&lock).unwrap();
1014            }
1015        }
1016
1017        // Reading the log. It should contain N * 2 entries.
1018        let mlog = simple_open_opts().open(path).unwrap();
1019        assert_eq!(
1020            mlog.logs[0].iter().map(|e| e.unwrap()).collect::<Vec<_>>(),
1021            [[0, 0], [0, 1], [1, 0], [1, 1]],
1022        );
1023    }
1024
1025    quickcheck! {
1026        fn test_roundtrip_multimeta(name_len_list: Vec<(String, u64)>, version: (u64, u64)) -> bool {
1027            let metas = name_len_list
1028                .into_iter()
1029                .map(|(name, len)| {
1030                    let meta = LogMetadata::new_with_primary_len(len);
1031                    (name, Arc::new(Mutex::new(meta)))
1032                })
1033                .collect();
1034            let meta = MultiMeta { metas, version, ..Default::default() };
1035            let mut buf = Vec::new();
1036            meta.write(&mut buf).unwrap();
1037            let mut meta2 = MultiMeta::default();
1038            meta2.read(&buf[..]).unwrap();
1039            let mut buf2 = Vec::new();
1040            meta2.write(&mut buf2).unwrap();
1041            assert_eq!(buf2, buf);
1042            buf2 == buf
1043        }
1044
1045        fn test_roundtrip_multilog(list_a: Vec<Vec<u8>>, list_b: Vec<Vec<u8>>) -> bool {
1046            let dir = tempfile::tempdir().unwrap();
1047            let mut mlog = simple_multilog(dir.path());
1048            for a in &list_a {
1049                mlog[0].append(a).unwrap();
1050            }
1051            for b in &list_b {
1052                mlog[1].append(b).unwrap();
1053            }
1054            mlog.sync().unwrap();
1055
1056            let mlog_read = simple_multilog(dir.path());
1057            let list_a_read: Vec<Vec<u8>> = mlog_read[0].iter().map(|e| e.unwrap().to_vec()).collect();
1058            let list_b_read: Vec<Vec<u8>> = mlog_read[1].iter().map(|e| e.unwrap().to_vec()).collect();
1059
1060            list_a == list_a_read && list_b == list_b_read
1061        }
1062    }
1063}