// ccboard_core/store.rs

//! Data store with DashMap + parking_lot::RwLock
//!
//! Uses DashMap for sessions (per-entry locking) and parking_lot::RwLock
//! for stats/settings (better fairness than std::sync::RwLock).
5
6use crate::analytics::{AnalyticsData, Period};
7use crate::bookmarks::BookmarkStore;
8use crate::cache::{MetadataCache, StoredAlert};
9use crate::error::{CoreError, DegradedState, LoadReport};
10use crate::event::{DataEvent, EventBus};
11use crate::models::activity::ActivitySummary;
12use crate::models::{
13    BillingBlockManager, InvocationStats, MergedConfig, SessionId, SessionMetadata, StatsCache,
14};
15use crate::parsers::{
16    classify_tool_calls, parse_claude_global, parse_tool_calls, ClaudeGlobalStats, CodexParser,
17    CursorParser, InvocationParser, McpConfig, OpenCodeParser, Rules, SessionContentParser,
18    SessionIndexParser, SettingsParser, StatsParser,
19};
20use dashmap::DashMap;
21use moka::future::Cache;
22use parking_lot::RwLock; // parking_lot > std::sync::RwLock: smaller (40B vs 72B), no poisoning, better fairness
23use std::path::{Path, PathBuf};
24use std::sync::Arc;
25use std::time::Duration;
26use tracing::{debug, info, warn};
27
/// Tunables for the data store: cache sizes, scan parallelism and stats-parser retries.
#[derive(Debug, Clone)]
pub struct DataStoreConfig {
    /// Upper bound on session metadata entries kept after the Claude session scan
    pub max_session_metadata_count: usize,

    /// Size budget for the session content cache, in megabytes
    pub max_session_content_cache_mb: usize,

    /// How many sessions may be scanned concurrently
    pub max_concurrent_scans: usize,

    /// Number of retries the stats parser is configured with
    pub stats_retry_count: u32,

    /// Pause between stats parser retries
    pub stats_retry_delay: Duration,
}

