1use crate::analytics::{AnalyticsData, Period};
7use crate::cache::MetadataCache;
8use crate::error::{DegradedState, LoadReport};
9use crate::event::{ConfigScope, DataEvent, EventBus};
10use crate::models::{
11 BillingBlockManager, InvocationStats, MergedConfig, SessionId, SessionMetadata, StatsCache,
12};
13use crate::parsers::{
14 InvocationParser, McpConfig, Rules, SessionIndexParser, SettingsParser, StatsParser,
15};
16use dashmap::DashMap;
17use moka::future::Cache;
18use parking_lot::RwLock; use std::path::{Path, PathBuf};
20use std::sync::Arc;
21use std::time::Duration;
22use tracing::{debug, info, warn};
23
/// Tuning knobs for a `DataStore`.
#[derive(Debug, Clone)]
pub struct DataStoreConfig {
    /// Upper bound on the number of session metadata entries kept after a
    /// full scan; when exceeded, only the most recently active are retained.
    pub max_session_metadata_count: usize,

    /// Budget, in megabytes, used to size the session content cache.
    pub max_session_content_cache_mb: usize,

    /// Concurrency passed to the session index parser when scanning.
    pub max_concurrent_scans: usize,

    /// Number of retries handed to the stats parser.
    pub stats_retry_count: u32,

    /// Delay between stats parser retries.
    pub stats_retry_delay: Duration,
}

impl Default for DataStoreConfig {
    /// Returns the stock configuration: 10 000 sessions, 100 MB content
    /// cache, 8 concurrent scans, and 3 stats retries 100 ms apart.
    fn default() -> Self {
        const MAX_SESSIONS: usize = 10_000;
        const CONTENT_CACHE_MB: usize = 100;
        const CONCURRENT_SCANS: usize = 8;
        const STATS_RETRIES: u32 = 3;
        const STATS_RETRY_DELAY_MS: u64 = 100;

        Self {
            max_session_metadata_count: MAX_SESSIONS,
            max_session_content_cache_mb: CONTENT_CACHE_MB,
            max_concurrent_scans: CONCURRENT_SCANS,
            stats_retry_count: STATS_RETRIES,
            stats_retry_delay: Duration::from_millis(STATS_RETRY_DELAY_MS),
        }
    }
}
54
/// In-memory store for everything loaded from a Claude home directory:
/// stats, settings, MCP config, rules, session metadata and values derived
/// from them (analytics, invocation stats, billing blocks).
pub struct DataStore {
    /// Base directory; `stats-cache.json`, `projects/` and `cache/` are
    /// resolved beneath it.
    claude_home: PathBuf,

    /// Optional project directory passed to the settings, MCP and rules loaders.
    project_path: Option<PathBuf>,

    /// Limits and retry settings supplied at construction.
    config: DataStoreConfig,

    /// Parsed stats cache; `None` until a load succeeds.
    stats: RwLock<Option<StatsCache>>,

    /// Merged settings as produced by `SettingsParser::load_merged`.
    settings: RwLock<MergedConfig>,

    /// Merged MCP configuration; `None` when none was found or loading failed.
    mcp_config: RwLock<Option<McpConfig>>,

    /// Rules loaded via `Rules::load`.
    rules: RwLock<Rules>,

    /// Invocation statistics, replaced wholesale by `compute_invocations`.
    invocation_stats: RwLock<InvocationStats>,

    /// Billing blocks, rebuilt wholesale by `compute_billing_blocks`.
    billing_blocks: RwLock<BillingBlockManager>,

    /// Result of the most recent `compute_analytics`; `None` until computed
    /// or after invalidation.
    analytics_cache: RwLock<Option<AnalyticsData>>,

    /// Session metadata keyed by session id.
    sessions: DashMap<SessionId, Arc<SessionMetadata>>,

    /// Cache of session content lines keyed by session id. Within this file
    /// it is only constructed and cleared, never populated.
    #[allow(dead_code)]
    session_content_cache: Cache<SessionId, Vec<String>>,

