use std::fs::OpenOptions;
use std::io::Write;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::{Duration, SystemTime};
use super::messages::BrokerMessage;
use super::{AgentRecord, AgentStatusEntry, BrokerState, BrokerStateInner};
fn update_agent_record(inner: &mut BrokerStateInner, msg: &BrokerMessage) {
let agent_id = msg.agent_id().to_string();
let status = msg.status_label().to_string();
let record = inner
.agents
.entry(agent_id.clone())
.or_insert_with(|| AgentRecord {
agent_id: agent_id.clone(),
status: String::new(),
last_seen: std::time::Instant::now(),
last_message: None,
});
record.status = status;
record.last_seen = std::time::Instant::now();
record.last_message = Some(msg.clone());
inner.queues.entry(agent_id).or_default();
}
pub fn publish_message(state: &Arc<BrokerState>, msg: &BrokerMessage) {
let seq = state.next_seq();
let mut inner = state.write();
update_agent_record(&mut inner, msg);
inner
.message_log
.push((seq, SystemTime::now(), msg.clone()));
match msg {
BrokerMessage::Status { .. } => {
}
BrokerMessage::Artifact { agent_id, .. } => {
let targets: Vec<String> = inner
.queues
.keys()
.filter(|id| id.as_str() != agent_id)
.cloned()
.collect();
for target in targets {
if let Some(inbox) = inner.queues.get_mut(&target) {
inbox.push((seq, msg.clone()));
}
}
}
BrokerMessage::Blocked { payload, .. } => {
if let Some(inbox) = inner.queues.get_mut(&payload.from) {
inbox.push((seq, msg.clone()));
}
}
}
}
pub fn poll_messages(
state: &Arc<BrokerState>,
agent_id: &str,
since: u64,
) -> (Vec<BrokerMessage>, u64) {
let inner = state.read();
let Some(inbox) = inner.queues.get(agent_id) else {
return (Vec::new(), 0);
};
let mut messages = Vec::new();
let mut last_seq: u64 = 0;
for (seq, msg) in inbox {
if *seq > since {
messages.push(msg.clone());
if *seq > last_seq {
last_seq = *seq;
}
}
}
(messages, last_seq)
}
pub fn agent_status_snapshot(state: &Arc<BrokerState>) -> Vec<AgentStatusEntry> {
let inner = state.read();
inner
.agents
.values()
.map(|r| AgentStatusEntry {
agent_id: r.agent_id.clone(),
cli: String::new(),
status: r.status.clone(),
last_seen_seconds: r.last_seen.elapsed().as_secs(),
summary: String::new(),
last_seen: r.last_seen,
})
.collect()
}
pub fn flush_loop(state: &Arc<BrokerState>, stop: &Arc<AtomicBool>) {
let log_path = match &state.log_path {
Some(p) => p.clone(),
None => return,
};
let mut last_flushed_seq: u64 = 0;
let flush_interval = Duration::from_secs(5);
let check_interval = Duration::from_millis(100);
loop {
let mut elapsed = Duration::ZERO;
while elapsed < flush_interval {
if stop.load(Ordering::Acquire) {
flush_entries(state, &log_path, &mut last_flushed_seq);
return;
}
std::thread::sleep(check_interval);
elapsed += check_interval;
}
flush_entries(state, &log_path, &mut last_flushed_seq);
}
}
fn flush_entries(state: &Arc<BrokerState>, log_path: &std::path::Path, last_flushed_seq: &mut u64) {
let entries: Vec<(u64, SystemTime, BrokerMessage)> = {
let inner = state.read();
inner
.message_log
.iter()
.filter(|(seq, _, _)| *seq > *last_flushed_seq)
.cloned()
.collect()
};
if entries.is_empty() {
return;
}
let Ok(mut file) = OpenOptions::new().create(true).append(true).open(log_path) else {
return; };
for (seq, timestamp, msg) in &entries {
let ts = timestamp
.duration_since(SystemTime::UNIX_EPOCH)
.map_or_else(|_| "0".to_string(), |d| d.as_secs().to_string());
let line = format!("[{seq}] {ts} [{}] {msg}\n", msg.agent_id());
let _ = file.write_all(line.as_bytes());
}
if let Some((max_seq, _, _)) = entries.last() {
*last_flushed_seq = *max_seq;
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::broker::messages::{ArtifactPayload, BlockedPayload, StatusPayload};
use crate::broker::start_broker;
use crate::config::BrokerConfig;
fn make_status(agent_id: &str, status: &str) -> BrokerMessage {
BrokerMessage::Status {
agent_id: agent_id.to_string(),
payload: StatusPayload {
status: status.to_string(),
modified_files: vec![],
message: None,
},
}
}
fn make_artifact(agent_id: &str, status: &str, exports: &[&str]) -> BrokerMessage {
BrokerMessage::Artifact {
agent_id: agent_id.to_string(),
payload: ArtifactPayload {
status: status.to_string(),
exports: exports.iter().map(|s| (*s).to_string()).collect(),
modified_files: vec!["src/main.rs".to_string()],
},
}
}
fn make_blocked(agent_id: &str, needs: &str, from: &str) -> BrokerMessage {
BrokerMessage::Blocked {
agent_id: agent_id.to_string(),
payload: BlockedPayload {
needs: needs.to_string(),
from: from.to_string(),
},
}
}
fn fresh_state() -> Arc<BrokerState> {
Arc::new(BrokerState::new(None))
}
#[test]
fn message_log_accumulates_three_entries() {
let state = fresh_state();
publish_message(&state, &make_status("a", "working"));
publish_message(&state, &make_artifact("b", "done", &[]));
publish_message(&state, &make_blocked("c", "reason", "a"));
let inner = state.read();
assert_eq!(inner.message_log.len(), 3);
assert_eq!(inner.message_log[0].0, 1);
assert_eq!(inner.message_log[1].0, 2);
assert_eq!(inner.message_log[2].0, 3);
}
#[test]
fn message_log_includes_all_types() {
let state = fresh_state();
publish_message(&state, &make_status("a", "working"));
publish_message(&state, &make_artifact("a", "done", &[]));
publish_message(&state, &make_blocked("b", "reason", "a"));
let inner = state.read();
assert_eq!(inner.message_log.len(), 3);
}
#[test]
fn inbox_stores_correct_sequence_number() {
let state = fresh_state();
publish_message(&state, &make_status("a", "working")); publish_message(&state, &make_status("b", "working")); publish_message(&state, &make_artifact("a", "done", &[]));
let inner = state.read();
let b_inbox = &inner.queues["b"];
assert_eq!(b_inbox.len(), 1);
assert_eq!(b_inbox[0].0, 3);
}
#[test]
fn first_publish_creates_record_and_inbox() {
let state = fresh_state();
publish_message(&state, &make_status("feat-errors", "working"));
let inner = state.read();
assert!(inner.agents.contains_key("feat-errors"));
assert_eq!(inner.agents["feat-errors"].status, "working");
assert!(inner.queues.contains_key("feat-errors"));
}
#[test]
fn status_not_routed_to_any_inbox() {
let state = fresh_state();
publish_message(&state, &make_status("feat-errors", "working"));
publish_message(&state, &make_status("feat-detect", "working"));
publish_message(&state, &make_status("feat-errors", "idle"));
let (detect_msgs, _) = poll_messages(&state, "feat-detect", 0);
let (errors_msgs, _) = poll_messages(&state, "feat-errors", 0);
assert!(detect_msgs.is_empty());
assert!(errors_msgs.is_empty());
}
#[test]
fn artifact_broadcast_to_all_peers() {
let state = fresh_state();
publish_message(&state, &make_status("feat-errors", "working"));
publish_message(&state, &make_status("feat-detect", "working"));
publish_message(&state, &make_status("feat-config", "working"));
publish_message(&state, &make_artifact("feat-errors", "done", &[]));
let (detect_msgs, _) = poll_messages(&state, "feat-detect", 0);
let (config_msgs, _) = poll_messages(&state, "feat-config", 0);
assert_eq!(detect_msgs.len(), 1);
assert_eq!(config_msgs.len(), 1);
}
#[test]
fn artifact_broadcast_skips_sender() {
let state = fresh_state();
publish_message(&state, &make_status("feat-errors", "working"));
publish_message(&state, &make_status("feat-detect", "working"));
publish_message(&state, &make_artifact("feat-errors", "done", &[]));
let (errors_msgs, _) = poll_messages(&state, "feat-errors", 0);
assert!(errors_msgs.is_empty());
}
#[test]
fn artifact_broadcast_skips_unregistered_agents() {
let state = fresh_state();
publish_message(&state, &make_status("feat-errors", "working"));
publish_message(&state, &make_artifact("feat-errors", "done", &[]));
let inner = state.read();
assert!(!inner.queues.contains_key("feat-detect"));
}
#[test]
fn blocked_delivered_to_target() {
let state = fresh_state();
publish_message(&state, &make_status("feat-config", "working"));
publish_message(&state, &make_status("feat-errors", "working"));
publish_message(
&state,
&make_blocked("feat-config", "error types", "feat-errors"),
);
let (errors_msgs, _) = poll_messages(&state, "feat-errors", 0);
assert_eq!(errors_msgs.len(), 1);
assert_eq!(errors_msgs[0].agent_id(), "feat-config");
}
#[test]
fn blocked_not_delivered_to_other_agents() {
let state = fresh_state();
publish_message(&state, &make_status("feat-config", "working"));
publish_message(&state, &make_status("feat-errors", "working"));
publish_message(&state, &make_status("feat-detect", "working"));
publish_message(
&state,
&make_blocked("feat-config", "error types", "feat-errors"),
);
let (detect_msgs, _) = poll_messages(&state, "feat-detect", 0);
assert!(detect_msgs.is_empty());
}
#[test]
fn blocked_to_unregistered_target_silently_dropped() {
let state = fresh_state();
publish_message(&state, &make_status("feat-config", "working"));
publish_message(
&state,
&make_blocked("feat-config", "error types", "feat-errors"),
);
let inner = state.read();
assert!(!inner.queues.contains_key("feat-errors"));
}
#[test]
fn poll_since_zero_returns_all() {
let state = fresh_state();
publish_message(&state, &make_status("a", "working")); publish_message(&state, &make_status("b", "working")); publish_message(&state, &make_artifact("b", "done", &[])); publish_message(&state, &make_artifact("b", "done", &[])); publish_message(&state, &make_artifact("b", "done", &[]));
let (msgs, last_seq) = poll_messages(&state, "a", 0);
assert_eq!(msgs.len(), 3);
assert_eq!(last_seq, 5);
}
#[test]
fn poll_since_filters_older_messages() {
let state = fresh_state();
publish_message(&state, &make_status("a", "working")); publish_message(&state, &make_status("b", "working")); for _ in 0..5 {
publish_message(&state, &make_artifact("b", "done", &[]));
}
let (msgs, last_seq) = poll_messages(&state, "a", 5);
assert_eq!(msgs.len(), 2); assert_eq!(last_seq, 7);
}
#[test]
fn poll_since_latest_returns_empty() {
let state = fresh_state();
publish_message(&state, &make_status("a", "working"));
publish_message(&state, &make_status("b", "working"));
publish_message(&state, &make_artifact("b", "done", &[]));
let (_, first_seq) = poll_messages(&state, "a", 0);
let (msgs, last_seq) = poll_messages(&state, "a", first_seq);
assert!(msgs.is_empty());
assert_eq!(last_seq, 0);
}
#[test]
fn poll_is_nondestructive() {
let state = fresh_state();
publish_message(&state, &make_status("a", "working"));
publish_message(&state, &make_status("b", "working"));
publish_message(&state, &make_artifact("b", "done", &[]));
let (msgs1, seq1) = poll_messages(&state, "a", 0);
let (msgs2, seq2) = poll_messages(&state, "a", 0);
assert_eq!(msgs1.len(), msgs2.len());
assert_eq!(seq1, seq2);
}
#[test]
fn poll_unknown_agent_returns_empty() {
let state = fresh_state();
let (msgs, last_seq) = poll_messages(&state, "feat-unknown", 0);
assert!(msgs.is_empty());
assert_eq!(last_seq, 0);
}
#[test]
fn snapshot_contains_all_registered_agents() {
let state = fresh_state();
publish_message(&state, &make_status("a", "working"));
publish_message(&state, &make_status("b", "idle"));
publish_message(&state, &make_status("c", "done"));
let snap = agent_status_snapshot(&state);
assert_eq!(snap.len(), 3);
}
#[test]
fn snapshot_reflects_latest_status() {
let state = fresh_state();
publish_message(&state, &make_status("feat-errors", "working"));
publish_message(&state, &make_artifact("feat-errors", "done", &[]));
let snap = agent_status_snapshot(&state);
let entry = snap.iter().find(|e| e.agent_id == "feat-errors").unwrap();
assert_eq!(entry.status, "done");
}
#[test]
fn snapshot_empty_on_fresh_state() {
let state = fresh_state();
let snap = agent_status_snapshot(&state);
assert!(snap.is_empty());
}
#[test]
fn flush_writes_messages_to_disk() {
let tmp = tempfile::tempdir().unwrap();
let log_path = tmp.path().join("broker.log");
let state = Arc::new(BrokerState::new(Some(log_path.clone())));
publish_message(&state, &make_status("a", "working"));
publish_message(&state, &make_status("b", "working"));
publish_message(&state, &make_artifact("a", "done", &[]));
let mut last_flushed = 0u64;
flush_entries(&state, &log_path, &mut last_flushed);
let content = std::fs::read_to_string(&log_path).unwrap();
let lines: Vec<&str> = content.lines().collect();
assert_eq!(lines.len(), 3);
assert!(lines[0].starts_with("[1]"));
assert!(lines[2].starts_with("[3]"));
assert_eq!(last_flushed, 3);
}
#[test]
fn flush_only_writes_new_entries() {
let tmp = tempfile::tempdir().unwrap();
let log_path = tmp.path().join("broker.log");
let state = Arc::new(BrokerState::new(Some(log_path.clone())));
publish_message(&state, &make_status("a", "working"));
publish_message(&state, &make_status("b", "working"));
publish_message(&state, &make_artifact("a", "done", &[]));
let mut last_flushed = 0u64;
flush_entries(&state, &log_path, &mut last_flushed);
assert_eq!(last_flushed, 3);
publish_message(&state, &make_artifact("b", "done", &[]));
publish_message(&state, &make_artifact("a", "done", &[]));
flush_entries(&state, &log_path, &mut last_flushed);
assert_eq!(last_flushed, 5);
let content = std::fs::read_to_string(&log_path).unwrap();
assert_eq!(content.lines().count(), 5);
}
#[test]
fn final_flush_on_handle_drop() {
let tmp = tempfile::tempdir().unwrap();
let log_path = tmp.path().join("broker.log");
let config = BrokerConfig {
enabled: true,
#[allow(clippy::cast_possible_truncation)]
port: 19_300 + (std::process::id() as u16 % 100),
bind: "127.0.0.1".to_string(),
};
let handle = start_broker(&config, BrokerState::new(Some(log_path.clone())));
if let Ok(handle) = handle {
publish_message(&handle.state, &make_status("a", "working"));
publish_message(&handle.state, &make_artifact("a", "done", &[]));
drop(handle);
let content = std::fs::read_to_string(&log_path).unwrap();
assert_eq!(content.lines().count(), 2);
}
}
#[test]
fn no_flush_thread_when_no_log_path() {
let config = BrokerConfig {
enabled: true,
#[allow(clippy::cast_possible_truncation)]
port: 19_400 + (std::process::id() as u16 % 100),
bind: "127.0.0.1".to_string(),
};
if let Ok(handle) = start_broker(&config, BrokerState::new(None)) {
assert!(handle.flush_thread.is_none());
publish_message(&handle.state, &make_status("a", "working"));
let inner = handle.state.read();
assert!(inner.agents.contains_key("a"));
}
}
#[test]
fn disk_failure_does_not_affect_state() {
let bad_path = std::path::PathBuf::from("/nonexistent/path/broker.log");
let state = Arc::new(BrokerState::new(Some(bad_path.clone())));
publish_message(&state, &make_status("a", "working"));
publish_message(&state, &make_artifact("a", "done", &[]));
let mut last_flushed = 0u64;
flush_entries(&state, &bad_path, &mut last_flushed);
let inner = state.read();
assert_eq!(inner.message_log.len(), 2);
assert!(inner.agents.contains_key("a"));
}
#[test]
fn first_message_gets_sequence_one() {
let state = fresh_state();
publish_message(&state, &make_status("a", "working"));
publish_message(&state, &make_status("b", "working"));
publish_message(&state, &make_artifact("a", "done", &[]));
let inner = state.read();
assert_eq!(inner.message_log[0].0, 1);
}
#[test]
fn sequence_numbers_are_globally_monotonic() {
let state = fresh_state();
publish_message(&state, &make_status("a", "working")); publish_message(&state, &make_status("b", "working")); publish_message(&state, &make_artifact("a", "done", &[])); publish_message(&state, &make_artifact("b", "done", &[]));
let inner = state.read();
let b_inbox_seq = inner.queues["b"][0].0; let a_inbox_seq = inner.queues["a"][0].0; assert!(b_inbox_seq < a_inbox_seq);
}
}