use duration_str::{deserialize_duration, deserialize_option_duration};
use figment::providers::{Env, Format, Json, Toml, Yaml};
use figment::{Figment, Metadata, Provider};
use serde::{Deserialize, Serialize, Serializer};
use std::path::Path;
use std::sync::Arc;
use std::{str::FromStr, time::Duration};
use uuid::Uuid;
use crate::error::SlateDBError;
use crate::db_cache::DbCache;
use crate::garbage_collector::{DEFAULT_INTERVAL, DEFAULT_MIN_AGE};
use crate::merge_operator::MergeOperatorType;
/// Which SSTs to preload; used by
/// [`ObjectStoreCacheOptions::preload_disk_cache_on_startup`].
///
/// NOTE(review): exact preload semantics live in the disk-cache code — confirm there.
#[derive(Clone, Copy, Debug, Deserialize, Serialize, PartialEq)]
pub enum PreloadLevel {
    /// Presumably only level-0 SSTs — TODO confirm.
    L0Sst,
    /// Presumably every SST — TODO confirm.
    AllSst,
}
/// Block size used for SSTs.
///
/// Defaults to [`SstBlockSize::Block4Kib`]. [`SstBlockSize::as_bytes`] converts a
/// variant into a byte count.
#[derive(Clone, Copy, Debug, Deserialize, Serialize, PartialEq, Default)]
pub enum SstBlockSize {
    /// 1024 bytes.
    Block1Kib,
    /// 2048 bytes.
    Block2Kib,
    /// 4096 bytes (the default).
    #[default]
    Block4Kib,
    /// 8192 bytes.
    Block8Kib,
    /// 16384 bytes.
    Block16Kib,
    /// 32768 bytes.
    Block32Kib,
    /// 65536 bytes.
    Block64Kib,
    /// Arbitrary size in bytes; only compiled into test builds (`#[cfg(test)]`).
    #[cfg(test)]
    Other(usize),
}
impl SstBlockSize {
    /// Returns the block size expressed as a number of bytes.
    pub fn as_bytes(&self) -> usize {
        const KIB: usize = 1024;
        match *self {
            SstBlockSize::Block1Kib => KIB,
            SstBlockSize::Block2Kib => 2 * KIB,
            SstBlockSize::Block4Kib => 4 * KIB,
            SstBlockSize::Block8Kib => 8 * KIB,
            SstBlockSize::Block16Kib => 16 * KIB,
            SstBlockSize::Block32Kib => 32 * KIB,
            SstBlockSize::Block64Kib => 64 * KIB,
            // Test-only escape hatch carrying an explicit byte count.
            #[cfg(test)]
            SstBlockSize::Other(bytes) => bytes,
        }
    }
}
/// Durability level used as a read filter (the `durability_filter` field of
/// [`ReadOptions`] and [`ScanOptions`]). Defaults to [`DurabilityLevel::Memory`].
#[non_exhaustive]
#[derive(Clone, Default, Debug, Copy, PartialEq)]
pub enum DurabilityLevel {
    Remote,
    #[default]
    Memory,
}

/// Options for point reads.
///
/// Defaults: `durability_filter` = [`DurabilityLevel::default`], `dirty = false`,
/// `cache_blocks = true`.
#[derive(Clone, Debug)]
pub struct ReadOptions {
    /// Durability level used to filter what the read may observe.
    pub durability_filter: DurabilityLevel,
    /// Whether dirty data may be read (exact meaning defined by the reader — TODO confirm).
    pub dirty: bool,
    /// Whether blocks loaded by this read should be cached.
    pub cache_blocks: bool,
}

impl Default for ReadOptions {
    fn default() -> Self {
        ReadOptions {
            cache_blocks: true,
            dirty: false,
            durability_filter: Default::default(),
        }
    }
}

impl ReadOptions {
    /// Same as [`ReadOptions::default`].
    pub fn new() -> Self {
        Default::default()
    }

    /// Returns these options with `dirty` replaced.
    pub fn with_dirty(mut self, dirty: bool) -> Self {
        self.dirty = dirty;
        self
    }

    /// Returns these options with `durability_filter` replaced.
    pub fn with_durability_filter(mut self, durability_filter: DurabilityLevel) -> Self {
        self.durability_filter = durability_filter;
        self
    }

    /// Returns these options with `cache_blocks` replaced.
    pub fn with_cache_blocks(mut self, cache_blocks: bool) -> Self {
        self.cache_blocks = cache_blocks;
        self
    }
}

