ccboard-core 0.16.3

Core library for ccboard — parsers, models, data store, and file watcher
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
//! Data store with DashMap + parking_lot::RwLock
//!
//! Uses DashMap for sessions (per-entry locking) and parking_lot::RwLock
//! for stats/settings (better fairness than std::sync::RwLock).

use crate::analytics::{AnalyticsData, Period};
use crate::cache::{MetadataCache, StoredAlert};
use crate::error::{CoreError, DegradedState, LoadReport};
use crate::event::{ConfigScope, DataEvent, EventBus};
use crate::models::activity::ActivitySummary;
use crate::models::{
    BillingBlockManager, InvocationStats, MergedConfig, SessionId, SessionMetadata, StatsCache,
};
use crate::parsers::{
    classify_tool_calls, parse_claude_global, parse_tool_calls, ClaudeGlobalStats,
    InvocationParser, McpConfig, Rules, SessionContentParser, SessionIndexParser, SettingsParser,
    StatsParser,
};
use dashmap::DashMap;
use moka::future::Cache;
use parking_lot::RwLock; // parking_lot > std::sync::RwLock: smaller (40B vs 72B), no poisoning, better fairness
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
use tracing::{debug, info, warn};

/// Configuration for the data store
///
/// All limits are enforced by `DataStore` itself: metadata count during
/// `scan_sessions`, cache size when building the Moka content cache, scan
/// concurrency via `SessionIndexParser`, and retries via `StatsParser`.
#[derive(Debug, Clone)]
pub struct DataStoreConfig {
    /// Maximum session metadata entries to keep.
    /// When a scan yields more, only the most recent entries are retained.
    pub max_session_metadata_count: usize,

    /// Maximum size for session content cache in MB
    /// (converted to a Moka `max_capacity` at construction time).
    pub max_session_content_cache_mb: usize,

    /// Maximum concurrent session scans (passed to `SessionIndexParser::with_concurrency`).
    pub max_concurrent_scans: usize,

    /// Stats parser retry count (for transient read/parse failures).
    pub stats_retry_count: u32,

    /// Stats parser retry delay between attempts.
    pub stats_retry_delay: Duration,
}

impl Default for DataStoreConfig {
    fn default() -> Self {
        Self {
            max_session_metadata_count: 10_000,
            max_session_content_cache_mb: 100,
            max_concurrent_scans: 8,
            stats_retry_count: 3,
            stats_retry_delay: Duration::from_millis(100),
        }
    }
}

/// Central data store for ccboard
///
/// Thread-safe access to all Claude Code data.
/// Uses DashMap for sessions (high contention) and RwLock for stats/settings (low contention).
pub struct DataStore {
    /// Path to Claude home directory (e.g. `~/.claude`); all relative loads join onto this.
    claude_home: PathBuf,

    /// Current project path (if focused); used to merge project-level settings/MCP/rules.
    project_path: Option<PathBuf>,

    /// Configuration (limits and retry policy; see `DataStoreConfig`)
    config: DataStoreConfig,

    /// Stats cache (low contention, frequent reads)
    stats: RwLock<Option<StatsCache>>,

    /// Merged settings
    settings: RwLock<MergedConfig>,

    /// MCP server configuration
    mcp_config: RwLock<Option<McpConfig>>,

    /// Rules from CLAUDE.md files
    rules: RwLock<Rules>,

    /// Invocation statistics (agents, commands, skills)
    invocation_stats: RwLock<InvocationStats>,

    /// Billing blocks (5h usage tracking)
    billing_blocks: RwLock<BillingBlockManager>,

    /// Analytics data cache (invalidated on stats/sessions update)
    analytics_cache: RwLock<Option<AnalyticsData>>,

    /// Session metadata (high contention with many entries)
    /// Arc<SessionMetadata> for cheap cloning (8 bytes vs ~400 bytes)
    ///
    /// Why Arc over Box: Multi-thread access from TUI + Web frontends
    /// justifies atomic refcount overhead (~4 bytes). Box would require
    /// cloning entire struct on each frontend access.
    sessions: DashMap<SessionId, Arc<SessionMetadata>>,

    /// Session content cache (LRU for on-demand loading)
    /// NOTE(review): currently only read in `load_session_content`; inserts
    /// are deferred pending a cache value-type decision (see TODO there).
    #[allow(dead_code)]
    session_content_cache: Cache<SessionId, Vec<String>>,

    /// Event bus for notifying subscribers (TUI/Web re-render on publish)
    event_bus: EventBus,

    /// Current degraded state (recomputed from each `LoadReport`)
    degraded_state: RwLock<DegradedState>,

    /// Metadata cache for 90% startup speedup (optional —
    /// `None` when the SQLite cache directory cannot be created)
    metadata_cache: Option<Arc<MetadataCache>>,

    /// In-memory activity analysis results (populated by analyze_session)
    activity_results: DashMap<String, ActivitySummary>,

    /// Hook-based live session state (loaded from ~/.ccboard/live-sessions.json)
    live_hook_sessions: RwLock<crate::hook_state::LiveSessionFile>,

    /// Per-project last session stats from ~/.claude.json
    claude_global_stats: RwLock<Option<ClaudeGlobalStats>>,
}

/// Project leaderboard entry with aggregated metrics
#[derive(Debug, Clone)]
pub struct ProjectLeaderboardEntry {
    /// Display name of the project.
    pub project_name: String,
    /// Total number of sessions recorded for this project.
    pub total_sessions: usize,
    /// Total token usage across all of the project's sessions.
    pub total_tokens: u64,
    /// Total cost in USD — presumably; currency not stated here, TODO confirm.
    pub total_cost: f64,
    /// Average cost per session (total cost / session count).
    pub avg_session_cost: f64,
}

impl DataStore {
    /// Create a new data store
    ///
    /// `claude_home` is the Claude data directory; `project_path` (if any)
    /// enables project-scoped settings/MCP/rules merging. No I/O besides
    /// attempting to open the SQLite metadata cache — call `initial_load`
    /// to populate the store.
    pub fn new(
        claude_home: PathBuf,
        project_path: Option<PathBuf>,
        config: DataStoreConfig,
    ) -> Self {
        let session_content_cache = Cache::builder()
            // Rough estimate: Moka capacity is in entries, not bytes, so the MB
            // budget is divided by an assumed ~1000 bytes/entry. TODO confirm
            // this heuristic once cache insertion is actually enabled.
            .max_capacity((config.max_session_content_cache_mb * 1024 * 1024 / 1000) as u64)
            .time_to_idle(Duration::from_secs(300)) // 5 min idle expiry
            .build();

        // Create metadata cache in ~/.claude/cache/
        // Failure is non-fatal: the store degrades to cold scans without it.
        let metadata_cache = {
            let cache_dir = claude_home.join("cache");
            match MetadataCache::new(&cache_dir) {
                Ok(cache) => {
                    debug!(path = %cache_dir.display(), "Metadata cache enabled");
                    Some(Arc::new(cache))
                }
                Err(e) => {
                    warn!(error = %e, "Failed to create metadata cache, running without cache");
                    None
                }
            }
        };

        Self {
            claude_home,
            project_path,
            config,
            stats: RwLock::new(None),
            settings: RwLock::new(MergedConfig::default()),
            mcp_config: RwLock::new(None),
            rules: RwLock::new(Rules::default()),
            invocation_stats: RwLock::new(InvocationStats::new()),
            billing_blocks: RwLock::new(BillingBlockManager::new()),
            analytics_cache: RwLock::new(None),
            sessions: DashMap::new(),
            session_content_cache,
            event_bus: EventBus::default_capacity(),
            degraded_state: RwLock::new(DegradedState::Healthy),
            metadata_cache,
            activity_results: DashMap::new(),
            live_hook_sessions: RwLock::new(crate::hook_state::LiveSessionFile::default()),
            claude_global_stats: RwLock::new(None),
        }
    }

