use std::{
env, fmt,
path::{Path, PathBuf},
};
use figment::{
Figment,
providers::{Env, Format, Yaml},
};
use serde::{
Deserialize, Serialize,
de::{self, Deserializer, Visitor},
ser::Serializer,
};
/// Baseline configuration YAML, embedded into the binary at compile time from
/// the `config.yaml` file that sits next to this source file.
const EMBEDDED_DEFAULT: &str = include_str!("config.yaml");
/// Error returned when a [`Config`] cannot be extracted from its sources.
#[derive(Debug, thiserror::Error)]
pub enum ConfigError {
/// A failure reported by figment while merging or extracting configuration.
/// The inner error is boxed — presumably to keep `ConfigError` small; confirm
/// before unboxing.
#[error("config load failed: {0}")]
Figment(Box<figment::Error>),
}
impl From<figment::Error> for ConfigError {
fn from(e: figment::Error) -> Self {
Self::Figment(Box::new(e))
}
}
/// Top-level configuration, one field per section.
///
/// Every section is marked `#[serde(default)]`, so a section that is absent
/// from the input is filled from that section type's `Default` impl.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
pub struct Config {
/// The `supertable` section.
#[serde(default)]
pub supertable: SupertableSettings,
/// The `storage` section.
#[serde(default)]
pub storage: StorageSettings,
/// The `compaction` section.
#[serde(default)]
pub compaction: CompactionSettings,
}
/// Settings from the `supertable` section.
///
/// The container-level `#[serde(default)]` fills any field missing from the
/// input with the value from this type's `Default` impl.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
#[serde(default)]
pub struct SupertableSettings {
/// Reader thread count: `"auto"` or a fixed number. Defaults to auto.
pub reader_threads: ThreadCount,
/// Writer thread count: `"auto"` or a fixed number. Defaults to auto.
pub writer_threads: ThreadCount,
/// Name of the id column. Defaults to `"_id"`.
pub id_column: String,
/// Commit threshold in MB (per the field name). Defaults to 1024.
pub commit_threshold_size_mb: u64,
/// Defaults to `true`. Presumably toggles CRC verification when opening
/// data — confirm against the code that reads this flag.
pub verify_crc_on_open: bool,
}
impl Default for SupertableSettings {
fn default() -> Self {
Self {
reader_threads: ThreadCount::default(),
writer_threads: ThreadCount::default(),
id_column: default_id_column(),
commit_threshold_size_mb: DEFAULT_COMMIT_THRESHOLD_SIZE_MB,
verify_crc_on_open: DEFAULT_VERIFY_CRC_ON_OPEN,
}
}
}
/// Fallback for `SupertableSettings::commit_threshold_size_mb`.
const DEFAULT_COMMIT_THRESHOLD_SIZE_MB: u64 = 1024;
/// Fallback for `SupertableSettings::verify_crc_on_open`.
const DEFAULT_VERIFY_CRC_ON_OPEN: bool = true;
/// Fallback for `CompactionSettings::target_superfile_size_mb`.
const DEFAULT_COMPACTION_TARGET_SUPERFILE_SIZE_MB: u64 = 1024;
/// Fallback for `CompactionSettings::min_fill_percent`.
const DEFAULT_COMPACTION_MIN_FILL_PERCENT: u8 = 80;
/// Fallback for `CompactionSettings::max_memory_mb`: the default target
/// superfile size plus 2048 MB.
const DEFAULT_COMPACTION_MAX_MEMORY_MB: u64 = DEFAULT_COMPACTION_TARGET_SUPERFILE_SIZE_MB + 2048;
/// Settings from the `compaction` section.
///
/// The container-level `#[serde(default)]` fills any field missing from the
/// input with the value from this type's `Default` impl.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
#[serde(default)]
pub struct CompactionSettings {
/// Target superfile size in MB (per the field name). Defaults to 1024.
pub target_superfile_size_mb: u64,
/// Minimum fill percentage. Defaults to 80. The `u8` type rejects values
/// above 255, but nothing here restricts the value to 0..=100.
pub min_fill_percent: u8,
/// Memory ceiling in MB (per the field name). Defaults to the default target
/// size plus 2048; it is a fixed default and is NOT recomputed when
/// `target_superfile_size_mb` is overridden.
pub max_memory_mb: u64,
}
impl Default for CompactionSettings {
fn default() -> Self {
Self {
target_superfile_size_mb: DEFAULT_COMPACTION_TARGET_SUPERFILE_SIZE_MB,
min_fill_percent: DEFAULT_COMPACTION_MIN_FILL_PERCENT,
max_memory_mb: DEFAULT_COMPACTION_MAX_MEMORY_MB,
}
}
}
/// Options for an optimize run; currently a wrapper around the compaction
/// settings to use.
#[derive(Debug, Clone, Default)]
pub struct OptimizeOptions {
    /// Compaction parameters carried by these options.
    pub(crate) compaction: CompactionSettings,
}

