1use crate::backend::{as_u64, blob32, Backend, Value};
21use crate::error::{Result, StoreError};
22
23const LOG_TABLE: &str = "_agent_store_log";
24
25#[derive(Clone, Debug, PartialEq, Eq)]
27pub struct Entry {
28 pub stream: String,
29 pub writer: String,
30 pub seq: u64,
31 pub prev_hash: Option<[u8; 32]>,
33 pub content_hash: [u8; 32],
34 pub payload: Vec<u8>,
35}
36
37pub struct WriterLog;
39
40impl WriterLog {
41 pub fn ensure_schema(db: &dyn Backend) -> Result<()> {
43 db.exec(
44 &format!(
45 "CREATE TABLE IF NOT EXISTS {LOG_TABLE} (\
46 stream TEXT NOT NULL, \
47 writer TEXT NOT NULL, \
48 seq INTEGER NOT NULL, \
49 prev_hash BLOB, \
50 content_hash BLOB NOT NULL, \
51 payload BLOB NOT NULL, \
52 PRIMARY KEY (stream, writer, seq))"
53 ),
54 &[],
55 )?;
56 Ok(())
57 }
58
59 pub fn head(db: &dyn Backend, stream: &str, writer: &str) -> Result<Option<Entry>> {
61 let rows = db.query(
62 &format!(
63 "SELECT seq, prev_hash, content_hash, payload FROM {LOG_TABLE} \
64 WHERE stream = ? AND writer = ? ORDER BY seq DESC LIMIT 1"
65 ),
66 &[Value::Text(stream.into()), Value::Text(writer.into())],
67 )?;
68 match rows.first() {
69 None => Ok(None),
70 Some(row) => Ok(Some(row_to_entry(stream, writer, row)?)),
71 }
72 }
73
74 pub fn append(db: &dyn Backend, stream: &str, writer: &str, payload: &[u8]) -> Result<Entry> {
76 let (seq, prev_hash) = match Self::head(db, stream, writer)? {
77 Some(h) => (h.seq + 1, Some(h.content_hash)),
78 None => (1, None),
79 };
80 let content_hash = chain_hash(prev_hash.as_ref(), payload);
81 db.exec(
82 &format!(
83 "INSERT INTO {LOG_TABLE} \
84 (stream, writer, seq, prev_hash, content_hash, payload) \
85 VALUES (?, ?, ?, ?, ?, ?)"
86 ),
87 &[
88 Value::Text(stream.into()),
89 Value::Text(writer.into()),
90 Value::Int(seq as i64),
91 match prev_hash {
92 Some(h) => Value::Blob(h.to_vec()),
93 None => Value::Null,
94 },
95 Value::Blob(content_hash.to_vec()),
96 Value::Blob(payload.to_vec()),
97 ],
98 )?;
99 Ok(Entry {
100 stream: stream.into(),
101 writer: writer.into(),
102 seq,
103 prev_hash,
104 content_hash,
105 payload: payload.to_vec(),
106 })
107 }
108
109 pub fn entries(db: &dyn Backend, stream: &str, writer: &str) -> Result<Vec<Entry>> {
111 let rows = db.query(
112 &format!(
113 "SELECT seq, prev_hash, content_hash, payload FROM {LOG_TABLE} \
114 WHERE stream = ? AND writer = ? ORDER BY seq ASC"
115 ),
116 &[Value::Text(stream.into()), Value::Text(writer.into())],
117 )?;
118 rows.iter()
119 .map(|r| row_to_entry(stream, writer, r))
120 .collect()
121 }
122
123 pub fn verify(db: &dyn Backend, stream: &str, writer: &str) -> Result<()> {
125 let mut prev: Option<[u8; 32]> = None;
126 for (expected_seq, entry) in (1u64..).zip(Self::entries(db, stream, writer)?) {
128 if entry.seq != expected_seq {
129 return Err(StoreError::ChainBroken {
130 stream: stream.into(),
131 writer: writer.into(),
132 seq: entry.seq,
133 detail: format!("expected seq {expected_seq}, found {}", entry.seq),
134 });
135 }
136 if entry.prev_hash != prev {
137 return Err(StoreError::ChainBroken {
138 stream: stream.into(),
139 writer: writer.into(),
140 seq: entry.seq,
141 detail: "prev_hash does not link to the prior entry".into(),
142 });
143 }
144 let recomputed = chain_hash(prev.as_ref(), &entry.payload);
145 if recomputed != entry.content_hash {
146 return Err(StoreError::ChainBroken {
147 stream: stream.into(),
148 writer: writer.into(),
149 seq: entry.seq,
150 detail: "content_hash does not match payload (tampered)".into(),
151 });
152 }
153 prev = Some(entry.content_hash);
154 }
155 Ok(())
156 }
157}
158
159fn chain_hash(prev: Option<&[u8; 32]>, payload: &[u8]) -> [u8; 32] {
161 let mut hasher = blake3::Hasher::new();
162 if let Some(p) = prev {
163 hasher.update(p);
164 }
165 hasher.update(payload);
166 *hasher.finalize().as_bytes()
167}
168
169fn row_to_entry(stream: &str, writer: &str, row: &[Value]) -> Result<Entry> {
170 let seq = as_u64(&row[0])?;
171 let prev_hash = match &row[1] {
172 Value::Null => None,
173 other => Some(blob32(other)?),
174 };
175 let content_hash = blob32(&row[2])?;
176 let payload = match &row[3] {
177 Value::Blob(b) => b.clone(),
178 other => {
179 return Err(StoreError::MalformedRow(format!(
180 "payload must be a blob, got {other:?}"
181 )))
182 }
183 };
184 Ok(Entry {
185 stream: stream.into(),
186 writer: writer.into(),
187 seq,
188 prev_hash,
189 content_hash,
190 payload,
191 })
192}
193
194#[cfg(test)]
195mod tests {
196 use super::*;
197 use crate::backend::SqliteBackend;
198
199 fn db() -> SqliteBackend {
200 let db = SqliteBackend::in_memory().unwrap();
201 WriterLog::ensure_schema(&db).unwrap();
202 db
203 }
204
205 #[test]
206 fn appends_chain_and_verify() {
207 let db = db();
208 let e1 = WriterLog::append(&db, "conv:x", "alice", b"hello").unwrap();
209 let e2 = WriterLog::append(&db, "conv:x", "alice", b"world").unwrap();
210 assert_eq!(e1.seq, 1);
211 assert_eq!(e1.prev_hash, None);
212 assert_eq!(e2.seq, 2);
213 assert_eq!(e2.prev_hash, Some(e1.content_hash));
214 assert_ne!(e1.content_hash, e2.content_hash);
215 WriterLog::verify(&db, "conv:x", "alice").unwrap();
216 }
217
218 #[test]
219 fn writers_have_independent_sequences() {
220 let db = db();
221 WriterLog::append(&db, "conv:x", "alice", b"a").unwrap();
222 let bob1 = WriterLog::append(&db, "conv:x", "bob", b"b").unwrap();
223 assert_eq!(bob1.seq, 1, "each writer's seq starts at 1");
224 WriterLog::verify(&db, "conv:x", "alice").unwrap();
225 WriterLog::verify(&db, "conv:x", "bob").unwrap();
226 }
227
228 #[test]
229 fn verify_detects_tampering() {
230 let db = db();
233 WriterLog::append(&db, "conv:x", "alice", b"original").unwrap();
234 WriterLog::append(&db, "conv:x", "alice", b"second").unwrap();
235 db.exec(
237 "UPDATE _agent_store_log SET payload = ? WHERE stream = ? AND writer = ? AND seq = 1",
238 &[
239 Value::Blob(b"TAMPERED".to_vec()),
240 Value::Text("conv:x".into()),
241 Value::Text("alice".into()),
242 ],
243 )
244 .unwrap();
245 let err = WriterLog::verify(&db, "conv:x", "alice").unwrap_err();
246 assert!(matches!(err, StoreError::ChainBroken { seq: 1, .. }));
247 }
248
249 #[test]
250 fn verify_empty_log_is_ok() {
251 let db = db();
252 WriterLog::verify(&db, "conv:none", "nobody").unwrap();
253 }
254}