use eyre::{Report, eyre};
use once_cell::sync::Lazy;
use std::cell::RefCell;
use std::{path::Path, sync::RwLock, time::Duration};
// Per-thread configuration-directory override. It starts as `None`, is set by
// `override_config_dir`, restored when the returned `ConfigDirGuard` drops, and
// is consulted by `config_dir_env` before the process-wide `CONFIG_DIR`.
thread_local! {
static TLS_CONFIG_DIR: RefCell<Option<String>> = const { RefCell::new(None) };
}
/// Optional tuning knobs read from environment variables.
///
/// Every field is `None` when its variable is unset or fails to parse. Where an
/// accessor method of the same name exists, it supplies the fallback value noted
/// on the field; fields without a noted fallback have no accessor in this file.
#[derive(Debug, Clone, Default)]
pub struct EnvironmentConfig {
/// `VOLLI_TEST_QUIC_IDLE_MS`: QUIC idle timeout in milliseconds; when unset,
/// `quic_idle_duration` uses three heartbeat intervals.
pub quic_idle_ms: Option<u64>,
/// `VOLLI_TEST_BACKOFF_MS`; set to 0 in fast-test mode when unset.
pub backoff_ms: Option<u64>,
/// `VOLLI_TEST_INTERVAL_MS`; set to 1 in fast-test mode when unset.
pub interval_ms: Option<u64>,
/// `VOLLI_CONNECT_TIMEOUT_MS`; set to 50 in fast-test mode when unset,
/// otherwise the accessor falls back to `DEFAULT_CONNECT_TIMEOUT_MS`.
pub connect_timeout_ms: Option<u64>,
/// `VOLLI_TEST_AUTH_TIMEOUT_SECS`; set to 1 in fast-test mode when unset,
/// otherwise the accessor falls back to `DEFAULT_AUTH_TIMEOUT_SECS`.
pub auth_timeout_secs: Option<u64>,
/// `VOLLI_TEST_HEARTBEAT_SECS`: heartbeat interval in seconds; set to 1 in
/// fast-test mode when unset, otherwise the accessor falls back to
/// `DEFAULT_HEARTBEAT_SECS`.
pub heartbeat_secs: Option<u64>,
/// `VOLLI_JOIN_TIMEOUT_SECS`; accessor falls back to `DEFAULT_JOIN_TIMEOUT_SECS`.
pub join_timeout_secs: Option<u64>,
/// `VOLLI_PEER_CLEANUP_SECS`.
pub peer_cleanup_secs: Option<u64>,
/// `VOLLI_PEER_CHECK_SECS`.
pub peer_check_secs: Option<u64>,
/// `VOLLI_REBALANCE_IMPROVEMENT`.
pub rebalance_improvement: Option<f64>,
/// `VOLLI_WORKER_IDLE_MULTIPLIER`; accessor falls back to 5.
pub worker_idle_multiplier: Option<u64>,
/// `VOLLI_REBALANCE_STARTUP_GRACE_SECS`; accessor falls back to 300.
pub rebalance_startup_grace_secs: Option<u64>,
/// `VOLLI_REBALANCE_CHECK_INTERVAL_SECS`; accessor falls back to 300.
pub rebalance_check_interval_secs: Option<u64>,
/// `VOLLI_REBALANCE_MIN_INTERVAL_SECS`; accessor falls back to 300.
pub rebalance_min_interval_secs: Option<u64>,
/// `VOLLI_MIGRATION_CHECK_INTERVAL_SECS`; accessor falls back to 180.
pub migration_check_interval_secs: Option<u64>,
/// `VOLLI_MIGRATION_CONNECTION_TIMEOUT_SECS`; accessor falls back to 15.
pub migration_connection_timeout_secs: Option<u64>,
/// `VOLLI_MIGRATION_HANDOFF_TIMEOUT_SECS`; accessor falls back to 30.
pub migration_handoff_timeout_secs: Option<u64>,
/// `VOLLI_REBALANCE_GOOD_SCORE_CUTOFF`; accessor falls back to 0.6.
pub rebalance_good_score_cutoff: Option<f64>,
/// `VOLLI_REBALANCE_PERIODIC_IMPROVEMENT`; accessor falls back to 0.30.
pub rebalance_periodic_improvement: Option<f64>,
/// `VOLLI_REBALANCE_CONSECUTIVE_SIGNALS`; accessor falls back to 1.
pub rebalance_consecutive_signals: Option<u32>,
/// `VOLLI_REBALANCE_MAX_PER_HOUR`; accessor falls back to 3.
pub rebalance_max_per_hour: Option<u32>,
/// `VOLLI_REBALANCE_WEIGHT_HEALTH`; accessor falls back to 0.6.
pub rebalance_weight_health: Option<f64>,
/// `VOLLI_REBALANCE_WEIGHT_HISTORY`; accessor falls back to 0.2.
pub rebalance_weight_history: Option<f64>,
/// `VOLLI_REBALANCE_WEIGHT_BREAKER`; accessor falls back to 0.2.
pub rebalance_weight_breaker: Option<f64>,
/// `VOLLI_REBALANCE_EMPTY_DELTA` (note: the variable name has no `_THRESHOLD`
/// suffix); accessor falls back to 0.20.
pub rebalance_empty_delta_threshold: Option<f64>,
/// `VOLLI_REBALANCE_EMPTY_BONUS_WEIGHT`; accessor falls back to 0.5.
pub rebalance_empty_bonus_weight: Option<f64>,
/// `VOLLI_REBALANCE_EMPTY_HEALTH_CUTOFF`; accessor falls back to 0.6.
pub rebalance_empty_health_cutoff: Option<f64>,
/// `VOLLI_REBALANCE_HEALTH_STALE_SECS`; accessor falls back to 60.
pub rebalance_health_stale_secs: Option<u64>,
/// `VOLLI_REBALANCE_HEALTH_STALE_FACTOR`; accessor falls back to 0.75.
pub rebalance_health_stale_factor: Option<f64>,
}
impl EnvironmentConfig {
    /// Fallback returned by [`Self::heartbeat_secs`] when the field is unset.
    pub const DEFAULT_HEARTBEAT_SECS: u64 = 2;
    /// Fallback returned by [`Self::auth_timeout_secs`] when the field is unset.
    pub const DEFAULT_AUTH_TIMEOUT_SECS: u64 = 5;
    /// Fallback returned by [`Self::join_timeout_secs`] when the field is unset.
    pub const DEFAULT_JOIN_TIMEOUT_SECS: u64 = 30;
    /// Fallback returned by [`Self::connect_timeout_ms`] when the field is unset.
    pub const DEFAULT_CONNECT_TIMEOUT_MS: u64 = 10_000;

