crtx 0.1.0

CLI for the Cortex supervisory memory substrate.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
//! `cortex run-ledger-drill` — deterministic run-ledger recovery hook.
//!
//! This command is a development drill surface for ADR 0035's crash-window
//! boundary. In deterministic mode it appends a JSONL row without committing
//! it to SQLite, replays the acknowledged rows into SQLite, and writes a
//! report. In external-supervisor mode it writes a kill-ready marker once the
//! JSONL append is acknowledged, then expects the shell harness to send
//! SIGKILL and rerun the command with `--recover-only`.
//! It is not a signed ledger path.

use std::fs::{self, OpenOptions};
use std::io::Write;
use std::path::{Path, PathBuf};
use std::thread;
use std::time::Duration;

use chrono::Utc;
use clap::{Args, ValueEnum};
use cortex_core::{
    compose_policy_outcomes, Event, EventId, EventSource, EventType, PolicyContribution,
    PolicyDecision, PolicyOutcome, SCHEMA_VERSION,
};
use cortex_ledger::{
    JsonlLog, APPEND_ATTESTATION_REQUIRED_RULE_ID, APPEND_EVENT_SOURCE_TIER_GATE_RULE_ID,
    APPEND_RUNTIME_MODE_RULE_ID,
};
use cortex_store::{migrate::apply_pending, mirror, Pool};
use serde_json::json;

use crate::exit::Exit;

/// `cortex run-ledger-drill` arguments.
// NOTE: clap renders the `///` comments on these fields as `--help` text, so
// implementation notes below use plain `//` comments instead.
// Cross-flag rules (`--external-sigkill-proof` requires `--recover-only`;
// `--kill-ready-file` conflicts with `--recover-only`) are enforced at
// runtime by `run_inner`, not declaratively through clap.
#[derive(Debug, Args)]
pub struct RunLedgerDrillArgs {
    /// SQLite database path used only for this drill iteration.
    #[arg(long)]
    // Opened with `Pool::open` and migrated with `apply_pending` before use;
    // the parent directory is created if missing.
    pub db: PathBuf,

    /// JSONL event-log path used only for this drill iteration.
    #[arg(long)]
    // The rejection probes derive their scratch copies from this path by
    // replacing its extension with `partial-<n>.jsonl` / `duplicate-<n>.jsonl`.
    pub event_log: PathBuf,

    /// 1-based drill iteration identifier.
    #[arg(long)]
    // `run` rejects 0 with `Exit::Usage` before any work is done.
    pub iteration: u64,

    /// Crash window to exercise.
    #[arg(long, value_enum)]
    // Recorded as `killed_phase` in both the kill-ready marker and the report.
    pub kill_window: DrillKillWindow,

    /// Report JSON path to write.
    #[arg(long)]
    // Written (pretty-printed, newline-terminated) only after every step
    // succeeds; a failing run leaves no report.
    pub report_json: PathBuf,

    /// Marker file written after JSONL append acknowledgement and before
    /// SQLite recovery. External supervisors wait for this file, then send
    /// SIGKILL to this process.
    #[arg(long)]
    // When set (append mode only) the process never reaches recovery: after
    // writing the marker it sleeps `wait_for_sigkill_ms` and then fails.
    pub kill_ready_file: Option<PathBuf>,

    /// Milliseconds to wait after writing --kill-ready-file. If the
    /// process is not killed during this wait, the command fails closed
    /// instead of producing external SIGKILL evidence.
    #[arg(long, default_value_t = 30_000)]
    // Ignored unless `kill_ready_file` is set.
    pub wait_for_sigkill_ms: u64,

    /// Recover and validate an existing JSONL/SQLite pair after an
    /// externally killed drill process.
    #[arg(long)]
    // Skips the append; the event log must be non-empty, and its head and
    // length are reported as `chain_head` / `acknowledged_count`.
    pub recover_only: bool,

    /// Mark the report as externally killed. Valid only with
    /// --recover-only, after a supervisor has observed SIGKILL.
    #[arg(long)]
    // Copied as-is into the report's `external_sigkill_proof` field; the
    // command itself does not verify that a SIGKILL occurred.
    pub external_sigkill_proof: bool,
}

/// Supported deterministic drill windows.
// clap's `ValueEnum` derive exposes each variant as a `--kill-window` value
// and uses the variant `///` comments as its help text. `as_str` supplies the
// label recorded as `killed_phase` in the kill-ready marker and the report.
#[derive(Debug, Clone, Copy, PartialEq, Eq, ValueEnum)]
pub enum DrillKillWindow {
    /// JSONL fsync/ack happened; SQLite mirror commit did not.
    JsonlAckBeforeSqliteCommit,
}

