Skip to main content

cellos_server/routes/
formations.rs

1//! `/v1/formations` — CRUD handlers.
2//!
3//! POST validates the submitted `FormationDocument` per ADR-0010
4//! (single coordinator + every non-coordinator member carries
5//! `authorizedBy`). The full DAG/cycle/scope-narrowing admission gate
6//! lives in `cellos-supervisor`; we surface the same RFC 9457
7//! discriminants here so cellctl can render either source uniformly.
8
9use axum::extract::{Path, State};
10use axum::http::{HeaderMap, StatusCode};
11use axum::response::IntoResponse;
12use axum::Json;
13use cellos_core::events::{
14    cloud_event_v1_formation_completed, cloud_event_v1_formation_created,
15    cloud_event_v1_formation_degraded, cloud_event_v1_formation_failed,
16    cloud_event_v1_formation_launching, cloud_event_v1_formation_running,
17};
18use serde::{Deserialize, Serialize};
19use uuid::Uuid;
20
21use crate::auth::require_bearer;
22use crate::error::{AppError, AppErrorKind};
23use crate::state::{AppState, FormationRecord, FormationStatus};
24
25/// Subset of the `formation-v1` document the admission gate cares about.
26/// Additional fields are tolerated and preserved verbatim in
27/// `FormationRecord::document` (via the captured `serde_json::Value`).
28///
29/// **Wire shapes accepted.** Operators may submit either:
30///
31/// 1. **Flat** (server-internal canonical form):
32///    `{ "name": "...", "coordinator": "...", "members": [ { "id": "...", "authorizedBy": "..." } ] }`
33/// 2. **Kubectl-style** (matches `contracts/schemas/formation-v1.schema.json`):
34///    `{ "apiVersion": "cellos.dev/v1", "kind": "Formation",
35///       "metadata": { "name": "..." },
36///       "spec": { "coordinator": "...", "members": [ { "name": "...", "authorizedBy": "..." } ] } }`
37///
38/// `normalize_formation_document` runs first; everything below operates on
39/// the canonical flat shape. See ADR-0010 §Enforcement for why admission
40/// re-runs server-side regardless of client behaviour.
41#[derive(Debug, Deserialize)]
42pub struct FormationDocument {
43    pub name: String,
44    pub coordinator: String,
45    pub members: Vec<FormationMember>,
46}
47
48#[derive(Debug, Deserialize)]
49pub struct FormationMember {
50    pub id: String,
51    /// Required on every non-coordinator member; forbidden on the
52    /// coordinator (ADR-0010 §Enforcement).
53    #[serde(rename = "authorizedBy")]
54    pub authorized_by: Option<String>,
55}
56
57#[derive(Debug, Serialize)]
58pub struct FormationCreated {
59    pub id: Uuid,
60    pub name: String,
61    pub status: FormationStatus,
62}
63
64/// POST /v1/formations — admit a new formation. Returns 201 with the
65/// generated id on success; RFC 9457 problem+json on validation failure.
66pub async fn create_formation(
67    State(state): State<AppState>,
68    headers: HeaderMap,
69    body: axum::body::Bytes,
70) -> Result<impl IntoResponse, AppError> {
71    require_bearer(&headers, &state.api_token)?;
72
73    // Parse the wire payload, then normalize kubectl-style → flat. The
74    // canonical internal form is the flat `{name, coordinator, members:
75    // [{id, authorizedBy}]}` shape; the public schema documents the
76    // kubectl form. `normalize_formation_document` collapses both into
77    // the flat form before any admission validation runs, so existing
78    // ADR-0010 checks below operate on a single shape. We then parse
79    // twice: once into our validated struct, once kept as a generic
80    // Value (already-normalized) so GET echoes a stable internal shape.
81    let raw: serde_json::Value = serde_json::from_slice(&body)?;
82    let normalized = normalize_formation_document(&raw)?;
83    let doc: FormationDocument = serde_json::from_value(normalized.clone())?;
84
85    validate_formation(&doc)?;
86
87    let id = Uuid::new_v4();
88    let record = FormationRecord {
89        id,
90        name: doc.name.clone(),
91        status: FormationStatus::Pending,
92        // Store the normalized (flat) form so GET, replay projection,
93        // and downstream consumers see one stable shape regardless of
94        // whether the operator submitted kubectl-style or flat-style.
95        document: normalized,
96    };
97
98    state.formations.write().await.insert(id, record);
99
100    // Emit formation.v1.created so the WebSocket stream and audit log see it.
101    //
102    // EVT-CONTENT-001: the second positional argument is the CloudEvents 1.0
103    // `time` field (RFC3339 timestamp); published 0.5.0 incorrectly passed
104    // the formation UUID here, producing spec-non-compliant envelopes that
105    // failed schema-validating consumers (gateways, audit log, etc.).
106    let cell_count = doc.members.len() as u32;
107    let no_failed: &[String] = &[];
108    let now_rfc3339 = chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true);
109    let event = cloud_event_v1_formation_created(
110        &id.to_string(),
111        &now_rfc3339,
112        &id.to_string(),
113        &doc.name,
114        cell_count,
115        no_failed,
116        None,
117    );
118    let subject = format!("cellos.events.formations.{id}.created");
119    publish_event(&state, &subject, event).await;
120
121    let body = FormationCreated {
122        id,
123        name: doc.name,
124        status: FormationStatus::Pending,
125    };
126    Ok((StatusCode::CREATED, Json(body)))
127}
128
129/// Response shape for `GET /v1/formations` per ADR-0015 §D2. The
130/// `cursor` is the highest JetStream stream-sequence the server's
131/// projection has applied; clients hand it back as
132/// `/ws/events?since=<cursor>` so they can resume the live stream
133/// without missing any event between the snapshot and the WS open.
134#[derive(Debug, Serialize)]
135pub struct FormationsSnapshot {
136    pub formations: Vec<FormationRecord>,
137    pub cursor: u64,
138}
139
140/// GET /v1/formations — list all known formations plus the current
141/// projection cursor (ADR-0015).
142pub async fn list_formations(
143    State(state): State<AppState>,
144    headers: HeaderMap,
145) -> Result<Json<FormationsSnapshot>, AppError> {
146    require_bearer(&headers, &state.api_token)?;
147    let map = state.formations.read().await;
148    Ok(Json(FormationsSnapshot {
149        formations: map.values().cloned().collect(),
150        cursor: state.cursor(),
151    }))
152}
153
154/// GET /v1/formations/{id} — fetch one formation by uuid.
155pub async fn get_formation(
156    State(state): State<AppState>,
157    headers: HeaderMap,
158    Path(id): Path<Uuid>,
159) -> Result<Json<FormationRecord>, AppError> {
160    require_bearer(&headers, &state.api_token)?;
161    let map = state.formations.read().await;
162    map.get(&id)
163        .cloned()
164        .map(Json)
165        .ok_or_else(|| AppError::not_found(format!("formation {id} not found")))
166}
167
168/// GET /v1/formations/by-name/{name} — fetch one formation by name.
169///
170/// CTL-002 (E2E report): `cellctl describe formation <name>` and
171/// `cellctl delete formation <name>` previously sent the name verbatim
172/// to `/v1/formations/{id}` which rejected with `Invalid URL: UUID
173/// parsing failed`. This parallel route lets cellctl address formations
174/// by name without changing the existing UUID extractor on
175/// `/v1/formations/{id}` (no parser ambiguity, one round-trip).
176///
177/// Name uniqueness is NOT currently enforced at admission (see CTL-002
178/// follow-up); when multiple formations share a name this route returns
179/// the first match by UUID order (BTreeMap iteration order). That is a
180/// known limitation tracked separately from CTL-002.
181pub async fn get_formation_by_name(
182    State(state): State<AppState>,
183    headers: HeaderMap,
184    Path(name): Path<String>,
185) -> Result<Json<FormationRecord>, AppError> {
186    require_bearer(&headers, &state.api_token)?;
187    let map = state.formations.read().await;
188    map.values()
189        .find(|r| r.name == name)
190        .cloned()
191        .map(Json)
192        .ok_or_else(|| AppError::not_found(format!("formation '{name}' not found")))
193}
194
195/// DELETE /v1/formations/by-name/{name} — name-addressed counterpart of
196/// [`delete_formation`]. Looks up the formation by name and delegates to
197/// the same cancellation path so both routes emit the same
198/// `formation.v1.failed` event and surface identical projection state.
199pub async fn delete_formation_by_name(
200    State(state): State<AppState>,
201    headers: HeaderMap,
202    Path(name): Path<String>,
203) -> Result<StatusCode, AppError> {
204    require_bearer(&headers, &state.api_token)?;
205    // Resolve name → id under a read lock; release before re-acquiring
206    // the write lock in `delete_formation`. The two-step resolve is
207    // race-tolerant: if the formation is deleted between resolve and
208    // delegate, the second step surfaces the same 404 the UUID-addressed
209    // route would.
210    let id = {
211        let map = state.formations.read().await;
212        map.values()
213            .find(|r| r.name == name)
214            .map(|r| r.id)
215            .ok_or_else(|| AppError::not_found(format!("formation '{name}' not found")))?
216    };
217    delete_formation(State(state), headers, Path(id)).await
218}
219
220/// DELETE /v1/formations/{id} — best-effort cancellation. The actual
221/// teardown is performed asynchronously by the supervisor once the
222/// `formation.cancelled` event lands on JetStream; we only mark the
223/// in-memory projection.
224pub async fn delete_formation(
225    State(state): State<AppState>,
226    headers: HeaderMap,
227    Path(id): Path<Uuid>,
228) -> Result<StatusCode, AppError> {
229    require_bearer(&headers, &state.api_token)?;
230    let mut map = state.formations.write().await;
231    let (name, cell_count) = {
232        let entry = map
233            .get_mut(&id)
234            .ok_or_else(|| AppError::not_found(format!("formation {id} not found")))?;
235        entry.status = FormationStatus::Cancelled;
236        let members = entry
237            .document
238            .get("members")
239            .and_then(|m| m.as_array())
240            .map(|a| a.len() as u32)
241            .unwrap_or(0);
242        (entry.name.clone(), members)
243    };
244    drop(map);
245
246    let no_failed: &[String] = &[];
247    // EVT-CONTENT-001: second arg is the CloudEvents 1.0 `time` field
248    // (RFC3339); published 0.5.0 passed the UUID here. See create_formation.
249    let now_rfc3339 = chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true);
250    let event = cloud_event_v1_formation_failed(
251        &id.to_string(),
252        &now_rfc3339,
253        &id.to_string(),
254        &name,
255        cell_count,
256        no_failed,
257        Some("deleted by operator"),
258    );
259    let subject = format!("cellos.events.formations.{id}.failed");
260    publish_event(&state, &subject, event).await;
261
262    Ok(StatusCode::NO_CONTENT)
263}
264
265/// POST /v1/formations/{id}/status — receive a state-transition notification
266/// from the supervisor or an operator tool. Updates the in-memory projection
267/// and emits the matching `formation.v1.*` CloudEvent to NATS so the
268/// WebSocket stream carries it to connected web-view clients.
269#[derive(Debug, Deserialize)]
270pub struct StatusTransition {
271    pub state: String,
272    pub reason: Option<String>,
273    pub failed_cells: Option<Vec<String>>,
274}
275
276pub async fn update_formation_status(
277    State(state): State<AppState>,
278    headers: HeaderMap,
279    Path(id): Path<Uuid>,
280    Json(body): Json<StatusTransition>,
281) -> Result<StatusCode, AppError> {
282    require_bearer(&headers, &state.api_token)?;
283
284    let (new_status, name, cell_count, failed) = {
285        let mut map = state.formations.write().await;
286        let entry = map
287            .get_mut(&id)
288            .ok_or_else(|| AppError::not_found(format!("formation {id} not found")))?;
289
290        let new_status = match body.state.to_uppercase().as_str() {
291            "RUNNING" | "LAUNCHING" => FormationStatus::Running,
292            "DEGRADED" => FormationStatus::Running, // DEGRADED keeps running
293            "COMPLETED" => FormationStatus::Succeeded,
294            "FAILED" => FormationStatus::Failed,
295            other => {
296                // RFC-9457 §3.1: this is a generic bad-request, not an
297                // ADR-0010 admission-gate rejection. Returning the
298                // `FormationNoCoordinator` discriminant here would
299                // hijack a load-bearing identifier that clients switch
300                // on per ADR-0010 §Enforcement.
301                return Err(AppError::new(
302                    AppErrorKind::BadRequest,
303                    format!("unknown state: {other}"),
304                ));
305            }
306        };
307        entry.status = new_status;
308
309        let members = entry
310            .document
311            .get("members")
312            .and_then(|m| m.as_array())
313            .map(|a| a.len() as u32)
314            .unwrap_or(0);
315        let failed = body.failed_cells.unwrap_or_default();
316        (new_status, entry.name.clone(), members, failed)
317    };
318
319    let sid = id.to_string();
320    let reason = body.reason.as_deref();
321    let empty: &[String] = &[];
322    // EVT-CONTENT-001: the second positional arg to every
323    // `cloud_event_v1_formation_*` constructor is the CloudEvents 1.0 `time`
324    // field (RFC3339); published 0.5.0 incorrectly passed the formation
325    // UUID here. Capture one timestamp per HTTP request so all phases on a
326    // single state transition share an envelope time.
327    let now_rfc3339 = chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true);
328
329    let (event, phase) = match body.state.to_uppercase().as_str() {
330        "LAUNCHING" => (
331            cloud_event_v1_formation_launching(
332                &sid,
333                &now_rfc3339,
334                &sid,
335                &name,
336                cell_count,
337                empty,
338                reason,
339            ),
340            "launching",
341        ),
342        "RUNNING" => (
343            cloud_event_v1_formation_running(
344                &sid,
345                &now_rfc3339,
346                &sid,
347                &name,
348                cell_count,
349                empty,
350                reason,
351            ),
352            "running",
353        ),
354        "DEGRADED" => (
355            cloud_event_v1_formation_degraded(
356                &sid,
357                &now_rfc3339,
358                &sid,
359                &name,
360                cell_count,
361                &failed,
362                reason,
363            ),
364            "degraded",
365        ),
366        "COMPLETED" => (
367            cloud_event_v1_formation_completed(
368                &sid,
369                &now_rfc3339,
370                &sid,
371                &name,
372                cell_count,
373                empty,
374                reason,
375            ),
376            "completed",
377        ),
378        _ => (
379            cloud_event_v1_formation_failed(
380                &sid,
381                &now_rfc3339,
382                &sid,
383                &name,
384                cell_count,
385                &failed,
386                reason,
387            ),
388            "failed",
389        ),
390    };
391
392    let subject = format!("cellos.events.formations.{id}.{phase}");
393    publish_event(&state, &subject, event).await;
394
395    let _ = new_status; // used above
396    Ok(StatusCode::NO_CONTENT)
397}
398
399/// Publish a CloudEvent JSON payload to NATS if a client is connected.
400/// Failures are logged and swallowed — event loss is surfaced via the DLQ
401/// (P3-03) once that crate lands; the HTTP response is never blocked by NATS.
402async fn publish_event(state: &AppState, subject: &str, event: impl serde::Serialize) {
403    let Some(nats) = &state.nats else { return };
404    let payload = match serde_json::to_vec(&event) {
405        Ok(b) => b,
406        Err(e) => {
407            tracing::warn!(subject, error = %e, "failed to serialise formation CloudEvent");
408            return;
409        }
410    };
411    if let Err(e) = nats.publish(subject.to_owned(), payload.into()).await {
412        tracing::warn!(subject, error = %e, "failed to publish formation CloudEvent to NATS");
413    }
414}
415
416/// Detect the wire shape of an incoming formation document and
417/// normalize to the server's canonical flat form.
418///
419/// Two shapes are accepted (CTL-003 / SCHEMA-001 fix):
420///
421/// - **Flat** (canonical, what the server consumed historically):
422///   `{ "name", "coordinator", "members": [ { "id", "authorizedBy"? } ] }`
423/// - **Kubectl-style** (matches `contracts/schemas/formation-v1.schema.json`):
424///   `{ "apiVersion": "cellos.dev/v1", "kind": "Formation",
425///      "metadata": { "name", ... }, "spec": { "coordinator", "members": [...] } }`
426///
427/// Mapping (kubectl → flat):
428///
429/// | kubectl path                       | flat path                |
430/// |------------------------------------|--------------------------|
431/// | `metadata.name`                    | `name`                   |
432/// | `spec.coordinator`                 | `coordinator`            |
433/// | `spec.members[].name`              | `members[].id`           |
434/// | `spec.members[].authorizedBy`      | `members[].authorizedBy` |
435///
436/// **Hybrid documents are rejected.** A document carrying BOTH a
437/// top-level `name`/`coordinator`/`members` AND any of `apiVersion`,
438/// `kind`, `metadata`, `spec` is ambiguous: the operator likely meant
439/// one form but accidentally typed both. We surface a generic
440/// `/problems/bad-request` (RFC 9457 §3.1) listing the conflicting
441/// fields so cellctl can render a precise error.
442///
443/// `apiVersion` and `kind` MUST match the kubectl envelope literals
444/// (`cellos.dev/v1` and `Formation`); other values are rejected.
445///
446/// Non-object roots (arrays, strings, etc.) are passed through
447/// unchanged — the subsequent `serde_json::from_value::<FormationDocument>`
448/// will fail with the same descriptive error operators already see.
449fn normalize_formation_document(raw: &serde_json::Value) -> Result<serde_json::Value, AppError> {
450    // Detection rules.
451    //
452    // Flat signal:    top-level `name` or `members` (the two fields a
453    //                 flat document is required to carry).
454    // Kubectl signal: top-level `apiVersion`, `kind`, `metadata`, or
455    //                 `spec` (any of the four envelope fields).
456    //
457    // We look at the union so we can detect hybrids precisely.
458    let Some(obj) = raw.as_object() else {
459        // Non-object: let the downstream typed parse produce the
460        // canonical error message.
461        return Ok(raw.clone());
462    };
463
464    const FLAT_KEYS: &[&str] = &["name", "coordinator", "members"];
465    const KUBECTL_KEYS: &[&str] = &["apiVersion", "kind", "metadata", "spec"];
466
467    let flat_keys_present: Vec<&str> = FLAT_KEYS
468        .iter()
469        .copied()
470        .filter(|k| obj.contains_key(*k))
471        .collect();
472    let kubectl_keys_present: Vec<&str> = KUBECTL_KEYS
473        .iter()
474        .copied()
475        .filter(|k| obj.contains_key(*k))
476        .collect();
477
478    let has_flat = !flat_keys_present.is_empty();
479    let has_kubectl = !kubectl_keys_present.is_empty();
480
481    if has_flat && has_kubectl {
482        return Err(AppError::bad_request(format!(
483            "hybrid formation document: top-level flat field(s) {flat:?} \
484             conflict with kubectl-style envelope field(s) {kubectl:?}; \
485             pick exactly one shape (see contracts/schemas/formation-v1.schema.json)",
486            flat = flat_keys_present,
487            kubectl = kubectl_keys_present,
488        )));
489    }
490
491    if !has_kubectl {
492        // No envelope fields → flat (or so malformed the typed parse
493        // will reject it). Pass through.
494        return Ok(raw.clone());
495    }
496
497    // Kubectl-style. Validate envelope literals.
498    let api_version = obj
499        .get("apiVersion")
500        .and_then(|v| v.as_str())
501        .ok_or_else(|| {
502            AppError::bad_request(
503                "kubectl-style formation: missing or non-string 'apiVersion' (expected \"cellos.dev/v1\")"
504                    .to_string(),
505            )
506        })?;
507    if api_version != "cellos.dev/v1" {
508        return Err(AppError::bad_request(format!(
509            "kubectl-style formation: unsupported apiVersion '{api_version}' (expected \"cellos.dev/v1\")"
510        )));
511    }
512
513    let kind = obj.get("kind").and_then(|v| v.as_str()).ok_or_else(|| {
514        AppError::bad_request(
515            "kubectl-style formation: missing or non-string 'kind' (expected \"Formation\")"
516                .to_string(),
517        )
518    })?;
519    if kind != "Formation" {
520        return Err(AppError::bad_request(format!(
521            "kubectl-style formation: unsupported kind '{kind}' (expected \"Formation\")"
522        )));
523    }
524
525    let metadata = obj
526        .get("metadata")
527        .and_then(|v| v.as_object())
528        .ok_or_else(|| {
529            AppError::bad_request("kubectl-style formation: missing 'metadata' object".to_string())
530        })?;
531    let name = metadata
532        .get("name")
533        .and_then(|v| v.as_str())
534        .ok_or_else(|| {
535            AppError::bad_request("kubectl-style formation: missing 'metadata.name'".to_string())
536        })?;
537
538    let spec = obj.get("spec").and_then(|v| v.as_object()).ok_or_else(|| {
539        AppError::bad_request("kubectl-style formation: missing 'spec' object".to_string())
540    })?;
541
542    let coordinator = spec
543        .get("coordinator")
544        .and_then(|v| v.as_str())
545        .ok_or_else(|| {
546            AppError::bad_request("kubectl-style formation: missing 'spec.coordinator'".to_string())
547        })?;
548
549    let members_raw = spec
550        .get("members")
551        .and_then(|v| v.as_array())
552        .ok_or_else(|| {
553            AppError::bad_request(
554                "kubectl-style formation: missing or non-array 'spec.members'".to_string(),
555            )
556        })?;
557
558    // Rewrite each member: `name` → `id`. `authorizedBy` carries
559    // through. Any extra fields (`critical`, `spec`, future fields)
560    // are preserved verbatim — the admission gate only inspects
561    // `id`/`authorizedBy`, but we keep the rest so downstream
562    // consumers (supervisor, projection) see what the operator wrote.
563    let mut members_flat = Vec::with_capacity(members_raw.len());
564    for (idx, m) in members_raw.iter().enumerate() {
565        let m_obj = m.as_object().ok_or_else(|| {
566            AppError::bad_request(format!(
567                "kubectl-style formation: spec.members[{idx}] is not an object"
568            ))
569        })?;
570        let member_name = m_obj.get("name").and_then(|v| v.as_str()).ok_or_else(|| {
571            AppError::bad_request(format!(
572                "kubectl-style formation: spec.members[{idx}] missing 'name'"
573            ))
574        })?;
575
576        let mut rewritten = serde_json::Map::with_capacity(m_obj.len());
577        rewritten.insert(
578            "id".to_string(),
579            serde_json::Value::String(member_name.to_string()),
580        );
581        for (k, v) in m_obj.iter() {
582            if k == "name" {
583                continue; // already rewritten to `id`
584            }
585            rewritten.insert(k.clone(), v.clone());
586        }
587        members_flat.push(serde_json::Value::Object(rewritten));
588    }
589
590    let mut flat = serde_json::Map::with_capacity(3);
591    flat.insert(
592        "name".to_string(),
593        serde_json::Value::String(name.to_string()),
594    );
595    flat.insert(
596        "coordinator".to_string(),
597        serde_json::Value::String(coordinator.to_string()),
598    );
599    flat.insert(
600        "members".to_string(),
601        serde_json::Value::Array(members_flat),
602    );
603
604    Ok(serde_json::Value::Object(flat))
605}
606
607/// Apply the structural admission-gate checks ADR-0010 §Enforcement
608/// requires the server to re-run regardless of client behaviour:
609///
610/// 1. **noCoordinator** — the coordinator named in `coordinator` MUST
611///    appear in `members`.
612/// 2. **multipleCoordinators** — every `members[*].id` MUST be unique.
613///    The JSON schema declares `uniqueItems`; we re-enforce because
614///    the server cannot assume schema validation ran on the client.
615/// 3. **authorityNotNarrowing** — the coordinator MUST NOT carry
616///    `authorizedBy`; every non-coordinator MUST carry it AND the
617///    referenced parent MUST exist in `members` (an orphan parent is
618///    an unbounded A₀ — exactly the failure mode ADR-0010 §Proof
619///    forbids).
620/// 4. **cycle** — the `authorizedBy` edges MUST form a DAG. A cycle
621///    (including the self-edge `authorizedBy: self`) breaks the
622///    induction that proves every member's authority chains back to
623///    the coordinator.
624///
625/// The per-edge authority-subset check (`A_c ⊆ A_p`) lives in the
626/// supervisor today because the `formation-v1` document parsed here
627/// does not yet carry per-member declared authority sets; that is the
628/// only ADR-0010 check the server still defers.
629fn validate_formation(doc: &FormationDocument) -> Result<(), AppError> {
630    use std::collections::{HashMap, HashSet};
631
632    // 1. noCoordinator.
633    let coord_present = doc.members.iter().any(|m| m.id == doc.coordinator);
634    if !coord_present {
635        return Err(AppError::new(
636            AppErrorKind::FormationNoCoordinator,
637            format!(
638                "coordinator '{}' must appear in members list",
639                doc.coordinator
640            ),
641        ));
642    }
643
644    // 2. multipleCoordinators — duplicate member ids. We treat the
645    // duplicate-id failure under this discriminant because the ADR
646    // §Consequences canonical case is "two members both named
647    // `coord`": admission cannot pick which one is the coordinator.
648    let mut seen: HashSet<&str> = HashSet::new();
649    for m in &doc.members {
650        if !seen.insert(m.id.as_str()) {
651            return Err(AppError::new(
652                AppErrorKind::FormationMultipleCoordinators,
653                format!("duplicate member id '{}'", m.id),
654            ));
655        }
656    }
657
658    // 3. authorityNotNarrowing — coord-forbid, non-coord require, plus
659    //    orphan-parent rejection. An `authorizedBy` reference that has
660    //    no member entry has no parent edge → no narrowing → admission
661    //    fails.
662    for m in &doc.members {
663        let is_coord = m.id == doc.coordinator;
664        match (is_coord, &m.authorized_by) {
665            (true, Some(_)) => {
666                return Err(AppError::new(
667                    AppErrorKind::FormationAuthorityNotNarrowing,
668                    format!("coordinator '{}' must not declare authorizedBy", m.id),
669                ));
670            }
671            (false, None) => {
672                return Err(AppError::new(
673                    AppErrorKind::FormationAuthorityNotNarrowing,
674                    format!("non-coordinator member '{}' missing authorizedBy", m.id),
675                ));
676            }
677            (false, Some(parent)) => {
678                if !seen.contains(parent.as_str()) {
679                    return Err(AppError::new(
680                        AppErrorKind::FormationAuthorityNotNarrowing,
681                        format!("member '{}' references unknown parent '{}'", m.id, parent),
682                    ));
683                }
684            }
685            _ => {}
686        }
687    }
688
689    // 4. cycle — walk each non-coordinator's authorizedBy chain. In a
690    //    valid DAG with exactly one out-edge per non-root, the walk
691    //    terminates at the coordinator within strictly fewer hops than
692    //    members.len(). Self-loops are caught on the first hop.
693    let parent: HashMap<&str, &str> = doc
694        .members
695        .iter()
696        .filter_map(|m| m.authorized_by.as_deref().map(|p| (m.id.as_str(), p)))
697        .collect();
698
699    for m in &doc.members {
700        if m.id == doc.coordinator {
701            continue;
702        }
703        let mut cursor = m.id.as_str();
704        for _ in 0..doc.members.len() {
705            let Some(&p) = parent.get(cursor) else {
706                // No outgoing edge from cursor → cursor is the
707                // coordinator (proven in check 1 to be present). Done.
708                break;
709            };
710            if p == m.id {
711                return Err(AppError::new(
712                    AppErrorKind::FormationCycle,
713                    format!("authorizedBy cycle detected involving member '{}'", m.id),
714                ));
715            }
716            cursor = p;
717        }
718        if parent.contains_key(cursor) {
719            // Exhausted hop budget without reaching the coordinator —
720            // a cycle exists on the chain (not necessarily through
721            // `m.id` itself).
722            return Err(AppError::new(
723                AppErrorKind::FormationCycle,
724                format!(
725                    "authorizedBy cycle detected on chain starting at '{}'",
726                    m.id
727                ),
728            ));
729        }
730    }
731
732    Ok(())
733}
734
#[cfg(test)]
mod tests {
    //! Router-level tests for the `/v1/formations` handlers.
    //!
    //! Every test builds a router via `crate::router`, drives a single
    //! request through it with `tower::ServiceExt::oneshot`, and inspects
    //! the HTTP status plus the JSON body. Shared plumbing (request
    //! construction, body decoding, "POST and expect 400") lives in the
    //! helper functions at the top of the module so each test body only
    //! states its payload and its expectations.

