indexedlog/log/
mod.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 *
4 * This source code is licensed under the MIT license found in the
5 * LICENSE file in the root directory of this source tree.
6 */
7
8//! Append-only storage with indexes and integrity checks.
9//!
10//! See [`Log`] for the main structure. This module also provides surrounding
11//! types needed to construct the [`Log`], including [`IndexDef`] and some
12//! iterators.
13
14// Detailed file formats:
15//
16// Primary log:
17//   LOG := HEADER + ENTRY_LIST
//   HEADER := 'indexedlog0\0'
19//   ENTRY_LIST := '' | ENTRY_LIST + ENTRY
20//   ENTRY := ENTRY_FLAGS + LEN(CONTENT) + CHECKSUM + CONTENT
21//   CHECKSUM := '' | XXHASH64(CONTENT) | XXHASH32(CONTENT)
22//
23// Metadata:
24//   META := HEADER + XXHASH64(DATA) + LEN(DATA) + DATA
25//   HEADER := 'meta\0'
26//   DATA := LEN(LOG) + LEN(INDEXES) + INDEXES
27//   INDEXES := '' | INDEXES + INDEX
28//   INDEX := LEN(NAME) + NAME + INDEX_LOGIC_LEN
29//
30// Indexes:
31//   See `index.rs`.
32//
// Integers are VLQ encoded, except for XXHASH64 and XXHASH32, which use
// LittleEndian encoding.
35
36use std::borrow::Cow;
37use std::fmt;
38use std::fmt::Debug;
39use std::fmt::Formatter;
40use std::fs;
41use std::fs::File;
42use std::io;
43use std::io::Seek;
44use std::io::SeekFrom;
45use std::io::Write;
46use std::ops::RangeBounds;
47use std::path::Path;
48use std::pin::Pin;
49use std::sync::Arc;
50
51use byteorder::ByteOrder;
52use byteorder::LittleEndian;
53use byteorder::WriteBytesExt;
54use minibytes::Bytes;
55use tracing::debug_span;
56use tracing::trace;
57use vlqencoding::VLQDecodeAt;
58use vlqencoding::VLQEncode;
59
60use crate::config;
61use crate::errors::IoResultExt;
62use crate::errors::ResultExt;
63use crate::index;
64use crate::index::Index;
65use crate::index::InsertKey;
66use crate::index::InsertValue;
67use crate::index::LeafValueIter;
68use crate::index::RangeIter;
69use crate::index::ReadonlyBuffer;
70use crate::lock::ScopedDirLock;
71use crate::lock::READER_LOCK_OPTS;
72use crate::utils;
73use crate::utils::mmap_path;
74use crate::utils::xxhash;
75use crate::utils::xxhash32;
76
77mod fold;
78mod meta;
79mod open_options;
80mod path;
81mod repair;
82#[cfg(test)]
83pub(crate) mod tests;
84
85pub use open_options::ChecksumType;
86pub use open_options::FlushFilterContext;
87pub use open_options::FlushFilterFunc;
88pub use open_options::FlushFilterOutput;
89pub use open_options::IndexDef;
90pub use open_options::IndexOutput;
91pub use open_options::OpenOptions;
92pub use path::GenericPath;
93
94pub use self::fold::Fold;
95pub use self::fold::FoldDef;
96use self::fold::FoldState;
97pub use self::meta::LogMetadata;
98
// Constants about file names
pub(crate) const PRIMARY_FILE: &str = "log";
const PRIMARY_HEADER: &[u8] = b"indexedlog0\0";
const PRIMARY_START_OFFSET: u64 = 12; // PRIMARY_HEADER.len() as u64;
pub(crate) const META_FILE: &str = "meta";

// Per-entry flag bits. `Log::append` sets exactly one of these, recording
// which checksum algorithm guards the entry's content.
const ENTRY_FLAG_HAS_XXHASH64: u32 = 1;
const ENTRY_FLAG_HAS_XXHASH32: u32 = 2;

// 1MB (2^20 bytes) index checksum chunk. This makes the checksum file fit
// within one block (4KB) for a 512MB index.
const INDEX_CHECKSUM_CHUNK_SIZE_LOGARITHM: u32 = 20;
110
/// An append-only storage with indexes and integrity checks.
///
/// The [`Log`] is backed by a directory in the filesystem. The
/// directory includes:
///
/// - An append-only "log" file. It can be seen as a serialization
///   result of an append-only list of byte slices. Each byte slice
///   has a checksum.
/// - Multiple user-defined indexes. Each index has an append-only
///   on-disk radix-tree representation and a small, separate,
///   non-append-only checksum file.
/// - A small "metadata" file which records the logic lengths (in bytes)
///   for the log and index files.
///
/// Reading is lock-free because the log and indexes are append-only.
/// Writes are buffered in memory. Flushing in-memory parts to
/// disk requires taking a flock on the directory.
pub struct Log {
    // Directory backing this Log (or an in-memory-only placeholder).
    pub dir: GenericPath,
    // Content of the on-disk primary log, `meta.primary_len` bytes long
    // (presumably mmap-backed via `utils::mmap_path` — see `sync`'s
    // `load_log_and_indexes` call).
    pub(crate) disk_buf: Bytes,
    // Dirty (not-yet-flushed) entries. Pinned because `ExternalKeyBuffer`
    // keeps a raw pointer into it (see the UNSAFE NOTICE below).
    pub(crate) mem_buf: Pin<Box<Vec<u8>>>,
    // Snapshot of the "meta" file: logic lengths of the log and indexes.
    pub(crate) meta: LogMetadata,
    indexes: Vec<Index>,
    // On-demand caches of the folds defined by open_options.
    // disk_folds only includes clean (on-disk) entries.
    // all_folds includes both clean (on-disk) and dirty (in-memory) entries.
    disk_folds: Vec<FoldState>,
    all_folds: Vec<FoldState>,
    // Whether the index and the log is out-of-sync. In which case, index-based reads (lookups)
    // should return errors because it can no longer be trusted.
    // This could be improved to be per index. For now, it's a single state for simplicity. It's
    // probably fine considering index corruptions are rare.
    index_corrupted: bool,
    open_options: OpenOptions,
    // Indicate an active reader. Destructive writes (repair) are unsafe.
    reader_lock: Option<ScopedDirLock>,
}
148
/// Iterator over all entries in a [`Log`].
pub struct LogIter<'a> {
    // Logical offset of the next entry to read. In-memory entries live at
    // offsets >= `meta.primary_len` (see how `Log::append` computes offsets).
    next_offset: u64,
    // Set after the first error so iteration stops instead of repeating it.
    errored: bool,
    log: &'a Log,
}
155
/// Iterator over [`Log`] entries selected by an index lookup.
///
/// It is a wrapper around [index::LeafValueIter].
pub struct LogLookupIter<'a> {
    // Underlying index leaf-value iterator producing entry offsets.
    inner_iter: LeafValueIter<'a>,
    // Set after the first error so iteration stops instead of repeating it.
    errored: bool,
    log: &'a Log,
}
164
/// Iterator over keys and [`LogLookupIter`], filtered by an index prefix.
///
/// It is a wrapper around [index::RangeIter].
pub struct LogRangeIter<'a> {
    // Underlying index range iterator.
    inner_iter: RangeIter<'a>,
    // Set after the first error so iteration stops instead of repeating it.
    errored: bool,
    log: &'a Log,
    // The index this range query runs against.
    index: &'a Index,
}
174
/// Satisfy [`index::ReadonlyBuffer`] trait so [`Log`] can use external
/// keys on [`Index`] for in-memory-only entries.
struct ExternalKeyBuffer {
    // Content of the on-disk primary log.
    disk_buf: Bytes,
    // Logical length of the on-disk log (`meta.primary_len` at construction
    // time; see `Log::try_clone_internal`). In-memory entries start at
    // offsets >= this value (see how `Log::append` computes offsets).
    disk_len: u64,

