Skip to main content

lsm_tree/config/
mod.rs

1// Copyright (c) 2024-present, fjall-rs
2// This source code is licensed under both the Apache 2.0 and MIT License
3// (found in the LICENSE-* files in the repository)
4
5mod block_size;
6mod compression;
7mod filter;
8mod hash_ratio;
9mod pinning;
10mod restart_interval;
11
12pub use block_size::BlockSizePolicy;
13pub use compression::CompressionPolicy;
14pub use filter::{BloomConstructionPolicy, FilterPolicy, FilterPolicyEntry};
15pub use hash_ratio::HashRatioPolicy;
16pub use pinning::PinningPolicy;
17pub use restart_interval::RestartIntervalPolicy;
18
/// Partitioning policy for indexes and filters.
///
/// This is a plain type alias of [`PinningPolicy`], so a partitioning policy
/// is built with exactly the same constructors as a pinning policy.
pub type PartitioningPolicy = PinningPolicy;
21
22use crate::{
23    AnyTree, BlobTree, Cache, CompressionType, DescriptorTable, SequenceNumberCounter,
24    SharedSequenceNumberGenerator, Tree,
25    compaction::filter::Factory,
26    comparator::{self, SharedComparator},
27    encryption::EncryptionProvider,
28    file::TABLES_FOLDER,
29    fs::{Fs, StdFs},
30    merge_operator::MergeOperator,
31    path::absolute_path,
32    prefix::PrefixExtractor,
33    version::DEFAULT_LEVEL_COUNT,
34};
35use std::{
36    ops::Range,
37    path::{Path, PathBuf},
38    sync::Arc,
39};
40
/// Per-level filesystem routing entry for tiered storage.
///
/// Maps a range of LSM levels to a base directory and filesystem backend.
/// Tables at these levels are stored under `path/tables/`.
///
/// The `Debug` output of a route shows `levels` and `path` only; the
/// filesystem backend is omitted.
///
/// # Example
///
/// ```
/// use lsm_tree::config::LevelRoute;
/// use lsm_tree::fs::StdFs;
/// use std::sync::Arc;
///
/// // Hot tier: L0-L1 on NVMe
/// let hot = LevelRoute {
///     levels: 0..2,
///     path: "/mnt/nvme/db".into(),
///     fs: Arc::new(StdFs),
/// };
///
/// // Cold tier: L4-L6 on HDD
/// let cold = LevelRoute {
///     levels: 4..7,
///     path: "/mnt/hdd/db".into(),
///     fs: Arc::new(StdFs),
/// };
/// ```
#[derive(Clone)]
pub struct LevelRoute {
    /// LSM levels this route covers (e.g., `0..2` for L0–L1).
    ///
    /// The range is half-open: `start` is included, `end` is not.
    pub levels: Range<u8>,

    /// Base data directory for tables at these levels.
    ///
    /// The tables folder itself is a sub-directory of this path.
    pub path: PathBuf,

    /// Filesystem backend for I/O at these levels.
    pub fs: Arc<dyn Fs>,
}
78
79impl std::fmt::Debug for LevelRoute {
80    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
81        f.debug_struct("LevelRoute")
82            .field("levels", &self.levels)
83            .field("path", &self.path)
84            .finish_non_exhaustive()
85    }
86}
87
/// LSM-tree type
///
/// Converts to a `u8` tag (`Standard` = 0, `Blob` = 1) and back via the
/// `From` / `TryFrom` implementations in this module.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum TreeType {
    /// Standard LSM-tree, see [`Tree`]
    Standard,

    /// Key-value separated LSM-tree, see [`BlobTree`]
    Blob,
}
97
98impl From<TreeType> for u8 {
99    fn from(val: TreeType) -> Self {
100        match val {
101            TreeType::Standard => 0,
102            TreeType::Blob => 1,
103        }
104    }
105}
106
107impl TryFrom<u8> for TreeType {
108    type Error = ();
109
110    fn try_from(value: u8) -> Result<Self, Self::Error> {
111        match value {
112            0 => Ok(Self::Standard),
113            1 => Ok(Self::Blob),
114            _ => Err(()),
115        }
116    }
117}
118
/// Folder name used as the tree path by `Config::default()`.
///
/// It is turned into an absolute path there before being stored.
const DEFAULT_FILE_FOLDER: &str = ".lsm.data";
120
/// Options for key-value separation
///
/// Build a value with [`KvSeparationOptions::default`] and adjust it with
/// the builder methods of the same names as the fields.
#[derive(Clone, Debug, PartialEq)]
pub struct KvSeparationOptions {
    /// What type of compression is used for blobs
    #[doc(hidden)]
    pub compression: CompressionType,

    /// Blob file target size in bytes
    #[doc(hidden)]
    pub file_target_size: u64,

    /// Key-value separation threshold in bytes
    #[doc(hidden)]
    pub separation_threshold: u32,

    /// Staleness threshold (as a ratio): how fragmented a blob file needs to
    /// be to get picked up by the garbage collection
    #[doc(hidden)]
    pub staleness_threshold: f32,

    /// Age cutoff threshold (as a ratio)
    #[doc(hidden)]
    pub age_cutoff: f32,

