use std::fs::OpenOptions;
use std::io::Write;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::{Duration, SystemTime};
use super::messages::BrokerMessage;
use super::{AgentRecord, AgentStatusEntry, BrokerState, BrokerStateInner};
/// Returns the identity a message is attributed to.
///
/// For most variants this is the message's own `agent_id`. Variants whose
/// payload names a separate originator (`verified_by`, `from`, or the
/// payload's `agent_id`) use that field instead, and `VerifyNow` is
/// attributed to its `branch_id`.
fn sender_id(msg: &BrokerMessage) -> &str {
    match msg {
        // Attribution comes from a field inside the payload.
        BrokerMessage::Verified { payload, .. } => payload.verified_by.as_str(),
        BrokerMessage::Feedback { payload, .. } => payload.from.as_str(),
        BrokerMessage::AdvancedMain { payload } => payload.from.as_str(),
        BrokerMessage::Learning { payload } => &payload.agent_id,
        // The only identity a nudge carries is the branch it targets.
        BrokerMessage::VerifyNow { branch_id } => branch_id.as_str(),
        // Everything else is attributed to the variant's own `agent_id`.
        BrokerMessage::Status { agent_id, .. }
        | BrokerMessage::Artifact { agent_id, .. }
        | BrokerMessage::Blocked { agent_id, .. }
        | BrokerMessage::Question { agent_id, .. }
        | BrokerMessage::Intent { agent_id, .. } => agent_id.as_str(),
    }
}
/// Reports whether publishing `msg` should create or refresh the sender's
/// roster row.
///
/// Only `Status`, `Artifact`, `Blocked` and `Intent` do; every other variant
/// leaves the roster untouched.
fn upserts_roster(msg: &BrokerMessage) -> bool {
    match msg {
        BrokerMessage::Status { .. }
        | BrokerMessage::Artifact { .. }
        | BrokerMessage::Blocked { .. }
        | BrokerMessage::Intent { .. } => true,
        BrokerMessage::Question { .. }
        | BrokerMessage::Verified { .. }
        | BrokerMessage::Feedback { .. }
        | BrokerMessage::AdvancedMain { .. }
        | BrokerMessage::Learning { .. }
        | BrokerMessage::VerifyNow { .. } => false,
    }
}
/// Creates or refreshes the roster row for the sender of `msg`.
///
/// Does nothing for messages that [`upserts_roster`] rejects. Otherwise it:
///
/// * inserts an empty [`AgentRecord`] for the sender if none exists;
/// * overwrites the recorded status unless that would move a terminal status
///   (`done`, `verified`, `blocked`, `committed`) back to a non-terminal one.
///   The single exception is `committed` -> `working` while the last
///   committed artifact is no older than `republish_working_ttl` and that TTL
///   is non-zero;
/// * stamps `last_committed_at` when `msg` is an artifact whose payload
///   status is `committed`;
/// * always refreshes `last_seen` and `last_message`;
/// * records the CLI named by a `Status` payload, but only if no CLI is
///   already recorded for this agent;
/// * makes sure the sender has an inbox in `queues`.
fn update_agent_record(inner: &mut BrokerStateInner, msg: &BrokerMessage) {
    /// Statuses that a later non-terminal status may not overwrite.
    fn is_terminal(label: &str) -> bool {
        matches!(label, "done" | "verified" | "blocked" | "committed")
    }

    if !upserts_roster(msg) {
        return;
    }

    // Copied out up front so it can be read while the roster row is mutably
    // borrowed from `inner`.
    let reentry_window = inner.republish_working_ttl;
    let id = sender_id(msg).to_string();
    let incoming = msg.status_label().to_string();
    let seen_at = std::time::Instant::now();
    let commits_now = match msg {
        BrokerMessage::Artifact { payload, .. } => payload.status == "committed",
        _ => false,
    };

    let record = inner.agents.entry(id.clone()).or_insert_with(|| AgentRecord {
        agent_id: id.clone(),
        status: String::new(),
        last_seen: seen_at,
        last_message: None,
        last_committed_at: None,
    });

    // `committed` -> `working` is tolerated only inside the re-entry window.
    let committed_reentry = record.status == "committed"
        && incoming == "working"
        && !reentry_window.is_zero()
        && matches!(record.last_committed_at, Some(at) if at.elapsed() <= reentry_window);
    let accept_status =
        committed_reentry || !is_terminal(&record.status) || is_terminal(&incoming);
    if accept_status {
        record.status = incoming;
    }

    if commits_now {
        record.last_committed_at = Some(seen_at);
    }
    record.last_seen = seen_at;
    record.last_message = Some(msg.clone());

    // First writer wins: a CLI that is already recorded is never replaced by
    // a later self-report.
    if let BrokerMessage::Status { payload, .. } = msg {
        if let Some(cli) = payload.cli.as_ref().filter(|name| !name.is_empty()) {
            inner
                .agent_clis
                .entry(id.clone())
                .or_insert_with(|| cli.clone());
        }
    }

    inner.queues.entry(id).or_default();
}
/// Publishes `msg` through the broker.
///
/// In order:
///
/// 1. takes the next sequence number;
/// 2. under the write lock, updates the roster, appends the message to the
///    in-memory log and routes it into recipient inboxes;
/// 3. hands the message to the learnings aggregator if one is configured and
///    its lock can be taken;
/// 4. for an artifact whose payload status is `committed`: publishes a
///    `VerifyNow` nudge when `verify_on_commit_nudge` is set, then runs the
///    role guard when role gating is configured.
///
/// NOTE(review): the sequence number is obtained before the write lock is
/// taken, so two concurrent publishers could append to the log out of
/// sequence order. Confirm whether callers serialise publishing.
pub fn publish_message(state: &Arc<BrokerState>, msg: &BrokerMessage) {
    let seq = state.next_seq();

    // Scope the write guard so it is released before the follow-up work
    // below, which includes a recursive publish.
    {
        let mut guard = state.write();
        update_agent_record(&mut guard, msg);
        let logged_at = SystemTime::now();
        guard.message_log.push((seq, logged_at, msg.clone()));
        route_message(&mut guard, msg, seq);
    }

    if let Some(aggregator) = state.learnings.as_ref() {
        // A lock error is skipped rather than propagated.
        if let Ok(mut learnings) = aggregator.lock() {
            learnings.observe(msg);
        }
    }

    let committed_artifact = match msg {
        BrokerMessage::Artifact { agent_id, payload } if payload.status == "committed" => {
            Some((agent_id, payload))
        }
        _ => None,
    };
    let Some((agent_id, payload)) = committed_artifact else {
        return;
    };

    if state.verify_on_commit_nudge {
        // The nudge is not an artifact, so this recursion stops after one level.
        let nudge = BrokerMessage::VerifyNow {
            branch_id: agent_id.clone(),
        };
        publish_message(state, &nudge);
    }

    if let Some(ctx) = state.role_gating.as_ref() {
        crate::opsx::role_guard::run_guard(state, agent_id, payload, ctx);
    }
}
/// Appends `(seq, msg)` to every existing inbox except the one owned by
/// `sender`. Inboxes are never created here.
fn broadcast_except(inner: &mut BrokerStateInner, sender: &str, msg: &BrokerMessage, seq: u64) {
    for (owner, inbox) in inner.queues.iter_mut() {
        if owner.as_str() != sender {
            inbox.push((seq, msg.clone()));
        }
    }
}

