1use notify::{Config, Event, RecommendedWatcher, RecursiveMode, Watcher};
6use parking_lot::RwLock;
7use std::path::PathBuf;
8use std::sync::Arc;
9use std::sync::atomic::{AtomicBool, Ordering};
10use tokio::sync::mpsc;
11use xerv_core::error::{Result, XervError};
12use xerv_core::traits::{Trigger, TriggerConfig, TriggerEvent, TriggerFuture, TriggerType};
13use xerv_core::types::RelPtr;
14
15struct FilesystemState {
17 running: AtomicBool,
19 paused: AtomicBool,
21 shutdown_tx: RwLock<Option<tokio::sync::oneshot::Sender<()>>>,
23}
24
25pub struct FilesystemTrigger {
53 id: String,
55 path: PathBuf,
57 recursive: bool,
59 watch_create: bool,
61 watch_modify: bool,
62 watch_remove: bool,
63 state: Arc<FilesystemState>,
65}
66
67impl FilesystemTrigger {
68 pub fn new(id: impl Into<String>, path: impl Into<PathBuf>) -> Self {
70 Self {
71 id: id.into(),
72 path: path.into(),
73 recursive: false,
74 watch_create: true,
75 watch_modify: true,
76 watch_remove: true,
77 state: Arc::new(FilesystemState {
78 running: AtomicBool::new(false),
79 paused: AtomicBool::new(false),
80 shutdown_tx: RwLock::new(None),
81 }),
82 }
83 }
84
85 pub fn from_config(config: &TriggerConfig) -> Result<Self> {
87 let path = config
88 .get_string("path")
89 .ok_or_else(|| XervError::ConfigValue {
90 field: "path".to_string(),
91 cause: "Filesystem trigger requires 'path' parameter".to_string(),
92 })?;
93
94 let recursive = config.get_bool("recursive").unwrap_or(false);
95
96 let (watch_create, watch_modify, watch_remove) =
98 if let Some(events) = config.params.get("events") {
99 if let Some(events_arr) = events.as_sequence() {
100 let mut create = false;
101 let mut modify = false;
102 let mut remove = false;
103
104 for event in events_arr {
105 if let Some(event_str) = event.as_str() {
106 match event_str {
107 "create" => create = true,
108 "modify" => modify = true,
109 "remove" | "delete" => remove = true,
110 "rename" => {
111 create = true;
112 remove = true;
113 }
114 _ => {}
115 }
116 }
117 }
118
119 (create, modify, remove)
120 } else {
121 (true, true, true)
122 }
123 } else {
124 (true, true, true)
125 };
126
127 Ok(Self {
128 id: config.id.clone(),
129 path: PathBuf::from(path),
130 recursive,
131 watch_create,
132 watch_modify,
133 watch_remove,
134 state: Arc::new(FilesystemState {
135 running: AtomicBool::new(false),
136 paused: AtomicBool::new(false),
137 shutdown_tx: RwLock::new(None),
138 }),
139 })
140 }
141
142 pub fn recursive(mut self) -> Self {
144 self.recursive = true;
145 self
146 }
147
148 pub fn watch_create_only(mut self) -> Self {
150 self.watch_create = true;
151 self.watch_modify = false;
152 self.watch_remove = false;
153 self
154 }
155}
156
157impl Trigger for FilesystemTrigger {
158 fn trigger_type(&self) -> TriggerType {
159 TriggerType::Filesystem
160 }
161
162 fn id(&self) -> &str {
163 &self.id
164 }
165
166 fn start<'a>(
167 &'a self,
168 callback: Box<dyn Fn(TriggerEvent) + Send + Sync + 'static>,
169 ) -> TriggerFuture<'a, ()> {
170 let state = self.state.clone();
171 let path = self.path.clone();
172 let recursive = self.recursive;
173 let trigger_id = self.id.clone();
174 let watch_create = self.watch_create;
175 let watch_modify = self.watch_modify;
176 let watch_remove = self.watch_remove;
177
178 Box::pin(async move {
179 if state.running.load(Ordering::SeqCst) {
180 return Err(XervError::ConfigValue {
181 field: "trigger".to_string(),
182 cause: "Trigger is already running".to_string(),
183 });
184 }
185
186 if !path.exists() {
188 return Err(XervError::ConfigValue {
189 field: "path".to_string(),
190 cause: format!("Path does not exist: {}", path.display()),
191 });
192 }
193
194 tracing::info!(
195 trigger_id = %trigger_id,
196 path = %path.display(),
197 recursive = recursive,
198 "Filesystem trigger started"
199 );
200
201 state.running.store(true, Ordering::SeqCst);
202
203 let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
204 *state.shutdown_tx.write() = Some(shutdown_tx);
205
206 let callback = Arc::new(callback);
207
208 let (tx, mut rx) = mpsc::channel(100);
210
211 let mut watcher = RecommendedWatcher::new(
213 move |res: std::result::Result<Event, notify::Error>| {
214 if let Ok(event) = res {
215 let _ = tx.blocking_send(event);
216 }
217 },
218 Config::default(),
219 )
220 .map_err(|e| XervError::Io {
221 path: path.clone(),
222 cause: format!("Failed to create file watcher: {}", e),
223 })?;
224
225 let mode = if recursive {
227 RecursiveMode::Recursive
228 } else {
229 RecursiveMode::NonRecursive
230 };
231
232 watcher.watch(&path, mode).map_err(|e| XervError::Io {
233 path: path.clone(),
234 cause: format!("Failed to watch path: {}", e),
235 })?;
236
237 loop {
238 tokio::select! {
239 _ = &mut shutdown_rx => {
240 tracing::info!(trigger_id = %trigger_id, "Filesystem trigger shutting down");
241 break;
242 }
243 Some(event) = rx.recv() => {
244 if state.paused.load(Ordering::SeqCst) {
245 tracing::debug!(trigger_id = %trigger_id, "Trigger paused, ignoring event");
246 continue;
247 }
248
249 let should_trigger = match &event.kind {
251 notify::EventKind::Create(_) => watch_create,
252 notify::EventKind::Modify(_) => watch_modify,
253 notify::EventKind::Remove(_) => watch_remove,
254 _ => false,
255 };
256
257 if !should_trigger {
258 continue;
259 }
260
261 let paths: Vec<String> = event.paths.iter()
262 .map(|p| p.display().to_string())
263 .collect();
264
265 let event_kind = format!("{:?}", event.kind);
266
267 let trigger_event = TriggerEvent::new(&trigger_id, RelPtr::null())
269 .with_metadata(format!(
270 "event={},paths={}",
271 event_kind,
272 paths.join(",")
273 ));
274
275 tracing::debug!(
276 trigger_id = %trigger_id,
277 trace_id = %trigger_event.trace_id,
278 event_kind = %event_kind,
279 paths = ?paths,
280 "Filesystem event detected"
281 );
282
283 callback(trigger_event);
284 }
285 }
286 }
287
288 state.running.store(false, Ordering::SeqCst);
289 Ok(())
290 })
291 }
292
293 fn stop<'a>(&'a self) -> TriggerFuture<'a, ()> {
294 let state = self.state.clone();
295 let trigger_id = self.id.clone();
296
297 Box::pin(async move {
298 if let Some(tx) = state.shutdown_tx.write().take() {
299 let _ = tx.send(());
300 tracing::info!(trigger_id = %trigger_id, "Filesystem trigger stopped");
301 }
302 state.running.store(false, Ordering::SeqCst);
303 Ok(())
304 })
305 }
306
307 fn pause<'a>(&'a self) -> TriggerFuture<'a, ()> {
308 let state = self.state.clone();
309 let trigger_id = self.id.clone();
310
311 Box::pin(async move {
312 state.paused.store(true, Ordering::SeqCst);
313 tracing::info!(trigger_id = %trigger_id, "Filesystem trigger paused");
314 Ok(())
315 })
316 }
317
318 fn resume<'a>(&'a self) -> TriggerFuture<'a, ()> {
319 let state = self.state.clone();
320 let trigger_id = self.id.clone();
321
322 Box::pin(async move {
323 state.paused.store(false, Ordering::SeqCst);
324 tracing::info!(trigger_id = %trigger_id, "Filesystem trigger resumed");
325 Ok(())
326 })
327 }
328
329 fn is_running(&self) -> bool {
330 self.state.running.load(Ordering::SeqCst)
331 }
332}
333
334#[cfg(test)]
335mod tests {
336 use super::*;
337
338 #[test]
339 fn filesystem_trigger_creation() {
340 let trigger = FilesystemTrigger::new("test_fs", "/tmp");
341 assert_eq!(trigger.id(), "test_fs");
342 assert_eq!(trigger.trigger_type(), TriggerType::Filesystem);
343 assert!(!trigger.is_running());
344 assert!(!trigger.recursive);
345 }
346
347 #[test]
348 fn filesystem_trigger_from_config() {
349 let mut params = serde_yaml::Mapping::new();
350 params.insert(
351 serde_yaml::Value::String("path".to_string()),
352 serde_yaml::Value::String("/data/incoming".to_string()),
353 );
354 params.insert(
355 serde_yaml::Value::String("recursive".to_string()),
356 serde_yaml::Value::Bool(true),
357 );
358
359 let config = TriggerConfig::new("fs_test", TriggerType::Filesystem)
360 .with_params(serde_yaml::Value::Mapping(params));
361
362 let trigger = FilesystemTrigger::from_config(&config).unwrap();
363 assert_eq!(trigger.id(), "fs_test");
364 assert_eq!(trigger.path.to_str().unwrap(), "/data/incoming");
365 assert!(trigger.recursive);
366 }
367
368 #[test]
369 fn filesystem_trigger_missing_path() {
370 let config = TriggerConfig::new("fs_test", TriggerType::Filesystem);
371 let result = FilesystemTrigger::from_config(&config);
372 assert!(result.is_err());
373 }
374
375 #[test]
376 fn filesystem_trigger_builder() {
377 let trigger = FilesystemTrigger::new("builder_test", "/tmp")
378 .recursive()
379 .watch_create_only();
380
381 assert!(trigger.recursive);
382 assert!(trigger.watch_create);
383 assert!(!trigger.watch_modify);
384 assert!(!trigger.watch_remove);
385 }
386}