/// Options for range scans.
///
/// Defaults: `durability_filter` = [`DurabilityLevel::default`], `dirty = false`,
/// `read_ahead_bytes = 1`, `cache_blocks = false`, `max_fetch_tasks = 1`.
#[derive(Clone, Debug)]
pub struct ScanOptions {
    /// Durability level used to filter what the scan may observe.
    pub durability_filter: DurabilityLevel,
    /// Whether dirty data may be read (exact meaning defined by the reader — TODO confirm).
    pub dirty: bool,
    /// Read-ahead amount in bytes.
    pub read_ahead_bytes: usize,
    /// Whether blocks loaded by this scan should be cached.
    pub cache_blocks: bool,
    /// Upper bound on concurrent fetch tasks.
    pub max_fetch_tasks: usize,
}

impl Default for ScanOptions {
    fn default() -> Self {
        ScanOptions {
            max_fetch_tasks: 1,
            cache_blocks: false,
            read_ahead_bytes: 1,
            dirty: false,
            durability_filter: Default::default(),
        }
    }
}

impl ScanOptions {
    /// Same as [`ScanOptions::default`].
    pub fn new() -> Self {
        Default::default()
    }

    /// Returns these options with `dirty` replaced.
    pub fn with_dirty(mut self, dirty: bool) -> Self {
        self.dirty = dirty;
        self
    }

    /// Returns these options with `durability_filter` replaced.
    pub fn with_durability_filter(mut self, durability_filter: DurabilityLevel) -> Self {
        self.durability_filter = durability_filter;
        self
    }

    /// Returns these options with `read_ahead_bytes` replaced.
    pub fn with_read_ahead_bytes(mut self, read_ahead_bytes: usize) -> Self {
        self.read_ahead_bytes = read_ahead_bytes;
        self
    }

    /// Returns these options with `cache_blocks` replaced.
    pub fn with_cache_blocks(mut self, cache_blocks: bool) -> Self {
        self.cache_blocks = cache_blocks;
        self
    }

    /// Returns these options with `max_fetch_tasks` replaced.
    pub fn with_max_fetch_tasks(mut self, max_fetch_tasks: usize) -> Self {
        self.max_fetch_tasks = max_fetch_tasks;
        self
    }
}
/// Selects what a flush operates on.
// `Debug` is derived so flush options can be logged like every other options type here.
#[derive(Clone, Debug)]
pub enum FlushType {
    /// Flush the memtable.
    MemTable,
    /// Flush the write-ahead log; this is the [`FlushOptions`] default.
    Wal,
}

/// Options for an explicit flush.
#[derive(Clone, Debug)]
pub struct FlushOptions {
    /// What to flush. Defaults to [`FlushType::Wal`].
    pub flush_type: FlushType,
}

impl Default for FlushOptions {
    fn default() -> Self {
        Self {
            flush_type: FlushType::Wal,
        }
    }
}
/// Options for a write.
///
/// `await_durable` defaults to `true`.
#[derive(Clone, Debug)]
pub struct WriteOptions {
    /// Whether the write call waits for durability (presumably until the write is
    /// persisted — TODO confirm against the writer).
    pub await_durable: bool,
}

impl Default for WriteOptions {
    fn default() -> Self {
        let await_durable = true;
        WriteOptions { await_durable }
    }
}
/// Per-call options for a put.
#[derive(Clone, Default, PartialEq, Debug)]
pub struct PutOptions {
    /// TTL policy for the written entry; defaults to [`Ttl::Default`].
    pub ttl: Ttl,
}

impl PutOptions {
    /// Resolves [`PutOptions::ttl`] into an absolute expiry timestamp.
    ///
    /// * `default` — TTL used when `ttl` is [`Ttl::Default`]; `None` means no expiry.
    /// * `now` — current timestamp, in the same unit as the TTL values.
    ///
    /// Returns `None` when the entry does not expire, or when `now + ttl` does not
    /// fit in an `i64`.
    pub(crate) fn expire_ts_from(&self, default: Option<u64>, now: i64) -> Option<i64> {
        resolve_expire_ts(&self.ttl, default, now)
    }
}

/// Per-call options for a merge.
#[derive(Clone, Default, PartialEq, Debug)]
pub struct MergeOptions {
    /// TTL policy for the merged entry; defaults to [`Ttl::Default`].
    pub ttl: Ttl,
}

impl MergeOptions {
    /// Resolves [`MergeOptions::ttl`] into an absolute expiry timestamp.
    ///
    /// Same contract as [`PutOptions::expire_ts_from`].
    pub(crate) fn expire_ts_from(&self, default: Option<u64>, now: i64) -> Option<i64> {
        resolve_expire_ts(&self.ttl, default, now)
    }
}

/// Time-to-live policy for a written entry.
#[non_exhaustive]
#[derive(Clone, Default, PartialEq, Debug)]
pub enum Ttl {
    /// Use the database-wide default TTL passed to `expire_ts_from`, if any.
    #[default]
    Default,
    /// Never expire.
    NoExpiry,
    /// Expire this many time units after `now`.
    ExpireAfter(u64),
}

