adaptive_pipeline/infrastructure/config/
generic_config_manager.rs

1// /////////////////////////////////////////////////////////////////////////////
2// Adaptive Pipeline
3// Copyright (c) 2025 Michael Gardner, A Bit of Help, Inc.
4// SPDX-License-Identifier: BSD-3-Clause
5// See LICENSE file in the project root.
6// /////////////////////////////////////////////////////////////////////////////
7
8//! # Generic Configuration Manager
9//!
10//! This module provides a generic, reusable configuration management system
11//! for the adaptive pipeline system. It supports validation, versioning,
12//! migration, and hot-reloading of configuration data.
13//!
14//! ## Overview
15//!
16//! The generic configuration manager provides:
17//!
18//! - **Type-Safe Configuration**: Generic configuration management for any type
19//! - **Validation**: Comprehensive validation with detailed error reporting
20//! - **Versioning**: Configuration schema versioning and migration support
21//! - **Hot Reloading**: Runtime configuration updates without restart
22//! - **Thread Safety**: Safe concurrent access to configuration data
23//!
24//! ## Architecture
25//!
26//! The configuration manager follows generic design patterns:
27//!
28//! - **Generic Design**: Works with any configuration type implementing
29//!   required traits
30//! - **Validation Framework**: Pluggable validation with detailed error
31//!   reporting
32//! - **Migration System**: Automatic migration between configuration versions
33//! - **Event System**: Configuration change notifications and observers
34//!
35//! ## Key Features
36//!
37//! ### Configuration Validation
38//!
39//! - **Schema Validation**: Validate configuration against defined schemas
40//! - **Business Rules**: Enforce business logic and constraints
41//! - **Error Reporting**: Detailed error messages with field-level information
42//! - **Warning System**: Non-fatal warnings for configuration issues
43//!
44//! ### Version Management
45//!
46//! - **Schema Versioning**: Track configuration schema versions
47//! - **Migration Support**: Automatic migration between versions
48//! - **Backward Compatibility**: Support for older configuration formats
49//! - **Version Detection**: Automatic detection of configuration versions
50//!
51//! ### Hot Reloading
52//!
53//! - **Runtime Updates**: Update configuration without application restart
54//! - **Change Detection**: Detect configuration file changes
55//! - **Atomic Updates**: Atomic configuration updates to prevent inconsistency
56//! - **Rollback Support**: Rollback to previous configuration on errors
57//!
58//! ## Usage Examples
59//!
60//! ### Basic Configuration Management
61
62//!
63//! ### Configuration with Hot Reloading
64
65//!
66//! ### Configuration Migration
67
68//!
69//! ## Validation Framework
70//!
71//! ### Validation Results
72//!
73//! The validation system provides detailed feedback:
74//!
75//! - **Errors**: Critical validation failures that prevent usage
76//! - **Warnings**: Non-critical issues that should be addressed
77//! - **Field-Level Information**: Specific field names and error contexts
78//! - **Severity Levels**: Different severity levels for different issues
79//!
80//! ### Custom Validation Rules
81//!
82//! - **Business Logic**: Implement custom business logic validation
83//! - **Cross-Field Validation**: Validate relationships between fields
84//! - **External Validation**: Validate against external resources
85//! - **Conditional Validation**: Conditional validation based on other fields
86//!
87//! ## Migration System
88//!
89//! ### Migration Strategies
90//!
91//! - **Automatic Migration**: Automatic migration between compatible versions
92//! - **Manual Migration**: Manual migration for complex changes
93//! - **Incremental Migration**: Step-by-step migration through versions
94//! - **Rollback Support**: Rollback to previous versions on failure
95//!
96//! ### Version Compatibility
97//!
98//! - **Semantic Versioning**: Use semantic versioning for schema versions
99//! - **Compatibility Matrix**: Define compatibility between versions
100//! - **Breaking Changes**: Handle breaking changes gracefully
101//! - **Deprecation Warnings**: Warn about deprecated configuration options
102//!
103//! ## Performance Considerations
104//!
105//! ### Memory Usage
106//!
107//! - **Efficient Storage**: Efficient storage of configuration data
108//! - **Lazy Loading**: Lazy loading of configuration sections
109//! - **Memory Pooling**: Reuse configuration objects when possible
110//!
111//! ### Access Performance
112//!
113//! - **Caching**: Cache frequently accessed configuration values
114//! - **Read Optimization**: Optimize for frequent read operations
115//! - **Lock Contention**: Minimize lock contention for concurrent access
116//!
117//! ## Error Handling
118//!
119//! ### Configuration Errors
120//!
121//! - **Parse Errors**: Handle configuration file parsing errors
122//! - **Validation Errors**: Comprehensive validation error reporting
123//! - **Migration Errors**: Handle migration failures gracefully
124//! - **File System Errors**: Handle file system access errors
125//!
126//! ### Recovery Strategies
127//!
128//! - **Default Values**: Use default values for missing configuration
129//! - **Fallback Configuration**: Fallback to previous valid configuration
130//! - **Error Reporting**: Detailed error reporting with suggestions
131//! - **Graceful Degradation**: Continue operation with reduced functionality
132//!
133//! ## Integration
134//!
135//! The configuration manager integrates with:
136//!
137//! - **File System**: Load configuration from various file formats
138//! - **Environment Variables**: Override configuration with environment
139//!   variables
140//! - **Command Line**: Override configuration with command line arguments
141//! - **External Services**: Load configuration from external configuration
142//!   services
143//!
144//! ## Thread Safety
145//!
146//! The configuration manager is fully thread-safe:
147//!
148//! - **Concurrent Access**: Safe concurrent access to configuration data
149//! - **Atomic Updates**: Atomic configuration updates
150//! - **Lock-Free Reads**: Lock-free reads for better performance
151//!
152//! ## Future Enhancements
153//!
154//! Planned enhancements include:
155//!
156//! - **Configuration UI**: Web-based configuration management interface
157//! - **A/B Testing**: Support for A/B testing of configuration changes
158//! - **Configuration Validation Service**: External validation service
159//!   integration
160//! - **Configuration Analytics**: Analytics and monitoring of configuration
161//!   usage
162
163use adaptive_pipeline_domain::error::PipelineError;
164use adaptive_pipeline_domain::services::datetime_serde;
165use async_trait::async_trait;
166use serde::de::DeserializeOwned;
167use serde::{Deserialize, Serialize};
168use std::collections::HashMap;
169use std::fmt::Debug;
170use std::sync::{Arc, RwLock};
171
172/// Generic trait for configuration validation with detailed error reporting
173///
174/// This trait defines the interface for configuration validation, enabling
175/// type-safe validation of configuration data with comprehensive error
176/// reporting and schema versioning support.
177///
178/// # Key Features
179///
180/// - **Validation Logic**: Implement custom validation rules for configuration
181/// - **Error Reporting**: Detailed error messages with field-level information
182/// - **Schema Versioning**: Track and manage configuration schema versions
183/// - **Migration Support**: Automatic migration between configuration versions
184///
185/// # Implementation Requirements
186///
187/// Implementing types must:
188/// - Be cloneable for configuration updates
189/// - Be debuggable for error reporting
190/// - Be thread-safe (`Send + Sync`)
191/// - Have a stable lifetime (`'static`)
192///
193/// # Examples
194pub trait ConfigValidation: Clone + Debug + Send + Sync + 'static {
195    /// Validates the configuration and returns detailed validation results
196    fn validate(&self) -> ConfigValidationResult;
197
198    /// Returns the configuration schema version for compatibility checking
199    fn schema_version(&self) -> String;
200
201    /// Migrates configuration from an older schema version
202    fn migrate_from_version(&self, from_version: &str, data: &str) -> Result<Self, PipelineError>;
203}
204
205/// Result of configuration validation with detailed error information
206#[derive(Debug, Clone)]
207pub struct ConfigValidationResult {
208    pub is_valid: bool,
209    pub errors: Vec<ConfigValidationError>,
210    pub warnings: Vec<ConfigValidationWarning>,
211}
212
213impl ConfigValidationResult {
214    pub fn valid() -> Self {
215        Self {
216            is_valid: true,
217            errors: Vec::new(),
218            warnings: Vec::new(),
219        }
220    }
221
222    pub fn invalid(errors: Vec<ConfigValidationError>) -> Self {
223        Self {
224            is_valid: false,
225            errors,
226            warnings: Vec::new(),
227        }
228    }
229
230    pub fn with_warnings(mut self, warnings: Vec<ConfigValidationWarning>) -> Self {
231        self.warnings = warnings;
232        self
233    }
234
235    pub fn add_error(&mut self, field: String, message: String) {
236        self.errors.push(ConfigValidationError { field, message });
237        self.is_valid = false;
238    }
239
240    pub fn add_warning(&mut self, field: String, message: String) {
241        self.warnings.push(ConfigValidationWarning { field, message });
242    }
243}
244
245#[derive(Debug, Clone)]
246pub struct ConfigValidationError {
247    pub field: String,
248    pub message: String,
249}
250
251#[derive(Debug, Clone)]
252pub struct ConfigValidationWarning {
253    pub field: String,
254    pub message: String,
255}
256
257/// Configuration source trait for loading configurations from different sources
258#[async_trait]
259pub trait ConfigSource: Send + Sync {
260    /// Loads configuration data as a string
261    async fn load(&self) -> Result<String, PipelineError>;
262
263    /// Saves configuration data
264    async fn save(&self, data: &str) -> Result<(), PipelineError>;
265
266    /// Checks if the source exists and is accessible
267    async fn exists(&self) -> bool;
268
269    /// Gets the source identifier (e.g., file path, URL)
270    fn source_id(&self) -> String;
271}
272
273/// File-based configuration source
274pub struct FileConfigSource {
275    file_path: String,
276}
277
278impl FileConfigSource {
279    pub fn new(file_path: String) -> Self {
280        Self { file_path }
281    }
282}
283
284#[async_trait]
285impl ConfigSource for FileConfigSource {
286    async fn load(&self) -> Result<String, PipelineError> {
287        tokio::fs::read_to_string(&self.file_path)
288            .await
289            .map_err(|e| PipelineError::InternalError(format!("Failed to read config file {}: {}", self.file_path, e)))
290    }
291
292    async fn save(&self, data: &str) -> Result<(), PipelineError> {
293        tokio::fs::write(&self.file_path, data)
294            .await
295            .map_err(|e| PipelineError::InternalError(format!("Failed to write config file {}: {}", self.file_path, e)))
296    }
297
298    async fn exists(&self) -> bool {
299        tokio::fs::metadata(&self.file_path).await.is_ok()
300    }
301
302    fn source_id(&self) -> String {
303        self.file_path.clone()
304    }
305}
306
307/// Environment variable configuration source
308pub struct EnvConfigSource {
309    prefix: String,
310}
311
312impl EnvConfigSource {
313    pub fn new(prefix: String) -> Self {
314        Self { prefix }
315    }
316}
317
318#[async_trait]
319impl ConfigSource for EnvConfigSource {
320    async fn load(&self) -> Result<String, PipelineError> {
321        let mut config_map = HashMap::new();
322
323        for (key, value) in std::env::vars() {
324            if key.starts_with(&self.prefix) {
325                let config_key = key.strip_prefix(&self.prefix).unwrap_or(&key);
326                config_map.insert(config_key.to_lowercase(), value);
327            }
328        }
329
330        serde_json::to_string(&config_map)
331            .map_err(|e| PipelineError::InternalError(format!("Failed to serialize env config: {}", e)))
332    }
333
334    async fn save(&self, _data: &str) -> Result<(), PipelineError> {
335        Err(PipelineError::InternalError(
336            "Cannot save to environment variables".to_string(),
337        ))
338    }
339
340    async fn exists(&self) -> bool {
341        std::env::vars().any(|(key, _)| key.starts_with(&self.prefix))
342    }
343
344    fn source_id(&self) -> String {
345        format!("env:{}", self.prefix)
346    }
347}
348
349/// Configuration change event
350#[derive(Debug, Clone, Serialize, Deserialize)]
351pub struct ConfigChangeEvent<T> {
352    pub config_type: String,
353    pub old_config: Option<T>,
354    pub new_config: T,
355    #[serde(with = "datetime_serde")]
356    pub changed_at: chrono::DateTime<chrono::Utc>,
357    pub change_reason: String,
358    pub changed_by: String,
359}
360
361/// Configuration change listener trait
362#[async_trait]
363pub trait ConfigChangeListener<T>: Send + Sync
364where
365    T: ConfigValidation + Serialize + DeserializeOwned,
366{
367    /// Called when configuration changes
368    async fn on_config_changed(&self, event: ConfigChangeEvent<T>) -> Result<(), PipelineError>;
369}
370
371/// Generic configuration manager providing centralized configuration management
372pub struct GenericConfigManager<T>
373where
374    T: ConfigValidation + Serialize + DeserializeOwned,
375{
376    config: RwLock<T>,
377    sources: Vec<Arc<dyn ConfigSource>>,
378    listeners: Vec<Arc<dyn ConfigChangeListener<T>>>,
379    change_history: RwLock<Vec<ConfigChangeEvent<T>>>,
380    auto_reload: bool,
381}
382
383impl<T> GenericConfigManager<T>
384where
385    T: ConfigValidation + Serialize + DeserializeOwned,
386{
387    /// Creates a new configuration manager with default configuration
388    pub fn new(default_config: T) -> Self {
389        Self {
390            config: RwLock::new(default_config),
391            sources: Vec::new(),
392            listeners: Vec::new(),
393            change_history: RwLock::new(Vec::new()),
394            auto_reload: false,
395        }
396    }
397
398    /// Adds a configuration source
399    pub fn add_source(mut self, source: Arc<dyn ConfigSource>) -> Self {
400        self.sources.push(source);
401        self
402    }
403
404    /// Adds a configuration change listener
405    pub fn add_listener(mut self, listener: Arc<dyn ConfigChangeListener<T>>) -> Self {
406        self.listeners.push(listener);
407        self
408    }
409
410    /// Enables or disables automatic configuration reloading
411    pub fn with_auto_reload(mut self, auto_reload: bool) -> Self {
412        self.auto_reload = auto_reload;
413        self
414    }
415
416    /// Gets the current configuration
417    pub fn get_config(&self) -> Result<T, PipelineError> {
418        self.config
419            .read()
420            .map_err(|e| PipelineError::InternalError(format!("Failed to read config: {}", e)))
421            .map(|config| config.clone())
422    }
423
424    /// Updates the configuration with validation
425    pub async fn update_config(
426        &self,
427        new_config: T,
428        change_reason: String,
429        changed_by: String,
430    ) -> Result<(), PipelineError> {
431        // Validate the new configuration
432        let validation_result = new_config.validate();
433        if !validation_result.is_valid {
434            let error_messages: Vec<String> = validation_result
435                .errors
436                .iter()
437                .map(|e| format!("{}: {}", e.field, e.message))
438                .collect();
439            return Err(PipelineError::InvalidConfiguration(format!(
440                "Configuration validation failed: {}",
441                error_messages.join(", ")
442            )));
443        }
444
445        // Get the old configuration for the change event
446        let old_config = self.get_config().ok();
447
448        // Update the configuration
449        {
450            let mut config = self
451                .config
452                .write()
453                .map_err(|e| PipelineError::InternalError(format!("Failed to write config: {}", e)))?;
454            *config = new_config.clone();
455        }
456
457        // Create change event
458        let change_event = ConfigChangeEvent {
459            config_type: std::any::type_name::<T>().to_string(),
460            old_config,
461            new_config: new_config.clone(),
462            changed_at: chrono::Utc::now(),
463            change_reason,
464            changed_by,
465        };
466
467        // Record the change
468        if let Ok(mut history) = self.change_history.write() {
469            history.push(change_event.clone());
470            // Keep only the last 100 changes
471            if history.len() > 100 {
472                history.remove(0);
473            }
474        }
475
476        // Notify listeners
477        for listener in &self.listeners {
478            if let Err(e) = listener.on_config_changed(change_event.clone()).await {
479                eprintln!("Config change listener error: {}", e);
480            }
481        }
482
483        Ok(())
484    }
485
486    /// Loads configuration from all sources in order
487    pub async fn load_from_sources(&self, changed_by: String) -> Result<(), PipelineError> {
488        let mut merged_config = None;
489
490        for source in &self.sources {
491            if source.exists().await {
492                let config_data = source.load().await?;
493                let config: T = serde_json::from_str(&config_data).map_err(|e| {
494                    PipelineError::InternalError(format!("Failed to parse config from {}: {}", source.source_id(), e))
495                })?;
496
497                merged_config = Some(match merged_config {
498                    Some(_existing) => {
499                        // In a real implementation, you'd want a merge strategy
500                        // For now, later sources override earlier ones
501                        config
502                    }
503                    None => config,
504                });
505            }
506        }
507
508        if let Some(config) = merged_config {
509            self.update_config(config, "Loaded from sources".to_string(), changed_by)
510                .await?;
511        }
512
513        Ok(())
514    }
515
516    /// Saves current configuration to the first writable source
517    pub async fn save_to_source(&self) -> Result<(), PipelineError> {
518        let config = self.get_config()?;
519        let config_data = serde_json::to_string_pretty(&config)
520            .map_err(|e| PipelineError::InternalError(format!("Failed to serialize config: {}", e)))?;
521
522        for source in &self.sources {
523            if let Ok(()) = source.save(&config_data).await {
524                return Ok(());
525            }
526        }
527
528        Err(PipelineError::InternalError(
529            "No writable configuration source available".to_string(),
530        ))
531    }
532
533    /// Gets configuration change history
534    pub fn get_change_history(&self) -> Vec<ConfigChangeEvent<T>> {
535        self.change_history
536            .read()
537            .map(|history| history.clone())
538            .unwrap_or_default()
539    }
540
541    /// Validates current configuration
542    pub fn validate_current_config(&self) -> Result<ConfigValidationResult, PipelineError> {
543        let config = self.get_config()?;
544        Ok(config.validate())
545    }
546}
547
548#[cfg(test)]
549mod tests {
550    use super::*;
551
552    #[derive(Clone, Debug, Serialize, Deserialize)]
553    struct TestConfig {
554        database_url: String,
555        max_connections: u32,
556        timeout_seconds: u64,
557        features: Vec<String>,
558    }
559
560    impl ConfigValidation for TestConfig {
561        fn validate(&self) -> ConfigValidationResult {
562            let mut result = ConfigValidationResult::valid();
563
564            if self.database_url.is_empty() {
565                result.add_error("database_url".to_string(), "Database URL cannot be empty".to_string());
566            }
567
568            if self.max_connections == 0 {
569                result.add_error(
570                    "max_connections".to_string(),
571                    "Max connections must be greater than 0".to_string(),
572                );
573            }
574
575            if self.max_connections > 1000 {
576                result.add_warning(
577                    "max_connections".to_string(),
578                    "Very high connection count may impact performance".to_string(),
579                );
580            }
581
582            if self.timeout_seconds == 0 {
583                result.add_error(
584                    "timeout_seconds".to_string(),
585                    "Timeout must be greater than 0".to_string(),
586                );
587            }
588
589            result
590        }
591
592        fn schema_version(&self) -> String {
593            "1.0.0".to_string()
594        }
595
596        fn migrate_from_version(&self, _from_version: &str, _data: &str) -> Result<Self, PipelineError> {
597            // Simple migration - just return self for now
598            Ok(self.clone())
599        }
600    }
601
602    impl Default for TestConfig {
603        fn default() -> Self {
604            Self {
605                database_url: "postgresql://localhost:5432/test".to_string(),
606                max_connections: 10,
607                timeout_seconds: 30,
608                features: vec!["feature1".to_string(), "feature2".to_string()],
609            }
610        }
611    }
612
613    #[tokio::test]
614    async fn test_config_manager_creation() {
615        let config_manager = GenericConfigManager::new(TestConfig::default());
616        let config = config_manager.get_config().unwrap();
617
618        assert_eq!(config.database_url, "postgresql://localhost:5432/test");
619        assert_eq!(config.max_connections, 10);
620    }
621
622    #[tokio::test]
623    async fn test_config_validation() {
624        let config_manager = GenericConfigManager::new(TestConfig::default());
625
626        let invalid_config = TestConfig {
627            database_url: "".to_string(),
628            max_connections: 0,
629            timeout_seconds: 0,
630            features: vec![],
631        };
632
633        let result = config_manager
634            .update_config(invalid_config, "Test update".to_string(), "test_user".to_string())
635            .await;
636
637        assert!(result.is_err());
638    }
639
640    #[tokio::test]
641    async fn test_config_change_history() {
642        let config_manager = GenericConfigManager::new(TestConfig::default());
643
644        let new_config = TestConfig {
645            database_url: "postgresql://localhost:5432/newdb".to_string(),
646            max_connections: 20,
647            timeout_seconds: 60,
648            features: vec!["new_feature".to_string()],
649        };
650
651        config_manager
652            .update_config(new_config, "Updated for testing".to_string(), "test_user".to_string())
653            .await
654            .unwrap();
655
656        let history = config_manager.get_change_history();
657        assert_eq!(history.len(), 1);
658        assert_eq!(history[0].change_reason, "Updated for testing");
659        assert_eq!(history[0].changed_by, "test_user");
660    }
661
662    /// Tests configuration validation result management and state tracking.
663    ///
664    /// This test validates that configuration validation results can
665    /// properly track validation state, errors, and warnings for
666    /// comprehensive configuration validation reporting.
667    ///
668    /// # Test Coverage
669    ///
670    /// - Valid validation result creation
671    /// - Initial state validation (valid, no errors)
672    /// - Error addition and state change
673    /// - Validation state update on error
674    /// - Warning addition functionality
675    /// - Error and warning collection management
676    ///
677    /// # Test Scenario
678    ///
679    /// Creates a valid validation result, adds errors and warnings,
680    /// and verifies that state and collections are managed correctly.
681    ///
682    /// # Domain Concerns
683    ///
684    /// - Configuration validation reporting
685    /// - Validation state management
686    /// - Error and warning collection
687    /// - Validation result aggregation
688    ///
689    /// # Assertions
690    ///
691    /// - Initial result is valid with no errors
692    /// - Adding error changes validity to false
693    /// - Error collection is updated correctly
694    /// - Warning collection is managed properly
695    /// - State tracking works correctly
696    #[test]
697    fn test_validation_result() {
698        let mut result = ConfigValidationResult::valid();
699        assert!(result.is_valid);
700        assert!(result.errors.is_empty());
701
702        result.add_error("field1".to_string(), "Error message".to_string());
703        assert!(!result.is_valid);
704        assert_eq!(result.errors.len(), 1);
705
706        result.add_warning("field2".to_string(), "Warning message".to_string());
707        assert_eq!(result.warnings.len(), 1);
708    }
709}