use crate::{
config::{
BlockSizePolicy, BloomConstructionPolicy, CompressionPolicy, FilterPolicy,
FilterPolicyEntry, HashRatioPolicy, PartitioningPolicy, PinningPolicy,
RestartIntervalPolicy,
},
keyspace::{config::DecodeConfig, InternalKeyspaceId},
meta_keyspace::{encode_config_key, MetaKeyspace},
};
use byteorder::ReadBytesExt;
use lsm_tree::{
compaction::Factory as CompactionFilterFactory, CompressionType, KvPair, KvSeparationOptions,
};
use std::sync::Arc;
#[expect(clippy::module_name_repetitions)]
#[derive(Clone)]
pub struct CreateOptions {
pub(crate) level_count: u8,
pub(crate) max_memtable_size: u64,
pub data_block_hash_ratio_policy: HashRatioPolicy,
#[doc(hidden)]
pub data_block_size_policy: BlockSizePolicy,
#[doc(hidden)]
pub data_block_restart_interval_policy: RestartIntervalPolicy,
#[doc(hidden)]
pub index_block_restart_interval_policy: RestartIntervalPolicy,
#[doc(hidden)]
pub index_block_pinning_policy: PinningPolicy,
#[doc(hidden)]
pub filter_block_pinning_policy: PinningPolicy,
#[doc(hidden)]
pub filter_block_partitioning_policy: PartitioningPolicy,
#[doc(hidden)]
pub index_block_partitioning_policy: PartitioningPolicy,
#[doc(hidden)]
pub expect_point_read_hits: bool,
#[doc(hidden)]
pub filter_policy: FilterPolicy,
#[doc(hidden)]
pub data_block_compression_policy: CompressionPolicy,
#[doc(hidden)]
pub index_block_compression_policy: CompressionPolicy,
pub(crate) manual_journal_persist: bool,
#[doc(hidden)]
pub compaction_strategy: Arc<dyn lsm_tree::compaction::CompactionStrategy + Send + Sync>,
#[doc(hidden)]
pub kv_separation_opts: Option<KvSeparationOptions>,
pub(crate) compaction_filter_factory: Option<Arc<dyn CompactionFilterFactory>>,
}
impl Default for CreateOptions {
fn default() -> Self {
let default_tree_config = lsm_tree::Config::default();
Self {
manual_journal_persist: false,
max_memtable_size: 64 * 1_024 * 1_024,
data_block_hash_ratio_policy: HashRatioPolicy::all(0.0),
data_block_size_policy: BlockSizePolicy::all( 4 * 1_024),
data_block_restart_interval_policy: RestartIntervalPolicy::new([10, 16]),
index_block_restart_interval_policy: RestartIntervalPolicy::all(1),
index_block_pinning_policy: PinningPolicy::new([true, true, false]),
filter_block_pinning_policy: PinningPolicy::new([true, false]),
index_block_partitioning_policy: PartitioningPolicy::new([false, false, false, true]),
filter_block_partitioning_policy: PartitioningPolicy::new([false, false, false, true]),
expect_point_read_hits: false,
filter_policy: FilterPolicy::new([
FilterPolicyEntry::Bloom(BloomConstructionPolicy::FalsePositiveRate(0.0001)),
FilterPolicyEntry::Bloom(BloomConstructionPolicy::BitsPerKey(10.0)),
]),
level_count: default_tree_config.level_count,
#[cfg(feature = "lz4")]
data_block_compression_policy: CompressionPolicy::new([CompressionType::None, CompressionType::None, CompressionType::Lz4]),
#[cfg(not(feature = "lz4"))]
data_block_compression_policy: CompressionPolicy::new(&[CompressionType::None]),
index_block_compression_policy: CompressionPolicy::all(CompressionType::None),
compaction_strategy: Arc::new(
crate::compaction::Leveled::default()
),
kv_separation_opts: None,
compaction_filter_factory: None,
}
}
}
macro_rules! policy {
($keyspace_id:expr, $name:expr, $field:expr) => {{
let key = encode_config_key($keyspace_id, $name);
(key.into(), $field.encode())
}};
}
impl CreateOptions {
pub(crate) fn with_compaction_filter_factory(
mut self,
factory: Arc<dyn CompactionFilterFactory + 'static>,
) -> Self {
self.compaction_filter_factory = Some(factory);
self
}
#[expect(clippy::expect_used, clippy::too_many_lines)]
pub(crate) fn from_kvs(
keyspace_id: InternalKeyspaceId,
meta_keyspace: &MetaKeyspace,
) -> crate::Result<Self> {
let blob = meta_keyspace.get_kv_for_config(keyspace_id, "blob")?;
let data_block_compression_policy = meta_keyspace
.get_kv_for_config(keyspace_id, "data_block_compression_policy")?
.expect("should exist");
let data_block_compression_policy =
CompressionPolicy::decode(&data_block_compression_policy)?;
let index_block_compression_policy = meta_keyspace
.get_kv_for_config(keyspace_id, "index_block_compression_policy")?
.expect("should exist");
let index_block_compression_policy =
CompressionPolicy::decode(&index_block_compression_policy)?;
let data_block_size_policy = meta_keyspace
.get_kv_for_config(keyspace_id, "data_block_size_policy")?
.expect("should exist");
let data_block_size_policy = BlockSizePolicy::decode(&data_block_size_policy)?;
let filter_block_partitioning_policy = meta_keyspace
.get_kv_for_config(keyspace_id, "filter_block_partitioning_policy")?
.expect("should exist");
let filter_block_partitioning_policy =
PinningPolicy::decode(&filter_block_partitioning_policy)?;
let index_block_partitioning_policy = meta_keyspace
.get_kv_for_config(keyspace_id, "index_block_partitioning_policy")?
.expect("should exist");
let index_block_partitioning_policy =
PinningPolicy::decode(&index_block_partitioning_policy)?;
let filter_block_pinning_policy = meta_keyspace
.get_kv_for_config(keyspace_id, "filter_block_pinning_policy")?
.expect("should exist");
let filter_block_pinning_policy = PinningPolicy::decode(&filter_block_pinning_policy)?;
let index_block_pinning_policy = meta_keyspace
.get_kv_for_config(keyspace_id, "index_block_pinning_policy")?
.expect("should exist");
let index_block_pinning_policy = PinningPolicy::decode(&index_block_pinning_policy)?;
let data_block_restart_interval_policy = meta_keyspace
.get_kv_for_config(keyspace_id, "data_block_restart_interval_policy")?
.expect("should exist");
let data_block_restart_interval_policy =
RestartIntervalPolicy::decode(&data_block_restart_interval_policy)?;
let index_block_restart_interval_policy = meta_keyspace
.get_kv_for_config(keyspace_id, "index_block_restart_interval_policy")?
.expect("should exist");
let index_block_restart_interval_policy =
RestartIntervalPolicy::decode(&index_block_restart_interval_policy)?;
let data_block_hash_ratio_policy = meta_keyspace
.get_kv_for_config(keyspace_id, "data_block_hash_ratio_policy")?
.expect("should exist");
let data_block_hash_ratio_policy = HashRatioPolicy::decode(&data_block_hash_ratio_policy)?;
let expect_point_read_hits = meta_keyspace
.get_kv_for_config(keyspace_id, "expect_point_read_hits")?
.expect("should exist");
let expect_point_read_hits = expect_point_read_hits == [1];
let filter_policy = meta_keyspace
.get_kv_for_config(keyspace_id, "filter_policy")?
.expect("should exist");
let filter_policy = FilterPolicy::decode(&filter_policy)?;
let blob_opts = blob.map(|_| {
use byteorder::LE;
use lsm_tree::coding::Decode;
let blob_age_cutoff = meta_keyspace
.get_kv_for_config(keyspace_id, "blob_age_cutoff")?
.expect("blob_age_cutoff should be defined");
let blob_age_cutoff = (&mut &blob_age_cutoff[..]).read_f32::<LE>()?;
let blob_compression = meta_keyspace
.get_kv_for_config(keyspace_id, "blob_compression")?
.expect("blob_compression should be defined");
let blob_compression = CompressionType::decode_from(&mut &blob_compression[..])?;
let file_target_size = meta_keyspace
.get_kv_for_config(keyspace_id, "blob_file_target_size")?
.expect("blob_file_target_size should be defined");
let file_target_size = (&mut &file_target_size[..]).read_u64::<LE>()?;
let separation_threshold = meta_keyspace
.get_kv_for_config(keyspace_id, "blob_separation_threshold")?
.expect("blob_separation_threshold should be defined");
let separation_threshold = (&mut &separation_threshold[..]).read_u32::<LE>()?;
let staleness_threshold = meta_keyspace
.get_kv_for_config(keyspace_id, "blob_staleness_threshold")?
.expect("blob_staleness_threshold should be defined");
let staleness_threshold = (&mut &staleness_threshold[..]).read_f32::<LE>()?;
Ok::<_, crate::Error>(
KvSeparationOptions::default()
.compression(blob_compression)
.file_target_size(file_target_size)
.separation_threshold(separation_threshold)
.staleness_threshold(staleness_threshold)
.age_cutoff(blob_age_cutoff),
)
});
let compaction_strategy_name = meta_keyspace
.get_kv_for_config(keyspace_id, "compaction_strategy")?
.expect("compaction_strategy should be defined");
let compaction_strategy_name = std::str::from_utf8(&compaction_strategy_name)
.expect("compaction_strategy should be UTF-8");
let compaction_strategy = match compaction_strategy_name {
lsm_tree::compaction::LEVELED_COMPACTION_NAME => {
use byteorder::LE;
let l0_threshold = meta_keyspace
.get_kv_for_config(keyspace_id, "leveled_l0_threshold")?
.expect("leveled_l0_threshold should be defined");
let l0_threshold = (&mut &l0_threshold[..]).read_u8()?;
let target_size = meta_keyspace
.get_kv_for_config(keyspace_id, "leveled_target_size")?
.expect("leveled_target_size should be defined");
let target_size = (&mut &target_size[..]).read_u64::<LE>()?;
let level_ratio_policy_bytes = meta_keyspace
.get_kv_for_config(keyspace_id, "leveled_level_ratio_policy")?
.expect("leveled_level_ratio_policy should be defined");
let level_ratio_policy_bytes = &mut &level_ratio_policy_bytes[..];
let level_ratio_policy_len = level_ratio_policy_bytes.read_u8()?;
let mut ratios = vec![];
for _ in 0..level_ratio_policy_len {
ratios.push(level_ratio_policy_bytes.read_f32::<LE>()?);
}
Arc::new(
crate::compaction::Leveled::default()
.with_l0_threshold(l0_threshold)
.with_table_target_size(target_size)
.with_level_ratio_policy(ratios),
) as Arc<dyn lsm_tree::compaction::CompactionStrategy + Send + Sync>
}
lsm_tree::compaction::FIFO_COMPACTION_NAME => {
use byteorder::LE;
let fifo_limit = meta_keyspace
.get_kv_for_config(keyspace_id, "fifo_limit")?
.expect("fifo_limit should be defined");
let fifo_limit = (&mut &fifo_limit[..]).read_u64::<LE>()?;
let has_ttl = meta_keyspace
.get_kv_for_config(keyspace_id, "fifo_ttl")?
.expect("fifo_ttl should be defined")
== [1];
let ttl_seconds = if has_ttl {
let fifo_ttl_seconds = meta_keyspace
.get_kv_for_config(keyspace_id, "fifo_ttl_seconds")?
.expect("fifo_ttl_seconds should be defined");
let fifo_ttl_seconds = (&mut &fifo_ttl_seconds[..]).read_u64::<LE>()?;
Some(fifo_ttl_seconds)
} else {
None
};
Arc::new(crate::compaction::Fifo::new(fifo_limit, ttl_seconds))
}
name => {
panic!("Invalid/unsupported compaction strategy: {name:?}");
}
};
let manual_journal_persist = meta_keyspace
.get_kv_for_config(keyspace_id, "manual_journal_persist")?
.expect("should exist")
== [1];
let max_memtable_size = meta_keyspace
.get_kv_for_config(keyspace_id, "max_memtable_size")?
.expect("should exist");
let max_memtable_size = (&mut &max_memtable_size[..]).read_u64::<byteorder::LE>()?;
Ok(Self {
data_block_hash_ratio_policy,
filter_block_partitioning_policy,
index_block_partitioning_policy,
filter_block_pinning_policy,
index_block_pinning_policy,
data_block_compression_policy,
index_block_compression_policy,
data_block_size_policy,
data_block_restart_interval_policy,
index_block_restart_interval_policy,
expect_point_read_hits,
filter_policy,
level_count: 7,
manual_journal_persist,
max_memtable_size,
compaction_strategy,
kv_separation_opts: blob_opts.transpose()?,
compaction_filter_factory: None,
})
}
#[expect(clippy::too_many_lines)]
pub(crate) fn encode_kvs(&self, keyspace_id: InternalKeyspaceId) -> Vec<KvPair> {
use crate::keyspace::config::EncodeConfig;
let mut kvs = vec![
{
let key = encode_config_key(keyspace_id, "compaction_strategy");
(key, self.compaction_strategy.get_name().into())
},
policy!(
keyspace_id,
"data_block_compression_policy",
self.data_block_compression_policy
),
policy!(
keyspace_id,
"data_block_hash_ratio_policy",
self.data_block_hash_ratio_policy
),
policy!(
keyspace_id,
"data_block_restart_interval_policy",
self.data_block_restart_interval_policy
),
policy!(
keyspace_id,
"data_block_size_policy",
self.data_block_size_policy
),
{
let key = encode_config_key(keyspace_id, "expect_point_read_hits");
let value = (if self.expect_point_read_hits {
[1u8]
} else {
[0u8]
})
.into();
(key, value)
},
policy!(
keyspace_id,
"filter_block_partitioning_policy",
self.filter_block_partitioning_policy
),
policy!(
keyspace_id,
"filter_block_pinning_policy",
self.filter_block_pinning_policy
),
policy!(keyspace_id, "filter_policy", self.filter_policy),
policy!(
keyspace_id,
"index_block_compression_policy",
self.index_block_compression_policy
),
policy!(
keyspace_id,
"index_block_partitioning_policy",
self.index_block_partitioning_policy
),
policy!(
keyspace_id,
"index_block_pinning_policy",
self.index_block_pinning_policy
),
policy!(
keyspace_id,
"index_block_restart_interval_policy",
self.index_block_restart_interval_policy
),
{
let key = encode_config_key(keyspace_id, "level_count");
(key, [self.level_count].into())
},
{
let key = encode_config_key(keyspace_id, "manual_journal_persist");
(key, [u8::from(self.manual_journal_persist)].into())
},
{
let key = encode_config_key(keyspace_id, "max_memtable_size");
(key, self.max_memtable_size.to_le_bytes().into())
},
{
let key = encode_config_key(keyspace_id, "version");
(key, [3u8].into())
},
];
match self.compaction_strategy.get_name() {
"LeveledCompaction" | "FifoCompaction" => {
kvs.extend(
self.compaction_strategy
.get_config()
.into_iter()
.map(|(k, v)| (encode_config_key(keyspace_id, k), v)),
);
}
name => {
panic!("Invalid/unsupported compaction strategy: {name:?}");
}
}
if let Some(blob_opts) = &self.kv_separation_opts {
kvs.extend([
{
let key = encode_config_key(keyspace_id, "blob");
(key, [1u8].into())
},
{
let key = encode_config_key(keyspace_id, "blob_age_cutoff");
(key, blob_opts.age_cutoff.to_le_bytes().into())
},
{
use lsm_tree::coding::Encode;
let key = encode_config_key(keyspace_id, "blob_compression");
(key, blob_opts.compression.encode_into_vec().into())
},
{
let key = encode_config_key(keyspace_id, "blob_file_target_size");
(key, blob_opts.file_target_size.to_le_bytes().into())
},
{
let key = encode_config_key(keyspace_id, "blob_separation_threshold");
(key, blob_opts.separation_threshold.to_le_bytes().into())
},
{
let key = encode_config_key(keyspace_id, "blob_staleness_threshold");
(key, blob_opts.staleness_threshold.to_le_bytes().into())
},
]);
}
kvs
}
#[must_use]
pub fn with_kv_separation(mut self, opts: Option<KvSeparationOptions>) -> Self {
self.kv_separation_opts = opts;
self
}
#[must_use]
pub fn data_block_restart_interval_policy(mut self, policy: RestartIntervalPolicy) -> Self {
self.data_block_restart_interval_policy = policy;
self
}
#[must_use]
pub fn filter_block_pinning_policy(mut self, policy: PinningPolicy) -> Self {
self.filter_block_pinning_policy = policy;
self
}
#[must_use]
pub fn index_block_pinning_policy(mut self, policy: PinningPolicy) -> Self {
self.index_block_pinning_policy = policy;
self
}
#[must_use]
pub fn filter_block_partitioning_policy(mut self, policy: PartitioningPolicy) -> Self {
self.filter_block_partitioning_policy = policy;
self
}
#[must_use]
pub fn index_block_partitioning_policy(mut self, policy: PartitioningPolicy) -> Self {
self.index_block_partitioning_policy = policy;
self
}
#[must_use]
#[doc(hidden)]
pub fn data_block_hash_ratio_policy(mut self, policy: HashRatioPolicy) -> Self {
self.data_block_hash_ratio_policy = policy;
self
}
#[must_use]
pub fn filter_policy(mut self, policy: FilterPolicy) -> Self {
self.filter_policy = policy;
self
}
#[must_use]
pub fn expect_point_read_hits(mut self, b: bool) -> Self {
self.expect_point_read_hits = b;
self
}
#[must_use]
pub fn data_block_compression_policy(mut self, policy: CompressionPolicy) -> Self {
self.data_block_compression_policy = policy;
self
}
#[must_use]
pub fn index_block_compression_policy(mut self, policy: CompressionPolicy) -> Self {
self.index_block_compression_policy = policy;
self
}
#[must_use]
pub fn compaction_strategy(
mut self,
compaction_strategy: Arc<dyn lsm_tree::compaction::CompactionStrategy + Send + Sync>,
) -> Self {
self.compaction_strategy = compaction_strategy;
self
}
#[must_use]
pub fn manual_journal_persist(mut self, flag: bool) -> Self {
self.manual_journal_persist = flag;
self
}
#[must_use]
pub fn max_memtable_size(mut self, bytes: u64) -> Self {
self.max_memtable_size = bytes;
self
}
#[must_use]
pub fn data_block_size_policy(mut self, policy: BlockSizePolicy) -> Self {
self.data_block_size_policy = policy;
self
}
}
#[cfg(test)]
mod tests {
use super::*;
use test_log::test;
#[test]
#[cfg(not(feature = "lz4"))]
fn keyspace_opts_compression_none() {
let mut c = CreateOptions::default();
assert_eq!(
c.data_block_compression_policy,
CompressionPolicy::disabled(),
);
assert_eq!(c.kv_separation_opts, None);
c = c.with_kv_separation(KvSeparationOptions::default());
assert_eq!(
c.kv_separation_opts.as_ref().unwrap().compression,
CompressionType::None,
);
c = c.data_block_compression_policy(CompressionPolicy::disabled());
assert_eq!(
c.data_block_compression_policy,
CompressionPolicy::disabled(),
);
assert_eq!(
c.kv_separation_opts.unwrap().compression,
CompressionType::None,
);
}
#[test]
#[expect(clippy::unwrap_used)]
#[cfg(feature = "lz4")]
fn keyspace_opts_compression_default() {
use CompressionType::{Lz4, None as Uncompressed};
let mut c = CreateOptions::default();
assert_eq!(
c.data_block_compression_policy,
CompressionPolicy::new([Uncompressed, Uncompressed, Lz4]),
);
assert_eq!(c.kv_separation_opts, None);
c = c.with_kv_separation(Some(KvSeparationOptions::default()));
assert_eq!(c.kv_separation_opts.as_ref().unwrap().compression, Lz4);
c = c.data_block_compression_policy(CompressionPolicy::disabled());
assert_eq!(
c.data_block_compression_policy,
CompressionPolicy::disabled(),
);
assert_eq!(c.kv_separation_opts.as_ref().unwrap().compression, Lz4);
}
}