use clap::Parser;
use dashmap::DashMap;
use figment::{
Figment,
providers::{Env, Format, Yaml},
};
use serde::{Deserialize, Serialize};
use std::{
collections::HashMap,
ffi::OsString,
path::{Path, PathBuf},
sync::Arc,
time::Duration,
};
use url::Url;
use crate::api::models::users::Role;
use crate::errors::Error;
use crate::sample_files::SampleFilesConfig;
/// Notification-channel name broadcast when the onwards (proxy) configuration changes.
/// NOTE(review): the constant is named "onwards" but the value is "auth_config_changed" —
/// presumably kept for wire compatibility with existing listeners; confirm before renaming
/// either side (changing the string would break cross-process notifications).
pub static ONWARDS_CONFIG_CHANGED_CHANNEL: &str = "auth_config_changed";
/// Command-line arguments, parsed by clap.
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
pub struct Args {
    /// Path to the YAML configuration file; also settable via the
    /// `DWCTL_CONFIG` environment variable. Defaults to `./config.yaml`.
    #[arg(short = 'f', long, env = "DWCTL_CONFIG", default_value = "config.yaml")]
    pub config: PathBuf,
    /// Validate-only flag; the actual exit-after-validation behavior is
    /// handled by the binary's entry point (not visible in this file).
    #[arg(long)]
    pub validate: bool,
}
/// Top-level application configuration, assembled via figment from a YAML
/// file plus environment providers (see `Config::load`).
///
/// `deny_unknown_fields` turns typos in config files into hard errors, while
/// `serde(default)` allows any field to be omitted.
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(default, deny_unknown_fields)]
pub struct Config {
    /// Bind address and port for the HTTP server.
    pub host: String,
    pub port: u16,
    /// Public URL of the dashboard frontend.
    pub dashboard_url: String,
    /// Legacy flat database URL; when set, `Config::load` folds it into
    /// `database` as an `External` configuration.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub database_url: Option<String>,
    /// Legacy flat replica URL, folded into `database` the same way.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub database_replica_url: Option<String>,
    pub database: DatabaseConfig,
    /// Threshold (milliseconds) above which a statement counts as slow.
    pub slow_statement_threshold_ms: u64,
    /// Bootstrap admin account credentials.
    pub admin_email: String,
    pub admin_password: Option<String>,
    /// Required when native auth is enabled (enforced in `Config::validate`).
    pub secret_key: Option<String>,
    pub model_sources: Vec<ModelSource>,
    pub metadata: Metadata,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub payment: Option<PaymentConfig>,
    pub auth: AuthConfig,
    pub batches: BatchConfig,
    pub background_services: BackgroundServicesConfig,
    pub enable_metrics: bool,
    pub enable_request_logging: bool,
    pub enable_analytics: bool,
    #[serde(default)]
    pub analytics: AnalyticsConfig,
    pub enable_otel_export: bool,
    pub credits: CreditsConfig,
    pub sample_files: SampleFilesConfig,
    pub limits: LimitsConfig,
    pub email: EmailConfig,
    pub onwards: OnwardsConfig,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub onboarding_url: Option<String>,
    pub support_email: String,
    #[serde(default)]
    pub connections: ConnectionsConfig,
}
/// Connection-pool tuning knobs shared by all database pools in this file.
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(default, deny_unknown_fields)]
pub struct PoolSettings {
    pub max_connections: u32,
    pub min_connections: u32,
    /// How long to wait for a free connection before giving up (seconds).
    pub acquire_timeout_secs: u64,
    /// Idle connections are closed after this many seconds.
    pub idle_timeout_secs: u64,
    /// Connections are recycled after this many seconds regardless of use.
    pub max_lifetime_secs: u64,
}
impl Default for PoolSettings {
fn default() -> Self {
Self {
max_connections: 10,
min_connections: 0,
acquire_timeout_secs: 30,
idle_timeout_secs: 600, max_lifetime_secs: 1800, }
}
}
/// Where a component (e.g. fusillade, outlet) stores its data.
///
/// Internally tagged by `mode`: either a named `schema` inside the main
/// database, or a `dedicated` database reachable at its own URL.
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(tag = "mode", rename_all = "snake_case")]
pub enum ComponentDb {
    /// A schema within the main database.
    Schema {
        name: String,
        #[serde(default)]
        pool: PoolSettings,
        /// Optional replica pool tuning; the primary `pool` is used when
        /// absent (see `replica_pool_settings`).
        #[serde(default, skip_serializing_if = "Option::is_none")]
        replica_pool: Option<PoolSettings>,
    },
    /// A separate database with its own connection URL.
    Dedicated {
        url: String,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        replica_url: Option<String>,
        #[serde(default)]
        pool: PoolSettings,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        replica_pool: Option<PoolSettings>,
    },
}
impl ComponentDb {
    /// Primary pool settings for this component, regardless of mode.
    pub fn pool_settings(&self) -> &PoolSettings {
        // Both variants carry a `pool` field, so an irrefutable or-pattern works.
        let (ComponentDb::Schema { pool, .. } | ComponentDb::Dedicated { pool, .. }) = self;
        pool
    }
    /// Replica pool settings, falling back to the primary pool when unset.
    pub fn replica_pool_settings(&self) -> &PoolSettings {
        let (ComponentDb::Schema { pool, replica_pool, .. }
        | ComponentDb::Dedicated { pool, replica_pool, .. }) = self;
        replica_pool.as_ref().unwrap_or(pool)
    }
}
/// Default storage for the fusillade component: a schema in the main database
/// with a larger pool (20 max / 2 min) than the global pool default.
pub fn default_fusillade_component() -> ComponentDb {
    let pool = PoolSettings {
        min_connections: 2,
        max_connections: 20,
        acquire_timeout_secs: 30,
        idle_timeout_secs: 600,
        max_lifetime_secs: 1800,
    };
    ComponentDb::Schema {
        name: String::from("fusillade"),
        pool,
        replica_pool: None,
    }
}
/// Default storage for the outlet component: a schema in the main database
/// with a small pool (5 max / 0 min).
pub fn default_outlet_component() -> ComponentDb {
    let pool = PoolSettings {
        min_connections: 0,
        max_connections: 5,
        acquire_timeout_secs: 30,
        idle_timeout_secs: 600,
        max_lifetime_secs: 1800,
    };
    ComponentDb::Schema {
        name: String::from("outlet"),
        pool,
        replica_pool: None,
    }
}
pub fn default_underway_pool() -> PoolSettings {
PoolSettings {
max_connections: 100,
min_connections: 0,
..Default::default()
}
}
/// Main database configuration, internally tagged by `type`.
///
/// Both variants also carry per-component settings (fusillade, outlet,
/// underway) so components can be tuned or pointed elsewhere independently.
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(tag = "type", rename_all = "lowercase")]
pub enum DatabaseConfig {
    /// An embedded database managed by this process.
    Embedded {
        /// On-disk location; implementation-chosen when `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        data_dir: Option<PathBuf>,
        /// Whether data survives process restarts.
        #[serde(default)]
        persistent: bool,
        #[serde(default)]
        pool: PoolSettings,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        replica_pool: Option<PoolSettings>,
        #[serde(default = "default_fusillade_component")]
        fusillade: ComponentDb,
        #[serde(default = "default_outlet_component")]
        outlet: ComponentDb,
        #[serde(default = "default_underway_pool")]
        underway_pool: PoolSettings,
    },
    /// An externally managed database reachable at `url`.
    External {
        url: String,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        replica_url: Option<String>,
        #[serde(default)]
        pool: PoolSettings,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        replica_pool: Option<PoolSettings>,
        #[serde(default = "default_fusillade_component")]
        fusillade: ComponentDb,
        #[serde(default = "default_outlet_component")]
        outlet: ComponentDb,
        #[serde(default = "default_underway_pool")]
        underway_pool: PoolSettings,
    },
}
impl Default for DatabaseConfig {
    /// Feature-gated default: embedded database when the `embedded-db`
    /// feature is compiled in, otherwise an external Postgres on localhost.
    /// Exactly one of the two cfg blocks survives compilation, so the block
    /// expression is the function's tail value either way.
    fn default() -> Self {
        #[cfg(feature = "embedded-db")]
        {
            DatabaseConfig::Embedded {
                data_dir: None,
                persistent: false,
                pool: PoolSettings::default(),
                replica_pool: None,
                fusillade: default_fusillade_component(),
                outlet: default_outlet_component(),
                underway_pool: default_underway_pool(),
            }
        }
        #[cfg(not(feature = "embedded-db"))]
        {
            DatabaseConfig::External {
                url: "postgres://localhost:5432/control_layer".to_string(),
                replica_url: None,
                pool: PoolSettings::default(),
                replica_pool: None,
                fusillade: default_fusillade_component(),
                outlet: default_outlet_component(),
                underway_pool: default_underway_pool(),
            }
        }
    }
}
impl DatabaseConfig {
    /// Whether this configuration uses the embedded database variant.
    pub fn is_embedded(&self) -> bool {
        // Only two variants exist, so "not external" is equivalent.
        !matches!(self, DatabaseConfig::External { .. })
    }
    /// Connection URL; `None` for embedded databases.
    pub fn external_url(&self) -> Option<&str> {
        if let DatabaseConfig::External { url, .. } = self {
            Some(url.as_str())
        } else {
            None
        }
    }
    /// Read-replica URL; `None` unless an external database configured one.
    pub fn external_replica_url(&self) -> Option<&str> {
        if let DatabaseConfig::External { replica_url, .. } = self {
            replica_url.as_deref()
        } else {
            None
        }
    }
    /// Data-directory override; `None` for external databases or when unset.
    pub fn embedded_data_dir(&self) -> Option<PathBuf> {
        if let DatabaseConfig::Embedded { data_dir, .. } = self {
            data_dir.clone()
        } else {
            None
        }
    }
    /// True only for embedded databases flagged as persistent.
    pub fn embedded_persistent(&self) -> bool {
        matches!(self, DatabaseConfig::Embedded { persistent: true, .. })
    }
    /// Pool settings for the main application database.
    pub fn main_pool_settings(&self) -> &PoolSettings {
        // Both variants carry `pool`, so an irrefutable or-pattern suffices.
        let (DatabaseConfig::Embedded { pool, .. } | DatabaseConfig::External { pool, .. }) = self;
        pool
    }
    /// Replica pool settings, falling back to the primary pool settings.
    pub fn main_replica_pool_settings(&self) -> &PoolSettings {
        let (DatabaseConfig::Embedded { pool, replica_pool, .. }
        | DatabaseConfig::External { pool, replica_pool, .. }) = self;
        replica_pool.as_ref().unwrap_or(pool)
    }
    /// Storage configuration for the fusillade component.
    pub fn fusillade(&self) -> &ComponentDb {
        let (DatabaseConfig::Embedded { fusillade, .. }
        | DatabaseConfig::External { fusillade, .. }) = self;
        fusillade
    }
    /// Storage configuration for the outlet component.
    pub fn outlet(&self) -> &ComponentDb {
        let (DatabaseConfig::Embedded { outlet, .. }
        | DatabaseConfig::External { outlet, .. }) = self;
        outlet
    }
    /// Pool settings for the underway job queue.
    pub fn underway_pool_settings(&self) -> &PoolSettings {
        let (DatabaseConfig::Embedded { underway_pool, .. }
        | DatabaseConfig::External { underway_pool, .. }) = self;
        underway_pool
    }
}
/// Payment backend selection. Externally tagged with lowercase keys:
/// `{ stripe: { ... } }` or `{ dummy: { ... } }`.
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum PaymentConfig {
    Stripe(StripeConfig),
    Dummy(DummyConfig),
}
/// Stripe payment settings.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct StripeConfig {
    pub api_key: String,
    /// Secret used to verify incoming Stripe webhook signatures.
    pub webhook_secret: String,
    pub price_id: String,
    #[serde(default)]
    pub enable_invoice_creation: bool,
    /// Optional terms-of-service text shown for auto top-up consent.
    pub auto_topup_terms_of_service_text: Option<String>,
    pub tax_code: Option<String>,
}
/// Fake payment backend used for testing; always "charges" a fixed amount.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct DummyConfig {
    pub amount: rust_decimal::Decimal,
}
/// Deployment metadata surfaced to clients (branding, docs links, etc.).
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(default, deny_unknown_fields)]
pub struct Metadata {
    pub region: Option<String>,
    pub organization: Option<String>,
    pub docs_url: String,
    pub docs_jsonl_url: Option<String>,
    pub title: Option<String>,
    pub ai_api_base_url: Option<String>,
}
impl Default for Metadata {
fn default() -> Self {
Self {
region: None,
organization: None,
docs_url: "https://docs.doubleword.ai/control-layer".to_string(),
docs_jsonl_url: None,
title: None,
ai_api_base_url: None,
}
}
}
/// An upstream model provider that is periodically synced.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct ModelSource {
    pub name: String,
    pub url: Url,
    pub api_key: Option<String>,
    /// Re-sync period, parsed from a humantime string (e.g. "10s", "5m").
    #[serde(default = "ModelSource::default_sync_interval")]
    #[serde(with = "humantime_serde")]
    pub sync_interval: Duration,
    /// Models to register up front, optionally exposed to the everyone group.
    #[serde(default)]
    pub default_models: Option<Vec<DefaultModel>>,
}
/// A model pre-registered for a `ModelSource`.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct DefaultModel {
    pub name: String,
    /// When true, the model is made available to the everyone group.
    pub add_to_everyone_group: bool,
}
/// Authentication configuration: native (password/session) auth,
/// reverse-proxy header auth, and shared security settings.
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(default, deny_unknown_fields)]
pub struct AuthConfig {
    pub native: NativeAuthConfig,
    pub proxy_header: ProxyHeaderAuthConfig,
    pub security: SecurityConfig,
    /// Roles granted to newly created users.
    pub default_user_roles: Vec<Role>,
}
impl Default for AuthConfig {
fn default() -> Self {
Self {
native: NativeAuthConfig::default(),
proxy_header: ProxyHeaderAuthConfig::default(),
security: SecurityConfig::default(),
default_user_roles: vec![Role::StandardUser],
}
}
}
/// Built-in username/password authentication.
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(default, deny_unknown_fields)]
pub struct NativeAuthConfig {
    pub enabled: bool,
    /// Whether users may self-register.
    pub allow_registration: bool,
    pub password: PasswordConfig,
    pub session: SessionConfig,
    /// How long password-reset tokens stay valid (humantime string).
    #[serde(with = "humantime_serde")]
    pub password_reset_token_duration: Duration,
}
/// Authentication via identity headers injected by a trusted reverse proxy.
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(default, deny_unknown_fields)]
pub struct ProxyHeaderAuthConfig {
    pub enabled: bool,
    /// Header carrying the user identity.
    pub header_name: String,
    pub email_header_name: String,
    pub groups_field_name: String,
    /// Whether groups from the IdP should be mirrored locally.
    pub import_idp_groups: bool,
    /// SSO groups that are never imported.
    pub blacklisted_sso_groups: Vec<String>,
    pub provider_field_name: String,
    /// Create local user records on first sight of a new identity.
    pub auto_create_users: bool,
}
/// Session-cookie settings for native auth.
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(default, deny_unknown_fields)]
pub struct SessionConfig {
    /// Session lifetime (humantime string).
    #[serde(with = "humantime_serde")]
    pub timeout: Duration,
    pub cookie_name: String,
    pub cookie_secure: bool,
    pub cookie_same_site: String,
    /// Optional Domain attribute; validated in `Config::validate` and an
    /// empty string is normalized to `None` during `Config::load`.
    pub cookie_domain: Option<String>,
}
/// Password length policy and Argon2 hashing cost parameters.
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(default, deny_unknown_fields)]
pub struct PasswordConfig {
    pub min_length: usize,
    pub max_length: usize,
    /// Argon2 memory cost in KiB.
    pub argon2_memory_kib: u32,
    pub argon2_iterations: u32,
    pub argon2_parallelism: u32,
}
/// Security settings shared across auth methods.
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(default, deny_unknown_fields)]
pub struct SecurityConfig {
    /// JWT lifetime (humantime string); bounds enforced in `Config::validate`.
    #[serde(with = "humantime_serde")]
    pub jwt_expiry: Duration,
    pub cors: CorsConfig,
}
/// CORS policy. `Config::validate` rejects an empty origin list and the
/// wildcard-with-credentials combination.
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(default, deny_unknown_fields)]
pub struct CorsConfig {
    pub allowed_origins: Vec<CorsOrigin>,
    pub allow_credentials: bool,
    /// Preflight cache duration in seconds, if set.
    pub max_age: Option<u64>,
    pub exposed_headers: Vec<String>,
}
/// Outgoing email settings. The transport variant is flattened into this
/// struct, which is why `deny_unknown_fields` cannot be used here.
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(default)]
pub struct EmailConfig {
    #[serde(flatten)]
    pub transport: EmailTransportConfig,
    pub from_email: String,
    pub from_name: String,
    pub reply_to: Option<String>,
    /// Optional directory of email templates on disk.
    pub templates_dir: Option<String>,
}
/// How emails are delivered, internally tagged by `type`:
/// a real SMTP server or a directory on disk (for development).
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(tag = "type", rename_all = "lowercase")]
pub enum EmailTransportConfig {
    Smtp {
        host: String,
        port: u16,
        username: String,
        password: String,
        use_tls: bool,
    },
    /// Write emails as files under `path` instead of sending them.
    File {
        path: String,
    },
}
/// Batch-file handling: expiry windows and streaming buffer sizes.
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(default, deny_unknown_fields)]
pub struct FilesConfig {
    pub default_expiry_seconds: i64,
    pub min_expiry_seconds: i64,
    pub max_expiry_seconds: i64,
    pub upload_buffer_size: usize,
    /// Must be non-zero when batches/daemon run (checked in `Config::validate`).
    pub download_buffer_size: usize,
    /// Rows per DB insert batch; must be non-zero (checked in `Config::validate`).
    pub batch_insert_size: usize,
}
impl Default for FilesConfig {
fn default() -> Self {
Self {
default_expiry_seconds: 24 * 60 * 60, min_expiry_seconds: 60 * 60, max_expiry_seconds: 30 * 24 * 60 * 60, upload_buffer_size: 100,
download_buffer_size: 100,
batch_insert_size: 5000,
}
}
}
/// Resource limits, grouped by subject.
#[derive(Debug, Clone, Default, Deserialize, Serialize)]
#[serde(default, deny_unknown_fields)]
pub struct LimitsConfig {
    pub files: FileLimitsConfig,
    pub requests: RequestLimitsConfig,
}
/// Per-request limits.
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(default, deny_unknown_fields)]
pub struct RequestLimitsConfig {
    /// Maximum accepted request body, in bytes.
    pub max_body_size: u64,
}
impl Default for RequestLimitsConfig {
fn default() -> Self {
Self {
max_body_size: 10 * 1024 * 1024, }
}
}
/// Settings forwarded to the onwards (proxy) layer.
/// NOTE(review): the semantics of `strict_mode` are not visible in this file —
/// confirm against the onwards component before documenting further.
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(default, deny_unknown_fields)]
#[derive(Default)]
pub struct OnwardsConfig {
    pub strict_mode: bool,
}
/// Upload limits for batch files.
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(default, deny_unknown_fields)]
pub struct FileLimitsConfig {
    /// Maximum size of a single uploaded file, in bytes.
    pub max_file_size: u64,
    pub max_requests_per_file: usize,
    pub max_concurrent_uploads: usize,
    /// How many uploads may queue behind the concurrency limit.
    pub max_waiting_uploads: usize,
    pub max_upload_wait_secs: u64,
}
impl Default for FileLimitsConfig {
fn default() -> Self {
Self {
max_file_size: 100 * 1024 * 1024, max_requests_per_file: 0, max_concurrent_uploads: 0,
max_waiting_uploads: 20,
max_upload_wait_secs: 60,
}
}
}
/// Batch-processing configuration.
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(default, deny_unknown_fields)]
pub struct BatchConfig {
    pub enabled: bool,
    /// Completion windows users may request (e.g. "24h").
    pub allowed_completion_windows: Vec<String>,
    /// Per-window deadline relaxation factors; entries must be finite and
    /// >= 0 (enforced by the custom deserializer). Missing windows get 1.0.
    #[serde(default, deserialize_with = "deserialize_relaxation_factors")]
    pub window_relaxation_factors: HashMap<String, f32>,
    /// URL paths batch requests may target.
    pub allowed_url_paths: Vec<String>,
    #[serde(default)]
    pub async_requests: AsyncRequestsConfig,
    pub files: FilesConfig,
    /// Assumed model throughput (requests/unit); must be > 0.
    #[serde(default = "default_batch_throughput", deserialize_with = "deserialize_positive_throughput")]
    pub default_throughput: f32,
    /// Capacity-reservation TTL in seconds; must be > 0.
    #[serde(
        default = "default_reservation_ttl_secs",
        deserialize_with = "deserialize_positive_reservation_ttl"
    )]
    pub reservation_ttl_secs: i64,
}
/// Single (non-batch) async request handling.
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(default, deny_unknown_fields)]
pub struct AsyncRequestsConfig {
    pub enabled: bool,
    /// Completion window assigned to async requests; must be one of
    /// `BatchConfig::allowed_completion_windows` (see `BatchConfig::validate`).
    pub completion_window: String,
}
impl Default for AsyncRequestsConfig {
fn default() -> Self {
Self {
enabled: true,
completion_window: "1h".to_string(),
}
}
}
/// Default assumed throughput when none is configured.
fn default_batch_throughput() -> f32 {
    const DEFAULT_THROUGHPUT: f32 = 100.0;
    DEFAULT_THROUGHPUT
}
/// Default reservation TTL: ten minutes, in seconds.
fn default_reservation_ttl_secs() -> i64 {
    const TEN_MINUTES_SECS: i64 = 10 * 60;
    TEN_MINUTES_SECS
}
/// Deserializes the relaxation-factor map, rejecting non-finite or negative
/// entries with a message naming the offending window.
fn deserialize_relaxation_factors<'de, D>(deserializer: D) -> Result<HashMap<String, f32>, D::Error>
where
    D: serde::Deserializer<'de>,
{
    use serde::de::Error;
    let factors = HashMap::<String, f32>::deserialize(deserializer)?;
    for (window, factor) in factors.iter() {
        // NaN / infinity first, then sign, mirroring the per-entry checks.
        if !factor.is_finite() {
            return Err(D::Error::custom(format!(
                "window_relaxation_factors[{}] must be a finite number, got {}",
                window, factor
            )));
        }
        if *factor < 0.0 {
            return Err(D::Error::custom(format!(
                "window_relaxation_factors[{}] must be >= 0.0, got {}",
                window, factor
            )));
        }
    }
    Ok(factors)
}
impl BatchConfig {
    /// Relaxation factor for a completion window; 1.0 when not configured.
    pub fn relaxation_factor(&self, completion_window: &str) -> f32 {
        match self.window_relaxation_factors.get(completion_window) {
            Some(&factor) => factor,
            None => 1.0,
        }
    }
}
/// Deserializes `reservation_ttl_secs`: absent/null falls back to the
/// default, non-positive values are rejected.
fn deserialize_positive_reservation_ttl<'de, D>(deserializer: D) -> Result<i64, D::Error>
where
    D: serde::Deserializer<'de>,
{
    use serde::de::Error;
    let Some(value) = Option::<i64>::deserialize(deserializer)? else {
        return Ok(default_reservation_ttl_secs());
    };
    if value <= 0 {
        Err(D::Error::custom(format!(
            "reservation_ttl_secs must be positive (> 0), got {}",
            value
        )))
    } else {
        Ok(value)
    }
}
/// Deserializes `default_throughput`: absent/null falls back to the default,
/// values <= 0 are rejected. (NaN passes the `<= 0.0` test and is therefore
/// accepted, matching the original comparison exactly.)
fn deserialize_positive_throughput<'de, D>(deserializer: D) -> Result<f32, D::Error>
where
    D: serde::Deserializer<'de>,
{
    use serde::de::Error;
    let Some(value) = Option::<f32>::deserialize(deserializer)? else {
        return Ok(default_batch_throughput());
    };
    if value <= 0.0 {
        Err(D::Error::custom(format!(
            "default_throughput must be positive (> 0), got {}",
            value
        )))
    } else {
        Ok(value)
    }
}
impl Default for BatchConfig {
fn default() -> Self {
Self {
enabled: true,
allowed_completion_windows: vec!["24h".to_string()],
window_relaxation_factors: HashMap::new(),
allowed_url_paths: vec![
"/v1/chat/completions".to_string(),
"/v1/completions".to_string(),
"/v1/embeddings".to_string(),
"/v1/responses".to_string(),
],
async_requests: AsyncRequestsConfig::default(),
files: FilesConfig::default(),
default_throughput: default_batch_throughput(),
reservation_ttl_secs: default_reservation_ttl_secs(),
}
}
}
impl BatchConfig {
    /// Logs — but does not fail on — a configuration mismatch: if async
    /// requests are enabled, their completion window must appear in
    /// `allowed_completion_windows`, otherwise (per the log message)
    /// async requests will fail at runtime.
    pub fn validate(&self) {
        if self.async_requests.enabled && !self.allowed_completion_windows.contains(&self.async_requests.completion_window) {
            tracing::error!(
                async_window = %self.async_requests.completion_window,
                allowed = ?self.allowed_completion_windows,
                "async_requests.completion_window is not in allowed_completion_windows — async requests will fail"
            );
        }
    }
}
/// Batch daemon configuration, translated to `fusillade::daemon::DaemonConfig`
/// by `to_fusillade_config`.
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(default, deny_unknown_fields)]
pub struct DaemonConfig {
    /// Whether/where the daemon runs (always, never, or on the elected leader).
    pub enabled: DaemonEnabled,
    pub claim_batch_size: usize,
    pub default_model_concurrency: usize,
    pub claim_interval_ms: u64,
    pub max_retries: Option<u32>,
    pub stop_before_deadline_ms: Option<i64>,
    pub backoff_ms: u64,
    pub backoff_factor: u64,
    pub max_backoff_ms: u64,
    /// Deprecated single timeout; when set it is split across the three
    /// granular timeouts below (see `to_fusillade_config_with_limits`).
    pub timeout_ms: Option<u64>,
    pub first_chunk_timeout_ms: u64,
    pub chunk_timeout_ms: u64,
    pub body_timeout_ms: u64,
    pub status_log_interval_ms: Option<u64>,
    pub claim_timeout_ms: u64,
    pub processing_timeout_ms: u64,
    /// Per-model escalation overrides, keyed by model name.
    pub model_escalations: HashMap<String, fusillade::ModelEscalationConfig>,
    #[serde(default = "default_batch_metadata_fields_dwctl")]
    pub batch_metadata_fields: Vec<String>,
    pub purge_interval_ms: u64,
    pub purge_batch_size: i64,
    pub purge_throttle_ms: u64,
    #[serde(default)]
    pub streamable_endpoints: Vec<String>,
    /// Weight in [0, 1] (enforced by the custom deserializer).
    #[serde(default = "default_urgency_weight", deserialize_with = "deserialize_urgency_weight")]
    pub urgency_weight: f64,
    #[serde(default)]
    pub inject_deadline_priority: bool,
}
/// Default urgency weight: halfway between ignoring and maximizing urgency.
fn default_urgency_weight() -> f64 {
    const DEFAULT_WEIGHT: f64 = 0.5;
    DEFAULT_WEIGHT
}
/// Deserializes `urgency_weight`: absent/null falls back to the default;
/// the value must be finite and within [0.0, 1.0].
fn deserialize_urgency_weight<'de, D>(deserializer: D) -> Result<f64, D::Error>
where
    D: serde::Deserializer<'de>,
{
    use serde::de::Error;
    let Some(value) = Option::<f64>::deserialize(deserializer)? else {
        return Ok(default_urgency_weight());
    };
    if !value.is_finite() {
        return Err(D::Error::custom(format!("urgency_weight must be a finite number, got {}", value)));
    }
    if !(0.0..=1.0).contains(&value) {
        return Err(D::Error::custom(format!(
            "urgency_weight must be between 0.0 and 1.0, got {}",
            value
        )));
    }
    Ok(value)
}
/// Batch metadata fields forwarded to fusillade by default.
fn default_batch_metadata_fields_dwctl() -> Vec<String> {
    ["id", "endpoint", "created_at", "completion_window", "request_source"]
        .iter()
        .map(|field| field.to_string())
        .collect()
}
impl Default for DaemonConfig {
    fn default() -> Self {
        Self {
            // Run only on the elected leader by default.
            enabled: DaemonEnabled::Leader,
            claim_batch_size: 100,
            default_model_concurrency: 10,
            claim_interval_ms: 1000,
            max_retries: Some(1000),
            // Stop picking up work 15 minutes before a batch deadline.
            stop_before_deadline_ms: Some(900_000),
            backoff_ms: 1000,
            backoff_factor: 2,
            max_backoff_ms: 10000,
            // Deprecated knob left unset; the three granular timeouts below
            // default to 24 h each (these exact values are used as the
            // "unchanged" sentinel in `to_fusillade_config_with_limits`).
            timeout_ms: None,
            first_chunk_timeout_ms: 86_400_000,
            chunk_timeout_ms: 86_400_000,
            body_timeout_ms: 86_400_000,
            status_log_interval_ms: Some(2000),
            claim_timeout_ms: 60000,
            processing_timeout_ms: 600000,
            batch_metadata_fields: default_batch_metadata_fields_dwctl(),
            model_escalations: HashMap::new(),
            purge_interval_ms: 600_000,
            purge_batch_size: 1000,
            purge_throttle_ms: 100,
            streamable_endpoints: Vec::new(),
            urgency_weight: default_urgency_weight(),
            inject_deadline_priority: false,
        }
    }
}
impl DaemonConfig {
    /// Converts to the fusillade daemon config without per-model capacity limits.
    pub fn to_fusillade_config(&self) -> fusillade::daemon::DaemonConfig {
        self.to_fusillade_config_with_limits(None)
    }
    /// Converts to the fusillade daemon config, optionally sharing a
    /// per-model capacity-limit map with the caller.
    pub fn to_fusillade_config_with_limits(
        &self,
        model_capacity_limits: Option<std::sync::Arc<dashmap::DashMap<String, usize>>>,
    ) -> fusillade::daemon::DaemonConfig {
        // Back-compat shim: if the deprecated `timeout_ms` is set AND all three
        // granular timeouts are still at their 24 h defaults (i.e. the user did
        // not override them), derive the granular values from the legacy one:
        // 90% to first-chunk, 10% to body, chunk stays at the 24 h default.
        let (first_chunk_timeout_ms, chunk_timeout_ms, body_timeout_ms) = if let Some(timeout) = self.timeout_ms {
            if self.first_chunk_timeout_ms == 86_400_000 && self.chunk_timeout_ms == 86_400_000 && self.body_timeout_ms == 86_400_000 {
                tracing::warn!(
                    timeout_ms = timeout,
                    "batch_daemon.timeout_ms is deprecated; \
use first_chunk_timeout_ms, chunk_timeout_ms, and body_timeout_ms instead"
                );
                (timeout * 9 / 10, 86_400_000, timeout / 10)
            } else {
                // Explicit granular values win over the deprecated knob.
                (self.first_chunk_timeout_ms, self.chunk_timeout_ms, self.body_timeout_ms)
            }
        } else {
            (self.first_chunk_timeout_ms, self.chunk_timeout_ms, self.body_timeout_ms)
        };
        fusillade::daemon::DaemonConfig {
            claim_batch_size: self.claim_batch_size,
            // Use the shared map when provided; otherwise start empty.
            model_concurrency_limits: model_capacity_limits.unwrap_or_else(|| std::sync::Arc::new(dashmap::DashMap::new())),
            model_escalations: Arc::new(DashMap::from_iter(self.model_escalations.clone())),
            claim_interval_ms: self.claim_interval_ms,
            max_retries: self.max_retries,
            stop_before_deadline_ms: self.stop_before_deadline_ms,
            backoff_ms: self.backoff_ms,
            backoff_factor: self.backoff_factor,
            max_backoff_ms: self.max_backoff_ms,
            first_chunk_timeout_ms,
            chunk_timeout_ms,
            body_timeout_ms,
            status_log_interval_ms: self.status_log_interval_ms,
            claim_timeout_ms: self.claim_timeout_ms,
            processing_timeout_ms: self.processing_timeout_ms,
            batch_metadata_fields: self.batch_metadata_fields.clone(),
            purge_interval_ms: self.purge_interval_ms,
            purge_batch_size: self.purge_batch_size,
            purge_throttle_ms: self.purge_throttle_ms,
            streamable_endpoints: self.streamable_endpoints.clone(),
            urgency_weight: self.urgency_weight,
            inject_deadline_priority: self.inject_deadline_priority,
            // Remaining fusillade fields keep their own defaults.
            ..Default::default()
        }
    }
}
/// Where the batch daemon runs. `Leader` presumably means "only on the
/// leader-elected instance" (see `LeaderElectionConfig`) — confirm in the
/// daemon startup code, which is not in this file.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum DaemonEnabled {
    Always,
    Never,
    Leader,
}
/// Leader-election toggle for multi-instance deployments.
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(default, deny_unknown_fields)]
pub struct LeaderElectionConfig {
    pub enabled: bool,
}
impl Default for LeaderElectionConfig {
fn default() -> Self {
Self { enabled: true }
}
}
/// Notification delivery: polling cadence and webhook settings.
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(default, deny_unknown_fields)]
pub struct NotificationsConfig {
    pub enabled: bool,
    /// How often to poll for pending notifications (humantime string).
    #[serde(with = "humantime_serde")]
    pub poll_interval: Duration,
    pub webhooks: WebhookConfig,
}
impl Default for NotificationsConfig {
fn default() -> Self {
Self {
enabled: true,
poll_interval: Duration::from_secs(30),
webhooks: WebhookConfig::default(),
}
}
}
/// One sub-config per background service.
#[derive(Debug, Clone, Deserialize, Serialize, Default)]
#[serde(default, deny_unknown_fields)]
pub struct BackgroundServicesConfig {
    pub onwards_sync: OnwardsSyncConfig,
    pub probe_scheduler: ProbeSchedulerConfig,
    pub batch_daemon: DaemonConfig,
    pub leader_election: LeaderElectionConfig,
    pub pool_metrics: PoolMetricsSamplerConfig,
    pub notifications: NotificationsConfig,
    pub sync_workers: SyncWorkersConfig,
}
/// Sampler that periodically records database-pool metrics.
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(default, deny_unknown_fields)]
pub struct PoolMetricsSamplerConfig {
    /// Sampling period (humantime string).
    #[serde(with = "humantime_serde")]
    pub sample_interval: Duration,
}
impl Default for PoolMetricsSamplerConfig {
fn default() -> Self {
Self {
sample_interval: Duration::from_secs(5),
}
}
}
/// Sync of configuration to the onwards proxy layer.
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(default, deny_unknown_fields)]
pub struct OnwardsSyncConfig {
    pub enabled: bool,
    /// Interval for the fallback (timer-driven) sync, in milliseconds.
    pub fallback_interval_milliseconds: u64,
}
impl Default for OnwardsSyncConfig {
fn default() -> Self {
Self {
enabled: true,
fallback_interval_milliseconds: 10_000, }
}
}
/// Toggle for the probe scheduler background service.
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(default, deny_unknown_fields)]
pub struct ProbeSchedulerConfig {
    pub enabled: bool,
}
impl Default for ProbeSchedulerConfig {
fn default() -> Self {
Self { enabled: true }
}
}
/// Webhook delivery: timeouts, retry schedule, and concurrency limits.
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(default, deny_unknown_fields)]
pub struct WebhookConfig {
    pub enabled: bool,
    pub timeout_secs: u64,
    /// Delays (seconds) between successive delivery attempts.
    pub retry_schedule_secs: Vec<i64>,
    /// Consecutive failures before a destination's circuit opens.
    pub circuit_breaker_threshold: i32,
    pub claim_batch_size: i64,
    pub max_concurrent_sends: usize,
    pub channel_capacity: usize,
}
impl Default for WebhookConfig {
fn default() -> Self {
Self {
enabled: true,
timeout_secs: 30,
retry_schedule_secs: vec![0, 5, 300, 1800, 7200, 28800, 86400],
circuit_breaker_threshold: 10,
claim_batch_size: 50,
max_concurrent_sends: 20,
channel_capacity: 200,
}
}
}
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(untagged)]
pub enum CorsOrigin {
#[serde(deserialize_with = "parse_wildcard")]
Wildcard,
#[serde(deserialize_with = "parse_url")]
Url(Url),
}
fn parse_wildcard<'de, D>(deserializer: D) -> Result<(), D::Error>
where
D: serde::Deserializer<'de>,
{
let s: String = Deserialize::deserialize(deserializer)?;
if s == "*" {
Ok(())
} else {
Err(serde::de::Error::custom("Expected '*'"))
}
}
fn parse_url<'de, D>(deserializer: D) -> Result<Url, D::Error>
where
D: serde::Deserializer<'de>,
{
let s: String = Deserialize::deserialize(deserializer)?;
Url::parse(&s).map_err(serde::de::Error::custom)
}
/// Credit-granting policy.
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(default, deny_unknown_fields)]
pub struct CreditsConfig {
    /// Credits granted to newly created standard users.
    pub initial_credits_for_standard_users: rust_decimal::Decimal,
}
impl Default for CreditsConfig {
fn default() -> Self {
Self {
initial_credits_for_standard_users: rust_decimal::Decimal::ZERO,
}
}
}
/// Analytics pipeline: batching, retry, and balance-notification cadence.
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(default, deny_unknown_fields)]
pub struct AnalyticsConfig {
    pub batch_size: usize,
    pub max_retries: u32,
    /// Base delay for retry backoff, in milliseconds.
    pub retry_base_delay_ms: u64,
    pub balance_notification_interval_milliseconds: u64,
}
impl Default for AnalyticsConfig {
fn default() -> Self {
Self {
batch_size: 100,
max_retries: 3,
retry_base_delay_ms: 100,
balance_notification_interval_milliseconds: 5000,
}
}
}
/// External-connections feature configuration.
#[derive(Debug, Clone, Default, Deserialize, Serialize)]
#[serde(default, deny_unknown_fields)]
pub struct ConnectionsConfig {
    /// Key used to encrypt stored connection secrets, if set.
    pub encryption_key: Option<String>,
    pub sync: SyncPipelineConfig,
}
/// Defaults applied to connection-sync pipelines.
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(default, deny_unknown_fields)]
pub struct SyncPipelineConfig {
    pub default_completion_window: String,
    pub default_endpoint: String,
}
impl Default for SyncPipelineConfig {
fn default() -> Self {
Self {
default_completion_window: "24h".to_string(),
default_endpoint: "/v1/chat/completions".to_string(),
}
}
}
/// Worker-pool sizes for the sync subsystem.
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(default, deny_unknown_fields)]
pub struct SyncWorkersConfig {
    pub enabled: bool,
    pub ingest_workers: usize,
    pub activate_workers: usize,
    /// Renamed from `sync_workers`; the alias keeps old config files working.
    #[serde(alias = "sync_workers")]
    pub discovery_workers: usize,
}
impl Default for SyncWorkersConfig {
fn default() -> Self {
Self {
enabled: true,
ingest_workers: 4,
activate_workers: 1,
discovery_workers: 1,
}
}
}
impl Default for Config {
    /// Development-oriented defaults; every field is overridable because the
    /// struct uses `#[serde(default)]`. NOTE(review): `admin_password` has a
    /// baked-in value ("hunter2") — presumably for local development only;
    /// confirm it is always overridden in production deployments.
    fn default() -> Self {
        Self {
            host: "0.0.0.0".to_string(),
            port: 3001,
            dashboard_url: "http://localhost:5173".to_string(),
            // Legacy flat URLs are unset; `database` below is authoritative.
            database_url: None,
            database_replica_url: None,
            database: DatabaseConfig::default(),
            slow_statement_threshold_ms: 1000,
            admin_email: "test@doubleword.ai".to_string(),
            admin_password: Some("hunter2".to_string()),
            secret_key: None,
            model_sources: vec![],
            metadata: Metadata::default(),
            payment: None,
            auth: AuthConfig::default(),
            batches: BatchConfig::default(),
            background_services: BackgroundServicesConfig::default(),
            enable_metrics: true,
            enable_request_logging: true,
            enable_analytics: true,
            analytics: AnalyticsConfig::default(),
            enable_otel_export: false,
            credits: CreditsConfig::default(),
            sample_files: SampleFilesConfig::default(),
            limits: LimitsConfig::default(),
            email: EmailConfig::default(),
            onwards: OnwardsConfig::default(),
            onboarding_url: None,
            support_email: "support@doubleword.ai".to_string(),
            connections: ConnectionsConfig::default(),
        }
    }
}
impl Default for ModelSource {
fn default() -> Self {
Self {
name: String::new(),
url: Url::parse("http://localhost:8080").unwrap(),
api_key: None,
sync_interval: Duration::from_secs(10),
default_models: None,
}
}
}
impl Default for NativeAuthConfig {
fn default() -> Self {
Self {
enabled: true,
allow_registration: false,
password: PasswordConfig::default(),
session: SessionConfig::default(),
password_reset_token_duration: Duration::from_secs(30 * 60), }
}
}
impl Default for ProxyHeaderAuthConfig {
fn default() -> Self {
Self {
enabled: false,
header_name: "x-doubleword-user".to_string(),
email_header_name: "x-doubleword-email".to_string(),
groups_field_name: "x-doubleword-user-groups".to_string(),
provider_field_name: "x-doubleword-sso-provider".to_string(),
auto_create_users: true,
blacklisted_sso_groups: Vec::new(),
import_idp_groups: false,
}
}
}
impl Default for SessionConfig {
fn default() -> Self {
Self {
timeout: Duration::from_secs(24 * 60 * 60), cookie_name: "dwctl_session".to_string(),
cookie_secure: true,
cookie_same_site: "strict".to_string(),
cookie_domain: None,
}
}
}
impl Default for PasswordConfig {
fn default() -> Self {
Self {
min_length: 8,
max_length: 64,
argon2_memory_kib: 19456, argon2_iterations: 2,
argon2_parallelism: 1,
}
}
}
impl Default for SecurityConfig {
fn default() -> Self {
Self {
jwt_expiry: Duration::from_secs(24 * 60 * 60), cors: CorsConfig::default(),
}
}
}
impl Default for CorsConfig {
    fn default() -> Self {
        Self {
            // Default to the local API origin. The scheme was previously the
            // typo "htt://", which Url::parse accepts as an opaque custom
            // scheme, so the default origin could never match a real browser
            // Origin header.
            allowed_origins: vec![CorsOrigin::Url(
                Url::parse("http://localhost:3001").expect("static URL is valid"),
            )],
            allow_credentials: true,
            // Cache preflight responses for one hour.
            max_age: Some(3600),
            exposed_headers: vec!["location".to_string()],
        }
    }
}
impl Default for EmailConfig {
fn default() -> Self {
Self {
transport: EmailTransportConfig::default(),
from_email: "noreply@example.com".to_string(),
from_name: "Control Layer".to_string(),
reply_to: None,
templates_dir: None,
}
}
}
impl Default for EmailTransportConfig {
fn default() -> Self {
Self::File {
path: "./emails".to_string(),
}
}
}
impl ModelSource {
    /// Fallback sync cadence used when `sync_interval` is absent from config.
    fn default_sync_interval() -> Duration {
        const DEFAULT_SYNC_SECS: u64 = 10;
        Duration::from_secs(DEFAULT_SYNC_SECS)
    }
}
impl Config {
#[allow(clippy::result_large_err)]
pub fn load_from_path(path: impl AsRef<Path>) -> Result<Self, figment::Error> {
Self::load(&Args {
config: path.as_ref().to_path_buf(),
validate: false,
})
}
#[allow(clippy::result_large_err)]
pub fn load(args: &Args) -> Result<Self, figment::Error> {
let mut config: Self = Self::figment(args).extract()?;
if let Some(url) = config.database_url.take() {
let pool = config.database.main_pool_settings().clone();
let fusillade = config.database.fusillade().clone();
let outlet = config.database.outlet().clone();
let underway_pool = config.database.underway_pool_settings().clone();
let original_replica_pool = match &config.database {
DatabaseConfig::External { replica_pool, .. } => replica_pool.clone(),
DatabaseConfig::Embedded { replica_pool, .. } => replica_pool.clone(),
};
let replica_url = config.database_replica_url.take();
config.database = DatabaseConfig::External {
url,
replica_url,
pool,
replica_pool: original_replica_pool, fusillade,
outlet,
underway_pool,
};
} else if let Some(replica_url) = config.database_replica_url.take() {
match &mut config.database {
DatabaseConfig::External {
replica_url: current_replica,
..
} => {
*current_replica = Some(replica_url);
}
DatabaseConfig::Embedded { .. } => {
}
}
}
if config.auth.native.session.cookie_domain.as_deref() == Some("") {
config.auth.native.session.cookie_domain = None;
}
config.validate().map_err(|e| figment::Error::from(e.to_string()))?;
Ok(config)
}
pub fn database_url(&self) -> Option<&str> {
self.database.external_url()
}
pub fn validate(&self) -> Result<(), Error> {
if self.auth.native.enabled {
if self.secret_key.is_none() {
return Err(Error::Internal {
operation: "Config validation: Native authentication is enabled but secret_key is not configured. \
Please set DWCTL_SECRET_KEY environment variable or add secret_key to config file."
.to_string(),
});
}
if self.auth.native.password.min_length > self.auth.native.password.max_length {
return Err(Error::Internal {
operation: format!(
"Config validation: Invalid password configuration: min_length ({}) cannot be greater than max_length ({})",
self.auth.native.password.min_length, self.auth.native.password.max_length
),
});
}
if self.auth.native.password.min_length < 1 {
return Err(Error::Internal {
operation: "Config validation: Invalid password configuration: min_length must be at least 1".to_string(),
});
}
}
if self.auth.security.jwt_expiry.as_secs() < 300 {
return Err(Error::Internal {
operation: "Config validation: JWT expiry duration is too short (minimum 5 minutes)".to_string(),
});
}
if self.auth.security.jwt_expiry.as_secs() > 86400 * 30 {
return Err(Error::Internal {
operation: "Config validation: JWT expiry duration is too long (maximum 30 days)".to_string(),
});
}
if !self.auth.native.enabled && !self.auth.proxy_header.enabled {
return Err(Error::Internal {
operation:
"Config validation: No authentication methods are enabled. Please enable either native or proxy_header authentication."
.to_string(),
});
}
if let Some(ref domain) = self.auth.native.session.cookie_domain {
let invalid = domain.is_empty() || domain.chars().any(|c| c.is_whitespace() || c.is_control()) || domain.contains(';');
if invalid {
return Err(Error::Internal {
operation: format!(
"Config validation: Invalid cookie_domain '{domain}'. \
Must not be empty or contain semicolons, whitespace, or control characters."
),
});
}
let fragment = format!("; Domain={domain}");
if axum::http::HeaderValue::from_str(&fragment).is_err() {
return Err(Error::Internal {
operation: format!("Config validation: cookie_domain '{domain}' produces an invalid HTTP header value."),
});
}
}
if self.auth.security.cors.allowed_origins.is_empty() {
return Err(Error::Internal {
operation: "Config validation: CORS allowed_origins cannot be empty. Add at least one allowed origin.".to_string(),
});
}
let has_wildcard = self
.auth
.security
.cors
.allowed_origins
.iter()
.any(|origin| matches!(origin, CorsOrigin::Wildcard));
if has_wildcard && self.auth.security.cors.allow_credentials {
return Err(Error::Internal {
operation: "Config validation: CORS cannot use wildcard origin '*' with allow_credentials=true. Specify explicit origins."
.to_string(),
});
}
let daemon_can_run = self.background_services.batch_daemon.enabled != DaemonEnabled::Never;
let validate_request_manager_config = self.batches.enabled || daemon_can_run;
if validate_request_manager_config {
if self.batches.files.batch_insert_size == 0 {
return Err(Error::Internal {
operation: "Config validation: batch_insert_size cannot be 0. Set a positive integer value (recommended: 1000-10000). \
This setting is used by the request manager when batches are enabled or the daemon runs."
.to_string(),
});
}
if self.batches.files.download_buffer_size == 0 {
return Err(Error::Internal {
operation: "Config validation: download_buffer_size cannot be 0. Set a positive integer value (default: 100). \
This setting is used by the request manager when batches are enabled or the daemon runs."
.to_string(),
});
}
}
if self.batches.enabled {
let unknown_windows: Vec<&str> = self
.batches
.window_relaxation_factors
.keys()
.filter(|w| !self.batches.allowed_completion_windows.contains(w))
.map(|w| w.as_str())
.collect();
if !unknown_windows.is_empty() {
return Err(Error::Internal {
operation: format!(
"Config validation: window_relaxation_factors contains window(s) not in \
allowed_completion_windows: {}. Add them to allowed_completion_windows or \
remove them from window_relaxation_factors.",
unknown_windows.join(", ")
),
});
}
if self.batches.allowed_url_paths.is_empty() {
return Err(Error::Internal {
operation: "Config validation: batches.allowed_url_paths cannot be empty. Add at least one supported URL path."
.to_string(),
});
}
if self.batches.files.upload_buffer_size == 0 {
return Err(Error::Internal {
operation: "Config validation: upload_buffer_size cannot be 0. Set a positive integer value (default: 100)."
.to_string(),
});
}
if self.batches.files.min_expiry_seconds <= 0 {
return Err(Error::Internal {
operation: "Config validation: min_expiry_seconds must be positive (default: 3600 = 1 hour).".to_string(),
});
}
if self.batches.files.default_expiry_seconds <= 0 {
return Err(Error::Internal {
operation: "Config validation: default_expiry_seconds must be positive (default: 86400 = 24 hours).".to_string(),
});
}
if self.batches.files.max_expiry_seconds <= 0 {
return Err(Error::Internal {
operation: "Config validation: max_expiry_seconds must be positive (default: 2592000 = 30 days).".to_string(),
});
}
if self.batches.files.min_expiry_seconds > self.batches.files.default_expiry_seconds {
return Err(Error::Internal {
operation: format!(
"Config validation: min_expiry_seconds ({}) cannot be greater than default_expiry_seconds ({})",
self.batches.files.min_expiry_seconds, self.batches.files.default_expiry_seconds
),
});
}
if self.batches.files.default_expiry_seconds > self.batches.files.max_expiry_seconds {
return Err(Error::Internal {
operation: format!(
"Config validation: default_expiry_seconds ({}) cannot be greater than max_expiry_seconds ({})",
self.batches.files.default_expiry_seconds, self.batches.files.max_expiry_seconds
),
});
}
if self.batches.files.min_expiry_seconds > self.batches.files.max_expiry_seconds {
return Err(Error::Internal {
operation: format!(
"Config validation: min_expiry_seconds ({}) cannot be greater than max_expiry_seconds ({})",
self.batches.files.min_expiry_seconds, self.batches.files.max_expiry_seconds
),
});
}
}
Ok(())
}
/// Builds the layered configuration provider. Later merges win, so the
/// precedence (lowest to highest) is:
///
/// 1. the YAML file named by `--config` / `DWCTL_CONFIG`,
/// 2. `DWCTL_*` environment variables (`__` splits nesting levels),
/// 3. bare `DATABASE_URL` / `DATABASE_REPLICA_URL`,
/// 4. `DWCTL_DATABASE_REPLICA_URL`, remapped onto the
///    `database_replica_url` key so the prefixed form beats the bare one.
pub fn figment(args: &Args) -> Figment {
    let yaml_path: OsString = args.config.as_os_str().to_owned();
    let file_layer = Figment::new().merge(Yaml::file(yaml_path));
    let env_layer = file_layer.merge(Env::prefixed("DWCTL_").split("__"));
    env_layer
        .merge(Env::raw().only(&["DATABASE_URL", "DATABASE_REPLICA_URL"]))
        .merge(
            Env::raw()
                .only(&["DWCTL_DATABASE_REPLICA_URL"])
                .map(|_| "database_replica_url".into()),
        )
}
pub fn bind_address(&self) -> String {
format!("{}:{}", self.host, self.port)
}
}
#[cfg(test)]
mod tests {
use super::*;
use figment::Jail;
/// `model_sources` entries deserialize with per-source overrides; a source
/// without `sync_interval` falls back to the 10s default.
#[test]
fn test_model_sources_config() {
    Jail::expect_with(|jail| {
        jail.create_file(
            "test.yaml",
            r#"
secret_key: hello
model_sources:
  - name: openai
    url: https://api.openai.com
    api_key: sk-test
    sync_interval: 30s
  - name: internal
    url: http://internal:8080
"#,
        )?;
        let cli = Args {
            config: "test.yaml".into(),
            validate: false,
        };
        let cfg = Config::load(&cli)?;
        assert_eq!(cfg.model_sources.len(), 2);

        let first = &cfg.model_sources[0];
        assert_eq!(first.name, "openai");
        // Url normalizes the authority-only URL with a trailing slash.
        assert_eq!(first.url.as_str(), "https://api.openai.com/");
        assert_eq!(first.api_key.as_deref(), Some("sk-test"));
        assert_eq!(first.sync_interval, Duration::from_secs(30));

        let second = &cfg.model_sources[1];
        assert_eq!(second.name, "internal");
        assert_eq!(second.sync_interval, Duration::from_secs(10));
        Ok(())
    });
}
/// `DWCTL_*` environment variables override file values, while untouched
/// file values (metadata) survive the merge.
#[test]
fn test_env_override() {
    Jail::expect_with(|jail| {
        jail.create_file(
            "test.yaml",
            r#"
secret_key: hello
metadata:
  region: US East
  organization: Test Corp
"#,
        )?;
        jail.set_env("DWCTL_HOST", "127.0.0.1");
        jail.set_env("DWCTL_PORT", "8080");
        let cli = Args {
            config: "test.yaml".into(),
            validate: false,
        };
        let cfg = Config::load(&cli)?;
        assert_eq!(cfg.host, "127.0.0.1");
        assert_eq!(cfg.port, 8080);
        assert_eq!(cfg.metadata.region, Some("US East".to_string()));
        assert_eq!(cfg.metadata.organization, Some("Test Corp".to_string()));
        Ok(())
    });
}
/// Nested auth settings can be overridden from YAML while unspecified
/// siblings (password.max_length) keep their defaults.
#[test]
fn test_auth_config_override() {
    Jail::expect_with(|jail| {
        jail.create_file(
            "test.yaml",
            r#"
secret_key: "test-secret-key-for-testing"
auth:
  native:
    enabled: true
    allow_registration: false
    password:
      min_length: 12
  proxy_header:
    enabled: false
    header_name: "x-custom-user"
  security:
    jwt_expiry: "2h"
"#,
        )?;
        let cli = Args {
            config: "test.yaml".into(),
            validate: false,
        };
        let cfg = Config::load(&cli)?;
        assert!(cfg.auth.native.enabled);
        assert!(!cfg.auth.native.allow_registration);
        assert_eq!(cfg.auth.native.password.min_length, 12);
        // max_length was not set in YAML, so the default applies.
        assert_eq!(cfg.auth.native.password.max_length, 64);
        assert!(!cfg.auth.proxy_header.enabled);
        assert_eq!(cfg.auth.proxy_header.header_name, "x-custom-user");
        assert_eq!(cfg.auth.security.jwt_expiry, Duration::from_secs(2 * 60 * 60));
        Ok(())
    });
}
/// Native auth without a secret_key is rejected by validate().
#[test]
fn test_config_validation_native_auth_missing_secret() {
    let mut cfg = Config::default();
    cfg.auth.native.enabled = true;
    cfg.secret_key = None;
    let err = cfg.validate().unwrap_err();
    assert!(err.to_string().contains("secret_key is not configured"));
}

/// min_length > max_length is an invalid password policy.
#[test]
fn test_config_validation_invalid_password_length() {
    let mut cfg = Config::default();
    cfg.auth.native.enabled = true;
    cfg.secret_key = Some("test-key".to_string());
    cfg.auth.native.password.min_length = 10;
    cfg.auth.native.password.max_length = 5;
    let err = cfg.validate().unwrap_err();
    assert!(err.to_string().contains("min_length"));
}

/// Disabling both native and proxy_header auth leaves no way to log in.
#[test]
fn test_config_validation_no_auth_methods_enabled() {
    let mut cfg = Config::default();
    cfg.auth.native.enabled = false;
    cfg.auth.proxy_header.enabled = false;
    let err = cfg.validate().unwrap_err();
    assert!(err.to_string().contains("No authentication methods"));
}

/// A default config with native auth and a secret passes validation.
#[test]
fn test_config_validation_valid_config() {
    let mut cfg = Config::default();
    cfg.auth.native.enabled = true;
    cfg.secret_key = Some("test-secret-key".to_string());
    assert!(cfg.validate().is_ok());
}
/// batch_insert_size defaults to 5000.
#[test]
fn test_batch_insert_size_default() {
    let cfg = Config::default();
    assert_eq!(cfg.batches.files.batch_insert_size, 5000);
}

/// batch_insert_size can be overridden from YAML.
#[test]
fn test_batch_insert_size_yaml_override() {
    Jail::expect_with(|jail| {
        jail.create_file(
            "test.yaml",
            r#"
secret_key: "test-secret-key"
batches:
  files:
    batch_insert_size: 10000
"#,
        )?;
        let cli = Args {
            config: "test.yaml".into(),
            validate: false,
        };
        let cfg = Config::load(&cli)?;
        assert_eq!(cfg.batches.files.batch_insert_size, 10000);
        Ok(())
    });
}

/// batch_insert_size can be overridden through the `__`-nested env var.
#[test]
fn test_batch_insert_size_env_override() {
    Jail::expect_with(|jail| {
        jail.create_file(
            "test.yaml",
            r#"
secret_key: "test-secret-key"
"#,
        )?;
        jail.set_env("DWCTL_BATCHES__FILES__BATCH_INSERT_SIZE", "7500");
        let cli = Args {
            config: "test.yaml".into(),
            validate: false,
        };
        let cfg = Config::load(&cli)?;
        assert_eq!(cfg.batches.files.batch_insert_size, 7500);
        Ok(())
    });
}
/// Zero batch_insert_size is rejected while batches are enabled.
#[test]
fn test_batch_insert_size_zero_validation() {
    let mut cfg = Config::default();
    cfg.auth.native.enabled = true;
    cfg.secret_key = Some("test-secret-key".to_string());
    cfg.batches.enabled = true;
    cfg.batches.files.batch_insert_size = 0;
    let err = cfg.validate().unwrap_err();
    assert!(err.to_string().contains("batch_insert_size cannot be 0"));
}

/// Zero upload_buffer_size is rejected while batches are enabled.
#[test]
fn test_upload_buffer_size_zero_validation() {
    let mut cfg = Config::default();
    cfg.auth.native.enabled = true;
    cfg.secret_key = Some("test-secret-key".to_string());
    cfg.batches.enabled = true;
    cfg.batches.files.upload_buffer_size = 0;
    let err = cfg.validate().unwrap_err();
    assert!(err.to_string().contains("upload_buffer_size cannot be 0"));
}

/// Zero download_buffer_size is rejected while batches are enabled.
#[test]
fn test_download_buffer_size_zero_validation() {
    let mut cfg = Config::default();
    cfg.auth.native.enabled = true;
    cfg.secret_key = Some("test-secret-key".to_string());
    cfg.batches.enabled = true;
    cfg.batches.files.download_buffer_size = 0;
    let err = cfg.validate().unwrap_err();
    assert!(err.to_string().contains("download_buffer_size cannot be 0"));
}
/// Each expiry setting must be strictly positive; zeroing them one at a
/// time surfaces the matching error message.
#[test]
fn test_expiry_times_positive_validation() {
    let mut cfg = Config::default();
    cfg.auth.native.enabled = true;
    cfg.secret_key = Some("test-secret-key".to_string());
    cfg.batches.enabled = true;

    cfg.batches.files.min_expiry_seconds = 0;
    let err = cfg.validate().unwrap_err();
    assert!(err.to_string().contains("min_expiry_seconds must be positive"));

    // Restore min, then zero out default.
    cfg.batches.files.min_expiry_seconds = 3600;
    cfg.batches.files.default_expiry_seconds = 0;
    let err = cfg.validate().unwrap_err();
    assert!(err.to_string().contains("default_expiry_seconds must be positive"));

    // Restore default, then zero out max.
    cfg.batches.files.default_expiry_seconds = 86400;
    cfg.batches.files.max_expiry_seconds = 0;
    let err = cfg.validate().unwrap_err();
    assert!(err.to_string().contains("max_expiry_seconds must be positive"));
}

/// The min <= default <= max ordering is enforced pairwise.
#[test]
fn test_expiry_times_order_validation() {
    let mut cfg = Config::default();
    cfg.auth.native.enabled = true;
    cfg.secret_key = Some("test-secret-key".to_string());
    cfg.batches.enabled = true;

    // min > default
    cfg.batches.files.min_expiry_seconds = 86400;
    cfg.batches.files.default_expiry_seconds = 3600;
    cfg.batches.files.max_expiry_seconds = 2592000;
    let msg = cfg.validate().unwrap_err().to_string();
    assert!(msg.contains("min_expiry_seconds") && msg.contains("default_expiry_seconds"));

    // default > max
    cfg.batches.files.min_expiry_seconds = 3600;
    cfg.batches.files.default_expiry_seconds = 2592000;
    cfg.batches.files.max_expiry_seconds = 86400;
    let msg = cfg.validate().unwrap_err().to_string();
    assert!(msg.contains("default_expiry_seconds") && msg.contains("max_expiry_seconds"));

    // Fully reversed ordering is also rejected.
    cfg.batches.files.min_expiry_seconds = 2592000;
    cfg.batches.files.default_expiry_seconds = 86400;
    cfg.batches.files.max_expiry_seconds = 3600;
    assert!(cfg.validate().is_err());
}
/// With batches disabled and the daemon set to Never, request-manager
/// settings are not validated at all.
#[test]
fn test_batch_validation_skipped_when_disabled() {
    let mut cfg = Config::default();
    cfg.auth.native.enabled = true;
    cfg.secret_key = Some("test-secret-key".to_string());
    cfg.batches.enabled = false;
    cfg.background_services.batch_daemon.enabled = DaemonEnabled::Never;
    cfg.batches.files.batch_insert_size = 0;
    assert!(cfg.validate().is_ok());
}

/// A daemon that may run (Leader) forces batch_insert_size validation even
/// when batches themselves are disabled.
#[test]
fn test_batch_insert_size_validated_when_daemon_enabled() {
    let mut cfg = Config::default();
    cfg.auth.native.enabled = true;
    cfg.secret_key = Some("test-secret-key".to_string());
    cfg.batches.enabled = false;
    cfg.background_services.batch_daemon.enabled = DaemonEnabled::Leader;
    cfg.batches.files.batch_insert_size = 0;
    let err = cfg.validate().unwrap_err();
    assert!(err.to_string().contains("batch_insert_size cannot be 0"));
}

/// A daemon that always runs forces download_buffer_size validation even
/// when batches themselves are disabled.
#[test]
fn test_download_buffer_validated_when_daemon_enabled() {
    let mut cfg = Config::default();
    cfg.auth.native.enabled = true;
    cfg.secret_key = Some("test-secret-key".to_string());
    cfg.batches.enabled = false;
    cfg.background_services.batch_daemon.enabled = DaemonEnabled::Always;
    cfg.batches.files.download_buffer_size = 0;
    let err = cfg.validate().unwrap_err();
    assert!(err.to_string().contains("download_buffer_size cannot be 0"));
}

/// Enabled batches validate batch_insert_size even with the daemon off.
#[test]
fn test_batch_insert_size_validated_when_batches_enabled_daemon_never() {
    let mut cfg = Config::default();
    cfg.auth.native.enabled = true;
    cfg.secret_key = Some("test-secret-key".to_string());
    cfg.batches.enabled = true;
    cfg.background_services.batch_daemon.enabled = DaemonEnabled::Never;
    cfg.batches.files.batch_insert_size = 0;
    let err = cfg.validate().unwrap_err();
    assert!(err.to_string().contains("batch_insert_size cannot be 0"));
}

/// Enabled batches validate download_buffer_size even with the daemon off.
#[test]
fn test_download_buffer_validated_when_batches_enabled_daemon_never() {
    let mut cfg = Config::default();
    cfg.auth.native.enabled = true;
    cfg.secret_key = Some("test-secret-key".to_string());
    cfg.batches.enabled = true;
    cfg.background_services.batch_daemon.enabled = DaemonEnabled::Never;
    cfg.batches.files.download_buffer_size = 0;
    let err = cfg.validate().unwrap_err();
    assert!(err.to_string().contains("download_buffer_size cannot be 0"));
}
/// default_throughput defaults to 100.0.
#[test]
fn test_default_throughput_default_value() {
    let cfg = Config::default();
    assert_eq!(cfg.batches.default_throughput, 100.0);
}

/// Explicitly setting default_throughput in YAML is honored.
#[test]
fn test_default_throughput_yaml_override() {
    Jail::expect_with(|jail| {
        jail.create_file(
            "test.yaml",
            r#"
secret_key: "test-secret-key"
batches:
  default_throughput: 100.0
"#,
        )?;
        let cli = Args {
            config: "test.yaml".into(),
            validate: false,
        };
        let cfg = Config::load(&cli)?;
        assert_eq!(cfg.batches.default_throughput, 100.0);
        Ok(())
    });
}

/// An explicit YAML `null` falls back to the default value.
#[test]
fn test_default_throughput_null_uses_default() {
    Jail::expect_with(|jail| {
        jail.create_file(
            "test.yaml",
            r#"
secret_key: "test-secret-key"
batches:
  default_throughput: null
"#,
        )?;
        let cli = Args {
            config: "test.yaml".into(),
            validate: false,
        };
        let cfg = Config::load(&cli)?;
        assert_eq!(cfg.batches.default_throughput, 100.0);
        Ok(())
    });
}

/// Omitting default_throughput entirely also uses the default.
#[test]
fn test_default_throughput_missing_uses_default() {
    Jail::expect_with(|jail| {
        jail.create_file(
            "test.yaml",
            r#"
secret_key: "test-secret-key"
batches:
  enabled: true
"#,
        )?;
        let cli = Args {
            config: "test.yaml".into(),
            validate: false,
        };
        let cfg = Config::load(&cli)?;
        assert_eq!(cfg.batches.default_throughput, 100.0);
        Ok(())
    });
}
/// default_throughput of 0 is rejected at load time.
#[test]
fn test_default_throughput_zero_rejected() {
    Jail::expect_with(|jail| {
        jail.create_file(
            "test.yaml",
            r#"
secret_key: "test-secret-key"
batches:
  default_throughput: 0
"#,
        )?;
        let cli = Args {
            config: "test.yaml".into(),
            validate: false,
        };
        let outcome = Config::load(&cli);
        assert!(outcome.is_err());
        let msg = outcome.unwrap_err().to_string();
        assert!(msg.contains("default_throughput must be positive"));
        Ok(())
    });
}

/// Negative default_throughput is rejected at load time.
#[test]
fn test_default_throughput_negative_rejected() {
    Jail::expect_with(|jail| {
        jail.create_file(
            "test.yaml",
            r#"
secret_key: "test-secret-key"
batches:
  default_throughput: -10.0
"#,
        )?;
        let cli = Args {
            config: "test.yaml".into(),
            validate: false,
        };
        let outcome = Config::load(&cli);
        assert!(outcome.is_err());
        let msg = outcome.unwrap_err().to_string();
        assert!(msg.contains("default_throughput must be positive"));
        Ok(())
    });
}

/// default_throughput can be overridden through the env var.
#[test]
fn test_default_throughput_env_override() {
    Jail::expect_with(|jail| {
        jail.create_file(
            "test.yaml",
            r#"
secret_key: "test-secret-key"
"#,
        )?;
        jail.set_env("DWCTL_BATCHES__DEFAULT_THROUGHPUT", "75.5");
        let cli = Args {
            config: "test.yaml".into(),
            validate: false,
        };
        let cfg = Config::load(&cli)?;
        assert_eq!(cfg.batches.default_throughput, 75.5);
        Ok(())
    });
}
/// reservation_ttl_secs of 0 is rejected at load time.
#[test]
fn test_reservation_ttl_zero_rejected() {
    Jail::expect_with(|jail| {
        jail.create_file(
            "test.yaml",
            r#"
secret_key: "test-secret-key"
batches:
  reservation_ttl_secs: 0
"#,
        )?;
        let cli = Args {
            config: "test.yaml".into(),
            validate: false,
        };
        let outcome = Config::load(&cli);
        assert!(outcome.is_err());
        assert!(outcome.unwrap_err().to_string().contains("reservation_ttl_secs must be positive"));
        Ok(())
    });
}

/// Negative reservation_ttl_secs is rejected at load time.
#[test]
fn test_reservation_ttl_negative_rejected() {
    Jail::expect_with(|jail| {
        jail.create_file(
            "test.yaml",
            r#"
secret_key: "test-secret-key"
batches:
  reservation_ttl_secs: -60
"#,
        )?;
        let cli = Args {
            config: "test.yaml".into(),
            validate: false,
        };
        let outcome = Config::load(&cli);
        assert!(outcome.is_err());
        assert!(outcome.unwrap_err().to_string().contains("reservation_ttl_secs must be positive"));
        Ok(())
    });
}

/// An explicit YAML `null` falls back to the 600s default.
#[test]
fn test_reservation_ttl_null_uses_default() {
    Jail::expect_with(|jail| {
        jail.create_file(
            "test.yaml",
            r#"
secret_key: "test-secret-key"
batches:
  reservation_ttl_secs: null
"#,
        )?;
        let cli = Args {
            config: "test.yaml".into(),
            validate: false,
        };
        let cfg = Config::load(&cli)?;
        assert_eq!(cfg.batches.reservation_ttl_secs, 600);
        Ok(())
    });
}

/// reservation_ttl_secs defaults to 600 (10 minutes).
#[test]
fn test_reservation_ttl_default() {
    let cfg = Config::default();
    assert_eq!(cfg.batches.reservation_ttl_secs, 600);
}
/// Windows without an explicit relaxation factor use 1.0.
#[test]
fn test_relaxation_factor_defaults_to_one() {
    let cfg = Config::default();
    assert_eq!(cfg.batches.relaxation_factor("1h"), 1.0);
    assert_eq!(cfg.batches.relaxation_factor("24h"), 1.0);
}

/// Explicit per-window relaxation factors are returned verbatim.
#[test]
fn test_relaxation_factor_explicit_value() {
    Jail::expect_with(|jail| {
        jail.create_file(
            "test.yaml",
            r#"
secret_key: "test-secret-key"
batches:
  allowed_completion_windows: ["1h", "24h"]
  window_relaxation_factors:
    "1h": 1.0
    "24h": 1.5
"#,
        )?;
        let cli = Args {
            config: "test.yaml".into(),
            validate: false,
        };
        let cfg = Config::load(&cli)?;
        assert_eq!(cfg.batches.relaxation_factor("1h"), 1.0);
        assert_eq!(cfg.batches.relaxation_factor("24h"), 1.5);
        Ok(())
    });
}

/// A relaxation factor keyed on a window outside
/// allowed_completion_windows fails load, naming the offending window.
#[test]
fn test_relaxation_factor_unknown_window_rejected() {
    Jail::expect_with(|jail| {
        jail.create_file(
            "test.yaml",
            r#"
secret_key: "test-secret-key"
batches:
  allowed_completion_windows: ["1h", "24h"]
  window_relaxation_factors:
    "12h": 1.5
"#,
        )?;
        let cli = Args {
            config: "test.yaml".into(),
            validate: false,
        };
        let outcome = Config::load(&cli);
        assert!(outcome.is_err());
        assert!(outcome.unwrap_err().to_string().contains("12h"));
        Ok(())
    });
}

/// Negative relaxation factors are rejected at load time.
#[test]
fn test_relaxation_factor_negative_rejected() {
    Jail::expect_with(|jail| {
        jail.create_file(
            "test.yaml",
            r#"
secret_key: "test-secret-key"
batches:
  allowed_completion_windows: ["1h", "24h"]
  window_relaxation_factors:
    "24h": -0.5
"#,
        )?;
        let cli = Args {
            config: "test.yaml".into(),
            validate: false,
        };
        let outcome = Config::load(&cli);
        assert!(outcome.is_err());
        assert!(outcome.unwrap_err().to_string().contains("window_relaxation_factors"));
        Ok(())
    });
}

/// A factor of exactly 0.0 is accepted (only negatives are rejected).
#[test]
fn test_relaxation_factor_zero_allowed() {
    Jail::expect_with(|jail| {
        jail.create_file(
            "test.yaml",
            r#"
secret_key: "test-secret-key"
batches:
  allowed_completion_windows: ["1h", "24h"]
  window_relaxation_factors:
    "1h": 0.0
"#,
        )?;
        let cli = Args {
            config: "test.yaml".into(),
            validate: false,
        };
        let cfg = Config::load(&cli)?;
        assert_eq!(cfg.batches.relaxation_factor("1h"), 0.0);
        Ok(())
    });
}

/// Omitting the factor map entirely keeps the 1.0-everywhere behavior.
#[test]
fn test_relaxation_factor_empty_map_backwards_compatible() {
    Jail::expect_with(|jail| {
        jail.create_file(
            "test.yaml",
            r#"
secret_key: "test-secret-key"
batches:
  allowed_completion_windows: ["1h", "24h"]
"#,
        )?;
        let cli = Args {
            config: "test.yaml".into(),
            validate: false,
        };
        let cfg = Config::load(&cli)?;
        assert_eq!(cfg.batches.relaxation_factor("1h"), 1.0);
        assert_eq!(cfg.batches.relaxation_factor("24h"), 1.0);
        Ok(())
    });
}
/// Setting the cookie_domain env var to "" clears a file-provided domain:
/// load() normalizes the empty string to None instead of failing
/// validation.
#[test]
fn test_empty_cookie_domain_env_override_normalized_to_none() {
    Jail::expect_with(|jail| {
        jail.create_file(
            "test.yaml",
            r#"
secret_key: "test-secret-key"
auth:
  native:
    session:
      cookie_domain: ".doubleword.ai"
"#,
        )?;
        jail.set_env("DWCTL_AUTH__NATIVE__SESSION__COOKIE_DOMAIN", "");
        let cli = Args {
            config: "test.yaml".into(),
            validate: false,
        };
        let cfg = Config::load(&cli)?;
        assert_eq!(cfg.auth.native.session.cookie_domain, None);
        Ok(())
    });
}
/// urgency_weight defaults to 0.5.
#[test]
fn test_urgency_weight_default() {
    Jail::expect_with(|jail| {
        jail.create_file(
            "test.yaml",
            r#"
secret_key: "test-secret-key"
"#,
        )?;
        let cli = Args {
            config: "test.yaml".into(),
            validate: false,
        };
        let cfg = Config::load(&cli)?;
        assert_eq!(cfg.background_services.batch_daemon.urgency_weight, 0.5);
        Ok(())
    });
}

/// urgency_weight can be overridden from YAML.
#[test]
fn test_urgency_weight_yaml_override() {
    Jail::expect_with(|jail| {
        jail.create_file(
            "test.yaml",
            r#"
secret_key: "test-secret-key"
background_services:
  batch_daemon:
    urgency_weight: 0.8
"#,
        )?;
        let cli = Args {
            config: "test.yaml".into(),
            validate: false,
        };
        let cfg = Config::load(&cli)?;
        assert_eq!(cfg.background_services.batch_daemon.urgency_weight, 0.8);
        Ok(())
    });
}

/// Values below 0.0 are rejected at load time.
#[test]
fn test_urgency_weight_negative_rejected() {
    Jail::expect_with(|jail| {
        jail.create_file(
            "test.yaml",
            r#"
secret_key: "test-secret-key"
background_services:
  batch_daemon:
    urgency_weight: -0.1
"#,
        )?;
        let cli = Args {
            config: "test.yaml".into(),
            validate: false,
        };
        let outcome = Config::load(&cli);
        assert!(outcome.is_err());
        let msg = outcome.unwrap_err().to_string();
        assert!(msg.contains("urgency_weight must be between 0.0 and 1.0"));
        Ok(())
    });
}

/// Values above 1.0 are rejected at load time.
#[test]
fn test_urgency_weight_above_one_rejected() {
    Jail::expect_with(|jail| {
        jail.create_file(
            "test.yaml",
            r#"
secret_key: "test-secret-key"
background_services:
  batch_daemon:
    urgency_weight: 1.5
"#,
        )?;
        let cli = Args {
            config: "test.yaml".into(),
            validate: false,
        };
        let outcome = Config::load(&cli);
        assert!(outcome.is_err());
        let msg = outcome.unwrap_err().to_string();
        assert!(msg.contains("urgency_weight must be between 0.0 and 1.0"));
        Ok(())
    });
}

/// An explicit YAML `null` falls back to the 0.5 default.
#[test]
fn test_urgency_weight_null_uses_default() {
    Jail::expect_with(|jail| {
        jail.create_file(
            "test.yaml",
            r#"
secret_key: "test-secret-key"
background_services:
  batch_daemon:
    urgency_weight: null
"#,
        )?;
        let cli = Args {
            config: "test.yaml".into(),
            validate: false,
        };
        let cfg = Config::load(&cli)?;
        assert_eq!(cfg.background_services.batch_daemon.urgency_weight, 0.5);
        Ok(())
    });
}
}