#![allow(
clippy::missing_errors_doc,
clippy::missing_panics_doc,
clippy::return_self_not_must_use
)]
#[allow(clippy::wildcard_imports)]
use crate::message_router::domain_types::*;
use serde::{Deserialize, Serialize};
use std::path::PathBuf;
use thiserror::Error;
/// Errors produced while validating, saving, or loading a [`RouterConfig`].
#[derive(Debug, Error)]
pub enum ConfigError {
/// A configuration value failed one of the checks in [`RouterConfig::validate`].
///
/// `field` names the offending setting and `reason` describes the rule it broke.
#[error("Invalid configuration: {field} - {reason}")]
ValidationError { field: String, reason: String },
/// Reading or writing the configuration file failed.
///
/// Converted automatically from [`std::io::Error`] through `#[from]`, so `?`
/// can be used directly on filesystem calls.
// NOTE(review): the message embeds `{source}` and the same field is marked
// `#[from]`; reporters that also walk the error chain may print the
// underlying message twice. Confirm this is intended.
#[error("I/O error: {source}")]
IoError {
#[from]
source: std::io::Error,
},
/// Encoding to or decoding from JSON failed.
///
/// Converted automatically from [`serde_json::Error`] through `#[from]`.
#[error("Serialization error: {source}")]
SerializationError {
#[from]
source: serde_json::Error,
},
}
/// Observability section of the router configuration: trace sampling,
/// metrics, and detailed logging switches.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ObservabilityConfig {
/// Trace sampling ratio. The presets use 1.0 (development), 0.01
/// (production) and 0.0 (testing).
pub trace_sampling_ratio: TraceSamplingRatio,
/// Whether metrics are enabled.
pub enable_metrics: bool,
/// Whether detailed logs are enabled.
pub enable_detailed_logs: bool,
}
impl ObservabilityConfig {
    /// Development preset: sampling ratio 1.0 with metrics and detailed logs enabled.
    ///
    /// # Panics
    ///
    /// Panics if `TraceSamplingRatio::try_new` rejects the hard-coded ratio.
    pub fn development() -> Self {
        let trace_sampling_ratio = TraceSamplingRatio::try_new(1.0).unwrap();
        Self {
            trace_sampling_ratio,
            enable_metrics: true,
            enable_detailed_logs: true,
        }
    }

    /// Production preset: sampling ratio 0.01, metrics enabled, detailed logs disabled.
    ///
    /// # Panics
    ///
    /// Panics if `TraceSamplingRatio::try_new` rejects the hard-coded ratio.
    pub fn production() -> Self {
        let trace_sampling_ratio = TraceSamplingRatio::try_new(0.01).unwrap();
        Self {
            trace_sampling_ratio,
            enable_metrics: true,
            enable_detailed_logs: false,
        }
    }

    /// Testing preset: sampling ratio 0.0 with metrics and detailed logs disabled.
    ///
    /// # Panics
    ///
    /// Panics if `TraceSamplingRatio::try_new` rejects the hard-coded ratio.
    pub fn testing() -> Self {
        let trace_sampling_ratio = TraceSamplingRatio::try_new(0.0).unwrap();
        Self {
            trace_sampling_ratio,
            enable_metrics: false,
            enable_detailed_logs: false,
        }
    }
}
impl Default for ObservabilityConfig {
/// Returns the development preset, [`ObservabilityConfig::development`].
fn default() -> Self {
Self::development()
}
}
/// Performance section of the router configuration: batching, connection
/// pooling, and compression switches.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceConfig {
/// Whether batching is enabled.
pub enable_batching: bool,
/// Whether connection pooling is enabled.
pub enable_connection_pooling: bool,
/// Connection pool size. The presets use 5 (development), 50 (production)
/// and 1 (testing).
pub connection_pool_size: usize,
/// Whether compression is enabled.
pub enable_compression: bool,
}
impl PerformanceConfig {
    /// Development preset: batching enabled, pooling and compression disabled,
    /// pool size 5.
    pub fn development() -> Self {
        let connection_pool_size = 5;
        Self {
            connection_pool_size,
            enable_batching: true,
            enable_connection_pooling: false,
            enable_compression: false,
        }
    }

    /// Production preset: batching, pooling, and compression all enabled,
    /// pool size 50.
    pub fn production() -> Self {
        let connection_pool_size = 50;
        Self {
            connection_pool_size,
            enable_batching: true,
            enable_connection_pooling: true,
            enable_compression: true,
        }
    }