/// Shared TTL resolution for [`PutOptions`] and [`MergeOptions`].
fn resolve_expire_ts(ttl: &Ttl, default: Option<u64>, now: i64) -> Option<i64> {
    match ttl {
        Ttl::Default => default.and_then(|default_ttl| checked_expire_ts(now, default_ttl)),
        Ttl::NoExpiry => None,
        Ttl::ExpireAfter(after) => checked_expire_ts(now, *after),
    }
}

/// Returns `now + ttl`, or `None` if the TTL or the sum does not fit in an `i64`.
///
/// Uses `checked_add` so an overflowing TTL yields `None` instead of panicking
/// (debug builds) or relying on wrap-around (release builds).
fn checked_expire_ts(now: i64, ttl: u64) -> Option<i64> {
    if ttl > i64::MAX as u64 {
        return None;
    }
    now.checked_add(ttl as i64)
}
/// Scope selector for checkpoint creation.
///
/// NOTE(review): variant semantics are defined by the checkpoint code, not here —
/// presumably `All` covers all written data and `Durable` only durable data; confirm.
#[non_exhaustive]
#[derive(Debug, Copy, Clone)]
pub enum CheckpointScope {
    All,
    Durable,
}
/// Options for creating a checkpoint; every field defaults to `None`.
#[derive(Debug, Clone, Default)]
pub struct CheckpointOptions {
    /// Optional lifetime of the checkpoint (presumably `None` = no expiry — TODO confirm).
    pub lifetime: Option<Duration>,
    /// Optional id of a source checkpoint (presumably to derive from — TODO confirm).
    pub source: Option<Uuid>,
    /// Optional human-readable name.
    pub name: Option<String>,
}
/// Database-wide configuration.
///
/// Can come from [`Settings::default`], a JSON/TOML/YAML file
/// ([`Settings::from_file`]), prefixed environment variables ([`Settings::from_env`]),
/// or the default file locations plus `SLATEDB_` environment variables
/// ([`Settings::load`]). Duration fields are parsed with `duration_str` and written
/// back as strings by this module's `serialize_duration` helpers.
#[derive(Clone, Deserialize, Serialize)]
pub struct Settings {
    /// Interval between flushes. `None` presumably disables time-based flushing —
    /// TODO confirm against the flusher.
    #[serde(deserialize_with = "deserialize_option_duration")]
    #[serde(serialize_with = "serialize_option_duration")]
    pub flush_interval: Option<Duration>,
    /// Only present when the `wal_disable` feature is enabled.
    #[cfg(feature = "wal_disable")]
    pub wal_enabled: bool,
    /// How often the manifest is polled.
    #[serde(deserialize_with = "deserialize_duration")]
    #[serde(serialize_with = "serialize_duration")]
    pub manifest_poll_interval: Duration,
    /// Timeout for manifest updates.
    #[serde(deserialize_with = "deserialize_duration")]
    #[serde(serialize_with = "serialize_duration")]
    pub manifest_update_timeout: Duration,
    /// Minimum key count before a filter is built (presumably — TODO confirm).
    pub min_filter_keys: u32,
    /// Filter bits per key.
    pub filter_bits_per_key: u32,
    /// Target size of L0 SSTs, in bytes.
    pub l0_sst_size_bytes: usize,
    /// Maximum number of L0 SSTs.
    pub l0_max_ssts: usize,
    /// Maximum amount of unflushed data, in bytes.
    pub max_unflushed_bytes: usize,
    /// Compactor configuration; `None` presumably disables the compactor — TODO confirm.
    pub compactor_options: Option<CompactorOptions>,
    /// Compression codec for written data; `None` means no codec is configured.
    pub compression_codec: Option<CompressionCodec>,
    /// Local object-store cache configuration.
    pub object_store_cache_options: ObjectStoreCacheOptions,
    /// Garbage-collector configuration; `None` by default.
    pub garbage_collector_options: Option<GarbageCollectorOptions>,
    /// Database-wide TTL used for entries written with `Ttl::Default`.
    pub default_ttl: Option<u64>,
    /// Never (de)serialized (`#[serde(skip)]`), so it is `None` after deserialization.
    #[serde(skip)]
    pub merge_operator: Option<MergeOperatorType>,
}
impl std::fmt::Debug for Settings {
    /// Formats all settings under the type's own name, with fields in declaration
    /// order.
    ///
    /// `merge_operator` is reported only as `"Some(merge_operator)"` or `"None"`
    /// instead of formatting the operator value itself.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let merge_operator = if self.merge_operator.is_some() {
            "Some(merge_operator)"
        } else {
            "None"
        };

        let mut data = f.debug_struct("Settings");
        data.field("flush_interval", &self.flush_interval);
        #[cfg(feature = "wal_disable")]
        {
            data.field("wal_enabled", &self.wal_enabled);
        }
        data.field("manifest_poll_interval", &self.manifest_poll_interval)
            .field("manifest_update_timeout", &self.manifest_update_timeout)
            .field("min_filter_keys", &self.min_filter_keys)
            .field("filter_bits_per_key", &self.filter_bits_per_key)
            .field("l0_sst_size_bytes", &self.l0_sst_size_bytes)
            .field("l0_max_ssts", &self.l0_max_ssts)
            .field("max_unflushed_bytes", &self.max_unflushed_bytes)
            .field("compactor_options", &self.compactor_options)
            .field("compression_codec", &self.compression_codec)
            .field(
                "object_store_cache_options",
                &self.object_store_cache_options,
            )
            .field("garbage_collector_options", &self.garbage_collector_options)
            .field("default_ttl", &self.default_ttl)
            .field("merge_operator", &merge_operator)
            .finish()
    }
}
impl Settings {
pub fn to_json_string(&self) -> Result<String, serde_json::Error> {
serde_json::to_string(self)
}
pub fn from_file<P: AsRef<Path>>(path: P) -> Result<Settings, crate::Error> {
let path = path.as_ref();
let Some(ext) = path.extension() else {
return Err(SlateDBError::UnknownConfigurationFormat(path.into()).into());
};
let mut builder = Figment::from(Settings::default());
match ext.to_str().unwrap_or_default() {
"json" => builder = builder.merge(Json::file(path)),
"toml" => builder = builder.merge(Toml::file(path)),
"yaml" | "yml" => builder = builder.merge(Yaml::file(path)),
_ => return Err(SlateDBError::UnknownConfigurationFormat(path.into()).into()),
}
builder
.extract()
.map_err(|e| SlateDBError::InvalidConfigurationFormat(Box::new(e)).into())
}
pub fn from_env_with_default(
prefix: &str,
default: Settings,
) -> Result<Settings, crate::Error> {
Figment::from(default)
.merge(Env::prefixed(prefix))
.extract()
.map_err(|e| SlateDBError::InvalidConfigurationFormat(Box::new(e)).into())
}
pub fn from_env(prefix: &str) -> Result<Settings, crate::Error> {
Settings::from_env_with_default(prefix, Settings::default())
}
pub fn load() -> Result<Settings, crate::Error> {
Figment::from(Settings::default())
.merge(Json::file("SlateDb.json"))
.merge(Toml::file("SlateDb.toml"))
.merge(Yaml::file("SlateDb.yaml"))
.merge(Yaml::file("SlateDb.yml"))
.admerge(Env::prefixed("SLATEDB_"))
.extract()
.map_err(|e| SlateDBError::InvalidConfigurationFormat(Box::new(e)).into())
}
}
impl Provider for Settings {
    /// Names this provider in figment metadata and error messages.
    fn metadata(&self) -> figment::Metadata {
        Metadata::named("SlateDb configuration options")
    }

