Skip to main content

ccboard_core/
store.rs

1//! Data store with DashMap + parking_lot::RwLock
2//!
3//! Uses DashMap for sessions (per-entry locking) and parking_lot::RwLock
4//! for stats/settings (better fairness than std::sync::RwLock).
5
6use crate::analytics::{AnalyticsData, Period};
7use crate::cache::{MetadataCache, StoredAlert};
8use crate::error::{CoreError, DegradedState, LoadReport};
9use crate::event::{ConfigScope, DataEvent, EventBus};
10use crate::models::activity::ActivitySummary;
11use crate::models::{
12    BillingBlockManager, InvocationStats, MergedConfig, SessionId, SessionMetadata, StatsCache,
13};
14use crate::parsers::{
15    classify_tool_calls, parse_claude_global, parse_tool_calls, ClaudeGlobalStats,
16    InvocationParser, McpConfig, Rules, SessionContentParser, SessionIndexParser, SettingsParser,
17    StatsParser,
18};
19use dashmap::DashMap;
20use moka::future::Cache;
21use parking_lot::RwLock; // parking_lot > std::sync::RwLock: smaller (40B vs 72B), no poisoning, better fairness
22use std::path::{Path, PathBuf};
23use std::sync::Arc;
24use std::time::Duration;
25use tracing::{debug, info, warn};
26
/// Tunable limits and retry policy for the data store.
///
/// All fields are public so callers can build a custom configuration;
/// the `Default` impl supplies the stock values.
#[derive(Debug, Clone)]
pub struct DataStoreConfig {
    /// Upper bound on session metadata entries kept in memory. When a scan
    /// finds more, only the most recent sessions (by last timestamp) are kept.
    pub max_session_metadata_count: usize,

    /// Budget for the session content cache, in MB. Converted to a rough
    /// capacity figure when the cache is built.
    pub max_session_content_cache_mb: usize,

    /// Concurrency limit handed to the session index parser.
    pub max_concurrent_scans: usize,

    /// Retry count handed to the stats parser.
    pub stats_retry_count: u32,

    /// Retry delay handed to the stats parser.
    pub stats_retry_delay: Duration,
}
45
46impl Default for DataStoreConfig {
47    fn default() -> Self {
48        Self {
49            max_session_metadata_count: 10_000,
50            max_session_content_cache_mb: 100,
51            max_concurrent_scans: 8,
52            stats_retry_count: 3,
53            stats_retry_delay: Duration::from_millis(100),
54        }
55    }
56}
57
/// Central data store for ccboard
///
/// Holds all loaded Claude Code data behind interior-mutability wrappers so
/// it can be read and updated through `&self`.
/// Keyed collections (sessions, activity results) use `DashMap`; single-value
/// state (stats, settings, rules, ...) sits behind `parking_lot::RwLock`.
pub struct DataStore {
    /// Path to Claude home directory; stats, projects and cache paths are
    /// joined onto it
    claude_home: PathBuf,

    /// Current project path (if focused); passed to the settings, MCP and
    /// rules loaders
    project_path: Option<PathBuf>,

    /// Configuration (limits and retry policy)
    config: DataStoreConfig,

    /// Stats cache; `None` until a stats file has been parsed successfully
    stats: RwLock<Option<StatsCache>>,

    /// Merged settings
    settings: RwLock<MergedConfig>,

    /// MCP server configuration; `None` when no config was found or loaded
    mcp_config: RwLock<Option<McpConfig>>,

    /// Rules from CLAUDE.md files
    rules: RwLock<Rules>,

    /// Invocation statistics (agents, commands, skills)
    invocation_stats: RwLock<InvocationStats>,

    /// Billing blocks (5h usage tracking)
    billing_blocks: RwLock<BillingBlockManager>,

    /// Analytics data cache; `None` until analytics have been computed
    analytics_cache: RwLock<Option<AnalyticsData>>,

    /// Session metadata keyed by session ID.
    ///
    /// Values are `Arc<SessionMetadata>` so accessors can hand out copies by
    /// bumping a reference count instead of cloning the whole struct.
    sessions: DashMap<SessionId, Arc<SessionMetadata>>,

    /// Session content cache (built with an idle expiry in the constructor)
    // NOTE(review): the field is read by methods in this file, so this
    // `allow` may no longer be needed — confirm before removing.
    #[allow(dead_code)]
    session_content_cache: Cache<SessionId, Vec<String>>,

    /// Event bus for notifying subscribers
    event_bus: EventBus,

    /// Current degraded state, derived from the last load report
    degraded_state: RwLock<DegradedState>,

    /// Optional on-disk metadata cache; `None` when it could not be created
    metadata_cache: Option<Arc<MetadataCache>>,

    /// In-memory activity analysis results (populated by analyze_session),
    /// keyed by session ID string
    activity_results: DashMap<String, ActivitySummary>,

    /// Hook-based live session state (loaded from ~/.ccboard/live-sessions.json)
    live_hook_sessions: RwLock<crate::hook_state::LiveSessionFile>,