    /// Convenience constructor using `DataStoreConfig::default()`.
    pub fn with_defaults(claude_home: PathBuf, project_path: Option<PathBuf>) -> Self {
        let config = DataStoreConfig::default();
        Self::new(claude_home, project_path, config)
    }

    /// Borrow the event bus so callers can subscribe to store updates.
    pub fn event_bus(&self) -> &EventBus {
        let bus = &self.event_bus;
        bus
    }

    /// Snapshot of the current degraded state (cloned out of the lock).
    pub fn degraded_state(&self) -> DegradedState {
        let guard = self.degraded_state.read();
        guard.clone()
    }

    /// Initial load of all data with LoadReport for graceful degradation
    ///
    /// Loads, in order: stats, global ~/.claude.json stats, settings, MCP
    /// config, rules, then the session index. Each step records failures in
    /// the shared `LoadReport` instead of aborting, so a partially-broken
    /// home directory still yields a usable (degraded) store. Finishes by
    /// recomputing the degraded state and publishing `LoadCompleted`.
    pub async fn initial_load(&self) -> LoadReport {
        let mut report = LoadReport::new();

        info!(claude_home = %self.claude_home.display(), "Starting initial data load");

        // Load stats
        self.load_stats(&mut report).await;

        // Load ~/.claude.json global stats (per-project last session costs)
        // NOTE(review): a missing/unparsable ~/.claude.json is silently
        // skipped here and never reaches the report — confirm intentional.
        if let Some(home) = dirs::home_dir() {
            if let Some(global) = parse_claude_global(&home) {
                *self.claude_global_stats.write() = Some(global);
                debug!("~/.claude.json loaded successfully");
            }
        }

        // Load settings
        self.load_settings(&mut report).await;

        // Load MCP configuration
        self.load_mcp_config(&mut report).await;

        // Load rules
        self.load_rules(&mut report).await;

        // Scan sessions
        self.scan_sessions(&mut report).await;

        // Determine degraded state
        self.update_degraded_state(&report);

        // Notify subscribers
        self.event_bus.publish(DataEvent::LoadCompleted);

        info!(
            stats_loaded = report.stats_loaded,
            settings_loaded = report.settings_loaded,
            sessions_scanned = report.sessions_scanned,
            sessions_failed = report.sessions_failed,
            errors = report.errors.len(),
            "Initial load complete"
        );

        report
    }

    /// Parse `stats-cache.json` (with configured retries) into the stats slot.
    ///
    /// On success, costs are recalculated with accurate pricing before the
    /// cache is stored; on failure the parser records the error in `report`
    /// and the previous stats value is left untouched.
    async fn load_stats(&self, report: &mut LoadReport) {
        let stats_path = self.claude_home.join("stats-cache.json");
        let parser = StatsParser::new()
            .with_retries(self.config.stats_retry_count, self.config.stats_retry_delay);

        let Some(mut stats) = parser.parse_graceful(&stats_path, report).await else {
            return;
        };

        // Recalculate costs using accurate pricing
        stats.recalculate_costs();
        *self.stats.write() = Some(stats);
        debug!("Stats loaded successfully with recalculated costs");
    }

    /// Load global + project settings and store the merged result.
    async fn load_settings(&self, report: &mut LoadReport) {
        let merged = SettingsParser::new()
            .load_merged(&self.claude_home, self.project_path.as_deref(), report)
            .await;

        *self.settings.write() = merged;
        debug!("Settings loaded and merged");
    }

    /// Load MCP server configuration (global + project-level).
    ///
    /// A missing config is not an error; a malformed one is recorded in the
    /// report without touching the previously stored value.
    async fn load_mcp_config(&self, report: &mut LoadReport) {
        let loaded = McpConfig::load_merged(&self.claude_home, self.project_path.as_deref());
        match loaded {
            Err(e) => {
                use crate::error::LoadError;
                report.add_error(LoadError::error(
                    "mcp_config",
                    format!("Failed to parse MCP config: {}", e),
                ));
            }
            Ok(None) => {
                debug!("No MCP config found (optional)");
            }
            Ok(Some(config)) => {
                let server_count = config.servers.len();
                *self.mcp_config.write() = Some(config);
                debug!(
                    server_count,
                    "MCP config loaded successfully (global + project)"
                );
            }
        }
    }

    /// Load rules from CLAUDE.md files (global and, when focused, project).
    async fn load_rules(&self, report: &mut LoadReport) {
        let loaded = Rules::load(&self.claude_home, self.project_path.as_deref());
        match loaded {
            Err(e) => {
                use crate::error::LoadError;
                report.add_error(LoadError::error(
                    "rules",
                    format!("Failed to load rules: {}", e),
                ));
            }
            Ok(rules) => {
                let has_global = rules.global.is_some();
                let has_project = rules.project.is_some();
                *self.rules.write() = rules;
                debug!(has_global, has_project, "Rules loaded");
            }
        }
    }

    /// Scan all sessions
    ///
    /// Indexes every session under `<claude_home>/projects`, using the
    /// SQLite metadata cache when available to skip unchanged files. If the
    /// scan yields more than `max_session_metadata_count` entries, only the
    /// most recently active sessions are kept.
    async fn scan_sessions(&self, report: &mut LoadReport) {
        let projects_dir = self.claude_home.join("projects");

        // A missing projects dir is a warning, not an error: fresh installs
        // have no sessions yet.
        if !projects_dir.exists() {
            report.add_warning(
                "sessions",
                format!("Projects directory not found: {}", projects_dir.display()),
            );
            return;
        }

        let mut parser =
            SessionIndexParser::new().with_concurrency(self.config.max_concurrent_scans);

        // Enable metadata cache if available (90% speedup)
        if let Some(ref cache) = self.metadata_cache {
            parser = parser.with_cache(cache.clone());
        }

        let sessions = parser.scan_all(&projects_dir, report).await;

        // Enforce max count limit: sort newest-first, then drop the tail.
        let sessions_to_add: Vec<_> = if sessions.len() > self.config.max_session_metadata_count {
            warn!(
                total = sessions.len(),
                limit = self.config.max_session_metadata_count,
                "Session count exceeds limit, keeping most recent"
            );

            let mut sorted = sessions;
            sorted.sort_by(|a, b| b.last_timestamp.cmp(&a.last_timestamp));
            sorted.truncate(self.config.max_session_metadata_count);
            sorted
        } else {
            sessions
        };

        // Insert into DashMap (wrap in Arc for cheap cloning)
        // NOTE(review): existing entries with the same id are overwritten;
        // stale ids from a previous scan are not removed here.
        for session in sessions_to_add {
            self.sessions.insert(session.id.clone(), Arc::new(session));
        }

        debug!(count = self.sessions.len(), "Sessions indexed");
    }

