1use std::collections::HashMap;
34use std::sync::{Arc, RwLock};
35
/// Describes one transition of a configuration property.
#[derive(Debug, Clone)]
pub struct ConfigChangeEvent {
    /// Name of the property the event refers to.
    pub key: String,
    /// Value before the change; `None` when there was no previous value.
    pub old_value: Option<String>,
    /// Value after the change.
    pub new_value: String,
}

impl ConfigChangeEvent {
    /// Builds an event, converting every argument into an owned `String`.
    ///
    /// Pass `None::<String>` (or any other concrete `Into<String>` type) as
    /// `old_value` when there is no previous value.
    pub fn new(
        key: impl Into<String>,
        old_value: Option<impl Into<String>>,
        new_value: impl Into<String>,
    ) -> Self {
        let key = key.into();
        let old_value = old_value.map(|previous| previous.into());
        let new_value = new_value.into();
        Self {
            key,
            old_value,
            new_value,
        }
    }

    /// Returns `true` when the event carries no previous value.
    pub fn is_new(&self) -> bool {
        self.old_value.is_none()
    }

    /// Returns `true` when the new value is the empty string, which this type
    /// treats as the marker for a removed property.
    pub fn is_removed(&self) -> bool {
        self.new_value.is_empty()
    }
}
75
76pub(crate) type ChangeListener = Box<dyn Fn(&ConfigChangeEvent) + Send + Sync>;
78
79pub struct RefreshScope {
88 refreshables: Arc<RwLock<HashMap<String, Arc<RwLock<RefreshableValue>>>>>,
90
91 listeners: Arc<RwLock<Vec<ChangeListener>>>,
93}
94
95impl std::fmt::Debug for RefreshScope {
96 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
97 let count = self.refreshables.read().map_or(0, |m| m.len());
98 let listener_count = self.listeners.read().map_or(0, |v| v.len());
99 f.debug_struct("RefreshScope")
100 .field("refreshable_count", &count)
101 .field("listener_count", &listener_count)
102 .finish()
103 }
104}
105
106#[derive(Debug)]
108struct RefreshableValue {
109 value: String,
111}
112
113impl RefreshScope {
114 pub fn new() -> Self {
116 Self {
117 refreshables: Arc::new(RwLock::new(HashMap::new())),
118 listeners: Arc::new(RwLock::new(Vec::new())),
119 }
120 }
121
122 pub fn register(&self, key: impl Into<String>, initial_value: impl Into<String>) {
124 let mut map = self
125 .refreshables
126 .write()
127 .unwrap_or_else(std::sync::PoisonError::into_inner);
128 map.insert(
129 key.into(),
130 Arc::new(RwLock::new(RefreshableValue {
131 value: initial_value.into(),
132 })),
133 );
134 }
135
136 pub fn get(&self, key: &str) -> Option<String> {
138 let map = self
139 .refreshables
140 .read()
141 .unwrap_or_else(std::sync::PoisonError::into_inner);
142 map.get(key).map(|v| {
143 let guard = v.read().unwrap_or_else(std::sync::PoisonError::into_inner);
144 guard.value.clone()
145 })
146 }
147
148 pub fn add_listener(&self, listener: impl Fn(&ConfigChangeEvent) + Send + Sync + 'static) {
150 let mut listeners = self.listeners.write().unwrap_or_else(|e| e.into_inner());
151 listeners.push(Box::new(listener));
152 }
153
154 pub fn fire_event(&self, event: &ConfigChangeEvent) {
161 let map = self
164 .refreshables
165 .read()
166 .unwrap_or_else(std::sync::PoisonError::into_inner);
167 if let Some(refreshable) = map.get(&event.key) {
168 let mut guard = refreshable
169 .write()
170 .unwrap_or_else(std::sync::PoisonError::into_inner);
171 guard.value.clone_from(&event.new_value);
172 }
173
174 let listeners = self.listeners.read().unwrap_or_else(|e| e.into_inner());
177 for listener in listeners.iter() {
178 listener(event);
179 }
180 }
181
182 pub fn refresh_all(&self, getter: impl Fn(&str) -> Option<String>) {
188 let map = self
189 .refreshables
190 .read()
191 .unwrap_or_else(std::sync::PoisonError::into_inner);
192 for (key, refreshable) in map.iter() {
193 if let Some(new_value) = getter(key) {
194 let old_value = {
195 let guard = refreshable
196 .read()
197 .unwrap_or_else(std::sync::PoisonError::into_inner);
198 Some(guard.value.clone())
199 };
200
201 let event = ConfigChangeEvent::new(key.as_str(), old_value, &new_value);
202
203 {
204 let mut guard = refreshable
205 .write()
206 .unwrap_or_else(std::sync::PoisonError::into_inner);
207 guard.value = new_value;
208 }
209
210 let listeners = self.listeners.read().unwrap_or_else(|e| e.into_inner());
212 for listener in listeners.iter() {
213 listener(&event);
214 }
215 }
216 }
217 }
218
219 pub fn len(&self) -> usize {
221 let map = self
222 .refreshables
223 .read()
224 .unwrap_or_else(std::sync::PoisonError::into_inner);
225 map.len()
226 }
227
228 pub fn is_empty(&self) -> bool {
230 self.len() == 0
231 }
232}
233
234impl Default for RefreshScope {
235 fn default() -> Self {
236 Self::new()
237 }
238}
239
240#[derive(Debug)]
248pub struct ConfigWatcher {
249 watched_files: Vec<std::path::PathBuf>,
251
252 last_modified: HashMap<std::path::PathBuf, std::time::SystemTime>,
254
255 scope: RefreshScope,
257}
258
259impl ConfigWatcher {
260 pub fn new(scope: RefreshScope) -> Self {
262 Self {
263 watched_files: Vec::new(),
264 last_modified: HashMap::new(),
265 scope,
266 }
267 }
268
269 pub fn watch_file(&mut self, path: impl Into<std::path::PathBuf>) {
271 let path = path.into();
272 if let Ok(metadata) = std::fs::metadata(&path)
273 && let Ok(modified) = metadata.modified()
274 {
275 self.last_modified.insert(path.clone(), modified);
276 }
277 self.watched_files.push(path);
278 }
279
280 pub fn check_changes(&mut self) -> Vec<String> {
285 let mut changed = Vec::new();
286
287 for path in &self.watched_files {
288 if let Ok(metadata) = std::fs::metadata(path)
289 && let Ok(modified) = metadata.modified()
290 {
291 let prev = self.last_modified.get(path).copied();
292 if prev != Some(modified) {
293 let path_str = path.to_string_lossy().to_string();
294 let event = ConfigChangeEvent::new(&path_str, prev.map(|_| "old"), "updated");
295 self.scope.fire_event(&event);
296 self.last_modified.insert(path.clone(), modified);
297 changed.push(path_str);
298 }
299 }
300 }
301
302 changed
303 }
304
305 pub fn scope(&self) -> &RefreshScope {
307 &self.scope
308 }
309
310 pub fn watched_count(&self) -> usize {
312 self.watched_files.len()
313 }
314}
315
/// A keyed value stored behind a shared `RwLock`.
///
/// Cloning a `Refreshable` copies the key and shares the same underlying
/// value, so an [`update`](Refreshable::update) through one handle is visible
/// through every clone.
#[derive(Debug)]
pub struct Refreshable<T> {
    /// Name this value is associated with.
    key: String,
    /// Shared storage for the current value.
    value: Arc<RwLock<T>>,
}

