use std::fs::{self, OpenOptions};
use std::io::Write;
use std::path::{Path, PathBuf};
use std::thread;
use std::time::Duration;
use chrono::Utc;
use clap::{Args, ValueEnum};
use cortex_core::{
compose_policy_outcomes, Event, EventId, EventSource, EventType, PolicyContribution,
PolicyDecision, PolicyOutcome, SCHEMA_VERSION,
};
use cortex_ledger::{
JsonlLog, APPEND_ATTESTATION_REQUIRED_RULE_ID, APPEND_EVENT_SOURCE_TIER_GATE_RULE_ID,
APPEND_RUNTIME_MODE_RULE_ID,
};
use cortex_store::{migrate::apply_pending, mirror, Pool};
use serde_json::json;
use crate::exit::Exit;
// CLI arguments for `cortex run-ledger-drill`.
//
// NOTE: plain `//` comments are used here (not `///` doc comments) so that
// clap's derive does not pick them up as user-facing --help text.
#[derive(Debug, Args)]
pub struct RunLedgerDrillArgs {
    // SQLite database that acknowledged JSONL events are mirrored into.
    #[arg(long)]
    pub db: PathBuf,
    // Append-only JSONL event log under test.
    #[arg(long)]
    pub event_log: PathBuf,
    // Drill iteration counter; must be positive (validated in `run`).
    #[arg(long)]
    pub iteration: u64,
    // Which crash window the orchestrating process kills this process in.
    #[arg(long, value_enum)]
    pub kill_window: DrillKillWindow,
    // Where the JSON drill report is written on success.
    #[arg(long)]
    pub report_json: PathBuf,
    // Optional marker file: when set, the process writes it after the JSONL
    // append is acknowledged and then sleeps, waiting to be SIGKILLed.
    // Not valid together with --recover-only.
    #[arg(long)]
    pub kill_ready_file: Option<PathBuf>,
    // How long to wait for the external SIGKILL before failing the drill.
    #[arg(long, default_value_t = 30_000)]
    pub wait_for_sigkill_ms: u64,
    // Skip the append phase and only verify recovery of an existing log.
    #[arg(long)]
    pub recover_only: bool,
    // Caller attests the previous run was terminated by an external SIGKILL;
    // only valid together with --recover-only.
    #[arg(long)]
    pub external_sigkill_proof: bool,
}
// Crash windows the drill can be killed in.
//
// `//` comments (not `///`) are used so clap's ValueEnum derive does not
// surface them as help text.
#[derive(Debug, Clone, Copy, PartialEq, Eq, ValueEnum)]
pub enum DrillKillWindow {
    // Kill after the JSONL append is acknowledged but before the
    // corresponding SQLite mirror commit.
    JsonlAckBeforeSqliteCommit,
}
impl DrillKillWindow {
    /// Stable string identifier for this window, used in markers and reports.
    fn as_str(self) -> &'static str {
        // Single-variant enum: the pattern below is irrefutable today and
        // becomes a compile error if a new variant is added, mirroring the
        // exhaustiveness guarantee of a `match`.
        let Self::JsonlAckBeforeSqliteCommit = self;
        "jsonl-ack-before-sqlite-commit"
    }
}
/// Entry point for the `run-ledger-drill` subcommand.
///
/// Validates the iteration counter, delegates to [`run_inner`], prints any
/// error message to stderr, and converts the outcome into an [`Exit`] code.
pub fn run(args: RunLedgerDrillArgs) -> Exit {
    if args.iteration == 0 {
        eprintln!("cortex run-ledger-drill: --iteration must be positive");
        return Exit::Usage;
    }
    let Err((exit, message)) = run_inner(args) else {
        return Exit::Ok;
    };
    eprintln!("cortex run-ledger-drill: {message}");
    exit
}
/// Executes one ledger-drill iteration (or a recovery verification pass).
///
/// Flow:
/// 1. Validate flag combinations (before any filesystem side effects).
/// 2. Ensure parent directories exist for every output path.
/// 3. Open + migrate the drill SQLite database and open the JSONL log.
/// 4. Either recover from an existing log (`--recover-only`) or append a
///    fresh drill event; in kill-window mode, publish a marker file and
///    wait to be SIGKILLed by the orchestrator.
/// 5. Replay acknowledged JSONL rows into SQLite and run the partial-tail
///    and divergent-duplicate rejection checks.
/// 6. Write the JSON drill report.
///
/// Returns `Ok(())` on success, otherwise an `(Exit, message)` pair that
/// the caller prints and maps to a process exit code.
fn run_inner(args: RunLedgerDrillArgs) -> Result<(), (Exit, String)> {
    // Fix: reject invalid flag combinations up front so a usage error does
    // not leave freshly created directories behind on disk.
    if args.external_sigkill_proof && !args.recover_only {
        return Err((
            Exit::Usage,
            "--external-sigkill-proof is valid only with --recover-only".to_string(),
        ));
    }
    if args.recover_only && args.kill_ready_file.is_some() {
        return Err((
            Exit::Usage,
            "--kill-ready-file is not valid with --recover-only".to_string(),
        ));
    }
    create_parent_dir(&args.db)?;
    create_parent_dir(&args.event_log)?;
    create_parent_dir(&args.report_json)?;
    if let Some(path) = &args.kill_ready_file {
        create_parent_dir(path)?;
    }
    // Open the drill database and bring its schema up to date.
    let mut pool = Pool::open(&args.db).map_err(|err| {
        (
            Exit::PreconditionUnmet,
            format!("failed to open database: {err}"),
        )
    })?;
    apply_pending(&pool).map_err(|err| {
        (
            Exit::Internal,
            format!("failed to apply migrations to drill database: {err}"),
        )
    })?;
    let mut log = JsonlLog::open(&args.event_log).map_err(|err| {
        (
            Exit::PreconditionUnmet,
            format!(
                "failed to open event log {}: {err}",
                args.event_log.display()
            ),
        )
    })?;
    let (chain_head, acknowledged_count) = if args.recover_only {
        // Recovery mode: the log must already contain acknowledged rows
        // written by the killed process.
        let head = log.head().ok_or_else(|| {
            (
                Exit::PreconditionUnmet,
                "cannot recover drill: event log is empty".to_string(),
            )
        })?;
        (head.to_string(), log.len())
    } else {
        // Writer mode: append one synthetic drill event to the JSONL log.
        let event = drill_event(args.iteration);
        let chain_head = log.append(event, &drill_append_policy()).map_err(|err| {
            (
                Exit::Internal,
                format!("failed to append drill event to JSONL: {err}"),
            )
        })?;
        let acknowledged_count = log.len();
        if let Some(kill_ready_file) = &args.kill_ready_file {
            // Kill-window mode: the JSONL row is acknowledged but has NOT
            // been mirrored into SQLite — exactly the window under test.
            // Publish the acknowledged state, then wait to be SIGKILLed.
            let marker = json!({
                "pid": std::process::id(),
                "iteration": args.iteration,
                "killed_phase": args.kill_window.as_str(),
                "acknowledged_count": acknowledged_count,
                "chain_head": chain_head
            });
            let marker_json = serde_json::to_string_pretty(&marker).map_err(|err| {
                (
                    Exit::Internal,
                    format!("failed to serialize kill-ready marker: {err}"),
                )
            })?;
            fs::write(kill_ready_file, format!("{marker_json}\n")).map_err(|err| {
                (
                    Exit::Internal,
                    format!(
                        "failed to write kill-ready marker {}: {err}",
                        kill_ready_file.display()
                    ),
                )
            })?;
            thread::sleep(Duration::from_millis(args.wait_for_sigkill_ms));
            // Reaching this point means the orchestrator never killed us.
            return Err((
                Exit::PreconditionUnmet,
                "external SIGKILL was not observed before wait timeout".to_string(),
            ));
        }
        (chain_head, acknowledged_count)
    };
    // Mirror every acknowledged JSONL row into SQLite and check parity.
    let recovery = mirror::replay_jsonl_into_sqlite(&mut pool, &args.event_log).map_err(|err| {
        (
            Exit::Internal,
            format!("failed to replay acknowledged JSONL rows into SQLite: {err}"),
        )
    })?;
    let recovered_count = recovery.parity.sqlite_event_count as u64;
    let event_set_parity = recovery.parity.is_consistent();
    // Negative checks: corrupted copies of the log must be rejected.
    let partial_tail_rejected = partial_tail_rejected(&mut pool, &args.event_log, args.iteration)?;
    let divergent_duplicate_rejected =
        divergent_duplicate_rejected(&mut pool, &args.event_log, args.iteration)?;
    let report = json!({
        "iteration": args.iteration,
        "killed_phase": args.kill_window.as_str(),
        "acknowledged_count": acknowledged_count,
        "recovered_count": recovered_count,
        "chain_head": chain_head,
        "event_set_parity": event_set_parity,
        "partial_tail_rejected": partial_tail_rejected,
        "divergent_duplicate_rejected": divergent_duplicate_rejected,
        "ledger_authority": "development",
        "signed_ledger_authority": false,
        "external_sigkill_proof": args.external_sigkill_proof
    });
    let serialized = serde_json::to_string_pretty(&report).map_err(|err| {
        (
            Exit::Internal,
            format!("failed to serialize drill report: {err}"),
        )
    })?;
    fs::write(&args.report_json, format!("{serialized}\n")).map_err(|err| {
        (
            Exit::Internal,
            format!(
                "failed to write report {}: {err}",
                args.report_json.display()
            ),
        )
    })?;
    Ok(())
}
/// Ensures the parent directory of `path` exists, creating it if necessary.
///
/// Paths without a parent component, or with an empty one (bare file
/// names), are treated as already satisfied.
fn create_parent_dir(path: &Path) -> Result<(), (Exit, String)> {
    match path.parent() {
        None => Ok(()),
        Some(parent) if parent.as_os_str().is_empty() => Ok(()),
        Some(parent) => fs::create_dir_all(parent).map_err(|err| {
            (
                Exit::PreconditionUnmet,
                format!(
                    "failed to create parent directory {}: {err}",
                    parent.display()
                ),
            )
        }),
    }
}
/// Builds the synthetic runtime event appended during a drill iteration.
///
/// Hash fields start out empty and `prev_event_hash` is `None`; presumably
/// the append path fills them in — confirm against `JsonlLog::append`.
fn drill_event(iteration: u64) -> Event {
    let timestamp = Utc::now();
    let payload = json!({
        "kind": "run-ledger-drill",
        "iteration": iteration,
        "kill_window": "jsonl-ack-before-sqlite-commit",
        "ledger_authority": "development",
        "signed_ledger_authority": false
    });
    Event {
        id: EventId::new(),
        schema_version: SCHEMA_VERSION,
        observed_at: timestamp,
        recorded_at: timestamp,
        source: EventSource::Runtime,
        event_type: EventType::SystemNote,
        trace_id: None,
        session_id: Some(format!("run-ledger-drill-{iteration}")),
        domain_tags: vec![
            "run-ledger-drill".to_string(),
            "development-ledger".to_string(),
        ],
        payload,
        payload_hash: String::new(),
        prev_event_hash: None,
        event_hash: String::new(),
    }
}
/// Checks that replay rejects a log whose final line is a truncated record.
///
/// Copies the drill log to a sibling `*.partial-<iteration>.jsonl` file,
/// appends an unterminated JSON fragment, and returns `true` when
/// `replay_jsonl_into_sqlite` refuses the corrupted copy.
fn partial_tail_rejected(
    pool: &mut Pool,
    event_log_path: &Path,
    iteration: u64,
) -> Result<bool, (Exit, String)> {
    // All failures in this helper are internal errors; share the mapping.
    let internal = |message: String| (Exit::Internal, message);
    let partial_path = event_log_path.with_extension(format!("partial-{iteration}.jsonl"));
    fs::copy(event_log_path, &partial_path)
        .map_err(|err| internal(format!("failed to copy JSONL for partial-tail check: {err}")))?;
    let mut tail = OpenOptions::new()
        .append(true)
        .open(&partial_path)
        .map_err(|err| internal(format!("failed to open partial-tail JSONL copy: {err}")))?;
    tail.write_all(b"{\"partial_tail\":")
        .map_err(|err| internal(format!("failed to append partial-tail bytes: {err}")))?;
    Ok(mirror::replay_jsonl_into_sqlite(pool, &partial_path).is_err())
}
/// Checks that replay rejects a log containing a divergent duplicate event.
///
/// Copies the drill log, reads the first acknowledged event from the
/// original, forges a new event that reuses that event's id with a
/// different payload, appends the forgery to the copy, and returns `true`
/// when `replay_jsonl_into_sqlite` refuses the copy.
fn divergent_duplicate_rejected(
    pool: &mut Pool,
    event_log_path: &Path,
    iteration: u64,
) -> Result<bool, (Exit, String)> {
    // All failures in this helper are internal errors; share the mapping.
    let internal = |message: String| (Exit::Internal, message);
    let duplicate_path = event_log_path.with_extension(format!("duplicate-{iteration}.jsonl"));
    fs::copy(event_log_path, &duplicate_path)
        .map_err(|err| internal(format!("failed to copy JSONL for duplicate check: {err}")))?;
    // Read the first acknowledged event so the forged row can reuse its id.
    let log = JsonlLog::open(event_log_path).map_err(|err| {
        internal(format!("failed to reopen event log for duplicate check: {err}"))
    })?;
    let mut events = log.iter().map_err(|err| {
        internal(format!("failed to iterate event log for duplicate check: {err}"))
    })?;
    let existing = events
        .next()
        .ok_or_else(|| internal("event log was empty during duplicate check".to_string()))?
        .map_err(|err| {
            internal(format!("failed to decode event log during duplicate check: {err}"))
        })?;
    // Release the reopened log before touching the duplicate copy, matching
    // the original scoping of the read.
    drop(events);
    drop(log);
    // Forge a duplicate: same id, divergent payload.
    let mut divergent = drill_event(iteration);
    divergent.id = existing.id;
    divergent.payload = json!({
        "kind": "run-ledger-drill-divergent-duplicate",
        "iteration": iteration,
        "ledger_authority": "development",
        "signed_ledger_authority": false
    });
    let mut duplicate_log = JsonlLog::open(&duplicate_path)
        .map_err(|err| internal(format!("failed to open duplicate JSONL copy: {err}")))?;
    duplicate_log
        .append(divergent, &drill_append_policy())
        .map_err(|err| {
            internal(format!("failed to append divergent duplicate JSONL row: {err}"))
        })?;
    Ok(mirror::replay_jsonl_into_sqlite(pool, &duplicate_path).is_err())
}
/// Builds the policy decision attached to every drill append.
///
/// Composes three static contributions: the source-tier gate and the
/// attestation rule both allow, while the runtime-mode rule warns to mark
/// this as a deterministic dev-only drill ledger (ADR 0037 §2 DevOnly).
fn drill_append_policy() -> PolicyDecision {
    let mut contributions = Vec::with_capacity(3);
    contributions.push(
        PolicyContribution::new(
            APPEND_EVENT_SOURCE_TIER_GATE_RULE_ID,
            PolicyOutcome::Allow,
            "run-ledger-drill: runtime drill event source tier gate satisfied",
        )
        .expect("static policy contribution is valid"),
    );
    contributions.push(
        PolicyContribution::new(
            APPEND_ATTESTATION_REQUIRED_RULE_ID,
            PolicyOutcome::Allow,
            "run-ledger-drill: non-user runtime event does not require user attestation",
        )
        .expect("static policy contribution is valid"),
    );
    contributions.push(
        PolicyContribution::new(
            APPEND_RUNTIME_MODE_RULE_ID,
            PolicyOutcome::Warn,
            "run-ledger-drill: deterministic drill ledger (ADR 0037 §2 DevOnly)",
        )
        .expect("static policy contribution is valid"),
    );
    compose_policy_outcomes(contributions, None)
}