use std::collections::HashMap;
use std::sync::{Arc, RwLock};
/// Describes one configuration property change: the key involved, the value
/// it held before (if any), and the value it holds now.
#[derive(Debug, Clone)]
pub struct ConfigChangeEvent {
    /// Name of the property the event refers to.
    pub key: String,
    /// Value before the change; `None` when no previous value was supplied.
    pub old_value: Option<String>,
    /// Value after the change. An empty string is what
    /// [`ConfigChangeEvent::is_removed`] tests for.
    pub new_value: String,
}

impl ConfigChangeEvent {
    /// Builds an event, converting each argument into an owned `String`.
    ///
    /// Pass `None::<String>` (or any other concrete `Option` type) for
    /// `old_value` when there is no previous value.
    pub fn new(
        key: impl Into<String>,
        old_value: Option<impl Into<String>>,
        new_value: impl Into<String>,
    ) -> Self {
        let key = key.into();
        let old_value = old_value.map(|previous| previous.into());
        let new_value = new_value.into();
        Self {
            key,
            old_value,
            new_value,
        }
    }

    /// Returns `true` when the event carries no previous value.
    pub fn is_new(&self) -> bool {
        self.old_value.is_none()
    }

    /// Returns `true` when the new value is the empty string.
    pub fn is_removed(&self) -> bool {
        self.new_value.is_empty()
    }
}
/// Boxed callback that receives a reference to each [`ConfigChangeEvent`].
/// The `Send + Sync` bounds allow a listener to be shared between threads.
pub(crate) type ChangeListener = Box<dyn Fn(&ConfigChangeEvent) + Send + Sync>;
/// Holds string values keyed by name together with the listeners that are
/// called when an event is fired or the values are refreshed.
pub struct RefreshScope {
/// Registered entries by key; each value sits behind its own lock inside the locked map.
refreshables: Arc<RwLock<HashMap<String, Arc<RwLock<RefreshableValue>>>>>,
/// Callbacks added through `add_listener`.
listeners: Arc<RwLock<Vec<ChangeListener>>>,
}
impl std::fmt::Debug for RefreshScope {
    /// Prints only how many entries and listeners are held. A lock that
    /// cannot be read because it is poisoned is reported as a count of zero.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let refreshable_count = match self.refreshables.read() {
            Ok(entries) => entries.len(),
            Err(_) => 0,
        };
        let listener_count = match self.listeners.read() {
            Ok(callbacks) => callbacks.len(),
            Err(_) => 0,
        };

        let mut out = f.debug_struct("RefreshScope");
        out.field("refreshable_count", &refreshable_count);
        out.field("listener_count", &listener_count);
        out.finish()
    }
}
/// Current string value of a single entry stored in a `RefreshScope`.
#[derive(Debug)]
struct RefreshableValue {
/// The stored value.
value: String,
}
impl RefreshScope {
pub fn new() -> Self {
Self {
refreshables: Arc::new(RwLock::new(HashMap::new())),
listeners: Arc::new(RwLock::new(Vec::new())),
}
}
pub fn register(&self, key: impl Into<String>, initial_value: impl Into<String>) {
let mut map = self
.refreshables
.write()
.unwrap_or_else(std::sync::PoisonError::into_inner);
map.insert(
key.into(),
Arc::new(RwLock::new(RefreshableValue {
value: initial_value.into(),
})),
);
}
pub fn get(&self, key: &str) -> Option<String> {
let map = self
.refreshables
.read()
.unwrap_or_else(std::sync::PoisonError::into_inner);
map.get(key).map(|v| {
let guard = v.read().unwrap_or_else(std::sync::PoisonError::into_inner);
guard.value.clone()
})
}
pub fn add_listener(&self, listener: impl Fn(&ConfigChangeEvent) + Send + Sync + 'static) {
let mut listeners = self.listeners.write().unwrap_or_else(|e| e.into_inner());
listeners.push(Box::new(listener));
}
pub fn fire_event(&self, event: &ConfigChangeEvent) {
let map = self
.refreshables
.read()
.unwrap_or_else(std::sync::PoisonError::into_inner);
if let Some(refreshable) = map.get(&event.key) {
let mut guard = refreshable
.write()
.unwrap_or_else(std::sync::PoisonError::into_inner);
guard.value.clone_from(&event.new_value);
}
let listeners = self.listeners.read().unwrap_or_else(|e| e.into_inner());
for listener in listeners.iter() {
listener(event);
}
}
pub fn refresh_all(&self, getter: impl Fn(&str) -> Option<String>) {
let map = self
.refreshables
.read()
.unwrap_or_else(std::sync::PoisonError::into_inner);
for (key, refreshable) in map.iter() {
if let Some(new_value) = getter(key) {
let old_value = {
let guard = refreshable
.read()
.unwrap_or_else(std::sync::PoisonError::into_inner);
Some(guard.value.clone())
};
let event = ConfigChangeEvent::new(key.as_str(), old_value, &new_value);
{
let mut guard = refreshable
.write()
.unwrap_or_else(std::sync::PoisonError::into_inner);
guard.value = new_value;
}
let listeners = self.listeners.read().unwrap_or_else(|e| e.into_inner());
for listener in listeners.iter() {
listener(&event);
}
}
}
}
pub fn len(&self) -> usize {
let map = self
.refreshables
.read()
.unwrap_or_else(std::sync::PoisonError::into_inner);
map.len()
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
}
impl Default for RefreshScope {
fn default() -> Self {
Self::new()
}
}
/// Remembers the modification time of a set of files and fires an event on
/// its [`RefreshScope`] whenever a file's modification time differs from the
/// one last recorded.
#[derive(Debug)]
pub struct ConfigWatcher {
    /// Every path passed to `watch_file`, in call order (duplicates are kept).
    watched_files: Vec<std::path::PathBuf>,
    /// Last modification time recorded per path; a path whose metadata could
    /// not be read has no entry.
    last_modified: HashMap<std::path::PathBuf, std::time::SystemTime>,
    /// Scope that receives the events produced by `check_changes`.
    scope: RefreshScope,
}

impl ConfigWatcher {
    /// Creates a watcher that tracks no files yet and reports to `scope`.
    pub fn new(scope: RefreshScope) -> Self {
        let watched_files = Vec::new();
        let last_modified = HashMap::new();
        Self {
            watched_files,
            last_modified,
            scope,
        }
    }

