Skip to main content

agent_store/
writer_log.rs

//! Per-writer, BLAKE3-chained append log — newt-agent's §6 ordering contract,
//! lifted into the substrate.
3//!
4//! Each `(stream, writer)` pair owns a strictly-monotonic `seq` starting at 1.
5//! Every entry's `content_hash` chains the previous entry's hash:
6//!
7//! ```text
8//! content_hash = BLAKE3( prev_hash ?: "" || payload )
9//! ```
10//!
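//! so the log is tamper-evident: [`WriterLog::verify`] recomputes the chain
//! and rejects any reorder, gap, or edit. (The one rewrite a chain alone cannot
//! reveal is replacing the newest entry's payload *and* its `content_hash`
//! together; catching that needs an externally recorded head hash.) Ordering
//! is purely causal — no wall-clock anywhere.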
14//!
//! v0 note: `append` reads the head and then inserts, which is not atomic.
//! The `PRIMARY KEY (stream, writer, seq)` turns a racing duplicate `seq` into
//! a hard error rather than silent corruption. Each writer is expected to be
//! single-threaded within a process by construction (one identity per agent),
//! which is the intended use.
19
20use crate::backend::{as_u64, blob32, Backend, Value};
21use crate::error::{Result, StoreError};
22
/// SQL table holding every `(stream, writer)` chain, one row per entry.
const LOG_TABLE: &str = "_agent_store_log";

/// A single link of a per-writer hash chain, exactly as persisted.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Entry {
    /// Stream the entry was appended to.
    pub stream: String,
    /// Writer identity; every `(stream, writer)` pair has an independent chain.
    pub writer: String,
    /// 1-based position in the writer's chain; consecutive entries differ by one.
    pub seq: u64,
    /// `content_hash` of the entry at `seq - 1`, or `None` for the genesis entry.
    pub prev_hash: Option<[u8; 32]>,
    /// BLAKE3 over `prev_hash` (when present) followed by `payload`.
    pub content_hash: [u8; 32],
    /// Opaque bytes supplied by the writer.
    pub payload: Vec<u8>,
}
36
37/// Stateless operations over the chained log table.
38pub struct WriterLog;
39
40impl WriterLog {
41    /// Create the backing table if it does not exist. Idempotent.
42    pub fn ensure_schema(db: &dyn Backend) -> Result<()> {
43        db.exec(
44            &format!(
45                "CREATE TABLE IF NOT EXISTS {LOG_TABLE} (\
46                 stream TEXT NOT NULL, \
47                 writer TEXT NOT NULL, \
48                 seq INTEGER NOT NULL, \
49                 prev_hash BLOB, \
50                 content_hash BLOB NOT NULL, \
51                 payload BLOB NOT NULL, \
52                 PRIMARY KEY (stream, writer, seq))"
53            ),
54            &[],
55        )?;
56        Ok(())
57    }
58
59    /// The highest-seq entry for `(stream, writer)`, or `None` if empty.
60    pub fn head(db: &dyn Backend, stream: &str, writer: &str) -> Result<Option<Entry>> {
61        let rows = db.query(
62            &format!(
63                "SELECT seq, prev_hash, content_hash, payload FROM {LOG_TABLE} \
64                 WHERE stream = ? AND writer = ? ORDER BY seq DESC LIMIT 1"
65            ),
66            &[Value::Text(stream.into()), Value::Text(writer.into())],
67        )?;
68        match rows.first() {
69            None => Ok(None),
70            Some(row) => Ok(Some(row_to_entry(stream, writer, row)?)),
71        }
72    }
73
74    /// Append `payload`, computing the next `seq` and chained `content_hash`.
75    pub fn append(db: &dyn Backend, stream: &str, writer: &str, payload: &[u8]) -> Result<Entry> {
76        let (seq, prev_hash) = match Self::head(db, stream, writer)? {
77            Some(h) => (h.seq + 1, Some(h.content_hash)),
78            None => (1, None),
79        };
80        let content_hash = chain_hash(prev_hash.as_ref(), payload);
81        db.exec(
82            &format!(
83                "INSERT INTO {LOG_TABLE} \
84                 (stream, writer, seq, prev_hash, content_hash, payload) \
85                 VALUES (?, ?, ?, ?, ?, ?)"
86            ),
87            &[
88                Value::Text(stream.into()),
89                Value::Text(writer.into()),
90                Value::Int(seq as i64),
91                match prev_hash {
92                    Some(h) => Value::Blob(h.to_vec()),
93                    None => Value::Null,
94                },
95                Value::Blob(content_hash.to_vec()),
96                Value::Blob(payload.to_vec()),
97            ],
98        )?;
99        Ok(Entry {
100            stream: stream.into(),
101            writer: writer.into(),
102            seq,
103            prev_hash,
104            content_hash,
105            payload: payload.to_vec(),
106        })
107    }
108
109    /// All entries for `(stream, writer)`, ascending by seq.
110    pub fn entries(db: &dyn Backend, stream: &str, writer: &str) -> Result<Vec<Entry>> {
111        let rows = db.query(
112            &format!(
113                "SELECT seq, prev_hash, content_hash, payload FROM {LOG_TABLE} \
114                 WHERE stream = ? AND writer = ? ORDER BY seq ASC"
115            ),
116            &[Value::Text(stream.into()), Value::Text(writer.into())],
117        )?;
118        rows.iter()
119            .map(|r| row_to_entry(stream, writer, r))
120            .collect()
121    }
122
123    /// Recompute the chain and reject any gap, reorder, or tamper.
124    pub fn verify(db: &dyn Backend, stream: &str, writer: &str) -> Result<()> {
125        let mut prev: Option<[u8; 32]> = None;
126        // seq is 1-based and contiguous; zip against the natural counter.
127        for (expected_seq, entry) in (1u64..).zip(Self::entries(db, stream, writer)?) {
128            if entry.seq != expected_seq {
129                return Err(StoreError::ChainBroken {
130                    stream: stream.into(),
131                    writer: writer.into(),
132                    seq: entry.seq,
133                    detail: format!("expected seq {expected_seq}, found {}", entry.seq),
134                });
135            }
136            if entry.prev_hash != prev {
137                return Err(StoreError::ChainBroken {
138                    stream: stream.into(),
139                    writer: writer.into(),
140                    seq: entry.seq,
141                    detail: "prev_hash does not link to the prior entry".into(),
142                });
143            }
144            let recomputed = chain_hash(prev.as_ref(), &entry.payload);
145            if recomputed != entry.content_hash {
146                return Err(StoreError::ChainBroken {
147                    stream: stream.into(),
148                    writer: writer.into(),
149                    seq: entry.seq,
150                    detail: "content_hash does not match payload (tampered)".into(),
151                });
152            }
153            prev = Some(entry.content_hash);
154        }
155        Ok(())
156    }
157}
158
159/// `BLAKE3(prev_hash? || payload)`.
160fn chain_hash(prev: Option<&[u8; 32]>, payload: &[u8]) -> [u8; 32] {
161    let mut hasher = blake3::Hasher::new();
162    if let Some(p) = prev {
163        hasher.update(p);
164    }
165    hasher.update(payload);
166    *hasher.finalize().as_bytes()
167}
168
169fn row_to_entry(stream: &str, writer: &str, row: &[Value]) -> Result<Entry> {
170    let seq = as_u64(&row[0])?;
171    let prev_hash = match &row[1] {
172        Value::Null => None,
173        other => Some(blob32(other)?),
174    };
175    let content_hash = blob32(&row[2])?;
176    let payload = match &row[3] {
177        Value::Blob(b) => b.clone(),
178        other => {
179            return Err(StoreError::MalformedRow(format!(
180                "payload must be a blob, got {other:?}"
181            )))
182        }
183    };
184    Ok(Entry {
185        stream: stream.into(),
186        writer: writer.into(),
187        seq,
188        prev_hash,
189        content_hash,
190        payload,
191    })
192}
193
#[cfg(test)]
mod tests {
    use super::*;
    use crate::backend::SqliteBackend;

    /// Fresh in-memory backend with the log schema installed.
    fn db() -> SqliteBackend {
        let db = SqliteBackend::in_memory().unwrap();
        WriterLog::ensure_schema(&db).unwrap();
        db
    }

    #[test]
    fn appends_chain_and_verify() {
        let db = db();
        let e1 = WriterLog::append(&db, "conv:x", "alice", b"hello").unwrap();
        let e2 = WriterLog::append(&db, "conv:x", "alice", b"world").unwrap();
        assert_eq!(e1.seq, 1);
        assert_eq!(e1.prev_hash, None);
        assert_eq!(e2.seq, 2);
        assert_eq!(e2.prev_hash, Some(e1.content_hash));
        assert_ne!(e1.content_hash, e2.content_hash);
        WriterLog::verify(&db, "conv:x", "alice").unwrap();
    }

    #[test]
    fn writers_have_independent_sequences() {
        let db = db();
        WriterLog::append(&db, "conv:x", "alice", b"a").unwrap();
        let bob1 = WriterLog::append(&db, "conv:x", "bob", b"b").unwrap();
        assert_eq!(bob1.seq, 1, "each writer's seq starts at 1");
        WriterLog::verify(&db, "conv:x", "alice").unwrap();
        WriterLog::verify(&db, "conv:x", "bob").unwrap();
    }

    #[test]
    fn head_and_entries_reflect_appends() {
        // What `append` returns must round-trip through the table unchanged.
        let db = db();
        assert_eq!(WriterLog::head(&db, "conv:x", "alice").unwrap(), None);
        let e1 = WriterLog::append(&db, "conv:x", "alice", b"one").unwrap();
        let e2 = WriterLog::append(&db, "conv:x", "alice", b"two").unwrap();
        assert_eq!(
            WriterLog::head(&db, "conv:x", "alice").unwrap(),
            Some(e2.clone())
        );
        assert_eq!(
            WriterLog::entries(&db, "conv:x", "alice").unwrap(),
            vec![e1, e2]
        );
    }

    #[test]
    fn verify_detects_tampering() {
        // Regression: editing a stored payload must break verification, even
        // though the row still parses and the seq is intact.
        let db = db();
        WriterLog::append(&db, "conv:x", "alice", b"original").unwrap();
        WriterLog::append(&db, "conv:x", "alice", b"second").unwrap();
        // Tamper with seq 1's payload directly, behind the chain's back.
        db.exec(
            "UPDATE _agent_store_log SET payload = ? WHERE stream = ? AND writer = ? AND seq = 1",
            &[
                Value::Blob(b"TAMPERED".to_vec()),
                Value::Text("conv:x".into()),
                Value::Text("alice".into()),
            ],
        )
        .unwrap();
        let err = WriterLog::verify(&db, "conv:x", "alice").unwrap_err();
        assert!(matches!(err, StoreError::ChainBroken { seq: 1, .. }));
    }

    #[test]
    fn verify_detects_gap() {
        // Deleting a middle entry leaves seqs 1 and 3; the walk must flag seq 3.
        let db = db();
        WriterLog::append(&db, "conv:x", "alice", b"a").unwrap();
        WriterLog::append(&db, "conv:x", "alice", b"b").unwrap();
        WriterLog::append(&db, "conv:x", "alice", b"c").unwrap();
        db.exec(
            "DELETE FROM _agent_store_log WHERE stream = ? AND writer = ? AND seq = 2",
            &[Value::Text("conv:x".into()), Value::Text("alice".into())],
        )
        .unwrap();
        let err = WriterLog::verify(&db, "conv:x", "alice").unwrap_err();
        assert!(matches!(err, StoreError::ChainBroken { seq: 3, .. }));
    }

    #[test]
    fn verify_detects_relinked_prev_hash() {
        // A well-formed (32-byte) but wrong prev_hash must break the link check.
        let db = db();
        WriterLog::append(&db, "conv:x", "alice", b"first").unwrap();
        WriterLog::append(&db, "conv:x", "alice", b"second").unwrap();
        db.exec(
            "UPDATE _agent_store_log SET prev_hash = ? WHERE stream = ? AND writer = ? AND seq = 2",
            &[
                Value::Blob(vec![0u8; 32]),
                Value::Text("conv:x".into()),
                Value::Text("alice".into()),
            ],
        )
        .unwrap();
        let err = WriterLog::verify(&db, "conv:x", "alice").unwrap_err();
        assert!(matches!(err, StoreError::ChainBroken { seq: 2, .. }));
    }

    #[test]
    fn verify_empty_log_is_ok() {
        let db = db();
        WriterLog::verify(&db, "conv:none", "nobody").unwrap();
    }
}