    /// Pre-trained zstd dictionary for blob-file dictionary compression.
    ///
    /// Required when `compression` is [`CompressionType::ZstdDict`].
    /// The `dict_id` in the compression type must match
    /// [`ZstdDictionary::id`](crate::compression::ZstdDictionary::id).
    #[cfg(zstd_any)]
    #[doc(hidden)]
    pub zstd_dictionary: Option<std::sync::Arc<crate::compression::ZstdDictionary>>,
}
150
151impl Default for KvSeparationOptions {
152    fn default() -> Self {
153        Self {
154            #[cfg(feature="lz4")]
155            compression:   CompressionType::Lz4,
156
157            #[cfg(not(feature="lz4"))]
158            compression: CompressionType::None,
159
160            file_target_size: /* 64 MiB */ 64 * 1_024 * 1_024,
161            separation_threshold: /* 1 KiB */ 1_024,
162
163            staleness_threshold: 0.25,
164            age_cutoff: 0.25,
165
166            #[cfg(zstd_any)]
167            zstd_dictionary: None,
168        }
169    }
170}
171
172impl KvSeparationOptions {
173    /// Sets the blob compression method.
174    #[must_use]
175    pub fn compression(mut self, compression: CompressionType) -> Self {
176        self.compression = compression;
177        self
178    }
179
180    /// Sets the target size of blob files.
181    ///
182    /// Smaller blob files allow more granular garbage collection
183    /// which allows lower space amp for lower write I/O cost.
184    ///
185    /// Larger blob files decrease the number of files on disk and maintenance
186    /// overhead.
187    ///
188    /// Defaults to 64 MiB.
189    #[must_use]
190    pub fn file_target_size(mut self, bytes: u64) -> Self {
191        self.file_target_size = bytes;
192        self
193    }
194
195    /// Sets the key-value separation threshold in bytes.
196    ///
197    /// Smaller value will reduce compaction overhead and thus write amplification,
198    /// at the cost of lower read performance.
199    ///
200    /// Defaults to 1 KiB.
201    #[must_use]
202    pub fn separation_threshold(mut self, bytes: u32) -> Self {
203        self.separation_threshold = bytes;
204        self
205    }
206
207    /// Sets the staleness threshold percentage.
208    ///
209    /// The staleness percentage determines how much a blob file needs to be fragmented to be
210    /// picked up by the garbage collection.
211    ///
212    /// Defaults to 33%.
213    #[must_use]
214    pub fn staleness_threshold(mut self, ratio: f32) -> Self {
215        self.staleness_threshold = ratio;
216        self
217    }
218
219    /// Sets the age cutoff threshold.
220    ///
221    /// Defaults to 20%.
222    #[must_use]
223    pub fn age_cutoff(mut self, ratio: f32) -> Self {
224        self.age_cutoff = ratio;
225        self
226    }
227
228    /// Sets the zstd dictionary for blob-file dictionary compression.
229    ///
230    /// Required when [`compression`](Self::compression) is set to
231    /// [`CompressionType::ZstdDict`].  The `dict_id` encoded in the
232    /// compression type must equal [`ZstdDictionary::id()`] of the
233    /// supplied dictionary; [`Config::open`] will return
234    /// [`Error::ZstdDictMismatch`](crate::Error::ZstdDictMismatch) if
235    /// they disagree.
236    #[cfg(zstd_any)]
237    #[must_use]
238    pub fn dict(mut self, dictionary: std::sync::Arc<crate::compression::ZstdDictionary>) -> Self {
239        self.zstd_dictionary = Some(dictionary);
240        self
241    }
242}
243
/// Tree configuration builder
///
/// Create one with [`Config::new`] (or [`Config::new_with_generators`]),
/// adjust it through the builder methods, then call [`Config::open`].
pub struct Config {
    /// Folder path
    #[doc(hidden)]
    pub path: PathBuf,

    /// Default filesystem backend for levels without an explicit route.
    ///
    /// Defaults to [`StdFs`]. Use [`Config::with_fs`] to plug in an
    /// alternative backend such as [`MemFs`](crate::fs::MemFs).
    ///
    /// Both fresh tree creation and reopening (recovery) are supported
    /// for any backend that implements [`Fs`].
    #[doc(hidden)]
    pub fs: Arc<dyn Fs>,

    /// Per-level filesystem routing for tiered storage.
    ///
    /// When set, tables at different LSM levels can be stored on different
    /// storage devices (e.g., NVMe for L0–L1, SSD for L2–L4, HDD for L5–L6).
    /// Each entry maps a range of levels to a base directory and filesystem
    /// backend. Uncovered levels fall back to the primary `path` and `fs`.
    ///
    /// Zero additional overhead when `None` — only a single branch check;
    /// path construction allocations are unchanged.
    #[doc(hidden)]
    pub level_routes: Option<Vec<LevelRoute>>,

    /// Block cache to use
    #[doc(hidden)]
    pub cache: Arc<Cache>,

    /// Descriptor table to use
    #[doc(hidden)]
    pub descriptor_table: Option<Arc<DescriptorTable>>,

    /// Number of levels of the LSM tree (depth of tree)
    ///
    /// Once set, the level count is fixed (in the "manifest" file)
    pub level_count: u8,

    /// What type of compression is used for data blocks
    pub data_block_compression_policy: CompressionPolicy,

    /// What type of compression is used for index blocks
    pub index_block_compression_policy: CompressionPolicy,

    /// Restart interval inside data blocks
    pub data_block_restart_interval_policy: RestartIntervalPolicy,