    use super::*;
    use crate::router;
    use axum::body::Body;
    use axum::http::{header, Request};
    use http_body_util::BodyExt;
    use tower::ServiceExt;

    /// Bearer token shared by `test_state` and `auth_req`.
    const TOKEN: &str = "test-token";

    /// Fresh application state configured with the test bearer token.
    fn test_state() -> AppState {
        AppState::new(None, TOKEN)
    }

    /// Builds a request carrying the test bearer token.
    ///
    /// When `body` is `Some`, the request is sent with
    /// `Content-Type: application/json` and the given payload; otherwise
    /// the body is empty and no content type is set.
    fn auth_req(method: &str, uri: &str, body: Option<&str>) -> Request<Body> {
        let builder = Request::builder()
            .method(method)
            .uri(uri)
            .header(header::AUTHORIZATION, format!("Bearer {TOKEN}"));
        let request = match body {
            Some(json) => builder
                .header(header::CONTENT_TYPE, "application/json")
                .body(Body::from(json.to_owned())),
            None => builder.body(Body::empty()),
        };
        request.expect("build request")
    }

    /// Builds an authenticated `POST /v1/formations` request whose body is
    /// the serialized `doc`.
    fn post_formation(doc: &serde_json::Value) -> Request<Body> {
        auth_req("POST", "/v1/formations", Some(&doc.to_string()))
    }