    /// Recompute the degraded state from a load report.
    ///
    /// Fatal errors force `ReadOnly`; otherwise any missing component
    /// (stats, settings, failed sessions) yields `PartialData`, and a clean
    /// report yields `Healthy`.
    fn update_degraded_state(&self, report: &LoadReport) {
        let next = if report.has_fatal_errors() {
            DegradedState::ReadOnly {
                reason: "Fatal errors during load".to_string(),
            }
        } else {
            let mut missing: Vec<String> = Vec::new();
            if !report.stats_loaded {
                missing.push("stats".to_string());
            }
            if !report.settings_loaded {
                missing.push("settings".to_string());
            }
            if report.sessions_failed > 0 {
                missing.push(format!("{} sessions", report.sessions_failed));
            }

            if missing.is_empty() {
                DegradedState::Healthy
            } else {
                let reason = format!("Missing: {}", missing.join(", "));
                DegradedState::PartialData { missing, reason }
            }
        };

        *self.degraded_state.write() = next;
    }

    // ===================
    // Read accessors
    // ===================

    /// Snapshot of the stats cache (`None` until stats have loaded).
    pub fn stats(&self) -> Option<StatsCache> {
        let guard = self.stats.read();
        guard.clone()
    }

    /// Calculate context window saturation from current sessions
    pub fn context_window_stats(&self) -> crate::models::ContextWindowStats {
        // Clone Arc (cheap) to avoid lifetime issues with DashMap iterators
        let sessions: Vec<_> = self
            .sessions
            .iter()
            .map(|entry| Arc::clone(entry.value()))
            .collect();
        // Dereference Arc to get &SessionMetadata
        let refs: Vec<_> = sessions.iter().map(|s| s.as_ref()).collect();
        crate::models::StatsCache::calculate_context_saturation(&refs, 30)
    }

    /// Snapshot of the merged settings.
    pub fn settings(&self) -> MergedConfig {
        let guard = self.settings.read();
        guard.clone()
    }

    /// Snapshot of the MCP server configuration (`None` if absent).
    pub fn mcp_config(&self) -> Option<McpConfig> {
        let guard = self.mcp_config.read();
        guard.clone()
    }

    /// Snapshot of the CLAUDE.md rules.
    pub fn rules(&self) -> Rules {
        let guard = self.rules.read();
        guard.clone()
    }

    /// Snapshot of invocation statistics (agents, commands, skills).
    pub fn invocation_stats(&self) -> InvocationStats {
        let guard = self.invocation_stats.read();
        guard.clone()
    }

    /// Calculate current quota status from stats and budget config.
    ///
    /// Returns `None` if stats are not loaded or no budget is configured.
    /// The stats value is cloned out of its lock before the settings lock
    /// is taken, so the two locks are never held simultaneously.
    pub fn quota_status(&self) -> Option<crate::quota::QuotaStatus> {
        let stats = {
            let guard = self.stats.read();
            guard.clone()
        }?;

        let settings = self.settings.read();
        let budget = settings.merged.budget.as_ref()?;
        Some(crate::quota::calculate_quota_status(&stats, budget))
    }

    /// Live Claude Code sessions detected from running processes (ps-based).
    ///
    /// Detection failures are swallowed: an empty vector is returned when
    /// no processes are found or the probe errors out.
    pub fn live_sessions(&self) -> Vec<crate::live_monitor::LiveSession> {
        match crate::live_monitor::detect_live_sessions() {
            Ok(sessions) => sessions,
            Err(_) => Vec::new(),
        }
    }

    /// Merged live sessions: hook-reported data plus ps-based fallback.
    ///
    /// Hook sessions take priority; ps-only sessions surface as ProcessOnly.
    pub fn merged_live_sessions(&self) -> Vec<crate::live_monitor::MergedLiveSession> {
        let hook_file = {
            let guard = self.live_hook_sessions.read();
            guard.clone()
        };
        let ps_sessions = match crate::live_monitor::detect_live_sessions() {
            Ok(found) => found,
            Err(_) => Vec::new(),
        };
        crate::live_monitor::merge_live_sessions(&hook_file, &ps_sessions)
    }

    /// Reload hook-based live session state from `path`.
    ///
    /// On parse failure the previous in-memory state is kept and a warning
    /// is logged.
    pub async fn reload_live_hook_sessions(&self, path: &std::path::Path) {
        match crate::hook_state::LiveSessionFile::load(path) {
            Err(e) => {
                warn!(error = %e, "Failed to reload live-sessions.json");
            }
            Ok(file) => {
                *self.live_hook_sessions.write() = file;
                debug!("Reloaded live hook sessions from {}", path.display());
            }
        }
    }

    /// Snapshot of per-project last-session stats parsed from ~/.claude.json.
    pub fn claude_global_stats(&self) -> Option<ClaudeGlobalStats> {
        let guard = self.claude_global_stats.read();
        guard.clone()
    }

    /// Number of indexed sessions.
    pub fn session_count(&self) -> usize {
        let count = self.sessions.len();
        count
    }

    /// Look up a session by id; the returned `Arc` clone is cheap.
    pub fn get_session(&self, id: &str) -> Option<Arc<SessionMetadata>> {
        let entry = self.sessions.get(id)?;
        Some(Arc::clone(entry.value()))
    }

    /// Load full session content with lazy caching
    ///
    /// Returns conversation messages parsed from session JSONL file.
    /// Uses Moka cache (LRU with 5min TTL) for repeated access.
    ///
    /// NOTE(review): the cache is currently lookup-only — a hit is logged
    /// but the parse still happens, and nothing is ever inserted (see the
    /// TODO below). Every call therefore re-parses the JSONL today.
    ///
    /// # Performance
    /// - First call: Parse JSONL (~50-500ms for 1000-message session)
    /// - Cached calls: <1ms (memory lookup)
    /// - Cache eviction: LRU + 5min idle timeout
    ///
    /// # Errors
    /// Returns CoreError if session not found or file cannot be read.
    pub async fn load_session_content(
        &self,
        session_id: &str,
    ) -> Result<Vec<crate::models::ConversationMessage>, CoreError> {
        // Get session metadata (also yields the JSONL file path)
        let metadata = self
            .get_session(session_id)
            .ok_or_else(|| CoreError::SessionNotFound {
                session_id: session_id.to_string(),
            })?;

        // Try cache first (Moka handles concurrency internally)
        let session_id_owned = SessionId::from(session_id);
        if let Some(_cached) = self.session_content_cache.get(&session_id_owned).await {
            debug!(session_id, "Session content cache HIT");
            // TODO: Cache design decision - caching Vec<String> vs Vec<ConversationMessage>
            // For now, always parse from file (will be optimized in cache phase)
        }

        // Cache miss: parse from file
        debug!(
            session_id,
            path = %metadata.file_path.display(),
            "Session content cache MISS, parsing JSONL"
        );

        let messages = SessionContentParser::parse_conversation(
            &metadata.file_path,
            (*metadata).clone(), // Clone metadata out of Arc — parser takes it by value
        )
        .await?;

        // Note: Cache insertion skipped for now (caching Vec<String> vs Vec<ConversationMessage> design decision)
        // Will be added in cache optimization phase

        Ok(messages)
    }