impl Default for DataStoreConfig {
    /// 10 000 sessions, a 100 MB content cache, 8 concurrent scans and
    /// 3 stats retries spaced 100 ms apart.
    fn default() -> Self {
        let stats_retry_delay = Duration::from_millis(100);
        Self {
            max_session_metadata_count: 10_000,
            max_session_content_cache_mb: 100,
            max_concurrent_scans: 8,
            stats_retry_count: 3,
            stats_retry_delay,
        }
    }
}
58
59/// Per-server MCP usage statistics aggregated from analyzed sessions
60#[derive(Debug, Clone)]
61pub struct McpCallStat {
62    pub server_name: String,
63    pub call_count: usize,
64    pub session_count: usize,
65    pub last_seen: Option<chrono::DateTime<chrono::Utc>>,
66}
67
68/// Central data store for ccboard
69///
70/// Thread-safe access to all Claude Code data.
71/// Uses DashMap for sessions (high contention) and RwLock for stats/settings (low contention).
72pub struct DataStore {
73    /// Path to Claude home directory
74    claude_home: PathBuf,
75
76    /// Current project path (if focused)
77    project_path: Option<PathBuf>,
78
79    /// Configuration
80    config: DataStoreConfig,
81
82    /// Stats cache (low contention, frequent reads)
83    stats: RwLock<Option<StatsCache>>,
84
85    /// Merged settings
86    settings: RwLock<MergedConfig>,
87
88    /// MCP server configuration
89    mcp_config: RwLock<Option<McpConfig>>,
90
91    /// Rules from CLAUDE.md files
92    rules: RwLock<Rules>,
93
94    /// Invocation statistics (agents, commands, skills)
95    invocation_stats: RwLock<InvocationStats>,
96
97    /// Billing blocks (5h usage tracking)
98    billing_blocks: RwLock<BillingBlockManager>,
99
100    /// Analytics data cache (invalidated on stats/sessions update)
101    analytics_cache: RwLock<Option<AnalyticsData>>,
102
103    /// Discover pattern analysis cache
104    discover_cache: RwLock<Option<Vec<crate::analytics::DiscoverSuggestion>>>,
105
106    /// Session metadata (high contention with many entries)
107    /// Arc<SessionMetadata> for cheap cloning (8 bytes vs ~400 bytes)
108    ///
109    /// Why Arc over Box: Multi-thread access from TUI + Web frontends
110    /// justifies atomic refcount overhead (~4 bytes). Box would require
111    /// cloning entire struct on each frontend access.
112    sessions: DashMap<SessionId, Arc<SessionMetadata>>,
113
114    /// Session content cache (LRU for on-demand loading)
115    #[allow(dead_code)]
116    session_content_cache: Cache<SessionId, Vec<String>>,
117
118    /// Event bus for notifying subscribers
119    event_bus: EventBus,
120
121    /// Current degraded state
122    degraded_state: RwLock<DegradedState>,
123
124    /// Metadata cache for 90% startup speedup (optional)
125    metadata_cache: Option<Arc<MetadataCache>>,
126
127    /// In-memory activity analysis results (populated by analyze_session)
128    activity_results: DashMap<String, ActivitySummary>,
129
130    /// Hook-based live session state (loaded from ~/.ccboard/live-sessions.json)
131    live_hook_sessions: RwLock<crate::hook_state::LiveSessionFile>,
132
133    /// Per-project last session stats from ~/.claude.json
134    claude_global_stats: RwLock<Option<ClaudeGlobalStats>>,
135
136    /// Session bookmarks persisted to ~/.ccboard/bookmarks.json
137    bookmark_store: RwLock<BookmarkStore>,
138
139    /// Summary store — reads cached summaries from ~/.ccboard/summaries/
140    summary_store: crate::summaries::SummaryStore,
141}
142
/// One row of the per-project leaderboard: session count plus token and cost totals.
#[derive(Debug, Clone)]
pub struct ProjectLeaderboardEntry {
    /// Name of the project this row aggregates
    pub project_name: String,
    /// Number of sessions attributed to the project
    pub total_sessions: usize,
    /// Tokens summed over those sessions
    pub total_tokens: u64,
    /// Cost summed over those sessions
    pub total_cost: f64,
    /// Average cost per session
    pub avg_session_cost: f64,
}
152
153impl DataStore {
154    /// Create a new data store
155    pub fn new(
156        claude_home: PathBuf,
157        project_path: Option<PathBuf>,
158        config: DataStoreConfig,
159    ) -> Self {
160        let session_content_cache = Cache::builder()
161            .max_capacity((config.max_session_content_cache_mb * 1024 * 1024 / 1000) as u64) // Rough estimate
162            .time_to_idle(Duration::from_secs(300)) // 5 min idle expiry
163            .build();
164
165        // Resolve ~/.ccboard dir (sibling of claude_home which is ~/.claude)
166        let ccboard_dir = claude_home
167            .parent()
168            .unwrap_or(&claude_home)
169            .join(".ccboard");
170
171        // Load bookmark store from ~/.ccboard/bookmarks.json
172        let bookmark_store = {
173            let bookmarks_path = ccboard_dir.join("bookmarks.json");
174            match BookmarkStore::load(&bookmarks_path) {
175                Ok(store) => store,
176                Err(e) => {
177                    warn!(error = %e, "Failed to load bookmark store, starting empty");
178                    BookmarkStore::default()
179                }
180            }
181        };
182
183        // Create metadata cache in ~/.claude/cache/
184        let metadata_cache = {
185            let cache_dir = claude_home.join("cache");
186            match MetadataCache::new(&cache_dir) {
187                Ok(cache) => {
188                    debug!(path = %cache_dir.display(), "Metadata cache enabled");
189                    Some(Arc::new(cache))
190                }
191                Err(e) => {
192                    warn!(error = %e, "Failed to create metadata cache, running without cache");
193                    None
194                }
195            }
196        };
197
198        Self {
199            claude_home,
200            project_path,
201            config,
202            stats: RwLock::new(None),
203            settings: RwLock::new(MergedConfig::default()),
204            mcp_config: RwLock::new(None),
205            rules: RwLock::new(Rules::default()),
206            invocation_stats: RwLock::new(InvocationStats::new()),
207            billing_blocks: RwLock::new(BillingBlockManager::new()),
208            analytics_cache: RwLock::new(None),
209            discover_cache: RwLock::new(None),
210            sessions: DashMap::new(),
211            session_content_cache,
212            event_bus: EventBus::default_capacity(),
213            degraded_state: RwLock::new(DegradedState::Healthy),
214            metadata_cache,
215            activity_results: DashMap::new(),
216            live_hook_sessions: RwLock::new(crate::hook_state::LiveSessionFile::default()),
217            claude_global_stats: RwLock::new(None),
218            bookmark_store: RwLock::new(bookmark_store),
219            summary_store: crate::summaries::SummaryStore::new(&ccboard_dir),
220        }
221    }
222
223    /// Create with default configuration
224    pub fn with_defaults(claude_home: PathBuf, project_path: Option<PathBuf>) -> Self {
225        Self::new(claude_home, project_path, DataStoreConfig::default())
226    }
227
228    /// Get the event bus for subscribing to updates
229    pub fn event_bus(&self) -> &EventBus {
230        &self.event_bus
231    }
232
233    /// Get current degraded state
234    pub fn degraded_state(&self) -> DegradedState {
235        self.degraded_state.read().clone()
236    }
237
238    /// Initial load of all data with LoadReport for graceful degradation
239    pub async fn initial_load(&self) -> LoadReport {
240        let mut report = LoadReport::new();
241
242        info!(claude_home = %self.claude_home.display(), "Starting initial data load");
243
244        // Load stats
245        self.load_stats(&mut report).await;
246
247        // Load ~/.claude.json global stats (per-project last session costs)
248        if let Some(home) = dirs::home_dir() {
249            if let Some(global) = parse_claude_global(&home) {
250                *self.claude_global_stats.write() = Some(global);
251                debug!("~/.claude.json loaded successfully");
252            }
253        }
254
255        // Load settings
256        self.load_settings(&mut report).await;
257
258        // Load MCP configuration
259        self.load_mcp_config(&mut report).await;
260
261        // Load rules
262        self.load_rules(&mut report).await;
263
264        // Scan sessions
265        self.scan_sessions(&mut report).await;
266
267        // Scan third-party AI tool sessions (Codex, OpenCode, Cursor)
268        self.scan_third_party_sessions(&mut report).await;
269
270        // Determine degraded state
271        self.update_degraded_state(&report);
272
273        // Notify subscribers
274        self.event_bus.publish(DataEvent::LoadCompleted);
275
276        info!(
277            stats_loaded = report.stats_loaded,
278            settings_loaded = report.settings_loaded,
279            sessions_scanned = report.sessions_scanned,
280            sessions_failed = report.sessions_failed,
281            errors = report.errors.len(),
282            "Initial load complete"
283        );
284
285        // Backfill has_subagents after all sessions are indexed
286        self.compute_has_subagents();
287
288        report
289    }
290
291    /// Load stats cache
292    async fn load_stats(&self, report: &mut LoadReport) {
293        let stats_path = self.claude_home.join("stats-cache.json");
294        let parser = StatsParser::new()
295            .with_retries(self.config.stats_retry_count, self.config.stats_retry_delay);
296
297        if let Some(mut stats) = parser.parse_graceful(&stats_path, report).await {
298            // Recalculate costs using accurate pricing
299            stats.recalculate_costs();
300            let mut guard = self.stats.write();
301            *guard = Some(stats);
302            debug!("Stats loaded successfully with recalculated costs");
303        }
304    }
305
306    /// Load and merge settings
307    async fn load_settings(&self, report: &mut LoadReport) {
308        let parser = SettingsParser::new();
309        let merged = parser
310            .load_merged(&self.claude_home, self.project_path.as_deref(), report)
311            .await;
312
313        let mut guard = self.settings.write();
314        *guard = merged;
315        debug!("Settings loaded and merged");
316    }
317
318    /// Load MCP server configuration (global + project-level)
319    async fn load_mcp_config(&self, report: &mut LoadReport) {
320        match McpConfig::load_merged(&self.claude_home, self.project_path.as_deref()) {
321            Ok(Some(config)) => {
322                let server_count = config.servers.len();
323                let mut guard = self.mcp_config.write();
324                *guard = Some(config);
325                debug!(
326                    server_count,
327                    "MCP config loaded successfully (global + project)"
328                );
329            }
330            Ok(None) => {
331                debug!("No MCP config found (optional)");
332            }
333            Err(e) => {
334                use crate::error::LoadError;
335                report.add_error(LoadError::error(
336                    "mcp_config",
337                    format!("Failed to parse MCP config: {}", e),
338                ));
339            }
340        }
341    }
342
343    /// Load rules from CLAUDE.md files
344    async fn load_rules(&self, report: &mut LoadReport) {
345        match Rules::load(&self.claude_home, self.project_path.as_deref()) {
346            Ok(rules) => {
347                let has_global = rules.global.is_some();
348                let has_project = rules.project.is_some();
349                let mut guard = self.rules.write();
350                *guard = rules;
351                debug!(has_global, has_project, "Rules loaded");
352            }
353            Err(e) => {
354                use crate::error::LoadError;
355                report.add_error(LoadError::error(
356                    "rules",
357                    format!("Failed to load rules: {}", e),
358                ));
359            }
360        }
361    }
362
363    /// Scan all sessions
364    async fn scan_sessions(&self, report: &mut LoadReport) {
365        let projects_dir = self.claude_home.join("projects");
366
367        if !projects_dir.exists() {
368            report.add_warning(
369                "sessions",
370                format!("Projects directory not found: {}", projects_dir.display()),
371            );
372            return;
373        }
374
375        let mut parser =
376            SessionIndexParser::new().with_concurrency(self.config.max_concurrent_scans);
377
378        // Enable metadata cache if available (90% speedup)
379        if let Some(ref cache) = self.metadata_cache {
380            parser = parser.with_cache(cache.clone());
381        }
382
383        let sessions = parser.scan_all(&projects_dir, report).await;
384
385        // Enforce max count limit
386        let sessions_to_add: Vec<_> = if sessions.len() > self.config.max_session_metadata_count {
387            warn!(
388                total = sessions.len(),
389                limit = self.config.max_session_metadata_count,
390                "Session count exceeds limit, keeping most recent"
391            );
392
393            let mut sorted = sessions;
394            sorted.sort_by(|a, b| b.last_timestamp.cmp(&a.last_timestamp));
395            sorted.truncate(self.config.max_session_metadata_count);
396            sorted
397        } else {
398            sessions
399        };
400
401        // Insert into DashMap (wrap in Arc for cheap cloning)
402        for session in sessions_to_add {
403            self.sessions.insert(session.id.clone(), Arc::new(session));
404        }
405
406        debug!(count = self.sessions.len(), "Sessions indexed");
407    }
408
409    /// Scan third-party AI tool sessions (Codex, OpenCode, Cursor).
410    ///
411    /// Each parser is skipped silently when its data directory does not exist,
412    /// which is the common case for users who only use Claude Code.
413    async fn scan_third_party_sessions(&self, _report: &mut LoadReport) {
414        // Codex
415        if let Some(codex_dir) = CodexParser::default_path() {
416            if codex_dir.exists() {
417                let sessions = CodexParser::scan(&codex_dir).await;
418                let count = sessions.len();
419                for s in sessions {
420                    self.sessions.insert(s.id.clone(), Arc::new(s));
421                }
422                debug!(count, "Codex sessions indexed");
423            }
424        }
425
426        // OpenCode
427        if let Some(db_path) = OpenCodeParser::default_path() {
428            if db_path.exists() {
429                let sessions = OpenCodeParser::scan(&db_path);
430                let count = sessions.len();
431                for s in sessions {
432                    self.sessions.insert(s.id.clone(), Arc::new(s));
433                }
434                debug!(count, "OpenCode sessions indexed");
435            }
436        }
437
438        // Cursor
439        if let Some(cursor_dir) = CursorParser::default_dir() {
440            if cursor_dir.exists() {
441                let sessions = CursorParser::scan(&cursor_dir);
442                let count = sessions.len();
443                for s in sessions {
444                    self.sessions.insert(s.id.clone(), Arc::new(s));
445                }
446                debug!(count, "Cursor sessions indexed");
447            }
448        }
449    }
450
451    /// Update degraded state based on load report
452    fn update_degraded_state(&self, report: &LoadReport) {
453        let mut state = self.degraded_state.write();
454
455        if report.has_fatal_errors() {
456            *state = DegradedState::ReadOnly {
457                reason: "Fatal errors during load".to_string(),
458            };
459            return;
460        }
461
462        let mut missing = Vec::new();
463
464        if !report.stats_loaded {
465            missing.push("stats".to_string());
466        }
467        if !report.settings_loaded {
468            missing.push("settings".to_string());
469        }
470        if report.sessions_failed > 0 {
471            missing.push(format!("{} sessions", report.sessions_failed));
472        }
473
474        if missing.is_empty() {
475            *state = DegradedState::Healthy;
476        } else {
477            *state = DegradedState::PartialData {
478                missing: missing.clone(),
479                reason: format!("Missing: {}", missing.join(", ")),
480            };
481        }
482    }
483
484    // ===================
485    // Read accessors
486    // ===================
487
488    /// Get a clone of stats
489    pub fn stats(&self) -> Option<StatsCache> {
490        self.stats.read().clone()
491    }
492
493    /// Calculate context window saturation from current sessions
494    pub fn context_window_stats(&self) -> crate::models::ContextWindowStats {
495        // Clone Arc (cheap) to avoid lifetime issues with DashMap iterators
496        let sessions: Vec<_> = self
497            .sessions
498            .iter()
499            .map(|entry| Arc::clone(entry.value()))
500            .collect();
501        // Dereference Arc to get &SessionMetadata
502        let refs: Vec<_> = sessions.iter().map(|s| s.as_ref()).collect();
503        crate::models::StatsCache::calculate_context_saturation(&refs, 30)
504    }
505
506    /// Get merged settings
507    pub fn settings(&self) -> MergedConfig {
508        self.settings.read().clone()
509    }
510
511    /// Get MCP server configuration
512    pub fn mcp_config(&self) -> Option<McpConfig> {
513        self.mcp_config.read().clone()
514    }
515
516    /// Get rules
517    pub fn rules(&self) -> Rules {
518        self.rules.read().clone()
519    }
520
521    /// Get invocation statistics
522    pub fn invocation_stats(&self) -> InvocationStats {
523        self.invocation_stats.read().clone()
524    }
525
526    /// Calculate current quota status from stats and budget config
527    ///
528    /// Returns None if stats are not loaded or budget is not configured.
529    pub fn quota_status(&self) -> Option<crate::quota::QuotaStatus> {
530        let stats = self.stats.read().clone()?;
531        let settings = self.settings.read();
532        let budget = settings.merged.budget.as_ref()?;
533
534        Some(crate::quota::calculate_quota_status(&stats, budget))
535    }
536
537    /// Get live Claude Code sessions (running processes, ps-based)
538    ///
539    /// Detects active Claude processes on the system and returns metadata.
540    /// Returns empty vector if detection fails or no processes are running.
541    pub fn live_sessions(&self) -> Vec<crate::live_monitor::LiveSession> {
542        crate::live_monitor::detect_live_sessions().unwrap_or_default()
543    }
544
545    /// Get merged live sessions: hook data + ps-based fallback
546    ///
547    /// Hook sessions are prioritized; unmatched ps sessions appear as ProcessOnly.
548    pub fn merged_live_sessions(&self) -> Vec<crate::live_monitor::MergedLiveSession> {
549        let hook_file = self.live_hook_sessions.read().clone();
550        let ps_sessions = crate::live_monitor::detect_live_sessions().unwrap_or_default();
551        crate::live_monitor::merge_live_sessions(&hook_file, &ps_sessions)
552    }
553
554    /// Reload hook-based live session state from a file path
555    pub async fn reload_live_hook_sessions(&self, path: &std::path::Path) {
556        match crate::hook_state::LiveSessionFile::load(path) {
557            Ok(file) => {
558                *self.live_hook_sessions.write() = file;
559                debug!("Reloaded live hook sessions from {}", path.display());
560            }
561            Err(e) => {
562                warn!(error = %e, "Failed to reload live-sessions.json");
563            }
564        }
565    }
566
567    /// Get per-project last session stats from ~/.claude.json
568    pub fn claude_global_stats(&self) -> Option<ClaudeGlobalStats> {
569        self.claude_global_stats.read().clone()
570    }
571
572    /// Get session count
573    pub fn session_count(&self) -> usize {
574        self.sessions.len()
575    }
576
577    /// Get session by ID
578    /// Returns Arc<SessionMetadata> for cheap cloning
579    pub fn get_session(&self, id: &str) -> Option<Arc<SessionMetadata>> {
580        self.sessions.get(id).map(|r| Arc::clone(r.value()))
581    }
582
583    /// Load full session content with lazy caching
584    ///
585    /// Returns conversation messages parsed from session JSONL file.
586    /// Uses Moka cache (LRU with 5min TTL) for repeated access.
587    ///
588    /// # Performance
589    /// - First call: Parse JSONL (~50-500ms for 1000-message session)
590    /// - Cached calls: <1ms (memory lookup)
591    /// - Cache eviction: LRU + 5min idle timeout
592    ///
593    /// # Errors
594    /// Returns CoreError if session not found or file cannot be read.
595    pub async fn load_session_content(
596        &self,
597        session_id: &str,
598    ) -> Result<Vec<crate::models::ConversationMessage>, CoreError> {
599        // Get session metadata
600        let metadata = self
601            .get_session(session_id)
602            .ok_or_else(|| CoreError::SessionNotFound {
603                session_id: session_id.to_string(),
604            })?;
605
606        // Try cache first (Moka handles concurrency internally)
607        let session_id_owned = SessionId::from(session_id);
608        if let Some(_cached) = self.session_content_cache.get(&session_id_owned).await {
609            debug!(session_id, "Session content cache HIT");
610            // TODO: Cache design decision - caching Vec<String> vs Vec<ConversationMessage>
611            // For now, always parse from file (will be optimized in cache phase)
612        }
613
614        // Cache miss: parse from file
615        debug!(
616            session_id,
617            path = %metadata.file_path.display(),
618            "Session content cache MISS, parsing JSONL"
619        );
620
621        let messages = SessionContentParser::parse_conversation(
622            &metadata.file_path,
623            (*metadata).clone(), // Clone metadata out of Arc
624        )
625        .await?;
626
627        // Note: Cache insertion skipped for now (caching Vec<String> vs Vec<ConversationMessage> design decision)
628        // Will be added in cache optimization phase
629
630        Ok(messages)
631    }
632
633    /// Get analytics data for a period (cached)
634    ///
635    /// Returns cached analytics if available, otherwise None.
636    /// Call `compute_analytics()` to compute and cache.
637    pub fn analytics(&self) -> Option<AnalyticsData> {
638        let analytics = self.analytics_cache.read().clone();
639        debug!(
640            has_analytics = analytics.is_some(),
641            "analytics() getter called"
642        );
643        analytics
644    }
645
646    /// Compute and cache analytics data for a period
647    ///
648    /// This is a CPU-intensive operation (trends, forecasting, patterns).
649    /// For 1000+ sessions, this may take 100-300ms, so it's offloaded
650    /// to a blocking task.
651    ///
652    /// Cache is invalidated on stats reload or session updates (EventBus pattern).
653    pub async fn compute_analytics(&self, period: Period) {
654        let sessions: Vec<_> = self
655            .sessions
656            .iter()
657            .map(|r| Arc::clone(r.value()))
658            .collect();
659
660        info!(
661            session_count = sessions.len(),
662            period = ?period,
663            "compute_analytics() ENTRY"
664        );
665
666        // Pick up custom anomaly thresholds from merged settings (if configured)
667        let thresholds = self
668            .settings()
669            .global
670            .as_ref()
671            .and_then(|s| s.anomaly_thresholds.clone())
672            .unwrap_or_default();
673
674        // Offload to blocking task for CPU-intensive computation
675        let analytics = tokio::task::spawn_blocking(move || {
676            AnalyticsData::compute_with_thresholds(&sessions, period, &thresholds)
677        })
678        .await;
679
680        match analytics {
681            Ok(data) => {
682                info!(
683                    insights_count = data.insights.len(),
684                    "compute_analytics() computed data"
685                );
686                let mut guard = self.analytics_cache.write();
687                *guard = Some(data);
688                self.event_bus.publish(DataEvent::AnalyticsUpdated);
689                info!("compute_analytics() EXIT - cached and event published");
690            }
691            Err(e) => {
692                warn!(error = %e, "Failed to compute analytics (task panicked)");
693            }
694        }
695    }
696
697    /// Get cached discover suggestions (None if not yet computed)
698    pub fn discover(&self) -> Option<Vec<crate::analytics::DiscoverSuggestion>> {
699        self.discover_cache.read().clone()
700    }
701
702    /// Extract user messages from recent sessions and run pattern discovery.
703    ///
704    /// Loads JSONL content for up to `max_sessions` most recent sessions,
705    /// extracts user message text, and stores results in `discover_cache`.
706    /// Publishes `DataEvent::AnalyticsUpdated` on completion so the TUI re-renders.
707    pub async fn compute_discover(&self, max_sessions: usize, min_count: usize, top: usize) {
708        let sessions = self.recent_sessions(max_sessions);
709
710        info!(session_count = sessions.len(), "compute_discover() ENTRY");
711
712        let session_data = tokio::task::spawn_blocking(move || {
713            use std::io::BufRead;
714
715            let mut result: Vec<crate::analytics::DiscoverSessionData> = Vec::new();
716            for session in &sessions {
717                let path = session.file_path.clone();
718                let session_id = session.id.as_str().to_string();
719                let project = session.project_path.as_str().to_string();
720
721                let messages: Option<Vec<String>> = (|| {
722                    let file = std::fs::File::open(&path).ok()?;
723                    let reader = std::io::BufReader::new(file);
724                    let mut msgs: Vec<String> = Vec::new();
725                    for line in reader.lines().map_while(|l| l.ok()) {
726                        if let Ok(v) = serde_json::from_str::<serde_json::Value>(&line) {
727                            if v.get("type").and_then(|t| t.as_str()) == Some("user") {
728                                if let Some(content) =
729                                    v.get("message").and_then(|m| m.get("content"))
730                                {
731                                    let text = match content {
732                                        serde_json::Value::String(s) => s.clone(),
733                                        serde_json::Value::Array(arr) => arr
734                                            .iter()
735                                            .filter_map(|item| {
736                                                if item.get("type").and_then(|t| t.as_str())
737                                                    == Some("text")
738                                                {
739                                                    item.get("text")
740                                                        .and_then(|t| t.as_str())
741                                                        .map(|s| s.to_string())
742                                                } else {
743                                                    None
744                                                }
745                                            })
746                                            .collect::<Vec<_>>()
747                                            .join(" "),
748                                        _ => continue,
749                                    };
750                                    if !text.trim().is_empty() && text.len() > 10 {
751                                        msgs.push(text);
752                                    }
753                                }
754                            }
755                        }
756                    }
757                    if msgs.is_empty() {
758                        None
759                    } else {
760                        Some(msgs)
761                    }
762                })();
763
764                if let Some(messages) = messages {
765                    result.push(crate::analytics::DiscoverSessionData {
766                        session_id,
767                        project,
768                        messages,
769                    });
770                }
771            }
772            result
773        })
774        .await;
775
776        match session_data {
777            Ok(data) => {
778                let sessions_analyzed = data.len();
779                let suggestions = crate::analytics::discover_patterns(&data, min_count, top);
780                info!(
781                    sessions_analyzed,
782                    suggestions_count = suggestions.len(),
783                    "compute_discover() completed"
784                );
785                let mut guard = self.discover_cache.write();
786                *guard = Some(suggestions);
787                self.event_bus.publish(DataEvent::AnalyticsUpdated);
788            }
789            Err(e) => {
790                warn!(error = %e, "compute_discover() task panicked");
791            }
792        }
793    }
794
795    /// Invalidate analytics cache (called on data changes)
796    ///
797    /// Note: Currently unused to prevent aggressive invalidation.
798    /// Kept for future use if smart invalidation is needed.
799    #[allow(dead_code)]
800    fn invalidate_analytics_cache(&self) {
801        let mut guard = self.analytics_cache.write();
802        *guard = None;
803        debug!("Analytics cache invalidated");
804    }
805
806    /// Get all session IDs
807    pub fn session_ids(&self) -> Vec<SessionId> {
808        self.sessions.iter().map(|r| r.key().clone()).collect()
809    }
810
811    /// Clear session content cache (for memory optimization on F5)
812    pub fn clear_session_content_cache(&self) {
813        self.session_content_cache.invalidate_all();
814        debug!("Session content cache cleared");
815    }
816
817    /// Get sessions grouped by project
818    /// Returns Arc<SessionMetadata> for cheap cloning
819    pub fn sessions_by_project(
820        &self,
821    ) -> std::collections::HashMap<String, Vec<Arc<SessionMetadata>>> {
822        let mut by_project = std::collections::HashMap::new();
823
824        for entry in self.sessions.iter() {
825            let session = Arc::clone(entry.value());
826            by_project
827                .entry(session.project_path.as_str().to_string())
828                .or_insert_with(Vec::new)
829                .push(session);
830        }
831
832        // Sort sessions within each project by timestamp (newest first)
833        for sessions in by_project.values_mut() {
834            sessions.sort_by(|a, b| b.last_timestamp.cmp(&a.last_timestamp));
835        }
836
837        by_project
838    }
839
840    /// Get all sessions (unsorted)
841    /// Returns Arc<SessionMetadata> for cheap cloning
842    pub fn all_sessions(&self) -> Vec<Arc<SessionMetadata>> {
843        self.sessions
844            .iter()
845            .map(|r| Arc::clone(r.value()))
846            .collect()
847    }
848
849    /// Get recent sessions (sorted by last timestamp, newest first)
850    /// Returns Arc<SessionMetadata> for cheap cloning
851    pub fn recent_sessions(&self, limit: usize) -> Vec<Arc<SessionMetadata>> {
852        let mut sessions = self.all_sessions();
853        sessions.sort_by(|a, b| b.last_timestamp.cmp(&a.last_timestamp));
854        sessions.truncate(limit);
855        sessions
856    }
857
858    /// Search sessions using FTS5 full-text search.
859    ///
860    /// Returns relevance-ranked results. Returns empty vec if FTS5 not initialized.
861    pub fn search_sessions(&self, query: &str, limit: usize) -> Vec<crate::cache::SearchResult> {
862        if let Some(ref cache) = self.metadata_cache {
863            match cache.search_sessions(query, limit) {
864                Ok(results) => results,
865                Err(e) => {
866                    warn!("FTS5 search failed: {}", e);
867                    Vec::new()
868                }
869            }
870        } else {
871            Vec::new()
872        }
873    }
874
    /// Analyze a session's tool calls and generate activity summary + alerts.
    ///
    /// Results are stored in the in-memory DashMap and the SQLite cache.
    /// Publishes DataEvent::AnalyticsUpdated on completion so the TUI re-renders.
    ///
    /// Flow:
    /// 1. Look up the session's metadata; an unknown ID is an error.
    /// 2. Read the session file's mtime once. If it cannot be read, `UNIX_EPOCH`
    ///    is used as the cache key instead.
    /// 3. On a SQLite cache hit for `(path, mtime)`, reuse the cached summary.
    /// 4. Otherwise parse the JSONL tool calls, classify them, and write the result
    ///    back to SQLite. A cache-write failure is logged, not returned.
    ///
    /// Both the cache-hit and the cache-miss paths insert into `activity_results`
    /// and publish `DataEvent::AnalyticsUpdated`.
    ///
    /// # Errors
    /// Returns an error if `session_id` is not in the store, or if
    /// `parse_tool_calls` fails.
    pub async fn analyze_session(&self, session_id: &str) -> anyhow::Result<ActivitySummary> {
        use std::time::SystemTime;

        let metadata = self
            .get_session(session_id)
            .ok_or_else(|| anyhow::anyhow!("Session not found: {}", session_id))?;

        let path = &metadata.file_path;

        // Read mtime once — used for both cache check and cache write to avoid TOCTOU.
        // Use tokio::fs to avoid blocking the executor thread.
        // Any metadata/mtime error falls back to UNIX_EPOCH rather than failing here.
        let mtime = tokio::fs::metadata(path)
            .await
            .and_then(|m| m.modified())
            .unwrap_or(SystemTime::UNIX_EPOCH);

        // Check SQLite cache first (avoids re-parsing unchanged files)
        // A cache lookup error is treated the same as a miss.
        if let Some(cache) = &self.metadata_cache {
            if let Ok(Some(cached)) = cache.get_activity(path, mtime) {
                self.activity_results
                    .insert(session_id.to_string(), cached.clone());
                self.event_bus.publish(DataEvent::AnalyticsUpdated);
                return Ok(cached);
            }
        }

        // Cache miss: parse JSONL
        let calls = parse_tool_calls(path, session_id).await?;

        // Grandparent directory of the session file, handed to the classifier as the
        // project root. NOTE(review): for a file laid out as
        // `<claude_home>/projects/<dir>/<id>.jsonl` this resolves to the `projects`
        // directory, not the session's own project — confirm this is what
        // `classify_tool_calls` expects (vs. e.g. `metadata.project_path`).
        let project_root = path
            .parent()
            .and_then(|p| p.parent())
            .map(|p| p.to_string_lossy().into_owned());

        let summary = classify_tool_calls(calls, session_id, project_root.as_deref());

        // Persist to SQLite cache — same mtime as used for cache check (no TOCTOU)
        if let Some(cache) = &self.metadata_cache {
            if let Err(e) = cache.put_activity(path, session_id, &summary, mtime) {
                warn!(session_id, error = %e, "Failed to cache activity — will re-parse on restart");
            }
        }

        // Store in memory + notify TUI
        self.activity_results
            .insert(session_id.to_string(), summary.clone());
        self.event_bus.publish(DataEvent::AnalyticsUpdated);

        Ok(summary)
    }
929
930    /// Get the cached activity summary for a session (returns None if not yet analyzed).
931    pub fn get_session_activity(&self, session_id: &str) -> Option<ActivitySummary> {
932        self.activity_results
933            .get(session_id)
934            .map(|r| r.value().clone())
935    }
936
937    /// Get all stored security alerts from the SQLite cache.
938    ///
939    /// `min_severity`: optional filter — "Warning" or "Critical"
940    pub fn get_all_stored_alerts(&self, min_severity: Option<&str>) -> Vec<StoredAlert> {
941        if let Some(cache) = &self.metadata_cache {
942            cache.get_all_alerts(min_severity).unwrap_or_default()
943        } else {
944            vec![]
945        }
946    }
947
948    /// Aggregate MCP call stats from all analyzed sessions.
949    ///
950    /// Returns one entry per server that has been called, sorted by
951    /// call_count descending. Servers with 0 calls are omitted.
952    pub fn mcp_call_stats(&self) -> Vec<McpCallStat> {
953        use crate::models::activity::NetworkTool;
954        use std::collections::{HashMap, HashSet};
955
956        let mut stats: HashMap<String, McpCallStat> = HashMap::new();
957
958        for entry in self.activity_results.iter() {
959            let summary = entry.value();
960            let mut servers_in_session: HashSet<String> = HashSet::new();
961
962            for call in &summary.network_calls {
963                if let NetworkTool::McpCall { server } = &call.tool {
964                    let stat = stats.entry(server.clone()).or_insert_with(|| McpCallStat {
965                        server_name: server.clone(),
966                        call_count: 0,
967                        session_count: 0,
968                        last_seen: None,
969                    });
970                    stat.call_count += 1;
971                    match stat.last_seen {
972                        Some(existing) if call.timestamp > existing => {
973                            stat.last_seen = Some(call.timestamp);
974                        }
975                        None => {
976                            stat.last_seen = Some(call.timestamp);
977                        }
978                        _ => {}
979                    }
980                    servers_in_session.insert(server.clone());
981                }
982            }
983
984            for server in servers_in_session {
985                if let Some(stat) = stats.get_mut(&server) {
986                    stat.session_count += 1;
987                }
988            }
989        }
990
991        let mut result: Vec<McpCallStat> = stats.into_values().collect();
992        result.sort_by(|a, b| b.call_count.cmp(&a.call_count));
993        result
994    }
995
996    /// Consolidated violations feed: merges in-memory DashMap results (freshest) with
997    /// SQLite-persisted alerts. DashMap takes priority for sessions analyzed this run.
998    ///
999    /// Returns alerts sorted Critical → Warning → Info, then by timestamp descending.
1000    pub fn all_violations(&self) -> Vec<crate::models::activity::Alert> {
1001        use crate::models::activity::{Alert, AlertSeverity};
1002        use std::collections::HashSet;
1003
1004        // Collect session_ids already covered by the DashMap (in-memory, freshest data)
1005        let mut seen_sessions: HashSet<String> = HashSet::new();
1006        let mut alerts: Vec<Alert> = Vec::new();
1007
1008        for entry in self.activity_results.iter() {
1009            seen_sessions.insert(entry.key().clone());
1010            alerts.extend(entry.value().alerts.clone());
1011        }
1012
1013        // Supplement with SQLite alerts for sessions NOT in the DashMap
1014        if let Some(cache) = &self.metadata_cache {
1015            if let Ok(stored) = cache.get_all_alerts(None) {
1016                for sa in stored {
1017                    // Derive session_id from session_path (filename without extension)
1018                    let session_id = std::path::Path::new(&sa.session_path)
1019                        .file_stem()
1020                        .and_then(|s| s.to_str())
1021                        .unwrap_or(&sa.session_path)
1022                        .to_string();
1023
1024                    if seen_sessions.contains(&session_id) {
1025                        continue; // DashMap version is fresher, skip SQLite duplicate
1026                    }
1027
1028                    // Parse severity and category from stored strings
1029                    let severity = match sa.severity.as_str() {
1030                        "Critical" => AlertSeverity::Critical,
1031                        "Warning" => AlertSeverity::Warning,
1032                        _ => AlertSeverity::Info,
1033                    };
1034                    let category = match sa.category.as_str() {
1035                        "CredentialAccess" => {
1036                            crate::models::activity::AlertCategory::CredentialAccess
1037                        }
1038                        "DestructiveCommand" => {
1039                            crate::models::activity::AlertCategory::DestructiveCommand
1040                        }
1041                        "ForcePush" => crate::models::activity::AlertCategory::ForcePush,
1042                        "ScopeViolation" => crate::models::activity::AlertCategory::ScopeViolation,
1043                        _ => crate::models::activity::AlertCategory::ExternalExfil,
1044                    };
1045                    let timestamp = sa
1046                        .timestamp
1047                        .parse::<chrono::DateTime<chrono::Utc>>()
1048                        .unwrap_or_else(|_| chrono::Utc::now());
1049
1050                    alerts.push(Alert {
1051                        session_id,
1052                        timestamp,
1053                        severity,
1054                        category,
1055                        detail: sa.detail,
1056                    });
1057                }
1058            }
1059        }
1060
1061        // Sort: Critical > Warning > Info, then newest first within same severity
1062        alerts.sort_by(|a, b| {
1063            b.severity
1064                .partial_cmp(&a.severity)
1065                .unwrap_or(std::cmp::Ordering::Equal)
1066                .then_with(|| b.timestamp.cmp(&a.timestamp))
1067        });
1068
1069        alerts
1070    }
1071
1072    /// Get top sessions by total tokens (sorted descending)
1073    /// Returns Arc<SessionMetadata> for cheap cloning
1074    pub fn top_sessions_by_tokens(&self, limit: usize) -> Vec<Arc<SessionMetadata>> {
1075        let mut sessions: Vec<_> = self
1076            .sessions
1077            .iter()
1078            .map(|r| Arc::clone(r.value()))
1079            .collect();
1080        sessions.sort_by(|a, b| b.total_tokens.cmp(&a.total_tokens));
1081        sessions.truncate(limit);
1082        sessions
1083    }
1084
1085    /// Get top models by total tokens (aggregated, sorted descending)
1086    /// Returns (model_name, total_tokens) pairs
1087    pub fn top_models_by_tokens(&self) -> Vec<(String, u64)> {
1088        let mut model_totals = std::collections::HashMap::new();
1089
1090        // Aggregate tokens per model across all sessions
1091        for session in self.sessions.iter() {
1092            for model in &session.value().models_used {
1093                *model_totals.entry(model.clone()).or_insert(0) += session.value().total_tokens;
1094            }
1095        }
1096
1097        // Convert to vec and sort
1098        let mut results: Vec<_> = model_totals.into_iter().collect();
1099        results.sort_by(|a, b| b.1.cmp(&a.1));
1100        results.truncate(10); // Top 10
1101        results
1102    }
1103
1104    /// Get top days by total tokens (aggregated, sorted descending)
1105    /// Returns (date_string, total_tokens) pairs
1106    pub fn top_days_by_tokens(&self) -> Vec<(String, u64)> {
1107        let mut day_totals = std::collections::HashMap::new();
1108
1109        // Aggregate tokens per day across all sessions
1110        for session in self.sessions.iter() {
1111            if let Some(timestamp) = &session.value().first_timestamp {
1112                let date = timestamp.format("%Y-%m-%d").to_string();
1113                *day_totals.entry(date).or_insert(0) += session.value().total_tokens;
1114            }
1115        }
1116
1117        // Convert to vec and sort
1118        let mut results: Vec<_> = day_totals.into_iter().collect();
1119        results.sort_by(|a, b| b.1.cmp(&a.1));
1120        results.truncate(10); // Top 10
1121        results
1122    }
1123
1124    /// Get project leaderboard with aggregated metrics
1125    ///
1126    /// Returns all projects with session count, total tokens, total cost, and average session cost.
1127    /// Cost is calculated using accurate model-based pricing from the pricing module.
1128    pub fn projects_leaderboard(&self) -> Vec<ProjectLeaderboardEntry> {
1129        let mut project_metrics = std::collections::HashMap::new();
1130
1131        // Aggregate metrics per project
1132        for session in self.sessions.iter() {
1133            let metadata = session.value();
1134            let project_path = &metadata.project_path;
1135
1136            // Get model for this session (use first model, or "unknown")
1137            let model = metadata
1138                .models_used
1139                .first()
1140                .map(|s| s.as_str())
1141                .unwrap_or("unknown");
1142
1143            // Calculate cost using accurate pricing
1144            let cost = crate::pricing::calculate_cost(
1145                model,
1146                metadata.input_tokens,
1147                metadata.output_tokens,
1148                metadata.cache_creation_tokens,
1149                metadata.cache_read_tokens,
1150            );
1151
1152            let entry = project_metrics
1153                .entry(project_path.clone())
1154                .or_insert((0, 0u64, 0.0f64)); // (session_count, total_tokens, total_cost)
1155
1156            entry.0 += 1; // session count
1157            entry.1 += metadata.total_tokens; // total tokens
1158            entry.2 += cost; // total cost
1159        }
1160
1161        // Convert to leaderboard entries
1162        let mut results: Vec<_> = project_metrics
1163            .into_iter()
1164            .map(
1165                |(project_path, (session_count, total_tokens, total_cost))| {
1166                    let avg_session_cost = if session_count > 0 {
1167                        total_cost / session_count as f64
1168                    } else {
1169                        0.0
1170                    };
1171
1172                    // Extract project name from path (last component)
1173                    let project_name = std::path::Path::new(project_path.as_str())
1174                        .file_name()
1175                        .and_then(|n| n.to_str())
1176                        .unwrap_or(project_path.as_str())
1177                        .to_string();
1178
1179                    ProjectLeaderboardEntry {
1180                        project_name,
1181                        total_sessions: session_count,
1182                        total_tokens,
1183                        total_cost,
1184                        avg_session_cost,
1185                    }
1186                },
1187            )
1188            .collect();
1189
1190        // Default sort: by total cost descending
1191        results.sort_by(|a, b| {
1192            b.total_cost
1193                .partial_cmp(&a.total_cost)
1194                .unwrap_or(std::cmp::Ordering::Equal)
1195        });
1196
1197        results
1198    }
1199
1200    // ===================
1201    // Update methods (called by watcher)
1202    // ===================
1203
1204    /// Reload stats (called on file change)
1205    pub async fn reload_stats(&self) {
1206        let stats_path = self.claude_home.join("stats-cache.json");
1207        let parser = StatsParser::new()
1208            .with_retries(self.config.stats_retry_count, self.config.stats_retry_delay);
1209
1210        let mut report = LoadReport::new();
1211        if let Some(mut stats) = parser.parse_graceful(&stats_path, &mut report).await {
1212            // Recalculate costs using accurate pricing
1213            stats.recalculate_costs();
1214            let mut guard = self.stats.write();
1215            *guard = Some(stats);
1216
1217            // Don't invalidate analytics - it will auto-recompute if needed
1218            // Instead, just publish the event so UI can decide whether to recompute
1219            self.event_bus.publish(DataEvent::StatsUpdated);
1220            debug!("Stats reloaded with recalculated costs");
1221        }
1222    }
1223
1224    /// Reload settings from files (called when settings change)
1225    pub async fn reload_settings(&self) {
1226        let parser = SettingsParser::new();
1227        let merged = parser
1228            .load_merged(
1229                &self.claude_home,
1230                self.project_path.as_deref(),
1231                &mut LoadReport::new(),
1232            )
1233            .await;
1234
1235        {
1236            let mut guard = self.settings.write();
1237            *guard = merged;
1238        }
1239
1240        // Note: caller (watcher handle_event) publishes ConfigChanged after this returns.
1241        debug!("Settings reloaded");
1242    }
1243
1244    /// Add or update a session (called when session file changes)
1245    pub async fn update_session(&self, path: &Path) {
1246        let parser = SessionIndexParser::new();
1247
1248        match parser.scan_session(path).await {
1249            Ok(meta) => {
1250                let id = meta.id.clone();
1251                let is_new = !self.sessions.contains_key(&id);
1252
1253                self.sessions.insert(id.clone(), Arc::new(meta));
1254
1255                // Don't invalidate analytics on every session update - too aggressive
1256                // Analytics will be recomputed on demand or periodically
1257                // Only invalidate on significant changes (detected by UI)
1258
1259                if is_new {
1260                    self.event_bus.publish(DataEvent::SessionCreated(id));
1261                } else {
1262                    self.event_bus.publish(DataEvent::SessionUpdated(id));
1263                }
1264            }
1265            Err(e) => {
1266                warn!(path = %path.display(), error = %e, "Failed to update session");
1267            }
1268        }
1269    }
1270
1271    /// Compute invocation statistics from all sessions
1272    ///
1273    /// This scans all session files to count agent/command/skill invocations.
1274    /// Should be called after initial load or when sessions are updated.
1275    pub async fn compute_invocations(&self) {
1276        let paths: Vec<_> = self
1277            .sessions
1278            .iter()
1279            .map(|r| r.value().file_path.clone())
1280            .collect();
1281
1282        debug!(session_count = paths.len(), "Computing invocation stats");
1283
1284        let parser = InvocationParser::new();
1285        let mut stats = parser.scan_sessions(&paths).await;
1286
1287        // Populate agent_token_stats from session tool_token_usage
1288        // The Task tool tokens serve as a proxy for agent token consumption
1289        for session_ref in self.sessions.iter() {
1290            let session = session_ref.value();
1291            if let Some(&task_tokens) = session.tool_token_usage.get("Task") {
1292                // Distribute Task tool tokens equally among agents spawned in this session
1293                let agent_count =
1294                    session.tool_usage.get("Task").copied().unwrap_or(0).max(1) as u64;
1295                let tokens_per_agent = task_tokens / agent_count;
1296                // Attribute to all agent types found in stats that were invoked
1297                for agent_type in stats.agents.keys().cloned().collect::<Vec<_>>() {
1298                    *stats.agent_token_stats.entry(agent_type).or_insert(0) += tokens_per_agent;
1299                }
1300            }
1301        }
1302
1303        let mut guard = self.invocation_stats.write();
1304        *guard = stats;
1305
1306        debug!(
1307            agents = guard.agents.len(),
1308            commands = guard.commands.len(),
1309            skills = guard.skills.len(),
1310            total = guard.total_invocations(),
1311            "Invocation stats computed"
1312        );
1313
1314        // Note: Using LoadCompleted as there's no specific invocation stats event
1315        self.event_bus.publish(DataEvent::LoadCompleted);
1316    }
1317
1318    /// Compute billing blocks from all sessions
1319    ///
1320    /// This scans all sessions with timestamps and aggregates usage into 5-hour billing blocks.
1321    /// Uses real model pricing based on token breakdown for accurate cost calculation.
1322    pub async fn compute_billing_blocks(&self) {
1323        debug!("Computing billing blocks from sessions with real pricing");
1324
1325        let mut manager = BillingBlockManager::new();
1326        let mut sessions_with_timestamps = 0;
1327        let mut sessions_without_timestamps = 0;
1328
1329        for session in self.sessions.iter() {
1330            let metadata = session.value();
1331
1332            // Skip sessions without timestamps
1333            let Some(timestamp) = &metadata.first_timestamp else {
1334                sessions_without_timestamps += 1;
1335                continue;
1336            };
1337
1338            sessions_with_timestamps += 1;
1339
1340            // Get model for this session (use first model, or "unknown")
1341            let model = metadata
1342                .models_used
1343                .first()
1344                .map(|s| s.as_str())
1345                .unwrap_or("unknown");
1346
1347            // Calculate real cost using pricing table
1348            let cost = crate::pricing::calculate_cost(
1349                model,
1350                metadata.input_tokens,
1351                metadata.output_tokens,
1352                metadata.cache_creation_tokens,
1353                metadata.cache_read_tokens,
1354            );
1355
1356            manager.add_usage(
1357                timestamp,
1358                metadata.input_tokens,
1359                metadata.output_tokens,
1360                metadata.cache_creation_tokens,
1361                metadata.cache_read_tokens,
1362                cost,
1363            );
1364        }
1365
1366        debug!(
1367            sessions_with_timestamps,
1368            sessions_without_timestamps,
1369            blocks = manager.get_all_blocks().len(),
1370            "Billing blocks computed with real pricing"
1371        );
1372
1373        let mut guard = self.billing_blocks.write();
1374        *guard = manager;
1375
1376        self.event_bus.publish(DataEvent::LoadCompleted);
1377    }
1378
1379    /// Get billing blocks (read-only access)
1380    pub fn billing_blocks(&self) -> parking_lot::RwLockReadGuard<'_, BillingBlockManager> {
1381        self.billing_blocks.read()
1382    }
1383
1384    /// Calculate usage estimate based on billing blocks and subscription plan
1385    pub fn usage_estimate(&self) -> crate::usage_estimator::UsageEstimate {
1386        use crate::parsers::claude_global::DetectedPlan;
1387        use crate::usage_estimator::SubscriptionPlan;
1388
1389        let settings = self.settings();
1390
1391        // Priority 1: explicit override in settings.json (ccboard field)
1392        let plan = if let Some(s) = settings.merged.subscription_plan.as_ref() {
1393            SubscriptionPlan::parse(s)
1394        } else {
1395            // Priority 2: auto-detect from ~/.claude.json account fields
1396            let detected = self
1397                .claude_global_stats
1398                .read()
1399                .as_ref()
1400                .and_then(|g| g.detected_plan.clone());
1401
1402            match detected {
1403                Some(DetectedPlan::Pro) => SubscriptionPlan::Pro,
1404                Some(DetectedPlan::Max) => SubscriptionPlan::Max5x, // can't distinguish 5x vs 20x
1405                Some(DetectedPlan::Api) => SubscriptionPlan::Api,
1406                None => SubscriptionPlan::Unknown,
1407            }
1408        };
1409
1410        let billing_blocks = self.billing_blocks.read();
1411        crate::usage_estimator::calculate_usage_estimate(&billing_blocks, plan)
1412    }
1413
1414    /// Load ccboard user preferences from the cache directory.
1415    pub fn load_preferences(&self) -> crate::preferences::CcboardPreferences {
1416        let cache_dir = self.claude_home.join("cache");
1417        crate::preferences::CcboardPreferences::load(&cache_dir)
1418    }
1419
1420    /// Save ccboard user preferences to the cache directory.
1421    pub fn save_preferences(
1422        &self,
1423        prefs: &crate::preferences::CcboardPreferences,
1424    ) -> anyhow::Result<()> {
1425        let cache_dir = self.claude_home.join("cache");
1426        prefs.save(&cache_dir)
1427    }
1428
1429    // ── Bookmark accessors ───────────────────────────────────────────────────
1430
1431    /// Returns true if the session is bookmarked
1432    pub fn is_bookmarked(&self, session_id: &str) -> bool {
1433        self.bookmark_store.read().is_bookmarked(session_id)
1434    }
1435
1436    /// Returns the bookmark entry for a session, if any.
1437    /// Cloned to avoid holding the lock across an await point.
1438    pub fn bookmark_entry(&self, session_id: &str) -> Option<crate::bookmarks::BookmarkEntry> {
1439        self.bookmark_store.read().get(session_id).cloned()
1440    }
1441
1442    /// Toggle bookmark (add with default tag "bookmarked", or remove).
1443    /// Returns `true` if the session is now bookmarked.
1444    pub fn toggle_bookmark(&self, session_id: &str) -> anyhow::Result<bool> {
1445        self.bookmark_store.write().toggle(session_id, "bookmarked")
1446    }
1447
1448    /// Add or update a bookmark with a custom tag and optional note.
1449    pub fn upsert_bookmark(
1450        &self,
1451        session_id: &str,
1452        tag: impl Into<String>,
1453        note: Option<String>,
1454    ) -> anyhow::Result<()> {
1455        self.bookmark_store.write().upsert(session_id, tag, note)
1456    }
1457
1458    /// Remove a bookmark explicitly.
1459    pub fn remove_bookmark(&self, session_id: &str) -> anyhow::Result<bool> {
1460        self.bookmark_store.write().remove(session_id)
1461    }
1462
1463    /// Number of bookmarked sessions
1464    pub fn bookmark_count(&self) -> usize {
1465        self.bookmark_store.read().len()
1466    }
1467
1468    /// True if a cached LLM summary exists for this session
1469    pub fn has_summary(&self, session_id: &str) -> bool {
1470        self.summary_store.has_summary(session_id)
1471    }
1472
1473    /// Load cached summary text, or None if not yet generated
1474    pub fn load_summary(&self, session_id: &str) -> Option<String> {
1475        self.summary_store.load(session_id)
1476    }
1477
1478    /// Returns all direct subagent sessions of the given parent session ID.
1479    /// A session is a subagent if its `parent_session_id` == `parent_id`.
1480    pub fn subagent_children(&self, parent_id: &str) -> Vec<Arc<SessionMetadata>> {
1481        self.sessions
1482            .iter()
1483            .filter(|entry| {
1484                entry
1485                    .value()
1486                    .parent_session_id
1487                    .as_deref()
1488                    .map(|pid| pid == parent_id)
1489                    .unwrap_or(false)
1490            })
1491            .map(|entry| Arc::clone(entry.value()))
1492            .collect()
1493    }
1494
1495    /// Backfills `has_subagents` on all sessions based on cross-references.
1496    /// A session has subagents if any other session has `parent_session_id == this_id`.
1497    /// Called once after initial_load() completes.
1498    pub fn compute_has_subagents(&self) {
1499        // Collect all parent IDs referenced by child sessions
1500        let parent_ids: std::collections::HashSet<String> = self
1501            .sessions
1502            .iter()
1503            .filter_map(|entry| entry.value().parent_session_id.clone())
1504            .collect();
1505
1506        if parent_ids.is_empty() {
1507            return;
1508        }
1509
1510        // For each session that is referenced as a parent, set has_subagents = true
1511        for mut entry in self.sessions.iter_mut() {
1512            if parent_ids.contains(entry.value().id.as_str()) {
1513                let current: &SessionMetadata = entry.value();
1514                let updated = SessionMetadata {
1515                    has_subagents: true,
1516                    ..current.clone()
1517                };
1518                *entry.value_mut() = Arc::new(updated);
1519            }
1520        }
1521    }
1522}
1523
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Token breakdown for a [`session_fixture`].
    struct Tokens {
        total: u64,
        input: u64,
        output: u64,
        cache_creation: u64,
        cache_read: u64,
    }

    /// Builds a `SessionMetadata` for the `/test` project, stored at `/{id}.jsonl`.
    ///
    /// Only the fields the tests vary are parameters. Everything else is a fixed,
    /// neutral value: 10 messages, a 1024-byte file, a 1800 s duration, and no
    /// branch, subagents, tool usage or line changes.
    fn session_fixture(
        id: &str,
        first_timestamp: chrono::DateTime<chrono::Utc>,
        last_timestamp: chrono::DateTime<chrono::Utc>,
        model: &str,
        tokens: Tokens,
    ) -> crate::models::session::SessionMetadata {
        crate::models::session::SessionMetadata {
            id: id.into(),
            file_path: PathBuf::from(format!("/{id}.jsonl")),
            project_path: "/test".into(),
            first_timestamp: Some(first_timestamp),
            last_timestamp: Some(last_timestamp),
            message_count: 10,
            total_tokens: tokens.total,
            input_tokens: tokens.input,
            output_tokens: tokens.output,
            cache_creation_tokens: tokens.cache_creation,
            cache_read_tokens: tokens.cache_read,
            models_used: vec![model.to_string()],
            model_segments: Vec::new(),
            file_size_bytes: 1024,
            first_user_message: None,
            has_subagents: false,
            parent_session_id: None,
            duration_seconds: Some(1800),
            branch: None,
            tool_usage: std::collections::HashMap::new(),
            tool_token_usage: std::collections::HashMap::new(),
            source_tool: Default::default(),
            lines_added: 0,
            lines_removed: 0,
        }
    }

    #[tokio::test]
    async fn test_data_store_creation() {
        let dir = tempdir().unwrap();
        let store = DataStore::with_defaults(dir.path().to_path_buf(), None);

        assert_eq!(store.session_count(), 0);
        assert!(store.stats().is_none());
        assert!(store.degraded_state().is_healthy());
    }

    #[tokio::test]
    async fn test_initial_load_missing_dir() {
        let dir = tempdir().unwrap();
        let store = DataStore::with_defaults(dir.path().join("nonexistent"), None);

        let report = store.initial_load().await;

        // A missing Claude home must be reported and degrade the store, not panic.
        assert!(report.has_errors());
        assert!(store.degraded_state().is_degraded());
    }

    #[tokio::test]
    async fn test_initial_load_with_stats() {
        let dir = tempdir().unwrap();
        let claude_home = dir.path();

        // Stats file in the current (version 2) format.
        std::fs::write(
            claude_home.join("stats-cache.json"),
            r#"{"version": 2, "totalSessions": 5, "totalMessages": 100, "modelUsage": {"test": {"inputTokens": 600, "outputTokens": 400}}}"#,
        )
        .unwrap();

        // An empty projects dir so session scanning has something to walk.
        std::fs::create_dir_all(claude_home.join("projects")).unwrap();

        let store = DataStore::with_defaults(claude_home.to_path_buf(), None);
        let report = store.initial_load().await;

        assert!(report.stats_loaded);
        let stats = store.stats().unwrap();
        assert_eq!(stats.total_tokens(), 1000);
        assert_eq!(stats.session_count(), 5);
    }

    #[tokio::test]
    async fn test_event_bus_subscription() {
        let dir = tempdir().unwrap();
        let store = DataStore::with_defaults(dir.path().to_path_buf(), None);

        let mut rx = store.event_bus().subscribe();

        // Publish directly on the bus and check the subscriber receives it.
        store.event_bus().publish(DataEvent::StatsUpdated);

        let event = rx.recv().await.unwrap();
        assert!(matches!(event, DataEvent::StatsUpdated));
    }

    #[tokio::test]
    async fn test_analytics_cache_and_invalidation() {
        use chrono::Utc;

        let dir = tempdir().unwrap();
        let store = DataStore::with_defaults(dir.path().to_path_buf(), None);

        // Ten sessions spread over the last ten days, with growing token totals.
        // `cache_read` absorbs the integer-division remainder so the parts sum to `total`.
        let now = Utc::now();
        for i in 0..10 {
            let total = 1000 * (i as u64 + 1);
            let (input, output, cache_creation) = (total / 2, total / 3, total / 10);
            let session = session_fixture(
                &format!("test-{i}"),
                now - chrono::Duration::days(i),
                now,
                "sonnet",
                Tokens {
                    total,
                    input,
                    output,
                    cache_creation,
                    cache_read: total - (input + output + cache_creation),
                },
            );
            store.sessions.insert(session.id.clone(), Arc::new(session));
        }

        // Nothing is cached until analytics are computed.
        assert!(store.analytics().is_none());

        store.compute_analytics(Period::last_7d()).await;

        let analytics1 = store.analytics().expect("Analytics should be cached");
        assert!(!analytics1.trends.is_empty());
        assert_eq!(analytics1.period, Period::last_7d());

        // Explicit invalidation must drop the cached result.
        store.invalidate_analytics_cache();
        assert!(store.analytics().is_none(), "Cache should be invalidated");

        // Recomputing with a different period re-populates the cache.
        store.compute_analytics(Period::last_30d()).await;
        let analytics2 = store.analytics().expect("Analytics should be re-cached");
        assert_eq!(analytics2.period, Period::last_30d());
    }

    #[tokio::test]
    async fn test_leaderboard_methods() {
        use chrono::Utc;

        let dir = tempdir().unwrap();
        let store = DataStore::with_defaults(dir.path().to_path_buf(), None);

        let now = Utc::now();

        // (id, total tokens, model, days before `now` the session started)
        let test_data = [
            ("session-1", 5000u64, "opus", 0),
            ("session-2", 3000u64, "sonnet", 1),
            ("session-3", 8000u64, "haiku", 0),
            ("session-4", 2000u64, "sonnet", 2),
            ("session-5", 10000u64, "opus", 0),
        ];

        for (id, tokens, model, days_ago) in test_data {
            let session = session_fixture(
                id,
                now - chrono::Duration::days(days_ago),
                now,
                model,
                Tokens {
                    total: tokens,
                    input: tokens / 2,
                    output: tokens / 2,
                    cache_creation: 0,
                    cache_read: 0,
                },
            );
            store.sessions.insert(session.id.clone(), Arc::new(session));
        }

        // top_sessions_by_tokens: descending by total tokens, truncated to N.
        let top_sessions = store.top_sessions_by_tokens(3);
        assert_eq!(top_sessions.len(), 3);
        assert_eq!(top_sessions[0].id, "session-5"); // 10000 tokens
        assert_eq!(top_sessions[1].id, "session-3"); // 8000 tokens
        assert_eq!(top_sessions[2].id, "session-1"); // 5000 tokens

        // top_models_by_tokens
        let top_models = store.top_models_by_tokens();
        assert!(!top_models.is_empty());
        // opus: 15000 (5000+10000), haiku: 8000, sonnet: 5000 (3000+2000)
        assert_eq!(top_models[0].0, "opus");
        assert_eq!(top_models[0].1, 15000);
        assert_eq!(top_models[1].0, "haiku");
        assert_eq!(top_models[1].1, 8000);

        // top_days_by_tokens
        let top_days = store.top_days_by_tokens();
        assert!(!top_days.is_empty());
        // Day 0 (today): 5000+8000+10000 = 23000
        // NOTE(review): `today` is the UTC date. This assumes top_days_by_tokens
        // buckets by UTC date as well. Confirm this, since local-time bucketing
        // would make the assertion flaky near midnight.
        let today = now.format("%Y-%m-%d").to_string();
        assert_eq!(top_days[0].0, today);
        assert_eq!(top_days[0].1, 23000);
    }

    /// C3: DashMap takes priority over SQLite in all_violations()
    ///
    /// Verifies the merge strategy:
    /// - Same session_id in both → DashMap version returned (fresher)
    /// - Session only in SQLite → SQLite version returned (fills the gap)
    #[test]
    fn test_all_violations_dashmap_priority_over_sqlite() {
        use crate::models::activity::{ActivitySummary, Alert, AlertCategory, AlertSeverity};
        use chrono::Utc;

        let dir = tempdir().unwrap();
        let store = DataStore::with_defaults(dir.path().to_path_buf(), None);

        let now = Utc::now();

        // ── Setup: SQLite alerts ──────────────────────────────────────────────
        // Seed through the store's own cache handle. Opening a second connection
        // to the same SQLite file only duplicates the writes and risks lock
        // conflicts.
        let store_cache = store
            .metadata_cache
            .as_ref()
            .expect("MetadataCache should be present in store");

        // "shared-session" exists in SQLite with a Warning-level alert.
        let sqlite_summary = ActivitySummary {
            alerts: vec![Alert {
                session_id: "shared-session".to_string(),
                timestamp: now,
                severity: AlertSeverity::Warning,
                category: AlertCategory::DestructiveCommand,
                detail: "sqlite-version".to_string(),
            }],
            ..Default::default()
        };
        store_cache
            .put_activity(
                Path::new("/projects/test/shared-session.jsonl"),
                "shared-session",
                &sqlite_summary,
                std::time::SystemTime::now(),
            )
            .expect("put_activity via store cache should succeed");

        // "sqlite-only-session" exists exclusively in SQLite.
        let sqlite_only_summary = ActivitySummary {
            alerts: vec![Alert {
                session_id: "sqlite-only-session".to_string(),
                timestamp: now,
                severity: AlertSeverity::Info,
                category: AlertCategory::ExternalExfil,
                detail: "sqlite-only-detail".to_string(),
            }],
            ..Default::default()
        };
        store_cache
            .put_activity(
                Path::new("/projects/test/sqlite-only-session.jsonl"),
                "sqlite-only-session",
                &sqlite_only_summary,
                std::time::SystemTime::now(),
            )
            .expect("put_activity via store cache should succeed");

        // ── Setup: DashMap alert for the shared session (fresher, Critical) ───
        let dashmap_summary = ActivitySummary {
            alerts: vec![Alert {
                session_id: "shared-session".to_string(),
                timestamp: now,
                severity: AlertSeverity::Critical,
                category: AlertCategory::ForcePush,
                detail: "dashmap-version".to_string(),
            }],
            ..Default::default()
        };
        store
            .activity_results
            .insert("shared-session".to_string(), dashmap_summary);

        // ── Assert ────────────────────────────────────────────────────────────
        let violations = store.all_violations();

        // The DashMap version of the shared session must win.
        let dashmap_hit = violations
            .iter()
            .find(|a| a.session_id == "shared-session")
            .expect("shared-session alert must appear in violations");
        assert_eq!(
            dashmap_hit.detail, "dashmap-version",
            "DashMap version must take priority over SQLite for shared session"
        );
        assert_eq!(
            dashmap_hit.severity,
            AlertSeverity::Critical,
            "DashMap severity (Critical) must win over SQLite (Warning)"
        );

        // The SQLite-only session must still appear (fills the gap).
        let sqlite_hit = violations
            .iter()
            .find(|a| a.session_id == "sqlite-only-session")
            .expect("sqlite-only-session must appear in violations (no DashMap entry for it)");
        assert_eq!(sqlite_hit.detail, "sqlite-only-detail");

        // The SQLite version of shared-session must NOT appear alongside it.
        let shared_count = violations
            .iter()
            .filter(|a| a.session_id == "shared-session")
            .count();
        assert_eq!(
            shared_count, 1,
            "shared-session must appear exactly once (DashMap wins, no SQLite duplicate)"
        );

        // Sorting: most severe first (Critical before Info).
        assert_eq!(violations[0].severity, AlertSeverity::Critical);
    }
}