1use crate::error::{ConfigError, Result};
4use crate::sources::ConfigSource;
5use crate::validation::SchemaValidator;
6use crate::value::ConfigValue;
7use crate::watcher::{ConfigChange, ConfigWatcher};
8use dashmap::DashMap;
9use parking_lot::RwLock;
10use serde::{Deserialize, Serialize};
11use std::collections::HashMap;
12use std::sync::Arc;
13use std::time::SystemTime;
14use tokio::sync::{broadcast, RwLock as AsyncRwLock};
15use tracing::{debug, info, warn};
16
17pub struct ConfigManager {
19 config: Arc<AsyncRwLock<ConfigValue>>,
21
22 sources: Arc<RwLock<Vec<(Box<dyn ConfigSource>, u32)>>>,
24
25 validator: Arc<RwLock<Option<SchemaValidator>>>,
27
28 change_broadcaster: broadcast::Sender<ConfigChange>,
30
31 history: Arc<DashMap<String, Vec<HistoryEntry>>>,
33
34 feature_flags: Arc<DashMap<String, bool>>,
36
37 node_configs: Arc<DashMap<String, ConfigValue>>,
39
40 watchers: Arc<DashMap<String, tokio::task::JoinHandle<()>>>,
42}
43
44#[derive(Debug, Clone, Serialize, Deserialize)]
46pub struct HistoryEntry {
47 pub timestamp: SystemTime,
48 pub value: ConfigValue,
49 pub changed_by: String,
50 pub change_type: String,
51}
52
53impl ConfigManager {
54 pub fn new() -> Self {
56 let (change_broadcaster, _) = broadcast::channel(1000);
57
58 Self {
59 config: Arc::new(AsyncRwLock::new(ConfigValue::Object(HashMap::new()))),
60 sources: Arc::new(RwLock::new(Vec::new())),
61 validator: Arc::new(RwLock::new(None)),
62 change_broadcaster,
63 history: Arc::new(DashMap::new()),
64 feature_flags: Arc::new(DashMap::new()),
65 node_configs: Arc::new(DashMap::new()),
66 watchers: Arc::new(DashMap::new()),
67 }
68 }
69
70 pub fn add_source(&mut self, source: impl ConfigSource + 'static, priority: u32) {
72 let mut sources = self.sources.write();
73 sources.push((Box::new(source), priority));
74 sources.sort_by(|a, b| a.1.cmp(&b.1)); info!("Added configuration source with priority {}", priority);
76 }
77
78 pub fn set_validator(&mut self, validator: SchemaValidator) {
80 *self.validator.write() = Some(validator);
81 info!("Schema validator configured");
82 }
83
84 pub async fn initialize(&self) -> Result<()> {
86 info!("Initializing configuration manager");
87
88 let sources_len = self.sources.read().len();
89 if sources_len == 0 {
90 warn!("No configuration sources configured");
91 return Ok(());
92 }
93
94 let merged_config = ConfigValue::Object(HashMap::new());
96
97 if let Some(validator) = self.validator.read().as_ref() {
105 validator.validate(&merged_config)?;
106 }
107
108 {
110 let mut config = self.config.write().await;
111 *config = merged_config.clone();
112 }
113
114 self.extract_feature_flags(&merged_config).await;
116
117 self.start_source_watchers().await?;
119
120 info!("Configuration manager initialized successfully");
121 Ok(())
122 }
123
124 pub async fn get<T>(&self, key: &str) -> Result<T>
126 where
127 T: for<'de> Deserialize<'de>,
128 {
129 let config = self.config.read().await;
130
131 let value = config
132 .get_path(key)
133 .ok_or_else(|| ConfigError::KeyNotFound {
134 key: key.to_string(),
135 })?;
136
137 let json_value = serde_json::to_value(value)?;
138 let typed_value = T::deserialize(json_value)?;
139
140 Ok(typed_value)
141 }
142
143 pub async fn get_value(&self, key: &str) -> Result<ConfigValue> {
145 let config = self.config.read().await;
146
147 config
148 .get_path(key)
149 .cloned()
150 .ok_or_else(|| ConfigError::KeyNotFound {
151 key: key.to_string(),
152 })
153 }
154
155 pub async fn get_value_for_node(&self, key: &str, node_id: &str) -> Result<ConfigValue> {
157 if let Some(node_config) = self.node_configs.get(node_id) {
159 if let Some(value) = node_config.get_path(key) {
160 return Ok(value.clone());
161 }
162 }
163
164 self.get_value(key).await
166 }
167
168 pub async fn set_value(&self, key: &str, value: ConfigValue) -> Result<()> {
170 self.set_value_internal(key, value, "manual".to_string())
171 .await
172 }
173
174 pub async fn set_value_for_cluster(&self, key: &str, value: ConfigValue) -> Result<()> {
176 self.set_value_internal(key, value, "cluster".to_string())
179 .await
180 }
181
182 async fn set_value_internal(
184 &self,
185 key: &str,
186 value: ConfigValue,
187 changed_by: String,
188 ) -> Result<()> {
189 let old_value = {
190 let config = self.config.read().await;
191 config.get_path(key).cloned()
192 };
193
194 {
196 let mut config = self.config.write().await;
197 config.set_path(key, value.clone())?;
198 }
199
200 if let Some(validator) = self.validator.read().as_ref() {
202 let config = self.config.read().await;
203 if let Err(e) = validator.validate(&config) {
204 if let Some(old_val) = old_value {
206 let mut config = self.config.write().await;
207 config.set_path(key, old_val)?;
208 }
209 return Err(e);
210 }
211 }
212
213 self.add_to_history(key, value.clone(), changed_by.clone())
215 .await;
216
217 if key.starts_with("feature_flags.") || key.contains(".feature_flags.") {
219 self.update_feature_flag_cache(key, &value).await;
220 }
221
222 let change = ConfigChange {
224 key: key.to_string(),
225 old_value,
226 new_value: value.clone(),
227 timestamp: SystemTime::now(),
228 changed_by,
229 };
230
231 if let Err(e) = self.change_broadcaster.send(change) {
232 warn!("Failed to broadcast configuration change: {}", e);
233 }
234
235 debug!("Configuration value set: {} = {:?}", key, value);
236 Ok(())
237 }
238
239 pub async fn is_feature_enabled(&self, flag_name: &str) -> Result<bool> {
241 if let Some(enabled) = self.feature_flags.get(flag_name) {
242 Ok(*enabled)
243 } else {
244 let full_key = format!("feature_flags.{flag_name}");
246 match self.get_value(&full_key).await {
247 Ok(value) => value.as_bool(),
248 Err(_) => Ok(false), }
250 }
251 }
252
253 pub async fn watch(&self, key_pattern: &str) -> Result<ConfigWatcher> {
255 let receiver = self.change_broadcaster.subscribe();
256 Ok(ConfigWatcher::new(key_pattern.to_string(), receiver))
257 }
258
259 pub async fn save_to_file(&self, path: &str) -> Result<()> {
261 let config = self.config.read().await;
262 let json_value = serde_json::to_value(&*config)?;
263 let yaml_content = serde_yaml::to_string(&json_value)?;
264
265 tokio::fs::write(path, yaml_content).await?;
266 info!("Configuration saved to: {}", path);
267 Ok(())
268 }
269
270 pub async fn get_history(&self, key: &str, limit: usize) -> Result<Vec<HistoryEntry>> {
272 if let Some(history) = self.history.get(key) {
273 let mut entries = history.clone();
274 entries.sort_by(|a, b| b.timestamp.cmp(&a.timestamp)); entries.truncate(limit);
276 Ok(entries)
277 } else {
278 Ok(Vec::new())
279 }
280 }
281
282 async fn extract_feature_flags(&self, config: &ConfigValue) {
284 if let Some(flags_obj) = config.get_path("feature_flags") {
285 if let Ok(flags_map) = flags_obj.as_object() {
286 self.feature_flags.clear();
287 for (flag_name, flag_value) in flags_map {
288 if let Ok(enabled) = flag_value.as_bool() {
289 self.feature_flags.insert(flag_name.clone(), enabled);
290 }
291 }
292 debug!("Extracted {} feature flags", flags_map.len());
293 }
294 }
295 }
296
297 async fn update_feature_flag_cache(&self, key: &str, value: &ConfigValue) {
299 if let Some(flag_name) = key.strip_prefix("feature_flags.") {
300 if let Ok(enabled) = value.as_bool() {
301 self.feature_flags.insert(flag_name.to_string(), enabled);
302 }
303 } else if key.contains(".feature_flags.") {
304 let parts: Vec<&str> = key.split('.').collect();
306 if let Some(flag_idx) = parts.iter().position(|&part| part == "feature_flags") {
307 if flag_idx + 1 < parts.len() {
308 let flag_name = parts[flag_idx + 1..].join(".");
309 if let Ok(enabled) = value.as_bool() {
310 self.feature_flags.insert(flag_name, enabled);
311 }
312 }
313 }
314 }
315 }
316
317 async fn add_to_history(&self, key: &str, value: ConfigValue, changed_by: String) {
319 let entry = HistoryEntry {
320 timestamp: SystemTime::now(),
321 value,
322 changed_by,
323 change_type: "update".to_string(),
324 };
325
326 self.history
327 .entry(key.to_string())
328 .or_default()
329 .push(entry);
330
331 const MAX_HISTORY_PER_KEY: usize = 100;
333 if let Some(mut history) = self.history.get_mut(key) {
334 let history_len = history.len();
335 if history_len > MAX_HISTORY_PER_KEY {
336 history.drain(0..history_len - MAX_HISTORY_PER_KEY);
337 }
338 }
339 }
340
341 async fn start_source_watchers(&self) -> Result<()> {
343 Ok(())
347 }
348}
349
350impl Default for ConfigManager {
351 fn default() -> Self {
352 Self::new()
353 }
354}
355
356impl Drop for ConfigManager {
357 fn drop(&mut self) {
358 for entry in self.watchers.iter() {
360 entry.value().abort();
361 }
362 }
363}
364
365#[cfg(test)]
366mod tests {
367 use super::*;
368 use crate::sources::FileSource;
369 use tempfile::NamedTempFile;
370
371 #[tokio::test]
372 #[ignore] async fn test_config_manager_basic() {
374 let temp_file = NamedTempFile::with_suffix(".json").unwrap();
375 let content = r#"{"key": "value", "number": 42}"#;
376 std::fs::write(temp_file.path(), content).unwrap();
377
378 let mut manager = ConfigManager::new();
379 let source = FileSource::new().add_file(temp_file.path(), None);
380 manager.add_source(source, 10);
381
382 manager.initialize().await.unwrap();
383
384 let value: String = manager.get("key").await.unwrap();
385 assert_eq!(value, "value");
386
387 let number: i64 = manager.get("number").await.unwrap();
388 assert_eq!(number, 42);
389 }
390
391 #[tokio::test]
392 async fn test_config_manager_set_value() {
393 let manager = ConfigManager::new();
394 manager.initialize().await.unwrap();
395
396 manager
397 .set_value("test.key", ConfigValue::String("test_value".to_string()))
398 .await
399 .unwrap();
400
401 let value = manager.get_value("test.key").await.unwrap();
402 assert_eq!(value.as_string().unwrap(), "test_value");
403 }
404
405 #[tokio::test]
406 #[ignore] async fn test_feature_flags() {
408 let temp_file = NamedTempFile::with_suffix(".json").unwrap();
409 let content = r#"{"feature_flags": {"new_ui": true, "beta_feature": false}}"#;
410 std::fs::write(temp_file.path(), content).unwrap();
411
412 let mut manager = ConfigManager::new();
413 let source = FileSource::new().add_file(temp_file.path(), None);
414 manager.add_source(source, 10);
415
416 manager.initialize().await.unwrap();
417
418 assert!(manager.is_feature_enabled("new_ui").await.unwrap());
419 assert!(
420 !manager.is_feature_enabled("beta_feature").await.unwrap()
421 );
422 assert!(
423 !manager.is_feature_enabled("nonexistent").await.unwrap()
424 );
425 }
426
427 #[tokio::test]
428 async fn test_configuration_history() {
429 let manager = ConfigManager::new();
430 manager.initialize().await.unwrap();
431
432 manager
433 .set_value("test.key", ConfigValue::String("value1".to_string()))
434 .await
435 .unwrap();
436 manager
437 .set_value("test.key", ConfigValue::String("value2".to_string()))
438 .await
439 .unwrap();
440
441 let history = manager.get_history("test.key", 10).await.unwrap();
442 assert_eq!(history.len(), 2);
443 assert_eq!(history[0].value.as_string().unwrap(), "value2"); assert_eq!(history[1].value.as_string().unwrap(), "value1");
445 }
446}