Skip to main content

git_paw/broker/
server.rs

1//! Axum HTTP server for the broker.
2//!
3//! Defines the router and endpoint handlers for `/publish`, `/messages/:agent_id`,
4//! and `/status`. All handlers follow the lock discipline documented in
5//! [`super`] — no `RwLock` guard is held across an `.await` boundary.
6
7use std::sync::{Arc, OnceLock};
8
9use axum::Router;
10use axum::extract::{Path, Query, State};
11use axum::http::{HeaderMap, StatusCode};
12use axum::response::{IntoResponse, Response};
13use axum::routing::{get, post};
14use regex::Regex;
15use serde::{Deserialize, Serialize};
16
17use super::BrokerState;
18use super::delivery;
19use super::messages::BrokerMessage;
20use super::{WatchTarget, watcher};
21
22/// Compiled-once regex matching the only `agent_id` shapes the broker accepts:
23/// `"supervisor"`, or a `feat-{name}` / `feat/{name}` slug whose `{name}`
24/// begins with `[a-z0-9]` and consists of `[a-z0-9-]+`. See
25/// `supervisor-bugfixes-v0-5-x` §4 + `broker-messages` spec.
26fn agent_id_regex() -> &'static Regex {
27    static RE: OnceLock<Regex> = OnceLock::new();
28    RE.get_or_init(|| {
29        Regex::new(r"^(supervisor|feat/[a-z0-9][a-z0-9-]+|feat-[a-z0-9][a-z0-9-]+)$")
30            .expect("AGENT_ID_RE compiles")
31    })
32}
33
34/// Compiled-once regex matching unfilled placeholder strings — exact match
35/// `<anything>` from start to end.
36fn placeholder_regex() -> &'static Regex {
37    static RE: OnceLock<Regex> = OnceLock::new();
38    RE.get_or_init(|| Regex::new(r"^<.*>$").expect("PLACEHOLDER_RE compiles"))
39}
40
41/// Build the HTTP 400 response for an `agent_id` that did not match
42/// [`agent_id_regex`].
43fn agent_id_rejection(value: &str) -> Response {
44    (
45        StatusCode::BAD_REQUEST,
46        axum::Json(serde_json::json!({
47            "error": "invalid agent_id",
48            "value": value,
49            "detail": "agent_id must be 'supervisor' or match feat-{name} / feat/{name}",
50        })),
51    )
52        .into_response()
53}
54
55/// Build the HTTP 400 response for a payload string that looks like an
56/// unfilled placeholder (`<…>`).
57fn placeholder_rejection(field: &str, value: &str) -> Response {
58    (
59        StatusCode::BAD_REQUEST,
60        axum::Json(serde_json::json!({
61            "error": "field looks like an unfilled placeholder",
62            "field": field,
63            "value": value,
64            "detail": "substitute the real value before publishing",
65        })),
66    )
67        .into_response()
68}
69
70/// Returns `Some(response)` if any tracked payload string field of `msg`
71/// matches [`placeholder_regex`]; otherwise `None`.
72///
73/// Per `supervisor-bugfixes-v0-5-x` design D5, the placeholder check covers
74/// only the fields the supervisor skill's example curls populate:
75/// `payload.question`, `payload.needs`, and each string element of
76/// `payload.errors[]`. Other free-form string fields (`StatusPayload.message`,
77/// `VerifiedPayload.message`) are left alone — real human content sometimes
78/// uses angle brackets inline.
79fn check_placeholder_fields(msg: &BrokerMessage) -> Option<Response> {
80    let re = placeholder_regex();
81    match msg {
82        BrokerMessage::Question { payload, .. } => {
83            if re.is_match(&payload.question) {
84                return Some(placeholder_rejection("question", &payload.question));
85            }
86        }
87        BrokerMessage::Blocked { payload, .. } => {
88            if re.is_match(&payload.needs) {
89                return Some(placeholder_rejection("needs", &payload.needs));
90            }
91        }
92        BrokerMessage::Feedback { payload, .. } => {
93            for err in &payload.errors {
94                if re.is_match(err) {
95                    return Some(placeholder_rejection("errors", err));
96                }
97            }
98        }
99        BrokerMessage::Status { .. }
100        | BrokerMessage::Artifact { .. }
101        | BrokerMessage::Verified { .. }
102        | BrokerMessage::Intent { .. }
103        | BrokerMessage::AdvancedMain { .. }
104        | BrokerMessage::Learning { .. }
105        | BrokerMessage::VerifyNow { .. } => {}
106    }
107    None
108}
109
110/// Request body for the `POST /watch` endpoint — a live filesystem watch
111/// target to register on the running broker.
112#[derive(Deserialize)]
113struct WatchRequest {
114    /// Agent identifier (slugified branch name) that owns the worktree.
115    agent_id: String,
116    /// Absolute path to the worktree root to begin watching.
117    worktree_path: String,
118    /// CLI label running in the agent's pane (e.g. `"claude"`). Optional —
119    /// an absent or empty value leaves the roster's CLI column unseeded,
120    /// matching the start-time behaviour for a blank CLI.
121    #[serde(default)]
122    cli: String,
123}
124
125/// Query parameters for the `GET /messages/:agent_id` endpoint.
126#[derive(Deserialize)]
127struct PollQuery {
128    /// Return only messages with sequence number > `since`. Defaults to 0.
129    since: Option<String>,
130}
131
132/// Response body for the `GET /messages/:agent_id` endpoint.
133#[derive(Serialize)]
134struct PollResponse {
135    /// Messages newer than the requested cursor.
136    messages: Vec<BrokerMessage>,
137    /// Highest sequence number in the result (0 if empty).
138    last_seq: u64,
139}
140
141/// Response body for the `GET /log` endpoint.
142#[derive(Serialize)]
143struct LogResponse {
144    /// All messages with `seq > since`, in chronological order.
145    /// Each entry is `[seq, timestamp_unix_secs, message]`.
146    entries: Vec<LogEntry>,
147    /// Highest sequence number in the result (0 if empty).
148    last_seq: u64,
149}
150
151/// One entry in `GET /log`.
152#[derive(Serialize)]
153struct LogEntry {
154    /// Sequence number assigned at publish time.
155    seq: u64,
156    /// Wall-clock seconds since the Unix epoch when the message was published.
157    timestamp_unix_secs: u64,
158    /// The original broker message.
159    message: BrokerMessage,
160}
161
162/// Builds the axum [`Router`] with all broker endpoints.
163pub fn router(state: Arc<BrokerState>) -> Router {
164    Router::new()
165        .route("/publish", post(publish))
166        .route("/watch", post(watch))
167        .route("/messages/{agent_id}", get(messages))
168        .route("/status", get(status))
169        .route("/log", get(log))
170        .with_state(state)
171}
172
173/// `POST /publish` — accepts a JSON [`BrokerMessage`] and queues it for delivery.
174///
175/// - 415 if `Content-Type` is missing or not `application/json`
176/// - 400 if body is empty or fails validation
177/// - 202 on success
178async fn publish(
179    State(state): State<Arc<BrokerState>>,
180    headers: HeaderMap,
181    body: String,
182) -> Response {
183    // Check Content-Type
184    let content_type = headers
185        .get("content-type")
186        .and_then(|v| v.to_str().ok())
187        .unwrap_or("");
188
189    if !content_type.starts_with("application/json") {
190        return (
191            StatusCode::UNSUPPORTED_MEDIA_TYPE,
192            axum::Json(serde_json::json!({"error": "Content-Type must be application/json"})),
193        )
194            .into_response();
195    }
196
197    // Check for empty body
198    if body.is_empty() {
199        return (
200            StatusCode::BAD_REQUEST,
201            axum::Json(serde_json::json!({"error": "request body must not be empty"})),
202        )
203            .into_response();
204    }
205
206    // Parse and validate
207    match BrokerMessage::from_json(&body) {
208        Ok(msg) => {
209            // Validate the top-level agent_id against the broker regex.
210            // Phantom debris (`"a"`, `"<agent-id>"`, empty strings) is
211            // rejected at the API boundary so it cannot leak into
212            // `/status`.
213            if !agent_id_regex().is_match(msg.agent_id()) {
214                return agent_id_rejection(msg.agent_id());
215            }
216            // Reject obviously-unfilled placeholder strings in the few
217            // payload fields the supervisor skill's examples touch.
218            if let Some(rejection) = check_placeholder_fields(&msg) {
219                return rejection;
220            }
221            delivery::publish_message(&state, &msg);
222            StatusCode::ACCEPTED.into_response()
223        }
224        Err(e) => (
225            StatusCode::BAD_REQUEST,
226            axum::Json(serde_json::json!({"error": e.to_string()})),
227        )
228            .into_response(),
229    }
230}
231
232/// `POST /watch` — registers a live filesystem watch target on the running
233/// broker so the watcher begins surfacing the worktree's activity without a
234/// restart.
235///
236/// Bound to loopback only, on the same listener as `/publish` and `/status`.
237///
238/// - 415 if `Content-Type` is missing or not `application/json`
239/// - 400 if the body is empty, malformed, has an invalid `agent_id`, or an
240///   empty / placeholder `worktree_path`
241/// - 202 on success (including an idempotent re-registration, which records
242///   nothing new and spawns no watcher)
243async fn watch(
244    State(state): State<Arc<BrokerState>>,
245    headers: HeaderMap,
246    body: String,
247) -> Response {
248    let content_type = headers
249        .get("content-type")
250        .and_then(|v| v.to_str().ok())
251        .unwrap_or("");
252    if !content_type.starts_with("application/json") {
253        return (
254            StatusCode::UNSUPPORTED_MEDIA_TYPE,
255            axum::Json(serde_json::json!({"error": "Content-Type must be application/json"})),
256        )
257            .into_response();
258    }
259    if body.is_empty() {
260        return (
261            StatusCode::BAD_REQUEST,
262            axum::Json(serde_json::json!({"error": "request body must not be empty"})),
263        )
264            .into_response();
265    }
266
267    let req: WatchRequest = match serde_json::from_str(&body) {
268        Ok(r) => r,
269        Err(e) => {
270            return (
271                StatusCode::BAD_REQUEST,
272                axum::Json(serde_json::json!({"error": e.to_string()})),
273            )
274                .into_response();
275        }
276    };
277
278    // Validate agent_id against the same regex `/publish` enforces, so
279    // phantom debris cannot mint a watch target.
280    if !agent_id_regex().is_match(&req.agent_id) {
281        return agent_id_rejection(&req.agent_id);
282    }
283    // The worktree path must be present and not an unfilled placeholder.
284    if req.worktree_path.is_empty() {
285        return (
286            StatusCode::BAD_REQUEST,
287            axum::Json(serde_json::json!({"error": "worktree_path must not be empty"})),
288        )
289            .into_response();
290    }
291    if placeholder_regex().is_match(&req.worktree_path) {
292        return placeholder_rejection("worktree_path", &req.worktree_path);
293    }
294
295    let target = WatchTarget {
296        agent_id: req.agent_id,
297        cli: req.cli,
298        worktree_path: std::path::PathBuf::from(req.worktree_path),
299    };
300
301    // Record the target (idempotent). Only spawn a watcher for a freshly
302    // registered path, and only when the broker has a live shutdown signal to
303    // enroll it in (absent in router-only unit tests).
304    if state.register_watch_target(&target)
305        && let Some(rx) = state.watcher_shutdown_rx()
306    {
307        tokio::spawn(watcher::watch_worktree(Arc::clone(&state), target, rx));
308    }
309
310    StatusCode::ACCEPTED.into_response()
311}
312
313/// `GET /messages/:agent_id?since=N` — polls for messages destined to the given agent.
314///
315/// - 400 if `agent_id` contains invalid characters
316/// - 400 if `since` is present but not a valid `u64`
317/// - 200 with `{"messages": [...], "last_seq": N}` on success
318async fn messages(
319    State(state): State<Arc<BrokerState>>,
320    Path(agent_id): Path<String>,
321    Query(params): Query<PollQuery>,
322) -> Response {
323    // Validate agent_id: only lowercase alphanumeric, hyphens, underscores
324    if agent_id.is_empty()
325        || !agent_id
326            .chars()
327            .all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '-' || c == '_')
328    {
329        return (
330            StatusCode::BAD_REQUEST,
331            axum::Json(serde_json::json!({"error": "agent_id must match [a-z0-9-_]+"})),
332        )
333            .into_response();
334    }
335
336    let since = match params.since {
337        Some(s) => match s.parse::<u64>() {
338            Ok(n) => n,
339            Err(_) => {
340                return (
341                    StatusCode::BAD_REQUEST,
342                    axum::Json(serde_json::json!({"error": "since must be a valid u64"})),
343                )
344                    .into_response();
345            }
346        },
347        None => 0,
348    };
349
350    let (msgs, last_seq) = delivery::poll_messages(&state, &agent_id, since);
351    (
352        StatusCode::OK,
353        axum::Json(PollResponse {
354            messages: msgs,
355            last_seq,
356        }),
357    )
358        .into_response()
359}
360
361/// `GET /log?since=N` — returns the broker's full message log filtered to
362/// `seq > since`.
363///
364/// Used by `cmd_supervisor` to reconstruct broker state from outside the
365/// dashboard process so it can build the dependency graph for merge ordering
366/// and write a real session summary instead of an empty one.
367async fn log(State(state): State<Arc<BrokerState>>, Query(params): Query<PollQuery>) -> Response {
368    let since = match params.since {
369        Some(s) => match s.parse::<u64>() {
370            Ok(n) => n,
371            Err(_) => {
372                return (
373                    StatusCode::BAD_REQUEST,
374                    axum::Json(serde_json::json!({"error": "since must be a valid u64"})),
375                )
376                    .into_response();
377            }
378        },
379        None => 0,
380    };
381
382    let raw = delivery::full_log(&state, since);
383    let last_seq = raw.iter().map(|(s, _, _)| *s).max().unwrap_or(0);
384    let entries: Vec<LogEntry> = raw
385        .into_iter()
386        .map(|(seq, ts, message)| LogEntry {
387            seq,
388            timestamp_unix_secs: ts
389                .duration_since(std::time::UNIX_EPOCH)
390                .map_or(0, |d| d.as_secs()),
391            message,
392        })
393        .collect();
394
395    (
396        StatusCode::OK,
397        axum::Json(LogResponse { entries, last_seq }),
398    )
399        .into_response()
400}
401
402/// `GET /status` — returns broker health and agent summary.
403async fn status(State(state): State<Arc<BrokerState>>) -> Response {
404    let uptime = state.uptime_seconds();
405    let agents = delivery::agent_status_snapshot(&state);
406    (
407        StatusCode::OK,
408        axum::Json(serde_json::json!({
409            "git_paw": true,
410            "version": env!("CARGO_PKG_VERSION"),
411            "uptime_seconds": uptime,
412            "agents": agents,
413        })),
414    )
415        .into_response()
416}
417
418#[cfg(test)]
419mod tests {
420    use super::*;
421    use axum::body::Body;
422    use axum::http::Request;
423    use tower::ServiceExt;
424
425    fn test_router() -> Router {
426        router(Arc::new(BrokerState::new(None)))
427    }
428
429    #[tokio::test]
430    async fn publish_valid_message_returns_202() {
431        let app = test_router();
432        let resp = app
433            .oneshot(
434                Request::builder()
435                    .method("POST")
436                    .uri("/publish")
437                    .header("content-type", "application/json")
438                    .body(Body::from(
439                        r#"{"type":"agent.status","agent_id":"feat-xx","payload":{"status":"idle","modified_files":[]}}"#,
440                    ))
441                    .unwrap(),
442            )
443            .await
444            .unwrap();
445        assert_eq!(resp.status(), StatusCode::ACCEPTED);
446    }
447
448    #[tokio::test]
449    async fn publish_invalid_json_returns_400() {
450        let app = test_router();
451        let resp = app
452            .oneshot(
453                Request::builder()
454                    .method("POST")
455                    .uri("/publish")
456                    .header("content-type", "application/json")
457                    .body(Body::from("not json"))
458                    .unwrap(),
459            )
460            .await
461            .unwrap();
462        assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
463    }
464
465    #[tokio::test]
466    async fn publish_empty_body_returns_400() {
467        let app = test_router();
468        let resp = app
469            .oneshot(
470                Request::builder()
471                    .method("POST")
472                    .uri("/publish")
473                    .header("content-type", "application/json")
474                    .body(Body::empty())
475                    .unwrap(),
476            )
477            .await
478            .unwrap();
479        assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
480    }
481
482    #[tokio::test]
483    async fn publish_wrong_content_type_returns_415() {
484        let app = test_router();
485        let resp = app
486            .oneshot(
487                Request::builder()
488                    .method("POST")
489                    .uri("/publish")
490                    .header("content-type", "text/plain")
491                    .body(Body::from("{}"))
492                    .unwrap(),
493            )
494            .await
495            .unwrap();
496        assert_eq!(resp.status(), StatusCode::UNSUPPORTED_MEDIA_TYPE);
497    }
498
499    #[tokio::test]
500    async fn publish_missing_content_type_returns_415() {
501        let app = test_router();
502        let resp = app
503            .oneshot(
504                Request::builder()
505                    .method("POST")
506                    .uri("/publish")
507                    .body(Body::from("{}"))
508                    .unwrap(),
509            )
510            .await
511            .unwrap();
512        assert_eq!(resp.status(), StatusCode::UNSUPPORTED_MEDIA_TYPE);
513    }
514
515    #[tokio::test]
516    async fn publish_empty_agent_id_returns_400() {
517        let app = test_router();
518        let resp = app
519            .oneshot(
520                Request::builder()
521                    .method("POST")
522                    .uri("/publish")
523                    .header("content-type", "application/json")
524                    .body(Body::from(
525                        r#"{"type":"agent.status","agent_id":"","payload":{"status":"idle","modified_files":[]}}"#,
526                    ))
527                    .unwrap(),
528            )
529            .await
530            .unwrap();
531        assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
532    }
533
534    // -----------------------------------------------------------------------
535    // supervisor-bugfixes-v0-5-x §4 — broker validates agent_id + payload
536    // placeholder syntax. The unit-test matrix below covers the spec scenarios
537    // for invalid + valid agent_ids and the placeholder rejection rules.
538    // -----------------------------------------------------------------------
539
540    /// Helper: POST a body to `/publish` and return (status, body-bytes).
541    async fn post_publish(body: &'static str) -> (StatusCode, axum::body::Bytes) {
542        let app = test_router();
543        let resp = app
544            .oneshot(
545                Request::builder()
546                    .method("POST")
547                    .uri("/publish")
548                    .header("content-type", "application/json")
549                    .body(Body::from(body))
550                    .unwrap(),
551            )
552            .await
553            .unwrap();
554        let status = resp.status();
555        let bytes = axum::body::to_bytes(resp.into_body(), usize::MAX)
556            .await
557            .unwrap();
558        (status, bytes)
559    }
560
561    #[tokio::test]
562    async fn agent_id_rejects_single_letter() {
563        let (status, body) = post_publish(
564            r#"{"type":"agent.status","agent_id":"a","payload":{"status":"working","modified_files":[]}}"#,
565        )
566        .await;
567        assert_eq!(status, StatusCode::BAD_REQUEST);
568        let text = String::from_utf8_lossy(&body);
569        assert!(
570            text.contains("invalid agent_id"),
571            "body should mention 'invalid agent_id'; got: {text}"
572        );
573    }
574
575    #[tokio::test]
576    async fn agent_id_rejects_placeholder() {
577        let (status, body) = post_publish(
578            r#"{"type":"agent.status","agent_id":"<agent-id>","payload":{"status":"working","modified_files":[]}}"#,
579        )
580        .await;
581        assert_eq!(status, StatusCode::BAD_REQUEST);
582        let text = String::from_utf8_lossy(&body);
583        assert!(text.contains("invalid agent_id"), "body: {text}");
584    }
585
586    #[tokio::test]
587    async fn agent_id_rejects_empty() {
588        let (status, body) = post_publish(
589            r#"{"type":"agent.status","agent_id":"","payload":{"status":"working","modified_files":[]}}"#,
590        )
591        .await;
592        assert_eq!(status, StatusCode::BAD_REQUEST);
593        // The empty-string case is caught either by from_json's
594        // EmptyAgentId validation or by the regex — both surface as 400.
595        let _ = body;
596    }
597
598    #[tokio::test]
599    async fn agent_id_accepts_supervisor() {
600        let (status, _) = post_publish(
601            r#"{"type":"agent.status","agent_id":"supervisor","payload":{"status":"working","modified_files":[]}}"#,
602        )
603        .await;
604        assert!(
605            status == StatusCode::ACCEPTED || status == StatusCode::OK,
606            "supervisor should be accepted; got: {status}"
607        );
608    }
609
610    #[tokio::test]
611    async fn agent_id_accepts_feat_dash() {
612        let (status, _) = post_publish(
613            r#"{"type":"agent.status","agent_id":"feat-test-branch","payload":{"status":"working","modified_files":[]}}"#,
614        )
615        .await;
616        assert!(
617            status == StatusCode::ACCEPTED || status == StatusCode::OK,
618            "feat-test-branch should be accepted; got: {status}"
619        );
620    }
621
622    #[tokio::test]
623    async fn agent_id_accepts_feat_slash() {
624        let (status, _) = post_publish(
625            r#"{"type":"agent.status","agent_id":"feat/test-branch","payload":{"status":"working","modified_files":[]}}"#,
626        )
627        .await;
628        assert!(
629            status == StatusCode::ACCEPTED || status == StatusCode::OK,
630            "feat/test-branch should be accepted; got: {status}"
631        );
632    }
633
634    #[tokio::test]
635    async fn payload_question_rejects_placeholder() {
636        let (status, body) = post_publish(
637            r#"{"type":"agent.question","agent_id":"feat-test-branch","payload":{"question":"<your specific question>"}}"#,
638        )
639        .await;
640        assert_eq!(status, StatusCode::BAD_REQUEST);
641        let text = String::from_utf8_lossy(&body);
642        assert!(
643            text.contains("placeholder") && text.contains("question"),
644            "body should mention both 'placeholder' and 'question'; got: {text}"
645        );
646    }
647
648    #[tokio::test]
649    async fn payload_question_accepts_real_content() {
650        let (status, _) = post_publish(
651            r#"{"type":"agent.question","agent_id":"feat-test-branch","payload":{"question":"Should we use bcrypt or argon2?"}}"#,
652        )
653        .await;
654        assert!(
655            status == StatusCode::ACCEPTED || status == StatusCode::OK,
656            "real human content should be accepted; got: {status}"
657        );
658    }
659
660    #[tokio::test]
661    async fn payload_blocked_rejects_placeholder_needs() {
662        let (status, body) = post_publish(
663            r#"{"type":"agent.blocked","agent_id":"feat-test-branch","payload":{"needs":"<what>","from":"feat-other"}}"#,
664        )
665        .await;
666        assert_eq!(status, StatusCode::BAD_REQUEST);
667        let text = String::from_utf8_lossy(&body);
668        assert!(
669            text.contains("placeholder") && text.contains("needs"),
670            "body: {text}"
671        );
672    }
673
674    #[tokio::test]
675    async fn payload_feedback_rejects_placeholder_error_entry() {
676        let (status, body) = post_publish(
677            r#"{"type":"agent.feedback","agent_id":"feat-test-branch","payload":{"from":"supervisor","errors":["<error 1>"]}}"#,
678        )
679        .await;
680        assert_eq!(status, StatusCode::BAD_REQUEST);
681        let text = String::from_utf8_lossy(&body);
682        assert!(
683            text.contains("placeholder") && text.contains("errors"),
684            "body: {text}"
685        );
686    }
687
688    // === agent.advanced-main routing + validation (advanced-main-event §3) ===
689
690    #[tokio::test]
691    async fn advanced_main_accepted_through_publish_endpoint() {
692        // No new endpoint: the variant flows through the existing /publish.
693        let (status, _) = post_publish(
694            r#"{"type":"agent.advanced-main","from":"supervisor","merged_branch":"feat/auth","new_main_sha":"a1b2c3d4e5f6","base":"main","merged_at":"2026-06-04T13:30:00Z","summary":"landed auth"}"#,
695        )
696        .await;
697        assert_eq!(
698            status,
699            StatusCode::ACCEPTED,
700            "a well-formed advanced-main must be accepted (202)"
701        );
702    }
703
704    #[tokio::test]
705    async fn advanced_main_missing_field_returns_400_naming_field() {
706        let (status, body) = post_publish(
707            r#"{"type":"agent.advanced-main","from":"supervisor","new_main_sha":"a1b2c3d4e5f6","base":"main","merged_at":"2026-06-04T13:30:00Z"}"#,
708        )
709        .await;
710        assert_eq!(status, StatusCode::BAD_REQUEST);
711        let text = String::from_utf8_lossy(&body);
712        assert!(
713            text.contains("merged_branch"),
714            "the 400 must name the missing field; got: {text}"
715        );
716    }
717
718    #[tokio::test]
719    async fn advanced_main_routes_to_every_registered_agent() {
720        // After two agents register, a supervisor-published advance lands in
721        // both their inboxes within one poll.
722        let state = Arc::new(BrokerState::new(None));
723        publish_json(
724            &state,
725            r#"{"type":"agent.status","agent_id":"feat-alpha","payload":{"status":"working","modified_files":[]}}"#,
726        )
727        .await;
728        publish_json(
729            &state,
730            r#"{"type":"agent.status","agent_id":"feat-beta","payload":{"status":"working","modified_files":[]}}"#,
731        )
732        .await;
733        publish_json(
734            &state,
735            r#"{"type":"agent.advanced-main","from":"supervisor","merged_branch":"feat/alpha","new_main_sha":"a1b2c3d4e5f6","base":"main","merged_at":"2026-06-04T13:30:00Z"}"#,
736        )
737        .await;
738
739        for agent in ["feat-alpha", "feat-beta"] {
740            let (msgs, _) = delivery::poll_messages(&state, agent, 0);
741            assert!(
742                msgs.iter()
743                    .any(|m| matches!(m, BrokerMessage::AdvancedMain { .. })),
744                "{agent} inbox must surface the advanced-main event"
745            );
746        }
747    }
748
749    #[tokio::test]
750    async fn messages_valid_agent_returns_200_with_last_seq() {
751        let app = test_router();
752        let resp = app
753            .oneshot(
754                Request::builder()
755                    .method("GET")
756                    .uri("/messages/feat-x")
757                    .body(Body::empty())
758                    .unwrap(),
759            )
760            .await
761            .unwrap();
762        assert_eq!(resp.status(), StatusCode::OK);
763        let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
764            .await
765            .unwrap();
766        let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
767        assert_eq!(json["messages"], serde_json::json!([]));
768        assert_eq!(json["last_seq"], serde_json::json!(0));
769    }
770
771    #[tokio::test]
772    async fn messages_invalid_agent_returns_400() {
773        let app = test_router();
774        let resp = app
775            .oneshot(
776                Request::builder()
777                    .method("GET")
778                    .uri("/messages/INVALID!")
779                    .body(Body::empty())
780                    .unwrap(),
781            )
782            .await
783            .unwrap();
784        assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
785    }
786
787    #[tokio::test]
788    async fn messages_invalid_since_returns_400() {
789        let app = test_router();
790        let resp = app
791            .oneshot(
792                Request::builder()
793                    .method("GET")
794                    .uri("/messages/feat-x?since=abc")
795                    .body(Body::empty())
796                    .unwrap(),
797            )
798            .await
799            .unwrap();
800        assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
801    }
802
803    #[tokio::test]
804    async fn status_returns_marker_and_version() {
805        let app = test_router();
806        let resp = app
807            .oneshot(
808                Request::builder()
809                    .method("GET")
810                    .uri("/status")
811                    .body(Body::empty())
812                    .unwrap(),
813            )
814            .await
815            .unwrap();
816        assert_eq!(resp.status(), StatusCode::OK);
817        let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
818            .await
819            .unwrap();
820        let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
821        assert_eq!(json["git_paw"], true);
822        assert!(json["version"].is_string());
823        assert!(json["uptime_seconds"].is_number());
824        assert_eq!(json["agents"], serde_json::json!([]));
825    }
826
827    /// POSTs a JSON body to `/publish` against a router built from `state`.
828    async fn publish_json(state: &Arc<BrokerState>, body: &'static str) {
829        let resp = router(Arc::clone(state))
830            .oneshot(
831                Request::builder()
832                    .method("POST")
833                    .uri("/publish")
834                    .header("content-type", "application/json")
835                    .body(Body::from(body))
836                    .unwrap(),
837            )
838            .await
839            .unwrap();
840        assert_eq!(resp.status(), StatusCode::ACCEPTED);
841    }
842
843    /// GETs `/status` against a router built from `state` and returns the
844    /// parsed JSON body.
845    async fn get_status(state: &Arc<BrokerState>) -> serde_json::Value {
846        let resp = router(Arc::clone(state))
847            .oneshot(
848                Request::builder()
849                    .method("GET")
850                    .uri("/status")
851                    .body(Body::empty())
852                    .unwrap(),
853            )
854            .await
855            .unwrap();
856        assert_eq!(resp.status(), StatusCode::OK);
857        let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
858            .await
859            .unwrap();
860        serde_json::from_slice(&body).unwrap()
861    }
862
863    #[tokio::test]
864    async fn e2e_feedback_from_human_creates_no_phantom_roster_row() {
865        // W15-16 end-to-end: two real agents register via `agent.status`,
866        // then a `agent.feedback` with `from:"human"` is published. The
867        // `/status` roster must hold exactly the two real agents — no
868        // phantom `"human"` row.
869        let state = Arc::new(BrokerState::new(None));
870        publish_json(
871            &state,
872            r#"{"type":"agent.status","agent_id":"supervisor","payload":{"status":"working","modified_files":[],"cli":"claude-oss"}}"#,
873        )
874        .await;
875        publish_json(
876            &state,
877            r#"{"type":"agent.status","agent_id":"feat-roster","payload":{"status":"working","modified_files":[],"cli":"claude-oss"}}"#,
878        )
879        .await;
880        publish_json(
881            &state,
882            r#"{"type":"agent.feedback","agent_id":"feat-roster","payload":{"from":"human","errors":["fix the flaky test"]}}"#,
883        )
884        .await;
885
886        let json = get_status(&state).await;
887        let agents = json["agents"].as_array().expect("agents array");
888        let ids: Vec<&str> = agents
889            .iter()
890            .map(|a| a["agent_id"].as_str().unwrap())
891            .collect();
892        assert!(
893            !ids.contains(&"human"),
894            "a feedback `from:human` must not mint a phantom roster row; got {ids:?}",
895        );
896        assert_eq!(ids.len(), 2, "roster holds exactly the two real agents");
897    }
898
899    #[tokio::test]
900    async fn e2e_status_shows_cli_for_every_agent() {
901        // W15-15 end-to-end: every agent that publishes `agent.status` with a
902        // `cli` shows that CLI in the `/status` roster — not just the
903        // supervisor.
904        let state = Arc::new(BrokerState::new(None));
905        publish_json(
906            &state,
907            r#"{"type":"agent.status","agent_id":"supervisor","payload":{"status":"working","modified_files":[],"cli":"claude-oss"}}"#,
908        )
909        .await;
910        publish_json(
911            &state,
912            r#"{"type":"agent.status","agent_id":"feat-build","payload":{"status":"working","modified_files":[],"cli":"claude-oss"}}"#,
913        )
914        .await;
915
916        let json = get_status(&state).await;
917        let agents = json["agents"].as_array().expect("agents array");
918        assert_eq!(agents.len(), 2);
919        for a in agents {
920            assert_eq!(
921                a["cli"].as_str(),
922                Some("claude-oss"),
923                "every agent row must carry its cli: {a}",
924            );
925        }
926    }
927
928    // -----------------------------------------------------------------------
929    // broker-live-watch-registration — POST /watch
930    // -----------------------------------------------------------------------
931
932    /// Minimal git repo for the watch integration test.
933    fn init_test_repo_server(dir: &std::path::Path) {
934        use std::process::Command;
935        let run = |args: &[&str]| {
936            Command::new("git")
937                .args(args)
938                .current_dir(dir)
939                .output()
940                .expect("git command failed");
941        };
942        run(&["init", "-q", "-b", "main"]);
943        run(&["config", "user.email", "test@example.com"]);
944        run(&["config", "user.name", "test"]);
945        run(&["commit", "--allow-empty", "-m", "root", "-q"]);
946    }
947
948    /// POSTs a body to `/watch` against a router built from `state`.
949    async fn post_watch(state: &Arc<BrokerState>, body: String) -> StatusCode {
950        router(Arc::clone(state))
951            .oneshot(
952                Request::builder()
953                    .method("POST")
954                    .uri("/watch")
955                    .header("content-type", "application/json")
956                    .body(Body::from(body))
957                    .unwrap(),
958            )
959            .await
960            .unwrap()
961            .status()
962    }
963
964    /// Spec scenario: registering a target surfaces the worktree via the
965    /// watcher without a broker restart (tasks 2.3 + 3.3 at the broker layer).
966    #[tokio::test]
967    async fn watch_registers_target_and_surfaces_worktree_in_status() {
968        use super::super::watcher::POLL_INTERVAL;
969        let tmp = tempfile::tempdir().unwrap();
970        init_test_repo_server(tmp.path());
971
972        let state = Arc::new(BrokerState::new(None));
973        // Wire the shared watcher shutdown signal the way start_broker_with
974        // does, so the handler enrolls the spawned watcher in it.
975        let (tx, rx) = tokio::sync::watch::channel(false);
976        state.set_watcher_shutdown_rx(rx);
977
978        let body = format!(
979            r#"{{"agent_id":"feat-hot","worktree_path":"{}","cli":"claude"}}"#,
980            tmp.path().display()
981        );
982        assert_eq!(post_watch(&state, body).await, StatusCode::ACCEPTED);
983
984        // Dirty the worktree so the watcher has activity to surface.
985        std::fs::write(tmp.path().join("hot.rs"), "fn hot() {}").unwrap();
986
987        let mut found = false;
988        for _ in 0..20 {
989            tokio::time::sleep(POLL_INTERVAL / 2).await;
990            let json = get_status(&state).await;
991            if let Some(agents) = json["agents"].as_array()
992                && agents.iter().any(|a| a["agent_id"] == "feat-hot")
993            {
994                found = true;
995                break;
996            }
997        }
998        assert!(
999            found,
1000            "a registered worktree must surface its agent in /status from activity"
1001        );
1002
1003        let _ = tx.send(true);
1004    }
1005
1006    /// Spec scenario: registration is idempotent — a duplicate POST still
1007    /// succeeds and records no second target.
1008    #[tokio::test]
1009    async fn watch_duplicate_registration_is_a_noop_success() {
1010        let state = Arc::new(BrokerState::new(None));
1011        let body = r#"{"agent_id":"feat-hot","worktree_path":"/tmp/feat-hot","cli":"claude"}"#;
1012        assert_eq!(
1013            post_watch(&state, body.to_string()).await,
1014            StatusCode::ACCEPTED
1015        );
1016        assert_eq!(
1017            post_watch(&state, body.to_string()).await,
1018            StatusCode::ACCEPTED
1019        );
1020        assert_eq!(
1021            state.read().watched_paths.len(),
1022            1,
1023            "duplicate registration must not record a second target"
1024        );
1025    }
1026
1027    #[tokio::test]
1028    async fn watch_rejects_invalid_agent_id() {
1029        let state = Arc::new(BrokerState::new(None));
1030        let body = r#"{"agent_id":"a","worktree_path":"/tmp/x","cli":"claude"}"#;
1031        assert_eq!(
1032            post_watch(&state, body.to_string()).await,
1033            StatusCode::BAD_REQUEST
1034        );
1035    }
1036
1037    #[tokio::test]
1038    async fn watch_rejects_placeholder_worktree_path() {
1039        let state = Arc::new(BrokerState::new(None));
1040        let body = r#"{"agent_id":"feat-hot","worktree_path":"<path>","cli":"claude"}"#;
1041        assert_eq!(
1042            post_watch(&state, body.to_string()).await,
1043            StatusCode::BAD_REQUEST
1044        );
1045    }
1046
1047    #[tokio::test]
1048    async fn watch_rejects_empty_worktree_path() {
1049        let state = Arc::new(BrokerState::new(None));
1050        let body = r#"{"agent_id":"feat-hot","worktree_path":"","cli":"claude"}"#;
1051        assert_eq!(
1052            post_watch(&state, body.to_string()).await,
1053            StatusCode::BAD_REQUEST
1054        );
1055    }
1056
1057    #[tokio::test]
1058    async fn watch_wrong_content_type_returns_415() {
1059        let app = test_router();
1060        let resp = app
1061            .oneshot(
1062                Request::builder()
1063                    .method("POST")
1064                    .uri("/watch")
1065                    .header("content-type", "text/plain")
1066                    .body(Body::from("{}"))
1067                    .unwrap(),
1068            )
1069            .await
1070            .unwrap();
1071        assert_eq!(resp.status(), StatusCode::UNSUPPORTED_MEDIA_TYPE);
1072    }
1073
1074    #[tokio::test]
1075    async fn unknown_route_returns_404() {
1076        let app = test_router();
1077        let resp = app
1078            .oneshot(
1079                Request::builder()
1080                    .method("GET")
1081                    .uri("/unknown/route")
1082                    .body(Body::empty())
1083                    .unwrap(),
1084            )
1085            .await
1086            .unwrap();
1087        assert_eq!(resp.status(), StatusCode::NOT_FOUND);
1088    }
1089
1090    #[tokio::test]
1091    async fn wrong_method_returns_405() {
1092        let app = test_router();
1093        let resp = app
1094            .oneshot(
1095                Request::builder()
1096                    .method("GET")
1097                    .uri("/publish")
1098                    .body(Body::empty())
1099                    .unwrap(),
1100            )
1101            .await
1102            .unwrap();
1103        assert_eq!(resp.status(), StatusCode::METHOD_NOT_ALLOWED);
1104    }
1105
1106    #[tokio::test]
1107    async fn panic_in_handler_is_isolated() {
1108        // Verify that a panicking handler does not take down the server.
1109        let app = Router::new()
1110            .route(
1111                "/panic",
1112                get(|| async {
1113                    panic!("deliberate test panic");
1114                    #[allow(unreachable_code)]
1115                    StatusCode::OK.into_response()
1116                }),
1117            )
1118            .route("/status", get(status))
1119            .with_state(Arc::new(BrokerState::new(None)));
1120
1121        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1122        let addr = listener.local_addr().unwrap();
1123
1124        let server = tokio::spawn(async move {
1125            axum::serve(listener, app).await.ok();
1126        });
1127
1128        let client =
1129            hyper_util::client::legacy::Client::builder(hyper_util::rt::TokioExecutor::new())
1130                .build_http();
1131
1132        // Request to /panic — should not crash the server.
1133        let _panic_resp = client
1134            .request(
1135                Request::builder()
1136                    .method("GET")
1137                    .uri(format!("http://{addr}/panic"))
1138                    .body(axum::body::Body::empty())
1139                    .unwrap(),
1140            )
1141            .await;
1142
1143        // The panicking connection may return an error or a 500.
1144        // Either is acceptable — the key test is that /status still works after.
1145
1146        let status_resp = client
1147            .request(
1148                Request::builder()
1149                    .method("GET")
1150                    .uri(format!("http://{addr}/status"))
1151                    .body(axum::body::Body::empty())
1152                    .unwrap(),
1153            )
1154            .await
1155            .expect("server should still be alive after a panic in another handler");
1156
1157        assert_eq!(status_resp.status(), StatusCode::OK);
1158
1159        server.abort();
1160    }
1161
1162    #[tokio::test]
1163    async fn log_returns_full_message_log_in_chronological_order() {
1164        let state = Arc::new(BrokerState::new(None));
1165        // Seed the broker with three published messages.
1166        for (agent, status_label) in [
1167            ("feat-a", "working"),
1168            ("feat-b", "blocked"),
1169            ("feat-c", "done"),
1170        ] {
1171            let msg = BrokerMessage::Status {
1172                agent_id: agent.to_string(),
1173                payload: super::super::messages::StatusPayload {
1174                    status: status_label.to_string(),
1175                    modified_files: vec![],
1176                    message: None,
1177                    ..Default::default()
1178                },
1179            };
1180            delivery::publish_message(&state, &msg);
1181        }
1182
1183        let app = router(state);
1184        let resp = app
1185            .oneshot(
1186                Request::builder()
1187                    .method("GET")
1188                    .uri("/log")
1189                    .body(Body::empty())
1190                    .unwrap(),
1191            )
1192            .await
1193            .unwrap();
1194        assert_eq!(resp.status(), StatusCode::OK);
1195
1196        let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
1197            .await
1198            .unwrap();
1199        let parsed: serde_json::Value = serde_json::from_slice(&body).unwrap();
1200        let entries = parsed["entries"].as_array().expect("entries array");
1201        assert_eq!(entries.len(), 3, "all three messages must appear in /log");
1202        // Chronological order: feat-a first, feat-c last.
1203        assert_eq!(entries[0]["message"]["agent_id"], "feat-a");
1204        assert_eq!(entries[2]["message"]["agent_id"], "feat-c");
1205        assert_eq!(parsed["last_seq"], 3);
1206    }
1207
1208    #[tokio::test]
1209    async fn log_with_since_filters_older_entries() {
1210        let state = Arc::new(BrokerState::new(None));
1211        for agent in ["feat-a", "feat-b", "feat-c"] {
1212            let msg = BrokerMessage::Status {
1213                agent_id: agent.to_string(),
1214                payload: super::super::messages::StatusPayload {
1215                    status: "working".to_string(),
1216                    modified_files: vec![],
1217                    message: None,
1218                    ..Default::default()
1219                },
1220            };
1221            delivery::publish_message(&state, &msg);
1222        }
1223
1224        let app = router(state);
1225        let resp = app
1226            .oneshot(
1227                Request::builder()
1228                    .method("GET")
1229                    .uri("/log?since=2")
1230                    .body(Body::empty())
1231                    .unwrap(),
1232            )
1233            .await
1234            .unwrap();
1235        let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
1236            .await
1237            .unwrap();
1238        let parsed: serde_json::Value = serde_json::from_slice(&body).unwrap();
1239        let entries = parsed["entries"].as_array().unwrap();
1240        assert_eq!(
1241            entries.len(),
1242            1,
1243            "since=2 must yield only the message at seq=3"
1244        );
1245        assert_eq!(entries[0]["seq"], 3);
1246    }
1247
1248    #[tokio::test]
1249    async fn log_invalid_since_returns_400() {
1250        let app = test_router();
1251        let resp = app
1252            .oneshot(
1253                Request::builder()
1254                    .method("GET")
1255                    .uri("/log?since=notanumber")
1256                    .body(Body::empty())
1257                    .unwrap(),
1258            )
1259            .await
1260            .unwrap();
1261        assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
1262    }
1263}