use duration_str::{deserialize_duration, deserialize_option_duration};
use figment::providers::{Env, Format, Json, Toml, Yaml};
use figment::{Figment, Metadata, Provider};
use serde::{Deserialize, Serialize, Serializer};
use std::path::Path;
use std::sync::atomic::AtomicI64;
use std::sync::atomic::Ordering::SeqCst;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use std::{str::FromStr, time::Duration};
use tokio::runtime::Handle;
use uuid::Uuid;
use crate::compactor::CompactionScheduler;
use crate::config::GcExecutionMode::Periodic;
use crate::error::{DbOptionsError, SlateDBError};
use crate::db_cache::DbCache;
use crate::size_tiered_compaction::SizeTieredCompactionSchedulerSupplier;
#[non_exhaustive]
#[derive(Clone, Default, Debug, Copy)]
pub enum ReadLevel {
#[default]
Committed,
Uncommitted,
}
#[derive(Clone, Default)]
pub struct ReadOptions {
pub read_level: ReadLevel,
}
#[derive(Clone)]
pub struct ScanOptions {
pub read_level: ReadLevel,
pub read_ahead_bytes: usize,
pub cache_blocks: bool,
}
impl Default for ScanOptions {
fn default() -> Self {
Self {
read_level: ReadLevel::Committed,
read_ahead_bytes: 1,
cache_blocks: false,
}
}
}
#[derive(Clone)]
pub struct WriteOptions {
pub await_durable: bool,
}
impl Default for WriteOptions {
fn default() -> Self {
Self {
await_durable: true,
}
}
}
#[derive(Clone, Default)]
pub struct PutOptions {
pub ttl: Ttl,
}
impl PutOptions {
pub(crate) fn expire_ts_from(&self, default: Option<u64>, now: i64) -> Option<i64> {
match self.ttl {
Ttl::Default => match default {
None => None,
Some(default_ttl) => Self::checked_expire_ts(now, default_ttl),
},
Ttl::NoExpiry => None,
Ttl::ExpireAfter(ttl) => Self::checked_expire_ts(now, ttl),
}
}
fn checked_expire_ts(now: i64, ttl: u64) -> Option<i64> {
if ttl > i64::MAX as u64 {
return None;
};
let expire_ts = now + (ttl as i64);
if expire_ts < now {
return None;
};
Some(expire_ts)
}
}
#[non_exhaustive]
#[derive(Clone, Default)]
pub enum Ttl {
#[default]
Default,
NoExpiry,
ExpireAfter(u64),
}
pub trait Clock {
fn now(&self) -> i64;
}
#[derive(Default)]
pub struct SystemClock {
last_tick: AtomicI64,
}
impl Clock for SystemClock {
fn now(&self) -> i64 {
let tick = match SystemTime::now().duration_since(UNIX_EPOCH) {
Ok(duration) => duration.as_millis() as i64, Err(e) => -(e.duration().as_millis() as i64), };
self.last_tick.fetch_max(tick, SeqCst);
self.last_tick.load(SeqCst)
}
}
pub(crate) fn default_clock() -> Arc<dyn Clock + Send + Sync> {
Arc::new(SystemClock {
last_tick: AtomicI64::new(i64::MIN),
})
}
#[non_exhaustive]
#[derive(Debug, Copy, Clone)]
pub enum CheckpointScope {
#[non_exhaustive]
All {
force_flush: bool,
},
Durable,
}
impl CheckpointScope {
pub fn all_with_force_flush(force_flush: bool) -> Self {
Self::All { force_flush }
}
}
#[derive(Debug, Clone, Default)]
pub struct CheckpointOptions {
pub lifetime: Option<Duration>,
pub source: Option<Uuid>,
}
#[derive(Clone, Deserialize, Serialize)]
pub struct DbOptions {
#[serde(deserialize_with = "deserialize_option_duration")]
#[serde(serialize_with = "serialize_option_duration")]
pub flush_interval: Option<Duration>,
#[cfg(feature = "wal_disable")]
pub wal_enabled: bool,
#[serde(deserialize_with = "deserialize_duration")]
#[serde(serialize_with = "serialize_duration")]
pub manifest_poll_interval: Duration,
pub min_filter_keys: u32,
pub filter_bits_per_key: u32,
pub l0_sst_size_bytes: usize,
pub l0_max_ssts: usize,
pub max_unflushed_bytes: usize,
pub compactor_options: Option<CompactorOptions>,
pub compression_codec: Option<CompressionCodec>,
pub object_store_cache_options: ObjectStoreCacheOptions,
#[serde(skip)]
pub block_cache: Option<Arc<dyn DbCache>>,
pub garbage_collector_options: Option<GarbageCollectorOptions>,
#[serde(skip)]
#[serde(default = "default_clock")]
pub clock: Arc<dyn Clock + Send + Sync>,
pub default_ttl: Option<u64>,
}
impl std::fmt::Debug for DbOptions {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let mut data = f.debug_struct("DbOptions");
data.field("flush_interval", &self.flush_interval);
#[cfg(feature = "wal_disable")]
{
data.field("wal_enabled", &self.wal_enabled);
}
data.field("manifest_poll_interval", &self.manifest_poll_interval)
.field("min_filter_keys", &self.min_filter_keys)
.field("max_unflushed_bytes", &self.max_unflushed_bytes)
.field("l0_sst_size_bytes", &self.l0_sst_size_bytes)
.field("l0_max_ssts", &self.l0_max_ssts)
.field("compactor_options", &self.compactor_options)
.field("compression_codec", &self.compression_codec)
.field(
"object_store_cache_options",
&self.object_store_cache_options,
)
.field("garbage_collector_options", &self.garbage_collector_options)
.field("filter_bits_per_key", &self.filter_bits_per_key)
.field("default_ttl", &self.default_ttl)
.finish()
}
}
impl DbOptions {
pub fn to_json_string(&self) -> Result<String, serde_json::Error> {
serde_json::to_string(self)
}
pub fn from_file<P: AsRef<Path>>(path: P) -> Result<DbOptions, DbOptionsError> {
let path = path.as_ref();
let Some(ext) = path.extension() else {
return Err(DbOptionsError::UnknownFormat(path.into()));
};
let mut builder = Figment::from(DbOptions::default());
match ext.to_str().unwrap_or_default() {
"json" => builder = builder.merge(Json::file(path)),
"toml" => builder = builder.merge(Toml::file(path)),
"yaml" | "yml" => builder = builder.merge(Yaml::file(path)),
_ => return Err(DbOptionsError::UnknownFormat(path.into())),
}
builder.extract().map_err(Into::into)
}
pub fn from_env(prefix: &str) -> Result<DbOptions, DbOptionsError> {
Figment::from(DbOptions::default())
.merge(Env::prefixed(prefix))
.extract()
.map_err(Into::into)
}
pub fn load() -> Result<DbOptions, DbOptionsError> {
Figment::from(DbOptions::default())
.merge(Json::file("SlateDb.json"))
.merge(Toml::file("SlateDb.toml"))
.merge(Yaml::file("SlateDb.yaml"))
.merge(Yaml::file("SlateDb.yml"))
.admerge(Env::prefixed("SLATEDB_"))
.extract()
.map_err(Into::into)
}
}
impl Provider for DbOptions {
fn metadata(&self) -> figment::Metadata {
Metadata::named("SlateDb configuration options")
}
fn data(
&self,
) -> Result<figment::value::Map<figment::Profile, figment::value::Dict>, figment::Error> {
figment::providers::Serialized::defaults(DbOptions::default()).data()
}
}
impl Default for DbOptions {
fn default() -> Self {
Self {
flush_interval: Some(Duration::from_millis(100)),
#[cfg(feature = "wal_disable")]
wal_enabled: true,
manifest_poll_interval: Duration::from_secs(1),
min_filter_keys: 1000,
max_unflushed_bytes: 1_073_741_824,
l0_sst_size_bytes: 64 * 1024 * 1024,
l0_max_ssts: 8,
compactor_options: Some(CompactorOptions::default()),
compression_codec: None,
object_store_cache_options: ObjectStoreCacheOptions::default(),
block_cache: default_block_cache(),
garbage_collector_options: Some(GarbageCollectorOptions::default()),
filter_bits_per_key: 10,
clock: default_clock(),
default_ttl: None,
}
}
}
#[derive(Clone, Deserialize, Serialize)]
pub struct DbReaderOptions {
pub manifest_poll_interval: Duration,
pub checkpoint_lifetime: Duration,
pub max_memtable_bytes: u64,
#[serde(skip)]
pub block_cache: Option<Arc<dyn DbCache>>,
}
impl Default for DbReaderOptions {
fn default() -> Self {
Self {
manifest_poll_interval: Duration::from_secs(10),
checkpoint_lifetime: Duration::from_secs(10 * 60),
max_memtable_bytes: 64 * 1024 * 1024,
block_cache: default_block_cache(),
}
}
}
#[allow(unreachable_code)]
fn default_block_cache() -> Option<Arc<dyn DbCache>> {
#[cfg(feature = "moka")]
{
return Some(Arc::new(crate::db_cache::moka::MokaCache::new()));
}
#[cfg(feature = "foyer")]
{
return Some(Arc::new(crate::db_cache::foyer::FoyerCache::new()));
}
None
}
#[non_exhaustive]
#[derive(Clone, Copy, Deserialize, PartialEq, Debug, Serialize)]
pub enum CompressionCodec {
#[cfg(feature = "snappy")]
Snappy,
#[cfg(feature = "zlib")]
Zlib,
#[cfg(feature = "lz4")]
Lz4,
#[cfg(feature = "zstd")]
Zstd,
}
impl FromStr for CompressionCodec {
type Err = SlateDBError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
#[cfg(feature = "snappy")]
"snappy" => Ok(Self::Snappy),
#[cfg(feature = "zlib")]
"zlib" => Ok(Self::Zlib),
#[cfg(feature = "lz4")]
"lz4" => Ok(Self::Lz4),
#[cfg(feature = "zstd")]
"zstd" => Ok(Self::Zstd),
_ => Err(SlateDBError::InvalidCompressionCodec),
}
}
}
pub trait CompactionSchedulerSupplier: Send + Sync {
fn compaction_scheduler(&self) -> Box<dyn CompactionScheduler>;
}
#[derive(Clone, Deserialize, Serialize)]
pub struct CompactorOptions {
#[serde(deserialize_with = "deserialize_duration")]
#[serde(serialize_with = "serialize_duration")]
pub poll_interval: Duration,
pub max_sst_size: usize,
#[serde(skip, default = "default_compaction_scheduler")]
pub compaction_scheduler: Arc<dyn CompactionSchedulerSupplier>,
pub max_concurrent_compactions: usize,
#[serde(skip)]
pub compaction_runtime: Option<Handle>,
}
impl Default for CompactorOptions {
fn default() -> Self {
Self {
poll_interval: Duration::from_secs(5),
max_sst_size: 1024 * 1024 * 1024,
compaction_scheduler: default_compaction_scheduler(),
max_concurrent_compactions: 4,
compaction_runtime: None,
}
}
}
impl std::fmt::Debug for CompactorOptions {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("CompactorOptions")
.field("poll_interval", &self.poll_interval)
.field("max_sst_size", &self.max_sst_size)
.field(
"max_concurrent_compactions",
&self.max_concurrent_compactions,
)
.finish()
}
}
fn default_compaction_scheduler() -> Arc<dyn CompactionSchedulerSupplier> {
Arc::new(SizeTieredCompactionSchedulerSupplier::default())
}
#[derive(Clone)]
pub struct SizeTieredCompactionSchedulerOptions {
pub min_compaction_sources: usize,
pub max_compaction_sources: usize,
pub include_size_threshold: f32,
}
impl Default for SizeTieredCompactionSchedulerOptions {
fn default() -> Self {
Self {
min_compaction_sources: 4,
max_compaction_sources: 8,
include_size_threshold: 4.0,
}
}
}
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct GarbageCollectorOptions {
pub manifest_options: Option<GarbageCollectorDirectoryOptions>,
pub wal_options: Option<GarbageCollectorDirectoryOptions>,
pub compacted_options: Option<GarbageCollectorDirectoryOptions>,
#[serde(skip)]
pub gc_runtime: Option<Handle>,
}
impl Default for GarbageCollectorDirectoryOptions {
fn default() -> Self {
Self {
execution_mode: Periodic(Duration::from_secs(300)),
min_age: Duration::from_secs(86_400),
}
}
}
#[derive(Clone, Copy, Deserialize, Serialize, Debug)]
#[serde(tag = "mode", content = "config")]
pub enum GcExecutionMode {
Once,
Periodic(
#[serde(deserialize_with = "deserialize_duration")]
#[serde(serialize_with = "serialize_duration")]
Duration,
),
}
#[derive(Clone, Copy, Debug, Deserialize, Serialize)]
pub struct GarbageCollectorDirectoryOptions {
pub execution_mode: GcExecutionMode,
#[serde(deserialize_with = "deserialize_duration")]
#[serde(serialize_with = "serialize_duration")]
pub min_age: Duration,
}
impl Default for GarbageCollectorOptions {
fn default() -> Self {
Self {
manifest_options: Some(Default::default()),
wal_options: Some(GarbageCollectorDirectoryOptions {
execution_mode: Periodic(Duration::from_secs(60)),
min_age: Duration::from_secs(60),
}),
compacted_options: Some(Default::default()),
gc_runtime: None,
}
}
}
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct ObjectStoreCacheOptions {
pub root_folder: Option<std::path::PathBuf>,
pub max_cache_size_bytes: Option<usize>,
pub part_size_bytes: usize,
#[serde(deserialize_with = "deserialize_option_duration")]
#[serde(
serialize_with = "serialize_option_duration",
skip_serializing_if = "Option::is_none"
)]
pub scan_interval: Option<Duration>,
}
impl Default for ObjectStoreCacheOptions {
fn default() -> Self {
Self {
root_folder: None,
max_cache_size_bytes: Some(16 * 1024 * 1024 * 1024),
part_size_bytes: 4 * 1024 * 1024,
scan_interval: Some(Duration::from_secs(3600)),
}
}
}
fn serialize_duration<S>(duration: &Duration, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let secs = duration.as_secs();
let millis = duration.subsec_millis();
let duration_str = if secs > 0 && millis > 0 {
format!("{secs}s+{millis:03}ms")
} else if millis > 0 {
format!("{millis:03}ms")
} else {
format!("{secs}s")
};
serializer.serialize_str(&duration_str)
}
fn serialize_option_duration<S>(
duration: &Option<Duration>,
serializer: S,
) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
match duration {
Some(d) => serialize_duration(d, serializer),
None => serializer.serialize_none(),
}
}
#[cfg(test)]
mod tests {
use std::path::PathBuf;
use super::*;
#[test]
fn test_db_options_load_from_env() {
figment::Jail::expect_with(|jail| {
jail.set_env("SLATEDB_FLUSH_INTERVAL", "1s");
jail.set_env(
"SLATEDB_OBJECT_STORE_CACHE_OPTIONS.ROOT_FOLDER",
"/tmp/slatedb-root",
);
let options = DbOptions::from_env("SLATEDB_")
.expect("failed to load db options from environment");
assert_eq!(Some(Duration::from_secs(1)), options.flush_interval);
assert_eq!(
Some(PathBuf::from("/tmp/slatedb-root")),
options.object_store_cache_options.root_folder
);
Ok(())
});
}
#[test]
fn test_db_options_load_from_json_file() {
figment::Jail::expect_with(|jail| {
jail.create_file(
"config.json",
r#"
{
"flush_interval": "1s",
"object_store_cache_options": {
"root_folder": "/tmp/slatedb-root"
}
}
"#,
)
.expect("failed to create db options config file");
let options = DbOptions::from_file("config.json")
.expect("failed to load db options from environment");
assert_eq!(Some(Duration::from_secs(1)), options.flush_interval);
assert_eq!(
Some(PathBuf::from("/tmp/slatedb-root")),
options.object_store_cache_options.root_folder
);
Ok(())
});
}
#[test]
fn test_db_options_load_from_toml_file() {
figment::Jail::expect_with(|jail| {
jail.create_file(
"config.toml",
r#"
flush_interval = "1s"
[object_store_cache_options]
root_folder = "/tmp/slatedb-root"
"#,
)
.expect("failed to create db options config file");
let options = DbOptions::from_file("config.toml")
.expect("failed to load db options from environment");
assert_eq!(Some(Duration::from_secs(1)), options.flush_interval);
assert_eq!(
Some(PathBuf::from("/tmp/slatedb-root")),
options.object_store_cache_options.root_folder
);
Ok(())
});
}
#[test]
fn test_db_options_load_from_yaml_file() {
figment::Jail::expect_with(|jail| {
jail.create_file(
"config.yaml",
r#"
flush_interval: "1s"
object_store_cache_options:
root_folder: "/tmp/slatedb-root"
"#,
)
.expect("failed to create db options config file");
let options = DbOptions::from_file("config.yaml")
.expect("failed to load db options from environment");
assert_eq!(Some(Duration::from_secs(1)), options.flush_interval);
assert_eq!(
Some(PathBuf::from("/tmp/slatedb-root")),
options.object_store_cache_options.root_folder
);
Ok(())
});
}
#[test]
fn test_db_options_load_with_default_locations() {
figment::Jail::expect_with(|jail| {
jail.set_env("SLATEDB_FLUSH_INTERVAL", "1s");
jail.create_file(
"SlateDb.yaml",
r#"
object_store_cache_options:
root_folder: "/tmp/slatedb-root"
"#,
)
.expect("failed to create db options config file");
let options = DbOptions::load().expect("failed to load db options from environment");
assert_eq!(Some(Duration::from_secs(1)), options.flush_interval);
assert_eq!(
Some(PathBuf::from("/tmp/slatedb-root")),
options.object_store_cache_options.root_folder
);
Ok(())
});
}
}