    /// Smallest flat-shape formation the tests treat as valid: a
    /// coordinator plus one worker authorized by it.
    fn minimal_flat_formation(name: &str) -> serde_json::Value {
        serde_json::json!({
            "name": name,
            "coordinator": "coord",
            "members": [
                { "id": "coord" },
                { "id": "worker-a", "authorizedBy": "coord" }
            ]
        })
    }

    /// Drains the response body and parses it as JSON.
    ///
    /// Panics (failing the calling test) if the body cannot be collected
    /// or is not valid JSON. Generic over the body type so it does not
    /// depend on the concrete response type the router produces.
    async fn read_json<B>(resp: axum::http::Response<B>) -> serde_json::Value
    where
        B: BodyExt,
        B::Error: std::fmt::Debug,
    {
        let bytes = resp
            .into_body()
            .collect()
            .await
            .expect("collect response body")
            .to_bytes();
        serde_json::from_slice(&bytes).expect("response body is valid JSON")
    }

    /// POSTs `doc` to a router on fresh state, asserts `400 Bad Request`,
    /// and returns the parsed problem document for further assertions.
    async fn post_expecting_bad_request(doc: &serde_json::Value) -> serde_json::Value {
        let resp = router(test_state())
            .oneshot(post_formation(doc))
            .await
            .expect("router response");
        assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
        read_json(resp).await
    }