    /// Cached analytics data, if previously computed.
    ///
    /// Returns `None` until `compute_analytics()` has run (or after the
    /// cache has been invalidated).
    pub fn analytics(&self) -> Option<AnalyticsData> {
        let snapshot = {
            let guard = self.analytics_cache.read();
            guard.clone()
        };
        debug!(
            has_analytics = snapshot.is_some(),
            "analytics() getter called"
        );
        snapshot
    }

    /// Compute and cache analytics data for a period
    ///
    /// This is a CPU-intensive operation (trends, forecasting, patterns).
    /// For 1000+ sessions, this may take 100-300ms, so it's offloaded
    /// to a blocking task.
    ///
    /// Cache is invalidated on stats reload or session updates (EventBus pattern).
    pub async fn compute_analytics(&self, period: Period) {
        // Snapshot Arc handles so the owned Vec can move into spawn_blocking
        // without holding any DashMap guards across the await.
        let sessions: Vec<_> = self
            .sessions
            .iter()
            .map(|r| Arc::clone(r.value()))
            .collect();

        info!(
            session_count = sessions.len(),
            period = ?period,
            "compute_analytics() ENTRY"
        );

        // Offload to blocking task for CPU-intensive computation
        let analytics =
            tokio::task::spawn_blocking(move || AnalyticsData::compute(&sessions, period)).await;

        match analytics {
            Ok(data) => {
                info!(
                    insights_count = data.insights.len(),
                    "compute_analytics() computed data"
                );
                // Store first, then publish, so subscribers reacting to the
                // event observe the fresh cache.
                let mut guard = self.analytics_cache.write();
                *guard = Some(data);
                self.event_bus.publish(DataEvent::AnalyticsUpdated);
                info!("compute_analytics() EXIT - cached and event published");
            }
            Err(e) => {
                // JoinError: the blocking task panicked (or was cancelled);
                // the stale cache value is intentionally left in place.
                warn!(error = %e, "Failed to compute analytics (task panicked)");
            }
        }
    }

    /// Drop the cached analytics so the next `compute_analytics()` rebuilds it.
    ///
    /// Note: Currently unused to prevent aggressive invalidation.
    /// Kept for future use if smart invalidation is needed.
    #[allow(dead_code)]
    fn invalidate_analytics_cache(&self) {
        *self.analytics_cache.write() = None;
        debug!("Analytics cache invalidated");
    }

    /// All indexed session ids (unordered).
    pub fn session_ids(&self) -> Vec<SessionId> {
        let mut ids = Vec::with_capacity(self.sessions.len());
        for entry in self.sessions.iter() {
            ids.push(entry.key().clone());
        }
        ids
    }

    /// Drop every entry in the session content cache.
    ///
    /// Used to reclaim memory on manual refresh (F5).
    pub fn clear_session_content_cache(&self) {
        let cache = &self.session_content_cache;
        cache.invalidate_all();
        debug!("Session content cache cleared");
    }

    /// Sessions grouped by project path, newest-first within each project.
    /// Returns `Arc<SessionMetadata>` handles for cheap cloning.
    pub fn sessions_by_project(
        &self,
    ) -> std::collections::HashMap<String, Vec<Arc<SessionMetadata>>> {
        use std::collections::HashMap;

        let mut grouped: HashMap<String, Vec<Arc<SessionMetadata>>> = HashMap::new();
        for entry in self.sessions.iter() {
            let meta = Arc::clone(entry.value());
            let key = meta.project_path.as_str().to_string();
            grouped.entry(key).or_default().push(meta);
        }

        // Newest first within each project bucket
        for bucket in grouped.values_mut() {
            bucket.sort_by(|a, b| b.last_timestamp.cmp(&a.last_timestamp));
        }

        grouped
    }

    /// Every indexed session, in no particular order.
    /// Returns `Arc<SessionMetadata>` handles for cheap cloning.
    pub fn all_sessions(&self) -> Vec<Arc<SessionMetadata>> {
        let mut out = Vec::with_capacity(self.sessions.len());
        for entry in self.sessions.iter() {
            out.push(Arc::clone(entry.value()));
        }
        out
    }

    /// The `limit` most recently active sessions, newest first.
    /// Returns `Arc<SessionMetadata>` handles for cheap cloning.
    pub fn recent_sessions(&self, limit: usize) -> Vec<Arc<SessionMetadata>> {
        let mut recent = self.all_sessions();
        recent.sort_by(|lhs, rhs| rhs.last_timestamp.cmp(&lhs.last_timestamp));
        recent.truncate(limit);
        recent
    }

    /// Search sessions using FTS5 full-text search.
    ///
    /// Returns relevance-ranked results. Returns empty vec if FTS5 not initialized.
    pub fn search_sessions(&self, query: &str, limit: usize) -> Vec<crate::cache::SearchResult> {
        if let Some(ref cache) = self.metadata_cache {
            match cache.search_sessions(query, limit) {
                Ok(results) => results,
                Err(e) => {
                    warn!("FTS5 search failed: {}", e);
                    Vec::new()
                }
            }
        } else {
            Vec::new()
        }
    }

    /// Analyze a session's tool calls and generate activity summary + alerts.
    ///
    /// Checks the SQLite activity cache first (keyed by file mtime); on a
    /// miss, parses the session JSONL and classifies its tool calls.
    /// Results are stored in the in-memory DashMap and the SQLite cache.
    /// Publishes DataEvent::AnalyticsUpdated on completion so the TUI re-renders.
    ///
    /// # Errors
    /// Fails if the session id is unknown or the JSONL cannot be parsed;
    /// a cache-write failure is only logged, not propagated.
    pub async fn analyze_session(&self, session_id: &str) -> anyhow::Result<ActivitySummary> {
        use std::time::SystemTime;

        let metadata = self
            .get_session(session_id)
            .ok_or_else(|| anyhow::anyhow!("Session not found: {}", session_id))?;

        let path = &metadata.file_path;

        // Read mtime once — used for both cache check and cache write to avoid TOCTOU.
        // Use tokio::fs to avoid blocking the executor thread.
        // UNIX_EPOCH fallback means an unreadable mtime simply misses the cache.
        let mtime = tokio::fs::metadata(path)
            .await
            .and_then(|m| m.modified())
            .unwrap_or(SystemTime::UNIX_EPOCH);

        // Check SQLite cache first (avoids re-parsing unchanged files)
        if let Some(cache) = &self.metadata_cache {
            if let Ok(Some(cached)) = cache.get_activity(path, mtime) {
                self.activity_results
                    .insert(session_id.to_string(), cached.clone());
                self.event_bus.publish(DataEvent::AnalyticsUpdated);
                return Ok(cached);
            }
        }

        // Cache miss: parse JSONL
        let calls = parse_tool_calls(path, session_id).await?;

        // Two levels up from the session file — presumably the project root
        // directory layout under ~/.claude/projects; TODO confirm.
        let project_root = path
            .parent()
            .and_then(|p| p.parent())
            .map(|p| p.to_string_lossy().into_owned());

        let summary = classify_tool_calls(calls, session_id, project_root.as_deref());

        // Persist to SQLite cache — same mtime as used for cache check (no TOCTOU)
        if let Some(cache) = &self.metadata_cache {
            if let Err(e) = cache.put_activity(path, session_id, &summary, mtime) {
                warn!(session_id, error = %e, "Failed to cache activity — will re-parse on restart");
            }
        }

        // Store in memory + notify TUI
        self.activity_results
            .insert(session_id.to_string(), summary.clone());
        self.event_bus.publish(DataEvent::AnalyticsUpdated);

        Ok(summary)
    }