    /// Supplies *this* `Settings` value as the base (defaults) layer of a figment.
    ///
    /// Serializing `self` rather than a fresh `Settings::default()` is what makes
    /// `Figment::from(custom_settings)` honour the caller's values — for example the
    /// `default` argument of `Settings::from_env_with_default`.
    fn data(
        &self,
    ) -> Result<figment::value::Map<figment::Profile, figment::value::Dict>, figment::Error> {
        figment::providers::Serialized::defaults(self).data()
    }
}
impl Default for Settings {
fn default() -> Self {
Self {
flush_interval: Some(Duration::from_millis(100)),
#[cfg(feature = "wal_disable")]
wal_enabled: true,
manifest_poll_interval: Duration::from_secs(1),
manifest_update_timeout: Duration::from_secs(300),
min_filter_keys: 1000,
max_unflushed_bytes: 1_073_741_824,
l0_sst_size_bytes: 64 * 1024 * 1024,
l0_max_ssts: 8,
compactor_options: Some(CompactorOptions::default()),
compression_codec: None,
object_store_cache_options: ObjectStoreCacheOptions::default(),
garbage_collector_options: None,
filter_bits_per_key: 10,
default_ttl: None,
merge_operator: None,
}
}
}
/// Options for a read-only database reader.
///
/// NOTE(review): unlike `Settings`, the `Duration` fields here have no
/// `deserialize_with`/`serialize_with`, so they use serde's default `Duration`
/// representation rather than `"5s"`-style strings — confirm that is intended.
#[derive(Clone, Deserialize, Serialize)]
pub struct DbReaderOptions {
    /// How often the manifest is polled.
    pub manifest_poll_interval: Duration,
    /// Lifetime of the checkpoint held by the reader (presumably — TODO confirm).
    pub checkpoint_lifetime: Duration,
    /// Memtable size limit, in bytes.
    pub max_memtable_bytes: u64,
    /// Not (de)serialized. NOTE(review): after deserialization this is `None` (the
    /// field type's `Default`), not `default_block_cache()` as in `Default::default`.
    #[serde(skip)]
    pub block_cache: Option<Arc<dyn DbCache>>,
    /// Not (de)serialized; `None` after deserialization.
    #[serde(skip)]
    pub merge_operator: Option<MergeOperatorType>,
}
impl Default for DbReaderOptions {
fn default() -> Self {
Self {
manifest_poll_interval: Duration::from_secs(10),
checkpoint_lifetime: Duration::from_secs(10 * 60),
max_memtable_bytes: 64 * 1024 * 1024,
block_cache: default_block_cache(),
merge_operator: None,
}
}
}
/// Builds the default block cache for whichever cache backend is compiled in.
///
/// Precedence: `moka` when that feature is enabled, otherwise `foyer`. With neither
/// feature enabled, `None` is returned. Capacity is
/// `crate::db_cache::DEFAULT_BLOCK_CACHE_CAPACITY`.
// The three cfg gates are mutually exclusive, so exactly one tail block survives and
// no unreachable code is compiled.
pub(crate) fn default_block_cache() -> Option<Arc<dyn DbCache>> {
    #[cfg(feature = "moka")]
    {
        let options = crate::db_cache::moka::MokaCacheOptions {
            max_capacity: crate::db_cache::DEFAULT_BLOCK_CACHE_CAPACITY,
            time_to_live: None,
            time_to_idle: None,
        };
        Some(Arc::new(crate::db_cache::moka::MokaCache::new_with_opts(
            options,
        )))
    }
    #[cfg(all(feature = "foyer", not(feature = "moka")))]
    {
        let options = crate::db_cache::foyer::FoyerCacheOptions {
            max_capacity: crate::db_cache::DEFAULT_BLOCK_CACHE_CAPACITY,
            ..Default::default()
        };
        Some(Arc::new(crate::db_cache::foyer::FoyerCache::new_with_opts(
            options,
        )))
    }
    #[cfg(not(any(feature = "moka", feature = "foyer")))]
    {
        None
    }
}
/// Builds the default metadata cache for whichever cache backend is compiled in.
///
/// Precedence: `moka` when that feature is enabled, otherwise `foyer`. With neither
/// feature enabled, `None` is returned. Capacity is
/// `crate::db_cache::DEFAULT_META_CACHE_CAPACITY`.
// The three cfg gates are mutually exclusive, so exactly one tail block survives and
// no unreachable code is compiled.
pub(crate) fn default_meta_cache() -> Option<Arc<dyn DbCache>> {
    #[cfg(feature = "moka")]
    {
        let options = crate::db_cache::moka::MokaCacheOptions {
            max_capacity: crate::db_cache::DEFAULT_META_CACHE_CAPACITY,
            time_to_live: None,
            time_to_idle: None,
        };
        Some(Arc::new(crate::db_cache::moka::MokaCache::new_with_opts(
            options,
        )))
    }
    #[cfg(all(feature = "foyer", not(feature = "moka")))]
    {
        let options = crate::db_cache::foyer::FoyerCacheOptions {
            max_capacity: crate::db_cache::DEFAULT_META_CACHE_CAPACITY,
            ..Default::default()
        };
        Some(Arc::new(crate::db_cache::foyer::FoyerCache::new_with_opts(
            options,
        )))
    }
    #[cfg(not(any(feature = "moka", feature = "foyer")))]
    {
        None
    }
}
/// Compression codec for written data.
///
/// Each variant exists only when its cargo feature is enabled; with no codec
/// features the enum has no variants. Parsed from lowercase names via `FromStr`.
#[non_exhaustive]
#[derive(Clone, Copy, Deserialize, PartialEq, Debug, Serialize)]
pub enum CompressionCodec {
    /// Requires the `snappy` feature.
    #[cfg(feature = "snappy")]
    Snappy,
    /// Requires the `zlib` feature.
    #[cfg(feature = "zlib")]
    Zlib,
    /// Requires the `lz4` feature.
    #[cfg(feature = "lz4")]
    Lz4,
    /// Requires the `zstd` feature.
    #[cfg(feature = "zstd")]
    Zstd,
}
impl FromStr for CompressionCodec {
    type Err = crate::Error;

