Skip to main content

drasi_host_sdk/
watcher.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Policy-neutral plugin filesystem watcher.
16//!
17//! The [`PluginWatcher`] monitors a plugins directory and emits raw
18//! [`PluginFileEvent`]s. It does not decide whether a file change means load
19//! or reload — that policy belongs in the host application's orchestrator layer.
20//!
21//! ## Backends
22//!
23//! - **`watcher` feature enabled**: Uses the `notify` crate with
24//!   `notify-debouncer-mini` for efficient, event-driven filesystem watching.
25//! - **Without `watcher` feature**: Falls back to a polling implementation
26//!   that periodically scans the directory. Suitable for development and
27//!   environments where native fs-event support is unavailable.
28
29use std::path::PathBuf;
30use std::time::Duration;
31
32use tokio::sync::broadcast;
33
34use crate::loader::is_plugin_binary;
35use crate::plugin_types::PluginFileEvent;
36
/// Configuration for the plugin filesystem watcher.
///
/// This is a plain value type: it can be cloned, compared for equality and
/// printed with `{:?}`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PluginWatcherConfig {
    /// Directory to watch for plugin changes.
    pub plugins_dir: PathBuf,
    /// Debounce duration to coalesce rapid filesystem events.
    ///
    /// The polling backend also uses this value as its scan interval.
    pub debounce: Duration,
}

