1use crate::analytics::{AnalyticsData, Period};
7use crate::cache::{MetadataCache, StoredAlert};
8use crate::error::{CoreError, DegradedState, LoadReport};
9use crate::event::{ConfigScope, DataEvent, EventBus};
10use crate::models::activity::ActivitySummary;
11use crate::models::{
12 BillingBlockManager, InvocationStats, MergedConfig, SessionId, SessionMetadata, StatsCache,
13};
14use crate::parsers::{
15 classify_tool_calls, parse_tool_calls, InvocationParser, McpConfig, Rules,
16 SessionContentParser, SessionIndexParser, SettingsParser, StatsParser,
17};
18use dashmap::DashMap;
19use moka::future::Cache;
20use parking_lot::RwLock; use std::path::{Path, PathBuf};
22use std::sync::Arc;
23use std::time::Duration;
24use tracing::{debug, info, warn};
25
/// Tunable limits and retry behaviour for a [`DataStore`].
#[derive(Clone, Debug)]
pub struct DataStoreConfig {
    /// Upper bound on the number of session metadata entries kept after a scan.
    pub max_session_metadata_count: usize,

    /// Budget, in megabytes, from which the session content cache capacity is derived.
    pub max_session_content_cache_mb: usize,

    /// Concurrency handed to the session index parser when scanning.
    pub max_concurrent_scans: usize,

    /// Retry count handed to the stats parser.
    pub stats_retry_count: u32,

    /// Delay handed to the stats parser together with the retry count.
    pub stats_retry_delay: Duration,
}

