1use frankensqlite::Connection;
13use frankensqlite::Row;
14use frankensqlite::compat::{ConnectionExt, RowExt};
15use serde::Serialize;
16use std::collections::BTreeMap;
17
18use super::query::{query_breakdown, query_tokens_timeseries, table_exists};
19use super::types::{AnalyticsFilter, Dim, GroupBy, Metric};
20
/// Severity attached to a validation [`Check`].
///
/// Serialized in lowercase (`"info"`, `"warning"`, `"error"`). Checks that pass are reported
/// as `Info` throughout this module; failing checks use `Warning` or `Error`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum Severity {
    /// Informational; used for checks that passed.
    Info,
    /// Non-fatal failure (e.g. an empty rollup table or cross-track drift).
    Warning,
    /// Hard failure (e.g. missing tables, failed queries, mismatched rollup totals).
    Error,
}
33
/// Outcome of a single validation check.
#[derive(Debug, Clone, Serialize)]
pub struct Check {
    /// Dotted identifier such as `"track_a.tables_exist"`. The prefix (`track_a.`,
    /// `track_b.`, `cross_track.`) drives repair classification in `classify_repair_kind`.
    pub id: String,
    /// Whether the invariant held.
    pub ok: bool,
    /// Severity of the outcome (`Info` when `ok`).
    pub severity: Severity,
    /// Human-readable summary, e.g. mismatch counts or the underlying query error.
    pub details: String,
    /// Remediation hint; omitted from the serialized JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub suggested_action: Option<String>,
}
44
/// One day/agent/source slice whose Track A and Track B totals drifted apart, as produced by
/// `validate_cross_track_drift`.
#[derive(Debug, Clone, Serialize)]
pub struct DriftEntry {
    /// Day bucket of the slice.
    pub day_id: i64,
    /// Agent slug of the slice.
    pub agent_slug: String,
    /// Source id of the slice.
    pub source_id: String,
    /// Track A total: `SUM(api_tokens_total)` from `usage_daily` (0 if the slice is absent).
    pub track_a_total: i64,
    /// Track B total: `SUM(grand_total_tokens)` from `token_daily_stats` (0 if absent).
    pub track_b_total: i64,
    /// `track_a_total - track_b_total`, computed with saturating subtraction.
    pub delta: i64,
    /// `|delta|` as a percentage of the larger total (denominator at least 1), rounded to two
    /// decimal places.
    pub delta_pct: f64,
    /// Heuristic, human-readable explanation of the drift direction.
    pub likely_cause: String,
}
57
/// How many rollup buckets a validation run compared.
#[derive(Debug, Clone, Serialize)]
pub struct SamplingMeta {
    /// Rollup rows actually compared across Track A and Track B.
    pub buckets_checked: usize,
    /// Rollup rows available across the validated tracks.
    pub buckets_total: usize,
    /// `"deep"` when every bucket was compared, `"sample"` when a LIMIT was applied.
    pub mode: String,
}
65
/// Metadata describing how a [`ValidationReport`] was produced.
#[derive(Debug, Clone, Serialize)]
pub struct ReportMeta {
    /// Wall-clock time spent in `run_validation`, in milliseconds.
    pub elapsed_ms: u64,
    /// Sampling statistics for the bucket comparisons.
    pub sampling: SamplingMeta,
    /// Validation path label; `run_validation` sets this to `"rollup"`.
    pub path: String,
}
73
/// Full result of `run_validation`: every check, the drifted slices, and run metadata.
#[derive(Debug, Clone, Serialize)]
pub struct ValidationReport {
    /// All checks in execution order (Track A, Track B, cross-track, counter sanity).
    pub checks: Vec<Check>,
    /// Drifted cross-track slices (empty when no drift was detected).
    pub drift: Vec<DriftEntry>,
    /// Run metadata; serialized under the `"_meta"` key.
    pub _meta: ReportMeta,
}
81
82impl ValidationReport {
83 pub fn all_ok(&self) -> bool {
85 self.checks.iter().all(|c| c.ok)
86 }
87
88 pub fn count_failures(&self, sev: Severity) -> usize {
90 self.checks
91 .iter()
92 .filter(|c| !c.ok && c.severity == sev)
93 .count()
94 }
95
96 pub fn to_json(&self) -> serde_json::Value {
98 serde_json::to_value(self).unwrap_or(serde_json::json!({"error": "serialization failed"}))
99 }
100}
101
/// Category of repair that a failed check maps to.
///
/// Serialized in snake_case. Declaration order defines `Ord`, which in turn fixes the order
/// of `RepairPlan::decisions` (they are grouped in a `BTreeMap` keyed by this type).
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum RepairKind {
    /// Track A rollups can be rebuilt from raw messages.
    RebuildTrackA,
    /// Track B rollups can be rebuilt from the `token_usage` ledger.
    RebuildTrackB,
    /// Ledger or cross-track precondition missing; needs a full replay not offered by `--fix`.
    TrackAllRebuildUnavailable,
    /// No proven automatic repair path.
    ManualReview,
}
121
/// Repair decision for one [`RepairKind`] and the failed checks grouped under it.
#[derive(Debug, Clone, Serialize)]
pub struct RepairDecision {
    /// Repair category.
    pub kind: RepairKind,
    /// `true` only for `RebuildTrackA` / `RebuildTrackB`.
    pub fixable: bool,
    /// Sorted ids of the failed checks in this group.
    pub check_ids: Vec<String>,
    /// Human-readable justification for the decision.
    pub reason: String,
}
130
/// Repair plan derived from a [`ValidationReport`] by `build_repair_plan`.
#[derive(Debug, Clone, Serialize)]
pub struct RepairPlan {
    /// Whether any decision calls for a Track A rebuild.
    pub apply_track_a_rebuild: bool,
    /// Whether any decision calls for a Track B rebuild.
    pub apply_track_b_rebuild: bool,
    /// One decision per repair kind present among the failed checks, ordered by kind.
    pub decisions: Vec<RepairDecision>,
}
141
142pub fn build_repair_plan(report: &ValidationReport) -> RepairPlan {
144 let mut grouped: BTreeMap<RepairKind, Vec<String>> = BTreeMap::new();
145
146 for check in report.checks.iter().filter(|check| !check.ok) {
147 let kind = classify_repair_kind(check, report);
148 grouped.entry(kind).or_default().push(check.id.clone());
149 }
150
151 let decisions = grouped
152 .into_iter()
153 .map(|(kind, mut check_ids)| {
154 check_ids.sort();
155 RepairDecision {
156 fixable: matches!(kind, RepairKind::RebuildTrackA | RepairKind::RebuildTrackB),
157 reason: repair_reason(kind).into(),
158 kind,
159 check_ids,
160 }
161 })
162 .collect::<Vec<_>>();
163
164 let apply_track_a_rebuild = decisions
165 .iter()
166 .any(|decision| decision.kind == RepairKind::RebuildTrackA);
167 let apply_track_b_rebuild = decisions
168 .iter()
169 .any(|decision| decision.kind == RepairKind::RebuildTrackB);
170
171 RepairPlan {
172 apply_track_a_rebuild,
173 apply_track_b_rebuild,
174 decisions,
175 }
176}
177
178fn classify_repair_kind(check: &Check, report: &ValidationReport) -> RepairKind {
179 if check.id.starts_with("track_a.") {
180 return RepairKind::RebuildTrackA;
181 }
182
183 if check.id == "cross_track.drift" {
184 if report.drift.iter().all(|entry| {
185 entry.likely_cause.starts_with("Track A missing rows")
186 || entry.likely_cause.starts_with("Track B higher")
187 }) {
188 return RepairKind::RebuildTrackA;
189 }
190 return RepairKind::TrackAllRebuildUnavailable;
191 }
192
193 if check.id.starts_with("track_b.") {
194 match check.id.as_str() {
204 "track_b.tables_exist" | "track_b.agents_table_missing" | "track_b.query_exec" => {
209 RepairKind::TrackAllRebuildUnavailable
210 }
211 _ => RepairKind::RebuildTrackB,
216 }
217 } else {
218 RepairKind::ManualReview
219 }
220}
221
222fn repair_reason(kind: RepairKind) -> &'static str {
223 match kind {
224 RepairKind::RebuildTrackA => {
225 "Track A rollups are derivable from raw messages and can be rebuilt safely."
226 }
227 RepairKind::RebuildTrackB => {
228 "Track B rollups are derivable from the intact token_usage ledger and can be rebuilt safely via rebuild_token_daily_stats()."
229 }
230 RepairKind::TrackAllRebuildUnavailable => {
231 "Track B ledger or cross-track precondition is missing; a full canonical replay is required and is not implemented by --fix. Run 'cass doctor check --json' and restore or repair the canonical archive before rebuilding derived assets."
232 }
233 RepairKind::ManualReview => {
234 "This validation failure does not have a proven automatic repair path."
235 }
236 }
237}
238
/// Tuning knobs for `run_validation`.
#[derive(Debug, Clone)]
pub struct ValidateConfig {
    /// Maximum number of most-recent buckets compared per track; `0` compares every bucket
    /// ("deep" mode).
    pub sample_buckets: usize,
    /// A drifted slice's absolute token delta must be strictly greater than this.
    pub drift_abs_threshold: i64,
    /// A drifted slice's percentage delta must be strictly greater than this (in percent).
    pub drift_pct_threshold: f64,
}
254
255impl Default for ValidateConfig {
256 fn default() -> Self {
257 Self {
258 sample_buckets: 20,
259 drift_abs_threshold: 10,
260 drift_pct_threshold: 1.0,
261 }
262 }
263}
264
265impl ValidateConfig {
266 pub fn deep() -> Self {
268 Self {
269 sample_buckets: 0,
270 ..Default::default()
271 }
272 }
273}
274
275pub fn run_validation(conn: &Connection, config: &ValidateConfig) -> ValidationReport {
281 let start = std::time::Instant::now();
282 let mut checks = Vec::new();
283 let mut buckets_checked: usize = 0;
284 let mut buckets_total: usize = 0;
285
286 let (a_checks, a_checked, a_total) = validate_track_a(conn, config);
288 checks.extend(a_checks);
289 buckets_checked += a_checked;
290 buckets_total += a_total;
291
292 let (b_checks, b_checked, b_total) = validate_track_b(conn, config);
294 checks.extend(b_checks);
295 buckets_checked += b_checked;
296 buckets_total += b_total;
297
298 let (d_checks, d_entries) = validate_cross_track_drift(conn, config);
300 checks.extend(d_checks);
301 let drift = d_entries;
302
303 checks.extend(validate_non_negative_counters(conn));
305
306 let elapsed_ms = start.elapsed().as_millis() as u64;
307 let mode = if config.sample_buckets == 0 {
308 "deep"
309 } else {
310 "sample"
311 };
312
313 ValidationReport {
314 checks,
315 drift,
316 _meta: ReportMeta {
317 elapsed_ms,
318 sampling: SamplingMeta {
319 buckets_checked,
320 buckets_total,
321 mode: mode.into(),
322 },
323 path: "rollup".into(),
324 },
325 }
326}
327
328fn query_executes(conn: &Connection, sql: &str) -> Result<(), String> {
329 conn.query_map_collect(sql, &[], |_row: &Row| Ok(()))
330 .map(|_| ())
331 .map_err(|err| err.to_string())
332}
333
334fn query_exec_error_check(id: &str, details: String, suggested_action: &str) -> Check {
335 Check {
336 id: id.into(),
337 ok: false,
338 severity: Severity::Error,
339 details,
340 suggested_action: Some(suggested_action.into()),
341 }
342}
343
344fn validate_track_a(conn: &Connection, config: &ValidateConfig) -> (Vec<Check>, usize, usize) {
352 let mut checks = Vec::new();
353
354 if !table_exists(conn, "usage_daily") || !table_exists(conn, "message_metrics") {
355 checks.push(Check {
356 id: "track_a.tables_exist".into(),
357 ok: false,
358 severity: Severity::Error,
359 details: "Track A tables missing (usage_daily or message_metrics)".into(),
360 suggested_action: Some("Run 'cass analytics rebuild'".into()),
361 });
362 return (checks, 0, 0);
363 }
364
365 checks.push(Check {
366 id: "track_a.tables_exist".into(),
367 ok: true,
368 severity: Severity::Info,
369 details: "Track A tables exist".into(),
370 suggested_action: None,
371 });
372
373 let total_buckets: usize = conn
375 .query_row_map("SELECT COUNT(*) FROM usage_daily", &[], |r: &Row| {
376 r.get_typed::<i64>(0).map(|v| v as usize)
377 })
378 .unwrap_or(0);
379
380 let limit_clause = if config.sample_buckets > 0 {
381 format!("LIMIT {}", config.sample_buckets)
382 } else {
383 String::new()
384 };
385
386 let sql = format!(
388 "SELECT ud.day_id, ud.agent_slug, ud.workspace_id, ud.source_id,
389 ud.content_tokens_est_total,
390 COALESCE(mm.sum_content, 0),
391 ud.message_count,
392 COALESCE(mm.sum_msgs, 0),
393 ud.api_tokens_total,
394 COALESCE(mm.sum_api, 0),
395 ud.api_coverage_message_count,
396 COALESCE(mm.sum_api_coverage, 0)
397 FROM usage_daily ud
398 LEFT JOIN (
399 SELECT day_id, agent_slug, workspace_id, source_id,
400 SUM(content_tokens_est) AS sum_content,
401 COUNT(*) AS sum_msgs,
402 SUM(CASE WHEN api_data_source = 'api'
403 THEN COALESCE(api_input_tokens, 0)
404 + COALESCE(api_output_tokens, 0)
405 + COALESCE(api_cache_read_tokens, 0)
406 + COALESCE(api_cache_creation_tokens, 0)
407 + COALESCE(api_thinking_tokens, 0)
408 ELSE 0 END) AS sum_api,
409 SUM(CASE WHEN api_data_source = 'api' THEN 1 ELSE 0 END) AS sum_api_coverage
410 FROM message_metrics
411 GROUP BY day_id, agent_slug, workspace_id, source_id
412 ) mm ON ud.day_id = mm.day_id
413 AND ud.agent_slug = mm.agent_slug
414 AND ud.workspace_id = mm.workspace_id
415 AND ud.source_id = mm.source_id
416 ORDER BY ud.day_id DESC
417 {limit_clause}"
418 );
419
420 if total_buckets == 0 {
421 if let Err(err) = query_executes(conn, &sql) {
422 checks.push(query_exec_error_check(
423 "track_a.query_exec",
424 format!("Track A invariant query failed: {err}"),
425 "Run 'cass analytics rebuild --track a' or verify the analytics schema",
426 ));
427 return (checks, 0, 0);
428 }
429
430 checks.push(Check {
431 id: "track_a.has_data".into(),
432 ok: false,
433 severity: Severity::Warning,
434 details: "usage_daily is empty".into(),
435 suggested_action: Some("Run 'cass analytics rebuild'".into()),
436 });
437 return (checks, 0, 0);
438 }
439
440 let mut mismatches_content = 0_usize;
441 let mut mismatches_msg_count = 0_usize;
442 let mut mismatches_api = 0_usize;
443 let mut mismatches_api_cov = 0_usize;
444 let mut checked = 0_usize;
445
446 let rows = match conn.query_map_collect(&sql, &[], |row: &Row| {
447 Ok((
448 row.get_typed::<i64>(0)?, row.get_typed::<String>(1)?, row.get_typed::<i64>(4)?, row.get_typed::<i64>(5)?, row.get_typed::<i64>(6)?, row.get_typed::<i64>(7)?, row.get_typed::<i64>(8)?, row.get_typed::<i64>(9)?, row.get_typed::<i64>(10)?, row.get_typed::<i64>(11)?, ))
459 }) {
460 Ok(rows) => rows,
461 Err(err) => {
462 checks.push(query_exec_error_check(
463 "track_a.query_exec",
464 format!("Track A invariant query failed: {err}"),
465 "Run 'cass analytics rebuild --track a' or verify the analytics schema",
466 ));
467 return (checks, 0, total_buckets);
468 }
469 };
470
471 for row in rows {
472 checked += 1;
473 let (
474 _day_id,
475 _agent,
476 ud_content,
477 mm_content,
478 ud_msgs,
479 mm_msgs,
480 ud_api,
481 mm_api,
482 ud_cov,
483 mm_cov,
484 ) = row;
485 if ud_content != mm_content {
486 mismatches_content += 1;
487 }
488 if ud_msgs != mm_msgs {
489 mismatches_msg_count += 1;
490 }
491 if ud_api != mm_api {
492 mismatches_api += 1;
493 }
494 if ud_cov != mm_cov {
495 mismatches_api_cov += 1;
496 }
497 }
498
499 checks.push(Check {
501 id: "track_a.content_tokens_match".into(),
502 ok: mismatches_content == 0,
503 severity: if mismatches_content > 0 {
504 Severity::Error
505 } else {
506 Severity::Info
507 },
508 details: format!(
509 "content_tokens_est_total: {mismatches_content}/{checked} buckets mismatched"
510 ),
511 suggested_action: if mismatches_content > 0 {
512 Some("Run 'cass analytics rebuild --track a'".into())
513 } else {
514 None
515 },
516 });
517
518 checks.push(Check {
520 id: "track_a.message_count_match".into(),
521 ok: mismatches_msg_count == 0,
522 severity: if mismatches_msg_count > 0 {
523 Severity::Error
524 } else {
525 Severity::Info
526 },
527 details: format!("message_count: {mismatches_msg_count}/{checked} buckets mismatched"),
528 suggested_action: if mismatches_msg_count > 0 {
529 Some("Run 'cass analytics rebuild --track a'".into())
530 } else {
531 None
532 },
533 });
534
535 checks.push(Check {
537 id: "track_a.api_tokens_match".into(),
538 ok: mismatches_api == 0,
539 severity: if mismatches_api > 0 {
540 Severity::Error
541 } else {
542 Severity::Info
543 },
544 details: format!("api_tokens_total: {mismatches_api}/{checked} buckets mismatched"),
545 suggested_action: if mismatches_api > 0 {
546 Some("Run 'cass analytics rebuild --track a'".into())
547 } else {
548 None
549 },
550 });
551
552 checks.push(Check {
554 id: "track_a.api_coverage_match".into(),
555 ok: mismatches_api_cov == 0,
556 severity: if mismatches_api_cov > 0 {
557 Severity::Warning
558 } else {
559 Severity::Info
560 },
561 details: format!(
562 "api_coverage_message_count: {mismatches_api_cov}/{checked} buckets mismatched"
563 ),
564 suggested_action: if mismatches_api_cov > 0 {
565 Some("Run 'cass analytics rebuild --track a'".into())
566 } else {
567 None
568 },
569 });
570
571 (checks, checked, total_buckets)
572}
573
574fn validate_track_b(conn: &Connection, config: &ValidateConfig) -> (Vec<Check>, usize, usize) {
580 let mut checks = Vec::new();
581
582 if !table_exists(conn, "token_daily_stats") || !table_exists(conn, "token_usage") {
583 checks.push(Check {
584 id: "track_b.tables_exist".into(),
585 ok: false,
586 severity: Severity::Error,
587 details: "Track B tables missing (token_daily_stats or token_usage)".into(),
588 suggested_action: Some(
589 "Run 'cass analytics rebuild --track all' (requires z9fse.13)".into(),
590 ),
591 });
592 return (checks, 0, 0);
593 }
594
595 checks.push(Check {
596 id: "track_b.tables_exist".into(),
597 ok: true,
598 severity: Severity::Info,
599 details: "Track B tables exist".into(),
600 suggested_action: None,
601 });
602
603 let total_buckets: usize = conn
604 .query_row_map("SELECT COUNT(*) FROM token_daily_stats", &[], |r: &Row| {
605 r.get_typed::<i64>(0).map(|v| v as usize)
606 })
607 .unwrap_or(0);
608
609 let limit_clause = if config.sample_buckets > 0 {
610 format!("LIMIT {}", config.sample_buckets)
611 } else {
612 String::new()
613 };
614
615 let has_agents_table = table_exists(conn, "agents");
618
619 let sql = if has_agents_table {
620 format!(
621 "SELECT tds.day_id, tds.agent_slug, tds.source_id, tds.model_family,
622 tds.grand_total_tokens,
623 COALESCE(tu.sum_total, 0),
624 tds.total_tool_calls,
625 COALESCE(tu.sum_tools, 0),
626 tds.api_call_count,
627 COALESCE(tu.sum_rows, 0)
628 FROM token_daily_stats tds
629 LEFT JOIN (
630 SELECT t.day_id,
631 a.slug AS agent_slug,
632 t.source_id,
633 COALESCE(t.model_family, 'unknown') AS model_family,
634 SUM(COALESCE(t.total_tokens, 0)) AS sum_total,
635 SUM(t.tool_call_count) AS sum_tools,
636 COUNT(*) AS sum_rows
637 FROM token_usage t
638 JOIN agents a ON a.id = t.agent_id
639 GROUP BY t.day_id, a.slug, t.source_id, COALESCE(t.model_family, 'unknown')
640 ) tu ON tds.day_id = tu.day_id
641 AND tds.agent_slug = tu.agent_slug
642 AND tds.source_id = tu.source_id
643 AND tds.model_family = tu.model_family
644 ORDER BY tds.day_id DESC
645 {limit_clause}"
646 )
647 } else {
648 checks.push(Check {
650 id: "track_b.agents_table_missing".into(),
651 ok: false,
652 severity: Severity::Warning,
653 details: "agents table not found — cannot validate Track B granular invariants".into(),
654 suggested_action: None,
655 });
656 return (checks, 0, total_buckets);
657 };
658
659 if total_buckets == 0 {
660 if let Err(err) = query_executes(conn, &sql) {
661 checks.push(query_exec_error_check(
662 "track_b.query_exec",
663 format!("Track B invariant query failed: {err}"),
664 "Run 'cass analytics rebuild --track all' or verify the analytics schema",
665 ));
666 return (checks, 0, 0);
667 }
668
669 checks.push(Check {
670 id: "track_b.has_data".into(),
671 ok: false,
672 severity: Severity::Warning,
673 details: "token_daily_stats is empty".into(),
674 suggested_action: Some("Run 'cass analytics rebuild --track all'".into()),
675 });
676 return (checks, 0, 0);
677 }
678
679 let mut mismatches_total = 0_usize;
680 let mut mismatches_tools = 0_usize;
681 let mut checked = 0_usize;
682
683 let rows = match conn.query_map_collect(&sql, &[], |row: &Row| {
684 Ok((
685 row.get_typed::<i64>(4)?, row.get_typed::<i64>(5)?, row.get_typed::<i64>(6)?, row.get_typed::<i64>(7)?, ))
690 }) {
691 Ok(rows) => rows,
692 Err(err) => {
693 checks.push(query_exec_error_check(
694 "track_b.query_exec",
695 format!("Track B invariant query failed: {err}"),
696 "Run 'cass analytics rebuild --track all' or verify the analytics schema",
697 ));
698 return (checks, 0, total_buckets);
699 }
700 };
701
702 for row in rows {
703 checked += 1;
704 let (tds_total, tu_total, tds_tools, tu_tools) = row;
705 if tds_total != tu_total {
706 mismatches_total += 1;
707 }
708 if tds_tools != tu_tools {
709 mismatches_tools += 1;
710 }
711 }
712
713 checks.push(Check {
714 id: "track_b.grand_total_match".into(),
715 ok: mismatches_total == 0,
716 severity: if mismatches_total > 0 {
717 Severity::Error
718 } else {
719 Severity::Info
720 },
721 details: format!("grand_total_tokens: {mismatches_total}/{checked} buckets mismatched"),
722 suggested_action: if mismatches_total > 0 {
723 Some("Run 'cass analytics rebuild --track all'".into())
724 } else {
725 None
726 },
727 });
728
729 checks.push(Check {
730 id: "track_b.tool_calls_match".into(),
731 ok: mismatches_tools == 0,
732 severity: if mismatches_tools > 0 {
733 Severity::Warning
734 } else {
735 Severity::Info
736 },
737 details: format!("total_tool_calls: {mismatches_tools}/{checked} buckets mismatched"),
738 suggested_action: if mismatches_tools > 0 {
739 Some("Run 'cass analytics rebuild --track all'".into())
740 } else {
741 None
742 },
743 });
744
745 (checks, checked, total_buckets)
746}
747
748fn validate_cross_track_drift(
754 conn: &Connection,
755 config: &ValidateConfig,
756) -> (Vec<Check>, Vec<DriftEntry>) {
757 let mut checks = Vec::new();
758 let mut entries = Vec::new();
759
760 let has_a = table_exists(conn, "usage_daily");
761 let has_b = table_exists(conn, "token_daily_stats");
762
763 if !has_a || !has_b {
764 let missing = if !has_a && !has_b {
765 "both tracks"
766 } else if !has_a {
767 "Track A (usage_daily)"
768 } else {
769 "Track B (token_daily_stats)"
770 };
771 checks.push(Check {
772 id: "cross_track.tables_exist".into(),
773 ok: false,
774 severity: Severity::Warning,
775 details: format!("Cannot compute cross-track drift: {missing} missing"),
776 suggested_action: Some("Run 'cass analytics rebuild --track all'".into()),
777 });
778 return (checks, entries);
779 }
780
781 let mut drift_count = 0_usize;
782 let mut drift_checked = 0_usize;
783 let mut merged = BTreeMap::<(i64, String, String), (i64, i64)>::new();
784
785 let track_a_rows = match conn.query_map_collect(
786 "SELECT day_id, agent_slug, source_id, SUM(api_tokens_total) AS api_total
787 FROM usage_daily
788 GROUP BY day_id, agent_slug, source_id",
789 &[],
790 |row: &Row| {
791 Ok((
792 row.get_typed::<i64>(0)?,
793 row.get_typed::<String>(1)?,
794 row.get_typed::<String>(2)?,
795 row.get_typed::<i64>(3)?,
796 ))
797 },
798 ) {
799 Ok(rows) => rows,
800 Err(err) => {
801 checks.push(Check {
802 id: "cross_track.query_exec".into(),
803 ok: false,
804 severity: Severity::Error,
805 details: format!("Cross-track drift query failed while reading Track A: {err}"),
806 suggested_action: Some(
807 "Run 'cass analytics rebuild --track all' or verify the analytics schema"
808 .into(),
809 ),
810 });
811 return (checks, entries);
812 }
813 };
814
815 for (day_id, agent_slug, source_id, total) in track_a_rows {
816 merged
817 .entry((day_id, agent_slug, source_id))
818 .or_insert((0, 0))
819 .0 = total;
820 }
821
822 let track_b_rows = match conn.query_map_collect(
823 "SELECT day_id, agent_slug, source_id, SUM(grand_total_tokens) AS grand_total
824 FROM token_daily_stats
825 GROUP BY day_id, agent_slug, source_id",
826 &[],
827 |row: &Row| {
828 Ok((
829 row.get_typed::<i64>(0)?,
830 row.get_typed::<String>(1)?,
831 row.get_typed::<String>(2)?,
832 row.get_typed::<i64>(3)?,
833 ))
834 },
835 ) {
836 Ok(rows) => rows,
837 Err(err) => {
838 checks.push(Check {
839 id: "cross_track.query_exec".into(),
840 ok: false,
841 severity: Severity::Error,
842 details: format!("Cross-track drift query failed while reading Track B: {err}"),
843 suggested_action: Some(
844 "Run 'cass analytics rebuild --track all' or verify the analytics schema"
845 .into(),
846 ),
847 });
848 return (checks, entries);
849 }
850 };
851
852 for (day_id, agent_slug, source_id, total) in track_b_rows {
853 merged
854 .entry((day_id, agent_slug, source_id))
855 .or_insert((0, 0))
856 .1 = total;
857 }
858
859 let mut rows: Vec<_> = merged.into_iter().collect();
860 rows.sort_by(|left, right| {
861 right
862 .0
863 .0
864 .cmp(&left.0.0)
865 .then_with(|| left.0.1.cmp(&right.0.1))
866 .then_with(|| left.0.2.cmp(&right.0.2))
867 });
868 if config.sample_buckets > 0 && rows.len() > config.sample_buckets {
869 rows.truncate(config.sample_buckets);
870 }
871
872 for ((day_id, agent_slug, source_id), (a_total, b_total)) in rows {
873 drift_checked += 1;
874 let delta = a_total.saturating_sub(b_total);
875 let denom = a_total.max(b_total).max(1);
876 let abs_delta = delta.unsigned_abs();
877 let delta_pct = (abs_delta as f64 / denom as f64) * 100.0;
878
879 if abs_delta > config.drift_abs_threshold as u64 && delta_pct > config.drift_pct_threshold {
880 drift_count += 1;
881 let likely_cause = if a_total > 0 && b_total == 0 {
882 "Track B missing rows (rebuild needed or not yet ingested)"
883 } else if b_total > 0 && a_total == 0 {
884 "Track A missing rows (rebuild needed)"
885 } else if a_total > b_total {
886 "Track A higher — Track B may be stale or missing some messages"
887 } else {
888 "Track B higher — Track A may have been rebuilt recently without all data"
889 };
890
891 entries.push(DriftEntry {
892 day_id,
893 agent_slug,
894 source_id,
895 track_a_total: a_total,
896 track_b_total: b_total,
897 delta,
898 delta_pct: (delta_pct * 100.0).round() / 100.0,
899 likely_cause: likely_cause.into(),
900 });
901 }
902 }
903
904 let total_ok = drift_count == 0;
905 checks.push(Check {
906 id: "cross_track.drift".into(),
907 ok: total_ok,
908 severity: if drift_count > 0 {
909 Severity::Warning
910 } else {
911 Severity::Info
912 },
913 details: format!(
914 "Cross-track drift: {drift_count}/{drift_checked} day+agent+source slices drifted"
915 ),
916 suggested_action: if drift_count > 0 {
917 Some("Run 'cass analytics rebuild --track all' to re-sync both tracks".into())
918 } else {
919 None
920 },
921 });
922
923 (checks, entries)
924}
925
926fn validate_non_negative_counters(conn: &Connection) -> Vec<Check> {
932 let mut checks = Vec::new();
933
934 if table_exists(conn, "usage_daily") {
936 let cols = [
937 "message_count",
938 "user_message_count",
939 "assistant_message_count",
940 "tool_call_count",
941 "plan_message_count",
942 "api_coverage_message_count",
943 "content_tokens_est_total",
944 "api_tokens_total",
945 ];
946 let cond = cols
947 .iter()
948 .map(|c| format!("{c} < 0"))
949 .collect::<Vec<_>>()
950 .join(" OR ");
951 let sql = format!("SELECT COUNT(*) FROM usage_daily WHERE {cond}");
952 match conn.query_row_map(&sql, &[], |r: &Row| r.get_typed::<i64>(0)) {
953 Ok(negative_rows) => {
954 checks.push(Check {
955 id: "track_a.non_negative_counters".into(),
956 ok: negative_rows == 0,
957 severity: if negative_rows > 0 {
958 Severity::Error
959 } else {
960 Severity::Info
961 },
962 details: format!("usage_daily: {negative_rows} rows with negative counters"),
963 suggested_action: if negative_rows > 0 {
964 Some("Run 'cass analytics rebuild --track a'".into())
965 } else {
966 None
967 },
968 });
969 }
970 Err(err) => {
971 checks.push(Check {
972 id: "track_a.non_negative_counters".into(),
973 ok: false,
974 severity: Severity::Error,
975 details: format!("usage_daily negative-counter query failed: {err}"),
976 suggested_action: Some(
977 "Run 'cass analytics rebuild --track a' or verify the analytics schema"
978 .into(),
979 ),
980 });
981 }
982 }
983 }
984
985 if table_exists(conn, "usage_daily") {
987 match conn.query_row_map(
988 "SELECT COUNT(*) FROM usage_daily WHERE api_coverage_message_count > message_count",
989 &[],
990 |r: &Row| r.get_typed::<i64>(0),
991 ) {
992 Ok(bad) => {
993 checks.push(Check {
994 id: "track_a.coverage_lte_messages".into(),
995 ok: bad == 0,
996 severity: if bad > 0 {
997 Severity::Warning
998 } else {
999 Severity::Info
1000 },
1001 details: format!(
1002 "usage_daily: {bad} rows where api_coverage_message_count > message_count"
1003 ),
1004 suggested_action: if bad > 0 {
1005 Some("Run 'cass analytics rebuild --track a'".into())
1006 } else {
1007 None
1008 },
1009 });
1010 }
1011 Err(err) => {
1012 checks.push(Check {
1013 id: "track_a.coverage_lte_messages".into(),
1014 ok: false,
1015 severity: Severity::Error,
1016 details: format!("usage_daily coverage query failed: {err}"),
1017 suggested_action: Some(
1018 "Run 'cass analytics rebuild --track a' or verify the analytics schema"
1019 .into(),
1020 ),
1021 });
1022 }
1023 }
1024 }
1025
1026 if table_exists(conn, "token_daily_stats") {
1028 let cols = [
1029 "api_call_count",
1030 "total_input_tokens",
1031 "total_output_tokens",
1032 "grand_total_tokens",
1033 "total_tool_calls",
1034 ];
1035 let cond = cols
1036 .iter()
1037 .map(|c| format!("{c} < 0"))
1038 .collect::<Vec<_>>()
1039 .join(" OR ");
1040 let sql = format!("SELECT COUNT(*) FROM token_daily_stats WHERE {cond}");
1041 match conn.query_row_map(&sql, &[], |r: &Row| r.get_typed::<i64>(0)) {
1042 Ok(negative_rows) => {
1043 checks.push(Check {
1044 id: "track_b.non_negative_counters".into(),
1045 ok: negative_rows == 0,
1046 severity: if negative_rows > 0 {
1047 Severity::Error
1048 } else {
1049 Severity::Info
1050 },
1051 details: format!(
1052 "token_daily_stats: {negative_rows} rows with negative counters"
1053 ),
1054 suggested_action: if negative_rows > 0 {
1055 Some("Run 'cass analytics rebuild --track all'".into())
1056 } else {
1057 None
1058 },
1059 });
1060 }
1061 Err(err) => {
1062 checks.push(Check {
1063 id: "track_b.non_negative_counters".into(),
1064 ok: false,
1065 severity: Severity::Error,
1066 details: format!("token_daily_stats negative-counter query failed: {err}"),
1067 suggested_action: Some(
1068 "Run 'cass analytics rebuild --track all' or verify the analytics schema"
1069 .into(),
1070 ),
1071 });
1072 }
1073 }
1074 }
1075
1076 checks
1077}
1078
/// Timing result of a single query-latency guardrail.
#[derive(Debug, Clone, Serialize)]
pub struct PerfMeasurement {
    /// Guardrail identifier, e.g. `"perf.query_timeseries"`.
    pub id: String,
    /// Measured wall-clock time in milliseconds.
    pub elapsed_ms: u64,
    /// Latency budget in milliseconds.
    pub budget_ms: u64,
    /// Set as follows:
    /// - on success: `elapsed_ms <= budget_ms`;
    /// - when the query was skipped: `true`;
    /// - when the query failed: `false`.
    pub within_budget: bool,
    /// Query error text; omitted from the serialized JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<String>,
    /// Human-readable summary of the measurement.
    pub details: String,
}
1094
1095pub fn perf_query_guardrail(conn: &Connection) -> PerfMeasurement {
1097 let start = std::time::Instant::now();
1098
1099 let budget_ms = 500_u64; if !table_exists(conn, "usage_daily") {
1103 let elapsed_ms = start.elapsed().as_millis() as u64;
1104 return PerfMeasurement {
1105 id: "perf.query_timeseries".into(),
1106 elapsed_ms,
1107 budget_ms,
1108 within_budget: true,
1109 error: None,
1110 details: "Skipped timeseries rollup query: usage_daily table missing".into(),
1111 };
1112 }
1113
1114 let result = query_tokens_timeseries(conn, &AnalyticsFilter::default(), GroupBy::Day);
1115 let elapsed_ms = start.elapsed().as_millis() as u64;
1116
1117 match result {
1118 Ok(result) => PerfMeasurement {
1119 id: "perf.query_timeseries".into(),
1120 elapsed_ms,
1121 budget_ms,
1122 within_budget: elapsed_ms <= budget_ms,
1123 error: None,
1124 details: format!(
1125 "Timeseries rollup query: {} day buckets in {elapsed_ms}ms",
1126 result.buckets.len()
1127 ),
1128 },
1129 Err(err) => PerfMeasurement {
1130 id: "perf.query_timeseries".into(),
1131 elapsed_ms,
1132 budget_ms,
1133 within_budget: false,
1134 error: Some(err.to_string()),
1135 details: format!("Timeseries rollup query failed after {elapsed_ms}ms: {err}"),
1136 },
1137 }
1138}
1139
1140pub fn perf_breakdown_guardrail(conn: &Connection) -> PerfMeasurement {
1142 let start = std::time::Instant::now();
1143 let budget_ms = 200_u64;
1144
1145 if !table_exists(conn, "usage_daily") {
1146 let elapsed_ms = start.elapsed().as_millis() as u64;
1147 return PerfMeasurement {
1148 id: "perf.query_breakdown".into(),
1149 elapsed_ms,
1150 budget_ms,
1151 within_budget: true,
1152 error: None,
1153 details: "Skipped breakdown query: usage_daily table missing".into(),
1154 };
1155 }
1156
1157 let result = query_breakdown(
1158 conn,
1159 &AnalyticsFilter::default(),
1160 Dim::Agent,
1161 Metric::ApiTotal,
1162 25,
1163 );
1164 let elapsed_ms = start.elapsed().as_millis() as u64;
1165
1166 match result {
1167 Ok(result) => PerfMeasurement {
1168 id: "perf.query_breakdown".into(),
1169 elapsed_ms,
1170 budget_ms,
1171 within_budget: elapsed_ms <= budget_ms,
1172 error: None,
1173 details: format!(
1174 "Breakdown query: {} agent groups in {elapsed_ms}ms",
1175 result.rows.len()
1176 ),
1177 },
1178 Err(err) => PerfMeasurement {
1179 id: "perf.query_breakdown".into(),
1180 elapsed_ms,
1181 budget_ms,
1182 within_budget: false,
1183 error: Some(err.to_string()),
1184 details: format!("Breakdown query failed after {elapsed_ms}ms: {err}"),
1185 },
1186 }
1187}
1188
1189#[cfg(test)]
1194mod tests {
1195 use super::*;
1196
1197 fn setup_track_a_fixture() -> Connection {
1201 let conn = Connection::open(":memory:").unwrap();
1202 conn.execute_batch(
1203 "CREATE TABLE message_metrics (
1204 message_id INTEGER PRIMARY KEY,
1205 created_at_ms INTEGER NOT NULL,
1206 hour_id INTEGER NOT NULL,
1207 day_id INTEGER NOT NULL,
1208 agent_slug TEXT NOT NULL,
1209 workspace_id INTEGER NOT NULL DEFAULT 0,
1210 source_id TEXT NOT NULL DEFAULT 'local',
1211 role TEXT NOT NULL,
1212 content_chars INTEGER NOT NULL,
1213 content_tokens_est INTEGER NOT NULL,
1214 api_input_tokens INTEGER,
1215 api_output_tokens INTEGER,
1216 api_cache_read_tokens INTEGER,
1217 api_cache_creation_tokens INTEGER,
1218 api_thinking_tokens INTEGER,
1219 api_service_tier TEXT,
1220 api_data_source TEXT NOT NULL DEFAULT 'estimated',
1221 tool_call_count INTEGER NOT NULL DEFAULT 0,
1222 has_tool_calls INTEGER NOT NULL DEFAULT 0,
1223 has_plan INTEGER NOT NULL DEFAULT 0
1224 );
1225 CREATE TABLE usage_daily (
1226 day_id INTEGER NOT NULL,
1227 agent_slug TEXT NOT NULL,
1228 workspace_id INTEGER NOT NULL DEFAULT 0,
1229 source_id TEXT NOT NULL DEFAULT 'local',
1230 message_count INTEGER NOT NULL DEFAULT 0,
1231 user_message_count INTEGER NOT NULL DEFAULT 0,
1232 assistant_message_count INTEGER NOT NULL DEFAULT 0,
1233 tool_call_count INTEGER NOT NULL DEFAULT 0,
1234 plan_message_count INTEGER NOT NULL DEFAULT 0,
1235 api_coverage_message_count INTEGER NOT NULL DEFAULT 0,
1236 content_tokens_est_total INTEGER NOT NULL DEFAULT 0,
1237 content_tokens_est_user INTEGER NOT NULL DEFAULT 0,
1238 content_tokens_est_assistant INTEGER NOT NULL DEFAULT 0,
1239 api_tokens_total INTEGER NOT NULL DEFAULT 0,
1240 api_input_tokens_total INTEGER NOT NULL DEFAULT 0,
1241 api_output_tokens_total INTEGER NOT NULL DEFAULT 0,
1242 api_cache_read_tokens_total INTEGER NOT NULL DEFAULT 0,
1243 api_cache_creation_tokens_total INTEGER NOT NULL DEFAULT 0,
1244 api_thinking_tokens_total INTEGER NOT NULL DEFAULT 0,
1245 last_updated INTEGER NOT NULL DEFAULT 0,
1246 PRIMARY KEY (day_id, agent_slug, workspace_id, source_id)
1247 );",
1248 )
1249 .unwrap();
1250
1251 conn.execute_batch(
1253 "INSERT INTO message_metrics VALUES
1254 (1, 1750000000000, 416666, 20254, 'claude_code', 1, 'local', 'user', 400, 100, NULL, NULL, NULL, NULL, NULL, NULL, 'estimated', 0, 0, 0),
1255 (2, 1750000000001, 416666, 20254, 'claude_code', 1, 'local', 'assistant', 800, 200, 500, 300, 50, 20, 10, NULL, 'api', 3, 1, 0),
1256 (3, 1750000000002, 416666, 20254, 'claude_code', 1, 'local', 'user', 600, 150, NULL, NULL, NULL, NULL, NULL, NULL, 'estimated', 0, 0, 0);
1257 INSERT INTO usage_daily VALUES
1258 (20254, 'claude_code', 1, 'local',
1259 3, 2, 1, 3, 0, 1,
1260 450, 250, 200,
1261 880, 500, 300, 50, 20, 10,
1262 0);",
1263 )
1264 .unwrap();
1265
1266 conn
1267 }
1268
1269 fn setup_both_tracks_fixture() -> Connection {
1271 let conn = setup_track_a_fixture();
1272
1273 conn.execute_batch(
1274 "CREATE TABLE agents (
1275 id INTEGER PRIMARY KEY,
1276 slug TEXT NOT NULL UNIQUE
1277 );
1278 INSERT INTO agents VALUES (1, 'claude_code');
1279
1280 CREATE TABLE token_usage (
1281 id INTEGER PRIMARY KEY AUTOINCREMENT,
1282 message_id INTEGER NOT NULL,
1283 conversation_id INTEGER NOT NULL,
1284 agent_id INTEGER NOT NULL,
1285 workspace_id INTEGER,
1286 source_id TEXT NOT NULL DEFAULT 'local',
1287 timestamp_ms INTEGER NOT NULL,
1288 day_id INTEGER NOT NULL,
1289 model_name TEXT,
1290 model_family TEXT,
1291 model_tier TEXT,
1292 service_tier TEXT,
1293 provider TEXT,
1294 input_tokens INTEGER,
1295 output_tokens INTEGER,
1296 cache_read_tokens INTEGER,
1297 cache_creation_tokens INTEGER,
1298 thinking_tokens INTEGER,
1299 total_tokens INTEGER,
1300 estimated_cost_usd REAL,
1301 role TEXT NOT NULL,
1302 content_chars INTEGER NOT NULL,
1303 has_tool_calls INTEGER NOT NULL DEFAULT 0,
1304 tool_call_count INTEGER NOT NULL DEFAULT 0,
1305 data_source TEXT NOT NULL DEFAULT 'api',
1306 UNIQUE(message_id)
1307 );
1308
1309 CREATE TABLE token_daily_stats (
1310 day_id INTEGER NOT NULL,
1311 agent_slug TEXT NOT NULL,
1312 source_id TEXT NOT NULL DEFAULT 'all',
1313 model_family TEXT NOT NULL DEFAULT 'all',
1314 api_call_count INTEGER NOT NULL DEFAULT 0,
1315 user_message_count INTEGER NOT NULL DEFAULT 0,
1316 assistant_message_count INTEGER NOT NULL DEFAULT 0,
1317 tool_message_count INTEGER NOT NULL DEFAULT 0,
1318 total_input_tokens INTEGER NOT NULL DEFAULT 0,
1319 total_output_tokens INTEGER NOT NULL DEFAULT 0,
1320 total_cache_read_tokens INTEGER NOT NULL DEFAULT 0,
1321 total_cache_creation_tokens INTEGER NOT NULL DEFAULT 0,
1322 total_thinking_tokens INTEGER NOT NULL DEFAULT 0,
1323 grand_total_tokens INTEGER NOT NULL DEFAULT 0,
1324 total_content_chars INTEGER NOT NULL DEFAULT 0,
1325 total_tool_calls INTEGER NOT NULL DEFAULT 0,
1326 estimated_cost_usd REAL NOT NULL DEFAULT 0.0,
1327 session_count INTEGER NOT NULL DEFAULT 0,
1328 last_updated INTEGER NOT NULL,
1329 PRIMARY KEY (day_id, agent_slug, source_id, model_family)
1330 );
1331
1332 -- Insert matching token_usage for message 2 (the only api-sourced message).
1333 INSERT INTO token_usage VALUES
1334 (1, 2, 100, 1, 1, 'local', 1750000000001, 20254,
1335 'claude-opus-4', 'opus', 'opus', NULL, 'anthropic',
1336 500, 300, 50, 20, 10, 880, 0.05, 'assistant', 800, 1, 3, 'api');
1337
1338 -- Token daily stats matching the token_usage.
1339 INSERT INTO token_daily_stats VALUES
1340 (20254, 'claude_code', 'local', 'opus',
1341 1, 0, 1, 0,
1342 500, 300, 50, 20, 10, 880,
1343 800, 3, 0.05, 1, 0);",
1344 )
1345 .unwrap();
1346
1347 conn
1348 }
1349
1350 #[test]
1353 fn consistent_track_a_passes() {
1354 let conn = setup_track_a_fixture();
1355 let config = ValidateConfig::deep();
1356 let report = run_validation(&conn, &config);
1357
1358 let track_a_checks: Vec<_> = report
1360 .checks
1361 .iter()
1362 .filter(|c| c.id.starts_with("track_a."))
1363 .collect();
1364 assert!(!track_a_checks.is_empty());
1365 for c in &track_a_checks {
1366 assert!(c.ok, "Check {} failed: {}", c.id, c.details);
1367 }
1368 }
1369
1370 #[test]
1371 fn drifted_track_a_detects_mismatch() {
1372 let conn = setup_track_a_fixture();
1373
1374 conn.execute("UPDATE usage_daily SET content_tokens_est_total = 9999 WHERE day_id = 20254")
1376 .unwrap();
1377
1378 let config = ValidateConfig::deep();
1379 let report = run_validation(&conn, &config);
1380
1381 let content_check = report
1382 .checks
1383 .iter()
1384 .find(|c| c.id == "track_a.content_tokens_match")
1385 .expect("should have content tokens check");
1386 assert!(!content_check.ok, "Should detect content tokens mismatch");
1387 assert!(content_check.suggested_action.is_some());
1388 }
1389
1390 #[test]
1391 fn drifted_track_a_message_count_detected() {
1392 let conn = setup_track_a_fixture();
1393
1394 conn.execute("UPDATE usage_daily SET message_count = 999 WHERE day_id = 20254")
1396 .unwrap();
1397
1398 let config = ValidateConfig::deep();
1399 let report = run_validation(&conn, &config);
1400
1401 let msg_check = report
1402 .checks
1403 .iter()
1404 .find(|c| c.id == "track_a.message_count_match")
1405 .expect("should have message count check");
1406 assert!(!msg_check.ok);
1407 }
1408
1409 #[test]
1410 fn consistent_both_tracks_passes() {
1411 let conn = setup_both_tracks_fixture();
1412 let config = ValidateConfig::deep();
1413 let report = run_validation(&conn, &config);
1414
1415 assert!(
1416 report.all_ok(),
1417 "All checks should pass on consistent fixture: {:#?}",
1418 report.checks.iter().filter(|c| !c.ok).collect::<Vec<_>>()
1419 );
1420 assert!(report.drift.is_empty());
1421 }
1422
1423 #[test]
1424 fn cross_track_drift_detected() {
1425 let conn = setup_both_tracks_fixture();
1426
1427 conn.execute("DELETE FROM token_usage WHERE id = 1")
1429 .unwrap();
1430 conn.execute("UPDATE token_daily_stats SET grand_total_tokens = 0 WHERE day_id = 20254")
1432 .unwrap();
1433
1434 let config = ValidateConfig::deep();
1435 let report = run_validation(&conn, &config);
1436
1437 let drift_check = report
1438 .checks
1439 .iter()
1440 .find(|c| c.id == "cross_track.drift")
1441 .expect("should have cross-track drift check");
1442 assert!(!drift_check.ok, "Should detect cross-track drift");
1444 assert!(!report.drift.is_empty());
1445 assert_eq!(report.drift[0].track_a_total, 880);
1446 assert_eq!(report.drift[0].track_b_total, 0);
1447 }
1448
1449 #[test]
1450 fn negative_counters_detected() {
1451 let conn = setup_track_a_fixture();
1452
1453 conn.execute("UPDATE usage_daily SET tool_call_count = -5 WHERE day_id = 20254")
1455 .unwrap();
1456
1457 let config = ValidateConfig::deep();
1458 let report = run_validation(&conn, &config);
1459
1460 let neg_check = report
1461 .checks
1462 .iter()
1463 .find(|c| c.id == "track_a.non_negative_counters")
1464 .expect("should have non-negative check");
1465 assert!(!neg_check.ok, "Should detect negative counters");
1466 }
1467
1468 #[test]
1469 fn coverage_exceeding_message_count_detected() {
1470 let conn = setup_track_a_fixture();
1471
1472 conn.execute(
1474 "UPDATE usage_daily SET api_coverage_message_count = 999 WHERE day_id = 20254",
1475 )
1476 .unwrap();
1477
1478 let config = ValidateConfig::deep();
1479 let report = run_validation(&conn, &config);
1480
1481 let cov_check = report
1482 .checks
1483 .iter()
1484 .find(|c| c.id == "track_a.coverage_lte_messages")
1485 .expect("should have coverage <= messages check");
1486 assert!(!cov_check.ok);
1487 }
1488
1489 #[test]
1490 fn empty_database_reports_missing_tables() {
1491 let conn = Connection::open(":memory:").unwrap();
1492 let config = ValidateConfig::default();
1493 let report = run_validation(&conn, &config);
1494
1495 let errors: Vec<_> = report
1497 .checks
1498 .iter()
1499 .filter(|c| !c.ok && c.severity == Severity::Error)
1500 .collect();
1501 assert!(!errors.is_empty());
1502 }
1503
1504 #[test]
1505 fn sample_mode_limits_buckets() {
1506 let conn = setup_track_a_fixture();
1507 let config = ValidateConfig {
1508 sample_buckets: 1,
1509 ..Default::default()
1510 };
1511 let report = run_validation(&conn, &config);
1512
1513 assert_eq!(report._meta.sampling.mode, "sample");
1514 assert!(report._meta.sampling.buckets_checked <= 1);
1516 }
1517
1518 #[test]
1519 fn deep_mode_scans_all() {
1520 let conn = setup_track_a_fixture();
1521 let config = ValidateConfig::deep();
1522 let report = run_validation(&conn, &config);
1523
1524 assert_eq!(report._meta.sampling.mode, "deep");
1525 }
1526
1527 #[test]
1528 fn report_json_shape() {
1529 let conn = setup_track_a_fixture();
1530 let config = ValidateConfig::deep();
1531 let report = run_validation(&conn, &config);
1532 let json = report.to_json();
1533
1534 assert!(json["checks"].is_array());
1535 assert!(json["drift"].is_array());
1536 assert!(json["_meta"]["elapsed_ms"].is_number());
1537 assert!(json["_meta"]["sampling"]["mode"].is_string());
1538 }
1539
1540 #[test]
1541 fn perf_query_guardrail_completes() {
1542 let conn = setup_track_a_fixture();
1543 let m = perf_query_guardrail(&conn);
1544 assert!(
1545 m.error.is_none(),
1546 "timeseries guardrail should complete: {}",
1547 m.details
1548 );
1549 assert_eq!(m.id, "perf.query_timeseries");
1550 assert_eq!(m.budget_ms, 500);
1551 assert!(m.details.contains("Timeseries rollup query"));
1552 }
1553
1554 #[test]
1555 fn perf_breakdown_guardrail_completes() {
1556 let conn = setup_track_a_fixture();
1557 let m = perf_breakdown_guardrail(&conn);
1558 assert!(
1559 m.error.is_none(),
1560 "breakdown guardrail should complete: {}",
1561 m.details
1562 );
1563 assert_eq!(m.id, "perf.query_breakdown");
1564 assert_eq!(m.budget_ms, 200);
1565 assert!(m.details.contains("Breakdown query"));
1566 }
1567
1568 #[test]
1569 fn perf_query_guardrail_reports_query_failure() {
1570 let conn = Connection::open(":memory:").unwrap();
1571 conn.execute_batch("CREATE TABLE usage_daily (message_count INTEGER);")
1572 .unwrap();
1573
1574 let m = perf_query_guardrail(&conn);
1575 assert!(!m.within_budget);
1576 assert!(m.error.is_some());
1577 assert!(m.details.contains("failed"));
1578 }
1579
1580 #[test]
1581 fn perf_breakdown_guardrail_reports_query_failure() {
1582 let conn = Connection::open(":memory:").unwrap();
1583 conn.execute_batch("CREATE TABLE usage_daily (api_tokens_total INTEGER);")
1584 .unwrap();
1585
1586 let m = perf_breakdown_guardrail(&conn);
1587 assert!(!m.within_budget);
1588 assert!(m.error.is_some());
1589 assert!(m.details.contains("failed"));
1590 }
1591
1592 #[test]
1593 fn malformed_track_a_schema_reports_query_failure() {
1594 let conn = Connection::open(":memory:").unwrap();
1595 conn.execute_batch(
1596 "CREATE TABLE message_metrics (day_id INTEGER);
1597 CREATE TABLE usage_daily (day_id INTEGER);",
1598 )
1599 .unwrap();
1600
1601 let (checks, checked, total) = validate_track_a(&conn, &ValidateConfig::deep());
1602 let failure = checks
1603 .iter()
1604 .find(|c| c.id == "track_a.query_exec")
1605 .expect("Track A query failure should be reported");
1606
1607 assert!(!failure.ok);
1608 assert_eq!(failure.severity, Severity::Error);
1609 assert_eq!(checked, 0);
1610 assert_eq!(total, 0);
1611 }
1612
1613 #[test]
1614 fn malformed_track_b_schema_reports_query_failure() {
1615 let conn = Connection::open(":memory:").unwrap();
1616 conn.execute_batch(
1617 "CREATE TABLE agents (id INTEGER PRIMARY KEY, slug TEXT NOT NULL UNIQUE);
1618 CREATE TABLE token_usage (day_id INTEGER, agent_id INTEGER, source_id TEXT, model_family TEXT);
1619 CREATE TABLE token_daily_stats (day_id INTEGER, agent_slug TEXT, source_id TEXT, model_family TEXT);",
1620 )
1621 .unwrap();
1622
1623 let (checks, checked, total) = validate_track_b(&conn, &ValidateConfig::deep());
1624 let failure = checks
1625 .iter()
1626 .find(|c| c.id == "track_b.query_exec")
1627 .expect("Track B query failure should be reported");
1628
1629 assert!(!failure.ok);
1630 assert_eq!(failure.severity, Severity::Error);
1631 assert_eq!(checked, 0);
1632 assert_eq!(total, 0);
1633 }
1634
1635 #[test]
1636 fn malformed_cross_track_schema_reports_query_failure() {
1637 let conn = Connection::open(":memory:").unwrap();
1638 conn.execute_batch(
1639 "CREATE TABLE usage_daily (day_id INTEGER);
1640 CREATE TABLE token_daily_stats (day_id INTEGER);",
1641 )
1642 .unwrap();
1643
1644 let (checks, drift) = validate_cross_track_drift(&conn, &ValidateConfig::deep());
1645 let failure = checks
1646 .iter()
1647 .find(|c| c.id == "cross_track.query_exec")
1648 .expect("Cross-track query failure should be reported");
1649
1650 assert!(!failure.ok);
1651 assert_eq!(failure.severity, Severity::Error);
1652 assert!(drift.is_empty());
1653 }
1654
1655 #[test]
1656 fn repair_plan_marks_track_a_failures_fixable() {
1657 let conn = setup_track_a_fixture();
1658 conn.execute("UPDATE usage_daily SET message_count = 999 WHERE day_id = 20254")
1659 .unwrap();
1660
1661 let report = run_validation(&conn, &ValidateConfig::deep());
1662 let plan = build_repair_plan(&report);
1663
1664 let track_a = plan
1665 .decisions
1666 .iter()
1667 .find(|decision| decision.kind == RepairKind::RebuildTrackA)
1668 .expect("track a repair decision");
1669 assert!(plan.apply_track_a_rebuild);
1670 assert!(track_a.fixable);
1671 assert!(
1672 track_a
1673 .check_ids
1674 .contains(&"track_a.message_count_match".to_string())
1675 );
1676 }
1677
1678 #[test]
1679 fn repair_plan_marks_track_b_data_drift_as_rebuild_track_b() {
1680 let conn = setup_both_tracks_fixture();
1686 conn.execute("DELETE FROM token_daily_stats").unwrap();
1687
1688 let report = run_validation(&conn, &ValidateConfig::deep());
1689 let plan = build_repair_plan(&report);
1690
1691 let rebuild_b = plan
1692 .decisions
1693 .iter()
1694 .find(|decision| decision.kind == RepairKind::RebuildTrackB)
1695 .expect("track-b rebuild decision");
1696 assert!(!plan.apply_track_a_rebuild);
1697 assert!(plan.apply_track_b_rebuild);
1698 assert!(rebuild_b.fixable);
1699 assert!(
1700 rebuild_b
1701 .check_ids
1702 .contains(&"track_b.has_data".to_string())
1703 );
1704 }
1705
1706 #[test]
1707 fn repair_plan_marks_track_b_tables_missing_as_unavailable() {
1708 let conn = setup_both_tracks_fixture();
1713 conn.execute("DROP TABLE token_usage").unwrap();
1714
1715 let report = run_validation(&conn, &ValidateConfig::deep());
1716 let plan = build_repair_plan(&report);
1717
1718 let unavailable = plan
1719 .decisions
1720 .iter()
1721 .find(|decision| decision.kind == RepairKind::TrackAllRebuildUnavailable)
1722 .expect("track-all unavailable decision when ledger missing");
1723 assert!(!plan.apply_track_a_rebuild);
1724 assert!(!plan.apply_track_b_rebuild);
1725 assert!(!unavailable.fixable);
1726 assert!(
1727 unavailable
1728 .check_ids
1729 .contains(&"track_b.tables_exist".to_string())
1730 );
1731 }
1732
1733 #[test]
1734 fn repair_plan_marks_track_a_only_drift_as_fixable() {
1735 let report = ValidationReport {
1736 checks: vec![Check {
1737 id: "cross_track.drift".into(),
1738 ok: false,
1739 severity: Severity::Warning,
1740 details: "drift found".into(),
1741 suggested_action: Some("Run 'cass analytics rebuild --track all'".into()),
1742 }],
1743 drift: vec![DriftEntry {
1744 day_id: 20254,
1745 agent_slug: "codex".into(),
1746 source_id: "local".into(),
1747 track_a_total: 0,
1748 track_b_total: 123,
1749 delta: -123,
1750 delta_pct: 100.0,
1751 likely_cause:
1752 "Track B higher — Track A may have been rebuilt recently without all data"
1753 .into(),
1754 }],
1755 _meta: ReportMeta {
1756 elapsed_ms: 1,
1757 sampling: SamplingMeta {
1758 buckets_checked: 1,
1759 buckets_total: 1,
1760 mode: "deep".into(),
1761 },
1762 path: "rollup".into(),
1763 },
1764 };
1765
1766 let plan = build_repair_plan(&report);
1767 assert!(plan.apply_track_a_rebuild);
1768 assert_eq!(plan.decisions.len(), 1);
1769 assert_eq!(plan.decisions[0].kind, RepairKind::RebuildTrackA);
1770 }
1771}