indexedlog/
log.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 *
4 * This source code is licensed under the MIT license found in the
5 * LICENSE file in the root directory of this source tree.
6 */
7
8//! Append-only storage with indexes and integrity checks.
9//!
10//! See [`Log`] for the main structure. This module also provides surrounding
11//! types needed to construct the [`Log`], including [`IndexDef`] and some
12//! iterators.
13
14// Detailed file formats:
15//
16// Primary log:
17//   LOG := HEADER + ENTRY_LIST
//   HEADER := 'indexedlog0\0' (see PRIMARY_HEADER below)
19//   ENTRY_LIST := '' | ENTRY_LIST + ENTRY
20//   ENTRY := ENTRY_FLAGS + LEN(CONTENT) + CHECKSUM + CONTENT
21//   CHECKSUM := '' | XXHASH64(CONTENT) | XXHASH32(CONTENT)
22//
23// Metadata:
24//   META := HEADER + XXHASH64(DATA) + LEN(DATA) + DATA
25//   HEADER := 'meta\0'
26//   DATA := LEN(LOG) + LEN(INDEXES) + INDEXES
27//   INDEXES := '' | INDEXES + INDEX
28//   INDEX := LEN(NAME) + NAME + INDEX_LOGIC_LEN
29//
30// Indexes:
31//   See `index.rs`.
32//
33// Integers are VLQ encoded, except for XXHASH64 and XXHASH32, which uses
34// LittleEndian encoding.
35
36use std::borrow::Cow;
37use std::fmt;
38use std::fmt::Debug;
39use std::fmt::Formatter;
40use std::fs;
41use std::fs::File;
42use std::io;
43use std::io::Seek;
44use std::io::SeekFrom;
45use std::io::Write;
46use std::ops::RangeBounds;
47use std::path::Path;
48use std::pin::Pin;
49use std::sync::Arc;
50
51use byteorder::ByteOrder;
52use byteorder::LittleEndian;
53use byteorder::WriteBytesExt;
54use minibytes::Bytes;
55use tracing::debug_span;
56use tracing::trace;
57use vlqencoding::VLQDecodeAt;
58use vlqencoding::VLQEncode;
59
60use crate::change_detect::SharedChangeDetector;
61use crate::config;
62use crate::errors::IoResultExt;
63use crate::errors::ResultExt;
64use crate::index;
65use crate::index::Index;
66use crate::index::InsertKey;
67use crate::index::InsertValue;
68use crate::index::LeafValueIter;
69use crate::index::RangeIter;
70use crate::index::ReadonlyBuffer;
71use crate::lock::ScopedDirLock;
72use crate::lock::READER_LOCK_OPTS;
73use crate::utils;
74use crate::utils::mmap_path;
75use crate::utils::xxhash;
76use crate::utils::xxhash32;
77
78mod fold;
79mod meta;
80mod open_options;
81mod path;
82mod repair;
83#[cfg(test)]
84pub(crate) mod tests;
85mod wait;
86
87pub use open_options::ChecksumType;
88pub use open_options::FlushFilterContext;
89pub use open_options::FlushFilterFunc;
90pub use open_options::FlushFilterOutput;
91pub use open_options::IndexDef;
92pub use open_options::IndexOutput;
93pub use open_options::OpenOptions;
94pub use path::GenericPath;
95
96pub use self::fold::Fold;
97pub use self::fold::FoldDef;
98use self::fold::FoldState;
99pub use self::meta::LogMetadata;
100pub use self::wait::Wait;
101
// Constants about file names
/// File name of the primary append-only log inside the Log directory.
pub(crate) const PRIMARY_FILE: &str = "log";
// Magic header written at the start of the primary log file.
const PRIMARY_HEADER: &[u8] = b"indexedlog0\0";
// Offset of the first entry, immediately after the header.
const PRIMARY_START_OFFSET: u64 = 12; // PRIMARY_HEADER.len() as u64;
/// File name of the metadata file inside the Log directory.
pub(crate) const META_FILE: &str = "meta";

// Entry flag bits. They record which checksum follows LEN(CONTENT).
const ENTRY_FLAG_HAS_XXHASH64: u32 = 1;
const ENTRY_FLAG_HAS_XXHASH32: u32 = 2;

// 1MB index checksum chunk. This makes the checksum file fit within one block (4KB) for a 512MB index.
const INDEX_CHECKSUM_CHUNK_SIZE_LOGARITHM: u32 = 20;
113
/// An append-only storage with indexes and integrity checks.
///
/// The [`Log`] is backed by a directory in the filesystem. The
/// directory includes:
///
/// - An append-only "log" file. It can be seen as a serialization
///   result of an append-only list of byte slices. Each byte slice
///   has a checksum.
/// - Multiple user-defined indexes. Each index has an append-only
///   on-disk radix-tree representation and a small, separate,
///   non-append-only checksum file.
/// - A small "metadata" file which records the logic lengths (in bytes)
///   for the log and index files.
///
/// Reading is lock-free because the log and indexes are append-only.
/// Writes are buffered in memory. Flushing in-memory parts to
/// disk requires taking a flock on the directory.
pub struct Log {
    /// Where this Log lives: a directory on disk, or in-memory only.
    pub dir: GenericPath,
    /// Read-only view of the on-disk primary log content.
    pub(crate) disk_buf: Bytes,
    /// Buffered (not yet flushed) entries. Pinned because
    /// `ExternalKeyBuffer` keeps a raw pointer into it.
    pub(crate) mem_buf: Pin<Box<Vec<u8>>>,
    /// Logical lengths of the primary log and index files.
    pub(crate) meta: LogMetadata,
    indexes: Vec<Index>,
    // On-demand caches of the folds defined by open_options.
    // disk_folds only includes clean (on-disk) entries.
    // all_folds includes both clean (on-disk) and dirty (in-memory) entries.
    disk_folds: Vec<FoldState>,
    all_folds: Vec<FoldState>,
    // Whether the index and the log are out-of-sync. In which case, index-based reads (lookups)
    // should return errors because it can no longer be trusted.
    // This could be improved to be per index. For now, it's a single state for simplicity. It's
    // probably fine considering index corruptions are rare.
    index_corrupted: bool,
    open_options: OpenOptions,
    // Indicate an active reader. Destructive writes (repair) are unsafe.
    reader_lock: Option<ScopedDirLock>,
    // Cross-process cheap change detector backed by mmap.
    change_detector: Option<SharedChangeDetector>,
}
153
/// Iterator over all entries in a [`Log`].
pub struct LogIter<'a> {
    // Logical offset of the next entry to read.
    next_offset: u64,
    // Set after the first error; stops further iteration.
    errored: bool,
    log: &'a Log,
}
160
/// Iterator over [`Log`] entries selected by an index lookup.
///
/// It is a wrapper around [index::LeafValueIter].
pub struct LogLookupIter<'a> {
    inner_iter: LeafValueIter<'a>,
    // Set after the first error; stops further iteration.
    errored: bool,
    log: &'a Log,
}
169
/// Iterator over keys and [`LogLookupIter`], filtered by an index prefix.
///
/// It is a wrapper around [index::RangeIter].
pub struct LogRangeIter<'a> {
    inner_iter: RangeIter<'a>,
    // Set after the first error; stops further iteration.
    errored: bool,
    log: &'a Log,
    // The index the range query runs against.
    index: &'a Index,
}
179
/// Satisfy [`index::ReadonlyBuffer`] trait so [`Log`] can use external
/// keys on [`Index`] for in-memory-only entries.
struct ExternalKeyBuffer {
    // On-disk content of the primary log.
    disk_buf: Bytes,
    // Logical length of the on-disk part; offsets beyond it refer to mem_buf.
    disk_len: u64,

