use super::{
hierarchy::{AppConfig, ConfigHierarchy, ConfigValidationError, PipelineConfig},
persistence::ConfigLoadError,
};
use std::{
sync::{Arc, RwLock},
time::Instant,
};
use tokio::sync::broadcast;
/// Holds the live [`AppConfig`] behind a read/write lock, validates candidate
/// updates, records applied changes in a history, and publishes change events.
pub struct DynamicConfigManager {
/// Current configuration, shared behind a `std::sync::RwLock`.
config: Arc<RwLock<AppConfig>>,
/// Sending half of the broadcast channel used to publish [`ConfigChangeEvent`]s.
change_notifier: broadcast::Sender<ConfigChangeEvent>,
/// Extra validators, run in insertion order on every candidate configuration.
validators: Vec<Box<dyn ConfigValidator + Send + Sync>>,
/// Log of applied changes; `rollback` reads its most recent entry.
history: Arc<RwLock<ConfigHistory>>,
}
impl DynamicConfigManager {
/// Creates a manager that starts out holding `initial_config`.
///
/// The manager begins with no custom validators and an empty history. The
/// change-notification channel is created with a capacity of 100 events.
pub fn new(initial_config: AppConfig) -> Self {
    // Only the sender is retained; receivers are created on demand by subscribing.
    let (sender, _) = broadcast::channel(100);
    let config = Arc::new(RwLock::new(initial_config));
    let history = Arc::new(RwLock::new(ConfigHistory::new()));
    Self {
        config,
        change_notifier: sender,
        validators: Vec::new(),
        history,
    }
}
/// Registers an additional validator and returns the manager for chaining.
///
/// Validators run in the order they were added, after the configuration's own
/// built-in validation.
pub fn add_validator<V: ConfigValidator + Send + Sync + 'static>(
    mut self,
    validator: V,
) -> Self {
    let boxed: Box<dyn ConfigValidator + Send + Sync> = Box::new(validator);
    self.validators.push(boxed);
    self
}
/// Returns a snapshot (clone) of the current configuration.
///
/// If the lock was poisoned by a writer that panicked, the stored value is
/// still returned. Substituting `AppConfig::default()` in that case would
/// silently hand callers a configuration that was never applied, and would make
/// read-modify-write helpers build on the wrong baseline.
pub fn get_config(&self) -> AppConfig {
    self.config
        .read()
        // A poisoned lock still guards a usable value; recover the guard.
        .unwrap_or_else(std::sync::PoisonError::into_inner)
        .clone()
}
/// Validates `new_config`, installs it as the current configuration, records
/// the change in the history, and broadcasts a [`ConfigChangeEvent`] with
/// `ConfigChangeType::Update`.
///
/// # Errors
///
/// * `ConfigUpdateError::ValidationError` / `ConfigUpdateError::CustomValidation`
///   when validation rejects `new_config`; nothing is modified in that case.
/// * `ConfigUpdateError::LockError` when the config or history lock is
///   poisoned. If only the history lock fails, the configuration has already
///   been replaced but no history entry or event is produced.
pub async fn update_config(&self, new_config: AppConfig) -> Result<(), ConfigUpdateError> {
    // Validate before touching any lock so no guard is ever held across an await.
    self.validate_config(&new_config).await?;

    // Swap under a single write lock. Reading the old value and installing the
    // new one in one critical section guarantees `previous_config` is exactly
    // the value that was replaced, even when several updates race.
    let previous_config = {
        let mut config = self.config.write().map_err(|e| {
            ConfigUpdateError::LockError(format!("Failed to write config: {e}"))
        })?;
        std::mem::replace(&mut *config, new_config.clone())
    };

    {
        let mut history = self.history.write().map_err(|e| {
            ConfigUpdateError::LockError(format!("Failed to write history: {e}"))
        })?;
        history.record_change(previous_config.clone(), new_config.clone());
    }

    let event = ConfigChangeEvent {
        timestamp: Instant::now(),
        change_type: ConfigChangeType::Update,
        previous: Some(previous_config),
        current: new_config,
    };
    // Notification is best-effort: a failed send must not fail the update.
    let _ = self.change_notifier.send(event);
    Ok(())
}
/// Overlays `config_update` onto a snapshot of the current configuration using
/// `AppConfig::merge_with`, then applies the result through `update_config`.
///
/// # Errors
///
/// Returns whatever `update_config` returns for the merged configuration.
pub async fn merge_config(&self, config_update: AppConfig) -> Result<(), ConfigUpdateError> {
    let merged = {
        let mut base = self.get_config();
        base.merge_with(&config_update);
        base
    };
    self.update_config(merged).await
}
/// Replaces only the `pipeline` section of the current configuration and
/// applies the result through `update_config`.
///
/// # Errors
///
/// Returns whatever `update_config` returns for the resulting configuration.
pub async fn update_pipeline_config(
    &self,
    pipeline_config: PipelineConfig,
) -> Result<(), ConfigUpdateError> {
    let candidate = {
        let mut snapshot = self.get_config();
        snapshot.pipeline = pipeline_config;
        snapshot
    };
    self.update_config(candidate).await
}
/// Restores the configuration that was replaced by the most recent change.
///
/// The restored configuration is applied through `update_config`, so it is
/// validated again, recorded as a new history entry, and broadcast with
/// `ConfigChangeType::Update`. Because the rollback itself becomes the newest
/// history entry, calling `rollback` twice in a row re-applies the
/// configuration that the first rollback replaced.
///
/// # Errors
///
/// * `ConfigUpdateError::NoHistory` when the history is empty.
/// * `ConfigUpdateError::LockError` when the history lock is poisoned.
/// * Any error returned by `update_config` for the restored configuration.
pub async fn rollback(&self) -> Result<(), ConfigUpdateError> {
    let previous_config = {
        // Report a poisoned lock as an error instead of panicking; this function
        // already returns `Result` and `update_config` handles locks the same way.
        let history = self.history.read().map_err(|e| {
            ConfigUpdateError::LockError(format!("Failed to read history: {e}"))
        })?;
        history
            .get_previous()
            .ok_or(ConfigUpdateError::NoHistory)?
            .clone()
    };
    self.update_config(previous_config).await
}
/// Returns a new receiver subscribed to this manager's change-notification
/// channel.
pub fn subscribe_to_changes(&self) -> broadcast::Receiver<ConfigChangeEvent> {
self.change_notifier.subscribe()
}
/// Runs the configuration's built-in validation followed by every registered
/// custom validator, stopping at the first failure.
///
/// # Errors
///
/// * `ConfigUpdateError::ValidationError` when `config.validate()` fails.
/// * `ConfigUpdateError::CustomValidation` carrying the message of the first
///   custom validator that rejects `config`.
async fn validate_config(&self, config: &AppConfig) -> Result<(), ConfigUpdateError> {
    if let Err(err) = config.validate() {
        return Err(ConfigUpdateError::ValidationError(err));
    }
    self.validators.iter().try_for_each(|validator| {
        validator
            .validate(config)
            .map_err(ConfigUpdateError::CustomValidation)
    })
}
/// Returns a copy of every recorded history entry, oldest first.
///
/// # Panics
///
/// Panics if the history lock is poisoned.
pub fn get_history(&self) -> Vec<ConfigHistoryEntry> {
    self.history
        .read()
        .expect("lock should not be poisoned")
        .entries
        .clone()
}
/// Discards every recorded history entry.
///
/// # Panics
///
/// Panics if the history lock is poisoned.
pub fn clear_history(&self) {
    self.history
        .write()
        .expect("lock should not be poisoned")
        .clear();
}
/// Returns a snapshot of the current configuration; delegates to `get_config`.
pub fn export_config(&self) -> AppConfig {
self.get_config()
}
/// Applies `config` as the new configuration; delegates to `update_config`
/// and returns its result unchanged.
pub async fn import_config(&self, config: AppConfig) -> Result<(), ConfigUpdateError> {
self.update_config(config).await
}
}
/// Notification sent to subscribers after the configuration has changed.
#[derive(Debug, Clone)]
pub struct ConfigChangeEvent {
/// Instant at which the event was created.
pub timestamp: Instant,
/// Category of change that produced this event.
pub change_type: ConfigChangeType,
/// Configuration that was replaced, when one is available.
pub previous: Option<AppConfig>,
/// Configuration in effect after the change.
pub current: AppConfig,
}
/// Category attached to a [`ConfigChangeEvent`].
///
/// NOTE(review): in the code visible here only `Update` is ever constructed
/// (by `DynamicConfigManager::update_config`, which `rollback` also goes
/// through). The remaining variants appear to be reserved for other producers;
/// confirm before relying on them.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConfigChangeType {
/// A new configuration was applied.
Update,
/// Presumably a reload from the persisted source; not emitted in this file.
Reload,
/// Presumably a rollback to an earlier configuration; not emitted in this file.
Rollback,
/// Presumably a schema/version migration; not emitted in this file.
Migration,
}
/// A pluggable check run against a candidate configuration.
pub trait ConfigValidator {
/// Returns `Ok(())` when `config` is acceptable, or `Err` with a
/// human-readable reason. `DynamicConfigManager` reports that reason as
/// `ConfigUpdateError::CustomValidation`.
fn validate(&self, config: &AppConfig) -> Result<(), String>;
}
/// Hook for upgrading a configuration to a newer form.
///
/// NOTE(review): nothing in the code visible here implements or calls this
/// trait; the intended call sequence is presumably `needs_migration` followed
/// by `migrate` — confirm against the callers.
pub trait ConfigMigrator {
/// Reports whether `config` should be passed to `migrate`.
fn needs_migration(&self, config: &AppConfig) -> bool;
/// Consumes `config` and returns the migrated configuration, or an error.
fn migrate(&self, config: AppConfig) -> Result<AppConfig, Box<dyn std::error::Error>>;
}
/// Bounded, oldest-first log of applied configuration changes.
#[derive(Debug, Clone)]
struct ConfigHistory {
/// Recorded changes; new entries are appended at the end.
entries: Vec<ConfigHistoryEntry>,
/// Maximum number of entries retained before the oldest is dropped.
max_entries: usize,
}
impl ConfigHistory {
    /// Creates an empty history that retains at most 50 entries.
    fn new() -> Self {
        Self {
            entries: Vec::new(),
            max_entries: 50,
        }
    }