    /// Testing preset: batching, pooling, and compression all disabled,
    /// pool size 1.
    pub fn testing() -> Self {
        let connection_pool_size = 1;
        Self {
            connection_pool_size,
            enable_batching: false,
            enable_connection_pooling: false,
            enable_compression: false,
        }
    }
}
impl Default for PerformanceConfig {
/// Returns the development preset, [`PerformanceConfig::development`].
fn default() -> Self {
Self::development()
}
}
/// Security section of the router configuration: message validation, size
/// limit, and rate limiting.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SecurityConfig {
/// Whether message validation is enabled. Every preset enables it.
pub enable_message_validation: bool,
/// Maximum message size in bytes. The presets use 1_048_576 (development),
/// 10_485_760 (production) and 1024 (testing).
pub max_message_size_bytes: usize,
/// Whether rate limiting is enabled.
pub enable_rate_limiting: bool,
/// Rate limit in messages per second. [`RouterConfig::validate`] rejects 0
/// when `enable_rate_limiting` is true.
pub rate_limit_messages_per_second: usize,
}
impl SecurityConfig {
    /// Development preset: validation enabled, 1 MiB message limit, rate
    /// limiting disabled (rate value 1000).
    pub fn development() -> Self {
        const ONE_MIB: usize = 1024 * 1024;
        Self {
            enable_message_validation: true,
            max_message_size_bytes: ONE_MIB,
            enable_rate_limiting: false,
            rate_limit_messages_per_second: 1000,
        }
    }

    /// Production preset: validation enabled, 10 MiB message limit, rate
    /// limiting enabled at 10_000 messages per second.
    pub fn production() -> Self {
        const TEN_MIB: usize = 10 * 1024 * 1024;
        Self {
            enable_message_validation: true,
            max_message_size_bytes: TEN_MIB,
            enable_rate_limiting: true,
            rate_limit_messages_per_second: 10_000,
        }
    }

    /// Testing preset: validation enabled, 1 KiB message limit, rate limiting
    /// disabled (rate value 100).
    pub fn testing() -> Self {
        const ONE_KIB: usize = 1024;
        Self {
            enable_message_validation: true,
            max_message_size_bytes: ONE_KIB,
            enable_rate_limiting: false,
            rate_limit_messages_per_second: 100,
        }
    }
}
impl Default for SecurityConfig {
/// Returns the development preset, [`SecurityConfig::development`].
fn default() -> Self {
Self::development()
}
}
/// Storage section of the router configuration: persistence switch, storage
/// location, and cleanup interval.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StorageConfig {
/// Storage location. [`RouterConfig::validate`] requires this to be `Some`
/// when `enable_persistence` is true.
pub storage_path: Option<PathBuf>,
/// Whether persistence is enabled.
pub enable_persistence: bool,
/// Storage cleanup interval in milliseconds. The presets use 60_000
/// (development), 3_600_000 (production) and 30_000 (testing).
pub storage_cleanup_interval_ms: u64,
}
impl StorageConfig {
    /// Development preset: no storage path, persistence disabled, cleanup
    /// interval of 60_000 ms.
    pub fn development() -> Self {
        const ONE_MINUTE_MS: u64 = 60 * 1_000;
        Self {
            storage_path: None,
            enable_persistence: false,
            storage_cleanup_interval_ms: ONE_MINUTE_MS,
        }
    }

    /// Production preset: persistence enabled under `./data/message_router`,
    /// cleanup interval of 3_600_000 ms.
    pub fn production() -> Self {
        const ONE_HOUR_MS: u64 = 60 * 60 * 1_000;
        let data_dir: PathBuf = "./data/message_router".into();
        Self {
            storage_path: Some(data_dir),
            enable_persistence: true,
            storage_cleanup_interval_ms: ONE_HOUR_MS,
        }
    }