    // Prove the pointer is valid:
    // 1. If ExternalKeyBuffer is alive, then the Index owning it is alive.
    //    This is because ExternalKeyBuffer is private to Index, and there
    //    is no way to get a clone of ExternalKeyBuffer without also
    //    cloning its owner (Index).
    // 2. If the Index owning ExternalKeyBuffer is alive, then the Log
    //    owning the Index is alive. Similarly, Index is private to Log,
    //    and there is no way to just clone the Index without cloning
    //    its owner (Log).
    // 3. If Log is alive, then Log.mem_buf is alive.
    // 4. Log.mem_buf is pinned, so this pointer is valid.
    //
    // Here is why `Arc<Mutex<Vec<u8>>>` is not feasible:
    //
    // - Bad performance: The Mutex overhead is visible.
    //   "log insertion (no checksum)" takes 2x time.
    //   "log insertion" and "log iteration (memory)" take 1.5x time.
    // - Unsafe Rust is still necessary.
    //   In [`Log::read_entry`], reading the in-memory entry case,
    //   the borrow relationship changes from `&Log -> &[u8]` to
    //   `&Log -> &MutexGuard -> &[u8]`, which means unsafe Rust is
    //   needed, or it has to take the mutex lock. Neither desirable.
    //
    // Here is why a normal lifetime is not feasible:
    // - A normal lifetime will enforce the `mem_buf` to be read-only.
    //   But Log needs to write to it.
    //
    // Note: Rust reference cannot be used here, because a reference implies
    // LLVM "noalias", which is not true since Log can change Log.mem_buf.
    //
    // (UNSAFE NOTICE)
    mem_buf: *const Vec<u8>,
}
219
// SAFETY: mem_buf can be read from multiple threads at the same time if no
// thread is changing the actual mem_buf. If there is a thread changing mem_buf
// by calling Log::append(&mut self, ...), then the compiler should make sure
// Log methods taking &self are not called at the same time.
unsafe impl Send for ExternalKeyBuffer {}
unsafe impl Sync for ExternalKeyBuffer {}
226
227// Some design notes:
228// - Public APIs do not expose internal offsets of entries. This avoids issues when an in-memory
229//   entry gets moved after `flush`.
230// - The only write-to-disk operation is `flush`, aside from creating an empty `Log`. This makes it
231//   easier to verify correctness - just make sure `flush` is properly handled (ex. by locking).
232
233impl Log {
234    /// Construct [`Log`] at given directory. Incrementally build up specified
235    /// indexes.
236    ///
237    /// Create the [`Log`] if it does not exist.
238    ///
239    /// See [`OpenOptions::open`] for details.
240    pub fn open<P: AsRef<Path>>(dir: P, index_defs: Vec<IndexDef>) -> crate::Result<Self> {
241        OpenOptions::new()
242            .index_defs(index_defs)
243            .create(true)
244            .open(dir.as_ref())
245    }
246
    /// Append an entry in-memory. Update related indexes in-memory.
    ///
    /// The memory part is not shared. Therefore other [`Log`] instances won't see
    /// the change immediately.
    ///
    /// To write in-memory entries and indexes to disk, call [`Log::sync`].
    pub fn append<T: AsRef<[u8]>>(&mut self, data: T) -> crate::Result<()> {
        let result: crate::Result<_> = (|| {
            let data = data.as_ref();

            // Pick the checksum type. `Auto` chooses based on the entry size.
            let checksum_type = if self.open_options.checksum_type == ChecksumType::Auto {
                // xxhash64 is slower for smaller data. A quick benchmark on x64 platform shows:
                //
                // bytes  xxhash32  xxhash64 (MB/s)
                //   32       1882      1600
                //   40       1739      1538
                //   48       2285      1846
                //   56       2153      2000
                //   64       2666      2782
                //   72       2400      2322
                //   80       2962      2758
                //   88       2750      2750
                //   96       3200      3692
                //  104       2810      3058
                //  112       3393      3500
                //  120       3000      3428
                //  128       3459      4266
                const XXHASH64_THRESHOLD: usize = 88;
                if data.len() >= XXHASH64_THRESHOLD {
                    ChecksumType::Xxhash64
                } else {
                    ChecksumType::Xxhash32
                }
            } else {
                self.open_options.checksum_type
            };

            // Logical offset of the start of this entry, counted from the
            // beginning of the primary log (disk part + in-memory part).
            let offset = self.meta.primary_len + self.mem_buf.len() as u64;

            // Design note: Currently checksum_type is the only thing that decides
            // entry_flags.  Entry flags is not designed to just cover different
            // checksum types.  For example, if we'd like to introduce transparent
            // compression (maybe not a good idea since it can be more cleanly built
            // at an upper layer), or some other ways to store data (ex. reference
            // to other data, or fixed length data), they can probably be done by
            // extending the entry type.
            let mut entry_flags = 0;
            entry_flags |= match checksum_type {
                ChecksumType::Xxhash64 => ENTRY_FLAG_HAS_XXHASH64,
                ChecksumType::Xxhash32 => ENTRY_FLAG_HAS_XXHASH32,
                ChecksumType::Auto => unreachable!(),
            };

            // Write ENTRY_FLAGS + LEN(CONTENT) as VLQ (see the file format notes
            // at the top of this module).
            self.mem_buf.write_vlq(entry_flags).infallible()?;
            self.mem_buf.write_vlq(data.len()).infallible()?;

            // Write CHECKSUM. Checksums use LittleEndian encoding, not VLQ.
            match checksum_type {
                ChecksumType::Xxhash64 => {
                    self.mem_buf
                        .write_u64::<LittleEndian>(xxhash(data))
                        .infallible()?;
                }
                ChecksumType::Xxhash32 => {
                    self.mem_buf
                        .write_u32::<LittleEndian>(xxhash32(data))
                        .infallible()?;
                }
                ChecksumType::Auto => unreachable!(),
            };
            // Logical offset where the entry CONTENT starts.
            let data_offset = self.meta.primary_len + self.mem_buf.len() as u64;

            // Write CONTENT, then keep in-memory indexes and folds up-to-date.
            self.mem_buf.write_all(data).infallible()?;
            self.update_indexes_for_in_memory_entry(data, offset, data_offset)?;
            self.update_fold_for_in_memory_entry(data, offset, data_offset)?;

            // Optionally sync automatically once the dirty buffer grows past the
            // configured threshold.
            if let Some(threshold) = self.open_options.auto_sync_threshold {
                if self.mem_buf.len() as u64 >= threshold {
                    self.sync()
                        .context("sync triggered by auto_sync_threshold")?;
                }
            }

            Ok(())
        })();

        result
            .context(|| {
                // Avoid dumping huge payloads into the error message.
                let data = data.as_ref();
                if data.len() < 128 {
                    format!("in Log::append({:?})", data)
                } else {
                    format!("in Log::append(<a {}-byte long slice>)", data.len())
                }
            })
            .context(|| format!("  Log.dir = {:?}", self.dir))
    }
343
344    /// Remove dirty (in-memory) state. Restore the [`Log`] to the state as
345    /// if it's just loaded from disk without modifications.
346    pub fn clear_dirty(&mut self) -> crate::Result<()> {
347        let result: crate::Result<_> = (|| {
348            self.maybe_return_index_error()?;
349            for index in self.indexes.iter_mut() {
350                index.clear_dirty();
351            }
352            self.mem_buf.clear();
353            self.all_folds = self.disk_folds.clone();
354            self.update_indexes_for_on_disk_entries()?;
355            Ok(())
356        })();
357        result
358            .context("in Log::clear_dirty")
359            .context(|| format!("  Log.dir = {:?}", self.dir))
360    }
361
362    /// Return a cloned [`Log`] with pending in-memory changes.
363    pub fn try_clone(&self) -> crate::Result<Self> {
364        self.try_clone_internal(true)
365            .context("in Log:try_clone")
366            .context(|| format!("  Log.dir = {:?}", self.dir))
367    }
368
369    /// Return a cloned [`Log`] without pending in-memory changes.
370    ///
371    /// This is logically equivalent to calling `clear_dirty` immediately
372    /// on the result after `try_clone`, but potentially cheaper.
373    pub fn try_clone_without_dirty(&self) -> crate::Result<Self> {
374        self.try_clone_internal(false)
375            .context("in Log:try_clone_without_dirty")
376    }
377
    /// Shared implementation of [`Log::try_clone`] and
    /// [`Log::try_clone_without_dirty`].
    ///
    /// `copy_dirty` decides whether in-memory (dirty) entries, indexes and
    /// folds are carried over to the clone.
    fn try_clone_internal(&self, copy_dirty: bool) -> crate::Result<Self> {
        self.maybe_return_index_error()?;

        // Prepare cloned versions of things.
        let mut indexes = self
            .indexes
            .iter()
            .map(|i| i.try_clone_internal(copy_dirty))
            .collect::<Result<Vec<Index>, _>>()?;
        let disk_buf = self.disk_buf.clone();
        let mem_buf = if copy_dirty {
            self.mem_buf.clone()
        } else {
            Box::pin(Vec::new())
        };

        {
            // Update external key buffer of indexes to point to the new mem_buf.
            // A raw pointer is taken here; see the (UNSAFE NOTICE) comment on
            // ExternalKeyBuffer for why this is sound (mem_buf is pinned and
            // owned by the Log constructed below).
            let mem_buf: &Vec<u8> = &mem_buf;
            let mem_buf: *const Vec<u8> = mem_buf as *const Vec<u8>;
            let index_key_buf = Arc::new(ExternalKeyBuffer {
                disk_buf: disk_buf.clone(),
                disk_len: self.meta.primary_len,
                mem_buf,
            });
            for index in indexes.iter_mut() {
                index.key_buf = index_key_buf.clone();
            }
        }

        // The clone takes its own reader lock (when directory-backed), so
        // destructive writes (ex. repair) stay blocked while it is alive.
        let reader_lock = match self.dir.as_opt_path() {
            Some(d) => Some(ScopedDirLock::new_with_options(d, &READER_LOCK_OPTS)?),
            None => None,
        };

        // Create the new Log.
        let mut log = Log {
            dir: self.dir.clone(),
            disk_buf,
            mem_buf,
            meta: self.meta.clone(),
            indexes,
            disk_folds: self.disk_folds.clone(),
            all_folds: if copy_dirty {
                &self.all_folds
            } else {
                &self.disk_folds
            }
            .clone(),
            index_corrupted: false,
            open_options: self.open_options.clone(),
            reader_lock,
            change_detector: self.change_detector.clone(),
        };

        if !copy_dirty {
            // The indexes can be lagging. Update them.
            // This is similar to what clear_dirty does.
            log.update_indexes_for_on_disk_entries()?;
        }

        Ok(log)
    }
441
    /// Load the latest data from disk. Write in-memory entries to disk.
    ///
    /// After writing, update on-disk indexes. These happen in a same critical
    /// section, protected by a lock on the directory.
    ///
    /// Even if [`Log::append`] is never called, this function has a side effect
    /// updating the [`Log`] to contain latest entries on disk.
    ///
    /// Other [`Log`] instances living in a same process or other processes won't
    /// be notified about the change and they can only access the data
    /// "snapshotted" at open time.
    ///
    /// Return the size of the updated primary log file in bytes.
    ///
    /// For in-memory-only Logs, this function does nothing, and returns 0.
    pub fn sync(&mut self) -> crate::Result<u64> {
        let result: crate::Result<_> = (|| {
            let span = debug_span!("Log::sync", dirty_bytes = self.mem_buf.len());
            if let Some(dir) = &self.dir.as_opt_path() {
                span.record("dir", dir.to_string_lossy().as_ref());
            }
            let _guard = span.enter();

            if self.dir.as_opt_path().is_none() {
                // See Index::flush for why this is not an Err.
                return Ok(0);
            }

            // Error if the on-disk log shrank (violating the append-only
            // invariant) without the epoch changing.
            fn check_append_only(this: &Log, new_meta: &LogMetadata) -> crate::Result<()> {
                let old_meta = &this.meta;
                if old_meta.primary_len > new_meta.primary_len {
                    Err(crate::Error::path(
                        this.dir.as_opt_path().unwrap(),
                        format!(
                            "on-disk log is unexpectedly smaller ({} bytes) than its previous version ({} bytes)",
                            new_meta.primary_len, old_meta.primary_len
                        ),
                    ))
                } else {
                    Ok(())
                }
            }

            // Read-only fast path - no need to take directory lock.
            if self.mem_buf.is_empty() {
                if let Ok(meta) = Self::load_or_create_meta(&self.dir, false) {
                    let changed = self.meta != meta;
                    let truncated = self.meta.epoch != meta.epoch;
                    if !truncated {
                        check_append_only(self, &meta)?;
                    }
                    // No need to reload anything if metadata hasn't changed.
                    if changed {
                        // Indexes cannot be reused, if epoch has changed. Otherwise,
                        // Indexes can be reused, since they do not have new in-memory
                        // entries, and the on-disk primary log is append-only (so data
                        // already present in the indexes is valid).
                        *self = self.open_options.clone().open_internal(
                            &self.dir,
                            if truncated { None } else { Some(&self.indexes) },
                            None,
                        )?;
                    }
                } else {
                    // If meta can not be read, do not error out.
                    // This Log can still be used to answer queries.
                    //
                    // This behavior makes Log more friendly for cases where an
                    // external process does a `rm -rf` and the current process
                    // does a `sync()` just for loading new data. Not erroring
                    // out and pretend that nothing happened.
                }
                self.update_change_detector_to_match_meta();
                return Ok(self.meta.primary_len);
            }

            // Take the lock so no other `flush` runs for this directory. Then reload meta, append
            // log, then update indexes.
            let dir = self.dir.as_opt_path().unwrap().to_path_buf();
            let lock = ScopedDirLock::new(&dir)?;

            // Step 1: Reload metadata to get the latest view of the files.
            let mut meta = Self::load_or_create_meta(&self.dir, false)?;
            let changed = self.meta != meta;
            let truncated = self.meta.epoch != meta.epoch;
            if !truncated {
                check_append_only(self, &meta)?;
            }

            // Cases where Log and Indexes need to be reloaded.
            if changed && self.open_options.flush_filter.is_some() {
                let filter = self.open_options.flush_filter.unwrap();

                // Start with a clean log that does not have dirty entries.
                let mut log = self
                    .open_options
                    .clone()
                    .open_with_lock(&self.dir, &lock)
                    .context("re-open to run flush_filter")?;

                for entry in self.iter_dirty() {
                    let content = entry?;
                    let context = FlushFilterContext { log: &log };
                    // Re-insert entries to that clean log.
                    match filter(&context, content)
                        .map_err(|err| crate::Error::wrap(err, "failed to run filter function"))?
                    {
                        FlushFilterOutput::Drop => {}
                        FlushFilterOutput::Keep => log.append(content)?,
                        FlushFilterOutput::Replace(content) => log.append(content)?,
                    }
                }

                // Replace "self" so we can continue flushing the updated data.
                *self = log;
            } else if truncated {
                // Reload log and indexes, and re-insert entries.
                let mut log = self
                    .open_options
                    .clone()
                    .open_with_lock(&self.dir, &lock)
                    .context(|| {
                        format!(
                            "re-open since epoch has changed ({} to {})",
                            self.meta.epoch, meta.epoch
                        )
                    })?;

                for entry in self.iter_dirty() {
                    let content = entry?;
                    log.append(content)?;
                }

                // Replace "self" so we can continue flushing the updated data.
                *self = log;
            }

            // Step 2: Append to the primary log.
            let primary_path = self.dir.as_opt_path().unwrap().join(PRIMARY_FILE);
            let mut primary_file = fs::OpenOptions::new()
                .read(true)
                .write(true)
                .open(&primary_path)
                .context(&primary_path, "cannot open for read-write")?;

            // It's possible that the previous write was interrupted. In that case,
            // the length of "log" can be longer than the length of "log" stored in
            // the metadata. Seek to the length specified by the metadata and
            // overwrite (broken) data.
            // This breaks the "append-only" property of the physical file. But all
            // readers use "meta" to decide the length of "log". So "log" is still
            // append-only as seen by readers, as long as the length specified in
            // "meta" is append-only (i.e. "meta" is not rewritten to have a smaller
            // length, and all bytes in the specified length are immutable).
            // Note: file.set_len might easily fail on Windows due to mmap.
            let pos = primary_file
                .seek(SeekFrom::Start(meta.primary_len))
                .context(&primary_path, || {
                    format!("cannot seek to {}", meta.primary_len)
                })?;
            if pos != meta.primary_len {
                let msg = format!(
                    "log file {} has {} bytes, expect at least {} bytes",
                    primary_path.to_string_lossy(),
                    pos,
                    meta.primary_len
                );
                // This might be another process re-creating the file.
                // Do not consider this as a corruption (?).
                // TODO: Review this decision.
                let err = crate::Error::path(&primary_path, msg);
                return Err(err);
            }

            // Actually write the primary log. Once it's written, we can remove the in-memory buffer.
            primary_file
                .write_all(&self.mem_buf)
                .context(&primary_path, || {
                    format!("cannot write data ({} bytes)", self.mem_buf.len())
                })?;

            if self.open_options.fsync || config::get_global_fsync() {
                primary_file
                    .sync_all()
                    .context(&primary_path, "cannot fsync")?;
            }

            // The dirty entries are now on disk; account for them in meta and
            // drop the in-memory copy.
            meta.primary_len += self.mem_buf.len() as u64;
            self.mem_buf.clear();

            // Step 3: Reload primary log and indexes to get the latest view.
            let (disk_buf, indexes) = Self::load_log_and_indexes(
                &self.dir,
                &meta,
                &self.open_options.index_defs,
                &self.mem_buf,
                if changed {
                    // Existing indexes cannot be reused.
                    None
                } else {
                    // Indexes can be reused, because they already contain all entries
                    // that were just written to disk and the on-disk files do not
                    // have new entries (tested by "self.meta != meta" in Step 1).
                    //
                    // The indexes contain all entries, because they were previously
                    // "always-up-to-date", and the on-disk log does not have anything new.
                    // Update "meta" so "update_indexes_for_on_disk_entries" below won't
                    // re-index entries.
                    //
                    // This is needed because `Log::append` updated indexes in-memory but
                    // did not update their metadata for performance. This is to update
                    // the metadata stored in Indexes.
                    Self::set_index_log_len(self.indexes.iter_mut(), meta.primary_len);
                    Some(&self.indexes)
                },
                self.open_options.fsync,
            )?;

            self.disk_buf = disk_buf;
            self.indexes = indexes;
            self.meta = meta;

            // Step 4: Update the indexes and folds. Optionally flush them.
            self.update_indexes_for_on_disk_entries()?;
            let lagging_index_ids = self.lagging_index_ids();
            self.flush_lagging_indexes(&lagging_index_ids, &lock)?;
            self.update_and_flush_disk_folds()?;
            self.all_folds = self.disk_folds.clone();

            // Step 5: Write the updated meta file.
            self.dir.write_meta(&self.meta, self.open_options.fsync)?;

            // Bump the change detector to communicate the change.
            self.update_change_detector_to_match_meta();

            Ok(self.meta.primary_len)
        })();

        result
            .context("in Log::sync")
            .context(|| format!("  Log.dir = {:?}", self.dir))
    }
684
685    pub(crate) fn update_change_detector_to_match_meta(&self) {
686        if let Some(detector) = &self.change_detector {
687            detector.set(self.meta.primary_len ^ self.meta.epoch);
688        }
689    }
690
691    /// Write (updated) lagging indexes back to disk.
692    /// Usually called after `update_indexes_for_on_disk_entries`.
693    /// This function might change `self.meta`. Be sure to write `self.meta` to
694    /// save the result.
695    pub(crate) fn flush_lagging_indexes(
696        &mut self,
697        index_ids: &[usize],
698        _lock: &ScopedDirLock,
699    ) -> crate::Result<()> {
700        for &index_id in index_ids.iter() {
701            let metaname = self.open_options.index_defs[index_id].metaname();
702            let new_length = self.indexes[index_id].flush();
703            let new_length = self.maybe_set_index_error(new_length.map_err(Into::into))?;
704            self.meta.indexes.insert(metaname, new_length);
705            trace!(
706                name = "Log::flush_lagging_index",
707                index_name = self.open_options.index_defs[index_id].name.as_str(),
708                new_index_length = new_length,
709            );
710        }
711        Ok(())
712    }
713
714    /// Return the index to indexes that are considered lagging.
715    /// This is usually followed by `update_indexes_for_on_disk_entries`.
716    pub(crate) fn lagging_index_ids(&self) -> Vec<usize> {
717        let log_bytes = self.meta.primary_len;
718        self.open_options
719            .index_defs
720            .iter()
721            .enumerate()
722            .filter(|(i, def)| {
723                let indexed_bytes = Self::get_index_log_len(&self.indexes[*i], false).unwrap_or(0);
724                let lag_bytes = log_bytes.max(indexed_bytes) - indexed_bytes;
725                let lag_threshold = def.lag_threshold;
726                trace!(
727                    name = "Log::is_index_lagging",
728                    index_name = def.name.as_str(),
729                    lag = lag_bytes,
730                    threshold = lag_threshold
731                );
732                lag_bytes > lag_threshold
733            })
734            .map(|(i, _def)| i)
735            .collect()
736    }
737
738    /// Returns `true` if `sync` will load more data on disk.
739    ///
740    /// This function is optimized to be called frequently. It does not access
741    /// the filesystem directly, but communicate using a shared mmap buffer.
742    ///
743    /// This is not about testing buffered pending changes. To access buffered
744    /// pending changes, use [`Log::iter_dirty`] instead.
745    ///
746    /// For an in-memory [`Log`], this always returns `false`.
747    pub fn is_changed_on_disk(&self) -> bool {
748        match &self.change_detector {
749            None => false,
750            Some(detector) => detector.is_changed(),
751        }
752    }
753
    /// Renamed. Use [`Log::sync`] instead.
    ///
    /// Kept for backward compatibility; simply delegates to [`Log::sync`].
    pub fn flush(&mut self) -> crate::Result<u64> {
        self.sync()
    }
758
    /// Convert a slice to [`Bytes`].
    /// Do not copy the slice if it's from the main on-disk buffer
    /// (`disk_buf`); in that case the returned [`Bytes`] is zero-copy.
    pub fn slice_to_bytes(&self, slice: &[u8]) -> Bytes {
        self.disk_buf.slice_to_bytes(slice)
    }
764
    /// Convert a slice to [`Bytes`].
    /// Do not copy the slice if it's from the specified index buffer.
    ///
    /// NOTE: `index_id` indexes `self.indexes` directly and panics if it is
    /// out of range (unlike [`Log::lookup`], which returns an error).
    pub fn index_slice_to_bytes(&self, index_id: usize, slice: &[u8]) -> Bytes {
        self.indexes[index_id].slice_to_bytes(slice)
    }
770
    /// Make sure on-disk indexes are up-to-date with the primary log, regardless
    /// of `lag_threshold`.
    ///
    /// This is used internally by [`RotateLog`] to make sure a [`Log`] has
    /// complete indexes before rotating.
    ///
    /// Precondition: pending data must have been written out via `sync`
    /// (a programming error is returned if `mem_buf` is non-empty).
    ///
    /// NOTE(review): besides the caller-provided `_lock`, this takes another
    /// lock on `self.dir` itself — presumably `_lock` guards an outer
    /// (RotateLog) directory while the inner lock guards this Log's own
    /// directory; confirm against RotateLog's locking scheme.
    pub(crate) fn finalize_indexes(&mut self, _lock: &ScopedDirLock) -> crate::Result<()> {
        let result: crate::Result<_> = (|| {
            let dir = self.dir.clone();
            // In-memory Logs have no on-disk indexes; nothing to do.
            if let Some(dir) = dir.as_opt_path() {
                if !self.mem_buf.is_empty() {
                    return Err(crate::Error::programming(
                        "sync() should be called before finalize_indexes()",
                    ));
                }

                let _lock = ScopedDirLock::new(dir)?;

                // Reload meta to detect concurrent writes since we loaded ours.
                let meta = Self::load_or_create_meta(&self.dir, false)?;
                // Only check primary_len, not meta.indexes. This is because
                // meta.indexes can be updated on open. See D38261693 (test)
                // and D20042046 (update index on open).
                //
                // More details:
                // For RotateLog it has 2 levels of directories and locks, like:
                // - rotate/lock: lock when RotateLog is writing
                // - rotate/0/: Previous (considered by RotateLog as read-only) Log
                // - rotate/1/: Previous (considered by RotateLog as read-only) Log
                // - rotate/2/lock: lock when this Log is being written
                // - rotate/2/: "Current" (writable) Log
                //
                // However, when opening rotate/0 as a Log, it might change the indexes
                // without being noticed by other RotateLogs. If the indexes are updated,
                // then the meta would be changed. The primary len is not changed, though.
                if self.meta.primary_len != meta.primary_len || self.meta.epoch != meta.epoch {
                    return Err(crate::Error::programming(format!(
                        "race detected, callsite responsible for preventing races (old meta: {:?}, new meta: {:?})",
                        &self.meta, &meta
                    )));
                }
                // Adopt the freshly loaded meta (it may carry newer
                // meta.indexes entries written by another opener).
                self.meta = meta;

                // Flush all indexes.
                for i in 0..self.indexes.len() {
                    let new_length = self.indexes[i].flush();
                    // Record any index error so later operations fail fast.
                    let new_length = self.maybe_set_index_error(new_length.map_err(Into::into))?;
                    let name = self.open_options.index_defs[i].metaname();
                    self.meta.indexes.insert(name, new_length);
                }

                // Persist the updated index lengths in meta.
                self.dir.write_meta(&self.meta, self.open_options.fsync)?;
            }
            Ok(())
        })();
        result
            .context("in Log::finalize_indexes")
            .context(|| format!("  Log.dir = {:?}", self.dir))
    }
828
829    /// Rebuild indexes.
830    ///
831    /// If `force` is `false`, then indexes that pass the checksum check
832    /// will not be rebuilt. Otherwise, they will be rebuilt regardless.
833    ///
834    /// Setting `force` to `true` might reduce the size used by the index
835    /// files. But that is more expensive.
836    ///
837    /// The function consumes the [`Log`] object, since it is hard to recover
838    /// from an error case.
839    ///
840    /// Return message useful for human consumption.
841    pub fn rebuild_indexes(self, force: bool) -> crate::Result<String> {
842        let dir = self.dir.clone();
843        let result: crate::Result<_> = (|this: Log| {
844            if let Some(dir) = this.dir.clone().as_opt_path() {
845                let lock = ScopedDirLock::new(dir)?;
846                this.rebuild_indexes_with_lock(force, &lock)
847            } else {
848                Ok(String::new())
849            }
850        })(self);
851
852        result
853            .context(|| format!("in Log::rebuild_indexes(force={})", force))
854            .context(|| format!("  Log.dir = {:?}", dir))
855    }
856
    /// Rebuild on-disk indexes while holding the directory lock.
    ///
    /// For each index definition: optionally skip it (when `force` is false
    /// and the index verifies cleanly), otherwise rebuild it into a temp file
    /// and atomically swap it in, updating meta before and after the swap so
    /// readers never observe an index length inconsistent with its data.
    ///
    /// Consumes `self` because the old index objects must be dropped
    /// (munmapped) before their files can be rewritten on Windows.
    fn rebuild_indexes_with_lock(
        mut self,
        force: bool,
        _lock: &ScopedDirLock,
    ) -> crate::Result<String> {
        let mut message = String::new();
        {
            if let Some(ref dir) = self.dir.as_opt_path() {
                for (i, def) in self.open_options.index_defs.iter().enumerate() {
                    let name = def.name.as_str();

                    if let Some(index) = &self.indexes.get(i) {
                        // Decide whether this index can be kept as-is.
                        let should_skip = if force {
                            false
                        } else {
                            match Self::get_index_log_len(index, true) {
                                Err(_) => false,
                                Ok(len) => {
                                    if len > self.meta.primary_len {
                                        // Index covers more bytes than the log
                                        // has — the log was truncated; rebuild.
                                        message += &format!(
                                            "Index {:?} is incompatible with (truncated) log\n",
                                            name
                                        );
                                        false
                                    } else if index.verify().is_ok() {
                                        message +=
                                            &format!("Index {:?} passed integrity check\n", name);
                                        true
                                    } else {
                                        message +=
                                            &format!("Index {:?} failed integrity check\n", name);
                                        false
                                    }
                                }
                            }
                        };
                        if should_skip {
                            continue;
                        } else {
                            // Replace the index with a dummy, empty one.
                            //
                            // This will munmap index files, which is required on
                            // Windows to rewrite the index files. It's also the reason
                            // why it's hard to recover from an error state.
                            //
                            // This is also why this function consumes the Log object.
                            self.indexes[i] = index::OpenOptions::new().create_in_memory()?;
                        }
                    }

                    // Rebuild into a temp file in the same directory so the
                    // final `persist` below is an atomic rename.
                    let tmp = tempfile::NamedTempFile::new_in(dir).context(dir, || {
                        format!("cannot create tempfile for rebuilding index {:?}", name)
                    })?;
                    let index_len = {
                        let mut index = index::OpenOptions::new()
                            .key_buf(Some(Arc::new(self.disk_buf.clone())))
                            .open(tmp.path())?;
                        // Replay every on-disk entry into the fresh index.
                        Self::update_index_for_on_disk_entry_unchecked(
                            &self.dir,
                            &mut index,
                            def,
                            &self.disk_buf,
                            self.meta.primary_len,
                        )?;
                        index.flush()?
                    };

                    // Before replacing the index, set its "logic length" to 0 so
                    // readers won't get inconsistent view about index length and data.
                    let meta_path = dir.join(META_FILE);
                    self.meta.indexes.insert(def.metaname(), 0);
                    self.meta
                        .write_file(&meta_path, self.open_options.fsync)
                        .context(|| format!("  before replacing index {:?})", name))?;

                    // Best-effort permission fix; failure is non-fatal.
                    let _ = utils::fix_perm_file(tmp.as_file(), false);

                    let path = dir.join(def.filename());
                    tmp.persist(&path).map_err(|e| {
                        crate::Error::wrap(Box::new(e), || {
                            format!("cannot persist tempfile to replace index {:?}", name)
                        })
                    })?;

                    // Now the new file is in place: publish its real length.
                    self.meta.indexes.insert(def.metaname(), index_len);
                    self.meta
                        .write_file(&meta_path, self.open_options.fsync)
                        .context(|| format!("  after replacing index {:?}", name))?;
                    message += &format!("Rebuilt index {:?}\n", name);
                }
            }
        }

        Ok(message)
    }
952
953    /// Look up an entry using the given index. The `index_id` is the index of
954    /// `index_defs` passed to [`Log::open`].
955    ///
956    /// Return an iterator of `Result<&[u8]>`, in reverse insertion order.
957    pub fn lookup<K: AsRef<[u8]>>(&self, index_id: usize, key: K) -> crate::Result<LogLookupIter> {
958        let result: crate::Result<_> = (|| {
959            self.maybe_return_index_error()?;
960            if let Some(index) = self.indexes.get(index_id) {
961                assert!(!key.as_ref().is_empty());
962                let link_offset = index.get(&key)?;
963                let inner_iter = link_offset.values(index);
964                Ok(LogLookupIter {
965                    inner_iter,
966                    errored: false,
967                    log: self,
968                })
969            } else {
970                let msg = format!(
971                    "invalid index_id {} (len={}, path={:?})",
972                    index_id,
973                    self.indexes.len(),
974                    &self.dir
975                );
976                Err(crate::Error::programming(msg))
977            }
978        })();
979        result
980            .context(|| format!("in Log::lookup({}, {:?})", index_id, key.as_ref()))
981            .context(|| format!("  Log.dir = {:?}", self.dir))
982    }
983
984    /// Look up keys and entries using the given prefix.
985    /// The `index_id` is the index of `index_defs` passed to [`Log::open`].
986    ///
987    /// Return an iterator that yields `(key, iter)`, where `key` is the full
988    /// key, `iter` is [`LogLookupIter`] that allows iteration through matched
989    /// entries.
990    pub fn lookup_prefix<K: AsRef<[u8]>>(
991        &self,
992        index_id: usize,
993        prefix: K,
994    ) -> crate::Result<LogRangeIter> {
995        let prefix = prefix.as_ref();
996        let result: crate::Result<_> = (|| {
997            let index = self.indexes.get(index_id).unwrap();
998            let inner_iter = index.scan_prefix(prefix)?;
999            Ok(LogRangeIter {
1000                inner_iter,
1001                errored: false,
1002                log: self,
1003                index,
1004            })
1005        })();
1006        result
1007            .context(|| format!("in Log::lookup_prefix({}, {:?})", index_id, prefix))
1008            .context(|| format!("  Log.dir = {:?}", self.dir))
1009    }
1010
1011    /// Look up keys and entries by querying a specified index about a specified
1012    /// range.
1013    ///
1014    /// The `index_id` is the index of `index_defs` defined by [`OpenOptions`].
1015    ///
1016    /// Return an iterator that yields `(key, iter)`, where `key` is the full
1017    /// key, `iter` is [`LogLookupIter`] that allows iteration through entries
1018    /// matching that key.
1019    pub fn lookup_range<'a>(
1020        &self,
1021        index_id: usize,
1022        range: impl RangeBounds<&'a [u8]>,
1023    ) -> crate::Result<LogRangeIter> {
1024        let start = range.start_bound();
1025        let end = range.end_bound();
1026        let result: crate::Result<_> = (|| {
1027            let index = self.indexes.get(index_id).unwrap();
1028            let inner_iter = index.range((start, end))?;
1029            Ok(LogRangeIter {
1030                inner_iter,
1031                errored: false,
1032                log: self,
1033                index,
1034            })
1035        })();
1036        result
1037            .context(|| {
1038                format!(
1039                    "in Log::lookup_range({}, {:?} to {:?})",
1040                    index_id, start, end,
1041                )
1042            })
1043            .context(|| format!("  Log.dir = {:?}", self.dir))
1044    }
1045
1046    /// Look up keys and entries using the given hex prefix.
1047    /// The length of the hex string can be odd.
1048    ///
1049    /// Return an iterator that yields `(key, iter)`, where `key` is the full
1050    /// key, `iter` is [`LogLookupIter`] that allows iteration through matched
1051    /// entries.
1052    pub fn lookup_prefix_hex<K: AsRef<[u8]>>(
1053        &self,
1054        index_id: usize,
1055        hex_prefix: K,
1056    ) -> crate::Result<LogRangeIter> {
1057        let prefix = hex_prefix.as_ref();
1058        let result: crate::Result<_> = (|| {
1059            let index = self.indexes.get(index_id).unwrap();
1060            let inner_iter = index.scan_prefix_hex(prefix)?;
1061            Ok(LogRangeIter {
1062                inner_iter,
1063                errored: false,
1064                log: self,
1065                index,
1066            })
1067        })();
1068        result
1069            .context(|| format!("in Log::lookup_prefix_hex({}, {:?})", index_id, prefix))
1070            .context(|| format!("  Log.dir = {:?}", self.dir))
1071    }
1072
1073    /// Return an iterator for all entries.
1074    pub fn iter(&self) -> LogIter {
1075        LogIter {
1076            log: self,
1077            next_offset: PRIMARY_START_OFFSET,
1078            errored: false,
1079        }
1080    }
1081
1082    /// Return an iterator for in-memory entries that haven't been flushed to disk.
1083    ///
1084    /// For in-memory Logs, this is the same as [`Log::iter`].
1085    pub fn iter_dirty(&self) -> LogIter {
1086        LogIter {
1087            log: self,
1088            next_offset: self.meta.primary_len,
1089            errored: false,
1090        }
1091    }
1092
1093    /// Applies the given index function to the entry data and returns the index keys.
1094    pub fn index_func<'a>(
1095        &self,
1096        index_id: usize,
1097        entry: &'a [u8],
1098    ) -> crate::Result<Vec<Cow<'a, [u8]>>> {
1099        let index_def = self.get_index_def(index_id)?;
1100        let mut result = vec![];
1101        for output in (index_def.func)(entry).into_iter() {
1102            result.push(
1103                output
1104                    .into_cow(entry)
1105                    .context(|| format!("index_id = {}", index_id))?,
1106            );
1107        }
1108
1109        Ok(result)
1110    }
1111
1112    /// Return the fold state after calling `accumulate` on all (on-disk and
1113    /// in-memory) entries in insertion order.
1114    ///
1115    /// The fold function is the `fold_id`-th (0-based) `FoldDef` in
1116    /// [`OpenOptions`].
1117    pub fn fold(&self, fold_id: usize) -> crate::Result<&dyn Fold> {
1118        match self.all_folds.get(fold_id) {
1119            Some(f) => Ok(f.fold.as_ref()),
1120            None => Err(self.fold_out_of_bound(fold_id)),
1121        }
1122    }
1123
1124    fn fold_out_of_bound(&self, fold_id: usize) -> crate::Error {
1125        let msg = format!(
1126            "fold_id {} is out of bound (len={}, dir={:?})",
1127            fold_id,
1128            self.open_options.fold_defs.len(),
1129            &self.dir
1130        );
1131        crate::Error::programming(msg)
1132    }
1133
    /// Build in-memory index for the newly added entry.
    ///
    /// `offset` is the logical start offset of the entry.
    /// `data_offset` is the logical start offset of the real data (skips
    /// length, and checksum header in the entry).
    ///
    /// Any error from the unchecked variant is passed through
    /// `maybe_set_index_error` so later operations can fail fast (see
    /// `maybe_return_index_error` in `lookup`).
    fn update_indexes_for_in_memory_entry(
        &mut self,
        data: &[u8],
        offset: u64,
        data_offset: u64,
    ) -> crate::Result<()> {
        let result = self.update_indexes_for_in_memory_entry_unchecked(data, offset, data_offset);
        self.maybe_set_index_error(result)
    }
