1use crate::analytics::{AnalyticsData, Period};
7use crate::cache::{MetadataCache, StoredAlert};
8use crate::error::{CoreError, DegradedState, LoadReport};
9use crate::event::{ConfigScope, DataEvent, EventBus};
10use crate::models::activity::ActivitySummary;
11use crate::models::{
12 BillingBlockManager, InvocationStats, MergedConfig, SessionId, SessionMetadata, StatsCache,
13};
14use crate::parsers::{
15 classify_tool_calls, parse_claude_global, parse_tool_calls, ClaudeGlobalStats,
16 InvocationParser, McpConfig, Rules, SessionContentParser, SessionIndexParser, SettingsParser,
17 StatsParser,
18};
19use dashmap::DashMap;
20use moka::future::Cache;
21use parking_lot::RwLock; use std::path::{Path, PathBuf};
23use std::sync::Arc;
24use std::time::Duration;
25use tracing::{debug, info, warn};
26
/// Tunable limits and retry settings for [`DataStore`].
#[derive(Debug, Clone)]
pub struct DataStoreConfig {
    /// Upper bound on the number of session metadata entries kept after a scan.
    pub max_session_metadata_count: usize,

    /// Megabyte budget from which the session content cache capacity is derived.
    pub max_session_content_cache_mb: usize,

    /// Concurrency handed to the session index parser when scanning.
    pub max_concurrent_scans: usize,

    /// Retry count handed to the stats parser.
    pub stats_retry_count: u32,

    /// Delay handed to the stats parser together with the retry count.
    pub stats_retry_delay: Duration,
}

impl Default for DataStoreConfig {
    /// Built-in defaults: 10 000 sessions, 100 MB content cache, 8 concurrent
    /// scans, 3 stats retries spaced 100 ms apart.
    fn default() -> Self {
        const DEFAULT_STATS_RETRY_DELAY: Duration = Duration::from_millis(100);

        DataStoreConfig {
            stats_retry_delay: DEFAULT_STATS_RETRY_DELAY,
            stats_retry_count: 3,
            max_concurrent_scans: 8,
            max_session_content_cache_mb: 100,
            max_session_metadata_count: 10_000,
        }
    }
}
57
/// Central in-memory store for data loaded from a Claude home directory.
///
/// Single-value state sits behind `parking_lot::RwLock`s and keyed collections
/// use `DashMap`, so all methods take `&self`.
pub struct DataStore {
    /// Directory that `stats-cache.json`, `projects/` and `cache/` are joined onto.
    claude_home: PathBuf,

    /// Optional project directory forwarded to the settings, MCP and rules loaders.
    project_path: Option<PathBuf>,

    /// Limits and retry settings supplied at construction.
    config: DataStoreConfig,

    /// Parsed stats cache; `None` until a load or reload succeeds.
    stats: RwLock<Option<StatsCache>>,

    /// Merged settings; starts as `MergedConfig::default()`.
    settings: RwLock<MergedConfig>,

    /// Merged MCP configuration; `None` when no config was found or loaded.
    mcp_config: RwLock<Option<McpConfig>>,

    /// Loaded rules; starts as `Rules::default()`.
    rules: RwLock<Rules>,

    /// Invocation statistics, replaced wholesale by `compute_invocations`.
    invocation_stats: RwLock<InvocationStats>,

    /// Billing block manager, replaced wholesale by `compute_billing_blocks`.
    billing_blocks: RwLock<BillingBlockManager>,

    /// Last computed analytics; `None` until computed or after invalidation.
    analytics_cache: RwLock<Option<AnalyticsData>>,

    /// Session metadata keyed by session id.
    sessions: DashMap<SessionId, Arc<SessionMetadata>>,

    /// Cache keyed by session id holding `Vec<String>` values.
    /// NOTE(review): in the code visible here it is only queried and
    /// invalidated, never populated — confirm whether a writer exists elsewhere.
    #[allow(dead_code)]
    session_content_cache: Cache<SessionId, Vec<String>>,

    /// Bus on which `DataEvent`s are published after loads and updates.
    event_bus: EventBus,

    /// Health of the store as derived from the last initial-load report.
    degraded_state: RwLock<DegradedState>,

    /// On-disk metadata cache; `None` when it could not be created.
    metadata_cache: Option<Arc<MetadataCache>>,

    /// Activity summaries keyed by session id string, filled by `analyze_session`.
    activity_results: DashMap<String, ActivitySummary>,

    /// Most recently loaded live-session hook file.
    live_hook_sessions: RwLock<crate::hook_state::LiveSessionFile>,

