1use std::fs;
10use std::path::{Path, PathBuf};
11use std::sync::Arc;
12use std::sync::Mutex;
13use std::sync::atomic::{AtomicU64, Ordering};
14use std::thread::{self, JoinHandle};
15use std::time::{Duration, Instant};
16
17use dashmap::DashMap;
18use log::{debug, info};
19use parking_lot::{Condvar, Mutex as PlMutex};
20
21use crate::graph::CodeGraph;
22use crate::graph::unified::persistence::{GraphStorage, load_from_path};
23use crate::plugin::PluginManager;
24use crate::query::QueryExecutor;
25
26use super::cache::CachedIndex;
27use super::watcher::FileWatcher;
28use super::{SessionConfig, SessionError, SessionResult};
29
30pub struct SessionManager {
32 cache: Arc<DashMap<PathBuf, CachedIndex>>,
33 config: SessionConfig,
34 watcher: Arc<Mutex<FileWatcher>>,
35 load_locks: Arc<DashMap<PathBuf, Arc<PlMutex<()>>>>,
36 cleanup_handle: Option<JoinHandle<()>>,
37 shutdown: Arc<(PlMutex<bool>, Condvar)>,
47 total_queries: Arc<AtomicU64>,
48 cache_hits: Arc<AtomicU64>,
49 cache_misses: Arc<AtomicU64>,
50 disk_loads: Arc<AtomicU64>,
51 query_executor: QueryExecutor,
53}
54
55struct LoadLockCleanup {
56 load_locks: Arc<DashMap<PathBuf, Arc<PlMutex<()>>>>,
57 cache_key: PathBuf,
58 load_lock: Arc<PlMutex<()>>,
59}
60
61impl LoadLockCleanup {
62 fn new(
63 load_locks: Arc<DashMap<PathBuf, Arc<PlMutex<()>>>>,
64 cache_key: PathBuf,
65 load_lock: Arc<PlMutex<()>>,
66 ) -> Self {
67 Self {
68 load_locks,
69 cache_key,
70 load_lock,
71 }
72 }
73}
74
75impl Drop for LoadLockCleanup {
76 fn drop(&mut self) {
77 self.load_locks
78 .remove_if(&self.cache_key, |_, current_lock| {
79 Arc::ptr_eq(current_lock, &self.load_lock)
80 });
81 }
82}
83
84impl SessionManager {
85 pub fn new() -> SessionResult<Self> {
96 Self::with_config_and_plugins(SessionConfig::default(), PluginManager::new())
97 }
98
99 pub fn with_plugin_manager(plugin_manager: PluginManager) -> SessionResult<Self> {
113 Self::with_config_and_plugins(SessionConfig::default(), plugin_manager)
114 }
115
116 pub fn with_config(config: SessionConfig) -> SessionResult<Self> {
125 Self::with_config_and_plugins(config, PluginManager::new())
126 }
127
128 pub fn with_config_and_plugins(
142 config: SessionConfig,
143 plugin_manager: PluginManager,
144 ) -> SessionResult<Self> {
145 let watcher = if config.enable_file_watching {
146 FileWatcher::new()?
147 } else {
148 FileWatcher::disabled()
149 };
150
151 let cache = Arc::new(DashMap::new());
152 let load_locks = Arc::new(DashMap::new());
153 let watcher = Arc::new(Mutex::new(watcher));
154 let shutdown = Arc::new((PlMutex::new(false), Condvar::new()));
155
156 let cleanup_interval = config.cleanup_interval;
157 let idle_timeout = config.idle_timeout;
158
159 let cleanup_handle = {
160 let cache = Arc::clone(&cache);
161 let watcher_clone = Arc::clone(&watcher);
162 let shutdown_flag = Arc::clone(&shutdown);
163
164 thread::Builder::new()
165 .name("sqry-session-cleanup".into())
166 .spawn(move || {
167 loop {
168 if *shutdown_flag.0.lock() {
169 break;
170 }
171
172 if let Ok(mut guard) = watcher_clone.lock()
173 && let Err(err) = guard.process_events()
174 {
175 debug!("failed to process session watcher events: {err}");
176 }
177
178 let now = Instant::now();
179 if let Err(err) =
180 Self::evict_stale_for(&cache, &watcher_clone, idle_timeout, now)
181 {
182 debug!("failed to evict stale sessions: {err}");
183 }
184
185 let mut stop = shutdown_flag.0.lock();
191 if !*stop {
192 shutdown_flag.1.wait_for(&mut stop, cleanup_interval);
193 }
194 if *stop {
195 break;
196 }
197 }
198
199 if let Ok(mut guard) = watcher_clone.lock()
200 && let Err(err) = guard.process_events()
201 {
202 debug!("failed to process session watcher events during shutdown: {err}");
203 }
204 let now = Instant::now();
205 if let Err(err) =
206 Self::evict_stale_for(&cache, &watcher_clone, idle_timeout, now)
207 {
208 debug!("failed to evict stale sessions during shutdown: {err}");
209 }
210 })
211 .map_err(SessionError::SpawnThread)?
212 };
213
214 let query_executor = QueryExecutor::with_plugin_manager(plugin_manager);
216
217 Ok(Self {
218 cache,
219 config,
220 watcher,
221 load_locks,
222 cleanup_handle: Some(cleanup_handle),
223 shutdown,
224 total_queries: Arc::new(AtomicU64::new(0)),
225 cache_hits: Arc::new(AtomicU64::new(0)),
226 cache_misses: Arc::new(AtomicU64::new(0)),
227 disk_loads: Arc::new(AtomicU64::new(0)),
228 query_executor,
229 })
230 }
231
232 pub fn get_graph(&self, path: &Path) -> SessionResult<Arc<CodeGraph>> {
238 self.get_or_load_graph(path)
239 }
240
241 pub fn query(
252 &self,
253 path: &Path,
254 query_str: &str,
255 ) -> SessionResult<crate::query::results::QueryResults> {
256 self.query_executor
257 .parse_query_ast(query_str)
258 .map_err(|err| SessionError::QueryParse(err.to_string()))?;
259
260 let graph = self.get_or_load_graph(path)?;
261
262 let results = self
263 .query_executor
264 .execute_on_preloaded_graph(graph, query_str, path, None)
265 .map_err(Self::map_query_error)?;
266
267 Ok(results)
268 }
269
270 pub fn invalidate(&self, path: &Path) -> SessionResult<()> {
276 let cache_key = Self::canonical_workspace_key(path)?;
277 self.cache.remove(&cache_key);
278 self.load_locks.remove(&cache_key);
279 Self::unwatch_manifest_for(&self.watcher, &cache_key)?;
280 Ok(())
281 }
282
283 pub fn preload(&self, path: &Path) -> SessionResult<()> {
290 let cache_key = Self::canonical_workspace_key(path)?;
291 if self.cache.contains_key(&cache_key) {
292 return Ok(());
293 }
294
295 let load_lock = self.load_lock_for(&cache_key);
296 let _load_guard = load_lock.lock();
297 let _cleanup = LoadLockCleanup::new(
298 Arc::clone(&self.load_locks),
299 cache_key.clone(),
300 Arc::clone(&load_lock),
301 );
302
303 if self.cache.contains_key(&cache_key) {
304 return Ok(());
305 }
306
307 if self.cache.len() >= self.config.max_cached_indexes {
308 self.evict_lru();
309 }
310
311 self.load_graph_from_disk(&cache_key)?;
312 Ok(())
313 }
314
315 #[must_use]
317 pub fn stats(&self) -> SessionStats {
318 SessionStats {
319 cached_graphs: self.cache.len(),
320 total_queries: self.total_queries.load(Ordering::Relaxed),
321 cache_hits: self.cache_hits.load(Ordering::Relaxed),
322 cache_misses: self.cache_misses.load(Ordering::Relaxed),
323 total_memory_mb: self.cache.len() * 51,
324 }
325 }
326
327 pub fn evict_stale(&self, now: Instant) -> SessionResult<usize> {
333 Self::evict_stale_for(&self.cache, &self.watcher, self.config.idle_timeout, now)
334 }
335
336 fn get_or_load_graph(&self, path: &Path) -> SessionResult<Arc<CodeGraph>> {
337 debug_assert!(
338 self.config.max_cached_indexes > 0,
339 "SessionConfig::max_cached_indexes must be at least 1"
340 );
341 let cache_key = Self::canonical_workspace_key(path)?;
342 self.total_queries.fetch_add(1, Ordering::Relaxed);
343
344 if let Some(entry) = self.cache.get_mut(&cache_key) {
345 entry.value().access();
346 self.cache_hits.fetch_add(1, Ordering::Relaxed);
347 return Ok(Arc::clone(&entry.value().graph));
348 }
349
350 let load_lock = self.load_lock_for(&cache_key);
351 let _load_guard = load_lock.lock();
352 let _cleanup = LoadLockCleanup::new(
353 Arc::clone(&self.load_locks),
354 cache_key.clone(),
355 Arc::clone(&load_lock),
356 );
357
358 if let Some(entry) = self.cache.get_mut(&cache_key) {
359 entry.value().access();
360 self.cache_hits.fetch_add(1, Ordering::Relaxed);
361 return Ok(Arc::clone(&entry.value().graph));
362 }
363
364 self.cache_misses.fetch_add(1, Ordering::Relaxed);
365
366 if self.cache.len() >= self.config.max_cached_indexes {
367 self.evict_lru();
368 }
369
370 let graph = self.load_graph_from_disk(&cache_key)?;
371
372 if let Some(entry) = self.cache.get_mut(&cache_key) {
373 entry.value().access();
374 }
375
376 Ok(graph)
377 }
378
379 fn load_lock_for(&self, cache_key: &Path) -> Arc<PlMutex<()>> {
380 Arc::clone(
381 &self
382 .load_locks
383 .entry(cache_key.to_path_buf())
384 .or_insert_with(|| Arc::new(PlMutex::new(()))),
385 )
386 }
387
388 fn load_graph_from_disk(&self, path: &Path) -> SessionResult<Arc<CodeGraph>> {
389 let storage = GraphStorage::new(path);
390 let snapshot_path = storage.snapshot_path();
391
392 let metadata =
393 fs::metadata(snapshot_path).map_err(|source| SessionError::IndexMetadata {
394 path: snapshot_path.to_path_buf(),
395 source,
396 })?;
397 let file_mtime = metadata
398 .modified()
399 .map_err(|source| SessionError::IndexMetadata {
400 path: snapshot_path.to_path_buf(),
401 source,
402 })?;
403
404 let graph = load_from_path(snapshot_path, Some(self.query_executor.plugin_manager()))
405 .map_err(|source| SessionError::IndexLoad {
406 path: snapshot_path.to_path_buf(),
407 source: source.into(),
408 })?;
409 self.disk_loads.fetch_add(1, Ordering::Relaxed);
410 let arc_graph = Arc::new(graph);
411
412 let cached = CachedIndex::new(Arc::clone(&arc_graph), file_mtime);
413 self.cache.insert(path.to_path_buf(), cached);
414
415 self.register_watcher(path, storage.manifest_path());
416
417 Ok(arc_graph)
418 }
419
420 fn register_watcher(&self, workspace_path: &Path, manifest_path: &Path) {
429 if let Ok(mut watcher) = self.watcher.lock() {
430 let cache = Arc::clone(&self.cache);
431 let callback_path = workspace_path.to_path_buf();
432 let watch_path = manifest_path.to_path_buf();
433 if let Err(err) = watcher.watch(watch_path.clone(), move || {
434 cache.remove(&callback_path);
435 info!(
436 "Invalidated session cache after graph change: {}",
437 callback_path.display()
438 );
439 }) {
440 debug!(
441 "failed to register file watcher for {}: {err}",
442 watch_path.display()
443 );
444 }
445 }
446 }
447
448 fn evict_lru(&self) {
450 let mut oldest: Option<(PathBuf, Instant)> = None;
451
452 for entry in self.cache.iter() {
453 let last_accessed = entry.value().last_accessed();
454 if oldest
455 .as_ref()
456 .is_none_or(|(_, instant)| last_accessed < *instant)
457 {
458 oldest = Some((entry.key().clone(), last_accessed));
459 }
460 }
461
462 if let Some((path, _)) = oldest {
463 self.cache.remove(&path);
464 debug!("evicted LRU session cache entry: {}", path.display());
465 if let Err(err) = Self::unwatch_manifest_for(&self.watcher, &path) {
466 debug!("failed to unwatch session path {}: {err}", path.display());
467 }
468 }
469 }
470
471 fn evict_stale_for(
472 cache: &Arc<DashMap<PathBuf, CachedIndex>>,
473 watcher: &Arc<Mutex<FileWatcher>>,
474 timeout: Duration,
475 now: Instant,
476 ) -> SessionResult<usize> {
477 let mut to_remove = Vec::new();
478
479 for entry in cache.iter() {
480 let idle = now.duration_since(entry.value().last_accessed());
481 if idle > timeout {
482 to_remove.push(entry.key().clone());
483 }
484 }
485
486 for path in &to_remove {
487 cache.remove(path);
488 }
489
490 if !to_remove.is_empty()
491 && let Ok(mut guard) = watcher.lock()
492 {
493 for path in &to_remove {
494 let manifest_path = Self::manifest_path_for_key(path);
499 guard.unwatch(&manifest_path)?;
500 }
501 }
502
503 Ok(to_remove.len())
504 }
505
506 fn canonical_workspace_key(path: &Path) -> SessionResult<PathBuf> {
507 path.canonicalize()
508 .map_err(|source| SessionError::IndexMetadata {
509 path: path.to_path_buf(),
510 source,
511 })
512 }
513
514 fn manifest_path_for_key(cache_key: &Path) -> PathBuf {
515 GraphStorage::new(cache_key).manifest_path().to_path_buf()
516 }
517
518 fn unwatch_manifest_for(
519 watcher: &Arc<Mutex<FileWatcher>>,
520 cache_key: &Path,
521 ) -> SessionResult<()> {
522 if let Ok(mut guard) = watcher.lock() {
523 let manifest_path = Self::manifest_path_for_key(cache_key);
524 guard.unwatch(&manifest_path)?;
525 }
526 Ok(())
527 }
528
529 fn map_query_error(err: crate::Error) -> SessionError {
530 let error_msg = err.to_string();
531 if error_msg.contains("parse")
532 || error_msg.contains("unexpected")
533 || error_msg.contains("expected")
534 {
535 SessionError::QueryParse(error_msg)
536 } else {
537 SessionError::QueryExecution(err)
538 }
539 }
540
541 fn stop_worker(&mut self) {
542 {
543 let mut stop = self.shutdown.0.lock();
544 *stop = true;
545 self.shutdown.1.notify_all();
546 }
547
548 if let Some(handle) = self.cleanup_handle.take()
549 && let Err(err) = handle.join()
550 {
551 debug!("session cleanup thread terminated with error: {err:?}");
552 }
553 }
554
555 pub fn shutdown(mut self) -> SessionResult<()> {
561 self.stop_worker();
562 Ok(())
563 }
564}
565
566impl Drop for SessionManager {
567 fn drop(&mut self) {
568 self.stop_worker();
569 }
570}
571
/// Point-in-time counters reported by `SessionManager::stats`.
#[derive(Debug, Clone)]
pub struct SessionStats {
    /// Number of graphs currently held in the cache.
    pub cached_graphs: usize,
    /// Number of graph lookups performed so far.
    pub total_queries: u64,
    /// Number of lookups served from the cache.
    pub cache_hits: u64,
    /// Number of lookups that required loading a graph.
    pub cache_misses: u64,
    /// Estimated memory held by cached graphs, in megabytes.
    pub total_memory_mb: usize,
}
586
587#[cfg(test)]
588mod tests {
589 use super::*;
590 use crate::graph::unified::persistence::save_to_path;
591 use serial_test::serial;
592 use std::sync::{Arc, Barrier};
593 use std::thread;
594 use std::time::{Duration, Instant};
595 use tempfile::tempdir;
596
597 fn write_empty_graph(dir: &Path) -> SessionResult<()> {
598 let storage = GraphStorage::new(dir);
599 fs::create_dir_all(storage.graph_dir()).map_err(|source| SessionError::IndexMetadata {
600 path: storage.graph_dir().to_path_buf(),
601 source,
602 })?;
603 let graph = CodeGraph::new();
604 save_to_path(&graph, storage.snapshot_path()).map_err(|source| {
605 SessionError::IndexLoad {
606 path: storage.snapshot_path().to_path_buf(),
607 source: source.into(),
608 }
609 })?;
610 fs::write(storage.manifest_path(), b"{}").map_err(|source| {
616 SessionError::IndexMetadata {
617 path: storage.manifest_path().to_path_buf(),
618 source,
619 }
620 })?;
621 Ok(())
622 }
623
624 fn watcher_timeout() -> Duration {
625 let base = if cfg!(target_os = "macos") {
626 Duration::from_secs(3)
627 } else {
628 Duration::from_secs(2)
629 };
630
631 if std::env::var("CI").is_ok() {
632 base * 2
633 } else {
634 base
635 }
636 }
637
638 fn background_timeout() -> Duration {
639 let base = if cfg!(target_os = "macos") {
640 Duration::from_secs(5)
641 } else {
642 Duration::from_secs(3)
643 };
644
645 if std::env::var("CI").is_ok() {
646 base * 2
647 } else {
648 base
649 }
650 }
651
652 fn wait_until<F>(timeout: Duration, mut predicate: F) -> bool
653 where
654 F: FnMut() -> bool,
655 {
656 let deadline = Instant::now() + timeout;
657 loop {
658 if predicate() {
659 return true;
660 }
661 if Instant::now() >= deadline {
662 return false;
663 }
664 thread::sleep(Duration::from_millis(50));
665 }
666 }
667
668 fn canonical(path: &Path) -> PathBuf {
669 path.canonicalize().unwrap()
670 }
671
672 #[test]
673 fn get_graph_loads_and_updates_stats() {
674 let temp = tempdir().unwrap();
675 write_empty_graph(temp.path()).unwrap();
676
677 let manager = SessionManager::new().unwrap();
678
679 let graph = manager.get_graph(temp.path()).unwrap();
680 assert_eq!(graph.snapshot().nodes().len(), 0);
681
682 let stats = manager.stats();
683 assert_eq!(stats.total_queries, 1);
684 assert_eq!(stats.cache_misses, 1);
685 assert_eq!(stats.cache_hits, 0);
686 assert_eq!(stats.cached_graphs, 1);
687
688 manager.shutdown().unwrap();
689 }
690
691 #[test]
692 fn get_graph_missing_returns_error() {
693 let temp = tempdir().unwrap();
694
695 let manager = SessionManager::new().unwrap();
696 let err = manager
697 .get_graph(temp.path())
698 .expect_err("get_graph should fail without graph");
699
700 assert!(matches!(err, SessionError::IndexMetadata { .. }));
701 manager.shutdown().unwrap();
702 }
703
704 #[test]
705 fn preload_does_not_affect_stats() {
706 let temp = tempdir().unwrap();
707 write_empty_graph(temp.path()).unwrap();
708
709 let manager = SessionManager::new().unwrap();
710 manager.preload(temp.path()).unwrap();
711
712 let stats = manager.stats();
713 assert_eq!(stats.total_queries, 0);
714 assert_eq!(stats.cache_hits, 0);
715 assert_eq!(stats.cache_misses, 0);
716 assert_eq!(stats.cached_graphs, 1);
717
718 manager.get_graph(temp.path()).unwrap();
719 let after = manager.stats();
720 assert_eq!(after.total_queries, 1);
721 assert_eq!(after.cache_hits, 1);
722 assert_eq!(after.cache_misses, 0);
723 assert_eq!(after.cached_graphs, 1);
724
725 manager.shutdown().unwrap();
726 }
727
728 #[test]
729 fn query_records_session_cache_miss_then_hit() {
730 let temp = tempdir().unwrap();
731 write_empty_graph(temp.path()).unwrap();
732
733 let manager = SessionManager::new().unwrap();
734
735 let first = manager.query(temp.path(), "kind:function").unwrap();
736 assert_eq!(first.len(), 0);
737 let after_first = manager.stats();
738 assert_eq!(after_first.total_queries, 1);
739 assert_eq!(after_first.cache_misses, 1);
740 assert_eq!(after_first.cache_hits, 0);
741 assert_eq!(after_first.cached_graphs, 1);
742
743 let second = manager.query(temp.path(), "kind:function").unwrap();
744 assert_eq!(second.len(), 0);
745 let after_second = manager.stats();
746 assert_eq!(after_second.total_queries, 2);
747 assert_eq!(after_second.cache_misses, 1);
748 assert_eq!(after_second.cache_hits, 1);
749 assert_eq!(after_second.cached_graphs, 1);
750
751 manager.shutdown().unwrap();
752 }
753
754 #[test]
755 fn preload_warms_query_cache() {
756 let temp = tempdir().unwrap();
757 write_empty_graph(temp.path()).unwrap();
758
759 let manager = SessionManager::new().unwrap();
760 manager.preload(temp.path()).unwrap();
761
762 let results = manager.query(temp.path(), "kind:function").unwrap();
763 assert_eq!(results.len(), 0);
764
765 let stats = manager.stats();
766 assert_eq!(stats.total_queries, 1);
767 assert_eq!(stats.cache_misses, 0);
768 assert_eq!(stats.cache_hits, 1);
769 assert_eq!(stats.cached_graphs, 1);
770
771 manager.shutdown().unwrap();
772 }
773
774 #[test]
775 fn second_access_hits_cache() {
776 let temp = tempdir().unwrap();
777 write_empty_graph(temp.path()).unwrap();
778
779 let manager = SessionManager::new().unwrap();
780
781 manager.get_graph(temp.path()).unwrap();
782 manager.get_graph(temp.path()).unwrap();
783
784 let stats = manager.stats();
785 assert_eq!(stats.total_queries, 2);
786 assert_eq!(stats.cache_misses, 1);
787 assert_eq!(stats.cache_hits, 1);
788 assert_eq!(stats.cached_graphs, 1);
789
790 manager.shutdown().unwrap();
791 }
792
793 #[test]
794 fn concurrent_access_shares_cache() {
795 let temp = tempdir().unwrap();
796 write_empty_graph(temp.path()).unwrap();
797
798 let manager = Arc::new(SessionManager::new().unwrap());
799 let path = temp.path().to_path_buf();
800 let barrier = Arc::new(Barrier::new(7));
801
802 let handles: Vec<_> = (0..6)
803 .map(|_| {
804 let mgr = Arc::clone(&manager);
805 let path = path.clone();
806 let barrier = Arc::clone(&barrier);
807 thread::spawn(move || {
808 barrier.wait();
809 mgr.get_graph(&path).unwrap();
810 })
811 })
812 .collect();
813
814 barrier.wait();
815 for handle in handles {
816 handle.join().unwrap();
817 }
818
819 let stats = manager.stats();
820 assert_eq!(stats.total_queries, 6);
821 assert_eq!(stats.cache_misses, 1);
822 assert_eq!(stats.cache_hits, 5);
823 assert_eq!(manager.disk_loads.load(Ordering::Relaxed), 1);
824 assert!(
825 manager.load_locks.is_empty(),
826 "concurrent singleflight lock must be reclaimed after waiters finish"
827 );
828
829 let manager = Arc::into_inner(manager).expect("no outstanding references");
830 manager.shutdown().unwrap();
831 }
832
833 #[test]
834 fn cold_load_lock_is_reclaimed_after_successful_load() {
835 let temp = tempdir().unwrap();
836 write_empty_graph(temp.path()).unwrap();
837
838 let manager = SessionManager::new().unwrap();
839 manager.get_graph(temp.path()).unwrap();
840
841 assert!(
842 manager.load_locks.is_empty(),
843 "singleflight locks must not outlive completed loads"
844 );
845
846 manager.shutdown().unwrap();
847 }
848
849 #[test]
850 fn preload_lock_is_reclaimed_after_cache_hit_double_check() {
851 let temp = tempdir().unwrap();
852 write_empty_graph(temp.path()).unwrap();
853
854 let manager = SessionManager::new().unwrap();
855 manager.preload(temp.path()).unwrap();
856 manager.invalidate(temp.path()).unwrap();
857 manager.preload(temp.path()).unwrap();
858
859 assert!(
860 manager.load_locks.is_empty(),
861 "preload must reclaim singleflight locks after load completion"
862 );
863
864 manager.shutdown().unwrap();
865 }
866
867 #[test]
868 fn failed_load_lock_is_reclaimed_for_retry() {
869 let temp = tempdir().unwrap();
870 let missing = temp.path().join("missing-index");
871 fs::create_dir_all(&missing).unwrap();
872
873 let manager = SessionManager::new().unwrap();
874 let err = manager
875 .get_graph(&missing)
876 .expect_err("missing graph snapshot should fail");
877 assert!(matches!(err, SessionError::IndexMetadata { .. }));
878 assert!(
879 manager.load_locks.is_empty(),
880 "failed loads must not leave stale singleflight locks"
881 );
882
883 write_empty_graph(&missing).unwrap();
884 manager.get_graph(&missing).unwrap();
885 assert_eq!(manager.disk_loads.load(Ordering::Relaxed), 1);
886 assert!(
887 manager.load_locks.is_empty(),
888 "retry after failure must also reclaim its singleflight lock"
889 );
890
891 manager.shutdown().unwrap();
892 }
893
894 #[test]
895 fn load_locks_stay_bounded_across_many_distinct_paths() {
896 let temp = tempdir().unwrap();
897 let manager = SessionManager::new().unwrap();
898
899 for index in 0..16 {
900 let repo = temp.path().join(format!("repo-{index}"));
901 write_empty_graph(&repo).unwrap();
902 manager.get_graph(&repo).unwrap();
903 assert!(
904 manager.load_locks.is_empty(),
905 "completed load for {} left a singleflight lock behind",
906 repo.display()
907 );
908 manager.invalidate(&repo).unwrap();
909 }
910
911 assert!(
912 manager.load_locks.is_empty(),
913 "distinct workspace churn must not grow the singleflight lock map"
914 );
915
916 manager.shutdown().unwrap();
917 }
918
919 #[test]
920 #[serial]
921 fn relative_and_absolute_paths_share_cache_entry() {
922 let temp = tempdir().unwrap();
923 let repo = temp.path().join("repo");
924 write_empty_graph(&repo).unwrap();
925
926 let original_cwd = std::env::current_dir().unwrap();
927 std::env::set_current_dir(temp.path()).unwrap();
928
929 let manager = SessionManager::new().unwrap();
930 manager.get_graph(Path::new("repo")).unwrap();
931 manager.get_graph(&repo).unwrap();
932
933 std::env::set_current_dir(original_cwd).unwrap();
934
935 let stats = manager.stats();
936 assert_eq!(stats.cached_graphs, 1);
937 assert_eq!(stats.cache_misses, 1);
938 assert_eq!(stats.cache_hits, 1);
939 assert!(manager.cache.contains_key(&canonical(&repo)));
940
941 manager.shutdown().unwrap();
942 }
943
944 #[test]
945 #[cfg(unix)]
946 fn symlink_equivalent_paths_share_cache_entry() {
947 let temp = tempdir().unwrap();
948 let repo = temp.path().join("repo");
949 let link = temp.path().join("repo-link");
950 write_empty_graph(&repo).unwrap();
951 std::os::unix::fs::symlink(&repo, &link).unwrap();
952
953 let manager = SessionManager::new().unwrap();
954 manager.get_graph(&repo).unwrap();
955 manager.get_graph(&link).unwrap();
956
957 let stats = manager.stats();
958 assert_eq!(stats.cached_graphs, 1);
959 assert_eq!(stats.cache_misses, 1);
960 assert_eq!(stats.cache_hits, 1);
961 assert_eq!(manager.disk_loads.load(Ordering::Relaxed), 1);
962
963 manager.shutdown().unwrap();
964 }
965
966 #[test]
967 fn invalid_query_does_not_load_or_mutate_stats() {
968 let temp = tempdir().unwrap();
969 write_empty_graph(temp.path()).unwrap();
970
971 let manager = SessionManager::new().unwrap();
972 let err = manager
973 .query(temp.path(), "kind:")
974 .expect_err("invalid query should fail before graph load");
975 assert!(matches!(err, SessionError::QueryParse(_)));
976
977 let stats = manager.stats();
978 assert_eq!(stats.total_queries, 0);
979 assert_eq!(stats.cache_misses, 0);
980 assert_eq!(stats.cache_hits, 0);
981 assert_eq!(stats.cached_graphs, 0);
982 assert_eq!(manager.disk_loads.load(Ordering::Relaxed), 0);
983
984 manager.shutdown().unwrap();
985 }
986
987 #[test]
988 fn invalidate_removes_cached_graph() {
989 let temp = tempdir().unwrap();
990 write_empty_graph(temp.path()).unwrap();
991
992 let manager = SessionManager::new().unwrap();
993 manager.get_graph(temp.path()).unwrap();
994 assert_eq!(manager.stats().cached_graphs, 1);
995
996 manager.invalidate(temp.path()).unwrap();
997 assert_eq!(manager.stats().cached_graphs, 0);
998
999 manager.shutdown().unwrap();
1000 }
1001
1002 #[test]
1003 #[cfg(unix)]
1004 fn invalidate_alias_unwatches_registered_manifest_path() {
1005 let temp = tempdir().unwrap();
1006 let repo = temp.path().join("repo");
1007 let link = temp.path().join("repo-link");
1008 write_empty_graph(&repo).unwrap();
1009 std::os::unix::fs::symlink(&repo, &link).unwrap();
1010
1011 let manager = SessionManager::new().unwrap();
1012 manager.preload(&repo).unwrap();
1013
1014 let manifest = GraphStorage::new(&canonical(&repo))
1015 .manifest_path()
1016 .to_path_buf();
1017 assert!(
1018 manager
1019 .watcher
1020 .lock()
1021 .expect("watcher lock poisoned in test")
1022 .watched_paths()
1023 .contains(&manifest)
1024 );
1025
1026 manager.invalidate(&link).unwrap();
1027
1028 let watched = manager
1029 .watcher
1030 .lock()
1031 .expect("watcher lock poisoned in test")
1032 .watched_paths();
1033 assert!(
1034 !watched.contains(&manifest),
1035 "manual invalidation through an alias must unwatch registered manifest path"
1036 );
1037 assert_eq!(manager.stats().cached_graphs, 0);
1038
1039 manager.shutdown().unwrap();
1040 }
1041
1042 #[test]
1043 fn watcher_trigger_invalidates_canonical_cache_key() {
1044 let temp = tempdir().unwrap();
1045 write_empty_graph(temp.path()).unwrap();
1046
1047 let manager = SessionManager::new().unwrap();
1048 manager.preload(temp.path()).unwrap();
1049
1050 let cache_key = canonical(temp.path());
1051 assert!(manager.cache.contains_key(&cache_key));
1052
1053 let manifest = GraphStorage::new(&cache_key).manifest_path().to_path_buf();
1054 let triggered = manager
1055 .watcher
1056 .lock()
1057 .expect("watcher lock poisoned in test")
1058 .trigger_for_test(&manifest);
1059
1060 assert!(
1061 triggered,
1062 "expected manifest watcher callback to be registered"
1063 );
1064 assert!(
1065 !manager.cache.contains_key(&cache_key),
1066 "watcher callback must invalidate canonical cache key"
1067 );
1068
1069 manager.shutdown().unwrap();
1070 }
1071
1072 #[test]
1073 fn lru_eviction_removes_oldest_entry() {
1074 let temp = tempdir().unwrap();
1075 let base = temp.path();
1076
1077 let config = SessionConfig {
1078 max_cached_indexes: 2,
1079 ..SessionConfig::default()
1080 };
1081 let manager = SessionManager::with_config(config).unwrap();
1082
1083 let repo1 = base.join("repo1");
1084 let repo2 = base.join("repo2");
1085 let repo3 = base.join("repo3");
1086 write_empty_graph(&repo1).unwrap();
1087 write_empty_graph(&repo2).unwrap();
1088 write_empty_graph(&repo3).unwrap();
1089
1090 manager.get_graph(&repo1).unwrap();
1091 manager.get_graph(&repo2).unwrap();
1092
1093 manager.get_graph(&repo2).unwrap();
1095 manager.get_graph(&repo3).unwrap();
1096
1097 assert_eq!(manager.stats().cached_graphs, 2);
1098 assert!(manager.cache.contains_key(&canonical(&repo2)));
1099 assert!(manager.cache.contains_key(&canonical(&repo3)));
1100 assert!(!manager.cache.contains_key(&canonical(&repo1)));
1101
1102 manager.shutdown().unwrap();
1103 }
1104
1105 #[test]
1106 fn evict_stale_purges_idle_entries() {
1107 let temp = tempdir().unwrap();
1108 write_empty_graph(temp.path()).unwrap();
1109
1110 let config = SessionConfig {
1111 idle_timeout: Duration::from_millis(100),
1112 cleanup_interval: Duration::from_secs(3600),
1113 ..SessionConfig::default()
1114 };
1115 let manager = SessionManager::with_config(config).unwrap();
1116
1117 manager.get_graph(temp.path()).unwrap();
1118 assert_eq!(manager.stats().cached_graphs, 1);
1119
1120 if let Some(entry) = manager.cache.get(&canonical(temp.path())) {
1122 entry.value().set_last_accessed(
1123 Instant::now()
1124 .checked_sub(Duration::from_millis(200))
1125 .unwrap(),
1126 );
1127 }
1128
1129 let evicted = manager.evict_stale(Instant::now()).unwrap();
1130 assert_eq!(evicted, 1);
1131 assert_eq!(manager.stats().cached_graphs, 0);
1132
1133 manager.shutdown().unwrap();
1134 }
1135
1136 #[test]
1137 fn register_watcher_uses_manifest_path() {
1138 let temp = tempdir().unwrap();
1145 write_empty_graph(temp.path()).unwrap();
1146
1147 let manager = SessionManager::new().unwrap();
1148 manager.get_graph(temp.path()).unwrap();
1149
1150 let storage = GraphStorage::new(&canonical(temp.path()));
1151 let watched = manager
1152 .watcher
1153 .lock()
1154 .expect("watcher lock poisoned in test")
1155 .watched_paths();
1156
1157 assert!(
1158 watched.contains(&storage.manifest_path().to_path_buf()),
1159 "watcher must be registered for manifest path {} (registered: {:?})",
1160 storage.manifest_path().display(),
1161 watched,
1162 );
1163 assert!(
1164 !watched.contains(&storage.snapshot_path().to_path_buf()),
1165 "watcher must NOT be registered for snapshot path {} (registered: {:?})",
1166 storage.snapshot_path().display(),
1167 watched,
1168 );
1169
1170 manager.shutdown().unwrap();
1171 }
1172
1173 #[test]
1174 fn evict_lru_unwatches_manifest_path() {
1175 let temp = tempdir().unwrap();
1179 let base = temp.path();
1180
1181 let config = SessionConfig {
1182 max_cached_indexes: 1,
1183 ..SessionConfig::default()
1184 };
1185 let manager = SessionManager::with_config(config).unwrap();
1186
1187 let repo1 = base.join("repo1");
1188 let repo2 = base.join("repo2");
1189 write_empty_graph(&repo1).unwrap();
1190 write_empty_graph(&repo2).unwrap();
1191
1192 manager.get_graph(&repo1).unwrap();
1193 manager.get_graph(&repo2).unwrap(); let watched = manager
1196 .watcher
1197 .lock()
1198 .expect("watcher lock poisoned in test")
1199 .watched_paths();
1200
1201 let repo1_manifest = GraphStorage::new(&canonical(&repo1))
1202 .manifest_path()
1203 .to_path_buf();
1204 let repo2_manifest = GraphStorage::new(&canonical(&repo2))
1205 .manifest_path()
1206 .to_path_buf();
1207 assert!(
1208 !watched.contains(&repo1_manifest),
1209 "evicted workspace's manifest watch must be released; still watching: {watched:?}",
1210 );
1211 assert!(
1212 watched.contains(&repo2_manifest),
1213 "current workspace must remain watched; registered: {watched:?}",
1214 );
1215
1216 manager.shutdown().unwrap();
1217 }
1218
1219 #[test]
1220 #[ignore = "flaky: timing-sensitive file watcher test"]
1221 fn file_changes_trigger_invalidation() {
1222 let temp = tempdir().unwrap();
1223 write_empty_graph(temp.path()).unwrap();
1224
1225 let manager = SessionManager::new().unwrap();
1226 manager.get_graph(temp.path()).unwrap();
1227 let cache_key = canonical(temp.path());
1228 assert!(manager.cache.contains_key(&cache_key));
1229
1230 let storage = GraphStorage::new(&cache_key);
1233 std::fs::write(storage.manifest_path(), b"modified").unwrap();
1234
1235 let evicted = wait_until(watcher_timeout(), || {
1236 manager
1237 .watcher
1238 .lock()
1239 .expect("watcher lock poisoned in test")
1240 .process_events()
1241 .expect("watcher failed to process events in test");
1242 !manager.cache.contains_key(&cache_key)
1243 });
1244 assert!(evicted, "expected watcher to invalidate cache entry");
1245
1246 manager.shutdown().unwrap();
1247 }
1248
1249 #[test]
1250 #[ignore = "flaky: timing-sensitive background thread test"]
1251 fn background_thread_processes_watcher_events() {
1252 let temp = tempdir().unwrap();
1253 write_empty_graph(temp.path()).unwrap();
1254
1255 let config = SessionConfig {
1256 cleanup_interval: Duration::from_millis(50),
1257 ..SessionConfig::default()
1258 };
1259 let manager = SessionManager::with_config(config).unwrap();
1260
1261 manager.get_graph(temp.path()).unwrap();
1262 let cache_key = canonical(temp.path());
1263 let storage = GraphStorage::new(&cache_key);
1264 std::fs::write(storage.manifest_path(), b"changed").unwrap();
1265
1266 let evicted = wait_until(background_timeout(), || {
1267 !manager.cache.contains_key(&cache_key)
1268 });
1269 assert!(
1270 evicted,
1271 "background thread failed to remove watcher-invalidated entry"
1272 );
1273
1274 manager.shutdown().unwrap();
1275 }
1276
1277 #[test]
1278 fn background_thread_evicts_idle_entries() {
1279 let temp = tempdir().unwrap();
1280 write_empty_graph(temp.path()).unwrap();
1281
1282 let config = SessionConfig {
1283 idle_timeout: Duration::from_millis(50),
1284 cleanup_interval: Duration::from_millis(30),
1285 ..SessionConfig::default()
1286 };
1287 let manager = SessionManager::with_config(config).unwrap();
1288
1289 manager.get_graph(temp.path()).unwrap();
1290 assert_eq!(manager.stats().cached_graphs, 1);
1291
1292 if let Some(entry) = manager.cache.get(&canonical(temp.path())) {
1293 entry.value().set_last_accessed(
1294 Instant::now()
1295 .checked_sub(Duration::from_millis(200))
1296 .unwrap(),
1297 );
1298 }
1299
1300 let evicted = wait_until(background_timeout(), || {
1301 !manager.cache.contains_key(&canonical(temp.path()))
1302 });
1303 assert!(
1304 evicted,
1305 "background eviction thread did not remove idle entry"
1306 );
1307
1308 manager.shutdown().unwrap();
1309 }
1310}