1148
    /// Similar to `update_indexes_for_in_memory_entry`. But updates `fold` instead.
    ///
    /// `offset` is the logical start offset of the entry; `data_offset` is
    /// the logical start offset of the entry's data payload.
    fn update_fold_for_in_memory_entry(
        &mut self,
        data: &[u8],
        offset: u64,
        data_offset: u64,
    ) -> crate::Result<()> {
        for fold_state in self.all_folds.iter_mut() {
            // NOTE(review): the third argument is `data_offset + data.len()`,
            // i.e. the logical END offset of the data — confirm this matches
            // `process_entry`'s expected parameter.
            fold_state.process_entry(data, offset, data_offset + data.len() as u64)?;
        }
        Ok(())
    }
1161
    /// Unchecked variant of `update_indexes_for_in_memory_entry`: applies
    /// every index function to `data` and updates the corresponding index.
    /// Errors are returned directly without being recorded.
    fn update_indexes_for_in_memory_entry_unchecked(
        &mut self,
        data: &[u8],
        offset: u64,
        data_offset: u64,
    ) -> crate::Result<()> {
        for (index, def) in self.indexes.iter_mut().zip(&self.open_options.index_defs) {
            for index_output in (def.func)(data) {
                match index_output {
                    IndexOutput::Reference(range) => {
                        // The referenced range must lie within the entry data.
                        assert!(range.start <= range.end && range.end <= data.len() as u64);
                        // Shift entry-relative range to logical offsets so the
                        // index can read the key bytes from the shared buffer
                        // without copying.
                        let start = range.start + data_offset;
                        let end = range.end + data_offset;
                        let key = InsertKey::Reference((start, end - start));
                        index.insert_advanced(key, InsertValue::Prepend(offset))?;
                    }
                    IndexOutput::Owned(key) => {
                        // Key bytes are embedded (copied) into the index.
                        let key = InsertKey::Embed(&key);
                        index.insert_advanced(key, InsertValue::Prepend(offset))?;
                    }
                    IndexOutput::Remove(key) => {
                        index.remove(key)?;
                    }
                    IndexOutput::RemovePrefix(key) => {
                        index.remove_prefix(key)?;
                    }
                }
            }
        }
        Ok(())
    }
