ai_memory/signed_events.rs
1// Copyright 2026 AlphaOne LLC
2// SPDX-License-Identifier: Apache-2.0
3
4//! v0.7.0 / H-track substrate — append-only `signed_events` audit
5//! table.
6//!
7//! # NSA CSI MCP Security mapping
8//!
9//! Primary defense against **NSA concern (g) Poor or missing audit
10//! logs** and contributing implementation of **NSA recommendation (e)
11//! Sign and verify MCP messages** + **(g) Instrument for logging and
12//! detection** per the NSA Cybersecurity Information document on MCP
13//! security (U/OO/6030316-26 \| PP-26-1834, May 2026, Version 1.0).
14//! The V-4 cross-row hash chain (`prev_hash` SHA-256 over the prior
15//! row's canonical bytes plus monotonic `sequence` counter) makes the
16//! audit log tamper-evident even when individual row signatures are
17//! valid: tampering with row N's content breaks row N+1's `prev_hash`
18//! verification; tampering with `sequence` breaks the contiguity
19//! check. Capability inventory anchor:
20//! `v4_signed_events_chain` in
21//! [`docs/compliance/_inventory/v0.7.0-capabilities.json`](../../docs/compliance/_inventory/v0.7.0-capabilities.json);
22//! narrative in
23//! [`docs/compliance/nsa-csi-mcp.html`](../../docs/compliance/nsa-csi-mcp.html)
24//! §3.7 (concern g) and §4.7 (recommendation g).
25//!
26//! Each identity-bearing write (today: every `memory_link` insert
27//! through `db::create_link` / `db::create_link_signed`) appends one
28//! row to `signed_events` so a downstream auditor can replay the
29//! exact sequence of attestation events the daemon emitted, without
30//! having to scan the mutable `memory_links` table for "what did
31//! this row look like at write time" — by construction, the
32//! `payload_hash` captured here is SHA-256 over the same canonical-
33//! CBOR bytes the H2 signer committed to.
34//!
35//! # Append-only invariant (row-level)
36//!
37//! This module exposes ONE writer ([`append_signed_event`]) and
38//! ZERO mutators. There are no `UPDATE signed_events` or `DELETE
39//! FROM signed_events` statements anywhere in the production
40//! codepath. Operators that need to prune (compliance retention,
41//! disk pressure) MUST do so via direct SQL with explicit
42//! awareness that they are breaking the audit chain — that is the
43//! deliberate escape hatch documented in
44//! `migrations/sqlite/0020_v07_signed_events.sql`.
45//!
46//! The H5 test suite asserts (via grep over `src/`) that no
47//! `UPDATE signed_events` / `DELETE FROM signed_events` strings
48//! appear in production code outside doc comments — adding any
49//! such call site will fail the build.
50//!
51//! # Cross-row tamper evidence (schema v34, #698 V-4 closeout)
52//!
53//! Since schema v34 the table carries TWO chain columns on top of
54//! each row's Ed25519 `signature`:
55//!
56//! - **`prev_hash BLOB`** — SHA-256 (32 bytes) over the canonical-
57//! bytes encoding of the PRECEDING row (see
58//! [`canonical_chain_bytes`]). First row gets 32 zero bytes.
59//! - **`sequence INTEGER`** — monotonically-increasing rank starting
60//! at 1, pinned by a `UNIQUE INDEX`.
61//!
62//! Together they form a SQL-side hash chain mirroring the JSONL
63//! property in [`crate::audit`]. A `DELETE` of row N still passes
64//! per-row signature verification on the surviving rows individually,
65//! BUT row N+1's stored `prev_hash` will no longer match the
66//! recomputed digest of the (now-missing) row N — the chain break is
67//! detected at row N+1 by [`verify_chain`]. An `UPDATE` of any
68//! column included in the canonical-bytes encoding propagates the
69//! same way. A tampered `sequence` breaks the contiguity check.
70//!
71//! The cross-row chain is the LOAD-BEARING property; per-row Ed25519
72//! signatures (the existing `signature` column) remain as defense-in-
73//! depth.
74//!
75//! ## Relationship to [`crate::audit`] (JSONL chain)
76//!
77//! The JSONL audit log under `<audit_dir>/audit.log` remains the
78//! cross-host portable evidence format with its own:
79//!
80//! - **Cross-line hash chain.** Each JSONL line carries `prev_hash`
81//! pointing to the prior line's `self_hash`; `ai-memory audit
82//! verify` recomputes the chain and exits non-zero on mismatch.
83//! - **Monotonic sequence.** F2 (v0.7.0 round-2) wired the sequence
84//! counter to survive process restart so SIEMs detect dropped
85//! lines even before the chain check.
86//! - **Append-only OS hint.** Best-effort `chflags(2)` /
87//! `FS_IOC_SETFLAGS`.
88//!
89//! The SQL chain (this module) is the daemon-local property; the
90//! JSONL chain is the portable evidence the daemon hands off to a
91//! SIEM. They are complementary, not redundant.
92//!
93//! ## When to use which surface
94//!
95//! | Question | Surface |
96//! |---|---|
97//! | "What did this signed link's bytes look like at write time?" | `signed_events.payload_hash` (binds canonical CBOR) |
98//! | "Was the SQL substrate tampered with between `T0` and `T1`?" | `signed_events.prev_hash` + `signed_events.sequence` via [`verify_chain`] |
99//! | "Was the on-disk audit log truncated?" | `audit.rs` JSONL chain |
100//! | "Did the same key issue the create and the invalidate?" | `signed_events.signature` on both rows |
101//!
102//! # Out of scope
103//!
104//! - H4 (`memory_verify` MCP tool, `attest_level` enum surfacing).
105//! - H6 (end-to-end test of the immutable chain).
106
107use anyhow::{Context, Result};
108use rusqlite::{Connection, params};
109use sha2::{Digest, Sha256};
110
/// `tracing` target string for log lines emitted while writing to or
/// verifying the signed-events chain, shared across the storage
/// backends (#1558 tracing-target SSOT).
///
/// The value happens to equal the `signed_events` SQL table name, but
/// the two are independent: the table name stays embedded in the SQL
/// strings that reference it and is not derived from this constant.
pub(crate) const SIGNED_EVENTS_TRACE_TARGET: &str = "signed_events";
116
117// ---------------------------------------------------------------------------
118// v0.7.0 multi-agent literal-sweep (scanner B finding F-B9.x) —
119// canonical signed_events event-type slugs.
120//
121// Pre-sweep, `event_type` strings were scattered across ~14
122// production sites as inline literals:
123// - "memory_link.created" (storage::create_link x2, cli::verify, cli::verify_signed_events)
124// - "memory_link.invalidated" (storage::reflection invalidation)
125// - "memory.stored" (cli::verify substrate writer)
126// - "memory.touch" (cli::verify touch path)
127// - "reflection.invalidation_notified" (notification::invalidation)
128// - "reflection.depth_exceeded" (cli::doctor)
129// - "reflects_on.cycle_refused" (mcp::link)
130// - "skill.exported" (mcp::skill_export)
131// - "skill.registered" (mcp::skill_register)
132// - "memory_capture_turn" (mcp::capture_turn, RFC-0001 L4)
133// - "persona_generated" (persona::generate)
134// - "atomisation_complete" (atomisation::run)
135//
136// Renaming any slug — or adding a new one without grep-ing every
137// candidate writer site — was a substrate-wide search. The
138// `event_types` module below centralises every slug as a `&'static
139// str` const so a rename is one edit + the writer + downstream
140// auditor + replay-tool callsites get the new value mechanically.
141// Mirrors the `errors::error_codes` pattern (ARCH-9 / FX-C4-batch2).
142// ---------------------------------------------------------------------------
143
// NOTE(review): `dead_code` is allowed presumably because not every slug
// has a callsite in every build configuration — confirm before removing.
#[allow(dead_code)]
pub mod event_types {
    //! Canonical `signed_events.event_type` slugs.
    //!
    //! Every value written to the `event_type` column lives here as a
    //! `&'static str` constant so writers, auditors and replay tooling
    //! share one spelling. The string VALUES are persisted in audit
    //! rows; renaming a constant is safe, changing its value is not.