    /// Testing preset: no storage path, persistence disabled, cleanup
    /// interval of 30_000 ms.
    pub fn testing() -> Self {
        const HALF_MINUTE_MS: u64 = 30 * 1_000;
        Self {
            storage_path: None,
            enable_persistence: false,
            storage_cleanup_interval_ms: HALF_MINUTE_MS,
        }
    }
}
impl Default for StorageConfig {
/// Returns the development preset, [`StorageConfig::development`].
fn default() -> Self {
Self::development()
}
}
/// Complete message-router configuration: top-level queue, timeout, retry and
/// circuit-breaker settings plus the observability, storage, performance and
/// security sections.
///
/// Cross-field rules are enforced by [`RouterConfig::validate`], not by
/// construction; the fields are public and can be set to values that
/// `validate` would reject.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RouterConfig {
/// Inbound queue size. `validate` requires at least 10.
pub inbound_queue_size: ChannelCapacity,
/// Outbound queue size. `validate` requires at least 10.
pub outbound_queue_size: ChannelCapacity,
/// Message timeout in milliseconds. `validate` requires at least 1000.
pub message_timeout_ms: MessageTimeoutMs,
/// Message batch size. `validate` rejects values above one tenth of
/// `inbound_queue_size` (integer division).
pub message_batch_size: MessageBatchSize,
/// Worker thread count. `validate` rejects values above twice
/// `num_cpus::get()`, so validity depends on the host.
pub worker_thread_count: WorkerThreadCount,
/// Maximum number of retries. Used with `retry_delay_ms` as the lower
/// bound for `circuit_breaker_timeout_ms` in `validate`.
pub max_retries: MaxRetries,
/// Retry delay in milliseconds. `validate` requires it to be strictly less
/// than `message_timeout_ms`.
pub retry_delay_ms: RetryDelayMs,
/// Retry backoff factor. Not checked by `validate`.
pub retry_backoff_factor: RetryBackoffFactor,
/// Dead-letter queue size. Not checked by `validate`.
pub dead_letter_queue_size: DeadLetterQueueSize,
/// Circuit-breaker threshold. Not checked by `validate`.
pub circuit_breaker_threshold: CircuitBreakerThreshold,
/// Circuit-breaker timeout in milliseconds. `validate` requires it to be
/// at least `retry_delay_ms * max_retries`.
pub circuit_breaker_timeout_ms: CircuitBreakerTimeoutMs,
/// Conversation timeout in milliseconds. `validate` requires at least 60_000.
pub conversation_timeout_ms: ConversationTimeoutMs,
/// Maximum conversation participants. Not checked by `validate`.
pub max_conversation_participants: MaxConversationParticipants,
/// Health-check interval in milliseconds. Not checked by `validate`.
pub health_check_interval_ms: HealthCheckIntervalMs,
/// Observability section.
pub observability: ObservabilityConfig,
/// Storage section.
pub storage: StorageConfig,
/// Performance section.
pub performance: PerformanceConfig,
/// Security section.
pub security: SecurityConfig,
}
/// Convenience accessors that read individual values out of the nested
/// configuration sections.
impl RouterConfig {
/// Returns `observability.trace_sampling_ratio` by value.
pub fn trace_sampling_ratio(&self) -> TraceSamplingRatio {
self.observability.trace_sampling_ratio
}
/// Returns `observability.enable_metrics`.
pub fn enable_metrics(&self) -> bool {
self.observability.enable_metrics
}
/// Returns `observability.enable_detailed_logs`.
pub fn enable_detailed_logs(&self) -> bool {
self.observability.enable_detailed_logs
}
/// Returns a reference to `storage.storage_path`, or `None` when no path is set.
pub fn storage_path(&self) -> Option<&PathBuf> {
self.storage.storage_path.as_ref()
}
/// Returns `storage.enable_persistence`.
pub fn enable_persistence(&self) -> bool {
self.storage.enable_persistence
}
/// Returns `storage.storage_cleanup_interval_ms`.
pub fn storage_cleanup_interval_ms(&self) -> u64 {
self.storage.storage_cleanup_interval_ms
}
/// Returns `performance.enable_batching`.
pub fn enable_batching(&self) -> bool {
self.performance.enable_batching
}
/// Returns `performance.enable_connection_pooling`.
pub fn enable_connection_pooling(&self) -> bool {
self.performance.enable_connection_pooling
}
/// Returns `performance.connection_pool_size`.
pub fn connection_pool_size(&self) -> usize {
self.performance.connection_pool_size
}
/// Returns `performance.enable_compression`.
pub fn enable_compression(&self) -> bool {
self.performance.enable_compression
}
/// Returns `security.enable_message_validation`.
pub fn enable_message_validation(&self) -> bool {
self.security.enable_message_validation
}
/// Returns `security.max_message_size_bytes`.
pub fn max_message_size_bytes(&self) -> usize {
self.security.max_message_size_bytes
}
/// Returns `security.enable_rate_limiting`.
pub fn enable_rate_limiting(&self) -> bool {
self.security.enable_rate_limiting
}
/// Returns `security.rate_limit_messages_per_second`.
pub fn rate_limit_messages_per_second(&self) -> usize {
self.security.rate_limit_messages_per_second
}
}
/// Presets, validation, and JSON file persistence for [`RouterConfig`].
impl RouterConfig {
    /// Development preset: small queues, two workers, and the development
    /// variant of every nested section.
    ///
    /// # Panics
    ///
    /// Panics if a domain type rejects one of the hard-coded values.
    pub fn development() -> Self {
        Self {
            inbound_queue_size: ChannelCapacity::try_new(1_000).unwrap(),
            outbound_queue_size: ChannelCapacity::try_new(1_000).unwrap(),
            message_timeout_ms: MessageTimeoutMs::try_new(10_000).unwrap(),
            message_batch_size: MessageBatchSize::try_new(10).unwrap(),
            worker_thread_count: WorkerThreadCount::try_new(2).unwrap(),
            max_retries: MaxRetries::try_new(2).unwrap(),
            retry_delay_ms: RetryDelayMs::try_new(500).unwrap(),
            retry_backoff_factor: RetryBackoffFactor::try_new(1.5).unwrap(),
            dead_letter_queue_size: DeadLetterQueueSize::try_new(10_000).unwrap(),
            circuit_breaker_threshold: CircuitBreakerThreshold::try_new(3).unwrap(),
            circuit_breaker_timeout_ms: CircuitBreakerTimeoutMs::try_new(30_000).unwrap(),
            conversation_timeout_ms: ConversationTimeoutMs::try_new(600_000).unwrap(),
            max_conversation_participants: MaxConversationParticipants::try_new(5).unwrap(),
            health_check_interval_ms: HealthCheckIntervalMs::try_new(10_000).unwrap(),
            observability: ObservabilityConfig::development(),
            storage: StorageConfig::development(),
            performance: PerformanceConfig::development(),
            security: SecurityConfig::development(),
        }
    }