    /// Parses a lowercase codec name (`snappy`, `zlib`, `lz4`, `zstd`).
    ///
    /// Only names whose cargo feature is enabled are accepted; anything else yields
    /// `SlateDBError::InvalidCompressionCodec`.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let parsed: Option<Self> = match s {
            #[cfg(feature = "snappy")]
            "snappy" => Some(Self::Snappy),
            #[cfg(feature = "zlib")]
            "zlib" => Some(Self::Zlib),
            #[cfg(feature = "lz4")]
            "lz4" => Some(Self::Lz4),
            #[cfg(feature = "zstd")]
            "zstd" => Some(Self::Zstd),
            _ => None,
        };
        parsed.ok_or_else(|| SlateDBError::InvalidCompressionCodec.into())
    }
}
/// Compactor configuration.
///
/// Duration fields are parsed with `duration_str` and serialized as strings.
#[derive(Clone, Deserialize, Serialize)]
pub struct CompactorOptions {
    /// How often the compactor polls for work.
    #[serde(deserialize_with = "deserialize_duration")]
    #[serde(serialize_with = "serialize_duration")]
    pub poll_interval: Duration,
    /// Timeout for the compactor's manifest updates.
    #[serde(deserialize_with = "deserialize_duration")]
    #[serde(serialize_with = "serialize_duration")]
    pub manifest_update_timeout: Duration,
    /// Maximum size of an output SST, in bytes.
    pub max_sst_size: usize,
    /// Maximum number of compactions run concurrently.
    pub max_concurrent_compactions: usize,
}
impl Default for CompactorOptions {
fn default() -> Self {
Self {
poll_interval: Duration::from_secs(5),
manifest_update_timeout: Duration::from_secs(300),
max_sst_size: 256 * 1024 * 1024,
max_concurrent_compactions: 4,
}
}
}
impl std::fmt::Debug for CompactorOptions {
    /// Formats every field of the compactor options, in declaration order.
    // `manifest_update_timeout` used to be missing, hiding it from logs.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("CompactorOptions")
            .field("poll_interval", &self.poll_interval)
            .field("manifest_update_timeout", &self.manifest_update_timeout)
            .field("max_sst_size", &self.max_sst_size)
            .field(
                "max_concurrent_compactions",
                &self.max_concurrent_compactions,
            )
            .finish()
    }
}
/// Tuning knobs for the size-tiered compaction scheduler.
///
/// Defaults: 4 to 8 sources per compaction, with a size threshold of `4.0`.
#[derive(Clone, Copy, Debug)]
pub struct SizeTieredCompactionSchedulerOptions {
    /// Minimum number of sources for a compaction.
    pub min_compaction_sources: usize,
    /// Maximum number of sources for a compaction.
    pub max_compaction_sources: usize,
    /// Size-ratio threshold for including a source (interpretation lives in the scheduler).
    pub include_size_threshold: f32,
}