1193
    /// Incrementally update `disk_folds`.
    ///
    /// This is done by trying to reuse fold states from disk.
    /// If the on-disk fold state is outdated, then new fold states will be
    /// written to disk.
    fn update_and_flush_disk_folds(&mut self) -> crate::Result<()> {
        let mut folds = self.open_options.empty_folds();
        // Temporarily swap so `catch_up_with_log_on_disk_entries` can access `self`.
        std::mem::swap(&mut self.disk_folds, &mut folds);
        let result = (|| -> crate::Result<()> {
            for fold_state in folds.iter_mut() {
                fold_state.catch_up_with_log_on_disk_entries(self)?;
            }
            Ok(())
        })();
        // Swap back even on error so `self.disk_folds` is restored and the
        // error can still be propagated below.
        self.disk_folds = folds;
        result
    }
1212
    /// Build in-memory index so they cover all entries stored in `self.disk_buf`.
    ///
    /// Returns `Ok(())` on success; any index error is recorded via
    /// `maybe_set_index_error` so later operations fail fast. (The per-index
    /// entry counts computed internally are not returned.)
    fn update_indexes_for_on_disk_entries(&mut self) -> crate::Result<()> {
        let result = self.update_indexes_for_on_disk_entries_unchecked();
        self.maybe_set_index_error(result)
    }
