xerv_nodes/triggers/
filesystem.rs

1//! Filesystem trigger (file watcher).
2//!
3//! Fires events when files are created, modified, or deleted.
4
5use notify::{Config, Event, RecommendedWatcher, RecursiveMode, Watcher};
6use parking_lot::RwLock;
7use std::path::PathBuf;
8use std::sync::Arc;
9use std::sync::atomic::{AtomicBool, Ordering};
10use tokio::sync::mpsc;
11use xerv_core::error::{Result, XervError};
12use xerv_core::traits::{Trigger, TriggerConfig, TriggerEvent, TriggerFuture, TriggerType};
13use xerv_core::types::RelPtr;
14
15/// State for the filesystem trigger.
16struct FilesystemState {
17    /// Whether the trigger is running.
18    running: AtomicBool,
19    /// Whether the trigger is paused.
20    paused: AtomicBool,
21    /// Shutdown signal sender.
22    shutdown_tx: RwLock<Option<tokio::sync::oneshot::Sender<()>>>,
23}
24
25/// Filesystem event trigger.
26///
27/// Watches a directory for file changes and fires events.
28///
29/// # Configuration
30///
31/// ```yaml
32/// triggers:
33///   - id: file_watcher
34///     type: trigger::filesystem
35///     params:
36///       path: "/data/incoming"
37///       recursive: true
38///       events:
39///         - create
40///         - modify
41/// ```
42///
43/// # Parameters
44///
45/// - `path` - Directory path to watch (required)
46/// - `recursive` - Watch subdirectories (default: false)
47/// - `events` - Event types to watch (default: all)
48///   - `create` - File created
49///   - `modify` - File modified
50///   - `remove` - File deleted
51///   - `rename` - File renamed
52pub struct FilesystemTrigger {
53    /// Trigger ID.
54    id: String,
55    /// Path to watch.
56    path: PathBuf,
57    /// Watch recursively.
58    recursive: bool,
59    /// Event types to watch.
60    watch_create: bool,
61    watch_modify: bool,
62    watch_remove: bool,
63    /// Internal state.
64    state: Arc<FilesystemState>,
65}
66
67impl FilesystemTrigger {
68    /// Create a new filesystem trigger.
69    pub fn new(id: impl Into<String>, path: impl Into<PathBuf>) -> Self {
70        Self {
71            id: id.into(),
72            path: path.into(),
73            recursive: false,
74            watch_create: true,
75            watch_modify: true,
76            watch_remove: true,
77            state: Arc::new(FilesystemState {
78                running: AtomicBool::new(false),
79                paused: AtomicBool::new(false),
80                shutdown_tx: RwLock::new(None),
81            }),
82        }
83    }
84
85    /// Create from configuration.
86    pub fn from_config(config: &TriggerConfig) -> Result<Self> {
87        let path = config
88            .get_string("path")
89            .ok_or_else(|| XervError::ConfigValue {
90                field: "path".to_string(),
91                cause: "Filesystem trigger requires 'path' parameter".to_string(),
92            })?;
93
94        let recursive = config.get_bool("recursive").unwrap_or(false);
95
96        // Parse event types
97        let (watch_create, watch_modify, watch_remove) =
98            if let Some(events) = config.params.get("events") {
99                if let Some(events_arr) = events.as_sequence() {
100                    let mut create = false;
101                    let mut modify = false;
102                    let mut remove = false;
103
104                    for event in events_arr {
105                        if let Some(event_str) = event.as_str() {
106                            match event_str {
107                                "create" => create = true,
108                                "modify" => modify = true,
109                                "remove" | "delete" => remove = true,
110                                "rename" => {
111                                    create = true;
112                                    remove = true;
113                                }
114                                _ => {}
115                            }
116                        }
117                    }
118
119                    (create, modify, remove)
120                } else {
121                    (true, true, true)
122                }
123            } else {
124                (true, true, true)
125            };
126
127        Ok(Self {
128            id: config.id.clone(),
129            path: PathBuf::from(path),
130            recursive,
131            watch_create,
132            watch_modify,
133            watch_remove,
134            state: Arc::new(FilesystemState {
135                running: AtomicBool::new(false),
136                paused: AtomicBool::new(false),
137                shutdown_tx: RwLock::new(None),
138            }),
139        })
140    }
141
142    /// Enable recursive watching.
143    pub fn recursive(mut self) -> Self {
144        self.recursive = true;
145        self
146    }
147
148    /// Watch only for create events.
149    pub fn watch_create_only(mut self) -> Self {
150        self.watch_create = true;
151        self.watch_modify = false;
152        self.watch_remove = false;
153        self
154    }
155}
156
157impl Trigger for FilesystemTrigger {
158    fn trigger_type(&self) -> TriggerType {
159        TriggerType::Filesystem
160    }
161
162    fn id(&self) -> &str {
163        &self.id
164    }
165
166    fn start<'a>(
167        &'a self,
168        callback: Box<dyn Fn(TriggerEvent) + Send + Sync + 'static>,
169    ) -> TriggerFuture<'a, ()> {
170        let state = self.state.clone();
171        let path = self.path.clone();
172        let recursive = self.recursive;
173        let trigger_id = self.id.clone();
174        let watch_create = self.watch_create;
175        let watch_modify = self.watch_modify;
176        let watch_remove = self.watch_remove;
177
178        Box::pin(async move {
179            if state.running.load(Ordering::SeqCst) {
180                return Err(XervError::ConfigValue {
181                    field: "trigger".to_string(),
182                    cause: "Trigger is already running".to_string(),
183                });
184            }
185
186            // Check if path exists
187            if !path.exists() {
188                return Err(XervError::ConfigValue {
189                    field: "path".to_string(),
190                    cause: format!("Path does not exist: {}", path.display()),
191                });
192            }
193
194            tracing::info!(
195                trigger_id = %trigger_id,
196                path = %path.display(),
197                recursive = recursive,
198                "Filesystem trigger started"
199            );
200
201            state.running.store(true, Ordering::SeqCst);
202
203            let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
204            *state.shutdown_tx.write() = Some(shutdown_tx);
205
206            let callback = Arc::new(callback);
207
208            // Create channel for watcher events
209            let (tx, mut rx) = mpsc::channel(100);
210
211            // Create watcher
212            let mut watcher = RecommendedWatcher::new(
213                move |res: std::result::Result<Event, notify::Error>| {
214                    if let Ok(event) = res {
215                        let _ = tx.blocking_send(event);
216                    }
217                },
218                Config::default(),
219            )
220            .map_err(|e| XervError::Io {
221                path: path.clone(),
222                cause: format!("Failed to create file watcher: {}", e),
223            })?;
224
225            // Start watching
226            let mode = if recursive {
227                RecursiveMode::Recursive
228            } else {
229                RecursiveMode::NonRecursive
230            };
231
232            watcher.watch(&path, mode).map_err(|e| XervError::Io {
233                path: path.clone(),
234                cause: format!("Failed to watch path: {}", e),
235            })?;
236
237            loop {
238                tokio::select! {
239                    _ = &mut shutdown_rx => {
240                        tracing::info!(trigger_id = %trigger_id, "Filesystem trigger shutting down");
241                        break;
242                    }
243                    Some(event) = rx.recv() => {
244                        if state.paused.load(Ordering::SeqCst) {
245                            tracing::debug!(trigger_id = %trigger_id, "Trigger paused, ignoring event");
246                            continue;
247                        }
248
249                        // Check event type
250                        let should_trigger = match &event.kind {
251                            notify::EventKind::Create(_) => watch_create,
252                            notify::EventKind::Modify(_) => watch_modify,
253                            notify::EventKind::Remove(_) => watch_remove,
254                            _ => false,
255                        };
256
257                        if !should_trigger {
258                            continue;
259                        }
260
261                        let paths: Vec<String> = event.paths.iter()
262                            .map(|p| p.display().to_string())
263                            .collect();
264
265                        let event_kind = format!("{:?}", event.kind);
266
267                        // Create trigger event
268                        let trigger_event = TriggerEvent::new(&trigger_id, RelPtr::null())
269                            .with_metadata(format!(
270                                "event={},paths={}",
271                                event_kind,
272                                paths.join(",")
273                            ));
274
275                        tracing::debug!(
276                            trigger_id = %trigger_id,
277                            trace_id = %trigger_event.trace_id,
278                            event_kind = %event_kind,
279                            paths = ?paths,
280                            "Filesystem event detected"
281                        );
282
283                        callback(trigger_event);
284                    }
285                }
286            }
287
288            state.running.store(false, Ordering::SeqCst);
289            Ok(())
290        })
291    }
292
293    fn stop<'a>(&'a self) -> TriggerFuture<'a, ()> {
294        let state = self.state.clone();
295        let trigger_id = self.id.clone();
296
297        Box::pin(async move {
298            if let Some(tx) = state.shutdown_tx.write().take() {
299                let _ = tx.send(());
300                tracing::info!(trigger_id = %trigger_id, "Filesystem trigger stopped");
301            }
302            state.running.store(false, Ordering::SeqCst);
303            Ok(())
304        })
305    }
306
307    fn pause<'a>(&'a self) -> TriggerFuture<'a, ()> {
308        let state = self.state.clone();
309        let trigger_id = self.id.clone();
310
311        Box::pin(async move {
312            state.paused.store(true, Ordering::SeqCst);
313            tracing::info!(trigger_id = %trigger_id, "Filesystem trigger paused");
314            Ok(())
315        })
316    }
317
318    fn resume<'a>(&'a self) -> TriggerFuture<'a, ()> {
319        let state = self.state.clone();
320        let trigger_id = self.id.clone();
321
322        Box::pin(async move {
323            state.paused.store(false, Ordering::SeqCst);
324            tracing::info!(trigger_id = %trigger_id, "Filesystem trigger resumed");
325            Ok(())
326        })
327    }
328
329    fn is_running(&self) -> bool {
330        self.state.running.load(Ordering::SeqCst)
331    }
332}
333
334#[cfg(test)]
335mod tests {
336    use super::*;
337
338    #[test]
339    fn filesystem_trigger_creation() {
340        let trigger = FilesystemTrigger::new("test_fs", "/tmp");
341        assert_eq!(trigger.id(), "test_fs");
342        assert_eq!(trigger.trigger_type(), TriggerType::Filesystem);
343        assert!(!trigger.is_running());
344        assert!(!trigger.recursive);
345    }
346
347    #[test]
348    fn filesystem_trigger_from_config() {
349        let mut params = serde_yaml::Mapping::new();
350        params.insert(
351            serde_yaml::Value::String("path".to_string()),
352            serde_yaml::Value::String("/data/incoming".to_string()),
353        );
354        params.insert(
355            serde_yaml::Value::String("recursive".to_string()),
356            serde_yaml::Value::Bool(true),
357        );
358
359        let config = TriggerConfig::new("fs_test", TriggerType::Filesystem)
360            .with_params(serde_yaml::Value::Mapping(params));
361
362        let trigger = FilesystemTrigger::from_config(&config).unwrap();
363        assert_eq!(trigger.id(), "fs_test");
364        assert_eq!(trigger.path.to_str().unwrap(), "/data/incoming");
365        assert!(trigger.recursive);
366    }
367
368    #[test]
369    fn filesystem_trigger_missing_path() {
370        let config = TriggerConfig::new("fs_test", TriggerType::Filesystem);
371        let result = FilesystemTrigger::from_config(&config);
372        assert!(result.is_err());
373    }
374
375    #[test]
376    fn filesystem_trigger_builder() {
377        let trigger = FilesystemTrigger::new("builder_test", "/tmp")
378            .recursive()
379            .watch_create_only();
380
381        assert!(trigger.recursive);
382        assert!(trigger.watch_create);
383        assert!(!trigger.watch_modify);
384        assert!(!trigger.watch_remove);
385    }
386}