indexedlog/log/open_options.rs
1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 *
4 * This source code is licensed under the MIT license found in the
5 * LICENSE file in the root directory of this source tree.
6 */
7
8use std::borrow::Cow;
9use std::fmt;
10use std::fmt::Debug;
11use std::ops::Range;
12use std::sync::Arc;
13
14use tracing::debug_span;
15
16use super::fold::Fold;
17use super::fold::FoldDef;
18use super::fold::FoldState;
19use crate::errors::ResultExt;
20use crate::index::Index;
21use crate::lock::ScopedDirLock;
22use crate::lock::READER_LOCK_OPTS;
23use crate::log::GenericPath;
24use crate::log::Log;
25use crate::log::LogMetadata;
26use crate::log::PRIMARY_START_OFFSET;
27
/// Prefix prepended to an [`IndexDef`]'s name to form the index file name on
/// the filesystem (see `IndexDef::filename`).
const INDEX_FILE_PREFIX: &str = "index2-";
/// Prefix prepended to an [`IndexDef`]'s name to form the name used for that
/// index in the log metadata (see `IndexDef::metaname`).
const META_PREFIX: &str = "2-";
30
/// Definition of an index: its name, the function that extracts index keys,
/// and how much the index may lag behind the log on disk.
#[derive(Clone)]
pub struct IndexDef {
    /// Function to extract index keys from an entry.
    ///
    /// The input is the bytes of an entry (ex. the data passed to [`Log::append`]).
    /// The output is a list of index keys. An entry can produce zero, one, or
    /// more index keys for the same index.
    ///
    /// Each key can be an allocated slice of bytes, or a reference to offsets
    /// in the input. See [`IndexOutput`] for details.
    ///
    /// The function should be pure and fast, i.e. it should not depend on
    /// other inputs such as the network, the filesystem, or an external random
    /// generator.
    ///
    /// For example, suppose the [`Log`] stores git commits and the index helps
    /// find child commits, using parent commit hashes as index keys. This
    /// function receives the commit metadata as input, parses it, and returns
    /// the parent commit hashes. A git commit can have 0, 1, 2, or even more
    /// parents, which is why the output is a [`Vec`].
    pub(crate) func: Arc<dyn Fn(&[u8]) -> Vec<IndexOutput> + Send + Sync + 'static>,

    /// Name of the index.
    ///
    /// The name is used as part of the index file name. Therefore do not use
    /// user-generated content here, and do not abuse it by using `..` or `/`.
    ///
    /// When adding a new index function or changing an existing one, use a
    /// different `name` so the existing on-disk index is not reused incorrectly.
    pub(crate) name: Arc<String>,

    /// How many bytes (as counted in the file backing [`Log`]) may be left
    /// unindexed on disk.
    ///
    /// This relates to an implementation detail of [`Index`]: it is append-only
    /// and writes `O(log N)` data to update a single entry. Allowing indexes to
    /// lag reduces writes and saves disk space.
    ///
    /// The lagged part of the index is built on demand, in memory, by
    /// [`Log::open`].
    ///
    /// In practice, a suitable value depends on how fast `func` is.
    /// `IndexDef::new` defaults this to `25 * 500` bytes, and
    /// `OpenOptions::with_zero_index_lag` forces it to `0`.
    pub(crate) lag_threshold: u64,
}
77
/// Output of an index function: bytes that can be used for lookups, or an
/// instruction to remove existing keys from the index.
pub enum IndexOutput {
    /// The index key is a slice of the data entry (i.e. the input of the
    /// index function), given as a byte range relative to that entry.
    ///
    /// Prefer this when possible, because it produces smaller indexes.
    Reference(Range<u64>),

    /// The index key is a separate sequence of bytes, unrelated to the input
    /// bytes.
    ///
    /// Use this if the index key does not appear verbatim in the entry, for
    /// example when the entry is compressed.
    Owned(Box<[u8]>),