    /// Builds a configuration from the process environment.
    ///
    /// Each field is read from its `VOLLI_*` variable; a variable that is unset,
    /// not valid Unicode, or unparsable leaves the field as `None`.
    ///
    /// Fast-test mode is enabled when `VOLLI_FAST_TESTS` is readable and not
    /// equal to `"0"`, or, when that variable cannot be read, whenever `NEXTEST`
    /// is readable. In that mode, unset connect/backoff/interval/auth/heartbeat
    /// fields receive small values; explicitly configured values are kept.
    fn from_env() -> Self {
        // Reads `key` and parses it as `T`; any failure yields `None`.
        fn parsed<T: std::str::FromStr>(key: &str) -> Option<T> {
            std::env::var(key).ok()?.parse::<T>().ok()
        }

        let mut cfg = Self {
            quic_idle_ms: parsed("VOLLI_TEST_QUIC_IDLE_MS"),
            backoff_ms: parsed("VOLLI_TEST_BACKOFF_MS"),
            interval_ms: parsed("VOLLI_TEST_INTERVAL_MS"),
            connect_timeout_ms: parsed("VOLLI_CONNECT_TIMEOUT_MS"),
            auth_timeout_secs: parsed("VOLLI_TEST_AUTH_TIMEOUT_SECS"),
            heartbeat_secs: parsed("VOLLI_TEST_HEARTBEAT_SECS"),
            join_timeout_secs: parsed("VOLLI_JOIN_TIMEOUT_SECS"),
            peer_cleanup_secs: parsed("VOLLI_PEER_CLEANUP_SECS"),
            peer_check_secs: parsed("VOLLI_PEER_CHECK_SECS"),
            rebalance_improvement: parsed("VOLLI_REBALANCE_IMPROVEMENT"),
            worker_idle_multiplier: parsed("VOLLI_WORKER_IDLE_MULTIPLIER"),
            rebalance_startup_grace_secs: parsed("VOLLI_REBALANCE_STARTUP_GRACE_SECS"),
            rebalance_check_interval_secs: parsed("VOLLI_REBALANCE_CHECK_INTERVAL_SECS"),
            rebalance_min_interval_secs: parsed("VOLLI_REBALANCE_MIN_INTERVAL_SECS"),
            migration_check_interval_secs: parsed("VOLLI_MIGRATION_CHECK_INTERVAL_SECS"),
            migration_connection_timeout_secs: parsed("VOLLI_MIGRATION_CONNECTION_TIMEOUT_SECS"),
            migration_handoff_timeout_secs: parsed("VOLLI_MIGRATION_HANDOFF_TIMEOUT_SECS"),
            rebalance_good_score_cutoff: parsed("VOLLI_REBALANCE_GOOD_SCORE_CUTOFF"),
            rebalance_periodic_improvement: parsed("VOLLI_REBALANCE_PERIODIC_IMPROVEMENT"),
            rebalance_consecutive_signals: parsed("VOLLI_REBALANCE_CONSECUTIVE_SIGNALS"),
            rebalance_max_per_hour: parsed("VOLLI_REBALANCE_MAX_PER_HOUR"),
            rebalance_weight_health: parsed("VOLLI_REBALANCE_WEIGHT_HEALTH"),
            rebalance_weight_history: parsed("VOLLI_REBALANCE_WEIGHT_HISTORY"),
            rebalance_weight_breaker: parsed("VOLLI_REBALANCE_WEIGHT_BREAKER"),
            rebalance_empty_delta_threshold: parsed("VOLLI_REBALANCE_EMPTY_DELTA"),
            rebalance_empty_bonus_weight: parsed("VOLLI_REBALANCE_EMPTY_BONUS_WEIGHT"),
            rebalance_empty_health_cutoff: parsed("VOLLI_REBALANCE_EMPTY_HEALTH_CUTOFF"),
            rebalance_health_stale_secs: parsed("VOLLI_REBALANCE_HEALTH_STALE_SECS"),
            rebalance_health_stale_factor: parsed("VOLLI_REBALANCE_HEALTH_STALE_FACTOR"),
        };

        let fast = std::env::var("VOLLI_FAST_TESTS")
            .map(|v| v != "0")
            .unwrap_or_else(|_| std::env::var("NEXTEST").is_ok());
        if fast {
            // Only fill gaps: an explicitly configured value always wins.
            cfg.connect_timeout_ms = cfg.connect_timeout_ms.or(Some(50));
            cfg.backoff_ms = cfg.backoff_ms.or(Some(0));
            cfg.interval_ms = cfg.interval_ms.or(Some(1));
            cfg.auth_timeout_secs = cfg.auth_timeout_secs.or(Some(1));
            cfg.heartbeat_secs = cfg.heartbeat_secs.or(Some(1));
        }
        cfg
    }

