indexedlog/log/
open_options.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 *
4 * This source code is licensed under the MIT license found in the
5 * LICENSE file in the root directory of this source tree.
6 */
7
8use std::borrow::Cow;
9use std::fmt;
10use std::fmt::Debug;
11use std::ops::Range;
12use std::sync::Arc;
13
14use tracing::debug_span;
15
16use super::fold::Fold;
17use super::fold::FoldDef;
18use super::fold::FoldState;
19use crate::errors::ResultExt;
20use crate::index::Index;
21use crate::lock::ScopedDirLock;
22use crate::lock::READER_LOCK_OPTS;
23use crate::log::GenericPath;
24use crate::log::Log;
25use crate::log::LogMetadata;
26use crate::log::PRIMARY_START_OFFSET;
27
/// Prefix placed in front of an index name to build that index's file name
/// (see `IndexDef::filename`).
const INDEX_FILE_PREFIX: &str = "index2-";

/// Prefix placed in front of an index name to build the key recorded for it
/// in the log metadata (see `IndexDef::metaname`).
const META_PREFIX: &str = "2-";
30
31/// Definition of an index. It includes: name, function to extract index keys,
32/// and how much the index can lag on disk.
33#[derive(Clone)]
34pub struct IndexDef {
35    /// Function to extract index keys from an entry.
36    ///
37    /// The input is bytes of an entry (ex. the data passed to [`Log::append`]).
38    /// The output is an array of index keys. An entry can have zero or more
39    /// than one index keys for a same index.
40    ///
41    /// The output can be an allocated slice of bytes, or a reference to offsets
42    /// in the input. See [`IndexOutput`] for details.
43    ///
44    /// The function should be pure and fast. i.e. It should not use inputs
45    /// from other things, like the network, filesystem, or an external random
46    /// generator.
47    ///
48    /// For example, if the [`Log`] is to store git commits, and the index is to
49    /// help finding child commits given parent commit hashes as index keys.
50    /// This function gets the commit metadata as input. It then parses the
51    /// input, and extract parent commit hashes as the output. A git commit can
52    /// have 0 or 1 or 2 or even more parents. Therefore the output is a [`Vec`].
53    pub(crate) func: Arc<dyn Fn(&[u8]) -> Vec<IndexOutput> + Send + Sync + 'static>,
54
55    /// Name of the index.
56    ///
57    /// The name will be used as part of the index file name. Therefore do not
58    /// use user-generated content here. And do not abuse this by using `..` or `/`.
59    ///
60    /// When adding new or changing index functions, make sure a different
61    /// `name` is used so the existing index won't be reused incorrectly.
62    pub(crate) name: Arc<String>,
63
64    /// How many bytes (as counted in the file backing [`Log`]) could be left not
65    /// indexed on-disk.
66    ///
67    /// This is related to [`Index`] implementation detail. Since it's append-only
68    /// and needs to write `O(log N)` data for updating a single entry. Allowing
69    /// lagged indexes reduces writes and saves disk space.
70    ///
71    /// The lagged part of the index will be built on-demand in-memory by
72    /// [`Log::open`].
73    ///
74    /// Practically, this correlates to how fast `func` is.
75    pub(crate) lag_threshold: u64,
76}
77
/// One value produced by an index function: a key to look up by, or an
/// instruction to remove keys from the index.
pub enum IndexOutput {
    /// The key is the given byte range of the entry that was passed to the
    /// index function.
    ///
    /// Prefer this variant when possible, because it produces smaller
    /// indexes.
    Reference(Range<u64>),

    /// The key is a standalone byte sequence that does not come from the
    /// entry bytes.
    ///
    /// Use this when the key cannot be found in the entry, for example when
    /// the entry is compressed.
    Owned(Box<[u8]>),

    /// Drops every value stored under this exact key in the index.
    ///
    /// Only the index is affected; the entry stays in the log.
    Remove(Box<[u8]>),