    /// Remove all values associated with the key in the index.
    ///
    /// This only affects the index. The entry itself is not removed from the log.
    Remove(Box<[u8]>),

    /// Remove all values associated with every key that has the given prefix
    /// in the index.
    ///
    /// This only affects the index. The entry itself is not removed from the log.
    RemovePrefix(Box<[u8]>),
}
103
/// Which checksum function to use for an entry.
///
/// `OpenOptions::new` defaults to [`ChecksumType::Auto`].
#[derive(Copy, Clone, Debug, PartialEq)]
pub enum ChecksumType {
    /// Choose xxhash64 or xxhash32 automatically based on data size.
    Auto,

    /// Use the xxhash64 checksum algorithm. Efficient on 64-bit platforms.
    Xxhash64,

    /// Use the xxhash32 checksum algorithm. It is slower than xxhash64 on
    /// 64-bit platforms, but takes less space. It may be a good fit when
    /// entries are short.
    Xxhash32,
}
118
/// Options used to configure how a [`Log`] is opened.
///
/// Start from `OpenOptions::new`, chain the builder-style setters, then call
/// `OpenOptions::open`.
#[derive(Clone)]
pub struct OpenOptions {
    /// Index definitions. Indexes are built or updated when the log is opened.
    pub(crate) index_defs: Vec<IndexDef>,
    /// Fold definitions. See [`FoldDef`].
    pub(crate) fold_defs: Vec<FoldDef>,
    /// Whether `open` may create the log when it does not exist yet.
    pub(crate) create: bool,
    /// Checksum algorithm for entries.
    pub(crate) checksum_type: ChecksumType,
    /// Optional filter applied to pending entries at [`Log::sync`] time.
    pub(crate) flush_filter: Option<FlushFilterFunc>,
    /// Whether [`Log::sync`] uses `fsync` before returning.
    pub(crate) fsync: bool,
    /// In-memory buffer size that triggers an automatic [`Log::sync`];
    /// `None` disables automatic syncing.
    pub(crate) auto_sync_threshold: Option<u64>,
}
130
/// Signature of a flush filter (see `OpenOptions::flush_filter`).
///
/// The function receives a [`FlushFilterContext`] and the content of an entry
/// about to be written by [`Log::sync`]. It decides, via [`FlushFilterOutput`],
/// whether to keep, drop, or replace that entry. It may instead fail with a
/// boxed error.
pub type FlushFilterFunc =
    fn(
        &FlushFilterContext,
        &[u8],
    ) -> Result<FlushFilterOutput, Box<dyn std::error::Error + Send + Sync + 'static>>;
136
/// Potentially useful context for the flush filter function.
pub struct FlushFilterContext<'a> {
    /// The [`Log`] being flushed.
    pub log: &'a Log,
}
142
/// Output of a flush filter: what to do with one pending entry.
pub enum FlushFilterOutput {
    /// Write the entry as is.
    Keep,

    /// Do not write this entry.
    Drop,