    /// POSTs `doc` to a router built on a clone of `state`, asserts
    /// `201 Created`, and returns the parsed response body.
    async fn create_formation(state: &AppState, doc: &serde_json::Value) -> serde_json::Value {
        let resp = router(state.clone())
            .oneshot(post_formation(doc))
            .await
            .expect("router response");
        assert_eq!(resp.status(), StatusCode::CREATED);
        read_json(resp).await
    }

    #[tokio::test]
    async fn post_valid_formation_returns_201() {
        let doc = serde_json::json!({
            "name": "demo",
            "coordinator": "coord",
            "members": [
                { "id": "coord" },
                { "id": "worker-a", "authorizedBy": "coord" },
                { "id": "worker-b", "authorizedBy": "coord" }
            ]
        });

        let resp = router(test_state())
            .oneshot(post_formation(&doc))
            .await
            .expect("router response");
        assert_eq!(resp.status(), StatusCode::CREATED);

        let created = read_json(resp).await;
        assert_eq!(created["status"], "PENDING");
        assert_eq!(created["name"], "demo");
        assert!(created["id"].is_string());
    }

    #[tokio::test]
    async fn post_formation_missing_coordinator_returns_400() {
        // coordinator names "coord" but no such member exists.
        let doc = serde_json::json!({
            "name": "demo",
            "coordinator": "coord",
            "members": [
                { "id": "worker-a", "authorizedBy": "coord" }
            ]
        });

        let resp = router(test_state())
            .oneshot(post_formation(&doc))
            .await
            .expect("router response");
        assert_eq!(resp.status(), StatusCode::BAD_REQUEST);

        // Copy the header out before the body is consumed below.
        let content_type = resp
            .headers()
            .get(header::CONTENT_TYPE)
            .and_then(|v| v.to_str().ok())
            .unwrap_or_default()
            .to_owned();
        assert!(
            content_type.starts_with("application/problem+json"),
            "expected RFC 9457 media type, got {content_type:?}"
        );

        let problem = read_json(resp).await;
        assert_eq!(problem["type"], "/problems/formation/no-coordinator");
    }

