//! Staged schema v1 -> v2 migration support for the SQLite store: read-only
//! v1 fixture inspection, fail-closed dry-run planning, the nullable v2
//! expand/backfill skeleton, default-v2 persistence and cutover readiness
//! checks, and a deterministic fixture-verification transcript with a
//! `blake3:`-prefixed digest.
use chrono::{DateTime, Utc};
use cortex_ledger::{event_hash as ledger_event_hash, hash::canonical_payload_bytes, payload_hash};
use rusqlite::{params, OptionalExtension};
use serde_json::{json, Value};
use crate::verify::{
verify_schema_v2_default_persistence_readiness, verify_schema_v2_expand_shape,
verify_schema_version, SchemaVersionFailure,
};
use crate::{Pool, StoreError, StoreResult};
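/// Raw SQL of the v2 expand migration (003), embedded so callers can inspect
/// the DDL that produced the durable S2.9 artifacts.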
pub const SCHEMA_V2_EXPAND_SQL: &str = include_str!("../migrations/003_schema_v2_expand.sql");
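/// Tables whose row counts are captured in a [`V1FixtureReport`].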
const FIXTURE_COUNT_TABLES: &[&str] = &[
"events",
"traces",
"episodes",
"memories",
"context_packs",
"audit_records",
];
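/// Side-effect-free planning steps reported, in order, by [`dry_run_plan`].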
const DRY_RUN_STEPS: &[&str] = &[
"preflight_schema_v1",
"inspect_v1_hash_chain_head",
"plan_expand_nullable_v2_columns",
"plan_legacy_unattested_backfill",
"plan_schema_migration_boundary_event",
"leave_schema_version_unchanged",
];
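// Stage names surfaced by `staged_execution_plan`, in execution order.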
const STAGE_BACKUP_PREFLIGHT_READY: &str = "backup-preflight-ready";
const STAGE_EXPAND_BACKFILL: &str = "expand/backfill";
const STAGE_BOUNDARY_APPEND_PENDING: &str = "boundary-append-pending";
const STAGE_POST_MIGRATION_AUDIT_PENDING: &str = "post-migration-audit-pending";
const FIXTURE_VERIFICATION_TRANSCRIPT_SCHEMA_VERSION: u16 = 1;
const FIXTURE_VERIFICATION_MIGRATION_ID: &str = "schema_v2_dry_run_fixture_verification";
const DEFAULT_SCHEMA_V2_TARGET: u16 = 2;
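/// Kind of durable S2.9 artifact: a new nullable column on an existing table,
/// or an entirely new table.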
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum V2DurableS29ArtifactKind {
Column,
Table,
}
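/// One durable S2.9 artifact; `column` is `Some` only for the `Column` kind.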
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct V2DurableS29Artifact {
pub kind: V2DurableS29ArtifactKind,
pub table: &'static str,
pub column: Option<&'static str>,
}
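/// Every durable artifact the v2 expand introduces: ten nullable columns and
/// two new tables.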
pub const DURABLE_S2_9_ARTIFACTS: &[V2DurableS29Artifact] = &[
V2DurableS29Artifact {
kind: V2DurableS29ArtifactKind::Column,
table: "events",
column: Some("source_attestation_json"),
},
V2DurableS29Artifact {
kind: V2DurableS29ArtifactKind::Column,
table: "episodes",
column: Some("summary_spans_json"),
},
V2DurableS29Artifact {
kind: V2DurableS29ArtifactKind::Column,
table: "memories",
column: Some("summary_spans_json"),
},
V2DurableS29Artifact {
kind: V2DurableS29ArtifactKind::Column,
table: "memories",
column: Some("cross_session_use_count"),
},
V2DurableS29Artifact {
kind: V2DurableS29ArtifactKind::Column,
table: "memories",
column: Some("first_used_at"),
},
V2DurableS29Artifact {
kind: V2DurableS29ArtifactKind::Column,
table: "memories",
column: Some("last_cross_session_use_at"),
},
V2DurableS29Artifact {
kind: V2DurableS29ArtifactKind::Column,
table: "memories",
column: Some("last_validation_at"),
},
V2DurableS29Artifact {
kind: V2DurableS29ArtifactKind::Column,
table: "memories",
column: Some("validation_epoch"),
},
V2DurableS29Artifact {
kind: V2DurableS29ArtifactKind::Column,
table: "memories",
column: Some("blessed_until"),
},
V2DurableS29Artifact {
kind: V2DurableS29ArtifactKind::Column,
table: "context_packs",
column: Some("consumer_advisory_json"),
},
V2DurableS29Artifact {
kind: V2DurableS29ArtifactKind::Table,
table: "memory_session_uses",
column: None,
},
V2DurableS29Artifact {
kind: V2DurableS29ArtifactKind::Table,
table: "outcome_memory_relations",
column: None,
},
];
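/// Row count observed for a single fixture table.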
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FixtureTableCount {
pub table: &'static str,
pub rows: u64,
}
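/// Read-only snapshot of a v1 store: expected and observed row schema
/// versions, per-table row counts, the event hash-chain head, and the list of
/// applied migrations.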
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct V1FixtureReport {
pub expected_schema_version: u16,
pub observed_row_schema_versions: Vec<u16>,
pub table_counts: Vec<FixtureTableCount>,
pub event_chain_head: Option<String>,
pub applied_migrations: Vec<String>,
}
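/// Result of the read-only dry run: the fixture snapshot, the planned steps,
/// and any fail-closed findings.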
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct V2DryRunPlan {
pub fixture: V1FixtureReport,
pub steps: Vec<&'static str>,
pub failures: Vec<SchemaVersionFailure>,
}
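/// Status of one migration stage: executable now, waiting on external work,
/// or blocked by preflight failures.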
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum V2MigrationStageStatus {
Ready,
Pending,
Blocked,
}
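/// One stage of the staged migration, with whether it mutates the store and
/// whether completing it enables cutover.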
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct V2MigrationStage {
pub name: &'static str,
pub status: V2MigrationStageStatus,
pub mutates_store: bool,
pub enables_cutover: bool,
pub reason: &'static str,
}
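/// The dry run expanded into an ordered list of migration stages.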
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct V2MigrationStagePlan {
pub dry_run: V2DryRunPlan,
pub stages: Vec<V2MigrationStage>,
}
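/// A dry-run failure flattened to owned invariant/detail strings for
/// inclusion in the verification transcript.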
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct V2FixtureVerificationFailure {
pub invariant: String,
pub detail: String,
}
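/// Deterministic, hashable record of a fixture verification run, anchored to
/// the v1 hash-chain head the boundary event will chain from.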
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct V2FixtureVerificationTranscript {
pub transcript_schema_version: u16,
pub migration_id: &'static str,
pub boundary_previous_v1_head_hash: String,
pub fixture: V1FixtureReport,
pub steps: Vec<&'static str>,
pub failures: Vec<V2FixtureVerificationFailure>,
}
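/// What the expand/backfill pass actually changed; all counters are zero on a
/// repeated pass, which is how the tests below assert idempotency.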
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct V2ExpandBackfillReport {
pub added_columns: Vec<String>,
pub created_tables: Vec<&'static str>,
pub legacy_event_attestations_backfilled: u64,
pub episode_summary_spans_backfilled: u64,
pub memory_summary_spans_backfilled: u64,
pub context_pack_advisories_backfilled: u64,
pub memory_salience_defaults_backfilled: u64,
}
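/// Outcome of preparing the default v2 write shape: the expand/backfill
/// report, how many rows were flipped to v2, and the readiness read-back.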
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct V2DefaultWriteShapeReadinessReport {
pub expand_backfill: V2ExpandBackfillReport,
pub event_rows_marked_v2: u64,
pub trace_rows_marked_v2: u64,
pub readiness_failures: Vec<SchemaVersionFailure>,
pub event_framing_failures: Vec<SchemaVersionFailure>,
}
impl V2DryRunPlan {
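/// True when the preflight produced no failures.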
#[must_use]
pub fn is_ready(&self) -> bool {
self.failures.is_empty()
}
#[must_use]
pub fn fixture_verification_transcript(
&self,
boundary_previous_v1_head_hash: impl Into<String>,
) -> V2FixtureVerificationTranscript {
fixture_verification_transcript(self, boundary_previous_v1_head_hash)
}
#[must_use]
pub fn fixture_verification_result_hash(
&self,
boundary_previous_v1_head_hash: impl Into<String>,
) -> String {
self.fixture_verification_transcript(boundary_previous_v1_head_hash)
.digest()
}
}
impl V2MigrationStagePlan {
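/// True once any stage enables cutover; no stage in the current plan does.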
#[must_use]
pub fn cutover_enabled(&self) -> bool {
self.stages.iter().any(|stage| stage.enables_cutover)
}
#[must_use]
pub fn stage_names(&self) -> Vec<&'static str> {
self.stages.iter().map(|stage| stage.name).collect()
}
}
impl V2DefaultWriteShapeReadinessReport {
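/// True when the persistence-shape readiness check found no failures.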
#[must_use]
pub fn is_ready(&self) -> bool {
self.readiness_failures.is_empty()
}
#[must_use]
pub fn is_cutover_ready(&self) -> bool {
self.is_ready() && self.event_framing_failures.is_empty()
}
}
impl V2FixtureVerificationTranscript {
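/// Deterministic JSON form: fixed key order, with table counts and applied
/// migrations sorted before serialization.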
#[must_use]
pub fn to_json_value(&self) -> Value {
let mut table_counts = self.fixture.table_counts.clone();
table_counts.sort_by(|left, right| left.table.cmp(right.table));
let mut applied_migrations = self.fixture.applied_migrations.clone();
applied_migrations.sort();
json!({
"boundary_previous_v1_head_hash": &self.boundary_previous_v1_head_hash,
"failures": self.failures.iter().map(|failure| {
json!({
"detail": &failure.detail,
"invariant": &failure.invariant,
})
}).collect::<Vec<_>>(),
"fixture": {
"applied_migrations": applied_migrations,
"event_chain_head": &self.fixture.event_chain_head,
"expected_schema_version": self.fixture.expected_schema_version,
"observed_row_schema_versions": &self.fixture.observed_row_schema_versions,
"table_counts": table_counts.iter().map(|count| {
json!({
"rows": count.rows,
"table": count.table,
})
}).collect::<Vec<_>>(),
},
"migration_id": self.migration_id,
"steps": &self.steps,
"transcript_schema_version": self.transcript_schema_version,
})
}
#[must_use]
pub fn canonical_json_bytes(&self) -> Vec<u8> {
canonical_payload_bytes(&self.to_json_value())
}
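/// `blake3:`-prefixed hash over the canonical JSON bytes of the transcript.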
#[must_use]
pub fn digest(&self) -> String {
format!("blake3:{}", payload_hash(&self.to_json_value()))
}
}
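/// Builds a [`V1FixtureReport`] without mutating the store.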
pub fn inspect_v1_fixture(pool: &Pool) -> StoreResult<V1FixtureReport> {
Ok(V1FixtureReport {
expected_schema_version: cortex_core::SCHEMA_VERSION,
observed_row_schema_versions: observed_schema_versions(pool)?,
table_counts: table_counts(pool)?,
event_chain_head: event_chain_head(pool)?,
applied_migrations: applied_migrations(pool)?,
})
}
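/// Runs the read-only v1 preflight and returns the planned steps plus any
/// fail-closed findings.
///
/// A minimal usage sketch, assuming a pool whose migrations have already been
/// applied (as in the tests at the bottom of this file):
///
/// ```ignore
/// let plan = dry_run_plan(&pool)?;
/// if plan.is_ready() {
///     println!("planned steps: {:?}", plan.steps);
/// } else {
///     eprintln!("blocked: {:?}", plan.failures);
/// }
/// ```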
pub fn dry_run_plan(pool: &Pool) -> StoreResult<V2DryRunPlan> {
let fixture = inspect_v1_fixture(pool)?;
let mut failures = verify_schema_version(pool, cortex_core::SCHEMA_VERSION)?.failures;
failures.extend(dry_run_shape_failures(pool)?);
Ok(V2DryRunPlan {
fixture,
steps: DRY_RUN_STEPS.to_vec(),
failures,
})
}
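// The dry run executes before the backfill, so the one invariant that demands
// a completed backfill is filtered out here; every other shape failure still
// blocks the plan.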
fn dry_run_shape_failures(pool: &Pool) -> StoreResult<Vec<SchemaVersionFailure>> {
let all = verify_schema_v2_expand_shape(pool)?;
Ok(all
.into_iter()
.filter(|failure| match failure {
SchemaVersionFailure::IllegalIntermediateShape { invariant, .. } => {
*invariant != "schema_v2_expand_backfill.complete"
}
_ => true,
})
.collect())
}
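/// Expands the dry run into the four-stage execution plan. Only the two
/// store-local stages carry real readiness; boundary append and the
/// post-migration audit stay `Pending` because they are owned by the
/// ledger/CLI side.
///
/// A short sketch of walking the result:
///
/// ```ignore
/// let staged = staged_execution_plan(&pool)?;
/// for stage in &staged.stages {
///     println!("{}: {:?} ({})", stage.name, stage.status, stage.reason);
/// }
/// assert!(!staged.cutover_enabled()); // no stage enables cutover yet
/// ```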
pub fn staged_execution_plan(pool: &Pool) -> StoreResult<V2MigrationStagePlan> {
let dry_run = dry_run_plan(pool)?;
let store_ready_status = if dry_run.is_ready() {
V2MigrationStageStatus::Ready
} else {
V2MigrationStageStatus::Blocked
};
Ok(V2MigrationStagePlan {
dry_run,
stages: vec![
V2MigrationStage {
name: STAGE_BACKUP_PREFLIGHT_READY,
status: store_ready_status,
mutates_store: false,
enables_cutover: false,
reason: "schema v1 preflight passed; backup manifest remains an external CLI/operator gate",
},
V2MigrationStage {
name: STAGE_EXPAND_BACKFILL,
status: store_ready_status,
mutates_store: true,
enables_cutover: false,
reason: "store may apply nullable v2 expand/backfill skeleton while row schema versions stay v1",
},
V2MigrationStage {
name: STAGE_BOUNDARY_APPEND_PENDING,
status: V2MigrationStageStatus::Pending,
mutates_store: false,
enables_cutover: false,
reason: "boundary append is ledger/CLI cutover work and is not executable from cortex-store",
},
V2MigrationStage {
name: STAGE_POST_MIGRATION_AUDIT_PENDING,
status: V2MigrationStageStatus::Pending,
mutates_store: false,
enables_cutover: false,
reason: "post-migration audit is pending until full migrate and boundary append exist",
},
],
})
}
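/// Builds the verification transcript with deterministic ordering: failures
/// are sorted by invariant then detail, so equal inputs always digest equally.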
#[must_use]
pub fn fixture_verification_transcript(
plan: &V2DryRunPlan,
boundary_previous_v1_head_hash: impl Into<String>,
) -> V2FixtureVerificationTranscript {
let mut failures = plan
.failures
.iter()
.map(|failure| V2FixtureVerificationFailure {
invariant: failure.invariant(),
detail: failure.detail(),
})
.collect::<Vec<_>>();
failures.sort_by(|left, right| {
left.invariant
.cmp(&right.invariant)
.then_with(|| left.detail.cmp(&right.detail))
});
V2FixtureVerificationTranscript {
transcript_schema_version: FIXTURE_VERIFICATION_TRANSCRIPT_SCHEMA_VERSION,
migration_id: FIXTURE_VERIFICATION_MIGRATION_ID,
boundary_previous_v1_head_hash: boundary_previous_v1_head_hash.into(),
fixture: plan.fixture.clone(),
steps: plan.steps.clone(),
failures,
}
}
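/// Convenience wrapper: the transcript digest as a `blake3:`-prefixed string.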
#[must_use]
pub fn fixture_verification_result_hash(
plan: &V2DryRunPlan,
boundary_previous_v1_head_hash: impl Into<String>,
) -> String {
fixture_verification_transcript(plan, boundary_previous_v1_head_hash).digest()
}
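/// Accessor for [`DURABLE_S2_9_ARTIFACTS`].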
#[must_use]
pub fn durable_s2_9_artifacts() -> &'static [V2DurableS29Artifact] {
DURABLE_S2_9_ARTIFACTS
}
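/// Store-shape failures that would block defaulting new writes to v2.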
pub fn default_v2_persistence_readiness_failures(
pool: &Pool,
) -> StoreResult<Vec<SchemaVersionFailure>> {
verify_schema_v2_default_persistence_readiness(pool)
}
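/// Re-derives framing hashes for every row already marked v2: the payload
/// hash from the payload JSON, and the event hash from the stored payload
/// hash plus `prev_event_hash` via the ledger chaining function. Recomputing
/// the event hash over the *stored* payload hash keeps the chain check
/// independent of the payload-integrity check. Rows at other schema versions
/// are skipped.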
pub fn default_v2_event_framing_readiness_failures(
pool: &Pool,
) -> StoreResult<Vec<SchemaVersionFailure>> {
let mut failures = Vec::new();
let mut stmt = pool.prepare(
"SELECT id, schema_version, payload_json, payload_hash, prev_event_hash, event_hash
FROM events
ORDER BY id;",
)?;
let rows = stmt.query_map([], |row| {
Ok((
row.get::<_, String>(0)?,
row.get::<_, u16>(1)?,
row.get::<_, String>(2)?,
row.get::<_, String>(3)?,
row.get::<_, Option<String>>(4)?,
row.get::<_, String>(5)?,
))
})?;
for row in rows {
let (
id,
schema_version,
payload_json,
stored_payload_hash,
prev_event_hash,
stored_event_hash,
) = row?;
if schema_version != DEFAULT_SCHEMA_V2_TARGET {
continue;
}
let payload = match serde_json::from_str::<Value>(&payload_json) {
Ok(payload) => payload,
Err(err) => {
failures.push(v2_event_framing_failure(
id,
format!("payload_json is not valid JSON: {err}"),
));
continue;
}
};
let expected_payload_hash = payload_hash(&payload);
if stored_payload_hash != expected_payload_hash {
failures.push(v2_event_framing_failure(
&id,
format!(
"payload_hash mismatch under future v2 framing: expected {expected_payload_hash}, found {stored_payload_hash}"
),
));
}
let expected_event_hash =
ledger_event_hash(prev_event_hash.as_deref(), &stored_payload_hash);
if stored_event_hash != expected_event_hash {
failures.push(v2_event_framing_failure(
id,
format!(
"event_hash mismatch under future v2 framing: expected {expected_event_hash}, found {stored_event_hash}"
),
));
}
}
Ok(failures)
}
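/// Union of persistence-shape and event-framing readiness failures.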
pub fn default_v2_cutover_readiness_failures(
pool: &Pool,
) -> StoreResult<Vec<SchemaVersionFailure>> {
let mut failures = default_v2_persistence_readiness_failures(pool)?;
failures.extend(default_v2_event_framing_readiness_failures(pool)?);
Ok(failures)
}
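/// Fail-closed wrapper: errors unless persistence readiness is clean.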
pub fn require_default_v2_persistence_readiness(pool: &Pool) -> StoreResult<()> {
let failures = default_v2_persistence_readiness_failures(pool)?;
if failures.is_empty() {
Ok(())
} else {
Err(StoreError::Validation(format!(
"schema v2 default persistence readiness failed: {failures:?}"
)))
}
}
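/// Fail-closed wrapper: errors unless both readiness checks are clean.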
pub fn require_default_v2_cutover_readiness(pool: &Pool) -> StoreResult<()> {
let failures = default_v2_cutover_readiness_failures(pool)?;
if failures.is_empty() {
Ok(())
} else {
Err(StoreError::Validation(format!(
"schema v2 default cutover readiness failed: {failures:?}"
)))
}
}
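/// Applies the nullable v2 expand/backfill skeleton. The schema-version
/// preflight refuses future-schema rows before any mutation, and every column
/// add, table create, and backfill only touches missing data, so a second
/// pass reports all-zero counters.
///
/// A hedged sketch; the timestamp is the caller's chosen import time, not
/// something this module derives:
///
/// ```ignore
/// let report = apply_expand_backfill_skeleton(&pool, Utc::now())?;
/// println!(
///     "legacy attestations backfilled: {}",
///     report.legacy_event_attestations_backfilled
/// );
/// ```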
pub fn apply_expand_backfill_skeleton(
pool: &Pool,
imported_at: DateTime<Utc>,
) -> StoreResult<V2ExpandBackfillReport> {
let schema_version_report = verify_schema_version(pool, cortex_core::SCHEMA_VERSION)?;
if !schema_version_report.is_ok() {
return Err(StoreError::Validation(format!(
"schema v2 expand/backfill preflight refused future-schema rows: {:?}",
schema_version_report.failures
)));
}
let mut report = V2ExpandBackfillReport {
added_columns: Vec::new(),
created_tables: Vec::new(),
legacy_event_attestations_backfilled: 0,
episode_summary_spans_backfilled: 0,
memory_summary_spans_backfilled: 0,
context_pack_advisories_backfilled: 0,
memory_salience_defaults_backfilled: 0,
};
add_column_if_missing(
pool,
&mut report,
"events",
"source_attestation_json",
"ALTER TABLE events ADD COLUMN source_attestation_json TEXT NULL \
CHECK (source_attestation_json IS NULL OR json_valid(source_attestation_json));",
)?;
add_column_if_missing(
pool,
&mut report,
"episodes",
"summary_spans_json",
"ALTER TABLE episodes ADD COLUMN summary_spans_json TEXT NULL \
CHECK (summary_spans_json IS NULL OR json_valid(summary_spans_json));",
)?;
add_column_if_missing(
pool,
&mut report,
"memories",
"summary_spans_json",
"ALTER TABLE memories ADD COLUMN summary_spans_json TEXT NULL \
CHECK (summary_spans_json IS NULL OR json_valid(summary_spans_json));",
)?;
add_column_if_missing(
pool,
&mut report,
"memories",
"cross_session_use_count",
"ALTER TABLE memories ADD COLUMN cross_session_use_count INTEGER NULL \
CHECK (cross_session_use_count IS NULL OR cross_session_use_count >= 0);",
)?;
add_column_if_missing(
pool,
&mut report,
"memories",
"first_used_at",
"ALTER TABLE memories ADD COLUMN first_used_at TEXT NULL;",
)?;
add_column_if_missing(
pool,
&mut report,
"memories",
"last_cross_session_use_at",
"ALTER TABLE memories ADD COLUMN last_cross_session_use_at TEXT NULL;",
)?;
add_column_if_missing(
pool,
&mut report,
"memories",
"last_validation_at",
"ALTER TABLE memories ADD COLUMN last_validation_at TEXT NULL;",
)?;
add_column_if_missing(
pool,
&mut report,
"memories",
"validation_epoch",
"ALTER TABLE memories ADD COLUMN validation_epoch INTEGER NULL \
CHECK (validation_epoch IS NULL OR validation_epoch >= 0);",
)?;
add_column_if_missing(
pool,
&mut report,
"memories",
"blessed_until",
"ALTER TABLE memories ADD COLUMN blessed_until TEXT NULL;",
)?;
add_column_if_missing(
pool,
&mut report,
"context_packs",
"consumer_advisory_json",
"ALTER TABLE context_packs ADD COLUMN consumer_advisory_json TEXT NULL \
CHECK (consumer_advisory_json IS NULL OR json_valid(consumer_advisory_json));",
)?;
create_table_if_missing(
pool,
&mut report,
"memory_session_uses",
"CREATE TABLE memory_session_uses (
memory_id TEXT NOT NULL REFERENCES memories(id),
session_id TEXT NOT NULL,
first_used_at TEXT NOT NULL,
last_used_at TEXT NOT NULL,
use_count INTEGER NOT NULL CHECK (use_count >= 0),
PRIMARY KEY (memory_id, session_id)
);",
)?;
create_table_if_missing(
pool,
&mut report,
"outcome_memory_relations",
"CREATE TABLE outcome_memory_relations (
outcome_ref TEXT NOT NULL,
memory_id TEXT NOT NULL REFERENCES memories(id),
relation TEXT NOT NULL,
recorded_at TEXT NOT NULL,
source_event_id TEXT NULL REFERENCES events(id),
PRIMARY KEY (outcome_ref, memory_id, relation)
);",
)?;
report.legacy_event_attestations_backfilled =
backfill_legacy_event_attestations(pool, imported_at)?;
report.episode_summary_spans_backfilled = backfill_episode_summary_spans(pool)?;
report.memory_summary_spans_backfilled = backfill_memory_summary_spans(pool)?;
report.context_pack_advisories_backfilled = backfill_context_pack_advisories(pool)?;
report.memory_salience_defaults_backfilled = backfill_memory_salience_defaults(pool)?;
Ok(report)
}
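/// Runs the expand/backfill skeleton, flips legacy v1 `events` and `traces`
/// rows to schema version 2, then reads back both readiness checks.
///
/// A hedged sketch of gating cutover on the result:
///
/// ```ignore
/// let report = prepare_default_v2_write_shape_for_readiness(&pool, Utc::now())?;
/// if report.is_cutover_ready() {
///     // safe to default new writes to the v2 shape
/// }
/// ```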
pub fn prepare_default_v2_write_shape_for_readiness(
pool: &Pool,
imported_at: DateTime<Utc>,
) -> StoreResult<V2DefaultWriteShapeReadinessReport> {
let expand_backfill = apply_expand_backfill_skeleton(pool, imported_at)?;
let event_rows_marked_v2 = mark_schema_version(pool, "events", DEFAULT_SCHEMA_V2_TARGET)?;
let trace_rows_marked_v2 = mark_schema_version(pool, "traces", DEFAULT_SCHEMA_V2_TARGET)?;
let readiness_failures = default_v2_persistence_readiness_failures(pool)?;
let event_framing_failures = default_v2_event_framing_readiness_failures(pool)?;
Ok(V2DefaultWriteShapeReadinessReport {
expand_backfill,
event_rows_marked_v2,
trace_rows_marked_v2,
readiness_failures,
event_framing_failures,
})
}
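// All framing findings share one invariant name so they group together in
// readiness reports.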
fn v2_event_framing_failure(
row_id: impl Into<String>,
detail: impl Into<String>,
) -> SchemaVersionFailure {
SchemaVersionFailure::IllegalIntermediateShape {
invariant: "schema_v2_default_persistence.event_framing.valid",
detail: format!("events row {} {}", row_id.into(), detail.into()),
}
}
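// Distinct `schema_version` values across `events` and `traces`, deduplicated
// and sorted ascending.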
fn observed_schema_versions(pool: &Pool) -> StoreResult<Vec<u16>> {
let mut versions = Vec::new();
for table in ["events", "traces"] {
let sql = format!("SELECT DISTINCT schema_version FROM {table} ORDER BY schema_version;");
let mut stmt = pool.prepare(&sql)?;
let rows = stmt.query_map([], |row| row.get::<_, u16>(0))?;
for row in rows {
let version = row?;
if !versions.contains(&version) {
versions.push(version);
}
}
}
versions.sort_unstable();
Ok(versions)
}
fn table_counts(pool: &Pool) -> StoreResult<Vec<FixtureTableCount>> {
FIXTURE_COUNT_TABLES
.iter()
.copied()
.map(|table| {
let sql = format!("SELECT COUNT(*) FROM {table};");
let rows = pool.query_row(&sql, [], |row| row.get::<_, u64>(0))?;
Ok(FixtureTableCount { table, rows })
})
.collect()
}
fn event_chain_head(pool: &Pool) -> StoreResult<Option<String>> {
let head = pool
.query_row(
"SELECT e.event_hash
FROM events e
WHERE NOT EXISTS (
SELECT 1 FROM events child WHERE child.prev_event_hash = e.event_hash
)
ORDER BY e.recorded_at DESC, e.id DESC
LIMIT 1;",
[],
|row| row.get::<_, String>(0),
)
.optional()?;
Ok(head)
}
fn applied_migrations(pool: &Pool) -> StoreResult<Vec<String>> {
let mut stmt = pool.prepare("SELECT name FROM _migrations ORDER BY name;")?;
let rows = stmt.query_map(params![], |row| row.get::<_, String>(0))?;
rows.collect::<Result<_, _>>().map_err(Into::into)
}
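// Idempotent DDL helpers: apply the statement only when the artifact is
// missing, and record only what was actually added.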
fn add_column_if_missing(
pool: &Pool,
report: &mut V2ExpandBackfillReport,
table: &'static str,
column: &'static str,
ddl: &str,
) -> StoreResult<()> {
if has_column(pool, table, column)? {
return Ok(());
}
pool.execute_batch(ddl)?;
report.added_columns.push(format!("{table}.{column}"));
Ok(())
}
fn create_table_if_missing(
pool: &Pool,
report: &mut V2ExpandBackfillReport,
table: &'static str,
ddl: &str,
) -> StoreResult<()> {
if has_table(pool, table)? {
return Ok(());
}
pool.execute_batch(ddl)?;
report.created_tables.push(table);
Ok(())
}
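// Flips only legacy v1 rows to `target`; rows at any other version are left
// untouched. Returns the number of rows changed.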
fn mark_schema_version(pool: &Pool, table: &'static str, target: u16) -> StoreResult<u64> {
const LEGACY_V1_ROW_SCHEMA_VERSION: u16 = 1;
let sql = format!(
"UPDATE {table}
SET schema_version = ?1
WHERE schema_version = ?2;"
);
let changed = pool.execute(&sql, params![target, LEGACY_V1_ROW_SCHEMA_VERSION])?;
Ok(changed as u64)
}
fn has_table(pool: &Pool, table: &str) -> StoreResult<bool> {
let existing = pool
.query_row(
"SELECT name FROM sqlite_master WHERE type = 'table' AND name = ?1;",
params![table],
|row| row.get::<_, String>(0),
)
.optional()?;
Ok(existing.is_some())
}
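// `table` comes from static identifiers in this module, so interpolating it
// into the PRAGMA string is safe here.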
fn has_column(pool: &Pool, table: &str, column: &str) -> StoreResult<bool> {
let sql = format!("PRAGMA table_info({table});");
let mut stmt = pool.prepare(&sql)?;
let columns = stmt.query_map([], |row| row.get::<_, String>(1))?;
for found in columns {
if found? == column {
return Ok(true);
}
}
Ok(false)
}
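/// Marks every event without an attestation as `legacy_unattested`, recording
/// the caller's import time alongside the original `recorded_at`. Only NULL
/// rows are touched, so the pass is idempotent.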
pub fn backfill_legacy_event_attestations(
pool: &Pool,
imported_at: DateTime<Utc>,
) -> StoreResult<u64> {
let changed = pool.execute(
"UPDATE events
SET source_attestation_json = json_object(
'state', 'legacy_unattested',
'value', json_object(
'imported_at', ?1,
'original_recorded_at', recorded_at
)
)
WHERE source_attestation_json IS NULL;",
params![imported_at.to_rfc3339()],
)?;
Ok(changed as u64)
}
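// Episode summaries get a single whole-text span derived from their source
// events; see `legacy_summary_spans` below.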
fn backfill_episode_summary_spans(pool: &Pool) -> StoreResult<u64> {
let mut stmt = pool.prepare(
"SELECT id, summary, source_events_json
FROM episodes
WHERE summary_spans_json IS NULL
ORDER BY id;",
)?;
let rows = stmt.query_map([], |row| {
Ok((
row.get::<_, String>(0)?,
row.get::<_, String>(1)?,
row.get::<_, String>(2)?,
))
})?;
let mut changed = 0;
for row in rows {
let (id, summary, source_events_json) = row?;
let source_events: Value = serde_json::from_str(&source_events_json)?;
let spans = legacy_summary_spans(&summary, source_events);
pool.execute(
"UPDATE episodes SET summary_spans_json = ?1 WHERE id = ?2;",
params![serde_json::to_string(&spans)?, id],
)?;
changed += 1;
}
Ok(changed)
}
fn backfill_memory_summary_spans(pool: &Pool) -> StoreResult<u64> {
let mut stmt = pool.prepare(
"SELECT id, memory_type, claim, source_events_json
FROM memories
WHERE summary_spans_json IS NULL
ORDER BY id;",
)?;
let rows = stmt.query_map([], |row| {
Ok((
row.get::<_, String>(0)?,
row.get::<_, String>(1)?,
row.get::<_, String>(2)?,
row.get::<_, String>(3)?,
))
})?;
let mut changed = 0;
for row in rows {
let (id, memory_type, claim, source_events_json) = row?;
let source_events: Value = serde_json::from_str(&source_events_json)?;
let spans = if memory_type.contains("summary") {
legacy_summary_spans(&claim, source_events)
} else {
json!([])
};
pool.execute(
"UPDATE memories SET summary_spans_json = ?1 WHERE id = ?2;",
params![serde_json::to_string(&spans)?, id],
)?;
changed += 1;
}
Ok(changed)
}
fn backfill_context_pack_advisories(pool: &Pool) -> StoreResult<u64> {
let advisory = json!({
"render_trust": "untrusted_rendering",
"execution_trust": "untrusted_execution",
"flags": ["contains_unattested_sources"],
"advisory_text": "Legacy v1 context pack: render as untrusted text; do not execute pack-derived strings."
});
let changed = pool.execute(
"UPDATE context_packs
SET consumer_advisory_json = ?1
WHERE consumer_advisory_json IS NULL;",
params![serde_json::to_string(&advisory)?],
)?;
Ok(changed as u64)
}
fn backfill_memory_salience_defaults(pool: &Pool) -> StoreResult<u64> {
let changed = pool.execute(
"UPDATE memories
SET cross_session_use_count = COALESCE(cross_session_use_count, 0),
validation_epoch = COALESCE(validation_epoch, 0)
WHERE cross_session_use_count IS NULL
OR validation_epoch IS NULL;",
[],
)?;
Ok(changed as u64)
}
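// A single span covering the full text in byte offsets, attributed to the
// given source events; blank text yields no spans.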
fn legacy_summary_spans(text: &str, source_events: Value) -> Value {
if text.trim().is_empty() {
json!([])
} else {
json!([{
"byte_start": 0,
"byte_end": text.len(),
"derived_from_event_ids": source_events,
"max_source_authority": "derived"
}])
}
}
#[cfg(test)]
mod tests {
use rusqlite::Connection;
use super::*;
fn fixture_pool() -> Connection {
let pool = Connection::open_in_memory().expect("open sqlite");
crate::migrate::apply_pending(&pool).expect("apply migrations");
insert_small_v1_fixture(&pool);
pool
}
fn insert_small_v1_fixture(pool: &Pool) {
pool.execute_batch(
r#"
INSERT INTO events (
id, schema_version, observed_at, recorded_at, source_json, event_type,
trace_id, session_id, domain_tags_json, payload_json, payload_hash,
prev_event_hash, event_hash
) VALUES
('evt_s2_001', 1, '2026-05-04T12:00:00Z', '2026-05-04T12:00:01Z',
'{"kind":"tool","name":"fixture"}', 'tool.result', NULL, 's2-fixture',
'["s2"]', '{"step":1}', 'payload-1', NULL, 'event-hash-1'),
('evt_s2_002', 1, '2026-05-04T12:00:02Z', '2026-05-04T12:00:03Z',
'{"kind":"tool","name":"fixture"}', 'tool.result', NULL, 's2-fixture',
'["s2"]', '{"step":2}', 'payload-2', 'event-hash-1', 'event-hash-2');
INSERT INTO traces (
id, schema_version, opened_at, closed_at, trace_type, status
) VALUES (
'trc_s2_001', 1, '2026-05-04T12:00:00Z', NULL, 'migration_fixture', 'open'
);
INSERT INTO trace_events (trace_id, event_id, ordinal) VALUES
('trc_s2_001', 'evt_s2_001', 0),
('trc_s2_001', 'evt_s2_002', 1);
INSERT INTO episodes (
id, trace_id, source_events_json, summary, domains_json, entities_json,
candidate_meaning, extracted_by_json, confidence, status
) VALUES (
'epi_s2_001', 'trc_s2_001', '["evt_s2_001","evt_s2_002"]',
'Small v1 fixture episode.', '["s2"]', '["cortex"]', NULL,
'{"kind":"fixture"}', 0.8, 'candidate'
);
INSERT INTO memories (
id, memory_type, status, claim, source_episodes_json, source_events_json,
domains_json, salience_json, confidence, authority, applies_when_json,
does_not_apply_when_json, created_at, updated_at
) VALUES (
'mem_s2_001', 'semantic', 'candidate', 'S2 fixture exists.',
'["epi_s2_001"]', '["evt_s2_001","evt_s2_002"]', '["s2"]',
'{"score":0.4}', 0.7, 'candidate', '{}', '{}',
'2026-05-04T12:00:04Z', '2026-05-04T12:00:04Z'
);
INSERT INTO context_packs (
id, task, pack_json, selection_audit, created_at
) VALUES (
'ctx_s2_001', 'schema v2 fixture', '{"refs":[]}', 'fixture',
'2026-05-04T12:00:05Z'
);
INSERT INTO audit_records (
id, operation, target_ref, before_hash, after_hash, reason, actor_json,
source_refs_json, created_at
) VALUES (
'aud_s2_001', 'fixture.create', 'mem_s2_001', NULL, 'after',
'small v1 fixture', '{"kind":"test"}', '["evt_s2_001"]',
'2026-05-04T12:00:06Z'
);
"#,
)
.expect("insert small v1 fixture");
}
#[test]
fn dry_run_plan_inspects_small_v1_fixture_without_mutation() {
let pool = fixture_pool();
let before = inspect_v1_fixture(&pool).expect("inspect before dry run");
let plan = dry_run_plan(&pool).expect("dry run plan");
let after = inspect_v1_fixture(&pool).expect("inspect after dry run");
assert_eq!(before, after, "dry run must not mutate the v1 fixture");
assert_eq!(
plan.fixture.event_chain_head.as_deref(),
Some("event-hash-2")
);
assert_eq!(plan.fixture.observed_row_schema_versions, vec![1]);
assert!(plan.steps.contains(&"plan_schema_migration_boundary_event"));
assert_eq!(
plan.fixture.applied_migrations,
vec![
"001_init",
"002_authority_timeline",
"003_schema_v2_expand",
"004_principle_promotion_policy_record",
"005_outcome_relation_scope",
"006_fts5_memories",
"007_embeddings",
"008_decay_jobs",
"009_decay_supersessions",
"010_pending_mcp_commit",
]
);
}
#[test]
fn dry_run_plan_fails_closed_on_future_schema_rows() {
let pool = fixture_pool();
pool.execute(
"UPDATE events SET schema_version = 3 WHERE id = 'evt_s2_002';",
[],
)
.expect("mark one row as future schema");
let plan = dry_run_plan(&pool).expect("dry run plan");
assert!(!plan.is_ready());
assert!(
plan.failures.iter().any(|failure| matches!(
failure,
SchemaVersionFailure::Mismatch {
table: "events",
row_id,
expected: 2,
actual: 3,
} if row_id == "evt_s2_002"
)),
"expected future v3 events row mismatch failure, got: {:?}",
plan.failures
);
}
#[test]
fn expand_backfill_skeleton_is_idempotent_and_keeps_v1_versions() {
let pool = fixture_pool();
let imported_at = "2026-05-04T13:00:00Z".parse().unwrap();
let first =
apply_expand_backfill_skeleton(&pool, imported_at).expect("first expand/backfill pass");
let second = apply_expand_backfill_skeleton(&pool, imported_at)
.expect("second expand/backfill pass");
assert!(first.added_columns.is_empty());
assert!(first.created_tables.is_empty());
assert_eq!(first.legacy_event_attestations_backfilled, 2);
assert_eq!(first.episode_summary_spans_backfilled, 1);
assert_eq!(first.memory_summary_spans_backfilled, 1);
assert_eq!(first.context_pack_advisories_backfilled, 1);
assert_eq!(first.memory_salience_defaults_backfilled, 1);
assert!(second.added_columns.is_empty());
assert!(second.created_tables.is_empty());
assert_eq!(second.legacy_event_attestations_backfilled, 0);
assert_eq!(second.episode_summary_spans_backfilled, 0);
assert_eq!(second.memory_summary_spans_backfilled, 0);
assert_eq!(second.context_pack_advisories_backfilled, 0);
assert_eq!(second.memory_salience_defaults_backfilled, 0);
let plan = dry_run_plan(&pool).expect("expanded v1 fixture still has a dry-run plan");
assert!(plan.is_ready(), "unexpected failures: {:?}", plan.failures);
assert_eq!(plan.fixture.observed_row_schema_versions, vec![1]);
assert_eq!(
plan.fixture.applied_migrations,
vec![
"001_init",
"002_authority_timeline",
"003_schema_v2_expand",
"004_principle_promotion_policy_record",
"005_outcome_relation_scope",
"006_fts5_memories",
"007_embeddings",
"008_decay_jobs",
"009_decay_supersessions",
"010_pending_mcp_commit",
]
);
}
#[test]
fn expand_backfill_skeleton_writes_honest_legacy_markers() {
let pool = fixture_pool();
let imported_at = "2026-05-04T13:00:00Z".parse().unwrap();
apply_expand_backfill_skeleton(&pool, imported_at).expect("expand/backfill");
let source_attestation: serde_json::Value = json_column(
&pool,
"SELECT source_attestation_json FROM events WHERE id = 'evt_s2_001';",
);
assert_eq!(source_attestation["state"], "legacy_unattested");
assert_eq!(
source_attestation["value"]["imported_at"],
"2026-05-04T13:00:00+00:00"
);
assert_eq!(
source_attestation["value"]["original_recorded_at"],
"2026-05-04T12:00:01Z"
);
let episode_spans: serde_json::Value = json_column(
&pool,
"SELECT summary_spans_json FROM episodes WHERE id = 'epi_s2_001';",
);
assert_eq!(episode_spans[0]["byte_start"], 0);
assert_eq!(
episode_spans[0]["byte_end"],
"Small v1 fixture episode.".len()
);
assert_eq!(episode_spans[0]["max_source_authority"], "derived");
assert_eq!(
episode_spans[0]["derived_from_event_ids"],
serde_json::json!(["evt_s2_001", "evt_s2_002"])
);
let memory_spans: serde_json::Value = json_column(
&pool,
"SELECT summary_spans_json FROM memories WHERE id = 'mem_s2_001';",
);
assert_eq!(memory_spans, serde_json::json!([]));
let advisory: serde_json::Value = json_column(
&pool,
"SELECT consumer_advisory_json FROM context_packs WHERE id = 'ctx_s2_001';",
);
assert_eq!(advisory["render_trust"], "untrusted_rendering");
assert_eq!(advisory["execution_trust"], "untrusted_execution");
assert_eq!(
advisory["flags"],
serde_json::json!(["contains_unattested_sources"])
);
let defaults: (u64, u64) = pool
.query_row(
"SELECT cross_session_use_count, validation_epoch
FROM memories WHERE id = 'mem_s2_001';",
[],
|row| Ok((row.get(0)?, row.get(1)?)),
)
.expect("read salience defaults");
assert_eq!(defaults, (0, 0));
}
#[test]
fn expand_backfill_skeleton_fails_closed_before_mutation_on_future_schema_rows() {
let pool = fixture_pool();
pool.execute(
"UPDATE traces SET schema_version = 3 WHERE id = 'trc_s2_001';",
[],
)
.expect("mark trace as future schema");
let err = apply_expand_backfill_skeleton(&pool, "2026-05-04T13:00:00Z".parse().unwrap())
.expect_err("future schema rows must block expand/backfill");
assert!(
err.to_string()
.contains("schema v2 expand/backfill preflight refused future-schema rows"),
"unexpected error: {err}"
);
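// The column itself was created by migration 003 during fixture setup, so its
// presence does not indicate that the refused pass mutated anything.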
assert!(has_column(&pool, "events", "source_attestation_json").unwrap());
}
fn json_column(pool: &Pool, sql: &str) -> serde_json::Value {
let raw: String = pool
.query_row(sql, [], |row| row.get(0))
.expect("read json column");
serde_json::from_str(&raw).expect("json column parses")
}
}