1220
1221    fn update_indexes_for_on_disk_entries_unchecked(&mut self) -> crate::Result<()> {
1222        // It's a programming error to call this when mem_buf is not empty.
1223        for (index, def) in self.indexes.iter_mut().zip(&self.open_options.index_defs) {
1224            Self::update_index_for_on_disk_entry_unchecked(
1225                &self.dir,
1226                index,
1227                def,
1228                &self.disk_buf,
1229                self.meta.primary_len,
1230            )?;
1231        }
1232        Ok(())
1233    }
1234
    /// Catch a single index up with the on-disk primary log.
    ///
    /// Starts from the offset stored in the index meta (the next offset to be
    /// indexed) and replays each on-disk entry up to the end of `disk_buf`,
    /// feeding `def.func` outputs into `index`. Finally records `primary_len`
    /// as the new indexed length in the index meta.
    ///
    /// Returns how many entries were processed (i.e. how many times the index
    /// function was called).
    fn update_index_for_on_disk_entry_unchecked(
        path: &GenericPath,
        index: &mut Index,
        def: &IndexDef,
        disk_buf: &Bytes,
        primary_len: u64,
    ) -> crate::Result<usize> {
        // The index meta is used to store the next offset the index should be built.
        let mut offset = Self::get_index_log_len(index, true)?;
        // How many times the index function gets called?
        let mut count = 0;
        // PERF: might be worthwhile to cache xxhash verification result.
        while let Some(entry_result) =
            Self::read_entry_from_buf(path, disk_buf, offset).context(|| {
                format!(
                    "while updating index {:?} for on-disk entry at {}",
                    def.name, offset
                )
            })?
        {
            count += 1;
            let data = entry_result.data;
            for index_output in (def.func)(data) {
                match index_output {
                    IndexOutput::Reference(range) => {
                        // The referenced range must lie within the entry data.
                        assert!(range.start <= range.end && range.end <= data.len() as u64);
                        // Shift entry-relative range to absolute buffer offsets.
                        let start = range.start + entry_result.data_offset;
                        let end = range.end + entry_result.data_offset;
                        let key = InsertKey::Reference((start, end - start));

                        index.insert_advanced(key, InsertValue::Prepend(offset))?;
                    }
                    IndexOutput::Owned(key) => {
                        // Key bytes are embedded (copied) into the index.
                        let key = InsertKey::Embed(&key);
                        index.insert_advanced(key, InsertValue::Prepend(offset))?;
                    }
                    IndexOutput::Remove(key) => {
                        index.remove(key)?;
                    }
                    IndexOutput::RemovePrefix(key) => {
                        index.remove_prefix(key)?;
                    }
                }
            }
            offset = entry_result.next_offset;
        }
        // The index now contains all entries. Write "next_offset" as the index meta.
        Self::set_index_log_len(std::iter::once(index), primary_len);

        Ok(count)
    }
