indexedlog/
multi.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 *
4 * This source code is licensed under the MIT license found in the
5 * LICENSE file in the root directory of this source tree.
6 */
7
8//! Atomic `sync` support for multiple [`Log`]s.
9
10use std::collections::BTreeMap;
11use std::collections::HashMap;
12use std::io;
13use std::mem;
14use std::ops;
15use std::path::Path;
16use std::path::PathBuf;
17use std::sync::Arc;
18use std::sync::Mutex;
19
20use vlqencoding::VLQDecode;
21use vlqencoding::VLQEncode;
22
23use crate::errors::IoResultExt;
24use crate::errors::ResultExt;
25use crate::lock::ScopedDirLock;
26use crate::lock::READER_LOCK_OPTS;
27use crate::log;
28use crate::log::GenericPath;
29use crate::log::LogMetadata;
30use crate::repair::OpenOptionsOutput;
31use crate::repair::OpenOptionsRepair;
32use crate::repair::RepairMessage;
33use crate::utils;
34use crate::utils::rand_u64;
35
/// Options used to configure how a [`MultiLog`] is opened.
///
/// Construct with [`OpenOptions::from_name_opts`], then call
/// [`OpenOptions::open`].
#[derive(Clone, Default)]
pub struct OpenOptions {
    /// Name (subdir) of the Log and its OpenOptions.
    /// The order of this list is the order in which the opened Logs are
    /// indexed in the resulting [`MultiLog`] (`multilog[0]` is the first).
    name_open_options: Vec<(&'static str, log::OpenOptions)>,

    /// Whether to use the legacy MultiMeta source.
    /// true: use the "multimeta" file; false: use the "multimetalog" Log.
    /// For testing purposes only.
    // NOTE(review): "leacy" is a misspelling of "legacy"; the name is kept
    // because it is referenced throughout this module.
    leacy_multimeta_source: bool,
}
47
/// A [`MultiLog`] contains multiple [`Log`]s with a central metadata file.
///
/// Metadata is "frozen": changes to the metadata on the filesystem are not
/// visible to Logs until [`MultiLog::lock`] gets called. The only way to write
/// the central metadata back to the filesystem is [`MultiLog::write_meta`].
/// Note: [`MultiLog::sync`] calls the above functions and is another way to
/// exchange data with the filesystem.
///
/// [`Log`]s are accessible via indexing. For example, `multilog[0]`
/// accesses the first [`Log`]. [`Log`]s can also be moved out of this
/// struct by [`MultiLog::detach_logs`].
///
/// [`MultiLog`] ensures data consistency on disk, but not always in memory.
/// If [`Log::sync`] was called but [`MultiLog::write_meta`] was not called
/// (or did not succeed), the data in that [`Log`] might be rewritten by other
/// processes, breaking the [`Log`]!
pub struct MultiLog {
    /// Directory containing all the Logs.
    /// Used to write metadata.
    path: PathBuf,

    /// Combined metadata from logs. Each entry is shared with the
    /// corresponding opened Log.
    multimeta: MultiMeta,

    /// Logs loaded by MultiLog, in the order given to [`OpenOptions`].
    /// Empty after [`MultiLog::detach_logs`].
    logs: Vec<log::Log>,

    /// Log used for `MultiMeta` ("multimetalog"). For data recovery.
    multimeta_log: log::Log,

    /// Whether to use the legacy MultiMeta source.
    /// true: use the "multimeta" file; false: use the "multimetalog" Log.
    /// For testing purposes only.
    leacy_multimeta_source: bool,

    /// Indicates an active reader. Destructive writes (repair) are unsafe
    /// while it is held.
    reader_lock: ScopedDirLock,
}
86
/// The single key under which the reverse index files every entry of the
/// multimeta log.
const INDEX_REVERSE_KEY: &[u8] = &[b'r'];

/// Position of the reverse index among the multimeta log's indexes. It is the
/// only (hence first) index registered by [`multi_meta_log_open_options`].
const INDEX_REVERSE: usize = 0;
92
/// Combined metadata of all [`Log`]s in a [`MultiLog`], keyed by Log name.
///
/// Serialized by `MultiMeta::write`. It is appended as an entry to the
/// "multimetalog" Log, and also written to the legacy "multimeta" file for
/// backwards compatibility.
#[derive(Debug)]
pub struct MultiMeta {
    /// Per-Log metadata keyed by Log name. Values are `Arc`-shared with the
    /// opened Logs, and `MultiMeta::read` updates existing entries in place,
    /// so reloaded metadata becomes visible to those Logs.
    metas: BTreeMap<String, Arc<Mutex<LogMetadata>>>,

