lsm_tree/config/mod.rs
1// Copyright (c) 2024-present, fjall-rs
2// This source code is licensed under both the Apache 2.0 and MIT License
3// (found in the LICENSE-* files in the repository)
4
5mod block_size;
6mod compression;
7mod filter;
8mod hash_ratio;
9mod pinning;
10mod restart_interval;
11
12pub use block_size::BlockSizePolicy;
13pub use compression::CompressionPolicy;
14pub use filter::{BloomConstructionPolicy, FilterPolicy, FilterPolicyEntry};
15pub use hash_ratio::HashRatioPolicy;
16pub use pinning::PinningPolicy;
17pub use restart_interval::RestartIntervalPolicy;
18
19/// Partitioning policy for indexes and filters
20pub type PartitioningPolicy = PinningPolicy;
21
22use crate::{
23 AnyTree, BlobTree, Cache, CompressionType, DescriptorTable, SequenceNumberCounter,
24 SharedSequenceNumberGenerator, Tree,
25 compaction::filter::Factory,
26 comparator::{self, SharedComparator},
27 encryption::EncryptionProvider,
28 file::TABLES_FOLDER,
29 fs::{Fs, StdFs},
30 merge_operator::MergeOperator,
31 path::absolute_path,
32 prefix::PrefixExtractor,
33 version::DEFAULT_LEVEL_COUNT,
34};
35use std::{
36 ops::Range,
37 path::{Path, PathBuf},
38 sync::Arc,
39};
40
41/// Per-level filesystem routing entry for tiered storage.
42///
43/// Maps a range of LSM levels to a base directory and filesystem backend.
44/// Tables at these levels are stored under `path/tables/`.
45///
46/// # Example
47///
48/// ```
49/// use lsm_tree::config::LevelRoute;
50/// use lsm_tree::fs::StdFs;
51/// use std::sync::Arc;
52///
53/// // Hot tier: L0-L1 on NVMe
54/// let hot = LevelRoute {
55/// levels: 0..2,
56/// path: "/mnt/nvme/db".into(),
57/// fs: Arc::new(StdFs),
58/// };
59///
60/// // Cold tier: L4-L6 on HDD
61/// let cold = LevelRoute {
62/// levels: 4..7,
63/// path: "/mnt/hdd/db".into(),
64/// fs: Arc::new(StdFs),
65/// };
66/// ```
67#[derive(Clone)]
68pub struct LevelRoute {
69 /// LSM levels this route covers (e.g., `0..2` for L0–L1).
70 pub levels: Range<u8>,
71
72 /// Base data directory for tables at these levels.
73 pub path: PathBuf,
74
75 /// Filesystem backend for I/O at these levels.
76 pub fs: Arc<dyn Fs>,
77}
78
79impl std::fmt::Debug for LevelRoute {
80 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
81 f.debug_struct("LevelRoute")
82 .field("levels", &self.levels)
83 .field("path", &self.path)
84 .finish_non_exhaustive()
85 }
86}
87
/// Kind of LSM-tree stored on disk.
///
/// Encoded as a single byte: `Standard` is `0` and `Blob` is `1`.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum TreeType {
    /// Standard LSM-tree, see [`Tree`]
    Standard,

    /// Key-value separated LSM-tree, see [`BlobTree`]
    Blob,
}

impl From<TreeType> for u8 {
    fn from(val: TreeType) -> Self {
        match val {
            TreeType::Blob => 1,
            TreeType::Standard => 0,
        }
    }
}

impl TryFrom<u8> for TreeType {
    type Error = ();

