Skip to main content

sqry_core/session/
watcher.rs

1//! Filesystem watcher utility for session cache invalidation.
2//!
3//! Wraps the `notify` crate with a small abstraction that tracks callbacks
4//! keyed by the watched file path. Each callback is triggered when the
5//! corresponding workspace's `.sqry/graph/manifest.json` file is modified;
6//! callers typically use this to invalidate the in-memory cache entry for
7//! that workspace. The manifest is the canonical marker emitted by
8//! `build_unified_graph_inner` (see
9//! `graph/unified/persistence/mod.rs`'s `GRAPH_DIR_NAME` /
10//! `MANIFEST_FILE_NAME` constants); the legacy `.sqry-index` placeholder
11//! was never written by the live build pipeline.
12//!
13//! ## RR-10 Gap #3: Bounded Event Queue (`DoS` Prevention)
14//!
15//! Uses a bounded synchronous channel (`sync_channel`) instead of unbounded
16//! channel to prevent memory exhaustion attacks via filesystem event flooding.
17//! The queue capacity is configurable via `SQRY_WATCH_EVENT_QUEUE` environment
18//! variable (default: 10,000 events).
19
20use std::collections::HashMap;
21use std::ffi::OsStr;
22use std::path::{Path, PathBuf};
23use std::sync::mpsc::{Receiver, TryRecvError};
24use std::sync::{Arc, Mutex, MutexGuard};
25
26use notify::{
27    Config as NotifyConfig, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher,
28};
29
30use super::error::{SessionError, SessionResult};
31use crate::config::buffers::watch_event_queue_capacity;
32
/// Name of the per-workspace sqry state directory: the first segment of the
/// canonical relative manifest path `.sqry/graph/manifest.json`.
const SQRY_DIR_SEGMENT: &str = ".sqry";

/// Directory nested inside [`SQRY_DIR_SEGMENT`] that holds
/// [`MANIFEST_FILE_NAME`].
const GRAPH_DIR_SEGMENT: &str = "graph";