    /// Link creation through `db::create_link` /
    /// `db::create_link_signed` — the canonical link-write audit
    /// emission. Pre-sweep callsites: 2 in `src/storage/mod.rs`, 1 in
    /// `src/cli/verify.rs`, 1 in `src/cli/verify_signed_events.rs`.
    pub const MEMORY_LINK_CREATED: &str = "memory_link.created";

    /// Link invalidation performed by the L2-3 reflection-invalidation
    /// walker (`src/storage/mod.rs::5257`).
    pub const MEMORY_LINK_INVALIDATED: &str = "memory_link.invalidated";

    /// Substrate-side memory insert (`src/cli/verify.rs::933`).
    pub const MEMORY_STORED: &str = "memory.stored";

    /// Substrate-side touch path (`src/cli/verify.rs::970`).
    pub const MEMORY_TOUCH: &str = "memory.touch";

    /// Notification fan-out by the L2-3 reflection-invalidation walker
    /// (`src/notification/invalidation.rs::278`).
    pub const REFLECTION_INVALIDATION_NOTIFIED: &str = "reflection.invalidation_notified";

    /// Recursive-learning depth-cap trip (`src/cli/doctor.rs::1904`).
    /// v0.7.0 #655 Task 1/8.
    pub const REFLECTION_DEPTH_EXCEEDED: &str = "reflection.depth_exceeded";

    /// Refusal of a `reflects_on` link that would close a cycle in the
    /// reflection chain (`src/mcp/tools/link.rs::199`). v0.7.0 #655
    /// Task 1/8.
    pub const REFLECTS_ON_CYCLE_REFUSED: &str = "reflects_on.cycle_refused";

    /// Skill-export emission (`src/mcp/tools/skill_export.rs::244`).
    pub const SKILL_EXPORTED: &str = "skill.exported";

    /// Skill-register emission (`src/mcp/tools/skill_register.rs::226`).
    pub const SKILL_REGISTERED: &str = "skill.registered";

    /// L4 `memory_capture_turn` MCP tool emission per RFC-0001
    /// (`src/mcp/tools/capture_turn.rs::472`).
    pub const MEMORY_CAPTURE_TURN: &str = "memory_capture_turn";

    /// Persona regeneration (`src/persona/mod.rs::787`). v0.7.0 QW-2
    /// persona-as-artifact.
    pub const PERSONA_GENERATED: &str = "persona_generated";

    /// WT-1-C atomisation completion (`src/atomisation/mod.rs::943`).
    /// v0.7.0 Batman Form 1-4 atomisation engine.
    pub const ATOMISATION_COMPLETE: &str = "atomisation_complete";

    /// Outbound federation credential renewal.
    ///
    /// Emitted by the file-refresh renewal worker
    /// (`src/federation/identity/renewal.rs`) on every tick that swaps a
    /// freshly-issued credential into the live send path
    /// ([`crate::federation::identity::renewal::RenewalOutcome::Reloaded`]).
    /// This is the FED-P4-f §8 audit obligation: the node's own
    /// credential lifecycle is recorded in the tamper-evident chain.
    /// Issuance (`FederationIssuer`) is centrally operated and
    /// revocation is by self-expiry (no CRL — see `issuer.rs`), so
    /// renewal is the only node-local lifecycle transition to audit.
    pub const FED_CREDENTIAL_RENEWED: &str = "federation.credential_renewed";