    /// Starts tracking `path`.
    ///
    /// The current modification time is recorded when it can be read; the
    /// path is added to the watch list either way.
    pub fn watch_file(&mut self, path: impl Into<std::path::PathBuf>) {
        let path = path.into();
        let observed = std::fs::metadata(&path).and_then(|meta| meta.modified());
        if let Ok(modified) = observed {
            self.last_modified.insert(path.clone(), modified);
        }
        self.watched_files.push(path);
    }

    /// Re-reads the modification time of every watched file and returns the
    /// paths (lossily converted to `String`) whose time differs from the
    /// recorded one.
    ///
    /// For each such path an event keyed by the path string is fired on the
    /// scope with new value `"updated"`; its old value is `"old"` when a
    /// time had been recorded before and absent otherwise. Files whose
    /// metadata cannot be read are skipped.
    pub fn check_changes(&mut self) -> Vec<String> {
        let mut changed = Vec::new();
        for path in &self.watched_files {
            let modified = match std::fs::metadata(path).and_then(|meta| meta.modified()) {
                Ok(time) => time,
                Err(_) => continue,
            };
            let prev = self.last_modified.get(path).copied();
            if prev == Some(modified) {
                continue;
            }
            let path_str = path.to_string_lossy().into_owned();
            let event = ConfigChangeEvent::new(path_str.as_str(), prev.map(|_| "old"), "updated");
            self.scope.fire_event(&event);
            self.last_modified.insert(path.clone(), modified);
            changed.push(path_str);
        }
        changed
    }

    /// Borrows the scope this watcher reports to.
    pub fn scope(&self) -> &RefreshScope {
        &self.scope
    }

    /// Number of entries in the watch list.
    pub fn watched_count(&self) -> usize {
        self.watched_files.len()
    }
}
/// A keyed value stored behind a shared lock.
///
/// Cloning a `Refreshable` produces another handle to the same storage: an
/// `update` through one handle is visible through every clone.
#[derive(Debug)]
pub struct Refreshable<T> {
    /// Name associated with the value.
    key: String,
    /// Shared, lock-protected storage.
    value: Arc<RwLock<T>>,
}

// Nothing here copies the payload, so no `Clone` bound is required.
impl<T> Refreshable<T> {
    /// Creates a handle named `key` that initially holds `value`.
    pub fn new(key: impl Into<String>, value: T) -> Self {
        Self {
            key: key.into(),
            value: Arc::new(RwLock::new(value)),
        }
    }

    /// Returns a read guard over the current value.
    ///
    /// A poisoned lock is recovered rather than propagated. Drop the guard
    /// before calling [`Refreshable::update`], which needs the write lock.
    pub fn get(&self) -> std::sync::RwLockReadGuard<'_, T> {
        self.value
            .read()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }

    /// Replaces the stored value with `new_value`.
    pub fn update(&self, new_value: T) {
        let mut guard = self
            .value
            .write()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        *guard = new_value;
    }

    /// Returns the key this handle was created with.
    pub fn key(&self) -> &str {
        &self.key
    }
}