impl Default for SizeTieredCompactionSchedulerOptions {
    fn default() -> Self {
        let (min_sources, max_sources) = (4, 8);
        SizeTieredCompactionSchedulerOptions {
            min_compaction_sources: min_sources,
            max_compaction_sources: max_sources,
            include_size_threshold: 4.0,
        }
    }
}
/// Garbage-collector configuration, with one optional entry per directory.
///
/// A `None` entry means that directory has no GC options configured; all four are
/// `None` by default (see `GarbageCollectorOptions::is_empty`).
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct GarbageCollectorOptions {
    /// Options for the manifest directory.
    pub manifest_options: Option<GarbageCollectorDirectoryOptions>,
    /// Options for the WAL directory.
    pub wal_options: Option<GarbageCollectorDirectoryOptions>,
    /// Options for the compacted-SST directory.
    pub compacted_options: Option<GarbageCollectorDirectoryOptions>,
    /// Options for the compactions directory.
    pub compactions_options: Option<GarbageCollectorDirectoryOptions>,
}
impl GarbageCollectorOptions {
    /// Returns `true` when none of the four directories has GC options configured.
    pub fn is_empty(&self) -> bool {
        let per_directory = [
            &self.manifest_options,
            &self.wal_options,
            &self.compacted_options,
            &self.compactions_options,
        ];
        !per_directory.iter().any(|opts| opts.is_some())
    }
}
impl Default for GarbageCollectorDirectoryOptions {
fn default() -> Self {
Self {
interval: Some(DEFAULT_INTERVAL),
min_age: DEFAULT_MIN_AGE,
}
}
}
/// Garbage-collection settings for a single directory.
///
/// Durations are parsed with `duration_str` and serialized as strings.
#[derive(Clone, Copy, Debug, Deserialize, Serialize)]
pub struct GarbageCollectorDirectoryOptions {
    /// How often this directory is collected. `None` presumably disables periodic
    /// collection — TODO confirm in the garbage collector.
    #[serde(deserialize_with = "deserialize_option_duration")]
    #[serde(serialize_with = "serialize_option_duration")]
    pub interval: Option<Duration>,
    /// Minimum age an object must reach before it may be collected (presumably —
    /// TODO confirm).
    #[serde(deserialize_with = "deserialize_duration")]
    #[serde(serialize_with = "serialize_duration")]
    pub min_age: Duration,
}
impl Default for GarbageCollectorOptions {
fn default() -> Self {
Self {
manifest_options: None,
wal_options: None,
compacted_options: None,
compactions_options: None,
}
}
}
/// Configuration for the local object-store cache.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct ObjectStoreCacheOptions {
    /// Cache directory; `None` by default (presumably meaning the cache is disabled —
    /// TODO confirm).
    pub root_folder: Option<std::path::PathBuf>,
    /// Upper bound on cache size in bytes.
    pub max_cache_size_bytes: Option<usize>,
    /// Size of each cached part, in bytes.
    pub part_size_bytes: usize,
    /// Whether puts are also written to the cache.
    pub cache_puts: bool,
    /// Optional preload level applied at startup; `None` by default.
    pub preload_disk_cache_on_startup: Option<PreloadLevel>,
    /// Cache scan interval. Omitted from serialized output when `None`.
    #[serde(deserialize_with = "deserialize_option_duration")]
    #[serde(
        serialize_with = "serialize_option_duration",
        skip_serializing_if = "Option::is_none"
    )]
    pub scan_interval: Option<Duration>,
}
impl Default for ObjectStoreCacheOptions {
fn default() -> Self {
Self {
root_folder: None,
#[cfg(target_pointer_width = "32")]
max_cache_size_bytes: Some(usize::MAX),
#[cfg(not(target_pointer_width = "32"))]
max_cache_size_bytes: Some(16 * 1024 * 1024 * 1024),
part_size_bytes: 4 * 1024 * 1024,
cache_puts: false,
preload_disk_cache_on_startup: None,
scan_interval: Some(Duration::from_secs(3600)),
}
}
}
/// Serializes a [`Duration`] as a human-readable string such as `"5s"`, `"050ms"`,
/// `"1s+500ms"` or `"1s+250ns"`.
///
/// Every non-zero component is emitted and the components are joined with `+`:
/// - whole seconds;
/// - milliseconds, zero-padded to three digits;
/// - any sub-millisecond remainder, in nanoseconds.
///
/// A zero duration is written as `"0s"`. The output is meant to be read back by
/// `duration_str::deserialize_duration`, which is paired with this serializer on the
/// same fields. Emitting the nanosecond remainder keeps sub-millisecond durations
/// (e.g. 500µs) from being truncated to `"0s"`.
fn serialize_duration<S>(duration: &Duration, serializer: S) -> Result<S::Ok, S::Error>
where
    S: Serializer,
{
    serializer.serialize_str(&format_duration(duration))
}