    #[tokio::test]
    async fn post_formation_member_missing_authorized_by_returns_400() {
        let doc = serde_json::json!({
            "name": "demo",
            "coordinator": "coord",
            "members": [
                { "id": "coord" },
                { "id": "worker-a" } // missing authorizedBy
            ]
        });

        let problem = post_expecting_bad_request(&doc).await;
        assert_eq!(
            problem["type"], "/problems/formation/authority-not-narrowing",
            "expected authority-not-narrowing discriminant, got {problem}"
        );
    }

    #[tokio::test]
    async fn get_formations_returns_snapshot_with_cursor() {
        // ADR-0015 §D2: GET /v1/formations is `{ formations: [...], cursor: u64 }`.
        let resp = router(test_state())
            .oneshot(auth_req("GET", "/v1/formations", None))
            .await
            .expect("router response");
        assert_eq!(resp.status(), StatusCode::OK);

        let snapshot = read_json(resp).await;
        assert!(
            snapshot.is_object(),
            "expected snapshot object, got {snapshot}"
        );
        let formations = snapshot["formations"]
            .as_array()
            .expect("formations array");
        assert_eq!(formations.len(), 0);
        assert!(
            snapshot["cursor"].is_u64(),
            "cursor field must be an unsigned integer, got {}",
            snapshot["cursor"]
        );
        assert_eq!(snapshot["cursor"].as_u64(), Some(0));
    }