/// Leaf file name of the canonical sqry graph manifest.
///
/// The live build pipeline writes `<workspace>/.sqry/graph/manifest.json` via
/// `graph/unified/persistence::GraphStorage`. The watcher only reacts to
/// events whose leaf name equals this constant and whose parent directory
/// chain is `.sqry/graph/`.
const MANIFEST_FILE_NAME: &str = "manifest.json";
46
/// Canonical manifest location inside `workspace`:
/// `<workspace>/.sqry/graph/manifest.json`.
///
/// Only the in-module tests build paths through this helper. Production
/// callers (e.g. `session::manager::register_watcher`) already hold the fully
/// formed `GraphStorage` manifest path and pass it straight to
/// `FileWatcher::watch`.
#[cfg(test)]
fn manifest_path(workspace: &Path) -> PathBuf {
    let mut manifest = workspace.join(SQRY_DIR_SEGMENT);
    manifest.push(GRAPH_DIR_SEGMENT);
    manifest.push(MANIFEST_FILE_NAME);
    manifest
}
55    workspace
56        .join(SQRY_DIR_SEGMENT)
57        .join(GRAPH_DIR_SEGMENT)
58        .join(MANIFEST_FILE_NAME)
59}
60
61type Callback = Arc<dyn Fn() + Send + Sync + 'static>;
62
63struct WatcherState {
64    watcher: RecommendedWatcher,
65    rx: Receiver<notify::Result<Event>>,
66    callbacks: Arc<Mutex<HashMap<PathBuf, Callback>>>,
67}
68
69impl WatcherState {
70    fn lock_callbacks(&self) -> MutexGuard<'_, HashMap<PathBuf, Callback>> {
71        self.callbacks
72            .lock()
73            .unwrap_or_else(std::sync::PoisonError::into_inner)
74    }
75}
76
77/// Lightweight wrapper around `notify` for watching
78/// `.sqry/graph/manifest.json` files.
79pub struct FileWatcher {
80    state: Option<WatcherState>,
81}
82
83impl FileWatcher {
84    /// Create an active file watcher.
85    ///
86    /// # Errors
87    ///
88    /// Returns [`SessionError`] when the underlying `notify` watcher cannot be initialised.
89    pub fn new() -> SessionResult<Self> {
90        // RR-10 Gap #3: Use bounded channel to prevent DoS via event flooding
91        // Configurable via SQRY_WATCH_EVENT_QUEUE (default: 10,000)
92        let capacity = watch_event_queue_capacity();
93        let (tx, rx) = std::sync::mpsc::sync_channel(capacity);
94
95        let watcher = RecommendedWatcher::new(
96            move |event| {
97                let _ = tx.send(event);
98            },
99            NotifyConfig::default(),
100        )
101        .map_err(SessionError::WatcherInit)?;
102
103        Ok(Self {
104            state: Some(WatcherState {
105                watcher,
106                rx,
107                callbacks: Arc::new(Mutex::new(HashMap::new())),
108            }),
109        })
110    }
111
112    /// Create a disabled watcher (used when file watching is turned off).
113    #[must_use]
114    pub fn disabled() -> Self {
115        Self { state: None }
116    }
117
118    /// Register a path for change notifications.
119    ///
120    /// When the underlying `.sqry/graph/manifest.json` file changes,
121    /// `on_change` is invoked. The `path` argument is forwarded directly to
122    /// the underlying `notify` watcher — callers that already know the
123    /// manifest's location (e.g. `GraphStorage::manifest_path()`) pass it
124    /// here.
125    ///
126    /// # Errors
127    ///
128    /// Returns [`SessionError`] when the watcher cannot register the path.
129    pub fn watch<F>(&mut self, path: PathBuf, on_change: F) -> SessionResult<()>
130    where
131        F: Fn() + Send + Sync + 'static,
132    {
133        let Some(state) = &mut self.state else {
134            // Watching disabled; nothing to do.
135            return Ok(());
136        };
137
138        // Avoid duplicate registrations for the same workspace.
139        if state.lock_callbacks().contains_key(&path) {
140            return Ok(());
141        }
142
143        state
144            .watcher
145            .watch(&path, RecursiveMode::NonRecursive)
146            .map_err(|source| SessionError::WatchIndex {
147                path: path.clone(),
148                source,
149            })?;
150
151        state.lock_callbacks().insert(path, Arc::new(on_change));
152
153        Ok(())
154    }
155
156    /// Stop watching a path.
157    ///
158    /// # Errors
159    ///
160    /// Returns [`SessionError`] when the watcher fails to unregister the path.
161    pub fn unwatch(&mut self, path: &Path) -> SessionResult<()> {
162        let Some(state) = &mut self.state else {
163            return Ok(());
164        };
165
166        if state.lock_callbacks().remove(path).is_some() {
167            state
168                .watcher
169                .unwatch(path)
170                .map_err(|source| SessionError::UnwatchIndex {
171                    path: path.to_path_buf(),
172                    source,
173                })?;
174        }
175
176        Ok(())
177    }
178
179    /// Returns the set of paths currently registered for change
180    /// notifications. Test-only helper used to assert that production
181    /// callers wire the watcher to the correct artifact path
182    /// (`.sqry/graph/manifest.json`); accessing it from non-test code
183    /// would defeat the encapsulation of the callback table.
184    #[cfg(test)]
185    #[must_use]
186    pub(crate) fn watched_paths(&self) -> Vec<PathBuf> {
187        self.state
188            .as_ref()
189            .map(|state| state.lock_callbacks().keys().cloned().collect())
190            .unwrap_or_default()
191    }
192
193    /// Drain pending filesystem events and invoke registered callbacks.
194    ///
195    /// # Errors
196    ///
197    /// Returns [`SessionError`] when event processing fails catastrophically (e.g., callback errors).
198    pub fn process_events(&mut self) -> SessionResult<()> {
199        let Some(state) = &mut self.state else {
200            return Ok(());
201        };
202
203        loop {
204            match state.rx.try_recv() {
205                Ok(Ok(event)) => Self::handle_event(state, &event),
206                Ok(Err(err)) => {
207                    log::warn!("file watcher error: {err}");
208                }
209                Err(TryRecvError::Empty | TryRecvError::Disconnected) => break,
210            }
211        }
212
213        Ok(())
214    }
215
216    /// Wait for a duration while processing events
217    ///
218    /// Unlike `thread::sleep()` followed by `process_events()`, this actively
219    /// drains and processes events during the wait period. This is crucial for
220    /// macOS `FSEvents` which may deliver batched notifications with higher latency.
221    ///
222    /// Reference: `CI_FAILURE_REMEDIATION_PLAN.md` Section 2
223    ///
224    /// # Errors
225    ///
226    /// Returns [`SessionError`] when the event loop encounters an unrecoverable error.
227    pub fn wait_and_process(&mut self, duration: std::time::Duration) -> SessionResult<()> {
228        let Some(state) = &mut self.state else {
229            return Ok(());
230        };
231
232        let deadline = std::time::Instant::now() + duration;
233
234        while std::time::Instant::now() < deadline {
235            let remaining = deadline.saturating_duration_since(std::time::Instant::now());
236            let poll_interval = std::time::Duration::from_millis(10).min(remaining);
237
238            match state.rx.recv_timeout(poll_interval) {
239                Ok(Ok(event)) => Self::handle_event(state, &event),
240                Ok(Err(err)) => {
241                    log::warn!("file watcher error: {err}");
242                }
243                Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {
244                    // Continue waiting
245                }
246                Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => {
247                    break;
248                }
249            }
250        }
251
252        Ok(())
253    }
254
255    fn handle_event(state: &WatcherState, event: &Event) {
256        use EventKind::{Any, Create, Modify, Remove};
257
258        let relevant = matches!(event.kind, Modify(_) | Create(_) | Remove(_) | Any);
259
260        if !relevant {
261            return;
262        }
263
264        // Collect callbacks to invoke outside the mutex.
265        let mut callbacks_to_run: Vec<Callback> = Vec::new();
266
267        {
268            let callbacks = state.lock_callbacks();
269            for path in &event.paths {
270                // Only react to writes targeting `.sqry/graph/manifest.json`.
271                // We require the full parent chain so unrelated `manifest.json`
272                // files (e.g. NPM package manifests) cannot trigger spurious
273                // invalidations.
274                if path
275                    .file_name()
276                    .is_some_and(|name| name == OsStr::new(MANIFEST_FILE_NAME))
277                    && let Some(graph_dir) = path.parent()
278                    && graph_dir
279                        .file_name()
280                        .is_some_and(|name| name == OsStr::new(GRAPH_DIR_SEGMENT))
281                    && let Some(sqry_dir) = graph_dir.parent()
282                    && sqry_dir
283                        .file_name()
284                        .is_some_and(|name| name == OsStr::new(SQRY_DIR_SEGMENT))
285                    && let Some(callback) = callbacks.get(path)
286                {
287                    callbacks_to_run.push(Arc::clone(callback));
288                }
289            }
290        }
291
292        for callback in callbacks_to_run {
293            callback();
294        }
295    }
296}
297
298#[cfg(test)]
299mod tests {
300    use super::*;
301    use std::sync::atomic::{AtomicBool, Ordering};
302    use std::time::Duration;
303    use tempfile::tempdir;
304
305    fn event_timeout() -> Duration {
306        // CI environments need more generous timeouts due to resource constraints
307        let base = if cfg!(target_os = "macos") {
308            Duration::from_secs(3)
309        } else {
310            Duration::from_secs(1) // Increased from 250ms for CI stability
311        };
312
313        // Double timeout in CI environment
314        if std::env::var("CI").is_ok() {
315            base * 2
316        } else {
317            base
318        }
319    }
320
321    #[test]
322    #[cfg_attr(target_os = "macos", ignore = "FSEvents timing flaky in CI")]
323    fn detects_changes_to_index_file() {
324        let temp = tempdir().unwrap();
325        let workspace = temp.path();
326        let manifest = manifest_path(workspace);
327        std::fs::create_dir_all(manifest.parent().unwrap()).unwrap();
328        std::fs::write(&manifest, b"initial").unwrap();
329
330        let mut watcher = FileWatcher::new().unwrap();
331
332        let triggered = Arc::new(AtomicBool::new(false));
333        let flag = Arc::clone(&triggered);
334        watcher
335            .watch(manifest.clone(), move || {
336                flag.store(true, Ordering::SeqCst);
337            })
338            .unwrap();
339
340        std::fs::write(&manifest, b"modified").unwrap();
341
342        watcher.wait_and_process(event_timeout()).unwrap();
343
344        assert!(triggered.load(Ordering::SeqCst));
345    }
346
347    #[test]
348    fn disabled_watcher_is_noop() {
349        let temp = tempdir().unwrap();
350        let workspace = temp.path();
351        let manifest = manifest_path(workspace);
352        std::fs::create_dir_all(manifest.parent().unwrap()).unwrap();
353        std::fs::write(&manifest, b"data").unwrap();
354
355        let mut watcher = FileWatcher::disabled();
356        watcher
357            .watch(manifest, || {
358                panic!("disabled watcher should not invoke callback");
359            })
360            .unwrap();
361        // No events should be processed, but method should be callable.
362        watcher.process_events().unwrap();
363    }
364}