    /// Production preset: large queues, eight workers, and the production
    /// variant of every nested section.
    ///
    /// Because [`RouterConfig::validate`] caps workers at twice the CPU count
    /// reported by `num_cpus::get()`, this preset fails validation on hosts
    /// that report fewer than four CPUs.
    ///
    /// # Panics
    ///
    /// Panics if a domain type rejects one of the hard-coded values.
    pub fn production() -> Self {
        Self {
            inbound_queue_size: ChannelCapacity::try_new(100_000).unwrap(),
            outbound_queue_size: ChannelCapacity::try_new(50_000).unwrap(),
            message_timeout_ms: MessageTimeoutMs::try_new(30_000).unwrap(),
            message_batch_size: MessageBatchSize::try_new(1000).unwrap(),
            worker_thread_count: WorkerThreadCount::try_new(8).unwrap(),
            max_retries: MaxRetries::try_new(3).unwrap(),
            retry_delay_ms: RetryDelayMs::try_new(1000).unwrap(),
            retry_backoff_factor: RetryBackoffFactor::try_new(2.0).unwrap(),
            dead_letter_queue_size: DeadLetterQueueSize::try_new(1_000_000).unwrap(),
            circuit_breaker_threshold: CircuitBreakerThreshold::try_new(10).unwrap(),
            circuit_breaker_timeout_ms: CircuitBreakerTimeoutMs::try_new(60_000).unwrap(),
            conversation_timeout_ms: ConversationTimeoutMs::default(),
            max_conversation_participants: MaxConversationParticipants::try_new(20).unwrap(),
            health_check_interval_ms: HealthCheckIntervalMs::try_new(60_000).unwrap(),
            observability: ObservabilityConfig::production(),
            storage: StorageConfig::production(),
            performance: PerformanceConfig::production(),
            security: SecurityConfig::production(),
        }
    }

