#![allow(dead_code)]
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::mpsc::{self, Receiver, Sender};
use std::time::Duration;
use parking_lot::RwLock;
/// Error type returned when a configuration value is rejected.
///
/// The `Display` text is generated by `thiserror` from the `#[error]`
/// attribute on each variant.
#[derive(Debug, thiserror::Error)]
pub enum ConfigError {
/// A single field held an unacceptable value.
///
/// Renders as `invalid value for <field>: <message>`.
#[error("invalid value for {field}: {message}")]
InvalidField {
/// Name of the offending configuration field.
field: &'static str,
/// Human-readable description of the constraint that was violated.
message: String,
},
}
/// Database configuration: on-disk locations plus cache, WAL, memtable and
/// compaction limits.
///
/// Build one with [`DbConfig::new`] (which fills in defaults) and check it
/// with [`DbConfig::validate`]. Fields are public, so callers may adjust them
/// freely; nothing is checked until `validate` is called.
#[derive(Clone, Debug)]
pub struct DbConfig {
    /// Root directory handed to [`DbConfig::new`].
    pub data_dir: PathBuf,
    /// Manifest file path; defaults to `<data_dir>/manifest.json`.
    pub manifest_path: PathBuf,
    /// Write-ahead-log file path; defaults to `<data_dir>/wal.log`.
    pub wal_path: PathBuf,
    /// Cache size in bytes. Default 256 MiB; `validate` requires at least 8 MiB.
    pub cache_bytes: u64,
    /// WAL group interval in milliseconds. Default 1; `validate` requires
    /// a value in `1..=60000`.
    pub wal_group_interval_ms: u64,
    /// Maximum operations per WAL batch. Default 1024; must be non-zero.
    pub wal_max_batch_ops: usize,
    /// WAL group size in bytes. Default 4 MiB; `validate` requires at least 4 KiB.
    pub wal_group_bytes: u64,
    /// Memtable size limit in bytes. Default 512 MiB; `validate` requires
    /// at least 64 MiB.
    pub memtable_limit_bytes: u64,
    /// Optional WAL tail size in bytes. Default `Some(64 MiB)`; not checked
    /// by `validate`.
    pub wal_tail_bytes: Option<u64>,
    /// Upper bound on concurrent compactions. Default 2; must be non-zero.
    pub compaction_max_concurrency: usize,
    /// Default `true`. Presumably controls whether the WAL is fsynced on
    /// every commit; the consumer of this flag is not visible here.
    pub wal_fsync_on_commit: bool,
}

impl DbConfig {
    /// Creates a configuration rooted at `data_dir` with default limits.
    ///
    /// The manifest and WAL paths are derived by joining `manifest.json` and
    /// `wal.log` onto `data_dir`. No validation is performed here.
    pub fn new<P: Into<PathBuf>>(data_dir: P) -> Self {
        const MIB: u64 = 1024 * 1024;

        let root: PathBuf = data_dir.into();
        Self {
            // Derive the child paths first; `root` is moved into `data_dir` below.
            manifest_path: root.join("manifest.json"),
            wal_path: root.join("wal.log"),
            data_dir: root,
            cache_bytes: 256 * MIB,
            wal_group_interval_ms: 1,
            wal_max_batch_ops: 1024,
            wal_group_bytes: 4 * MIB,
            memtable_limit_bytes: 512 * MIB,
            wal_tail_bytes: Some(64 * MIB),
            compaction_max_concurrency: 2,
            wal_fsync_on_commit: true,
        }
    }

    /// Checks every bounded field and reports the first violation.
    ///
    /// # Errors
    ///
    /// Returns [`ConfigError::InvalidField`] naming the first field (in the
    /// order listed in the table below) whose value is out of range.
    pub fn validate(&self) -> Result<(), ConfigError> {
        const KIB: u64 = 1024;
        const MIB: u64 = 1024 * KIB;

        // One row per rule: (rule holds, field name, message when it does not).
        // Row order is the reporting priority.
        let checks: [(bool, &'static str, &'static str); 6] = [
            (
                self.cache_bytes >= 8 * MIB,
                "cache_bytes",
                "must be at least 8 MiB",
            ),
            (
                (1..=60_000).contains(&self.wal_group_interval_ms),
                "wal_group_interval_ms",
                "expected a value between 1 and 60000",
            ),
            (
                self.wal_max_batch_ops != 0,
                "wal_max_batch_ops",
                "must be greater than zero",
            ),
            (
                self.wal_group_bytes >= 4 * KIB,
                "wal_group_bytes",
                "should be at least 4 KiB",
            ),
            (
                self.memtable_limit_bytes >= 64 * MIB,
                "memtable_limit_bytes",
                "must be at least 64 MiB",
            ),
            (
                self.compaction_max_concurrency != 0,
                "compaction_max_concurrency",
                "must be greater than zero",
            ),
        ];

        match checks.iter().find(|check| !check.0) {
            Some(&(_, field, message)) => Err(ConfigError::InvalidField {
                field,
                message: message.into(),
            }),
            None => Ok(()),
        }
    }

