Skip to main content

ai_memory/
signed_events.rs

1// Copyright 2026 AlphaOne LLC
2// SPDX-License-Identifier: Apache-2.0
3
4//! v0.7.0 / H-track substrate — append-only `signed_events` audit
5//! table.
6//!
7//! # NSA CSI MCP Security mapping
8//!
9//! Primary defense against **NSA concern (g) Poor or missing audit
10//! logs** and contributing implementation of **NSA recommendation (e)
11//! Sign and verify MCP messages** + **(g) Instrument for logging and
12//! detection** per the NSA Cybersecurity Information document on MCP
13//! security (U/OO/6030316-26 \| PP-26-1834, May 2026, Version 1.0).
14//! The V-4 cross-row hash chain (`prev_hash` SHA-256 over the prior
15//! row's canonical bytes plus monotonic `sequence` counter) makes the
16//! audit log tamper-evident even when individual row signatures are
17//! valid: tampering with row N's content breaks row N+1's `prev_hash`
18//! verification; tampering with `sequence` breaks the contiguity
19//! check. Capability inventory anchor:
20//! `v4_signed_events_chain` in
21//! [`docs/compliance/_inventory/v0.7.0-capabilities.json`](../../docs/compliance/_inventory/v0.7.0-capabilities.json);
22//! narrative in
23//! [`docs/compliance/nsa-csi-mcp.html`](../../docs/compliance/nsa-csi-mcp.html)
24//! §3.7 (concern g) and §4.7 (recommendation g).
25//!
26//! Each identity-bearing write (today: every `memory_link` insert
27//! through `db::create_link` / `db::create_link_signed`) appends one
28//! row to `signed_events` so a downstream auditor can replay the
29//! exact sequence of attestation events the daemon emitted, without
30//! having to scan the mutable `memory_links` table for "what did
31//! this row look like at write time" — by construction, the
32//! `payload_hash` captured here is SHA-256 over the same canonical-
33//! CBOR bytes the H2 signer committed to.
34//!
35//! # Append-only invariant (row-level)
36//!
37//! This module exposes ONE writer ([`append_signed_event`]) and
38//! ZERO mutators. There are no `UPDATE signed_events` or `DELETE
39//! FROM signed_events` statements anywhere in the production
40//! codepath. Operators that need to prune (compliance retention,
41//! disk pressure) MUST do so via direct SQL with explicit
42//! awareness that they are breaking the audit chain — that is the
43//! deliberate escape hatch documented in
44//! `migrations/sqlite/0020_v07_signed_events.sql`.
45//!
46//! The H5 test suite asserts (via grep over `src/`) that no
47//! `UPDATE signed_events` / `DELETE FROM signed_events` strings
48//! appear in production code outside doc comments — adding any
49//! such call site will fail the build.
50//!
51//! # Cross-row tamper evidence (schema v34, #698 V-4 closeout)
52//!
53//! Since schema v34 the table carries TWO chain columns on top of
54//! each row's Ed25519 `signature`:
55//!
56//! - **`prev_hash BLOB`** — SHA-256 (32 bytes) over the canonical-
57//!   bytes encoding of the PRECEDING row (see
58//!   [`canonical_chain_bytes`]). First row gets 32 zero bytes.
59//! - **`sequence INTEGER`** — monotonically-increasing rank starting
60//!   at 1, pinned by a `UNIQUE INDEX`.
61//!
62//! Together they form a SQL-side hash chain mirroring the JSONL
63//! property in [`crate::audit`]. A `DELETE` of row N still passes
64//! per-row signature verification on the surviving rows individually,
65//! BUT row N+1's stored `prev_hash` will no longer match the
66//! recomputed digest of the (now-missing) row N — the chain break is
67//! detected at row N+1 by [`verify_chain`]. An `UPDATE` of any
68//! column included in the canonical-bytes encoding propagates the
69//! same way. A tampered `sequence` breaks the contiguity check.
70//!
71//! The cross-row chain is the LOAD-BEARING property; per-row Ed25519
72//! signatures (the existing `signature` column) remain as defense-in-
73//! depth.
74//!
75//! ## Relationship to [`crate::audit`] (JSONL chain)
76//!
77//! The JSONL audit log under `<audit_dir>/audit.log` remains the
78//! cross-host portable evidence format with its own:
79//!
80//! - **Cross-line hash chain.** Each JSONL line carries `prev_hash`
81//!   pointing to the prior line's `self_hash`; `ai-memory audit
82//!   verify` recomputes the chain and exits non-zero on mismatch.
83//! - **Monotonic sequence.** F2 (v0.7.0 round-2) wired the sequence
84//!   counter to survive process restart so SIEMs detect dropped
85//!   lines even before the chain check.
86//! - **Append-only OS hint.** Best-effort `chflags(2)` /
87//!   `FS_IOC_SETFLAGS`.
88//!
89//! The SQL chain (this module) is the daemon-local property; the
90//! JSONL chain is the portable evidence the daemon hands off to a
91//! SIEM. They are complementary, not redundant.
92//!
93//! ## When to use which surface
94//!
95//! | Question | Surface |
96//! |---|---|
97//! | "What did this signed link's bytes look like at write time?" | `signed_events.payload_hash` (binds canonical CBOR) |
98//! | "Was the SQL substrate tampered with between `T0` and `T1`?" | `signed_events.prev_hash` + `signed_events.sequence` via [`verify_chain`] |
99//! | "Was the on-disk audit log truncated?" | `audit.rs` JSONL chain |
100//! | "Did the same key issue the create and the invalidate?" | `signed_events.signature` on both rows |
101//!
102//! # Out of scope
103//!
104//! - H4 (`memory_verify` MCP tool, `attest_level` enum surfacing).
105//! - H6 (end-to-end test of the immutable chain).
106
107use anyhow::{Context, Result};
108use rusqlite::{Connection, params};
109use sha2::{Digest, Sha256};
110
111/// Tracing target for signed-events chain emission/verification log
112/// lines across the storage backends (#1558 tracing-target SSOT).
113/// Distinct from the `signed_events` SQL table name, which stays
114/// embedded in the SQL strings that reference it.
115pub(crate) const SIGNED_EVENTS_TRACE_TARGET: &str = "signed_events";
116
117// ---------------------------------------------------------------------------
118// v0.7.0 multi-agent literal-sweep (scanner B finding F-B9.x) —
119// canonical signed_events event-type slugs.
120//
121// Pre-sweep, `event_type` strings were scattered across ~14
122// production sites as inline literals:
123//   - "memory_link.created"               (storage::create_link x2, cli::verify, cli::verify_signed_events)
124//   - "memory_link.invalidated"           (storage::reflection invalidation)
125//   - "memory.stored"                     (cli::verify substrate writer)
126//   - "memory.touch"                      (cli::verify touch path)
127//   - "reflection.invalidation_notified"  (notification::invalidation)
128//   - "reflection.depth_exceeded"         (cli::doctor)
129//   - "reflects_on.cycle_refused"         (mcp::link)
130//   - "skill.exported"                    (mcp::skill_export)
131//   - "skill.registered"                  (mcp::skill_register)
132//   - "memory_capture_turn"               (mcp::capture_turn, RFC-0001 L4)
133//   - "persona_generated"                 (persona::generate)
134//   - "atomisation_complete"              (atomisation::run)
135//
136// Renaming any slug — or adding a new one without grep-ing every
137// candidate writer site — was a substrate-wide search. The
138// `event_types` module below centralises every slug as a `&'static
139// str` const so a rename is one edit + the writer + downstream
140// auditor + replay-tool callsites get the new value mechanically.
141// Mirrors the `errors::error_codes` pattern (ARCH-9 / FX-C4-batch2).
142// ---------------------------------------------------------------------------
143
144#[allow(dead_code)]
145pub mod event_types {
146    /// `signed_events.event_type` for `db::create_link` /
147    /// `db::create_link_signed` writes (the canonical link-write audit
148    /// emission). 4 production callsites pre-sweep: 2 in
149    /// `src/storage/mod.rs` + 1 in `src/cli/verify.rs` + 1 in
150    /// `src/cli/verify_signed_events.rs`.
151    pub const MEMORY_LINK_CREATED: &str = "memory_link.created";
152
153    /// `signed_events.event_type` for link invalidation via the L2-3
154    /// reflection-invalidation walker (`src/storage/mod.rs::5257`).
155    pub const MEMORY_LINK_INVALIDATED: &str = "memory_link.invalidated";
156
157    /// `signed_events.event_type` for substrate-side memory inserts
158    /// (`src/cli/verify.rs::933`).
159    pub const MEMORY_STORED: &str = "memory.stored";
160
161    /// `signed_events.event_type` for substrate-side touch path
162    /// (`src/cli/verify.rs::970`).
163    pub const MEMORY_TOUCH: &str = "memory.touch";
164
165    /// `signed_events.event_type` for notification fan-out by the L2-3
166    /// reflection-invalidation walker
167    /// (`src/notification/invalidation.rs::278`).
168    pub const REFLECTION_INVALIDATION_NOTIFIED: &str = "reflection.invalidation_notified";
169
170    /// `signed_events.event_type` for the recursive-learning depth-cap
171    /// trip (`src/cli/doctor.rs::1904`). v0.7.0 #655 Task 1/8.
172    pub const REFLECTION_DEPTH_EXCEEDED: &str = "reflection.depth_exceeded";
173
174    /// `signed_events.event_type` for the `reflects_on` cycle refusal
175    /// at `src/mcp/tools/link.rs::199` (v0.7.0 #655 Task 1/8 — cycle
176    /// detection on reflection chain).
177    pub const REFLECTS_ON_CYCLE_REFUSED: &str = "reflects_on.cycle_refused";
178
179    /// `signed_events.event_type` for skill-export emission
180    /// (`src/mcp/tools/skill_export.rs::244`).
181    pub const SKILL_EXPORTED: &str = "skill.exported";
182
183    /// `signed_events.event_type` for skill-register emission
184    /// (`src/mcp/tools/skill_register.rs::226`).
185    pub const SKILL_REGISTERED: &str = "skill.registered";
186
187    /// `signed_events.event_type` for the L4 `memory_capture_turn`
188    /// MCP tool emission per RFC-0001 (`src/mcp/tools/capture_turn.rs::472`).
189    pub const MEMORY_CAPTURE_TURN: &str = "memory_capture_turn";
190
191    /// `signed_events.event_type` for persona regeneration
192    /// (`src/persona/mod.rs::787`). v0.7.0 QW-2 persona-as-artifact.
193    pub const PERSONA_GENERATED: &str = "persona_generated";
194
195    /// `signed_events.event_type` for WT-1-C atomisation completion
196    /// (`src/atomisation/mod.rs::943`). v0.7.0 Batman Form 1-4
197    /// atomisation engine.
198    pub const ATOMISATION_COMPLETE: &str = "atomisation_complete";
199
200    /// `signed_events.event_type` for an outbound federation credential
201    /// renewal — emitted by the file-refresh renewal worker
202    /// (`src/federation/identity/renewal.rs`) on every tick that swaps a
203    /// freshly-issued credential into the live send path
204    /// ([`crate::federation::identity::renewal::RenewalOutcome::Reloaded`]).
205    /// This is the FED-P4-f §8 audit obligation: the node's own
206    /// credential lifecycle is recorded in the tamper-evident chain.
207    /// Issuance (`FederationIssuer`) is centrally operated and revocation
208    /// is by self-expiry (no CRL — see `issuer.rs`), so renewal is the
209    /// only node-local lifecycle transition there is to audit.
210    pub const FED_CREDENTIAL_RENEWED: &str = "federation.credential_renewed";
211
212    /// `signed_events.event_type` for operator-authorized governance-rule
213    /// deletion via `ai-memory rules remove --sign`
214    /// (`src/governance/rules_store.rs::remove_signed`). Closes the
215    /// audit gap where a `DELETE FROM governance_rules` left no
216    /// tamper-evident trace of WHICH rule the operator removed; the
217    /// emitted row's `payload_hash` is the SHA-256 over the removed
218    /// rule's canonical signing bytes and `signature` is the operator's
219    /// Ed25519 signature over that hash.
220    pub const GOVERNANCE_RULE_REMOVED: &str = "governance.rule_removed";
221}
222
223/// One row of the `signed_events` audit table.
224///
225/// `id` is a UUIDv4 minted by the writer; `payload_hash` is the
226/// 32-byte SHA-256 over the canonical-CBOR bytes that H2 hashed for
227/// the original signature; `signature` mirrors the source row's
228/// `memory_links.signature` (NULL when the source write was
229/// unsigned).
230///
231/// `prev_hash` and `sequence` are populated by
232/// [`append_signed_event`] (writer fills them from the current chain
233/// head — callers MUST NOT set them) and by [`row_to_event`] on read
234/// (selecting back rows from the table).
235#[derive(Debug, Clone, PartialEq, Eq, Default)]
236pub struct SignedEvent {
237    pub id: String,
238    pub agent_id: String,
239    pub event_type: String,
240    pub payload_hash: Vec<u8>,
241    pub signature: Option<Vec<u8>>,
242    pub attest_level: String,
243    pub timestamp: String,
244    /// v34 — SHA-256 (32 bytes) over the canonical-bytes encoding of
245    /// the preceding row, or 32 zero bytes for the first row. Filled
246    /// by [`append_signed_event`] at insert time; callers MUST NOT
247    /// pre-populate this field — any value set by the caller is
248    /// ignored. Use `..SignedEvent::default()` at the struct-literal
249    /// tail to leave this empty.
250    pub prev_hash: Vec<u8>,
251    /// v34 — monotonically-increasing chain rank starting at 1.
252    /// Filled by [`append_signed_event`] at insert time; callers MUST
253    /// NOT pre-populate this field — any value set by the caller is
254    /// ignored. Use `..SignedEvent::default()` at the struct-literal
255    /// tail to leave this zero.
256    pub sequence: i64,
257}
258
259impl SignedEvent {
260    /// v0.7.0 #1099 (SR-1 #4, HIGH) — build a `SignedEvent` that
261    /// consults the process-wide daemon audit signing key (installed
262    /// at boot via [`crate::governance::audit::init`]) and applies it
263    /// to `payload_hash`. When a key is installed, the returned row
264    /// carries `signature: Some(sig_bytes)` + `attest_level:
265    /// "daemon_signed"`; when no key is installed, falls back to
266    /// `signature: None, attest_level: "unsigned"`.
267    ///
268    /// Homogenises every production audit-row writer (pending_action
269    /// approve/reject/timeout, federation.quota_refused on both
270    /// sqlite + postgres paths, governance.check) so a downstream
271    /// auditor sees per-row signatures matching the daemon's
272    /// `VerifyingKey` and the cross-row chain head together.
273    ///
274    /// The other chain columns (`prev_hash`, `sequence`) are left at
275    /// their defaults — [`append_signed_event`] fills them at INSERT
276    /// time. Callers MUST NOT pre-populate them.
277    #[must_use]
278    pub fn with_daemon_signature(
279        payload_hash: Vec<u8>,
280        agent_id: String,
281        event_type: String,
282        timestamp: String,
283    ) -> Self {
284        let (signature, attest_level) =
285            match crate::governance::audit::try_sign_audit_payload(&payload_hash) {
286                Some((sig_bytes, tag)) => (Some(sig_bytes), tag.to_string()),
287                None => (
288                    None,
289                    crate::models::AttestLevel::Unsigned.as_str().to_string(),
290                ),
291            };
292        SignedEvent {
293            id: uuid::Uuid::new_v4().to_string(),
294            agent_id,
295            event_type,
296            payload_hash,
297            signature,
298            attest_level,
299            timestamp,
300            ..SignedEvent::default()
301        }
302    }
303}
304
305/// All-zeros 32-byte digest used as `prev_hash` for the first row.
306pub const ZERO_HASH: [u8; 32] = [0u8; 32];
307
308/// Field separator for [`canonical_chain_bytes`]. ASCII 0x1F
309/// ("Unit Separator") — present in neither RFC3339 timestamps nor
310/// UUID strings nor the hex/base64 / raw-bytes payloads we encode,
311/// so concatenation is unambiguous without escaping.
312const FIELD_SEP: u8 = 0x1F;
313
314/// Canonical bytes used as the chain-hash input.
315///
316/// Commits to every column that identifies the row's content:
317/// `id || 0x1F || agent_id || 0x1F || event_type || 0x1F ||
318///  payload_hash || 0x1F || signature_or_empty || 0x1F ||
319///  attest_level || 0x1F || timestamp || 0x1F || sequence_be_8_bytes`.
320///
321/// Each row's `prev_hash` is `SHA-256(canonical_chain_bytes(prev_row))`,
322/// or [`ZERO_HASH`] for the first row. A future hash-agility
323/// migration can change the digest in one place; the encoding itself
324/// is byte-stable so an auditor can replay the chain from the stored
325/// columns alone.
326#[must_use]
327pub fn canonical_chain_bytes(event: &SignedEvent) -> Vec<u8> {
328    let mut out: Vec<u8> = Vec::with_capacity(
329        event.id.len()
330            + event.agent_id.len()
331            + event.event_type.len()
332            + event.payload_hash.len()
333            + event.signature.as_ref().map_or(0, Vec::len)
334            + event.attest_level.len()
335            + event.timestamp.len()
336            + 8
337            + 7, // seven separators
338    );
339    out.extend_from_slice(event.id.as_bytes());
340    out.push(FIELD_SEP);
341    out.extend_from_slice(event.agent_id.as_bytes());
342    out.push(FIELD_SEP);
343    out.extend_from_slice(event.event_type.as_bytes());
344    out.push(FIELD_SEP);
345    out.extend_from_slice(&event.payload_hash);
346    out.push(FIELD_SEP);
347    if let Some(sig) = event.signature.as_ref() {
348        out.extend_from_slice(sig);
349    }
350    // empty signature contributes zero bytes between separators —
351    // the separator on either side still pins the slot's position.
352    out.push(FIELD_SEP);
353    out.extend_from_slice(event.attest_level.as_bytes());
354    out.push(FIELD_SEP);
355    out.extend_from_slice(event.timestamp.as_bytes());
356    out.push(FIELD_SEP);
357    out.extend_from_slice(&event.sequence.to_be_bytes());
358    out
359}
360
361/// Read the chain head — `(max_sequence, prev_canonical_hash)`.
362///
363/// Returns `(0, ZERO_HASH)` for an empty table. The "previous"
364/// canonical hash is the SHA-256 over the canonical bytes of the
365/// row with the highest sequence; the next inserted row's
366/// `prev_hash` is exactly this value.
367///
368/// # Cluster-C COR-9 (issue #767): NULL-sequence diagnostic
369///
370/// Prior to this fix the query was
371/// `SELECT … COALESCE(sequence, 0) … ORDER BY COALESCE(sequence, 0) DESC`,
372/// which silently treated a row whose `sequence IS NULL` (a
373/// pre-v34 row that the v34 backfill missed) as sequence == 0 and
374/// would then issue `next_seq = 1`, colliding with the legitimately-
375/// backfilled first row on the UNIQUE index. The mask hid a real
376/// migration-needed state behind a misleading SQLITE_CONSTRAINT_UNIQUE.
377///
378/// We now restrict the head SELECT to `WHERE sequence IS NOT NULL`,
379/// and issue a SEPARATE diagnostic SELECT that asserts no
380/// `sequence IS NULL` rows exist. If any are found post-v34 the
381/// function returns a clear error and emits `tracing::error!` so an
382/// operator can run `ai-memory migrate` (or invoke
383/// `migrate_v34_backfill_chain` directly) to repair the chain.
384fn read_chain_head(conn: &Connection) -> Result<(i64, [u8; 32])> {
385    // COR-9 diagnostic: hard-fail on NULL-sequence rows so a
386    // partially-migrated DB never silently produces a duplicate
387    // sequence. The v34 backfill is idempotent — operators recovering
388    // from this error re-run the migration ladder.
389    let null_seq_count: i64 = conn
390        .query_row(
391            "SELECT COUNT(*) FROM signed_events WHERE sequence IS NULL",
392            [],
393            |row| row.get(0),
394        )
395        .context("read_chain_head: null-sequence diagnostic")?;
396    if null_seq_count > 0 {
397        tracing::error!(
398            null_sequence_rows = null_seq_count,
399            "signed_events: found {null_seq_count} row(s) with sequence IS NULL — \
400             v34 chain backfill is incomplete. Re-run the migration ladder \
401             (`ai-memory migrate` or restart with the current binary) to \
402             stamp prev_hash + sequence on the unmigrated rows; refusing to \
403             append further audit rows until the chain is repaired."
404        );
405        anyhow::bail!(
406            "read_chain_head: {null_seq_count} signed_events row(s) have sequence IS NULL — \
407             v34 chain backfill incomplete; re-run `ai-memory migrate`"
408        );
409    }
410
411    // Pull the column shape that `canonical_chain_bytes` needs,
412    // ordered by sequence DESC so the head is the first row. Restrict
413    // to non-NULL sequences (the diagnostic above guarantees this
414    // matches every row, but the WHERE makes the read itself robust
415    // against a TOCTOU window if a concurrent unmigrated INSERT
416    // landed between the diagnostic and this query).
417    let mut stmt = conn
418        .prepare(
419            "SELECT id, agent_id, event_type, payload_hash, signature, attest_level, \
420                    timestamp, sequence \
421             FROM signed_events \
422             WHERE sequence IS NOT NULL \
423             ORDER BY sequence DESC, rowid DESC \
424             LIMIT 1",
425        )
426        .context("read_chain_head: prepare")?;
427    let head: Option<SignedEvent> = stmt
428        .query_map([], |row| {
429            Ok(SignedEvent {
430                id: row.get(0)?,
431                agent_id: row.get(1)?,
432                event_type: row.get(2)?,
433                payload_hash: row.get(3)?,
434                signature: row.get(4)?,
435                attest_level: row.get(5)?,
436                timestamp: row.get(6)?,
437                sequence: row.get(7)?,
438                prev_hash: Vec::new(), // not part of the canonical bytes
439            })
440        })
441        .context("read_chain_head: query_map")?
442        .next()
443        .transpose()
444        .context("read_chain_head: collect")?;
445    match head {
446        None => Ok((0, ZERO_HASH)),
447        Some(prev) => {
448            let max_seq = prev.sequence;
449            let canon = canonical_chain_bytes(&prev);
450            let mut hasher = Sha256::new();
451            hasher.update(&canon);
452            let mut digest = [0u8; 32];
453            digest.copy_from_slice(&hasher.finalize());
454            Ok((max_seq, digest))
455        }
456    }
457}
458
459/// Outcome of a [`verify_chain`] pass over the `signed_events` table.
460///
461/// `rows_checked` counts every row the verifier walked.
462/// `chain_break` is `Some(sequence)` when the FIRST detected break
463/// happens — that row's stored `prev_hash` does not equal
464/// SHA-256(canonical_chain_bytes(row N-1)), OR the row's `sequence`
465/// is not the expected `prior + 1` (gap / duplicate / non-monotonic
466/// jump). `signature_failures` records sequences whose Ed25519
467/// signature did not verify against the supplied key set — the
468/// chain itself may still be intact even if individual signatures
469/// fail (defense-in-depth split).
470#[derive(Debug, Clone, PartialEq, Eq)]
471pub struct ChainVerificationReport {
472    pub rows_checked: u64,
473    pub chain_break: Option<i64>,
474    pub signature_failures: Vec<i64>,
475}
476
477impl ChainVerificationReport {
478    /// `true` when the cross-row chain held end-to-end. Per-row
479    /// signature failures are surfaced separately because they are a
480    /// disjoint property (a chain break is structurally worse than a
481    /// signature failure).
482    #[must_use]
483    pub fn chain_holds(&self) -> bool {
484        self.chain_break.is_none()
485    }
486}
487
488/// Walk all rows in `sequence` order and verify the cross-row chain
489/// + per-row signatures.
490///
491/// For each row:
492///   1. Check `sequence == prior + 1` (first row: `sequence == 1`).
493///   2. Recompute SHA-256 over [`canonical_chain_bytes`] of the
494///      preceding row (or [`ZERO_HASH`] for the first row), compare
495///      to the current row's stored `prev_hash`.
496///   3. Verify the Ed25519 signature (when present) over the row's
497///      `payload_hash`. The verifying-key resolver is provided by
498///      the caller; pass `None` to skip signature verification (the
499///      chain check is still performed).
500///
501/// On a chain break (step 1 or step 2 fail) the verifier records
502/// the breaking row's `sequence` and continues so the caller still
503/// gets per-row signature stats over the rest of the table.
504///
505/// # Errors
506///
507/// Returns the underlying `rusqlite` error if the SELECT or any row
508/// decode fails. A clean (chain held + zero signature failures)
509/// report is returned as `Ok`; the caller checks
510/// [`ChainVerificationReport::chain_holds`] for the chain bit.
511pub fn verify_chain(
512    conn: &Connection,
513    since_sequence: Option<i64>,
514) -> Result<ChainVerificationReport> {
515    let lower = since_sequence.unwrap_or(0);
516    let mut stmt = conn
517        .prepare(
518            "SELECT id, agent_id, event_type, payload_hash, signature, attest_level, \
519                    timestamp, prev_hash, COALESCE(sequence, 0) \
520             FROM signed_events \
521             WHERE COALESCE(sequence, 0) > ?1 \
522             ORDER BY COALESCE(sequence, 0) ASC",
523        )
524        .context("verify_chain: prepare")?;
525    let mut rows = stmt.query(params![lower]).context("verify_chain: query")?;
526
527    let mut rows_checked: u64 = 0;
528    let mut chain_break: Option<i64> = None;
529    // v0.7.0 #1071 (SR-2 #1, HIGH) — actually walk Ed25519 signatures.
530    // Resolved once outside the loop so we don't re-lock the OnceLock
531    // per row. Returns `None` when the daemon process has no audit
532    // signing key installed (i.e. `governance::audit::init` was called
533    // with `signing_key: None`); in that case we record no signature
534    // failures because there is no key to verify against — the
535    // verifier is structure-only on an unsigned daemon.
536    let verifier: Option<ed25519_dalek::VerifyingKey> =
537        crate::governance::audit::resolve_daemon_verifying_key();
538    let mut signature_failures: Vec<i64> = Vec::new();
539
540    let mut expected_seq = lower + 1;
541    let mut prev_canonical_hash: [u8; 32] = ZERO_HASH;
542    // When resuming with `since_sequence`, the prior row's canonical
543    // hash must be recomputed from the row immediately before `lower`
544    // so the chain check at `lower + 1` lines up.
545    if lower > 0 {
546        let mut head_stmt = conn
547            .prepare(
548                "SELECT id, agent_id, event_type, payload_hash, signature, attest_level, \
549                        timestamp, COALESCE(sequence, 0) \
550                 FROM signed_events \
551                 WHERE COALESCE(sequence, 0) = ?1",
552            )
553            .context("verify_chain: prepare head")?;
554        let head: Option<SignedEvent> = head_stmt
555            .query_map(params![lower], |row| {
556                Ok(SignedEvent {
557                    id: row.get(0)?,
558                    agent_id: row.get(1)?,
559                    event_type: row.get(2)?,
560                    payload_hash: row.get(3)?,
561                    signature: row.get(4)?,
562                    attest_level: row.get(5)?,
563                    timestamp: row.get(6)?,
564                    sequence: row.get(7)?,
565                    prev_hash: Vec::new(),
566                })
567            })
568            .context("verify_chain: head query")?
569            .next()
570            .transpose()
571            .context("verify_chain: head collect")?;
572        if let Some(h) = head {
573            let canon = canonical_chain_bytes(&h);
574            let mut hasher = Sha256::new();
575            hasher.update(&canon);
576            prev_canonical_hash.copy_from_slice(&hasher.finalize());
577        }
578    }
579
580    while let Some(row) = rows.next().context("verify_chain: next row")? {
581        rows_checked += 1;
582        let event = SignedEvent {
583            id: row.get(0).context("verify_chain: id")?,
584            agent_id: row.get(1).context("verify_chain: agent_id")?,
585            event_type: row.get(2).context("verify_chain: event_type")?,
586            payload_hash: row.get(3).context("verify_chain: payload_hash")?,
587            signature: row.get(4).context("verify_chain: signature")?,
588            attest_level: row.get(5).context("verify_chain: attest_level")?,
589            timestamp: row.get(6).context("verify_chain: timestamp")?,
590            prev_hash: row
591                .get::<_, Option<Vec<u8>>>(7)
592                .context("verify_chain: prev_hash")?
593                .unwrap_or_default(),
594            sequence: row.get(8).context("verify_chain: sequence")?,
595        };
596
597        // (1) Sequence contiguity.
598        if event.sequence != expected_seq {
599            if chain_break.is_none() {
600                chain_break = Some(event.sequence);
601            }
602            // Keep walking so we still count rows + can later add
603            // signature-failure tracking; but realign expected_seq to
604            // the row we read so subsequent rows aren't ALL flagged.
605            expected_seq = event.sequence;
606        }
607
608        // (2) prev_hash chain.
609        if event.prev_hash.len() != 32 || event.prev_hash != prev_canonical_hash {
610            if chain_break.is_none() {
611                chain_break = Some(event.sequence);
612            }
613        }
614
615        // (3) v0.7.0 #1071 — per-row Ed25519 signature verification.
616        // Only fires when a verifier is resolvable AND the row has a
617        // signature blob attached (rows written before #1035 / #1099
618        // and the legacy `attest_level == "unsigned"` rows that ship
619        // by design have `signature: None` and are skipped). On
620        // shape mismatch (signature blob not 64 bytes) or
621        // `verify_strict` failure, push the breaking sequence onto
622        // `signature_failures` and continue the walk — a signature
623        // failure is disjoint from a chain break by design (per-row
624        // forgery vs. across-row substrate tamper).
625        if let Some(vk) = verifier.as_ref() {
626            match event.signature.as_ref() {
627                Some(sig_bytes) if !sig_bytes.is_empty() => {
628                    let sig_array: Option<[u8; 64]> = sig_bytes.as_slice().try_into().ok();
629                    match sig_array {
630                        Some(arr) => {
631                            let sig = ed25519_dalek::Signature::from_bytes(&arr);
632                            if vk.verify_strict(&event.payload_hash, &sig).is_err() {
633                                signature_failures.push(event.sequence);
634                            }
635                        }
636                        None => {
637                            // Malformed signature length — record as failure.
638                            signature_failures.push(event.sequence);
639                        }
640                    }
641                }
642                // #1452 (SEC, HIGH) — fail-closed on a missing signature
643                // when a verifier IS installed. A row whose `attest_level`
644                // is anything other than the by-design legacy `"unsigned"`
645                // marker, but which carries no signature blob (None or an
646                // empty vec), is a stripped / never-signed row on a daemon
647                // that is supposed to be signing — record it as a
648                // signature failure rather than silently skipping it.
649                // Legitimately-unsigned (`attest_level == "unsigned"`)
650                // legacy rows remain skip-by-design.
651                _ => {
652                    if event.attest_level != crate::models::AttestLevel::Unsigned.as_str() {
653                        signature_failures.push(event.sequence);
654                    }
655                }
656            }
657        }
658
659        // Recompute the canonical hash for the NEXT iteration.
660        let canon = canonical_chain_bytes(&event);
661        let mut hasher = Sha256::new();
662        hasher.update(&canon);
663        prev_canonical_hash.copy_from_slice(&hasher.finalize());
664
665        expected_seq += 1;
666    }
667
668    Ok(ChainVerificationReport {
669        rows_checked,
670        chain_break,
671        signature_failures,
672    })
673}
674
675/// SHA-256 helper. Centralised so every audit-row producer commits
676/// to the same digest; a future hash-agility migration changes one
677/// line here, not every call site.
678#[must_use]
679pub fn payload_hash(bytes: &[u8]) -> Vec<u8> {
680    let mut hasher = Sha256::new();
681    hasher.update(bytes);
682    hasher.finalize().to_vec()
683}
684
685/// Append a single audit row.
686///
687/// INSERT-only. There is no companion `update_signed_event` or
688/// `delete_signed_event` — the append-only invariant is enforced at
689/// the API surface, not via a SQLite trigger (a trigger would also
690/// block the documented operator-driven pruning escape hatch).
691///
692/// # Errors
693///
694/// Returns the underlying `rusqlite` error if the INSERT fails
695/// (typically a duplicate UUIDv4 — vanishingly rare but surfaced
696/// rather than ignored so the audit chain never silently drops a
697/// row).
698pub fn append_signed_event(conn: &Connection, event: &SignedEvent) -> Result<()> {
699    // v34 (#698 V-4 closeout) / Cluster-C SEC-3 (issue #767): compute
700    // chain head + INSERT in a single IMMEDIATE transaction so the
701    // (read MAX(sequence), INSERT new row) pair is atomic against
702    // concurrent writers on the same connection mutex.
703    //
704    // SQLite serializes write transactions, but a concurrent
705    // BEGIN IMMEDIATE on a sibling connection that beats us to the
706    // wal-write lock would otherwise let us read a stale head and
707    // then INSERT a duplicate sequence. The UNIQUE INDEX on
708    // `sequence` (idx_signed_events_sequence) makes the worst case
709    // a `SQLITE_CONSTRAINT_UNIQUE` error from this fn — the chain
710    // never silently breaks even on race. Callers that batch
711    // appends through the project's `Arc<Mutex<Connection>>` pool
712    // see no contention; we still wrap in IMMEDIATE for correctness
713    // under multi-connection deployments (the deferred-audit
714    // drainer opens its own connection on the same DB file).
715    //
716    // SEC-3 specifically: `rusqlite::Connection::unchecked_transaction`
717    // defaults to BEGIN DEFERRED — the prior comment claiming
718    // IMMEDIATE was a bug that let two writers on sibling
719    // connections both read a stale chain head, with one losing the
720    // INSERT race to `SQLITE_CONSTRAINT_UNIQUE` and (when invoked
721    // through the deferred-audit drainer) silently dropping the
722    // audit row. We now use `transaction_with_behavior(Immediate)`
723    // to grab the wal-write lock at BEGIN time so the read-then-
724    // INSERT pair is serialized at the SQLite layer.
725    let tx = rusqlite::Transaction::new_unchecked(conn, rusqlite::TransactionBehavior::Immediate)
726        .context("append signed_event: begin IMMEDIATE tx")?;
727    append_signed_event_no_tx(&tx, event)?;
728    tx.commit().context("append signed_event: commit tx")?;
729    Ok(())
730}
731
732/// Append a signed event using the caller's already-open transaction.
733///
734/// Use this when the caller is mid-transaction (e.g.
735/// `invalidate_link` after its `BEGIN IMMEDIATE` for the UPDATE +
736/// audit-INSERT atom). The public `append_signed_event` wrapper
737/// adds its own IMMEDIATE tx; calling that from inside a wrapping
738/// tx fails on SQLite (nested transactions are not supported on
739/// the same connection). This variant takes a `Connection`-like
740/// reference (works for both `Connection` and `Transaction`) and
741/// inserts directly.
742///
743/// # Errors
744///
745/// Returns the underlying `rusqlite` error if the chain-head read
746/// fails or the INSERT itself fails. Callers MUST rollback their
747/// own transaction on error.
748pub fn append_signed_event_no_tx(conn: &Connection, event: &SignedEvent) -> Result<()> {
749    let (max_seq, prev_hash) = read_chain_head(conn).context("append signed_event: read head")?;
750    let next_seq = max_seq + 1;
751    conn.execute(
752        "INSERT INTO signed_events \
753            (id, agent_id, event_type, payload_hash, signature, attest_level, timestamp, \
754             prev_hash, sequence) \
755         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
756        params![
757            event.id,
758            event.agent_id,
759            event.event_type,
760            event.payload_hash,
761            event.signature,
762            event.attest_level,
763            event.timestamp,
764            prev_hash.to_vec(),
765            next_seq,
766        ],
767    )
768    .context("append signed_event")?;
769    Ok(())
770}
771
772/// Read-only listing.
773///
774/// When `agent_id` is `Some`, restricts to that agent's events;
775/// when `None`, returns every agent's events. Ordering is
776/// `timestamp ASC, id ASC` so callers iterating with
777/// `(limit, offset)` see a stable sequence even when two events
778/// share the same RFC3339 second-precision timestamp (the `id`
779/// tiebreaker keeps the order deterministic across calls).
780///
781/// # Errors
782///
783/// Returns the underlying `rusqlite` error if the query or row
784/// decode fails.
785pub fn list_signed_events(
786    conn: &Connection,
787    agent_id: Option<&str>,
788    limit: usize,
789    offset: usize,
790) -> Result<Vec<SignedEvent>> {
791    let limit_i64 = i64::try_from(limit).unwrap_or(i64::MAX);
792    let offset_i64 = i64::try_from(offset).unwrap_or(0);
793    if let Some(agent) = agent_id {
794        let mut stmt = conn.prepare(
795            "SELECT id, agent_id, event_type, payload_hash, signature, attest_level, timestamp, \
796                    prev_hash, COALESCE(sequence, 0) \
797             FROM signed_events \
798             WHERE agent_id = ?1 \
799             ORDER BY timestamp ASC, id ASC \
800             LIMIT ?2 OFFSET ?3",
801        )?;
802        let rows = stmt.query_map(params![agent, limit_i64, offset_i64], row_to_event)?;
803        rows.collect::<rusqlite::Result<Vec<_>>>()
804            .map_err(Into::into)
805    } else {
806        let mut stmt = conn.prepare(
807            "SELECT id, agent_id, event_type, payload_hash, signature, attest_level, timestamp, \
808                    prev_hash, COALESCE(sequence, 0) \
809             FROM signed_events \
810             ORDER BY timestamp ASC, id ASC \
811             LIMIT ?1 OFFSET ?2",
812        )?;
813        let rows = stmt.query_map(params![limit_i64, offset_i64], row_to_event)?;
814        rows.collect::<rusqlite::Result<Vec<_>>>()
815            .map_err(Into::into)
816    }
817}
818
819fn row_to_event(row: &rusqlite::Row<'_>) -> rusqlite::Result<SignedEvent> {
820    Ok(SignedEvent {
821        id: row.get(0)?,
822        agent_id: row.get(1)?,
823        event_type: row.get(2)?,
824        payload_hash: row.get(3)?,
825        signature: row.get(4)?,
826        attest_level: row.get(5)?,
827        timestamp: row.get(6)?,
828        prev_hash: row.get::<_, Option<Vec<u8>>>(7)?.unwrap_or_default(),
829        sequence: row.get(8)?,
830    })
831}
832
833#[cfg(test)]
834mod tests {
835    use super::*;
836    use chrono::Utc;
837    use rusqlite::Connection;
838    use uuid::Uuid;
839
840    /// In-memory connection with the v25 schema applied. We don't go
841    /// through `db::open` (which carries the full migration ladder
842    /// + WAL / FK PRAGMAs) so the unit test stays focused on the
843    /// `signed_events` table itself.
844    fn fresh_db() -> Connection {
845        let conn = Connection::open_in_memory().expect("in-memory db");
846        conn.execute_batch(include_str!(
847            "../migrations/sqlite/0020_v07_signed_events.sql"
848        ))
849        .expect("apply v25 migration");
850        conn
851    }
852
853    fn fixture(agent: &str, event_type: &str) -> SignedEvent {
854        SignedEvent {
855            id: Uuid::new_v4().to_string(),
856            agent_id: agent.to_string(),
857            event_type: event_type.to_string(),
858            payload_hash: payload_hash(b"test-payload"),
859            signature: None,
860            attest_level: crate::models::AttestLevel::Unsigned.as_str().to_string(),
861            timestamp: Utc::now().to_rfc3339(),
862            // prev_hash + sequence are overwritten by
863            // append_signed_event; the caller-side values here are
864            // placeholders so the struct constructs.
865            prev_hash: Vec::new(),
866            sequence: 0,
867        }
868    }
869
870    #[test]
871    fn migration_is_idempotent() {
872        // Re-applying the migration must be a no-op — it's the
873        // contract `db::migrate` relies on (every step uses
874        // `CREATE TABLE IF NOT EXISTS` + `CREATE INDEX IF NOT
875        // EXISTS`).
876        let conn = fresh_db();
877        conn.execute_batch(include_str!(
878            "../migrations/sqlite/0020_v07_signed_events.sql"
879        ))
880        .expect("re-apply v25 migration");
881        // Append still works after the re-run.
882        let event = fixture("alice", "memory_link.created");
883        append_signed_event(&conn, &event).expect("append after re-migration");
884    }
885
886    #[test]
887    fn append_then_list_round_trip() {
888        let conn = fresh_db();
889        let event = fixture("alice", "memory_link.created");
890        append_signed_event(&conn, &event).expect("append");
891        let listed = list_signed_events(&conn, Some("alice"), 10, 0).expect("list");
892        assert_eq!(listed.len(), 1);
893        // The caller-side fixture's prev_hash/sequence are
894        // placeholders — append_signed_event overwrites them with
895        // (ZERO_HASH, 1) for a fresh table. Compare every caller-
896        // controlled field individually, then assert the chain
897        // columns were populated by the writer.
898        assert_eq!(listed[0].id, event.id);
899        assert_eq!(listed[0].agent_id, event.agent_id);
900        assert_eq!(listed[0].event_type, event.event_type);
901        assert_eq!(listed[0].payload_hash, event.payload_hash);
902        assert_eq!(listed[0].signature, event.signature);
903        assert_eq!(listed[0].attest_level, event.attest_level);
904        assert_eq!(listed[0].timestamp, event.timestamp);
905        assert_eq!(listed[0].prev_hash, ZERO_HASH.to_vec());
906        assert_eq!(listed[0].sequence, 1);
907    }
908
909    #[test]
910    fn list_orders_by_timestamp_ascending() {
911        let conn = fresh_db();
912        // Three events for the same agent at distinct timestamps,
913        // inserted out of chronological order.
914        let mut a = fixture("alice", "memory_link.created");
915        a.timestamp = "2026-05-05T12:00:00+00:00".to_string();
916        let mut b = fixture("alice", "memory_link.created");
917        b.timestamp = "2026-05-05T12:00:01+00:00".to_string();
918        let mut c = fixture("alice", "memory_link.created");
919        c.timestamp = "2026-05-05T12:00:02+00:00".to_string();
920        append_signed_event(&conn, &b).unwrap();
921        append_signed_event(&conn, &c).unwrap();
922        append_signed_event(&conn, &a).unwrap();
923        let listed = list_signed_events(&conn, Some("alice"), 10, 0).expect("list");
924        let ts: Vec<&str> = listed.iter().map(|e| e.timestamp.as_str()).collect();
925        assert_eq!(
926            ts,
927            vec![
928                "2026-05-05T12:00:00+00:00",
929                "2026-05-05T12:00:01+00:00",
930                "2026-05-05T12:00:02+00:00",
931            ],
932            "rows must come back in ascending timestamp order"
933        );
934    }
935
936    #[test]
937    fn list_filters_by_agent() {
938        let conn = fresh_db();
939        append_signed_event(&conn, &fixture("alice", "memory_link.created")).unwrap();
940        append_signed_event(&conn, &fixture("bob", "memory_link.created")).unwrap();
941        append_signed_event(&conn, &fixture("alice", "memory_link.created")).unwrap();
942        let alice = list_signed_events(&conn, Some("alice"), 10, 0).unwrap();
943        let bob = list_signed_events(&conn, Some("bob"), 10, 0).unwrap();
944        let all = list_signed_events(&conn, None, 10, 0).unwrap();
945        assert_eq!(alice.len(), 2);
946        assert_eq!(bob.len(), 1);
947        assert_eq!(all.len(), 3);
948    }
949
950    #[test]
951    fn list_respects_limit_and_offset() {
952        let conn = fresh_db();
953        for i in 0..5 {
954            let mut e = fixture("alice", "memory_link.created");
955            e.timestamp = format!("2026-05-05T12:00:0{i}+00:00");
956            append_signed_event(&conn, &e).unwrap();
957        }
958        let page1 = list_signed_events(&conn, Some("alice"), 2, 0).unwrap();
959        let page2 = list_signed_events(&conn, Some("alice"), 2, 2).unwrap();
960        assert_eq!(page1.len(), 2);
961        assert_eq!(page2.len(), 2);
962        assert_ne!(page1[0].id, page2[0].id);
963    }
964
965    #[test]
966    fn append_preserves_signature_blob() {
967        let conn = fresh_db();
968        let mut event = fixture("alice", "memory_link.created");
969        event.signature = Some(vec![0xAA; 64]); // Ed25519 sig length
970        event.attest_level = "self_signed".to_string();
971        append_signed_event(&conn, &event).unwrap();
972        let listed = list_signed_events(&conn, Some("alice"), 10, 0).unwrap();
973        assert_eq!(listed[0].signature.as_deref(), Some(&[0xAA; 64][..]));
974        assert_eq!(listed[0].attest_level, "self_signed");
975    }
976
977    #[test]
978    fn indexes_exist_on_documented_columns() {
979        // PRAGMA index_list returns one row per index on a table.
980        // We assert each documented index is present so a future
981        // migration that drops one of them fails this test.
982        let conn = fresh_db();
983        let mut stmt = conn.prepare("PRAGMA index_list('signed_events')").unwrap();
984        let names: Vec<String> = stmt
985            .query_map([], |row| row.get::<_, String>(1))
986            .unwrap()
987            .collect::<rusqlite::Result<Vec<_>>>()
988            .unwrap();
989        assert!(
990            names.iter().any(|n| n == "idx_signed_events_agent"),
991            "missing idx_signed_events_agent in {names:?}"
992        );
993        assert!(
994            names.iter().any(|n| n == "idx_signed_events_type"),
995            "missing idx_signed_events_type in {names:?}"
996        );
997        assert!(
998            names.iter().any(|n| n == "idx_signed_events_timestamp"),
999            "missing idx_signed_events_timestamp in {names:?}"
1000        );
1001        assert!(
1002            names.iter().any(|n| n == "idx_signed_events_sequence"),
1003            "missing idx_signed_events_sequence in {names:?} \
1004             — v34 (V-4 closeout, #698) requires a UNIQUE index on \
1005             the cross-row chain sequence column"
1006        );
1007    }
1008
1009    #[test]
1010    fn payload_hash_is_sha256_32_bytes() {
1011        let h = payload_hash(b"hello world");
1012        assert_eq!(h.len(), 32, "SHA-256 digest is 32 bytes");
1013        // Stable across calls.
1014        assert_eq!(h, payload_hash(b"hello world"));
1015        // Sensitive to input.
1016        assert_ne!(h, payload_hash(b"hello worle"));
1017    }
1018
1019    // -----------------------------------------------------------------
1020    // L0.7-2 Tier A — error paths + empty / boundary coverage
1021    // -----------------------------------------------------------------
1022
1023    #[test]
1024    fn append_duplicate_id_returns_error() {
1025        let conn = fresh_db();
1026        let mut a = fixture("alice", "memory_link.created");
1027        append_signed_event(&conn, &a).expect("first append ok");
1028        // Re-use the exact same id — the PRIMARY KEY on `id` rejects
1029        // the second INSERT, exercising the .context("append signed_event")
1030        // error path.
1031        a.timestamp = Utc::now().to_rfc3339();
1032        let err = append_signed_event(&conn, &a).expect_err("second append with same id must fail");
1033        assert!(
1034            format!("{err:?}").contains("append signed_event")
1035                || format!("{err:#}").contains("append signed_event"),
1036            "anyhow context should include the 'append signed_event' tag, got: {err:?}"
1037        );
1038    }
1039
1040    #[test]
1041    fn list_signed_events_empty_db_returns_empty() {
1042        let conn = fresh_db();
1043        let alice = list_signed_events(&conn, Some("alice"), 10, 0).expect("list ok");
1044        let all = list_signed_events(&conn, None, 10, 0).expect("list ok");
1045        assert!(alice.is_empty());
1046        assert!(all.is_empty());
1047    }
1048
1049    #[test]
1050    fn list_signed_events_offset_past_end_returns_empty() {
1051        let conn = fresh_db();
1052        append_signed_event(&conn, &fixture("alice", "memory_link.created")).unwrap();
1053        let beyond = list_signed_events(&conn, Some("alice"), 10, 100).expect("list ok");
1054        assert!(beyond.is_empty());
1055    }
1056
1057    #[test]
1058    fn list_signed_events_no_agent_filter_returns_all_agents() {
1059        let conn = fresh_db();
1060        append_signed_event(&conn, &fixture("alice", "memory_link.created")).unwrap();
1061        append_signed_event(&conn, &fixture("bob", "memory_link.created")).unwrap();
1062        append_signed_event(&conn, &fixture("carol", "memory_link.created")).unwrap();
1063        let all = list_signed_events(&conn, None, 10, 0).expect("list ok");
1064        let agents: std::collections::HashSet<&str> =
1065            all.iter().map(|e| e.agent_id.as_str()).collect();
1066        assert_eq!(agents.len(), 3);
1067    }
1068
1069    #[test]
1070    fn payload_hash_known_vector() {
1071        // SHA-256 of the empty input must be the well-known constant
1072        // e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855.
1073        let h = payload_hash(b"");
1074        let hex: String = h.iter().map(|b| format!("{b:02x}")).collect();
1075        assert_eq!(
1076            hex,
1077            "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
1078        );
1079    }
1080
1081    /// Append-only invariant: there's no public function to UPDATE
1082    /// or DELETE rows from `signed_events`, and no `UPDATE
1083    /// signed_events` / `DELETE FROM signed_events` SQL string
1084    /// appears in any *non-comment* source line under `src/`.
1085    ///
1086    /// The check strips Rust line comments (`//...`) and intra-line
1087    /// `/* ... */` blocks before searching, so the doc-comments in
1088    /// this module and in `db.rs` that *describe* the contract
1089    /// (and therefore must contain the forbidden phrases verbatim)
1090    /// don't trigger a false positive. A real SQL-string call site
1091    /// — `conn.execute("UPDATE signed_events SET ...", ...)` —
1092    /// would survive the comment strip and trip the assertion.
1093    ///
1094    /// # v34 migration backfill carve-out
1095    ///
1096    /// `src/storage/migrations.rs::migrate_v34_backfill_chain` and
1097    /// `src/store/postgres.rs::migrate_v33` each issue a
1098    /// `UPDATE signed_events SET prev_hash = ?, sequence = ?` to
1099    /// stamp the cross-row chain columns on pre-existing rows. These
1100    /// are migration-time one-shot updates against rows whose
1101    /// `sequence` column is still NULL (i.e. never-stamped) — they
1102    /// do NOT mutate post-backfill rows. The carve-out is path-
1103    /// scoped: the test still flags any UPDATE/DELETE in non-
1104    /// migration files (the production write paths under
1105    /// `signed_events.rs`, the MCP/HTTP handlers, etc.).
1106    #[test]
1107    fn append_only_invariant_no_mutators_in_src() {
1108        use std::path::Path;
1109
1110        let src_root = Path::new(env!("CARGO_MANIFEST_DIR")).join("src");
1111        // Two forbidden patterns. We split each into halves and
1112        // concat at runtime so the grep still flags real call sites
1113        // even if a future contributor copy-pastes a literal needle
1114        // into a doc comment elsewhere.
1115        let forbidden: [String; 2] = [
1116            format!("{} signed_events", "UPDATE"),
1117            format!("{} signed_events", "DELETE FROM"),
1118        ];
1119        // Files allowed to contain the v34 backfill UPDATE — these
1120        // are the migration paths described in the doc comment above.
1121        // Path matching is by suffix so the test passes on every
1122        // OS / working-directory layout.
1123        let migration_carveouts: [&str; 2] = ["src/storage/migrations.rs", "src/store/postgres.rs"];
1124        let mut hits: Vec<String> = Vec::new();
1125        walk_rs_files(&src_root, &mut |path, contents| {
1126            // Skip the v34 backfill UPDATE in migration paths.
1127            let path_str = path.to_string_lossy().replace('\\', "/");
1128            let is_carveout = migration_carveouts.iter().any(|c| path_str.ends_with(c));
1129            let stripped = strip_rust_comments(contents);
1130            for needle in &forbidden {
1131                if !stripped.contains(needle.as_str()) {
1132                    continue;
1133                }
1134                if is_carveout && needle.starts_with("UPDATE") {
1135                    // Carve-out: backfill is allowed to UPDATE.
1136                    // DELETE FROM is still flagged.
1137                    continue;
1138                }
1139                hits.push(format!("{}: {}", path.display(), needle));
1140            }
1141        });
1142        assert!(
1143            hits.is_empty(),
1144            "found forbidden mutator(s) on signed_events: {hits:?} \
1145             — append-only invariant requires zero UPDATE/DELETE call sites in production code \
1146             (the v34 backfill UPDATE in migrations.rs / postgres.rs is the only allowed exception)"
1147        );
1148    }
1149
1150    /// Strip Rust line comments (`//...`) and single-line block
1151    /// comments (`/* ... */`). Multi-line block comments are
1152    /// rare in this codebase; an unmatched `/*` falls through and
1153    /// leaves the rest of the file intact, which is fine — the
1154    /// grep is a guard, not a parser.
1155    fn strip_rust_comments(src: &str) -> String {
1156        let mut out = String::with_capacity(src.len());
1157        for line in src.lines() {
1158            // Drop everything from the first `//` onward. We don't
1159            // try to honour `//` inside a string literal — none of
1160            // the production code under `src/` quotes these
1161            // forbidden phrases inside a string except the
1162            // legitimate signed_events.sql include path, which the
1163            // outer needle ("UPDATE signed_events") doesn't match.
1164            let line_no_line_comment = match line.find("//") {
1165                Some(idx) => &line[..idx],
1166                None => line,
1167            };
1168            // Strip /* ... */ blocks that open and close on the
1169            // same line. Good enough for the doc-comment patterns
1170            // that exist today.
1171            let mut buf = String::from(line_no_line_comment);
1172            while let (Some(start), Some(end_rel)) = (buf.find("/*"), buf.find("*/").map(|i| i + 2))
1173            {
1174                if end_rel <= start {
1175                    break;
1176                }
1177                buf.replace_range(start..end_rel, "");
1178            }
1179            out.push_str(&buf);
1180            out.push('\n');
1181        }
1182        out
1183    }
1184
1185    fn walk_rs_files(dir: &std::path::Path, visit: &mut dyn FnMut(&std::path::Path, &str)) {
1186        let Ok(entries) = std::fs::read_dir(dir) else {
1187            return;
1188        };
1189        for entry in entries.flatten() {
1190            let path = entry.path();
1191            if path.is_dir() {
1192                walk_rs_files(&path, visit);
1193            } else if path.extension().and_then(|s| s.to_str()) == Some("rs") {
1194                if let Ok(contents) = std::fs::read_to_string(&path) {
1195                    visit(&path, &contents);
1196                }
1197            }
1198        }
1199    }
1200
1201    // -----------------------------------------------------------------
1202    // L0.7-2 Tier A — row decode error paths (row_to_event row.get(N)?).
1203    //
1204    // The Err-arms of `row.get(0..6)?` in `row_to_event` are
1205    // triggered when the SELECTed columns can't be decoded into the
1206    // target Rust type. We exercise this by constructing an
1207    // in-memory DB with the SAME shape but a deliberately wrong
1208    // value type for the `agent_id` column (NULL where NOT NULL is
1209    // expected by the Rust type — rusqlite reports a type error on
1210    // String::from_sql when the column is NULL).
1211    // -----------------------------------------------------------------
1212
1213    #[test]
1214    fn list_signed_events_row_decode_error_propagates() {
1215        // Build a permissive signed_events shape (no NOT NULL on
1216        // agent_id) so we can INSERT a NULL there. The list_signed_events
1217        // query selects agent_id into a String — String::from_sql
1218        // refuses NULL, which exercises the row_to_event row.get(1)?
1219        // Err arm.
1220        let conn = Connection::open_in_memory().expect("in-memory db");
1221        // Drop the SQL-file table and recreate a NULL-permissive shape
1222        // with the SAME column order so the SELECT in
1223        // list_signed_events still works.
1224        conn.execute_batch(
1225            "CREATE TABLE signed_events (
1226                id              TEXT PRIMARY KEY,
1227                agent_id        TEXT,
1228                event_type      TEXT NOT NULL,
1229                payload_hash    BLOB NOT NULL,
1230                signature       BLOB,
1231                attest_level    TEXT NOT NULL,
1232                timestamp       TEXT NOT NULL,
1233                prev_hash       BLOB,
1234                sequence        INTEGER
1235            );",
1236        )
1237        .unwrap();
1238        // Insert one row with NULL in agent_id — the SELECT shape
1239        // matches list_signed_events but row.get(1)? fails on the
1240        // NULL→String decode.
1241        conn.execute(
1242            "INSERT INTO signed_events \
1243             (id, agent_id, event_type, payload_hash, signature, attest_level, timestamp, \
1244              prev_hash, sequence) \
1245             VALUES ('row1', NULL, 'memory_link.created', X'00', NULL, 'unsigned', \
1246             '2026-05-13T00:00:00+00:00', NULL, 1)",
1247            [],
1248        )
1249        .unwrap();
1250        // Listing now exercises the row.get(1)? Err arm.
1251        let res = list_signed_events(&conn, None, 10, 0);
1252        assert!(res.is_err(), "row decode must fail when agent_id is NULL");
1253    }
1254
1255    // -----------------------------------------------------------------
1256    // COR-9 (issue #767) — read_chain_head NULL-sequence diagnostic
1257    // -----------------------------------------------------------------
1258
1259    /// A legacy DB carrying a row with `sequence IS NULL` (pre-v34 or
1260    /// a backfill that was interrupted) MUST be detected by
1261    /// `read_chain_head` — surfacing as a clear error rather than
1262    /// silently colliding on the UNIQUE index when a fresh
1263    /// `append_signed_event` would otherwise compute `next_seq = 1`
1264    /// against an already-stamped first row.
1265    #[test]
1266    fn read_chain_head_rejects_null_sequence_row() {
1267        // Build a NULL-permissive schema so we can INSERT a sequence
1268        // IS NULL row directly.
1269        let conn = Connection::open_in_memory().expect("in-memory db");
1270        conn.execute_batch(
1271            "CREATE TABLE signed_events (
1272                id              TEXT PRIMARY KEY,
1273                agent_id        TEXT NOT NULL,
1274                event_type      TEXT NOT NULL,
1275                payload_hash    BLOB NOT NULL,
1276                signature       BLOB,
1277                attest_level    TEXT NOT NULL DEFAULT 'unsigned',
1278                timestamp       TEXT NOT NULL,
1279                prev_hash       BLOB,
1280                sequence        INTEGER
1281            );
1282            CREATE UNIQUE INDEX idx_signed_events_sequence
1283                ON signed_events(sequence);",
1284        )
1285        .unwrap();
1286        // Insert a row whose sequence column is NULL (pre-v34 shape).
1287        conn.execute(
1288            "INSERT INTO signed_events \
1289             (id, agent_id, event_type, payload_hash, signature, attest_level, timestamp, \
1290              prev_hash, sequence) \
1291             VALUES ('legacy', 'alice', 'memory_link.created', X'00', NULL, 'unsigned', \
1292             '2026-05-13T00:00:00+00:00', NULL, NULL)",
1293            [],
1294        )
1295        .unwrap();
1296
1297        // read_chain_head MUST surface this clearly, not silently
1298        // collapse the NULL to 0.
1299        let err = read_chain_head(&conn).expect_err("NULL-sequence row must trigger diagnostic");
1300        let rendered = format!("{err:#}");
1301        assert!(
1302            rendered.contains("sequence IS NULL") || rendered.contains("backfill incomplete"),
1303            "diagnostic must mention NULL-sequence rows; got: {rendered}"
1304        );
1305    }
1306
1307    /// And, by extension, `append_signed_event` MUST refuse to grow
1308    /// the chain when the diagnostic fires — silently appending atop
1309    /// a partially-migrated table is the exact failure mode COR-9 closes.
1310    #[test]
1311    fn append_signed_event_refuses_when_null_sequence_present() {
1312        let conn = Connection::open_in_memory().expect("in-memory db");
1313        conn.execute_batch(
1314            "CREATE TABLE signed_events (
1315                id              TEXT PRIMARY KEY,
1316                agent_id        TEXT NOT NULL,
1317                event_type      TEXT NOT NULL,
1318                payload_hash    BLOB NOT NULL,
1319                signature       BLOB,
1320                attest_level    TEXT NOT NULL DEFAULT 'unsigned',
1321                timestamp       TEXT NOT NULL,
1322                prev_hash       BLOB,
1323                sequence        INTEGER
1324            );
1325            CREATE UNIQUE INDEX idx_signed_events_sequence
1326                ON signed_events(sequence);",
1327        )
1328        .unwrap();
1329        conn.execute(
1330            "INSERT INTO signed_events \
1331             (id, agent_id, event_type, payload_hash, signature, attest_level, timestamp, \
1332              prev_hash, sequence) \
1333             VALUES ('legacy', 'alice', 'memory_link.created', X'00', NULL, 'unsigned', \
1334             '2026-05-13T00:00:00+00:00', NULL, NULL)",
1335            [],
1336        )
1337        .unwrap();
1338        let event = fixture("alice", "memory_link.created");
1339        let err = append_signed_event(&conn, &event)
1340            .expect_err("append must refuse while NULL-sequence row exists");
1341        let rendered = format!("{err:#}");
1342        assert!(
1343            rendered.contains("sequence IS NULL") || rendered.contains("backfill incomplete"),
1344            "diagnostic must surface in append error; got: {rendered}"
1345        );
1346    }
1347
1348    #[test]
1349    fn list_signed_events_with_agent_filter_row_decode_error_propagates() {
1350        // Same as above, but exercise the agent_id == Some(...) branch
1351        // of list_signed_events. The `WHERE agent_id = ?1` won't
1352        // match NULL rows (NULL ≠ anything), so we need a row whose
1353        // agent_id is the queried string but another column NULLs out.
1354        // Insert a row whose `event_type` is NULL — row.get(2)?
1355        // fails on NULL→String when listing filtered by agent.
1356        let conn = Connection::open_in_memory().expect("in-memory db");
1357        conn.execute_batch(
1358            "CREATE TABLE signed_events (
1359                id              TEXT PRIMARY KEY,
1360                agent_id        TEXT NOT NULL,
1361                event_type      TEXT,
1362                payload_hash    BLOB NOT NULL,
1363                signature       BLOB,
1364                attest_level    TEXT NOT NULL,
1365                timestamp       TEXT NOT NULL,
1366                prev_hash       BLOB,
1367                sequence        INTEGER
1368            );",
1369        )
1370        .unwrap();
1371        conn.execute(
1372            "INSERT INTO signed_events \
1373             (id, agent_id, event_type, payload_hash, signature, attest_level, timestamp, \
1374              prev_hash, sequence) \
1375             VALUES ('row2', 'alice', NULL, X'00', NULL, 'unsigned', \
1376             '2026-05-13T00:00:00+00:00', NULL, 1)",
1377            [],
1378        )
1379        .unwrap();
1380        let res = list_signed_events(&conn, Some("alice"), 10, 0);
1381        assert!(res.is_err(), "row decode must fail when event_type is NULL");
1382    }
1383}