    /// Returns `heartbeat_secs`, or [`Self::DEFAULT_HEARTBEAT_SECS`] when unset.
    pub fn heartbeat_secs(&self) -> u64 {
        self.heartbeat_secs.unwrap_or(Self::DEFAULT_HEARTBEAT_SECS)
    }

    /// Returns `auth_timeout_secs`, or [`Self::DEFAULT_AUTH_TIMEOUT_SECS`] when unset.
    pub fn auth_timeout_secs(&self) -> u64 {
        self.auth_timeout_secs
            .unwrap_or(Self::DEFAULT_AUTH_TIMEOUT_SECS)
    }

    /// Returns `join_timeout_secs`, or [`Self::DEFAULT_JOIN_TIMEOUT_SECS`] when unset.
    pub fn join_timeout_secs(&self) -> u64 {
        self.join_timeout_secs
            .unwrap_or(Self::DEFAULT_JOIN_TIMEOUT_SECS)
    }

    /// Returns `connect_timeout_ms`, or [`Self::DEFAULT_CONNECT_TIMEOUT_MS`] when unset.
    pub fn connect_timeout_ms(&self) -> u64 {
        self.connect_timeout_ms
            .unwrap_or(Self::DEFAULT_CONNECT_TIMEOUT_MS)
    }