    /// Testing preset: a single worker, short timeouts, and the testing
    /// variant of every nested section.
    ///
    /// # Panics
    ///
    /// Panics if a domain type rejects one of the hard-coded values.
    pub fn testing() -> Self {
        Self {
            inbound_queue_size: ChannelCapacity::try_new(10000).unwrap(),
            outbound_queue_size: ChannelCapacity::try_new(10000).unwrap(),
            message_timeout_ms: MessageTimeoutMs::try_new(5_000).unwrap(),
            message_batch_size: MessageBatchSize::try_new(5).unwrap(),
            worker_thread_count: WorkerThreadCount::try_new(1).unwrap(),
            max_retries: MaxRetries::try_new(1).unwrap(),
            retry_delay_ms: RetryDelayMs::try_new(100).unwrap(),
            retry_backoff_factor: RetryBackoffFactor::try_new(1.1).unwrap(),
            dead_letter_queue_size: DeadLetterQueueSize::try_new(10_000).unwrap(),
            circuit_breaker_threshold: CircuitBreakerThreshold::try_new(1).unwrap(),
            circuit_breaker_timeout_ms: CircuitBreakerTimeoutMs::try_new(5_000).unwrap(),
            conversation_timeout_ms: ConversationTimeoutMs::try_new(300_000).unwrap(),
            max_conversation_participants: MaxConversationParticipants::try_new(3).unwrap(),
            health_check_interval_ms: HealthCheckIntervalMs::try_new(5_000).unwrap(),
            observability: ObservabilityConfig::testing(),
            storage: StorageConfig::testing(),
            performance: PerformanceConfig::testing(),
            security: SecurityConfig::testing(),
        }
    }

    /// Returns a [`RouterConfigBuilder`] created with `RouterConfigBuilder::new()`.
    pub fn builder() -> RouterConfigBuilder {
        RouterConfigBuilder::new()
    }

    /// Checks the cross-field rules of this configuration.
    ///
    /// Rules are evaluated in a fixed order and the first violation is
    /// returned:
    ///
    /// 1. inbound and outbound queue sizes are at least 10;
    /// 2. `message_timeout_ms` is at least 1000;
    /// 3. `conversation_timeout_ms` is at least 60_000;
    /// 4. `worker_thread_count` does not exceed twice `num_cpus::get()`;
    /// 5. `message_batch_size` does not exceed one tenth of the inbound queue size;
    /// 6. `retry_delay_ms` is strictly less than `message_timeout_ms`;
    /// 7. `circuit_breaker_timeout_ms` is at least `retry_delay_ms * max_retries`;
    /// 8. a storage path is present when persistence is enabled;
    /// 9. the rate limit is non-zero when rate limiting is enabled.
    ///
    /// # Errors
    ///
    /// Returns [`ConfigError::ValidationError`] naming the first offending field.
    pub fn validate(&self) -> Result<(), ConfigError> {
        // Single place that shapes a rejection, so every rule reads as one line.
        let reject = |field: &str, reason: &str| -> Result<(), ConfigError> {
            Err(ConfigError::ValidationError {
                field: field.to_string(),
                reason: reason.to_string(),
            })
        };

        let inbound_capacity = self.inbound_queue_size.as_usize();
        if inbound_capacity < 10 {
            return reject("inbound_queue_size", "Must be at least 10");
        }
        if self.outbound_queue_size.as_usize() < 10 {
            return reject("outbound_queue_size", "Must be at least 10");
        }

        let message_timeout = self.message_timeout_ms.as_u64();
        if message_timeout < 1000 {
            return reject("message_timeout_ms", "Must be at least 1 second");
        }
        if self.conversation_timeout_ms.as_u64() < 60_000 {
            return reject("conversation_timeout_ms", "Must be at least 1 minute");
        }

        // Query the CPU count once so the limit that is checked is the limit
        // that is reported.
        let worker_limit = num_cpus::get().saturating_mul(2);
        if self.worker_thread_count.as_usize() > worker_limit {
            return reject(
                "worker_thread_count",
                &format!("Should not exceed 2x CPU cores ({worker_limit})"),
            );
        }

        if self.message_batch_size.as_usize() > inbound_capacity / 10 {
            return reject(
                "message_batch_size",
                "Should not exceed 10% of inbound queue size",
            );
        }

        let retry_delay = self.retry_delay_ms.as_u64();
        if retry_delay >= message_timeout {
            return reject("retry_delay_ms", "Should be less than message timeout");
        }

        // Saturate instead of overflowing: the values may come from a file, and
        // validation must report a bad configuration rather than panic or wrap.
        let retry_budget = retry_delay.saturating_mul(u64::from(self.max_retries.as_u8()));
        if self.circuit_breaker_timeout_ms.as_u64() < retry_budget {
            return reject(
                "circuit_breaker_timeout_ms",
                "Should be longer than total retry time",
            );
        }

        if self.storage.enable_persistence && self.storage.storage_path.is_none() {
            return reject(
                "storage_path",
                "Must specify storage path when persistence is enabled",
            );
        }
        if self.security.enable_rate_limiting && self.security.rate_limit_messages_per_second == 0
        {
            return reject(
                "rate_limit_messages_per_second",
                "Must be greater than 0 when rate limiting is enabled",
            );
        }
        Ok(())
    }

