Skip to main content

ccboard_core/
store.rs

1//! Data store with DashMap + parking_lot::RwLock
2//!
3//! Uses DashMap for sessions (per-entry locking) and parking_lot::RwLock
4//! for stats/settings (better fairness than std::sync::RwLock).
5
6use crate::analytics::{AnalyticsData, Period};
7use crate::cache::MetadataCache;
8use crate::error::{DegradedState, LoadReport};
9use crate::event::{ConfigScope, DataEvent, EventBus};
10use crate::models::{
11    BillingBlockManager, InvocationStats, MergedConfig, SessionId, SessionMetadata, StatsCache,
12};
13use crate::parsers::{
14    InvocationParser, McpConfig, Rules, SessionIndexParser, SettingsParser, StatsParser,
15};
16use dashmap::DashMap;
17use moka::future::Cache;
18use parking_lot::RwLock; // parking_lot > std::sync::RwLock: smaller (40B vs 72B), no poisoning, better fairness
19use std::path::{Path, PathBuf};
20use std::sync::Arc;
21use std::time::Duration;
22use tracing::{debug, info, warn};
23
/// Configuration for the data store.
///
/// Tuning knobs for [`DataStore`]: in-memory limits, session scan concurrency, and the
/// retry policy used when reading `stats-cache.json`.
#[derive(Debug, Clone)]
pub struct DataStoreConfig {
    /// Maximum session metadata entries to keep.
    ///
    /// Enforced by the initial session scan: when more sessions are found, only the most
    /// recent ones (by last timestamp) are kept. Sessions added later through
    /// `DataStore::update_session` are not checked against this limit.
    pub max_session_metadata_count: usize,

    /// Size budget for the on-demand session content cache, in megabytes.
    ///
    /// How this maps onto the cache's capacity is decided in `DataStore::new`.
    pub max_session_content_cache_mb: usize,

    /// Maximum concurrent session scans (passed to `SessionIndexParser::with_concurrency`).
    pub max_concurrent_scans: usize,

    /// Stats parser retry count (passed to `StatsParser::with_retries`).
    pub stats_retry_count: u32,

    /// Delay between stats parser retries (passed to `StatsParser::with_retries`).
    pub stats_retry_delay: Duration,
}
42
43impl Default for DataStoreConfig {
44    fn default() -> Self {
45        Self {
46            max_session_metadata_count: 10_000,
47            max_session_content_cache_mb: 100,
48            max_concurrent_scans: 8,
49            stats_retry_count: 3,
50            stats_retry_delay: Duration::from_millis(100),
51        }
52    }
53}
54
/// Central data store for ccboard
///
/// Thread-safe access to all Claude Code data.
/// Uses DashMap for sessions (high contention) and RwLock for stats/settings (low contention).
/// Readers receive clones (or cheap `Arc` handles for sessions); writers replace whole
/// values under a write lock.
pub struct DataStore {
    /// Path to Claude home directory (root for `stats-cache.json`, `projects/` and `cache/`)
    claude_home: PathBuf,

    /// Current project path (if focused); forwarded to the settings, MCP and rules loaders
    project_path: Option<PathBuf>,

    /// Configuration (limits, scan concurrency, stats retry policy)
    config: DataStoreConfig,

    /// Parsed stats cache with recalculated costs; `None` until loaded successfully
    stats: RwLock<Option<StatsCache>>,

    /// Settings as merged by `SettingsParser::load_merged` from `claude_home` and the
    /// optional project path
    settings: RwLock<MergedConfig>,

    /// MCP server configuration; stays `None` when no config was found or it failed to parse
    mcp_config: RwLock<Option<McpConfig>>,

    /// Rules from CLAUDE.md files
    rules: RwLock<Rules>,

    /// Invocation statistics (agents, commands, skills), filled by `compute_invocations`
    invocation_stats: RwLock<InvocationStats>,

    /// Billing blocks (5h usage tracking), rebuilt from scratch by `compute_billing_blocks`
    billing_blocks: RwLock<BillingBlockManager>,

    /// Result of the last `compute_analytics` call.
    ///
    /// Not invalidated automatically when stats or sessions change; it is only replaced by
    /// the next `compute_analytics` call (or cleared by the currently unused
    /// `invalidate_analytics_cache`).
    analytics_cache: RwLock<Option<AnalyticsData>>,

    /// Session metadata keyed by session id (high contention with many entries).
    ///
    /// Values are `Arc<SessionMetadata>` so handing a session out is a reference-count bump
    /// (one pointer) instead of a deep copy of the metadata.
    ///
    /// Why Arc over Box: multi-thread access from TUI + Web frontends needs shared
    /// ownership. The cost is two atomic reference counts (strong + weak, 16 bytes on 64-bit
    /// targets) stored next to each value; with Box every frontend access would have to
    /// clone the entire struct.
    sessions: DashMap<SessionId, Arc<SessionMetadata>>,

