1use std::fs;
10use std::path::{Path, PathBuf};
11use std::sync::Arc;
12use std::sync::Mutex;
13use std::sync::atomic::{AtomicU64, Ordering};
14use std::thread::{self, JoinHandle};
15use std::time::{Duration, Instant};
16
17use dashmap::DashMap;
18use log::{debug, info};
19use parking_lot::{Condvar, Mutex as PlMutex};
20
21use crate::graph::CodeGraph;
22use crate::graph::unified::persistence::{GraphStorage, load_from_path};
23use crate::plugin::PluginManager;
24use crate::query::QueryExecutor;
25
26use super::cache::CachedIndex;
27use super::watcher::FileWatcher;
28use super::{SessionConfig, SessionError, SessionResult};
29
30pub struct SessionManager {
32 cache: Arc<DashMap<PathBuf, CachedIndex>>,
33 config: SessionConfig,
34 watcher: Arc<Mutex<FileWatcher>>,
35 cleanup_handle: Option<JoinHandle<()>>,
36 shutdown: Arc<(PlMutex<bool>, Condvar)>,
46 total_queries: Arc<AtomicU64>,
47 cache_hits: Arc<AtomicU64>,
48 cache_misses: Arc<AtomicU64>,
49 query_executor: QueryExecutor,
51}
52
53impl SessionManager {
54 pub fn new() -> SessionResult<Self> {
65 Self::with_config_and_plugins(SessionConfig::default(), PluginManager::new())
66 }
67
68 pub fn with_plugin_manager(plugin_manager: PluginManager) -> SessionResult<Self> {
82 Self::with_config_and_plugins(SessionConfig::default(), plugin_manager)
83 }
84
85 pub fn with_config(config: SessionConfig) -> SessionResult<Self> {
94 Self::with_config_and_plugins(config, PluginManager::new())
95 }
96
97 pub fn with_config_and_plugins(
111 config: SessionConfig,
112 plugin_manager: PluginManager,
113 ) -> SessionResult<Self> {
114 let watcher = if config.enable_file_watching {
115 FileWatcher::new()?
116 } else {
117 FileWatcher::disabled()
118 };
119
120 let cache = Arc::new(DashMap::new());
121 let watcher = Arc::new(Mutex::new(watcher));
122 let shutdown = Arc::new((PlMutex::new(false), Condvar::new()));
123
124 let cleanup_interval = config.cleanup_interval;
125 let idle_timeout = config.idle_timeout;
126
127 let cleanup_handle = {
128 let cache = Arc::clone(&cache);
129 let watcher_clone = Arc::clone(&watcher);
130 let shutdown_flag = Arc::clone(&shutdown);
131
132 thread::Builder::new()
133 .name("sqry-session-cleanup".into())
134 .spawn(move || {
135 loop {
136 if *shutdown_flag.0.lock() {
137 break;
138 }
139
140 if let Ok(mut guard) = watcher_clone.lock()
141 && let Err(err) = guard.process_events()
142 {
143 debug!("failed to process session watcher events: {err}");
144 }
145
146 let now = Instant::now();
147 if let Err(err) =
148 Self::evict_stale_for(&cache, &watcher_clone, idle_timeout, now)
149 {
150 debug!("failed to evict stale sessions: {err}");
151 }
152
153 let mut stop = shutdown_flag.0.lock();
159 if !*stop {
160 shutdown_flag.1.wait_for(&mut stop, cleanup_interval);
161 }
162 if *stop {
163 break;
164 }
165 }
166
167 if let Ok(mut guard) = watcher_clone.lock()
168 && let Err(err) = guard.process_events()
169 {
170 debug!("failed to process session watcher events during shutdown: {err}");
171 }
172 let now = Instant::now();
173 if let Err(err) =
174 Self::evict_stale_for(&cache, &watcher_clone, idle_timeout, now)
175 {
176 debug!("failed to evict stale sessions during shutdown: {err}");
177 }
178 })
179 .map_err(SessionError::SpawnThread)?
180 };
181
182 let query_executor = QueryExecutor::with_plugin_manager(plugin_manager);
184
185 Ok(Self {
186 cache,
187 config,
188 watcher,
189 cleanup_handle: Some(cleanup_handle),
190 shutdown,
191 total_queries: Arc::new(AtomicU64::new(0)),
192 cache_hits: Arc::new(AtomicU64::new(0)),
193 cache_misses: Arc::new(AtomicU64::new(0)),
194 query_executor,
195 })
196 }
197
198 pub fn get_graph(&self, path: &Path) -> SessionResult<Arc<CodeGraph>> {
204 self.get_or_load_graph(path)
205 }
206
207 pub fn query(
218 &self,
219 path: &Path,
220 query_str: &str,
221 ) -> SessionResult<crate::query::results::QueryResults> {
222 let results = self
224 .query_executor
225 .execute_on_graph(query_str, path)
226 .map_err(|e| {
227 let error_msg = e.to_string();
229 if error_msg.contains("parse")
230 || error_msg.contains("unexpected")
231 || error_msg.contains("expected")
232 {
233 SessionError::QueryParse(error_msg)
234 } else {
235 SessionError::QueryExecution(e)
236 }
237 })?;
238
239 Ok(results)
240 }
241
242 pub fn invalidate(&self, path: &Path) -> SessionResult<()> {
248 self.cache.remove(path);
249 if let Ok(mut watcher) = self.watcher.lock() {
250 watcher.unwatch(path)?;
251 }
252 Ok(())
253 }
254
255 pub fn preload(&self, path: &Path) -> SessionResult<()> {
262 if self.cache.contains_key(path) {
263 return Ok(());
264 }
265
266 if self.cache.len() >= self.config.max_cached_indexes {
267 self.evict_lru();
268 }
269
270 self.load_graph_from_disk(path)?;
271 Ok(())
272 }
273
274 #[must_use]
276 pub fn stats(&self) -> SessionStats {
277 SessionStats {
278 cached_graphs: self.cache.len(),
279 total_queries: self.total_queries.load(Ordering::Relaxed),
280 cache_hits: self.cache_hits.load(Ordering::Relaxed),
281 cache_misses: self.cache_misses.load(Ordering::Relaxed),
282 total_memory_mb: self.cache.len() * 51,
283 }
284 }
285
286 pub fn evict_stale(&self, now: Instant) -> SessionResult<usize> {
292 Self::evict_stale_for(&self.cache, &self.watcher, self.config.idle_timeout, now)
293 }
294
295 fn get_or_load_graph(&self, path: &Path) -> SessionResult<Arc<CodeGraph>> {
296 debug_assert!(
297 self.config.max_cached_indexes > 0,
298 "SessionConfig::max_cached_indexes must be at least 1"
299 );
300 self.total_queries.fetch_add(1, Ordering::Relaxed);
301
302 if let Some(entry) = self.cache.get_mut(path) {
303 entry.value().access();
304 self.cache_hits.fetch_add(1, Ordering::Relaxed);
305 return Ok(Arc::clone(&entry.value().graph));
306 }
307
308 self.cache_misses.fetch_add(1, Ordering::Relaxed);
309
310 if self.cache.len() >= self.config.max_cached_indexes {
311 self.evict_lru();
312 }
313
314 let graph = self.load_graph_from_disk(path)?;
315
316 if let Some(entry) = self.cache.get_mut(path) {
317 entry.value().access();
318 }
319
320 Ok(graph)
321 }
322
323 fn load_graph_from_disk(&self, path: &Path) -> SessionResult<Arc<CodeGraph>> {
324 let storage = GraphStorage::new(path);
325 let snapshot_path = storage.snapshot_path();
326
327 let metadata =
328 fs::metadata(snapshot_path).map_err(|source| SessionError::IndexMetadata {
329 path: snapshot_path.to_path_buf(),
330 source,
331 })?;
332 let file_mtime = metadata
333 .modified()
334 .map_err(|source| SessionError::IndexMetadata {
335 path: snapshot_path.to_path_buf(),
336 source,
337 })?;
338
339 let graph = load_from_path(snapshot_path, Some(self.query_executor.plugin_manager()))
340 .map_err(|source| SessionError::IndexLoad {
341 path: snapshot_path.to_path_buf(),
342 source: source.into(),
343 })?;
344 let arc_graph = Arc::new(graph);
345
346 let cached = CachedIndex::new(Arc::clone(&arc_graph), file_mtime);
347 self.cache.insert(path.to_path_buf(), cached);
348
349 self.register_watcher(path, storage.manifest_path());
350
351 Ok(arc_graph)
352 }
353
354 fn register_watcher(&self, workspace_path: &Path, manifest_path: &Path) {
363 if let Ok(mut watcher) = self.watcher.lock() {
364 let cache = Arc::clone(&self.cache);
365 let callback_path = workspace_path.to_path_buf();
366 let watch_path = manifest_path.to_path_buf();
367 if let Err(err) = watcher.watch(watch_path.clone(), move || {
368 cache.remove(&callback_path);
369 info!(
370 "Invalidated session cache after graph change: {}",
371 callback_path.display()
372 );
373 }) {
374 debug!(
375 "failed to register file watcher for {}: {err}",
376 watch_path.display()
377 );
378 }
379 }
380 }
381
382 fn evict_lru(&self) {
384 let mut oldest: Option<(PathBuf, Instant)> = None;
385
386 for entry in self.cache.iter() {
387 let last_accessed = entry.value().last_accessed();
388 if oldest
389 .as_ref()
390 .is_none_or(|(_, instant)| last_accessed < *instant)
391 {
392 oldest = Some((entry.key().clone(), last_accessed));
393 }
394 }
395
396 if let Some((path, _)) = oldest {
397 self.cache.remove(&path);
398 debug!("evicted LRU session cache entry: {}", path.display());
399 if let Ok(mut watcher) = self.watcher.lock() {
400 let storage = GraphStorage::new(&path);
404 if let Err(err) = watcher.unwatch(storage.manifest_path()) {
405 debug!("failed to unwatch session path {}: {err}", path.display());
406 }
407 }
408 }
409 }
410
411 fn evict_stale_for(
412 cache: &Arc<DashMap<PathBuf, CachedIndex>>,
413 watcher: &Arc<Mutex<FileWatcher>>,
414 timeout: Duration,
415 now: Instant,
416 ) -> SessionResult<usize> {
417 let mut to_remove = Vec::new();
418
419 for entry in cache.iter() {
420 let idle = now.duration_since(entry.value().last_accessed());
421 if idle > timeout {
422 to_remove.push(entry.key().clone());
423 }
424 }
425
426 for path in &to_remove {
427 cache.remove(path);
428 }
429
430 if !to_remove.is_empty()
431 && let Ok(mut guard) = watcher.lock()
432 {
433 for path in &to_remove {
434 let storage = GraphStorage::new(path);
439 guard.unwatch(storage.manifest_path())?;
440 }
441 }
442
443 Ok(to_remove.len())
444 }
445
446 fn stop_worker(&mut self) {
447 {
448 let mut stop = self.shutdown.0.lock();
449 *stop = true;
450 self.shutdown.1.notify_all();
451 }
452
453 if let Some(handle) = self.cleanup_handle.take()
454 && let Err(err) = handle.join()
455 {
456 debug!("session cleanup thread terminated with error: {err:?}");
457 }
458 }
459
460 pub fn shutdown(mut self) -> SessionResult<()> {
466 self.stop_worker();
467 Ok(())
468 }
469}
470
471impl Drop for SessionManager {
472 fn drop(&mut self) {
473 self.stop_worker();
474 }
475}
476
477#[derive(Debug, Clone)]
479pub struct SessionStats {
480 pub cached_graphs: usize,
482 pub total_queries: u64,
484 pub cache_hits: u64,
486 pub cache_misses: u64,
488 pub total_memory_mb: usize,
490}
491
492#[cfg(test)]
493mod tests {
494 use super::*;
495 use crate::graph::unified::persistence::save_to_path;
496 use std::sync::Arc;
497 use std::thread;
498 use std::time::{Duration, Instant};
499 use tempfile::tempdir;
500
501 fn write_empty_graph(dir: &Path) -> SessionResult<()> {
502 let storage = GraphStorage::new(dir);
503 fs::create_dir_all(storage.graph_dir()).map_err(|source| SessionError::IndexMetadata {
504 path: storage.graph_dir().to_path_buf(),
505 source,
506 })?;
507 let graph = CodeGraph::new();
508 save_to_path(&graph, storage.snapshot_path()).map_err(|source| {
509 SessionError::IndexLoad {
510 path: storage.snapshot_path().to_path_buf(),
511 source: source.into(),
512 }
513 })?;
514 fs::write(storage.manifest_path(), b"{}").map_err(|source| {
520 SessionError::IndexMetadata {
521 path: storage.manifest_path().to_path_buf(),
522 source,
523 }
524 })?;
525 Ok(())
526 }
527
528 fn watcher_timeout() -> Duration {
529 let base = if cfg!(target_os = "macos") {
530 Duration::from_secs(3)
531 } else {
532 Duration::from_secs(2)
533 };
534
535 if std::env::var("CI").is_ok() {
536 base * 2
537 } else {
538 base
539 }
540 }
541
542 fn background_timeout() -> Duration {
543 let base = if cfg!(target_os = "macos") {
544 Duration::from_secs(5)
545 } else {
546 Duration::from_secs(3)
547 };
548
549 if std::env::var("CI").is_ok() {
550 base * 2
551 } else {
552 base
553 }
554 }
555
556 fn wait_until<F>(timeout: Duration, mut predicate: F) -> bool
557 where
558 F: FnMut() -> bool,
559 {
560 let deadline = Instant::now() + timeout;
561 loop {
562 if predicate() {
563 return true;
564 }
565 if Instant::now() >= deadline {
566 return false;
567 }
568 thread::sleep(Duration::from_millis(50));
569 }
570 }
571
572 #[test]
573 fn get_graph_loads_and_updates_stats() {
574 let temp = tempdir().unwrap();
575 write_empty_graph(temp.path()).unwrap();
576
577 let manager = SessionManager::new().unwrap();
578
579 let graph = manager.get_graph(temp.path()).unwrap();
580 assert_eq!(graph.snapshot().nodes().len(), 0);
581
582 let stats = manager.stats();
583 assert_eq!(stats.total_queries, 1);
584 assert_eq!(stats.cache_misses, 1);
585 assert_eq!(stats.cache_hits, 0);
586 assert_eq!(stats.cached_graphs, 1);
587
588 manager.shutdown().unwrap();
589 }
590
591 #[test]
592 fn get_graph_missing_returns_error() {
593 let temp = tempdir().unwrap();
594
595 let manager = SessionManager::new().unwrap();
596 let err = manager
597 .get_graph(temp.path())
598 .expect_err("get_graph should fail without graph");
599
600 assert!(matches!(err, SessionError::IndexMetadata { .. }));
601 manager.shutdown().unwrap();
602 }
603
604 #[test]
605 fn preload_does_not_affect_stats() {
606 let temp = tempdir().unwrap();
607 write_empty_graph(temp.path()).unwrap();
608
609 let manager = SessionManager::new().unwrap();
610 manager.preload(temp.path()).unwrap();
611
612 let stats = manager.stats();
613 assert_eq!(stats.total_queries, 0);
614 assert_eq!(stats.cache_hits, 0);
615 assert_eq!(stats.cache_misses, 0);
616 assert_eq!(stats.cached_graphs, 1);
617
618 manager.get_graph(temp.path()).unwrap();
619 let after = manager.stats();
620 assert_eq!(after.total_queries, 1);
621 assert_eq!(after.cache_hits, 1);
622 assert_eq!(after.cache_misses, 0);
623 assert_eq!(after.cached_graphs, 1);
624
625 manager.shutdown().unwrap();
626 }
627
628 #[test]
629 fn second_access_hits_cache() {
630 let temp = tempdir().unwrap();
631 write_empty_graph(temp.path()).unwrap();
632
633 let manager = SessionManager::new().unwrap();
634
635 manager.get_graph(temp.path()).unwrap();
636 manager.get_graph(temp.path()).unwrap();
637
638 let stats = manager.stats();
639 assert_eq!(stats.total_queries, 2);
640 assert_eq!(stats.cache_misses, 1);
641 assert_eq!(stats.cache_hits, 1);
642 assert_eq!(stats.cached_graphs, 1);
643
644 manager.shutdown().unwrap();
645 }
646
647 #[test]
648 fn concurrent_access_shares_cache() {
649 let temp = tempdir().unwrap();
650 write_empty_graph(temp.path()).unwrap();
651
652 let manager = Arc::new(SessionManager::new().unwrap());
653 let path = temp.path().to_path_buf();
654
655 let handles: Vec<_> = (0..6)
656 .map(|_| {
657 let mgr = Arc::clone(&manager);
658 let path = path.clone();
659 thread::spawn(move || {
660 mgr.get_graph(&path).unwrap();
661 })
662 })
663 .collect();
664
665 for handle in handles {
666 handle.join().unwrap();
667 }
668
669 let manager = Arc::into_inner(manager).expect("no outstanding references");
670 manager.shutdown().unwrap();
671 }
672
673 #[test]
674 fn invalidate_removes_cached_graph() {
675 let temp = tempdir().unwrap();
676 write_empty_graph(temp.path()).unwrap();
677
678 let manager = SessionManager::new().unwrap();
679 manager.get_graph(temp.path()).unwrap();
680 assert_eq!(manager.stats().cached_graphs, 1);
681
682 manager.invalidate(temp.path()).unwrap();
683 assert_eq!(manager.stats().cached_graphs, 0);
684
685 manager.shutdown().unwrap();
686 }
687
688 #[test]
689 fn lru_eviction_removes_oldest_entry() {
690 let temp = tempdir().unwrap();
691 let base = temp.path();
692
693 let config = SessionConfig {
694 max_cached_indexes: 2,
695 ..SessionConfig::default()
696 };
697 let manager = SessionManager::with_config(config).unwrap();
698
699 let repo1 = base.join("repo1");
700 let repo2 = base.join("repo2");
701 let repo3 = base.join("repo3");
702 write_empty_graph(&repo1).unwrap();
703 write_empty_graph(&repo2).unwrap();
704 write_empty_graph(&repo3).unwrap();
705
706 manager.get_graph(&repo1).unwrap();
707 manager.get_graph(&repo2).unwrap();
708
709 manager.get_graph(&repo2).unwrap();
711 manager.get_graph(&repo3).unwrap();
712
713 assert_eq!(manager.stats().cached_graphs, 2);
714 assert!(manager.cache.contains_key(&repo2));
715 assert!(manager.cache.contains_key(&repo3));
716 assert!(!manager.cache.contains_key(&repo1));
717
718 manager.shutdown().unwrap();
719 }
720
721 #[test]
722 fn evict_stale_purges_idle_entries() {
723 let temp = tempdir().unwrap();
724 write_empty_graph(temp.path()).unwrap();
725
726 let config = SessionConfig {
727 idle_timeout: Duration::from_millis(100),
728 cleanup_interval: Duration::from_secs(3600),
729 ..SessionConfig::default()
730 };
731 let manager = SessionManager::with_config(config).unwrap();
732
733 manager.get_graph(temp.path()).unwrap();
734 assert_eq!(manager.stats().cached_graphs, 1);
735
736 if let Some(entry) = manager.cache.get(temp.path()) {
738 entry.value().set_last_accessed(
739 Instant::now()
740 .checked_sub(Duration::from_millis(200))
741 .unwrap(),
742 );
743 }
744
745 let evicted = manager.evict_stale(Instant::now()).unwrap();
746 assert_eq!(evicted, 1);
747 assert_eq!(manager.stats().cached_graphs, 0);
748
749 manager.shutdown().unwrap();
750 }
751
752 #[test]
753 fn register_watcher_uses_manifest_path() {
754 let temp = tempdir().unwrap();
761 write_empty_graph(temp.path()).unwrap();
762
763 let manager = SessionManager::new().unwrap();
764 manager.get_graph(temp.path()).unwrap();
765
766 let storage = GraphStorage::new(temp.path());
767 let watched = manager
768 .watcher
769 .lock()
770 .expect("watcher lock poisoned in test")
771 .watched_paths();
772
773 assert!(
774 watched.contains(&storage.manifest_path().to_path_buf()),
775 "watcher must be registered for manifest path {} (registered: {:?})",
776 storage.manifest_path().display(),
777 watched,
778 );
779 assert!(
780 !watched.contains(&storage.snapshot_path().to_path_buf()),
781 "watcher must NOT be registered for snapshot path {} (registered: {:?})",
782 storage.snapshot_path().display(),
783 watched,
784 );
785
786 manager.shutdown().unwrap();
787 }
788
789 #[test]
790 fn evict_lru_unwatches_manifest_path() {
791 let temp = tempdir().unwrap();
795 let base = temp.path();
796
797 let config = SessionConfig {
798 max_cached_indexes: 1,
799 ..SessionConfig::default()
800 };
801 let manager = SessionManager::with_config(config).unwrap();
802
803 let repo1 = base.join("repo1");
804 let repo2 = base.join("repo2");
805 write_empty_graph(&repo1).unwrap();
806 write_empty_graph(&repo2).unwrap();
807
808 manager.get_graph(&repo1).unwrap();
809 manager.get_graph(&repo2).unwrap(); let watched = manager
812 .watcher
813 .lock()
814 .expect("watcher lock poisoned in test")
815 .watched_paths();
816
817 let repo1_manifest = GraphStorage::new(&repo1).manifest_path().to_path_buf();
818 let repo2_manifest = GraphStorage::new(&repo2).manifest_path().to_path_buf();
819 assert!(
820 !watched.contains(&repo1_manifest),
821 "evicted workspace's manifest watch must be released; still watching: {watched:?}",
822 );
823 assert!(
824 watched.contains(&repo2_manifest),
825 "current workspace must remain watched; registered: {watched:?}",
826 );
827
828 manager.shutdown().unwrap();
829 }
830
831 #[test]
832 #[ignore = "flaky: timing-sensitive file watcher test"]
833 fn file_changes_trigger_invalidation() {
834 let temp = tempdir().unwrap();
835 write_empty_graph(temp.path()).unwrap();
836
837 let manager = SessionManager::new().unwrap();
838 manager.get_graph(temp.path()).unwrap();
839 assert!(manager.cache.contains_key(temp.path()));
840
841 let storage = GraphStorage::new(temp.path());
844 std::fs::write(storage.manifest_path(), b"modified").unwrap();
845
846 let evicted = wait_until(watcher_timeout(), || {
847 manager
848 .watcher
849 .lock()
850 .expect("watcher lock poisoned in test")
851 .process_events()
852 .expect("watcher failed to process events in test");
853 !manager.cache.contains_key(temp.path())
854 });
855 assert!(evicted, "expected watcher to invalidate cache entry");
856
857 manager.shutdown().unwrap();
858 }
859
860 #[test]
861 #[ignore = "flaky: timing-sensitive background thread test"]
862 fn background_thread_processes_watcher_events() {
863 let temp = tempdir().unwrap();
864 write_empty_graph(temp.path()).unwrap();
865
866 let config = SessionConfig {
867 cleanup_interval: Duration::from_millis(50),
868 ..SessionConfig::default()
869 };
870 let manager = SessionManager::with_config(config).unwrap();
871
872 manager.get_graph(temp.path()).unwrap();
873 let storage = GraphStorage::new(temp.path());
874 std::fs::write(storage.manifest_path(), b"changed").unwrap();
875
876 let evicted = wait_until(background_timeout(), || {
877 !manager.cache.contains_key(temp.path())
878 });
879 assert!(
880 evicted,
881 "background thread failed to remove watcher-invalidated entry"
882 );
883
884 manager.shutdown().unwrap();
885 }
886
887 #[test]
888 fn background_thread_evicts_idle_entries() {
889 let temp = tempdir().unwrap();
890 write_empty_graph(temp.path()).unwrap();
891
892 let config = SessionConfig {
893 idle_timeout: Duration::from_millis(50),
894 cleanup_interval: Duration::from_millis(30),
895 ..SessionConfig::default()
896 };
897 let manager = SessionManager::with_config(config).unwrap();
898
899 manager.get_graph(temp.path()).unwrap();
900 assert_eq!(manager.stats().cached_graphs, 1);
901
902 if let Some(entry) = manager.cache.get(temp.path()) {
903 entry.value().set_last_accessed(
904 Instant::now()
905 .checked_sub(Duration::from_millis(200))
906 .unwrap(),
907 );
908 }
909
910 let evicted = wait_until(background_timeout(), || {
911 !manager.cache.contains_key(temp.path())
912 });
913 assert!(
914 evicted,
915 "background eviction thread did not remove idle entry"
916 );
917
918 manager.shutdown().unwrap();
919 }
920}