    /// Writes this configuration to `path` as pretty-printed JSON.
    ///
    /// The configuration is written as-is; it is not validated first.
    ///
    /// # Errors
    ///
    /// Returns [`ConfigError::SerializationError`] if encoding fails and
    /// [`ConfigError::IoError`] if the file cannot be written.
    pub fn save_to_file<P: AsRef<std::path::Path>>(&self, path: P) -> Result<(), ConfigError> {
        let json = serde_json::to_string_pretty(self)?;
        std::fs::write(path, json)?;
        Ok(())
    }

    /// Reads a JSON configuration from `path` and validates it.
    ///
    /// # Errors
    ///
    /// Returns [`ConfigError::IoError`] if the file cannot be read,
    /// [`ConfigError::SerializationError`] if it is not a valid configuration
    /// document, and [`ConfigError::ValidationError`] if the decoded values
    /// fail [`RouterConfig::validate`].
    pub fn load_from_file<P: AsRef<std::path::Path>>(path: P) -> Result<Self, ConfigError> {
        let json = std::fs::read_to_string(path)?;
        let config: Self = serde_json::from_str(&json)?;
        config.validate()?;
        Ok(config)
    }
}
impl Default for RouterConfig {
/// Returns the development preset, [`RouterConfig::development`].
fn default() -> Self {
Self::development()
}
}
/// Fluent builder for [`RouterConfig`].
///
/// Holds a complete configuration that the setter methods overwrite field by
/// field; `build` validates the result before handing it out.
#[derive(Debug, Clone)]
pub struct RouterConfigBuilder {
    /// Configuration under construction.
    config: RouterConfig,
}
impl RouterConfigBuilder {
pub fn new() -> Self {
Self {
config: RouterConfig::development(),
}
}
pub fn inbound_queue_size(mut self, size: ChannelCapacity) -> Self {
self.config.inbound_queue_size = size;
self
}
pub fn outbound_queue_size(mut self, size: ChannelCapacity) -> Self {
self.config.outbound_queue_size = size;
self
}
pub fn message_timeout_ms(mut self, timeout: MessageTimeoutMs) -> Self {
self.config.message_timeout_ms = timeout;
self
}
pub fn message_batch_size(mut self, size: MessageBatchSize) -> Self {
self.config.message_batch_size = size;
self
}
pub fn worker_thread_count(mut self, count: WorkerThreadCount) -> Self {
self.config.worker_thread_count = count;
self
}
pub fn max_retries(mut self, retries: MaxRetries) -> Self {
self.config.max_retries = retries;
self
}
pub fn retry_delay_ms(mut self, delay: RetryDelayMs) -> Self {
self.config.retry_delay_ms = delay;
self
}
pub fn conversation_timeout_ms(mut self, timeout: ConversationTimeoutMs) -> Self {
self.config.conversation_timeout_ms = timeout;
self
}
pub fn trace_sampling_ratio(mut self, ratio: TraceSamplingRatio) -> Self {
self.config.observability.trace_sampling_ratio = ratio;
self
}
pub fn enable_persistence(mut self, enable: bool) -> Self {
self.config.storage.enable_persistence = enable;
self
}
pub fn storage_path<P: Into<PathBuf>>(mut self, path: P) -> Self {
self.config.storage.storage_path = Some(path.into());
self
}
pub fn enable_batching(mut self, enable: bool) -> Self {
self.config.performance.enable_batching = enable;
self
}
pub fn enable_connection_pooling(mut self, enable: bool) -> Self {
self.config.performance.enable_connection_pooling = enable;
self
}
pub fn connection_pool_size(mut self, size: usize) -> Self {
self.config.performance.connection_pool_size = size;
self
}
pub fn enable_metrics(mut self, enable: bool) -> Self {
self.config.observability.enable_metrics = enable;
self
}
pub fn enable_detailed_logs(mut self, enable: bool) -> Self {
self.config.observability.enable_detailed_logs = enable;
self
}
pub fn enable_rate_limiting(mut self, enable: bool) -> Self {
self.config.security.enable_rate_limiting = enable;
self
}
pub fn rate_limit_messages_per_second(mut self, rate: usize) -> Self {
self.config.security.rate_limit_messages_per_second = rate;
self
}
pub fn observability(mut self, observability: ObservabilityConfig) -> Self {
self.config.observability = observability;
self
}
pub fn storage(mut self, storage: StorageConfig) -> Self {
self.config.storage = storage;
self
}
pub fn performance(mut self, performance: PerformanceConfig) -> Self {
self.config.performance = performance;
self
}
pub fn security(mut self, security: SecurityConfig) -> Self {
self.config.security = security;
self
}
pub fn build(self) -> Result<RouterConfig, ConfigError> {
self.config.validate()?;
Ok(self.config)
}
}
impl Default for RouterConfigBuilder {
/// Equivalent to [`RouterConfigBuilder::new`].
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::NamedTempFile;

