Skip to main content

coding_agent_search/analytics/
validate.rs

1//! Analytics validation library.
2//!
3//! Provides deterministic checks for:
4//! - **Track A invariants** — `usage_daily` matches `SUM(message_metrics)`.
5//! - **Track B invariants** — `token_daily_stats` matches `SUM(token_usage)`.
6//! - **Cross-track drift** — Track A vs Track B deltas by day + agent.
7//! - **Performance guardrails** — timing budgets for queries and rebuilds.
8//!
9//! Output is a structured [`ValidationReport`] that serialises to JSON
10//! for `cass analytics validate --json`.
11
12use frankensqlite::Connection;
13use frankensqlite::Row;
14use frankensqlite::compat::{ConnectionExt, RowExt};
15use serde::Serialize;
16use std::collections::BTreeMap;
17
18use super::query::{query_breakdown, query_tokens_timeseries, table_exists};
19use super::types::{AnalyticsFilter, Dim, GroupBy, Metric};
20
21// ---------------------------------------------------------------------------
22// Output types
23// ---------------------------------------------------------------------------
24
25/// Severity level for a single check.
26#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
27#[serde(rename_all = "lowercase")]
28pub enum Severity {
29    Info,
30    Warning,
31    Error,
32}
33
34/// A single validation check result.
35#[derive(Debug, Clone, Serialize)]
36pub struct Check {
37    pub id: String,
38    pub ok: bool,
39    pub severity: Severity,
40    pub details: String,
41    #[serde(skip_serializing_if = "Option::is_none")]
42    pub suggested_action: Option<String>,
43}
44
45/// A cross-track drift entry.
46#[derive(Debug, Clone, Serialize)]
47pub struct DriftEntry {
48    pub day_id: i64,
49    pub agent_slug: String,
50    pub source_id: String,
51    pub track_a_total: i64,
52    pub track_b_total: i64,
53    pub delta: i64,
54    pub delta_pct: f64,
55    pub likely_cause: String,
56}
57
58/// Sampling metadata.
59#[derive(Debug, Clone, Serialize)]
60pub struct SamplingMeta {
61    pub buckets_checked: usize,
62    pub buckets_total: usize,
63    pub mode: String, // "sample" or "deep"
64}
65
66/// Report metadata.
67#[derive(Debug, Clone, Serialize)]
68pub struct ReportMeta {
69    pub elapsed_ms: u64,
70    pub sampling: SamplingMeta,
71    pub path: String,
72}
73
74/// Full validation report.
75#[derive(Debug, Clone, Serialize)]
76pub struct ValidationReport {
77    pub checks: Vec<Check>,
78    pub drift: Vec<DriftEntry>,
79    pub _meta: ReportMeta,
80}
81
82impl ValidationReport {
83    /// True if every check passed.
84    pub fn all_ok(&self) -> bool {
85        self.checks.iter().all(|c| c.ok)
86    }
87
88    /// Count of checks that failed with a given severity.
89    pub fn count_failures(&self, sev: Severity) -> usize {
90        self.checks
91            .iter()
92            .filter(|c| !c.ok && c.severity == sev)
93            .count()
94    }
95
96    /// Produce the JSON value.
97    pub fn to_json(&self) -> serde_json::Value {
98        serde_json::to_value(self).unwrap_or(serde_json::json!({"error": "serialization failed"}))
99    }
100}
101
102/// Safe automatic repair class for a validation failure.
103#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize)]
104#[serde(rename_all = "snake_case")]
105pub enum RepairKind {
106    RebuildTrackA,
107    /// Track B (`token_daily_stats`) rollup mismatch where the
108    /// underlying `token_usage` ledger is intact. Repairable by
109    /// calling `FrankenStorage::rebuild_token_daily_stats()` — it
110    /// replays the ledger into fresh `token_daily_stats` rows
111    /// transactionally. Bead m7xrw.
112    RebuildTrackB,
113    /// Neither Track A nor Track B rebuild can fix this failure —
114    /// e.g. token_usage ledger itself is missing or corrupt, agents
115    /// table is gone, or the validation query failed to execute.
116    /// Only a full canonical replay (ibuuh.29 / z9fse.13 class work)
117    /// would recover this.
118    TrackAllRebuildUnavailable,
119    ManualReview,
120}
121
122/// Grouped repair decision derived from a validation report.
123#[derive(Debug, Clone, Serialize)]
124pub struct RepairDecision {
125    pub kind: RepairKind,
126    pub fixable: bool,
127    pub check_ids: Vec<String>,
128    pub reason: String,
129}
130
131/// Summary of automatic repair opportunities in a validation report.
132#[derive(Debug, Clone, Serialize)]
133pub struct RepairPlan {
134    pub apply_track_a_rebuild: bool,
135    /// Whether any Track B check can be repaired by replaying the
136    /// `token_usage` ledger into fresh `token_daily_stats` via
137    /// `rebuild_token_daily_stats`. Bead m7xrw.
138    pub apply_track_b_rebuild: bool,
139    pub decisions: Vec<RepairDecision>,
140}
141
142/// Build a safe automatic repair plan from a validation report.
143pub fn build_repair_plan(report: &ValidationReport) -> RepairPlan {
144    let mut grouped: BTreeMap<RepairKind, Vec<String>> = BTreeMap::new();
145
146    for check in report.checks.iter().filter(|check| !check.ok) {
147        let kind = classify_repair_kind(check, report);
148        grouped.entry(kind).or_default().push(check.id.clone());
149    }
150
151    let decisions = grouped
152        .into_iter()
153        .map(|(kind, mut check_ids)| {
154            check_ids.sort();
155            RepairDecision {
156                fixable: matches!(kind, RepairKind::RebuildTrackA | RepairKind::RebuildTrackB),
157                reason: repair_reason(kind).into(),
158                kind,
159                check_ids,
160            }
161        })
162        .collect::<Vec<_>>();
163
164    let apply_track_a_rebuild = decisions
165        .iter()
166        .any(|decision| decision.kind == RepairKind::RebuildTrackA);
167    let apply_track_b_rebuild = decisions
168        .iter()
169        .any(|decision| decision.kind == RepairKind::RebuildTrackB);
170
171    RepairPlan {
172        apply_track_a_rebuild,
173        apply_track_b_rebuild,
174        decisions,
175    }
176}
177
178fn classify_repair_kind(check: &Check, report: &ValidationReport) -> RepairKind {
179    if check.id.starts_with("track_a.") {
180        return RepairKind::RebuildTrackA;
181    }
182
183    if check.id == "cross_track.drift" {
184        if report.drift.iter().all(|entry| {
185            entry.likely_cause.starts_with("Track A missing rows")
186                || entry.likely_cause.starts_with("Track B higher")
187        }) {
188            return RepairKind::RebuildTrackA;
189        }
190        return RepairKind::TrackAllRebuildUnavailable;
191    }
192
193    if check.id.starts_with("track_b.") {
194        // Bead m7xrw: Track B failures where the `token_usage` ledger
195        // is intact are repairable by replaying the ledger into fresh
196        // `token_daily_stats` rows via
197        // `FrankenStorage::rebuild_token_daily_stats()`. Only when the
198        // ledger itself is missing/corrupt or the infrastructure
199        // preconditions fail do we fall back to
200        // `TrackAllRebuildUnavailable` (that would need a full
201        // canonical replay from messages, which is the larger z9fse.13
202        // class of work and is NOT what this repair path provides).
203        match check.id.as_str() {
204            // Infrastructure-level failures that rebuild_token_daily_stats
205            // cannot repair on its own: either the ledger is gone, or a
206            // required joined table is missing, or the query itself
207            // couldn't execute.
208            "track_b.tables_exist" | "track_b.agents_table_missing" | "track_b.query_exec" => {
209                RepairKind::TrackAllRebuildUnavailable
210            }
211            // Every other Track B check ("has_data", "grand_total_match",
212            // "tool_calls_match", "non_negative_counters", and any future
213            // same-shape checks) describes a state rebuild_token_daily_stats
214            // will fix by replaying the intact ledger.
215            _ => RepairKind::RebuildTrackB,
216        }
217    } else {
218        RepairKind::ManualReview
219    }
220}
221
222fn repair_reason(kind: RepairKind) -> &'static str {
223    match kind {
224        RepairKind::RebuildTrackA => {
225            "Track A rollups are derivable from raw messages and can be rebuilt safely."
226        }
227        RepairKind::RebuildTrackB => {
228            "Track B rollups are derivable from the intact token_usage ledger and can be rebuilt safely via rebuild_token_daily_stats()."
229        }
230        RepairKind::TrackAllRebuildUnavailable => {
231            "Track B ledger or cross-track precondition is missing; a full canonical replay is required and is not implemented by --fix. Run 'cass doctor check --json' and restore or repair the canonical archive before rebuilding derived assets."
232        }
233        RepairKind::ManualReview => {
234            "This validation failure does not have a proven automatic repair path."
235        }
236    }
237}
238
239// ---------------------------------------------------------------------------
240// Configuration
241// ---------------------------------------------------------------------------
242
/// Tuning knobs for a validation run: sampling depth and drift thresholds.
#[derive(Debug, Clone)]
pub struct ValidateConfig {
    /// Upper bound on the number of buckets inspected per track.
    /// A value of `0` disables the limit (deep mode, full scan).
    pub sample_buckets: usize,
    /// Absolute token delta that must be exceeded before a bucket can be
    /// reported as drift; smaller deltas are treated as rounding noise.
    pub drift_abs_threshold: i64,
    /// Percentage delta that must also be exceeded for drift to be flagged.
    pub drift_pct_threshold: f64,
}