    /// Restart interval inside index blocks
    pub index_block_restart_interval_policy: RestartIntervalPolicy,

    /// Block size of data blocks
    pub data_block_size_policy: BlockSizePolicy,

    /// Whether to pin index blocks
    pub index_block_pinning_policy: PinningPolicy,

    /// Whether to pin filter blocks
    pub filter_block_pinning_policy: PinningPolicy,

    /// Whether to pin top level index of partitioned index
    pub top_level_index_block_pinning_policy: PinningPolicy,

    /// Whether to pin top level index of partitioned filter
    pub top_level_filter_block_pinning_policy: PinningPolicy,

    /// Data block hash ratio
    pub data_block_hash_ratio_policy: HashRatioPolicy,

    /// Whether to partition index blocks
    pub index_block_partitioning_policy: PartitioningPolicy,

    /// Whether to partition filter blocks
    pub filter_block_partitioning_policy: PartitioningPolicy,

    /// Partition size when using partitioned indexes
    pub index_block_partition_size_policy: BlockSizePolicy,

    /// Partition size when using partitioned filters
    pub filter_block_partition_size_policy: BlockSizePolicy,

    /// If `true`, the last level will not build filters, reducing the filter size of a database
    /// by ~90% typically
    pub(crate) expect_point_read_hits: bool,

    /// Filter construction policy
    pub filter_policy: FilterPolicy,

    /// Compaction filter factory
    pub compaction_filter_factory: Option<Arc<dyn Factory>>,

    /// Prefix extractor for prefix bloom filters.
    ///
    /// When set, the bloom filter indexes extracted prefixes in addition to
    /// full keys, allowing prefix scans to skip segments that contain no
    /// matching prefixes.
    pub prefix_extractor: Option<Arc<dyn PrefixExtractor>>,

    /// Merge operator for commutative operations
    ///
    /// When set, enables `merge()` operations that store partial updates
    /// which are lazily combined during reads and compaction.
    pub merge_operator: Option<Arc<dyn MergeOperator>>,

    /// Key-value separation options.
    ///
    /// [`Config::open`] opens a [`BlobTree`] when this is `Some` and a
    /// standard [`Tree`] when it is `None`.
    #[doc(hidden)]
    pub kv_separation_opts: Option<KvSeparationOptions>,

    /// Custom user key comparator.
    ///
    /// When set, all key comparisons use this comparator instead of the
    /// default lexicographic byte ordering. Once a tree is opened with a
    /// comparator, it must always be re-opened with the same comparator.
    // Not `pub` — use `Config::comparator()` builder method as the public API.
    #[doc(hidden)]
    pub(crate) comparator: SharedComparator,

    /// Block-level encryption provider for encryption at rest.
    ///
    /// When set, all blocks (data, index, filter, meta) are encrypted
    /// using this provider after compression and before checksumming.
    pub(crate) encryption: Option<Arc<dyn EncryptionProvider>>,

    /// Pre-trained zstd dictionary for dictionary compression.
    ///
    /// When set together with a [`CompressionType::ZstdDict`] compression
    /// policy, data blocks are compressed using this dictionary. The
    /// dictionary must remain the same for the lifetime of the tree —
    /// opening a tree with a different dictionary will produce
    /// [`Error::ZstdDictMismatch`](crate::Error::ZstdDictMismatch) errors.
    #[cfg(zstd_any)]
    pub(crate) zstd_dictionary: Option<Arc<crate::compression::ZstdDictionary>>,

    /// The global sequence number generator.
    ///
    /// Should be shared between multiple trees of a database.
    pub(crate) seqno: SharedSequenceNumberGenerator,