    /// Drops every value stored under any key that starts with this prefix.
    ///
    /// Only the index is affected; the entry stays in the log.
    RemovePrefix(Box<[u8]>),
}
103
/// Checksum algorithm applied to each entry.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum ChecksumType {
    /// Pick [`ChecksumType::Xxhash64`] or [`ChecksumType::Xxhash32`]
    /// automatically, depending on the size of the data.
    Auto,

    /// xxhash64, the efficient choice on 64-bit platforms.
    Xxhash64,

    /// xxhash32. It is slower than xxhash64 on 64-bit platforms but takes
    /// less space, so it may suit logs with short entries.
    Xxhash32,
}
118
119/// Options used to configured how an [`Log`] is opened.
120#[derive(Clone)]
121pub struct OpenOptions {
122    pub(crate) index_defs: Vec<IndexDef>,
123    pub(crate) fold_defs: Vec<FoldDef>,
124    pub(crate) create: bool,
125    pub(crate) checksum_type: ChecksumType,
126    pub(crate) flush_filter: Option<FlushFilterFunc>,
127    pub(crate) fsync: bool,
128    pub(crate) auto_sync_threshold: Option<u64>,
129}
130
131pub type FlushFilterFunc =
132    fn(
133        &FlushFilterContext,
134        &[u8],
135    ) -> Result<FlushFilterOutput, Box<dyn std::error::Error + Send + Sync + 'static>>;
136
137/// Potentially useful context for the flush filter function.
138pub struct FlushFilterContext<'a> {
139    /// The [`log`] being flushed.
140    pub log: &'a Log,
141}
142
/// Decision returned by a flush filter for one entry.
pub enum FlushFilterOutput {
    /// Write the entry unchanged.
    Keep,

    /// Leave the entry out.
    Drop,