impl Default for ValidateConfig {
    /// Sampled mode: 20 buckets per track, drift flagged above 10 tokens
    /// and 1.0 percent.
    fn default() -> Self {
        ValidateConfig {
            sample_buckets: 20,
            drift_abs_threshold: 10,
            drift_pct_threshold: 1.0,
        }
    }
}

impl ValidateConfig {
    /// Deep-scan configuration: default thresholds, but every bucket is
    /// checked (`sample_buckets == 0`).
    pub fn deep() -> Self {
        let defaults = Self::default();
        Self {
            sample_buckets: 0,
            drift_abs_threshold: defaults.drift_abs_threshold,
            drift_pct_threshold: defaults.drift_pct_threshold,
        }
    }
}
274
275// ---------------------------------------------------------------------------
276// Main entry point
277// ---------------------------------------------------------------------------
278
279/// Run the full validation suite and return a structured report.
280pub fn run_validation(conn: &Connection, config: &ValidateConfig) -> ValidationReport {
281    let start = std::time::Instant::now();
282    let mut checks = Vec::new();
283    let mut buckets_checked: usize = 0;
284    let mut buckets_total: usize = 0;
285
286    // --- Track A ---
287    let (a_checks, a_checked, a_total) = validate_track_a(conn, config);
288    checks.extend(a_checks);
289    buckets_checked += a_checked;
290    buckets_total += a_total;
291
292    // --- Track B ---
293    let (b_checks, b_checked, b_total) = validate_track_b(conn, config);
294    checks.extend(b_checks);
295    buckets_checked += b_checked;
296    buckets_total += b_total;
297
298    // --- Cross-track drift ---
299    let (d_checks, d_entries) = validate_cross_track_drift(conn, config);
300    checks.extend(d_checks);
301    let drift = d_entries;
302
303    // --- Non-negative counters ---
304    checks.extend(validate_non_negative_counters(conn));
305
306    let elapsed_ms = start.elapsed().as_millis() as u64;
307    let mode = if config.sample_buckets == 0 {
308        "deep"
309    } else {
310        "sample"
311    };
312
313    ValidationReport {
314        checks,
315        drift,
316        _meta: ReportMeta {
317            elapsed_ms,
318            sampling: SamplingMeta {
319                buckets_checked,
320                buckets_total,
321                mode: mode.into(),
322            },
323            path: "rollup".into(),
324        },
325    }
326}
327
328fn query_executes(conn: &Connection, sql: &str) -> Result<(), String> {
329    conn.query_map_collect(sql, &[], |_row: &Row| Ok(()))
330        .map(|_| ())
331        .map_err(|err| err.to_string())
332}
333
334fn query_exec_error_check(id: &str, details: String, suggested_action: &str) -> Check {
335    Check {
336        id: id.into(),
337        ok: false,
338        severity: Severity::Error,
339        details,
340        suggested_action: Some(suggested_action.into()),
341    }
342}
343
344// ---------------------------------------------------------------------------
345// Track A validation
346// ---------------------------------------------------------------------------
347
348/// Validate Track A: `usage_daily` aggregates must match `SUM(message_metrics)`.
349///
350/// Returns `(checks, buckets_checked, buckets_total)`.
351fn validate_track_a(conn: &Connection, config: &ValidateConfig) -> (Vec<Check>, usize, usize) {
352    let mut checks = Vec::new();
353
354    if !table_exists(conn, "usage_daily") || !table_exists(conn, "message_metrics") {
355        checks.push(Check {
356            id: "track_a.tables_exist".into(),
357            ok: false,
358            severity: Severity::Error,
359            details: "Track A tables missing (usage_daily or message_metrics)".into(),
360            suggested_action: Some("Run 'cass analytics rebuild'".into()),
361        });
362        return (checks, 0, 0);
363    }
364
365    checks.push(Check {
366        id: "track_a.tables_exist".into(),
367        ok: true,
368        severity: Severity::Info,
369        details: "Track A tables exist".into(),
370        suggested_action: None,
371    });
372
373    // Get all distinct (day_id, agent_slug, workspace_id, source_id) buckets in usage_daily.
374    let total_buckets: usize = conn
375        .query_row_map("SELECT COUNT(*) FROM usage_daily", &[], |r: &Row| {
376            r.get_typed::<i64>(0).map(|v| v as usize)
377        })
378        .unwrap_or(0);
379
380    let limit_clause = if config.sample_buckets > 0 {
381        format!("LIMIT {}", config.sample_buckets)
382    } else {
383        String::new()
384    };
385
386    // Check content_tokens_est_total invariant.
387    let sql = format!(
388        "SELECT ud.day_id, ud.agent_slug, ud.workspace_id, ud.source_id,
389                ud.content_tokens_est_total,
390                COALESCE(mm.sum_content, 0),
391                ud.message_count,
392                COALESCE(mm.sum_msgs, 0),
393                ud.api_tokens_total,
394                COALESCE(mm.sum_api, 0),
395                ud.api_coverage_message_count,
396                COALESCE(mm.sum_api_coverage, 0)
397         FROM usage_daily ud
398         LEFT JOIN (
399             SELECT day_id, agent_slug, workspace_id, source_id,
400                    SUM(content_tokens_est) AS sum_content,
401                    COUNT(*) AS sum_msgs,
402                    SUM(CASE WHEN api_data_source = 'api'
403                             THEN COALESCE(api_input_tokens, 0)
404                                + COALESCE(api_output_tokens, 0)
405                                + COALESCE(api_cache_read_tokens, 0)
406                                + COALESCE(api_cache_creation_tokens, 0)
407                                + COALESCE(api_thinking_tokens, 0)
408                             ELSE 0 END) AS sum_api,
409                    SUM(CASE WHEN api_data_source = 'api' THEN 1 ELSE 0 END) AS sum_api_coverage
410             FROM message_metrics
411             GROUP BY day_id, agent_slug, workspace_id, source_id
412         ) mm ON ud.day_id = mm.day_id
413              AND ud.agent_slug = mm.agent_slug
414              AND ud.workspace_id = mm.workspace_id
415              AND ud.source_id = mm.source_id
416         ORDER BY ud.day_id DESC
417         {limit_clause}"
418    );
419
420    if total_buckets == 0 {
421        if let Err(err) = query_executes(conn, &sql) {
422            checks.push(query_exec_error_check(
423                "track_a.query_exec",
424                format!("Track A invariant query failed: {err}"),
425                "Run 'cass analytics rebuild --track a' or verify the analytics schema",
426            ));
427            return (checks, 0, 0);
428        }
429
430        checks.push(Check {
431            id: "track_a.has_data".into(),
432            ok: false,
433            severity: Severity::Warning,
434            details: "usage_daily is empty".into(),
435            suggested_action: Some("Run 'cass analytics rebuild'".into()),
436        });
437        return (checks, 0, 0);
438    }
439
440    let mut mismatches_content = 0_usize;
441    let mut mismatches_msg_count = 0_usize;
442    let mut mismatches_api = 0_usize;
443    let mut mismatches_api_cov = 0_usize;
444    let mut checked = 0_usize;
445
446    let rows = match conn.query_map_collect(&sql, &[], |row: &Row| {
447        Ok((
448            row.get_typed::<i64>(0)?,    // day_id
449            row.get_typed::<String>(1)?, // agent_slug
450            row.get_typed::<i64>(4)?,    // ud.content_tokens_est_total
451            row.get_typed::<i64>(5)?,    // mm.sum_content
452            row.get_typed::<i64>(6)?,    // ud.message_count
453            row.get_typed::<i64>(7)?,    // mm.sum_msgs
454            row.get_typed::<i64>(8)?,    // ud.api_tokens_total
455            row.get_typed::<i64>(9)?,    // mm.sum_api
456            row.get_typed::<i64>(10)?,   // ud.api_coverage_message_count
457            row.get_typed::<i64>(11)?,   // mm.sum_api_coverage
458        ))
459    }) {
460        Ok(rows) => rows,
461        Err(err) => {
462            checks.push(query_exec_error_check(
463                "track_a.query_exec",
464                format!("Track A invariant query failed: {err}"),
465                "Run 'cass analytics rebuild --track a' or verify the analytics schema",
466            ));
467            return (checks, 0, total_buckets);
468        }
469    };
470
471    for row in rows {
472        checked += 1;
473        let (
474            _day_id,
475            _agent,
476            ud_content,
477            mm_content,
478            ud_msgs,
479            mm_msgs,
480            ud_api,
481            mm_api,
482            ud_cov,
483            mm_cov,
484        ) = row;
485        if ud_content != mm_content {
486            mismatches_content += 1;
487        }
488        if ud_msgs != mm_msgs {
489            mismatches_msg_count += 1;
490        }
491        if ud_api != mm_api {
492            mismatches_api += 1;
493        }
494        if ud_cov != mm_cov {
495            mismatches_api_cov += 1;
496        }
497    }
498
499    // Content tokens check.
500    checks.push(Check {
501        id: "track_a.content_tokens_match".into(),
502        ok: mismatches_content == 0,
503        severity: if mismatches_content > 0 {
504            Severity::Error
505        } else {
506            Severity::Info
507        },
508        details: format!(
509            "content_tokens_est_total: {mismatches_content}/{checked} buckets mismatched"
510        ),
511        suggested_action: if mismatches_content > 0 {
512            Some("Run 'cass analytics rebuild --track a'".into())
513        } else {
514            None
515        },
516    });
517
518    // Message count check.
519    checks.push(Check {
520        id: "track_a.message_count_match".into(),
521        ok: mismatches_msg_count == 0,
522        severity: if mismatches_msg_count > 0 {
523            Severity::Error
524        } else {
525            Severity::Info
526        },
527        details: format!("message_count: {mismatches_msg_count}/{checked} buckets mismatched"),
528        suggested_action: if mismatches_msg_count > 0 {
529            Some("Run 'cass analytics rebuild --track a'".into())
530        } else {
531            None
532        },
533    });
534
535    // API tokens check.
536    checks.push(Check {
537        id: "track_a.api_tokens_match".into(),
538        ok: mismatches_api == 0,
539        severity: if mismatches_api > 0 {
540            Severity::Error
541        } else {
542            Severity::Info
543        },
544        details: format!("api_tokens_total: {mismatches_api}/{checked} buckets mismatched"),
545        suggested_action: if mismatches_api > 0 {
546            Some("Run 'cass analytics rebuild --track a'".into())
547        } else {
548            None
549        },
550    });
551
552    // API coverage check.
553    checks.push(Check {
554        id: "track_a.api_coverage_match".into(),
555        ok: mismatches_api_cov == 0,
556        severity: if mismatches_api_cov > 0 {
557            Severity::Warning
558        } else {
559            Severity::Info
560        },
561        details: format!(
562            "api_coverage_message_count: {mismatches_api_cov}/{checked} buckets mismatched"
563        ),
564        suggested_action: if mismatches_api_cov > 0 {
565            Some("Run 'cass analytics rebuild --track a'".into())
566        } else {
567            None
568        },
569    });
570
571    (checks, checked, total_buckets)
572}
573
574// ---------------------------------------------------------------------------
575// Track B validation
576// ---------------------------------------------------------------------------
577
578/// Validate Track B: `token_daily_stats` must match `SUM(token_usage)`.
579fn validate_track_b(conn: &Connection, config: &ValidateConfig) -> (Vec<Check>, usize, usize) {
580    let mut checks = Vec::new();
581
582    if !table_exists(conn, "token_daily_stats") || !table_exists(conn, "token_usage") {
583        checks.push(Check {
584            id: "track_b.tables_exist".into(),
585            ok: false,
586            severity: Severity::Error,
587            details: "Track B tables missing (token_daily_stats or token_usage)".into(),
588            suggested_action: Some(
589                "Run 'cass analytics rebuild --track all' (requires z9fse.13)".into(),
590            ),
591        });
592        return (checks, 0, 0);
593    }
594
595    checks.push(Check {
596        id: "track_b.tables_exist".into(),
597        ok: true,
598        severity: Severity::Info,
599        details: "Track B tables exist".into(),
600        suggested_action: None,
601    });
602
603    let total_buckets: usize = conn
604        .query_row_map("SELECT COUNT(*) FROM token_daily_stats", &[], |r: &Row| {
605            r.get_typed::<i64>(0).map(|v| v as usize)
606        })
607        .unwrap_or(0);
608
609    let limit_clause = if config.sample_buckets > 0 {
610        format!("LIMIT {}", config.sample_buckets)
611    } else {
612        String::new()
613    };
614
615    // token_usage uses agent_id (FK) not agent_slug; we need agents table.
616    // If agents table doesn't exist, we fall back to a simpler join.
617    let has_agents_table = table_exists(conn, "agents");
618
619    let sql = if has_agents_table {
620        format!(
621            "SELECT tds.day_id, tds.agent_slug, tds.source_id, tds.model_family,
622                    tds.grand_total_tokens,
623                    COALESCE(tu.sum_total, 0),
624                    tds.total_tool_calls,
625                    COALESCE(tu.sum_tools, 0),
626                    tds.api_call_count,
627                    COALESCE(tu.sum_rows, 0)
628             FROM token_daily_stats tds
629             LEFT JOIN (
630                 SELECT t.day_id,
631                        a.slug AS agent_slug,
632                        t.source_id,
633                        COALESCE(t.model_family, 'unknown') AS model_family,
634                        SUM(COALESCE(t.total_tokens, 0)) AS sum_total,
635                        SUM(t.tool_call_count) AS sum_tools,
636                        COUNT(*) AS sum_rows
637                 FROM token_usage t
638                 JOIN agents a ON a.id = t.agent_id
639                 GROUP BY t.day_id, a.slug, t.source_id, COALESCE(t.model_family, 'unknown')
640             ) tu ON tds.day_id = tu.day_id
641                   AND tds.agent_slug = tu.agent_slug
642                   AND tds.source_id = tu.source_id
643                   AND tds.model_family = tu.model_family
644             ORDER BY tds.day_id DESC
645             {limit_clause}"
646        )
647    } else {
648        // Without agents table, we can't join — skip granular check.
649        checks.push(Check {
650            id: "track_b.agents_table_missing".into(),
651            ok: false,
652            severity: Severity::Warning,
653            details: "agents table not found — cannot validate Track B granular invariants".into(),
654            suggested_action: None,
655        });
656        return (checks, 0, total_buckets);
657    };
658
659    if total_buckets == 0 {
660        if let Err(err) = query_executes(conn, &sql) {
661            checks.push(query_exec_error_check(
662                "track_b.query_exec",
663                format!("Track B invariant query failed: {err}"),
664                "Run 'cass analytics rebuild --track all' or verify the analytics schema",
665            ));
666            return (checks, 0, 0);
667        }
668
669        checks.push(Check {
670            id: "track_b.has_data".into(),
671            ok: false,
672            severity: Severity::Warning,
673            details: "token_daily_stats is empty".into(),
674            suggested_action: Some("Run 'cass analytics rebuild --track all'".into()),
675        });
676        return (checks, 0, 0);
677    }
678
679    let mut mismatches_total = 0_usize;
680    let mut mismatches_tools = 0_usize;
681    let mut checked = 0_usize;
682
683    let rows = match conn.query_map_collect(&sql, &[], |row: &Row| {
684        Ok((
685            row.get_typed::<i64>(4)?, // tds.grand_total_tokens
686            row.get_typed::<i64>(5)?, // tu.sum_total
687            row.get_typed::<i64>(6)?, // tds.total_tool_calls
688            row.get_typed::<i64>(7)?, // tu.sum_tools
689        ))
690    }) {
691        Ok(rows) => rows,
692        Err(err) => {
693            checks.push(query_exec_error_check(
694                "track_b.query_exec",
695                format!("Track B invariant query failed: {err}"),
696                "Run 'cass analytics rebuild --track all' or verify the analytics schema",
697            ));
698            return (checks, 0, total_buckets);
699        }
700    };
701
702    for row in rows {
703        checked += 1;
704        let (tds_total, tu_total, tds_tools, tu_tools) = row;
705        if tds_total != tu_total {
706            mismatches_total += 1;
707        }
708        if tds_tools != tu_tools {
709            mismatches_tools += 1;
710        }
711    }
712
713    checks.push(Check {
714        id: "track_b.grand_total_match".into(),
715        ok: mismatches_total == 0,
716        severity: if mismatches_total > 0 {
717            Severity::Error
718        } else {
719            Severity::Info
720        },
721        details: format!("grand_total_tokens: {mismatches_total}/{checked} buckets mismatched"),
722        suggested_action: if mismatches_total > 0 {
723            Some("Run 'cass analytics rebuild --track all'".into())
724        } else {
725            None
726        },
727    });
728
729    checks.push(Check {
730        id: "track_b.tool_calls_match".into(),
731        ok: mismatches_tools == 0,
732        severity: if mismatches_tools > 0 {
733            Severity::Warning
734        } else {
735            Severity::Info
736        },
737        details: format!("total_tool_calls: {mismatches_tools}/{checked} buckets mismatched"),
738        suggested_action: if mismatches_tools > 0 {
739            Some("Run 'cass analytics rebuild --track all'".into())
740        } else {
741            None
742        },
743    });
744
745    (checks, checked, total_buckets)
746}
747
748// ---------------------------------------------------------------------------
749// Cross-track drift detection
750// ---------------------------------------------------------------------------
751
752/// Detect drift between Track A and Track B at the day + agent + source level.
753fn validate_cross_track_drift(
754    conn: &Connection,
755    config: &ValidateConfig,
756) -> (Vec<Check>, Vec<DriftEntry>) {
757    let mut checks = Vec::new();
758    let mut entries = Vec::new();
759
760    let has_a = table_exists(conn, "usage_daily");
761    let has_b = table_exists(conn, "token_daily_stats");
762
763    if !has_a || !has_b {
764        let missing = if !has_a && !has_b {
765            "both tracks"
766        } else if !has_a {
767            "Track A (usage_daily)"
768        } else {
769            "Track B (token_daily_stats)"
770        };
771        checks.push(Check {
772            id: "cross_track.tables_exist".into(),
773            ok: false,
774            severity: Severity::Warning,
775            details: format!("Cannot compute cross-track drift: {missing} missing"),
776            suggested_action: Some("Run 'cass analytics rebuild --track all'".into()),
777        });
778        return (checks, entries);
779    }
780
781    let mut drift_count = 0_usize;
782    let mut drift_checked = 0_usize;
783    let mut merged = BTreeMap::<(i64, String, String), (i64, i64)>::new();
784
785    let track_a_rows = match conn.query_map_collect(
786        "SELECT day_id, agent_slug, source_id, SUM(api_tokens_total) AS api_total
787         FROM usage_daily
788         GROUP BY day_id, agent_slug, source_id",
789        &[],
790        |row: &Row| {
791            Ok((
792                row.get_typed::<i64>(0)?,
793                row.get_typed::<String>(1)?,
794                row.get_typed::<String>(2)?,
795                row.get_typed::<i64>(3)?,
796            ))
797        },
798    ) {
799        Ok(rows) => rows,
800        Err(err) => {
801            checks.push(Check {
802                id: "cross_track.query_exec".into(),
803                ok: false,
804                severity: Severity::Error,
805                details: format!("Cross-track drift query failed while reading Track A: {err}"),
806                suggested_action: Some(
807                    "Run 'cass analytics rebuild --track all' or verify the analytics schema"
808                        .into(),
809                ),
810            });
811            return (checks, entries);
812        }
813    };
814
815    for (day_id, agent_slug, source_id, total) in track_a_rows {
816        merged
817            .entry((day_id, agent_slug, source_id))
818            .or_insert((0, 0))
819            .0 = total;
820    }
821
822    let track_b_rows = match conn.query_map_collect(
823        "SELECT day_id, agent_slug, source_id, SUM(grand_total_tokens) AS grand_total
824         FROM token_daily_stats
825         GROUP BY day_id, agent_slug, source_id",
826        &[],
827        |row: &Row| {
828            Ok((
829                row.get_typed::<i64>(0)?,
830                row.get_typed::<String>(1)?,
831                row.get_typed::<String>(2)?,
832                row.get_typed::<i64>(3)?,
833            ))
834        },
835    ) {
836        Ok(rows) => rows,
837        Err(err) => {
838            checks.push(Check {
839                id: "cross_track.query_exec".into(),
840                ok: false,
841                severity: Severity::Error,
842                details: format!("Cross-track drift query failed while reading Track B: {err}"),
843                suggested_action: Some(
844                    "Run 'cass analytics rebuild --track all' or verify the analytics schema"
845                        .into(),
846                ),
847            });
848            return (checks, entries);
849        }
850    };
851
852    for (day_id, agent_slug, source_id, total) in track_b_rows {
853        merged
854            .entry((day_id, agent_slug, source_id))
855            .or_insert((0, 0))
856            .1 = total;
857    }
858
859    let mut rows: Vec<_> = merged.into_iter().collect();
860    rows.sort_by(|left, right| {
861        right
862            .0
863            .0
864            .cmp(&left.0.0)
865            .then_with(|| left.0.1.cmp(&right.0.1))
866            .then_with(|| left.0.2.cmp(&right.0.2))
867    });
868    if config.sample_buckets > 0 && rows.len() > config.sample_buckets {
869        rows.truncate(config.sample_buckets);
870    }
871
872    for ((day_id, agent_slug, source_id), (a_total, b_total)) in rows {
873        drift_checked += 1;
874        let delta = a_total.saturating_sub(b_total);
875        let denom = a_total.max(b_total).max(1);
876        let abs_delta = delta.unsigned_abs();
877        let delta_pct = (abs_delta as f64 / denom as f64) * 100.0;
878
879        if abs_delta > config.drift_abs_threshold as u64 && delta_pct > config.drift_pct_threshold {
880            drift_count += 1;
881            let likely_cause = if a_total > 0 && b_total == 0 {
882                "Track B missing rows (rebuild needed or not yet ingested)"
883            } else if b_total > 0 && a_total == 0 {
884                "Track A missing rows (rebuild needed)"
885            } else if a_total > b_total {
886                "Track A higher — Track B may be stale or missing some messages"
887            } else {
888                "Track B higher — Track A may have been rebuilt recently without all data"
889            };
890
891            entries.push(DriftEntry {
892                day_id,
893                agent_slug,
894                source_id,
895                track_a_total: a_total,
896                track_b_total: b_total,
897                delta,
898                delta_pct: (delta_pct * 100.0).round() / 100.0,
899                likely_cause: likely_cause.into(),
900            });
901        }
902    }
903
904    let total_ok = drift_count == 0;
905    checks.push(Check {
906        id: "cross_track.drift".into(),
907        ok: total_ok,
908        severity: if drift_count > 0 {
909            Severity::Warning
910        } else {
911            Severity::Info
912        },
913        details: format!(
914            "Cross-track drift: {drift_count}/{drift_checked} day+agent+source slices drifted"
915        ),
916        suggested_action: if drift_count > 0 {
917            Some("Run 'cass analytics rebuild --track all' to re-sync both tracks".into())
918        } else {
919            None
920        },
921    });
922
923    (checks, entries)
924}
925
926// ---------------------------------------------------------------------------
927// Non-negative counter checks
928// ---------------------------------------------------------------------------
929
930/// Validate that rollup counters are never negative.
931fn validate_non_negative_counters(conn: &Connection) -> Vec<Check> {
932    let mut checks = Vec::new();
933
934    // Track A: usage_daily non-negative.
935    if table_exists(conn, "usage_daily") {
936        let cols = [
937            "message_count",
938            "user_message_count",
939            "assistant_message_count",
940            "tool_call_count",
941            "plan_message_count",
942            "api_coverage_message_count",
943            "content_tokens_est_total",
944            "api_tokens_total",
945        ];
946        let cond = cols
947            .iter()
948            .map(|c| format!("{c} < 0"))
949            .collect::<Vec<_>>()
950            .join(" OR ");
951        let sql = format!("SELECT COUNT(*) FROM usage_daily WHERE {cond}");
952        match conn.query_row_map(&sql, &[], |r: &Row| r.get_typed::<i64>(0)) {
953            Ok(negative_rows) => {
954                checks.push(Check {
955                    id: "track_a.non_negative_counters".into(),
956                    ok: negative_rows == 0,
957                    severity: if negative_rows > 0 {
958                        Severity::Error
959                    } else {
960                        Severity::Info
961                    },
962                    details: format!("usage_daily: {negative_rows} rows with negative counters"),
963                    suggested_action: if negative_rows > 0 {
964                        Some("Run 'cass analytics rebuild --track a'".into())
965                    } else {
966                        None
967                    },
968                });
969            }
970            Err(err) => {
971                checks.push(Check {
972                    id: "track_a.non_negative_counters".into(),
973                    ok: false,
974                    severity: Severity::Error,
975                    details: format!("usage_daily negative-counter query failed: {err}"),
976                    suggested_action: Some(
977                        "Run 'cass analytics rebuild --track a' or verify the analytics schema"
978                            .into(),
979                    ),
980                });
981            }
982        }
983    }
984
985    // Track A: api_coverage_message_count <= message_count.
986    if table_exists(conn, "usage_daily") {
987        match conn.query_row_map(
988            "SELECT COUNT(*) FROM usage_daily WHERE api_coverage_message_count > message_count",
989            &[],
990            |r: &Row| r.get_typed::<i64>(0),
991        ) {
992            Ok(bad) => {
993                checks.push(Check {
994                    id: "track_a.coverage_lte_messages".into(),
995                    ok: bad == 0,
996                    severity: if bad > 0 {
997                        Severity::Warning
998                    } else {
999                        Severity::Info
1000                    },
1001                    details: format!(
1002                        "usage_daily: {bad} rows where api_coverage_message_count > message_count"
1003                    ),
1004                    suggested_action: if bad > 0 {
1005                        Some("Run 'cass analytics rebuild --track a'".into())
1006                    } else {
1007                        None
1008                    },
1009                });
1010            }
1011            Err(err) => {
1012                checks.push(Check {
1013                    id: "track_a.coverage_lte_messages".into(),
1014                    ok: false,
1015                    severity: Severity::Error,
1016                    details: format!("usage_daily coverage query failed: {err}"),
1017                    suggested_action: Some(
1018                        "Run 'cass analytics rebuild --track a' or verify the analytics schema"
1019                            .into(),
1020                    ),
1021                });
1022            }
1023        }
1024    }
1025
1026    // Track B: token_daily_stats non-negative.
1027    if table_exists(conn, "token_daily_stats") {
1028        let cols = [
1029            "api_call_count",
1030            "total_input_tokens",
1031            "total_output_tokens",
1032            "grand_total_tokens",
1033            "total_tool_calls",
1034        ];
1035        let cond = cols
1036            .iter()
1037            .map(|c| format!("{c} < 0"))
1038            .collect::<Vec<_>>()
1039            .join(" OR ");
1040        let sql = format!("SELECT COUNT(*) FROM token_daily_stats WHERE {cond}");
1041        match conn.query_row_map(&sql, &[], |r: &Row| r.get_typed::<i64>(0)) {
1042            Ok(negative_rows) => {
1043                checks.push(Check {
1044                    id: "track_b.non_negative_counters".into(),
1045                    ok: negative_rows == 0,
1046                    severity: if negative_rows > 0 {
1047                        Severity::Error
1048                    } else {
1049                        Severity::Info
1050                    },
1051                    details: format!(
1052                        "token_daily_stats: {negative_rows} rows with negative counters"
1053                    ),
1054                    suggested_action: if negative_rows > 0 {
1055                        Some("Run 'cass analytics rebuild --track all'".into())
1056                    } else {
1057                        None
1058                    },
1059                });
1060            }
1061            Err(err) => {
1062                checks.push(Check {
1063                    id: "track_b.non_negative_counters".into(),
1064                    ok: false,
1065                    severity: Severity::Error,
1066                    details: format!("token_daily_stats negative-counter query failed: {err}"),
1067                    suggested_action: Some(
1068                        "Run 'cass analytics rebuild --track all' or verify the analytics schema"
1069                            .into(),
1070                    ),
1071                });
1072            }
1073        }
1074    }
1075
1076    checks
1077}
1078
1079// ---------------------------------------------------------------------------
1080// Performance guardrails
1081// ---------------------------------------------------------------------------
1082
/// A single performance measurement.
///
/// Produced by the guardrail functions in this module
/// (`perf_query_guardrail`, `perf_breakdown_guardrail`). Serialises via
/// serde; the `error` field is omitted from the output when it is `None`.
#[derive(Debug, Clone, Serialize)]
pub struct PerfMeasurement {
    /// Identifier of the guardrail that produced this measurement
    /// (e.g. `perf.query_timeseries`, `perf.query_breakdown`).
    pub id: String,
    /// Wall-clock time the guardrail took, in milliseconds.
    pub elapsed_ms: u64,
    /// Time budget the elapsed time is compared against, in milliseconds.
    pub budget_ms: u64,
    /// `elapsed_ms <= budget_ms` for a successful query; `false` when the
    /// query failed; `true` when the guardrail was skipped because the
    /// rollup table is missing.
    pub within_budget: bool,
    /// Stringified query error, present only when the timed query failed.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<String>,
    /// Human-readable summary of what was measured (or why it was skipped).
    pub details: String,
}
1094
1095/// Run a performance guardrail check: time a basic timeseries query.
1096pub fn perf_query_guardrail(conn: &Connection) -> PerfMeasurement {
1097    let start = std::time::Instant::now();
1098
1099    // Exercise the same query path used by `cass analytics tokens` so a
1100    // malformed rollup table cannot pass a shallow schema probe.
1101    let budget_ms = 500_u64; // 500ms budget for rollup timeseries query
1102    if !table_exists(conn, "usage_daily") {
1103        let elapsed_ms = start.elapsed().as_millis() as u64;
1104        return PerfMeasurement {
1105            id: "perf.query_timeseries".into(),
1106            elapsed_ms,
1107            budget_ms,
1108            within_budget: true,
1109            error: None,
1110            details: "Skipped timeseries rollup query: usage_daily table missing".into(),
1111        };
1112    }
1113
1114    let result = query_tokens_timeseries(conn, &AnalyticsFilter::default(), GroupBy::Day);
1115    let elapsed_ms = start.elapsed().as_millis() as u64;
1116
1117    match result {
1118        Ok(result) => PerfMeasurement {
1119            id: "perf.query_timeseries".into(),
1120            elapsed_ms,
1121            budget_ms,
1122            within_budget: elapsed_ms <= budget_ms,
1123            error: None,
1124            details: format!(
1125                "Timeseries rollup query: {} day buckets in {elapsed_ms}ms",
1126                result.buckets.len()
1127            ),
1128        },
1129        Err(err) => PerfMeasurement {
1130            id: "perf.query_timeseries".into(),
1131            elapsed_ms,
1132            budget_ms,
1133            within_budget: false,
1134            error: Some(err.to_string()),
1135            details: format!("Timeseries rollup query failed after {elapsed_ms}ms: {err}"),
1136        },
1137    }
1138}
1139
1140/// Run a performance guardrail for breakdown queries.
1141pub fn perf_breakdown_guardrail(conn: &Connection) -> PerfMeasurement {
1142    let start = std::time::Instant::now();
1143    let budget_ms = 200_u64;
1144
1145    if !table_exists(conn, "usage_daily") {
1146        let elapsed_ms = start.elapsed().as_millis() as u64;
1147        return PerfMeasurement {
1148            id: "perf.query_breakdown".into(),
1149            elapsed_ms,
1150            budget_ms,
1151            within_budget: true,
1152            error: None,
1153            details: "Skipped breakdown query: usage_daily table missing".into(),
1154        };
1155    }
1156
1157    let result = query_breakdown(
1158        conn,
1159        &AnalyticsFilter::default(),
1160        Dim::Agent,
1161        Metric::ApiTotal,
1162        25,
1163    );
1164    let elapsed_ms = start.elapsed().as_millis() as u64;
1165
1166    match result {
1167        Ok(result) => PerfMeasurement {
1168            id: "perf.query_breakdown".into(),
1169            elapsed_ms,
1170            budget_ms,
1171            within_budget: elapsed_ms <= budget_ms,
1172            error: None,
1173            details: format!(
1174                "Breakdown query: {} agent groups in {elapsed_ms}ms",
1175                result.rows.len()
1176            ),
1177        },
1178        Err(err) => PerfMeasurement {
1179            id: "perf.query_breakdown".into(),
1180            elapsed_ms,
1181            budget_ms,
1182            within_budget: false,
1183            error: Some(err.to_string()),
1184            details: format!("Breakdown query failed after {elapsed_ms}ms: {err}"),
1185        },
1186    }
1187}
1188
1189// ---------------------------------------------------------------------------
1190// Tests
1191// ---------------------------------------------------------------------------
1192
1193#[cfg(test)]
1194mod tests {
1195    use super::*;
1196
1197    // -- Fixture helpers --
1198
    /// Create a minimal Track A fixture (message_metrics + usage_daily).
    ///
    /// Opens an in-memory database, creates both tables, and seeds three
    /// `message_metrics` rows plus the single `usage_daily` rollup row that
    /// is consistent with them. Panics (via `unwrap`) if any statement fails.
    fn setup_track_a_fixture() -> Connection {
        let conn = Connection::open(":memory:").unwrap();
        conn.execute_batch(
            "CREATE TABLE message_metrics (
                message_id INTEGER PRIMARY KEY,
                created_at_ms INTEGER NOT NULL,
                hour_id INTEGER NOT NULL,
                day_id INTEGER NOT NULL,
                agent_slug TEXT NOT NULL,
                workspace_id INTEGER NOT NULL DEFAULT 0,
                source_id TEXT NOT NULL DEFAULT 'local',
                role TEXT NOT NULL,
                content_chars INTEGER NOT NULL,
                content_tokens_est INTEGER NOT NULL,
                api_input_tokens INTEGER,
                api_output_tokens INTEGER,
                api_cache_read_tokens INTEGER,
                api_cache_creation_tokens INTEGER,
                api_thinking_tokens INTEGER,
                api_service_tier TEXT,
                api_data_source TEXT NOT NULL DEFAULT 'estimated',
                tool_call_count INTEGER NOT NULL DEFAULT 0,
                has_tool_calls INTEGER NOT NULL DEFAULT 0,
                has_plan INTEGER NOT NULL DEFAULT 0
            );
            CREATE TABLE usage_daily (
                day_id INTEGER NOT NULL,
                agent_slug TEXT NOT NULL,
                workspace_id INTEGER NOT NULL DEFAULT 0,
                source_id TEXT NOT NULL DEFAULT 'local',
                message_count INTEGER NOT NULL DEFAULT 0,
                user_message_count INTEGER NOT NULL DEFAULT 0,
                assistant_message_count INTEGER NOT NULL DEFAULT 0,
                tool_call_count INTEGER NOT NULL DEFAULT 0,
                plan_message_count INTEGER NOT NULL DEFAULT 0,
                api_coverage_message_count INTEGER NOT NULL DEFAULT 0,
                content_tokens_est_total INTEGER NOT NULL DEFAULT 0,
                content_tokens_est_user INTEGER NOT NULL DEFAULT 0,
                content_tokens_est_assistant INTEGER NOT NULL DEFAULT 0,
                api_tokens_total INTEGER NOT NULL DEFAULT 0,
                api_input_tokens_total INTEGER NOT NULL DEFAULT 0,
                api_output_tokens_total INTEGER NOT NULL DEFAULT 0,
                api_cache_read_tokens_total INTEGER NOT NULL DEFAULT 0,
                api_cache_creation_tokens_total INTEGER NOT NULL DEFAULT 0,
                api_thinking_tokens_total INTEGER NOT NULL DEFAULT 0,
                last_updated INTEGER NOT NULL DEFAULT 0,
                PRIMARY KEY (day_id, agent_slug, workspace_id, source_id)
            );",
        )
        .unwrap();

        // Insert consistent data: 3 messages for claude_code on day 20254.
        // The usage_daily row mirrors them: 3 messages (2 user, 1 assistant),
        // 450 estimated content tokens (100 + 200 + 150), and 880 API tokens
        // (500 + 300 + 50 + 20 + 10) from the one api-sourced message.
        conn.execute_batch(
            "INSERT INTO message_metrics VALUES
                (1, 1750000000000, 416666, 20254, 'claude_code', 1, 'local', 'user',   400, 100, NULL, NULL, NULL, NULL, NULL, NULL, 'estimated', 0, 0, 0),
                (2, 1750000000001, 416666, 20254, 'claude_code', 1, 'local', 'assistant', 800, 200, 500, 300, 50, 20, 10, NULL, 'api', 3, 1, 0),
                (3, 1750000000002, 416666, 20254, 'claude_code', 1, 'local', 'user',   600, 150, NULL, NULL, NULL, NULL, NULL, NULL, 'estimated', 0, 0, 0);
            INSERT INTO usage_daily VALUES
                (20254, 'claude_code', 1, 'local',
                 3, 2, 1, 3, 0, 1,
                 450, 250, 200,
                 880, 500, 300, 50, 20, 10,
                 0);",
        )
        .unwrap();

        conn
    }