    /// Get the cached activity summary for a session (returns None if not yet analyzed).
    pub fn get_session_activity(&self, session_id: &str) -> Option<ActivitySummary> {
        // Clone out of the DashMap guard so no shard lock outlives this call.
        let entry = self.activity_results.get(session_id)?;
        Some(entry.value().clone())
    }

    /// Get all stored security alerts from the SQLite cache.
    ///
    /// `min_severity`: optional filter — "Warning" or "Critical"
    ///
    /// Returns an empty vec when no cache is configured or the query fails.
    pub fn get_all_stored_alerts(&self, min_severity: Option<&str>) -> Vec<StoredAlert> {
        match &self.metadata_cache {
            Some(cache) => cache.get_all_alerts(min_severity).unwrap_or_default(),
            None => Vec::new(),
        }
    }

    /// Consolidated violations feed: merges in-memory DashMap results (freshest) with
    /// SQLite-persisted alerts. DashMap takes priority for sessions analyzed this run.
    ///
    /// Returns alerts sorted Critical → Warning → Info, then by timestamp descending.
    pub fn all_violations(&self) -> Vec<crate::models::activity::Alert> {
        use crate::models::activity::{Alert, AlertCategory, AlertSeverity};
        use std::collections::HashSet;

        let mut in_memory: HashSet<String> = HashSet::new();
        let mut merged: Vec<Alert> = Vec::new();

        // Pass 1: in-memory results (freshest) always win; remember which
        // sessions they cover so the SQLite pass can skip them.
        for entry in self.activity_results.iter() {
            in_memory.insert(entry.key().clone());
            merged.extend(entry.value().alerts.iter().cloned());
        }

        // Pass 2: fill the gaps from SQLite for sessions not analyzed this run.
        if let Some(cache) = &self.metadata_cache {
            if let Ok(stored) = cache.get_all_alerts(None) {
                for sa in stored {
                    // Derive session_id from session_path (filename without extension)
                    let sid = std::path::Path::new(&sa.session_path)
                        .file_stem()
                        .and_then(|s| s.to_str())
                        .unwrap_or(&sa.session_path)
                        .to_string();

                    // DashMap version is fresher — skip the SQLite duplicate.
                    if in_memory.contains(&sid) {
                        continue;
                    }

                    // Severity/category were persisted as strings; map them back,
                    // defaulting unknown values to Info / ExternalExfil.
                    let severity = match sa.severity.as_str() {
                        "Critical" => AlertSeverity::Critical,
                        "Warning" => AlertSeverity::Warning,
                        _ => AlertSeverity::Info,
                    };
                    let category = match sa.category.as_str() {
                        "CredentialAccess" => AlertCategory::CredentialAccess,
                        "DestructiveCommand" => AlertCategory::DestructiveCommand,
                        "ForcePush" => AlertCategory::ForcePush,
                        "ScopeViolation" => AlertCategory::ScopeViolation,
                        _ => AlertCategory::ExternalExfil,
                    };
                    // Unparseable timestamps degrade to "now" rather than dropping
                    // the alert.
                    let timestamp = sa
                        .timestamp
                        .parse::<chrono::DateTime<chrono::Utc>>()
                        .unwrap_or_else(|_| chrono::Utc::now());

                    merged.push(Alert {
                        session_id: sid,
                        timestamp,
                        severity,
                        category,
                        detail: sa.detail,
                    });
                }
            }
        }

        // Sort: Critical > Warning > Info, then newest first within same severity
        merged.sort_by(|a, b| {
            b.severity
                .partial_cmp(&a.severity)
                .unwrap_or(std::cmp::Ordering::Equal)
                .then_with(|| b.timestamp.cmp(&a.timestamp))
        });

        merged
    }

    /// Get top sessions by total tokens (sorted descending)
    /// Returns Arc<SessionMetadata> for cheap cloning
    pub fn top_sessions_by_tokens(&self, limit: usize) -> Vec<Arc<SessionMetadata>> {
        let mut ranked: Vec<_> = self
            .sessions
            .iter()
            .map(|entry| Arc::clone(entry.value()))
            .collect();
        // Stable sort on a Reverse key == descending by token count.
        ranked.sort_by_key(|s| std::cmp::Reverse(s.total_tokens));
        ranked.truncate(limit);
        ranked
    }

    /// Get top models by total tokens (aggregated, sorted descending)
    /// Returns (model_name, total_tokens) pairs
    pub fn top_models_by_tokens(&self) -> Vec<(String, u64)> {
        let mut totals = std::collections::HashMap::new();

        // NOTE(review): a session's *full* token count is credited to each model
        // it used, so multi-model sessions are counted once per model — confirm
        // this approximation is intentional.
        for entry in self.sessions.iter() {
            let meta = entry.value();
            for model in &meta.models_used {
                *totals.entry(model.clone()).or_insert(0) += meta.total_tokens;
            }
        }

        // Rank descending by aggregated tokens and keep the top 10.
        let mut ranked: Vec<_> = totals.into_iter().collect();
        ranked.sort_by_key(|&(_, tokens)| std::cmp::Reverse(tokens));
        ranked.truncate(10);
        ranked
    }

    /// Get top days by total tokens (aggregated, sorted descending)
    /// Returns (date_string, total_tokens) pairs
    pub fn top_days_by_tokens(&self) -> Vec<(String, u64)> {
        let mut totals = std::collections::HashMap::new();

        // Bucket by the calendar day of each session's first timestamp;
        // sessions without a timestamp are skipped.
        for entry in self.sessions.iter() {
            let meta = entry.value();
            let Some(ts) = &meta.first_timestamp else { continue };
            *totals
                .entry(ts.format("%Y-%m-%d").to_string())
                .or_insert(0) += meta.total_tokens;
        }

        // Rank descending by aggregated tokens and keep the top 10.
        let mut ranked: Vec<_> = totals.into_iter().collect();
        ranked.sort_by_key(|&(_, tokens)| std::cmp::Reverse(tokens));
        ranked.truncate(10);
        ranked
    }