    /// Write these bytes in place of the entry.
    Replace(Vec<u8>),
}
154
155impl IndexDef {
156    /// Create an index definition.
157    ///
158    /// `index_func` is the function to extract index keys from an entry.
159    ///
160    /// The input is bytes of an entry (ex. the data passed to [`Log::append`]).
161    /// The output is an array of index keys. An entry can have zero or more
162    /// than one index keys for a same index.
163    ///
164    /// The output can be an allocated slice of bytes, or a reference to offsets
165    /// in the input. See [`IndexOutput`] for details.
166    ///
167    /// The function should be pure and fast. i.e. It should not use inputs
168    /// from other things, like the network, filesystem, or an external random
169    /// generator.
170    ///
171    /// For example, if the [`Log`] is to store git commits, and the index is to
172    /// help finding child commits given parent commit hashes as index keys.
173    /// This function gets the commit metadata as input. It then parses the
174    /// input, and extract parent commit hashes as the output. A git commit can
175    /// have 0 or 1 or 2 or even more parents. Therefore the output is a [`Vec`].
176    ///
177    /// `name` is the name of the index.
178    ///
179    /// The name will be used as part of the index file name. Therefore do not
180    /// use user-generated content here. And do not abuse this by using `..` or `/`.
181    ///
182    /// When adding new or changing index functions, make sure a different
183    /// `name` is used so the existing index won't be reused incorrectly.
184    pub fn new(
185        name: impl ToString,
186        index_func: impl Fn(&[u8]) -> Vec<IndexOutput> + Send + Sync + 'static,
187    ) -> Self {
188        Self {
189            func: Arc::new(index_func),
190            name: Arc::new(name.to_string()),
191            // For a typical commit hash index (20-byte). IndexedLog insertion
192            // overhead is about 1500 entries per millisecond. For other things
193            // the xxhash check might take some time. 500 entries takes <1ms
194            // for commit hash index, and might be okay for non-commit-hash
195            // indexes. Users should customize the value if the default is not
196            // good enough.
197            lag_threshold: 25 * 500,
198        }
199    }
200
201    /// Set how many bytes (as counted in the file backing [`Log`]) could be left
202    /// not indexed on-disk.
203    ///
204    /// This is related to [`Index`] implementation detail. Since it's append-only
205    /// and needs to write `O(log N)` data for updating a single entry. Allowing
206    /// lagged indexes reduces writes and saves disk space.
207    ///
208    /// The lagged part of the index will be built on-demand in-memory by
209    /// [`Log::open`].
210    ///
211    /// Practically, this correlates to how fast `func` is.
212    pub fn lag_threshold(self, lag_threshold: u64) -> Self {
213        Self {
214            func: self.func,
215            name: self.name,
216            lag_threshold,
217        }
218    }
219
220    /// Name used in log metadata.
221    pub(crate) fn metaname(&self) -> String {
222        format!("{}{}", META_PREFIX, self.name)
223    }
224
225    /// Name used in filesystem.
226    pub(crate) fn filename(&self) -> String {
227        format!("{}{}", INDEX_FILE_PREFIX, self.name)
228    }
229}
230
231impl OpenOptions {
232    #[allow(clippy::new_without_default)]
233    /// Creates a blank new set of options ready for configuration.
234    ///
235    /// `create` is initially `false`.
236    /// `fsync` is initially `false`.
237    /// `index_defs` is initially empty.
238    /// `auto_sync_threshold` is initially `None`.
239    pub fn new() -> Self {
240        Self {
241            create: false,
242            index_defs: Vec::new(),
243            fold_defs: Vec::new(),
244            checksum_type: ChecksumType::Auto,
245            flush_filter: None,
246            fsync: false,
247            auto_sync_threshold: None,
248        }
249    }
250
251    /// Set fsync behavior.
252    ///
253    /// If true, then [`Log::sync`] will use `fsync` to flush log and index
254    /// data to the physical device before returning.
255    pub fn fsync(mut self, fsync: bool) -> Self {
256        self.fsync = fsync;
257        self
258    }
259
260    /// Add an index function.
261    ///
262    /// This is a convenient way to define indexes without using [`IndexDef`]
263    /// explicitly.
264    pub fn index(mut self, name: &'static str, func: fn(&[u8]) -> Vec<IndexOutput>) -> Self {
265        self.index_defs.push(IndexDef::new(name, func));
266        self
267    }
268
269    /// Add a "fold" definition. See [`FoldDef`] and [`Fold`] for details.
270    pub fn fold_def(mut self, name: &'static str, create_fold: fn() -> Box<dyn Fold>) -> Self {
271        self.fold_defs.push(FoldDef::new(name, create_fold));
272        self
273    }
274
275    /// Sets index definitions.
276    ///
277    /// See [`IndexDef::new`] for details.
278    pub fn index_defs(mut self, index_defs: Vec<IndexDef>) -> Self {
279        self.index_defs = index_defs;
280        self
281    }
282
283    /// Sets the option for whether creating a new [`Log`] if it does not exist.
284    ///
285    /// If set to `true`, [`OpenOptions::open`] will create the [`Log`] on demand if
286    /// it does not already exist. If set to `false`, [`OpenOptions::open`] will
287    /// fail if the log does not exist.
288    pub fn create(mut self, create: bool) -> Self {
289        self.create = create;
290        self
291    }
292
293    /// Sets whether to call [`Log::sync`] automatically when the in-memory
294    /// buffer exceeds some size threshold.
295    /// - `None`: Do not call `sync` automatically.
296    /// - `Some(size)`: Call `sync` when the in-memory buffer exceeds `size`.
297    /// - `Some(0)`: Call `sync` after every `append` automatically.
298    pub fn auto_sync_threshold(mut self, threshold: impl Into<Option<u64>>) -> Self {
299        self.auto_sync_threshold = threshold.into();
300        self
301    }
302
303    /// Sets the checksum type.
304    ///
305    /// See [`ChecksumType`] for details.
306    pub fn checksum_type(mut self, checksum_type: ChecksumType) -> Self {
307        self.checksum_type = checksum_type;
308        self
309    }
310
311    /// Sets the flush filter function.
312    ///
313    /// The function will be called at [`Log::sync`] time, if there are
314    /// changes to the `log` since `open` (or last `sync`) time.
315    ///
316    /// The filter function can be used to avoid writing content that already
317    /// exists in the [`Log`], or rewrite content as needed.
318    pub fn flush_filter(mut self, flush_filter: Option<FlushFilterFunc>) -> Self {
319        self.flush_filter = flush_filter;
320        self
321    }
322
323    /// Remove index lagging.
324    ///
325    /// Used by `RotateLog` to make sure old logs have complete indexes.
326    pub(crate) fn with_zero_index_lag(mut self) -> Self {
327        for def in self.index_defs.iter_mut() {
328            def.lag_threshold = 0;
329        }
330        self
331    }
332
333    /// Construct [`Log`] at given directory. Incrementally build up specified
334    /// indexes.
335    ///
336    /// If the directory does not exist and `create` is set to `true`, it will
337    /// be created with essential files populated. After that, an empty [`Log`]
338    /// will be returned. Otherwise, `open` will fail.
339    ///
340    /// See [`IndexDef`] for index definitions. Indexes can be added, removed, or
341    /// reordered, as long as a same `name` indicates a same index function.
342    /// That is, when an index function is changed, the caller is responsible
343    /// for changing the index name.
344    ///
345    /// Driven by the "immutable by default" idea, together with append-only
346    /// properties, this structure is different from some traditional *mutable*
347    /// databases backed by the filesystem:
348    /// - Data are kind of "snapshotted and frozen" at open time. Mutating
349    ///   files do not affect the view of instantiated [`Log`]s.
350    /// - Writes are buffered until [`Log::sync`] is called.
351    /// This maps to traditional "database transaction" concepts: a [`Log`] is
352    /// always bounded to a transaction. [`Log::sync`] is like committing the
353    /// transaction. Dropping the [`Log`] instance is like abandoning a
354    /// transaction.
355    pub fn open(&self, dir: impl Into<GenericPath>) -> crate::Result<Log> {
356        let dir = dir.into();
357        match dir.as_opt_path() {
358            None => self.create_in_memory(dir),
359            Some(fs_dir) => {
360                let span = debug_span!("Log::open", dir = &fs_dir.to_string_lossy().as_ref());
361                let _guard = span.enter();
362                self.open_internal(&dir, None, None)
363                    .context(|| format!("in log::OpenOptions::open({:?})", &dir))
364            }
365        }
366    }
367
368    /// Construct an empty in-memory [`Log`] without side-effects on the
369    /// filesystem. The in-memory [`Log`] cannot be [`sync`]ed.
370    pub(crate) fn create_in_memory(&self, dir: GenericPath) -> crate::Result<Log> {
371        assert!(dir.as_opt_path().is_none());
372        let result: crate::Result<_> = (|| {
373            let meta = LogMetadata::new_with_primary_len(PRIMARY_START_OFFSET);
374            let mem_buf = Box::pin(Vec::new());
375            let (disk_buf, indexes) = Log::load_log_and_indexes(
376                &dir,
377                &meta,
378                &self.index_defs,
379                &mem_buf,
380                None,
381                self.fsync,
382            )?;
383            let disk_folds = self.empty_folds();
384            let all_folds = disk_folds.clone();
385            Ok(Log {
386                dir,
387                disk_buf,
388                mem_buf,
389                meta,
390                indexes,
391                disk_folds,
392                all_folds,
393                index_corrupted: false,
394                open_options: self.clone(),
395                reader_lock: None,
396                change_detector: None,
397            })
398        })();
399
400        result.context("in log::OpenOptions::create_in_memory")
401    }
402
403    pub(crate) fn open_with_lock(
404        &self,
405        dir: &GenericPath,
406        lock: &ScopedDirLock,
407    ) -> crate::Result<Log> {
408        self.open_internal(dir, None, Some(lock))
409    }
410
411    // "Back-door" version of "open" that allows reusing indexes.
412    // Used by [`Log::sync`]. See [`Log::load_log_and_indexes`] for when indexes
413    // can be reused.
414    pub(crate) fn open_internal(
415        &self,
416        dir: &GenericPath,
417        reuse_indexes: Option<&Vec<Index>>,
418        lock: Option<&ScopedDirLock>,
419    ) -> crate::Result<Log> {
420        let (reader_lock, change_detector) = match dir.as_opt_path() {
421            Some(d) => {
422                let lock = ScopedDirLock::new_with_options(d, &READER_LOCK_OPTS)?;
423                let detector = lock.shared_change_detector()?;
424                (Some(lock), Some(detector))
425            }
426            None => (None, None),
427        };
428        let create = self.create;
429
430        // Do a lock-less load_or_create_meta to avoid the flock overhead.
431        let meta = Log::load_or_create_meta(dir, false).or_else(|err| {
432            if create {
433                dir.mkdir()
434                    .context("cannot mkdir after failing to read metadata")
435                    .source(err)?;
436                // Make sure check and write happens atomically.
437                if lock.is_some() {
438                    Log::load_or_create_meta(dir, true)
439                } else {
440                    let _lock = dir.lock()?;
441                    Log::load_or_create_meta(dir, true)
442                }
443            } else {
444                Err(err).context(|| format!("cannot open Log at {:?}", &dir))
445            }
446        })?;
447
448        let mem_buf = Box::pin(Vec::new());
449        let (disk_buf, indexes) = Log::load_log_and_indexes(
450            dir,
451            &meta,
452            &self.index_defs,
453            &mem_buf,
454            reuse_indexes,
455            self.fsync,
456        )?;
457        let disk_folds = self.empty_folds();
458        let all_folds = disk_folds.clone();
459        let mut log = Log {
460            dir: dir.clone(),
461            disk_buf,
462            mem_buf,
463            meta,
464            indexes,
465            disk_folds,
466            all_folds,
467            index_corrupted: false,
468            open_options: self.clone(),
469            reader_lock,
470            change_detector,
471        };
472        log.update_indexes_for_on_disk_entries()?;
473        log.update_and_flush_disk_folds()?;
474        log.all_folds = log.disk_folds.clone();
475        let lagging_index_ids = log.lagging_index_ids();
476        if !lagging_index_ids.is_empty() {
477            // Update indexes.
478            // NOTE: Consider ignoring failures if they are caused by permission
479            // issues.
480            if let Some(lock) = lock {
481                log.flush_lagging_indexes(&lagging_index_ids, lock)?;
482                log.dir.write_meta(&log.meta, self.fsync)?;
483            } else {
484                let lock = dir.lock()?;
485                // At this time the Log might be changed on-disk. Reload them.
486                return self.open_internal(dir, reuse_indexes, Some(&lock));
487            }
488        }
489        log.update_change_detector_to_match_meta();
490        Ok(log)
491    }
492
493    pub(crate) fn empty_folds(&self) -> Vec<FoldState> {
494        self.fold_defs.iter().map(|def| def.empty_state()).collect()
495    }
496}
497
498impl IndexOutput {
499    pub(crate) fn into_cow(self, data: &[u8]) -> crate::Result<Cow<[u8]>> {
500        Ok(match self {
501            IndexOutput::Reference(range) => Cow::Borrowed(
502                data.get(range.start as usize..range.end as usize)
503                    .ok_or_else(|| {
504                        let msg = format!(
505                            "IndexFunc returned range {:?} but the data only has {} bytes",
506                            range,
507                            data.len()
508                        );
509                        let mut err = crate::Error::programming(msg);
510                        // If the data is short, add its content to error message.
511                        if data.len() < 128 {
512                            err = err.message(format!("Data = {:?}", data))
513                        }
514                        err
515                    })?,
516            ),
517            IndexOutput::Owned(key) => Cow::Owned(key.into_vec()),
518            IndexOutput::Remove(_) | IndexOutput::RemovePrefix(_) => {
519                return Err(crate::Error::programming(
520                    "into_cow does not support Remove or RemovePrefix",
521                ));
522            }
523        })
524    }
525}
526
527impl fmt::Debug for OpenOptions {
528    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
529        write!(f, "OpenOptions {{ ")?;
530        write!(
531            f,
532            "index_defs: {:?}, ",
533            self.index_defs
534                .iter()
535                .map(|d| d.name.as_str())
536                .collect::<Vec<_>>()
537        )?;
538        write!(
539            f,
540            "fold_defs: {:?}, ",
541            self.fold_defs.iter().map(|d| d.name).collect::<Vec<_>>()
542        )?;
543        write!(f, "fsync: {}, ", self.fsync)?;
544        write!(f, "create: {}, ", self.create)?;
545        write!(f, "checksum_type: {:?}, ", self.checksum_type)?;
546        write!(f, "auto_sync_threshold: {:?}, ", self.auto_sync_threshold)?;
547        let flush_filter_desc = match self.flush_filter {
548            Some(ref _buf) => "Some(_)",
549            None => "None",
550        };
551        write!(f, "flush_filter: {} }}", flush_filter_desc)?;
552        Ok(())
553    }
554}