    // Prove the pointer is valid:
    // 1. If ExternalKeyBuffer is alive, then the Index owning it is alive.
    //    This is because ExternalKeyBuffer is private to Index, and there
    //    is no way to get a clone of ExternalKeyBuffer without also
    //    cloning its owner (Index).
    // 2. If the Index owning ExternalKeyBuffer is alive, then the Log
    //    owning the Index is alive. Similarly, Index is private to Log,
    //    and there is no way to just clone the Index without cloning
    //    its owner (Log).
    // 3. If Log is alive, then Log.mem_buf is alive.
    // 4. Log.mem_buf is pinned, so this pointer is valid.
    //
    // Here is why `Arc<Mutex<Vec<u8>>>` is not feasible:
    //
    // - Bad performance: The Mutex overhead is visible.
    //   "log insertion (no checksum)" takes 2x time.
    //   "log insertion" and "log iteration (memory)" take 1.5x time.
    // - Unsafe Rust is still necessary.
    //   In [`Log::read_entry`], reading the in-memory entry case,
    //   the borrow relationship changes from `&Log -> &[u8]` to
    //   `&Log -> &MutexGuard -> &[u8]`, which means unsafe Rust is
    //   needed, or it has to take the mutex lock. Neither desirable.
    //
    // Here is why a normal lifetime is not feasible:
    // - A normal lifetime will enforce the `mem_buf` to be read-only.
    //   But Log needs to write to it.
    //
    // Note: Rust reference cannot be used here, because a reference implies
    // LLVM "noalias", which is not true since Log can change Log.mem_buf.
    //
    // (UNSAFE NOTICE)
    mem_buf: *const Vec<u8>,
}
214
// SAFETY: mem_buf can be read from multiple threads at the same time if no
// thread is changing the actual mem_buf. If there is a thread changing mem_buf
// by calling Log::append(&mut self, ...), then the compiler should make sure
// Log methods taking &self are not called at the same time.
unsafe impl Send for ExternalKeyBuffer {}
unsafe impl Sync for ExternalKeyBuffer {}
221
222// Some design notes:
223// - Public APIs do not expose internal offsets of entries. This avoids issues when an in-memory
224//   entry gets moved after `flush`.
225// - The only write-to-disk operation is `flush`, aside from creating an empty `Log`. This makes it
226//   easier to verify correctness - just make sure `flush` is properly handled (ex. by locking).
227
228impl Log {
229    /// Construct [`Log`] at given directory. Incrementally build up specified
230    /// indexes.
231    ///
232    /// Create the [`Log`] if it does not exist.
233    ///
234    /// See [`OpenOptions::open`] for details.
235    pub fn open<P: AsRef<Path>>(dir: P, index_defs: Vec<IndexDef>) -> crate::Result<Self> {
236        OpenOptions::new()
237            .index_defs(index_defs)
238            .create(true)
239            .open(dir.as_ref())
240    }
241
    /// Append an entry in-memory. Update related indexes in-memory.
    ///
    /// The memory part is not shared. Therefore other [`Log`] instances won't see
    /// the change immediately.
    ///
    /// To write in-memory entries and indexes to disk, call [`Log::sync`].
    pub fn append<T: AsRef<[u8]>>(&mut self, data: T) -> crate::Result<()> {
        let result: crate::Result<_> = (|| {
            let data = data.as_ref();

            // Pick a concrete checksum algorithm when the caller asked for Auto.
            let checksum_type = if self.open_options.checksum_type == ChecksumType::Auto {
                // xxhash64 is slower for smaller data. A quick benchmark on x64 platform shows:
                //
                // bytes  xxhash32  xxhash64 (MB/s)
                //   32       1882      1600
                //   40       1739      1538
                //   48       2285      1846
                //   56       2153      2000
                //   64       2666      2782
                //   72       2400      2322
                //   80       2962      2758
                //   88       2750      2750
                //   96       3200      3692
                //  104       2810      3058
                //  112       3393      3500
                //  120       3000      3428
                //  128       3459      4266
                const XXHASH64_THRESHOLD: usize = 88;
                if data.len() >= XXHASH64_THRESHOLD {
                    ChecksumType::Xxhash64
                } else {
                    ChecksumType::Xxhash32
                }
            } else {
                self.open_options.checksum_type
            };

            // Logical offset of the start of this entry: on-disk bytes plus
            // dirty bytes buffered so far.
            let offset = self.meta.primary_len + self.mem_buf.len() as u64;

            // Design note: Currently checksum_type is the only thing that decides
            // entry_flags.  Entry flags is not designed to just cover different
            // checksum types.  For example, if we'd like to introduce transparent
            // compression (maybe not a good idea since it can be more cleanly built
            // at an upper layer), or some other ways to store data (ex. reference
            // to other data, or fixed length data), they can probably be done by
            // extending the entry type.
            let mut entry_flags = 0;
            entry_flags |= match checksum_type {
                ChecksumType::Xxhash64 => ENTRY_FLAG_HAS_XXHASH64,
                ChecksumType::Xxhash32 => ENTRY_FLAG_HAS_XXHASH32,
                ChecksumType::Auto => unreachable!(),
            };

            // Write ENTRY := ENTRY_FLAGS + LEN(CONTENT) + CHECKSUM + CONTENT
            // (see the format description at the top of this file).
            self.mem_buf.write_vlq(entry_flags).infallible()?;
            self.mem_buf.write_vlq(data.len()).infallible()?;

            match checksum_type {
                ChecksumType::Xxhash64 => {
                    self.mem_buf
                        .write_u64::<LittleEndian>(xxhash(data))
                        .infallible()?;
                }
                ChecksumType::Xxhash32 => {
                    self.mem_buf
                        .write_u32::<LittleEndian>(xxhash32(data))
                        .infallible()?;
                }
                ChecksumType::Auto => unreachable!(),
            };
            // Logical offset where CONTENT starts (past flags, length, checksum).
            let data_offset = self.meta.primary_len + self.mem_buf.len() as u64;

            self.mem_buf.write_all(data).infallible()?;
            self.update_indexes_for_in_memory_entry(data, offset, data_offset)?;
            self.update_fold_for_in_memory_entry(data, offset, data_offset)?;

            // Optionally flush once enough dirty bytes have accumulated.
            if let Some(threshold) = self.open_options.auto_sync_threshold {
                if self.mem_buf.len() as u64 >= threshold {
                    self.sync()
                        .context("sync triggered by auto_sync_threshold")?;
                }
            }

            Ok(())
        })();

        result
            .context(|| {
                let data = data.as_ref();
                if data.len() < 128 {
                    format!("in Log::append({:?})", data)
                } else {
                    format!("in Log::append(<a {}-byte long slice>)", data.len())
                }
            })
            .context(|| format!("  Log.dir = {:?}", self.dir))
    }
338
339    /// Remove dirty (in-memory) state. Restore the [`Log`] to the state as
340    /// if it's just loaded from disk without modifications.
341    pub fn clear_dirty(&mut self) -> crate::Result<()> {
342        let result: crate::Result<_> = (|| {
343            self.maybe_return_index_error()?;
344            for index in self.indexes.iter_mut() {
345                index.clear_dirty();
346            }
347            self.mem_buf.clear();
348            self.all_folds = self.disk_folds.clone();
349            self.update_indexes_for_on_disk_entries()?;
350            Ok(())
351        })();
352        result
353            .context("in Log::clear_dirty")
354            .context(|| format!("  Log.dir = {:?}", self.dir))
355    }
356
357    /// Return a cloned [`Log`] with pending in-memory changes.
358    pub fn try_clone(&self) -> crate::Result<Self> {
359        self.try_clone_internal(true)
360            .context("in Log:try_clone")
361            .context(|| format!("  Log.dir = {:?}", self.dir))
362    }
363
364    /// Return a cloned [`Log`] without pending in-memory changes.
365    ///
366    /// This is logically equivalent to calling `clear_dirty` immediately
367    /// on the result after `try_clone`, but potentially cheaper.
368    pub fn try_clone_without_dirty(&self) -> crate::Result<Self> {
369        self.try_clone_internal(false)
370            .context("in Log:try_clone_without_dirty")
371    }
372
    /// Shared implementation of `try_clone` and `try_clone_without_dirty`.
    ///
    /// When `copy_dirty` is true, in-memory (dirty) entries and folds are
    /// carried over to the clone; otherwise the clone starts clean.
    fn try_clone_internal(&self, copy_dirty: bool) -> crate::Result<Self> {
        self.maybe_return_index_error()?;

        // Prepare cloned versions of things.
        let mut indexes = self
            .indexes
            .iter()
            .map(|i| i.try_clone_internal(copy_dirty))
            .collect::<Result<Vec<Index>, _>>()?;
        let disk_buf = self.disk_buf.clone();
        let mem_buf = if copy_dirty {
            self.mem_buf.clone()
        } else {
            Box::pin(Vec::new())
        };

        {
            // Update external key buffer of indexes to point to the new mem_buf.
            // The cloned indexes would otherwise keep a pointer to `self`'s
            // buffer (see the UNSAFE NOTICE on `ExternalKeyBuffer`).
            let mem_buf: &Vec<u8> = &mem_buf;
            let mem_buf: *const Vec<u8> = mem_buf as *const Vec<u8>;
            let index_key_buf = Arc::new(ExternalKeyBuffer {
                disk_buf: disk_buf.clone(),
                disk_len: self.meta.primary_len,
                mem_buf,
            });
            for index in indexes.iter_mut() {
                index.key_buf = index_key_buf.clone();
            }
        }

        // On-disk Logs take a reader lock (see the `reader_lock` field).
        let reader_lock = match self.dir.as_opt_path() {
            Some(d) => Some(ScopedDirLock::new_with_options(d, &READER_LOCK_OPTS)?),
            None => None,
        };

        // Create the new Log.
        let mut log = Log {
            dir: self.dir.clone(),
            disk_buf,
            mem_buf,
            meta: self.meta.clone(),
            indexes,
            disk_folds: self.disk_folds.clone(),
            all_folds: if copy_dirty {
                &self.all_folds
            } else {
                &self.disk_folds
            }
            .clone(),
            index_corrupted: false,
            open_options: self.open_options.clone(),
            reader_lock,
        };

        if !copy_dirty {
            // The indexes can be lagging. Update them.
            // This is similar to what clear_dirty does.
            log.update_indexes_for_on_disk_entries()?;
        }

        Ok(log)
    }
435
    /// Load the latest data from disk. Write in-memory entries to disk.
    ///
    /// After writing, update on-disk indexes. These happen in a same critical
    /// section, protected by a lock on the directory.
    ///
    /// Even if [`Log::append`] is never called, this function has a side effect
    /// updating the [`Log`] to contain latest entries on disk.
    ///
    /// Other [`Log`] instances living in a same process or other processes won't
    /// be notified about the change and they can only access the data
    /// "snapshotted" at open time.
    ///
    /// Return the size of the updated primary log file in bytes.
    ///
    /// For in-memory-only Logs, this function does nothing, and returns 0.
    pub fn sync(&mut self) -> crate::Result<u64> {
        let result: crate::Result<_> = (|| {
            let span = debug_span!("Log::sync", dirty_bytes = self.mem_buf.len());
            if let Some(dir) = &self.dir.as_opt_path() {
                span.record("dir", &dir.to_string_lossy().as_ref());
            }
            let _guard = span.enter();

            if self.dir.as_opt_path().is_none() {
                // See Index::flush for why this is not an Err.
                return Ok(0);
            }

            // The on-disk log must never shrink (append-only). A shrink not
            // explained by an epoch change is rejected as an error.
            fn check_append_only(this: &Log, new_meta: &LogMetadata) -> crate::Result<()> {
                let old_meta = &this.meta;
                if old_meta.primary_len > new_meta.primary_len {
                    Err(crate::Error::path(
                        this.dir.as_opt_path().unwrap(),
                        format!(
                            "on-disk log is unexpectedly smaller ({} bytes) than its previous version ({} bytes)",
                            new_meta.primary_len, old_meta.primary_len
                        ),
                    ))
                } else {
                    Ok(())
                }
            }

            // Read-only fast path - no need to take directory lock.
            if self.mem_buf.is_empty() {
                if let Ok(meta) = Self::load_or_create_meta(&self.dir, false) {
                    let changed = self.meta != meta;
                    let truncated = self.meta.epoch != meta.epoch;
                    if !truncated {
                        check_append_only(self, &meta)?;
                    }
                    // No need to reload anything if metadata hasn't changed.
                    if changed {
                        // Indexes cannot be reused, if epoch has changed. Otherwise,
                        // Indexes can be reused, since they do not have new in-memory
                        // entries, and the on-disk primary log is append-only (so data
                        // already present in the indexes is valid).
                        *self = self.open_options.clone().open_internal(
                            &self.dir,
                            if truncated { None } else { Some(&self.indexes) },
                            None,
                        )?;
                    }
                } else {
                    // If meta can not be read, do not error out.
                    // This Log can still be used to answer queries.
                    //
                    // This behavior makes Log more friendly for cases where an
                    // external process does a `rm -rf` and the current process
                    // does a `sync()` just for loading new data. Not erroring
                    // out and pretending that nothing happened.
                }
                return Ok(self.meta.primary_len);
            }

            // Take the lock so no other `flush` runs for this directory. Then reload meta, append
            // log, then update indexes.
            let dir = self.dir.as_opt_path().unwrap().to_path_buf();
            let lock = ScopedDirLock::new(&dir)?;

            // Step 1: Reload metadata to get the latest view of the files.
            let mut meta = Self::load_or_create_meta(&self.dir, false)?;
            let changed = self.meta != meta;
            let truncated = self.meta.epoch != meta.epoch;
            if !truncated {
                check_append_only(self, &meta)?;
            }

            // Cases where Log and Indexes need to be reloaded.
            if changed && self.open_options.flush_filter.is_some() {
                let filter = self.open_options.flush_filter.unwrap();

                // Start with a clean log that does not have dirty entries.
                let mut log = self
                    .open_options
                    .clone()
                    .open_with_lock(&self.dir, &lock)
                    .context("re-open to run flush_filter")?;

                for entry in self.iter_dirty() {
                    let content = entry?;
                    let context = FlushFilterContext { log: &log };
                    // Re-insert entries to that clean log.
                    match filter(&context, content)
                        .map_err(|err| crate::Error::wrap(err, "failed to run filter function"))?
                    {
                        FlushFilterOutput::Drop => {}
                        FlushFilterOutput::Keep => log.append(content)?,
                        FlushFilterOutput::Replace(content) => log.append(content)?,
                    }
                }

                // Replace "self" so we can continue flushing the updated data.
                *self = log;
            } else if truncated {
                // Reload log and indexes, and re-insert entries.
                let mut log = self
                    .open_options
                    .clone()
                    .open_with_lock(&self.dir, &lock)
                    .context(|| {
                        format!(
                            "re-open since epoch has changed ({} to {})",
                            self.meta.epoch, meta.epoch
                        )
                    })?;

                for entry in self.iter_dirty() {
                    let content = entry?;
                    log.append(content)?;
                }

                // Replace "self" so we can continue flushing the updated data.
                *self = log;
            }

            // Step 2: Append to the primary log.
            let primary_path = self.dir.as_opt_path().unwrap().join(PRIMARY_FILE);
            let mut primary_file = fs::OpenOptions::new()
                .read(true)
                .write(true)
                .open(&primary_path)
                .context(&primary_path, "cannot open for read-write")?;

            // It's possible that the previous write was interrupted. In that case,
            // the length of "log" can be longer than the length of "log" stored in
            // the metadata. Seek to the length specified by the metadata and
            // overwrite (broken) data.
            // This breaks the "append-only" property of the physical file. But all
            // readers use "meta" to decide the length of "log". So "log" is still
            // append-only as seen by readers, as long as the length specified in
            // "meta" is append-only (i.e. "meta" is not rewritten to have a smaller
            // length, and all bytes in the specified length are immutable).
            // Note: file.set_len might easily fail on Windows due to mmap.
            let pos = primary_file
                .seek(SeekFrom::Start(meta.primary_len))
                .context(&primary_path, || {
                    format!("cannot seek to {}", meta.primary_len)
                })?;
            if pos != meta.primary_len {
                let msg = format!(
                    "log file {} has {} bytes, expect at least {} bytes",
                    primary_path.to_string_lossy(),
                    pos,
                    meta.primary_len
                );
                // This might be another process re-creating the file.
                // Do not consider this as a corruption (?).
                // TODO: Review this decision.
                let err = crate::Error::path(&primary_path, msg);
                return Err(err);
            }

            // Actually write the primary log. Once it's written, we can remove the in-memory buffer.
            primary_file
                .write_all(&self.mem_buf)
                .context(&primary_path, || {
                    format!("cannot write data ({} bytes)", self.mem_buf.len())
                })?;

            if self.open_options.fsync || config::get_global_fsync() {
                primary_file
                    .sync_all()
                    .context(&primary_path, "cannot fsync")?;
            }

            meta.primary_len += self.mem_buf.len() as u64;
            self.mem_buf.clear();

            // Step 3: Reload primary log and indexes to get the latest view.
            let (disk_buf, indexes) = Self::load_log_and_indexes(
                &self.dir,
                &meta,
                &self.open_options.index_defs,
                &self.mem_buf,
                if changed {
                    // Existing indexes cannot be reused.
                    None
                } else {
                    // Indexes can be reused, because they already contain all entries
                    // that were just written to disk and the on-disk files do not
                    // have new entries (tested by "self.meta != meta" in Step 1).
                    //
                    // The indexes contain all entries, because they were previously
                    // "always-up-to-date", and the on-disk log does not have anything new.
                    // Update "meta" so "update_indexes_for_on_disk_entries" below won't
                    // re-index entries.
                    //
                    // This is needed because `Log::append` updated indexes in-memory but
                    // did not update their metadata for performance. This is to update
                    // the metadata stored in Indexes.
                    Self::set_index_log_len(self.indexes.iter_mut(), meta.primary_len);
                    Some(&self.indexes)
                },
                self.open_options.fsync,
            )?;

            self.disk_buf = disk_buf;
            self.indexes = indexes;
            self.meta = meta;

            // Step 4: Update the indexes and folds. Optionally flush them.
            self.update_indexes_for_on_disk_entries()?;
            let lagging_index_ids = self.lagging_index_ids();
            self.flush_lagging_indexes(&lagging_index_ids, &lock)?;
            self.update_and_flush_disk_folds()?;
            self.all_folds = self.disk_folds.clone();

            // Step 5: Write the updated meta file.
            self.dir.write_meta(&self.meta, self.open_options.fsync)?;

            Ok(self.meta.primary_len)
        })();

        result
            .context("in Log::sync")
            .context(|| format!("  Log.dir = {:?}", self.dir))
    }
674
675    /// Write (updated) lagging indexes back to disk.
676    /// Usually called after `update_indexes_for_on_disk_entries`.
677    /// This function might change `self.meta`. Be sure to write `self.meta` to
678    /// save the result.
679    pub(crate) fn flush_lagging_indexes(
680        &mut self,
681        index_ids: &[usize],
682        _lock: &ScopedDirLock,
683    ) -> crate::Result<()> {
684        for &index_id in index_ids.iter() {
685            let metaname = self.open_options.index_defs[index_id].metaname();
686            let new_length = self.indexes[index_id].flush();
687            let new_length = self.maybe_set_index_error(new_length.map_err(Into::into))?;
688            self.meta.indexes.insert(metaname, new_length);
689            trace!(
690                name = "Log::flush_lagging_index",
691                index_name = self.open_options.index_defs[index_id].name.as_str(),
692                new_index_length = new_length,
693            );
694        }
695        Ok(())
696    }
697
698    /// Return the index to indexes that are considered lagging.
699    /// This is usually followed by `update_indexes_for_on_disk_entries`.
700    pub(crate) fn lagging_index_ids(&self) -> Vec<usize> {
701        let log_bytes = self.meta.primary_len;
702        self.open_options
703            .index_defs
704            .iter()
705            .enumerate()
706            .filter(|(i, def)| {
707                let indexed_bytes = Self::get_index_log_len(&self.indexes[*i], false).unwrap_or(0);
708                let lag_bytes = log_bytes.max(indexed_bytes) - indexed_bytes;
709                let lag_threshold = def.lag_threshold;
710                trace!(
711                    name = "Log::is_index_lagging",
712                    index_name = def.name.as_str(),
713                    lag = lag_bytes,
714                    threshold = lag_threshold
715                );
716                lag_bytes > lag_threshold
717            })
718            .map(|(i, _def)| i)
719            .collect()
720    }
721
722    /// Check if the log is changed on disk.
723    pub fn is_changed(&self) -> bool {
724        match self.dir.read_meta() {
725            Ok(meta) => meta != self.meta,
726            Err(_) => true,
727        }
728    }
729
    /// Renamed. Use [`Log::sync`] instead.
    ///
    /// Kept as a thin alias for backwards compatibility.
    pub fn flush(&mut self) -> crate::Result<u64> {
        self.sync()
    }
734
    /// Convert a slice to [`Bytes`].
    /// Do not copy the slice if it's from the main on-disk buffer.
    /// (Slices from outside `disk_buf` presumably get copied — see
    /// `Bytes::slice_to_bytes`; confirm against the `minibytes` crate.)
    pub fn slice_to_bytes(&self, slice: &[u8]) -> Bytes {
        self.disk_buf.slice_to_bytes(slice)
    }
740
    /// Convert a slice to [`Bytes`].
    /// Do not copy the slice if it's from the specified index buffer.
    ///
    /// `index_id` is a position in `self.indexes`; panics if out of range.
    pub fn index_slice_to_bytes(&self, index_id: usize, slice: &[u8]) -> Bytes {
        self.indexes[index_id].slice_to_bytes(slice)
    }
746
    /// Make sure on-disk indexes are up-to-date with the primary log, regardless
    /// of `lag_threshold`.
    ///
    /// This is used internally by [`RotateLog`] to make sure a [`Log`] has
    /// complete indexes before rotating.
    ///
    /// `_lock` proves the caller already holds the outer (RotateLog-level)
    /// directory lock; an inner per-Log lock is still taken below.
    /// In-memory Logs (no directory) are a no-op.
    pub(crate) fn finalize_indexes(&mut self, _lock: &ScopedDirLock) -> crate::Result<()> {
        let result: crate::Result<_> = (|| {
            let dir = self.dir.clone();
            if let Some(dir) = dir.as_opt_path() {
                // Indexes can only be flushed up to the on-disk primary log.
                // Unsynced in-memory entries would be left unindexed.
                if !self.mem_buf.is_empty() {
                    return Err(crate::Error::programming(
                        "sync() should be called before finalize_indexes()",
                    ));
                }

                // Take the per-Log directory lock for the meta read/write below.
                let _lock = ScopedDirLock::new(&dir)?;

                let meta = Self::load_or_create_meta(&self.dir, false)?;
                // Only check primary_len, not meta.indexes. This is because
                // meta.indexes can be updated on open. See D38261693 (test)
                // and D20042046 (update index on open).
                //
                // More details:
                // For RotateLog it has 2 levels of directories and locks, like:
                // - rotate/lock: lock when RotateLog is writing
                // - rotate/0/: Previous (considered by RotateLog as read-only) Log
                // - rotate/1/: Previous (considered by RotateLog as read-only) Log
                // - rotate/2/lock: lock when this Log is being written
                // - rotate/2/: "Current" (writable) Log
                //
                // However, when opening rotate/0 as a Log, it might change the indexes
                // without being noticed by other RotateLogs. If the indexes are updated,
                // then the meta would be changed. The primary len is not changed, though.
                if self.meta.primary_len != meta.primary_len || self.meta.epoch != meta.epoch {
                    return Err(crate::Error::programming(format!(
                        "race detected, callsite responsible for preventing races (old meta: {:?}, new meta: {:?})",
                        &self.meta, &meta
                    )));
                }
                self.meta = meta;

                // Flush all indexes.
                for i in 0..self.indexes.len() {
                    let new_length = self.indexes[i].flush();
                    // Record a failed flush in `index_corrupted` before bailing out.
                    let new_length = self.maybe_set_index_error(new_length.map_err(Into::into))?;
                    let name = self.open_options.index_defs[i].metaname();
                    self.meta.indexes.insert(name, new_length);
                }

                // Publish the new index lengths to readers.
                self.dir.write_meta(&self.meta, self.open_options.fsync)?;
            }
            Ok(())
        })();
        result
            .context("in Log::finalize_indexes")
            .context(|| format!("  Log.dir = {:?}", self.dir))
    }
804
805    /// Rebuild indexes.
806    ///
807    /// If `force` is `false`, then indexes that pass the checksum check
808    /// will not be rebuilt. Otherwise, they will be rebuilt regardless.
809    ///
810    /// Setting `force` to `true` might reduce the size used by the index
811    /// files. But that is more expensive.
812    ///
813    /// The function consumes the [`Log`] object, since it is hard to recover
814    /// from an error case.
815    ///
816    /// Return message useful for human consumption.
817    pub fn rebuild_indexes(self, force: bool) -> crate::Result<String> {
818        let dir = self.dir.clone();
819        let result: crate::Result<_> = (|this: Log| {
820            if let Some(dir) = this.dir.clone().as_opt_path() {
821                let lock = ScopedDirLock::new(&dir)?;
822                this.rebuild_indexes_with_lock(force, &lock)
823            } else {
824                Ok(String::new())
825            }
826        })(self);
827
828        result
829            .context(|| format!("in Log::rebuild_indexes(force={})", force))
830            .context(|| format!("  Log.dir = {:?}", dir))
831    }
832
    /// Rebuild indexes while holding the directory lock (see
    /// [`Log::rebuild_indexes`] for the public entry point and the meaning of
    /// `force`). For each index: decide whether it can be skipped, rebuild it
    /// into a tempfile, then atomically swap it in with two meta writes so
    /// readers never observe an index length inconsistent with its data.
    fn rebuild_indexes_with_lock(
        mut self,
        force: bool,
        _lock: &ScopedDirLock,
    ) -> crate::Result<String> {
        let mut message = String::new();
        {
            // In-memory logs have no index files; nothing to do.
            if let Some(ref dir) = self.dir.as_opt_path() {
                for (i, def) in self.open_options.index_defs.iter().enumerate() {
                    let name = def.name.as_str();

                    if let Some(index) = &self.indexes.get(i) {
                        // Skip only when not forced and the index both covers a
                        // valid range of the log and passes verification.
                        let should_skip = if force {
                            false
                        } else {
                            match Self::get_index_log_len(index, true) {
                                Err(_) => false,
                                Ok(len) => {
                                    if len > self.meta.primary_len {
                                        // Index claims to cover more log than exists
                                        // (log was truncated); must rebuild.
                                        message += &format!(
                                            "Index {:?} is incompatible with (truncated) log\n",
                                            name
                                        );
                                        false
                                    } else if index.verify().is_ok() {
                                        message +=
                                            &format!("Index {:?} passed integrity check\n", name);
                                        true
                                    } else {
                                        message +=
                                            &format!("Index {:?} failed integrity check\n", name);
                                        false
                                    }
                                }
                            }
                        };
                        if should_skip {
                            continue;
                        } else {
                            // Replace the index with a dummy, empty one.
                            //
                            // This will munmap index files, which is required on
                            // Windows to rewrite the index files. It's also the reason
                            // why it's hard to recover from an error state.
                            //
                            // This is also why this function consumes the Log object.
                            self.indexes[i] = index::OpenOptions::new().create_in_memory()?;
                        }
                    }

                    // Build the replacement index in a tempfile in the same
                    // directory, so the final rename below stays on one filesystem.
                    let tmp = tempfile::NamedTempFile::new_in(dir).context(&dir, || {
                        format!("cannot create tempfile for rebuilding index {:?}", name)
                    })?;
                    let index_len = {
                        let mut index = index::OpenOptions::new()
                            .key_buf(Some(Arc::new(self.disk_buf.clone())))
                            .open(&tmp.path())?;
                        Self::update_index_for_on_disk_entry_unchecked(
                            &self.dir,
                            &mut index,
                            def,
                            &self.disk_buf,
                            self.meta.primary_len,
                        )?;
                        index.flush()?
                    };

                    // Before replacing the index, set its "logic length" to 0 so
                    // readers won't get inconsistent view about index length and data.
                    let meta_path = dir.join(META_FILE);
                    self.meta.indexes.insert(def.metaname(), 0);
                    self.meta
                        .write_file(&meta_path, self.open_options.fsync)
                        .context(|| format!("  before replacing index {:?})", name))?;

                    // Best-effort permission fixup; failure is non-fatal.
                    let _ = utils::fix_perm_file(tmp.as_file(), false);

                    let path = dir.join(def.filename());
                    tmp.persist(&path).map_err(|e| {
                        crate::Error::wrap(Box::new(e), || {
                            format!("cannot persist tempfile to replace index {:?}", name)
                        })
                    })?;

                    // Restore the real logic length now that the new file is in place.
                    self.meta.indexes.insert(def.metaname(), index_len);
                    self.meta
                        .write_file(&meta_path, self.open_options.fsync)
                        .context(|| format!("  after replacing index {:?}", name))?;
                    message += &format!("Rebuilt index {:?}\n", name);
                }
            }
        }

        Ok(message)
    }
928
929    /// Look up an entry using the given index. The `index_id` is the index of
930    /// `index_defs` passed to [`Log::open`].
931    ///
932    /// Return an iterator of `Result<&[u8]>`, in reverse insertion order.
933    pub fn lookup<K: AsRef<[u8]>>(&self, index_id: usize, key: K) -> crate::Result<LogLookupIter> {
934        let result: crate::Result<_> = (|| {
935            self.maybe_return_index_error()?;
936            if let Some(index) = self.indexes.get(index_id) {
937                assert!(!key.as_ref().is_empty());
938                let link_offset = index.get(&key)?;
939                let inner_iter = link_offset.values(index);
940                Ok(LogLookupIter {
941                    inner_iter,
942                    errored: false,
943                    log: self,
944                })
945            } else {
946                let msg = format!(
947                    "invalid index_id {} (len={}, path={:?})",
948                    index_id,
949                    self.indexes.len(),
950                    &self.dir
951                );
952                Err(crate::Error::programming(msg))
953            }
954        })();
955        result
956            .context(|| format!("in Log::lookup({}, {:?})", index_id, key.as_ref()))
957            .context(|| format!("  Log.dir = {:?}", self.dir))
958    }
959
960    /// Look up keys and entries using the given prefix.
961    /// The `index_id` is the index of `index_defs` passed to [`Log::open`].
962    ///
963    /// Return an iterator that yields `(key, iter)`, where `key` is the full
964    /// key, `iter` is [`LogLookupIter`] that allows iteration through matched
965    /// entries.
966    pub fn lookup_prefix<K: AsRef<[u8]>>(
967        &self,
968        index_id: usize,
969        prefix: K,
970    ) -> crate::Result<LogRangeIter> {
971        let prefix = prefix.as_ref();
972        let result: crate::Result<_> = (|| {
973            let index = self.indexes.get(index_id).unwrap();
974            let inner_iter = index.scan_prefix(prefix)?;
975            Ok(LogRangeIter {
976                inner_iter,
977                errored: false,
978                log: self,
979                index,
980            })
981        })();
982        result
983            .context(|| format!("in Log::lookup_prefix({}, {:?})", index_id, prefix))
984            .context(|| format!("  Log.dir = {:?}", self.dir))
985    }
986
987    /// Look up keys and entries by querying a specified index about a specified
988    /// range.
989    ///
990    /// The `index_id` is the index of `index_defs` defined by [`OpenOptions`].
991    ///
992    /// Return an iterator that yields `(key, iter)`, where `key` is the full
993    /// key, `iter` is [`LogLookupIter`] that allows iteration through entries
994    /// matching that key.
995    pub fn lookup_range<'a>(
996        &self,
997        index_id: usize,
998        range: impl RangeBounds<&'a [u8]>,
999    ) -> crate::Result<LogRangeIter> {
1000        let start = range.start_bound();
1001        let end = range.end_bound();
1002        let result: crate::Result<_> = (|| {
1003            let index = self.indexes.get(index_id).unwrap();
1004            let inner_iter = index.range((start, end))?;
1005            Ok(LogRangeIter {
1006                inner_iter,
1007                errored: false,
1008                log: self,
1009                index,
1010            })
1011        })();
1012        result
1013            .context(|| {
1014                format!(
1015                    "in Log::lookup_range({}, {:?} to {:?})",
1016                    index_id, start, end,
1017                )
1018            })
1019            .context(|| format!("  Log.dir = {:?}", self.dir))
1020    }
1021
1022    /// Look up keys and entries using the given hex prefix.
1023    /// The length of the hex string can be odd.
1024    ///
1025    /// Return an iterator that yields `(key, iter)`, where `key` is the full
1026    /// key, `iter` is [`LogLookupIter`] that allows iteration through matched
1027    /// entries.
1028    pub fn lookup_prefix_hex<K: AsRef<[u8]>>(
1029        &self,
1030        index_id: usize,
1031        hex_prefix: K,
1032    ) -> crate::Result<LogRangeIter> {
1033        let prefix = hex_prefix.as_ref();
1034        let result: crate::Result<_> = (|| {
1035            let index = self.indexes.get(index_id).unwrap();
1036            let inner_iter = index.scan_prefix_hex(prefix)?;
1037            Ok(LogRangeIter {
1038                inner_iter,
1039                errored: false,
1040                log: self,
1041                index,
1042            })
1043        })();
1044        result
1045            .context(|| format!("in Log::lookup_prefix_hex({}, {:?})", index_id, prefix))
1046            .context(|| format!("  Log.dir = {:?}", self.dir))
1047    }
1048
1049    /// Return an iterator for all entries.
1050    pub fn iter(&self) -> LogIter {
1051        LogIter {
1052            log: self,
1053            next_offset: PRIMARY_START_OFFSET,
1054            errored: false,
1055        }
1056    }
1057
1058    /// Return an iterator for in-memory entries that haven't been flushed to disk.
1059    ///
1060    /// For in-memory Logs, this is the same as [`Log::iter`].
1061    pub fn iter_dirty(&self) -> LogIter {
1062        LogIter {
1063            log: self,
1064            next_offset: self.meta.primary_len,
1065            errored: false,
1066        }
1067    }
1068
1069    /// Applies the given index function to the entry data and returns the index keys.
1070    pub fn index_func<'a>(
1071        &self,
1072        index_id: usize,
1073        entry: &'a [u8],
1074    ) -> crate::Result<Vec<Cow<'a, [u8]>>> {
1075        let index_def = self.get_index_def(index_id)?;
1076        let mut result = vec![];
1077        for output in (index_def.func)(entry).into_iter() {
1078            result.push(
1079                output
1080                    .into_cow(&entry)
1081                    .context(|| format!("index_id = {}", index_id))?,
1082            );
1083        }
1084
1085        Ok(result)
1086    }
1087
1088    /// Return the fold state after calling `accumulate` on all (on-disk and
1089    /// in-memory) entries in insertion order.
1090    ///
1091    /// The fold function is the `fold_id`-th (0-based) `FoldDef` in
1092    /// [`OpenOptions`].
1093    pub fn fold(&self, fold_id: usize) -> crate::Result<&dyn Fold> {
1094        match self.all_folds.get(fold_id) {
1095            Some(f) => Ok(f.fold.as_ref()),
1096            None => Err(self.fold_out_of_bound(fold_id)),
1097        }
1098    }
1099
1100    fn fold_out_of_bound(&self, fold_id: usize) -> crate::Error {
1101        let msg = format!(
1102            "fold_id {} is out of bound (len={}, dir={:?})",
1103            fold_id,
1104            self.open_options.fold_defs.len(),
1105            &self.dir
1106        );
1107        crate::Error::programming(msg)
1108    }
1109
    /// Build in-memory index for the newly added entry.
    ///
    /// `offset` is the logical start offset of the entry.
    /// `data_offset` is the logical start offset of the real data (skips
    /// length, and checksum header in the entry).
    ///
    /// On failure, latches `index_corrupted` (via `maybe_set_index_error`)
    /// so subsequent index reads fail fast.
    fn update_indexes_for_in_memory_entry(
        &mut self,
        data: &[u8],
        offset: u64,
        data_offset: u64,
    ) -> crate::Result<()> {
        let result = self.update_indexes_for_in_memory_entry_unchecked(data, offset, data_offset);
        self.maybe_set_index_error(result)
    }
