1use std::sync::{Arc, OnceLock};
8
9use axum::Router;
10use axum::extract::{Path, Query, State};
11use axum::http::{HeaderMap, StatusCode};
12use axum::response::{IntoResponse, Response};
13use axum::routing::{get, post};
14use regex::Regex;
15use serde::{Deserialize, Serialize};
16
17use super::BrokerState;
18use super::delivery;
19use super::messages::BrokerMessage;
20use super::{WatchTarget, watcher};
21
22fn agent_id_regex() -> &'static Regex {
27 static RE: OnceLock<Regex> = OnceLock::new();
28 RE.get_or_init(|| {
29 Regex::new(r"^(supervisor|feat/[a-z0-9][a-z0-9-]+|feat-[a-z0-9][a-z0-9-]+)$")
30 .expect("AGENT_ID_RE compiles")
31 })
32}
33
34fn placeholder_regex() -> &'static Regex {
37 static RE: OnceLock<Regex> = OnceLock::new();
38 RE.get_or_init(|| Regex::new(r"^<.*>$").expect("PLACEHOLDER_RE compiles"))
39}
40
41fn agent_id_rejection(value: &str) -> Response {
44 (
45 StatusCode::BAD_REQUEST,
46 axum::Json(serde_json::json!({
47 "error": "invalid agent_id",
48 "value": value,
49 "detail": "agent_id must be 'supervisor' or match feat-{name} / feat/{name}",
50 })),
51 )
52 .into_response()
53}
54
55fn placeholder_rejection(field: &str, value: &str) -> Response {
58 (
59 StatusCode::BAD_REQUEST,
60 axum::Json(serde_json::json!({
61 "error": "field looks like an unfilled placeholder",
62 "field": field,
63 "value": value,
64 "detail": "substitute the real value before publishing",
65 })),
66 )
67 .into_response()
68}
69
70fn check_placeholder_fields(msg: &BrokerMessage) -> Option<Response> {
80 let re = placeholder_regex();
81 match msg {
82 BrokerMessage::Question { payload, .. } => {
83 if re.is_match(&payload.question) {
84 return Some(placeholder_rejection("question", &payload.question));
85 }
86 }
87 BrokerMessage::Blocked { payload, .. } => {
88 if re.is_match(&payload.needs) {
89 return Some(placeholder_rejection("needs", &payload.needs));
90 }
91 }
92 BrokerMessage::Feedback { payload, .. } => {
93 for err in &payload.errors {
94 if re.is_match(err) {
95 return Some(placeholder_rejection("errors", err));
96 }
97 }
98 }
99 BrokerMessage::Status { .. }
100 | BrokerMessage::Artifact { .. }
101 | BrokerMessage::Verified { .. }
102 | BrokerMessage::Intent { .. }
103 | BrokerMessage::AdvancedMain { .. }
104 | BrokerMessage::Learning { .. }
105 | BrokerMessage::VerifyNow { .. } => {}
106 }
107 None
108}
109
110#[derive(Deserialize)]
113struct WatchRequest {
114 agent_id: String,
116 worktree_path: String,
118 #[serde(default)]
122 cli: String,
123}
124
125#[derive(Deserialize)]
127struct PollQuery {
128 since: Option<String>,
130}
131
132#[derive(Serialize)]
134struct PollResponse {
135 messages: Vec<BrokerMessage>,
137 last_seq: u64,
139}
140
141#[derive(Serialize)]
143struct LogResponse {
144 entries: Vec<LogEntry>,
147 last_seq: u64,
149}
150
151#[derive(Serialize)]
153struct LogEntry {
154 seq: u64,
156 timestamp_unix_secs: u64,
158 message: BrokerMessage,
160}
161
162pub fn router(state: Arc<BrokerState>) -> Router {
164 Router::new()
165 .route("/publish", post(publish))
166 .route("/watch", post(watch))
167 .route("/messages/{agent_id}", get(messages))
168 .route("/status", get(status))
169 .route("/log", get(log))
170 .with_state(state)
171}
172
173async fn publish(
179 State(state): State<Arc<BrokerState>>,
180 headers: HeaderMap,
181 body: String,
182) -> Response {
183 let content_type = headers
185 .get("content-type")
186 .and_then(|v| v.to_str().ok())
187 .unwrap_or("");
188
189 if !content_type.starts_with("application/json") {
190 return (
191 StatusCode::UNSUPPORTED_MEDIA_TYPE,
192 axum::Json(serde_json::json!({"error": "Content-Type must be application/json"})),
193 )
194 .into_response();
195 }
196
197 if body.is_empty() {
199 return (
200 StatusCode::BAD_REQUEST,
201 axum::Json(serde_json::json!({"error": "request body must not be empty"})),
202 )
203 .into_response();
204 }
205
206 match BrokerMessage::from_json(&body) {
208 Ok(msg) => {
209 if !agent_id_regex().is_match(msg.agent_id()) {
214 return agent_id_rejection(msg.agent_id());
215 }
216 if let Some(rejection) = check_placeholder_fields(&msg) {
219 return rejection;
220 }
221 delivery::publish_message(&state, &msg);
222 StatusCode::ACCEPTED.into_response()
223 }
224 Err(e) => (
225 StatusCode::BAD_REQUEST,
226 axum::Json(serde_json::json!({"error": e.to_string()})),
227 )
228 .into_response(),
229 }
230}
231
232async fn watch(
244 State(state): State<Arc<BrokerState>>,
245 headers: HeaderMap,
246 body: String,
247) -> Response {
248 let content_type = headers
249 .get("content-type")
250 .and_then(|v| v.to_str().ok())
251 .unwrap_or("");
252 if !content_type.starts_with("application/json") {
253 return (
254 StatusCode::UNSUPPORTED_MEDIA_TYPE,
255 axum::Json(serde_json::json!({"error": "Content-Type must be application/json"})),
256 )
257 .into_response();
258 }
259 if body.is_empty() {
260 return (
261 StatusCode::BAD_REQUEST,
262 axum::Json(serde_json::json!({"error": "request body must not be empty"})),
263 )
264 .into_response();
265 }
266
267 let req: WatchRequest = match serde_json::from_str(&body) {
268 Ok(r) => r,
269 Err(e) => {
270 return (
271 StatusCode::BAD_REQUEST,
272 axum::Json(serde_json::json!({"error": e.to_string()})),
273 )
274 .into_response();
275 }
276 };
277
278 if !agent_id_regex().is_match(&req.agent_id) {
281 return agent_id_rejection(&req.agent_id);
282 }
283 if req.worktree_path.is_empty() {
285 return (
286 StatusCode::BAD_REQUEST,
287 axum::Json(serde_json::json!({"error": "worktree_path must not be empty"})),
288 )
289 .into_response();
290 }
291 if placeholder_regex().is_match(&req.worktree_path) {
292 return placeholder_rejection("worktree_path", &req.worktree_path);
293 }
294
295 let target = WatchTarget {
296 agent_id: req.agent_id,
297 cli: req.cli,
298 worktree_path: std::path::PathBuf::from(req.worktree_path),
299 };
300
301 if state.register_watch_target(&target)
305 && let Some(rx) = state.watcher_shutdown_rx()
306 {
307 tokio::spawn(watcher::watch_worktree(Arc::clone(&state), target, rx));
308 }
309
310 StatusCode::ACCEPTED.into_response()
311}
312
313async fn messages(
319 State(state): State<Arc<BrokerState>>,
320 Path(agent_id): Path<String>,
321 Query(params): Query<PollQuery>,
322) -> Response {
323 if agent_id.is_empty()
325 || !agent_id
326 .chars()
327 .all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '-' || c == '_')
328 {
329 return (
330 StatusCode::BAD_REQUEST,
331 axum::Json(serde_json::json!({"error": "agent_id must match [a-z0-9-_]+"})),
332 )
333 .into_response();
334 }
335
336 let since = match params.since {
337 Some(s) => match s.parse::<u64>() {
338 Ok(n) => n,
339 Err(_) => {
340 return (
341 StatusCode::BAD_REQUEST,
342 axum::Json(serde_json::json!({"error": "since must be a valid u64"})),
343 )
344 .into_response();
345 }
346 },
347 None => 0,
348 };
349
350 let (msgs, last_seq) = delivery::poll_messages(&state, &agent_id, since);
351 (
352 StatusCode::OK,
353 axum::Json(PollResponse {
354 messages: msgs,
355 last_seq,
356 }),
357 )
358 .into_response()
359}
360
361async fn log(State(state): State<Arc<BrokerState>>, Query(params): Query<PollQuery>) -> Response {
368 let since = match params.since {
369 Some(s) => match s.parse::<u64>() {
370 Ok(n) => n,
371 Err(_) => {
372 return (
373 StatusCode::BAD_REQUEST,
374 axum::Json(serde_json::json!({"error": "since must be a valid u64"})),
375 )
376 .into_response();
377 }
378 },
379 None => 0,
380 };
381
382 let raw = delivery::full_log(&state, since);
383 let last_seq = raw.iter().map(|(s, _, _)| *s).max().unwrap_or(0);
384 let entries: Vec<LogEntry> = raw
385 .into_iter()
386 .map(|(seq, ts, message)| LogEntry {
387 seq,
388 timestamp_unix_secs: ts
389 .duration_since(std::time::UNIX_EPOCH)
390 .map_or(0, |d| d.as_secs()),
391 message,
392 })
393 .collect();
394
395 (
396 StatusCode::OK,
397 axum::Json(LogResponse { entries, last_seq }),
398 )
399 .into_response()
400}
401
402async fn status(State(state): State<Arc<BrokerState>>) -> Response {
404 let uptime = state.uptime_seconds();
405 let agents = delivery::agent_status_snapshot(&state);
406 (
407 StatusCode::OK,
408 axum::Json(serde_json::json!({
409 "git_paw": true,
410 "version": env!("CARGO_PKG_VERSION"),
411 "uptime_seconds": uptime,
412 "agents": agents,
413 })),
414 )
415 .into_response()
416}
417
418#[cfg(test)]
419mod tests {
420 use super::*;
421 use axum::body::Body;
422 use axum::http::Request;
423 use tower::ServiceExt;
424
425 fn test_router() -> Router {
426 router(Arc::new(BrokerState::new(None)))
427 }
428
429 #[tokio::test]
430 async fn publish_valid_message_returns_202() {
431 let app = test_router();
432 let resp = app
433 .oneshot(
434 Request::builder()
435 .method("POST")
436 .uri("/publish")
437 .header("content-type", "application/json")
438 .body(Body::from(
439 r#"{"type":"agent.status","agent_id":"feat-xx","payload":{"status":"idle","modified_files":[]}}"#,
440 ))
441 .unwrap(),
442 )
443 .await
444 .unwrap();
445 assert_eq!(resp.status(), StatusCode::ACCEPTED);
446 }
447
448 #[tokio::test]
449 async fn publish_invalid_json_returns_400() {
450 let app = test_router();
451 let resp = app
452 .oneshot(
453 Request::builder()
454 .method("POST")
455 .uri("/publish")
456 .header("content-type", "application/json")
457 .body(Body::from("not json"))
458 .unwrap(),
459 )
460 .await
461 .unwrap();
462 assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
463 }
464
465 #[tokio::test]
466 async fn publish_empty_body_returns_400() {
467 let app = test_router();
468 let resp = app
469 .oneshot(
470 Request::builder()
471 .method("POST")
472 .uri("/publish")
473 .header("content-type", "application/json")
474 .body(Body::empty())
475 .unwrap(),
476 )
477 .await
478 .unwrap();
479 assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
480 }
481
482 #[tokio::test]
483 async fn publish_wrong_content_type_returns_415() {
484 let app = test_router();
485 let resp = app
486 .oneshot(
487 Request::builder()
488 .method("POST")
489 .uri("/publish")
490 .header("content-type", "text/plain")
491 .body(Body::from("{}"))
492 .unwrap(),
493 )
494 .await
495 .unwrap();
496 assert_eq!(resp.status(), StatusCode::UNSUPPORTED_MEDIA_TYPE);
497 }
498
499 #[tokio::test]
500 async fn publish_missing_content_type_returns_415() {
501 let app = test_router();
502 let resp = app
503 .oneshot(
504 Request::builder()
505 .method("POST")
506 .uri("/publish")
507 .body(Body::from("{}"))
508 .unwrap(),
509 )
510 .await
511 .unwrap();
512 assert_eq!(resp.status(), StatusCode::UNSUPPORTED_MEDIA_TYPE);
513 }
514
515 #[tokio::test]
516 async fn publish_empty_agent_id_returns_400() {
517 let app = test_router();
518 let resp = app
519 .oneshot(
520 Request::builder()
521 .method("POST")
522 .uri("/publish")
523 .header("content-type", "application/json")
524 .body(Body::from(
525 r#"{"type":"agent.status","agent_id":"","payload":{"status":"idle","modified_files":[]}}"#,
526 ))
527 .unwrap(),
528 )
529 .await
530 .unwrap();
531 assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
532 }
533
534 async fn post_publish(body: &'static str) -> (StatusCode, axum::body::Bytes) {
542 let app = test_router();
543 let resp = app
544 .oneshot(
545 Request::builder()
546 .method("POST")
547 .uri("/publish")
548 .header("content-type", "application/json")
549 .body(Body::from(body))
550 .unwrap(),
551 )
552 .await
553 .unwrap();
554 let status = resp.status();
555 let bytes = axum::body::to_bytes(resp.into_body(), usize::MAX)
556 .await
557 .unwrap();
558 (status, bytes)
559 }
560
561 #[tokio::test]
562 async fn agent_id_rejects_single_letter() {
563 let (status, body) = post_publish(
564 r#"{"type":"agent.status","agent_id":"a","payload":{"status":"working","modified_files":[]}}"#,
565 )
566 .await;
567 assert_eq!(status, StatusCode::BAD_REQUEST);
568 let text = String::from_utf8_lossy(&body);
569 assert!(
570 text.contains("invalid agent_id"),
571 "body should mention 'invalid agent_id'; got: {text}"
572 );
573 }
574
575 #[tokio::test]
576 async fn agent_id_rejects_placeholder() {
577 let (status, body) = post_publish(
578 r#"{"type":"agent.status","agent_id":"<agent-id>","payload":{"status":"working","modified_files":[]}}"#,
579 )
580 .await;
581 assert_eq!(status, StatusCode::BAD_REQUEST);
582 let text = String::from_utf8_lossy(&body);
583 assert!(text.contains("invalid agent_id"), "body: {text}");
584 }
585
586 #[tokio::test]
587 async fn agent_id_rejects_empty() {
588 let (status, body) = post_publish(
589 r#"{"type":"agent.status","agent_id":"","payload":{"status":"working","modified_files":[]}}"#,
590 )
591 .await;
592 assert_eq!(status, StatusCode::BAD_REQUEST);
593 let _ = body;
596 }
597
598 #[tokio::test]
599 async fn agent_id_accepts_supervisor() {
600 let (status, _) = post_publish(
601 r#"{"type":"agent.status","agent_id":"supervisor","payload":{"status":"working","modified_files":[]}}"#,
602 )
603 .await;
604 assert!(
605 status == StatusCode::ACCEPTED || status == StatusCode::OK,
606 "supervisor should be accepted; got: {status}"
607 );
608 }
609
610 #[tokio::test]
611 async fn agent_id_accepts_feat_dash() {
612 let (status, _) = post_publish(
613 r#"{"type":"agent.status","agent_id":"feat-test-branch","payload":{"status":"working","modified_files":[]}}"#,
614 )
615 .await;
616 assert!(
617 status == StatusCode::ACCEPTED || status == StatusCode::OK,
618 "feat-test-branch should be accepted; got: {status}"
619 );
620 }
621
622 #[tokio::test]
623 async fn agent_id_accepts_feat_slash() {
624 let (status, _) = post_publish(
625 r#"{"type":"agent.status","agent_id":"feat/test-branch","payload":{"status":"working","modified_files":[]}}"#,
626 )
627 .await;
628 assert!(
629 status == StatusCode::ACCEPTED || status == StatusCode::OK,
630 "feat/test-branch should be accepted; got: {status}"
631 );
632 }
633
634 #[tokio::test]
635 async fn payload_question_rejects_placeholder() {
636 let (status, body) = post_publish(
637 r#"{"type":"agent.question","agent_id":"feat-test-branch","payload":{"question":"<your specific question>"}}"#,
638 )
639 .await;
640 assert_eq!(status, StatusCode::BAD_REQUEST);
641 let text = String::from_utf8_lossy(&body);
642 assert!(
643 text.contains("placeholder") && text.contains("question"),
644 "body should mention both 'placeholder' and 'question'; got: {text}"
645 );
646 }
647
648 #[tokio::test]
649 async fn payload_question_accepts_real_content() {
650 let (status, _) = post_publish(
651 r#"{"type":"agent.question","agent_id":"feat-test-branch","payload":{"question":"Should we use bcrypt or argon2?"}}"#,
652 )
653 .await;
654 assert!(
655 status == StatusCode::ACCEPTED || status == StatusCode::OK,
656 "real human content should be accepted; got: {status}"
657 );
658 }
659
660 #[tokio::test]
661 async fn payload_blocked_rejects_placeholder_needs() {
662 let (status, body) = post_publish(
663 r#"{"type":"agent.blocked","agent_id":"feat-test-branch","payload":{"needs":"<what>","from":"feat-other"}}"#,
664 )
665 .await;
666 assert_eq!(status, StatusCode::BAD_REQUEST);
667 let text = String::from_utf8_lossy(&body);
668 assert!(
669 text.contains("placeholder") && text.contains("needs"),
670 "body: {text}"
671 );
672 }
673
674 #[tokio::test]
675 async fn payload_feedback_rejects_placeholder_error_entry() {
676 let (status, body) = post_publish(
677 r#"{"type":"agent.feedback","agent_id":"feat-test-branch","payload":{"from":"supervisor","errors":["<error 1>"]}}"#,
678 )
679 .await;
680 assert_eq!(status, StatusCode::BAD_REQUEST);
681 let text = String::from_utf8_lossy(&body);
682 assert!(
683 text.contains("placeholder") && text.contains("errors"),
684 "body: {text}"
685 );
686 }
687
688 #[tokio::test]
691 async fn advanced_main_accepted_through_publish_endpoint() {
692 let (status, _) = post_publish(
694 r#"{"type":"agent.advanced-main","from":"supervisor","merged_branch":"feat/auth","new_main_sha":"a1b2c3d4e5f6","base":"main","merged_at":"2026-06-04T13:30:00Z","summary":"landed auth"}"#,
695 )
696 .await;
697 assert_eq!(
698 status,
699 StatusCode::ACCEPTED,
700 "a well-formed advanced-main must be accepted (202)"
701 );
702 }
703
704 #[tokio::test]
705 async fn advanced_main_missing_field_returns_400_naming_field() {
706 let (status, body) = post_publish(
707 r#"{"type":"agent.advanced-main","from":"supervisor","new_main_sha":"a1b2c3d4e5f6","base":"main","merged_at":"2026-06-04T13:30:00Z"}"#,
708 )
709 .await;
710 assert_eq!(status, StatusCode::BAD_REQUEST);
711 let text = String::from_utf8_lossy(&body);
712 assert!(
713 text.contains("merged_branch"),
714 "the 400 must name the missing field; got: {text}"
715 );
716 }
717
718 #[tokio::test]
719 async fn advanced_main_routes_to_every_registered_agent() {
720 let state = Arc::new(BrokerState::new(None));
723 publish_json(
724 &state,
725 r#"{"type":"agent.status","agent_id":"feat-alpha","payload":{"status":"working","modified_files":[]}}"#,
726 )
727 .await;
728 publish_json(
729 &state,
730 r#"{"type":"agent.status","agent_id":"feat-beta","payload":{"status":"working","modified_files":[]}}"#,
731 )
732 .await;
733 publish_json(
734 &state,
735 r#"{"type":"agent.advanced-main","from":"supervisor","merged_branch":"feat/alpha","new_main_sha":"a1b2c3d4e5f6","base":"main","merged_at":"2026-06-04T13:30:00Z"}"#,
736 )
737 .await;
738
739 for agent in ["feat-alpha", "feat-beta"] {
740 let (msgs, _) = delivery::poll_messages(&state, agent, 0);
741 assert!(
742 msgs.iter()
743 .any(|m| matches!(m, BrokerMessage::AdvancedMain { .. })),
744 "{agent} inbox must surface the advanced-main event"
745 );
746 }
747 }
748
749 #[tokio::test]
750 async fn messages_valid_agent_returns_200_with_last_seq() {
751 let app = test_router();
752 let resp = app
753 .oneshot(
754 Request::builder()
755 .method("GET")
756 .uri("/messages/feat-x")
757 .body(Body::empty())
758 .unwrap(),
759 )
760 .await
761 .unwrap();
762 assert_eq!(resp.status(), StatusCode::OK);
763 let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
764 .await
765 .unwrap();
766 let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
767 assert_eq!(json["messages"], serde_json::json!([]));
768 assert_eq!(json["last_seq"], serde_json::json!(0));
769 }
770
771 #[tokio::test]
772 async fn messages_invalid_agent_returns_400() {
773 let app = test_router();
774 let resp = app
775 .oneshot(
776 Request::builder()
777 .method("GET")
778 .uri("/messages/INVALID!")
779 .body(Body::empty())
780 .unwrap(),
781 )
782 .await
783 .unwrap();
784 assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
785 }
786
787 #[tokio::test]
788 async fn messages_invalid_since_returns_400() {
789 let app = test_router();
790 let resp = app
791 .oneshot(
792 Request::builder()
793 .method("GET")
794 .uri("/messages/feat-x?since=abc")
795 .body(Body::empty())
796 .unwrap(),
797 )
798 .await
799 .unwrap();
800 assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
801 }
802
803 #[tokio::test]
804 async fn status_returns_marker_and_version() {
805 let app = test_router();
806 let resp = app
807 .oneshot(
808 Request::builder()
809 .method("GET")
810 .uri("/status")
811 .body(Body::empty())
812 .unwrap(),
813 )
814 .await
815 .unwrap();
816 assert_eq!(resp.status(), StatusCode::OK);
817 let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
818 .await
819 .unwrap();
820 let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
821 assert_eq!(json["git_paw"], true);
822 assert!(json["version"].is_string());
823 assert!(json["uptime_seconds"].is_number());
824 assert_eq!(json["agents"], serde_json::json!([]));
825 }
826
827 async fn publish_json(state: &Arc<BrokerState>, body: &'static str) {
829 let resp = router(Arc::clone(state))
830 .oneshot(
831 Request::builder()
832 .method("POST")
833 .uri("/publish")
834 .header("content-type", "application/json")
835 .body(Body::from(body))
836 .unwrap(),
837 )
838 .await
839 .unwrap();
840 assert_eq!(resp.status(), StatusCode::ACCEPTED);
841 }
842
843 async fn get_status(state: &Arc<BrokerState>) -> serde_json::Value {
846 let resp = router(Arc::clone(state))
847 .oneshot(
848 Request::builder()
849 .method("GET")
850 .uri("/status")
851 .body(Body::empty())
852 .unwrap(),
853 )
854 .await
855 .unwrap();
856 assert_eq!(resp.status(), StatusCode::OK);
857 let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
858 .await
859 .unwrap();
860 serde_json::from_slice(&body).unwrap()
861 }
862
863 #[tokio::test]
864 async fn e2e_feedback_from_human_creates_no_phantom_roster_row() {
865 let state = Arc::new(BrokerState::new(None));
870 publish_json(
871 &state,
872 r#"{"type":"agent.status","agent_id":"supervisor","payload":{"status":"working","modified_files":[],"cli":"claude-oss"}}"#,
873 )
874 .await;
875 publish_json(
876 &state,
877 r#"{"type":"agent.status","agent_id":"feat-roster","payload":{"status":"working","modified_files":[],"cli":"claude-oss"}}"#,
878 )
879 .await;
880 publish_json(
881 &state,
882 r#"{"type":"agent.feedback","agent_id":"feat-roster","payload":{"from":"human","errors":["fix the flaky test"]}}"#,
883 )
884 .await;
885
886 let json = get_status(&state).await;
887 let agents = json["agents"].as_array().expect("agents array");
888 let ids: Vec<&str> = agents
889 .iter()
890 .map(|a| a["agent_id"].as_str().unwrap())
891 .collect();
892 assert!(
893 !ids.contains(&"human"),
894 "a feedback `from:human` must not mint a phantom roster row; got {ids:?}",
895 );
896 assert_eq!(ids.len(), 2, "roster holds exactly the two real agents");
897 }
898
899 #[tokio::test]
900 async fn e2e_status_shows_cli_for_every_agent() {
901 let state = Arc::new(BrokerState::new(None));
905 publish_json(
906 &state,
907 r#"{"type":"agent.status","agent_id":"supervisor","payload":{"status":"working","modified_files":[],"cli":"claude-oss"}}"#,
908 )
909 .await;
910 publish_json(
911 &state,
912 r#"{"type":"agent.status","agent_id":"feat-build","payload":{"status":"working","modified_files":[],"cli":"claude-oss"}}"#,
913 )
914 .await;
915
916 let json = get_status(&state).await;
917 let agents = json["agents"].as_array().expect("agents array");
918 assert_eq!(agents.len(), 2);
919 for a in agents {
920 assert_eq!(
921 a["cli"].as_str(),
922 Some("claude-oss"),
923 "every agent row must carry its cli: {a}",
924 );
925 }
926 }
927
928 fn init_test_repo_server(dir: &std::path::Path) {
934 use std::process::Command;
935 let run = |args: &[&str]| {
936 Command::new("git")
937 .args(args)
938 .current_dir(dir)
939 .output()
940 .expect("git command failed");
941 };
942 run(&["init", "-q", "-b", "main"]);
943 run(&["config", "user.email", "test@example.com"]);
944 run(&["config", "user.name", "test"]);
945 run(&["commit", "--allow-empty", "-m", "root", "-q"]);
946 }
947
948 async fn post_watch(state: &Arc<BrokerState>, body: String) -> StatusCode {
950 router(Arc::clone(state))
951 .oneshot(
952 Request::builder()
953 .method("POST")
954 .uri("/watch")
955 .header("content-type", "application/json")
956 .body(Body::from(body))
957 .unwrap(),
958 )
959 .await
960 .unwrap()
961 .status()
962 }
963
964 #[tokio::test]
967 async fn watch_registers_target_and_surfaces_worktree_in_status() {
968 use super::super::watcher::POLL_INTERVAL;
969 let tmp = tempfile::tempdir().unwrap();
970 init_test_repo_server(tmp.path());
971
972 let state = Arc::new(BrokerState::new(None));
973 let (tx, rx) = tokio::sync::watch::channel(false);
976 state.set_watcher_shutdown_rx(rx);
977
978 let body = format!(
979 r#"{{"agent_id":"feat-hot","worktree_path":"{}","cli":"claude"}}"#,
980 tmp.path().display()
981 );
982 assert_eq!(post_watch(&state, body).await, StatusCode::ACCEPTED);
983
984 std::fs::write(tmp.path().join("hot.rs"), "fn hot() {}").unwrap();
986
987 let mut found = false;
988 for _ in 0..20 {
989 tokio::time::sleep(POLL_INTERVAL / 2).await;
990 let json = get_status(&state).await;
991 if let Some(agents) = json["agents"].as_array()
992 && agents.iter().any(|a| a["agent_id"] == "feat-hot")
993 {
994 found = true;
995 break;
996 }
997 }
998 assert!(
999 found,
1000 "a registered worktree must surface its agent in /status from activity"
1001 );
1002
1003 let _ = tx.send(true);
1004 }
1005
1006 #[tokio::test]
1009 async fn watch_duplicate_registration_is_a_noop_success() {
1010 let state = Arc::new(BrokerState::new(None));
1011 let body = r#"{"agent_id":"feat-hot","worktree_path":"/tmp/feat-hot","cli":"claude"}"#;
1012 assert_eq!(
1013 post_watch(&state, body.to_string()).await,
1014 StatusCode::ACCEPTED
1015 );
1016 assert_eq!(
1017 post_watch(&state, body.to_string()).await,
1018 StatusCode::ACCEPTED
1019 );
1020 assert_eq!(
1021 state.read().watched_paths.len(),
1022 1,
1023 "duplicate registration must not record a second target"
1024 );
1025 }
1026
1027 #[tokio::test]
1028 async fn watch_rejects_invalid_agent_id() {
1029 let state = Arc::new(BrokerState::new(None));
1030 let body = r#"{"agent_id":"a","worktree_path":"/tmp/x","cli":"claude"}"#;
1031 assert_eq!(
1032 post_watch(&state, body.to_string()).await,
1033 StatusCode::BAD_REQUEST
1034 );
1035 }
1036
1037 #[tokio::test]
1038 async fn watch_rejects_placeholder_worktree_path() {
1039 let state = Arc::new(BrokerState::new(None));
1040 let body = r#"{"agent_id":"feat-hot","worktree_path":"<path>","cli":"claude"}"#;
1041 assert_eq!(
1042 post_watch(&state, body.to_string()).await,
1043 StatusCode::BAD_REQUEST
1044 );
1045 }
1046
1047 #[tokio::test]
1048 async fn watch_rejects_empty_worktree_path() {
1049 let state = Arc::new(BrokerState::new(None));
1050 let body = r#"{"agent_id":"feat-hot","worktree_path":"","cli":"claude"}"#;
1051 assert_eq!(
1052 post_watch(&state, body.to_string()).await,
1053 StatusCode::BAD_REQUEST
1054 );
1055 }
1056
1057 #[tokio::test]
1058 async fn watch_wrong_content_type_returns_415() {
1059 let app = test_router();
1060 let resp = app
1061 .oneshot(
1062 Request::builder()
1063 .method("POST")
1064 .uri("/watch")
1065 .header("content-type", "text/plain")
1066 .body(Body::from("{}"))
1067 .unwrap(),
1068 )
1069 .await
1070 .unwrap();
1071 assert_eq!(resp.status(), StatusCode::UNSUPPORTED_MEDIA_TYPE);
1072 }
1073
1074 #[tokio::test]
1075 async fn unknown_route_returns_404() {
1076 let app = test_router();
1077 let resp = app
1078 .oneshot(
1079 Request::builder()
1080 .method("GET")
1081 .uri("/unknown/route")
1082 .body(Body::empty())
1083 .unwrap(),
1084 )
1085 .await
1086 .unwrap();
1087 assert_eq!(resp.status(), StatusCode::NOT_FOUND);
1088 }
1089
1090 #[tokio::test]
1091 async fn wrong_method_returns_405() {
1092 let app = test_router();
1093 let resp = app
1094 .oneshot(
1095 Request::builder()
1096 .method("GET")
1097 .uri("/publish")
1098 .body(Body::empty())
1099 .unwrap(),
1100 )
1101 .await
1102 .unwrap();
1103 assert_eq!(resp.status(), StatusCode::METHOD_NOT_ALLOWED);
1104 }
1105
1106 #[tokio::test]
1107 async fn panic_in_handler_is_isolated() {
1108 let app = Router::new()
1110 .route(
1111 "/panic",
1112 get(|| async {
1113 panic!("deliberate test panic");
1114 #[allow(unreachable_code)]
1115 StatusCode::OK.into_response()
1116 }),
1117 )
1118 .route("/status", get(status))
1119 .with_state(Arc::new(BrokerState::new(None)));
1120
1121 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1122 let addr = listener.local_addr().unwrap();
1123
1124 let server = tokio::spawn(async move {
1125 axum::serve(listener, app).await.ok();
1126 });
1127
1128 let client =
1129 hyper_util::client::legacy::Client::builder(hyper_util::rt::TokioExecutor::new())
1130 .build_http();
1131
1132 let _panic_resp = client
1134 .request(
1135 Request::builder()
1136 .method("GET")
1137 .uri(format!("http://{addr}/panic"))
1138 .body(axum::body::Body::empty())
1139 .unwrap(),
1140 )
1141 .await;
1142
1143 let status_resp = client
1147 .request(
1148 Request::builder()
1149 .method("GET")
1150 .uri(format!("http://{addr}/status"))
1151 .body(axum::body::Body::empty())
1152 .unwrap(),
1153 )
1154 .await
1155 .expect("server should still be alive after a panic in another handler");
1156
1157 assert_eq!(status_resp.status(), StatusCode::OK);
1158
1159 server.abort();
1160 }
1161
1162 #[tokio::test]
1163 async fn log_returns_full_message_log_in_chronological_order() {
1164 let state = Arc::new(BrokerState::new(None));
1165 for (agent, status_label) in [
1167 ("feat-a", "working"),
1168 ("feat-b", "blocked"),
1169 ("feat-c", "done"),
1170 ] {
1171 let msg = BrokerMessage::Status {
1172 agent_id: agent.to_string(),
1173 payload: super::super::messages::StatusPayload {
1174 status: status_label.to_string(),
1175 modified_files: vec![],
1176 message: None,
1177 ..Default::default()
1178 },
1179 };
1180 delivery::publish_message(&state, &msg);
1181 }
1182
1183 let app = router(state);
1184 let resp = app
1185 .oneshot(
1186 Request::builder()
1187 .method("GET")
1188 .uri("/log")
1189 .body(Body::empty())
1190 .unwrap(),
1191 )
1192 .await
1193 .unwrap();
1194 assert_eq!(resp.status(), StatusCode::OK);
1195
1196 let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
1197 .await
1198 .unwrap();
1199 let parsed: serde_json::Value = serde_json::from_slice(&body).unwrap();
1200 let entries = parsed["entries"].as_array().expect("entries array");
1201 assert_eq!(entries.len(), 3, "all three messages must appear in /log");
1202 assert_eq!(entries[0]["message"]["agent_id"], "feat-a");
1204 assert_eq!(entries[2]["message"]["agent_id"], "feat-c");
1205 assert_eq!(parsed["last_seq"], 3);
1206 }
1207
1208 #[tokio::test]
1209 async fn log_with_since_filters_older_entries() {
1210 let state = Arc::new(BrokerState::new(None));
1211 for agent in ["feat-a", "feat-b", "feat-c"] {
1212 let msg = BrokerMessage::Status {
1213 agent_id: agent.to_string(),
1214 payload: super::super::messages::StatusPayload {
1215 status: "working".to_string(),
1216 modified_files: vec![],
1217 message: None,
1218 ..Default::default()
1219 },
1220 };
1221 delivery::publish_message(&state, &msg);
1222 }
1223
1224 let app = router(state);
1225 let resp = app
1226 .oneshot(
1227 Request::builder()
1228 .method("GET")
1229 .uri("/log?since=2")
1230 .body(Body::empty())
1231 .unwrap(),
1232 )
1233 .await
1234 .unwrap();
1235 let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
1236 .await
1237 .unwrap();
1238 let parsed: serde_json::Value = serde_json::from_slice(&body).unwrap();
1239 let entries = parsed["entries"].as_array().unwrap();
1240 assert_eq!(
1241 entries.len(),
1242 1,
1243 "since=2 must yield only the message at seq=3"
1244 );
1245 assert_eq!(entries[0]["seq"], 3);
1246 }
1247
1248 #[tokio::test]
1249 async fn log_invalid_since_returns_400() {
1250 let app = test_router();
1251 let resp = app
1252 .oneshot(
1253 Request::builder()
1254 .method("GET")
1255 .uri("/log?since=notanumber")
1256 .body(Body::empty())
1257 .unwrap(),
1258 )
1259 .await
1260 .unwrap();
1261 assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
1262 }
1263}