impl DrillKillWindow {
    fn as_str(self) -> &'static str {
        match self {
            Self::JsonlAckBeforeSqliteCommit => "jsonl-ack-before-sqlite-commit",
        }
    }
}

/// Entry point for `cortex run-ledger-drill`.
///
/// Rejects a zero `--iteration` as a usage error, then runs the drill. Any
/// failure is printed to stderr with a `cortex run-ledger-drill:` prefix and
/// its associated exit code is returned; success maps to `Exit::Ok`.
pub fn run(args: RunLedgerDrillArgs) -> Exit {
    if args.iteration < 1 {
        eprintln!("cortex run-ledger-drill: --iteration must be positive");
        return Exit::Usage;
    }

    run_inner(args).map_or_else(
        |(exit, message)| {
            eprintln!("cortex run-ledger-drill: {message}");
            exit
        },
        |()| Exit::Ok,
    )
}

/// Executes one drill iteration end to end.
///
/// In append mode (the default) a fresh drill event is appended to the JSONL
/// log. If `--kill-ready-file` is set, the kill-ready marker is then
/// published and the process waits for an external SIGKILL; surviving the
/// wait is reported as an error. Otherwise, and in `--recover-only` mode
/// (which reuses the existing log head), acknowledged JSONL rows are replayed
/// into SQLite. The partial-tail and divergent-duplicate probes then run, and
/// the report is written to `--report-json`.
///
/// # Errors
///
/// Returns the exit code and message for the caller to print:
/// - `Exit::Usage` for invalid flag combinations.
/// - `Exit::PreconditionUnmet` when directories cannot be created or the
///   database or event log cannot be opened, when the event log is empty in
///   recover mode, or when no SIGKILL arrived in time.
/// - `Exit::Internal` for migration, append, replay, or artifact-writing
///   failures.
fn run_inner(args: RunLedgerDrillArgs) -> Result<(), (Exit, String)> {
    // Reject invalid flag combinations before touching the filesystem so a
    // usage error leaves no directories behind.
    validate_flag_combination(&args)?;

    create_parent_dir(&args.db)?;
    create_parent_dir(&args.event_log)?;
    create_parent_dir(&args.report_json)?;
    if let Some(path) = &args.kill_ready_file {
        create_parent_dir(path)?;
    }

    let mut pool = Pool::open(&args.db).map_err(|err| {
        (
            Exit::PreconditionUnmet,
            format!("failed to open database: {err}"),
        )
    })?;
    apply_pending(&pool).map_err(|err| {
        (
            Exit::Internal,
            format!("failed to apply migrations to drill database: {err}"),
        )
    })?;

    let mut log = JsonlLog::open(&args.event_log).map_err(|err| {
        (
            Exit::PreconditionUnmet,
            format!(
                "failed to open event log {}: {err}",
                args.event_log.display()
            ),
        )
    })?;

    let (chain_head, acknowledged_count) = if args.recover_only {
        let head = log.head().ok_or_else(|| {
            (
                Exit::PreconditionUnmet,
                "cannot recover drill: event log is empty".to_string(),
            )
        })?;
        (head.to_string(), log.len())
    } else {
        let event = drill_event(args.iteration);
        let chain_head = log.append(event, &drill_append_policy()).map_err(|err| {
            (
                Exit::Internal,
                format!("failed to append drill event to JSONL: {err}"),
            )
        })?;
        let acknowledged_count = log.len();
        if let Some(kill_ready_file) = &args.kill_ready_file {
            let marker = json!({
                "pid": std::process::id(),
                "iteration": args.iteration,
                "killed_phase": args.kill_window.as_str(),
                "acknowledged_count": acknowledged_count,
                "chain_head": chain_head
            });
            let marker_json = serde_json::to_string_pretty(&marker).map_err(|err| {
                (
                    Exit::Internal,
                    format!("failed to serialize kill-ready marker: {err}"),
                )
            })?;
            publish_kill_ready_marker(kill_ready_file, &format!("{marker_json}\n"))?;
            // The supervisor is expected to SIGKILL us during this sleep;
            // waking up means the kill never happened, so fail closed.
            thread::sleep(Duration::from_millis(args.wait_for_sigkill_ms));
            return Err((
                Exit::PreconditionUnmet,
                "external SIGKILL was not observed before wait timeout".to_string(),
            ));
        }
        (chain_head, acknowledged_count)
    };

    let recovery = mirror::replay_jsonl_into_sqlite(&mut pool, &args.event_log).map_err(|err| {
        (
            Exit::Internal,
            format!("failed to replay acknowledged JSONL rows into SQLite: {err}"),
        )
    })?;
    let recovered_count = recovery.parity.sqlite_event_count as u64;
    let event_set_parity = recovery.parity.is_consistent();

    let partial_tail_rejected = partial_tail_rejected(&mut pool, &args.event_log, args.iteration)?;
    let divergent_duplicate_rejected =
        divergent_duplicate_rejected(&mut pool, &args.event_log, args.iteration)?;

    let report = json!({
        "iteration": args.iteration,
        "killed_phase": args.kill_window.as_str(),
        "acknowledged_count": acknowledged_count,
        "recovered_count": recovered_count,
        "chain_head": chain_head,
        "event_set_parity": event_set_parity,
        "partial_tail_rejected": partial_tail_rejected,
        "divergent_duplicate_rejected": divergent_duplicate_rejected,
        "ledger_authority": "development",
        "signed_ledger_authority": false,
        "external_sigkill_proof": args.external_sigkill_proof
    });

    let serialized = serde_json::to_string_pretty(&report).map_err(|err| {
        (
            Exit::Internal,
            format!("failed to serialize drill report: {err}"),
        )
    })?;
    fs::write(&args.report_json, format!("{serialized}\n")).map_err(|err| {
        (
            Exit::Internal,
            format!(
                "failed to write report {}: {err}",
                args.report_json.display()
            ),
        )
    })?;
    Ok(())
}

