use std::path::PathBuf;
use std::sync::Arc;
use std::time::Instant;
use parking_lot::RwLock;
use tokio::sync::broadcast;
/// Identifies a configuration file together with the role it plays.
///
/// Every variant stores the path of the file. `Protocol` and `Custom` also
/// store a label that becomes part of the string returned by
/// [`ConfigSource::name`].
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ConfigSource {
    /// The main configuration file.
    Main(PathBuf),
    /// A device configuration file.
    Device(PathBuf),
    /// A scenario configuration file.
    Scenario(PathBuf),
    /// A configuration file associated with the protocol named `protocol`.
    Protocol { protocol: String, path: PathBuf },
    /// A caller-defined configuration file labelled `name`.
    Custom { name: String, path: PathBuf },
}

impl ConfigSource {
    /// Returns the file path stored in this source, whatever its variant.
    pub fn path(&self) -> &PathBuf {
        match self {
            Self::Main(location) | Self::Device(location) | Self::Scenario(location) => location,
            Self::Protocol { path: location, .. } | Self::Custom { path: location, .. } => {
                location
            }
        }
    }

    /// Returns a short label describing the kind of source.
    ///
    /// The label is `"main"`, `"device"` or `"scenario"` for the tuple
    /// variants, `"protocol:<protocol>"` for `Protocol` and
    /// `"custom:<name>"` for `Custom`. The path is never part of the label.
    pub fn name(&self) -> String {
        match self {
            Self::Protocol { protocol, .. } => format!("protocol:{protocol}"),
            Self::Custom { name, .. } => format!("custom:{name}"),
            Self::Main(_) => String::from("main"),
            Self::Device(_) => String::from("device"),
            Self::Scenario(_) => String::from("scenario"),
        }
    }
}
/// A notification about a configuration source.
///
/// Every variant carries an `Instant` timestamp supplied by whoever builds the
/// event; the `emit_*` helpers on `ConfigWatcher` fill it with `Instant::now()`.
#[derive(Debug, Clone)]
pub enum ConfigEvent {
/// NOTE(review): presumably signals that the configuration file appeared;
/// nothing in this file constructs this variant — confirm with the producer.
Created {
/// The source the event refers to.
source: ConfigSource,
/// Timestamp attached to the event.
timestamp: Instant,
},
/// The configuration source changed; built by `ConfigWatcher::emit_modified`.
Modified {
/// The source the event refers to.
source: ConfigSource,
/// Timestamp attached to the event.
timestamp: Instant,
},
/// NOTE(review): presumably signals that the configuration file was removed;
/// nothing in this file constructs this variant — confirm with the producer.
Deleted {
/// The source the event refers to.
source: ConfigSource,
/// Timestamp attached to the event.
timestamp: Instant,
},
/// NOTE(review): presumably signals that the file moved from `old_path` to
/// `new_path`; nothing in this file constructs this variant — confirm.
Renamed {
/// The source the event refers to.
source: ConfigSource,
/// Path before the rename.
old_path: PathBuf,
/// Path after the rename.
new_path: PathBuf,
/// Timestamp attached to the event.
timestamp: Instant,
},
/// An error report; built by `ConfigWatcher::emit_error`.
Error {
/// The source the error relates to, or `None` when it is not tied to one.
source: Option<ConfigSource>,
/// Human-readable description of the error.
message: String,
/// Timestamp attached to the event.
timestamp: Instant,
},
/// The configuration source was reloaded; built by
/// `ConfigWatcher::emit_reloaded`.
Reloaded {
/// The source the event refers to.
source: ConfigSource,
/// Timestamp attached to the event.
timestamp: Instant,
},
}
impl ConfigEvent {
    /// Returns the timestamp stored in the event, regardless of variant.
    pub fn timestamp(&self) -> Instant {
        match self {
            Self::Created { timestamp, .. }
            | Self::Modified { timestamp, .. }
            | Self::Deleted { timestamp, .. }
            | Self::Renamed { timestamp, .. }
            | Self::Error { timestamp, .. }
            | Self::Reloaded { timestamp, .. } => *timestamp,
        }
    }