1124
1125    /// Similar to `update_indexes_for_in_memory_entry`. But updates `fold` instead.
1126    fn update_fold_for_in_memory_entry(
1127        &mut self,
1128        data: &[u8],
1129        offset: u64,
1130        data_offset: u64,
1131    ) -> crate::Result<()> {
1132        for fold_state in self.all_folds.iter_mut() {
1133            fold_state.process_entry(data, offset, data_offset + data.len() as u64)?;
1134        }
1135        Ok(())
1136    }
1137
1138    fn update_indexes_for_in_memory_entry_unchecked(
1139        &mut self,
1140        data: &[u8],
1141        offset: u64,
1142        data_offset: u64,
1143    ) -> crate::Result<()> {
1144        for (index, def) in self.indexes.iter_mut().zip(&self.open_options.index_defs) {
1145            for index_output in (def.func)(data) {
1146                match index_output {
1147                    IndexOutput::Reference(range) => {
1148                        assert!(range.start <= range.end && range.end <= data.len() as u64);
1149                        let start = range.start + data_offset;
1150                        let end = range.end + data_offset;
1151                        let key = InsertKey::Reference((start, end - start));
1152                        index.insert_advanced(key, InsertValue::Prepend(offset))?;
1153                    }
1154                    IndexOutput::Owned(key) => {
1155                        let key = InsertKey::Embed(&key);
1156                        index.insert_advanced(key, InsertValue::Prepend(offset))?;
1157                    }
1158                    IndexOutput::Remove(key) => {
1159                        index.remove(key)?;
1160                    }
1161                    IndexOutput::RemovePrefix(key) => {
1162                        index.remove_prefix(key)?;
1163                    }
1164                }
1165            }
1166        }
1167        Ok(())
1168    }
1169
1170    /// Incrementally update `disk_folds`.
1171    ///
1172    /// This is done by trying to reuse fold states from disk.
1173    /// If the on-disk fold state is outdated, then new fold states will be
1174    /// written to disk.
1175    fn update_and_flush_disk_folds(&mut self) -> crate::Result<()> {
1176        let mut folds = self.open_options.empty_folds();
1177        // Temporarily swap so `catch_up_with_log_on_disk_entries` can access `self`.
1178        std::mem::swap(&mut self.disk_folds, &mut folds);
1179        let result = (|| -> crate::Result<()> {
1180            for fold_state in folds.iter_mut() {
1181                fold_state.catch_up_with_log_on_disk_entries(self)?;
1182            }
1183            Ok(())
1184        })();
1185        self.disk_folds = folds;
1186        result
1187    }
1188
    /// Build in-memory indexes so they cover all entries stored in
    /// `self.disk_buf`.
    ///
    /// Returns `Ok(())` on success. On failure, latches `index_corrupted`
    /// (via `maybe_set_index_error`) so subsequent index reads fail fast.
    fn update_indexes_for_on_disk_entries(&mut self) -> crate::Result<()> {
        let result = self.update_indexes_for_on_disk_entries_unchecked();
        self.maybe_set_index_error(result)
    }
