mod block_size;
mod compression;
mod delete_strategy;
mod filter;
mod hash_ratio;
mod locator;
mod pinning;
mod restart_interval;
pub use block_size::BlockSizePolicy;
pub use compression::CompressionPolicy;
pub use delete_strategy::{DeleteStrategy, DeleteStrategyPolicy};
pub use filter::{BloomConstructionPolicy, FilterPolicy, FilterPolicyEntry};
pub use hash_ratio::HashRatioPolicy;
pub use locator::{LocatorPolicy, LocatorPolicyEntry, LocatorPrecision};
pub use pinning::PinningPolicy;
pub use restart_interval::RestartIntervalPolicy;
/// Alias of [`PinningPolicy`]; it is the type of the index/filter block
/// partitioning policy fields on [`Config`].
pub type PartitioningPolicy = PinningPolicy;
#[cfg(feature = "std")]
use crate::fs::StdFs;
use crate::path::PathBuf;
use crate::{
AnyTree, BlobTree, Cache, CompressionType, DescriptorTable, SharedSequenceNumberGenerator,
Tree,
compaction::filter::Factory,
comparator::SharedComparator,
encryption::EncryptionProvider,
file::TABLES_FOLDER,
fs::{Fs, SyncMode},
merge_operator::MergeOperator,
path::absolute_path,
prefix::PrefixExtractor,
};
#[cfg(feature = "std")]
use crate::{SequenceNumberCounter, comparator, path::Path, version::DEFAULT_LEVEL_COUNT};
use alloc::sync::Arc;
#[cfg(not(feature = "std"))]
use alloc::vec::Vec;
use core::ops::Range;
/// Routes a range of levels to a dedicated folder and filesystem.
///
/// Consulted by `Config::tables_folder_for_level`: a level contained in
/// `levels` gets its tables folder resolved below `path` on `fs`.
#[derive(Clone)]
pub struct LevelRoute {
/// Half-open range (`start..end`) of level indices covered by this route.
pub levels: Range<u8>,
/// Base folder of the route; `Config::level_routes` stores it as an absolute path.
pub path: PathBuf,
/// Filesystem backend returned together with the route's tables folder.
pub fs: Arc<dyn Fs>,
}
impl core::fmt::Debug for LevelRoute {
    /// Formats the route as `LevelRoute { levels, path, .. }`.
    ///
    /// The filesystem handle is left out of the output, which is why the
    /// struct is finished as non-exhaustive.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        // Destructuring makes a newly added field a compile error here, forcing a
        // conscious decision about whether it should be printed.
        let Self {
            levels,
            path,
            fs: _,
        } = self;

        let mut out = f.debug_struct("LevelRoute");
        out.field("levels", levels);
        out.field("path", path);
        out.finish_non_exhaustive()
    }
}
/// Mode stored in `Config::manifest_recovery_mode`.
///
/// NOTE(review): the variant names mirror RocksDB's `WALRecoveryMode`. The code
/// that interprets each mode lives outside this file, so the per-variant
/// summaries below are assumptions derived from the names — confirm them
/// against the manifest recovery implementation.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Default)]
pub enum ManifestRecoveryMode {
/// Default mode. Presumably any corrupted manifest record fails recovery — verify.
#[default]
AbsoluteConsistency,
/// Presumably tolerates corrupted records at the tail of the manifest log — verify.
TolerateCorruptedTailRecords,
/// Presumably recovers up to the last consistent point before the first corruption — verify.
PointInTimeRecovery,
/// Presumably skips every corrupted record and keeps recovering — verify.
SkipAnyCorruptedRecords,
}
/// Kind of tree, convertible to and from a single-byte tag.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum TreeType {
    /// Tagged as `0`.
    Standard,
    /// Tagged as `1`.
    Blob,
}

impl From<TreeType> for u8 {
    /// Returns the byte tag of the tree type (`Standard` = 0, `Blob` = 1).
    fn from(val: TreeType) -> Self {
        // Kept as an exhaustive match so a new variant must be given a tag.
        match val {
            TreeType::Standard => 0,
            TreeType::Blob => 1,
        }
    }
}

impl TryFrom<u8> for TreeType {
    type Error = ();

