Skip to main content

cortex_ledger/
jsonl.rs

1//! Append-only JSONL event log: `JsonlLog`.
2//!
3//! Per BUILD_SPEC §7, the JSONL mirror is a peer to the SQLite store: it
4//! provides **inspectability** (one event per line, grep-able) and
5//! **disaster recovery** (the full ledger can be rebuilt from this file
6//! alone). Append-only, fsync-per-write, and hash-chained.
7//!
8//! ## Append protocol
9//!
10//! 1. The caller hands [`JsonlLog::append`] an [`Event`].
11//! 2. The log sets `prev_event_hash` to its current head (or `None` for
12//!    the first event).
13//! 3. The log re-seals the event via [`crate::hash::seal`], which
14//!    recomputes both `payload_hash` and `event_hash` under the canonical
15//!    framing.
16//! 4. The log writes one JSON line + `\n`, then **fsyncs the file**.
17//! 5. The new `event_hash` becomes the head.
18//!
19//! Re-sealing on append means callers don't need to pre-populate the
20//! hashes (they're an artifact of the chain, not the event's identity).
21//!
22//! ## Why fsync per write
23//!
24//! The JSONL log is the disaster-recovery source of truth. An event that
25//! is "appended" but lost on power loss leaves the SQL store ahead of the
26//! mirror — defeating the mirror's purpose. We pay the latency cost
27//! (~1ms-10ms per write on commodity SSDs) in exchange for crash safety.
28//! Higher-throughput modes (group commit, periodic fsync) are a future
29//! optimization gated on a config flag and an ADR.
30//!
31//! ## What this module does NOT do
32//!
33//! - Replicate the chain to remote storage (out of scope; future ADR).
34//! - Compact or rotate the log (out of scope for v0; planned for Phase 4).
35//! - Index the log (the SQL store is the queryable surface).
36
37use std::fs::{File, OpenOptions};
38use std::io::{BufRead, BufReader, Read, Seek, SeekFrom, Write};
39use std::path::{Path, PathBuf};
40
41use chrono::Utc;
42use cortex_core::{
43    attestor::Attestor, canonical::canonical_signing_input, schema_migration_v1_to_v2_event, Event,
44    EventSource, PolicyDecision, PolicyOutcome, SchemaMigrationV1ToV2Payload,
45};
46use thiserror::Error;
47
48use crate::anchor_chain::{row_preimage, GENESIS_PREV_SIGNATURE};
49use crate::hash::seal;
50use crate::signed_row::{b64_decode, b64_encode, RowSignature, SignedRow};
51
52/// Required contributor rule id documenting that the event source tier gate
53/// composed into the policy decision for an unsigned JSONL append
54/// (ADR 0019 §3, ADR 0026 §2). The ledger refuses
55/// [`EventSource::User`] rows when the final outcome is
56/// [`PolicyOutcome::Reject`] or [`PolicyOutcome::Quarantine`].
57pub const APPEND_EVENT_SOURCE_TIER_GATE_RULE_ID: &str = "ledger.append.event_source_tier_gate";
58/// Required contributor rule id documenting that attestation requirements
59/// (ADR 0010 §1, ADR 0014 §3, ADR 0026 §4) composed into the policy
60/// decision for an unsigned JSONL append. The contributor MUST vote
61/// `Allow` for `EventSource::User`; the ledger refuses authority-bearing
62/// rows that lack attestation.
63pub const APPEND_ATTESTATION_REQUIRED_RULE_ID: &str = "ledger.append.attestation_required";
64/// Required contributor rule id documenting that the runtime mode gate
65/// (ADR 0037 §2) composed into the policy decision for an unsigned JSONL
66/// append. Local-development unsigned ledgers register a `Warn`; trusted
67/// modes register `Reject` to prevent unsigned rows from being passed off
68/// as authority grade.
69pub const APPEND_RUNTIME_MODE_RULE_ID: &str = "ledger.append.runtime_mode";
70/// Required contributor rule id documenting that the signing key state at
71/// event time satisfies ADR 0023 current-use revalidation for a signed
72/// JSONL append. Historical-only or revoked keys vote `Reject` here.
73pub const APPEND_SIGNED_KEY_STATE_CURRENT_USE_RULE_ID: &str =
74    "ledger.append_signed.key_state_current_use";
75/// Required contributor rule id documenting that the signing principal's
76/// trust tier satisfies the ADR 0019 minimum for a signed JSONL append.
77/// Principals below `Verified` vote `Reject` here.
78pub const APPEND_SIGNED_TRUST_TIER_MINIMUM_RULE_ID: &str =
79    "ledger.append_signed.trust_tier_minimum";
80/// Required contributor rule id documenting that the proposing principal sits
81/// in the `Operator` authority class (ADR 0019 §3) for a v1 -> v2 schema
82/// migration boundary append. Non-operator principals vote `Reject` here; the
83/// rule is documented as authority-class so a future ADR 0019 §7 scoped
84/// `tier_admin` capability can satisfy the same contributor.
85pub const SCHEMA_MIGRATION_AUTHORITY_CLASS_RULE_ID: &str =
86    "ledger.schema_migration.authority_class";
87/// Required contributor rule id documenting that a fresh operator attestation
88/// (ADR 0010 §1-§2) was supplied over the proposed v1 -> v2 boundary payload.
89/// Absent or invalid attestation votes `Reject`. ADR 0026 §4 forbids
90/// `BreakGlass` substituting for this contributor at the migration authority
91/// root.
92pub const SCHEMA_MIGRATION_ATTESTATION_REQUIRED_RULE_ID: &str =
93    "ledger.schema_migration.attestation_required";
94/// Required contributor rule id documenting that the signing key supplied for
95/// the operator attestation is in current use (ADR 0023 §2 / §5): the key
96/// state at attestation time is `Active`, not `Retired` or `Revoked`. A
97/// historical-only signing key votes `Reject` here; ADR 0026 §4 forbids
98/// `BreakGlass` substituting for this contributor.
99pub const SCHEMA_MIGRATION_CURRENT_USE_TEMPORAL_AUTHORITY_RULE_ID: &str =
100    "ledger.schema_migration.current_use_temporal_authority";
101
102/// Errors raised by [`JsonlLog`].
103#[derive(Debug, Error)]
104pub enum JsonlError {
105    /// File-system I/O failure.
106    #[error("io error on {path:?}: {source}")]
107    Io {
108        /// Path that was being read or written.
109        path: PathBuf,
110        /// Underlying I/O error.
111        #[source]
112        source: std::io::Error,
113    },
114    /// A JSON decode failed during iteration / verification.
115    #[error("json decode error at line {line} in {path:?}: {source}")]
116    Decode {
117        /// Path being read.
118        path: PathBuf,
119        /// 1-based line number.
120        line: usize,
121        /// Underlying serde error.
122        #[source]
123        source: serde_json::Error,
124    },
125    /// A JSON encode failed during append.
126    #[error("json encode error: {0}")]
127    Encode(#[source] serde_json::Error),
128    /// Chain verification failed (one or more rows broke the chain).
129    ///
130    /// Callers needing per-row diagnostics should use
131    /// [`crate::audit::verify_chain`] instead.
132    #[error("chain verification failed: {0}")]
133    ChainBroken(String),
134    /// Validation failed before append.
135    #[error("validation failed: {0}")]
136    Validation(String),
137}
138
139/// Append-only JSONL log handle.
140///
141/// Owns the file path and an in-memory copy of the current chain head.
142/// The file handle is opened per write to keep the type `Send + Sync` and
143/// to make file-rotation / handoff straightforward (if we ever need it).
144///
145/// Per Lane 3.D.6 / ADR 0010 §1-§2 the log also tracks the **previous
146/// row's Ed25519 signature bytes** so the next signed append can build
147/// the canonical preimage (which couples `S_n` to `S_{n-1}`).
148#[derive(Debug)]
149pub struct JsonlLog {
150    path: PathBuf,
151    /// Hex `event_hash` of the most-recently-appended event.
152    head: Option<String>,
153    /// Number of rows currently in the file.
154    len: u64,
155    /// 32-byte truncation of the most-recently-appended row's Ed25519
156    /// signature (or all-zero genesis sentinel for an empty log). Used
157    /// as `S_{n-1}` input to [`row_preimage`] on the next signed append.
158    /// We carry only the first 32 bytes because that is what the
159    /// canonical lineage hex captures (Ed25519 sigs are 64 bytes; the
160    /// canonical preimage hashes the lineage hex string, so the chosen
161    /// truncation is what binds rows together — see ADR 0010 §2 Option A
162    /// "prev_signature digest inside `P_n`").
163    last_sig_prefix: [u8; 32],
164}
165
166/// Logical identifier (`ledger_id`) for the JSONL chain. Used as part of
167/// every row's canonical preimage so signatures from one log cannot be
168/// replayed into another.
169///
170/// Exposed `pub(crate)` so [`crate::audit::verify_signed_chain`] derives
171/// the same id from the same path on the verify side.
172pub(crate) fn ledger_id_for(path: &Path) -> String {
173    path.file_stem()
174        .and_then(|s| s.to_str())
175        .unwrap_or("cortex-jsonl")
176        .to_string()
177}
178
179impl JsonlLog {
180    /// Open (or create) the log at `path`.
181    ///
182    /// On open we scan the existing file to recover the head hash and row
183    /// count. An empty file (or non-existent path) is a fresh log with no
184    /// head.
185    pub fn open(path: impl AsRef<Path>) -> Result<Self, JsonlError> {
186        let path = path.as_ref().to_path_buf();
187        // Touch the file so subsequent appends succeed without a TOCTOU.
188        OpenOptions::new()
189            .create(true)
190            .append(true)
191            .open(&path)
192            .map_err(|source| JsonlError::Io {
193                path: path.clone(),
194                source,
195            })?;
196
197        // Scan to recover head, len, and last signature prefix.
198        let f = File::open(&path).map_err(|source| JsonlError::Io {
199            path: path.clone(),
200            source,
201        })?;
202        let reader = BufReader::new(f);
203        let mut head: Option<String> = None;
204        let mut len: u64 = 0;
205        let mut last_sig_prefix: [u8; 32] = GENESIS_PREV_SIGNATURE;
206        for (i, line) in reader.lines().enumerate() {
207            let line = line.map_err(|source| JsonlError::Io {
208                path: path.clone(),
209                source,
210            })?;
211            if line.trim().is_empty() {
212                continue;
213            }
214            let row: SignedRow =
215                serde_json::from_str(&line).map_err(|source| JsonlError::Decode {
216                    path: path.clone(),
217                    line: i + 1,
218                    source,
219                })?;
220            head = Some(row.event.event_hash.clone());
221            len += 1;
222            if let Some(sig) = &row.signature {
223                if let Some(bytes) = b64_decode(&sig.bytes) {
224                    if bytes.len() >= 32 {
225                        last_sig_prefix.copy_from_slice(&bytes[..32]);
226                    } else {
227                        // Malformed-but-present signature on disk; leave
228                        // last_sig_prefix at the genesis sentinel and let
229                        // the audit verifier surface it as BadSignature.
230                    }
231                }
232            } else {
233                // Legacy unsigned row: do NOT advance last_sig_prefix.
234                // The audit verifier will flag this row's MissingSignature
235                // failure; subsequent signed appends will still chain off
236                // whatever prior signature was last seen (or genesis).
237            }
238        }
239
240        Ok(Self {
241            path,
242            head,
243            len,
244            last_sig_prefix,
245        })
246    }
247
248    /// Path of the underlying file.
249    #[must_use]
250    pub fn path(&self) -> &Path {
251        &self.path
252    }
253
254    /// Number of rows currently in the log.
255    #[must_use]
256    pub fn len(&self) -> u64 {
257        self.len
258    }
259
260    /// Whether the log has zero rows.
261    #[must_use]
262    pub fn is_empty(&self) -> bool {
263        self.len == 0
264    }
265
266    /// Current chain head (`event_hash` of the most recent row).
267    ///
268    /// `None` for an empty log.
269    #[must_use]
270    pub fn head(&self) -> Option<&str> {
271        self.head.as_deref()
272    }
273
274    /// Append one event to the log **without** an Ed25519 signature, gated
275    /// through the ADR 0026 enforcement lattice.
276    ///
277    /// This is the legacy / pre-3.D.6 path. Rows written via this method
278    /// **fail** [`crate::audit::verify_signed_chain`] with
279    /// [`crate::audit::FailureReason::MissingSignature`] (per ADR 0010 §1
280    /// "Single asymmetric trust domain": rows without a valid Ed25519
281    /// signature do not verify; there is no symmetric-MAC fallback). It
282    /// is retained for the local-development ledger path and for tests of
283    /// non-attestation features (hash chain, payload framing);
284    /// **trusted-history / authority paths MUST use
285    /// [`Self::append_signed`]**.
286    ///
287    /// `policy` is the composed [`PolicyDecision`] for this append and
288    /// MUST satisfy:
289    ///
290    /// 1. The composition includes contributors for
291    ///    [`APPEND_EVENT_SOURCE_TIER_GATE_RULE_ID`],
292    ///    [`APPEND_ATTESTATION_REQUIRED_RULE_ID`], and
293    ///    [`APPEND_RUNTIME_MODE_RULE_ID`]. Callers that skipped composition
294    ///    are refused.
295    /// 2. When `event.source` is [`EventSource::User`], the attestation
296    ///    contributor MUST vote [`PolicyOutcome::Allow`] regardless of
297    ///    final outcome. ADR 0026 §4 forbids `BreakGlass` from substituting
298    ///    for the attestation requirement at the user-event authority
299    ///    boundary.
300    /// 3. A final outcome of [`PolicyOutcome::Reject`] or
301    ///    [`PolicyOutcome::Quarantine`] fails closed and writes nothing.
302    pub fn append(
303        &mut self,
304        mut event: Event,
305        policy: &PolicyDecision,
306    ) -> Result<String, JsonlError> {
307        require_append_contributors(policy)?;
308        require_event_source_attestation(policy, &event.source)?;
309        require_append_final_outcome(policy, "ledger.append")?;
310
311        event.prev_event_hash = self.head.clone();
312        seal(&mut event);
313        let row = SignedRow::unsigned(event);
314        let line = serde_json::to_string(&row).map_err(JsonlError::Encode)?;
315        self.write_line(&line)?;
316        self.head = Some(row.event.event_hash.clone());
317        self.len += 1;
318        // Unsigned append does NOT advance last_sig_prefix.
319        Ok(row.event.event_hash)
320    }
321
322    /// Append one event with an Ed25519 signature over the canonical
323    /// attestation preimage (T-3.D.6, ADR 0010 §1-§2), gated through the
324    /// ADR 0026 enforcement lattice.
325    ///
326    /// The preimage couples this row's signature to `S_{n-1}` (the
327    /// previous row's signature, or the genesis sentinel for the first
328    /// row), the row's `event_id`, `payload_hash`, `session_id`,
329    /// `ledger_id`, and the signing key's `key_id`. See
330    /// [`row_preimage`] for the exact shape.
331    ///
332    /// `policy` is the composed [`PolicyDecision`] for this signed append
333    /// and MUST satisfy:
334    ///
335    /// 1. The composition includes contributors for
336    ///    [`APPEND_SIGNED_KEY_STATE_CURRENT_USE_RULE_ID`] (ADR 0023
337    ///    current-use revalidation: historical-only or revoked signing
338    ///    keys vote `Reject`) and
339    ///    [`APPEND_SIGNED_TRUST_TIER_MINIMUM_RULE_ID`] (ADR 0019: the
340    ///    principal MUST sit at or above `Verified`). Callers that skipped
341    ///    composition are refused.
342    /// 2. The key-state contributor MUST vote [`PolicyOutcome::Allow`]
343    ///    regardless of final outcome. ADR 0026 §4 forbids `BreakGlass`
344    ///    from substituting for current-use revalidation at the signed
345    ///    ledger root.
346    /// 3. A final outcome of [`PolicyOutcome::Reject`] or
347    ///    [`PolicyOutcome::Quarantine`] fails closed and writes nothing.
348    ///
349    /// Returns the new head hash on success.
350    pub fn append_signed(
351        &mut self,
352        mut event: Event,
353        attestor: &dyn Attestor,
354        policy: &PolicyDecision,
355    ) -> Result<String, JsonlError> {
356        require_append_signed_contributors(policy)?;
357        require_append_signed_key_state_not_break_glassed(policy)?;
358        require_append_final_outcome(policy, "ledger.append_signed")?;
359
360        // Seal hash chain first so payload_hash is available for the
361        // attestation preimage.
362        event.prev_event_hash = self.head.clone();
363        seal(&mut event);
364
365        let signed_at = Utc::now();
366        let key_id = attestor.key_id().to_string();
367        let ledger_id = ledger_id_for(&self.path);
368        let preimage = row_preimage(
369            &event,
370            &self.last_sig_prefix,
371            &ledger_id,
372            &key_id,
373            signed_at,
374        );
375        let signing_input = canonical_signing_input(&preimage);
376        let sig = attestor.sign(&signing_input);
377        let sig_bytes = sig.to_bytes();
378
379        let row = SignedRow {
380            event,
381            signature: Some(RowSignature {
382                schema_version: cortex_core::canonical::SCHEMA_VERSION_ATTESTATION,
383                key_id,
384                signed_at,
385                bytes: b64_encode(&sig_bytes),
386            }),
387        };
388        let line = serde_json::to_string(&row).map_err(JsonlError::Encode)?;
389        self.write_line(&line)?;
390
391        self.head = Some(row.event.event_hash.clone());
392        self.len += 1;
393        // Advance the chain coupling.
394        self.last_sig_prefix.copy_from_slice(&sig_bytes[..32]);
395        Ok(row.event.event_hash)
396    }
397
398    /// Append the schema v1 -> v2 boundary event after the current v1 head,
399    /// gated through the ADR 0026 enforcement lattice.
400    ///
401    /// This helper is intentionally narrow: it validates the typed payload and
402    /// refuses to append unless the log's current head matches
403    /// `payload.previous_v1_head_hash`. It does not run the full migration or
404    /// bump `cortex_core::SCHEMA_VERSION`.
405    ///
406    /// `policy` is the composed [`PolicyDecision`] for the v1 -> v2 boundary
407    /// append (ADR 0026 punch list #17). See
408    /// [`Self::append_schema_migration_v1_to_v2_with_event`] for the
409    /// preflight contract.
410    pub fn append_schema_migration_v1_to_v2(
411        &mut self,
412        payload: SchemaMigrationV1ToV2Payload,
413        policy: &PolicyDecision,
414    ) -> Result<String, JsonlError> {
415        let (head, _event) = self.append_schema_migration_v1_to_v2_with_event(payload, policy)?;
416        Ok(head)
417    }
418
419    /// Schema v1 -> v2 boundary append variant that also returns the sealed
420    /// [`Event`] written to the log, gated through the ADR 0026 enforcement
421    /// lattice.
422    ///
423    /// Callers that mirror the boundary row into a side store (e.g. the
424    /// SQLite events table during `cortex migrate v2` cutover) need the
425    /// sealed event in hand. The JSONL row carries the canonical hash chain;
426    /// the returned event is byte-identical to what was just persisted (its
427    /// `prev_event_hash` and `event_hash` reflect the head couple).
428    ///
429    /// `policy` is the composed [`PolicyDecision`] for the v1 -> v2 boundary
430    /// append and MUST satisfy:
431    ///
432    /// 1. The composition includes contributors for
433    ///    [`SCHEMA_MIGRATION_AUTHORITY_CLASS_RULE_ID`] (ADR 0019 §3: the
434    ///    proposing principal sits in the `Operator` authority class — a
435    ///    non-operator key cannot mint a v2 boundary),
436    ///    [`SCHEMA_MIGRATION_ATTESTATION_REQUIRED_RULE_ID`] (ADR 0010 §1-§2:
437    ///    a fresh operator attestation is supplied over the boundary
438    ///    payload), and
439    ///    [`SCHEMA_MIGRATION_CURRENT_USE_TEMPORAL_AUTHORITY_RULE_ID`]
440    ///    (ADR 0023 §2 / §5: the signing key state at attestation time is
441    ///    `Active`, not `Retired` or `Revoked`). Callers that skipped
442    ///    composition are refused.
443    /// 2. Per ADR 0026 §4 (hard wall), the attestation contributor and the
444    ///    current-use temporal-authority contributor MUST each vote
445    ///    [`PolicyOutcome::Allow`] regardless of final outcome. A
446    ///    `BreakGlass` decision MUST NOT substitute for either contributor at
447    ///    the migration authority root.
448    /// 3. A final outcome of [`PolicyOutcome::Reject`] or
449    ///    [`PolicyOutcome::Quarantine`] fails closed and writes nothing.
450    pub fn append_schema_migration_v1_to_v2_with_event(
451        &mut self,
452        payload: SchemaMigrationV1ToV2Payload,
453        policy: &PolicyDecision,
454    ) -> Result<(String, Event), JsonlError> {
455        require_schema_migration_contributors(policy)?;
456        require_schema_migration_attestation_not_break_glassed(policy)?;
457        require_schema_migration_current_use_not_break_glassed(policy)?;
458        require_append_final_outcome(policy, "ledger.schema_migration")?;
459
460        let expected_head = payload.previous_v1_head_hash.clone();
461        match self.head.as_deref() {
462            Some(head) if head == expected_head => {}
463            observed => {
464                return Err(JsonlError::Validation(format!(
465                    "schema migration boundary previous_v1_head_hash mismatch: observed {observed:?}, expected {expected_head}"
466                )));
467            }
468        }
469
470        let now = Utc::now();
471        let event = schema_migration_v1_to_v2_event(payload, now, now, None)
472            .map_err(|err| JsonlError::Validation(err.to_string()))?;
473        // Mirror what `append` does internally so we can hand the caller the
474        // identical sealed event the JSONL row carries (the post-migrate row
475        // count refusal helper needs the typed Event, not just the hash, to
476        // mirror the boundary into SQLite via `mirror_single_event_into_sqlite`).
477        self.append_unchecked_returning_event(event)
478    }
479
480    /// Internal: append one event without the ADR 0026 policy preflight,
481    /// returning both the sealed event hash and the typed Event.
482    ///
483    /// Used internally by
484    /// [`Self::append_schema_migration_v1_to_v2_with_event`] after its own
485    /// migration-specific contributors have been verified. The caller-facing
486    /// Event return shape lets the schema v2 cutover mirror the boundary
487    /// row into SQLite (post-migrate row-count refusal helper) without
488    /// re-iterating the JSONL log. Not exposed outside the crate so
489    /// external callers cannot bypass the gate on [`Self::append`].
490    fn append_unchecked_returning_event(
491        &mut self,
492        mut event: Event,
493    ) -> Result<(String, Event), JsonlError> {
494        event.prev_event_hash = self.head.clone();
495        seal(&mut event);
496        let row = SignedRow::unsigned(event.clone());
497        let line = serde_json::to_string(&row).map_err(JsonlError::Encode)?;
498        self.write_line(&line)?;
499        self.head = Some(event.event_hash.clone());
500        self.len += 1;
501        Ok((event.event_hash.clone(), event))
502    }
503
504    /// Internal: append one already-formatted JSON line + `\n` and fsync.
505    fn write_line(&self, line: &str) -> Result<(), JsonlError> {
506        let mut f = OpenOptions::new()
507            .create(true)
508            .append(true)
509            .open(&self.path)
510            .map_err(|source| JsonlError::Io {
511                path: self.path.clone(),
512                source,
513            })?;
514        f.write_all(line.as_bytes())
515            .map_err(|source| JsonlError::Io {
516                path: self.path.clone(),
517                source,
518            })?;
519        f.write_all(b"\n").map_err(|source| JsonlError::Io {
520            path: self.path.clone(),
521            source,
522        })?;
523        // Fsync per write — see module docs.
524        f.sync_all().map_err(|source| JsonlError::Io {
525            path: self.path.clone(),
526            source,
527        })?;
528        Ok(())
529    }
530
531    /// 32-byte prefix of the most recently appended row's signature, or
532    /// the genesis sentinel if no signed row has been appended yet.
533    /// Exposed so callers (audit verifier, tests) can reproduce the
534    /// chain coupling.
535    #[must_use]
536    pub fn last_sig_prefix(&self) -> [u8; 32] {
537        self.last_sig_prefix
538    }
539
540    /// Iterate every row in the log in append order.
541    ///
542    /// Each item is the parsed [`Event`] (the inner semantic event of the
543    /// signed envelope). For access to the row signature use
544    /// [`Self::iter_signed`]. Decode errors short-circuit the iterator
545    /// and surface as `Err`.
546    pub fn iter(&self) -> Result<JsonlIter, JsonlError> {
547        let mut f = File::open(&self.path).map_err(|source| JsonlError::Io {
548            path: self.path.clone(),
549            source,
550        })?;
551        f.seek(SeekFrom::Start(0))
552            .map_err(|source| JsonlError::Io {
553                path: self.path.clone(),
554                source,
555            })?;
556        Ok(JsonlIter {
557            path: self.path.clone(),
558            reader: BufReader::new(Box::new(f) as Box<dyn Read>),
559            line: 0,
560        })
561    }
562
563    /// Iterate every row in the log as [`SignedRow`] envelopes — i.e.
564    /// the [`Event`] plus its optional [`RowSignature`].
565    ///
566    /// Used by [`crate::audit::verify_signed_chain`] to reconstruct the
567    /// per-row attestation preimage. Decode errors surface as `Err` and
568    /// short-circuit.
569    pub fn iter_signed(&self) -> Result<SignedJsonlIter, JsonlError> {
570        let f = File::open(&self.path).map_err(|source| JsonlError::Io {
571            path: self.path.clone(),
572            source,
573        })?;
574        Ok(SignedJsonlIter {
575            path: self.path.clone(),
576            reader: BufReader::new(Box::new(f) as Box<dyn Read>),
577            line: 0,
578        })
579    }
580
581    /// Verify the chain end-to-end.
582    ///
583    /// Walks every row, recomputes both `payload_hash` and `event_hash`
584    /// under the canonical framing, and confirms each row's
585    /// `prev_event_hash` matches the previous row's `event_hash`. Returns
586    /// `Ok(())` on success or [`JsonlError::ChainBroken`] with a short
587    /// summary on failure. For per-row diagnostics use
588    /// [`crate::audit::verify_chain`].
589    pub fn verify_chain(&self) -> Result<(), JsonlError> {
590        let mut prev: Option<String> = None;
591        for (i, item) in self.iter()?.enumerate() {
592            let e = item?;
593            // Recompute payload_hash.
594            let expected_payload = crate::hash::payload_hash(&e.payload);
595            if e.payload_hash != expected_payload {
596                return Err(JsonlError::ChainBroken(format!(
597                    "row {} payload_hash mismatch",
598                    i + 1
599                )));
600            }
601            let expected_event =
602                crate::hash::event_hash(e.prev_event_hash.as_deref(), &e.payload_hash);
603            if e.event_hash != expected_event {
604                return Err(JsonlError::ChainBroken(format!(
605                    "row {} event_hash mismatch",
606                    i + 1
607                )));
608            }
609            if e.prev_event_hash != prev {
610                return Err(JsonlError::ChainBroken(format!(
611                    "row {} prev_event_hash does not point at previous row",
612                    i + 1
613                )));
614            }
615            prev = Some(e.event_hash.clone());
616        }
617        Ok(())
618    }
619}
620
621fn require_append_final_outcome(policy: &PolicyDecision, surface: &str) -> Result<(), JsonlError> {
622    match policy.final_outcome {
623        PolicyOutcome::Allow | PolicyOutcome::Warn | PolicyOutcome::BreakGlass => Ok(()),
624        PolicyOutcome::Quarantine | PolicyOutcome::Reject => Err(JsonlError::Validation(format!(
625            "{surface} preflight: composed policy outcome {:?} blocks ledger append",
626            policy.final_outcome,
627        ))),
628    }
629}
630
631fn require_contributor(policy: &PolicyDecision, rule_id: &str) -> Result<(), JsonlError> {
632    let contains_rule = policy
633        .contributing
634        .iter()
635        .chain(policy.discarded.iter())
636        .any(|contribution| contribution.rule_id.as_str() == rule_id);
637    if contains_rule {
638        Ok(())
639    } else {
640        Err(JsonlError::Validation(format!(
641            "policy decision missing required contributor `{rule_id}`; caller skipped ADR 0026 composition",
642        )))
643    }
644}
645
646fn require_append_contributors(policy: &PolicyDecision) -> Result<(), JsonlError> {
647    require_contributor(policy, APPEND_EVENT_SOURCE_TIER_GATE_RULE_ID)?;
648    require_contributor(policy, APPEND_ATTESTATION_REQUIRED_RULE_ID)?;
649    require_contributor(policy, APPEND_RUNTIME_MODE_RULE_ID)?;
650    Ok(())
651}
652
653fn require_append_signed_contributors(policy: &PolicyDecision) -> Result<(), JsonlError> {
654    require_contributor(policy, APPEND_SIGNED_KEY_STATE_CURRENT_USE_RULE_ID)?;
655    require_contributor(policy, APPEND_SIGNED_TRUST_TIER_MINIMUM_RULE_ID)?;
656    Ok(())
657}
658
659fn require_event_source_attestation(
660    policy: &PolicyDecision,
661    source: &EventSource,
662) -> Result<(), JsonlError> {
663    // ADR 0026 §4: at the User-event authority boundary, the attestation
664    // contributor must itself have voted `Allow`. A BreakGlass final
665    // outcome MUST NOT substitute for missing attestation on a user-sourced
666    // row.
667    if !matches!(source, EventSource::User) {
668        return Ok(());
669    }
670    let attestation = policy
671        .contributing
672        .iter()
673        .chain(policy.discarded.iter())
674        .find(|contribution| {
675            contribution.rule_id.as_str() == APPEND_ATTESTATION_REQUIRED_RULE_ID
676        })
677        .ok_or_else(|| {
678            JsonlError::Validation(format!(
679                "ledger.append preflight: required attestation contributor `{APPEND_ATTESTATION_REQUIRED_RULE_ID}` is absent from the policy decision for EventSource::User"
680            ))
681        })?;
682    if attestation.outcome == PolicyOutcome::Allow {
683        Ok(())
684    } else {
685        Err(JsonlError::Validation(format!(
686            "ledger.append preflight: attestation contributor `{APPEND_ATTESTATION_REQUIRED_RULE_ID}` returned {:?} for EventSource::User; ADR 0026 §4 forbids BreakGlass substituting for attestation",
687            attestation.outcome,
688        )))
689    }
690}
691
692fn require_schema_migration_contributors(policy: &PolicyDecision) -> Result<(), JsonlError> {
693    require_contributor(policy, SCHEMA_MIGRATION_AUTHORITY_CLASS_RULE_ID)?;
694    require_contributor(policy, SCHEMA_MIGRATION_ATTESTATION_REQUIRED_RULE_ID)?;
695    require_contributor(
696        policy,
697        SCHEMA_MIGRATION_CURRENT_USE_TEMPORAL_AUTHORITY_RULE_ID,
698    )?;
699    Ok(())
700}
701
702fn require_schema_migration_attestation_not_break_glassed(
703    policy: &PolicyDecision,
704) -> Result<(), JsonlError> {
705    // ADR 0026 §4: at the migration authority root, the attestation
706    // contributor (ADR 0010) MUST itself have voted `Allow`. A `BreakGlass`
707    // final outcome MUST NOT substitute for missing operator attestation at
708    // the v1 -> v2 boundary; this is the long-promised `--unattended-migrate`
709    // blocker.
710    let attestation = policy
711        .contributing
712        .iter()
713        .chain(policy.discarded.iter())
714        .find(|contribution| {
715            contribution.rule_id.as_str() == SCHEMA_MIGRATION_ATTESTATION_REQUIRED_RULE_ID
716        })
717        .ok_or_else(|| {
718            JsonlError::Validation(format!(
719                "ledger.schema_migration preflight: required attestation contributor `{SCHEMA_MIGRATION_ATTESTATION_REQUIRED_RULE_ID}` is absent from the policy decision"
720            ))
721        })?;
722    if attestation.outcome == PolicyOutcome::Allow {
723        Ok(())
724    } else {
725        Err(JsonlError::Validation(format!(
726            "ledger.schema_migration preflight: attestation contributor `{SCHEMA_MIGRATION_ATTESTATION_REQUIRED_RULE_ID}` returned {:?}; ADR 0026 §4 forbids BreakGlass substituting for operator attestation at the migration authority root",
727            attestation.outcome,
728        )))
729    }
730}
731
732fn require_schema_migration_current_use_not_break_glassed(
733    policy: &PolicyDecision,
734) -> Result<(), JsonlError> {
735    // ADR 0023 §5 + ADR 0026 §4: the current-use temporal-authority
736    // contributor MUST itself have voted `Allow`. A historical-only or
737    // revoked signing key vote of `Reject` here MUST NOT be overridden by a
738    // `BreakGlass` final outcome at the migration authority root.
739    let current_use = policy
740        .contributing
741        .iter()
742        .chain(policy.discarded.iter())
743        .find(|contribution| {
744            contribution.rule_id.as_str()
745                == SCHEMA_MIGRATION_CURRENT_USE_TEMPORAL_AUTHORITY_RULE_ID
746        })
747        .ok_or_else(|| {
748            JsonlError::Validation(format!(
749                "ledger.schema_migration preflight: required current-use contributor `{SCHEMA_MIGRATION_CURRENT_USE_TEMPORAL_AUTHORITY_RULE_ID}` is absent from the policy decision"
750            ))
751        })?;
752    if current_use.outcome == PolicyOutcome::Allow {
753        Ok(())
754    } else {
755        Err(JsonlError::Validation(format!(
756            "ledger.schema_migration preflight: current-use contributor `{SCHEMA_MIGRATION_CURRENT_USE_TEMPORAL_AUTHORITY_RULE_ID}` returned {:?}; ADR 0023 forbids historical-only or revoked signing keys at the migration authority root",
757            current_use.outcome,
758        )))
759    }
760}
761
762fn require_append_signed_key_state_not_break_glassed(
763    policy: &PolicyDecision,
764) -> Result<(), JsonlError> {
765    // ADR 0023 + ADR 0026 §4: the current-use revalidation contributor
766    // must itself have voted `Allow` for a signed append. A revoked or
767    // historical-only key vote of `Reject` here MUST NOT be overridden by
768    // a BreakGlass final outcome on the signed ledger root.
769    let key_state = policy
770        .contributing
771        .iter()
772        .chain(policy.discarded.iter())
773        .find(|contribution| {
774            contribution.rule_id.as_str() == APPEND_SIGNED_KEY_STATE_CURRENT_USE_RULE_ID
775        })
776        .ok_or_else(|| {
777            JsonlError::Validation(format!(
778                "ledger.append_signed preflight: required current-use contributor `{APPEND_SIGNED_KEY_STATE_CURRENT_USE_RULE_ID}` is absent from the policy decision"
779            ))
780        })?;
781    if key_state.outcome == PolicyOutcome::Allow {
782        Ok(())
783    } else {
784        Err(JsonlError::Validation(format!(
785            "ledger.append_signed preflight: current-use contributor `{APPEND_SIGNED_KEY_STATE_CURRENT_USE_RULE_ID}` returned {:?}; ADR 0023 forbids historical-only or revoked signing keys at the trusted ledger root",
786            key_state.outcome,
787        )))
788    }
789}
790
791/// Build a [`PolicyDecision`] that satisfies [`JsonlLog::append`] inputs
792/// for the happy path. Intended for tests and fixtures only.
793///
794/// Production callers MUST compose
795/// [`APPEND_EVENT_SOURCE_TIER_GATE_RULE_ID`],
796/// [`APPEND_ATTESTATION_REQUIRED_RULE_ID`], and
797/// [`APPEND_RUNTIME_MODE_RULE_ID`] from real trust-tier evidence, real
798/// attestation evidence, and the active runtime mode. This helper is
799/// exposed unconditionally because integration test crates outside
800/// `cortex-ledger` need the same fixture shape; the `_test_allow` suffix
801/// is the contract that documents intent.
802#[must_use]
803pub fn append_policy_decision_test_allow() -> PolicyDecision {
804    use cortex_core::{compose_policy_outcomes, PolicyContribution};
805    compose_policy_outcomes(
806        vec![
807            PolicyContribution::new(
808                APPEND_EVENT_SOURCE_TIER_GATE_RULE_ID,
809                PolicyOutcome::Allow,
810                "test fixture: event source tier gate satisfied",
811            )
812            .expect("static test contribution is valid"),
813            PolicyContribution::new(
814                APPEND_ATTESTATION_REQUIRED_RULE_ID,
815                PolicyOutcome::Allow,
816                "test fixture: attestation requirement satisfied",
817            )
818            .expect("static test contribution is valid"),
819            PolicyContribution::new(
820                APPEND_RUNTIME_MODE_RULE_ID,
821                PolicyOutcome::Allow,
822                "test fixture: runtime mode permits unsigned append",
823            )
824            .expect("static test contribution is valid"),
825        ],
826        None,
827    )
828}
829
830/// Build a [`PolicyDecision`] that satisfies
831/// [`JsonlLog::append_schema_migration_v1_to_v2`] inputs for the happy path.
832/// Intended for tests and fixtures only.
833///
834/// Production callers MUST compose
835/// [`SCHEMA_MIGRATION_AUTHORITY_CLASS_RULE_ID`] from a real ADR 0019
836/// authority-class evaluation, [`SCHEMA_MIGRATION_ATTESTATION_REQUIRED_RULE_ID`]
837/// from a real ADR 0010 attestation envelope verification, and
838/// [`SCHEMA_MIGRATION_CURRENT_USE_TEMPORAL_AUTHORITY_RULE_ID`] from real
839/// ADR 0023 key-state-at-event-time evidence. The `_test_allow` suffix is the
840/// contract that documents intent; this helper exists so integration tests
841/// outside `cortex-ledger` can build a fixture decision without re-encoding
842/// the rule-id list.
843#[must_use]
844pub fn schema_migration_v1_to_v2_policy_decision_test_allow() -> PolicyDecision {
845    use cortex_core::{compose_policy_outcomes, PolicyContribution};
846    compose_policy_outcomes(
847        vec![
848            PolicyContribution::new(
849                SCHEMA_MIGRATION_AUTHORITY_CLASS_RULE_ID,
850                PolicyOutcome::Allow,
851                "test fixture: operator authority class satisfied",
852            )
853            .expect("static test contribution is valid"),
854            PolicyContribution::new(
855                SCHEMA_MIGRATION_ATTESTATION_REQUIRED_RULE_ID,
856                PolicyOutcome::Allow,
857                "test fixture: operator attestation present",
858            )
859            .expect("static test contribution is valid"),
860            PolicyContribution::new(
861                SCHEMA_MIGRATION_CURRENT_USE_TEMPORAL_AUTHORITY_RULE_ID,
862                PolicyOutcome::Allow,
863                "test fixture: signing key state is current-use",
864            )
865            .expect("static test contribution is valid"),
866        ],
867        None,
868    )
869}
870
871/// Build a [`PolicyDecision`] that satisfies [`JsonlLog::append_signed`]
872/// inputs for the happy path. Intended for tests and fixtures only; see
873/// [`append_policy_decision_test_allow`] for the production-caller
874/// contract.
875#[must_use]
876pub fn append_signed_policy_decision_test_allow() -> PolicyDecision {
877    use cortex_core::{compose_policy_outcomes, PolicyContribution};
878    compose_policy_outcomes(
879        vec![
880            PolicyContribution::new(
881                APPEND_SIGNED_KEY_STATE_CURRENT_USE_RULE_ID,
882                PolicyOutcome::Allow,
883                "test fixture: signing key state is current-use",
884            )
885            .expect("static test contribution is valid"),
886            PolicyContribution::new(
887                APPEND_SIGNED_TRUST_TIER_MINIMUM_RULE_ID,
888                PolicyOutcome::Allow,
889                "test fixture: signing principal trust tier satisfies minimum",
890            )
891            .expect("static test contribution is valid"),
892        ],
893        None,
894    )
895}
896
897/// Owning iterator over a [`JsonlLog`].
898pub struct JsonlIter {
899    path: PathBuf,
900    reader: BufReader<Box<dyn Read>>,
901    line: usize,
902}
903
904impl std::fmt::Debug for JsonlIter {
905    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
906        f.debug_struct("JsonlIter")
907            .field("path", &self.path)
908            .field("line", &self.line)
909            .finish()
910    }
911}
912
913impl Iterator for JsonlIter {
914    type Item = Result<Event, JsonlError>;
915
916    fn next(&mut self) -> Option<Self::Item> {
917        loop {
918            let mut buf = String::new();
919            let n = match self.reader.read_line(&mut buf) {
920                Ok(n) => n,
921                Err(source) => {
922                    return Some(Err(JsonlError::Io {
923                        path: self.path.clone(),
924                        source,
925                    }));
926                }
927            };
928            if n == 0 {
929                return None; // EOF
930            }
931            self.line += 1;
932            let trimmed = buf.trim();
933            if trimmed.is_empty() {
934                continue;
935            }
936            return Some(
937                serde_json::from_str::<SignedRow>(trimmed)
938                    .map(|row| row.event)
939                    .map_err(|source| JsonlError::Decode {
940                        path: self.path.clone(),
941                        line: self.line,
942                        source,
943                    }),
944            );
945        }
946    }
947}
948
949/// Owning iterator over a [`JsonlLog`] yielding full [`SignedRow`]
950/// envelopes (event + optional signature). Used by the Ed25519-aware
951/// audit verifier in [`crate::audit::verify_signed_chain`].
952pub struct SignedJsonlIter {
953    path: PathBuf,
954    reader: BufReader<Box<dyn Read>>,
955    line: usize,
956}
957
958impl std::fmt::Debug for SignedJsonlIter {
959    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
960        f.debug_struct("SignedJsonlIter")
961            .field("path", &self.path)
962            .field("line", &self.line)
963            .finish()
964    }
965}
966
967impl Iterator for SignedJsonlIter {
968    type Item = Result<SignedRow, JsonlError>;
969
970    fn next(&mut self) -> Option<Self::Item> {
971        loop {
972            let mut buf = String::new();
973            let n = match self.reader.read_line(&mut buf) {
974                Ok(n) => n,
975                Err(source) => {
976                    return Some(Err(JsonlError::Io {
977                        path: self.path.clone(),
978                        source,
979                    }));
980                }
981            };
982            if n == 0 {
983                return None;
984            }
985            self.line += 1;
986            let trimmed = buf.trim();
987            if trimmed.is_empty() {
988                continue;
989            }
990            return Some(
991                serde_json::from_str::<SignedRow>(trimmed).map_err(|source| JsonlError::Decode {
992                    path: self.path.clone(),
993                    line: self.line,
994                    source,
995                }),
996            );
997        }
998    }
999}
1000
1001#[cfg(test)]
1002mod tests {
1003    use super::*;
1004    use chrono::TimeZone;
1005    use cortex_core::{
1006        compose_policy_outcomes, Event, EventId, EventSource, EventType, PolicyContribution,
1007        SchemaMigrationV1ToV2Payload, SCHEMA_MIGRATION_V1_TO_V2_EVENT_KIND,
1008        SCHEMA_MIGRATION_V1_TO_V2_TARGET, SCHEMA_VERSION,
1009    };
1010    use tempfile::tempdir;
1011
1012    fn fixture_event(seq: u64) -> Event {
1013        Event {
1014            id: EventId::new(),
1015            schema_version: SCHEMA_VERSION,
1016            observed_at: chrono::Utc.with_ymd_and_hms(2026, 1, 1, 12, 0, 0).unwrap(),
1017            recorded_at: chrono::Utc.with_ymd_and_hms(2026, 1, 1, 12, 0, 1).unwrap(),
1018            source: EventSource::User,
1019            event_type: EventType::UserMessage,
1020            trace_id: None,
1021            session_id: None,
1022            domain_tags: vec![],
1023            payload: serde_json::json!({"seq": seq, "text": format!("event {seq}")}),
1024            payload_hash: String::new(),
1025            prev_event_hash: None,
1026            event_hash: String::new(),
1027        }
1028    }
1029
1030    fn allow_policy() -> PolicyDecision {
1031        append_policy_decision_test_allow()
1032    }
1033
1034    fn migration_allow_policy() -> PolicyDecision {
1035        schema_migration_v1_to_v2_policy_decision_test_allow()
1036    }
1037
1038    /// T-1.B.2 acceptance: append N events, reopen, verify chain.
1039    #[test]
1040    fn append_n_reopen_and_verify() {
1041        let dir = tempdir().unwrap();
1042        let path = dir.path().join("events.jsonl");
1043
1044        // Append 25 events.
1045        let mut log = JsonlLog::open(&path).unwrap();
1046        let mut heads = Vec::new();
1047        let policy = allow_policy();
1048        for i in 0..25u64 {
1049            let head = log.append(fixture_event(i), &policy).unwrap();
1050            heads.push(head);
1051        }
1052        assert_eq!(log.len(), 25);
1053        assert_eq!(log.head(), Some(heads.last().unwrap().as_str()));
1054
1055        // Reopen and verify.
1056        let log2 = JsonlLog::open(&path).unwrap();
1057        assert_eq!(log2.len(), 25);
1058        assert_eq!(log2.head(), Some(heads.last().unwrap().as_str()));
1059        log2.verify_chain().expect("chain verifies after reopen");
1060
1061        // Iterate and confirm prev->next linkage.
1062        let mut prev: Option<String> = None;
1063        let mut count = 0;
1064        for item in log2.iter().unwrap() {
1065            let e = item.unwrap();
1066            assert_eq!(e.prev_event_hash, prev);
1067            prev = Some(e.event_hash.clone());
1068            count += 1;
1069        }
1070        assert_eq!(count, 25);
1071    }
1072
1073    #[test]
1074    fn empty_log_verifies() {
1075        let dir = tempdir().unwrap();
1076        let path = dir.path().join("empty.jsonl");
1077        let log = JsonlLog::open(&path).unwrap();
1078        assert_eq!(log.len(), 0);
1079        assert!(log.head().is_none());
1080        log.verify_chain().expect("empty chain is valid");
1081    }
1082
1083    #[test]
1084    fn append_persists_after_drop() {
1085        let dir = tempdir().unwrap();
1086        let path = dir.path().join("persist.jsonl");
1087        {
1088            let mut log = JsonlLog::open(&path).unwrap();
1089            let policy = allow_policy();
1090            log.append(fixture_event(0), &policy).unwrap();
1091            log.append(fixture_event(1), &policy).unwrap();
1092            // drop log; fsync should have made the rows durable.
1093        }
1094        let log2 = JsonlLog::open(&path).unwrap();
1095        assert_eq!(log2.len(), 2);
1096    }
1097
1098    #[test]
1099    fn corrupted_payload_fails_verify() {
1100        let dir = tempdir().unwrap();
1101        let path = dir.path().join("corrupt.jsonl");
1102        let mut log = JsonlLog::open(&path).unwrap();
1103        let policy = allow_policy();
1104        log.append(fixture_event(0), &policy).unwrap();
1105        log.append(fixture_event(1), &policy).unwrap();
1106        log.append(fixture_event(2), &policy).unwrap();
1107
1108        // Tamper: rewrite the file with a mutated payload on row 2.
1109        let lines: Vec<String> = std::fs::read_to_string(&path)
1110            .unwrap()
1111            .lines()
1112            .map(|s| s.to_string())
1113            .collect();
1114        let mut bad: Event = serde_json::from_str(&lines[1]).unwrap();
1115        bad.payload = serde_json::json!({"tampered": true});
1116        let mut new_content = String::new();
1117        new_content.push_str(&lines[0]);
1118        new_content.push('\n');
1119        new_content.push_str(&serde_json::to_string(&bad).unwrap());
1120        new_content.push('\n');
1121        new_content.push_str(&lines[2]);
1122        new_content.push('\n');
1123        std::fs::write(&path, new_content).unwrap();
1124
1125        let log2 = JsonlLog::open(&path).unwrap();
1126        let err = log2.verify_chain().unwrap_err();
1127        assert!(matches!(err, JsonlError::ChainBroken(_)));
1128    }
1129
1130    #[test]
1131    fn append_after_reopen_continues_chain() {
1132        let dir = tempdir().unwrap();
1133        let path = dir.path().join("continue.jsonl");
1134        let head_before;
1135        {
1136            let mut log = JsonlLog::open(&path).unwrap();
1137            let policy = allow_policy();
1138            log.append(fixture_event(0), &policy).unwrap();
1139            head_before = log.append(fixture_event(1), &policy).unwrap();
1140        }
1141        let mut log2 = JsonlLog::open(&path).unwrap();
1142        assert_eq!(log2.head(), Some(head_before.as_str()));
1143        let head_after = log2.append(fixture_event(2), &allow_policy()).unwrap();
1144        assert_ne!(head_after, head_before);
1145        log2.verify_chain().expect("continued chain verifies");
1146    }
1147
1148    #[test]
1149    fn schema_migration_v1_to_v2_event_emitted_after_v1_head() {
1150        let dir = tempdir().unwrap();
1151        let path = dir.path().join("schema-boundary.jsonl");
1152        let mut log = JsonlLog::open(&path).unwrap();
1153        let previous_v1_head = log.append(fixture_event(0), &allow_policy()).unwrap();
1154        let payload = SchemaMigrationV1ToV2Payload::new(
1155            previous_v1_head.clone(),
1156            "script-digest",
1157            None,
1158            "fixture-digest",
1159        );
1160
1161        let boundary_head = log
1162            .append_schema_migration_v1_to_v2(payload, &migration_allow_policy())
1163            .expect("boundary event appends");
1164
1165        assert_eq!(log.len(), 2);
1166        assert_eq!(log.head(), Some(boundary_head.as_str()));
1167        log.verify_chain().expect("boundary chain verifies");
1168
1169        let rows = log.iter().unwrap().collect::<Result<Vec<_>, _>>().unwrap();
1170        let boundary = rows.last().expect("boundary row exists");
1171        assert_eq!(boundary.schema_version, SCHEMA_MIGRATION_V1_TO_V2_TARGET);
1172        assert_eq!(boundary.event_type, EventType::SystemNote);
1173        assert_eq!(boundary.source, EventSource::Runtime);
1174        assert_eq!(
1175            boundary.prev_event_hash.as_deref(),
1176            Some(previous_v1_head.as_str())
1177        );
1178        assert_eq!(
1179            boundary.payload["kind"],
1180            SCHEMA_MIGRATION_V1_TO_V2_EVENT_KIND
1181        );
1182        assert_eq!(
1183            boundary.payload["payload"]["previous_v1_head_hash"],
1184            previous_v1_head
1185        );
1186    }
1187
1188    #[test]
1189    fn schema_migration_v1_to_v2_event_rejects_wrong_previous_head() {
1190        let dir = tempdir().unwrap();
1191        let path = dir.path().join("schema-boundary-mismatch.jsonl");
1192        let mut log = JsonlLog::open(&path).unwrap();
1193        log.append(fixture_event(0), &allow_policy()).unwrap();
1194        let payload =
1195            SchemaMigrationV1ToV2Payload::new("not-current-head", "script-digest", None, "fixture");
1196
1197        let err = log
1198            .append_schema_migration_v1_to_v2(payload, &migration_allow_policy())
1199            .expect_err("wrong previous head must fail");
1200
1201        assert!(matches!(err, JsonlError::Validation(_)));
1202        assert_eq!(log.len(), 1);
1203        log.verify_chain()
1204            .expect("rejected boundary append must not corrupt chain");
1205    }
1206
1207    #[test]
1208    fn schema_migration_v1_to_v2_refuses_missing_authority_class_contributor() {
1209        let dir = tempdir().unwrap();
1210        let path = dir.path().join("schema-boundary-missing-auth.jsonl");
1211        let mut log = JsonlLog::open(&path).unwrap();
1212        let previous_v1_head = log.append(fixture_event(0), &allow_policy()).unwrap();
1213        let payload = SchemaMigrationV1ToV2Payload::new(
1214            previous_v1_head,
1215            "script-digest",
1216            None,
1217            "fixture-digest",
1218        );
1219
1220        // Policy is missing the authority-class contributor.
1221        let policy = compose_policy_outcomes(
1222            vec![
1223                PolicyContribution::new(
1224                    SCHEMA_MIGRATION_ATTESTATION_REQUIRED_RULE_ID,
1225                    PolicyOutcome::Allow,
1226                    "fixture: attestation present",
1227                )
1228                .unwrap(),
1229                PolicyContribution::new(
1230                    SCHEMA_MIGRATION_CURRENT_USE_TEMPORAL_AUTHORITY_RULE_ID,
1231                    PolicyOutcome::Allow,
1232                    "fixture: current use",
1233                )
1234                .unwrap(),
1235            ],
1236            None,
1237        );
1238
1239        let err = log
1240            .append_schema_migration_v1_to_v2(payload, &policy)
1241            .expect_err("missing authority-class contributor must fail");
1242        assert!(matches!(err, JsonlError::Validation(_)));
1243        assert_eq!(log.len(), 1);
1244        log.verify_chain()
1245            .expect("rejected boundary append must not corrupt chain");
1246    }
1247
1248    #[test]
1249    fn schema_migration_v1_to_v2_refuses_reject_outcome() {
1250        let dir = tempdir().unwrap();
1251        let path = dir.path().join("schema-boundary-reject.jsonl");
1252        let mut log = JsonlLog::open(&path).unwrap();
1253        let previous_v1_head = log.append(fixture_event(0), &allow_policy()).unwrap();
1254        let payload = SchemaMigrationV1ToV2Payload::new(
1255            previous_v1_head,
1256            "script-digest",
1257            None,
1258            "fixture-digest",
1259        );
1260
1261        // All three contributors present; authority-class refuses with Reject.
1262        let policy = compose_policy_outcomes(
1263            vec![
1264                PolicyContribution::new(
1265                    SCHEMA_MIGRATION_AUTHORITY_CLASS_RULE_ID,
1266                    PolicyOutcome::Reject,
1267                    "fixture: non-operator authority class",
1268                )
1269                .unwrap(),
1270                PolicyContribution::new(
1271                    SCHEMA_MIGRATION_ATTESTATION_REQUIRED_RULE_ID,
1272                    PolicyOutcome::Allow,
1273                    "fixture: attestation present",
1274                )
1275                .unwrap(),
1276                PolicyContribution::new(
1277                    SCHEMA_MIGRATION_CURRENT_USE_TEMPORAL_AUTHORITY_RULE_ID,
1278                    PolicyOutcome::Allow,
1279                    "fixture: current use",
1280                )
1281                .unwrap(),
1282            ],
1283            None,
1284        );
1285        assert_eq!(policy.final_outcome, PolicyOutcome::Reject);
1286
1287        let err = log
1288            .append_schema_migration_v1_to_v2(payload, &policy)
1289            .expect_err("reject outcome must fail closed");
1290        assert!(matches!(err, JsonlError::Validation(_)));
1291        assert_eq!(log.len(), 1);
1292    }
1293
1294    #[test]
1295    fn schema_migration_v1_to_v2_refuses_quarantine_outcome() {
1296        let dir = tempdir().unwrap();
1297        let path = dir.path().join("schema-boundary-quarantine.jsonl");
1298        let mut log = JsonlLog::open(&path).unwrap();
1299        let previous_v1_head = log.append(fixture_event(0), &allow_policy()).unwrap();
1300        let payload = SchemaMigrationV1ToV2Payload::new(
1301            previous_v1_head,
1302            "script-digest",
1303            None,
1304            "fixture-digest",
1305        );
1306
1307        let policy = compose_policy_outcomes(
1308            vec![
1309                PolicyContribution::new(
1310                    SCHEMA_MIGRATION_AUTHORITY_CLASS_RULE_ID,
1311                    PolicyOutcome::Quarantine,
1312                    "fixture: under-trust authority class",
1313                )
1314                .unwrap(),
1315                PolicyContribution::new(
1316                    SCHEMA_MIGRATION_ATTESTATION_REQUIRED_RULE_ID,
1317                    PolicyOutcome::Allow,
1318                    "fixture: attestation present",
1319                )
1320                .unwrap(),
1321                PolicyContribution::new(
1322                    SCHEMA_MIGRATION_CURRENT_USE_TEMPORAL_AUTHORITY_RULE_ID,
1323                    PolicyOutcome::Allow,
1324                    "fixture: current use",
1325                )
1326                .unwrap(),
1327            ],
1328            None,
1329        );
1330        assert_eq!(policy.final_outcome, PolicyOutcome::Quarantine);
1331
1332        let err = log
1333            .append_schema_migration_v1_to_v2(payload, &policy)
1334            .expect_err("quarantine outcome must fail closed");
1335        assert!(matches!(err, JsonlError::Validation(_)));
1336        assert_eq!(log.len(), 1);
1337    }
1338
1339    #[test]
1340    fn schema_migration_v1_to_v2_refuses_attestation_break_glass() {
1341        let dir = tempdir().unwrap();
1342        let path = dir.path().join("schema-boundary-break-glass.jsonl");
1343        let mut log = JsonlLog::open(&path).unwrap();
1344        let previous_v1_head = log.append(fixture_event(0), &allow_policy()).unwrap();
1345        let payload = SchemaMigrationV1ToV2Payload::new(
1346            previous_v1_head,
1347            "script-digest",
1348            None,
1349            "fixture-digest",
1350        );
1351
1352        // Attestation contributor voted BreakGlass; ADR 0026 §4 forbids
1353        // BreakGlass from substituting for operator attestation here.
1354        let policy = compose_policy_outcomes(
1355            vec![
1356                PolicyContribution::new(
1357                    SCHEMA_MIGRATION_AUTHORITY_CLASS_RULE_ID,
1358                    PolicyOutcome::Allow,
1359                    "fixture: operator authority class",
1360                )
1361                .unwrap(),
1362                PolicyContribution::new(
1363                    SCHEMA_MIGRATION_ATTESTATION_REQUIRED_RULE_ID,
1364                    PolicyOutcome::BreakGlass,
1365                    "fixture: operator attempted break-glass on attestation",
1366                )
1367                .unwrap(),
1368                PolicyContribution::new(
1369                    SCHEMA_MIGRATION_CURRENT_USE_TEMPORAL_AUTHORITY_RULE_ID,
1370                    PolicyOutcome::Allow,
1371                    "fixture: current use",
1372                )
1373                .unwrap(),
1374            ],
1375            None,
1376        );
1377
1378        let err = log
1379            .append_schema_migration_v1_to_v2(payload, &policy)
1380            .expect_err("BreakGlass on attestation must fail");
1381        assert!(matches!(err, JsonlError::Validation(_)));
1382        assert_eq!(log.len(), 1);
1383    }
1384
1385    #[test]
1386    fn schema_migration_v1_to_v2_refuses_historical_key_current_use() {
1387        let dir = tempdir().unwrap();
1388        let path = dir.path().join("schema-boundary-historical-key.jsonl");
1389        let mut log = JsonlLog::open(&path).unwrap();
1390        let previous_v1_head = log.append(fixture_event(0), &allow_policy()).unwrap();
1391        let payload = SchemaMigrationV1ToV2Payload::new(
1392            previous_v1_head,
1393            "script-digest",
1394            None,
1395            "fixture-digest",
1396        );
1397
1398        // Current-use contributor refuses because the signing key is
1399        // historical-only (Retired or Revoked at attestation time).
1400        let policy = compose_policy_outcomes(
1401            vec![
1402                PolicyContribution::new(
1403                    SCHEMA_MIGRATION_AUTHORITY_CLASS_RULE_ID,
1404                    PolicyOutcome::Allow,
1405                    "fixture: operator authority class",
1406                )
1407                .unwrap(),
1408                PolicyContribution::new(
1409                    SCHEMA_MIGRATION_ATTESTATION_REQUIRED_RULE_ID,
1410                    PolicyOutcome::Allow,
1411                    "fixture: attestation present",
1412                )
1413                .unwrap(),
1414                PolicyContribution::new(
1415                    SCHEMA_MIGRATION_CURRENT_USE_TEMPORAL_AUTHORITY_RULE_ID,
1416                    PolicyOutcome::Reject,
1417                    "fixture: signing key retired before attestation time",
1418                )
1419                .unwrap(),
1420            ],
1421            None,
1422        );
1423
1424        let err = log
1425            .append_schema_migration_v1_to_v2(payload, &policy)
1426            .expect_err("historical-only signing key must fail");
1427        assert!(matches!(err, JsonlError::Validation(_)));
1428        assert_eq!(log.len(), 1);
1429    }
1430
1431    #[test]
1432    fn append_refuses_policy_decision_missing_contributors() {
1433        let dir = tempdir().unwrap();
1434        let path = dir.path().join("missing-contributor.jsonl");
1435        let mut log = JsonlLog::open(&path).unwrap();
1436        // Compose a policy with only one of the three required contributors.
1437        let policy = compose_policy_outcomes(
1438            vec![PolicyContribution::new(
1439                APPEND_EVENT_SOURCE_TIER_GATE_RULE_ID,
1440                PolicyOutcome::Allow,
1441                "fixture: tier gate only",
1442            )
1443            .unwrap()],
1444            None,
1445        );
1446
1447        let err = log
1448            .append(fixture_event(0), &policy)
1449            .expect_err("missing contributor must fail");
1450        assert!(matches!(err, JsonlError::Validation(_)));
1451        assert_eq!(log.len(), 0);
1452    }
1453
1454    #[test]
1455    fn append_refuses_reject_outcome() {
1456        let dir = tempdir().unwrap();
1457        let path = dir.path().join("reject-outcome.jsonl");
1458        let mut log = JsonlLog::open(&path).unwrap();
1459        let policy = compose_policy_outcomes(
1460            vec![
1461                PolicyContribution::new(
1462                    APPEND_EVENT_SOURCE_TIER_GATE_RULE_ID,
1463                    PolicyOutcome::Reject,
1464                    "fixture: tier gate refuses",
1465                )
1466                .unwrap(),
1467                PolicyContribution::new(
1468                    APPEND_ATTESTATION_REQUIRED_RULE_ID,
1469                    PolicyOutcome::Allow,
1470                    "fixture: attestation present",
1471                )
1472                .unwrap(),
1473                PolicyContribution::new(
1474                    APPEND_RUNTIME_MODE_RULE_ID,
1475                    PolicyOutcome::Allow,
1476                    "fixture: runtime mode permits unsigned",
1477                )
1478                .unwrap(),
1479            ],
1480            None,
1481        );
1482        assert_eq!(policy.final_outcome, PolicyOutcome::Reject);
1483
1484        let err = log
1485            .append(fixture_event(0), &policy)
1486            .expect_err("reject outcome must fail closed");
1487        assert!(matches!(err, JsonlError::Validation(_)));
1488        assert_eq!(log.len(), 0);
1489    }
1490
1491    #[test]
1492    fn append_refuses_user_event_when_attestation_contributor_not_allow() {
1493        let dir = tempdir().unwrap();
1494        let path = dir.path().join("user-attestation.jsonl");
1495        let mut log = JsonlLog::open(&path).unwrap();
1496        // Attestation says Warn, not Allow; ADR 0026 §4 forbids BreakGlass /
1497        // Warn substituting for attestation at the User authority boundary.
1498        let policy = compose_policy_outcomes(
1499            vec![
1500                PolicyContribution::new(
1501                    APPEND_EVENT_SOURCE_TIER_GATE_RULE_ID,
1502                    PolicyOutcome::Allow,
1503                    "fixture: tier gate allows",
1504                )
1505                .unwrap(),
1506                PolicyContribution::new(
1507                    APPEND_ATTESTATION_REQUIRED_RULE_ID,
1508                    PolicyOutcome::Warn,
1509                    "fixture: attestation warning",
1510                )
1511                .unwrap(),
1512                PolicyContribution::new(
1513                    APPEND_RUNTIME_MODE_RULE_ID,
1514                    PolicyOutcome::Allow,
1515                    "fixture: runtime mode permits unsigned",
1516                )
1517                .unwrap(),
1518            ],
1519            None,
1520        );
1521        // Final outcome is Warn, but EventSource::User still must fail.
1522        let err = log
1523            .append(fixture_event(0), &policy)
1524            .expect_err("User event without Allow attestation must fail");
1525        assert!(matches!(err, JsonlError::Validation(_)));
1526        assert_eq!(log.len(), 0);
1527    }
1528
1529    #[test]
1530    fn append_allows_warn_outcome() {
1531        let dir = tempdir().unwrap();
1532        let path = dir.path().join("warn-outcome.jsonl");
1533        let mut log = JsonlLog::open(&path).unwrap();
1534        // Warn final outcome with Allow attestation must still append for
1535        // an EventSource::User row (the local-development ledger pattern).
1536        let policy = compose_policy_outcomes(
1537            vec![
1538                PolicyContribution::new(
1539                    APPEND_EVENT_SOURCE_TIER_GATE_RULE_ID,
1540                    PolicyOutcome::Allow,
1541                    "fixture: tier gate allows",
1542                )
1543                .unwrap(),
1544                PolicyContribution::new(
1545                    APPEND_ATTESTATION_REQUIRED_RULE_ID,
1546                    PolicyOutcome::Allow,
1547                    "fixture: attestation present",
1548                )
1549                .unwrap(),
1550                PolicyContribution::new(
1551                    APPEND_RUNTIME_MODE_RULE_ID,
1552                    PolicyOutcome::Warn,
1553                    "fixture: runtime mode is local-development",
1554                )
1555                .unwrap(),
1556            ],
1557            None,
1558        );
1559        assert_eq!(policy.final_outcome, PolicyOutcome::Warn);
1560        log.append(fixture_event(0), &policy)
1561            .expect("warn outcome must still append");
1562        assert_eq!(log.len(), 1);
1563    }
1564}