    /// Sequence number watermark that is visible to readers.
    ///
    /// Used for MVCC snapshots and to control which updates are
    /// observable in a given view of the database.
    pub(crate) visible_seqno: SharedSequenceNumberGenerator,
}
389
390// TODO: remove default?
391impl Default for Config {
392    fn default() -> Self {
393        Self {
394            path: absolute_path(Path::new(DEFAULT_FILE_FOLDER)),
395            fs: Arc::new(StdFs),
396            level_routes: None,
397            descriptor_table: Some(Arc::new(DescriptorTable::new(256))),
398            seqno: SharedSequenceNumberGenerator::from(SequenceNumberCounter::default()),
399            visible_seqno: SharedSequenceNumberGenerator::from(SequenceNumberCounter::default()),
400
401            cache: Arc::new(Cache::with_capacity_bytes(
402                /* 16 MiB */ 16 * 1_024 * 1_024,
403            )),
404
405            data_block_restart_interval_policy: RestartIntervalPolicy::all(16),
406            index_block_restart_interval_policy: RestartIntervalPolicy::all(1),
407
408            level_count: DEFAULT_LEVEL_COUNT,
409
410            data_block_size_policy: BlockSizePolicy::all(4_096),
411
412            index_block_pinning_policy: PinningPolicy::new([true, true, false]),
413            filter_block_pinning_policy: PinningPolicy::new([true, false]),
414
415            top_level_index_block_pinning_policy: PinningPolicy::all(true), // TODO: implement
416            top_level_filter_block_pinning_policy: PinningPolicy::all(true), // TODO: implement
417
418            index_block_partitioning_policy: PinningPolicy::new([false, false, false, true]),
419            filter_block_partitioning_policy: PinningPolicy::new([false, false, false, true]),
420
421            index_block_partition_size_policy: BlockSizePolicy::all(4_096), // TODO: implement
422            filter_block_partition_size_policy: BlockSizePolicy::all(4_096), // TODO: implement
423
424            data_block_compression_policy: ({
425                #[cfg(feature = "lz4")]
426                let c = CompressionPolicy::new([CompressionType::None, CompressionType::Lz4]);
427
428                #[cfg(not(feature = "lz4"))]
429                let c = CompressionPolicy::new([CompressionType::None]);
430
431                c
432            }),
433            index_block_compression_policy: CompressionPolicy::all(CompressionType::None),
434
435            data_block_hash_ratio_policy: HashRatioPolicy::all(0.0),
436
437            filter_policy: FilterPolicy::all(FilterPolicyEntry::Bloom(
438                BloomConstructionPolicy::BitsPerKey(10.0),
439            )),
440
441            compaction_filter_factory: None,
442            merge_operator: None,
443
444            prefix_extractor: None,
445
446            expect_point_read_hits: false,
447
448            kv_separation_opts: None,
449
450            #[cfg(zstd_any)]
451            zstd_dictionary: None,
452
453            comparator: comparator::default_comparator(),
454            encryption: None,
455        }
456    }
457}
458
459impl Config {
460    /// Initializes a new config
461    pub fn new<P: AsRef<Path>>(
462        path: P,
463        seqno: SequenceNumberCounter,
464        visible_seqno: SequenceNumberCounter,
465    ) -> Self {
466        Self {
467            path: absolute_path(path.as_ref()),
468            seqno: Arc::new(seqno),
469            visible_seqno: Arc::new(visible_seqno),
470            ..Default::default()
471        }
472    }
473
474    /// Sets the default filesystem backend used for levels without an explicit route.
475    ///
476    /// Defaults to [`StdFs`]. Use [`MemFs`](crate::fs::MemFs) for
477    /// in-memory trees (testing, ephemeral indexes).
478    ///
479    /// # Example
480    ///
481    /// ```
482    /// # fn main() -> lsm_tree::Result<()> {
483    /// use lsm_tree::{Config, SequenceNumberCounter};
484    /// use lsm_tree::fs::MemFs;
485    ///
486    /// let tree = Config::new(
487    ///     "/virtual/tree",
488    ///     SequenceNumberCounter::default(),
489    ///     SequenceNumberCounter::default(),
490    /// )
491    /// .with_fs(MemFs::new())
492    /// .open()?;
493    /// # Ok(())
494    /// # }
495    /// ```
496    #[must_use]
497    pub fn with_fs<F: Fs>(mut self, fs: F) -> Self {
498        self.fs = Arc::new(fs);
499        self
500    }
501
502    /// Sets the default filesystem backend from an existing shared handle.
503    ///
504    /// Useful when multiple configs should reuse the same backend
505    /// instance, including trait objects and backends that are not `Clone`.
506    ///
507    #[must_use]
508    pub fn with_shared_fs(mut self, fs: Arc<dyn Fs>) -> Self {
509        self.fs = fs;
510        self
511    }
512
513    /// Opens a tree using the config.
514    ///
515    /// # Errors
516    ///
517    /// Will return `Err` if an IO error occurs.
518    /// Returns [`Error::ZstdDictMismatch`](crate::Error::ZstdDictMismatch) if
519    /// the compression policy references a `dict_id` that doesn't match the
520    /// configured dictionary.
521    pub fn open(self) -> crate::Result<AnyTree> {
522        #[cfg(zstd_any)]
523        self.validate_zstd_dictionary()?;
524
525        Ok(if self.kv_separation_opts.is_some() {
526            AnyTree::Blob(BlobTree::open(self)?)
527        } else {
528            AnyTree::Standard(Tree::open(self)?)
529        })
530    }
531
532    /// Validates that every `ZstdDict` entry in compression policies references
533    /// a `dict_id` that matches the configured dictionary. Catches mismatches
534    /// at open time rather than at first block write/read.
535    #[cfg(zstd_any)]
536    fn validate_zstd_dictionary(&self) -> crate::Result<()> {
537        let dict_id = self.zstd_dictionary.as_ref().map(|d| d.id());
538
539        // NOTE: Only data block policies are validated. Index blocks never
540        // carry a dictionary — Writer::use_index_block_compression() downgrades
541        // ZstdDict to plain Zstd. Validating index policies here would reject
542        // configs that use ZstdDict solely for index blocks even though the
543        // writer handles them correctly.
544        for ct in self.data_block_compression_policy.iter() {
545            if let &CompressionType::ZstdDict {
546                dict_id: required, ..
547            } = ct
548            {
549                match dict_id {
550                    None => {
551                        return Err(crate::Error::ZstdDictMismatch {
552                            expected: required,
553                            got: None,
554                        });
555                    }
556                    Some(actual) if actual != required => {
557                        return Err(crate::Error::ZstdDictMismatch {
558                            expected: required,
559                            got: Some(actual),
560                        });
561                    }
562                    _ => {}
563                }
564            }
565        }
566
567        // Blob files with ZstdDict compression must have a matching dictionary.
568        if let Some(ref kv_opts) = self.kv_separation_opts
569            && let CompressionType::ZstdDict {
570                dict_id: required, ..
571            } = kv_opts.compression
572        {
573            match kv_opts.zstd_dictionary.as_ref().map(|d| d.id()) {
574                None => {
575                    return Err(crate::Error::ZstdDictMismatch {
576                        expected: required,
577                        got: None,
578                    });
579                }
580                Some(actual) if actual != required => {
581                    return Err(crate::Error::ZstdDictMismatch {
582                        expected: required,
583                        got: Some(actual),
584                    });
585                }
586                _ => {}
587            }
588        }
589
590        Ok(())
591    }
592
593    /// Like [`Config::new`], but accepts pre-built shared generators.
594    ///
595    /// This is useful when the caller already has
596    /// [`SharedSequenceNumberGenerator`] instances (e.g., from a higher-level
597    /// database that shares generators across multiple trees).
598    pub fn new_with_generators<P: AsRef<Path>>(
599        path: P,
600        seqno: SharedSequenceNumberGenerator,
601        visible_seqno: SharedSequenceNumberGenerator,
602    ) -> Self {
603        Self {
604            path: absolute_path(path.as_ref()),
605            seqno,
606            visible_seqno,
607            ..Default::default()
608        }
609    }
610}
611
#[cfg(all(test, zstd_any))]
mod tests {
    use super::*;
    use crate::{CompressionType, SequenceNumberCounter, compression::ZstdDictionary};
    use std::sync::Arc;

