use crate::backend::{as_u64, blob32, Backend, Value};
use crate::error::{Result, StoreError};
/// Name of the SQL table that holds every log row, for all streams and writers.
const LOG_TABLE: &str = "_agent_store_log";
/// One record of a hash-chained log, identified by `(stream, writer, seq)`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Entry {
/// Stream the entry belongs to.
pub stream: String,
/// Writer that appended the entry; each `(stream, writer)` pair has its own chain.
pub writer: String,
/// Position within the `(stream, writer)` chain; the first appended entry gets 1.
pub seq: u64,
/// `content_hash` of the preceding entry in the chain, or `None` for the first entry.
pub prev_hash: Option<[u8; 32]>,
/// BLAKE3 digest of `prev_hash` (when present) followed by `payload`.
pub content_hash: [u8; 32],
/// Opaque bytes supplied by the caller.
pub payload: Vec<u8>,
}
/// Stateless namespace for the log operations; every operation is an
/// associated function that receives the backend to act on as an argument.
pub struct WriterLog;
impl WriterLog {
/// Creates the log table when it does not exist yet.
///
/// Rows are keyed by `(stream, writer, seq)`, so a given sequence number can
/// be stored at most once per writer within a stream.
///
/// # Errors
/// Propagates any error reported by the backend while executing the DDL.
pub fn ensure_schema(db: &dyn Backend) -> Result<()> {
    let ddl = format!(
        "CREATE TABLE IF NOT EXISTS {LOG_TABLE} (\
         stream TEXT NOT NULL, \
         writer TEXT NOT NULL, \
         seq INTEGER NOT NULL, \
         prev_hash BLOB, \
         content_hash BLOB NOT NULL, \
         payload BLOB NOT NULL, \
         PRIMARY KEY (stream, writer, seq))"
    );
    db.exec(&ddl, &[])?;
    Ok(())
}
/// Returns the entry with the highest `seq` for `(stream, writer)`, or
/// `None` when that writer has no entries in the stream.
///
/// # Errors
/// Propagates backend query failures and row-decoding failures.
pub fn head(db: &dyn Backend, stream: &str, writer: &str) -> Result<Option<Entry>> {
    let sql = format!(
        "SELECT seq, prev_hash, content_hash, payload FROM {LOG_TABLE} \
         WHERE stream = ? AND writer = ? ORDER BY seq DESC LIMIT 1"
    );
    let params = [Value::Text(stream.into()), Value::Text(writer.into())];
    let rows = db.query(&sql, &params)?;
    // At most one row comes back; decode it if present and surface any
    // decoding error instead of the entry.
    rows.first()
        .map(|latest| row_to_entry(stream, writer, latest))
        .transpose()
}
pub fn append(db: &dyn Backend, stream: &str, writer: &str, payload: &[u8]) -> Result<Entry> {
let (seq, prev_hash) = match Self::head(db, stream, writer)? {
Some(h) => (h.seq + 1, Some(h.content_hash)),
None => (1, None),
};
let content_hash = chain_hash(prev_hash.as_ref(), payload);
db.exec(
&format!(
"INSERT INTO {LOG_TABLE} \
(stream, writer, seq, prev_hash, content_hash, payload) \
VALUES (?, ?, ?, ?, ?, ?)"
),
&[
Value::Text(stream.into()),
Value::Text(writer.into()),
Value::Int(seq as i64),
match prev_hash {
Some(h) => Value::Blob(h.to_vec()),
None => Value::Null,
},
Value::Blob(content_hash.to_vec()),
Value::Blob(payload.to_vec()),
],
)?;
Ok(Entry {
stream: stream.into(),
writer: writer.into(),
seq,
prev_hash,
content_hash,
payload: payload.to_vec(),
})
}
/// Loads every entry of the `(stream, writer)` chain, ordered by ascending `seq`.
///
/// # Errors
/// Propagates backend query failures; stops at the first row that fails to
/// decode and returns that error.
pub fn entries(db: &dyn Backend, stream: &str, writer: &str) -> Result<Vec<Entry>> {
    let sql = format!(
        "SELECT seq, prev_hash, content_hash, payload FROM {LOG_TABLE} \
         WHERE stream = ? AND writer = ? ORDER BY seq ASC"
    );
    let params = [Value::Text(stream.into()), Value::Text(writer.into())];
    let rows = db.query(&sql, &params)?;

    let mut decoded = Vec::with_capacity(rows.len());
    for raw in rows.iter() {
        decoded.push(row_to_entry(stream, writer, raw)?);
    }
    Ok(decoded)
}
/// Walks the `(stream, writer)` chain from the start and checks its integrity.
///
/// For each entry, in ascending `seq` order, the following must hold:
/// 1. sequence numbers are contiguous and start at 1;
/// 2. `prev_hash` equals the previous entry's `content_hash` (`None` for the first);
/// 3. `content_hash` matches the hash recomputed from the link and the payload.
///
/// An empty chain is valid.
///
/// # Errors
/// Returns [`StoreError::ChainBroken`] for the first entry that violates one
/// of the checks, and propagates any error from loading the entries.
pub fn verify(db: &dyn Backend, stream: &str, writer: &str) -> Result<()> {
    let log = Self::entries(db, stream, writer)?;
    // Digest the next entry is expected to point back at.
    let mut link: Option<[u8; 32]> = None;

    for (want, record) in (1u64..).zip(log) {
        // Evaluate the checks in order; the first failing one supplies the detail.
        let fault: Option<String> = if record.seq != want {
            Some(format!("expected seq {want}, found {}", record.seq))
        } else if record.prev_hash != link {
            Some(String::from("prev_hash does not link to the prior entry"))
        } else if chain_hash(link.as_ref(), &record.payload) != record.content_hash {
            Some(String::from("content_hash does not match payload (tampered)"))
        } else {
            None
        };

        if let Some(detail) = fault {
            return Err(StoreError::ChainBroken {
                stream: stream.into(),
                writer: writer.into(),
                seq: record.seq,
                detail,
            });
        }
        link = Some(record.content_hash);
    }
    Ok(())
}
}
/// Computes the chain digest of an entry: BLAKE3 over the previous entry's
/// digest (when there is one) followed by the payload bytes.
fn chain_hash(prev: Option<&[u8; 32]>, payload: &[u8]) -> [u8; 32] {
    let mut state = blake3::Hasher::new();
    match prev {
        // Linked entry: the parent digest is hashed ahead of the payload.
        Some(parent) => {
            state.update(parent);
            state.update(payload);
        }
        // First entry of a chain: only the payload is hashed.
        None => {
            state.update(payload);
        }
    }
    let digest = state.finalize();
    *digest.as_bytes()
}
/// Decodes one result row of the form `(seq, prev_hash, content_hash, payload)`
/// into an [`Entry`] for the given `stream` and `writer`.
///
/// Columns beyond the first four are ignored.
///
/// # Errors
/// * [`StoreError::MalformedRow`] when the row has fewer than four columns or
///   the payload column is not a blob.
/// * Whatever `as_u64` / `blob32` report when the `seq` or hash columns do not
///   have the expected shape.
fn row_to_entry(stream: &str, writer: &str, row: &[Value]) -> Result<Entry> {
    // Destructure up front so a short row yields an error instead of an
    // out-of-bounds panic from indexing.
    let (seq_col, prev_col, hash_col, payload_col) = match row {
        [seq, prev, hash, payload, ..] => (seq, prev, hash, payload),
        _ => {
            return Err(StoreError::MalformedRow(format!(
                "expected 4 columns (seq, prev_hash, content_hash, payload), got {}",
                row.len()
            )))
        }
    };

    let seq = as_u64(seq_col)?;
    // A NULL link marks the first entry of a chain.
    let prev_hash = match prev_col {
        Value::Null => None,
        other => Some(blob32(other)?),
    };
    let content_hash = blob32(hash_col)?;
    let payload = match payload_col {
        Value::Blob(b) => b.clone(),
        other => {
            return Err(StoreError::MalformedRow(format!(
                "payload must be a blob, got {other:?}"
            )))
        }
    };
    Ok(Entry {
        stream: stream.into(),
        writer: writer.into(),
        seq,
        prev_hash,
        content_hash,
        payload,
    })
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::backend::SqliteBackend;

    /// Fresh in-memory backend with the log table already created.
    fn db() -> SqliteBackend {
        let db = SqliteBackend::in_memory().unwrap();
        WriterLog::ensure_schema(&db).unwrap();
        db
    }

    #[test]
    fn appends_chain_and_verify() {
        let db = db();
        let e1 = WriterLog::append(&db, "conv:x", "alice", b"hello").unwrap();
        let e2 = WriterLog::append(&db, "conv:x", "alice", b"world").unwrap();
        assert_eq!(e1.seq, 1);
        assert_eq!(e1.prev_hash, None);
        assert_eq!(e2.seq, 2);
        assert_eq!(e2.prev_hash, Some(e1.content_hash));
        assert_ne!(e1.content_hash, e2.content_hash);
        WriterLog::verify(&db, "conv:x", "alice").unwrap();
    }

    #[test]
    fn writers_have_independent_sequences() {
        let db = db();
        WriterLog::append(&db, "conv:x", "alice", b"a").unwrap();
        let bob1 = WriterLog::append(&db, "conv:x", "bob", b"b").unwrap();
        assert_eq!(bob1.seq, 1, "each writer's seq starts at 1");
        WriterLog::verify(&db, "conv:x", "alice").unwrap();
        WriterLog::verify(&db, "conv:x", "bob").unwrap();
    }

    #[test]
    fn head_and_entries_reflect_appends() {
        let db = db();
        assert_eq!(WriterLog::head(&db, "conv:x", "alice").unwrap(), None);
        let e1 = WriterLog::append(&db, "conv:x", "alice", b"hello").unwrap();
        let e2 = WriterLog::append(&db, "conv:x", "alice", b"world").unwrap();
        assert_eq!(
            WriterLog::head(&db, "conv:x", "alice").unwrap(),
            Some(e2.clone())
        );
        assert_eq!(
            WriterLog::entries(&db, "conv:x", "alice").unwrap(),
            vec![e1, e2]
        );
    }

    #[test]
    fn verify_detects_tampering() {
        let db = db();
        WriterLog::append(&db, "conv:x", "alice", b"original").unwrap();
        WriterLog::append(&db, "conv:x", "alice", b"second").unwrap();
        // Rewrite the first payload behind the log's back; use the shared
        // table-name constant so the test follows any rename.
        db.exec(
            &format!(
                "UPDATE {LOG_TABLE} SET payload = ? WHERE stream = ? AND writer = ? AND seq = 1"
            ),
            &[
                Value::Blob(b"TAMPERED".to_vec()),
                Value::Text("conv:x".into()),
                Value::Text("alice".into()),
            ],
        )
        .unwrap();
        let err = WriterLog::verify(&db, "conv:x", "alice").unwrap_err();
        assert!(matches!(err, StoreError::ChainBroken { seq: 1, .. }));
    }

    #[test]
    fn verify_detects_missing_entry() {
        let db = db();
        WriterLog::append(&db, "conv:x", "alice", b"first").unwrap();
        WriterLog::append(&db, "conv:x", "alice", b"second").unwrap();
        // Dropping the first row leaves the chain starting at seq 2.
        db.exec(
            &format!("DELETE FROM {LOG_TABLE} WHERE stream = ? AND writer = ? AND seq = 1"),
            &[Value::Text("conv:x".into()), Value::Text("alice".into())],
        )
        .unwrap();
        let err = WriterLog::verify(&db, "conv:x", "alice").unwrap_err();
        assert!(matches!(err, StoreError::ChainBroken { seq: 2, .. }));
    }

    #[test]
    fn verify_empty_log_is_ok() {
        let db = db();
        WriterLog::verify(&db, "conv:none", "nobody").unwrap();
    }
}