    #[tokio::test]
    async fn snapshot_returns_cursor() {
        // ADR-0015 §D2 + §E: after POSTing a formation, the snapshot
        // response MUST carry a `cursor` field of integer type so the
        // client can hand it to `/ws/events?since=<cursor>`.
        let app = router(test_state());
        let doc = minimal_flat_formation("with-cursor");

        // The same router instance serves both requests, hence the clone.
        let resp = app
            .clone()
            .oneshot(post_formation(&doc))
            .await
            .expect("router response");
        assert_eq!(resp.status(), StatusCode::CREATED);

        let resp = app
            .oneshot(auth_req("GET", "/v1/formations", None))
            .await
            .expect("router response");
        assert_eq!(resp.status(), StatusCode::OK);

        let snapshot = read_json(resp).await;
        assert!(
            snapshot["cursor"].is_u64(),
            "cursor must be unsigned integer; got {}",
            snapshot["cursor"]
        );
        let formations = snapshot["formations"]
            .as_array()
            .expect("formations array");
        assert_eq!(formations.len(), 1, "expected 1 formation after POST");
        assert_eq!(formations[0]["name"], "with-cursor");
    }

    #[tokio::test]
    async fn missing_bearer_returns_401() {
        // Built by hand (not via `auth_req`) so no Authorization header is sent.
        let unauthenticated = Request::builder()
            .method("GET")
            .uri("/v1/formations")
            .body(Body::empty())
            .expect("build request");

        let resp = router(test_state())
            .oneshot(unauthenticated)
            .await
            .expect("router response");
        assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
    }