    /// Operator-authorized governance-rule deletion via
    /// `ai-memory rules remove --sign`
    /// (`src/governance/rules_store.rs::remove_signed`).
    ///
    /// Closes the audit gap where a `DELETE FROM governance_rules` left
    /// no tamper-evident trace of WHICH rule was removed: the emitted
    /// row's `payload_hash` is the SHA-256 over the removed rule's
    /// canonical signing bytes, and `signature` is the operator's
    /// Ed25519 signature over that hash.
    pub const GOVERNANCE_RULE_REMOVED: &str = "governance.rule_removed";
}
222
/// In-memory image of a single `signed_events` audit row.
///
/// Content columns (supplied by the writer's caller):
///
/// - `id` — a UUIDv4 minted by the writer.
/// - `payload_hash` — the 32-byte SHA-256 over the canonical-CBOR
///   bytes that H2 hashed for the original signature.
/// - `signature` — mirrors the source row's `memory_links.signature`;
///   `None` (SQL NULL) when the source write was unsigned.
///
/// Chain columns (`prev_hash`, `sequence`) are owned by the storage
/// layer: [`append_signed_event`] derives them from the current chain
/// head at insert time, and [`row_to_event`] fills them when rows are
/// selected back out of the table. Callers MUST NOT set them.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct SignedEvent {
    pub id: String,
    pub agent_id: String,
    pub event_type: String,
    pub payload_hash: Vec<u8>,
    pub signature: Option<Vec<u8>>,
    pub attest_level: String,
    pub timestamp: String,
    /// v34 — SHA-256 (32 bytes) over the canonical-bytes encoding of
    /// the preceding row; 32 zero bytes on the first row.
    ///
    /// Written by [`append_signed_event`] at insert time. Any value a
    /// caller places here is ignored, so build events with
    /// `..SignedEvent::default()` at the struct-literal tail and leave
    /// this empty.
    pub prev_hash: Vec<u8>,
    /// v34 — chain rank, monotonically increasing from 1.
    ///
    /// Written by [`append_signed_event`] at insert time. Any value a
    /// caller places here is ignored, so build events with
    /// `..SignedEvent::default()` at the struct-literal tail and leave
    /// this zero.
    pub sequence: i64,
}
258
259impl SignedEvent {
260 /// v0.7.0 #1099 (SR-1 #4, HIGH) — build a `SignedEvent` that
261 /// consults the process-wide daemon audit signing key (installed
262 /// at boot via [`crate::governance::audit::init`]) and applies it
263 /// to `payload_hash`. When a key is installed, the returned row
264 /// carries `signature: Some(sig_bytes)` + `attest_level:
265 /// "daemon_signed"`; when no key is installed, falls back to
266 /// `signature: None, attest_level: "unsigned"`.
267 ///
268 /// Homogenises every production audit-row writer (pending_action
269 /// approve/reject/timeout, federation.quota_refused on both
270 /// sqlite + postgres paths, governance.check) so a downstream
271 /// auditor sees per-row signatures matching the daemon's
272 /// `VerifyingKey` and the cross-row chain head together.
273 ///
274 /// The other chain columns (`prev_hash`, `sequence`) are left at
275 /// their defaults — [`append_signed_event`] fills them at INSERT
276 /// time. Callers MUST NOT pre-populate them.
277 #[must_use]
278 pub fn with_daemon_signature(
279 payload_hash: Vec<u8>,
280 agent_id: String,
281 event_type: String,
282 timestamp: String,
283 ) -> Self {
284 let (signature, attest_level) =
285 match crate::governance::audit::try_sign_audit_payload(&payload_hash) {
286 Some((sig_bytes, tag)) => (Some(sig_bytes), tag.to_string()),
287 None => (
288 None,
289 crate::models::AttestLevel::Unsigned.as_str().to_string(),
290 ),
291 };
292 SignedEvent {
293 id: uuid::Uuid::new_v4().to_string(),
294 agent_id,
295 event_type,
296 payload_hash,
297 signature,
298 attest_level,
299 timestamp,
300 ..SignedEvent::default()
301 }
302 }
303}
304
/// Genesis `prev_hash`: the 32-byte all-zero digest stored on the first
/// row of the chain, which has no predecessor to hash.
pub const ZERO_HASH: [u8; 32] = [0; 32];
307
/// Byte placed between fields by [`canonical_chain_bytes`]: ASCII 0x1F
/// ("Unit Separator").
///
/// The byte does not occur in RFC3339 timestamps, UUID strings, or
/// hex/base64 text, so the textual fields concatenate unambiguously
/// without escaping.
///
/// NOTE(review): the binary fields (`payload_hash`, `signature`) are
/// appended as raw bytes and can contain 0x1F, and `agent_id` /
/// `event_type` are not validated here. Unambiguity for those fields
/// presumably relies on their fixed lengths and on upstream input
/// validation — confirm. The encoding itself must stay byte-stable
/// because stored `prev_hash` values commit to it.
const FIELD_SEP: u8 = b'\x1f';
313
314/// Canonical bytes used as the chain-hash input.
315///
316/// Commits to every column that identifies the row's content:
317/// `id || 0x1F || agent_id || 0x1F || event_type || 0x1F ||
318/// payload_hash || 0x1F || signature_or_empty || 0x1F ||
319/// attest_level || 0x1F || timestamp || 0x1F || sequence_be_8_bytes`.
320///
321/// Each row's `prev_hash` is `SHA-256(canonical_chain_bytes(prev_row))`,
322/// or [`ZERO_HASH`] for the first row. A future hash-agility
323/// migration can change the digest in one place; the encoding itself
324/// is byte-stable so an auditor can replay the chain from the stored
325/// columns alone.
326#[must_use]
327pub fn canonical_chain_bytes(event: &SignedEvent) -> Vec<u8> {
328 let mut out: Vec<u8> = Vec::with_capacity(
329 event.id.len()
330 + event.agent_id.len()
331 + event.event_type.len()
332 + event.payload_hash.len()
333 + event.signature.as_ref().map_or(0, Vec::len)
334 + event.attest_level.len()
335 + event.timestamp.len()
336 + 8
337 + 7, // seven separators
338 );
339 out.extend_from_slice(event.id.as_bytes());
340 out.push(FIELD_SEP);
341 out.extend_from_slice(event.agent_id.as_bytes());
342 out.push(FIELD_SEP);
343 out.extend_from_slice(event.event_type.as_bytes());
344 out.push(FIELD_SEP);
345 out.extend_from_slice(&event.payload_hash);
346 out.push(FIELD_SEP);
347 if let Some(sig) = event.signature.as_ref() {
348 out.extend_from_slice(sig);
349 }
350 // empty signature contributes zero bytes between separators —
351 // the separator on either side still pins the slot's position.
352 out.push(FIELD_SEP);
353 out.extend_from_slice(event.attest_level.as_bytes());
354 out.push(FIELD_SEP);
355 out.extend_from_slice(event.timestamp.as_bytes());
356 out.push(FIELD_SEP);
357 out.extend_from_slice(&event.sequence.to_be_bytes());
358 out
359}
360
361/// Read the chain head — `(max_sequence, prev_canonical_hash)`.
362///
363/// Returns `(0, ZERO_HASH)` for an empty table. The "previous"
364/// canonical hash is the SHA-256 over the canonical bytes of the
365/// row with the highest sequence; the next inserted row's
366/// `prev_hash` is exactly this value.
367///
368/// # Cluster-C COR-9 (issue #767): NULL-sequence diagnostic
369///
370/// Prior to this fix the query was
371/// `SELECT … COALESCE(sequence, 0) … ORDER BY COALESCE(sequence, 0) DESC`,
372/// which silently treated a row whose `sequence IS NULL` (a
373/// pre-v34 row that the v34 backfill missed) as sequence == 0 and
374/// would then issue `next_seq = 1`, colliding with the legitimately-
375/// backfilled first row on the UNIQUE index. The mask hid a real
376/// migration-needed state behind a misleading SQLITE_CONSTRAINT_UNIQUE.
377///
378/// We now restrict the head SELECT to `WHERE sequence IS NOT NULL`,
379/// and issue a SEPARATE diagnostic SELECT that asserts no
380/// `sequence IS NULL` rows exist. If any are found post-v34 the
381/// function returns a clear error and emits `tracing::error!` so an
382/// operator can run `ai-memory migrate` (or invoke
383/// `migrate_v34_backfill_chain` directly) to repair the chain.
384fn read_chain_head(conn: &Connection) -> Result<(i64, [u8; 32])> {
385 // COR-9 diagnostic: hard-fail on NULL-sequence rows so a
386 // partially-migrated DB never silently produces a duplicate
387 // sequence. The v34 backfill is idempotent — operators recovering
388 // from this error re-run the migration ladder.
389 let null_seq_count: i64 = conn
390 .query_row(
391 "SELECT COUNT(*) FROM signed_events WHERE sequence IS NULL",
392 [],
393 |row| row.get(0),
394 )
395 .context("read_chain_head: null-sequence diagnostic")?;
396 if null_seq_count > 0 {
397 tracing::error!(
398 null_sequence_rows = null_seq_count,
399 "signed_events: found {null_seq_count} row(s) with sequence IS NULL — \
400 v34 chain backfill is incomplete. Re-run the migration ladder \
401 (`ai-memory migrate` or restart with the current binary) to \
402 stamp prev_hash + sequence on the unmigrated rows; refusing to \
403 append further audit rows until the chain is repaired."
404 );
405 anyhow::bail!(
406 "read_chain_head: {null_seq_count} signed_events row(s) have sequence IS NULL — \
407 v34 chain backfill incomplete; re-run `ai-memory migrate`"
408 );
409 }
410
411 // Pull the column shape that `canonical_chain_bytes` needs,
412 // ordered by sequence DESC so the head is the first row. Restrict
413 // to non-NULL sequences (the diagnostic above guarantees this
414 // matches every row, but the WHERE makes the read itself robust
415 // against a TOCTOU window if a concurrent unmigrated INSERT
416 // landed between the diagnostic and this query).
417 let mut stmt = conn
418 .prepare(
419 "SELECT id, agent_id, event_type, payload_hash, signature, attest_level, \
420 timestamp, sequence \
421 FROM signed_events \
422 WHERE sequence IS NOT NULL \
423 ORDER BY sequence DESC, rowid DESC \
424 LIMIT 1",
425 )
426 .context("read_chain_head: prepare")?;
427 let head: Option<SignedEvent> = stmt
428 .query_map([], |row| {
429 Ok(SignedEvent {
430 id: row.get(0)?,
431 agent_id: row.get(1)?,
432 event_type: row.get(2)?,
433 payload_hash: row.get(3)?,
434 signature: row.get(4)?,
435 attest_level: row.get(5)?,
436 timestamp: row.get(6)?,
437 sequence: row.get(7)?,
438 prev_hash: Vec::new(), // not part of the canonical bytes
439 })
440 })
441 .context("read_chain_head: query_map")?
442 .next()
443 .transpose()
444 .context("read_chain_head: collect")?;
445 match head {
446 None => Ok((0, ZERO_HASH)),
447 Some(prev) => {
448 let max_seq = prev.sequence;
449 let canon = canonical_chain_bytes(&prev);
450 let mut hasher = Sha256::new();
451 hasher.update(&canon);
452 let mut digest = [0u8; 32];
453 digest.copy_from_slice(&hasher.finalize());
454 Ok((max_seq, digest))
455 }
456 }
457}
458
/// Result of one [`verify_chain`] pass over the `signed_events` table.
///
/// - `rows_checked` — number of rows the verifier walked.
/// - `chain_break` — `sequence` of the FIRST row at which the chain
///   failed, or `None` when it held. A row fails when its stored
///   `prev_hash` differs from SHA-256(canonical_chain_bytes(row N-1)),
///   or when its `sequence` is not the expected `prior + 1` (gap,
///   duplicate, or non-monotonic jump).
/// - `signature_failures` — sequences whose Ed25519 signature did not
///   verify. These are tracked apart from `chain_break` because the
///   chain can be intact while individual signatures fail
///   (defense-in-depth split).
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ChainVerificationReport {
    pub rows_checked: u64,
    pub chain_break: Option<i64>,
    pub signature_failures: Vec<i64>,
}