1268
    /// Create a consistent fixture with both Track A and Track B.
    ///
    /// Starts from `setup_track_a_fixture()` and adds the `agents`,
    /// `token_usage` and `token_daily_stats` tables. One `token_usage` row is
    /// seeded for message 2 (total_tokens = 880) together with a matching
    /// `token_daily_stats` row (grand_total_tokens = 880), which equals the
    /// `api_tokens_total` of the Track A rollup row for the same day.
    /// Panics (via `unwrap`) if the batch fails.
    fn setup_both_tracks_fixture() -> Connection {
        let conn = setup_track_a_fixture();

        conn.execute_batch(
            "CREATE TABLE agents (
                id INTEGER PRIMARY KEY,
                slug TEXT NOT NULL UNIQUE
            );
            INSERT INTO agents VALUES (1, 'claude_code');

            CREATE TABLE token_usage (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                message_id INTEGER NOT NULL,
                conversation_id INTEGER NOT NULL,
                agent_id INTEGER NOT NULL,
                workspace_id INTEGER,
                source_id TEXT NOT NULL DEFAULT 'local',
                timestamp_ms INTEGER NOT NULL,
                day_id INTEGER NOT NULL,
                model_name TEXT,
                model_family TEXT,
                model_tier TEXT,
                service_tier TEXT,
                provider TEXT,
                input_tokens INTEGER,
                output_tokens INTEGER,
                cache_read_tokens INTEGER,
                cache_creation_tokens INTEGER,
                thinking_tokens INTEGER,
                total_tokens INTEGER,
                estimated_cost_usd REAL,
                role TEXT NOT NULL,
                content_chars INTEGER NOT NULL,
                has_tool_calls INTEGER NOT NULL DEFAULT 0,
                tool_call_count INTEGER NOT NULL DEFAULT 0,
                data_source TEXT NOT NULL DEFAULT 'api',
                UNIQUE(message_id)
            );

            CREATE TABLE token_daily_stats (
                day_id INTEGER NOT NULL,
                agent_slug TEXT NOT NULL,
                source_id TEXT NOT NULL DEFAULT 'all',
                model_family TEXT NOT NULL DEFAULT 'all',
                api_call_count INTEGER NOT NULL DEFAULT 0,
                user_message_count INTEGER NOT NULL DEFAULT 0,
                assistant_message_count INTEGER NOT NULL DEFAULT 0,
                tool_message_count INTEGER NOT NULL DEFAULT 0,
                total_input_tokens INTEGER NOT NULL DEFAULT 0,
                total_output_tokens INTEGER NOT NULL DEFAULT 0,
                total_cache_read_tokens INTEGER NOT NULL DEFAULT 0,
                total_cache_creation_tokens INTEGER NOT NULL DEFAULT 0,
                total_thinking_tokens INTEGER NOT NULL DEFAULT 0,
                grand_total_tokens INTEGER NOT NULL DEFAULT 0,
                total_content_chars INTEGER NOT NULL DEFAULT 0,
                total_tool_calls INTEGER NOT NULL DEFAULT 0,
                estimated_cost_usd REAL NOT NULL DEFAULT 0.0,
                session_count INTEGER NOT NULL DEFAULT 0,
                last_updated INTEGER NOT NULL,
                PRIMARY KEY (day_id, agent_slug, source_id, model_family)
            );

            -- Insert matching token_usage for message 2 (the only api-sourced message).
            INSERT INTO token_usage VALUES
                (1, 2, 100, 1, 1, 'local', 1750000000001, 20254,
                 'claude-opus-4', 'opus', 'opus', NULL, 'anthropic',
                 500, 300, 50, 20, 10, 880, 0.05, 'assistant', 800, 1, 3, 'api');

            -- Token daily stats matching the token_usage.
            INSERT INTO token_daily_stats VALUES
                (20254, 'claude_code', 'local', 'opus',
                 1, 0, 1, 0,
                 500, 300, 50, 20, 10, 880,
                 800, 3, 0.05, 1, 0);",
        )
        .unwrap();

        conn
    }