    /// Creates the scratch directory a test config points at.
    fn temp_folder() -> tempfile::TempDir {
        tempfile::tempdir().unwrap_or_else(|err| panic!("tempdir failed: {err}"))
    }

    /// Builds a config rooted at `folder` with the given blob options.
    fn config_with_blob_options(folder: &tempfile::TempDir, opts: KvSeparationOptions) -> Config {
        Config::new(
            folder.path(),
            SequenceNumberCounter::default(),
            SequenceNumberCounter::default(),
        )
        .with_kv_separation(Some(opts))
    }

    /// Builds the dictionary shared by the tests that need one.
    fn sample_dictionary() -> Arc<ZstdDictionary> {
        Arc::new(ZstdDictionary::new(b"sample training data for test"))
    }

    #[test]
    fn blob_zstd_dict_no_dict_is_rejected() {
        // Blob compression asks for dictionary 7, but none is supplied.
        let folder = temp_folder();
        let opts = KvSeparationOptions::default().compression(CompressionType::ZstdDict {
            level: 3,
            dict_id: 7,
        });
        let cfg = config_with_blob_options(&folder, opts);

        let outcome = cfg.validate_zstd_dictionary();

        assert!(
            matches!(
                outcome,
                Err(crate::Error::ZstdDictMismatch {
                    expected: 7,
                    got: None
                })
            ),
            "expected ZstdDictMismatch when no dictionary is supplied",
        );
    }

    #[test]
    fn blob_zstd_dict_id_mismatch_is_rejected() {
        // The supplied dictionary has a different id than the one the
        // compression type asks for.
        let folder = temp_folder();
        let dict = sample_dictionary();
        let wrong_dict_id = dict.id().wrapping_add(1);
        let opts = KvSeparationOptions::default()
            .compression(CompressionType::ZstdDict {
                level: 3,
                dict_id: wrong_dict_id,
            })
            .dict(Arc::clone(&dict));
        let cfg = config_with_blob_options(&folder, opts);

        let outcome = cfg.validate_zstd_dictionary();

        assert!(
            matches!(outcome, Err(crate::Error::ZstdDictMismatch { .. })),
            "expected ZstdDictMismatch when dict_id doesn't match dictionary",
        );
    }