1286
    /// Read [`LogMetadata`] from the given directory. If `create` is `true`,
    /// create an empty one on demand.
    ///
    /// The caller should ensure the directory exists and take a lock on it to
    /// avoid filesystem races.
    ///
    /// Thin wrapper around `load_or_create_meta_internal`.
    pub(crate) fn load_or_create_meta(
        path: &GenericPath,
        create: bool,
    ) -> crate::Result<LogMetadata> {
        Self::load_or_create_meta_internal(path, create)
    }
1298
    /// Read the meta file, or (when `create` is set and the meta file is
    /// missing) initialize a fresh empty log: write the primary file header
    /// and an empty meta.
    pub(crate) fn load_or_create_meta_internal(
        path: &GenericPath,
        create: bool,
    ) -> crate::Result<LogMetadata> {
        match path.read_meta() {
            Err(err) => {
                // Only a missing meta file with `create` requested triggers
                // initialization; all other errors are propagated.
                if err.io_error_kind() == io::ErrorKind::NotFound && create {
                    // NOTE(review): `unwrap()` assumes `path` is an on-disk
                    // path in this branch — presumably an in-memory
                    // GenericPath cannot produce a NotFound here; confirm.
                    let dir = path.as_opt_path().unwrap();
                    // Create (and truncate) the primary log and indexes.
                    let primary_path = dir.join(PRIMARY_FILE);
                    let mut primary_file =
                        File::create(&primary_path).context(&primary_path, "cannot create")?;
                    primary_file
                        .write_all(PRIMARY_HEADER)
                        .context(&primary_path, "cannot write")?;
                    // Best-effort permission fix; failure is non-fatal.
                    let _ = utils::fix_perm_file(&primary_file, false);
                    // Start from empty file and indexes.
                    let meta = LogMetadata::new_with_primary_len(PRIMARY_START_OFFSET);
                    // An empty meta file is easy to recreate. No need to use fsync.
                    path.write_meta(&meta, false)?;
                    Ok(meta)
                } else {
                    Err(err)
                }
            }
            Ok(meta) => Ok(meta),
        }
    }
1327
    /// Read `(log.disk_buf, indexes)` from the directory using the metadata.
    ///
    /// If `reuse_indexes` is not None, they are existing indexes that match `index_defs`
    /// order. This should only be used in `sync` code path when the on-disk `meta` matches
    /// the in-memory `meta`. Otherwise it is not a sound use.
    ///
    /// The indexes loaded by this function can be lagging.
    /// Use `update_indexes_for_on_disk_entries` to update them.
    fn load_log_and_indexes(
        dir: &GenericPath,
        meta: &LogMetadata,
        index_defs: &[IndexDef],
        mem_buf: &Pin<Box<Vec<u8>>>,
        reuse_indexes: Option<&Vec<Index>>,
        fsync: bool,
    ) -> crate::Result<(Bytes, Vec<Index>)> {
        // mmap only the portion of the primary file covered by meta
        // (meta.primary_len), not whatever happens to be on disk.
        let primary_buf = match dir.as_opt_path() {
            Some(dir) => mmap_path(&dir.join(PRIMARY_FILE), meta.primary_len)?,
            None => Bytes::new(),
        };

        // Store a raw pointer to the in-memory buffer in the shared key
        // buffer. `mem_buf` is behind Pin<Box<..>>, so its address is stable
        // for the Log's lifetime, which is what makes this pointer usable.
        let mem_buf: &Vec<u8> = mem_buf;
        let mem_buf: *const Vec<u8> = mem_buf as *const Vec<u8>;
        let key_buf = Arc::new(ExternalKeyBuffer {
            disk_buf: primary_buf.clone(),
            disk_len: meta.primary_len,
            mem_buf,
        });

        let indexes = match reuse_indexes {
            None => {
                // No indexes are reused, reload them.
                let mut indexes = Vec::with_capacity(index_defs.len());
                for def in index_defs.iter() {
                    // Missing meta entry means the index is empty (len 0).
                    let index_len = meta.indexes.get(&def.metaname()).cloned().unwrap_or(0);
                    indexes.push(Self::load_index(
                        dir,
                        def,
                        index_len,
                        key_buf.clone(),
                        fsync,
                    )?);
                }
                indexes
            }
            Some(indexes) => {
                assert_eq!(index_defs.len(), indexes.len());
                let mut new_indexes = Vec::with_capacity(indexes.len());
                // Avoid reloading the index from disk.
                // Update their ExternalKeyBuffer so they have the updated meta.primary_len.
                for (index, def) in indexes.iter().zip(index_defs) {
                    let index_len = meta.indexes.get(&def.metaname()).cloned().unwrap_or(0);
                    // If on-disk index grew beyond what this in-memory index
                    // covers, a reload is required; otherwise clone cheaply.
                    let index = if index_len > Self::get_index_log_len(index, true).unwrap_or(0) {
                        Self::load_index(dir, def, index_len, key_buf.clone(), fsync)?
                    } else {
                        let mut index = index.try_clone()?;
                        index.key_buf = key_buf.clone();
                        index
                    };
                    new_indexes.push(index);
                }
                new_indexes
            }
        };
        Ok((primary_buf, indexes))
    }