1349
1350    // -- Tests --
1351
1352    #[test]
1353    fn consistent_track_a_passes() {
1354        let conn = setup_track_a_fixture();
1355        let config = ValidateConfig::deep();
1356        let report = run_validation(&conn, &config);
1357
1358        // Track A checks should all pass.
1359        let track_a_checks: Vec<_> = report
1360            .checks
1361            .iter()
1362            .filter(|c| c.id.starts_with("track_a."))
1363            .collect();
1364        assert!(!track_a_checks.is_empty());
1365        for c in &track_a_checks {
1366            assert!(c.ok, "Check {} failed: {}", c.id, c.details);
1367        }
1368    }
1369
1370    #[test]
1371    fn drifted_track_a_detects_mismatch() {
1372        let conn = setup_track_a_fixture();
1373
1374        // Inject drift: change usage_daily content_tokens_est_total.
1375        conn.execute("UPDATE usage_daily SET content_tokens_est_total = 9999 WHERE day_id = 20254")
1376            .unwrap();
1377
1378        let config = ValidateConfig::deep();
1379        let report = run_validation(&conn, &config);
1380
1381        let content_check = report
1382            .checks
1383            .iter()
1384            .find(|c| c.id == "track_a.content_tokens_match")
1385            .expect("should have content tokens check");
1386        assert!(!content_check.ok, "Should detect content tokens mismatch");
1387        assert!(content_check.suggested_action.is_some());
1388    }
1389
1390    #[test]
1391    fn drifted_track_a_message_count_detected() {
1392        let conn = setup_track_a_fixture();
1393
1394        // Inject drift: change message_count.
1395        conn.execute("UPDATE usage_daily SET message_count = 999 WHERE day_id = 20254")
1396            .unwrap();
1397
1398        let config = ValidateConfig::deep();
1399        let report = run_validation(&conn, &config);
1400
1401        let msg_check = report
1402            .checks
1403            .iter()
1404            .find(|c| c.id == "track_a.message_count_match")
1405            .expect("should have message count check");
1406        assert!(!msg_check.ok);
1407    }
1408
1409    #[test]
1410    fn consistent_both_tracks_passes() {
1411        let conn = setup_both_tracks_fixture();
1412        let config = ValidateConfig::deep();
1413        let report = run_validation(&conn, &config);
1414
1415        assert!(
1416            report.all_ok(),
1417            "All checks should pass on consistent fixture: {:#?}",
1418            report.checks.iter().filter(|c| !c.ok).collect::<Vec<_>>()
1419        );
1420        assert!(report.drift.is_empty());
1421    }
1422
1423    #[test]
1424    fn cross_track_drift_detected() {
1425        let conn = setup_both_tracks_fixture();
1426
1427        // Inject drift: delete token_usage row (Track B ledger).
1428        conn.execute("DELETE FROM token_usage WHERE id = 1")
1429            .unwrap();
1430        // Also zero out token_daily_stats to be consistent with the deletion.
1431        conn.execute("UPDATE token_daily_stats SET grand_total_tokens = 0 WHERE day_id = 20254")
1432            .unwrap();
1433
1434        let config = ValidateConfig::deep();
1435        let report = run_validation(&conn, &config);
1436
1437        let drift_check = report
1438            .checks
1439            .iter()
1440            .find(|c| c.id == "cross_track.drift")
1441            .expect("should have cross-track drift check");
1442        // Track A has api_tokens_total=880 but Track B now has 0.
1443        assert!(!drift_check.ok, "Should detect cross-track drift");
1444        assert!(!report.drift.is_empty());
1445        assert_eq!(report.drift[0].track_a_total, 880);
1446        assert_eq!(report.drift[0].track_b_total, 0);
1447    }
1448
1449    #[test]
1450    fn negative_counters_detected() {
1451        let conn = setup_track_a_fixture();
1452
1453        // Inject negative counter.
1454        conn.execute("UPDATE usage_daily SET tool_call_count = -5 WHERE day_id = 20254")
1455            .unwrap();
1456
1457        let config = ValidateConfig::deep();
1458        let report = run_validation(&conn, &config);
1459
1460        let neg_check = report
1461            .checks
1462            .iter()
1463            .find(|c| c.id == "track_a.non_negative_counters")
1464            .expect("should have non-negative check");
1465        assert!(!neg_check.ok, "Should detect negative counters");
1466    }
1467
1468    #[test]
1469    fn coverage_exceeding_message_count_detected() {
1470        let conn = setup_track_a_fixture();
1471
1472        // Inject bad data: coverage > message count.
1473        conn.execute(
1474            "UPDATE usage_daily SET api_coverage_message_count = 999 WHERE day_id = 20254",
1475        )
1476        .unwrap();
1477
1478        let config = ValidateConfig::deep();
1479        let report = run_validation(&conn, &config);
1480
1481        let cov_check = report
1482            .checks
1483            .iter()
1484            .find(|c| c.id == "track_a.coverage_lte_messages")
1485            .expect("should have coverage <= messages check");
1486        assert!(!cov_check.ok);
1487    }
1488
1489    #[test]
1490    fn empty_database_reports_missing_tables() {
1491        let conn = Connection::open(":memory:").unwrap();
1492        let config = ValidateConfig::default();
1493        let report = run_validation(&conn, &config);
1494
1495        // Should have error-level checks about missing tables.
1496        let errors: Vec<_> = report
1497            .checks
1498            .iter()
1499            .filter(|c| !c.ok && c.severity == Severity::Error)
1500            .collect();
1501        assert!(!errors.is_empty());
1502    }
1503
1504    #[test]
1505    fn sample_mode_limits_buckets() {
1506        let conn = setup_track_a_fixture();
1507        let config = ValidateConfig {
1508            sample_buckets: 1,
1509            ..Default::default()
1510        };
1511        let report = run_validation(&conn, &config);
1512
1513        assert_eq!(report._meta.sampling.mode, "sample");
1514        // We only have 1 bucket anyway, but the mode should reflect sampling.
1515        assert!(report._meta.sampling.buckets_checked <= 1);
1516    }
1517
1518    #[test]
1519    fn deep_mode_scans_all() {
1520        let conn = setup_track_a_fixture();
1521        let config = ValidateConfig::deep();
1522        let report = run_validation(&conn, &config);
1523
1524        assert_eq!(report._meta.sampling.mode, "deep");
1525    }
1526
1527    #[test]
1528    fn report_json_shape() {
1529        let conn = setup_track_a_fixture();
1530        let config = ValidateConfig::deep();
1531        let report = run_validation(&conn, &config);
1532        let json = report.to_json();
1533
1534        assert!(json["checks"].is_array());
1535        assert!(json["drift"].is_array());
1536        assert!(json["_meta"]["elapsed_ms"].is_number());
1537        assert!(json["_meta"]["sampling"]["mode"].is_string());
1538    }
1539
1540    #[test]
1541    fn perf_query_guardrail_completes() {
1542        let conn = setup_track_a_fixture();
1543        let m = perf_query_guardrail(&conn);
1544        assert!(
1545            m.error.is_none(),
1546            "timeseries guardrail should complete: {}",
1547            m.details
1548        );
1549        assert_eq!(m.id, "perf.query_timeseries");
1550        assert_eq!(m.budget_ms, 500);
1551        assert!(m.details.contains("Timeseries rollup query"));
1552    }
1553
1554    #[test]
1555    fn perf_breakdown_guardrail_completes() {
1556        let conn = setup_track_a_fixture();
1557        let m = perf_breakdown_guardrail(&conn);
1558        assert!(
1559            m.error.is_none(),
1560            "breakdown guardrail should complete: {}",
1561            m.details
1562        );
1563        assert_eq!(m.id, "perf.query_breakdown");
1564        assert_eq!(m.budget_ms, 200);
1565        assert!(m.details.contains("Breakdown query"));
1566    }
1567
1568    #[test]
1569    fn perf_query_guardrail_reports_query_failure() {
1570        let conn = Connection::open(":memory:").unwrap();
1571        conn.execute_batch("CREATE TABLE usage_daily (message_count INTEGER);")
1572            .unwrap();
1573
1574        let m = perf_query_guardrail(&conn);
1575        assert!(!m.within_budget);
1576        assert!(m.error.is_some());
1577        assert!(m.details.contains("failed"));
1578    }
1579
1580    #[test]
1581    fn perf_breakdown_guardrail_reports_query_failure() {
1582        let conn = Connection::open(":memory:").unwrap();
1583        conn.execute_batch("CREATE TABLE usage_daily (api_tokens_total INTEGER);")
1584            .unwrap();
1585
1586        let m = perf_breakdown_guardrail(&conn);
1587        assert!(!m.within_budget);
1588        assert!(m.error.is_some());
1589        assert!(m.details.contains("failed"));
1590    }
1591
1592    #[test]
1593    fn malformed_track_a_schema_reports_query_failure() {
1594        let conn = Connection::open(":memory:").unwrap();
1595        conn.execute_batch(
1596            "CREATE TABLE message_metrics (day_id INTEGER);
1597             CREATE TABLE usage_daily (day_id INTEGER);",
1598        )
1599        .unwrap();
1600
1601        let (checks, checked, total) = validate_track_a(&conn, &ValidateConfig::deep());
1602        let failure = checks
1603            .iter()
1604            .find(|c| c.id == "track_a.query_exec")
1605            .expect("Track A query failure should be reported");
1606
1607        assert!(!failure.ok);
1608        assert_eq!(failure.severity, Severity::Error);
1609        assert_eq!(checked, 0);
1610        assert_eq!(total, 0);
1611    }
1612
1613    #[test]
1614    fn malformed_track_b_schema_reports_query_failure() {
1615        let conn = Connection::open(":memory:").unwrap();
1616        conn.execute_batch(
1617            "CREATE TABLE agents (id INTEGER PRIMARY KEY, slug TEXT NOT NULL UNIQUE);
1618             CREATE TABLE token_usage (day_id INTEGER, agent_id INTEGER, source_id TEXT, model_family TEXT);
1619             CREATE TABLE token_daily_stats (day_id INTEGER, agent_slug TEXT, source_id TEXT, model_family TEXT);",
1620        )
1621        .unwrap();
1622
1623        let (checks, checked, total) = validate_track_b(&conn, &ValidateConfig::deep());
1624        let failure = checks
1625            .iter()
1626            .find(|c| c.id == "track_b.query_exec")
1627            .expect("Track B query failure should be reported");
1628
1629        assert!(!failure.ok);
1630        assert_eq!(failure.severity, Severity::Error);
1631        assert_eq!(checked, 0);
1632        assert_eq!(total, 0);
1633    }
1634
1635    #[test]
1636    fn malformed_cross_track_schema_reports_query_failure() {
1637        let conn = Connection::open(":memory:").unwrap();
1638        conn.execute_batch(
1639            "CREATE TABLE usage_daily (day_id INTEGER);
1640             CREATE TABLE token_daily_stats (day_id INTEGER);",
1641        )
1642        .unwrap();
1643
1644        let (checks, drift) = validate_cross_track_drift(&conn, &ValidateConfig::deep());
1645        let failure = checks
1646            .iter()
1647            .find(|c| c.id == "cross_track.query_exec")
1648            .expect("Cross-track query failure should be reported");
1649
1650        assert!(!failure.ok);
1651        assert_eq!(failure.severity, Severity::Error);
1652        assert!(drift.is_empty());
1653    }
1654
1655    #[test]
1656    fn repair_plan_marks_track_a_failures_fixable() {
1657        let conn = setup_track_a_fixture();
1658        conn.execute("UPDATE usage_daily SET message_count = 999 WHERE day_id = 20254")
1659            .unwrap();
1660
1661        let report = run_validation(&conn, &ValidateConfig::deep());
1662        let plan = build_repair_plan(&report);
1663
1664        let track_a = plan
1665            .decisions
1666            .iter()
1667            .find(|decision| decision.kind == RepairKind::RebuildTrackA)
1668            .expect("track a repair decision");
1669        assert!(plan.apply_track_a_rebuild);
1670        assert!(track_a.fixable);
1671        assert!(
1672            track_a
1673                .check_ids
1674                .contains(&"track_a.message_count_match".to_string())
1675        );
1676    }
1677
1678    #[test]
1679    fn repair_plan_marks_track_b_data_drift_as_rebuild_track_b() {
1680        // Bead m7xrw: Track B rollup drift with an intact token_usage
1681        // ledger is now repairable via `rebuild_token_daily_stats()`,
1682        // not deferred as TrackAllRebuildUnavailable. Deleting only the
1683        // `token_daily_stats` rows (keeping token_usage intact) is the
1684        // textbook repairable scenario.
1685        let conn = setup_both_tracks_fixture();
1686        conn.execute("DELETE FROM token_daily_stats").unwrap();
1687
1688        let report = run_validation(&conn, &ValidateConfig::deep());
1689        let plan = build_repair_plan(&report);
1690
1691        let rebuild_b = plan
1692            .decisions
1693            .iter()
1694            .find(|decision| decision.kind == RepairKind::RebuildTrackB)
1695            .expect("track-b rebuild decision");
1696        assert!(!plan.apply_track_a_rebuild);
1697        assert!(plan.apply_track_b_rebuild);
1698        assert!(rebuild_b.fixable);
1699        assert!(
1700            rebuild_b
1701                .check_ids
1702                .contains(&"track_b.has_data".to_string())
1703        );
1704    }
1705
1706    #[test]
1707    fn repair_plan_marks_track_b_tables_missing_as_unavailable() {
1708        // Bead m7xrw: when the `token_usage` ledger itself is missing
1709        // (not just empty rollups), `rebuild_token_daily_stats()`
1710        // cannot recover — fall through to TrackAllRebuildUnavailable
1711        // which tells the operator to do a full canonical replay.
1712        let conn = setup_both_tracks_fixture();
1713        conn.execute("DROP TABLE token_usage").unwrap();
1714
1715        let report = run_validation(&conn, &ValidateConfig::deep());
1716        let plan = build_repair_plan(&report);
1717
1718        let unavailable = plan
1719            .decisions
1720            .iter()
1721            .find(|decision| decision.kind == RepairKind::TrackAllRebuildUnavailable)
1722            .expect("track-all unavailable decision when ledger missing");
1723        assert!(!plan.apply_track_a_rebuild);
1724        assert!(!plan.apply_track_b_rebuild);
1725        assert!(!unavailable.fixable);
1726        assert!(
1727            unavailable
1728                .check_ids
1729                .contains(&"track_b.tables_exist".to_string())
1730        );
1731    }
1732
1733    #[test]
1734    fn repair_plan_marks_track_a_only_drift_as_fixable() {
1735        let report = ValidationReport {
1736            checks: vec![Check {
1737                id: "cross_track.drift".into(),
1738                ok: false,
1739                severity: Severity::Warning,
1740                details: "drift found".into(),
1741                suggested_action: Some("Run 'cass analytics rebuild --track all'".into()),
1742            }],
1743            drift: vec![DriftEntry {
1744                day_id: 20254,
1745                agent_slug: "codex".into(),
1746                source_id: "local".into(),
1747                track_a_total: 0,
1748                track_b_total: 123,
1749                delta: -123,
1750                delta_pct: 100.0,
1751                likely_cause:
1752                    "Track B higher — Track A may have been rebuilt recently without all data"
1753                        .into(),
1754            }],
1755            _meta: ReportMeta {
1756                elapsed_ms: 1,
1757                sampling: SamplingMeta {
1758                    buckets_checked: 1,
1759                    buckets_total: 1,
1760                    mode: "deep".into(),
1761                },
1762                path: "rollup".into(),
1763            },
1764        };
1765
1766        let plan = build_repair_plan(&report);
1767        assert!(plan.apply_track_a_rebuild);
1768        assert_eq!(plan.decisions.len(), 1);
1769        assert_eq!(plan.decisions[0].kind, RepairKind::RebuildTrackA);
1770    }
1771}