#[cfg(not(target_arch = "wasm32"))]
use crate::http::h1::listener::Http1ListenerConfig;
#[cfg(not(target_arch = "wasm32"))]
use crate::http::h1::server::Http1Config;
#[cfg(not(target_arch = "wasm32"))]
use crate::http::pool::PoolConfig;
use crate::observability::{LogLevel, ObservabilityConfig};
use crate::security::AuthMode;
use crate::transport::{
AggregatorConfig, ExperimentalTransportGate, PathSelectionPolicy, TransportCodingPolicy,
};
use std::collections::BTreeMap;
#[cfg(not(target_arch = "wasm32"))]
use std::net::SocketAddr;
use std::path::{Path, PathBuf};
use std::time::Duration;
#[derive(Debug, Clone, Default)]
pub struct RaptorQConfig {
pub encoding: EncodingConfig,
pub transport: TransportConfig,
pub resources: ResourceConfig,
pub timeouts: TimeoutConfig,
pub observability: ObservabilityConfig,
pub security: SecurityConfig,
}
impl RaptorQConfig {
pub fn validate(&self) -> Result<(), ConfigError> {
if !self.encoding.repair_overhead.is_finite() || self.encoding.repair_overhead < 1.0 {
return Err(ConfigError::InvalidRepairOverhead);
}
if self.encoding.symbol_size < 8 {
return Err(ConfigError::InvalidSymbolSize);
}
if self.encoding.max_block_size == 0 {
return Err(ConfigError::InvalidMaxBlockSize);
}
if self.encoding.encoding_parallelism == 0 || self.encoding.decoding_parallelism == 0 {
return Err(ConfigError::InvalidParallelism);
}
if self.resources.max_symbol_buffer_memory < 1024 * 1024 {
return Err(ConfigError::InsufficientMemory);
}
if self.timeouts.default_timeout < Duration::from_millis(100) {
return Err(ConfigError::TimeoutTooShort);
}
if !(0.0..=1.0).contains(&self.observability.sample_rate()) {
return Err(ConfigError::InvalidSampleRate(
self.observability.sample_rate(),
));
}
let m = self.transport.dead_path_backoff.multiplier;
if !m.is_finite() || m <= 0.0 {
return Err(ConfigError::InvalidBackoffMultiplier);
}
if self.transport.dead_path_backoff.initial_delay
> self.transport.dead_path_backoff.max_delay
{
return Err(ConfigError::InvalidBackoffRange);
}
Ok(())
}
}
#[cfg(not(target_arch = "wasm32"))]
#[derive(Debug, Clone)]
pub struct ServerConfig {
pub bind_addr: SocketAddr,
pub http: Http1Config,
pub listener: Http1ListenerConfig,
pub pool: PoolConfig,
pub shutdown_timeout: Duration,
pub worker_threads: Option<usize>,
}
#[cfg(not(target_arch = "wasm32"))]
impl Default for ServerConfig {
#[inline]
fn default() -> Self {
Self {
bind_addr: SocketAddr::from(([0, 0, 0, 0], 8080)),
http: Http1Config::default(),
listener: Http1ListenerConfig::default(),
pool: PoolConfig::default(),
shutdown_timeout: Duration::from_secs(30),
worker_threads: None,
}
}
}
#[cfg(not(target_arch = "wasm32"))]
impl ServerConfig {
#[inline]
#[must_use]
pub fn from_profile(profile: ServerProfile) -> Self {
match profile {
ServerProfile::Development => Self {
bind_addr: SocketAddr::from(([127, 0, 0, 1], 8080)),
listener: Http1ListenerConfig::default()
.max_connections(Some(100))
.drain_timeout(Duration::from_secs(5)),
shutdown_timeout: Duration::from_secs(5),
..Default::default()
},
ServerProfile::Testing => Self {
bind_addr: SocketAddr::from(([127, 0, 0, 1], 0)),
http: Http1Config::default()
.idle_timeout(Some(Duration::from_secs(5)))
.max_requests(Some(100)),
listener: Http1ListenerConfig::default()
.max_connections(Some(10))
.drain_timeout(Duration::from_millis(500)),
shutdown_timeout: Duration::from_millis(500),
worker_threads: Some(1),
..Default::default()
},
ServerProfile::Production => Self {
bind_addr: SocketAddr::from(([0, 0, 0, 0], 8080)),
http: Http1Config::default()
.max_headers_size(32 * 1024)
.max_body_size(8 * 1024 * 1024)
.max_requests(Some(10_000))
.idle_timeout(Some(Duration::from_mins(2))),
listener: Http1ListenerConfig::default()
.max_connections(Some(50_000))
.drain_timeout(Duration::from_secs(30)),
shutdown_timeout: Duration::from_secs(30),
..Default::default()
},
}
}
#[inline]
#[must_use]
pub fn bind_addr(mut self, addr: SocketAddr) -> Self {
self.bind_addr = addr;
self
}
#[inline]
#[must_use]
pub fn http(mut self, config: Http1Config) -> Self {
self.http = config;
self
}
#[inline]
#[must_use]
pub fn listener(mut self, config: Http1ListenerConfig) -> Self {
self.listener = config;
self
}
#[inline]
#[must_use]
pub fn pool(mut self, config: PoolConfig) -> Self {
self.pool = config;
self
}
#[inline]
#[must_use]
pub fn shutdown_timeout(mut self, timeout: Duration) -> Self {
self.shutdown_timeout = timeout;
self
}
#[inline]
#[must_use]
pub fn worker_threads(mut self, threads: Option<usize>) -> Self {
self.worker_threads = threads;
self
}
pub fn validate(&self) -> Result<(), ConfigError> {
if self.shutdown_timeout < Duration::from_millis(100) {
return Err(ConfigError::TimeoutTooShort);
}
if let Some(threads) = self.worker_threads {
if threads == 0 {
return Err(ConfigError::InvalidParallelism);
}
}
if let Some(max) = self.listener.max_connections {
if max == 0 {
return Err(ConfigError::Parse("max_connections must be > 0".to_owned()));
}
}
Ok(())
}
}
#[cfg(not(target_arch = "wasm32"))]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ServerProfile {
Development,
Testing,
Production,
}
#[derive(Debug, Clone)]
pub struct EncodingConfig {
pub repair_overhead: f64,
pub max_block_size: usize,
pub symbol_size: u16,
pub encoding_parallelism: usize,
pub decoding_parallelism: usize,
}
impl Default for EncodingConfig {
#[inline]
fn default() -> Self {
Self {
repair_overhead: 1.05,
max_block_size: 1024 * 1024,
symbol_size: 256,
encoding_parallelism: 2,
decoding_parallelism: 2,
}
}
}
#[derive(Debug, Clone)]
pub struct TransportConfig {
pub max_paths: usize,
pub health_check_interval: Duration,
pub dead_path_backoff: BackoffConfig,
pub max_symbols_in_flight: usize,
pub path_strategy: PathSelectionStrategy,
pub experiment_gate: ExperimentalTransportGate,
pub coding_policy: TransportCodingPolicy,
}
impl Default for TransportConfig {
#[inline]
fn default() -> Self {
Self {
max_paths: 4,
health_check_interval: Duration::from_secs(5),
dead_path_backoff: BackoffConfig::default(),
max_symbols_in_flight: 256,
path_strategy: PathSelectionStrategy::RoundRobin,
experiment_gate: ExperimentalTransportGate::Disabled,
coding_policy: TransportCodingPolicy::Disabled,
}
}
}
impl TransportConfig {
#[inline]
#[must_use]
pub fn aggregator_config(&self) -> AggregatorConfig {
let path_policy = match self.path_strategy {
PathSelectionStrategy::RoundRobin | PathSelectionStrategy::Random => {
PathSelectionPolicy::RoundRobin
}
PathSelectionStrategy::LatencyWeighted
| PathSelectionStrategy::Adaptive(AdaptiveConfig { .. }) => {
PathSelectionPolicy::BestQuality { count: 1 }
}
};
AggregatorConfig {
path_policy,
experiment_gate: self.experiment_gate,
coding_policy: self.coding_policy,
..AggregatorConfig::default()
}
}
}
#[derive(Debug, Clone)]
pub struct BackoffConfig {
pub initial_delay: Duration,
pub max_delay: Duration,
pub multiplier: f64,
}
impl Default for BackoffConfig {
#[inline]
fn default() -> Self {
Self {
initial_delay: Duration::from_millis(100),
max_delay: Duration::from_secs(10),
multiplier: 2.0,
}
}
}
#[derive(Debug, Clone)]
pub struct AdaptiveConfig {
pub min_samples: usize,
pub decay: f64,
}
impl Default for AdaptiveConfig {
#[inline]
fn default() -> Self {
Self {
min_samples: 16,
decay: 0.9,
}
}
}
#[derive(Debug, Clone)]
pub enum PathSelectionStrategy {
RoundRobin,
LatencyWeighted,
Adaptive(AdaptiveConfig),
Random,
}
#[derive(Debug, Clone)]
pub struct ResourceConfig {
pub max_symbol_buffer_memory: usize,
pub max_encoding_ops: usize,
pub max_decoding_ops: usize,
pub symbol_pool_size: usize,
}
impl Default for ResourceConfig {
fn default() -> Self {
Self {
max_symbol_buffer_memory: 64 * 1024 * 1024,
max_encoding_ops: 8,
max_decoding_ops: 8,
symbol_pool_size: 1024,
}
}
}
#[derive(Debug, Clone)]
pub struct TimeoutConfig {
pub default_timeout: Duration,
pub encoding_timeout: Duration,
pub decoding_timeout: Duration,
pub path_timeout: Duration,
pub quorum_timeout: Duration,
}
impl Default for TimeoutConfig {
fn default() -> Self {
Self {
default_timeout: Duration::from_secs(30),
encoding_timeout: Duration::from_secs(30),
decoding_timeout: Duration::from_secs(30),
path_timeout: Duration::from_secs(10),
quorum_timeout: Duration::from_secs(10),
}
}
}
#[derive(Debug, Clone)]
pub struct SecurityConfig {
pub auth_mode: AuthMode,
pub auth_key_seed: Option<u64>,
pub reject_unauthenticated: bool,
}
impl Default for SecurityConfig {
fn default() -> Self {
Self {
auth_mode: AuthMode::Strict,
auth_key_seed: None,
reject_unauthenticated: true,
}
}
}
#[derive(Debug, Clone)]
pub enum RuntimeProfile {
Development,
Testing,
Staging,
Production,
HighThroughput,
LowLatency,
Custom(Box<RaptorQConfig>),
}
impl RuntimeProfile {
#[must_use]
pub fn to_config(&self) -> RaptorQConfig {
match self {
Self::Development => {
let mut config = RaptorQConfig::default();
config.encoding.repair_overhead = 1.1;
config.encoding.symbol_size = 256;
config.encoding.encoding_parallelism = 2;
config.observability = ObservabilityConfig::development();
config
}
Self::Testing => RaptorQConfig {
observability: ObservabilityConfig::testing(),
..Default::default()
},
Self::Staging => {
let mut config = RaptorQConfig::default();
config.encoding.repair_overhead = 1.05;
config.observability = ObservabilityConfig::production()
.with_sample_rate(0.05)
.with_log_level(LogLevel::Info);
config
}
Self::Production => {
let mut config = RaptorQConfig::default();
config.encoding.repair_overhead = 1.02;
config.encoding.symbol_size = 1024;
config.encoding.encoding_parallelism = available_parallelism();
config.encoding.decoding_parallelism = available_parallelism();
config.observability = ObservabilityConfig::production();
config
}
Self::HighThroughput => {
let mut config = RaptorQConfig::default();
config.encoding.repair_overhead = 1.05;
config.encoding.symbol_size = 1024;
config.encoding.encoding_parallelism = available_parallelism();
config.encoding.decoding_parallelism = available_parallelism();
config.resources.max_symbol_buffer_memory = 512 * 1024 * 1024;
config.resources.symbol_pool_size = 8192;
config
}
Self::LowLatency => {
let mut config = RaptorQConfig::default();
config.encoding.repair_overhead = 1.01;
config.encoding.symbol_size = 128;
config.timeouts.default_timeout = Duration::from_secs(5);
config.timeouts.encoding_timeout = Duration::from_secs(5);
config.timeouts.decoding_timeout = Duration::from_secs(5);
config
}
Self::Custom(config) => config.as_ref().clone(),
}
}
}
#[derive(Debug, Clone)]
pub struct ConfigLoader {
profile: RuntimeProfile,
file_path: Option<PathBuf>,
overrides: BTreeMap<String, String>,
}
impl ConfigLoader {
#[must_use]
pub fn new() -> Self {
Self {
profile: RuntimeProfile::Development,
file_path: None,
overrides: BTreeMap::new(),
}
}
#[must_use]
pub fn profile(mut self, profile: RuntimeProfile) -> Self {
self.profile = profile;
self
}
#[must_use]
pub fn file(mut self, path: impl Into<PathBuf>) -> Self {
self.file_path = Some(path.into());
self
}
#[must_use]
pub fn override_value(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
self.overrides.insert(key.into(), value.into());
self
}
pub fn load(&self) -> Result<RaptorQConfig, ConfigError> {
let mut config = if let Some(path) = &self.file_path {
load_from_file(path, &self.profile)?
} else {
self.profile.to_config()
};
apply_env_overrides(&mut config)?;
apply_overrides(&mut config, &self.overrides)?;
config.validate()?;
Ok(config)
}
}
impl Default for ConfigLoader {
fn default() -> Self {
Self::new()
}
}
#[derive(Debug)]
pub enum ConfigError {
Io(std::io::Error),
Parse(String),
InvalidRepairOverhead,
InvalidSymbolSize,
InvalidMaxBlockSize,
InvalidParallelism,
InsufficientMemory,
TimeoutTooShort,
InvalidSampleRate(f64),
InvalidOverride(String),
InvalidBackoffMultiplier,
InvalidBackoffRange,
}
impl std::fmt::Display for ConfigError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Io(err) => write!(f, "config I/O error: {err}"),
Self::Parse(err) => write!(f, "config parse error: {err}"),
Self::InvalidRepairOverhead => write!(f, "repair_overhead must be >= 1.0"),
Self::InvalidSymbolSize => write!(f, "symbol_size out of range"),
Self::InvalidMaxBlockSize => write!(f, "max_block_size must be > 0"),
Self::InvalidParallelism => write!(f, "parallelism must be > 0"),
Self::InsufficientMemory => write!(f, "max_symbol_buffer_memory too small"),
Self::TimeoutTooShort => write!(f, "default_timeout too short"),
Self::InvalidSampleRate(value) => {
write!(f, "sample_rate out of range: {value}")
}
Self::InvalidOverride(key) => write!(f, "invalid override: {key}"),
Self::InvalidBackoffMultiplier => {
write!(f, "backoff multiplier must be finite and positive")
}
Self::InvalidBackoffRange => {
write!(f, "backoff initial_delay must not exceed max_delay")
}
}
}
}
impl std::error::Error for ConfigError {}
impl From<std::io::Error> for ConfigError {
fn from(err: std::io::Error) -> Self {
Self::Io(err)
}
}
fn available_parallelism() -> usize {
std::thread::available_parallelism()
.map_or(1, std::num::NonZeroUsize::get)
.max(1)
}
fn load_from_file(path: &Path, profile: &RuntimeProfile) -> Result<RaptorQConfig, ConfigError> {
let contents = std::fs::read_to_string(path)?;
let base = profile.to_config();
parse_config(&contents, base)
}
fn apply_env_overrides(config: &mut RaptorQConfig) -> Result<(), ConfigError> {
let mut overrides = BTreeMap::new();
for (key, value) in std::env::vars() {
if key.starts_with("RAPTORQ_") {
overrides.insert(key, value);
}
}
apply_overrides(config, &overrides)
}
fn apply_overrides(
config: &mut RaptorQConfig,
overrides: &BTreeMap<String, String>,
) -> Result<(), ConfigError> {
for (key, value) in overrides {
apply_env_override(config, key, value)?;
}
Ok(())
}
#[allow(clippy::too_many_lines)]
fn apply_env_override(
config: &mut RaptorQConfig,
key: &str,
value: &str,
) -> Result<(), ConfigError> {
match key {
"RAPTORQ_ENCODING_REPAIR_OVERHEAD" => {
config.encoding.repair_overhead = parse_f64(value, key)?;
}
"RAPTORQ_ENCODING_MAX_BLOCK_SIZE" => {
config.encoding.max_block_size = parse_usize(value, key)?;
}
"RAPTORQ_ENCODING_SYMBOL_SIZE" => {
config.encoding.symbol_size = parse_u16(value, key)?;
}
"RAPTORQ_ENCODING_PARALLELISM" => {
let parallelism = parse_usize(value, key)?;
config.encoding.encoding_parallelism = parallelism;
config.encoding.decoding_parallelism = parallelism;
}
"RAPTORQ_ENCODING_ENCODING_PARALLELISM" => {
config.encoding.encoding_parallelism = parse_usize(value, key)?;
}
"RAPTORQ_ENCODING_DECODING_PARALLELISM" => {
config.encoding.decoding_parallelism = parse_usize(value, key)?;
}
"RAPTORQ_TRANSPORT_MAX_PATHS" => {
config.transport.max_paths = parse_usize(value, key)?;
}
"RAPTORQ_TRANSPORT_HEALTH_CHECK_INTERVAL_MS" => {
config.transport.health_check_interval = parse_duration_ms(value, key)?;
}
"RAPTORQ_TRANSPORT_DEAD_PATH_BACKOFF_INITIAL_MS" => {
config.transport.dead_path_backoff.initial_delay = parse_duration_ms(value, key)?;
}
"RAPTORQ_TRANSPORT_DEAD_PATH_BACKOFF_MAX_MS" => {
config.transport.dead_path_backoff.max_delay = parse_duration_ms(value, key)?;
}
"RAPTORQ_TRANSPORT_DEAD_PATH_BACKOFF_MULTIPLIER" => {
config.transport.dead_path_backoff.multiplier = parse_f64(value, key)?;
}
"RAPTORQ_TRANSPORT_MAX_SYMBOLS_IN_FLIGHT" => {
config.transport.max_symbols_in_flight = parse_usize(value, key)?;
}
"RAPTORQ_TRANSPORT_PATH_STRATEGY" => {
config.transport.path_strategy = parse_path_strategy(value, key)?;
}
"RAPTORQ_RESOURCES_MAX_SYMBOL_BUFFER_MEMORY" => {
config.resources.max_symbol_buffer_memory = parse_usize(value, key)?;
}
"RAPTORQ_RESOURCES_MAX_ENCODING_OPS" => {
config.resources.max_encoding_ops = parse_usize(value, key)?;
}
"RAPTORQ_RESOURCES_MAX_DECODING_OPS" => {
config.resources.max_decoding_ops = parse_usize(value, key)?;
}
"RAPTORQ_RESOURCES_SYMBOL_POOL_SIZE" => {
config.resources.symbol_pool_size = parse_usize(value, key)?;
}
"RAPTORQ_TIMEOUTS_DEFAULT_TIMEOUT_MS" => {
config.timeouts.default_timeout = parse_duration_ms(value, key)?;
}
"RAPTORQ_TIMEOUTS_ENCODING_TIMEOUT_MS" => {
config.timeouts.encoding_timeout = parse_duration_ms(value, key)?;
}
"RAPTORQ_TIMEOUTS_DECODING_TIMEOUT_MS" => {
config.timeouts.decoding_timeout = parse_duration_ms(value, key)?;
}
"RAPTORQ_TIMEOUTS_PATH_TIMEOUT_MS" => {
config.timeouts.path_timeout = parse_duration_ms(value, key)?;
}
"RAPTORQ_TIMEOUTS_QUORUM_TIMEOUT_MS" => {
config.timeouts.quorum_timeout = parse_duration_ms(value, key)?;
}
"RAPTORQ_OBSERVABILITY_LOG_LEVEL" => {
let level = parse_log_level(value, key)?;
config.observability = config.observability.clone().with_log_level(level);
}
"RAPTORQ_OBSERVABILITY_TRACE_ALL_SYMBOLS" => {
let trace = parse_bool(value, key)?;
config.observability = config.observability.clone().with_trace_all_symbols(trace);
}
"RAPTORQ_OBSERVABILITY_SAMPLE_RATE" => {
let rate = parse_sample_rate(value, key)?;
config.observability = config.observability.clone().with_sample_rate(rate);
}
"RAPTORQ_OBSERVABILITY_MAX_SPANS" => {
let max = parse_usize(value, key)?;
config.observability = config.observability.clone().with_max_spans(max);
}
"RAPTORQ_OBSERVABILITY_MAX_LOG_ENTRIES" => {
let max = parse_usize(value, key)?;
config.observability = config.observability.clone().with_max_log_entries(max);
}
"RAPTORQ_OBSERVABILITY_INCLUDE_TIMESTAMPS" => {
let include = parse_bool(value, key)?;
config.observability = config
.observability
.clone()
.with_include_timestamps(include);
}
"RAPTORQ_OBSERVABILITY_METRICS_ENABLED" => {
let enabled = parse_bool(value, key)?;
config.observability = config.observability.clone().with_metrics_enabled(enabled);
}
"RAPTORQ_SECURITY_AUTH_MODE" => {
config.security.auth_mode = parse_auth_mode(value, key)?;
}
"RAPTORQ_SECURITY_AUTH_KEY_SEED" => {
config.security.auth_key_seed = Some(parse_u64(value, key)?);
}
"RAPTORQ_SECURITY_REJECT_UNAUTHENTICATED" => {
config.security.reject_unauthenticated = parse_bool(value, key)?;
}
_ => return Err(ConfigError::InvalidOverride(key.to_string())),
}
Ok(())
}
fn parse_config(contents: &str, base: RaptorQConfig) -> Result<RaptorQConfig, ConfigError> {
let mut config = base;
let mut section = String::new();
for (line_idx, raw) in contents.lines().enumerate() {
let line = raw.trim();
if line.is_empty() || line.starts_with('#') || line.starts_with("//") {
continue;
}
if line.starts_with('[') && line.ends_with(']') {
section = line[1..line.len() - 1].trim().to_lowercase();
continue;
}
let (key, value) = line
.split_once('=')
.ok_or_else(|| ConfigError::Parse(format!("line {}: {}", line_idx + 1, line)))?;
let key = key.trim();
let value = value.trim().trim_matches('"');
apply_section_kv(&mut config, §ion, key, value)?;
}
Ok(config)
}
fn apply_section_kv(
config: &mut RaptorQConfig,
section: &str,
key: &str,
value: &str,
) -> Result<(), ConfigError> {
match section {
"encoding" => apply_encoding_kv(&mut config.encoding, key, value),
"transport" => apply_transport_kv(&mut config.transport, key, value),
"resources" => apply_resource_kv(&mut config.resources, key, value),
"timeouts" => apply_timeout_kv(&mut config.timeouts, key, value),
"observability" => apply_observability_kv(&mut config.observability, key, value),
"security" => apply_security_kv(&mut config.security, key, value),
"" => Err(ConfigError::Parse(format!(
"missing section for key: {key}"
))),
_ => Err(ConfigError::Parse(format!("unknown section: {section}"))),
}
}
fn apply_encoding_kv(
encoding: &mut EncodingConfig,
key: &str,
value: &str,
) -> Result<(), ConfigError> {
match key {
"repair_overhead" => encoding.repair_overhead = parse_f64(value, key)?,
"max_block_size" => encoding.max_block_size = parse_usize(value, key)?,
"symbol_size" => encoding.symbol_size = parse_u16(value, key)?,
"encoding_parallelism" => encoding.encoding_parallelism = parse_usize(value, key)?,
"decoding_parallelism" => encoding.decoding_parallelism = parse_usize(value, key)?,
_ => return Err(ConfigError::Parse(format!("unknown key: encoding.{key}"))),
}
Ok(())
}
fn apply_transport_kv(
transport: &mut TransportConfig,
key: &str,
value: &str,
) -> Result<(), ConfigError> {
match key {
"max_paths" => transport.max_paths = parse_usize(value, key)?,
"health_check_interval_ms" => {
transport.health_check_interval = parse_duration_ms(value, key)?;
}
"dead_path_backoff_initial_ms" => {
transport.dead_path_backoff.initial_delay = parse_duration_ms(value, key)?;
}
"dead_path_backoff_max_ms" => {
transport.dead_path_backoff.max_delay = parse_duration_ms(value, key)?;
}
"dead_path_backoff_multiplier" => {
transport.dead_path_backoff.multiplier = parse_f64(value, key)?;
}
"max_symbols_in_flight" => transport.max_symbols_in_flight = parse_usize(value, key)?,
"path_strategy" => transport.path_strategy = parse_path_strategy(value, key)?,
"experiment_gate" => transport.experiment_gate = parse_experiment_gate(value, key)?,
"coding_policy" => transport.coding_policy = parse_transport_coding_policy(value, key)?,
_ => return Err(ConfigError::Parse(format!("unknown key: transport.{key}"))),
}
Ok(())
}
fn apply_resource_kv(
resources: &mut ResourceConfig,
key: &str,
value: &str,
) -> Result<(), ConfigError> {
match key {
"max_symbol_buffer_memory" => {
resources.max_symbol_buffer_memory = parse_usize(value, key)?;
}
"max_encoding_ops" => resources.max_encoding_ops = parse_usize(value, key)?,
"max_decoding_ops" => resources.max_decoding_ops = parse_usize(value, key)?,
"symbol_pool_size" => resources.symbol_pool_size = parse_usize(value, key)?,
_ => return Err(ConfigError::Parse(format!("unknown key: resources.{key}"))),
}
Ok(())
}
fn apply_timeout_kv(
timeouts: &mut TimeoutConfig,
key: &str,
value: &str,
) -> Result<(), ConfigError> {
match key {
"default_timeout_ms" => timeouts.default_timeout = parse_duration_ms(value, key)?,
"encoding_timeout_ms" => timeouts.encoding_timeout = parse_duration_ms(value, key)?,
"decoding_timeout_ms" => timeouts.decoding_timeout = parse_duration_ms(value, key)?,
"path_timeout_ms" => timeouts.path_timeout = parse_duration_ms(value, key)?,
"quorum_timeout_ms" => timeouts.quorum_timeout = parse_duration_ms(value, key)?,
_ => return Err(ConfigError::Parse(format!("unknown key: timeouts.{key}"))),
}
Ok(())
}
fn apply_observability_kv(
observability: &mut ObservabilityConfig,
key: &str,
value: &str,
) -> Result<(), ConfigError> {
match key {
"log_level" => {
let level = parse_log_level(value, key)?;
*observability = observability.clone().with_log_level(level);
}
"trace_all_symbols" => {
let trace = parse_bool(value, key)?;
*observability = observability.clone().with_trace_all_symbols(trace);
}
"sample_rate" => {
let rate = parse_sample_rate(value, key)?;
*observability = observability.clone().with_sample_rate(rate);
}
"max_spans" => {
let max = parse_usize(value, key)?;
*observability = observability.clone().with_max_spans(max);
}
"max_log_entries" => {
let max = parse_usize(value, key)?;
*observability = observability.clone().with_max_log_entries(max);
}
"include_timestamps" => {
let include = parse_bool(value, key)?;
*observability = observability.clone().with_include_timestamps(include);
}
"metrics_enabled" => {
let enabled = parse_bool(value, key)?;
*observability = observability.clone().with_metrics_enabled(enabled);
}
_ => {
return Err(ConfigError::Parse(format!(
"unknown key: observability.{key}"
)));
}
}
Ok(())
}
fn apply_security_kv(
security: &mut SecurityConfig,
key: &str,
value: &str,
) -> Result<(), ConfigError> {
match key {
"auth_mode" => security.auth_mode = parse_auth_mode(value, key)?,
"auth_key_seed" => security.auth_key_seed = Some(parse_u64(value, key)?),
"reject_unauthenticated" => security.reject_unauthenticated = parse_bool(value, key)?,
_ => return Err(ConfigError::Parse(format!("unknown key: security.{key}"))),
}
Ok(())
}
fn parse_u64(value: &str, key: &str) -> Result<u64, ConfigError> {
value
.parse::<u64>()
.map_err(|_| ConfigError::Parse(format!("invalid u64 for {key}: {value}")))
}
fn parse_usize(value: &str, key: &str) -> Result<usize, ConfigError> {
value
.parse::<usize>()
.map_err(|_| ConfigError::Parse(format!("invalid usize for {key}: {value}")))
}
fn parse_u16(value: &str, key: &str) -> Result<u16, ConfigError> {
value
.parse::<u16>()
.map_err(|_| ConfigError::Parse(format!("invalid u16 for {key}: {value}")))
}
fn parse_f64(value: &str, key: &str) -> Result<f64, ConfigError> {
value
.parse::<f64>()
.map_err(|_| ConfigError::Parse(format!("invalid f64 for {key}: {value}")))
}
fn parse_sample_rate(value: &str, key: &str) -> Result<f64, ConfigError> {
let rate = parse_f64(value, key)?;
if (0.0..=1.0).contains(&rate) {
Ok(rate)
} else {
Err(ConfigError::InvalidSampleRate(rate))
}
}
fn parse_bool(value: &str, key: &str) -> Result<bool, ConfigError> {
match value.to_lowercase().as_str() {
"true" | "1" | "yes" => Ok(true),
"false" | "0" | "no" => Ok(false),
_ => Err(ConfigError::Parse(format!(
"invalid bool for {key}: {value}"
))),
}
}
fn parse_duration_ms(value: &str, key: &str) -> Result<Duration, ConfigError> {
let millis = parse_u64(value, key)?;
Ok(Duration::from_millis(millis))
}
fn parse_log_level(value: &str, key: &str) -> Result<LogLevel, ConfigError> {
value
.parse::<LogLevel>()
.map_err(|err| ConfigError::Parse(format!("invalid log level for {key}: {err}")))
}
fn parse_auth_mode(value: &str, key: &str) -> Result<AuthMode, ConfigError> {
match value.to_lowercase().as_str() {
"strict" => Ok(AuthMode::Strict),
"permissive" => Ok(AuthMode::Permissive),
"disabled" => Ok(AuthMode::Disabled),
_ => Err(ConfigError::Parse(format!(
"invalid auth mode for {key}: {value}"
))),
}
}
fn parse_path_strategy(value: &str, key: &str) -> Result<PathSelectionStrategy, ConfigError> {
match value.to_lowercase().as_str() {
"round_robin" => Ok(PathSelectionStrategy::RoundRobin),
"latency_weighted" => Ok(PathSelectionStrategy::LatencyWeighted),
"random" => Ok(PathSelectionStrategy::Random),
"adaptive" => Ok(PathSelectionStrategy::Adaptive(AdaptiveConfig::default())),
_ => Err(ConfigError::Parse(format!(
"invalid path strategy for {key}: {value}"
))),
}
}
fn parse_experiment_gate(value: &str, key: &str) -> Result<ExperimentalTransportGate, ConfigError> {
match value.to_lowercase().as_str() {
"disabled" => Ok(ExperimentalTransportGate::Disabled),
"multipath_preview" => Ok(ExperimentalTransportGate::MultipathPreview),
_ => Err(ConfigError::Parse(format!(
"invalid experiment gate for {key}: {value}"
))),
}
}
fn parse_transport_coding_policy(
value: &str,
key: &str,
) -> Result<TransportCodingPolicy, ConfigError> {
match value.to_lowercase().as_str() {
"disabled" => Ok(TransportCodingPolicy::Disabled),
"raptorq_fec_preview" => Ok(TransportCodingPolicy::RaptorQFecPreview),
"rlnc_preview" => Ok(TransportCodingPolicy::RlncPreview),
_ => Err(ConfigError::Parse(format!(
"invalid coding policy for {key}: {value}"
))),
}
}
#[cfg(test)]
#[allow(unsafe_code)]
mod tests {
use super::*;
#[cfg(not(target_arch = "wasm32"))]
mod native_server_config_tests {
use super::*;
use std::net::SocketAddr;
#[test]
fn server_config_default_valid() {
let config = ServerConfig::default();
assert!(config.validate().is_ok());
}
#[test]
fn server_config_profiles_valid() {
for profile in [
ServerProfile::Development,
ServerProfile::Testing,
ServerProfile::Production,
] {
let config = ServerConfig::from_profile(profile);
assert!(config.validate().is_ok(), "Profile {profile:?} invalid");
}
}
#[test]
fn server_config_builder() {
let config = ServerConfig::default()
.bind_addr(SocketAddr::from(([127, 0, 0, 1], 9090)))
.shutdown_timeout(Duration::from_secs(60))
.worker_threads(Some(4));
assert_eq!(config.bind_addr.port(), 9090);
assert_eq!(config.shutdown_timeout, Duration::from_secs(60));
assert_eq!(config.worker_threads, Some(4));
assert!(config.validate().is_ok());
}
#[test]
fn server_config_validation_errors() {
let config = ServerConfig::default().shutdown_timeout(Duration::from_millis(10));
assert!(matches!(
config.validate(),
Err(ConfigError::TimeoutTooShort)
));
let config = ServerConfig::default().worker_threads(Some(0));
assert!(matches!(
config.validate(),
Err(ConfigError::InvalidParallelism)
));
}
#[test]
fn server_config_testing_profile() {
let config = ServerConfig::from_profile(ServerProfile::Testing);
assert_eq!(config.bind_addr.port(), 0); assert_eq!(config.worker_threads, Some(1));
assert_eq!(config.listener.max_connections, Some(10));
}
#[test]
fn server_config_production_profile() {
let config = ServerConfig::from_profile(ServerProfile::Production);
assert_eq!(config.bind_addr.port(), 8080);
assert_eq!(config.listener.max_connections, Some(50_000));
assert_eq!(config.http.max_body_size, 8 * 1024 * 1024);
}
#[test]
fn server_profile_debug_clone_copy_eq() {
let p = ServerProfile::Development;
let cloned = p;
let copied = p;
assert_eq!(cloned, copied);
assert_ne!(p, ServerProfile::Production);
}
#[test]
fn server_config_debug_clone() {
let config = ServerConfig::default();
let dbg = format!("{config:?}");
assert!(dbg.contains("ServerConfig"));
let cloned = config.clone();
assert_eq!(cloned.bind_addr, config.bind_addr);
}
}
#[test]
fn default_config_valid() {
let config = RaptorQConfig::default();
assert!(config.validate().is_ok());
}
#[test]
fn profile_configs_valid() {
for profile in [
RuntimeProfile::Development,
RuntimeProfile::Testing,
RuntimeProfile::Staging,
RuntimeProfile::Production,
RuntimeProfile::HighThroughput,
RuntimeProfile::LowLatency,
] {
let config = profile.to_config();
assert!(config.validate().is_ok(), "Profile {profile:?} invalid");
}
}
#[test]
fn env_override_symbol_size() {
let _guard = crate::test_utils::env_lock();
unsafe { std::env::set_var("RAPTORQ_ENCODING_SYMBOL_SIZE", "512") };
let config = ConfigLoader::default().load().unwrap();
assert_eq!(config.encoding.symbol_size, 512);
unsafe { std::env::remove_var("RAPTORQ_ENCODING_SYMBOL_SIZE") };
}
#[test]
fn file_loading_minimal() {
let input = r"
[encoding]
symbol_size = 512
repair_overhead = 1.2
[timeouts]
default_timeout_ms = 5000
";
let base = RuntimeProfile::Development.to_config();
let config = parse_config(input, base).unwrap();
assert_eq!(config.encoding.symbol_size, 512);
assert!((config.encoding.repair_overhead - 1.2).abs() < f64::EPSILON);
assert_eq!(config.timeouts.default_timeout, Duration::from_secs(5));
}
#[test]
fn invalid_repair_overhead() {
let mut config = RaptorQConfig::default();
config.encoding.repair_overhead = 0.5;
assert!(matches!(
config.validate(),
Err(ConfigError::InvalidRepairOverhead)
));
}
#[test]
fn repair_overhead_rejects_nan_and_infinity() {
let mut config = RaptorQConfig::default();
config.encoding.repair_overhead = f64::NAN;
assert!(
matches!(config.validate(), Err(ConfigError::InvalidRepairOverhead)),
"NaN must be rejected"
);
config.encoding.repair_overhead = f64::INFINITY;
assert!(
matches!(config.validate(), Err(ConfigError::InvalidRepairOverhead)),
"Infinity must be rejected"
);
config.encoding.repair_overhead = f64::NEG_INFINITY;
assert!(
matches!(config.validate(), Err(ConfigError::InvalidRepairOverhead)),
"Negative infinity must be rejected"
);
}
#[test]
fn invalid_symbol_size() {
let mut config = RaptorQConfig::default();
config.encoding.symbol_size = 4;
assert!(matches!(
config.validate(),
Err(ConfigError::InvalidSymbolSize)
));
}
#[test]
fn invalid_max_block_size() {
let mut config = RaptorQConfig::default();
config.encoding.max_block_size = 0;
assert!(matches!(
config.validate(),
Err(ConfigError::InvalidMaxBlockSize)
));
}
#[test]
fn invalid_parallelism_encoding() {
let mut config = RaptorQConfig::default();
config.encoding.encoding_parallelism = 0;
assert!(matches!(
config.validate(),
Err(ConfigError::InvalidParallelism)
));
}
#[test]
fn insufficient_memory() {
let mut config = RaptorQConfig::default();
config.resources.max_symbol_buffer_memory = 512;
assert!(matches!(
config.validate(),
Err(ConfigError::InsufficientMemory)
));
}
#[test]
fn timeout_too_short() {
let mut config = RaptorQConfig::default();
config.timeouts.default_timeout = Duration::from_millis(10);
assert!(matches!(
config.validate(),
Err(ConfigError::TimeoutTooShort)
));
}
#[test]
fn config_error_display() {
let err = ConfigError::InvalidRepairOverhead;
assert!(format!("{err}").contains("repair_overhead"));
let err = ConfigError::InvalidSymbolSize;
assert!(format!("{err}").contains("symbol_size"));
let err = ConfigError::InvalidMaxBlockSize;
assert!(format!("{err}").contains("max_block_size"));
let err = ConfigError::InvalidParallelism;
assert!(format!("{err}").contains("parallelism"));
let err = ConfigError::InsufficientMemory;
assert!(format!("{err}").contains("memory"));
let err = ConfigError::TimeoutTooShort;
assert!(format!("{err}").contains("timeout"));
let err = ConfigError::InvalidSampleRate(1.5);
assert!(format!("{err}").contains("sample_rate"));
}
#[test]
fn config_error_debug() {
let err = ConfigError::InvalidRepairOverhead;
let dbg = format!("{err:?}");
assert!(dbg.contains("InvalidRepairOverhead"));
}
#[test]
fn config_error_display_io() {
let err = ConfigError::Io(std::io::Error::new(std::io::ErrorKind::NotFound, "missing"));
assert!(err.to_string().contains("I/O"));
}
#[test]
fn config_error_display_parse() {
let err = ConfigError::Parse("bad value".into());
assert!(err.to_string().contains("parse error"));
}
#[test]
fn config_error_display_invalid_override() {
let err = ConfigError::InvalidOverride("BAD_KEY".into());
assert!(err.to_string().contains("BAD_KEY"));
}
#[test]
fn config_error_source() {
use std::error::Error;
let err = ConfigError::InvalidRepairOverhead;
assert!(err.source().is_none());
let err = ConfigError::Io(std::io::Error::new(std::io::ErrorKind::NotFound, "missing"));
assert!(err.source().is_none());
}
#[test]
fn config_error_from_io() {
let io_err = std::io::Error::new(std::io::ErrorKind::PermissionDenied, "denied");
let config_err = ConfigError::from(io_err);
assert!(matches!(config_err, ConfigError::Io(_)));
}
#[test]
fn raptorq_config_debug_clone() {
let config = RaptorQConfig::default();
let dbg = format!("{config:?}");
assert!(dbg.contains("RaptorQConfig"));
let cloned = config.clone();
assert_eq!(cloned.encoding.symbol_size, config.encoding.symbol_size);
}
#[test]
fn encoding_config_default() {
let enc = EncodingConfig::default();
assert!((enc.repair_overhead - 1.05).abs() < f64::EPSILON);
assert_eq!(enc.symbol_size, 256);
assert_eq!(enc.encoding_parallelism, 2);
}
#[test]
fn transport_config_default() {
let tc = TransportConfig::default();
assert_eq!(tc.max_paths, 4);
assert_eq!(tc.max_symbols_in_flight, 256);
assert_eq!(tc.experiment_gate, ExperimentalTransportGate::Disabled);
assert_eq!(tc.coding_policy, TransportCodingPolicy::Disabled);
}
#[test]
fn backoff_config_debug_clone_default() {
let bc = BackoffConfig::default();
let dbg = format!("{bc:?}");
assert!(dbg.contains("BackoffConfig"));
let cloned = bc;
assert_eq!(cloned.initial_delay, Duration::from_millis(100));
assert_eq!(cloned.max_delay, Duration::from_secs(10));
}
#[test]
fn adaptive_config_debug_clone_default() {
let ac = AdaptiveConfig::default();
let dbg = format!("{ac:?}");
assert!(dbg.contains("AdaptiveConfig"));
let cloned = ac;
assert_eq!(cloned.min_samples, 16);
}
#[test]
fn path_selection_strategy_debug_clone() {
let s = PathSelectionStrategy::RoundRobin;
let dbg = format!("{s:?}");
assert!(dbg.contains("RoundRobin"));
let s = PathSelectionStrategy::Adaptive(AdaptiveConfig::default());
let cloned = s;
let dbg = format!("{cloned:?}");
assert!(dbg.contains("Adaptive"));
}
#[test]
fn transport_config_aggregator_config_is_fail_closed_by_default() {
let transport = TransportConfig::default();
let aggregator = transport.aggregator_config();
assert_eq!(aggregator.path_policy, PathSelectionPolicy::RoundRobin);
assert_eq!(
aggregator.experiment_gate,
ExperimentalTransportGate::Disabled
);
assert_eq!(aggregator.coding_policy, TransportCodingPolicy::Disabled);
}
#[test]
fn transport_config_aggregator_config_maps_preview_fields() {
let transport = TransportConfig {
path_strategy: PathSelectionStrategy::LatencyWeighted,
experiment_gate: ExperimentalTransportGate::MultipathPreview,
coding_policy: TransportCodingPolicy::RaptorQFecPreview,
..TransportConfig::default()
};
let aggregator = transport.aggregator_config();
assert_eq!(
aggregator.path_policy,
PathSelectionPolicy::BestQuality { count: 1 }
);
assert_eq!(
aggregator.experiment_gate,
ExperimentalTransportGate::MultipathPreview
);
assert_eq!(
aggregator.coding_policy,
TransportCodingPolicy::RaptorQFecPreview
);
}
#[test]
fn parse_transport_preview_fields() {
let mut transport = TransportConfig::default();
apply_transport_kv(&mut transport, "experiment_gate", "multipath_preview").unwrap();
apply_transport_kv(&mut transport, "coding_policy", "rlnc_preview").unwrap();
assert_eq!(
transport.experiment_gate,
ExperimentalTransportGate::MultipathPreview
);
assert_eq!(transport.coding_policy, TransportCodingPolicy::RlncPreview);
}
#[test]
fn resource_config_debug_clone_default() {
let rc = ResourceConfig::default();
let dbg = format!("{rc:?}");
assert!(dbg.contains("ResourceConfig"));
let cloned = rc;
assert_eq!(cloned.max_encoding_ops, 8);
}
#[test]
fn timeout_config_debug_clone_default() {
let tc = TimeoutConfig::default();
let dbg = format!("{tc:?}");
assert!(dbg.contains("TimeoutConfig"));
let cloned = tc;
assert_eq!(cloned.default_timeout, Duration::from_secs(30));
}
#[test]
fn security_config_debug_clone_default() {
let sc = SecurityConfig::default();
let dbg = format!("{sc:?}");
assert!(dbg.contains("SecurityConfig"));
let cloned = sc;
assert!(cloned.reject_unauthenticated);
assert!(cloned.auth_key_seed.is_none());
}
#[test]
fn runtime_profile_custom() {
let config = RaptorQConfig::default();
let profile = RuntimeProfile::Custom(Box::new(config.clone()));
let expanded = profile.to_config();
assert_eq!(expanded.encoding.symbol_size, config.encoding.symbol_size);
}
#[test]
fn runtime_profile_debug_clone() {
let p = RuntimeProfile::Development;
let dbg = format!("{p:?}");
assert!(dbg.contains("Development"));
let cloned = p;
let dbg2 = format!("{cloned:?}");
assert!(dbg2.contains("Development"));
}
#[test]
fn config_loader_debug_clone_default() {
let loader = ConfigLoader::new();
let dbg = format!("{loader:?}");
assert!(dbg.contains("ConfigLoader"));
let cloned = loader;
let dbg2 = format!("{cloned:?}");
assert!(dbg2.contains("ConfigLoader"));
let default_loader = ConfigLoader::default();
let dbg3 = format!("{default_loader:?}");
assert!(dbg3.contains("ConfigLoader"));
}
#[test]
fn config_loader_builder_chain() {
let loader = ConfigLoader::new()
.profile(RuntimeProfile::Testing)
.override_value("encoding.symbol_size", "128");
let dbg = format!("{loader:?}");
assert!(dbg.contains("Testing"));
}
}