    /// Get project leaderboard with aggregated metrics
    ///
    /// Returns all projects with session count, total tokens, total cost, and average session cost.
    /// Cost is calculated using accurate model-based pricing from the pricing module.
    pub fn projects_leaderboard(&self) -> Vec<ProjectLeaderboardEntry> {
        // project_path → (session_count, total_tokens, total_cost)
        let mut per_project = std::collections::HashMap::new();

        for entry in self.sessions.iter() {
            let meta = entry.value();

            // Pricing uses the session's first recorded model; sessions with no
            // model recorded fall back to "unknown".
            let model = meta
                .models_used
                .first()
                .map(|s| s.as_str())
                .unwrap_or("unknown");

            // Per-session cost from the accurate pricing table.
            let cost = crate::pricing::calculate_cost(
                model,
                meta.input_tokens,
                meta.output_tokens,
                meta.cache_creation_tokens,
                meta.cache_read_tokens,
            );

            let bucket = per_project
                .entry(meta.project_path.clone())
                .or_insert((0, 0u64, 0.0f64));
            bucket.0 += 1;
            bucket.1 += meta.total_tokens;
            bucket.2 += cost;
        }

        let mut board: Vec<_> = per_project
            .into_iter()
            .map(
                |(project_path, (session_count, total_tokens, total_cost))| {
                    // Guard is defensive: every bucket was created with >= 1 session.
                    let avg_session_cost = if session_count > 0 {
                        total_cost / session_count as f64
                    } else {
                        0.0
                    };

                    // Project name = last path component (falls back to the raw path).
                    let project_name = std::path::Path::new(project_path.as_str())
                        .file_name()
                        .and_then(|n| n.to_str())
                        .unwrap_or(project_path.as_str())
                        .to_string();

                    ProjectLeaderboardEntry {
                        project_name,
                        total_sessions: session_count,
                        total_tokens,
                        total_cost,
                        avg_session_cost,
                    }
                },
            )
            .collect();

        // Default sort: by total cost descending (incomparable floats tie).
        board.sort_by(|a, b| {
            b.total_cost
                .partial_cmp(&a.total_cost)
                .unwrap_or(std::cmp::Ordering::Equal)
        });

        board
    }

    // ===================
    // Update methods (called by watcher)
    // ===================

    /// Reload stats (called on file change)
    ///
    /// Parses `stats-cache.json` with the configured retry policy, recalculates
    /// costs with the accurate pricing table, swaps the result into `self.stats`,
    /// then publishes `DataEvent::StatsUpdated`.
    pub async fn reload_stats(&self) {
        let stats_path = self.claude_home.join("stats-cache.json");
        let parser = StatsParser::new()
            .with_retries(self.config.stats_retry_count, self.config.stats_retry_delay);

        let mut report = LoadReport::new();
        if let Some(mut stats) = parser.parse_graceful(&stats_path, &mut report).await {
            // Recalculate costs using accurate pricing
            stats.recalculate_costs();

            // Scope the write guard so the lock is released *before* publishing —
            // same discipline as reload_settings; subscribers may take the read
            // lock as soon as they receive the event.
            {
                let mut guard = self.stats.write();
                *guard = Some(stats);
            }

            // Don't invalidate analytics - it will auto-recompute if needed
            // Instead, just publish the event so UI can decide whether to recompute
            self.event_bus.publish(DataEvent::StatsUpdated);
            debug!("Stats reloaded with recalculated costs");
        }
    }

    /// Reload settings from files (called when settings change)
    pub async fn reload_settings(&self) {
        // Re-merge global + project settings from disk.
        let merged = SettingsParser::new()
            .load_merged(
                &self.claude_home,
                self.project_path.as_deref(),
                &mut LoadReport::new(),
            )
            .await;

        // Scope the write guard so the lock is released before publishing.
        {
            let mut settings = self.settings.write();
            *settings = merged;
        }

        self.event_bus
            .publish(DataEvent::ConfigChanged(ConfigScope::Global));
        debug!("Settings reloaded");
    }

    /// Add or update a session (called when session file changes)
    pub async fn update_session(&self, path: &Path) {
        // Scan failures are logged and swallowed — the watcher will retry on the
        // next file event.
        let meta = match SessionIndexParser::new().scan_session(path).await {
            Ok(meta) => meta,
            Err(e) => {
                warn!(path = %path.display(), error = %e, "Failed to update session");
                return;
            }
        };

        let id = meta.id.clone();
        let is_new = !self.sessions.contains_key(&id);
        self.sessions.insert(id.clone(), Arc::new(meta));

        // Don't invalidate analytics on every session update - too aggressive
        // Analytics will be recomputed on demand or periodically
        // Only invalidate on significant changes (detected by UI)

        // Distinguish create vs. update so subscribers can react differently.
        let event = if is_new {
            DataEvent::SessionCreated(id)
        } else {
            DataEvent::SessionUpdated(id)
        };
        self.event_bus.publish(event);
    }

    /// Compute invocation statistics from all sessions
    ///
    /// This scans all session files to count agent/command/skill invocations,
    /// attributes Task-tool tokens to agents as a rough proxy, stores the result
    /// in `self.invocation_stats`, and publishes `DataEvent::LoadCompleted`.
    /// Should be called after initial load or when sessions are updated.
    pub async fn compute_invocations(&self) {
        let paths: Vec<_> = self
            .sessions
            .iter()
            .map(|r| r.value().file_path.clone())
            .collect();

        debug!(session_count = paths.len(), "Computing invocation stats");

        let parser = InvocationParser::new();
        let mut stats = parser.scan_sessions(&paths).await;

        // Populate agent_token_stats from session tool_token_usage
        // The Task tool tokens serve as a proxy for agent token consumption
        for session_ref in self.sessions.iter() {
            let session = session_ref.value();
            if let Some(&task_tokens) = session.tool_token_usage.get("Task") {
                // Distribute Task tool tokens equally among agents spawned in this
                // session (integer division; remainder tokens are dropped).
                let agent_count =
                    session.tool_usage.get("Task").copied().unwrap_or(0).max(1) as u64;
                let tokens_per_agent = task_tokens / agent_count;
                // NOTE(review): this credits tokens_per_agent to *every* agent type
                // present in stats, not only agents invoked by this session —
                // confirm the over-attribution is acceptable for this heuristic.
                for agent_type in stats.agents.keys().cloned().collect::<Vec<_>>() {
                    *stats.agent_token_stats.entry(agent_type).or_insert(0) += tokens_per_agent;
                }
            }
        }

        // Scope the write guard so the lock is released before publishing
        // (matches the lock discipline in reload_settings). The debug line reads
        // through the same guard while it is still held.
        {
            let mut guard = self.invocation_stats.write();
            *guard = stats;

            debug!(
                agents = guard.agents.len(),
                commands = guard.commands.len(),
                skills = guard.skills.len(),
                total = guard.total_invocations(),
                "Invocation stats computed"
            );
        }

        // Note: Using LoadCompleted as there's no specific invocation stats event
        self.event_bus.publish(DataEvent::LoadCompleted);
    }