    /// Maps a byte tag back to its tree type; any unknown tag yields `Err(())`.
    fn try_from(value: u8) -> Result<Self, Self::Error> {
        // Derive the reverse mapping from the `From` impl so both directions
        // cannot drift apart.
        [Self::Standard, Self::Blob]
            .iter()
            .copied()
            .find(|kind| u8::from(*kind) == value)
            .ok_or(())
    }
}
/// Folder name that `Config::default` resolves (through `absolute_path`) into
/// the default tree path.
#[cfg_attr(
not(feature = "std"),
allow(
dead_code,
reason = "default data-folder path used only on the std-gated default-config path"
)
)]
const DEFAULT_FILE_FOLDER: &str = ".lsm.data";
/// Options for key-value separation.
///
/// When a `Config` carries these options (`kv_separation_opts` is `Some`),
/// `Config::open` opens a `BlobTree` instead of a standard `Tree`.
#[derive(Clone, Debug, PartialEq)]
pub struct KvSeparationOptions {
/// Compression type of the separated values (default: LZ4 when the `lz4`
/// feature is enabled, otherwise none).
///
/// On zstd builds, a `CompressionType::ZstdDict` value here requires
/// `zstd_dictionary` to hold a dictionary with the same id, otherwise
/// `Config::open` fails with `Error::ZstdDictMismatch`.
#[doc(hidden)]
pub compression: CompressionType,
/// Target size in bytes (default: 64 MiB).
/// NOTE(review): presumably the target size of a single blob file — confirm.
#[doc(hidden)]
pub file_target_size: u64,
/// Threshold in bytes (default: 1 KiB).
/// NOTE(review): presumably the value size from which values are separated — confirm.
#[doc(hidden)]
pub separation_threshold: u32,
/// Ratio (default: 0.25).
/// NOTE(review): presumably the share of stale data that makes a blob file
/// eligible for rewriting — confirm.
#[doc(hidden)]
pub staleness_threshold: f32,
/// Ratio (default: 0.25).
/// NOTE(review): exact meaning is defined by the consumer outside this file — confirm.
#[doc(hidden)]
pub age_cutoff: f32,
/// Optional zstd dictionary checked against `compression` when the tree is opened.
#[cfg(zstd_any)]
#[doc(hidden)]
pub zstd_dictionary: Option<alloc::sync::Arc<crate::compression::ZstdDictionary>>,
}
impl Default for KvSeparationOptions {
    /// Returns the default options: LZ4 compression when the `lz4` feature is
    /// enabled (no compression otherwise), a 64 MiB file target size, a 1 KiB
    /// separation threshold and `0.25` for both ratios.
    fn default() -> Self {
        // Exactly one of the two bindings survives conditional compilation.
        #[cfg(feature = "lz4")]
        let compression = CompressionType::Lz4;
        #[cfg(not(feature = "lz4"))]
        let compression = CompressionType::None;

        Self {
            compression,
            file_target_size: 64 * 1_024 * 1_024,
            separation_threshold: 1_024,
            staleness_threshold: 0.25,
            age_cutoff: 0.25,
            #[cfg(zstd_any)]
            zstd_dictionary: None,
        }
    }
}
impl KvSeparationOptions {
    /// Sets the compression type and returns the updated options.
    #[must_use]
    pub fn compression(self, compression: CompressionType) -> Self {
        Self {
            compression,
            ..self
        }
    }

    /// Sets the file target size in bytes and returns the updated options.
    #[must_use]
    pub fn file_target_size(self, bytes: u64) -> Self {
        Self {
            file_target_size: bytes,
            ..self
        }
    }

    /// Sets the separation threshold in bytes and returns the updated options.
    #[must_use]
    pub fn separation_threshold(self, bytes: u32) -> Self {
        Self {
            separation_threshold: bytes,
            ..self
        }
    }

    /// Sets the staleness threshold ratio and returns the updated options.
    ///
    /// The ratio is stored as given; no range check is performed here.
    #[must_use]
    pub fn staleness_threshold(self, ratio: f32) -> Self {
        Self {
            staleness_threshold: ratio,
            ..self
        }
    }

    /// Sets the age cutoff ratio and returns the updated options.
    ///
    /// The ratio is stored as given; no range check is performed here.
    #[must_use]
    pub fn age_cutoff(self, ratio: f32) -> Self {
        Self {
            age_cutoff: ratio,
            ..self
        }
    }