/// Renders `duration` in the `+`-joined format used by [`serialize_duration`].
fn format_duration(duration: &Duration) -> String {
    let secs = duration.as_secs();
    let millis = duration.subsec_millis();
    let sub_milli_nanos = duration.subsec_nanos() % 1_000_000;

    let mut parts = Vec::with_capacity(3);
    if secs > 0 {
        parts.push(format!("{secs}s"));
    }
    if millis > 0 {
        parts.push(format!("{millis:03}ms"));
    }
    if sub_milli_nanos > 0 {
        parts.push(format!("{sub_milli_nanos}ns"));
    }

    if parts.is_empty() {
        "0s".to_string()
    } else {
        parts.join("+")
    }
}
/// Serializes an optional [`Duration`]: `Some` uses the `serialize_duration` string
/// format, and `None` is written as a serde none.
fn serialize_option_duration<S>(
    duration: &Option<Duration>,
    serializer: S,
) -> Result<S::Ok, S::Error>
where
    S: Serializer,
{
    if let Some(inner) = duration {
        serialize_duration(inner, serializer)
    } else {
        serializer.serialize_none()
    }
}
#[cfg(test)]
mod tests {
    use std::path::PathBuf;

    use super::*;

    // Each config-loading test runs inside `figment::Jail`, which scopes the
    // environment variables and files it creates to that test.
    //
    // YAML fixtures use flow mappings (`key: { nested: value }`) so that nesting
    // does not depend on leading whitespace inside indented raw strings.

