Skip to main content

sqry_core/session/
manager.rs

1//! Session cache manager for warm multi-query execution.
2//!
3//! The implementation keeps recently-used unified code graphs in-memory
4//! so subsequent queries avoid disk deserialization. Filesystem watching
5//! ensures cache entries are invalidated automatically when the on-disk
6//! graph changes. A background maintenance thread processes file watcher
7//! events and reclaims idle cache entries.
8
9use std::fs;
10use std::path::{Path, PathBuf};
11use std::sync::Arc;
12use std::sync::Mutex;
13use std::sync::atomic::{AtomicU64, Ordering};
14use std::thread::{self, JoinHandle};
15use std::time::{Duration, Instant};
16
17use dashmap::DashMap;
18use log::{debug, info};
19use parking_lot::{Condvar, Mutex as PlMutex};
20
21use crate::graph::CodeGraph;
22use crate::graph::unified::persistence::{GraphStorage, load_from_path};
23use crate::plugin::PluginManager;
24use crate::query::QueryExecutor;
25
26use super::cache::CachedIndex;
27use super::watcher::FileWatcher;
28use super::{SessionConfig, SessionError, SessionResult};
29
30/// Manages cached unified code graphs for a workspace session.
31pub struct SessionManager {
32    cache: Arc<DashMap<PathBuf, CachedIndex>>,
33    config: SessionConfig,
34    watcher: Arc<Mutex<FileWatcher>>,
35    cleanup_handle: Option<JoinHandle<()>>,
36    /// `(stop_flag, wakeup_cvar)`: the mutex provides happens-before
37    /// between `stop_worker`'s store and the cleanup loop's read; the
38    /// Condvar guarantees no missed wakeup when `stop_worker` runs
39    /// concurrently with a `cvar.wait_for`. Replaces an earlier
40    /// `AtomicBool` + `thread::park_timeout` protocol that could miss a
41    /// shutdown signal under `Ordering::Relaxed` semantics, causing the
42    /// cleanup thread to sleep for the full `cleanup_interval` after the
43    /// shutdown store — up to one hour in tests that set
44    /// `cleanup_interval = Duration::from_secs(3600)`.
45    shutdown: Arc<(PlMutex<bool>, Condvar)>,
46    total_queries: Arc<AtomicU64>,
47    cache_hits: Arc<AtomicU64>,
48    cache_misses: Arc<AtomicU64>,
49    /// Query executor with properly configured plugin manager
50    query_executor: QueryExecutor,
51}
52
53impl SessionManager {
54    /// Create a session manager with default configuration and an empty plugin manager.
55    ///
56    /// **Note:** For proper query execution with language support, use
57    /// [`with_plugin_manager()`](Self::with_plugin_manager) instead, passing a
58    /// `PluginManager` configured with the language plugins you need.
59    ///
60    /// # Errors
61    ///
62    /// Returns [`SessionError`] when the background watcher thread cannot be spawned or when
63    /// watcher initialization fails.
64    pub fn new() -> SessionResult<Self> {
65        Self::with_config_and_plugins(SessionConfig::default(), PluginManager::new())
66    }
67
68    /// Create a session manager with a pre-configured plugin manager.
69    ///
70    /// This is the recommended constructor for production use. The `PluginManager`
71    /// should be created via `sqry_plugin_registry::create_plugin_manager()` or
72    /// configured with the language plugins you need for query evaluation.
73    ///
74    /// # Arguments
75    ///
76    /// * `plugin_manager` - A `PluginManager` configured with language plugins
77    ///
78    /// # Errors
79    ///
80    /// Returns [`SessionError`] when watcher initialization or thread spawning fails.
81    pub fn with_plugin_manager(plugin_manager: PluginManager) -> SessionResult<Self> {
82        Self::with_config_and_plugins(SessionConfig::default(), plugin_manager)
83    }
84
85    /// Create a session manager with the provided configuration.
86    ///
87    /// **Note:** This uses an empty plugin manager. For proper query execution,
88    /// use [`with_config_and_plugins()`](Self::with_config_and_plugins) instead.
89    ///
90    /// # Errors
91    ///
92    /// Returns [`SessionError`] when watcher initialization or thread spawning fails.
93    pub fn with_config(config: SessionConfig) -> SessionResult<Self> {
94        Self::with_config_and_plugins(config, PluginManager::new())
95    }
96
97    /// Create a session manager with configuration and a pre-configured plugin manager.
98    ///
99    /// This is the most flexible constructor, allowing full control over both
100    /// session configuration and plugin setup.
101    ///
102    /// # Arguments
103    ///
104    /// * `config` - Session configuration (cache size, timeouts, etc.)
105    /// * `plugin_manager` - A `PluginManager` configured with language plugins
106    ///
107    /// # Errors
108    ///
109    /// Returns [`SessionError`] when watcher initialization or thread spawning fails.
110    pub fn with_config_and_plugins(
111        config: SessionConfig,
112        plugin_manager: PluginManager,
113    ) -> SessionResult<Self> {
114        let watcher = if config.enable_file_watching {
115            FileWatcher::new()?
116        } else {
117            FileWatcher::disabled()
118        };
119
120        let cache = Arc::new(DashMap::new());
121        let watcher = Arc::new(Mutex::new(watcher));
122        let shutdown = Arc::new((PlMutex::new(false), Condvar::new()));
123
124        let cleanup_interval = config.cleanup_interval;
125        let idle_timeout = config.idle_timeout;
126
127        let cleanup_handle = {
128            let cache = Arc::clone(&cache);
129            let watcher_clone = Arc::clone(&watcher);
130            let shutdown_flag = Arc::clone(&shutdown);
131
132            thread::Builder::new()
133                .name("sqry-session-cleanup".into())
134                .spawn(move || {
135                    loop {
136                        if *shutdown_flag.0.lock() {
137                            break;
138                        }
139
140                        if let Ok(mut guard) = watcher_clone.lock()
141                            && let Err(err) = guard.process_events()
142                        {
143                            debug!("failed to process session watcher events: {err}");
144                        }
145
146                        let now = Instant::now();
147                        if let Err(err) =
148                            Self::evict_stale_for(&cache, &watcher_clone, idle_timeout, now)
149                        {
150                            debug!("failed to evict stale sessions: {err}");
151                        }
152
153                        // Atomic wait: take the lock, check stop, only
154                        // then suspend on the cvar. `stop_worker` sets
155                        // `*stop = true` under the same lock before
156                        // `notify_all`, so there is no missed-wakeup
157                        // window and no memory-ordering ambiguity.
158                        let mut stop = shutdown_flag.0.lock();
159                        if !*stop {
160                            shutdown_flag.1.wait_for(&mut stop, cleanup_interval);
161                        }
162                        if *stop {
163                            break;
164                        }
165                    }
166
167                    if let Ok(mut guard) = watcher_clone.lock()
168                        && let Err(err) = guard.process_events()
169                    {
170                        debug!("failed to process session watcher events during shutdown: {err}");
171                    }
172                    let now = Instant::now();
173                    if let Err(err) =
174                        Self::evict_stale_for(&cache, &watcher_clone, idle_timeout, now)
175                    {
176                        debug!("failed to evict stale sessions during shutdown: {err}");
177                    }
178                })
179                .map_err(SessionError::SpawnThread)?
180        };
181
182        // Create query executor with the provided plugin manager
183        let query_executor = QueryExecutor::with_plugin_manager(plugin_manager);
184
185        Ok(Self {
186            cache,
187            config,
188            watcher,
189            cleanup_handle: Some(cleanup_handle),
190            shutdown,
191            total_queries: Arc::new(AtomicU64::new(0)),
192            cache_hits: Arc::new(AtomicU64::new(0)),
193            cache_misses: Arc::new(AtomicU64::new(0)),
194            query_executor,
195        })
196    }
197
198    /// Get the cached code graph for `path`, loading it from disk on demand.
199    ///
200    /// # Errors
201    ///
202    /// Returns [`SessionError`] when the graph cannot be loaded.
203    pub fn get_graph(&self, path: &Path) -> SessionResult<Arc<CodeGraph>> {
204        self.get_or_load_graph(path)
205    }
206
207    /// Query symbols from the cached graph for `path`.
208    ///
209    /// This method loads the graph on demand and returns symbols that match
210    /// the query criteria. The query string is parsed and evaluated against
211    /// all symbols in the graph.
212    ///
213    /// # Errors
214    ///
215    /// Returns [`SessionError`] when the graph cannot be loaded, query parsing fails,
216    /// or query execution fails.
217    pub fn query(
218        &self,
219        path: &Path,
220        query_str: &str,
221    ) -> SessionResult<crate::query::results::QueryResults> {
222        // Use execute_on_graph which handles graph loading and caching internally
223        let results = self
224            .query_executor
225            .execute_on_graph(query_str, path)
226            .map_err(|e| {
227                // Determine if this is a parse error or execution error
228                let error_msg = e.to_string();
229                if error_msg.contains("parse")
230                    || error_msg.contains("unexpected")
231                    || error_msg.contains("expected")
232                {
233                    SessionError::QueryParse(error_msg)
234                } else {
235                    SessionError::QueryExecution(e)
236                }
237            })?;
238
239        Ok(results)
240    }
241
242    /// Remove the cached graph for `path`, if present.
243    ///
244    /// # Errors
245    ///
246    /// Returns [`SessionError`] when the file watcher fails to unwatch the path.
247    pub fn invalidate(&self, path: &Path) -> SessionResult<()> {
248        self.cache.remove(path);
249        if let Ok(mut watcher) = self.watcher.lock() {
250            watcher.unwatch(path)?;
251        }
252        Ok(())
253    }
254
255    /// Ensure the graph for `path` is loaded into the session cache without
256    /// affecting query statistics.
257    ///
258    /// # Errors
259    ///
260    /// Returns [`SessionError`] when loading from disk fails.
261    pub fn preload(&self, path: &Path) -> SessionResult<()> {
262        if self.cache.contains_key(path) {
263            return Ok(());
264        }
265
266        if self.cache.len() >= self.config.max_cached_indexes {
267            self.evict_lru();
268        }
269
270        self.load_graph_from_disk(path)?;
271        Ok(())
272    }
273
274    /// Return aggregate session statistics.
275    #[must_use]
276    pub fn stats(&self) -> SessionStats {
277        SessionStats {
278            cached_graphs: self.cache.len(),
279            total_queries: self.total_queries.load(Ordering::Relaxed),
280            cache_hits: self.cache_hits.load(Ordering::Relaxed),
281            cache_misses: self.cache_misses.load(Ordering::Relaxed),
282            total_memory_mb: self.cache.len() * 51,
283        }
284    }
285
286    /// Evict entries that have been idle longer than configured timeout.
287    ///
288    /// # Errors
289    ///
290    /// Returns [`SessionError`] when watcher operations fail while removing paths.
291    pub fn evict_stale(&self, now: Instant) -> SessionResult<usize> {
292        Self::evict_stale_for(&self.cache, &self.watcher, self.config.idle_timeout, now)
293    }
294
295    fn get_or_load_graph(&self, path: &Path) -> SessionResult<Arc<CodeGraph>> {
296        debug_assert!(
297            self.config.max_cached_indexes > 0,
298            "SessionConfig::max_cached_indexes must be at least 1"
299        );
300        self.total_queries.fetch_add(1, Ordering::Relaxed);
301
302        if let Some(entry) = self.cache.get_mut(path) {
303            entry.value().access();
304            self.cache_hits.fetch_add(1, Ordering::Relaxed);
305            return Ok(Arc::clone(&entry.value().graph));
306        }
307
308        self.cache_misses.fetch_add(1, Ordering::Relaxed);
309
310        if self.cache.len() >= self.config.max_cached_indexes {
311            self.evict_lru();
312        }
313
314        let graph = self.load_graph_from_disk(path)?;
315
316        if let Some(entry) = self.cache.get_mut(path) {
317            entry.value().access();
318        }
319
320        Ok(graph)
321    }
322
323    fn load_graph_from_disk(&self, path: &Path) -> SessionResult<Arc<CodeGraph>> {
324        let storage = GraphStorage::new(path);
325        let snapshot_path = storage.snapshot_path();
326
327        let metadata =
328            fs::metadata(snapshot_path).map_err(|source| SessionError::IndexMetadata {
329                path: snapshot_path.to_path_buf(),
330                source,
331            })?;
332        let file_mtime = metadata
333            .modified()
334            .map_err(|source| SessionError::IndexMetadata {
335                path: snapshot_path.to_path_buf(),
336                source,
337            })?;
338
339        let graph = load_from_path(snapshot_path, Some(self.query_executor.plugin_manager()))
340            .map_err(|source| SessionError::IndexLoad {
341                path: snapshot_path.to_path_buf(),
342                source: source.into(),
343            })?;
344        let arc_graph = Arc::new(graph);
345
346        let cached = CachedIndex::new(Arc::clone(&arc_graph), file_mtime);
347        self.cache.insert(path.to_path_buf(), cached);
348
349        self.register_watcher(path, storage.manifest_path());
350
351        Ok(arc_graph)
352    }
353
354    /// Register a filesystem watch for the workspace's
355    /// `.sqry/graph/manifest.json` so cache entries are invalidated when
356    /// the live build pipeline rewrites the manifest. The watcher's
357    /// `handle_event` filter matches only manifest writes (see
358    /// `session::watcher::FileWatcher::handle_event`), so the registered
359    /// path **must** be the manifest path; passing the snapshot path here
360    /// would silently disable invalidation because no callback would ever
361    /// match the filtered event path.
362    fn register_watcher(&self, workspace_path: &Path, manifest_path: &Path) {
363        if let Ok(mut watcher) = self.watcher.lock() {
364            let cache = Arc::clone(&self.cache);
365            let callback_path = workspace_path.to_path_buf();
366            let watch_path = manifest_path.to_path_buf();
367            if let Err(err) = watcher.watch(watch_path.clone(), move || {
368                cache.remove(&callback_path);
369                info!(
370                    "Invalidated session cache after graph change: {}",
371                    callback_path.display()
372                );
373            }) {
374                debug!(
375                    "failed to register file watcher for {}: {err}",
376                    watch_path.display()
377                );
378            }
379        }
380    }
381
382    /// Evict least-recently-used entry when cache is at capacity.
383    fn evict_lru(&self) {
384        let mut oldest: Option<(PathBuf, Instant)> = None;
385
386        for entry in self.cache.iter() {
387            let last_accessed = entry.value().last_accessed();
388            if oldest
389                .as_ref()
390                .is_none_or(|(_, instant)| last_accessed < *instant)
391            {
392                oldest = Some((entry.key().clone(), last_accessed));
393            }
394        }
395
396        if let Some((path, _)) = oldest {
397            self.cache.remove(&path);
398            debug!("evicted LRU session cache entry: {}", path.display());
399            if let Ok(mut watcher) = self.watcher.lock() {
400                // Must mirror `register_watcher` and pass the manifest path —
401                // unwatching a different path silently no-ops because the
402                // watcher's callback table is keyed on the registered path.
403                let storage = GraphStorage::new(&path);
404                if let Err(err) = watcher.unwatch(storage.manifest_path()) {
405                    debug!("failed to unwatch session path {}: {err}", path.display());
406                }
407            }
408        }
409    }
410
411    fn evict_stale_for(
412        cache: &Arc<DashMap<PathBuf, CachedIndex>>,
413        watcher: &Arc<Mutex<FileWatcher>>,
414        timeout: Duration,
415        now: Instant,
416    ) -> SessionResult<usize> {
417        let mut to_remove = Vec::new();
418
419        for entry in cache.iter() {
420            let idle = now.duration_since(entry.value().last_accessed());
421            if idle > timeout {
422                to_remove.push(entry.key().clone());
423            }
424        }
425
426        for path in &to_remove {
427            cache.remove(path);
428        }
429
430        if !to_remove.is_empty()
431            && let Ok(mut guard) = watcher.lock()
432        {
433            for path in &to_remove {
434                // Must match the path used in `register_watcher`
435                // (`.sqry/graph/manifest.json`), otherwise the watcher
436                // callback table never receives the unwatch and the entry
437                // leaks.
438                let storage = GraphStorage::new(path);
439                guard.unwatch(storage.manifest_path())?;
440            }
441        }
442
443        Ok(to_remove.len())
444    }
445
446    fn stop_worker(&mut self) {
447        {
448            let mut stop = self.shutdown.0.lock();
449            *stop = true;
450            self.shutdown.1.notify_all();
451        }
452
453        if let Some(handle) = self.cleanup_handle.take()
454            && let Err(err) = handle.join()
455        {
456            debug!("session cleanup thread terminated with error: {err:?}");
457        }
458    }
459
460    /// Gracefully shut down the cleanup thread. Optional, as [`Drop`] handles it.
461    ///
462    /// # Errors
463    ///
464    /// Returns [`SessionError`] when shutting down watcher operations fails.
465    pub fn shutdown(mut self) -> SessionResult<()> {
466        self.stop_worker();
467        Ok(())
468    }
469}
470
471impl Drop for SessionManager {
472    fn drop(&mut self) {
473        self.stop_worker();
474    }
475}
476
477/// High-level statistics describing the current session cache.
478#[derive(Debug, Clone)]
479pub struct SessionStats {
480    /// Number of cached graphs retained in-memory.
481    pub cached_graphs: usize,
482    /// Total number of graph accesses via this manager.
483    pub total_queries: u64,
484    /// Number of accesses served from cache.
485    pub cache_hits: u64,
486    /// Number of accesses that triggered a load from disk.
487    pub cache_misses: u64,
488    /// Estimated total memory footprint in megabytes.
489    pub total_memory_mb: usize,
490}
491
492#[cfg(test)]
493mod tests {
494    use super::*;
495    use crate::graph::unified::persistence::save_to_path;
496    use std::sync::Arc;
497    use std::thread;
498    use std::time::{Duration, Instant};
499    use tempfile::tempdir;
500
501    fn write_empty_graph(dir: &Path) -> SessionResult<()> {
502        let storage = GraphStorage::new(dir);
503        fs::create_dir_all(storage.graph_dir()).map_err(|source| SessionError::IndexMetadata {
504            path: storage.graph_dir().to_path_buf(),
505            source,
506        })?;
507        let graph = CodeGraph::new();
508        save_to_path(&graph, storage.snapshot_path()).map_err(|source| {
509            SessionError::IndexLoad {
510                path: storage.snapshot_path().to_path_buf(),
511                source: source.into(),
512            }
513        })?;
514        // Production callers register the file watcher against
515        // `<workspace>/.sqry/graph/manifest.json` — and `notify` requires
516        // the watch target to exist before `watch()` is called. The live
517        // build pipeline writes the manifest alongside the snapshot, so
518        // mirror that here for tests that depend on watcher registration.
519        fs::write(storage.manifest_path(), b"{}").map_err(|source| {
520            SessionError::IndexMetadata {
521                path: storage.manifest_path().to_path_buf(),
522                source,
523            }
524        })?;
525        Ok(())
526    }
527
528    fn watcher_timeout() -> Duration {
529        let base = if cfg!(target_os = "macos") {
530            Duration::from_secs(3)
531        } else {
532            Duration::from_secs(2)
533        };
534
535        if std::env::var("CI").is_ok() {
536            base * 2
537        } else {
538            base
539        }
540    }
541
542    fn background_timeout() -> Duration {
543        let base = if cfg!(target_os = "macos") {
544            Duration::from_secs(5)
545        } else {
546            Duration::from_secs(3)
547        };
548
549        if std::env::var("CI").is_ok() {
550            base * 2
551        } else {
552            base
553        }
554    }
555
556    fn wait_until<F>(timeout: Duration, mut predicate: F) -> bool
557    where
558        F: FnMut() -> bool,
559    {
560        let deadline = Instant::now() + timeout;
561        loop {
562            if predicate() {
563                return true;
564            }
565            if Instant::now() >= deadline {
566                return false;
567            }
568            thread::sleep(Duration::from_millis(50));
569        }
570    }
571
572    #[test]
573    fn get_graph_loads_and_updates_stats() {
574        let temp = tempdir().unwrap();
575        write_empty_graph(temp.path()).unwrap();
576
577        let manager = SessionManager::new().unwrap();
578
579        let graph = manager.get_graph(temp.path()).unwrap();
580        assert_eq!(graph.snapshot().nodes().len(), 0);
581
582        let stats = manager.stats();
583        assert_eq!(stats.total_queries, 1);
584        assert_eq!(stats.cache_misses, 1);
585        assert_eq!(stats.cache_hits, 0);
586        assert_eq!(stats.cached_graphs, 1);
587
588        manager.shutdown().unwrap();
589    }
590
591    #[test]
592    fn get_graph_missing_returns_error() {
593        let temp = tempdir().unwrap();
594
595        let manager = SessionManager::new().unwrap();
596        let err = manager
597            .get_graph(temp.path())
598            .expect_err("get_graph should fail without graph");
599
600        assert!(matches!(err, SessionError::IndexMetadata { .. }));
601        manager.shutdown().unwrap();
602    }
603
604    #[test]
605    fn preload_does_not_affect_stats() {
606        let temp = tempdir().unwrap();
607        write_empty_graph(temp.path()).unwrap();
608
609        let manager = SessionManager::new().unwrap();
610        manager.preload(temp.path()).unwrap();
611
612        let stats = manager.stats();
613        assert_eq!(stats.total_queries, 0);
614        assert_eq!(stats.cache_hits, 0);
615        assert_eq!(stats.cache_misses, 0);
616        assert_eq!(stats.cached_graphs, 1);
617
618        manager.get_graph(temp.path()).unwrap();
619        let after = manager.stats();
620        assert_eq!(after.total_queries, 1);
621        assert_eq!(after.cache_hits, 1);
622        assert_eq!(after.cache_misses, 0);
623        assert_eq!(after.cached_graphs, 1);
624
625        manager.shutdown().unwrap();
626    }
627
628    #[test]
629    fn second_access_hits_cache() {
630        let temp = tempdir().unwrap();
631        write_empty_graph(temp.path()).unwrap();
632
633        let manager = SessionManager::new().unwrap();
634
635        manager.get_graph(temp.path()).unwrap();
636        manager.get_graph(temp.path()).unwrap();
637
638        let stats = manager.stats();
639        assert_eq!(stats.total_queries, 2);
640        assert_eq!(stats.cache_misses, 1);
641        assert_eq!(stats.cache_hits, 1);
642        assert_eq!(stats.cached_graphs, 1);
643
644        manager.shutdown().unwrap();
645    }
646
647    #[test]
648    fn concurrent_access_shares_cache() {
649        let temp = tempdir().unwrap();
650        write_empty_graph(temp.path()).unwrap();
651
652        let manager = Arc::new(SessionManager::new().unwrap());
653        let path = temp.path().to_path_buf();
654
655        let handles: Vec<_> = (0..6)
656            .map(|_| {
657                let mgr = Arc::clone(&manager);
658                let path = path.clone();
659                thread::spawn(move || {
660                    mgr.get_graph(&path).unwrap();
661                })
662            })
663            .collect();
664
665        for handle in handles {
666            handle.join().unwrap();
667        }
668
669        let manager = Arc::into_inner(manager).expect("no outstanding references");
670        manager.shutdown().unwrap();
671    }
672
673    #[test]
674    fn invalidate_removes_cached_graph() {
675        let temp = tempdir().unwrap();
676        write_empty_graph(temp.path()).unwrap();
677
678        let manager = SessionManager::new().unwrap();
679        manager.get_graph(temp.path()).unwrap();
680        assert_eq!(manager.stats().cached_graphs, 1);
681
682        manager.invalidate(temp.path()).unwrap();
683        assert_eq!(manager.stats().cached_graphs, 0);
684
685        manager.shutdown().unwrap();
686    }
687
688    #[test]
689    fn lru_eviction_removes_oldest_entry() {
690        let temp = tempdir().unwrap();
691        let base = temp.path();
692
693        let config = SessionConfig {
694            max_cached_indexes: 2,
695            ..SessionConfig::default()
696        };
697        let manager = SessionManager::with_config(config).unwrap();
698
699        let repo1 = base.join("repo1");
700        let repo2 = base.join("repo2");
701        let repo3 = base.join("repo3");
702        write_empty_graph(&repo1).unwrap();
703        write_empty_graph(&repo2).unwrap();
704        write_empty_graph(&repo3).unwrap();
705
706        manager.get_graph(&repo1).unwrap();
707        manager.get_graph(&repo2).unwrap();
708
709        // Access repo2 again so repo1 stays LRU.
710        manager.get_graph(&repo2).unwrap();
711        manager.get_graph(&repo3).unwrap();
712
713        assert_eq!(manager.stats().cached_graphs, 2);
714        assert!(manager.cache.contains_key(&repo2));
715        assert!(manager.cache.contains_key(&repo3));
716        assert!(!manager.cache.contains_key(&repo1));
717
718        manager.shutdown().unwrap();
719    }
720
721    #[test]
722    fn evict_stale_purges_idle_entries() {
723        let temp = tempdir().unwrap();
724        write_empty_graph(temp.path()).unwrap();
725
726        let config = SessionConfig {
727            idle_timeout: Duration::from_millis(100),
728            cleanup_interval: Duration::from_secs(3600),
729            ..SessionConfig::default()
730        };
731        let manager = SessionManager::with_config(config).unwrap();
732
733        manager.get_graph(temp.path()).unwrap();
734        assert_eq!(manager.stats().cached_graphs, 1);
735
736        // Simulate last access in the past to avoid sleeping too long.
737        if let Some(entry) = manager.cache.get(temp.path()) {
738            entry.value().set_last_accessed(
739                Instant::now()
740                    .checked_sub(Duration::from_millis(200))
741                    .unwrap(),
742            );
743        }
744
745        let evicted = manager.evict_stale(Instant::now()).unwrap();
746        assert_eq!(evicted, 1);
747        assert_eq!(manager.stats().cached_graphs, 0);
748
749        manager.shutdown().unwrap();
750    }
751
752    #[test]
753    fn register_watcher_uses_manifest_path() {
754        // Regression test for STEP_3 codex iter2 BLOCK: production must
755        // register the watcher against `.sqry/graph/manifest.json` —
756        // the only path that `FileWatcher::handle_event` matches — not
757        // the snapshot path. Wiring the watcher to the snapshot path is
758        // a silent no-op because no event will ever satisfy the
759        // manifest filter.
760        let temp = tempdir().unwrap();
761        write_empty_graph(temp.path()).unwrap();
762
763        let manager = SessionManager::new().unwrap();
764        manager.get_graph(temp.path()).unwrap();
765
766        let storage = GraphStorage::new(temp.path());
767        let watched = manager
768            .watcher
769            .lock()
770            .expect("watcher lock poisoned in test")
771            .watched_paths();
772
773        assert!(
774            watched.contains(&storage.manifest_path().to_path_buf()),
775            "watcher must be registered for manifest path {} (registered: {:?})",
776            storage.manifest_path().display(),
777            watched,
778        );
779        assert!(
780            !watched.contains(&storage.snapshot_path().to_path_buf()),
781            "watcher must NOT be registered for snapshot path {} (registered: {:?})",
782            storage.snapshot_path().display(),
783            watched,
784        );
785
786        manager.shutdown().unwrap();
787    }
788
789    #[test]
790    fn evict_lru_unwatches_manifest_path() {
791        // Regression test: LRU eviction's `unwatch` call must mirror
792        // `register_watcher` and target the manifest path. If the paths
793        // diverge the unwatch silently no-ops and the watcher leaks.
794        let temp = tempdir().unwrap();
795        let base = temp.path();
796
797        let config = SessionConfig {
798            max_cached_indexes: 1,
799            ..SessionConfig::default()
800        };
801        let manager = SessionManager::with_config(config).unwrap();
802
803        let repo1 = base.join("repo1");
804        let repo2 = base.join("repo2");
805        write_empty_graph(&repo1).unwrap();
806        write_empty_graph(&repo2).unwrap();
807
808        manager.get_graph(&repo1).unwrap();
809        manager.get_graph(&repo2).unwrap(); // evicts repo1
810
811        let watched = manager
812            .watcher
813            .lock()
814            .expect("watcher lock poisoned in test")
815            .watched_paths();
816
817        let repo1_manifest = GraphStorage::new(&repo1).manifest_path().to_path_buf();
818        let repo2_manifest = GraphStorage::new(&repo2).manifest_path().to_path_buf();
819        assert!(
820            !watched.contains(&repo1_manifest),
821            "evicted workspace's manifest watch must be released; still watching: {watched:?}",
822        );
823        assert!(
824            watched.contains(&repo2_manifest),
825            "current workspace must remain watched; registered: {watched:?}",
826        );
827
828        manager.shutdown().unwrap();
829    }
830
831    #[test]
832    #[ignore = "flaky: timing-sensitive file watcher test"]
833    fn file_changes_trigger_invalidation() {
834        let temp = tempdir().unwrap();
835        write_empty_graph(temp.path()).unwrap();
836
837        let manager = SessionManager::new().unwrap();
838        manager.get_graph(temp.path()).unwrap();
839        assert!(manager.cache.contains_key(temp.path()));
840
841        // The watcher matches writes to `.sqry/graph/manifest.json` —
842        // touching the snapshot file would never fire a callback.
843        let storage = GraphStorage::new(temp.path());
844        std::fs::write(storage.manifest_path(), b"modified").unwrap();
845
846        let evicted = wait_until(watcher_timeout(), || {
847            manager
848                .watcher
849                .lock()
850                .expect("watcher lock poisoned in test")
851                .process_events()
852                .expect("watcher failed to process events in test");
853            !manager.cache.contains_key(temp.path())
854        });
855        assert!(evicted, "expected watcher to invalidate cache entry");
856
857        manager.shutdown().unwrap();
858    }
859
860    #[test]
861    #[ignore = "flaky: timing-sensitive background thread test"]
862    fn background_thread_processes_watcher_events() {
863        let temp = tempdir().unwrap();
864        write_empty_graph(temp.path()).unwrap();
865
866        let config = SessionConfig {
867            cleanup_interval: Duration::from_millis(50),
868            ..SessionConfig::default()
869        };
870        let manager = SessionManager::with_config(config).unwrap();
871
872        manager.get_graph(temp.path()).unwrap();
873        let storage = GraphStorage::new(temp.path());
874        std::fs::write(storage.manifest_path(), b"changed").unwrap();
875
876        let evicted = wait_until(background_timeout(), || {
877            !manager.cache.contains_key(temp.path())
878        });
879        assert!(
880            evicted,
881            "background thread failed to remove watcher-invalidated entry"
882        );
883
884        manager.shutdown().unwrap();
885    }
886
887    #[test]
888    fn background_thread_evicts_idle_entries() {
889        let temp = tempdir().unwrap();
890        write_empty_graph(temp.path()).unwrap();
891
892        let config = SessionConfig {
893            idle_timeout: Duration::from_millis(50),
894            cleanup_interval: Duration::from_millis(30),
895            ..SessionConfig::default()
896        };
897        let manager = SessionManager::with_config(config).unwrap();
898
899        manager.get_graph(temp.path()).unwrap();
900        assert_eq!(manager.stats().cached_graphs, 1);
901
902        if let Some(entry) = manager.cache.get(temp.path()) {
903            entry.value().set_last_accessed(
904                Instant::now()
905                    .checked_sub(Duration::from_millis(200))
906                    .unwrap(),
907            );
908        }
909
910        let evicted = wait_until(background_timeout(), || {
911            !manager.cache.contains_key(temp.path())
912        });
913        assert!(
914            evicted,
915            "background eviction thread did not remove idle entry"
916        );
917
918        manager.shutdown().unwrap();
919    }
920}