impl Default for PluginWatcherConfig {
    /// Watch the relative `plugins` directory with a two-second debounce.
    fn default() -> Self {
        Self {
            plugins_dir: PathBuf::from("plugins"),
            debounce: Duration::from_secs(2),
        }
    }
}
54
/// Watches a plugins directory and emits raw filesystem events.
///
/// The watcher is policy-neutral: it only emits [`PluginFileEvent`]s and does
/// not make any loading decisions. The host application's orchestrator
/// subscribes to these events and applies its own policy.
pub struct PluginWatcher {
    /// Directory and debounce settings supplied at construction.
    config: PluginWatcherConfig,
    /// Broadcast sender for file events; background tasks hold clones of it
    /// and subscribers obtain receivers from it.
    event_tx: broadcast::Sender<PluginFileEvent>,
    /// Present while a background task is running; taken and fired by `stop`.
    shutdown_tx: Option<tokio::sync::oneshot::Sender<()>>,
    /// Debouncer for the `notify` backend.
    // The field is only ever written, never read: it exists solely to keep the
    // debouncer alive until the watcher is stopped or dropped, hence the
    // `dead_code` allowance.
    #[cfg(feature = "watcher")]
    #[allow(dead_code)]
    notify_watcher: Option<notify_debouncer_mini::Debouncer<notify::RecommendedWatcher>>,
}
68
69impl PluginWatcher {
70    /// Create a new watcher. Call [`start`] or [`start_polling`] to begin monitoring.
71    pub fn new(config: PluginWatcherConfig) -> Self {
72        let (event_tx, _) = broadcast::channel(64);
73        Self {
74            config,
75            event_tx,
76            shutdown_tx: None,
77            #[cfg(feature = "watcher")]
78            notify_watcher: None,
79        }
80    }
81
    /// Subscribe to plugin file events.
    ///
    /// Returns a new receiver on the watcher's broadcast channel. This does
    /// not depend on the watcher having been started.
    // NOTE(review): with a broadcast channel a receiver presumably only sees
    // events sent after it subscribed, and a slow receiver can lag behind the
    // channel's fixed capacity — confirm callers subscribe before starting.
    pub fn subscribe(&self) -> broadcast::Receiver<PluginFileEvent> {
        self.event_tx.subscribe()
    }
86
    /// The event sender, so the host can forward events programmatically.
    ///
    /// This is a borrow of the same broadcast sender the watcher's background
    /// tasks hold clones of, so anything sent through it is delivered on the
    /// same channel that [`Self::subscribe`] hands out receivers for.
    pub fn event_sender(&self) -> &broadcast::Sender<PluginFileEvent> {
        &self.event_tx
    }
91
92    /// Start watching the plugins directory using the best available backend.
93    ///
94    /// With the `watcher` feature enabled, this uses the `notify` crate for
95    /// efficient, event-driven filesystem watching. Without it, falls back to
96    /// polling.
97    #[cfg(feature = "watcher")]
98    pub fn start(&mut self) -> anyhow::Result<()> {
99        use notify_debouncer_mini::new_debouncer;
100        use std::sync::mpsc;
101
102        let (tx, rx) = mpsc::channel();
103        let debounce = self.config.debounce;
104
105        let mut debouncer = new_debouncer(debounce, tx)?;
106        debouncer.watcher().watch(
107            &self.config.plugins_dir,
108            notify::RecursiveMode::NonRecursive,
109        )?;
110
111        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
112        self.shutdown_tx = Some(shutdown_tx);
113
114        let event_tx = self.event_tx.clone();
115        let dir = self.config.plugins_dir.clone();
116
117        // Track known files for detecting adds vs changes
118        let mut known_files: std::collections::HashSet<PathBuf> = std::collections::HashSet::new();
119        if let Ok(entries) = std::fs::read_dir(&dir) {
120            for entry in entries.flatten() {
121                let name = entry.file_name();
122                if is_plugin_binary(&name.to_string_lossy()) {
123                    known_files.insert(entry.path());
124                }
125            }
126        }
127
128        tokio::spawn(async move {
129            loop {
130                tokio::select! {
131                    _ = &mut shutdown_rx => {
132                        log::debug!("Plugin watcher (notify) shutting down");
133                        break;
134                    }
135                    // Check for fs events on a short interval
136                    _ = tokio::time::sleep(Duration::from_millis(100)) => {
137                        while let Ok(result) = rx.try_recv() {
138                            match result {
139                                Ok(events) => {
140                                    for event in events {
141                                        let path = event.path;
142                                        let name = path.file_name()
143                                            .map(|n| n.to_string_lossy().to_string())
144                                            .unwrap_or_default();
145
146                                        if !is_plugin_binary(&name) {
147                                            continue;
148                                        }
149
150                                        if path.exists() {
151                                            if known_files.contains(&path) {
152                                                let _ = event_tx.send(PluginFileEvent::Changed(path));
153                                            } else {
154                                                known_files.insert(path.clone());
155                                                let _ = event_tx.send(PluginFileEvent::Added(path));
156                                            }
157                                        } else {
158                                            known_files.remove(&path);
159                                            let _ = event_tx.send(PluginFileEvent::Removed(path));
160                                        }
161                                    }
162                                }
163                                Err(err) => {
164                                    log::warn!("Filesystem watcher error: {err}");
165                                }
166                            }
167                        }
168                    }
169                }
170            }
171        });
172
173        // Keep the debouncer alive
174        self.notify_watcher = Some(debouncer);
175
176        Ok(())
177    }
178
    /// Start watching the plugins directory using the best available backend.
    ///
    /// This variant is compiled when the `watcher` feature is disabled, so no
    /// `notify` backend is available and the call simply delegates to
    /// [`Self::start_polling`], returning whatever that method returns.
    #[cfg(not(feature = "watcher"))]
    pub fn start(&mut self) -> anyhow::Result<()> {
        self.start_polling()
    }
186
187    /// Start watching the plugins directory using a polling loop.
188    ///
189    /// This is the fallback implementation that works without the `notify` crate.
190    /// It periodically scans the directory and compares file sizes to detect changes.
191    pub fn start_polling(&mut self) -> anyhow::Result<()> {
192        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
193        self.shutdown_tx = Some(shutdown_tx);
194
195        let event_tx = self.event_tx.clone();
196        let dir = self.config.plugins_dir.clone();
197        let debounce = self.config.debounce;
198
199        tokio::spawn(async move {
200            // Track both size and mtime to detect changes even when file size stays the same
201            // (common during rebuilds/strip operations).
202            let mut known_files: std::collections::HashMap<PathBuf, (u64, std::time::SystemTime)> =
203                std::collections::HashMap::new();
204
205            // Initial scan
206            if let Ok(entries) = std::fs::read_dir(&dir) {
207                for entry in entries.flatten() {
208                    let path = entry.path();
209                    let name = entry.file_name();
210                    if is_plugin_binary(&name.to_string_lossy()) {
211                        if let Ok(meta) = entry.metadata() {
212                            let mtime = meta.modified().unwrap_or(std::time::UNIX_EPOCH);
213                            known_files.insert(path, (meta.len(), mtime));
214                        }
215                    }
216                }
217            }
218
219            loop {
220                tokio::select! {
221                    _ = &mut shutdown_rx => {
222                        log::debug!("Plugin watcher shutting down");
223                        break;
224                    }
225                    _ = tokio::time::sleep(debounce) => {
226                        // Poll for changes
227                        let mut current_files: std::collections::HashMap<PathBuf, (u64, std::time::SystemTime)> =
228                            std::collections::HashMap::new();
229
230                        if let Ok(entries) = std::fs::read_dir(&dir) {
231                            for entry in entries.flatten() {
232                                let path = entry.path();
233                                let name = entry.file_name();
234                                if is_plugin_binary(&name.to_string_lossy()) {
235                                    if let Ok(meta) = entry.metadata() {
236                                        let mtime = meta.modified().unwrap_or(std::time::UNIX_EPOCH);
237                                        current_files.insert(path, (meta.len(), mtime));
238                                    }
239                                }
240                            }
241                        }
242
243                        // Detect additions and changes (by size or mtime)
244                        for (path, (size, mtime)) in &current_files {
245                            match known_files.get(path) {
246                                None => {
247                                    let _ = event_tx.send(PluginFileEvent::Added(path.clone()));
248                                }
249                                Some((old_size, old_mtime)) if old_size != size || old_mtime != mtime => {
250                                    let _ = event_tx.send(PluginFileEvent::Changed(path.clone()));
251                                }
252                                _ => {}
253                            }
254                        }
255
256                        // Detect removals
257                        for path in known_files.keys() {
258                            if !current_files.contains_key(path) {
259                                let _ = event_tx.send(PluginFileEvent::Removed(path.clone()));
260                            }
261                        }
262
263                        known_files = current_files;
264                    }
265                }
266            }
267        });
268
269        Ok(())
270    }
271
272    /// Stop the watcher.
273    pub fn stop(&mut self) {
274        if let Some(tx) = self.shutdown_tx.take() {
275            let _ = tx.send(());
276        }
277        #[cfg(feature = "watcher")]
278        {
279            self.notify_watcher = None;
280        }
281    }
282}
283
/// Dropping the watcher stops it, so the background task is signalled to shut
/// down even when `stop` was never called explicitly.
impl Drop for PluginWatcher {
    fn drop(&mut self) {
        // Same path as an explicit `stop()`. `stop` takes the shutdown sender
        // out of its `Option`, so after a manual stop this sends nothing more.
        self.stop();
    }
}
289
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration watches `plugins` with a two-second debounce.
    #[test]
    fn test_watcher_config_default() {
        let config = PluginWatcherConfig::default();
        assert_eq!(config.plugins_dir, PathBuf::from("plugins"));
        assert_eq!(config.debounce, Duration::from_secs(2));
    }

    /// A watcher can be constructed and subscribed to without being started.
    #[test]
    fn test_watcher_creation() {
        let config = PluginWatcherConfig {
            plugins_dir: PathBuf::from("/tmp/plugins"),
            debounce: Duration::from_millis(500),
        };
        let watcher = PluginWatcher::new(config);
        let _rx = watcher.subscribe();
    }

    /// The polling backend reports a newly created plugin binary as `Added`.
    #[tokio::test]
    async fn test_watcher_detects_new_file() {
        let dir = tempfile::tempdir().expect("temp dir");
        let config = PluginWatcherConfig {
            plugins_dir: dir.path().to_path_buf(),
            debounce: Duration::from_millis(100),
        };

        let mut watcher = PluginWatcher::new(config);
        let mut rx = watcher.subscribe();
        watcher.start_polling().expect("start");

        // Give the background task time to record its baseline so the file
        // written below cannot be mistaken for a pre-existing one.
        tokio::time::sleep(Duration::from_millis(150)).await;

        // Add a plugin file.
        // NOTE(review): assumes `is_plugin_binary` accepts a `.dylib` name on
        // every platform the tests run on — confirm.
        std::fs::write(
            dir.path().join("libdrasi_source_test.dylib"),
            b"fake plugin",
        )
        .expect("write");

        // Wait for the event itself instead of sleeping a fixed time and then
        // peeking with `try_recv`; the generous timeout only bounds a genuine
        // failure and keeps the test stable on slow machines.
        let event = tokio::time::timeout(Duration::from_secs(5), rx.recv())
            .await
            .expect("timed out waiting for a plugin file event")
            .expect("event channel closed unexpectedly");

        match event {
            PluginFileEvent::Added(path) => {
                assert!(path.to_string_lossy().contains("libdrasi_source_test"));
            }
            other => panic!("expected Added, got {other:?}"),
        }

        watcher.stop();
    }
}