1196
1197    fn update_indexes_for_on_disk_entries_unchecked(&mut self) -> crate::Result<()> {
1198        // It's a programming error to call this when mem_buf is not empty.
1199        for (index, def) in self.indexes.iter_mut().zip(&self.open_options.index_defs) {
1200            Self::update_index_for_on_disk_entry_unchecked(
1201                &self.dir,
1202                index,
1203                def,
1204                &self.disk_buf,
1205                self.meta.primary_len,
1206            )?;
1207        }
1208        Ok(())
1209    }
1210
    /// Catch a single `index` up with the on-disk primary log.
    ///
    /// Scans entries in `disk_buf` starting from the index's recorded
    /// "logic length" (the next offset it should index) and feeds each entry
    /// through `def.func`. On success, the index's "logic length" is advanced
    /// to `primary_len`.
    ///
    /// Returns how many entries the index function was called on.
    fn update_index_for_on_disk_entry_unchecked(
        path: &GenericPath,
        index: &mut Index,
        def: &IndexDef,
        disk_buf: &Bytes,
        primary_len: u64,
    ) -> crate::Result<usize> {
        // The index meta is used to store the next offset the index should be built.
        let mut offset = Self::get_index_log_len(index, true)?;
        // How many times the index function gets called?
        let mut count = 0;
        // PERF: might be worthwhile to cache xxhash verification result.
        while let Some(entry_result) =
            Self::read_entry_from_buf(&path, disk_buf, offset).context(|| {
                format!(
                    "while updating index {:?} for on-disk entry at {}",
                    def.name, offset
                )
            })?
        {
            count += 1;
            let data = entry_result.data;
            // Apply the index function output, mirroring
            // `update_indexes_for_in_memory_entry_unchecked`.
            for index_output in (def.func)(data) {
                match index_output {
                    IndexOutput::Reference(range) => {
                        assert!(range.start <= range.end && range.end <= data.len() as u64);
                        // Translate entry-relative offsets into logical log offsets.
                        let start = range.start + entry_result.data_offset;
                        let end = range.end + entry_result.data_offset;
                        let key = InsertKey::Reference((start, end - start));

                        index.insert_advanced(key, InsertValue::Prepend(offset))?;
                    }
                    IndexOutput::Owned(key) => {
                        let key = InsertKey::Embed(&key);
                        index.insert_advanced(key, InsertValue::Prepend(offset))?;
                    }
                    IndexOutput::Remove(key) => {
                        index.remove(key)?;
                    }
                    IndexOutput::RemovePrefix(key) => {
                        index.remove_prefix(key)?;
                    }
                }
            }
            offset = entry_result.next_offset;
        }
        // The index now contains all entries. Write "next_offset" as the index meta.
        Self::set_index_log_len(std::iter::once(index), primary_len);

        Ok(count)
    }