impl OptimizeOptions {
    /// Builds options that carry the given compaction `settings`.
    pub fn compact(settings: CompactionSettings) -> Self {
        let compaction = settings;
        OptimizeOptions { compaction }
    }
}
/// Which storage backend is configured.
///
/// Serialized in snake_case: `none`, `local_fs`, `s3`, `azure`.
#[derive(Debug, Clone, Copy, Deserialize, Serialize, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum StorageBackend {
/// No backend configured; this is the default.
#[default]
None,
/// Local filesystem backend.
LocalFs,
/// Amazon S3 backend.
S3,
/// Azure backend.
Azure,
}
/// Strategy selector for cold fetches.
///
/// Serialized in snake_case: `hybrid_with_prefetch`, `range_only`,
/// `lazy_foreground_with_background_fill`. The behavior behind each variant is
/// implemented outside this file.
#[derive(Debug, Clone, Copy, Deserialize, Serialize, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum StorageColdFetchMode {
/// Serialized as `hybrid_with_prefetch`.
HybridWithPrefetch,
/// Serialized as `range_only`.
RangeOnly,
/// Serialized as `lazy_foreground_with_background_fill`; this is the default.
#[default]
LazyForegroundWithBackgroundFill,
}
/// Settings from the `storage` section.
///
/// The container-level `#[serde(default)]` fills any field missing from the
/// input with the value from this type's `Default` impl.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
#[serde(default)]
pub struct StorageSettings {
/// Selected backend. Defaults to `StorageBackend::None`.
pub backend: StorageBackend,
/// Optional filesystem root; unset by default. Presumably consumed by the
/// `LocalFs` backend — confirm against the storage layer.
pub local_root: Option<PathBuf>,
/// Optional bucket name; unset by default. The tests in this file also use it
/// for the Azure container name.
pub bucket: Option<String>,
/// Key prefix. Defaults to the empty string.
pub prefix: String,
/// Optional disk cache directory; unset by default.
pub disk_cache_root: Option<PathBuf>,
/// Disk budget in bytes. Defaults to 10 GiB.
pub disk_budget_bytes: u64,
/// Manifest disk budget in bytes. Defaults to 2 GiB.
pub manifest_disk_budget_bytes: u64,
/// Cold-fetch strategy. Defaults to `LazyForegroundWithBackgroundFill`.
pub cold_fetch_mode: StorageColdFetchMode,
/// Number of cold-fetch streams. Defaults to 8.
pub cold_fetch_streams: usize,
/// Cold-fetch chunk size in bytes. Defaults to 4 MiB.
pub cold_fetch_chunk_bytes: u64,
/// Prefetch concurrency. Defaults to 8.
pub prefetch_concurrency: usize,
/// Mmap cold threshold in seconds. Defaults to 300.
pub mmap_cold_threshold_secs: u64,
/// Mmap sweep interval in seconds. Defaults to 75.
pub mmap_sweep_interval_secs: u64,
}
impl Default for StorageSettings {
fn default() -> Self {
Self {
backend: StorageBackend::None,
local_root: None,
bucket: None,
prefix: String::new(),
disk_cache_root: None,
disk_budget_bytes: DEFAULT_DISK_BUDGET_BYTES,
manifest_disk_budget_bytes: DEFAULT_MANIFEST_DISK_BUDGET_BYTES,
cold_fetch_mode: StorageColdFetchMode::LazyForegroundWithBackgroundFill,
cold_fetch_streams: DEFAULT_COLD_FETCH_STREAMS,
cold_fetch_chunk_bytes: DEFAULT_COLD_FETCH_CHUNK_BYTES,
prefetch_concurrency: DEFAULT_PREFETCH_CONCURRENCY,
mmap_cold_threshold_secs: DEFAULT_MMAP_COLD_THRESHOLD_SECS,
mmap_sweep_interval_secs: DEFAULT_MMAP_SWEEP_INTERVAL_SECS,
}
}
}
/// Fallback for `StorageSettings::disk_budget_bytes`: 10 GiB.
const DEFAULT_DISK_BUDGET_BYTES: u64 = 10 * (1 << 30);
/// Fallback for `StorageSettings::manifest_disk_budget_bytes`: 2 GiB.
const DEFAULT_MANIFEST_DISK_BUDGET_BYTES: u64 = 2 * (1 << 30);
/// Fallback for `StorageSettings::cold_fetch_streams`.
const DEFAULT_COLD_FETCH_STREAMS: usize = 8;
/// Fallback for `StorageSettings::cold_fetch_chunk_bytes`: 4 MiB.
const DEFAULT_COLD_FETCH_CHUNK_BYTES: u64 = 4 * (1 << 20);
/// Fallback for `StorageSettings::prefetch_concurrency`.
const DEFAULT_PREFETCH_CONCURRENCY: usize = 8;
/// Fallback for `StorageSettings::mmap_cold_threshold_secs`.
const DEFAULT_MMAP_COLD_THRESHOLD_SECS: u64 = 300;
/// Fallback for `StorageSettings::mmap_sweep_interval_secs`.
const DEFAULT_MMAP_SWEEP_INTERVAL_SECS: u64 = 75;
/// Returns the built-in id column name, `"_id"`, as an owned string.
fn default_id_column() -> String {
    String::from("_id")
}
/// A thread-pool size that is either chosen by the caller at resolve time
/// (`Auto`) or pinned to an explicit number (`Fixed`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ThreadCount {
    /// Defer to the value the caller supplies when resolving. This is the
    /// default.
    #[default]
    Auto,
    /// Use exactly this many threads (raised to 1 if it is 0 when resolved).
    Fixed(usize),
}