    /// Returns the source the event refers to.
    ///
    /// Only `Error` events can lack a source; every other variant always
    /// yields `Some`.
    pub fn source(&self) -> Option<&ConfigSource> {
        match self {
            // `Error` is the one variant whose source is optional.
            Self::Error { source, .. } => source.as_ref(),
            Self::Created { source, .. }
            | Self::Modified { source, .. }
            | Self::Deleted { source, .. }
            | Self::Renamed { source, .. }
            | Self::Reloaded { source, .. } => Some(source),
        }
    }

    /// Returns `true` exactly when the event is the `Error` variant.
    pub fn is_error(&self) -> bool {
        matches!(*self, Self::Error { .. })
    }
}
/// Receiver interface for configuration events.
///
/// Implementors must be `Send + Sync`.
pub trait ConfigEventHandler: Send + Sync {
/// Handles one event. This is the only method without a default body.
fn on_config_change(&self, event: ConfigEvent);
/// Hook whose default implementation returns `true`.
///
/// NOTE(review): the name suggests it is consulted before a reload and that
/// returning `false` vetoes it, but no caller is visible in this file — confirm.
fn before_reload(&self, _source: &ConfigSource) -> bool {
true
}
/// Hook whose default implementation does nothing.
///
/// NOTE(review): presumably invoked once a reload has finished; no caller is
/// visible in this file — confirm.
fn after_reload(&self, _source: &ConfigSource) {}
}
/// Run state of a `ConfigWatcher`.
///
/// `ConfigWatcher::emit` forwards events only while the state is `Running`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WatcherState {
/// Initial state set by `ConfigWatcher::new`; also set by `stop`.
Stopped,
/// Set by `start` and `resume`.
Running,
/// Set by `pause`.
Paused,
}
/// Tracks registered configuration sources and broadcasts `ConfigEvent`s about
/// them to subscribers.
///
/// All mutable state sits behind `RwLock`s, so every method takes `&self`.
pub struct ConfigWatcher {
// Current run state; `emit` drops events unless this is `Running`.
state: RwLock<WatcherState>,
// Registered sources; `register` refuses a second source with the same path.
sources: RwLock<Vec<ConfigSource>>,
// Sending half of the broadcast channel; `subscribe` hands out receivers.
event_tx: broadcast::Sender<ConfigEvent>,
// Debounce window in milliseconds, compared against elapsed time in `emit`.
debounce_ms: u64,
// Time of the last forwarded event, keyed by a per-source string that `emit`
// computes.
last_events: RwLock<std::collections::HashMap<String, Instant>>,
}
impl ConfigWatcher {
    /// Creates a stopped watcher with no registered sources, a broadcast
    /// channel of capacity 256 and a 100 ms debounce window.
    pub fn new() -> Self {
        let (event_tx, _) = broadcast::channel(256);
        Self {
            state: RwLock::new(WatcherState::Stopped),
            sources: RwLock::new(Vec::new()),
            event_tx,
            debounce_ms: 100,
            last_events: RwLock::new(std::collections::HashMap::new()),
        }
    }

    /// Like [`ConfigWatcher::new`], but with a debounce window of
    /// `debounce_ms` milliseconds. A window of `0` disables debouncing.
    pub fn with_debounce(debounce_ms: u64) -> Self {
        let mut watcher = Self::new();
        watcher.debounce_ms = debounce_ms;
        watcher
    }

    /// Returns the current run state.
    pub fn state(&self) -> WatcherState {
        *self.state.read()
    }

    /// Sets the state to `Running`, whatever it was before.
    pub fn start(&self) {
        *self.state.write() = WatcherState::Running;
    }