1262
    /// Read [`LogMetadata`] from the given directory. If `create` is `true`,
    /// create an empty one on demand.
    ///
    /// The caller should ensure the directory exists and take a lock on it to
    /// avoid filesystem races.
    pub(crate) fn load_or_create_meta(
        path: &GenericPath,
        create: bool,
    ) -> crate::Result<LogMetadata> {
        // Thin wrapper; see `load_or_create_meta_internal` for the logic.
        Self::load_or_create_meta_internal(path, create)
    }
1274
1275    pub(crate) fn load_or_create_meta_internal(
1276        path: &GenericPath,
1277        create: bool,
1278    ) -> crate::Result<LogMetadata> {
1279        match path.read_meta() {
1280            Err(err) => {
1281                if err.io_error_kind() == io::ErrorKind::NotFound && create {
1282                    let dir = path.as_opt_path().unwrap();
1283                    // Create (and truncate) the primary log and indexes.
1284                    let primary_path = dir.join(PRIMARY_FILE);
1285                    let mut primary_file =
1286                        File::create(&primary_path).context(&primary_path, "cannot create")?;
1287                    primary_file
1288                        .write_all(PRIMARY_HEADER)
1289                        .context(&primary_path, "cannot write")?;
1290                    let _ = utils::fix_perm_file(&primary_file, false);
1291                    // Start from empty file and indexes.
1292                    let meta = LogMetadata::new_with_primary_len(PRIMARY_START_OFFSET);
1293                    // An empty meta file is easy to recreate. No need to use fsync.
1294                    path.write_meta(&meta, false)?;
1295                    Ok(meta)
1296                } else {
1297                    Err(err)
1298                }
1299            }
1300            Ok(meta) => Ok(meta),
1301        }
1302    }
1303
    /// Read `(log.disk_buf, indexes)` from the directory using the metadata.
    ///
    /// If `reuse_indexes` is not None, they are existing indexes that match `index_defs`
    /// order. This should only be used in `sync` code path when the on-disk `meta` matches
    /// the in-memory `meta`. Otherwise it is not a sound use.
    ///
    /// The indexes loaded by this function can be lagging.
    /// Use `update_indexes_for_on_disk_entries` to update them.
    fn load_log_and_indexes(
        dir: &GenericPath,
        meta: &LogMetadata,
        index_defs: &[IndexDef],
        mem_buf: &Pin<Box<Vec<u8>>>,
        reuse_indexes: Option<&Vec<Index>>,
        fsync: bool,
    ) -> crate::Result<(Bytes, Vec<Index>)> {
        // mmap the on-disk primary log up to `meta.primary_len`; in-memory
        // logs use an empty buffer.
        let primary_buf = match dir.as_opt_path() {
            Some(dir) => mmap_path(&dir.join(PRIMARY_FILE), meta.primary_len)?,
            None => Bytes::new(),
        };

        // Store a raw pointer to the in-memory buffer inside the shared key
        // buffer. The `Pin<Box<..>>` keeps the Vec's address stable while
        // this pointer is alive (presumably ExternalKeyBuffer dereferences it
        // to resolve keys above `disk_len`).
        let mem_buf: &Vec<u8> = &mem_buf;
        let mem_buf: *const Vec<u8> = mem_buf as *const Vec<u8>;
        let key_buf = Arc::new(ExternalKeyBuffer {
            disk_buf: primary_buf.clone(),
            disk_len: meta.primary_len,
            mem_buf,
        });

        let indexes = match reuse_indexes {
            None => {
                // No indexes are reused, reload them.
                let mut indexes = Vec::with_capacity(index_defs.len());
                for def in index_defs.iter() {
                    // Missing meta entry means the index is empty (length 0).
                    let index_len = meta.indexes.get(&def.metaname()).cloned().unwrap_or(0);
                    indexes.push(Self::load_index(
                        dir,
                        &def,
                        index_len,
                        key_buf.clone(),
                        fsync,
                    )?);
                }
                indexes
            }
            Some(indexes) => {
                assert_eq!(index_defs.len(), indexes.len());
                let mut new_indexes = Vec::with_capacity(indexes.len());
                // Avoid reloading the index from disk.
                // Update their ExternalKeyBuffer so they have the updated meta.primary_len.
                for (index, def) in indexes.iter().zip(index_defs) {
                    let index_len = meta.indexes.get(&def.metaname()).cloned().unwrap_or(0);
                    // Reload from disk only if the on-disk index advanced past
                    // what the existing in-memory index covers.
                    let index = if index_len > Self::get_index_log_len(index, true).unwrap_or(0) {
                        Self::load_index(dir, &def, index_len, key_buf.clone(), fsync)?
                    } else {
                        let mut index = index.try_clone()?;
                        index.key_buf = key_buf.clone();
                        index
                    };
                    new_indexes.push(index);
                }
                new_indexes
            }
        };
        Ok((primary_buf, indexes))
    }