    #[test]
    fn blob_zstd_dict_matching_dict_is_accepted() {
        // The compression type and the supplied dictionary agree on the id.
        let folder = temp_folder();
        let dict = sample_dictionary();
        let opts = KvSeparationOptions::default()
            .compression(CompressionType::ZstdDict {
                level: 3,
                dict_id: dict.id(),
            })
            .dict(Arc::clone(&dict));
        let cfg = config_with_blob_options(&folder, opts);

        assert!(
            cfg.validate_zstd_dictionary().is_ok(),
            "matching dictionary must be accepted",
        );
    }
}
701
702impl Config {
703    /// Returns the tables folder path and [`Fs`] backend for the given level.
704    ///
705    /// If [`level_routes`](Self::level_routes) has an entry covering this
706    /// level, uses that entry's path and `Fs`. Otherwise falls back to the
707    /// primary [`path`](Self::path) and [`fs`](Self::fs).
708    #[must_use]
709    pub fn tables_folder_for_level(&self, level: u8) -> (PathBuf, Arc<dyn Fs>) {
710        if let Some(routes) = &self.level_routes {
711            for route in routes {
712                if route.levels.contains(&level) {
713                    return (route.path.join(TABLES_FOLDER), route.fs.clone());
714                }
715            }
716        }
717        (self.path.join(TABLES_FOLDER), self.fs.clone())
718    }
719
720    /// Returns all unique tables folders that need to be scanned during
721    /// recovery: the primary folder plus every [`LevelRoute`] folder.
722    #[must_use]
723    pub fn all_tables_folders(&self) -> Vec<(PathBuf, Arc<dyn Fs>)> {
724        let primary_fs: Arc<dyn Fs> = self.fs.clone();
725        let mut folders: Vec<(PathBuf, Arc<dyn Fs>)> =
726            vec![(self.path.join(TABLES_FOLDER), primary_fs)];
727
728        if let Some(routes) = &self.level_routes {
729            for route in routes {
730                let folder = route.path.join(TABLES_FOLDER);
731                // Dedup by path: scanning the same directory twice would cause
732                // already-recovered tables to be classified as orphans and
733                // deleted. Routing the same path through different Fs backends
734                // is a configuration error (level_routes validation in
735                // Config::level_routes rejects overlapping ranges).
736                if !folders.iter().any(|(p, _)| *p == folder) {
737                    folders.push((folder, route.fs.clone()));
738                }
739            }
740        }
741
742        folders
743    }
744
745    /// Configures per-level filesystem routing for tiered storage.
746    ///
747    /// Each [`LevelRoute`] maps a range of LSM levels to a base directory
748    /// and filesystem backend. Levels not covered by any route fall back to
749    /// the primary `path` and `fs`.
750    ///
751    /// # Reopen contract
752    ///
753    /// The route configuration is **not persisted** in the manifest.
754    /// On reopen, the [`Config`] must specify `level_routes` such that
755    /// [`all_tables_folders`](Self::all_tables_folders) includes every
756    /// directory and filesystem pair that may contain existing SST files
757    /// for this tree.
758    ///
759    /// Changing the mapping from levels to paths is allowed as long as
760    /// the previously used folders remain covered. If old folders are
761    /// omitted, recovery may fail with
762    /// [`RouteMismatch`](crate::Error::RouteMismatch) (when all missing
763    /// tables are on uncovered levels) or
764    /// [`Unrecoverable`](crate::Error::Unrecoverable) (when some missing
765    /// tables are on levels that are still covered).
766    ///
767    /// # Panics
768    ///
769    /// Panics if any route has an empty range or if any two routes have
770    /// overlapping level ranges.
771    #[must_use]
772    pub fn level_routes(mut self, routes: Vec<LevelRoute>) -> Self {
773        // Validate no empty/inverted ranges
774        for route in &routes {
775            assert!(
776                route.levels.start < route.levels.end,
777                "empty or inverted level route range: {:?}",
778                route.levels,
779            );
780        }
781
782        // Validate no overlapping ranges
783        for (i, a) in routes.iter().enumerate() {
784            for b in routes.iter().skip(i + 1) {
785                assert!(
786                    a.levels.end <= b.levels.start || b.levels.end <= a.levels.start,
787                    "overlapping level routes: {:?} and {:?}",
788                    a.levels,
789                    b.levels,
790                );
791            }
792        }
793        self.level_routes = if routes.is_empty() {
794            None
795        } else {
796            // Normalize paths the same way Config::new normalizes self.path
797            Some(
798                routes
799                    .into_iter()
800                    .map(|mut r| {
801                        r.path = absolute_path(&r.path);
802                        r
803                    })
804                    .collect(),
805            )
806        };
807        self
808    }
809
810    /// Overrides the sequence number generator.
811    ///
812    /// By default, [`SequenceNumberCounter`] is used. This allows plugging in
813    /// a custom generator (e.g., HLC for distributed databases).
814    #[must_use]
815    pub fn seqno_generator(mut self, generator: SharedSequenceNumberGenerator) -> Self {
816        self.seqno = generator;
817        self
818    }
819
820    /// Overrides the visible sequence number generator.
821    #[must_use]
822    pub fn visible_seqno_generator(mut self, generator: SharedSequenceNumberGenerator) -> Self {
823        self.visible_seqno = generator;
824        self
825    }
826
827    /// Sets the global cache.
828    ///
829    /// You can create a global [`Cache`] and share it between multiple
830    /// trees to cap global cache memory usage.
831    ///
832    /// Defaults to a cache with 16 MiB of capacity *per tree*.
833    #[must_use]
834    pub fn use_cache(mut self, cache: Arc<Cache>) -> Self {
835        self.cache = cache;
836        self
837    }
838
839    /// Sets the file descriptor cache.
840    ///
841    /// Can be shared across trees.
842    #[must_use]
843    pub fn use_descriptor_table(mut self, descriptor_table: Option<Arc<DescriptorTable>>) -> Self {
844        self.descriptor_table = descriptor_table;
845        self
846    }
847
848    /// If `true`, the last level will not build filters, reducing the filter size of a database
849    /// by ~90% typically.
850    ///
851    /// **Enable this only if you know that point reads generally are expected to find a key-value pair.**
852    #[must_use]
853    pub fn expect_point_read_hits(mut self, b: bool) -> Self {
854        self.expect_point_read_hits = b;
855        self
856    }
857
858    /// Sets the partitioning policy for filter blocks.
859    #[must_use]
860    pub fn filter_block_partitioning_policy(mut self, policy: PinningPolicy) -> Self {
861        self.filter_block_partitioning_policy = policy;
862        self
863    }
864
865    /// Sets the partitioning policy for index blocks.
866    #[must_use]
867    pub fn index_block_partitioning_policy(mut self, policy: PinningPolicy) -> Self {
868        self.index_block_partitioning_policy = policy;
869        self
870    }
871
872    /// Sets the pinning policy for filter blocks.
873    #[must_use]
874    pub fn filter_block_pinning_policy(mut self, policy: PinningPolicy) -> Self {
875        self.filter_block_pinning_policy = policy;
876        self
877    }
878
879    /// Sets the pinning policy for index blocks.
880    #[must_use]
881    pub fn index_block_pinning_policy(mut self, policy: PinningPolicy) -> Self {
882        self.index_block_pinning_policy = policy;
883        self
884    }
885
886    /// Sets the restart interval inside data blocks.
887    ///
888    /// A higher restart interval saves space while increasing lookup times
889    /// inside data blocks.
890    ///
891    /// Default = 16
892    ///
893    /// # Panics
894    ///
895    /// Panics if any restart interval in `policy` is zero.
896    #[must_use]
897    pub fn data_block_restart_interval_policy(mut self, policy: RestartIntervalPolicy) -> Self {
898        assert!(
899            policy.iter().all(|interval| *interval > 0),
900            "data block restart interval must be greater than zero",
901        );
902        self.data_block_restart_interval_policy = policy;
903        self
904    }
905
906    /// Sets the restart interval inside index blocks.
907    ///
908    /// A higher restart interval saves space while increasing lookup times
909    /// inside index blocks.
910    ///
911    /// Default = 1
912    ///
913    /// # Panics
914    ///
915    /// Panics if any restart interval in `policy` is zero.
916    #[must_use]
917    pub fn index_block_restart_interval_policy(mut self, policy: RestartIntervalPolicy) -> Self {
918        assert!(
919            policy.iter().all(|interval| *interval > 0),
920            "index block restart interval must be greater than zero",
921        );
922        self.index_block_restart_interval_policy = policy;
923        self
924    }
925
926    /// Sets the filter construction policy.
927    #[must_use]
928    pub fn filter_policy(mut self, policy: FilterPolicy) -> Self {
929        self.filter_policy = policy;
930        self
931    }
932
933    /// Sets the compression method for data blocks.
934    #[must_use]
935    pub fn data_block_compression_policy(mut self, policy: CompressionPolicy) -> Self {
936        self.data_block_compression_policy = policy;
937        self
938    }
939
940    /// Sets the compression method for index blocks.
941    #[must_use]
942    pub fn index_block_compression_policy(mut self, policy: CompressionPolicy) -> Self {
943        self.index_block_compression_policy = policy;
944        self
945    }
946
947    // TODO: level count is fixed to 7 right now
948    // /// Sets the number of levels of the LSM tree (depth of tree).
949    // ///
950    // /// Defaults to 7, like `LevelDB` and `RocksDB`.
951    // ///
952    // /// Cannot be changed once set.
953    // ///
954    // /// # Panics
955    // ///
956    // /// Panics if `n` is 0.
957    // #[must_use]
958    // pub fn level_count(mut self, n: u8) -> Self {
959    //     assert!(n > 0);
960
961    //     self.level_count = n;
962    //     self
963    // }
964
965    /// Sets the data block size policy.
966    #[must_use]
967    pub fn data_block_size_policy(mut self, policy: BlockSizePolicy) -> Self {
968        self.data_block_size_policy = policy;
969        self
970    }
971
972    /// Sets the hash ratio policy for data blocks.
973    ///
974    /// If greater than 0.0, a hash index is embedded into data blocks that can speed up reads
975    /// inside the data block.
976    #[must_use]
977    pub fn data_block_hash_ratio_policy(mut self, policy: HashRatioPolicy) -> Self {
978        self.data_block_hash_ratio_policy = policy;
979        self
980    }
981
982    /// Toggles key-value separation.
983    #[must_use]
984    pub fn with_kv_separation(mut self, opts: Option<KvSeparationOptions>) -> Self {
985        self.kv_separation_opts = opts;
986        self
987    }
988
989    /// Installs a custom compaction filter.
990    #[must_use]
991    pub fn with_compaction_filter_factory(mut self, factory: Option<Arc<dyn Factory>>) -> Self {
992        self.compaction_filter_factory = factory;
993        self
994    }
995
996    /// Sets the prefix extractor for prefix bloom filters.
997    ///
998    /// When configured, bloom filters will index key prefixes returned by
999    /// the extractor. Prefix scans can then skip segments whose bloom
1000    /// filter reports no match for the scan prefix.
1001    #[must_use]
1002    pub fn prefix_extractor(mut self, extractor: Arc<dyn PrefixExtractor>) -> Self {
1003        self.prefix_extractor = Some(extractor);
1004        self
1005    }
1006
1007    /// Installs a merge operator for commutative operations.
1008    ///
1009    /// When set, enables [`crate::AbstractTree::merge`] which stores partial updates
1010    /// (operands) that are lazily combined during reads and compaction.
1011    #[must_use]
1012    pub fn with_merge_operator(mut self, op: Option<Arc<dyn MergeOperator>>) -> Self {
1013        self.merge_operator = op;
1014        self
1015    }
1016
1017    /// Sets a custom user key comparator.
1018    ///
1019    /// When configured, all key ordering (memtable, block index, merge,
1020    /// range scans) uses this comparator instead of the default lexicographic
1021    /// byte ordering.
1022    ///
1023    /// # Important
1024    ///
1025    /// The comparator's [`crate::UserComparator::name`] is persisted when a tree is
1026    /// first created. On subsequent opens the stored name is compared against
1027    /// the supplied comparator's name — a mismatch causes the open to fail
1028    /// with [`Error::ComparatorMismatch`](crate::Error::ComparatorMismatch).
1029    #[must_use]
1030    pub fn comparator(mut self, comparator: SharedComparator) -> Self {
1031        self.comparator = comparator;
1032        self
1033    }
1034
1035    /// Sets the block-level encryption provider for encryption at rest.
1036    ///
1037    /// When set, all blocks written to SST files are encrypted after
1038    /// compression and before checksumming, using the provided
1039    /// [`EncryptionProvider`].
1040    ///
1041    /// The caller is responsible for key management and rotation.
1042    /// See [`crate::Aes256GcmProvider`] (behind the `encryption` feature)
1043    /// for a ready-to-use AES-256-GCM implementation.
1044    ///
1045    /// **Important constraints:**
1046    /// - Encryption state is NOT recorded in SST metadata. Opening an
1047    ///   encrypted tree without the correct provider (or vice versa) will
1048    ///   cause block validation errors, not silent corruption.
1049    /// - Blob files (KV-separated large values) are NOT covered by
1050    ///   block-level encryption. Large values stored via KV separation
1051    ///   remain in plaintext on disk.
1052    #[must_use]
1053    pub fn with_encryption(mut self, encryption: Option<Arc<dyn EncryptionProvider>>) -> Self {
1054        self.encryption = encryption;
1055        self
1056    }
1057
1058    /// Sets the pre-trained zstd dictionary for dictionary compression.
1059    ///
1060    /// When set, data blocks using [`CompressionType::ZstdDict`] will be
1061    /// compressed and decompressed with this dictionary. The dictionary
1062    /// should be trained on representative data samples for best results.
1063    ///
1064    /// Create a dictionary with [`ZstdDictionary::new`](crate::ZstdDictionary::new),
1065    /// then use [`CompressionType::zstd_dict`] to create a matching
1066    /// compression type:
1067    ///
1068    /// ```ignore
1069    /// use lsm_tree::{CompressionType, ZstdDictionary};
1070    ///
1071    /// let dict = ZstdDictionary::new(&training_data);
1072    /// let compression = CompressionType::zstd_dict(3, dict.id()).unwrap();
1073    ///
1074    /// config
1075    ///     .zstd_dictionary(Some(Arc::new(dict)))
1076    ///     .data_block_compression_policy(CompressionPolicy::all(compression));
1077    /// ```
1078    #[cfg(zstd_any)]
1079    #[must_use]
1080    pub fn zstd_dictionary(
1081        mut self,
1082        dictionary: Option<Arc<crate::compression::ZstdDictionary>>,
1083    ) -> Self {
1084        self.zstd_dictionary = dictionary;
1085        self
1086    }
1087}
1088
#[cfg(test)]
mod builder_tests {
    use super::*;
    use crate::SequenceNumberCounter;

