use crate::{CliError, CliResult};
use rusqlite::Connection;
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub struct FeedEvent {
pub event_key: String,
pub source_table: String,
pub source_id: String,
pub session_id: String,
pub timestamp: String,
pub kind: String,
pub subtype: Option<String>,
pub mid: Option<String>,
pub actor_agent_id: Option<String>,
pub target_agent_id: Option<String>,
pub source_address: Option<String>,
pub target_address: Option<String>,
pub transport: Option<String>,
pub workspace_id: Option<String>,
pub mode: Option<String>,
pub r#ref: Option<String>,
pub re: Option<String>,
pub summary: Option<String>,
pub body: Option<String>,
pub redaction_policy: Option<String>,
pub proof_audit_id: Option<String>,
pub delivery_status: Option<String>,
pub verified_by: Option<String>,
pub payload_hash: Option<String>,
pub artifact_path: Option<String>,
pub severity: Option<String>,
pub is_derived: bool,
}
pub fn feed_for_session(connection: &Connection, session_id: &str) -> CliResult<Vec<FeedEvent>> {
let mut events = message_events(connection, session_id)?;
events.extend(status_events(connection, session_id)?);
events.sort_by(|a, b| {
b.timestamp
.cmp(&a.timestamp)
.then_with(|| b.source_id.cmp(&a.source_id))
});
Ok(events)
}
fn message_events(connection: &Connection, session_id: &str) -> CliResult<Vec<FeedEvent>> {
let mut statement = connection
.prepare(
"SELECT m.message_id, m.session_id, m.timestamp, m.message_type, m.mid,
m.source_agent_id, m.target_agent_id, m.mode, m.ref,
m.payload_redaction_policy,
CASE WHEN m.payload_redaction_policy = 'hash-only'
THEN NULL ELSE m.payload_excerpt END,
m.latest_delivery_status, m.latest_verified_by, m.payload_hash,
m.latest_audit_id,
COALESCE(a.transport, m.transport),
a.workspace_id, a.source_address, a.target_address, a.re
FROM messages AS m
LEFT JOIN audit_records AS a ON a.audit_id = m.latest_audit_id
WHERE m.session_id = ?1
ORDER BY m.timestamp, m.mid",
)
.map_err(|error| CliError::failure(format!("failed to query feed messages: {error}")))?;
let rows = statement
.query_map([session_id], |row| {
let message_id: String = row.get(0)?;
Ok(FeedEvent {
event_key: format!("message:{message_id}"),
source_table: "messages".to_string(),
source_id: message_id,
session_id: row.get(1)?,
timestamp: row.get(2)?,
kind: "message".to_string(),
subtype: row.get(3)?,
mid: row.get(4)?,
actor_agent_id: row.get(5)?,
target_agent_id: row.get(6)?,
source_address: row.get(17)?,
target_address: row.get(18)?,
transport: row.get(15)?,
workspace_id: row.get(16)?,
mode: row.get(7)?,
r#ref: row.get(8)?,
re: row.get(19)?,
summary: None,
body: row.get(10)?,
redaction_policy: row.get(9)?,
proof_audit_id: row.get(14)?,
delivery_status: row.get(11)?,
verified_by: row.get(12)?,
payload_hash: row.get(13)?,
artifact_path: None,
severity: None,
is_derived: false,
})
})
.map_err(|error| CliError::failure(format!("failed to read feed messages: {error}")))?
.collect::<Result<Vec<_>, _>>()
.map_err(|error| CliError::failure(format!("failed to read feed messages: {error}")))?;
Ok(rows)
}
fn status_events(connection: &Connection, session_id: &str) -> CliResult<Vec<FeedEvent>> {
let mut statement = connection
.prepare(
"SELECT status_event_id, session_id, timestamp, workflow_status, mode, next_action
FROM status_events WHERE session_id = ?1 ORDER BY timestamp, status_event_id",
)
.map_err(|error| CliError::failure(format!("failed to query feed status: {error}")))?;
let rows = statement
.query_map([session_id], |row| {
let id: i64 = row.get(0)?;
Ok(FeedEvent {
event_key: format!("status:{id}"),
source_table: "status_events".to_string(),
source_id: id.to_string(),
session_id: row.get(1)?,
timestamp: row.get(2)?,
kind: "status".to_string(),
subtype: row.get(3)?,
mid: None,
actor_agent_id: None,
target_agent_id: None,
source_address: None,
target_address: None,
transport: None,
workspace_id: None,
mode: row.get(4)?,
r#ref: None,
re: None,
summary: row.get(5)?,
body: None,
redaction_policy: None,
proof_audit_id: None,
delivery_status: None,
verified_by: None,
payload_hash: None,
artifact_path: None,
severity: None,
is_derived: true,
})
})
.map_err(|error| CliError::failure(format!("failed to read feed status: {error}")))?
.collect::<Result<Vec<_>, _>>()
.map_err(|error| CliError::failure(format!("failed to read feed status: {error}")))?;
Ok(rows)
}
pub fn permalink(event: &FeedEvent) -> Option<String> {
let workspace = event.workspace_id.as_deref()?;
let audit_id = event.proof_audit_id.as_deref()?;
Some(format!("acp://{workspace}/{}#{audit_id}", event.session_id))
}
#[derive(Debug)]
pub struct ChainVerification {
pub ok: bool,
pub broken_at: Option<String>,
pub verified_count: usize,
}
pub fn verify_chain(connection: &Connection, session_id: &str) -> CliResult<ChainVerification> {
let mut statement = connection
.prepare(
"SELECT audit_id, previous_audit_id FROM audit_records
WHERE session_id = ?1 ORDER BY timestamp, audit_id",
)
.map_err(|error| CliError::failure(format!("failed to query chain: {error}")))?;
let rows: Vec<(String, Option<String>)> = statement
.query_map([session_id], |row| Ok((row.get(0)?, row.get(1)?)))
.map_err(|error| CliError::failure(format!("failed to read chain: {error}")))?
.collect::<Result<Vec<_>, _>>()
.map_err(|error| CliError::failure(format!("failed to read chain: {error}")))?;
let count = rows.len();
let broken = |audit_id: &str| ChainVerification {
ok: false,
broken_at: Some(audit_id.to_string()),
verified_count: count,
};
if rows.is_empty() {
return Ok(ChainVerification {
ok: true,
broken_at: None,
verified_count: 0,
});
}
let ids: std::collections::HashSet<&str> = rows.iter().map(|(id, _)| id.as_str()).collect();
let mut head: Option<&str> = None;
let mut child_of: std::collections::HashMap<&str, &str> = std::collections::HashMap::new();
for (audit_id, previous) in &rows {
match previous.as_deref() {
None => {
if head.is_some() {
return Ok(broken(audit_id)); }
head = Some(audit_id);
}
Some(previous) => {
if !ids.contains(previous) {
return Ok(broken(audit_id)); }
if child_of.insert(previous, audit_id).is_some() {
return Ok(broken(audit_id)); }
}
}
}
let Some(head) = head else {
return Ok(broken(&rows[0].0)); };
let mut visited: std::collections::HashSet<&str> = std::collections::HashSet::new();
let mut current = Some(head);
while let Some(node) = current {
if !visited.insert(node) {
break; }
current = child_of.get(node).copied();
}
if visited.len() != count {
let orphan = rows
.iter()
.map(|(id, _)| id.as_str())
.find(|id| !visited.contains(id))
.unwrap_or(rows[0].0.as_str());
return Ok(broken(orphan)); }
Ok(ChainVerification {
ok: true,
broken_at: None,
verified_count: count,
})
}
#[cfg(test)]
mod tests {
use super::*;
use crate::db::open_database;
fn seed(dir: &std::path::Path) -> Connection {
let db = dir.join("zynk.db");
open_database(&db).unwrap();
let conn = Connection::open(&db).unwrap();
conn.execute_batch(
"INSERT INTO projects(project_id,name,root_path,created_at,updated_at)
VALUES('p','p','/p','2026-05-30T00:00:00Z','2026-05-30T00:00:00Z');
INSERT INTO sessions(session_id,project_id,title,phase,mode,workflow_status,created_at,updated_at)
VALUES('s','p','s','x','review','working','2026-05-30T00:00:00Z','2026-05-30T00:00:00Z');
INSERT INTO audit_records(audit_id,previous_audit_id,session_id,source_address,target_address,
transport,workspace_id,mid,record_type,command_origin,payload_hash,payload_redaction_policy,
content_size,delivery_status,observed_by,verified_by,re,timestamp)
VALUES('a1',NULL,'s','w1-2','w1-1','herdr','w1','m1','ack','agent','sha256:x','full',5,
'sent','codex','helper-tool','re-parent','2026-05-30T00:00:02Z');
INSERT INTO messages(message_id,session_id,mid,message_type,transport,payload_redaction_policy,
payload_excerpt,payload_hash,latest_delivery_status,latest_verified_by,latest_audit_id,timestamp)
VALUES('s:m1','s','m1','ack','herdr','full','the body','sha256:x','sent','helper-tool','a1',
'2026-05-30T00:00:02Z');
INSERT INTO status_events(session_id,timestamp,phase,mode,workflow_status,completed_since_last_update,
in_progress,next_action,blockers,asks_for_zevs,risk_or_residual_uncertainty,expected_wait)
VALUES('s','2026-05-30T00:00:01Z','x','review','working','c','p','n','none','none','none','unk');",
)
.unwrap();
conn
}
#[test]
fn feed_interleaves_newest_first_with_proof_overlay_and_body() {
let dir = tempfile::tempdir().unwrap();
let conn = seed(dir.path());
let feed = feed_for_session(&conn, "s").unwrap();
assert_eq!(feed.len(), 2, "one status + one message");
assert_eq!(feed[0].kind, "message");
assert_eq!(feed[1].kind, "status");
let msg = &feed[0];
assert_eq!(msg.body.as_deref(), Some("the body"), "full body carried");
assert_eq!(msg.mid.as_deref(), Some("m1"));
assert_eq!(msg.proof_audit_id.as_deref(), Some("a1"));
assert_eq!(msg.delivery_status.as_deref(), Some("sent"));
assert_eq!(msg.verified_by.as_deref(), Some("helper-tool"));
assert_eq!(msg.transport.as_deref(), Some("herdr"));
assert_eq!(msg.workspace_id.as_deref(), Some("w1"));
assert_eq!(msg.source_address.as_deref(), Some("w1-2"));
assert_eq!(msg.target_address.as_deref(), Some("w1-1"));
}
#[test]
fn hash_only_message_has_no_body() {
let dir = tempfile::tempdir().unwrap();
let conn = seed(dir.path());
conn.execute(
"UPDATE messages SET payload_redaction_policy='hash-only', payload_excerpt=NULL WHERE mid='m1'",
[],
)
.unwrap();
let feed = feed_for_session(&conn, "s").unwrap();
let msg = feed.iter().find(|e| e.kind == "message").unwrap();
assert_eq!(msg.body, None, "hash-only carries no corpus body (ADR 029)");
assert_eq!(msg.redaction_policy.as_deref(), Some("hash-only"));
}
#[test]
fn message_re_comes_from_proof_overlay() {
let dir = tempfile::tempdir().unwrap();
let conn = seed(dir.path());
let feed = feed_for_session(&conn, "s").unwrap();
let msg = feed.iter().find(|e| e.kind == "message").unwrap();
assert_eq!(
msg.re.as_deref(),
Some("re-parent"),
"re comes from the latest audit_records.re (messages has no re column)"
);
}
#[test]
fn permalink_uses_workspace_session_audit() {
let dir = tempfile::tempdir().unwrap();
let conn = seed(dir.path());
let feed = feed_for_session(&conn, "s").unwrap();
let msg = feed.iter().find(|e| e.kind == "message").unwrap();
assert_eq!(
permalink(msg).as_deref(),
Some("acp://w1/s#a1"),
"permalink = acp://workspace/session#audit_id"
);
}
#[test]
fn verify_chain_intact_then_broken() {
let dir = tempfile::tempdir().unwrap();
let conn = seed(dir.path());
conn.execute(
"INSERT INTO audit_records(audit_id,previous_audit_id,session_id,source_address,target_address,
transport,workspace_id,mid,record_type,command_origin,payload_hash,payload_redaction_policy,
content_size,delivery_status,observed_by,verified_by,timestamp)
VALUES('a2','a1','s','w1-1','w1-2','herdr','w1','m2','ack','agent','sha256:y','full',5,
'observed','claude','helper-tool','2026-05-30T00:00:03Z')",
[],
)
.unwrap();
let intact = verify_chain(&conn, "s").unwrap();
assert!(intact.ok, "a1<-a2 is an intact single-line chain");
assert_eq!(intact.verified_count, 2);
conn.execute(
"INSERT INTO audit_records(audit_id,previous_audit_id,session_id,source_address,target_address,
transport,workspace_id,mid,record_type,command_origin,payload_hash,payload_redaction_policy,
content_size,delivery_status,observed_by,verified_by,timestamp)
VALUES('a3',NULL,'s','w1-2','w1-1','herdr','w1','m3','ack','agent','sha256:z','full',5,
'sent','codex','helper-tool','2026-05-30T00:00:04Z')",
[],
)
.unwrap();
let broken = verify_chain(&conn, "s").unwrap();
assert!(!broken.ok, "two heads is a malformed chain");
assert_eq!(broken.broken_at.as_deref(), Some("a3"));
}
#[test]
fn verify_chain_rejects_cross_session_parent() {
let dir = tempfile::tempdir().unwrap();
let conn = seed(dir.path()); conn.execute_batch(
"INSERT INTO sessions(session_id,project_id,title,phase,mode,workflow_status,created_at,updated_at)
VALUES('s2','p','s2','x','review','working','2026-05-30T00:00:00Z','2026-05-30T00:00:00Z');
INSERT INTO audit_records(audit_id,previous_audit_id,session_id,source_address,target_address,
transport,workspace_id,mid,record_type,command_origin,payload_hash,payload_redaction_policy,
content_size,delivery_status,observed_by,verified_by,timestamp)
VALUES('b1',NULL,'s2','w1-2','w1-1','herdr','w1','mb1','ack','agent','sha256:p','full',5,
'sent','codex','helper-tool','2026-05-30T00:00:05Z');
INSERT INTO audit_records(audit_id,previous_audit_id,session_id,source_address,target_address,
transport,workspace_id,mid,record_type,command_origin,payload_hash,payload_redaction_policy,
content_size,delivery_status,observed_by,verified_by,timestamp)
VALUES('b2','a1','s2','w1-2','w1-1','herdr','w1','mb2','ack','agent','sha256:q','full',5,
'observed','codex','helper-tool','2026-05-30T00:00:06Z');",
)
.unwrap();
let v = verify_chain(&conn, "s2").unwrap();
assert!(
!v.ok,
"b2.previous=a1 lives in session s, not s2 — s2 is not a self-contained chain"
);
assert_eq!(v.broken_at.as_deref(), Some("b2"));
}
}