1use anyhow::{anyhow, Result};
12use serde::{Deserialize, Serialize};
13use std::collections::HashMap;
14use std::fs;
15use std::path::{Path, PathBuf};
16use std::sync::Arc;
17use std::time::Duration;
18use tokio::sync::{broadcast, RwLock};
19use tracing::info;
20
21use crate::{CompressionType, StreamBackendType, StreamConfig, StreamPerformanceConfig};
22
23#[derive(Debug, Clone, Serialize, Deserialize)]
25pub enum ConfigSource {
26 File { path: PathBuf },
28 Environment { prefix: String },
30 Remote { url: String, key: String },
32 Memory { data: HashMap<String, String> },
34}
35
36pub struct ConfigManager {
38 current_config: Arc<RwLock<StreamConfig>>,
40 sources: Vec<ConfigSource>,
42 change_notifier: broadcast::Sender<ConfigChangeEvent>,
44 secret_manager: Arc<SecretManager>,
46 environment: Environment,
48 performance_profiles: HashMap<String, PerformanceProfile>,
50 tls_manager: Arc<TlsManager>,
52}
53
54#[derive(Debug, Clone)]
56pub enum ConfigChangeEvent {
57 Reloaded {
59 old_config: Box<StreamConfig>,
60 new_config: Box<StreamConfig>,
61 },
62 ValidationFailed { reason: String },
64 SecretRotated { secret_name: String },
66 TlsCertificateUpdated { cert_type: String },
68}
69
70#[derive(Debug, Clone, Serialize, Deserialize)]
72pub enum Environment {
73 Development,
74 Testing,
75 Staging,
76 Production,
77}
78
79impl Environment {
80 pub fn detect() -> Self {
82 if let Ok(env) = std::env::var("OXIRS_ENV") {
83 match env.to_lowercase().as_str() {
84 "dev" | "development" => Environment::Development,
85 "test" | "testing" => Environment::Testing,
86 "staging" | "stage" => Environment::Staging,
87 "prod" | "production" => Environment::Production,
88 _ => Environment::Development,
89 }
90 } else if let Ok(env) = std::env::var("RUST_ENV") {
91 match env.to_lowercase().as_str() {
92 "production" => Environment::Production,
93 _ => Environment::Development,
94 }
95 } else {
96 Environment::Development
97 }
98 }
99
100 pub fn get_defaults(&self) -> ConfigDefaults {
102 match self {
103 Environment::Development => ConfigDefaults {
104 log_level: "debug".to_string(),
105 enable_debug_endpoints: true,
106 connection_timeout_secs: 60,
107 max_connections: 10,
108 enable_compression: false,
109 enable_metrics: true,
110 enable_profiling: true,
111 },
112 Environment::Testing => ConfigDefaults {
113 log_level: "info".to_string(),
114 enable_debug_endpoints: true,
115 connection_timeout_secs: 30,
116 max_connections: 5,
117 enable_compression: false,
118 enable_metrics: true,
119 enable_profiling: false,
120 },
121 Environment::Staging => ConfigDefaults {
122 log_level: "info".to_string(),
123 enable_debug_endpoints: false,
124 connection_timeout_secs: 30,
125 max_connections: 50,
126 enable_compression: true,
127 enable_metrics: true,
128 enable_profiling: false,
129 },
130 Environment::Production => ConfigDefaults {
131 log_level: "warn".to_string(),
132 enable_debug_endpoints: false,
133 connection_timeout_secs: 30,
134 max_connections: 100,
135 enable_compression: true,
136 enable_metrics: true,
137 enable_profiling: false,
138 },
139 }
140 }
141}
142
143#[derive(Debug, Clone, Serialize, Deserialize)]
145pub struct ConfigDefaults {
146 pub log_level: String,
147 pub enable_debug_endpoints: bool,
148 pub connection_timeout_secs: u64,
149 pub max_connections: usize,
150 pub enable_compression: bool,
151 pub enable_metrics: bool,
152 pub enable_profiling: bool,
153}
154
155pub struct SecretManager {
157 backend: SecretBackend,
159 cache: Arc<RwLock<HashMap<String, CachedSecret>>>,
161 rotation_interval: Duration,
163}
164
165#[derive(Debug, Clone)]
167pub enum SecretBackend {
168 Environment { prefix: String },
170 File { directory: PathBuf },
172 Vault {
174 url: String,
175 token: String,
176 mount_path: String,
177 },
178 AwsSecretsManager { region: String },
180 Memory {
182 secrets: Arc<RwLock<HashMap<String, String>>>,
183 },
184}
185
186#[derive(Debug, Clone)]
188struct CachedSecret {
189 value: String,
190 cached_at: std::time::Instant,
191 expires_at: Option<std::time::Instant>,
192 version: u64,
193}
194
195impl SecretManager {
196 pub fn new(backend: SecretBackend, rotation_interval: Duration) -> Self {
198 Self {
199 backend,
200 cache: Arc::new(RwLock::new(HashMap::new())),
201 rotation_interval,
202 }
203 }
204
205 pub async fn get_secret(&self, name: &str) -> Result<String> {
207 {
209 let cache = self.cache.read().await;
210 if let Some(cached) = cache.get(name) {
211 if cached
212 .expires_at
213 .map_or(true, |exp| exp > std::time::Instant::now())
214 {
215 return Ok(cached.value.clone());
216 }
217 }
218 }
219
220 let value = match &self.backend {
222 SecretBackend::Environment { prefix } => {
223 let key = format!("{prefix}_{}", name.to_uppercase());
224 std::env::var(key).map_err(|_| anyhow!("Secret {name} not found in environment"))
225 }
226 SecretBackend::File { directory } => {
227 let path = directory.join(name);
228 fs::read_to_string(&path)
229 .map_err(|e| anyhow!("Failed to read secret from {path:?}: {e}"))
230 .map(|s| s.trim().to_string())
231 }
232 SecretBackend::Memory { secrets } => {
233 let secrets = secrets.read().await;
234 secrets
235 .get(name)
236 .cloned()
237 .ok_or_else(|| anyhow!("Secret {name} not found"))
238 }
239 _ => {
240 return Err(anyhow!("Secret backend not implemented"));
242 }
243 }?;
244
245 let cached = CachedSecret {
247 value: value.clone(),
248 cached_at: std::time::Instant::now(),
249 expires_at: Some(std::time::Instant::now() + self.rotation_interval),
250 version: 1,
251 };
252
253 self.cache.write().await.insert(name.to_string(), cached);
254 Ok(value)
255 }
256
257 pub async fn set_secret(&self, name: &str, value: &str) -> Result<()> {
259 match &self.backend {
260 SecretBackend::Memory { secrets } => {
261 secrets
262 .write()
263 .await
264 .insert(name.to_string(), value.to_string());
265 self.cache.write().await.remove(name);
266 Ok(())
267 }
268 _ => Err(anyhow!("Set secret only supported for memory backend")),
269 }
270 }
271
272 pub async fn rotate_secrets(&self) -> Result<()> {
274 self.cache.write().await.clear();
275 info!("Rotated all cached secrets");
276 Ok(())
277 }
278}
279
280pub struct TlsManager {
282 certs: Arc<RwLock<HashMap<String, TlsCertificate>>>,
284 cert_paths: HashMap<String, CertPaths>,
286 auto_reload: bool,
288}
289
290#[derive(Debug, Clone)]
292pub struct TlsCertificate {
293 pub cert_pem: Vec<u8>,
294 pub key_pem: Vec<u8>,
295 pub ca_pem: Option<Vec<u8>>,
296 pub loaded_at: std::time::Instant,
297 pub expires_at: Option<chrono::DateTime<chrono::Utc>>,
298}
299
300#[derive(Debug, Clone)]
302struct CertPaths {
303 cert_path: PathBuf,
304 key_path: PathBuf,
305 ca_path: Option<PathBuf>,
306}
307
308impl TlsManager {
309 pub fn new(auto_reload: bool) -> Self {
311 Self {
312 certs: Arc::new(RwLock::new(HashMap::new())),
313 cert_paths: HashMap::new(),
314 auto_reload,
315 }
316 }
317
318 fn parse_certificate_expiration(cert_pem: &[u8]) -> Option<chrono::DateTime<chrono::Utc>> {
320 let cert_str = String::from_utf8_lossy(cert_pem);
323
324 if cert_str.contains("-----BEGIN CERTIFICATE-----") {
327 Some(chrono::Utc::now() + chrono::Duration::days(365))
330 } else {
331 None
332 }
333 }
334
335 pub async fn load_certificate(
337 &self,
338 name: &str,
339 cert_path: &Path,
340 key_path: &Path,
341 ca_path: Option<&Path>,
342 ) -> Result<()> {
343 let cert_pem = fs::read(cert_path)
344 .map_err(|e| anyhow!("Failed to read certificate {}: {}", cert_path.display(), e))?;
345
346 let key_pem = fs::read(key_path)
347 .map_err(|e| anyhow!("Failed to read key {}: {}", key_path.display(), e))?;
348
349 let ca_pem = if let Some(ca) = ca_path {
350 Some(fs::read(ca).map_err(|e| anyhow!("Failed to read CA {}: {}", ca.display(), e))?)
351 } else {
352 None
353 };
354
355 let expires_at = Self::parse_certificate_expiration(&cert_pem);
356
357 let cert = TlsCertificate {
358 cert_pem,
359 key_pem,
360 ca_pem,
361 loaded_at: std::time::Instant::now(),
362 expires_at,
363 };
364
365 self.certs.write().await.insert(name.to_string(), cert);
366
367 info!("Loaded TLS certificate: {}", name);
368 Ok(())
369 }
370
371 pub async fn get_certificate(&self, name: &str) -> Result<TlsCertificate> {
373 self.certs
374 .read()
375 .await
376 .get(name)
377 .cloned()
378 .ok_or_else(|| anyhow!("Certificate {} not found", name))
379 }
380
381 pub async fn check_expiration(&self) -> Vec<(String, chrono::DateTime<chrono::Utc>)> {
383 let certs = self.certs.read().await;
384 let mut expiring = Vec::new();
385
386 for (name, cert) in certs.iter() {
387 if let Some(expires) = cert.expires_at {
388 let days_until = (expires - chrono::Utc::now()).num_days();
389 if days_until < 30 {
390 expiring.push((name.clone(), expires));
391 }
392 }
393 }
394
395 expiring
396 }
397}
398
399#[derive(Debug, Clone, Serialize, Deserialize)]
401pub struct PerformanceProfile {
402 pub name: String,
403 pub description: String,
404 pub settings: StreamPerformanceConfig,
405 pub recommended_for: Vec<String>,
406}
407
408impl ConfigManager {
409 pub async fn new(sources: Vec<ConfigSource>) -> Result<Self> {
411 let environment = Environment::detect();
412 let (tx, _) = broadcast::channel(100);
413
414 let secret_backend = SecretBackend::Environment {
416 prefix: "OXIRS_SECRET".to_string(),
417 };
418 let secret_manager = Arc::new(SecretManager::new(
419 secret_backend,
420 Duration::from_secs(3600),
421 ));
422
423 let tls_manager = Arc::new(TlsManager::new(true));
425
426 let initial_config = StreamConfig::default();
428
429 let mut manager = Self {
430 current_config: Arc::new(RwLock::new(initial_config)),
431 sources,
432 change_notifier: tx,
433 secret_manager,
434 environment,
435 performance_profiles: Self::create_default_profiles(),
436 tls_manager,
437 };
438
439 manager.reload().await?;
441
442 Ok(manager)
443 }
444
445 fn create_default_profiles() -> HashMap<String, PerformanceProfile> {
447 let mut profiles = HashMap::new();
448
449 profiles.insert(
451 "low-latency".to_string(),
452 PerformanceProfile {
453 name: "low-latency".to_string(),
454 description: "Optimized for minimal latency".to_string(),
455 settings: StreamPerformanceConfig {
456 enable_batching: false,
457 enable_pipelining: true,
458 buffer_size: 4096,
459 prefetch_count: 10,
460 enable_zero_copy: true,
461 enable_simd: true,
462 parallel_processing: true,
463 worker_threads: Some(4),
464 },
465 recommended_for: vec!["real-time".to_string(), "trading".to_string()],
466 },
467 );
468
469 profiles.insert(
471 "high-throughput".to_string(),
472 PerformanceProfile {
473 name: "high-throughput".to_string(),
474 description: "Optimized for maximum throughput".to_string(),
475 settings: StreamPerformanceConfig {
476 enable_batching: true,
477 enable_pipelining: true,
478 buffer_size: 65536,
479 prefetch_count: 1000,
480 enable_zero_copy: true,
481 enable_simd: true,
482 parallel_processing: true,
483 worker_threads: None, },
485 recommended_for: vec!["batch-processing".to_string(), "etl".to_string()],
486 },
487 );
488
489 profiles.insert(
491 "balanced".to_string(),
492 PerformanceProfile {
493 name: "balanced".to_string(),
494 description: "Balanced between latency and throughput".to_string(),
495 settings: StreamPerformanceConfig::default(),
496 recommended_for: vec!["general".to_string(), "web-services".to_string()],
497 },
498 );
499
500 profiles.insert(
502 "resource-constrained".to_string(),
503 PerformanceProfile {
504 name: "resource-constrained".to_string(),
505 description: "Optimized for limited resources".to_string(),
506 settings: StreamPerformanceConfig {
507 enable_batching: true,
508 enable_pipelining: false,
509 buffer_size: 2048,
510 prefetch_count: 10,
511 enable_zero_copy: false,
512 enable_simd: false,
513 parallel_processing: false,
514 worker_threads: Some(2),
515 },
516 recommended_for: vec!["edge".to_string(), "iot".to_string()],
517 },
518 );
519
520 profiles
521 }
522
523 pub async fn reload(&mut self) -> Result<()> {
525 let old_config = self.current_config.read().await.clone();
526 let mut new_config = old_config.clone();
527
528 let defaults = self.environment.get_defaults();
530 new_config.max_connections = defaults.max_connections;
531 new_config.connection_timeout = Duration::from_secs(defaults.connection_timeout_secs);
532 new_config.enable_compression = defaults.enable_compression;
533 new_config.monitoring.enable_metrics = defaults.enable_metrics;
534 new_config.monitoring.enable_profiling = defaults.enable_profiling;
535
536 for source in &self.sources {
538 match source {
539 ConfigSource::File { path } => {
540 if let Ok(content) = fs::read_to_string(path) {
541 if let Ok(file_config) = toml::from_str::<StreamConfig>(&content) {
542 new_config = self.merge_configs(new_config, file_config);
543 }
544 }
545 }
546 ConfigSource::Environment { prefix } => {
547 new_config = self.load_from_env(new_config, prefix).await?;
548 }
549 ConfigSource::Memory { data } => {
550 new_config = self.apply_overrides(new_config, data.clone());
551 }
552 _ => {
553 }
555 }
556 }
557
558 new_config = self.apply_secrets(new_config).await?;
560
561 self.validate_config(&new_config)?;
563
564 *self.current_config.write().await = new_config.clone();
566
567 let _ = self.change_notifier.send(ConfigChangeEvent::Reloaded {
569 old_config: Box::new(old_config),
570 new_config: Box::new(new_config),
571 });
572
573 info!("Configuration reloaded successfully");
574 Ok(())
575 }
576
577 async fn load_from_env(&self, mut config: StreamConfig, prefix: &str) -> Result<StreamConfig> {
579 if let Ok(backend) = std::env::var(format!("{prefix}_BACKEND")) {
581 config.backend = match backend.as_str() {
582 "kafka" => {
583 #[cfg(feature = "kafka")]
584 {
585 let brokers: Vec<String> = std::env::var(format!("{prefix}_KAFKA_BROKERS"))
586 .unwrap_or_else(|_| "localhost:9092".to_string())
587 .split(',')
588 .map(|s| s.to_string())
589 .collect();
590 StreamBackendType::Kafka {
591 brokers,
592 security_protocol: std::env::var(format!("{}_KAFKA_SECURITY", prefix))
593 .ok(),
594 sasl_config: None,
595 }
596 }
597 #[cfg(not(feature = "kafka"))]
598 {
599 let _ = std::env::var(format!("{prefix}_KAFKA_BROKERS"));
600 StreamBackendType::Memory {
601 max_size: Some(10000),
602 persistence: false,
603 }
604 }
605 }
606 "memory" => StreamBackendType::Memory {
607 max_size: Some(10000),
608 persistence: false,
609 },
610 _ => config.backend,
611 };
612 }
613
614 if let Ok(max_conn) = std::env::var(format!("{prefix}_MAX_CONNECTIONS")) {
616 if let Ok(val) = max_conn.parse() {
617 config.max_connections = val;
618 }
619 }
620
621 if let Ok(compression) = std::env::var(format!("{prefix}_COMPRESSION")) {
623 config.compression_type = match compression.as_str() {
624 "gzip" => CompressionType::Gzip,
625 "snappy" => CompressionType::Snappy,
626 "lz4" => CompressionType::Lz4,
627 "zstd" => CompressionType::Zstd,
628 _ => CompressionType::None,
629 };
630 config.enable_compression = compression != "none";
631 }
632
633 Ok(config)
634 }
635
636 async fn apply_secrets(&self, mut config: StreamConfig) -> Result<StreamConfig> {
638 #[cfg(feature = "kafka")]
640 if let StreamBackendType::Kafka {
641 brokers,
642 security_protocol,
643 sasl_config: _,
644 } = &config.backend
645 {
646 if security_protocol.as_deref() == Some("SASL_SSL") {
647 if let Ok(username) = self.secret_manager.get_secret("kafka_username").await {
648 if let Ok(password) = self.secret_manager.get_secret("kafka_password").await {
649 #[cfg(feature = "kafka")]
650 {
651 config.backend = StreamBackendType::Kafka {
652 brokers: brokers.clone(),
653 security_protocol: security_protocol.clone(),
654 sasl_config: Some(crate::SaslConfig {
655 mechanism: crate::SaslMechanism::ScramSha256,
656 username,
657 password,
658 }),
659 };
660 }
661 }
662 }
663 }
664 }
665
666 if config.security.enable_tls {
668 if let Ok(_cert) = self.tls_manager.get_certificate("client").await {
669 config.security.client_cert_path = Some("/tmp/client.crt".to_string());
671 config.security.client_key_path = Some("/tmp/client.key".to_string());
672 }
673 }
674
675 Ok(config)
676 }
677
678 fn merge_configs(&self, _base: StreamConfig, override_config: StreamConfig) -> StreamConfig {
680 override_config
683 }
684
685 fn apply_overrides(
687 &self,
688 mut config: StreamConfig,
689 overrides: HashMap<String, String>,
690 ) -> StreamConfig {
691 for (key, value) in overrides {
692 match key.as_str() {
693 "topic" => config.topic = value,
694 "batch_size" => {
695 if let Ok(size) = value.parse() {
696 config.batch_size = size;
697 }
698 }
699 "max_connections" => {
700 if let Ok(max) = value.parse() {
701 config.max_connections = max;
702 }
703 }
704 _ => {}
705 }
706 }
707 config
708 }
709
710 fn validate_config(&self, config: &StreamConfig) -> Result<()> {
712 if config.max_connections == 0 {
714 return Err(anyhow!("max_connections must be greater than 0"));
715 }
716
717 if config.batch_size == 0 {
719 return Err(anyhow!("batch_size must be greater than 0"));
720 }
721
722 if config.topic.is_empty() {
724 return Err(anyhow!("topic name cannot be empty"));
725 }
726
727 match &config.backend {
729 #[cfg(feature = "kafka")]
730 StreamBackendType::Kafka { brokers, .. } if brokers.is_empty() => {
731 return Err(anyhow!("Kafka brokers list cannot be empty"));
732 }
733 _ => {}
734 }
735
736 Ok(())
737 }
738
739 pub async fn get_config(&self) -> StreamConfig {
741 self.current_config.read().await.clone()
742 }
743
744 pub fn subscribe(&self) -> broadcast::Receiver<ConfigChangeEvent> {
746 self.change_notifier.subscribe()
747 }
748
749 pub async fn apply_performance_profile(&mut self, profile_name: &str) -> Result<()> {
751 let profile = self
752 .performance_profiles
753 .get(profile_name)
754 .ok_or_else(|| anyhow!("Performance profile {} not found", profile_name))?
755 .clone();
756
757 let mut config = self.current_config.write().await;
758 config.performance = profile.settings;
759
760 info!("Applied performance profile: {}", profile_name);
761 Ok(())
762 }
763
764 pub fn get_performance_profiles(&self) -> Vec<&PerformanceProfile> {
766 self.performance_profiles.values().collect()
767 }
768
769 pub async fn update_value(&mut self, key: &str, value: String) -> Result<()> {
771 let mut overrides = HashMap::new();
772 overrides.insert(key.to_string(), value);
773
774 let current = self.current_config.read().await.clone();
775 let updated = self.apply_overrides(current, overrides);
776
777 self.validate_config(&updated)?;
778 *self.current_config.write().await = updated;
779
780 info!("Updated configuration key: {}", key);
781 Ok(())
782 }
783
784 pub fn secret_manager(&self) -> &Arc<SecretManager> {
786 &self.secret_manager
787 }
788
789 pub fn tls_manager(&self) -> &Arc<TlsManager> {
791 &self.tls_manager
792 }
793}
794
795pub struct ConfigBuilder {
797 sources: Vec<ConfigSource>,
798 environment: Option<Environment>,
799}
800
801impl ConfigBuilder {
802 pub fn new() -> Self {
804 Self {
805 sources: Vec::new(),
806 environment: None,
807 }
808 }
809
810 pub fn with_file(mut self, path: impl Into<PathBuf>) -> Self {
812 self.sources.push(ConfigSource::File { path: path.into() });
813 self
814 }
815
816 pub fn with_env(mut self, prefix: impl Into<String>) -> Self {
818 self.sources.push(ConfigSource::Environment {
819 prefix: prefix.into(),
820 });
821 self
822 }
823
824 pub fn with_overrides(mut self, overrides: HashMap<String, String>) -> Self {
826 self.sources.push(ConfigSource::Memory { data: overrides });
827 self
828 }
829
830 pub fn with_environment(mut self, env: Environment) -> Self {
832 self.environment = Some(env);
833 self
834 }
835
836 pub async fn build(self) -> Result<ConfigManager> {
838 ConfigManager::new(self.sources).await
839 }
840}
841
842impl Default for ConfigBuilder {
843 fn default() -> Self {
844 Self::new()
845 }
846}
847
848#[cfg(test)]
849mod tests {
850 use super::*;
851
852 #[tokio::test]
853 async fn test_environment_detection() {
854 let env = Environment::detect();
855 let defaults = env.get_defaults();
856 assert!(defaults.max_connections > 0);
857 }
858
859 #[tokio::test]
860 async fn test_config_builder() {
861 let mut overrides = HashMap::new();
862 overrides.insert("topic".to_string(), "test-topic".to_string());
863
864 let manager = ConfigBuilder::new()
865 .with_env("OXIRS")
866 .with_overrides(overrides)
867 .build()
868 .await
869 .unwrap();
870
871 let config = manager.get_config().await;
872 assert_eq!(config.topic, "test-topic");
873 }
874
875 #[tokio::test]
876 async fn test_secret_manager() {
877 let backend = SecretBackend::Memory {
878 secrets: Arc::new(RwLock::new(HashMap::new())),
879 };
880 let manager = SecretManager::new(backend, Duration::from_secs(60));
881
882 manager.set_secret("test_key", "test_value").await.unwrap();
884 let value = manager.get_secret("test_key").await.unwrap();
885 assert_eq!(value, "test_value");
886
887 let value2 = manager.get_secret("test_key").await.unwrap();
889 assert_eq!(value2, "test_value");
890 }
891
892 #[tokio::test]
893 async fn test_performance_profiles() {
894 let manager = ConfigBuilder::new().build().await.unwrap();
895 let profiles = manager.get_performance_profiles();
896
897 assert!(profiles.len() >= 4);
898 assert!(profiles.iter().any(|p| p.name == "low-latency"));
899 assert!(profiles.iter().any(|p| p.name == "high-throughput"));
900 }
901
902 #[tokio::test]
903 async fn test_config_validation() {
904 let manager = ConfigBuilder::new().build().await.unwrap();
905
906 let invalid_config = StreamConfig {
908 max_connections: 0,
909 ..Default::default()
910 };
911
912 assert!(manager.validate_config(&invalid_config).is_err());
913 }
914}