    /// Builds a [`Config`] rooted in a freshly created temporary directory.
    ///
    /// The directory guard is returned next to the config so the folder
    /// stays alive for as long as the calling test holds on to it.
    fn config_in_tempdir() -> (tempfile::TempDir, Config) {
        let folder = tempfile::tempdir().unwrap_or_else(|err| panic!("tempdir failed: {err}"));
        let cfg = Config::new(
            folder.path(),
            SequenceNumberCounter::default(),
            SequenceNumberCounter::default(),
        );
        (folder, cfg)
    }

    /// Setting the data and index restart interval policies must not affect each other.
    #[test]
    fn restart_interval_policies_can_be_overridden_independently() {
        let (_folder, base) = config_in_tempdir();
        let cfg = base
            .data_block_restart_interval_policy(RestartIntervalPolicy::all(7))
            .index_block_restart_interval_policy(RestartIntervalPolicy::all(3));

        assert_eq!(cfg.data_block_restart_interval_policy.first(), Some(&7));
        assert_eq!(cfg.index_block_restart_interval_policy.first(), Some(&3));
    }

    /// A zero interval in the index block policy is rejected by the setter.
    #[test]
    #[should_panic(expected = "index block restart interval must be greater than zero")]
    fn index_restart_interval_policy_rejects_zero_values() {
        let (_folder, base) = config_in_tempdir();
        let _cfg = base.index_block_restart_interval_policy(RestartIntervalPolicy::all(0));
    }

    /// A zero interval in the data block policy is rejected by the setter.
    #[test]
    #[should_panic(expected = "data block restart interval must be greater than zero")]
    fn data_restart_interval_policy_rejects_zero_values() {
        let (_folder, base) = config_in_tempdir();
        let _cfg = base.data_block_restart_interval_policy(RestartIntervalPolicy::all(0));
    }

    /// Building an empty policy for index blocks must panic.
    #[test]
    #[should_panic(expected = "restart interval policy may not be empty")]
    fn index_restart_interval_policy_rejects_empty() {
        let (_folder, base) = config_in_tempdir();
        let _cfg = base.index_block_restart_interval_policy(RestartIntervalPolicy::new([]));
    }

    /// Building an empty policy for data blocks must panic.
    #[test]
    #[should_panic(expected = "restart interval policy may not be empty")]
    fn data_restart_interval_policy_rejects_empty() {
        let (_folder, base) = config_in_tempdir();
        let _cfg = base.data_block_restart_interval_policy(RestartIntervalPolicy::new([]));
    }
}