use std::sync::{Arc, OnceLock};
use axum::Router;
use axum::extract::{Path, Query, State};
use axum::http::{HeaderMap, StatusCode};
use axum::response::{IntoResponse, Response};
use axum::routing::{get, post};
use regex::Regex;
use serde::{Deserialize, Serialize};
use super::BrokerState;
use super::delivery;
use super::messages::BrokerMessage;
use super::{WatchTarget, watcher};
/// Returns the lazily compiled pattern that every accepted `agent_id` must match.
///
/// The pattern admits the literal `supervisor`, or a `feat/` / `feat-` prefix followed by
/// one `[a-z0-9]` character and at least one further character from `[a-z0-9-]`.
/// The regex is compiled on first use and then shared for the life of the process.
fn agent_id_regex() -> &'static Regex {
    const PATTERN: &str = r"^(supervisor|feat/[a-z0-9][a-z0-9-]+|feat-[a-z0-9][a-z0-9-]+)$";
    static COMPILED: OnceLock<Regex> = OnceLock::new();
    COMPILED.get_or_init(|| Regex::new(PATTERN).expect("AGENT_ID_RE compiles"))
}
/// Returns the lazily compiled pattern for values that start with `<` and end with `>`.
///
/// Such values are treated by the handlers as template placeholders that were never
/// filled in. The regex is compiled on first use and then shared.
fn placeholder_regex() -> &'static Regex {
    const PATTERN: &str = r"^<.*>$";
    static COMPILED: OnceLock<Regex> = OnceLock::new();
    COMPILED.get_or_init(|| {
        let compiled = Regex::new(PATTERN);
        compiled.expect("PLACEHOLDER_RE compiles")
    })
}
/// Builds the `400 Bad Request` JSON response sent when an `agent_id` fails validation.
///
/// The rejected `value` is echoed back in the body so the caller can see what was received.
fn agent_id_rejection(value: &str) -> Response {
    let body = serde_json::json!({
        "error": "invalid agent_id",
        "value": value,
        "detail": "agent_id must be 'supervisor' or match feat-{name} / feat/{name}",
    });
    (StatusCode::BAD_REQUEST, axum::Json(body)).into_response()
}
/// Builds the `400 Bad Request` JSON response sent when a field still holds a placeholder.
///
/// `field` names the offending field and `value` is the rejected content; both are
/// included in the response body.
fn placeholder_rejection(field: &str, value: &str) -> Response {
    let body = serde_json::json!({
        "error": "field looks like an unfilled placeholder",
        "field": field,
        "value": value,
        "detail": "substitute the real value before publishing",
    });
    (StatusCode::BAD_REQUEST, axum::Json(body)).into_response()
}
/// Scans the free-text fields of `msg` for unfilled `<...>` placeholders.
///
/// Checked fields: `question` on `Question`, `needs` on `Blocked`, and every entry of
/// `errors` on `Feedback` (the first offending entry wins). All other variants carry no
/// checked fields.
///
/// Returns `Some(response)` with a ready-made 400 rejection for the first offending
/// field, or `None` when the message is clean.
fn check_placeholder_fields(msg: &BrokerMessage) -> Option<Response> {
    let pattern = placeholder_regex();
    // Yields the rejection for `field` only when `value` looks like a placeholder.
    let reject_if_placeholder = |field: &str, value: &str| {
        pattern
            .is_match(value)
            .then(|| placeholder_rejection(field, value))
    };
    match msg {
        BrokerMessage::Question { payload, .. } => {
            reject_if_placeholder("question", &payload.question)
        }
        BrokerMessage::Blocked { payload, .. } => reject_if_placeholder("needs", &payload.needs),
        BrokerMessage::Feedback { payload, .. } => payload
            .errors
            .iter()
            .find_map(|entry| reject_if_placeholder("errors", entry)),
        // Listed explicitly (no `_` arm) so adding a variant forces a decision here.
        BrokerMessage::Status { .. }
        | BrokerMessage::Artifact { .. }
        | BrokerMessage::Verified { .. }
        | BrokerMessage::Intent { .. }
        | BrokerMessage::AdvancedMain { .. }
        | BrokerMessage::Learning { .. }
        | BrokerMessage::VerifyNow { .. } => None,
    }
}
/// JSON body deserialized by the `watch` handler.
#[derive(Deserialize)]
struct WatchRequest {
/// Identifier of the agent; the `watch` handler validates it with `agent_id_regex`.
agent_id: String,
/// Path of the worktree to watch; the handler rejects empty and placeholder values.
worktree_path: String,
/// CLI name forwarded into the `WatchTarget`; an empty string when the key is omitted.
#[serde(default)]
cli: String,
}
/// Query-string parameters shared by the `messages` and `log` handlers.
#[derive(Deserialize)]
struct PollQuery {
/// Optional cursor, kept as raw text here; the handlers parse it as `u64`
/// (answering 400 on failure) and use 0 when it is absent.
since: Option<String>,
}
/// JSON body returned by the `messages` handler.
#[derive(Serialize)]
struct PollResponse {
/// Messages returned by `delivery::poll_messages` for the requested agent.
messages: Vec<BrokerMessage>,
/// Sequence number returned alongside the messages by `delivery::poll_messages`.
last_seq: u64,
}
/// JSON body returned by the `log` handler.
#[derive(Serialize)]
struct LogResponse {
/// Log entries built from the tuples returned by `delivery::full_log`.
entries: Vec<LogEntry>,
/// Sequence number computed by the `log` handler from the returned entries.
last_seq: u64,
}
/// One serialized row of the `/log` response.
#[derive(Serialize)]
struct LogEntry {
/// Sequence number of the logged message.
seq: u64,
/// Entry timestamp as whole seconds since the Unix epoch; the `log` handler
/// writes 0 when the timestamp precedes the epoch.
timestamp_unix_secs: u64,
/// The logged message itself.
message: BrokerMessage,
}
/// Assembles the broker's HTTP router and binds the shared `state` to it.
///
/// Routes:
/// - `POST /publish`
/// - `POST /watch`
/// - `GET /messages/{agent_id}`
/// - `GET /status`
/// - `GET /log`
pub fn router(state: Arc<BrokerState>) -> Router {
    let routes = Router::new()
        .route("/publish", post(publish))
        .route("/watch", post(watch))
        .route("/messages/{agent_id}", get(messages))
        .route("/status", get(status))
        .route("/log", get(log));
    routes.with_state(state)
}
/// Handles `POST /publish`: validates the request and hands the message to delivery.
///
/// Responses:
/// - `415 Unsupported Media Type` when the `Content-Type` media type is not
///   `application/json` (header missing, unreadable, or a different type).
/// - `400 Bad Request` when the body is empty, fails `BrokerMessage::from_json`,
///   carries an `agent_id` that does not match `agent_id_regex`, or has a checked
///   field that still looks like a `<...>` placeholder.
/// - `202 Accepted` after the message was passed to `delivery::publish_message`.
async fn publish(
    State(state): State<Arc<BrokerState>>,
    headers: HeaderMap,
    body: String,
) -> Response {
    // Media types are case-insensitive and may carry parameters such as
    // `; charset=utf-8`, so compare only the part before the first `;`, ignoring
    // ASCII case. A plain prefix check would reject `Application/JSON` and accept
    // unrelated types like `application/jsonx`.
    let is_json = headers
        .get("content-type")
        .and_then(|value| value.to_str().ok())
        .and_then(|value| value.split(';').next())
        .is_some_and(|media_type| media_type.trim().eq_ignore_ascii_case("application/json"));
    if !is_json {
        return (
            StatusCode::UNSUPPORTED_MEDIA_TYPE,
            axum::Json(serde_json::json!({"error": "Content-Type must be application/json"})),
        )
            .into_response();
    }
    if body.is_empty() {
        return (
            StatusCode::BAD_REQUEST,
            axum::Json(serde_json::json!({"error": "request body must not be empty"})),
        )
            .into_response();
    }
    let msg = match BrokerMessage::from_json(&body) {
        Ok(msg) => msg,
        Err(e) => {
            return (
                StatusCode::BAD_REQUEST,
                axum::Json(serde_json::json!({"error": e.to_string()})),
            )
                .into_response();
        }
    };
    if !agent_id_regex().is_match(msg.agent_id()) {
        return agent_id_rejection(msg.agent_id());
    }
    if let Some(rejection) = check_placeholder_fields(&msg) {
        return rejection;
    }
    delivery::publish_message(&state, &msg);
    StatusCode::ACCEPTED.into_response()
}
/// Handles `POST /watch`: registers a worktree to be watched on behalf of an agent.
///
/// Responses:
/// - `415 Unsupported Media Type` when the `Content-Type` media type is not
///   `application/json` (header missing, unreadable, or a different type).
/// - `400 Bad Request` when the body is empty, is not a valid `WatchRequest`, has an
///   `agent_id` that does not match `agent_id_regex`, or has an empty or placeholder
///   `worktree_path`.
/// - `202 Accepted` otherwise.
///
/// A watcher task is spawned only when `register_watch_target` returns `true` and a
/// shutdown receiver is available from `watcher_shutdown_rx`; the response is `202`
/// either way.
async fn watch(
    State(state): State<Arc<BrokerState>>,
    headers: HeaderMap,
    body: String,
) -> Response {
    // Media types are case-insensitive and may carry parameters such as
    // `; charset=utf-8`, so compare only the part before the first `;`, ignoring
    // ASCII case. A plain prefix check would reject `Application/JSON` and accept
    // unrelated types like `application/jsonx`.
    let is_json = headers
        .get("content-type")
        .and_then(|value| value.to_str().ok())
        .and_then(|value| value.split(';').next())
        .is_some_and(|media_type| media_type.trim().eq_ignore_ascii_case("application/json"));
    if !is_json {
        return (
            StatusCode::UNSUPPORTED_MEDIA_TYPE,
            axum::Json(serde_json::json!({"error": "Content-Type must be application/json"})),
        )
            .into_response();
    }
    if body.is_empty() {
        return (
            StatusCode::BAD_REQUEST,
            axum::Json(serde_json::json!({"error": "request body must not be empty"})),
        )
            .into_response();
    }
    let req: WatchRequest = match serde_json::from_str(&body) {
        Ok(parsed) => parsed,
        Err(e) => {
            return (
                StatusCode::BAD_REQUEST,
                axum::Json(serde_json::json!({"error": e.to_string()})),
            )
                .into_response();
        }
    };
    if !agent_id_regex().is_match(&req.agent_id) {
        return agent_id_rejection(&req.agent_id);
    }
    if req.worktree_path.is_empty() {
        return (
            StatusCode::BAD_REQUEST,
            axum::Json(serde_json::json!({"error": "worktree_path must not be empty"})),
        )
            .into_response();
    }
    if placeholder_regex().is_match(&req.worktree_path) {
        return placeholder_rejection("worktree_path", &req.worktree_path);
    }
    let target = WatchTarget {
        agent_id: req.agent_id,
        cli: req.cli,
        worktree_path: std::path::PathBuf::from(req.worktree_path),
    };
    // Spawn a watcher only when the registration call reports `true` and a shutdown
    // channel exists.
    if state.register_watch_target(&target)
        && let Some(rx) = state.watcher_shutdown_rx()
    {
        tokio::spawn(watcher::watch_worktree(Arc::clone(&state), target, rx));
    }
    StatusCode::ACCEPTED.into_response()
}
async fn messages(
State(state): State<Arc<BrokerState>>,
Path(agent_id): Path<String>,
Query(params): Query<PollQuery>,
) -> Response {
if agent_id.is_empty()
|| !agent_id
.chars()
.all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '-' || c == '_')
{
return (
StatusCode::BAD_REQUEST,
axum::Json(serde_json::json!({"error": "agent_id must match [a-z0-9-_]+"})),
)
.into_response();
}
let since = match params.since {
Some(s) => match s.parse::<u64>() {
Ok(n) => n,
Err(_) => {
return (
StatusCode::BAD_REQUEST,
axum::Json(serde_json::json!({"error": "since must be a valid u64"})),
)
.into_response();
}
},
None => 0,
};
let (msgs, last_seq) = delivery::poll_messages(&state, &agent_id, since);
(
StatusCode::OK,
axum::Json(PollResponse {
messages: msgs,
last_seq,
}),
)
.into_response()
}
/// Handles `GET /log`: returns the entries from `delivery::full_log` for the given cursor.
///
/// The optional `since` query parameter must parse as `u64` (otherwise
/// `400 Bad Request`) and defaults to 0. Each entry's timestamp is reported as whole
/// seconds since the Unix epoch, or 0 when it precedes the epoch.
///
/// `last_seq` is the highest sequence number among the returned entries. When no
/// entries are returned it echoes `since`, so a client that feeds `last_seq` back as
/// its next `since` does not fall back to 0.
async fn log(State(state): State<Arc<BrokerState>>, Query(params): Query<PollQuery>) -> Response {
    let since = match params.since.as_deref().map(str::parse::<u64>) {
        None => 0,
        Some(Ok(cursor)) => cursor,
        Some(Err(_)) => {
            return (
                StatusCode::BAD_REQUEST,
                axum::Json(serde_json::json!({"error": "since must be a valid u64"})),
            )
                .into_response();
        }
    };
    let raw = delivery::full_log(&state, since);
    // Fall back to the caller's cursor rather than 0 when the result is empty.
    let last_seq = raw.iter().map(|(seq, _, _)| *seq).max().unwrap_or(since);
    let entries: Vec<LogEntry> = raw
        .into_iter()
        .map(|(seq, ts, message)| LogEntry {
            seq,
            timestamp_unix_secs: ts
                .duration_since(std::time::UNIX_EPOCH)
                .map_or(0, |elapsed| elapsed.as_secs()),
            message,
        })
        .collect();
    (
        StatusCode::OK,
        axum::Json(LogResponse { entries, last_seq }),
    )
        .into_response()
}
/// Handles `GET /status`: reports the marker flag, crate version, uptime and agent snapshot.
///
/// Always answers `200 OK` with a JSON object holding `git_paw`, `version`,
/// `uptime_seconds` and `agents`.
async fn status(State(state): State<Arc<BrokerState>>) -> Response {
    let uptime = state.uptime_seconds();
    let agents = delivery::agent_status_snapshot(&state);
    let body = serde_json::json!({
        "git_paw": true,
        "version": env!("CARGO_PKG_VERSION"),
        "uptime_seconds": uptime,
        "agents": agents,
    });
    (StatusCode::OK, axum::Json(body)).into_response()
}
#[cfg(test)]
mod tests {
use super::*;
use axum::body::Body;
use axum::http::Request;
use tower::ServiceExt;
/// Builds a router backed by a fresh `BrokerState::new(None)`.
fn test_router() -> Router {
    let state = Arc::new(BrokerState::new(None));
    router(state)
}
/// A well-formed status message posted as JSON is answered with `202 Accepted`.
#[tokio::test]
async fn publish_valid_message_returns_202() {
    let request = Request::builder()
        .method("POST")
        .uri("/publish")
        .header("content-type", "application/json")
        .body(Body::from(
            r#"{"type":"agent.status","agent_id":"feat-xx","payload":{"status":"idle","modified_files":[]}}"#,
        ))
        .unwrap();
    let response = test_router().oneshot(request).await.unwrap();
    assert_eq!(response.status(), StatusCode::ACCEPTED);
}
/// A body that is not JSON is answered with `400 Bad Request`.
#[tokio::test]
async fn publish_invalid_json_returns_400() {
    let request = Request::builder()
        .method("POST")
        .uri("/publish")
        .header("content-type", "application/json")
        .body(Body::from("not json"))
        .unwrap();
    let response = test_router().oneshot(request).await.unwrap();
    assert_eq!(response.status(), StatusCode::BAD_REQUEST);
}
/// An empty request body is answered with `400 Bad Request`.
#[tokio::test]
async fn publish_empty_body_returns_400() {
    let request = Request::builder()
        .method("POST")
        .uri("/publish")
        .header("content-type", "application/json")
        .body(Body::empty())
        .unwrap();
    let response = test_router().oneshot(request).await.unwrap();
    assert_eq!(response.status(), StatusCode::BAD_REQUEST);
}
/// A `text/plain` request to `/publish` is answered with `415 Unsupported Media Type`.
#[tokio::test]
async fn publish_wrong_content_type_returns_415() {
    let request = Request::builder()
        .method("POST")
        .uri("/publish")
        .header("content-type", "text/plain")
        .body(Body::from("{}"))
        .unwrap();
    let response = test_router().oneshot(request).await.unwrap();
    assert_eq!(response.status(), StatusCode::UNSUPPORTED_MEDIA_TYPE);
}
/// A request to `/publish` without a `Content-Type` header is answered with `415`.
#[tokio::test]
async fn publish_missing_content_type_returns_415() {
    let request = Request::builder()
        .method("POST")
        .uri("/publish")
        .body(Body::from("{}"))
        .unwrap();
    let response = test_router().oneshot(request).await.unwrap();
    assert_eq!(response.status(), StatusCode::UNSUPPORTED_MEDIA_TYPE);
}
/// A status message whose `agent_id` is the empty string is answered with `400`.
#[tokio::test]
async fn publish_empty_agent_id_returns_400() {
    let request = Request::builder()
        .method("POST")
        .uri("/publish")
        .header("content-type", "application/json")
        .body(Body::from(
            r#"{"type":"agent.status","agent_id":"","payload":{"status":"idle","modified_files":[]}}"#,
        ))
        .unwrap();
    let response = test_router().oneshot(request).await.unwrap();
    assert_eq!(response.status(), StatusCode::BAD_REQUEST);
}
/// Posts `body` as JSON to `/publish` on a fresh router and returns the response
/// status together with the fully collected response body.
async fn post_publish(body: &'static str) -> (StatusCode, axum::body::Bytes) {
    let request = Request::builder()
        .method("POST")
        .uri("/publish")
        .header("content-type", "application/json")
        .body(Body::from(body))
        .unwrap();
    let response = test_router().oneshot(request).await.unwrap();
    let status = response.status();
    let collected = axum::body::to_bytes(response.into_body(), usize::MAX)
        .await
        .unwrap();
    (status, collected)
}
/// The agent id `a` is rejected with `400` and an "invalid agent_id" message.
#[tokio::test]
async fn agent_id_rejects_single_letter() {
    let payload = r#"{"type":"agent.status","agent_id":"a","payload":{"status":"working","modified_files":[]}}"#;
    let (status, body) = post_publish(payload).await;
    assert_eq!(status, StatusCode::BAD_REQUEST);
    let text = String::from_utf8_lossy(&body);
    let mentions_error = text.contains("invalid agent_id");
    assert!(
        mentions_error,
        "body should mention 'invalid agent_id'; got: {text}"
    );
}
/// The placeholder agent id `<agent-id>` is rejected with `400` as an invalid agent id.
#[tokio::test]
async fn agent_id_rejects_placeholder() {
    let payload = r#"{"type":"agent.status","agent_id":"<agent-id>","payload":{"status":"working","modified_files":[]}}"#;
    let (status, body) = post_publish(payload).await;
    assert_eq!(status, StatusCode::BAD_REQUEST);
    let text = String::from_utf8_lossy(&body);
    let mentions_error = text.contains("invalid agent_id");
    assert!(mentions_error, "body: {text}");
}
/// An empty agent id is rejected with `400`; the response body is not inspected.
#[tokio::test]
async fn agent_id_rejects_empty() {
    let payload = r#"{"type":"agent.status","agent_id":"","payload":{"status":"working","modified_files":[]}}"#;
    let (status, _body) = post_publish(payload).await;
    assert_eq!(status, StatusCode::BAD_REQUEST);
}
/// The agent id `supervisor` passes validation (status `202` or `200`).
#[tokio::test]
async fn agent_id_accepts_supervisor() {
    let payload = r#"{"type":"agent.status","agent_id":"supervisor","payload":{"status":"working","modified_files":[]}}"#;
    let (status, _) = post_publish(payload).await;
    let accepted = [StatusCode::ACCEPTED, StatusCode::OK].contains(&status);
    assert!(accepted, "supervisor should be accepted; got: {status}");
}
/// A `feat-` prefixed agent id passes validation (status `202` or `200`).
#[tokio::test]
async fn agent_id_accepts_feat_dash() {
    let payload = r#"{"type":"agent.status","agent_id":"feat-test-branch","payload":{"status":"working","modified_files":[]}}"#;
    let (status, _) = post_publish(payload).await;
    let accepted = [StatusCode::ACCEPTED, StatusCode::OK].contains(&status);
    assert!(
        accepted,
        "feat-test-branch should be accepted; got: {status}"
    );
}
/// A `feat/` prefixed agent id passes validation (status `202` or `200`).
#[tokio::test]
async fn agent_id_accepts_feat_slash() {
    let payload = r#"{"type":"agent.status","agent_id":"feat/test-branch","payload":{"status":"working","modified_files":[]}}"#;
    let (status, _) = post_publish(payload).await;
    let accepted = [StatusCode::ACCEPTED, StatusCode::OK].contains(&status);
    assert!(
        accepted,
        "feat/test-branch should be accepted; got: {status}"
    );
}
/// A question payload that is still a `<...>` placeholder is rejected with `400`,
/// and the response names both the problem and the field.
#[tokio::test]
async fn payload_question_rejects_placeholder() {
    let payload = r#"{"type":"agent.question","agent_id":"feat-test-branch","payload":{"question":"<your specific question>"}}"#;
    let (status, body) = post_publish(payload).await;
    assert_eq!(status, StatusCode::BAD_REQUEST);
    let text = String::from_utf8_lossy(&body);
    let names_both = ["placeholder", "question"]
        .iter()
        .all(|needle| text.contains(needle));
    assert!(
        names_both,
        "body should mention both 'placeholder' and 'question'; got: {text}"
    );
}
/// A question with real text passes the placeholder check (status `202` or `200`).
#[tokio::test]
async fn payload_question_accepts_real_content() {
    let payload = r#"{"type":"agent.question","agent_id":"feat-test-branch","payload":{"question":"Should we use bcrypt or argon2?"}}"#;
    let (status, _) = post_publish(payload).await;
    let accepted = [StatusCode::ACCEPTED, StatusCode::OK].contains(&status);
    assert!(
        accepted,
        "real human content should be accepted; got: {status}"
    );
}
/// A blocked payload whose `needs` is a `<...>` placeholder is rejected with `400`,
/// and the response names the field.
#[tokio::test]
async fn payload_blocked_rejects_placeholder_needs() {
    let payload = r#"{"type":"agent.blocked","agent_id":"feat-test-branch","payload":{"needs":"<what>","from":"feat-other"}}"#;
    let (status, body) = post_publish(payload).await;
    assert_eq!(status, StatusCode::BAD_REQUEST);
    let text = String::from_utf8_lossy(&body);
    let names_both = ["placeholder", "needs"]
        .iter()
        .all(|needle| text.contains(needle));
    assert!(names_both, "body: {text}");
}
/// A feedback payload with a `<...>` placeholder among its errors is rejected with
/// `400`, and the response names the field.
#[tokio::test]
async fn payload_feedback_rejects_placeholder_error_entry() {
    let payload = r#"{"type":"agent.feedback","agent_id":"feat-test-branch","payload":{"from":"supervisor","errors":["<error 1>"]}}"#;
    let (status, body) = post_publish(payload).await;
    assert_eq!(status, StatusCode::BAD_REQUEST);
    let text = String::from_utf8_lossy(&body);
    let names_both = ["placeholder", "errors"]
        .iter()
        .all(|needle| text.contains(needle));
    assert!(names_both, "body: {text}");
}
/// A complete `agent.advanced-main` message is answered with exactly `202 Accepted`.
#[tokio::test]
async fn advanced_main_accepted_through_publish_endpoint() {
    let payload = r#"{"type":"agent.advanced-main","from":"supervisor","merged_branch":"feat/auth","new_main_sha":"a1b2c3d4e5f6","base":"main","merged_at":"2026-06-04T13:30:00Z","summary":"landed auth"}"#;
    let (status, _) = post_publish(payload).await;
    assert_eq!(
        status,
        StatusCode::ACCEPTED,
        "a well-formed advanced-main must be accepted (202)"
    );
}
/// An `agent.advanced-main` message without `merged_branch` is rejected with `400`
/// and the error text names the missing field.
#[tokio::test]
async fn advanced_main_missing_field_returns_400_naming_field() {
    let payload = r#"{"type":"agent.advanced-main","from":"supervisor","new_main_sha":"a1b2c3d4e5f6","base":"main","merged_at":"2026-06-04T13:30:00Z"}"#;
    let (status, body) = post_publish(payload).await;
    assert_eq!(status, StatusCode::BAD_REQUEST);
    let text = String::from_utf8_lossy(&body);
    let names_field = text.contains("merged_branch");
    assert!(
        names_field,
        "the 400 must name the missing field; got: {text}"
    );
}
/// After two agents have published a status, an `agent.advanced-main` message shows
/// up when polling either agent's messages from sequence 0.
#[tokio::test]
async fn advanced_main_routes_to_every_registered_agent() {
    let state = Arc::new(BrokerState::new(None));
    let published: [&'static str; 3] = [
        r#"{"type":"agent.status","agent_id":"feat-alpha","payload":{"status":"working","modified_files":[]}}"#,
        r#"{"type":"agent.status","agent_id":"feat-beta","payload":{"status":"working","modified_files":[]}}"#,
        r#"{"type":"agent.advanced-main","from":"supervisor","merged_branch":"feat/alpha","new_main_sha":"a1b2c3d4e5f6","base":"main","merged_at":"2026-06-04T13:30:00Z"}"#,
    ];
    for body in published {
        publish_json(&state, body).await;
    }
    for agent in ["feat-alpha", "feat-beta"] {
        let (inbox, _) = delivery::poll_messages(&state, agent, 0);
        let saw_event = inbox
            .iter()
            .any(|m| matches!(m, BrokerMessage::AdvancedMain { .. }));
        assert!(
            saw_event,
            "{agent} inbox must surface the advanced-main event"
        );
    }
}
/// Polling a valid agent id on a fresh broker yields `200` with no messages and
/// `last_seq` equal to 0.
#[tokio::test]
async fn messages_valid_agent_returns_200_with_last_seq() {
    let request = Request::builder()
        .method("GET")
        .uri("/messages/feat-x")
        .body(Body::empty())
        .unwrap();
    let response = test_router().oneshot(request).await.unwrap();
    assert_eq!(response.status(), StatusCode::OK);
    let raw = axum::body::to_bytes(response.into_body(), usize::MAX)
        .await
        .unwrap();
    let json: serde_json::Value = serde_json::from_slice(&raw).unwrap();
    assert_eq!(json["messages"], serde_json::json!([]));
    assert_eq!(json["last_seq"], serde_json::json!(0));
}
/// An agent id with characters outside the accepted set is answered with `400`.
#[tokio::test]
async fn messages_invalid_agent_returns_400() {
    let request = Request::builder()
        .method("GET")
        .uri("/messages/INVALID!")
        .body(Body::empty())
        .unwrap();
    let response = test_router().oneshot(request).await.unwrap();
    assert_eq!(response.status(), StatusCode::BAD_REQUEST);
}
/// A non-numeric `since` on `/messages/{agent_id}` is answered with `400`.
#[tokio::test]
async fn messages_invalid_since_returns_400() {
    let request = Request::builder()
        .method("GET")
        .uri("/messages/feat-x?since=abc")
        .body(Body::empty())
        .unwrap();
    let response = test_router().oneshot(request).await.unwrap();
    assert_eq!(response.status(), StatusCode::BAD_REQUEST);
}
/// `/status` on a fresh broker reports the marker flag, a string version, a numeric
/// uptime and an empty agent list.
#[tokio::test]
async fn status_returns_marker_and_version() {
    let request = Request::builder()
        .method("GET")
        .uri("/status")
        .body(Body::empty())
        .unwrap();
    let response = test_router().oneshot(request).await.unwrap();
    assert_eq!(response.status(), StatusCode::OK);
    let raw = axum::body::to_bytes(response.into_body(), usize::MAX)
        .await
        .unwrap();
    let json: serde_json::Value = serde_json::from_slice(&raw).unwrap();
    assert_eq!(json["git_paw"], true);
    assert!(json["version"].is_string());
    assert!(json["uptime_seconds"].is_number());
    assert_eq!(json["agents"], serde_json::json!([]));
}
/// Posts `body` as JSON to `/publish` on a router sharing `state` and asserts that the
/// broker answered `202 Accepted`.
async fn publish_json(state: &Arc<BrokerState>, body: &'static str) {
    let request = Request::builder()
        .method("POST")
        .uri("/publish")
        .header("content-type", "application/json")
        .body(Body::from(body))
        .unwrap();
    let app = router(Arc::clone(state));
    let response = app.oneshot(request).await.unwrap();
    assert_eq!(response.status(), StatusCode::ACCEPTED);
}
/// Fetches `/status` from a router sharing `state`, asserts `200 OK`, and returns the
/// parsed JSON body.
async fn get_status(state: &Arc<BrokerState>) -> serde_json::Value {
    let request = Request::builder()
        .method("GET")
        .uri("/status")
        .body(Body::empty())
        .unwrap();
    let app = router(Arc::clone(state));
    let response = app.oneshot(request).await.unwrap();
    assert_eq!(response.status(), StatusCode::OK);
    let raw = axum::body::to_bytes(response.into_body(), usize::MAX)
        .await
        .unwrap();
    serde_json::from_slice(&raw).unwrap()
}
/// Feedback whose payload says `from: human` must not add a `human` row to the agent
/// list reported by `/status`; only the two publishing agents are listed.
#[tokio::test]
async fn e2e_feedback_from_human_creates_no_phantom_roster_row() {
    let state = Arc::new(BrokerState::new(None));
    let published: [&'static str; 3] = [
        r#"{"type":"agent.status","agent_id":"supervisor","payload":{"status":"working","modified_files":[],"cli":"claude-oss"}}"#,
        r#"{"type":"agent.status","agent_id":"feat-roster","payload":{"status":"working","modified_files":[],"cli":"claude-oss"}}"#,
        r#"{"type":"agent.feedback","agent_id":"feat-roster","payload":{"from":"human","errors":["fix the flaky test"]}}"#,
    ];
    for body in published {
        publish_json(&state, body).await;
    }
    let json = get_status(&state).await;
    let roster = json["agents"].as_array().expect("agents array");
    let mut ids: Vec<&str> = Vec::with_capacity(roster.len());
    for row in roster {
        ids.push(row["agent_id"].as_str().unwrap());
    }
    assert!(
        !ids.contains(&"human"),
        "a feedback `from:human` must not mint a phantom roster row; got {ids:?}",
    );
    assert_eq!(ids.len(), 2, "roster holds exactly the two real agents");
}
/// Every agent row in `/status` carries the `cli` value published in its status payload.
#[tokio::test]
async fn e2e_status_shows_cli_for_every_agent() {
    let state = Arc::new(BrokerState::new(None));
    let published: [&'static str; 2] = [
        r#"{"type":"agent.status","agent_id":"supervisor","payload":{"status":"working","modified_files":[],"cli":"claude-oss"}}"#,
        r#"{"type":"agent.status","agent_id":"feat-build","payload":{"status":"working","modified_files":[],"cli":"claude-oss"}}"#,
    ];
    for body in published {
        publish_json(&state, body).await;
    }
    let json = get_status(&state).await;
    let roster = json["agents"].as_array().expect("agents array");
    assert_eq!(roster.len(), 2);
    roster.iter().for_each(|a| {
        assert_eq!(
            a["cli"].as_str(),
            Some("claude-oss"),
            "every agent row must carry its cli: {a}",
        );
    });
}
/// Initializes a git repository in `dir` with a `main` branch, a local test identity
/// and one empty root commit.
///
/// # Panics
///
/// Panics when `git` cannot be spawned or when any of the git commands exits with a
/// non-zero status, so a broken fixture fails at setup instead of surfacing later as
/// an unrelated assertion failure.
fn init_test_repo_server(dir: &std::path::Path) {
    use std::process::Command;
    let run = |args: &[&str]| {
        let output = Command::new("git")
            .args(args)
            .current_dir(dir)
            .output()
            .expect("git command failed");
        // `output()` succeeding only means the process ran; check its exit status too.
        assert!(
            output.status.success(),
            "git {args:?} exited with {}: {}",
            output.status,
            String::from_utf8_lossy(&output.stderr)
        );
    };
    run(&["init", "-q", "-b", "main"]);
    run(&["config", "user.email", "test@example.com"]);
    run(&["config", "user.name", "test"]);
    run(&["commit", "--allow-empty", "-m", "root", "-q"]);
}
/// Posts `body` as JSON to `/watch` on a router sharing `state` and returns the
/// response status.
async fn post_watch(state: &Arc<BrokerState>, body: String) -> StatusCode {
    let request = Request::builder()
        .method("POST")
        .uri("/watch")
        .header("content-type", "application/json")
        .body(Body::from(body))
        .unwrap();
    let app = router(Arc::clone(state));
    let response = app.oneshot(request).await.unwrap();
    response.status()
}
#[tokio::test]
async fn watch_registers_target_and_surfaces_worktree_in_status() {
use super::super::watcher::POLL_INTERVAL;
let tmp = tempfile::tempdir().unwrap();
init_test_repo_server(tmp.path());
let state = Arc::new(BrokerState::new(None));
let (tx, rx) = tokio::sync::watch::channel(false);
state.set_watcher_shutdown_rx(rx);
let body = format!(
r#"{{"agent_id":"feat-hot","worktree_path":"{}","cli":"claude"}}"#,
tmp.path().display()
);
assert_eq!(post_watch(&state, body).await, StatusCode::ACCEPTED);
std::fs::write(tmp.path().join("hot.rs"), "fn hot() {}").unwrap();
let mut found = false;
for _ in 0..20 {
tokio::time::sleep(POLL_INTERVAL / 2).await;
let json = get_status(&state).await;
if let Some(agents) = json["agents"].as_array()
&& agents.iter().any(|a| a["agent_id"] == "feat-hot")
{
found = true;
break;
}
}
assert!(
found,
"a registered worktree must surface its agent in /status from activity"
);
let _ = tx.send(true);
}
/// Posting the same watch request twice answers `202` both times and leaves exactly
/// one entry in `watched_paths`.
#[tokio::test]
async fn watch_duplicate_registration_is_a_noop_success() {
    let state = Arc::new(BrokerState::new(None));
    let body = r#"{"agent_id":"feat-hot","worktree_path":"/tmp/feat-hot","cli":"claude"}"#;
    for _attempt in 0..2 {
        let status = post_watch(&state, body.to_string()).await;
        assert_eq!(status, StatusCode::ACCEPTED);
    }
    let recorded = state.read().watched_paths.len();
    assert_eq!(
        recorded, 1,
        "duplicate registration must not record a second target"
    );
}
/// A watch request with the agent id `a` is answered with `400`.
#[tokio::test]
async fn watch_rejects_invalid_agent_id() {
    let state = Arc::new(BrokerState::new(None));
    let body = r#"{"agent_id":"a","worktree_path":"/tmp/x","cli":"claude"}"#;
    let status = post_watch(&state, body.to_string()).await;
    assert_eq!(status, StatusCode::BAD_REQUEST);
}
/// A watch request whose `worktree_path` is the placeholder `<path>` is answered with `400`.
#[tokio::test]
async fn watch_rejects_placeholder_worktree_path() {
    let state = Arc::new(BrokerState::new(None));
    let body = r#"{"agent_id":"feat-hot","worktree_path":"<path>","cli":"claude"}"#;
    let status = post_watch(&state, body.to_string()).await;
    assert_eq!(status, StatusCode::BAD_REQUEST);
}
/// A watch request with an empty `worktree_path` is answered with `400`.
#[tokio::test]
async fn watch_rejects_empty_worktree_path() {
    let state = Arc::new(BrokerState::new(None));
    let body = r#"{"agent_id":"feat-hot","worktree_path":"","cli":"claude"}"#;
    let status = post_watch(&state, body.to_string()).await;
    assert_eq!(status, StatusCode::BAD_REQUEST);
}
/// A `text/plain` request to `/watch` is answered with `415 Unsupported Media Type`.
#[tokio::test]
async fn watch_wrong_content_type_returns_415() {
    let request = Request::builder()
        .method("POST")
        .uri("/watch")
        .header("content-type", "text/plain")
        .body(Body::from("{}"))
        .unwrap();
    let response = test_router().oneshot(request).await.unwrap();
    assert_eq!(response.status(), StatusCode::UNSUPPORTED_MEDIA_TYPE);
}
/// A path with no registered route is answered with `404 Not Found`.
#[tokio::test]
async fn unknown_route_returns_404() {
    let request = Request::builder()
        .method("GET")
        .uri("/unknown/route")
        .body(Body::empty())
        .unwrap();
    let response = test_router().oneshot(request).await.unwrap();
    assert_eq!(response.status(), StatusCode::NOT_FOUND);
}
/// A `GET` on the POST-only `/publish` route is answered with `405 Method Not Allowed`.
#[tokio::test]
async fn wrong_method_returns_405() {
    let request = Request::builder()
        .method("GET")
        .uri("/publish")
        .body(Body::empty())
        .unwrap();
    let response = test_router().oneshot(request).await.unwrap();
    assert_eq!(response.status(), StatusCode::METHOD_NOT_ALLOWED);
}
/// A panic inside one handler must not take the server down: after hitting `/panic`
/// over a real TCP connection, `/status` on the same server still answers `200`.
#[tokio::test]
async fn panic_in_handler_is_isolated() {
    let state = Arc::new(BrokerState::new(None));
    let app = Router::new()
        .route(
            "/panic",
            get(|| async {
                panic!("deliberate test panic");
                // Never reached; presumably kept so the async block has a concrete
                // response type for the handler.
                #[allow(unreachable_code)]
                StatusCode::OK.into_response()
            }),
        )
        .route("/status", get(status))
        .with_state(state);

    let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
    let addr = listener.local_addr().unwrap();
    let server = tokio::spawn(async move {
        axum::serve(listener, app).await.ok();
    });

    let client =
        hyper_util::client::legacy::Client::builder(hyper_util::rt::TokioExecutor::new())
            .build_http();
    let get_request = |path: &str| {
        Request::builder()
            .method("GET")
            .uri(format!("http://{addr}{path}"))
            .body(axum::body::Body::empty())
            .unwrap()
    };

    // The outcome of the panicking request is irrelevant; only the follow-up matters.
    let _panic_resp = client.request(get_request("/panic")).await;
    let status_resp = client
        .request(get_request("/status"))
        .await
        .expect("server should still be alive after a panic in another handler");
    assert_eq!(status_resp.status(), StatusCode::OK);
    server.abort();
}
/// After three status messages are published, `/log` returns all three in publish
/// order and reports `last_seq` as 3.
#[tokio::test]
async fn log_returns_full_message_log_in_chronological_order() {
    let state = Arc::new(BrokerState::new(None));
    let fixtures = [
        ("feat-a", "working"),
        ("feat-b", "blocked"),
        ("feat-c", "done"),
    ];
    for (agent, status_label) in fixtures {
        let payload = super::super::messages::StatusPayload {
            status: status_label.to_string(),
            modified_files: vec![],
            message: None,
            ..Default::default()
        };
        let msg = BrokerMessage::Status {
            agent_id: agent.to_string(),
            payload,
        };
        delivery::publish_message(&state, &msg);
    }

    let request = Request::builder()
        .method("GET")
        .uri("/log")
        .body(Body::empty())
        .unwrap();
    let response = router(state).oneshot(request).await.unwrap();
    assert_eq!(response.status(), StatusCode::OK);
    let raw = axum::body::to_bytes(response.into_body(), usize::MAX)
        .await
        .unwrap();
    let parsed: serde_json::Value = serde_json::from_slice(&raw).unwrap();
    let entries = parsed["entries"].as_array().expect("entries array");
    assert_eq!(entries.len(), 3, "all three messages must appear in /log");
    assert_eq!(entries[0]["message"]["agent_id"], "feat-a");
    assert_eq!(entries[2]["message"]["agent_id"], "feat-c");
    assert_eq!(parsed["last_seq"], 3);
}
/// With three messages published, `/log?since=2` returns only the entry at sequence 3.
#[tokio::test]
async fn log_with_since_filters_older_entries() {
    let state = Arc::new(BrokerState::new(None));
    for agent in ["feat-a", "feat-b", "feat-c"] {
        let payload = super::super::messages::StatusPayload {
            status: "working".to_string(),
            modified_files: vec![],
            message: None,
            ..Default::default()
        };
        let msg = BrokerMessage::Status {
            agent_id: agent.to_string(),
            payload,
        };
        delivery::publish_message(&state, &msg);
    }

    let request = Request::builder()
        .method("GET")
        .uri("/log?since=2")
        .body(Body::empty())
        .unwrap();
    let response = router(state).oneshot(request).await.unwrap();
    let raw = axum::body::to_bytes(response.into_body(), usize::MAX)
        .await
        .unwrap();
    let parsed: serde_json::Value = serde_json::from_slice(&raw).unwrap();
    let entries = parsed["entries"].as_array().unwrap();
    assert_eq!(
        entries.len(),
        1,
        "since=2 must yield only the message at seq=3"
    );
    assert_eq!(entries[0]["seq"], 3);
}
/// A non-numeric `since` on `/log` is answered with `400 Bad Request`.
#[tokio::test]
async fn log_invalid_since_returns_400() {
    let request = Request::builder()
        .method("GET")
        .uri("/log?since=notanumber")
        .body(Body::empty())
        .unwrap();
    let response = test_router().oneshot(request).await.unwrap();
    assert_eq!(response.status(), StatusCode::BAD_REQUEST);
}
}