    /// Returns the QUIC idle timeout.
    ///
    /// Uses `quic_idle_ms` when set, otherwise three heartbeat intervals.
    ///
    /// # Errors
    ///
    /// Returns an error when the timeout does not strictly exceed one heartbeat
    /// interval, or when converting the heartbeat interval to milliseconds (or
    /// tripling it for the default) would overflow `u64`.
    pub fn quic_idle_duration(&self) -> Result<Duration, Report> {
        let hb = self.heartbeat_secs();
        // Checked arithmetic: the heartbeat can come straight from the
        // environment, so an absurd value must become an error rather than a
        // debug-build panic or a silently wrapped number in release builds.
        let hb_ms = hb.checked_mul(1000).ok_or_else(|| {
            eyre!("heartbeat interval {hb}s is too large to express in milliseconds")
        })?;
        let ms = match self.quic_idle_ms {
            Some(ms) => ms,
            // The default is computed only when needed, so an explicit idle
            // timeout is never rejected because of the unused default.
            None => hb_ms.checked_mul(3).ok_or_else(|| {
                eyre!("default quic idle timeout for heartbeat interval {hb}s overflows")
            })?,
        };
        if ms <= hb_ms {
            return Err(eyre!(
                "quic idle timeout {ms}ms must exceed heartbeat interval {hb}s"
            ));
        }
        Ok(Duration::from_millis(ms))
    }

    /// Returns `worker_idle_multiplier`, or 5 when unset.
    pub fn worker_idle_multiplier(&self) -> u64 {
        self.worker_idle_multiplier.unwrap_or(5)
    }

    /// Returns `rebalance_startup_grace_secs`, or 300 when unset.
    pub fn rebalance_startup_grace_secs(&self) -> u64 {
        self.rebalance_startup_grace_secs.unwrap_or(300)
    }

    /// Returns `rebalance_check_interval_secs`, or 300 when unset.
    pub fn rebalance_check_interval_secs(&self) -> u64 {
        self.rebalance_check_interval_secs.unwrap_or(300)
    }

    /// Returns `rebalance_min_interval_secs`, or 300 when unset.
    pub fn rebalance_min_interval_secs(&self) -> u64 {
        self.rebalance_min_interval_secs.unwrap_or(300)
    }

    /// Returns `migration_check_interval_secs`, or 180 when unset.
    pub fn migration_check_interval_secs(&self) -> u64 {
        self.migration_check_interval_secs.unwrap_or(180)
    }

    /// Returns `migration_connection_timeout_secs`, or 15 when unset.
    pub fn migration_connection_timeout_secs(&self) -> u64 {
        self.migration_connection_timeout_secs.unwrap_or(15)
    }

    /// Returns `migration_handoff_timeout_secs`, or 30 when unset.
    pub fn migration_handoff_timeout_secs(&self) -> u64 {
        self.migration_handoff_timeout_secs.unwrap_or(30)
    }

    /// Returns `rebalance_good_score_cutoff`, or 0.6 when unset.
    pub fn rebalance_good_score_cutoff(&self) -> f64 {
        self.rebalance_good_score_cutoff.unwrap_or(0.6)
    }

    /// Returns `rebalance_periodic_improvement`, or 0.30 when unset.
    pub fn rebalance_periodic_improvement(&self) -> f64 {
        self.rebalance_periodic_improvement.unwrap_or(0.30)
    }

    /// Returns `rebalance_consecutive_signals`, or 1 when unset.
    pub fn rebalance_consecutive_signals(&self) -> u32 {
        self.rebalance_consecutive_signals.unwrap_or(1)
    }

    /// Returns `rebalance_max_per_hour`, or 3 when unset.
    pub fn rebalance_max_per_hour(&self) -> u32 {
        self.rebalance_max_per_hour.unwrap_or(3)
    }

    /// Returns `rebalance_weight_health`, or 0.6 when unset.
    pub fn rebalance_weight_health(&self) -> f64 {
        self.rebalance_weight_health.unwrap_or(0.6)
    }