    /// Attaches a zstd dictionary and returns the updated options.
    ///
    /// `Config::open` compares the dictionary id with the one required by a
    /// `CompressionType::ZstdDict` compression setting.
    #[cfg(zstd_any)]
    #[must_use]
    pub fn dict(
        self,
        dictionary: alloc::sync::Arc<crate::compression::ZstdDictionary>,
    ) -> Self {
        Self {
            zstd_dictionary: Some(dictionary),
            ..self
        }
    }
}
/// Tree configuration: storage location, block policies and hooks.
///
/// Values are adjusted through the consuming builder methods on this type and
/// the tree is opened with `Config::open`.
pub struct Config {
/// Base folder of the tree; the constructors store it as an absolute path.
#[doc(hidden)]
pub path: PathBuf,
/// Filesystem backend used for the base folder.
#[doc(hidden)]
pub fs: Arc<dyn Fs>,
/// Per-level folder/filesystem overrides; `None` when no routes are configured.
#[doc(hidden)]
pub level_routes: Option<Vec<LevelRoute>>,
/// Shared cache (default: created with a capacity of 16 MiB).
#[doc(hidden)]
pub cache: Arc<Cache>,
/// Optional shared descriptor table (default: `DescriptorTable::new(256)`).
#[doc(hidden)]
pub descriptor_table: Option<Arc<DescriptorTable>>,
/// Level count of the tree (default: `DEFAULT_LEVEL_COUNT`).
pub level_count: u8,
/// Compression policy for data blocks; validated against `zstd_dictionary` on open (zstd builds).
pub data_block_compression_policy: CompressionPolicy,
/// Compression policy for index blocks (default: no compression).
pub index_block_compression_policy: CompressionPolicy,
/// Restart interval policy for data blocks (default: 16); the builder rejects zero.
pub data_block_restart_interval_policy: RestartIntervalPolicy,
/// Restart interval policy for index blocks (default: 1); the builder rejects zero.
pub index_block_restart_interval_policy: RestartIntervalPolicy,
/// Size policy for data blocks (default: 4096).
pub data_block_size_policy: BlockSizePolicy,
/// Pinning policy for index blocks.
pub index_block_pinning_policy: PinningPolicy,
/// Pinning policy for filter blocks.
pub filter_block_pinning_policy: PinningPolicy,
/// Pinning policy for top-level index blocks (default: always `true`).
pub top_level_index_block_pinning_policy: PinningPolicy,
/// Pinning policy for top-level filter blocks (default: always `true`).
pub top_level_filter_block_pinning_policy: PinningPolicy,
/// Hash ratio policy for data blocks (default: 0.0).
pub data_block_hash_ratio_policy: HashRatioPolicy,
/// Partitioning policy for index blocks.
pub index_block_partitioning_policy: PartitioningPolicy,
/// Partitioning policy for filter blocks.
pub filter_block_partitioning_policy: PartitioningPolicy,
/// Partition size policy for index blocks (default: 4096).
pub index_block_partition_size_policy: BlockSizePolicy,
/// Partition size policy for filter blocks (default: 4096).
pub filter_block_partition_size_policy: BlockSizePolicy,
/// Flag set through `Config::expect_point_read_hits` (default: `false`); consumed outside this file.
pub(crate) expect_point_read_hits: bool,
/// Flag set through `Config::page_ecc` (default: `false`); consumed outside this file.
pub(crate) page_ecc: bool,
/// Runtime configuration value this config carries; individual fields are
/// adjusted by `ecc_scheme`, `disable_cow_on_sst_files` and `use_reflink_for_checkpoint`.
#[expect(
clippy::struct_field_names,
reason = "name mirrors the type for grep-ability across the persist + Tree handle init wiring"
)]
pub(crate) initial_runtime_config: crate::runtime_config::RuntimeConfig,
/// Filter policy (default: Bloom filters built with 10 bits per key).
pub filter_policy: FilterPolicy,
/// Locator policy (default: `LocatorPolicy::block_level()`).
pub locator_policy: LocatorPolicy,
/// Optional compaction filter factory (default: none).
pub compaction_filter_factory: Option<Arc<dyn Factory>>,
/// Optional prefix extractor (default: none).
pub prefix_extractor: Option<Arc<dyn PrefixExtractor>>,
/// Optional merge operator (default: none).
pub merge_operator: Option<Arc<dyn MergeOperator>>,
/// Key-value separation options; when `Some`, `Config::open` opens a `BlobTree`.
#[doc(hidden)]
pub kv_separation_opts: Option<KvSeparationOptions>,
/// Key comparator (default: `comparator::default_comparator()`).
#[doc(hidden)]
pub(crate) comparator: SharedComparator,
/// Optional encryption provider; on zstd builds `Config::open` requires it to
/// support the AAD-bound block path.
pub(crate) encryption: Option<Arc<dyn EncryptionProvider>>,
/// Manifest recovery mode (default: `AbsoluteConsistency`).
pub(crate) manifest_recovery_mode: ManifestRecoveryMode,
/// Sync mode (default: `SyncMode::Normal`).
pub(crate) sync_mode: SyncMode,
/// Whether the directory lock is enabled (default: `true`); see `acquire_directory_lock`.
pub(crate) directory_lock: bool,
/// Byte threshold for manifest log rotation (default: 1 MiB).
pub(crate) manifest_log_rotate_bytes: u64,
/// Compaction rate limit in bytes per second (default: 0).
/// NOTE(review): presumably `0` means "unlimited" — confirm against the consumer.
pub(crate) compaction_rate_limit: u64,
/// Number of compaction threads; the builder clamps it to at least 1.
#[cfg(feature = "std")] pub(crate) compaction_threads: usize,
/// Optional externally supplied compaction spawner (default: none).
#[cfg(feature = "std")]
pub(crate) compaction_pool: Option<Arc<dyn crate::table::writer::CompactionSpawner>>,
/// Byte threshold for sub-compactions (default: `SUBCOMPACTION_MIN_INPUT_BYTES`).
#[cfg(feature = "std")]
pub(crate) subcompaction_min_bytes: u64,
/// Test-only flag (default: `false`); presumably used for failure injection — see the test suite.
#[cfg(all(test, feature = "std"))]
pub(crate) fail_one_subcompaction: Arc<core::sync::atomic::AtomicBool>,
/// Test-only flag (default: `false`); presumably used for failure injection — see the test suite.
#[cfg(all(test, feature = "std"))]
pub(crate) fail_tight_after_first_slice: Arc<core::sync::atomic::AtomicBool>,
/// Optional zstd dictionary checked against `data_block_compression_policy` on open.
#[cfg(zstd_any)]
pub(crate) zstd_dictionary: Option<Arc<crate::compression::ZstdDictionary>>,
/// Sequence number generator.
pub(crate) seqno: SharedSequenceNumberGenerator,
/// Generator for the visible sequence number.
pub(crate) visible_seqno: SharedSequenceNumberGenerator,
}
#[cfg(feature = "std")]
impl Default for Config {
    /// Returns the stock configuration: the tree lives in `DEFAULT_FILE_FOLDER`
    /// (made absolute) on the standard filesystem, with a 16 MiB cache, 4 KiB
    /// blocks and Bloom filters built with 10 bits per key.
    fn default() -> Self {
        // Data block compression: a leading uncompressed entry, followed by LZ4
        // when the `lz4` feature is compiled in.
        #[cfg(feature = "lz4")]
        let data_block_compression_policy =
            CompressionPolicy::new([CompressionType::None, CompressionType::Lz4]);
        #[cfg(not(feature = "lz4"))]
        let data_block_compression_policy = CompressionPolicy::new([CompressionType::None]);

        let bloom_filter = FilterPolicyEntry::Bloom(BloomConstructionPolicy::BitsPerKey(10.0));

        // Half of the available parallelism, but never fewer than one thread;
        // one thread as well when the parallelism cannot be queried.
        let compaction_threads = std::thread::available_parallelism()
            .map_or(1, |cores| core::cmp::max(cores.get() / 2, 1));

        Self {
            // Storage location and shared resources.
            path: absolute_path(Path::new(DEFAULT_FILE_FOLDER)),
            fs: Arc::new(StdFs),
            level_routes: None,
            descriptor_table: Some(Arc::new(DescriptorTable::new(256))),
            seqno: SharedSequenceNumberGenerator::from(SequenceNumberCounter::default()),
            visible_seqno: SharedSequenceNumberGenerator::from(SequenceNumberCounter::default()),
            cache: Arc::new(Cache::with_capacity_bytes(16 * 1_024 * 1_024)),

            // Tree shape and block layout.
            level_count: DEFAULT_LEVEL_COUNT,
            data_block_restart_interval_policy: RestartIntervalPolicy::all(16),
            index_block_restart_interval_policy: RestartIntervalPolicy::all(1),
            data_block_size_policy: BlockSizePolicy::all(4_096),
            data_block_hash_ratio_policy: HashRatioPolicy::all(0.0),

            // Pinning.
            index_block_pinning_policy: PinningPolicy::new([true, true, false]),
            filter_block_pinning_policy: PinningPolicy::new([true, false]),
            top_level_index_block_pinning_policy: PinningPolicy::all(true),
            top_level_filter_block_pinning_policy: PinningPolicy::all(true),

            // Partitioning.
            // NOTE(review): index blocks use `all(true)` while filter blocks use
            // `[false, false, false, true]` — confirm the asymmetry is intended.
            index_block_partitioning_policy: PinningPolicy::all(true),
            filter_block_partitioning_policy: PinningPolicy::new([false, false, false, true]),
            index_block_partition_size_policy: BlockSizePolicy::all(4_096),
            filter_block_partition_size_policy: BlockSizePolicy::all(4_096),

            // Compression.
            data_block_compression_policy,
            index_block_compression_policy: CompressionPolicy::all(CompressionType::None),
            #[cfg(zstd_any)]
            zstd_dictionary: None,

            // Lookup structures.
            locator_policy: LocatorPolicy::block_level(),
            filter_policy: FilterPolicy::all(bloom_filter),

            // Optional hooks.
            compaction_filter_factory: None,
            merge_operator: None,
            prefix_extractor: None,
            kv_separation_opts: None,
            encryption: None,
            comparator: comparator::default_comparator(),

            // Behavior flags and durability.
            expect_point_read_hits: false,
            page_ecc: false,
            initial_runtime_config: crate::runtime_config::RuntimeConfig::default(),
            manifest_recovery_mode: ManifestRecoveryMode::AbsoluteConsistency,
            sync_mode: SyncMode::Normal,
            directory_lock: true,
            manifest_log_rotate_bytes: 1024 * 1024,

            // Compaction. The `std`-only fields need no extra gate here because
            // the whole impl is already compiled for `std` builds only.
            compaction_rate_limit: 0,
            compaction_threads,
            compaction_pool: None,
            subcompaction_min_bytes: crate::compaction::worker::SUBCOMPACTION_MIN_INPUT_BYTES,
            #[cfg(all(test, feature = "std"))]
            fail_one_subcompaction: Arc::new(core::sync::atomic::AtomicBool::new(false)),
            #[cfg(all(test, feature = "std"))]
            fail_tight_after_first_slice: Arc::new(core::sync::atomic::AtomicBool::new(false)),
        }
    }
}
/// Name of the file that `acquire_directory_lock` opens inside the directory
/// and locks exclusively.
#[cfg_attr(
not(feature = "std"),
allow(
dead_code,
reason = "directory-lock filename used only by the std-gated lock-acquisition path"
)
)]
pub(crate) const DIRECTORY_LOCK_FILE: &str = "LOCK";
/// Tries to take an exclusive lock on `dir` by locking the `DIRECTORY_LOCK_FILE`
/// file inside it.
///
/// Returns `Ok(None)` without touching the filesystem when `enabled` is `false`.
/// Otherwise the lock file is opened for reading and writing with the create
/// flag set, and the open handle is returned once the lock is acquired.
///
/// NOTE(review): presumably the lock is held for as long as the returned handle
/// is kept alive — confirm against the `FsFile` implementation.
///
/// # Errors
///
/// - Propagates any error from opening the lock file or from the lock attempt.
/// - Returns `Error::Locked` (carrying the directory path) when the lock
///   attempt reports that the lock was not acquired.
#[cfg(feature = "std")]
pub(crate) fn acquire_directory_lock(
    fs: &dyn Fs,
    dir: &Path,
    enabled: bool,
) -> crate::Result<Option<Box<dyn crate::fs::FsFile>>> {
    if !enabled {
        return Ok(None);
    }

    let lock_file = fs.open(
        &dir.join(DIRECTORY_LOCK_FILE),
        &crate::fs::FsOpenOptions::new()
            .read(true)
            .write(true)
            .create(true),
    )?;

    let acquired = lock_file.try_lock_exclusive()?;
    if !acquired {
        return Err(crate::Error::Locked(dir.display().to_string()));
    }

    Ok(Some(lock_file))
}
impl Config {
    /// Creates a configuration for the tree at `path` using the given sequence
    /// number counters.
    ///
    /// `path` is made absolute; every other setting takes its `Default` value.
    #[cfg(feature = "std")]
    pub fn new<P: AsRef<Path>>(
        path: P,
        seqno: SequenceNumberCounter,
        visible_seqno: SequenceNumberCounter,
    ) -> Self {
        Self::new_with_generators(path, Arc::new(seqno), Arc::new(visible_seqno))
    }

