use anyhow::{Context, Result};
use rusqlite::{Connection, params};
use sha2::{Digest, Sha256};
pub(crate) const SIGNED_EVENTS_TRACE_TARGET: &str = "signed_events";
#[allow(dead_code)]
pub mod event_types {
pub const MEMORY_LINK_CREATED: &str = "memory_link.created";
pub const MEMORY_LINK_INVALIDATED: &str = "memory_link.invalidated";
pub const MEMORY_STORED: &str = "memory.stored";
pub const MEMORY_TOUCH: &str = "memory.touch";
pub const REFLECTION_INVALIDATION_NOTIFIED: &str = "reflection.invalidation_notified";
pub const REFLECTION_DEPTH_EXCEEDED: &str = "reflection.depth_exceeded";
pub const REFLECTS_ON_CYCLE_REFUSED: &str = "reflects_on.cycle_refused";
pub const SKILL_EXPORTED: &str = "skill.exported";
pub const SKILL_REGISTERED: &str = "skill.registered";
pub const MEMORY_CAPTURE_TURN: &str = "memory_capture_turn";
pub const PERSONA_GENERATED: &str = "persona_generated";
pub const ATOMISATION_COMPLETE: &str = "atomisation_complete";
pub const FED_CREDENTIAL_RENEWED: &str = "federation.credential_renewed";
pub const GOVERNANCE_RULE_REMOVED: &str = "governance.rule_removed";
}
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct SignedEvent {
pub id: String,
pub agent_id: String,
pub event_type: String,
pub payload_hash: Vec<u8>,
pub signature: Option<Vec<u8>>,
pub attest_level: String,
pub timestamp: String,
pub prev_hash: Vec<u8>,
pub sequence: i64,
}
impl SignedEvent {
#[must_use]
pub fn with_daemon_signature(
payload_hash: Vec<u8>,
agent_id: String,
event_type: String,
timestamp: String,
) -> Self {
let (signature, attest_level) =
match crate::governance::audit::try_sign_audit_payload(&payload_hash) {
Some((sig_bytes, tag)) => (Some(sig_bytes), tag.to_string()),
None => (
None,
crate::models::AttestLevel::Unsigned.as_str().to_string(),
),
};
SignedEvent {
id: uuid::Uuid::new_v4().to_string(),
agent_id,
event_type,
payload_hash,
signature,
attest_level,
timestamp,
..SignedEvent::default()
}
}
}
pub const ZERO_HASH: [u8; 32] = [0u8; 32];
const FIELD_SEP: u8 = 0x1F;
#[must_use]
pub fn canonical_chain_bytes(event: &SignedEvent) -> Vec<u8> {
let mut out: Vec<u8> = Vec::with_capacity(
event.id.len()
+ event.agent_id.len()
+ event.event_type.len()
+ event.payload_hash.len()
+ event.signature.as_ref().map_or(0, Vec::len)
+ event.attest_level.len()
+ event.timestamp.len()
+ 8
+ 7, );
out.extend_from_slice(event.id.as_bytes());
out.push(FIELD_SEP);
out.extend_from_slice(event.agent_id.as_bytes());
out.push(FIELD_SEP);
out.extend_from_slice(event.event_type.as_bytes());
out.push(FIELD_SEP);
out.extend_from_slice(&event.payload_hash);
out.push(FIELD_SEP);
if let Some(sig) = event.signature.as_ref() {
out.extend_from_slice(sig);
}
out.push(FIELD_SEP);
out.extend_from_slice(event.attest_level.as_bytes());
out.push(FIELD_SEP);
out.extend_from_slice(event.timestamp.as_bytes());
out.push(FIELD_SEP);
out.extend_from_slice(&event.sequence.to_be_bytes());
out
}
fn read_chain_head(conn: &Connection) -> Result<(i64, [u8; 32])> {
let null_seq_count: i64 = conn
.query_row(
"SELECT COUNT(*) FROM signed_events WHERE sequence IS NULL",
[],
|row| row.get(0),
)
.context("read_chain_head: null-sequence diagnostic")?;
if null_seq_count > 0 {
tracing::error!(
null_sequence_rows = null_seq_count,
"signed_events: found {null_seq_count} row(s) with sequence IS NULL — \
v34 chain backfill is incomplete. Re-run the migration ladder \
(`ai-memory migrate` or restart with the current binary) to \
stamp prev_hash + sequence on the unmigrated rows; refusing to \
append further audit rows until the chain is repaired."
);
anyhow::bail!(
"read_chain_head: {null_seq_count} signed_events row(s) have sequence IS NULL — \
v34 chain backfill incomplete; re-run `ai-memory migrate`"
);
}
let mut stmt = conn
.prepare(
"SELECT id, agent_id, event_type, payload_hash, signature, attest_level, \
timestamp, sequence \
FROM signed_events \
WHERE sequence IS NOT NULL \
ORDER BY sequence DESC, rowid DESC \
LIMIT 1",
)
.context("read_chain_head: prepare")?;
let head: Option<SignedEvent> = stmt
.query_map([], |row| {
Ok(SignedEvent {
id: row.get(0)?,
agent_id: row.get(1)?,
event_type: row.get(2)?,
payload_hash: row.get(3)?,
signature: row.get(4)?,
attest_level: row.get(5)?,
timestamp: row.get(6)?,
sequence: row.get(7)?,
prev_hash: Vec::new(), })
})
.context("read_chain_head: query_map")?
.next()
.transpose()
.context("read_chain_head: collect")?;
match head {
None => Ok((0, ZERO_HASH)),
Some(prev) => {
let max_seq = prev.sequence;
let canon = canonical_chain_bytes(&prev);
let mut hasher = Sha256::new();
hasher.update(&canon);
let mut digest = [0u8; 32];
digest.copy_from_slice(&hasher.finalize());
Ok((max_seq, digest))
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ChainVerificationReport {
pub rows_checked: u64,
pub chain_break: Option<i64>,
pub signature_failures: Vec<i64>,
}
impl ChainVerificationReport {
#[must_use]
pub fn chain_holds(&self) -> bool {
self.chain_break.is_none()
}
}
pub fn verify_chain(
conn: &Connection,
since_sequence: Option<i64>,
) -> Result<ChainVerificationReport> {
let lower = since_sequence.unwrap_or(0);
let mut stmt = conn
.prepare(
"SELECT id, agent_id, event_type, payload_hash, signature, attest_level, \
timestamp, prev_hash, COALESCE(sequence, 0) \
FROM signed_events \
WHERE COALESCE(sequence, 0) > ?1 \
ORDER BY COALESCE(sequence, 0) ASC",
)
.context("verify_chain: prepare")?;
let mut rows = stmt.query(params![lower]).context("verify_chain: query")?;
let mut rows_checked: u64 = 0;
let mut chain_break: Option<i64> = None;
let verifier: Option<ed25519_dalek::VerifyingKey> =
crate::governance::audit::resolve_daemon_verifying_key();
let mut signature_failures: Vec<i64> = Vec::new();
let mut expected_seq = lower + 1;
let mut prev_canonical_hash: [u8; 32] = ZERO_HASH;
if lower > 0 {
let mut head_stmt = conn
.prepare(
"SELECT id, agent_id, event_type, payload_hash, signature, attest_level, \
timestamp, COALESCE(sequence, 0) \
FROM signed_events \
WHERE COALESCE(sequence, 0) = ?1",
)
.context("verify_chain: prepare head")?;
let head: Option<SignedEvent> = head_stmt
.query_map(params![lower], |row| {
Ok(SignedEvent {
id: row.get(0)?,
agent_id: row.get(1)?,
event_type: row.get(2)?,
payload_hash: row.get(3)?,
signature: row.get(4)?,
attest_level: row.get(5)?,
timestamp: row.get(6)?,
sequence: row.get(7)?,
prev_hash: Vec::new(),
})
})
.context("verify_chain: head query")?
.next()
.transpose()
.context("verify_chain: head collect")?;
if let Some(h) = head {
let canon = canonical_chain_bytes(&h);
let mut hasher = Sha256::new();
hasher.update(&canon);
prev_canonical_hash.copy_from_slice(&hasher.finalize());
}
}
while let Some(row) = rows.next().context("verify_chain: next row")? {
rows_checked += 1;
let event = SignedEvent {
id: row.get(0).context("verify_chain: id")?,
agent_id: row.get(1).context("verify_chain: agent_id")?,
event_type: row.get(2).context("verify_chain: event_type")?,
payload_hash: row.get(3).context("verify_chain: payload_hash")?,
signature: row.get(4).context("verify_chain: signature")?,
attest_level: row.get(5).context("verify_chain: attest_level")?,
timestamp: row.get(6).context("verify_chain: timestamp")?,
prev_hash: row
.get::<_, Option<Vec<u8>>>(7)
.context("verify_chain: prev_hash")?
.unwrap_or_default(),
sequence: row.get(8).context("verify_chain: sequence")?,
};
if event.sequence != expected_seq {
if chain_break.is_none() {
chain_break = Some(event.sequence);
}
expected_seq = event.sequence;
}
if event.prev_hash.len() != 32 || event.prev_hash != prev_canonical_hash {
if chain_break.is_none() {
chain_break = Some(event.sequence);
}
}
if let Some(vk) = verifier.as_ref() {
match event.signature.as_ref() {
Some(sig_bytes) if !sig_bytes.is_empty() => {
let sig_array: Option<[u8; 64]> = sig_bytes.as_slice().try_into().ok();
match sig_array {
Some(arr) => {
let sig = ed25519_dalek::Signature::from_bytes(&arr);
if vk.verify_strict(&event.payload_hash, &sig).is_err() {
signature_failures.push(event.sequence);
}
}
None => {
signature_failures.push(event.sequence);
}
}
}
_ => {
if event.attest_level != crate::models::AttestLevel::Unsigned.as_str() {
signature_failures.push(event.sequence);
}
}
}
}
let canon = canonical_chain_bytes(&event);
let mut hasher = Sha256::new();
hasher.update(&canon);
prev_canonical_hash.copy_from_slice(&hasher.finalize());
expected_seq += 1;
}
Ok(ChainVerificationReport {
rows_checked,
chain_break,
signature_failures,
})
}
#[must_use]
pub fn payload_hash(bytes: &[u8]) -> Vec<u8> {
let mut hasher = Sha256::new();
hasher.update(bytes);
hasher.finalize().to_vec()
}
pub fn append_signed_event(conn: &Connection, event: &SignedEvent) -> Result<()> {
let tx = rusqlite::Transaction::new_unchecked(conn, rusqlite::TransactionBehavior::Immediate)
.context("append signed_event: begin IMMEDIATE tx")?;
append_signed_event_no_tx(&tx, event)?;
tx.commit().context("append signed_event: commit tx")?;
Ok(())
}
pub fn append_signed_event_no_tx(conn: &Connection, event: &SignedEvent) -> Result<()> {
let (max_seq, prev_hash) = read_chain_head(conn).context("append signed_event: read head")?;
let next_seq = max_seq + 1;
conn.execute(
"INSERT INTO signed_events \
(id, agent_id, event_type, payload_hash, signature, attest_level, timestamp, \
prev_hash, sequence) \
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
params![
event.id,
event.agent_id,
event.event_type,
event.payload_hash,
event.signature,
event.attest_level,
event.timestamp,
prev_hash.to_vec(),
next_seq,
],
)
.context("append signed_event")?;
Ok(())
}
pub fn list_signed_events(
conn: &Connection,
agent_id: Option<&str>,
limit: usize,
offset: usize,
) -> Result<Vec<SignedEvent>> {
let limit_i64 = i64::try_from(limit).unwrap_or(i64::MAX);
let offset_i64 = i64::try_from(offset).unwrap_or(0);
if let Some(agent) = agent_id {
let mut stmt = conn.prepare(
"SELECT id, agent_id, event_type, payload_hash, signature, attest_level, timestamp, \
prev_hash, COALESCE(sequence, 0) \
FROM signed_events \
WHERE agent_id = ?1 \
ORDER BY timestamp ASC, id ASC \
LIMIT ?2 OFFSET ?3",
)?;
let rows = stmt.query_map(params![agent, limit_i64, offset_i64], row_to_event)?;
rows.collect::<rusqlite::Result<Vec<_>>>()
.map_err(Into::into)
} else {
let mut stmt = conn.prepare(
"SELECT id, agent_id, event_type, payload_hash, signature, attest_level, timestamp, \
prev_hash, COALESCE(sequence, 0) \
FROM signed_events \
ORDER BY timestamp ASC, id ASC \
LIMIT ?1 OFFSET ?2",
)?;
let rows = stmt.query_map(params![limit_i64, offset_i64], row_to_event)?;
rows.collect::<rusqlite::Result<Vec<_>>>()
.map_err(Into::into)
}
}
fn row_to_event(row: &rusqlite::Row<'_>) -> rusqlite::Result<SignedEvent> {
Ok(SignedEvent {
id: row.get(0)?,
agent_id: row.get(1)?,
event_type: row.get(2)?,
payload_hash: row.get(3)?,
signature: row.get(4)?,
attest_level: row.get(5)?,
timestamp: row.get(6)?,
prev_hash: row.get::<_, Option<Vec<u8>>>(7)?.unwrap_or_default(),
sequence: row.get(8)?,
})
}
#[cfg(test)]
mod tests {
use super::*;
use chrono::Utc;
use rusqlite::Connection;
use uuid::Uuid;
fn fresh_db() -> Connection {
let conn = Connection::open_in_memory().expect("in-memory db");
conn.execute_batch(include_str!(
"../migrations/sqlite/0020_v07_signed_events.sql"
))
.expect("apply v25 migration");
conn
}
fn fixture(agent: &str, event_type: &str) -> SignedEvent {
SignedEvent {
id: Uuid::new_v4().to_string(),
agent_id: agent.to_string(),
event_type: event_type.to_string(),
payload_hash: payload_hash(b"test-payload"),
signature: None,
attest_level: crate::models::AttestLevel::Unsigned.as_str().to_string(),
timestamp: Utc::now().to_rfc3339(),
prev_hash: Vec::new(),
sequence: 0,
}
}
#[test]
fn migration_is_idempotent() {
let conn = fresh_db();
conn.execute_batch(include_str!(
"../migrations/sqlite/0020_v07_signed_events.sql"
))
.expect("re-apply v25 migration");
let event = fixture("alice", "memory_link.created");
append_signed_event(&conn, &event).expect("append after re-migration");
}
#[test]
fn append_then_list_round_trip() {
let conn = fresh_db();
let event = fixture("alice", "memory_link.created");
append_signed_event(&conn, &event).expect("append");
let listed = list_signed_events(&conn, Some("alice"), 10, 0).expect("list");
assert_eq!(listed.len(), 1);
assert_eq!(listed[0].id, event.id);
assert_eq!(listed[0].agent_id, event.agent_id);
assert_eq!(listed[0].event_type, event.event_type);
assert_eq!(listed[0].payload_hash, event.payload_hash);
assert_eq!(listed[0].signature, event.signature);
assert_eq!(listed[0].attest_level, event.attest_level);
assert_eq!(listed[0].timestamp, event.timestamp);
assert_eq!(listed[0].prev_hash, ZERO_HASH.to_vec());
assert_eq!(listed[0].sequence, 1);
}
#[test]
fn list_orders_by_timestamp_ascending() {
let conn = fresh_db();
let mut a = fixture("alice", "memory_link.created");
a.timestamp = "2026-05-05T12:00:00+00:00".to_string();
let mut b = fixture("alice", "memory_link.created");
b.timestamp = "2026-05-05T12:00:01+00:00".to_string();
let mut c = fixture("alice", "memory_link.created");
c.timestamp = "2026-05-05T12:00:02+00:00".to_string();
append_signed_event(&conn, &b).unwrap();
append_signed_event(&conn, &c).unwrap();
append_signed_event(&conn, &a).unwrap();
let listed = list_signed_events(&conn, Some("alice"), 10, 0).expect("list");
let ts: Vec<&str> = listed.iter().map(|e| e.timestamp.as_str()).collect();
assert_eq!(
ts,
vec![
"2026-05-05T12:00:00+00:00",
"2026-05-05T12:00:01+00:00",
"2026-05-05T12:00:02+00:00",
],
"rows must come back in ascending timestamp order"
);
}
#[test]
fn list_filters_by_agent() {
let conn = fresh_db();
append_signed_event(&conn, &fixture("alice", "memory_link.created")).unwrap();
append_signed_event(&conn, &fixture("bob", "memory_link.created")).unwrap();
append_signed_event(&conn, &fixture("alice", "memory_link.created")).unwrap();
let alice = list_signed_events(&conn, Some("alice"), 10, 0).unwrap();
let bob = list_signed_events(&conn, Some("bob"), 10, 0).unwrap();
let all = list_signed_events(&conn, None, 10, 0).unwrap();
assert_eq!(alice.len(), 2);
assert_eq!(bob.len(), 1);
assert_eq!(all.len(), 3);
}
#[test]
fn list_respects_limit_and_offset() {
let conn = fresh_db();
for i in 0..5 {
let mut e = fixture("alice", "memory_link.created");
e.timestamp = format!("2026-05-05T12:00:0{i}+00:00");
append_signed_event(&conn, &e).unwrap();
}
let page1 = list_signed_events(&conn, Some("alice"), 2, 0).unwrap();
let page2 = list_signed_events(&conn, Some("alice"), 2, 2).unwrap();
assert_eq!(page1.len(), 2);
assert_eq!(page2.len(), 2);
assert_ne!(page1[0].id, page2[0].id);
}
#[test]
fn append_preserves_signature_blob() {
let conn = fresh_db();
let mut event = fixture("alice", "memory_link.created");
event.signature = Some(vec![0xAA; 64]); event.attest_level = "self_signed".to_string();
append_signed_event(&conn, &event).unwrap();
let listed = list_signed_events(&conn, Some("alice"), 10, 0).unwrap();
assert_eq!(listed[0].signature.as_deref(), Some(&[0xAA; 64][..]));
assert_eq!(listed[0].attest_level, "self_signed");
}
#[test]
fn indexes_exist_on_documented_columns() {
let conn = fresh_db();
let mut stmt = conn.prepare("PRAGMA index_list('signed_events')").unwrap();
let names: Vec<String> = stmt
.query_map([], |row| row.get::<_, String>(1))
.unwrap()
.collect::<rusqlite::Result<Vec<_>>>()
.unwrap();
assert!(
names.iter().any(|n| n == "idx_signed_events_agent"),
"missing idx_signed_events_agent in {names:?}"
);
assert!(
names.iter().any(|n| n == "idx_signed_events_type"),
"missing idx_signed_events_type in {names:?}"
);
assert!(
names.iter().any(|n| n == "idx_signed_events_timestamp"),
"missing idx_signed_events_timestamp in {names:?}"
);
assert!(
names.iter().any(|n| n == "idx_signed_events_sequence"),
"missing idx_signed_events_sequence in {names:?} \
— v34 (V-4 closeout, #698) requires a UNIQUE index on \
the cross-row chain sequence column"
);
}
#[test]
fn payload_hash_is_sha256_32_bytes() {
let h = payload_hash(b"hello world");
assert_eq!(h.len(), 32, "SHA-256 digest is 32 bytes");
assert_eq!(h, payload_hash(b"hello world"));
assert_ne!(h, payload_hash(b"hello worle"));
}
#[test]
fn append_duplicate_id_returns_error() {
let conn = fresh_db();
let mut a = fixture("alice", "memory_link.created");
append_signed_event(&conn, &a).expect("first append ok");
a.timestamp = Utc::now().to_rfc3339();
let err = append_signed_event(&conn, &a).expect_err("second append with same id must fail");
assert!(
format!("{err:?}").contains("append signed_event")
|| format!("{err:#}").contains("append signed_event"),
"anyhow context should include the 'append signed_event' tag, got: {err:?}"
);
}
#[test]
fn list_signed_events_empty_db_returns_empty() {
let conn = fresh_db();
let alice = list_signed_events(&conn, Some("alice"), 10, 0).expect("list ok");
let all = list_signed_events(&conn, None, 10, 0).expect("list ok");
assert!(alice.is_empty());
assert!(all.is_empty());
}
#[test]
fn list_signed_events_offset_past_end_returns_empty() {
let conn = fresh_db();
append_signed_event(&conn, &fixture("alice", "memory_link.created")).unwrap();
let beyond = list_signed_events(&conn, Some("alice"), 10, 100).expect("list ok");
assert!(beyond.is_empty());
}
#[test]
fn list_signed_events_no_agent_filter_returns_all_agents() {
let conn = fresh_db();
append_signed_event(&conn, &fixture("alice", "memory_link.created")).unwrap();
append_signed_event(&conn, &fixture("bob", "memory_link.created")).unwrap();
append_signed_event(&conn, &fixture("carol", "memory_link.created")).unwrap();
let all = list_signed_events(&conn, None, 10, 0).expect("list ok");
let agents: std::collections::HashSet<&str> =
all.iter().map(|e| e.agent_id.as_str()).collect();
assert_eq!(agents.len(), 3);
}
#[test]
fn payload_hash_known_vector() {
let h = payload_hash(b"");
let hex: String = h.iter().map(|b| format!("{b:02x}")).collect();
assert_eq!(
hex,
"e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
);
}
#[test]
fn append_only_invariant_no_mutators_in_src() {
use std::path::Path;
let src_root = Path::new(env!("CARGO_MANIFEST_DIR")).join("src");
let forbidden: [String; 2] = [
format!("{} signed_events", "UPDATE"),
format!("{} signed_events", "DELETE FROM"),
];
let migration_carveouts: [&str; 2] = ["src/storage/migrations.rs", "src/store/postgres.rs"];
let mut hits: Vec<String> = Vec::new();
walk_rs_files(&src_root, &mut |path, contents| {
let path_str = path.to_string_lossy().replace('\\', "/");
let is_carveout = migration_carveouts.iter().any(|c| path_str.ends_with(c));
let stripped = strip_rust_comments(contents);
for needle in &forbidden {
if !stripped.contains(needle.as_str()) {
continue;
}
if is_carveout && needle.starts_with("UPDATE") {
continue;
}
hits.push(format!("{}: {}", path.display(), needle));
}
});
assert!(
hits.is_empty(),
"found forbidden mutator(s) on signed_events: {hits:?} \
— append-only invariant requires zero UPDATE/DELETE call sites in production code \
(the v34 backfill UPDATE in migrations.rs / postgres.rs is the only allowed exception)"
);
}
fn strip_rust_comments(src: &str) -> String {
let mut out = String::with_capacity(src.len());
for line in src.lines() {
let line_no_line_comment = match line.find("//") {
Some(idx) => &line[..idx],
None => line,
};
let mut buf = String::from(line_no_line_comment);
while let (Some(start), Some(end_rel)) = (buf.find("/*"), buf.find("*/").map(|i| i + 2))
{
if end_rel <= start {
break;
}
buf.replace_range(start..end_rel, "");
}
out.push_str(&buf);
out.push('\n');
}
out
}
fn walk_rs_files(dir: &std::path::Path, visit: &mut dyn FnMut(&std::path::Path, &str)) {
let Ok(entries) = std::fs::read_dir(dir) else {
return;
};
for entry in entries.flatten() {
let path = entry.path();
if path.is_dir() {
walk_rs_files(&path, visit);
} else if path.extension().and_then(|s| s.to_str()) == Some("rs") {
if let Ok(contents) = std::fs::read_to_string(&path) {
visit(&path, &contents);
}
}
}
}
#[test]
fn list_signed_events_row_decode_error_propagates() {
let conn = Connection::open_in_memory().expect("in-memory db");
conn.execute_batch(
"CREATE TABLE signed_events (
id TEXT PRIMARY KEY,
agent_id TEXT,
event_type TEXT NOT NULL,
payload_hash BLOB NOT NULL,
signature BLOB,
attest_level TEXT NOT NULL,
timestamp TEXT NOT NULL,
prev_hash BLOB,
sequence INTEGER
);",
)
.unwrap();
conn.execute(
"INSERT INTO signed_events \
(id, agent_id, event_type, payload_hash, signature, attest_level, timestamp, \
prev_hash, sequence) \
VALUES ('row1', NULL, 'memory_link.created', X'00', NULL, 'unsigned', \
'2026-05-13T00:00:00+00:00', NULL, 1)",
[],
)
.unwrap();
let res = list_signed_events(&conn, None, 10, 0);
assert!(res.is_err(), "row decode must fail when agent_id is NULL");
}
#[test]
fn read_chain_head_rejects_null_sequence_row() {
let conn = Connection::open_in_memory().expect("in-memory db");
conn.execute_batch(
"CREATE TABLE signed_events (
id TEXT PRIMARY KEY,
agent_id TEXT NOT NULL,
event_type TEXT NOT NULL,
payload_hash BLOB NOT NULL,
signature BLOB,
attest_level TEXT NOT NULL DEFAULT 'unsigned',
timestamp TEXT NOT NULL,
prev_hash BLOB,
sequence INTEGER
);
CREATE UNIQUE INDEX idx_signed_events_sequence
ON signed_events(sequence);",
)
.unwrap();
conn.execute(
"INSERT INTO signed_events \
(id, agent_id, event_type, payload_hash, signature, attest_level, timestamp, \
prev_hash, sequence) \
VALUES ('legacy', 'alice', 'memory_link.created', X'00', NULL, 'unsigned', \
'2026-05-13T00:00:00+00:00', NULL, NULL)",
[],
)
.unwrap();
let err = read_chain_head(&conn).expect_err("NULL-sequence row must trigger diagnostic");
let rendered = format!("{err:#}");
assert!(
rendered.contains("sequence IS NULL") || rendered.contains("backfill incomplete"),
"diagnostic must mention NULL-sequence rows; got: {rendered}"
);
}
#[test]
fn append_signed_event_refuses_when_null_sequence_present() {
let conn = Connection::open_in_memory().expect("in-memory db");
conn.execute_batch(
"CREATE TABLE signed_events (
id TEXT PRIMARY KEY,
agent_id TEXT NOT NULL,
event_type TEXT NOT NULL,
payload_hash BLOB NOT NULL,
signature BLOB,
attest_level TEXT NOT NULL DEFAULT 'unsigned',
timestamp TEXT NOT NULL,
prev_hash BLOB,
sequence INTEGER
);
CREATE UNIQUE INDEX idx_signed_events_sequence
ON signed_events(sequence);",
)
.unwrap();
conn.execute(
"INSERT INTO signed_events \
(id, agent_id, event_type, payload_hash, signature, attest_level, timestamp, \
prev_hash, sequence) \
VALUES ('legacy', 'alice', 'memory_link.created', X'00', NULL, 'unsigned', \
'2026-05-13T00:00:00+00:00', NULL, NULL)",
[],
)
.unwrap();
let event = fixture("alice", "memory_link.created");
let err = append_signed_event(&conn, &event)
.expect_err("append must refuse while NULL-sequence row exists");
let rendered = format!("{err:#}");
assert!(
rendered.contains("sequence IS NULL") || rendered.contains("backfill incomplete"),
"diagnostic must surface in append error; got: {rendered}"
);
}
#[test]
fn list_signed_events_with_agent_filter_row_decode_error_propagates() {
let conn = Connection::open_in_memory().expect("in-memory db");
conn.execute_batch(
"CREATE TABLE signed_events (
id TEXT PRIMARY KEY,
agent_id TEXT NOT NULL,
event_type TEXT,
payload_hash BLOB NOT NULL,
signature BLOB,
attest_level TEXT NOT NULL,
timestamp TEXT NOT NULL,
prev_hash BLOB,
sequence INTEGER
);",
)
.unwrap();
conn.execute(
"INSERT INTO signed_events \
(id, agent_id, event_type, payload_hash, signature, attest_level, timestamp, \
prev_hash, sequence) \
VALUES ('row2', 'alice', NULL, X'00', NULL, 'unsigned', \
'2026-05-13T00:00:00+00:00', NULL, 1)",
[],
)
.unwrap();
let res = list_signed_events(&conn, Some("alice"), 10, 0);
assert!(res.is_err(), "row decode must fail when event_type is NULL");
}
}