Skip to main content

ccboard_core/
store.rs

1//! Data store with DashMap + parking_lot::RwLock
2//!
3//! Uses DashMap for sessions (per-entry locking) and parking_lot::RwLock
4//! for stats/settings (better fairness than std::sync::RwLock).
5
6use crate::analytics::{AnalyticsData, Period};
7use crate::cache::{MetadataCache, StoredAlert};
8use crate::error::{CoreError, DegradedState, LoadReport};
9use crate::event::{ConfigScope, DataEvent, EventBus};
10use crate::models::activity::ActivitySummary;
11use crate::models::{
12    BillingBlockManager, InvocationStats, MergedConfig, SessionId, SessionMetadata, StatsCache,
13};
14use crate::parsers::{
15    classify_tool_calls, parse_claude_global, parse_tool_calls, ClaudeGlobalStats,
16    InvocationParser, McpConfig, Rules, SessionContentParser, SessionIndexParser, SettingsParser,
17    StatsParser,
18};
19use dashmap::DashMap;
20use moka::future::Cache;
21use parking_lot::RwLock; // parking_lot > std::sync::RwLock: smaller (40B vs 72B), no poisoning, better fairness
22use std::path::{Path, PathBuf};
23use std::sync::Arc;
24use std::time::Duration;
25use tracing::{debug, info, warn};
26
/// Tunable limits and retry settings for [`DataStore`].
#[derive(Debug, Clone)]
pub struct DataStoreConfig {
    /// Upper bound on the number of session metadata entries kept in memory
    pub max_session_metadata_count: usize,

    /// Size budget for the session content cache, expressed in MB
    pub max_session_content_cache_mb: usize,

    /// How many session scans may run concurrently
    pub max_concurrent_scans: usize,

    /// Number of retries handed to the stats parser
    pub stats_retry_count: u32,

    /// Delay handed to the stats parser for its retries
    pub stats_retry_delay: Duration,
}

impl Default for DataStoreConfig {
    /// Defaults: 10 000 sessions, 100 MB content cache, 8 concurrent scans,
    /// 3 stats retries spaced 100 ms apart.
    fn default() -> Self {
        let stats_retry_delay = Duration::from_millis(100);

        DataStoreConfig {
            stats_retry_delay,
            stats_retry_count: 3,
            max_concurrent_scans: 8,
            max_session_content_cache_mb: 100,
            max_session_metadata_count: 10_000,
        }
    }
}
57
/// Central data store for ccboard
///
/// Thread-safe access to all Claude Code data.
/// Uses DashMap for sessions (high contention) and RwLock for stats/settings (low contention).
///
/// All fields start empty/default in `new()`; most are filled by `initial_load()`.
pub struct DataStore {
    /// Path to Claude home directory
    claude_home: PathBuf,

    /// Current project path (if focused)
    project_path: Option<PathBuf>,

    /// Configuration
    config: DataStoreConfig,

    /// Stats cache (low contention, frequent reads).
    /// `None` until `load_stats` parses it successfully.
    stats: RwLock<Option<StatsCache>>,

    /// Merged settings
    settings: RwLock<MergedConfig>,

    /// MCP server configuration (`None` when no config was found or loading failed)
    mcp_config: RwLock<Option<McpConfig>>,

    /// Rules from CLAUDE.md files
    rules: RwLock<Rules>,

    /// Invocation statistics (agents, commands, skills)
    invocation_stats: RwLock<InvocationStats>,

    /// Billing blocks (5h usage tracking)
    billing_blocks: RwLock<BillingBlockManager>,

    /// Analytics data cache, written by `compute_analytics()`.
    /// NOTE(review): `invalidate_analytics_cache` is currently marked dead code,
    /// so nothing visible here clears this cache — confirm intended invalidation.
    analytics_cache: RwLock<Option<AnalyticsData>>,

    /// Session metadata (high contention with many entries)
    /// Arc<SessionMetadata> for cheap cloning (8 bytes vs ~400 bytes)
    ///
    /// Why Arc over Box: Multi-thread access from TUI + Web frontends
    /// justifies atomic refcount overhead (~4 bytes). Box would require
    /// cloning entire struct on each frontend access.
    sessions: DashMap<SessionId, Arc<SessionMetadata>>,

    /// Session content cache (LRU for on-demand loading)
    // NOTE(review): this field is read in `load_session_content` and
    // `clear_session_content_cache`, so the `dead_code` allow may no longer be needed.
    #[allow(dead_code)]
    session_content_cache: Cache<SessionId, Vec<String>>,

    /// Event bus for notifying subscribers
    event_bus: EventBus,

    /// Current degraded state (recomputed from the `LoadReport` after a load)
    degraded_state: RwLock<DegradedState>,

    /// Metadata cache for 90% startup speedup (optional).
    /// `None` when the cache could not be created in `new()`.
    metadata_cache: Option<Arc<MetadataCache>>,

    /// In-memory activity analysis results (populated by analyze_session),
    /// keyed by session id string
    activity_results: DashMap<String, ActivitySummary>,

    /// Hook-based live session state (loaded from ~/.ccboard/live-sessions.json)
    live_hook_sessions: RwLock<crate::hook_state::LiveSessionFile>,