1394
    /// Return the reference to the [`GenericPath`] used to create the [`Log`].
    pub fn path(&self) -> &GenericPath {
        &self.dir
    }
1399
    /// Return the version in `(epoch, length)` form.
    ///
    /// The version is maintained exclusively by indexedlog and cannot be
    /// changed directly via public APIs. Appending data bumps `length`.
    /// Rewriting data changes `epoch`.
    ///
    /// Concretely, `epoch` is `meta.epoch` and `length` is
    /// `meta.primary_len` (bytes in the primary log).
    ///
    /// See also [`crate::multi::MultiLog::version`].
    pub fn version(&self) -> (u64, u64) {
        (self.meta.epoch, self.meta.primary_len)
    }
1410
1411    /// Load a single index.
1412    fn load_index(
1413        dir: &GenericPath,
1414        def: &IndexDef,
1415        len: u64,
1416        buf: Arc<dyn ReadonlyBuffer + Send + Sync>,
1417        fsync: bool,
1418    ) -> crate::Result<Index> {
1419        match dir.as_opt_path() {
1420            Some(dir) => {
1421                let path = dir.join(def.filename());
1422                index::OpenOptions::new()
1423                    .checksum_chunk_size_logarithm(INDEX_CHECKSUM_CHUNK_SIZE_LOGARITHM)
1424                    .logical_len(Some(len))
1425                    .key_buf(Some(buf))
1426                    .fsync(fsync)
1427                    .open(path)
1428            }
1429            None => index::OpenOptions::new()
1430                .logical_len(Some(len))
1431                .key_buf(Some(buf))
1432                .fsync(fsync)
1433                .create_in_memory(),
1434        }
1435    }
1436
    /// Read the entry at the given offset. Return `None` if offset is out of bound, or the content
    /// of the data, the real offset of the data, and the next offset. Raise errors if
    /// integrity-check failed.
    ///
    /// Offsets below `meta.primary_len` are served from the on-disk buffer;
    /// larger (logical) offsets are translated into positions in the
    /// in-memory buffer.
    fn read_entry(&self, offset: u64) -> crate::Result<Option<EntryResult>> {
        let result = if offset < self.meta.primary_len {
            let entry = Self::read_entry_from_buf(&self.dir, &self.disk_buf, offset)?;
            if let Some(ref entry) = entry {
                // NOTE(review): presumably informs the page-out heuristic
                // about bytes newly touched in the mmap — confirm against
                // crate::page_out.
                crate::page_out::adjust_available(-(entry.data.len() as i64));
            }
            entry
        } else {
            // Translate the logical offset to a mem_buf-relative one.
            let offset = offset - self.meta.primary_len;
            if offset >= self.mem_buf.len() as u64 {
                return Ok(None);
            }
            // Shift the resulting offsets back to logical space.
            Self::read_entry_from_buf(&self.dir, &self.mem_buf, offset)?
                .map(|entry_result| entry_result.offset(self.meta.primary_len))
        };
        Ok(result)
    }