    /// Decodes a tree type byte; any value other than `0` or `1` is rejected.
    fn try_from(value: u8) -> Result<Self, Self::Error> {
        let decoded = match value {
            0 => Self::Standard,
            1 => Self::Blob,
            _ => return Err(()),
        };
        Ok(decoded)
    }
}
118
/// Relative folder used as the tree path by `Config::default`
/// (converted to an absolute path via `absolute_path`).
const DEFAULT_FILE_FOLDER: &str = ".lsm.data";
120
121/// Options for key-value separation
122#[derive(Clone, Debug, PartialEq)]
123pub struct KvSeparationOptions {
124 /// What type of compression is used for blobs
125 #[doc(hidden)]
126 pub compression: CompressionType,
127
128 /// Blob file target size in bytes
129 #[doc(hidden)]
130 pub file_target_size: u64,
131
132 /// Key-value separation threshold in bytes
133 #[doc(hidden)]
134 pub separation_threshold: u32,
135
136 #[doc(hidden)]
137 pub staleness_threshold: f32,
138
139 #[doc(hidden)]
140 pub age_cutoff: f32,
141
142 /// Pre-trained zstd dictionary for blob-file dictionary compression.
143 ///
144 /// Required when `compression` is [`CompressionType::ZstdDict`].
145 /// The `dict_id` in the compression type must match [`ZstdDictionary::id`].
146 #[cfg(zstd_any)]
147 #[doc(hidden)]
148 pub zstd_dictionary: Option<std::sync::Arc<crate::compression::ZstdDictionary>>,
149}
150
151impl Default for KvSeparationOptions {
152 fn default() -> Self {
153 Self {
154 #[cfg(feature="lz4")]
155 compression: CompressionType::Lz4,
156
157 #[cfg(not(feature="lz4"))]
158 compression: CompressionType::None,
159
160 file_target_size: /* 64 MiB */ 64 * 1_024 * 1_024,
161 separation_threshold: /* 1 KiB */ 1_024,
162
163 staleness_threshold: 0.25,
164 age_cutoff: 0.25,
165
166 #[cfg(zstd_any)]
167 zstd_dictionary: None,
168 }
169 }
170}
171
172impl KvSeparationOptions {
173 /// Sets the blob compression method.
174 #[must_use]
175 pub fn compression(mut self, compression: CompressionType) -> Self {
176 self.compression = compression;
177 self
178 }
179
180 /// Sets the target size of blob files.
181 ///
182 /// Smaller blob files allow more granular garbage collection
183 /// which allows lower space amp for lower write I/O cost.
184 ///
185 /// Larger blob files decrease the number of files on disk and maintenance
186 /// overhead.
187 ///
188 /// Defaults to 64 MiB.
189 #[must_use]
190 pub fn file_target_size(mut self, bytes: u64) -> Self {
191 self.file_target_size = bytes;
192 self
193 }
194
195 /// Sets the key-value separation threshold in bytes.
196 ///
197 /// Smaller value will reduce compaction overhead and thus write amplification,
198 /// at the cost of lower read performance.
199 ///
200 /// Defaults to 1 KiB.
201 #[must_use]
202 pub fn separation_threshold(mut self, bytes: u32) -> Self {
203 self.separation_threshold = bytes;
204 self
205 }
206
207 /// Sets the staleness threshold percentage.
208 ///
209 /// The staleness percentage determines how much a blob file needs to be fragmented to be
210 /// picked up by the garbage collection.
211 ///
212 /// Defaults to 33%.
213 #[must_use]
214 pub fn staleness_threshold(mut self, ratio: f32) -> Self {
215 self.staleness_threshold = ratio;
216 self
217 }
218
219 /// Sets the age cutoff threshold.
220 ///
221 /// Defaults to 20%.
222 #[must_use]
223 pub fn age_cutoff(mut self, ratio: f32) -> Self {
224 self.age_cutoff = ratio;
225 self
226 }
227
228 /// Sets the zstd dictionary for blob-file dictionary compression.
229 ///
230 /// Required when [`compression`](Self::compression) is set to
231 /// [`CompressionType::ZstdDict`]. The `dict_id` encoded in the
232 /// compression type must equal [`ZstdDictionary::id()`] of the
233 /// supplied dictionary; [`Config::open`] will return
234 /// [`Error::ZstdDictMismatch`](crate::Error::ZstdDictMismatch) if
235 /// they disagree.
236 #[cfg(zstd_any)]
237 #[must_use]
238 pub fn dict(mut self, dictionary: std::sync::Arc<crate::compression::ZstdDictionary>) -> Self {
239 self.zstd_dictionary = Some(dictionary);
240 self
241 }
242}
243
/// Tree configuration builder.
///
/// Create one with [`Config::new`] or [`Config::new_with_generators`],
/// customize it through the builder methods, then call [`Config::open`].
pub struct Config {
    /// Folder path
    ///
    /// The constructors and `Default` store it as an absolute path.
    #[doc(hidden)]
    pub path: PathBuf,

    /// Default filesystem backend for levels without an explicit route.
    ///
    /// Defaults to [`StdFs`]. Use [`Config::with_fs`] to plug in an
    /// alternative backend such as [`MemFs`](crate::fs::MemFs).
    ///
    /// Both fresh tree creation and reopening (recovery) are supported
    /// for any backend that implements [`Fs`].
    #[doc(hidden)]
    pub fs: Arc<dyn Fs>,

    /// Per-level filesystem routing for tiered storage.
    ///
    /// When set, tables at different LSM levels can be stored on different
    /// storage devices (e.g., NVMe for L0–L1, SSD for L2–L4, HDD for L5–L6).
    /// Each entry maps a range of levels to a base directory and filesystem
    /// backend. Uncovered levels fall back to the primary `path` and `fs`.
    ///
    /// The `level_routes` builder method validates the ranges, makes the
    /// paths absolute, and stores `None` for an empty list.
    ///
    /// Zero additional overhead when `None` — only a single branch check;
    /// path construction allocations are unchanged.
    #[doc(hidden)]
    pub level_routes: Option<Vec<LevelRoute>>,

    /// Block cache to use
    ///
    /// Defaults to a 16 MiB cache per tree.
    #[doc(hidden)]
    pub cache: Arc<Cache>,

    /// Descriptor table to use
    ///
    /// Defaults to `Some(DescriptorTable::new(256))`.
    #[doc(hidden)]
    pub descriptor_table: Option<Arc<DescriptorTable>>,

    /// Number of levels of the LSM tree (depth of tree)
    ///
    /// Once set, the level count is fixed (in the "manifest" file).
    /// Defaults to `DEFAULT_LEVEL_COUNT`; there is currently no builder setter.
    pub level_count: u8,

    /// What type of compression is used for data blocks
    pub data_block_compression_policy: CompressionPolicy,

    /// What type of compression is used for index blocks
    pub index_block_compression_policy: CompressionPolicy,

    /// Restart interval inside data blocks
    pub data_block_restart_interval_policy: RestartIntervalPolicy,

    /// Restart interval inside index blocks
    pub index_block_restart_interval_policy: RestartIntervalPolicy,

    /// Block size of data blocks
    pub data_block_size_policy: BlockSizePolicy,

    /// Whether to pin index blocks
    pub index_block_pinning_policy: PinningPolicy,

    /// Whether to pin filter blocks
    pub filter_block_pinning_policy: PinningPolicy,

    /// Whether to pin top level index of partitioned index
    pub top_level_index_block_pinning_policy: PinningPolicy,

    /// Whether to pin top level index of partitioned filter
    pub top_level_filter_block_pinning_policy: PinningPolicy,

    /// Data block hash ratio
    pub data_block_hash_ratio_policy: HashRatioPolicy,

    /// Whether to partition index blocks
    pub index_block_partitioning_policy: PartitioningPolicy,

    /// Whether to partition filter blocks
    pub filter_block_partitioning_policy: PartitioningPolicy,

