Skip to main content

cellos_server/routes/
formations.rs

1//! `/v1/formations` — CRUD handlers.
2//!
3//! POST validates the submitted `FormationDocument` per ADR-0010
4//! (single coordinator + every non-coordinator member carries
5//! `authorizedBy`). The full DAG/cycle/scope-narrowing admission gate
6//! lives in `cellos-supervisor`; we surface the same RFC 9457
7//! discriminants here so cellctl can render either source uniformly.
8
9use axum::extract::{Path, State};
10use axum::http::{HeaderMap, StatusCode};
11use axum::response::IntoResponse;
12use axum::Json;
13use cellos_core::events::{
14    cloud_event_v1_formation_completed, cloud_event_v1_formation_created,
15    cloud_event_v1_formation_degraded, cloud_event_v1_formation_failed,
16    cloud_event_v1_formation_launching, cloud_event_v1_formation_running,
17};
18use serde::{Deserialize, Serialize};
19use uuid::Uuid;
20
21use crate::auth::require_bearer;
22use crate::error::{AppError, AppErrorKind};
23use crate::state::{AppState, FormationRecord, FormationStatus};
24
25/// Subset of the `formation-v1` document the admission gate cares about.
26/// Additional fields are tolerated and preserved verbatim in
27/// `FormationRecord::document` (via the captured `serde_json::Value`).
28#[derive(Debug, Deserialize)]
29pub struct FormationDocument {
30    pub name: String,
31    pub coordinator: String,
32    pub members: Vec<FormationMember>,
33}
34
35#[derive(Debug, Deserialize)]
36pub struct FormationMember {
37    pub id: String,
38    /// Required on every non-coordinator member; forbidden on the
39    /// coordinator (ADR-0010 §Enforcement).
40    #[serde(rename = "authorizedBy")]
41    pub authorized_by: Option<String>,
42}
43
44#[derive(Debug, Serialize)]
45pub struct FormationCreated {
46    pub id: Uuid,
47    pub name: String,
48    pub status: FormationStatus,
49}
50
51/// POST /v1/formations — admit a new formation. Returns 201 with the
52/// generated id on success; RFC 9457 problem+json on validation failure.
53pub async fn create_formation(
54    State(state): State<AppState>,
55    headers: HeaderMap,
56    body: axum::body::Bytes,
57) -> Result<impl IntoResponse, AppError> {
58    require_bearer(&headers, &state.api_token)?;
59
60    // Parse twice: once into our validated struct, once into a generic
61    // Value so we can echo the original document back on GET. This keeps
62    // the public schema forward-compatible with fields we don't yet
63    // recognise.
64    let raw: serde_json::Value = serde_json::from_slice(&body)?;
65    let doc: FormationDocument = serde_json::from_value(raw.clone())?;
66
67    validate_formation(&doc)?;
68
69    let id = Uuid::new_v4();
70    let record = FormationRecord {
71        id,
72        name: doc.name.clone(),
73        status: FormationStatus::Pending,
74        document: raw,
75    };
76
77    state.formations.write().await.insert(id, record);
78
79    // Emit formation.v1.created so the WebSocket stream and audit log see it.
80    let cell_count = doc.members.len() as u32;
81    let no_failed: &[String] = &[];
82    let event = cloud_event_v1_formation_created(
83        &id.to_string(),
84        id.to_string().as_str(),
85        &id.to_string(),
86        &doc.name,
87        cell_count,
88        no_failed,
89        None,
90    );
91    let subject = format!("cellos.events.formations.{id}.created");
92    publish_event(&state, &subject, event).await;
93
94    let body = FormationCreated {
95        id,
96        name: doc.name,
97        status: FormationStatus::Pending,
98    };
99    Ok((StatusCode::CREATED, Json(body)))
100}
101
102/// Response shape for `GET /v1/formations` per ADR-0015 §D2. The
103/// `cursor` is the highest JetStream stream-sequence the server's
104/// projection has applied; clients hand it back as
105/// `/ws/events?since=<cursor>` so they can resume the live stream
106/// without missing any event between the snapshot and the WS open.
107#[derive(Debug, Serialize)]
108pub struct FormationsSnapshot {
109    pub formations: Vec<FormationRecord>,
110    pub cursor: u64,
111}
112
113/// GET /v1/formations — list all known formations plus the current
114/// projection cursor (ADR-0015).
115pub async fn list_formations(
116    State(state): State<AppState>,
117    headers: HeaderMap,
118) -> Result<Json<FormationsSnapshot>, AppError> {
119    require_bearer(&headers, &state.api_token)?;
120    let map = state.formations.read().await;
121    Ok(Json(FormationsSnapshot {
122        formations: map.values().cloned().collect(),
123        cursor: state.cursor(),
124    }))
125}
126
127/// GET /v1/formations/{id} — fetch one formation by uuid.
128pub async fn get_formation(
129    State(state): State<AppState>,
130    headers: HeaderMap,
131    Path(id): Path<Uuid>,
132) -> Result<Json<FormationRecord>, AppError> {
133    require_bearer(&headers, &state.api_token)?;
134    let map = state.formations.read().await;
135    map.get(&id)
136        .cloned()
137        .map(Json)
138        .ok_or_else(|| AppError::not_found(format!("formation {id} not found")))
139}
140
141/// DELETE /v1/formations/{id} — best-effort cancellation. The actual
142/// teardown is performed asynchronously by the supervisor once the
143/// `formation.cancelled` event lands on JetStream; we only mark the
144/// in-memory projection.
145pub async fn delete_formation(
146    State(state): State<AppState>,
147    headers: HeaderMap,
148    Path(id): Path<Uuid>,
149) -> Result<StatusCode, AppError> {
150    require_bearer(&headers, &state.api_token)?;
151    let mut map = state.formations.write().await;
152    let (name, cell_count) = {
153        let entry = map
154            .get_mut(&id)
155            .ok_or_else(|| AppError::not_found(format!("formation {id} not found")))?;
156        entry.status = FormationStatus::Cancelled;
157        let members = entry
158            .document
159            .get("members")
160            .and_then(|m| m.as_array())
161            .map(|a| a.len() as u32)
162            .unwrap_or(0);
163        (entry.name.clone(), members)
164    };
165    drop(map);
166
167    let no_failed: &[String] = &[];
168    let event = cloud_event_v1_formation_failed(
169        &id.to_string(),
170        id.to_string().as_str(),
171        &id.to_string(),
172        &name,
173        cell_count,
174        no_failed,
175        Some("deleted by operator"),
176    );
177    let subject = format!("cellos.events.formations.{id}.failed");
178    publish_event(&state, &subject, event).await;
179
180    Ok(StatusCode::NO_CONTENT)
181}
182
183/// POST /v1/formations/{id}/status — receive a state-transition notification
184/// from the supervisor or an operator tool. Updates the in-memory projection
185/// and emits the matching `formation.v1.*` CloudEvent to NATS so the
186/// WebSocket stream carries it to connected web-view clients.
187#[derive(Debug, Deserialize)]
188pub struct StatusTransition {
189    pub state: String,
190    pub reason: Option<String>,
191    pub failed_cells: Option<Vec<String>>,
192}
193
194pub async fn update_formation_status(
195    State(state): State<AppState>,
196    headers: HeaderMap,
197    Path(id): Path<Uuid>,
198    Json(body): Json<StatusTransition>,
199) -> Result<StatusCode, AppError> {
200    require_bearer(&headers, &state.api_token)?;
201
202    let (new_status, name, cell_count, failed) = {
203        let mut map = state.formations.write().await;
204        let entry = map
205            .get_mut(&id)
206            .ok_or_else(|| AppError::not_found(format!("formation {id} not found")))?;
207
208        let new_status = match body.state.to_uppercase().as_str() {
209            "RUNNING" | "LAUNCHING" => FormationStatus::Running,
210            "DEGRADED" => FormationStatus::Running, // DEGRADED keeps running
211            "COMPLETED" => FormationStatus::Succeeded,
212            "FAILED" => FormationStatus::Failed,
213            other => {
214                // RFC-9457 §3.1: this is a generic bad-request, not an
215                // ADR-0010 admission-gate rejection. Returning the
216                // `FormationNoCoordinator` discriminant here would
217                // hijack a load-bearing identifier that clients switch
218                // on per ADR-0010 §Enforcement.
219                return Err(AppError::new(
220                    AppErrorKind::BadRequest,
221                    format!("unknown state: {other}"),
222                ));
223            }
224        };
225        entry.status = new_status;
226
227        let members = entry
228            .document
229            .get("members")
230            .and_then(|m| m.as_array())
231            .map(|a| a.len() as u32)
232            .unwrap_or(0);
233        let failed = body.failed_cells.unwrap_or_default();
234        (new_status, entry.name.clone(), members, failed)
235    };
236
237    let sid = id.to_string();
238    let reason = body.reason.as_deref();
239    let empty: &[String] = &[];
240
241    let (event, phase) = match body.state.to_uppercase().as_str() {
242        "LAUNCHING" => (
243            cloud_event_v1_formation_launching(&sid, &sid, &sid, &name, cell_count, empty, reason),
244            "launching",
245        ),
246        "RUNNING" => (
247            cloud_event_v1_formation_running(&sid, &sid, &sid, &name, cell_count, empty, reason),
248            "running",
249        ),
250        "DEGRADED" => (
251            cloud_event_v1_formation_degraded(&sid, &sid, &sid, &name, cell_count, &failed, reason),
252            "degraded",
253        ),
254        "COMPLETED" => (
255            cloud_event_v1_formation_completed(&sid, &sid, &sid, &name, cell_count, empty, reason),
256            "completed",
257        ),
258        _ => (
259            cloud_event_v1_formation_failed(&sid, &sid, &sid, &name, cell_count, &failed, reason),
260            "failed",
261        ),
262    };
263
264    let subject = format!("cellos.events.formations.{id}.{phase}");
265    publish_event(&state, &subject, event).await;
266
267    let _ = new_status; // used above
268    Ok(StatusCode::NO_CONTENT)
269}
270
271/// Publish a CloudEvent JSON payload to NATS if a client is connected.
272/// Failures are logged and swallowed — event loss is surfaced via the DLQ
273/// (P3-03) once that crate lands; the HTTP response is never blocked by NATS.
274async fn publish_event(state: &AppState, subject: &str, event: impl serde::Serialize) {
275    let Some(nats) = &state.nats else { return };
276    let payload = match serde_json::to_vec(&event) {
277        Ok(b) => b,
278        Err(e) => {
279            tracing::warn!(subject, error = %e, "failed to serialise formation CloudEvent");
280            return;
281        }
282    };
283    if let Err(e) = nats.publish(subject.to_owned(), payload.into()).await {
284        tracing::warn!(subject, error = %e, "failed to publish formation CloudEvent to NATS");
285    }
286}
287
288/// Apply the structural admission-gate checks ADR-0010 §Enforcement
289/// requires the server to re-run regardless of client behaviour:
290///
291/// 1. **noCoordinator** — the coordinator named in `coordinator` MUST
292///    appear in `members`.
293/// 2. **multipleCoordinators** — every `members[*].id` MUST be unique.
294///    The JSON schema declares `uniqueItems`; we re-enforce because
295///    the server cannot assume schema validation ran on the client.
296/// 3. **authorityNotNarrowing** — the coordinator MUST NOT carry
297///    `authorizedBy`; every non-coordinator MUST carry it AND the
298///    referenced parent MUST exist in `members` (an orphan parent is
299///    an unbounded A₀ — exactly the failure mode ADR-0010 §Proof
300///    forbids).
301/// 4. **cycle** — the `authorizedBy` edges MUST form a DAG. A cycle
302///    (including the self-edge `authorizedBy: self`) breaks the
303///    induction that proves every member's authority chains back to
304///    the coordinator.
305///
306/// The per-edge authority-subset check (`A_c ⊆ A_p`) lives in the
307/// supervisor today because the `formation-v1` document parsed here
308/// does not yet carry per-member declared authority sets; that is the
309/// only ADR-0010 check the server still defers.
310fn validate_formation(doc: &FormationDocument) -> Result<(), AppError> {
311    use std::collections::{HashMap, HashSet};
312
313    // 1. noCoordinator.
314    let coord_present = doc.members.iter().any(|m| m.id == doc.coordinator);
315    if !coord_present {
316        return Err(AppError::new(
317            AppErrorKind::FormationNoCoordinator,
318            format!(
319                "coordinator '{}' must appear in members list",
320                doc.coordinator
321            ),
322        ));
323    }
324
325    // 2. multipleCoordinators — duplicate member ids. We treat the
326    // duplicate-id failure under this discriminant because the ADR
327    // §Consequences canonical case is "two members both named
328    // `coord`": admission cannot pick which one is the coordinator.
329    let mut seen: HashSet<&str> = HashSet::new();
330    for m in &doc.members {
331        if !seen.insert(m.id.as_str()) {
332            return Err(AppError::new(
333                AppErrorKind::FormationMultipleCoordinators,
334                format!("duplicate member id '{}'", m.id),
335            ));
336        }
337    }
338
339    // 3. authorityNotNarrowing — coord-forbid, non-coord require, plus
340    //    orphan-parent rejection. An `authorizedBy` reference that has
341    //    no member entry has no parent edge → no narrowing → admission
342    //    fails.
343    for m in &doc.members {
344        let is_coord = m.id == doc.coordinator;
345        match (is_coord, &m.authorized_by) {
346            (true, Some(_)) => {
347                return Err(AppError::new(
348                    AppErrorKind::FormationAuthorityNotNarrowing,
349                    format!("coordinator '{}' must not declare authorizedBy", m.id),
350                ));
351            }
352            (false, None) => {
353                return Err(AppError::new(
354                    AppErrorKind::FormationAuthorityNotNarrowing,
355                    format!("non-coordinator member '{}' missing authorizedBy", m.id),
356                ));
357            }
358            (false, Some(parent)) => {
359                if !seen.contains(parent.as_str()) {
360                    return Err(AppError::new(
361                        AppErrorKind::FormationAuthorityNotNarrowing,
362                        format!("member '{}' references unknown parent '{}'", m.id, parent),
363                    ));
364                }
365            }
366            _ => {}
367        }
368    }
369
370    // 4. cycle — walk each non-coordinator's authorizedBy chain. In a
371    //    valid DAG with exactly one out-edge per non-root, the walk
372    //    terminates at the coordinator within strictly fewer hops than
373    //    members.len(). Self-loops are caught on the first hop.
374    let parent: HashMap<&str, &str> = doc
375        .members
376        .iter()
377        .filter_map(|m| m.authorized_by.as_deref().map(|p| (m.id.as_str(), p)))
378        .collect();
379
380    for m in &doc.members {
381        if m.id == doc.coordinator {
382            continue;
383        }
384        let mut cursor = m.id.as_str();
385        for _ in 0..doc.members.len() {
386            let Some(&p) = parent.get(cursor) else {
387                // No outgoing edge from cursor → cursor is the
388                // coordinator (proven in check 1 to be present). Done.
389                break;
390            };
391            if p == m.id {
392                return Err(AppError::new(
393                    AppErrorKind::FormationCycle,
394                    format!("authorizedBy cycle detected involving member '{}'", m.id),
395                ));
396            }
397            cursor = p;
398        }
399        if parent.contains_key(cursor) {
400            // Exhausted hop budget without reaching the coordinator —
401            // a cycle exists on the chain (not necessarily through
402            // `m.id` itself).
403            return Err(AppError::new(
404                AppErrorKind::FormationCycle,
405                format!(
406                    "authorizedBy cycle detected on chain starting at '{}'",
407                    m.id
408                ),
409            ));
410        }
411    }
412
413    Ok(())
414}
415
416#[cfg(test)]
417mod tests {
418    use super::*;
419    use crate::router;
420    use axum::body::Body;
421    use axum::http::{header, Request};
422    use http_body_util::BodyExt;
423    use tower::ServiceExt;
424
425    const TOKEN: &str = "test-token";
426
427    fn test_state() -> AppState {
428        AppState::new(None, TOKEN)
429    }
430
431    fn auth_req(method: &str, uri: &str, body: Option<&str>) -> Request<Body> {
432        let mut b = Request::builder()
433            .method(method)
434            .uri(uri)
435            .header(header::AUTHORIZATION, format!("Bearer {TOKEN}"));
436        if body.is_some() {
437            b = b.header(header::CONTENT_TYPE, "application/json");
438        }
439        b.body(
440            body.map(|s| Body::from(s.to_owned()))
441                .unwrap_or_else(Body::empty),
442        )
443        .expect("build request")
444    }
445
446    #[tokio::test]
447    async fn post_valid_formation_returns_201() {
448        let app = router(test_state());
449        let body = serde_json::json!({
450            "name": "demo",
451            "coordinator": "coord",
452            "members": [
453                { "id": "coord" },
454                { "id": "worker-a", "authorizedBy": "coord" },
455                { "id": "worker-b", "authorizedBy": "coord" }
456            ]
457        })
458        .to_string();
459
460        let resp = app
461            .oneshot(auth_req("POST", "/v1/formations", Some(&body)))
462            .await
463            .expect("router response");
464        assert_eq!(resp.status(), StatusCode::CREATED);
465
466        let bytes = resp.into_body().collect().await.unwrap().to_bytes();
467        let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
468        assert_eq!(parsed["status"], "PENDING");
469        assert_eq!(parsed["name"], "demo");
470        assert!(parsed["id"].as_str().is_some());
471    }
472
473    #[tokio::test]
474    async fn post_formation_missing_coordinator_returns_400() {
475        let app = router(test_state());
476        // coordinator names "coord" but no such member exists.
477        let body = serde_json::json!({
478            "name": "demo",
479            "coordinator": "coord",
480            "members": [
481                { "id": "worker-a", "authorizedBy": "coord" }
482            ]
483        })
484        .to_string();
485
486        let resp = app
487            .oneshot(auth_req("POST", "/v1/formations", Some(&body)))
488            .await
489            .expect("router response");
490        assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
491        let ct = resp
492            .headers()
493            .get(header::CONTENT_TYPE)
494            .and_then(|v| v.to_str().ok())
495            .unwrap_or_default()
496            .to_owned();
497        assert!(
498            ct.starts_with("application/problem+json"),
499            "expected RFC 9457 media type, got {ct:?}"
500        );
501        let bytes = resp.into_body().collect().await.unwrap().to_bytes();
502        let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
503        assert_eq!(parsed["type"], "/problems/formation/no-coordinator");
504    }
505
506    #[tokio::test]
507    async fn post_formation_member_missing_authorized_by_returns_400() {
508        let app = router(test_state());
509        let body = serde_json::json!({
510            "name": "demo",
511            "coordinator": "coord",
512            "members": [
513                { "id": "coord" },
514                { "id": "worker-a" } // missing authorizedBy
515            ]
516        })
517        .to_string();
518
519        let resp = app
520            .oneshot(auth_req("POST", "/v1/formations", Some(&body)))
521            .await
522            .expect("router response");
523        assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
524        let bytes = resp.into_body().collect().await.unwrap().to_bytes();
525        let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
526        assert_eq!(
527            parsed["type"], "/problems/formation/authority-not-narrowing",
528            "expected authority-not-narrowing discriminant, got {parsed}"
529        );
530    }
531
532    #[tokio::test]
533    async fn get_formations_returns_snapshot_with_cursor() {
534        // ADR-0015 §D2: GET /v1/formations is `{ formations: [...], cursor: u64 }`.
535        let app = router(test_state());
536        let resp = app
537            .oneshot(auth_req("GET", "/v1/formations", None))
538            .await
539            .expect("router response");
540        assert_eq!(resp.status(), StatusCode::OK);
541        let bytes = resp.into_body().collect().await.unwrap().to_bytes();
542        let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
543        assert!(parsed.is_object(), "expected snapshot object, got {parsed}");
544        let arr = parsed["formations"].as_array().expect("formations array");
545        assert_eq!(arr.len(), 0);
546        assert!(
547            parsed["cursor"].is_u64(),
548            "cursor field must be an unsigned integer, got {}",
549            parsed["cursor"]
550        );
551        assert_eq!(parsed["cursor"].as_u64(), Some(0));
552    }
553
554    #[tokio::test]
555    async fn snapshot_returns_cursor() {
556        // ADR-0015 §D2 + §E: after POSTing a formation, the snapshot
557        // response MUST carry a `cursor` field of integer type so the
558        // client can hand it to `/ws/events?since=<cursor>`.
559        let app = router(test_state());
560        let body = serde_json::json!({
561            "name": "with-cursor",
562            "coordinator": "coord",
563            "members": [
564                { "id": "coord" },
565                { "id": "worker-a", "authorizedBy": "coord" }
566            ]
567        })
568        .to_string();
569
570        let resp = app
571            .clone()
572            .oneshot(auth_req("POST", "/v1/formations", Some(&body)))
573            .await
574            .expect("router response");
575        assert_eq!(resp.status(), StatusCode::CREATED);
576
577        let resp = app
578            .oneshot(auth_req("GET", "/v1/formations", None))
579            .await
580            .expect("router response");
581        assert_eq!(resp.status(), StatusCode::OK);
582        let bytes = resp.into_body().collect().await.unwrap().to_bytes();
583        let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
584        assert!(
585            parsed["cursor"].is_u64(),
586            "cursor must be unsigned integer; got {}",
587            parsed["cursor"]
588        );
589        let formations = parsed["formations"].as_array().expect("formations array");
590        assert_eq!(formations.len(), 1, "expected 1 formation after POST");
591        assert_eq!(formations[0]["name"], "with-cursor");
592    }
593
594    #[tokio::test]
595    async fn missing_bearer_returns_401() {
596        let app = router(test_state());
597        let resp = app
598            .oneshot(
599                Request::builder()
600                    .method("GET")
601                    .uri("/v1/formations")
602                    .body(Body::empty())
603                    .unwrap(),
604            )
605            .await
606            .expect("router response");
607        assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
608    }
609
610    /// ADR-0010 §Enforcement `multipleCoordinators` discriminant.
611    /// Duplicate `members[*].id` MUST be rejected with this type even
612    /// when the duplicates carry valid `authorizedBy`. The JSON schema
613    /// has `uniqueItems` but the server cannot assume schema validation
614    /// ran on the client.
615    #[tokio::test]
616    async fn rejects_duplicate_member_ids_with_multiple_coordinators_type() {
617        let app = router(test_state());
618        let body = serde_json::json!({
619            "name": "dup-ids",
620            "coordinator": "coord",
621            "members": [
622                { "id": "coord" },
623                { "id": "coord", "authorizedBy": "coord" }
624            ]
625        })
626        .to_string();
627        let resp = app
628            .oneshot(auth_req("POST", "/v1/formations", Some(&body)))
629            .await
630            .expect("router response");
631        assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
632        let bytes = resp.into_body().collect().await.unwrap().to_bytes();
633        let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
634        assert_eq!(
635            parsed["type"], "/problems/formation/multiple-coordinators",
636            "duplicate member ids must surface multipleCoordinators"
637        );
638    }
639
640    /// ADR-0010 §Enforcement `cycle` discriminant. `authorizedBy: self`
641    /// is the minimal cycle.
642    #[tokio::test]
643    async fn rejects_self_authorized_cycle() {
644        let app = router(test_state());
645        let body = serde_json::json!({
646            "name": "self-cycle",
647            "coordinator": "coord",
648            "members": [
649                { "id": "coord" },
650                { "id": "worker-a", "authorizedBy": "worker-a" }
651            ]
652        })
653        .to_string();
654        let resp = app
655            .oneshot(auth_req("POST", "/v1/formations", Some(&body)))
656            .await
657            .expect("router response");
658        assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
659        let bytes = resp.into_body().collect().await.unwrap().to_bytes();
660        let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
661        assert_eq!(parsed["type"], "/problems/formation/cycle");
662    }
663
664    /// Two-node cycle a→b→a; neither chains back to coordinator.
665    #[tokio::test]
666    async fn rejects_two_node_cycle() {
667        let app = router(test_state());
668        let body = serde_json::json!({
669            "name": "two-cycle",
670            "coordinator": "coord",
671            "members": [
672                { "id": "coord" },
673                { "id": "a", "authorizedBy": "b" },
674                { "id": "b", "authorizedBy": "a" }
675            ]
676        })
677        .to_string();
678        let resp = app
679            .oneshot(auth_req("POST", "/v1/formations", Some(&body)))
680            .await
681            .expect("router response");
682        assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
683        let bytes = resp.into_body().collect().await.unwrap().to_bytes();
684        let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
685        assert_eq!(parsed["type"], "/problems/formation/cycle");
686    }
687
688    /// Orphan parent: `authorizedBy: ghost` where `ghost` is not in
689    /// `members`. Without a parent edge the member has no narrowing
690    /// path → `authorityNotNarrowing`.
691    #[tokio::test]
692    async fn rejects_orphan_parent_reference() {
693        let app = router(test_state());
694        let body = serde_json::json!({
695            "name": "orphan-parent",
696            "coordinator": "coord",
697            "members": [
698                { "id": "coord" },
699                { "id": "worker-a", "authorizedBy": "ghost" }
700            ]
701        })
702        .to_string();
703        let resp = app
704            .oneshot(auth_req("POST", "/v1/formations", Some(&body)))
705            .await
706            .expect("router response");
707        assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
708        let bytes = resp.into_body().collect().await.unwrap().to_bytes();
709        let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
710        assert_eq!(
711            parsed["type"],
712            "/problems/formation/authority-not-narrowing"
713        );
714    }
715
716    /// Red-team finding: POST /v1/formations was using axum's default
717    /// 2 MiB body limit. We cap at 64 KiB so a >64 KiB payload returns
718    /// 413 Payload Too Large rather than burning CPU on serde parsing.
719    #[tokio::test]
720    async fn post_formation_oversized_body_returns_413() {
721        let app = router(test_state());
722        // Build a JSON document larger than 64 KiB by stuffing a long
723        // `name` field. The body-limit layer rejects before serde
724        // parses, so the document does not need to be semantically
725        // valid past the limit.
726        let big = "x".repeat(70 * 1024);
727        let body =
728            format!(r#"{{"name":"{big}","coordinator":"coord","members":[{{"id":"coord"}}]}}"#,);
729        let resp = app
730            .oneshot(auth_req("POST", "/v1/formations", Some(&body)))
731            .await
732            .expect("router response");
733        assert_eq!(
734            resp.status(),
735            StatusCode::PAYLOAD_TOO_LARGE,
736            "oversized body must surface 413; got {:?}",
737            resp.status(),
738        );
739    }
740
741    /// Sanity-probe: the parameterized route `/v1/formations/{id}`
742    /// actually captures the path segment. Existing tests only hit the
743    /// non-parameterized `/v1/formations`; if the route registration
744    /// ever regresses to literal-brace matching (e.g. on a future axum
745    /// upgrade that changes path syntax), this test fails loudly.
746    ///
747    /// We POST a real formation then GET it by id and check the
748    /// returned record matches. A 404 with empty/non-problem+json
749    /// content-type indicates router-level miss (literal route); a
750    /// 404 with problem+json indicates handler-level not-found
751    /// (route matched, formation absent). We expect 200.
752    #[tokio::test]
753    async fn get_formation_by_id_captures_path() {
754        let state = test_state();
755        let body = serde_json::json!({
756            "name": "probe",
757            "coordinator": "coord",
758            "members": [
759                { "id": "coord" },
760                { "id": "worker-a", "authorizedBy": "coord" }
761            ]
762        })
763        .to_string();
764        let resp = router(state.clone())
765            .oneshot(auth_req("POST", "/v1/formations", Some(&body)))
766            .await
767            .expect("router response");
768        assert_eq!(resp.status(), StatusCode::CREATED);
769        let bytes = resp.into_body().collect().await.unwrap().to_bytes();
770        let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
771        let id = parsed["id"].as_str().expect("uuid string");
772
773        let resp = router(state)
774            .oneshot(auth_req("GET", &format!("/v1/formations/{id}"), None))
775            .await
776            .expect("router response");
777        assert_eq!(
778            resp.status(),
779            StatusCode::OK,
780            "GET /v1/formations/<id> must capture the path segment; got {:?}",
781            resp.status(),
782        );
783        let bytes = resp.into_body().collect().await.unwrap().to_bytes();
784        let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
785        assert_eq!(parsed["name"], "probe");
786    }
787
788    /// Red-team finding: `update_formation_status` previously returned
789    /// the ADR-0010 `no-coordinator` discriminant for unknown state
790    /// strings, hijacking a load-bearing admission-gate identifier.
791    /// Unknown state is a generic bad-request.
792    #[tokio::test]
793    async fn unknown_state_returns_bad_request_problem_type() {
794        let state = test_state();
795        let body = serde_json::json!({
796            "name": "demo",
797            "coordinator": "coord",
798            "members": [
799                { "id": "coord" },
800                { "id": "worker-a", "authorizedBy": "coord" }
801            ]
802        })
803        .to_string();
804        let resp = router(state.clone())
805            .oneshot(auth_req("POST", "/v1/formations", Some(&body)))
806            .await
807            .expect("router response");
808        assert_eq!(resp.status(), StatusCode::CREATED);
809        let bytes = resp.into_body().collect().await.unwrap().to_bytes();
810        let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
811        let id = parsed["id"].as_str().expect("uuid string").to_owned();
812
813        let bad = serde_json::json!({ "state": "TELEPORTING" }).to_string();
814        let resp = router(state)
815            .oneshot(auth_req(
816                "POST",
817                &format!("/v1/formations/{id}/status"),
818                Some(&bad),
819            ))
820            .await
821            .expect("router response");
822        assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
823        let bytes = resp.into_body().collect().await.unwrap().to_bytes();
824        let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
825        assert_eq!(
826            parsed["type"], "/problems/bad-request",
827            "unknown state must surface generic bad-request, not an ADR-0010 discriminant"
828        );
829    }
830}