    /// Env vars with the `SLATEDB_` prefix override defaults; a `.` in the variable
    /// name addresses a nested field.
    #[test]
    fn test_db_options_load_from_env() {
        figment::Jail::expect_with(|jail| {
            jail.set_env("SLATEDB_FLUSH_INTERVAL", "1s");
            jail.set_env(
                "SLATEDB_OBJECT_STORE_CACHE_OPTIONS.ROOT_FOLDER",
                "/tmp/slatedb-root",
            );
            let options =
                Settings::from_env("SLATEDB_").expect("failed to load db options from environment");
            assert_eq!(Some(Duration::from_secs(1)), options.flush_interval);
            assert_eq!(
                Some(PathBuf::from("/tmp/slatedb-root")),
                options.object_store_cache_options.root_folder
            );
            Ok(())
        });
    }

    /// A `.json` file is parsed as JSON and layered over the defaults.
    #[test]
    fn test_db_options_load_from_json_file() {
        figment::Jail::expect_with(|jail| {
            jail.create_file(
                "config.json",
                r#"
                {
                    "flush_interval": "1s",
                    "object_store_cache_options": {
                        "root_folder": "/tmp/slatedb-root"
                    }
                }
                "#,
            )
            .expect("failed to create db options config file");
            let options =
                Settings::from_file("config.json").expect("failed to load db options from file");
            assert_eq!(Some(Duration::from_secs(1)), options.flush_interval);
            assert_eq!(
                Some(PathBuf::from("/tmp/slatedb-root")),
                options.object_store_cache_options.root_folder
            );
            Ok(())
        });
    }

    /// A `.toml` file is parsed as TOML and layered over the defaults.
    #[test]
    fn test_db_options_load_from_toml_file() {
        figment::Jail::expect_with(|jail| {
            jail.create_file(
                "config.toml",
                r#"
                flush_interval = "1s"

                [object_store_cache_options]
                root_folder = "/tmp/slatedb-root"
                "#,
            )
            .expect("failed to create db options config file");
            let options =
                Settings::from_file("config.toml").expect("failed to load db options from file");
            assert_eq!(Some(Duration::from_secs(1)), options.flush_interval);
            assert_eq!(
                Some(PathBuf::from("/tmp/slatedb-root")),
                options.object_store_cache_options.root_folder
            );
            Ok(())
        });
    }

    /// A `.yaml` file is parsed as YAML and layered over the defaults.
    #[test]
    fn test_db_options_load_from_yaml_file() {
        figment::Jail::expect_with(|jail| {
            jail.create_file(
                "config.yaml",
                r#"
                flush_interval: "1s"
                object_store_cache_options: { root_folder: "/tmp/slatedb-root" }
                "#,
            )
            .expect("failed to create db options config file");
            let options =
                Settings::from_file("config.yaml").expect("failed to load db options from file");
            assert_eq!(Some(Duration::from_secs(1)), options.flush_interval);
            assert_eq!(
                Some(PathBuf::from("/tmp/slatedb-root")),
                options.object_store_cache_options.root_folder
            );
            Ok(())
        });
    }

    /// `Settings::load` combines `SlateDb.yaml` with `SLATEDB_` environment variables.
    #[test]
    fn test_db_options_load_with_default_locations() {
        figment::Jail::expect_with(|jail| {
            jail.set_env("SLATEDB_FLUSH_INTERVAL", "1s");
            jail.create_file(
                "SlateDb.yaml",
                r#"
                object_store_cache_options: { root_folder: "/tmp/slatedb-root" }
                "#,
            )
            .expect("failed to create db options config file");
            let options =
                Settings::load().expect("failed to load db options from default locations");
            assert_eq!(Some(Duration::from_secs(1)), options.flush_interval);
            assert_eq!(
                Some(PathBuf::from("/tmp/slatedb-root")),
                options.object_store_cache_options.root_folder
            );
            Ok(())
        });
    }

    /// Pins the default values of `ReadOptions` and `ScanOptions`.
    #[test]
    fn test_default_read_options() {
        let options = ReadOptions::default();
        assert_eq!(options.durability_filter, DurabilityLevel::Memory);
        assert!(!options.dirty);
        assert!(options.cache_blocks);

        let options = ScanOptions::default();
        assert_eq!(options.durability_filter, DurabilityLevel::Memory);
        assert!(!options.dirty);
        assert_eq!(options.read_ahead_bytes, 1);
        assert!(!options.cache_blocks);
        assert_eq!(options.max_fetch_tasks, 1);
    }

    /// `with_max_fetch_tasks` changes only `max_fetch_tasks`.
    #[test]
    fn test_scan_options_with_max_fetch_tasks() {
        let options = ScanOptions::default().with_max_fetch_tasks(4);
        assert_eq!(options.max_fetch_tasks, 4);
        assert_eq!(options.durability_filter, DurabilityLevel::Memory);
        assert!(!options.dirty);
        assert_eq!(options.read_ahead_bytes, 1);
        assert!(!options.cache_blocks);
    }
}