use std::path::{Path, PathBuf};
use chrono::{DateTime, Utc};
use clap::{Args, Subcommand};
use cortex_core::{
compose_policy_outcomes, Event, KeyLifecycleState, PolicyContribution, PolicyDecision,
PolicyOutcome, SchemaMigrationV1ToV2Payload, TrustTier, SCHEMA_MIGRATION_V1_TO_V2_EVENT_KIND,
};
use cortex_ledger::{
audit::verify_schema_migration_v1_to_v2_boundary, canonical_payload_bytes, verify_chain,
JsonlLog, SCHEMA_MIGRATION_ATTESTATION_REQUIRED_RULE_ID,
SCHEMA_MIGRATION_AUTHORITY_CLASS_RULE_ID,
SCHEMA_MIGRATION_CURRENT_USE_TEMPORAL_AUTHORITY_RULE_ID,
};
use cortex_store::migrate_v2::{
fixture_verification_result_hash as store_fixture_verification_result_hash,
staged_execution_plan, V2DryRunPlan, V2MigrationStageStatus, SCHEMA_V2_EXPAND_SQL,
};
use cortex_store::repo::{
authority::{key_state_policy_decision_test_allow, principal_state_policy_decision_test_allow},
AuthorityRepo, KeyTimelineRecord, PrincipalTimelineRecord,
};
use cortex_store::Pool;
use ed25519_dalek::{Signature, Signer, Verifier, VerifyingKey};
use serde::Deserialize;
use crate::cmd::open_default_store;
use crate::cmd::temporal::{revalidate_operator_temporal_authority, revalidation_failed_invariant};
use crate::exit::Exit;
use crate::output::{self, Envelope};
use crate::paths::DataLayout;
// Sentinel list: the atomic-cutover path currently has no known missing
// prerequisites. NOTE(review): unused in this chunk — presumably referenced by
// preflight/reporting code elsewhere in the file; confirm before removing.
const MISSING_ATOMIC_CUTOVER_PREREQUISITES: &[&str] = &[];
// Schema version a blessed pre-v2 backup manifest must declare.
const PRE_V2_BACKUP_SCHEMA_VERSION: u16 = 1;
// Manifest field names that must never grant cutover approval: approval comes
// only from the Ed25519 operator-attestation flow, not from the backup
// manifest. NOTE(review): enforcement presumably lives in
// `validate_backup_manifest` (not visible in this chunk) — confirm.
const RESERVED_BACKUP_MANIFEST_CUTOVER_APPROVAL_FIELDS: &[&str] = &[
    "cutover_approved",
    "operator_approval",
    "operator_approval_attestation",
    "schema_cutover_approved",
    "schema_version_cutover_approved",
    "default_v2_cutover_ready",
    "unattended_migrate",
];
// Subcommands under `cortex migrate`. Plain `//` comments are used on purpose:
// `///` doc comments on clap derives would change the generated --help text.
#[derive(Debug, Subcommand)]
pub enum MigrateSub {
    // `cortex migrate v2` — dry-run / cutover / resume-mirror surface.
    V2(MigrateV2Args),
    // `cortex migrate sign-operator-attestation` — produce a signed envelope.
    SignOperatorAttestation(SignOperatorAttestationArgs),
    // `cortex migrate seed-drill-operator` — CI drill harness only (env-gated).
    SeedDrillOperator(SeedDrillOperatorArgs),
}
// Arguments for `cortex migrate seed-drill-operator`.
// (`//` comments only — `///` would alter clap's --help output.)
#[derive(Debug, Args)]
pub struct SeedDrillOperatorArgs {
    // Key id to register as an active operator key; must be non-empty.
    #[arg(long, value_name = "STRING")]
    pub operator_key_id: String,
    // Principal to attach the key to; defaults to "<operator_key_id>-principal".
    #[arg(long, value_name = "STRING")]
    pub principal_id: Option<String>,
}
// Arguments for `cortex migrate sign-operator-attestation`.
// (`//` comments only — `///` would alter clap's --help output.)
#[derive(Debug, Args)]
pub struct SignOperatorAttestationArgs {
    // Where the signed JSON attestation envelope is written.
    #[arg(long, value_name = "PATH")]
    pub output: PathBuf,
    // File containing exactly 32 raw bytes: the Ed25519 signing seed.
    #[arg(long, value_name = "PATH")]
    pub signing_seed: PathBuf,
    // Key id embedded in (and signed into) the envelope.
    #[arg(long, value_name = "STRING", default_value = "cortex-operator")]
    pub operator_key_id: String,
}
// Arguments for `cortex migrate v2`.
// (`//` comments only — `///` would alter clap's --help output.)
#[derive(Debug, Args)]
pub struct MigrateV2Args {
    // Preview only; mutually exclusive with --backup-manifest.
    #[arg(long)]
    pub dry_run: bool,
    // Blessed pre-v2 backup manifest; required for the mutating cutover path.
    #[arg(long, value_name = "PATH")]
    pub backup_manifest: Option<PathBuf>,
    // Unattended mode; valid only together with --operator-attestation.
    #[arg(long)]
    pub unattended_migrate: bool,
    // Path to an Ed25519-signed operator attestation envelope.
    #[arg(long, value_name = "PATH")]
    pub operator_attestation: Option<PathBuf>,
    // Partial-mutation recovery surface; cannot be combined with other flags.
    #[arg(long)]
    pub resume_mirror: bool,
}
/// Deserialized shape of a pre-v2 backup manifest file.
///
/// NOTE(review): fields other than `table_row_counts` are not read in this
/// chunk — presumably validated by `validate_backup_manifest` (defined
/// elsewhere in the file); confirm.
#[derive(Debug, Deserialize)]
struct BackupManifest {
    kind: String,
    schema_version: u16,
    sqlite_store: String,
    jsonl_mirror: String,
    tool_version: String,
    backup_timestamp: String,
    // Optional in the manifest; absence is tolerated at parse time.
    #[serde(default)]
    table_row_counts: Option<BackupManifestTableRowCounts>,
}
/// Per-table row counts recorded at backup time; used after cutover to refuse
/// on row-count drift (see the post-migrate row-count verification).
#[derive(Debug, Clone, Copy, Deserialize)]
struct BackupManifestTableRowCounts {
    events: u64,
    traces: u64,
    episodes: u64,
    memories: u64,
}
/// Dispatch a `cortex migrate` subcommand to its handler.
///
/// For the `v2` subcommand the exit status is wrapped in a JSON envelope when
/// machine-readable output is enabled; the other subcommands print directly.
pub fn run(sub: MigrateSub) -> Exit {
    match sub {
        MigrateSub::SignOperatorAttestation(args) => run_sign_operator_attestation(args),
        MigrateSub::SeedDrillOperator(args) => run_seed_drill_operator(args),
        MigrateSub::V2(args) => {
            // Capture the flag before `args` is moved into the handler.
            let is_dry_run = args.dry_run;
            let status = run_v2(args);
            if !output::json_enabled() {
                return status;
            }
            emit_migrate_v2_envelope(is_dry_run, status)
        }
    }
}
// Hard gate for the seed-drill-operator surface: must be set to exactly "1".
const DRILL_SEED_OPERATOR_ENV_GATE: &str = "CORTEX_DRILL_ALLOW_SEED_OPERATOR";
// Invariant id printed when the gate is not satisfied.
const SEED_DRILL_OPERATOR_GATE_INVARIANT: &str = "migrate.seed_drill_operator.gate_not_set";
/// `cortex migrate seed-drill-operator`: seed an operator-tier principal plus
/// an active operator key at the epoch timestamp. CI drill harness only —
/// hard-gated behind `CORTEX_DRILL_ALLOW_SEED_OPERATOR=1`.
fn run_seed_drill_operator(args: SeedDrillOperatorArgs) -> Exit {
    // Gate check first: any value other than exactly "1" (including an unset
    // or unreadable variable) refuses before the store is even opened.
    match std::env::var(DRILL_SEED_OPERATOR_ENV_GATE) {
        Ok(value) if value == "1" => {}
        _ => {
            eprintln!(
                "cortex migrate seed-drill-operator: {SEED_DRILL_OPERATOR_GATE_INVARIANT}: \
                 refused; this surface is hard-gated and requires \
                 {DRILL_SEED_OPERATOR_ENV_GATE}=1 (CI drill harness only). \
                 no state was changed."
            );
            return Exit::PreconditionUnmet;
        }
    }
    if args.operator_key_id.trim().is_empty() {
        eprintln!(
            "cortex migrate seed-drill-operator: refused; --operator-key-id must not be empty. \
             no state was changed."
        );
        return Exit::Usage;
    }
    let pool = match open_default_store("migrate seed-drill-operator") {
        Ok(pool) => pool,
        Err(exit) => {
            eprintln!("cortex migrate seed-drill-operator: refused; no state was changed.");
            return exit;
        }
    };
    // Default principal id is derived from the key id when not given.
    let principal_id = args
        .principal_id
        .clone()
        .unwrap_or_else(|| format!("{}-principal", args.operator_key_id));
    // Epoch timestamp keeps drill fixtures deterministic.
    let effective_at = DateTime::<Utc>::from_timestamp(0, 0).expect("epoch is a valid UTC time");
    let repo = AuthorityRepo::new(&pool);
    let principal_record = PrincipalTimelineRecord {
        principal_id: principal_id.clone(),
        trust_tier: TrustTier::Operator,
        effective_at,
        trust_review_due_at: None,
        removed_at: None,
        audit_ref: None,
    };
    // Principal first, then key: the key row references the principal. The two
    // appends are NOT in one transaction here, hence the hedged error wording.
    if let Err(err) = repo.append_principal_state(
        &principal_record,
        &principal_state_policy_decision_test_allow(),
    ) {
        eprintln!(
            "cortex migrate seed-drill-operator: failed to register principal `{principal_id}`: {err}. \
             no state was changed (or only the principal row was inserted)."
        );
        return Exit::Internal;
    }
    let key_record = KeyTimelineRecord {
        key_id: args.operator_key_id.clone(),
        principal_id: principal_id.clone(),
        state: KeyLifecycleState::Active,
        effective_at,
        reason: Some("v1-to-v2-drill: seed-drill-operator surface".into()),
        audit_ref: None,
    };
    if let Err(err) = repo.append_key_state(&key_record, &key_state_policy_decision_test_allow()) {
        eprintln!(
            "cortex migrate seed-drill-operator: failed to register key `{}` for principal `{principal_id}`: {err}.",
            args.operator_key_id,
        );
        return Exit::Internal;
    }
    // Success transcript: stable key=value lines for the drill harness to parse.
    println!("cortex migrate seed-drill-operator: ok");
    println!("operator_key_id={}", args.operator_key_id);
    println!("principal_id={principal_id}");
    println!("trust_tier=operator");
    println!("key_state=active");
    println!("effective_at={}", effective_at.to_rfc3339());
    Exit::Ok
}
/// Drain the thread-local migrate transcript and emit it as the payload of a
/// `cortex.migrate.v2` JSON envelope, returning the (possibly adjusted) exit.
fn emit_migrate_v2_envelope(dry_run: bool, exit: Exit) -> Exit {
    // `mem::take` both reads and resets the transcript in one step.
    let recorded =
        MIGRATE_TRANSCRIPT.with(|cell| std::mem::take(&mut *cell.borrow_mut()));
    output::emit(
        &Envelope::new(
            "cortex.migrate.v2",
            exit,
            serde_json::json!({
                "dry_run": dry_run,
                "schema_version_target": cortex_core::SCHEMA_VERSION,
                "transcript": recorded,
            }),
        ),
        exit,
    )
}
// Thread-local buffer of {label, value} entries recorded during a migrate run;
// drained by `emit_migrate_v2_envelope` when JSON output is enabled.
thread_local! {
    static MIGRATE_TRANSCRIPT: std::cell::RefCell<Vec<serde_json::Value>> =
        const { std::cell::RefCell::new(Vec::new()) };
}
/// Append a labelled value to the thread-local migrate transcript.
/// A no-op unless JSON output mode is active.
fn record_migrate_event(label: &str, value: serde_json::Value) {
    if output::json_enabled() {
        // Build the entry before borrowing the cell to keep the borrow short.
        let entry = serde_json::json!({
            "label": label,
            "value": value,
        });
        MIGRATE_TRANSCRIPT.with(|cell| cell.borrow_mut().push(entry));
    }
}
/// `cortex migrate sign-operator-attestation`: run the dry-run/boundary
/// preflight, then sign the boundary parameters with an Ed25519 seed and write
/// a JSON attestation envelope to `args.output`.
fn run_sign_operator_attestation(args: SignOperatorAttestationArgs) -> Exit {
    let layout = match DataLayout::resolve(None, None) {
        Ok(layout) => layout,
        Err(exit) => return exit,
    };
    let pool = match open_default_store("migrate sign-operator-attestation") {
        Ok(pool) => pool,
        Err(exit) => {
            eprintln!("cortex migrate sign-operator-attestation: refused; no state was changed.");
            return exit;
        }
    };
    // The same dry-run plan gating as `migrate v2 --dry-run`: refuse to sign
    // an attestation for a store that is not migration-ready.
    let plan = match cortex_store::migrate_v2::dry_run_plan(&pool) {
        Ok(plan) => plan,
        Err(err) => {
            eprintln!(
                "cortex migrate sign-operator-attestation: failed to build dry-run plan: {err}. no state was changed."
            );
            return Exit::PreconditionUnmet;
        }
    };
    if !plan.is_ready() {
        for failure in &plan.failures {
            eprintln!(
                "cortex migrate sign-operator-attestation: {}: {}",
                failure.invariant(),
                failure.detail()
            );
        }
        eprintln!(
            "cortex migrate sign-operator-attestation: dry-run refused; no envelope was produced."
        );
        eprintln!(
            "cortex migrate sign-operator-attestation: hint: run `cortex migrate v2 \
             --backup-manifest <path>` to upgrade, or `cortex doctor --repair` to apply pending \
             migrations"
        );
        return Exit::SchemaMismatch;
    }
    let boundary = match boundary_preflight(&layout, &plan) {
        Ok(boundary) => boundary,
        Err(exit) => return exit,
    };
    // NOTE(review): the seed bytes are read into a heap Vec and never wiped
    // after use; consider zeroizing key material before drop — confirm whether
    // that matters for this threat model.
    let seed = match std::fs::read(&args.signing_seed) {
        Ok(seed) => seed,
        Err(err) => {
            eprintln!(
                "cortex migrate sign-operator-attestation: signing seed `{}` could not be read: {err}.",
                args.signing_seed.display()
            );
            return Exit::PreconditionUnmet;
        }
    };
    // An Ed25519 seed is exactly 32 bytes; anything else is refused up front.
    if seed.len() != 32 {
        eprintln!(
            "cortex migrate sign-operator-attestation: signing seed `{}` must be exactly 32 bytes; got {}.",
            args.signing_seed.display(),
            seed.len()
        );
        return Exit::PreconditionUnmet;
    }
    let mut seed_array = [0u8; 32];
    seed_array.copy_from_slice(&seed);
    let signing_key = ed25519_dalek::SigningKey::from_bytes(&seed_array);
    let verifying_key = signing_key.verifying_key();
    let signed_at = Utc::now();
    let signed_at_rfc3339 = signed_at.to_rfc3339();
    // The signature covers the domain-separated signing input, not the JSON
    // envelope itself (see `OperatorAttestationEnvelopeForSigning`).
    let envelope_unsigned = OperatorAttestationEnvelopeForSigning {
        schema_version: OPERATOR_ATTESTATION_ENVELOPE_SCHEMA_VERSION,
        purpose: OPERATOR_ATTESTATION_ENVELOPE_PURPOSE,
        operator_key_id: &args.operator_key_id,
        signed_at_rfc3339: &signed_at_rfc3339,
        previous_v1_head_hash: &boundary.previous_v1_head_hash,
        migration_script_digest: &boundary.migration_script_digest,
        fixture_verification_result_hash: &boundary.fixture_verification_result_hash,
    };
    let signing_input = envelope_unsigned.signing_input();
    let signature = signing_key.sign(&signing_input);
    let envelope_json = serde_json::json!({
        "schema_version": OPERATOR_ATTESTATION_ENVELOPE_SCHEMA_VERSION,
        "purpose": OPERATOR_ATTESTATION_ENVELOPE_PURPOSE,
        "operator_verifying_key_hex": lowercase_hex(verifying_key.as_bytes()),
        "operator_key_id": args.operator_key_id,
        "signed_at": signed_at_rfc3339,
        "boundary": {
            "previous_v1_head_hash": boundary.previous_v1_head_hash,
            "migration_script_digest": boundary.migration_script_digest,
            "fixture_verification_result_hash": boundary.fixture_verification_result_hash,
        },
        "signature_hex": lowercase_hex(&signature.to_bytes()),
    });
    let serialized = match serde_json::to_string_pretty(&envelope_json) {
        Ok(s) => s,
        Err(err) => {
            eprintln!(
                "cortex migrate sign-operator-attestation: failed to serialise envelope: {err}."
            );
            return Exit::Internal;
        }
    };
    if let Err(err) = std::fs::write(&args.output, serialized) {
        eprintln!(
            "cortex migrate sign-operator-attestation: failed to write `{}`: {err}.",
            args.output.display()
        );
        return Exit::Internal;
    }
    // Success transcript: stable key=value lines.
    println!("cortex migrate sign-operator-attestation: ok");
    println!("operator_attestation_path={}", args.output.display());
    println!("operator_key_id={}", args.operator_key_id);
    println!("signed_at={signed_at_rfc3339}");
    println!(
        "boundary_previous_v1_head_hash={}",
        boundary.previous_v1_head_hash
    );
    println!(
        "migration_script_digest={}",
        boundary.migration_script_digest
    );
    println!(
        "fixture_verification_result_hash={}",
        boundary.fixture_verification_result_hash
    );
    Exit::Ok
}
/// Borrowed view of the fields that are bound into the Ed25519 signing input
/// for an operator attestation. Everything here (and nothing else) is signed.
struct OperatorAttestationEnvelopeForSigning<'a> {
    schema_version: u16,
    purpose: &'a str,
    operator_key_id: &'a str,
    signed_at_rfc3339: &'a str,
    previous_v1_head_hash: &'a str,
    migration_script_digest: &'a str,
    fixture_verification_result_hash: &'a str,
}
impl OperatorAttestationEnvelopeForSigning<'_> {
    /// Serialize the envelope into the canonical byte string that is signed:
    /// domain tag, big-endian schema version, then each string field
    /// length-prefixed (via `push_lp`) in a fixed order.
    fn signing_input(&self) -> Vec<u8> {
        // Domain separation tag so this signature cannot be replayed in
        // another signing context.
        const DOMAIN_TAG_OPERATOR_ATTESTATION_MIGRATION: u8 = 0x20;
        let mut buf = vec![DOMAIN_TAG_OPERATOR_ATTESTATION_MIGRATION];
        buf.extend_from_slice(&self.schema_version.to_be_bytes());
        // Field order is part of the signed format — do not reorder.
        let fields = [
            self.purpose,
            self.operator_key_id,
            self.signed_at_rfc3339,
            self.previous_v1_head_hash,
            self.migration_script_digest,
            self.fixture_verification_result_hash,
        ];
        for field in fields {
            push_lp(&mut buf, field.as_bytes());
        }
        buf
    }
}
/// Encode `bytes` as lowercase hexadecimal, two digits per byte.
fn lowercase_hex(bytes: &[u8]) -> String {
    use std::fmt::Write;
    let mut out = String::with_capacity(bytes.len() * 2);
    for b in bytes {
        // `write!` appends in place; the previous implementation paid a heap
        // allocation per byte by building a temporary `format!` String.
        // Writing into a String is infallible, so the Result is ignorable.
        let _ = write!(out, "{b:02x}");
    }
    out
}
fn run_v2(args: MigrateV2Args) -> Exit {
if args.resume_mirror {
if args.dry_run
|| args.backup_manifest.is_some()
|| args.unattended_migrate
|| args.operator_attestation.is_some()
{
eprintln!(
"cortex migrate v2: --resume-mirror is a partial-mutation recovery surface and cannot be combined with --dry-run, --backup-manifest, --unattended-migrate, or --operator-attestation. no state was changed."
);
return Exit::Usage;
}
return run_v2_resume_mirror();
}
if args.unattended_migrate && args.operator_attestation.is_none() {
eprintln!(
"cortex migrate v2: --unattended-migrate requires --operator-attestation <PATH> with a valid Ed25519-signed operator attestation envelope (ADR 0010 §1-§2). Operator attestation cannot be substituted by --unattended-migrate alone. no state was changed."
);
emit_cutover_readiness_stderr();
return Exit::PreconditionUnmet;
}
if !args.dry_run {
let Some(backup_manifest) = args.backup_manifest.as_deref() else {
eprintln!(
"cortex migrate v2: cutover requires --backup-manifest <path> pointing at a blessed pre-v2 backup. rerun with --dry-run to preview, or supply a backup manifest emitted by `cortex backup --output`. no state was changed."
);
emit_cutover_readiness_stderr();
return Exit::Usage;
};
let backup_manifest = match validate_backup_manifest(backup_manifest) {
Ok(manifest) => manifest,
Err(exit) => return exit,
};
let layout = match DataLayout::resolve(None, None) {
Ok(layout) => layout,
Err(exit) => return exit,
};
let mut pool = match open_default_store("migrate v2") {
Ok(pool) => pool,
Err(exit) => {
eprintln!("cortex migrate v2: full migration refused; no state was changed.");
return exit;
}
};
if let Err(exit) =
enforce_backup_manifest_consistent_with_live_store(&pool, &backup_manifest.path)
{
return exit;
}
let stage_plan = match staged_execution_plan(&pool) {
Ok(plan) => plan,
Err(err) => {
eprintln!(
"cortex migrate v2: failed to build staged execution plan: {err}. no state was changed."
);
return Exit::PreconditionUnmet;
}
};
if !stage_plan.dry_run.is_ready() {
for failure in &stage_plan.dry_run.failures {
eprintln!(
"cortex migrate v2: {}: {}",
failure.invariant(),
failure.detail()
);
}
eprintln!("cortex migrate v2: full migration refused; no state was changed.");
eprintln!(
"cortex migrate v2: hint: run `cortex doctor --repair` to apply pending \
migrations, or retry `cortex migrate v2 --backup-manifest <path>` once the \
store preconditions are satisfied"
);
return Exit::SchemaMismatch;
}
let boundary = match boundary_preflight(&layout, &stage_plan.dry_run) {
Ok(boundary) => boundary,
Err(exit) => return exit,
};
let migration_policy = match build_migration_policy_decision(
&pool,
args.operator_attestation.as_deref(),
&boundary,
) {
Ok(decision) => decision,
Err(exit) => return exit,
};
for stage in &stage_plan.stages {
eprintln!(
"cortex migrate v2: stage={} status={} mutates_store={} enables_cutover={}",
stage.name,
stage_status_label(stage.status),
stage.mutates_store,
stage.enables_cutover
);
}
let migrate_timestamp = Utc::now();
let cutover_outcome = run_v2_cutover_tx(
&mut pool,
&layout,
&boundary,
&migration_policy,
&backup_manifest.table_row_counts,
&PostMigrateManifestInputs {
pre_v2_backup_manifest_path: backup_manifest.path.clone(),
migrate_timestamp,
},
);
let CutoverTxOutcome {
expand_report,
boundary_head,
post_migrate_manifest_blake3,
post_migrate_manifest_body,
} = match cutover_outcome {
Ok(outcome) => outcome,
Err(exit) => return exit,
};
eprintln!(
"cortex migrate v2: expand_backfill_added_columns={}",
expand_report.added_columns.len()
);
for table in &expand_report.created_tables {
eprintln!("cortex migrate v2: expand_backfill_created_table={table}");
}
eprintln!(
"cortex migrate v2: legacy_event_attestations_backfilled={}",
expand_report.legacy_event_attestations_backfilled
);
eprintln!(
"cortex migrate v2: boundary_previous_v1_head_hash={}",
boundary.previous_v1_head_hash
);
eprintln!("cortex migrate v2: boundary_event_hash={boundary_head}");
eprintln!("cortex migrate v2: boundary_event_kind={SCHEMA_MIGRATION_V1_TO_V2_EVENT_KIND}");
eprintln!("cortex migrate v2: boundary_audit=ok");
eprintln!("cortex migrate v2: post_migrate_mixed_chain_audit=ok");
eprintln!("cortex migrate v2: post_cutover_audit_dispatch=available");
eprintln!(
"cortex migrate v2: post_migrate_row_count_refusal=ok pre_events={} pre_traces={} pre_episodes={} pre_memories={}",
backup_manifest.table_row_counts.events,
backup_manifest.table_row_counts.traces,
backup_manifest.table_row_counts.episodes,
backup_manifest.table_row_counts.memories,
);
eprintln!("cortex migrate v2: cutover_authority=ok");
eprintln!("cortex migrate v2: cutover_approved=true");
eprintln!("cortex migrate v2: cutover_guard=committed");
emit_cutover_readiness_stderr();
eprintln!(
"cortex migrate v2: backup_manifest_table_row_counts events={} traces={} episodes={} memories={}",
backup_manifest.table_row_counts.events,
backup_manifest.table_row_counts.traces,
backup_manifest.table_row_counts.episodes,
backup_manifest.table_row_counts.memories,
);
let post_manifest_path = backup_manifest
.path
.with_file_name("POST_V2_MIGRATE_MANIFEST");
let mut post_manifest = post_migrate_manifest_body;
if let Some(obj) = post_manifest.as_object_mut() {
obj.insert(
POST_MIGRATE_MANIFEST_DIGEST_FIELD.to_string(),
serde_json::Value::String(post_migrate_manifest_blake3.clone()),
);
} else {
eprintln!(
"cortex migrate v2: internal error — post-migrate manifest body is not a JSON object; refusing to write tamper-evident manifest."
);
return Exit::Internal;
}
if let Err(err) = std::fs::write(
&post_manifest_path,
serde_json::to_string_pretty(&post_manifest).unwrap_or_default(),
) {
eprintln!(
"cortex migrate v2: failed to write post-migrate manifest `{}`: {err}.",
post_manifest_path.display()
);
return Exit::Internal;
}
eprintln!(
"cortex migrate v2: post_migrate_manifest={}",
post_manifest_path.display()
);
eprintln!("cortex migrate v2: post_migrate_manifest_blake3={post_migrate_manifest_blake3}");
eprintln!("cortex migrate v2: {POST_MIGRATE_MANIFEST_CHAIN_ANCHORED_INVARIANT}=ok");
eprintln!(
"cortex migrate v2: schema cutover complete. SCHEMA_VERSION={} active. New writes are v2-only; v1 binaries opening this store will refuse with Exit::SchemaMismatch. Rollback is restore-from-blessed-pre-v2-backup (ADR 0033 §3); divergent migrations use ADR 0033 §4 fork isolation.",
cortex_core::SCHEMA_VERSION
);
record_migrate_event(
"cutover_summary",
serde_json::json!({
"schema_version_active": cortex_core::SCHEMA_VERSION,
"boundary_event_hash": boundary_head,
"boundary_previous_v1_head_hash": boundary.previous_v1_head_hash,
"migration_script_digest": boundary.migration_script_digest,
"fixture_verification_result_hash": boundary.fixture_verification_result_hash,
"post_migrate_manifest": post_manifest_path.display().to_string(),
"post_migrate_manifest_blake3": post_migrate_manifest_blake3,
"cutover_authority": "ok",
"cutover_approved": true,
"cutover_guard": "committed",
"post_migrate_row_count_refusal": "ok",
}),
);
return Exit::Ok;
}
if let Some(backup_manifest) = args.backup_manifest.as_deref() {
eprintln!(
"cortex migrate v2: --backup-manifest `{}` is for the future mutating path and is not accepted with --dry-run. no state was changed.",
backup_manifest.display()
);
return Exit::Usage;
}
let layout = match DataLayout::resolve(None, None) {
Ok(layout) => layout,
Err(exit) => return exit,
};
let pool = match open_default_store("migrate v2") {
Ok(pool) => pool,
Err(exit) => {
eprintln!("cortex migrate v2: dry-run refused; no state was changed.");
return exit;
}
};
let plan = match cortex_store::migrate_v2::dry_run_plan(&pool) {
Ok(plan) => plan,
Err(err) => {
eprintln!(
"cortex migrate v2: failed to build dry-run plan: {err}. no state was changed."
);
return Exit::PreconditionUnmet;
}
};
if !plan.is_ready() {
for failure in &plan.failures {
eprintln!(
"cortex migrate v2: {}: {}",
failure.invariant(),
failure.detail()
);
}
eprintln!("cortex migrate v2: dry-run refused; no state was changed.");
eprintln!(
"cortex migrate v2: hint: run `cortex doctor --repair` to apply pending migrations, \
or retry `cortex migrate v2 --backup-manifest <path>` once the store preconditions \
are satisfied"
);
return Exit::SchemaMismatch;
}
let boundary = match boundary_preflight(&layout, &plan) {
Ok(boundary) => boundary,
Err(exit) => return exit,
};
let json = output::json_enabled();
if !json {
println!("cortex migrate v2: dry-run ok");
println!(
"expected_schema_version={}",
plan.fixture.expected_schema_version
);
println!(
"observed_row_schema_versions={:?}",
plan.fixture.observed_row_schema_versions
);
match &plan.fixture.event_chain_head {
Some(head) => println!("event_chain_head={head}"),
None => println!("event_chain_head=<none>"),
}
println!("jsonl_event_chain_head={}", boundary.previous_v1_head_hash);
for migration in &plan.fixture.applied_migrations {
println!("applied_migration={migration}");
}
for count in &plan.fixture.table_counts {
println!("table={} rows={}", count.table, count.rows);
}
for step in &plan.steps {
println!("step={step}");
}
println!("boundary_event_kind={SCHEMA_MIGRATION_V1_TO_V2_EVENT_KIND}");
println!("schema_version_target=2");
println!(
"boundary_previous_v1_head_hash={}",
boundary.previous_v1_head_hash
);
println!(
"migration_script_digest={}",
boundary.migration_script_digest
);
println!(
"fixture_verification_result_hash={}",
boundary.fixture_verification_result_hash
);
println!("operator_attestation_mode=dry_run_not_collected");
println!("boundary_preflight_ready=true");
println!("cutover_authority=ok");
println!("cutover_approved=false");
println!("cutover_guard=requires_backup_manifest");
emit_cutover_readiness_stdout();
println!("no state was changed");
} else {
record_migrate_event(
"dry_run_summary",
serde_json::json!({
"expected_schema_version": plan.fixture.expected_schema_version,
"observed_row_schema_versions": plan.fixture.observed_row_schema_versions,
"event_chain_head": plan.fixture.event_chain_head,
"jsonl_event_chain_head": boundary.previous_v1_head_hash,
"applied_migrations": plan.fixture.applied_migrations,
"table_counts": plan
.fixture
.table_counts
.iter()
.map(|count| serde_json::json!({
"table": count.table,
"rows": count.rows,
}))
.collect::<Vec<_>>(),
"steps": plan.steps,
"boundary_event_kind": SCHEMA_MIGRATION_V1_TO_V2_EVENT_KIND,
"boundary_previous_v1_head_hash": boundary.previous_v1_head_hash,
"migration_script_digest": boundary.migration_script_digest,
"fixture_verification_result_hash": boundary.fixture_verification_result_hash,
"operator_attestation_mode": "dry_run_not_collected",
"boundary_preflight_ready": true,
"cutover_authority": "ok",
"cutover_approved": false,
"cutover_guard": "requires_backup_manifest",
}),
);
}
Exit::Ok
}
/// Successful result of the cutover transaction: the expand/backfill report,
/// the appended boundary event's hash, and the post-migrate manifest body plus
/// its BLAKE3 digest (written to disk by the caller after commit).
struct CutoverTxOutcome {
    expand_report: cortex_store::migrate_v2::V2ExpandBackfillReport,
    boundary_head: String,
    post_migrate_manifest_blake3: String,
    post_migrate_manifest_body: serde_json::Value,
}
/// Every distinct way the cutover transaction can fail; each variant maps to a
/// specific operator-facing message and exit code in `emit_cutover_tx_failure`.
/// Variants are ordered roughly by the stage at which they can occur.
enum CutoverTxError {
    ExpandBackfill(cortex_store::StoreError),
    JsonlAppend(cortex_ledger::JsonlError),
    BoundaryMirror(cortex_store::StoreError),
    RebackfillAttestations(cortex_store::StoreError),
    BoundaryAuditFailures(Vec<cortex_ledger::audit::SchemaMigrationBoundaryFailure>),
    BoundaryAuditError(cortex_ledger::JsonlError),
    ChainAuditFailures(usize),
    ChainAuditError(cortex_ledger::JsonlError),
    PostMigrateCountMismatch(Vec<cortex_store::verify::PostMigrateCountMismatchFailure>),
    PostMigrateCountError(cortex_store::StoreError),
    CutoverReadiness(cortex_store::StoreError),
    TxBegin(cortex_store::StoreError),
    TxCommit(rusqlite::Error),
    PostMigrateManifestAnchor(rusqlite::Error),
}
/// Execute the v1→v2 cutover inside a single SQLite transaction.
///
/// Stage order is the invariant here — do not reorder:
/// expand/backfill → JSONL boundary append → mirror boundary into SQLite →
/// attestation backfill → manifest-digest anchor → boundary audit → full
/// chain audit → row-count verification → readiness gate → commit.
///
/// The JSONL append is NOT covered by the SQLite transaction: if a later
/// stage fails, the SQLite side rolls back but the JSONL boundary row remains
/// durable (recovery is `--resume-mirror`, per the error messages emitted by
/// `emit_cutover_tx_failure`).
fn run_v2_cutover_tx(
    pool: &mut cortex_store::Pool,
    layout: &DataLayout,
    boundary: &BoundaryPreflight,
    migration_policy: &cortex_core::PolicyDecision,
    pre_counts_payload: &BackupManifestTableRowCounts,
    manifest_inputs: &PostMigrateManifestInputs,
) -> Result<CutoverTxOutcome, Exit> {
    let tx = match pool.transaction() {
        Ok(tx) => tx,
        Err(err) => {
            let err = cortex_store::StoreError::from(err);
            return Err(emit_cutover_tx_failure(
                CutoverTxError::TxBegin(err),
                boundary,
            ));
        }
    };
    // Stage 1: schema expand + backfill skeleton (SQLite only, in-tx).
    let expand_report =
        match cortex_store::migrate_v2::apply_expand_backfill_skeleton(&tx, Utc::now()) {
            Ok(report) => report,
            Err(err) => {
                return Err(emit_cutover_tx_failure(
                    CutoverTxError::ExpandBackfill(err),
                    boundary,
                ));
            }
        };
    // Stage 2: append the schema_migration.v1_to_v2 boundary event to the
    // JSONL log. Durable immediately — outside the SQLite transaction.
    let (boundary_head, boundary_event) =
        match JsonlLog::open(&layout.event_log_path).and_then(|mut log| {
            log.append_schema_migration_v1_to_v2_with_event(
                SchemaMigrationV1ToV2Payload::new(
                    boundary.previous_v1_head_hash.clone(),
                    boundary.migration_script_digest.clone(),
                    None,
                    boundary.fixture_verification_result_hash.clone(),
                ),
                migration_policy,
            )
        }) {
            Ok((head, event)) => (head, event),
            Err(err) => {
                return Err(emit_cutover_tx_failure(
                    CutoverTxError::JsonlAppend(err),
                    boundary,
                ));
            }
        };
    // Stage 3: mirror the boundary event into SQLite (in-tx).
    if let Err(err) =
        cortex_store::mirror::mirror_single_event_into_sqlite_in_tx(&tx, &boundary_event)
    {
        return Err(emit_cutover_tx_failure(
            CutoverTxError::BoundaryMirror(err),
            boundary,
        ));
    }
    // Stage 4: backfill attestations for legacy events (in-tx).
    if let Err(err) = cortex_store::migrate_v2::backfill_legacy_event_attestations(&tx, Utc::now())
    {
        return Err(emit_cutover_tx_failure(
            CutoverTxError::RebackfillAttestations(err),
            boundary,
        ));
    }
    // Stage 5: compute the post-migrate manifest digest and anchor it onto the
    // boundary row so the on-disk manifest is tamper-evident against the chain.
    let post_migrate_manifest_body = build_post_migrate_manifest_body(
        &boundary_head,
        boundary,
        pre_counts_payload,
        manifest_inputs,
    );
    let post_migrate_manifest_blake3 = canonical_blake3_hex(&post_migrate_manifest_body);
    if let Err(err) = tx.execute(
        "UPDATE events
         SET source_attestation_json = json_set(
             source_attestation_json,
             '$.post_migrate_manifest_blake3',
             ?1
         )
         WHERE id = ?2;",
        rusqlite::params![
            post_migrate_manifest_blake3.as_str(),
            boundary_event.id.to_string(),
        ],
    ) {
        return Err(emit_cutover_tx_failure(
            CutoverTxError::PostMigrateManifestAnchor(err),
            boundary,
        ));
    }
    // Stage 6: audit the boundary row we just appended to the JSONL log.
    match verify_schema_migration_v1_to_v2_boundary(&layout.event_log_path, true) {
        Ok(report) if report.ok() => {}
        Ok(report) => {
            return Err(emit_cutover_tx_failure(
                CutoverTxError::BoundaryAuditFailures(report.failures),
                boundary,
            ));
        }
        Err(err) => {
            return Err(emit_cutover_tx_failure(
                CutoverTxError::BoundaryAuditError(err),
                boundary,
            ));
        }
    }
    // Stage 7: full hash-chain audit over the JSONL log.
    match verify_chain(&layout.event_log_path) {
        Ok(report) if report.ok() => {}
        Ok(report) => {
            return Err(emit_cutover_tx_failure(
                CutoverTxError::ChainAuditFailures(report.failures.len()),
                boundary,
            ));
        }
        Err(err) => {
            return Err(emit_cutover_tx_failure(
                CutoverTxError::ChainAuditError(err),
                boundary,
            ));
        }
    }
    // Stage 8: refuse if row counts drifted from the pre-v2 backup baseline.
    let pre_counts = cortex_store::verify::PreV2BackupRowCounts {
        events: pre_counts_payload.events,
        traces: pre_counts_payload.traces,
        episodes: pre_counts_payload.episodes,
        memories: pre_counts_payload.memories,
    };
    match cortex_store::verify::verify_post_migrate_row_counts(&tx, &pre_counts) {
        Ok(failures) if failures.is_empty() => {}
        Ok(failures) => {
            return Err(emit_cutover_tx_failure(
                CutoverTxError::PostMigrateCountMismatch(failures),
                boundary,
            ));
        }
        Err(err) => {
            return Err(emit_cutover_tx_failure(
                CutoverTxError::PostMigrateCountError(err),
                boundary,
            ));
        }
    }
    // Stage 9: final readiness gate, then commit.
    if let Err(err) = cortex_store::migrate_v2::require_default_v2_cutover_readiness(&tx) {
        return Err(emit_cutover_tx_failure(
            CutoverTxError::CutoverReadiness(err),
            boundary,
        ));
    }
    if let Err(err) = tx.commit() {
        return Err(emit_cutover_tx_failure(
            CutoverTxError::TxCommit(err),
            boundary,
        ));
    }
    Ok(CutoverTxOutcome {
        expand_report,
        boundary_head,
        post_migrate_manifest_blake3,
        post_migrate_manifest_body,
    })
}
/// Inputs that flow into the post-migrate manifest body: where the blessed
/// pre-v2 backup manifest lives, and when the migration ran.
pub(crate) struct PostMigrateManifestInputs {
    pub(crate) pre_v2_backup_manifest_path: PathBuf,
    pub(crate) migrate_timestamp: DateTime<Utc>,
}
// JSON field under which the manifest's own BLAKE3 digest is embedded. The
// digest is computed over the body WITHOUT this field, then inserted.
pub(crate) const POST_MIGRATE_MANIFEST_DIGEST_FIELD: &str = "manifest_blake3";
// Version of the post-migrate manifest envelope format.
pub(crate) const POST_MIGRATE_MANIFEST_ENVELOPE_VERSION: u16 = 1;
// Invariant id printed once the manifest digest is anchored on the chain.
pub(crate) const POST_MIGRATE_MANIFEST_CHAIN_ANCHORED_INVARIANT: &str =
    "migrate.post_manifest.chain_anchor.appended";