impl ChainVerificationReport {
    /// Whether the cross-row chain held from the first walked row to
    /// the last.
    ///
    /// Only `chain_break` is consulted. Per-row signature failures are
    /// a disjoint property reported through `signature_failures` (a
    /// chain break is structurally worse than a signature failure).
    #[must_use]
    pub fn chain_holds(&self) -> bool {
        matches!(self.chain_break, None)
    }
}
487
488/// Walk all rows in `sequence` order and verify the cross-row chain
489/// + per-row signatures.
490///
491/// For each row:
492/// 1. Check `sequence == prior + 1` (first row: `sequence == 1`).
493/// 2. Recompute SHA-256 over [`canonical_chain_bytes`] of the
494/// preceding row (or [`ZERO_HASH`] for the first row), compare
495/// to the current row's stored `prev_hash`.
496/// 3. Verify the Ed25519 signature (when present) over the row's
497/// `payload_hash`. The verifying-key resolver is provided by
498/// the caller; pass `None` to skip signature verification (the
499/// chain check is still performed).
500///
501/// On a chain break (step 1 or step 2 fail) the verifier records
502/// the breaking row's `sequence` and continues so the caller still
503/// gets per-row signature stats over the rest of the table.
504///
505/// # Errors
506///
507/// Returns the underlying `rusqlite` error if the SELECT or any row
508/// decode fails. A clean (chain held + zero signature failures)
509/// report is returned as `Ok`; the caller checks
510/// [`ChainVerificationReport::chain_holds`] for the chain bit.
511pub fn verify_chain(
512 conn: &Connection,
513 since_sequence: Option<i64>,
514) -> Result<ChainVerificationReport> {
515 let lower = since_sequence.unwrap_or(0);
516 let mut stmt = conn
517 .prepare(
518 "SELECT id, agent_id, event_type, payload_hash, signature, attest_level, \
519 timestamp, prev_hash, COALESCE(sequence, 0) \
520 FROM signed_events \
521 WHERE COALESCE(sequence, 0) > ?1 \
522 ORDER BY COALESCE(sequence, 0) ASC",
523 )
524 .context("verify_chain: prepare")?;
525 let mut rows = stmt.query(params![lower]).context("verify_chain: query")?;
526
527 let mut rows_checked: u64 = 0;
528 let mut chain_break: Option<i64> = None;
529 // v0.7.0 #1071 (SR-2 #1, HIGH) — actually walk Ed25519 signatures.
530 // Resolved once outside the loop so we don't re-lock the OnceLock
531 // per row. Returns `None` when the daemon process has no audit
532 // signing key installed (i.e. `governance::audit::init` was called
533 // with `signing_key: None`); in that case we record no signature
534 // failures because there is no key to verify against — the
535 // verifier is structure-only on an unsigned daemon.
536 let verifier: Option<ed25519_dalek::VerifyingKey> =
537 crate::governance::audit::resolve_daemon_verifying_key();
538 let mut signature_failures: Vec<i64> = Vec::new();
539
540 let mut expected_seq = lower + 1;
541 let mut prev_canonical_hash: [u8; 32] = ZERO_HASH;
542 // When resuming with `since_sequence`, the prior row's canonical
543 // hash must be recomputed from the row immediately before `lower`
544 // so the chain check at `lower + 1` lines up.
545 if lower > 0 {
546 let mut head_stmt = conn
547 .prepare(
548 "SELECT id, agent_id, event_type, payload_hash, signature, attest_level, \
549 timestamp, COALESCE(sequence, 0) \
550 FROM signed_events \
551 WHERE COALESCE(sequence, 0) = ?1",
552 )
553 .context("verify_chain: prepare head")?;
554 let head: Option<SignedEvent> = head_stmt
555 .query_map(params![lower], |row| {
556 Ok(SignedEvent {
557 id: row.get(0)?,
558 agent_id: row.get(1)?,
559 event_type: row.get(2)?,
560 payload_hash: row.get(3)?,
561 signature: row.get(4)?,
562 attest_level: row.get(5)?,
563 timestamp: row.get(6)?,
564 sequence: row.get(7)?,
565 prev_hash: Vec::new(),
566 })
567 })
568 .context("verify_chain: head query")?
569 .next()
570 .transpose()
571 .context("verify_chain: head collect")?;
572 if let Some(h) = head {
573 let canon = canonical_chain_bytes(&h);
574 let mut hasher = Sha256::new();
575 hasher.update(&canon);
576 prev_canonical_hash.copy_from_slice(&hasher.finalize());
577 }
578 }
579
580 while let Some(row) = rows.next().context("verify_chain: next row")? {
581 rows_checked += 1;
582 let event = SignedEvent {
583 id: row.get(0).context("verify_chain: id")?,
584 agent_id: row.get(1).context("verify_chain: agent_id")?,
585 event_type: row.get(2).context("verify_chain: event_type")?,
586 payload_hash: row.get(3).context("verify_chain: payload_hash")?,
587 signature: row.get(4).context("verify_chain: signature")?,
588 attest_level: row.get(5).context("verify_chain: attest_level")?,
589 timestamp: row.get(6).context("verify_chain: timestamp")?,
590 prev_hash: row
591 .get::<_, Option<Vec<u8>>>(7)
592 .context("verify_chain: prev_hash")?
593 .unwrap_or_default(),
594 sequence: row.get(8).context("verify_chain: sequence")?,
595 };
596
597 // (1) Sequence contiguity.
598 if event.sequence != expected_seq {
599 if chain_break.is_none() {
600 chain_break = Some(event.sequence);
601 }
602 // Keep walking so we still count rows + can later add
603 // signature-failure tracking; but realign expected_seq to
604 // the row we read so subsequent rows aren't ALL flagged.
605 expected_seq = event.sequence;
606 }
607
608 // (2) prev_hash chain.
609 if event.prev_hash.len() != 32 || event.prev_hash != prev_canonical_hash {
610 if chain_break.is_none() {
611 chain_break = Some(event.sequence);
612 }
613 }
614
615 // (3) v0.7.0 #1071 — per-row Ed25519 signature verification.
616 // Only fires when a verifier is resolvable AND the row has a
617 // signature blob attached (rows written before #1035 / #1099
618 // and the legacy `attest_level == "unsigned"` rows that ship
619 // by design have `signature: None` and are skipped). On
620 // shape mismatch (signature blob not 64 bytes) or
621 // `verify_strict` failure, push the breaking sequence onto
622 // `signature_failures` and continue the walk — a signature
623 // failure is disjoint from a chain break by design (per-row
624 // forgery vs. across-row substrate tamper).
625 if let Some(vk) = verifier.as_ref() {
626 match event.signature.as_ref() {
627 Some(sig_bytes) if !sig_bytes.is_empty() => {
628 let sig_array: Option<[u8; 64]> = sig_bytes.as_slice().try_into().ok();
629 match sig_array {
630 Some(arr) => {
631 let sig = ed25519_dalek::Signature::from_bytes(&arr);
632 if vk.verify_strict(&event.payload_hash, &sig).is_err() {
633 signature_failures.push(event.sequence);
634 }
635 }
636 None => {
637 // Malformed signature length — record as failure.
638 signature_failures.push(event.sequence);
639 }
640 }
641 }
642 // #1452 (SEC, HIGH) — fail-closed on a missing signature
643 // when a verifier IS installed. A row whose `attest_level`
644 // is anything other than the by-design legacy `"unsigned"`
645 // marker, but which carries no signature blob (None or an
646 // empty vec), is a stripped / never-signed row on a daemon
647 // that is supposed to be signing — record it as a
648 // signature failure rather than silently skipping it.
649 // Legitimately-unsigned (`attest_level == "unsigned"`)
650 // legacy rows remain skip-by-design.
651 _ => {
652 if event.attest_level != crate::models::AttestLevel::Unsigned.as_str() {
653 signature_failures.push(event.sequence);
654 }
655 }
656 }
657 }
658
659 // Recompute the canonical hash for the NEXT iteration.
660 let canon = canonical_chain_bytes(&event);
661 let mut hasher = Sha256::new();
662 hasher.update(&canon);
663 prev_canonical_hash.copy_from_slice(&hasher.finalize());
664
665 expected_seq += 1;
666 }
667
668 Ok(ChainVerificationReport {
669 rows_checked,
670 chain_break,
671 signature_failures,
672 })
673}
674
675/// SHA-256 helper. Centralised so every audit-row producer commits
676/// to the same digest; a future hash-agility migration changes one
677/// line here, not every call site.
678#[must_use]
679pub fn payload_hash(bytes: &[u8]) -> Vec<u8> {
680 let mut hasher = Sha256::new();
681 hasher.update(bytes);
682 hasher.finalize().to_vec()
683}
684
685/// Append a single audit row.
686///
687/// INSERT-only. There is no companion `update_signed_event` or
688/// `delete_signed_event` — the append-only invariant is enforced at
689/// the API surface, not via a SQLite trigger (a trigger would also
690/// block the documented operator-driven pruning escape hatch).
691///
692/// # Errors
693///
694/// Returns the underlying `rusqlite` error if the INSERT fails
695/// (typically a duplicate UUIDv4 — vanishingly rare but surfaced
696/// rather than ignored so the audit chain never silently drops a
697/// row).
698pub fn append_signed_event(conn: &Connection, event: &SignedEvent) -> Result<()> {
699 // v34 (#698 V-4 closeout) / Cluster-C SEC-3 (issue #767): compute
700 // chain head + INSERT in a single IMMEDIATE transaction so the
701 // (read MAX(sequence), INSERT new row) pair is atomic against
702 // concurrent writers on the same connection mutex.
703 //
704 // SQLite serializes write transactions, but a concurrent
705 // BEGIN IMMEDIATE on a sibling connection that beats us to the
706 // wal-write lock would otherwise let us read a stale head and
707 // then INSERT a duplicate sequence. The UNIQUE INDEX on
708 // `sequence` (idx_signed_events_sequence) makes the worst case
709 // a `SQLITE_CONSTRAINT_UNIQUE` error from this fn — the chain
710 // never silently breaks even on race. Callers that batch
711 // appends through the project's `Arc<Mutex<Connection>>` pool
712 // see no contention; we still wrap in IMMEDIATE for correctness
713 // under multi-connection deployments (the deferred-audit
714 // drainer opens its own connection on the same DB file).
715 //
716 // SEC-3 specifically: `rusqlite::Connection::unchecked_transaction`
717 // defaults to BEGIN DEFERRED — the prior comment claiming
718 // IMMEDIATE was a bug that let two writers on sibling
719 // connections both read a stale chain head, with one losing the
720 // INSERT race to `SQLITE_CONSTRAINT_UNIQUE` and (when invoked
721 // through the deferred-audit drainer) silently dropping the
722 // audit row. We now use `transaction_with_behavior(Immediate)`
723 // to grab the wal-write lock at BEGIN time so the read-then-
724 // INSERT pair is serialized at the SQLite layer.
725 let tx = rusqlite::Transaction::new_unchecked(conn, rusqlite::TransactionBehavior::Immediate)
726 .context("append signed_event: begin IMMEDIATE tx")?;
727 append_signed_event_no_tx(&tx, event)?;
728 tx.commit().context("append signed_event: commit tx")?;
729 Ok(())
730}
731
732/// Append a signed event using the caller's already-open transaction.
733///
734/// Use this when the caller is mid-transaction (e.g.
735/// `invalidate_link` after its `BEGIN IMMEDIATE` for the UPDATE +
736/// audit-INSERT atom). The public `append_signed_event` wrapper
737/// adds its own IMMEDIATE tx; calling that from inside a wrapping
738/// tx fails on SQLite (nested transactions are not supported on
739/// the same connection). This variant takes a `Connection`-like
740/// reference (works for both `Connection` and `Transaction`) and
741/// inserts directly.
742///
743/// # Errors
744///
745/// Returns the underlying `rusqlite` error if the chain-head read
746/// fails or the INSERT itself fails. Callers MUST rollback their
747/// own transaction on error.
748pub fn append_signed_event_no_tx(conn: &Connection, event: &SignedEvent) -> Result<()> {
749 let (max_seq, prev_hash) = read_chain_head(conn).context("append signed_event: read head")?;
750 let next_seq = max_seq + 1;
751 conn.execute(
752 "INSERT INTO signed_events \
753 (id, agent_id, event_type, payload_hash, signature, attest_level, timestamp, \
754 prev_hash, sequence) \
755 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
756 params![
757 event.id,
758 event.agent_id,
759 event.event_type,
760 event.payload_hash,
761 event.signature,
762 event.attest_level,
763 event.timestamp,
764 prev_hash.to_vec(),
765 next_seq,
766 ],
767 )
768 .context("append signed_event")?;
769 Ok(())
770}
771
772/// Read-only listing.
773///
774/// When `agent_id` is `Some`, restricts to that agent's events;
775/// when `None`, returns every agent's events. Ordering is
776/// `timestamp ASC, id ASC` so callers iterating with
777/// `(limit, offset)` see a stable sequence even when two events
778/// share the same RFC3339 second-precision timestamp (the `id`
779/// tiebreaker keeps the order deterministic across calls).
780///
781/// # Errors
782///
783/// Returns the underlying `rusqlite` error if the query or row
784/// decode fails.
785pub fn list_signed_events(
786 conn: &Connection,
787 agent_id: Option<&str>,
788 limit: usize,
789 offset: usize,
790) -> Result<Vec<SignedEvent>> {
791 let limit_i64 = i64::try_from(limit).unwrap_or(i64::MAX);
792 let offset_i64 = i64::try_from(offset).unwrap_or(0);
793 if let Some(agent) = agent_id {
794 let mut stmt = conn.prepare(
795 "SELECT id, agent_id, event_type, payload_hash, signature, attest_level, timestamp, \
796 prev_hash, COALESCE(sequence, 0) \
797 FROM signed_events \
798 WHERE agent_id = ?1 \
799 ORDER BY timestamp ASC, id ASC \
800 LIMIT ?2 OFFSET ?3",
801 )?;
802 let rows = stmt.query_map(params![agent, limit_i64, offset_i64], row_to_event)?;
803 rows.collect::<rusqlite::Result<Vec<_>>>()
804 .map_err(Into::into)
805 } else {
806 let mut stmt = conn.prepare(
807 "SELECT id, agent_id, event_type, payload_hash, signature, attest_level, timestamp, \
808 prev_hash, COALESCE(sequence, 0) \
809 FROM signed_events \
810 ORDER BY timestamp ASC, id ASC \
811 LIMIT ?1 OFFSET ?2",
812 )?;
813 let rows = stmt.query_map(params![limit_i64, offset_i64], row_to_event)?;
814 rows.collect::<rusqlite::Result<Vec<_>>>()
815 .map_err(Into::into)
816 }
817}
818
819fn row_to_event(row: &rusqlite::Row<'_>) -> rusqlite::Result<SignedEvent> {
820 Ok(SignedEvent {
821 id: row.get(0)?,
822 agent_id: row.get(1)?,
823 event_type: row.get(2)?,
824 payload_hash: row.get(3)?,
825 signature: row.get(4)?,
826 attest_level: row.get(5)?,
827 timestamp: row.get(6)?,
828 prev_hash: row.get::<_, Option<Vec<u8>>>(7)?.unwrap_or_default(),
829 sequence: row.get(8)?,
830 })
831}
832
833#[cfg(test)]
834mod tests {
835 use super::*;
836 use chrono::Utc;
837 use rusqlite::Connection;
838 use uuid::Uuid;
839
840 /// In-memory connection with the v25 schema applied. We don't go
841 /// through `db::open` (which carries the full migration ladder
842 /// + WAL / FK PRAGMAs) so the unit test stays focused on the
843 /// `signed_events` table itself.
844 fn fresh_db() -> Connection {
845 let conn = Connection::open_in_memory().expect("in-memory db");
846 conn.execute_batch(include_str!(
847 "../migrations/sqlite/0020_v07_signed_events.sql"
848 ))
849 .expect("apply v25 migration");
850 conn
851 }
852
853 fn fixture(agent: &str, event_type: &str) -> SignedEvent {
854 SignedEvent {
855 id: Uuid::new_v4().to_string(),
856 agent_id: agent.to_string(),
857 event_type: event_type.to_string(),
858 payload_hash: payload_hash(b"test-payload"),
859 signature: None,
860 attest_level: crate::models::AttestLevel::Unsigned.as_str().to_string(),
861 timestamp: Utc::now().to_rfc3339(),
862 // prev_hash + sequence are overwritten by
863 // append_signed_event; the caller-side values here are
864 // placeholders so the struct constructs.
865 prev_hash: Vec::new(),
866 sequence: 0,
867 }
868 }
869
870 #[test]
871 fn migration_is_idempotent() {
872 // Re-applying the migration must be a no-op — it's the
873 // contract `db::migrate` relies on (every step uses
874 // `CREATE TABLE IF NOT EXISTS` + `CREATE INDEX IF NOT
875 // EXISTS`).
876 let conn = fresh_db();
877 conn.execute_batch(include_str!(
878 "../migrations/sqlite/0020_v07_signed_events.sql"
879 ))
880 .expect("re-apply v25 migration");
881 // Append still works after the re-run.
882 let event = fixture("alice", "memory_link.created");
883 append_signed_event(&conn, &event).expect("append after re-migration");
884 }
885
886 #[test]
887 fn append_then_list_round_trip() {
888 let conn = fresh_db();
889 let event = fixture("alice", "memory_link.created");
890 append_signed_event(&conn, &event).expect("append");
891 let listed = list_signed_events(&conn, Some("alice"), 10, 0).expect("list");
892 assert_eq!(listed.len(), 1);
893 // The caller-side fixture's prev_hash/sequence are
894 // placeholders — append_signed_event overwrites them with
895 // (ZERO_HASH, 1) for a fresh table. Compare every caller-
896 // controlled field individually, then assert the chain
897 // columns were populated by the writer.
898 assert_eq!(listed[0].id, event.id);
899 assert_eq!(listed[0].agent_id, event.agent_id);
900 assert_eq!(listed[0].event_type, event.event_type);
901 assert_eq!(listed[0].payload_hash, event.payload_hash);
902 assert_eq!(listed[0].signature, event.signature);
903 assert_eq!(listed[0].attest_level, event.attest_level);
904 assert_eq!(listed[0].timestamp, event.timestamp);
905 assert_eq!(listed[0].prev_hash, ZERO_HASH.to_vec());
906 assert_eq!(listed[0].sequence, 1);
907 }
908
909 #[test]
910 fn list_orders_by_timestamp_ascending() {
911 let conn = fresh_db();
912 // Three events for the same agent at distinct timestamps,
913 // inserted out of chronological order.
914 let mut a = fixture("alice", "memory_link.created");
915 a.timestamp = "2026-05-05T12:00:00+00:00".to_string();
916 let mut b = fixture("alice", "memory_link.created");
917 b.timestamp = "2026-05-05T12:00:01+00:00".to_string();
918 let mut c = fixture("alice", "memory_link.created");
919 c.timestamp = "2026-05-05T12:00:02+00:00".to_string();
920 append_signed_event(&conn, &b).unwrap();
921 append_signed_event(&conn, &c).unwrap();
922 append_signed_event(&conn, &a).unwrap();
923 let listed = list_signed_events(&conn, Some("alice"), 10, 0).expect("list");
924 let ts: Vec<&str> = listed.iter().map(|e| e.timestamp.as_str()).collect();
925 assert_eq!(
926 ts,
927 vec![
928 "2026-05-05T12:00:00+00:00",
929 "2026-05-05T12:00:01+00:00",
930 "2026-05-05T12:00:02+00:00",
931 ],
932 "rows must come back in ascending timestamp order"
933 );
934 }
935
936 #[test]
937 fn list_filters_by_agent() {
938 let conn = fresh_db();
939 append_signed_event(&conn, &fixture("alice", "memory_link.created")).unwrap();
940 append_signed_event(&conn, &fixture("bob", "memory_link.created")).unwrap();
941 append_signed_event(&conn, &fixture("alice", "memory_link.created")).unwrap();
942 let alice = list_signed_events(&conn, Some("alice"), 10, 0).unwrap();
943 let bob = list_signed_events(&conn, Some("bob"), 10, 0).unwrap();
944 let all = list_signed_events(&conn, None, 10, 0).unwrap();
945 assert_eq!(alice.len(), 2);
946 assert_eq!(bob.len(), 1);
947 assert_eq!(all.len(), 3);
948 }
949
950 #[test]
951 fn list_respects_limit_and_offset() {
952 let conn = fresh_db();
953 for i in 0..5 {
954 let mut e = fixture("alice", "memory_link.created");
955 e.timestamp = format!("2026-05-05T12:00:0{i}+00:00");
956 append_signed_event(&conn, &e).unwrap();
957 }
958 let page1 = list_signed_events(&conn, Some("alice"), 2, 0).unwrap();
959 let page2 = list_signed_events(&conn, Some("alice"), 2, 2).unwrap();
960 assert_eq!(page1.len(), 2);
961 assert_eq!(page2.len(), 2);
962 assert_ne!(page1[0].id, page2[0].id);
963 }
964
965 #[test]
966 fn append_preserves_signature_blob() {
967 let conn = fresh_db();
968 let mut event = fixture("alice", "memory_link.created");
969 event.signature = Some(vec![0xAA; 64]); // Ed25519 sig length
970 event.attest_level = "self_signed".to_string();
971 append_signed_event(&conn, &event).unwrap();
972 let listed = list_signed_events(&conn, Some("alice"), 10, 0).unwrap();
973 assert_eq!(listed[0].signature.as_deref(), Some(&[0xAA; 64][..]));
974 assert_eq!(listed[0].attest_level, "self_signed");
975 }
976
977 #[test]
978 fn indexes_exist_on_documented_columns() {
979 // PRAGMA index_list returns one row per index on a table.
980 // We assert each documented index is present so a future
981 // migration that drops one of them fails this test.
982 let conn = fresh_db();
983 let mut stmt = conn.prepare("PRAGMA index_list('signed_events')").unwrap();
984 let names: Vec<String> = stmt
985 .query_map([], |row| row.get::<_, String>(1))
986 .unwrap()
987 .collect::<rusqlite::Result<Vec<_>>>()
988 .unwrap();
989 assert!(
990 names.iter().any(|n| n == "idx_signed_events_agent"),
991 "missing idx_signed_events_agent in {names:?}"
992 );
993 assert!(
994 names.iter().any(|n| n == "idx_signed_events_type"),
995 "missing idx_signed_events_type in {names:?}"
996 );
997 assert!(
998 names.iter().any(|n| n == "idx_signed_events_timestamp"),
999 "missing idx_signed_events_timestamp in {names:?}"
1000 );
1001 assert!(
1002 names.iter().any(|n| n == "idx_signed_events_sequence"),
1003 "missing idx_signed_events_sequence in {names:?} \
1004 — v34 (V-4 closeout, #698) requires a UNIQUE index on \
1005 the cross-row chain sequence column"
1006 );
1007 }
1008
1009 #[test]
1010 fn payload_hash_is_sha256_32_bytes() {
1011 let h = payload_hash(b"hello world");
1012 assert_eq!(h.len(), 32, "SHA-256 digest is 32 bytes");
1013 // Stable across calls.
1014 assert_eq!(h, payload_hash(b"hello world"));
1015 // Sensitive to input.
1016 assert_ne!(h, payload_hash(b"hello worle"));
1017 }
1018
1019 // -----------------------------------------------------------------
1020 // L0.7-2 Tier A — error paths + empty / boundary coverage
1021 // -----------------------------------------------------------------
1022
1023 #[test]
1024 fn append_duplicate_id_returns_error() {
1025 let conn = fresh_db();
1026 let mut a = fixture("alice", "memory_link.created");
1027 append_signed_event(&conn, &a).expect("first append ok");
1028 // Re-use the exact same id — the PRIMARY KEY on `id` rejects
1029 // the second INSERT, exercising the .context("append signed_event")
1030 // error path.
1031 a.timestamp = Utc::now().to_rfc3339();
1032 let err = append_signed_event(&conn, &a).expect_err("second append with same id must fail");
1033 assert!(
1034 format!("{err:?}").contains("append signed_event")
1035 || format!("{err:#}").contains("append signed_event"),
1036 "anyhow context should include the 'append signed_event' tag, got: {err:?}"
1037 );
1038 }
1039
1040 #[test]
1041 fn list_signed_events_empty_db_returns_empty() {
1042 let conn = fresh_db();
1043 let alice = list_signed_events(&conn, Some("alice"), 10, 0).expect("list ok");
1044 let all = list_signed_events(&conn, None, 10, 0).expect("list ok");
1045 assert!(alice.is_empty());
1046 assert!(all.is_empty());
1047 }
1048
1049 #[test]
1050 fn list_signed_events_offset_past_end_returns_empty() {
1051 let conn = fresh_db();
1052 append_signed_event(&conn, &fixture("alice", "memory_link.created")).unwrap();
1053 let beyond = list_signed_events(&conn, Some("alice"), 10, 100).expect("list ok");
1054 assert!(beyond.is_empty());
1055 }
1056
1057 #[test]
1058 fn list_signed_events_no_agent_filter_returns_all_agents() {
1059 let conn = fresh_db();
1060 append_signed_event(&conn, &fixture("alice", "memory_link.created")).unwrap();
1061 append_signed_event(&conn, &fixture("bob", "memory_link.created")).unwrap();
1062 append_signed_event(&conn, &fixture("carol", "memory_link.created")).unwrap();
1063 let all = list_signed_events(&conn, None, 10, 0).expect("list ok");
1064 let agents: std::collections::HashSet<&str> =
1065 all.iter().map(|e| e.agent_id.as_str()).collect();
1066 assert_eq!(agents.len(), 3);
1067 }
1068
1069 #[test]
1070 fn payload_hash_known_vector() {
1071 // SHA-256 of the empty input must be the well-known constant
1072 // e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855.
1073 let h = payload_hash(b"");
1074 let hex: String = h.iter().map(|b| format!("{b:02x}")).collect();
1075 assert_eq!(
1076 hex,
1077 "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
1078 );
1079 }
1080
1081 /// Append-only invariant: there's no public function to UPDATE
1082 /// or DELETE rows from `signed_events`, and no `UPDATE
1083 /// signed_events` / `DELETE FROM signed_events` SQL string
1084 /// appears in any *non-comment* source line under `src/`.
1085 ///
1086 /// The check strips Rust line comments (`//...`) and intra-line
1087 /// `/* ... */` blocks before searching, so the doc-comments in
1088 /// this module and in `db.rs` that *describe* the contract
1089 /// (and therefore must contain the forbidden phrases verbatim)
1090 /// don't trigger a false positive. A real SQL-string call site
1091 /// — `conn.execute("UPDATE signed_events SET ...", ...)` —
1092 /// would survive the comment strip and trip the assertion.
1093 ///
1094 /// # v34 migration backfill carve-out
1095 ///
1096 /// `src/storage/migrations.rs::migrate_v34_backfill_chain` and
1097 /// `src/store/postgres.rs::migrate_v33` each issue a
1098 /// `UPDATE signed_events SET prev_hash = ?, sequence = ?` to
1099 /// stamp the cross-row chain columns on pre-existing rows. These
1100 /// are migration-time one-shot updates against rows whose
1101 /// `sequence` column is still NULL (i.e. never-stamped) — they
1102 /// do NOT mutate post-backfill rows. The carve-out is path-
1103 /// scoped: the test still flags any UPDATE/DELETE in non-
1104 /// migration files (the production write paths under
1105 /// `signed_events.rs`, the MCP/HTTP handlers, etc.).
1106 #[test]
1107 fn append_only_invariant_no_mutators_in_src() {
1108 use std::path::Path;
1109
1110 let src_root = Path::new(env!("CARGO_MANIFEST_DIR")).join("src");
1111 // Two forbidden patterns. We split each into halves and
1112 // concat at runtime so the grep still flags real call sites
1113 // even if a future contributor copy-pastes a literal needle
1114 // into a doc comment elsewhere.
1115 let forbidden: [String; 2] = [
1116 format!("{} signed_events", "UPDATE"),
1117 format!("{} signed_events", "DELETE FROM"),
1118 ];
1119 // Files allowed to contain the v34 backfill UPDATE — these
1120 // are the migration paths described in the doc comment above.
1121 // Path matching is by suffix so the test passes on every
1122 // OS / working-directory layout.
1123 let migration_carveouts: [&str; 2] = ["src/storage/migrations.rs", "src/store/postgres.rs"];
1124 let mut hits: Vec<String> = Vec::new();
1125 walk_rs_files(&src_root, &mut |path, contents| {
1126 // Skip the v34 backfill UPDATE in migration paths.
1127 let path_str = path.to_string_lossy().replace('\\', "/");
1128 let is_carveout = migration_carveouts.iter().any(|c| path_str.ends_with(c));
1129 let stripped = strip_rust_comments(contents);
1130 for needle in &forbidden {
1131 if !stripped.contains(needle.as_str()) {
1132 continue;
1133 }
1134 if is_carveout && needle.starts_with("UPDATE") {
1135 // Carve-out: backfill is allowed to UPDATE.
1136 // DELETE FROM is still flagged.
1137 continue;
1138 }
1139 hits.push(format!("{}: {}", path.display(), needle));
1140 }
1141 });
1142 assert!(
1143 hits.is_empty(),
1144 "found forbidden mutator(s) on signed_events: {hits:?} \
1145 — append-only invariant requires zero UPDATE/DELETE call sites in production code \
1146 (the v34 backfill UPDATE in migrations.rs / postgres.rs is the only allowed exception)"
1147 );
1148 }
1149
1150 /// Strip Rust line comments (`//...`) and single-line block
1151 /// comments (`/* ... */`). Multi-line block comments are
1152 /// rare in this codebase; an unmatched `/*` falls through and
1153 /// leaves the rest of the file intact, which is fine — the
1154 /// grep is a guard, not a parser.
1155 fn strip_rust_comments(src: &str) -> String {
1156 let mut out = String::with_capacity(src.len());
1157 for line in src.lines() {
1158 // Drop everything from the first `//` onward. We don't
1159 // try to honour `//` inside a string literal — none of
1160 // the production code under `src/` quotes these
1161 // forbidden phrases inside a string except the
1162 // legitimate signed_events.sql include path, which the
1163 // outer needle ("UPDATE signed_events") doesn't match.
1164 let line_no_line_comment = match line.find("//") {
1165 Some(idx) => &line[..idx],
1166 None => line,
1167 };
1168 // Strip /* ... */ blocks that open and close on the
1169 // same line. Good enough for the doc-comment patterns
1170 // that exist today.
1171 let mut buf = String::from(line_no_line_comment);
1172 while let (Some(start), Some(end_rel)) = (buf.find("/*"), buf.find("*/").map(|i| i + 2))
1173 {
1174 if end_rel <= start {
1175 break;
1176 }
1177 buf.replace_range(start..end_rel, "");
1178 }
1179 out.push_str(&buf);
1180 out.push('\n');
1181 }
1182 out
1183 }
1184
1185 fn walk_rs_files(dir: &std::path::Path, visit: &mut dyn FnMut(&std::path::Path, &str)) {
1186 let Ok(entries) = std::fs::read_dir(dir) else {
1187 return;
1188 };
1189 for entry in entries.flatten() {
1190 let path = entry.path();
1191 if path.is_dir() {
1192 walk_rs_files(&path, visit);
1193 } else if path.extension().and_then(|s| s.to_str()) == Some("rs") {
1194 if let Ok(contents) = std::fs::read_to_string(&path) {
1195 visit(&path, &contents);
1196 }
1197 }
1198 }
1199 }
1200
1201 // -----------------------------------------------------------------
1202 // L0.7-2 Tier A — row decode error paths (row_to_event row.get(N)?).
1203 //
    // The Err-arms of the `row.get(N)?` calls in `row_to_event` are
    // triggered when a SELECTed column can't be decoded into the
    // target Rust type. We exercise this by constructing an
    // in-memory DB with the SAME shape but a deliberately wrong
    // value for the `agent_id` column (NULL where the Rust field is
    // a non-optional `String` — rusqlite reports a type error when
    // it is asked to decode NULL into a `String`).
1211 // -----------------------------------------------------------------
1212
1213 #[test]
1214 fn list_signed_events_row_decode_error_propagates() {
1215 // Build a permissive signed_events shape (no NOT NULL on
1216 // agent_id) so we can INSERT a NULL there. The list_signed_events
1217 // query selects agent_id into a String — String::from_sql
1218 // refuses NULL, which exercises the row_to_event row.get(1)?
1219 // Err arm.
1220 let conn = Connection::open_in_memory().expect("in-memory db");
1221 // Drop the SQL-file table and recreate a NULL-permissive shape
1222 // with the SAME column order so the SELECT in
1223 // list_signed_events still works.
1224 conn.execute_batch(
1225 "CREATE TABLE signed_events (
1226 id TEXT PRIMARY KEY,
1227 agent_id TEXT,
1228 event_type TEXT NOT NULL,
1229 payload_hash BLOB NOT NULL,
1230 signature BLOB,
1231 attest_level TEXT NOT NULL,
1232 timestamp TEXT NOT NULL,
1233 prev_hash BLOB,
1234 sequence INTEGER
1235 );",
1236 )
1237 .unwrap();
1238 // Insert one row with NULL in agent_id — the SELECT shape
1239 // matches list_signed_events but row.get(1)? fails on the
1240 // NULL→String decode.
1241 conn.execute(
1242 "INSERT INTO signed_events \
1243 (id, agent_id, event_type, payload_hash, signature, attest_level, timestamp, \
1244 prev_hash, sequence) \
1245 VALUES ('row1', NULL, 'memory_link.created', X'00', NULL, 'unsigned', \
1246 '2026-05-13T00:00:00+00:00', NULL, 1)",
1247 [],
1248 )
1249 .unwrap();
1250 // Listing now exercises the row.get(1)? Err arm.
1251 let res = list_signed_events(&conn, None, 10, 0);
1252 assert!(res.is_err(), "row decode must fail when agent_id is NULL");
1253 }
1254
1255 // -----------------------------------------------------------------
1256 // COR-9 (issue #767) — read_chain_head NULL-sequence diagnostic
1257 // -----------------------------------------------------------------
1258
1259 /// A legacy DB carrying a row with `sequence IS NULL` (pre-v34 or
1260 /// a backfill that was interrupted) MUST be detected by
1261 /// `read_chain_head` — surfacing as a clear error rather than
1262 /// silently colliding on the UNIQUE index when a fresh
1263 /// `append_signed_event` would otherwise compute `next_seq = 1`
1264 /// against an already-stamped first row.
1265 #[test]
1266 fn read_chain_head_rejects_null_sequence_row() {
1267 // Build a NULL-permissive schema so we can INSERT a sequence
1268 // IS NULL row directly.
1269 let conn = Connection::open_in_memory().expect("in-memory db");
1270 conn.execute_batch(
1271 "CREATE TABLE signed_events (
1272 id TEXT PRIMARY KEY,
1273 agent_id TEXT NOT NULL,
1274 event_type TEXT NOT NULL,
1275 payload_hash BLOB NOT NULL,
1276 signature BLOB,
1277 attest_level TEXT NOT NULL DEFAULT 'unsigned',
1278 timestamp TEXT NOT NULL,
1279 prev_hash BLOB,
1280 sequence INTEGER
1281 );
1282 CREATE UNIQUE INDEX idx_signed_events_sequence
1283 ON signed_events(sequence);",
1284 )
1285 .unwrap();
1286 // Insert a row whose sequence column is NULL (pre-v34 shape).
1287 conn.execute(
1288 "INSERT INTO signed_events \
1289 (id, agent_id, event_type, payload_hash, signature, attest_level, timestamp, \
1290 prev_hash, sequence) \
1291 VALUES ('legacy', 'alice', 'memory_link.created', X'00', NULL, 'unsigned', \
1292 '2026-05-13T00:00:00+00:00', NULL, NULL)",
1293 [],
1294 )
1295 .unwrap();
1296
1297 // read_chain_head MUST surface this clearly, not silently
1298 // collapse the NULL to 0.
1299 let err = read_chain_head(&conn).expect_err("NULL-sequence row must trigger diagnostic");
1300 let rendered = format!("{err:#}");
1301 assert!(
1302 rendered.contains("sequence IS NULL") || rendered.contains("backfill incomplete"),
1303 "diagnostic must mention NULL-sequence rows; got: {rendered}"
1304 );
1305 }
1306
1307 /// And, by extension, `append_signed_event` MUST refuse to grow
1308 /// the chain when the diagnostic fires — silently appending atop
1309 /// a partially-migrated table is the exact failure mode COR-9 closes.
1310 #[test]
1311 fn append_signed_event_refuses_when_null_sequence_present() {
1312 let conn = Connection::open_in_memory().expect("in-memory db");
1313 conn.execute_batch(
1314 "CREATE TABLE signed_events (
1315 id TEXT PRIMARY KEY,
1316 agent_id TEXT NOT NULL,
1317 event_type TEXT NOT NULL,
1318 payload_hash BLOB NOT NULL,
1319 signature BLOB,
1320 attest_level TEXT NOT NULL DEFAULT 'unsigned',
1321 timestamp TEXT NOT NULL,
1322 prev_hash BLOB,
1323 sequence INTEGER
1324 );
1325 CREATE UNIQUE INDEX idx_signed_events_sequence
1326 ON signed_events(sequence);",
1327 )
1328 .unwrap();
1329 conn.execute(
1330 "INSERT INTO signed_events \
1331 (id, agent_id, event_type, payload_hash, signature, attest_level, timestamp, \
1332 prev_hash, sequence) \
1333 VALUES ('legacy', 'alice', 'memory_link.created', X'00', NULL, 'unsigned', \
1334 '2026-05-13T00:00:00+00:00', NULL, NULL)",
1335 [],
1336 )
1337 .unwrap();
1338 let event = fixture("alice", "memory_link.created");
1339 let err = append_signed_event(&conn, &event)
1340 .expect_err("append must refuse while NULL-sequence row exists");
1341 let rendered = format!("{err:#}");
1342 assert!(
1343 rendered.contains("sequence IS NULL") || rendered.contains("backfill incomplete"),
1344 "diagnostic must surface in append error; got: {rendered}"
1345 );
1346 }
1347
1348 #[test]
1349 fn list_signed_events_with_agent_filter_row_decode_error_propagates() {
1350 // Same as above, but exercise the agent_id == Some(...) branch
1351 // of list_signed_events. The `WHERE agent_id = ?1` won't
1352 // match NULL rows (NULL ≠ anything), so we need a row whose
1353 // agent_id is the queried string but another column NULLs out.
1354 // Insert a row whose `event_type` is NULL — row.get(2)?
1355 // fails on NULL→String when listing filtered by agent.
1356 let conn = Connection::open_in_memory().expect("in-memory db");
1357 conn.execute_batch(
1358 "CREATE TABLE signed_events (
1359 id TEXT PRIMARY KEY,
1360 agent_id TEXT NOT NULL,
1361 event_type TEXT,
1362 payload_hash BLOB NOT NULL,
1363 signature BLOB,
1364 attest_level TEXT NOT NULL,
1365 timestamp TEXT NOT NULL,
1366 prev_hash BLOB,
1367 sequence INTEGER
1368 );",
1369 )
1370 .unwrap();
1371 conn.execute(
1372 "INSERT INTO signed_events \
1373 (id, agent_id, event_type, payload_hash, signature, attest_level, timestamp, \
1374 prev_hash, sequence) \
1375 VALUES ('row2', 'alice', NULL, X'00', NULL, 'unsigned', \
1376 '2026-05-13T00:00:00+00:00', NULL, 1)",
1377 [],
1378 )
1379 .unwrap();
1380 let res = list_signed_events(&conn, Some("alice"), 10, 0);
1381 assert!(res.is_err(), "row decode must fail when event_type is NULL");
1382 }
1383}