    /// Sets the state to `Stopped`, whatever it was before.
    pub fn stop(&self) {
        *self.state.write() = WatcherState::Stopped;
    }

    /// Sets the state to `Paused`, whatever it was before.
    pub fn pause(&self) {
        *self.state.write() = WatcherState::Paused;
    }

    /// Sets the state to `Running`, whatever it was before (including
    /// `Stopped`).
    pub fn resume(&self) {
        *self.state.write() = WatcherState::Running;
    }

    /// Registers `source` unless a source with the same path is already
    /// registered, in which case the call is a no-op.
    pub fn register(&self, source: ConfigSource) {
        let mut sources = self.sources.write();
        if !sources.iter().any(|s| s.path() == source.path()) {
            sources.push(source);
        }
    }

    /// Removes every registered source whose path equals `path`.
    pub fn unregister(&self, path: &PathBuf) {
        self.sources.write().retain(|s| s.path() != path);
    }

    /// Returns a snapshot (clone) of the registered sources.
    pub fn sources(&self) -> Vec<ConfigSource> {
        self.sources.read().clone()
    }

    /// Returns a new receiver for events forwarded by [`ConfigWatcher::emit`].
    pub fn subscribe(&self) -> broadcast::Receiver<ConfigEvent> {
        self.event_tx.subscribe()
    }

    /// Broadcasts `event` to all subscribers, subject to state and debouncing.
    ///
    /// The event is silently dropped when:
    /// * the watcher is not `Running`, or
    /// * the event has a source and another event for that same source
    ///   (same label and same path, any event kind) was forwarded less than
    ///   `debounce_ms` milliseconds ago.
    ///
    /// Events without a source (an `Error` with `source: None`) are never
    /// debounced.
    pub fn emit(&self, event: ConfigEvent) {
        if *self.state.read() != WatcherState::Running {
            return;
        }
        if let Some(source) = event.source() {
            let key = Self::debounce_key(source);
            let window = std::time::Duration::from_millis(self.debounce_ms);
            let mut last_events = self.last_events.write();
            // Sample the clock while holding the lock so the stored instants
            // for a key are never later than the `now` they are compared to.
            let now = Instant::now();
            if let Some(last_time) = last_events.get(&key) {
                // Saturating: never panics even if `last_time` were later.
                if now.saturating_duration_since(*last_time) < window {
                    return;
                }
            }
            last_events.insert(key, now);
        }
        // `send` only fails when there are no receivers; that is not an error
        // for a fire-and-forget notification.
        let _ = self.event_tx.send(event);
    }

    /// Emits a `Modified` event for `source`, timestamped now.
    pub fn emit_modified(&self, source: ConfigSource) {
        self.emit(ConfigEvent::Modified {
            source,
            timestamp: Instant::now(),
        });
    }

    /// Emits a `Reloaded` event for `source`, timestamped now.
    pub fn emit_reloaded(&self, source: ConfigSource) {
        self.emit(ConfigEvent::Reloaded {
            source,
            timestamp: Instant::now(),
        });
    }

    /// Emits an `Error` event with the given optional source and message,
    /// timestamped now.
    pub fn emit_error(&self, source: Option<ConfigSource>, message: impl Into<String>) {
        self.emit(ConfigEvent::Error {
            source,
            message: message.into(),
            timestamp: Instant::now(),
        });
    }

    /// Builds the debounce-map key for `source`.
    ///
    /// The label alone is not unique: every `Device` source is named
    /// `"device"`, so two different device files would otherwise suppress
    /// each other's events. The path is appended after a NUL separator, a
    /// character that file paths cannot contain.
    fn debounce_key(source: &ConfigSource) -> String {
        format!("{}\u{0}{}", source.name(), source.path().display())
    }
}
impl Default for ConfigWatcher {
fn default() -> Self {
Self::new()
}
}
/// A `ConfigWatcher` behind an `Arc`; this is what `create_config_watcher`
/// returns.
pub type SharedConfigWatcher = Arc<ConfigWatcher>;
/// Builds a watcher with [`ConfigWatcher::new`] and wraps it in an `Arc`.
pub fn create_config_watcher() -> SharedConfigWatcher {
    let watcher = ConfigWatcher::new();
    Arc::new(watcher)
}
/// Adapter that stores a closure taking a [`ConfigEvent`].
///
/// The closure must be `Send + Sync`.
pub struct CallbackHandler<F: Fn(ConfigEvent) + Send + Sync> {
    callback: F,
}

