drasi_host_sdk/
watcher.rs1use std::path::PathBuf;
30use std::time::Duration;
31
32use tokio::sync::broadcast;
33
34use crate::loader::is_plugin_binary;
35use crate::plugin_types::PluginFileEvent;
36
/// Settings that control where and how often a plugin watcher looks for
/// plugin binaries.
#[derive(Debug, Clone)]
pub struct PluginWatcherConfig {
    /// Directory that is monitored for plugin binaries.
    pub plugins_dir: PathBuf,
    /// Debounce window handed to the filesystem watcher; the polling fallback
    /// uses the same value as its scan interval.
    pub debounce: Duration,
}

impl Default for PluginWatcherConfig {
    /// Watches the relative directory `plugins` with a two-second debounce.
    fn default() -> Self {
        let plugins_dir = PathBuf::from("plugins");
        let debounce = Duration::from_secs(2);
        PluginWatcherConfig {
            plugins_dir,
            debounce,
        }
    }
}
54
55pub struct PluginWatcher {
61 config: PluginWatcherConfig,
62 event_tx: broadcast::Sender<PluginFileEvent>,
63 shutdown_tx: Option<tokio::sync::oneshot::Sender<()>>,
64 #[cfg(feature = "watcher")]
65 #[allow(dead_code)]
66 notify_watcher: Option<notify_debouncer_mini::Debouncer<notify::RecommendedWatcher>>,
67}
68
69impl PluginWatcher {
70 pub fn new(config: PluginWatcherConfig) -> Self {
72 let (event_tx, _) = broadcast::channel(64);
73 Self {
74 config,
75 event_tx,
76 shutdown_tx: None,
77 #[cfg(feature = "watcher")]
78 notify_watcher: None,
79 }
80 }
81
82 pub fn subscribe(&self) -> broadcast::Receiver<PluginFileEvent> {
84 self.event_tx.subscribe()
85 }
86
87 pub fn event_sender(&self) -> &broadcast::Sender<PluginFileEvent> {
89 &self.event_tx
90 }
91
92 #[cfg(feature = "watcher")]
98 pub fn start(&mut self) -> anyhow::Result<()> {
99 use notify_debouncer_mini::new_debouncer;
100 use std::sync::mpsc;
101
102 let (tx, rx) = mpsc::channel();
103 let debounce = self.config.debounce;
104
105 let mut debouncer = new_debouncer(debounce, tx)?;
106 debouncer.watcher().watch(
107 &self.config.plugins_dir,
108 notify::RecursiveMode::NonRecursive,
109 )?;
110
111 let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
112 self.shutdown_tx = Some(shutdown_tx);
113
114 let event_tx = self.event_tx.clone();
115 let dir = self.config.plugins_dir.clone();
116
117 let mut known_files: std::collections::HashSet<PathBuf> = std::collections::HashSet::new();
119 if let Ok(entries) = std::fs::read_dir(&dir) {
120 for entry in entries.flatten() {
121 let name = entry.file_name();
122 if is_plugin_binary(&name.to_string_lossy()) {
123 known_files.insert(entry.path());
124 }
125 }
126 }
127
128 tokio::spawn(async move {
129 loop {
130 tokio::select! {
131 _ = &mut shutdown_rx => {
132 log::debug!("Plugin watcher (notify) shutting down");
133 break;
134 }
135 _ = tokio::time::sleep(Duration::from_millis(100)) => {
137 while let Ok(result) = rx.try_recv() {
138 match result {
139 Ok(events) => {
140 for event in events {
141 let path = event.path;
142 let name = path.file_name()
143 .map(|n| n.to_string_lossy().to_string())
144 .unwrap_or_default();
145
146 if !is_plugin_binary(&name) {
147 continue;
148 }
149
150 if path.exists() {
151 if known_files.contains(&path) {
152 let _ = event_tx.send(PluginFileEvent::Changed(path));
153 } else {
154 known_files.insert(path.clone());
155 let _ = event_tx.send(PluginFileEvent::Added(path));
156 }
157 } else {
158 known_files.remove(&path);
159 let _ = event_tx.send(PluginFileEvent::Removed(path));
160 }
161 }
162 }
163 Err(err) => {
164 log::warn!("Filesystem watcher error: {err}");
165 }
166 }
167 }
168 }
169 }
170 }
171 });
172
173 self.notify_watcher = Some(debouncer);
175
176 Ok(())
177 }
178
179 #[cfg(not(feature = "watcher"))]
183 pub fn start(&mut self) -> anyhow::Result<()> {
184 self.start_polling()
185 }
186
187 pub fn start_polling(&mut self) -> anyhow::Result<()> {
192 let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
193 self.shutdown_tx = Some(shutdown_tx);
194
195 let event_tx = self.event_tx.clone();
196 let dir = self.config.plugins_dir.clone();
197 let debounce = self.config.debounce;
198
199 tokio::spawn(async move {
200 let mut known_files: std::collections::HashMap<PathBuf, (u64, std::time::SystemTime)> =
203 std::collections::HashMap::new();
204
205 if let Ok(entries) = std::fs::read_dir(&dir) {
207 for entry in entries.flatten() {
208 let path = entry.path();
209 let name = entry.file_name();
210 if is_plugin_binary(&name.to_string_lossy()) {
211 if let Ok(meta) = entry.metadata() {
212 let mtime = meta.modified().unwrap_or(std::time::UNIX_EPOCH);
213 known_files.insert(path, (meta.len(), mtime));
214 }
215 }
216 }
217 }
218
219 loop {
220 tokio::select! {
221 _ = &mut shutdown_rx => {
222 log::debug!("Plugin watcher shutting down");
223 break;
224 }
225 _ = tokio::time::sleep(debounce) => {
226 let mut current_files: std::collections::HashMap<PathBuf, (u64, std::time::SystemTime)> =
228 std::collections::HashMap::new();
229
230 if let Ok(entries) = std::fs::read_dir(&dir) {
231 for entry in entries.flatten() {
232 let path = entry.path();
233 let name = entry.file_name();
234 if is_plugin_binary(&name.to_string_lossy()) {
235 if let Ok(meta) = entry.metadata() {
236 let mtime = meta.modified().unwrap_or(std::time::UNIX_EPOCH);
237 current_files.insert(path, (meta.len(), mtime));
238 }
239 }
240 }
241 }
242
243 for (path, (size, mtime)) in ¤t_files {
245 match known_files.get(path) {
246 None => {
247 let _ = event_tx.send(PluginFileEvent::Added(path.clone()));
248 }
249 Some((old_size, old_mtime)) if old_size != size || old_mtime != mtime => {
250 let _ = event_tx.send(PluginFileEvent::Changed(path.clone()));
251 }
252 _ => {}
253 }
254 }
255
256 for path in known_files.keys() {
258 if !current_files.contains_key(path) {
259 let _ = event_tx.send(PluginFileEvent::Removed(path.clone()));
260 }
261 }
262
263 known_files = current_files;
264 }
265 }
266 }
267 });
268
269 Ok(())
270 }
271
272 pub fn stop(&mut self) {
274 if let Some(tx) = self.shutdown_tx.take() {
275 let _ = tx.send(());
276 }
277 #[cfg(feature = "watcher")]
278 {
279 self.notify_watcher = None;
280 }
281 }
282}
283
284impl Drop for PluginWatcher {
285 fn drop(&mut self) {
286 self.stop();
287 }
288}
289
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration watches `plugins` with a two-second debounce.
    #[test]
    fn test_watcher_config_default() {
        let config = PluginWatcherConfig::default();
        assert_eq!(config.plugins_dir, PathBuf::from("plugins"));
        assert_eq!(config.debounce, Duration::from_secs(2));
    }

    /// A watcher can be constructed and subscribed to without being started.
    #[test]
    fn test_watcher_creation() {
        let config = PluginWatcherConfig {
            plugins_dir: PathBuf::from("/tmp/plugins"),
            debounce: Duration::from_millis(500),
        };
        let watcher = PluginWatcher::new(config);
        let _rx = watcher.subscribe();
        assert_eq!(watcher.event_sender().receiver_count(), 1);
    }

    /// The polling watcher reports a plugin binary created after start-up as
    /// `Added`.
    #[tokio::test]
    async fn test_watcher_detects_new_file() {
        let dir = tempfile::tempdir().expect("temp dir");
        let config = PluginWatcherConfig {
            plugins_dir: dir.path().to_path_buf(),
            debounce: Duration::from_millis(100),
        };

        let mut watcher = PluginWatcher::new(config);
        let mut rx = watcher.subscribe();
        watcher.start_polling().expect("start");

        // Yield long enough for the background task to run and settle on its
        // baseline before the new file appears.
        tokio::time::sleep(Duration::from_millis(150)).await;

        // NOTE(review): assumes `is_plugin_binary` accepts a `.dylib` name on
        // every platform the tests run on — confirm against the loader.
        std::fs::write(
            dir.path().join("libdrasi_source_test.dylib"),
            b"fake plugin",
        )
        .expect("write");

        // Wait for the event itself instead of sleeping a fixed amount, so a
        // slow or heavily loaded machine cannot make the test fail spuriously.
        let event = tokio::time::timeout(Duration::from_secs(5), rx.recv())
            .await
            .expect("timed out waiting for a plugin file event")
            .expect("event channel closed unexpectedly");

        match event {
            PluginFileEvent::Added(path) => {
                assert!(path.to_string_lossy().contains("libdrasi_source_test"));
            }
            other => panic!("expected Added, got {other:?}"),
        }

        watcher.stop();
    }
}