    /// Per-project last session stats from ~/.claude.json
    claude_global_stats: RwLock<Option<ClaudeGlobalStats>>,
}
123
/// Project leaderboard entry with aggregated metrics
///
/// Plain data record; how the values are aggregated is decided by the code
/// that builds it.
#[derive(Debug, Clone)]
pub struct ProjectLeaderboardEntry {
    /// Name of the project this row describes
    pub project_name: String,
    /// Number of sessions counted for the project
    pub total_sessions: usize,
    /// Token total for the project
    pub total_tokens: u64,
    /// Cost total for the project (presumably USD — TODO confirm unit)
    pub total_cost: f64,
    /// Average cost per session (presumably total_cost / total_sessions — verify at the producer)
    pub avg_session_cost: f64,
}
133
134impl DataStore {
135    /// Create a new data store
136    pub fn new(
137        claude_home: PathBuf,
138        project_path: Option<PathBuf>,
139        config: DataStoreConfig,
140    ) -> Self {
141        let session_content_cache = Cache::builder()
142            .max_capacity((config.max_session_content_cache_mb * 1024 * 1024 / 1000) as u64) // Rough estimate
143            .time_to_idle(Duration::from_secs(300)) // 5 min idle expiry
144            .build();
145
146        // Create metadata cache in ~/.claude/cache/
147        let metadata_cache = {
148            let cache_dir = claude_home.join("cache");
149            match MetadataCache::new(&cache_dir) {
150                Ok(cache) => {
151                    debug!(path = %cache_dir.display(), "Metadata cache enabled");
152                    Some(Arc::new(cache))
153                }
154                Err(e) => {
155                    warn!(error = %e, "Failed to create metadata cache, running without cache");
156                    None
157                }
158            }
159        };
160
161        Self {
162            claude_home,
163            project_path,
164            config,
165            stats: RwLock::new(None),
166            settings: RwLock::new(MergedConfig::default()),
167            mcp_config: RwLock::new(None),
168            rules: RwLock::new(Rules::default()),
169            invocation_stats: RwLock::new(InvocationStats::new()),
170            billing_blocks: RwLock::new(BillingBlockManager::new()),
171            analytics_cache: RwLock::new(None),
172            sessions: DashMap::new(),
173            session_content_cache,
174            event_bus: EventBus::default_capacity(),
175            degraded_state: RwLock::new(DegradedState::Healthy),
176            metadata_cache,
177            activity_results: DashMap::new(),
178            live_hook_sessions: RwLock::new(crate::hook_state::LiveSessionFile::default()),
179            claude_global_stats: RwLock::new(None),
180        }
181    }
182
183    /// Create with default configuration
184    pub fn with_defaults(claude_home: PathBuf, project_path: Option<PathBuf>) -> Self {
185        Self::new(claude_home, project_path, DataStoreConfig::default())
186    }
187
188    /// Get the event bus for subscribing to updates
189    pub fn event_bus(&self) -> &EventBus {
190        &self.event_bus
191    }
192
193    /// Get current degraded state
194    pub fn degraded_state(&self) -> DegradedState {
195        self.degraded_state.read().clone()
196    }
197
198    /// Initial load of all data with LoadReport for graceful degradation
199    pub async fn initial_load(&self) -> LoadReport {
200        let mut report = LoadReport::new();
201
202        info!(claude_home = %self.claude_home.display(), "Starting initial data load");
203
204        // Load stats
205        self.load_stats(&mut report).await;
206
207        // Load ~/.claude.json global stats (per-project last session costs)
208        if let Some(home) = dirs::home_dir() {
209            if let Some(global) = parse_claude_global(&home) {
210                *self.claude_global_stats.write() = Some(global);
211                debug!("~/.claude.json loaded successfully");
212            }
213        }
214
215        // Load settings
216        self.load_settings(&mut report).await;
217
218        // Load MCP configuration
219        self.load_mcp_config(&mut report).await;
220
221        // Load rules
222        self.load_rules(&mut report).await;
223
224        // Scan sessions
225        self.scan_sessions(&mut report).await;
226
227        // Determine degraded state
228        self.update_degraded_state(&report);
229
230        // Notify subscribers
231        self.event_bus.publish(DataEvent::LoadCompleted);
232
233        info!(
234            stats_loaded = report.stats_loaded,
235            settings_loaded = report.settings_loaded,
236            sessions_scanned = report.sessions_scanned,
237            sessions_failed = report.sessions_failed,
238            errors = report.errors.len(),
239            "Initial load complete"
240        );
241
242        report
243    }
244
245    /// Load stats cache
246    async fn load_stats(&self, report: &mut LoadReport) {
247        let stats_path = self.claude_home.join("stats-cache.json");
248        let parser = StatsParser::new()
249            .with_retries(self.config.stats_retry_count, self.config.stats_retry_delay);
250
251        if let Some(mut stats) = parser.parse_graceful(&stats_path, report).await {
252            // Recalculate costs using accurate pricing
253            stats.recalculate_costs();
254            let mut guard = self.stats.write();
255            *guard = Some(stats);
256            debug!("Stats loaded successfully with recalculated costs");
257        }
258    }
259
260    /// Load and merge settings
261    async fn load_settings(&self, report: &mut LoadReport) {
262        let parser = SettingsParser::new();
263        let merged = parser
264            .load_merged(&self.claude_home, self.project_path.as_deref(), report)
265            .await;
266
267        let mut guard = self.settings.write();
268        *guard = merged;
269        debug!("Settings loaded and merged");
270    }
271
272    /// Load MCP server configuration (global + project-level)
273    async fn load_mcp_config(&self, report: &mut LoadReport) {
274        match McpConfig::load_merged(&self.claude_home, self.project_path.as_deref()) {
275            Ok(Some(config)) => {
276                let server_count = config.servers.len();
277                let mut guard = self.mcp_config.write();
278                *guard = Some(config);
279                debug!(
280                    server_count,
281                    "MCP config loaded successfully (global + project)"
282                );
283            }
284            Ok(None) => {
285                debug!("No MCP config found (optional)");
286            }
287            Err(e) => {
288                use crate::error::LoadError;
289                report.add_error(LoadError::error(
290                    "mcp_config",
291                    format!("Failed to parse MCP config: {}", e),
292                ));
293            }
294        }
295    }
296
297    /// Load rules from CLAUDE.md files
298    async fn load_rules(&self, report: &mut LoadReport) {
299        match Rules::load(&self.claude_home, self.project_path.as_deref()) {
300            Ok(rules) => {
301                let has_global = rules.global.is_some();
302                let has_project = rules.project.is_some();
303                let mut guard = self.rules.write();
304                *guard = rules;
305                debug!(has_global, has_project, "Rules loaded");
306            }
307            Err(e) => {
308                use crate::error::LoadError;
309                report.add_error(LoadError::error(
310                    "rules",
311                    format!("Failed to load rules: {}", e),
312                ));
313            }
314        }
315    }
316
317    /// Scan all sessions
318    async fn scan_sessions(&self, report: &mut LoadReport) {
319        let projects_dir = self.claude_home.join("projects");
320
321        if !projects_dir.exists() {
322            report.add_warning(
323                "sessions",
324                format!("Projects directory not found: {}", projects_dir.display()),
325            );
326            return;
327        }
328
329        let mut parser =
330            SessionIndexParser::new().with_concurrency(self.config.max_concurrent_scans);
331
332        // Enable metadata cache if available (90% speedup)
333        if let Some(ref cache) = self.metadata_cache {
334            parser = parser.with_cache(cache.clone());
335        }
336
337        let sessions = parser.scan_all(&projects_dir, report).await;
338
339        // Enforce max count limit
340        let sessions_to_add: Vec<_> = if sessions.len() > self.config.max_session_metadata_count {
341            warn!(
342                total = sessions.len(),
343                limit = self.config.max_session_metadata_count,
344                "Session count exceeds limit, keeping most recent"
345            );
346
347            let mut sorted = sessions;
348            sorted.sort_by(|a, b| b.last_timestamp.cmp(&a.last_timestamp));
349            sorted.truncate(self.config.max_session_metadata_count);
350            sorted
351        } else {
352            sessions
353        };
354
355        // Insert into DashMap (wrap in Arc for cheap cloning)
356        for session in sessions_to_add {
357            self.sessions.insert(session.id.clone(), Arc::new(session));
358        }
359
360        debug!(count = self.sessions.len(), "Sessions indexed");
361    }
362
363    /// Update degraded state based on load report
364    fn update_degraded_state(&self, report: &LoadReport) {
365        let mut state = self.degraded_state.write();
366
367        if report.has_fatal_errors() {
368            *state = DegradedState::ReadOnly {
369                reason: "Fatal errors during load".to_string(),
370            };
371            return;
372        }
373
374        let mut missing = Vec::new();
375
376        if !report.stats_loaded {
377            missing.push("stats".to_string());
378        }
379        if !report.settings_loaded {
380            missing.push("settings".to_string());
381        }
382        if report.sessions_failed > 0 {
383            missing.push(format!("{} sessions", report.sessions_failed));
384        }
385
386        if missing.is_empty() {
387            *state = DegradedState::Healthy;
388        } else {
389            *state = DegradedState::PartialData {
390                missing: missing.clone(),
391                reason: format!("Missing: {}", missing.join(", ")),
392            };
393        }
394    }
395
396    // ===================
397    // Read accessors
398    // ===================
399
400    /// Get a clone of stats
401    pub fn stats(&self) -> Option<StatsCache> {
402        self.stats.read().clone()
403    }
404
405    /// Calculate context window saturation from current sessions
406    pub fn context_window_stats(&self) -> crate::models::ContextWindowStats {
407        // Clone Arc (cheap) to avoid lifetime issues with DashMap iterators
408        let sessions: Vec<_> = self
409            .sessions
410            .iter()
411            .map(|entry| Arc::clone(entry.value()))
412            .collect();
413        // Dereference Arc to get &SessionMetadata
414        let refs: Vec<_> = sessions.iter().map(|s| s.as_ref()).collect();
415        crate::models::StatsCache::calculate_context_saturation(&refs, 30)
416    }
417
418    /// Get merged settings
419    pub fn settings(&self) -> MergedConfig {
420        self.settings.read().clone()
421    }
422
423    /// Get MCP server configuration
424    pub fn mcp_config(&self) -> Option<McpConfig> {
425        self.mcp_config.read().clone()
426    }
427
428    /// Get rules
429    pub fn rules(&self) -> Rules {
430        self.rules.read().clone()
431    }
432
433    /// Get invocation statistics
434    pub fn invocation_stats(&self) -> InvocationStats {
435        self.invocation_stats.read().clone()
436    }
437
438    /// Calculate current quota status from stats and budget config
439    ///
440    /// Returns None if stats are not loaded or budget is not configured.
441    pub fn quota_status(&self) -> Option<crate::quota::QuotaStatus> {
442        let stats = self.stats.read().clone()?;
443        let settings = self.settings.read();
444        let budget = settings.merged.budget.as_ref()?;
445
446        Some(crate::quota::calculate_quota_status(&stats, budget))
447    }
448
449    /// Get live Claude Code sessions (running processes, ps-based)
450    ///
451    /// Detects active Claude processes on the system and returns metadata.
452    /// Returns empty vector if detection fails or no processes are running.
453    pub fn live_sessions(&self) -> Vec<crate::live_monitor::LiveSession> {
454        crate::live_monitor::detect_live_sessions().unwrap_or_default()
455    }
456
457    /// Get merged live sessions: hook data + ps-based fallback
458    ///
459    /// Hook sessions are prioritized; unmatched ps sessions appear as ProcessOnly.
460    pub fn merged_live_sessions(&self) -> Vec<crate::live_monitor::MergedLiveSession> {
461        let hook_file = self.live_hook_sessions.read().clone();
462        let ps_sessions = crate::live_monitor::detect_live_sessions().unwrap_or_default();
463        crate::live_monitor::merge_live_sessions(&hook_file, &ps_sessions)
464    }
465
466    /// Reload hook-based live session state from a file path
467    pub async fn reload_live_hook_sessions(&self, path: &std::path::Path) {
468        match crate::hook_state::LiveSessionFile::load(path) {
469            Ok(file) => {
470                *self.live_hook_sessions.write() = file;
471                debug!("Reloaded live hook sessions from {}", path.display());
472            }
473            Err(e) => {
474                warn!(error = %e, "Failed to reload live-sessions.json");
475            }
476        }
477    }
478
479    /// Get per-project last session stats from ~/.claude.json
480    pub fn claude_global_stats(&self) -> Option<ClaudeGlobalStats> {
481        self.claude_global_stats.read().clone()
482    }
483
484    /// Get session count
485    pub fn session_count(&self) -> usize {
486        self.sessions.len()
487    }
488
489    /// Get session by ID
490    /// Returns Arc<SessionMetadata> for cheap cloning
491    pub fn get_session(&self, id: &str) -> Option<Arc<SessionMetadata>> {
492        self.sessions.get(id).map(|r| Arc::clone(r.value()))
493    }
494
495    /// Load full session content with lazy caching
496    ///
497    /// Returns conversation messages parsed from session JSONL file.
498    /// Uses Moka cache (LRU with 5min TTL) for repeated access.
499    ///
500    /// # Performance
501    /// - First call: Parse JSONL (~50-500ms for 1000-message session)
502    /// - Cached calls: <1ms (memory lookup)
503    /// - Cache eviction: LRU + 5min idle timeout
504    ///
505    /// # Errors
506    /// Returns CoreError if session not found or file cannot be read.
507    pub async fn load_session_content(
508        &self,
509        session_id: &str,
510    ) -> Result<Vec<crate::models::ConversationMessage>, CoreError> {
511        // Get session metadata
512        let metadata = self
513            .get_session(session_id)
514            .ok_or_else(|| CoreError::SessionNotFound {
515                session_id: session_id.to_string(),
516            })?;
517
518        // Try cache first (Moka handles concurrency internally)
519        let session_id_owned = SessionId::from(session_id);
520        if let Some(_cached) = self.session_content_cache.get(&session_id_owned).await {
521            debug!(session_id, "Session content cache HIT");
522            // TODO: Cache design decision - caching Vec<String> vs Vec<ConversationMessage>
523            // For now, always parse from file (will be optimized in cache phase)
524        }
525
526        // Cache miss: parse from file
527        debug!(
528            session_id,
529            path = %metadata.file_path.display(),
530            "Session content cache MISS, parsing JSONL"
531        );
532
533        let messages = SessionContentParser::parse_conversation(
534            &metadata.file_path,
535            (*metadata).clone(), // Clone metadata out of Arc
536        )
537        .await?;
538
539        // Note: Cache insertion skipped for now (caching Vec<String> vs Vec<ConversationMessage> design decision)
540        // Will be added in cache optimization phase
541
542        Ok(messages)
543    }
544
545    /// Get analytics data for a period (cached)
546    ///
547    /// Returns cached analytics if available, otherwise None.
548    /// Call `compute_analytics()` to compute and cache.
549    pub fn analytics(&self) -> Option<AnalyticsData> {
550        let analytics = self.analytics_cache.read().clone();
551        debug!(
552            has_analytics = analytics.is_some(),
553            "analytics() getter called"
554        );
555        analytics
556    }
557
558    /// Compute and cache analytics data for a period
559    ///
560    /// This is a CPU-intensive operation (trends, forecasting, patterns).
561    /// For 1000+ sessions, this may take 100-300ms, so it's offloaded
562    /// to a blocking task.
563    ///
564    /// Cache is invalidated on stats reload or session updates (EventBus pattern).
565    pub async fn compute_analytics(&self, period: Period) {
566        let sessions: Vec<_> = self
567            .sessions
568            .iter()
569            .map(|r| Arc::clone(r.value()))
570            .collect();
571
572        info!(
573            session_count = sessions.len(),
574            period = ?period,
575            "compute_analytics() ENTRY"
576        );
577
578        // Offload to blocking task for CPU-intensive computation
579        let analytics =
580            tokio::task::spawn_blocking(move || AnalyticsData::compute(&sessions, period)).await;
581
582        match analytics {
583            Ok(data) => {
584                info!(
585                    insights_count = data.insights.len(),
586                    "compute_analytics() computed data"
587                );
588                let mut guard = self.analytics_cache.write();
589                *guard = Some(data);
590                self.event_bus.publish(DataEvent::AnalyticsUpdated);
591                info!("compute_analytics() EXIT - cached and event published");
592            }
593            Err(e) => {
594                warn!(error = %e, "Failed to compute analytics (task panicked)");
595            }
596        }
597    }
598
599    /// Invalidate analytics cache (called on data changes)
600    ///
601    /// Note: Currently unused to prevent aggressive invalidation.
602    /// Kept for future use if smart invalidation is needed.
603    #[allow(dead_code)]
604    fn invalidate_analytics_cache(&self) {
605        let mut guard = self.analytics_cache.write();
606        *guard = None;
607        debug!("Analytics cache invalidated");
608    }
609
610    /// Get all session IDs
611    pub fn session_ids(&self) -> Vec<SessionId> {
612        self.sessions.iter().map(|r| r.key().clone()).collect()
613    }
614
615    /// Clear session content cache (for memory optimization on F5)
616    pub fn clear_session_content_cache(&self) {
617        self.session_content_cache.invalidate_all();
618        debug!("Session content cache cleared");
619    }
620
621    /// Get sessions grouped by project
622    /// Returns Arc<SessionMetadata> for cheap cloning
623    pub fn sessions_by_project(
624        &self,
625    ) -> std::collections::HashMap<String, Vec<Arc<SessionMetadata>>> {
626        let mut by_project = std::collections::HashMap::new();
627
628        for entry in self.sessions.iter() {
629            let session = Arc::clone(entry.value());
630            by_project
631                .entry(session.project_path.as_str().to_string())
632                .or_insert_with(Vec::new)
633                .push(session);
634        }
635
636        // Sort sessions within each project by timestamp (newest first)
637        for sessions in by_project.values_mut() {
638            sessions.sort_by(|a, b| b.last_timestamp.cmp(&a.last_timestamp));
639        }
640
641        by_project
642    }
643
644    /// Get all sessions (unsorted)
645    /// Returns Arc<SessionMetadata> for cheap cloning
646    pub fn all_sessions(&self) -> Vec<Arc<SessionMetadata>> {
647        self.sessions
648            .iter()
649            .map(|r| Arc::clone(r.value()))
650            .collect()
651    }
652
653    /// Get recent sessions (sorted by last timestamp, newest first)
654    /// Returns Arc<SessionMetadata> for cheap cloning
655    pub fn recent_sessions(&self, limit: usize) -> Vec<Arc<SessionMetadata>> {
656        let mut sessions = self.all_sessions();
657        sessions.sort_by(|a, b| b.last_timestamp.cmp(&a.last_timestamp));
658        sessions.truncate(limit);
659        sessions
660    }
661
662    /// Search sessions using FTS5 full-text search.
663    ///
664    /// Returns relevance-ranked results. Returns empty vec if FTS5 not initialized.
665    pub fn search_sessions(&self, query: &str, limit: usize) -> Vec<crate::cache::SearchResult> {
666        if let Some(ref cache) = self.metadata_cache {
667            match cache.search_sessions(query, limit) {
668                Ok(results) => results,
669                Err(e) => {
670                    warn!("FTS5 search failed: {}", e);
671                    Vec::new()
672                }
673            }
674        } else {
675            Vec::new()
676        }
677    }
678
679    /// Analyze a session's tool calls and generate activity summary + alerts.
680    ///
681    /// Results are stored in the in-memory DashMap and the SQLite cache.
682    /// Publishes DataEvent::AnalyticsUpdated on completion so the TUI re-renders.
683    pub async fn analyze_session(&self, session_id: &str) -> anyhow::Result<ActivitySummary> {
684        use std::time::SystemTime;
685
686        let metadata = self
687            .get_session(session_id)
688            .ok_or_else(|| anyhow::anyhow!("Session not found: {}", session_id))?;
689
690        let path = &metadata.file_path;
691
692        // Read mtime once — used for both cache check and cache write to avoid TOCTOU.
693        // Use tokio::fs to avoid blocking the executor thread.
694        let mtime = tokio::fs::metadata(path)
695            .await
696            .and_then(|m| m.modified())
697            .unwrap_or(SystemTime::UNIX_EPOCH);
698
699        // Check SQLite cache first (avoids re-parsing unchanged files)
700        if let Some(cache) = &self.metadata_cache {
701            if let Ok(Some(cached)) = cache.get_activity(path, mtime) {
702                self.activity_results
703                    .insert(session_id.to_string(), cached.clone());
704                self.event_bus.publish(DataEvent::AnalyticsUpdated);
705                return Ok(cached);
706            }
707        }
708
709        // Cache miss: parse JSONL
710        let calls = parse_tool_calls(path, session_id).await?;
711
712        let project_root = path
713            .parent()
714            .and_then(|p| p.parent())
715            .map(|p| p.to_string_lossy().into_owned());
716
717        let summary = classify_tool_calls(calls, session_id, project_root.as_deref());
718
719        // Persist to SQLite cache — same mtime as used for cache check (no TOCTOU)
720        if let Some(cache) = &self.metadata_cache {
721            if let Err(e) = cache.put_activity(path, session_id, &summary, mtime) {
722                warn!(session_id, error = %e, "Failed to cache activity — will re-parse on restart");
723            }
724        }
725
726        // Store in memory + notify TUI
727        self.activity_results
728            .insert(session_id.to_string(), summary.clone());
729        self.event_bus.publish(DataEvent::AnalyticsUpdated);
730
731        Ok(summary)
732    }
733
734    /// Get the cached activity summary for a session (returns None if not yet analyzed).
735    pub fn get_session_activity(&self, session_id: &str) -> Option<ActivitySummary> {
736        self.activity_results
737            .get(session_id)
738            .map(|r| r.value().clone())
739    }
740
741    /// Get all stored security alerts from the SQLite cache.
742    ///
743    /// `min_severity`: optional filter — "Warning" or "Critical"
744    pub fn get_all_stored_alerts(&self, min_severity: Option<&str>) -> Vec<StoredAlert> {
745        if let Some(cache) = &self.metadata_cache {
746            cache.get_all_alerts(min_severity).unwrap_or_default()
747        } else {
748            vec![]
749        }
750    }
751
752    /// Consolidated violations feed: merges in-memory DashMap results (freshest) with
753    /// SQLite-persisted alerts. DashMap takes priority for sessions analyzed this run.
754    ///
755    /// Returns alerts sorted Critical → Warning → Info, then by timestamp descending.
756    pub fn all_violations(&self) -> Vec<crate::models::activity::Alert> {
757        use crate::models::activity::{Alert, AlertSeverity};
758        use std::collections::HashSet;
759
760        // Collect session_ids already covered by the DashMap (in-memory, freshest data)
761        let mut seen_sessions: HashSet<String> = HashSet::new();
762        let mut alerts: Vec<Alert> = Vec::new();
763
764        for entry in self.activity_results.iter() {
765            seen_sessions.insert(entry.key().clone());
766            alerts.extend(entry.value().alerts.clone());
767        }
768
769        // Supplement with SQLite alerts for sessions NOT in the DashMap
770        if let Some(cache) = &self.metadata_cache {
771            if let Ok(stored) = cache.get_all_alerts(None) {
772                for sa in stored {
773                    // Derive session_id from session_path (filename without extension)
774                    let session_id = std::path::Path::new(&sa.session_path)
775                        .file_stem()
776                        .and_then(|s| s.to_str())
777                        .unwrap_or(&sa.session_path)
778                        .to_string();
779
780                    if seen_sessions.contains(&session_id) {
781                        continue; // DashMap version is fresher, skip SQLite duplicate
782                    }
783
784                    // Parse severity and category from stored strings
785                    let severity = match sa.severity.as_str() {
786                        "Critical" => AlertSeverity::Critical,
787                        "Warning" => AlertSeverity::Warning,
788                        _ => AlertSeverity::Info,
789                    };
790                    let category = match sa.category.as_str() {
791                        "CredentialAccess" => {
792                            crate::models::activity::AlertCategory::CredentialAccess
793                        }
794                        "DestructiveCommand" => {
795                            crate::models::activity::AlertCategory::DestructiveCommand
796                        }
797                        "ForcePush" => crate::models::activity::AlertCategory::ForcePush,
798                        "ScopeViolation" => crate::models::activity::AlertCategory::ScopeViolation,
799                        _ => crate::models::activity::AlertCategory::ExternalExfil,
800                    };
801                    let timestamp = sa
802                        .timestamp
803                        .parse::<chrono::DateTime<chrono::Utc>>()
804                        .unwrap_or_else(|_| chrono::Utc::now());
805
806                    alerts.push(Alert {
807                        session_id,
808                        timestamp,
809                        severity,
810                        category,
811                        detail: sa.detail,
812                    });
813                }
814            }
815        }
816
817        // Sort: Critical > Warning > Info, then newest first within same severity
818        alerts.sort_by(|a, b| {
819            b.severity
820                .partial_cmp(&a.severity)
821                .unwrap_or(std::cmp::Ordering::Equal)
822                .then_with(|| b.timestamp.cmp(&a.timestamp))
823        });
824
825        alerts
826    }
827
828    /// Get top sessions by total tokens (sorted descending)
829    /// Returns Arc<SessionMetadata> for cheap cloning
830    pub fn top_sessions_by_tokens(&self, limit: usize) -> Vec<Arc<SessionMetadata>> {
831        let mut sessions: Vec<_> = self
832            .sessions
833            .iter()
834            .map(|r| Arc::clone(r.value()))
835            .collect();
836        sessions.sort_by(|a, b| b.total_tokens.cmp(&a.total_tokens));
837        sessions.truncate(limit);
838        sessions
839    }
840
841    /// Get top models by total tokens (aggregated, sorted descending)
842    /// Returns (model_name, total_tokens) pairs
843    pub fn top_models_by_tokens(&self) -> Vec<(String, u64)> {
844        let mut model_totals = std::collections::HashMap::new();
845
846        // Aggregate tokens per model across all sessions
847        for session in self.sessions.iter() {
848            for model in &session.value().models_used {
849                *model_totals.entry(model.clone()).or_insert(0) += session.value().total_tokens;
850            }
851        }
852
853        // Convert to vec and sort
854        let mut results: Vec<_> = model_totals.into_iter().collect();
855        results.sort_by(|a, b| b.1.cmp(&a.1));
856        results.truncate(10); // Top 10
857        results
858    }
859
860    /// Get top days by total tokens (aggregated, sorted descending)
861    /// Returns (date_string, total_tokens) pairs
862    pub fn top_days_by_tokens(&self) -> Vec<(String, u64)> {
863        let mut day_totals = std::collections::HashMap::new();
864
865        // Aggregate tokens per day across all sessions
866        for session in self.sessions.iter() {
867            if let Some(timestamp) = &session.value().first_timestamp {
868                let date = timestamp.format("%Y-%m-%d").to_string();
869                *day_totals.entry(date).or_insert(0) += session.value().total_tokens;
870            }
871        }
872
873        // Convert to vec and sort
874        let mut results: Vec<_> = day_totals.into_iter().collect();
875        results.sort_by(|a, b| b.1.cmp(&a.1));
876        results.truncate(10); // Top 10
877        results
878    }
879
880    /// Get project leaderboard with aggregated metrics
881    ///
882    /// Returns all projects with session count, total tokens, total cost, and average session cost.
883    /// Cost is calculated using accurate model-based pricing from the pricing module.
884    pub fn projects_leaderboard(&self) -> Vec<ProjectLeaderboardEntry> {
885        let mut project_metrics = std::collections::HashMap::new();
886
887        // Aggregate metrics per project
888        for session in self.sessions.iter() {
889            let metadata = session.value();
890            let project_path = &metadata.project_path;
891
892            // Get model for this session (use first model, or "unknown")
893            let model = metadata
894                .models_used
895                .first()
896                .map(|s| s.as_str())
897                .unwrap_or("unknown");
898
899            // Calculate cost using accurate pricing
900            let cost = crate::pricing::calculate_cost(
901                model,
902                metadata.input_tokens,
903                metadata.output_tokens,
904                metadata.cache_creation_tokens,
905                metadata.cache_read_tokens,
906            );
907
908            let entry = project_metrics
909                .entry(project_path.clone())
910                .or_insert((0, 0u64, 0.0f64)); // (session_count, total_tokens, total_cost)
911
912            entry.0 += 1; // session count
913            entry.1 += metadata.total_tokens; // total tokens
914            entry.2 += cost; // total cost
915        }
916
917        // Convert to leaderboard entries
918        let mut results: Vec<_> = project_metrics
919            .into_iter()
920            .map(
921                |(project_path, (session_count, total_tokens, total_cost))| {
922                    let avg_session_cost = if session_count > 0 {
923                        total_cost / session_count as f64
924                    } else {
925                        0.0
926                    };
927
928                    // Extract project name from path (last component)
929                    let project_name = std::path::Path::new(project_path.as_str())
930                        .file_name()
931                        .and_then(|n| n.to_str())
932                        .unwrap_or(project_path.as_str())
933                        .to_string();
934
935                    ProjectLeaderboardEntry {
936                        project_name,
937                        total_sessions: session_count,
938                        total_tokens,
939                        total_cost,
940                        avg_session_cost,
941                    }
942                },
943            )
944            .collect();
945
946        // Default sort: by total cost descending
947        results.sort_by(|a, b| {
948            b.total_cost
949                .partial_cmp(&a.total_cost)
950                .unwrap_or(std::cmp::Ordering::Equal)
951        });
952
953        results
954    }
955
956    // ===================
957    // Update methods (called by watcher)
958    // ===================
959
960    /// Reload stats (called on file change)
961    pub async fn reload_stats(&self) {
962        let stats_path = self.claude_home.join("stats-cache.json");
963        let parser = StatsParser::new()
964            .with_retries(self.config.stats_retry_count, self.config.stats_retry_delay);
965
966        let mut report = LoadReport::new();
967        if let Some(mut stats) = parser.parse_graceful(&stats_path, &mut report).await {
968            // Recalculate costs using accurate pricing
969            stats.recalculate_costs();
970            let mut guard = self.stats.write();
971            *guard = Some(stats);
972
973            // Don't invalidate analytics - it will auto-recompute if needed
974            // Instead, just publish the event so UI can decide whether to recompute
975            self.event_bus.publish(DataEvent::StatsUpdated);
976            debug!("Stats reloaded with recalculated costs");
977        }
978    }
979
980    /// Reload settings from files (called when settings change)
981    pub async fn reload_settings(&self) {
982        let parser = SettingsParser::new();
983        let merged = parser
984            .load_merged(
985                &self.claude_home,
986                self.project_path.as_deref(),
987                &mut LoadReport::new(),
988            )
989            .await;
990
991        {
992            let mut guard = self.settings.write();
993            *guard = merged;
994        }
995
996        self.event_bus
997            .publish(DataEvent::ConfigChanged(ConfigScope::Global));
998        debug!("Settings reloaded");
999    }
1000
1001    /// Add or update a session (called when session file changes)
1002    pub async fn update_session(&self, path: &Path) {
1003        let parser = SessionIndexParser::new();
1004
1005        match parser.scan_session(path).await {
1006            Ok(meta) => {
1007                let id = meta.id.clone();
1008                let is_new = !self.sessions.contains_key(&id);
1009
1010                self.sessions.insert(id.clone(), Arc::new(meta));
1011
1012                // Don't invalidate analytics on every session update - too aggressive
1013                // Analytics will be recomputed on demand or periodically
1014                // Only invalidate on significant changes (detected by UI)
1015
1016                if is_new {
1017                    self.event_bus.publish(DataEvent::SessionCreated(id));
1018                } else {
1019                    self.event_bus.publish(DataEvent::SessionUpdated(id));
1020                }
1021            }
1022            Err(e) => {
1023                warn!(path = %path.display(), error = %e, "Failed to update session");
1024            }
1025        }
1026    }
1027
1028    /// Compute invocation statistics from all sessions
1029    ///
1030    /// This scans all session files to count agent/command/skill invocations.
1031    /// Should be called after initial load or when sessions are updated.
1032    pub async fn compute_invocations(&self) {
1033        let paths: Vec<_> = self
1034            .sessions
1035            .iter()
1036            .map(|r| r.value().file_path.clone())
1037            .collect();
1038
1039        debug!(session_count = paths.len(), "Computing invocation stats");
1040
1041        let parser = InvocationParser::new();
1042        let mut stats = parser.scan_sessions(&paths).await;
1043
1044        // Populate agent_token_stats from session tool_token_usage
1045        // The Task tool tokens serve as a proxy for agent token consumption
1046        for session_ref in self.sessions.iter() {
1047            let session = session_ref.value();
1048            if let Some(&task_tokens) = session.tool_token_usage.get("Task") {
1049                // Distribute Task tool tokens equally among agents spawned in this session
1050                let agent_count =
1051                    session.tool_usage.get("Task").copied().unwrap_or(0).max(1) as u64;
1052                let tokens_per_agent = task_tokens / agent_count;
1053                // Attribute to all agent types found in stats that were invoked
1054                for agent_type in stats.agents.keys().cloned().collect::<Vec<_>>() {
1055                    *stats.agent_token_stats.entry(agent_type).or_insert(0) += tokens_per_agent;
1056                }
1057            }
1058        }
1059
1060        let mut guard = self.invocation_stats.write();
1061        *guard = stats;
1062
1063        debug!(
1064            agents = guard.agents.len(),
1065            commands = guard.commands.len(),
1066            skills = guard.skills.len(),
1067            total = guard.total_invocations(),
1068            "Invocation stats computed"
1069        );
1070
1071        // Note: Using LoadCompleted as there's no specific invocation stats event
1072        self.event_bus.publish(DataEvent::LoadCompleted);
1073    }
1074
1075    /// Compute billing blocks from all sessions
1076    ///
1077    /// This scans all sessions with timestamps and aggregates usage into 5-hour billing blocks.
1078    /// Uses real model pricing based on token breakdown for accurate cost calculation.
1079    pub async fn compute_billing_blocks(&self) {
1080        debug!("Computing billing blocks from sessions with real pricing");
1081
1082        let mut manager = BillingBlockManager::new();
1083        let mut sessions_with_timestamps = 0;
1084        let mut sessions_without_timestamps = 0;
1085
1086        for session in self.sessions.iter() {
1087            let metadata = session.value();
1088
1089            // Skip sessions without timestamps
1090            let Some(timestamp) = &metadata.first_timestamp else {
1091                sessions_without_timestamps += 1;
1092                continue;
1093            };
1094
1095            sessions_with_timestamps += 1;
1096
1097            // Get model for this session (use first model, or "unknown")
1098            let model = metadata
1099                .models_used
1100                .first()
1101                .map(|s| s.as_str())
1102                .unwrap_or("unknown");
1103
1104            // Calculate real cost using pricing table
1105            let cost = crate::pricing::calculate_cost(
1106                model,
1107                metadata.input_tokens,
1108                metadata.output_tokens,
1109                metadata.cache_creation_tokens,
1110                metadata.cache_read_tokens,
1111            );
1112
1113            manager.add_usage(
1114                timestamp,
1115                metadata.input_tokens,
1116                metadata.output_tokens,
1117                metadata.cache_creation_tokens,
1118                metadata.cache_read_tokens,
1119                cost,
1120            );
1121        }
1122
1123        debug!(
1124            sessions_with_timestamps,
1125            sessions_without_timestamps,
1126            blocks = manager.get_all_blocks().len(),
1127            "Billing blocks computed with real pricing"
1128        );
1129
1130        let mut guard = self.billing_blocks.write();
1131        *guard = manager;
1132
1133        self.event_bus.publish(DataEvent::LoadCompleted);
1134    }
1135
1136    /// Get billing blocks (read-only access)
1137    pub fn billing_blocks(&self) -> parking_lot::RwLockReadGuard<'_, BillingBlockManager> {
1138        self.billing_blocks.read()
1139    }
1140
1141    /// Calculate usage estimate based on billing blocks and subscription plan
1142    pub fn usage_estimate(&self) -> crate::usage_estimator::UsageEstimate {
1143        let settings = self.settings();
1144        let plan = settings
1145            .merged
1146            .subscription_plan
1147            .as_ref()
1148            .map(|s| crate::usage_estimator::SubscriptionPlan::parse(s))
1149            .unwrap_or_default();
1150
1151        let billing_blocks = self.billing_blocks.read();
1152        crate::usage_estimator::calculate_usage_estimate(&billing_blocks, plan)
1153    }
1154
1155    /// Load ccboard user preferences from the cache directory.
1156    pub fn load_preferences(&self) -> crate::preferences::CcboardPreferences {
1157        let cache_dir = self.claude_home.join("cache");
1158        crate::preferences::CcboardPreferences::load(&cache_dir)
1159    }
1160
1161    /// Save ccboard user preferences to the cache directory.
1162    pub fn save_preferences(
1163        &self,
1164        prefs: &crate::preferences::CcboardPreferences,
1165    ) -> anyhow::Result<()> {
1166        let cache_dir = self.claude_home.join("cache");
1167        prefs.save(&cache_dir)
1168    }
1169}
1170
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Token counters for a fixture session, in the order
    /// `(input, output, cache_creation, cache_read)`.
    type TokenSplit = (u64, u64, u64, u64);

    /// Build a `SessionMetadata` fixture in project `/test` whose file path is
    /// `/<id>.jsonl`. Everything not passed in uses the same fixed values in
    /// every test (10 messages, 1024 bytes, 1800 s, no tools, no branch).
    fn session_fixture(
        id: &str,
        model: &str,
        first_seen: chrono::DateTime<chrono::Utc>,
        last_seen: chrono::DateTime<chrono::Utc>,
        total_tokens: u64,
        split: TokenSplit,
    ) -> crate::models::session::SessionMetadata {
        let (input_tokens, output_tokens, cache_creation_tokens, cache_read_tokens) = split;

        crate::models::session::SessionMetadata {
            id: id.to_string().into(),
            file_path: std::path::PathBuf::from(format!("/{}.jsonl", id)),
            project_path: "/test".into(),
            first_timestamp: Some(first_seen),
            last_timestamp: Some(last_seen),
            message_count: 10,
            total_tokens,
            input_tokens,
            output_tokens,
            cache_creation_tokens,
            cache_read_tokens,
            models_used: vec![model.to_string()],
            file_size_bytes: 1024,
            first_user_message: None,
            has_subagents: false,
            duration_seconds: Some(1800),
            branch: None,
            tool_usage: std::collections::HashMap::new(),
            tool_token_usage: std::collections::HashMap::new(),
        }
    }

    /// A store created on an empty directory starts empty and healthy.
    #[tokio::test]
    async fn test_data_store_creation() {
        let tmp = tempdir().unwrap();
        let store = DataStore::with_defaults(tmp.path().to_path_buf(), None);

        assert_eq!(store.session_count(), 0);
        assert!(store.stats().is_none());
        assert!(store.degraded_state().is_healthy());
    }

    /// Loading from a directory that does not exist reports errors and marks
    /// the store degraded instead of panicking.
    #[tokio::test]
    async fn test_initial_load_missing_dir() {
        let tmp = tempdir().unwrap();
        let missing_home = tmp.path().join("nonexistent");
        let store = DataStore::with_defaults(missing_home, None);

        let report = store.initial_load().await;

        assert!(report.has_errors());
        assert!(store.degraded_state().is_degraded());
    }

    /// A stats cache file on disk is picked up by the initial load.
    #[tokio::test]
    async fn test_initial_load_with_stats() {
        let tmp = tempdir().unwrap();
        let claude_home = tmp.path();

        // The projects directory has to exist alongside the stats file.
        std::fs::create_dir_all(claude_home.join("projects")).unwrap();

        // Stats file in the new (version 2) format: 600 + 400 tokens.
        let stats_json = r#"{"version": 2, "totalSessions": 5, "totalMessages": 100, "modelUsage": {"test": {"inputTokens": 600, "outputTokens": 400}}}"#;
        std::fs::write(claude_home.join("stats-cache.json"), stats_json).unwrap();

        let store = DataStore::with_defaults(claude_home.to_path_buf(), None);
        let report = store.initial_load().await;

        assert!(report.stats_loaded);
        let stats = store.stats().unwrap();
        assert_eq!(stats.total_tokens(), 1000);
        assert_eq!(stats.session_count(), 5);
    }

    /// An event published on the store's bus reaches a subscriber.
    #[tokio::test]
    async fn test_event_bus_subscription() {
        let tmp = tempdir().unwrap();
        let store = DataStore::with_defaults(tmp.path().to_path_buf(), None);

        let mut receiver = store.event_bus().subscribe();

        // Publish a stats-updated event and expect to receive exactly that.
        store.event_bus().publish(DataEvent::StatsUpdated);

        let received = receiver.recv().await.unwrap();
        assert!(matches!(received, DataEvent::StatsUpdated));
    }

    /// Analytics are cached after computation, dropped on explicit
    /// invalidation, and recomputed for a new period afterwards.
    #[tokio::test]
    async fn test_analytics_cache_and_invalidation() {
        let tmp = tempdir().unwrap();
        let store = DataStore::with_defaults(tmp.path().to_path_buf(), None);

        // Ten sessions, one per day going back from now, 1000..=10000 tokens.
        let now = chrono::Utc::now();
        for day in 0..10i64 {
            let total = 1000 * (day as u64 + 1);
            let (input, output, cache_creation) = (total / 2, total / 3, total / 10);
            // Whatever is left after the three shares goes to cache reads.
            let cache_read = total - (input + output + cache_creation);

            let session = session_fixture(
                &format!("test-{}", day),
                "sonnet",
                now - chrono::Duration::days(day),
                now,
                total,
                (input, output, cache_creation, cache_read),
            );
            store.sessions.insert(session.id.clone(), Arc::new(session));
        }

        // Nothing cached before the first computation.
        assert!(store.analytics().is_none());

        store.compute_analytics(Period::last_7d()).await;

        let weekly = store.analytics().expect("Analytics should be cached");
        assert!(!weekly.trends.is_empty());
        assert_eq!(weekly.period, Period::last_7d());

        // Explicit invalidation empties the cache.
        store.invalidate_analytics_cache();
        assert!(store.analytics().is_none(), "Cache should be invalidated");

        // A new computation with a different period repopulates it.
        store.compute_analytics(Period::last_30d()).await;
        let monthly = store.analytics().expect("Analytics should be re-cached");
        assert_eq!(monthly.period, Period::last_30d());
    }

    /// Token leaderboards by session, model and day.
    #[tokio::test]
    async fn test_leaderboard_methods() {
        let tmp = tempdir().unwrap();
        let store = DataStore::with_defaults(tmp.path().to_path_buf(), None);

        let now = chrono::Utc::now();

        // (id, total tokens, model, days before `now`)
        let fixtures = [
            ("session-1", 5000u64, "opus", 0i64),
            ("session-2", 3000u64, "sonnet", 1),
            ("session-3", 8000u64, "haiku", 0),
            ("session-4", 2000u64, "sonnet", 2),
            ("session-5", 10000u64, "opus", 0),
        ];

        for &(id, tokens, model, days_ago) in fixtures.iter() {
            let session = session_fixture(
                id,
                model,
                now - chrono::Duration::days(days_ago),
                now,
                tokens,
                (tokens / 2, tokens / 2, 0, 0),
            );
            store.sessions.insert(session.id.clone(), Arc::new(session));
        }

        // Sessions: the three largest, in descending order.
        let top_sessions = store.top_sessions_by_tokens(3);
        assert_eq!(top_sessions.len(), 3);
        assert_eq!(top_sessions[0].id, "session-5"); // 10000 tokens
        assert_eq!(top_sessions[1].id, "session-3"); // 8000 tokens
        assert_eq!(top_sessions[2].id, "session-1"); // 5000 tokens

        // Models: opus 15000 (5000+10000), haiku 8000, sonnet 5000 (3000+2000).
        let top_models = store.top_models_by_tokens();
        assert!(!top_models.is_empty());
        assert_eq!(top_models[0].0, "opus");
        assert_eq!(top_models[0].1, 15000);
        assert_eq!(top_models[1].0, "haiku");
        assert_eq!(top_models[1].1, 8000);

        // Days: today holds 5000 + 8000 + 10000 = 23000 tokens.
        let top_days = store.top_days_by_tokens();
        assert!(!top_days.is_empty());
        let today = now.format("%Y-%m-%d").to_string();
        assert_eq!(top_days[0].0, today);
        assert_eq!(top_days[0].1, 23000);
    }

    /// C3: DashMap takes priority over SQLite in all_violations()
    ///
    /// Merge strategy under test:
    /// - session present in both stores → only the DashMap alert is returned
    /// - session present only in SQLite → the SQLite alert is returned
    #[test]
    fn test_all_violations_dashmap_priority_over_sqlite() {
        use crate::models::activity::{ActivitySummary, Alert, AlertCategory, AlertSeverity};

        let tmp = tempdir().unwrap();
        let claude_home = tmp.path().to_path_buf();
        let store = DataStore::with_defaults(claude_home.clone(), None);

        let now = chrono::Utc::now();

        // One-alert summary; every other summary field is left at its default.
        let single_alert_summary = |session_id: &str,
                                    severity: AlertSeverity,
                                    category: AlertCategory,
                                    detail: &str| ActivitySummary {
            alerts: vec![Alert {
                session_id: session_id.to_string(),
                timestamp: now,
                severity,
                category,
                detail: detail.to_string(),
            }],
            ..Default::default()
        };

        // ── SQLite side ───────────────────────────────────────────────────────
        // "shared-session" carries a Warning in SQLite; "sqlite-only-session"
        // exists nowhere else.
        let sqlite_summary = single_alert_summary(
            "shared-session",
            AlertSeverity::Warning,
            AlertCategory::DestructiveCommand,
            "sqlite-version",
        );
        let sqlite_only_summary = single_alert_summary(
            "sqlite-only-session",
            AlertSeverity::Info,
            AlertCategory::ExternalExfil,
            "sqlite-only-detail",
        );
        let sqlite_rows = [
            (
                "/projects/test/shared-session.jsonl",
                "shared-session",
                &sqlite_summary,
            ),
            (
                "/projects/test/sqlite-only-session.jsonl",
                "sqlite-only-session",
                &sqlite_only_summary,
            ),
        ];

        // First through a cache handle opened directly on the cache directory.
        let cache = MetadataCache::new(&claude_home.join("cache"))
            .expect("MetadataCache should open in tempdir");
        for &(path, session_id, summary) in sqlite_rows.iter() {
            cache
                .put_activity(
                    std::path::Path::new(path),
                    session_id,
                    summary,
                    std::time::SystemTime::now(),
                )
                .expect("put_activity should succeed");
        }

        // Then again through the store's own handle (reachable here because
        // the test lives in the same module). The original author's note says
        // the store opens the same cache directory in `with_defaults()` and
        // that writing through its handle avoids lock conflicts.
        let store_cache = store
            .metadata_cache
            .as_ref()
            .expect("MetadataCache should be present in store");
        for &(path, session_id, summary) in sqlite_rows.iter() {
            store_cache
                .put_activity(
                    std::path::Path::new(path),
                    session_id,
                    summary,
                    std::time::SystemTime::now(),
                )
                .expect("put_activity via store cache should succeed");
        }

        // ── DashMap side: fresher Critical alert for the shared session ───────
        let dashmap_summary = single_alert_summary(
            "shared-session",
            AlertSeverity::Critical,
            AlertCategory::ForcePush,
            "dashmap-version",
        );
        store
            .activity_results
            .insert("shared-session".to_string(), dashmap_summary);

        // ── Assertions ────────────────────────────────────────────────────────
        let violations = store.all_violations();

        // The shared session is represented by the DashMap alert.
        let shared_hit = violations
            .iter()
            .find(|alert| alert.session_id == "shared-session");
        assert!(
            shared_hit.is_some(),
            "shared-session alert must appear in violations"
        );
        let shared_alert = shared_hit.unwrap();
        assert_eq!(
            shared_alert.detail, "dashmap-version",
            "DashMap version must take priority over SQLite for shared session"
        );
        assert_eq!(
            shared_alert.severity,
            AlertSeverity::Critical,
            "DashMap severity (Critical) must win over SQLite (Warning)"
        );

        // The SQLite-only session still shows up.
        let sqlite_hit = violations
            .iter()
            .find(|alert| alert.session_id == "sqlite-only-session");
        assert!(
            sqlite_hit.is_some(),
            "sqlite-only-session must appear in violations (no DashMap entry for it)"
        );
        assert_eq!(sqlite_hit.unwrap().detail, "sqlite-only-detail");

        // No second (SQLite) copy of the shared session.
        let shared_count = violations
            .iter()
            .filter(|alert| alert.session_id == "shared-session")
            .count();
        assert_eq!(
            shared_count, 1,
            "shared-session must appear exactly once (DashMap wins, no SQLite duplicate)"
        );

        // Sorting puts the Critical alert ahead of the Info one.
        assert_eq!(violations[0].severity, AlertSeverity::Critical);
    }
}