Skip to main content

oxirs_stream/
config.rs

1//! # Advanced Configuration Management
2//!
3//! This module provides comprehensive configuration management for oxirs-stream with:
4//! - Dynamic configuration updates without restart
5//! - Environment-based configuration loading
6//! - Secret management integration
7//! - SSL/TLS certificate management
8//! - Authentication configuration
9//! - Performance tuning profiles
10
11use anyhow::{anyhow, Result};
12use serde::{Deserialize, Serialize};
13use std::collections::HashMap;
14use std::fs;
15use std::path::{Path, PathBuf};
16use std::sync::Arc;
17use std::time::Duration;
18use tokio::sync::{broadcast, RwLock};
19use tracing::info;
20
21use crate::{CompressionType, StreamBackendType, StreamConfig, StreamPerformanceConfig};
22
/// Configuration source types
///
/// A list of sources is handed to [`ConfigManager`], whose `reload` walks the
/// list in order and applies each source on top of the configuration built so
/// far.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ConfigSource {
    /// Configuration from file; during reload the file is read as text and
    /// parsed as TOML into a `StreamConfig`
    File { path: PathBuf },
    /// Configuration from environment variables; `prefix` is prepended to the
    /// variable names (e.g. `{prefix}_BACKEND`, `{prefix}_MAX_CONNECTIONS`)
    Environment { prefix: String },
    /// Configuration from remote source (e.g., etcd, consul)
    // NOTE(review): `ConfigManager::reload` does not load anything for this
    // variant in this file.
    Remote { url: String, key: String },
    /// Configuration from memory (for testing); the keys understood by the
    /// manager are `topic`, `batch_size` and `max_connections`
    Memory { data: HashMap<String, String> },
}
35
/// Configuration manager with dynamic reload support
///
/// Owns the live `StreamConfig`, the ordered list of sources it is rebuilt
/// from, and the helpers (secrets, TLS, performance profiles) consulted while
/// rebuilding it.
pub struct ConfigManager {
    /// Current configuration
    current_config: Arc<RwLock<StreamConfig>>,
    /// Configuration sources in priority order (applied first to last during
    /// reload, so later sources override earlier ones)
    sources: Vec<ConfigSource>,
    /// Configuration change notifier (sender half of a broadcast channel;
    /// receivers are handed out by `subscribe`)
    change_notifier: broadcast::Sender<ConfigChangeEvent>,
    /// Secret manager
    secret_manager: Arc<SecretManager>,
    /// Environment detector (supplies the per-environment defaults applied at
    /// the start of every reload)
    environment: Environment,
    /// Performance profiles, keyed by profile name
    performance_profiles: HashMap<String, PerformanceProfile>,
    /// SSL/TLS manager
    tls_manager: Arc<TlsManager>,
}
53
/// Configuration change event
///
/// Broadcast to every receiver obtained from `ConfigManager::subscribe`.
#[derive(Debug, Clone)]
pub enum ConfigChangeEvent {
    /// Configuration reloaded; carries the configuration before and after the
    /// reload (boxed to keep the enum small)
    Reloaded {
        old_config: Box<StreamConfig>,
        new_config: Box<StreamConfig>,
    },
    /// Configuration validation failed; `reason` is a human-readable message
    ValidationFailed { reason: String },
    /// Secret rotated
    SecretRotated { secret_name: String },
    /// TLS certificate updated
    TlsCertificateUpdated { cert_type: String },
}
69
/// Environment detection
///
/// The deployment environment the process runs in; it selects the defaults
/// returned by `Environment::get_defaults`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Environment {
    /// Local development; also the fallback when detection finds nothing usable
    Development,
    /// Automated testing
    Testing,
    /// Pre-production staging
    Staging,
    /// Production
    Production,
}
78
79impl Environment {
80    /// Detect environment from various sources
81    pub fn detect() -> Self {
82        if let Ok(env) = std::env::var("OXIRS_ENV") {
83            match env.to_lowercase().as_str() {
84                "dev" | "development" => Environment::Development,
85                "test" | "testing" => Environment::Testing,
86                "staging" | "stage" => Environment::Staging,
87                "prod" | "production" => Environment::Production,
88                _ => Environment::Development,
89            }
90        } else if let Ok(env) = std::env::var("RUST_ENV") {
91            match env.to_lowercase().as_str() {
92                "production" => Environment::Production,
93                _ => Environment::Development,
94            }
95        } else {
96            Environment::Development
97        }
98    }
99
100    /// Get environment-specific defaults
101    pub fn get_defaults(&self) -> ConfigDefaults {
102        match self {
103            Environment::Development => ConfigDefaults {
104                log_level: "debug".to_string(),
105                enable_debug_endpoints: true,
106                connection_timeout_secs: 60,
107                max_connections: 10,
108                enable_compression: false,
109                enable_metrics: true,
110                enable_profiling: true,
111            },
112            Environment::Testing => ConfigDefaults {
113                log_level: "info".to_string(),
114                enable_debug_endpoints: true,
115                connection_timeout_secs: 30,
116                max_connections: 5,
117                enable_compression: false,
118                enable_metrics: true,
119                enable_profiling: false,
120            },
121            Environment::Staging => ConfigDefaults {
122                log_level: "info".to_string(),
123                enable_debug_endpoints: false,
124                connection_timeout_secs: 30,
125                max_connections: 50,
126                enable_compression: true,
127                enable_metrics: true,
128                enable_profiling: false,
129            },
130            Environment::Production => ConfigDefaults {
131                log_level: "warn".to_string(),
132                enable_debug_endpoints: false,
133                connection_timeout_secs: 30,
134                max_connections: 100,
135                enable_compression: true,
136                enable_metrics: true,
137                enable_profiling: false,
138            },
139        }
140    }
141}
142
/// Environment-specific configuration defaults
///
/// Produced by `Environment::get_defaults`.
// NOTE(review): `ConfigManager::reload` copies `max_connections`,
// `connection_timeout_secs`, `enable_compression`, `enable_metrics` and
// `enable_profiling` into the stream configuration; `log_level` and
// `enable_debug_endpoints` are not consumed anywhere in this file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConfigDefaults {
    /// Default log level name (e.g. `"debug"`, `"info"`, `"warn"`)
    pub log_level: String,
    /// Whether debug endpoints are enabled by default
    pub enable_debug_endpoints: bool,
    /// Default connection timeout, in seconds
    pub connection_timeout_secs: u64,
    /// Default maximum number of connections
    pub max_connections: usize,
    /// Whether compression is enabled by default
    pub enable_compression: bool,
    /// Whether metrics collection is enabled by default
    pub enable_metrics: bool,
    /// Whether profiling is enabled by default
    pub enable_profiling: bool,
}
154
/// Secret manager for handling sensitive configuration
///
/// Looks secrets up in a single backend and keeps an in-memory cache of the
/// values it has fetched.
pub struct SecretManager {
    /// Secret store backend
    backend: SecretBackend,
    /// Cached secrets with expiration, keyed by secret name
    cache: Arc<RwLock<HashMap<String, CachedSecret>>>,
    /// Secret rotation interval; used as the lifetime of a cache entry
    /// (a freshly cached secret expires this long after it was fetched)
    rotation_interval: Duration,
}
164
/// Secret backend types
// NOTE(review): `SecretManager::get_secret` only implements the
// `Environment`, `File` and `Memory` variants; `Vault` and
// `AwsSecretsManager` currently yield a "not implemented" error.
#[derive(Debug, Clone)]
pub enum SecretBackend {
    /// Environment variables; a secret `name` is read from the variable
    /// `{prefix}_{NAME}` with the name upper-cased
    Environment { prefix: String },
    /// File-based secrets; a secret `name` is read from `directory/name` and
    /// surrounding whitespace is trimmed
    File { directory: PathBuf },
    /// HashiCorp Vault
    Vault {
        url: String,
        token: String,
        mount_path: String,
    },
    /// AWS Secrets Manager
    AwsSecretsManager { region: String },
    /// Memory-based (for testing); the only backend that supports
    /// `SecretManager::set_secret`
    Memory {
        secrets: Arc<RwLock<HashMap<String, String>>>,
    },
}
185
/// Cached secret with metadata
#[derive(Debug, Clone)]
struct CachedSecret {
    /// The secret value as returned by the backend
    value: String,
    /// When the value was placed in the cache
    cached_at: std::time::Instant,
    /// When the entry stops being served from the cache; `None` means it
    /// never expires
    expires_at: Option<std::time::Instant>,
    /// Version counter
    // NOTE(review): always written as 1 and never read in this file.
    version: u64,
}
194
195impl SecretManager {
196    /// Create a new secret manager
197    pub fn new(backend: SecretBackend, rotation_interval: Duration) -> Self {
198        Self {
199            backend,
200            cache: Arc::new(RwLock::new(HashMap::new())),
201            rotation_interval,
202        }
203    }
204
205    /// Get a secret value
206    pub async fn get_secret(&self, name: &str) -> Result<String> {
207        // Check cache first
208        {
209            let cache = self.cache.read().await;
210            if let Some(cached) = cache.get(name) {
211                if cached
212                    .expires_at
213                    .map_or(true, |exp| exp > std::time::Instant::now())
214                {
215                    return Ok(cached.value.clone());
216                }
217            }
218        }
219
220        // Fetch from backend
221        let value = match &self.backend {
222            SecretBackend::Environment { prefix } => {
223                let key = format!("{prefix}_{}", name.to_uppercase());
224                std::env::var(key).map_err(|_| anyhow!("Secret {name} not found in environment"))
225            }
226            SecretBackend::File { directory } => {
227                let path = directory.join(name);
228                fs::read_to_string(&path)
229                    .map_err(|e| anyhow!("Failed to read secret from {path:?}: {e}"))
230                    .map(|s| s.trim().to_string())
231            }
232            SecretBackend::Memory { secrets } => {
233                let secrets = secrets.read().await;
234                secrets
235                    .get(name)
236                    .cloned()
237                    .ok_or_else(|| anyhow!("Secret {name} not found"))
238            }
239            _ => {
240                // Vault and AWS implementations would go here
241                return Err(anyhow!("Secret backend not implemented"));
242            }
243        }?;
244
245        // Cache the secret
246        let cached = CachedSecret {
247            value: value.clone(),
248            cached_at: std::time::Instant::now(),
249            expires_at: Some(std::time::Instant::now() + self.rotation_interval),
250            version: 1,
251        };
252
253        self.cache.write().await.insert(name.to_string(), cached);
254        Ok(value)
255    }
256
257    /// Set a secret (for testing)
258    pub async fn set_secret(&self, name: &str, value: &str) -> Result<()> {
259        match &self.backend {
260            SecretBackend::Memory { secrets } => {
261                secrets
262                    .write()
263                    .await
264                    .insert(name.to_string(), value.to_string());
265                self.cache.write().await.remove(name);
266                Ok(())
267            }
268            _ => Err(anyhow!("Set secret only supported for memory backend")),
269        }
270    }
271
272    /// Rotate all secrets
273    pub async fn rotate_secrets(&self) -> Result<()> {
274        self.cache.write().await.clear();
275        info!("Rotated all cached secrets");
276        Ok(())
277    }
278}
279
/// TLS/SSL certificate manager
///
/// Holds loaded certificates in memory, keyed by a caller-chosen name.
pub struct TlsManager {
    /// Certificate store, keyed by certificate name
    certs: Arc<RwLock<HashMap<String, TlsCertificate>>>,
    /// Certificate paths
    // NOTE(review): initialised empty and never populated in this file.
    cert_paths: HashMap<String, CertPaths>,
    /// Auto-reload enabled
    // NOTE(review): stored by `new` but not read anywhere in this file.
    auto_reload: bool,
}
289
/// TLS certificate with metadata
#[derive(Debug, Clone)]
pub struct TlsCertificate {
    /// Raw bytes of the certificate file
    pub cert_pem: Vec<u8>,
    /// Raw bytes of the private key file
    pub key_pem: Vec<u8>,
    /// Raw bytes of the CA file, when one was supplied
    pub ca_pem: Option<Vec<u8>>,
    /// When the files were read into memory
    pub loaded_at: std::time::Instant,
    /// Expiration timestamp
    // NOTE(review): as filled in by `TlsManager::load_certificate` this is a
    // placeholder (load time + 365 days), not the certificate's real
    // `notAfter` date.
    pub expires_at: Option<chrono::DateTime<chrono::Utc>>,
}
299
/// Certificate file paths
// NOTE(review): no code in this file constructs this type yet.
#[derive(Debug, Clone)]
struct CertPaths {
    /// Path of the certificate file
    cert_path: PathBuf,
    /// Path of the private key file
    key_path: PathBuf,
    /// Path of the CA file, if any
    ca_path: Option<PathBuf>,
}
307
308impl TlsManager {
309    /// Create a new TLS manager
310    pub fn new(auto_reload: bool) -> Self {
311        Self {
312            certs: Arc::new(RwLock::new(HashMap::new())),
313            cert_paths: HashMap::new(),
314            auto_reload,
315        }
316    }
317
318    /// Parse certificate expiration from PEM data
319    fn parse_certificate_expiration(cert_pem: &[u8]) -> Option<chrono::DateTime<chrono::Utc>> {
320        // Simple PEM certificate expiration parsing
321        // In a production system, we'd use proper X.509 parsing libraries
322        let cert_str = String::from_utf8_lossy(cert_pem);
323
324        // For now, we'll implement a basic parser that looks for common patterns
325        // This is a placeholder implementation that could be enhanced with proper X.509 parsing
326        if cert_str.contains("-----BEGIN CERTIFICATE-----") {
327            // Set a default expiration of 1 year from now for valid certificates
328            // In practice, this should parse the actual certificate validity period
329            Some(chrono::Utc::now() + chrono::Duration::days(365))
330        } else {
331            None
332        }
333    }
334
335    /// Load a certificate
336    pub async fn load_certificate(
337        &self,
338        name: &str,
339        cert_path: &Path,
340        key_path: &Path,
341        ca_path: Option<&Path>,
342    ) -> Result<()> {
343        let cert_pem = fs::read(cert_path)
344            .map_err(|e| anyhow!("Failed to read certificate {}: {}", cert_path.display(), e))?;
345
346        let key_pem = fs::read(key_path)
347            .map_err(|e| anyhow!("Failed to read key {}: {}", key_path.display(), e))?;
348
349        let ca_pem = if let Some(ca) = ca_path {
350            Some(fs::read(ca).map_err(|e| anyhow!("Failed to read CA {}: {}", ca.display(), e))?)
351        } else {
352            None
353        };
354
355        let expires_at = Self::parse_certificate_expiration(&cert_pem);
356
357        let cert = TlsCertificate {
358            cert_pem,
359            key_pem,
360            ca_pem,
361            loaded_at: std::time::Instant::now(),
362            expires_at,
363        };
364
365        self.certs.write().await.insert(name.to_string(), cert);
366
367        info!("Loaded TLS certificate: {}", name);
368        Ok(())
369    }
370
371    /// Get a certificate
372    pub async fn get_certificate(&self, name: &str) -> Result<TlsCertificate> {
373        self.certs
374            .read()
375            .await
376            .get(name)
377            .cloned()
378            .ok_or_else(|| anyhow!("Certificate {} not found", name))
379    }
380
381    /// Check certificate expiration
382    pub async fn check_expiration(&self) -> Vec<(String, chrono::DateTime<chrono::Utc>)> {
383        let certs = self.certs.read().await;
384        let mut expiring = Vec::new();
385
386        for (name, cert) in certs.iter() {
387            if let Some(expires) = cert.expires_at {
388                let days_until = (expires - chrono::Utc::now()).num_days();
389                if days_until < 30 {
390                    expiring.push((name.clone(), expires));
391                }
392            }
393        }
394
395        expiring
396    }
397}
398
/// Performance tuning profile
///
/// A named bundle of `StreamPerformanceConfig` settings that can be applied
/// to the live configuration with `ConfigManager::apply_performance_profile`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceProfile {
    /// Profile identifier (also the key it is registered under)
    pub name: String,
    /// One-line human-readable summary
    pub description: String,
    /// The performance settings the profile installs
    pub settings: StreamPerformanceConfig,
    /// Free-form tags naming the workloads the profile is meant for
    pub recommended_for: Vec<String>,
}
407
408impl ConfigManager {
409    /// Create a new configuration manager
410    pub async fn new(sources: Vec<ConfigSource>) -> Result<Self> {
411        let environment = Environment::detect();
412        let (tx, _) = broadcast::channel(100);
413
414        // Initialize secret manager
415        let secret_backend = SecretBackend::Environment {
416            prefix: "OXIRS_SECRET".to_string(),
417        };
418        let secret_manager = Arc::new(SecretManager::new(
419            secret_backend,
420            Duration::from_secs(3600),
421        ));
422
423        // Initialize TLS manager
424        let tls_manager = Arc::new(TlsManager::new(true));
425
426        // Load initial configuration
427        let initial_config = StreamConfig::default();
428
429        let mut manager = Self {
430            current_config: Arc::new(RwLock::new(initial_config)),
431            sources,
432            change_notifier: tx,
433            secret_manager,
434            environment,
435            performance_profiles: Self::create_default_profiles(),
436            tls_manager,
437        };
438
439        // Load configuration from sources
440        manager.reload().await?;
441
442        Ok(manager)
443    }
444
445    /// Create default performance profiles
446    fn create_default_profiles() -> HashMap<String, PerformanceProfile> {
447        let mut profiles = HashMap::new();
448
449        // Low latency profile
450        profiles.insert(
451            "low-latency".to_string(),
452            PerformanceProfile {
453                name: "low-latency".to_string(),
454                description: "Optimized for minimal latency".to_string(),
455                settings: StreamPerformanceConfig {
456                    enable_batching: false,
457                    enable_pipelining: true,
458                    buffer_size: 4096,
459                    prefetch_count: 10,
460                    enable_zero_copy: true,
461                    enable_simd: true,
462                    parallel_processing: true,
463                    worker_threads: Some(4),
464                },
465                recommended_for: vec!["real-time".to_string(), "trading".to_string()],
466            },
467        );
468
469        // High throughput profile
470        profiles.insert(
471            "high-throughput".to_string(),
472            PerformanceProfile {
473                name: "high-throughput".to_string(),
474                description: "Optimized for maximum throughput".to_string(),
475                settings: StreamPerformanceConfig {
476                    enable_batching: true,
477                    enable_pipelining: true,
478                    buffer_size: 65536,
479                    prefetch_count: 1000,
480                    enable_zero_copy: true,
481                    enable_simd: true,
482                    parallel_processing: true,
483                    worker_threads: None, // Use all available
484                },
485                recommended_for: vec!["batch-processing".to_string(), "etl".to_string()],
486            },
487        );
488
489        // Balanced profile
490        profiles.insert(
491            "balanced".to_string(),
492            PerformanceProfile {
493                name: "balanced".to_string(),
494                description: "Balanced between latency and throughput".to_string(),
495                settings: StreamPerformanceConfig::default(),
496                recommended_for: vec!["general".to_string(), "web-services".to_string()],
497            },
498        );
499
500        // Resource-constrained profile
501        profiles.insert(
502            "resource-constrained".to_string(),
503            PerformanceProfile {
504                name: "resource-constrained".to_string(),
505                description: "Optimized for limited resources".to_string(),
506                settings: StreamPerformanceConfig {
507                    enable_batching: true,
508                    enable_pipelining: false,
509                    buffer_size: 2048,
510                    prefetch_count: 10,
511                    enable_zero_copy: false,
512                    enable_simd: false,
513                    parallel_processing: false,
514                    worker_threads: Some(2),
515                },
516                recommended_for: vec!["edge".to_string(), "iot".to_string()],
517            },
518        );
519
520        profiles
521    }
522
523    /// Reload configuration from all sources
524    pub async fn reload(&mut self) -> Result<()> {
525        let old_config = self.current_config.read().await.clone();
526        let mut new_config = old_config.clone();
527
528        // Apply environment defaults
529        let defaults = self.environment.get_defaults();
530        new_config.max_connections = defaults.max_connections;
531        new_config.connection_timeout = Duration::from_secs(defaults.connection_timeout_secs);
532        new_config.enable_compression = defaults.enable_compression;
533        new_config.monitoring.enable_metrics = defaults.enable_metrics;
534        new_config.monitoring.enable_profiling = defaults.enable_profiling;
535
536        // Load from each source in order
537        for source in &self.sources {
538            match source {
539                ConfigSource::File { path } => {
540                    if let Ok(content) = fs::read_to_string(path) {
541                        if let Ok(file_config) = toml::from_str::<StreamConfig>(&content) {
542                            new_config = self.merge_configs(new_config, file_config);
543                        }
544                    }
545                }
546                ConfigSource::Environment { prefix } => {
547                    new_config = self.load_from_env(new_config, prefix).await?;
548                }
549                ConfigSource::Memory { data } => {
550                    new_config = self.apply_overrides(new_config, data.clone());
551                }
552                _ => {
553                    // Remote source would be implemented here
554                }
555            }
556        }
557
558        // Apply secrets
559        new_config = self.apply_secrets(new_config).await?;
560
561        // Validate configuration
562        self.validate_config(&new_config)?;
563
564        // Update current configuration
565        *self.current_config.write().await = new_config.clone();
566
567        // Notify listeners
568        let _ = self.change_notifier.send(ConfigChangeEvent::Reloaded {
569            old_config: Box::new(old_config),
570            new_config: Box::new(new_config),
571        });
572
573        info!("Configuration reloaded successfully");
574        Ok(())
575    }
576
577    /// Load configuration from environment variables
578    async fn load_from_env(&self, mut config: StreamConfig, prefix: &str) -> Result<StreamConfig> {
579        // Backend selection
580        if let Ok(backend) = std::env::var(format!("{prefix}_BACKEND")) {
581            config.backend = match backend.as_str() {
582                "kafka" => {
583                    #[cfg(feature = "kafka")]
584                    {
585                        let brokers: Vec<String> = std::env::var(format!("{prefix}_KAFKA_BROKERS"))
586                            .unwrap_or_else(|_| "localhost:9092".to_string())
587                            .split(',')
588                            .map(|s| s.to_string())
589                            .collect();
590                        StreamBackendType::Kafka {
591                            brokers,
592                            security_protocol: std::env::var(format!("{}_KAFKA_SECURITY", prefix))
593                                .ok(),
594                            sasl_config: None,
595                        }
596                    }
597                    #[cfg(not(feature = "kafka"))]
598                    {
599                        let _ = std::env::var(format!("{prefix}_KAFKA_BROKERS"));
600                        StreamBackendType::Memory {
601                            max_size: Some(10000),
602                            persistence: false,
603                        }
604                    }
605                }
606                "memory" => StreamBackendType::Memory {
607                    max_size: Some(10000),
608                    persistence: false,
609                },
610                _ => config.backend,
611            };
612        }
613
614        // Connection settings
615        if let Ok(max_conn) = std::env::var(format!("{prefix}_MAX_CONNECTIONS")) {
616            if let Ok(val) = max_conn.parse() {
617                config.max_connections = val;
618            }
619        }
620
621        // Compression
622        if let Ok(compression) = std::env::var(format!("{prefix}_COMPRESSION")) {
623            config.compression_type = match compression.as_str() {
624                "gzip" => CompressionType::Gzip,
625                "snappy" => CompressionType::Snappy,
626                "lz4" => CompressionType::Lz4,
627                "zstd" => CompressionType::Zstd,
628                _ => CompressionType::None,
629            };
630            config.enable_compression = compression != "none";
631        }
632
633        Ok(config)
634    }
635
    /// Apply secrets to configuration
    ///
    /// Two independent steps are performed on `config`, which is then
    /// returned:
    ///
    /// 1. (only with the `kafka` feature) when the backend is Kafka with
    ///    security protocol `SASL_SSL`, the secrets `kafka_username` and
    ///    `kafka_password` are looked up and, if both are available, installed
    ///    as a SCRAM-SHA-256 SASL configuration. If either lookup fails the
    ///    backend is left unchanged and no error is reported.
    /// 2. when TLS is enabled and a certificate named `client` is loaded in
    ///    the TLS manager, the client certificate/key paths are filled in.
    async fn apply_secrets(&self, mut config: StreamConfig) -> Result<StreamConfig> {
        // Apply SASL password if using Kafka
        #[cfg(feature = "kafka")]
        if let StreamBackendType::Kafka {
            brokers,
            security_protocol,
            sasl_config: _,
        } = &config.backend
        {
            if security_protocol.as_deref() == Some("SASL_SSL") {
                // The username is fetched first; the password is only
                // requested when the username lookup succeeded.
                if let Ok(username) = self.secret_manager.get_secret("kafka_username").await {
                    if let Ok(password) = self.secret_manager.get_secret("kafka_password").await {
                        #[cfg(feature = "kafka")]
                        {
                            // Rebuild the backend value with the same brokers
                            // and protocol plus the SASL credentials.
                            config.backend = StreamBackendType::Kafka {
                                brokers: brokers.clone(),
                                security_protocol: security_protocol.clone(),
                                sasl_config: Some(crate::SaslConfig {
                                    mechanism: crate::SaslMechanism::ScramSha256,
                                    username,
                                    password,
                                }),
                            };
                        }
                    }
                }
            }
        }

        // Apply TLS certificates
        if config.security.enable_tls {
            if let Ok(_cert) = self.tls_manager.get_certificate("client").await {
                // Certificate paths would be set here
                // NOTE(review): these are hard-coded placeholder paths under
                // /tmp; nothing in this file writes the loaded certificate to
                // them — confirm before relying on TLS client auth.
                config.security.client_cert_path = Some("/tmp/client.crt".to_string());
                config.security.client_key_path = Some("/tmp/client.key".to_string());
            }
        }

        Ok(config)
    }
677
    /// Merge two configurations
    ///
    /// Currently returns `override_config` unchanged; `_base` is discarded.
    // NOTE(review): because the base is dropped, everything accumulated before
    // a file source is applied (environment defaults, earlier sources) is
    // replaced wholesale by the values parsed from the file.
    fn merge_configs(&self, _base: StreamConfig, override_config: StreamConfig) -> StreamConfig {
        // This would implement a proper merge strategy
        // For now, just return the override
        override_config
    }
684
685    /// Apply key-value overrides
686    fn apply_overrides(
687        &self,
688        mut config: StreamConfig,
689        overrides: HashMap<String, String>,
690    ) -> StreamConfig {
691        for (key, value) in overrides {
692            match key.as_str() {
693                "topic" => config.topic = value,
694                "batch_size" => {
695                    if let Ok(size) = value.parse() {
696                        config.batch_size = size;
697                    }
698                }
699                "max_connections" => {
700                    if let Ok(max) = value.parse() {
701                        config.max_connections = max;
702                    }
703                }
704                _ => {}
705            }
706        }
707        config
708    }
709
710    /// Validate configuration
711    fn validate_config(&self, config: &StreamConfig) -> Result<()> {
712        // Validate connection limits
713        if config.max_connections == 0 {
714            return Err(anyhow!("max_connections must be greater than 0"));
715        }
716
717        // Validate batch size
718        if config.batch_size == 0 {
719            return Err(anyhow!("batch_size must be greater than 0"));
720        }
721
722        // Validate topic name
723        if config.topic.is_empty() {
724            return Err(anyhow!("topic name cannot be empty"));
725        }
726
727        // Backend-specific validation
728        match &config.backend {
729            #[cfg(feature = "kafka")]
730            StreamBackendType::Kafka { brokers, .. } if brokers.is_empty() => {
731                return Err(anyhow!("Kafka brokers list cannot be empty"));
732            }
733            _ => {}
734        }
735
736        Ok(())
737    }
738
739    /// Get current configuration
740    pub async fn get_config(&self) -> StreamConfig {
741        self.current_config.read().await.clone()
742    }
743
    /// Subscribe to configuration changes
    ///
    /// Returns a new broadcast receiver attached to the manager's change
    /// notifier; it observes events sent after this call.
    pub fn subscribe(&self) -> broadcast::Receiver<ConfigChangeEvent> {
        self.change_notifier.subscribe()
    }
748
749    /// Apply a performance profile
750    pub async fn apply_performance_profile(&mut self, profile_name: &str) -> Result<()> {
751        let profile = self
752            .performance_profiles
753            .get(profile_name)
754            .ok_or_else(|| anyhow!("Performance profile {} not found", profile_name))?
755            .clone();
756
757        let mut config = self.current_config.write().await;
758        config.performance = profile.settings;
759
760        info!("Applied performance profile: {}", profile_name);
761        Ok(())
762    }
763
764    /// Get available performance profiles
765    pub fn get_performance_profiles(&self) -> Vec<&PerformanceProfile> {
766        self.performance_profiles.values().collect()
767    }
768
769    /// Update a specific configuration value
770    pub async fn update_value(&mut self, key: &str, value: String) -> Result<()> {
771        let mut overrides = HashMap::new();
772        overrides.insert(key.to_string(), value);
773
774        let current = self.current_config.read().await.clone();
775        let updated = self.apply_overrides(current, overrides);
776
777        self.validate_config(&updated)?;
778        *self.current_config.write().await = updated;
779
780        info!("Updated configuration key: {}", key);
781        Ok(())
782    }
783
    /// Get secret manager
    ///
    /// Borrows the shared handle; clone the `Arc` to keep it beyond the
    /// manager's borrow.
    pub fn secret_manager(&self) -> &Arc<SecretManager> {
        &self.secret_manager
    }
788
    /// Get TLS manager
    ///
    /// Borrows the shared handle; clone the `Arc` to keep it beyond the
    /// manager's borrow.
    pub fn tls_manager(&self) -> &Arc<TlsManager> {
        &self.tls_manager
    }
793}
794
/// Configuration builder for easy setup
///
/// Collects configuration sources (and optionally an explicit environment)
/// before constructing a [`ConfigManager`].
pub struct ConfigBuilder {
    /// Sources in the order they were added
    sources: Vec<ConfigSource>,
    /// Environment set via `with_environment`, if any
    environment: Option<Environment>,
}
800
801impl ConfigBuilder {
802    /// Create a new configuration builder
803    pub fn new() -> Self {
804        Self {
805            sources: Vec::new(),
806            environment: None,
807        }
808    }
809
810    /// Add a file source
811    pub fn with_file(mut self, path: impl Into<PathBuf>) -> Self {
812        self.sources.push(ConfigSource::File { path: path.into() });
813        self
814    }
815
816    /// Add environment variable source
817    pub fn with_env(mut self, prefix: impl Into<String>) -> Self {
818        self.sources.push(ConfigSource::Environment {
819            prefix: prefix.into(),
820        });
821        self
822    }
823
824    /// Add memory source for overrides
825    pub fn with_overrides(mut self, overrides: HashMap<String, String>) -> Self {
826        self.sources.push(ConfigSource::Memory { data: overrides });
827        self
828    }
829
830    /// Set environment explicitly
831    pub fn with_environment(mut self, env: Environment) -> Self {
832        self.environment = Some(env);
833        self
834    }
835
836    /// Build the configuration manager
837    pub async fn build(self) -> Result<ConfigManager> {
838        ConfigManager::new(self.sources).await
839    }
840}
841
842impl Default for ConfigBuilder {
843    fn default() -> Self {
844        Self::new()
845    }
846}
847
#[cfg(test)]
mod tests {
    use super::*;

