// cortex_store/verify.rs
1//! Store verification helpers for schema-shape and schema-version gates.
2
3use chrono::{DateTime, Utc};
4use cortex_core::{
5    validate_summary_spans, ConsumerAdvisory, CrossSessionSalience, OutcomeMemoryRelation,
6    SourceAttestation, SourceAuthority, SummarySpan,
7};
8use rusqlite::{params, OptionalExtension};
9
10use crate::{Pool, StoreError, StoreResult};
11
/// Tables whose rows carry a per-row `schema_version` column scanned by
/// [`verify_schema_version`] for forward (future-version) rows.
const SCHEMA_VERSION_TABLES: &[&str] = &["events", "traces"];

/// Nullable `(table, column)` pairs added by the schema v2 expand migration.
/// Presence of any one of these counts as a v2 expand artifact.
const V2_EXPAND_COLUMNS: &[(&str, &str)] = &[
    ("events", "source_attestation_json"),
    ("episodes", "summary_spans_json"),
    ("memories", "summary_spans_json"),
    ("memories", "cross_session_use_count"),
    ("memories", "first_used_at"),
    ("memories", "last_cross_session_use_at"),
    ("memories", "last_validation_at"),
    ("memories", "validation_epoch"),
    ("memories", "blessed_until"),
    ("context_packs", "consumer_advisory_json"),
];

/// Side tables created by the schema v2 expand migration.
const V2_EXPAND_TABLES: &[&str] = &["memory_session_uses", "outcome_memory_relations"];

/// `(table, column)` pairs the conservative v2 backfill must leave with zero
/// NULL rows once the expand shape is present (checked via
/// `count_null_column_rows` in [`verify_schema_v2_expand_shape`]).
const V2_BACKFILL_REQUIRED_COLUMNS: &[(&str, &str)] = &[
    ("events", "source_attestation_json"),
    ("episodes", "summary_spans_json"),
    ("memories", "summary_spans_json"),
    ("memories", "cross_session_use_count"),
    ("memories", "validation_epoch"),
    ("context_packs", "consumer_advisory_json"),
];

/// Row-level schema version targeted by the v2 readiness verifier.
const DEFAULT_SCHEMA_V2_TARGET: u16 = 2;
39
/// A table the store schema must contain, together with every column the
/// shape verifier requires on it.
#[derive(Debug, Clone, Copy)]
struct RequiredTable {
    // Table name as it appears in sqlite schema metadata.
    name: &'static str,
    // Columns that must all exist on `name`; absence of any one is a
    // `SchemaVersionFailure::MissingColumn`.
    columns: &'static [&'static str],
}

/// Complete required schema shape enforced by [`verify_schema_version`].
/// Columns marked "Schema v2" are nullable additions from the expand
/// migrations but are still required as columns after the atomic cutover.
const REQUIRED_TABLES: &[RequiredTable] = &[
    RequiredTable {
        name: "_migrations",
        columns: &["name", "applied_at"],
    },
    RequiredTable {
        name: "audit_records",
        columns: &[
            "id",
            "operation",
            "target_ref",
            "before_hash",
            "after_hash",
            "reason",
            "actor_json",
            "source_refs_json",
            "created_at",
        ],
    },
    RequiredTable {
        name: "authority_key_timeline",
        columns: &[
            "key_id",
            "principal_id",
            "state",
            "effective_at",
            "reason",
            "audit_ref",
        ],
    },
    RequiredTable {
        name: "authority_principal_timeline",
        columns: &[
            "principal_id",
            "trust_tier",
            "effective_at",
            "trust_review_due_at",
            "removed_at",
            "audit_ref",
        ],
    },
    RequiredTable {
        name: "context_packs",
        columns: &[
            "id",
            "task",
            "pack_json",
            "selection_audit",
            "created_at",
            // Schema v2 (ADR 0016 consumer advisory) — nullable column required by
            // the default v2 persistence shape after the atomic cutover.
            "consumer_advisory_json",
        ],
    },
    RequiredTable {
        name: "contradictions",
        columns: &[
            "id",
            "left_ref",
            "right_ref",
            "contradiction_type",
            "status",
            "interpretation",
            "created_at",
            "updated_at",
        ],
    },
    RequiredTable {
        name: "doctrine",
        columns: &[
            "id",
            "source_principle",
            "rule",
            "force",
            "promotion_reason",
            "promoted_by_json",
            "created_at",
        ],
    },
    RequiredTable {
        name: "episodes",
        columns: &[
            "id",
            "trace_id",
            "source_events_json",
            "summary",
            "domains_json",
            "entities_json",
            "candidate_meaning",
            "extracted_by_json",
            "confidence",
            "status",
            // Schema v2 (ADR 0015 span-level summary provenance) — nullable.
            "summary_spans_json",
        ],
    },
    RequiredTable {
        name: "events",
        columns: &[
            "id",
            "schema_version",
            "observed_at",
            "recorded_at",
            "source_json",
            "event_type",
            "trace_id",
            "session_id",
            "domain_tags_json",
            "payload_json",
            "payload_hash",
            "prev_event_hash",
            "event_hash",
            // Schema v2 (ADR 0014 source attestation) — nullable column required
            // by the default v2 persistence shape after the atomic cutover.
            "source_attestation_json",
        ],
    },
    RequiredTable {
        name: "memories",
        columns: &[
            "id",
            "memory_type",
            "status",
            "claim",
            "source_episodes_json",
            "source_events_json",
            "domains_json",
            "salience_json",
            "confidence",
            "authority",
            "applies_when_json",
            "does_not_apply_when_json",
            "created_at",
            "updated_at",
            // Schema v2 (ADR 0015 spans + ADR 0017 cross-session salience) — all
            // nullable, all required as columns after the atomic cutover.
            "summary_spans_json",
            "cross_session_use_count",
            "first_used_at",
            "last_cross_session_use_at",
            "last_validation_at",
            "validation_epoch",
            "blessed_until",
        ],
    },
    RequiredTable {
        name: "memory_session_uses",
        // Schema v2 (ADR 0017) cross-session salience side table.
        columns: &[
            "memory_id",
            "session_id",
            "first_used_at",
            "last_used_at",
            "use_count",
        ],
    },
    RequiredTable {
        name: "outcome_memory_relations",
        // Schema v2 (ADR 0020) outcome-bound salience side table.
        // The trailing three columns are the ADR 0020 §6 scoped-validation
        // payload bound by migration 005_outcome_relation_scope (Phase 2.6 D1
        // closure).
        columns: &[
            "outcome_ref",
            "memory_id",
            "relation",
            "recorded_at",
            "source_event_id",
            "validation_scope",
            "validating_principal_id",
            "evidence_ref",
        ],
    },
    RequiredTable {
        name: "principles",
        columns: &[
            "id",
            "statement",
            "status",
            "supporting_memories_json",
            "contradicting_memories_json",
            "domains_observed_json",
            "applies_when_json",
            "does_not_apply_when_json",
            "confidence",
            "validation",
            "brightness",
            "created_by_json",
            "created_at",
            "updated_at",
        ],
    },
    RequiredTable {
        name: "trace_events",
        columns: &["trace_id", "event_id", "ordinal"],
    },
    RequiredTable {
        name: "traces",
        columns: &[
            "id",
            "schema_version",
            "opened_at",
            "closed_at",
            "trace_type",
            "status",
        ],
    },
];
254
/// Result of checking schema shape and row-level schema versions against the binary.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SchemaVersionReport {
    /// Schema version expected by this binary.
    pub expected: u16,
    /// Tables inspected by the verifier (all of `REQUIRED_TABLES`).
    pub checked_tables: Vec<&'static str>,
    /// Schema-shape or schema-version invariant failures.
    pub failures: Vec<SchemaVersionFailure>,
}

impl SchemaVersionReport {
    /// Returns true when schema shape is present and all inspected row-level versions match.
    #[must_use]
    pub fn is_ok(&self) -> bool {
        self.failures.is_empty()
    }
}

/// Schema-shape or schema-version invariant failure.
#[allow(missing_docs)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SchemaVersionFailure {
    /// A required table is missing.
    MissingTable { table: &'static str },
    /// A required table is missing a required column.
    MissingColumn {
        table: &'static str,
        column: &'static str,
    },
    /// The migrations table contains a migration this binary does not know.
    UnknownMigration { name: String },
    /// A persisted row carries a different schema version than this binary.
    /// Only forward rows (`actual > expected`) are reported — see the
    /// forward-only refusal note in `verify_schema_version`.
    Mismatch {
        table: &'static str,
        row_id: String,
        expected: u16,
        actual: u16,
    },
    /// Schema v2 expand/backfill metadata is present but not complete enough to trust.
    IllegalIntermediateShape {
        // Stable invariant name surfaced verbatim by `invariant()`.
        invariant: &'static str,
        // Pre-formatted human-readable detail returned by `detail()`.
        detail: String,
    },
}
300
/// Rollback/fork metadata available to a caller that is comparing current state
/// against a candidate state before permitting restore, doctor, or write flows.
///
/// Both fields are optional: [`verify_rollback_fork_refusal`] only checks the
/// invariants for which metadata was actually supplied.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct RollbackForkState {
    /// Declared schema version for the state, if the caller has one.
    pub schema_version: Option<u16>,
    /// Explicit fork marker detected in metadata or state, if any.
    pub fork_marker: Option<ForkMarker>,
}

/// Explicit marker that state belongs to, or is attempting to enter, a forked lineage.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ForkMarker {
    /// Stable marker value from the metadata source that found it.
    pub marker: String,
}