    /// Returns the WAL group interval as a [`Duration`], clamped to a
    /// minimum of one millisecond.
    pub fn wal_group_interval(&self) -> Duration {
        let millis = self.wal_group_interval_ms.max(1);
        Duration::from_millis(millis)
    }

    /// Returns the configured WAL tail size, if any.
    pub fn wal_tail_bytes(&self) -> Option<u64> {
        self.wal_tail_bytes
    }
}
/// Shared handle to the active [`DbConfig`] with change notification.
///
/// Both fields live behind `Arc`, so clones of a manager observe and mutate
/// the same configuration and the same subscriber list.
///
/// Locking discipline: whenever both locks are needed, `listeners` is taken
/// first and `current` second. Holding `listeners` serialises "publish a new
/// config" against "register a subscriber", which keeps every subscriber's
/// message stream consistent with the order in which `current` was written.
#[derive(Clone, Debug)]
pub struct ConfigManager {
    /// The most recently accepted configuration.
    current: Arc<RwLock<DbConfig>>,
    /// One sender per subscription; entries are dropped once their receiver
    /// has hung up (detected on the next `reload`).
    listeners: Arc<RwLock<Vec<Sender<DbConfig>>>>,
}

impl ConfigManager {
    /// Creates a manager holding `config` and no subscribers.
    ///
    /// # Errors
    ///
    /// Returns the error from [`DbConfig::validate`] if `config` is invalid.
    pub fn new(config: DbConfig) -> Result<Self, ConfigError> {
        config.validate()?;
        Ok(Self {
            current: Arc::new(RwLock::new(config)),
            listeners: Arc::new(RwLock::new(Vec::new())),
        })
    }

    /// Returns a clone of the currently active configuration.
    pub fn current(&self) -> DbConfig {
        self.current.read().clone()
    }

    /// Registers a subscriber and returns its receiving end.
    ///
    /// The receiver is primed with the configuration that is active at the
    /// moment of registration and then receives every configuration accepted
    /// by a later [`reload`](Self::reload), in the order they were applied.
    pub fn subscribe(&self) -> Receiver<DbConfig> {
        let (tx, rx) = mpsc::channel();
        // Take the listener lock *before* snapshotting. Without it, a reload
        // could land between the snapshot and the registration and this
        // subscriber would never learn about it.
        let mut listeners = self.listeners.write();
        // Cannot fail: `rx` is still alive in this scope.
        let _ = tx.send(self.current());
        listeners.push(tx);
        rx
    }

    /// Validates `updated`, installs it as the current configuration and
    /// notifies every subscriber.
    ///
    /// Subscribers whose receiver has been dropped are removed. On error
    /// nothing is changed and nobody is notified.
    ///
    /// # Errors
    ///
    /// Returns the error from [`DbConfig::validate`] if `updated` is invalid.
    pub fn reload(&self, updated: DbConfig) -> Result<(), ConfigError> {
        updated.validate()?;
        // Hold the listener lock across both the write and the broadcast so
        // that concurrent reloads notify in the same order they wrote;
        // otherwise subscribers could end on a config that is not `current`.
        let mut listeners = self.listeners.write();
        // The write guard is a temporary and is released at the end of this
        // statement, so readers of `current` are blocked only briefly.
        *self.current.write() = updated.clone();
        listeners.retain(|listener| listener.send(updated.clone()).is_ok());
        Ok(())
    }

    /// Applies `func` to a copy of the current configuration and reloads the
    /// result.
    ///
    /// `func` runs without any lock held, so it may freely call back into
    /// this manager. The flip side is that the read-modify-write is not
    /// atomic: two concurrent `update` calls can start from the same snapshot
    /// and the later `reload` overwrites the earlier one's changes.
    ///
    /// # Errors
    ///
    /// Returns the error from [`DbConfig::validate`] if the modified
    /// configuration is invalid; the current configuration is left untouched.
    pub fn update<F>(&self, func: F) -> Result<(), ConfigError>
    where
        F: FnOnce(&mut DbConfig),
    {
        let mut next = self.current();
        func(&mut next);
        self.reload(next)
    }
}