swarm_engine_core/pipeline/
source.rs1use notify::{Config, RecommendedWatcher, RecursiveMode, Watcher};
4use std::path::{Path, PathBuf};
5use std::time::{Duration, SystemTime, UNIX_EPOCH};
6use tokio::sync::mpsc;
7
/// A file-system change attributed to a single scenario.
///
/// Within this file, events are produced by the watcher callback of
/// `LocalFileWatcherSource`, which fills `scenario` with the directory name
/// found between a `scenarios` and a `sessions` path component.
#[derive(Debug, Clone)]
pub struct WatchEvent {
    /// Name of the scenario the change belongs to.
    pub scenario: String,
    /// Paths associated with the change; empty when built via [`WatchEvent::new`].
    pub paths: Vec<PathBuf>,
    /// Whole seconds since the Unix epoch, sampled when the event was constructed.
    pub timestamp: u64,
}

impl WatchEvent {
    /// Builds an event for `scenario` that carries no paths.
    ///
    /// The timestamp is taken at the moment of the call.
    pub fn new(scenario: String) -> Self {
        Self::with_paths(scenario, Vec::new())
    }

    /// Builds an event for `scenario` that carries the given `paths`.
    ///
    /// The timestamp is taken at the moment of the call.
    pub fn with_paths(scenario: String, paths: Vec<PathBuf>) -> Self {
        let timestamp = Self::unix_seconds_now();
        Self {
            scenario,
            paths,
            timestamp,
        }
    }

    /// Returns the current wall-clock time as seconds since the Unix epoch.
    ///
    /// Falls back to `0` if the system clock reports a time before the epoch,
    /// so event construction can never fail.
    fn unix_seconds_now() -> u64 {
        match SystemTime::now().duration_since(UNIX_EPOCH) {
            Ok(elapsed) => elapsed.as_secs(),
            Err(_) => 0,
        }
    }
}
44
/// An asynchronous producer of events that can be moved across threads.
///
/// Implementors choose the concrete event type through [`EventSource::Event`].
pub trait EventSource: Send {
    /// The kind of event this source yields; it must be `Send`.
    type Event: Send;

    /// Returns a `Send` future that resolves to the next event, or `None`.
    ///
    /// NOTE(review): `None` presumably means the source is exhausted; the only
    /// implementation visible in this file forwards a channel `recv()`, where
    /// `None` is whatever that call reports. Confirm against other implementors.
    fn next(&mut self) -> impl std::future::Future<Output = Option<Self::Event>> + Send;
}
53
/// An event source that receives [`WatchEvent`]s over a channel fed by a
/// `notify` file-system watcher.
pub struct LocalFileWatcherSource {
    /// Receiving half of the channel; read by the `EventSource::next` implementation.
    rx: mpsc::Receiver<WatchEvent>,
    /// Never read in this file; stored so the watcher is owned by (and lives as
    /// long as) this source.
    _watcher: RecommendedWatcher,
}
75
76impl LocalFileWatcherSource {
77 pub fn new(watch_dir: &Path, poll_interval: Duration) -> Result<Self, notify::Error> {
86 let (tx, rx) = mpsc::channel(64);
87
88 let tx_clone = tx.clone();
89 let mut watcher = RecommendedWatcher::new(
90 move |res: Result<notify::Event, notify::Error>| {
91 if let Ok(event) = res {
92 if matches!(
94 event.kind,
95 notify::EventKind::Create(_) | notify::EventKind::Modify(_)
96 ) {
97 if let Some(scenario) = extract_scenario_from_paths(&event.paths) {
98 let watch_event = WatchEvent::with_paths(scenario, event.paths);
99 let _ = tx_clone.blocking_send(watch_event);
100 }
101 }
102 }
103 },
104 Config::default().with_poll_interval(poll_interval),
105 )?;
106
107 watcher.watch(watch_dir, RecursiveMode::Recursive)?;
108
109 Ok(Self {
110 rx,
111 _watcher: watcher,
112 })
113 }
114}
115
/// Exposes the watcher's channel as an [`EventSource`] of [`WatchEvent`]s.
impl EventSource for LocalFileWatcherSource {
    type Event = WatchEvent;

    /// Awaits the next event from the internal channel and returns exactly
    /// what the receiver's `recv()` yields.
    async fn next(&mut self) -> Option<Self::Event> {
        self.rx.recv().await
    }
}
123
/// Derives a scenario name from the first path that contains the component
/// sequence `scenarios/<name>/sessions`.
///
/// Paths are examined in order and, within a path, components are scanned
/// left to right; the first `<name>` found is returned (converted lossily to
/// UTF-8). All three components must be ordinary path segments. Returns
/// `None` when no path contains the sequence, including when `paths` is empty.
fn extract_scenario_from_paths(paths: &[PathBuf]) -> Option<String> {
    use std::path::Component;

    paths.iter().find_map(|candidate| {
        let parts: Vec<Component<'_>> = candidate.components().collect();

        // Slide a three-component window over the path looking for
        // `scenarios`, any name, `sessions`.
        parts.windows(3).find_map(|window| match window {
            [Component::Normal(marker), Component::Normal(name), Component::Normal(leaf)]
                if marker.to_string_lossy() == "scenarios"
                    && leaf.to_string_lossy() == "sessions" =>
            {
                Some(name.to_string_lossy().into_owned())
            }
            _ => None,
        })
    })
}
159
#[cfg(test)]
mod tests {
    use super::*;

    /// Runs the extractor on a single path given as a string.
    fn scenario_of(path: &str) -> Option<String> {
        extract_scenario_from_paths(&[PathBuf::from(path)])
    }

    #[test]
    fn test_extract_scenario_from_paths() {
        // (input path, expected scenario name)
        let cases: [(&str, Option<&str>); 4] = [
            (
                "/home/user/.swarm-engine/learning/scenarios/troubleshooting/sessions/123/stats.json",
                Some("troubleshooting"),
            ),
            (
                "/home/user/.swarm-engine/learning/scenarios/Service Troubleshooting/sessions/456/stats.json",
                Some("Service Troubleshooting"),
            ),
            ("/home/user/random/file.txt", None),
            (
                "/home/user/.swarm-engine/learning/scenarios/troubleshooting/stats.json",
                None,
            ),
        ];

        for (path, expected) in cases {
            assert_eq!(
                scenario_of(path),
                expected.map(str::to_owned),
                "path: {path}"
            );
        }

        // No paths at all yields no scenario.
        assert_eq!(extract_scenario_from_paths(&[]), None);
    }

    #[test]
    fn test_watch_event_new() {
        let WatchEvent {
            scenario,
            paths,
            timestamp,
        } = WatchEvent::new(String::from("test_scenario"));

        assert_eq!(scenario, "test_scenario");
        assert!(paths.is_empty());
        assert!(timestamp > 0);
    }
}