    /// Creates a configuration for the tree at `path` using already shared
    /// sequence number generators.
    ///
    /// `path` is made absolute; every other setting takes its `Default` value.
    #[cfg(feature = "std")]
    pub fn new_with_generators<P: AsRef<Path>>(
        path: P,
        seqno: SharedSequenceNumberGenerator,
        visible_seqno: SharedSequenceNumberGenerator,
    ) -> Self {
        let path = absolute_path(path.as_ref());
        Self {
            path,
            seqno,
            visible_seqno,
            ..Self::default()
        }
    }

    /// Replaces the filesystem backend with `fs`, wrapping it in an `Arc`.
    #[must_use]
    pub fn with_fs<F: Fs>(self, fs: F) -> Self {
        self.with_shared_fs(Arc::new(fs))
    }

    /// Replaces the filesystem backend with an already shared handle.
    #[must_use]
    pub fn with_shared_fs(self, fs: Arc<dyn Fs>) -> Self {
        Self { fs, ..self }
    }

    /// Opens the tree described by this configuration.
    ///
    /// A `BlobTree` is opened when key-value separation options are set,
    /// otherwise a standard `Tree`.
    ///
    /// # Errors
    ///
    /// On zstd builds, opening fails before touching the tree when
    /// - a configured `ZstdDict` compression type has no matching dictionary
    ///   (`Error::ZstdDictMismatch`), or
    /// - an encryption provider is set that does not support the AAD-bound
    ///   block path (`Error::Encrypt`).
    ///
    /// Errors from opening the tree itself are propagated.
    pub fn open(self) -> crate::Result<AnyTree> {
        #[cfg(zstd_any)]
        {
            self.validate_zstd_dictionary()?;

            let provider_lacks_aad_path = self
                .encryption
                .as_ref()
                .is_some_and(|enc| !enc.supports_aad_block_path());
            if provider_lacks_aad_path {
                return Err(crate::Error::Encrypt(
                    "encryption provider does not implement the AAD-bound block path \
                     (encrypt_block_aad / decrypt_block_aad) required for encrypted \
                     blocks on a zstd build",
                ));
            }
        }

        let uses_kv_separation = self.kv_separation_opts.is_some();
        let tree = if uses_kv_separation {
            AnyTree::Blob(BlobTree::open(self)?)
        } else {
            AnyTree::Standard(Tree::open(self)?)
        };
        Ok(tree)
    }