    /// Write the specified new content instead of this entry.
    Replace(Vec<u8>),
}
154
155impl IndexDef {
156 /// Create an index definition.
157 ///
158 /// `index_func` is the function to extract index keys from an entry.
159 ///
160 /// The input is bytes of an entry (ex. the data passed to [`Log::append`]).
161 /// The output is an array of index keys. An entry can have zero or more
162 /// than one index keys for a same index.
163 ///
164 /// The output can be an allocated slice of bytes, or a reference to offsets
165 /// in the input. See [`IndexOutput`] for details.
166 ///
167 /// The function should be pure and fast. i.e. It should not use inputs
168 /// from other things, like the network, filesystem, or an external random
169 /// generator.
170 ///
171 /// For example, if the [`Log`] is to store git commits, and the index is to
172 /// help finding child commits given parent commit hashes as index keys.
173 /// This function gets the commit metadata as input. It then parses the
174 /// input, and extract parent commit hashes as the output. A git commit can
175 /// have 0 or 1 or 2 or even more parents. Therefore the output is a [`Vec`].
176 ///
177 /// `name` is the name of the index.
178 ///
179 /// The name will be used as part of the index file name. Therefore do not
180 /// use user-generated content here. And do not abuse this by using `..` or `/`.
181 ///
182 /// When adding new or changing index functions, make sure a different
183 /// `name` is used so the existing index won't be reused incorrectly.
184 pub fn new(
185 name: impl ToString,
186 index_func: impl Fn(&[u8]) -> Vec<IndexOutput> + Send + Sync + 'static,
187 ) -> Self {
188 Self {
189 func: Arc::new(index_func),
190 name: Arc::new(name.to_string()),
191 // For a typical commit hash index (20-byte). IndexedLog insertion
192 // overhead is about 1500 entries per millisecond. For other things
193 // the xxhash check might take some time. 500 entries takes <1ms
194 // for commit hash index, and might be okay for non-commit-hash
195 // indexes. Users should customize the value if the default is not
196 // good enough.
197 lag_threshold: 25 * 500,
198 }
199 }
200
201 /// Set how many bytes (as counted in the file backing [`Log`]) could be left
202 /// not indexed on-disk.
203 ///
204 /// This is related to [`Index`] implementation detail. Since it's append-only
205 /// and needs to write `O(log N)` data for updating a single entry. Allowing
206 /// lagged indexes reduces writes and saves disk space.
207 ///
208 /// The lagged part of the index will be built on-demand in-memory by
209 /// [`Log::open`].
210 ///
211 /// Practically, this correlates to how fast `func` is.
212 pub fn lag_threshold(self, lag_threshold: u64) -> Self {
213 Self {
214 func: self.func,
215 name: self.name,
216 lag_threshold,
217 }
218 }
219
220 /// Name used in log metadata.
221 pub(crate) fn metaname(&self) -> String {
222 format!("{}{}", META_PREFIX, self.name)
223 }
224
225 /// Name used in filesystem.
226 pub(crate) fn filename(&self) -> String {
227 format!("{}{}", INDEX_FILE_PREFIX, self.name)
228 }
229}
230
impl OpenOptions {
    // `new` takes no arguments, which clippy would normally want paired with a
    // `Default` impl; that lint is silenced here.
    #[allow(clippy::new_without_default)]
    /// Creates a blank new set of options ready for configuration.
    ///
    /// Initial values:
    /// - `create` is `false`.
    /// - `fsync` is `false`.
    /// - `index_defs` and `fold_defs` are empty.
    /// - `checksum_type` is [`ChecksumType::Auto`].
    /// - `flush_filter` is `None`.
    /// - `auto_sync_threshold` is `None`.
    pub fn new() -> Self {
        Self {
            create: false,
            index_defs: Vec::new(),
            fold_defs: Vec::new(),
            checksum_type: ChecksumType::Auto,
            flush_filter: None,
            fsync: false,
            auto_sync_threshold: None,
        }
    }

    /// Set fsync behavior.
    ///
    /// If true, then [`Log::sync`] will use `fsync` to flush log and index
    /// data to the physical device before returning.
    pub fn fsync(mut self, fsync: bool) -> Self {
        self.fsync = fsync;
        self
    }

    /// Add an index function.
    ///
    /// This is a convenient way to define indexes without using [`IndexDef`]
    /// explicitly. It appends `IndexDef::new(name, func)`, so the index gets
    /// the default lag threshold.
    pub fn index(mut self, name: &'static str, func: fn(&[u8]) -> Vec<IndexOutput>) -> Self {
        self.index_defs.push(IndexDef::new(name, func));
        self
    }

