use crate::{CliError, CliResult};
use rusqlite::Connection;
/// One entry in a session's merged feed.
///
/// Entries are built from four sources (`messages`, `status_events`,
/// `work_events`, `operator_decisions`); fields that a source does not supply
/// are left as `None` / `false`.
#[derive(Debug, Clone)]
// NOTE(review): presumably some fields are not read by every consumer, hence the lint allowance — confirm.
#[allow(dead_code)]
pub struct FeedEvent {
/// `"<prefix>:<source id>"`, where the prefix is `message`, `status`, `work` or `decision`.
pub event_key: String,
/// Name of the table the entry was read from.
pub source_table: String,
/// Row identifier within `source_table`, rendered as a string.
pub source_id: String,
/// Session the entry belongs to.
pub session_id: String,
/// Timestamp string; the feed is ordered by comparing these strings.
pub timestamp: String,
/// `"message"`, `"status"`, the work-event kind, or the decision type.
pub kind: String,
/// `message_type` for messages, `workflow_status` for status events.
pub subtype: Option<String>,
/// Message id (`messages.mid`).
pub mid: Option<String>,
/// Sending agent for messages; actor for work events and decisions.
pub actor_agent_id: Option<String>,
/// Receiving agent (messages only).
pub target_agent_id: Option<String>,
/// Source address taken from the message's latest audit record.
pub source_address: Option<String>,
/// Target address taken from the message's latest audit record.
pub target_address: Option<String>,
/// Transport from the latest audit record, falling back to the message row.
pub transport: Option<String>,
/// Workspace id taken from the message's latest audit record.
pub workspace_id: Option<String>,
/// Mode column of the message or status event.
pub mode: Option<String>,
/// `messages.ref`.
pub r#ref: Option<String>,
/// `re` value of the message's latest audit record.
pub re: Option<String>,
/// `next_action` for status events.
pub summary: Option<String>,
/// Payload excerpt of a message; `None` when the redaction policy is `hash-only`.
pub body: Option<String>,
/// `messages.payload_redaction_policy`.
pub redaction_policy: Option<String>,
/// Latest audit id of the message (`messages.latest_audit_id`).
pub proof_audit_id: Option<String>,
/// `messages.latest_delivery_status`.
pub delivery_status: Option<String>,
/// `messages.latest_verified_by`.
pub verified_by: Option<String>,
/// `messages.payload_hash`.
pub payload_hash: Option<String>,
/// Left `None` by the message, status, work and decision builders.
pub artifact_path: Option<String>,
/// Left `None` by the message, status, work and decision builders.
pub severity: Option<String>,
/// `false` for messages; `true` for status, work and decision entries.
pub is_derived: bool,
/// Typed payload, set for work-event entries.
pub work: Option<crate::work_event::WorkEventPayload>,
/// Decision details, set for standalone operator-decision entries.
pub decision: Option<DecisionView>,
/// `true` when a `custody_vault` row exists for the audit id and the audit
/// record's redaction policy is not `full`.
pub revealable: bool,
}
/// An operator decision joined with its audit record.
///
/// Populated positionally from the columns of `DECISION_SELECT`.
#[derive(Debug, Clone)]
pub struct DecisionView {
/// Audit id shared by the decision row and its audit record.
pub audit_id: String,
/// `operator_decisions.decision_type`; used as the feed entry's `kind`.
pub decision_type: String,
/// Work event the decision targets; `None` for standalone decisions.
pub target_work_event_id: Option<i64>,
/// `operator_decisions.verdict`.
pub verdict: Option<String>,
/// `operator_decisions.resolution`.
pub resolution: Option<String>,
/// `operator_decisions.mode_to`.
pub mode_to: Option<String>,
/// `operator_decisions.target_agent`.
pub target_agent: Option<String>,
/// `operator_decisions.reason`.
pub reason: Option<String>,
/// `operator_decisions.note`.
pub note: Option<String>,
/// `source_agent_id` of the joined audit record.
pub actor_agent_id: Option<String>,
/// Timestamp of the joined audit record.
pub timestamp: String,
/// `operator_decisions.notification_status`.
pub notification_status: String,
/// `true` when a `custody_vault` row exists for the audit id and the audit
/// record's redaction policy is not `full`.
pub revealable: bool,
}
impl FeedEvent {
pub(crate) fn empty() -> Self {
FeedEvent {
event_key: String::new(),
source_table: String::new(),
source_id: String::new(),
session_id: String::new(),
timestamp: String::new(),
kind: String::new(),
subtype: None,
mid: None,
actor_agent_id: None,
target_agent_id: None,
source_address: None,
target_address: None,
transport: None,
workspace_id: None,
mode: None,
r#ref: None,
re: None,
summary: None,
body: None,
redaction_policy: None,
proof_audit_id: None,
delivery_status: None,
verified_by: None,
payload_hash: None,
artifact_path: None,
severity: None,
is_derived: false,
work: None,
decision: None,
revealable: false,
}
}
}
/// Builds the merged feed for `session_id`, newest entry first.
///
/// Messages, status events, work events and standalone operator decisions are
/// loaded and then sorted by descending `timestamp`. Entries sharing a
/// timestamp are ordered by descending `source_id`.
///
/// # Errors
///
/// Propagates any failure from the four underlying loaders.
pub fn feed_for_session(connection: &Connection, session_id: &str) -> CliResult<Vec<FeedEvent>> {
    /// Tie-break key for entries with equal timestamps.
    ///
    /// Ids that parse as integers (status/work event row ids) compare by
    /// numeric value, so `10` ranks above `9`; a plain string comparison would
    /// put `"9"` above `"10"`. Non-numeric ids rank above numeric ones and
    /// compare as strings. Being a plain tuple, the key is a total order,
    /// which `sort_by` requires.
    fn tie_break_key(source_id: &str) -> (bool, i64, &str) {
        match source_id.parse::<i64>() {
            Ok(numeric) => (false, numeric, source_id),
            Err(_) => (true, 0, source_id),
        }
    }

    let mut events = message_events(connection, session_id)?;
    events.extend(status_events(connection, session_id)?);
    events.extend(work_events(connection, session_id)?);
    events.extend(decision_feed_items(connection, session_id)?);
    events.sort_by(|a, b| {
        b.timestamp
            .cmp(&a.timestamp)
            .then_with(|| tie_break_key(&b.source_id).cmp(&tie_break_key(&a.source_id)))
    });
    Ok(events)
}
/// Returns the same entries as [`feed_for_session`], in exactly reversed
/// (oldest-first) order.
///
/// # Errors
///
/// Propagates any failure from [`feed_for_session`].
pub fn feed_oldest_first(connection: &Connection, session_id: &str) -> CliResult<Vec<FeedEvent>> {
    feed_for_session(connection, session_id).map(|mut newest_first| {
        newest_first.reverse();
        newest_first
    })
}
fn message_events(connection: &Connection, session_id: &str) -> CliResult<Vec<FeedEvent>> {
let mut statement = connection
.prepare(
"SELECT m.message_id, m.session_id, m.timestamp, m.message_type, m.mid,
m.source_agent_id, m.target_agent_id, m.mode, m.ref,
m.payload_redaction_policy,
CASE WHEN m.payload_redaction_policy = 'hash-only'
THEN NULL ELSE m.payload_excerpt END,
m.latest_delivery_status, m.latest_verified_by, m.payload_hash,
m.latest_audit_id,
COALESCE(a.transport, m.transport),
a.workspace_id, a.source_address, a.target_address, a.re,
(cv.audit_id IS NOT NULL AND a.payload_redaction_policy <> 'full')
FROM messages AS m
LEFT JOIN audit_records AS a ON a.audit_id = m.latest_audit_id
LEFT JOIN custody_vault AS cv ON cv.audit_id = m.latest_audit_id
WHERE m.session_id = ?1
ORDER BY m.timestamp, m.mid",
)
.map_err(|error| CliError::failure(format!("failed to query feed messages: {error}")))?;
let rows = statement
.query_map([session_id], |row| {
let message_id: String = row.get(0)?;
Ok(FeedEvent {
event_key: format!("message:{message_id}"),
source_table: "messages".to_string(),
source_id: message_id,
session_id: row.get(1)?,
timestamp: row.get(2)?,
kind: "message".to_string(),
subtype: row.get(3)?,
mid: row.get(4)?,
actor_agent_id: row.get(5)?,
target_agent_id: row.get(6)?,
source_address: row.get(17)?,
target_address: row.get(18)?,
transport: row.get(15)?,
workspace_id: row.get(16)?,
mode: row.get(7)?,
r#ref: row.get(8)?,
re: row.get(19)?,
summary: None,
body: row.get(10)?,
redaction_policy: row.get(9)?,
proof_audit_id: row.get(14)?,
delivery_status: row.get(11)?,
verified_by: row.get(12)?,
payload_hash: row.get(13)?,
artifact_path: None,
severity: None,
is_derived: false,
work: None,
decision: None,
revealable: row.get(20)?,
})
})
.map_err(|error| CliError::failure(format!("failed to read feed messages: {error}")))?
.collect::<Result<Vec<_>, _>>()
.map_err(|error| CliError::failure(format!("failed to read feed messages: {error}")))?;
Ok(rows)
}
fn status_events(connection: &Connection, session_id: &str) -> CliResult<Vec<FeedEvent>> {
let mut statement = connection
.prepare(
"SELECT status_event_id, session_id, timestamp, workflow_status, mode, next_action
FROM status_events WHERE session_id = ?1 ORDER BY timestamp, status_event_id",
)
.map_err(|error| CliError::failure(format!("failed to query feed status: {error}")))?;
let rows = statement
.query_map([session_id], |row| {
let id: i64 = row.get(0)?;
Ok(FeedEvent {
event_key: format!("status:{id}"),
source_table: "status_events".to_string(),
source_id: id.to_string(),
session_id: row.get(1)?,
timestamp: row.get(2)?,
kind: "status".to_string(),
subtype: row.get(3)?,
mid: None,
actor_agent_id: None,
target_agent_id: None,
source_address: None,
target_address: None,
transport: None,
workspace_id: None,
mode: row.get(4)?,
r#ref: None,
re: None,
summary: row.get(5)?,
body: None,
redaction_policy: None,
proof_audit_id: None,
delivery_status: None,
verified_by: None,
payload_hash: None,
artifact_path: None,
severity: None,
is_derived: true,
work: None,
decision: None,
revealable: false,
})
})
.map_err(|error| CliError::failure(format!("failed to read feed status: {error}")))?
.collect::<Result<Vec<_>, _>>()
.map_err(|error| CliError::failure(format!("failed to read feed status: {error}")))?;
Ok(rows)
}
/// Usage totals for a single agent within one session.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UsageAgentRow {
/// Agent name taken from the usage payload.
pub agent: String,
/// Saturating sum of the agent's reported tokens.
pub tokens: u64,
/// Saturating sum of the costs the agent reported; `None` when none of its
/// usage events carried a cost.
pub cost_cents: Option<u64>,
}
/// Session-wide usage totals plus the per-agent breakdown.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UsageAggregate {
/// Saturating sum of tokens across all usage events.
pub total_tokens: u64,
/// Saturating sum of all reported costs; `None` (never `Some(0)`) when no
/// usage event carried a cost.
pub total_cost_cents: Option<u64>,
/// One row per agent, ordered by agent name.
pub per_agent: Vec<UsageAgentRow>,
}
pub(crate) fn usage_aggregate(
connection: &Connection,
session_id: &str,
) -> CliResult<UsageAggregate> {
let mut statement = connection
.prepare(
"SELECT work_event_id, payload FROM work_events WHERE session_id = ?1 AND kind = 'usage'",
)
.map_err(|e| CliError::failure(format!("failed to prepare usage_aggregate: {e}")))?;
let rows = statement
.query_map([session_id], |row| {
Ok((row.get::<_, i64>(0)?, row.get::<_, String>(1)?))
})
.map_err(|e| CliError::failure(format!("failed to query usage_aggregate: {e}")))?;
let mut total_tokens: u64 = 0;
let mut total_cost_cents: Option<u64> = None;
let mut by_agent: std::collections::BTreeMap<String, (u64, Option<u64>)> =
std::collections::BTreeMap::new();
for row in rows {
let (id, payload) =
row.map_err(|e| CliError::failure(format!("failed to read usage row: {e}")))?;
match crate::work_event::WorkEventPayload::from_storage(&payload)? {
crate::work_event::WorkEventPayload::Usage {
agent,
tokens,
cost_cents,
} => {
total_tokens = total_tokens.saturating_add(tokens);
if let Some(c) = cost_cents {
total_cost_cents = Some(total_cost_cents.unwrap_or(0).saturating_add(c));
}
let entry = by_agent.entry(agent).or_insert((0, None));
entry.0 = entry.0.saturating_add(tokens);
if let Some(c) = cost_cents {
entry.1 = Some(entry.1.unwrap_or(0).saturating_add(c));
}
}
other => {
return Err(CliError::failure(format!(
"usage_aggregate: work_event {id} kind='usage' has non-Usage payload (payload kind {:?})",
other.kind()
)));
}
}
}
let per_agent = by_agent
.into_iter()
.map(|(agent, (tokens, cost_cents))| UsageAgentRow {
agent,
tokens,
cost_cents,
})
.collect();
Ok(UsageAggregate {
total_tokens,
total_cost_cents,
per_agent,
})
}
fn work_events(connection: &Connection, session_id: &str) -> CliResult<Vec<FeedEvent>> {
let mut statement = connection
.prepare(
"SELECT work_event_id, actor_agent_id, kind, timestamp, payload
FROM work_events WHERE session_id = ?1 ORDER BY timestamp, work_event_id",
)
.map_err(|error| CliError::failure(format!("failed to query work_events: {error}")))?;
let raw = statement
.query_map([session_id], |row| {
let id: i64 = row.get(0)?;
let actor: String = row.get(1)?;
let kind: String = row.get(2)?;
let timestamp: String = row.get(3)?;
let payload: String = row.get(4)?;
Ok((id, actor, kind, timestamp, payload))
})
.map_err(|error| CliError::failure(format!("failed to read work_events: {error}")))?
.collect::<Result<Vec<_>, _>>()
.map_err(|error| CliError::failure(format!("failed to read work_events: {error}")))?;
let mut events = Vec::with_capacity(raw.len());
for (id, actor, kind, timestamp, payload) in raw {
let work = crate::work_event::WorkEventPayload::from_storage(&payload)?;
if kind != work.kind() {
return Err(CliError::failure(format!(
"work_event {id} kind/payload mismatch: row kind {kind:?} != payload kind {:?}",
work.kind()
)));
}
events.push(FeedEvent {
event_key: format!("work:{id}"),
source_table: "work_events".to_string(),
source_id: id.to_string(),
session_id: session_id.to_string(),
timestamp,
kind,
actor_agent_id: Some(actor),
is_derived: true,
work: Some(work),
..FeedEvent::empty()
});
}
Ok(events)
}
/// Shared query prefix for operator decisions of one session (`?1`), joined
/// with their audit record and, optionally, a `custody_vault` row.
///
/// Callers append a further `AND …` filter and an `ORDER BY`. The column order
/// must stay in sync with the positional reads in `decision_view_from_row`.
const DECISION_SELECT: &str = "SELECT od.audit_id, od.decision_type, od.target_work_event_id,
od.verdict, od.resolution, od.mode_to, od.target_agent, od.reason, od.note,
a.source_agent_id, a.timestamp, od.notification_status,
(cv.audit_id IS NOT NULL AND a.payload_redaction_policy <> 'full')
FROM operator_decisions AS od
JOIN audit_records AS a ON a.audit_id = od.audit_id
LEFT JOIN custody_vault AS cv ON cv.audit_id = od.audit_id
WHERE od.session_id = ?1";
/// Maps one `DECISION_SELECT` result row to a [`DecisionView`].
///
/// Columns are read positionally, in SELECT-list order (0 through 12).
///
/// # Errors
///
/// Returns the `rusqlite` error of the first column that cannot be read.
fn decision_view_from_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<DecisionView> {
    let audit_id = row.get(0)?;
    let decision_type = row.get(1)?;
    let target_work_event_id = row.get(2)?;
    let verdict = row.get(3)?;
    let resolution = row.get(4)?;
    let mode_to = row.get(5)?;
    let target_agent = row.get(6)?;
    let reason = row.get(7)?;
    let note = row.get(8)?;
    // Columns 9 and 10 come from the joined audit record.
    let actor_agent_id = row.get(9)?;
    let timestamp = row.get(10)?;
    let notification_status = row.get(11)?;
    let revealable = row.get(12)?;

    Ok(DecisionView {
        audit_id,
        decision_type,
        target_work_event_id,
        verdict,
        resolution,
        mode_to,
        target_agent,
        reason,
        note,
        actor_agent_id,
        timestamp,
        notification_status,
        revealable,
    })
}
pub fn decision_overlay_for_session(
connection: &Connection,
session_id: &str,
) -> CliResult<std::collections::BTreeMap<i64, DecisionView>> {
let mut statement = connection
.prepare(&format!(
"{DECISION_SELECT} AND od.target_work_event_id IS NOT NULL ORDER BY a.timestamp, od.audit_id"
))
.map_err(|error| {
CliError::failure(format!("failed to query decision overlay: {error}"))
})?;
let views = statement
.query_map([session_id], decision_view_from_row)
.map_err(|error| CliError::failure(format!("failed to read decision overlay: {error}")))?
.collect::<Result<Vec<_>, _>>()
.map_err(|error| CliError::failure(format!("failed to read decision overlay: {error}")))?;
let mut overlay = std::collections::BTreeMap::new();
for view in views {
if let Some(id) = view.target_work_event_id {
overlay.insert(id, view);
}
}
Ok(overlay)
}
fn decision_feed_items(connection: &Connection, session_id: &str) -> CliResult<Vec<FeedEvent>> {
let mut statement = connection
.prepare(&format!(
"{DECISION_SELECT} AND od.target_work_event_id IS NULL ORDER BY a.timestamp, od.audit_id"
))
.map_err(|error| {
CliError::failure(format!("failed to query decision feed items: {error}"))
})?;
let views = statement
.query_map([session_id], decision_view_from_row)
.map_err(|error| CliError::failure(format!("failed to read decision feed items: {error}")))?
.collect::<Result<Vec<_>, _>>()
.map_err(|error| {
CliError::failure(format!("failed to read decision feed items: {error}"))
})?;
let events = views
.into_iter()
.map(|view| FeedEvent {
event_key: format!("decision:{}", view.audit_id),
source_table: "operator_decisions".to_string(),
source_id: view.audit_id.clone(),
session_id: session_id.to_string(),
timestamp: view.timestamp.clone(),
kind: view.decision_type.clone(),
actor_agent_id: view.actor_agent_id.clone(),
is_derived: true,
revealable: view.revealable,
decision: Some(view),
..FeedEvent::empty()
})
.collect();
Ok(events)
}
/// Formats the `acp://<workspace>/<session>#<audit_id>` permalink of an event.
///
/// Returns `None` unless the event has both a workspace id and a proof audit id.
pub fn permalink(event: &FeedEvent) -> Option<String> {
    event
        .workspace_id
        .as_deref()
        .zip(event.proof_audit_id.as_deref())
        .map(|(workspace, audit_id)| format!("acp://{workspace}/{}#{audit_id}", event.session_id))
}
/// Outcome of checking a session's audit chain.
#[derive(Debug)]
pub struct ChainVerification {
/// `true` when the chain is intact (an empty session counts as intact).
pub ok: bool,
/// Audit id at which the chain was found broken; `None` when `ok` is `true`.
pub broken_at: Option<String>,
/// Number of audit records read for the session. It is set to the full row
/// count even when the chain is reported broken.
pub verified_count: usize,
}
pub fn verify_chain(connection: &Connection, session_id: &str) -> CliResult<ChainVerification> {
let mut statement = connection
.prepare(
"SELECT audit_id, previous_audit_id FROM audit_records
WHERE session_id = ?1 ORDER BY timestamp, audit_id",
)
.map_err(|error| CliError::failure(format!("failed to query chain: {error}")))?;
let rows: Vec<(String, Option<String>)> = statement
.query_map([session_id], |row| Ok((row.get(0)?, row.get(1)?)))
.map_err(|error| CliError::failure(format!("failed to read chain: {error}")))?
.collect::<Result<Vec<_>, _>>()
.map_err(|error| CliError::failure(format!("failed to read chain: {error}")))?;
let count = rows.len();
let broken = |audit_id: &str| ChainVerification {
ok: false,
broken_at: Some(audit_id.to_string()),
verified_count: count,
};
if rows.is_empty() {
return Ok(ChainVerification {
ok: true,
broken_at: None,
verified_count: 0,
});
}
let ids: std::collections::HashSet<&str> = rows.iter().map(|(id, _)| id.as_str()).collect();
let mut head: Option<&str> = None;
let mut child_of: std::collections::HashMap<&str, &str> = std::collections::HashMap::new();
for (audit_id, previous) in &rows {
match previous.as_deref() {
None => {
if head.is_some() {
return Ok(broken(audit_id)); }
head = Some(audit_id);
}
Some(previous) => {
if !ids.contains(previous) {
return Ok(broken(audit_id)); }
if child_of.insert(previous, audit_id).is_some() {
return Ok(broken(audit_id)); }
}
}
}
let Some(head) = head else {
return Ok(broken(&rows[0].0)); };
let mut visited: std::collections::HashSet<&str> = std::collections::HashSet::new();
let mut current = Some(head);
while let Some(node) = current {
if !visited.insert(node) {
break; }
current = child_of.get(node).copied();
}
if visited.len() != count {
let orphan = rows
.iter()
.map(|(id, _)| id.as_str())
.find(|id| !visited.contains(id))
.unwrap_or(rows[0].0.as_str());
return Ok(broken(orphan)); }
Ok(ChainVerification {
ok: true,
broken_at: None,
verified_count: count,
})
}
#[cfg(test)]
mod tests {
use super::*;
use crate::db::open_database;
/// Creates `zynk.db` inside `dir` and seeds project `p`, session `s`, one
/// audit record (`a1`), one message (`m1`) and one status event.
fn seed(dir: &std::path::Path) -> Connection {
    let db_path = dir.join("zynk.db");
    open_database(&db_path).unwrap();
    let connection = Connection::open(&db_path).unwrap();
    let fixture_sql = "INSERT INTO projects(project_id,name,root_path,created_at,updated_at)
VALUES('p','p','/p','2026-05-30T00:00:00Z','2026-05-30T00:00:00Z');
INSERT INTO sessions(session_id,project_id,title,phase,mode,workflow_status,created_at,updated_at)
VALUES('s','p','s','x','review','working','2026-05-30T00:00:00Z','2026-05-30T00:00:00Z');
INSERT INTO audit_records(audit_id,previous_audit_id,session_id,source_address,target_address,
transport,workspace_id,mid,record_type,command_origin,payload_hash,payload_redaction_policy,
content_size,delivery_status,observed_by,verified_by,re,timestamp)
VALUES('a1',NULL,'s','w1-2','w1-1','herdr','w1','m1','ack','agent','sha256:x','full',5,
'sent','codex','helper-tool','re-parent','2026-05-30T00:00:02Z');
INSERT INTO messages(message_id,session_id,mid,message_type,transport,payload_redaction_policy,
payload_excerpt,payload_hash,latest_delivery_status,latest_verified_by,latest_audit_id,timestamp)
VALUES('s:m1','s','m1','ack','herdr','full','the body','sha256:x','sent','helper-tool','a1',
'2026-05-30T00:00:02Z');
INSERT INTO status_events(session_id,timestamp,phase,mode,workflow_status,completed_since_last_update,
in_progress,next_action,blockers,asks_for_zevs,risk_or_residual_uncertainty,expected_wait)
VALUES('s','2026-05-30T00:00:01Z','x','review','working','c','p','n','none','none','none','unk');";
    connection.execute_batch(fixture_sql).unwrap();
    connection
}
/// Opens a fresh database seeded with project `p`, session `s1`, a two-record
/// audit chain (`a1` <- `a2`) and the two matching messages (`m1`, `m2`).
fn fixture_with_two_messages() -> Connection {
    // `keep()` persists the temp directory so the database file stays on disk
    // for as long as the returned connection is used; it is never cleaned up.
    let root = tempfile::tempdir().unwrap().keep();
    let db_path = root.join("zynk.db");
    open_database(&db_path).unwrap();
    let connection = Connection::open(&db_path).unwrap();
    let fixture_sql = "INSERT INTO projects(project_id,name,root_path,created_at,updated_at)
VALUES('p','p','/p','2026-05-30T00:00:00Z','2026-05-30T00:00:00Z');
INSERT INTO sessions(session_id,project_id,title,phase,mode,workflow_status,created_at,updated_at)
VALUES('s1','p','s1','x','review','working','2026-05-30T00:00:00Z','2026-05-30T00:00:00Z');
INSERT INTO audit_records(audit_id,previous_audit_id,session_id,source_address,target_address,
transport,workspace_id,mid,record_type,command_origin,payload_hash,payload_redaction_policy,
content_size,delivery_status,observed_by,verified_by,re,timestamp)
VALUES('a1',NULL,'s1','w1-2','w1-1','herdr','w1','m1','ack','agent','sha256:x','full',5,
'sent','codex','helper-tool','re-parent','2026-05-30T00:00:01Z');
INSERT INTO audit_records(audit_id,previous_audit_id,session_id,source_address,target_address,
transport,workspace_id,mid,record_type,command_origin,payload_hash,payload_redaction_policy,
content_size,delivery_status,observed_by,verified_by,re,timestamp)
VALUES('a2','a1','s1','w1-2','w1-1','herdr','w1','m2','ack','agent','sha256:y','full',5,
'sent','codex','helper-tool','re-parent','2026-05-30T00:00:02Z');
INSERT INTO messages(message_id,session_id,mid,message_type,transport,payload_redaction_policy,
payload_excerpt,payload_hash,latest_delivery_status,latest_verified_by,latest_audit_id,timestamp)
VALUES('s1:m1','s1','m1','ack','herdr','full','first body','sha256:x','sent','helper-tool','a1',
'2026-05-30T00:00:01Z');
INSERT INTO messages(message_id,session_id,mid,message_type,transport,payload_redaction_policy,
payload_excerpt,payload_hash,latest_delivery_status,latest_verified_by,latest_audit_id,timestamp)
VALUES('s1:m2','s1','m2','ack','herdr','full','second body','sha256:y','sent','helper-tool','a2',
'2026-05-30T00:00:02Z');";
    connection.execute_batch(fixture_sql).unwrap();
    connection
}
/// The oldest-first feed has the same length as the newest-first feed, with
/// its end timestamps swapped.
#[test]
fn feed_oldest_first_reverses_newest_first() {
    let db = fixture_with_two_messages();
    let descending = feed_for_session(&db, "s1").unwrap();
    let ascending = feed_oldest_first(&db, "s1").unwrap();

    assert_eq!(ascending.len(), descending.len());

    let asc_first = &ascending.first().unwrap().timestamp;
    let desc_last = &descending.last().unwrap().timestamp;
    assert_eq!(asc_first, desc_last);

    let asc_last = &ascending.last().unwrap().timestamp;
    let desc_first = &descending.first().unwrap().timestamp;
    assert_eq!(asc_last, desc_first);
}
/// The seeded feed lists the message before the older status event, and the
/// message carries its body plus the fields overlaid from its audit record.
#[test]
fn feed_interleaves_newest_first_with_proof_overlay_and_body() {
    let tmp = tempfile::tempdir().unwrap();
    let connection = seed(tmp.path());
    let feed = feed_for_session(&connection, "s").unwrap();

    assert_eq!(feed.len(), 2, "one status + one message");
    assert_eq!(feed[0].kind, "message");
    assert_eq!(feed[1].kind, "status");

    let message = &feed[0];
    assert_eq!(
        message.body.as_deref(),
        Some("the body"),
        "full body carried"
    );
    let overlay_fields = [
        (message.mid.as_deref(), "m1"),
        (message.proof_audit_id.as_deref(), "a1"),
        (message.delivery_status.as_deref(), "sent"),
        (message.verified_by.as_deref(), "helper-tool"),
        (message.transport.as_deref(), "herdr"),
        (message.workspace_id.as_deref(), "w1"),
        (message.source_address.as_deref(), "w1-2"),
        (message.target_address.as_deref(), "w1-1"),
    ];
    for (actual, expected) in overlay_fields {
        assert_eq!(actual, Some(expected));
    }
}
/// A message stored under the `hash-only` policy surfaces without a body.
#[test]
fn hash_only_message_has_no_body() {
    let tmp = tempfile::tempdir().unwrap();
    let connection = seed(tmp.path());
    connection
        .execute(
            "UPDATE messages SET payload_redaction_policy='hash-only', payload_excerpt=NULL WHERE mid='m1'",
            [],
        )
        .unwrap();

    let message = feed_for_session(&connection, "s")
        .unwrap()
        .into_iter()
        .find(|event| event.kind == "message")
        .unwrap();

    assert_eq!(
        message.body, None,
        "hash-only carries no corpus body (ADR 029)"
    );
    assert_eq!(message.redaction_policy.as_deref(), Some("hash-only"));
}
/// A message's `re` is taken from its latest audit record.
#[test]
fn message_re_comes_from_proof_overlay() {
    let tmp = tempfile::tempdir().unwrap();
    let connection = seed(tmp.path());

    let message = feed_for_session(&connection, "s")
        .unwrap()
        .into_iter()
        .find(|event| event.kind == "message")
        .unwrap();

    assert_eq!(
        message.re.as_deref(),
        Some("re-parent"),
        "re comes from the latest audit_records.re (messages has no re column)"
    );
}
/// The permalink combines workspace, session and audit id.
#[test]
fn permalink_uses_workspace_session_audit() {
    let tmp = tempfile::tempdir().unwrap();
    let connection = seed(tmp.path());

    let message = feed_for_session(&connection, "s")
        .unwrap()
        .into_iter()
        .find(|event| event.kind == "message")
        .unwrap();
    let link = permalink(&message);

    assert_eq!(
        link.as_deref(),
        Some("acp://w1/s#a1"),
        "permalink = acp://workspace/session#audit_id"
    );
}
/// A linear chain verifies; adding a second head breaks it at the new record.
#[test]
fn verify_chain_intact_then_broken() {
    let tmp = tempfile::tempdir().unwrap();
    let connection = seed(tmp.path());

    // a2 links back to the seeded a1.
    let linked_child = "INSERT INTO audit_records(audit_id,previous_audit_id,session_id,source_address,target_address,
transport,workspace_id,mid,record_type,command_origin,payload_hash,payload_redaction_policy,
content_size,delivery_status,observed_by,verified_by,timestamp)
VALUES('a2','a1','s','w1-1','w1-2','herdr','w1','m2','ack','agent','sha256:y','full',5,
'observed','claude','helper-tool','2026-05-30T00:00:03Z')";
    connection.execute(linked_child, []).unwrap();