/// Rejects flag combinations that are only meaningful in one drill mode.
fn validate_flag_combination(args: &RunLedgerDrillArgs) -> Result<(), (Exit, String)> {
    if args.external_sigkill_proof && !args.recover_only {
        return Err((
            Exit::Usage,
            "--external-sigkill-proof is valid only with --recover-only".to_string(),
        ));
    }
    if args.recover_only && args.kill_ready_file.is_some() {
        return Err((
            Exit::Usage,
            "--kill-ready-file is not valid with --recover-only".to_string(),
        ));
    }
    Ok(())
}

/// Publishes the kill-ready marker at `path` atomically.
///
/// The contents are first written to a `<path>.tmp` sibling and then renamed
/// into place. A supervisor that polls for `path` therefore sees either no
/// file or the complete marker, never an empty or half-written one. The
/// staging file lives in the same directory, so the rename stays on one
/// filesystem. An existing file at `path` is replaced.
fn publish_kill_ready_marker(path: &Path, contents: &str) -> Result<(), (Exit, String)> {
    let mut staging = path.as_os_str().to_owned();
    staging.push(".tmp");
    let staging = PathBuf::from(staging);

    fs::write(&staging, contents).map_err(|err| {
        (
            Exit::Internal,
            format!(
                "failed to write kill-ready marker {}: {err}",
                path.display()
            ),
        )
    })?;
    fs::rename(&staging, path).map_err(|err| {
        (
            Exit::Internal,
            format!(
                "failed to publish kill-ready marker {}: {err}",
                path.display()
            ),
        )
    })
}

/// Ensures the directory that will contain `path` exists.
///
/// Nothing needs creating when `path` has no parent, or when its parent is
/// the empty path (a bare relative file name); both cases return `Ok(())`.
///
/// # Errors
///
/// `Exit::PreconditionUnmet` if `fs::create_dir_all` fails.
fn create_parent_dir(path: &Path) -> Result<(), (Exit, String)> {
    match path.parent() {
        Some(dir) if !dir.as_os_str().is_empty() => fs::create_dir_all(dir).map_err(|err| {
            let message = format!(
                "failed to create parent directory {}: {err}",
                dir.display()
            );
            (Exit::PreconditionUnmet, message)
        }),
        _ => Ok(()),
    }
}

fn drill_event(iteration: u64) -> Event {
    let now = Utc::now();
    Event {
        id: EventId::new(),
        schema_version: SCHEMA_VERSION,
        observed_at: now,
        recorded_at: now,
        source: EventSource::Runtime,
        event_type: EventType::SystemNote,
        trace_id: None,
        session_id: Some(format!("run-ledger-drill-{iteration}")),
        domain_tags: vec![
            "run-ledger-drill".to_string(),
            "development-ledger".to_string(),
        ],
        payload: json!({
            "kind": "run-ledger-drill",
            "iteration": iteration,
            "kill_window": "jsonl-ack-before-sqlite-commit",
            "ledger_authority": "development",
            "signed_ledger_authority": false
        }),
        payload_hash: String::new(),
        prev_event_hash: None,
        event_hash: String::new(),
    }
}