    /// Add a "fold" definition. See [`FoldDef`] and [`Fold`] for details.
    ///
    /// Definitions accumulate; each call appends one.
    pub fn fold_def(mut self, name: &'static str, create_fold: fn() -> Box<dyn Fold>) -> Self {
        self.fold_defs.push(FoldDef::new(name, create_fold));
        self
    }

    /// Sets index definitions.
    ///
    /// This replaces every previously configured index, including ones added
    /// with [`OpenOptions::index`]. See [`IndexDef::new`] for details.
    pub fn index_defs(mut self, index_defs: Vec<IndexDef>) -> Self {
        self.index_defs = index_defs;
        self
    }

    /// Sets whether to create a new [`Log`] if it does not exist.
    ///
    /// If set to `true`, [`OpenOptions::open`] will create the [`Log`] on demand if
    /// it does not already exist. If set to `false`, [`OpenOptions::open`] will
    /// fail if the log does not exist.
    pub fn create(mut self, create: bool) -> Self {
        self.create = create;
        self
    }

    /// Sets whether to call [`Log::sync`] automatically when the in-memory
    /// buffer exceeds some size threshold.
    /// - `None`: Do not call `sync` automatically.
    /// - `Some(size)`: Call `sync` when the in-memory buffer exceeds `size`.
    /// - `Some(0)`: Call `sync` after every `append` automatically.
    pub fn auto_sync_threshold(mut self, threshold: impl Into<Option<u64>>) -> Self {
        self.auto_sync_threshold = threshold.into();
        self
    }

    /// Sets the checksum type.
    ///
    /// See [`ChecksumType`] for details.
    pub fn checksum_type(mut self, checksum_type: ChecksumType) -> Self {
        self.checksum_type = checksum_type;
        self
    }

    /// Sets the flush filter function. Passing `None` removes any previously
    /// configured filter.
    ///
    /// The function will be called at [`Log::sync`] time, if there are
    /// changes to the `log` since `open` (or last `sync`) time.
    ///
    /// The filter function can be used to avoid writing content that already
    /// exists in the [`Log`], or rewrite content as needed.
    pub fn flush_filter(mut self, flush_filter: Option<FlushFilterFunc>) -> Self {
        self.flush_filter = flush_filter;
        self
    }

    /// Remove index lagging by setting every index's `lag_threshold` to 0.
    ///
    /// Used by `RotateLog` to make sure old logs have complete indexes.
    pub(crate) fn with_zero_index_lag(mut self) -> Self {
        for def in self.index_defs.iter_mut() {
            def.lag_threshold = 0;
        }
        self
    }

    /// Construct [`Log`] at given directory. Incrementally build up specified
    /// indexes.
    ///
    /// If `dir` does not refer to a filesystem path, an empty in-memory [`Log`]
    /// is returned instead (see `OpenOptions::create_in_memory`).
    ///
    /// If the directory does not exist and `create` is set to `true`, it will
    /// be created with essential files populated. After that, an empty [`Log`]
    /// will be returned. Otherwise, `open` will fail.
    ///
    /// See [`IndexDef`] for index definitions. Indexes can be added, removed, or
    /// reordered, as long as the same `name` always means the same index
    /// function. That is, when an index function is changed, the caller is
    /// responsible for changing the index name.
    ///
    /// Driven by the "immutable by default" idea, together with append-only
    /// properties, this structure is different from some traditional *mutable*
    /// databases backed by the filesystem:
    /// - Data are kind of "snapshotted and frozen" at open time. Mutating
    ///   files do not affect the view of instantiated [`Log`]s.
    /// - Writes are buffered until [`Log::sync`] is called.
    ///   This maps to traditional "database transaction" concepts: a [`Log`] is
    ///   always bound to a transaction. [`Log::sync`] is like committing the
    ///   transaction. Dropping the [`Log`] instance is like abandoning a
    ///   transaction.
    pub fn open(&self, dir: impl Into<GenericPath>) -> crate::Result<Log> {
        let dir = dir.into();
        match dir.as_opt_path() {
            None => self.create_in_memory(dir),
            Some(fs_dir) => {
                let span = debug_span!("Log::open", dir = &fs_dir.to_string_lossy().as_ref());
                let _guard = span.enter();
                self.open_internal(&dir, None, None)
                    .context(|| format!("in log::OpenOptions::open({:?})", &dir))
            }
        }
    }

