use crate::db_dashboard::escape_html;
use crate::read_model::{DecisionView, FeedEvent};
use std::collections::BTreeMap;
use std::io::Write;
#[derive(Debug, PartialEq, Eq)]
pub enum FeedDelta {
Append(usize),
Reset,
}
pub fn feed_key(event: &FeedEvent) -> String {
format!(
"{}|{}|{}|{}|{}|{}",
event.event_key,
event.delivery_status.as_deref().unwrap_or(""),
event.verified_by.as_deref().unwrap_or(""),
event.proof_audit_id.as_deref().unwrap_or(""),
event.re.as_deref().unwrap_or(""),
event.body.as_deref().unwrap_or(""),
)
}
pub fn feed_diff_key(event: &FeedEvent, overlay_map: &BTreeMap<i64, DecisionView>) -> String {
let mut key = feed_key(event);
if event.source_table == "work_events" {
if let Ok(work_event_id) = event.source_id.parse::<i64>() {
if let Some(decision) = overlay_map.get(&work_event_id) {
key.push_str(&format!(
"|ov:{}|{}|{}|{}",
decision.audit_id,
decision.verdict.as_deref().unwrap_or(""),
decision.resolution.as_deref().unwrap_or(""),
decision.notification_status,
));
}
}
}
if let Some(decision) = &event.decision {
key.push_str(&format!("|dn:{}", decision.notification_status));
}
key
}
pub fn diff_feed(old: &[String], new: &[String]) -> FeedDelta {
if new.len() >= old.len() && new.starts_with(old) {
FeedDelta::Append(old.len())
} else {
FeedDelta::Reset
}
}
pub fn sse_event(buf: &mut Vec<u8>, event: &str, payload: &str) {
let _ = writeln!(buf, "event: {event}");
if payload.is_empty() {
let _ = writeln!(buf, "data:");
} else {
let normalized = payload.replace("\r\n", "\n").replace('\r', "\n");
for line in normalized.split('\n') {
let _ = writeln!(buf, "data: {line}");
}
}
let _ = writeln!(buf); }
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum Provenance {
LiveHerdr,
DbFallback,
Unknown,
}
impl Provenance {
fn label(self) -> &'static str {
match self {
Provenance::LiveHerdr => "live-herdr",
Provenance::DbFallback => "db-fallback",
Provenance::Unknown => "unknown",
}
}
}
#[derive(Debug, Clone)]
pub struct RosterEntry {
pub agent: String,
pub address: String,
pub status: String, pub provenance: Provenance,
}
pub fn roster_from_db(participants: Vec<(String, String, String)>) -> Vec<RosterEntry> {
participants
.into_iter()
.map(|(agent, address, db_status)| RosterEntry {
agent,
address,
status: db_status,
provenance: Provenance::DbFallback,
})
.collect()
}
pub fn render_roster_html(roster: &[RosterEntry]) -> String {
let mut html = String::from("<ul class=\"roster\">");
for e in roster {
html.push_str(&format!(
"<li class=\"roster-entry status-{}\"><span class=\"who\">{}</span><span class=\"addr\">{}</span><span class=\"prov\">{}</span></li>",
escape_html(&e.status),
escape_html(&e.agent),
escape_html(&e.address),
e.provenance.label(),
));
}
html.push_str("</ul>");
html
}
pub fn render_usage_html(agg: &crate::read_model::UsageAggregate) -> String {
fn cents(c: Option<u64>) -> String {
match c {
Some(v) => format!("${}.{:02}", v / 100, v % 100),
None => "—".to_string(),
}
}
let mut html = format!(
"<div class=\"usage\"><span class=\"usage-tokens\">{} tokens</span><span class=\"usage-cost\">{}</span>",
agg.total_tokens,
cents(agg.total_cost_cents)
);
for row in &agg.per_agent {
html.push_str(&format!(
"<span class=\"usage-agent\">{}: {} ({})</span>",
crate::db_dashboard::escape_html(&row.agent),
row.tokens,
cents(row.cost_cents)
));
}
html.push_str("</div>");
html
}
pub fn usage_diff_key(agg: &crate::read_model::UsageAggregate) -> String {
let mut key = format!("{}|{:?}", agg.total_tokens, agg.total_cost_cents);
for row in &agg.per_agent {
key.push_str(&format!(
"|{}:{}:{:?}",
row.agent, row.tokens, row.cost_cents
));
}
key
}
pub fn load_roster(
herdr_bin: &str,
db_participants: Vec<(String, String, String)>,
) -> Vec<RosterEntry> {
if std::env::var("HERDR_ENV").as_deref() == Ok("1") {
if let Some(live) = live_herdr_roster(herdr_bin, &db_participants) {
return live;
}
}
roster_from_db(db_participants)
}
fn live_herdr_roster(
herdr_bin: &str,
db_participants: &[(String, String, String)],
) -> Option<Vec<RosterEntry>> {
let output = std::process::Command::new(herdr_bin)
.args(["pane", "list"])
.output()
.ok()?;
if !output.status.success() {
return None;
}
let text = String::from_utf8_lossy(&output.stdout);
let mut roster = Vec::new();
for (agent, address, db_status) in db_participants {
let status = herdr_status_for_pane(&text, address);
roster.push(RosterEntry {
agent: agent.clone(),
address: address.clone(),
status: status.clone().unwrap_or_else(|| db_status.clone()),
provenance: if status.is_some() {
Provenance::LiveHerdr
} else {
Provenance::Unknown
},
});
}
Some(roster)
}
fn herdr_status_for_pane(list_text: &str, pane: &str) -> Option<String> {
let needle = format!("\"pane_id\":\"{pane}\"");
let id_at = list_text.find(&needle)?;
let obj_start = list_text[..id_at].rfind('{').unwrap_or(0);
let obj_end = list_text[id_at..]
.find('}')
.map(|rel| id_at + rel)
.unwrap_or(list_text.len());
let object = &list_text[obj_start..obj_end];
let status = json_string_field(object, "agent_status")?;
if status.is_empty() {
None
} else {
Some(status)
}
}
fn json_string_field(fragment: &str, field: &str) -> Option<String> {
let key = format!("\"{field}\":\"");
let start = fragment.find(&key)? + key.len();
let rest = &fragment[start..];
let end = rest.find('"')?;
Some(rest[..end].to_string())
}
#[cfg(test)]
mod tests {
use super::*;
fn sample_event() -> FeedEvent {
FeedEvent {
event_key: "message:m1".to_string(),
source_table: "messages".to_string(),
source_id: "m1".to_string(),
session_id: "s1".to_string(),
timestamp: "2026-05-30T00:00:00Z".to_string(),
kind: "message".to_string(),
subtype: Some("status-update".to_string()),
mid: Some("m1".to_string()),
actor_agent_id: Some("claude".to_string()),
target_agent_id: Some("codex".to_string()),
source_address: Some("w1-2".to_string()),
target_address: Some("w1-1".to_string()),
transport: Some("herdr".to_string()),
workspace_id: Some("w1".to_string()),
mode: None,
r#ref: None,
re: None,
summary: None,
body: Some("live ping".to_string()),
redaction_policy: Some("full".to_string()),
proof_audit_id: Some("a1".to_string()),
delivery_status: Some("sent".to_string()),
verified_by: Some("helper-tool".to_string()),
payload_hash: Some("deadbeef".to_string()),
artifact_path: None,
severity: None,
is_derived: false,
work: None,
decision: None,
revealable: false,
}
}
#[test]
fn feed_key_changes_on_proof_overlay_edit() {
let before = sample_event();
let mut after = before.clone();
after.delivery_status = Some("observed".to_string());
assert_ne!(
feed_key(&before),
feed_key(&after),
"an in-place proof overlay edit must alter feed_key (else the live feed goes stale)"
);
let mut sibling = before.clone();
sibling.event_key = "message:m2".to_string();
sibling.mid = Some("m2".to_string());
assert_ne!(feed_key(&before), feed_key(&sibling));
}
fn sample_gate_work_event() -> FeedEvent {
FeedEvent {
event_key: "work:1".to_string(),
source_table: "work_events".to_string(),
source_id: "1".to_string(),
kind: "gate".to_string(),
is_derived: true,
..FeedEvent::empty()
}
}
fn sample_overlay() -> DecisionView {
DecisionView {
audit_id: "od1".to_string(),
decision_type: "gate-decision".to_string(),
target_work_event_id: Some(1),
verdict: Some("approve".to_string()),
resolution: None,
mode_to: None,
target_agent: None,
reason: None,
note: None,
actor_agent_id: Some("operator".to_string()),
timestamp: "2026-05-31T00:00:00Z".to_string(),
notification_status: "not-requested".to_string(),
revealable: false,
}
}
#[test]
fn feed_diff_key_flips_when_gate_overlay_appears_or_changes() {
let event = sample_gate_work_event();
let empty: BTreeMap<i64, DecisionView> = BTreeMap::new();
let undecided = feed_diff_key(&event, &empty);
assert_eq!(undecided, feed_key(&event));
let mut decided_map = BTreeMap::new();
decided_map.insert(1_i64, sample_overlay());
let decided = feed_diff_key(&event, &decided_map);
assert_ne!(
undecided, decided,
"a gate's not-decided -> decided overlay transition must flip the diff key"
);
let mut notified_map = BTreeMap::new();
let mut notified = sample_overlay();
notified.notification_status = "sent".to_string();
notified_map.insert(1_i64, notified);
let notified_key = feed_diff_key(&event, ¬ified_map);
assert_ne!(
decided, notified_key,
"an overlay notify-status change (not-requested -> sent) must flip the diff key"
);
let mut redecided_map = BTreeMap::new();
let mut redecided = sample_overlay();
redecided.audit_id = "od2".to_string(); redecided.verdict = Some("approve".to_string()); redecided.note = Some("now with a note".to_string()); redecided_map.insert(1_i64, redecided);
assert_ne!(
decided,
feed_diff_key(&event, &redecided_map),
"a same-verdict re-decision (new audit_id / new note) must flip the diff key"
);
let mut other_map = BTreeMap::new();
other_map.insert(2_i64, sample_overlay());
assert_eq!(
undecided,
feed_diff_key(&event, &other_map),
"an overlay keyed to another work_event id must not change this row's key"
);
}
#[test]
fn feed_diff_key_flips_on_standalone_decision_notification_change() {
let empty: BTreeMap<i64, DecisionView> = BTreeMap::new();
let mut decision = sample_overlay();
decision.decision_type = "redirect".to_string();
decision.target_work_event_id = None;
decision.verdict = None;
decision.target_agent = Some("codex:w1-1".to_string());
let mut event = FeedEvent {
event_key: "decision:od1".to_string(),
source_table: "operator_decisions".to_string(),
source_id: "od1".to_string(),
kind: "redirect".to_string(),
is_derived: true,
decision: Some(decision),
..FeedEvent::empty()
};
let before = feed_diff_key(&event, &empty);
if let Some(d) = event.decision.as_mut() {
d.notification_status = "sent".to_string();
}
let after = feed_diff_key(&event, &empty);
assert_ne!(
before, after,
"a standalone decision's notify-status transition must flip the diff key"
);
}
#[test]
fn sse_event_frames_event_and_multiline_data() {
let mut buf = Vec::new();
sse_event(
&mut buf,
"feed-reset",
"<article>a</article>\n<article>b</article>",
);
let text = String::from_utf8(buf).unwrap();
assert_eq!(
text,
"event: feed-reset\ndata: <article>a</article>\ndata: <article>b</article>\n\n"
);
}
#[test]
fn sse_event_splits_bare_cr() {
let mut buf = Vec::new();
sse_event(&mut buf, "feed-reset", "a\rb");
let text = String::from_utf8(buf).unwrap();
assert_eq!(text, "event: feed-reset\ndata: a\ndata: b\n\n");
assert!(!text.contains('\r'), "no bare CR survives in any data line");
let mut crlf = Vec::new();
sse_event(&mut crlf, "feed-reset", "a\r\nb");
assert_eq!(
String::from_utf8(crlf).unwrap(),
"event: feed-reset\ndata: a\ndata: b\n\n",
"CRLF is one break, not an empty data line",
);
}
#[test]
fn roster_falls_back_to_db_when_no_herdr() {
let participants = vec![
(
"claude".to_string(),
"w1-2".to_string(),
"working".to_string(),
),
(
"codex".to_string(),
"w1-1".to_string(),
"unknown".to_string(),
),
];
let roster = roster_from_db(participants);
assert_eq!(roster.len(), 2);
assert!(roster
.iter()
.all(|e| e.provenance == Provenance::DbFallback));
assert_eq!(
roster.iter().find(|e| e.agent == "claude").unwrap().status,
"working"
);
let html = render_roster_html(&roster);
assert!(html.contains("codex") && html.contains("db-fallback"));
assert!(!html.contains("<script"), "escaped");
}
#[test]
fn herdr_status_attributes_each_panes_own_status() {
let blob = r#"{"result":{"panes":[{"pane_id":"w1-1","agent_status":"working"},{"pane_id":"w1-2","agent_status":"idle"}]}}"#;
assert_eq!(
herdr_status_for_pane(blob, "w1-1").as_deref(),
Some("working"),
"first pane keeps its own status",
);
assert_eq!(
herdr_status_for_pane(blob, "w1-2").as_deref(),
Some("idle"),
"second pane is NOT mis-attributed the first pane's status",
);
}
#[test]
fn herdr_status_does_not_collide_on_pane_id_prefix() {
let blob = r#"{"panes":[{"pane_id":"1-11","agent_status":"blocked"},{"pane_id":"1-1","agent_status":"done"}]}"#;
assert_eq!(
herdr_status_for_pane(blob, "1-1").as_deref(),
Some("done"),
"1-1 resolves to its own status, not 1-11's",
);
assert_eq!(
herdr_status_for_pane(blob, "1-11").as_deref(),
Some("blocked")
);
}
#[test]
fn herdr_status_missing_agent_status_is_none() {
let blob = r#"{"panes":[{"pane_id":"w1-1","cwd":"/x","focused":false}]}"#;
assert_eq!(herdr_status_for_pane(blob, "w1-1"), None);
}
#[test]
fn herdr_status_non_json_or_empty_is_none() {
assert_eq!(herdr_status_for_pane("not json at all", "w1-1"), None);
assert_eq!(herdr_status_for_pane("", "w1-1"), None);
let blob = r#"{"panes":[{"pane_id":"w1-2","agent_status":"idle"}]}"#;
assert_eq!(herdr_status_for_pane(blob, "w1-1"), None);
}
#[test]
fn diff_feed_appends_pure_suffix_else_resets() {
let a = vec!["k1".to_string(), "k2".to_string()];
let ab = vec!["k1".to_string(), "k2".to_string(), "k3".to_string()];
assert_eq!(diff_feed(&a, &ab), FeedDelta::Append(2));
let reordered = vec!["k2".to_string(), "k1".to_string(), "k3".to_string()];
assert_eq!(diff_feed(&a, &reordered), FeedDelta::Reset);
assert_eq!(diff_feed(&ab, &a), FeedDelta::Reset);
assert_eq!(diff_feed(&a, &a), FeedDelta::Append(2));
}
#[test]
fn usage_html_never_renders_fake_zero_cost() {
use crate::read_model::{UsageAgentRow, UsageAggregate};
let none_cost = UsageAggregate {
total_tokens: 350,
total_cost_cents: None,
per_agent: vec![UsageAgentRow {
agent: "claude".into(),
tokens: 350,
cost_cents: None,
}],
};
let html = render_usage_html(&none_cost);
assert!(html.contains("350"), "tokens are shown: {html}");
assert!(
!html.contains("$0.00"),
"an absent cost must NOT fabricate $0.00: {html}"
);
assert!(
html.contains('\u{2014}'),
"an absent cost renders as the em dash placeholder: {html}"
);
let with_cost = UsageAggregate {
total_tokens: 350,
total_cost_cents: Some(13),
per_agent: vec![UsageAgentRow {
agent: "claude".into(),
tokens: 350,
cost_cents: Some(13),
}],
};
let html = render_usage_html(&with_cost);
assert!(
html.contains("$0.13"),
"a present cost renders dollars from integer cents: {html}"
);
}
#[test]
fn usage_diff_key_changes_on_aggregate_change() {
use crate::read_model::{UsageAgentRow, UsageAggregate};
let base = UsageAggregate {
total_tokens: 100,
total_cost_cents: Some(5),
per_agent: vec![UsageAgentRow {
agent: "claude".into(),
tokens: 100,
cost_cents: Some(5),
}],
};
let mut more_tokens = base.clone();
more_tokens.total_tokens = 200;
assert_ne!(
usage_diff_key(&base),
usage_diff_key(&more_tokens),
"a total_tokens change must flip the key"
);
let mut more_cost = base.clone();
more_cost.total_cost_cents = Some(8);
assert_ne!(
usage_diff_key(&base),
usage_diff_key(&more_cost),
"a total_cost_cents change must flip the key"
);
let mut more_agent_tokens = base.clone();
more_agent_tokens.per_agent[0].tokens = 150;
assert_ne!(
usage_diff_key(&base),
usage_diff_key(&more_agent_tokens),
"a per_agent token change must flip the key"
);
}
}