    /// Checks that every `ZstdDict` compression setting has a dictionary with
    /// the required id.
    ///
    /// Data block entries are checked against the tree-level dictionary, in
    /// policy order; the key-value separation compression is then checked
    /// against the dictionary stored in the separation options. The first
    /// mismatch is returned as `Error::ZstdDictMismatch`, with `got` set to the
    /// configured id (or `None` when no dictionary is configured).
    ///
    /// NOTE(review): `index_block_compression_policy` is not inspected here —
    /// confirm that index blocks can never be configured with `ZstdDict`.
    #[cfg(zstd_any)]
    fn validate_zstd_dictionary(&self) -> crate::Result<()> {
        let tree_dict_id = self.zstd_dictionary.as_ref().map(|d| d.id());

        // (required id, configured id) for each data block policy entry.
        let data_block_checks =
            self.data_block_compression_policy
                .iter()
                .filter_map(|ct| match ct {
                    CompressionType::ZstdDict { dict_id, .. } => Some((*dict_id, tree_dict_id)),
                    _ => None,
                });

        // Same pair for the blob compression, if key-value separation uses a dictionary.
        let blob_check = self
            .kv_separation_opts
            .as_ref()
            .and_then(|kv_opts| match kv_opts.compression {
                CompressionType::ZstdDict { dict_id, .. } => {
                    Some((dict_id, kv_opts.zstd_dictionary.as_ref().map(|d| d.id())))
                }
                _ => None,
            });

        for (required, got) in data_block_checks.chain(blob_check) {
            let dictionary_matches = got.is_some_and(|actual| actual == required);
            if !dictionary_matches {
                return Err(crate::Error::ZstdDictMismatch {
                    expected: required,
                    got,
                });
            }
        }

        Ok(())
    }
}
#[cfg(all(test, zstd_any))]
mod tests;
impl Config {
#[must_use]
pub fn tables_folder_for_level(&self, level: u8) -> (PathBuf, Arc<dyn Fs>) {
if let Some(routes) = &self.level_routes {
for route in routes {
if route.levels.contains(&level) {
return (route.path.join(TABLES_FOLDER), route.fs.clone());
}
}
}
(self.path.join(TABLES_FOLDER), self.fs.clone())
}
#[must_use]
pub(crate) fn min_available_space(&self) -> u64 {
let mut free = self.fs.available_space(&self.path).unwrap_or(u64::MAX);
if let Some(routes) = &self.level_routes {
for route in routes {
free = free.min(route.fs.available_space(&route.path).unwrap_or(u64::MAX));
}
}
free
}
#[must_use]
pub fn all_tables_folders(&self) -> Vec<(PathBuf, Arc<dyn Fs>)> {
let primary_fs: Arc<dyn Fs> = self.fs.clone();
let mut folders: Vec<(PathBuf, Arc<dyn Fs>)> =
vec![(self.path.join(TABLES_FOLDER), primary_fs)];
if let Some(routes) = &self.level_routes {
for route in routes {
let folder = route.path.join(TABLES_FOLDER);
if !folders.iter().any(|(p, _)| *p == folder) {
folders.push((folder, route.fs.clone()));
}
}
}
folders
}
#[must_use]
pub fn level_routes(mut self, routes: Vec<LevelRoute>) -> Self {
for route in &routes {
assert!(
route.levels.start < route.levels.end,
"empty or inverted level route range: {:?}",
route.levels,
);
}
for (i, a) in routes.iter().enumerate() {
for b in routes.iter().skip(i + 1) {
assert!(
a.levels.end <= b.levels.start || b.levels.end <= a.levels.start,
"overlapping level routes: {:?} and {:?}",
a.levels,
b.levels,
);
}
}
self.level_routes = if routes.is_empty() {
None
} else {
Some(
routes
.into_iter()
.map(|mut r| {
r.path = absolute_path(&r.path);
r
})
.collect(),
)
};
self
}
#[must_use]
pub fn seqno_generator(mut self, generator: SharedSequenceNumberGenerator) -> Self {
self.seqno = generator;
self
}
#[must_use]
pub fn visible_seqno_generator(mut self, generator: SharedSequenceNumberGenerator) -> Self {
self.visible_seqno = generator;
self
}
#[must_use]
pub fn use_cache(mut self, cache: Arc<Cache>) -> Self {
self.cache = cache;
self
}
#[must_use]
pub fn use_descriptor_table(mut self, descriptor_table: Option<Arc<DescriptorTable>>) -> Self {
self.descriptor_table = descriptor_table;
self
}
#[must_use]
pub fn expect_point_read_hits(mut self, b: bool) -> Self {
self.expect_point_read_hits = b;
self
}
#[must_use]
pub fn page_ecc(mut self, enabled: bool) -> Self {
self.page_ecc = enabled;
self
}
#[must_use]
pub fn with_directory_lock(mut self, enabled: bool) -> Self {
self.directory_lock = enabled;
self
}
#[must_use]
pub fn ecc_scheme(mut self, scheme: crate::runtime_config::EccScheme) -> Self {
self.initial_runtime_config.ecc_scheme = scheme;
self
}
#[must_use]
pub fn disable_cow_on_sst_files(mut self, enabled: bool) -> Self {
self.initial_runtime_config.disable_cow_on_sst_files = enabled;
self
}
#[must_use]
pub fn use_reflink_for_checkpoint(mut self, enabled: bool) -> Self {
self.initial_runtime_config.use_reflink_for_checkpoint = enabled;
self
}
#[must_use]
pub fn with_runtime_config(mut self, runtime: crate::runtime_config::RuntimeConfig) -> Self {
self.initial_runtime_config = runtime;
self
}
#[must_use]
pub fn filter_block_partitioning_policy(mut self, policy: PinningPolicy) -> Self {
self.filter_block_partitioning_policy = policy;
self
}
#[must_use]
pub fn index_block_partitioning_policy(mut self, policy: PinningPolicy) -> Self {
self.index_block_partitioning_policy = policy;
self
}
#[must_use]
pub fn filter_block_pinning_policy(mut self, policy: PinningPolicy) -> Self {
self.filter_block_pinning_policy = policy;
self
}
#[must_use]
pub fn index_block_pinning_policy(mut self, policy: PinningPolicy) -> Self {
self.index_block_pinning_policy = policy;
self
}
#[must_use]
pub fn data_block_restart_interval_policy(mut self, policy: RestartIntervalPolicy) -> Self {
assert!(
policy.iter().all(|interval| *interval > 0),
"data block restart interval must be greater than zero",
);
self.data_block_restart_interval_policy = policy;
self
}
#[must_use]
pub fn index_block_restart_interval_policy(mut self, policy: RestartIntervalPolicy) -> Self {
assert!(
policy.iter().all(|interval| *interval > 0),
"index block restart interval must be greater than zero",
);
self.index_block_restart_interval_policy = policy;
self
}
#[must_use]
pub fn filter_policy(mut self, policy: FilterPolicy) -> Self {
self.filter_policy = policy;
self
}
#[must_use]
pub fn locator_policy(mut self, policy: LocatorPolicy) -> Self {
self.locator_policy = policy;
self
}
#[must_use]
pub fn data_block_compression_policy(mut self, policy: CompressionPolicy) -> Self {
self.data_block_compression_policy = policy;
self
}
#[must_use]
pub fn index_block_compression_policy(mut self, policy: CompressionPolicy) -> Self {
self.index_block_compression_policy = policy;
self
}
#[must_use]
pub fn data_block_size_policy(mut self, policy: BlockSizePolicy) -> Self {
self.data_block_size_policy = policy;
self
}
#[must_use]
pub fn data_block_hash_ratio_policy(mut self, policy: HashRatioPolicy) -> Self {
self.data_block_hash_ratio_policy = policy;
self
}
#[must_use]
pub fn with_kv_separation(mut self, opts: Option<KvSeparationOptions>) -> Self {
self.kv_separation_opts = opts;
self
}
#[must_use]
pub fn with_compaction_filter_factory(mut self, factory: Option<Arc<dyn Factory>>) -> Self {
self.compaction_filter_factory = factory;
self
}
#[must_use]
pub fn prefix_extractor(mut self, extractor: Arc<dyn PrefixExtractor>) -> Self {
self.prefix_extractor = Some(extractor);
self
}
#[must_use]
pub fn with_merge_operator(mut self, op: Option<Arc<dyn MergeOperator>>) -> Self {
self.merge_operator = op;
self
}
#[must_use]
pub fn comparator(mut self, comparator: SharedComparator) -> Self {
self.comparator = comparator;
self
}
#[must_use]
pub fn with_encryption(mut self, encryption: Option<Arc<dyn EncryptionProvider>>) -> Self {
self.encryption = encryption;
self
}
#[must_use]
pub fn manifest_recovery_mode(mut self, mode: ManifestRecoveryMode) -> Self {
self.manifest_recovery_mode = mode;
self
}
#[must_use]
pub fn sync_mode(mut self, mode: SyncMode) -> Self {
self.sync_mode = mode;
self
}
#[must_use]
pub fn manifest_log_rotate_bytes(mut self, bytes: u64) -> Self {
self.manifest_log_rotate_bytes = bytes;
self
}
#[must_use]
pub fn compaction_rate_limit(mut self, bytes_per_sec: u64) -> Self {
self.compaction_rate_limit = bytes_per_sec;
self
}
#[cfg(feature = "std")]
#[must_use]
pub fn compaction_threads(mut self, threads: usize) -> Self {
self.compaction_threads = threads.max(1);
self
}
#[cfg(feature = "std")]
#[must_use]
pub fn subcompaction_min_bytes(mut self, bytes: u64) -> Self {
self.subcompaction_min_bytes = bytes;
self
}
#[cfg(feature = "std")]
#[must_use]
pub fn compaction_pool(
mut self,
pool: Option<Arc<dyn crate::table::writer::CompactionSpawner>>,
) -> Self {
self.compaction_pool = pool;
self
}
#[cfg(zstd_any)]
#[must_use]
pub fn zstd_dictionary(
mut self,
dictionary: Option<Arc<crate::compression::ZstdDictionary>>,
) -> Self {
self.zstd_dictionary = dictionary;
self
}
}
#[cfg(test)]
mod builder_tests;