    /// Bus on which data-change events are published.
    event_bus: EventBus,

    /// Health of the store as derived from the last initial load.
    degraded_state: RwLock<DegradedState>,

    /// On-disk metadata cache handed to the session scanner; `None` when it
    /// could not be created.
    metadata_cache: Option<Arc<MetadataCache>>,
}
111
/// One row of the per-project leaderboard built by
/// `DataStore::projects_leaderboard`.
#[derive(Debug, Clone)]
pub struct ProjectLeaderboardEntry {
    /// Final component of the project path, or the whole path when it has none.
    pub project_name: String,
    /// Number of sessions recorded for the project.
    pub total_sessions: usize,
    /// Sum of `total_tokens` over the project's sessions.
    pub total_tokens: u64,
    /// Sum of the per-session costs computed by `pricing::calculate_cost`.
    pub total_cost: f64,
    /// `total_cost` divided by `total_sessions`.
    pub avg_session_cost: f64,
}
121
122impl DataStore {
123 pub fn new(
125 claude_home: PathBuf,
126 project_path: Option<PathBuf>,
127 config: DataStoreConfig,
128 ) -> Self {
129 let session_content_cache = Cache::builder()
130 .max_capacity((config.max_session_content_cache_mb * 1024 * 1024 / 1000) as u64) .time_to_idle(Duration::from_secs(300)) .build();
133
134 let metadata_cache = {
136 let cache_dir = claude_home.join("cache");
137 match MetadataCache::new(&cache_dir) {
138 Ok(cache) => {
139 debug!(path = %cache_dir.display(), "Metadata cache enabled");
140 Some(Arc::new(cache))
141 }
142 Err(e) => {
143 warn!(error = %e, "Failed to create metadata cache, running without cache");
144 None
145 }
146 }
147 };
148
149 Self {
150 claude_home,
151 project_path,
152 config,
153 stats: RwLock::new(None),
154 settings: RwLock::new(MergedConfig::default()),
155 mcp_config: RwLock::new(None),
156 rules: RwLock::new(Rules::default()),
157 invocation_stats: RwLock::new(InvocationStats::new()),
158 billing_blocks: RwLock::new(BillingBlockManager::new()),
159 analytics_cache: RwLock::new(None),
160 sessions: DashMap::new(),
161 session_content_cache,
162 event_bus: EventBus::default_capacity(),
163 degraded_state: RwLock::new(DegradedState::Healthy),
164 metadata_cache,
165 }
166 }
167
168 pub fn with_defaults(claude_home: PathBuf, project_path: Option<PathBuf>) -> Self {
170 Self::new(claude_home, project_path, DataStoreConfig::default())
171 }
172
173 pub fn event_bus(&self) -> &EventBus {
175 &self.event_bus
176 }
177
178 pub fn degraded_state(&self) -> DegradedState {
180 self.degraded_state.read().clone()
181 }
182
183 pub async fn initial_load(&self) -> LoadReport {
185 let mut report = LoadReport::new();
186
187 info!(claude_home = %self.claude_home.display(), "Starting initial data load");
188
189 self.load_stats(&mut report).await;
191
192 self.load_settings(&mut report).await;
194
195 self.load_mcp_config(&mut report).await;
197
198 self.load_rules(&mut report).await;
200
201 self.scan_sessions(&mut report).await;
203
204 self.update_degraded_state(&report);
206
207 self.event_bus.publish(DataEvent::LoadCompleted);
209
210 info!(
211 stats_loaded = report.stats_loaded,
212 settings_loaded = report.settings_loaded,
213 sessions_scanned = report.sessions_scanned,
214 sessions_failed = report.sessions_failed,
215 errors = report.errors.len(),
216 "Initial load complete"
217 );
218
219 report
220 }
221
222 async fn load_stats(&self, report: &mut LoadReport) {
224 let stats_path = self.claude_home.join("stats-cache.json");
225 let parser = StatsParser::new()
226 .with_retries(self.config.stats_retry_count, self.config.stats_retry_delay);
227
228 if let Some(mut stats) = parser.parse_graceful(&stats_path, report).await {
229 stats.recalculate_costs();
231 let mut guard = self.stats.write();
232 *guard = Some(stats);
233 debug!("Stats loaded successfully with recalculated costs");
234 }
235 }
236
237 async fn load_settings(&self, report: &mut LoadReport) {
239 let parser = SettingsParser::new();
240 let merged = parser
241 .load_merged(&self.claude_home, self.project_path.as_deref(), report)
242 .await;
243
244 let mut guard = self.settings.write();
245 *guard = merged;
246 debug!("Settings loaded and merged");
247 }
248
249 async fn load_mcp_config(&self, report: &mut LoadReport) {
251 match McpConfig::load_merged(&self.claude_home, self.project_path.as_deref()) {
252 Ok(Some(config)) => {
253 let server_count = config.servers.len();
254 let mut guard = self.mcp_config.write();
255 *guard = Some(config);
256 debug!(
257 server_count,
258 "MCP config loaded successfully (global + project)"
259 );
260 }
261 Ok(None) => {
262 debug!("No MCP config found (optional)");
263 }
264 Err(e) => {
265 use crate::error::LoadError;
266 report.add_error(LoadError::error(
267 "mcp_config",
268 format!("Failed to parse MCP config: {}", e),
269 ));
270 }
271 }
272 }
273
274 async fn load_rules(&self, report: &mut LoadReport) {
276 match Rules::load(&self.claude_home, self.project_path.as_deref()) {
277 Ok(rules) => {
278 let has_global = rules.global.is_some();
279 let has_project = rules.project.is_some();
280 let mut guard = self.rules.write();
281 *guard = rules;
282 debug!(has_global, has_project, "Rules loaded");
283 }
284 Err(e) => {
285 use crate::error::LoadError;
286 report.add_error(LoadError::error(
287 "rules",
288 format!("Failed to load rules: {}", e),
289 ));
290 }
291 }
292 }
293
294 async fn scan_sessions(&self, report: &mut LoadReport) {
296 let projects_dir = self.claude_home.join("projects");
297
298 if !projects_dir.exists() {
299 report.add_warning(
300 "sessions",
301 format!("Projects directory not found: {}", projects_dir.display()),
302 );
303 return;
304 }
305
306 let mut parser =
307 SessionIndexParser::new().with_concurrency(self.config.max_concurrent_scans);
308
309 if let Some(ref cache) = self.metadata_cache {
311 parser = parser.with_cache(cache.clone());
312 }
313
314 let sessions = parser.scan_all(&projects_dir, report).await;
315
316 let sessions_to_add: Vec<_> = if sessions.len() > self.config.max_session_metadata_count {
318 warn!(
319 total = sessions.len(),
320 limit = self.config.max_session_metadata_count,
321 "Session count exceeds limit, keeping most recent"
322 );
323
324 let mut sorted = sessions;
325 sorted.sort_by(|a, b| b.last_timestamp.cmp(&a.last_timestamp));
326 sorted.truncate(self.config.max_session_metadata_count);
327 sorted
328 } else {
329 sessions
330 };
331
332 for session in sessions_to_add {
334 self.sessions.insert(session.id.clone(), Arc::new(session));
335 }
336
337 debug!(count = self.sessions.len(), "Sessions indexed");
338 }
339
340 fn update_degraded_state(&self, report: &LoadReport) {
342 let mut state = self.degraded_state.write();
343
344 if report.has_fatal_errors() {
345 *state = DegradedState::ReadOnly {
346 reason: "Fatal errors during load".to_string(),
347 };
348 return;
349 }
350
351 let mut missing = Vec::new();
352
353 if !report.stats_loaded {
354 missing.push("stats".to_string());
355 }
356 if !report.settings_loaded {
357 missing.push("settings".to_string());
358 }
359 if report.sessions_failed > 0 {
360 missing.push(format!("{} sessions", report.sessions_failed));
361 }
362
363 if missing.is_empty() {
364 *state = DegradedState::Healthy;
365 } else {
366 *state = DegradedState::PartialData {
367 missing: missing.clone(),
368 reason: format!("Missing: {}", missing.join(", ")),
369 };
370 }
371 }
372
373 pub fn stats(&self) -> Option<StatsCache> {
379 self.stats.read().clone()
380 }
381
382 pub fn context_window_stats(&self) -> crate::models::ContextWindowStats {
384 let sessions: Vec<_> = self
386 .sessions
387 .iter()
388 .map(|entry| Arc::clone(entry.value()))
389 .collect();
390 let refs: Vec<_> = sessions.iter().map(|s| s.as_ref()).collect();
392 crate::models::StatsCache::calculate_context_saturation(&refs, 30)
393 }
394
395 pub fn settings(&self) -> MergedConfig {
397 self.settings.read().clone()
398 }
399
400 pub fn mcp_config(&self) -> Option<McpConfig> {
402 self.mcp_config.read().clone()
403 }
404
405 pub fn rules(&self) -> Rules {
407 self.rules.read().clone()
408 }
409
410 pub fn invocation_stats(&self) -> InvocationStats {
412 self.invocation_stats.read().clone()
413 }
414
415 pub fn live_sessions(&self) -> Vec<crate::live_monitor::LiveSession> {
420 crate::live_monitor::detect_live_sessions().unwrap_or_default()
421 }
422
423 pub fn session_count(&self) -> usize {
425 self.sessions.len()
426 }
427
428 pub fn get_session(&self, id: &str) -> Option<Arc<SessionMetadata>> {
431 self.sessions.get(id).map(|r| Arc::clone(r.value()))
432 }
433
434 pub fn analytics(&self) -> Option<AnalyticsData> {
439 let analytics = self.analytics_cache.read().clone();
440 debug!(
441 has_analytics = analytics.is_some(),
442 "analytics() getter called"
443 );
444 analytics
445 }
446
447 pub async fn compute_analytics(&self, period: Period) {
455 let sessions: Vec<_> = self
456 .sessions
457 .iter()
458 .map(|r| Arc::clone(r.value()))
459 .collect();
460
461 info!(
462 session_count = sessions.len(),
463 period = ?period,
464 "compute_analytics() ENTRY"
465 );
466
467 let analytics =
469 tokio::task::spawn_blocking(move || AnalyticsData::compute(&sessions, period)).await;
470
471 match analytics {
472 Ok(data) => {
473 info!(
474 insights_count = data.insights.len(),
475 "compute_analytics() computed data"
476 );
477 let mut guard = self.analytics_cache.write();
478 *guard = Some(data);
479 self.event_bus.publish(DataEvent::AnalyticsUpdated);
480 info!("compute_analytics() EXIT - cached and event published");
481 }
482 Err(e) => {
483 warn!(error = %e, "Failed to compute analytics (task panicked)");
484 }
485 }
486 }
487
488 #[allow(dead_code)]
493 fn invalidate_analytics_cache(&self) {
494 let mut guard = self.analytics_cache.write();
495 *guard = None;
496 debug!("Analytics cache invalidated");
497 }
498
499 pub fn session_ids(&self) -> Vec<SessionId> {
501 self.sessions.iter().map(|r| r.key().clone()).collect()
502 }
503
504 pub fn clear_session_content_cache(&self) {
506 self.session_content_cache.invalidate_all();
507 debug!("Session content cache cleared");
508 }
509
510 pub fn sessions_by_project(
513 &self,
514 ) -> std::collections::HashMap<String, Vec<Arc<SessionMetadata>>> {
515 let mut by_project = std::collections::HashMap::new();
516
517 for entry in self.sessions.iter() {
518 let session = Arc::clone(entry.value());
519 by_project
520 .entry(session.project_path.as_str().to_string())
521 .or_insert_with(Vec::new)
522 .push(session);
523 }
524
525 for sessions in by_project.values_mut() {
527 sessions.sort_by(|a, b| b.last_timestamp.cmp(&a.last_timestamp));
528 }
529
530 by_project
531 }
532
533 pub fn all_sessions(&self) -> Vec<Arc<SessionMetadata>> {
536 self.sessions
537 .iter()
538 .map(|r| Arc::clone(r.value()))
539 .collect()
540 }
541
542 pub fn recent_sessions(&self, limit: usize) -> Vec<Arc<SessionMetadata>> {
545 let mut sessions = self.all_sessions();
546 sessions.sort_by(|a, b| b.last_timestamp.cmp(&a.last_timestamp));
547 sessions.truncate(limit);
548 sessions
549 }
550
551 pub fn top_sessions_by_tokens(&self, limit: usize) -> Vec<Arc<SessionMetadata>> {
554 let mut sessions: Vec<_> = self
555 .sessions
556 .iter()
557 .map(|r| Arc::clone(r.value()))
558 .collect();
559 sessions.sort_by(|a, b| b.total_tokens.cmp(&a.total_tokens));
560 sessions.truncate(limit);
561 sessions
562 }
563
564 pub fn top_models_by_tokens(&self) -> Vec<(String, u64)> {
567 let mut model_totals = std::collections::HashMap::new();
568
569 for session in self.sessions.iter() {
571 for model in &session.value().models_used {
572 *model_totals.entry(model.clone()).or_insert(0) += session.value().total_tokens;
573 }
574 }
575
576 let mut results: Vec<_> = model_totals.into_iter().collect();
578 results.sort_by(|a, b| b.1.cmp(&a.1));
579 results.truncate(10); results
581 }
582
583 pub fn top_days_by_tokens(&self) -> Vec<(String, u64)> {
586 let mut day_totals = std::collections::HashMap::new();
587
588 for session in self.sessions.iter() {
590 if let Some(timestamp) = &session.value().first_timestamp {
591 let date = timestamp.format("%Y-%m-%d").to_string();
592 *day_totals.entry(date).or_insert(0) += session.value().total_tokens;
593 }
594 }
595
596 let mut results: Vec<_> = day_totals.into_iter().collect();
598 results.sort_by(|a, b| b.1.cmp(&a.1));
599 results.truncate(10); results
601 }
602
603 pub fn projects_leaderboard(&self) -> Vec<ProjectLeaderboardEntry> {
608 let mut project_metrics = std::collections::HashMap::new();
609
610 for session in self.sessions.iter() {
612 let metadata = session.value();
613 let project_path = &metadata.project_path;
614
615 let model = metadata
617 .models_used
618 .first()
619 .map(|s| s.as_str())
620 .unwrap_or("unknown");
621
622 let cost = crate::pricing::calculate_cost(
624 model,
625 metadata.input_tokens,
626 metadata.output_tokens,
627 metadata.cache_creation_tokens,
628 metadata.cache_read_tokens,
629 );
630
631 let entry = project_metrics
632 .entry(project_path.clone())
633 .or_insert((0, 0u64, 0.0f64)); entry.0 += 1; entry.1 += metadata.total_tokens; entry.2 += cost; }
639
640 let mut results: Vec<_> = project_metrics
642 .into_iter()
643 .map(
644 |(project_path, (session_count, total_tokens, total_cost))| {
645 let avg_session_cost = if session_count > 0 {
646 total_cost / session_count as f64
647 } else {
648 0.0
649 };
650
651 let project_name = std::path::Path::new(project_path.as_str())
653 .file_name()
654 .and_then(|n| n.to_str())
655 .unwrap_or(project_path.as_str())
656 .to_string();
657
658 ProjectLeaderboardEntry {
659 project_name,
660 total_sessions: session_count,
661 total_tokens,
662 total_cost,
663 avg_session_cost,
664 }
665 },
666 )
667 .collect();
668
669 results.sort_by(|a, b| {
671 b.total_cost
672 .partial_cmp(&a.total_cost)
673 .unwrap_or(std::cmp::Ordering::Equal)
674 });
675
676 results
677 }
678
679 pub async fn reload_stats(&self) {
685 let stats_path = self.claude_home.join("stats-cache.json");
686 let parser = StatsParser::new()
687 .with_retries(self.config.stats_retry_count, self.config.stats_retry_delay);
688
689 let mut report = LoadReport::new();
690 if let Some(mut stats) = parser.parse_graceful(&stats_path, &mut report).await {
691 stats.recalculate_costs();
693 let mut guard = self.stats.write();
694 *guard = Some(stats);
695
696 self.event_bus.publish(DataEvent::StatsUpdated);
699 debug!("Stats reloaded with recalculated costs");
700 }
701 }
702
703 pub async fn reload_settings(&self) {
705 let parser = SettingsParser::new();
706 let merged = parser
707 .load_merged(
708 &self.claude_home,
709 self.project_path.as_deref(),
710 &mut LoadReport::new(),
711 )
712 .await;
713
714 {
715 let mut guard = self.settings.write();
716 *guard = merged;
717 }
718
719 self.event_bus
720 .publish(DataEvent::ConfigChanged(ConfigScope::Global));
721 debug!("Settings reloaded");
722 }
723
724 pub async fn update_session(&self, path: &Path) {
726 let parser = SessionIndexParser::new();
727
728 match parser.scan_session(path).await {
729 Ok(meta) => {
730 let id = meta.id.clone();
731 let is_new = !self.sessions.contains_key(&id);
732
733 self.sessions.insert(id.clone(), Arc::new(meta));
734
735 if is_new {
740 self.event_bus.publish(DataEvent::SessionCreated(id));
741 } else {
742 self.event_bus.publish(DataEvent::SessionUpdated(id));
743 }
744 }
745 Err(e) => {
746 warn!(path = %path.display(), error = %e, "Failed to update session");
747 }
748 }
749 }
750
751 pub async fn compute_invocations(&self) {
756 let paths: Vec<_> = self
757 .sessions
758 .iter()
759 .map(|r| r.value().file_path.clone())
760 .collect();
761
762 debug!(session_count = paths.len(), "Computing invocation stats");
763
764 let parser = InvocationParser::new();
765 let stats = parser.scan_sessions(&paths).await;
766
767 let mut guard = self.invocation_stats.write();
768 *guard = stats;
769
770 debug!(
771 agents = guard.agents.len(),
772 commands = guard.commands.len(),
773 skills = guard.skills.len(),
774 total = guard.total_invocations(),
775 "Invocation stats computed"
776 );
777
778 self.event_bus.publish(DataEvent::LoadCompleted);
780 }
781
782 pub async fn compute_billing_blocks(&self) {
787 debug!("Computing billing blocks from sessions with real pricing");
788
789 let mut manager = BillingBlockManager::new();
790 let mut sessions_with_timestamps = 0;
791 let mut sessions_without_timestamps = 0;
792
793 for session in self.sessions.iter() {
794 let metadata = session.value();
795
796 let Some(timestamp) = &metadata.first_timestamp else {
798 sessions_without_timestamps += 1;
799 continue;
800 };
801
802 sessions_with_timestamps += 1;
803
804 let model = metadata
806 .models_used
807 .first()
808 .map(|s| s.as_str())
809 .unwrap_or("unknown");
810
811 let cost = crate::pricing::calculate_cost(
813 model,
814 metadata.input_tokens,
815 metadata.output_tokens,
816 metadata.cache_creation_tokens,
817 metadata.cache_read_tokens,
818 );
819
820 manager.add_usage(
821 timestamp,
822 metadata.input_tokens,
823 metadata.output_tokens,
824 metadata.cache_creation_tokens,
825 metadata.cache_read_tokens,
826 cost,
827 );
828 }
829
830 debug!(
831 sessions_with_timestamps,
832 sessions_without_timestamps,
833 blocks = manager.get_all_blocks().len(),
834 "Billing blocks computed with real pricing"
835 );
836
837 let mut guard = self.billing_blocks.write();
838 *guard = manager;
839
840 self.event_bus.publish(DataEvent::LoadCompleted);
841 }
842
843 pub fn billing_blocks(&self) -> parking_lot::RwLockReadGuard<'_, BillingBlockManager> {
845 self.billing_blocks.read()
846 }
847
848 pub fn usage_estimate(&self) -> crate::usage_estimator::UsageEstimate {
850 let settings = self.settings();
851 let plan = settings
852 .merged
853 .subscription_plan
854 .as_ref()
855 .map(|s| crate::usage_estimator::SubscriptionPlan::from_str(s))
856 .unwrap_or_default();
857
858 let billing_blocks = self.billing_blocks.read();
859 crate::usage_estimator::calculate_usage_estimate(&billing_blocks, plan)
860 }
861}
862
863#[cfg(test)]
864mod tests {
865 use super::*;
866 use tempfile::tempdir;
867
868 #[tokio::test]
869 async fn test_data_store_creation() {
870 let dir = tempdir().unwrap();
871 let store = DataStore::with_defaults(dir.path().to_path_buf(), None);
872
873 assert_eq!(store.session_count(), 0);
874 assert!(store.stats().is_none());
875 assert!(store.degraded_state().is_healthy());
876 }
877
878 #[tokio::test]
879 async fn test_initial_load_missing_dir() {
880 let dir = tempdir().unwrap();
881 let store = DataStore::with_defaults(dir.path().join("nonexistent"), None);
882
883 let report = store.initial_load().await;
884
885 assert!(report.has_errors());
887 assert!(store.degraded_state().is_degraded());
888 }
889
890 #[tokio::test]
891 async fn test_initial_load_with_stats() {
892 let dir = tempdir().unwrap();
893 let claude_home = dir.path();
894
895 std::fs::write(
897 claude_home.join("stats-cache.json"),
898 r#"{"version": 2, "totalSessions": 5, "totalMessages": 100, "modelUsage": {"test": {"inputTokens": 600, "outputTokens": 400}}}"#,
899 )
900 .unwrap();
901
902 std::fs::create_dir_all(claude_home.join("projects")).unwrap();
904
905 let store = DataStore::with_defaults(claude_home.to_path_buf(), None);
906 let report = store.initial_load().await;
907
908 assert!(report.stats_loaded);
909 let stats = store.stats().unwrap();
910 assert_eq!(stats.total_tokens(), 1000);
911 assert_eq!(stats.session_count(), 5);
912 }
913
914 #[tokio::test]
915 async fn test_event_bus_subscription() {
916 let dir = tempdir().unwrap();
917 let store = DataStore::with_defaults(dir.path().to_path_buf(), None);
918
919 let mut rx = store.event_bus().subscribe();
920
921 store.event_bus().publish(DataEvent::StatsUpdated);
923
924 let event = rx.recv().await.unwrap();
925 assert!(matches!(event, DataEvent::StatsUpdated));
926 }
927
928 #[tokio::test]
929 async fn test_analytics_cache_and_invalidation() {
930 use crate::models::session::SessionMetadata;
931 use chrono::Utc;
932
933 let dir = tempdir().unwrap();
934 let store = DataStore::with_defaults(dir.path().to_path_buf(), None);
935
936 let now = Utc::now();
938 for i in 0..10 {
939 let total_tokens = 1000 * (i as u64 + 1);
940 let session = SessionMetadata {
941 id: format!("test-{}", i).into(),
942 file_path: std::path::PathBuf::from(format!("/test-{}.jsonl", i)),
943 project_path: "/test".into(),
944 first_timestamp: Some(now - chrono::Duration::days(i)),
945 last_timestamp: Some(now),
946 message_count: 10,
947 total_tokens,
948 input_tokens: total_tokens / 2,
949 output_tokens: total_tokens / 3,
950 cache_creation_tokens: total_tokens / 10,
951 cache_read_tokens: total_tokens
952 - (total_tokens / 2 + total_tokens / 3 + total_tokens / 10),
953 models_used: vec!["sonnet".to_string()],
954 file_size_bytes: 1024,
955 first_user_message: None,
956 has_subagents: false,
957 duration_seconds: Some(1800),
958 branch: None,
959 tool_usage: std::collections::HashMap::new(),
960 };
961 store.sessions.insert(session.id.clone(), Arc::new(session));
962 }
963
964 assert!(store.analytics().is_none());
966
967 store.compute_analytics(Period::last_7d()).await;
969
970 let analytics1 = store.analytics().expect("Analytics should be cached");
972 assert!(!analytics1.trends.is_empty());
973 assert_eq!(analytics1.period, Period::last_7d());
974
975 store.invalidate_analytics_cache();
977 assert!(store.analytics().is_none(), "Cache should be invalidated");
978
979 store.compute_analytics(Period::last_30d()).await;
981 let analytics2 = store.analytics().expect("Analytics should be re-cached");
982 assert_eq!(analytics2.period, Period::last_30d());
983 }
984
985 #[tokio::test]
986 async fn test_leaderboard_methods() {
987 use crate::models::session::SessionMetadata;
988 use chrono::Utc;
989
990 let dir = tempdir().unwrap();
991 let store = DataStore::with_defaults(dir.path().to_path_buf(), None);
992
993 let now = Utc::now();
994
995 let test_data = vec![
997 ("session-1", 5000u64, "opus", 0),
998 ("session-2", 3000u64, "sonnet", 1),
999 ("session-3", 8000u64, "haiku", 0),
1000 ("session-4", 2000u64, "sonnet", 2),
1001 ("session-5", 10000u64, "opus", 0),
1002 ];
1003
1004 for (id, tokens, model, days_ago) in test_data {
1005 let session = SessionMetadata {
1006 id: id.into(),
1007 file_path: std::path::PathBuf::from(format!("/{}.jsonl", id)),
1008 project_path: "/test".into(),
1009 first_timestamp: Some(now - chrono::Duration::days(days_ago)),
1010 last_timestamp: Some(now),
1011 message_count: 10,
1012 total_tokens: tokens,
1013 input_tokens: tokens / 2,
1014 output_tokens: tokens / 2,
1015 cache_creation_tokens: 0,
1016 cache_read_tokens: 0,
1017 models_used: vec![model.to_string()],
1018 file_size_bytes: 1024,
1019 first_user_message: None,
1020 has_subagents: false,
1021 duration_seconds: Some(1800),
1022 branch: None,
1023 tool_usage: std::collections::HashMap::new(),
1024 };
1025 store.sessions.insert(session.id.clone(), Arc::new(session));
1026 }
1027
1028 let top_sessions = store.top_sessions_by_tokens(3);
1030 assert_eq!(top_sessions.len(), 3);
1031 assert_eq!(top_sessions[0].id, "session-5"); assert_eq!(top_sessions[1].id, "session-3"); assert_eq!(top_sessions[2].id, "session-1"); let top_models = store.top_models_by_tokens();
1037 assert!(!top_models.is_empty());
1038 assert_eq!(top_models[0].0, "opus");
1040 assert_eq!(top_models[0].1, 15000);
1041 assert_eq!(top_models[1].0, "haiku");
1042 assert_eq!(top_models[1].1, 8000);
1043
1044 let top_days = store.top_days_by_tokens();
1046 assert!(!top_days.is_empty());
1047 let today = now.format("%Y-%m-%d").to_string();
1049 assert_eq!(top_days[0].0, today);
1050 assert_eq!(top_days[0].1, 23000);
1051 }
1052}