/// Delivers `msg` (tagged with `seq`) to the inboxes that should receive it.
///
/// Routing per variant:
///
/// * `Status` — not routed to any inbox.
/// * `Artifact`, `Intent` — broadcast to every existing inbox except the
///   sender's (`agent_id`).
/// * `Verified` — broadcast to every existing inbox except
///   `payload.verified_by`.
/// * `AdvancedMain` — broadcast to every existing inbox except
///   `payload.from`.
/// * `Blocked` — delivered to the `payload.from` inbox, only if it exists.
/// * `Feedback` — delivered to the `agent_id` inbox, only if it exists.
/// * `Question`, `VerifyNow` — delivered to the `"supervisor"` inbox, which
///   is created on demand.
/// * `Learning` — delivered to the `payload.branch_id` inbox, or
///   `"supervisor"` when no branch is given; created on demand.
fn route_message(inner: &mut BrokerStateInner, msg: &BrokerMessage, seq: u64) {
    match msg {
        BrokerMessage::Status { .. } => {
            // Status updates only affect the roster; nothing is delivered.
        }
        BrokerMessage::Artifact { agent_id, .. } | BrokerMessage::Intent { agent_id, .. } => {
            broadcast_except(inner, agent_id, msg, seq);
        }
        BrokerMessage::Verified { payload, .. } => {
            broadcast_except(inner, &payload.verified_by, msg, seq);
        }
        BrokerMessage::AdvancedMain { payload } => {
            broadcast_except(inner, &payload.from, msg, seq);
        }
        BrokerMessage::Blocked { payload, .. } => {
            // Dropped when the target has no inbox.
            if let Some(inbox) = inner.queues.get_mut(&payload.from) {
                inbox.push((seq, msg.clone()));
            }
        }
        BrokerMessage::Feedback { agent_id, .. } => {
            // Dropped when the target has no inbox.
            if let Some(inbox) = inner.queues.get_mut(agent_id) {
                inbox.push((seq, msg.clone()));
            }
        }
        BrokerMessage::Question { .. } | BrokerMessage::VerifyNow { .. } => {
            let inbox = inner.queues.entry("supervisor".to_string()).or_default();
            inbox.push((seq, msg.clone()));
        }
        BrokerMessage::Learning { payload } => {
            let target = payload.branch_id.as_deref().unwrap_or("supervisor");
            let inbox = inner.queues.entry(target.to_string()).or_default();
            inbox.push((seq, msg.clone()));
        }
    }
}
/// Returns the messages in `agent_id`'s inbox whose sequence number is
/// greater than `since`, together with the highest sequence number returned.
///
/// The inbox is only read, never drained. The second tuple element is `0`
/// when the agent has no inbox or when nothing newer than `since` is queued.
pub fn poll_messages(
    state: &Arc<BrokerState>,
    agent_id: &str,
    since: u64,
) -> (Vec<BrokerMessage>, u64) {
    let guard = state.read();
    let Some(inbox) = guard.queues.get(agent_id) else {
        return (Vec::new(), 0);
    };

    let fresh: Vec<_> = inbox.iter().filter(|(seq, _)| *seq > since).collect();
    let newest_seq = fresh.iter().map(|(seq, _)| *seq).max().unwrap_or(0);
    let messages = fresh.into_iter().map(|(_, msg)| msg.clone()).collect();
    (messages, newest_seq)
}
/// Returns up to `limit` entries from the end of the in-memory message log,
/// most recently appended first.
pub fn recent_messages(
    state: &Arc<BrokerState>,
    limit: usize,
) -> Vec<(u64, std::time::SystemTime, BrokerMessage)> {
    let guard = state.read();
    let mut newest_first = Vec::new();
    for entry in guard.message_log.iter().rev().take(limit) {
        newest_first.push(entry.clone());
    }
    newest_first
}
/// Returns every entry of the in-memory message log whose sequence number is
/// greater than `since`, in log order.
pub fn full_log(
    state: &Arc<BrokerState>,
    since: u64,
) -> Vec<(u64, std::time::SystemTime, BrokerMessage)> {
    let guard = state.read();
    let mut newer = Vec::new();
    for entry in &guard.message_log {
        if entry.0 > since {
            newer.push(entry.clone());
        }
    }
    newer
}
/// Builds one [`AgentStatusEntry`] per roster row, sorted by `agent_id`.
///
/// * `cli` comes from the `agent_clis` map and is empty when the agent has no
///   entry there.
/// * `phase` is taken from the row's last message when that message is a
///   `Status`; otherwise it is `None`.
/// * `summary` is always left empty.
pub fn agent_status_snapshot(state: &Arc<BrokerState>) -> Vec<AgentStatusEntry> {
    let guard = state.read();
    let mut rows: Vec<AgentStatusEntry> = Vec::new();

    for record in guard.agents.values() {
        let cli = guard
            .agent_clis
            .get(&record.agent_id)
            .cloned()
            .unwrap_or_default();
        let phase = match record.last_message.as_ref() {
            Some(BrokerMessage::Status { payload, .. }) => payload.phase.clone(),
            _ => None,
        };
        rows.push(AgentStatusEntry {
            agent_id: record.agent_id.clone(),
            cli,
            status: record.status.clone(),
            last_seen_seconds: record.last_seen.elapsed().as_secs(),
            summary: String::new(),
            last_seen: record.last_seen,
            phase,
        });
    }

    rows.sort_by(|lhs, rhs| lhs.agent_id.cmp(&rhs.agent_id));
    rows
}
/// Periodically appends new log entries to the broker's log file until
/// `stop` is set.
///
/// Returns immediately when the state has no `log_path`. Otherwise the stop
/// flag is checked before every 100 ms sleep. Entries are flushed after each
/// five seconds' worth of sleeps, and once more right before returning when
/// the flag is observed.
pub fn flush_loop(state: &Arc<BrokerState>, stop: &Arc<AtomicBool>) {
    const FLUSH_EVERY: Duration = Duration::from_secs(5);
    const POLL_EVERY: Duration = Duration::from_millis(100);

    let Some(log_path) = state.log_path.clone() else {
        return;
    };

    let mut flushed_up_to: u64 = 0;
    // Nominal time slept since the last flush; sleep overshoot is not measured.
    let mut waited = Duration::ZERO;
    loop {
        if stop.load(Ordering::Acquire) {
            // Final flush so entries published since the last tick are written.
            flush_entries(state, &log_path, &mut flushed_up_to);
            return;
        }
        std::thread::sleep(POLL_EVERY);
        waited += POLL_EVERY;
        if waited >= FLUSH_EVERY {
            flush_entries(state, &log_path, &mut flushed_up_to);
            waited = Duration::ZERO;
        }
    }
}
/// Appends every log entry newer than `*last_flushed_seq` to `log_path`.
///
/// Each entry becomes one line of the form
/// `[<seq>] <unix-seconds> [<msg.agent_id()>] <msg>`. A timestamp earlier
/// than the Unix epoch is written as `0`.
///
/// The in-memory log is copied under the read lock, and the lock is released
/// before any file I/O. All lines are written with a single `write_all`.
///
/// `*last_flushed_seq` advances to the highest sequence number in the batch
/// only when the file was opened and the write succeeded. On any I/O failure
/// the function returns quietly with the cursor unchanged, so the same
/// entries are retried on the next call instead of being dropped. A write
/// that fails part-way may therefore leave lines that are written again on
/// retry.
fn flush_entries(state: &Arc<BrokerState>, log_path: &std::path::Path, last_flushed_seq: &mut u64) {
    let entries: Vec<(u64, SystemTime, BrokerMessage)> = {
        let inner = state.read();
        inner
            .message_log
            .iter()
            .filter(|(seq, _, _)| *seq > *last_flushed_seq)
            .cloned()
            .collect()
    };

    // Use the maximum rather than the last element so the cursor is right
    // even if the log is not strictly ordered. `None` means nothing to flush.
    let Some(max_seq) = entries.iter().map(|(seq, _, _)| *seq).max() else {
        return;
    };

    let Ok(mut file) = OpenOptions::new().create(true).append(true).open(log_path) else {
        return;
    };

    let mut batch: Vec<u8> = Vec::new();
    for (seq, timestamp, msg) in &entries {
        let ts = timestamp
            .duration_since(SystemTime::UNIX_EPOCH)
            .map_or(0, |d| d.as_secs());
        let line = format!("[{seq}] {ts} [{}] {msg}\n", msg.agent_id());
        batch.extend_from_slice(line.as_bytes());
    }

    if file.write_all(&batch).is_ok() {
        *last_flushed_seq = max_seq;
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::broker::messages::{
ArtifactPayload, BlockedPayload, FeedbackPayload, IntentPayload, QuestionPayload,
StatusPayload, VerifiedPayload,
};
use crate::broker::start_broker;
use crate::config::BrokerConfig;
fn make_status(agent_id: &str, status: &str) -> BrokerMessage {
BrokerMessage::Status {
agent_id: agent_id.to_string(),
payload: StatusPayload {
status: status.to_string(),
modified_files: vec![],
message: None,
..Default::default()
},
}
}
fn make_artifact(agent_id: &str, status: &str, exports: &[&str]) -> BrokerMessage {
BrokerMessage::Artifact {
agent_id: agent_id.to_string(),
payload: ArtifactPayload {
status: status.to_string(),
exports: exports.iter().map(|s| (*s).to_string()).collect(),
modified_files: vec!["src/main.rs".to_string()],
},
}
}
fn make_blocked(agent_id: &str, needs: &str, from: &str) -> BrokerMessage {
BrokerMessage::Blocked {
agent_id: agent_id.to_string(),
payload: BlockedPayload {
needs: needs.to_string(),
from: from.to_string(),
},
}
}
fn make_verified(agent_id: &str, verified_by: &str, message: Option<&str>) -> BrokerMessage {
BrokerMessage::Verified {
agent_id: agent_id.to_string(),
payload: VerifiedPayload {
verified_by: verified_by.to_string(),
message: message.map(str::to_string),
},
}
}
fn make_feedback(agent_id: &str, from: &str, errors: &[&str]) -> BrokerMessage {
BrokerMessage::Feedback {
agent_id: agent_id.to_string(),
payload: FeedbackPayload {
from: from.to_string(),
errors: errors.iter().map(|s| (*s).to_string()).collect(),
},
}
}
fn make_question(agent_id: &str, question: &str) -> BrokerMessage {
BrokerMessage::Question {
agent_id: agent_id.to_string(),
payload: QuestionPayload {
question: question.to_string(),
},
}
}
fn make_intent(agent_id: &str, files: &[&str], summary: &str, ttl: u64) -> BrokerMessage {
BrokerMessage::Intent {
agent_id: agent_id.to_string(),
payload: IntentPayload {
files: files
.iter()
.map(|s| crate::broker::messages::FileIntent::from(*s))
.collect(),
summary: summary.to_string(),
valid_for_seconds: ttl,
},
}
}
fn fresh_state() -> Arc<BrokerState> {
Arc::new(BrokerState::new(None))
}
#[test]
fn message_log_accumulates_three_entries() {
let state = fresh_state();
publish_message(&state, &make_status("a", "working"));
publish_message(&state, &make_artifact("b", "done", &[]));
publish_message(&state, &make_blocked("c", "reason", "a"));
let inner = state.read();
assert_eq!(inner.message_log.len(), 3);
assert_eq!(inner.message_log[0].0, 1);
assert_eq!(inner.message_log[1].0, 2);
assert_eq!(inner.message_log[2].0, 3);
}
#[test]
fn message_log_includes_all_types() {
let state = fresh_state();
publish_message(&state, &make_status("a", "working"));
publish_message(&state, &make_artifact("a", "done", &[]));
publish_message(&state, &make_blocked("b", "reason", "a"));
let inner = state.read();
assert_eq!(inner.message_log.len(), 3);
}
#[test]
fn inbox_stores_correct_sequence_number() {
let state = fresh_state();
publish_message(&state, &make_status("a", "working")); publish_message(&state, &make_status("b", "working")); publish_message(&state, &make_artifact("a", "done", &[]));
let inner = state.read();
let b_inbox = &inner.queues["b"];
assert_eq!(b_inbox.len(), 1);
assert_eq!(b_inbox[0].0, 3);
}
#[test]
fn first_publish_creates_record_and_inbox() {
let state = fresh_state();
publish_message(&state, &make_status("feat-errors", "working"));
let inner = state.read();
assert!(inner.agents.contains_key("feat-errors"));
assert_eq!(inner.agents["feat-errors"].status, "working");
assert!(inner.queues.contains_key("feat-errors"));
}
#[test]
fn status_not_routed_to_any_inbox() {
let state = fresh_state();
publish_message(&state, &make_status("feat-errors", "working"));
publish_message(&state, &make_status("feat-detect", "working"));
publish_message(&state, &make_status("feat-errors", "idle"));
let (detect_msgs, _) = poll_messages(&state, "feat-detect", 0);
let (errors_msgs, _) = poll_messages(&state, "feat-errors", 0);
assert!(detect_msgs.is_empty());
assert!(errors_msgs.is_empty());
}
#[test]
fn artifact_broadcast_to_all_peers() {
let state = fresh_state();
publish_message(&state, &make_status("feat-errors", "working"));
publish_message(&state, &make_status("feat-detect", "working"));
publish_message(&state, &make_status("feat-config", "working"));
publish_message(&state, &make_artifact("feat-errors", "done", &[]));
let (detect_msgs, _) = poll_messages(&state, "feat-detect", 0);
let (config_msgs, _) = poll_messages(&state, "feat-config", 0);
assert_eq!(detect_msgs.len(), 1);
assert_eq!(config_msgs.len(), 1);
}
#[test]
fn artifact_broadcast_skips_sender() {
let state = fresh_state();
publish_message(&state, &make_status("feat-errors", "working"));
publish_message(&state, &make_status("feat-detect", "working"));
publish_message(&state, &make_artifact("feat-errors", "done", &[]));
let (errors_msgs, _) = poll_messages(&state, "feat-errors", 0);
assert!(errors_msgs.is_empty());
}
#[test]
fn artifact_broadcast_skips_unregistered_agents() {
let state = fresh_state();
publish_message(&state, &make_status("feat-errors", "working"));
publish_message(&state, &make_artifact("feat-errors", "done", &[]));
let inner = state.read();
assert!(!inner.queues.contains_key("feat-detect"));
}
#[test]
fn blocked_delivered_to_target() {
let state = fresh_state();
publish_message(&state, &make_status("feat-config", "working"));
publish_message(&state, &make_status("feat-errors", "working"));
publish_message(
&state,
&make_blocked("feat-config", "error types", "feat-errors"),
);
let (errors_msgs, _) = poll_messages(&state, "feat-errors", 0);
assert_eq!(errors_msgs.len(), 1);
assert_eq!(errors_msgs[0].agent_id(), "feat-config");
}
#[test]
fn blocked_not_delivered_to_other_agents() {
let state = fresh_state();
publish_message(&state, &make_status("feat-config", "working"));
publish_message(&state, &make_status("feat-errors", "working"));
publish_message(&state, &make_status("feat-detect", "working"));
publish_message(
&state,
&make_blocked("feat-config", "error types", "feat-errors"),
);
let (detect_msgs, _) = poll_messages(&state, "feat-detect", 0);
assert!(detect_msgs.is_empty());
}
#[test]
fn blocked_to_unregistered_target_silently_dropped() {
let state = fresh_state();
publish_message(&state, &make_status("feat-config", "working"));
publish_message(
&state,
&make_blocked("feat-config", "error types", "feat-errors"),
);
let inner = state.read();
assert!(!inner.queues.contains_key("feat-errors"));
}
#[test]
fn verified_broadcast_reaches_all_peers() {
let state = fresh_state();
publish_message(&state, &make_status("feat-errors", "working"));
publish_message(&state, &make_status("feat-detect", "working"));
publish_message(&state, &make_status("supervisor", "working"));
publish_message(&state, &make_verified("feat-errors", "supervisor", None));
let (errors_msgs, _) = poll_messages(&state, "feat-errors", 0);
let (detect_msgs, _) = poll_messages(&state, "feat-detect", 0);
assert_eq!(errors_msgs.len(), 1);
assert_eq!(detect_msgs.len(), 1);
}
#[test]
fn verified_broadcast_skips_sender() {
let state = fresh_state();
publish_message(&state, &make_status("feat-errors", "working"));
publish_message(&state, &make_status("supervisor", "working"));
publish_message(&state, &make_verified("feat-errors", "supervisor", None));
let (sup_msgs, _) = poll_messages(&state, "supervisor", 0);
assert!(sup_msgs.is_empty());
}
#[test]
fn verified_does_not_mutate_verifier_record() {
let state = fresh_state();
publish_message(&state, &make_status("supervisor", "working"));
publish_message(&state, &make_verified("feat-errors", "supervisor", None));
let inner = state.read();
let record = inner
.agents
.get("supervisor")
.expect("supervisor record exists");
assert_eq!(
record.status, "working",
"a Verified message must not mutate the verifier's roster row",
);
}
#[test]
fn feedback_delivered_to_target_agent() {
let state = fresh_state();
publish_message(&state, &make_status("feat-errors", "working"));
publish_message(&state, &make_status("supervisor", "working"));
publish_message(
&state,
&make_feedback("feat-errors", "supervisor", &["test failed"]),
);
let (errors_msgs, _) = poll_messages(&state, "feat-errors", 0);
assert_eq!(errors_msgs.len(), 1);
assert_eq!(errors_msgs[0].status_label(), "feedback");
}
#[test]
fn feedback_not_delivered_to_other_agents() {
let state = fresh_state();
publish_message(&state, &make_status("feat-errors", "working"));
publish_message(&state, &make_status("feat-detect", "working"));
publish_message(&state, &make_status("supervisor", "working"));
publish_message(
&state,
&make_feedback("feat-errors", "supervisor", &["test failed"]),
);
let (detect_msgs, _) = poll_messages(&state, "feat-detect", 0);
assert!(detect_msgs.is_empty());
}
#[test]
fn feedback_does_not_mutate_sender_record() {
let state = fresh_state();
publish_message(&state, &make_status("supervisor", "working"));
publish_message(
&state,
&make_feedback("feat-errors", "supervisor", &["test failed"]),
);
let inner = state.read();
let record = inner
.agents
.get("supervisor")
.expect("supervisor record exists");
assert_eq!(
record.status, "working",
"a Feedback message must not mutate the sender's roster row",
);
}
#[test]
fn feedback_from_non_agent_creates_no_phantom_row() {
let state = fresh_state();
publish_message(&state, &make_status("feat-errors", "working"));
publish_message(&state, &make_status("supervisor", "working"));
publish_message(
&state,
&make_feedback("feat-errors", "human", &["fix the flaky test"]),
);
let inner = state.read();
assert!(
!inner.agents.contains_key("human"),
"a feedback's `from` identity must never mint a roster row",
);
assert_eq!(
inner.agents.len(),
2,
"roster holds exactly the two status publishers",
);
drop(inner);
let (msgs, _) = poll_messages(&state, "feat-errors", 0);
assert_eq!(msgs.len(), 1);
assert_eq!(msgs[0].status_label(), "feedback");
}
#[test]
fn fresh_broker_state_builds_roster_only_from_status_publishers() {
let state = fresh_state();
{
let inner = state.read();
assert!(inner.agents.is_empty(), "a fresh broker has no roster rows");
}
publish_message(&state, &make_status("feat-a", "working"));
publish_message(&state, &make_status("supervisor", "working"));
publish_message(&state, &make_feedback("feat-a", "human", &["nudge"]));
let inner = state.read();
let mut ids: Vec<&str> = inner.agents.keys().map(String::as_str).collect();
ids.sort_unstable();
assert_eq!(ids, ["feat-a", "supervisor"]);
}
#[test]
fn question_and_verified_identities_create_no_phantom_rows() {
let state = fresh_state();
publish_message(&state, &make_status("supervisor", "working"));
publish_message(&state, &make_verified("feat-x", "reviewer-bot", None));
publish_message(&state, &make_question("feat-x", "proceed?"));
let inner = state.read();
assert!(!inner.agents.contains_key("reviewer-bot"));
assert!(!inner.agents.contains_key("feat-x"));
assert_eq!(
inner.agents.len(),
1,
"only the status-publishing supervisor is a roster row",
);
}
#[test]
fn question_routed_to_supervisor_inbox() {
let state = fresh_state();
publish_message(
&state,
&make_question("feat-config", "Should I skip tests?"),
);
let (msgs, _) = poll_messages(&state, "supervisor", 0);
assert_eq!(msgs.len(), 1);
assert_eq!(msgs[0].agent_id(), "feat-config");
assert_eq!(msgs[0].status_label(), "question");
}
#[test]
fn question_creates_supervisor_inbox_if_absent() {
let state = fresh_state();
{
let inner = state.read();
assert!(!inner.queues.contains_key("supervisor"));
}
publish_message(&state, &make_question("feat-config", "anything?"));
let inner = state.read();
assert!(inner.queues.contains_key("supervisor"));
}
#[test]
fn question_not_in_sender_inbox() {
let state = fresh_state();
publish_message(&state, &make_status("feat-config", "working"));
publish_message(&state, &make_question("feat-config", "anything?"));
let (msgs, _) = poll_messages(&state, "feat-config", 0);
assert!(msgs.is_empty());
}
#[test]
fn question_not_delivered_to_other_agents() {
let state = fresh_state();
publish_message(&state, &make_status("feat-config", "working"));
publish_message(&state, &make_status("feat-detect", "working"));
publish_message(&state, &make_question("feat-config", "anything?"));
let (msgs, _) = poll_messages(&state, "feat-detect", 0);
assert!(msgs.is_empty());
}
#[test]
fn question_appears_in_message_log() {
let state = fresh_state();
publish_message(&state, &make_question("feat-config", "anything?"));
let inner = state.read();
assert_eq!(inner.message_log.len(), 1);
assert_eq!(inner.message_log[0].2.status_label(), "question");
}
#[test]
fn intent_broadcast_reaches_all_peers() {
let state = fresh_state();
publish_message(&state, &make_status("feat-auth", "working"));
publish_message(&state, &make_status("feat-detect", "working"));
publish_message(&state, &make_status("supervisor", "working"));
publish_message(
&state,
&make_intent("feat-auth", &["src/a.rs"], "wire AuthClient", 600),
);
let (detect_msgs, _) = poll_messages(&state, "feat-detect", 0);
let (sup_msgs, _) = poll_messages(&state, "supervisor", 0);
assert!(
detect_msgs
.iter()
.any(|m| matches!(m, BrokerMessage::Intent { .. }))
);
assert!(
sup_msgs
.iter()
.any(|m| matches!(m, BrokerMessage::Intent { .. }))
);
}
#[test]
fn intent_broadcast_skips_sender() {
let state = fresh_state();
publish_message(&state, &make_status("feat-auth", "working"));
publish_message(&state, &make_status("feat-detect", "working"));
publish_message(
&state,
&make_intent("feat-auth", &["src/a.rs"], "wire AuthClient", 600),
);
let (own_msgs, _) = poll_messages(&state, "feat-auth", 0);
assert!(
!own_msgs
.iter()
.any(|m| matches!(m, BrokerMessage::Intent { .. }))
);
}
#[test]
fn intent_broadcast_skips_unregistered_agents() {
let state = fresh_state();
publish_message(&state, &make_status("feat-auth", "working"));
publish_message(
&state,
&make_intent("feat-auth", &["src/a.rs"], "wire AuthClient", 600),
);
let inner = state.read();
assert!(!inner.queues.contains_key("feat-detect"));
}
#[test]
fn intent_updates_sender_record_status_to_intent() {
let state = fresh_state();
publish_message(
&state,
&make_intent("feat-auth", &["src/a.rs"], "wire AuthClient", 600),
);
let inner = state.read();
let record = inner.agents.get("feat-auth").expect("record exists");
assert_eq!(record.status, "intent");
}
#[test]
fn question_does_not_create_sender_roster_row() {
let state = fresh_state();
publish_message(&state, &make_question("feat-x", "Should I rebase?"));
let inner = state.read();
assert!(
!inner.agents.contains_key("feat-x"),
"a question must not create a roster row for its sender",
);
}
#[test]
fn question_leaves_existing_sender_row_unchanged() {
let state = fresh_state();
publish_message(&state, &make_status("feat-x", "working"));
publish_message(&state, &make_question("feat-x", "Should I rebase?"));
let inner = state.read();
let record = inner
.agents
.get("feat-x")
.expect("sender record exists from its status publish");
assert_eq!(
record.status, "working",
"a question must not flip an existing row to status \"question\"",
);
}
#[test]
fn question_vs_blocked_inbox_creation_differs() {
let state = fresh_state();
publish_message(
&state,
&make_blocked("feat-x", "needs types", "feat-missing"),
);
{
let inner = state.read();
assert!(
!inner.queues.contains_key("feat-missing"),
"Blocked must not create the target inbox when it is missing"
);
}
publish_message(&state, &make_question("feat-x", "anything?"));
let inner = state.read();
assert!(
inner.queues.contains_key("supervisor"),
"Question must create the supervisor inbox when it is missing"
);
let (msgs, _) = poll_messages(&state, "supervisor", 0);
assert_eq!(msgs.len(), 1);
assert_eq!(msgs[0].status_label(), "question");
}
#[test]
fn poll_since_zero_returns_all() {
let state = fresh_state();
publish_message(&state, &make_status("a", "working")); publish_message(&state, &make_status("b", "working")); publish_message(&state, &make_artifact("b", "done", &[])); publish_message(&state, &make_artifact("b", "done", &[])); publish_message(&state, &make_artifact("b", "done", &[]));
let (msgs, last_seq) = poll_messages(&state, "a", 0);
assert_eq!(msgs.len(), 3);
assert_eq!(last_seq, 5);
}
#[test]
fn poll_since_filters_older_messages() {
let state = fresh_state();
publish_message(&state, &make_status("a", "working")); publish_message(&state, &make_status("b", "working")); for _ in 0..5 {
publish_message(&state, &make_artifact("b", "done", &[]));
}
let (msgs, last_seq) = poll_messages(&state, "a", 5);
assert_eq!(msgs.len(), 2); assert_eq!(last_seq, 7);
}
#[test]
fn poll_since_latest_returns_empty() {
let state = fresh_state();
publish_message(&state, &make_status("a", "working"));
publish_message(&state, &make_status("b", "working"));
publish_message(&state, &make_artifact("b", "done", &[]));
let (_, first_seq) = poll_messages(&state, "a", 0);
let (msgs, last_seq) = poll_messages(&state, "a", first_seq);
assert!(msgs.is_empty());
assert_eq!(last_seq, 0);
}
#[test]
fn poll_is_nondestructive() {
let state = fresh_state();
publish_message(&state, &make_status("a", "working"));
publish_message(&state, &make_status("b", "working"));
publish_message(&state, &make_artifact("b", "done", &[]));
let (msgs1, seq1) = poll_messages(&state, "a", 0);
let (msgs2, seq2) = poll_messages(&state, "a", 0);
assert_eq!(msgs1.len(), msgs2.len());
assert_eq!(seq1, seq2);
}
#[test]
fn poll_unknown_agent_returns_empty() {
let state = fresh_state();
let (msgs, last_seq) = poll_messages(&state, "feat-unknown", 0);
assert!(msgs.is_empty());
assert_eq!(last_seq, 0);
}
#[test]
fn snapshot_contains_all_registered_agents() {
let state = fresh_state();
publish_message(&state, &make_status("a", "working"));
publish_message(&state, &make_status("b", "idle"));
publish_message(&state, &make_status("c", "done"));
let snap = agent_status_snapshot(&state);
assert_eq!(snap.len(), 3);
}
#[test]
fn snapshot_reflects_latest_status() {
let state = fresh_state();
publish_message(&state, &make_status("feat-errors", "working"));
publish_message(&state, &make_artifact("feat-errors", "done", &[]));
let snap = agent_status_snapshot(&state);
let entry = snap.iter().find(|e| e.agent_id == "feat-errors").unwrap();
assert_eq!(entry.status, "done");
}
#[test]
fn snapshot_empty_on_fresh_state() {
let state = fresh_state();
let snap = agent_status_snapshot(&state);
assert!(snap.is_empty());
}
#[test]
fn snapshot_carries_phase_from_most_recent_status_message() {
let state = fresh_state();
let msg = BrokerMessage::Status {
agent_id: "supervisor".to_string(),
payload: StatusPayload {
status: "working".to_string(),
modified_files: vec![],
message: None,
cli: Some("claude".to_string()),
phase: Some("merging".to_string()),
detail: None,
},
};
publish_message(&state, &msg);
let snap = agent_status_snapshot(&state);
let entry = snap.iter().find(|e| e.agent_id == "supervisor").unwrap();
assert_eq!(entry.phase.as_deref(), Some("merging"));
assert_eq!(entry.cli, "claude");
}
#[test]
fn snapshot_phase_is_none_when_last_message_is_not_status() {
let state = fresh_state();
publish_message(&state, &make_status("supervisor", "working"));
publish_message(
&state,
&make_feedback("feat-x", "supervisor", &["bad test"]),
);
let snap = agent_status_snapshot(&state);
let entry = snap.iter().find(|e| e.agent_id == "supervisor").unwrap();
assert_eq!(
entry.phase, None,
"Feedback as last_message must not carry over a phase"
);
}
#[test]
fn supervisor_cli_lands_in_agent_clis_via_status_payload() {
let state = fresh_state();
let msg = BrokerMessage::Status {
agent_id: "supervisor".to_string(),
payload: StatusPayload {
status: "working".to_string(),
modified_files: vec![],
message: None,
cli: Some("claude".to_string()),
phase: Some("baseline".to_string()),
detail: None,
},
};
publish_message(&state, &msg);
let inner = state.read();
assert_eq!(
inner.agent_clis.get("supervisor").map(String::as_str),
Some("claude"),
"supervisor's cli must be upserted into agent_clis from the status payload",
);
}
#[test]
fn coding_agent_status_cli_appears_in_snapshot() {
let state = fresh_state();
let msg = BrokerMessage::Status {
agent_id: "feat-roster".to_string(),
payload: StatusPayload {
status: "working".to_string(),
modified_files: vec![],
message: None,
cli: Some("claude-oss".to_string()),
phase: None,
detail: None,
},
};
publish_message(&state, &msg);
let snap = agent_status_snapshot(&state);
let entry = snap.iter().find(|e| e.agent_id == "feat-roster").unwrap();
assert_eq!(
entry.cli, "claude-oss",
"a coding agent's cli must populate its snapshot row",
);
}
#[test]
fn seeded_cli_appears_only_after_the_pane_publishes() {
let state = Arc::new(BrokerState::new(None).with_seeded_cli("supervisor", "claude-oss"));
let snap = agent_status_snapshot(&state);
assert!(
snap.iter().all(|e| e.agent_id != "supervisor"),
"a seeded-but-unpublished pane must not show a phantom row",
);
publish_message(&state, &make_status("supervisor", "working"));
let snap = agent_status_snapshot(&state);
let entry = snap
.iter()
.find(|e| e.agent_id == "supervisor")
.expect("supervisor row appears once it publishes");
assert_eq!(
entry.cli, "claude-oss",
"the published row carries the authoritatively-seeded cli",
);
}
#[test]
fn seeded_cli_wins_over_a_wrong_self_report() {
let state = Arc::new(BrokerState::new(None).with_seeded_cli("supervisor", "claude-oss"));
let msg = BrokerMessage::Status {
agent_id: "supervisor".to_string(),
payload: StatusPayload {
status: "working".to_string(),
modified_files: vec![],
message: None,
cli: Some("claude".to_string()),
phase: None,
detail: None,
},
};
publish_message(&state, &msg);
let snap = agent_status_snapshot(&state);
let entry = snap.iter().find(|e| e.agent_id == "supervisor").unwrap();
assert_eq!(
entry.cli, "claude-oss",
"the authoritative seed must win over a wrong self-reported cli",
);
}
#[test]
fn with_seeded_cli_ignores_blank_value() {
let state = Arc::new(BrokerState::new(None).with_seeded_cli("supervisor", ""));
let snap = agent_status_snapshot(&state);
assert!(
snap.iter().all(|e| e.agent_id != "supervisor"),
"a blank seed must not create a supervisor row",
);
}
#[test]
fn snapshot_resolves_cli_from_seeded_map_when_status_omits_it() {
let state = fresh_state();
{
let mut inner = state.write();
inner
.agent_clis
.insert("feat-seeded".to_string(), "claude-oss".to_string());
}
publish_message(&state, &make_status("feat-seeded", "working"));
let snap = agent_status_snapshot(&state);
let entry = snap.iter().find(|e| e.agent_id == "feat-seeded").unwrap();
assert_eq!(
entry.cli, "claude-oss",
"snapshot must fall back to the seeded cli when status omits it",
);
}
#[test]
fn flush_writes_messages_to_disk() {
let tmp = tempfile::tempdir().unwrap();
let log_path = tmp.path().join("broker.log");
let state = Arc::new(BrokerState::new(Some(log_path.clone())));
publish_message(&state, &make_status("a", "working"));
publish_message(&state, &make_status("b", "working"));
publish_message(&state, &make_artifact("a", "done", &[]));
let mut last_flushed = 0u64;
flush_entries(&state, &log_path, &mut last_flushed);
let content = std::fs::read_to_string(&log_path).unwrap();
let lines: Vec<&str> = content.lines().collect();
assert_eq!(lines.len(), 3);
assert!(lines[0].starts_with("[1]"));
assert!(lines[2].starts_with("[3]"));
assert_eq!(last_flushed, 3);
}
/// Flushes twice with the same cursor and checks that the second flush only
/// appends the two messages published in between (5 lines total, cursor at 5).
#[test]
fn flush_only_writes_new_entries() {
    let scratch = tempfile::tempdir().unwrap();
    let log_file = scratch.path().join("broker.log");
    let broker = Arc::new(BrokerState::new(Some(log_file.clone())));
    let mut cursor = 0u64;

    // First batch: three messages, one flush.
    publish_message(&broker, &make_status("a", "working"));
    publish_message(&broker, &make_status("b", "working"));
    publish_message(&broker, &make_artifact("a", "done", &[]));
    flush_entries(&broker, &log_file, &mut cursor);
    assert_eq!(cursor, 3);

    // Second batch: two more messages, flushed with the advanced cursor.
    publish_message(&broker, &make_artifact("b", "done", &[]));
    publish_message(&broker, &make_artifact("a", "done", &[]));
    flush_entries(&broker, &log_file, &mut cursor);
    assert_eq!(cursor, 5);

    let line_count = std::fs::read_to_string(&log_file).unwrap().lines().count();
    assert_eq!(line_count, 5);
}
/// Dropping the broker handle must leave both published messages in the log file.
///
/// The listen port is offset by the process id (presumably to reduce collisions
/// between concurrently running test processes). If the broker still cannot
/// start, the test is skipped on a best-effort basis, but the skip is reported
/// on stderr instead of passing silently.
#[test]
fn final_flush_on_handle_drop() {
    let tmp = tempfile::tempdir().unwrap();
    let log_path = tmp.path().join("broker.log");
    let config = BrokerConfig {
        enabled: true,
        // Only the low bits of the pid matter here; truncation is intentional.
        #[allow(clippy::cast_possible_truncation)]
        port: 19_300 + (std::process::id() as u16 % 100),
        bind: "127.0.0.1".to_string(),
        ..Default::default()
    };
    let Ok(handle) = start_broker(
        &config,
        BrokerState::new(Some(log_path.clone())),
        Vec::new(),
    ) else {
        // Keep the best-effort skip, but make it visible in the test output.
        eprintln!("final_flush_on_handle_drop: broker failed to start; assertions skipped");
        return;
    };

    publish_message(&handle.state, &make_status("a", "working"));
    publish_message(&handle.state, &make_artifact("a", "done", &[]));
    // The drop is the action under test: the log is read only after it.
    drop(handle);

    let content = std::fs::read_to_string(&log_path).unwrap();
    assert_eq!(content.lines().count(), 2);
}
/// A broker started without a log path must not carry a flush thread, and
/// publishing must still update the in-memory roster.
///
/// If the broker cannot start (for example the pid-derived port is taken), the
/// test is skipped on a best-effort basis, but the skip is reported on stderr
/// instead of passing silently.
#[test]
fn no_flush_thread_when_no_log_path() {
    let config = BrokerConfig {
        enabled: true,
        // Only the low bits of the pid matter here; truncation is intentional.
        #[allow(clippy::cast_possible_truncation)]
        port: 19_400 + (std::process::id() as u16 % 100),
        bind: "127.0.0.1".to_string(),
        ..Default::default()
    };
    let Ok(handle) = start_broker(&config, BrokerState::new(None), Vec::new()) else {
        // Keep the best-effort skip, but make it visible in the test output.
        eprintln!("no_flush_thread_when_no_log_path: broker failed to start; assertions skipped");
        return;
    };

    assert!(handle.flush_thread.is_none());
    publish_message(&handle.state, &make_status("a", "working"));
    let inner = handle.state.read();
    assert!(inner.agents.contains_key("a"));
}
/// Flushing to a path under a non-existent directory must leave the in-memory
/// message log and agent roster intact.
#[test]
fn disk_failure_does_not_affect_state() {
    let unwritable = std::path::PathBuf::from("/nonexistent/path/broker.log");
    let broker = Arc::new(BrokerState::new(Some(unwritable.clone())));

    publish_message(&broker, &make_status("a", "working"));
    publish_message(&broker, &make_artifact("a", "done", &[]));

    let mut cursor = 0u64;
    flush_entries(&broker, &unwritable, &mut cursor);

    let guard = broker.read();
    assert_eq!(guard.message_log.len(), 2);
    assert!(guard.agents.contains_key("a"));
}
/// With nothing published, `recent_messages` yields an empty collection.
#[test]
fn recent_messages_returns_empty_when_no_messages() {
    let broker = fresh_state();
    assert!(recent_messages(&broker, 10).is_empty());
}
/// After three publishes, `recent_messages` returns sequence numbers 3, 2, 1
/// in that order (newest first).
#[test]
fn recent_messages_returns_messages_in_reverse_order() {
    let broker = fresh_state();
    publish_message(&broker, &make_status("a", "working"));
    publish_message(&broker, &make_status("b", "working"));
    publish_message(&broker, &make_artifact("a", "done", &[]));

    let newest_first = recent_messages(&broker, 10);
    assert_eq!(newest_first.len(), 3);
    for (entry, expected_seq) in newest_first.iter().zip([3, 2, 1]) {
        assert_eq!(entry.0, expected_seq);
    }
}
/// With five messages published and a limit of 3, only the three newest
/// entries (sequence numbers 5, 4, 3) are returned.
#[test]
fn recent_messages_respects_limit() {
    let broker = fresh_state();
    for i in 0..5 {
        let agent = format!("agent-{i}");
        publish_message(&broker, &make_status(&agent, "working"));
    }

    let newest_first = recent_messages(&broker, 3);
    assert_eq!(newest_first.len(), 3);
    for (entry, expected_seq) in newest_first.iter().zip([5, 4, 3]) {
        assert_eq!(entry.0, expected_seq);
    }
}
/// Publishes one message of each of six kinds and checks that every kind shows
/// up in the `recent_messages` result.
#[test]
fn recent_messages_includes_all_types() {
    let broker = fresh_state();
    publish_message(&broker, &make_status("a", "working"));
    publish_message(&broker, &make_artifact("b", "done", &[]));
    publish_message(&broker, &make_blocked("c", "types", "b"));
    publish_message(&broker, &make_verified("d", "supervisor", None));
    publish_message(&broker, &make_feedback("e", "supervisor", &["error"]));
    publish_message(&broker, &make_question("f", "question?"));

    let recent = recent_messages(&broker, 10);
    assert_eq!(recent.len(), 6);

    // One presence check per variant, asserted as soon as it is computed.
    assert!(
        recent
            .iter()
            .any(|(_, _, msg)| matches!(msg, BrokerMessage::Status { .. })),
        "Should contain Status message"
    );
    assert!(
        recent
            .iter()
            .any(|(_, _, msg)| matches!(msg, BrokerMessage::Artifact { .. })),
        "Should contain Artifact message"
    );
    assert!(
        recent
            .iter()
            .any(|(_, _, msg)| matches!(msg, BrokerMessage::Blocked { .. })),
        "Should contain Blocked message"
    );
    assert!(
        recent
            .iter()
            .any(|(_, _, msg)| matches!(msg, BrokerMessage::Verified { .. })),
        "Should contain Verified message"
    );
    assert!(
        recent
            .iter()
            .any(|(_, _, msg)| matches!(msg, BrokerMessage::Feedback { .. })),
        "Should contain Feedback message"
    );
    assert!(
        recent
            .iter()
            .any(|(_, _, msg)| matches!(msg, BrokerMessage::Question { .. })),
        "Should contain Question message"
    );
}
/// The first entry of the message log carries sequence number 1.
#[test]
fn first_message_gets_sequence_one() {
    let broker = fresh_state();
    publish_message(&broker, &make_status("a", "working"));
    publish_message(&broker, &make_status("b", "working"));
    publish_message(&broker, &make_artifact("a", "done", &[]));

    let first_seq = broker.read().message_log[0].0;
    assert_eq!(first_seq, 1);
}
/// After both agents publish a status and then an artifact (`a` before `b`),
/// the first entry in `b`'s inbox must have a lower sequence number than the
/// first entry in `a`'s inbox.
#[test]
fn sequence_numbers_are_globally_monotonic() {
    let broker = fresh_state();
    publish_message(&broker, &make_status("a", "working"));
    publish_message(&broker, &make_status("b", "working"));
    publish_message(&broker, &make_artifact("a", "done", &[]));
    publish_message(&broker, &make_artifact("b", "done", &[]));

    // Copy both sequence numbers out while the read guard is held.
    let (first_in_b, first_in_a) = {
        let guard = broker.read();
        let b_seq = guard.queues["b"][0].0;
        let a_seq = guard.queues["a"][0].0;
        (b_seq, a_seq)
    };
    assert!(first_in_b < first_in_a);
}
/// A `done` artifact followed by a `working` status leaves the agent at `done`.
#[test]
fn terminal_state_not_overwritten_by_non_terminal() {
    let agent = "feat-errors";
    let broker = fresh_state();

    publish_message(&broker, &make_artifact(agent, "done", &[]));
    assert_eq!(broker.read().agents[agent].status, "done");

    publish_message(&broker, &make_status(agent, "working"));
    assert_eq!(broker.read().agents[agent].status, "done");
}
/// Same scenario as the non-`_simple` variant, run against agent `feat-simple`:
/// `done` must survive a later `working` status.
#[test]
fn terminal_state_not_overwritten_by_non_terminal_simple() {
    let agent = "feat-simple";
    let broker = fresh_state();

    publish_message(&broker, &make_artifact(agent, "done", &[]));
    assert_eq!(broker.read().agents[agent].status, "done");

    publish_message(&broker, &make_status(agent, "working"));
    assert_eq!(broker.read().agents[agent].status, "done");
}
/// A `verified` artifact published after a `done` artifact replaces the status.
#[test]
fn terminal_state_can_be_overwritten_by_other_terminal() {
    let broker = fresh_state();
    for terminal in ["done", "verified"] {
        publish_message(&broker, &make_artifact("feat-errors", terminal, &[]));
    }
    assert_eq!(broker.read().agents["feat-errors"].status, "verified");
}
/// A `done` artifact published after a `working` status moves the agent to `done`.
#[test]
fn non_terminal_state_can_be_overwritten_by_terminal() {
    let agent = "feat-errors";
    let broker = fresh_state();
    publish_message(&broker, &make_status(agent, "working"));
    publish_message(&broker, &make_artifact(agent, "done", &[]));
    assert_eq!(broker.read().agents[agent].status, "done");
}
/// For each of `done`, `verified` and `blocked`: publishing that artifact status
/// and then a `working` status must leave the agent at the terminal status.
#[test]
fn all_terminal_states_are_protected() {
    const PROTECTED: [&str; 3] = ["done", "verified", "blocked"];
    for terminal_state in PROTECTED.iter().copied() {
        // Each case gets its own broker and its own agent id.
        let broker = fresh_state();
        let agent_id = format!("feat-{terminal_state}");
        publish_message(&broker, &make_artifact(&agent_id, terminal_state, &[]));
        publish_message(&broker, &make_status(&agent_id, "working"));

        let guard = broker.read();
        assert_eq!(
            guard.agents[&agent_id].status, terminal_state,
            "Terminal state {terminal_state} should be protected from non-terminal overwrite"
        );
    }
}
/// A `done` artifact that carries an export name is still protected from a
/// later `working` status.
#[test]
fn terminal_status_protection_with_artifact_messages() {
    let broker = fresh_state();
    let done_artifact = make_artifact("feat-config", "done", &["ConfigType"]);
    publish_message(&broker, &done_artifact);
    publish_message(&broker, &make_status("feat-config", "working"));
    assert_eq!(broker.read().agents["feat-config"].status, "done");
}
/// A `blocked` artifact status is not replaced by a later `idle` status.
#[test]
fn terminal_status_protection_with_blocked_messages() {
    let agent = "feat-ui";
    let broker = fresh_state();
    publish_message(&broker, &make_artifact(agent, "blocked", &[]));
    publish_message(&broker, &make_status(agent, "idle"));
    assert_eq!(broker.read().agents[agent].status, "blocked");
}
/// Publishing a `committed` artifact sets the record's status to `committed`
/// and fills in `last_committed_at`.
#[test]
fn committed_artifact_stamps_last_committed_at() {
    let broker = fresh_state();
    publish_message(&broker, &make_artifact("feat-x", "committed", &[]));

    let guard = broker.read();
    let record = guard.agents.get("feat-x").expect("record exists");
    assert_eq!(record.status, "committed");
    assert!(
        record.last_committed_at.is_some(),
        "committed artifact must stamp last_committed_at"
    );
}
/// On a fresh state, a `working` status published right after a `committed`
/// artifact moves the agent back to `working`.
#[test]
fn committed_reenters_working_within_ttl() {
    let agent = "feat-x";
    let broker = fresh_state();

    publish_message(&broker, &make_artifact(agent, "committed", &[]));
    assert_eq!(broker.read().agents[agent].status, "committed");

    publish_message(&broker, &make_status(agent, "working"));
    assert_eq!(
        broker.read().agents[agent].status,
        "working",
        "a working status within the TTL must re-enter the working state"
    );
}
/// With the republish-working TTL set to zero, `committed` is not replaced by a
/// later `working` status.
#[test]
fn committed_stays_terminal_when_ttl_zero() {
    let agent = "feat-x";
    let broker = fresh_state();
    broker.set_republish_working_ttl(Duration::ZERO);

    publish_message(&broker, &make_artifact(agent, "committed", &[]));
    publish_message(&broker, &make_status(agent, "working"));

    assert_eq!(
        broker.read().agents[agent].status,
        "committed",
        "with TTL=0, committed must stay terminal (v0.5.0 model)"
    );
}
/// With a 5-second TTL and `last_committed_at` backdated by 90 seconds, a
/// `working` status must not move the agent out of `committed`.
#[test]
fn committed_does_not_reenter_after_ttl_window() {
    let agent = "feat-x";
    let broker = fresh_state();
    broker.set_republish_working_ttl(Duration::from_secs(5));
    publish_message(&broker, &make_artifact(agent, "committed", &[]));

    // Rewrite the commit stamp so that it lies well outside the TTL window.
    let stale_stamp = std::time::Instant::now()
        .checked_sub(Duration::from_secs(90))
        .unwrap();
    broker
        .write()
        .agents
        .get_mut(agent)
        .unwrap()
        .last_committed_at = Some(stale_stamp);

    publish_message(&broker, &make_status(agent, "working"));
    assert_eq!(
        broker.read().agents[agent].status,
        "committed",
        "a working status past the TTL window must not re-enter working"
    );
}
/// Publishing a question creates the `supervisor` queue if it did not exist and
/// delivers the original question to it.
#[test]
fn question_creates_supervisor_inbox_when_absent() {
    let broker = fresh_state();
    publish_message(&broker, &make_status("feat-x", "working"));

    // Precondition: a plain status does not create a supervisor queue.
    assert!(
        !broker.read().queues.contains_key("supervisor"),
        "supervisor inbox must be absent before publishing the question"
    );

    publish_message(&broker, &make_question("feat-x", "How should I proceed?"));

    let guard = broker.read();
    assert!(
        guard.queues.contains_key("supervisor"),
        "publishing an agent.question must create the supervisor inbox; got queues: {:?}",
        guard.queues.keys().collect::<Vec<_>>()
    );
    // Release the read guard before polling.
    drop(guard);

    let (inbox, cursor) = poll_messages(&broker, "supervisor", 0);
    assert_eq!(
        inbox.len(),
        1,
        "supervisor inbox should contain the published question"
    );

    let is_original_question = match &inbox[0] {
        BrokerMessage::Question { agent_id, payload } => {
            agent_id == "feat-x" && payload.question == "How should I proceed?"
        }
        _ => false,
    };
    assert!(
        is_original_question,
        "supervisor inbox should hold the original question; got: {:?}",
        inbox[0]
    );
    assert!(
        cursor > 0,
        "poll_messages should return a non-zero cursor"
    );
}
/// Builds a shared broker state with no log path and the verify-on-commit
/// nudge switched on.
fn nudge_state() -> Arc<BrokerState> {
    let with_nudge = BrokerState::new(None).with_verify_on_commit_nudge(true);
    Arc::new(with_nudge)
}
/// Polls the `supervisor` inbox from cursor 0 and returns the `branch_id` of
/// every `VerifyNow` message found, in inbox order.
fn supervisor_verify_now_branches(state: &Arc<BrokerState>) -> Vec<String> {
    let (inbox, _) = poll_messages(state, "supervisor", 0);
    let mut branches = Vec::new();
    for message in inbox {
        // Every other message kind is ignored.
        if let BrokerMessage::VerifyNow { branch_id } = message {
            branches.push(branch_id);
        }
    }
    branches
}
/// With the nudge enabled, a `committed` artifact yields exactly one
/// `VerifyNow` for the committing branch in the supervisor inbox.
#[test]
fn committed_artifact_publishes_verify_now_to_supervisor() {
    let broker = nudge_state();
    publish_message(&broker, &make_artifact("feat-foo", "committed", &[]));

    let nudged = supervisor_verify_now_branches(&broker);
    assert_eq!(
        nudged,
        vec!["feat-foo".to_string()],
        "a committed artifact must publish exactly one verify-now nudge carrying the branch"
    );
}
/// A state configured from `SupervisorConfig::default()` publishes the
/// verify-now nudge on a `committed` artifact.
#[test]
fn default_config_enables_the_nudge() {
    let default_cfg = crate::config::SupervisorConfig::default();
    let nudge_on = default_cfg.verify_on_commit_nudge_enabled();
    let broker = Arc::new(BrokerState::new(None).with_verify_on_commit_nudge(nudge_on));

    publish_message(&broker, &make_artifact("feat-foo", "committed", &[]));

    let nudged = supervisor_verify_now_branches(&broker);
    assert_eq!(
        nudged,
        vec!["feat-foo".to_string()],
        "the default config must enable the verify-now nudge"
    );
}
/// The branch id in the nudge is the committing agent id unchanged, including
/// a `/` separator.
#[test]
fn verify_now_carries_committing_branch_verbatim() {
    let broker = nudge_state();
    publish_message(&broker, &make_artifact("feat/foo", "committed", &[]));

    let nudged = supervisor_verify_now_branches(&broker);
    assert_eq!(nudged, vec!["feat/foo".to_string()]);
}
/// On a `fresh_state()` broker, a `committed` artifact produces no `VerifyNow`
/// message for the supervisor.
#[test]
fn committed_artifact_suppresses_nudge_when_disabled() {
    let broker = fresh_state();
    publish_message(&broker, &make_artifact("feat-foo", "committed", &[]));

    let nudged = supervisor_verify_now_branches(&broker);
    assert!(
        nudged.is_empty(),
        "no verify-now nudge may be published when the feature is disabled"
    );
}
/// Even with the nudge enabled, a `done` artifact produces no `VerifyNow`.
#[test]
fn done_artifact_does_not_trigger_nudge() {
    let broker = nudge_state();
    publish_message(&broker, &make_artifact("feat-foo", "done", &[]));

    let nudged = supervisor_verify_now_branches(&broker);
    assert!(
        nudged.is_empty(),
        "only `committed` artifacts trigger the verify-now nudge, not `done`"
    );
}
/// After the nudge fires, the committing agent's record still shows status
/// `committed` and still holds the artifact as its last message.
#[test]
fn nudge_does_not_overwrite_committing_agent_record() {
    let broker = nudge_state();
    publish_message(&broker, &make_artifact("feat-foo", "committed", &["api_fn"]));

    let guard = broker.read();
    let committer = guard.agents.get("feat-foo").expect("committing record exists");
    assert_eq!(committer.status, "committed", "status must remain committed");

    let last_is_artifact = matches!(committer.last_message, Some(BrokerMessage::Artifact { .. }));
    assert!(
        last_is_artifact,
        "last_message must remain the committing artifact, not the nudge"
    );
}
/// The message log holds the artifact first and the `VerifyNow` nudge second,
/// with the nudge carrying the higher sequence number.
#[test]
fn nudge_appears_in_message_log_after_artifact() {
    let broker = nudge_state();
    publish_message(&broker, &make_artifact("feat-foo", "committed", &[]));

    let guard = broker.read();
    let log = &guard.message_log;
    assert_eq!(log.len(), 2, "artifact + nudge");

    let (artifact_entry, nudge_entry) = (&log[0], &log[1]);
    assert!(matches!(artifact_entry.2, BrokerMessage::Artifact { .. }));
    assert!(matches!(nudge_entry.2, BrokerMessage::VerifyNow { .. }));
    assert!(
        artifact_entry.0 < nudge_entry.0,
        "nudge sequence number must follow the artifact's"
    );
}
}