    /// Returns `rebalance_weight_history`, or 0.2 when unset.
    pub fn rebalance_weight_history(&self) -> f64 {
        self.rebalance_weight_history.unwrap_or(0.2)
    }

    /// Returns `rebalance_weight_breaker`, or 0.2 when unset.
    pub fn rebalance_weight_breaker(&self) -> f64 {
        self.rebalance_weight_breaker.unwrap_or(0.2)
    }

    /// Returns `rebalance_empty_delta_threshold`, or 0.20 when unset.
    pub fn rebalance_empty_delta_threshold(&self) -> f64 {
        self.rebalance_empty_delta_threshold.unwrap_or(0.20)
    }

    /// Returns `rebalance_empty_bonus_weight`, or 0.5 when unset.
    pub fn rebalance_empty_bonus_weight(&self) -> f64 {
        self.rebalance_empty_bonus_weight.unwrap_or(0.5)
    }

    /// Returns `rebalance_empty_health_cutoff`, or 0.6 when unset.
    pub fn rebalance_empty_health_cutoff(&self) -> f64 {
        self.rebalance_empty_health_cutoff.unwrap_or(0.6)
    }

    /// Returns `rebalance_health_stale_secs`, or 60 when unset.
    pub fn rebalance_health_stale_secs(&self) -> u64 {
        self.rebalance_health_stale_secs.unwrap_or(60)
    }