impl ForkMarker {
    /// Creates a fork marker from caller-owned metadata.
    #[must_use]
    pub fn new(marker: impl Into<String>) -> Self {
        Self {
            marker: marker.into(),
        }
    }
}
327
/// Result of checking ADR 0033 rollback/fork refusal invariants.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RollbackForkReport {
    /// Refusal invariant failures.
    pub failures: Vec<RollbackForkFailure>,
}

impl RollbackForkReport {
    /// Returns true when no rollback or fork marker was detected.
    #[must_use]
    pub fn is_ok(&self) -> bool {
        self.failures.is_empty()
    }
}

/// Rollback/fork refusal invariant failure.
#[allow(missing_docs)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RollbackForkFailure {
    /// Candidate state declares a lower schema version than current state.
    SchemaVersionRollback { current: u16, candidate: u16 },
    /// A fork marker is present where silent continuation would be unsafe.
    ForkMarkerPresent {
        // Which side of the comparison carried the marker.
        location: RollbackForkStateLocation,
        // The marker value itself, echoed in `detail()`.
        marker: String,
    },
}
355
356impl RollbackForkFailure {
357    /// Stable invariant name for operator output and tests.
358    #[must_use]
359    pub fn invariant(&self) -> &'static str {
360        match self {
361            Self::SchemaVersionRollback { .. } => "rollback.schema_version.not_decreased",
362            Self::ForkMarkerPresent { .. } => "fork.marker.absent",
363        }
364    }
365
366    /// Human-readable failure detail.
367    #[must_use]
368    pub fn detail(&self) -> String {
369        match self {
370            Self::SchemaVersionRollback { current, candidate } => format!(
371                "candidate schema_version {candidate} is lower than current schema_version {current}"
372            ),
373            Self::ForkMarkerPresent { location, marker } => {
374                format!("{location} state contains fork marker {marker}")
375            }
376        }
377    }
378}
379
/// Which side of a rollback/fork comparison carried a refusal marker.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RollbackForkStateLocation {
    /// The currently active state (rendered as `"current"`).
    Current,
    /// The candidate state being evaluated (rendered as `"candidate"`).
    Candidate,
}
388
389impl std::fmt::Display for RollbackForkStateLocation {
390    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
391        match self {
392            Self::Current => formatter.write_str("current"),
393            Self::Candidate => formatter.write_str("candidate"),
394        }
395    }
396}
397
398impl SchemaVersionFailure {
399    /// Stable invariant name for operator output and tests.
400    #[must_use]
401    pub fn invariant(&self) -> String {
402        match self {
403            Self::MissingTable { table } => format!("schema_shape.{table}.exists"),
404            Self::MissingColumn { table, column } => {
405                format!("schema_shape.{table}.{column}.exists")
406            }
407            Self::UnknownMigration { .. } => "schema_migration.known_to_code".into(),
408            Self::Mismatch { table, .. } => {
409                format!("schema_version.{table}.matches_code")
410            }
411            Self::IllegalIntermediateShape { invariant, .. } => (*invariant).into(),
412        }
413    }
414
415    /// Human-readable failure detail.
416    #[must_use]
417    pub fn detail(&self) -> String {
418        match self {
419            Self::MissingTable { table } => {
420                format!("required table {table} is missing")
421            }
422            Self::MissingColumn { table, column } => {
423                format!("table {table} is missing required column {column}")
424            }
425            Self::UnknownMigration { name } => {
426                format!("migration {name} is unknown to this binary")
427            }
428            Self::Mismatch {
429                table,
430                row_id,
431                expected,
432                actual,
433            } => format!(
434                "table {table} row {row_id} has schema_version {actual}; expected {expected}"
435            ),
436            Self::IllegalIntermediateShape { detail, .. } => detail.clone(),
437        }
438    }
439}
440
/// Pre-migrate per-table row counts captured by the pre-v2 backup manifest.
///
/// The four fields mirror the `table_row_counts` payload emitted by
/// `cortex backup --output` and validated by
/// `cortex migrate v2 --backup-manifest`. They are the input side of the
/// post-migrate count-mismatch refusal helper:
/// [`verify_post_migrate_row_counts`] compares this baseline against the
/// post-migrate store and refuses cutover whenever any table drifts beyond the
/// documented v1 -> v2 transformation (only `events` may grow, by exactly the
/// boundary-append delta).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct PreV2BackupRowCounts {
    /// Pre-migrate `events` row count.
    pub events: u64,
    /// Pre-migrate `traces` row count.
    pub traces: u64,
    /// Pre-migrate `episodes` row count.
    pub episodes: u64,
    /// Pre-migrate `memories` row count.
    pub memories: u64,
}