    /// Session content cache (moka, 5 min time-to-idle) for on-demand loading.
    ///
    /// NOTE(review): nothing in this file inserts into this cache; it is only cleared by
    /// `clear_session_content_cache`. The `dead_code` allow presumably exists because the
    /// loading path has not been wired up yet — confirm before removing it.
    #[allow(dead_code)]
    session_content_cache: Cache<SessionId, Vec<String>>,

    /// Event bus for notifying subscribers (load completion, stats/session/config updates)
    event_bus: EventBus,

    /// Current degraded state, derived from the `LoadReport` of `initial_load`
    degraded_state: RwLock<DegradedState>,

    /// Optional metadata cache under `<claude_home>/cache`, handed to the session scanner
    /// to speed up startup; `None` when it could not be created
    metadata_cache: Option<Arc<MetadataCache>>,
}
111
/// Project leaderboard entry with aggregated metrics (produced by
/// `DataStore::projects_leaderboard`).
#[derive(Debug, Clone)]
pub struct ProjectLeaderboardEntry {
    /// Display name: last component of the project path, or the whole path if it has none
    pub project_name: String,
    /// Number of indexed sessions belonging to the project
    pub total_sessions: usize,
    /// Sum of `total_tokens` over the project's sessions
    pub total_tokens: u64,
    /// Sum of per-session costs; each session is priced with its first model (or "unknown")
    pub total_cost: f64,
    /// `total_cost / total_sessions`
    pub avg_session_cost: f64,
}
121
122impl DataStore {
123    /// Create a new data store
124    pub fn new(
125        claude_home: PathBuf,
126        project_path: Option<PathBuf>,
127        config: DataStoreConfig,
128    ) -> Self {
129        let session_content_cache = Cache::builder()
130            .max_capacity((config.max_session_content_cache_mb * 1024 * 1024 / 1000) as u64) // Rough estimate
131            .time_to_idle(Duration::from_secs(300)) // 5 min idle expiry
132            .build();
133
134        // Create metadata cache in ~/.claude/cache/
135        let metadata_cache = {
136            let cache_dir = claude_home.join("cache");
137            match MetadataCache::new(&cache_dir) {
138                Ok(cache) => {
139                    debug!(path = %cache_dir.display(), "Metadata cache enabled");
140                    Some(Arc::new(cache))
141                }
142                Err(e) => {
143                    warn!(error = %e, "Failed to create metadata cache, running without cache");
144                    None
145                }
146            }
147        };
148
149        Self {
150            claude_home,
151            project_path,
152            config,
153            stats: RwLock::new(None),
154            settings: RwLock::new(MergedConfig::default()),
155            mcp_config: RwLock::new(None),
156            rules: RwLock::new(Rules::default()),
157            invocation_stats: RwLock::new(InvocationStats::new()),
158            billing_blocks: RwLock::new(BillingBlockManager::new()),
159            analytics_cache: RwLock::new(None),
160            sessions: DashMap::new(),
161            session_content_cache,
162            event_bus: EventBus::default_capacity(),
163            degraded_state: RwLock::new(DegradedState::Healthy),
164            metadata_cache,
165        }
166    }
167
168    /// Create with default configuration
169    pub fn with_defaults(claude_home: PathBuf, project_path: Option<PathBuf>) -> Self {
170        Self::new(claude_home, project_path, DataStoreConfig::default())
171    }
172
173    /// Get the event bus for subscribing to updates
174    pub fn event_bus(&self) -> &EventBus {
175        &self.event_bus
176    }
177
178    /// Get current degraded state
179    pub fn degraded_state(&self) -> DegradedState {
180        self.degraded_state.read().clone()
181    }
182
183    /// Initial load of all data with LoadReport for graceful degradation
184    pub async fn initial_load(&self) -> LoadReport {
185        let mut report = LoadReport::new();
186
187        info!(claude_home = %self.claude_home.display(), "Starting initial data load");
188
189        // Load stats
190        self.load_stats(&mut report).await;
191
192        // Load settings
193        self.load_settings(&mut report).await;
194
195        // Load MCP configuration
196        self.load_mcp_config(&mut report).await;
197
198        // Load rules
199        self.load_rules(&mut report).await;
200
201        // Scan sessions
202        self.scan_sessions(&mut report).await;
203
204        // Determine degraded state
205        self.update_degraded_state(&report);
206
207        // Notify subscribers
208        self.event_bus.publish(DataEvent::LoadCompleted);
209
210        info!(
211            stats_loaded = report.stats_loaded,
212            settings_loaded = report.settings_loaded,
213            sessions_scanned = report.sessions_scanned,
214            sessions_failed = report.sessions_failed,
215            errors = report.errors.len(),
216            "Initial load complete"
217        );
218
219        report
220    }
221
222    /// Load stats cache
223    async fn load_stats(&self, report: &mut LoadReport) {
224        let stats_path = self.claude_home.join("stats-cache.json");
225        let parser = StatsParser::new()
226            .with_retries(self.config.stats_retry_count, self.config.stats_retry_delay);
227
228        if let Some(mut stats) = parser.parse_graceful(&stats_path, report).await {
229            // Recalculate costs using accurate pricing
230            stats.recalculate_costs();
231            let mut guard = self.stats.write();
232            *guard = Some(stats);
233            debug!("Stats loaded successfully with recalculated costs");
234        }
235    }
236
237    /// Load and merge settings
238    async fn load_settings(&self, report: &mut LoadReport) {
239        let parser = SettingsParser::new();
240        let merged = parser
241            .load_merged(&self.claude_home, self.project_path.as_deref(), report)
242            .await;
243
244        let mut guard = self.settings.write();
245        *guard = merged;
246        debug!("Settings loaded and merged");
247    }
248
249    /// Load MCP server configuration (global + project-level)
250    async fn load_mcp_config(&self, report: &mut LoadReport) {
251        match McpConfig::load_merged(&self.claude_home, self.project_path.as_deref()) {
252            Ok(Some(config)) => {
253                let server_count = config.servers.len();
254                let mut guard = self.mcp_config.write();
255                *guard = Some(config);
256                debug!(
257                    server_count,
258                    "MCP config loaded successfully (global + project)"
259                );
260            }
261            Ok(None) => {
262                debug!("No MCP config found (optional)");
263            }
264            Err(e) => {
265                use crate::error::LoadError;
266                report.add_error(LoadError::error(
267                    "mcp_config",
268                    format!("Failed to parse MCP config: {}", e),
269                ));
270            }
271        }
272    }
273
274    /// Load rules from CLAUDE.md files
275    async fn load_rules(&self, report: &mut LoadReport) {
276        match Rules::load(&self.claude_home, self.project_path.as_deref()) {
277            Ok(rules) => {
278                let has_global = rules.global.is_some();
279                let has_project = rules.project.is_some();
280                let mut guard = self.rules.write();
281                *guard = rules;
282                debug!(has_global, has_project, "Rules loaded");
283            }
284            Err(e) => {
285                use crate::error::LoadError;
286                report.add_error(LoadError::error(
287                    "rules",
288                    format!("Failed to load rules: {}", e),
289                ));
290            }
291        }
292    }
293
294    /// Scan all sessions
295    async fn scan_sessions(&self, report: &mut LoadReport) {
296        let projects_dir = self.claude_home.join("projects");
297
298        if !projects_dir.exists() {
299            report.add_warning(
300                "sessions",
301                format!("Projects directory not found: {}", projects_dir.display()),
302            );
303            return;
304        }
305
306        let mut parser =
307            SessionIndexParser::new().with_concurrency(self.config.max_concurrent_scans);
308
309        // Enable metadata cache if available (90% speedup)
310        if let Some(ref cache) = self.metadata_cache {
311            parser = parser.with_cache(cache.clone());
312        }
313
314        let sessions = parser.scan_all(&projects_dir, report).await;
315
316        // Enforce max count limit
317        let sessions_to_add: Vec<_> = if sessions.len() > self.config.max_session_metadata_count {
318            warn!(
319                total = sessions.len(),
320                limit = self.config.max_session_metadata_count,
321                "Session count exceeds limit, keeping most recent"
322            );
323
324            let mut sorted = sessions;
325            sorted.sort_by(|a, b| b.last_timestamp.cmp(&a.last_timestamp));
326            sorted.truncate(self.config.max_session_metadata_count);
327            sorted
328        } else {
329            sessions
330        };
331
332        // Insert into DashMap (wrap in Arc for cheap cloning)
333        for session in sessions_to_add {
334            self.sessions.insert(session.id.clone(), Arc::new(session));
335        }
336
337        debug!(count = self.sessions.len(), "Sessions indexed");
338    }
339
340    /// Update degraded state based on load report
341    fn update_degraded_state(&self, report: &LoadReport) {
342        let mut state = self.degraded_state.write();
343
344        if report.has_fatal_errors() {
345            *state = DegradedState::ReadOnly {
346                reason: "Fatal errors during load".to_string(),
347            };
348            return;
349        }
350
351        let mut missing = Vec::new();
352
353        if !report.stats_loaded {
354            missing.push("stats".to_string());
355        }
356        if !report.settings_loaded {
357            missing.push("settings".to_string());
358        }
359        if report.sessions_failed > 0 {
360            missing.push(format!("{} sessions", report.sessions_failed));
361        }
362
363        if missing.is_empty() {
364            *state = DegradedState::Healthy;
365        } else {
366            *state = DegradedState::PartialData {
367                missing: missing.clone(),
368                reason: format!("Missing: {}", missing.join(", ")),
369            };
370        }
371    }
372
373    // ===================
374    // Read accessors
375    // ===================
376
377    /// Get a clone of stats
378    pub fn stats(&self) -> Option<StatsCache> {
379        self.stats.read().clone()
380    }
381
382    /// Calculate context window saturation from current sessions
383    pub fn context_window_stats(&self) -> crate::models::ContextWindowStats {
384        // Clone Arc (cheap) to avoid lifetime issues with DashMap iterators
385        let sessions: Vec<_> = self
386            .sessions
387            .iter()
388            .map(|entry| Arc::clone(entry.value()))
389            .collect();
390        // Dereference Arc to get &SessionMetadata
391        let refs: Vec<_> = sessions.iter().map(|s| s.as_ref()).collect();
392        crate::models::StatsCache::calculate_context_saturation(&refs, 30)
393    }
394
395    /// Get merged settings
396    pub fn settings(&self) -> MergedConfig {
397        self.settings.read().clone()
398    }
399
400    /// Get MCP server configuration
401    pub fn mcp_config(&self) -> Option<McpConfig> {
402        self.mcp_config.read().clone()
403    }
404
405    /// Get rules
406    pub fn rules(&self) -> Rules {
407        self.rules.read().clone()
408    }
409
410    /// Get invocation statistics
411    pub fn invocation_stats(&self) -> InvocationStats {
412        self.invocation_stats.read().clone()
413    }
414
415    /// Get live Claude Code sessions (running processes)
416    ///
417    /// Detects active Claude processes on the system and returns metadata.
418    /// Returns empty vector if detection fails or no processes are running.
419    pub fn live_sessions(&self) -> Vec<crate::live_monitor::LiveSession> {
420        crate::live_monitor::detect_live_sessions().unwrap_or_default()
421    }
422
423    /// Get session count
424    pub fn session_count(&self) -> usize {
425        self.sessions.len()
426    }
427
428    /// Get session by ID
429    /// Returns Arc<SessionMetadata> for cheap cloning
430    pub fn get_session(&self, id: &str) -> Option<Arc<SessionMetadata>> {
431        self.sessions.get(id).map(|r| Arc::clone(r.value()))
432    }
433
434    /// Get analytics data for a period (cached)
435    ///
436    /// Returns cached analytics if available, otherwise None.
437    /// Call `compute_analytics()` to compute and cache.
438    pub fn analytics(&self) -> Option<AnalyticsData> {
439        let analytics = self.analytics_cache.read().clone();
440        debug!(
441            has_analytics = analytics.is_some(),
442            "analytics() getter called"
443        );
444        analytics
445    }
446
447    /// Compute and cache analytics data for a period
448    ///
449    /// This is a CPU-intensive operation (trends, forecasting, patterns).
450    /// For 1000+ sessions, this may take 100-300ms, so it's offloaded
451    /// to a blocking task.
452    ///
453    /// Cache is invalidated on stats reload or session updates (EventBus pattern).
454    pub async fn compute_analytics(&self, period: Period) {
455        let sessions: Vec<_> = self
456            .sessions
457            .iter()
458            .map(|r| Arc::clone(r.value()))
459            .collect();
460
461        info!(
462            session_count = sessions.len(),
463            period = ?period,
464            "compute_analytics() ENTRY"
465        );
466
467        // Offload to blocking task for CPU-intensive computation
468        let analytics =
469            tokio::task::spawn_blocking(move || AnalyticsData::compute(&sessions, period)).await;
470
471        match analytics {
472            Ok(data) => {
473                info!(
474                    insights_count = data.insights.len(),
475                    "compute_analytics() computed data"
476                );
477                let mut guard = self.analytics_cache.write();
478                *guard = Some(data);
479                self.event_bus.publish(DataEvent::AnalyticsUpdated);
480                info!("compute_analytics() EXIT - cached and event published");
481            }
482            Err(e) => {
483                warn!(error = %e, "Failed to compute analytics (task panicked)");
484            }
485        }
486    }
487
488    /// Invalidate analytics cache (called on data changes)
489    ///
490    /// Note: Currently unused to prevent aggressive invalidation.
491    /// Kept for future use if smart invalidation is needed.
492    #[allow(dead_code)]
493    fn invalidate_analytics_cache(&self) {
494        let mut guard = self.analytics_cache.write();
495        *guard = None;
496        debug!("Analytics cache invalidated");
497    }
498
499    /// Get all session IDs
500    pub fn session_ids(&self) -> Vec<SessionId> {
501        self.sessions.iter().map(|r| r.key().clone()).collect()
502    }
503
504    /// Clear session content cache (for memory optimization on F5)
505    pub fn clear_session_content_cache(&self) {
506        self.session_content_cache.invalidate_all();
507        debug!("Session content cache cleared");
508    }
509
510    /// Get sessions grouped by project
511    /// Returns Arc<SessionMetadata> for cheap cloning
512    pub fn sessions_by_project(
513        &self,
514    ) -> std::collections::HashMap<String, Vec<Arc<SessionMetadata>>> {
515        let mut by_project = std::collections::HashMap::new();
516
517        for entry in self.sessions.iter() {
518            let session = Arc::clone(entry.value());
519            by_project
520                .entry(session.project_path.as_str().to_string())
521                .or_insert_with(Vec::new)
522                .push(session);
523        }
524
525        // Sort sessions within each project by timestamp (newest first)
526        for sessions in by_project.values_mut() {
527            sessions.sort_by(|a, b| b.last_timestamp.cmp(&a.last_timestamp));
528        }
529
530        by_project
531    }
532
533    /// Get all sessions (unsorted)
534    /// Returns Arc<SessionMetadata> for cheap cloning
535    pub fn all_sessions(&self) -> Vec<Arc<SessionMetadata>> {
536        self.sessions
537            .iter()
538            .map(|r| Arc::clone(r.value()))
539            .collect()
540    }
541
542    /// Get recent sessions (sorted by last timestamp, newest first)
543    /// Returns Arc<SessionMetadata> for cheap cloning
544    pub fn recent_sessions(&self, limit: usize) -> Vec<Arc<SessionMetadata>> {
545        let mut sessions = self.all_sessions();
546        sessions.sort_by(|a, b| b.last_timestamp.cmp(&a.last_timestamp));
547        sessions.truncate(limit);
548        sessions
549    }
550
551    /// Get top sessions by total tokens (sorted descending)
552    /// Returns Arc<SessionMetadata> for cheap cloning
553    pub fn top_sessions_by_tokens(&self, limit: usize) -> Vec<Arc<SessionMetadata>> {
554        let mut sessions: Vec<_> = self
555            .sessions
556            .iter()
557            .map(|r| Arc::clone(r.value()))
558            .collect();
559        sessions.sort_by(|a, b| b.total_tokens.cmp(&a.total_tokens));
560        sessions.truncate(limit);
561        sessions
562    }
563
564    /// Get top models by total tokens (aggregated, sorted descending)
565    /// Returns (model_name, total_tokens) pairs
566    pub fn top_models_by_tokens(&self) -> Vec<(String, u64)> {
567        let mut model_totals = std::collections::HashMap::new();
568
569        // Aggregate tokens per model across all sessions
570        for session in self.sessions.iter() {
571            for model in &session.value().models_used {
572                *model_totals.entry(model.clone()).or_insert(0) += session.value().total_tokens;
573            }
574        }
575
576        // Convert to vec and sort
577        let mut results: Vec<_> = model_totals.into_iter().collect();
578        results.sort_by(|a, b| b.1.cmp(&a.1));
579        results.truncate(10); // Top 10
580        results
581    }
582
583    /// Get top days by total tokens (aggregated, sorted descending)
584    /// Returns (date_string, total_tokens) pairs
585    pub fn top_days_by_tokens(&self) -> Vec<(String, u64)> {
586        let mut day_totals = std::collections::HashMap::new();
587
588        // Aggregate tokens per day across all sessions
589        for session in self.sessions.iter() {
590            if let Some(timestamp) = &session.value().first_timestamp {
591                let date = timestamp.format("%Y-%m-%d").to_string();
592                *day_totals.entry(date).or_insert(0) += session.value().total_tokens;
593            }
594        }
595
596        // Convert to vec and sort
597        let mut results: Vec<_> = day_totals.into_iter().collect();
598        results.sort_by(|a, b| b.1.cmp(&a.1));
599        results.truncate(10); // Top 10
600        results
601    }
602
603    /// Get project leaderboard with aggregated metrics
604    ///
605    /// Returns all projects with session count, total tokens, total cost, and average session cost.
606    /// Cost is calculated using accurate model-based pricing from the pricing module.
607    pub fn projects_leaderboard(&self) -> Vec<ProjectLeaderboardEntry> {
608        let mut project_metrics = std::collections::HashMap::new();
609
610        // Aggregate metrics per project
611        for session in self.sessions.iter() {
612            let metadata = session.value();
613            let project_path = &metadata.project_path;
614
615            // Get model for this session (use first model, or "unknown")
616            let model = metadata
617                .models_used
618                .first()
619                .map(|s| s.as_str())
620                .unwrap_or("unknown");
621
622            // Calculate cost using accurate pricing
623            let cost = crate::pricing::calculate_cost(
624                model,
625                metadata.input_tokens,
626                metadata.output_tokens,
627                metadata.cache_creation_tokens,
628                metadata.cache_read_tokens,
629            );
630
631            let entry = project_metrics
632                .entry(project_path.clone())
633                .or_insert((0, 0u64, 0.0f64)); // (session_count, total_tokens, total_cost)
634
635            entry.0 += 1; // session count
636            entry.1 += metadata.total_tokens; // total tokens
637            entry.2 += cost; // total cost
638        }
639
640        // Convert to leaderboard entries
641        let mut results: Vec<_> = project_metrics
642            .into_iter()
643            .map(
644                |(project_path, (session_count, total_tokens, total_cost))| {
645                    let avg_session_cost = if session_count > 0 {
646                        total_cost / session_count as f64
647                    } else {
648                        0.0
649                    };
650
651                    // Extract project name from path (last component)
652                    let project_name = std::path::Path::new(project_path.as_str())
653                        .file_name()
654                        .and_then(|n| n.to_str())
655                        .unwrap_or(project_path.as_str())
656                        .to_string();
657
658                    ProjectLeaderboardEntry {
659                        project_name,
660                        total_sessions: session_count,
661                        total_tokens,
662                        total_cost,
663                        avg_session_cost,
664                    }
665                },
666            )
667            .collect();
668
669        // Default sort: by total cost descending
670        results.sort_by(|a, b| {
671            b.total_cost
672                .partial_cmp(&a.total_cost)
673                .unwrap_or(std::cmp::Ordering::Equal)
674        });
675
676        results
677    }
678
679    // ===================
680    // Update methods (called by watcher)
681    // ===================
682
683    /// Reload stats (called on file change)
684    pub async fn reload_stats(&self) {
685        let stats_path = self.claude_home.join("stats-cache.json");
686        let parser = StatsParser::new()
687            .with_retries(self.config.stats_retry_count, self.config.stats_retry_delay);
688
689        let mut report = LoadReport::new();
690        if let Some(mut stats) = parser.parse_graceful(&stats_path, &mut report).await {
691            // Recalculate costs using accurate pricing
692            stats.recalculate_costs();
693            let mut guard = self.stats.write();
694            *guard = Some(stats);
695
696            // Don't invalidate analytics - it will auto-recompute if needed
697            // Instead, just publish the event so UI can decide whether to recompute
698            self.event_bus.publish(DataEvent::StatsUpdated);
699            debug!("Stats reloaded with recalculated costs");
700        }
701    }
702
703    /// Reload settings from files (called when settings change)
704    pub async fn reload_settings(&self) {
705        let parser = SettingsParser::new();
706        let merged = parser
707            .load_merged(
708                &self.claude_home,
709                self.project_path.as_deref(),
710                &mut LoadReport::new(),
711            )
712            .await;
713
714        {
715            let mut guard = self.settings.write();
716            *guard = merged;
717        }
718
719        self.event_bus
720            .publish(DataEvent::ConfigChanged(ConfigScope::Global));
721        debug!("Settings reloaded");
722    }
723
724    /// Add or update a session (called when session file changes)
725    pub async fn update_session(&self, path: &Path) {
726        let parser = SessionIndexParser::new();
727
728        match parser.scan_session(path).await {
729            Ok(meta) => {
730                let id = meta.id.clone();
731                let is_new = !self.sessions.contains_key(&id);
732
733                self.sessions.insert(id.clone(), Arc::new(meta));
734
735                // Don't invalidate analytics on every session update - too aggressive
736                // Analytics will be recomputed on demand or periodically
737                // Only invalidate on significant changes (detected by UI)
738
739                if is_new {
740                    self.event_bus.publish(DataEvent::SessionCreated(id));
741                } else {
742                    self.event_bus.publish(DataEvent::SessionUpdated(id));
743                }
744            }
745            Err(e) => {
746                warn!(path = %path.display(), error = %e, "Failed to update session");
747            }
748        }
749    }
750
751    /// Compute invocation statistics from all sessions
752    ///
753    /// This scans all session files to count agent/command/skill invocations.
754    /// Should be called after initial load or when sessions are updated.
755    pub async fn compute_invocations(&self) {
756        let paths: Vec<_> = self
757            .sessions
758            .iter()
759            .map(|r| r.value().file_path.clone())
760            .collect();
761
762        debug!(session_count = paths.len(), "Computing invocation stats");
763
764        let parser = InvocationParser::new();
765        let stats = parser.scan_sessions(&paths).await;
766
767        let mut guard = self.invocation_stats.write();
768        *guard = stats;
769
770        debug!(
771            agents = guard.agents.len(),
772            commands = guard.commands.len(),
773            skills = guard.skills.len(),
774            total = guard.total_invocations(),
775            "Invocation stats computed"
776        );
777
778        // Note: Using LoadCompleted as there's no specific invocation stats event
779        self.event_bus.publish(DataEvent::LoadCompleted);
780    }
781
782    /// Compute billing blocks from all sessions
783    ///
784    /// This scans all sessions with timestamps and aggregates usage into 5-hour billing blocks.
785    /// Uses real model pricing based on token breakdown for accurate cost calculation.
786    pub async fn compute_billing_blocks(&self) {
787        debug!("Computing billing blocks from sessions with real pricing");
788
789        let mut manager = BillingBlockManager::new();
790        let mut sessions_with_timestamps = 0;
791        let mut sessions_without_timestamps = 0;
792
793        for session in self.sessions.iter() {
794            let metadata = session.value();
795
796            // Skip sessions without timestamps
797            let Some(timestamp) = &metadata.first_timestamp else {
798                sessions_without_timestamps += 1;
799                continue;
800            };
801
802            sessions_with_timestamps += 1;
803
804            // Get model for this session (use first model, or "unknown")
805            let model = metadata
806                .models_used
807                .first()
808                .map(|s| s.as_str())
809                .unwrap_or("unknown");
810
811            // Calculate real cost using pricing table
812            let cost = crate::pricing::calculate_cost(
813                model,
814                metadata.input_tokens,
815                metadata.output_tokens,
816                metadata.cache_creation_tokens,
817                metadata.cache_read_tokens,
818            );
819
820            manager.add_usage(
821                timestamp,
822                metadata.input_tokens,
823                metadata.output_tokens,
824                metadata.cache_creation_tokens,
825                metadata.cache_read_tokens,
826                cost,
827            );
828        }
829
830        debug!(
831            sessions_with_timestamps,
832            sessions_without_timestamps,
833            blocks = manager.get_all_blocks().len(),
834            "Billing blocks computed with real pricing"
835        );
836
837        let mut guard = self.billing_blocks.write();
838        *guard = manager;
839
840        self.event_bus.publish(DataEvent::LoadCompleted);
841    }
842
843    /// Get billing blocks (read-only access)
844    pub fn billing_blocks(&self) -> parking_lot::RwLockReadGuard<'_, BillingBlockManager> {
845        self.billing_blocks.read()
846    }
847
848    /// Calculate usage estimate based on billing blocks and subscription plan
849    pub fn usage_estimate(&self) -> crate::usage_estimator::UsageEstimate {
850        let settings = self.settings();
851        let plan = settings
852            .merged
853            .subscription_plan
854            .as_ref()
855            .map(|s| crate::usage_estimator::SubscriptionPlan::from_str(s))
856            .unwrap_or_default();
857
858        let billing_blocks = self.billing_blocks.read();
859        crate::usage_estimator::calculate_usage_estimate(&billing_blocks, plan)
860    }
861}
862
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::{DateTime, Utc};
    use tempfile::tempdir;

    /// Build a session fixture for project `/test` with every token counter zeroed.
    ///
    /// The file path is derived from the id (`/{id}.jsonl`), and `models_used` holds only
    /// `model`. Callers fill in the token fields they care about with struct-update syntax:
    /// `SessionMetadata { total_tokens: .., ..session_fixture(..) }`.
    fn session_fixture(
        id: &str,
        model: &str,
        first_timestamp: DateTime<Utc>,
        last_timestamp: DateTime<Utc>,
    ) -> SessionMetadata {
        SessionMetadata {
            id: id.into(),
            file_path: PathBuf::from(format!("/{}.jsonl", id)),
            project_path: "/test".into(),
            first_timestamp: Some(first_timestamp),
            last_timestamp: Some(last_timestamp),
            message_count: 10,
            total_tokens: 0,
            input_tokens: 0,
            output_tokens: 0,
            cache_creation_tokens: 0,
            cache_read_tokens: 0,
            models_used: vec![model.to_string()],
            file_size_bytes: 1024,
            first_user_message: None,
            has_subagents: false,
            duration_seconds: Some(1800),
            branch: None,
            tool_usage: std::collections::HashMap::new(),
        }
    }

    #[tokio::test]
    async fn test_data_store_creation() {
        let dir = tempdir().unwrap();
        let store = DataStore::with_defaults(dir.path().to_path_buf(), None);

        assert_eq!(store.session_count(), 0);
        assert!(store.stats().is_none());
        assert!(store.degraded_state().is_healthy());
    }

    #[tokio::test]
    async fn test_initial_load_missing_dir() {
        let dir = tempdir().unwrap();
        let store = DataStore::with_defaults(dir.path().join("nonexistent"), None);

        let report = store.initial_load().await;

        // A missing Claude home must be reported as an error and degrade the store,
        // but the load itself must not panic.
        assert!(report.has_errors());
        assert!(store.degraded_state().is_degraded());
    }

    #[tokio::test]
    async fn test_initial_load_with_stats() {
        let dir = tempdir().unwrap();
        let claude_home = dir.path();

        // Create stats file with new format
        std::fs::write(
            claude_home.join("stats-cache.json"),
            r#"{"version": 2, "totalSessions": 5, "totalMessages": 100, "modelUsage": {"test": {"inputTokens": 600, "outputTokens": 400}}}"#,
        )
        .unwrap();

        // Create projects dir
        std::fs::create_dir_all(claude_home.join("projects")).unwrap();

        let store = DataStore::with_defaults(claude_home.to_path_buf(), None);
        let report = store.initial_load().await;

        assert!(report.stats_loaded);
        let stats = store.stats().unwrap();
        assert_eq!(stats.total_tokens(), 1000);
        assert_eq!(stats.session_count(), 5);
    }

    #[tokio::test]
    async fn test_event_bus_subscription() {
        let dir = tempdir().unwrap();
        let store = DataStore::with_defaults(dir.path().to_path_buf(), None);

        let mut rx = store.event_bus().subscribe();

        // Publish a StatsUpdated event; the subscriber created above must receive it.
        store.event_bus().publish(DataEvent::StatsUpdated);

        let event = rx.recv().await.unwrap();
        assert!(matches!(event, DataEvent::StatsUpdated));
    }

    #[tokio::test]
    async fn test_analytics_cache_and_invalidation() {
        let dir = tempdir().unwrap();
        let store = DataStore::with_defaults(dir.path().to_path_buf(), None);

        // Add test sessions, one per day going back 0..10 days
        let now = Utc::now();
        for i in 0..10 {
            let total_tokens = 1000 * (i as u64 + 1);
            let input_tokens = total_tokens / 2;
            let output_tokens = total_tokens / 3;
            let cache_creation_tokens = total_tokens / 10;
            let session = SessionMetadata {
                total_tokens,
                input_tokens,
                output_tokens,
                cache_creation_tokens,
                // Remainder, so the four counters sum exactly to total_tokens.
                cache_read_tokens: total_tokens
                    - (input_tokens + output_tokens + cache_creation_tokens),
                ..session_fixture(
                    &format!("test-{}", i),
                    "sonnet",
                    now - chrono::Duration::days(i),
                    now,
                )
            };
            store.sessions.insert(session.id.clone(), Arc::new(session));
        }

        // Initially no analytics
        assert!(store.analytics().is_none());

        // Compute analytics
        store.compute_analytics(Period::last_7d()).await;

        // Analytics should be cached
        let analytics1 = store.analytics().expect("Analytics should be cached");
        assert!(!analytics1.trends.is_empty());
        assert_eq!(analytics1.period, Period::last_7d());

        // Explicitly invalidate the cached analytics
        store.invalidate_analytics_cache();
        assert!(store.analytics().is_none(), "Cache should be invalidated");

        // Re-compute with different period
        store.compute_analytics(Period::last_30d()).await;
        let analytics2 = store.analytics().expect("Analytics should be re-cached");
        assert_eq!(analytics2.period, Period::last_30d());
    }

    #[tokio::test]
    async fn test_leaderboard_methods() {
        let dir = tempdir().unwrap();
        let store = DataStore::with_defaults(dir.path().to_path_buf(), None);

        let now = Utc::now();

        // Sessions with varying tokens: (id, total tokens, model, days ago)
        let test_data = [
            ("session-1", 5000u64, "opus", 0),
            ("session-2", 3000u64, "sonnet", 1),
            ("session-3", 8000u64, "haiku", 0),
            ("session-4", 2000u64, "sonnet", 2),
            ("session-5", 10000u64, "opus", 0),
        ];

        for (id, tokens, model, days_ago) in test_data {
            let session = SessionMetadata {
                total_tokens: tokens,
                input_tokens: tokens / 2,
                output_tokens: tokens / 2,
                ..session_fixture(id, model, now - chrono::Duration::days(days_ago), now)
            };
            store.sessions.insert(session.id.clone(), Arc::new(session));
        }

        // Test top_sessions_by_tokens
        let top_sessions = store.top_sessions_by_tokens(3);
        assert_eq!(top_sessions.len(), 3);
        assert_eq!(top_sessions[0].id, "session-5"); // 10000 tokens
        assert_eq!(top_sessions[1].id, "session-3"); // 8000 tokens
        assert_eq!(top_sessions[2].id, "session-1"); // 5000 tokens

        // Test top_models_by_tokens
        let top_models = store.top_models_by_tokens();
        assert!(!top_models.is_empty());
        // opus: 15000 (5000+10000), sonnet: 5000 (3000+2000), haiku: 8000
        assert_eq!(top_models[0].0, "opus");
        assert_eq!(top_models[0].1, 15000);
        assert_eq!(top_models[1].0, "haiku");
        assert_eq!(top_models[1].1, 8000);

        // Test top_days_by_tokens
        let top_days = store.top_days_by_tokens();
        assert!(!top_days.is_empty());
        // Day 0 (today): 5000+8000+10000 = 23000
        let today = now.format("%Y-%m-%d").to_string();
        assert_eq!(top_days[0].0, today);
        assert_eq!(top_days[0].1, 23000);
    }
}