1457
1458    /// Read an entry at the given offset of the given buffer. Verify its integrity. Return the
1459    /// data, the real data offset, and the next entry offset. Return None if the offset is at
1460    /// the end of the buffer.  Raise errors if there are integrity check issues.
1461    fn read_entry_from_buf<'a>(
1462        path: &GenericPath,
1463        buf: &'a [u8],
1464        offset: u64,
1465    ) -> crate::Result<Option<EntryResult<'a>>> {
1466        let data_error = |msg: String| -> crate::Error {
1467            match path.as_opt_path() {
1468                Some(path) => crate::Error::corruption(path, msg),
1469                None => crate::Error::path(Path::new("<memory>"), msg),
1470            }
1471        };
1472
1473        use std::cmp::Ordering::Equal;
1474        use std::cmp::Ordering::Greater;
1475        match offset.cmp(&(buf.len() as u64)) {
1476            Equal => return Ok(None),
1477            Greater => {
1478                let msg = format!("read offset {} exceeds buffer size {}", offset, buf.len());
1479                return Err(data_error(msg));
1480            }
1481            _ => {}
1482        }
1483
1484        let (entry_flags, vlq_len): (u32, _) = buf.read_vlq_at(offset as usize).map_err(|e| {
1485            crate::Error::wrap(Box::new(e), || {
1486                format!("cannot read entry_flags at {}", offset)
1487            })
1488            .mark_corruption()
1489        })?;
1490        let offset = offset + vlq_len as u64;
1491
1492        // For now, data_len is the next field regardless of entry flags.
1493        let (data_len, vlq_len): (u64, _) = buf.read_vlq_at(offset as usize).map_err(|e| {
1494            crate::Error::wrap(Box::new(e), || {
1495                format!("cannot read data_len at {}", offset)
1496            })
1497            .mark_corruption()
1498        })?;
1499        let offset = offset + vlq_len as u64;
1500
1501        // Depends on entry_flags, some of them have a checksum field.
1502        let checksum_flags = entry_flags & (ENTRY_FLAG_HAS_XXHASH64 | ENTRY_FLAG_HAS_XXHASH32);
1503        let (checksum, offset) = match checksum_flags {
1504            ENTRY_FLAG_HAS_XXHASH64 => {
1505                let checksum = LittleEndian::read_u64(
1506                    buf.get(offset as usize..offset as usize + 8)
1507                        .ok_or_else(|| {
1508                            data_error(format!("xxhash cannot be read at {}", offset))
1509                        })?,
1510                );
1511                (checksum, offset + 8)
1512            }
1513            ENTRY_FLAG_HAS_XXHASH32 => {
1514                let checksum = LittleEndian::read_u32(
1515                    buf.get(offset as usize..offset as usize + 4)
1516                        .ok_or_else(|| {
1517                            data_error(format!("xxhash32 cannot be read at {}", offset))
1518                        })?,
1519                ) as u64;
1520                (checksum, offset + 4)
1521            }
1522            _ => {
1523                return Err(data_error(format!(
1524                    "entry at {} has malformed checksum metadata",
1525                    offset
1526                )));
1527            }
1528        };
1529
1530        // Read the actual payload
1531        let end = offset + data_len;
1532        if end > buf.len() as u64 {
1533            return Err(data_error(format!("incomplete entry data at {}", offset)));
1534        }
1535        let data = &buf[offset as usize..end as usize];
1536
1537        let verified = match checksum_flags {
1538            0 => true,
1539            ENTRY_FLAG_HAS_XXHASH64 => xxhash(data) == checksum,
1540            ENTRY_FLAG_HAS_XXHASH32 => xxhash32(data) as u64 == checksum,
1541            // Tested above. Therefore unreachable.
1542            _ => unreachable!(),
1543        };
1544        if verified {
1545            Ok(Some(EntryResult {
1546                data,
1547                data_offset: offset,
1548                next_offset: end,
1549            }))
1550        } else {
1551            Err(data_error(format!("integrity check failed at {}", offset)))
1552        }
1553    }
1554
1555    /// Wrapper around a `Result` returned by an index write operation.
1556    /// Make sure all index write operations are wrapped by this method.
1557    #[inline]
1558    fn maybe_set_index_error<T>(&mut self, result: crate::Result<T>) -> crate::Result<T> {
1559        if result.is_err() && !self.index_corrupted {
1560            self.index_corrupted = true;
1561        }
1562        result
1563    }
1564
1565    /// Wrapper to return an error if `index_corrupted` is set.
1566    /// Use this before doing index read operations.
1567    #[inline]
1568    fn maybe_return_index_error(&self) -> crate::Result<()> {
1569        if self.index_corrupted {
1570            let msg = "index is corrupted".to_string();
1571            Err(self.corruption(msg))
1572        } else {
1573            Ok(())
1574        }
1575    }
1576
1577    /// Get the log length (in bytes) covered by the given index.
1578    ///
1579    /// This only makes sense at open() or sync() time, since the data won't be updated
1580    /// by append() for performance reasons.
1581    ///
1582    /// `consider_dirty` specifies whether dirty entries in the Index are
1583    /// considered. It should be `true` for writing use-cases, since indexing
1584    /// an entry twice is an error. It can be set to `false` for detecting
1585    /// lags.
1586    fn get_index_log_len(index: &Index, consider_dirty: bool) -> crate::Result<u64> {
1587        let index_meta = if consider_dirty {
1588            index.get_meta()
1589        } else {
1590            index.get_original_meta()
1591        };
1592        Ok(if index_meta.is_empty() {
1593            // New index. Start processing at the first entry.
1594            PRIMARY_START_OFFSET
1595        } else {
1596            index_meta
1597                .read_vlq_at(0)
1598                .context(&index.path, || {
1599                    format!(
1600                        "index metadata cannot be parsed as an integer: {:?}",
1601                        &index_meta
1602                    )
1603                })?
1604                .0
1605        })
1606    }
1607
1608    /// Update the log length (in bytes) covered by the given indexes.
1609    ///
1610    /// `len` is usually `meta.primary_len`.
1611    fn set_index_log_len<'a>(indexes: impl Iterator<Item = &'a mut Index>, len: u64) {
1612        let mut index_meta = Vec::new();
1613        index_meta.write_vlq(len).unwrap();
1614        for index in indexes {
1615            index.set_meta(&index_meta);
1616        }
1617    }
1618}
1619
1620// Error-related utilities
1621
1622impl Log {
1623    /// Get the specified index, with error handling.
1624    fn get_index(&self, index_id: usize) -> crate::Result<&Index> {
1625        self.indexes.get(index_id).ok_or_else(|| {
1626            let msg = format!(
1627                "index_id {} is out of bound (len={}, dir={:?})",
1628                index_id,
1629                self.indexes.len(),
1630                &self.dir
1631            );
1632            crate::Error::programming(msg)
1633        })
1634    }
1635
1636    /// Get the specified index, with error handling.
1637    fn get_index_def(&self, index_id: usize) -> crate::Result<&IndexDef> {
1638        self.open_options.index_defs.get(index_id).ok_or_else(|| {
1639            let msg = format!(
1640                "index_id {} is out of bound (len={}, dir={:?})",
1641                index_id,
1642                self.indexes.len(),
1643                &self.dir
1644            );
1645            crate::Error::programming(msg)
1646        })
1647    }
1648
1649    fn corruption(&self, message: String) -> crate::Error {
1650        let path: &Path = match self.dir.as_opt_path() {
1651            Some(path) => path,
1652            None => Path::new("<memory>"),
1653        };
1654        crate::Error::corruption(path, message)
1655    }
1656}
1657
1658/// "Pointer" to an entry. Used internally.
/// "Pointer" to an entry. Used internally.
struct EntryResult<'a> {
    // Entry payload, borrowed from the buffer it was read from.
    data: &'a [u8],
    // Offset of `data` within that buffer (disk_buf or mem_buf).
    data_offset: u64,
    // Offset right after this entry, i.e. where the next entry starts.
    next_offset: u64,
}
1664
1665impl<'a> EntryResult<'a> {
1666    /// Add some value to `next_offset`.
1667    fn offset(self, offset: u64) -> EntryResult<'a> {
1668        EntryResult {
1669            data: self.data,
1670            // `data_offset` is relative to the current buffer (disk_buf, or mem_buf).
1671            // So it does not need to be changed.
1672            data_offset: self.data_offset,
1673            next_offset: self.next_offset + offset,
1674        }
1675    }
1676}
1677
1678impl<'a> Iterator for LogLookupIter<'a> {
1679    type Item = crate::Result<&'a [u8]>;
1680
1681    fn next(&mut self) -> Option<Self::Item> {
1682        if self.errored {
1683            return None;
1684        }
1685        match self.inner_iter.next() {
1686            None => None,
1687            Some(Err(err)) => {
1688                self.errored = true;
1689                Some(Err(err))
1690            }
1691            Some(Ok(offset)) => match self
1692                .log
1693                .read_entry(offset)
1694                .context("in LogLookupIter::next")
1695            {
1696                Ok(Some(entry)) => Some(Ok(entry.data)),
1697                Ok(None) => None,
1698                Err(err) => {
1699                    // Do not set this iterator to an error state. It's possible
1700                    // that the index iterator still provides valid data, and
1701                    // only the "log" portion is corrupted.
1702                    //
1703                    // The index iterator is finite if integrity check is turned
1704                    // on. So trust it and don't worry about infinite iteration
1705                    // here.
1706                    Some(Err(err))
1707                }
1708            },
1709        }
1710    }
1711}
1712
1713impl<'a> LogLookupIter<'a> {
1714    /// A convenient way to get data.
1715    pub fn into_vec(self) -> crate::Result<Vec<&'a [u8]>> {
1716        self.collect()
1717    }
1718
1719    pub fn is_empty(&self) -> bool {
1720        self.inner_iter.is_empty()
1721    }
1722}
1723
1724impl<'a> Iterator for LogIter<'a> {
1725    type Item = crate::Result<&'a [u8]>;
1726
1727    fn next(&mut self) -> Option<Self::Item> {
1728        if self.errored {
1729            return None;
1730        }
1731        match self
1732            .log
1733            .read_entry(self.next_offset)
1734            .context("in LogIter::next")
1735        {
1736            Err(e) => {
1737                self.errored = true;
1738                Some(Err(e))
1739            }
1740            Ok(Some(entry_result)) => {
1741                assert!(entry_result.next_offset > self.next_offset);
1742                self.next_offset = entry_result.next_offset;
1743                Some(Ok(entry_result.data))
1744            }
1745            Ok(None) => None,
1746        }
1747    }
1748}
1749
1750impl<'a> LogRangeIter<'a> {
1751    /// Wrap `next()` or `next_back()` result by the inner iterator.
1752    fn wrap_inner_next_result(
1753        &mut self,
1754        item: Option<crate::Result<(Cow<'a, [u8]>, index::LinkOffset)>>,
1755    ) -> Option<crate::Result<(Cow<'a, [u8]>, LogLookupIter<'a>)>> {
1756        match item {
1757            None => None,
1758            Some(Err(err)) => {
1759                self.errored = true;
1760                Some(Err(err))
1761            }
1762            Some(Ok((key, link_offset))) => {
1763                let iter = LogLookupIter {
1764                    inner_iter: link_offset.values(self.index),
1765                    errored: false,
1766                    log: self.log,
1767                };
1768                Some(Ok((key, iter)))
1769            }
1770        }
1771    }
1772}
1773
1774impl<'a> Iterator for LogRangeIter<'a> {
1775    type Item = crate::Result<(Cow<'a, [u8]>, LogLookupIter<'a>)>;
1776
1777    fn next(&mut self) -> Option<Self::Item> {
1778        if self.errored {
1779            return None;
1780        }
1781        let inner = self.inner_iter.next();
1782        self.wrap_inner_next_result(inner)
1783    }
1784}
1785
1786impl<'a> DoubleEndedIterator for LogRangeIter<'a> {
1787    fn next_back(&mut self) -> Option<Self::Item> {
1788        if self.errored {
1789            return None;
1790        }
1791        let inner = self.inner_iter.next_back();
1792        self.wrap_inner_next_result(inner)
1793    }
1794}
1795
1796impl Debug for Log {
1797    fn fmt(&self, f: &mut Formatter) -> Result<(), fmt::Error> {
1798        let mut count = 0;
1799        let mut iter = self.iter();
1800        let bytes_per_line = 16;
1801        loop {
1802            let offset = iter.next_offset;
1803            count += 1;
1804            match iter.next() {
1805                None => break,
1806                Some(Ok(bytes)) => {
1807                    if count > 1 {
1808                        write!(f, "\n")?;
1809                    }
1810                    write!(f, "# Entry {}:\n", count)?;
1811                    for (i, chunk) in bytes.chunks(bytes_per_line).enumerate() {
1812                        write!(f, "{:08x}:", offset as usize + i * bytes_per_line)?;
1813                        for b in chunk {
1814                            write!(f, " {:02x}", b)?;
1815                        }
1816                        for _ in chunk.len()..bytes_per_line {
1817                            write!(f, "   ")?;
1818                        }
1819                        write!(f, "  ")?;
1820                        for &b in chunk {
1821                            let ch = match b {
1822                                0x20..=0x7e => b as char, // printable
1823                                _ => '.',
1824                            };
1825                            write!(f, "{}", ch)?;
1826                        }
1827                        write!(f, "\n")?;
1828                    }
1829                }
1830                Some(Err(err)) => writeln!(f, "# Error: {:?}", err)?,
1831            }
1832        }
1833        Ok(())
1834    }
1835}
1836
1837impl ReadonlyBuffer for ExternalKeyBuffer {
1838    #[inline]
1839    fn slice(&self, start: u64, len: u64) -> Option<&[u8]> {
1840        if start < self.disk_len {
1841            self.disk_buf.get((start as usize)..(start + len) as usize)
1842        } else {
1843            let start = start - self.disk_len;
1844            // See "UNSAFE NOTICE" in ExternalKeyBuffer definition.
1845            // This pointer cannot be null.
1846            let mem_buf = unsafe { &*self.mem_buf };
1847            mem_buf.get((start as usize)..(start + len) as usize)
1848        }
1849    }
1850}