/// Documented delta the v1 -> v2 boundary append applies to the `events` count.
///
/// The boundary append writes exactly one new event row (the
/// `schema_migration.v1_to_v2` payload). No other counted table changes.
pub const SCHEMA_V1_TO_V2_EVENT_BOUNDARY_DELTA: u64 = 1;
468
/// Per-table count-mismatch refusal failure for the post-migrate verifier.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PostMigrateCountMismatchFailure {
    /// Table that drifted.
    pub table: &'static str,
    /// Row count recorded in the pre-v2 backup manifest (for `events`, the
    /// recorded count plus the boundary-append delta).
    pub expected: u64,
    /// Row count observed after migration in the active store.
    pub actual: u64,
    /// Stable invariant name surfaced to operators and tests.
    pub invariant: &'static str,
}
481
482impl PostMigrateCountMismatchFailure {
483    /// Stable invariant name for this failure class.
484    #[must_use]
485    pub fn invariant(&self) -> &'static str {
486        self.invariant
487    }
488
489    /// Human-readable detail formatted for CLI/operator output.
490    #[must_use]
491    pub fn detail(&self) -> String {
492        format!(
493            "table {table} has {actual} rows after migrate; backup manifest recorded {expected} rows before migrate",
494            table = self.table,
495            actual = self.actual,
496            expected = self.expected,
497        )
498    }
499}
500
501/// Compare a post-migrate store against the pre-v2 backup-manifest counts.
502///
503/// The `events` table is allowed to grow by exactly
504/// [`SCHEMA_V1_TO_V2_EVENT_BOUNDARY_DELTA`] (the boundary append). Every other
505/// counted table must match the pre-migrate baseline byte-for-row. Any drift
506/// fails closed with a stable invariant name and structured detail so the
507/// post-migrate doctor and CLI can surface it without parsing free text.
508pub fn verify_post_migrate_row_counts(
509    pool: &Pool,
510    pre: &PreV2BackupRowCounts,
511) -> StoreResult<Vec<PostMigrateCountMismatchFailure>> {
512    let observed = PreV2BackupRowCounts {
513        events: read_table_count(pool, "events")?,
514        traces: read_table_count(pool, "traces")?,
515        episodes: read_table_count(pool, "episodes")?,
516        memories: read_table_count(pool, "memories")?,
517    };
518
519    let mut failures = Vec::new();
520    // Honest overflow refusal (ADR 0033 §1, red-team B3): a pre-migrate count
521    // at `u64::MAX` cannot legitimately grow by the boundary-append delta, so
522    // saturating would silently invent a passing target. `checked_add` returns
523    // `None` and we refuse with a typed error carrying the stable invariant
524    // `verify.row_counts.checked_add_overflow` rather than masking the overflow
525    // as a count-mismatch with a contrived `expected` operand.
526    let expected_events_after = match pre.events.checked_add(SCHEMA_V1_TO_V2_EVENT_BOUNDARY_DELTA) {
527        Some(value) => value,
528        None => {
529            return Err(StoreError::RowCountCheckedAddOverflow {
530                table: "events",
531                pre: pre.events,
532                delta: SCHEMA_V1_TO_V2_EVENT_BOUNDARY_DELTA,
533            });
534        }
535    };
536    if observed.events != expected_events_after {
537        failures.push(PostMigrateCountMismatchFailure {
538            table: "events",
539            expected: expected_events_after,
540            actual: observed.events,
541            invariant: "schema_v2_post_migrate.row_count.events.matches_pre_plus_boundary",
542        });
543    }
544    for (table, expected, actual, invariant) in [
545        (
546            "traces",
547            pre.traces,
548            observed.traces,
549            "schema_v2_post_migrate.row_count.traces.unchanged",
550        ),
551        (
552            "episodes",
553            pre.episodes,
554            observed.episodes,
555            "schema_v2_post_migrate.row_count.episodes.unchanged",
556        ),
557        (
558            "memories",
559            pre.memories,
560            observed.memories,
561            "schema_v2_post_migrate.row_count.memories.unchanged",
562        ),
563    ] {
564        if expected != actual {
565            failures.push(PostMigrateCountMismatchFailure {
566                table,
567                expected,
568                actual,
569                invariant,
570            });
571        }
572    }
573    Ok(failures)
574}
575
576fn read_table_count(pool: &Pool, table: &'static str) -> StoreResult<u64> {
577    let sql = format!("SELECT COUNT(*) FROM {table};");
578    pool.query_row(&sql, [], |row| row.get(0))
579        .map_err(Into::into)
580}
581
/// Per-table v2-row population observed by [`count_post_v2_rows_outside_boundary`].
///
/// All counters describe rows whose persisted `schema_version` column is
/// `>= 2`, with the well-known schema-migration boundary row excluded from
/// `events_post_v2`. A fresh-v2 store (no v1 history, no boundary append)
/// reports non-zero counters; a pre-v2 store with the canonical schema_version=1
/// rows reports zero across the board. The post-cutover store reports zero
/// post-v2 rows in `events` outside the boundary until normal v2 writes begin.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct PostV2RowPopulation {
    /// Rows in `events` with `schema_version >= 2`, excluding the v1 -> v2
    /// schema-migration boundary row identified by
    /// `payload_json.kind = 'schema_migration.v1_to_v2'`.
    pub events_post_v2: u64,
    /// Rows in `traces` with `schema_version >= 2`.
    pub traces_post_v2: u64,
}
599
600impl PostV2RowPopulation {
601    /// Returns true when the inspected store has zero v2 rows outside the
602    /// boundary across every counted table.
603    #[must_use]
604    pub fn is_empty(&self) -> bool {
605        self.events_post_v2 == 0 && self.traces_post_v2 == 0
606    }
607
608    /// Total post-v2 row count across all counted tables, excluding the
609    /// boundary row in `events`. Useful for one-line operator output.
610    #[must_use]
611    pub fn total(&self) -> u64 {
612        self.events_post_v2.saturating_add(self.traces_post_v2)
613    }
614}
615
616/// Count rows whose `schema_version` is post-v1 (>= 2) outside the
617/// schema-migration boundary row.
618///
619/// R1 (RED_TEAM_FINDINGS phase B): a fresh-v2 store (no v1 history, no
620/// boundary append, every write already v2) generates a
621/// `kind=cortex_pre_v2_backup` manifest because `cortex backup` decides
622/// kind solely by JSONL boundary presence. Feeding that manifest to
623/// `cortex migrate v2 --backup-manifest` would cut a boundary on top of
624/// v2 rows the boundary falsely claims were v1. This helper gives
625/// `cortex migrate v2` the data it needs to refuse closed at validate
626/// time: any non-zero counter is grounds for the
627/// `migrate.v2.backup_manifest.pre_v2_kind_but_v2_rows_present` refusal.
628///
629/// The boundary row itself is identified by
630/// `payload_json -> '$.kind' = 'schema_migration.v1_to_v2'` and is
631/// excluded from `events_post_v2`; the boundary's existence is checked
632/// independently by `boundary_preflight`.
633pub fn count_post_v2_rows_outside_boundary(pool: &Pool) -> StoreResult<PostV2RowPopulation> {
634    const BOUNDARY_PAYLOAD_KIND: &str = "schema_migration.v1_to_v2";
635    let events_post_v2: u64 = pool.query_row(
636        "SELECT COUNT(*) FROM events \
637             WHERE schema_version >= 2 \
638               AND (json_extract(payload_json, '$.kind') IS NULL \
639                    OR json_extract(payload_json, '$.kind') != ?1);",
640        params![BOUNDARY_PAYLOAD_KIND],
641        |row| row.get(0),
642    )?;
643    let traces_post_v2: u64 = pool.query_row(
644        "SELECT COUNT(*) FROM traces WHERE schema_version >= 2;",
645        [],
646        |row| row.get(0),
647    )?;
648    Ok(PostV2RowPopulation {
649        events_post_v2,
650        traces_post_v2,
651    })
652}
653
654/// Verify pure rollback/fork refusal metadata.
655///
656/// Missing metadata is non-failing: callers can pass only the fields they have.
657/// When both schema versions are available, the candidate must not move
658/// backward. Any explicit fork marker fails closed until a future caller handles
659/// fork isolation or merge provenance.
660#[must_use]
661pub fn verify_rollback_fork_refusal(
662    current: &RollbackForkState,
663    candidate: &RollbackForkState,
664) -> RollbackForkReport {
665    let mut failures = Vec::new();
666
667    if let (Some(current), Some(candidate)) = (current.schema_version, candidate.schema_version) {
668        if candidate < current {
669            failures.push(RollbackForkFailure::SchemaVersionRollback { current, candidate });
670        }
671    }
672
673    if let Some(marker) = &current.fork_marker {
674        failures.push(RollbackForkFailure::ForkMarkerPresent {
675            location: RollbackForkStateLocation::Current,
676            marker: marker.marker.clone(),
677        });
678    }
679
680    if let Some(marker) = &candidate.fork_marker {
681        failures.push(RollbackForkFailure::ForkMarkerPresent {
682            location: RollbackForkStateLocation::Candidate,
683            marker: marker.marker.clone(),
684        });
685    }
686
687    RollbackForkReport { failures }
688}
689
/// Verify required schema shape and row-level schema versions.
///
/// Runs three passes, accumulating failures rather than short-circuiting so a
/// single report carries every violation:
/// 1. every `REQUIRED_TABLES` entry must exist with all required columns;
/// 2. every `_migrations` row must be known to this binary;
/// 3. no `SCHEMA_VERSION_TABLES` row may carry a schema_version above `expected`.
///
/// # Errors
///
/// Returns a `StoreError` only for underlying sqlite failures; invariant
/// violations are reported in the returned [`SchemaVersionReport`].
pub fn verify_schema_version(pool: &Pool, expected: u16) -> StoreResult<SchemaVersionReport> {
    let mut checked_tables = Vec::new();
    let mut failures = Vec::new();

    for table in REQUIRED_TABLES {
        checked_tables.push(table.name);
        if !has_table(pool, table.name)? {
            failures.push(SchemaVersionFailure::MissingTable { table: table.name });
            // Column checks are meaningless on a missing table.
            continue;
        }

        for column in table.columns {
            if !has_column(pool, table.name, column)? {
                failures.push(SchemaVersionFailure::MissingColumn {
                    table: table.name,
                    column,
                });
            }
        }
    }

    // Unknown-migration refusal (ADR 0033 §5 "DDL mismatch vs declared
    // revision for version" => Reject, ADR 0033 §6 forward-compatibility).
    // Any row in `_migrations` whose name is not in this binary's
    // `known_migration_names()` means the store has been advanced by a newer
    // binary; the current binary cannot safely frame writes against it and
    // refuses structurally — even before any row-level schema_version scan
    // observes a forward row. This closes the v1-on-v2 race window the
    // 2026-05-12 red-team flagged: between the JSONL boundary append and the
    // SQLite boundary mirror, the events table has no schema_version=2 rows
    // yet, but `_migrations` already contains `003_schema_v2_expand` and the
    // v1 binary refuses on that signal alone.
    if has_table(pool, "_migrations")? && has_column(pool, "_migrations", "name")? {
        let known = crate::migrate::known_migration_names();
        let mut stmt = pool.prepare("SELECT name FROM _migrations ORDER BY name;")?;
        let rows = stmt.query_map([], |row| row.get::<_, String>(0))?;
        for row in rows {
            let name = row?;
            if !known.iter().any(|known_name| *known_name == name) {
                failures.push(SchemaVersionFailure::UnknownMigration { name });
            }
        }
    }

    // Schema v2 atomic cutover (ADR 0018, ADR 0033 §6): a row with
    // `schema_version > expected` is a future row this binary cannot frame
    // (forward-only refusal). A row with `schema_version < expected` is a
    // legitimate historical row from a pre-cutover lineage and is verified
    // by the ledger's mixed-framing dispatch, not by this strict-equality
    // shape check. Reject only the forward case.
    for table in SCHEMA_VERSION_TABLES {
        if !has_column(pool, table, "schema_version")? {
            // Absence was already reported by the shape pass above.
            continue;
        }

        let sql = format!(
            "SELECT id, schema_version FROM {table} \
             WHERE schema_version > ?1 \
             ORDER BY id;"
        );
        let mut stmt = pool.prepare(&sql)?;
        let mismatches = stmt.query_map(params![expected], |row| {
            Ok(SchemaVersionFailure::Mismatch {
                table,
                row_id: row.get(0)?,
                expected,
                actual: row.get(1)?,
            })
        })?;

        for mismatch in mismatches {
            failures.push(mismatch?);
        }
    }

    Ok(SchemaVersionReport {
        expected,
        checked_tables,
        failures,
    })
}
772
773/// Verify that an explicit schema v2 expand/backfill is either absent or complete.
774///
775/// This is a pre-cutover ADR 0033 guard. The normal v1 schema is valid when no
776/// v2 expand artifacts exist. Once any v2 expand artifact exists, the helper
777/// requires the whole nullable shape and the mandatory conservative backfills,
778/// so a half-applied expand fails closed instead of being treated as a clean v1
779/// fixture.
780pub fn verify_schema_v2_expand_shape(pool: &Pool) -> StoreResult<Vec<SchemaVersionFailure>> {
781    let mut failures = Vec::new();
782    let mut present_artifacts = 0;
783
784    for (table, column) in V2_EXPAND_COLUMNS {
785        if has_column(pool, table, column)? {
786            present_artifacts += 1;
787        }
788    }
789    for table in V2_EXPAND_TABLES {
790        if has_table(pool, table)? {
791            present_artifacts += 1;
792        }
793    }
794
795    if present_artifacts == 0 {
796        return Ok(failures);
797    }
798
799    for (table, column) in V2_EXPAND_COLUMNS {
800        if !has_column(pool, table, column)? {
801            failures.push(SchemaVersionFailure::IllegalIntermediateShape {
802                invariant: "schema_v2_expand_shape.complete",
803                detail: format!(
804                    "partial schema v2 expand shape is missing column {table}.{column}"
805                ),
806            });
807        }
808    }
809    for table in V2_EXPAND_TABLES {
810        if !has_table(pool, table)? {
811            failures.push(SchemaVersionFailure::IllegalIntermediateShape {
812                invariant: "schema_v2_expand_shape.complete",
813                detail: format!("partial schema v2 expand shape is missing table {table}"),
814            });
815        }
816    }
817
818    if !failures.is_empty() {
819        return Ok(failures);
820    }
821
822    for (table, column) in V2_BACKFILL_REQUIRED_COLUMNS {
823        let null_rows = count_null_column_rows(pool, table, column)?;
824        if null_rows > 0 {
825            failures.push(SchemaVersionFailure::IllegalIntermediateShape {
826                invariant: "schema_v2_expand_backfill.complete",
827                detail: format!(
828                    "schema v2 expand backfill left {null_rows} {table}.{column} rows unset"
829                ),
830            });
831        }
832    }
833
834    Ok(failures)
835}
836
/// Verify that an explicitly prepared store is ready for default schema-v2 persistence.
///
/// This helper is intentionally not called from [`crate::open`] or migration startup paths while
/// `cortex_core::SCHEMA_VERSION` remains 1. It gives Lane S2 tests a fail-closed validator for the
/// future v2-only row shape without enabling cutover.
///
/// Failure ordering: shape/version failures from [`verify_schema_version`]
/// first, then any missing v2 columns/tables; the per-payload content checks
/// run only once the full v2 shape is present.
pub fn verify_schema_v2_default_persistence_readiness(
    pool: &Pool,
) -> StoreResult<Vec<SchemaVersionFailure>> {
    let mut failures = verify_schema_version(pool, DEFAULT_SCHEMA_V2_TARGET)?.failures;
    let mut shape_complete = true;

    for (table, column) in V2_EXPAND_COLUMNS {
        if !has_column(pool, table, column)? {
            shape_complete = false;
            failures.push(SchemaVersionFailure::IllegalIntermediateShape {
                invariant: "schema_v2_default_persistence.s2_9_shape.complete",
                detail: format!("default schema v2 persistence is missing column {table}.{column}"),
            });
        }
    }
    for table in V2_EXPAND_TABLES {
        if !has_table(pool, table)? {
            shape_complete = false;
            failures.push(SchemaVersionFailure::IllegalIntermediateShape {
                invariant: "schema_v2_default_persistence.s2_9_shape.complete",
                detail: format!("default schema v2 persistence is missing table {table}"),
            });
        }
    }

    // Content checks below would fail on missing columns/tables; stop at the
    // shape failures instead of compounding them.
    if !shape_complete {
        return Ok(failures);
    }

    failures.extend(verify_schema_v2_expand_shape(pool)?);
    failures.extend(verify_v2_source_attestations(pool)?);
    failures.extend(verify_v2_episode_summary_spans(pool)?);
    failures.extend(verify_v2_memory_summary_spans(pool)?);
    failures.extend(verify_v2_context_pack_advisories(pool)?);
    failures.extend(verify_v2_cross_session_salience(pool)?);
    failures.extend(verify_v2_memory_session_uses(pool)?);
    failures.extend(verify_v2_outcome_memory_relations(pool)?);

    Ok(failures)
}
882
883fn verify_v2_source_attestations(pool: &Pool) -> StoreResult<Vec<SchemaVersionFailure>> {
884    let mut failures = Vec::new();
885    let mut stmt = pool.prepare("SELECT id, source_attestation_json FROM events ORDER BY id;")?;
886    let rows = stmt.query_map([], |row| {
887        Ok((row.get::<_, String>(0)?, row.get::<_, Option<String>>(1)?))
888    })?;
889
890    for row in rows {
891        let (id, raw) = row?;
892        let Some(raw) = raw else {
893            failures.push(SchemaVersionFailure::IllegalIntermediateShape {
894                invariant: "schema_v2_default_persistence.source_attestation.present",
895                detail: format!("events row {id} has unset source_attestation_json"),
896            });
897            continue;
898        };
899        if let Err(err) = serde_json::from_str::<SourceAttestation>(&raw) {
900            failures.push(SchemaVersionFailure::IllegalIntermediateShape {
901                invariant: "schema_v2_default_persistence.source_attestation.valid",
902                detail: format!(
903                    "events row {id} source_attestation_json is not a valid SourceAttestation: {err}"
904                ),
905            });
906        }
907    }
908
909    Ok(failures)
910}
911
912fn verify_v2_episode_summary_spans(pool: &Pool) -> StoreResult<Vec<SchemaVersionFailure>> {
913    let mut failures = Vec::new();
914    let mut stmt =
915        pool.prepare("SELECT id, summary, summary_spans_json FROM episodes ORDER BY id;")?;
916    let rows = stmt.query_map([], |row| {
917        Ok((
918            row.get::<_, String>(0)?,
919            row.get::<_, String>(1)?,
920            row.get::<_, Option<String>>(2)?,
921        ))
922    })?;
923
924    for row in rows {
925        let (id, summary, raw) = row?;
926        let Some(raw) = raw else {
927            // ADR 0015 / S2.9 nullable column: a NULL value means the row has
928            // not yet had spans backfilled. The cutover readiness gate
929            // surfaces that as a missing-backfill failure via
930            // `verify_schema_v2_expand_shape`; here we only validate the
931            // shape of any value that IS present.
932            continue;
933        };
934        match serde_json::from_str::<Vec<SummarySpan>>(&raw) {
935            Ok(spans) => {
936                if let Err(err) =
937                    validate_summary_spans(&summary, &spans, |_| SourceAuthority::Derived)
938                {
939                    failures.push(SchemaVersionFailure::IllegalIntermediateShape {
940                        invariant: "schema_v2_default_persistence.summary_spans.valid",
941                        detail: format!(
942                            "episodes row {id} summary_spans_json failed {}",
943                            err.invariant()
944                        ),
945                    });
946                }
947            }
948            Err(err) => failures.push(SchemaVersionFailure::IllegalIntermediateShape {
949                invariant: "schema_v2_default_persistence.summary_spans.valid",
950                detail: format!(
951                    "episodes row {id} summary_spans_json is not valid SummarySpan JSON: {err}"
952                ),
953            }),
954        }
955    }
956
957    Ok(failures)
958}
959
960fn verify_v2_memory_summary_spans(pool: &Pool) -> StoreResult<Vec<SchemaVersionFailure>> {
961    let mut failures = Vec::new();
962    let mut stmt = pool.prepare("SELECT id, summary_spans_json FROM memories ORDER BY id;")?;
963    let rows = stmt.query_map([], |row| {
964        Ok((row.get::<_, String>(0)?, row.get::<_, Option<String>>(1)?))
965    })?;
966
967    for row in rows {
968        let (id, raw) = row?;
969        // S2.9 nullable column: NULL means unbackfilled. Shape validity is
970        // checked only when a value is present.
971        let Some(raw) = raw else {
972            continue;
973        };
974        if let Err(err) = serde_json::from_str::<Vec<SummarySpan>>(&raw) {
975            failures.push(SchemaVersionFailure::IllegalIntermediateShape {
976                invariant: "schema_v2_default_persistence.summary_spans.valid",
977                detail: format!(
978                    "memories row {id} summary_spans_json is not valid SummarySpan JSON: {err}"
979                ),
980            });
981        }
982    }
983
984    Ok(failures)
985}
986
987fn verify_v2_context_pack_advisories(pool: &Pool) -> StoreResult<Vec<SchemaVersionFailure>> {
988    let mut failures = Vec::new();
989    let mut stmt =
990        pool.prepare("SELECT id, consumer_advisory_json FROM context_packs ORDER BY id;")?;
991    let rows = stmt.query_map([], |row| {
992        Ok((row.get::<_, String>(0)?, row.get::<_, Option<String>>(1)?))
993    })?;
994
995    for row in rows {
996        let (id, raw) = row?;
997        // S2.9 nullable column: NULL means unbackfilled.
998        let Some(raw) = raw else {
999            continue;
1000        };
1001        if let Err(err) = serde_json::from_str::<ConsumerAdvisory>(&raw) {
1002            failures.push(SchemaVersionFailure::IllegalIntermediateShape {
1003                invariant: "schema_v2_default_persistence.context_pack_advisory.valid",
1004                detail: format!(
1005                    "context_packs row {id} consumer_advisory_json is not a valid ConsumerAdvisory: {err}"
1006                ),
1007            });
1008        }
1009    }
1010
1011    Ok(failures)
1012}
1013
1014fn verify_v2_cross_session_salience(pool: &Pool) -> StoreResult<Vec<SchemaVersionFailure>> {
1015    let mut failures = Vec::new();
1016    let mut stmt = pool.prepare(
1017        "SELECT id, cross_session_use_count, first_used_at, last_cross_session_use_at,
1018                last_validation_at, validation_epoch, blessed_until
1019         FROM memories
1020         ORDER BY id;",
1021    )?;
1022    let rows = stmt.query_map([], |row| {
1023        Ok((
1024            row.get::<_, String>(0)?,
1025            row.get::<_, Option<u32>>(1)?,
1026            row.get::<_, Option<String>>(2)?,
1027            row.get::<_, Option<String>>(3)?,
1028            row.get::<_, Option<String>>(4)?,
1029            row.get::<_, Option<u32>>(5)?,
1030            row.get::<_, Option<String>>(6)?,
1031        ))
1032    })?;
1033
1034    for row in rows {
1035        let (
1036            id,
1037            cross_session_use_count,
1038            first_used_at,
1039            last_cross_session_use_at,
1040            last_validation_at,
1041            validation_epoch,
1042            blessed_until,
1043        ) = row?;
1044
1045        let Some(cross_session_use_count) = cross_session_use_count else {
1046            failures.push(v2_salience_failure(
1047                id,
1048                "cross_session_use_count is missing",
1049            ));
1050            continue;
1051        };
1052        let Some(validation_epoch) = validation_epoch else {
1053            failures.push(v2_salience_failure(id, "validation_epoch is missing"));
1054            continue;
1055        };
1056
1057        let first_used_at =
1058            parse_optional_v2_time(&id, "first_used_at", first_used_at, &mut failures);
1059        let last_cross_session_use_at = parse_optional_v2_time(
1060            &id,
1061            "last_cross_session_use_at",
1062            last_cross_session_use_at,
1063            &mut failures,
1064        );
1065        let last_validation_at =
1066            parse_optional_v2_time(&id, "last_validation_at", last_validation_at, &mut failures);
1067        let blessed_until =
1068            parse_optional_v2_time(&id, "blessed_until", blessed_until, &mut failures);
1069
1070        if failures.iter().any(|failure| match failure {
1071            SchemaVersionFailure::IllegalIntermediateShape { detail, .. } => {
1072                detail.starts_with(&format!("memories row {id} "))
1073            }
1074            _ => false,
1075        }) {
1076            continue;
1077        }
1078
1079        let _state = CrossSessionSalience {
1080            cross_session_use_count,
1081            first_used_at,
1082            last_cross_session_use_at,
1083            last_validation_at,
1084            validation_epoch,
1085            blessed_until,
1086        };
1087    }
1088
1089    Ok(failures)
1090}
1091
1092fn parse_optional_v2_time(
1093    row_id: &str,
1094    column: &'static str,
1095    raw: Option<String>,
1096    failures: &mut Vec<SchemaVersionFailure>,
1097) -> Option<DateTime<Utc>> {
1098    raw.and_then(|value| match value.parse::<DateTime<Utc>>() {
1099        Ok(parsed) => Some(parsed),
1100        Err(err) => {
1101            failures.push(v2_salience_failure(
1102                row_id.to_string(),
1103                format!("{column} is not RFC3339 UTC datetime: {err}"),
1104            ));
1105            None
1106        }
1107    })
1108}
1109
1110fn v2_salience_failure(
1111    row_id: impl Into<String>,
1112    detail: impl Into<String>,
1113) -> SchemaVersionFailure {
1114    SchemaVersionFailure::IllegalIntermediateShape {
1115        invariant: "schema_v2_default_persistence.cross_session_salience.valid",
1116        detail: format!("memories row {} {}", row_id.into(), detail.into()),
1117    }
1118}
1119
/// Validate every `memory_session_uses` row for the v2 readiness gate.
///
/// Each row must have non-blank keys, a `memory_id` that references an
/// existing `memories` row, a positive `use_count`, and two RFC3339 UTC
/// timestamps with `first_used_at <= last_used_at`. All violations for a row
/// are accumulated (no early exit), keyed by `memory_id/session_id`.
fn verify_v2_memory_session_uses(pool: &Pool) -> StoreResult<Vec<SchemaVersionFailure>> {
    let mut failures = Vec::new();
    let mut stmt = pool.prepare(
        "SELECT memory_id, session_id, first_used_at, last_used_at, use_count
         FROM memory_session_uses
         ORDER BY memory_id, session_id;",
    )?;
    let rows = stmt.query_map([], |row| {
        Ok((
            row.get::<_, String>(0)?,
            row.get::<_, String>(1)?,
            row.get::<_, String>(2)?,
            row.get::<_, String>(3)?,
            row.get::<_, u32>(4)?,
        ))
    })?;

    for row in rows {
        let (memory_id, session_id, first_used_at, last_used_at, use_count) = row?;
        // Composite identifier used in every failure message for this row.
        let row_id = format!("{memory_id}/{session_id}");
        if memory_id.trim().is_empty() {
            failures.push(v2_memory_session_use_failure(&row_id, "memory_id is empty"));
        } else if !row_exists(pool, "memories", &memory_id)? {
            failures.push(v2_memory_session_use_failure(
                &row_id,
                "memory_id does not reference an existing memory",
            ));
        }
        if session_id.trim().is_empty() {
            failures.push(v2_memory_session_use_failure(
                &row_id,
                "session_id is empty",
            ));
        }
        if use_count == 0 {
            failures.push(v2_memory_session_use_failure(&row_id, "use_count is zero"));
        }

        // The raw TEXT timestamps are shadowed by their parsed Option values;
        // a parse failure both records a failure and yields None, which skips
        // the ordering check below.
        let first_used_at = parse_required_v2_time(
            "schema_v2_default_persistence.memory_session_uses.valid",
            &row_id,
            "first_used_at",
            &first_used_at,
            &mut failures,
        );
        let last_used_at = parse_required_v2_time(
            "schema_v2_default_persistence.memory_session_uses.valid",
            &row_id,
            "last_used_at",
            &last_used_at,
            &mut failures,
        );
        // Only enforce ordering when both timestamps parsed.
        if let (Some(first), Some(last)) = (first_used_at, last_used_at) {
            if last < first {
                failures.push(v2_memory_session_use_failure(
                    &row_id,
                    "last_used_at is earlier than first_used_at",
                ));
            }
        }
    }

    Ok(failures)
}
1184
/// Validate every `outcome_memory_relations` row for the v2 readiness gate.
///
/// Checks, in order: non-blank `outcome_ref`; non-blank `memory_id` that
/// references an existing `memories` row; `relation` parses as an
/// [`OutcomeMemoryRelation`]; `recorded_at` is RFC3339 UTC; a present
/// `source_event_id` is non-blank and references an existing `events` row; and
/// the ADR 0020 §6 scoped-validation payload rule below. All violations for a
/// row are accumulated, keyed by `outcome_ref/memory_id/relation`.
fn verify_v2_outcome_memory_relations(pool: &Pool) -> StoreResult<Vec<SchemaVersionFailure>> {
    let mut failures = Vec::new();
    let mut stmt = pool.prepare(
        "SELECT outcome_ref, memory_id, relation, recorded_at, source_event_id,
                validation_scope, validating_principal_id, evidence_ref
         FROM outcome_memory_relations
         ORDER BY outcome_ref, memory_id, relation;",
    )?;
    let rows = stmt.query_map([], |row| {
        Ok((
            row.get::<_, String>(0)?,
            row.get::<_, String>(1)?,
            row.get::<_, String>(2)?,
            row.get::<_, String>(3)?,
            row.get::<_, Option<String>>(4)?,
            row.get::<_, Option<String>>(5)?,
            row.get::<_, Option<String>>(6)?,
            row.get::<_, Option<String>>(7)?,
        ))
    })?;

    for row in rows {
        let (
            outcome_ref,
            memory_id,
            relation,
            recorded_at,
            source_event_id,
            validation_scope,
            validating_principal_id,
            evidence_ref,
        ) = row?;
        // Composite identifier used in every failure message for this row.
        let row_id = format!("{outcome_ref}/{memory_id}/{relation}");
        if outcome_ref.trim().is_empty() {
            failures.push(v2_outcome_memory_relation_failure(
                &row_id,
                "outcome_ref is empty",
            ));
        }
        if memory_id.trim().is_empty() {
            failures.push(v2_outcome_memory_relation_failure(
                &row_id,
                "memory_id is empty",
            ));
        } else if !row_exists(pool, "memories", &memory_id)? {
            failures.push(v2_outcome_memory_relation_failure(
                &row_id,
                "memory_id does not reference an existing memory",
            ));
        }
        // `relation` is stored as TEXT; wrapping it in a JSON string value and
        // deserializing reuses the enum's serde representation as the single
        // source of truth for valid relation names. The Result is kept so the
        // payload rule below only runs for parseable relations.
        let parsed_relation =
            serde_json::from_value::<OutcomeMemoryRelation>(serde_json::json!(relation));
        if let Err(err) = &parsed_relation {
            failures.push(v2_outcome_memory_relation_failure(
                &row_id,
                format!("relation is not a valid OutcomeMemoryRelation: {err}"),
            ));
        }
        // Parsed value intentionally discarded: only the failure side effect
        // matters for recorded_at.
        parse_required_v2_time(
            "schema_v2_default_persistence.outcome_memory_relations.valid",
            &row_id,
            "recorded_at",
            &recorded_at,
            &mut failures,
        );
        if let Some(source_event_id) = source_event_id {
            if source_event_id.trim().is_empty() {
                failures.push(v2_outcome_memory_relation_failure(
                    &row_id,
                    "source_event_id is empty",
                ));
            } else if !row_exists(pool, "events", &source_event_id)? {
                failures.push(v2_outcome_memory_relation_failure(
                    &row_id,
                    "source_event_id does not reference an existing event",
                ));
            }
        }
        // ADR 0020 §6 scoped-validation payload (Phase 2.6 D1 closure):
        // A `Validated` row must carry validation_scope, validating_principal_id,
        // and evidence_ref. Non-validation relations must NOT carry these — they
        // do not move the validation axis and any populated value would be a
        // category error.
        if let Ok(parsed) = parsed_relation {
            if parsed.advances_validation() {
                if validation_scope.is_none() {
                    failures.push(v2_outcome_memory_relation_failure(
                        &row_id,
                        "validated relation missing validation_scope",
                    ));
                }
                if validating_principal_id.is_none() {
                    failures.push(v2_outcome_memory_relation_failure(
                        &row_id,
                        "validated relation missing validating_principal_id",
                    ));
                }
                if evidence_ref.is_none() {
                    failures.push(v2_outcome_memory_relation_failure(
                        &row_id,
                        "validated relation missing evidence_ref",
                    ));
                }
            } else {
                if validation_scope.is_some() {
                    failures.push(v2_outcome_memory_relation_failure(
                        &row_id,
                        "non-validation relation must not carry validation_scope",
                    ));
                }
                if validating_principal_id.is_some() {
                    failures.push(v2_outcome_memory_relation_failure(
                        &row_id,
                        "non-validation relation must not carry validating_principal_id",
                    ));
                }
                if evidence_ref.is_some() {
                    failures.push(v2_outcome_memory_relation_failure(
                        &row_id,
                        "non-validation relation must not carry evidence_ref",
                    ));
                }
            }
        }
    }

    Ok(failures)
}
1313
1314fn parse_required_v2_time(
1315    invariant: &'static str,
1316    row_id: &str,
1317    column: &'static str,
1318    raw: &str,
1319    failures: &mut Vec<SchemaVersionFailure>,
1320) -> Option<DateTime<Utc>> {
1321    match raw.parse::<DateTime<Utc>>() {
1322        Ok(parsed) => Some(parsed),
1323        Err(err) => {
1324            failures.push(SchemaVersionFailure::IllegalIntermediateShape {
1325                invariant,
1326                detail: format!("{row_id} {column} is not RFC3339 UTC datetime: {err}"),
1327            });
1328            None
1329        }
1330    }
1331}
1332
1333fn v2_memory_session_use_failure(row_id: &str, detail: impl Into<String>) -> SchemaVersionFailure {
1334    SchemaVersionFailure::IllegalIntermediateShape {
1335        invariant: "schema_v2_default_persistence.memory_session_uses.valid",
1336        detail: format!("memory_session_uses row {row_id} {}", detail.into()),
1337    }
1338}
1339
1340fn v2_outcome_memory_relation_failure(
1341    row_id: &str,
1342    detail: impl Into<String>,
1343) -> SchemaVersionFailure {
1344    SchemaVersionFailure::IllegalIntermediateShape {
1345        invariant: "schema_v2_default_persistence.outcome_memory_relations.valid",
1346        detail: format!("outcome_memory_relations row {row_id} {}", detail.into()),
1347    }
1348}
1349
1350fn count_null_column_rows(pool: &Pool, table: &str, column: &str) -> StoreResult<u64> {
1351    let sql = format!("SELECT COUNT(*) FROM {table} WHERE {column} IS NULL;");
1352    pool.query_row(&sql, [], |row| row.get(0))
1353        .map_err(Into::into)
1354}
1355
1356fn row_exists(pool: &Pool, table: &'static str, id: &str) -> StoreResult<bool> {
1357    let sql = format!("SELECT 1 FROM {table} WHERE id = ?1 LIMIT 1;");
1358    let found = pool.query_row(&sql, params![id], |_| Ok(())).optional()?;
1359    Ok(found.is_some())
1360}
1361
1362fn has_table(pool: &Pool, table: &str) -> StoreResult<bool> {
1363    let existing = pool
1364        .query_row(
1365            "SELECT name FROM sqlite_master WHERE type = 'table' AND name = ?1;",
1366            params![table],
1367            |row| row.get::<_, String>(0),
1368        )
1369        .optional()?;
1370    Ok(existing.is_some())
1371}
1372
1373fn has_column(pool: &Pool, table: &str, column: &str) -> StoreResult<bool> {
1374    let sql = format!("PRAGMA table_info({table});");
1375    let mut stmt = pool.prepare(&sql)?;
1376    let columns = stmt.query_map([], |row| row.get::<_, String>(1))?;
1377    for found in columns {
1378        if found? == column {
1379            return Ok(true);
1380        }
1381    }
1382    Ok(false)
1383}
1384
1385#[cfg(test)]
1386mod tests {
1387    use rusqlite::Connection;
1388
1389    use super::*;
1390
1391    fn migrated_pool() -> Connection {
1392        let pool = Connection::open_in_memory().expect("open sqlite");
1393        crate::migrate::apply_pending(&pool).expect("apply migrations");
1394        pool
1395    }
1396
1397    #[test]
1398    fn verify_schema_version_passes_for_empty_v1_schema() {
1399        let pool = migrated_pool();
1400
1401        let report = verify_schema_version(&pool, cortex_core::SCHEMA_VERSION)
1402            .expect("verify schema version");
1403
1404        assert!(report.is_ok(), "unexpected failures: {report:?}");
1405        assert_eq!(
1406            report.checked_tables,
1407            REQUIRED_TABLES
1408                .iter()
1409                .map(|table| table.name)
1410                .collect::<Vec<_>>()
1411        );
1412    }
1413
    /// A row stamped with a different `schema_version` than expected must be
    /// reported with its table, row id, and both version numbers, under the
    /// per-table mismatch invariant.
    #[test]
    fn verify_schema_version_names_mismatched_event_row() {
        let pool = migrated_pool();
        // Insert an events row stamped schema_version=2 while the gate below
        // expects 1.
        pool.execute(
            "INSERT INTO events (
                id, schema_version, observed_at, recorded_at, source_json, event_type,
                trace_id, session_id, domain_tags_json, payload_json, payload_hash,
                prev_event_hash, event_hash
            ) VALUES (
                'evt_future', 2, '2026-01-01T00:00:00Z', '2026-01-01T00:00:00Z',
                '{\"kind\":\"test\"}', 'test.event', NULL, NULL, '[]', '{}',
                'payload-hash', NULL, 'event-hash'
            );",
            [],
        )
        .expect("insert mismatched event");

        let report = verify_schema_version(&pool, 1).expect("verify schema version");

        assert_eq!(
            report.failures,
            vec![SchemaVersionFailure::Mismatch {
                table: "events",
                row_id: "evt_future".into(),
                expected: 1,
                actual: 2,
            }]
        );
        assert_eq!(
            report.failures[0].invariant(),
            "schema_version.events.matches_code"
        );
    }