    /// The version. Updated on flush.
    /// `(a, b)` is backwards compatible (only has appended content) with
    /// `(c, d)` if `a == c` and `b >= d`.
    version: (u64, u64),
}
102
103impl OpenOptions {
104    /// Create [`OpenOptions`] from names and OpenOptions of [`Log`].
105    pub fn from_name_opts(name_opts: Vec<(&'static str, log::OpenOptions)>) -> Self {
106        // Sanity check.
107        for (name, _) in &name_opts {
108            if name == &"multimeta" {
109                panic!("MultiLog: cannot use 'multimeta' as Log name");
110            } else if name.contains('/') || name.contains('\\') {
111                panic!("MultiLog: cannot use '/' or '\\' in Log name");
112            }
113        }
114        Self {
115            name_open_options: name_opts,
116            leacy_multimeta_source: false,
117        }
118    }
119
120    /// Open [`MultiLog`] at the given directory.
121    ///
122    /// This ignores the `create` option per [`Log`]. [`Log`] and their metadata
123    /// are created on demand.
124    pub fn open(&self, path: &Path) -> crate::Result<MultiLog> {
125        let result: crate::Result<_> = (|| {
126            let reader_lock = ScopedDirLock::new_with_options(path, &READER_LOCK_OPTS)?;
127
128            // The multimeta log contains the "MultiMeta" metadata about how to load other
129            // logs.
130            let meta_log_path = multi_meta_log_path(&path);
131            let meta_path = multi_meta_path(path);
132            let mut multimeta_log = multi_meta_log_open_options().open(&meta_log_path)?;
133            let multimeta_log_is_empty = multimeta_log.iter().next().is_none();
134
135            // Read meltimeta from the multimeta log.
136            let mut multimeta = MultiMeta::default();
137            if multimeta_log_is_empty || self.leacy_multimeta_source {
138                // Previous versions of MultiLog uses the "multimeta" file. Read it for
139                // compatibility.
140                multimeta.read_file(&meta_path)?;
141            } else {
142                // New version uses a Log for the "multimeta" data. It enables "repair()".
143                multimeta.read_log(&multimeta_log)?;
144                apply_legacy_meta_if_it_is_newer(&meta_path, &mut multimeta);
145            }
146
147            let locked = if !multimeta_log_is_empty
148                && self
149                    .name_open_options
150                    .iter()
151                    .all(|(name, _)| multimeta.metas.contains_key(AsRef::<str>::as_ref(name)))
152            {
153                // Not using legacy format. All keys exist. No need to write files on disk.
154                None
155            } else {
156                // Need to create some Logs and rewrite the multimeta.
157                utils::mkdir_p(path)?;
158                Some(LockGuard(ScopedDirLock::new(path)?))
159            };
160
161            let mut logs = Vec::with_capacity(self.name_open_options.len());
162            for (name, opts) in self.name_open_options.iter() {
163                let fspath = path.join(name);
164                let name_ref: &str = name;
165                if !multimeta.metas.contains_key(name_ref) {
166                    // Create a new Log if it does not exist in MultiMeta.
167                    utils::mkdir_p(&fspath)?;
168                    let meta = log::Log::load_or_create_meta(&fspath.as_path().into(), true)?;
169                    let meta = Arc::new(Mutex::new(meta));
170                    multimeta.metas.insert(name.to_string(), meta);
171                }
172                let path = GenericPath::SharedMeta {
173                    path: Box::new(fspath.as_path().into()),
174                    meta: multimeta.metas[name_ref].clone(),
175                };
176                let log = opts.open(path)?;
177                logs.push(log);
178            }
179
180            if let Some(locked) = locked.as_ref() {
181                if !self.leacy_multimeta_source {
182                    multimeta.write_log(&mut multimeta_log, locked)?;
183                }
184                multimeta.write_file(&meta_path)?;
185            }
186
187            Ok(MultiLog {
188                path: path.to_path_buf(),
189                logs,
190                multimeta,
191                multimeta_log,
192                leacy_multimeta_source: self.leacy_multimeta_source,
193                reader_lock,
194            })
195        })();
196
197        result.context("in multi::OpenOptions::open")
198    }
199}
200
201impl MultiLog {
202    /// Lock the MultiLog directory for writing.
203    ///
204    /// After taking the lock, metadata will be reloaded so [`Log`]s can see the
205    /// latest metadata on disk and do `sync()` accordingly.
206    ///
207    /// Once everything is done, use [`MultiLog::write_meta`] to persistent the
208    /// changed metadata.
209    pub fn lock(&mut self) -> crate::Result<LockGuard> {
210        let result: crate::Result<_> = (|| {
211            let lock = LockGuard(ScopedDirLock::new(&self.path)?);
212            self.read_meta(&lock)?;
213            Ok(lock)
214        })();
215        result.context("in MultiLog::lock")
216    }
217
218    /// Write meta to disk so they become visible to other processes.
219    ///
220    /// A lock must be provided to prove that there is no race condition.
221    /// The lock is usually obtained via `lock()`.
222    pub fn write_meta(&mut self, lock: &LockGuard) -> crate::Result<()> {
223        if lock.0.path() != self.path {
224            let msg = format!(
225                "Invalid lock used to write_meta (Lock path = {:?}, MultiLog path = {:?})",
226                lock.0.path(),
227                &self.path
228            );
229            return Err(crate::Error::programming(msg));
230        }
231        let result: crate::Result<_> = (|| {
232            self.multimeta.bump_version();
233            if !self.leacy_multimeta_source {
234                // New MultiLog uses multimeta_log to track MultiMeta.
235                self.multimeta.write_log(&mut self.multimeta_log, lock)?;
236            }
237
238            // Legacy MultiLog uses multimeta file to track MultiMeta.
239            let meta_path = multi_meta_path(&self.path);
240            self.multimeta.write_file(&meta_path)?;
241
242            Ok(())
243        })();
244        result.context("in MultiLog::write_meta")
245    }
246
247    /// Return the version in `(a, b)` form.
248    ///
249    /// Version `(a, b)` only has append-only data than version `(c, d)`, if
250    /// `a == c` and `b > d`.
251    ///
252    /// Version `(a, _)` is incompatible with version `(b, _)` if `a != b`.
253    ///
254    /// Version gets updated on `write_meta`.
255    pub fn version(&self) -> (u64, u64) {
256        self.multimeta.version
257    }
258
259    /// Reload meta from disk so they become visible to Logs.
260    ///
261    /// This is called automatically by `lock` so it's not part of the
262    /// public interface.
263    fn read_meta(&mut self, lock: &LockGuard) -> crate::Result<()> {
264        debug_assert_eq!(lock.0.path(), &self.path);
265        (|| -> crate::Result<()> {
266            let meta_path = multi_meta_path(&self.path);
267            if self.leacy_multimeta_source {
268                self.multimeta.read_file(&meta_path)?;
269            } else {
270                self.multimeta_log.clear_dirty()?;
271                self.multimeta_log.sync()?;
272                self.multimeta.read_log(&self.multimeta_log)?;
273                apply_legacy_meta_if_it_is_newer(&meta_path, &mut self.multimeta);
274            }
275            Ok(())
276        })()
277        .context("reloading multimeta")
278    }
279
280    /// Detach [`Log`]s from this [`MultiLog`].
281    ///
282    /// Once detached, [`Log`]s will no longer be available via indexing
283    /// like `multilog[0]`.
284    ///
285    /// This is useful for places where [`Log`]s are owned by other
286    /// structured, instead of being accessed via [`MultiLog`].
287    pub fn detach_logs(&mut self) -> Vec<log::Log> {
288        let mut result = Vec::new();
289        mem::swap(&mut result, &mut self.logs);
290        result
291    }
292
293    /// Sync all [`Log`]s. This is an atomic operation.
294    ///
295    /// This function simply calls [`MultiLog::lock`], [`Log::sync`] and
296    /// [`MultiLog::write_meta`]. For more advanced use-cases, call those
297    /// functions manually.
298    ///
299    /// This function should not be called if logs were detached.
300    /// This does not seem very useful practically. So it is private.
301    fn sync(&mut self) -> crate::Result<()> {
302        let lock = self.lock()?;
303        for log in self.logs.iter_mut() {
304            log.sync()?;
305        }
306        self.write_meta(&lock)?;
307        Ok(())
308    }
309}
310
311fn apply_legacy_meta_if_it_is_newer(meta_path: &Path, multimeta: &mut MultiMeta) {
312    // For safe migration. Also check the "multimeta" file.
313    // It can contain newer data if written by an older version.
314    let mut maybe_new_multimeta = MultiMeta::default();
315    if maybe_new_multimeta.read_file(meta_path).is_ok() {
316        if maybe_new_multimeta.metas.iter().all(|(k, v)| {
317            v.lock().unwrap().primary_len
318                >= match multimeta.metas.get(k) {
319                    None => 0,
320                    Some(v) => v.lock().unwrap().primary_len,
321                }
322        }) {
323            // Only update "primary_len" and "indexes" metadata in place.
324            // The "epoch" might contain changes that need to be preserved.
325            for (k, v) in multimeta.metas.iter() {
326                let mut current = v.lock().unwrap();
327                if let Some(newer) = maybe_new_multimeta.metas.remove(k) {
328                    let newer = newer.lock().unwrap();
329                    current.primary_len = newer.primary_len;
330                    current.indexes = newer.indexes.clone();
331                }
332            }
333        }
334    }
335}
336
337fn multi_meta_log_open_options() -> log::OpenOptions {
338    log::OpenOptions::new()
339        .index("reverse", |_data| -> Vec<_> {
340            // Reverse index so we can find the last entries quickly.
341            vec![log::IndexOutput::Owned(
342                INDEX_REVERSE_KEY.to_vec().into_boxed_slice(),
343            )]
344        })
345        .create(true)
346}
347
/// Structure proving a lock was taken for [`MultiLog`].
///
/// Wraps the `ScopedDirLock` taken on a MultiLog directory. It is returned by
/// [`MultiLog::lock`] and required by [`MultiLog::write_meta`], which checks
/// that the lock path matches the MultiLog path.
pub struct LockGuard(ScopedDirLock);
350
351impl ops::Index<usize> for MultiLog {
352    type Output = log::Log;
353    fn index(&self, index: usize) -> &Self::Output {
354        &self.logs[index]
355    }
356}
357
358impl ops::IndexMut<usize> for MultiLog {
359    fn index_mut(&mut self, index: usize) -> &mut Self::Output {
360        &mut self.logs[index]
361    }
362}
363
364impl OpenOptionsRepair for OpenOptions {
365    fn open_options_repair(&self, path: impl AsRef<Path>) -> crate::Result<String> {
366        let path = path.as_ref();
367        let lock = LockGuard(ScopedDirLock::new(path)?);
368        let mut out = RepairMessage::new(path);
369
370        // First, repair the MultiMeta log.
371        let mpath = multi_meta_log_path(path);
372        out += "Repairing MultiMeta Log:\n";
373        out += &indent(&multi_meta_log_open_options().open_options_repair(&mpath)?);
374
375        // Then, repair each logs.
376        let mut repaired_log_metas = HashMap::new();
377        for (name, opts) in self.name_open_options.iter() {
378            let fspath = path.join(name);
379            if !fspath.exists() {
380                out += &format!("Skipping non-existed Log {}\n", name);
381                continue;
382            }
383            out += &format!("Repairing Log {}\n", name);
384            out += &indent(&opts.open_options_repair(&fspath)?);
385            let log = opts.open(&fspath)?;
386            let len = log.meta.primary_len;
387            out += &format!("Log {} has valid length {} after repair\n", name, len);
388            repaired_log_metas.insert(*name, log.meta);
389        }
390
391        // Finally, figure out a good "multimeta" from the multimeta log.
392        let mut mlog = multi_meta_log_open_options()
393            .open(&mpath)
394            .context("repair cannot open MultiMeta Log after repairing it")?;
395        let mut selected_meta = None;
396        let mut invalid_count = 0;
397        for entry in mlog.lookup(INDEX_REVERSE, INDEX_REVERSE_KEY)? {
398            // The linked list in the index is in the reversed order.
399            // So the first entry contains the last root id.
400            if let Ok(data) = entry {
401                let mut mmeta = MultiMeta::default();
402                if mmeta.read(data).is_ok() {
403                    // Check if everything is okay.
404                    if mmeta.metas.iter().all(|(name, meta)| {
405                        let len_required = meta.lock().unwrap().primary_len;
406                        let len_provided = repaired_log_metas
407                            .get(name.as_str())
408                            .map(|m| m.primary_len)
409                            .unwrap_or_default();
410                        len_required <= len_provided
411                    }) {
412                        if invalid_count > 0 {
413                            // Write repair log.
414                            let mmeta_desc = mmeta
415                                .metas
416                                .iter()
417                                .map(|(name, meta)| {
418                                    format!("{}: {}", name, meta.lock().unwrap().primary_len)
419                                })
420                                .collect::<Vec<_>>()
421                                .join(", ");
422                            out += &format!(
423                                "Found valid MultiMeta after {} invalid entries: {}\n",
424                                invalid_count, mmeta_desc
425                            );
426                        }
427                        selected_meta = Some(mmeta);
428                        break;
429                    } else {
430                        invalid_count += 1;
431                    }
432                }
433            }
434        }
435
436        if selected_meta.is_none() {
437            // For legacy MultiLog, the MultiMeta is stored in the file.
438            let mut mmeta = MultiMeta::default();
439            if mmeta.read_file(&multi_meta_path(path)).is_ok() {
440                selected_meta = Some(mmeta);
441            }
442        }
443
444        let selected_meta = match selected_meta {
445            None => {
446                return Err(crate::Error::corruption(
447                    &mpath,
448                    "repair cannot find valid MultiMeta",
449                ))
450                .context(|| format!("Repair log:\n{}", indent(out.as_str())));
451            }
452            Some(meta) => meta,
453        };
454
455        let mut should_write_new_meta_entry = invalid_count > 0;
456        for (name, log_meta) in selected_meta.metas.iter() {
457            let mut log_meta = log_meta.lock().unwrap();
458            let should_invalidate_indexes = match repaired_log_metas.get(name.as_str()) {
459                None => true,
460                Some(repaired_log_meta) => &*log_meta != repaired_log_meta,
461            };
462            if should_invalidate_indexes {
463                out += &format!("Invalidated indexes in log '{}'\n", name);
464                log_meta.indexes.clear();
465                should_write_new_meta_entry = true;
466            }
467        }
468
469        if should_write_new_meta_entry {
470            selected_meta
471                .write_log(&mut mlog, &lock)
472                .context("repair cannot write MultiMeta log")?;
473            selected_meta
474                .write_file(multi_meta_path(path))
475                .context("repair cannot write valid MultiMeta file")?;
476            out += "Write valid MultiMeta\n";
477        } else {
478            out += "MultiMeta is valid\n";
479        }
480
481        Ok(out.into_string())
482    }
483}
484
485impl OpenOptionsOutput for OpenOptions {
486    type Output = MultiLog;
487
488    fn open_path(&self, path: &Path) -> crate::Result<Self::Output> {
489        self.open(path)
490    }
491}
492
/// Path of the legacy "multimeta" file inside the MultiLog directory `dir`.
fn multi_meta_path(dir: &Path) -> PathBuf {
    let mut path = dir.to_path_buf();
    path.push("multimeta");
    path
}

/// Path of the MultiMeta Log ("multimetalog") inside the MultiLog
/// directory `dir`.
fn multi_meta_log_path(dir: &Path) -> PathBuf {
    let mut path = dir.to_path_buf();
    path.push("multimetalog");
    path
}
500
/// Indent every line of `s` by 2 spaces.
///
/// Lines are split with [`str::lines`], so `\n` and `\r\n` terminators are
/// dropped. Every output line, including the last one, ends with `\n`.
fn indent(s: &str) -> String {
    let mut indented = String::with_capacity(s.len());
    for line in s.lines() {
        indented.push_str("  ");
        indented.push_str(line);
        indented.push('\n');
    }
    indented
}
508
509impl Default for MultiMeta {
510    fn default() -> Self {
511        Self {
512            metas: Default::default(),
513            version: (rand_u64(), 0),
514        }
515    }
516}
517
518impl MultiMeta {
519    /// Update self with content from a reader.
520    /// Metadata with existing keys are mutated in-place.
521    fn read(&mut self, mut reader: impl io::Read) -> io::Result<()> {
522        let format_version: usize = reader.read_vlq()?;
523        if format_version != 0 {
524            return Err(io::Error::new(
525                io::ErrorKind::Other,
526                format!("MultiMeta format {} is unsupported", format_version),
527            ));
528        }
529        let count: usize = reader.read_vlq()?;
530        for _ in 0..count {
531            let name_len = reader.read_vlq()?;
532            let mut name_buf = vec![0; name_len];
533            reader.read_exact(&mut name_buf)?;
534            let name = String::from_utf8(name_buf)
535                .map_err(|_| io::Error::new(io::ErrorKind::Other, "Log name is not utf-8"))?;
536            let meta = LogMetadata::read(&mut reader)?;
537            self.metas
538                .entry(name.to_string())
539                .and_modify(|e| {
540                    let mut e = e.lock().unwrap();
541                    let truncated = e.primary_len > meta.primary_len && e.epoch == meta.epoch;
542                    *e = meta.clone();
543                    // Force a different epoch for truncation.
544                    if truncated {
545                        e.epoch = e.epoch.wrapping_add(1);
546                    }
547                })
548                .or_insert_with(|| Arc::new(Mutex::new(meta.clone())));
549        }
550        let version_major: u64 = reader.read_vlq().unwrap_or_else(|_| rand_u64());
551        let version_minor: u64 = reader.read_vlq().unwrap_or_default();
552        self.version = (version_major, version_minor);
553        Ok(())
554    }
555
556    /// Write metadata to a writer.
557    fn write(&self, mut writer: impl io::Write) -> io::Result<()> {
558        let version = 0;
559        writer.write_vlq(version)?;
560        writer.write_vlq(self.metas.len())?;
561        for (name, meta) in self.metas.iter() {
562            writer.write_vlq(name.len())?;
563            writer.write_all(name.as_bytes())?;
564            meta.lock().unwrap().write(&mut writer)?;
565        }
566        writer.write_vlq(self.version.0)?;
567        writer.write_vlq(self.version.1)?;
568        Ok(())
569    }
570
571    /// Update self with metadata from a file (legacy, for backwards compatibility).
572    /// If the file does not exist, self is not updated.
573    fn read_file<P: AsRef<Path>>(&mut self, path: P) -> crate::Result<()> {
574        let path = path.as_ref();
575        match utils::atomic_read(path) {
576            Ok(buf) => self.read(&buf[..]),
577            Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(()),
578            Err(e) => Err(e),
579        }
580        .context(path, "when decoding MultiMeta")
581    }
582
583    /// Atomically write metadata to a file (legacy, for backwards compatibility).
584    fn write_file<P: AsRef<Path>>(&self, path: P) -> crate::Result<()> {
585        let mut buf = Vec::new();
586        self.write(&mut buf).infallible()?;
587        utils::atomic_write(path, &buf, false)?;
588        Ok(())
589    }
590
591    /// Update self from a [`log::Log`].
592    fn read_log(&mut self, log: &log::Log) -> crate::Result<()> {
593        if let Some(last_entry) = log.lookup(INDEX_REVERSE, INDEX_REVERSE_KEY)?.next() {
594            let data = last_entry?;
595            self.read(data).context(
596                log.path().as_opt_path().unwrap_or_else(|| Path::new("")),
597                "when decoding MutltiMeta",
598            )?;
599        }
600        Ok(())
601    }
602
603    /// Write metadata to a [`log::Log`] and persist to disk.
604    fn write_log(&self, log: &mut log::Log, _lock: &LockGuard) -> crate::Result<()> {
605        let mut data = Vec::new();
606        self.write(&mut data).infallible()?;
607        // Reload to check if the last entry is already up-to-date.
608        log.clear_dirty()?;
609        log.sync()?;
610        if let Some(Ok(last_data)) = log.lookup(INDEX_REVERSE, INDEX_REVERSE_KEY)?.next() {
611            if last_data == &data {
612                // log does not change. Do not write redundant data.
613                return Ok(());
614            }
615        }
616        log.append(&data)?;
617        log.sync()?;
618        Ok(())
619    }
620
621    /// Bump the version recorded in this [`MultiMeta`].
622    fn bump_version(&mut self) {
623        self.version.1 += 1;
624    }
625}
626
627#[cfg(test)]
628mod tests {
629    use log::tests::pwrite;
630    use quickcheck::quickcheck;
631
632    use super::*;
633
634    fn simple_open_opts() -> OpenOptions {
635        OpenOptions::from_name_opts(vec![
636            ("a", log::OpenOptions::new()),
637            ("b", log::OpenOptions::new()),
638        ])
639    }
640
641    /// Create a simple MultiLog containing Log 'a' and 'b' for testing.
642    fn simple_multilog(path: &Path) -> MultiLog {
643        let mopts = simple_open_opts();
644        mopts.open(path).unwrap()
645    }
646
647    fn index_open_opts() -> OpenOptions {
648        fn index_func(bytes: &[u8]) -> Vec<log::IndexOutput> {
649            (0..bytes.len() as u64)
650                .map(|i| log::IndexOutput::Reference(i..i + 1))
651                .collect()
652        }
653        let index_def = log::IndexDef::new("x", index_func).lag_threshold(0);
654        OpenOptions::from_name_opts(vec![(
655            "a",
656            log::OpenOptions::new().index_defs(vec![index_def]),
657        )])
658    }
659
660    #[test]
661    fn test_individual_log_can_be_opened_directly() {
662        let dir = tempfile::tempdir().unwrap();
663        let path = dir.path();
664        let mut mlog = simple_multilog(path);
665
666        log::OpenOptions::new().open(path.join("a")).unwrap();
667        log::OpenOptions::new().open(path.join("b")).unwrap();
668
669        // After flush - still readable.
670        mlog[0].append(b"1").unwrap();
671        mlog[0].flush().unwrap();
672        log::OpenOptions::new().open(path.join("a")).unwrap();
673    }
674
675    #[test]
676    fn test_individual_log_flushes_are_invisible() {
677        let dir = tempfile::tempdir().unwrap();
678        let path = dir.path();
679        let mut mlog = simple_multilog(path);
680
681        // This is not a proper use of Log::sync, since
682        // it's not protected by a lock. But it demonstrates
683        // the properties.
684        mlog[0].append(b"2").unwrap();
685        mlog[0].sync().unwrap();
686        mlog[0].append(b"3").unwrap();
687        mlog[0].append(b"4").unwrap();
688
689        mlog[1].append(b"y").unwrap();
690        mlog[1].sync().unwrap();
691        mlog[1].append(b"z").unwrap();
692        mlog[1].sync().unwrap();
693
694        assert_eq!(mlog[0].iter().count(), 3);
695        assert_eq!(mlog[1].iter().count(), 2);
696
697        // mlog changes are not written via MultiLog::write_meta.
698        // Therefore invisible to mlog2.
699        let mlog2 = simple_multilog(path);
700        assert_eq!(mlog2[0].iter().count(), 0);
701        assert_eq!(mlog2[1].iter().count(), 0);
702
703        // mlog.sync reloads multimeta. "Flushed" contents are dropped.
704        // But in-memory content is kept and written.
705        mlog.sync().unwrap();
706        assert_eq!(mlog[0].iter().count(), 2);
707        assert_eq!(mlog[1].iter().count(), 0);
708
709        let mlog2 = simple_multilog(path);
710        assert_eq!(mlog2[0].iter().count(), 2);
711        assert_eq!(mlog2[1].iter().count(), 0);
712    }
713
714    #[test]
715    fn test_version() {
716        let dir = tempfile::tempdir().unwrap();
717        let path = dir.path();
718        let mut mlog1 = simple_multilog(&path.join("1"));
719        let mut mlog2 = simple_multilog(&path.join("2"));
720
721        // Different logs have different versions.
722        let v1 = mlog1.version();
723        let v2 = mlog2.version();
724        assert!(v1.1 == 0);
725        assert!(v2.1 == 0);
726        assert_ne!(v1, v2);
727
728        // The second number of the version gets bumped on flush.
729        mlog1.sync().unwrap();
730        mlog2.sync().unwrap();
731        let v3 = mlog1.version();
732        let v4 = mlog2.version();
733        assert_eq!(v3.0, v1.0);
734        assert_eq!(v4.0, v2.0);
735        assert!(v3 > v1);
736        assert!(v4 > v2);
737
738        // Reopen preserves the versions.
739        let mlog1 = simple_multilog(&path.join("1"));
740        let mlog2 = simple_multilog(&path.join("2"));
741        let v5 = mlog1.version();
742        let v6 = mlog2.version();
743        assert_eq!(v5, v3);
744        assert_eq!(v6, v4);
745    }
746
747    #[test]
748    fn test_detach_logs() {
749        let dir = tempfile::tempdir().unwrap();
750        let path = dir.path();
751        let mut mlog = simple_multilog(path);
752        let mut logs = mlog.detach_logs();
753        logs[0].append(b"0").unwrap();
754        logs[1].append(b"1").unwrap();
755
756        // Although logs are detached. MultiLog can still update multimeta.
757        let lock = mlog.lock().unwrap();
758        logs[0].sync().unwrap();
759        logs[1].sync().unwrap();
760        mlog.write_meta(&lock).unwrap();
761        drop(lock);
762
763        let mlog2 = simple_multilog(path);
764        assert_eq!(mlog2[0].iter().count(), 1);
765        assert_eq!(mlog2[1].iter().count(), 1);
766    }
767
768    #[test]
769    fn test_new_index_built_only_once() {
770        let dir = tempfile::tempdir().unwrap();
771        let path = dir.path();
772        let mopts = OpenOptions::from_name_opts(vec![("a", log::OpenOptions::new())]);
773        let mut mlog = mopts.open(path).unwrap();
774        mlog[0].append(b"0").unwrap();
775        mlog.sync().unwrap();
776
777        // Reopen with an index newly defined.
778        let index_def =
779            log::IndexDef::new("i", |_| vec![log::IndexOutput::Reference(0..1)]).lag_threshold(0);
780        let mopts = OpenOptions::from_name_opts(vec![(
781            "a",
782            log::OpenOptions::new().index_defs(vec![index_def.clone()]),
783        )]);
784        let index_size = || {
785            path.join("a")
786                .join(index_def.filename())
787                .metadata()
788                .map(|m| m.len())
789                .unwrap_or_default()
790        };
791
792        assert_eq!(index_size(), 0);
793
794        // Open one time, index is built on demand.
795        let _mlog = mopts.open(path).unwrap();
796        assert_eq!(index_size(), 36);
797
798        // Open another time, index is reused.
799        let mut mlog = mopts.open(path).unwrap();
800        assert_eq!(index_size(), 36);
801
802        // Force updating epoch to make multimeta and per-log meta incompatible.
803        let lock = LockGuard(ScopedDirLock::new(path).unwrap());
804        mlog.multimeta.metas["a"].lock().unwrap().epoch ^= 1;
805        mlog.multimeta
806            .write_log(&mut mlog.multimeta_log, &lock)
807            .unwrap();
808        mlog.multimeta.write_file(&multi_meta_path(path)).unwrap();
809        drop(lock);
810
811        // The index is rebuilt (appended) at open time because of incompatible meta.
812        let _mlog = mopts.open(path).unwrap();
813        assert_eq!(index_size(), 71);
814    }
815
816    #[test]
817    fn test_wrong_locks_cause_errors() {
818        let dir = tempfile::tempdir().unwrap();
819        let path = dir.path();
820        let mut mlog1 = simple_multilog(&path.join("1"));
821        let mut mlog2 = simple_multilog(&path.join("2"));
822
823        let lock1 = mlog1.lock().unwrap();
824        let lock2 = mlog2.lock().unwrap();
825        assert!(mlog1.write_meta(&lock2).is_err());
826        assert!(mlog2.write_meta(&lock1).is_err());
827    }
828
829    fn repair_output(opts: &OpenOptions, path: &Path) -> String {
830        let out = opts.open_options_repair(path).unwrap();
831        filter_repair_output(out)
832    }
833
834    fn filter_repair_output(out: String) -> String {
835        // Filter out dynamic content.
836        out.lines()
837            .filter(|l| {
838                !l.contains("bytes in log")
839                    && !l.contains("Backed up")
840                    && !l.contains("Processing")
841                    && !l.contains("date -d")
842            })
843            .collect::<Vec<_>>()
844            .join("\n")
845    }
846
    /// Builds two logs with 10 MultiMeta checkpoints, then checks the repair
    /// report for a healthy MultiLog, for one whose first log was corrupted,
    /// and for a second repair run after that.
    #[test]
    fn test_repair() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path();
        let opts = simple_open_opts();
        let mut mlog = opts.open(&path).unwrap();
        let mut logs = mlog.detach_logs();

        // Create 10 "multimeta"s. Each MultiMeta contains N entries for each log.
        // logs[0] (presumably log "a") is synced after every append while logs[1]
        // is synced once per batch, so the two logs reach disk at different
        // granularities between MultiMeta writes.
        const N: usize = 12;
        for i in 0..10u32 {
            let lock = mlog.lock().unwrap();
            for _ in 0..N {
                logs[0].append(i.to_be_bytes()).unwrap();
                logs[1].append(i.to_be_bytes()).unwrap();
                logs[0].sync().unwrap();
            }
            logs[1].sync().unwrap();
            mlog.write_meta(&lock).unwrap();
        }

        let repair = || repair_output(&opts, path);

        // Check that both logs only have a multiple of N entries. Every MultiMeta
        // above was written after a whole batch of N appends per log, so any
        // MultiMeta point the repair falls back to must cover whole batches.
        let verify = || {
            let mlog = opts.open(&path).unwrap();
            assert_eq!(mlog.logs[0].iter().count() % N, 0);
            assert_eq!(mlog.logs[1].iter().count() % N, 0);
        };

        // Valid MultiLog.
        let s1 = repair();
        assert_eq!(
            &s1,
            r#"Repairing MultiMeta Log:
  Index "reverse" passed integrity check
Repairing Log a
Log a has valid length 1212 after repair
Repairing Log b
Log b has valid length 1212 after repair
MultiMeta is valid"#
        );

        // Repair output is also written to "repair.log" file.
        let s2 = filter_repair_output(std::fs::read_to_string(path.join("repair.log")).unwrap());
        assert_eq!(&s1, s2.trim_end());

        // Put bad data in the first log. The repair should pick a recent valid
        // MultiMeta point and drop some entries.
        pwrite(&path.join("a").join("log"), 1000, b"ff");
        assert_eq!(
            repair(),
            r#"Repairing MultiMeta Log:
  Index "reverse" passed integrity check
Repairing Log a
  Reset log size to 992
Log a has valid length 992 after repair
Repairing Log b
Log b has valid length 1212 after repair
Found valid MultiMeta after 2 invalid entries: a: 972, b: 972
Invalidated indexes in log 'a'
Invalidated indexes in log 'b'
Write valid MultiMeta"#
        );
        verify();

        // Repair once more: log "a" keeps the length (992) it was truncated to above.
        assert_eq!(
            repair(),
            r#"Repairing MultiMeta Log:
  Index "reverse" passed integrity check
Repairing Log a
Log a has valid length 992 after repair
Repairing Log b
Log b has valid length 1212 after repair
Invalidated indexes in log 'a'
Invalidated indexes in log 'b'
Write valid MultiMeta"#
        );
    }
926
927    #[test]
928    fn test_repair_broken_index() {
929        // Test repair where the logs are fine but the indexes are broken.
930        let dir = tempfile::tempdir().unwrap();
931        let path = dir.path();
932        let opts = index_open_opts();
933        let mut mlog = opts.open(&path).unwrap();
934        let mut logs = mlog.detach_logs();
935
936        let repair = || repair_output(&opts, path);
937        let file_size = |path| std::fs::metadata(path).unwrap().len();
938
939        let meta_path = multi_meta_path(path);
940        let meta_log_path = multi_meta_log_path(path).join("log");
941        let index_path = path.join("a").join("index2-x");
942
943        // Write some data. Flush the "log" multiple times to cause index
944        // fragmentation so the rebuilt index would be shorter.
945        let mut meta_log_sizes = Vec::new();
946        let mut index_sizes = Vec::new();
947        for data in [b"abcd", b"abce", b"acde", b"bcde"] {
948            let lock = mlog.lock().unwrap();
949            logs[0].append(data).unwrap();
950            logs[0].sync().unwrap();
951            mlog.write_meta(&lock).unwrap();
952            meta_log_sizes.push(file_size(&meta_log_path));
953            index_sizes.push(file_size(&index_path));
954        }
955        drop(mlog);
956        drop(logs);
957
958        // Corrupt the index and the multimeta log so the repair
959        // logic would revert to a previous MultiMeta, and rebuild
960        // index. If it's not careful, MultiMeta can contain offsets
961        // to the index file that is no longer valid.
962        pwrite(&index_path, -4, b"ffff");
963        pwrite(&meta_log_path, (meta_log_sizes[1] - 5) as _, b"xxxxx");
964        std::fs::remove_file(&meta_path).unwrap();
965
966        let index_len_before = file_size(&index_path);
967        assert_eq!(
968            repair(),
969            r#"Repairing MultiMeta Log:
970  Reset log size to 111
971  Rebuilt index "reverse"
972Repairing Log a
973  Rebuilt index "x"
974Log a has valid length 52 after repair
975Invalidated indexes in log 'a'
976Write valid MultiMeta"#
977        );
978
979        // Index should be rebuilt (shorter).
980        let index_len_after = file_size(&index_path);
981        assert!(index_len_before > index_len_after);
982
983        // The MultiLog can be opened fine.
984        opts.open(path).map(|_| 1).unwrap();
985    }
986
987    #[test]
988    fn test_mixed_old_new_read_writes() {
989        let dir = tempfile::tempdir().unwrap();
990        let path = dir.path();
991
992        let mut mlog_new = simple_open_opts().open(&path).unwrap();
993        let mut logs_new = mlog_new.detach_logs();
994
995        let mut mlog_old = {
996            let mut opts = simple_open_opts();
997            opts.leacy_multimeta_source = true;
998            opts.open(&path).unwrap()
999        };
1000        let mut logs_old = mlog_old.detach_logs();
1001
1002        // Mixed writes from old and new mlogs.
1003        const N: usize = 2;
1004        for i in 0..N {
1005            for (mlog, logs, j) in [
1006                (&mut mlog_new, &mut logs_new, 0u8),
1007                (&mut mlog_old, &mut logs_old, 1u8),
1008            ] {
1009                let lock = mlog.lock().unwrap();
1010                logs[0].append(&[i as u8, j]).unwrap();
1011                logs[0].sync().unwrap();
1012                mlog.write_meta(&lock).unwrap();
1013            }
1014        }
1015
1016        // Reading the log. It should contain N * 2 entries.
1017        let mlog = simple_open_opts().open(&path).unwrap();
1018        assert_eq!(
1019            mlog.logs[0].iter().map(|e| e.unwrap()).collect::<Vec<_>>(),
1020            [[0, 0], [0, 1], [1, 0], [1, 1]],
1021        );
1022    }
1023
1024    quickcheck! {
1025        fn test_roundtrip_multimeta(name_len_list: Vec<(String, u64)>, version: (u64, u64)) -> bool {
1026            let metas = name_len_list
1027                .into_iter()
1028                .map(|(name, len)| {
1029                    let meta = LogMetadata::new_with_primary_len(len);
1030                    (name, Arc::new(Mutex::new(meta)))
1031                })
1032                .collect();
1033            let meta = MultiMeta { metas, version, ..Default::default() };
1034            let mut buf = Vec::new();
1035            meta.write(&mut buf).unwrap();
1036            let mut meta2 = MultiMeta::default();
1037            meta2.read(&buf[..]).unwrap();
1038            let mut buf2 = Vec::new();
1039            meta2.write(&mut buf2).unwrap();
1040            assert_eq!(buf2, buf);
1041            buf2 == buf
1042        }
1043
1044        fn test_roundtrip_multilog(list_a: Vec<Vec<u8>>, list_b: Vec<Vec<u8>>) -> bool {
1045            let dir = tempfile::tempdir().unwrap();
1046            let mut mlog = simple_multilog(dir.path());
1047            for a in &list_a {
1048                mlog[0].append(a).unwrap();
1049            }
1050            for b in &list_b {
1051                mlog[1].append(b).unwrap();
1052            }
1053            mlog.sync().unwrap();
1054
1055            let mlog_read = simple_multilog(dir.path());
1056            let list_a_read: Vec<Vec<u8>> = mlog_read[0].iter().map(|e| e.unwrap().to_vec()).collect();
1057            let list_b_read: Vec<Vec<u8>> = mlog_read[1].iter().map(|e| e.unwrap().to_vec()).collect();
1058
1059            list_a == list_a_read && list_b == list_b_read
1060        }
1061    }
1062}