use ff_core::partition::PartitionConfig;
use ff_core::types::LaneId;
use ff_engine::EngineConfig;
use std::time::Duration;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
#[non_exhaustive]
pub enum BackendKind {
#[default]
Valkey,
Postgres,
}
impl BackendKind {
pub fn as_str(&self) -> &'static str {
match self {
Self::Valkey => "valkey",
Self::Postgres => "postgres",
}
}
}
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct PostgresServerConfig {
pub url: String,
pub pool_size: u32,
}
impl Default for PostgresServerConfig {
fn default() -> Self {
Self {
url: String::new(),
pool_size: 10,
}
}
}
#[derive(Debug, Clone)]
pub struct ValkeyServerConfig {
pub host: String,
pub port: u16,
pub tls: bool,
pub cluster: bool,
pub skip_library_load: bool,
}
impl Default for ValkeyServerConfig {
fn default() -> Self {
Self {
host: "localhost".into(),
port: 6379,
tls: false,
cluster: false,
skip_library_load: false,
}
}
}
pub struct ServerConfig {
pub partition_config: PartitionConfig,
pub lanes: Vec<LaneId>,
pub listen_addr: String,
pub engine_config: EngineConfig,
pub cors_origins: Vec<String>,
pub api_token: Option<String>,
pub waitpoint_hmac_secret: String,
pub waitpoint_hmac_grace_ms: u64,
pub max_concurrent_stream_ops: u32,
pub backend: BackendKind,
pub valkey: ValkeyServerConfig,
pub postgres: PostgresServerConfig,
}
impl ServerConfig {
pub fn postgres_config(&self) -> ff_core::backend::BackendConfig {
let mut cfg = ff_core::backend::BackendConfig::postgres(&self.postgres.url);
if let ff_core::backend::BackendConnection::Postgres(ref mut conn) = cfg.connection {
conn.max_connections = self.postgres.pool_size;
}
cfg
}
}
impl ServerConfig {
pub fn from_env() -> Result<Self, ConfigError> {
let valkey = ValkeyServerConfig {
host: env_or("FF_HOST", "localhost"),
port: env_u16("FF_PORT", 6379)?,
tls: env_bool("FF_TLS"),
cluster: env_bool("FF_CLUSTER"),
skip_library_load: false,
};
let listen_addr = env_or("FF_LISTEN_ADDR", "0.0.0.0:9090");
let cors_raw = std::env::var("FF_CORS_ORIGINS");
let cors_source = match &cors_raw {
Ok(s) if s.is_empty() => {
return Err(ConfigError::InvalidValue {
var: "FF_CORS_ORIGINS".to_owned(),
message: "FF_CORS_ORIGINS is set but empty; \
unset it to default to \"*\", or pass \"*\" explicitly, \
or pass a non-empty comma-separated origin list"
.to_owned(),
});
}
Ok(s) => s.clone(),
Err(_) => "*".to_owned(),
};
let cors_origins: Vec<String> = cors_source
.split(',')
.map(|s| s.trim().to_owned())
.filter(|s| !s.is_empty())
.collect();
let api_token = std::env::var("FF_API_TOKEN").ok().filter(|s| !s.is_empty());
let waitpoint_hmac_secret = std::env::var("FF_WAITPOINT_HMAC_SECRET")
.map_err(|_| ConfigError::InvalidValue {
var: "FF_WAITPOINT_HMAC_SECRET".to_owned(),
message:
"required: hex-encoded HMAC signing secret for waitpoint tokens \
(RFC-004 §Waitpoint Security); suggested 64 hex chars (32 bytes)"
.to_owned(),
})?;
if waitpoint_hmac_secret.is_empty() {
return Err(ConfigError::InvalidValue {
var: "FF_WAITPOINT_HMAC_SECRET".to_owned(),
message: "must not be empty".to_owned(),
});
}
if waitpoint_hmac_secret.len() % 2 != 0
|| !waitpoint_hmac_secret.chars().all(|c| c.is_ascii_hexdigit())
{
return Err(ConfigError::InvalidValue {
var: "FF_WAITPOINT_HMAC_SECRET".to_owned(),
message: "must be an even-length hex string (0-9a-fA-F)".to_owned(),
});
}
let waitpoint_hmac_grace_ms = env_u64("FF_WAITPOINT_HMAC_GRACE_MS", 86_400_000)?;
let max_concurrent_stream_ops = match std::env::var("FF_MAX_CONCURRENT_STREAM_OPS") {
Ok(_) => env_u32_positive("FF_MAX_CONCURRENT_STREAM_OPS", 64)?,
Err(_) => env_u32_positive("FF_MAX_CONCURRENT_TAIL", 64)?,
};
let lanes: Vec<LaneId> = env_or("FF_LANES", "default")
.split(',')
.map(|s| LaneId::new(s.trim()))
.filter(|l| !l.as_str().is_empty())
.collect();
if lanes.is_empty() {
return Err(ConfigError::InvalidValue {
var: "FF_LANES".to_owned(),
message: "at least one non-empty lane name is required".to_owned(),
});
}
let partition_config = PartitionConfig {
num_flow_partitions: env_u16_positive("FF_FLOW_PARTITIONS", 256)?,
num_budget_partitions: env_u16_positive("FF_BUDGET_PARTITIONS", 32)?,
num_quota_partitions: env_u16_positive("FF_QUOTA_PARTITIONS", 32)?,
};
let lease_expiry_interval =
Duration::from_millis(env_u64("FF_LEASE_EXPIRY_INTERVAL_MS", 1500)?);
let delayed_promoter_interval =
Duration::from_millis(env_u64("FF_DELAYED_PROMOTER_INTERVAL_MS", 750)?);
let index_reconciler_interval =
Duration::from_secs(env_u64("FF_INDEX_RECONCILER_INTERVAL_S", 45)?);
let attempt_timeout_interval =
Duration::from_secs(env_u64("FF_ATTEMPT_TIMEOUT_INTERVAL_S", 2)?);
let suspension_timeout_interval =
Duration::from_secs(env_u64("FF_SUSPENSION_TIMEOUT_INTERVAL_S", 2)?);
let pending_wp_expiry_interval =
Duration::from_secs(env_u64("FF_PENDING_WP_EXPIRY_INTERVAL_S", 5)?);
let retention_trimmer_interval =
Duration::from_secs(env_u64("FF_RETENTION_TRIMMER_INTERVAL_S", 60)?);
let budget_reset_interval =
Duration::from_secs(env_u64("FF_BUDGET_RESET_INTERVAL_S", 15)?);
let budget_reconciler_interval =
Duration::from_secs(env_u64("FF_BUDGET_RECONCILER_INTERVAL_S", 30)?);
let quota_reconciler_interval =
Duration::from_secs(env_u64("FF_QUOTA_RECONCILER_INTERVAL_S", 30)?);
let unblock_interval =
Duration::from_secs(env_u64("FF_UNBLOCK_INTERVAL_S", 5)?);
let dependency_reconciler_interval =
Duration::from_secs(env_u64("FF_DEPENDENCY_RECONCILER_INTERVAL_S", 15)?);
let engine_config = EngineConfig {
partition_config,
lanes: lanes.clone(),
lease_expiry_interval,
delayed_promoter_interval,
index_reconciler_interval,
attempt_timeout_interval,
suspension_timeout_interval,
pending_wp_expiry_interval,
retention_trimmer_interval,
budget_reset_interval,
budget_reconciler_interval,
quota_reconciler_interval,
unblock_interval,
dependency_reconciler_interval,
flow_projector_interval: Duration::from_secs(
env_u64("FF_FLOW_PROJECTOR_INTERVAL_S", 15)?
),
execution_deadline_interval: Duration::from_secs(
env_u64("FF_EXECUTION_DEADLINE_INTERVAL_S", 5)?
),
cancel_reconciler_interval: Duration::from_secs(
env_u64("FF_CANCEL_RECONCILER_INTERVAL_S", 15)?
),
edge_cancel_dispatcher_interval: Duration::from_secs(
env_u64("FF_EDGE_CANCEL_DISPATCHER_INTERVAL_S", 1)?
),
edge_cancel_reconciler_interval: Duration::from_secs(
env_u64("FF_EDGE_CANCEL_RECONCILER_INTERVAL_S", 10)?
),
scanner_filter: Default::default(),
};
let postgres = PostgresServerConfig {
url: std::env::var("FF_POSTGRES_URL").unwrap_or_default(),
pool_size: env_u32_positive("FF_POSTGRES_POOL_SIZE", 10)?,
};
let backend = match std::env::var("FF_BACKEND") {
Ok(v) => match v.to_ascii_lowercase().as_str() {
"" | "valkey" => BackendKind::Valkey,
"postgres" => BackendKind::Postgres,
other => {
return Err(ConfigError::InvalidValue {
var: "FF_BACKEND".to_owned(),
message: format!(
"unknown backend '{other}': expected 'valkey' or 'postgres'"
),
});
}
},
Err(_) => BackendKind::default(),
};
Ok(Self {
partition_config,
lanes,
listen_addr,
engine_config,
cors_origins,
api_token,
waitpoint_hmac_secret,
waitpoint_hmac_grace_ms,
max_concurrent_stream_ops,
backend,
valkey,
postgres,
})
}
}
impl Default for ServerConfig {
fn default() -> Self {
let lanes = vec![LaneId::new("default")];
let partition_config = PartitionConfig::default();
Self {
partition_config,
lanes: lanes.clone(),
listen_addr: "0.0.0.0:9090".into(),
engine_config: EngineConfig {
partition_config,
lanes,
..Default::default()
},
cors_origins: vec!["*".to_owned()],
api_token: None,
waitpoint_hmac_secret:
"0000000000000000000000000000000000000000000000000000000000000000"
.to_owned(),
waitpoint_hmac_grace_ms: 86_400_000,
max_concurrent_stream_ops: 64,
backend: BackendKind::default(),
valkey: ValkeyServerConfig::default(),
postgres: PostgresServerConfig::default(),
}
}
}
#[derive(Debug, thiserror::Error)]
pub enum ConfigError {
#[error("invalid value for {var}: {message}")]
InvalidValue { var: String, message: String },
}
fn env_or(key: &str, default: &str) -> String {
std::env::var(key).unwrap_or_else(|_| default.to_owned())
}
fn env_bool(key: &str) -> bool {
std::env::var(key)
.map(|v| v == "1" || v.eq_ignore_ascii_case("true"))
.unwrap_or(false)
}
fn env_u16(key: &str, default: u16) -> Result<u16, ConfigError> {
match std::env::var(key) {
Ok(v) => v.parse().map_err(|_| ConfigError::InvalidValue {
var: key.to_owned(),
message: format!("expected u16, got '{v}'"),
}),
Err(_) => Ok(default),
}
}
fn env_u16_positive(key: &str, default: u16) -> Result<u16, ConfigError> {
let val = env_u16(key, default)?;
if val == 0 {
return Err(ConfigError::InvalidValue {
var: key.to_owned(),
message: "must be > 0 (used as divisor in partition math)".to_owned(),
});
}
Ok(val)
}
fn env_u64(key: &str, default: u64) -> Result<u64, ConfigError> {
match std::env::var(key) {
Ok(v) => v.parse().map_err(|_| ConfigError::InvalidValue {
var: key.to_owned(),
message: format!("expected u64, got '{v}'"),
}),
Err(_) => Ok(default),
}
}
fn env_u32_positive(key: &str, default: u32) -> Result<u32, ConfigError> {
let val = match std::env::var(key) {
Ok(v) => v.parse::<u32>().map_err(|_| ConfigError::InvalidValue {
var: key.to_owned(),
message: format!("expected u32, got '{v}'"),
})?,
Err(_) => default,
};
if val == 0 {
return Err(ConfigError::InvalidValue {
var: key.to_owned(),
message: "must be > 0 (semaphore size)".to_owned(),
});
}
Ok(val)
}