1use std::fs;
10use std::path::{Path, PathBuf};
11use std::sync::Arc;
12use std::sync::Mutex;
13use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
14use std::thread::{self, JoinHandle};
15use std::time::{Duration, Instant};
16
17use dashmap::DashMap;
18use log::{debug, info};
19
20use crate::graph::CodeGraph;
21use crate::graph::unified::persistence::{GraphStorage, load_from_path};
22use crate::plugin::PluginManager;
23use crate::query::QueryExecutor;
24
25use super::cache::CachedIndex;
26use super::watcher::FileWatcher;
27use super::{SessionConfig, SessionError, SessionResult};
28
29pub struct SessionManager {
31 cache: Arc<DashMap<PathBuf, CachedIndex>>,
32 config: SessionConfig,
33 watcher: Arc<Mutex<FileWatcher>>,
34 cleanup_handle: Option<JoinHandle<()>>,
35 shutdown: Arc<AtomicBool>,
36 total_queries: Arc<AtomicU64>,
37 cache_hits: Arc<AtomicU64>,
38 cache_misses: Arc<AtomicU64>,
39 query_executor: QueryExecutor,
41}
42
43impl SessionManager {
44 pub fn new() -> SessionResult<Self> {
55 Self::with_config_and_plugins(SessionConfig::default(), PluginManager::new())
56 }
57
58 pub fn with_plugin_manager(plugin_manager: PluginManager) -> SessionResult<Self> {
72 Self::with_config_and_plugins(SessionConfig::default(), plugin_manager)
73 }
74
75 pub fn with_config(config: SessionConfig) -> SessionResult<Self> {
84 Self::with_config_and_plugins(config, PluginManager::new())
85 }
86
87 pub fn with_config_and_plugins(
101 config: SessionConfig,
102 plugin_manager: PluginManager,
103 ) -> SessionResult<Self> {
104 let watcher = if config.enable_file_watching {
105 FileWatcher::new()?
106 } else {
107 FileWatcher::disabled()
108 };
109
110 let cache = Arc::new(DashMap::new());
111 let watcher = Arc::new(Mutex::new(watcher));
112 let shutdown = Arc::new(AtomicBool::new(false));
113
114 let cleanup_interval = config.cleanup_interval;
115 let idle_timeout = config.idle_timeout;
116
117 let cleanup_handle = {
118 let cache = Arc::clone(&cache);
119 let watcher_clone = Arc::clone(&watcher);
120 let shutdown_flag = Arc::clone(&shutdown);
121
122 thread::Builder::new()
123 .name("sqry-session-cleanup".into())
124 .spawn(move || {
125 while !shutdown_flag.load(Ordering::Relaxed) {
126 if let Ok(mut guard) = watcher_clone.lock()
127 && let Err(err) = guard.process_events()
128 {
129 debug!("failed to process session watcher events: {err}");
130 }
131
132 let now = Instant::now();
133 if let Err(err) =
134 Self::evict_stale_for(&cache, &watcher_clone, idle_timeout, now)
135 {
136 debug!("failed to evict stale sessions: {err}");
137 }
138
139 thread::park_timeout(cleanup_interval);
140 }
141
142 if let Ok(mut guard) = watcher_clone.lock()
143 && let Err(err) = guard.process_events()
144 {
145 debug!("failed to process session watcher events during shutdown: {err}");
146 }
147 let now = Instant::now();
148 if let Err(err) =
149 Self::evict_stale_for(&cache, &watcher_clone, idle_timeout, now)
150 {
151 debug!("failed to evict stale sessions during shutdown: {err}");
152 }
153 })
154 .map_err(SessionError::SpawnThread)?
155 };
156
157 let query_executor = QueryExecutor::with_plugin_manager(plugin_manager);
159
160 Ok(Self {
161 cache,
162 config,
163 watcher,
164 cleanup_handle: Some(cleanup_handle),
165 shutdown,
166 total_queries: Arc::new(AtomicU64::new(0)),
167 cache_hits: Arc::new(AtomicU64::new(0)),
168 cache_misses: Arc::new(AtomicU64::new(0)),
169 query_executor,
170 })
171 }
172
173 pub fn get_graph(&self, path: &Path) -> SessionResult<Arc<CodeGraph>> {
179 self.get_or_load_graph(path)
180 }
181
182 pub fn query(
193 &self,
194 path: &Path,
195 query_str: &str,
196 ) -> SessionResult<crate::query::results::QueryResults> {
197 let results = self
199 .query_executor
200 .execute_on_graph(query_str, path)
201 .map_err(|e| {
202 let error_msg = e.to_string();
204 if error_msg.contains("parse")
205 || error_msg.contains("unexpected")
206 || error_msg.contains("expected")
207 {
208 SessionError::QueryParse(error_msg)
209 } else {
210 SessionError::QueryExecution(e)
211 }
212 })?;
213
214 Ok(results)
215 }
216
217 pub fn invalidate(&self, path: &Path) -> SessionResult<()> {
223 self.cache.remove(path);
224 if let Ok(mut watcher) = self.watcher.lock() {
225 watcher.unwatch(path)?;
226 }
227 Ok(())
228 }
229
230 pub fn preload(&self, path: &Path) -> SessionResult<()> {
237 if self.cache.contains_key(path) {
238 return Ok(());
239 }
240
241 if self.cache.len() >= self.config.max_cached_indexes {
242 self.evict_lru();
243 }
244
245 self.load_graph_from_disk(path)?;
246 Ok(())
247 }
248
249 #[must_use]
251 pub fn stats(&self) -> SessionStats {
252 SessionStats {
253 cached_graphs: self.cache.len(),
254 total_queries: self.total_queries.load(Ordering::Relaxed),
255 cache_hits: self.cache_hits.load(Ordering::Relaxed),
256 cache_misses: self.cache_misses.load(Ordering::Relaxed),
257 total_memory_mb: self.cache.len() * 51,
258 }
259 }
260
261 pub fn evict_stale(&self, now: Instant) -> SessionResult<usize> {
267 Self::evict_stale_for(&self.cache, &self.watcher, self.config.idle_timeout, now)
268 }
269
270 fn get_or_load_graph(&self, path: &Path) -> SessionResult<Arc<CodeGraph>> {
271 debug_assert!(
272 self.config.max_cached_indexes > 0,
273 "SessionConfig::max_cached_indexes must be at least 1"
274 );
275 self.total_queries.fetch_add(1, Ordering::Relaxed);
276
277 if let Some(entry) = self.cache.get_mut(path) {
278 entry.value().access();
279 self.cache_hits.fetch_add(1, Ordering::Relaxed);
280 return Ok(Arc::clone(&entry.value().graph));
281 }
282
283 self.cache_misses.fetch_add(1, Ordering::Relaxed);
284
285 if self.cache.len() >= self.config.max_cached_indexes {
286 self.evict_lru();
287 }
288
289 let graph = self.load_graph_from_disk(path)?;
290
291 if let Some(entry) = self.cache.get_mut(path) {
292 entry.value().access();
293 }
294
295 Ok(graph)
296 }
297
298 fn load_graph_from_disk(&self, path: &Path) -> SessionResult<Arc<CodeGraph>> {
299 let storage = GraphStorage::new(path);
300 let snapshot_path = storage.snapshot_path();
301
302 let metadata =
303 fs::metadata(snapshot_path).map_err(|source| SessionError::IndexMetadata {
304 path: snapshot_path.to_path_buf(),
305 source,
306 })?;
307 let file_mtime = metadata
308 .modified()
309 .map_err(|source| SessionError::IndexMetadata {
310 path: snapshot_path.to_path_buf(),
311 source,
312 })?;
313
314 let graph = load_from_path(snapshot_path, Some(self.query_executor.plugin_manager()))
315 .map_err(|source| SessionError::IndexLoad {
316 path: snapshot_path.to_path_buf(),
317 source: source.into(),
318 })?;
319 let arc_graph = Arc::new(graph);
320
321 let cached = CachedIndex::new(Arc::clone(&arc_graph), file_mtime);
322 self.cache.insert(path.to_path_buf(), cached);
323
324 self.register_watcher(path, snapshot_path);
325
326 Ok(arc_graph)
327 }
328
329 fn register_watcher(&self, workspace_path: &Path, snapshot_path: &Path) {
330 if let Ok(mut watcher) = self.watcher.lock() {
331 let cache = Arc::clone(&self.cache);
332 let callback_path = workspace_path.to_path_buf();
333 let watch_path = snapshot_path.to_path_buf();
334 if let Err(err) = watcher.watch(watch_path.clone(), move || {
335 cache.remove(&callback_path);
336 info!(
337 "Invalidated session cache after graph change: {}",
338 callback_path.display()
339 );
340 }) {
341 debug!(
342 "failed to register file watcher for {}: {err}",
343 watch_path.display()
344 );
345 }
346 }
347 }
348
349 fn evict_lru(&self) {
351 let mut oldest: Option<(PathBuf, Instant)> = None;
352
353 for entry in self.cache.iter() {
354 let last_accessed = entry.value().last_accessed();
355 if oldest
356 .as_ref()
357 .is_none_or(|(_, instant)| last_accessed < *instant)
358 {
359 oldest = Some((entry.key().clone(), last_accessed));
360 }
361 }
362
363 if let Some((path, _)) = oldest {
364 self.cache.remove(&path);
365 debug!("evicted LRU session cache entry: {}", path.display());
366 if let Ok(mut watcher) = self.watcher.lock() {
367 let storage = GraphStorage::new(&path);
368 if let Err(err) = watcher.unwatch(storage.snapshot_path()) {
369 debug!("failed to unwatch session path {}: {err}", path.display());
370 }
371 }
372 }
373 }
374
375 fn evict_stale_for(
376 cache: &Arc<DashMap<PathBuf, CachedIndex>>,
377 watcher: &Arc<Mutex<FileWatcher>>,
378 timeout: Duration,
379 now: Instant,
380 ) -> SessionResult<usize> {
381 let mut to_remove = Vec::new();
382
383 for entry in cache.iter() {
384 let idle = now.duration_since(entry.value().last_accessed());
385 if idle > timeout {
386 to_remove.push(entry.key().clone());
387 }
388 }
389
390 for path in &to_remove {
391 cache.remove(path);
392 }
393
394 if !to_remove.is_empty()
395 && let Ok(mut guard) = watcher.lock()
396 {
397 for path in &to_remove {
398 let storage = GraphStorage::new(path);
399 guard.unwatch(storage.snapshot_path())?;
400 }
401 }
402
403 Ok(to_remove.len())
404 }
405
406 fn stop_worker(&mut self) {
407 self.shutdown.store(true, Ordering::Relaxed);
408
409 if let Some(handle) = self.cleanup_handle.take() {
410 handle.thread().unpark();
411 if let Err(err) = handle.join() {
412 debug!("session cleanup thread terminated with error: {err:?}");
413 }
414 }
415 }
416
417 pub fn shutdown(mut self) -> SessionResult<()> {
423 self.stop_worker();
424 Ok(())
425 }
426}
427
428impl Drop for SessionManager {
429 fn drop(&mut self) {
430 self.stop_worker();
431 }
432}
433
434#[derive(Debug, Clone)]
436pub struct SessionStats {
437 pub cached_graphs: usize,
439 pub total_queries: u64,
441 pub cache_hits: u64,
443 pub cache_misses: u64,
445 pub total_memory_mb: usize,
447}
448
449#[cfg(test)]
450mod tests {
451 use super::*;
452 use crate::graph::unified::persistence::save_to_path;
453 use std::sync::Arc;
454 use std::thread;
455 use std::time::{Duration, Instant};
456 use tempfile::tempdir;
457
458 fn write_empty_graph(dir: &Path) -> SessionResult<()> {
459 let storage = GraphStorage::new(dir);
460 fs::create_dir_all(storage.graph_dir()).map_err(|source| SessionError::IndexMetadata {
461 path: storage.graph_dir().to_path_buf(),
462 source,
463 })?;
464 let graph = CodeGraph::new();
465 save_to_path(&graph, storage.snapshot_path()).map_err(|source| {
466 SessionError::IndexLoad {
467 path: storage.snapshot_path().to_path_buf(),
468 source: source.into(),
469 }
470 })?;
471 Ok(())
472 }
473
474 fn watcher_timeout() -> Duration {
475 let base = if cfg!(target_os = "macos") {
476 Duration::from_secs(3)
477 } else {
478 Duration::from_secs(2)
479 };
480
481 if std::env::var("CI").is_ok() {
482 base * 2
483 } else {
484 base
485 }
486 }
487
488 fn background_timeout() -> Duration {
489 let base = if cfg!(target_os = "macos") {
490 Duration::from_secs(5)
491 } else {
492 Duration::from_secs(3)
493 };
494
495 if std::env::var("CI").is_ok() {
496 base * 2
497 } else {
498 base
499 }
500 }
501
502 fn wait_until<F>(timeout: Duration, mut predicate: F) -> bool
503 where
504 F: FnMut() -> bool,
505 {
506 let deadline = Instant::now() + timeout;
507 loop {
508 if predicate() {
509 return true;
510 }
511 if Instant::now() >= deadline {
512 return false;
513 }
514 thread::sleep(Duration::from_millis(50));
515 }
516 }
517
518 #[test]
519 fn get_graph_loads_and_updates_stats() {
520 let temp = tempdir().unwrap();
521 write_empty_graph(temp.path()).unwrap();
522
523 let manager = SessionManager::new().unwrap();
524
525 let graph = manager.get_graph(temp.path()).unwrap();
526 assert_eq!(graph.snapshot().nodes().len(), 0);
527
528 let stats = manager.stats();
529 assert_eq!(stats.total_queries, 1);
530 assert_eq!(stats.cache_misses, 1);
531 assert_eq!(stats.cache_hits, 0);
532 assert_eq!(stats.cached_graphs, 1);
533
534 manager.shutdown().unwrap();
535 }
536
537 #[test]
538 fn get_graph_missing_returns_error() {
539 let temp = tempdir().unwrap();
540
541 let manager = SessionManager::new().unwrap();
542 let err = manager
543 .get_graph(temp.path())
544 .expect_err("get_graph should fail without graph");
545
546 assert!(matches!(err, SessionError::IndexMetadata { .. }));
547 manager.shutdown().unwrap();
548 }
549
550 #[test]
551 fn preload_does_not_affect_stats() {
552 let temp = tempdir().unwrap();
553 write_empty_graph(temp.path()).unwrap();
554
555 let manager = SessionManager::new().unwrap();
556 manager.preload(temp.path()).unwrap();
557
558 let stats = manager.stats();
559 assert_eq!(stats.total_queries, 0);
560 assert_eq!(stats.cache_hits, 0);
561 assert_eq!(stats.cache_misses, 0);
562 assert_eq!(stats.cached_graphs, 1);
563
564 manager.get_graph(temp.path()).unwrap();
565 let after = manager.stats();
566 assert_eq!(after.total_queries, 1);
567 assert_eq!(after.cache_hits, 1);
568 assert_eq!(after.cache_misses, 0);
569 assert_eq!(after.cached_graphs, 1);
570
571 manager.shutdown().unwrap();
572 }
573
574 #[test]
575 fn second_access_hits_cache() {
576 let temp = tempdir().unwrap();
577 write_empty_graph(temp.path()).unwrap();
578
579 let manager = SessionManager::new().unwrap();
580
581 manager.get_graph(temp.path()).unwrap();
582 manager.get_graph(temp.path()).unwrap();
583
584 let stats = manager.stats();
585 assert_eq!(stats.total_queries, 2);
586 assert_eq!(stats.cache_misses, 1);
587 assert_eq!(stats.cache_hits, 1);
588 assert_eq!(stats.cached_graphs, 1);
589
590 manager.shutdown().unwrap();
591 }
592
593 #[test]
594 fn concurrent_access_shares_cache() {
595 let temp = tempdir().unwrap();
596 write_empty_graph(temp.path()).unwrap();
597
598 let manager = Arc::new(SessionManager::new().unwrap());
599 let path = temp.path().to_path_buf();
600
601 let handles: Vec<_> = (0..6)
602 .map(|_| {
603 let mgr = Arc::clone(&manager);
604 let path = path.clone();
605 thread::spawn(move || {
606 mgr.get_graph(&path).unwrap();
607 })
608 })
609 .collect();
610
611 for handle in handles {
612 handle.join().unwrap();
613 }
614
615 let manager = Arc::into_inner(manager).expect("no outstanding references");
616 manager.shutdown().unwrap();
617 }
618
619 #[test]
620 fn invalidate_removes_cached_graph() {
621 let temp = tempdir().unwrap();
622 write_empty_graph(temp.path()).unwrap();
623
624 let manager = SessionManager::new().unwrap();
625 manager.get_graph(temp.path()).unwrap();
626 assert_eq!(manager.stats().cached_graphs, 1);
627
628 manager.invalidate(temp.path()).unwrap();
629 assert_eq!(manager.stats().cached_graphs, 0);
630
631 manager.shutdown().unwrap();
632 }
633
634 #[test]
635 fn lru_eviction_removes_oldest_entry() {
636 let temp = tempdir().unwrap();
637 let base = temp.path();
638
639 let config = SessionConfig {
640 max_cached_indexes: 2,
641 ..SessionConfig::default()
642 };
643 let manager = SessionManager::with_config(config).unwrap();
644
645 let repo1 = base.join("repo1");
646 let repo2 = base.join("repo2");
647 let repo3 = base.join("repo3");
648 write_empty_graph(&repo1).unwrap();
649 write_empty_graph(&repo2).unwrap();
650 write_empty_graph(&repo3).unwrap();
651
652 manager.get_graph(&repo1).unwrap();
653 manager.get_graph(&repo2).unwrap();
654
655 manager.get_graph(&repo2).unwrap();
657 manager.get_graph(&repo3).unwrap();
658
659 assert_eq!(manager.stats().cached_graphs, 2);
660 assert!(manager.cache.contains_key(&repo2));
661 assert!(manager.cache.contains_key(&repo3));
662 assert!(!manager.cache.contains_key(&repo1));
663
664 manager.shutdown().unwrap();
665 }
666
667 #[test]
668 fn evict_stale_purges_idle_entries() {
669 let temp = tempdir().unwrap();
670 write_empty_graph(temp.path()).unwrap();
671
672 let config = SessionConfig {
673 idle_timeout: Duration::from_millis(100),
674 ..SessionConfig::default()
675 };
676 let manager = SessionManager::with_config(config).unwrap();
677
678 manager.get_graph(temp.path()).unwrap();
679 assert_eq!(manager.stats().cached_graphs, 1);
680
681 if let Some(entry) = manager.cache.get(temp.path()) {
683 entry.value().set_last_accessed(
684 Instant::now()
685 .checked_sub(Duration::from_millis(200))
686 .unwrap(),
687 );
688 }
689
690 let evicted = manager.evict_stale(Instant::now()).unwrap();
691 assert_eq!(evicted, 1);
692 assert_eq!(manager.stats().cached_graphs, 0);
693
694 manager.shutdown().unwrap();
695 }
696
697 #[test]
698 #[ignore = "flaky: timing-sensitive file watcher test"]
699 fn file_changes_trigger_invalidation() {
700 let temp = tempdir().unwrap();
701 write_empty_graph(temp.path()).unwrap();
702
703 let manager = SessionManager::new().unwrap();
704 manager.get_graph(temp.path()).unwrap();
705 assert!(manager.cache.contains_key(temp.path()));
706
707 let storage = GraphStorage::new(temp.path());
708 std::fs::write(storage.snapshot_path(), b"modified").unwrap();
709
710 let evicted = wait_until(watcher_timeout(), || {
711 manager
712 .watcher
713 .lock()
714 .expect("watcher lock poisoned in test")
715 .process_events()
716 .expect("watcher failed to process events in test");
717 !manager.cache.contains_key(temp.path())
718 });
719 assert!(evicted, "expected watcher to invalidate cache entry");
720
721 manager.shutdown().unwrap();
722 }
723
724 #[test]
725 #[ignore = "flaky: timing-sensitive background thread test"]
726 fn background_thread_processes_watcher_events() {
727 let temp = tempdir().unwrap();
728 write_empty_graph(temp.path()).unwrap();
729
730 let config = SessionConfig {
731 cleanup_interval: Duration::from_millis(50),
732 ..SessionConfig::default()
733 };
734 let manager = SessionManager::with_config(config).unwrap();
735
736 manager.get_graph(temp.path()).unwrap();
737 let storage = GraphStorage::new(temp.path());
738 std::fs::write(storage.snapshot_path(), b"changed").unwrap();
739
740 let evicted = wait_until(background_timeout(), || {
741 !manager.cache.contains_key(temp.path())
742 });
743 assert!(
744 evicted,
745 "background thread failed to remove watcher-invalidated entry"
746 );
747
748 manager.shutdown().unwrap();
749 }
750
751 #[test]
752 fn background_thread_evicts_idle_entries() {
753 let temp = tempdir().unwrap();
754 write_empty_graph(temp.path()).unwrap();
755
756 let config = SessionConfig {
757 idle_timeout: Duration::from_millis(50),
758 cleanup_interval: Duration::from_millis(30),
759 ..SessionConfig::default()
760 };
761 let manager = SessionManager::with_config(config).unwrap();
762
763 manager.get_graph(temp.path()).unwrap();
764 assert_eq!(manager.stats().cached_graphs, 1);
765
766 if let Some(entry) = manager.cache.get(temp.path()) {
767 entry.value().set_last_accessed(
768 Instant::now()
769 .checked_sub(Duration::from_millis(200))
770 .unwrap(),
771 );
772 }
773
774 let evicted = wait_until(background_timeout(), || {
775 !manager.cache.contains_key(temp.path())
776 });
777 assert!(
778 evicted,
779 "background eviction thread did not remove idle entry"
780 );
781
782 manager.shutdown().unwrap();
783 }
784}