impl ThreadCount {
    /// Turns the setting into a concrete thread count.
    ///
    /// `Auto` yields `default_for_auto`; `Fixed(n)` yields `n`. Either way the
    /// result is never below 1.
    pub fn resolve_or_default(self, default_for_auto: usize) -> usize {
        let requested = if let Self::Fixed(n) = self {
            n
        } else {
            default_for_auto
        };
        // A pool of zero threads is never useful, so floor the result at one.
        requested.max(1)
    }
}
impl<'de> Deserialize<'de> for ThreadCount {
fn deserialize<D: Deserializer<'de>>(d: D) -> Result<Self, D::Error> {
struct V;
impl<'de> Visitor<'de> for V {
type Value = ThreadCount;
fn expecting(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("\"auto\" or a positive integer")
}
fn visit_str<E: de::Error>(self, v: &str) -> Result<Self::Value, E> {
if v.eq_ignore_ascii_case("auto") {
Ok(ThreadCount::Auto)
} else {
v.parse::<usize>().map(ThreadCount::Fixed).map_err(|e| {
de::Error::custom(format!(
"thread count must be \"auto\" or a positive integer; \
got {v:?} ({e})"
))
})
}
}
fn visit_string<E: de::Error>(self, v: String) -> Result<Self::Value, E> {
self.visit_str(&v)
}
fn visit_u64<E: de::Error>(self, v: u64) -> Result<Self::Value, E> {
Ok(ThreadCount::Fixed(v as usize))
}
fn visit_i64<E: de::Error>(self, v: i64) -> Result<Self::Value, E> {
if v < 0 {
Err(de::Error::custom("thread count must be ≥ 0"))
} else {
Ok(ThreadCount::Fixed(v as usize))
}
}
}
d.deserialize_any(V)
}
}
impl Serialize for ThreadCount {
fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
match self {
Self::Auto => s.serialize_str("auto"),
Self::Fixed(n) => s.serialize_u64(*n as u64),
}
}
}
impl Config {
pub fn load() -> Result<Self, ConfigError> {
Self::from_figment(default_figment())
}
pub fn defaults() -> Result<Self, ConfigError> {
Ok(Figment::new()
.merge(Yaml::string(EMBEDDED_DEFAULT))
.extract()?)
}
pub fn from_figment(fig: Figment) -> Result<Self, ConfigError> {
Ok(fig.extract()?)
}
}
/// Assembles the standard configuration layers, lowest precedence first:
///
/// 1. the embedded default YAML;
/// 2. `/etc/infino/config.yaml`;
/// 3. the per-user file from `user_config_path`;
/// 4. `./infino.yaml`;
/// 5. `INFINO_`-prefixed environment variables, with `__` separating nested
///    keys.
///
/// A file layer is merged only if it is a regular file at the time of the
/// call. Later layers override earlier ones.
fn default_figment() -> Figment {
    let base = Figment::new().merge(Yaml::string(EMBEDDED_DEFAULT));

    // Candidate files in precedence order; the per-user entry may be absent.
    let candidates = [
        Some(PathBuf::from("/etc/infino/config.yaml")),
        user_config_path(),
        Some(PathBuf::from("./infino.yaml")),
    ];

    let with_files = candidates
        .into_iter()
        .flatten()
        .filter(|candidate| candidate.is_file())
        .fold(base, |fig, candidate| fig.merge(Yaml::file(candidate)));

    with_files.merge(Env::prefixed("INFINO_").split("__"))
}
/// Locates the per-user configuration file.
///
/// Returns `$XDG_CONFIG_HOME/infino/config.yaml` when `XDG_CONFIG_HOME` is set
/// to a non-empty value, otherwise `$HOME/.config/infino/config.yaml` when
/// `HOME` is set to a non-empty value, otherwise `None`. The file is not
/// required to exist.
///
/// An empty variable is treated as unset, as the XDG Base Directory
/// specification requires for `XDG_CONFIG_HOME`. Joining onto an empty value
/// would otherwise produce a path relative to the current directory. Values
/// are read as OS strings, so non-UTF-8 paths are used rather than dropped.
fn user_config_path() -> Option<PathBuf> {
    let non_empty_dir = |key: &str| {
        env::var_os(key)
            .filter(|value| !value.is_empty())
            .map(PathBuf::from)
    };

    if let Some(xdg) = non_empty_dir("XDG_CONFIG_HOME") {
        return Some(xdg.join("infino/config.yaml"));
    }
    non_empty_dir("HOME").map(|home| home.join(".config/infino/config.yaml"))
}
#[cfg(test)]
mod tests {
    use std::{
        env,
        ffi::OsString,
        sync::{Mutex, MutexGuard, PoisonError},
    };