    /// ADR-0010 §Enforcement `multipleCoordinators` discriminant.
    /// Duplicate `members[*].id` MUST be rejected with this type even
    /// when the duplicates carry valid `authorizedBy`. The JSON schema
    /// has `uniqueItems` but the server cannot assume schema validation
    /// ran on the client.
    #[tokio::test]
    async fn rejects_duplicate_member_ids_with_multiple_coordinators_type() {
        let doc = serde_json::json!({
            "name": "dup-ids",
            "coordinator": "coord",
            "members": [
                { "id": "coord" },
                { "id": "coord", "authorizedBy": "coord" }
            ]
        });

        let problem = post_expecting_bad_request(&doc).await;
        assert_eq!(
            problem["type"], "/problems/formation/multiple-coordinators",
            "duplicate member ids must surface multipleCoordinators"
        );
    }

    /// ADR-0010 §Enforcement `cycle` discriminant. `authorizedBy: self`
    /// is the minimal cycle.
    #[tokio::test]
    async fn rejects_self_authorized_cycle() {
        let doc = serde_json::json!({
            "name": "self-cycle",
            "coordinator": "coord",
            "members": [
                { "id": "coord" },
                { "id": "worker-a", "authorizedBy": "worker-a" }
            ]
        });

        let problem = post_expecting_bad_request(&doc).await;
        assert_eq!(problem["type"], "/problems/formation/cycle");
    }

    /// Two-node cycle a→b→a; neither chains back to coordinator.
    #[tokio::test]
    async fn rejects_two_node_cycle() {
        let doc = serde_json::json!({
            "name": "two-cycle",
            "coordinator": "coord",
            "members": [
                { "id": "coord" },
                { "id": "a", "authorizedBy": "b" },
                { "id": "b", "authorizedBy": "a" }
            ]
        });

        let problem = post_expecting_bad_request(&doc).await;
        assert_eq!(problem["type"], "/problems/formation/cycle");
    }

    /// Orphan parent: `authorizedBy: ghost` where `ghost` is not in
    /// `members`. Without a parent edge the member has no narrowing
    /// path → `authorityNotNarrowing`.
    #[tokio::test]
    async fn rejects_orphan_parent_reference() {
        let doc = serde_json::json!({
            "name": "orphan-parent",
            "coordinator": "coord",
            "members": [
                { "id": "coord" },
                { "id": "worker-a", "authorizedBy": "ghost" }
            ]
        });

        let problem = post_expecting_bad_request(&doc).await;
        assert_eq!(
            problem["type"],
            "/problems/formation/authority-not-narrowing"
        );
    }

