use std::path::{Path, PathBuf};
use std::process::Command;
use chrono::{TimeZone, Utc};
use cortex_core::{
Event, EventId, EventSource, EventType, KeyLifecycleState, SchemaMigrationV1ToV2Payload,
TrustTier,
};
use cortex_ledger::{
append_policy_decision_test_allow, audit::verify_schema_migration_v1_to_v2_boundary,
schema_migration_v1_to_v2_policy_decision_test_allow, JsonlLog,
};
use cortex_store::migrate::apply_pending;
use cortex_store::migrate_v2::{dry_run_plan, fixture_verification_result_hash};
use cortex_store::repo::{AuthorityRepo, KeyTimelineRecord, PrincipalTimelineRecord};
use ed25519_dalek::{Signer, SigningKey};
use rusqlite::Connection;
fn cortex_bin() -> PathBuf {
PathBuf::from(env!("CARGO_BIN_EXE_cortex"))
}
fn fixtures_dir() -> PathBuf {
    // `<crate root>/tests/fixtures`, resolved from the crate manifest dir.
    let mut dir = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
    dir.push("tests");
    dir.push("fixtures");
    dir
}
fn run_in(cwd: &Path, args: &[&str]) -> std::process::Output {
    // Run the cortex binary with every state/profile directory redirected
    // under `cwd`, so tests never touch the real user environment.
    let xdg_home = cwd.join("xdg");
    let mut cmd = Command::new(cortex_bin());
    cmd.current_dir(cwd)
        .env("CORTEX_DATA_DIR", xdg_home.join("cortex"))
        .env("XDG_DATA_HOME", &xdg_home)
        .env("HOME", cwd)
        .env("APPDATA", cwd.join("appdata"))
        .env("LOCALAPPDATA", cwd.join("localappdata"))
        .args(args);
    cmd.output().expect("spawn cortex")
}
fn assert_exit(out: &std::process::Output, expected: i32) {
    // A signal-terminated process has no exit code; that is itself a failure.
    let code = out.status.code().expect("process exited via signal");
    let stdout = String::from_utf8_lossy(&out.stdout);
    let stderr = String::from_utf8_lossy(&out.stderr);
    assert_eq!(
        code, expected,
        "expected exit {expected}, got {code}\nstdout: {stdout}\nstderr: {stderr}",
    );
}
fn assert_cutover_readiness_ready(output: &str) {
    // Every readiness gate the CLI reports must be present and affirmative.
    let required = [
        "default_v2_persistence_ready=true",
        "default_v2_write_enabled=true",
        "default_v2_cutover_ready=true",
        "cutover_readiness=ready",
        "cutover_readiness_missing_gates=0",
        "unattended_migrate_supported=requires_operator_attestation",
    ];
    for field in required {
        assert!(output.contains(field), "output missing {field}: {output}");
    }
}
fn has_column(pool: &Connection, table: &str, column: &str) -> bool {
    // PRAGMA table_info yields one row per column; column 1 is the name.
    let mut stmt = pool
        .prepare(&format!("PRAGMA table_info({table});"))
        .expect("prepare table_info");
    let mut names = stmt
        .query_map([], |row| row.get::<_, String>(1))
        .expect("query table_info");
    names.any(|name| name.expect("column row") == column)
}
fn has_table(pool: &Connection, table: &str) -> bool {
    // sqlite_master lists all tables; EXISTS collapses the probe to a bool.
    let probe = "SELECT EXISTS (
        SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = ?1
    );";
    pool.query_row(probe, [table], |row| row.get::<_, bool>(0))
        .expect("query table existence")
}
fn table_count(pool: &Connection, table: &str) -> u64 {
    // `table` is test-controlled, so interpolation into the SQL is safe here.
    pool.query_row(&format!("SELECT COUNT(*) FROM {table};"), [], |row| {
        row.get(0)
    })
    .expect("query table count")
}
fn stdout_value<'a>(stdout: &'a str, key: &str) -> &'a str {
    // Return the value of the first `key=value` line; panic if absent.
    let wanted = format!("{key}=");
    for line in stdout.lines() {
        if let Some(value) = line.strip_prefix(&wanted) {
            return value;
        }
    }
    panic!("stdout missing {key}: {stdout}");
}
/// Run `cortex init` in `tmp` and return the (db, event_log) paths it prints.
///
/// The CLI emits lines of the form `cortex init: <key>=<path> (<status>)`.
/// The previous version duplicated the line-splitting logic for each field;
/// both paths now go through one parse helper.
fn init_layout(tmp: &Path) -> (PathBuf, PathBuf) {
    // Extract `<path>` from the `cortex init: <key>=<path> (<status>)` line
    // whose content starts with `prefix`; `key` only labels panic messages.
    fn reported_path(stdout: &str, prefix: &str, key: &str) -> PathBuf {
        let line = stdout
            .lines()
            .find(|line| line.starts_with(prefix))
            .unwrap_or_else(|| panic!("init stdout includes {key} path"));
        let path = line
            .split_once('=')
            .unwrap_or_else(|| panic!("{key} line has equals"))
            .1
            .trim()
            .split_once(" (")
            .unwrap_or_else(|| panic!("{key} line has status suffix"))
            .0;
        PathBuf::from(path)
    }
    let out = run_in(tmp, &["init"]);
    assert_exit(&out, 0);
    let stdout = String::from_utf8_lossy(&out.stdout);
    let db = reported_path(&stdout, "cortex init: db", "db");
    let event_log = reported_path(&stdout, "cortex init: event_log", "event_log");
    (db, event_log)
}
fn init(tmp: &Path) -> PathBuf {
    // Convenience wrapper: callers that only need the sqlite db path.
    let (db, _event_log) = init_layout(tmp);
    db
}
/// Seed the authority tables with an `Operator`-tier principal and an Active
/// signing key named "fixture-operator-key" — the same key id used by
/// `write_valid_operator_attestation` — so migrate-v2 authority checks pass.
fn seed_migrate_operator_authority(db: &Path) {
    let pool = Connection::open(db).expect("open initialized sqlite db");
    apply_pending(&pool).expect("apply migrations");
    let repo = AuthorityRepo::new(&pool);
    // Fixed timestamp keeps the authority timeline deterministic across runs.
    let effective_at = Utc.with_ymd_and_hms(2026, 1, 1, 12, 0, 1).unwrap();
    repo.append_principal_state(
        &PrincipalTimelineRecord {
            principal_id: "operator-principal".into(),
            trust_tier: TrustTier::Operator,
            effective_at,
            trust_review_due_at: None,
            removed_at: None,
            audit_ref: None,
        },
        &cortex_store::repo::authority::principal_state_policy_decision_test_allow(),
    )
    .expect("append operator trust state");
    // Key must be Active at `effective_at` for attestation verification.
    repo.append_key_state(
        &KeyTimelineRecord {
            key_id: "fixture-operator-key".into(),
            principal_id: "operator-principal".into(),
            state: KeyLifecycleState::Active,
            effective_at,
            reason: None,
            audit_ref: None,
        },
        &cortex_store::repo::authority::key_state_policy_decision_test_allow(),
    )
    .expect("append active operator key state");
}
/// Append a Revoked lifecycle state for "fixture-operator-key" at
/// `effective_at`, superseding the Active state written by
/// `seed_migrate_operator_authority` in the key's timeline.
fn revoke_migrate_operator_authority(db: &Path, effective_at: chrono::DateTime<Utc>) {
    let pool = Connection::open(db).expect("open initialized sqlite db");
    apply_pending(&pool).expect("apply migrations");
    AuthorityRepo::new(&pool)
        .append_key_state(
            &KeyTimelineRecord {
                key_id: "fixture-operator-key".into(),
                principal_id: "operator-principal".into(),
                state: KeyLifecycleState::Revoked,
                effective_at,
                reason: Some("test revocation".into()),
                audit_ref: None,
            },
            &cortex_store::repo::authority::key_state_policy_decision_test_allow(),
        )
        .expect("append revoked operator key state");
}
fn ingest_minimal_session(tmp: &Path) {
    // Ingest the smallest session fixture so the event log has v1 rows.
    let session = fixtures_dir().join("session-minimal.json");
    let ingest = run_in(tmp, &["ingest", session.to_str().unwrap()]);
    assert_exit(&ingest, 0);
}
fn write_valid_backup_manifest(path: &Path) {
    // The manifest references sibling artifacts by relative file name, so
    // drop placeholder files next to it before writing the manifest itself.
    let parent = path.parent().expect("backup manifest has parent");
    std::fs::write(parent.join("state.sqlite"), "sqlite backup placeholder")
        .expect("write sqlite backup artifact");
    std::fs::write(parent.join("events.jsonl"), "jsonl backup placeholder")
        .expect("write jsonl backup artifact");
    let manifest = r#"{"kind":"cortex_pre_v2_backup","schema_version":1,"sqlite_store":"state.sqlite","jsonl_mirror":"events.jsonl","tool_version":"cortex-test","backup_timestamp":"2026-05-04T22:00:00Z","table_row_counts":{"events":0,"traces":0,"episodes":0,"memories":0}}"#;
    std::fs::write(path, manifest).expect("write backup manifest");
}
fn generate_backup_manifest(tmp: &Path) -> PathBuf {
    // Have the CLI produce a real backup bundle; return its manifest path.
    let bundle = tmp.join("backup-bundle");
    let backup = run_in(tmp, &["backup", "--output", bundle.to_str().unwrap()]);
    assert_exit(&backup, 0);
    bundle.join("BACKUP_MANIFEST")
}
fn fixture_operator_signing_key() -> SigningKey {
    // Deterministic, test-only key material: every secret byte is 0x07.
    SigningKey::from_bytes(&[7u8; 32])
}
/// Encode `bytes` as lowercase hex (two digits per byte, no separators).
///
/// Uses `write!` into the preallocated buffer instead of allocating a fresh
/// `String` per byte via `format!`.
fn lowercase_hex(bytes: &[u8]) -> String {
    use std::fmt::Write;
    let mut out = String::with_capacity(bytes.len() * 2);
    for b in bytes {
        // Writing into a String is infallible; expect documents the invariant.
        write!(out, "{b:02x}").expect("write to String cannot fail");
    }
    out
}
fn dry_run_boundary_preflight(tmp: &Path) -> (String, String, String) {
    // Run a dry-run migrate and harvest the three boundary digests an
    // operator attestation must cover: (head, script digest, fixture digest).
    let out = run_in(tmp, &["migrate", "v2", "--dry-run"]);
    assert_exit(&out, 0);
    let stdout = String::from_utf8_lossy(&out.stdout);
    (
        stdout_value(&stdout, "boundary_previous_v1_head_hash").to_string(),
        stdout_value(&stdout, "migration_script_digest").to_string(),
        stdout_value(&stdout, "fixture_verification_result_hash").to_string(),
    )
}
fn operator_attestation_test_signing_input(
    schema_version: u16,
    purpose: &str,
    key_id: &str,
    signed_at_rfc3339: &str,
    previous_v1_head_hash: &str,
    migration_script_digest: &str,
    fixture_verification_result_hash: &str,
) -> Vec<u8> {
    // Domain-separation tag, then big-endian schema version, then each field
    // as a big-endian u64 length prefix followed by its raw bytes.
    const DOMAIN_TAG: u8 = 0x20;
    let fields = [
        purpose,
        key_id,
        signed_at_rfc3339,
        previous_v1_head_hash,
        migration_script_digest,
        fixture_verification_result_hash,
    ];
    let mut buf = vec![DOMAIN_TAG];
    buf.extend_from_slice(&schema_version.to_be_bytes());
    for field in fields {
        buf.extend_from_slice(&(field.len() as u64).to_be_bytes());
        buf.extend_from_slice(field.as_bytes());
    }
    buf
}
fn write_valid_operator_attestation(
    path: &Path,
    previous_v1_head_hash: &str,
    migration_script_digest: &str,
    fixture_verification_result_hash: &str,
) {
    // Sign the boundary digests with the fixture operator key and write the
    // JSON attestation envelope the migrate CLI consumes.
    let key = fixture_operator_signing_key();
    let operator_key_id = "fixture-operator-key";
    let purpose = "cortex.schema_migration.v1_to_v2";
    let schema_version: u16 = 1;
    let signed_at = Utc::now().to_rfc3339();
    let message = operator_attestation_test_signing_input(
        schema_version,
        purpose,
        operator_key_id,
        &signed_at,
        previous_v1_head_hash,
        migration_script_digest,
        fixture_verification_result_hash,
    );
    let signature = key.sign(&message);
    let envelope = serde_json::json!({
        "schema_version": schema_version,
        "purpose": purpose,
        "operator_verifying_key_hex": lowercase_hex(key.verifying_key().as_bytes()),
        "operator_key_id": operator_key_id,
        "signed_at": signed_at,
        "boundary": {
            "previous_v1_head_hash": previous_v1_head_hash,
            "migration_script_digest": migration_script_digest,
            "fixture_verification_result_hash": fixture_verification_result_hash,
        },
        "signature_hex": lowercase_hex(&signature.to_bytes()),
    });
    std::fs::write(path, serde_json::to_string_pretty(&envelope).unwrap())
        .expect("write operator attestation envelope");
}
/// Overwrite the payload of the last JSONL event row WITHOUT updating any
/// hashes or signatures, so downstream chain verification must flag it.
fn tamper_last_jsonl_event_payload(path: &Path) {
    let raw = std::fs::read_to_string(path).expect("read event log");
    let mut rows: Vec<serde_json::Value> = raw
        .lines()
        .filter(|line| !line.trim().is_empty())
        .map(|line| serde_json::from_str(line).expect("parse signed row"))
        .collect();
    let last_row = rows.last_mut().expect("event log has rows");
    // NOTE(review): rows appear to come in two wire shapes — the event nested
    // under an `event` key, or the event object itself; mutate whichever form
    // is present. Confirm against the JsonlLog wire format.
    let event = if last_row.get("event").is_some() {
        last_row.get_mut("event").expect("nested event exists")
    } else {
        last_row
    };
    let payload = event
        .get_mut("payload")
        .expect("last event row has payload");
    *payload = serde_json::json!({"tampered_after_boundary": true});
    // Rewrite the whole file: one JSON object per line, trailing newline.
    let mut serialized = String::new();
    for row in rows {
        serialized.push_str(&serde_json::to_string(&row).expect("serialize signed row"));
        serialized.push('\n');
    }
    std::fs::write(path, serialized).expect("rewrite tampered event log");
}
/// Append one well-formed schema_version=2 event after cutover so audits can
/// traverse a mixed v1/v2 chain.
fn append_post_boundary_schema_v2_event(path: &Path) {
    let mut jsonl = JsonlLog::open(path).expect("open event log for post-boundary v2 append");
    let row = Event {
        id: EventId::new(),
        schema_version: 2,
        observed_at: "2026-05-04T23:00:00Z".parse().unwrap(),
        recorded_at: "2026-05-04T23:00:01Z".parse().unwrap(),
        source: EventSource::Tool {
            name: "schema-v2-fixture".into(),
        },
        event_type: EventType::ToolResult,
        trace_id: None,
        session_id: Some("s2-post-boundary".into()),
        domain_tags: vec!["schema".into(), "s2".into()],
        payload: serde_json::json!({
            "post_boundary_schema_v2": true,
            "fixture": "cli-audit-traversal"
        }),
        // Hash fields left empty — presumably filled in by the log on append;
        // TODO confirm against JsonlLog::append.
        payload_hash: String::new(),
        prev_event_hash: None,
        event_hash: String::new(),
    };
    jsonl
        .append(row, &append_policy_decision_test_allow())
        .expect("append post-boundary schema v2 row");
}
/// Shared scenario: a bad backup manifest must abort `migrate v2` with the
/// given exit code and stderr text while leaving the store untouched.
fn assert_backup_manifest_rejected_without_mutation(
    manifest_json: &str,
    expected_exit: i32,
    expected_stderr: &[&str],
) {
    let tmp = tempfile::tempdir().unwrap();
    let db_path = init(tmp.path());
    ingest_minimal_session(tmp.path());
    let conn = Connection::open(&db_path).expect("open initialized db");
    apply_pending(&conn).expect("apply migrations");
    // Snapshot the row count so we can prove nothing was written.
    let events_before = table_count(&conn, "events");
    let manifest = tmp.path().join("backup-manifest.json");
    std::fs::write(&manifest, manifest_json).expect("write backup manifest");
    let result = run_in(
        tmp.path(),
        &[
            "migrate",
            "v2",
            "--backup-manifest",
            manifest.to_str().unwrap(),
        ],
    );
    assert_exit(&result, expected_exit);
    let stderr = String::from_utf8_lossy(&result.stderr);
    for expected in expected_stderr {
        assert!(stderr.contains(expected), "stderr: {stderr}");
    }
    // Refusals must be explicitly non-mutating.
    assert!(stderr.contains("no state was changed"), "stderr: {stderr}");
    let conn = Connection::open(&db_path).expect("reopen db");
    assert_eq!(table_count(&conn, "events"), events_before);
    assert!(has_column(&conn, "events", "source_attestation_json"));
}
// Without the boundary flag, `audit verify` passes on a plain v1 store.
#[test]
fn audit_verify_default_does_not_require_v1_to_v2_boundary() {
    let tmp = tempfile::tempdir().unwrap();
    init_layout(tmp.path());
    ingest_minimal_session(tmp.path());
    let verify = run_in(tmp.path(), &["audit", "verify"]);
    assert_exit(&verify, 0);
    let stdout = String::from_utf8_lossy(&verify.stdout);
    assert!(stdout.contains("audit verify:"), "stdout: {stdout}");
}
// With --require-v1-to-v2-boundary and no boundary row, verify must fail
// (exit 4) with the missing-boundary diagnostic.
#[test]
fn audit_verify_requires_v1_to_v2_boundary_when_flagged() {
    let tmp = tempfile::tempdir().unwrap();
    init_layout(tmp.path());
    ingest_minimal_session(tmp.path());
    let verify = run_in(
        tmp.path(),
        &["audit", "verify", "--require-v1-to-v2-boundary"],
    );
    assert_exit(&verify, 4);
    let stderr = String::from_utf8_lossy(&verify.stderr);
    assert!(
        stderr.contains("schema_migration.v1_to_v2.boundary.missing"),
        "stderr: {stderr}"
    );
}
// Exactly one boundary row appended at the current v1 head satisfies the
// boundary-required audit.
#[test]
fn audit_verify_require_v1_to_v2_boundary_passes_with_exactly_one_boundary() {
    let tmp = tempfile::tempdir().unwrap();
    let (_db, event_log_path) = init_layout(tmp.path());
    ingest_minimal_session(tmp.path());
    let mut jsonl = JsonlLog::open(&event_log_path).expect("open event log");
    let head = jsonl.head().expect("v1 head").to_string();
    jsonl
        .append_schema_migration_v1_to_v2(
            SchemaMigrationV1ToV2Payload::new(head, "script-digest", None, "fixture-digest"),
            &schema_migration_v1_to_v2_policy_decision_test_allow(),
        )
        .expect("append boundary");
    let verify = run_in(
        tmp.path(),
        &["audit", "verify", "--require-v1-to-v2-boundary"],
    );
    assert_exit(&verify, 0);
}
// Two boundary rows in one log are a protocol violation; the flagged audit
// must reject them (exit 4) with the duplicate-boundary diagnostic.
#[test]
fn audit_verify_require_v1_to_v2_boundary_rejects_duplicate_boundary_rows() {
    let tmp = tempfile::tempdir().unwrap();
    let (_db, event_log_path) = init_layout(tmp.path());
    ingest_minimal_session(tmp.path());
    let mut jsonl = JsonlLog::open(&event_log_path).expect("open event log");
    let head = jsonl.head().expect("v1 head").to_string();
    let first_boundary = jsonl
        .append_schema_migration_v1_to_v2(
            SchemaMigrationV1ToV2Payload::new(head, "script-digest", None, "fixture-digest"),
            &schema_migration_v1_to_v2_policy_decision_test_allow(),
        )
        .expect("append first boundary");
    // Second boundary chains off the first — still invalid by policy.
    jsonl
        .append_schema_migration_v1_to_v2(
            SchemaMigrationV1ToV2Payload::new(
                first_boundary,
                "script-digest",
                None,
                "fixture-digest",
            ),
            &schema_migration_v1_to_v2_policy_decision_test_allow(),
        )
        .expect("append duplicate boundary");
    let verify = run_in(
        tmp.path(),
        &["audit", "verify", "--require-v1-to-v2-boundary"],
    );
    assert_exit(&verify, 4);
    let stderr = String::from_utf8_lossy(&verify.stderr);
    assert!(
        stderr.contains("schema_migration.v1_to_v2.boundary.duplicate"),
        "stderr: {stderr}"
    );
}
// A schema_version=2 event appended after the boundary must verify cleanly
// under the default audit.
#[test]
fn audit_verify_accepts_post_cutover_v2_current_event_wire_rows() {
    let tmp = tempfile::tempdir().unwrap();
    let (_db, event_log_path) = init_layout(tmp.path());
    ingest_minimal_session(tmp.path());
    let mut jsonl = JsonlLog::open(&event_log_path).expect("open event log");
    let head = jsonl.head().expect("v1 head").to_string();
    jsonl
        .append_schema_migration_v1_to_v2(
            SchemaMigrationV1ToV2Payload::new(head, "script-digest", None, "fixture-digest"),
            &schema_migration_v1_to_v2_policy_decision_test_allow(),
        )
        .expect("append boundary");
    let v2_row = Event {
        id: EventId::new(),
        schema_version: 2,
        observed_at: chrono::Utc::now(),
        recorded_at: chrono::Utc::now(),
        source: EventSource::Runtime,
        event_type: EventType::ToolResult,
        trace_id: None,
        session_id: Some("lane-s2".into()),
        domain_tags: vec![],
        payload: serde_json::json!({
            "fixture": "post-cutover-v2",
            "expected": "verify"
        }),
        // Hash fields left empty — presumably filled in on append.
        payload_hash: String::new(),
        prev_event_hash: None,
        event_hash: String::new(),
    };
    jsonl
        .append(v2_row, &append_policy_decision_test_allow())
        .expect("append unsupported v2 row");
    let verify = run_in(tmp.path(), &["audit", "verify"]);
    assert_exit(&verify, 0);
    let stdout = String::from_utf8_lossy(&verify.stdout);
    assert!(stdout.contains("0 failures"), "stdout: {stdout}");
}
// Dry-run must print the full migration plan — steps, boundary digests,
// readiness gates — while leaving both the sqlite store and the migration
// ledger byte-for-byte untouched.
#[test]
fn migrate_v2_dry_run_reports_plan_without_mutation() {
    let tmp = tempfile::tempdir().unwrap();
    let db = init(tmp.path());
    ingest_minimal_session(tmp.path());
    let pool = Connection::open(&db).expect("open initialized db");
    apply_pending(&pool).expect("apply migrations");
    // Snapshot row counts so post-run equality proves non-mutation.
    let events_before = table_count(&pool, "events");
    let migrations_before = table_count(&pool, "_migrations");
    let out = run_in(tmp.path(), &["migrate", "v2", "--dry-run"]);
    assert_exit(&out, 0);
    let stdout = String::from_utf8_lossy(&out.stdout);
    assert!(
        stdout.contains("cortex migrate v2: dry-run ok"),
        "stdout: {stdout}"
    );
    // Plan steps reported by the CLI.
    assert!(
        stdout.contains("step=preflight_schema_v1"),
        "stdout: {stdout}"
    );
    assert!(
        stdout.contains("step=leave_schema_version_unchanged"),
        "stdout: {stdout}"
    );
    assert!(
        stdout.contains("boundary_event_kind=schema_migration.v1_to_v2"),
        "stdout: {stdout}"
    );
    assert!(
        stdout.contains("schema_version_target=2"),
        "stdout: {stdout}"
    );
    // Boundary digests an operator attestation will later have to cover.
    assert!(
        stdout.contains("boundary_previous_v1_head_hash="),
        "stdout: {stdout}"
    );
    assert!(
        stdout.contains("migration_script_digest=blake3:"),
        "stdout: {stdout}"
    );
    assert!(
        stdout.contains("fixture_verification_result_hash=blake3:"),
        "stdout: {stdout}"
    );
    let boundary_previous_v1_head_hash = stdout_value(&stdout, "boundary_previous_v1_head_hash");
    let fixture_verification_result_hash_stdout =
        stdout_value(&stdout, "fixture_verification_result_hash");
    // "blake3:" prefix plus a 64-hex-char digest.
    assert_eq!(
        fixture_verification_result_hash_stdout.len(),
        "blake3:".len() + 64,
        "stdout: {stdout}"
    );
    // The CLI digest must agree with the store crate's own helper.
    let expected_fixture_verification_result_hash = fixture_verification_result_hash(
        &dry_run_plan(&pool).expect("dry-run plan after CLI dry-run"),
        boundary_previous_v1_head_hash,
    );
    assert_eq!(
        fixture_verification_result_hash_stdout, expected_fixture_verification_result_hash,
        "stdout digest must match store fixture verification helper"
    );
    assert!(
        stdout.contains("operator_attestation_mode=dry_run_not_collected"),
        "stdout: {stdout}"
    );
    assert!(
        stdout.contains("boundary_preflight_ready=true"),
        "stdout: {stdout}"
    );
    assert!(stdout.contains("cutover_authority=ok"), "stdout: {stdout}");
    // Dry-run must never report approval.
    assert!(
        stdout.contains("cutover_approved=false"),
        "stdout: {stdout}"
    );
    assert!(
        stdout.contains("cutover_guard=requires_backup_manifest"),
        "stdout: {stdout}"
    );
    assert_cutover_readiness_ready(&stdout);
    assert!(
        !stdout.contains("cutover_approved=true"),
        "stdout: {stdout}"
    );
    assert!(stdout.contains("no state was changed"), "stdout: {stdout}");
    // Reopen and confirm nothing changed in the store.
    let pool = Connection::open(&db).expect("reopen db");
    assert_eq!(table_count(&pool, "events"), events_before);
    assert_eq!(table_count(&pool, "_migrations"), migrations_before);
    assert!(has_column(&pool, "events", "source_attestation_json"));
    assert!(has_table(&pool, "memory_session_uses"));
}
// An empty event log has no v1 head, so the dry-run boundary preflight must
// refuse (exit 7) without touching any state.
#[test]
fn migrate_v2_dry_run_requires_v1_event_head_for_boundary_preflight() {
    let tmp = tempfile::tempdir().unwrap();
    init(tmp.path());
    let dry_run = run_in(tmp.path(), &["migrate", "v2", "--dry-run"]);
    assert_exit(&dry_run, 7);
    let stderr = String::from_utf8_lossy(&dry_run.stderr);
    assert!(
        stderr.contains("boundary preflight requires a current v1 event_chain_head"),
        "stderr: {stderr}"
    );
    assert!(stderr.contains("no state was changed"), "stderr: {stderr}");
}
// Rows carrying a schema_version newer than the code understands must abort
// the dry-run (exit 4) rather than be migrated blindly.
#[test]
fn migrate_v2_dry_run_refuses_future_schema_rows() {
    let tmp = tempfile::tempdir().unwrap();
    let db = init(tmp.path());
    let pool = Connection::open(&db).expect("open initialized db");
    apply_pending(&pool).expect("apply migrations");
    // Forge an event row claiming schema_version 3 (the code supports 2).
    pool.execute(
        "INSERT INTO events (
id, schema_version, observed_at, recorded_at, source_json, event_type,
trace_id, session_id, domain_tags_json, payload_json, payload_hash,
prev_event_hash, event_hash
) VALUES (
'evt_future', 3, '2026-01-01T00:00:00Z', '2026-01-01T00:00:00Z',
'{\"kind\":\"test\"}', 'test.event', NULL, NULL, '[]', '{}',
'payload-hash', NULL, 'event-hash'
);",
        [],
    )
    .expect("insert mismatched event");
    let out = run_in(tmp.path(), &["migrate", "v2", "--dry-run"]);
    assert_exit(&out, 4);
    let stderr = String::from_utf8_lossy(&out.stderr);
    assert!(
        stderr.contains("cortex migrate v2: schema_version.events.matches_code"),
        "stderr: {stderr}"
    );
    assert!(
        stderr.contains("row evt_future has schema_version 3; expected 2"),
        "stderr: {stderr}"
    );
    assert!(stderr.contains("no state was changed"), "stderr: {stderr}");
}
// A real cutover attempt without --backup-manifest is a usage error (exit 2);
// readiness gates are still reported on the refusal path.
#[test]
fn migrate_v2_without_dry_run_requires_backup_manifest() {
    let tmp = tempfile::tempdir().unwrap();
    init(tmp.path());
    let attempt = run_in(tmp.path(), &["migrate", "v2"]);
    assert_exit(&attempt, 2);
    let stderr = String::from_utf8_lossy(&attempt.stderr);
    assert!(
        stderr.contains("cutover requires --backup-manifest"),
        "stderr: {stderr}"
    );
    assert_cutover_readiness_ready(&stderr);
    assert!(stderr.contains("no state was changed"), "stderr: {stderr}");
}
// A backup manifest must never smuggle in cutover approval: reserved
// approval fields are rejected (exit 7), and nothing — sqlite rows,
// migrations ledger, or JSONL head — may change.
#[test]
fn migrate_v2_rejects_backup_manifest_cutover_approval_fields_without_mutation() {
    let tmp = tempfile::tempdir().unwrap();
    let (db, event_log_path) = init_layout(tmp.path());
    ingest_minimal_session(tmp.path());
    let pool = Connection::open(&db).expect("open initialized db");
    apply_pending(&pool).expect("apply migrations");
    // Snapshot counts and JSONL head so post-refusal equality proves
    // non-mutation.
    let events_before = table_count(&pool, "events");
    let migrations_before = table_count(&pool, "_migrations");
    let previous_v1_head = JsonlLog::open(&event_log_path)
        .expect("open event log")
        .head()
        .expect("v1 head")
        .to_string();
    std::fs::write(tmp.path().join("state.sqlite"), "sqlite backup placeholder")
        .expect("write sqlite backup artifact");
    std::fs::write(tmp.path().join("events.jsonl"), "jsonl backup placeholder")
        .expect("write jsonl backup artifact");
    let manifest = tmp.path().join("backup-manifest.json");
    // Manifest forges the reserved `cutover_approved`/`operator_approval`
    // fields.
    std::fs::write(
        &manifest,
        r#"{"kind":"cortex_pre_v2_backup","schema_version":1,"sqlite_store":"state.sqlite","jsonl_mirror":"events.jsonl","tool_version":"cortex-test","backup_timestamp":"2026-05-04T22:00:00Z","cutover_approved":true,"operator_approval":{"actor":"test"}}"#,
    )
    .expect("write forged approval backup manifest");
    let out = run_in(
        tmp.path(),
        &[
            "migrate",
            "v2",
            "--backup-manifest",
            manifest.to_str().unwrap(),
        ],
    );
    assert_exit(&out, 7);
    let stderr = String::from_utf8_lossy(&out.stderr);
    assert!(
        stderr.contains("reserved cutover approval field `cutover_approved`")
            && stderr.contains("backup manifests cannot approve schema cutover"),
        "stderr: {stderr}"
    );
    assert!(stderr.contains("no state was changed"), "stderr: {stderr}");
    let pool = Connection::open(&db).expect("reopen db");
    assert_eq!(table_count(&pool, "events"), events_before);
    assert_eq!(table_count(&pool, "_migrations"), migrations_before);
    assert!(has_column(&pool, "events", "source_attestation_json"));
    assert!(has_table(&pool, "memory_session_uses"));
    let current_head = JsonlLog::open(&event_log_path)
        .expect("reopen event log")
        .head()
        .expect("v1 head still present")
        .to_string();
    assert_eq!(current_head, previous_v1_head);
    // And no boundary row may have been appended by the refused run.
    let boundary_report =
        verify_schema_migration_v1_to_v2_boundary(&event_log_path, false).expect("boundary audit");
    assert!(boundary_report.ok(), "boundary report: {boundary_report:?}");
    assert!(boundary_report.boundary_rows.is_empty());
}
// End-to-end happy path: a missing manifest is refused first (exit 7), then a
// valid manifest + verified operator attestation + seeded operator authority
// commits the cutover, emits the full stage/audit report, writes the
// post-migrate manifest, and appends exactly one boundary row to the JSONL.
#[test]
fn migrate_v2_full_path_cuts_over_after_post_cutover_audit_dispatch() {
    let tmp = tempfile::tempdir().unwrap();
    let (db, event_log_path) = init_layout(tmp.path());
    ingest_minimal_session(tmp.path());
    // Test is only meaningful while the code targets schema version 2.
    assert_eq!(cortex_core::SCHEMA_VERSION, 2);
    let pool = Connection::open(&db).expect("open initialized db");
    apply_pending(&pool).expect("apply migrations");
    let migrations_before = table_count(&pool, "_migrations");
    let previous_v1_head = JsonlLog::open(&event_log_path)
        .expect("open event log")
        .head()
        .expect("v1 head")
        .to_string();
    // First attempt: manifest path does not exist — must refuse (exit 7).
    let missing_manifest = tmp.path().join("missing-backup-manifest.json");
    let out = run_in(
        tmp.path(),
        &[
            "migrate",
            "v2",
            "--backup-manifest",
            missing_manifest.to_str().unwrap(),
        ],
    );
    assert_exit(&out, 7);
    let stderr = String::from_utf8_lossy(&out.stderr);
    assert!(
        stderr.contains("backup manifest") && stderr.contains("was not found"),
        "stderr: {stderr}"
    );
    assert!(stderr.contains("no state was changed"), "stderr: {stderr}");
    // Second attempt: full valid inputs — manifest, attestation over the
    // dry-run digests, and seeded operator authority.
    let manifest = tmp.path().join("backup-manifest.json");
    write_valid_backup_manifest(&manifest);
    let (head, script_digest, fixture_digest) = dry_run_boundary_preflight(tmp.path());
    let attestation = tmp.path().join("operator-attestation.json");
    write_valid_operator_attestation(&attestation, &head, &script_digest, &fixture_digest);
    seed_migrate_operator_authority(&db);
    let out = run_in(
        tmp.path(),
        &[
            "migrate",
            "v2",
            "--backup-manifest",
            manifest.to_str().unwrap(),
            "--operator-attestation",
            attestation.to_str().unwrap(),
        ],
    );
    assert_exit(&out, 0);
    let stderr = String::from_utf8_lossy(&out.stderr);
    // Stage-by-stage progress and audit results reported on stderr.
    assert!(
        stderr.contains("operator_attestation_verified=true"),
        "stderr: {stderr}"
    );
    assert!(
        stderr.contains("stage=backup-preflight-ready status=ready"),
        "stderr: {stderr}"
    );
    assert!(
        stderr.contains("stage=expand/backfill status=ready"),
        "stderr: {stderr}"
    );
    assert!(
        stderr.contains("boundary_previous_v1_head_hash="),
        "stderr: {stderr}"
    );
    assert!(stderr.contains("boundary_event_hash="), "stderr: {stderr}");
    assert!(
        stderr.contains("boundary_event_kind=schema_migration.v1_to_v2"),
        "stderr: {stderr}"
    );
    assert!(stderr.contains("boundary_audit=ok"), "stderr: {stderr}");
    assert!(
        stderr.contains("post_migrate_mixed_chain_audit=ok"),
        "stderr: {stderr}"
    );
    assert!(
        stderr.contains("post_cutover_audit_dispatch=available"),
        "stderr: {stderr}"
    );
    assert!(
        stderr.contains("post_migrate_row_count_refusal=ok"),
        "stderr: {stderr}"
    );
    assert!(stderr.contains("cutover_authority=ok"), "stderr: {stderr}");
    assert!(stderr.contains("cutover_approved=true"), "stderr: {stderr}");
    assert!(
        stderr.contains("cutover_guard=committed"),
        "stderr: {stderr}"
    );
    assert_cutover_readiness_ready(&stderr);
    assert!(
        stderr.contains("schema cutover complete. SCHEMA_VERSION=2 active"),
        "stderr: {stderr}"
    );
    assert!(
        manifest
            .parent()
            .unwrap()
            .join("POST_V2_MIGRATE_MANIFEST")
            .is_file(),
        "post-migrate manifest must be written next to the backup manifest"
    );
    // Store-side effects: exactly the boundary delta of event rows, no new
    // migrations, and the v2 columns/tables present.
    let pool = Connection::open(&db).expect("reopen db");
    assert_eq!(
        table_count(&pool, "events"),
        cortex_store::verify::SCHEMA_V1_TO_V2_EVENT_BOUNDARY_DELTA
    );
    assert_eq!(table_count(&pool, "_migrations"), migrations_before);
    assert!(has_column(&pool, "events", "source_attestation_json"));
    assert!(has_table(&pool, "memory_session_uses"));
    // Exactly one boundary row, and the JSONL head must have advanced.
    let boundary_report =
        verify_schema_migration_v1_to_v2_boundary(&event_log_path, true).expect("boundary audit");
    assert!(boundary_report.ok(), "boundary report: {boundary_report:?}");
    assert_eq!(boundary_report.boundary_rows.len(), 1);
    let boundary_head = JsonlLog::open(&event_log_path)
        .expect("reopen event log")
        .head()
        .expect("boundary head")
        .to_string();
    assert_ne!(
        boundary_head, previous_v1_head,
        "boundary append must advance JSONL head"
    );
}
// Same happy path as the full-path test, but the backup manifest is produced
// by the CLI's own `backup` command rather than hand-written fixtures.
#[test]
fn migrate_v2_generated_backup_manifest_cuts_over_to_schema_v2() {
    let tmp = tempfile::tempdir().unwrap();
    let (db, event_log_path) = init_layout(tmp.path());
    ingest_minimal_session(tmp.path());
    // Test is only meaningful while the code targets schema version 2.
    assert_eq!(cortex_core::SCHEMA_VERSION, 2);
    let pool = Connection::open(&db).expect("open initialized db");
    apply_pending(&pool).expect("apply migrations");
    let events_before = table_count(&pool, "events");
    let manifest = generate_backup_manifest(tmp.path());
    // Attestation must cover the digests reported by the dry-run preflight.
    let (head, script_digest, fixture_digest) = dry_run_boundary_preflight(tmp.path());
    let attestation = tmp.path().join("operator-attestation.json");
    write_valid_operator_attestation(&attestation, &head, &script_digest, &fixture_digest);
    seed_migrate_operator_authority(&db);
    let out = run_in(
        tmp.path(),
        &[
            "migrate",
            "v2",
            "--backup-manifest",
            manifest.to_str().unwrap(),
            "--operator-attestation",
            attestation.to_str().unwrap(),
        ],
    );
    assert_exit(&out, 0);
    let stderr = String::from_utf8_lossy(&out.stderr);
    assert!(
        stderr.contains("boundary_audit=ok")
            && stderr.contains("post_migrate_mixed_chain_audit=ok")
            && stderr.contains("post_cutover_audit_dispatch=available")
            && stderr.contains("cutover_authority=ok")
            && stderr.contains("cutover_approved=true")
            && stderr.contains("cutover_guard=committed")
            && stderr.contains("schema cutover complete. SCHEMA_VERSION=2 active"),
        "stderr: {stderr}"
    );
    assert_cutover_readiness_ready(&stderr);
    assert!(
        manifest
            .parent()
            .unwrap()
            .join("POST_V2_MIGRATE_MANIFEST")
            .is_file(),
        "post-migrate manifest must be written next to the backup manifest"
    );
    // Exactly the boundary delta of new event rows, plus the v2 schema bits.
    let pool = Connection::open(&db).expect("reopen db");
    assert_eq!(
        table_count(&pool, "events"),
        events_before + cortex_store::verify::SCHEMA_V1_TO_V2_EVENT_BOUNDARY_DELTA
    );
    assert!(has_column(&pool, "events", "source_attestation_json"));
    assert!(has_table(&pool, "memory_session_uses"));
    let boundary_report =
        verify_schema_migration_v1_to_v2_boundary(&event_log_path, true).expect("boundary audit");
    assert!(boundary_report.ok(), "boundary report: {boundary_report:?}");
    assert_eq!(boundary_report.boundary_rows.len(), 1);
}
// Running the full migrate path twice must not append a second boundary row:
// the second run fails, the JSONL head and migrations ledger are unchanged.
#[test]
fn migrate_v2_repeated_full_path_cannot_append_second_boundary() {
    let tmp = tempfile::tempdir().unwrap();
    let (db, event_log_path) = init_layout(tmp.path());
    ingest_minimal_session(tmp.path());
    let pool = Connection::open(&db).expect("open initialized db");
    apply_pending(&pool).expect("apply migrations");
    let manifest = generate_backup_manifest(tmp.path());
    let (head, script_digest, fixture_digest) = dry_run_boundary_preflight(tmp.path());
    let attestation = tmp.path().join("operator-attestation.json");
    write_valid_operator_attestation(&attestation, &head, &script_digest, &fixture_digest);
    seed_migrate_operator_authority(&db);
    // First run: must commit the cutover.
    let first = run_in(
        tmp.path(),
        &[
            "migrate",
            "v2",
            "--backup-manifest",
            manifest.to_str().unwrap(),
            "--operator-attestation",
            attestation.to_str().unwrap(),
        ],
    );
    assert_exit(&first, 0);
    let first_stderr = String::from_utf8_lossy(&first.stderr);
    assert!(
        first_stderr.contains("cutover_authority=ok")
            && first_stderr.contains("cutover_approved=true")
            && first_stderr.contains("schema cutover complete"),
        "stderr: {first_stderr}"
    );
    let boundary_report = verify_schema_migration_v1_to_v2_boundary(&event_log_path, true)
        .expect("boundary audit after first run");
    assert!(boundary_report.ok(), "boundary report: {boundary_report:?}");
    assert_eq!(boundary_report.boundary_rows.len(), 1);
    // Snapshot the post-cutover head and ledger for later comparison.
    let first_boundary_head = JsonlLog::open(&event_log_path)
        .expect("reopen event log")
        .head()
        .expect("boundary head")
        .to_string();
    let migrations_after_first = {
        let pool = Connection::open(&db).expect("reopen db after first cutover");
        table_count(&pool, "_migrations")
    };
    // Second run with the same (now stale) inputs: must not succeed.
    let second = run_in(
        tmp.path(),
        &[
            "migrate",
            "v2",
            "--backup-manifest",
            manifest.to_str().unwrap(),
            "--operator-attestation",
            attestation.to_str().unwrap(),
        ],
    );
    let second_code = second.status.code().expect("second run exit code");
    let second_stderr = String::from_utf8_lossy(&second.stderr);
    assert_ne!(
        second_code, 0,
        "second migrate v2 against already-cut-over store must not succeed: stderr {second_stderr}"
    );
    // Accept any of the refusal phrasings; the exact message is not pinned.
    assert!(
        second_stderr.contains("schema_migration.v1_to_v2 boundary already exists")
            || second_stderr.contains("boundary already exists")
            || second_stderr.contains("boundary preflight"),
        "stderr: {second_stderr}"
    );
    let boundary_report = verify_schema_migration_v1_to_v2_boundary(&event_log_path, true)
        .expect("boundary audit after second run");
    assert!(boundary_report.ok(), "boundary report: {boundary_report:?}");
    assert_eq!(boundary_report.boundary_rows.len(), 1);
    let second_head = JsonlLog::open(&event_log_path)
        .expect("reopen event log")
        .head()
        .expect("boundary head remains")
        .to_string();
    assert_eq!(second_head, first_boundary_head);
    let pool = Connection::open(&db).expect("reopen db");
    assert_eq!(table_count(&pool, "_migrations"), migrations_after_first);
    assert_eq!(cortex_core::SCHEMA_VERSION, 2);
}
// After a committed cutover, tampering with the last JSONL payload (without
// re-hashing) must make the boundary-required audit fail with exit 3 and a
// hash-mismatch diagnostic.
#[test]
fn audit_verify_rejects_payload_tamper_after_v1_to_v2_boundary_event() {
    let tmp = tempfile::tempdir().unwrap();
    let (db, event_log_path) = init_layout(tmp.path());
    ingest_minimal_session(tmp.path());
    let manifest = generate_backup_manifest(tmp.path());
    let (head, script_digest, fixture_digest) = dry_run_boundary_preflight(tmp.path());
    let attestation = tmp.path().join("operator-attestation.json");
    write_valid_operator_attestation(&attestation, &head, &script_digest, &fixture_digest);
    seed_migrate_operator_authority(&db);
    // Commit a legitimate cutover first.
    let out = run_in(
        tmp.path(),
        &[
            "migrate",
            "v2",
            "--backup-manifest",
            manifest.to_str().unwrap(),
            "--operator-attestation",
            attestation.to_str().unwrap(),
        ],
    );
    assert_exit(&out, 0);
    let stderr = String::from_utf8_lossy(&out.stderr);
    assert!(
        stderr.contains("boundary_audit=ok")
            && stderr.contains("post_migrate_mixed_chain_audit=ok"),
        "stderr: {stderr}"
    );
    // Corrupt the final row's payload without fixing hashes.
    tamper_last_jsonl_event_payload(&event_log_path);
    let out = run_in(
        tmp.path(),
        &["audit", "verify", "--require-v1-to-v2-boundary"],
    );
    assert_exit(&out, 3);
    let stdout = String::from_utf8_lossy(&out.stdout);
    let stderr = String::from_utf8_lossy(&out.stderr);
    assert!(stdout.contains("audit verify:"), "stdout: {stdout}");
    // Either mismatch class is acceptable — both prove tamper detection.
    assert!(
        stderr.contains("PayloadHashMismatch") || stderr.contains("EventHashMismatch"),
        "stderr: {stderr}"
    );
}
#[test]
fn audit_verify_accepts_historical_v1_boundary_and_new_v2_rows() {
    // A store that cut over earlier and then accrued schema-v2 rows must
    // still pass the boundary-aware audit with zero failures.
    let tmp = tempfile::tempdir().unwrap();
    let (db, event_log_path) = init_layout(tmp.path());
    ingest_minimal_session(tmp.path());
    let manifest = generate_backup_manifest(tmp.path());
    let (head, script_digest, fixture_digest) = dry_run_boundary_preflight(tmp.path());
    let attestation = tmp.path().join("operator-attestation.json");
    write_valid_operator_attestation(&attestation, &head, &script_digest, &fixture_digest);
    seed_migrate_operator_authority(&db);
    let migrate_args = [
        "migrate",
        "v2",
        "--backup-manifest",
        manifest.to_str().unwrap(),
        "--operator-attestation",
        attestation.to_str().unwrap(),
    ];
    let migrate = run_in(tmp.path(), &migrate_args);
    assert_exit(&migrate, 0);
    // Append a post-boundary v2 event, then verify the mixed v1/v2 chain.
    append_post_boundary_schema_v2_event(&event_log_path);
    let verify = run_in(
        tmp.path(),
        &["audit", "verify", "--require-v1-to-v2-boundary"],
    );
    assert_exit(&verify, 0);
    let stdout = String::from_utf8_lossy(&verify.stdout);
    assert!(stdout.contains("audit verify:"), "stdout: {stdout}");
    assert!(stdout.contains("0 failures"), "stdout: {stdout}");
}
#[test]
fn migrate_v2_rejects_malformed_backup_manifest_without_mutation() {
    // A manifest missing the required `sqlite_store` field must be refused
    // with exit 7 and no store mutation.
    let manifest_json = r#"{"kind":"cortex_pre_v2_backup","schema_version":1}"#;
    let expected = [
        "not a valid backup manifest",
        "missing field `sqlite_store`",
    ];
    assert_backup_manifest_rejected_without_mutation(manifest_json, 7, &expected);
}
#[test]
fn migrate_v2_rejects_invalid_json_backup_manifest_without_mutation() {
    // Truncated JSON (unterminated value) must be refused with exit 7.
    let manifest_json = r#"{"kind":"cortex_pre_v2_backup","schema_version":"#;
    assert_backup_manifest_rejected_without_mutation(
        manifest_json,
        7,
        &["not a valid backup manifest"],
    );
}
#[test]
fn migrate_v2_rejects_wrong_kind_backup_manifest_without_mutation() {
    // A well-formed manifest with the wrong `kind` discriminator is refused.
    let manifest_json = r#"{"kind":"generic_backup","schema_version":1,"sqlite_store":"state.sqlite","jsonl_mirror":"events.jsonl","tool_version":"cortex-test","backup_timestamp":"2026-05-04T22:00:00Z"}"#;
    let expected = [
        "invalid kind `generic_backup`",
        "expected cortex_pre_v2_backup",
    ];
    assert_backup_manifest_rejected_without_mutation(manifest_json, 7, &expected);
}
#[test]
fn migrate_v2_rejects_wrong_schema_backup_manifest_without_mutation() {
    // A manifest claiming schema_version 2 is already post-cutover; this is
    // a schema-mismatch refusal (exit 4), not a parse error (exit 7).
    let manifest_json = r#"{"kind":"cortex_pre_v2_backup","schema_version":2,"sqlite_store":"state.sqlite","jsonl_mirror":"events.jsonl","tool_version":"cortex-test","backup_timestamp":"2026-05-04T22:00:00Z"}"#;
    assert_backup_manifest_rejected_without_mutation(
        manifest_json,
        4,
        &["has schema_version 2; expected 1"],
    );
}
#[test]
fn migrate_v2_rejects_empty_backup_manifest_fields_without_mutation() {
    // An empty `sqlite_store` path is structurally valid JSON but must still
    // be refused with exit 7.
    let manifest_json = r#"{"kind":"cortex_pre_v2_backup","schema_version":1,"sqlite_store":"","jsonl_mirror":"events.jsonl","tool_version":"cortex-test","backup_timestamp":"2026-05-04T22:00:00Z"}"#;
    assert_backup_manifest_rejected_without_mutation(manifest_json, 7, &["empty `sqlite_store`"]);
}
#[test]
fn migrate_v2_rejects_missing_backup_artifacts_without_mutation() {
    // The manifest names a sqlite artifact that does not exist on disk; the
    // refusal must name both the field and the missing filename.
    let manifest_json = r#"{"kind":"cortex_pre_v2_backup","schema_version":1,"sqlite_store":"missing-state.sqlite","jsonl_mirror":"events.jsonl","tool_version":"cortex-test","backup_timestamp":"2026-05-04T22:00:00Z"}"#;
    let expected = ["missing `sqlite_store` artifact", "missing-state.sqlite"];
    assert_backup_manifest_rejected_without_mutation(manifest_json, 7, &expected);
}
#[test]
fn migrate_v2_rejects_absolute_backup_artifact_paths_without_mutation() {
    // Build an absolute artifact path, JSON-escaping backslashes so the
    // manifest stays valid JSON on Windows paths.
    let absolute_artifact = std::env::current_dir()
        .unwrap()
        .join("state.sqlite")
        .to_string_lossy()
        .replace('\\', "\\\\");
    let manifest = format!(
        r#"{{"kind":"cortex_pre_v2_backup","schema_version":1,"sqlite_store":"{absolute_artifact}","jsonl_mirror":"events.jsonl","tool_version":"cortex-test","backup_timestamp":"2026-05-04T22:00:00Z"}}"#
    );
    let expected = ["absolute paths are not accepted"];
    assert_backup_manifest_rejected_without_mutation(&manifest, 7, &expected);
}
#[test]
fn migrate_v2_rejects_parent_traversal_backup_artifact_paths_without_mutation() {
    // `../` traversal out of the backup directory must be refused.
    let manifest_json = r#"{"kind":"cortex_pre_v2_backup","schema_version":1,"sqlite_store":"../state.sqlite","jsonl_mirror":"events.jsonl","tool_version":"cortex-test","backup_timestamp":"2026-05-04T22:00:00Z"}"#;
    assert_backup_manifest_rejected_without_mutation(
        manifest_json,
        7,
        &["parent-directory traversal is not accepted"],
    );
}
#[test]
fn migrate_v2_rejects_missing_backup_jsonl_artifact_without_mutation() {
    // The sqlite artifact exists but the referenced jsonl_mirror file does
    // not; migrate must refuse with exit 7 and leave the store untouched.
    let tmp = tempfile::tempdir().unwrap();
    let db = init(tmp.path());
    ingest_minimal_session(tmp.path());
    let pool = Connection::open(&db).expect("open initialized db");
    apply_pending(&pool).expect("apply migrations");
    let events_before = table_count(&pool, "events");
    std::fs::write(tmp.path().join("state.sqlite"), "sqlite backup placeholder")
        .expect("write sqlite backup artifact");
    let manifest = tmp.path().join("backup-manifest.json");
    std::fs::write(
        &manifest,
        r#"{"kind":"cortex_pre_v2_backup","schema_version":1,"sqlite_store":"state.sqlite","jsonl_mirror":"missing-events.jsonl","tool_version":"cortex-test","backup_timestamp":"2026-05-04T22:00:00Z"}"#,
    )
    .expect("write backup manifest");
    let out = run_in(
        tmp.path(),
        &["migrate", "v2", "--backup-manifest", manifest.to_str().unwrap()],
    );
    assert_exit(&out, 7);
    let stderr = String::from_utf8_lossy(&out.stderr);
    assert!(
        stderr.contains("missing `jsonl_mirror` artifact")
            && stderr.contains("missing-events.jsonl"),
        "stderr: {stderr}"
    );
    assert!(stderr.contains("no state was changed"), "stderr: {stderr}");
    // The refusal must happen before any mutation: counts and shape intact.
    let reopened = Connection::open(&db).expect("reopen db");
    assert_eq!(table_count(&reopened, "events"), events_before);
    assert!(has_column(&reopened, "events", "source_attestation_json"));
}
#[test]
fn migrate_v2_rejects_invalid_backup_manifest_timestamp_after_artifacts_exist_without_mutation() {
    // Both backup artifacts exist, so the failure is isolated to the
    // malformed `backup_timestamp`; the store must stay untouched.
    let tmp = tempfile::tempdir().unwrap();
    let db = init(tmp.path());
    ingest_minimal_session(tmp.path());
    let pool = Connection::open(&db).expect("open initialized db");
    apply_pending(&pool).expect("apply migrations");
    let events_before = table_count(&pool, "events");
    std::fs::write(tmp.path().join("state.sqlite"), "sqlite backup placeholder")
        .expect("write sqlite backup artifact");
    std::fs::write(tmp.path().join("events.jsonl"), "jsonl backup placeholder")
        .expect("write jsonl backup artifact");
    let manifest = tmp.path().join("backup-manifest.json");
    std::fs::write(
        &manifest,
        r#"{"kind":"cortex_pre_v2_backup","schema_version":1,"sqlite_store":"state.sqlite","jsonl_mirror":"events.jsonl","tool_version":"cortex-test","backup_timestamp":"not-a-date"}"#,
    )
    .expect("write backup manifest");
    let out = run_in(
        tmp.path(),
        &["migrate", "v2", "--backup-manifest", manifest.to_str().unwrap()],
    );
    assert_exit(&out, 7);
    let stderr = String::from_utf8_lossy(&out.stderr);
    assert!(
        stderr.contains("invalid `backup_timestamp`") && stderr.contains("expected RFC3339"),
        "stderr: {stderr}"
    );
    assert!(stderr.contains("no state was changed"), "stderr: {stderr}");
    let reopened = Connection::open(&db).expect("reopen db");
    assert_eq!(table_count(&reopened, "events"), events_before);
    assert!(has_column(&reopened, "events", "source_attestation_json"));
}
#[test]
fn migrate_v2_dry_run_rejects_backup_manifest_flag_without_mutation() {
    // --dry-run and --backup-manifest are mutually exclusive (exit 2);
    // nothing may be written either way.
    let tmp = tempfile::tempdir().unwrap();
    let db = init(tmp.path());
    ingest_minimal_session(tmp.path());
    let manifest = tmp.path().join("backup-manifest.json");
    write_valid_backup_manifest(&manifest);
    let pool = Connection::open(&db).expect("open initialized db");
    apply_pending(&pool).expect("apply migrations");
    let events_before = table_count(&pool, "events");
    let out = run_in(
        tmp.path(),
        &[
            "migrate",
            "v2",
            "--dry-run",
            "--backup-manifest",
            manifest.to_str().unwrap(),
        ],
    );
    assert_exit(&out, 2);
    let stderr = String::from_utf8_lossy(&out.stderr);
    assert!(
        stderr.contains("--backup-manifest") && stderr.contains("not accepted with --dry-run"),
        "stderr: {stderr}"
    );
    assert!(stderr.contains("no state was changed"), "stderr: {stderr}");
    // Store shape and counts must be exactly as before the refused call.
    let reopened = Connection::open(&db).expect("reopen db");
    assert_eq!(table_count(&reopened, "events"), events_before);
    assert!(has_column(&reopened, "events", "source_attestation_json"));
}
#[test]
fn migrate_v2_refuses_cutover_without_operator_attestation() {
    // A real (non-dry-run) cutover without --operator-attestation must be
    // refused with exit 7, cite ADR 0010, and mutate neither store.
    let tmp = tempfile::tempdir().unwrap();
    let (db, event_log_path) = init_layout(tmp.path());
    ingest_minimal_session(tmp.path());
    let manifest = tmp.path().join("backup-manifest.json");
    write_valid_backup_manifest(&manifest);
    let pool = Connection::open(&db).expect("open initialized db");
    apply_pending(&pool).expect("apply migrations");
    let events_before = table_count(&pool, "events");
    let previous_v1_head = JsonlLog::open(&event_log_path)
        .expect("open event log")
        .head()
        .expect("v1 head")
        .to_string();
    let out = run_in(
        tmp.path(),
        &["migrate", "v2", "--backup-manifest", manifest.to_str().unwrap()],
    );
    assert_exit(&out, 7);
    let stderr = String::from_utf8_lossy(&out.stderr);
    assert!(
        stderr.contains("--operator-attestation"),
        "stderr should name the new flag: {stderr}"
    );
    assert!(stderr.contains("ADR 0010"), "stderr: {stderr}");
    assert!(stderr.contains("no state was changed"), "stderr: {stderr}");
    // Neither the JSONL head nor the SQLite counts may have moved.
    let current_head = JsonlLog::open(&event_log_path)
        .expect("reopen event log")
        .head()
        .expect("v1 head still present")
        .to_string();
    assert_eq!(current_head, previous_v1_head);
    let reopened = Connection::open(&db).expect("reopen db");
    assert_eq!(table_count(&reopened, "events"), events_before);
}
#[test]
fn migrate_v2_refuses_cutover_when_operator_attestation_targets_wrong_boundary() {
    // An attestation signed over the wrong previous_v1_head_hash (all zeros)
    // must be refused before any mutation happens.
    let tmp = tempfile::tempdir().unwrap();
    let (db, event_log_path) = init_layout(tmp.path());
    ingest_minimal_session(tmp.path());
    let pool = Connection::open(&db).expect("open initialized db");
    apply_pending(&pool).expect("apply migrations");
    let events_before = table_count(&pool, "events");
    let previous_v1_head = JsonlLog::open(&event_log_path)
        .expect("open event log")
        .head()
        .expect("v1 head")
        .to_string();
    let manifest = tmp.path().join("backup-manifest.json");
    write_valid_backup_manifest(&manifest);
    let (_head, script_digest, fixture_digest) = dry_run_boundary_preflight(tmp.path());
    let attestation = tmp.path().join("operator-attestation.json");
    // Deliberately attest to a boundary hash that cannot match the live log.
    let wrong_boundary = "0000000000000000000000000000000000000000000000000000000000000000";
    write_valid_operator_attestation(&attestation, wrong_boundary, &script_digest, &fixture_digest);
    let out = run_in(
        tmp.path(),
        &[
            "migrate",
            "v2",
            "--backup-manifest",
            manifest.to_str().unwrap(),
            "--operator-attestation",
            attestation.to_str().unwrap(),
        ],
    );
    assert_exit(&out, 7);
    let stderr = String::from_utf8_lossy(&out.stderr);
    assert!(
        stderr.contains("previous_v1_head_hash") && stderr.contains("mismatch"),
        "stderr should name the boundary mismatch: {stderr}"
    );
    assert!(stderr.contains("no state was changed"), "stderr: {stderr}");
    // v1 head and SQLite counts unchanged: refusal preceded any write.
    let current_head = JsonlLog::open(&event_log_path)
        .expect("reopen event log")
        .head()
        .expect("v1 head still present")
        .to_string();
    assert_eq!(current_head, previous_v1_head);
    let reopened = Connection::open(&db).expect("reopen db");
    assert_eq!(table_count(&reopened, "events"), events_before);
}
#[test]
fn migrate_v2_refuses_cutover_when_operator_attestation_signature_is_forged() {
    // Arrange a fully valid cutover, then flip every nibble of the envelope's
    // signature so Ed25519 verification must fail while the rest of the
    // attestation stays well-formed. Refusal must leave the v1 head intact.
    let tmp = tempfile::tempdir().unwrap();
    let (db, event_log_path) = init_layout(tmp.path());
    ingest_minimal_session(tmp.path());
    let previous_v1_head = JsonlLog::open(&event_log_path)
        .expect("open event log")
        .head()
        .expect("v1 head")
        .to_string();
    let manifest = tmp.path().join("backup-manifest.json");
    write_valid_backup_manifest(&manifest);
    let (head, script_digest, fixture_digest) = dry_run_boundary_preflight(tmp.path());
    let attestation_path = tmp.path().join("operator-attestation.json");
    write_valid_operator_attestation(&attestation_path, &head, &script_digest, &fixture_digest);
    seed_migrate_operator_authority(&db);
    let raw = std::fs::read_to_string(&attestation_path).expect("read envelope");
    let mut envelope: serde_json::Value = serde_json::from_str(&raw).expect("envelope decodes");
    let sig_hex = envelope["signature_hex"]
        .as_str()
        .expect("signature_hex present")
        .to_string();
    // XOR each hex nibble with 0xf: a bijection on 0..=15 with no fixed
    // points, so every output character differs from its input and the
    // forged hex string has the same length but can never equal the
    // original. `to_digit`/`from_digit` avoid the per-character String
    // allocations the old `from_str_radix(&c.to_string(), 16)` form paid.
    let forged: String = sig_hex
        .chars()
        .map(|c| {
            let v = c.to_digit(16).expect("hex");
            char::from_digit(v ^ 0x0f, 16).expect("nibble stays in 0..=15")
        })
        .collect();
    envelope["signature_hex"] = serde_json::Value::String(forged);
    std::fs::write(
        &attestation_path,
        serde_json::to_string_pretty(&envelope).unwrap(),
    )
    .expect("rewrite forged envelope");
    let out = run_in(
        tmp.path(),
        &[
            "migrate",
            "v2",
            "--backup-manifest",
            manifest.to_str().unwrap(),
            "--operator-attestation",
            attestation_path.to_str().unwrap(),
        ],
    );
    assert_exit(&out, 7);
    let stderr = String::from_utf8_lossy(&out.stderr);
    assert!(
        stderr.contains("Ed25519 signature did not verify")
            || stderr.contains("signature did not verify"),
        "stderr should surface signature rejection: {stderr}"
    );
    assert!(stderr.contains("no state was changed"), "stderr: {stderr}");
    // The refusal must have happened before any boundary append.
    let current_head = JsonlLog::open(&event_log_path)
        .expect("reopen event log")
        .head()
        .expect("v1 head still present")
        .to_string();
    assert_eq!(current_head, previous_v1_head);
}
#[test]
fn migrate_v2_unattended_migrate_succeeds_when_operator_attestation_is_valid() {
    // --unattended-migrate is permitted when accompanied by a valid operator
    // attestation; the cutover completes and the boundary audit passes.
    let tmp = tempfile::tempdir().unwrap();
    let (db, event_log_path) = init_layout(tmp.path());
    ingest_minimal_session(tmp.path());
    let manifest = generate_backup_manifest(tmp.path());
    let (head, script_digest, fixture_digest) = dry_run_boundary_preflight(tmp.path());
    let attestation = tmp.path().join("operator-attestation.json");
    write_valid_operator_attestation(&attestation, &head, &script_digest, &fixture_digest);
    seed_migrate_operator_authority(&db);
    let args = [
        "migrate",
        "v2",
        "--unattended-migrate",
        "--backup-manifest",
        manifest.to_str().unwrap(),
        "--operator-attestation",
        attestation.to_str().unwrap(),
    ];
    let out = run_in(tmp.path(), &args);
    assert_exit(&out, 0);
    let stderr = String::from_utf8_lossy(&out.stderr);
    for needle in [
        "operator_attestation_verified=true",
        "schema cutover complete. SCHEMA_VERSION=2 active",
    ] {
        assert!(stderr.contains(needle), "stderr: {stderr}");
    }
    // Exactly one boundary row must exist and the audit must be clean.
    let boundary_report = verify_schema_migration_v1_to_v2_boundary(&event_log_path, true)
        .expect("boundary audit after cutover");
    assert!(boundary_report.ok());
    assert_eq!(boundary_report.boundary_rows.len(), 1);
}
#[test]
fn migrate_v2_unattended_migrate_refuses_until_operator_attestation_is_supplied() {
    // Even a fully cutover-ready store must refuse --unattended-migrate
    // without --operator-attestation (ADR 0010), leaving all state intact.
    let tmp = tempfile::tempdir().unwrap();
    let (db, event_log_path) = init_layout(tmp.path());
    ingest_minimal_session(tmp.path());
    let manifest = tmp.path().join("backup-manifest.json");
    write_valid_backup_manifest(&manifest);
    let pool = Connection::open(&db).expect("open initialized db");
    apply_pending(&pool).expect("apply migrations");
    let events_before = table_count(&pool, "events");
    let migrations_before = table_count(&pool, "_migrations");
    let previous_v1_head = JsonlLog::open(&event_log_path)
        .expect("open event log")
        .head()
        .expect("v1 head")
        .to_string();
    let out = run_in(
        tmp.path(),
        &[
            "migrate",
            "v2",
            "--unattended-migrate",
            "--backup-manifest",
            manifest.to_str().unwrap(),
        ],
    );
    assert_exit(&out, 7);
    let stderr = String::from_utf8_lossy(&out.stderr);
    assert!(
        stderr.contains("--unattended-migrate requires --operator-attestation"),
        "stderr should name the new flag requirement: {stderr}"
    );
    assert!(
        stderr.contains("ADR 0010"),
        "stderr should cite ADR 0010 doctrine: {stderr}"
    );
    // The refusal must still report full cutover readiness.
    assert_cutover_readiness_ready(&stderr);
    assert!(stderr.contains("no state was changed"), "stderr: {stderr}");
    // SQLite counts/shape and the JSONL head must all be unchanged.
    let reopened = Connection::open(&db).expect("reopen db");
    assert_eq!(table_count(&reopened, "events"), events_before);
    assert_eq!(table_count(&reopened, "_migrations"), migrations_before);
    assert!(has_column(&reopened, "events", "source_attestation_json"));
    assert!(has_table(&reopened, "memory_session_uses"));
    let current_head = JsonlLog::open(&event_log_path)
        .expect("reopen event log")
        .head()
        .expect("v1 head still present")
        .to_string();
    assert_eq!(current_head, previous_v1_head);
}
#[test]
fn doctor_strict_passes_when_schema_versions_match() {
    // A freshly initialized, fully migrated store satisfies strict doctor.
    let tmp = tempfile::tempdir().unwrap();
    let db = init(tmp.path());
    let pool = Connection::open(&db).expect("open initialized db");
    apply_pending(&pool).expect("apply migrations");
    drop(pool);
    let out = run_in(tmp.path(), &["doctor", "--strict"]);
    assert_exit(&out, 0);
    let stdout = String::from_utf8_lossy(&out.stdout);
    assert!(
        stdout.contains("schema_version matches code version 2"),
        "stdout: {stdout}"
    );
}
#[test]
fn doctor_strict_exits_schema_mismatch_and_names_invariant() {
    // Seed an events row stamped with a future schema_version (3); strict
    // doctor must exit 4, name the violated invariant, and cite the row id.
    let tmp = tempfile::tempdir().unwrap();
    let db = init(tmp.path());
    let pool = Connection::open(&db).expect("open initialized db");
    apply_pending(&pool).expect("apply migrations");
    let insert_future_row = "INSERT INTO events (
            id, schema_version, observed_at, recorded_at, source_json, event_type,
            trace_id, session_id, domain_tags_json, payload_json, payload_hash,
            prev_event_hash, event_hash
        ) VALUES (
            'evt_future', 3, '2026-01-01T00:00:00Z', '2026-01-01T00:00:00Z',
            '{\"kind\":\"test\"}', 'test.event', NULL, NULL, '[]', '{}',
            'payload-hash', NULL, 'event-hash'
        );";
    pool.execute(insert_future_row, [])
        .expect("insert mismatched event");
    let out = run_in(tmp.path(), &["doctor", "--strict"]);
    assert_exit(&out, 4);
    let stderr = String::from_utf8_lossy(&out.stderr);
    for needle in [
        "schema_version.events.matches_code",
        "row evt_future has schema_version 3; expected 2",
    ] {
        assert!(stderr.contains(needle), "stderr: {stderr}");
    }
}
#[test]
fn store_open_commands_refuse_future_v3_rows_without_mutation() {
    // A store containing an events row from a future schema (v3) must make
    // read commands refuse with exit 4 and perform no writes at all.
    let tmp = tempfile::tempdir().unwrap();
    let db = init(tmp.path());
    ingest_minimal_session(tmp.path());
    let pool = Connection::open(&db).expect("open initialized db");
    apply_pending(&pool).expect("apply migrations");
    let insert_future_v3 = "INSERT INTO events (
            id, schema_version, observed_at, recorded_at, source_json, event_type,
            trace_id, session_id, domain_tags_json, payload_json, payload_hash,
            prev_event_hash, event_hash
        ) VALUES (
            'evt_future_v3', 3, '2026-01-01T00:00:00Z', '2026-01-01T00:00:00Z',
            '{\"kind\":\"test\"}', 'test.event', NULL, NULL, '[]', '{}',
            'payload-hash', NULL, 'event-hash'
        );";
    pool.execute(insert_future_v3, [])
        .expect("insert future v3 event row");
    let events_before = table_count(&pool, "events");
    let traces_before = table_count(&pool, "traces");
    let migrations_before = table_count(&pool, "_migrations");
    let out = run_in(tmp.path(), &["memory", "search", "anything"]);
    assert_exit(&out, 4);
    let stderr = String::from_utf8_lossy(&out.stderr);
    for needle in [
        "cortex memory search: schema_version.events.matches_code",
        "has schema_version 3; expected 2",
    ] {
        assert!(stderr.contains(needle), "stderr: {stderr}");
    }
    // Counts per table must be untouched after the refusal.
    let reopened = Connection::open(&db).expect("reopen initialized db");
    assert_eq!(table_count(&reopened, "events"), events_before);
    assert_eq!(table_count(&reopened, "traces"), traces_before);
    assert_eq!(table_count(&reopened, "_migrations"), migrations_before);
}
#[test]
fn doctor_strict_exits_schema_mismatch_for_missing_required_column() {
    // Dropping a required column must trip the schema-shape invariant.
    let tmp = tempfile::tempdir().unwrap();
    let db = init(tmp.path());
    let pool = Connection::open(&db).expect("open initialized db");
    apply_pending(&pool).expect("apply migrations");
    pool.execute("ALTER TABLE events DROP COLUMN payload_hash;", [])
        .expect("drop payload_hash");
    let out = run_in(tmp.path(), &["doctor", "--strict"]);
    assert_exit(&out, 4);
    let stderr = String::from_utf8_lossy(&out.stderr);
    for needle in [
        "schema_shape.events.payload_hash.exists",
        "table events is missing required column payload_hash",
    ] {
        assert!(stderr.contains(needle), "stderr: {stderr}");
    }
}
#[test]
fn doctor_strict_rejects_duplicate_schema_v2_boundary_rows() {
    // Append the v1->v2 boundary event twice (the second chained off the
    // first); strict doctor must flag the duplicate-boundary invariant.
    let tmp = tempfile::tempdir().unwrap();
    let (_db, event_log_path) = init_layout(tmp.path());
    ingest_minimal_session(tmp.path());
    let mut log = JsonlLog::open(&event_log_path).expect("open event log");
    let v1_head = log.head().expect("v1 head").to_string();
    // Both boundary payloads share the same digests; only the previous-head
    // link differs.
    let payload_for =
        |prev: String| SchemaMigrationV1ToV2Payload::new(prev, "script-digest", None, "fixture-digest");
    let first_boundary = log
        .append_schema_migration_v1_to_v2(
            payload_for(v1_head),
            &schema_migration_v1_to_v2_policy_decision_test_allow(),
        )
        .expect("append first boundary");
    log.append_schema_migration_v1_to_v2(
        payload_for(first_boundary),
        &schema_migration_v1_to_v2_policy_decision_test_allow(),
    )
    .expect("append duplicate boundary");
    let out = run_in(tmp.path(), &["doctor", "--strict"]);
    assert_exit(&out, 4);
    let stderr = String::from_utf8_lossy(&out.stderr);
    assert!(
        stderr.contains("schema_migration.v1_to_v2.boundary.duplicate"),
        "stderr: {stderr}"
    );
    assert!(stderr.contains("Duplicate"), "stderr: {stderr}");
}
#[test]
fn store_open_commands_refuse_unknown_future_migration() {
    // An unknown row in _migrations means a newer binary touched the store;
    // read commands must refuse with exit 4 rather than guess.
    let tmp = tempfile::tempdir().unwrap();
    let db = init(tmp.path());
    let pool = Connection::open(&db).expect("open initialized db");
    apply_pending(&pool).expect("apply migrations");
    pool.execute("INSERT INTO _migrations (name) VALUES ('999_future');", [])
        .expect("insert unknown migration");
    let out = run_in(tmp.path(), &["memory", "search", "anything"]);
    assert_exit(&out, 4);
    let stderr = String::from_utf8_lossy(&out.stderr);
    for needle in [
        "cortex memory search: schema_migration.known_to_code",
        "migration 999_future is unknown to this binary",
    ] {
        assert!(stderr.contains(needle), "stderr: {stderr}");
    }
}
#[test]
fn migrate_v2_refuses_pre_v2_manifest_when_live_store_has_post_v2_rows() {
    // R1 invariant: a pre-v2 backup manifest cannot cover a live store that
    // already carries schema-v2 rows outside the boundary event.
    let tmp = tempfile::tempdir().unwrap();
    let db = init(tmp.path());
    ingest_minimal_session(tmp.path());
    let pool = Connection::open(&db).expect("open initialized db");
    apply_pending(&pool).expect("apply migrations");
    let seed_fresh_v2_row = "INSERT INTO events (
            id, schema_version, observed_at, recorded_at, source_json, event_type,
            trace_id, session_id, domain_tags_json, payload_json, payload_hash,
            prev_event_hash, event_hash, source_attestation_json
        ) VALUES (
            'evt_fresh_v2_misuse', 2, '2026-05-04T12:00:00Z',
            '2026-05-04T12:00:01Z', '{\"kind\":\"test\"}', 'tool.result',
            NULL, NULL, '[]', '{\"step\":1}', 'payload-hash', NULL,
            'event-hash-fresh-v2-misuse', '{\"state\":\"missing\",\"value\":null}'
        );";
    pool.execute(seed_fresh_v2_row, [])
        .expect("seed fresh-v2 events row outside the boundary");
    let events_before = table_count(&pool, "events");
    drop(pool);
    let manifest = tmp.path().join("backup-manifest.json");
    write_valid_backup_manifest(&manifest);
    let out = run_in(
        tmp.path(),
        &["migrate", "v2", "--backup-manifest", manifest.to_str().unwrap()],
    );
    assert_exit(&out, 7);
    let stderr = String::from_utf8_lossy(&out.stderr);
    assert!(
        stderr.contains("migrate.v2.backup_manifest.pre_v2_kind_but_v2_rows_present"),
        "stderr should surface the R1 invariant: {stderr}"
    );
    assert!(
        stderr.contains("events_post_v2=1"),
        "stderr should report the fresh-v2 events count: {stderr}"
    );
    assert!(stderr.contains("no state was changed"), "stderr: {stderr}");
    let reopened = Connection::open(&db).expect("reopen db");
    assert_eq!(
        table_count(&reopened, "events"),
        events_before,
        "R1 guard must refuse before any boundary mutation"
    );
}
#[test]
fn migrate_v2_r1_guard_exempts_existing_boundary_row() {
    // A completed cutover leaves exactly one schema-v2 row (the boundary
    // event) behind; re-running migrate must not misread that row as an R1
    // violation — it must refuse via the boundary preflight instead.
    let tmp = tempfile::tempdir().unwrap();
    let (db, event_log_path) = init_layout(tmp.path());
    ingest_minimal_session(tmp.path());
    let manifest = generate_backup_manifest(tmp.path());
    let (head, script_digest, fixture_digest) = dry_run_boundary_preflight(tmp.path());
    let attestation = tmp.path().join("operator-attestation.json");
    write_valid_operator_attestation(&attestation, &head, &script_digest, &fixture_digest);
    seed_migrate_operator_authority(&db);
    // Identical invocation for both runs.
    let args = [
        "migrate",
        "v2",
        "--backup-manifest",
        manifest.to_str().unwrap(),
        "--operator-attestation",
        attestation.to_str().unwrap(),
    ];
    let first = run_in(tmp.path(), &args);
    assert_exit(&first, 0);
    let boundary_report = verify_schema_migration_v1_to_v2_boundary(&event_log_path, true)
        .expect("boundary audit after first run");
    assert!(boundary_report.ok());
    let second = run_in(tmp.path(), &args);
    let second_stderr = String::from_utf8_lossy(&second.stderr);
    assert!(
        !second_stderr.contains("pre_v2_kind_but_v2_rows_present"),
        "R1 guard must not fire on the boundary row itself: {second_stderr}"
    );
    assert!(
        second_stderr.contains("boundary already exists"),
        "second cutover must refuse via boundary_preflight: {second_stderr}"
    );
}
#[test]
fn migrate_v2_cutover_tx_rolls_back_sqlite_on_post_mirror_failure() {
    // Exercises the cutover transaction boundary: the manifest written below
    // carries tampered table_row_counts (events: 999), so the post-mirror
    // row-count check fails AFTER the boundary event was appended to the
    // JSONL log but while the SQLite cutover transaction is still open.
    // Expectation: the SQLite side rolls back completely, the JSONL boundary
    // append stays durable, and a retry refuses on that durable boundary.
    let tmp = tempfile::tempdir().unwrap();
    let (db, event_log_path) = init_layout(tmp.path());
    ingest_minimal_session(tmp.path());
    let pool = Connection::open(&db).expect("open initialized db");
    apply_pending(&pool).expect("apply migrations");
    // Snapshot every mirrored table so the rollback can be asserted per table.
    let events_before = table_count(&pool, "events");
    let traces_before = table_count(&pool, "traces");
    let episodes_before = table_count(&pool, "episodes");
    let memories_before = table_count(&pool, "memories");
    let context_packs_before = table_count(&pool, "context_packs");
    let migrations_before = table_count(&pool, "_migrations");
    let previous_v1_head = JsonlLog::open(&event_log_path)
        .expect("open event log")
        .head()
        .expect("v1 head")
        .to_string();
    drop(pool);
    // Backup artifacts exist, but the manifest's row counts cannot match the
    // live store — this is the deliberate post-mirror failure trigger.
    let manifest = tmp.path().join("backup-manifest.json");
    std::fs::write(tmp.path().join("state.sqlite"), "sqlite backup placeholder")
        .expect("write sqlite backup artifact");
    std::fs::write(tmp.path().join("events.jsonl"), "jsonl backup placeholder")
        .expect("write jsonl backup artifact");
    std::fs::write(
        &manifest,
        r#"{"kind":"cortex_pre_v2_backup","schema_version":1,"sqlite_store":"state.sqlite","jsonl_mirror":"events.jsonl","tool_version":"cortex-test","backup_timestamp":"2026-05-04T22:00:00Z","table_row_counts":{"events":999,"traces":0,"episodes":0,"memories":0}}"#,
    )
    .expect("write tampered backup manifest");
    let (head, script_digest, fixture_digest) = dry_run_boundary_preflight(tmp.path());
    let attestation = tmp.path().join("operator-attestation.json");
    write_valid_operator_attestation(&attestation, &head, &script_digest, &fixture_digest);
    seed_migrate_operator_authority(&db);
    let out = run_in(
        tmp.path(),
        &[
            "migrate",
            "v2",
            "--backup-manifest",
            manifest.to_str().unwrap(),
            "--operator-attestation",
            attestation.to_str().unwrap(),
        ],
    );
    let code = out.status.code().expect("process exited via signal");
    let stderr = String::from_utf8_lossy(&out.stderr);
    assert_ne!(
        code, 0,
        "cutover with wrong baseline must not succeed; stderr: {stderr}"
    );
    assert!(
        stderr.contains("schema_v2_post_migrate.row_count"),
        "stderr should surface a count-mismatch invariant: {stderr}"
    );
    assert!(
        stderr.contains("migrate.v2.cutover_tx.rolled_back"),
        "stderr should surface the SQLite-side rollback invariant: {stderr}"
    );
    // SQLite side: every table count must be exactly as before the attempt.
    let pool = Connection::open(&db).expect("reopen db");
    assert_eq!(
        table_count(&pool, "events"),
        events_before,
        "events count must be unchanged: boundary mirror INSERT rolled back"
    );
    assert_eq!(table_count(&pool, "traces"), traces_before);
    assert_eq!(table_count(&pool, "episodes"), episodes_before);
    assert_eq!(table_count(&pool, "memories"), memories_before);
    assert_eq!(table_count(&pool, "context_packs"), context_packs_before);
    assert_eq!(table_count(&pool, "_migrations"), migrations_before);
    // JSONL side: the boundary event is now the durable head ...
    let durable_boundary_hash = JsonlLog::open(&event_log_path)
        .expect("reopen event log for boundary hash")
        .head()
        .expect("durable boundary head")
        .to_string();
    // ... and that same event must NOT have been mirrored into SQLite.
    let boundary_row_in_sqlite: u64 = pool
        .query_row(
            "SELECT COUNT(*) FROM events WHERE event_hash = ?1;",
            [&durable_boundary_hash],
            |row| row.get(0),
        )
        .expect("count boundary row in SQLite");
    assert_eq!(
        boundary_row_in_sqlite, 0,
        "boundary mirror INSERT must roll back: SQLite still carries the row at event_hash={durable_boundary_hash}"
    );
    // The JSONL head must have advanced past the old v1 head: the append is
    // durable even though the SQLite transaction rolled back.
    let post_head = JsonlLog::open(&event_log_path)
        .expect("reopen event log after rollback")
        .head()
        .expect("post-attempt head")
        .to_string();
    assert_ne!(
        post_head, previous_v1_head,
        "JSONL boundary append must remain durable across SQLite-side rollback"
    );
    // The half-applied state still passes the boundary audit with exactly
    // one boundary row.
    let boundary_report =
        verify_schema_migration_v1_to_v2_boundary(&event_log_path, true).expect("boundary audit");
    assert!(boundary_report.ok(), "boundary report: {boundary_report:?}");
    assert_eq!(boundary_report.boundary_rows.len(), 1);
    // A retry must refuse on the durable JSONL boundary rather than append a
    // second boundary event.
    let second = run_in(
        tmp.path(),
        &[
            "migrate",
            "v2",
            "--backup-manifest",
            manifest.to_str().unwrap(),
            "--operator-attestation",
            attestation.to_str().unwrap(),
        ],
    );
    let second_stderr = String::from_utf8_lossy(&second.stderr);
    assert!(
        second_stderr.contains("boundary already exists"),
        "second cutover must refuse on durable JSONL boundary: {second_stderr}"
    );
}
#[test]
fn migrate_v2_resume_mirror_recovers_after_cutover_tx_rollback() {
    // Recovery path: after a cutover attempt whose SQLite transaction rolled
    // back (tampered row counts) but whose JSONL boundary append is durable,
    // `migrate v2 --resume-mirror` must mirror the boundary row into SQLite,
    // backfill its source_attestation_json, and leave the store doctor-clean.
    let tmp = tempfile::tempdir().unwrap();
    let (db, event_log_path) = init_layout(tmp.path());
    ingest_minimal_session(tmp.path());
    let pool = Connection::open(&db).expect("open initialized db");
    apply_pending(&pool).expect("apply migrations");
    let events_before = table_count(&pool, "events");
    drop(pool);
    // Tampered manifest (events: 999) forces the post-mirror count check to
    // fail after the JSONL boundary append — producing the half-applied state
    // this test recovers from.
    let manifest = tmp.path().join("backup-manifest.json");
    std::fs::write(tmp.path().join("state.sqlite"), "sqlite backup placeholder")
        .expect("write sqlite backup artifact");
    std::fs::write(tmp.path().join("events.jsonl"), "jsonl backup placeholder")
        .expect("write jsonl backup artifact");
    std::fs::write(
        &manifest,
        r#"{"kind":"cortex_pre_v2_backup","schema_version":1,"sqlite_store":"state.sqlite","jsonl_mirror":"events.jsonl","tool_version":"cortex-test","backup_timestamp":"2026-05-04T22:00:00Z","table_row_counts":{"events":999,"traces":0,"episodes":0,"memories":0}}"#,
    )
    .expect("write tampered backup manifest");
    let (head, script_digest, fixture_digest) = dry_run_boundary_preflight(tmp.path());
    let attestation = tmp.path().join("operator-attestation.json");
    write_valid_operator_attestation(&attestation, &head, &script_digest, &fixture_digest);
    seed_migrate_operator_authority(&db);
    let out = run_in(
        tmp.path(),
        &[
            "migrate",
            "v2",
            "--backup-manifest",
            manifest.to_str().unwrap(),
            "--operator-attestation",
            attestation.to_str().unwrap(),
        ],
    );
    let code = out.status.code().expect("first run exit code");
    let stderr_first = String::from_utf8_lossy(&out.stderr);
    assert_ne!(code, 0, "tampered cutover must not succeed: {stderr_first}");
    assert!(
        stderr_first.contains("migrate.v2.cutover_tx.rolled_back"),
        "first run must trigger the cutover-tx rollback: {stderr_first}"
    );
    // Preconditions for resume: boundary durable in JSONL, absent in SQLite.
    let durable_boundary_hash = JsonlLog::open(&event_log_path)
        .expect("reopen event log")
        .head()
        .expect("durable boundary head")
        .to_string();
    let pool = Connection::open(&db).expect("reopen db");
    let boundary_row_in_sqlite_pre_resume: u64 = pool
        .query_row(
            "SELECT COUNT(*) FROM events WHERE event_hash = ?1;",
            [&durable_boundary_hash],
            |row| row.get(0),
        )
        .expect("count boundary row in SQLite before resume");
    assert_eq!(
        boundary_row_in_sqlite_pre_resume, 0,
        "preconditions for resume: boundary must be in JSONL but not SQLite"
    );
    assert_eq!(table_count(&pool, "events"), events_before);
    drop(pool);
    // Act: resume the mirror; it must complete and report the boundary hash.
    let resume = run_in(tmp.path(), &["migrate", "v2", "--resume-mirror"]);
    assert_exit(&resume, 0);
    let resume_stderr = String::from_utf8_lossy(&resume.stderr);
    assert!(
        resume_stderr.contains("migrate.v2.resume_mirror.completed"),
        "resume must surface the completion invariant: {resume_stderr}"
    );
    assert!(
        resume_stderr.contains(&durable_boundary_hash),
        "resume must report the boundary event hash it mirrored: {resume_stderr}"
    );
    // Post-resume: exactly the boundary delta of new rows, the boundary row
    // present in SQLite, and its legacy attestation backfilled.
    let pool = Connection::open(&db).expect("reopen db after resume");
    assert_eq!(
        table_count(&pool, "events"),
        events_before + cortex_store::verify::SCHEMA_V1_TO_V2_EVENT_BOUNDARY_DELTA,
        "boundary mirror INSERT must be durable after --resume-mirror commits"
    );
    let boundary_row_in_sqlite_post_resume: u64 = pool
        .query_row(
            "SELECT COUNT(*) FROM events WHERE event_hash = ?1;",
            [&durable_boundary_hash],
            |row| row.get(0),
        )
        .expect("count boundary row in SQLite after resume");
    assert_eq!(boundary_row_in_sqlite_post_resume, 1);
    let boundary_attestation_set: u64 = pool
        .query_row(
            "SELECT COUNT(*) FROM events \
             WHERE event_hash = ?1 AND source_attestation_json IS NOT NULL;",
            [&durable_boundary_hash],
            |row| row.get(0),
        )
        .expect("verify boundary source_attestation_json populated");
    assert_eq!(
        boundary_attestation_set, 1,
        "legacy-attestation backfill must stamp the just-mirrored boundary row"
    );
    // Strict doctor must be clean after recovery.
    let doctor = run_in(tmp.path(), &["doctor", "--strict"]);
    assert_exit(&doctor, 0);
}
#[test]
fn migrate_v2_resume_mirror_recovers_from_rolled_back_cutover_with_missing_schema_skeleton() {
    // Scenario: the cutover transaction rolled back (forced via a tampered
    // backup manifest) AND the v2 side tables are missing on disk (modelled
    // below by dropping them). `--resume-mirror` must then refuse with a typed
    // schema-shape diagnostic and leave SQLite untouched, rather than
    // mirroring the boundary row into a broken schema.
    let tmp = tempfile::tempdir().unwrap();
    let (db, event_log_path) = init_layout(tmp.path());
    ingest_minimal_session(tmp.path());
    let manifest = tmp.path().join("backup-manifest.json");
    // Backup artifacts referenced by the manifest; contents are placeholders
    // because this test only exercises the manifest's row-count check.
    std::fs::write(tmp.path().join("state.sqlite"), "sqlite backup placeholder")
        .expect("write sqlite backup artifact");
    std::fs::write(tmp.path().join("events.jsonl"), "jsonl backup placeholder")
        .expect("write jsonl backup artifact");
    // The manifest lies about row counts ("events":999), which is what forces
    // the cutover transaction to roll back on the first run.
    std::fs::write(
        &manifest,
        r#"{"kind":"cortex_pre_v2_backup","schema_version":1,"sqlite_store":"state.sqlite","jsonl_mirror":"events.jsonl","tool_version":"cortex-test","backup_timestamp":"2026-05-04T22:00:00Z","table_row_counts":{"events":999,"traces":0,"episodes":0,"memories":0}}"#,
    )
    .expect("write tampered backup manifest");
    let (head, script_digest, fixture_digest) = dry_run_boundary_preflight(tmp.path());
    let attestation = tmp.path().join("operator-attestation.json");
    write_valid_operator_attestation(&attestation, &head, &script_digest, &fixture_digest);
    seed_migrate_operator_authority(&db);
    // First run: attested cutover that is expected to fail on the tampered
    // manifest and surface the cutover-tx rollback invariant.
    let out = run_in(
        tmp.path(),
        &[
            "migrate",
            "v2",
            "--backup-manifest",
            manifest.to_str().unwrap(),
            "--operator-attestation",
            attestation.to_str().unwrap(),
        ],
    );
    let code = out.status.code().expect("first run exit code");
    let stderr_first = String::from_utf8_lossy(&out.stderr);
    assert_ne!(code, 0, "tampered cutover must not succeed: {stderr_first}");
    assert!(
        stderr_first.contains("migrate.v2.cutover_tx.rolled_back"),
        "first run must trigger the cutover-tx rollback: {stderr_first}"
    );
    // The JSONL append survived the SQLite rollback, so the log head is the
    // durable boundary event a later resume would mirror.
    let durable_boundary_hash = JsonlLog::open(&event_log_path)
        .expect("reopen event log")
        .head()
        .expect("durable boundary head")
        .to_string();
    let pool = Connection::open(&db).expect("reopen db after cutover-tx rollback");
    // Model a partial DDL rollback: the v2 side tables vanish from the store.
    pool.execute_batch(
        "DROP TABLE IF EXISTS memory_session_uses;
DROP TABLE IF EXISTS outcome_memory_relations;",
    )
    .expect("drop v2 side tables to model partial DDL rollback");
    drop(pool);
    // Preconditions for the resume attempt: tables gone, boundary only in JSONL.
    let pool = Connection::open(&db).expect("reopen db pre-resume");
    assert!(
        !has_table(&pool, "memory_session_uses"),
        "rolled-back DDL must not carry memory_session_uses"
    );
    assert!(
        !has_table(&pool, "outcome_memory_relations"),
        "rolled-back DDL must not carry outcome_memory_relations"
    );
    let boundary_row_in_sqlite_pre_resume: u64 = pool
        .query_row(
            "SELECT COUNT(*) FROM events WHERE event_hash = ?1;",
            [&durable_boundary_hash],
            |row| row.get(0),
        )
        .expect("count boundary row in SQLite pre-resume");
    assert_eq!(
        boundary_row_in_sqlite_pre_resume, 0,
        "preconditions: boundary must be in JSONL but not SQLite"
    );
    drop(pool);
    // Resume attempt: must refuse (this test pins exit code 4) with a typed
    // missing-table diagnostic instead of a raw SQLite error.
    let resume = run_in(tmp.path(), &["migrate", "v2", "--resume-mirror"]);
    assert_exit(&resume, 4);
    let resume_stderr = String::from_utf8_lossy(&resume.stderr);
    assert!(
        resume_stderr.contains("schema_shape.memory_session_uses.exists")
            || resume_stderr.contains("schema_shape.outcome_memory_relations.exists"),
        "verify_schema_version must surface a typed missing-table diagnostic: {resume_stderr}"
    );
    assert!(
        resume_stderr.contains("resume refused"),
        "resume must report a typed refusal rather than a raw SQLite error: {resume_stderr}"
    );
    // The refused resume must not have mutated SQLite in any way.
    let pool = Connection::open(&db).expect("reopen db after refused resume");
    assert!(!has_table(&pool, "memory_session_uses"));
    let boundary_row_in_sqlite_post_resume: u64 = pool
        .query_row(
            "SELECT COUNT(*) FROM events WHERE event_hash = ?1;",
            [&durable_boundary_hash],
            |row| row.get(0),
        )
        .expect("count boundary row in SQLite post-resume");
    assert_eq!(
        boundary_row_in_sqlite_post_resume, 0,
        "refused resume must not mutate SQLite"
    );
}
#[test]
fn migrate_v2_resume_mirror_schema_skeleton_replay_is_noop_when_schema_present() {
    // Scenario: the cutover rolled back but the v2 schema (including the
    // `events.source_attestation_json` column) is already on disk because
    // migrations were applied up front. `--resume-mirror` must complete by
    // mirroring the boundary row WITHOUT emitting the schema-skeleton-replayed
    // token, i.e. the skeleton step is a no-op when the shape is present.
    let tmp = tempfile::tempdir().unwrap();
    let (db, event_log_path) = init_layout(tmp.path());
    ingest_minimal_session(tmp.path());
    // Pre-apply all pending migrations so the v2 shape exists before cutover.
    let pool = Connection::open(&db).expect("open initialized db");
    apply_pending(&pool).expect("apply migrations");
    drop(pool);
    let manifest = tmp.path().join("backup-manifest.json");
    // Placeholder backup artifacts referenced by the manifest below.
    std::fs::write(tmp.path().join("state.sqlite"), "sqlite backup placeholder")
        .expect("write sqlite backup artifact");
    std::fs::write(tmp.path().join("events.jsonl"), "jsonl backup placeholder")
        .expect("write jsonl backup artifact");
    // Tampered row counts ("events":999) force the cutover-tx rollback on the
    // first run, producing the partial-mutation state resume recovers from.
    std::fs::write(
        &manifest,
        r#"{"kind":"cortex_pre_v2_backup","schema_version":1,"sqlite_store":"state.sqlite","jsonl_mirror":"events.jsonl","tool_version":"cortex-test","backup_timestamp":"2026-05-04T22:00:00Z","table_row_counts":{"events":999,"traces":0,"episodes":0,"memories":0}}"#,
    )
    .expect("write tampered backup manifest");
    let (head, script_digest, fixture_digest) = dry_run_boundary_preflight(tmp.path());
    let attestation = tmp.path().join("operator-attestation.json");
    write_valid_operator_attestation(&attestation, &head, &script_digest, &fixture_digest);
    seed_migrate_operator_authority(&db);
    let out = run_in(
        tmp.path(),
        &[
            "migrate",
            "v2",
            "--backup-manifest",
            manifest.to_str().unwrap(),
            "--operator-attestation",
            attestation.to_str().unwrap(),
        ],
    );
    let code = out.status.code().expect("first run exit code");
    let stderr_first = String::from_utf8_lossy(&out.stderr);
    assert_ne!(code, 0, "tampered cutover must not succeed: {stderr_first}");
    assert!(
        stderr_first.contains("migrate.v2.cutover_tx.rolled_back"),
        "first run must trigger the cutover-tx rollback: {stderr_first}"
    );
    // Sanity: the v2 column survived the rollback, so the schema-present
    // branch of resume is the one being exercised.
    let pool = Connection::open(&db).expect("reopen db");
    assert!(
        has_column(&pool, "events", "source_attestation_json"),
        "schema-already-present test requires the column on disk"
    );
    drop(pool);
    // The durable boundary (JSONL head) the resume is expected to mirror.
    let durable_boundary_hash = JsonlLog::open(&event_log_path)
        .expect("reopen event log")
        .head()
        .expect("durable boundary head")
        .to_string();
    let resume = run_in(tmp.path(), &["migrate", "v2", "--resume-mirror"]);
    assert_exit(&resume, 0);
    let resume_stderr = String::from_utf8_lossy(&resume.stderr);
    assert!(
        resume_stderr.contains("migrate.v2.resume_mirror.completed"),
        "schema-present resume must surface completion: {resume_stderr}"
    );
    // The skeleton replay must be silent when nothing needed replaying.
    assert!(
        !resume_stderr.contains("migrate.v2.resume_mirror.schema_skeleton_replayed"),
        "schema-present resume must not surface the schema-skeleton replay token: {resume_stderr}"
    );
    // The boundary row must now be durable in SQLite exactly once.
    let pool = Connection::open(&db).expect("reopen db after schema-present resume");
    let boundary_row_in_sqlite_post_resume: u64 = pool
        .query_row(
            "SELECT COUNT(*) FROM events WHERE event_hash = ?1;",
            [&durable_boundary_hash],
            |row| row.get(0),
        )
        .expect("count boundary row in SQLite post-resume");
    assert_eq!(boundary_row_in_sqlite_post_resume, 1);
}
#[test]
fn migrate_v2_resume_mirror_is_idempotent_when_boundary_already_mirrored() {
    // After a fully successful cutover the boundary event is present in both
    // the JSONL log and SQLite. Running `--resume-mirror` again must be a
    // no-op: it reports the already-mirrored invariant (naming the hash),
    // must not claim completion, and must leave both stores unchanged.
    let workdir = tempfile::tempdir().unwrap();
    let (db, event_log_path) = init_layout(workdir.path());
    ingest_minimal_session(workdir.path());
    let manifest = generate_backup_manifest(workdir.path());
    let (head, script_digest, fixture_digest) = dry_run_boundary_preflight(workdir.path());
    let attestation = workdir.path().join("operator-attestation.json");
    write_valid_operator_attestation(&attestation, &head, &script_digest, &fixture_digest);
    seed_migrate_operator_authority(&db);
    // A clean, attested cutover establishes the fully-mirrored baseline.
    let cutover_args = [
        "migrate",
        "v2",
        "--backup-manifest",
        manifest.to_str().unwrap(),
        "--operator-attestation",
        attestation.to_str().unwrap(),
    ];
    let cutover_out = run_in(workdir.path(), &cutover_args);
    assert_exit(&cutover_out, 0);
    let mirrored_boundary = JsonlLog::open(&event_log_path)
        .expect("reopen event log")
        .head()
        .expect("boundary head")
        .to_string();
    // Scope the connection so it is closed before the resume subprocess runs.
    let baseline_events = {
        let conn = Connection::open(&db).expect("reopen db after cutover");
        table_count(&conn, "events")
    };
    let resume_out = run_in(workdir.path(), &["migrate", "v2", "--resume-mirror"]);
    assert_exit(&resume_out, 0);
    let resume_stderr = String::from_utf8_lossy(&resume_out.stderr);
    assert!(
        resume_stderr.contains("migrate.v2.resume_mirror.boundary_already_mirrored"),
        "idempotent resume must surface the already-mirrored invariant: {resume_stderr}"
    );
    assert!(
        resume_stderr.contains(&mirrored_boundary),
        "idempotent resume must name the boundary hash already present: {resume_stderr}"
    );
    assert!(
        !resume_stderr.contains("migrate.v2.resume_mirror.completed"),
        "idempotent resume must NOT emit the completion invariant: {resume_stderr}"
    );
    // Neither store may have moved.
    let conn = Connection::open(&db).expect("reopen db after idempotent resume");
    assert_eq!(table_count(&conn, "events"), baseline_events);
    let head_after_resume = JsonlLog::open(&event_log_path)
        .expect("reopen event log after idempotent resume")
        .head()
        .expect("boundary head still present")
        .to_string();
    assert_eq!(head_after_resume, mirrored_boundary);
}
#[test]
fn migrate_v2_resume_mirror_refuses_when_jsonl_has_no_boundary() {
    // `--resume-mirror` targets exactly one failure surface: a durable JSONL
    // boundary that never landed in SQLite. On a store that never attempted a
    // cutover there is no such boundary, so the command must refuse with
    // exit 7 and leave the event table, the migration ledger, and the JSONL
    // head all untouched.
    let workdir = tempfile::tempdir().unwrap();
    let (db, event_log_path) = init_layout(workdir.path());
    ingest_minimal_session(workdir.path());
    // Capture the SQLite baseline inside a scope so the connection is closed
    // before the CLI subprocess runs.
    let (baseline_events, baseline_migrations) = {
        let conn = Connection::open(&db).expect("open initialized db");
        apply_pending(&conn).expect("apply migrations");
        (table_count(&conn, "events"), table_count(&conn, "_migrations"))
    };
    let baseline_head = JsonlLog::open(&event_log_path)
        .expect("open event log")
        .head()
        .expect("v1 head")
        .to_string();
    let resume_out = run_in(workdir.path(), &["migrate", "v2", "--resume-mirror"]);
    assert_exit(&resume_out, 7);
    let resume_stderr = String::from_utf8_lossy(&resume_out.stderr);
    assert!(
        resume_stderr.contains("migrate.v2.resume_mirror.no_jsonl_boundary"),
        "wrong-surface resume must surface the no-boundary invariant: {resume_stderr}"
    );
    assert!(
        resume_stderr.contains("no state was changed"),
        "wrong-surface resume must report no-state-changed: {resume_stderr}"
    );
    // The refusal must be side-effect free on every store.
    let conn = Connection::open(&db).expect("reopen db after refused resume");
    assert_eq!(table_count(&conn, "events"), baseline_events);
    assert_eq!(table_count(&conn, "_migrations"), baseline_migrations);
    let head_after = JsonlLog::open(&event_log_path)
        .expect("reopen event log after refused resume")
        .head()
        .expect("v1 head unchanged")
        .to_string();
    assert_eq!(head_after, baseline_head);
}
#[test]
fn migrate_v2_resume_mirror_is_mutually_exclusive_with_cutover_flags() {
    // `--resume-mirror` is a dedicated recovery surface; combining it with a
    // cutover-shaped flag (--dry-run, --backup-manifest, --unattended-migrate)
    // must be refused up front with exit 2, naming the recovery-surface
    // contract in the diagnostic.
    let workdir = tempfile::tempdir().unwrap();
    init(workdir.path());
    let out = run_in(
        workdir.path(),
        &["migrate", "v2", "--resume-mirror", "--dry-run"],
    );
    assert_exit(&out, 2);
    let stderr = String::from_utf8_lossy(&out.stderr);
    assert!(
        stderr.contains("--resume-mirror is a partial-mutation recovery surface"),
        "mutual-exclusion refusal must name the recovery-surface contract: {stderr}"
    );
    assert!(stderr.contains("no state was changed"), "stderr: {stderr}");
    // The remaining conflicting combinations share the same refusal shape, so
    // drive them through one loop.
    let manifest = workdir.path().join("backup-manifest.json");
    write_valid_backup_manifest(&manifest);
    let manifest_arg = manifest.to_str().unwrap();
    let conflicting: [&[&str]; 2] = [
        &[
            "migrate",
            "v2",
            "--resume-mirror",
            "--backup-manifest",
            manifest_arg,
        ],
        &["migrate", "v2", "--resume-mirror", "--unattended-migrate"],
    ];
    for args in conflicting {
        let out = run_in(workdir.path(), args);
        assert_exit(&out, 2);
        let stderr = String::from_utf8_lossy(&out.stderr);
        assert!(
            stderr.contains("--resume-mirror is a partial-mutation recovery surface"),
            "stderr: {stderr}"
        );
    }
}
#[test]
fn migrate_v2_refuses_cutover_when_operator_key_revoked_in_timeline() {
    // A cutover signed by an operator whose key has since been revoked in the
    // authority timeline must be refused (exit 7) with the stable
    // revalidation-failed invariant, leaving both the JSONL head and the
    // SQLite event table untouched.
    let tmp = tempfile::tempdir().unwrap();
    let (db, event_log_path) = init_layout(tmp.path());
    ingest_minimal_session(tmp.path());
    // Guard: this fixture set encodes the v1->v2 boundary semantics.
    assert_eq!(cortex_core::SCHEMA_VERSION, 2);
    let previous_v1_head = JsonlLog::open(&event_log_path)
        .expect("open event log")
        .head()
        .expect("v1 head")
        .to_string();
    let pool = Connection::open(&db).expect("open initialized db");
    apply_pending(&pool).expect("apply migrations");
    let events_before = table_count(&pool, "events");
    // Fix: close the connection before spawning the CLI, as the sibling tests
    // in this file do; holding it open across the child process invites
    // SQLITE_BUSY flakiness under stricter locking/journal configurations.
    drop(pool);
    let manifest = tmp.path().join("backup-manifest.json");
    write_valid_backup_manifest(&manifest);
    let (head, script_digest, fixture_digest) = dry_run_boundary_preflight(tmp.path());
    let attestation = tmp.path().join("operator-attestation.json");
    write_valid_operator_attestation(&attestation, &head, &script_digest, &fixture_digest);
    // Seed the operator key, then revoke it at a later timeline instant so
    // revalidation sees a revoked (rather than unknown) key.
    seed_migrate_operator_authority(&db);
    revoke_migrate_operator_authority(&db, Utc.with_ymd_and_hms(2026, 1, 1, 12, 0, 2).unwrap());
    let out = run_in(
        tmp.path(),
        &[
            "migrate",
            "v2",
            "--backup-manifest",
            manifest.to_str().unwrap(),
            "--operator-attestation",
            attestation.to_str().unwrap(),
        ],
    );
    assert_exit(&out, 7);
    let stderr = String::from_utf8_lossy(&out.stderr);
    assert!(
        stderr.contains("migrate.v2.operator_temporal_authority.revalidation_failed"),
        "stderr must carry the stable invariant: {stderr}"
    );
    assert!(stderr.contains("no state was changed"), "stderr: {stderr}");
    // The refusal must not advance the JSONL head or mutate the event table.
    let current_head = JsonlLog::open(&event_log_path)
        .expect("reopen event log")
        .head()
        .expect("v1 head still present")
        .to_string();
    assert_eq!(current_head, previous_v1_head);
    let pool = Connection::open(&db).expect("reopen db");
    assert_eq!(table_count(&pool, "events"), events_before);
}
#[test]
fn migrate_v2_refuses_cutover_when_operator_key_absent_from_timeline() {
    // Like the revoked-key case, but the operator key is never seeded at all:
    // revalidation must fail with the stable invariant PLUS the `key_unknown`
    // reason, refuse with exit 7, and leave both stores untouched.
    let tmp = tempfile::tempdir().unwrap();
    let (db, event_log_path) = init_layout(tmp.path());
    ingest_minimal_session(tmp.path());
    // Guard: this fixture set encodes the v1->v2 boundary semantics.
    assert_eq!(cortex_core::SCHEMA_VERSION, 2);
    let previous_v1_head = JsonlLog::open(&event_log_path)
        .expect("open event log")
        .head()
        .expect("v1 head")
        .to_string();
    let pool = Connection::open(&db).expect("open initialized db");
    apply_pending(&pool).expect("apply migrations");
    let events_before = table_count(&pool, "events");
    // Fix: close the connection before spawning the CLI, as the sibling tests
    // in this file do; holding it open across the child process invites
    // SQLITE_BUSY flakiness under stricter locking/journal configurations.
    drop(pool);
    let manifest = tmp.path().join("backup-manifest.json");
    write_valid_backup_manifest(&manifest);
    let (head, script_digest, fixture_digest) = dry_run_boundary_preflight(tmp.path());
    let attestation = tmp.path().join("operator-attestation.json");
    write_valid_operator_attestation(&attestation, &head, &script_digest, &fixture_digest);
    // Note: deliberately NO seed_migrate_operator_authority(&db) here — the
    // signing key must be absent from the timeline.
    let out = run_in(
        tmp.path(),
        &[
            "migrate",
            "v2",
            "--backup-manifest",
            manifest.to_str().unwrap(),
            "--operator-attestation",
            attestation.to_str().unwrap(),
        ],
    );
    assert_exit(&out, 7);
    let stderr = String::from_utf8_lossy(&out.stderr);
    assert!(
        stderr.contains("migrate.v2.operator_temporal_authority.revalidation_failed")
            && stderr.contains("key_unknown"),
        "stderr must carry the stable invariant + key_unknown reason: {stderr}"
    );
    // The refusal must not advance the JSONL head or mutate the event table.
    let current_head = JsonlLog::open(&event_log_path)
        .expect("reopen event log")
        .head()
        .expect("v1 head still present")
        .to_string();
    assert_eq!(current_head, previous_v1_head);
    let pool = Connection::open(&db).expect("reopen db");
    assert_eq!(table_count(&pool, "events"), events_before);
}