1use std::fs;
10use std::path::{Path, PathBuf};
11use std::sync::Arc;
12use std::sync::Mutex;
13use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
14use std::thread::{self, JoinHandle};
15use std::time::{Duration, Instant};
16
17use dashmap::DashMap;
18use log::{debug, info};
19
20use crate::graph::CodeGraph;
21use crate::graph::unified::persistence::{GraphStorage, load_from_path};
22use crate::plugin::PluginManager;
23use crate::query::QueryExecutor;
24
25use super::cache::CachedIndex;
26use super::watcher::FileWatcher;
27use super::{SessionConfig, SessionError, SessionResult};
28
/// Caches loaded code graphs per workspace path and keeps the cache fresh
/// through a file watcher and a background cleanup thread.
pub struct SessionManager {
    /// Loaded graphs keyed by the workspace path they were requested under.
    cache: Arc<DashMap<PathBuf, CachedIndex>>,
    /// Settings consulted by the manager (file-watching toggle, cache capacity,
    /// idle timeout, cleanup interval).
    config: SessionConfig,
    /// Watcher shared with the cleanup thread; its callbacks remove cache entries.
    watcher: Arc<Mutex<FileWatcher>>,
    /// Join handle of the cleanup thread; taken (left as `None`) once the worker is stopped.
    cleanup_handle: Option<JoinHandle<()>>,
    /// Flag polled by the cleanup thread to decide when to leave its loop.
    shutdown: Arc<AtomicBool>,
    /// Number of graph lookups performed via `get_or_load_graph`.
    total_queries: Arc<AtomicU64>,
    /// Lookups that were answered from `cache`.
    cache_hits: Arc<AtomicU64>,
    /// Lookups that had to load the graph from disk.
    cache_misses: Arc<AtomicU64>,
    /// Executor used by `query`; also supplies the plugin manager when loading graphs.
    query_executor: QueryExecutor,
}
42
43impl SessionManager {
44 pub fn new() -> SessionResult<Self> {
55 Self::with_config_and_plugins(SessionConfig::default(), PluginManager::new())
56 }
57
58 pub fn with_plugin_manager(plugin_manager: PluginManager) -> SessionResult<Self> {
72 Self::with_config_and_plugins(SessionConfig::default(), plugin_manager)
73 }
74
75 pub fn with_config(config: SessionConfig) -> SessionResult<Self> {
84 Self::with_config_and_plugins(config, PluginManager::new())
85 }
86
87 pub fn with_config_and_plugins(
101 config: SessionConfig,
102 plugin_manager: PluginManager,
103 ) -> SessionResult<Self> {
104 let watcher = if config.enable_file_watching {
105 FileWatcher::new()?
106 } else {
107 FileWatcher::disabled()
108 };
109
110 let cache = Arc::new(DashMap::new());
111 let watcher = Arc::new(Mutex::new(watcher));
112 let shutdown = Arc::new(AtomicBool::new(false));
113
114 let cleanup_interval = config.cleanup_interval;
115 let idle_timeout = config.idle_timeout;
116
117 let cleanup_handle = {
118 let cache = Arc::clone(&cache);
119 let watcher_clone = Arc::clone(&watcher);
120 let shutdown_flag = Arc::clone(&shutdown);
121
122 thread::Builder::new()
123 .name("sqry-session-cleanup".into())
124 .spawn(move || {
125 while !shutdown_flag.load(Ordering::Relaxed) {
126 if let Ok(mut guard) = watcher_clone.lock()
127 && let Err(err) = guard.process_events()
128 {
129 debug!("failed to process session watcher events: {err}");
130 }
131
132 let now = Instant::now();
133 if let Err(err) =
134 Self::evict_stale_for(&cache, &watcher_clone, idle_timeout, now)
135 {
136 debug!("failed to evict stale sessions: {err}");
137 }
138
139 thread::park_timeout(cleanup_interval);
140 }
141
142 if let Ok(mut guard) = watcher_clone.lock()
143 && let Err(err) = guard.process_events()
144 {
145 debug!("failed to process session watcher events during shutdown: {err}");
146 }
147 let now = Instant::now();
148 if let Err(err) =
149 Self::evict_stale_for(&cache, &watcher_clone, idle_timeout, now)
150 {
151 debug!("failed to evict stale sessions during shutdown: {err}");
152 }
153 })
154 .map_err(SessionError::SpawnThread)?
155 };
156
157 let query_executor = QueryExecutor::with_plugin_manager(plugin_manager);
159
160 Ok(Self {
161 cache,
162 config,
163 watcher,
164 cleanup_handle: Some(cleanup_handle),
165 shutdown,
166 total_queries: Arc::new(AtomicU64::new(0)),
167 cache_hits: Arc::new(AtomicU64::new(0)),
168 cache_misses: Arc::new(AtomicU64::new(0)),
169 query_executor,
170 })
171 }
172
173 pub fn get_graph(&self, path: &Path) -> SessionResult<Arc<CodeGraph>> {
179 self.get_or_load_graph(path)
180 }
181
182 pub fn query(
193 &self,
194 path: &Path,
195 query_str: &str,
196 ) -> SessionResult<crate::query::results::QueryResults> {
197 let results = self
199 .query_executor
200 .execute_on_graph(query_str, path)
201 .map_err(|e| {
202 let error_msg = e.to_string();
204 if error_msg.contains("parse")
205 || error_msg.contains("unexpected")
206 || error_msg.contains("expected")
207 {
208 SessionError::QueryParse(error_msg)
209 } else {
210 SessionError::QueryExecution(e)
211 }
212 })?;
213
214 Ok(results)
215 }
216
217 pub fn invalidate(&self, path: &Path) -> SessionResult<()> {
223 self.cache.remove(path);
224 if let Ok(mut watcher) = self.watcher.lock() {
225 watcher.unwatch(path)?;
226 }
227 Ok(())
228 }
229
230 pub fn preload(&self, path: &Path) -> SessionResult<()> {
237 if self.cache.contains_key(path) {
238 return Ok(());
239 }
240
241 if self.cache.len() >= self.config.max_cached_indexes {
242 self.evict_lru();
243 }
244
245 self.load_graph_from_disk(path)?;
246 Ok(())
247 }
248
249 #[must_use]
251 pub fn stats(&self) -> SessionStats {
252 SessionStats {
253 cached_graphs: self.cache.len(),
254 total_queries: self.total_queries.load(Ordering::Relaxed),
255 cache_hits: self.cache_hits.load(Ordering::Relaxed),
256 cache_misses: self.cache_misses.load(Ordering::Relaxed),
257 total_memory_mb: self.cache.len() * 51,
258 }
259 }
260
261 pub fn evict_stale(&self, now: Instant) -> SessionResult<usize> {
267 Self::evict_stale_for(&self.cache, &self.watcher, self.config.idle_timeout, now)
268 }
269
270 fn get_or_load_graph(&self, path: &Path) -> SessionResult<Arc<CodeGraph>> {
271 debug_assert!(
272 self.config.max_cached_indexes > 0,
273 "SessionConfig::max_cached_indexes must be at least 1"
274 );
275 self.total_queries.fetch_add(1, Ordering::Relaxed);
276
277 if let Some(entry) = self.cache.get_mut(path) {
278 entry.value().access();
279 self.cache_hits.fetch_add(1, Ordering::Relaxed);
280 return Ok(Arc::clone(&entry.value().graph));
281 }
282
283 self.cache_misses.fetch_add(1, Ordering::Relaxed);
284
285 if self.cache.len() >= self.config.max_cached_indexes {
286 self.evict_lru();
287 }
288
289 let graph = self.load_graph_from_disk(path)?;
290
291 if let Some(entry) = self.cache.get_mut(path) {
292 entry.value().access();
293 }
294
295 Ok(graph)
296 }
297
298 fn load_graph_from_disk(&self, path: &Path) -> SessionResult<Arc<CodeGraph>> {
299 let storage = GraphStorage::new(path);
300 let snapshot_path = storage.snapshot_path();
301
302 let metadata =
303 fs::metadata(snapshot_path).map_err(|source| SessionError::IndexMetadata {
304 path: snapshot_path.to_path_buf(),
305 source,
306 })?;
307 let file_mtime = metadata
308 .modified()
309 .map_err(|source| SessionError::IndexMetadata {
310 path: snapshot_path.to_path_buf(),
311 source,
312 })?;
313
314 let graph = load_from_path(snapshot_path, Some(self.query_executor.plugin_manager()))
315 .map_err(|source| SessionError::IndexLoad {
316 path: snapshot_path.to_path_buf(),
317 source: source.into(),
318 })?;
319 let arc_graph = Arc::new(graph);
320
321 let cached = CachedIndex::new(Arc::clone(&arc_graph), file_mtime);
322 self.cache.insert(path.to_path_buf(), cached);
323
324 self.register_watcher(path, storage.manifest_path());
325
326 Ok(arc_graph)
327 }
328
329 fn register_watcher(&self, workspace_path: &Path, manifest_path: &Path) {
338 if let Ok(mut watcher) = self.watcher.lock() {
339 let cache = Arc::clone(&self.cache);
340 let callback_path = workspace_path.to_path_buf();
341 let watch_path = manifest_path.to_path_buf();
342 if let Err(err) = watcher.watch(watch_path.clone(), move || {
343 cache.remove(&callback_path);
344 info!(
345 "Invalidated session cache after graph change: {}",
346 callback_path.display()
347 );
348 }) {
349 debug!(
350 "failed to register file watcher for {}: {err}",
351 watch_path.display()
352 );
353 }
354 }
355 }
356
357 fn evict_lru(&self) {
359 let mut oldest: Option<(PathBuf, Instant)> = None;
360
361 for entry in self.cache.iter() {
362 let last_accessed = entry.value().last_accessed();
363 if oldest
364 .as_ref()
365 .is_none_or(|(_, instant)| last_accessed < *instant)
366 {
367 oldest = Some((entry.key().clone(), last_accessed));
368 }
369 }
370
371 if let Some((path, _)) = oldest {
372 self.cache.remove(&path);
373 debug!("evicted LRU session cache entry: {}", path.display());
374 if let Ok(mut watcher) = self.watcher.lock() {
375 let storage = GraphStorage::new(&path);
379 if let Err(err) = watcher.unwatch(storage.manifest_path()) {
380 debug!("failed to unwatch session path {}: {err}", path.display());
381 }
382 }
383 }
384 }
385
386 fn evict_stale_for(
387 cache: &Arc<DashMap<PathBuf, CachedIndex>>,
388 watcher: &Arc<Mutex<FileWatcher>>,
389 timeout: Duration,
390 now: Instant,
391 ) -> SessionResult<usize> {
392 let mut to_remove = Vec::new();
393
394 for entry in cache.iter() {
395 let idle = now.duration_since(entry.value().last_accessed());
396 if idle > timeout {
397 to_remove.push(entry.key().clone());
398 }
399 }
400
401 for path in &to_remove {
402 cache.remove(path);
403 }
404
405 if !to_remove.is_empty()
406 && let Ok(mut guard) = watcher.lock()
407 {
408 for path in &to_remove {
409 let storage = GraphStorage::new(path);
414 guard.unwatch(storage.manifest_path())?;
415 }
416 }
417
418 Ok(to_remove.len())
419 }
420
421 fn stop_worker(&mut self) {
422 self.shutdown.store(true, Ordering::Relaxed);
423
424 if let Some(handle) = self.cleanup_handle.take() {
425 handle.thread().unpark();
426 if let Err(err) = handle.join() {
427 debug!("session cleanup thread terminated with error: {err:?}");
428 }
429 }
430 }
431
432 pub fn shutdown(mut self) -> SessionResult<()> {
438 self.stop_worker();
439 Ok(())
440 }
441}
442
443impl Drop for SessionManager {
444 fn drop(&mut self) {
445 self.stop_worker();
446 }
447}
448
/// Point-in-time counters reported by `SessionManager::stats`.
#[derive(Debug, Clone)]
pub struct SessionStats {
    /// Number of graphs currently held in the cache.
    pub cached_graphs: usize,
    /// Total number of graph lookups performed.
    pub total_queries: u64,
    /// Lookups served from the cache.
    pub cache_hits: u64,
    /// Lookups that required loading from disk.
    pub cache_misses: u64,
    /// Estimated memory use in megabytes, derived from the number of cached graphs.
    pub total_memory_mb: usize,
}
463
464#[cfg(test)]
465mod tests {
466 use super::*;
467 use crate::graph::unified::persistence::save_to_path;
468 use std::sync::Arc;
469 use std::thread;
470 use std::time::{Duration, Instant};
471 use tempfile::tempdir;
472
473 fn write_empty_graph(dir: &Path) -> SessionResult<()> {
474 let storage = GraphStorage::new(dir);
475 fs::create_dir_all(storage.graph_dir()).map_err(|source| SessionError::IndexMetadata {
476 path: storage.graph_dir().to_path_buf(),
477 source,
478 })?;
479 let graph = CodeGraph::new();
480 save_to_path(&graph, storage.snapshot_path()).map_err(|source| {
481 SessionError::IndexLoad {
482 path: storage.snapshot_path().to_path_buf(),
483 source: source.into(),
484 }
485 })?;
486 fs::write(storage.manifest_path(), b"{}").map_err(|source| {
492 SessionError::IndexMetadata {
493 path: storage.manifest_path().to_path_buf(),
494 source,
495 }
496 })?;
497 Ok(())
498 }
499
500 fn watcher_timeout() -> Duration {
501 let base = if cfg!(target_os = "macos") {
502 Duration::from_secs(3)
503 } else {
504 Duration::from_secs(2)
505 };
506
507 if std::env::var("CI").is_ok() {
508 base * 2
509 } else {
510 base
511 }
512 }
513
514 fn background_timeout() -> Duration {
515 let base = if cfg!(target_os = "macos") {
516 Duration::from_secs(5)
517 } else {
518 Duration::from_secs(3)
519 };
520
521 if std::env::var("CI").is_ok() {
522 base * 2
523 } else {
524 base
525 }
526 }
527
528 fn wait_until<F>(timeout: Duration, mut predicate: F) -> bool
529 where
530 F: FnMut() -> bool,
531 {
532 let deadline = Instant::now() + timeout;
533 loop {
534 if predicate() {
535 return true;
536 }
537 if Instant::now() >= deadline {
538 return false;
539 }
540 thread::sleep(Duration::from_millis(50));
541 }
542 }
543
544 #[test]
545 fn get_graph_loads_and_updates_stats() {
546 let temp = tempdir().unwrap();
547 write_empty_graph(temp.path()).unwrap();
548
549 let manager = SessionManager::new().unwrap();
550
551 let graph = manager.get_graph(temp.path()).unwrap();
552 assert_eq!(graph.snapshot().nodes().len(), 0);
553
554 let stats = manager.stats();
555 assert_eq!(stats.total_queries, 1);
556 assert_eq!(stats.cache_misses, 1);
557 assert_eq!(stats.cache_hits, 0);
558 assert_eq!(stats.cached_graphs, 1);
559
560 manager.shutdown().unwrap();
561 }
562
563 #[test]
564 fn get_graph_missing_returns_error() {
565 let temp = tempdir().unwrap();
566
567 let manager = SessionManager::new().unwrap();
568 let err = manager
569 .get_graph(temp.path())
570 .expect_err("get_graph should fail without graph");
571
572 assert!(matches!(err, SessionError::IndexMetadata { .. }));
573 manager.shutdown().unwrap();
574 }
575
576 #[test]
577 fn preload_does_not_affect_stats() {
578 let temp = tempdir().unwrap();
579 write_empty_graph(temp.path()).unwrap();
580
581 let manager = SessionManager::new().unwrap();
582 manager.preload(temp.path()).unwrap();
583
584 let stats = manager.stats();
585 assert_eq!(stats.total_queries, 0);
586 assert_eq!(stats.cache_hits, 0);
587 assert_eq!(stats.cache_misses, 0);
588 assert_eq!(stats.cached_graphs, 1);
589
590 manager.get_graph(temp.path()).unwrap();
591 let after = manager.stats();
592 assert_eq!(after.total_queries, 1);
593 assert_eq!(after.cache_hits, 1);
594 assert_eq!(after.cache_misses, 0);
595 assert_eq!(after.cached_graphs, 1);
596
597 manager.shutdown().unwrap();
598 }
599
600 #[test]
601 fn second_access_hits_cache() {
602 let temp = tempdir().unwrap();
603 write_empty_graph(temp.path()).unwrap();
604
605 let manager = SessionManager::new().unwrap();
606
607 manager.get_graph(temp.path()).unwrap();
608 manager.get_graph(temp.path()).unwrap();
609
610 let stats = manager.stats();
611 assert_eq!(stats.total_queries, 2);
612 assert_eq!(stats.cache_misses, 1);
613 assert_eq!(stats.cache_hits, 1);
614 assert_eq!(stats.cached_graphs, 1);
615
616 manager.shutdown().unwrap();
617 }
618
619 #[test]
620 fn concurrent_access_shares_cache() {
621 let temp = tempdir().unwrap();
622 write_empty_graph(temp.path()).unwrap();
623
624 let manager = Arc::new(SessionManager::new().unwrap());
625 let path = temp.path().to_path_buf();
626
627 let handles: Vec<_> = (0..6)
628 .map(|_| {
629 let mgr = Arc::clone(&manager);
630 let path = path.clone();
631 thread::spawn(move || {
632 mgr.get_graph(&path).unwrap();
633 })
634 })
635 .collect();
636
637 for handle in handles {
638 handle.join().unwrap();
639 }
640
641 let manager = Arc::into_inner(manager).expect("no outstanding references");
642 manager.shutdown().unwrap();
643 }
644
645 #[test]
646 fn invalidate_removes_cached_graph() {
647 let temp = tempdir().unwrap();
648 write_empty_graph(temp.path()).unwrap();
649
650 let manager = SessionManager::new().unwrap();
651 manager.get_graph(temp.path()).unwrap();
652 assert_eq!(manager.stats().cached_graphs, 1);
653
654 manager.invalidate(temp.path()).unwrap();
655 assert_eq!(manager.stats().cached_graphs, 0);
656
657 manager.shutdown().unwrap();
658 }
659
660 #[test]
661 fn lru_eviction_removes_oldest_entry() {
662 let temp = tempdir().unwrap();
663 let base = temp.path();
664
665 let config = SessionConfig {
666 max_cached_indexes: 2,
667 ..SessionConfig::default()
668 };
669 let manager = SessionManager::with_config(config).unwrap();
670
671 let repo1 = base.join("repo1");
672 let repo2 = base.join("repo2");
673 let repo3 = base.join("repo3");
674 write_empty_graph(&repo1).unwrap();
675 write_empty_graph(&repo2).unwrap();
676 write_empty_graph(&repo3).unwrap();
677
678 manager.get_graph(&repo1).unwrap();
679 manager.get_graph(&repo2).unwrap();
680
681 manager.get_graph(&repo2).unwrap();
683 manager.get_graph(&repo3).unwrap();
684
685 assert_eq!(manager.stats().cached_graphs, 2);
686 assert!(manager.cache.contains_key(&repo2));
687 assert!(manager.cache.contains_key(&repo3));
688 assert!(!manager.cache.contains_key(&repo1));
689
690 manager.shutdown().unwrap();
691 }
692
693 #[test]
694 fn evict_stale_purges_idle_entries() {
695 let temp = tempdir().unwrap();
696 write_empty_graph(temp.path()).unwrap();
697
698 let config = SessionConfig {
699 idle_timeout: Duration::from_millis(100),
700 cleanup_interval: Duration::from_secs(3600),
701 ..SessionConfig::default()
702 };
703 let manager = SessionManager::with_config(config).unwrap();
704
705 manager.get_graph(temp.path()).unwrap();
706 assert_eq!(manager.stats().cached_graphs, 1);
707
708 if let Some(entry) = manager.cache.get(temp.path()) {
710 entry.value().set_last_accessed(
711 Instant::now()
712 .checked_sub(Duration::from_millis(200))
713 .unwrap(),
714 );
715 }
716
717 let evicted = manager.evict_stale(Instant::now()).unwrap();
718 assert_eq!(evicted, 1);
719 assert_eq!(manager.stats().cached_graphs, 0);
720
721 manager.shutdown().unwrap();
722 }
723
724 #[test]
725 fn register_watcher_uses_manifest_path() {
726 let temp = tempdir().unwrap();
733 write_empty_graph(temp.path()).unwrap();
734
735 let manager = SessionManager::new().unwrap();
736 manager.get_graph(temp.path()).unwrap();
737
738 let storage = GraphStorage::new(temp.path());
739 let watched = manager
740 .watcher
741 .lock()
742 .expect("watcher lock poisoned in test")
743 .watched_paths();
744
745 assert!(
746 watched.contains(&storage.manifest_path().to_path_buf()),
747 "watcher must be registered for manifest path {} (registered: {:?})",
748 storage.manifest_path().display(),
749 watched,
750 );
751 assert!(
752 !watched.contains(&storage.snapshot_path().to_path_buf()),
753 "watcher must NOT be registered for snapshot path {} (registered: {:?})",
754 storage.snapshot_path().display(),
755 watched,
756 );
757
758 manager.shutdown().unwrap();
759 }
760
761 #[test]
762 fn evict_lru_unwatches_manifest_path() {
763 let temp = tempdir().unwrap();
767 let base = temp.path();
768
769 let config = SessionConfig {
770 max_cached_indexes: 1,
771 ..SessionConfig::default()
772 };
773 let manager = SessionManager::with_config(config).unwrap();
774
775 let repo1 = base.join("repo1");
776 let repo2 = base.join("repo2");
777 write_empty_graph(&repo1).unwrap();
778 write_empty_graph(&repo2).unwrap();
779
780 manager.get_graph(&repo1).unwrap();
781 manager.get_graph(&repo2).unwrap(); let watched = manager
784 .watcher
785 .lock()
786 .expect("watcher lock poisoned in test")
787 .watched_paths();
788
789 let repo1_manifest = GraphStorage::new(&repo1).manifest_path().to_path_buf();
790 let repo2_manifest = GraphStorage::new(&repo2).manifest_path().to_path_buf();
791 assert!(
792 !watched.contains(&repo1_manifest),
793 "evicted workspace's manifest watch must be released; still watching: {watched:?}",
794 );
795 assert!(
796 watched.contains(&repo2_manifest),
797 "current workspace must remain watched; registered: {watched:?}",
798 );
799
800 manager.shutdown().unwrap();
801 }
802
803 #[test]
804 #[ignore = "flaky: timing-sensitive file watcher test"]
805 fn file_changes_trigger_invalidation() {
806 let temp = tempdir().unwrap();
807 write_empty_graph(temp.path()).unwrap();
808
809 let manager = SessionManager::new().unwrap();
810 manager.get_graph(temp.path()).unwrap();
811 assert!(manager.cache.contains_key(temp.path()));
812
813 let storage = GraphStorage::new(temp.path());
816 std::fs::write(storage.manifest_path(), b"modified").unwrap();
817
818 let evicted = wait_until(watcher_timeout(), || {
819 manager
820 .watcher
821 .lock()
822 .expect("watcher lock poisoned in test")
823 .process_events()
824 .expect("watcher failed to process events in test");
825 !manager.cache.contains_key(temp.path())
826 });
827 assert!(evicted, "expected watcher to invalidate cache entry");
828
829 manager.shutdown().unwrap();
830 }
831
832 #[test]
833 #[ignore = "flaky: timing-sensitive background thread test"]
834 fn background_thread_processes_watcher_events() {
835 let temp = tempdir().unwrap();
836 write_empty_graph(temp.path()).unwrap();
837
838 let config = SessionConfig {
839 cleanup_interval: Duration::from_millis(50),
840 ..SessionConfig::default()
841 };
842 let manager = SessionManager::with_config(config).unwrap();
843
844 manager.get_graph(temp.path()).unwrap();
845 let storage = GraphStorage::new(temp.path());
846 std::fs::write(storage.manifest_path(), b"changed").unwrap();
847
848 let evicted = wait_until(background_timeout(), || {
849 !manager.cache.contains_key(temp.path())
850 });
851 assert!(
852 evicted,
853 "background thread failed to remove watcher-invalidated entry"
854 );
855
856 manager.shutdown().unwrap();
857 }
858
859 #[test]
860 fn background_thread_evicts_idle_entries() {
861 let temp = tempdir().unwrap();
862 write_empty_graph(temp.path()).unwrap();
863
864 let config = SessionConfig {
865 idle_timeout: Duration::from_millis(50),
866 cleanup_interval: Duration::from_millis(30),
867 ..SessionConfig::default()
868 };
869 let manager = SessionManager::with_config(config).unwrap();
870
871 manager.get_graph(temp.path()).unwrap();
872 assert_eq!(manager.stats().cached_graphs, 1);
873
874 if let Some(entry) = manager.cache.get(temp.path()) {
875 entry.value().set_last_accessed(
876 Instant::now()
877 .checked_sub(Duration::from_millis(200))
878 .unwrap(),
879 );
880 }
881
882 let evicted = wait_until(background_timeout(), || {
883 !manager.cache.contains_key(temp.path())
884 });
885 assert!(
886 evicted,
887 "background eviction thread did not remove idle entry"
888 );
889
890 manager.shutdown().unwrap();
891 }
892}