    /// Result of `parse_claude_global` on the user's home directory, if any.
    claude_global_stats: RwLock<Option<ClaudeGlobalStats>>,
}
123
/// One row of the per-project leaderboard built by `DataStore::projects_leaderboard`.
#[derive(Debug, Clone)]
pub struct ProjectLeaderboardEntry {
    /// Final component of the project path, or the whole path when it has none.
    pub project_name: String,
    /// Number of sessions recorded for the project.
    pub total_sessions: usize,
    /// Sum of `total_tokens` over the project's sessions.
    pub total_tokens: u64,
    /// Sum of the per-session costs returned by `crate::pricing::calculate_cost`.
    pub total_cost: f64,
    /// `total_cost` divided by `total_sessions`.
    pub avg_session_cost: f64,
}
133
134impl DataStore {
135 pub fn new(
137 claude_home: PathBuf,
138 project_path: Option<PathBuf>,
139 config: DataStoreConfig,
140 ) -> Self {
141 let session_content_cache = Cache::builder()
142 .max_capacity((config.max_session_content_cache_mb * 1024 * 1024 / 1000) as u64) .time_to_idle(Duration::from_secs(300)) .build();
145
146 let metadata_cache = {
148 let cache_dir = claude_home.join("cache");
149 match MetadataCache::new(&cache_dir) {
150 Ok(cache) => {
151 debug!(path = %cache_dir.display(), "Metadata cache enabled");
152 Some(Arc::new(cache))
153 }
154 Err(e) => {
155 warn!(error = %e, "Failed to create metadata cache, running without cache");
156 None
157 }
158 }
159 };
160
161 Self {
162 claude_home,
163 project_path,
164 config,
165 stats: RwLock::new(None),
166 settings: RwLock::new(MergedConfig::default()),
167 mcp_config: RwLock::new(None),
168 rules: RwLock::new(Rules::default()),
169 invocation_stats: RwLock::new(InvocationStats::new()),
170 billing_blocks: RwLock::new(BillingBlockManager::new()),
171 analytics_cache: RwLock::new(None),
172 sessions: DashMap::new(),
173 session_content_cache,
174 event_bus: EventBus::default_capacity(),
175 degraded_state: RwLock::new(DegradedState::Healthy),
176 metadata_cache,
177 activity_results: DashMap::new(),
178 live_hook_sessions: RwLock::new(crate::hook_state::LiveSessionFile::default()),
179 claude_global_stats: RwLock::new(None),
180 }
181 }
182
183 pub fn with_defaults(claude_home: PathBuf, project_path: Option<PathBuf>) -> Self {
185 Self::new(claude_home, project_path, DataStoreConfig::default())
186 }
187
188 pub fn event_bus(&self) -> &EventBus {
190 &self.event_bus
191 }
192
193 pub fn degraded_state(&self) -> DegradedState {
195 self.degraded_state.read().clone()
196 }
197
198 pub async fn initial_load(&self) -> LoadReport {
200 let mut report = LoadReport::new();
201
202 info!(claude_home = %self.claude_home.display(), "Starting initial data load");
203
204 self.load_stats(&mut report).await;
206
207 if let Some(home) = dirs::home_dir() {
209 if let Some(global) = parse_claude_global(&home) {
210 *self.claude_global_stats.write() = Some(global);
211 debug!("~/.claude.json loaded successfully");
212 }
213 }
214
215 self.load_settings(&mut report).await;
217
218 self.load_mcp_config(&mut report).await;
220
221 self.load_rules(&mut report).await;
223
224 self.scan_sessions(&mut report).await;
226
227 self.update_degraded_state(&report);
229
230 self.event_bus.publish(DataEvent::LoadCompleted);
232
233 info!(
234 stats_loaded = report.stats_loaded,
235 settings_loaded = report.settings_loaded,
236 sessions_scanned = report.sessions_scanned,
237 sessions_failed = report.sessions_failed,
238 errors = report.errors.len(),
239 "Initial load complete"
240 );
241
242 report
243 }
244
245 async fn load_stats(&self, report: &mut LoadReport) {
247 let stats_path = self.claude_home.join("stats-cache.json");
248 let parser = StatsParser::new()
249 .with_retries(self.config.stats_retry_count, self.config.stats_retry_delay);
250
251 if let Some(mut stats) = parser.parse_graceful(&stats_path, report).await {
252 stats.recalculate_costs();
254 let mut guard = self.stats.write();
255 *guard = Some(stats);
256 debug!("Stats loaded successfully with recalculated costs");
257 }
258 }
259
260 async fn load_settings(&self, report: &mut LoadReport) {
262 let parser = SettingsParser::new();
263 let merged = parser
264 .load_merged(&self.claude_home, self.project_path.as_deref(), report)
265 .await;
266
267 let mut guard = self.settings.write();
268 *guard = merged;
269 debug!("Settings loaded and merged");
270 }
271
272 async fn load_mcp_config(&self, report: &mut LoadReport) {
274 match McpConfig::load_merged(&self.claude_home, self.project_path.as_deref()) {
275 Ok(Some(config)) => {
276 let server_count = config.servers.len();
277 let mut guard = self.mcp_config.write();
278 *guard = Some(config);
279 debug!(
280 server_count,
281 "MCP config loaded successfully (global + project)"
282 );
283 }
284 Ok(None) => {
285 debug!("No MCP config found (optional)");
286 }
287 Err(e) => {
288 use crate::error::LoadError;
289 report.add_error(LoadError::error(
290 "mcp_config",
291 format!("Failed to parse MCP config: {}", e),
292 ));
293 }
294 }
295 }
296
297 async fn load_rules(&self, report: &mut LoadReport) {
299 match Rules::load(&self.claude_home, self.project_path.as_deref()) {
300 Ok(rules) => {
301 let has_global = rules.global.is_some();
302 let has_project = rules.project.is_some();
303 let mut guard = self.rules.write();
304 *guard = rules;
305 debug!(has_global, has_project, "Rules loaded");
306 }
307 Err(e) => {
308 use crate::error::LoadError;
309 report.add_error(LoadError::error(
310 "rules",
311 format!("Failed to load rules: {}", e),
312 ));
313 }
314 }
315 }
316
317 async fn scan_sessions(&self, report: &mut LoadReport) {
319 let projects_dir = self.claude_home.join("projects");
320
321 if !projects_dir.exists() {
322 report.add_warning(
323 "sessions",
324 format!("Projects directory not found: {}", projects_dir.display()),
325 );
326 return;
327 }
328
329 let mut parser =
330 SessionIndexParser::new().with_concurrency(self.config.max_concurrent_scans);
331
332 if let Some(ref cache) = self.metadata_cache {
334 parser = parser.with_cache(cache.clone());
335 }
336
337 let sessions = parser.scan_all(&projects_dir, report).await;
338
339 let sessions_to_add: Vec<_> = if sessions.len() > self.config.max_session_metadata_count {
341 warn!(
342 total = sessions.len(),
343 limit = self.config.max_session_metadata_count,
344 "Session count exceeds limit, keeping most recent"
345 );
346
347 let mut sorted = sessions;
348 sorted.sort_by(|a, b| b.last_timestamp.cmp(&a.last_timestamp));
349 sorted.truncate(self.config.max_session_metadata_count);
350 sorted
351 } else {
352 sessions
353 };
354
355 for session in sessions_to_add {
357 self.sessions.insert(session.id.clone(), Arc::new(session));
358 }
359
360 debug!(count = self.sessions.len(), "Sessions indexed");
361 }
362
363 fn update_degraded_state(&self, report: &LoadReport) {
365 let mut state = self.degraded_state.write();
366
367 if report.has_fatal_errors() {
368 *state = DegradedState::ReadOnly {
369 reason: "Fatal errors during load".to_string(),
370 };
371 return;
372 }
373
374 let mut missing = Vec::new();
375
376 if !report.stats_loaded {
377 missing.push("stats".to_string());
378 }
379 if !report.settings_loaded {
380 missing.push("settings".to_string());
381 }
382 if report.sessions_failed > 0 {
383 missing.push(format!("{} sessions", report.sessions_failed));
384 }
385
386 if missing.is_empty() {
387 *state = DegradedState::Healthy;
388 } else {
389 *state = DegradedState::PartialData {
390 missing: missing.clone(),
391 reason: format!("Missing: {}", missing.join(", ")),
392 };
393 }
394 }
395
396 pub fn stats(&self) -> Option<StatsCache> {
402 self.stats.read().clone()
403 }
404
405 pub fn context_window_stats(&self) -> crate::models::ContextWindowStats {
407 let sessions: Vec<_> = self
409 .sessions
410 .iter()
411 .map(|entry| Arc::clone(entry.value()))
412 .collect();
413 let refs: Vec<_> = sessions.iter().map(|s| s.as_ref()).collect();
415 crate::models::StatsCache::calculate_context_saturation(&refs, 30)
416 }
417
418 pub fn settings(&self) -> MergedConfig {
420 self.settings.read().clone()
421 }
422
423 pub fn mcp_config(&self) -> Option<McpConfig> {
425 self.mcp_config.read().clone()
426 }
427
428 pub fn rules(&self) -> Rules {
430 self.rules.read().clone()
431 }
432
433 pub fn invocation_stats(&self) -> InvocationStats {
435 self.invocation_stats.read().clone()
436 }
437
438 pub fn quota_status(&self) -> Option<crate::quota::QuotaStatus> {
442 let stats = self.stats.read().clone()?;
443 let settings = self.settings.read();
444 let budget = settings.merged.budget.as_ref()?;
445
446 Some(crate::quota::calculate_quota_status(&stats, budget))
447 }
448
449 pub fn live_sessions(&self) -> Vec<crate::live_monitor::LiveSession> {
454 crate::live_monitor::detect_live_sessions().unwrap_or_default()
455 }
456
457 pub fn merged_live_sessions(&self) -> Vec<crate::live_monitor::MergedLiveSession> {
461 let hook_file = self.live_hook_sessions.read().clone();
462 let ps_sessions = crate::live_monitor::detect_live_sessions().unwrap_or_default();
463 crate::live_monitor::merge_live_sessions(&hook_file, &ps_sessions)
464 }
465
466 pub async fn reload_live_hook_sessions(&self, path: &std::path::Path) {
468 match crate::hook_state::LiveSessionFile::load(path) {
469 Ok(file) => {
470 *self.live_hook_sessions.write() = file;
471 debug!("Reloaded live hook sessions from {}", path.display());
472 }
473 Err(e) => {
474 warn!(error = %e, "Failed to reload live-sessions.json");
475 }
476 }
477 }
478
479 pub fn claude_global_stats(&self) -> Option<ClaudeGlobalStats> {
481 self.claude_global_stats.read().clone()
482 }
483
484 pub fn session_count(&self) -> usize {
486 self.sessions.len()
487 }
488
489 pub fn get_session(&self, id: &str) -> Option<Arc<SessionMetadata>> {
492 self.sessions.get(id).map(|r| Arc::clone(r.value()))
493 }
494
495 pub async fn load_session_content(
508 &self,
509 session_id: &str,
510 ) -> Result<Vec<crate::models::ConversationMessage>, CoreError> {
511 let metadata = self
513 .get_session(session_id)
514 .ok_or_else(|| CoreError::SessionNotFound {
515 session_id: session_id.to_string(),
516 })?;
517
518 let session_id_owned = SessionId::from(session_id);
520 if let Some(_cached) = self.session_content_cache.get(&session_id_owned).await {
521 debug!(session_id, "Session content cache HIT");
522 }
525
526 debug!(
528 session_id,
529 path = %metadata.file_path.display(),
530 "Session content cache MISS, parsing JSONL"
531 );
532
533 let messages = SessionContentParser::parse_conversation(
534 &metadata.file_path,
535 (*metadata).clone(), )
537 .await?;
538
539 Ok(messages)
543 }
544
545 pub fn analytics(&self) -> Option<AnalyticsData> {
550 let analytics = self.analytics_cache.read().clone();
551 debug!(
552 has_analytics = analytics.is_some(),
553 "analytics() getter called"
554 );
555 analytics
556 }
557
558 pub async fn compute_analytics(&self, period: Period) {
566 let sessions: Vec<_> = self
567 .sessions
568 .iter()
569 .map(|r| Arc::clone(r.value()))
570 .collect();
571
572 info!(
573 session_count = sessions.len(),
574 period = ?period,
575 "compute_analytics() ENTRY"
576 );
577
578 let analytics =
580 tokio::task::spawn_blocking(move || AnalyticsData::compute(&sessions, period)).await;
581
582 match analytics {
583 Ok(data) => {
584 info!(
585 insights_count = data.insights.len(),
586 "compute_analytics() computed data"
587 );
588 let mut guard = self.analytics_cache.write();
589 *guard = Some(data);
590 self.event_bus.publish(DataEvent::AnalyticsUpdated);
591 info!("compute_analytics() EXIT - cached and event published");
592 }
593 Err(e) => {
594 warn!(error = %e, "Failed to compute analytics (task panicked)");
595 }
596 }
597 }
598
599 #[allow(dead_code)]
604 fn invalidate_analytics_cache(&self) {
605 let mut guard = self.analytics_cache.write();
606 *guard = None;
607 debug!("Analytics cache invalidated");
608 }
609
610 pub fn session_ids(&self) -> Vec<SessionId> {
612 self.sessions.iter().map(|r| r.key().clone()).collect()
613 }
614
615 pub fn clear_session_content_cache(&self) {
617 self.session_content_cache.invalidate_all();
618 debug!("Session content cache cleared");
619 }
620
621 pub fn sessions_by_project(
624 &self,
625 ) -> std::collections::HashMap<String, Vec<Arc<SessionMetadata>>> {
626 let mut by_project = std::collections::HashMap::new();
627
628 for entry in self.sessions.iter() {
629 let session = Arc::clone(entry.value());
630 by_project
631 .entry(session.project_path.as_str().to_string())
632 .or_insert_with(Vec::new)
633 .push(session);
634 }
635
636 for sessions in by_project.values_mut() {
638 sessions.sort_by(|a, b| b.last_timestamp.cmp(&a.last_timestamp));
639 }
640
641 by_project
642 }
643
644 pub fn all_sessions(&self) -> Vec<Arc<SessionMetadata>> {
647 self.sessions
648 .iter()
649 .map(|r| Arc::clone(r.value()))
650 .collect()
651 }
652
653 pub fn recent_sessions(&self, limit: usize) -> Vec<Arc<SessionMetadata>> {
656 let mut sessions = self.all_sessions();
657 sessions.sort_by(|a, b| b.last_timestamp.cmp(&a.last_timestamp));
658 sessions.truncate(limit);
659 sessions
660 }
661
662 pub fn search_sessions(&self, query: &str, limit: usize) -> Vec<crate::cache::SearchResult> {
666 if let Some(ref cache) = self.metadata_cache {
667 match cache.search_sessions(query, limit) {
668 Ok(results) => results,
669 Err(e) => {
670 warn!("FTS5 search failed: {}", e);
671 Vec::new()
672 }
673 }
674 } else {
675 Vec::new()
676 }
677 }
678
679 pub async fn analyze_session(&self, session_id: &str) -> anyhow::Result<ActivitySummary> {
684 use std::time::SystemTime;
685
686 let metadata = self
687 .get_session(session_id)
688 .ok_or_else(|| anyhow::anyhow!("Session not found: {}", session_id))?;
689
690 let path = &metadata.file_path;
691
692 let mtime = tokio::fs::metadata(path)
695 .await
696 .and_then(|m| m.modified())
697 .unwrap_or(SystemTime::UNIX_EPOCH);
698
699 if let Some(cache) = &self.metadata_cache {
701 if let Ok(Some(cached)) = cache.get_activity(path, mtime) {
702 self.activity_results
703 .insert(session_id.to_string(), cached.clone());
704 self.event_bus.publish(DataEvent::AnalyticsUpdated);
705 return Ok(cached);
706 }
707 }
708
709 let calls = parse_tool_calls(path, session_id).await?;
711
712 let project_root = path
713 .parent()
714 .and_then(|p| p.parent())
715 .map(|p| p.to_string_lossy().into_owned());
716
717 let summary = classify_tool_calls(calls, session_id, project_root.as_deref());
718
719 if let Some(cache) = &self.metadata_cache {
721 if let Err(e) = cache.put_activity(path, session_id, &summary, mtime) {
722 warn!(session_id, error = %e, "Failed to cache activity — will re-parse on restart");
723 }
724 }
725
726 self.activity_results
728 .insert(session_id.to_string(), summary.clone());
729 self.event_bus.publish(DataEvent::AnalyticsUpdated);
730
731 Ok(summary)
732 }
733
734 pub fn get_session_activity(&self, session_id: &str) -> Option<ActivitySummary> {
736 self.activity_results
737 .get(session_id)
738 .map(|r| r.value().clone())
739 }
740
741 pub fn get_all_stored_alerts(&self, min_severity: Option<&str>) -> Vec<StoredAlert> {
745 if let Some(cache) = &self.metadata_cache {
746 cache.get_all_alerts(min_severity).unwrap_or_default()
747 } else {
748 vec![]
749 }
750 }
751
752 pub fn all_violations(&self) -> Vec<crate::models::activity::Alert> {
757 use crate::models::activity::{Alert, AlertSeverity};
758 use std::collections::HashSet;
759
760 let mut seen_sessions: HashSet<String> = HashSet::new();
762 let mut alerts: Vec<Alert> = Vec::new();
763
764 for entry in self.activity_results.iter() {
765 seen_sessions.insert(entry.key().clone());
766 alerts.extend(entry.value().alerts.clone());
767 }
768
769 if let Some(cache) = &self.metadata_cache {
771 if let Ok(stored) = cache.get_all_alerts(None) {
772 for sa in stored {
773 let session_id = std::path::Path::new(&sa.session_path)
775 .file_stem()
776 .and_then(|s| s.to_str())
777 .unwrap_or(&sa.session_path)
778 .to_string();
779
780 if seen_sessions.contains(&session_id) {
781 continue; }
783
784 let severity = match sa.severity.as_str() {
786 "Critical" => AlertSeverity::Critical,
787 "Warning" => AlertSeverity::Warning,
788 _ => AlertSeverity::Info,
789 };
790 let category = match sa.category.as_str() {
791 "CredentialAccess" => {
792 crate::models::activity::AlertCategory::CredentialAccess
793 }
794 "DestructiveCommand" => {
795 crate::models::activity::AlertCategory::DestructiveCommand
796 }
797 "ForcePush" => crate::models::activity::AlertCategory::ForcePush,
798 "ScopeViolation" => crate::models::activity::AlertCategory::ScopeViolation,
799 _ => crate::models::activity::AlertCategory::ExternalExfil,
800 };
801 let timestamp = sa
802 .timestamp
803 .parse::<chrono::DateTime<chrono::Utc>>()
804 .unwrap_or_else(|_| chrono::Utc::now());
805
806 alerts.push(Alert {
807 session_id,
808 timestamp,
809 severity,
810 category,
811 detail: sa.detail,
812 });
813 }
814 }
815 }
816
817 alerts.sort_by(|a, b| {
819 b.severity
820 .partial_cmp(&a.severity)
821 .unwrap_or(std::cmp::Ordering::Equal)
822 .then_with(|| b.timestamp.cmp(&a.timestamp))
823 });
824
825 alerts
826 }
827
828 pub fn top_sessions_by_tokens(&self, limit: usize) -> Vec<Arc<SessionMetadata>> {
831 let mut sessions: Vec<_> = self
832 .sessions
833 .iter()
834 .map(|r| Arc::clone(r.value()))
835 .collect();
836 sessions.sort_by(|a, b| b.total_tokens.cmp(&a.total_tokens));
837 sessions.truncate(limit);
838 sessions
839 }
840
841 pub fn top_models_by_tokens(&self) -> Vec<(String, u64)> {
844 let mut model_totals = std::collections::HashMap::new();
845
846 for session in self.sessions.iter() {
848 for model in &session.value().models_used {
849 *model_totals.entry(model.clone()).or_insert(0) += session.value().total_tokens;
850 }
851 }
852
853 let mut results: Vec<_> = model_totals.into_iter().collect();
855 results.sort_by(|a, b| b.1.cmp(&a.1));
856 results.truncate(10); results
858 }
859
860 pub fn top_days_by_tokens(&self) -> Vec<(String, u64)> {
863 let mut day_totals = std::collections::HashMap::new();
864
865 for session in self.sessions.iter() {
867 if let Some(timestamp) = &session.value().first_timestamp {
868 let date = timestamp.format("%Y-%m-%d").to_string();
869 *day_totals.entry(date).or_insert(0) += session.value().total_tokens;
870 }
871 }
872
873 let mut results: Vec<_> = day_totals.into_iter().collect();
875 results.sort_by(|a, b| b.1.cmp(&a.1));
876 results.truncate(10); results
878 }
879
880 pub fn projects_leaderboard(&self) -> Vec<ProjectLeaderboardEntry> {
885 let mut project_metrics = std::collections::HashMap::new();
886
887 for session in self.sessions.iter() {
889 let metadata = session.value();
890 let project_path = &metadata.project_path;
891
892 let model = metadata
894 .models_used
895 .first()
896 .map(|s| s.as_str())
897 .unwrap_or("unknown");
898
899 let cost = crate::pricing::calculate_cost(
901 model,
902 metadata.input_tokens,
903 metadata.output_tokens,
904 metadata.cache_creation_tokens,
905 metadata.cache_read_tokens,
906 );
907
908 let entry = project_metrics
909 .entry(project_path.clone())
910 .or_insert((0, 0u64, 0.0f64)); entry.0 += 1; entry.1 += metadata.total_tokens; entry.2 += cost; }
916
917 let mut results: Vec<_> = project_metrics
919 .into_iter()
920 .map(
921 |(project_path, (session_count, total_tokens, total_cost))| {
922 let avg_session_cost = if session_count > 0 {
923 total_cost / session_count as f64
924 } else {
925 0.0
926 };
927
928 let project_name = std::path::Path::new(project_path.as_str())
930 .file_name()
931 .and_then(|n| n.to_str())
932 .unwrap_or(project_path.as_str())
933 .to_string();
934
935 ProjectLeaderboardEntry {
936 project_name,
937 total_sessions: session_count,
938 total_tokens,
939 total_cost,
940 avg_session_cost,
941 }
942 },
943 )
944 .collect();
945
946 results.sort_by(|a, b| {
948 b.total_cost
949 .partial_cmp(&a.total_cost)
950 .unwrap_or(std::cmp::Ordering::Equal)
951 });
952
953 results
954 }
955
956 pub async fn reload_stats(&self) {
962 let stats_path = self.claude_home.join("stats-cache.json");
963 let parser = StatsParser::new()
964 .with_retries(self.config.stats_retry_count, self.config.stats_retry_delay);
965
966 let mut report = LoadReport::new();
967 if let Some(mut stats) = parser.parse_graceful(&stats_path, &mut report).await {
968 stats.recalculate_costs();
970 let mut guard = self.stats.write();
971 *guard = Some(stats);
972
973 self.event_bus.publish(DataEvent::StatsUpdated);
976 debug!("Stats reloaded with recalculated costs");
977 }
978 }
979
980 pub async fn reload_settings(&self) {
982 let parser = SettingsParser::new();
983 let merged = parser
984 .load_merged(
985 &self.claude_home,
986 self.project_path.as_deref(),
987 &mut LoadReport::new(),
988 )
989 .await;
990
991 {
992 let mut guard = self.settings.write();
993 *guard = merged;
994 }
995
996 self.event_bus
997 .publish(DataEvent::ConfigChanged(ConfigScope::Global));
998 debug!("Settings reloaded");
999 }
1000
1001 pub async fn update_session(&self, path: &Path) {
1003 let parser = SessionIndexParser::new();
1004
1005 match parser.scan_session(path).await {
1006 Ok(meta) => {
1007 let id = meta.id.clone();
1008 let is_new = !self.sessions.contains_key(&id);
1009
1010 self.sessions.insert(id.clone(), Arc::new(meta));
1011
1012 if is_new {
1017 self.event_bus.publish(DataEvent::SessionCreated(id));
1018 } else {
1019 self.event_bus.publish(DataEvent::SessionUpdated(id));
1020 }
1021 }
1022 Err(e) => {
1023 warn!(path = %path.display(), error = %e, "Failed to update session");
1024 }
1025 }
1026 }
1027
1028 pub async fn compute_invocations(&self) {
1033 let paths: Vec<_> = self
1034 .sessions
1035 .iter()
1036 .map(|r| r.value().file_path.clone())
1037 .collect();
1038
1039 debug!(session_count = paths.len(), "Computing invocation stats");
1040
1041 let parser = InvocationParser::new();
1042 let mut stats = parser.scan_sessions(&paths).await;
1043
1044 for session_ref in self.sessions.iter() {
1047 let session = session_ref.value();
1048 if let Some(&task_tokens) = session.tool_token_usage.get("Task") {
1049 let agent_count =
1051 session.tool_usage.get("Task").copied().unwrap_or(0).max(1) as u64;
1052 let tokens_per_agent = task_tokens / agent_count;
1053 for agent_type in stats.agents.keys().cloned().collect::<Vec<_>>() {
1055 *stats.agent_token_stats.entry(agent_type).or_insert(0) += tokens_per_agent;
1056 }
1057 }
1058 }
1059
1060 let mut guard = self.invocation_stats.write();
1061 *guard = stats;
1062
1063 debug!(
1064 agents = guard.agents.len(),
1065 commands = guard.commands.len(),
1066 skills = guard.skills.len(),
1067 total = guard.total_invocations(),
1068 "Invocation stats computed"
1069 );
1070
1071 self.event_bus.publish(DataEvent::LoadCompleted);
1073 }
1074
1075 pub async fn compute_billing_blocks(&self) {
1080 debug!("Computing billing blocks from sessions with real pricing");
1081
1082 let mut manager = BillingBlockManager::new();
1083 let mut sessions_with_timestamps = 0;
1084 let mut sessions_without_timestamps = 0;
1085
1086 for session in self.sessions.iter() {
1087 let metadata = session.value();
1088
1089 let Some(timestamp) = &metadata.first_timestamp else {
1091 sessions_without_timestamps += 1;
1092 continue;
1093 };
1094
1095 sessions_with_timestamps += 1;
1096
1097 let model = metadata
1099 .models_used
1100 .first()
1101 .map(|s| s.as_str())
1102 .unwrap_or("unknown");
1103
1104 let cost = crate::pricing::calculate_cost(
1106 model,
1107 metadata.input_tokens,
1108 metadata.output_tokens,
1109 metadata.cache_creation_tokens,
1110 metadata.cache_read_tokens,
1111 );
1112
1113 manager.add_usage(
1114 timestamp,
1115 metadata.input_tokens,
1116 metadata.output_tokens,
1117 metadata.cache_creation_tokens,
1118 metadata.cache_read_tokens,
1119 cost,
1120 );
1121 }
1122
1123 debug!(
1124 sessions_with_timestamps,
1125 sessions_without_timestamps,
1126 blocks = manager.get_all_blocks().len(),
1127 "Billing blocks computed with real pricing"
1128 );
1129
1130 let mut guard = self.billing_blocks.write();
1131 *guard = manager;
1132
1133 self.event_bus.publish(DataEvent::LoadCompleted);
1134 }
1135
1136 pub fn billing_blocks(&self) -> parking_lot::RwLockReadGuard<'_, BillingBlockManager> {
1138 self.billing_blocks.read()
1139 }
1140
1141 pub fn usage_estimate(&self) -> crate::usage_estimator::UsageEstimate {
1143 let settings = self.settings();
1144 let plan = settings
1145 .merged
1146 .subscription_plan
1147 .as_ref()
1148 .map(|s| crate::usage_estimator::SubscriptionPlan::parse(s))
1149 .unwrap_or_default();
1150
1151 let billing_blocks = self.billing_blocks.read();
1152 crate::usage_estimator::calculate_usage_estimate(&billing_blocks, plan)
1153 }
1154
1155 pub fn load_preferences(&self) -> crate::preferences::CcboardPreferences {
1157 let cache_dir = self.claude_home.join("cache");
1158 crate::preferences::CcboardPreferences::load(&cache_dir)
1159 }
1160
1161 pub fn save_preferences(
1163 &self,
1164 prefs: &crate::preferences::CcboardPreferences,
1165 ) -> anyhow::Result<()> {
1166 let cache_dir = self.claude_home.join("cache");
1167 prefs.save(&cache_dir)
1168 }
1169}
1170
1171#[cfg(test)]
1172mod tests {
1173 use super::*;
1174 use tempfile::tempdir;
1175
1176 #[tokio::test]
1177 async fn test_data_store_creation() {
1178 let dir = tempdir().unwrap();
1179 let store = DataStore::with_defaults(dir.path().to_path_buf(), None);
1180
1181 assert_eq!(store.session_count(), 0);
1182 assert!(store.stats().is_none());
1183 assert!(store.degraded_state().is_healthy());
1184 }
1185
1186 #[tokio::test]
1187 async fn test_initial_load_missing_dir() {
1188 let dir = tempdir().unwrap();
1189 let store = DataStore::with_defaults(dir.path().join("nonexistent"), None);
1190
1191 let report = store.initial_load().await;
1192
1193 assert!(report.has_errors());
1195 assert!(store.degraded_state().is_degraded());
1196 }
1197
1198 #[tokio::test]
1199 async fn test_initial_load_with_stats() {
1200 let dir = tempdir().unwrap();
1201 let claude_home = dir.path();
1202
1203 std::fs::write(
1205 claude_home.join("stats-cache.json"),
1206 r#"{"version": 2, "totalSessions": 5, "totalMessages": 100, "modelUsage": {"test": {"inputTokens": 600, "outputTokens": 400}}}"#,
1207 )
1208 .unwrap();
1209
1210 std::fs::create_dir_all(claude_home.join("projects")).unwrap();
1212
1213 let store = DataStore::with_defaults(claude_home.to_path_buf(), None);
1214 let report = store.initial_load().await;
1215
1216 assert!(report.stats_loaded);
1217 let stats = store.stats().unwrap();
1218 assert_eq!(stats.total_tokens(), 1000);
1219 assert_eq!(stats.session_count(), 5);
1220 }
1221
1222 #[tokio::test]
1223 async fn test_event_bus_subscription() {
1224 let dir = tempdir().unwrap();
1225 let store = DataStore::with_defaults(dir.path().to_path_buf(), None);
1226
1227 let mut rx = store.event_bus().subscribe();
1228
1229 store.event_bus().publish(DataEvent::StatsUpdated);
1231
1232 let event = rx.recv().await.unwrap();
1233 assert!(matches!(event, DataEvent::StatsUpdated));
1234 }
1235
1236 #[tokio::test]
1237 async fn test_analytics_cache_and_invalidation() {
1238 use crate::models::session::SessionMetadata;
1239 use chrono::Utc;
1240
1241 let dir = tempdir().unwrap();
1242 let store = DataStore::with_defaults(dir.path().to_path_buf(), None);
1243
1244 let now = Utc::now();
1246 for i in 0..10 {
1247 let total_tokens = 1000 * (i as u64 + 1);
1248 let session = SessionMetadata {
1249 id: format!("test-{}", i).into(),
1250 file_path: std::path::PathBuf::from(format!("/test-{}.jsonl", i)),
1251 project_path: "/test".into(),
1252 first_timestamp: Some(now - chrono::Duration::days(i)),
1253 last_timestamp: Some(now),
1254 message_count: 10,
1255 total_tokens,
1256 input_tokens: total_tokens / 2,
1257 output_tokens: total_tokens / 3,
1258 cache_creation_tokens: total_tokens / 10,
1259 cache_read_tokens: total_tokens
1260 - (total_tokens / 2 + total_tokens / 3 + total_tokens / 10),
1261 models_used: vec!["sonnet".to_string()],
1262 file_size_bytes: 1024,
1263 first_user_message: None,
1264 has_subagents: false,
1265 duration_seconds: Some(1800),
1266 branch: None,
1267 tool_usage: std::collections::HashMap::new(),
1268 tool_token_usage: std::collections::HashMap::new(),
1269 };
1270 store.sessions.insert(session.id.clone(), Arc::new(session));
1271 }
1272
1273 assert!(store.analytics().is_none());
1275
1276 store.compute_analytics(Period::last_7d()).await;
1278
1279 let analytics1 = store.analytics().expect("Analytics should be cached");
1281 assert!(!analytics1.trends.is_empty());
1282 assert_eq!(analytics1.period, Period::last_7d());
1283
1284 store.invalidate_analytics_cache();
1286 assert!(store.analytics().is_none(), "Cache should be invalidated");
1287
1288 store.compute_analytics(Period::last_30d()).await;
1290 let analytics2 = store.analytics().expect("Analytics should be re-cached");
1291 assert_eq!(analytics2.period, Period::last_30d());
1292 }
1293
1294 #[tokio::test]
1295 async fn test_leaderboard_methods() {
1296 use crate::models::session::SessionMetadata;
1297 use chrono::Utc;
1298
1299 let dir = tempdir().unwrap();
1300 let store = DataStore::with_defaults(dir.path().to_path_buf(), None);
1301
1302 let now = Utc::now();
1303
1304 let test_data = vec![
1306 ("session-1", 5000u64, "opus", 0),
1307 ("session-2", 3000u64, "sonnet", 1),
1308 ("session-3", 8000u64, "haiku", 0),
1309 ("session-4", 2000u64, "sonnet", 2),
1310 ("session-5", 10000u64, "opus", 0),
1311 ];
1312
1313 for (id, tokens, model, days_ago) in test_data {
1314 let session = SessionMetadata {
1315 id: id.into(),
1316 file_path: std::path::PathBuf::from(format!("/{}.jsonl", id)),
1317 project_path: "/test".into(),
1318 first_timestamp: Some(now - chrono::Duration::days(days_ago)),
1319 last_timestamp: Some(now),
1320 message_count: 10,
1321 total_tokens: tokens,
1322 input_tokens: tokens / 2,
1323 output_tokens: tokens / 2,
1324 cache_creation_tokens: 0,
1325 cache_read_tokens: 0,
1326 models_used: vec![model.to_string()],
1327 file_size_bytes: 1024,
1328 first_user_message: None,
1329 has_subagents: false,
1330 duration_seconds: Some(1800),
1331 branch: None,
1332 tool_usage: std::collections::HashMap::new(),
1333 tool_token_usage: std::collections::HashMap::new(),
1334 };
1335 store.sessions.insert(session.id.clone(), Arc::new(session));
1336 }
1337
1338 let top_sessions = store.top_sessions_by_tokens(3);
1340 assert_eq!(top_sessions.len(), 3);
1341 assert_eq!(top_sessions[0].id, "session-5"); assert_eq!(top_sessions[1].id, "session-3"); assert_eq!(top_sessions[2].id, "session-1"); let top_models = store.top_models_by_tokens();
1347 assert!(!top_models.is_empty());
1348 assert_eq!(top_models[0].0, "opus");
1350 assert_eq!(top_models[0].1, 15000);
1351 assert_eq!(top_models[1].0, "haiku");
1352 assert_eq!(top_models[1].1, 8000);
1353
1354 let top_days = store.top_days_by_tokens();
1356 assert!(!top_days.is_empty());
1357 let today = now.format("%Y-%m-%d").to_string();
1359 assert_eq!(top_days[0].0, today);
1360 assert_eq!(top_days[0].1, 23000);
1361 }
1362
/// Verifies how `all_violations()` merges the in-memory `activity_results`
/// map with the activity rows persisted in the store's metadata cache:
///
/// * when both hold an entry for the same session, only the in-memory
///   (DashMap) alert is reported;
/// * a session present only in the persisted cache is still reported;
/// * the first reported violation is the `Critical` one.
#[test]
fn test_all_violations_dashmap_priority_over_sqlite() {
    use crate::models::activity::{ActivitySummary, Alert, AlertCategory, AlertSeverity};
    use chrono::Utc;
    use std::path::Path;
    use std::time::SystemTime;

    let dir = tempdir().unwrap();
    let store = DataStore::with_defaults(dir.path().to_path_buf(), None);

    let now = Utc::now();

    // Builds a summary that carries exactly one alert; all other fields are defaulted.
    let single_alert_summary =
        |session_id: &str, severity: AlertSeverity, category: AlertCategory, detail: &str| {
            ActivitySummary {
                alerts: vec![Alert {
                    session_id: session_id.to_string(),
                    timestamp: now,
                    severity,
                    category,
                    detail: detail.to_string(),
                }],
                ..Default::default()
            }
        };

    // Seed the persisted side through the store's own cache handle. Going through
    // this single handle (rather than opening a second `MetadataCache` and writing
    // the same rows twice) guarantees the rows land exactly where the store reads.
    let store_cache = store
        .metadata_cache
        .as_ref()
        .expect("MetadataCache should be present in store");

    let sqlite_summary = single_alert_summary(
        "shared-session",
        AlertSeverity::Warning,
        AlertCategory::DestructiveCommand,
        "sqlite-version",
    );
    store_cache
        .put_activity(
            Path::new("/projects/test/shared-session.jsonl"),
            "shared-session",
            &sqlite_summary,
            SystemTime::now(),
        )
        .expect("put_activity via store cache should succeed");

    let sqlite_only_summary = single_alert_summary(
        "sqlite-only-session",
        AlertSeverity::Info,
        AlertCategory::ExternalExfil,
        "sqlite-only-detail",
    );
    store_cache
        .put_activity(
            Path::new("/projects/test/sqlite-only-session.jsonl"),
            "sqlite-only-session",
            &sqlite_only_summary,
            SystemTime::now(),
        )
        .expect("put_activity via store cache should succeed");

    // Conflicting in-memory entry for the shared session: different severity,
    // category and detail, so the winner is unambiguous.
    let dashmap_summary = single_alert_summary(
        "shared-session",
        AlertSeverity::Critical,
        AlertCategory::ForcePush,
        "dashmap-version",
    );
    store
        .activity_results
        .insert("shared-session".to_string(), dashmap_summary);

    let violations = store.all_violations();

    // The shared session must be reported with the in-memory data.
    let shared_alert = violations
        .iter()
        .find(|a| a.session_id == "shared-session")
        .expect("shared-session alert must appear in violations");
    assert_eq!(
        shared_alert.detail, "dashmap-version",
        "DashMap version must take priority over SQLite for shared session"
    );
    assert_eq!(
        shared_alert.severity,
        AlertSeverity::Critical,
        "DashMap severity (Critical) must win over SQLite (Warning)"
    );

    // The session that exists only in the persisted cache must still surface.
    let sqlite_only_alert = violations
        .iter()
        .find(|a| a.session_id == "sqlite-only-session")
        .expect("sqlite-only-session must appear in violations (no DashMap entry for it)");
    assert_eq!(sqlite_only_alert.detail, "sqlite-only-detail");

    // No duplicate for the shared session may leak in from the persisted side.
    let shared_count = violations
        .iter()
        .filter(|a| a.session_id == "shared-session")
        .count();
    assert_eq!(
        shared_count, 1,
        "shared-session must appear exactly once (DashMap wins, no SQLite duplicate)"
    );

    // With one Critical and one Info alert present, the Critical one must be first.
    let first = violations
        .first()
        .expect("violations must not be empty at this point");
    assert_eq!(
        first.severity,
        AlertSeverity::Critical,
        "first reported violation must be the Critical one"
    );
}
1509}