1use frankensqlite::Connection;
13use frankensqlite::Row;
14use frankensqlite::compat::{ConnectionExt, RowExt};
15use serde::Serialize;
16use std::collections::BTreeMap;
17
18use super::query::table_exists;
19
/// Severity attached to a validation [`Check`].
///
/// Serialized in lowercase (`"info"`, `"warning"`, `"error"`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum Severity {
    /// Informational; passing checks in this module carry this severity.
    Info,
    /// A failure that is reported but treated as less severe than `Error`.
    Warning,
    /// A failure such as a broken invariant or an invariant query that could not run.
    Error,
}
32
/// Outcome of a single validation check.
#[derive(Debug, Clone, Serialize)]
pub struct Check {
    /// Dotted identifier such as `track_a.content_tokens_match`; the prefix
    /// (`track_a.`, `track_b.`, `cross_track.`) is what repair classification keys on.
    pub id: String,
    /// `true` when the checked invariant holds.
    pub ok: bool,
    /// Severity of the result; passing checks in this module use `Severity::Info`.
    pub severity: Severity,
    /// Human-readable summary (mismatch counts or the underlying query error).
    pub details: String,
    /// Suggested remediation command; omitted from the JSON output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub suggested_action: Option<String>,
}
43
/// A (day, agent, source) slice whose Track A and Track B token totals diverge beyond the
/// configured drift thresholds.
#[derive(Debug, Clone, Serialize)]
pub struct DriftEntry {
    pub day_id: i64,
    pub agent_slug: String,
    pub source_id: String,
    /// Sum of `usage_daily.api_tokens_total` for the slice (0 if the slice is absent there).
    pub track_a_total: i64,
    /// Sum of `token_daily_stats.grand_total_tokens` for the slice (0 if absent there).
    pub track_b_total: i64,
    /// `track_a_total - track_b_total`, computed with saturating subtraction.
    pub delta: i64,
    /// `|delta|` as a percentage of the larger total, rounded to two decimal places.
    pub delta_pct: f64,
    /// Heuristic, human-readable explanation of the drift direction.
    pub likely_cause: String,
}
56
/// Sampling statistics for a validation run.
#[derive(Debug, Clone, Serialize)]
pub struct SamplingMeta {
    /// Buckets actually compared by the Track A and Track B invariant checks.
    pub buckets_checked: usize,
    /// Sum of the per-track bucket totals reported by the Track A and Track B validators.
    pub buckets_total: usize,
    /// `"deep"` when every bucket is compared, `"sample"` when a bucket limit applies.
    pub mode: String,
}
64
/// Run metadata attached to a [`ValidationReport`] under the `_meta` key.
#[derive(Debug, Clone, Serialize)]
pub struct ReportMeta {
    /// Wall-clock duration of the validation run, in milliseconds.
    pub elapsed_ms: u64,
    /// How many buckets were compared and in which mode.
    pub sampling: SamplingMeta,
    /// Which data path was validated; `run_validation` sets this to `"rollup"`.
    pub path: String,
}
72
/// Complete result of [`run_validation`].
#[derive(Debug, Clone, Serialize)]
pub struct ValidationReport {
    /// Every check that ran, passing or failing, in execution order.
    pub checks: Vec<Check>,
    /// Slices flagged by the cross-track drift check.
    pub drift: Vec<DriftEntry>,
    /// Run metadata. The leading underscore is deliberate: serde uses the field name, so the
    /// JSON key is `_meta`.
    pub _meta: ReportMeta,
}
80
81impl ValidationReport {
82 pub fn all_ok(&self) -> bool {
84 self.checks.iter().all(|c| c.ok)
85 }
86
87 pub fn count_failures(&self, sev: Severity) -> usize {
89 self.checks
90 .iter()
91 .filter(|c| !c.ok && c.severity == sev)
92 .count()
93 }
94
95 pub fn to_json(&self) -> serde_json::Value {
97 serde_json::to_value(self).unwrap_or(serde_json::json!({"error": "serialization failed"}))
98 }
99}
100
/// Category of remediation assigned to a failing [`Check`].
///
/// Declaration order matters. `Ord` is derived and `build_repair_plan` groups failures in a
/// `BTreeMap<RepairKind, _>`, so decisions are emitted in this variant order.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum RepairKind {
    /// Rebuild Track A rollups, which `repair_reason` describes as derivable from raw messages.
    RebuildTrackA,
    /// Rebuild Track B rollups from the `token_usage` ledger.
    RebuildTrackB,
    /// Requires a full canonical replay, which `--fix` does not implement.
    TrackAllRebuildUnavailable,
    /// No proven automatic repair path exists.
    ManualReview,
}
120
/// One repair action together with the failing checks it covers.
#[derive(Debug, Clone, Serialize)]
pub struct RepairDecision {
    /// The repair category for this group of checks.
    pub kind: RepairKind,
    /// `true` only for `RebuildTrackA` and `RebuildTrackB`.
    pub fixable: bool,
    /// Sorted ids of the failing checks classified as `kind`.
    pub check_ids: Vec<String>,
    /// Human-readable justification for `kind`.
    pub reason: String,
}
129
/// Repair plan derived from a [`ValidationReport`] by `build_repair_plan`.
#[derive(Debug, Clone, Serialize)]
pub struct RepairPlan {
    /// Whether any decision calls for a Track A rebuild.
    pub apply_track_a_rebuild: bool,
    /// Whether any decision calls for a Track B rebuild.
    pub apply_track_b_rebuild: bool,
    /// One decision per distinct `RepairKind` among the failing checks, in `RepairKind` order.
    pub decisions: Vec<RepairDecision>,
}
140
141pub fn build_repair_plan(report: &ValidationReport) -> RepairPlan {
143 let mut grouped: BTreeMap<RepairKind, Vec<String>> = BTreeMap::new();
144
145 for check in report.checks.iter().filter(|check| !check.ok) {
146 let kind = classify_repair_kind(check, report);
147 grouped.entry(kind).or_default().push(check.id.clone());
148 }
149
150 let decisions = grouped
151 .into_iter()
152 .map(|(kind, mut check_ids)| {
153 check_ids.sort();
154 RepairDecision {
155 fixable: matches!(kind, RepairKind::RebuildTrackA | RepairKind::RebuildTrackB),
156 reason: repair_reason(kind).into(),
157 kind,
158 check_ids,
159 }
160 })
161 .collect::<Vec<_>>();
162
163 let apply_track_a_rebuild = decisions
164 .iter()
165 .any(|decision| decision.kind == RepairKind::RebuildTrackA);
166 let apply_track_b_rebuild = decisions
167 .iter()
168 .any(|decision| decision.kind == RepairKind::RebuildTrackB);
169
170 RepairPlan {
171 apply_track_a_rebuild,
172 apply_track_b_rebuild,
173 decisions,
174 }
175}
176
177fn classify_repair_kind(check: &Check, report: &ValidationReport) -> RepairKind {
178 if check.id.starts_with("track_a.") {
179 return RepairKind::RebuildTrackA;
180 }
181
182 if check.id == "cross_track.drift" {
183 if report.drift.iter().all(|entry| {
184 entry.likely_cause.starts_with("Track A missing rows")
185 || entry.likely_cause.starts_with("Track B higher")
186 }) {
187 return RepairKind::RebuildTrackA;
188 }
189 return RepairKind::TrackAllRebuildUnavailable;
190 }
191
192 if check.id.starts_with("track_b.") {
193 match check.id.as_str() {
203 "track_b.tables_exist" | "track_b.agents_table_missing" | "track_b.query_exec" => {
208 RepairKind::TrackAllRebuildUnavailable
209 }
210 _ => RepairKind::RebuildTrackB,
215 }
216 } else {
217 RepairKind::ManualReview
218 }
219}
220
221fn repair_reason(kind: RepairKind) -> &'static str {
222 match kind {
223 RepairKind::RebuildTrackA => {
224 "Track A rollups are derivable from raw messages and can be rebuilt safely."
225 }
226 RepairKind::RebuildTrackB => {
227 "Track B rollups are derivable from the intact token_usage ledger and can be rebuilt safely via rebuild_token_daily_stats()."
228 }
229 RepairKind::TrackAllRebuildUnavailable => {
230 "Track B ledger or cross-track precondition is missing; a full canonical replay is required and is not implemented by --fix. Run 'cass doctor check --json' and restore or repair the canonical archive before rebuilding derived assets."
231 }
232 RepairKind::ManualReview => {
233 "This validation failure does not have a proven automatic repair path."
234 }
235 }
236}
237
/// Tuning knobs for `run_validation`.
#[derive(Debug, Clone)]
pub struct ValidateConfig {
    /// Maximum number of buckets compared per check (newest `day_id` first); `0` means compare
    /// every bucket ("deep" mode).
    pub sample_buckets: usize,
    /// Absolute token delta that must be exceeded (strictly) for a slice to count as drifted.
    pub drift_abs_threshold: i64,
    /// Percentage delta that must be exceeded (strictly) for a slice to count as drifted; both
    /// thresholds must be exceeded.
    pub drift_pct_threshold: f64,
}
253
254impl Default for ValidateConfig {
255 fn default() -> Self {
256 Self {
257 sample_buckets: 20,
258 drift_abs_threshold: 10,
259 drift_pct_threshold: 1.0,
260 }
261 }
262}
263
264impl ValidateConfig {
265 pub fn deep() -> Self {
267 Self {
268 sample_buckets: 0,
269 ..Default::default()
270 }
271 }
272}
273
274pub fn run_validation(conn: &Connection, config: &ValidateConfig) -> ValidationReport {
280 let start = std::time::Instant::now();
281 let mut checks = Vec::new();
282 let mut buckets_checked: usize = 0;
283 let mut buckets_total: usize = 0;
284
285 let (a_checks, a_checked, a_total) = validate_track_a(conn, config);
287 checks.extend(a_checks);
288 buckets_checked += a_checked;
289 buckets_total += a_total;
290
291 let (b_checks, b_checked, b_total) = validate_track_b(conn, config);
293 checks.extend(b_checks);
294 buckets_checked += b_checked;
295 buckets_total += b_total;
296
297 let (d_checks, d_entries) = validate_cross_track_drift(conn, config);
299 checks.extend(d_checks);
300 let drift = d_entries;
301
302 checks.extend(validate_non_negative_counters(conn));
304
305 let elapsed_ms = start.elapsed().as_millis() as u64;
306 let mode = if config.sample_buckets == 0 {
307 "deep"
308 } else {
309 "sample"
310 };
311
312 ValidationReport {
313 checks,
314 drift,
315 _meta: ReportMeta {
316 elapsed_ms,
317 sampling: SamplingMeta {
318 buckets_checked,
319 buckets_total,
320 mode: mode.into(),
321 },
322 path: "rollup".into(),
323 },
324 }
325}
326
327fn query_executes(conn: &Connection, sql: &str) -> Result<(), String> {
328 conn.query_map_collect(sql, &[], |_row: &Row| Ok(()))
329 .map(|_| ())
330 .map_err(|err| err.to_string())
331}
332
333fn query_exec_error_check(id: &str, details: String, suggested_action: &str) -> Check {
334 Check {
335 id: id.into(),
336 ok: false,
337 severity: Severity::Error,
338 details,
339 suggested_action: Some(suggested_action.into()),
340 }
341}
342
343fn validate_track_a(conn: &Connection, config: &ValidateConfig) -> (Vec<Check>, usize, usize) {
351 let mut checks = Vec::new();
352
353 if !table_exists(conn, "usage_daily") || !table_exists(conn, "message_metrics") {
354 checks.push(Check {
355 id: "track_a.tables_exist".into(),
356 ok: false,
357 severity: Severity::Error,
358 details: "Track A tables missing (usage_daily or message_metrics)".into(),
359 suggested_action: Some("Run 'cass analytics rebuild'".into()),
360 });
361 return (checks, 0, 0);
362 }
363
364 checks.push(Check {
365 id: "track_a.tables_exist".into(),
366 ok: true,
367 severity: Severity::Info,
368 details: "Track A tables exist".into(),
369 suggested_action: None,
370 });
371
372 let total_buckets: usize = conn
374 .query_row_map("SELECT COUNT(*) FROM usage_daily", &[], |r: &Row| {
375 r.get_typed::<i64>(0).map(|v| v as usize)
376 })
377 .unwrap_or(0);
378
379 let limit_clause = if config.sample_buckets > 0 {
380 format!("LIMIT {}", config.sample_buckets)
381 } else {
382 String::new()
383 };
384
385 let sql = format!(
387 "SELECT ud.day_id, ud.agent_slug, ud.workspace_id, ud.source_id,
388 ud.content_tokens_est_total,
389 COALESCE(mm.sum_content, 0),
390 ud.message_count,
391 COALESCE(mm.sum_msgs, 0),
392 ud.api_tokens_total,
393 COALESCE(mm.sum_api, 0),
394 ud.api_coverage_message_count,
395 COALESCE(mm.sum_api_coverage, 0)
396 FROM usage_daily ud
397 LEFT JOIN (
398 SELECT day_id, agent_slug, workspace_id, source_id,
399 SUM(content_tokens_est) AS sum_content,
400 COUNT(*) AS sum_msgs,
401 SUM(CASE WHEN api_data_source = 'api'
402 THEN COALESCE(api_input_tokens, 0)
403 + COALESCE(api_output_tokens, 0)
404 + COALESCE(api_cache_read_tokens, 0)
405 + COALESCE(api_cache_creation_tokens, 0)
406 + COALESCE(api_thinking_tokens, 0)
407 ELSE 0 END) AS sum_api,
408 SUM(CASE WHEN api_data_source = 'api' THEN 1 ELSE 0 END) AS sum_api_coverage
409 FROM message_metrics
410 GROUP BY day_id, agent_slug, workspace_id, source_id
411 ) mm ON ud.day_id = mm.day_id
412 AND ud.agent_slug = mm.agent_slug
413 AND ud.workspace_id = mm.workspace_id
414 AND ud.source_id = mm.source_id
415 ORDER BY ud.day_id DESC
416 {limit_clause}"
417 );
418
419 if total_buckets == 0 {
420 if let Err(err) = query_executes(conn, &sql) {
421 checks.push(query_exec_error_check(
422 "track_a.query_exec",
423 format!("Track A invariant query failed: {err}"),
424 "Run 'cass analytics rebuild --track a' or verify the analytics schema",
425 ));
426 return (checks, 0, 0);
427 }
428
429 checks.push(Check {
430 id: "track_a.has_data".into(),
431 ok: false,
432 severity: Severity::Warning,
433 details: "usage_daily is empty".into(),
434 suggested_action: Some("Run 'cass analytics rebuild'".into()),
435 });
436 return (checks, 0, 0);
437 }
438
439 let mut mismatches_content = 0_usize;
440 let mut mismatches_msg_count = 0_usize;
441 let mut mismatches_api = 0_usize;
442 let mut mismatches_api_cov = 0_usize;
443 let mut checked = 0_usize;
444
445 let rows = match conn.query_map_collect(&sql, &[], |row: &Row| {
446 Ok((
447 row.get_typed::<i64>(0)?, row.get_typed::<String>(1)?, row.get_typed::<i64>(4)?, row.get_typed::<i64>(5)?, row.get_typed::<i64>(6)?, row.get_typed::<i64>(7)?, row.get_typed::<i64>(8)?, row.get_typed::<i64>(9)?, row.get_typed::<i64>(10)?, row.get_typed::<i64>(11)?, ))
458 }) {
459 Ok(rows) => rows,
460 Err(err) => {
461 checks.push(query_exec_error_check(
462 "track_a.query_exec",
463 format!("Track A invariant query failed: {err}"),
464 "Run 'cass analytics rebuild --track a' or verify the analytics schema",
465 ));
466 return (checks, 0, total_buckets);
467 }
468 };
469
470 for row in rows {
471 checked += 1;
472 let (
473 _day_id,
474 _agent,
475 ud_content,
476 mm_content,
477 ud_msgs,
478 mm_msgs,
479 ud_api,
480 mm_api,
481 ud_cov,
482 mm_cov,
483 ) = row;
484 if ud_content != mm_content {
485 mismatches_content += 1;
486 }
487 if ud_msgs != mm_msgs {
488 mismatches_msg_count += 1;
489 }
490 if ud_api != mm_api {
491 mismatches_api += 1;
492 }
493 if ud_cov != mm_cov {
494 mismatches_api_cov += 1;
495 }
496 }
497
498 checks.push(Check {
500 id: "track_a.content_tokens_match".into(),
501 ok: mismatches_content == 0,
502 severity: if mismatches_content > 0 {
503 Severity::Error
504 } else {
505 Severity::Info
506 },
507 details: format!(
508 "content_tokens_est_total: {mismatches_content}/{checked} buckets mismatched"
509 ),
510 suggested_action: if mismatches_content > 0 {
511 Some("Run 'cass analytics rebuild --track a'".into())
512 } else {
513 None
514 },
515 });
516
517 checks.push(Check {
519 id: "track_a.message_count_match".into(),
520 ok: mismatches_msg_count == 0,
521 severity: if mismatches_msg_count > 0 {
522 Severity::Error
523 } else {
524 Severity::Info
525 },
526 details: format!("message_count: {mismatches_msg_count}/{checked} buckets mismatched"),
527 suggested_action: if mismatches_msg_count > 0 {
528 Some("Run 'cass analytics rebuild --track a'".into())
529 } else {
530 None
531 },
532 });
533
534 checks.push(Check {
536 id: "track_a.api_tokens_match".into(),
537 ok: mismatches_api == 0,
538 severity: if mismatches_api > 0 {
539 Severity::Error
540 } else {
541 Severity::Info
542 },
543 details: format!("api_tokens_total: {mismatches_api}/{checked} buckets mismatched"),
544 suggested_action: if mismatches_api > 0 {
545 Some("Run 'cass analytics rebuild --track a'".into())
546 } else {
547 None
548 },
549 });
550
551 checks.push(Check {
553 id: "track_a.api_coverage_match".into(),
554 ok: mismatches_api_cov == 0,
555 severity: if mismatches_api_cov > 0 {
556 Severity::Warning
557 } else {
558 Severity::Info
559 },
560 details: format!(
561 "api_coverage_message_count: {mismatches_api_cov}/{checked} buckets mismatched"
562 ),
563 suggested_action: if mismatches_api_cov > 0 {
564 Some("Run 'cass analytics rebuild --track a'".into())
565 } else {
566 None
567 },
568 });
569
570 (checks, checked, total_buckets)
571}
572
573fn validate_track_b(conn: &Connection, config: &ValidateConfig) -> (Vec<Check>, usize, usize) {
579 let mut checks = Vec::new();
580
581 if !table_exists(conn, "token_daily_stats") || !table_exists(conn, "token_usage") {
582 checks.push(Check {
583 id: "track_b.tables_exist".into(),
584 ok: false,
585 severity: Severity::Error,
586 details: "Track B tables missing (token_daily_stats or token_usage)".into(),
587 suggested_action: Some(
588 "Run 'cass analytics rebuild --track all' (requires z9fse.13)".into(),
589 ),
590 });
591 return (checks, 0, 0);
592 }
593
594 checks.push(Check {
595 id: "track_b.tables_exist".into(),
596 ok: true,
597 severity: Severity::Info,
598 details: "Track B tables exist".into(),
599 suggested_action: None,
600 });
601
602 let total_buckets: usize = conn
603 .query_row_map("SELECT COUNT(*) FROM token_daily_stats", &[], |r: &Row| {
604 r.get_typed::<i64>(0).map(|v| v as usize)
605 })
606 .unwrap_or(0);
607
608 let limit_clause = if config.sample_buckets > 0 {
609 format!("LIMIT {}", config.sample_buckets)
610 } else {
611 String::new()
612 };
613
614 let has_agents_table = table_exists(conn, "agents");
617
618 let sql = if has_agents_table {
619 format!(
620 "SELECT tds.day_id, tds.agent_slug, tds.source_id, tds.model_family,
621 tds.grand_total_tokens,
622 COALESCE(tu.sum_total, 0),
623 tds.total_tool_calls,
624 COALESCE(tu.sum_tools, 0),
625 tds.api_call_count,
626 COALESCE(tu.sum_rows, 0)
627 FROM token_daily_stats tds
628 LEFT JOIN (
629 SELECT t.day_id,
630 a.slug AS agent_slug,
631 t.source_id,
632 COALESCE(t.model_family, 'unknown') AS model_family,
633 SUM(COALESCE(t.total_tokens, 0)) AS sum_total,
634 SUM(t.tool_call_count) AS sum_tools,
635 COUNT(*) AS sum_rows
636 FROM token_usage t
637 JOIN agents a ON a.id = t.agent_id
638 GROUP BY t.day_id, a.slug, t.source_id, COALESCE(t.model_family, 'unknown')
639 ) tu ON tds.day_id = tu.day_id
640 AND tds.agent_slug = tu.agent_slug
641 AND tds.source_id = tu.source_id
642 AND tds.model_family = tu.model_family
643 ORDER BY tds.day_id DESC
644 {limit_clause}"
645 )
646 } else {
647 checks.push(Check {
649 id: "track_b.agents_table_missing".into(),
650 ok: false,
651 severity: Severity::Warning,
652 details: "agents table not found — cannot validate Track B granular invariants".into(),
653 suggested_action: None,
654 });
655 return (checks, 0, total_buckets);
656 };
657
658 if total_buckets == 0 {
659 if let Err(err) = query_executes(conn, &sql) {
660 checks.push(query_exec_error_check(
661 "track_b.query_exec",
662 format!("Track B invariant query failed: {err}"),
663 "Run 'cass analytics rebuild --track all' or verify the analytics schema",
664 ));
665 return (checks, 0, 0);
666 }
667
668 checks.push(Check {
669 id: "track_b.has_data".into(),
670 ok: false,
671 severity: Severity::Warning,
672 details: "token_daily_stats is empty".into(),
673 suggested_action: Some("Run 'cass analytics rebuild --track all'".into()),
674 });
675 return (checks, 0, 0);
676 }
677
678 let mut mismatches_total = 0_usize;
679 let mut mismatches_tools = 0_usize;
680 let mut checked = 0_usize;
681
682 let rows = match conn.query_map_collect(&sql, &[], |row: &Row| {
683 Ok((
684 row.get_typed::<i64>(4)?, row.get_typed::<i64>(5)?, row.get_typed::<i64>(6)?, row.get_typed::<i64>(7)?, ))
689 }) {
690 Ok(rows) => rows,
691 Err(err) => {
692 checks.push(query_exec_error_check(
693 "track_b.query_exec",
694 format!("Track B invariant query failed: {err}"),
695 "Run 'cass analytics rebuild --track all' or verify the analytics schema",
696 ));
697 return (checks, 0, total_buckets);
698 }
699 };
700
701 for row in rows {
702 checked += 1;
703 let (tds_total, tu_total, tds_tools, tu_tools) = row;
704 if tds_total != tu_total {
705 mismatches_total += 1;
706 }
707 if tds_tools != tu_tools {
708 mismatches_tools += 1;
709 }
710 }
711
712 checks.push(Check {
713 id: "track_b.grand_total_match".into(),
714 ok: mismatches_total == 0,
715 severity: if mismatches_total > 0 {
716 Severity::Error
717 } else {
718 Severity::Info
719 },
720 details: format!("grand_total_tokens: {mismatches_total}/{checked} buckets mismatched"),
721 suggested_action: if mismatches_total > 0 {
722 Some("Run 'cass analytics rebuild --track all'".into())
723 } else {
724 None
725 },
726 });
727
728 checks.push(Check {
729 id: "track_b.tool_calls_match".into(),
730 ok: mismatches_tools == 0,
731 severity: if mismatches_tools > 0 {
732 Severity::Warning
733 } else {
734 Severity::Info
735 },
736 details: format!("total_tool_calls: {mismatches_tools}/{checked} buckets mismatched"),
737 suggested_action: if mismatches_tools > 0 {
738 Some("Run 'cass analytics rebuild --track all'".into())
739 } else {
740 None
741 },
742 });
743
744 (checks, checked, total_buckets)
745}
746
747fn validate_cross_track_drift(
753 conn: &Connection,
754 config: &ValidateConfig,
755) -> (Vec<Check>, Vec<DriftEntry>) {
756 let mut checks = Vec::new();
757 let mut entries = Vec::new();
758
759 let has_a = table_exists(conn, "usage_daily");
760 let has_b = table_exists(conn, "token_daily_stats");
761
762 if !has_a || !has_b {
763 let missing = if !has_a && !has_b {
764 "both tracks"
765 } else if !has_a {
766 "Track A (usage_daily)"
767 } else {
768 "Track B (token_daily_stats)"
769 };
770 checks.push(Check {
771 id: "cross_track.tables_exist".into(),
772 ok: false,
773 severity: Severity::Warning,
774 details: format!("Cannot compute cross-track drift: {missing} missing"),
775 suggested_action: Some("Run 'cass analytics rebuild --track all'".into()),
776 });
777 return (checks, entries);
778 }
779
780 let mut drift_count = 0_usize;
781 let mut drift_checked = 0_usize;
782 let mut merged = BTreeMap::<(i64, String, String), (i64, i64)>::new();
783
784 let track_a_rows = match conn.query_map_collect(
785 "SELECT day_id, agent_slug, source_id, SUM(api_tokens_total) AS api_total
786 FROM usage_daily
787 GROUP BY day_id, agent_slug, source_id",
788 &[],
789 |row: &Row| {
790 Ok((
791 row.get_typed::<i64>(0)?,
792 row.get_typed::<String>(1)?,
793 row.get_typed::<String>(2)?,
794 row.get_typed::<i64>(3)?,
795 ))
796 },
797 ) {
798 Ok(rows) => rows,
799 Err(err) => {
800 checks.push(Check {
801 id: "cross_track.query_exec".into(),
802 ok: false,
803 severity: Severity::Error,
804 details: format!("Cross-track drift query failed while reading Track A: {err}"),
805 suggested_action: Some(
806 "Run 'cass analytics rebuild --track all' or verify the analytics schema"
807 .into(),
808 ),
809 });
810 return (checks, entries);
811 }
812 };
813
814 for (day_id, agent_slug, source_id, total) in track_a_rows {
815 merged
816 .entry((day_id, agent_slug, source_id))
817 .or_insert((0, 0))
818 .0 = total;
819 }
820
821 let track_b_rows = match conn.query_map_collect(
822 "SELECT day_id, agent_slug, source_id, SUM(grand_total_tokens) AS grand_total
823 FROM token_daily_stats
824 GROUP BY day_id, agent_slug, source_id",
825 &[],
826 |row: &Row| {
827 Ok((
828 row.get_typed::<i64>(0)?,
829 row.get_typed::<String>(1)?,
830 row.get_typed::<String>(2)?,
831 row.get_typed::<i64>(3)?,
832 ))
833 },
834 ) {
835 Ok(rows) => rows,
836 Err(err) => {
837 checks.push(Check {
838 id: "cross_track.query_exec".into(),
839 ok: false,
840 severity: Severity::Error,
841 details: format!("Cross-track drift query failed while reading Track B: {err}"),
842 suggested_action: Some(
843 "Run 'cass analytics rebuild --track all' or verify the analytics schema"
844 .into(),
845 ),
846 });
847 return (checks, entries);
848 }
849 };
850
851 for (day_id, agent_slug, source_id, total) in track_b_rows {
852 merged
853 .entry((day_id, agent_slug, source_id))
854 .or_insert((0, 0))
855 .1 = total;
856 }
857
858 let mut rows: Vec<_> = merged.into_iter().collect();
859 rows.sort_by(|left, right| {
860 right
861 .0
862 .0
863 .cmp(&left.0.0)
864 .then_with(|| left.0.1.cmp(&right.0.1))
865 .then_with(|| left.0.2.cmp(&right.0.2))
866 });
867 if config.sample_buckets > 0 && rows.len() > config.sample_buckets {
868 rows.truncate(config.sample_buckets);
869 }
870
871 for ((day_id, agent_slug, source_id), (a_total, b_total)) in rows {
872 drift_checked += 1;
873 let delta = a_total.saturating_sub(b_total);
874 let denom = a_total.max(b_total).max(1);
875 let abs_delta = delta.unsigned_abs();
876 let delta_pct = (abs_delta as f64 / denom as f64) * 100.0;
877
878 if abs_delta > config.drift_abs_threshold as u64 && delta_pct > config.drift_pct_threshold {
879 drift_count += 1;
880 let likely_cause = if a_total > 0 && b_total == 0 {
881 "Track B missing rows (rebuild needed or not yet ingested)"
882 } else if b_total > 0 && a_total == 0 {
883 "Track A missing rows (rebuild needed)"
884 } else if a_total > b_total {
885 "Track A higher — Track B may be stale or missing some messages"
886 } else {
887 "Track B higher — Track A may have been rebuilt recently without all data"
888 };
889
890 entries.push(DriftEntry {
891 day_id,
892 agent_slug,
893 source_id,
894 track_a_total: a_total,
895 track_b_total: b_total,
896 delta,
897 delta_pct: (delta_pct * 100.0).round() / 100.0,
898 likely_cause: likely_cause.into(),
899 });
900 }
901 }
902
903 let total_ok = drift_count == 0;
904 checks.push(Check {
905 id: "cross_track.drift".into(),
906 ok: total_ok,
907 severity: if drift_count > 0 {
908 Severity::Warning
909 } else {
910 Severity::Info
911 },
912 details: format!(
913 "Cross-track drift: {drift_count}/{drift_checked} day+agent+source slices drifted"
914 ),
915 suggested_action: if drift_count > 0 {
916 Some("Run 'cass analytics rebuild --track all' to re-sync both tracks".into())
917 } else {
918 None
919 },
920 });
921
922 (checks, entries)
923}
924
925fn validate_non_negative_counters(conn: &Connection) -> Vec<Check> {
931 let mut checks = Vec::new();
932
933 if table_exists(conn, "usage_daily") {
935 let cols = [
936 "message_count",
937 "user_message_count",
938 "assistant_message_count",
939 "tool_call_count",
940 "plan_message_count",
941 "api_coverage_message_count",
942 "content_tokens_est_total",
943 "api_tokens_total",
944 ];
945 let cond = cols
946 .iter()
947 .map(|c| format!("{c} < 0"))
948 .collect::<Vec<_>>()
949 .join(" OR ");
950 let sql = format!("SELECT COUNT(*) FROM usage_daily WHERE {cond}");
951 match conn.query_row_map(&sql, &[], |r: &Row| r.get_typed::<i64>(0)) {
952 Ok(negative_rows) => {
953 checks.push(Check {
954 id: "track_a.non_negative_counters".into(),
955 ok: negative_rows == 0,
956 severity: if negative_rows > 0 {
957 Severity::Error
958 } else {
959 Severity::Info
960 },
961 details: format!("usage_daily: {negative_rows} rows with negative counters"),
962 suggested_action: if negative_rows > 0 {
963 Some("Run 'cass analytics rebuild --track a'".into())
964 } else {
965 None
966 },
967 });
968 }
969 Err(err) => {
970 checks.push(Check {
971 id: "track_a.non_negative_counters".into(),
972 ok: false,
973 severity: Severity::Error,
974 details: format!("usage_daily negative-counter query failed: {err}"),
975 suggested_action: Some(
976 "Run 'cass analytics rebuild --track a' or verify the analytics schema"
977 .into(),
978 ),
979 });
980 }
981 }
982 }
983
984 if table_exists(conn, "usage_daily") {
986 match conn.query_row_map(
987 "SELECT COUNT(*) FROM usage_daily WHERE api_coverage_message_count > message_count",
988 &[],
989 |r: &Row| r.get_typed::<i64>(0),
990 ) {
991 Ok(bad) => {
992 checks.push(Check {
993 id: "track_a.coverage_lte_messages".into(),
994 ok: bad == 0,
995 severity: if bad > 0 {
996 Severity::Warning
997 } else {
998 Severity::Info
999 },
1000 details: format!(
1001 "usage_daily: {bad} rows where api_coverage_message_count > message_count"
1002 ),
1003 suggested_action: if bad > 0 {
1004 Some("Run 'cass analytics rebuild --track a'".into())
1005 } else {
1006 None
1007 },
1008 });
1009 }
1010 Err(err) => {
1011 checks.push(Check {
1012 id: "track_a.coverage_lte_messages".into(),
1013 ok: false,
1014 severity: Severity::Error,
1015 details: format!("usage_daily coverage query failed: {err}"),
1016 suggested_action: Some(
1017 "Run 'cass analytics rebuild --track a' or verify the analytics schema"
1018 .into(),
1019 ),
1020 });
1021 }
1022 }
1023 }
1024
1025 if table_exists(conn, "token_daily_stats") {
1027 let cols = [
1028 "api_call_count",
1029 "total_input_tokens",
1030 "total_output_tokens",
1031 "grand_total_tokens",
1032 "total_tool_calls",
1033 ];
1034 let cond = cols
1035 .iter()
1036 .map(|c| format!("{c} < 0"))
1037 .collect::<Vec<_>>()
1038 .join(" OR ");
1039 let sql = format!("SELECT COUNT(*) FROM token_daily_stats WHERE {cond}");
1040 match conn.query_row_map(&sql, &[], |r: &Row| r.get_typed::<i64>(0)) {
1041 Ok(negative_rows) => {
1042 checks.push(Check {
1043 id: "track_b.non_negative_counters".into(),
1044 ok: negative_rows == 0,
1045 severity: if negative_rows > 0 {
1046 Severity::Error
1047 } else {
1048 Severity::Info
1049 },
1050 details: format!(
1051 "token_daily_stats: {negative_rows} rows with negative counters"
1052 ),
1053 suggested_action: if negative_rows > 0 {
1054 Some("Run 'cass analytics rebuild --track all'".into())
1055 } else {
1056 None
1057 },
1058 });
1059 }
1060 Err(err) => {
1061 checks.push(Check {
1062 id: "track_b.non_negative_counters".into(),
1063 ok: false,
1064 severity: Severity::Error,
1065 details: format!("token_daily_stats negative-counter query failed: {err}"),
1066 suggested_action: Some(
1067 "Run 'cass analytics rebuild --track all' or verify the analytics schema"
1068 .into(),
1069 ),
1070 });
1071 }
1072 }
1073 }
1074
1075 checks
1076}
1077
/// Timing result for one rollup-query performance guardrail.
#[derive(Debug, Clone, Serialize)]
pub struct PerfMeasurement {
    /// Guardrail identifier, e.g. `perf.query_timeseries`.
    pub id: String,
    /// Measured wall-clock time in milliseconds.
    pub elapsed_ms: u64,
    /// Budget the measurement is compared against, in milliseconds.
    pub budget_ms: u64,
    /// `elapsed_ms <= budget_ms` on success; the guardrails in this file set it to `true` when
    /// the query is skipped and `false` when it fails.
    pub within_budget: bool,
    /// Query error text; omitted from the JSON output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<String>,
    /// Human-readable summary of the measurement.
    pub details: String,
}
1093
1094pub fn perf_query_guardrail(conn: &Connection) -> PerfMeasurement {
1096 let start = std::time::Instant::now();
1097
1098 let budget_ms = 500_u64; if !table_exists(conn, "usage_daily") {
1101 let elapsed_ms = start.elapsed().as_millis() as u64;
1102 return PerfMeasurement {
1103 id: "perf.query_timeseries".into(),
1104 elapsed_ms,
1105 budget_ms,
1106 within_budget: true,
1107 error: None,
1108 details: "Skipped timeseries rollup query: usage_daily table missing".into(),
1109 };
1110 }
1111
1112 let sql = "SELECT COUNT(DISTINCT day_id) FROM usage_daily";
1113 let row_count = conn.query_row_map(sql, &[], |r: &Row| r.get_typed::<i64>(0));
1114 let elapsed_ms = start.elapsed().as_millis() as u64;
1115
1116 match row_count {
1117 Ok(row_count) => PerfMeasurement {
1118 id: "perf.query_timeseries".into(),
1119 elapsed_ms,
1120 budget_ms,
1121 within_budget: elapsed_ms <= budget_ms,
1122 error: None,
1123 details: format!("Timeseries rollup query: {row_count} day buckets in {elapsed_ms}ms"),
1124 },
1125 Err(err) => PerfMeasurement {
1126 id: "perf.query_timeseries".into(),
1127 elapsed_ms,
1128 budget_ms,
1129 within_budget: false,
1130 error: Some(err.to_string()),
1131 details: format!("Timeseries rollup query failed after {elapsed_ms}ms: {err}"),
1132 },
1133 }
1134}
1135
1136pub fn perf_breakdown_guardrail(conn: &Connection) -> PerfMeasurement {
1138 let start = std::time::Instant::now();
1139 let budget_ms = 200_u64;
1140
1141 if !table_exists(conn, "usage_daily") {
1142 let elapsed_ms = start.elapsed().as_millis() as u64;
1143 return PerfMeasurement {
1144 id: "perf.query_breakdown".into(),
1145 elapsed_ms,
1146 budget_ms,
1147 within_budget: true,
1148 error: None,
1149 details: "Skipped breakdown query: usage_daily table missing".into(),
1150 };
1151 }
1152
1153 let sql = "SELECT COUNT(DISTINCT agent_slug) FROM usage_daily";
1154 let row_count = conn.query_row_map(sql, &[], |r: &Row| r.get_typed::<i64>(0));
1155 let elapsed_ms = start.elapsed().as_millis() as u64;
1156
1157 match row_count {
1158 Ok(row_count) => PerfMeasurement {
1159 id: "perf.query_breakdown".into(),
1160 elapsed_ms,
1161 budget_ms,
1162 within_budget: elapsed_ms <= budget_ms,
1163 error: None,
1164 details: format!("Breakdown query: {row_count} agent groups in {elapsed_ms}ms"),
1165 },
1166 Err(err) => PerfMeasurement {
1167 id: "perf.query_breakdown".into(),
1168 elapsed_ms,
1169 budget_ms,
1170 within_budget: false,
1171 error: Some(err.to_string()),
1172 details: format!("Breakdown query failed after {elapsed_ms}ms: {err}"),
1173 },
1174 }
1175}
1176
1177#[cfg(test)]
1182mod tests {
1183 use super::*;
1184
1185 fn setup_track_a_fixture() -> Connection {
1189 let conn = Connection::open(":memory:").unwrap();
1190 conn.execute_batch(
1191 "CREATE TABLE message_metrics (
1192 message_id INTEGER PRIMARY KEY,
1193 created_at_ms INTEGER NOT NULL,
1194 hour_id INTEGER NOT NULL,
1195 day_id INTEGER NOT NULL,
1196 agent_slug TEXT NOT NULL,
1197 workspace_id INTEGER NOT NULL DEFAULT 0,
1198 source_id TEXT NOT NULL DEFAULT 'local',
1199 role TEXT NOT NULL,
1200 content_chars INTEGER NOT NULL,
1201 content_tokens_est INTEGER NOT NULL,
1202 api_input_tokens INTEGER,
1203 api_output_tokens INTEGER,
1204 api_cache_read_tokens INTEGER,
1205 api_cache_creation_tokens INTEGER,
1206 api_thinking_tokens INTEGER,
1207 api_service_tier TEXT,
1208 api_data_source TEXT NOT NULL DEFAULT 'estimated',
1209 tool_call_count INTEGER NOT NULL DEFAULT 0,
1210 has_tool_calls INTEGER NOT NULL DEFAULT 0,
1211 has_plan INTEGER NOT NULL DEFAULT 0
1212 );
1213 CREATE TABLE usage_daily (
1214 day_id INTEGER NOT NULL,
1215 agent_slug TEXT NOT NULL,
1216 workspace_id INTEGER NOT NULL DEFAULT 0,
1217 source_id TEXT NOT NULL DEFAULT 'local',
1218 message_count INTEGER NOT NULL DEFAULT 0,
1219 user_message_count INTEGER NOT NULL DEFAULT 0,
1220 assistant_message_count INTEGER NOT NULL DEFAULT 0,
1221 tool_call_count INTEGER NOT NULL DEFAULT 0,
1222 plan_message_count INTEGER NOT NULL DEFAULT 0,
1223 api_coverage_message_count INTEGER NOT NULL DEFAULT 0,
1224 content_tokens_est_total INTEGER NOT NULL DEFAULT 0,
1225 content_tokens_est_user INTEGER NOT NULL DEFAULT 0,
1226 content_tokens_est_assistant INTEGER NOT NULL DEFAULT 0,
1227 api_tokens_total INTEGER NOT NULL DEFAULT 0,
1228 api_input_tokens_total INTEGER NOT NULL DEFAULT 0,
1229 api_output_tokens_total INTEGER NOT NULL DEFAULT 0,
1230 api_cache_read_tokens_total INTEGER NOT NULL DEFAULT 0,
1231 api_cache_creation_tokens_total INTEGER NOT NULL DEFAULT 0,
1232 api_thinking_tokens_total INTEGER NOT NULL DEFAULT 0,
1233 last_updated INTEGER NOT NULL DEFAULT 0,
1234 PRIMARY KEY (day_id, agent_slug, workspace_id, source_id)
1235 );",
1236 )
1237 .unwrap();
1238
1239 conn.execute_batch(
1241 "INSERT INTO message_metrics VALUES
1242 (1, 1750000000000, 416666, 20254, 'claude_code', 1, 'local', 'user', 400, 100, NULL, NULL, NULL, NULL, NULL, NULL, 'estimated', 0, 0, 0),
1243 (2, 1750000000001, 416666, 20254, 'claude_code', 1, 'local', 'assistant', 800, 200, 500, 300, 50, 20, 10, NULL, 'api', 3, 1, 0),
1244 (3, 1750000000002, 416666, 20254, 'claude_code', 1, 'local', 'user', 600, 150, NULL, NULL, NULL, NULL, NULL, NULL, 'estimated', 0, 0, 0);
1245 INSERT INTO usage_daily VALUES
1246 (20254, 'claude_code', 1, 'local',
1247 3, 2, 1, 3, 0, 1,
1248 450, 250, 200,
1249 880, 500, 300, 50, 20, 10,
1250 0);",
1251 )
1252 .unwrap();
1253
1254 conn
1255 }
1256
1257 fn setup_both_tracks_fixture() -> Connection {
1259 let conn = setup_track_a_fixture();
1260
1261 conn.execute_batch(
1262 "CREATE TABLE agents (
1263 id INTEGER PRIMARY KEY,
1264 slug TEXT NOT NULL UNIQUE
1265 );
1266 INSERT INTO agents VALUES (1, 'claude_code');
1267
1268 CREATE TABLE token_usage (
1269 id INTEGER PRIMARY KEY AUTOINCREMENT,
1270 message_id INTEGER NOT NULL,
1271 conversation_id INTEGER NOT NULL,
1272 agent_id INTEGER NOT NULL,
1273 workspace_id INTEGER,
1274 source_id TEXT NOT NULL DEFAULT 'local',
1275 timestamp_ms INTEGER NOT NULL,
1276 day_id INTEGER NOT NULL,
1277 model_name TEXT,
1278 model_family TEXT,
1279 model_tier TEXT,
1280 service_tier TEXT,
1281 provider TEXT,
1282 input_tokens INTEGER,
1283 output_tokens INTEGER,
1284 cache_read_tokens INTEGER,
1285 cache_creation_tokens INTEGER,
1286 thinking_tokens INTEGER,
1287 total_tokens INTEGER,
1288 estimated_cost_usd REAL,
1289 role TEXT NOT NULL,
1290 content_chars INTEGER NOT NULL,
1291 has_tool_calls INTEGER NOT NULL DEFAULT 0,
1292 tool_call_count INTEGER NOT NULL DEFAULT 0,
1293 data_source TEXT NOT NULL DEFAULT 'api',
1294 UNIQUE(message_id)
1295 );
1296
1297 CREATE TABLE token_daily_stats (
1298 day_id INTEGER NOT NULL,
1299 agent_slug TEXT NOT NULL,
1300 source_id TEXT NOT NULL DEFAULT 'all',
1301 model_family TEXT NOT NULL DEFAULT 'all',
1302 api_call_count INTEGER NOT NULL DEFAULT 0,
1303 user_message_count INTEGER NOT NULL DEFAULT 0,
1304 assistant_message_count INTEGER NOT NULL DEFAULT 0,
1305 tool_message_count INTEGER NOT NULL DEFAULT 0,
1306 total_input_tokens INTEGER NOT NULL DEFAULT 0,
1307 total_output_tokens INTEGER NOT NULL DEFAULT 0,
1308 total_cache_read_tokens INTEGER NOT NULL DEFAULT 0,
1309 total_cache_creation_tokens INTEGER NOT NULL DEFAULT 0,
1310 total_thinking_tokens INTEGER NOT NULL DEFAULT 0,
1311 grand_total_tokens INTEGER NOT NULL DEFAULT 0,
1312 total_content_chars INTEGER NOT NULL DEFAULT 0,
1313 total_tool_calls INTEGER NOT NULL DEFAULT 0,
1314 estimated_cost_usd REAL NOT NULL DEFAULT 0.0,
1315 session_count INTEGER NOT NULL DEFAULT 0,
1316 last_updated INTEGER NOT NULL,
1317 PRIMARY KEY (day_id, agent_slug, source_id, model_family)
1318 );
1319
1320 -- Insert matching token_usage for message 2 (the only api-sourced message).
1321 INSERT INTO token_usage VALUES
1322 (1, 2, 100, 1, 1, 'local', 1750000000001, 20254,
1323 'claude-opus-4', 'opus', 'opus', NULL, 'anthropic',
1324 500, 300, 50, 20, 10, 880, 0.05, 'assistant', 800, 1, 3, 'api');
1325
1326 -- Token daily stats matching the token_usage.
1327 INSERT INTO token_daily_stats VALUES
1328 (20254, 'claude_code', 'local', 'opus',
1329 1, 0, 1, 0,
1330 500, 300, 50, 20, 10, 880,
1331 800, 3, 0.05, 1, 0);",
1332 )
1333 .unwrap();
1334
1335 conn
1336 }
1337
1338 #[test]
1341 fn consistent_track_a_passes() {
1342 let conn = setup_track_a_fixture();
1343 let config = ValidateConfig::deep();
1344 let report = run_validation(&conn, &config);
1345
1346 let track_a_checks: Vec<_> = report
1348 .checks
1349 .iter()
1350 .filter(|c| c.id.starts_with("track_a."))
1351 .collect();
1352 assert!(!track_a_checks.is_empty());
1353 for c in &track_a_checks {
1354 assert!(c.ok, "Check {} failed: {}", c.id, c.details);
1355 }
1356 }
1357
1358 #[test]
1359 fn drifted_track_a_detects_mismatch() {
1360 let conn = setup_track_a_fixture();
1361
1362 conn.execute("UPDATE usage_daily SET content_tokens_est_total = 9999 WHERE day_id = 20254")
1364 .unwrap();
1365
1366 let config = ValidateConfig::deep();
1367 let report = run_validation(&conn, &config);
1368
1369 let content_check = report
1370 .checks
1371 .iter()
1372 .find(|c| c.id == "track_a.content_tokens_match")
1373 .expect("should have content tokens check");
1374 assert!(!content_check.ok, "Should detect content tokens mismatch");
1375 assert!(content_check.suggested_action.is_some());
1376 }
1377
1378 #[test]
1379 fn drifted_track_a_message_count_detected() {
1380 let conn = setup_track_a_fixture();
1381
1382 conn.execute("UPDATE usage_daily SET message_count = 999 WHERE day_id = 20254")
1384 .unwrap();
1385
1386 let config = ValidateConfig::deep();
1387 let report = run_validation(&conn, &config);
1388
1389 let msg_check = report
1390 .checks
1391 .iter()
1392 .find(|c| c.id == "track_a.message_count_match")
1393 .expect("should have message count check");
1394 assert!(!msg_check.ok);
1395 }
1396
1397 #[test]
1398 fn consistent_both_tracks_passes() {
1399 let conn = setup_both_tracks_fixture();
1400 let config = ValidateConfig::deep();
1401 let report = run_validation(&conn, &config);
1402
1403 assert!(
1404 report.all_ok(),
1405 "All checks should pass on consistent fixture: {:#?}",
1406 report.checks.iter().filter(|c| !c.ok).collect::<Vec<_>>()
1407 );
1408 assert!(report.drift.is_empty());
1409 }
1410
1411 #[test]
1412 fn cross_track_drift_detected() {
1413 let conn = setup_both_tracks_fixture();
1414
1415 conn.execute("DELETE FROM token_usage WHERE id = 1")
1417 .unwrap();
1418 conn.execute("UPDATE token_daily_stats SET grand_total_tokens = 0 WHERE day_id = 20254")
1420 .unwrap();
1421
1422 let config = ValidateConfig::deep();
1423 let report = run_validation(&conn, &config);
1424
1425 let drift_check = report
1426 .checks
1427 .iter()
1428 .find(|c| c.id == "cross_track.drift")
1429 .expect("should have cross-track drift check");
1430 assert!(!drift_check.ok, "Should detect cross-track drift");
1432 assert!(!report.drift.is_empty());
1433 assert_eq!(report.drift[0].track_a_total, 880);
1434 assert_eq!(report.drift[0].track_b_total, 0);
1435 }
1436
1437 #[test]
1438 fn negative_counters_detected() {
1439 let conn = setup_track_a_fixture();
1440
1441 conn.execute("UPDATE usage_daily SET tool_call_count = -5 WHERE day_id = 20254")
1443 .unwrap();
1444
1445 let config = ValidateConfig::deep();
1446 let report = run_validation(&conn, &config);
1447
1448 let neg_check = report
1449 .checks
1450 .iter()
1451 .find(|c| c.id == "track_a.non_negative_counters")
1452 .expect("should have non-negative check");
1453 assert!(!neg_check.ok, "Should detect negative counters");
1454 }
1455
1456 #[test]
1457 fn coverage_exceeding_message_count_detected() {
1458 let conn = setup_track_a_fixture();
1459
1460 conn.execute(
1462 "UPDATE usage_daily SET api_coverage_message_count = 999 WHERE day_id = 20254",
1463 )
1464 .unwrap();
1465
1466 let config = ValidateConfig::deep();
1467 let report = run_validation(&conn, &config);
1468
1469 let cov_check = report
1470 .checks
1471 .iter()
1472 .find(|c| c.id == "track_a.coverage_lte_messages")
1473 .expect("should have coverage <= messages check");
1474 assert!(!cov_check.ok);
1475 }
1476
1477 #[test]
1478 fn empty_database_reports_missing_tables() {
1479 let conn = Connection::open(":memory:").unwrap();
1480 let config = ValidateConfig::default();
1481 let report = run_validation(&conn, &config);
1482
1483 let errors: Vec<_> = report
1485 .checks
1486 .iter()
1487 .filter(|c| !c.ok && c.severity == Severity::Error)
1488 .collect();
1489 assert!(!errors.is_empty());
1490 }
1491
1492 #[test]
1493 fn sample_mode_limits_buckets() {
1494 let conn = setup_track_a_fixture();
1495 let config = ValidateConfig {
1496 sample_buckets: 1,
1497 ..Default::default()
1498 };
1499 let report = run_validation(&conn, &config);
1500
1501 assert_eq!(report._meta.sampling.mode, "sample");
1502 assert!(report._meta.sampling.buckets_checked <= 1);
1504 }
1505
1506 #[test]
1507 fn deep_mode_scans_all() {
1508 let conn = setup_track_a_fixture();
1509 let config = ValidateConfig::deep();
1510 let report = run_validation(&conn, &config);
1511
1512 assert_eq!(report._meta.sampling.mode, "deep");
1513 }
1514
1515 #[test]
1516 fn report_json_shape() {
1517 let conn = setup_track_a_fixture();
1518 let config = ValidateConfig::deep();
1519 let report = run_validation(&conn, &config);
1520 let json = report.to_json();
1521
1522 assert!(json["checks"].is_array());
1523 assert!(json["drift"].is_array());
1524 assert!(json["_meta"]["elapsed_ms"].is_number());
1525 assert!(json["_meta"]["sampling"]["mode"].is_string());
1526 }
1527
1528 #[test]
1529 fn perf_query_guardrail_completes() {
1530 let conn = setup_track_a_fixture();
1531 let m = perf_query_guardrail(&conn);
1532 assert!(
1533 m.error.is_none(),
1534 "timeseries guardrail should complete: {}",
1535 m.details
1536 );
1537 assert_eq!(m.id, "perf.query_timeseries");
1538 assert_eq!(m.budget_ms, 500);
1539 assert!(m.details.contains("Timeseries rollup query"));
1540 }
1541
1542 #[test]
1543 fn perf_breakdown_guardrail_completes() {
1544 let conn = setup_track_a_fixture();
1545 let m = perf_breakdown_guardrail(&conn);
1546 assert!(
1547 m.error.is_none(),
1548 "breakdown guardrail should complete: {}",
1549 m.details
1550 );
1551 assert_eq!(m.id, "perf.query_breakdown");
1552 assert_eq!(m.budget_ms, 200);
1553 assert!(m.details.contains("Breakdown query"));
1554 }
1555
1556 #[test]
1557 fn perf_query_guardrail_reports_query_failure() {
1558 let conn = Connection::open(":memory:").unwrap();
1559 conn.execute_batch("CREATE TABLE usage_daily (message_count INTEGER);")
1560 .unwrap();
1561
1562 let m = perf_query_guardrail(&conn);
1563 assert!(!m.within_budget);
1564 assert!(m.error.is_some());
1565 assert!(m.details.contains("failed"));
1566 }
1567
1568 #[test]
1569 fn perf_breakdown_guardrail_reports_query_failure() {
1570 let conn = Connection::open(":memory:").unwrap();
1571 conn.execute_batch("CREATE TABLE usage_daily (api_tokens_total INTEGER);")
1572 .unwrap();
1573
1574 let m = perf_breakdown_guardrail(&conn);
1575 assert!(!m.within_budget);
1576 assert!(m.error.is_some());
1577 assert!(m.details.contains("failed"));
1578 }
1579
1580 #[test]
1581 fn malformed_track_a_schema_reports_query_failure() {
1582 let conn = Connection::open(":memory:").unwrap();
1583 conn.execute_batch(
1584 "CREATE TABLE message_metrics (day_id INTEGER);
1585 CREATE TABLE usage_daily (day_id INTEGER);",
1586 )
1587 .unwrap();
1588
1589 let (checks, checked, total) = validate_track_a(&conn, &ValidateConfig::deep());
1590 let failure = checks
1591 .iter()
1592 .find(|c| c.id == "track_a.query_exec")
1593 .expect("Track A query failure should be reported");
1594
1595 assert!(!failure.ok);
1596 assert_eq!(failure.severity, Severity::Error);
1597 assert_eq!(checked, 0);
1598 assert_eq!(total, 0);
1599 }
1600
1601 #[test]
1602 fn malformed_track_b_schema_reports_query_failure() {
1603 let conn = Connection::open(":memory:").unwrap();
1604 conn.execute_batch(
1605 "CREATE TABLE agents (id INTEGER PRIMARY KEY, slug TEXT NOT NULL UNIQUE);
1606 CREATE TABLE token_usage (day_id INTEGER, agent_id INTEGER, source_id TEXT, model_family TEXT);
1607 CREATE TABLE token_daily_stats (day_id INTEGER, agent_slug TEXT, source_id TEXT, model_family TEXT);",
1608 )
1609 .unwrap();
1610
1611 let (checks, checked, total) = validate_track_b(&conn, &ValidateConfig::deep());
1612 let failure = checks
1613 .iter()
1614 .find(|c| c.id == "track_b.query_exec")
1615 .expect("Track B query failure should be reported");
1616
1617 assert!(!failure.ok);
1618 assert_eq!(failure.severity, Severity::Error);
1619 assert_eq!(checked, 0);
1620 assert_eq!(total, 0);
1621 }
1622
1623 #[test]
1624 fn malformed_cross_track_schema_reports_query_failure() {
1625 let conn = Connection::open(":memory:").unwrap();
1626 conn.execute_batch(
1627 "CREATE TABLE usage_daily (day_id INTEGER);
1628 CREATE TABLE token_daily_stats (day_id INTEGER);",
1629 )
1630 .unwrap();
1631
1632 let (checks, drift) = validate_cross_track_drift(&conn, &ValidateConfig::deep());
1633 let failure = checks
1634 .iter()
1635 .find(|c| c.id == "cross_track.query_exec")
1636 .expect("Cross-track query failure should be reported");
1637
1638 assert!(!failure.ok);
1639 assert_eq!(failure.severity, Severity::Error);
1640 assert!(drift.is_empty());
1641 }
1642
1643 #[test]
1644 fn repair_plan_marks_track_a_failures_fixable() {
1645 let conn = setup_track_a_fixture();
1646 conn.execute("UPDATE usage_daily SET message_count = 999 WHERE day_id = 20254")
1647 .unwrap();
1648
1649 let report = run_validation(&conn, &ValidateConfig::deep());
1650 let plan = build_repair_plan(&report);
1651
1652 let track_a = plan
1653 .decisions
1654 .iter()
1655 .find(|decision| decision.kind == RepairKind::RebuildTrackA)
1656 .expect("track a repair decision");
1657 assert!(plan.apply_track_a_rebuild);
1658 assert!(track_a.fixable);
1659 assert!(
1660 track_a
1661 .check_ids
1662 .contains(&"track_a.message_count_match".to_string())
1663 );
1664 }
1665
1666 #[test]
1667 fn repair_plan_marks_track_b_data_drift_as_rebuild_track_b() {
1668 let conn = setup_both_tracks_fixture();
1674 conn.execute("DELETE FROM token_daily_stats").unwrap();
1675
1676 let report = run_validation(&conn, &ValidateConfig::deep());
1677 let plan = build_repair_plan(&report);
1678
1679 let rebuild_b = plan
1680 .decisions
1681 .iter()
1682 .find(|decision| decision.kind == RepairKind::RebuildTrackB)
1683 .expect("track-b rebuild decision");
1684 assert!(!plan.apply_track_a_rebuild);
1685 assert!(plan.apply_track_b_rebuild);
1686 assert!(rebuild_b.fixable);
1687 assert!(
1688 rebuild_b
1689 .check_ids
1690 .contains(&"track_b.has_data".to_string())
1691 );
1692 }
1693
1694 #[test]
1695 fn repair_plan_marks_track_b_tables_missing_as_unavailable() {
1696 let conn = setup_both_tracks_fixture();
1701 conn.execute("DROP TABLE token_usage").unwrap();
1702
1703 let report = run_validation(&conn, &ValidateConfig::deep());
1704 let plan = build_repair_plan(&report);
1705
1706 let unavailable = plan
1707 .decisions
1708 .iter()
1709 .find(|decision| decision.kind == RepairKind::TrackAllRebuildUnavailable)
1710 .expect("track-all unavailable decision when ledger missing");
1711 assert!(!plan.apply_track_a_rebuild);
1712 assert!(!plan.apply_track_b_rebuild);
1713 assert!(!unavailable.fixable);
1714 assert!(
1715 unavailable
1716 .check_ids
1717 .contains(&"track_b.tables_exist".to_string())
1718 );
1719 }
1720
1721 #[test]
1722 fn repair_plan_marks_track_a_only_drift_as_fixable() {
1723 let report = ValidationReport {
1724 checks: vec![Check {
1725 id: "cross_track.drift".into(),
1726 ok: false,
1727 severity: Severity::Warning,
1728 details: "drift found".into(),
1729 suggested_action: Some("Run 'cass analytics rebuild --track all'".into()),
1730 }],
1731 drift: vec![DriftEntry {
1732 day_id: 20254,
1733 agent_slug: "codex".into(),
1734 source_id: "local".into(),
1735 track_a_total: 0,
1736 track_b_total: 123,
1737 delta: -123,
1738 delta_pct: 100.0,
1739 likely_cause:
1740 "Track B higher — Track A may have been rebuilt recently without all data"
1741 .into(),
1742 }],
1743 _meta: ReportMeta {
1744 elapsed_ms: 1,
1745 sampling: SamplingMeta {
1746 buckets_checked: 1,
1747 buckets_total: 1,
1748 mode: "deep".into(),
1749 },
1750 path: "rollup".into(),
1751 },
1752 };
1753
1754 let plan = build_repair_plan(&report);
1755 assert!(plan.apply_track_a_rebuild);
1756 assert_eq!(plan.decisions.len(), 1);
1757 assert_eq!(plan.decisions[0].kind, RepairKind::RebuildTrackA);
1758 }
1759}