impl<T: Clone> Refreshable<T> {
    /// Returns a copy of the current value.
    pub fn value(&self) -> T {
        self.get().clone()
    }
}

// Written by hand so that `T` does not have to be `Clone`: only the key and
// the `Arc` pointer are cloned, never the payload.
impl<T> Clone for Refreshable<T> {
    fn clone(&self) -> Self {
        Self {
            key: self.key.clone(),
            value: Arc::clone(&self.value),
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};

    // An event with a previous value and a non-empty new value is neither
    // new nor removed.
    #[test]
    fn test_config_change_event() {
        let change = ConfigChangeEvent::new("db.url", Some("old_host"), "new_host");
        assert_eq!(change.key, "db.url");
        assert_eq!(change.old_value.as_deref(), Some("old_host"));
        assert_eq!(change.new_value, "new_host");
        assert!(!change.is_new());
        assert!(!change.is_removed());
    }

    // No previous value means the property is new.
    #[test]
    fn test_config_change_event_new_property() {
        let created = ConfigChangeEvent::new("new.key", None::<String>, "value");
        assert!(created.is_new());
        assert!(!created.is_removed());
    }

    // An empty new value marks the property as removed.
    #[test]
    fn test_config_change_event_removed() {
        let removed = ConfigChangeEvent::new("old.key", Some("value"), "");
        assert!(removed.is_removed());
    }

    // Registered values can be read back; unknown keys yield `None`.
    #[test]
    fn test_refresh_scope_register_and_get() {
        let scope = RefreshScope::new();
        scope.register("db.url", "localhost:5432");
        scope.register("server.port", "8080");

        assert_eq!(scope.get("db.url").as_deref(), Some("localhost:5432"));
        assert_eq!(scope.get("server.port").as_deref(), Some("8080"));
        assert_eq!(scope.get("missing"), None);
        assert_eq!(scope.len(), 2);
    }

    // Firing an event for a registered key stores the event's new value.
    #[test]
    fn test_refresh_scope_fire_event() {
        let scope = RefreshScope::new();
        scope.register("db.url", "localhost:5432");

        let change =
            ConfigChangeEvent::new("db.url", Some("localhost:5432"), "db.example.com:5432");
        scope.fire_event(&change);

        assert_eq!(scope.get("db.url").as_deref(), Some("db.example.com:5432"));
    }

    // A listener is invoked once per fired event.
    #[test]
    fn test_refresh_scope_listener() {
        let scope = RefreshScope::new();
        scope.register("db.url", "old");

        let invocations = Arc::new(AtomicUsize::new(0));
        let counter = Arc::clone(&invocations);
        scope.add_listener(move |_event| {
            counter.fetch_add(1, Ordering::SeqCst);
        });

        scope.fire_event(&ConfigChangeEvent::new("db.url", Some("old"), "new"));
        assert_eq!(invocations.load(Ordering::SeqCst), 1);
    }

    // `refresh_all` stores whatever the getter returns for each key.
    #[test]
    fn test_refresh_scope_refresh_all() {
        let scope = RefreshScope::new();
        scope.register("a", "1");
        scope.register("b", "2");

        let replacements: HashMap<String, String> = HashMap::from([
            ("a".to_string(), "100".to_string()),
            ("b".to_string(), "200".to_string()),
        ]);
        scope.refresh_all(|key| replacements.get(key).cloned());

        assert_eq!(scope.get("a").as_deref(), Some("100"));
        assert_eq!(scope.get("b").as_deref(), Some("200"));
    }

    // The default scope has no entries.
    #[test]
    fn test_refresh_scope_default() {
        assert!(RefreshScope::default().is_empty());
    }

    // A file that does not exist is counted as watched but never reported
    // as changed.
    #[test]
    fn test_config_watcher() {
        let mut watcher = ConfigWatcher::new(RefreshScope::new());
        assert_eq!(watcher.watched_count(), 0);

        watcher.watch_file("/nonexistent/config.yaml");
        assert_eq!(watcher.watched_count(), 1);

        assert!(watcher.check_changes().is_empty());
    }

    // `get`, `value` and `update` agree on the stored value.
    #[test]
    fn test_refreshable() {
        let port = Refreshable::new("server.port", 8080);
        assert_eq!(port.key(), "server.port");
        assert_eq!(*port.get(), 8080);
        assert_eq!(port.value(), 8080);

        port.update(9090);
        assert_eq!(*port.get(), 9090);
    }

    // Clones share storage: an update through one is seen through the other.
    #[test]
    fn test_refreshable_clone() {
        let original = Refreshable::new("key", "value");
        let alias = original.clone();

        original.update("new_value");
        assert_eq!(*alias.get(), "new_value");
    }
}