// cortex_store/migrate_v2.rs

//! Dry-run planning helpers for the schema v2 cutover.
//!
//! The dry-run planner deliberately does not mutate the store and does not bump
//! `cortex_core::SCHEMA_VERSION`. The explicit expand/backfill helper below is
//! still a pre-cutover drill surface: it adds any missing nullable v2
//! compatibility columns and honest legacy markers, but its backfills are not
//! part of the default `apply_pending` path.
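//!
//! A minimal drill sketch (not compiled here; it assumes a pool that was
//! opened and migrated elsewhere in this crate):
//!
//! ```ignore
//! // Read-only preflight: plan the migration without mutating the store.
//! let plan = dry_run_plan(&pool)?;
//! assert!(plan.is_ready(), "preflight failures: {:?}", plan.failures);
//!
//! // Explicit, idempotent drill: add any missing v2 columns and stamp
//! // honest legacy markers while row schema versions stay at v1.
//! let report = apply_expand_backfill_skeleton(&pool, chrono::Utc::now())?;
//! println!(
//!     "backfilled {} legacy events",
//!     report.legacy_event_attestations_backfilled
//! );
//! ```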

use chrono::{DateTime, Utc};
use cortex_ledger::{event_hash as ledger_event_hash, hash::canonical_payload_bytes, payload_hash};
use rusqlite::{params, OptionalExtension};
use serde_json::{json, Value};

use crate::verify::{
    verify_schema_v2_default_persistence_readiness, verify_schema_v2_expand_shape,
    verify_schema_version, SchemaVersionFailure,
};
use crate::{Pool, StoreError, StoreResult};

/// Draft SQL shape for the schema v2 expand/backfill skeleton.
pub const SCHEMA_V2_EXPAND_SQL: &str = include_str!("../migrations/003_schema_v2_expand.sql");

const FIXTURE_COUNT_TABLES: &[&str] = &[
    "events",
    "traces",
    "episodes",
    "memories",
    "context_packs",
    "audit_records",
];

const DRY_RUN_STEPS: &[&str] = &[
    "preflight_schema_v1",
    "inspect_v1_hash_chain_head",
    "plan_expand_nullable_v2_columns",
    "plan_legacy_unattested_backfill",
    "plan_schema_migration_boundary_event",
    "leave_schema_version_unchanged",
];

const STAGE_BACKUP_PREFLIGHT_READY: &str = "backup-preflight-ready";
const STAGE_EXPAND_BACKFILL: &str = "expand/backfill";
const STAGE_BOUNDARY_APPEND_PENDING: &str = "boundary-append-pending";
const STAGE_POST_MIGRATION_AUDIT_PENDING: &str = "post-migration-audit-pending";

const FIXTURE_VERIFICATION_TRANSCRIPT_SCHEMA_VERSION: u16 = 1;
const FIXTURE_VERIFICATION_MIGRATION_ID: &str = "schema_v2_dry_run_fixture_verification";
const DEFAULT_SCHEMA_V2_TARGET: u16 = 2;

/// Durable schema artifact kind required before default schema-v2 persistence is ready.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum V2DurableS29ArtifactKind {
    /// A required column on an existing v1 table.
    Column,
    /// A required side table introduced for durable v2 state.
    Table,
}

/// Durable S2.9 schema artifact required by the default schema-v2 readiness gate.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct V2DurableS29Artifact {
    /// Artifact kind.
    pub kind: V2DurableS29ArtifactKind,
    /// Owning table, or table name when `kind` is [`V2DurableS29ArtifactKind::Table`].
    pub table: &'static str,
    /// Column name when `kind` is [`V2DurableS29ArtifactKind::Column`].
    pub column: Option<&'static str>,
}

/// Durable S2.9 columns/tables that must exist for default schema-v2 persistence readiness.
pub const DURABLE_S2_9_ARTIFACTS: &[V2DurableS29Artifact] = &[
    V2DurableS29Artifact {
        kind: V2DurableS29ArtifactKind::Column,
        table: "events",
        column: Some("source_attestation_json"),
    },
    V2DurableS29Artifact {
        kind: V2DurableS29ArtifactKind::Column,
        table: "episodes",
        column: Some("summary_spans_json"),
    },
    V2DurableS29Artifact {
        kind: V2DurableS29ArtifactKind::Column,
        table: "memories",
        column: Some("summary_spans_json"),
    },
    V2DurableS29Artifact {
        kind: V2DurableS29ArtifactKind::Column,
        table: "memories",
        column: Some("cross_session_use_count"),
    },
    V2DurableS29Artifact {
        kind: V2DurableS29ArtifactKind::Column,
        table: "memories",
        column: Some("first_used_at"),
    },
    V2DurableS29Artifact {
        kind: V2DurableS29ArtifactKind::Column,
        table: "memories",
        column: Some("last_cross_session_use_at"),
    },
    V2DurableS29Artifact {
        kind: V2DurableS29ArtifactKind::Column,
        table: "memories",
        column: Some("last_validation_at"),
    },
    V2DurableS29Artifact {
        kind: V2DurableS29ArtifactKind::Column,
        table: "memories",
        column: Some("validation_epoch"),
    },
    V2DurableS29Artifact {
        kind: V2DurableS29ArtifactKind::Column,
        table: "memories",
        column: Some("blessed_until"),
    },
    V2DurableS29Artifact {
        kind: V2DurableS29ArtifactKind::Column,
        table: "context_packs",
        column: Some("consumer_advisory_json"),
    },
    V2DurableS29Artifact {
        kind: V2DurableS29ArtifactKind::Table,
        table: "memory_session_uses",
        column: None,
    },
    V2DurableS29Artifact {
        kind: V2DurableS29ArtifactKind::Table,
        table: "outcome_memory_relations",
        column: None,
    },
];

/// Per-table row count captured by a schema v2 dry run.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FixtureTableCount {
    /// Table inspected.
    pub table: &'static str,
    /// Rows currently present in the table.
    pub rows: u64,
}

/// Read-only snapshot of a v1 store fixture before schema v2 migration.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct V1FixtureReport {
    /// Expected schema version for the running binary (`cortex_core::SCHEMA_VERSION`).
    pub expected_schema_version: u16,
    /// Distinct row-level schema versions observed in `events` and `traces`.
    pub observed_row_schema_versions: Vec<u16>,
    /// Selected table counts needed by the v2 fixture matrix.
    pub table_counts: Vec<FixtureTableCount>,
    /// Current event hash-chain head, if the fixture contains events.
    pub event_chain_head: Option<String>,
    /// Migration rows already recorded in `_migrations`.
    pub applied_migrations: Vec<String>,
}

/// Read-only migration plan for a v1 store.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct V2DryRunPlan {
    /// Fixture state inspected by the planner.
    pub fixture: V1FixtureReport,
    /// Ordered high-level steps a real migration must execute later.
    pub steps: Vec<&'static str>,
    /// Precondition failures. Non-empty means a real migration must fail closed.
    pub failures: Vec<SchemaVersionFailure>,
}

/// Store-local readiness status for one schema v2 migration stage.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum V2MigrationStageStatus {
    /// The stage is ready to be executed by the store-only staging surface.
    Ready,
    /// The stage is intentionally not executable in the store-only pre-cutover slice.
    Pending,
    /// The stage is blocked by dry-run/preflight evidence.
    Blocked,
}

/// One schema v2 migration stage in the pre-cutover execution plan.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct V2MigrationStage {
    /// Stable stage name.
    pub name: &'static str,
    /// Current stage status.
    pub status: V2MigrationStageStatus,
    /// Whether this stage can mutate SQLite store state in this crate.
    pub mutates_store: bool,
    /// Whether this stage would enable schema v2 cutover.
    pub enables_cutover: bool,
    /// Stable explanation for operators and tests.
    pub reason: &'static str,
}

/// Store-only schema v2 staging plan.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct V2MigrationStagePlan {
    /// Underlying dry-run fixture/preflight evidence.
    pub dry_run: V2DryRunPlan,
    /// Ordered pre-cutover stages.
    pub stages: Vec<V2MigrationStage>,
}

/// Stable failure row included in a fixture verification transcript.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct V2FixtureVerificationFailure {
    /// Stable invariant name.
    pub invariant: String,
    /// Stable failure detail.
    pub detail: String,
}

/// Deterministic transcript for a schema v2 dry-run fixture verification.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct V2FixtureVerificationTranscript {
    /// Transcript schema version for the digest input shape.
    pub transcript_schema_version: u16,
    /// Stable migration/drill identifier.
    pub migration_id: &'static str,
    /// Boundary previous-head value that the later migration event would bind.
    pub boundary_previous_v1_head_hash: String,
    /// Fixture state inspected by the dry-run planner.
    pub fixture: V1FixtureReport,
    /// Ordered dry-run steps included in the verification transcript.
    pub steps: Vec<&'static str>,
    /// Stable dry-run precondition failures.
    pub failures: Vec<V2FixtureVerificationFailure>,
}

/// Summary of one explicit schema v2 expand/backfill skeleton pass.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct V2ExpandBackfillReport {
    /// Columns that were added during this invocation.
    pub added_columns: Vec<String>,
    /// Tables that were created during this invocation.
    pub created_tables: Vec<&'static str>,
    /// Legacy event rows marked as explicitly unattested.
    pub legacy_event_attestations_backfilled: u64,
    /// Episode rows backfilled with span-level provenance placeholders.
    pub episode_summary_spans_backfilled: u64,
    /// Memory rows backfilled with span-level provenance placeholders.
    pub memory_summary_spans_backfilled: u64,
    /// Context pack rows backfilled with advisory posture.
    pub context_pack_advisories_backfilled: u64,
    /// Memory rows backfilled with cross-session salience defaults.
    pub memory_salience_defaults_backfilled: u64,
}

/// Result of preparing an explicit store fixture for default schema-v2 write-shape checks.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct V2DefaultWriteShapeReadinessReport {
    /// Expand/backfill pass that created the required S2.9 columns and side tables.
    pub expand_backfill: V2ExpandBackfillReport,
    /// Event rows moved to the v2 row-version shape for readiness verification.
    pub event_rows_marked_v2: u64,
    /// Trace rows moved to the v2 row-version shape for readiness verification.
    pub trace_rows_marked_v2: u64,
    /// Fail-closed readiness failures after the fixture has been prepared.
    pub readiness_failures: Vec<SchemaVersionFailure>,
    /// Fail-closed event framing failures after the fixture has been prepared.
    pub event_framing_failures: Vec<SchemaVersionFailure>,
}

impl V2DryRunPlan {
    /// Returns true when the dry-run preflight found no schema/version failures.
    #[must_use]
    pub fn is_ready(&self) -> bool {
        self.failures.is_empty()
    }

    /// Build a deterministic fixture verification transcript for this plan.
    #[must_use]
    pub fn fixture_verification_transcript(
        &self,
        boundary_previous_v1_head_hash: impl Into<String>,
    ) -> V2FixtureVerificationTranscript {
        fixture_verification_transcript(self, boundary_previous_v1_head_hash)
    }

    /// Return the stable BLAKE3-prefixed digest for this plan's fixture transcript.
    #[must_use]
    pub fn fixture_verification_result_hash(
        &self,
        boundary_previous_v1_head_hash: impl Into<String>,
    ) -> String {
        self.fixture_verification_transcript(boundary_previous_v1_head_hash)
            .digest()
    }
}

impl V2MigrationStagePlan {
    /// Returns true when any stage in this plan enables cutover.
    #[must_use]
    pub fn cutover_enabled(&self) -> bool {
        self.stages.iter().any(|stage| stage.enables_cutover)
    }

    /// Returns the ordered stable stage names.
    #[must_use]
    pub fn stage_names(&self) -> Vec<&'static str> {
        self.stages.iter().map(|stage| stage.name).collect()
    }
}

impl V2DefaultWriteShapeReadinessReport {
    /// Returns true when the prepared fixture satisfies the default v2 readiness verifier.
    #[must_use]
    pub fn is_ready(&self) -> bool {
        self.readiness_failures.is_empty()
    }

    /// Returns true when the prepared fixture also satisfies future v2 event framing checks.
    #[must_use]
    pub fn is_cutover_ready(&self) -> bool {
        self.is_ready() && self.event_framing_failures.is_empty()
    }
}

impl V2FixtureVerificationTranscript {
    /// Deterministic JSON value used as the digest preimage.
    #[must_use]
    pub fn to_json_value(&self) -> Value {
        let mut table_counts = self.fixture.table_counts.clone();
        table_counts.sort_by(|left, right| left.table.cmp(right.table));

        let mut applied_migrations = self.fixture.applied_migrations.clone();
        applied_migrations.sort();

        json!({
            "boundary_previous_v1_head_hash": &self.boundary_previous_v1_head_hash,
            "failures": self.failures.iter().map(|failure| {
                json!({
                    "detail": &failure.detail,
                    "invariant": &failure.invariant,
                })
            }).collect::<Vec<_>>(),
            "fixture": {
                "applied_migrations": applied_migrations,
                "event_chain_head": &self.fixture.event_chain_head,
                "expected_schema_version": self.fixture.expected_schema_version,
                "observed_row_schema_versions": &self.fixture.observed_row_schema_versions,
                "table_counts": table_counts.iter().map(|count| {
                    json!({
                        "rows": count.rows,
                        "table": count.table,
                    })
                }).collect::<Vec<_>>(),
            },
            "migration_id": self.migration_id,
            "steps": &self.steps,
            "transcript_schema_version": self.transcript_schema_version,
        })
    }

    /// Canonical JSON bytes used as the BLAKE3 digest input.
    #[must_use]
    pub fn canonical_json_bytes(&self) -> Vec<u8> {
        canonical_payload_bytes(&self.to_json_value())
    }

    /// Stable BLAKE3-prefixed digest of the canonical transcript.
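    ///
    /// A shape sketch (assumes a `V2DryRunPlan` built elsewhere; the digest
    /// hex itself depends on the fixture):
    ///
    /// ```ignore
    /// let transcript = plan.fixture_verification_transcript("blake3:prev-head");
    /// let digest = transcript.digest();
    /// assert!(digest.starts_with("blake3:"));
    /// ```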
    #[must_use]
    pub fn digest(&self) -> String {
        format!("blake3:{}", payload_hash(&self.to_json_value()))
    }
}

/// Inspect the current store as a schema v1 migration fixture.
pub fn inspect_v1_fixture(pool: &Pool) -> StoreResult<V1FixtureReport> {
    Ok(V1FixtureReport {
        expected_schema_version: cortex_core::SCHEMA_VERSION,
        observed_row_schema_versions: observed_schema_versions(pool)?,
        table_counts: table_counts(pool)?,
        event_chain_head: event_chain_head(pool)?,
        applied_migrations: applied_migrations(pool)?,
    })
}

/// Produce a read-only v2 migration plan for a v1 store.
///
/// Schema v2 atomic cutover (ADR 0018): the post-cutover binary's default
/// migration bundle creates the S2.9 columns at `apply_pending` time. The
/// dry-run plan therefore only refuses *future*-schema rows
/// (`schema_version > SCHEMA_VERSION`) and intermediate-shape inconsistencies
/// stronger than a missing row-level backfill. Backfill completeness is a
/// downstream concern checked by the `apply_expand_backfill_skeleton` helper
/// and the default-v2 cutover readiness gate, not by the read-only dry-run
/// surface.
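///
/// A minimal sketch (assumes a migrated pool; not compiled here):
///
/// ```ignore
/// let plan = dry_run_plan(&pool)?;
/// // The planner never mutates the store; non-empty failures mean a real
/// // migration must fail closed.
/// if !plan.is_ready() {
///     eprintln!("migration blocked: {:?}", plan.failures);
/// }
/// ```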
pub fn dry_run_plan(pool: &Pool) -> StoreResult<V2DryRunPlan> {
    let fixture = inspect_v1_fixture(pool)?;
    let mut failures = verify_schema_version(pool, cortex_core::SCHEMA_VERSION)?.failures;
    failures.extend(dry_run_shape_failures(pool)?);

    Ok(V2DryRunPlan {
        fixture,
        steps: DRY_RUN_STEPS.to_vec(),
        failures,
    })
}

/// Subset of [`verify_schema_v2_expand_shape`] failures relevant to the
/// dry-run plan: partial/missing expand artifacts only. Row-level backfill
/// gaps are deliberately omitted; they describe the state the explicit
/// backfill helper exists to repair, not a dry-run blocker.
fn dry_run_shape_failures(pool: &Pool) -> StoreResult<Vec<SchemaVersionFailure>> {
    let all = verify_schema_v2_expand_shape(pool)?;
    Ok(all
        .into_iter()
        .filter(|failure| match failure {
            SchemaVersionFailure::IllegalIntermediateShape { invariant, .. } => {
                // Keep partial-shape failures; drop backfill-completeness ones.
                *invariant != "schema_v2_expand_backfill.complete"
            }
            _ => true,
        })
        .collect())
}

/// Produce the store-only schema v2 migration stage plan.
///
/// This is a pure planning surface: it sequences the store-side staging work
/// without applying DDL, appending a boundary event, running post-migration
/// audit, changing `_migrations`, or bumping `SCHEMA_VERSION`.
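///
/// A sketch of inspecting the plan (assumes a migrated pool):
///
/// ```ignore
/// let plan = staged_execution_plan(&pool)?;
/// // No pre-cutover stage may enable cutover from this crate.
/// assert!(!plan.cutover_enabled());
/// println!("stages: {:?}", plan.stage_names());
/// ```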
pub fn staged_execution_plan(pool: &Pool) -> StoreResult<V2MigrationStagePlan> {
    let dry_run = dry_run_plan(pool)?;
    let store_ready_status = if dry_run.is_ready() {
        V2MigrationStageStatus::Ready
    } else {
        V2MigrationStageStatus::Blocked
    };

    Ok(V2MigrationStagePlan {
        dry_run,
        stages: vec![
            V2MigrationStage {
                name: STAGE_BACKUP_PREFLIGHT_READY,
                status: store_ready_status,
                mutates_store: false,
                enables_cutover: false,
                reason: "schema v1 preflight passed; backup manifest remains an external CLI/operator gate",
            },
            V2MigrationStage {
                name: STAGE_EXPAND_BACKFILL,
                status: store_ready_status,
                mutates_store: true,
                enables_cutover: false,
                reason: "store may apply nullable v2 expand/backfill skeleton while row schema versions stay v1",
            },
            V2MigrationStage {
                name: STAGE_BOUNDARY_APPEND_PENDING,
                status: V2MigrationStageStatus::Pending,
                mutates_store: false,
                enables_cutover: false,
                reason: "boundary append is ledger/CLI cutover work and is not executable from cortex-store",
            },
            V2MigrationStage {
                name: STAGE_POST_MIGRATION_AUDIT_PENDING,
                status: V2MigrationStageStatus::Pending,
                mutates_store: false,
                enables_cutover: false,
                reason: "post-migration audit is pending until full migrate and boundary append exist",
            },
        ],
    })
}

/// Build the deterministic fixture verification transcript for a dry-run plan.
#[must_use]
pub fn fixture_verification_transcript(
    plan: &V2DryRunPlan,
    boundary_previous_v1_head_hash: impl Into<String>,
) -> V2FixtureVerificationTranscript {
    let mut failures = plan
        .failures
        .iter()
        .map(|failure| V2FixtureVerificationFailure {
            invariant: failure.invariant(),
            detail: failure.detail(),
        })
        .collect::<Vec<_>>();
    failures.sort_by(|left, right| {
        left.invariant
            .cmp(&right.invariant)
            .then_with(|| left.detail.cmp(&right.detail))
    });

    V2FixtureVerificationTranscript {
        transcript_schema_version: FIXTURE_VERIFICATION_TRANSCRIPT_SCHEMA_VERSION,
        migration_id: FIXTURE_VERIFICATION_MIGRATION_ID,
        boundary_previous_v1_head_hash: boundary_previous_v1_head_hash.into(),
        fixture: plan.fixture.clone(),
        steps: plan.steps.clone(),
        failures,
    }
}

/// Return the deterministic BLAKE3-prefixed fixture verification result hash.
#[must_use]
pub fn fixture_verification_result_hash(
    plan: &V2DryRunPlan,
    boundary_previous_v1_head_hash: impl Into<String>,
) -> String {
    fixture_verification_transcript(plan, boundary_previous_v1_head_hash).digest()
}

/// Return the durable S2.9 artifacts checked before default schema-v2 persistence can be enabled.
#[must_use]
pub fn durable_s2_9_artifacts() -> &'static [V2DurableS29Artifact] {
    DURABLE_S2_9_ARTIFACTS
}

/// Return default schema-v2 persistence readiness failures without mutating the store.
pub fn default_v2_persistence_readiness_failures(
    pool: &Pool,
) -> StoreResult<Vec<SchemaVersionFailure>> {
    verify_schema_v2_default_persistence_readiness(pool)
}

/// Return future default schema-v2 event framing failures without mutating the store.
///
/// This validates only rows already marked with the future v2 row schema. It is not called from
/// startup, `apply_pending`, or the v1 repository append path.
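///
/// A sketch (assumes a migrated pool; rows still tagged v1 are skipped):
///
/// ```ignore
/// let failures = default_v2_event_framing_readiness_failures(&pool)?;
/// assert!(failures.is_empty(), "v2-tagged rows failed framing: {failures:?}");
/// ```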
pub fn default_v2_event_framing_readiness_failures(
    pool: &Pool,
) -> StoreResult<Vec<SchemaVersionFailure>> {
    let mut failures = Vec::new();
    let mut stmt = pool.prepare(
        "SELECT id, schema_version, payload_json, payload_hash, prev_event_hash, event_hash
         FROM events
         ORDER BY id;",
    )?;
    let rows = stmt.query_map([], |row| {
        Ok((
            row.get::<_, String>(0)?,
            row.get::<_, u16>(1)?,
            row.get::<_, String>(2)?,
            row.get::<_, String>(3)?,
            row.get::<_, Option<String>>(4)?,
            row.get::<_, String>(5)?,
        ))
    })?;

    for row in rows {
        let (
            id,
            schema_version,
            payload_json,
            stored_payload_hash,
            prev_event_hash,
            stored_event_hash,
        ) = row?;
        if schema_version != DEFAULT_SCHEMA_V2_TARGET {
            continue;
        }

        let payload = match serde_json::from_str::<Value>(&payload_json) {
            Ok(payload) => payload,
            Err(err) => {
                failures.push(v2_event_framing_failure(
                    id,
                    format!("payload_json is not valid JSON: {err}"),
                ));
                continue;
            }
        };
        let expected_payload_hash = payload_hash(&payload);
        if stored_payload_hash != expected_payload_hash {
            failures.push(v2_event_framing_failure(
                &id,
                format!(
                    "payload_hash mismatch under future v2 framing: expected {expected_payload_hash}, found {stored_payload_hash}"
                ),
            ));
        }

        let expected_event_hash =
            ledger_event_hash(prev_event_hash.as_deref(), &stored_payload_hash);
        if stored_event_hash != expected_event_hash {
            failures.push(v2_event_framing_failure(
                id,
                format!(
                    "event_hash mismatch under future v2 framing: expected {expected_event_hash}, found {stored_event_hash}"
                ),
            ));
        }
    }

    Ok(failures)
}

/// Return default schema-v2 cutover readiness failures without mutating the store.
pub fn default_v2_cutover_readiness_failures(
    pool: &Pool,
) -> StoreResult<Vec<SchemaVersionFailure>> {
    let mut failures = default_v2_persistence_readiness_failures(pool)?;
    failures.extend(default_v2_event_framing_readiness_failures(pool)?);
    Ok(failures)
}

/// Fail closed unless the current store satisfies default schema-v2 persistence readiness.
pub fn require_default_v2_persistence_readiness(pool: &Pool) -> StoreResult<()> {
    let failures = default_v2_persistence_readiness_failures(pool)?;
    if failures.is_empty() {
        Ok(())
    } else {
        Err(StoreError::Validation(format!(
            "schema v2 default persistence readiness failed: {failures:?}"
        )))
    }
}

/// Fail closed unless the current store satisfies default schema-v2 cutover readiness.
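///
/// A sketch of the fail-closed guard (assumes a prepared fixture pool):
///
/// ```ignore
/// // On failure this returns Err(StoreError::Validation(..)) carrying the
/// // readiness failures instead of proceeding.
/// require_default_v2_cutover_readiness(&pool)?;
/// ```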
pub fn require_default_v2_cutover_readiness(pool: &Pool) -> StoreResult<()> {
    let failures = default_v2_cutover_readiness_failures(pool)?;
    if failures.is_empty() {
        Ok(())
    } else {
        Err(StoreError::Validation(format!(
            "schema v2 default cutover readiness failed: {failures:?}"
        )))
    }
}

/// Explicitly add nullable schema v2 columns and conservative legacy markers.
///
/// This is a pre-cutover helper for Lane S2 fixture drills. It intentionally
/// leaves row-level schema versions at v1 and does not record a normal
/// `_migrations` row: row promotion to v2 is reserved for the atomic cutover
/// (ADR 0018), not for this explicit drill surface.
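///
/// A minimal drill sketch (assumes a migrated pool; the helper is idempotent,
/// so a second pass reports zero backfills):
///
/// ```ignore
/// let first = apply_expand_backfill_skeleton(&pool, chrono::Utc::now())?;
/// let second = apply_expand_backfill_skeleton(&pool, chrono::Utc::now())?;
/// assert_eq!(second.legacy_event_attestations_backfilled, 0);
/// ```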
pub fn apply_expand_backfill_skeleton(
    pool: &Pool,
    imported_at: DateTime<Utc>,
) -> StoreResult<V2ExpandBackfillReport> {
    // Schema v2 atomic cutover (ADR 0018): the default migration bundle now
    // creates the S2.9 columns and side tables at `apply_pending` time, so the
    // expand-shape check would otherwise fail closed on every fresh store —
    // backfill is exactly what makes it pass. We still reject *future*-schema
    // rows (`schema_version > SCHEMA_VERSION`) so a leaked v3 row cannot
    // smuggle in: that gate lives in `verify_schema_version`. Historical v1
    // rows (`schema_version < SCHEMA_VERSION`) are accepted and backfilled.
    let schema_version_report = verify_schema_version(pool, cortex_core::SCHEMA_VERSION)?;
    if !schema_version_report.is_ok() {
        return Err(StoreError::Validation(format!(
            "schema v2 expand/backfill preflight refused future-schema rows: {:?}",
            schema_version_report.failures
        )));
    }

    let mut report = V2ExpandBackfillReport {
        added_columns: Vec::new(),
        created_tables: Vec::new(),
        legacy_event_attestations_backfilled: 0,
        episode_summary_spans_backfilled: 0,
        memory_summary_spans_backfilled: 0,
        context_pack_advisories_backfilled: 0,
        memory_salience_defaults_backfilled: 0,
    };

    add_column_if_missing(
        pool,
        &mut report,
        "events",
        "source_attestation_json",
        "ALTER TABLE events ADD COLUMN source_attestation_json TEXT NULL \
         CHECK (source_attestation_json IS NULL OR json_valid(source_attestation_json));",
    )?;
    add_column_if_missing(
        pool,
        &mut report,
        "episodes",
        "summary_spans_json",
        "ALTER TABLE episodes ADD COLUMN summary_spans_json TEXT NULL \
         CHECK (summary_spans_json IS NULL OR json_valid(summary_spans_json));",
    )?;
    add_column_if_missing(
        pool,
        &mut report,
        "memories",
        "summary_spans_json",
        "ALTER TABLE memories ADD COLUMN summary_spans_json TEXT NULL \
         CHECK (summary_spans_json IS NULL OR json_valid(summary_spans_json));",
    )?;
    add_column_if_missing(
        pool,
        &mut report,
        "memories",
        "cross_session_use_count",
        "ALTER TABLE memories ADD COLUMN cross_session_use_count INTEGER NULL \
         CHECK (cross_session_use_count IS NULL OR cross_session_use_count >= 0);",
    )?;
    add_column_if_missing(
        pool,
        &mut report,
        "memories",
        "first_used_at",
        "ALTER TABLE memories ADD COLUMN first_used_at TEXT NULL;",
    )?;
    add_column_if_missing(
        pool,
        &mut report,
        "memories",
        "last_cross_session_use_at",
        "ALTER TABLE memories ADD COLUMN last_cross_session_use_at TEXT NULL;",
    )?;
    add_column_if_missing(
        pool,
        &mut report,
        "memories",
        "last_validation_at",
        "ALTER TABLE memories ADD COLUMN last_validation_at TEXT NULL;",
    )?;
    add_column_if_missing(
        pool,
        &mut report,
        "memories",
        "validation_epoch",
        "ALTER TABLE memories ADD COLUMN validation_epoch INTEGER NULL \
         CHECK (validation_epoch IS NULL OR validation_epoch >= 0);",
    )?;
    add_column_if_missing(
        pool,
        &mut report,
        "memories",
        "blessed_until",
        "ALTER TABLE memories ADD COLUMN blessed_until TEXT NULL;",
    )?;
    add_column_if_missing(
        pool,
        &mut report,
        "context_packs",
        "consumer_advisory_json",
        "ALTER TABLE context_packs ADD COLUMN consumer_advisory_json TEXT NULL \
         CHECK (consumer_advisory_json IS NULL OR json_valid(consumer_advisory_json));",
    )?;

    create_table_if_missing(
        pool,
        &mut report,
        "memory_session_uses",
        "CREATE TABLE memory_session_uses (
            memory_id TEXT NOT NULL REFERENCES memories(id),
            session_id TEXT NOT NULL,
            first_used_at TEXT NOT NULL,
            last_used_at TEXT NOT NULL,
            use_count INTEGER NOT NULL CHECK (use_count >= 0),
            PRIMARY KEY (memory_id, session_id)
        );",
    )?;
    create_table_if_missing(
        pool,
        &mut report,
        "outcome_memory_relations",
        "CREATE TABLE outcome_memory_relations (
            outcome_ref TEXT NOT NULL,
            memory_id TEXT NOT NULL REFERENCES memories(id),
            relation TEXT NOT NULL,
            recorded_at TEXT NOT NULL,
            source_event_id TEXT NULL REFERENCES events(id),
            PRIMARY KEY (outcome_ref, memory_id, relation)
        );",
    )?;

    report.legacy_event_attestations_backfilled =
        backfill_legacy_event_attestations(pool, imported_at)?;
    report.episode_summary_spans_backfilled = backfill_episode_summary_spans(pool)?;
    report.memory_summary_spans_backfilled = backfill_memory_summary_spans(pool)?;
    report.context_pack_advisories_backfilled = backfill_context_pack_advisories(pool)?;
    report.memory_salience_defaults_backfilled = backfill_memory_salience_defaults(pool)?;

    Ok(report)
}

/// Prepare a store fixture for explicit default schema-v2 write-shape readiness checks.
///
/// This helper is intentionally not called from startup, `apply_pending`, or the normal v1
/// repository append path. It is a cutover-readiness surface for Lane S2 tests: it constructs the
/// required S2.9 store fields, then moves event/trace row-version tags to the default v2 target so
/// [`verify_schema_v2_default_persistence_readiness`] can validate the resulting write shape
/// ahead of the atomic cutover.
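///
/// A sketch (assumes a migrated fixture pool):
///
/// ```ignore
/// let report = prepare_default_v2_write_shape_for_readiness(&pool, chrono::Utc::now())?;
/// assert!(report.is_ready(), "readiness: {:?}", report.readiness_failures);
/// assert!(report.is_cutover_ready(), "framing: {:?}", report.event_framing_failures);
/// ```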
pub fn prepare_default_v2_write_shape_for_readiness(
    pool: &Pool,
    imported_at: DateTime<Utc>,
) -> StoreResult<V2DefaultWriteShapeReadinessReport> {
    let expand_backfill = apply_expand_backfill_skeleton(pool, imported_at)?;
    let event_rows_marked_v2 = mark_schema_version(pool, "events", DEFAULT_SCHEMA_V2_TARGET)?;
    let trace_rows_marked_v2 = mark_schema_version(pool, "traces", DEFAULT_SCHEMA_V2_TARGET)?;
    let readiness_failures = default_v2_persistence_readiness_failures(pool)?;
    let event_framing_failures = default_v2_event_framing_readiness_failures(pool)?;

    Ok(V2DefaultWriteShapeReadinessReport {
        expand_backfill,
        event_rows_marked_v2,
        trace_rows_marked_v2,
        readiness_failures,
        event_framing_failures,
    })
}

fn v2_event_framing_failure(
    row_id: impl Into<String>,
    detail: impl Into<String>,
) -> SchemaVersionFailure {
    SchemaVersionFailure::IllegalIntermediateShape {
        invariant: "schema_v2_default_persistence.event_framing.valid",
        detail: format!("events row {} {}", row_id.into(), detail.into()),
    }
}

fn observed_schema_versions(pool: &Pool) -> StoreResult<Vec<u16>> {
    let mut versions = Vec::new();
    for table in ["events", "traces"] {
        let sql = format!("SELECT DISTINCT schema_version FROM {table} ORDER BY schema_version;");
        let mut stmt = pool.prepare(&sql)?;
        let rows = stmt.query_map([], |row| row.get::<_, u16>(0))?;
        for row in rows {
            let version = row?;
            if !versions.contains(&version) {
                versions.push(version);
            }
        }
    }
    versions.sort_unstable();
    Ok(versions)
}

fn table_counts(pool: &Pool) -> StoreResult<Vec<FixtureTableCount>> {
    FIXTURE_COUNT_TABLES
        .iter()
        .map(|table| {
            let sql = format!("SELECT COUNT(*) FROM {table};");
            let rows = pool.query_row(&sql, [], |row| row.get::<_, u64>(0))?;
            Ok(FixtureTableCount { table, rows })
        })
        .collect()
}

fn event_chain_head(pool: &Pool) -> StoreResult<Option<String>> {
    let head = pool
        .query_row(
            "SELECT e.event_hash
             FROM events e
             WHERE NOT EXISTS (
                 SELECT 1 FROM events child WHERE child.prev_event_hash = e.event_hash
             )
             ORDER BY e.recorded_at DESC, e.id DESC
             LIMIT 1;",
            [],
            |row| row.get::<_, String>(0),
        )
        .optional()?;
    Ok(head)
}

fn applied_migrations(pool: &Pool) -> StoreResult<Vec<String>> {
    let mut stmt = pool.prepare("SELECT name FROM _migrations ORDER BY name;")?;
    let rows = stmt.query_map(params![], |row| row.get::<_, String>(0))?;
    rows.collect::<Result<_, _>>().map_err(Into::into)
}

fn add_column_if_missing(
    pool: &Pool,
    report: &mut V2ExpandBackfillReport,
    table: &'static str,
    column: &'static str,
    ddl: &str,
) -> StoreResult<()> {
    if has_column(pool, table, column)? {
        return Ok(());
    }

    pool.execute_batch(ddl)?;
    report.added_columns.push(format!("{table}.{column}"));
    Ok(())
}

fn create_table_if_missing(
    pool: &Pool,
    report: &mut V2ExpandBackfillReport,
    table: &'static str,
    ddl: &str,
) -> StoreResult<()> {
    if has_table(pool, table)? {
        return Ok(());
    }

    pool.execute_batch(ddl)?;
    report.created_tables.push(table);
    Ok(())
}

/// Promote any rows still tagged at the legacy v1 schema version to the
/// default v2 target.
///
/// Schema v2 atomic cutover (ADR 0018): the row-version source is the legacy
/// v1 marker (`1`), not `cortex_core::SCHEMA_VERSION`. Post-cutover the running
/// constant is `2`, so reading the source value from the constant would turn
/// this UPDATE into a no-op on every store that still carries v1 rows.
fn mark_schema_version(pool: &Pool, table: &'static str, target: u16) -> StoreResult<u64> {
    const LEGACY_V1_ROW_SCHEMA_VERSION: u16 = 1;
    let sql = format!(
        "UPDATE {table}
         SET schema_version = ?1
         WHERE schema_version = ?2;"
    );
    let changed = pool.execute(&sql, params![target, LEGACY_V1_ROW_SCHEMA_VERSION])?;
    Ok(changed as u64)
}

fn has_table(pool: &Pool, table: &str) -> StoreResult<bool> {
    let existing = pool
        .query_row(
            "SELECT name FROM sqlite_master WHERE type = 'table' AND name = ?1;",
            params![table],
            |row| row.get::<_, String>(0),
        )
        .optional()?;
    Ok(existing.is_some())
}

fn has_column(pool: &Pool, table: &str, column: &str) -> StoreResult<bool> {
    let sql = format!("PRAGMA table_info({table});");
    let mut stmt = pool.prepare(&sql)?;
    let columns = stmt.query_map([], |row| row.get::<_, String>(1))?;
    for found in columns {
        if found? == column {
            return Ok(true);
        }
    }
    Ok(false)
}

/// Backfill `events.source_attestation_json` with the `LegacyUnattested` marker
/// for any rows still missing one.
///
/// `apply_expand_backfill_skeleton` runs this once before the boundary append.
/// The schema v2 atomic cutover then re-runs it after mirroring the boundary
/// row into SQLite so that the boundary row itself is stamped as
/// `legacy_unattested` (ADR 0010 §1 placeholder until operator attestation is
/// wired) and the default-v2 cutover readiness gate passes.
///
/// # B2 boundary attestation choice (RED_TEAM_FINDINGS phase B)
///
/// The red-team flagged that the boundary `schema_migration.v1_to_v2` row is
/// doctrinally NOT a "legacy v1 import" — it is the row that *announces* v2.
/// Stamping it `LegacyUnattested` reads as a category error.
///
/// The choice to keep `LegacyUnattested` for the boundary row in this slice
/// is deliberate and narrow:
///
/// - The CLI cutover (`cortex migrate v2`) DOES verify an Ed25519-signed
///   operator attestation envelope at the migration authority root (ADR 0010
///   §1-§2, Gate 5 punch list #17) before the boundary append. The boundary
///   payload triple `(previous_v1_head_hash, migration_script_digest,
///   fixture_verification_result_hash)` is bound by that signature. That
///   verified evidence is NOT materialised onto the boundary row's
///   `source_attestation_json` column today — the column still receives the
///   `legacy_unattested` marker.
/// - `SourceAttestation::Missing` would be marginally more honest
///   ("no operator attestation captured **on this column** yet"), but the
///   default-v2 readiness gate currently treats both variants identically
///   (both pass `verify_v2_source_attestations`), so swapping the marker
///   would change the operator-facing JSON shape without changing any gate
///   behaviour.
/// - A future slice will replace this `legacy_unattested` stamp with
///   `SourceAttestation::Verified(..)` sourced from the operator envelope
///   verified at the migration authority root. Until then, this in-code
///   comment is the durable record of the placeholder choice (B2).
///
/// Idempotent: rows whose `source_attestation_json` is already set are left
/// alone, including a boundary row that a future slice has already stamped
/// with a verified operator attestation.
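///
/// The marker written for a backfilled row has this shape (timestamps here
/// are illustrative, taken from the fixture tests below):
///
/// ```ignore
/// serde_json::json!({
///     "state": "legacy_unattested",
///     "value": {
///         "imported_at": "2026-05-04T13:00:00+00:00",
///         "original_recorded_at": "2026-05-04T12:00:01Z"
///     }
/// })
/// ```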
pub fn backfill_legacy_event_attestations(
    pool: &Pool,
    imported_at: DateTime<Utc>,
) -> StoreResult<u64> {
    // SAFETY (B2): this UPDATE stamps `legacy_unattested` on the boundary row
    // as the doctrine placeholder explained in the doc comment above. Do
    // NOT remove the `WHERE source_attestation_json IS NULL` clause — once a
    // future slice writes a verified operator attestation onto the boundary
    // row, this backfill must observe it as already set and skip the row.
    let changed = pool.execute(
        "UPDATE events
         SET source_attestation_json = json_object(
            'state', 'legacy_unattested',
            'value', json_object(
                'imported_at', ?1,
                'original_recorded_at', recorded_at
            )
         )
         WHERE source_attestation_json IS NULL;",
        params![imported_at.to_rfc3339()],
    )?;
    Ok(changed as u64)
}

fn backfill_episode_summary_spans(pool: &Pool) -> StoreResult<u64> {
    let mut stmt = pool.prepare(
        "SELECT id, summary, source_events_json
         FROM episodes
         WHERE summary_spans_json IS NULL
         ORDER BY id;",
    )?;
    let rows = stmt.query_map([], |row| {
        Ok((
            row.get::<_, String>(0)?,
            row.get::<_, String>(1)?,
            row.get::<_, String>(2)?,
        ))
    })?;

    let mut changed = 0;
    for row in rows {
        let (id, summary, source_events_json) = row?;
        let source_events: Value = serde_json::from_str(&source_events_json)?;
        let spans = legacy_summary_spans(&summary, source_events);
        pool.execute(
            "UPDATE episodes SET summary_spans_json = ?1 WHERE id = ?2;",
            params![serde_json::to_string(&spans)?, id],
        )?;
        changed += 1;
    }
    Ok(changed)
}

fn backfill_memory_summary_spans(pool: &Pool) -> StoreResult<u64> {
    let mut stmt = pool.prepare(
        "SELECT id, memory_type, claim, source_events_json
         FROM memories
         WHERE summary_spans_json IS NULL
         ORDER BY id;",
    )?;
    let rows = stmt.query_map([], |row| {
        Ok((
            row.get::<_, String>(0)?,
            row.get::<_, String>(1)?,
            row.get::<_, String>(2)?,
            row.get::<_, String>(3)?,
        ))
    })?;

    let mut changed = 0;
    for row in rows {
        let (id, memory_type, claim, source_events_json) = row?;
        let source_events: Value = serde_json::from_str(&source_events_json)?;
        let spans = if memory_type.contains("summary") {
            legacy_summary_spans(&claim, source_events)
        } else {
            json!([])
        };
        pool.execute(
            "UPDATE memories SET summary_spans_json = ?1 WHERE id = ?2;",
            params![serde_json::to_string(&spans)?, id],
        )?;
        changed += 1;
    }
    Ok(changed)
}

fn backfill_context_pack_advisories(pool: &Pool) -> StoreResult<u64> {
    let advisory = json!({
        "render_trust": "untrusted_rendering",
        "execution_trust": "untrusted_execution",
        "flags": ["contains_unattested_sources"],
        "advisory_text": "Legacy v1 context pack: render as untrusted text; do not execute pack-derived strings."
    });
    let changed = pool.execute(
        "UPDATE context_packs
         SET consumer_advisory_json = ?1
         WHERE consumer_advisory_json IS NULL;",
        params![serde_json::to_string(&advisory)?],
    )?;
    Ok(changed as u64)
}

fn backfill_memory_salience_defaults(pool: &Pool) -> StoreResult<u64> {
    let changed = pool.execute(
        "UPDATE memories
         SET cross_session_use_count = COALESCE(cross_session_use_count, 0),
             validation_epoch = COALESCE(validation_epoch, 0)
         WHERE cross_session_use_count IS NULL
            OR validation_epoch IS NULL;",
        [],
    )?;
    Ok(changed as u64)
}

fn legacy_summary_spans(text: &str, source_events: Value) -> Value {
    if text.trim().is_empty() {
        json!([])
    } else {
        json!([{
            "byte_start": 0,
            "byte_end": text.len(),
            "derived_from_event_ids": source_events,
            "max_source_authority": "derived"
        }])
    }
}

#[cfg(test)]
mod tests {
    use rusqlite::Connection;

    use super::*;

    fn fixture_pool() -> Connection {
        let pool = Connection::open_in_memory().expect("open sqlite");
        crate::migrate::apply_pending(&pool).expect("apply migrations");
        insert_small_v1_fixture(&pool);
        pool
    }

    fn insert_small_v1_fixture(pool: &Pool) {
        pool.execute_batch(
            r#"
            INSERT INTO events (
                id, schema_version, observed_at, recorded_at, source_json, event_type,
                trace_id, session_id, domain_tags_json, payload_json, payload_hash,
                prev_event_hash, event_hash
            ) VALUES
                ('evt_s2_001', 1, '2026-05-04T12:00:00Z', '2026-05-04T12:00:01Z',
                 '{"kind":"tool","name":"fixture"}', 'tool.result', NULL, 's2-fixture',
                 '["s2"]', '{"step":1}', 'payload-1', NULL, 'event-hash-1'),
                ('evt_s2_002', 1, '2026-05-04T12:00:02Z', '2026-05-04T12:00:03Z',
                 '{"kind":"tool","name":"fixture"}', 'tool.result', NULL, 's2-fixture',
                 '["s2"]', '{"step":2}', 'payload-2', 'event-hash-1', 'event-hash-2');

            INSERT INTO traces (
                id, schema_version, opened_at, closed_at, trace_type, status
            ) VALUES (
                'trc_s2_001', 1, '2026-05-04T12:00:00Z', NULL, 'migration_fixture', 'open'
            );

            INSERT INTO trace_events (trace_id, event_id, ordinal) VALUES
                ('trc_s2_001', 'evt_s2_001', 0),
                ('trc_s2_001', 'evt_s2_002', 1);

            INSERT INTO episodes (
                id, trace_id, source_events_json, summary, domains_json, entities_json,
                candidate_meaning, extracted_by_json, confidence, status
            ) VALUES (
                'epi_s2_001', 'trc_s2_001', '["evt_s2_001","evt_s2_002"]',
                'Small v1 fixture episode.', '["s2"]', '["cortex"]', NULL,
                '{"kind":"fixture"}', 0.8, 'candidate'
            );

            INSERT INTO memories (
                id, memory_type, status, claim, source_episodes_json, source_events_json,
                domains_json, salience_json, confidence, authority, applies_when_json,
                does_not_apply_when_json, created_at, updated_at
            ) VALUES (
                'mem_s2_001', 'semantic', 'candidate', 'S2 fixture exists.',
                '["epi_s2_001"]', '["evt_s2_001","evt_s2_002"]', '["s2"]',
                '{"score":0.4}', 0.7, 'candidate', '{}', '{}',
                '2026-05-04T12:00:04Z', '2026-05-04T12:00:04Z'
            );

            INSERT INTO context_packs (
                id, task, pack_json, selection_audit, created_at
            ) VALUES (
                'ctx_s2_001', 'schema v2 fixture', '{"refs":[]}', 'fixture',
                '2026-05-04T12:00:05Z'
            );

            INSERT INTO audit_records (
                id, operation, target_ref, before_hash, after_hash, reason, actor_json,
                source_refs_json, created_at
            ) VALUES (
                'aud_s2_001', 'fixture.create', 'mem_s2_001', NULL, 'after',
                'small v1 fixture', '{"kind":"test"}', '["evt_s2_001"]',
                '2026-05-04T12:00:06Z'
            );
            "#,
        )
        .expect("insert small v1 fixture");
    }

    #[test]
    fn dry_run_plan_inspects_small_v1_fixture_without_mutation() {
        // Schema v2 atomic cutover (ADR 0018): `apply_pending` now applies
        // migration `003_schema_v2_expand` by default, so a fresh fixture
        // store already has the S2.9 columns. The fixture inserts v1 rows
        // with NULL backfill columns; `dry_run_plan` accordingly reports
        // backfill-incomplete failures until the operator runs the
        // expand/backfill skeleton. This test inspects the read-only plan
        // and asserts it does not mutate the store regardless.
        let pool = fixture_pool();
        let before = inspect_v1_fixture(&pool).expect("inspect before dry run");

        let plan = dry_run_plan(&pool).expect("dry run plan");
        let after = inspect_v1_fixture(&pool).expect("inspect after dry run");

        assert_eq!(before, after, "dry run must not mutate the v1 fixture");
        assert_eq!(
            plan.fixture.event_chain_head.as_deref(),
            Some("event-hash-2")
        );
        assert_eq!(plan.fixture.observed_row_schema_versions, vec![1]);
        assert!(plan.steps.contains(&"plan_schema_migration_boundary_event"));
        assert_eq!(
            plan.fixture.applied_migrations,
            vec![
                "001_init",
                "002_authority_timeline",
                "003_schema_v2_expand",
                "004_principle_promotion_policy_record",
                "005_outcome_relation_scope",
                "006_fts5_memories",
                "007_embeddings",
                "008_decay_jobs",
                "009_decay_supersessions",
                "010_pending_mcp_commit",
            ]
        );
    }

    #[test]
    fn dry_run_plan_fails_closed_on_future_schema_rows() {
        // Post-cutover (ADR 0018, ADR 0033 §6): `SCHEMA_VERSION = 2`, so a row
        // claiming `schema_version = 3` is a *future* row this binary cannot
        // frame and must fail closed. Historical v1 rows are accepted.
        let pool = fixture_pool();
        pool.execute(
            "UPDATE events SET schema_version = 3 WHERE id = 'evt_s2_002';",
            [],
        )
        .expect("mark one row as future schema");

        let plan = dry_run_plan(&pool).expect("dry run plan");

        assert!(!plan.is_ready());
        assert!(
            plan.failures.iter().any(|failure| matches!(
                failure,
                SchemaVersionFailure::Mismatch {
                    table: "events",
                    row_id,
                    expected: 2,
                    actual: 3,
                } if row_id == "evt_s2_002"
            )),
            "expected future v3 events row mismatch failure, got: {:?}",
            plan.failures
        );
    }

    #[test]
    fn expand_backfill_skeleton_is_idempotent_and_keeps_v1_versions() {
        let pool = fixture_pool();
        let imported_at = "2026-05-04T13:00:00Z".parse().unwrap();

        let first =
            apply_expand_backfill_skeleton(&pool, imported_at).expect("first expand/backfill pass");
        let second = apply_expand_backfill_skeleton(&pool, imported_at)
            .expect("second expand/backfill pass");

        // After ADR 0018 the S2.9 columns and side tables ship in migration
        // 003_schema_v2_expand and are added by `apply_pending`. The expand
        // helper therefore observes them as already-present and reports zero
        // added columns / zero created tables. The legacy-attestation and
        // S2.9 backfills still run because the fixture's v1 rows still
        // arrive with the new columns NULL.
        assert!(first.added_columns.is_empty());
        assert!(first.created_tables.is_empty());
        assert_eq!(first.legacy_event_attestations_backfilled, 2);
        assert_eq!(first.episode_summary_spans_backfilled, 1);
        assert_eq!(first.memory_summary_spans_backfilled, 1);
        assert_eq!(first.context_pack_advisories_backfilled, 1);
        assert_eq!(first.memory_salience_defaults_backfilled, 1);

        assert!(second.added_columns.is_empty());
        assert!(second.created_tables.is_empty());
        assert_eq!(second.legacy_event_attestations_backfilled, 0);
        assert_eq!(second.episode_summary_spans_backfilled, 0);
        assert_eq!(second.memory_summary_spans_backfilled, 0);
        assert_eq!(second.context_pack_advisories_backfilled, 0);
        assert_eq!(second.memory_salience_defaults_backfilled, 0);

        let plan = dry_run_plan(&pool).expect("expanded v1 fixture still has a dry-run plan");
        assert!(plan.is_ready(), "unexpected failures: {:?}", plan.failures);
        assert_eq!(plan.fixture.observed_row_schema_versions, vec![1]);
        assert_eq!(
            plan.fixture.applied_migrations,
            vec![
                "001_init",
                "002_authority_timeline",
                "003_schema_v2_expand",
                "004_principle_promotion_policy_record",
                "005_outcome_relation_scope",
                "006_fts5_memories",
                "007_embeddings",
                "008_decay_jobs",
                "009_decay_supersessions",
                "010_pending_mcp_commit",
            ]
        );
    }

    #[test]
    fn expand_backfill_skeleton_writes_honest_legacy_markers() {
        let pool = fixture_pool();
        let imported_at = "2026-05-04T13:00:00Z".parse().unwrap();

        apply_expand_backfill_skeleton(&pool, imported_at).expect("expand/backfill");

        let source_attestation: serde_json::Value = json_column(
            &pool,
            "SELECT source_attestation_json FROM events WHERE id = 'evt_s2_001';",
        );
        assert_eq!(source_attestation["state"], "legacy_unattested");
        assert_eq!(
            source_attestation["value"]["imported_at"],
            "2026-05-04T13:00:00+00:00"
        );
        assert_eq!(
            source_attestation["value"]["original_recorded_at"],
            "2026-05-04T12:00:01Z"
        );

        let episode_spans: serde_json::Value = json_column(
            &pool,
            "SELECT summary_spans_json FROM episodes WHERE id = 'epi_s2_001';",
        );
        assert_eq!(episode_spans[0]["byte_start"], 0);
        assert_eq!(
            episode_spans[0]["byte_end"],
            "Small v1 fixture episode.".len()
        );
        assert_eq!(episode_spans[0]["max_source_authority"], "derived");
        assert_eq!(
            episode_spans[0]["derived_from_event_ids"],
            serde_json::json!(["evt_s2_001", "evt_s2_002"])
        );

        let memory_spans: serde_json::Value = json_column(
            &pool,
            "SELECT summary_spans_json FROM memories WHERE id = 'mem_s2_001';",
        );
        assert_eq!(memory_spans, serde_json::json!([]));

        let advisory: serde_json::Value = json_column(
            &pool,
            "SELECT consumer_advisory_json FROM context_packs WHERE id = 'ctx_s2_001';",
        );
        assert_eq!(advisory["render_trust"], "untrusted_rendering");
        assert_eq!(advisory["execution_trust"], "untrusted_execution");
        assert_eq!(
            advisory["flags"],
            serde_json::json!(["contains_unattested_sources"])
        );

        let defaults: (u64, u64) = pool
            .query_row(
                "SELECT cross_session_use_count, validation_epoch
                 FROM memories WHERE id = 'mem_s2_001';",
                [],
                |row| Ok((row.get(0)?, row.get(1)?)),
            )
            .expect("read salience defaults");
        assert_eq!(defaults, (0, 0));
    }

    #[test]
    fn expand_backfill_skeleton_fails_closed_before_mutation_on_future_schema_rows() {
        // Post-cutover (ADR 0018, ADR 0033 §6): `SCHEMA_VERSION = 2`, so a
        // trace claiming `schema_version = 3` is a future row this binary
        // cannot frame. The expand/backfill helper must refuse before adding
        // any further columns or running any backfill.
        let pool = fixture_pool();
        pool.execute(
            "UPDATE traces SET schema_version = 3 WHERE id = 'trc_s2_001';",
            [],
        )
        .expect("mark trace as future schema");

        let err = apply_expand_backfill_skeleton(&pool, "2026-05-04T13:00:00Z".parse().unwrap())
            .expect_err("future schema rows must block expand/backfill");

        assert!(
            err.to_string()
                .contains("schema v2 expand/backfill preflight refused future-schema rows"),
            "unexpected error: {err}"
        );
        // The S2.9 column is provided by migration 003 in the default bundle
        // (`apply_pending`), so it must already be present regardless of the
        // expand-helper refusal path.
        assert!(has_column(&pool, "events", "source_attestation_json").unwrap());
    }

    fn json_column(pool: &Pool, sql: &str) -> serde_json::Value {
        let raw: String = pool
            .query_row(sql, [], |row| row.get(0))
            .expect("read json column");
        serde_json::from_str(&raw).expect("json column parses")
    }
}
1395}