    let first_check = verify_chain(&connection, "s").unwrap();
    assert!(first_check.ok, "a1<-a2 is an intact single-line chain");
    assert_eq!(first_check.verified_count, 2);

    // a3 has no predecessor, which gives the session a second head.
    let second_head = "INSERT INTO audit_records(audit_id,previous_audit_id,session_id,source_address,target_address,
transport,workspace_id,mid,record_type,command_origin,payload_hash,payload_redaction_policy,
content_size,delivery_status,observed_by,verified_by,timestamp)
VALUES('a3',NULL,'s','w1-2','w1-1','herdr','w1','m3','ack','agent','sha256:z','full',5,
'sent','codex','helper-tool','2026-05-30T00:00:04Z')";
    connection.execute(second_head, []).unwrap();

    let second_check = verify_chain(&connection, "s").unwrap();
    assert!(!second_check.ok, "two heads is a malformed chain");
    assert_eq!(second_check.broken_at.as_deref(), Some("a3"));
}
/// A projected tool work event shows up in the feed with its typed payload.
#[test]
fn feed_includes_typed_work_events() {
    let connection = fixture_with_two_messages();
    let tool_payload = crate::work_event::WorkEventPayload::Tool {
        name: "run_tests".into(),
        arg: "cargo test".into(),
        output: "12 passing".into(),
        ok: true,
    };
    crate::db::project_work_event(
        &connection,
        "s1",
        "codex",
        "2026-05-29T01:30:00Z",
        0,
        &tool_payload,
    )
    .unwrap();

    let feed = feed_for_session(&connection, "s1").unwrap();
    let tool_event = feed
        .iter()
        .find(|event| event.kind == "tool")
        .expect("tool event in feed");
    let typed = tool_event.work.as_ref().expect("typed work payload");

    match typed {
        crate::work_event::WorkEventPayload::Tool { name, ok, .. } => {
            assert_eq!(name, "run_tests");
            assert!(ok);
        }
        other => panic!("wrong variant: {other:?}"),
    }
}
/// A work-event row whose `kind` column disagrees with its payload makes the
/// whole feed fail instead of rendering the row.
#[test]
fn feed_fails_loud_on_work_event_kind_mismatch() {
    let connection = fixture_with_two_messages();

    // Store a Diff payload under the row kind 'think'.
    let diff_payload = crate::work_event::WorkEventPayload::Diff {
        file: "src/x.rs".into(),
        added: 3,
        removed: 1,
        hunks: vec![crate::work_event::DiffHunk {
            op: "add".into(),
            text: "fn x() {}".into(),
        }],
    };
    let stored = diff_payload.to_storage().unwrap();
    let insert_sql = "INSERT INTO work_events
(session_id, actor_agent_id, kind, timestamp, payload, content_hash, created_at)
VALUES ('s1', 'codex', 'think', '2026-05-29T01:30:00Z', ?1, 'sha256:bad',
'2026-05-29T01:30:00Z')";
    connection.execute(insert_sql, [&stored]).unwrap();

    let error = feed_for_session(&connection, "s1")
        .expect_err("a kind/payload mismatch must fail loud, not render");
    let names_mismatch = error.message.contains("kind") && error.message.contains("mismatch");
    assert!(
        names_mismatch,
        "error must name the kind mismatch: {}",
        error.message
    );
}
/// A record whose predecessor lives in another session breaks the chain.
#[test]
fn verify_chain_rejects_cross_session_parent() {
    let tmp = tempfile::tempdir().unwrap();
    let connection = seed(tmp.path());

    // Session s2: b1 is a head, b2 points at a1, which belongs to session s.
    let second_session_sql = "INSERT INTO sessions(session_id,project_id,title,phase,mode,workflow_status,created_at,updated_at)
VALUES('s2','p','s2','x','review','working','2026-05-30T00:00:00Z','2026-05-30T00:00:00Z');
INSERT INTO audit_records(audit_id,previous_audit_id,session_id,source_address,target_address,
transport,workspace_id,mid,record_type,command_origin,payload_hash,payload_redaction_policy,
content_size,delivery_status,observed_by,verified_by,timestamp)
VALUES('b1',NULL,'s2','w1-2','w1-1','herdr','w1','mb1','ack','agent','sha256:p','full',5,
'sent','codex','helper-tool','2026-05-30T00:00:05Z');
INSERT INTO audit_records(audit_id,previous_audit_id,session_id,source_address,target_address,
transport,workspace_id,mid,record_type,command_origin,payload_hash,payload_redaction_policy,
content_size,delivery_status,observed_by,verified_by,timestamp)
VALUES('b2','a1','s2','w1-2','w1-1','herdr','w1','mb2','ack','agent','sha256:q','full',5,
'observed','codex','helper-tool','2026-05-30T00:00:06Z');";
    connection.execute_batch(second_session_sql).unwrap();

    let verification = verify_chain(&connection, "s2").unwrap();
    assert!(
        !verification.ok,
        "b2.previous=a1 lives in session s, not s2 — s2 is not a self-contained chain"
    );
    assert_eq!(verification.broken_at.as_deref(), Some("b2"));
}
/// Opens (creating if needed) the database at `db` and inserts project `p`
/// plus a session named `session_id`.
fn seed_project_and_session(db: &std::path::Path, session_id: &str) {
    const SEEDED_AT: &str = "2026-05-31T00:00:00Z";
    let connection = open_database(db).unwrap();

    let insert_project = "INSERT INTO projects (project_id, name, root_path, created_at, updated_at)
VALUES ('p', 'p', '/tmp', ?1, ?1)";
    connection
        .execute(insert_project, rusqlite::params![SEEDED_AT])
        .unwrap();

    let insert_session = "INSERT INTO sessions (session_id, project_id, title, phase, mode, workflow_status, created_at, updated_at)
VALUES (?1, 'p', ?1, 'live', 'review', 'working', ?2, ?2)";
    connection
        .execute(insert_session, rusqlite::params![session_id, SEEDED_AT])
        .unwrap();
}
/// Builds a hash-only, operator-originated audit record for a decision.
///
/// `audit_id` is reused as the record's `mid`; the record has no predecessor.
fn decision_audit(
    session_id: &str,
    audit_id: &str,
    record_type: &str,
    timestamp: &str,
) -> crate::db::ImportedAuditRecord {
    let operator = || "operator".to_owned();
    crate::db::ImportedAuditRecord {
        audit_id: audit_id.to_owned(),
        previous_audit_id: None,
        timestamp: timestamp.to_owned(),
        source_agent_id: Some(operator()),
        source_address: operator(),
        target_agent_id: None,
        target_address: "none".to_owned(),
        transport: "none".to_owned(),
        workspace_id: "w".to_owned(),
        session_id: session_id.to_owned(),
        mid: audit_id.to_owned(),
        record_type: record_type.to_owned(),
        command_origin: operator(),
        mode: None,
        r#ref: None,
        re: None,
        payload_hash: "sha256:x".to_owned(),
        payload_redaction_policy: "hash-only".to_owned(),
        content_size: 0,
        delivery_status: "observed".to_owned(),
        observed_by: operator(),
        verified_by: operator(),
        due: None,
        payload_excerpt: None,
    }
}
/// Seeds a session with one gate work event and a gate decision
/// (`dec-gate-1`) that targets work event 1 with the given `verdict`.
fn seed_session_gate_and_decision(db: &std::path::Path, session_id: &str, verdict: &str) {
    seed_project_and_session(db, session_id);

    // Scope the connection so it is closed before the decision is projected.
    {
        let connection = open_database(db).unwrap();
        let gate = crate::work_event::WorkEventPayload::Gate {
            title: "merge?".into(),
            summary: "approve the merge".into(),
            proposer: "claude".into(),
            actions: vec!["merge".into()],
        };
        crate::db::project_work_event(
            &connection,
            session_id,
            "claude",
            "2026-05-31T00:00:00Z",
            0,
            &gate,
        )
        .unwrap();
    }

    let audit_record = decision_audit(
        session_id,
        "dec-gate-1",
        "gate-decision",
        "2026-05-31T01:00:00Z",
    );
    let gate_decision = crate::decision::Decision::Gate {
        target_work_event_id: 1,
        verdict: verdict.to_string(),
        note: None,
    };
    let outputs_dir = std::path::Path::new("outputs");
    crate::db::project_decision(db, outputs_dir, &audit_record, &gate_decision).unwrap();
}
/// Seeds a session and projects a standalone mode-switch decision
/// (`dec-mode-1`) that switches to `mode_to`.
fn seed_session_and_mode_decision(db: &std::path::Path, session_id: &str, mode_to: &str) {
    seed_project_and_session(db, session_id);

    let audit_record = decision_audit(
        session_id,
        "dec-mode-1",
        "mode-switch",
        "2026-05-31T01:00:00Z",
    );
    let mode_decision = crate::decision::Decision::Mode {
        mode_to: mode_to.to_string(),
    };
    let outputs_dir = std::path::Path::new("outputs");
    crate::db::project_decision(db, outputs_dir, &audit_record, &mode_decision).unwrap();
}
/// A gate decision is exposed in the overlay under its target work event id.
#[test]
fn gate_decision_overlays_work_event_by_target_id() {
    let tmp = tempfile::tempdir().unwrap();
    let db_path = tmp.path().join("zynk.db");
    seed_session_gate_and_decision(&db_path, "s1", "approve");

    let connection = Connection::open(&db_path).unwrap();
    let overlay = decision_overlay_for_session(&connection, "s1").unwrap();
    let decision = overlay
        .get(&1)
        .expect("gate work_event 1 has a decision overlay");

    assert_eq!(decision.decision_type, "gate-decision");
    assert_eq!(decision.verdict.as_deref(), Some("approve"));
    assert_eq!(decision.target_work_event_id, Some(1));
    assert_eq!(
        decision.actor_agent_id.as_deref(),
        Some("operator"),
        "the actor comes from the joined audit row (source_agent=operator)"
    );
}
/// A decision without a target work event appears as its own feed entry.
#[test]
fn mode_decision_is_a_standalone_feed_item() {
    let tmp = tempfile::tempdir().unwrap();
    let db_path = tmp.path().join("zynk.db");
    seed_session_and_mode_decision(&db_path, "s1", "review");

    let connection = Connection::open(&db_path).unwrap();
    let feed = feed_oldest_first(&connection, "s1").unwrap();

    let has_mode_switch = feed.iter().any(|event| {
        let is_mode_switch = event
            .decision
            .as_ref()
            .map_or(false, |decision| decision.decision_type == "mode-switch");
        event.source_table == "operator_decisions" && is_mode_switch
    });
    assert!(
        has_mode_switch,
        "a mode-switch decision must appear as a standalone operator_decisions feed item"
    );
}
/// Projects one usage work event for `agent` (also the actor) at `ts`.
fn seed_usage(
    conn: &Connection,
    session_id: &str,
    ts: &str,
    agent: &str,
    tokens: u64,
    cost: Option<u64>,
) {
    let usage = crate::work_event::WorkEventPayload::Usage {
        agent: agent.to_string(),
        tokens,
        cost_cents: cost,
    };
    crate::db::project_work_event(conn, session_id, agent, ts, 0, &usage).unwrap();
}
/// Tokens are summed over all events; costs only over the events that report one.
#[test]
fn usage_aggregate_sums_tokens_and_present_cost() {
    let connection = fixture_with_two_messages();
    seed_usage(&connection, "s1", "2026-05-30T00:01:00Z", "claude", 100, Some(5));
    seed_usage(&connection, "s1", "2026-05-30T00:02:00Z", "codex", 200, Some(8));
    seed_usage(&connection, "s1", "2026-05-30T00:03:00Z", "claude", 50, None);

    let aggregate = usage_aggregate(&connection, "s1").unwrap();

    assert_eq!(aggregate.total_tokens, 350);
    assert_eq!(aggregate.total_cost_cents, Some(13));

    let expected_rows = vec![
        UsageAgentRow {
            agent: "claude".into(),
            tokens: 150,
            cost_cents: Some(5),
        },
        UsageAgentRow {
            agent: "codex".into(),
            tokens: 200,
            cost_cents: Some(8),
        },
    ];
    assert_eq!(
        aggregate.per_agent, expected_rows,
        "per_agent is grouped + summed and ordered by agent"
    );
}
/// When no usage event reports a cost, the total cost is `None`, not `Some(0)`.
#[test]
fn usage_aggregate_no_cost_is_none() {
    let connection = fixture_with_two_messages();
    let costless_events = [
        ("2026-05-30T00:01:00Z", "claude", 100),
        ("2026-05-30T00:02:00Z", "codex", 200),
        ("2026-05-30T00:03:00Z", "claude", 50),
    ];
    for (timestamp, agent, tokens) in costless_events {
        seed_usage(&connection, "s1", timestamp, agent, tokens, None);
    }

    let aggregate = usage_aggregate(&connection, "s1").unwrap();

    assert_eq!(aggregate.total_tokens, 350);
    assert_eq!(
        aggregate.total_cost_cents, None,
        "no PRESENT cost_cents → total_cost_cents is None, never Some(0)"
    );
}
/// A session without usage events aggregates to zero tokens, no cost, no rows.
#[test]
fn usage_aggregate_empty_session() {
    let tmp = tempfile::tempdir().unwrap();
    let db_path = tmp.path().join("zynk.db");
    seed_project_and_session(&db_path, "s1");
    let connection = Connection::open(&db_path).unwrap();

    let aggregate = usage_aggregate(&connection, "s1").unwrap();

    assert_eq!(aggregate.total_tokens, 0);
    assert_eq!(aggregate.total_cost_cents, None);
    assert_eq!(aggregate.per_agent, Vec::<UsageAgentRow>::new());
}
/// A `kind = 'usage'` row holding a non-Usage payload is an error, not a skip.
#[test]
fn usage_aggregate_fails_loud_on_kind_payload_mismatch() {
    let connection = fixture_with_two_messages();

    // Store a System payload under the row kind 'usage'.
    let system_payload = crate::work_event::WorkEventPayload::System {
        text: "not a usage payload".into(),
    };
    let stored = system_payload.to_storage().unwrap();
    let insert_sql = "INSERT INTO work_events
(session_id, actor_agent_id, kind, timestamp, payload, content_hash, created_at)
VALUES ('s1', 'codex', 'usage', '2026-05-30T00:05:00Z', ?1, 'sha256:bad',
'2026-05-30T00:05:00Z')";
    connection.execute(insert_sql, [&stored]).unwrap();

    let error = usage_aggregate(&connection, "s1").expect_err(
        "a kind='usage' row with a non-Usage payload must fail loud, not be skipped",
    );
    let names_mismatch =
        error.message.contains("usage_aggregate") && error.message.contains("non-Usage");
    assert!(
        names_mismatch,
        "error must name the usage_aggregate kind/payload mismatch: {}",
        error.message
    );
}
}