/// Build the post-migrate manifest JSON body (WITHOUT the embedded digest
/// field — `canonical_blake3_hex` is computed over exactly this value, and the
/// digest is inserted afterwards by the caller).
fn build_post_migrate_manifest_body(
    boundary_head: &str,
    boundary: &BoundaryPreflight,
    pre_counts: &BackupManifestTableRowCounts,
    inputs: &PostMigrateManifestInputs,
) -> serde_json::Value {
    serde_json::json!({
        "kind": "cortex_post_v2_migrate",
        "manifest_envelope_version": POST_MIGRATE_MANIFEST_ENVELOPE_VERSION,
        "schema_version": cortex_core::SCHEMA_VERSION,
        "migrate_timestamp": inputs.migrate_timestamp.to_rfc3339(),
        "boundary_event_hash": boundary_head,
        "boundary_previous_v1_head_hash": boundary.previous_v1_head_hash,
        "migration_script_digest": boundary.migration_script_digest,
        "fixture_verification_result_hash": boundary.fixture_verification_result_hash,
        "pre_v2_backup_manifest": inputs.pre_v2_backup_manifest_path.display().to_string(),
        "pre_v2_table_row_counts": {
            "events": pre_counts.events,
            "traces": pre_counts.traces,
            "episodes": pre_counts.episodes,
            "memories": pre_counts.memories,
        },
        "schema_v1_to_v2_event_boundary_delta":
            cortex_store::verify::SCHEMA_V1_TO_V2_EVENT_BOUNDARY_DELTA,
        "tool_version": env!("CARGO_PKG_VERSION"),
        "runtime_mode": cortex_core::RuntimeMode::LocalUnsigned,
        "proof_state": cortex_core::ClaimProofState::FullChainVerified,
        "claim_ceiling": cortex_core::ClaimCeiling::LocalUnsigned,
        "authority_class": cortex_core::AuthorityClass::Verified,
    })
}
/// BLAKE3 digest of a JSON value's canonical byte encoding, rendered as
/// `blake3:<lowercase-hex>`.
pub(crate) fn canonical_blake3_hex(value: &serde_json::Value) -> String {
    let canonical = canonical_payload_bytes(value);
    let digest = blake3::hash(&canonical);
    let mut rendered = String::from("blake3:");
    rendered.push_str(&digest.to_hex());
    rendered
}
// Invariant id printed whenever the cutover transaction is rolled back.
const CUTOVER_TX_ROLLBACK_INVARIANT: &str = "migrate.v2.cutover_tx.rolled_back";
/// Translate a cutover-transaction failure into operator-facing stderr output
/// and the corresponding exit code.
///
/// Exit-code mapping: precondition/plan failures → `PreconditionUnmet`;
/// audit/readiness gate failures → `SchemaMismatch`; anything that could leave
/// stores divergent (mirror, commit, anchor, counts) → `IntegrityFailure`.
/// Every variant except `TxBegin` also prints the rollback line, since by then
/// the transaction was open and has been rolled back.
fn emit_cutover_tx_failure(err: CutoverTxError, boundary: &BoundaryPreflight) -> Exit {
    match err {
        CutoverTxError::TxBegin(err) => {
            eprintln!(
                "cortex migrate v2: failed to open SQLite cutover transaction: {err}. cutover refused; no state was changed."
            );
            Exit::PreconditionUnmet
        }
        CutoverTxError::ExpandBackfill(err) => {
            eprintln!(
                "cortex migrate v2: expand/backfill stage failed inside cutover transaction: {err}. boundary event was not appended; cutover remains disabled."
            );
            emit_cutover_tx_rollback_line(boundary);
            Exit::PreconditionUnmet
        }
        CutoverTxError::JsonlAppend(err) => {
            eprintln!(
                "cortex migrate v2: boundary append failed: {err}. post-migration audit and cutover remain disabled."
            );
            emit_cutover_tx_rollback_line(boundary);
            Exit::PreconditionUnmet
        }
        // From here on the JSONL boundary row may already be durable, so the
        // messages steer the operator toward --resume-mirror or restore.
        CutoverTxError::BoundaryMirror(err) => {
            eprintln!(
                "cortex migrate v2: boundary mirror into SQLite failed after JSONL append: {err}. cutover refused; the JSONL boundary row is durable — run `cortex migrate v2 --resume-mirror` to re-mirror it into SQLite, or restore from blessed pre-v2 backup."
            );
            emit_cutover_tx_rollback_line(boundary);
            Exit::IntegrityFailure
        }
        CutoverTxError::RebackfillAttestations(err) => {
            eprintln!(
                "cortex migrate v2: boundary attestation backfill failed: {err}. cutover refused."
            );
            emit_cutover_tx_rollback_line(boundary);
            Exit::IntegrityFailure
        }
        CutoverTxError::BoundaryAuditFailures(failures) => {
            for failure in &failures {
                eprintln!(
                    "cortex migrate v2: {}: {:?}",
                    failure.invariant, failure.detail
                );
            }
            eprintln!(
                "cortex migrate v2: boundary audit failed after append; post-migration audit and cutover remain disabled."
            );
            eprintln!(
                "cortex migrate v2: hint: the JSONL boundary row was appended but failed \
                 validation; run `cortex migrate v2 --resume-mirror` to retry the cutover, or \
                 restore from the blessed pre-v2 backup"
            );
            emit_cutover_tx_rollback_line(boundary);
            Exit::SchemaMismatch
        }
        CutoverTxError::BoundaryAuditError(err) => {
            eprintln!(
                "cortex migrate v2: boundary audit failed after append: {err}. post-migration audit and cutover remain disabled."
            );
            emit_cutover_tx_rollback_line(boundary);
            Exit::PreconditionUnmet
        }
        CutoverTxError::ChainAuditFailures(count) => {
            eprintln!(
                "cortex migrate v2: staged post-migration audit found {count} hash-chain failure(s); cutover remains disabled."
            );
            emit_cutover_tx_rollback_line(boundary);
            Exit::IntegrityFailure
        }
        CutoverTxError::ChainAuditError(err) => {
            eprintln!(
                "cortex migrate v2: staged post-migration audit failed: {err}. cutover remains disabled."
            );
            emit_cutover_tx_rollback_line(boundary);
            Exit::PreconditionUnmet
        }
        CutoverTxError::PostMigrateCountMismatch(failures) => {
            for failure in &failures {
                eprintln!(
                    "cortex migrate v2: {}: {}",
                    failure.invariant(),
                    failure.detail()
                );
            }
            eprintln!(
                "cortex migrate v2: post-migrate row counts drifted from the pre-v2 backup manifest baseline. Restore from the blessed pre-v2 backup or use ADR 0033 §4 fork isolation; in-place down-migration is forbidden."
            );
            emit_cutover_tx_rollback_line(boundary);
            Exit::IntegrityFailure
        }
        CutoverTxError::PostMigrateCountError(err) => {
            eprintln!(
                "cortex migrate v2: post-migrate row-count verification failed: {err}. Cutover refused; restore from blessed pre-v2 backup."
            );
            emit_cutover_tx_rollback_line(boundary);
            Exit::PreconditionUnmet
        }
        CutoverTxError::CutoverReadiness(err) => {
            eprintln!(
                "cortex migrate v2: default-v2 cutover readiness gate failed: {err}. Cutover refused; restore from blessed pre-v2 backup."
            );
            eprintln!(
                "cortex migrate v2: hint: run `cortex migrate v2 --backup-manifest <path>` to \
                 reattempt the upgrade, or restore from the blessed pre-v2 backup if the store \
                 is unrecoverable"
            );
            emit_cutover_tx_rollback_line(boundary);
            Exit::SchemaMismatch
        }
        CutoverTxError::TxCommit(err) => {
            eprintln!(
                "cortex migrate v2: cutover transaction commit failed: {err}. cutover refused; rollback applied — no SQLite mutation is durable. If the JSONL boundary row appended successfully, run `cortex migrate v2 --resume-mirror` to re-mirror it into SQLite; otherwise restore from blessed pre-v2 backup."
            );
            emit_cutover_tx_rollback_line(boundary);
            Exit::IntegrityFailure
        }
        CutoverTxError::PostMigrateManifestAnchor(err) => {
            eprintln!(
                "cortex migrate v2: failed to anchor post-migrate manifest BLAKE3 digest onto the boundary row's source_attestation_json column: {err}. cutover refused; rollback applied (Decision #6 / RED_TEAM_FINDINGS D2)."
            );
            emit_cutover_tx_rollback_line(boundary);
            Exit::IntegrityFailure
        }
    }
}
/// Emits the single stderr line asserting the cutover-transaction rollback
/// invariant, including the previous v1 head hash from the boundary preflight.
fn emit_cutover_tx_rollback_line(boundary: &BoundaryPreflight) {
    let head = &boundary.previous_v1_head_hash;
    eprintln!(
        "cortex migrate v2: {CUTOVER_TX_ROLLBACK_INVARIANT}: SQLite cutover transaction rolled back; pre-cutover SQLite snapshot is intact (previous_v1_head_hash={}).",
        head
    );
}
// Stable invariant identifiers for `--resume-mirror` outcomes; they appear in
// stderr diagnostics and in the recorded `resume_mirror_summary` events below.
/// Recovery completed: boundary mirrored into SQLite, readiness gate green.
const RESUME_MIRROR_COMPLETED_INVARIANT: &str = "migrate.v2.resume_mirror.completed";
/// Boundary row already present in SQLite — resume is a successful no-op.
const RESUME_MIRROR_BOUNDARY_ALREADY_MIRRORED_INVARIANT: &str =
    "migrate.v2.resume_mirror.boundary_already_mirrored";
