Skip to main content

sqry_core/session/
manager.rs

1//! Session cache manager for warm multi-query execution.
2//!
3//! The implementation keeps recently-used unified code graphs in-memory
4//! so subsequent queries avoid disk deserialization. Filesystem watching
5//! ensures cache entries are invalidated automatically when the on-disk
6//! graph changes. A background maintenance thread processes file watcher
7//! events and reclaims idle cache entries.
8
9use std::fs;
10use std::path::{Path, PathBuf};
11use std::sync::Arc;
12use std::sync::Mutex;
13use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
14use std::thread::{self, JoinHandle};
15use std::time::{Duration, Instant};
16
17use dashmap::DashMap;
18use log::{debug, info};
19
20use crate::graph::CodeGraph;
21use crate::graph::unified::persistence::{GraphStorage, load_from_path};
22use crate::plugin::PluginManager;
23use crate::query::QueryExecutor;
24
25use super::cache::CachedIndex;
26use super::watcher::FileWatcher;
27use super::{SessionConfig, SessionError, SessionResult};
28
/// Manages cached unified code graphs for a workspace session.
///
/// Owns the in-memory graph cache, the file watcher used to invalidate it,
/// and the background maintenance thread that services both. The worker is
/// stopped and joined on [`SessionManager::shutdown`] or on drop.
pub struct SessionManager {
    /// Cached graphs, keyed by the workspace path they were loaded for.
    cache: Arc<DashMap<PathBuf, CachedIndex>>,
    /// Session settings read by this type: cache capacity, idle timeout,
    /// cleanup interval and the file-watching toggle.
    config: SessionConfig,
    /// File watcher shared between callers and the maintenance thread.
    watcher: Arc<Mutex<FileWatcher>>,
    /// Handle of the maintenance thread; becomes `None` once it has been joined.
    cleanup_handle: Option<JoinHandle<()>>,
    /// Set to `true` to ask the maintenance thread to leave its loop.
    shutdown: Arc<AtomicBool>,
    /// Number of graph lookups performed through the cache.
    total_queries: Arc<AtomicU64>,
    /// Number of lookups answered from the cache.
    cache_hits: Arc<AtomicU64>,
    /// Number of lookups that had to load the graph from disk.
    cache_misses: Arc<AtomicU64>,
    /// Query executor with properly configured plugin manager
    query_executor: QueryExecutor,
}
42
43impl SessionManager {
44    /// Create a session manager with default configuration and an empty plugin manager.
45    ///
46    /// **Note:** For proper query execution with language support, use
47    /// [`with_plugin_manager()`](Self::with_plugin_manager) instead, passing a
48    /// `PluginManager` configured with the language plugins you need.
49    ///
50    /// # Errors
51    ///
52    /// Returns [`SessionError`] when the background watcher thread cannot be spawned or when
53    /// watcher initialization fails.
54    pub fn new() -> SessionResult<Self> {
55        Self::with_config_and_plugins(SessionConfig::default(), PluginManager::new())
56    }
57
58    /// Create a session manager with a pre-configured plugin manager.
59    ///
60    /// This is the recommended constructor for production use. The `PluginManager`
61    /// should be created via `sqry_plugin_registry::create_plugin_manager()` or
62    /// configured with the language plugins you need for query evaluation.
63    ///
64    /// # Arguments
65    ///
66    /// * `plugin_manager` - A `PluginManager` configured with language plugins
67    ///
68    /// # Errors
69    ///
70    /// Returns [`SessionError`] when watcher initialization or thread spawning fails.
71    pub fn with_plugin_manager(plugin_manager: PluginManager) -> SessionResult<Self> {
72        Self::with_config_and_plugins(SessionConfig::default(), plugin_manager)
73    }
74
75    /// Create a session manager with the provided configuration.
76    ///
77    /// **Note:** This uses an empty plugin manager. For proper query execution,
78    /// use [`with_config_and_plugins()`](Self::with_config_and_plugins) instead.
79    ///
80    /// # Errors
81    ///
82    /// Returns [`SessionError`] when watcher initialization or thread spawning fails.
83    pub fn with_config(config: SessionConfig) -> SessionResult<Self> {
84        Self::with_config_and_plugins(config, PluginManager::new())
85    }
86
87    /// Create a session manager with configuration and a pre-configured plugin manager.
88    ///
89    /// This is the most flexible constructor, allowing full control over both
90    /// session configuration and plugin setup.
91    ///
92    /// # Arguments
93    ///
94    /// * `config` - Session configuration (cache size, timeouts, etc.)
95    /// * `plugin_manager` - A `PluginManager` configured with language plugins
96    ///
97    /// # Errors
98    ///
99    /// Returns [`SessionError`] when watcher initialization or thread spawning fails.
100    pub fn with_config_and_plugins(
101        config: SessionConfig,
102        plugin_manager: PluginManager,
103    ) -> SessionResult<Self> {
104        let watcher = if config.enable_file_watching {
105            FileWatcher::new()?
106        } else {
107            FileWatcher::disabled()
108        };
109
110        let cache = Arc::new(DashMap::new());
111        let watcher = Arc::new(Mutex::new(watcher));
112        let shutdown = Arc::new(AtomicBool::new(false));
113
114        let cleanup_interval = config.cleanup_interval;
115        let idle_timeout = config.idle_timeout;
116
117        let cleanup_handle = {
118            let cache = Arc::clone(&cache);
119            let watcher_clone = Arc::clone(&watcher);
120            let shutdown_flag = Arc::clone(&shutdown);
121
122            thread::Builder::new()
123                .name("sqry-session-cleanup".into())
124                .spawn(move || {
125                    while !shutdown_flag.load(Ordering::Relaxed) {
126                        if let Ok(mut guard) = watcher_clone.lock()
127                            && let Err(err) = guard.process_events()
128                        {
129                            debug!("failed to process session watcher events: {err}");
130                        }
131
132                        let now = Instant::now();
133                        if let Err(err) =
134                            Self::evict_stale_for(&cache, &watcher_clone, idle_timeout, now)
135                        {
136                            debug!("failed to evict stale sessions: {err}");
137                        }
138
139                        thread::park_timeout(cleanup_interval);
140                    }
141
142                    if let Ok(mut guard) = watcher_clone.lock()
143                        && let Err(err) = guard.process_events()
144                    {
145                        debug!("failed to process session watcher events during shutdown: {err}");
146                    }
147                    let now = Instant::now();
148                    if let Err(err) =
149                        Self::evict_stale_for(&cache, &watcher_clone, idle_timeout, now)
150                    {
151                        debug!("failed to evict stale sessions during shutdown: {err}");
152                    }
153                })
154                .map_err(SessionError::SpawnThread)?
155        };
156
157        // Create query executor with the provided plugin manager
158        let query_executor = QueryExecutor::with_plugin_manager(plugin_manager);
159
160        Ok(Self {
161            cache,
162            config,
163            watcher,
164            cleanup_handle: Some(cleanup_handle),
165            shutdown,
166            total_queries: Arc::new(AtomicU64::new(0)),
167            cache_hits: Arc::new(AtomicU64::new(0)),
168            cache_misses: Arc::new(AtomicU64::new(0)),
169            query_executor,
170        })
171    }
172
173    /// Get the cached code graph for `path`, loading it from disk on demand.
174    ///
175    /// # Errors
176    ///
177    /// Returns [`SessionError`] when the graph cannot be loaded.
178    pub fn get_graph(&self, path: &Path) -> SessionResult<Arc<CodeGraph>> {
179        self.get_or_load_graph(path)
180    }
181
182    /// Query symbols from the cached graph for `path`.
183    ///
184    /// This method loads the graph on demand and returns symbols that match
185    /// the query criteria. The query string is parsed and evaluated against
186    /// all symbols in the graph.
187    ///
188    /// # Errors
189    ///
190    /// Returns [`SessionError`] when the graph cannot be loaded, query parsing fails,
191    /// or query execution fails.
192    pub fn query(
193        &self,
194        path: &Path,
195        query_str: &str,
196    ) -> SessionResult<crate::query::results::QueryResults> {
197        // Use execute_on_graph which handles graph loading and caching internally
198        let results = self
199            .query_executor
200            .execute_on_graph(query_str, path)
201            .map_err(|e| {
202                // Determine if this is a parse error or execution error
203                let error_msg = e.to_string();
204                if error_msg.contains("parse")
205                    || error_msg.contains("unexpected")
206                    || error_msg.contains("expected")
207                {
208                    SessionError::QueryParse(error_msg)
209                } else {
210                    SessionError::QueryExecution(e)
211                }
212            })?;
213
214        Ok(results)
215    }
216
217    /// Remove the cached graph for `path`, if present.
218    ///
219    /// # Errors
220    ///
221    /// Returns [`SessionError`] when the file watcher fails to unwatch the path.
222    pub fn invalidate(&self, path: &Path) -> SessionResult<()> {
223        self.cache.remove(path);
224        if let Ok(mut watcher) = self.watcher.lock() {
225            watcher.unwatch(path)?;
226        }
227        Ok(())
228    }
229
230    /// Ensure the graph for `path` is loaded into the session cache without
231    /// affecting query statistics.
232    ///
233    /// # Errors
234    ///
235    /// Returns [`SessionError`] when loading from disk fails.
236    pub fn preload(&self, path: &Path) -> SessionResult<()> {
237        if self.cache.contains_key(path) {
238            return Ok(());
239        }
240
241        if self.cache.len() >= self.config.max_cached_indexes {
242            self.evict_lru();
243        }
244
245        self.load_graph_from_disk(path)?;
246        Ok(())
247    }
248
249    /// Return aggregate session statistics.
250    #[must_use]
251    pub fn stats(&self) -> SessionStats {
252        SessionStats {
253            cached_graphs: self.cache.len(),
254            total_queries: self.total_queries.load(Ordering::Relaxed),
255            cache_hits: self.cache_hits.load(Ordering::Relaxed),
256            cache_misses: self.cache_misses.load(Ordering::Relaxed),
257            total_memory_mb: self.cache.len() * 51,
258        }
259    }
260
261    /// Evict entries that have been idle longer than configured timeout.
262    ///
263    /// # Errors
264    ///
265    /// Returns [`SessionError`] when watcher operations fail while removing paths.
266    pub fn evict_stale(&self, now: Instant) -> SessionResult<usize> {
267        Self::evict_stale_for(&self.cache, &self.watcher, self.config.idle_timeout, now)
268    }
269
270    fn get_or_load_graph(&self, path: &Path) -> SessionResult<Arc<CodeGraph>> {
271        debug_assert!(
272            self.config.max_cached_indexes > 0,
273            "SessionConfig::max_cached_indexes must be at least 1"
274        );
275        self.total_queries.fetch_add(1, Ordering::Relaxed);
276
277        if let Some(entry) = self.cache.get_mut(path) {
278            entry.value().access();
279            self.cache_hits.fetch_add(1, Ordering::Relaxed);
280            return Ok(Arc::clone(&entry.value().graph));
281        }
282
283        self.cache_misses.fetch_add(1, Ordering::Relaxed);
284
285        if self.cache.len() >= self.config.max_cached_indexes {
286            self.evict_lru();
287        }
288
289        let graph = self.load_graph_from_disk(path)?;
290
291        if let Some(entry) = self.cache.get_mut(path) {
292            entry.value().access();
293        }
294
295        Ok(graph)
296    }
297
298    fn load_graph_from_disk(&self, path: &Path) -> SessionResult<Arc<CodeGraph>> {
299        let storage = GraphStorage::new(path);
300        let snapshot_path = storage.snapshot_path();
301
302        let metadata =
303            fs::metadata(snapshot_path).map_err(|source| SessionError::IndexMetadata {
304                path: snapshot_path.to_path_buf(),
305                source,
306            })?;
307        let file_mtime = metadata
308            .modified()
309            .map_err(|source| SessionError::IndexMetadata {
310                path: snapshot_path.to_path_buf(),
311                source,
312            })?;
313
314        let graph = load_from_path(snapshot_path, Some(self.query_executor.plugin_manager()))
315            .map_err(|source| SessionError::IndexLoad {
316                path: snapshot_path.to_path_buf(),
317                source: source.into(),
318            })?;
319        let arc_graph = Arc::new(graph);
320
321        let cached = CachedIndex::new(Arc::clone(&arc_graph), file_mtime);
322        self.cache.insert(path.to_path_buf(), cached);
323
324        self.register_watcher(path, storage.manifest_path());
325
326        Ok(arc_graph)
327    }
328
329    /// Register a filesystem watch for the workspace's
330    /// `.sqry/graph/manifest.json` so cache entries are invalidated when
331    /// the live build pipeline rewrites the manifest. The watcher's
332    /// `handle_event` filter matches only manifest writes (see
333    /// `session::watcher::FileWatcher::handle_event`), so the registered
334    /// path **must** be the manifest path; passing the snapshot path here
335    /// would silently disable invalidation because no callback would ever
336    /// match the filtered event path.
337    fn register_watcher(&self, workspace_path: &Path, manifest_path: &Path) {
338        if let Ok(mut watcher) = self.watcher.lock() {
339            let cache = Arc::clone(&self.cache);
340            let callback_path = workspace_path.to_path_buf();
341            let watch_path = manifest_path.to_path_buf();
342            if let Err(err) = watcher.watch(watch_path.clone(), move || {
343                cache.remove(&callback_path);
344                info!(
345                    "Invalidated session cache after graph change: {}",
346                    callback_path.display()
347                );
348            }) {
349                debug!(
350                    "failed to register file watcher for {}: {err}",
351                    watch_path.display()
352                );
353            }
354        }
355    }
356
357    /// Evict least-recently-used entry when cache is at capacity.
358    fn evict_lru(&self) {
359        let mut oldest: Option<(PathBuf, Instant)> = None;
360
361        for entry in self.cache.iter() {
362            let last_accessed = entry.value().last_accessed();
363            if oldest
364                .as_ref()
365                .is_none_or(|(_, instant)| last_accessed < *instant)
366            {
367                oldest = Some((entry.key().clone(), last_accessed));
368            }
369        }
370
371        if let Some((path, _)) = oldest {
372            self.cache.remove(&path);
373            debug!("evicted LRU session cache entry: {}", path.display());
374            if let Ok(mut watcher) = self.watcher.lock() {
375                // Must mirror `register_watcher` and pass the manifest path —
376                // unwatching a different path silently no-ops because the
377                // watcher's callback table is keyed on the registered path.
378                let storage = GraphStorage::new(&path);
379                if let Err(err) = watcher.unwatch(storage.manifest_path()) {
380                    debug!("failed to unwatch session path {}: {err}", path.display());
381                }
382            }
383        }
384    }
385
386    fn evict_stale_for(
387        cache: &Arc<DashMap<PathBuf, CachedIndex>>,
388        watcher: &Arc<Mutex<FileWatcher>>,
389        timeout: Duration,
390        now: Instant,
391    ) -> SessionResult<usize> {
392        let mut to_remove = Vec::new();
393
394        for entry in cache.iter() {
395            let idle = now.duration_since(entry.value().last_accessed());
396            if idle > timeout {
397                to_remove.push(entry.key().clone());
398            }
399        }
400
401        for path in &to_remove {
402            cache.remove(path);
403        }
404
405        if !to_remove.is_empty()
406            && let Ok(mut guard) = watcher.lock()
407        {
408            for path in &to_remove {
409                // Must match the path used in `register_watcher`
410                // (`.sqry/graph/manifest.json`), otherwise the watcher
411                // callback table never receives the unwatch and the entry
412                // leaks.
413                let storage = GraphStorage::new(path);
414                guard.unwatch(storage.manifest_path())?;
415            }
416        }
417
418        Ok(to_remove.len())
419    }
420
421    fn stop_worker(&mut self) {
422        self.shutdown.store(true, Ordering::Relaxed);
423
424        if let Some(handle) = self.cleanup_handle.take() {
425            handle.thread().unpark();
426            if let Err(err) = handle.join() {
427                debug!("session cleanup thread terminated with error: {err:?}");
428            }
429        }
430    }
431
    /// Gracefully shut down the cleanup thread. Optional, as [`Drop`] handles it.
    ///
    /// Consumes the manager: the worker is signalled, woken and joined before
    /// this returns.
    ///
    /// # Errors
    ///
    /// The current implementation always returns `Ok(())`; a panic in the
    /// worker thread is logged at debug level rather than reported. The
    /// `SessionResult` return type leaves room for reporting shutdown
    /// failures without changing the signature.
    pub fn shutdown(mut self) -> SessionResult<()> {
        self.stop_worker();
        Ok(())
    }
441}
442
/// Stops and joins the maintenance thread when the manager goes out of scope.
impl Drop for SessionManager {
    /// Runs the same worker shutdown as [`SessionManager::shutdown`]; a no-op
    /// beyond re-setting the flag if the worker was already joined.
    fn drop(&mut self) {
        self.stop_worker();
    }
}
448
/// High-level statistics describing the current session cache.
///
/// A plain value snapshot: it is not updated after being returned.
/// Comparable and defaultable so callers and tests can diff snapshots or
/// start from an all-zero baseline.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct SessionStats {
    /// Number of cached graphs retained in-memory.
    pub cached_graphs: usize,
    /// Total number of graph accesses via this manager.
    pub total_queries: u64,
    /// Number of accesses served from cache.
    pub cache_hits: u64,
    /// Number of accesses that triggered a load from disk.
    pub cache_misses: u64,
    /// Estimated total memory footprint in megabytes.
    pub total_memory_mb: usize,
}
463
464#[cfg(test)]
465mod tests {
466    use super::*;
467    use crate::graph::unified::persistence::save_to_path;
468    use std::sync::Arc;
469    use std::thread;
470    use std::time::{Duration, Instant};
471    use tempfile::tempdir;
472
473    fn write_empty_graph(dir: &Path) -> SessionResult<()> {
474        let storage = GraphStorage::new(dir);
475        fs::create_dir_all(storage.graph_dir()).map_err(|source| SessionError::IndexMetadata {
476            path: storage.graph_dir().to_path_buf(),
477            source,
478        })?;
479        let graph = CodeGraph::new();
480        save_to_path(&graph, storage.snapshot_path()).map_err(|source| {
481            SessionError::IndexLoad {
482                path: storage.snapshot_path().to_path_buf(),
483                source: source.into(),
484            }
485        })?;
486        // Production callers register the file watcher against
487        // `<workspace>/.sqry/graph/manifest.json` — and `notify` requires
488        // the watch target to exist before `watch()` is called. The live
489        // build pipeline writes the manifest alongside the snapshot, so
490        // mirror that here for tests that depend on watcher registration.
491        fs::write(storage.manifest_path(), b"{}").map_err(|source| {
492            SessionError::IndexMetadata {
493                path: storage.manifest_path().to_path_buf(),
494                source,
495            }
496        })?;
497        Ok(())
498    }
499
500    fn watcher_timeout() -> Duration {
501        let base = if cfg!(target_os = "macos") {
502            Duration::from_secs(3)
503        } else {
504            Duration::from_secs(2)
505        };
506
507        if std::env::var("CI").is_ok() {
508            base * 2
509        } else {
510            base
511        }
512    }
513
514    fn background_timeout() -> Duration {
515        let base = if cfg!(target_os = "macos") {
516            Duration::from_secs(5)
517        } else {
518            Duration::from_secs(3)
519        };
520
521        if std::env::var("CI").is_ok() {
522            base * 2
523        } else {
524            base
525        }
526    }
527
528    fn wait_until<F>(timeout: Duration, mut predicate: F) -> bool
529    where
530        F: FnMut() -> bool,
531    {
532        let deadline = Instant::now() + timeout;
533        loop {
534            if predicate() {
535                return true;
536            }
537            if Instant::now() >= deadline {
538                return false;
539            }
540            thread::sleep(Duration::from_millis(50));
541        }
542    }
543
544    #[test]
545    fn get_graph_loads_and_updates_stats() {
546        let temp = tempdir().unwrap();
547        write_empty_graph(temp.path()).unwrap();
548
549        let manager = SessionManager::new().unwrap();
550
551        let graph = manager.get_graph(temp.path()).unwrap();
552        assert_eq!(graph.snapshot().nodes().len(), 0);
553
554        let stats = manager.stats();
555        assert_eq!(stats.total_queries, 1);
556        assert_eq!(stats.cache_misses, 1);
557        assert_eq!(stats.cache_hits, 0);
558        assert_eq!(stats.cached_graphs, 1);
559
560        manager.shutdown().unwrap();
561    }
562
563    #[test]
564    fn get_graph_missing_returns_error() {
565        let temp = tempdir().unwrap();
566
567        let manager = SessionManager::new().unwrap();
568        let err = manager
569            .get_graph(temp.path())
570            .expect_err("get_graph should fail without graph");
571
572        assert!(matches!(err, SessionError::IndexMetadata { .. }));
573        manager.shutdown().unwrap();
574    }
575
576    #[test]
577    fn preload_does_not_affect_stats() {
578        let temp = tempdir().unwrap();
579        write_empty_graph(temp.path()).unwrap();
580
581        let manager = SessionManager::new().unwrap();
582        manager.preload(temp.path()).unwrap();
583
584        let stats = manager.stats();
585        assert_eq!(stats.total_queries, 0);
586        assert_eq!(stats.cache_hits, 0);
587        assert_eq!(stats.cache_misses, 0);
588        assert_eq!(stats.cached_graphs, 1);
589
590        manager.get_graph(temp.path()).unwrap();
591        let after = manager.stats();
592        assert_eq!(after.total_queries, 1);
593        assert_eq!(after.cache_hits, 1);
594        assert_eq!(after.cache_misses, 0);
595        assert_eq!(after.cached_graphs, 1);
596
597        manager.shutdown().unwrap();
598    }
599
600    #[test]
601    fn second_access_hits_cache() {
602        let temp = tempdir().unwrap();
603        write_empty_graph(temp.path()).unwrap();
604
605        let manager = SessionManager::new().unwrap();
606
607        manager.get_graph(temp.path()).unwrap();
608        manager.get_graph(temp.path()).unwrap();
609
610        let stats = manager.stats();
611        assert_eq!(stats.total_queries, 2);
612        assert_eq!(stats.cache_misses, 1);
613        assert_eq!(stats.cache_hits, 1);
614        assert_eq!(stats.cached_graphs, 1);
615
616        manager.shutdown().unwrap();
617    }
618
619    #[test]
620    fn concurrent_access_shares_cache() {
621        let temp = tempdir().unwrap();
622        write_empty_graph(temp.path()).unwrap();
623
624        let manager = Arc::new(SessionManager::new().unwrap());
625        let path = temp.path().to_path_buf();
626
627        let handles: Vec<_> = (0..6)
628            .map(|_| {
629                let mgr = Arc::clone(&manager);
630                let path = path.clone();
631                thread::spawn(move || {
632                    mgr.get_graph(&path).unwrap();
633                })
634            })
635            .collect();
636
637        for handle in handles {
638            handle.join().unwrap();
639        }
640
641        let manager = Arc::into_inner(manager).expect("no outstanding references");
642        manager.shutdown().unwrap();
643    }
644
645    #[test]
646    fn invalidate_removes_cached_graph() {
647        let temp = tempdir().unwrap();
648        write_empty_graph(temp.path()).unwrap();
649
650        let manager = SessionManager::new().unwrap();
651        manager.get_graph(temp.path()).unwrap();
652        assert_eq!(manager.stats().cached_graphs, 1);
653
654        manager.invalidate(temp.path()).unwrap();
655        assert_eq!(manager.stats().cached_graphs, 0);
656
657        manager.shutdown().unwrap();
658    }
659
660    #[test]
661    fn lru_eviction_removes_oldest_entry() {
662        let temp = tempdir().unwrap();
663        let base = temp.path();
664
665        let config = SessionConfig {
666            max_cached_indexes: 2,
667            ..SessionConfig::default()
668        };
669        let manager = SessionManager::with_config(config).unwrap();
670
671        let repo1 = base.join("repo1");
672        let repo2 = base.join("repo2");
673        let repo3 = base.join("repo3");
674        write_empty_graph(&repo1).unwrap();
675        write_empty_graph(&repo2).unwrap();
676        write_empty_graph(&repo3).unwrap();
677
678        manager.get_graph(&repo1).unwrap();
679        manager.get_graph(&repo2).unwrap();
680
681        // Access repo2 again so repo1 stays LRU.
682        manager.get_graph(&repo2).unwrap();
683        manager.get_graph(&repo3).unwrap();
684
685        assert_eq!(manager.stats().cached_graphs, 2);
686        assert!(manager.cache.contains_key(&repo2));
687        assert!(manager.cache.contains_key(&repo3));
688        assert!(!manager.cache.contains_key(&repo1));
689
690        manager.shutdown().unwrap();
691    }
692
693    #[test]
694    fn evict_stale_purges_idle_entries() {
695        let temp = tempdir().unwrap();
696        write_empty_graph(temp.path()).unwrap();
697
698        let config = SessionConfig {
699            idle_timeout: Duration::from_millis(100),
700            cleanup_interval: Duration::from_secs(3600),
701            ..SessionConfig::default()
702        };
703        let manager = SessionManager::with_config(config).unwrap();
704
705        manager.get_graph(temp.path()).unwrap();
706        assert_eq!(manager.stats().cached_graphs, 1);
707
708        // Simulate last access in the past to avoid sleeping too long.
709        if let Some(entry) = manager.cache.get(temp.path()) {
710            entry.value().set_last_accessed(
711                Instant::now()
712                    .checked_sub(Duration::from_millis(200))
713                    .unwrap(),
714            );
715        }
716
717        let evicted = manager.evict_stale(Instant::now()).unwrap();
718        assert_eq!(evicted, 1);
719        assert_eq!(manager.stats().cached_graphs, 0);
720
721        manager.shutdown().unwrap();
722    }
723
724    #[test]
725    fn register_watcher_uses_manifest_path() {
726        // Regression test for STEP_3 codex iter2 BLOCK: production must
727        // register the watcher against `.sqry/graph/manifest.json` —
728        // the only path that `FileWatcher::handle_event` matches — not
729        // the snapshot path. Wiring the watcher to the snapshot path is
730        // a silent no-op because no event will ever satisfy the
731        // manifest filter.
732        let temp = tempdir().unwrap();
733        write_empty_graph(temp.path()).unwrap();
734
735        let manager = SessionManager::new().unwrap();
736        manager.get_graph(temp.path()).unwrap();
737
738        let storage = GraphStorage::new(temp.path());
739        let watched = manager
740            .watcher
741            .lock()
742            .expect("watcher lock poisoned in test")
743            .watched_paths();
744
745        assert!(
746            watched.contains(&storage.manifest_path().to_path_buf()),
747            "watcher must be registered for manifest path {} (registered: {:?})",
748            storage.manifest_path().display(),
749            watched,
750        );
751        assert!(
752            !watched.contains(&storage.snapshot_path().to_path_buf()),
753            "watcher must NOT be registered for snapshot path {} (registered: {:?})",
754            storage.snapshot_path().display(),
755            watched,
756        );
757
758        manager.shutdown().unwrap();
759    }
760
761    #[test]
762    fn evict_lru_unwatches_manifest_path() {
763        // Regression test: LRU eviction's `unwatch` call must mirror
764        // `register_watcher` and target the manifest path. If the paths
765        // diverge the unwatch silently no-ops and the watcher leaks.
766        let temp = tempdir().unwrap();
767        let base = temp.path();
768
769        let config = SessionConfig {
770            max_cached_indexes: 1,
771            ..SessionConfig::default()
772        };
773        let manager = SessionManager::with_config(config).unwrap();
774
775        let repo1 = base.join("repo1");
776        let repo2 = base.join("repo2");
777        write_empty_graph(&repo1).unwrap();
778        write_empty_graph(&repo2).unwrap();
779
780        manager.get_graph(&repo1).unwrap();
781        manager.get_graph(&repo2).unwrap(); // evicts repo1
782
783        let watched = manager
784            .watcher
785            .lock()
786            .expect("watcher lock poisoned in test")
787            .watched_paths();
788
789        let repo1_manifest = GraphStorage::new(&repo1).manifest_path().to_path_buf();
790        let repo2_manifest = GraphStorage::new(&repo2).manifest_path().to_path_buf();
791        assert!(
792            !watched.contains(&repo1_manifest),
793            "evicted workspace's manifest watch must be released; still watching: {watched:?}",
794        );
795        assert!(
796            watched.contains(&repo2_manifest),
797            "current workspace must remain watched; registered: {watched:?}",
798        );
799
800        manager.shutdown().unwrap();
801    }
802
803    #[test]
804    #[ignore = "flaky: timing-sensitive file watcher test"]
805    fn file_changes_trigger_invalidation() {
806        let temp = tempdir().unwrap();
807        write_empty_graph(temp.path()).unwrap();
808
809        let manager = SessionManager::new().unwrap();
810        manager.get_graph(temp.path()).unwrap();
811        assert!(manager.cache.contains_key(temp.path()));
812
813        // The watcher matches writes to `.sqry/graph/manifest.json` —
814        // touching the snapshot file would never fire a callback.
815        let storage = GraphStorage::new(temp.path());
816        std::fs::write(storage.manifest_path(), b"modified").unwrap();
817
818        let evicted = wait_until(watcher_timeout(), || {
819            manager
820                .watcher
821                .lock()
822                .expect("watcher lock poisoned in test")
823                .process_events()
824                .expect("watcher failed to process events in test");
825            !manager.cache.contains_key(temp.path())
826        });
827        assert!(evicted, "expected watcher to invalidate cache entry");
828
829        manager.shutdown().unwrap();
830    }
831
832    #[test]
833    #[ignore = "flaky: timing-sensitive background thread test"]
834    fn background_thread_processes_watcher_events() {
835        let temp = tempdir().unwrap();
836        write_empty_graph(temp.path()).unwrap();
837
838        let config = SessionConfig {
839            cleanup_interval: Duration::from_millis(50),
840            ..SessionConfig::default()
841        };
842        let manager = SessionManager::with_config(config).unwrap();
843
844        manager.get_graph(temp.path()).unwrap();
845        let storage = GraphStorage::new(temp.path());
846        std::fs::write(storage.manifest_path(), b"changed").unwrap();
847
848        let evicted = wait_until(background_timeout(), || {
849            !manager.cache.contains_key(temp.path())
850        });
851        assert!(
852            evicted,
853            "background thread failed to remove watcher-invalidated entry"
854        );
855
856        manager.shutdown().unwrap();
857    }
858
859    #[test]
860    fn background_thread_evicts_idle_entries() {
861        let temp = tempdir().unwrap();
862        write_empty_graph(temp.path()).unwrap();
863
864        let config = SessionConfig {
865            idle_timeout: Duration::from_millis(50),
866            cleanup_interval: Duration::from_millis(30),
867            ..SessionConfig::default()
868        };
869        let manager = SessionManager::with_config(config).unwrap();
870
871        manager.get_graph(temp.path()).unwrap();
872        assert_eq!(manager.stats().cached_graphs, 1);
873
874        if let Some(entry) = manager.cache.get(temp.path()) {
875            entry.value().set_last_accessed(
876                Instant::now()
877                    .checked_sub(Duration::from_millis(200))
878                    .unwrap(),
879            );
880        }
881
882        let evicted = wait_until(background_timeout(), || {
883            !manager.cache.contains_key(temp.path())
884        });
885        assert!(
886            evicted,
887            "background eviction thread did not remove idle entry"
888        );
889
890        manager.shutdown().unwrap();
891    }
892}