    /// Construct an empty in-memory [`Log`] without side-effects on the
    /// filesystem. The in-memory [`Log`] cannot be synced with [`Log::sync`].
    ///
    /// Panics if `dir` refers to a filesystem path.
    pub(crate) fn create_in_memory(&self, dir: GenericPath) -> crate::Result<Log> {
        assert!(dir.as_opt_path().is_none());
        // The closure lets `?` short-circuit while one `context` call below
        // annotates every error.
        let result: crate::Result<_> = (|| {
            let meta = LogMetadata::new_with_primary_len(PRIMARY_START_OFFSET);
            let mem_buf = Box::pin(Vec::new());
            let (disk_buf, indexes) = Log::load_log_and_indexes(
                &dir,
                &meta,
                &self.index_defs,
                &mem_buf,
                None,
                self.fsync,
            )?;
            let disk_folds = self.empty_folds();
            let all_folds = disk_folds.clone();
            Ok(Log {
                dir,
                disk_buf,
                mem_buf,
                meta,
                indexes,
                disk_folds,
                all_folds,
                index_corrupted: false,
                open_options: self.clone(),
                reader_lock: None,
                change_detector: None,
            })
        })();

        result.context("in log::OpenOptions::create_in_memory")
    }

    /// Open the log while the caller already holds the directory lock `lock`.
    ///
    /// Because a lock is supplied, `open_internal` never calls `dir.lock()`
    /// itself on this path.
    pub(crate) fn open_with_lock(
        &self,
        dir: &GenericPath,
        lock: &ScopedDirLock,
    ) -> crate::Result<Log> {
        self.open_internal(dir, None, Some(lock))
    }

    // "Back-door" version of "open" that allows reusing indexes.
    // Used by [`Log::sync`]. See [`Log::load_log_and_indexes`] for when indexes
    // can be reused.
    //
    // `lock` is the directory lock, if the caller already holds it. When it
    // is `None` and the lock turns out to be needed (creating the log, or
    // flushing lagging indexes), it is taken here.
    pub(crate) fn open_internal(
        &self,
        dir: &GenericPath,
        reuse_indexes: Option<&Vec<Index>>,
        lock: Option<&ScopedDirLock>,
    ) -> crate::Result<Log> {
        // For filesystem paths, acquire a lock configured by `READER_LOCK_OPTS`
        // and a change detector from it. Both are stored in the returned `Log`.
        // In-memory paths get neither.
        let (reader_lock, change_detector) = match dir.as_opt_path() {
            Some(d) => {
                let lock = ScopedDirLock::new_with_options(d, &READER_LOCK_OPTS)?;
                let detector = lock.shared_change_detector()?;
                (Some(lock), Some(detector))
            }
            None => (None, None),
        };
        let create = self.create;

        // Do a lock-less load_or_create_meta to avoid the flock overhead.
        // Only on failure, and only if `create` is set, fall back to creating
        // the directory and metadata.
        let meta = Log::load_or_create_meta(dir, false).or_else(|err| {
            if create {
                dir.mkdir()
                    .context("cannot mkdir after failing to read metadata")
                    .source(err)?;
                // Make sure check and write happens atomically.
                if lock.is_some() {
                    Log::load_or_create_meta(dir, true)
                } else {
                    let _lock = dir.lock()?;
                    Log::load_or_create_meta(dir, true)
                }
            } else {
                Err(err).context(|| format!("cannot open Log at {:?}", &dir))
            }
        })?;

        let mem_buf = Box::pin(Vec::new());
        let (disk_buf, indexes) = Log::load_log_and_indexes(
            dir,
            &meta,
            &self.index_defs,
            &mem_buf,
            reuse_indexes,
            self.fsync,
        )?;
        let disk_folds = self.empty_folds();
        let all_folds = disk_folds.clone();
        let mut log = Log {
            dir: dir.clone(),
            disk_buf,
            mem_buf,
            meta,
            indexes,
            disk_folds,
            all_folds,
            index_corrupted: false,
            open_options: self.clone(),
            reader_lock,
            change_detector,
        };
        // Catch indexes and folds up with entries already on disk, then start
        // `all_folds` from the on-disk fold state.
        log.update_indexes_for_on_disk_entries()?;
        log.update_and_flush_disk_folds()?;
        log.all_folds = log.disk_folds.clone();
        let lagging_index_ids = log.lagging_index_ids();
        if !lagging_index_ids.is_empty() {
            // Update indexes.
            // NOTE: Consider ignoring failures if they are caused by permission
            // issues.
            if let Some(lock) = lock {
                log.flush_lagging_indexes(&lagging_index_ids, lock)?;
                log.dir.write_meta(&log.meta, self.fsync)?;
            } else {
                // Flushing requires the directory lock. Take it and restart
                // the open with the lock held. Note that `log` (and its reader
                // lock) stays alive until the recursive call returns.
                let lock = dir.lock()?;
                // At this time the Log might be changed on-disk. Reload them.
                return self.open_internal(dir, reuse_indexes, Some(&lock));
            }
        }
        log.update_change_detector_to_match_meta();
        Ok(log)
    }