    /// Partition size when using partitioned indexes
    pub index_block_partition_size_policy: BlockSizePolicy,

    /// Partition size when using partitioned filters
    pub filter_block_partition_size_policy: BlockSizePolicy,

    /// If `true`, the last level will not build filters, reducing the filter size of a database
    /// by ~90% typically
    pub(crate) expect_point_read_hits: bool,

    /// Filter construction policy
    pub filter_policy: FilterPolicy,

    /// Compaction filter factory
    pub compaction_filter_factory: Option<Arc<dyn Factory>>,

    /// Prefix extractor for prefix bloom filters.
    ///
    /// When set, the bloom filter indexes extracted prefixes in addition to
    /// full keys, allowing prefix scans to skip segments that contain no
    /// matching prefixes.
    pub prefix_extractor: Option<Arc<dyn PrefixExtractor>>,

    /// Merge operator for commutative operations
    ///
    /// When set, enables `merge()` operations that store partial updates
    /// which are lazily combined during reads and compaction.
    pub merge_operator: Option<Arc<dyn MergeOperator>>,

    /// Key-value separation options.
    ///
    /// When `Some`, [`Config::open`] opens a [`BlobTree`]; when `None`, it
    /// opens a standard [`Tree`].
    #[doc(hidden)]
    pub kv_separation_opts: Option<KvSeparationOptions>,

    /// Custom user key comparator.
    ///
    /// When set, all key comparisons use this comparator instead of the
    /// default lexicographic byte ordering. Once a tree is opened with a
    /// comparator, it must always be re-opened with the same comparator.
    // Not `pub` — use `Config::comparator()` builder method as the public API.
    #[doc(hidden)]
    pub(crate) comparator: SharedComparator,

    /// Block-level encryption provider for encryption at rest.
    ///
    /// When set, all blocks (data, index, filter, meta) are encrypted
    /// using this provider after compression and before checksumming.
    pub(crate) encryption: Option<Arc<dyn EncryptionProvider>>,

    /// Pre-trained zstd dictionary for dictionary compression.
    ///
    /// When set together with a [`CompressionType::ZstdDict`] compression
    /// policy, data blocks are compressed using this dictionary. The
    /// dictionary must remain the same for the lifetime of the tree —
    /// opening a tree with a different dictionary will produce
    /// [`Error::ZstdDictMismatch`](crate::Error::ZstdDictMismatch) errors.
    #[cfg(zstd_any)]
    pub(crate) zstd_dictionary: Option<Arc<crate::compression::ZstdDictionary>>,

    /// The global sequence number generator.
    ///
    /// Should be shared between multiple trees of a database.
    pub(crate) seqno: SharedSequenceNumberGenerator,