    /// Compute billing blocks from all sessions
    ///
    /// This scans all sessions with timestamps and aggregates usage into 5-hour billing blocks.
    /// Uses real model pricing based on token breakdown for accurate cost calculation.
    /// Stores the result in `self.billing_blocks` and publishes `DataEvent::LoadCompleted`.
    pub async fn compute_billing_blocks(&self) {
        debug!("Computing billing blocks from sessions with real pricing");

        let mut manager = BillingBlockManager::new();
        let mut sessions_with_timestamps = 0;
        let mut sessions_without_timestamps = 0;

        for session in self.sessions.iter() {
            let metadata = session.value();

            // Sessions without a first_timestamp cannot be placed into a block;
            // count them for the debug line and move on.
            let Some(timestamp) = &metadata.first_timestamp else {
                sessions_without_timestamps += 1;
                continue;
            };

            sessions_with_timestamps += 1;

            // Get model for this session (use first model, or "unknown")
            let model = metadata
                .models_used
                .first()
                .map(|s| s.as_str())
                .unwrap_or("unknown");

            // Calculate real cost using pricing table
            let cost = crate::pricing::calculate_cost(
                model,
                metadata.input_tokens,
                metadata.output_tokens,
                metadata.cache_creation_tokens,
                metadata.cache_read_tokens,
            );

            manager.add_usage(
                timestamp,
                metadata.input_tokens,
                metadata.output_tokens,
                metadata.cache_creation_tokens,
                metadata.cache_read_tokens,
                cost,
            );
        }

        debug!(
            sessions_with_timestamps,
            sessions_without_timestamps,
            blocks = manager.get_all_blocks().len(),
            "Billing blocks computed with real pricing"
        );

        // Scope the write guard so the lock is released *before* publishing —
        // the original held it across publish, unlike reload_settings.
        {
            let mut guard = self.billing_blocks.write();
            *guard = manager;
        }

        self.event_bus.publish(DataEvent::LoadCompleted);
    }