1370
    /// Return the reference to the [`GenericPath`] used to create the [`Log`].
    pub fn path(&self) -> &GenericPath {
        &self.dir
    }
1375
1376    /// Load a single index.
1377    fn load_index(
1378        dir: &GenericPath,
1379        def: &IndexDef,
1380        len: u64,
1381        buf: Arc<dyn ReadonlyBuffer + Send + Sync>,
1382        fsync: bool,
1383    ) -> crate::Result<Index> {
1384        match dir.as_opt_path() {
1385            Some(dir) => {
1386                let path = dir.join(def.filename());
1387                index::OpenOptions::new()
1388                    .checksum_chunk_size_logarithm(INDEX_CHECKSUM_CHUNK_SIZE_LOGARITHM)
1389                    .logical_len(Some(len))
1390                    .key_buf(Some(buf))
1391                    .fsync(fsync)
1392                    .open(path)
1393            }
1394            None => index::OpenOptions::new()
1395                .logical_len(Some(len))
1396                .key_buf(Some(buf))
1397                .fsync(fsync)
1398                .create_in_memory(),
1399        }
1400    }
1401
1402    /// Read the entry at the given offset. Return `None` if offset is out of bound, or the content
1403    /// of the data, the real offset of the data, and the next offset. Raise errors if
1404    /// integrity-check failed.
1405    fn read_entry(&self, offset: u64) -> crate::Result<Option<EntryResult>> {
1406        let result = if offset < self.meta.primary_len {
1407            Self::read_entry_from_buf(&self.dir, &self.disk_buf, offset)?
1408        } else {
1409            let offset = offset - self.meta.primary_len;
1410            if offset >= self.mem_buf.len() as u64 {
1411                return Ok(None);
1412            }
1413            Self::read_entry_from_buf(&self.dir, &self.mem_buf, offset)?
1414                .map(|entry_result| entry_result.offset(self.meta.primary_len))
1415        };
1416        Ok(result)
1417    }
1418
    /// Read an entry at the given offset of the given buffer. Verify its integrity. Return the
    /// data, the real data offset, and the next entry offset. Return None if the offset is at
    /// the end of the buffer.  Raise errors if there are integrity check issues.
    ///
    /// Entry layout (see the file header comment):
    /// ENTRY_FLAGS (VLQ) + LEN(CONTENT) (VLQ) + CHECKSUM (LE) + CONTENT.
    fn read_entry_from_buf<'a>(
        path: &GenericPath,
        buf: &'a [u8],
        offset: u64,
    ) -> crate::Result<Option<EntryResult<'a>>> {
        // Helper to build a corruption error tagged with the log path
        // (or "<memory>" for in-memory logs).
        let data_error = |msg: String| -> crate::Error {
            match path.as_opt_path() {
                Some(path) => crate::Error::corruption(path, msg),
                None => crate::Error::path(Path::new("<memory>"), msg),
            }
        };

        use std::cmp::Ordering::Equal;
        use std::cmp::Ordering::Greater;
        // Exactly at the end: clean EOF. Past the end: corruption.
        match offset.cmp(&(buf.len() as u64)) {
            Equal => return Ok(None),
            Greater => {
                let msg = format!("read offset {} exceeds buffer size {}", offset, buf.len());
                return Err(data_error(msg));
            }
            _ => {}
        }

        let (entry_flags, vlq_len): (u32, _) = buf.read_vlq_at(offset as usize).map_err(|e| {
            crate::Error::wrap(Box::new(e), || {
                format!("cannot read entry_flags at {}", offset)
            })
            .mark_corruption()
        })?;
        let offset = offset + vlq_len as u64;

        // For now, data_len is the next field regardless of entry flags.
        let (data_len, vlq_len): (u64, _) = buf.read_vlq_at(offset as usize).map_err(|e| {
            crate::Error::wrap(Box::new(e), || {
                format!("cannot read data_len at {}", offset)
            })
            .mark_corruption()
        })?;
        let offset = offset + vlq_len as u64;

        // Depends on entry_flags, some of them have a checksum field.
        // Note: both "no checksum flag" (0) and "both flags set" fall into
        // the `_` arm and are rejected as malformed.
        let checksum_flags = entry_flags & (ENTRY_FLAG_HAS_XXHASH64 | ENTRY_FLAG_HAS_XXHASH32);
        let (checksum, offset) = match checksum_flags {
            ENTRY_FLAG_HAS_XXHASH64 => {
                let checksum = LittleEndian::read_u64(
                    &buf.get(offset as usize..offset as usize + 8)
                        .ok_or_else(|| {
                            data_error(format!("xxhash cannot be read at {}", offset))
                        })?,
                );
                (checksum, offset + 8)
            }
            ENTRY_FLAG_HAS_XXHASH32 => {
                let checksum = LittleEndian::read_u32(
                    &buf.get(offset as usize..offset as usize + 4)
                        .ok_or_else(|| {
                            data_error(format!("xxhash32 cannot be read at {}", offset))
                        })?,
                ) as u64;
                (checksum, offset + 4)
            }
            _ => {
                return Err(data_error(format!(
                    "entry at {} has malformed checksum metadata",
                    offset
                )));
            }
        };

        // Read the actual payload
        let end = offset + data_len;
        if end > buf.len() as u64 {
            return Err(data_error(format!("incomplete entry data at {}", offset)));
        }
        let data = &buf[offset as usize..end as usize];

        // Note: the `0 => true` arm is currently unreachable — a zero
        // checksum_flags was already rejected by the match above. It is kept
        // as a defensive default.
        let verified = match checksum_flags {
            0 => true,
            ENTRY_FLAG_HAS_XXHASH64 => xxhash(&data) == checksum,
            ENTRY_FLAG_HAS_XXHASH32 => xxhash32(&data) as u64 == checksum,
            // Tested above. Therefore unreachable.
            _ => unreachable!(),
        };
        if verified {
            Ok(Some(EntryResult {
                data,
                data_offset: offset,
                next_offset: end,
            }))
        } else {
            Err(data_error(format!("integrity check failed at {}", offset)))
        }
    }