/// JSONL log carries no boundary row — resume is the wrong recovery surface.
const RESUME_MIRROR_NO_JSONL_BOUNDARY_INVARIANT: &str =
    "migrate.v2.resume_mirror.no_jsonl_boundary";
/// The schema v2 expand DDL had been rolled back and was replayed in recovery.
const RESUME_MIRROR_SCHEMA_SKELETON_REPLAY_INVARIANT: &str =
    "migrate.v2.resume_mirror.schema_skeleton_replayed";
/// Recovery entry point for `cortex migrate v2 --resume-mirror`.
///
/// Covers the failure window where the JSONL schema_migration.v1_to_v2
/// boundary row was appended but the SQLite cutover transaction never became
/// durable. Flow:
/// 1. re-audit the JSONL boundary, refusing on a missing or failing audit;
/// 2. exit as a successful no-op when SQLite already carries the boundary row;
/// 3. otherwise, inside one recovery transaction: replay the schema v2
///    expand/backfill skeleton, mirror the boundary event, backfill legacy
///    attestations, and re-run the default-v2 readiness gate before commit.
///
/// Every refusal path explains itself on stderr and leaves state unchanged.
fn run_v2_resume_mirror() -> Exit {
    let layout = match DataLayout::resolve(None, None) {
        Ok(layout) => layout,
        Err(exit) => return exit,
    };
    let mut pool = match open_default_store("migrate v2 --resume-mirror") {
        Ok(pool) => pool,
        Err(exit) => {
            eprintln!("cortex migrate v2 --resume-mirror: resume refused; no state was changed.");
            return exit;
        }
    };
    // Audit the JSONL boundary before touching SQLite. NOTE(review): the bare
    // `true` argument presumably toggles a stricter mode than the `false`
    // passed by `boundary_preflight` — confirm against the audit module.
    let boundary_report = match verify_schema_migration_v1_to_v2_boundary(
        &layout.event_log_path,
        true,
    ) {
        Ok(report) => report,
        Err(err) => {
            eprintln!(
                "cortex migrate v2 --resume-mirror: failed to inspect JSONL boundary: {err}. no state was changed."
            );
            return Exit::PreconditionUnmet;
        }
    };
    // No boundary row at all: nothing to resume; point the operator at the
    // correct surface for their situation.
    if boundary_report.boundary_rows.is_empty() {
        eprintln!(
            "cortex migrate v2 --resume-mirror: {RESUME_MIRROR_NO_JSONL_BOUNDARY_INVARIANT}: JSONL log `{}` does not carry a schema_migration.v1_to_v2 boundary row. --resume-mirror is the wrong recovery surface; if no cutover has run, use `cortex migrate v2 --backup-manifest <PATH> --operator-attestation <PATH>`; if both stores were rolled back from a blessed pre-v2 backup, no recovery is needed. no state was changed.",
            layout.event_log_path.display()
        );
        return Exit::PreconditionUnmet;
    }
    // Boundary exists but the audit found structural problems: refuse.
    if !boundary_report.ok() {
        for failure in &boundary_report.failures {
            eprintln!(
                "cortex migrate v2 --resume-mirror: {}: {:?}",
                failure.invariant, failure.detail
            );
        }
        eprintln!(
            "cortex migrate v2 --resume-mirror: JSONL boundary audit failed; resume refused. no state was changed."
        );
        eprintln!(
            "cortex migrate v2 --resume-mirror: hint: if the boundary row is structurally \
            corrupt, restore from the blessed pre-v2 backup; if this is a fresh v2 store with \
            no boundary, run `cortex migrate v2 --backup-manifest <path>` instead"
        );
        return Exit::SchemaMismatch;
    }
    let boundary_event = match load_boundary_event(&layout.event_log_path) {
        Ok(event) => event,
        Err(exit) => return exit,
    };
    // Idempotence check: if SQLite already has the boundary row, report the
    // no-op, record the summary event, and exit green.
    match boundary_row_present_in_sqlite(&pool, &boundary_event.event_hash) {
        Ok(true) => {
            eprintln!(
                "cortex migrate v2 --resume-mirror: {RESUME_MIRROR_BOUNDARY_ALREADY_MIRRORED_INVARIANT}: SQLite already carries the schema_migration.v1_to_v2 boundary row at event_hash={}. resume is a no-op; no state was changed.",
                boundary_event.event_hash
            );
            record_migrate_event(
                "resume_mirror_summary",
                serde_json::json!({
                    "invariant": RESUME_MIRROR_BOUNDARY_ALREADY_MIRRORED_INVARIANT,
                    "boundary_event_hash": boundary_event.event_hash,
                    "mirrored": false,
                    "no_op": true,
                }),
            );
            return Exit::Ok;
        }
        Ok(false) => {}
        Err(err) => {
            eprintln!(
                "cortex migrate v2 --resume-mirror: failed to probe SQLite for boundary row: {err}. no state was changed."
            );
            return Exit::PreconditionUnmet;
        }
    }
    // All recovery mutations happen inside this single transaction so any
    // failure below rolls everything back.
    let tx = match pool.transaction() {
        Ok(tx) => tx,
        Err(err) => {
            eprintln!(
                "cortex migrate v2 --resume-mirror: failed to open SQLite recovery transaction: {err}. no state was changed."
            );
            return Exit::PreconditionUnmet;
        }
    };
    // Re-apply the schema v2 expand/backfill DDL; a rolled-back cutover may
    // have taken the schema skeleton with it.
    let expand_report = match cortex_store::migrate_v2::apply_expand_backfill_skeleton(
        &tx,
        Utc::now(),
    ) {
        Ok(report) => report,
        Err(err) => {
            eprintln!(
                "cortex migrate v2 --resume-mirror: schema v2 expand/backfill replay failed inside recovery transaction: {err}. transaction rolled back; no state was changed."
            );
            return Exit::IntegrityFailure;
        }
    };
    if !expand_report.added_columns.is_empty() || !expand_report.created_tables.is_empty() {
        eprintln!(
            "cortex migrate v2 --resume-mirror: {RESUME_MIRROR_SCHEMA_SKELETON_REPLAY_INVARIANT}: cutover transaction rolled back the schema v2 expand DDL; replayed inside recovery transaction. added_columns={:?} created_tables={:?}",
            expand_report.added_columns,
            expand_report.created_tables
        );
    }
    // Mirror the JSONL boundary event into SQLite.
    if let Err(err) =
        cortex_store::mirror::mirror_single_event_into_sqlite_in_tx(&tx, &boundary_event)
    {
        eprintln!(
            "cortex migrate v2 --resume-mirror: boundary mirror INSERT failed inside recovery transaction: {err}. transaction rolled back; no state was changed."
        );
        return Exit::IntegrityFailure;
    }
    if let Err(err) = cortex_store::migrate_v2::backfill_legacy_event_attestations(&tx, Utc::now())
    {
        eprintln!(
            "cortex migrate v2 --resume-mirror: legacy attestation backfill failed inside recovery transaction: {err}. transaction rolled back; no state was changed."
        );
        return Exit::IntegrityFailure;
    }
    // The readiness gate runs inside the same transaction so a red gate rolls
    // the whole recovery back.
    if let Err(err) = cortex_store::migrate_v2::require_default_v2_cutover_readiness(&tx) {
        eprintln!(
            "cortex migrate v2 --resume-mirror: default-v2 cutover readiness gate failed inside recovery transaction: {err}. transaction rolled back; no state was changed."
        );
        eprintln!(
            "cortex migrate v2 --resume-mirror: hint: run `cortex migrate v2 \
            --backup-manifest <path>` to reattempt the upgrade from scratch, or restore from \
            the blessed pre-v2 backup"
        );
        return Exit::SchemaMismatch;
    }
    if let Err(err) = tx.commit() {
        eprintln!(
            "cortex migrate v2 --resume-mirror: recovery transaction commit failed: {err}. no SQLite mutation is durable; no state was changed."
        );
        return Exit::IntegrityFailure;
    }
    eprintln!(
        "cortex migrate v2 --resume-mirror: {RESUME_MIRROR_COMPLETED_INVARIANT}: boundary mirrored into SQLite and readiness gate green. boundary_event_hash={} jsonl_path={}",
        boundary_event.event_hash,
        layout.event_log_path.display()
    );
    record_migrate_event(
        "resume_mirror_summary",
        serde_json::json!({
            "invariant": RESUME_MIRROR_COMPLETED_INVARIANT,
            "boundary_event_hash": boundary_event.event_hash,
            "boundary_event_kind": SCHEMA_MIGRATION_V1_TO_V2_EVENT_KIND,
            "mirrored": true,
            "no_op": false,
            "schema_skeleton_replayed": !expand_report.added_columns.is_empty()
                || !expand_report.created_tables.is_empty(),
            "schema_skeleton_added_columns": expand_report.added_columns,
            "schema_skeleton_created_tables": expand_report.created_tables,
        }),
    );
    Exit::Ok
}
/// Scans the JSONL log and returns the unique schema_migration.v1_to_v2
/// boundary event, refusing when zero or more than one such row exists.
fn load_boundary_event(event_log_path: &Path) -> Result<Event, Exit> {
    let log = JsonlLog::open(event_log_path).map_err(|err| {
        eprintln!(
            "cortex migrate v2 --resume-mirror: failed to open JSONL log `{}`: {err}. no state was changed.",
            event_log_path.display()
        );
        Exit::PreconditionUnmet
    })?;
    let rows = log.iter().map_err(|err| {
        eprintln!(
            "cortex migrate v2 --resume-mirror: failed to iterate JSONL log: {err}. no state was changed."
        );
        Exit::PreconditionUnmet
    })?;
    let mut found: Option<Event> = None;
    for row in rows {
        let event = row.map_err(|err| {
            eprintln!(
                "cortex migrate v2 --resume-mirror: failed to decode JSONL row: {err}. no state was changed."
            );
            Exit::PreconditionUnmet
        })?;
        let kind = event.payload.get("kind").and_then(serde_json::Value::as_str);
        if kind != Some(SCHEMA_MIGRATION_V1_TO_V2_EVENT_KIND) {
            continue;
        }
        // A second boundary row is a structural integrity violation.
        if found.is_some() {
            eprintln!(
                "cortex migrate v2 --resume-mirror: JSONL log carries more than one schema_migration.v1_to_v2 row; resume refused. no state was changed."
            );
            eprintln!(
                "cortex migrate v2 --resume-mirror: hint: this is a structural integrity \
                issue — restore from the blessed pre-v2 backup to recover a clean store"
            );
            return Err(Exit::SchemaMismatch);
        }
        found = Some(event);
    }
    match found {
        Some(event) => Ok(event),
        // The caller already audited that a boundary row exists, so an empty
        // rescan means the log changed underneath us.
        None => {
            eprintln!(
                "cortex migrate v2 --resume-mirror: boundary audit reported a boundary row but the rescan found none. no state was changed."
            );
            Err(Exit::PreconditionUnmet)
        }
    }
}
/// Returns whether SQLite already holds an events row with `event_hash`.
fn boundary_row_present_in_sqlite(
    pool: &cortex_store::Pool,
    event_hash: &str,
) -> Result<bool, cortex_store::StoreError> {
    pool.query_row(
        "SELECT COUNT(*) FROM events WHERE event_hash = ?1;",
        rusqlite::params![event_hash],
        |row| row.get::<_, u64>(0),
    )
    .map(|matching_rows| matching_rows > 0)
}
/// A backup manifest that has passed `validate_backup_manifest`.
struct ValidBackupManifest {
    /// Filesystem path the manifest was loaded from.
    path: PathBuf,
    /// Pre-migrate per-table row counts; serves as the baseline for the
    /// post-migrate count-mismatch refusal check.
    table_row_counts: BackupManifestTableRowCounts,
}
/// Invariant emitted when a manifest claims kind=cortex_pre_v2_backup but the
/// live store already holds schema_version >= 2 rows outside the boundary.
const BACKUP_MANIFEST_PRE_V2_KIND_BUT_V2_ROWS_PRESENT_INVARIANT: &str =
    "migrate.v2.backup_manifest.pre_v2_kind_but_v2_rows_present";