    /// Sequence number watermark that is visible to readers.
    ///
    /// Used for MVCC snapshots and to control which updates are
    /// observable in a given view of the database.
    pub(crate) visible_seqno: SharedSequenceNumberGenerator,
}
389
390// TODO: remove default?
391impl Default for Config {
392 fn default() -> Self {
393 Self {
394 path: absolute_path(Path::new(DEFAULT_FILE_FOLDER)),
395 fs: Arc::new(StdFs),
396 level_routes: None,
397 descriptor_table: Some(Arc::new(DescriptorTable::new(256))),
398 seqno: SharedSequenceNumberGenerator::from(SequenceNumberCounter::default()),
399 visible_seqno: SharedSequenceNumberGenerator::from(SequenceNumberCounter::default()),
400
401 cache: Arc::new(Cache::with_capacity_bytes(
402 /* 16 MiB */ 16 * 1_024 * 1_024,
403 )),
404
405 data_block_restart_interval_policy: RestartIntervalPolicy::all(16),
406 index_block_restart_interval_policy: RestartIntervalPolicy::all(1),
407
408 level_count: DEFAULT_LEVEL_COUNT,
409
410 data_block_size_policy: BlockSizePolicy::all(4_096),
411
412 index_block_pinning_policy: PinningPolicy::new([true, true, false]),
413 filter_block_pinning_policy: PinningPolicy::new([true, false]),
414
415 top_level_index_block_pinning_policy: PinningPolicy::all(true), // TODO: implement
416 top_level_filter_block_pinning_policy: PinningPolicy::all(true), // TODO: implement
417
418 index_block_partitioning_policy: PinningPolicy::new([false, false, false, true]),
419 filter_block_partitioning_policy: PinningPolicy::new([false, false, false, true]),
420
421 index_block_partition_size_policy: BlockSizePolicy::all(4_096), // TODO: implement
422 filter_block_partition_size_policy: BlockSizePolicy::all(4_096), // TODO: implement
423
424 data_block_compression_policy: ({
425 #[cfg(feature = "lz4")]
426 let c = CompressionPolicy::new([CompressionType::None, CompressionType::Lz4]);
427
428 #[cfg(not(feature = "lz4"))]
429 let c = CompressionPolicy::new([CompressionType::None]);
430
431 c
432 }),
433 index_block_compression_policy: CompressionPolicy::all(CompressionType::None),
434
435 data_block_hash_ratio_policy: HashRatioPolicy::all(0.0),
436
437 filter_policy: FilterPolicy::all(FilterPolicyEntry::Bloom(
438 BloomConstructionPolicy::BitsPerKey(10.0),
439 )),
440
441 compaction_filter_factory: None,
442 merge_operator: None,
443
444 prefix_extractor: None,
445
446 expect_point_read_hits: false,
447
448 kv_separation_opts: None,
449
450 #[cfg(zstd_any)]
451 zstd_dictionary: None,
452
453 comparator: comparator::default_comparator(),
454 encryption: None,
455 }
456 }
457}
458
459impl Config {
460 /// Initializes a new config
461 pub fn new<P: AsRef<Path>>(
462 path: P,
463 seqno: SequenceNumberCounter,
464 visible_seqno: SequenceNumberCounter,
465 ) -> Self {
466 Self {
467 path: absolute_path(path.as_ref()),
468 seqno: Arc::new(seqno),
469 visible_seqno: Arc::new(visible_seqno),
470 ..Default::default()
471 }
472 }
473
474 /// Sets the default filesystem backend used for levels without an explicit route.
475 ///
476 /// Defaults to [`StdFs`]. Use [`MemFs`](crate::fs::MemFs) for
477 /// in-memory trees (testing, ephemeral indexes).
478 ///
479 /// # Example
480 ///
481 /// ```
482 /// # fn main() -> lsm_tree::Result<()> {
483 /// use lsm_tree::{Config, SequenceNumberCounter};
484 /// use lsm_tree::fs::MemFs;
485 ///
486 /// let tree = Config::new(
487 /// "/virtual/tree",
488 /// SequenceNumberCounter::default(),
489 /// SequenceNumberCounter::default(),
490 /// )
491 /// .with_fs(MemFs::new())
492 /// .open()?;
493 /// # Ok(())
494 /// # }
495 /// ```
496 #[must_use]
497 pub fn with_fs<F: Fs>(mut self, fs: F) -> Self {
498 self.fs = Arc::new(fs);
499 self
500 }
501
502 /// Sets the default filesystem backend from an existing shared handle.
503 ///
504 /// Useful when multiple configs should reuse the same backend
505 /// instance, including trait objects and backends that are not `Clone`.
506 ///
507 #[must_use]
508 pub fn with_shared_fs(mut self, fs: Arc<dyn Fs>) -> Self {
509 self.fs = fs;
510 self
511 }
512
513 /// Opens a tree using the config.
514 ///
515 /// # Errors
516 ///
517 /// Will return `Err` if an IO error occurs.
518 /// Returns [`Error::ZstdDictMismatch`](crate::Error::ZstdDictMismatch) if
519 /// the compression policy references a `dict_id` that doesn't match the
520 /// configured dictionary.
521 pub fn open(self) -> crate::Result<AnyTree> {
522 #[cfg(zstd_any)]
523 self.validate_zstd_dictionary()?;
524
525 Ok(if self.kv_separation_opts.is_some() {
526 AnyTree::Blob(BlobTree::open(self)?)
527 } else {
528 AnyTree::Standard(Tree::open(self)?)
529 })
530 }
531
532 /// Validates that every `ZstdDict` entry in compression policies references
533 /// a `dict_id` that matches the configured dictionary. Catches mismatches
534 /// at open time rather than at first block write/read.
535 #[cfg(zstd_any)]
536 fn validate_zstd_dictionary(&self) -> crate::Result<()> {
537 let dict_id = self.zstd_dictionary.as_ref().map(|d| d.id());
538
539 // NOTE: Only data block policies are validated. Index blocks never
540 // carry a dictionary — Writer::use_index_block_compression() downgrades
541 // ZstdDict to plain Zstd. Validating index policies here would reject
542 // configs that use ZstdDict solely for index blocks even though the
543 // writer handles them correctly.
544 for ct in self.data_block_compression_policy.iter() {
545 if let &CompressionType::ZstdDict {
546 dict_id: required, ..
547 } = ct
548 {
549 match dict_id {
550 None => {
551 return Err(crate::Error::ZstdDictMismatch {
552 expected: required,
553 got: None,
554 });
555 }
556 Some(actual) if actual != required => {
557 return Err(crate::Error::ZstdDictMismatch {
558 expected: required,
559 got: Some(actual),
560 });
561 }
562 _ => {}
563 }
564 }
565 }
566
567 // Blob files with ZstdDict compression must have a matching dictionary.
568 if let Some(ref kv_opts) = self.kv_separation_opts
569 && let CompressionType::ZstdDict {
570 dict_id: required, ..
571 } = kv_opts.compression
572 {
573 match kv_opts.zstd_dictionary.as_ref().map(|d| d.id()) {
574 None => {
575 return Err(crate::Error::ZstdDictMismatch {
576 expected: required,
577 got: None,
578 });
579 }
580 Some(actual) if actual != required => {
581 return Err(crate::Error::ZstdDictMismatch {
582 expected: required,
583 got: Some(actual),
584 });
585 }
586 _ => {}
587 }
588 }
589
590 Ok(())
591 }
592
593 /// Like [`Config::new`], but accepts pre-built shared generators.
594 ///
595 /// This is useful when the caller already has
596 /// [`SharedSequenceNumberGenerator`] instances (e.g., from a higher-level
597 /// database that shares generators across multiple trees).
598 pub fn new_with_generators<P: AsRef<Path>>(
599 path: P,
600 seqno: SharedSequenceNumberGenerator,
601 visible_seqno: SharedSequenceNumberGenerator,
602 ) -> Self {
603 Self {
604 path: absolute_path(path.as_ref()),
605 seqno,
606 visible_seqno,
607 ..Default::default()
608 }
609 }
610}
611
#[cfg(all(test, zstd_any))]
mod tests {
    use super::*;
    use crate::{CompressionType, SequenceNumberCounter, compression::ZstdDictionary};
    use std::sync::Arc;

    /// Builds a config rooted at `path` with fresh sequence number counters.
    fn config_at(path: &Path) -> Config {
        Config::new(
            path,
            SequenceNumberCounter::default(),
            SequenceNumberCounter::default(),
        )
    }

    /// Builds a sample dictionary used by the tests below.
    fn sample_dict() -> Arc<ZstdDictionary> {
        Arc::new(ZstdDictionary::new(b"sample training data for test"))
    }