    /// Fresh (empty) state for every configured fold, in `fold_defs` order.
    pub(crate) fn empty_folds(&self) -> Vec<FoldState> {
        self.fold_defs.iter().map(|def| def.empty_state()).collect()
    }
}
497
498impl IndexOutput {
499 pub(crate) fn into_cow(self, data: &[u8]) -> crate::Result<Cow<[u8]>> {
500 Ok(match self {
501 IndexOutput::Reference(range) => Cow::Borrowed(
502 data.get(range.start as usize..range.end as usize)
503 .ok_or_else(|| {
504 let msg = format!(
505 "IndexFunc returned range {:?} but the data only has {} bytes",
506 range,
507 data.len()
508 );
509 let mut err = crate::Error::programming(msg);
510 // If the data is short, add its content to error message.
511 if data.len() < 128 {
512 err = err.message(format!("Data = {:?}", data))
513 }
514 err
515 })?,
516 ),
517 IndexOutput::Owned(key) => Cow::Owned(key.into_vec()),
518 IndexOutput::Remove(_) | IndexOutput::RemovePrefix(_) => {
519 return Err(crate::Error::programming(
520 "into_cow does not support Remove or RemovePrefix",
521 ));
522 }
523 })
524 }
525}
526
527impl fmt::Debug for OpenOptions {
528 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
529 write!(f, "OpenOptions {{ ")?;
530 write!(
531 f,
532 "index_defs: {:?}, ",
533 self.index_defs
534 .iter()
535 .map(|d| d.name.as_str())
536 .collect::<Vec<_>>()
537 )?;
538 write!(
539 f,
540 "fold_defs: {:?}, ",
541 self.fold_defs.iter().map(|d| d.name).collect::<Vec<_>>()
542 )?;
543 write!(f, "fsync: {}, ", self.fsync)?;
544 write!(f, "create: {}, ", self.create)?;
545 write!(f, "checksum_type: {:?}, ", self.checksum_type)?;
546 write!(f, "auto_sync_threshold: {:?}, ", self.auto_sync_threshold)?;
547 let flush_filter_desc = match self.flush_filter {
548 Some(ref _buf) => "Some(_)",
549 None => "None",
550 };
551 write!(f, "flush_filter: {} }}", flush_filter_desc)?;
552 Ok(())
553 }
554}