1515
1516    /// Wrapper around a `Result` returned by an index write operation.
1517    /// Make sure all index write operations are wrapped by this method.
1518    #[inline]
1519    fn maybe_set_index_error<T>(&mut self, result: crate::Result<T>) -> crate::Result<T> {
1520        if result.is_err() && !self.index_corrupted {
1521            self.index_corrupted = true;
1522        }
1523        result
1524    }
1525
1526    /// Wrapper to return an error if `index_corrupted` is set.
1527    /// Use this before doing index read operations.
1528    #[inline]
1529    fn maybe_return_index_error(&self) -> crate::Result<()> {
1530        if self.index_corrupted {
1531            let msg = "index is corrupted".to_string();
1532            Err(self.corruption(msg))
1533        } else {
1534            Ok(())
1535        }
1536    }
1537
1538    /// Get the log length (in bytes) covered by the given index.
1539    ///
1540    /// This only makes sense at open() or sync() time, since the data won't be updated
1541    /// by append() for performance reasons.
1542    ///
1543    /// `consider_dirty` specifies whether dirty entries in the Index are
1544    /// considered. It should be `true` for writing use-cases, since indexing
1545    /// an entry twice is an error. It can be set to `false` for detecting
1546    /// lags.
1547    fn get_index_log_len(index: &Index, consider_dirty: bool) -> crate::Result<u64> {
1548        let index_meta = if consider_dirty {
1549            index.get_meta()
1550        } else {
1551            index.get_original_meta()
1552        };
1553        Ok(if index_meta.is_empty() {
1554            // New index. Start processing at the first entry.
1555            PRIMARY_START_OFFSET
1556        } else {
1557            index_meta
1558                .read_vlq_at(0)
1559                .context(&index.path, || {
1560                    format!(
1561                        "index metadata cannot be parsed as an integer: {:?}",
1562                        &index_meta
1563                    )
1564                })?
1565                .0
1566        })
1567    }
1568
1569    /// Update the log length (in bytes) covered by the given indexes.
1570    ///
1571    /// `len` is usually `meta.primary_len`.
1572    fn set_index_log_len<'a>(indexes: impl Iterator<Item = &'a mut Index>, len: u64) {
1573        let mut index_meta = Vec::new();
1574        index_meta.write_vlq(len).unwrap();
1575        for index in indexes {
1576            index.set_meta(&index_meta);
1577        }
1578    }
1579}
1580
1581// Error-related utilities
1582
1583impl Log {
1584    /// Get the specified index, with error handling.
1585    fn get_index(&self, index_id: usize) -> crate::Result<&Index> {
1586        self.indexes.get(index_id).ok_or_else(|| {
1587            let msg = format!(
1588                "index_id {} is out of bound (len={}, dir={:?})",
1589                index_id,
1590                self.indexes.len(),
1591                &self.dir
1592            );
1593            crate::Error::programming(msg)
1594        })
1595    }
1596
1597    /// Get the specified index, with error handling.
1598    fn get_index_def(&self, index_id: usize) -> crate::Result<&IndexDef> {
1599        self.open_options.index_defs.get(index_id).ok_or_else(|| {
1600            let msg = format!(
1601                "index_id {} is out of bound (len={}, dir={:?})",
1602                index_id,
1603                self.indexes.len(),
1604                &self.dir
1605            );
1606            crate::Error::programming(msg)
1607        })
1608    }
1609
1610    fn corruption(&self, message: String) -> crate::Error {
1611        let path: &Path = match self.dir.as_opt_path() {
1612            Some(ref path) => &path,
1613            None => Path::new("<memory>"),
1614        };
1615        crate::Error::corruption(path, message)
1616    }
1617}
1618
/// "Pointer" to an entry. Used internally.
struct EntryResult<'a> {
    // Payload bytes of the entry (checksum already verified by the reader).
    data: &'a [u8],
    // Offset of `data` within the current buffer (disk_buf or mem_buf).
    data_offset: u64,
    // Offset right after this entry, i.e. where the next entry starts.
    next_offset: u64,
}

