use notify::{Config, RecommendedWatcher, RecursiveMode, Watcher};
use std::path::{Path, PathBuf};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::sync::mpsc;
/// A change notification attributed to a single scenario.
#[derive(Debug, Clone)]
pub struct WatchEvent {
    /// Name of the scenario the event refers to.
    pub scenario: String,
    /// Paths attached to the event; empty when built through [`WatchEvent::new`].
    pub paths: Vec<PathBuf>,
    /// Construction time in whole seconds since the Unix epoch.
    pub timestamp: u64,
}

impl WatchEvent {
    /// Builds an event for `scenario` with no paths, stamped with the current time.
    pub fn new(scenario: String) -> Self {
        Self::with_paths(scenario, Vec::new())
    }

    /// Builds an event for `scenario` carrying `paths`, stamped with the current time.
    ///
    /// If the system clock reads earlier than the Unix epoch the timestamp is `0`.
    pub fn with_paths(scenario: String, paths: Vec<PathBuf>) -> Self {
        let timestamp = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map_or(0, |elapsed| elapsed.as_secs());
        Self {
            scenario,
            paths,
            timestamp,
        }
    }
}
/// An asynchronous producer of events that is polled one item at a time.
///
/// Implementors must be `Send`, as must their events and the future returned
/// by [`EventSource::next`].
pub trait EventSource: Send {
/// Type of the items this source produces.
type Event: Send;
/// Returns a future resolving to the next event, or `None`.
///
/// NOTE(review): `None` presumably means the source is finished and will not
/// yield again — the trait itself does not state this; confirm with callers.
fn next(&mut self) -> impl std::future::Future<Output = Option<Self::Event>> + Send;
}
/// [`EventSource`] that yields [`WatchEvent`]s produced by a `notify` filesystem watcher.
pub struct LocalFileWatcherSource {
/// Receiving half of the channel that the watcher callback sends events into.
rx: mpsc::Receiver<WatchEvent>,
/// Never read; stored so the watcher lives exactly as long as this source.
/// NOTE(review): presumably dropping the watcher stops notifications — confirm
/// against the `notify` documentation.
_watcher: RecommendedWatcher,
}
impl LocalFileWatcherSource {
    /// Starts watching `watch_dir` recursively and returns a source fed by the watcher.
    ///
    /// Only create and modify notifications are forwarded, and only when one of the
    /// affected paths yields a scenario name via `extract_scenario_from_paths`.
    /// Everything else — including errors reported to the callback — is dropped and
    /// never reaches the consumer.
    ///
    /// `poll_interval` is handed to the watcher through `Config::with_poll_interval`.
    /// NOTE(review): native (non-polling) backends may ignore this value — confirm
    /// against the `notify` documentation.
    ///
    /// # Errors
    ///
    /// Returns the `notify::Error` raised when the watcher cannot be created or when
    /// `watch_dir` cannot be registered for watching.
    pub fn new(watch_dir: &Path, poll_interval: Duration) -> Result<Self, notify::Error> {
        // Bounded queue: when 64 events are pending, `blocking_send` below blocks the
        // thread running the callback until the consumer makes room.
        let (tx, rx) = mpsc::channel(64);

        // The sender is moved straight into the callback; no second handle is kept,
        // so the channel closes as soon as the watcher (and its callback) is dropped.
        let mut watcher = RecommendedWatcher::new(
            move |res: Result<notify::Event, notify::Error>| {
                // Errors delivered by the watcher are discarded here.
                let Ok(event) = res else {
                    return;
                };
                if !matches!(
                    event.kind,
                    notify::EventKind::Create(_) | notify::EventKind::Modify(_)
                ) {
                    return;
                }
                let Some(scenario) = extract_scenario_from_paths(&event.paths) else {
                    return;
                };
                // Best effort: a failed send is ignored rather than propagated.
                let _ = tx.blocking_send(WatchEvent::with_paths(scenario, event.paths));
            },
            Config::default().with_poll_interval(poll_interval),
        )?;
        watcher.watch(watch_dir, RecursiveMode::Recursive)?;

        Ok(Self {
            rx,
            _watcher: watcher,
        })
    }
}
/// Exposes the watcher's channel as an [`EventSource`] of [`WatchEvent`]s.
impl EventSource for LocalFileWatcherSource {
type Event = WatchEvent;
/// Waits for the next [`WatchEvent`] queued by the watcher callback.
async fn next(&mut self) -> Option<Self::Event> {
// Forwards whatever the channel receiver yields, including its `None`.
self.rx.recv().await
}
}
/// Finds the scenario name in the first path shaped like `…/scenarios/<name>/sessions/…`.
///
/// Paths are examined in order and each path's components left to right. A match
/// needs three consecutive normal components: `scenarios`, the name, and `sessions`.
/// The name is returned after a lossy UTF-8 conversion.
///
/// Returns `None` when `paths` is empty or no path contains such a sequence.
fn extract_scenario_from_paths(paths: &[PathBuf]) -> Option<String> {
    use std::path::Component;

    paths.iter().find_map(|candidate| {
        let parts: Vec<Component<'_>> = candidate.components().collect();
        // Slide a three-component window over the path: marker, name, trailer.
        parts.windows(3).find_map(|triple| match triple {
            [Component::Normal(marker), Component::Normal(name), Component::Normal(trailer)]
                if marker.to_string_lossy() == "scenarios"
                    && trailer.to_string_lossy() == "sessions" =>
            {
                Some(name.to_string_lossy().into_owned())
            }
            _ => None,
        })
    })
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Scenario extraction over matching, empty, and non-matching path lists.
    #[test]
    fn test_extract_scenario_from_paths() {
        // Wraps one raw path into the slice form the function expects.
        let scenario_of = |raw: &str| extract_scenario_from_paths(&[PathBuf::from(raw)]);

        assert_eq!(
            scenario_of(
                "/home/user/.swarm-engine/learning/scenarios/troubleshooting/sessions/123/stats.json"
            ),
            Some(String::from("troubleshooting"))
        );

        // Scenario names may contain spaces.
        assert_eq!(
            scenario_of(
                "/home/user/.swarm-engine/learning/scenarios/Service Troubleshooting/sessions/456/stats.json"
            ),
            Some(String::from("Service Troubleshooting"))
        );

        // No paths at all.
        let no_paths: Vec<PathBuf> = Vec::new();
        assert_eq!(extract_scenario_from_paths(&no_paths), None);

        // Path without a `scenarios` component.
        assert_eq!(scenario_of("/home/user/random/file.txt"), None);

        // `scenarios/<name>` present but not followed by `sessions`.
        assert_eq!(
            scenario_of("/home/user/.swarm-engine/learning/scenarios/troubleshooting/stats.json"),
            None
        );
    }

    /// `WatchEvent::new` keeps the scenario, starts with no paths, and sets a timestamp.
    #[test]
    fn test_watch_event_new() {
        let WatchEvent {
            scenario,
            paths,
            timestamp,
        } = WatchEvent::new("test_scenario".into());

        assert_eq!(scenario, "test_scenario");
        assert!(paths.is_empty());
        assert!(timestamp > 0);
    }
}