    /// Returns `rebalance_health_stale_factor`, or 0.75 when unset.
    pub fn rebalance_health_stale_factor(&self) -> f64 {
        self.rebalance_health_stale_factor.unwrap_or(0.75)
    }
}
/// Process-wide configuration, built from the environment on first access and
/// replaceable afterwards through `override_env_config` /
/// `override_env_config_patch`.
static CONFIG: Lazy<RwLock<EnvironmentConfig>> =
Lazy::new(|| RwLock::new(EnvironmentConfig::from_env()));
/// Process-wide configuration directory, seeded from `VOLLI_CONFIG_DIR` on first
/// access (`None` when the variable cannot be read) and replaceable through
/// `override_config_dir`.
static CONFIG_DIR: Lazy<RwLock<Option<String>>> =
Lazy::new(|| RwLock::new(std::env::var("VOLLI_CONFIG_DIR").ok()));
pub fn env_config() -> EnvironmentConfig {
CONFIG.read().unwrap().clone()
}
/// Returns the effective configuration directory, if any.
///
/// The calling thread's override takes precedence; otherwise the process-wide
/// value is returned.
///
/// # Panics
///
/// Panics if the process-wide lock is poisoned.
pub fn config_dir_env() -> Option<String> {
    TLS_CONFIG_DIR
        .with(|slot| slot.borrow().clone())
        .or_else(|| CONFIG_DIR.read().unwrap().clone())
}
/// Returns the value of `VOLLI_PROFILE`, or `None` when the variable is unset
/// or not valid Unicode.
pub fn profile_env() -> Option<String> {
    let profile = std::env::var("VOLLI_PROFILE");
    profile.ok()
}
/// Returns `VOLLI_TCP_PORT` parsed as a `u16`.
///
/// Yields `None` when the variable is unset, not valid Unicode, or does not
/// parse as a `u16` (the value is not trimmed before parsing).
pub fn tcp_port_env() -> Option<u16> {
    let raw = std::env::var("VOLLI_TCP_PORT").ok()?;
    raw.parse().ok()
}
/// Returns `VOLLI_QUIC_PORT` parsed as a `u16`.
///
/// Yields `None` when the variable is unset, not valid Unicode, or does not
/// parse as a `u16` (the value is not trimmed before parsing).
pub fn quic_port_env() -> Option<u16> {
    let raw = std::env::var("VOLLI_QUIC_PORT").ok()?;
    raw.parse().ok()
}
/// Returns the value of `EDITOR`, or `None` when the variable is unset or not
/// valid Unicode.
pub fn editor_env() -> Option<String> {
    let editor = std::env::var("EDITOR");
    editor.ok()
}
/// Restores the configuration-directory overrides when dropped.
///
/// Returned by `override_config_dir`. Keep it bound to a named variable for as
/// long as the override should stay active: dropping it (including via
/// `let _ = ...`) puts the previous values back immediately.
#[must_use = "the override is reverted as soon as the guard is dropped"]
pub struct ConfigDirGuard {
    /// Thread-local value that was active before the override.
    prev_thread: Option<String>,
    /// Process-wide value that was active before the override.
    prev_global: Option<String>,
}
/// Overrides the configuration directory for the calling thread and for the
/// whole process, returning a guard that restores both previous values on drop.
///
/// Passing `None` clears both overrides. Paths are converted lossily to
/// `String`.
///
/// NOTE(review): the guard restores the thread-local slot of whichever thread
/// drops it, and nested guards are assumed to be dropped in reverse order of
/// creation — confirm callers respect both.
///
/// # Panics
///
/// Panics if the process-wide lock is poisoned.
pub fn override_config_dir<P: AsRef<Path>>(dir: Option<P>) -> ConfigDirGuard {
    let replacement: Option<String> =
        dir.map(|path| path.as_ref().to_string_lossy().into_owned());
    let prev_thread = TLS_CONFIG_DIR.with(|slot| slot.replace(replacement.clone()));
    // The write guard is a temporary, so the lock is released at the end of
    // this statement.
    let prev_global = std::mem::replace(&mut *CONFIG_DIR.write().unwrap(), replacement);
    ConfigDirGuard {
        prev_thread,
        prev_global,
    }
}
/// Restores the previous process-wide `EnvironmentConfig` when dropped.
///
/// Keep it bound to a named variable for as long as the override should stay
/// active: dropping it (including via `let _ = ...`) reverts the override
/// immediately.
#[must_use = "the override is reverted as soon as the guard is dropped"]
pub struct ConfigGuard(Option<EnvironmentConfig>);
/// Replaces the process-wide configuration with `cfg` and returns a guard that
/// puts the previous configuration back when dropped.
///
/// # Panics
///
/// Panics if the configuration lock is poisoned.
pub fn override_env_config(cfg: EnvironmentConfig) -> ConfigGuard {
    let mut active = CONFIG.write().unwrap();
    let displaced = std::mem::replace(&mut *active, cfg);
    // Release the lock before handing the guard back to the caller.
    drop(active);
    ConfigGuard(Some(displaced))
}
/// Overlays `patch` on the current process-wide configuration and returns a
/// guard that restores the previous configuration when dropped.
///
/// For every field, a `Some` in `patch` wins; a `None` keeps the current value.
///
/// The read, merge and store happen under a single write lock, so a concurrent
/// override cannot slip in between the snapshot and the store. The guard
/// therefore always restores exactly the configuration the merge was based on.
///
/// # Panics
///
/// Panics if the configuration lock is poisoned.
pub fn override_env_config_patch(patch: EnvironmentConfig) -> ConfigGuard {
    let mut current = CONFIG.write().unwrap();
    // Every field is listed explicitly so that adding a field to
    // `EnvironmentConfig` fails to compile here until the merge handles it.
    let merged = EnvironmentConfig {
        quic_idle_ms: patch.quic_idle_ms.or(current.quic_idle_ms),
        backoff_ms: patch.backoff_ms.or(current.backoff_ms),
        interval_ms: patch.interval_ms.or(current.interval_ms),
        connect_timeout_ms: patch.connect_timeout_ms.or(current.connect_timeout_ms),
        auth_timeout_secs: patch.auth_timeout_secs.or(current.auth_timeout_secs),
        heartbeat_secs: patch.heartbeat_secs.or(current.heartbeat_secs),
        join_timeout_secs: patch.join_timeout_secs.or(current.join_timeout_secs),
        peer_cleanup_secs: patch.peer_cleanup_secs.or(current.peer_cleanup_secs),
        peer_check_secs: patch.peer_check_secs.or(current.peer_check_secs),
        rebalance_improvement: patch
            .rebalance_improvement
            .or(current.rebalance_improvement),
        worker_idle_multiplier: patch
            .worker_idle_multiplier
            .or(current.worker_idle_multiplier),
        rebalance_startup_grace_secs: patch
            .rebalance_startup_grace_secs
            .or(current.rebalance_startup_grace_secs),
        rebalance_check_interval_secs: patch
            .rebalance_check_interval_secs
            .or(current.rebalance_check_interval_secs),
        rebalance_min_interval_secs: patch
            .rebalance_min_interval_secs
            .or(current.rebalance_min_interval_secs),
        migration_check_interval_secs: patch
            .migration_check_interval_secs
            .or(current.migration_check_interval_secs),
        migration_connection_timeout_secs: patch
            .migration_connection_timeout_secs
            .or(current.migration_connection_timeout_secs),
        migration_handoff_timeout_secs: patch
            .migration_handoff_timeout_secs
            .or(current.migration_handoff_timeout_secs),
        rebalance_good_score_cutoff: patch
            .rebalance_good_score_cutoff
            .or(current.rebalance_good_score_cutoff),
        rebalance_periodic_improvement: patch
            .rebalance_periodic_improvement
            .or(current.rebalance_periodic_improvement),
        rebalance_consecutive_signals: patch
            .rebalance_consecutive_signals
            .or(current.rebalance_consecutive_signals),
        rebalance_max_per_hour: patch
            .rebalance_max_per_hour
            .or(current.rebalance_max_per_hour),
        rebalance_weight_health: patch
            .rebalance_weight_health
            .or(current.rebalance_weight_health),
        rebalance_weight_history: patch
            .rebalance_weight_history
            .or(current.rebalance_weight_history),
        rebalance_weight_breaker: patch
            .rebalance_weight_breaker
            .or(current.rebalance_weight_breaker),
        rebalance_empty_delta_threshold: patch
            .rebalance_empty_delta_threshold
            .or(current.rebalance_empty_delta_threshold),
        rebalance_empty_bonus_weight: patch
            .rebalance_empty_bonus_weight
            .or(current.rebalance_empty_bonus_weight),
        rebalance_empty_health_cutoff: patch
            .rebalance_empty_health_cutoff
            .or(current.rebalance_empty_health_cutoff),
        rebalance_health_stale_secs: patch
            .rebalance_health_stale_secs
            .or(current.rebalance_health_stale_secs),
        rebalance_health_stale_factor: patch
            .rebalance_health_stale_factor
            .or(current.rebalance_health_stale_factor),
    };
    let old = std::mem::replace(&mut *current, merged);
    // Release the lock before handing the guard back to the caller.
    drop(current);
    ConfigGuard(Some(old))
}
/// Puts the saved configuration back into the process-wide slot.
impl Drop for ConfigGuard {
    /// Restores the saved configuration, if one is still held.
    ///
    /// Panics if the configuration lock is poisoned.
    fn drop(&mut self) {
        if let Some(saved) = self.0.take() {
            *CONFIG.write().unwrap() = saved;
        }
    }
}
/// Puts the saved configuration-directory values back.
impl Drop for ConfigDirGuard {
    /// Restores the dropping thread's thread-local value first, then the
    /// process-wide value.
    ///
    /// Panics if the process-wide lock is poisoned.
    fn drop(&mut self) {
        TLS_CONFIG_DIR.with(|slot| {
            slot.replace(self.prev_thread.take());
        });
        *CONFIG_DIR.write().unwrap() = self.prev_global.take();
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    // Builds a config in which only the heartbeat and QUIC idle fields are set.
    fn config(heartbeat_secs: u64, quic_idle_ms: Option<u64>) -> EnvironmentConfig {
        EnvironmentConfig {
            heartbeat_secs: Some(heartbeat_secs),
            quic_idle_ms,
            ..Default::default()
        }
    }

    // With no explicit idle timeout, the default is three heartbeat intervals.
    #[test]
    fn quic_idle_default() {
        let idle = config(2, None).quic_idle_duration().unwrap();
        assert_eq!(idle, Duration::from_secs(6));
    }

    // An idle timeout shorter than the heartbeat interval is rejected.
    #[test]
    fn quic_idle_invalid() {
        let outcome = config(5, Some(4000)).quic_idle_duration();
        assert!(outcome.is_err());
    }

    // An idle timeout longer than the heartbeat interval is returned unchanged.
    #[test]
    fn quic_idle_valid() {
        let idle = config(5, Some(11000)).quic_idle_duration().unwrap();
        assert_eq!(idle, Duration::from_millis(11000));
    }
}