impl<'a> EntryResult<'a> {
    /// Return this result with `offset` added to `next_offset`.
    ///
    /// `data_offset` is left untouched on purpose: it is relative to the
    /// current buffer (disk_buf, or mem_buf) and does not need rebasing.
    fn offset(self, offset: u64) -> EntryResult<'a> {
        let next_offset = self.next_offset + offset;
        EntryResult { next_offset, ..self }
    }
}
1638
1639impl<'a> Iterator for LogLookupIter<'a> {
1640    type Item = crate::Result<&'a [u8]>;
1641
1642    fn next(&mut self) -> Option<Self::Item> {
1643        if self.errored {
1644            return None;
1645        }
1646        match self.inner_iter.next() {
1647            None => None,
1648            Some(Err(err)) => {
1649                self.errored = true;
1650                Some(Err(err))
1651            }
1652            Some(Ok(offset)) => match self
1653                .log
1654                .read_entry(offset)
1655                .context("in LogLookupIter::next")
1656            {
1657                Ok(Some(entry)) => Some(Ok(entry.data)),
1658                Ok(None) => None,
1659                Err(err) => {
1660                    // Do not set this iterator to an error state. It's possible
1661                    // that the index iterator still provides valid data, and
1662                    // only the "log" portion is corrupted.
1663                    //
1664                    // The index iterator is finite if integrity check is turned
1665                    // on. So trust it and don't worry about infinite iteration
1666                    // here.
1667                    Some(Err(err))
1668                }
1669            },
1670        }
1671    }
1672}
1673
1674impl<'a> LogLookupIter<'a> {
1675    /// A convenient way to get data.
1676    pub fn into_vec(self) -> crate::Result<Vec<&'a [u8]>> {
1677        self.collect()
1678    }
1679}
1680
1681impl<'a> Iterator for LogIter<'a> {
1682    type Item = crate::Result<&'a [u8]>;
1683
1684    fn next(&mut self) -> Option<Self::Item> {
1685        if self.errored {
1686            return None;
1687        }
1688        match self
1689            .log
1690            .read_entry(self.next_offset)
1691            .context("in LogIter::next")
1692        {
1693            Err(e) => {
1694                self.errored = true;
1695                Some(Err(e))
1696            }
1697            Ok(Some(entry_result)) => {
1698                assert!(entry_result.next_offset > self.next_offset);
1699                self.next_offset = entry_result.next_offset;
1700                Some(Ok(entry_result.data))
1701            }
1702            Ok(None) => None,
1703        }
1704    }
1705}
1706
1707impl<'a> LogRangeIter<'a> {
1708    /// Wrap `next()` or `next_back()` result by the inner iterator.
1709    fn wrap_inner_next_result(
1710        &mut self,
1711        item: Option<crate::Result<(Cow<'a, [u8]>, index::LinkOffset)>>,
1712    ) -> Option<crate::Result<(Cow<'a, [u8]>, LogLookupIter<'a>)>> {
1713        match item {
1714            None => None,
1715            Some(Err(err)) => {
1716                self.errored = true;
1717                Some(Err(err))
1718            }
1719            Some(Ok((key, link_offset))) => {
1720                let iter = LogLookupIter {
1721                    inner_iter: link_offset.values(self.index),
1722                    errored: false,
1723                    log: self.log,
1724                };
1725                Some(Ok((key, iter)))
1726            }
1727        }
1728    }
1729}
1730
1731impl<'a> Iterator for LogRangeIter<'a> {
1732    type Item = crate::Result<(Cow<'a, [u8]>, LogLookupIter<'a>)>;
1733
1734    fn next(&mut self) -> Option<Self::Item> {
1735        if self.errored {
1736            return None;
1737        }
1738        let inner = self.inner_iter.next();
1739        self.wrap_inner_next_result(inner)
1740    }
1741}
1742
1743impl<'a> DoubleEndedIterator for LogRangeIter<'a> {
1744    fn next_back(&mut self) -> Option<Self::Item> {
1745        if self.errored {
1746            return None;
1747        }
1748        let inner = self.inner_iter.next_back();
1749        self.wrap_inner_next_result(inner)
1750    }
1751}
1752
1753impl Debug for Log {
1754    fn fmt(&self, f: &mut Formatter) -> Result<(), fmt::Error> {
1755        let mut count = 0;
1756        let mut iter = self.iter();
1757        let bytes_per_line = 16;
1758        loop {
1759            let offset = iter.next_offset;
1760            count += 1;
1761            match iter.next() {
1762                None => break,
1763                Some(Ok(bytes)) => {
1764                    if count > 1 {
1765                        write!(f, "\n")?;
1766                    }
1767                    write!(f, "# Entry {}:\n", count)?;
1768                    for (i, chunk) in bytes.chunks(bytes_per_line).enumerate() {
1769                        write!(f, "{:08x}:", offset as usize + i * bytes_per_line)?;
1770                        for b in chunk {
1771                            write!(f, " {:02x}", b)?;
1772                        }
1773                        for _ in chunk.len()..bytes_per_line {
1774                            write!(f, "   ")?;
1775                        }
1776                        write!(f, "  ")?;
1777                        for &b in chunk {
1778                            let ch = match b {
1779                                0x20..=0x7e => b as char, // printable
1780                                _ => '.',
1781                            };
1782                            write!(f, "{}", ch)?;
1783                        }
1784                        write!(f, "\n")?;
1785                    }
1786                }
1787                Some(Err(err)) => writeln!(f, "# Error: {:?}", err)?,
1788            }
1789        }
1790        Ok(())
1791    }
1792}
1793
1794impl ReadonlyBuffer for ExternalKeyBuffer {
1795    #[inline]
1796    fn slice(&self, start: u64, len: u64) -> Option<&[u8]> {
1797        if start < self.disk_len {
1798            self.disk_buf.get((start as usize)..(start + len) as usize)
1799        } else {
1800            let start = start - self.disk_len;
1801            // See "UNSAFE NOTICE" in ExternalKeyBuffer definition.
1802            // This pointer cannot be null.
1803            let mem_buf = unsafe { &*self.mem_buf };
1804            mem_buf.get((start as usize)..(start + len) as usize)
1805        }
1806    }
1807}