    /// Per-project last session stats from ~/.claude.json
    claude_global_stats: RwLock<Option<ClaudeGlobalStats>>,
}
123
/// Project leaderboard entry with aggregated metrics
///
/// One entry describes a single project; how the values are aggregated is
/// decided by the code that builds the leaderboard (not visible in this section).
#[derive(Debug, Clone)]
pub struct ProjectLeaderboardEntry {
    /// Name of the project this entry describes
    pub project_name: String,
    /// Number of sessions counted for the project
    pub total_sessions: usize,
    /// Token total across the project's sessions
    pub total_tokens: u64,
    /// Cost total across the project's sessions (currency/unit not shown here — TODO confirm)
    pub total_cost: f64,
    /// Average cost per session (presumably `total_cost / total_sessions` — verify against producer)
    pub avg_session_cost: f64,
}
133
134impl DataStore {
135    /// Create a new data store
136    pub fn new(
137        claude_home: PathBuf,
138        project_path: Option<PathBuf>,
139        config: DataStoreConfig,
140    ) -> Self {
141        let session_content_cache = Cache::builder()
142            .max_capacity((config.max_session_content_cache_mb * 1024 * 1024 / 1000) as u64) // Rough estimate
143            .time_to_idle(Duration::from_secs(300)) // 5 min idle expiry
144            .build();
145
146        // Create metadata cache in ~/.claude/cache/
147        let metadata_cache = {
148            let cache_dir = claude_home.join("cache");
149            match MetadataCache::new(&cache_dir) {
150                Ok(cache) => {
151                    debug!(path = %cache_dir.display(), "Metadata cache enabled");
152                    Some(Arc::new(cache))
153                }
154                Err(e) => {
155                    warn!(error = %e, "Failed to create metadata cache, running without cache");
156                    None
157                }
158            }
159        };
160
161        Self {
162            claude_home,
163            project_path,
164            config,
165            stats: RwLock::new(None),
166            settings: RwLock::new(MergedConfig::default()),
167            mcp_config: RwLock::new(None),
168            rules: RwLock::new(Rules::default()),
169            invocation_stats: RwLock::new(InvocationStats::new()),
170            billing_blocks: RwLock::new(BillingBlockManager::new()),
171            analytics_cache: RwLock::new(None),
172            sessions: DashMap::new(),
173            session_content_cache,
174            event_bus: EventBus::default_capacity(),
175            degraded_state: RwLock::new(DegradedState::Healthy),
176            metadata_cache,
177            activity_results: DashMap::new(),
178            live_hook_sessions: RwLock::new(crate::hook_state::LiveSessionFile::default()),
179            claude_global_stats: RwLock::new(None),
180        }
181    }
182
183    /// Create with default configuration
184    pub fn with_defaults(claude_home: PathBuf, project_path: Option<PathBuf>) -> Self {
185        Self::new(claude_home, project_path, DataStoreConfig::default())
186    }
187
188    /// Get the event bus for subscribing to updates
189    pub fn event_bus(&self) -> &EventBus {
190        &self.event_bus
191    }
192
193    /// Get current degraded state
194    pub fn degraded_state(&self) -> DegradedState {
195        self.degraded_state.read().clone()
196    }
197
198    /// Initial load of all data with LoadReport for graceful degradation
199    pub async fn initial_load(&self) -> LoadReport {
200        let mut report = LoadReport::new();
201
202        info!(claude_home = %self.claude_home.display(), "Starting initial data load");
203
204        // Load stats
205        self.load_stats(&mut report).await;
206
207        // Load ~/.claude.json global stats (per-project last session costs)
208        if let Some(home) = dirs::home_dir() {
209            if let Some(global) = parse_claude_global(&home) {
210                *self.claude_global_stats.write() = Some(global);
211                debug!("~/.claude.json loaded successfully");
212            }
213        }
214
215        // Load settings
216        self.load_settings(&mut report).await;
217
218        // Load MCP configuration
219        self.load_mcp_config(&mut report).await;
220
221        // Load rules
222        self.load_rules(&mut report).await;
223
224        // Scan sessions
225        self.scan_sessions(&mut report).await;
226
227        // Determine degraded state
228        self.update_degraded_state(&report);
229
230        // Notify subscribers
231        self.event_bus.publish(DataEvent::LoadCompleted);
232
233        info!(
234            stats_loaded = report.stats_loaded,
235            settings_loaded = report.settings_loaded,
236            sessions_scanned = report.sessions_scanned,
237            sessions_failed = report.sessions_failed,
238            errors = report.errors.len(),
239            "Initial load complete"
240        );
241
242        report
243    }
244
245    /// Load stats cache
246    async fn load_stats(&self, report: &mut LoadReport) {
247        let stats_path = self.claude_home.join("stats-cache.json");
248        let parser = StatsParser::new()
249            .with_retries(self.config.stats_retry_count, self.config.stats_retry_delay);
250
251        if let Some(mut stats) = parser.parse_graceful(&stats_path, report).await {
252            // Recalculate costs using accurate pricing
253            stats.recalculate_costs();
254            let mut guard = self.stats.write();
255            *guard = Some(stats);
256            debug!("Stats loaded successfully with recalculated costs");
257        }
258    }
259
260    /// Load and merge settings
261    async fn load_settings(&self, report: &mut LoadReport) {
262        let parser = SettingsParser::new();
263        let merged = parser
264            .load_merged(&self.claude_home, self.project_path.as_deref(), report)
265            .await;
266
267        let mut guard = self.settings.write();
268        *guard = merged;
269        debug!("Settings loaded and merged");
270    }
271
272    /// Load MCP server configuration (global + project-level)
273    async fn load_mcp_config(&self, report: &mut LoadReport) {
274        match McpConfig::load_merged(&self.claude_home, self.project_path.as_deref()) {
275            Ok(Some(config)) => {
276                let server_count = config.servers.len();
277                let mut guard = self.mcp_config.write();
278                *guard = Some(config);
279                debug!(
280                    server_count,
281                    "MCP config loaded successfully (global + project)"
282                );
283            }
284            Ok(None) => {
285                debug!("No MCP config found (optional)");
286            }
287            Err(e) => {
288                use crate::error::LoadError;
289                report.add_error(LoadError::error(
290                    "mcp_config",
291                    format!("Failed to parse MCP config: {}", e),
292                ));
293            }
294        }
295    }
296
297    /// Load rules from CLAUDE.md files
298    async fn load_rules(&self, report: &mut LoadReport) {
299        match Rules::load(&self.claude_home, self.project_path.as_deref()) {
300            Ok(rules) => {
301                let has_global = rules.global.is_some();
302                let has_project = rules.project.is_some();
303                let mut guard = self.rules.write();
304                *guard = rules;
305                debug!(has_global, has_project, "Rules loaded");
306            }
307            Err(e) => {
308                use crate::error::LoadError;
309                report.add_error(LoadError::error(
310                    "rules",
311                    format!("Failed to load rules: {}", e),
312                ));
313            }
314        }
315    }
316
317    /// Scan all sessions
318    async fn scan_sessions(&self, report: &mut LoadReport) {
319        let projects_dir = self.claude_home.join("projects");
320
321        if !projects_dir.exists() {
322            report.add_warning(
323                "sessions",
324                format!("Projects directory not found: {}", projects_dir.display()),
325            );
326            return;
327        }
328
329        let mut parser =
330            SessionIndexParser::new().with_concurrency(self.config.max_concurrent_scans);
331
332        // Enable metadata cache if available (90% speedup)
333        if let Some(ref cache) = self.metadata_cache {
334            parser = parser.with_cache(cache.clone());
335        }
336
337        let sessions = parser.scan_all(&projects_dir, report).await;
338
339        // Enforce max count limit
340        let sessions_to_add: Vec<_> = if sessions.len() > self.config.max_session_metadata_count {
341            warn!(
342                total = sessions.len(),
343                limit = self.config.max_session_metadata_count,
344                "Session count exceeds limit, keeping most recent"
345            );
346
347            let mut sorted = sessions;
348            sorted.sort_by(|a, b| b.last_timestamp.cmp(&a.last_timestamp));
349            sorted.truncate(self.config.max_session_metadata_count);
350            sorted
351        } else {
352            sessions
353        };
354
355        // Insert into DashMap (wrap in Arc for cheap cloning)
356        for session in sessions_to_add {
357            self.sessions.insert(session.id.clone(), Arc::new(session));
358        }
359
360        debug!(count = self.sessions.len(), "Sessions indexed");
361    }
362
363    /// Update degraded state based on load report
364    fn update_degraded_state(&self, report: &LoadReport) {
365        let mut state = self.degraded_state.write();
366
367        if report.has_fatal_errors() {
368            *state = DegradedState::ReadOnly {
369                reason: "Fatal errors during load".to_string(),
370            };
371            return;
372        }
373
374        let mut missing = Vec::new();
375
376        if !report.stats_loaded {
377            missing.push("stats".to_string());
378        }
379        if !report.settings_loaded {
380            missing.push("settings".to_string());
381        }
382        if report.sessions_failed > 0 {
383            missing.push(format!("{} sessions", report.sessions_failed));
384        }
385
386        if missing.is_empty() {
387            *state = DegradedState::Healthy;
388        } else {
389            *state = DegradedState::PartialData {
390                missing: missing.clone(),
391                reason: format!("Missing: {}", missing.join(", ")),
392            };
393        }
394    }
395
396    // ===================
397    // Read accessors
398    // ===================
399
400    /// Get a clone of stats
401    pub fn stats(&self) -> Option<StatsCache> {
402        self.stats.read().clone()
403    }
404
405    /// Calculate context window saturation from current sessions
406    pub fn context_window_stats(&self) -> crate::models::ContextWindowStats {
407        // Clone Arc (cheap) to avoid lifetime issues with DashMap iterators
408        let sessions: Vec<_> = self
409            .sessions
410            .iter()
411            .map(|entry| Arc::clone(entry.value()))
412            .collect();
413        // Dereference Arc to get &SessionMetadata
414        let refs: Vec<_> = sessions.iter().map(|s| s.as_ref()).collect();
415        crate::models::StatsCache::calculate_context_saturation(&refs, 30)
416    }
417
418    /// Get merged settings
419    pub fn settings(&self) -> MergedConfig {
420        self.settings.read().clone()
421    }
422
423    /// Get MCP server configuration
424    pub fn mcp_config(&self) -> Option<McpConfig> {
425        self.mcp_config.read().clone()
426    }
427
428    /// Get rules
429    pub fn rules(&self) -> Rules {
430        self.rules.read().clone()
431    }
432
433    /// Get invocation statistics
434    pub fn invocation_stats(&self) -> InvocationStats {
435        self.invocation_stats.read().clone()
436    }
437
438    /// Calculate current quota status from stats and budget config
439    ///
440    /// Returns None if stats are not loaded or budget is not configured.
441    pub fn quota_status(&self) -> Option<crate::quota::QuotaStatus> {
442        let stats = self.stats.read().clone()?;
443        let settings = self.settings.read();
444        let budget = settings.merged.budget.as_ref()?;
445
446        Some(crate::quota::calculate_quota_status(&stats, budget))
447    }
448
449    /// Get live Claude Code sessions (running processes, ps-based)
450    ///
451    /// Detects active Claude processes on the system and returns metadata.
452    /// Returns empty vector if detection fails or no processes are running.
453    pub fn live_sessions(&self) -> Vec<crate::live_monitor::LiveSession> {
454        crate::live_monitor::detect_live_sessions().unwrap_or_default()
455    }
456
457    /// Get merged live sessions: hook data + ps-based fallback
458    ///
459    /// Hook sessions are prioritized; unmatched ps sessions appear as ProcessOnly.
460    pub fn merged_live_sessions(&self) -> Vec<crate::live_monitor::MergedLiveSession> {
461        let hook_file = self.live_hook_sessions.read().clone();
462        let ps_sessions = crate::live_monitor::detect_live_sessions().unwrap_or_default();
463        crate::live_monitor::merge_live_sessions(&hook_file, &ps_sessions)
464    }
465
466    /// Reload hook-based live session state from a file path
467    pub async fn reload_live_hook_sessions(&self, path: &std::path::Path) {
468        match crate::hook_state::LiveSessionFile::load(path) {
469            Ok(file) => {
470                *self.live_hook_sessions.write() = file;
471                debug!("Reloaded live hook sessions from {}", path.display());
472            }
473            Err(e) => {
474                warn!(error = %e, "Failed to reload live-sessions.json");
475            }
476        }
477    }
478
479    /// Get per-project last session stats from ~/.claude.json
480    pub fn claude_global_stats(&self) -> Option<ClaudeGlobalStats> {
481        self.claude_global_stats.read().clone()
482    }
483
484    /// Get session count
485    pub fn session_count(&self) -> usize {
486        self.sessions.len()
487    }
488
489    /// Get session by ID
490    /// Returns Arc<SessionMetadata> for cheap cloning
491    pub fn get_session(&self, id: &str) -> Option<Arc<SessionMetadata>> {
492        self.sessions.get(id).map(|r| Arc::clone(r.value()))
493    }
494
495    /// Load full session content with lazy caching
496    ///
497    /// Returns conversation messages parsed from session JSONL file.
498    /// Uses Moka cache (LRU with 5min TTL) for repeated access.
499    ///
500    /// # Performance
501    /// - First call: Parse JSONL (~50-500ms for 1000-message session)
502    /// - Cached calls: <1ms (memory lookup)
503    /// - Cache eviction: LRU + 5min idle timeout
504    ///
505    /// # Errors
506    /// Returns CoreError if session not found or file cannot be read.
507    pub async fn load_session_content(
508        &self,
509        session_id: &str,
510    ) -> Result<Vec<crate::models::ConversationMessage>, CoreError> {
511        // Get session metadata
512        let metadata = self
513            .get_session(session_id)
514            .ok_or_else(|| CoreError::SessionNotFound {
515                session_id: session_id.to_string(),
516            })?;
517
518        // Try cache first (Moka handles concurrency internally)
519        let session_id_owned = SessionId::from(session_id);
520        if let Some(_cached) = self.session_content_cache.get(&session_id_owned).await {
521            debug!(session_id, "Session content cache HIT");
522            // TODO: Cache design decision - caching Vec<String> vs Vec<ConversationMessage>
523            // For now, always parse from file (will be optimized in cache phase)
524        }
525
526        // Cache miss: parse from file
527        debug!(
528            session_id,
529            path = %metadata.file_path.display(),
530            "Session content cache MISS, parsing JSONL"
531        );
532
533        let messages = SessionContentParser::parse_conversation(
534            &metadata.file_path,
535            (*metadata).clone(), // Clone metadata out of Arc
536        )
537        .await?;
538
539        // Note: Cache insertion skipped for now (caching Vec<String> vs Vec<ConversationMessage> design decision)
540        // Will be added in cache optimization phase
541
542        Ok(messages)
543    }
544
545    /// Get analytics data for a period (cached)
546    ///
547    /// Returns cached analytics if available, otherwise None.
548    /// Call `compute_analytics()` to compute and cache.
549    pub fn analytics(&self) -> Option<AnalyticsData> {
550        let analytics = self.analytics_cache.read().clone();
551        debug!(
552            has_analytics = analytics.is_some(),
553            "analytics() getter called"
554        );
555        analytics
556    }
557
558    /// Compute and cache analytics data for a period
559    ///
560    /// This is a CPU-intensive operation (trends, forecasting, patterns).
561    /// For 1000+ sessions, this may take 100-300ms, so it's offloaded
562    /// to a blocking task.
563    ///
564    /// Cache is invalidated on stats reload or session updates (EventBus pattern).
565    pub async fn compute_analytics(&self, period: Period) {
566        let sessions: Vec<_> = self
567            .sessions
568            .iter()
569            .map(|r| Arc::clone(r.value()))
570            .collect();
571
572        info!(
573            session_count = sessions.len(),
574            period = ?period,
575            "compute_analytics() ENTRY"
576        );
577
578        // Offload to blocking task for CPU-intensive computation
579        let analytics =
580            tokio::task::spawn_blocking(move || AnalyticsData::compute(&sessions, period)).await;
581
582        match analytics {
583            Ok(data) => {
584                info!(
585                    insights_count = data.insights.len(),
586                    "compute_analytics() computed data"
587                );
588                let mut guard = self.analytics_cache.write();
589                *guard = Some(data);
590                self.event_bus.publish(DataEvent::AnalyticsUpdated);
591                info!("compute_analytics() EXIT - cached and event published");
592            }
593            Err(e) => {
594                warn!(error = %e, "Failed to compute analytics (task panicked)");
595            }
596        }
597    }
598
599    /// Invalidate analytics cache (called on data changes)
600    ///
601    /// Note: Currently unused to prevent aggressive invalidation.
602    /// Kept for future use if smart invalidation is needed.
603    #[allow(dead_code)]
604    fn invalidate_analytics_cache(&self) {
605        let mut guard = self.analytics_cache.write();
606        *guard = None;
607        debug!("Analytics cache invalidated");
608    }
609
610    /// Get all session IDs
611    pub fn session_ids(&self) -> Vec<SessionId> {
612        self.sessions.iter().map(|r| r.key().clone()).collect()
613    }
614
615    /// Clear session content cache (for memory optimization on F5)
616    pub fn clear_session_content_cache(&self) {
617        self.session_content_cache.invalidate_all();
618        debug!("Session content cache cleared");
619    }
620
621    /// Get sessions grouped by project
622    /// Returns Arc<SessionMetadata> for cheap cloning
623    pub fn sessions_by_project(
624        &self,
625    ) -> std::collections::HashMap<String, Vec<Arc<SessionMetadata>>> {
626        let mut by_project = std::collections::HashMap::new();
627
628        for entry in self.sessions.iter() {
629            let session = Arc::clone(entry.value());
630            by_project
631                .entry(session.project_path.as_str().to_string())
632                .or_insert_with(Vec::new)
633                .push(session);
634        }
635
636        // Sort sessions within each project by timestamp (newest first)
637        for sessions in by_project.values_mut() {
638            sessions.sort_by(|a, b| b.last_timestamp.cmp(&a.last_timestamp));
639        }
640
641        by_project
642    }
643
644    /// Get all sessions (unsorted)
645    /// Returns Arc<SessionMetadata> for cheap cloning
646    pub fn all_sessions(&self) -> Vec<Arc<SessionMetadata>> {
647        self.sessions
648            .iter()
649            .map(|r| Arc::clone(r.value()))
650            .collect()
651    }
652
653    /// Get recent sessions (sorted by last timestamp, newest first)
654    /// Returns Arc<SessionMetadata> for cheap cloning
655    pub fn recent_sessions(&self, limit: usize) -> Vec<Arc<SessionMetadata>> {
656        let mut sessions = self.all_sessions();
657        sessions.sort_by(|a, b| b.last_timestamp.cmp(&a.last_timestamp));
658        sessions.truncate(limit);
659        sessions
660    }
661
662    /// Search sessions using FTS5 full-text search.
663    ///
664    /// Returns relevance-ranked results. Returns empty vec if FTS5 not initialized.
665    pub fn search_sessions(&self, query: &str, limit: usize) -> Vec<crate::cache::SearchResult> {
666        if let Some(ref cache) = self.metadata_cache {
667            match cache.search_sessions(query, limit) {
668                Ok(results) => results,
669                Err(e) => {
670                    warn!("FTS5 search failed: {}", e);
671                    Vec::new()
672                }
673            }
674        } else {
675            Vec::new()
676        }
677    }
678
679    /// Analyze a session's tool calls and generate activity summary + alerts.
680    ///
681    /// Results are stored in the in-memory DashMap and the SQLite cache.
682    /// Publishes DataEvent::AnalyticsUpdated on completion so the TUI re-renders.
683    pub async fn analyze_session(&self, session_id: &str) -> anyhow::Result<ActivitySummary> {
684        use std::time::SystemTime;
685
686        let metadata = self
687            .get_session(session_id)
688            .ok_or_else(|| anyhow::anyhow!("Session not found: {}", session_id))?;
689
690        let path = &metadata.file_path;
691
692        // Read mtime once — used for both cache check and cache write to avoid TOCTOU.
693        // Use tokio::fs to avoid blocking the executor thread.
694        let mtime = tokio::fs::metadata(path)
695            .await
696            .and_then(|m| m.modified())
697            .unwrap_or(SystemTime::UNIX_EPOCH);
698
699        // Check SQLite cache first (avoids re-parsing unchanged files)
700        if let Some(cache) = &self.metadata_cache {
701            if let Ok(Some(cached)) = cache.get_activity(path, mtime) {
702                self.activity_results
703                    .insert(session_id.to_string(), cached.clone());
704                self.event_bus.publish(DataEvent::AnalyticsUpdated);
705                return Ok(cached);
706            }
707        }
708
709        // Cache miss: parse JSONL
710        let calls = parse_tool_calls(path, session_id).await?;
711
712        let project_root = path
713            .parent()
714            .and_then(|p| p.parent())
715            .map(|p| p.to_string_lossy().into_owned());
716
717        let summary = classify_tool_calls(calls, session_id, project_root.as_deref());
718
719        // Persist to SQLite cache — same mtime as used for cache check (no TOCTOU)
720        if let Some(cache) = &self.metadata_cache {
721            if let Err(e) = cache.put_activity(path, session_id, &summary, mtime) {
722                warn!(session_id, error = %e, "Failed to cache activity — will re-parse on restart");
723            }
724        }
725
726        // Store in memory + notify TUI
727        self.activity_results
728            .insert(session_id.to_string(), summary.clone());
729        self.event_bus.publish(DataEvent::AnalyticsUpdated);
730
731        Ok(summary)
732    }
733
734    /// Get the cached activity summary for a session (returns None if not yet analyzed).
735    pub fn get_session_activity(&self, session_id: &str) -> Option<ActivitySummary> {
736        self.activity_results
737            .get(session_id)
738            .map(|r| r.value().clone())
739    }
740
741    /// Get all stored security alerts from the SQLite cache.
742    ///
743    /// `min_severity`: optional filter — "Warning" or "Critical"
744    pub fn get_all_stored_alerts(&self, min_severity: Option<&str>) -> Vec<StoredAlert> {
745        if let Some(cache) = &self.metadata_cache {
746            cache.get_all_alerts(min_severity).unwrap_or_default()
747        } else {
748            vec![]
749        }
750    }
751
752    /// Consolidated violations feed: merges in-memory DashMap results (freshest) with
753    /// SQLite-persisted alerts. DashMap takes priority for sessions analyzed this run.
754    ///
755    /// Returns alerts sorted Critical → Warning → Info, then by timestamp descending.
756    pub fn all_violations(&self) -> Vec<crate::models::activity::Alert> {
757        use crate::models::activity::{Alert, AlertSeverity};
758        use std::collections::HashSet;
759
760        // Collect session_ids already covered by the DashMap (in-memory, freshest data)
761        let mut seen_sessions: HashSet<String> = HashSet::new();
762        let mut alerts: Vec<Alert> = Vec::new();
763
764        for entry in self.activity_results.iter() {
765            seen_sessions.insert(entry.key().clone());
766            alerts.extend(entry.value().alerts.clone());
767        }
768
769        // Supplement with SQLite alerts for sessions NOT in the DashMap
770        if let Some(cache) = &self.metadata_cache {
771            if let Ok(stored) = cache.get_all_alerts(None) {
772                for sa in stored {
773                    // Derive session_id from session_path (filename without extension)
774                    let session_id = std::path::Path::new(&sa.session_path)
775                        .file_stem()
776                        .and_then(|s| s.to_str())
777                        .unwrap_or(&sa.session_path)
778                        .to_string();
779
780                    if seen_sessions.contains(&session_id) {
781                        continue; // DashMap version is fresher, skip SQLite duplicate
782                    }
783
784                    // Parse severity and category from stored strings
785                    let severity = match sa.severity.as_str() {
786                        "Critical" => AlertSeverity::Critical,
787                        "Warning" => AlertSeverity::Warning,
788                        _ => AlertSeverity::Info,
789                    };
790                    let category = match sa.category.as_str() {
791                        "CredentialAccess" => {
792                            crate::models::activity::AlertCategory::CredentialAccess
793                        }
794                        "DestructiveCommand" => {
795                            crate::models::activity::AlertCategory::DestructiveCommand
796                        }
797                        "ForcePush" => crate::models::activity::AlertCategory::ForcePush,
798                        "ScopeViolation" => crate::models::activity::AlertCategory::ScopeViolation,
799                        _ => crate::models::activity::AlertCategory::ExternalExfil,
800                    };
801                    let timestamp = sa
802                        .timestamp
803                        .parse::<chrono::DateTime<chrono::Utc>>()
804                        .unwrap_or_else(|_| chrono::Utc::now());
805
806                    alerts.push(Alert {
807                        session_id,
808                        timestamp,
809                        severity,
810                        category,
811                        detail: sa.detail,
812                    });
813                }
814            }
815        }
816
817        // Sort: Critical > Warning > Info, then newest first within same severity
818        alerts.sort_by(|a, b| {
819            b.severity
820                .partial_cmp(&a.severity)
821                .unwrap_or(std::cmp::Ordering::Equal)
822                .then_with(|| b.timestamp.cmp(&a.timestamp))
823        });
824
825        alerts
826    }
827
828    /// Get top sessions by total tokens (sorted descending)
829    /// Returns Arc<SessionMetadata> for cheap cloning
830    pub fn top_sessions_by_tokens(&self, limit: usize) -> Vec<Arc<SessionMetadata>> {
831        let mut sessions: Vec<_> = self
832            .sessions
833            .iter()
834            .map(|r| Arc::clone(r.value()))
835            .collect();
836        sessions.sort_by(|a, b| b.total_tokens.cmp(&a.total_tokens));
837        sessions.truncate(limit);
838        sessions
839    }
840
841    /// Get top models by total tokens (aggregated, sorted descending)
842    /// Returns (model_name, total_tokens) pairs
843    pub fn top_models_by_tokens(&self) -> Vec<(String, u64)> {
844        let mut model_totals = std::collections::HashMap::new();
845
846        // Aggregate tokens per model across all sessions
847        for session in self.sessions.iter() {
848            for model in &session.value().models_used {
849                *model_totals.entry(model.clone()).or_insert(0) += session.value().total_tokens;
850            }
851        }
852
853        // Convert to vec and sort
854        let mut results: Vec<_> = model_totals.into_iter().collect();
855        results.sort_by(|a, b| b.1.cmp(&a.1));
856        results.truncate(10); // Top 10
857        results
858    }
859
860    /// Get top days by total tokens (aggregated, sorted descending)
861    /// Returns (date_string, total_tokens) pairs
862    pub fn top_days_by_tokens(&self) -> Vec<(String, u64)> {
863        let mut day_totals = std::collections::HashMap::new();
864
865        // Aggregate tokens per day across all sessions
866        for session in self.sessions.iter() {
867            if let Some(timestamp) = &session.value().first_timestamp {
868                let date = timestamp.format("%Y-%m-%d").to_string();
869                *day_totals.entry(date).or_insert(0) += session.value().total_tokens;
870            }
871        }
872
873        // Convert to vec and sort
874        let mut results: Vec<_> = day_totals.into_iter().collect();
875        results.sort_by(|a, b| b.1.cmp(&a.1));
876        results.truncate(10); // Top 10
877        results
878    }
879
880    /// Get project leaderboard with aggregated metrics
881    ///
882    /// Returns all projects with session count, total tokens, total cost, and average session cost.
883    /// Cost is calculated using accurate model-based pricing from the pricing module.
884    pub fn projects_leaderboard(&self) -> Vec<ProjectLeaderboardEntry> {
885        let mut project_metrics = std::collections::HashMap::new();
886
887        // Aggregate metrics per project
888        for session in self.sessions.iter() {
889            let metadata = session.value();
890            let project_path = &metadata.project_path;
891
892            // Get model for this session (use first model, or "unknown")
893            let model = metadata
894                .models_used
895                .first()
896                .map(|s| s.as_str())
897                .unwrap_or("unknown");
898
899            // Calculate cost using accurate pricing
900            let cost = crate::pricing::calculate_cost(
901                model,
902                metadata.input_tokens,
903                metadata.output_tokens,
904                metadata.cache_creation_tokens,
905                metadata.cache_read_tokens,
906            );
907
908            let entry = project_metrics
909                .entry(project_path.clone())
910                .or_insert((0, 0u64, 0.0f64)); // (session_count, total_tokens, total_cost)
911
912            entry.0 += 1; // session count
913            entry.1 += metadata.total_tokens; // total tokens
914            entry.2 += cost; // total cost
915        }
916
917        // Convert to leaderboard entries
918        let mut results: Vec<_> = project_metrics
919            .into_iter()
920            .map(
921                |(project_path, (session_count, total_tokens, total_cost))| {
922                    let avg_session_cost = if session_count > 0 {
923                        total_cost / session_count as f64
924                    } else {
925                        0.0
926                    };
927
928                    // Extract project name from path (last component)
929                    let project_name = std::path::Path::new(project_path.as_str())
930                        .file_name()
931                        .and_then(|n| n.to_str())
932                        .unwrap_or(project_path.as_str())
933                        .to_string();
934
935                    ProjectLeaderboardEntry {
936                        project_name,
937                        total_sessions: session_count,
938                        total_tokens,
939                        total_cost,
940                        avg_session_cost,
941                    }
942                },
943            )
944            .collect();
945
946        // Default sort: by total cost descending
947        results.sort_by(|a, b| {
948            b.total_cost
949                .partial_cmp(&a.total_cost)
950                .unwrap_or(std::cmp::Ordering::Equal)
951        });
952
953        results
954    }
955
956    // ===================
957    // Update methods (called by watcher)
958    // ===================
959
960    /// Reload stats (called on file change)
961    pub async fn reload_stats(&self) {
962        let stats_path = self.claude_home.join("stats-cache.json");
963        let parser = StatsParser::new()
964            .with_retries(self.config.stats_retry_count, self.config.stats_retry_delay);
965
966        let mut report = LoadReport::new();
967        if let Some(mut stats) = parser.parse_graceful(&stats_path, &mut report).await {
968            // Recalculate costs using accurate pricing
969            stats.recalculate_costs();
970            let mut guard = self.stats.write();
971            *guard = Some(stats);
972
973            // Don't invalidate analytics - it will auto-recompute if needed
974            // Instead, just publish the event so UI can decide whether to recompute
975            self.event_bus.publish(DataEvent::StatsUpdated);
976            debug!("Stats reloaded with recalculated costs");
977        }
978    }
979
980    /// Reload settings from files (called when settings change)
981    pub async fn reload_settings(&self) {
982        let parser = SettingsParser::new();
983        let merged = parser
984            .load_merged(
985                &self.claude_home,
986                self.project_path.as_deref(),
987                &mut LoadReport::new(),
988            )
989            .await;
990
991        {
992            let mut guard = self.settings.write();
993            *guard = merged;
994        }
995
996        self.event_bus
997            .publish(DataEvent::ConfigChanged(ConfigScope::Global));
998        debug!("Settings reloaded");
999    }
1000
1001    /// Add or update a session (called when session file changes)
1002    pub async fn update_session(&self, path: &Path) {
1003        let parser = SessionIndexParser::new();
1004
1005        match parser.scan_session(path).await {
1006            Ok(meta) => {
1007                let id = meta.id.clone();
1008                let is_new = !self.sessions.contains_key(&id);
1009
1010                self.sessions.insert(id.clone(), Arc::new(meta));
1011
1012                // Don't invalidate analytics on every session update - too aggressive
1013                // Analytics will be recomputed on demand or periodically
1014                // Only invalidate on significant changes (detected by UI)
1015
1016                if is_new {
1017                    self.event_bus.publish(DataEvent::SessionCreated(id));
1018                } else {
1019                    self.event_bus.publish(DataEvent::SessionUpdated(id));
1020                }
1021            }
1022            Err(e) => {
1023                warn!(path = %path.display(), error = %e, "Failed to update session");
1024            }
1025        }
1026    }
1027
1028    /// Compute invocation statistics from all sessions
1029    ///
1030    /// This scans all session files to count agent/command/skill invocations.
1031    /// Should be called after initial load or when sessions are updated.
1032    pub async fn compute_invocations(&self) {
1033        let paths: Vec<_> = self
1034            .sessions
1035            .iter()
1036            .map(|r| r.value().file_path.clone())
1037            .collect();
1038
1039        debug!(session_count = paths.len(), "Computing invocation stats");
1040
1041        let parser = InvocationParser::new();
1042        let mut stats = parser.scan_sessions(&paths).await;
1043
1044        // Populate agent_token_stats from session tool_token_usage
1045        // The Task tool tokens serve as a proxy for agent token consumption
1046        for session_ref in self.sessions.iter() {
1047            let session = session_ref.value();
1048            if let Some(&task_tokens) = session.tool_token_usage.get("Task") {
1049                // Distribute Task tool tokens equally among agents spawned in this session
1050                let agent_count =
1051                    session.tool_usage.get("Task").copied().unwrap_or(0).max(1) as u64;
1052                let tokens_per_agent = task_tokens / agent_count;
1053                // Attribute to all agent types found in stats that were invoked
1054                for agent_type in stats.agents.keys().cloned().collect::<Vec<_>>() {
1055                    *stats.agent_token_stats.entry(agent_type).or_insert(0) += tokens_per_agent;
1056                }
1057            }
1058        }
1059
1060        let mut guard = self.invocation_stats.write();
1061        *guard = stats;
1062
1063        debug!(
1064            agents = guard.agents.len(),
1065            commands = guard.commands.len(),
1066            skills = guard.skills.len(),
1067            total = guard.total_invocations(),
1068            "Invocation stats computed"
1069        );
1070
1071        // Note: Using LoadCompleted as there's no specific invocation stats event
1072        self.event_bus.publish(DataEvent::LoadCompleted);
1073    }
1074
1075    /// Compute billing blocks from all sessions
1076    ///
1077    /// This scans all sessions with timestamps and aggregates usage into 5-hour billing blocks.
1078    /// Uses real model pricing based on token breakdown for accurate cost calculation.
1079    pub async fn compute_billing_blocks(&self) {
1080        debug!("Computing billing blocks from sessions with real pricing");
1081
1082        let mut manager = BillingBlockManager::new();
1083        let mut sessions_with_timestamps = 0;
1084        let mut sessions_without_timestamps = 0;
1085
1086        for session in self.sessions.iter() {
1087            let metadata = session.value();
1088
1089            // Skip sessions without timestamps
1090            let Some(timestamp) = &metadata.first_timestamp else {
1091                sessions_without_timestamps += 1;
1092                continue;
1093            };
1094
1095            sessions_with_timestamps += 1;
1096
1097            // Get model for this session (use first model, or "unknown")
1098            let model = metadata
1099                .models_used
1100                .first()
1101                .map(|s| s.as_str())
1102                .unwrap_or("unknown");
1103
1104            // Calculate real cost using pricing table
1105            let cost = crate::pricing::calculate_cost(
1106                model,
1107                metadata.input_tokens,
1108                metadata.output_tokens,
1109                metadata.cache_creation_tokens,
1110                metadata.cache_read_tokens,
1111            );
1112
1113            manager.add_usage(
1114                timestamp,
1115                metadata.input_tokens,
1116                metadata.output_tokens,
1117                metadata.cache_creation_tokens,
1118                metadata.cache_read_tokens,
1119                cost,
1120            );
1121        }
1122
1123        debug!(
1124            sessions_with_timestamps,
1125            sessions_without_timestamps,
1126            blocks = manager.get_all_blocks().len(),
1127            "Billing blocks computed with real pricing"
1128        );
1129
1130        let mut guard = self.billing_blocks.write();
1131        *guard = manager;
1132
1133        self.event_bus.publish(DataEvent::LoadCompleted);
1134    }
1135
1136    /// Get billing blocks (read-only access)
1137    pub fn billing_blocks(&self) -> parking_lot::RwLockReadGuard<'_, BillingBlockManager> {
1138        self.billing_blocks.read()
1139    }
1140
1141    /// Calculate usage estimate based on billing blocks and subscription plan
1142    pub fn usage_estimate(&self) -> crate::usage_estimator::UsageEstimate {
1143        use crate::parsers::claude_global::DetectedPlan;
1144        use crate::usage_estimator::SubscriptionPlan;
1145
1146        let settings = self.settings();
1147
1148        // Priority 1: explicit override in settings.json (ccboard field)
1149        let plan = if let Some(s) = settings.merged.subscription_plan.as_ref() {
1150            SubscriptionPlan::parse(s)
1151        } else {
1152            // Priority 2: auto-detect from ~/.claude.json account fields
1153            let detected = self
1154                .claude_global_stats
1155                .read()
1156                .as_ref()
1157                .and_then(|g| g.detected_plan.clone());
1158
1159            match detected {
1160                Some(DetectedPlan::Pro) => SubscriptionPlan::Pro,
1161                Some(DetectedPlan::Max) => SubscriptionPlan::Max5x, // can't distinguish 5x vs 20x
1162                Some(DetectedPlan::Api) => SubscriptionPlan::Api,
1163                None => SubscriptionPlan::Unknown,
1164            }
1165        };
1166
1167        let billing_blocks = self.billing_blocks.read();
1168        crate::usage_estimator::calculate_usage_estimate(&billing_blocks, plan)
1169    }
1170
1171    /// Load ccboard user preferences from the cache directory.
1172    pub fn load_preferences(&self) -> crate::preferences::CcboardPreferences {
1173        let cache_dir = self.claude_home.join("cache");
1174        crate::preferences::CcboardPreferences::load(&cache_dir)
1175    }
1176
1177    /// Save ccboard user preferences to the cache directory.
1178    pub fn save_preferences(
1179        &self,
1180        prefs: &crate::preferences::CcboardPreferences,
1181    ) -> anyhow::Result<()> {
1182        let cache_dir = self.claude_home.join("cache");
1183        prefs.save(&cache_dir)
1184    }
1185}
1186
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::{DateTime, Utc};
    use tempfile::tempdir;

    /// Build a `SessionMetadata` fixture for project `/test`.
    ///
    /// The file path is `/<id>.jsonl`, tokens are split half input / half
    /// output with no cache tokens, and the session uses the single given
    /// model. Tests that need a different token split override those fields
    /// with struct-update syntax.
    fn session_fixture(
        id: &str,
        total_tokens: u64,
        model: &str,
        first_timestamp: DateTime<Utc>,
        last_timestamp: DateTime<Utc>,
    ) -> SessionMetadata {
        SessionMetadata {
            id: id.into(),
            file_path: std::path::PathBuf::from(format!("/{}.jsonl", id)),
            project_path: "/test".into(),
            first_timestamp: Some(first_timestamp),
            last_timestamp: Some(last_timestamp),
            message_count: 10,
            total_tokens,
            input_tokens: total_tokens / 2,
            output_tokens: total_tokens / 2,
            cache_creation_tokens: 0,
            cache_read_tokens: 0,
            models_used: vec![model.to_string()],
            file_size_bytes: 1024,
            first_user_message: None,
            has_subagents: false,
            duration_seconds: Some(1800),
            branch: None,
            tool_usage: std::collections::HashMap::new(),
            tool_token_usage: std::collections::HashMap::new(),
        }
    }

    /// A freshly created store is empty and healthy.
    #[tokio::test]
    async fn test_data_store_creation() {
        let dir = tempdir().unwrap();
        let store = DataStore::with_defaults(dir.path().to_path_buf(), None);

        assert_eq!(store.session_count(), 0);
        assert!(store.stats().is_none());
        assert!(store.degraded_state().is_healthy());
    }

    /// Loading from a missing directory reports errors instead of panicking.
    #[tokio::test]
    async fn test_initial_load_missing_dir() {
        let dir = tempdir().unwrap();
        let store = DataStore::with_defaults(dir.path().join("nonexistent"), None);

        let report = store.initial_load().await;

        // Should report errors and mark the store degraded, but not crash
        assert!(report.has_errors());
        assert!(store.degraded_state().is_degraded());
    }

    /// A valid stats file is picked up by the initial load.
    #[tokio::test]
    async fn test_initial_load_with_stats() {
        let dir = tempdir().unwrap();
        let claude_home = dir.path();

        // Create stats file with new format
        std::fs::write(
            claude_home.join("stats-cache.json"),
            r#"{"version": 2, "totalSessions": 5, "totalMessages": 100, "modelUsage": {"test": {"inputTokens": 600, "outputTokens": 400}}}"#,
        )
        .unwrap();

        // Create projects dir
        std::fs::create_dir_all(claude_home.join("projects")).unwrap();

        let store = DataStore::with_defaults(claude_home.to_path_buf(), None);
        let report = store.initial_load().await;

        assert!(report.stats_loaded);
        let stats = store.stats().unwrap();
        assert_eq!(stats.total_tokens(), 1000);
        assert_eq!(stats.session_count(), 5);
    }

    /// Events published on the store's bus reach a subscriber.
    #[tokio::test]
    async fn test_event_bus_subscription() {
        let dir = tempdir().unwrap();
        let store = DataStore::with_defaults(dir.path().to_path_buf(), None);

        let mut rx = store.event_bus().subscribe();

        // Publish a stats-updated event
        store.event_bus().publish(DataEvent::StatsUpdated);

        let event = rx.recv().await.unwrap();
        assert!(matches!(event, DataEvent::StatsUpdated));
    }

    /// Analytics are cached after computation and dropped on invalidation.
    #[tokio::test]
    async fn test_analytics_cache_and_invalidation() {
        let dir = tempdir().unwrap();
        let store = DataStore::with_defaults(dir.path().to_path_buf(), None);

        // Add test sessions, one per day going back from today
        let now = Utc::now();
        for i in 0..10 {
            let total_tokens = 1000 * (i as u64 + 1);
            let input_tokens = total_tokens / 2;
            let output_tokens = total_tokens / 3;
            let cache_creation_tokens = total_tokens / 10;
            let session = SessionMetadata {
                input_tokens,
                output_tokens,
                cache_creation_tokens,
                // Remainder, so the four counters add up to total_tokens
                cache_read_tokens: total_tokens
                    - (input_tokens + output_tokens + cache_creation_tokens),
                ..session_fixture(
                    &format!("test-{}", i),
                    total_tokens,
                    "sonnet",
                    now - chrono::Duration::days(i),
                    now,
                )
            };
            store.sessions.insert(session.id.clone(), Arc::new(session));
        }

        // Initially no analytics
        assert!(store.analytics().is_none());

        // Compute analytics
        store.compute_analytics(Period::last_7d()).await;

        // Analytics should be cached
        let analytics1 = store.analytics().expect("Analytics should be cached");
        assert!(!analytics1.trends.is_empty());
        assert_eq!(analytics1.period, Period::last_7d());

        // Invalidate the cache explicitly
        store.invalidate_analytics_cache();
        assert!(store.analytics().is_none(), "Cache should be invalidated");

        // Re-compute with different period
        store.compute_analytics(Period::last_30d()).await;
        let analytics2 = store.analytics().expect("Analytics should be re-cached");
        assert_eq!(analytics2.period, Period::last_30d());
    }

    /// Top-sessions / top-models / top-days rankings aggregate and sort correctly.
    #[tokio::test]
    async fn test_leaderboard_methods() {
        let dir = tempdir().unwrap();
        let store = DataStore::with_defaults(dir.path().to_path_buf(), None);

        let now = Utc::now();

        // (id, total tokens, model, days ago)
        let test_data = vec![
            ("session-1", 5000u64, "opus", 0),
            ("session-2", 3000u64, "sonnet", 1),
            ("session-3", 8000u64, "haiku", 0),
            ("session-4", 2000u64, "sonnet", 2),
            ("session-5", 10000u64, "opus", 0),
        ];

        for (id, tokens, model, days_ago) in test_data {
            let session =
                session_fixture(id, tokens, model, now - chrono::Duration::days(days_ago), now);
            store.sessions.insert(session.id.clone(), Arc::new(session));
        }

        // Test top_sessions_by_tokens
        let top_sessions = store.top_sessions_by_tokens(3);
        assert_eq!(top_sessions.len(), 3);
        assert_eq!(top_sessions[0].id, "session-5"); // 10000 tokens
        assert_eq!(top_sessions[1].id, "session-3"); // 8000 tokens
        assert_eq!(top_sessions[2].id, "session-1"); // 5000 tokens

        // Test top_models_by_tokens
        let top_models = store.top_models_by_tokens();
        assert!(!top_models.is_empty());
        // opus: 15000 (5000+10000), sonnet: 5000 (3000+2000), haiku: 8000
        assert_eq!(top_models[0].0, "opus");
        assert_eq!(top_models[0].1, 15000);
        assert_eq!(top_models[1].0, "haiku");
        assert_eq!(top_models[1].1, 8000);

        // Test top_days_by_tokens
        let top_days = store.top_days_by_tokens();
        assert!(!top_days.is_empty());
        // Day 0 (today): 5000+8000+10000 = 23000
        let today = now.format("%Y-%m-%d").to_string();
        assert_eq!(top_days[0].0, today);
        assert_eq!(top_days[0].1, 23000);
    }

    /// C3: DashMap takes priority over SQLite in all_violations()
    ///
    /// Verifies the merge strategy:
    /// - Same session_id in both → DashMap version returned (fresher)
    /// - Session only in SQLite → SQLite version returned (fills the gap)
    #[test]
    fn test_all_violations_dashmap_priority_over_sqlite() {
        use crate::models::activity::{ActivitySummary, Alert, AlertCategory, AlertSeverity};

        let dir = tempdir().unwrap();
        let store = DataStore::with_defaults(dir.path().to_path_buf(), None);

        let now = Utc::now();

        // ── Setup: SQLite alerts ──────────────────────────────────────────────
        // All writes go through the store's own cache handle; opening a second
        // handle on the same database file risks lock conflicts.
        let store_cache = store
            .metadata_cache
            .as_ref()
            .expect("MetadataCache should be present in store");

        // "shared-session" exists in SQLite with a Warning-level alert
        let sqlite_summary = ActivitySummary {
            alerts: vec![Alert {
                session_id: "shared-session".to_string(),
                timestamp: now,
                severity: AlertSeverity::Warning,
                category: AlertCategory::DestructiveCommand,
                detail: "sqlite-version".to_string(),
            }],
            ..Default::default()
        };
        store_cache
            .put_activity(
                std::path::Path::new("/projects/test/shared-session.jsonl"),
                "shared-session",
                &sqlite_summary,
                std::time::SystemTime::now(),
            )
            .expect("put_activity via store cache should succeed");

        // "sqlite-only-session" exists exclusively in SQLite
        let sqlite_only_summary = ActivitySummary {
            alerts: vec![Alert {
                session_id: "sqlite-only-session".to_string(),
                timestamp: now,
                severity: AlertSeverity::Info,
                category: AlertCategory::ExternalExfil,
                detail: "sqlite-only-detail".to_string(),
            }],
            ..Default::default()
        };
        store_cache
            .put_activity(
                std::path::Path::new("/projects/test/sqlite-only-session.jsonl"),
                "sqlite-only-session",
                &sqlite_only_summary,
                std::time::SystemTime::now(),
            )
            .expect("put_activity via store cache should succeed");

        // ── Setup: DashMap alert for the shared session (fresher, Critical) ───
        let dashmap_summary = ActivitySummary {
            alerts: vec![Alert {
                session_id: "shared-session".to_string(),
                timestamp: now,
                severity: AlertSeverity::Critical,
                category: AlertCategory::ForcePush,
                detail: "dashmap-version".to_string(),
            }],
            ..Default::default()
        };
        store
            .activity_results
            .insert("shared-session".to_string(), dashmap_summary);

        // ── Assert ────────────────────────────────────────────────────────────
        let violations = store.all_violations();

        // The DashMap version must appear
        let dashmap_hit = violations.iter().find(|a| a.session_id == "shared-session");
        assert!(
            dashmap_hit.is_some(),
            "shared-session alert must appear in violations"
        );
        assert_eq!(
            dashmap_hit.unwrap().detail,
            "dashmap-version",
            "DashMap version must take priority over SQLite for shared session"
        );
        assert_eq!(
            dashmap_hit.unwrap().severity,
            AlertSeverity::Critical,
            "DashMap severity (Critical) must win over SQLite (Warning)"
        );

        // The SQLite-only version must appear (fills the gap)
        let sqlite_hit = violations
            .iter()
            .find(|a| a.session_id == "sqlite-only-session");
        assert!(
            sqlite_hit.is_some(),
            "sqlite-only-session must appear in violations (no DashMap entry for it)"
        );
        assert_eq!(sqlite_hit.unwrap().detail, "sqlite-only-detail");

        // The SQLite version of shared-session must NOT appear
        let shared_count = violations
            .iter()
            .filter(|a| a.session_id == "shared-session")
            .count();
        assert_eq!(
            shared_count, 1,
            "shared-session must appear exactly once (DashMap wins, no SQLite duplicate)"
        );

        // Sorting: Critical before Info
        assert_eq!(violations[0].severity, AlertSeverity::Critical);
    }
}