distributed_config/
manager.rs

1//! Core configuration manager implementation
2
3use crate::error::{ConfigError, Result};
4use crate::sources::ConfigSource;
5use crate::validation::SchemaValidator;
6use crate::value::ConfigValue;
7use crate::watcher::{ConfigChange, ConfigWatcher};
8use dashmap::DashMap;
9use parking_lot::RwLock;
10use serde::{Deserialize, Serialize};
11use std::collections::HashMap;
12use std::sync::Arc;
13use std::time::SystemTime;
14use tokio::sync::{broadcast, RwLock as AsyncRwLock};
15use tracing::{debug, info, warn};
16
17/// The main configuration manager
18pub struct ConfigManager {
19    /// Current configuration data
20    config: Arc<AsyncRwLock<ConfigValue>>,
21
22    /// Configuration sources with priorities
23    sources: Arc<RwLock<Vec<(Box<dyn ConfigSource>, u32)>>>,
24
25    /// Schema validator
26    validator: Arc<RwLock<Option<SchemaValidator>>>,
27
28    /// Change notification broadcaster
29    change_broadcaster: broadcast::Sender<ConfigChange>,
30
31    /// Configuration history
32    history: Arc<DashMap<String, Vec<HistoryEntry>>>,
33
34    /// Feature flags cache
35    feature_flags: Arc<DashMap<String, bool>>,
36
37    /// Node-specific configurations
38    node_configs: Arc<DashMap<String, ConfigValue>>,
39
40    /// Active watchers
41    watchers: Arc<DashMap<String, tokio::task::JoinHandle<()>>>,
42}
43
44/// A configuration history entry
45#[derive(Debug, Clone, Serialize, Deserialize)]
46pub struct HistoryEntry {
47    pub timestamp: SystemTime,
48    pub value: ConfigValue,
49    pub changed_by: String,
50    pub change_type: String,
51}
52
53impl ConfigManager {
54    /// Create a new configuration manager
55    pub fn new() -> Self {
56        let (change_broadcaster, _) = broadcast::channel(1000);
57
58        Self {
59            config: Arc::new(AsyncRwLock::new(ConfigValue::Object(HashMap::new()))),
60            sources: Arc::new(RwLock::new(Vec::new())),
61            validator: Arc::new(RwLock::new(None)),
62            change_broadcaster,
63            history: Arc::new(DashMap::new()),
64            feature_flags: Arc::new(DashMap::new()),
65            node_configs: Arc::new(DashMap::new()),
66            watchers: Arc::new(DashMap::new()),
67        }
68    }
69
70    /// Add a configuration source with priority (higher number = higher priority)
71    pub fn add_source(&mut self, source: impl ConfigSource + 'static, priority: u32) {
72        let mut sources = self.sources.write();
73        sources.push((Box::new(source), priority));
74        sources.sort_by(|a, b| a.1.cmp(&b.1)); // Sort by priority
75        info!("Added configuration source with priority {}", priority);
76    }
77
78    /// Set the schema validator
79    pub fn set_validator(&mut self, validator: SchemaValidator) {
80        *self.validator.write() = Some(validator);
81        info!("Schema validator configured");
82    }
83
84    /// Initialize the configuration manager by loading from all sources
85    pub async fn initialize(&self) -> Result<()> {
86        info!("Initializing configuration manager");
87
88        let sources_len = self.sources.read().len();
89        if sources_len == 0 {
90            warn!("No configuration sources configured");
91            return Ok(());
92        }
93
94        // Load from all sources in priority order
95        let merged_config = ConfigValue::Object(HashMap::new());
96
97        // We can't clone the sources, so we'll access them one by one
98        // This is a simplified approach - in production you might want a different strategy
99
100        // For now, just create an empty config - the sources would be loaded by the calling code
101        // TODO: Implement proper source loading without cloning Box<dyn ConfigSource>
102
103        // Validate if validator is configured
104        if let Some(validator) = self.validator.read().as_ref() {
105            validator.validate(&merged_config)?;
106        }
107
108        // Update the configuration
109        {
110            let mut config = self.config.write().await;
111            *config = merged_config.clone();
112        }
113
114        // Extract feature flags
115        self.extract_feature_flags(&merged_config).await;
116
117        // Start watching for changes from sources that support it
118        self.start_source_watchers().await?;
119
120        info!("Configuration manager initialized successfully");
121        Ok(())
122    }
123
124    /// Get a typed configuration value
125    pub async fn get<T>(&self, key: &str) -> Result<T>
126    where
127        T: for<'de> Deserialize<'de>,
128    {
129        let config = self.config.read().await;
130
131        let value = config
132            .get_path(key)
133            .ok_or_else(|| ConfigError::KeyNotFound {
134                key: key.to_string(),
135            })?;
136
137        let json_value = serde_json::to_value(value)?;
138        let typed_value = T::deserialize(json_value)?;
139
140        Ok(typed_value)
141    }
142
143    /// Get a configuration value
144    pub async fn get_value(&self, key: &str) -> Result<ConfigValue> {
145        let config = self.config.read().await;
146
147        config
148            .get_path(key)
149            .cloned()
150            .ok_or_else(|| ConfigError::KeyNotFound {
151                key: key.to_string(),
152            })
153    }
154
155    /// Get a configuration value for a specific node
156    pub async fn get_value_for_node(&self, key: &str, node_id: &str) -> Result<ConfigValue> {
157        // First check node-specific configuration
158        if let Some(node_config) = self.node_configs.get(node_id) {
159            if let Some(value) = node_config.get_path(key) {
160                return Ok(value.clone());
161            }
162        }
163
164        // Fall back to global configuration
165        self.get_value(key).await
166    }
167
168    /// Set a configuration value
169    pub async fn set_value(&self, key: &str, value: ConfigValue) -> Result<()> {
170        self.set_value_internal(key, value, "manual".to_string())
171            .await
172    }
173
174    /// Set a configuration value for the entire cluster
175    pub async fn set_value_for_cluster(&self, key: &str, value: ConfigValue) -> Result<()> {
176        // TODO: Implement cluster-wide configuration distribution
177        // For now, just set locally
178        self.set_value_internal(key, value, "cluster".to_string())
179            .await
180    }
181
182    /// Internal method to set a configuration value
183    async fn set_value_internal(
184        &self,
185        key: &str,
186        value: ConfigValue,
187        changed_by: String,
188    ) -> Result<()> {
189        let old_value = {
190            let config = self.config.read().await;
191            config.get_path(key).cloned()
192        };
193
194        // Update the configuration
195        {
196            let mut config = self.config.write().await;
197            config.set_path(key, value.clone())?;
198        }
199
200        // Validate if validator is configured
201        if let Some(validator) = self.validator.read().as_ref() {
202            let config = self.config.read().await;
203            if let Err(e) = validator.validate(&config) {
204                // Rollback on validation failure
205                if let Some(old_val) = old_value {
206                    let mut config = self.config.write().await;
207                    config.set_path(key, old_val)?;
208                }
209                return Err(e);
210            }
211        }
212
213        // Record in history
214        self.add_to_history(key, value.clone(), changed_by.clone())
215            .await;
216
217        // Update feature flags if this is a feature flag
218        if key.starts_with("feature_flags.") || key.contains(".feature_flags.") {
219            self.update_feature_flag_cache(key, &value).await;
220        }
221
222        // Notify watchers
223        let change = ConfigChange {
224            key: key.to_string(),
225            old_value,
226            new_value: value.clone(),
227            timestamp: SystemTime::now(),
228            changed_by,
229        };
230
231        if let Err(e) = self.change_broadcaster.send(change) {
232            warn!("Failed to broadcast configuration change: {}", e);
233        }
234
235        debug!("Configuration value set: {} = {:?}", key, value);
236        Ok(())
237    }
238
239    /// Check if a feature flag is enabled
240    pub async fn is_feature_enabled(&self, flag_name: &str) -> Result<bool> {
241        if let Some(enabled) = self.feature_flags.get(flag_name) {
242            Ok(*enabled)
243        } else {
244            // Try to get from configuration
245            let full_key = format!("feature_flags.{flag_name}");
246            match self.get_value(&full_key).await {
247                Ok(value) => value.as_bool(),
248                Err(_) => Ok(false), // Default to disabled
249            }
250        }
251    }
252
253    /// Watch for configuration changes
254    pub async fn watch(&self, key_pattern: &str) -> Result<ConfigWatcher> {
255        let receiver = self.change_broadcaster.subscribe();
256        Ok(ConfigWatcher::new(key_pattern.to_string(), receiver))
257    }
258
259    /// Save current configuration to a file
260    pub async fn save_to_file(&self, path: &str) -> Result<()> {
261        let config = self.config.read().await;
262        let json_value = serde_json::to_value(&*config)?;
263        let yaml_content = serde_yaml::to_string(&json_value)?;
264
265        tokio::fs::write(path, yaml_content).await?;
266        info!("Configuration saved to: {}", path);
267        Ok(())
268    }
269
270    /// Get configuration history for a key
271    pub async fn get_history(&self, key: &str, limit: usize) -> Result<Vec<HistoryEntry>> {
272        if let Some(history) = self.history.get(key) {
273            let mut entries = history.clone();
274            entries.sort_by(|a, b| b.timestamp.cmp(&a.timestamp)); // Most recent first
275            entries.truncate(limit);
276            Ok(entries)
277        } else {
278            Ok(Vec::new())
279        }
280    }
281
282    /// Extract feature flags from configuration
283    async fn extract_feature_flags(&self, config: &ConfigValue) {
284        if let Some(flags_obj) = config.get_path("feature_flags") {
285            if let Ok(flags_map) = flags_obj.as_object() {
286                self.feature_flags.clear();
287                for (flag_name, flag_value) in flags_map {
288                    if let Ok(enabled) = flag_value.as_bool() {
289                        self.feature_flags.insert(flag_name.clone(), enabled);
290                    }
291                }
292                debug!("Extracted {} feature flags", flags_map.len());
293            }
294        }
295    }
296
297    /// Update feature flag cache
298    async fn update_feature_flag_cache(&self, key: &str, value: &ConfigValue) {
299        if let Some(flag_name) = key.strip_prefix("feature_flags.") {
300            if let Ok(enabled) = value.as_bool() {
301                self.feature_flags.insert(flag_name.to_string(), enabled);
302            }
303        } else if key.contains(".feature_flags.") {
304            // Handle nested feature flags
305            let parts: Vec<&str> = key.split('.').collect();
306            if let Some(flag_idx) = parts.iter().position(|&part| part == "feature_flags") {
307                if flag_idx + 1 < parts.len() {
308                    let flag_name = parts[flag_idx + 1..].join(".");
309                    if let Ok(enabled) = value.as_bool() {
310                        self.feature_flags.insert(flag_name, enabled);
311                    }
312                }
313            }
314        }
315    }
316
317    /// Add an entry to configuration history
318    async fn add_to_history(&self, key: &str, value: ConfigValue, changed_by: String) {
319        let entry = HistoryEntry {
320            timestamp: SystemTime::now(),
321            value,
322            changed_by,
323            change_type: "update".to_string(),
324        };
325
326        self.history
327            .entry(key.to_string())
328            .or_default()
329            .push(entry);
330
331        // Limit history size per key
332        const MAX_HISTORY_PER_KEY: usize = 100;
333        if let Some(mut history) = self.history.get_mut(key) {
334            let history_len = history.len();
335            if history_len > MAX_HISTORY_PER_KEY {
336                history.drain(0..history_len - MAX_HISTORY_PER_KEY);
337            }
338        }
339    }
340
341    /// Start watchers for sources that support watching
342    async fn start_source_watchers(&self) -> Result<()> {
343        // TODO: Implement source watching without cloning Box<dyn ConfigSource>
344        // This requires a different architecture, possibly using Arc<dyn ConfigSource>
345
346        Ok(())
347    }
348}
349
350impl Default for ConfigManager {
351    fn default() -> Self {
352        Self::new()
353    }
354}
355
356impl Drop for ConfigManager {
357    fn drop(&mut self) {
358        // Cancel all active watchers
359        for entry in self.watchers.iter() {
360            entry.value().abort();
361        }
362    }
363}
364
365#[cfg(test)]
366mod tests {
367    use super::*;
368    use crate::sources::FileSource;
369    use tempfile::NamedTempFile;
370
371    #[tokio::test]
372    #[ignore] // TODO: Fix temp file loading issue
373    async fn test_config_manager_basic() {
374        let temp_file = NamedTempFile::with_suffix(".json").unwrap();
375        let content = r#"{"key": "value", "number": 42}"#;
376        std::fs::write(temp_file.path(), content).unwrap();
377
378        let mut manager = ConfigManager::new();
379        let source = FileSource::new().add_file(temp_file.path(), None);
380        manager.add_source(source, 10);
381
382        manager.initialize().await.unwrap();
383
384        let value: String = manager.get("key").await.unwrap();
385        assert_eq!(value, "value");
386
387        let number: i64 = manager.get("number").await.unwrap();
388        assert_eq!(number, 42);
389    }
390
391    #[tokio::test]
392    async fn test_config_manager_set_value() {
393        let manager = ConfigManager::new();
394        manager.initialize().await.unwrap();
395
396        manager
397            .set_value("test.key", ConfigValue::String("test_value".to_string()))
398            .await
399            .unwrap();
400
401        let value = manager.get_value("test.key").await.unwrap();
402        assert_eq!(value.as_string().unwrap(), "test_value");
403    }
404
405    #[tokio::test]
406    #[ignore] // TODO: Fix temp file loading issue
407    async fn test_feature_flags() {
408        let temp_file = NamedTempFile::with_suffix(".json").unwrap();
409        let content = r#"{"feature_flags": {"new_ui": true, "beta_feature": false}}"#;
410        std::fs::write(temp_file.path(), content).unwrap();
411
412        let mut manager = ConfigManager::new();
413        let source = FileSource::new().add_file(temp_file.path(), None);
414        manager.add_source(source, 10);
415
416        manager.initialize().await.unwrap();
417
418        assert!(manager.is_feature_enabled("new_ui").await.unwrap());
419        assert!(
420            !manager.is_feature_enabled("beta_feature").await.unwrap()
421        );
422        assert!(
423            !manager.is_feature_enabled("nonexistent").await.unwrap()
424        );
425    }
426
427    #[tokio::test]
428    async fn test_configuration_history() {
429        let manager = ConfigManager::new();
430        manager.initialize().await.unwrap();
431
432        manager
433            .set_value("test.key", ConfigValue::String("value1".to_string()))
434            .await
435            .unwrap();
436        manager
437            .set_value("test.key", ConfigValue::String("value2".to_string()))
438            .await
439            .unwrap();
440
441        let history = manager.get_history("test.key", 10).await.unwrap();
442        assert_eq!(history.len(), 2);
443        assert_eq!(history[0].value.as_string().unwrap(), "value2"); // Most recent first
444        assert_eq!(history[1].value.as_string().unwrap(), "value1");
445    }
446}