    use figment::providers::Serialized;
    use serde_json::json;

    use super::*;

    /// Serializes every test in this module that mutates the process
    /// environment.
    static ENV_LOCK: Mutex<()> = Mutex::new(());

    /// Scoped environment override for tests.
    ///
    /// Holds `ENV_LOCK` for its whole lifetime and, on drop, puts every
    /// variable it touched back to the value it had before — including when
    /// the test panics. Without this, a failing assertion would leak its
    /// overrides into later tests, and a test that removes `HOME` would
    /// remove it for the rest of the test process.
    struct ScopedEnv {
        /// Prior value of each touched variable, in first-touch order.
        saved: Vec<(&'static str, Option<OsString>)>,
        /// Declared last so it is released only after `saved` is dropped; the
        /// restore itself runs in `Drop::drop`, before any field is dropped.
        _lock: MutexGuard<'static, ()>,
    }

    impl ScopedEnv {
        /// Takes the environment lock. A poisoned lock is recovered rather
        /// than propagated: the guarded data is `()`, so one failed env test
        /// must not cascade into failures of every later one.
        fn acquire() -> Self {
            Self {
                saved: Vec::new(),
                _lock: ENV_LOCK.lock().unwrap_or_else(PoisonError::into_inner),
            }
        }

        /// Records the current value of `key` the first time it is touched.
        fn remember(&mut self, key: &'static str) {
            if !self.saved.iter().any(|(seen, _)| *seen == key) {
                self.saved.push((key, env::var_os(key)));
            }
        }

        /// Sets `key` to `value` until this guard is dropped.
        fn set(&mut self, key: &'static str, value: &str) {
            self.remember(key);
            // SAFETY: environment mutation is unsound only when it races with
            // other environment access. `ENV_LOCK` is held, so env-mutating
            // tests in this module never overlap. Tests elsewhere that read
            // the environment are not covered by this lock.
            unsafe { env::set_var(key, value) };
        }

        /// Removes `key` until this guard is dropped.
        fn unset(&mut self, key: &'static str) {
            self.remember(key);
            // SAFETY: see `set`.
            unsafe { env::remove_var(key) };
        }
    }