/// Refuses when a pre-v2 backup manifest is presented against a store that
/// already carries schema_version >= 2 rows outside the v1 -> v2 boundary.
fn enforce_backup_manifest_consistent_with_live_store(
    pool: &cortex_store::Pool,
    manifest_path: &std::path::Path,
) -> Result<(), Exit> {
    let counts = cortex_store::verify::count_post_v2_rows_outside_boundary(pool).map_err(|err| {
        eprintln!(
            "cortex migrate v2: backup manifest `{}` consistency check failed to read live store: {err}. no state was changed.",
            manifest_path.display()
        );
        Exit::PreconditionUnmet
    })?;
    if !counts.is_empty() {
        // Post-v2 rows already exist: migrating again would corrupt the store.
        eprintln!(
            "cortex migrate v2: {BACKUP_MANIFEST_PRE_V2_KIND_BUT_V2_ROWS_PRESENT_INVARIANT}: backup manifest `{}` declares kind=cortex_pre_v2_backup but the live store has {} row(s) with schema_version >= 2 outside the schema_migration.v1_to_v2 boundary (events_post_v2={}, traces_post_v2={}). Refusing to corrupt a v2 store with a fake v1 migration. no state was changed.",
            manifest_path.display(),
            counts.total(),
            counts.events_post_v2,
            counts.traces_post_v2,
        );
        return Err(Exit::PreconditionUnmet);
    }
    Ok(())
}
/// Loads and structurally validates the pre-v2 backup manifest at `path`.
///
/// Checks, in order: the file exists and parses as JSON, no reserved cutover
/// approval fields are present, the typed manifest decodes, `kind` and
/// `schema_version` match the pre-v2 backup contract, required string fields
/// are non-empty, referenced artifacts are safe relative paths that exist next
/// to the manifest, `backup_timestamp` is RFC3339, and `table_row_counts` is
/// present (the post-migrate drift baseline).
///
/// # Errors
/// `Exit::SchemaMismatch` for a schema_version mismatch; every other failure
/// returns `Exit::PreconditionUnmet`. Each failure path prints a
/// "no state was changed" diagnostic first.
fn validate_backup_manifest(path: &std::path::Path) -> Result<ValidBackupManifest, Exit> {
    if !path.is_file() {
        eprintln!(
            "cortex migrate v2: backup manifest `{}` was not found; no state was changed.",
            path.display()
        );
        return Err(Exit::PreconditionUnmet);
    }
    let raw = match std::fs::read_to_string(path) {
        Ok(raw) => raw,
        Err(err) => {
            eprintln!(
                "cortex migrate v2: backup manifest `{}` could not be read: {err}. no state was changed.",
                path.display()
            );
            return Err(Exit::PreconditionUnmet);
        }
    };
    let manifest_value: serde_json::Value = match serde_json::from_str(&raw) {
        Ok(manifest_value) => manifest_value,
        Err(err) => {
            eprintln!(
                "cortex migrate v2: backup manifest `{}` is not a valid backup manifest: {err}. no state was changed.",
                path.display()
            );
            return Err(Exit::PreconditionUnmet);
        }
    };
    // Reserved-field rejection runs on the raw JSON value before the typed
    // decode, so approval fields are caught even if the typed struct would
    // silently ignore them.
    reject_reserved_cutover_approval_fields(path, &manifest_value)?;
    let manifest: BackupManifest = match serde_json::from_value(manifest_value) {
        Ok(manifest) => manifest,
        Err(err) => {
            eprintln!(
                "cortex migrate v2: backup manifest `{}` is not a valid backup manifest: {err}. no state was changed.",
                path.display()
            );
            return Err(Exit::PreconditionUnmet);
        }
    };
    if manifest.kind != "cortex_pre_v2_backup" {
        eprintln!(
            "cortex migrate v2: backup manifest `{}` has invalid kind `{}`; expected cortex_pre_v2_backup. no state was changed.",
            path.display(),
            manifest.kind
        );
        return Err(Exit::PreconditionUnmet);
    }
    if manifest.schema_version != PRE_V2_BACKUP_SCHEMA_VERSION {
        eprintln!(
            "cortex migrate v2: backup manifest `{}` has schema_version {}; expected {}. no state was changed.",
            path.display(),
            manifest.schema_version,
            PRE_V2_BACKUP_SCHEMA_VERSION
        );
        // Version drift is a schema problem, not a generic precondition miss.
        return Err(Exit::SchemaMismatch);
    }
    // Required string fields must be non-empty.
    for (field, value) in [
        ("sqlite_store", manifest.sqlite_store.as_str()),
        ("jsonl_mirror", manifest.jsonl_mirror.as_str()),
        ("tool_version", manifest.tool_version.as_str()),
    ] {
        if value.trim().is_empty() {
            eprintln!(
                "cortex migrate v2: backup manifest `{}` has empty `{field}`. no state was changed.",
                path.display()
            );
            return Err(Exit::PreconditionUnmet);
        }
    }
    // Artifact references must be safe relative paths and must resolve to
    // existing files next to the manifest.
    for (field, value) in [
        ("sqlite_store", manifest.sqlite_store.as_str()),
        ("jsonl_mirror", manifest.jsonl_mirror.as_str()),
    ] {
        if let Err(reason) = validate_manifest_artifact_ref(value) {
            eprintln!(
                "cortex migrate v2: backup manifest `{}` has invalid `{field}` artifact reference `{value}`: {reason}. no state was changed.",
                path.display()
            );
            return Err(Exit::PreconditionUnmet);
        }
        let artifact_path = manifest_artifact_path(path, value);
        if !artifact_path.is_file() {
            eprintln!(
                "cortex migrate v2: backup manifest `{}` references missing `{field}` artifact `{}`. no state was changed.",
                path.display(),
                artifact_path.display()
            );
            return Err(Exit::PreconditionUnmet);
        }
    }
    if chrono::DateTime::parse_from_rfc3339(&manifest.backup_timestamp).is_err() {
        eprintln!(
            "cortex migrate v2: backup manifest `{}` has invalid `backup_timestamp`; expected RFC3339. no state was changed.",
            path.display()
        );
        return Err(Exit::PreconditionUnmet);
    }
    let Some(table_row_counts) = manifest.table_row_counts else {
        eprintln!(
            "cortex migrate v2: backup manifest `{}` is missing required `table_row_counts` field. Regenerate the backup with this binary's `cortex backup --output` so the post-migrate count-mismatch refusal helper has a pre-migrate baseline. no state was changed.",
            path.display()
        );
        return Err(Exit::PreconditionUnmet);
    };
    Ok(ValidBackupManifest {
        path: path.to_path_buf(),
        table_row_counts,
    })
}
/// Rejects manifests that carry any reserved cutover-approval field; cutover
/// approval must come from an operator attestation, never a backup manifest.
fn reject_reserved_cutover_approval_fields(
    path: &std::path::Path,
    manifest: &serde_json::Value,
) -> Result<(), Exit> {
    // Non-object manifests are handled by the typed decode elsewhere.
    let object = match manifest.as_object() {
        Some(object) => object,
        None => return Ok(()),
    };
    if let Some(field) = RESERVED_BACKUP_MANIFEST_CUTOVER_APPROVAL_FIELDS
        .iter()
        .find(|field| object.contains_key(**field))
    {
        eprintln!(
            "cortex migrate v2: backup manifest `{}` contains reserved cutover approval field `{field}`; backup manifests cannot approve schema cutover. no state was changed.",
            path.display()
        );
        return Err(Exit::PreconditionUnmet);
    }
    Ok(())
}
/// Prints the cutover readiness key=value lines to stdout, unprefixed.
fn emit_cutover_readiness_stdout() {
    cutover_readiness_lines()
        .into_iter()
        .for_each(|line| println!("{line}"));
}
/// Prints the same readiness lines as `emit_cutover_readiness_stdout`, but on
/// stderr with the `cortex migrate v2: ` prefix.
fn emit_cutover_readiness_stderr() {
    cutover_readiness_lines()
        .iter()
        .for_each(|line| eprintln!("cortex migrate v2: {line}"));
}
/// Builds the key=value readiness report lines, in stable order.
fn cutover_readiness_lines() -> Vec<String> {
    let mut lines: Vec<String> = [
        "default_v2_persistence_ready=true",
        "default_v2_write_enabled=true",
        "default_v2_cutover_ready=true",
        "cutover_readiness=ready",
    ]
    .iter()
    .map(|line| (*line).to_string())
    .collect();
    lines.push(format!(
        "cutover_readiness_missing_gates={}",
        MISSING_ATOMIC_CUTOVER_PREREQUISITES.len()
    ));
    lines.push("unattended_migrate_supported=requires_operator_attestation".to_string());
    lines
}
/// Checks that a manifest artifact reference is a safe relative path: neither
/// absolute nor containing `..` components (which could escape the manifest
/// directory when joined by `manifest_artifact_path`).
fn validate_manifest_artifact_ref(artifact: &str) -> Result<(), &'static str> {
    let candidate = std::path::Path::new(artifact);
    if candidate.is_absolute() {
        return Err("absolute paths are not accepted");
    }
    let traverses_parent = candidate
        .components()
        .any(|component| component == std::path::Component::ParentDir);
    if traverses_parent {
        return Err("parent-directory traversal is not accepted");
    }
    Ok(())
}
/// Resolves an artifact reference relative to the manifest's directory,
/// falling back to `.` when the manifest path has no parent.
fn manifest_artifact_path(manifest_path: &std::path::Path, artifact: &str) -> PathBuf {
    let base = match manifest_path.parent() {
        Some(parent) => parent,
        None => std::path::Path::new("."),
    };
    base.join(artifact)
}
/// Facts gathered by `boundary_preflight` before the v1 -> v2 boundary row is
/// appended; an operator attestation envelope must match all three fields.
struct BoundaryPreflight {
    /// Agreed JSONL/SQLite event-chain head prior to cutover.
    previous_v1_head_hash: String,
    /// BLAKE3 digest of the migration SQL bundle (`migration_bundle_digest`).
    migration_script_digest: String,
    /// Hash binding the dry-run fixture verification to the head hash.
    fixture_verification_result_hash: String,
}
/// Collects and cross-checks the facts required before appending the v1 -> v2
/// boundary row: refuses when a boundary already exists or its audit fails,
/// requires the JSONL head and the SQLite plan head to agree, and derives the
/// migration bundle digest plus the fixture verification hash.
///
/// # Errors
/// `Exit::SchemaMismatch` when an existing boundary's audit fails; every other
/// refusal returns `Exit::PreconditionUnmet`. All failure paths print first.
fn boundary_preflight(layout: &DataLayout, plan: &V2DryRunPlan) -> Result<BoundaryPreflight, Exit> {
    // Arm order matters: audit failures are reported before the softer
    // "boundary already exists" refusal.
    match verify_schema_migration_v1_to_v2_boundary(&layout.event_log_path, false) {
        Ok(report) if !report.failures.is_empty() => {
            for failure in &report.failures {
                eprintln!(
                    "cortex migrate v2: {}: {:?}",
                    failure.invariant, failure.detail
                );
            }
            eprintln!(
                "cortex migrate v2: existing schema boundary audit failed; no state was changed."
            );
            return Err(Exit::SchemaMismatch);
        }
        Ok(report) if !report.boundary_rows.is_empty() => {
            eprintln!(
                "cortex migrate v2: schema_migration.v1_to_v2 boundary already exists; no state was changed."
            );
            return Err(Exit::PreconditionUnmet);
        }
        Ok(_) => {}
        Err(err) => {
            eprintln!(
                "cortex migrate v2: failed to inspect existing schema boundary events: {err}. no state was changed."
            );
            return Err(Exit::PreconditionUnmet);
        }
    }
    let jsonl_head = match JsonlLog::open(&layout.event_log_path) {
        Ok(log) => log.head().map(str::to_owned),
        Err(err) => {
            eprintln!(
                "cortex migrate v2: failed to inspect JSONL event head: {err}. no state was changed."
            );
            return Err(Exit::PreconditionUnmet);
        }
    };
    // The JSONL head and the dry-run plan's SQLite head must agree; a JSONL
    // head alone is accepted, a SQLite head alone (or no head at all) is not.
    let previous_v1_head_hash = match (jsonl_head, plan.fixture.event_chain_head.clone()) {
        (Some(jsonl), Some(sqlite)) if jsonl != sqlite => {
            eprintln!(
                "cortex migrate v2: boundary preflight found mismatched event heads: jsonl={jsonl}, sqlite={sqlite}. no state was changed."
            );
            return Err(Exit::PreconditionUnmet);
        }
        (Some(jsonl), _) => jsonl,
        (None, Some(sqlite)) => {
            eprintln!(
                "cortex migrate v2: boundary preflight found SQLite head {sqlite} but no JSONL event head. no state was changed."
            );
            return Err(Exit::PreconditionUnmet);
        }
        (None, None) => {
            eprintln!(
                "cortex migrate v2: boundary preflight requires a current v1 event_chain_head. no state was changed."
            );
            return Err(Exit::PreconditionUnmet);
        }
    };
    let migration_script_digest = migration_bundle_digest();
    let fixture_verification_result_hash =
        store_fixture_verification_result_hash(plan, &previous_v1_head_hash);
    Ok(BoundaryPreflight {
        previous_v1_head_hash,
        migration_script_digest,
        fixture_verification_result_hash,
    })
}
/// BLAKE3 digest pinning the exact migration SQL bundle and tooling label;
/// operator attestations bind to this value via `migration_script_digest`.
fn migration_bundle_digest() -> String {
    let mut hash_input = String::new();
    // Concatenation order is part of the digest contract — do not reorder.
    for part in [
        "cortex-schema-v2-migration-bundle\n",
        "schema_v2_expand_sql\n",
        SCHEMA_V2_EXPAND_SQL,
        "\ncli_tooling=cortex migrate v2 --dry-run preflight\n",
    ] {
        hash_input.push_str(part);
    }
    format!("blake3:{}", blake3::hash(hash_input.as_bytes()).to_hex())
}
/// Maps a migration stage status to its lowercase display label.
fn stage_status_label(status: V2MigrationStageStatus) -> &'static str {
    match status {
        V2MigrationStageStatus::Blocked => "blocked",
        V2MigrationStageStatus::Pending => "pending",
        V2MigrationStageStatus::Ready => "ready",
    }
}
/// The only operator-attestation envelope schema version this binary accepts.
const OPERATOR_ATTESTATION_ENVELOPE_SCHEMA_VERSION: u16 = 1;
/// Required `purpose` string binding an envelope to the v1 -> v2 migration.
const OPERATOR_ATTESTATION_ENVELOPE_PURPOSE: &str = "cortex.schema_migration.v1_to_v2";
/// JSON envelope carrying an operator's Ed25519-signed approval of the
/// v1 -> v2 schema migration boundary.
#[derive(Debug, Clone, Deserialize)]
struct OperatorAttestationEnvelope {
    // Must equal OPERATOR_ATTESTATION_ENVELOPE_SCHEMA_VERSION.
    schema_version: u16,
    // Must equal OPERATOR_ATTESTATION_ENVELOPE_PURPOSE.
    purpose: String,
    // Lowercase hex encoding of the 32-byte Ed25519 verifying key.
    operator_verifying_key_hex: String,
    // Operator key identifier; echoed into diagnostics and policy reasons.
    operator_key_id: String,
    // Signing timestamp; also the instant used for temporal-authority
    // revalidation in `build_migration_policy_decision`.
    signed_at: DateTime<Utc>,
    // Boundary facts the signature covers; must match the live preflight.
    boundary: OperatorAttestationBoundary,
    // Lowercase hex encoding of the 64-byte Ed25519 signature.
    signature_hex: String,
}
/// The boundary facts an operator attests to; each must equal the
/// corresponding `BoundaryPreflight` field observed at cutover time.
#[derive(Debug, Clone, Deserialize)]
struct OperatorAttestationBoundary {
    // Event-chain head the migration departs from.
    previous_v1_head_hash: String,
    // Digest of the migration SQL bundle (see `migration_bundle_digest`).
    migration_script_digest: String,
    // Hash of the dry-run fixture verification result.
    fixture_verification_result_hash: String,
}
/// Everything that can go wrong while loading and verifying an operator
/// attestation envelope; the `Display` impl below renders the operator-facing
/// diagnostic text for each case.
#[derive(Debug)]
enum OperatorAttestationError {
    /// The attestation path does not point at a regular file.
    NotFound(PathBuf),
    /// The file exists but could not be read.
    Read {
        path: PathBuf,
        source: std::io::Error,
    },
    /// The file is not valid JSON for the envelope shape.
    Decode {
        path: PathBuf,
        source: serde_json::Error,
    },
    /// `schema_version` differs from the supported envelope version.
    UnknownSchema {
        path: PathBuf,
        found: u16,
        expected: u16,
    },
    /// `purpose` does not bind the envelope to the v1 -> v2 migration.
    WrongPurpose {
        path: PathBuf,
        found: String,
        expected: &'static str,
    },
    /// `operator_verifying_key_hex` is not 32 lowercase-hex-encoded bytes.
    MalformedVerifyingKey {
        path: PathBuf,
        detail: String,
    },
    /// `signature_hex` is not 64 lowercase-hex-encoded bytes.
    MalformedSignature {
        path: PathBuf,
        detail: String,
    },
    /// An envelope boundary field disagrees with the live preflight value.
    BoundaryMismatch {
        path: PathBuf,
        field: &'static str,
        envelope: String,
        observed: String,
    },
    /// The Ed25519 signature failed to verify under the declared key.
    SignatureRejected {
        path: PathBuf,
    },
}
// Human-readable rendering; `build_migration_policy_decision` prints these
// verbatim on stderr, and every arm names the offending envelope path.
impl std::fmt::Display for OperatorAttestationError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::NotFound(path) => write!(
                f,
                "operator attestation `{}` was not found",
                path.display()
            ),
            Self::Read { path, source } => write!(
                f,
                "operator attestation `{}` could not be read: {source}",
                path.display()
            ),
            Self::Decode { path, source } => write!(
                f,
                "operator attestation `{}` is not a valid JSON envelope: {source}",
                path.display()
            ),
            Self::UnknownSchema {
                path,
                found,
                expected,
            } => write!(
                f,
                "operator attestation `{}` declares schema_version {found}; expected {expected}",
                path.display()
            ),
            Self::WrongPurpose {
                path,
                found,
                expected,
            } => write!(
                f,
                "operator attestation `{}` has purpose `{found}`; expected `{expected}`",
                path.display()
            ),
            Self::MalformedVerifyingKey { path, detail } => write!(
                f,
                "operator attestation `{}` has a malformed operator_verifying_key_hex field: {detail}",
                path.display()
            ),
            Self::MalformedSignature { path, detail } => write!(
                f,
                "operator attestation `{}` has a malformed signature_hex field: {detail}",
                path.display()
            ),
            Self::BoundaryMismatch {
                path,
                field,
                envelope,
                observed,
            } => write!(
                f,
                "operator attestation `{}` boundary field `{field}` mismatch: envelope=`{envelope}`, observed=`{observed}`",
                path.display()
            ),
            Self::SignatureRejected { path } => write!(
                f,
                "operator attestation `{}` Ed25519 signature did not verify under the declared operator key",
                path.display()
            ),
        }
    }
}
/// Decodes a lowercase hexadecimal string into raw bytes.
///
/// Uppercase digits are rejected on purpose (`hex_nibble` only accepts
/// `0-9a-f`): attestation envelopes are required to carry lowercase hex.
///
/// # Errors
/// Returns a human-readable message when the input has an odd length or
/// contains a non-lowercase-hex byte; the message names the byte offset.
fn decode_lowercase_hex(input: &str) -> Result<Vec<u8>, String> {
    if input.len() % 2 != 0 {
        return Err(format!("odd hex length {}", input.len()));
    }
    let bytes = input.as_bytes();
    let mut out = Vec::with_capacity(bytes.len() / 2);
    // Length is even, so chunks_exact(2) consumes every byte (no remainder).
    for (pair_index, pair) in bytes.chunks_exact(2).enumerate() {
        let offset = pair_index * 2;
        let hi = hex_nibble(pair[0])
            .ok_or_else(|| format!("invalid hex byte `{}` at offset {offset}", pair[0] as char))?;
        let lo = hex_nibble(pair[1]).ok_or_else(|| {
            format!(
                "invalid hex byte `{}` at offset {}",
                pair[1] as char,
                offset + 1
            )
        })?;
        out.push((hi << 4) | lo);
    }
    Ok(out)
}
/// Maps one ASCII byte to its hex nibble value; `None` for anything outside
/// `0-9a-f` (uppercase is intentionally not accepted).
fn hex_nibble(byte: u8) -> Option<u8> {
    match byte {
        b'0'..=b'9' => Some(byte - b'0'),
        b'a'..=b'f' => Some(byte - b'a' + 10),
        _ => None,
    }
}
/// Rebuilds the canonical byte string an operator signed for this envelope.
///
/// Delegates to `OperatorAttestationEnvelopeForSigning` (defined elsewhere in
/// this file) so signing and verification share one canonical layout.
/// Note that `operator_verifying_key_hex` and `signature_hex` are not part of
/// the signed payload — only the fields projected below are covered.
/// NOTE(review): the exact framing lives in `signing_input()`; presumably it
/// uses the length-prefixed encoding from `push_lp` — confirm there.
fn operator_attestation_signing_input(env: &OperatorAttestationEnvelope) -> Vec<u8> {
    // Render the timestamp first so the borrowed signing view can reference it.
    let signed_at_rfc3339 = env.signed_at.to_rfc3339();
    OperatorAttestationEnvelopeForSigning {
        schema_version: env.schema_version,
        purpose: env.purpose.as_str(),
        operator_key_id: env.operator_key_id.as_str(),
        signed_at_rfc3339: &signed_at_rfc3339,
        previous_v1_head_hash: env.boundary.previous_v1_head_hash.as_str(),
        migration_script_digest: env.boundary.migration_script_digest.as_str(),
        fixture_verification_result_hash: env.boundary.fixture_verification_result_hash.as_str(),
    }
    .signing_input()
}
/// Appends `bytes` to `out` as a length-prefixed record: an 8-byte big-endian
/// `u64` length followed by the raw bytes.
fn push_lp(out: &mut Vec<u8>, bytes: &[u8]) {
    let len_be = (bytes.len() as u64).to_be_bytes();
    out.extend(len_be);
    out.extend_from_slice(bytes);
}
/// Loads the operator attestation envelope at `path` and verifies it against
/// the live boundary preflight.
///
/// Validation order: file presence and JSON decode, `schema_version`,
/// `purpose`, the three boundary bindings (head hash, script digest, fixture
/// hash), key/signature hex shape, and finally the Ed25519 signature over the
/// canonical signing input. Cheap structural checks run before the
/// cryptographic one, so boundary mismatches are reported without attempting
/// signature verification.
fn load_and_verify_operator_attestation(
    path: &Path,
    boundary: &BoundaryPreflight,
) -> Result<OperatorAttestationEnvelope, OperatorAttestationError> {
    if !path.is_file() {
        return Err(OperatorAttestationError::NotFound(path.to_path_buf()));
    }
    let raw = std::fs::read_to_string(path).map_err(|source| OperatorAttestationError::Read {
        path: path.to_path_buf(),
        source,
    })?;
    let envelope: OperatorAttestationEnvelope =
        serde_json::from_str(&raw).map_err(|source| OperatorAttestationError::Decode {
            path: path.to_path_buf(),
            source,
        })?;
    if envelope.schema_version != OPERATOR_ATTESTATION_ENVELOPE_SCHEMA_VERSION {
        return Err(OperatorAttestationError::UnknownSchema {
            path: path.to_path_buf(),
            found: envelope.schema_version,
            expected: OPERATOR_ATTESTATION_ENVELOPE_SCHEMA_VERSION,
        });
    }
    if envelope.purpose != OPERATOR_ATTESTATION_ENVELOPE_PURPOSE {
        return Err(OperatorAttestationError::WrongPurpose {
            path: path.to_path_buf(),
            found: envelope.purpose.clone(),
            expected: OPERATOR_ATTESTATION_ENVELOPE_PURPOSE,
        });
    }
    // Boundary bindings: the envelope must attest to exactly the migration we
    // are about to run, not some earlier store state.
    if envelope.boundary.previous_v1_head_hash != boundary.previous_v1_head_hash {
        return Err(OperatorAttestationError::BoundaryMismatch {
            path: path.to_path_buf(),
            field: "previous_v1_head_hash",
            envelope: envelope.boundary.previous_v1_head_hash.clone(),
            observed: boundary.previous_v1_head_hash.clone(),
        });
    }
    if envelope.boundary.migration_script_digest != boundary.migration_script_digest {
        return Err(OperatorAttestationError::BoundaryMismatch {
            path: path.to_path_buf(),
            field: "migration_script_digest",
            envelope: envelope.boundary.migration_script_digest.clone(),
            observed: boundary.migration_script_digest.clone(),
        });
    }
    if envelope.boundary.fixture_verification_result_hash
        != boundary.fixture_verification_result_hash
    {
        return Err(OperatorAttestationError::BoundaryMismatch {
            path: path.to_path_buf(),
            field: "fixture_verification_result_hash",
            envelope: envelope.boundary.fixture_verification_result_hash.clone(),
            observed: boundary.fixture_verification_result_hash.clone(),
        });
    }
    // Key material: lowercase hex, exactly 32 bytes, and a valid curve point.
    let verifying_key_bytes =
        decode_lowercase_hex(&envelope.operator_verifying_key_hex).map_err(|detail| {
            OperatorAttestationError::MalformedVerifyingKey {
                path: path.to_path_buf(),
                detail,
            }
        })?;
    if verifying_key_bytes.len() != 32 {
        return Err(OperatorAttestationError::MalformedVerifyingKey {
            path: path.to_path_buf(),
            detail: format!(
                "expected 32 verifying key bytes, got {}",
                verifying_key_bytes.len()
            ),
        });
    }
    let mut key_array = [0u8; 32];
    key_array.copy_from_slice(&verifying_key_bytes);
    let verifying_key = VerifyingKey::from_bytes(&key_array).map_err(|err| {
        OperatorAttestationError::MalformedVerifyingKey {
            path: path.to_path_buf(),
            detail: err.to_string(),
        }
    })?;
    // Signature: lowercase hex, exactly 64 bytes.
    let signature_bytes = decode_lowercase_hex(&envelope.signature_hex).map_err(|detail| {
        OperatorAttestationError::MalformedSignature {
            path: path.to_path_buf(),
            detail,
        }
    })?;
    if signature_bytes.len() != 64 {
        return Err(OperatorAttestationError::MalformedSignature {
            path: path.to_path_buf(),
            detail: format!("expected 64 signature bytes, got {}", signature_bytes.len()),
        });
    }
    let mut sig_array = [0u8; 64];
    sig_array.copy_from_slice(&signature_bytes);
    let signature = Signature::from_bytes(&sig_array);
    // Finally, verify the signature over the canonical signing input.
    let signing_input = operator_attestation_signing_input(&envelope);
    verifying_key
        .verify(&signing_input, &signature)
        .map_err(|_| OperatorAttestationError::SignatureRejected {
            path: path.to_path_buf(),
        })?;
    Ok(envelope)
}
/// Builds the ADR 0026 policy decision that authorises the v1 -> v2 cutover.
///
/// Requires `--operator-attestation`: the envelope is verified against the
/// boundary preflight, the operator key's temporal authority is revalidated at
/// the envelope's `signed_at`, and three policy contributions (authority
/// class, attestation-required, temporal authority) are composed. The cutover
/// is refused unless the composed outcome is Allow, Warn, or BreakGlass. On
/// success the verified attestation facts are echoed to stderr.
///
/// # Errors
/// Always `Exit::PreconditionUnmet`, with a printed explanation first.
fn build_migration_policy_decision(
    pool: &Pool,
    attestation_path: Option<&Path>,
    boundary: &BoundaryPreflight,
) -> Result<PolicyDecision, Exit> {
    let Some(path) = attestation_path else {
        eprintln!(
            "cortex migrate v2: cutover requires --operator-attestation <PATH> with a valid Ed25519-signed operator attestation envelope authorising the v1 -> v2 boundary (ADR 0010 §1-§2, ADR 0026 §4). no state was changed."
        );
        return Err(Exit::PreconditionUnmet);
    };
    let envelope = match load_and_verify_operator_attestation(path, boundary) {
        Ok(envelope) => envelope,
        Err(err) => {
            eprintln!("cortex migrate v2: {err}. no state was changed.");
            return Err(Exit::PreconditionUnmet);
        }
    };
    // Reason strings carried on the Allow contributions for the audit trail.
    let attestation_reason = format!(
        "operator attestation envelope `{}` verified under key {} signed at {}",
        path.display(),
        envelope.operator_key_id,
        envelope.signed_at.to_rfc3339(),
    );
    let authority_reason = format!(
        "operator attestation envelope authorises ADR 0019 §3 operator authority class for key {}",
        envelope.operator_key_id
    );
    // The signing key must still hold operator authority at `signed_at`.
    let temporal_authority = match revalidate_operator_temporal_authority(
        pool,
        SCHEMA_MIGRATION_CURRENT_USE_TEMPORAL_AUTHORITY_RULE_ID,
        &envelope.operator_key_id,
        envelope.signed_at,
        TrustTier::Operator,
    ) {
        Ok(contribution) => contribution,
        Err(err) => {
            let invariant = revalidation_failed_invariant("migrate.v2");
            eprintln!(
                "cortex migrate v2: {invariant}: failed to read authority timeline for key {}: {err}. no state was changed.",
                envelope.operator_key_id,
            );
            return Err(Exit::PreconditionUnmet);
        }
    };
    let contributions = vec![
        PolicyContribution::new(
            SCHEMA_MIGRATION_AUTHORITY_CLASS_RULE_ID,
            PolicyOutcome::Allow,
            authority_reason,
        )
        .expect("static contribution shape is valid"),
        PolicyContribution::new(
            SCHEMA_MIGRATION_ATTESTATION_REQUIRED_RULE_ID,
            PolicyOutcome::Allow,
            attestation_reason,
        )
        .expect("static contribution shape is valid"),
        temporal_authority.contribution(),
    ];
    let decision = compose_policy_outcomes(contributions, None);
    if !matches!(
        decision.final_outcome,
        PolicyOutcome::Allow | PolicyOutcome::Warn | PolicyOutcome::BreakGlass
    ) {
        let invariant = revalidation_failed_invariant("migrate.v2");
        // When the refusal stems from temporal authority, surface the
        // per-reason detail before the generic composition message.
        if !temporal_authority.report.valid_now {
            eprintln!(
                "cortex migrate v2: {invariant}: operator temporal authority current use blocked for key {} (reasons: {}). no state was changed.",
                temporal_authority.report.key_id,
                temporal_authority
                    .report
                    .reasons
                    .iter()
                    .map(|reason| reason.wire_str())
                    .collect::<Vec<_>>()
                    .join(","),
            );
        }
        eprintln!(
            "cortex migrate v2: ADR 0026 composition for the v1 -> v2 boundary refused with outcome {:?}. no state was changed.",
            decision.final_outcome
        );
        return Err(Exit::PreconditionUnmet);
    }
    // Success: echo the verified attestation facts for the audit trail.
    eprintln!(
        "cortex migrate v2: operator_attestation_path={}",
        path.display()
    );
    eprintln!(
        "cortex migrate v2: operator_attestation_key_id={}",
        envelope.operator_key_id
    );
    eprintln!(
        "cortex migrate v2: operator_attestation_signed_at={}",
        envelope.signed_at.to_rfc3339()
    );
    eprintln!("cortex migrate v2: operator_attestation_verified=true");
    eprintln!(
        "cortex migrate v2: operator_temporal_authority_revalidated=true key_id={} valid_now={}",
        temporal_authority.report.key_id, temporal_authority.report.valid_now,
    );
    Ok(decision)
}