    /// Whatever environment is detected, its defaults must allow connections.
    #[tokio::test]
    async fn test_environment_detection() {
        let defaults = Environment::detect().get_defaults();
        assert!(defaults.max_connections > 0);
    }

    /// An in-memory override added through the builder must reach the
    /// resulting configuration.
    #[tokio::test]
    async fn test_config_builder() {
        let overrides = HashMap::from([("topic".to_string(), "test-topic".to_string())]);

        let manager = ConfigBuilder::new()
            .with_env("OXIRS")
            .with_overrides(overrides)
            .build()
            .await
            .unwrap();

        assert_eq!(manager.get_config().await.topic, "test-topic");
    }

    /// A secret written to the memory backend can be read back, both on the
    /// first (backend) lookup and on the second (cached) lookup.
    #[tokio::test]
    async fn test_secret_manager() {
        let manager = SecretManager::new(
            SecretBackend::Memory {
                secrets: Arc::new(RwLock::new(HashMap::new())),
            },
            Duration::from_secs(60),
        );

        manager.set_secret("test_key", "test_value").await.unwrap();

        // First read goes to the backend and populates the cache.
        let first = manager.get_secret("test_key").await.unwrap();
        assert_eq!(first, "test_value");

        // Second read is served from the cache.
        let second = manager.get_secret("test_key").await.unwrap();
        assert_eq!(second, "test_value");
    }

    /// The built-in profile catalogue is registered on construction.
    #[tokio::test]
    async fn test_performance_profiles() {
        let manager = ConfigBuilder::new().build().await.unwrap();
        let names: Vec<&str> = manager
            .get_performance_profiles()
            .into_iter()
            .map(|profile| profile.name.as_str())
            .collect();

        assert!(names.len() >= 4);
        assert!(names.contains(&"low-latency"));
        assert!(names.contains(&"high-throughput"));
    }

    /// A configuration with zero connections must be rejected.
    #[tokio::test]
    async fn test_config_validation() {
        let manager = ConfigBuilder::new().build().await.unwrap();

        let invalid = StreamConfig {
            max_connections: 0,
            ..Default::default()
        };

        assert!(manager.validate_config(&invalid).is_err());
    }
}