    /// Get billing blocks (read-only access)
    ///
    /// Returns the raw `parking_lot` read guard; drop it promptly and avoid
    /// holding it across an `.await` point — the lock is synchronous.
    pub fn billing_blocks(&self) -> parking_lot::RwLockReadGuard<'_, BillingBlockManager> {
        self.billing_blocks.read()
    }

    /// Calculate usage estimate based on billing blocks and subscription plan
    pub fn usage_estimate(&self) -> crate::usage_estimator::UsageEstimate {
        let settings = self.settings();
        let plan = settings
            .merged
            .subscription_plan
            .as_ref()
            .map(|s| crate::usage_estimator::SubscriptionPlan::parse(s))
            .unwrap_or_default();

        let billing_blocks = self.billing_blocks.read();
        crate::usage_estimator::calculate_usage_estimate(&billing_blocks, plan)
    }

    /// Load ccboard user preferences from the cache directory.
    pub fn load_preferences(&self) -> crate::preferences::CcboardPreferences {
        // Preferences live under <claude_home>/cache.
        crate::preferences::CcboardPreferences::load(&self.claude_home.join("cache"))
    }

    /// Save ccboard user preferences to the cache directory.
    pub fn save_preferences(
        &self,
        prefs: &crate::preferences::CcboardPreferences,
    ) -> anyhow::Result<()> {
        // Same <claude_home>/cache directory that load_preferences reads from.
        prefs.save(&self.claude_home.join("cache"))
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    #[tokio::test]
    async fn test_data_store_creation() {
        // A fresh store over an empty directory starts empty and healthy.
        let tmp = tempdir().unwrap();
        let store = DataStore::with_defaults(tmp.path().to_path_buf(), None);

        assert_eq!(store.session_count(), 0);
        assert!(store.stats().is_none());
        assert!(store.degraded_state().is_healthy());
    }

    #[tokio::test]
    async fn test_initial_load_missing_dir() {
        // Pointing the store at a nonexistent path must degrade, not panic.
        let tmp = tempdir().unwrap();
        let missing = tmp.path().join("nonexistent");
        let store = DataStore::with_defaults(missing, None);

        let report = store.initial_load().await;

        assert!(report.has_errors());
        assert!(store.degraded_state().is_degraded());
    }

    #[tokio::test]
    async fn test_initial_load_with_stats() {
        let tmp = tempdir().unwrap();
        let claude_home = tmp.path();

        // Minimal v2 stats file: 600 input + 400 output tokens for one model.
        std::fs::write(
            claude_home.join("stats-cache.json"),
            r#"{"version": 2, "totalSessions": 5, "totalMessages": 100, "modelUsage": {"test": {"inputTokens": 600, "outputTokens": 400}}}"#,
        )
        .unwrap();

        // An (empty) projects dir must exist for the session scan.
        std::fs::create_dir_all(claude_home.join("projects")).unwrap();

        let store = DataStore::with_defaults(claude_home.to_path_buf(), None);
        let report = store.initial_load().await;

        assert!(report.stats_loaded);
        let loaded = store.stats().unwrap();
        assert_eq!(loaded.total_tokens(), 1000);
        assert_eq!(loaded.session_count(), 5);
    }

    #[tokio::test]
    async fn test_event_bus_subscription() {
        let tmp = tempdir().unwrap();
        let store = DataStore::with_defaults(tmp.path().to_path_buf(), None);

        // Subscribe before publishing so the receiver observes the event.
        let mut rx = store.event_bus().subscribe();
        store.event_bus().publish(DataEvent::StatsUpdated);

        let received = rx.recv().await.unwrap();
        assert!(matches!(received, DataEvent::StatsUpdated));
    }

    #[tokio::test]
    async fn test_analytics_cache_and_invalidation() {
        use crate::models::session::SessionMetadata;
        use chrono::Utc;

        let dir = tempdir().unwrap();
        let store = DataStore::with_defaults(dir.path().to_path_buf(), None);

        // Add test sessions
        // Ten synthetic sessions, one started per day going back in time, with
        // token totals 1000..=10000 so both 7d and 30d periods have data.
        let now = Utc::now();
        for i in 0..10 {
            let total_tokens = 1000 * (i as u64 + 1);
            let session = SessionMetadata {
                id: format!("test-{}", i).into(),
                file_path: std::path::PathBuf::from(format!("/test-{}.jsonl", i)),
                project_path: "/test".into(),
                first_timestamp: Some(now - chrono::Duration::days(i)),
                last_timestamp: Some(now),
                message_count: 10,
                total_tokens,
                input_tokens: total_tokens / 2,
                output_tokens: total_tokens / 3,
                cache_creation_tokens: total_tokens / 10,
                // Remainder bucket: keeps the four components summing exactly
                // to total_tokens despite integer division above.
                cache_read_tokens: total_tokens
                    - (total_tokens / 2 + total_tokens / 3 + total_tokens / 10),
                models_used: vec!["sonnet".to_string()],
                file_size_bytes: 1024,
                first_user_message: None,
                has_subagents: false,
                duration_seconds: Some(1800),
                branch: None,
                tool_usage: std::collections::HashMap::new(),
                tool_token_usage: std::collections::HashMap::new(),
            };
            store.sessions.insert(session.id.clone(), Arc::new(session));
        }

        // Initially no analytics
        assert!(store.analytics().is_none());

        // Compute analytics
        store.compute_analytics(Period::last_7d()).await;

        // Analytics should be cached
        let analytics1 = store.analytics().expect("Analytics should be cached");
        assert!(!analytics1.trends.is_empty());
        assert_eq!(analytics1.period, Period::last_7d());

        // Invalidate by reloading stats
        store.invalidate_analytics_cache();
        assert!(store.analytics().is_none(), "Cache should be invalidated");

        // Re-compute with different period
        store.compute_analytics(Period::last_30d()).await;
        let analytics2 = store.analytics().expect("Analytics should be re-cached");
        assert_eq!(analytics2.period, Period::last_30d());
    }

    #[tokio::test]
    async fn test_leaderboard_methods() {
        use crate::models::session::SessionMetadata;
        use chrono::Utc;

        let dir = tempdir().unwrap();
        let store = DataStore::with_defaults(dir.path().to_path_buf(), None);

        let now = Utc::now();

        // Add sessions with varying tokens
        // Tuple layout: (session id, total_tokens, first model, days ago).
        let test_data = vec![
            ("session-1", 5000u64, "opus", 0),
            ("session-2", 3000u64, "sonnet", 1),
            ("session-3", 8000u64, "haiku", 0),
            ("session-4", 2000u64, "sonnet", 2),
            ("session-5", 10000u64, "opus", 0),
        ];

        for (id, tokens, model, days_ago) in test_data {
            let session = SessionMetadata {
                id: id.into(),
                file_path: std::path::PathBuf::from(format!("/{}.jsonl", id)),
                project_path: "/test".into(),
                first_timestamp: Some(now - chrono::Duration::days(days_ago)),
                last_timestamp: Some(now),
                message_count: 10,
                total_tokens: tokens,
                input_tokens: tokens / 2,
                output_tokens: tokens / 2,
                cache_creation_tokens: 0,
                cache_read_tokens: 0,
                models_used: vec![model.to_string()],
                file_size_bytes: 1024,
                first_user_message: None,
                has_subagents: false,
                duration_seconds: Some(1800),
                branch: None,
                tool_usage: std::collections::HashMap::new(),
                tool_token_usage: std::collections::HashMap::new(),
            };
            store.sessions.insert(session.id.clone(), Arc::new(session));
        }

        // Test top_sessions_by_tokens
        let top_sessions = store.top_sessions_by_tokens(3);
        assert_eq!(top_sessions.len(), 3);
        assert_eq!(top_sessions[0].id, "session-5"); // 10000 tokens
        assert_eq!(top_sessions[1].id, "session-3"); // 8000 tokens
        assert_eq!(top_sessions[2].id, "session-1"); // 5000 tokens

        // Test top_models_by_tokens
        let top_models = store.top_models_by_tokens();
        assert!(!top_models.is_empty());
        // opus: 15000 (5000+10000), sonnet: 5000 (3000+2000), haiku: 8000
        assert_eq!(top_models[0].0, "opus");
        assert_eq!(top_models[0].1, 15000);
        assert_eq!(top_models[1].0, "haiku");
        assert_eq!(top_models[1].1, 8000);

        // Test top_days_by_tokens
        let top_days = store.top_days_by_tokens();
        assert!(!top_days.is_empty());
        // Day 0 (today): 5000+8000+10000 = 23000
        let today = now.format("%Y-%m-%d").to_string();
        assert_eq!(top_days[0].0, today);
        assert_eq!(top_days[0].1, 23000);
    }

    /// C3: DashMap takes priority over SQLite in all_violations()
    ///
    /// Verifies the merge strategy:
    /// - Same session_id in both → DashMap version returned (fresher)
    /// - Session only in SQLite → SQLite version returned (fills the gap)
    #[test]
    fn test_all_violations_dashmap_priority_over_sqlite() {
        use crate::models::activity::{ActivitySummary, Alert, AlertCategory, AlertSeverity};
        use chrono::Utc;

        let dir = tempdir().unwrap();
        let claude_home = dir.path().to_path_buf();
        let store = DataStore::with_defaults(claude_home.clone(), None);

        let now = Utc::now();

        // ── Setup: SQLite alerts, written through the store's own cache handle ─
        // (The original test also opened a second MetadataCache on the same
        // SQLite file and wrote identical rows through it — redundant, and a
        // potential source of lock conflicts. Write through the store only.)
        let store_cache = store
            .metadata_cache
            .as_ref()
            .expect("MetadataCache should be present in store");

        // "shared-session" exists in SQLite with a Warning-level alert
        let sqlite_summary = ActivitySummary {
            alerts: vec![Alert {
                session_id: "shared-session".to_string(),
                timestamp: now,
                severity: AlertSeverity::Warning,
                category: AlertCategory::DestructiveCommand,
                detail: "sqlite-version".to_string(),
            }],
            ..Default::default()
        };
        store_cache
            .put_activity(
                std::path::Path::new("/projects/test/shared-session.jsonl"),
                "shared-session",
                &sqlite_summary,
                std::time::SystemTime::now(),
            )
            .expect("put_activity via store cache should succeed");

        // "sqlite-only-session" exists exclusively in SQLite
        let sqlite_only_summary = ActivitySummary {
            alerts: vec![Alert {
                session_id: "sqlite-only-session".to_string(),
                timestamp: now,
                severity: AlertSeverity::Info,
                category: AlertCategory::ExternalExfil,
                detail: "sqlite-only-detail".to_string(),
            }],
            ..Default::default()
        };
        store_cache
            .put_activity(
                std::path::Path::new("/projects/test/sqlite-only-session.jsonl"),
                "sqlite-only-session",
                &sqlite_only_summary,
                std::time::SystemTime::now(),
            )
            .expect("put_activity via store cache should succeed");

        // ── Setup: DashMap alert for the shared session (fresher, Critical) ───
        let dashmap_summary = ActivitySummary {
            alerts: vec![Alert {
                session_id: "shared-session".to_string(),
                timestamp: now,
                severity: AlertSeverity::Critical,
                category: AlertCategory::ForcePush,
                detail: "dashmap-version".to_string(),
            }],
            ..Default::default()
        };
        store
            .activity_results
            .insert("shared-session".to_string(), dashmap_summary);

        // ── Assert ────────────────────────────────────────────────────────────
        let violations = store.all_violations();

        // The DashMap version must appear
        let dashmap_hit = violations.iter().find(|a| a.session_id == "shared-session");
        assert!(
            dashmap_hit.is_some(),
            "shared-session alert must appear in violations"
        );
        assert_eq!(
            dashmap_hit.unwrap().detail,
            "dashmap-version",
            "DashMap version must take priority over SQLite for shared session"
        );
        assert_eq!(
            dashmap_hit.unwrap().severity,
            AlertSeverity::Critical,
            "DashMap severity (Critical) must win over SQLite (Warning)"
        );

        // The SQLite-only version must appear (fills the gap)
        let sqlite_hit = violations
            .iter()
            .find(|a| a.session_id == "sqlite-only-session");
        assert!(
            sqlite_hit.is_some(),
            "sqlite-only-session must appear in violations (no DashMap entry for it)"
        );
        assert_eq!(sqlite_hit.unwrap().detail, "sqlite-only-detail");

        // The SQLite version of shared-session must NOT appear
        let sqlite_dup = violations
            .iter()
            .filter(|a| a.session_id == "shared-session")
            .count();
        assert_eq!(
            sqlite_dup, 1,
            "shared-session must appear exactly once (DashMap wins, no SQLite duplicate)"
        );

        // Sorting: Critical before Info
        assert_eq!(violations[0].severity, AlertSeverity::Critical);
    }
}