impl<F: Fn(ConfigEvent) + Send + Sync> CallbackHandler<F> {
    /// Wraps `callback` in a handler.
    pub fn new(callback: F) -> Self {
        CallbackHandler { callback }
    }
}
impl<F: Fn(ConfigEvent) + Send + Sync> ConfigEventHandler for CallbackHandler<F> {
    /// Forwards `event` to the wrapped closure.
    fn on_config_change(&self, event: ConfigEvent) {
        let callback = &self.callback;
        callback(event);
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// `name()` and `path()` report the label and path of a source.
    #[test]
    fn test_config_source() {
        let main_path = PathBuf::from("/etc/config.yaml");
        let main = ConfigSource::Main(main_path.clone());
        assert_eq!(main.name(), "main");
        assert_eq!(main.path(), &main_path);

        let modbus = ConfigSource::Protocol {
            protocol: String::from("modbus"),
            path: PathBuf::from("/etc/modbus.yaml"),
        };
        assert_eq!(modbus.name(), "protocol:modbus");
    }

    /// Each lifecycle method moves the watcher into the expected state.
    #[test]
    fn test_config_watcher_lifecycle() {
        let watcher = ConfigWatcher::new();
        assert_eq!(watcher.state(), WatcherState::Stopped);

        watcher.start();
        assert_eq!(watcher.state(), WatcherState::Running);

        watcher.pause();
        assert_eq!(watcher.state(), WatcherState::Paused);

        watcher.resume();
        assert_eq!(watcher.state(), WatcherState::Running);

        watcher.stop();
        assert_eq!(watcher.state(), WatcherState::Stopped);
    }

    /// Sources can be registered and then removed again by path.
    #[test]
    fn test_config_watcher_sources() {
        let watcher = ConfigWatcher::new();
        let first_path = PathBuf::from("/etc/config1.yaml");
        let second_path = PathBuf::from("/etc/config2.yaml");

        watcher.register(ConfigSource::Main(first_path.clone()));
        watcher.register(ConfigSource::Device(second_path));
        assert_eq!(watcher.sources().len(), 2);

        watcher.unregister(&first_path);
        assert_eq!(watcher.sources().len(), 1);
    }

    /// A running watcher delivers an emitted event to a subscriber.
    #[tokio::test]
    async fn test_config_watcher_events() {
        let watcher = ConfigWatcher::new();
        watcher.start();
        let mut receiver = watcher.subscribe();

        watcher.emit_modified(ConfigSource::Main(PathBuf::from("/etc/config.yaml")));

        let deadline = std::time::Duration::from_millis(100);
        let received = tokio::time::timeout(deadline, receiver.recv())
            .await
            .expect("Timeout")
            .expect("Channel closed");
        assert!(matches!(received, ConfigEvent::Modified { .. }));
    }

    /// `is_error()` and `source()` behave as expected on both kinds of event.
    #[test]
    fn test_config_event_methods() {
        let modified = ConfigEvent::Modified {
            source: ConfigSource::Main(PathBuf::from("/etc/config.yaml")),
            timestamp: Instant::now(),
        };
        assert!(!modified.is_error());
        assert_eq!(
            modified.source().map(ConfigSource::name),
            Some(String::from("main"))
        );

        let failure = ConfigEvent::Error {
            source: None,
            message: String::from("Test error"),
            timestamp: Instant::now(),
        };
        assert!(failure.is_error());
    }
}