1use crate::analytics::{AnalyticsData, Period};
7use crate::bookmarks::BookmarkStore;
8use crate::cache::{MetadataCache, StoredAlert};
9use crate::error::{CoreError, DegradedState, LoadReport};
10use crate::event::{DataEvent, EventBus};
11use crate::models::activity::ActivitySummary;
12use crate::models::{
13 BillingBlockManager, InvocationStats, MergedConfig, SessionId, SessionMetadata, StatsCache,
14};
15use crate::parsers::{
16 classify_tool_calls, parse_claude_global, parse_tool_calls, ClaudeGlobalStats, CodexParser,
17 CursorParser, InvocationParser, McpConfig, OpenCodeParser, Rules, SessionContentParser,
18 SessionIndexParser, SettingsParser, StatsParser,
19};
20use dashmap::DashMap;
21use moka::future::Cache;
22use parking_lot::RwLock; use std::path::{Path, PathBuf};
24use std::sync::Arc;
25use std::time::Duration;
26use tracing::{debug, info, warn};
27
28#[derive(Debug, Clone)]
30pub struct DataStoreConfig {
31 pub max_session_metadata_count: usize,
33
34 pub max_session_content_cache_mb: usize,
36
37 pub max_concurrent_scans: usize,
39
40 pub stats_retry_count: u32,
42
43 pub stats_retry_delay: Duration,
45}
46
47impl Default for DataStoreConfig {
48 fn default() -> Self {
49 Self {
50 max_session_metadata_count: 10_000,
51 max_session_content_cache_mb: 100,
52 max_concurrent_scans: 8,
53 stats_retry_count: 3,
54 stats_retry_delay: Duration::from_millis(100),
55 }
56 }
57}
58
59#[derive(Debug, Clone)]
61pub struct McpCallStat {
62 pub server_name: String,
63 pub call_count: usize,
64 pub session_count: usize,
65 pub last_seen: Option<chrono::DateTime<chrono::Utc>>,
66}
67
68pub struct DataStore {
73 claude_home: PathBuf,
75
76 project_path: Option<PathBuf>,
78
79 config: DataStoreConfig,
81
82 stats: RwLock<Option<StatsCache>>,
84
85 settings: RwLock<MergedConfig>,
87
88 mcp_config: RwLock<Option<McpConfig>>,
90
91 rules: RwLock<Rules>,
93
94 invocation_stats: RwLock<InvocationStats>,
96
97 billing_blocks: RwLock<BillingBlockManager>,
99
100 analytics_cache: RwLock<Option<AnalyticsData>>,
102
103 discover_cache: RwLock<Option<Vec<crate::analytics::DiscoverSuggestion>>>,
105
106 sessions: DashMap<SessionId, Arc<SessionMetadata>>,
113
114 #[allow(dead_code)]
116 session_content_cache: Cache<SessionId, Vec<String>>,
117
118 event_bus: EventBus,
120
121 degraded_state: RwLock<DegradedState>,
123
124 metadata_cache: Option<Arc<MetadataCache>>,
126
127 activity_results: DashMap<String, ActivitySummary>,
129
130 live_hook_sessions: RwLock<crate::hook_state::LiveSessionFile>,
132
133 claude_global_stats: RwLock<Option<ClaudeGlobalStats>>,
135
136 bookmark_store: RwLock<BookmarkStore>,
138
139 summary_store: crate::summaries::SummaryStore,
141}
142
143#[derive(Debug, Clone)]
145pub struct ProjectLeaderboardEntry {
146 pub project_name: String,
147 pub total_sessions: usize,
148 pub total_tokens: u64,
149 pub total_cost: f64,
150 pub avg_session_cost: f64,
151}
152
153impl DataStore {
154 pub fn new(
156 claude_home: PathBuf,
157 project_path: Option<PathBuf>,
158 config: DataStoreConfig,
159 ) -> Self {
160 let session_content_cache = Cache::builder()
161 .max_capacity((config.max_session_content_cache_mb * 1024 * 1024 / 1000) as u64) .time_to_idle(Duration::from_secs(300)) .build();
164
165 let ccboard_dir = claude_home
167 .parent()
168 .unwrap_or(&claude_home)
169 .join(".ccboard");
170
171 let bookmark_store = {
173 let bookmarks_path = ccboard_dir.join("bookmarks.json");
174 match BookmarkStore::load(&bookmarks_path) {
175 Ok(store) => store,
176 Err(e) => {
177 warn!(error = %e, "Failed to load bookmark store, starting empty");
178 BookmarkStore::default()
179 }
180 }
181 };
182
183 let metadata_cache = {
185 let cache_dir = claude_home.join("cache");
186 match MetadataCache::new(&cache_dir) {
187 Ok(cache) => {
188 debug!(path = %cache_dir.display(), "Metadata cache enabled");
189 Some(Arc::new(cache))
190 }
191 Err(e) => {
192 warn!(error = %e, "Failed to create metadata cache, running without cache");
193 None
194 }
195 }
196 };
197
198 Self {
199 claude_home,
200 project_path,
201 config,
202 stats: RwLock::new(None),
203 settings: RwLock::new(MergedConfig::default()),
204 mcp_config: RwLock::new(None),
205 rules: RwLock::new(Rules::default()),
206 invocation_stats: RwLock::new(InvocationStats::new()),
207 billing_blocks: RwLock::new(BillingBlockManager::new()),
208 analytics_cache: RwLock::new(None),
209 discover_cache: RwLock::new(None),
210 sessions: DashMap::new(),
211 session_content_cache,
212 event_bus: EventBus::default_capacity(),
213 degraded_state: RwLock::new(DegradedState::Healthy),
214 metadata_cache,
215 activity_results: DashMap::new(),
216 live_hook_sessions: RwLock::new(crate::hook_state::LiveSessionFile::default()),
217 claude_global_stats: RwLock::new(None),
218 bookmark_store: RwLock::new(bookmark_store),
219 summary_store: crate::summaries::SummaryStore::new(&ccboard_dir),
220 }
221 }
222
223 pub fn with_defaults(claude_home: PathBuf, project_path: Option<PathBuf>) -> Self {
225 Self::new(claude_home, project_path, DataStoreConfig::default())
226 }
227
228 pub fn event_bus(&self) -> &EventBus {
230 &self.event_bus
231 }
232
233 pub fn degraded_state(&self) -> DegradedState {
235 self.degraded_state.read().clone()
236 }
237
238 pub async fn initial_load(&self) -> LoadReport {
240 let mut report = LoadReport::new();
241
242 info!(claude_home = %self.claude_home.display(), "Starting initial data load");
243
244 self.load_stats(&mut report).await;
246
247 if let Some(home) = dirs::home_dir() {
249 if let Some(global) = parse_claude_global(&home) {
250 *self.claude_global_stats.write() = Some(global);
251 debug!("~/.claude.json loaded successfully");
252 }
253 }
254
255 self.load_settings(&mut report).await;
257
258 self.load_mcp_config(&mut report).await;
260
261 self.load_rules(&mut report).await;
263
264 self.scan_sessions(&mut report).await;
266
267 self.scan_third_party_sessions(&mut report).await;
269
270 self.update_degraded_state(&report);
272
273 self.event_bus.publish(DataEvent::LoadCompleted);
275
276 info!(
277 stats_loaded = report.stats_loaded,
278 settings_loaded = report.settings_loaded,
279 sessions_scanned = report.sessions_scanned,
280 sessions_failed = report.sessions_failed,
281 errors = report.errors.len(),
282 "Initial load complete"
283 );
284
285 self.compute_has_subagents();
287
288 report
289 }
290
291 async fn load_stats(&self, report: &mut LoadReport) {
293 let stats_path = self.claude_home.join("stats-cache.json");
294 let parser = StatsParser::new()
295 .with_retries(self.config.stats_retry_count, self.config.stats_retry_delay);
296
297 if let Some(mut stats) = parser.parse_graceful(&stats_path, report).await {
298 stats.recalculate_costs();
300 let mut guard = self.stats.write();
301 *guard = Some(stats);
302 debug!("Stats loaded successfully with recalculated costs");
303 }
304 }
305
306 async fn load_settings(&self, report: &mut LoadReport) {
308 let parser = SettingsParser::new();
309 let merged = parser
310 .load_merged(&self.claude_home, self.project_path.as_deref(), report)
311 .await;
312
313 let mut guard = self.settings.write();
314 *guard = merged;
315 debug!("Settings loaded and merged");
316 }
317
318 async fn load_mcp_config(&self, report: &mut LoadReport) {
320 match McpConfig::load_merged(&self.claude_home, self.project_path.as_deref()) {
321 Ok(Some(config)) => {
322 let server_count = config.servers.len();
323 let mut guard = self.mcp_config.write();
324 *guard = Some(config);
325 debug!(
326 server_count,
327 "MCP config loaded successfully (global + project)"
328 );
329 }
330 Ok(None) => {
331 debug!("No MCP config found (optional)");
332 }
333 Err(e) => {
334 use crate::error::LoadError;
335 report.add_error(LoadError::error(
336 "mcp_config",
337 format!("Failed to parse MCP config: {}", e),
338 ));
339 }
340 }
341 }
342
343 async fn load_rules(&self, report: &mut LoadReport) {
345 match Rules::load(&self.claude_home, self.project_path.as_deref()) {
346 Ok(rules) => {
347 let has_global = rules.global.is_some();
348 let has_project = rules.project.is_some();
349 let mut guard = self.rules.write();
350 *guard = rules;
351 debug!(has_global, has_project, "Rules loaded");
352 }
353 Err(e) => {
354 use crate::error::LoadError;
355 report.add_error(LoadError::error(
356 "rules",
357 format!("Failed to load rules: {}", e),
358 ));
359 }
360 }
361 }
362
363 async fn scan_sessions(&self, report: &mut LoadReport) {
365 let projects_dir = self.claude_home.join("projects");
366
367 if !projects_dir.exists() {
368 report.add_warning(
369 "sessions",
370 format!("Projects directory not found: {}", projects_dir.display()),
371 );
372 return;
373 }
374
375 let mut parser =
376 SessionIndexParser::new().with_concurrency(self.config.max_concurrent_scans);
377
378 if let Some(ref cache) = self.metadata_cache {
380 parser = parser.with_cache(cache.clone());
381 }
382
383 let sessions = parser.scan_all(&projects_dir, report).await;
384
385 let sessions_to_add: Vec<_> = if sessions.len() > self.config.max_session_metadata_count {
387 warn!(
388 total = sessions.len(),
389 limit = self.config.max_session_metadata_count,
390 "Session count exceeds limit, keeping most recent"
391 );
392
393 let mut sorted = sessions;
394 sorted.sort_by(|a, b| b.last_timestamp.cmp(&a.last_timestamp));
395 sorted.truncate(self.config.max_session_metadata_count);
396 sorted
397 } else {
398 sessions
399 };
400
401 for session in sessions_to_add {
403 self.sessions.insert(session.id.clone(), Arc::new(session));
404 }
405
406 debug!(count = self.sessions.len(), "Sessions indexed");
407 }
408
409 async fn scan_third_party_sessions(&self, _report: &mut LoadReport) {
414 if let Some(codex_dir) = CodexParser::default_path() {
416 if codex_dir.exists() {
417 let sessions = CodexParser::scan(&codex_dir).await;
418 let count = sessions.len();
419 for s in sessions {
420 self.sessions.insert(s.id.clone(), Arc::new(s));
421 }
422 debug!(count, "Codex sessions indexed");
423 }
424 }
425
426 if let Some(db_path) = OpenCodeParser::default_path() {
428 if db_path.exists() {
429 let sessions = OpenCodeParser::scan(&db_path);
430 let count = sessions.len();
431 for s in sessions {
432 self.sessions.insert(s.id.clone(), Arc::new(s));
433 }
434 debug!(count, "OpenCode sessions indexed");
435 }
436 }
437
438 if let Some(cursor_dir) = CursorParser::default_dir() {
440 if cursor_dir.exists() {
441 let sessions = CursorParser::scan(&cursor_dir);
442 let count = sessions.len();
443 for s in sessions {
444 self.sessions.insert(s.id.clone(), Arc::new(s));
445 }
446 debug!(count, "Cursor sessions indexed");
447 }
448 }
449 }
450
451 fn update_degraded_state(&self, report: &LoadReport) {
453 let mut state = self.degraded_state.write();
454
455 if report.has_fatal_errors() {
456 *state = DegradedState::ReadOnly {
457 reason: "Fatal errors during load".to_string(),
458 };
459 return;
460 }
461
462 let mut missing = Vec::new();
463
464 if !report.stats_loaded {
465 missing.push("stats".to_string());
466 }
467 if !report.settings_loaded {
468 missing.push("settings".to_string());
469 }
470 if report.sessions_failed > 0 {
471 missing.push(format!("{} sessions", report.sessions_failed));
472 }
473
474 if missing.is_empty() {
475 *state = DegradedState::Healthy;
476 } else {
477 *state = DegradedState::PartialData {
478 missing: missing.clone(),
479 reason: format!("Missing: {}", missing.join(", ")),
480 };
481 }
482 }
483
484 pub fn stats(&self) -> Option<StatsCache> {
490 self.stats.read().clone()
491 }
492
493 pub fn context_window_stats(&self) -> crate::models::ContextWindowStats {
495 let sessions: Vec<_> = self
497 .sessions
498 .iter()
499 .map(|entry| Arc::clone(entry.value()))
500 .collect();
501 let refs: Vec<_> = sessions.iter().map(|s| s.as_ref()).collect();
503 crate::models::StatsCache::calculate_context_saturation(&refs, 30)
504 }
505
506 pub fn settings(&self) -> MergedConfig {
508 self.settings.read().clone()
509 }
510
511 pub fn mcp_config(&self) -> Option<McpConfig> {
513 self.mcp_config.read().clone()
514 }
515
516 pub fn rules(&self) -> Rules {
518 self.rules.read().clone()
519 }
520
521 pub fn invocation_stats(&self) -> InvocationStats {
523 self.invocation_stats.read().clone()
524 }
525
526 pub fn quota_status(&self) -> Option<crate::quota::QuotaStatus> {
530 let stats = self.stats.read().clone()?;
531 let settings = self.settings.read();
532 let budget = settings.merged.budget.as_ref()?;
533
534 Some(crate::quota::calculate_quota_status(&stats, budget))
535 }
536
537 pub fn live_sessions(&self) -> Vec<crate::live_monitor::LiveSession> {
542 crate::live_monitor::detect_live_sessions().unwrap_or_default()
543 }
544
545 pub fn merged_live_sessions(&self) -> Vec<crate::live_monitor::MergedLiveSession> {
549 let hook_file = self.live_hook_sessions.read().clone();
550 let ps_sessions = crate::live_monitor::detect_live_sessions().unwrap_or_default();
551 crate::live_monitor::merge_live_sessions(&hook_file, &ps_sessions)
552 }
553
554 pub async fn reload_live_hook_sessions(&self, path: &std::path::Path) {
556 match crate::hook_state::LiveSessionFile::load(path) {
557 Ok(file) => {
558 *self.live_hook_sessions.write() = file;
559 debug!("Reloaded live hook sessions from {}", path.display());
560 }
561 Err(e) => {
562 warn!(error = %e, "Failed to reload live-sessions.json");
563 }
564 }
565 }
566
567 pub fn claude_global_stats(&self) -> Option<ClaudeGlobalStats> {
569 self.claude_global_stats.read().clone()
570 }
571
572 pub fn session_count(&self) -> usize {
574 self.sessions.len()
575 }
576
577 pub fn get_session(&self, id: &str) -> Option<Arc<SessionMetadata>> {
580 self.sessions.get(id).map(|r| Arc::clone(r.value()))
581 }
582
583 pub async fn load_session_content(
596 &self,
597 session_id: &str,
598 ) -> Result<Vec<crate::models::ConversationMessage>, CoreError> {
599 let metadata = self
601 .get_session(session_id)
602 .ok_or_else(|| CoreError::SessionNotFound {
603 session_id: session_id.to_string(),
604 })?;
605
606 let session_id_owned = SessionId::from(session_id);
608 if let Some(_cached) = self.session_content_cache.get(&session_id_owned).await {
609 debug!(session_id, "Session content cache HIT");
610 }
613
614 debug!(
616 session_id,
617 path = %metadata.file_path.display(),
618 "Session content cache MISS, parsing JSONL"
619 );
620
621 let messages = SessionContentParser::parse_conversation(
622 &metadata.file_path,
623 (*metadata).clone(), )
625 .await?;
626
627 Ok(messages)
631 }
632
633 pub fn analytics(&self) -> Option<AnalyticsData> {
638 let analytics = self.analytics_cache.read().clone();
639 debug!(
640 has_analytics = analytics.is_some(),
641 "analytics() getter called"
642 );
643 analytics
644 }
645
646 pub async fn compute_analytics(&self, period: Period) {
654 let sessions: Vec<_> = self
655 .sessions
656 .iter()
657 .map(|r| Arc::clone(r.value()))
658 .collect();
659
660 info!(
661 session_count = sessions.len(),
662 period = ?period,
663 "compute_analytics() ENTRY"
664 );
665
666 let thresholds = self
668 .settings()
669 .global
670 .as_ref()
671 .and_then(|s| s.anomaly_thresholds.clone())
672 .unwrap_or_default();
673
674 let analytics = tokio::task::spawn_blocking(move || {
676 AnalyticsData::compute_with_thresholds(&sessions, period, &thresholds)
677 })
678 .await;
679
680 match analytics {
681 Ok(data) => {
682 info!(
683 insights_count = data.insights.len(),
684 "compute_analytics() computed data"
685 );
686 let mut guard = self.analytics_cache.write();
687 *guard = Some(data);
688 self.event_bus.publish(DataEvent::AnalyticsUpdated);
689 info!("compute_analytics() EXIT - cached and event published");
690 }
691 Err(e) => {
692 warn!(error = %e, "Failed to compute analytics (task panicked)");
693 }
694 }
695 }
696
697 pub fn discover(&self) -> Option<Vec<crate::analytics::DiscoverSuggestion>> {
699 self.discover_cache.read().clone()
700 }
701
702 pub async fn compute_discover(&self, max_sessions: usize, min_count: usize, top: usize) {
708 let sessions = self.recent_sessions(max_sessions);
709
710 info!(session_count = sessions.len(), "compute_discover() ENTRY");
711
712 let session_data = tokio::task::spawn_blocking(move || {
713 use std::io::BufRead;
714
715 let mut result: Vec<crate::analytics::DiscoverSessionData> = Vec::new();
716 for session in &sessions {
717 let path = session.file_path.clone();
718 let session_id = session.id.as_str().to_string();
719 let project = session.project_path.as_str().to_string();
720
721 let messages: Option<Vec<String>> = (|| {
722 let file = std::fs::File::open(&path).ok()?;
723 let reader = std::io::BufReader::new(file);
724 let mut msgs: Vec<String> = Vec::new();
725 for line in reader.lines().map_while(|l| l.ok()) {
726 if let Ok(v) = serde_json::from_str::<serde_json::Value>(&line) {
727 if v.get("type").and_then(|t| t.as_str()) == Some("user") {
728 if let Some(content) =
729 v.get("message").and_then(|m| m.get("content"))
730 {
731 let text = match content {
732 serde_json::Value::String(s) => s.clone(),
733 serde_json::Value::Array(arr) => arr
734 .iter()
735 .filter_map(|item| {
736 if item.get("type").and_then(|t| t.as_str())
737 == Some("text")
738 {
739 item.get("text")
740 .and_then(|t| t.as_str())
741 .map(|s| s.to_string())
742 } else {
743 None
744 }
745 })
746 .collect::<Vec<_>>()
747 .join(" "),
748 _ => continue,
749 };
750 if !text.trim().is_empty() && text.len() > 10 {
751 msgs.push(text);
752 }
753 }
754 }
755 }
756 }
757 if msgs.is_empty() {
758 None
759 } else {
760 Some(msgs)
761 }
762 })();
763
764 if let Some(messages) = messages {
765 result.push(crate::analytics::DiscoverSessionData {
766 session_id,
767 project,
768 messages,
769 });
770 }
771 }
772 result
773 })
774 .await;
775
776 match session_data {
777 Ok(data) => {
778 let sessions_analyzed = data.len();
779 let suggestions = crate::analytics::discover_patterns(&data, min_count, top);
780 info!(
781 sessions_analyzed,
782 suggestions_count = suggestions.len(),
783 "compute_discover() completed"
784 );
785 let mut guard = self.discover_cache.write();
786 *guard = Some(suggestions);
787 self.event_bus.publish(DataEvent::AnalyticsUpdated);
788 }
789 Err(e) => {
790 warn!(error = %e, "compute_discover() task panicked");
791 }
792 }
793 }
794
795 #[allow(dead_code)]
800 fn invalidate_analytics_cache(&self) {
801 let mut guard = self.analytics_cache.write();
802 *guard = None;
803 debug!("Analytics cache invalidated");
804 }
805
806 pub fn session_ids(&self) -> Vec<SessionId> {
808 self.sessions.iter().map(|r| r.key().clone()).collect()
809 }
810
811 pub fn clear_session_content_cache(&self) {
813 self.session_content_cache.invalidate_all();
814 debug!("Session content cache cleared");
815 }
816
817 pub fn sessions_by_project(
820 &self,
821 ) -> std::collections::HashMap<String, Vec<Arc<SessionMetadata>>> {
822 let mut by_project = std::collections::HashMap::new();
823
824 for entry in self.sessions.iter() {
825 let session = Arc::clone(entry.value());
826 by_project
827 .entry(session.project_path.as_str().to_string())
828 .or_insert_with(Vec::new)
829 .push(session);
830 }
831
832 for sessions in by_project.values_mut() {
834 sessions.sort_by(|a, b| b.last_timestamp.cmp(&a.last_timestamp));
835 }
836
837 by_project
838 }
839
840 pub fn all_sessions(&self) -> Vec<Arc<SessionMetadata>> {
843 self.sessions
844 .iter()
845 .map(|r| Arc::clone(r.value()))
846 .collect()
847 }
848
849 pub fn recent_sessions(&self, limit: usize) -> Vec<Arc<SessionMetadata>> {
852 let mut sessions = self.all_sessions();
853 sessions.sort_by(|a, b| b.last_timestamp.cmp(&a.last_timestamp));
854 sessions.truncate(limit);
855 sessions
856 }
857
858 pub fn search_sessions(&self, query: &str, limit: usize) -> Vec<crate::cache::SearchResult> {
862 if let Some(ref cache) = self.metadata_cache {
863 match cache.search_sessions(query, limit) {
864 Ok(results) => results,
865 Err(e) => {
866 warn!("FTS5 search failed: {}", e);
867 Vec::new()
868 }
869 }
870 } else {
871 Vec::new()
872 }
873 }
874
875 pub async fn analyze_session(&self, session_id: &str) -> anyhow::Result<ActivitySummary> {
880 use std::time::SystemTime;
881
882 let metadata = self
883 .get_session(session_id)
884 .ok_or_else(|| anyhow::anyhow!("Session not found: {}", session_id))?;
885
886 let path = &metadata.file_path;
887
888 let mtime = tokio::fs::metadata(path)
891 .await
892 .and_then(|m| m.modified())
893 .unwrap_or(SystemTime::UNIX_EPOCH);
894
895 if let Some(cache) = &self.metadata_cache {
897 if let Ok(Some(cached)) = cache.get_activity(path, mtime) {
898 self.activity_results
899 .insert(session_id.to_string(), cached.clone());
900 self.event_bus.publish(DataEvent::AnalyticsUpdated);
901 return Ok(cached);
902 }
903 }
904
905 let calls = parse_tool_calls(path, session_id).await?;
907
908 let project_root = path
909 .parent()
910 .and_then(|p| p.parent())
911 .map(|p| p.to_string_lossy().into_owned());
912
913 let summary = classify_tool_calls(calls, session_id, project_root.as_deref());
914
915 if let Some(cache) = &self.metadata_cache {
917 if let Err(e) = cache.put_activity(path, session_id, &summary, mtime) {
918 warn!(session_id, error = %e, "Failed to cache activity — will re-parse on restart");
919 }
920 }
921
922 self.activity_results
924 .insert(session_id.to_string(), summary.clone());
925 self.event_bus.publish(DataEvent::AnalyticsUpdated);
926
927 Ok(summary)
928 }
929
930 pub fn get_session_activity(&self, session_id: &str) -> Option<ActivitySummary> {
932 self.activity_results
933 .get(session_id)
934 .map(|r| r.value().clone())
935 }
936
937 pub fn get_all_stored_alerts(&self, min_severity: Option<&str>) -> Vec<StoredAlert> {
941 if let Some(cache) = &self.metadata_cache {
942 cache.get_all_alerts(min_severity).unwrap_or_default()
943 } else {
944 vec![]
945 }
946 }
947
948 pub fn mcp_call_stats(&self) -> Vec<McpCallStat> {
953 use crate::models::activity::NetworkTool;
954 use std::collections::{HashMap, HashSet};
955
956 let mut stats: HashMap<String, McpCallStat> = HashMap::new();
957
958 for entry in self.activity_results.iter() {
959 let summary = entry.value();
960 let mut servers_in_session: HashSet<String> = HashSet::new();
961
962 for call in &summary.network_calls {
963 if let NetworkTool::McpCall { server } = &call.tool {
964 let stat = stats.entry(server.clone()).or_insert_with(|| McpCallStat {
965 server_name: server.clone(),
966 call_count: 0,
967 session_count: 0,
968 last_seen: None,
969 });
970 stat.call_count += 1;
971 match stat.last_seen {
972 Some(existing) if call.timestamp > existing => {
973 stat.last_seen = Some(call.timestamp);
974 }
975 None => {
976 stat.last_seen = Some(call.timestamp);
977 }
978 _ => {}
979 }
980 servers_in_session.insert(server.clone());
981 }
982 }
983
984 for server in servers_in_session {
985 if let Some(stat) = stats.get_mut(&server) {
986 stat.session_count += 1;
987 }
988 }
989 }
990
991 let mut result: Vec<McpCallStat> = stats.into_values().collect();
992 result.sort_by(|a, b| b.call_count.cmp(&a.call_count));
993 result
994 }
995
996 pub fn all_violations(&self) -> Vec<crate::models::activity::Alert> {
1001 use crate::models::activity::{Alert, AlertSeverity};
1002 use std::collections::HashSet;
1003
1004 let mut seen_sessions: HashSet<String> = HashSet::new();
1006 let mut alerts: Vec<Alert> = Vec::new();
1007
1008 for entry in self.activity_results.iter() {
1009 seen_sessions.insert(entry.key().clone());
1010 alerts.extend(entry.value().alerts.clone());
1011 }
1012
1013 if let Some(cache) = &self.metadata_cache {
1015 if let Ok(stored) = cache.get_all_alerts(None) {
1016 for sa in stored {
1017 let session_id = std::path::Path::new(&sa.session_path)
1019 .file_stem()
1020 .and_then(|s| s.to_str())
1021 .unwrap_or(&sa.session_path)
1022 .to_string();
1023
1024 if seen_sessions.contains(&session_id) {
1025 continue; }
1027
1028 let severity = match sa.severity.as_str() {
1030 "Critical" => AlertSeverity::Critical,
1031 "Warning" => AlertSeverity::Warning,
1032 _ => AlertSeverity::Info,
1033 };
1034 let category = match sa.category.as_str() {
1035 "CredentialAccess" => {
1036 crate::models::activity::AlertCategory::CredentialAccess
1037 }
1038 "DestructiveCommand" => {
1039 crate::models::activity::AlertCategory::DestructiveCommand
1040 }
1041 "ForcePush" => crate::models::activity::AlertCategory::ForcePush,
1042 "ScopeViolation" => crate::models::activity::AlertCategory::ScopeViolation,
1043 _ => crate::models::activity::AlertCategory::ExternalExfil,
1044 };
1045 let timestamp = sa
1046 .timestamp
1047 .parse::<chrono::DateTime<chrono::Utc>>()
1048 .unwrap_or_else(|_| chrono::Utc::now());
1049
1050 alerts.push(Alert {
1051 session_id,
1052 timestamp,
1053 severity,
1054 category,
1055 detail: sa.detail,
1056 });
1057 }
1058 }
1059 }
1060
1061 alerts.sort_by(|a, b| {
1063 b.severity
1064 .partial_cmp(&a.severity)
1065 .unwrap_or(std::cmp::Ordering::Equal)
1066 .then_with(|| b.timestamp.cmp(&a.timestamp))
1067 });
1068
1069 alerts
1070 }
1071
1072 pub fn top_sessions_by_tokens(&self, limit: usize) -> Vec<Arc<SessionMetadata>> {
1075 let mut sessions: Vec<_> = self
1076 .sessions
1077 .iter()
1078 .map(|r| Arc::clone(r.value()))
1079 .collect();
1080 sessions.sort_by(|a, b| b.total_tokens.cmp(&a.total_tokens));
1081 sessions.truncate(limit);
1082 sessions
1083 }
1084
1085 pub fn top_models_by_tokens(&self) -> Vec<(String, u64)> {
1088 let mut model_totals = std::collections::HashMap::new();
1089
1090 for session in self.sessions.iter() {
1092 for model in &session.value().models_used {
1093 *model_totals.entry(model.clone()).or_insert(0) += session.value().total_tokens;
1094 }
1095 }
1096
1097 let mut results: Vec<_> = model_totals.into_iter().collect();
1099 results.sort_by(|a, b| b.1.cmp(&a.1));
1100 results.truncate(10); results
1102 }
1103
1104 pub fn top_days_by_tokens(&self) -> Vec<(String, u64)> {
1107 let mut day_totals = std::collections::HashMap::new();
1108
1109 for session in self.sessions.iter() {
1111 if let Some(timestamp) = &session.value().first_timestamp {
1112 let date = timestamp.format("%Y-%m-%d").to_string();
1113 *day_totals.entry(date).or_insert(0) += session.value().total_tokens;
1114 }
1115 }
1116
1117 let mut results: Vec<_> = day_totals.into_iter().collect();
1119 results.sort_by(|a, b| b.1.cmp(&a.1));
1120 results.truncate(10); results
1122 }
1123
1124 pub fn projects_leaderboard(&self) -> Vec<ProjectLeaderboardEntry> {
1129 let mut project_metrics = std::collections::HashMap::new();
1130
1131 for session in self.sessions.iter() {
1133 let metadata = session.value();
1134 let project_path = &metadata.project_path;
1135
1136 let model = metadata
1138 .models_used
1139 .first()
1140 .map(|s| s.as_str())
1141 .unwrap_or("unknown");
1142
1143 let cost = crate::pricing::calculate_cost(
1145 model,
1146 metadata.input_tokens,
1147 metadata.output_tokens,
1148 metadata.cache_creation_tokens,
1149 metadata.cache_read_tokens,
1150 );
1151
1152 let entry = project_metrics
1153 .entry(project_path.clone())
1154 .or_insert((0, 0u64, 0.0f64)); entry.0 += 1; entry.1 += metadata.total_tokens; entry.2 += cost; }
1160
1161 let mut results: Vec<_> = project_metrics
1163 .into_iter()
1164 .map(
1165 |(project_path, (session_count, total_tokens, total_cost))| {
1166 let avg_session_cost = if session_count > 0 {
1167 total_cost / session_count as f64
1168 } else {
1169 0.0
1170 };
1171
1172 let project_name = std::path::Path::new(project_path.as_str())
1174 .file_name()
1175 .and_then(|n| n.to_str())
1176 .unwrap_or(project_path.as_str())
1177 .to_string();
1178
1179 ProjectLeaderboardEntry {
1180 project_name,
1181 total_sessions: session_count,
1182 total_tokens,
1183 total_cost,
1184 avg_session_cost,
1185 }
1186 },
1187 )
1188 .collect();
1189
1190 results.sort_by(|a, b| {
1192 b.total_cost
1193 .partial_cmp(&a.total_cost)
1194 .unwrap_or(std::cmp::Ordering::Equal)
1195 });
1196
1197 results
1198 }
1199
1200 pub async fn reload_stats(&self) {
1206 let stats_path = self.claude_home.join("stats-cache.json");
1207 let parser = StatsParser::new()
1208 .with_retries(self.config.stats_retry_count, self.config.stats_retry_delay);
1209
1210 let mut report = LoadReport::new();
1211 if let Some(mut stats) = parser.parse_graceful(&stats_path, &mut report).await {
1212 stats.recalculate_costs();
1214 let mut guard = self.stats.write();
1215 *guard = Some(stats);
1216
1217 self.event_bus.publish(DataEvent::StatsUpdated);
1220 debug!("Stats reloaded with recalculated costs");
1221 }
1222 }
1223
1224 pub async fn reload_settings(&self) {
1226 let parser = SettingsParser::new();
1227 let merged = parser
1228 .load_merged(
1229 &self.claude_home,
1230 self.project_path.as_deref(),
1231 &mut LoadReport::new(),
1232 )
1233 .await;
1234
1235 {
1236 let mut guard = self.settings.write();
1237 *guard = merged;
1238 }
1239
1240 debug!("Settings reloaded");
1242 }
1243
1244 pub async fn update_session(&self, path: &Path) {
1246 let parser = SessionIndexParser::new();
1247
1248 match parser.scan_session(path).await {
1249 Ok(meta) => {
1250 let id = meta.id.clone();
1251 let is_new = !self.sessions.contains_key(&id);
1252
1253 self.sessions.insert(id.clone(), Arc::new(meta));
1254
1255 if is_new {
1260 self.event_bus.publish(DataEvent::SessionCreated(id));
1261 } else {
1262 self.event_bus.publish(DataEvent::SessionUpdated(id));
1263 }
1264 }
1265 Err(e) => {
1266 warn!(path = %path.display(), error = %e, "Failed to update session");
1267 }
1268 }
1269 }
1270
1271 pub async fn compute_invocations(&self) {
1276 let paths: Vec<_> = self
1277 .sessions
1278 .iter()
1279 .map(|r| r.value().file_path.clone())
1280 .collect();
1281
1282 debug!(session_count = paths.len(), "Computing invocation stats");
1283
1284 let parser = InvocationParser::new();
1285 let mut stats = parser.scan_sessions(&paths).await;
1286
1287 for session_ref in self.sessions.iter() {
1290 let session = session_ref.value();
1291 if let Some(&task_tokens) = session.tool_token_usage.get("Task") {
1292 let agent_count =
1294 session.tool_usage.get("Task").copied().unwrap_or(0).max(1) as u64;
1295 let tokens_per_agent = task_tokens / agent_count;
1296 for agent_type in stats.agents.keys().cloned().collect::<Vec<_>>() {
1298 *stats.agent_token_stats.entry(agent_type).or_insert(0) += tokens_per_agent;
1299 }
1300 }
1301 }
1302
1303 let mut guard = self.invocation_stats.write();
1304 *guard = stats;
1305
1306 debug!(
1307 agents = guard.agents.len(),
1308 commands = guard.commands.len(),
1309 skills = guard.skills.len(),
1310 total = guard.total_invocations(),
1311 "Invocation stats computed"
1312 );
1313
1314 self.event_bus.publish(DataEvent::LoadCompleted);
1316 }
1317
1318 pub async fn compute_billing_blocks(&self) {
1323 debug!("Computing billing blocks from sessions with real pricing");
1324
1325 let mut manager = BillingBlockManager::new();
1326 let mut sessions_with_timestamps = 0;
1327 let mut sessions_without_timestamps = 0;
1328
1329 for session in self.sessions.iter() {
1330 let metadata = session.value();
1331
1332 let Some(timestamp) = &metadata.first_timestamp else {
1334 sessions_without_timestamps += 1;
1335 continue;
1336 };
1337
1338 sessions_with_timestamps += 1;
1339
1340 let model = metadata
1342 .models_used
1343 .first()
1344 .map(|s| s.as_str())
1345 .unwrap_or("unknown");
1346
1347 let cost = crate::pricing::calculate_cost(
1349 model,
1350 metadata.input_tokens,
1351 metadata.output_tokens,
1352 metadata.cache_creation_tokens,
1353 metadata.cache_read_tokens,
1354 );
1355
1356 manager.add_usage(
1357 timestamp,
1358 metadata.input_tokens,
1359 metadata.output_tokens,
1360 metadata.cache_creation_tokens,
1361 metadata.cache_read_tokens,
1362 cost,
1363 );
1364 }
1365
1366 debug!(
1367 sessions_with_timestamps,
1368 sessions_without_timestamps,
1369 blocks = manager.get_all_blocks().len(),
1370 "Billing blocks computed with real pricing"
1371 );
1372
1373 let mut guard = self.billing_blocks.write();
1374 *guard = manager;
1375
1376 self.event_bus.publish(DataEvent::LoadCompleted);
1377 }
1378
1379 pub fn billing_blocks(&self) -> parking_lot::RwLockReadGuard<'_, BillingBlockManager> {
1381 self.billing_blocks.read()
1382 }
1383
1384 pub fn usage_estimate(&self) -> crate::usage_estimator::UsageEstimate {
1386 use crate::parsers::claude_global::DetectedPlan;
1387 use crate::usage_estimator::SubscriptionPlan;
1388
1389 let settings = self.settings();
1390
1391 let plan = if let Some(s) = settings.merged.subscription_plan.as_ref() {
1393 SubscriptionPlan::parse(s)
1394 } else {
1395 let detected = self
1397 .claude_global_stats
1398 .read()
1399 .as_ref()
1400 .and_then(|g| g.detected_plan.clone());
1401
1402 match detected {
1403 Some(DetectedPlan::Pro) => SubscriptionPlan::Pro,
1404 Some(DetectedPlan::Max) => SubscriptionPlan::Max5x, Some(DetectedPlan::Api) => SubscriptionPlan::Api,
1406 None => SubscriptionPlan::Unknown,
1407 }
1408 };
1409
1410 let billing_blocks = self.billing_blocks.read();
1411 crate::usage_estimator::calculate_usage_estimate(&billing_blocks, plan)
1412 }
1413
1414 pub fn load_preferences(&self) -> crate::preferences::CcboardPreferences {
1416 let cache_dir = self.claude_home.join("cache");
1417 crate::preferences::CcboardPreferences::load(&cache_dir)
1418 }
1419
1420 pub fn save_preferences(
1422 &self,
1423 prefs: &crate::preferences::CcboardPreferences,
1424 ) -> anyhow::Result<()> {
1425 let cache_dir = self.claude_home.join("cache");
1426 prefs.save(&cache_dir)
1427 }
1428
1429 pub fn is_bookmarked(&self, session_id: &str) -> bool {
1433 self.bookmark_store.read().is_bookmarked(session_id)
1434 }
1435
1436 pub fn bookmark_entry(&self, session_id: &str) -> Option<crate::bookmarks::BookmarkEntry> {
1439 self.bookmark_store.read().get(session_id).cloned()
1440 }
1441
1442 pub fn toggle_bookmark(&self, session_id: &str) -> anyhow::Result<bool> {
1445 self.bookmark_store.write().toggle(session_id, "bookmarked")
1446 }
1447
1448 pub fn upsert_bookmark(
1450 &self,
1451 session_id: &str,
1452 tag: impl Into<String>,
1453 note: Option<String>,
1454 ) -> anyhow::Result<()> {
1455 self.bookmark_store.write().upsert(session_id, tag, note)
1456 }
1457
1458 pub fn remove_bookmark(&self, session_id: &str) -> anyhow::Result<bool> {
1460 self.bookmark_store.write().remove(session_id)
1461 }
1462
1463 pub fn bookmark_count(&self) -> usize {
1465 self.bookmark_store.read().len()
1466 }
1467
1468 pub fn has_summary(&self, session_id: &str) -> bool {
1470 self.summary_store.has_summary(session_id)
1471 }
1472
1473 pub fn load_summary(&self, session_id: &str) -> Option<String> {
1475 self.summary_store.load(session_id)
1476 }
1477
1478 pub fn subagent_children(&self, parent_id: &str) -> Vec<Arc<SessionMetadata>> {
1481 self.sessions
1482 .iter()
1483 .filter(|entry| {
1484 entry
1485 .value()
1486 .parent_session_id
1487 .as_deref()
1488 .map(|pid| pid == parent_id)
1489 .unwrap_or(false)
1490 })
1491 .map(|entry| Arc::clone(entry.value()))
1492 .collect()
1493 }
1494
1495 pub fn compute_has_subagents(&self) {
1499 let parent_ids: std::collections::HashSet<String> = self
1501 .sessions
1502 .iter()
1503 .filter_map(|entry| entry.value().parent_session_id.clone())
1504 .collect();
1505
1506 if parent_ids.is_empty() {
1507 return;
1508 }
1509
1510 for mut entry in self.sessions.iter_mut() {
1512 if parent_ids.contains(entry.value().id.as_str()) {
1513 let current: &SessionMetadata = entry.value();
1514 let updated = SessionMetadata {
1515 has_subagents: true,
1516 ..current.clone()
1517 };
1518 *entry.value_mut() = Arc::new(updated);
1519 }
1520 }
1521 }
1522}
1523
1524#[cfg(test)]
1525mod tests {
1526 use super::*;
1527 use tempfile::tempdir;
1528
1529 #[tokio::test]
1530 async fn test_data_store_creation() {
1531 let dir = tempdir().unwrap();
1532 let store = DataStore::with_defaults(dir.path().to_path_buf(), None);
1533
1534 assert_eq!(store.session_count(), 0);
1535 assert!(store.stats().is_none());
1536 assert!(store.degraded_state().is_healthy());
1537 }
1538
1539 #[tokio::test]
1540 async fn test_initial_load_missing_dir() {
1541 let dir = tempdir().unwrap();
1542 let store = DataStore::with_defaults(dir.path().join("nonexistent"), None);
1543
1544 let report = store.initial_load().await;
1545
1546 assert!(report.has_errors());
1548 assert!(store.degraded_state().is_degraded());
1549 }
1550
1551 #[tokio::test]
1552 async fn test_initial_load_with_stats() {
1553 let dir = tempdir().unwrap();
1554 let claude_home = dir.path();
1555
1556 std::fs::write(
1558 claude_home.join("stats-cache.json"),
1559 r#"{"version": 2, "totalSessions": 5, "totalMessages": 100, "modelUsage": {"test": {"inputTokens": 600, "outputTokens": 400}}}"#,
1560 )
1561 .unwrap();
1562
1563 std::fs::create_dir_all(claude_home.join("projects")).unwrap();
1565
1566 let store = DataStore::with_defaults(claude_home.to_path_buf(), None);
1567 let report = store.initial_load().await;
1568
1569 assert!(report.stats_loaded);
1570 let stats = store.stats().unwrap();
1571 assert_eq!(stats.total_tokens(), 1000);
1572 assert_eq!(stats.session_count(), 5);
1573 }
1574
1575 #[tokio::test]
1576 async fn test_event_bus_subscription() {
1577 let dir = tempdir().unwrap();
1578 let store = DataStore::with_defaults(dir.path().to_path_buf(), None);
1579
1580 let mut rx = store.event_bus().subscribe();
1581
1582 store.event_bus().publish(DataEvent::StatsUpdated);
1584
1585 let event = rx.recv().await.unwrap();
1586 assert!(matches!(event, DataEvent::StatsUpdated));
1587 }
1588
1589 #[tokio::test]
1590 async fn test_analytics_cache_and_invalidation() {
1591 use crate::models::session::SessionMetadata;
1592 use chrono::Utc;
1593
1594 let dir = tempdir().unwrap();
1595 let store = DataStore::with_defaults(dir.path().to_path_buf(), None);
1596
1597 let now = Utc::now();
1599 for i in 0..10 {
1600 let total_tokens = 1000 * (i as u64 + 1);
1601 let session = SessionMetadata {
1602 id: format!("test-{}", i).into(),
1603 file_path: std::path::PathBuf::from(format!("/test-{}.jsonl", i)),
1604 project_path: "/test".into(),
1605 first_timestamp: Some(now - chrono::Duration::days(i)),
1606 last_timestamp: Some(now),
1607 message_count: 10,
1608 total_tokens,
1609 input_tokens: total_tokens / 2,
1610 output_tokens: total_tokens / 3,
1611 cache_creation_tokens: total_tokens / 10,
1612 cache_read_tokens: total_tokens
1613 - (total_tokens / 2 + total_tokens / 3 + total_tokens / 10),
1614 models_used: vec!["sonnet".to_string()],
1615 model_segments: Vec::new(),
1616 file_size_bytes: 1024,
1617 first_user_message: None,
1618 has_subagents: false,
1619 parent_session_id: None,
1620 duration_seconds: Some(1800),
1621 branch: None,
1622 tool_usage: std::collections::HashMap::new(),
1623 tool_token_usage: std::collections::HashMap::new(),
1624 source_tool: Default::default(),
1625 lines_added: 0,
1626 lines_removed: 0,
1627 };
1628 store.sessions.insert(session.id.clone(), Arc::new(session));
1629 }
1630
1631 assert!(store.analytics().is_none());
1633
1634 store.compute_analytics(Period::last_7d()).await;
1636
1637 let analytics1 = store.analytics().expect("Analytics should be cached");
1639 assert!(!analytics1.trends.is_empty());
1640 assert_eq!(analytics1.period, Period::last_7d());
1641
1642 store.invalidate_analytics_cache();
1644 assert!(store.analytics().is_none(), "Cache should be invalidated");
1645
1646 store.compute_analytics(Period::last_30d()).await;
1648 let analytics2 = store.analytics().expect("Analytics should be re-cached");
1649 assert_eq!(analytics2.period, Period::last_30d());
1650 }
1651
1652 #[tokio::test]
1653 async fn test_leaderboard_methods() {
1654 use crate::models::session::SessionMetadata;
1655 use chrono::Utc;
1656
1657 let dir = tempdir().unwrap();
1658 let store = DataStore::with_defaults(dir.path().to_path_buf(), None);
1659
1660 let now = Utc::now();
1661
1662 let test_data = vec![
1664 ("session-1", 5000u64, "opus", 0),
1665 ("session-2", 3000u64, "sonnet", 1),
1666 ("session-3", 8000u64, "haiku", 0),
1667 ("session-4", 2000u64, "sonnet", 2),
1668 ("session-5", 10000u64, "opus", 0),
1669 ];
1670
1671 for (id, tokens, model, days_ago) in test_data {
1672 let session = SessionMetadata {
1673 id: id.into(),
1674 file_path: std::path::PathBuf::from(format!("/{}.jsonl", id)),
1675 project_path: "/test".into(),
1676 first_timestamp: Some(now - chrono::Duration::days(days_ago)),
1677 last_timestamp: Some(now),
1678 message_count: 10,
1679 total_tokens: tokens,
1680 input_tokens: tokens / 2,
1681 output_tokens: tokens / 2,
1682 cache_creation_tokens: 0,
1683 cache_read_tokens: 0,
1684 models_used: vec![model.to_string()],
1685 model_segments: Vec::new(),
1686 file_size_bytes: 1024,
1687 first_user_message: None,
1688 has_subagents: false,
1689 parent_session_id: None,
1690 duration_seconds: Some(1800),
1691 branch: None,
1692 tool_usage: std::collections::HashMap::new(),
1693 tool_token_usage: std::collections::HashMap::new(),
1694 source_tool: Default::default(),
1695 lines_added: 0,
1696 lines_removed: 0,
1697 };
1698 store.sessions.insert(session.id.clone(), Arc::new(session));
1699 }
1700
1701 let top_sessions = store.top_sessions_by_tokens(3);
1703 assert_eq!(top_sessions.len(), 3);
1704 assert_eq!(top_sessions[0].id, "session-5"); assert_eq!(top_sessions[1].id, "session-3"); assert_eq!(top_sessions[2].id, "session-1"); let top_models = store.top_models_by_tokens();
1710 assert!(!top_models.is_empty());
1711 assert_eq!(top_models[0].0, "opus");
1713 assert_eq!(top_models[0].1, 15000);
1714 assert_eq!(top_models[1].0, "haiku");
1715 assert_eq!(top_models[1].1, 8000);
1716
1717 let top_days = store.top_days_by_tokens();
1719 assert!(!top_days.is_empty());
1720 let today = now.format("%Y-%m-%d").to_string();
1722 assert_eq!(top_days[0].0, today);
1723 assert_eq!(top_days[0].1, 23000);
1724 }
1725
1726 #[test]
1732 fn test_all_violations_dashmap_priority_over_sqlite() {
1733 use crate::models::activity::{ActivitySummary, Alert, AlertCategory, AlertSeverity};
1734 use chrono::Utc;
1735
1736 let dir = tempdir().unwrap();
1737 let claude_home = dir.path().to_path_buf();
1738 let store = DataStore::with_defaults(claude_home.clone(), None);
1739
1740 let now = Utc::now();
1741
1742 let cache = MetadataCache::new(&claude_home.join("cache"))
1744 .expect("MetadataCache should open in tempdir");
1745
1746 let sqlite_summary = ActivitySummary {
1748 alerts: vec![Alert {
1749 session_id: "shared-session".to_string(),
1750 timestamp: now,
1751 severity: AlertSeverity::Warning,
1752 category: AlertCategory::DestructiveCommand,
1753 detail: "sqlite-version".to_string(),
1754 }],
1755 ..Default::default()
1756 };
1757 cache
1758 .put_activity(
1759 std::path::Path::new("/projects/test/shared-session.jsonl"),
1760 "shared-session",
1761 &sqlite_summary,
1762 std::time::SystemTime::now(),
1763 )
1764 .expect("put_activity should succeed");
1765
1766 let sqlite_only_summary = ActivitySummary {
1768 alerts: vec![Alert {
1769 session_id: "sqlite-only-session".to_string(),
1770 timestamp: now,
1771 severity: AlertSeverity::Info,
1772 category: AlertCategory::ExternalExfil,
1773 detail: "sqlite-only-detail".to_string(),
1774 }],
1775 ..Default::default()
1776 };
1777 cache
1778 .put_activity(
1779 std::path::Path::new("/projects/test/sqlite-only-session.jsonl"),
1780 "sqlite-only-session",
1781 &sqlite_only_summary,
1782 std::time::SystemTime::now(),
1783 )
1784 .expect("put_activity should succeed");
1785
1786 let store_cache = store
1793 .metadata_cache
1794 .as_ref()
1795 .expect("MetadataCache should be present in store");
1796
1797 store_cache
1798 .put_activity(
1799 std::path::Path::new("/projects/test/shared-session.jsonl"),
1800 "shared-session",
1801 &sqlite_summary,
1802 std::time::SystemTime::now(),
1803 )
1804 .expect("put_activity via store cache should succeed");
1805 store_cache
1806 .put_activity(
1807 std::path::Path::new("/projects/test/sqlite-only-session.jsonl"),
1808 "sqlite-only-session",
1809 &sqlite_only_summary,
1810 std::time::SystemTime::now(),
1811 )
1812 .expect("put_activity via store cache should succeed");
1813
1814 let dashmap_summary = ActivitySummary {
1816 alerts: vec![Alert {
1817 session_id: "shared-session".to_string(),
1818 timestamp: now,
1819 severity: AlertSeverity::Critical,
1820 category: AlertCategory::ForcePush,
1821 detail: "dashmap-version".to_string(),
1822 }],
1823 ..Default::default()
1824 };
1825 store
1826 .activity_results
1827 .insert("shared-session".to_string(), dashmap_summary);
1828
1829 let violations = store.all_violations();
1831
1832 let dashmap_hit = violations.iter().find(|a| a.session_id == "shared-session");
1834 assert!(
1835 dashmap_hit.is_some(),
1836 "shared-session alert must appear in violations"
1837 );
1838 assert_eq!(
1839 dashmap_hit.unwrap().detail,
1840 "dashmap-version",
1841 "DashMap version must take priority over SQLite for shared session"
1842 );
1843 assert_eq!(
1844 dashmap_hit.unwrap().severity,
1845 AlertSeverity::Critical,
1846 "DashMap severity (Critical) must win over SQLite (Warning)"
1847 );
1848
1849 let sqlite_hit = violations
1851 .iter()
1852 .find(|a| a.session_id == "sqlite-only-session");
1853 assert!(
1854 sqlite_hit.is_some(),
1855 "sqlite-only-session must appear in violations (no DashMap entry for it)"
1856 );
1857 assert_eq!(sqlite_hit.unwrap().detail, "sqlite-only-detail");
1858
1859 let sqlite_dup = violations
1861 .iter()
1862 .filter(|a| a.session_id == "shared-session")
1863 .count();
1864 assert_eq!(
1865 sqlite_dup, 1,
1866 "shared-session must appear exactly once (DashMap wins, no SQLite duplicate)"
1867 );
1868
1869 assert_eq!(violations[0].severity, AlertSeverity::Critical);
1871 }
1872}