// None of these operations copies `T`, so they are available for every `T`;
// only `value()` below needs `T: Clone`.
impl<T> Refreshable<T> {
    /// Creates a handle for `key` holding `value`.
    pub fn new(key: impl Into<String>, value: T) -> Self {
        Self {
            key: key.into(),
            value: Arc::new(RwLock::new(value)),
        }
    }

    /// Returns a read guard for the current value.
    ///
    /// A poisoned lock is recovered instead of panicking. `update` blocks
    /// while a guard returned from here is alive.
    pub fn get(&self) -> std::sync::RwLockReadGuard<'_, T> {
        self.value
            .read()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }

    /// Replaces the current value with `new_value` for this handle and all of
    /// its clones.
    pub fn update(&self, new_value: T) {
        let mut guard = self
            .value
            .write()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        *guard = new_value;
    }

    /// Returns the key this value is associated with.
    pub fn key(&self) -> &str {
        &self.key
    }
}

impl<T: Clone> Refreshable<T> {
    /// Returns an owned copy of the current value.
    pub fn value(&self) -> T {
        self.get().clone()
    }
}

// Written by hand rather than derived: a derive would demand `T: Clone`,
// although only the `Arc` (a reference count) is cloned here.
impl<T> Clone for Refreshable<T> {
    fn clone(&self) -> Self {
        Self {
            key: self.key.clone(),
            value: Arc::clone(&self.value),
        }
    }
}
385
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// An event with both an old and a non-empty new value is neither new nor removed.
    #[test]
    fn test_config_change_event() {
        let changed = ConfigChangeEvent::new("db.url", Some("old_host"), "new_host");

        assert_eq!(changed.key, "db.url");
        assert_eq!(changed.old_value, Some(String::from("old_host")));
        assert_eq!(changed.new_value, "new_host");
        assert!(!changed.is_new());
        assert!(!changed.is_removed());
    }

    /// Without an old value the event reports a new property.
    #[test]
    fn test_config_change_event_new_property() {
        let added = ConfigChangeEvent::new("new.key", None::<String>, "value");

        assert!(added.is_new());
        assert!(!added.is_removed());
    }

    /// An empty new value marks the property as removed.
    #[test]
    fn test_config_change_event_removed() {
        let removed = ConfigChangeEvent::new("old.key", Some("value"), "");

        assert!(removed.is_removed());
    }

    /// Registered keys can be read back; unknown keys yield `None`.
    #[test]
    fn test_refresh_scope_register_and_get() {
        let scope = RefreshScope::new();
        for &(key, value) in [("db.url", "localhost:5432"), ("server.port", "8080")].iter() {
            scope.register(key, value);
        }

        assert_eq!(scope.get("db.url"), Some(String::from("localhost:5432")));
        assert_eq!(scope.get("server.port"), Some(String::from("8080")));
        assert_eq!(scope.get("missing"), None);
        assert_eq!(scope.len(), 2);
    }

    /// Firing an event for a registered key stores the event's new value.
    #[test]
    fn test_refresh_scope_fire_event() {
        let scope = RefreshScope::new();
        scope.register("db.url", "localhost:5432");

        scope.fire_event(&ConfigChangeEvent::new(
            "db.url",
            Some("localhost:5432"),
            "db.example.com:5432",
        ));

        assert_eq!(
            scope.get("db.url"),
            Some(String::from("db.example.com:5432"))
        );
    }

    /// A registered listener is invoked exactly once per fired event.
    #[test]
    fn test_refresh_scope_listener() {
        let scope = RefreshScope::new();
        scope.register("db.url", "old");

        let invocations = Arc::new(AtomicUsize::new(0));
        let counter = Arc::clone(&invocations);
        scope.add_listener(move |_event| {
            counter.fetch_add(1, Ordering::SeqCst);
        });

        scope.fire_event(&ConfigChangeEvent::new("db.url", Some("old"), "new"));

        assert_eq!(invocations.load(Ordering::SeqCst), 1);
    }

    /// `refresh_all` replaces every value the getter provides.
    #[test]
    fn test_refresh_scope_refresh_all() {
        let scope = RefreshScope::new();
        scope.register("a", "1");
        scope.register("b", "2");

        let new_values: HashMap<String, String> = [("a", "100"), ("b", "200")]
            .iter()
            .map(|&(key, value)| (key.to_string(), value.to_string()))
            .collect();

        scope.refresh_all(|key| new_values.get(key).cloned());

        assert_eq!(scope.get("a"), Some(String::from("100")));
        assert_eq!(scope.get("b"), Some(String::from("200")));
    }

    /// The `Default` scope starts out empty.
    #[test]
    fn test_refresh_scope_default() {
        assert!(RefreshScope::default().is_empty());
    }

    /// A path that does not exist is counted as watched but never reported as changed.
    #[test]
    fn test_config_watcher() {
        let mut watcher = ConfigWatcher::new(RefreshScope::new());
        assert_eq!(watcher.watched_count(), 0);

        watcher.watch_file("/nonexistent/config.yaml");
        assert_eq!(watcher.watched_count(), 1);

        assert!(watcher.check_changes().is_empty());
    }

    /// `get`, `value` and `update` all observe the same stored value.
    #[test]
    fn test_refreshable() {
        let port = Refreshable::new("server.port", 8080);
        assert_eq!(port.key(), "server.port");
        assert_eq!(*port.get(), 8080);
        assert_eq!(port.value(), 8080);

        port.update(9090);
        assert_eq!(*port.get(), 9090);
    }

    /// Clones share storage: an update through one handle is visible through the other.
    #[test]
    fn test_refreshable_clone() {
        let original = Refreshable::new("key", "value");
        let alias = original.clone();

        original.update("new_value");

        assert_eq!(*alias.get(), "new_value");
    }
}