impl Default for DataStoreConfig {
    /// Returns the built-in defaults: 10 000 sessions, 100 MB content budget,
    /// 8 concurrent scans, and 3 stats retries spaced 100 ms apart.
    fn default() -> Self {
        let stats_retry_delay = Duration::from_millis(100);

        Self {
            max_session_metadata_count: 10_000,
            max_session_content_cache_mb: 100,
            max_concurrent_scans: 8,
            stats_retry_count: 3,
            stats_retry_delay,
        }
    }
}
56
57pub struct DataStore {
62 claude_home: PathBuf,
64
65 project_path: Option<PathBuf>,
67
68 config: DataStoreConfig,
70
71 stats: RwLock<Option<StatsCache>>,
73
74 settings: RwLock<MergedConfig>,
76
77 mcp_config: RwLock<Option<McpConfig>>,
79
80 rules: RwLock<Rules>,
82
83 invocation_stats: RwLock<InvocationStats>,
85
86 billing_blocks: RwLock<BillingBlockManager>,
88
89 analytics_cache: RwLock<Option<AnalyticsData>>,
91
92 sessions: DashMap<SessionId, Arc<SessionMetadata>>,
99
100 #[allow(dead_code)]
102 session_content_cache: Cache<SessionId, Vec<String>>,
103
104 event_bus: EventBus,
106
107 degraded_state: RwLock<DegradedState>,
109
110 metadata_cache: Option<Arc<MetadataCache>>,
112
113 activity_results: DashMap<String, ActivitySummary>,
115}
116
/// One row of the per-project leaderboard built by `DataStore::projects_leaderboard`.
#[derive(Clone, Debug)]
pub struct ProjectLeaderboardEntry {
    /// Display name of the project (last path component when one is available).
    pub project_name: String,
    /// Number of sessions attributed to the project.
    pub total_sessions: usize,
    /// Sum of `total_tokens` over the project's sessions.
    pub total_tokens: u64,
    /// Sum of the computed cost over the project's sessions.
    pub total_cost: f64,
    /// `total_cost` divided by `total_sessions`.
    pub avg_session_cost: f64,
}
126
127impl DataStore {
128 pub fn new(
130 claude_home: PathBuf,
131 project_path: Option<PathBuf>,
132 config: DataStoreConfig,
133 ) -> Self {
134 let session_content_cache = Cache::builder()
135 .max_capacity((config.max_session_content_cache_mb * 1024 * 1024 / 1000) as u64) .time_to_idle(Duration::from_secs(300)) .build();
138
139 let metadata_cache = {
141 let cache_dir = claude_home.join("cache");
142 match MetadataCache::new(&cache_dir) {
143 Ok(cache) => {
144 debug!(path = %cache_dir.display(), "Metadata cache enabled");
145 Some(Arc::new(cache))
146 }
147 Err(e) => {
148 warn!(error = %e, "Failed to create metadata cache, running without cache");
149 None
150 }
151 }
152 };
153
154 Self {
155 claude_home,
156 project_path,
157 config,
158 stats: RwLock::new(None),
159 settings: RwLock::new(MergedConfig::default()),
160 mcp_config: RwLock::new(None),
161 rules: RwLock::new(Rules::default()),
162 invocation_stats: RwLock::new(InvocationStats::new()),
163 billing_blocks: RwLock::new(BillingBlockManager::new()),
164 analytics_cache: RwLock::new(None),
165 sessions: DashMap::new(),
166 session_content_cache,
167 event_bus: EventBus::default_capacity(),
168 degraded_state: RwLock::new(DegradedState::Healthy),
169 metadata_cache,
170 activity_results: DashMap::new(),
171 }
172 }
173
174 pub fn with_defaults(claude_home: PathBuf, project_path: Option<PathBuf>) -> Self {
176 Self::new(claude_home, project_path, DataStoreConfig::default())
177 }
178
179 pub fn event_bus(&self) -> &EventBus {
181 &self.event_bus
182 }
183
184 pub fn degraded_state(&self) -> DegradedState {
186 self.degraded_state.read().clone()
187 }
188
189 pub async fn initial_load(&self) -> LoadReport {
191 let mut report = LoadReport::new();
192
193 info!(claude_home = %self.claude_home.display(), "Starting initial data load");
194
195 self.load_stats(&mut report).await;
197
198 self.load_settings(&mut report).await;
200
201 self.load_mcp_config(&mut report).await;
203
204 self.load_rules(&mut report).await;
206
207 self.scan_sessions(&mut report).await;
209
210 self.update_degraded_state(&report);
212
213 self.event_bus.publish(DataEvent::LoadCompleted);
215
216 info!(
217 stats_loaded = report.stats_loaded,
218 settings_loaded = report.settings_loaded,
219 sessions_scanned = report.sessions_scanned,
220 sessions_failed = report.sessions_failed,
221 errors = report.errors.len(),
222 "Initial load complete"
223 );
224
225 report
226 }
227
228 async fn load_stats(&self, report: &mut LoadReport) {
230 let stats_path = self.claude_home.join("stats-cache.json");
231 let parser = StatsParser::new()
232 .with_retries(self.config.stats_retry_count, self.config.stats_retry_delay);
233
234 if let Some(mut stats) = parser.parse_graceful(&stats_path, report).await {
235 stats.recalculate_costs();
237 let mut guard = self.stats.write();
238 *guard = Some(stats);
239 debug!("Stats loaded successfully with recalculated costs");
240 }
241 }
242
243 async fn load_settings(&self, report: &mut LoadReport) {
245 let parser = SettingsParser::new();
246 let merged = parser
247 .load_merged(&self.claude_home, self.project_path.as_deref(), report)
248 .await;
249
250 let mut guard = self.settings.write();
251 *guard = merged;
252 debug!("Settings loaded and merged");
253 }
254
255 async fn load_mcp_config(&self, report: &mut LoadReport) {
257 match McpConfig::load_merged(&self.claude_home, self.project_path.as_deref()) {
258 Ok(Some(config)) => {
259 let server_count = config.servers.len();
260 let mut guard = self.mcp_config.write();
261 *guard = Some(config);
262 debug!(
263 server_count,
264 "MCP config loaded successfully (global + project)"
265 );
266 }
267 Ok(None) => {
268 debug!("No MCP config found (optional)");
269 }
270 Err(e) => {
271 use crate::error::LoadError;
272 report.add_error(LoadError::error(
273 "mcp_config",
274 format!("Failed to parse MCP config: {}", e),
275 ));
276 }
277 }
278 }
279
280 async fn load_rules(&self, report: &mut LoadReport) {
282 match Rules::load(&self.claude_home, self.project_path.as_deref()) {
283 Ok(rules) => {
284 let has_global = rules.global.is_some();
285 let has_project = rules.project.is_some();
286 let mut guard = self.rules.write();
287 *guard = rules;
288 debug!(has_global, has_project, "Rules loaded");
289 }
290 Err(e) => {
291 use crate::error::LoadError;
292 report.add_error(LoadError::error(
293 "rules",
294 format!("Failed to load rules: {}", e),
295 ));
296 }
297 }
298 }
299
300 async fn scan_sessions(&self, report: &mut LoadReport) {
302 let projects_dir = self.claude_home.join("projects");
303
304 if !projects_dir.exists() {
305 report.add_warning(
306 "sessions",
307 format!("Projects directory not found: {}", projects_dir.display()),
308 );
309 return;
310 }
311
312 let mut parser =
313 SessionIndexParser::new().with_concurrency(self.config.max_concurrent_scans);
314
315 if let Some(ref cache) = self.metadata_cache {
317 parser = parser.with_cache(cache.clone());
318 }
319
320 let sessions = parser.scan_all(&projects_dir, report).await;
321
322 let sessions_to_add: Vec<_> = if sessions.len() > self.config.max_session_metadata_count {
324 warn!(
325 total = sessions.len(),
326 limit = self.config.max_session_metadata_count,
327 "Session count exceeds limit, keeping most recent"
328 );
329
330 let mut sorted = sessions;
331 sorted.sort_by(|a, b| b.last_timestamp.cmp(&a.last_timestamp));
332 sorted.truncate(self.config.max_session_metadata_count);
333 sorted
334 } else {
335 sessions
336 };
337
338 for session in sessions_to_add {
340 self.sessions.insert(session.id.clone(), Arc::new(session));
341 }
342
343 debug!(count = self.sessions.len(), "Sessions indexed");
344 }
345
346 fn update_degraded_state(&self, report: &LoadReport) {
348 let mut state = self.degraded_state.write();
349
350 if report.has_fatal_errors() {
351 *state = DegradedState::ReadOnly {
352 reason: "Fatal errors during load".to_string(),
353 };
354 return;
355 }
356
357 let mut missing = Vec::new();
358
359 if !report.stats_loaded {
360 missing.push("stats".to_string());
361 }
362 if !report.settings_loaded {
363 missing.push("settings".to_string());
364 }
365 if report.sessions_failed > 0 {
366 missing.push(format!("{} sessions", report.sessions_failed));
367 }
368
369 if missing.is_empty() {
370 *state = DegradedState::Healthy;
371 } else {
372 *state = DegradedState::PartialData {
373 missing: missing.clone(),
374 reason: format!("Missing: {}", missing.join(", ")),
375 };
376 }
377 }
378
379 pub fn stats(&self) -> Option<StatsCache> {
385 self.stats.read().clone()
386 }
387
388 pub fn context_window_stats(&self) -> crate::models::ContextWindowStats {
390 let sessions: Vec<_> = self
392 .sessions
393 .iter()
394 .map(|entry| Arc::clone(entry.value()))
395 .collect();
396 let refs: Vec<_> = sessions.iter().map(|s| s.as_ref()).collect();
398 crate::models::StatsCache::calculate_context_saturation(&refs, 30)
399 }
400
401 pub fn settings(&self) -> MergedConfig {
403 self.settings.read().clone()
404 }
405
406 pub fn mcp_config(&self) -> Option<McpConfig> {
408 self.mcp_config.read().clone()
409 }
410
411 pub fn rules(&self) -> Rules {
413 self.rules.read().clone()
414 }
415
416 pub fn invocation_stats(&self) -> InvocationStats {
418 self.invocation_stats.read().clone()
419 }
420
421 pub fn quota_status(&self) -> Option<crate::quota::QuotaStatus> {
425 let stats = self.stats.read().clone()?;
426 let settings = self.settings.read();
427 let budget = settings.merged.budget.as_ref()?;
428
429 Some(crate::quota::calculate_quota_status(&stats, budget))
430 }
431
432 pub fn live_sessions(&self) -> Vec<crate::live_monitor::LiveSession> {
437 crate::live_monitor::detect_live_sessions().unwrap_or_default()
438 }
439
440 pub fn session_count(&self) -> usize {
442 self.sessions.len()
443 }
444
445 pub fn get_session(&self, id: &str) -> Option<Arc<SessionMetadata>> {
448 self.sessions.get(id).map(|r| Arc::clone(r.value()))
449 }
450
451 pub async fn load_session_content(
464 &self,
465 session_id: &str,
466 ) -> Result<Vec<crate::models::ConversationMessage>, CoreError> {
467 let metadata = self
469 .get_session(session_id)
470 .ok_or_else(|| CoreError::SessionNotFound {
471 session_id: session_id.to_string(),
472 })?;
473
474 let session_id_owned = SessionId::from(session_id);
476 if let Some(_cached) = self.session_content_cache.get(&session_id_owned).await {
477 debug!(session_id, "Session content cache HIT");
478 }
481
482 debug!(
484 session_id,
485 path = %metadata.file_path.display(),
486 "Session content cache MISS, parsing JSONL"
487 );
488
489 let messages = SessionContentParser::parse_conversation(
490 &metadata.file_path,
491 (*metadata).clone(), )
493 .await?;
494
495 Ok(messages)
499 }
500
501 pub fn analytics(&self) -> Option<AnalyticsData> {
506 let analytics = self.analytics_cache.read().clone();
507 debug!(
508 has_analytics = analytics.is_some(),
509 "analytics() getter called"
510 );
511 analytics
512 }
513
514 pub async fn compute_analytics(&self, period: Period) {
522 let sessions: Vec<_> = self
523 .sessions
524 .iter()
525 .map(|r| Arc::clone(r.value()))
526 .collect();
527
528 info!(
529 session_count = sessions.len(),
530 period = ?period,
531 "compute_analytics() ENTRY"
532 );
533
534 let analytics =
536 tokio::task::spawn_blocking(move || AnalyticsData::compute(&sessions, period)).await;
537
538 match analytics {
539 Ok(data) => {
540 info!(
541 insights_count = data.insights.len(),
542 "compute_analytics() computed data"
543 );
544 let mut guard = self.analytics_cache.write();
545 *guard = Some(data);
546 self.event_bus.publish(DataEvent::AnalyticsUpdated);
547 info!("compute_analytics() EXIT - cached and event published");
548 }
549 Err(e) => {
550 warn!(error = %e, "Failed to compute analytics (task panicked)");
551 }
552 }
553 }
554
555 #[allow(dead_code)]
560 fn invalidate_analytics_cache(&self) {
561 let mut guard = self.analytics_cache.write();
562 *guard = None;
563 debug!("Analytics cache invalidated");
564 }
565
566 pub fn session_ids(&self) -> Vec<SessionId> {
568 self.sessions.iter().map(|r| r.key().clone()).collect()
569 }
570
571 pub fn clear_session_content_cache(&self) {
573 self.session_content_cache.invalidate_all();
574 debug!("Session content cache cleared");
575 }
576
577 pub fn sessions_by_project(
580 &self,
581 ) -> std::collections::HashMap<String, Vec<Arc<SessionMetadata>>> {
582 let mut by_project = std::collections::HashMap::new();
583
584 for entry in self.sessions.iter() {
585 let session = Arc::clone(entry.value());
586 by_project
587 .entry(session.project_path.as_str().to_string())
588 .or_insert_with(Vec::new)
589 .push(session);
590 }
591
592 for sessions in by_project.values_mut() {
594 sessions.sort_by(|a, b| b.last_timestamp.cmp(&a.last_timestamp));
595 }
596
597 by_project
598 }
599
600 pub fn all_sessions(&self) -> Vec<Arc<SessionMetadata>> {
603 self.sessions
604 .iter()
605 .map(|r| Arc::clone(r.value()))
606 .collect()
607 }
608
609 pub fn recent_sessions(&self, limit: usize) -> Vec<Arc<SessionMetadata>> {
612 let mut sessions = self.all_sessions();
613 sessions.sort_by(|a, b| b.last_timestamp.cmp(&a.last_timestamp));
614 sessions.truncate(limit);
615 sessions
616 }
617
618 pub fn search_sessions(&self, query: &str, limit: usize) -> Vec<crate::cache::SearchResult> {
622 if let Some(ref cache) = self.metadata_cache {
623 match cache.search_sessions(query, limit) {
624 Ok(results) => results,
625 Err(e) => {
626 warn!("FTS5 search failed: {}", e);
627 Vec::new()
628 }
629 }
630 } else {
631 Vec::new()
632 }
633 }
634
635 pub async fn analyze_session(&self, session_id: &str) -> anyhow::Result<ActivitySummary> {
640 use std::time::SystemTime;
641
642 let metadata = self
643 .get_session(session_id)
644 .ok_or_else(|| anyhow::anyhow!("Session not found: {}", session_id))?;
645
646 let path = &metadata.file_path;
647
648 let mtime = tokio::fs::metadata(path)
651 .await
652 .and_then(|m| m.modified())
653 .unwrap_or(SystemTime::UNIX_EPOCH);
654
655 if let Some(cache) = &self.metadata_cache {
657 if let Ok(Some(cached)) = cache.get_activity(path, mtime) {
658 self.activity_results
659 .insert(session_id.to_string(), cached.clone());
660 self.event_bus.publish(DataEvent::AnalyticsUpdated);
661 return Ok(cached);
662 }
663 }
664
665 let calls = parse_tool_calls(path, session_id).await?;
667
668 let project_root = path
669 .parent()
670 .and_then(|p| p.parent())
671 .map(|p| p.to_string_lossy().into_owned());
672
673 let summary = classify_tool_calls(calls, session_id, project_root.as_deref());
674
675 if let Some(cache) = &self.metadata_cache {
677 if let Err(e) = cache.put_activity(path, session_id, &summary, mtime) {
678 warn!(session_id, error = %e, "Failed to cache activity — will re-parse on restart");
679 }
680 }
681
682 self.activity_results
684 .insert(session_id.to_string(), summary.clone());
685 self.event_bus.publish(DataEvent::AnalyticsUpdated);
686
687 Ok(summary)
688 }
689
690 pub fn get_session_activity(&self, session_id: &str) -> Option<ActivitySummary> {
692 self.activity_results
693 .get(session_id)
694 .map(|r| r.value().clone())
695 }
696
697 pub fn get_all_stored_alerts(&self, min_severity: Option<&str>) -> Vec<StoredAlert> {
701 if let Some(cache) = &self.metadata_cache {
702 cache.get_all_alerts(min_severity).unwrap_or_default()
703 } else {
704 vec![]
705 }
706 }
707
708 pub fn all_violations(&self) -> Vec<crate::models::activity::Alert> {
713 use crate::models::activity::{Alert, AlertSeverity};
714 use std::collections::HashSet;
715
716 let mut seen_sessions: HashSet<String> = HashSet::new();
718 let mut alerts: Vec<Alert> = Vec::new();
719
720 for entry in self.activity_results.iter() {
721 seen_sessions.insert(entry.key().clone());
722 alerts.extend(entry.value().alerts.clone());
723 }
724
725 if let Some(cache) = &self.metadata_cache {
727 if let Ok(stored) = cache.get_all_alerts(None) {
728 for sa in stored {
729 let session_id = std::path::Path::new(&sa.session_path)
731 .file_stem()
732 .and_then(|s| s.to_str())
733 .unwrap_or(&sa.session_path)
734 .to_string();
735
736 if seen_sessions.contains(&session_id) {
737 continue; }
739
740 let severity = match sa.severity.as_str() {
742 "Critical" => AlertSeverity::Critical,
743 "Warning" => AlertSeverity::Warning,
744 _ => AlertSeverity::Info,
745 };
746 let category = match sa.category.as_str() {
747 "CredentialAccess" => {
748 crate::models::activity::AlertCategory::CredentialAccess
749 }
750 "DestructiveCommand" => {
751 crate::models::activity::AlertCategory::DestructiveCommand
752 }
753 "ForcePush" => crate::models::activity::AlertCategory::ForcePush,
754 "ScopeViolation" => crate::models::activity::AlertCategory::ScopeViolation,
755 _ => crate::models::activity::AlertCategory::ExternalExfil,
756 };
757 let timestamp = sa
758 .timestamp
759 .parse::<chrono::DateTime<chrono::Utc>>()
760 .unwrap_or_else(|_| chrono::Utc::now());
761
762 alerts.push(Alert {
763 session_id,
764 timestamp,
765 severity,
766 category,
767 detail: sa.detail,
768 });
769 }
770 }
771 }
772
773 alerts.sort_by(|a, b| {
775 b.severity
776 .partial_cmp(&a.severity)
777 .unwrap_or(std::cmp::Ordering::Equal)
778 .then_with(|| b.timestamp.cmp(&a.timestamp))
779 });
780
781 alerts
782 }
783
784 pub fn top_sessions_by_tokens(&self, limit: usize) -> Vec<Arc<SessionMetadata>> {
787 let mut sessions: Vec<_> = self
788 .sessions
789 .iter()
790 .map(|r| Arc::clone(r.value()))
791 .collect();
792 sessions.sort_by(|a, b| b.total_tokens.cmp(&a.total_tokens));
793 sessions.truncate(limit);
794 sessions
795 }
796
797 pub fn top_models_by_tokens(&self) -> Vec<(String, u64)> {
800 let mut model_totals = std::collections::HashMap::new();
801
802 for session in self.sessions.iter() {
804 for model in &session.value().models_used {
805 *model_totals.entry(model.clone()).or_insert(0) += session.value().total_tokens;
806 }
807 }
808
809 let mut results: Vec<_> = model_totals.into_iter().collect();
811 results.sort_by(|a, b| b.1.cmp(&a.1));
812 results.truncate(10); results
814 }
815
816 pub fn top_days_by_tokens(&self) -> Vec<(String, u64)> {
819 let mut day_totals = std::collections::HashMap::new();
820
821 for session in self.sessions.iter() {
823 if let Some(timestamp) = &session.value().first_timestamp {
824 let date = timestamp.format("%Y-%m-%d").to_string();
825 *day_totals.entry(date).or_insert(0) += session.value().total_tokens;
826 }
827 }
828
829 let mut results: Vec<_> = day_totals.into_iter().collect();
831 results.sort_by(|a, b| b.1.cmp(&a.1));
832 results.truncate(10); results
834 }
835
836 pub fn projects_leaderboard(&self) -> Vec<ProjectLeaderboardEntry> {
841 let mut project_metrics = std::collections::HashMap::new();
842
843 for session in self.sessions.iter() {
845 let metadata = session.value();
846 let project_path = &metadata.project_path;
847
848 let model = metadata
850 .models_used
851 .first()
852 .map(|s| s.as_str())
853 .unwrap_or("unknown");
854
855 let cost = crate::pricing::calculate_cost(
857 model,
858 metadata.input_tokens,
859 metadata.output_tokens,
860 metadata.cache_creation_tokens,
861 metadata.cache_read_tokens,
862 );
863
864 let entry = project_metrics
865 .entry(project_path.clone())
866 .or_insert((0, 0u64, 0.0f64)); entry.0 += 1; entry.1 += metadata.total_tokens; entry.2 += cost; }
872
873 let mut results: Vec<_> = project_metrics
875 .into_iter()
876 .map(
877 |(project_path, (session_count, total_tokens, total_cost))| {
878 let avg_session_cost = if session_count > 0 {
879 total_cost / session_count as f64
880 } else {
881 0.0
882 };
883
884 let project_name = std::path::Path::new(project_path.as_str())
886 .file_name()
887 .and_then(|n| n.to_str())
888 .unwrap_or(project_path.as_str())
889 .to_string();
890
891 ProjectLeaderboardEntry {
892 project_name,
893 total_sessions: session_count,
894 total_tokens,
895 total_cost,
896 avg_session_cost,
897 }
898 },
899 )
900 .collect();
901
902 results.sort_by(|a, b| {
904 b.total_cost
905 .partial_cmp(&a.total_cost)
906 .unwrap_or(std::cmp::Ordering::Equal)
907 });
908
909 results
910 }
911
912 pub async fn reload_stats(&self) {
918 let stats_path = self.claude_home.join("stats-cache.json");
919 let parser = StatsParser::new()
920 .with_retries(self.config.stats_retry_count, self.config.stats_retry_delay);
921
922 let mut report = LoadReport::new();
923 if let Some(mut stats) = parser.parse_graceful(&stats_path, &mut report).await {
924 stats.recalculate_costs();
926 let mut guard = self.stats.write();
927 *guard = Some(stats);
928
929 self.event_bus.publish(DataEvent::StatsUpdated);
932 debug!("Stats reloaded with recalculated costs");
933 }
934 }
935
936 pub async fn reload_settings(&self) {
938 let parser = SettingsParser::new();
939 let merged = parser
940 .load_merged(
941 &self.claude_home,
942 self.project_path.as_deref(),
943 &mut LoadReport::new(),
944 )
945 .await;
946
947 {
948 let mut guard = self.settings.write();
949 *guard = merged;
950 }
951
952 self.event_bus
953 .publish(DataEvent::ConfigChanged(ConfigScope::Global));
954 debug!("Settings reloaded");
955 }
956
957 pub async fn update_session(&self, path: &Path) {
959 let parser = SessionIndexParser::new();
960
961 match parser.scan_session(path).await {
962 Ok(meta) => {
963 let id = meta.id.clone();
964 let is_new = !self.sessions.contains_key(&id);
965
966 self.sessions.insert(id.clone(), Arc::new(meta));
967
968 if is_new {
973 self.event_bus.publish(DataEvent::SessionCreated(id));
974 } else {
975 self.event_bus.publish(DataEvent::SessionUpdated(id));
976 }
977 }
978 Err(e) => {
979 warn!(path = %path.display(), error = %e, "Failed to update session");
980 }
981 }
982 }
983
984 pub async fn compute_invocations(&self) {
989 let paths: Vec<_> = self
990 .sessions
991 .iter()
992 .map(|r| r.value().file_path.clone())
993 .collect();
994
995 debug!(session_count = paths.len(), "Computing invocation stats");
996
997 let parser = InvocationParser::new();
998 let stats = parser.scan_sessions(&paths).await;
999
1000 let mut guard = self.invocation_stats.write();
1001 *guard = stats;
1002
1003 debug!(
1004 agents = guard.agents.len(),
1005 commands = guard.commands.len(),
1006 skills = guard.skills.len(),
1007 total = guard.total_invocations(),
1008 "Invocation stats computed"
1009 );
1010
1011 self.event_bus.publish(DataEvent::LoadCompleted);
1013 }
1014
1015 pub async fn compute_billing_blocks(&self) {
1020 debug!("Computing billing blocks from sessions with real pricing");
1021
1022 let mut manager = BillingBlockManager::new();
1023 let mut sessions_with_timestamps = 0;
1024 let mut sessions_without_timestamps = 0;
1025
1026 for session in self.sessions.iter() {
1027 let metadata = session.value();
1028
1029 let Some(timestamp) = &metadata.first_timestamp else {
1031 sessions_without_timestamps += 1;
1032 continue;
1033 };
1034
1035 sessions_with_timestamps += 1;
1036
1037 let model = metadata
1039 .models_used
1040 .first()
1041 .map(|s| s.as_str())
1042 .unwrap_or("unknown");
1043
1044 let cost = crate::pricing::calculate_cost(
1046 model,
1047 metadata.input_tokens,
1048 metadata.output_tokens,
1049 metadata.cache_creation_tokens,
1050 metadata.cache_read_tokens,
1051 );
1052
1053 manager.add_usage(
1054 timestamp,
1055 metadata.input_tokens,
1056 metadata.output_tokens,
1057 metadata.cache_creation_tokens,
1058 metadata.cache_read_tokens,
1059 cost,
1060 );
1061 }
1062
1063 debug!(
1064 sessions_with_timestamps,
1065 sessions_without_timestamps,
1066 blocks = manager.get_all_blocks().len(),
1067 "Billing blocks computed with real pricing"
1068 );
1069
1070 let mut guard = self.billing_blocks.write();
1071 *guard = manager;
1072
1073 self.event_bus.publish(DataEvent::LoadCompleted);
1074 }
1075
1076 pub fn billing_blocks(&self) -> parking_lot::RwLockReadGuard<'_, BillingBlockManager> {
1078 self.billing_blocks.read()
1079 }
1080
1081 pub fn usage_estimate(&self) -> crate::usage_estimator::UsageEstimate {
1083 let settings = self.settings();
1084 let plan = settings
1085 .merged
1086 .subscription_plan
1087 .as_ref()
1088 .map(|s| crate::usage_estimator::SubscriptionPlan::parse(s))
1089 .unwrap_or_default();
1090
1091 let billing_blocks = self.billing_blocks.read();
1092 crate::usage_estimator::calculate_usage_estimate(&billing_blocks, plan)
1093 }
1094
1095 pub fn load_preferences(&self) -> crate::preferences::CcboardPreferences {
1097 let cache_dir = self.claude_home.join("cache");
1098 crate::preferences::CcboardPreferences::load(&cache_dir)
1099 }
1100
1101 pub fn save_preferences(
1103 &self,
1104 prefs: &crate::preferences::CcboardPreferences,
1105 ) -> anyhow::Result<()> {
1106 let cache_dir = self.claude_home.join("cache");
1107 prefs.save(&cache_dir)
1108 }
1109}
1110
1111#[cfg(test)]
1112mod tests {
1113 use super::*;
1114 use tempfile::tempdir;
1115
1116 #[tokio::test]
1117 async fn test_data_store_creation() {
1118 let dir = tempdir().unwrap();
1119 let store = DataStore::with_defaults(dir.path().to_path_buf(), None);
1120
1121 assert_eq!(store.session_count(), 0);
1122 assert!(store.stats().is_none());
1123 assert!(store.degraded_state().is_healthy());
1124 }
1125
1126 #[tokio::test]
1127 async fn test_initial_load_missing_dir() {
1128 let dir = tempdir().unwrap();
1129 let store = DataStore::with_defaults(dir.path().join("nonexistent"), None);
1130
1131 let report = store.initial_load().await;
1132
1133 assert!(report.has_errors());
1135 assert!(store.degraded_state().is_degraded());
1136 }
1137
1138 #[tokio::test]
1139 async fn test_initial_load_with_stats() {
1140 let dir = tempdir().unwrap();
1141 let claude_home = dir.path();
1142
1143 std::fs::write(
1145 claude_home.join("stats-cache.json"),
1146 r#"{"version": 2, "totalSessions": 5, "totalMessages": 100, "modelUsage": {"test": {"inputTokens": 600, "outputTokens": 400}}}"#,
1147 )
1148 .unwrap();
1149
1150 std::fs::create_dir_all(claude_home.join("projects")).unwrap();
1152
1153 let store = DataStore::with_defaults(claude_home.to_path_buf(), None);
1154 let report = store.initial_load().await;
1155
1156 assert!(report.stats_loaded);
1157 let stats = store.stats().unwrap();
1158 assert_eq!(stats.total_tokens(), 1000);
1159 assert_eq!(stats.session_count(), 5);
1160 }
1161
1162 #[tokio::test]
1163 async fn test_event_bus_subscription() {
1164 let dir = tempdir().unwrap();
1165 let store = DataStore::with_defaults(dir.path().to_path_buf(), None);
1166
1167 let mut rx = store.event_bus().subscribe();
1168
1169 store.event_bus().publish(DataEvent::StatsUpdated);
1171
1172 let event = rx.recv().await.unwrap();
1173 assert!(matches!(event, DataEvent::StatsUpdated));
1174 }
1175
1176 #[tokio::test]
1177 async fn test_analytics_cache_and_invalidation() {
1178 use crate::models::session::SessionMetadata;
1179 use chrono::Utc;
1180
1181 let dir = tempdir().unwrap();
1182 let store = DataStore::with_defaults(dir.path().to_path_buf(), None);
1183
1184 let now = Utc::now();
1186 for i in 0..10 {
1187 let total_tokens = 1000 * (i as u64 + 1);
1188 let session = SessionMetadata {
1189 id: format!("test-{}", i).into(),
1190 file_path: std::path::PathBuf::from(format!("/test-{}.jsonl", i)),
1191 project_path: "/test".into(),
1192 first_timestamp: Some(now - chrono::Duration::days(i)),
1193 last_timestamp: Some(now),
1194 message_count: 10,
1195 total_tokens,
1196 input_tokens: total_tokens / 2,
1197 output_tokens: total_tokens / 3,
1198 cache_creation_tokens: total_tokens / 10,
1199 cache_read_tokens: total_tokens
1200 - (total_tokens / 2 + total_tokens / 3 + total_tokens / 10),
1201 models_used: vec!["sonnet".to_string()],
1202 file_size_bytes: 1024,
1203 first_user_message: None,
1204 has_subagents: false,
1205 duration_seconds: Some(1800),
1206 branch: None,
1207 tool_usage: std::collections::HashMap::new(),
1208 };
1209 store.sessions.insert(session.id.clone(), Arc::new(session));
1210 }
1211
1212 assert!(store.analytics().is_none());
1214
1215 store.compute_analytics(Period::last_7d()).await;
1217
1218 let analytics1 = store.analytics().expect("Analytics should be cached");
1220 assert!(!analytics1.trends.is_empty());
1221 assert_eq!(analytics1.period, Period::last_7d());
1222
1223 store.invalidate_analytics_cache();
1225 assert!(store.analytics().is_none(), "Cache should be invalidated");
1226
1227 store.compute_analytics(Period::last_30d()).await;
1229 let analytics2 = store.analytics().expect("Analytics should be re-cached");
1230 assert_eq!(analytics2.period, Period::last_30d());
1231 }
1232
1233 #[tokio::test]
1234 async fn test_leaderboard_methods() {
1235 use crate::models::session::SessionMetadata;
1236 use chrono::Utc;
1237
1238 let dir = tempdir().unwrap();
1239 let store = DataStore::with_defaults(dir.path().to_path_buf(), None);
1240
1241 let now = Utc::now();
1242
1243 let test_data = vec![
1245 ("session-1", 5000u64, "opus", 0),
1246 ("session-2", 3000u64, "sonnet", 1),
1247 ("session-3", 8000u64, "haiku", 0),
1248 ("session-4", 2000u64, "sonnet", 2),
1249 ("session-5", 10000u64, "opus", 0),
1250 ];
1251
1252 for (id, tokens, model, days_ago) in test_data {
1253 let session = SessionMetadata {
1254 id: id.into(),
1255 file_path: std::path::PathBuf::from(format!("/{}.jsonl", id)),
1256 project_path: "/test".into(),
1257 first_timestamp: Some(now - chrono::Duration::days(days_ago)),
1258 last_timestamp: Some(now),
1259 message_count: 10,
1260 total_tokens: tokens,
1261 input_tokens: tokens / 2,
1262 output_tokens: tokens / 2,
1263 cache_creation_tokens: 0,
1264 cache_read_tokens: 0,
1265 models_used: vec![model.to_string()],
1266 file_size_bytes: 1024,
1267 first_user_message: None,
1268 has_subagents: false,
1269 duration_seconds: Some(1800),
1270 branch: None,
1271 tool_usage: std::collections::HashMap::new(),
1272 };
1273 store.sessions.insert(session.id.clone(), Arc::new(session));
1274 }
1275
1276 let top_sessions = store.top_sessions_by_tokens(3);
1278 assert_eq!(top_sessions.len(), 3);
1279 assert_eq!(top_sessions[0].id, "session-5"); assert_eq!(top_sessions[1].id, "session-3"); assert_eq!(top_sessions[2].id, "session-1"); let top_models = store.top_models_by_tokens();
1285 assert!(!top_models.is_empty());
1286 assert_eq!(top_models[0].0, "opus");
1288 assert_eq!(top_models[0].1, 15000);
1289 assert_eq!(top_models[1].0, "haiku");
1290 assert_eq!(top_models[1].1, 8000);
1291
1292 let top_days = store.top_days_by_tokens();
1294 assert!(!top_days.is_empty());
1295 let today = now.format("%Y-%m-%d").to_string();
1297 assert_eq!(top_days[0].0, today);
1298 assert_eq!(top_days[0].1, 23000);
1299 }
1300
1301 #[test]
1307 fn test_all_violations_dashmap_priority_over_sqlite() {
1308 use crate::models::activity::{ActivitySummary, Alert, AlertCategory, AlertSeverity};
1309 use chrono::Utc;
1310
1311 let dir = tempdir().unwrap();
1312 let claude_home = dir.path().to_path_buf();
1313 let store = DataStore::with_defaults(claude_home.clone(), None);
1314
1315 let now = Utc::now();
1316
1317 let cache = MetadataCache::new(&claude_home.join("cache"))
1319 .expect("MetadataCache should open in tempdir");
1320
1321 let sqlite_summary = ActivitySummary {
1323 alerts: vec![Alert {
1324 session_id: "shared-session".to_string(),
1325 timestamp: now,
1326 severity: AlertSeverity::Warning,
1327 category: AlertCategory::DestructiveCommand,
1328 detail: "sqlite-version".to_string(),
1329 }],
1330 ..Default::default()
1331 };
1332 cache
1333 .put_activity(
1334 std::path::Path::new("/projects/test/shared-session.jsonl"),
1335 "shared-session",
1336 &sqlite_summary,
1337 std::time::SystemTime::now(),
1338 )
1339 .expect("put_activity should succeed");
1340
1341 let sqlite_only_summary = ActivitySummary {
1343 alerts: vec![Alert {
1344 session_id: "sqlite-only-session".to_string(),
1345 timestamp: now,
1346 severity: AlertSeverity::Info,
1347 category: AlertCategory::ExternalExfil,
1348 detail: "sqlite-only-detail".to_string(),
1349 }],
1350 ..Default::default()
1351 };
1352 cache
1353 .put_activity(
1354 std::path::Path::new("/projects/test/sqlite-only-session.jsonl"),
1355 "sqlite-only-session",
1356 &sqlite_only_summary,
1357 std::time::SystemTime::now(),
1358 )
1359 .expect("put_activity should succeed");
1360
1361 let store_cache = store
1368 .metadata_cache
1369 .as_ref()
1370 .expect("MetadataCache should be present in store");
1371
1372 store_cache
1373 .put_activity(
1374 std::path::Path::new("/projects/test/shared-session.jsonl"),
1375 "shared-session",
1376 &sqlite_summary,
1377 std::time::SystemTime::now(),
1378 )
1379 .expect("put_activity via store cache should succeed");
1380 store_cache
1381 .put_activity(
1382 std::path::Path::new("/projects/test/sqlite-only-session.jsonl"),
1383 "sqlite-only-session",
1384 &sqlite_only_summary,
1385 std::time::SystemTime::now(),
1386 )
1387 .expect("put_activity via store cache should succeed");
1388
1389 let dashmap_summary = ActivitySummary {
1391 alerts: vec![Alert {
1392 session_id: "shared-session".to_string(),
1393 timestamp: now,
1394 severity: AlertSeverity::Critical,
1395 category: AlertCategory::ForcePush,
1396 detail: "dashmap-version".to_string(),
1397 }],
1398 ..Default::default()
1399 };
1400 store
1401 .activity_results
1402 .insert("shared-session".to_string(), dashmap_summary);
1403
1404 let violations = store.all_violations();
1406
1407 let dashmap_hit = violations.iter().find(|a| a.session_id == "shared-session");
1409 assert!(
1410 dashmap_hit.is_some(),
1411 "shared-session alert must appear in violations"
1412 );
1413 assert_eq!(
1414 dashmap_hit.unwrap().detail,
1415 "dashmap-version",
1416 "DashMap version must take priority over SQLite for shared session"
1417 );
1418 assert_eq!(
1419 dashmap_hit.unwrap().severity,
1420 AlertSeverity::Critical,
1421 "DashMap severity (Critical) must win over SQLite (Warning)"
1422 );
1423
1424 let sqlite_hit = violations
1426 .iter()
1427 .find(|a| a.session_id == "sqlite-only-session");
1428 assert!(
1429 sqlite_hit.is_some(),
1430 "sqlite-only-session must appear in violations (no DashMap entry for it)"
1431 );
1432 assert_eq!(sqlite_hit.unwrap().detail, "sqlite-only-detail");
1433
1434 let sqlite_dup = violations
1436 .iter()
1437 .filter(|a| a.session_id == "shared-session")
1438 .count();
1439 assert_eq!(
1440 sqlite_dup, 1,
1441 "shared-session must appear exactly once (DashMap wins, no SQLite duplicate)"
1442 );
1443
1444 assert_eq!(violations[0].severity, AlertSeverity::Critical);
1446 }
1447}