    impl Drop for ScopedEnv {
        fn drop(&mut self) {
            for (key, prior) in self.saved.drain(..).rev() {
                // SAFETY: see `ScopedEnv::set`; the lock is still held here.
                unsafe {
                    match prior {
                        Some(value) => env::set_var(key, value),
                        None => env::remove_var(key),
                    }
                }
            }
        }
    }

    #[test]
    fn embedded_default_loads_with_expected_value() {
        let cfg = Config::defaults().expect("embedded default must parse");
        assert_eq!(cfg.supertable.commit_threshold_size_mb, 1024);
    }

    #[test]
    fn env_overrides_default() {
        let mut scoped = ScopedEnv::acquire();
        scoped.set("INFINO_SUPERTABLE__COMMIT_THRESHOLD_SIZE_MB", "2048");
        let cfg = Config::load().expect("load with env override");
        assert_eq!(cfg.supertable.commit_threshold_size_mb, 2048);
    }

    #[test]
    fn missing_env_falls_through_to_default() {
        let mut scoped = ScopedEnv::acquire();
        scoped.unset("INFINO_SUPERTABLE__COMMIT_THRESHOLD_SIZE_MB");
        let cfg = Config::load().expect("load with no env override");
        assert_eq!(cfg.supertable.commit_threshold_size_mb, 1024);
    }

    #[test]
    fn from_figment_with_yaml_layer_overrides_default() {
        let yaml = r#"
supertable:
  commit_threshold_size_mb: 512
"#;
        let fig = Figment::new()
            .merge(Yaml::string(EMBEDDED_DEFAULT))
            .merge(Yaml::string(yaml));
        let cfg = Config::from_figment(fig).expect("layered yaml");
        assert_eq!(cfg.supertable.commit_threshold_size_mb, 512);
    }

    #[test]
    fn embedded_default_storage_is_in_memory_only() {
        let cfg = Config::defaults().expect("embedded default must parse");
        assert_eq!(cfg.storage.backend, StorageBackend::None);
        assert_eq!(cfg.storage.bucket, None);
        assert_eq!(cfg.storage.disk_cache_root, None);
    }

    #[test]
    fn storage_s3_config_parses_bucket_prefix_and_cache() {
        let yaml = r#"
storage:
  backend: s3
  bucket: example-bucket
  prefix: infino-real-s3-integration/example
  disk_cache_root: /tmp/infino-cache
  cold_fetch_mode: lazy_foreground_with_background_fill
  cold_fetch_streams: 8
  cold_fetch_chunk_bytes: 4194304
"#;
        let fig = Figment::new()
            .merge(Yaml::string(EMBEDDED_DEFAULT))
            .merge(Yaml::string(yaml));
        let cfg = Config::from_figment(fig).expect("parse config");
        assert_eq!(cfg.storage.backend, StorageBackend::S3);
        assert_eq!(cfg.storage.bucket.as_deref(), Some("example-bucket"));
        assert_eq!(cfg.storage.prefix, "infino-real-s3-integration/example");
        assert_eq!(
            cfg.storage.disk_cache_root.as_deref(),
            Some(Path::new("/tmp/infino-cache"))
        );
        assert_eq!(
            cfg.storage.cold_fetch_mode,
            StorageColdFetchMode::LazyForegroundWithBackgroundFill
        );
    }

    #[test]
    fn storage_azure_config_parses_container_as_bucket() {
        let yaml = r#"
storage:
  backend: azure
  bucket: infino-azure-container
  prefix: infino-real-azure-integration/example
"#;
        let fig = Figment::new()
            .merge(Yaml::string(EMBEDDED_DEFAULT))
            .merge(Yaml::string(yaml));
        let cfg = Config::from_figment(fig).expect("parse config");
        assert_eq!(cfg.storage.backend, StorageBackend::Azure);
        assert_eq!(
            cfg.storage.bucket.as_deref(),
            Some("infino-azure-container")
        );
        assert_eq!(cfg.storage.prefix, "infino-real-azure-integration/example");
    }

    #[test]
    fn last_yaml_wins_among_layers() {
        let fig = Figment::new()
            .merge(Yaml::string(EMBEDDED_DEFAULT))
            .merge(Yaml::string(
                "supertable:\n commit_threshold_size_mb: 256\n",
            ))
            .merge(Yaml::string(
                "supertable:\n commit_threshold_size_mb: 4096\n",
            ));
        let cfg = Config::from_figment(fig).expect("parse config");
        assert_eq!(cfg.supertable.commit_threshold_size_mb, 4096);
    }

