Skip to main content

ccboard_core/
store.rs

1//! Data store with DashMap + parking_lot::RwLock
2//!
3//! Uses DashMap for sessions (per-entry locking) and parking_lot::RwLock
4//! for stats/settings (better fairness than std::sync::RwLock).
5
6use crate::analytics::{AnalyticsData, Period};
7use crate::cache::{MetadataCache, StoredAlert};
8use crate::error::{CoreError, DegradedState, LoadReport};
9use crate::event::{ConfigScope, DataEvent, EventBus};
10use crate::models::activity::ActivitySummary;
11use crate::models::{
12    BillingBlockManager, InvocationStats, MergedConfig, SessionId, SessionMetadata, StatsCache,
13};
14use crate::parsers::{
15    classify_tool_calls, parse_tool_calls, InvocationParser, McpConfig, Rules,
16    SessionContentParser, SessionIndexParser, SettingsParser, StatsParser,
17};
18use dashmap::DashMap;
19use moka::future::Cache;
20use parking_lot::RwLock; // parking_lot > std::sync::RwLock: smaller (40B vs 72B), no poisoning, better fairness
21use std::path::{Path, PathBuf};
22use std::sync::Arc;
23use std::time::Duration;
24use tracing::{debug, info, warn};
25
26/// Configuration for the data store
27#[derive(Debug, Clone)]
28pub struct DataStoreConfig {
29    /// Maximum session metadata entries to keep
30    pub max_session_metadata_count: usize,
31
32    /// Maximum size for session content cache in MB
33    pub max_session_content_cache_mb: usize,
34
35    /// Maximum concurrent session scans
36    pub max_concurrent_scans: usize,
37
38    /// Stats parser retry count
39    pub stats_retry_count: u32,
40
41    /// Stats parser retry delay
42    pub stats_retry_delay: Duration,
43}
44
45impl Default for DataStoreConfig {
46    fn default() -> Self {
47        Self {
48            max_session_metadata_count: 10_000,
49            max_session_content_cache_mb: 100,
50            max_concurrent_scans: 8,
51            stats_retry_count: 3,
52            stats_retry_delay: Duration::from_millis(100),
53        }
54    }
55}
56
57/// Central data store for ccboard
58///
59/// Thread-safe access to all Claude Code data.
60/// Uses DashMap for sessions (high contention) and RwLock for stats/settings (low contention).
61pub struct DataStore {
62    /// Path to Claude home directory
63    claude_home: PathBuf,
64
65    /// Current project path (if focused)
66    project_path: Option<PathBuf>,
67
68    /// Configuration
69    config: DataStoreConfig,
70
71    /// Stats cache (low contention, frequent reads)
72    stats: RwLock<Option<StatsCache>>,
73
74    /// Merged settings
75    settings: RwLock<MergedConfig>,
76
77    /// MCP server configuration
78    mcp_config: RwLock<Option<McpConfig>>,
79
80    /// Rules from CLAUDE.md files
81    rules: RwLock<Rules>,
82
83    /// Invocation statistics (agents, commands, skills)
84    invocation_stats: RwLock<InvocationStats>,
85
86    /// Billing blocks (5h usage tracking)
87    billing_blocks: RwLock<BillingBlockManager>,
88
89    /// Analytics data cache (invalidated on stats/sessions update)
90    analytics_cache: RwLock<Option<AnalyticsData>>,
91
92    /// Session metadata (high contention with many entries)
93    /// Arc<SessionMetadata> for cheap cloning (8 bytes vs ~400 bytes)
94    ///
95    /// Why Arc over Box: Multi-thread access from TUI + Web frontends
96    /// justifies atomic refcount overhead (~4 bytes). Box would require
97    /// cloning entire struct on each frontend access.
98    sessions: DashMap<SessionId, Arc<SessionMetadata>>,
99
100    /// Session content cache (LRU for on-demand loading)
101    #[allow(dead_code)]
102    session_content_cache: Cache<SessionId, Vec<String>>,
103
104    /// Event bus for notifying subscribers
105    event_bus: EventBus,
106
107    /// Current degraded state
108    degraded_state: RwLock<DegradedState>,
109
110    /// Metadata cache for 90% startup speedup (optional)
111    metadata_cache: Option<Arc<MetadataCache>>,
112
113    /// In-memory activity analysis results (populated by analyze_session)
114    activity_results: DashMap<String, ActivitySummary>,
115}
116
117/// Project leaderboard entry with aggregated metrics
118#[derive(Debug, Clone)]
119pub struct ProjectLeaderboardEntry {
120    pub project_name: String,
121    pub total_sessions: usize,
122    pub total_tokens: u64,
123    pub total_cost: f64,
124    pub avg_session_cost: f64,
125}
126
127impl DataStore {
128    /// Create a new data store
129    pub fn new(
130        claude_home: PathBuf,
131        project_path: Option<PathBuf>,
132        config: DataStoreConfig,
133    ) -> Self {
134        let session_content_cache = Cache::builder()
135            .max_capacity((config.max_session_content_cache_mb * 1024 * 1024 / 1000) as u64) // Rough estimate
136            .time_to_idle(Duration::from_secs(300)) // 5 min idle expiry
137            .build();
138
139        // Create metadata cache in ~/.claude/cache/
140        let metadata_cache = {
141            let cache_dir = claude_home.join("cache");
142            match MetadataCache::new(&cache_dir) {
143                Ok(cache) => {
144                    debug!(path = %cache_dir.display(), "Metadata cache enabled");
145                    Some(Arc::new(cache))
146                }
147                Err(e) => {
148                    warn!(error = %e, "Failed to create metadata cache, running without cache");
149                    None
150                }
151            }
152        };
153
154        Self {
155            claude_home,
156            project_path,
157            config,
158            stats: RwLock::new(None),
159            settings: RwLock::new(MergedConfig::default()),
160            mcp_config: RwLock::new(None),
161            rules: RwLock::new(Rules::default()),
162            invocation_stats: RwLock::new(InvocationStats::new()),
163            billing_blocks: RwLock::new(BillingBlockManager::new()),
164            analytics_cache: RwLock::new(None),
165            sessions: DashMap::new(),
166            session_content_cache,
167            event_bus: EventBus::default_capacity(),
168            degraded_state: RwLock::new(DegradedState::Healthy),
169            metadata_cache,
170            activity_results: DashMap::new(),
171        }
172    }
173
174    /// Create with default configuration
175    pub fn with_defaults(claude_home: PathBuf, project_path: Option<PathBuf>) -> Self {
176        Self::new(claude_home, project_path, DataStoreConfig::default())
177    }
178
179    /// Get the event bus for subscribing to updates
180    pub fn event_bus(&self) -> &EventBus {
181        &self.event_bus
182    }
183
184    /// Get current degraded state
185    pub fn degraded_state(&self) -> DegradedState {
186        self.degraded_state.read().clone()
187    }
188
189    /// Initial load of all data with LoadReport for graceful degradation
190    pub async fn initial_load(&self) -> LoadReport {
191        let mut report = LoadReport::new();
192
193        info!(claude_home = %self.claude_home.display(), "Starting initial data load");
194
195        // Load stats
196        self.load_stats(&mut report).await;
197
198        // Load settings
199        self.load_settings(&mut report).await;
200
201        // Load MCP configuration
202        self.load_mcp_config(&mut report).await;
203
204        // Load rules
205        self.load_rules(&mut report).await;
206
207        // Scan sessions
208        self.scan_sessions(&mut report).await;
209
210        // Determine degraded state
211        self.update_degraded_state(&report);
212
213        // Notify subscribers
214        self.event_bus.publish(DataEvent::LoadCompleted);
215
216        info!(
217            stats_loaded = report.stats_loaded,
218            settings_loaded = report.settings_loaded,
219            sessions_scanned = report.sessions_scanned,
220            sessions_failed = report.sessions_failed,
221            errors = report.errors.len(),
222            "Initial load complete"
223        );
224
225        report
226    }
227
228    /// Load stats cache
229    async fn load_stats(&self, report: &mut LoadReport) {
230        let stats_path = self.claude_home.join("stats-cache.json");
231        let parser = StatsParser::new()
232            .with_retries(self.config.stats_retry_count, self.config.stats_retry_delay);
233
234        if let Some(mut stats) = parser.parse_graceful(&stats_path, report).await {
235            // Recalculate costs using accurate pricing
236            stats.recalculate_costs();
237            let mut guard = self.stats.write();
238            *guard = Some(stats);
239            debug!("Stats loaded successfully with recalculated costs");
240        }
241    }
242
243    /// Load and merge settings
244    async fn load_settings(&self, report: &mut LoadReport) {
245        let parser = SettingsParser::new();
246        let merged = parser
247            .load_merged(&self.claude_home, self.project_path.as_deref(), report)
248            .await;
249
250        let mut guard = self.settings.write();
251        *guard = merged;
252        debug!("Settings loaded and merged");
253    }
254
255    /// Load MCP server configuration (global + project-level)
256    async fn load_mcp_config(&self, report: &mut LoadReport) {
257        match McpConfig::load_merged(&self.claude_home, self.project_path.as_deref()) {
258            Ok(Some(config)) => {
259                let server_count = config.servers.len();
260                let mut guard = self.mcp_config.write();
261                *guard = Some(config);
262                debug!(
263                    server_count,
264                    "MCP config loaded successfully (global + project)"
265                );
266            }
267            Ok(None) => {
268                debug!("No MCP config found (optional)");
269            }
270            Err(e) => {
271                use crate::error::LoadError;
272                report.add_error(LoadError::error(
273                    "mcp_config",
274                    format!("Failed to parse MCP config: {}", e),
275                ));
276            }
277        }
278    }
279
280    /// Load rules from CLAUDE.md files
281    async fn load_rules(&self, report: &mut LoadReport) {
282        match Rules::load(&self.claude_home, self.project_path.as_deref()) {
283            Ok(rules) => {
284                let has_global = rules.global.is_some();
285                let has_project = rules.project.is_some();
286                let mut guard = self.rules.write();
287                *guard = rules;
288                debug!(has_global, has_project, "Rules loaded");
289            }
290            Err(e) => {
291                use crate::error::LoadError;
292                report.add_error(LoadError::error(
293                    "rules",
294                    format!("Failed to load rules: {}", e),
295                ));
296            }
297        }
298    }
299
300    /// Scan all sessions
301    async fn scan_sessions(&self, report: &mut LoadReport) {
302        let projects_dir = self.claude_home.join("projects");
303
304        if !projects_dir.exists() {
305            report.add_warning(
306                "sessions",
307                format!("Projects directory not found: {}", projects_dir.display()),
308            );
309            return;
310        }
311
312        let mut parser =
313            SessionIndexParser::new().with_concurrency(self.config.max_concurrent_scans);
314
315        // Enable metadata cache if available (90% speedup)
316        if let Some(ref cache) = self.metadata_cache {
317            parser = parser.with_cache(cache.clone());
318        }
319
320        let sessions = parser.scan_all(&projects_dir, report).await;
321
322        // Enforce max count limit
323        let sessions_to_add: Vec<_> = if sessions.len() > self.config.max_session_metadata_count {
324            warn!(
325                total = sessions.len(),
326                limit = self.config.max_session_metadata_count,
327                "Session count exceeds limit, keeping most recent"
328            );
329
330            let mut sorted = sessions;
331            sorted.sort_by(|a, b| b.last_timestamp.cmp(&a.last_timestamp));
332            sorted.truncate(self.config.max_session_metadata_count);
333            sorted
334        } else {
335            sessions
336        };
337
338        // Insert into DashMap (wrap in Arc for cheap cloning)
339        for session in sessions_to_add {
340            self.sessions.insert(session.id.clone(), Arc::new(session));
341        }
342
343        debug!(count = self.sessions.len(), "Sessions indexed");
344    }
345
346    /// Update degraded state based on load report
347    fn update_degraded_state(&self, report: &LoadReport) {
348        let mut state = self.degraded_state.write();
349
350        if report.has_fatal_errors() {
351            *state = DegradedState::ReadOnly {
352                reason: "Fatal errors during load".to_string(),
353            };
354            return;
355        }
356
357        let mut missing = Vec::new();
358
359        if !report.stats_loaded {
360            missing.push("stats".to_string());
361        }
362        if !report.settings_loaded {
363            missing.push("settings".to_string());
364        }
365        if report.sessions_failed > 0 {
366            missing.push(format!("{} sessions", report.sessions_failed));
367        }
368
369        if missing.is_empty() {
370            *state = DegradedState::Healthy;
371        } else {
372            *state = DegradedState::PartialData {
373                missing: missing.clone(),
374                reason: format!("Missing: {}", missing.join(", ")),
375            };
376        }
377    }
378
379    // ===================
380    // Read accessors
381    // ===================
382
383    /// Get a clone of stats
384    pub fn stats(&self) -> Option<StatsCache> {
385        self.stats.read().clone()
386    }
387
388    /// Calculate context window saturation from current sessions
389    pub fn context_window_stats(&self) -> crate::models::ContextWindowStats {
390        // Clone Arc (cheap) to avoid lifetime issues with DashMap iterators
391        let sessions: Vec<_> = self
392            .sessions
393            .iter()
394            .map(|entry| Arc::clone(entry.value()))
395            .collect();
396        // Dereference Arc to get &SessionMetadata
397        let refs: Vec<_> = sessions.iter().map(|s| s.as_ref()).collect();
398        crate::models::StatsCache::calculate_context_saturation(&refs, 30)
399    }
400
401    /// Get merged settings
402    pub fn settings(&self) -> MergedConfig {
403        self.settings.read().clone()
404    }
405
406    /// Get MCP server configuration
407    pub fn mcp_config(&self) -> Option<McpConfig> {
408        self.mcp_config.read().clone()
409    }
410
411    /// Get rules
412    pub fn rules(&self) -> Rules {
413        self.rules.read().clone()
414    }
415
416    /// Get invocation statistics
417    pub fn invocation_stats(&self) -> InvocationStats {
418        self.invocation_stats.read().clone()
419    }
420
421    /// Calculate current quota status from stats and budget config
422    ///
423    /// Returns None if stats are not loaded or budget is not configured.
424    pub fn quota_status(&self) -> Option<crate::quota::QuotaStatus> {
425        let stats = self.stats.read().clone()?;
426        let settings = self.settings.read();
427        let budget = settings.merged.budget.as_ref()?;
428
429        Some(crate::quota::calculate_quota_status(&stats, budget))
430    }
431
432    /// Get live Claude Code sessions (running processes)
433    ///
434    /// Detects active Claude processes on the system and returns metadata.
435    /// Returns empty vector if detection fails or no processes are running.
436    pub fn live_sessions(&self) -> Vec<crate::live_monitor::LiveSession> {
437        crate::live_monitor::detect_live_sessions().unwrap_or_default()
438    }
439
440    /// Get session count
441    pub fn session_count(&self) -> usize {
442        self.sessions.len()
443    }
444
445    /// Get session by ID
446    /// Returns Arc<SessionMetadata> for cheap cloning
447    pub fn get_session(&self, id: &str) -> Option<Arc<SessionMetadata>> {
448        self.sessions.get(id).map(|r| Arc::clone(r.value()))
449    }
450
451    /// Load full session content with lazy caching
452    ///
453    /// Returns conversation messages parsed from session JSONL file.
454    /// Uses Moka cache (LRU with 5min TTL) for repeated access.
455    ///
456    /// # Performance
457    /// - First call: Parse JSONL (~50-500ms for 1000-message session)
458    /// - Cached calls: <1ms (memory lookup)
459    /// - Cache eviction: LRU + 5min idle timeout
460    ///
461    /// # Errors
462    /// Returns CoreError if session not found or file cannot be read.
463    pub async fn load_session_content(
464        &self,
465        session_id: &str,
466    ) -> Result<Vec<crate::models::ConversationMessage>, CoreError> {
467        // Get session metadata
468        let metadata = self
469            .get_session(session_id)
470            .ok_or_else(|| CoreError::SessionNotFound {
471                session_id: session_id.to_string(),
472            })?;
473
474        // Try cache first (Moka handles concurrency internally)
475        let session_id_owned = SessionId::from(session_id);
476        if let Some(_cached) = self.session_content_cache.get(&session_id_owned).await {
477            debug!(session_id, "Session content cache HIT");
478            // TODO: Cache design decision - caching Vec<String> vs Vec<ConversationMessage>
479            // For now, always parse from file (will be optimized in cache phase)
480        }
481
482        // Cache miss: parse from file
483        debug!(
484            session_id,
485            path = %metadata.file_path.display(),
486            "Session content cache MISS, parsing JSONL"
487        );
488
489        let messages = SessionContentParser::parse_conversation(
490            &metadata.file_path,
491            (*metadata).clone(), // Clone metadata out of Arc
492        )
493        .await?;
494
495        // Note: Cache insertion skipped for now (caching Vec<String> vs Vec<ConversationMessage> design decision)
496        // Will be added in cache optimization phase
497
498        Ok(messages)
499    }
500
501    /// Get analytics data for a period (cached)
502    ///
503    /// Returns cached analytics if available, otherwise None.
504    /// Call `compute_analytics()` to compute and cache.
505    pub fn analytics(&self) -> Option<AnalyticsData> {
506        let analytics = self.analytics_cache.read().clone();
507        debug!(
508            has_analytics = analytics.is_some(),
509            "analytics() getter called"
510        );
511        analytics
512    }
513
514    /// Compute and cache analytics data for a period
515    ///
516    /// This is a CPU-intensive operation (trends, forecasting, patterns).
517    /// For 1000+ sessions, this may take 100-300ms, so it's offloaded
518    /// to a blocking task.
519    ///
520    /// Cache is invalidated on stats reload or session updates (EventBus pattern).
521    pub async fn compute_analytics(&self, period: Period) {
522        let sessions: Vec<_> = self
523            .sessions
524            .iter()
525            .map(|r| Arc::clone(r.value()))
526            .collect();
527
528        info!(
529            session_count = sessions.len(),
530            period = ?period,
531            "compute_analytics() ENTRY"
532        );
533
534        // Offload to blocking task for CPU-intensive computation
535        let analytics =
536            tokio::task::spawn_blocking(move || AnalyticsData::compute(&sessions, period)).await;
537
538        match analytics {
539            Ok(data) => {
540                info!(
541                    insights_count = data.insights.len(),
542                    "compute_analytics() computed data"
543                );
544                let mut guard = self.analytics_cache.write();
545                *guard = Some(data);
546                self.event_bus.publish(DataEvent::AnalyticsUpdated);
547                info!("compute_analytics() EXIT - cached and event published");
548            }
549            Err(e) => {
550                warn!(error = %e, "Failed to compute analytics (task panicked)");
551            }
552        }
553    }
554
555    /// Invalidate analytics cache (called on data changes)
556    ///
557    /// Note: Currently unused to prevent aggressive invalidation.
558    /// Kept for future use if smart invalidation is needed.
559    #[allow(dead_code)]
560    fn invalidate_analytics_cache(&self) {
561        let mut guard = self.analytics_cache.write();
562        *guard = None;
563        debug!("Analytics cache invalidated");
564    }
565
566    /// Get all session IDs
567    pub fn session_ids(&self) -> Vec<SessionId> {
568        self.sessions.iter().map(|r| r.key().clone()).collect()
569    }
570
571    /// Clear session content cache (for memory optimization on F5)
572    pub fn clear_session_content_cache(&self) {
573        self.session_content_cache.invalidate_all();
574        debug!("Session content cache cleared");
575    }
576
577    /// Get sessions grouped by project
578    /// Returns Arc<SessionMetadata> for cheap cloning
579    pub fn sessions_by_project(
580        &self,
581    ) -> std::collections::HashMap<String, Vec<Arc<SessionMetadata>>> {
582        let mut by_project = std::collections::HashMap::new();
583
584        for entry in self.sessions.iter() {
585            let session = Arc::clone(entry.value());
586            by_project
587                .entry(session.project_path.as_str().to_string())
588                .or_insert_with(Vec::new)
589                .push(session);
590        }
591
592        // Sort sessions within each project by timestamp (newest first)
593        for sessions in by_project.values_mut() {
594            sessions.sort_by(|a, b| b.last_timestamp.cmp(&a.last_timestamp));
595        }
596
597        by_project
598    }
599
600    /// Get all sessions (unsorted)
601    /// Returns Arc<SessionMetadata> for cheap cloning
602    pub fn all_sessions(&self) -> Vec<Arc<SessionMetadata>> {
603        self.sessions
604            .iter()
605            .map(|r| Arc::clone(r.value()))
606            .collect()
607    }
608
609    /// Get recent sessions (sorted by last timestamp, newest first)
610    /// Returns Arc<SessionMetadata> for cheap cloning
611    pub fn recent_sessions(&self, limit: usize) -> Vec<Arc<SessionMetadata>> {
612        let mut sessions = self.all_sessions();
613        sessions.sort_by(|a, b| b.last_timestamp.cmp(&a.last_timestamp));
614        sessions.truncate(limit);
615        sessions
616    }
617
618    /// Search sessions using FTS5 full-text search.
619    ///
620    /// Returns relevance-ranked results. Returns empty vec if FTS5 not initialized.
621    pub fn search_sessions(&self, query: &str, limit: usize) -> Vec<crate::cache::SearchResult> {
622        if let Some(ref cache) = self.metadata_cache {
623            match cache.search_sessions(query, limit) {
624                Ok(results) => results,
625                Err(e) => {
626                    warn!("FTS5 search failed: {}", e);
627                    Vec::new()
628                }
629            }
630        } else {
631            Vec::new()
632        }
633    }
634
635    /// Analyze a session's tool calls and generate activity summary + alerts.
636    ///
637    /// Results are stored in the in-memory DashMap and the SQLite cache.
638    /// Publishes DataEvent::AnalyticsUpdated on completion so the TUI re-renders.
639    pub async fn analyze_session(&self, session_id: &str) -> anyhow::Result<ActivitySummary> {
640        use std::time::SystemTime;
641
642        let metadata = self
643            .get_session(session_id)
644            .ok_or_else(|| anyhow::anyhow!("Session not found: {}", session_id))?;
645
646        let path = &metadata.file_path;
647
648        // Read mtime once — used for both cache check and cache write to avoid TOCTOU.
649        // Use tokio::fs to avoid blocking the executor thread.
650        let mtime = tokio::fs::metadata(path)
651            .await
652            .and_then(|m| m.modified())
653            .unwrap_or(SystemTime::UNIX_EPOCH);
654
655        // Check SQLite cache first (avoids re-parsing unchanged files)
656        if let Some(cache) = &self.metadata_cache {
657            if let Ok(Some(cached)) = cache.get_activity(path, mtime) {
658                self.activity_results
659                    .insert(session_id.to_string(), cached.clone());
660                self.event_bus.publish(DataEvent::AnalyticsUpdated);
661                return Ok(cached);
662            }
663        }
664
665        // Cache miss: parse JSONL
666        let calls = parse_tool_calls(path, session_id).await?;
667
668        let project_root = path
669            .parent()
670            .and_then(|p| p.parent())
671            .map(|p| p.to_string_lossy().into_owned());
672
673        let summary = classify_tool_calls(calls, session_id, project_root.as_deref());
674
675        // Persist to SQLite cache — same mtime as used for cache check (no TOCTOU)
676        if let Some(cache) = &self.metadata_cache {
677            if let Err(e) = cache.put_activity(path, session_id, &summary, mtime) {
678                warn!(session_id, error = %e, "Failed to cache activity — will re-parse on restart");
679            }
680        }
681
682        // Store in memory + notify TUI
683        self.activity_results
684            .insert(session_id.to_string(), summary.clone());
685        self.event_bus.publish(DataEvent::AnalyticsUpdated);
686
687        Ok(summary)
688    }
689
690    /// Get the cached activity summary for a session (returns None if not yet analyzed).
691    pub fn get_session_activity(&self, session_id: &str) -> Option<ActivitySummary> {
692        self.activity_results
693            .get(session_id)
694            .map(|r| r.value().clone())
695    }
696
697    /// Get all stored security alerts from the SQLite cache.
698    ///
699    /// `min_severity`: optional filter — "Warning" or "Critical"
700    pub fn get_all_stored_alerts(&self, min_severity: Option<&str>) -> Vec<StoredAlert> {
701        if let Some(cache) = &self.metadata_cache {
702            cache.get_all_alerts(min_severity).unwrap_or_default()
703        } else {
704            vec![]
705        }
706    }
707
708    /// Consolidated violations feed: merges in-memory DashMap results (freshest) with
709    /// SQLite-persisted alerts. DashMap takes priority for sessions analyzed this run.
710    ///
711    /// Returns alerts sorted Critical → Warning → Info, then by timestamp descending.
712    pub fn all_violations(&self) -> Vec<crate::models::activity::Alert> {
713        use crate::models::activity::{Alert, AlertSeverity};
714        use std::collections::HashSet;
715
716        // Collect session_ids already covered by the DashMap (in-memory, freshest data)
717        let mut seen_sessions: HashSet<String> = HashSet::new();
718        let mut alerts: Vec<Alert> = Vec::new();
719
720        for entry in self.activity_results.iter() {
721            seen_sessions.insert(entry.key().clone());
722            alerts.extend(entry.value().alerts.clone());
723        }
724
725        // Supplement with SQLite alerts for sessions NOT in the DashMap
726        if let Some(cache) = &self.metadata_cache {
727            if let Ok(stored) = cache.get_all_alerts(None) {
728                for sa in stored {
729                    // Derive session_id from session_path (filename without extension)
730                    let session_id = std::path::Path::new(&sa.session_path)
731                        .file_stem()
732                        .and_then(|s| s.to_str())
733                        .unwrap_or(&sa.session_path)
734                        .to_string();
735
736                    if seen_sessions.contains(&session_id) {
737                        continue; // DashMap version is fresher, skip SQLite duplicate
738                    }
739
740                    // Parse severity and category from stored strings
741                    let severity = match sa.severity.as_str() {
742                        "Critical" => AlertSeverity::Critical,
743                        "Warning" => AlertSeverity::Warning,
744                        _ => AlertSeverity::Info,
745                    };
746                    let category = match sa.category.as_str() {
747                        "CredentialAccess" => {
748                            crate::models::activity::AlertCategory::CredentialAccess
749                        }
750                        "DestructiveCommand" => {
751                            crate::models::activity::AlertCategory::DestructiveCommand
752                        }
753                        "ForcePush" => crate::models::activity::AlertCategory::ForcePush,
754                        "ScopeViolation" => crate::models::activity::AlertCategory::ScopeViolation,
755                        _ => crate::models::activity::AlertCategory::ExternalExfil,
756                    };
757                    let timestamp = sa
758                        .timestamp
759                        .parse::<chrono::DateTime<chrono::Utc>>()
760                        .unwrap_or_else(|_| chrono::Utc::now());
761
762                    alerts.push(Alert {
763                        session_id,
764                        timestamp,
765                        severity,
766                        category,
767                        detail: sa.detail,
768                    });
769                }
770            }
771        }
772
773        // Sort: Critical > Warning > Info, then newest first within same severity
774        alerts.sort_by(|a, b| {
775            b.severity
776                .partial_cmp(&a.severity)
777                .unwrap_or(std::cmp::Ordering::Equal)
778                .then_with(|| b.timestamp.cmp(&a.timestamp))
779        });
780
781        alerts
782    }
783
784    /// Get top sessions by total tokens (sorted descending)
785    /// Returns Arc<SessionMetadata> for cheap cloning
786    pub fn top_sessions_by_tokens(&self, limit: usize) -> Vec<Arc<SessionMetadata>> {
787        let mut sessions: Vec<_> = self
788            .sessions
789            .iter()
790            .map(|r| Arc::clone(r.value()))
791            .collect();
792        sessions.sort_by(|a, b| b.total_tokens.cmp(&a.total_tokens));
793        sessions.truncate(limit);
794        sessions
795    }
796
797    /// Get top models by total tokens (aggregated, sorted descending)
798    /// Returns (model_name, total_tokens) pairs
799    pub fn top_models_by_tokens(&self) -> Vec<(String, u64)> {
800        let mut model_totals = std::collections::HashMap::new();
801
802        // Aggregate tokens per model across all sessions
803        for session in self.sessions.iter() {
804            for model in &session.value().models_used {
805                *model_totals.entry(model.clone()).or_insert(0) += session.value().total_tokens;
806            }
807        }
808
809        // Convert to vec and sort
810        let mut results: Vec<_> = model_totals.into_iter().collect();
811        results.sort_by(|a, b| b.1.cmp(&a.1));
812        results.truncate(10); // Top 10
813        results
814    }
815
816    /// Get top days by total tokens (aggregated, sorted descending)
817    /// Returns (date_string, total_tokens) pairs
818    pub fn top_days_by_tokens(&self) -> Vec<(String, u64)> {
819        let mut day_totals = std::collections::HashMap::new();
820
821        // Aggregate tokens per day across all sessions
822        for session in self.sessions.iter() {
823            if let Some(timestamp) = &session.value().first_timestamp {
824                let date = timestamp.format("%Y-%m-%d").to_string();
825                *day_totals.entry(date).or_insert(0) += session.value().total_tokens;
826            }
827        }
828
829        // Convert to vec and sort
830        let mut results: Vec<_> = day_totals.into_iter().collect();
831        results.sort_by(|a, b| b.1.cmp(&a.1));
832        results.truncate(10); // Top 10
833        results
834    }
835
836    /// Get project leaderboard with aggregated metrics
837    ///
838    /// Returns all projects with session count, total tokens, total cost, and average session cost.
839    /// Cost is calculated using accurate model-based pricing from the pricing module.
840    pub fn projects_leaderboard(&self) -> Vec<ProjectLeaderboardEntry> {
841        let mut project_metrics = std::collections::HashMap::new();
842
843        // Aggregate metrics per project
844        for session in self.sessions.iter() {
845            let metadata = session.value();
846            let project_path = &metadata.project_path;
847
848            // Get model for this session (use first model, or "unknown")
849            let model = metadata
850                .models_used
851                .first()
852                .map(|s| s.as_str())
853                .unwrap_or("unknown");
854
855            // Calculate cost using accurate pricing
856            let cost = crate::pricing::calculate_cost(
857                model,
858                metadata.input_tokens,
859                metadata.output_tokens,
860                metadata.cache_creation_tokens,
861                metadata.cache_read_tokens,
862            );
863
864            let entry = project_metrics
865                .entry(project_path.clone())
866                .or_insert((0, 0u64, 0.0f64)); // (session_count, total_tokens, total_cost)
867
868            entry.0 += 1; // session count
869            entry.1 += metadata.total_tokens; // total tokens
870            entry.2 += cost; // total cost
871        }
872
873        // Convert to leaderboard entries
874        let mut results: Vec<_> = project_metrics
875            .into_iter()
876            .map(
877                |(project_path, (session_count, total_tokens, total_cost))| {
878                    let avg_session_cost = if session_count > 0 {
879                        total_cost / session_count as f64
880                    } else {
881                        0.0
882                    };
883
884                    // Extract project name from path (last component)
885                    let project_name = std::path::Path::new(project_path.as_str())
886                        .file_name()
887                        .and_then(|n| n.to_str())
888                        .unwrap_or(project_path.as_str())
889                        .to_string();
890
891                    ProjectLeaderboardEntry {
892                        project_name,
893                        total_sessions: session_count,
894                        total_tokens,
895                        total_cost,
896                        avg_session_cost,
897                    }
898                },
899            )
900            .collect();
901
902        // Default sort: by total cost descending
903        results.sort_by(|a, b| {
904            b.total_cost
905                .partial_cmp(&a.total_cost)
906                .unwrap_or(std::cmp::Ordering::Equal)
907        });
908
909        results
910    }
911
912    // ===================
913    // Update methods (called by watcher)
914    // ===================
915
916    /// Reload stats (called on file change)
917    pub async fn reload_stats(&self) {
918        let stats_path = self.claude_home.join("stats-cache.json");
919        let parser = StatsParser::new()
920            .with_retries(self.config.stats_retry_count, self.config.stats_retry_delay);
921
922        let mut report = LoadReport::new();
923        if let Some(mut stats) = parser.parse_graceful(&stats_path, &mut report).await {
924            // Recalculate costs using accurate pricing
925            stats.recalculate_costs();
926            let mut guard = self.stats.write();
927            *guard = Some(stats);
928
929            // Don't invalidate analytics - it will auto-recompute if needed
930            // Instead, just publish the event so UI can decide whether to recompute
931            self.event_bus.publish(DataEvent::StatsUpdated);
932            debug!("Stats reloaded with recalculated costs");
933        }
934    }
935
936    /// Reload settings from files (called when settings change)
937    pub async fn reload_settings(&self) {
938        let parser = SettingsParser::new();
939        let merged = parser
940            .load_merged(
941                &self.claude_home,
942                self.project_path.as_deref(),
943                &mut LoadReport::new(),
944            )
945            .await;
946
947        {
948            let mut guard = self.settings.write();
949            *guard = merged;
950        }
951
952        self.event_bus
953            .publish(DataEvent::ConfigChanged(ConfigScope::Global));
954        debug!("Settings reloaded");
955    }
956
957    /// Add or update a session (called when session file changes)
958    pub async fn update_session(&self, path: &Path) {
959        let parser = SessionIndexParser::new();
960
961        match parser.scan_session(path).await {
962            Ok(meta) => {
963                let id = meta.id.clone();
964                let is_new = !self.sessions.contains_key(&id);
965
966                self.sessions.insert(id.clone(), Arc::new(meta));
967
968                // Don't invalidate analytics on every session update - too aggressive
969                // Analytics will be recomputed on demand or periodically
970                // Only invalidate on significant changes (detected by UI)
971
972                if is_new {
973                    self.event_bus.publish(DataEvent::SessionCreated(id));
974                } else {
975                    self.event_bus.publish(DataEvent::SessionUpdated(id));
976                }
977            }
978            Err(e) => {
979                warn!(path = %path.display(), error = %e, "Failed to update session");
980            }
981        }
982    }
983
984    /// Compute invocation statistics from all sessions
985    ///
986    /// This scans all session files to count agent/command/skill invocations.
987    /// Should be called after initial load or when sessions are updated.
988    pub async fn compute_invocations(&self) {
989        let paths: Vec<_> = self
990            .sessions
991            .iter()
992            .map(|r| r.value().file_path.clone())
993            .collect();
994
995        debug!(session_count = paths.len(), "Computing invocation stats");
996
997        let parser = InvocationParser::new();
998        let stats = parser.scan_sessions(&paths).await;
999
1000        let mut guard = self.invocation_stats.write();
1001        *guard = stats;
1002
1003        debug!(
1004            agents = guard.agents.len(),
1005            commands = guard.commands.len(),
1006            skills = guard.skills.len(),
1007            total = guard.total_invocations(),
1008            "Invocation stats computed"
1009        );
1010
1011        // Note: Using LoadCompleted as there's no specific invocation stats event
1012        self.event_bus.publish(DataEvent::LoadCompleted);
1013    }
1014
1015    /// Compute billing blocks from all sessions
1016    ///
1017    /// This scans all sessions with timestamps and aggregates usage into 5-hour billing blocks.
1018    /// Uses real model pricing based on token breakdown for accurate cost calculation.
1019    pub async fn compute_billing_blocks(&self) {
1020        debug!("Computing billing blocks from sessions with real pricing");
1021
1022        let mut manager = BillingBlockManager::new();
1023        let mut sessions_with_timestamps = 0;
1024        let mut sessions_without_timestamps = 0;
1025
1026        for session in self.sessions.iter() {
1027            let metadata = session.value();
1028
1029            // Skip sessions without timestamps
1030            let Some(timestamp) = &metadata.first_timestamp else {
1031                sessions_without_timestamps += 1;
1032                continue;
1033            };
1034
1035            sessions_with_timestamps += 1;
1036
1037            // Get model for this session (use first model, or "unknown")
1038            let model = metadata
1039                .models_used
1040                .first()
1041                .map(|s| s.as_str())
1042                .unwrap_or("unknown");
1043
1044            // Calculate real cost using pricing table
1045            let cost = crate::pricing::calculate_cost(
1046                model,
1047                metadata.input_tokens,
1048                metadata.output_tokens,
1049                metadata.cache_creation_tokens,
1050                metadata.cache_read_tokens,
1051            );
1052
1053            manager.add_usage(
1054                timestamp,
1055                metadata.input_tokens,
1056                metadata.output_tokens,
1057                metadata.cache_creation_tokens,
1058                metadata.cache_read_tokens,
1059                cost,
1060            );
1061        }
1062
1063        debug!(
1064            sessions_with_timestamps,
1065            sessions_without_timestamps,
1066            blocks = manager.get_all_blocks().len(),
1067            "Billing blocks computed with real pricing"
1068        );
1069
1070        let mut guard = self.billing_blocks.write();
1071        *guard = manager;
1072
1073        self.event_bus.publish(DataEvent::LoadCompleted);
1074    }
1075
1076    /// Get billing blocks (read-only access)
1077    pub fn billing_blocks(&self) -> parking_lot::RwLockReadGuard<'_, BillingBlockManager> {
1078        self.billing_blocks.read()
1079    }
1080
1081    /// Calculate usage estimate based on billing blocks and subscription plan
1082    pub fn usage_estimate(&self) -> crate::usage_estimator::UsageEstimate {
1083        let settings = self.settings();
1084        let plan = settings
1085            .merged
1086            .subscription_plan
1087            .as_ref()
1088            .map(|s| crate::usage_estimator::SubscriptionPlan::parse(s))
1089            .unwrap_or_default();
1090
1091        let billing_blocks = self.billing_blocks.read();
1092        crate::usage_estimator::calculate_usage_estimate(&billing_blocks, plan)
1093    }
1094
1095    /// Load ccboard user preferences from the cache directory.
1096    pub fn load_preferences(&self) -> crate::preferences::CcboardPreferences {
1097        let cache_dir = self.claude_home.join("cache");
1098        crate::preferences::CcboardPreferences::load(&cache_dir)
1099    }
1100
1101    /// Save ccboard user preferences to the cache directory.
1102    pub fn save_preferences(
1103        &self,
1104        prefs: &crate::preferences::CcboardPreferences,
1105    ) -> anyhow::Result<()> {
1106        let cache_dir = self.claude_home.join("cache");
1107        prefs.save(&cache_dir)
1108    }
1109}
1110
1111#[cfg(test)]
1112mod tests {
1113    use super::*;
1114    use tempfile::tempdir;
1115
1116    #[tokio::test]
1117    async fn test_data_store_creation() {
1118        let dir = tempdir().unwrap();
1119        let store = DataStore::with_defaults(dir.path().to_path_buf(), None);
1120
1121        assert_eq!(store.session_count(), 0);
1122        assert!(store.stats().is_none());
1123        assert!(store.degraded_state().is_healthy());
1124    }
1125
1126    #[tokio::test]
1127    async fn test_initial_load_missing_dir() {
1128        let dir = tempdir().unwrap();
1129        let store = DataStore::with_defaults(dir.path().join("nonexistent"), None);
1130
1131        let report = store.initial_load().await;
1132
1133        // Should have warnings but not crash
1134        assert!(report.has_errors());
1135        assert!(store.degraded_state().is_degraded());
1136    }
1137
1138    #[tokio::test]
1139    async fn test_initial_load_with_stats() {
1140        let dir = tempdir().unwrap();
1141        let claude_home = dir.path();
1142
1143        // Create stats file with new format
1144        std::fs::write(
1145            claude_home.join("stats-cache.json"),
1146            r#"{"version": 2, "totalSessions": 5, "totalMessages": 100, "modelUsage": {"test": {"inputTokens": 600, "outputTokens": 400}}}"#,
1147        )
1148        .unwrap();
1149
1150        // Create projects dir
1151        std::fs::create_dir_all(claude_home.join("projects")).unwrap();
1152
1153        let store = DataStore::with_defaults(claude_home.to_path_buf(), None);
1154        let report = store.initial_load().await;
1155
1156        assert!(report.stats_loaded);
1157        let stats = store.stats().unwrap();
1158        assert_eq!(stats.total_tokens(), 1000);
1159        assert_eq!(stats.session_count(), 5);
1160    }
1161
1162    #[tokio::test]
1163    async fn test_event_bus_subscription() {
1164        let dir = tempdir().unwrap();
1165        let store = DataStore::with_defaults(dir.path().to_path_buf(), None);
1166
1167        let mut rx = store.event_bus().subscribe();
1168
1169        // Trigger load completed
1170        store.event_bus().publish(DataEvent::StatsUpdated);
1171
1172        let event = rx.recv().await.unwrap();
1173        assert!(matches!(event, DataEvent::StatsUpdated));
1174    }
1175
1176    #[tokio::test]
1177    async fn test_analytics_cache_and_invalidation() {
1178        use crate::models::session::SessionMetadata;
1179        use chrono::Utc;
1180
1181        let dir = tempdir().unwrap();
1182        let store = DataStore::with_defaults(dir.path().to_path_buf(), None);
1183
1184        // Add test sessions
1185        let now = Utc::now();
1186        for i in 0..10 {
1187            let total_tokens = 1000 * (i as u64 + 1);
1188            let session = SessionMetadata {
1189                id: format!("test-{}", i).into(),
1190                file_path: std::path::PathBuf::from(format!("/test-{}.jsonl", i)),
1191                project_path: "/test".into(),
1192                first_timestamp: Some(now - chrono::Duration::days(i)),
1193                last_timestamp: Some(now),
1194                message_count: 10,
1195                total_tokens,
1196                input_tokens: total_tokens / 2,
1197                output_tokens: total_tokens / 3,
1198                cache_creation_tokens: total_tokens / 10,
1199                cache_read_tokens: total_tokens
1200                    - (total_tokens / 2 + total_tokens / 3 + total_tokens / 10),
1201                models_used: vec!["sonnet".to_string()],
1202                file_size_bytes: 1024,
1203                first_user_message: None,
1204                has_subagents: false,
1205                duration_seconds: Some(1800),
1206                branch: None,
1207                tool_usage: std::collections::HashMap::new(),
1208            };
1209            store.sessions.insert(session.id.clone(), Arc::new(session));
1210        }
1211
1212        // Initially no analytics
1213        assert!(store.analytics().is_none());
1214
1215        // Compute analytics
1216        store.compute_analytics(Period::last_7d()).await;
1217
1218        // Analytics should be cached
1219        let analytics1 = store.analytics().expect("Analytics should be cached");
1220        assert!(!analytics1.trends.is_empty());
1221        assert_eq!(analytics1.period, Period::last_7d());
1222
1223        // Invalidate by reloading stats
1224        store.invalidate_analytics_cache();
1225        assert!(store.analytics().is_none(), "Cache should be invalidated");
1226
1227        // Re-compute with different period
1228        store.compute_analytics(Period::last_30d()).await;
1229        let analytics2 = store.analytics().expect("Analytics should be re-cached");
1230        assert_eq!(analytics2.period, Period::last_30d());
1231    }
1232
1233    #[tokio::test]
1234    async fn test_leaderboard_methods() {
1235        use crate::models::session::SessionMetadata;
1236        use chrono::Utc;
1237
1238        let dir = tempdir().unwrap();
1239        let store = DataStore::with_defaults(dir.path().to_path_buf(), None);
1240
1241        let now = Utc::now();
1242
1243        // Add sessions with varying tokens
1244        let test_data = vec![
1245            ("session-1", 5000u64, "opus", 0),
1246            ("session-2", 3000u64, "sonnet", 1),
1247            ("session-3", 8000u64, "haiku", 0),
1248            ("session-4", 2000u64, "sonnet", 2),
1249            ("session-5", 10000u64, "opus", 0),
1250        ];
1251
1252        for (id, tokens, model, days_ago) in test_data {
1253            let session = SessionMetadata {
1254                id: id.into(),
1255                file_path: std::path::PathBuf::from(format!("/{}.jsonl", id)),
1256                project_path: "/test".into(),
1257                first_timestamp: Some(now - chrono::Duration::days(days_ago)),
1258                last_timestamp: Some(now),
1259                message_count: 10,
1260                total_tokens: tokens,
1261                input_tokens: tokens / 2,
1262                output_tokens: tokens / 2,
1263                cache_creation_tokens: 0,
1264                cache_read_tokens: 0,
1265                models_used: vec![model.to_string()],
1266                file_size_bytes: 1024,
1267                first_user_message: None,
1268                has_subagents: false,
1269                duration_seconds: Some(1800),
1270                branch: None,
1271                tool_usage: std::collections::HashMap::new(),
1272            };
1273            store.sessions.insert(session.id.clone(), Arc::new(session));
1274        }
1275
1276        // Test top_sessions_by_tokens
1277        let top_sessions = store.top_sessions_by_tokens(3);
1278        assert_eq!(top_sessions.len(), 3);
1279        assert_eq!(top_sessions[0].id, "session-5"); // 10000 tokens
1280        assert_eq!(top_sessions[1].id, "session-3"); // 8000 tokens
1281        assert_eq!(top_sessions[2].id, "session-1"); // 5000 tokens
1282
1283        // Test top_models_by_tokens
1284        let top_models = store.top_models_by_tokens();
1285        assert!(!top_models.is_empty());
1286        // opus: 15000 (5000+10000), sonnet: 5000 (3000+2000), haiku: 8000
1287        assert_eq!(top_models[0].0, "opus");
1288        assert_eq!(top_models[0].1, 15000);
1289        assert_eq!(top_models[1].0, "haiku");
1290        assert_eq!(top_models[1].1, 8000);
1291
1292        // Test top_days_by_tokens
1293        let top_days = store.top_days_by_tokens();
1294        assert!(!top_days.is_empty());
1295        // Day 0 (today): 5000+8000+10000 = 23000
1296        let today = now.format("%Y-%m-%d").to_string();
1297        assert_eq!(top_days[0].0, today);
1298        assert_eq!(top_days[0].1, 23000);
1299    }
1300
1301    /// C3: DashMap takes priority over SQLite in all_violations()
1302    ///
1303    /// Verifies the merge strategy:
1304    /// - Same session_id in both → DashMap version returned (fresher)
1305    /// - Session only in SQLite → SQLite version returned (fills the gap)
1306    #[test]
1307    fn test_all_violations_dashmap_priority_over_sqlite() {
1308        use crate::models::activity::{ActivitySummary, Alert, AlertCategory, AlertSeverity};
1309        use chrono::Utc;
1310
1311        let dir = tempdir().unwrap();
1312        let claude_home = dir.path().to_path_buf();
1313        let store = DataStore::with_defaults(claude_home.clone(), None);
1314
1315        let now = Utc::now();
1316
1317        // ── Setup: SQLite alert (via MetadataCache directly) ──────────────────
1318        let cache = MetadataCache::new(&claude_home.join("cache"))
1319            .expect("MetadataCache should open in tempdir");
1320
1321        // "shared-session" exists in SQLite with a Warning-level alert
1322        let sqlite_summary = ActivitySummary {
1323            alerts: vec![Alert {
1324                session_id: "shared-session".to_string(),
1325                timestamp: now,
1326                severity: AlertSeverity::Warning,
1327                category: AlertCategory::DestructiveCommand,
1328                detail: "sqlite-version".to_string(),
1329            }],
1330            ..Default::default()
1331        };
1332        cache
1333            .put_activity(
1334                std::path::Path::new("/projects/test/shared-session.jsonl"),
1335                "shared-session",
1336                &sqlite_summary,
1337                std::time::SystemTime::now(),
1338            )
1339            .expect("put_activity should succeed");
1340
1341        // "sqlite-only-session" exists exclusively in SQLite
1342        let sqlite_only_summary = ActivitySummary {
1343            alerts: vec![Alert {
1344                session_id: "sqlite-only-session".to_string(),
1345                timestamp: now,
1346                severity: AlertSeverity::Info,
1347                category: AlertCategory::ExternalExfil,
1348                detail: "sqlite-only-detail".to_string(),
1349            }],
1350            ..Default::default()
1351        };
1352        cache
1353            .put_activity(
1354                std::path::Path::new("/projects/test/sqlite-only-session.jsonl"),
1355                "sqlite-only-session",
1356                &sqlite_only_summary,
1357                std::time::SystemTime::now(),
1358            )
1359            .expect("put_activity should succeed");
1360
1361        // Attach the same DB to the store's metadata_cache field
1362        // (accessible from within the same module in #[cfg(test)])
1363        // The store already opened the same cache dir during with_defaults(),
1364        // so both share the same SQLite file.
1365        // We must write through the *store*'s own cache handle to avoid lock
1366        // conflicts. Retrieve it:
1367        let store_cache = store
1368            .metadata_cache
1369            .as_ref()
1370            .expect("MetadataCache should be present in store");
1371
1372        store_cache
1373            .put_activity(
1374                std::path::Path::new("/projects/test/shared-session.jsonl"),
1375                "shared-session",
1376                &sqlite_summary,
1377                std::time::SystemTime::now(),
1378            )
1379            .expect("put_activity via store cache should succeed");
1380        store_cache
1381            .put_activity(
1382                std::path::Path::new("/projects/test/sqlite-only-session.jsonl"),
1383                "sqlite-only-session",
1384                &sqlite_only_summary,
1385                std::time::SystemTime::now(),
1386            )
1387            .expect("put_activity via store cache should succeed");
1388
1389        // ── Setup: DashMap alert for the shared session (fresher, Critical) ───
1390        let dashmap_summary = ActivitySummary {
1391            alerts: vec![Alert {
1392                session_id: "shared-session".to_string(),
1393                timestamp: now,
1394                severity: AlertSeverity::Critical,
1395                category: AlertCategory::ForcePush,
1396                detail: "dashmap-version".to_string(),
1397            }],
1398            ..Default::default()
1399        };
1400        store
1401            .activity_results
1402            .insert("shared-session".to_string(), dashmap_summary);
1403
1404        // ── Assert ────────────────────────────────────────────────────────────
1405        let violations = store.all_violations();
1406
1407        // The DashMap version must appear
1408        let dashmap_hit = violations.iter().find(|a| a.session_id == "shared-session");
1409        assert!(
1410            dashmap_hit.is_some(),
1411            "shared-session alert must appear in violations"
1412        );
1413        assert_eq!(
1414            dashmap_hit.unwrap().detail,
1415            "dashmap-version",
1416            "DashMap version must take priority over SQLite for shared session"
1417        );
1418        assert_eq!(
1419            dashmap_hit.unwrap().severity,
1420            AlertSeverity::Critical,
1421            "DashMap severity (Critical) must win over SQLite (Warning)"
1422        );
1423
1424        // The SQLite-only version must appear (fills the gap)
1425        let sqlite_hit = violations
1426            .iter()
1427            .find(|a| a.session_id == "sqlite-only-session");
1428        assert!(
1429            sqlite_hit.is_some(),
1430            "sqlite-only-session must appear in violations (no DashMap entry for it)"
1431        );
1432        assert_eq!(sqlite_hit.unwrap().detail, "sqlite-only-detail");
1433
1434        // The SQLite version of shared-session must NOT appear
1435        let sqlite_dup = violations
1436            .iter()
1437            .filter(|a| a.session_id == "shared-session")
1438            .count();
1439        assert_eq!(
1440            sqlite_dup, 1,
1441            "shared-session must appear exactly once (DashMap wins, no SQLite duplicate)"
1442        );
1443
1444        // Sorting: Critical before Info
1445        assert_eq!(violations[0].severity, AlertSeverity::Critical);
1446    }
1447}