    #[test]
    fn blob_zstd_dict_no_dict_is_rejected() {
        // ZstdDict compression for blobs without providing a dictionary must fail.
        let folder = tempfile::tempdir().unwrap_or_else(|err| panic!("tempdir failed: {err}"));
        let cfg = config_at(folder.path()).with_kv_separation(Some(
            KvSeparationOptions::default().compression(CompressionType::ZstdDict {
                level: 3,
                dict_id: 7,
            }),
        ));

        assert!(
            matches!(
                cfg.validate_zstd_dictionary(),
                Err(crate::Error::ZstdDictMismatch {
                    expected: 7,
                    got: None
                })
            ),
            "expected ZstdDictMismatch when no dictionary is supplied",
        );
    }

    #[test]
    fn blob_zstd_dict_id_mismatch_is_rejected() {
        // ZstdDict compression with a dictionary whose id doesn't match the
        // compression type's dict_id must fail.
        let folder = tempfile::tempdir().unwrap_or_else(|err| panic!("tempdir failed: {err}"));
        let dict = sample_dict();
        let wrong_dict_id = dict.id().wrapping_add(1);
        let cfg = config_at(folder.path()).with_kv_separation(Some(
            KvSeparationOptions::default()
                .compression(CompressionType::ZstdDict {
                    level: 3,
                    dict_id: wrong_dict_id,
                })
                .dict(Arc::clone(&dict)),
        ));

        assert!(
            matches!(
                cfg.validate_zstd_dictionary(),
                Err(crate::Error::ZstdDictMismatch { .. })
            ),
            "expected ZstdDictMismatch when dict_id doesn't match dictionary",
        );
    }

    #[test]
    fn blob_zstd_dict_matching_dict_is_accepted() {
        // ZstdDict compression with a correctly matching dictionary must succeed.
        let folder = tempfile::tempdir().unwrap_or_else(|err| panic!("tempdir failed: {err}"));
        let dict = sample_dict();
        let cfg = config_at(folder.path()).with_kv_separation(Some(
            KvSeparationOptions::default()
                .compression(CompressionType::ZstdDict {
                    level: 3,
                    dict_id: dict.id(),
                })
                .dict(Arc::clone(&dict)),
        ));

        assert!(
            cfg.validate_zstd_dictionary().is_ok(),
            "matching dictionary must be accepted",
        );
    }

    #[test]
    fn data_block_zstd_dict_no_dict_is_rejected() {
        // A data-block ZstdDict policy without a configured dictionary must fail.
        let folder = tempfile::tempdir().unwrap_or_else(|err| panic!("tempdir failed: {err}"));
        let cfg = config_at(folder.path()).data_block_compression_policy(CompressionPolicy::all(
            CompressionType::ZstdDict {
                level: 3,
                dict_id: 7,
            },
        ));

        assert!(
            matches!(
                cfg.validate_zstd_dictionary(),
                Err(crate::Error::ZstdDictMismatch {
                    expected: 7,
                    got: None
                })
            ),
            "expected ZstdDictMismatch when no data-block dictionary is supplied",
        );
    }

    #[test]
    fn data_block_zstd_dict_id_mismatch_is_rejected() {
        // A data-block ZstdDict policy whose dict_id differs from the
        // configured dictionary's id must fail.
        let folder = tempfile::tempdir().unwrap_or_else(|err| panic!("tempdir failed: {err}"));
        let dict = sample_dict();
        let mut cfg = config_at(folder.path()).data_block_compression_policy(
            CompressionPolicy::all(CompressionType::ZstdDict {
                level: 3,
                dict_id: dict.id().wrapping_add(1),
            }),
        );
        cfg.zstd_dictionary = Some(Arc::clone(&dict));

        assert!(
            matches!(
                cfg.validate_zstd_dictionary(),
                Err(crate::Error::ZstdDictMismatch { got: Some(_), .. })
            ),
            "expected ZstdDictMismatch when data-block dict_id doesn't match dictionary",
        );
    }

    #[test]
    fn data_block_zstd_dict_matching_dict_is_accepted() {
        // A data-block ZstdDict policy with a matching dictionary must succeed.
        let folder = tempfile::tempdir().unwrap_or_else(|err| panic!("tempdir failed: {err}"));
        let dict = sample_dict();
        let mut cfg = config_at(folder.path()).data_block_compression_policy(
            CompressionPolicy::all(CompressionType::ZstdDict {
                level: 3,
                dict_id: dict.id(),
            }),
        );
        cfg.zstd_dictionary = Some(Arc::clone(&dict));

        assert!(
            cfg.validate_zstd_dictionary().is_ok(),
            "matching data-block dictionary must be accepted",
        );
    }