/// Probes whether replay refuses a log whose final row is truncated.
///
/// Copies the event log to a scratch file whose name replaces the log's
/// extension with `partial-<iteration>.jsonl`. It then appends the
/// unterminated fragment `{"partial_tail":` and returns `Ok(true)` when
/// `mirror::replay_jsonl_into_sqlite` fails on that copy (any replay error
/// counts). The scratch file is left on disk.
///
/// # Errors
///
/// `Exit::Internal` if the scratch copy cannot be created, opened, or
/// extended.
fn partial_tail_rejected(
    pool: &mut Pool,
    event_log_path: &Path,
    iteration: u64,
) -> Result<bool, (Exit, String)> {
    let scratch = event_log_path.with_extension(format!("partial-{iteration}.jsonl"));
    if let Err(err) = fs::copy(event_log_path, &scratch) {
        return Err((
            Exit::Internal,
            format!("failed to copy JSONL for partial-tail check: {err}"),
        ));
    }

    let mut handle = match OpenOptions::new().append(true).open(&scratch) {
        Ok(file) => file,
        Err(err) => {
            return Err((
                Exit::Internal,
                format!("failed to open partial-tail JSONL copy: {err}"),
            ));
        }
    };
    if let Err(err) = handle.write_all(b"{\"partial_tail\":") {
        return Err((
            Exit::Internal,
            format!("failed to append partial-tail bytes: {err}"),
        ));
    }

    let rejected = mirror::replay_jsonl_into_sqlite(pool, &scratch).is_err();
    Ok(rejected)
}

fn divergent_duplicate_rejected(
    pool: &mut Pool,
    event_log_path: &Path,
    iteration: u64,
) -> Result<bool, (Exit, String)> {
    let duplicate_path = event_log_path.with_extension(format!("duplicate-{iteration}.jsonl"));
    fs::copy(event_log_path, &duplicate_path).map_err(|err| {
        (
            Exit::Internal,
            format!("failed to copy JSONL for duplicate check: {err}"),
        )
    })?;

    let existing = {
        let log = JsonlLog::open(event_log_path).map_err(|err| {
            (
                Exit::Internal,
                format!("failed to reopen event log for duplicate check: {err}"),
            )
        })?;
        let mut iter = log.iter().map_err(|err| {
            (
                Exit::Internal,
                format!("failed to iterate event log for duplicate check: {err}"),
            )
        })?;
        let first = iter.next().ok_or_else(|| {
            (
                Exit::Internal,
                "event log was empty during duplicate check".to_string(),
            )
        })?;
        first.map_err(|err| {
            (
                Exit::Internal,
                format!("failed to decode event log during duplicate check: {err}"),
            )
        })?
    };

    let mut divergent = drill_event(iteration);
    divergent.id = existing.id;
    divergent.payload = json!({
        "kind": "run-ledger-drill-divergent-duplicate",
        "iteration": iteration,
        "ledger_authority": "development",
        "signed_ledger_authority": false
    });

    let mut duplicate_log = JsonlLog::open(&duplicate_path).map_err(|err| {
        (
            Exit::Internal,
            format!("failed to open duplicate JSONL copy: {err}"),
        )
    })?;
    duplicate_log
        .append(divergent, &drill_append_policy())
        .map_err(|err| {
            (
                Exit::Internal,
                format!("failed to append divergent duplicate JSONL row: {err}"),
            )
        })?;

    Ok(mirror::replay_jsonl_into_sqlite(pool, &duplicate_path).is_err())
}

/// Composes the ADR 0026 policy decision attached to every drill append.
///
/// Drill rows are `EventSource::Runtime` events (see [`drill_event`]) written
/// to a private SQLite + JSONL pair rather than a signed ledger. The
/// source-tier and attestation rules therefore allow the append, while the
/// runtime-mode rule emits `Warn` to tag the row as ADR 0037 §2 `DevOnly`.
fn drill_append_policy() -> PolicyDecision {
    let source_tier = PolicyContribution::new(
        APPEND_EVENT_SOURCE_TIER_GATE_RULE_ID,
        PolicyOutcome::Allow,
        "run-ledger-drill: runtime drill event source tier gate satisfied",
    )
    .expect("static policy contribution is valid");

    let attestation = PolicyContribution::new(
        APPEND_ATTESTATION_REQUIRED_RULE_ID,
        PolicyOutcome::Allow,
        "run-ledger-drill: non-user runtime event does not require user attestation",
    )
    .expect("static policy contribution is valid");

    let runtime_mode = PolicyContribution::new(
        APPEND_RUNTIME_MODE_RULE_ID,
        PolicyOutcome::Warn,
        "run-ledger-drill: deterministic drill ledger (ADR 0037 §2 DevOnly)",
    )
    .expect("static policy contribution is valid");

    compose_policy_outcomes(vec![source_tier, attestation, runtime_mode], None)
}