    #[test]
    fn invalid_value_type_errors_clearly() {
        let fig = Figment::new()
            .merge(Yaml::string(EMBEDDED_DEFAULT))
            .merge(Yaml::string(
                "supertable:\n commit_threshold_size_mb: \"not-a-number\"\n",
            ));
        let err = Config::from_figment(fig).expect_err("expected error");
        let msg = err.to_string();
        assert!(
            msg.contains("commit_threshold_size_mb")
                || msg.contains("invalid type")
                || msg.contains("expected"),
            "expected a typed-error message; got {msg:?}"
        );
    }

    #[test]
    fn programmatic_override_via_serialized_provider() {
        #[derive(Serialize)]
        struct SupertableOverride {
            commit_threshold_size_mb: u64,
        }
        #[derive(Serialize)]
        struct Override {
            supertable: SupertableOverride,
        }
        let fig = Figment::new()
            .merge(Yaml::string(EMBEDDED_DEFAULT))
            .merge(Serialized::defaults(Override {
                supertable: SupertableOverride {
                    commit_threshold_size_mb: 16,
                },
            }));
        let cfg = Config::from_figment(fig).expect("parse config");
        assert_eq!(cfg.supertable.commit_threshold_size_mb, 16);
    }

    #[test]
    fn user_config_path_uses_xdg_when_set() {
        let mut scoped = ScopedEnv::acquire();
        scoped.set("XDG_CONFIG_HOME", "/tmp/xdg-test");
        let p = user_config_path().expect("path");
        assert_eq!(p, PathBuf::from("/tmp/xdg-test/infino/config.yaml"));
    }

    #[test]
    fn supertable_defaults_are_auto() {
        let cfg = Config::defaults().expect("embedded default must parse");
        assert_eq!(cfg.supertable.reader_threads, ThreadCount::Auto);
        assert_eq!(cfg.supertable.writer_threads, ThreadCount::Auto);
    }

    #[test]
    fn thread_count_parses_auto_string() {
        // The top-level `commit_threshold_size_mb` key matches no `Config`
        // field and is ignored during extraction.
        let yaml = r#"
commit_threshold_size_mb: 1024
supertable:
  reader_threads: auto
  writer_threads: AUTO
"#;
        let cfg =
            Config::from_figment(Figment::new().merge(Yaml::string(yaml))).expect("parse config");
        assert_eq!(cfg.supertable.reader_threads, ThreadCount::Auto);
        assert_eq!(cfg.supertable.writer_threads, ThreadCount::Auto);
    }

    #[test]
    fn thread_count_parses_integer() {
        let yaml = r#"
commit_threshold_size_mb: 1024
supertable:
  reader_threads: 8
  writer_threads: 4
"#;
        let cfg =
            Config::from_figment(Figment::new().merge(Yaml::string(yaml))).expect("parse config");
        assert_eq!(cfg.supertable.reader_threads, ThreadCount::Fixed(8));
        assert_eq!(cfg.supertable.writer_threads, ThreadCount::Fixed(4));
    }

    #[test]
    fn thread_count_rejects_garbage_string() {
        let yaml = r#"
commit_threshold_size_mb: 1024
supertable:
  reader_threads: banana
"#;
        let err = Config::from_figment(Figment::new().merge(Yaml::string(yaml)))
            .expect_err("expected error");
        let msg = err.to_string();
        assert!(
            msg.contains("auto") || msg.contains("positive integer") || msg.contains("banana"),
            "expected a typed-error message; got {msg:?}"
        );
    }

    #[test]
    fn thread_count_resolve_clamps_to_one() {
        assert_eq!(ThreadCount::Auto.resolve_or_default(0), 1);
        assert_eq!(ThreadCount::Fixed(0).resolve_or_default(8), 1);
        assert_eq!(ThreadCount::Auto.resolve_or_default(7), 7);
        assert_eq!(ThreadCount::Fixed(3).resolve_or_default(8), 3);
    }

    #[test]
    fn nested_env_var_overrides_supertable_field() {
        let mut scoped = ScopedEnv::acquire();
        scoped.set("INFINO_SUPERTABLE__WRITER_THREADS", "4");
        scoped.set("INFINO_SUPERTABLE__READER_THREADS", "auto");
        let cfg = Config::load().expect("load with nested env override");
        assert_eq!(cfg.supertable.writer_threads, ThreadCount::Fixed(4));
        assert_eq!(cfg.supertable.reader_threads, ThreadCount::Auto);
    }

