Skip to main content

coding_agent_search/analytics/
validate.rs

1//! Analytics validation library.
2//!
3//! Provides deterministic checks for:
4//! - **Track A invariants** — `usage_daily` matches `SUM(message_metrics)`.
5//! - **Track B invariants** — `token_daily_stats` matches `SUM(token_usage)`.
6//! - **Cross-track drift** — Track A vs Track B deltas by day + agent.
7//! - **Performance guardrails** — timing budgets for queries and rebuilds.
8//!
9//! Output is a structured [`ValidationReport`] that serialises to JSON
10//! for `cass analytics validate --json`.
11
12use frankensqlite::Connection;
13use frankensqlite::Row;
14use frankensqlite::compat::{ConnectionExt, RowExt};
15use serde::Serialize;
16use std::collections::BTreeMap;
17
18use super::query::table_exists;
19
20// ---------------------------------------------------------------------------
21// Output types
22// ---------------------------------------------------------------------------
23
24/// Severity level for a single check.
25#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
26#[serde(rename_all = "lowercase")]
27pub enum Severity {
28    Info,
29    Warning,
30    Error,
31}
32
33/// A single validation check result.
34#[derive(Debug, Clone, Serialize)]
35pub struct Check {
36    pub id: String,
37    pub ok: bool,
38    pub severity: Severity,
39    pub details: String,
40    #[serde(skip_serializing_if = "Option::is_none")]
41    pub suggested_action: Option<String>,
42}
43
44/// A cross-track drift entry.
45#[derive(Debug, Clone, Serialize)]
46pub struct DriftEntry {
47    pub day_id: i64,
48    pub agent_slug: String,
49    pub source_id: String,
50    pub track_a_total: i64,
51    pub track_b_total: i64,
52    pub delta: i64,
53    pub delta_pct: f64,
54    pub likely_cause: String,
55}
56
57/// Sampling metadata.
58#[derive(Debug, Clone, Serialize)]
59pub struct SamplingMeta {
60    pub buckets_checked: usize,
61    pub buckets_total: usize,
62    pub mode: String, // "sample" or "deep"
63}
64
65/// Report metadata.
66#[derive(Debug, Clone, Serialize)]
67pub struct ReportMeta {
68    pub elapsed_ms: u64,
69    pub sampling: SamplingMeta,
70    pub path: String,
71}
72
73/// Full validation report.
74#[derive(Debug, Clone, Serialize)]
75pub struct ValidationReport {
76    pub checks: Vec<Check>,
77    pub drift: Vec<DriftEntry>,
78    pub _meta: ReportMeta,
79}
80
81impl ValidationReport {
82    /// True if every check passed.
83    pub fn all_ok(&self) -> bool {
84        self.checks.iter().all(|c| c.ok)
85    }
86
87    /// Count of checks that failed with a given severity.
88    pub fn count_failures(&self, sev: Severity) -> usize {
89        self.checks
90            .iter()
91            .filter(|c| !c.ok && c.severity == sev)
92            .count()
93    }
94
95    /// Produce the JSON value.
96    pub fn to_json(&self) -> serde_json::Value {
97        serde_json::to_value(self).unwrap_or(serde_json::json!({"error": "serialization failed"}))
98    }
99}
100
101/// Safe automatic repair class for a validation failure.
102#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize)]
103#[serde(rename_all = "snake_case")]
104pub enum RepairKind {
105    RebuildTrackA,
106    /// Track B (`token_daily_stats`) rollup mismatch where the
107    /// underlying `token_usage` ledger is intact. Repairable by
108    /// calling `FrankenStorage::rebuild_token_daily_stats()` — it
109    /// replays the ledger into fresh `token_daily_stats` rows
110    /// transactionally. Bead m7xrw.
111    RebuildTrackB,
112    /// Neither Track A nor Track B rebuild can fix this failure —
113    /// e.g. token_usage ledger itself is missing or corrupt, agents
114    /// table is gone, or the validation query failed to execute.
115    /// Only a full canonical replay (ibuuh.29 / z9fse.13 class work)
116    /// would recover this.
117    TrackAllRebuildUnavailable,
118    ManualReview,
119}
120
121/// Grouped repair decision derived from a validation report.
122#[derive(Debug, Clone, Serialize)]
123pub struct RepairDecision {
124    pub kind: RepairKind,
125    pub fixable: bool,
126    pub check_ids: Vec<String>,
127    pub reason: String,
128}
129
130/// Summary of automatic repair opportunities in a validation report.
131#[derive(Debug, Clone, Serialize)]
132pub struct RepairPlan {
133    pub apply_track_a_rebuild: bool,
134    /// Whether any Track B check can be repaired by replaying the
135    /// `token_usage` ledger into fresh `token_daily_stats` via
136    /// `rebuild_token_daily_stats`. Bead m7xrw.
137    pub apply_track_b_rebuild: bool,
138    pub decisions: Vec<RepairDecision>,
139}
140
141/// Build a safe automatic repair plan from a validation report.
142pub fn build_repair_plan(report: &ValidationReport) -> RepairPlan {
143    let mut grouped: BTreeMap<RepairKind, Vec<String>> = BTreeMap::new();
144
145    for check in report.checks.iter().filter(|check| !check.ok) {
146        let kind = classify_repair_kind(check, report);
147        grouped.entry(kind).or_default().push(check.id.clone());
148    }
149
150    let decisions = grouped
151        .into_iter()
152        .map(|(kind, mut check_ids)| {
153            check_ids.sort();
154            RepairDecision {
155                fixable: matches!(kind, RepairKind::RebuildTrackA | RepairKind::RebuildTrackB),
156                reason: repair_reason(kind).into(),
157                kind,
158                check_ids,
159            }
160        })
161        .collect::<Vec<_>>();
162
163    let apply_track_a_rebuild = decisions
164        .iter()
165        .any(|decision| decision.kind == RepairKind::RebuildTrackA);
166    let apply_track_b_rebuild = decisions
167        .iter()
168        .any(|decision| decision.kind == RepairKind::RebuildTrackB);
169
170    RepairPlan {
171        apply_track_a_rebuild,
172        apply_track_b_rebuild,
173        decisions,
174    }
175}
176
177fn classify_repair_kind(check: &Check, report: &ValidationReport) -> RepairKind {
178    if check.id.starts_with("track_a.") {
179        return RepairKind::RebuildTrackA;
180    }
181
182    if check.id == "cross_track.drift" {
183        if report.drift.iter().all(|entry| {
184            entry.likely_cause.starts_with("Track A missing rows")
185                || entry.likely_cause.starts_with("Track B higher")
186        }) {
187            return RepairKind::RebuildTrackA;
188        }
189        return RepairKind::TrackAllRebuildUnavailable;
190    }
191
192    if check.id.starts_with("track_b.") {
193        // Bead m7xrw: Track B failures where the `token_usage` ledger
194        // is intact are repairable by replaying the ledger into fresh
195        // `token_daily_stats` rows via
196        // `FrankenStorage::rebuild_token_daily_stats()`. Only when the
197        // ledger itself is missing/corrupt or the infrastructure
198        // preconditions fail do we fall back to
199        // `TrackAllRebuildUnavailable` (that would need a full
200        // canonical replay from messages, which is the larger z9fse.13
201        // class of work and is NOT what this repair path provides).
202        match check.id.as_str() {
203            // Infrastructure-level failures that rebuild_token_daily_stats
204            // cannot repair on its own: either the ledger is gone, or a
205            // required joined table is missing, or the query itself
206            // couldn't execute.
207            "track_b.tables_exist" | "track_b.agents_table_missing" | "track_b.query_exec" => {
208                RepairKind::TrackAllRebuildUnavailable
209            }
210            // Every other Track B check ("has_data", "grand_total_match",
211            // "tool_calls_match", "non_negative_counters", and any future
212            // same-shape checks) describes a state rebuild_token_daily_stats
213            // will fix by replaying the intact ledger.
214            _ => RepairKind::RebuildTrackB,
215        }
216    } else {
217        RepairKind::ManualReview
218    }
219}
220
221fn repair_reason(kind: RepairKind) -> &'static str {
222    match kind {
223        RepairKind::RebuildTrackA => {
224            "Track A rollups are derivable from raw messages and can be rebuilt safely."
225        }
226        RepairKind::RebuildTrackB => {
227            "Track B rollups are derivable from the intact token_usage ledger and can be rebuilt safely via rebuild_token_daily_stats()."
228        }
229        RepairKind::TrackAllRebuildUnavailable => {
230            "Track B ledger or cross-track precondition is missing; a full canonical replay is required and is not implemented by --fix. Run 'cass doctor check --json' and restore or repair the canonical archive before rebuilding derived assets."
231        }
232        RepairKind::ManualReview => {
233            "This validation failure does not have a proven automatic repair path."
234        }
235    }
236}
237
238// ---------------------------------------------------------------------------
239// Configuration
240// ---------------------------------------------------------------------------
241
/// Tuning knobs for a validation run: how much to sample and how much
/// cross-track drift to tolerate.
#[derive(Debug, Clone)]
pub struct ValidateConfig {
    /// Upper bound on the number of buckets inspected per track.
    /// A value of `0` disables sampling (deep mode: every bucket is checked).
    pub sample_buckets: usize,
    /// Absolute token delta that must be exceeded before drift is reported;
    /// smaller deltas are treated as rounding noise.
    pub drift_abs_threshold: i64,
    /// Percentage delta that must be exceeded before drift is reported.
    pub drift_pct_threshold: f64,
}
253
254impl Default for ValidateConfig {
255    fn default() -> Self {
256        Self {
257            sample_buckets: 20,
258            drift_abs_threshold: 10,
259            drift_pct_threshold: 1.0,
260        }
261    }
262}
263
264impl ValidateConfig {
265    /// Deep-scan mode: check every bucket.
266    pub fn deep() -> Self {
267        Self {
268            sample_buckets: 0,
269            ..Default::default()
270        }
271    }
272}
273
274// ---------------------------------------------------------------------------
275// Main entry point
276// ---------------------------------------------------------------------------
277
278/// Run the full validation suite and return a structured report.
279pub fn run_validation(conn: &Connection, config: &ValidateConfig) -> ValidationReport {
280    let start = std::time::Instant::now();
281    let mut checks = Vec::new();
282    let mut buckets_checked: usize = 0;
283    let mut buckets_total: usize = 0;
284
285    // --- Track A ---
286    let (a_checks, a_checked, a_total) = validate_track_a(conn, config);
287    checks.extend(a_checks);
288    buckets_checked += a_checked;
289    buckets_total += a_total;
290
291    // --- Track B ---
292    let (b_checks, b_checked, b_total) = validate_track_b(conn, config);
293    checks.extend(b_checks);
294    buckets_checked += b_checked;
295    buckets_total += b_total;
296
297    // --- Cross-track drift ---
298    let (d_checks, d_entries) = validate_cross_track_drift(conn, config);
299    checks.extend(d_checks);
300    let drift = d_entries;
301
302    // --- Non-negative counters ---
303    checks.extend(validate_non_negative_counters(conn));
304
305    let elapsed_ms = start.elapsed().as_millis() as u64;
306    let mode = if config.sample_buckets == 0 {
307        "deep"
308    } else {
309        "sample"
310    };
311
312    ValidationReport {
313        checks,
314        drift,
315        _meta: ReportMeta {
316            elapsed_ms,
317            sampling: SamplingMeta {
318                buckets_checked,
319                buckets_total,
320                mode: mode.into(),
321            },
322            path: "rollup".into(),
323        },
324    }
325}
326
327fn query_executes(conn: &Connection, sql: &str) -> Result<(), String> {
328    conn.query_map_collect(sql, &[], |_row: &Row| Ok(()))
329        .map(|_| ())
330        .map_err(|err| err.to_string())
331}
332
333fn query_exec_error_check(id: &str, details: String, suggested_action: &str) -> Check {
334    Check {
335        id: id.into(),
336        ok: false,
337        severity: Severity::Error,
338        details,
339        suggested_action: Some(suggested_action.into()),
340    }
341}
342
343// ---------------------------------------------------------------------------
344// Track A validation
345// ---------------------------------------------------------------------------
346
347/// Validate Track A: `usage_daily` aggregates must match `SUM(message_metrics)`.
348///
349/// Returns `(checks, buckets_checked, buckets_total)`.
350fn validate_track_a(conn: &Connection, config: &ValidateConfig) -> (Vec<Check>, usize, usize) {
351    let mut checks = Vec::new();
352
353    if !table_exists(conn, "usage_daily") || !table_exists(conn, "message_metrics") {
354        checks.push(Check {
355            id: "track_a.tables_exist".into(),
356            ok: false,
357            severity: Severity::Error,
358            details: "Track A tables missing (usage_daily or message_metrics)".into(),
359            suggested_action: Some("Run 'cass analytics rebuild'".into()),
360        });
361        return (checks, 0, 0);
362    }
363
364    checks.push(Check {
365        id: "track_a.tables_exist".into(),
366        ok: true,
367        severity: Severity::Info,
368        details: "Track A tables exist".into(),
369        suggested_action: None,
370    });
371
372    // Get all distinct (day_id, agent_slug, workspace_id, source_id) buckets in usage_daily.
373    let total_buckets: usize = conn
374        .query_row_map("SELECT COUNT(*) FROM usage_daily", &[], |r: &Row| {
375            r.get_typed::<i64>(0).map(|v| v as usize)
376        })
377        .unwrap_or(0);
378
379    let limit_clause = if config.sample_buckets > 0 {
380        format!("LIMIT {}", config.sample_buckets)
381    } else {
382        String::new()
383    };
384
385    // Check content_tokens_est_total invariant.
386    let sql = format!(
387        "SELECT ud.day_id, ud.agent_slug, ud.workspace_id, ud.source_id,
388                ud.content_tokens_est_total,
389                COALESCE(mm.sum_content, 0),
390                ud.message_count,
391                COALESCE(mm.sum_msgs, 0),
392                ud.api_tokens_total,
393                COALESCE(mm.sum_api, 0),
394                ud.api_coverage_message_count,
395                COALESCE(mm.sum_api_coverage, 0)
396         FROM usage_daily ud
397         LEFT JOIN (
398             SELECT day_id, agent_slug, workspace_id, source_id,
399                    SUM(content_tokens_est) AS sum_content,
400                    COUNT(*) AS sum_msgs,
401                    SUM(CASE WHEN api_data_source = 'api'
402                             THEN COALESCE(api_input_tokens, 0)
403                                + COALESCE(api_output_tokens, 0)
404                                + COALESCE(api_cache_read_tokens, 0)
405                                + COALESCE(api_cache_creation_tokens, 0)
406                                + COALESCE(api_thinking_tokens, 0)
407                             ELSE 0 END) AS sum_api,
408                    SUM(CASE WHEN api_data_source = 'api' THEN 1 ELSE 0 END) AS sum_api_coverage
409             FROM message_metrics
410             GROUP BY day_id, agent_slug, workspace_id, source_id
411         ) mm ON ud.day_id = mm.day_id
412              AND ud.agent_slug = mm.agent_slug
413              AND ud.workspace_id = mm.workspace_id
414              AND ud.source_id = mm.source_id
415         ORDER BY ud.day_id DESC
416         {limit_clause}"
417    );
418
419    if total_buckets == 0 {
420        if let Err(err) = query_executes(conn, &sql) {
421            checks.push(query_exec_error_check(
422                "track_a.query_exec",
423                format!("Track A invariant query failed: {err}"),
424                "Run 'cass analytics rebuild --track a' or verify the analytics schema",
425            ));
426            return (checks, 0, 0);
427        }
428
429        checks.push(Check {
430            id: "track_a.has_data".into(),
431            ok: false,
432            severity: Severity::Warning,
433            details: "usage_daily is empty".into(),
434            suggested_action: Some("Run 'cass analytics rebuild'".into()),
435        });
436        return (checks, 0, 0);
437    }
438
439    let mut mismatches_content = 0_usize;
440    let mut mismatches_msg_count = 0_usize;
441    let mut mismatches_api = 0_usize;
442    let mut mismatches_api_cov = 0_usize;
443    let mut checked = 0_usize;
444
445    let rows = match conn.query_map_collect(&sql, &[], |row: &Row| {
446        Ok((
447            row.get_typed::<i64>(0)?,    // day_id
448            row.get_typed::<String>(1)?, // agent_slug
449            row.get_typed::<i64>(4)?,    // ud.content_tokens_est_total
450            row.get_typed::<i64>(5)?,    // mm.sum_content
451            row.get_typed::<i64>(6)?,    // ud.message_count
452            row.get_typed::<i64>(7)?,    // mm.sum_msgs
453            row.get_typed::<i64>(8)?,    // ud.api_tokens_total
454            row.get_typed::<i64>(9)?,    // mm.sum_api
455            row.get_typed::<i64>(10)?,   // ud.api_coverage_message_count
456            row.get_typed::<i64>(11)?,   // mm.sum_api_coverage
457        ))
458    }) {
459        Ok(rows) => rows,
460        Err(err) => {
461            checks.push(query_exec_error_check(
462                "track_a.query_exec",
463                format!("Track A invariant query failed: {err}"),
464                "Run 'cass analytics rebuild --track a' or verify the analytics schema",
465            ));
466            return (checks, 0, total_buckets);
467        }
468    };
469
470    for row in rows {
471        checked += 1;
472        let (
473            _day_id,
474            _agent,
475            ud_content,
476            mm_content,
477            ud_msgs,
478            mm_msgs,
479            ud_api,
480            mm_api,
481            ud_cov,
482            mm_cov,
483        ) = row;
484        if ud_content != mm_content {
485            mismatches_content += 1;
486        }
487        if ud_msgs != mm_msgs {
488            mismatches_msg_count += 1;
489        }
490        if ud_api != mm_api {
491            mismatches_api += 1;
492        }
493        if ud_cov != mm_cov {
494            mismatches_api_cov += 1;
495        }
496    }
497
498    // Content tokens check.
499    checks.push(Check {
500        id: "track_a.content_tokens_match".into(),
501        ok: mismatches_content == 0,
502        severity: if mismatches_content > 0 {
503            Severity::Error
504        } else {
505            Severity::Info
506        },
507        details: format!(
508            "content_tokens_est_total: {mismatches_content}/{checked} buckets mismatched"
509        ),
510        suggested_action: if mismatches_content > 0 {
511            Some("Run 'cass analytics rebuild --track a'".into())
512        } else {
513            None
514        },
515    });
516
517    // Message count check.
518    checks.push(Check {
519        id: "track_a.message_count_match".into(),
520        ok: mismatches_msg_count == 0,
521        severity: if mismatches_msg_count > 0 {
522            Severity::Error
523        } else {
524            Severity::Info
525        },
526        details: format!("message_count: {mismatches_msg_count}/{checked} buckets mismatched"),
527        suggested_action: if mismatches_msg_count > 0 {
528            Some("Run 'cass analytics rebuild --track a'".into())
529        } else {
530            None
531        },
532    });
533
534    // API tokens check.
535    checks.push(Check {
536        id: "track_a.api_tokens_match".into(),
537        ok: mismatches_api == 0,
538        severity: if mismatches_api > 0 {
539            Severity::Error
540        } else {
541            Severity::Info
542        },
543        details: format!("api_tokens_total: {mismatches_api}/{checked} buckets mismatched"),
544        suggested_action: if mismatches_api > 0 {
545            Some("Run 'cass analytics rebuild --track a'".into())
546        } else {
547            None
548        },
549    });
550
551    // API coverage check.
552    checks.push(Check {
553        id: "track_a.api_coverage_match".into(),
554        ok: mismatches_api_cov == 0,
555        severity: if mismatches_api_cov > 0 {
556            Severity::Warning
557        } else {
558            Severity::Info
559        },
560        details: format!(
561            "api_coverage_message_count: {mismatches_api_cov}/{checked} buckets mismatched"
562        ),
563        suggested_action: if mismatches_api_cov > 0 {
564            Some("Run 'cass analytics rebuild --track a'".into())
565        } else {
566            None
567        },
568    });
569
570    (checks, checked, total_buckets)
571}
572
573// ---------------------------------------------------------------------------
574// Track B validation
575// ---------------------------------------------------------------------------
576
577/// Validate Track B: `token_daily_stats` must match `SUM(token_usage)`.
578fn validate_track_b(conn: &Connection, config: &ValidateConfig) -> (Vec<Check>, usize, usize) {
579    let mut checks = Vec::new();
580
581    if !table_exists(conn, "token_daily_stats") || !table_exists(conn, "token_usage") {
582        checks.push(Check {
583            id: "track_b.tables_exist".into(),
584            ok: false,
585            severity: Severity::Error,
586            details: "Track B tables missing (token_daily_stats or token_usage)".into(),
587            suggested_action: Some(
588                "Run 'cass analytics rebuild --track all' (requires z9fse.13)".into(),
589            ),
590        });
591        return (checks, 0, 0);
592    }
593
594    checks.push(Check {
595        id: "track_b.tables_exist".into(),
596        ok: true,
597        severity: Severity::Info,
598        details: "Track B tables exist".into(),
599        suggested_action: None,
600    });
601
602    let total_buckets: usize = conn
603        .query_row_map("SELECT COUNT(*) FROM token_daily_stats", &[], |r: &Row| {
604            r.get_typed::<i64>(0).map(|v| v as usize)
605        })
606        .unwrap_or(0);
607
608    let limit_clause = if config.sample_buckets > 0 {
609        format!("LIMIT {}", config.sample_buckets)
610    } else {
611        String::new()
612    };
613
614    // token_usage uses agent_id (FK) not agent_slug; we need agents table.
615    // If agents table doesn't exist, we fall back to a simpler join.
616    let has_agents_table = table_exists(conn, "agents");
617
618    let sql = if has_agents_table {
619        format!(
620            "SELECT tds.day_id, tds.agent_slug, tds.source_id, tds.model_family,
621                    tds.grand_total_tokens,
622                    COALESCE(tu.sum_total, 0),
623                    tds.total_tool_calls,
624                    COALESCE(tu.sum_tools, 0),
625                    tds.api_call_count,
626                    COALESCE(tu.sum_rows, 0)
627             FROM token_daily_stats tds
628             LEFT JOIN (
629                 SELECT t.day_id,
630                        a.slug AS agent_slug,
631                        t.source_id,
632                        COALESCE(t.model_family, 'unknown') AS model_family,
633                        SUM(COALESCE(t.total_tokens, 0)) AS sum_total,
634                        SUM(t.tool_call_count) AS sum_tools,
635                        COUNT(*) AS sum_rows
636                 FROM token_usage t
637                 JOIN agents a ON a.id = t.agent_id
638                 GROUP BY t.day_id, a.slug, t.source_id, COALESCE(t.model_family, 'unknown')
639             ) tu ON tds.day_id = tu.day_id
640                   AND tds.agent_slug = tu.agent_slug
641                   AND tds.source_id = tu.source_id
642                   AND tds.model_family = tu.model_family
643             ORDER BY tds.day_id DESC
644             {limit_clause}"
645        )
646    } else {
647        // Without agents table, we can't join — skip granular check.
648        checks.push(Check {
649            id: "track_b.agents_table_missing".into(),
650            ok: false,
651            severity: Severity::Warning,
652            details: "agents table not found — cannot validate Track B granular invariants".into(),
653            suggested_action: None,
654        });
655        return (checks, 0, total_buckets);
656    };
657
658    if total_buckets == 0 {
659        if let Err(err) = query_executes(conn, &sql) {
660            checks.push(query_exec_error_check(
661                "track_b.query_exec",
662                format!("Track B invariant query failed: {err}"),
663                "Run 'cass analytics rebuild --track all' or verify the analytics schema",
664            ));
665            return (checks, 0, 0);
666        }
667
668        checks.push(Check {
669            id: "track_b.has_data".into(),
670            ok: false,
671            severity: Severity::Warning,
672            details: "token_daily_stats is empty".into(),
673            suggested_action: Some("Run 'cass analytics rebuild --track all'".into()),
674        });
675        return (checks, 0, 0);
676    }
677
678    let mut mismatches_total = 0_usize;
679    let mut mismatches_tools = 0_usize;
680    let mut checked = 0_usize;
681
682    let rows = match conn.query_map_collect(&sql, &[], |row: &Row| {
683        Ok((
684            row.get_typed::<i64>(4)?, // tds.grand_total_tokens
685            row.get_typed::<i64>(5)?, // tu.sum_total
686            row.get_typed::<i64>(6)?, // tds.total_tool_calls
687            row.get_typed::<i64>(7)?, // tu.sum_tools
688        ))
689    }) {
690        Ok(rows) => rows,
691        Err(err) => {
692            checks.push(query_exec_error_check(
693                "track_b.query_exec",
694                format!("Track B invariant query failed: {err}"),
695                "Run 'cass analytics rebuild --track all' or verify the analytics schema",
696            ));
697            return (checks, 0, total_buckets);
698        }
699    };
700
701    for row in rows {
702        checked += 1;
703        let (tds_total, tu_total, tds_tools, tu_tools) = row;
704        if tds_total != tu_total {
705            mismatches_total += 1;
706        }
707        if tds_tools != tu_tools {
708            mismatches_tools += 1;
709        }
710    }
711
712    checks.push(Check {
713        id: "track_b.grand_total_match".into(),
714        ok: mismatches_total == 0,
715        severity: if mismatches_total > 0 {
716            Severity::Error
717        } else {
718            Severity::Info
719        },
720        details: format!("grand_total_tokens: {mismatches_total}/{checked} buckets mismatched"),
721        suggested_action: if mismatches_total > 0 {
722            Some("Run 'cass analytics rebuild --track all'".into())
723        } else {
724            None
725        },
726    });
727
728    checks.push(Check {
729        id: "track_b.tool_calls_match".into(),
730        ok: mismatches_tools == 0,
731        severity: if mismatches_tools > 0 {
732            Severity::Warning
733        } else {
734            Severity::Info
735        },
736        details: format!("total_tool_calls: {mismatches_tools}/{checked} buckets mismatched"),
737        suggested_action: if mismatches_tools > 0 {
738            Some("Run 'cass analytics rebuild --track all'".into())
739        } else {
740            None
741        },
742    });
743
744    (checks, checked, total_buckets)
745}
746
747// ---------------------------------------------------------------------------
748// Cross-track drift detection
749// ---------------------------------------------------------------------------
750
751/// Detect drift between Track A and Track B at the day + agent + source level.
752fn validate_cross_track_drift(
753    conn: &Connection,
754    config: &ValidateConfig,
755) -> (Vec<Check>, Vec<DriftEntry>) {
756    let mut checks = Vec::new();
757    let mut entries = Vec::new();
758
759    let has_a = table_exists(conn, "usage_daily");
760    let has_b = table_exists(conn, "token_daily_stats");
761
762    if !has_a || !has_b {
763        let missing = if !has_a && !has_b {
764            "both tracks"
765        } else if !has_a {
766            "Track A (usage_daily)"
767        } else {
768            "Track B (token_daily_stats)"
769        };
770        checks.push(Check {
771            id: "cross_track.tables_exist".into(),
772            ok: false,
773            severity: Severity::Warning,
774            details: format!("Cannot compute cross-track drift: {missing} missing"),
775            suggested_action: Some("Run 'cass analytics rebuild --track all'".into()),
776        });
777        return (checks, entries);
778    }
779
780    let mut drift_count = 0_usize;
781    let mut drift_checked = 0_usize;
782    let mut merged = BTreeMap::<(i64, String, String), (i64, i64)>::new();
783
784    let track_a_rows = match conn.query_map_collect(
785        "SELECT day_id, agent_slug, source_id, SUM(api_tokens_total) AS api_total
786         FROM usage_daily
787         GROUP BY day_id, agent_slug, source_id",
788        &[],
789        |row: &Row| {
790            Ok((
791                row.get_typed::<i64>(0)?,
792                row.get_typed::<String>(1)?,
793                row.get_typed::<String>(2)?,
794                row.get_typed::<i64>(3)?,
795            ))
796        },
797    ) {
798        Ok(rows) => rows,
799        Err(err) => {
800            checks.push(Check {
801                id: "cross_track.query_exec".into(),
802                ok: false,
803                severity: Severity::Error,
804                details: format!("Cross-track drift query failed while reading Track A: {err}"),
805                suggested_action: Some(
806                    "Run 'cass analytics rebuild --track all' or verify the analytics schema"
807                        .into(),
808                ),
809            });
810            return (checks, entries);
811        }
812    };
813
814    for (day_id, agent_slug, source_id, total) in track_a_rows {
815        merged
816            .entry((day_id, agent_slug, source_id))
817            .or_insert((0, 0))
818            .0 = total;
819    }
820
821    let track_b_rows = match conn.query_map_collect(
822        "SELECT day_id, agent_slug, source_id, SUM(grand_total_tokens) AS grand_total
823         FROM token_daily_stats
824         GROUP BY day_id, agent_slug, source_id",
825        &[],
826        |row: &Row| {
827            Ok((
828                row.get_typed::<i64>(0)?,
829                row.get_typed::<String>(1)?,
830                row.get_typed::<String>(2)?,
831                row.get_typed::<i64>(3)?,
832            ))
833        },
834    ) {
835        Ok(rows) => rows,
836        Err(err) => {
837            checks.push(Check {
838                id: "cross_track.query_exec".into(),
839                ok: false,
840                severity: Severity::Error,
841                details: format!("Cross-track drift query failed while reading Track B: {err}"),
842                suggested_action: Some(
843                    "Run 'cass analytics rebuild --track all' or verify the analytics schema"
844                        .into(),
845                ),
846            });
847            return (checks, entries);
848        }
849    };
850
851    for (day_id, agent_slug, source_id, total) in track_b_rows {
852        merged
853            .entry((day_id, agent_slug, source_id))
854            .or_insert((0, 0))
855            .1 = total;
856    }
857
858    let mut rows: Vec<_> = merged.into_iter().collect();
859    rows.sort_by(|left, right| {
860        right
861            .0
862            .0
863            .cmp(&left.0.0)
864            .then_with(|| left.0.1.cmp(&right.0.1))
865            .then_with(|| left.0.2.cmp(&right.0.2))
866    });
867    if config.sample_buckets > 0 && rows.len() > config.sample_buckets {
868        rows.truncate(config.sample_buckets);
869    }
870
871    for ((day_id, agent_slug, source_id), (a_total, b_total)) in rows {
872        drift_checked += 1;
873        let delta = a_total.saturating_sub(b_total);
874        let denom = a_total.max(b_total).max(1);
875        let abs_delta = delta.unsigned_abs();
876        let delta_pct = (abs_delta as f64 / denom as f64) * 100.0;
877
878        if abs_delta > config.drift_abs_threshold as u64 && delta_pct > config.drift_pct_threshold {
879            drift_count += 1;
880            let likely_cause = if a_total > 0 && b_total == 0 {
881                "Track B missing rows (rebuild needed or not yet ingested)"
882            } else if b_total > 0 && a_total == 0 {
883                "Track A missing rows (rebuild needed)"
884            } else if a_total > b_total {
885                "Track A higher — Track B may be stale or missing some messages"
886            } else {
887                "Track B higher — Track A may have been rebuilt recently without all data"
888            };
889
890            entries.push(DriftEntry {
891                day_id,
892                agent_slug,
893                source_id,
894                track_a_total: a_total,
895                track_b_total: b_total,
896                delta,
897                delta_pct: (delta_pct * 100.0).round() / 100.0,
898                likely_cause: likely_cause.into(),
899            });
900        }
901    }
902
903    let total_ok = drift_count == 0;
904    checks.push(Check {
905        id: "cross_track.drift".into(),
906        ok: total_ok,
907        severity: if drift_count > 0 {
908            Severity::Warning
909        } else {
910            Severity::Info
911        },
912        details: format!(
913            "Cross-track drift: {drift_count}/{drift_checked} day+agent+source slices drifted"
914        ),
915        suggested_action: if drift_count > 0 {
916            Some("Run 'cass analytics rebuild --track all' to re-sync both tracks".into())
917        } else {
918            None
919        },
920    });
921
922    (checks, entries)
923}
924
925// ---------------------------------------------------------------------------
926// Non-negative counter checks
927// ---------------------------------------------------------------------------
928
929/// Validate that rollup counters are never negative.
930fn validate_non_negative_counters(conn: &Connection) -> Vec<Check> {
931    let mut checks = Vec::new();
932
933    // Track A: usage_daily non-negative.
934    if table_exists(conn, "usage_daily") {
935        let cols = [
936            "message_count",
937            "user_message_count",
938            "assistant_message_count",
939            "tool_call_count",
940            "plan_message_count",
941            "api_coverage_message_count",
942            "content_tokens_est_total",
943            "api_tokens_total",
944        ];
945        let cond = cols
946            .iter()
947            .map(|c| format!("{c} < 0"))
948            .collect::<Vec<_>>()
949            .join(" OR ");
950        let sql = format!("SELECT COUNT(*) FROM usage_daily WHERE {cond}");
951        match conn.query_row_map(&sql, &[], |r: &Row| r.get_typed::<i64>(0)) {
952            Ok(negative_rows) => {
953                checks.push(Check {
954                    id: "track_a.non_negative_counters".into(),
955                    ok: negative_rows == 0,
956                    severity: if negative_rows > 0 {
957                        Severity::Error
958                    } else {
959                        Severity::Info
960                    },
961                    details: format!("usage_daily: {negative_rows} rows with negative counters"),
962                    suggested_action: if negative_rows > 0 {
963                        Some("Run 'cass analytics rebuild --track a'".into())
964                    } else {
965                        None
966                    },
967                });
968            }
969            Err(err) => {
970                checks.push(Check {
971                    id: "track_a.non_negative_counters".into(),
972                    ok: false,
973                    severity: Severity::Error,
974                    details: format!("usage_daily negative-counter query failed: {err}"),
975                    suggested_action: Some(
976                        "Run 'cass analytics rebuild --track a' or verify the analytics schema"
977                            .into(),
978                    ),
979                });
980            }
981        }
982    }
983
984    // Track A: api_coverage_message_count <= message_count.
985    if table_exists(conn, "usage_daily") {
986        match conn.query_row_map(
987            "SELECT COUNT(*) FROM usage_daily WHERE api_coverage_message_count > message_count",
988            &[],
989            |r: &Row| r.get_typed::<i64>(0),
990        ) {
991            Ok(bad) => {
992                checks.push(Check {
993                    id: "track_a.coverage_lte_messages".into(),
994                    ok: bad == 0,
995                    severity: if bad > 0 {
996                        Severity::Warning
997                    } else {
998                        Severity::Info
999                    },
1000                    details: format!(
1001                        "usage_daily: {bad} rows where api_coverage_message_count > message_count"
1002                    ),
1003                    suggested_action: if bad > 0 {
1004                        Some("Run 'cass analytics rebuild --track a'".into())
1005                    } else {
1006                        None
1007                    },
1008                });
1009            }
1010            Err(err) => {
1011                checks.push(Check {
1012                    id: "track_a.coverage_lte_messages".into(),
1013                    ok: false,
1014                    severity: Severity::Error,
1015                    details: format!("usage_daily coverage query failed: {err}"),
1016                    suggested_action: Some(
1017                        "Run 'cass analytics rebuild --track a' or verify the analytics schema"
1018                            .into(),
1019                    ),
1020                });
1021            }
1022        }
1023    }
1024
1025    // Track B: token_daily_stats non-negative.
1026    if table_exists(conn, "token_daily_stats") {
1027        let cols = [
1028            "api_call_count",
1029            "total_input_tokens",
1030            "total_output_tokens",
1031            "grand_total_tokens",
1032            "total_tool_calls",
1033        ];
1034        let cond = cols
1035            .iter()
1036            .map(|c| format!("{c} < 0"))
1037            .collect::<Vec<_>>()
1038            .join(" OR ");
1039        let sql = format!("SELECT COUNT(*) FROM token_daily_stats WHERE {cond}");
1040        match conn.query_row_map(&sql, &[], |r: &Row| r.get_typed::<i64>(0)) {
1041            Ok(negative_rows) => {
1042                checks.push(Check {
1043                    id: "track_b.non_negative_counters".into(),
1044                    ok: negative_rows == 0,
1045                    severity: if negative_rows > 0 {
1046                        Severity::Error
1047                    } else {
1048                        Severity::Info
1049                    },
1050                    details: format!(
1051                        "token_daily_stats: {negative_rows} rows with negative counters"
1052                    ),
1053                    suggested_action: if negative_rows > 0 {
1054                        Some("Run 'cass analytics rebuild --track all'".into())
1055                    } else {
1056                        None
1057                    },
1058                });
1059            }
1060            Err(err) => {
1061                checks.push(Check {
1062                    id: "track_b.non_negative_counters".into(),
1063                    ok: false,
1064                    severity: Severity::Error,
1065                    details: format!("token_daily_stats negative-counter query failed: {err}"),
1066                    suggested_action: Some(
1067                        "Run 'cass analytics rebuild --track all' or verify the analytics schema"
1068                            .into(),
1069                    ),
1070                });
1071            }
1072        }
1073    }
1074
1075    checks
1076}
1077
1078// ---------------------------------------------------------------------------
1079// Performance guardrails
1080// ---------------------------------------------------------------------------
1081
1082/// A single performance measurement.
1083#[derive(Debug, Clone, Serialize)]
1084pub struct PerfMeasurement {
1085    pub id: String,
1086    pub elapsed_ms: u64,
1087    pub budget_ms: u64,
1088    pub within_budget: bool,
1089    #[serde(skip_serializing_if = "Option::is_none")]
1090    pub error: Option<String>,
1091    pub details: String,
1092}
1093
1094/// Run a performance guardrail check: time a basic timeseries query.
1095pub fn perf_query_guardrail(conn: &Connection) -> PerfMeasurement {
1096    let start = std::time::Instant::now();
1097
1098    // Run a basic rollup query — same as query_tokens_timeseries with no filters.
1099    let budget_ms = 500_u64; // 500ms budget for rollup timeseries query
1100    if !table_exists(conn, "usage_daily") {
1101        let elapsed_ms = start.elapsed().as_millis() as u64;
1102        return PerfMeasurement {
1103            id: "perf.query_timeseries".into(),
1104            elapsed_ms,
1105            budget_ms,
1106            within_budget: true,
1107            error: None,
1108            details: "Skipped timeseries rollup query: usage_daily table missing".into(),
1109        };
1110    }
1111
1112    let sql = "SELECT COUNT(DISTINCT day_id) FROM usage_daily";
1113    let row_count = conn.query_row_map(sql, &[], |r: &Row| r.get_typed::<i64>(0));
1114    let elapsed_ms = start.elapsed().as_millis() as u64;
1115
1116    match row_count {
1117        Ok(row_count) => PerfMeasurement {
1118            id: "perf.query_timeseries".into(),
1119            elapsed_ms,
1120            budget_ms,
1121            within_budget: elapsed_ms <= budget_ms,
1122            error: None,
1123            details: format!("Timeseries rollup query: {row_count} day buckets in {elapsed_ms}ms"),
1124        },
1125        Err(err) => PerfMeasurement {
1126            id: "perf.query_timeseries".into(),
1127            elapsed_ms,
1128            budget_ms,
1129            within_budget: false,
1130            error: Some(err.to_string()),
1131            details: format!("Timeseries rollup query failed after {elapsed_ms}ms: {err}"),
1132        },
1133    }
1134}
1135
1136/// Run a performance guardrail for breakdown queries.
1137pub fn perf_breakdown_guardrail(conn: &Connection) -> PerfMeasurement {
1138    let start = std::time::Instant::now();
1139    let budget_ms = 200_u64;
1140
1141    if !table_exists(conn, "usage_daily") {
1142        let elapsed_ms = start.elapsed().as_millis() as u64;
1143        return PerfMeasurement {
1144            id: "perf.query_breakdown".into(),
1145            elapsed_ms,
1146            budget_ms,
1147            within_budget: true,
1148            error: None,
1149            details: "Skipped breakdown query: usage_daily table missing".into(),
1150        };
1151    }
1152
1153    let sql = "SELECT COUNT(DISTINCT agent_slug) FROM usage_daily";
1154    let row_count = conn.query_row_map(sql, &[], |r: &Row| r.get_typed::<i64>(0));
1155    let elapsed_ms = start.elapsed().as_millis() as u64;
1156
1157    match row_count {
1158        Ok(row_count) => PerfMeasurement {
1159            id: "perf.query_breakdown".into(),
1160            elapsed_ms,
1161            budget_ms,
1162            within_budget: elapsed_ms <= budget_ms,
1163            error: None,
1164            details: format!("Breakdown query: {row_count} agent groups in {elapsed_ms}ms"),
1165        },
1166        Err(err) => PerfMeasurement {
1167            id: "perf.query_breakdown".into(),
1168            elapsed_ms,
1169            budget_ms,
1170            within_budget: false,
1171            error: Some(err.to_string()),
1172            details: format!("Breakdown query failed after {elapsed_ms}ms: {err}"),
1173        },
1174    }
1175}
1176
1177// ---------------------------------------------------------------------------
1178// Tests
1179// ---------------------------------------------------------------------------
1180
1181#[cfg(test)]
1182mod tests {
1183    use super::*;
1184
1185    // -- Fixture helpers --
1186
1187    /// Create a minimal Track A fixture (message_metrics + usage_daily).
1188    fn setup_track_a_fixture() -> Connection {
1189        let conn = Connection::open(":memory:").unwrap();
1190        conn.execute_batch(
1191            "CREATE TABLE message_metrics (
1192                message_id INTEGER PRIMARY KEY,
1193                created_at_ms INTEGER NOT NULL,
1194                hour_id INTEGER NOT NULL,
1195                day_id INTEGER NOT NULL,
1196                agent_slug TEXT NOT NULL,
1197                workspace_id INTEGER NOT NULL DEFAULT 0,
1198                source_id TEXT NOT NULL DEFAULT 'local',
1199                role TEXT NOT NULL,
1200                content_chars INTEGER NOT NULL,
1201                content_tokens_est INTEGER NOT NULL,
1202                api_input_tokens INTEGER,
1203                api_output_tokens INTEGER,
1204                api_cache_read_tokens INTEGER,
1205                api_cache_creation_tokens INTEGER,
1206                api_thinking_tokens INTEGER,
1207                api_service_tier TEXT,
1208                api_data_source TEXT NOT NULL DEFAULT 'estimated',
1209                tool_call_count INTEGER NOT NULL DEFAULT 0,
1210                has_tool_calls INTEGER NOT NULL DEFAULT 0,
1211                has_plan INTEGER NOT NULL DEFAULT 0
1212            );
1213            CREATE TABLE usage_daily (
1214                day_id INTEGER NOT NULL,
1215                agent_slug TEXT NOT NULL,
1216                workspace_id INTEGER NOT NULL DEFAULT 0,
1217                source_id TEXT NOT NULL DEFAULT 'local',
1218                message_count INTEGER NOT NULL DEFAULT 0,
1219                user_message_count INTEGER NOT NULL DEFAULT 0,
1220                assistant_message_count INTEGER NOT NULL DEFAULT 0,
1221                tool_call_count INTEGER NOT NULL DEFAULT 0,
1222                plan_message_count INTEGER NOT NULL DEFAULT 0,
1223                api_coverage_message_count INTEGER NOT NULL DEFAULT 0,
1224                content_tokens_est_total INTEGER NOT NULL DEFAULT 0,
1225                content_tokens_est_user INTEGER NOT NULL DEFAULT 0,
1226                content_tokens_est_assistant INTEGER NOT NULL DEFAULT 0,
1227                api_tokens_total INTEGER NOT NULL DEFAULT 0,
1228                api_input_tokens_total INTEGER NOT NULL DEFAULT 0,
1229                api_output_tokens_total INTEGER NOT NULL DEFAULT 0,
1230                api_cache_read_tokens_total INTEGER NOT NULL DEFAULT 0,
1231                api_cache_creation_tokens_total INTEGER NOT NULL DEFAULT 0,
1232                api_thinking_tokens_total INTEGER NOT NULL DEFAULT 0,
1233                last_updated INTEGER NOT NULL DEFAULT 0,
1234                PRIMARY KEY (day_id, agent_slug, workspace_id, source_id)
1235            );",
1236        )
1237        .unwrap();
1238
1239        // Insert consistent data: 3 messages for claude_code on day 20250.
1240        conn.execute_batch(
1241            "INSERT INTO message_metrics VALUES
1242                (1, 1750000000000, 416666, 20254, 'claude_code', 1, 'local', 'user',   400, 100, NULL, NULL, NULL, NULL, NULL, NULL, 'estimated', 0, 0, 0),
1243                (2, 1750000000001, 416666, 20254, 'claude_code', 1, 'local', 'assistant', 800, 200, 500, 300, 50, 20, 10, NULL, 'api', 3, 1, 0),
1244                (3, 1750000000002, 416666, 20254, 'claude_code', 1, 'local', 'user',   600, 150, NULL, NULL, NULL, NULL, NULL, NULL, 'estimated', 0, 0, 0);
1245            INSERT INTO usage_daily VALUES
1246                (20254, 'claude_code', 1, 'local',
1247                 3, 2, 1, 3, 0, 1,
1248                 450, 250, 200,
1249                 880, 500, 300, 50, 20, 10,
1250                 0);",
1251        )
1252        .unwrap();
1253
1254        conn
1255    }
1256
1257    /// Create a consistent fixture with both Track A and Track B.
1258    fn setup_both_tracks_fixture() -> Connection {
1259        let conn = setup_track_a_fixture();
1260
1261        conn.execute_batch(
1262            "CREATE TABLE agents (
1263                id INTEGER PRIMARY KEY,
1264                slug TEXT NOT NULL UNIQUE
1265            );
1266            INSERT INTO agents VALUES (1, 'claude_code');
1267
1268            CREATE TABLE token_usage (
1269                id INTEGER PRIMARY KEY AUTOINCREMENT,
1270                message_id INTEGER NOT NULL,
1271                conversation_id INTEGER NOT NULL,
1272                agent_id INTEGER NOT NULL,
1273                workspace_id INTEGER,
1274                source_id TEXT NOT NULL DEFAULT 'local',
1275                timestamp_ms INTEGER NOT NULL,
1276                day_id INTEGER NOT NULL,
1277                model_name TEXT,
1278                model_family TEXT,
1279                model_tier TEXT,
1280                service_tier TEXT,
1281                provider TEXT,
1282                input_tokens INTEGER,
1283                output_tokens INTEGER,
1284                cache_read_tokens INTEGER,
1285                cache_creation_tokens INTEGER,
1286                thinking_tokens INTEGER,
1287                total_tokens INTEGER,
1288                estimated_cost_usd REAL,
1289                role TEXT NOT NULL,
1290                content_chars INTEGER NOT NULL,
1291                has_tool_calls INTEGER NOT NULL DEFAULT 0,
1292                tool_call_count INTEGER NOT NULL DEFAULT 0,
1293                data_source TEXT NOT NULL DEFAULT 'api',
1294                UNIQUE(message_id)
1295            );
1296
1297            CREATE TABLE token_daily_stats (
1298                day_id INTEGER NOT NULL,
1299                agent_slug TEXT NOT NULL,
1300                source_id TEXT NOT NULL DEFAULT 'all',
1301                model_family TEXT NOT NULL DEFAULT 'all',
1302                api_call_count INTEGER NOT NULL DEFAULT 0,
1303                user_message_count INTEGER NOT NULL DEFAULT 0,
1304                assistant_message_count INTEGER NOT NULL DEFAULT 0,
1305                tool_message_count INTEGER NOT NULL DEFAULT 0,
1306                total_input_tokens INTEGER NOT NULL DEFAULT 0,
1307                total_output_tokens INTEGER NOT NULL DEFAULT 0,
1308                total_cache_read_tokens INTEGER NOT NULL DEFAULT 0,
1309                total_cache_creation_tokens INTEGER NOT NULL DEFAULT 0,
1310                total_thinking_tokens INTEGER NOT NULL DEFAULT 0,
1311                grand_total_tokens INTEGER NOT NULL DEFAULT 0,
1312                total_content_chars INTEGER NOT NULL DEFAULT 0,
1313                total_tool_calls INTEGER NOT NULL DEFAULT 0,
1314                estimated_cost_usd REAL NOT NULL DEFAULT 0.0,
1315                session_count INTEGER NOT NULL DEFAULT 0,
1316                last_updated INTEGER NOT NULL,
1317                PRIMARY KEY (day_id, agent_slug, source_id, model_family)
1318            );
1319
1320            -- Insert matching token_usage for message 2 (the only api-sourced message).
1321            INSERT INTO token_usage VALUES
1322                (1, 2, 100, 1, 1, 'local', 1750000000001, 20254,
1323                 'claude-opus-4', 'opus', 'opus', NULL, 'anthropic',
1324                 500, 300, 50, 20, 10, 880, 0.05, 'assistant', 800, 1, 3, 'api');
1325
1326            -- Token daily stats matching the token_usage.
1327            INSERT INTO token_daily_stats VALUES
1328                (20254, 'claude_code', 'local', 'opus',
1329                 1, 0, 1, 0,
1330                 500, 300, 50, 20, 10, 880,
1331                 800, 3, 0.05, 1, 0);",
1332        )
1333        .unwrap();
1334
1335        conn
1336    }
1337
1338    // -- Tests --
1339
/// A consistent Track A fixture yields at least one `track_a.*` check and
/// every one of them passes.
#[test]
fn consistent_track_a_passes() {
    let db = setup_track_a_fixture();
    let report = run_validation(&db, &ValidateConfig::deep());

    let mut track_a = Vec::new();
    for check in &report.checks {
        if check.id.starts_with("track_a.") {
            track_a.push(check);
        }
    }

    assert!(!track_a.is_empty());
    for check in track_a {
        assert!(check.ok, "Check {} failed: {}", check.id, check.details);
    }
}
1357
/// Corrupting `usage_daily.content_tokens_est_total` must fail the
/// content-token check and produce a suggested action.
#[test]
fn drifted_track_a_detects_mismatch() {
    let db = setup_track_a_fixture();

    // Make the rollup disagree with the per-message ledger.
    db.execute("UPDATE usage_daily SET content_tokens_est_total = 9999 WHERE day_id = 20254")
        .unwrap();

    let report = run_validation(&db, &ValidateConfig::deep());

    let mismatch = report
        .checks
        .iter()
        .find(|check| check.id == "track_a.content_tokens_match")
        .expect("should have content tokens check");
    assert!(!mismatch.ok, "Should detect content tokens mismatch");
    assert!(mismatch.suggested_action.is_some());
}
1377
/// Corrupting `usage_daily.message_count` must fail the message-count check.
#[test]
fn drifted_track_a_message_count_detected() {
    let db = setup_track_a_fixture();

    // Overwrite the rollup's message count with a value the ledger cannot match.
    db.execute("UPDATE usage_daily SET message_count = 999 WHERE day_id = 20254")
        .unwrap();

    let deep = ValidateConfig::deep();
    let report = run_validation(&db, &deep);

    let count_check = report
        .checks
        .iter()
        .find(|check| check.id == "track_a.message_count_match")
        .expect("should have message count check");
    assert!(!count_check.ok);
}
1396
/// With both tracks seeded consistently, every check passes and no drift
/// entries are reported.
#[test]
fn consistent_both_tracks_passes() {
    let db = setup_both_tracks_fixture();
    let report = run_validation(&db, &ValidateConfig::deep());

    // Collected up front so a failure message can list the offending checks.
    let failing: Vec<_> = report.checks.iter().filter(|check| !check.ok).collect();
    assert!(
        report.all_ok(),
        "All checks should pass on consistent fixture: {:#?}",
        failing
    );
    assert!(report.drift.is_empty());
}
1410
/// Emptying Track B while Track A keeps its totals must be reported as
/// cross-track drift.
#[test]
fn cross_track_drift_detected() {
    let db = setup_both_tracks_fixture();

    // Drop the Track B ledger row, then zero the Track B rollup so that
    // Track B stays consistent with itself.
    for statement in [
        "DELETE FROM token_usage WHERE id = 1",
        "UPDATE token_daily_stats SET grand_total_tokens = 0 WHERE day_id = 20254",
    ] {
        db.execute(statement).unwrap();
    }

    let report = run_validation(&db, &ValidateConfig::deep());

    let drift_check = report
        .checks
        .iter()
        .find(|check| check.id == "cross_track.drift")
        .expect("should have cross-track drift check");

    // Track A still carries api_tokens_total = 880 while Track B is now 0.
    assert!(!drift_check.ok, "Should detect cross-track drift");
    assert!(!report.drift.is_empty());

    let first = &report.drift[0];
    assert_eq!(first.track_a_total, 880);
    assert_eq!(first.track_b_total, 0);
}
1436
/// A negative `tool_call_count` in `usage_daily` must fail the
/// non-negative-counters check.
#[test]
fn negative_counters_detected() {
    let db = setup_track_a_fixture();

    // Plant a counter value that can never be legitimate.
    db.execute("UPDATE usage_daily SET tool_call_count = -5 WHERE day_id = 20254")
        .unwrap();

    let report = run_validation(&db, &ValidateConfig::deep());

    let negative_check = report
        .checks
        .iter()
        .find(|check| check.id == "track_a.non_negative_counters")
        .expect("should have non-negative check");
    assert!(!negative_check.ok, "Should detect negative counters");
}
1455
/// An `api_coverage_message_count` larger than `message_count` must fail the
/// coverage check.
#[test]
fn coverage_exceeding_message_count_detected() {
    let db = setup_track_a_fixture();

    // Claim API coverage for far more messages than the row contains.
    let corrupt = "UPDATE usage_daily SET api_coverage_message_count = 999 WHERE day_id = 20254";
    db.execute(corrupt).unwrap();

    let deep = ValidateConfig::deep();
    let report = run_validation(&db, &deep);

    let coverage_check = report
        .checks
        .iter()
        .find(|check| check.id == "track_a.coverage_lte_messages")
        .expect("should have coverage <= messages check");
    assert!(!coverage_check.ok);
}
1476
/// Validating a database with no tables must produce at least one failed,
/// error-severity check.
#[test]
fn empty_database_reports_missing_tables() {
    let db = Connection::open(":memory:").unwrap();
    let report = run_validation(&db, &ValidateConfig::default());

    let has_error = report
        .checks
        .iter()
        .any(|check| !check.ok && check.severity == Severity::Error);
    assert!(has_error);
}
1491
/// A non-zero `sample_buckets` puts the report in `sample` mode and caps the
/// number of buckets checked.
#[test]
fn sample_mode_limits_buckets() {
    let db = setup_track_a_fixture();
    let sampled = ValidateConfig {
        sample_buckets: 1,
        ..ValidateConfig::default()
    };

    let report = run_validation(&db, &sampled);
    let sampling = &report._meta.sampling;

    assert_eq!(sampling.mode, "sample");
    // The fixture holds a single bucket, so the cap itself is not stressed here.
    assert!(sampling.buckets_checked <= 1);
}
1505
/// `ValidateConfig::deep()` is reported as `deep` sampling mode.
#[test]
fn deep_mode_scans_all() {
    let db = setup_track_a_fixture();
    let report = run_validation(&db, &ValidateConfig::deep());

    let mode = &report._meta.sampling.mode;
    assert_eq!(mode, "deep");
}
1514
/// The JSON form of a report exposes `checks`, `drift` and the `_meta` block
/// with the expected value kinds.
#[test]
fn report_json_shape() {
    let db = setup_track_a_fixture();
    let report = run_validation(&db, &ValidateConfig::deep());
    let json = report.to_json();

    for array_key in ["checks", "drift"] {
        assert!(json[array_key].is_array());
    }

    let meta = &json["_meta"];
    assert!(meta["elapsed_ms"].is_number());
    assert!(meta["sampling"]["mode"].is_string());
}
1527
/// On a healthy fixture the timeseries guardrail runs without error and
/// reports its id, budget and summary text.
#[test]
fn perf_query_guardrail_completes() {
    let db = setup_track_a_fixture();
    let measurement = perf_query_guardrail(&db);

    assert!(
        measurement.error.is_none(),
        "timeseries guardrail should complete: {}",
        measurement.details
    );
    assert_eq!(measurement.id, "perf.query_timeseries");
    assert_eq!(measurement.budget_ms, 500);
    assert!(measurement.details.contains("Timeseries rollup query"));
}
1541
/// On a healthy fixture the breakdown guardrail runs without error and
/// reports its id, budget and summary text.
#[test]
fn perf_breakdown_guardrail_completes() {
    let db = setup_track_a_fixture();
    let measurement = perf_breakdown_guardrail(&db);

    assert!(
        measurement.error.is_none(),
        "breakdown guardrail should complete: {}",
        measurement.details
    );
    assert_eq!(measurement.id, "perf.query_breakdown");
    assert_eq!(measurement.budget_ms, 200);
    assert!(measurement.details.contains("Breakdown query"));
}
1555
/// A `usage_daily` table lacking `day_id` makes the timeseries query fail;
/// the guardrail must surface that as an out-of-budget measurement with an error.
#[test]
fn perf_query_guardrail_reports_query_failure() {
    let db = Connection::open(":memory:").unwrap();
    // The table exists, so the guardrail proceeds, but the column it needs is absent.
    db.execute_batch("CREATE TABLE usage_daily (message_count INTEGER);")
        .unwrap();

    let measurement = perf_query_guardrail(&db);

    assert!(!measurement.within_budget);
    assert!(measurement.error.is_some());
    assert!(measurement.details.contains("failed"));
}
1567
/// A `usage_daily` table lacking `agent_slug` makes the breakdown query fail;
/// the guardrail must surface that as an out-of-budget measurement with an error.
#[test]
fn perf_breakdown_guardrail_reports_query_failure() {
    let db = Connection::open(":memory:").unwrap();
    // The table exists, so the guardrail proceeds, but the column it needs is absent.
    db.execute_batch("CREATE TABLE usage_daily (api_tokens_total INTEGER);")
        .unwrap();

    let measurement = perf_breakdown_guardrail(&db);

    assert!(!measurement.within_budget);
    assert!(measurement.error.is_some());
    assert!(measurement.details.contains("failed"));
}
1579
/// Track A tables that exist but lack the expected columns must yield a
/// `track_a.query_exec` error check and zero bucket counts.
#[test]
fn malformed_track_a_schema_reports_query_failure() {
    let db = Connection::open(":memory:").unwrap();
    let stub_schema = "CREATE TABLE message_metrics (day_id INTEGER);
             CREATE TABLE usage_daily (day_id INTEGER);";
    db.execute_batch(stub_schema).unwrap();

    let (checks, buckets_checked, buckets_total) =
        validate_track_a(&db, &ValidateConfig::deep());

    let failure = checks
        .iter()
        .find(|check| check.id == "track_a.query_exec")
        .expect("Track A query failure should be reported");

    assert!(!failure.ok);
    assert_eq!(failure.severity, Severity::Error);
    assert_eq!(buckets_checked, 0);
    assert_eq!(buckets_total, 0);
}
1600
/// Track B tables that exist but lack the expected columns must yield a
/// `track_b.query_exec` error check and zero bucket counts.
#[test]
fn malformed_track_b_schema_reports_query_failure() {
    let db = Connection::open(":memory:").unwrap();
    let stub_schema = "CREATE TABLE agents (id INTEGER PRIMARY KEY, slug TEXT NOT NULL UNIQUE);
             CREATE TABLE token_usage (day_id INTEGER, agent_id INTEGER, source_id TEXT, model_family TEXT);
             CREATE TABLE token_daily_stats (day_id INTEGER, agent_slug TEXT, source_id TEXT, model_family TEXT);";
    db.execute_batch(stub_schema).unwrap();

    let (checks, buckets_checked, buckets_total) =
        validate_track_b(&db, &ValidateConfig::deep());

    let failure = checks
        .iter()
        .find(|check| check.id == "track_b.query_exec")
        .expect("Track B query failure should be reported");

    assert!(!failure.ok);
    assert_eq!(failure.severity, Severity::Error);
    assert_eq!(buckets_checked, 0);
    assert_eq!(buckets_total, 0);
}
1622
/// Rollup tables that exist but lack the expected columns must yield a
/// `cross_track.query_exec` error check and no drift entries.
#[test]
fn malformed_cross_track_schema_reports_query_failure() {
    let db = Connection::open(":memory:").unwrap();
    let stub_schema = "CREATE TABLE usage_daily (day_id INTEGER);
             CREATE TABLE token_daily_stats (day_id INTEGER);";
    db.execute_batch(stub_schema).unwrap();

    let (checks, drift_entries) = validate_cross_track_drift(&db, &ValidateConfig::deep());

    let failure = checks
        .iter()
        .find(|check| check.id == "cross_track.query_exec")
        .expect("Cross-track query failure should be reported");

    assert!(!failure.ok);
    assert_eq!(failure.severity, Severity::Error);
    assert!(drift_entries.is_empty());
}
1642
#[test]
fn repair_plan_marks_track_a_failures_fixable() {
    // Start from the Track A fixture and overwrite one rollup value so that
    // `usage_daily.message_count` for day 20254 is set to 999.
    let db = setup_track_a_fixture();
    db.execute("UPDATE usage_daily SET message_count = 999 WHERE day_id = 20254")
        .unwrap();

    let deep = ValidateConfig::deep();
    let report = run_validation(&db, &deep);
    let plan = build_repair_plan(&report);

    // The plan must contain a Track A rebuild decision.
    let rebuild_a = plan
        .decisions
        .iter()
        .find(|d| d.kind == RepairKind::RebuildTrackA)
        .expect("track a repair decision");

    assert!(plan.apply_track_a_rebuild);
    assert!(rebuild_a.fixable);
    // The decision must cite the message-count mismatch check.
    assert!(
        rebuild_a
            .check_ids
            .iter()
            .any(|id| id == "track_a.message_count_match")
    );
}
1665
#[test]
fn repair_plan_marks_track_b_data_drift_as_rebuild_track_b() {
    // Bead m7xrw: when the `token_usage` ledger is intact and only the
    // Track B rollup has drifted, the repair goes through
    // `rebuild_token_daily_stats()` instead of being deferred as
    // TrackAllRebuildUnavailable. Wiping just the `token_daily_stats`
    // rows (leaving `token_usage` untouched) is the canonical
    // repairable case.
    let db = setup_both_tracks_fixture();
    db.execute("DELETE FROM token_daily_stats").unwrap();

    let deep = ValidateConfig::deep();
    let report = run_validation(&db, &deep);
    let plan = build_repair_plan(&report);

    let track_b = plan
        .decisions
        .iter()
        .find(|d| d.kind == RepairKind::RebuildTrackB)
        .expect("track-b rebuild decision");

    // Only the Track B rebuild should be scheduled.
    assert!(!plan.apply_track_a_rebuild);
    assert!(plan.apply_track_b_rebuild);
    assert!(track_b.fixable);
    // The decision must cite the `track_b.has_data` check.
    assert!(track_b.check_ids.iter().any(|id| id == "track_b.has_data"));
}
1693
#[test]
fn repair_plan_marks_track_b_tables_missing_as_unavailable() {
    // Bead m7xrw: if the `token_usage` ledger itself is gone (as opposed
    // to the rollups merely being empty), `rebuild_token_daily_stats()`
    // has nothing to recover from. The plan then falls through to
    // TrackAllRebuildUnavailable, which tells the operator to run a full
    // canonical replay.
    let db = setup_both_tracks_fixture();
    db.execute("DROP TABLE token_usage").unwrap();

    let deep = ValidateConfig::deep();
    let report = run_validation(&db, &deep);
    let plan = build_repair_plan(&report);

    let not_available = plan
        .decisions
        .iter()
        .find(|d| d.kind == RepairKind::TrackAllRebuildUnavailable)
        .expect("track-all unavailable decision when ledger missing");

    // Neither rebuild may be scheduled, and the decision is not fixable.
    assert!(!plan.apply_track_a_rebuild);
    assert!(!plan.apply_track_b_rebuild);
    assert!(!not_available.fixable);
    // The decision must cite the `track_b.tables_exist` check.
    assert!(
        not_available
            .check_ids
            .iter()
            .any(|id| id == "track_b.tables_exist")
    );
}
1720
#[test]
fn repair_plan_marks_track_a_only_drift_as_fixable() {
    // Hand-built report, with no database involved: a single failing
    // `cross_track.drift` warning plus one drift entry in which Track A
    // reports 0 while Track B reports 123.
    let drift_check = Check {
        id: String::from("cross_track.drift"),
        ok: false,
        severity: Severity::Warning,
        details: String::from("drift found"),
        suggested_action: Some(String::from("Run 'cass analytics rebuild --track all'")),
    };

    let drift_entry = DriftEntry {
        day_id: 20254,
        agent_slug: String::from("codex"),
        source_id: String::from("local"),
        track_a_total: 0,
        track_b_total: 123,
        delta: -123,
        delta_pct: 100.0,
        likely_cause: String::from(
            "Track B higher — Track A may have been rebuilt recently without all data",
        ),
    };

    let sampling = SamplingMeta {
        buckets_checked: 1,
        buckets_total: 1,
        mode: String::from("deep"),
    };

    let report = ValidationReport {
        checks: vec![drift_check],
        drift: vec![drift_entry],
        _meta: ReportMeta {
            elapsed_ms: 1,
            sampling,
            path: "rollup".into(),
        },
    };

    let plan = build_repair_plan(&report);

    // Exactly one decision is expected, and it must be a Track A rebuild.
    assert!(plan.apply_track_a_rebuild);
    assert_eq!(plan.decisions.len(), 1);
    let only_decision = &plan.decisions[0];
    assert_eq!(only_decision.kind, RepairKind::RebuildTrackA);
}
1759}