Skip to main content

sqry_core/session/
manager.rs

1//! Session cache manager for warm multi-query execution.
2//!
3//! The implementation keeps recently-used unified code graphs in-memory
4//! so subsequent queries avoid disk deserialization. Filesystem watching
5//! ensures cache entries are invalidated automatically when the on-disk
6//! graph changes. A background maintenance thread processes file watcher
7//! events and reclaims idle cache entries.
8
9use std::fs;
10use std::path::{Path, PathBuf};
11use std::sync::Arc;
12use std::sync::Mutex;
13use std::sync::atomic::{AtomicU64, Ordering};
14use std::thread::{self, JoinHandle};
15use std::time::{Duration, Instant};
16
17use dashmap::DashMap;
18use log::{debug, info};
19use parking_lot::{Condvar, Mutex as PlMutex};
20
21use crate::graph::CodeGraph;
22use crate::graph::unified::persistence::{GraphStorage, load_from_path};
23use crate::plugin::PluginManager;
24use crate::query::QueryExecutor;
25
26use super::cache::CachedIndex;
27use super::watcher::FileWatcher;
28use super::{SessionConfig, SessionError, SessionResult};
29
/// Manages cached unified code graphs for a workspace session.
///
/// Construction spawns a background cleanup thread; that thread is signalled
/// and joined by `stop_worker`, which runs from both `shutdown` and `Drop`.
pub struct SessionManager {
    /// Loaded graphs keyed by the canonicalized workspace path.
    cache: Arc<DashMap<PathBuf, CachedIndex>>,
    /// Session configuration supplied at construction time.
    config: SessionConfig,
    /// File watcher shared with the cleanup thread.
    watcher: Arc<Mutex<FileWatcher>>,
    /// Per-key locks that serialize concurrent loads of the same workspace.
    /// Entries are reclaimed by `LoadLockCleanup` once a load attempt ends.
    load_locks: Arc<DashMap<PathBuf, Arc<PlMutex<()>>>>,
    /// Join handle of the cleanup thread; taken (left as `None`) once joined.
    cleanup_handle: Option<JoinHandle<()>>,
    /// `(stop_flag, wakeup_cvar)`: the mutex provides happens-before
    /// between `stop_worker`'s store and the cleanup loop's read; the
    /// Condvar guarantees no missed wakeup when `stop_worker` runs
    /// concurrently with a `cvar.wait_for`. Replaces an earlier
    /// `AtomicBool` + `thread::park_timeout` protocol that could miss a
    /// shutdown signal under `Ordering::Relaxed` semantics, causing the
    /// cleanup thread to sleep for the full `cleanup_interval` after the
    /// shutdown store — up to one hour in tests that set
    /// `cleanup_interval = Duration::from_secs(3600)`.
    shutdown: Arc<(PlMutex<bool>, Condvar)>,
    /// Graph accesses counted after the path was canonicalized successfully.
    total_queries: Arc<AtomicU64>,
    /// Accesses answered from the in-memory cache.
    cache_hits: Arc<AtomicU64>,
    /// Accesses that had to fall through to a disk load attempt.
    cache_misses: Arc<AtomicU64>,
    /// Successful snapshot loads from disk (preloads included).
    disk_loads: Arc<AtomicU64>,
    /// Query executor with properly configured plugin manager
    query_executor: QueryExecutor,
}
54
/// Drop guard that removes a per-key load lock from the shared lock map once
/// the load attempt that created it finishes (successfully or not).
struct LoadLockCleanup {
    /// Shared map of in-flight load locks.
    load_locks: Arc<DashMap<PathBuf, Arc<PlMutex<()>>>>,
    /// Key whose lock entry this guard is responsible for.
    cache_key: PathBuf,
    /// The exact lock instance this guard may remove (compared by pointer).
    load_lock: Arc<PlMutex<()>>,
}
60
61impl LoadLockCleanup {
62    fn new(
63        load_locks: Arc<DashMap<PathBuf, Arc<PlMutex<()>>>>,
64        cache_key: PathBuf,
65        load_lock: Arc<PlMutex<()>>,
66    ) -> Self {
67        Self {
68            load_locks,
69            cache_key,
70            load_lock,
71        }
72    }
73}
74
75impl Drop for LoadLockCleanup {
76    fn drop(&mut self) {
77        self.load_locks
78            .remove_if(&self.cache_key, |_, current_lock| {
79                Arc::ptr_eq(current_lock, &self.load_lock)
80            });
81    }
82}
83
84impl SessionManager {
85    /// Create a session manager with default configuration and an empty plugin manager.
86    ///
87    /// **Note:** For proper query execution with language support, use
88    /// [`with_plugin_manager()`](Self::with_plugin_manager) instead, passing a
89    /// `PluginManager` configured with the language plugins you need.
90    ///
91    /// # Errors
92    ///
93    /// Returns [`SessionError`] when the background watcher thread cannot be spawned or when
94    /// watcher initialization fails.
95    pub fn new() -> SessionResult<Self> {
96        Self::with_config_and_plugins(SessionConfig::default(), PluginManager::new())
97    }
98
99    /// Create a session manager with a pre-configured plugin manager.
100    ///
101    /// This is the recommended constructor for production use. The `PluginManager`
102    /// should be created via `sqry_plugin_registry::create_plugin_manager()` or
103    /// configured with the language plugins you need for query evaluation.
104    ///
105    /// # Arguments
106    ///
107    /// * `plugin_manager` - A `PluginManager` configured with language plugins
108    ///
109    /// # Errors
110    ///
111    /// Returns [`SessionError`] when watcher initialization or thread spawning fails.
112    pub fn with_plugin_manager(plugin_manager: PluginManager) -> SessionResult<Self> {
113        Self::with_config_and_plugins(SessionConfig::default(), plugin_manager)
114    }
115
116    /// Create a session manager with the provided configuration.
117    ///
118    /// **Note:** This uses an empty plugin manager. For proper query execution,
119    /// use [`with_config_and_plugins()`](Self::with_config_and_plugins) instead.
120    ///
121    /// # Errors
122    ///
123    /// Returns [`SessionError`] when watcher initialization or thread spawning fails.
124    pub fn with_config(config: SessionConfig) -> SessionResult<Self> {
125        Self::with_config_and_plugins(config, PluginManager::new())
126    }
127
    /// Create a session manager with configuration and a pre-configured plugin manager.
    ///
    /// This is the most flexible constructor, allowing full control over both
    /// session configuration and plugin setup. It also spawns the
    /// `sqry-session-cleanup` thread, which repeatedly processes watcher
    /// events and evicts idle cache entries until shutdown is requested.
    ///
    /// # Arguments
    ///
    /// * `config` - Session configuration (cache size, timeouts, etc.)
    /// * `plugin_manager` - A `PluginManager` configured with language plugins
    ///
    /// # Errors
    ///
    /// Returns [`SessionError`] when watcher initialization or thread spawning fails.
    pub fn with_config_and_plugins(
        config: SessionConfig,
        plugin_manager: PluginManager,
    ) -> SessionResult<Self> {
        // A disabled watcher keeps the rest of the code path uniform when
        // file watching is turned off in the configuration.
        let watcher = if config.enable_file_watching {
            FileWatcher::new()?
        } else {
            FileWatcher::disabled()
        };

        let cache = Arc::new(DashMap::new());
        let load_locks = Arc::new(DashMap::new());
        let watcher = Arc::new(Mutex::new(watcher));
        let shutdown = Arc::new((PlMutex::new(false), Condvar::new()));

        // Copied out so the worker closure does not need to own `config`.
        let cleanup_interval = config.cleanup_interval;
        let idle_timeout = config.idle_timeout;

        let cleanup_handle = {
            let cache = Arc::clone(&cache);
            let watcher_clone = Arc::clone(&watcher);
            let shutdown_flag = Arc::clone(&shutdown);

            thread::Builder::new()
                .name("sqry-session-cleanup".into())
                .spawn(move || {
                    loop {
                        // Fast exit when shutdown was requested before this pass.
                        if *shutdown_flag.0.lock() {
                            break;
                        }

                        // Best effort: a poisoned watcher mutex is skipped and
                        // processing errors are only logged.
                        if let Ok(mut guard) = watcher_clone.lock()
                            && let Err(err) = guard.process_events()
                        {
                            debug!("failed to process session watcher events: {err}");
                        }

                        let now = Instant::now();
                        if let Err(err) =
                            Self::evict_stale_for(&cache, &watcher_clone, idle_timeout, now)
                        {
                            debug!("failed to evict stale sessions: {err}");
                        }

                        // Atomic wait: take the lock, check stop, only
                        // then suspend on the cvar. `stop_worker` sets
                        // `*stop = true` under the same lock before
                        // `notify_all`, so there is no missed-wakeup
                        // window and no memory-ordering ambiguity.
                        let mut stop = shutdown_flag.0.lock();
                        if !*stop {
                            shutdown_flag.1.wait_for(&mut stop, cleanup_interval);
                        }
                        if *stop {
                            break;
                        }
                    }

                    // Final maintenance pass after the stop signal, so pending
                    // watcher events and idle entries are handled before exit.
                    if let Ok(mut guard) = watcher_clone.lock()
                        && let Err(err) = guard.process_events()
                    {
                        debug!("failed to process session watcher events during shutdown: {err}");
                    }
                    let now = Instant::now();
                    if let Err(err) =
                        Self::evict_stale_for(&cache, &watcher_clone, idle_timeout, now)
                    {
                        debug!("failed to evict stale sessions during shutdown: {err}");
                    }
                })
                .map_err(SessionError::SpawnThread)?
        };

        // Create query executor with the provided plugin manager
        let query_executor = QueryExecutor::with_plugin_manager(plugin_manager);

        Ok(Self {
            cache,
            config,
            watcher,
            load_locks,
            cleanup_handle: Some(cleanup_handle),
            shutdown,
            total_queries: Arc::new(AtomicU64::new(0)),
            cache_hits: Arc::new(AtomicU64::new(0)),
            cache_misses: Arc::new(AtomicU64::new(0)),
            disk_loads: Arc::new(AtomicU64::new(0)),
            query_executor,
        })
    }
231
232    /// Get the cached code graph for `path`, loading it from disk on demand.
233    ///
234    /// # Errors
235    ///
236    /// Returns [`SessionError`] when the graph cannot be loaded.
237    pub fn get_graph(&self, path: &Path) -> SessionResult<Arc<CodeGraph>> {
238        self.get_or_load_graph(path)
239    }
240
241    /// Query symbols from the cached graph for `path`.
242    ///
243    /// This method loads the graph on demand and returns symbols that match
244    /// the query criteria. The query string is parsed and evaluated against
245    /// all symbols in the graph.
246    ///
247    /// # Errors
248    ///
249    /// Returns [`SessionError`] when the graph cannot be loaded, query parsing fails,
250    /// or query execution fails.
251    pub fn query(
252        &self,
253        path: &Path,
254        query_str: &str,
255    ) -> SessionResult<crate::query::results::QueryResults> {
256        self.query_executor
257            .parse_query_ast(query_str)
258            .map_err(|err| SessionError::QueryParse(err.to_string()))?;
259
260        let graph = self.get_or_load_graph(path)?;
261
262        let results = self
263            .query_executor
264            .execute_on_preloaded_graph(graph, query_str, path, None)
265            .map_err(Self::map_query_error)?;
266
267        Ok(results)
268    }
269
270    /// Remove the cached graph for `path`, if present.
271    ///
272    /// # Errors
273    ///
274    /// Returns [`SessionError`] when the file watcher fails to unwatch the path.
275    pub fn invalidate(&self, path: &Path) -> SessionResult<()> {
276        let cache_key = Self::canonical_workspace_key(path)?;
277        self.cache.remove(&cache_key);
278        self.load_locks.remove(&cache_key);
279        Self::unwatch_manifest_for(&self.watcher, &cache_key)?;
280        Ok(())
281    }
282
283    /// Ensure the graph for `path` is loaded into the session cache without
284    /// affecting query statistics.
285    ///
286    /// # Errors
287    ///
288    /// Returns [`SessionError`] when loading from disk fails.
289    pub fn preload(&self, path: &Path) -> SessionResult<()> {
290        let cache_key = Self::canonical_workspace_key(path)?;
291        if self.cache.contains_key(&cache_key) {
292            return Ok(());
293        }
294
295        let load_lock = self.load_lock_for(&cache_key);
296        let _load_guard = load_lock.lock();
297        let _cleanup = LoadLockCleanup::new(
298            Arc::clone(&self.load_locks),
299            cache_key.clone(),
300            Arc::clone(&load_lock),
301        );
302
303        if self.cache.contains_key(&cache_key) {
304            return Ok(());
305        }
306
307        if self.cache.len() >= self.config.max_cached_indexes {
308            self.evict_lru();
309        }
310
311        self.load_graph_from_disk(&cache_key)?;
312        Ok(())
313    }
314
315    /// Return aggregate session statistics.
316    #[must_use]
317    pub fn stats(&self) -> SessionStats {
318        SessionStats {
319            cached_graphs: self.cache.len(),
320            total_queries: self.total_queries.load(Ordering::Relaxed),
321            cache_hits: self.cache_hits.load(Ordering::Relaxed),
322            cache_misses: self.cache_misses.load(Ordering::Relaxed),
323            total_memory_mb: self.cache.len() * 51,
324        }
325    }
326
327    /// Evict entries that have been idle longer than configured timeout.
328    ///
329    /// # Errors
330    ///
331    /// Returns [`SessionError`] when watcher operations fail while removing paths.
332    pub fn evict_stale(&self, now: Instant) -> SessionResult<usize> {
333        Self::evict_stale_for(&self.cache, &self.watcher, self.config.idle_timeout, now)
334    }
335
336    fn get_or_load_graph(&self, path: &Path) -> SessionResult<Arc<CodeGraph>> {
337        debug_assert!(
338            self.config.max_cached_indexes > 0,
339            "SessionConfig::max_cached_indexes must be at least 1"
340        );
341        let cache_key = Self::canonical_workspace_key(path)?;
342        self.total_queries.fetch_add(1, Ordering::Relaxed);
343
344        if let Some(entry) = self.cache.get_mut(&cache_key) {
345            entry.value().access();
346            self.cache_hits.fetch_add(1, Ordering::Relaxed);
347            return Ok(Arc::clone(&entry.value().graph));
348        }
349
350        let load_lock = self.load_lock_for(&cache_key);
351        let _load_guard = load_lock.lock();
352        let _cleanup = LoadLockCleanup::new(
353            Arc::clone(&self.load_locks),
354            cache_key.clone(),
355            Arc::clone(&load_lock),
356        );
357
358        if let Some(entry) = self.cache.get_mut(&cache_key) {
359            entry.value().access();
360            self.cache_hits.fetch_add(1, Ordering::Relaxed);
361            return Ok(Arc::clone(&entry.value().graph));
362        }
363
364        self.cache_misses.fetch_add(1, Ordering::Relaxed);
365
366        if self.cache.len() >= self.config.max_cached_indexes {
367            self.evict_lru();
368        }
369
370        let graph = self.load_graph_from_disk(&cache_key)?;
371
372        if let Some(entry) = self.cache.get_mut(&cache_key) {
373            entry.value().access();
374        }
375
376        Ok(graph)
377    }
378
379    fn load_lock_for(&self, cache_key: &Path) -> Arc<PlMutex<()>> {
380        Arc::clone(
381            &self
382                .load_locks
383                .entry(cache_key.to_path_buf())
384                .or_insert_with(|| Arc::new(PlMutex::new(()))),
385        )
386    }
387
388    fn load_graph_from_disk(&self, path: &Path) -> SessionResult<Arc<CodeGraph>> {
389        let storage = GraphStorage::new(path);
390        let snapshot_path = storage.snapshot_path();
391
392        let metadata =
393            fs::metadata(snapshot_path).map_err(|source| SessionError::IndexMetadata {
394                path: snapshot_path.to_path_buf(),
395                source,
396            })?;
397        let file_mtime = metadata
398            .modified()
399            .map_err(|source| SessionError::IndexMetadata {
400                path: snapshot_path.to_path_buf(),
401                source,
402            })?;
403
404        let graph = load_from_path(snapshot_path, Some(self.query_executor.plugin_manager()))
405            .map_err(|source| SessionError::IndexLoad {
406                path: snapshot_path.to_path_buf(),
407                source: source.into(),
408            })?;
409        self.disk_loads.fetch_add(1, Ordering::Relaxed);
410        let arc_graph = Arc::new(graph);
411
412        let cached = CachedIndex::new(Arc::clone(&arc_graph), file_mtime);
413        self.cache.insert(path.to_path_buf(), cached);
414
415        self.register_watcher(path, storage.manifest_path());
416
417        Ok(arc_graph)
418    }
419
420    /// Register a filesystem watch for the workspace's
421    /// `.sqry/graph/manifest.json` so cache entries are invalidated when
422    /// the live build pipeline rewrites the manifest. The watcher's
423    /// `handle_event` filter matches only manifest writes (see
424    /// `session::watcher::FileWatcher::handle_event`), so the registered
425    /// path **must** be the manifest path; passing the snapshot path here
426    /// would silently disable invalidation because no callback would ever
427    /// match the filtered event path.
428    fn register_watcher(&self, workspace_path: &Path, manifest_path: &Path) {
429        if let Ok(mut watcher) = self.watcher.lock() {
430            let cache = Arc::clone(&self.cache);
431            let callback_path = workspace_path.to_path_buf();
432            let watch_path = manifest_path.to_path_buf();
433            if let Err(err) = watcher.watch(watch_path.clone(), move || {
434                cache.remove(&callback_path);
435                info!(
436                    "Invalidated session cache after graph change: {}",
437                    callback_path.display()
438                );
439            }) {
440                debug!(
441                    "failed to register file watcher for {}: {err}",
442                    watch_path.display()
443                );
444            }
445        }
446    }
447
448    /// Evict least-recently-used entry when cache is at capacity.
449    fn evict_lru(&self) {
450        let mut oldest: Option<(PathBuf, Instant)> = None;
451
452        for entry in self.cache.iter() {
453            let last_accessed = entry.value().last_accessed();
454            if oldest
455                .as_ref()
456                .is_none_or(|(_, instant)| last_accessed < *instant)
457            {
458                oldest = Some((entry.key().clone(), last_accessed));
459            }
460        }
461
462        if let Some((path, _)) = oldest {
463            self.cache.remove(&path);
464            debug!("evicted LRU session cache entry: {}", path.display());
465            if let Err(err) = Self::unwatch_manifest_for(&self.watcher, &path) {
466                debug!("failed to unwatch session path {}: {err}", path.display());
467            }
468        }
469    }
470
471    fn evict_stale_for(
472        cache: &Arc<DashMap<PathBuf, CachedIndex>>,
473        watcher: &Arc<Mutex<FileWatcher>>,
474        timeout: Duration,
475        now: Instant,
476    ) -> SessionResult<usize> {
477        let mut to_remove = Vec::new();
478
479        for entry in cache.iter() {
480            let idle = now.duration_since(entry.value().last_accessed());
481            if idle > timeout {
482                to_remove.push(entry.key().clone());
483            }
484        }
485
486        for path in &to_remove {
487            cache.remove(path);
488        }
489
490        if !to_remove.is_empty()
491            && let Ok(mut guard) = watcher.lock()
492        {
493            for path in &to_remove {
494                // Must match the path used in `register_watcher`
495                // (`.sqry/graph/manifest.json`), otherwise the watcher
496                // callback table never receives the unwatch and the entry
497                // leaks.
498                let manifest_path = Self::manifest_path_for_key(path);
499                guard.unwatch(&manifest_path)?;
500            }
501        }
502
503        Ok(to_remove.len())
504    }
505
506    fn canonical_workspace_key(path: &Path) -> SessionResult<PathBuf> {
507        path.canonicalize()
508            .map_err(|source| SessionError::IndexMetadata {
509                path: path.to_path_buf(),
510                source,
511            })
512    }
513
514    fn manifest_path_for_key(cache_key: &Path) -> PathBuf {
515        GraphStorage::new(cache_key).manifest_path().to_path_buf()
516    }
517
518    fn unwatch_manifest_for(
519        watcher: &Arc<Mutex<FileWatcher>>,
520        cache_key: &Path,
521    ) -> SessionResult<()> {
522        if let Ok(mut guard) = watcher.lock() {
523            let manifest_path = Self::manifest_path_for_key(cache_key);
524            guard.unwatch(&manifest_path)?;
525        }
526        Ok(())
527    }
528
529    fn map_query_error(err: crate::Error) -> SessionError {
530        let error_msg = err.to_string();
531        if error_msg.contains("parse")
532            || error_msg.contains("unexpected")
533            || error_msg.contains("expected")
534        {
535            SessionError::QueryParse(error_msg)
536        } else {
537            SessionError::QueryExecution(err)
538        }
539    }
540
541    fn stop_worker(&mut self) {
542        {
543            let mut stop = self.shutdown.0.lock();
544            *stop = true;
545            self.shutdown.1.notify_all();
546        }
547
548        if let Some(handle) = self.cleanup_handle.take()
549            && let Err(err) = handle.join()
550        {
551            debug!("session cleanup thread terminated with error: {err:?}");
552        }
553    }
554
    /// Gracefully shut down the cleanup thread. Optional, as [`Drop`] handles it.
    ///
    /// Consumes the manager: the worker is signalled and joined here, and the
    /// subsequent [`Drop`] run finds no join handle left to wait on.
    ///
    /// # Errors
    ///
    /// The current implementation always returns `Ok(())`; the
    /// [`SessionResult`] return type is kept so callers can keep propagating
    /// with `?`.
    pub fn shutdown(mut self) -> SessionResult<()> {
        self.stop_worker();
        Ok(())
    }
564}
565
566impl Drop for SessionManager {
567    fn drop(&mut self) {
568        self.stop_worker();
569    }
570}
571
/// High-level statistics describing the current session cache.
///
/// Plain data with value semantics: snapshots can be compared with `==`
/// (handy in tests) and a zeroed snapshot is available via `Default`.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct SessionStats {
    /// Number of cached graphs retained in-memory.
    pub cached_graphs: usize,
    /// Total number of graph accesses via this manager.
    pub total_queries: u64,
    /// Number of accesses served from cache.
    pub cache_hits: u64,
    /// Number of accesses that triggered a load from disk.
    pub cache_misses: u64,
    /// Estimated total memory footprint in megabytes.
    pub total_memory_mb: usize,
}
586
587#[cfg(test)]
588mod tests {
589    use super::*;
590    use crate::graph::unified::persistence::save_to_path;
591    use serial_test::serial;
592    use std::sync::{Arc, Barrier};
593    use std::thread;
594    use std::time::{Duration, Instant};
595    use tempfile::tempdir;
596
597    fn write_empty_graph(dir: &Path) -> SessionResult<()> {
598        let storage = GraphStorage::new(dir);
599        fs::create_dir_all(storage.graph_dir()).map_err(|source| SessionError::IndexMetadata {
600            path: storage.graph_dir().to_path_buf(),
601            source,
602        })?;
603        let graph = CodeGraph::new();
604        save_to_path(&graph, storage.snapshot_path()).map_err(|source| {
605            SessionError::IndexLoad {
606                path: storage.snapshot_path().to_path_buf(),
607                source: source.into(),
608            }
609        })?;
610        // Production callers register the file watcher against
611        // `<workspace>/.sqry/graph/manifest.json` — and `notify` requires
612        // the watch target to exist before `watch()` is called. The live
613        // build pipeline writes the manifest alongside the snapshot, so
614        // mirror that here for tests that depend on watcher registration.
615        fs::write(storage.manifest_path(), b"{}").map_err(|source| {
616            SessionError::IndexMetadata {
617                path: storage.manifest_path().to_path_buf(),
618                source,
619            }
620        })?;
621        Ok(())
622    }
623
624    fn watcher_timeout() -> Duration {
625        let base = if cfg!(target_os = "macos") {
626            Duration::from_secs(3)
627        } else {
628            Duration::from_secs(2)
629        };
630
631        if std::env::var("CI").is_ok() {
632            base * 2
633        } else {
634            base
635        }
636    }
637
638    fn background_timeout() -> Duration {
639        let base = if cfg!(target_os = "macos") {
640            Duration::from_secs(5)
641        } else {
642            Duration::from_secs(3)
643        };
644
645        if std::env::var("CI").is_ok() {
646            base * 2
647        } else {
648            base
649        }
650    }
651
652    fn wait_until<F>(timeout: Duration, mut predicate: F) -> bool
653    where
654        F: FnMut() -> bool,
655    {
656        let deadline = Instant::now() + timeout;
657        loop {
658            if predicate() {
659                return true;
660            }
661            if Instant::now() >= deadline {
662                return false;
663            }
664            thread::sleep(Duration::from_millis(50));
665        }
666    }
667
668    fn canonical(path: &Path) -> PathBuf {
669        path.canonicalize().unwrap()
670    }
671
672    #[test]
673    fn get_graph_loads_and_updates_stats() {
674        let temp = tempdir().unwrap();
675        write_empty_graph(temp.path()).unwrap();
676
677        let manager = SessionManager::new().unwrap();
678
679        let graph = manager.get_graph(temp.path()).unwrap();
680        assert_eq!(graph.snapshot().nodes().len(), 0);
681
682        let stats = manager.stats();
683        assert_eq!(stats.total_queries, 1);
684        assert_eq!(stats.cache_misses, 1);
685        assert_eq!(stats.cache_hits, 0);
686        assert_eq!(stats.cached_graphs, 1);
687
688        manager.shutdown().unwrap();
689    }
690
691    #[test]
692    fn get_graph_missing_returns_error() {
693        let temp = tempdir().unwrap();
694
695        let manager = SessionManager::new().unwrap();
696        let err = manager
697            .get_graph(temp.path())
698            .expect_err("get_graph should fail without graph");
699
700        assert!(matches!(err, SessionError::IndexMetadata { .. }));
701        manager.shutdown().unwrap();
702    }
703
704    #[test]
705    fn preload_does_not_affect_stats() {
706        let temp = tempdir().unwrap();
707        write_empty_graph(temp.path()).unwrap();
708
709        let manager = SessionManager::new().unwrap();
710        manager.preload(temp.path()).unwrap();
711
712        let stats = manager.stats();
713        assert_eq!(stats.total_queries, 0);
714        assert_eq!(stats.cache_hits, 0);
715        assert_eq!(stats.cache_misses, 0);
716        assert_eq!(stats.cached_graphs, 1);
717
718        manager.get_graph(temp.path()).unwrap();
719        let after = manager.stats();
720        assert_eq!(after.total_queries, 1);
721        assert_eq!(after.cache_hits, 1);
722        assert_eq!(after.cache_misses, 0);
723        assert_eq!(after.cached_graphs, 1);
724
725        manager.shutdown().unwrap();
726    }
727
728    #[test]
729    fn query_records_session_cache_miss_then_hit() {
730        let temp = tempdir().unwrap();
731        write_empty_graph(temp.path()).unwrap();
732
733        let manager = SessionManager::new().unwrap();
734
735        let first = manager.query(temp.path(), "kind:function").unwrap();
736        assert_eq!(first.len(), 0);
737        let after_first = manager.stats();
738        assert_eq!(after_first.total_queries, 1);
739        assert_eq!(after_first.cache_misses, 1);
740        assert_eq!(after_first.cache_hits, 0);
741        assert_eq!(after_first.cached_graphs, 1);
742
743        let second = manager.query(temp.path(), "kind:function").unwrap();
744        assert_eq!(second.len(), 0);
745        let after_second = manager.stats();
746        assert_eq!(after_second.total_queries, 2);
747        assert_eq!(after_second.cache_misses, 1);
748        assert_eq!(after_second.cache_hits, 1);
749        assert_eq!(after_second.cached_graphs, 1);
750
751        manager.shutdown().unwrap();
752    }
753
754    #[test]
755    fn preload_warms_query_cache() {
756        let temp = tempdir().unwrap();
757        write_empty_graph(temp.path()).unwrap();
758
759        let manager = SessionManager::new().unwrap();
760        manager.preload(temp.path()).unwrap();
761
762        let results = manager.query(temp.path(), "kind:function").unwrap();
763        assert_eq!(results.len(), 0);
764
765        let stats = manager.stats();
766        assert_eq!(stats.total_queries, 1);
767        assert_eq!(stats.cache_misses, 0);
768        assert_eq!(stats.cache_hits, 1);
769        assert_eq!(stats.cached_graphs, 1);
770
771        manager.shutdown().unwrap();
772    }
773
774    #[test]
775    fn second_access_hits_cache() {
776        let temp = tempdir().unwrap();
777        write_empty_graph(temp.path()).unwrap();
778
779        let manager = SessionManager::new().unwrap();
780
781        manager.get_graph(temp.path()).unwrap();
782        manager.get_graph(temp.path()).unwrap();
783
784        let stats = manager.stats();
785        assert_eq!(stats.total_queries, 2);
786        assert_eq!(stats.cache_misses, 1);
787        assert_eq!(stats.cache_hits, 1);
788        assert_eq!(stats.cached_graphs, 1);
789
790        manager.shutdown().unwrap();
791    }
792
793    #[test]
794    fn concurrent_access_shares_cache() {
795        let temp = tempdir().unwrap();
796        write_empty_graph(temp.path()).unwrap();
797
798        let manager = Arc::new(SessionManager::new().unwrap());
799        let path = temp.path().to_path_buf();
800        let barrier = Arc::new(Barrier::new(7));
801
802        let handles: Vec<_> = (0..6)
803            .map(|_| {
804                let mgr = Arc::clone(&manager);
805                let path = path.clone();
806                let barrier = Arc::clone(&barrier);
807                thread::spawn(move || {
808                    barrier.wait();
809                    mgr.get_graph(&path).unwrap();
810                })
811            })
812            .collect();
813
814        barrier.wait();
815        for handle in handles {
816            handle.join().unwrap();
817        }
818
819        let stats = manager.stats();
820        assert_eq!(stats.total_queries, 6);
821        assert_eq!(stats.cache_misses, 1);
822        assert_eq!(stats.cache_hits, 5);
823        assert_eq!(manager.disk_loads.load(Ordering::Relaxed), 1);
824        assert!(
825            manager.load_locks.is_empty(),
826            "concurrent singleflight lock must be reclaimed after waiters finish"
827        );
828
829        let manager = Arc::into_inner(manager).expect("no outstanding references");
830        manager.shutdown().unwrap();
831    }
832
833    #[test]
834    fn cold_load_lock_is_reclaimed_after_successful_load() {
835        let temp = tempdir().unwrap();
836        write_empty_graph(temp.path()).unwrap();
837
838        let manager = SessionManager::new().unwrap();
839        manager.get_graph(temp.path()).unwrap();
840
841        assert!(
842            manager.load_locks.is_empty(),
843            "singleflight locks must not outlive completed loads"
844        );
845
846        manager.shutdown().unwrap();
847    }
848
849    #[test]
850    fn preload_lock_is_reclaimed_after_cache_hit_double_check() {
851        let temp = tempdir().unwrap();
852        write_empty_graph(temp.path()).unwrap();
853
854        let manager = SessionManager::new().unwrap();
855        manager.preload(temp.path()).unwrap();
856        manager.invalidate(temp.path()).unwrap();
857        manager.preload(temp.path()).unwrap();
858
859        assert!(
860            manager.load_locks.is_empty(),
861            "preload must reclaim singleflight locks after load completion"
862        );
863
864        manager.shutdown().unwrap();
865    }
866
867    #[test]
868    fn failed_load_lock_is_reclaimed_for_retry() {
869        let temp = tempdir().unwrap();
870        let missing = temp.path().join("missing-index");
871        fs::create_dir_all(&missing).unwrap();
872
873        let manager = SessionManager::new().unwrap();
874        let err = manager
875            .get_graph(&missing)
876            .expect_err("missing graph snapshot should fail");
877        assert!(matches!(err, SessionError::IndexMetadata { .. }));
878        assert!(
879            manager.load_locks.is_empty(),
880            "failed loads must not leave stale singleflight locks"
881        );
882
883        write_empty_graph(&missing).unwrap();
884        manager.get_graph(&missing).unwrap();
885        assert_eq!(manager.disk_loads.load(Ordering::Relaxed), 1);
886        assert!(
887            manager.load_locks.is_empty(),
888            "retry after failure must also reclaim its singleflight lock"
889        );
890
891        manager.shutdown().unwrap();
892    }
893
894    #[test]
895    fn load_locks_stay_bounded_across_many_distinct_paths() {
896        let temp = tempdir().unwrap();
897        let manager = SessionManager::new().unwrap();
898
899        for index in 0..16 {
900            let repo = temp.path().join(format!("repo-{index}"));
901            write_empty_graph(&repo).unwrap();
902            manager.get_graph(&repo).unwrap();
903            assert!(
904                manager.load_locks.is_empty(),
905                "completed load for {} left a singleflight lock behind",
906                repo.display()
907            );
908            manager.invalidate(&repo).unwrap();
909        }
910
911        assert!(
912            manager.load_locks.is_empty(),
913            "distinct workspace churn must not grow the singleflight lock map"
914        );
915
916        manager.shutdown().unwrap();
917    }
918
919    #[test]
920    #[serial]
921    fn relative_and_absolute_paths_share_cache_entry() {
922        let temp = tempdir().unwrap();
923        let repo = temp.path().join("repo");
924        write_empty_graph(&repo).unwrap();
925
926        let original_cwd = std::env::current_dir().unwrap();
927        std::env::set_current_dir(temp.path()).unwrap();
928
929        let manager = SessionManager::new().unwrap();
930        manager.get_graph(Path::new("repo")).unwrap();
931        manager.get_graph(&repo).unwrap();
932
933        std::env::set_current_dir(original_cwd).unwrap();
934
935        let stats = manager.stats();
936        assert_eq!(stats.cached_graphs, 1);
937        assert_eq!(stats.cache_misses, 1);
938        assert_eq!(stats.cache_hits, 1);
939        assert!(manager.cache.contains_key(&canonical(&repo)));
940
941        manager.shutdown().unwrap();
942    }
943
944    #[test]
945    #[cfg(unix)]
946    fn symlink_equivalent_paths_share_cache_entry() {
947        let temp = tempdir().unwrap();
948        let repo = temp.path().join("repo");
949        let link = temp.path().join("repo-link");
950        write_empty_graph(&repo).unwrap();
951        std::os::unix::fs::symlink(&repo, &link).unwrap();
952
953        let manager = SessionManager::new().unwrap();
954        manager.get_graph(&repo).unwrap();
955        manager.get_graph(&link).unwrap();
956
957        let stats = manager.stats();
958        assert_eq!(stats.cached_graphs, 1);
959        assert_eq!(stats.cache_misses, 1);
960        assert_eq!(stats.cache_hits, 1);
961        assert_eq!(manager.disk_loads.load(Ordering::Relaxed), 1);
962
963        manager.shutdown().unwrap();
964    }
965
966    #[test]
967    fn invalid_query_does_not_load_or_mutate_stats() {
968        let temp = tempdir().unwrap();
969        write_empty_graph(temp.path()).unwrap();
970
971        let manager = SessionManager::new().unwrap();
972        let err = manager
973            .query(temp.path(), "kind:")
974            .expect_err("invalid query should fail before graph load");
975        assert!(matches!(err, SessionError::QueryParse(_)));
976
977        let stats = manager.stats();
978        assert_eq!(stats.total_queries, 0);
979        assert_eq!(stats.cache_misses, 0);
980        assert_eq!(stats.cache_hits, 0);
981        assert_eq!(stats.cached_graphs, 0);
982        assert_eq!(manager.disk_loads.load(Ordering::Relaxed), 0);
983
984        manager.shutdown().unwrap();
985    }
986
987    #[test]
988    fn invalidate_removes_cached_graph() {
989        let temp = tempdir().unwrap();
990        write_empty_graph(temp.path()).unwrap();
991
992        let manager = SessionManager::new().unwrap();
993        manager.get_graph(temp.path()).unwrap();
994        assert_eq!(manager.stats().cached_graphs, 1);
995
996        manager.invalidate(temp.path()).unwrap();
997        assert_eq!(manager.stats().cached_graphs, 0);
998
999        manager.shutdown().unwrap();
1000    }
1001
1002    #[test]
1003    #[cfg(unix)]
1004    fn invalidate_alias_unwatches_registered_manifest_path() {
1005        let temp = tempdir().unwrap();
1006        let repo = temp.path().join("repo");
1007        let link = temp.path().join("repo-link");
1008        write_empty_graph(&repo).unwrap();
1009        std::os::unix::fs::symlink(&repo, &link).unwrap();
1010
1011        let manager = SessionManager::new().unwrap();
1012        manager.preload(&repo).unwrap();
1013
1014        let manifest = GraphStorage::new(&canonical(&repo))
1015            .manifest_path()
1016            .to_path_buf();
1017        assert!(
1018            manager
1019                .watcher
1020                .lock()
1021                .expect("watcher lock poisoned in test")
1022                .watched_paths()
1023                .contains(&manifest)
1024        );
1025
1026        manager.invalidate(&link).unwrap();
1027
1028        let watched = manager
1029            .watcher
1030            .lock()
1031            .expect("watcher lock poisoned in test")
1032            .watched_paths();
1033        assert!(
1034            !watched.contains(&manifest),
1035            "manual invalidation through an alias must unwatch registered manifest path"
1036        );
1037        assert_eq!(manager.stats().cached_graphs, 0);
1038
1039        manager.shutdown().unwrap();
1040    }
1041
1042    #[test]
1043    fn watcher_trigger_invalidates_canonical_cache_key() {
1044        let temp = tempdir().unwrap();
1045        write_empty_graph(temp.path()).unwrap();
1046
1047        let manager = SessionManager::new().unwrap();
1048        manager.preload(temp.path()).unwrap();
1049
1050        let cache_key = canonical(temp.path());
1051        assert!(manager.cache.contains_key(&cache_key));
1052
1053        let manifest = GraphStorage::new(&cache_key).manifest_path().to_path_buf();
1054        let triggered = manager
1055            .watcher
1056            .lock()
1057            .expect("watcher lock poisoned in test")
1058            .trigger_for_test(&manifest);
1059
1060        assert!(
1061            triggered,
1062            "expected manifest watcher callback to be registered"
1063        );
1064        assert!(
1065            !manager.cache.contains_key(&cache_key),
1066            "watcher callback must invalidate canonical cache key"
1067        );
1068
1069        manager.shutdown().unwrap();
1070    }
1071
1072    #[test]
1073    fn lru_eviction_removes_oldest_entry() {
1074        let temp = tempdir().unwrap();
1075        let base = temp.path();
1076
1077        let config = SessionConfig {
1078            max_cached_indexes: 2,
1079            ..SessionConfig::default()
1080        };
1081        let manager = SessionManager::with_config(config).unwrap();
1082
1083        let repo1 = base.join("repo1");
1084        let repo2 = base.join("repo2");
1085        let repo3 = base.join("repo3");
1086        write_empty_graph(&repo1).unwrap();
1087        write_empty_graph(&repo2).unwrap();
1088        write_empty_graph(&repo3).unwrap();
1089
1090        manager.get_graph(&repo1).unwrap();
1091        manager.get_graph(&repo2).unwrap();
1092
1093        // Access repo2 again so repo1 stays LRU.
1094        manager.get_graph(&repo2).unwrap();
1095        manager.get_graph(&repo3).unwrap();
1096
1097        assert_eq!(manager.stats().cached_graphs, 2);
1098        assert!(manager.cache.contains_key(&canonical(&repo2)));
1099        assert!(manager.cache.contains_key(&canonical(&repo3)));
1100        assert!(!manager.cache.contains_key(&canonical(&repo1)));
1101
1102        manager.shutdown().unwrap();
1103    }
1104
1105    #[test]
1106    fn evict_stale_purges_idle_entries() {
1107        let temp = tempdir().unwrap();
1108        write_empty_graph(temp.path()).unwrap();
1109
1110        let config = SessionConfig {
1111            idle_timeout: Duration::from_millis(100),
1112            cleanup_interval: Duration::from_secs(3600),
1113            ..SessionConfig::default()
1114        };
1115        let manager = SessionManager::with_config(config).unwrap();
1116
1117        manager.get_graph(temp.path()).unwrap();
1118        assert_eq!(manager.stats().cached_graphs, 1);
1119
1120        // Simulate last access in the past to avoid sleeping too long.
1121        if let Some(entry) = manager.cache.get(&canonical(temp.path())) {
1122            entry.value().set_last_accessed(
1123                Instant::now()
1124                    .checked_sub(Duration::from_millis(200))
1125                    .unwrap(),
1126            );
1127        }
1128
1129        let evicted = manager.evict_stale(Instant::now()).unwrap();
1130        assert_eq!(evicted, 1);
1131        assert_eq!(manager.stats().cached_graphs, 0);
1132
1133        manager.shutdown().unwrap();
1134    }
1135
1136    #[test]
1137    fn register_watcher_uses_manifest_path() {
1138        // Regression test for STEP_3 codex iter2 BLOCK: production must
1139        // register the watcher against `.sqry/graph/manifest.json` —
1140        // the only path that `FileWatcher::handle_event` matches — not
1141        // the snapshot path. Wiring the watcher to the snapshot path is
1142        // a silent no-op because no event will ever satisfy the
1143        // manifest filter.
1144        let temp = tempdir().unwrap();
1145        write_empty_graph(temp.path()).unwrap();
1146
1147        let manager = SessionManager::new().unwrap();
1148        manager.get_graph(temp.path()).unwrap();
1149
1150        let storage = GraphStorage::new(&canonical(temp.path()));
1151        let watched = manager
1152            .watcher
1153            .lock()
1154            .expect("watcher lock poisoned in test")
1155            .watched_paths();
1156
1157        assert!(
1158            watched.contains(&storage.manifest_path().to_path_buf()),
1159            "watcher must be registered for manifest path {} (registered: {:?})",
1160            storage.manifest_path().display(),
1161            watched,
1162        );
1163        assert!(
1164            !watched.contains(&storage.snapshot_path().to_path_buf()),
1165            "watcher must NOT be registered for snapshot path {} (registered: {:?})",
1166            storage.snapshot_path().display(),
1167            watched,
1168        );
1169
1170        manager.shutdown().unwrap();
1171    }
1172
1173    #[test]
1174    fn evict_lru_unwatches_manifest_path() {
1175        // Regression test: LRU eviction's `unwatch` call must mirror
1176        // `register_watcher` and target the manifest path. If the paths
1177        // diverge the unwatch silently no-ops and the watcher leaks.
1178        let temp = tempdir().unwrap();
1179        let base = temp.path();
1180
1181        let config = SessionConfig {
1182            max_cached_indexes: 1,
1183            ..SessionConfig::default()
1184        };
1185        let manager = SessionManager::with_config(config).unwrap();
1186
1187        let repo1 = base.join("repo1");
1188        let repo2 = base.join("repo2");
1189        write_empty_graph(&repo1).unwrap();
1190        write_empty_graph(&repo2).unwrap();
1191
1192        manager.get_graph(&repo1).unwrap();
1193        manager.get_graph(&repo2).unwrap(); // evicts repo1
1194
1195        let watched = manager
1196            .watcher
1197            .lock()
1198            .expect("watcher lock poisoned in test")
1199            .watched_paths();
1200
1201        let repo1_manifest = GraphStorage::new(&canonical(&repo1))
1202            .manifest_path()
1203            .to_path_buf();
1204        let repo2_manifest = GraphStorage::new(&canonical(&repo2))
1205            .manifest_path()
1206            .to_path_buf();
1207        assert!(
1208            !watched.contains(&repo1_manifest),
1209            "evicted workspace's manifest watch must be released; still watching: {watched:?}",
1210        );
1211        assert!(
1212            watched.contains(&repo2_manifest),
1213            "current workspace must remain watched; registered: {watched:?}",
1214        );
1215
1216        manager.shutdown().unwrap();
1217    }
1218
1219    #[test]
1220    #[ignore = "flaky: timing-sensitive file watcher test"]
1221    fn file_changes_trigger_invalidation() {
1222        let temp = tempdir().unwrap();
1223        write_empty_graph(temp.path()).unwrap();
1224
1225        let manager = SessionManager::new().unwrap();
1226        manager.get_graph(temp.path()).unwrap();
1227        let cache_key = canonical(temp.path());
1228        assert!(manager.cache.contains_key(&cache_key));
1229
1230        // The watcher matches writes to `.sqry/graph/manifest.json` —
1231        // touching the snapshot file would never fire a callback.
1232        let storage = GraphStorage::new(&cache_key);
1233        std::fs::write(storage.manifest_path(), b"modified").unwrap();
1234
1235        let evicted = wait_until(watcher_timeout(), || {
1236            manager
1237                .watcher
1238                .lock()
1239                .expect("watcher lock poisoned in test")
1240                .process_events()
1241                .expect("watcher failed to process events in test");
1242            !manager.cache.contains_key(&cache_key)
1243        });
1244        assert!(evicted, "expected watcher to invalidate cache entry");
1245
1246        manager.shutdown().unwrap();
1247    }
1248
1249    #[test]
1250    #[ignore = "flaky: timing-sensitive background thread test"]
1251    fn background_thread_processes_watcher_events() {
1252        let temp = tempdir().unwrap();
1253        write_empty_graph(temp.path()).unwrap();
1254
1255        let config = SessionConfig {
1256            cleanup_interval: Duration::from_millis(50),
1257            ..SessionConfig::default()
1258        };
1259        let manager = SessionManager::with_config(config).unwrap();
1260
1261        manager.get_graph(temp.path()).unwrap();
1262        let cache_key = canonical(temp.path());
1263        let storage = GraphStorage::new(&cache_key);
1264        std::fs::write(storage.manifest_path(), b"changed").unwrap();
1265
1266        let evicted = wait_until(background_timeout(), || {
1267            !manager.cache.contains_key(&cache_key)
1268        });
1269        assert!(
1270            evicted,
1271            "background thread failed to remove watcher-invalidated entry"
1272        );
1273
1274        manager.shutdown().unwrap();
1275    }
1276
1277    #[test]
1278    fn background_thread_evicts_idle_entries() {
1279        let temp = tempdir().unwrap();
1280        write_empty_graph(temp.path()).unwrap();
1281
1282        let config = SessionConfig {
1283            idle_timeout: Duration::from_millis(50),
1284            cleanup_interval: Duration::from_millis(30),
1285            ..SessionConfig::default()
1286        };
1287        let manager = SessionManager::with_config(config).unwrap();
1288
1289        manager.get_graph(temp.path()).unwrap();
1290        assert_eq!(manager.stats().cached_graphs, 1);
1291
1292        if let Some(entry) = manager.cache.get(&canonical(temp.path())) {
1293            entry.value().set_last_accessed(
1294                Instant::now()
1295                    .checked_sub(Duration::from_millis(200))
1296                    .unwrap(),
1297            );
1298        }
1299
1300        let evicted = wait_until(background_timeout(), || {
1301            !manager.cache.contains_key(&canonical(temp.path()))
1302        });
1303        assert!(
1304            evicted,
1305            "background eviction thread did not remove idle entry"
1306        );
1307
1308        manager.shutdown().unwrap();
1309    }
1310}