    /// Red-team finding: POST /v1/formations was using axum's default
    /// 2 MiB body limit. We cap at 64 KiB so a >64 KiB payload returns
    /// 413 Payload Too Large rather than burning CPU on serde parsing.
    #[tokio::test]
    async fn post_formation_oversized_body_returns_413() {
        // Build a JSON document larger than 64 KiB by stuffing a long
        // `name` field. The body-limit layer rejects before serde
        // parses, so the document does not need to be semantically
        // valid past the limit.
        let big = "x".repeat(70 * 1024);
        let body =
            format!(r#"{{"name":"{big}","coordinator":"coord","members":[{{"id":"coord"}}]}}"#);

        let resp = router(test_state())
            .oneshot(auth_req("POST", "/v1/formations", Some(&body)))
            .await
            .expect("router response");
        assert_eq!(
            resp.status(),
            StatusCode::PAYLOAD_TOO_LARGE,
            "oversized body must surface 413; got {:?}",
            resp.status(),
        );
    }

    /// Sanity-probe: the parameterized route `/v1/formations/{id}`
    /// actually captures the path segment. Most tests only hit the
    /// non-parameterized `/v1/formations`; if the route registration
    /// ever regresses to literal-brace matching (e.g. on a future axum
    /// upgrade that changes path syntax), this test fails loudly.
    ///
    /// We POST a real formation then GET it by id and check the
    /// returned record matches. A 404 with empty/non-problem+json
    /// content-type indicates router-level miss (literal route); a
    /// 404 with problem+json indicates handler-level not-found
    /// (route matched, formation absent). We expect 200.
    #[tokio::test]
    async fn get_formation_by_id_captures_path() {
        let state = test_state();
        let created = create_formation(&state, &minimal_flat_formation("probe")).await;
        let id = created["id"].as_str().expect("uuid string");

        let resp = router(state)
            .oneshot(auth_req("GET", &format!("/v1/formations/{id}"), None))
            .await
            .expect("router response");
        assert_eq!(
            resp.status(),
            StatusCode::OK,
            "GET /v1/formations/<id> must capture the path segment; got {:?}",
            resp.status(),
        );

        let record = read_json(resp).await;
        assert_eq!(record["name"], "probe");
    }

    /// CTL-003 / SCHEMA-001 — kubectl-style happy path.
    /// `contracts/schemas/formation-v1.schema.json` documents the
    /// kubectl envelope; the server now accepts it via an admission
    /// adapter and normalizes to flat internally.
    #[tokio::test]
    async fn post_kubectl_style_formation_returns_201() {
        let doc = serde_json::json!({
            "apiVersion": "cellos.dev/v1",
            "kind": "Formation",
            "metadata": { "name": "kubectl-demo" },
            "spec": {
                "coordinator": "coord",
                "members": [
                    { "name": "coord" },
                    { "name": "worker-a", "authorizedBy": "coord" },
                    { "name": "worker-b", "authorizedBy": "coord" }
                ]
            }
        });

        let resp = router(test_state())
            .oneshot(post_formation(&doc))
            .await
            .expect("router response");
        assert_eq!(resp.status(), StatusCode::CREATED);

        let created = read_json(resp).await;
        assert_eq!(created["status"], "PENDING");
        assert_eq!(created["name"], "kubectl-demo");
        assert!(created["id"].is_string());
    }

    /// Kubectl-style → ADR-0010 admission still fires on the
    /// normalized flat form. Missing-coordinator on a kubectl-shaped
    /// payload must surface the same `/problems/formation/no-coordinator`
    /// discriminant the flat path produces.
    #[tokio::test]
    async fn post_kubectl_style_missing_coordinator_returns_no_coordinator() {
        let doc = serde_json::json!({
            "apiVersion": "cellos.dev/v1",
            "kind": "Formation",
            "metadata": { "name": "missing-coord" },
            "spec": {
                "coordinator": "coord",
                "members": [
                    { "name": "worker-a", "authorizedBy": "coord" }
                ]
            }
        });

        let problem = post_expecting_bad_request(&doc).await;
        assert_eq!(problem["type"], "/problems/formation/no-coordinator");
    }

    /// Hybrid: top-level `name` + top-level `apiVersion`. Operator
    /// ambiguity — reject with 400 `/problems/bad-request` listing the
    /// conflicting fields.
    #[tokio::test]
    async fn post_hybrid_formation_returns_400_bad_request() {
        let doc = serde_json::json!({
            "apiVersion": "cellos.dev/v1",
            "kind": "Formation",
            "metadata": { "name": "hybrid" },
            "spec": {
                "coordinator": "coord",
                "members": [ { "name": "coord" } ]
            },
            // Stray flat-style field — operator confused two shapes.
            "name": "hybrid",
            "members": [ { "id": "coord" } ]
        });

        let problem = post_expecting_bad_request(&doc).await;
        assert_eq!(
            problem["type"], "/problems/bad-request",
            "hybrid shape must surface a generic bad-request, not an admission discriminant"
        );
        // NOTE(review): the fixture is itself named "hybrid", so this check
        // would also pass if the detail merely echoed the formation name.
        // Consider renaming the fixture once the handler's wording is confirmed.
        let detail = problem["detail"].as_str().unwrap_or_default();
        assert!(
            detail.contains("hybrid"),
            "detail must mention 'hybrid'; got {detail:?}"
        );
    }

    /// Kubectl-style with wrong `apiVersion` is rejected as bad-request
    /// before admission runs.
    #[tokio::test]
    async fn post_kubectl_style_wrong_api_version_returns_400() {
        let doc = serde_json::json!({
            "apiVersion": "cellos.dev/v2",
            "kind": "Formation",
            "metadata": { "name": "wrong-api" },
            "spec": {
                "coordinator": "coord",
                "members": [ { "name": "coord" } ]
            }
        });

        let problem = post_expecting_bad_request(&doc).await;
        assert_eq!(problem["type"], "/problems/bad-request");
        let detail = problem["detail"].as_str().unwrap_or_default();
        assert!(
            detail.contains("apiVersion") && detail.contains("cellos.dev/v2"),
            "detail must name the bad apiVersion; got {detail:?}"
        );
    }

    /// Kubectl-style with wrong `kind` is rejected as bad-request.
    #[tokio::test]
    async fn post_kubectl_style_wrong_kind_returns_400() {
        let doc = serde_json::json!({
            "apiVersion": "cellos.dev/v1",
            "kind": "Cell",
            "metadata": { "name": "wrong-kind" },
            "spec": {
                "coordinator": "coord",
                "members": [ { "name": "coord" } ]
            }
        });

        let problem = post_expecting_bad_request(&doc).await;
        assert_eq!(problem["type"], "/problems/bad-request");
        let detail = problem["detail"].as_str().unwrap_or_default();
        assert!(
            detail.contains("kind") && detail.contains("Cell"),
            "detail must name the bad kind; got {detail:?}"
        );
    }

    /// After a kubectl-style POST, the GET round-trip MUST echo the
    /// normalized (flat) shape so downstream consumers see one stable
    /// document layout.
    #[tokio::test]
    async fn kubectl_style_post_then_get_returns_normalized_flat_document() {
        let state = test_state();
        let doc = serde_json::json!({
            "apiVersion": "cellos.dev/v1",
            "kind": "Formation",
            "metadata": { "name": "roundtrip" },
            "spec": {
                "coordinator": "coord",
                "members": [
                    { "name": "coord" },
                    { "name": "worker-a", "authorizedBy": "coord" }
                ]
            }
        });

        let created = create_formation(&state, &doc).await;
        let id = created["id"].as_str().expect("uuid string");

        let resp = router(state)
            .oneshot(auth_req("GET", &format!("/v1/formations/{id}"), None))
            .await
            .expect("router response");
        assert_eq!(resp.status(), StatusCode::OK);

        let record = read_json(resp).await;
        let stored = &record["document"];
        assert_eq!(stored["name"], "roundtrip", "flat 'name' present");
        assert_eq!(stored["coordinator"], "coord", "flat 'coordinator' present");
        let members = stored["members"]
            .as_array()
            .expect("members array on normalized doc");
        assert_eq!(members.len(), 2);
        assert_eq!(members[0]["id"], "coord");
        assert_eq!(members[1]["id"], "worker-a");
        assert_eq!(members[1]["authorizedBy"], "coord");
        // Envelope fields stripped on normalization.
        assert!(
            stored.get("apiVersion").is_none(),
            "kubectl envelope must not leak into normalized doc"
        );
        assert!(stored.get("kind").is_none());
        assert!(stored.get("metadata").is_none());
        assert!(stored.get("spec").is_none());
    }

    /// Red-team finding: `update_formation_status` previously returned
    /// the ADR-0010 `no-coordinator` discriminant for unknown state
    /// strings, hijacking a load-bearing admission-gate identifier.
    /// Unknown state is a generic bad-request.
    #[tokio::test]
    async fn unknown_state_returns_bad_request_problem_type() {
        let state = test_state();
        let created = create_formation(&state, &minimal_flat_formation("demo")).await;
        let id = created["id"].as_str().expect("uuid string");

        let bad = serde_json::json!({ "state": "TELEPORTING" }).to_string();
        let resp = router(state)
            .oneshot(auth_req(
                "POST",
                &format!("/v1/formations/{id}/status"),
                Some(&bad),
            ))
            .await
            .expect("router response");
        assert_eq!(resp.status(), StatusCode::BAD_REQUEST);

        let problem = read_json(resp).await;
        assert_eq!(
            problem["type"], "/problems/bad-request",
            "unknown state must surface generic bad-request, not an ADR-0010 discriminant"
        );
    }
}