1447
1448    #[test]
1449    fn verify_schema_version_reports_missing_required_table() {
1450        let pool = migrated_pool();
1451        pool.execute("DROP TABLE audit_records;", [])
1452            .expect("drop audit_records");
1453
1454        let report = verify_schema_version(&pool, cortex_core::SCHEMA_VERSION)
1455            .expect("verify schema version");
1456
1457        assert!(
1458            report
1459                .failures
1460                .contains(&SchemaVersionFailure::MissingTable {
1461                    table: "audit_records"
1462                }),
1463            "failures: {report:?}"
1464        );
1465    }
1466
1467    #[test]
1468    fn verify_schema_version_reports_missing_required_column() {
1469        let pool = migrated_pool();
1470        pool.execute("ALTER TABLE events DROP COLUMN payload_hash;", [])
1471            .expect("drop payload_hash");
1472
1473        let report = verify_schema_version(&pool, cortex_core::SCHEMA_VERSION)
1474            .expect("verify schema version");
1475
1476        assert!(
1477            report
1478                .failures
1479                .contains(&SchemaVersionFailure::MissingColumn {
1480                    table: "events",
1481                    column: "payload_hash",
1482                }),
1483            "failures: {report:?}"
1484        );
1485    }
1486
1487    #[test]
1488    fn verify_schema_version_reports_unknown_migration() {
1489        let pool = migrated_pool();
1490        pool.execute("INSERT INTO _migrations (name) VALUES ('999_future');", [])
1491            .expect("insert future migration row");
1492
1493        let report = verify_schema_version(&pool, cortex_core::SCHEMA_VERSION)
1494            .expect("verify schema version");
1495
1496        assert_eq!(
1497            report.failures,
1498            vec![SchemaVersionFailure::UnknownMigration {
1499                name: "999_future".into(),
1500            }]
1501        );
1502        assert_eq!(
1503            report.failures[0].invariant(),
1504            "schema_migration.known_to_code"
1505        );
1506    }
1507
1508    /// Red-team D1 (ADR 0033 §5/§6): a v1 binary opening a v2 store between
1509    /// the JSONL boundary append and the SQLite boundary mirror sees no
1510    /// row-level `schema_version=2` in `events`, but `_migrations` already
1511    /// contains `003_schema_v2_expand`. The unknown-migration refusal closes
1512    /// that race window structurally before any write path runs.
1513    #[test]
1514    fn verify_schema_version_v1_binary_refuses_on_v2_migration_marker() {
1515        let pool = migrated_pool();
1516        // Wipe the v2 (and later) migration rows so the simulated v1 binary's
1517        // `known_migration_names()` matches what's persisted, then re-insert
1518        // `003_schema_v2_expand` to model the v2-store-state the v1 binary
1519        // is opening. The simulated v1 binary's known list is the prefix
1520        // `["001_init", "002_authority_timeline"]`.
1521        pool.execute(
1522            "DELETE FROM _migrations WHERE name NOT IN ('001_init', '002_authority_timeline');",
1523            [],
1524        )
1525        .expect("trim post-v1 migration rows to simulate v1-binary baseline");
1526        pool.execute(
1527            "INSERT INTO _migrations (name) VALUES ('003_schema_v2_expand');",
1528            [],
1529        )
1530        .expect("insert v2 cutover migration row");
1531
1532        // Manually re-implement the unknown-migration refusal against a
1533        // simulated v1 binary's `KNOWN_MIGRATION_NAMES` to prove the
1534        // structural invariant: the actual binary's
1535        // `known_migration_names()` necessarily includes the migration
1536        // currently being inserted, so we cross-check the rule here.
1537        let simulated_v1_known: &[&str] = &["001_init", "002_authority_timeline"];
1538        let mut stmt = pool
1539            .prepare("SELECT name FROM _migrations ORDER BY name;")
1540            .expect("prepare migrations scan");
1541        let observed_names: Vec<String> = stmt
1542            .query_map([], |row| row.get::<_, String>(0))
1543            .expect("query migrations")
1544            .collect::<rusqlite::Result<Vec<_>>>()
1545            .expect("collect migration names");
1546        let unknown_to_v1: Vec<String> = observed_names
1547            .into_iter()
1548            .filter(|name| !simulated_v1_known.iter().any(|known| *known == name))
1549            .collect();
1550
1551        assert_eq!(
1552            unknown_to_v1,
1553            vec!["003_schema_v2_expand".to_string()],
1554            "v1 binary must see 003_schema_v2_expand as unknown"
1555        );
1556
1557        // End-to-end: in this binary `KNOWN_MIGRATION_NAMES` does include
1558        // 003_schema_v2_expand, so the report passes the unknown-migration
1559        // check. Insert an explicitly unknown name to mirror the same
1560        // structural refusal a v1 binary would emit and assert the failure
1561        // carries the migration name and stable invariant.
1562        pool.execute(
1563            "INSERT INTO _migrations (name) VALUES ('003_schema_v2_expand_unknown_to_v1');",
1564            [],
1565        )
1566        .expect("insert simulated-v1-unknown migration row");
1567
1568        let report = verify_schema_version(&pool, 1).expect("verify schema version");
1569
1570        let unknown_failures: Vec<_> = report
1571            .failures
1572            .iter()
1573            .filter_map(|failure| match failure {
1574                SchemaVersionFailure::UnknownMigration { name } => Some(name.clone()),
1575                _ => None,
1576            })
1577            .collect();
1578        assert_eq!(
1579            unknown_failures,
1580            vec!["003_schema_v2_expand_unknown_to_v1".to_string()]
1581        );
1582        assert!(report.failures.iter().any(|failure| matches!(
1583            failure,
1584            SchemaVersionFailure::UnknownMigration { name } if name == "003_schema_v2_expand_unknown_to_v1"
1585        )));
1586        let invariant = report
1587            .failures
1588            .iter()
1589            .find_map(|failure| match failure {
1590                SchemaVersionFailure::UnknownMigration { .. } => Some(failure.invariant()),
1591                _ => None,
1592            })
1593            .expect("unknown-migration failure present");
1594        assert_eq!(invariant, "schema_migration.known_to_code");
1595    }
1596
1597    /// Red-team D1: a v2 binary opening a v2 store sees every migration in
1598    /// `_migrations` listed in `known_migration_names()`, so the refusal
1599    /// gate stays quiet.
1600    #[test]
1601    fn verify_schema_version_v2_binary_on_v2_store_emits_no_unknown_migration_failure() {
1602        let pool = migrated_pool();
1603
1604        let report = verify_schema_version(&pool, cortex_core::SCHEMA_VERSION)
1605            .expect("verify schema version");
1606
1607        assert!(
1608            !report
1609                .failures
1610                .iter()
1611                .any(|failure| matches!(failure, SchemaVersionFailure::UnknownMigration { .. })),
1612            "no unknown-migration failure expected: {report:?}"
1613        );
1614        assert!(report.is_ok(), "report must be clean: {report:?}");
1615    }
1616
1617    /// Red-team D1 / ADR 0033 §6: a v2 binary opening a future-v3 store
1618    /// (with `004_future` or any other migration name beyond
1619    /// `known_migration_names()`) refuses structurally, even though
1620    /// `expected = 2` matches the row-level shape it understands.
1621    #[test]
1622    fn verify_schema_version_v2_binary_on_future_v3_store_fails_closed() {
1623        let pool = migrated_pool();
1624        pool.execute(
1625            "INSERT INTO _migrations (name) VALUES ('999_v3_future_unknown_to_v2');",
1626            [],
1627        )
1628        .expect("insert v3 future migration row");
1629
1630        let report = verify_schema_version(&pool, cortex_core::SCHEMA_VERSION)
1631            .expect("verify schema version");
1632
1633        let unknown = report
1634            .failures
1635            .iter()
1636            .find_map(|failure| match failure {
1637                SchemaVersionFailure::UnknownMigration { name } => Some(name.clone()),
1638                _ => None,
1639            })
1640            .expect("unknown-migration failure present");
1641        assert_eq!(unknown, "999_v3_future_unknown_to_v2");
1642        assert!(!report.is_ok(), "v3 marker must fail closed: {report:?}");
1643    }
1644
1645    fn insert_event_row(pool: &Connection, id: &str) {
1646        pool.execute(
1647            "INSERT INTO events (
1648                id, schema_version, observed_at, recorded_at, source_json, event_type,
1649                trace_id, session_id, domain_tags_json, payload_json, payload_hash,
1650                prev_event_hash, event_hash
1651            ) VALUES (
1652                ?1, 1, '2026-01-01T00:00:00Z', '2026-01-01T00:00:00Z',
1653                '{\"kind\":\"test\"}', 'test.event', NULL, NULL, '[]', '{}',
1654                'payload-hash', NULL, ?2
1655            );",
1656            params![id, format!("event-hash-{id}")],
1657        )
1658        .expect("insert event");
1659    }
1660
1661    #[test]
1662    fn verify_post_migrate_row_counts_passes_with_only_boundary_delta() {
1663        let pool = migrated_pool();
1664        insert_event_row(&pool, "evt_one");
1665        // Simulate the boundary append: pre-migrate observed 1 event row, post-migrate observes 2.
1666        insert_event_row(&pool, "evt_boundary");
1667
1668        let pre = PreV2BackupRowCounts {
1669            events: 1,
1670            traces: 0,
1671            episodes: 0,
1672            memories: 0,
1673        };
1674
1675        let failures = verify_post_migrate_row_counts(&pool, &pre).expect("count compare");
1676
1677        assert!(
1678            failures.is_empty(),
1679            "clean +1 events delta must not fail: {failures:?}"
1680        );
1681    }
1682
1683    #[test]
1684    fn verify_post_migrate_row_counts_rejects_events_growing_beyond_boundary_delta() {
1685        let pool = migrated_pool();
1686        insert_event_row(&pool, "evt_one");
1687        insert_event_row(&pool, "evt_two_boundary");
1688        insert_event_row(&pool, "evt_three_extra");
1689
1690        let pre = PreV2BackupRowCounts {
1691            events: 1,
1692            traces: 0,
1693            episodes: 0,
1694            memories: 0,
1695        };
1696
1697        let failures = verify_post_migrate_row_counts(&pool, &pre).expect("count compare");
1698
1699        assert_eq!(failures.len(), 1, "unexpected failures: {failures:?}");
1700        assert_eq!(failures[0].table, "events");
1701        assert_eq!(failures[0].expected, 2);
1702        assert_eq!(failures[0].actual, 3);
1703        assert_eq!(
1704            failures[0].invariant(),
1705            "schema_v2_post_migrate.row_count.events.matches_pre_plus_boundary"
1706        );
1707        assert!(
1708            failures[0]
1709                .detail()
1710                .contains("table events has 3 rows after migrate"),
1711            "detail: {}",
1712            failures[0].detail()
1713        );
1714    }
1715
1716    #[test]
1717    fn verify_post_migrate_row_counts_rejects_events_count_shrinking() {
1718        let pool = migrated_pool();
1719        // pre says 5, post observed 0; events should never shrink across migrate.
1720        let pre = PreV2BackupRowCounts {
1721            events: 5,
1722            traces: 0,
1723            episodes: 0,
1724            memories: 0,
1725        };
1726
1727        let failures = verify_post_migrate_row_counts(&pool, &pre).expect("count compare");
1728
1729        assert_eq!(failures.len(), 1, "unexpected failures: {failures:?}");
1730        assert_eq!(failures[0].expected, 6);
1731        assert_eq!(failures[0].actual, 0);
1732    }
1733
1734    #[test]
1735    fn verify_post_migrate_row_counts_rejects_unchanged_tables_that_drifted() {
1736        let pool = migrated_pool();
1737        insert_event_row(&pool, "evt_one");
1738        insert_event_row(&pool, "evt_boundary");
1739        pool.execute(
1740            "INSERT INTO traces (id, schema_version, opened_at, closed_at, trace_type, status)
1741             VALUES ('trc_new', 1, '2026-01-01T00:00:00Z', NULL, 'fixture', 'open');",
1742            [],
1743        )
1744        .expect("insert trace");
1745
1746        let pre = PreV2BackupRowCounts {
1747            events: 1,
1748            traces: 0,
1749            episodes: 0,
1750            memories: 0,
1751        };
1752
1753        let failures = verify_post_migrate_row_counts(&pool, &pre).expect("count compare");
1754
1755        assert_eq!(failures.len(), 1, "unexpected failures: {failures:?}");
1756        assert_eq!(failures[0].table, "traces");
1757        assert_eq!(failures[0].expected, 0);
1758        assert_eq!(failures[0].actual, 1);
1759        assert_eq!(
1760            failures[0].invariant(),
1761            "schema_v2_post_migrate.row_count.traces.unchanged"
1762        );
1763    }
1764
1765    /// Red-team B3: a pre-migrate `events` row count at `u64::MAX` plus the
1766    /// boundary-append delta cannot be represented in `u64`. The verifier
1767    /// MUST refuse with a typed `RowCountCheckedAddOverflow` carrying the
1768    /// stable invariant `verify.row_counts.checked_add_overflow` rather than
1769    /// `saturating_add`-ing back to `u64::MAX` or surfacing the overflow as a
1770    /// soft count-mismatch failure with a contrived `expected` operand.
1771    #[test]
1772    fn verify_post_migrate_row_counts_refuses_pre_events_at_u64_max_overflow() {
1773        let pool = migrated_pool();
1774
1775        let pre = PreV2BackupRowCounts {
1776            events: u64::MAX,
1777            traces: 0,
1778            episodes: 0,
1779            memories: 0,
1780        };
1781
1782        let err = verify_post_migrate_row_counts(&pool, &pre)
1783            .expect_err("overflow must refuse via typed error");
1784
1785        match err {
1786            crate::StoreError::RowCountCheckedAddOverflow { table, pre, delta } => {
1787                assert_eq!(table, "events");
1788                assert_eq!(pre, u64::MAX);
1789                assert_eq!(delta, SCHEMA_V1_TO_V2_EVENT_BOUNDARY_DELTA);
1790            }
1791            other => panic!("unexpected error variant: {other:?}"),
1792        }
1793
1794        // Re-run to assert the stable invariant accessor (separate borrow).
1795        let invariant_err = verify_post_migrate_row_counts(&pool, &pre)
1796            .expect_err("overflow must refuse via typed error");
1797        assert_eq!(
1798            invariant_err.invariant(),
1799            Some(crate::VERIFY_ROW_COUNTS_CHECKED_ADD_OVERFLOW_INVARIANT),
1800        );
1801        assert_eq!(
1802            invariant_err.invariant(),
1803            Some("verify.row_counts.checked_add_overflow"),
1804        );
1805    }
1806
1807    #[test]
1808    fn verify_schema_v2_expand_shape_passes_after_default_bundle() {
1809        // Post-cutover (ADR 0018): `apply_pending` includes
1810        // `003_schema_v2_expand` by default, so every fresh store has the
1811        // complete S2.9 shape. With no rows present the backfill-complete
1812        // check passes trivially.
1813        let pool = migrated_pool();
1814
1815        let failures = verify_schema_v2_expand_shape(&pool).expect("verify v2 expand shape");
1816
1817        assert!(failures.is_empty(), "unexpected failures: {failures:?}");
1818    }
1819
1820    #[test]
1821    fn verify_schema_v2_expand_shape_reports_partial_expand_when_artifacts_removed() {
1822        // Simulate a half-applied / locally tampered expand by dropping one
1823        // expected column and one expected side table from the otherwise
1824        // complete default-bundle shape. The shape-complete invariant must
1825        // surface both the missing column and the missing side table by name.
1826        let pool = migrated_pool();
1827        pool.execute("ALTER TABLE episodes DROP COLUMN summary_spans_json;", [])
1828            .expect("drop episodes.summary_spans_json to simulate partial expand");
1829        pool.execute("DROP TABLE memory_session_uses;", [])
1830            .expect("drop memory_session_uses to simulate partial expand");
1831
1832        let failures = verify_schema_v2_expand_shape(&pool).expect("verify v2 expand shape");
1833
1834        assert!(
1835            failures.iter().any(|failure| matches!(
1836                failure,
1837                SchemaVersionFailure::IllegalIntermediateShape {
1838                    invariant: "schema_v2_expand_shape.complete",
1839                    detail,
1840                } if detail == "partial schema v2 expand shape is missing column episodes.summary_spans_json"
1841            )),
1842            "failures: {failures:?}"
1843        );
1844        assert!(
1845            failures.iter().any(|failure| matches!(
1846                failure,
1847                SchemaVersionFailure::IllegalIntermediateShape {
1848                    invariant: "schema_v2_expand_shape.complete",
1849                    detail,
1850                } if detail == "partial schema v2 expand shape is missing table memory_session_uses"
1851            )),
1852            "failures: {failures:?}"
1853        );
1854    }
1855
1856    #[test]
1857    fn verify_schema_v2_expand_shape_reports_unset_required_backfill() {
1858        let pool = migrated_pool();
1859        crate::migrate_v2::apply_expand_backfill_skeleton(
1860            &pool,
1861            "2026-05-04T13:00:00Z".parse().unwrap(),
1862        )
1863        .expect("apply complete v2 expand shape");
1864        pool.execute(
1865            "INSERT INTO events (
1866                id, schema_version, observed_at, recorded_at, source_json, event_type,
1867                trace_id, session_id, domain_tags_json, payload_json, payload_hash,
1868                prev_event_hash, event_hash, source_attestation_json
1869            ) VALUES (
1870                'evt_s2_unset_backfill', 1, '2026-05-04T12:00:00Z',
1871                '2026-05-04T12:00:01Z', '{\"kind\":\"test\"}', 'test.event',
1872                NULL, NULL, '[]', '{}', 'payload-hash', NULL, 'event-hash', NULL
1873            );",
1874            [],
1875        )
1876        .expect("insert row with unset v2 backfill metadata");
1877
1878        let failures = verify_schema_v2_expand_shape(&pool).expect("verify v2 expand shape");
1879
1880        assert_eq!(
1881            failures,
1882            vec![SchemaVersionFailure::IllegalIntermediateShape {
1883                invariant: "schema_v2_expand_backfill.complete",
1884                detail:
1885                    "schema v2 expand backfill left 1 events.source_attestation_json rows unset"
1886                        .into(),
1887            }]
1888        );
1889    }
1890
1891    #[test]
1892    fn count_post_v2_rows_outside_boundary_is_zero_for_empty_store() {
1893        let pool = migrated_pool();
1894        let counts =
1895            count_post_v2_rows_outside_boundary(&pool).expect("count post-v2 rows on empty store");
1896        assert!(counts.is_empty());
1897        assert_eq!(counts.events_post_v2, 0);
1898        assert_eq!(counts.traces_post_v2, 0);
1899        assert_eq!(counts.total(), 0);
1900    }
1901
1902    #[test]
1903    fn count_post_v2_rows_outside_boundary_is_zero_for_pre_v2_rows() {
1904        let pool = migrated_pool();
1905        pool.execute(
1906            "INSERT INTO events (
1907                id, schema_version, observed_at, recorded_at, source_json, event_type,
1908                trace_id, session_id, domain_tags_json, payload_json, payload_hash,
1909                prev_event_hash, event_hash
1910            ) VALUES (
1911                'evt_pre_v2_one', 1, '2026-01-01T00:00:00Z', '2026-01-01T00:00:00Z',
1912                '{\"kind\":\"test\"}', 'test.event', NULL, NULL, '[]', '{}',
1913                'payload-hash', NULL, 'event-hash-one'
1914            );",
1915            [],
1916        )
1917        .expect("insert pre-v2 event row");
1918        pool.execute(
1919            "INSERT INTO traces (id, schema_version, opened_at, closed_at, trace_type, status)
1920             VALUES ('trc_pre_v2_one', 1, '2026-01-01T00:00:00Z', NULL, 'fixture', 'open');",
1921            [],
1922        )
1923        .expect("insert pre-v2 trace row");
1924
1925        let counts =
1926            count_post_v2_rows_outside_boundary(&pool).expect("count post-v2 rows on v1 fixture");
1927        assert!(counts.is_empty());
1928    }
1929
1930    #[test]
1931    fn count_post_v2_rows_outside_boundary_skips_the_schema_migration_boundary_row() {
1932        let pool = migrated_pool();
1933        pool.execute(
1934            "INSERT INTO events (
1935                id, schema_version, observed_at, recorded_at, source_json, event_type,
1936                trace_id, session_id, domain_tags_json, payload_json, payload_hash,
1937                prev_event_hash, event_hash
1938            ) VALUES (
1939                'evt_boundary', 2, '2026-01-01T00:00:00Z', '2026-01-01T00:00:00Z',
1940                '{\"kind\":\"runtime\"}', 'system.note', NULL, NULL, '[]',
1941                '{\"kind\":\"schema_migration.v1_to_v2\",\"previous_v1_head_hash\":\"prev\",\"migration_script_digest\":\"blake3:digest\",\"fixture_verification_result_hash\":\"blake3:fixture\"}',
1942                'payload-hash', NULL, 'event-hash-boundary'
1943            );",
1944            [],
1945        )
1946        .expect("insert boundary-shape row");
1947
1948        let counts = count_post_v2_rows_outside_boundary(&pool)
1949            .expect("count post-v2 rows after boundary insert");
1950        assert_eq!(counts.events_post_v2, 0);
1951        assert_eq!(counts.traces_post_v2, 0);
1952    }
1953
1954    #[test]
1955    fn count_post_v2_rows_outside_boundary_surfaces_fresh_v2_events_row() {
1956        let pool = migrated_pool();
1957        pool.execute(
1958            "INSERT INTO events (
1959                id, schema_version, observed_at, recorded_at, source_json, event_type,
1960                trace_id, session_id, domain_tags_json, payload_json, payload_hash,
1961                prev_event_hash, event_hash
1962            ) VALUES (
1963                'evt_fresh_v2', 2, '2026-01-01T00:00:00Z', '2026-01-01T00:00:00Z',
1964                '{\"kind\":\"runtime\"}', 'tool.result', NULL, NULL, '[]',
1965                '{\"step\":1}',
1966                'payload-hash', NULL, 'event-hash-fresh-v2'
1967            );",
1968            [],
1969        )
1970        .expect("insert fresh-v2 events row");
1971
1972        let counts = count_post_v2_rows_outside_boundary(&pool)
1973            .expect("count post-v2 rows on fresh-v2 fixture");
1974        assert!(!counts.is_empty());
1975        assert_eq!(counts.events_post_v2, 1);
1976        assert_eq!(counts.traces_post_v2, 0);
1977        assert_eq!(counts.total(), 1);
1978    }
1979
1980    #[test]
1981    fn count_post_v2_rows_outside_boundary_surfaces_fresh_v2_traces_row() {
1982        let pool = migrated_pool();
1983        pool.execute(
1984            "INSERT INTO traces (id, schema_version, opened_at, closed_at, trace_type, status)
1985             VALUES ('trc_fresh_v2', 2, '2026-01-01T00:00:00Z', NULL, 'fixture', 'open');",
1986            [],
1987        )
1988        .expect("insert fresh-v2 trace row");
1989
1990        let counts = count_post_v2_rows_outside_boundary(&pool)
1991            .expect("count post-v2 rows on fresh-v2 trace fixture");
1992        assert!(!counts.is_empty());
1993        assert_eq!(counts.events_post_v2, 0);
1994        assert_eq!(counts.traces_post_v2, 1);
1995    }
1996}