    /// Appends a `previous` -> `current` transition stamped with the current
    /// instant, then drops the oldest entry if the log exceeds `max_entries`.
    fn record_change(&mut self, previous: AppConfig, current: AppConfig) {
        self.entries.push(ConfigHistoryEntry {
            timestamp: Instant::now(),
            previous,
            current,
        });
        self.evict_oldest_if_over_capacity();
    }

    /// Removes the oldest entry when more than `max_entries` are stored.
    fn evict_oldest_if_over_capacity(&mut self) {
        if self.entries.len() > self.max_entries {
            self.entries.remove(0);
        }
    }

    /// Returns the configuration that the most recent change replaced, or
    /// `None` when the history is empty.
    fn get_previous(&self) -> Option<&AppConfig> {
        let latest = self.entries.last()?;
        Some(&latest.previous)
    }

    /// Removes every entry.
    fn clear(&mut self) {
        self.entries.clear();
    }
}
/// One recorded configuration change.
#[derive(Debug, Clone)]
pub struct ConfigHistoryEntry {
/// Instant at which the change was recorded.
pub timestamp: Instant,
/// Configuration that was in effect before the change.
pub previous: AppConfig,
/// Configuration that was applied by the change.
pub current: AppConfig,
}
/// Errors produced while validating, applying, or rolling back a configuration.
#[derive(Debug, thiserror::Error)]
pub enum ConfigUpdateError {
/// The configuration's own `validate()` rejected it.
#[error("Configuration validation failed: {0}")]
ValidationError(#[from] ConfigValidationError),
/// A registered `ConfigValidator` rejected the configuration; carries its message.
#[error("Custom validation failed: {0}")]
CustomValidation(String),
/// Loading a configuration failed (converted from `ConfigLoadError`).
#[error("Configuration loading failed: {0}")]
LoadError(#[from] ConfigLoadError),
/// A lock guarding the configuration or history could not be acquired.
#[error("Lock error: {0}")]
LockError(String),
/// Rollback was requested but no change has been recorded.
#[error("No configuration history available for rollback")]
NoHistory,
}
/// Validator that caps the pipeline's cache size and thread count.
pub struct ResourceValidator {
/// Largest accepted `pipeline.max_cache_size_mb` (inclusive).
max_cache_size_mb: u32,
/// Largest accepted `pipeline.num_threads` (inclusive), when one is set.
max_threads: usize,
}
impl ResourceValidator {
/// Creates a validator that rejects configurations whose cache size exceeds
/// `max_cache_size_mb` or whose explicit thread count exceeds `max_threads`.
pub fn new(max_cache_size_mb: u32, max_threads: usize) -> Self {
Self {
max_cache_size_mb,
max_threads,
}
}
}
impl ConfigValidator for ResourceValidator {
    /// Checks the pipeline's resource settings against the configured limits.
    ///
    /// The cache size is checked first; the thread count is only checked when
    /// `num_threads` is set. Values equal to a limit are accepted.
    fn validate(&self, config: &AppConfig) -> Result<(), String> {
        let pipeline = &config.pipeline;

        if pipeline.max_cache_size_mb > self.max_cache_size_mb {
            return Err(format!(
                "Cache size {} MB exceeds maximum allowed {} MB",
                pipeline.max_cache_size_mb, self.max_cache_size_mb
            ));
        }

        match pipeline.num_threads {
            Some(threads) if threads > self.max_threads => Err(format!(
                "Thread count {} exceeds maximum allowed {}",
                threads, self.max_threads
            )),
            // Either no explicit thread count, or one within the limit.
            _ => Ok(()),
        }
    }
}
/// Validator that checks the pipeline's `device` name and its compatibility
/// with the `use_gpu` flag.
pub struct DeviceValidator;
impl ConfigValidator for DeviceValidator {
    /// Accepts `"cpu"` (unless `use_gpu` is set), `"cuda"` when CUDA is
    /// reported available, and `"metal"` when compiled for macOS. Any other
    /// device name is rejected.
    fn validate(&self, config: &AppConfig) -> Result<(), String> {
        let pipeline = &config.pipeline;
        let device = pipeline.device.as_str();

        // GPU acceleration cannot be combined with the CPU device.
        if pipeline.use_gpu && device == "cpu" {
            return Err("Cannot enable GPU acceleration with CPU device".to_string());
        }

        match device {
            "cpu" => Ok(()),
            "cuda" if Self::is_cuda_available() => Ok(()),
            "cuda" => Err("CUDA device requested but not available".to_string()),
            // `cfg!` is resolved at compile time, matching the per-platform blocks.
            "metal" if cfg!(target_os = "macos") => Ok(()),
            "metal" => Err("Metal device not available on this platform".to_string()),
            unknown => Err(format!("Unknown device: {}", unknown)),
        }
    }
}
impl DeviceValidator {
/// Reports whether a CUDA device can be used.
///
/// NOTE(review): this currently always returns `false`, so every `"cuda"`
/// configuration is rejected by `validate`. It looks like a placeholder for
/// real detection — confirm.
fn is_cuda_available() -> bool {
false }
}
/// Namespace for convenience constructors and one-off validation built on
/// [`DynamicConfigManager`].
pub struct ConfigManagement;
impl ConfigManagement {
    /// Builds a manager for `config` with the standard validators installed: a
    /// [`ResourceValidator`] limited to 10 000 MB of cache and 64 threads,
    /// followed by a [`DeviceValidator`].
    pub fn create_manager(config: AppConfig) -> DynamicConfigManager {
        let resource_limits = ResourceValidator::new(10_000, 64);
        DynamicConfigManager::new(config)
            .add_validator(resource_limits)
            .add_validator(DeviceValidator)
    }

    /// Validates `config` with the standard validators without applying it.
    ///
    /// A throwaway manager seeded with `AppConfig::default()` is used purely to
    /// run the validation chain.
    ///
    /// # Errors
    ///
    /// Returns the first validation failure, if any.
    pub async fn validate_config(config: &AppConfig) -> Result<(), ConfigUpdateError> {
        let scratch_manager = Self::create_manager(AppConfig::default());
        let outcome = scratch_manager.validate_config(config).await;
        outcome
    }
}
// Unit tests for the dynamic configuration manager and its validators.
#[cfg(test)]
mod tests {
use super::*;
use std::time::Duration;
// With no custom validators registered, an update replaces the stored config.
#[tokio::test]
async fn test_config_update() {
let initial_config = AppConfig::default();
let manager = DynamicConfigManager::new(initial_config.clone());
let mut new_config = initial_config;
new_config.pipeline.device = "cuda".to_string();
assert!(manager.update_config(new_config).await.is_ok());
let updated = manager.get_config();
assert_eq!(updated.pipeline.device, "cuda");
}
// A cache size of 2000 MB exceeds the validator's 1000 MB limit, so the
// update must be rejected.
#[tokio::test]
async fn test_config_validation() {
let manager = DynamicConfigManager::new(AppConfig::default())
.add_validator(ResourceValidator::new(1000, 8));
let mut invalid_config = AppConfig::default();
invalid_config.pipeline.max_cache_size_mb = 2000;
let result = manager.update_config(invalid_config).await;
assert!(result.is_err());
}
// After one update, rollback restores the device of the initial config.
#[tokio::test]
async fn test_config_rollback() {
let initial_config = AppConfig::default();
let manager = DynamicConfigManager::new(initial_config.clone());
let mut new_config = initial_config.clone();
new_config.pipeline.device = "cuda".to_string();
manager.update_config(new_config).await.unwrap();
manager.rollback().await.unwrap();
let current = manager.get_config();
assert_eq!(current.pipeline.device, initial_config.pipeline.device);
}
// Merging applies the changed fields.
// NOTE(review): the final assertion assumes 1024 is the default
// `max_cache_size_mb` and that `merge_with` leaves it untouched — confirm
// against `AppConfig`.
#[tokio::test]
async fn test_config_merge() {
let manager = DynamicConfigManager::new(AppConfig::default());
let mut update = AppConfig::default();
update.pipeline.device = "cuda".to_string();
update.pipeline.use_gpu = true;
manager.merge_config(update).await.unwrap();
let current = manager.get_config();
assert_eq!(current.pipeline.device, "cuda");
assert!(current.pipeline.use_gpu);
assert_eq!(current.pipeline.max_cache_size_mb, 1024);
}
// "cpu" is accepted without GPU acceleration and rejected once `use_gpu` is set.
#[test]
fn test_device_validator() {
let validator = DeviceValidator;
let mut config = AppConfig::default();
config.pipeline.device = "cpu".to_string();
config.pipeline.use_gpu = false;
assert!(validator.validate(&config).is_ok());
config.pipeline.use_gpu = true; assert!(validator.validate(&config).is_err());
}
// Values within both limits pass; exceeding either the cache limit or the
// thread limit fails.
#[test]
fn test_resource_validator() {
let validator = ResourceValidator::new(1000, 8);
let mut config = AppConfig::default();
config.pipeline.max_cache_size_mb = 500;
config.pipeline.num_threads = Some(4);
assert!(validator.validate(&config).is_ok());
config.pipeline.max_cache_size_mb = 2000; assert!(validator.validate(&config).is_err());
config.pipeline.max_cache_size_mb = 500;
config.pipeline.num_threads = Some(16); assert!(validator.validate(&config).is_err());
}
// The most recent entry's `previous` config is what `get_previous` returns.
// NOTE(review): assumes the default device is "cpu" — confirm against `AppConfig`.
#[test]
fn test_config_history() {
let mut history = ConfigHistory::new();
let config1 = AppConfig::default();
let mut config2 = AppConfig::default();
config2.pipeline.device = "cuda".to_string();
history.record_change(config1.clone(), config2.clone());
assert!(history.get_previous().is_some());
assert_eq!(history.get_previous().unwrap().pipeline.device, "cpu");
}
// Subscribes first, then performs the update from a spawned task after a
// short delay, and checks the received event.
#[tokio::test]
async fn test_change_notifications() {
let manager = DynamicConfigManager::new(AppConfig::default());
let mut receiver = manager.subscribe_to_changes();
// The manager is moved into the spawned task below.
let manager_clone = manager;
tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(10)).await;
let mut new_config = AppConfig::default();
new_config.pipeline.device = "cuda".to_string();
let _ = manager_clone.update_config(new_config).await;
});
let event = receiver.recv().await.unwrap();
assert_eq!(event.change_type, ConfigChangeType::Update);
assert_eq!(event.current.pipeline.device, "cuda");
}
}