use crate::types::schema::TableSchema;
use std::path::PathBuf;
use std::sync::Arc;
#[derive(Clone)]
pub struct MirrorConfig {
pub target: Arc<dyn crate::store::traits::MeruStore>,
pub max_lag_alert_secs: u64,
pub mirror_parallelism: usize,
}
impl std::fmt::Debug for MirrorConfig {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("MirrorConfig")
.field("target", &"Arc<dyn MeruStore>")
.field("max_lag_alert_secs", &self.max_lag_alert_secs)
.field("mirror_parallelism", &self.mirror_parallelism)
.finish()
}
}
impl MirrorConfig {
pub fn new(target: Arc<dyn crate::store::traits::MeruStore>) -> Self {
Self {
target,
max_lag_alert_secs: 60,
mirror_parallelism: 4,
}
}
pub fn max_lag_alert_secs(mut self, secs: u64) -> Self {
self.max_lag_alert_secs = secs;
self
}
pub fn mirror_parallelism(mut self, n: usize) -> Self {
self.mirror_parallelism = n.max(1);
self
}
}
#[derive(Clone, Debug)]
pub struct OpenOptions {
pub schema: TableSchema,
pub catalog_uri: String,
pub object_store_url: String,
pub wal_dir: PathBuf,
pub memtable_size_mb: usize,
pub max_immutable_count: usize,
pub row_cache_capacity: usize,
pub level_target_bytes: Vec<u64>,
pub l0_compaction_trigger: usize,
pub l0_slowdown_trigger: usize,
pub l0_stop_trigger: usize,
pub bloom_bits_per_key: u8,
pub max_compaction_bytes: u64,
pub max_compaction_input_rows: u64,
pub flush_parallelism: usize,
pub compaction_parallelism: usize,
pub gc_grace_period_secs: u64,
pub read_only: bool,
pub dual_format_max_level: Option<u8>,
pub mirror: Option<MirrorConfig>,
}
impl OpenOptions {
pub fn new(schema: TableSchema) -> Self {
let ec = crate::engine::config::EngineConfig::default();
Self {
schema,
catalog_uri: String::new(),
object_store_url: String::new(),
wal_dir: ec.wal_dir,
memtable_size_mb: ec.memtable_size_bytes / (1024 * 1024),
max_immutable_count: ec.max_immutable_count,
row_cache_capacity: ec.row_cache_capacity,
level_target_bytes: ec.level_target_bytes,
l0_compaction_trigger: ec.l0_compaction_trigger,
l0_slowdown_trigger: ec.l0_slowdown_trigger,
l0_stop_trigger: ec.l0_stop_trigger,
bloom_bits_per_key: ec.bloom_bits_per_key,
max_compaction_bytes: ec.max_compaction_bytes,
max_compaction_input_rows: ec.max_compaction_input_rows,
flush_parallelism: ec.flush_parallelism,
compaction_parallelism: ec.compaction_parallelism,
gc_grace_period_secs: ec.gc_grace_period_secs,
read_only: ec.read_only,
dual_format_max_level: ec.dual_format_max_level,
mirror: None,
}
}
pub fn mirror(mut self, cfg: MirrorConfig) -> Self {
self.mirror = Some(cfg);
self
}
pub fn dual_format_max_level(mut self, max: Option<u8>) -> Self {
self.dual_format_max_level = max;
self
}
pub fn catalog_uri(mut self, uri: impl Into<String>) -> Self {
self.catalog_uri = uri.into();
self
}
pub fn object_store(mut self, url: impl Into<String>) -> Self {
self.object_store_url = url.into();
self
}
pub fn wal_dir(mut self, dir: impl Into<PathBuf>) -> Self {
self.wal_dir = dir.into();
self
}
pub fn memtable_size_mb(mut self, mb: usize) -> Self {
self.memtable_size_mb = mb;
self
}
pub fn max_immutable_count(mut self, n: usize) -> Self {
self.max_immutable_count = n;
self
}
pub fn row_cache_capacity(mut self, capacity: usize) -> Self {
self.row_cache_capacity = capacity;
self
}
pub fn level_target_bytes(mut self, targets: Vec<u64>) -> Self {
self.level_target_bytes = targets;
self
}
pub fn l0_compaction_trigger(mut self, n: usize) -> Self {
self.l0_compaction_trigger = n;
self
}
pub fn l0_slowdown_trigger(mut self, n: usize) -> Self {
self.l0_slowdown_trigger = n;
self
}
pub fn l0_stop_trigger(mut self, n: usize) -> Self {
self.l0_stop_trigger = n;
self
}
pub fn bloom_bits_per_key(mut self, bits: u8) -> Self {
self.bloom_bits_per_key = bits;
self
}
pub fn max_compaction_input_rows(mut self, rows: u64) -> Self {
self.max_compaction_input_rows = rows;
self
}
pub fn max_compaction_bytes(mut self, bytes: u64) -> Self {
self.max_compaction_bytes = bytes;
self
}
pub fn flush_parallelism(mut self, n: usize) -> Self {
self.flush_parallelism = n;
self
}
pub fn compaction_parallelism(mut self, n: usize) -> Self {
self.compaction_parallelism = n;
self
}
pub fn gc_grace_period_secs(mut self, secs: u64) -> Self {
self.gc_grace_period_secs = secs;
self
}
pub fn read_only(mut self, enabled: bool) -> Self {
self.read_only = enabled;
self
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::store::local::LocalFileStore;
#[test]
fn mirror_defaults_are_production_sane() {
let tmp = tempfile::tempdir().unwrap();
let store = Arc::new(LocalFileStore::new(tmp.path()).unwrap());
let cfg = MirrorConfig::new(store);
assert_eq!(cfg.max_lag_alert_secs, 60);
assert_eq!(cfg.mirror_parallelism, 4);
}
#[test]
fn mirror_parallelism_floored_at_one() {
let tmp = tempfile::tempdir().unwrap();
let store = Arc::new(LocalFileStore::new(tmp.path()).unwrap());
let cfg = MirrorConfig::new(store).mirror_parallelism(0);
assert_eq!(cfg.mirror_parallelism, 1, "zero coerced to one");
}
}