    /// The development preset must pass its own validation rules.
    #[test]
    fn test_development_config_is_valid() {
        let config = RouterConfig::development();
        assert!(config.validate().is_ok());
    }

    /// The production preset must pass validation, except for the one rule
    /// that depends on the machine running the test.
    #[test]
    fn test_production_config_is_valid() {
        let config = RouterConfig::production();
        match config.validate() {
            // `validate` caps workers at twice the host CPU count, and the
            // preset asks for eight workers. On a host with fewer than four
            // CPUs that rule legitimately fires; any other failure is a bug.
            Err(ConfigError::ValidationError { field, .. }) if field == "worker_thread_count" => {
                assert!(config.worker_thread_count.as_usize() > num_cpus::get() * 2);
            }
            outcome => assert!(
                outcome.is_ok(),
                "unexpected validation failure: {:?}",
                outcome
            ),
        }
    }

    /// The testing preset must pass its own validation rules.
    #[test]
    fn test_testing_config_is_valid() {
        let config = RouterConfig::testing();
        assert!(config.validate().is_ok());
    }

    /// Values set through the builder must show up in the built configuration.
    #[test]
    fn test_config_builder() {
        let config = RouterConfig::builder()
            .inbound_queue_size(ChannelCapacity::try_new(5000).unwrap())
            .message_timeout_ms(MessageTimeoutMs::try_new(15000).unwrap())
            .enable_persistence(false)
            .build()
            .unwrap();
        assert_eq!(config.inbound_queue_size.as_usize(), 5000);
        assert_eq!(config.message_timeout_ms.as_u64(), 15000);
        assert!(!config.enable_persistence());
    }

    /// `build` must reject configurations that break a validation rule.
    #[test]
    fn test_config_validation_errors() {
        // Inbound queue below the minimum of 10.
        let invalid_config = RouterConfig::builder()
            .inbound_queue_size(ChannelCapacity::try_new(5).unwrap())
            .build();
        assert!(invalid_config.is_err());

        // Retry delay not below the message timeout.
        let invalid_config = RouterConfig::builder()
            .message_timeout_ms(MessageTimeoutMs::try_new(1000).unwrap())
            .retry_delay_ms(RetryDelayMs::try_new(2000).unwrap())
            .build();
        assert!(invalid_config.is_err());
    }

    /// A configuration must survive a JSON round trip.
    #[test]
    fn test_config_serialization() {
        let config = RouterConfig::development();
        let json = serde_json::to_string(&config).unwrap();
        let deserialized: RouterConfig = serde_json::from_str(&json).unwrap();
        assert_eq!(config.inbound_queue_size, deserialized.inbound_queue_size);
        assert_eq!(config.message_timeout_ms, deserialized.message_timeout_ms);
        assert_eq!(
            config.enable_persistence(),
            deserialized.enable_persistence()
        );
    }

    /// Saving to a file and loading it back must preserve the values.
    #[test]
    fn test_config_file_operations() {
        let config = RouterConfig::development();
        let temp_file = NamedTempFile::new().unwrap();
        config.save_to_file(temp_file.path()).unwrap();
        let loaded_config = RouterConfig::load_from_file(temp_file.path()).unwrap();
        assert_eq!(config.inbound_queue_size, loaded_config.inbound_queue_size);
        assert_eq!(config.message_timeout_ms, loaded_config.message_timeout_ms);
    }
}