    #[test]
    fn index_block_zstd_dict_is_not_validated() {
        // Index blocks never carry a dictionary, so a ZstdDict index policy
        // without any dictionary must not be rejected.
        let folder = tempfile::tempdir().unwrap_or_else(|err| panic!("tempdir failed: {err}"));
        let cfg = config_at(folder.path()).index_block_compression_policy(
            CompressionPolicy::all(CompressionType::ZstdDict {
                level: 3,
                dict_id: 7,
            }),
        );

        assert!(
            cfg.validate_zstd_dictionary().is_ok(),
            "index-block ZstdDict policies must not be validated",
        );
    }
}
701
702impl Config {
703 /// Returns the tables folder path and [`Fs`] backend for the given level.
704 ///
705 /// If [`level_routes`](Self::level_routes) has an entry covering this
706 /// level, uses that entry's path and `Fs`. Otherwise falls back to the
707 /// primary [`path`](Self::path) and [`fs`](Self::fs).
708 #[must_use]
709 pub fn tables_folder_for_level(&self, level: u8) -> (PathBuf, Arc<dyn Fs>) {
710 if let Some(routes) = &self.level_routes {
711 for route in routes {
712 if route.levels.contains(&level) {
713 return (route.path.join(TABLES_FOLDER), route.fs.clone());
714 }
715 }
716 }
717 (self.path.join(TABLES_FOLDER), self.fs.clone())
718 }
719
720 /// Returns all unique tables folders that need to be scanned during
721 /// recovery: the primary folder plus every [`LevelRoute`] folder.
722 #[must_use]
723 pub fn all_tables_folders(&self) -> Vec<(PathBuf, Arc<dyn Fs>)> {
724 let primary_fs: Arc<dyn Fs> = self.fs.clone();
725 let mut folders: Vec<(PathBuf, Arc<dyn Fs>)> =
726 vec![(self.path.join(TABLES_FOLDER), primary_fs)];
727
728 if let Some(routes) = &self.level_routes {
729 for route in routes {
730 let folder = route.path.join(TABLES_FOLDER);
731 // Dedup by path: scanning the same directory twice would cause
732 // already-recovered tables to be classified as orphans and
733 // deleted. Routing the same path through different Fs backends
734 // is a configuration error (level_routes validation in
735 // Config::level_routes rejects overlapping ranges).
736 if !folders.iter().any(|(p, _)| *p == folder) {
737 folders.push((folder, route.fs.clone()));
738 }
739 }
740 }
741
742 folders
743 }
744
745 /// Configures per-level filesystem routing for tiered storage.
746 ///
747 /// Each [`LevelRoute`] maps a range of LSM levels to a base directory
748 /// and filesystem backend. Levels not covered by any route fall back to
749 /// the primary `path` and `fs`.
750 ///
751 /// # Reopen contract
752 ///
753 /// The route configuration is **not persisted** in the manifest.
754 /// On reopen, the [`Config`] must specify `level_routes` such that
755 /// [`all_tables_folders`](Self::all_tables_folders) includes every
756 /// directory and filesystem pair that may contain existing SST files
757 /// for this tree.
758 ///
759 /// Changing the mapping from levels to paths is allowed as long as
760 /// the previously used folders remain covered. If old folders are
761 /// omitted, recovery may fail with
762 /// [`RouteMismatch`](crate::Error::RouteMismatch) (when all missing
763 /// tables are on uncovered levels) or
764 /// [`Unrecoverable`](crate::Error::Unrecoverable) (when some missing
765 /// tables are on levels that are still covered).
766 ///
767 /// # Panics
768 ///
769 /// Panics if any route has an empty range or if any two routes have
770 /// overlapping level ranges.
771 #[must_use]
772 pub fn level_routes(mut self, routes: Vec<LevelRoute>) -> Self {
773 // Validate no empty/inverted ranges
774 for route in &routes {
775 assert!(
776 route.levels.start < route.levels.end,
777 "empty or inverted level route range: {:?}",
778 route.levels,
779 );
780 }
781
782 // Validate no overlapping ranges
783 for (i, a) in routes.iter().enumerate() {
784 for b in routes.iter().skip(i + 1) {
785 assert!(
786 a.levels.end <= b.levels.start || b.levels.end <= a.levels.start,
787 "overlapping level routes: {:?} and {:?}",
788 a.levels,
789 b.levels,
790 );
791 }
792 }
793 self.level_routes = if routes.is_empty() {
794 None
795 } else {
796 // Normalize paths the same way Config::new normalizes self.path
797 Some(
798 routes
799 .into_iter()
800 .map(|mut r| {
801 r.path = absolute_path(&r.path);
802 r
803 })
804 .collect(),
805 )
806 };
807 self
808 }
809
810 /// Overrides the sequence number generator.
811 ///
812 /// By default, [`SequenceNumberCounter`] is used. This allows plugging in
813 /// a custom generator (e.g., HLC for distributed databases).
814 #[must_use]
815 pub fn seqno_generator(mut self, generator: SharedSequenceNumberGenerator) -> Self {
816 self.seqno = generator;
817 self
818 }
819
820 /// Overrides the visible sequence number generator.
821 #[must_use]
822 pub fn visible_seqno_generator(mut self, generator: SharedSequenceNumberGenerator) -> Self {
823 self.visible_seqno = generator;
824 self
825 }
826
827 /// Sets the global cache.
828 ///
829 /// You can create a global [`Cache`] and share it between multiple
830 /// trees to cap global cache memory usage.
831 ///
832 /// Defaults to a cache with 16 MiB of capacity *per tree*.
833 #[must_use]
834 pub fn use_cache(mut self, cache: Arc<Cache>) -> Self {
835 self.cache = cache;
836 self
837 }
838
839 /// Sets the file descriptor cache.
840 ///
841 /// Can be shared across trees.
842 #[must_use]
843 pub fn use_descriptor_table(mut self, descriptor_table: Option<Arc<DescriptorTable>>) -> Self {
844 self.descriptor_table = descriptor_table;
845 self
846 }
847
848 /// If `true`, the last level will not build filters, reducing the filter size of a database
849 /// by ~90% typically.
850 ///
851 /// **Enable this only if you know that point reads generally are expected to find a key-value pair.**
852 #[must_use]
853 pub fn expect_point_read_hits(mut self, b: bool) -> Self {
854 self.expect_point_read_hits = b;
855 self
856 }
857
858 /// Sets the partitioning policy for filter blocks.
859 #[must_use]
860 pub fn filter_block_partitioning_policy(mut self, policy: PinningPolicy) -> Self {
861 self.filter_block_partitioning_policy = policy;
862 self
863 }
864
865 /// Sets the partitioning policy for index blocks.
866 #[must_use]
867 pub fn index_block_partitioning_policy(mut self, policy: PinningPolicy) -> Self {
868 self.index_block_partitioning_policy = policy;
869 self
870 }
871
872 /// Sets the pinning policy for filter blocks.
873 #[must_use]
874 pub fn filter_block_pinning_policy(mut self, policy: PinningPolicy) -> Self {
875 self.filter_block_pinning_policy = policy;
876 self
877 }
878
879 /// Sets the pinning policy for index blocks.
880 #[must_use]
881 pub fn index_block_pinning_policy(mut self, policy: PinningPolicy) -> Self {
882 self.index_block_pinning_policy = policy;
883 self
884 }
885
886 /// Sets the restart interval inside data blocks.
887 ///
888 /// A higher restart interval saves space while increasing lookup times
889 /// inside data blocks.
890 ///
891 /// Default = 16
892 ///
893 /// # Panics
894 ///
895 /// Panics if any restart interval in `policy` is zero.
896 #[must_use]
897 pub fn data_block_restart_interval_policy(mut self, policy: RestartIntervalPolicy) -> Self {
898 assert!(
899 policy.iter().all(|interval| *interval > 0),
900 "data block restart interval must be greater than zero",
901 );
902 self.data_block_restart_interval_policy = policy;
903 self
904 }
905
906 /// Sets the restart interval inside index blocks.
907 ///
908 /// A higher restart interval saves space while increasing lookup times
909 /// inside index blocks.
910 ///
911 /// Default = 1
912 ///
913 /// # Panics
914 ///
915 /// Panics if any restart interval in `policy` is zero.
916 #[must_use]
917 pub fn index_block_restart_interval_policy(mut self, policy: RestartIntervalPolicy) -> Self {
918 assert!(
919 policy.iter().all(|interval| *interval > 0),
920 "index block restart interval must be greater than zero",
921 );
922 self.index_block_restart_interval_policy = policy;
923 self
924 }
925
926 /// Sets the filter construction policy.
927 #[must_use]
928 pub fn filter_policy(mut self, policy: FilterPolicy) -> Self {
929 self.filter_policy = policy;
930 self
931 }
932
933 /// Sets the compression method for data blocks.
934 #[must_use]
935 pub fn data_block_compression_policy(mut self, policy: CompressionPolicy) -> Self {
936 self.data_block_compression_policy = policy;
937 self
938 }
939
940 /// Sets the compression method for index blocks.
941 #[must_use]
942 pub fn index_block_compression_policy(mut self, policy: CompressionPolicy) -> Self {
943 self.index_block_compression_policy = policy;
944 self
945 }
946
947 // TODO: level count is fixed to 7 right now
948 // /// Sets the number of levels of the LSM tree (depth of tree).
949 // ///
950 // /// Defaults to 7, like `LevelDB` and `RocksDB`.
951 // ///
952 // /// Cannot be changed once set.
953 // ///
954 // /// # Panics
955 // ///
956 // /// Panics if `n` is 0.
957 // #[must_use]
958 // pub fn level_count(mut self, n: u8) -> Self {
959 // assert!(n > 0);
960
961 // self.level_count = n;
962 // self
963 // }
964
965 /// Sets the data block size policy.
966 #[must_use]
967 pub fn data_block_size_policy(mut self, policy: BlockSizePolicy) -> Self {
968 self.data_block_size_policy = policy;
969 self
970 }
971
972 /// Sets the hash ratio policy for data blocks.
973 ///
974 /// If greater than 0.0, a hash index is embedded into data blocks that can speed up reads
975 /// inside the data block.
976 #[must_use]
977 pub fn data_block_hash_ratio_policy(mut self, policy: HashRatioPolicy) -> Self {
978 self.data_block_hash_ratio_policy = policy;
979 self
980 }
981
982 /// Toggles key-value separation.
983 #[must_use]
984 pub fn with_kv_separation(mut self, opts: Option<KvSeparationOptions>) -> Self {
985 self.kv_separation_opts = opts;
986 self
987 }
988
989 /// Installs a custom compaction filter.
990 #[must_use]
991 pub fn with_compaction_filter_factory(mut self, factory: Option<Arc<dyn Factory>>) -> Self {
992 self.compaction_filter_factory = factory;
993 self
994 }
995
996 /// Sets the prefix extractor for prefix bloom filters.
997 ///
998 /// When configured, bloom filters will index key prefixes returned by
999 /// the extractor. Prefix scans can then skip segments whose bloom
1000 /// filter reports no match for the scan prefix.
1001 #[must_use]
1002 pub fn prefix_extractor(mut self, extractor: Arc<dyn PrefixExtractor>) -> Self {
1003 self.prefix_extractor = Some(extractor);
1004 self
1005 }
1006
1007 /// Installs a merge operator for commutative operations.
1008 ///
1009 /// When set, enables [`crate::AbstractTree::merge`] which stores partial updates
1010 /// (operands) that are lazily combined during reads and compaction.
1011 #[must_use]
1012 pub fn with_merge_operator(mut self, op: Option<Arc<dyn MergeOperator>>) -> Self {
1013 self.merge_operator = op;
1014 self
1015 }
1016
1017 /// Sets a custom user key comparator.
1018 ///
1019 /// When configured, all key ordering (memtable, block index, merge,
1020 /// range scans) uses this comparator instead of the default lexicographic
1021 /// byte ordering.
1022 ///
1023 /// # Important
1024 ///
1025 /// The comparator's [`crate::UserComparator::name`] is persisted when a tree is
1026 /// first created. On subsequent opens the stored name is compared against
1027 /// the supplied comparator's name — a mismatch causes the open to fail
1028 /// with [`Error::ComparatorMismatch`](crate::Error::ComparatorMismatch).
1029 #[must_use]
1030 pub fn comparator(mut self, comparator: SharedComparator) -> Self {
1031 self.comparator = comparator;
1032 self
1033 }
1034
1035 /// Sets the block-level encryption provider for encryption at rest.
1036 ///
1037 /// When set, all blocks written to SST files are encrypted after
1038 /// compression and before checksumming, using the provided
1039 /// [`EncryptionProvider`].
1040 ///
1041 /// The caller is responsible for key management and rotation.
1042 /// See [`crate::Aes256GcmProvider`] (behind the `encryption` feature)
1043 /// for a ready-to-use AES-256-GCM implementation.
1044 ///
1045 /// **Important constraints:**
1046 /// - Encryption state is NOT recorded in SST metadata. Opening an
1047 /// encrypted tree without the correct provider (or vice versa) will
1048 /// cause block validation errors, not silent corruption.
1049 /// - Blob files (KV-separated large values) are NOT covered by
1050 /// block-level encryption. Large values stored via KV separation
1051 /// remain in plaintext on disk.
1052 #[must_use]
1053 pub fn with_encryption(mut self, encryption: Option<Arc<dyn EncryptionProvider>>) -> Self {
1054 self.encryption = encryption;
1055 self
1056 }
1057
1058 /// Sets the pre-trained zstd dictionary for dictionary compression.
1059 ///
1060 /// When set, data blocks using [`CompressionType::ZstdDict`] will be
1061 /// compressed and decompressed with this dictionary. The dictionary
1062 /// should be trained on representative data samples for best results.
1063 ///
1064 /// Create a dictionary with [`ZstdDictionary::new`](crate::ZstdDictionary::new),
1065 /// then use [`CompressionType::zstd_dict`] to create a matching
1066 /// compression type:
1067 ///
1068 /// ```ignore
1069 /// use lsm_tree::{CompressionType, ZstdDictionary};
1070 ///
1071 /// let dict = ZstdDictionary::new(&training_data);
1072 /// let compression = CompressionType::zstd_dict(3, dict.id()).unwrap();
1073 ///
1074 /// config
1075 /// .zstd_dictionary(Some(Arc::new(dict)))
1076 /// .data_block_compression_policy(CompressionPolicy::all(compression));
1077 /// ```
1078 #[cfg(zstd_any)]
1079 #[must_use]
1080 pub fn zstd_dictionary(
1081 mut self,
1082 dictionary: Option<Arc<crate::compression::ZstdDictionary>>,
1083 ) -> Self {
1084 self.zstd_dictionary = dictionary;
1085 self
1086 }
1087}
1088
1089#[cfg(test)]
1090mod builder_tests {
1091 use super::*;
1092 use crate::SequenceNumberCounter;
1093
1094 #[test]
1095 fn restart_interval_policies_can_be_overridden_independently() {
1096 let folder = match tempfile::tempdir() {
1097 Ok(folder) => folder,
1098 Err(err) => panic!("tempdir failed: {err}"),
1099 };
1100 let cfg = Config::new(
1101 folder.path(),
1102 SequenceNumberCounter::default(),
1103 SequenceNumberCounter::default(),
1104 )
1105 .data_block_restart_interval_policy(RestartIntervalPolicy::all(7))
1106 .index_block_restart_interval_policy(RestartIntervalPolicy::all(3));
1107
1108 assert_eq!(cfg.data_block_restart_interval_policy.first(), Some(&7));
1109 assert_eq!(cfg.index_block_restart_interval_policy.first(), Some(&3));
1110 }
1111
1112 #[test]
1113 #[should_panic(expected = "index block restart interval must be greater than zero")]
1114 fn index_restart_interval_policy_rejects_zero_values() {
1115 let folder = match tempfile::tempdir() {
1116 Ok(folder) => folder,
1117 Err(err) => panic!("tempdir failed: {err}"),
1118 };
1119 let _cfg = Config::new(
1120 folder.path(),
1121 SequenceNumberCounter::default(),
1122 SequenceNumberCounter::default(),
1123 )
1124 .index_block_restart_interval_policy(RestartIntervalPolicy::all(0));
1125 }
1126
1127 #[test]
1128 #[should_panic(expected = "data block restart interval must be greater than zero")]
1129 fn data_restart_interval_policy_rejects_zero_values() {
1130 let folder = match tempfile::tempdir() {
1131 Ok(folder) => folder,
1132 Err(err) => panic!("tempdir failed: {err}"),
1133 };
1134 let _cfg = Config::new(
1135 folder.path(),
1136 SequenceNumberCounter::default(),
1137 SequenceNumberCounter::default(),
1138 )
1139 .data_block_restart_interval_policy(RestartIntervalPolicy::all(0));
1140 }
1141
1142 #[test]
1143 #[should_panic(expected = "restart interval policy may not be empty")]
1144 fn index_restart_interval_policy_rejects_empty() {
1145 let folder = match tempfile::tempdir() {
1146 Ok(folder) => folder,
1147 Err(err) => panic!("tempdir failed: {err}"),
1148 };
1149 let _cfg = Config::new(
1150 folder.path(),
1151 SequenceNumberCounter::default(),
1152 SequenceNumberCounter::default(),
1153 )
1154 .index_block_restart_interval_policy(RestartIntervalPolicy::new([]));
1155 }
1156
1157 #[test]
1158 #[should_panic(expected = "restart interval policy may not be empty")]
1159 fn data_restart_interval_policy_rejects_empty() {
1160 let folder = match tempfile::tempdir() {
1161 Ok(folder) => folder,
1162 Err(err) => panic!("tempdir failed: {err}"),
1163 };
1164 let _cfg = Config::new(
1165 folder.path(),
1166 SequenceNumberCounter::default(),
1167 SequenceNumberCounter::default(),
1168 )
1169 .data_block_restart_interval_policy(RestartIntervalPolicy::new([]));
1170 }
1171}