    #[test]
    fn user_config_path_falls_back_to_home() {
        let mut scoped = ScopedEnv::acquire();
        scoped.unset("XDG_CONFIG_HOME");
        scoped.set("HOME", "/tmp/home-test");
        let p = user_config_path().expect("path");
        assert_eq!(
            p,
            PathBuf::from("/tmp/home-test/.config/infino/config.yaml")
        );
    }

    #[test]
    fn embedded_default_compaction_matches_spec() {
        let cfg = Config::defaults().expect("embedded default must parse");
        let c = &cfg.compaction;
        assert_eq!(
            c.target_superfile_size_mb,
            DEFAULT_COMPACTION_TARGET_SUPERFILE_SIZE_MB
        );
        assert_eq!(c.min_fill_percent, DEFAULT_COMPACTION_MIN_FILL_PERCENT);
        assert_eq!(
            c.max_memory_mb, DEFAULT_COMPACTION_MAX_MEMORY_MB,
            "target + 2048"
        );
    }

    #[test]
    fn compaction_struct_default_equals_embedded_yaml() {
        let cfg = Config::defaults().expect("embedded default must parse");
        assert_eq!(cfg.compaction, CompactionSettings::default());
    }

    #[test]
    fn compaction_yaml_layer_overrides_defaults() {
        let yaml = r#"
compaction:
  target_superfile_size_mb: 2048
  min_fill_percent: 50
"#;
        let fig = Figment::new()
            .merge(Yaml::string(EMBEDDED_DEFAULT))
            .merge(Yaml::string(yaml));
        let cfg = Config::from_figment(fig).expect("layered yaml");
        assert_eq!(cfg.compaction.target_superfile_size_mb, 2048);
        assert_eq!(cfg.compaction.min_fill_percent, 50);
        assert_eq!(cfg.compaction.max_memory_mb, 3072);
    }

    #[test]
    fn compaction_nested_env_var_overrides_field() {
        let mut scoped = ScopedEnv::acquire();
        scoped.set("INFINO_COMPACTION__TARGET_SUPERFILE_SIZE_MB", "4096");
        scoped.set("INFINO_COMPACTION__MIN_FILL_PERCENT", "60");
        let cfg = Config::load().expect("load with compaction env override");
        assert_eq!(cfg.compaction.target_superfile_size_mb, 4096);
        assert_eq!(cfg.compaction.min_fill_percent, 60);
    }

    #[test]
    fn compaction_invalid_value_type_errors_clearly() {
        let fig = Figment::new()
            .merge(Yaml::string(EMBEDDED_DEFAULT))
            .merge(Yaml::string(
                "compaction:\n target_superfile_size_mb: \"not-a-number\"\n",
            ));
        let err = Config::from_figment(fig).expect_err("expected error");
        let msg = err.to_string();
        assert!(
            msg.contains("target_superfile_size_mb")
                || msg.contains("invalid type")
                || msg.contains("expected"),
            "expected a typed-error message; got {msg:?}"
        );
    }

    #[test]
    fn compaction_min_fill_percent_rejects_out_of_u8_range() {
        let fig = Figment::new()
            .merge(Yaml::string(EMBEDDED_DEFAULT))
            .merge(Yaml::string("compaction:\n min_fill_percent: 256\n"));
        let err = Config::from_figment(fig).expect_err("expected error");
        let msg = err.to_string();
        assert!(
            msg.contains("min_fill_percent")
                || msg.contains("256")
                || msg.contains("u8")
                || msg.contains("out of range")
                || msg.contains("invalid value"),
            "expected an out-of-range message; got {msg:?}"
        );
    }

    #[test]
    fn thread_count_serde_round_trips_and_rejects_bad_types() {
        assert_eq!(
            serde_json::to_value(ThreadCount::Auto).expect("serialize auto"),
            json!("auto")
        );
        assert_eq!(
            serde_json::to_value(ThreadCount::Fixed(8)).expect("serialize fixed"),
            json!(8)
        );
        let tc: ThreadCount =
            serde_json::from_value(json!("auto")).expect("deserialize owned string");
        assert!(matches!(tc, ThreadCount::Auto));
        assert!(serde_json::from_str::<ThreadCount>("-1").is_err());
        assert!(serde_json::from_str::<ThreadCount>("true").is_err());
    }
}