adaptive_pipeline/infrastructure/config/generic_config_manager.rs
1// /////////////////////////////////////////////////////////////////////////////
2// Adaptive Pipeline
3// Copyright (c) 2025 Michael Gardner, A Bit of Help, Inc.
4// SPDX-License-Identifier: BSD-3-Clause
5// See LICENSE file in the project root.
6// /////////////////////////////////////////////////////////////////////////////
7
8//! # Generic Configuration Manager
9//!
10//! This module provides a generic, reusable configuration management system
11//! for the adaptive pipeline system. It supports validation, versioning,
12//! migration, and hot-reloading of configuration data.
13//!
14//! ## Overview
15//!
16//! The generic configuration manager provides:
17//!
18//! - **Type-Safe Configuration**: Generic configuration management for any type
19//! - **Validation**: Comprehensive validation with detailed error reporting
20//! - **Versioning**: Configuration schema versioning and migration support
21//! - **Hot Reloading**: Runtime configuration updates without restart
22//! - **Thread Safety**: Safe concurrent access to configuration data
23//!
24//! ## Architecture
25//!
26//! The configuration manager follows generic design patterns:
27//!
28//! - **Generic Design**: Works with any configuration type implementing
29//! required traits
30//! - **Validation Framework**: Pluggable validation with detailed error
31//! reporting
32//! - **Migration System**: Automatic migration between configuration versions
33//! - **Event System**: Configuration change notifications and observers
34//!
35//! ## Key Features
36//!
37//! ### Configuration Validation
38//!
39//! - **Schema Validation**: Validate configuration against defined schemas
40//! - **Business Rules**: Enforce business logic and constraints
41//! - **Error Reporting**: Detailed error messages with field-level information
42//! - **Warning System**: Non-fatal warnings for configuration issues
43//!
44//! ### Version Management
45//!
46//! - **Schema Versioning**: Track configuration schema versions
47//! - **Migration Support**: Automatic migration between versions
48//! - **Backward Compatibility**: Support for older configuration formats
49//! - **Version Detection**: Automatic detection of configuration versions
50//!
51//! ### Hot Reloading
52//!
53//! - **Runtime Updates**: Update configuration without application restart
54//! - **Change Detection**: Detect configuration file changes
55//! - **Atomic Updates**: Atomic configuration updates to prevent inconsistency
56//! - **Rollback Support**: Rollback to previous configuration on errors
57//!
58//! ## Usage Examples
59//!
60//! ### Basic Configuration Management
61
62//!
63//! ### Configuration with Hot Reloading
64
65//!
66//! ### Configuration Migration
67
68//!
69//! ## Validation Framework
70//!
71//! ### Validation Results
72//!
73//! The validation system provides detailed feedback:
74//!
75//! - **Errors**: Critical validation failures that prevent usage
76//! - **Warnings**: Non-critical issues that should be addressed
77//! - **Field-Level Information**: Specific field names and error contexts
78//! - **Severity Levels**: Different severity levels for different issues
79//!
80//! ### Custom Validation Rules
81//!
82//! - **Business Logic**: Implement custom business logic validation
83//! - **Cross-Field Validation**: Validate relationships between fields
84//! - **External Validation**: Validate against external resources
85//! - **Conditional Validation**: Conditional validation based on other fields
86//!
87//! ## Migration System
88//!
89//! ### Migration Strategies
90//!
91//! - **Automatic Migration**: Automatic migration between compatible versions
92//! - **Manual Migration**: Manual migration for complex changes
93//! - **Incremental Migration**: Step-by-step migration through versions
94//! - **Rollback Support**: Rollback to previous versions on failure
95//!
96//! ### Version Compatibility
97//!
98//! - **Semantic Versioning**: Use semantic versioning for schema versions
99//! - **Compatibility Matrix**: Define compatibility between versions
100//! - **Breaking Changes**: Handle breaking changes gracefully
101//! - **Deprecation Warnings**: Warn about deprecated configuration options
102//!
103//! ## Performance Considerations
104//!
105//! ### Memory Usage
106//!
107//! - **Efficient Storage**: Efficient storage of configuration data
108//! - **Lazy Loading**: Lazy loading of configuration sections
109//! - **Memory Pooling**: Reuse configuration objects when possible
110//!
111//! ### Access Performance
112//!
113//! - **Caching**: Cache frequently accessed configuration values
114//! - **Read Optimization**: Optimize for frequent read operations
115//! - **Lock Contention**: Minimize lock contention for concurrent access
116//!
117//! ## Error Handling
118//!
119//! ### Configuration Errors
120//!
121//! - **Parse Errors**: Handle configuration file parsing errors
122//! - **Validation Errors**: Comprehensive validation error reporting
123//! - **Migration Errors**: Handle migration failures gracefully
124//! - **File System Errors**: Handle file system access errors
125//!
126//! ### Recovery Strategies
127//!
128//! - **Default Values**: Use default values for missing configuration
129//! - **Fallback Configuration**: Fallback to previous valid configuration
130//! - **Error Reporting**: Detailed error reporting with suggestions
131//! - **Graceful Degradation**: Continue operation with reduced functionality
132//!
133//! ## Integration
134//!
135//! The configuration manager integrates with:
136//!
137//! - **File System**: Load configuration from various file formats
138//! - **Environment Variables**: Override configuration with environment
139//! variables
140//! - **Command Line**: Override configuration with command line arguments
141//! - **External Services**: Load configuration from external configuration
142//! services
143//!
144//! ## Thread Safety
145//!
146//! The configuration manager is fully thread-safe:
147//!
148//! - **Concurrent Access**: Safe concurrent access to configuration data
149//! - **Atomic Updates**: Atomic configuration updates
150//! - **Lock-Free Reads**: Lock-free reads for better performance
151//!
152//! ## Future Enhancements
153//!
154//! Planned enhancements include:
155//!
156//! - **Configuration UI**: Web-based configuration management interface
157//! - **A/B Testing**: Support for A/B testing of configuration changes
158//! - **Configuration Validation Service**: External validation service
159//! integration
160//! - **Configuration Analytics**: Analytics and monitoring of configuration
161//! usage
162
163use adaptive_pipeline_domain::error::PipelineError;
164use adaptive_pipeline_domain::services::datetime_serde;
165use async_trait::async_trait;
166use serde::de::DeserializeOwned;
167use serde::{Deserialize, Serialize};
168use std::collections::HashMap;
169use std::fmt::Debug;
170use std::sync::{Arc, RwLock};
171
172/// Generic trait for configuration validation with detailed error reporting
173///
174/// This trait defines the interface for configuration validation, enabling
175/// type-safe validation of configuration data with comprehensive error
176/// reporting and schema versioning support.
177///
178/// # Key Features
179///
180/// - **Validation Logic**: Implement custom validation rules for configuration
181/// - **Error Reporting**: Detailed error messages with field-level information
182/// - **Schema Versioning**: Track and manage configuration schema versions
183/// - **Migration Support**: Automatic migration between configuration versions
184///
185/// # Implementation Requirements
186///
187/// Implementing types must:
188/// - Be cloneable for configuration updates
189/// - Be debuggable for error reporting
190/// - Be thread-safe (`Send + Sync`)
191/// - Have a stable lifetime (`'static`)
192///
193/// # Examples
194pub trait ConfigValidation: Clone + Debug + Send + Sync + 'static {
195 /// Validates the configuration and returns detailed validation results
196 fn validate(&self) -> ConfigValidationResult;
197
198 /// Returns the configuration schema version for compatibility checking
199 fn schema_version(&self) -> String;
200
201 /// Migrates configuration from an older schema version
202 fn migrate_from_version(&self, from_version: &str, data: &str) -> Result<Self, PipelineError>;
203}
204
205/// Result of configuration validation with detailed error information
206#[derive(Debug, Clone)]
207pub struct ConfigValidationResult {
208 pub is_valid: bool,
209 pub errors: Vec<ConfigValidationError>,
210 pub warnings: Vec<ConfigValidationWarning>,
211}
212
213impl ConfigValidationResult {
214 pub fn valid() -> Self {
215 Self {
216 is_valid: true,
217 errors: Vec::new(),
218 warnings: Vec::new(),
219 }
220 }
221
222 pub fn invalid(errors: Vec<ConfigValidationError>) -> Self {
223 Self {
224 is_valid: false,
225 errors,
226 warnings: Vec::new(),
227 }
228 }
229
230 pub fn with_warnings(mut self, warnings: Vec<ConfigValidationWarning>) -> Self {
231 self.warnings = warnings;
232 self
233 }
234
235 pub fn add_error(&mut self, field: String, message: String) {
236 self.errors.push(ConfigValidationError { field, message });
237 self.is_valid = false;
238 }
239
240 pub fn add_warning(&mut self, field: String, message: String) {
241 self.warnings.push(ConfigValidationWarning { field, message });
242 }
243}
244
245#[derive(Debug, Clone)]
246pub struct ConfigValidationError {
247 pub field: String,
248 pub message: String,
249}
250
251#[derive(Debug, Clone)]
252pub struct ConfigValidationWarning {
253 pub field: String,
254 pub message: String,
255}
256
257/// Configuration source trait for loading configurations from different sources
258#[async_trait]
259pub trait ConfigSource: Send + Sync {
260 /// Loads configuration data as a string
261 async fn load(&self) -> Result<String, PipelineError>;
262
263 /// Saves configuration data
264 async fn save(&self, data: &str) -> Result<(), PipelineError>;
265
266 /// Checks if the source exists and is accessible
267 async fn exists(&self) -> bool;
268
269 /// Gets the source identifier (e.g., file path, URL)
270 fn source_id(&self) -> String;
271}
272
273/// File-based configuration source
274pub struct FileConfigSource {
275 file_path: String,
276}
277
278impl FileConfigSource {
279 pub fn new(file_path: String) -> Self {
280 Self { file_path }
281 }
282}
283
284#[async_trait]
285impl ConfigSource for FileConfigSource {
286 async fn load(&self) -> Result<String, PipelineError> {
287 tokio::fs::read_to_string(&self.file_path)
288 .await
289 .map_err(|e| PipelineError::InternalError(format!("Failed to read config file {}: {}", self.file_path, e)))
290 }
291
292 async fn save(&self, data: &str) -> Result<(), PipelineError> {
293 tokio::fs::write(&self.file_path, data)
294 .await
295 .map_err(|e| PipelineError::InternalError(format!("Failed to write config file {}: {}", self.file_path, e)))
296 }
297
298 async fn exists(&self) -> bool {
299 tokio::fs::metadata(&self.file_path).await.is_ok()
300 }
301
302 fn source_id(&self) -> String {
303 self.file_path.clone()
304 }
305}
306
307/// Environment variable configuration source
308pub struct EnvConfigSource {
309 prefix: String,
310}
311
312impl EnvConfigSource {
313 pub fn new(prefix: String) -> Self {
314 Self { prefix }
315 }
316}
317
318#[async_trait]
319impl ConfigSource for EnvConfigSource {
320 async fn load(&self) -> Result<String, PipelineError> {
321 let mut config_map = HashMap::new();
322
323 for (key, value) in std::env::vars() {
324 if key.starts_with(&self.prefix) {
325 let config_key = key.strip_prefix(&self.prefix).unwrap_or(&key);
326 config_map.insert(config_key.to_lowercase(), value);
327 }
328 }
329
330 serde_json::to_string(&config_map)
331 .map_err(|e| PipelineError::InternalError(format!("Failed to serialize env config: {}", e)))
332 }
333
334 async fn save(&self, _data: &str) -> Result<(), PipelineError> {
335 Err(PipelineError::InternalError(
336 "Cannot save to environment variables".to_string(),
337 ))
338 }
339
340 async fn exists(&self) -> bool {
341 std::env::vars().any(|(key, _)| key.starts_with(&self.prefix))
342 }
343
344 fn source_id(&self) -> String {
345 format!("env:{}", self.prefix)
346 }
347}
348
349/// Configuration change event
350#[derive(Debug, Clone, Serialize, Deserialize)]
351pub struct ConfigChangeEvent<T> {
352 pub config_type: String,
353 pub old_config: Option<T>,
354 pub new_config: T,
355 #[serde(with = "datetime_serde")]
356 pub changed_at: chrono::DateTime<chrono::Utc>,
357 pub change_reason: String,
358 pub changed_by: String,
359}
360
361/// Configuration change listener trait
362#[async_trait]
363pub trait ConfigChangeListener<T>: Send + Sync
364where
365 T: ConfigValidation + Serialize + DeserializeOwned,
366{
367 /// Called when configuration changes
368 async fn on_config_changed(&self, event: ConfigChangeEvent<T>) -> Result<(), PipelineError>;
369}
370
371/// Generic configuration manager providing centralized configuration management
372pub struct GenericConfigManager<T>
373where
374 T: ConfigValidation + Serialize + DeserializeOwned,
375{
376 config: RwLock<T>,
377 sources: Vec<Arc<dyn ConfigSource>>,
378 listeners: Vec<Arc<dyn ConfigChangeListener<T>>>,
379 change_history: RwLock<Vec<ConfigChangeEvent<T>>>,
380 auto_reload: bool,
381}
382
383impl<T> GenericConfigManager<T>
384where
385 T: ConfigValidation + Serialize + DeserializeOwned,
386{
387 /// Creates a new configuration manager with default configuration
388 pub fn new(default_config: T) -> Self {
389 Self {
390 config: RwLock::new(default_config),
391 sources: Vec::new(),
392 listeners: Vec::new(),
393 change_history: RwLock::new(Vec::new()),
394 auto_reload: false,
395 }
396 }
397
398 /// Adds a configuration source
399 pub fn add_source(mut self, source: Arc<dyn ConfigSource>) -> Self {
400 self.sources.push(source);
401 self
402 }
403
404 /// Adds a configuration change listener
405 pub fn add_listener(mut self, listener: Arc<dyn ConfigChangeListener<T>>) -> Self {
406 self.listeners.push(listener);
407 self
408 }
409
410 /// Enables or disables automatic configuration reloading
411 pub fn with_auto_reload(mut self, auto_reload: bool) -> Self {
412 self.auto_reload = auto_reload;
413 self
414 }
415
416 /// Gets the current configuration
417 pub fn get_config(&self) -> Result<T, PipelineError> {
418 self.config
419 .read()
420 .map_err(|e| PipelineError::InternalError(format!("Failed to read config: {}", e)))
421 .map(|config| config.clone())
422 }
423
424 /// Updates the configuration with validation
425 pub async fn update_config(
426 &self,
427 new_config: T,
428 change_reason: String,
429 changed_by: String,
430 ) -> Result<(), PipelineError> {
431 // Validate the new configuration
432 let validation_result = new_config.validate();
433 if !validation_result.is_valid {
434 let error_messages: Vec<String> = validation_result
435 .errors
436 .iter()
437 .map(|e| format!("{}: {}", e.field, e.message))
438 .collect();
439 return Err(PipelineError::InvalidConfiguration(format!(
440 "Configuration validation failed: {}",
441 error_messages.join(", ")
442 )));
443 }
444
445 // Get the old configuration for the change event
446 let old_config = self.get_config().ok();
447
448 // Update the configuration
449 {
450 let mut config = self
451 .config
452 .write()
453 .map_err(|e| PipelineError::InternalError(format!("Failed to write config: {}", e)))?;
454 *config = new_config.clone();
455 }
456
457 // Create change event
458 let change_event = ConfigChangeEvent {
459 config_type: std::any::type_name::<T>().to_string(),
460 old_config,
461 new_config: new_config.clone(),
462 changed_at: chrono::Utc::now(),
463 change_reason,
464 changed_by,
465 };
466
467 // Record the change
468 if let Ok(mut history) = self.change_history.write() {
469 history.push(change_event.clone());
470 // Keep only the last 100 changes
471 if history.len() > 100 {
472 history.remove(0);
473 }
474 }
475
476 // Notify listeners
477 for listener in &self.listeners {
478 if let Err(e) = listener.on_config_changed(change_event.clone()).await {
479 eprintln!("Config change listener error: {}", e);
480 }
481 }
482
483 Ok(())
484 }
485
486 /// Loads configuration from all sources in order
487 pub async fn load_from_sources(&self, changed_by: String) -> Result<(), PipelineError> {
488 let mut merged_config = None;
489
490 for source in &self.sources {
491 if source.exists().await {
492 let config_data = source.load().await?;
493 let config: T = serde_json::from_str(&config_data).map_err(|e| {
494 PipelineError::InternalError(format!("Failed to parse config from {}: {}", source.source_id(), e))
495 })?;
496
497 merged_config = Some(match merged_config {
498 Some(_existing) => {
499 // In a real implementation, you'd want a merge strategy
500 // For now, later sources override earlier ones
501 config
502 }
503 None => config,
504 });
505 }
506 }
507
508 if let Some(config) = merged_config {
509 self.update_config(config, "Loaded from sources".to_string(), changed_by)
510 .await?;
511 }
512
513 Ok(())
514 }
515
516 /// Saves current configuration to the first writable source
517 pub async fn save_to_source(&self) -> Result<(), PipelineError> {
518 let config = self.get_config()?;
519 let config_data = serde_json::to_string_pretty(&config)
520 .map_err(|e| PipelineError::InternalError(format!("Failed to serialize config: {}", e)))?;
521
522 for source in &self.sources {
523 if let Ok(()) = source.save(&config_data).await {
524 return Ok(());
525 }
526 }
527
528 Err(PipelineError::InternalError(
529 "No writable configuration source available".to_string(),
530 ))
531 }
532
533 /// Gets configuration change history
534 pub fn get_change_history(&self) -> Vec<ConfigChangeEvent<T>> {
535 self.change_history
536 .read()
537 .map(|history| history.clone())
538 .unwrap_or_default()
539 }
540
541 /// Validates current configuration
542 pub fn validate_current_config(&self) -> Result<ConfigValidationResult, PipelineError> {
543 let config = self.get_config()?;
544 Ok(config.validate())
545 }
546}
547
548#[cfg(test)]
549mod tests {
550 use super::*;
551
552 #[derive(Clone, Debug, Serialize, Deserialize)]
553 struct TestConfig {
554 database_url: String,
555 max_connections: u32,
556 timeout_seconds: u64,
557 features: Vec<String>,
558 }
559
560 impl ConfigValidation for TestConfig {
561 fn validate(&self) -> ConfigValidationResult {
562 let mut result = ConfigValidationResult::valid();
563
564 if self.database_url.is_empty() {
565 result.add_error("database_url".to_string(), "Database URL cannot be empty".to_string());
566 }
567
568 if self.max_connections == 0 {
569 result.add_error(
570 "max_connections".to_string(),
571 "Max connections must be greater than 0".to_string(),
572 );
573 }
574
575 if self.max_connections > 1000 {
576 result.add_warning(
577 "max_connections".to_string(),
578 "Very high connection count may impact performance".to_string(),
579 );
580 }
581
582 if self.timeout_seconds == 0 {
583 result.add_error(
584 "timeout_seconds".to_string(),
585 "Timeout must be greater than 0".to_string(),
586 );
587 }
588
589 result
590 }
591
592 fn schema_version(&self) -> String {
593 "1.0.0".to_string()
594 }
595
596 fn migrate_from_version(&self, _from_version: &str, _data: &str) -> Result<Self, PipelineError> {
597 // Simple migration - just return self for now
598 Ok(self.clone())
599 }
600 }
601
602 impl Default for TestConfig {
603 fn default() -> Self {
604 Self {
605 database_url: "postgresql://localhost:5432/test".to_string(),
606 max_connections: 10,
607 timeout_seconds: 30,
608 features: vec!["feature1".to_string(), "feature2".to_string()],
609 }
610 }
611 }
612
613 #[tokio::test]
614 async fn test_config_manager_creation() {
615 let config_manager = GenericConfigManager::new(TestConfig::default());
616 let config = config_manager.get_config().unwrap();
617
618 assert_eq!(config.database_url, "postgresql://localhost:5432/test");
619 assert_eq!(config.max_connections, 10);
620 }
621
622 #[tokio::test]
623 async fn test_config_validation() {
624 let config_manager = GenericConfigManager::new(TestConfig::default());
625
626 let invalid_config = TestConfig {
627 database_url: "".to_string(),
628 max_connections: 0,
629 timeout_seconds: 0,
630 features: vec![],
631 };
632
633 let result = config_manager
634 .update_config(invalid_config, "Test update".to_string(), "test_user".to_string())
635 .await;
636
637 assert!(result.is_err());
638 }
639
640 #[tokio::test]
641 async fn test_config_change_history() {
642 let config_manager = GenericConfigManager::new(TestConfig::default());
643
644 let new_config = TestConfig {
645 database_url: "postgresql://localhost:5432/newdb".to_string(),
646 max_connections: 20,
647 timeout_seconds: 60,
648 features: vec!["new_feature".to_string()],
649 };
650
651 config_manager
652 .update_config(new_config, "Updated for testing".to_string(), "test_user".to_string())
653 .await
654 .unwrap();
655
656 let history = config_manager.get_change_history();
657 assert_eq!(history.len(), 1);
658 assert_eq!(history[0].change_reason, "Updated for testing");
659 assert_eq!(history[0].changed_by, "test_user");
660 }
661
662 /// Tests configuration validation result management and state tracking.
663 ///
664 /// This test validates that configuration validation results can
665 /// properly track validation state, errors, and warnings for
666 /// comprehensive configuration validation reporting.
667 ///
668 /// # Test Coverage
669 ///
670 /// - Valid validation result creation
671 /// - Initial state validation (valid, no errors)
672 /// - Error addition and state change
673 /// - Validation state update on error
674 /// - Warning addition functionality
675 /// - Error and warning collection management
676 ///
677 /// # Test Scenario
678 ///
679 /// Creates a valid validation result, adds errors and warnings,
680 /// and verifies that state and collections are managed correctly.
681 ///
682 /// # Domain Concerns
683 ///
684 /// - Configuration validation reporting
685 /// - Validation state management
686 /// - Error and warning collection
687 /// - Validation result aggregation
688 ///
689 /// # Assertions
690 ///
691 /// - Initial result is valid with no errors
692 /// - Adding error changes validity to false
693 /// - Error collection is updated correctly
694 /// - Warning collection is managed properly
695 /// - State tracking works correctly
696 #[test]
697 fn test_validation_result() {
698 let mut result = ConfigValidationResult::valid();
699 assert!(result.is_valid);
700 assert!(result.errors.is_empty());
701
702 result.add_error("field1".to_string(), "Error message".to_string());
703 assert!(!result.is_valid);
704 assert_eq!(result.errors.len(), 1);
705
706 result.add_warning("field2".to_string(), "Warning message".to_string());
707 assert_eq!(result.warnings.len(), 1);
708 }
709}