Skip to main content

cellos_server/routes/
formations.rs

1//! `/v1/formations` — CRUD handlers.
2//!
3//! POST validates the submitted `FormationDocument` per ADR-0010
4//! (single coordinator + every non-coordinator member carries
5//! `authorizedBy`). The full DAG/cycle/scope-narrowing admission gate
6//! lives in `cellos-supervisor`; we surface the same RFC 9457
7//! discriminants here so cellctl can render either source uniformly.
8
9use axum::extract::{Path, State};
10use axum::http::{HeaderMap, StatusCode};
11use axum::response::IntoResponse;
12use axum::Json;
13use cellos_core::events::{
14    cloud_event_v1_formation_completed, cloud_event_v1_formation_created,
15    cloud_event_v1_formation_degraded, cloud_event_v1_formation_failed,
16    cloud_event_v1_formation_launching, cloud_event_v1_formation_running,
17};
18use serde::{Deserialize, Serialize};
19use uuid::Uuid;
20
21use crate::auth::require_bearer;
22use crate::error::{AppError, AppErrorKind};
23use crate::state::{AppState, FormationRecord, FormationStatus};
24
25/// Subset of the `formation-v1` document the admission gate cares about.
26/// Additional fields are tolerated and preserved verbatim in
27/// `FormationRecord::document` (via the captured `serde_json::Value`).
28///
29/// **Wire shapes accepted.** Operators may submit either:
30///
31/// 1. **Flat** (server-internal canonical form):
32///    `{ "name": "...", "coordinator": "...", "members": [ { "id": "...", "authorizedBy": "..." } ] }`
33/// 2. **Kubectl-style** (matches `contracts/schemas/formation-v1.schema.json`):
34///    `{ "apiVersion": "cellos.dev/v1", "kind": "Formation",
35///       "metadata": { "name": "..." },
36///       "spec": { "coordinator": "...", "members": [ { "name": "...", "authorizedBy": "..." } ] } }`
37///
38/// `normalize_formation_document` runs first; everything below operates on
39/// the canonical flat shape. See ADR-0010 §Enforcement for why admission
40/// re-runs server-side regardless of client behaviour.
41#[derive(Debug, Deserialize)]
42pub struct FormationDocument {
43    pub name: String,
44    pub coordinator: String,
45    pub members: Vec<FormationMember>,
46}
47
48#[derive(Debug, Deserialize)]
49pub struct FormationMember {
50    pub id: String,
51    /// Required on every non-coordinator member; forbidden on the
52    /// coordinator (ADR-0010 §Enforcement).
53    #[serde(rename = "authorizedBy")]
54    pub authorized_by: Option<String>,
55}
56
57#[derive(Debug, Serialize)]
58pub struct FormationCreated {
59    pub id: Uuid,
60    pub name: String,
61    pub status: FormationStatus,
62}
63
64/// POST /v1/formations — admit a new formation. Returns 201 with the
65/// generated id on success; RFC 9457 problem+json on validation failure.
66pub async fn create_formation(
67    State(state): State<AppState>,
68    headers: HeaderMap,
69    body: axum::body::Bytes,
70) -> Result<impl IntoResponse, AppError> {
71    require_bearer(&headers, &state.api_token)?;
72
73    // Parse the wire payload, then normalize kubectl-style → flat. The
74    // canonical internal form is the flat `{name, coordinator, members:
75    // [{id, authorizedBy}]}` shape; the public schema documents the
76    // kubectl form. `normalize_formation_document` collapses both into
77    // the flat form before any admission validation runs, so existing
78    // ADR-0010 checks below operate on a single shape. We then parse
79    // twice: once into our validated struct, once kept as a generic
80    // Value (already-normalized) so GET echoes a stable internal shape.
81    let raw: serde_json::Value = serde_json::from_slice(&body)?;
82    let normalized = normalize_formation_document(&raw)?;
83    let doc: FormationDocument = serde_json::from_value(normalized.clone())?;
84
85    // FUZZ-HIGH-1: name-validation MUST run before structural admission.
86    // The fuzz wave admitted `""`, `"   "`, and `"a\nb"` as formation
87    // names — names then corrupted by-name lookup (URL routing can't
88    // carry newlines), log lines, and cellctl rendering. Reject hostile
89    // names with the generic `/problems/bad-request` discriminant before
90    // any other check so the operator sees the precise rule violated.
91    validate_formation_name(&doc.name)?;
92
93    validate_formation(&doc)?;
94
95    // FUZZ-HIGH-2: enforce name uniqueness at admission. Without this
96    // check two formations can share a name; `GET /v1/formations/by-name/{name}`
97    // then returns the first match by UUID order and silently hides the
98    // rest. Both lookup and admission must agree that names are unique.
99    //
100    // The duplicate-name check and the insert happen under the SAME
101    // write lock so two concurrent admissions with the same name cannot
102    // both succeed (TOCTOU: check-then-insert under a read lock would
103    // race). The sibling FIX-RT3-HIGH-3 hardens
104    // `delete_formation_by_name` against legacy duplicates that may
105    // already exist in long-lived projections.
106    let id = Uuid::new_v4();
107    let record = FormationRecord {
108        id,
109        name: doc.name.clone(),
110        status: FormationStatus::Pending,
111        // Store the normalized (flat) form so GET, replay projection,
112        // and downstream consumers see one stable shape regardless of
113        // whether the operator submitted kubectl-style or flat-style.
114        document: normalized,
115    };
116
117    {
118        let mut map = state.formations.write().await;
119        if let Some(existing) = map.values().find(|r| r.name == doc.name) {
120            return Err(AppError::new(
121                AppErrorKind::Conflict,
122                format!(
123                    "formation name '{}' already in use by {}",
124                    doc.name, existing.id
125                ),
126            ));
127        }
128        map.insert(id, record);
129    }
130
131    // Emit formation.v1.created so the WebSocket stream and audit log see it.
132    //
133    // EVT-CONTENT-001: the second positional argument is the CloudEvents 1.0
134    // `time` field (RFC3339 timestamp); published 0.5.0 incorrectly passed
135    // the formation UUID here, producing spec-non-compliant envelopes that
136    // failed schema-validating consumers (gateways, audit log, etc.).
137    let cell_count = doc.members.len() as u32;
138    let no_failed: &[String] = &[];
139    let now_rfc3339 = chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true);
140    let event = cloud_event_v1_formation_created(
141        &id.to_string(),
142        &now_rfc3339,
143        &id.to_string(),
144        &doc.name,
145        cell_count,
146        no_failed,
147        None,
148    );
149    let subject = format!("cellos.events.formations.{id}.created");
150    publish_event(&state, &subject, event).await;
151
152    let body = FormationCreated {
153        id,
154        name: doc.name,
155        status: FormationStatus::Pending,
156    };
157    Ok((StatusCode::CREATED, Json(body)))
158}
159
160/// Response shape for `GET /v1/formations` per ADR-0015 §D2. The
161/// `cursor` is the highest JetStream stream-sequence the server's
162/// projection has applied; clients hand it back as
163/// `/ws/events?since=<cursor>` so they can resume the live stream
164/// without missing any event between the snapshot and the WS open.
165#[derive(Debug, Serialize)]
166pub struct FormationsSnapshot {
167    pub formations: Vec<FormationRecord>,
168    pub cursor: u64,
169}
170
171/// GET /v1/formations — list all known formations plus the current
172/// projection cursor (ADR-0015).
173pub async fn list_formations(
174    State(state): State<AppState>,
175    headers: HeaderMap,
176) -> Result<Json<FormationsSnapshot>, AppError> {
177    require_bearer(&headers, &state.api_token)?;
178    let map = state.formations.read().await;
179    Ok(Json(FormationsSnapshot {
180        formations: map.values().cloned().collect(),
181        cursor: state.cursor(),
182    }))
183}
184
185/// GET /v1/formations/{id} — fetch one formation by uuid.
186pub async fn get_formation(
187    State(state): State<AppState>,
188    headers: HeaderMap,
189    Path(id): Path<Uuid>,
190) -> Result<Json<FormationRecord>, AppError> {
191    require_bearer(&headers, &state.api_token)?;
192    let map = state.formations.read().await;
193    map.get(&id)
194        .cloned()
195        .map(Json)
196        .ok_or_else(|| AppError::not_found(format!("formation {id} not found")))
197}
198
199/// GET /v1/formations/by-name/{name} — fetch one formation by name.
200///
201/// CTL-002 (E2E report): `cellctl describe formation <name>` and
202/// `cellctl delete formation <name>` previously sent the name verbatim
203/// to `/v1/formations/{id}` which rejected with `Invalid URL: UUID
204/// parsing failed`. This parallel route lets cellctl address formations
205/// by name without changing the existing UUID extractor on
206/// `/v1/formations/{id}` (no parser ambiguity, one round-trip).
207///
208/// Name uniqueness is NOT currently enforced at admission (see CTL-002
209/// follow-up); when multiple formations share a name this route returns
210/// the first match by UUID order (BTreeMap iteration order). That is a
211/// known limitation tracked separately from CTL-002.
212pub async fn get_formation_by_name(
213    State(state): State<AppState>,
214    headers: HeaderMap,
215    Path(name): Path<String>,
216) -> Result<Json<FormationRecord>, AppError> {
217    require_bearer(&headers, &state.api_token)?;
218    let map = state.formations.read().await;
219    map.values()
220        .find(|r| r.name == name)
221        .cloned()
222        .map(Json)
223        .ok_or_else(|| AppError::not_found(format!("formation '{name}' not found")))
224}
225
226/// DELETE /v1/formations/by-name/{name} — name-addressed counterpart of
227/// [`delete_formation`]. Looks up the formation by name and delegates to
228/// the same cancellation path so both routes emit the same
229/// `formation.v1.failed` event and surface identical projection state.
230pub async fn delete_formation_by_name(
231    State(state): State<AppState>,
232    headers: HeaderMap,
233    Path(name): Path<String>,
234) -> Result<StatusCode, AppError> {
235    require_bearer(&headers, &state.api_token)?;
236    // Resolve name → id under a read lock; release before re-acquiring
237    // the write lock in `delete_formation`. The two-step resolve is
238    // race-tolerant: if the formation is deleted between resolve and
239    // delegate, the second step surfaces the same 404 the UUID-addressed
240    // route would.
241    //
242    // RT3-HIGH-3 (CTL-002-A): admission today does not yet enforce
243    // name-uniqueness across formations (sibling fix-agent stream). When
244    // two formations share a name, picking the BTreeMap-first UUID and
245    // deleting THAT one is a silent wrong-deletion — exactly the
246    // operator-trust failure the red-team flagged for DELETE. Defense
247    // in depth: enumerate all matches; if there is more than one,
248    // refuse and force the operator to disambiguate by UUID. (GET by
249    // name still picks first; that is read-only and low-stakes.)
250    let id = {
251        let map = state.formations.read().await;
252        let matches: Vec<Uuid> = map
253            .values()
254            .filter(|r| r.name == name)
255            .map(|r| r.id)
256            .collect();
257        match matches.len() {
258            0 => return Err(AppError::not_found(format!("formation '{name}' not found"))),
259            1 => matches[0],
260            _ => {
261                // Sort so the detail string is deterministic across
262                // BTreeMap-iteration variants (the test relies on this).
263                let mut ids = matches;
264                ids.sort();
265                let id_list = ids
266                    .iter()
267                    .map(Uuid::to_string)
268                    .collect::<Vec<_>>()
269                    .join(", ");
270                return Err(AppError::new(
271                    AppErrorKind::Conflict,
272                    format!(
273                        "multiple formations share name '{name}': [{id_list}]; \
274                         delete by UUID via /v1/formations/{{id}} to disambiguate"
275                    ),
276                ));
277            }
278        }
279    };
280    delete_formation(State(state), headers, Path(id)).await
281}
282
283/// DELETE /v1/formations/{id} — best-effort cancellation. The actual
284/// teardown is performed asynchronously by the supervisor once the
285/// `formation.cancelled` event lands on JetStream; we only mark the
286/// in-memory projection.
287pub async fn delete_formation(
288    State(state): State<AppState>,
289    headers: HeaderMap,
290    Path(id): Path<Uuid>,
291) -> Result<StatusCode, AppError> {
292    require_bearer(&headers, &state.api_token)?;
293    let mut map = state.formations.write().await;
294    let (name, cell_count) = {
295        let entry = map
296            .get_mut(&id)
297            .ok_or_else(|| AppError::not_found(format!("formation {id} not found")))?;
298        entry.status = FormationStatus::Cancelled;
299        let members = entry
300            .document
301            .get("members")
302            .and_then(|m| m.as_array())
303            .map(|a| a.len() as u32)
304            .unwrap_or(0);
305        (entry.name.clone(), members)
306    };
307    drop(map);
308
309    let no_failed: &[String] = &[];
310    // EVT-CONTENT-001: second arg is the CloudEvents 1.0 `time` field
311    // (RFC3339); published 0.5.0 passed the UUID here. See create_formation.
312    let now_rfc3339 = chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true);
313    let event = cloud_event_v1_formation_failed(
314        &id.to_string(),
315        &now_rfc3339,
316        &id.to_string(),
317        &name,
318        cell_count,
319        no_failed,
320        Some("deleted by operator"),
321    );
322    let subject = format!("cellos.events.formations.{id}.failed");
323    publish_event(&state, &subject, event).await;
324
325    Ok(StatusCode::NO_CONTENT)
326}
327
328/// POST /v1/formations/{id}/status — receive a state-transition notification
329/// from the supervisor or an operator tool. Updates the in-memory projection
330/// and emits the matching `formation.v1.*` CloudEvent to NATS so the
331/// WebSocket stream carries it to connected web-view clients.
332#[derive(Debug, Deserialize)]
333pub struct StatusTransition {
334    pub state: String,
335    pub reason: Option<String>,
336    pub failed_cells: Option<Vec<String>>,
337}
338
339pub async fn update_formation_status(
340    State(state): State<AppState>,
341    headers: HeaderMap,
342    Path(id): Path<Uuid>,
343    Json(body): Json<StatusTransition>,
344) -> Result<StatusCode, AppError> {
345    require_bearer(&headers, &state.api_token)?;
346
347    // EVT-CONTENT-001-C: capture the CloudEvents `time` field BEFORE
348    // we take the write lock so it reflects when the state transition
349    // *happened*, not when this task happened to win the lock. Under
350    // load the lock can be held by sibling /status writers for hundreds
351    // of milliseconds; capturing `time` after release skewed CloudEvent
352    // ordering in audit consumers (they could see two events with the
353    // same `id` but a `time` ordering that disagreed with stream-sequence
354    // ordering). The replacement guarantees `time` is monotone with
355    // request arrival.
356    //
357    // The wave-4 source-position test `update_formation_status_captures
358    // _time_before_lock` asserts this ordering textually so a future
359    // refactor that moves the capture back below the lock fails loudly.
360    let now_rfc3339 = chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true);
361
362    let (new_status, name, cell_count, failed) = {
363        let mut map = state.formations.write().await;
364        let entry = map
365            .get_mut(&id)
366            .ok_or_else(|| AppError::not_found(format!("formation {id} not found")))?;
367
368        let new_status = match body.state.to_uppercase().as_str() {
369            "RUNNING" | "LAUNCHING" => FormationStatus::Running,
370            "DEGRADED" => FormationStatus::Running, // DEGRADED keeps running
371            "COMPLETED" => FormationStatus::Succeeded,
372            "FAILED" => FormationStatus::Failed,
373            other => {
374                // RFC-9457 §3.1: this is a generic bad-request, not an
375                // ADR-0010 admission-gate rejection. Returning the
376                // `FormationNoCoordinator` discriminant here would
377                // hijack a load-bearing identifier that clients switch
378                // on per ADR-0010 §Enforcement.
379                return Err(AppError::new(
380                    AppErrorKind::BadRequest,
381                    format!("unknown state: {other}"),
382                ));
383            }
384        };
385        entry.status = new_status;
386
387        let members = entry
388            .document
389            .get("members")
390            .and_then(|m| m.as_array())
391            .map(|a| a.len() as u32)
392            .unwrap_or(0);
393        let failed = body.failed_cells.unwrap_or_default();
394        (new_status, entry.name.clone(), members, failed)
395    };
396
397    let sid = id.to_string();
398    let reason = body.reason.as_deref();
399    let empty: &[String] = &[];
400    // EVT-CONTENT-001: the second positional arg to every
401    // `cloud_event_v1_formation_*` constructor is the CloudEvents 1.0
402    // `time` field (RFC3339); published 0.5.0 incorrectly passed the
403    // formation UUID here. EVT-CONTENT-001-C moved the capture above
404    // the write lock — `now_rfc3339` is reused below so all phases on
405    // a single state transition share one envelope time and that time
406    // reflects request arrival, not lock-release.
407
408    let (event, phase) = match body.state.to_uppercase().as_str() {
409        "LAUNCHING" => (
410            cloud_event_v1_formation_launching(
411                &sid,
412                &now_rfc3339,
413                &sid,
414                &name,
415                cell_count,
416                empty,
417                reason,
418            ),
419            "launching",
420        ),
421        "RUNNING" => (
422            cloud_event_v1_formation_running(
423                &sid,
424                &now_rfc3339,
425                &sid,
426                &name,
427                cell_count,
428                empty,
429                reason,
430            ),
431            "running",
432        ),
433        "DEGRADED" => (
434            cloud_event_v1_formation_degraded(
435                &sid,
436                &now_rfc3339,
437                &sid,
438                &name,
439                cell_count,
440                &failed,
441                reason,
442            ),
443            "degraded",
444        ),
445        "COMPLETED" => (
446            cloud_event_v1_formation_completed(
447                &sid,
448                &now_rfc3339,
449                &sid,
450                &name,
451                cell_count,
452                empty,
453                reason,
454            ),
455            "completed",
456        ),
457        _ => (
458            cloud_event_v1_formation_failed(
459                &sid,
460                &now_rfc3339,
461                &sid,
462                &name,
463                cell_count,
464                &failed,
465                reason,
466            ),
467            "failed",
468        ),
469    };
470
471    let subject = format!("cellos.events.formations.{id}.{phase}");
472    publish_event(&state, &subject, event).await;
473
474    let _ = new_status; // used above
475    Ok(StatusCode::NO_CONTENT)
476}
477
478/// Publish a CloudEvent JSON payload to NATS if a client is connected.
479/// Failures are logged and swallowed — event loss is surfaced via the DLQ
480/// (P3-03) once that crate lands; the HTTP response is never blocked by NATS.
481async fn publish_event(state: &AppState, subject: &str, event: impl serde::Serialize) {
482    let Some(nats) = &state.nats else { return };
483    let payload = match serde_json::to_vec(&event) {
484        Ok(b) => b,
485        Err(e) => {
486            tracing::warn!(subject, error = %e, "failed to serialise formation CloudEvent");
487            return;
488        }
489    };
490    if let Err(e) = nats.publish(subject.to_owned(), payload.into()).await {
491        tracing::warn!(subject, error = %e, "failed to publish formation CloudEvent to NATS");
492    }
493}
494
495/// Detect the wire shape of an incoming formation document and
496/// normalize to the server's canonical flat form.
497///
498/// Two shapes are accepted (CTL-003 / SCHEMA-001 fix):
499///
500/// - **Flat** (canonical, what the server consumed historically):
501///   `{ "name", "coordinator", "members": [ { "id", "authorizedBy"? } ] }`
502/// - **Kubectl-style** (matches `contracts/schemas/formation-v1.schema.json`):
503///   `{ "apiVersion": "cellos.dev/v1", "kind": "Formation",
504///      "metadata": { "name", ... }, "spec": { "coordinator", "members": [...] } }`
505///
506/// Mapping (kubectl → flat):
507///
508/// | kubectl path                       | flat path                |
509/// |------------------------------------|--------------------------|
510/// | `metadata.name`                    | `name`                   |
511/// | `spec.coordinator`                 | `coordinator`            |
512/// | `spec.members[].name`              | `members[].id`           |
513/// | `spec.members[].authorizedBy`      | `members[].authorizedBy` |
514///
515/// **Hybrid documents are rejected.** A document carrying BOTH a
516/// top-level `name`/`coordinator`/`members` AND any of `apiVersion`,
517/// `kind`, `metadata`, `spec` is ambiguous: the operator likely meant
518/// one form but accidentally typed both. We surface a generic
519/// `/problems/bad-request` (RFC 9457 §3.1) listing the conflicting
520/// fields so cellctl can render a precise error.
521///
522/// `apiVersion` and `kind` MUST match the kubectl envelope literals
523/// (`cellos.dev/v1` and `Formation`); other values are rejected.
524///
525/// Non-object roots (arrays, strings, etc.) are passed through
526/// unchanged — the subsequent `serde_json::from_value::<FormationDocument>`
527/// will fail with the same descriptive error operators already see.
528fn normalize_formation_document(raw: &serde_json::Value) -> Result<serde_json::Value, AppError> {
529    // Detection rules.
530    //
531    // Flat signal:    top-level `name` or `members` (the two fields a
532    //                 flat document is required to carry).
533    // Kubectl signal: top-level `apiVersion`, `kind`, `metadata`, or
534    //                 `spec` (any of the four envelope fields).
535    //
536    // We look at the union so we can detect hybrids precisely.
537    let Some(obj) = raw.as_object() else {
538        // Non-object: let the downstream typed parse produce the
539        // canonical error message.
540        return Ok(raw.clone());
541    };
542
543    const FLAT_KEYS: &[&str] = &["name", "coordinator", "members"];
544    const KUBECTL_KEYS: &[&str] = &["apiVersion", "kind", "metadata", "spec"];
545
546    // CTL-003-B: unknown top-level keys in a kubectl-style document are
547    // operator typos or projection drift (e.g. someone splicing the
548    // server-side `status` block from a GET response back into a POST
549    // body). Silently dropping them is the worst failure mode — the
550    // operator thinks they applied A but the server admitted B. We
551    // reject early with `/problems/bad-request` and name the offending
552    // field so cellctl can render a precise error. The check runs on
553    // the kubectl-signal arm only; the flat arm is the server's
554    // historical contract and tolerating its extras would silently
555    // break existing operators on upgrade.
556    const KUBECTL_ALLOWED: &[&str] = &["apiVersion", "kind", "metadata", "spec"];
557
558    let flat_keys_present: Vec<&str> = FLAT_KEYS
559        .iter()
560        .copied()
561        .filter(|k| obj.contains_key(*k))
562        .collect();
563    let kubectl_keys_present: Vec<&str> = KUBECTL_KEYS
564        .iter()
565        .copied()
566        .filter(|k| obj.contains_key(*k))
567        .collect();
568
569    let has_flat = !flat_keys_present.is_empty();
570    let has_kubectl = !kubectl_keys_present.is_empty();
571
572    if has_flat && has_kubectl {
573        return Err(AppError::bad_request(format!(
574            "hybrid formation document: top-level flat field(s) {flat:?} \
575             conflict with kubectl-style envelope field(s) {kubectl:?}; \
576             pick exactly one shape (see contracts/schemas/formation-v1.schema.json)",
577            flat = flat_keys_present,
578            kubectl = kubectl_keys_present,
579        )));
580    }
581
582    if !has_kubectl {
583        // No envelope fields → flat (or so malformed the typed parse
584        // will reject it). Pass through.
585        return Ok(raw.clone());
586    }
587
588    // CTL-003-B: kubectl-style. Reject unknown top-level keys before
589    // anything else so the operator sees a precise error and the
590    // dropped-field failure mode is impossible by construction.
591    let unknown_top_level: Vec<&str> = obj
592        .keys()
593        .map(|s| s.as_str())
594        .filter(|k| !KUBECTL_ALLOWED.contains(k))
595        .collect();
596    if !unknown_top_level.is_empty() {
597        return Err(AppError::bad_request(format!(
598            "kubectl-style formation: unknown top-level field(s) {unknown_top_level:?}; \
599             allowed: {KUBECTL_ALLOWED:?}",
600        )));
601    }
602
603    // Kubectl-style. Validate envelope literals.
604    let api_version = obj
605        .get("apiVersion")
606        .and_then(|v| v.as_str())
607        .ok_or_else(|| {
608            AppError::bad_request(
609                "kubectl-style formation: missing or non-string 'apiVersion' (expected \"cellos.dev/v1\")"
610                    .to_string(),
611            )
612        })?;
613    if api_version != "cellos.dev/v1" {
614        return Err(AppError::bad_request(format!(
615            "kubectl-style formation: unsupported apiVersion '{api_version}' (expected \"cellos.dev/v1\")"
616        )));
617    }
618
619    let kind = obj.get("kind").and_then(|v| v.as_str()).ok_or_else(|| {
620        AppError::bad_request(
621            "kubectl-style formation: missing or non-string 'kind' (expected \"Formation\")"
622                .to_string(),
623        )
624    })?;
625    if kind != "Formation" {
626        return Err(AppError::bad_request(format!(
627            "kubectl-style formation: unsupported kind '{kind}' (expected \"Formation\")"
628        )));
629    }
630
631    let metadata = obj
632        .get("metadata")
633        .and_then(|v| v.as_object())
634        .ok_or_else(|| {
635            AppError::bad_request("kubectl-style formation: missing 'metadata' object".to_string())
636        })?;
637    let name = metadata
638        .get("name")
639        .and_then(|v| v.as_str())
640        .ok_or_else(|| {
641            AppError::bad_request("kubectl-style formation: missing 'metadata.name'".to_string())
642        })?;
643
644    let spec = obj.get("spec").and_then(|v| v.as_object()).ok_or_else(|| {
645        AppError::bad_request("kubectl-style formation: missing 'spec' object".to_string())
646    })?;
647
648    let coordinator = spec
649        .get("coordinator")
650        .and_then(|v| v.as_str())
651        .ok_or_else(|| {
652            AppError::bad_request("kubectl-style formation: missing 'spec.coordinator'".to_string())
653        })?;
654
655    let members_raw = spec
656        .get("members")
657        .and_then(|v| v.as_array())
658        .ok_or_else(|| {
659            AppError::bad_request(
660                "kubectl-style formation: missing or non-array 'spec.members'".to_string(),
661            )
662        })?;
663
664    // Rewrite each member: `name` → `id`. `authorizedBy` carries
665    // through. Any extra fields (`critical`, `spec`, future fields)
666    // are preserved verbatim — the admission gate only inspects
667    // `id`/`authorizedBy`, but we keep the rest so downstream
668    // consumers (supervisor, projection) see what the operator wrote.
669    let mut members_flat = Vec::with_capacity(members_raw.len());
670    for (idx, m) in members_raw.iter().enumerate() {
671        let m_obj = m.as_object().ok_or_else(|| {
672            AppError::bad_request(format!(
673                "kubectl-style formation: spec.members[{idx}] is not an object"
674            ))
675        })?;
676        let member_name = m_obj.get("name").and_then(|v| v.as_str()).ok_or_else(|| {
677            AppError::bad_request(format!(
678                "kubectl-style formation: spec.members[{idx}] missing 'name'"
679            ))
680        })?;
681
682        // RT3-HIGH-3 (CTL-003-A): kubectl convention says `metadata.name`
683        // (and at member level, `name`) is the canonical identifier;
684        // spec-level fields MUST NOT shadow it. Earlier code populated
685        // `id := name` and then iterated the member's keys with
686        // last-write-wins, so an operator who declared BOTH
687        // `name: alice` AND `id: bob` ended up with `id: bob` — the
688        // operator's mistake silently overrode the kubectl identifier.
689        // Admission is strict: reject with bad-request and name the
690        // conflict so the operator knows which field to drop.
691        if m_obj.contains_key("id") {
692            return Err(AppError::bad_request(format!(
693                "kubectl-style formation: spec.members[{idx}] declares both 'name' \
694                 and 'id'; kubectl manifests address members by 'name' only — \
695                 remove the 'id' field"
696            )));
697        }
698
699        let mut rewritten = serde_json::Map::with_capacity(m_obj.len());
700        rewritten.insert(
701            "id".to_string(),
702            serde_json::Value::String(member_name.to_string()),
703        );
704        for (k, v) in m_obj.iter() {
705            if k == "name" {
706                continue; // already rewritten to `id`
707            }
708            // `id` is rejected above; the loop now only carries through
709            // `authorizedBy` and operator-supplied extras.
710            rewritten.insert(k.clone(), v.clone());
711        }
712        members_flat.push(serde_json::Value::Object(rewritten));
713    }
714
715    // CTL-003-B: preserve `metadata.labels` and `metadata.annotations`
716    // verbatim on the normalized record so GET round-trips them. The
717    // kubectl ecosystem uses these for selectors, ownership, and
718    // tooling annotations; silently dropping them on admission breaks
719    // the principle of least surprise for kubectl-style operators.
720    // We place them under a flat `metadata` object so the flat shape
721    // remains self-describing: anything not `name`/`coordinator`/
722    // `members` is metadata, mirroring the kubectl envelope.
723    let mut flat_metadata = serde_json::Map::new();
724    if let Some(labels) = metadata.get("labels") {
725        // Reject non-object labels early — kubectl convention is
726        // `metadata.labels` is `{string: string}`. We don't enforce the
727        // value type here (operators may emit numeric values that
728        // serde will round-trip), but the outer container must be an
729        // object.
730        if !labels.is_object() {
731            return Err(AppError::bad_request(
732                "kubectl-style formation: 'metadata.labels' must be an object",
733            ));
734        }
735        flat_metadata.insert("labels".to_string(), labels.clone());
736    }
737    if let Some(annotations) = metadata.get("annotations") {
738        if !annotations.is_object() {
739            return Err(AppError::bad_request(
740                "kubectl-style formation: 'metadata.annotations' must be an object",
741            ));
742        }
743        flat_metadata.insert("annotations".to_string(), annotations.clone());
744    }
745
746    let mut flat = serde_json::Map::with_capacity(4);
747    flat.insert(
748        "name".to_string(),
749        serde_json::Value::String(name.to_string()),
750    );
751    flat.insert(
752        "coordinator".to_string(),
753        serde_json::Value::String(coordinator.to_string()),
754    );
755    flat.insert(
756        "members".to_string(),
757        serde_json::Value::Array(members_flat),
758    );
759    if !flat_metadata.is_empty() {
760        flat.insert(
761            "metadata".to_string(),
762            serde_json::Value::Object(flat_metadata),
763        );
764    }
765
766    Ok(serde_json::Value::Object(flat))
767}
768
769/// Validate a formation `name` against the conservative character set
770/// the by-name lookup, URL routing, and log rendering can all carry
771/// safely. FUZZ-WAVE-1 (FUZZ-HIGH-1) found admission accepted `""`,
772/// `"   "`, and `"a\nb"`; downstream routing then broke (URLs can't
773/// carry newlines, empty names are ambiguous) and log lines were
774/// corrupted by control characters.
775///
776/// Rules (deliberately conservative — relaxing is non-breaking, but
777/// tightening after release would be):
778///
779/// - **Length**: `1 ≤ name.len() ≤ 253` (matches DNS label length).
780/// - **Characters**: `[A-Za-z0-9._-]` only. Reject every control
781///   character, whitespace, newline, slash, and every byte outside
782///   ASCII.
783/// - **Edges**: cannot start or end with `-` or `.`.
784/// - **Reserved**: cannot be `.` or `..` (these collide with relative
785///   filesystem paths cellctl renders into).
786///
787/// All violations surface as `/problems/bad-request` (RFC 9457 §3.1)
788/// with a `detail` string naming the rule. Tighter formation-specific
789/// discriminants would be a breaking expansion of the type-uri namespace
790/// for what is, fundamentally, a malformed input.
791fn validate_formation_name(name: &str) -> Result<(), AppError> {
792    // Length.
793    if name.is_empty() {
794        return Err(AppError::bad_request(
795            "formation name must not be empty".to_string(),
796        ));
797    }
798    if name.len() > 253 {
799        return Err(AppError::bad_request(format!(
800            "formation name length {} exceeds maximum of 253 bytes",
801            name.len()
802        )));
803    }
804
805    // Reserved names.
806    if name == "." || name == ".." {
807        return Err(AppError::bad_request(format!(
808            "formation name '{name}' is reserved"
809        )));
810    }
811
812    // Character class — operate on bytes; the allowed set is pure ASCII,
813    // so any non-ASCII byte (and thus any multi-byte UTF-8 sequence)
814    // fails this check and the operator gets a precise reason.
815    for (idx, b) in name.as_bytes().iter().enumerate() {
816        let ok = matches!(b, b'a'..=b'z' | b'A'..=b'Z' | b'0'..=b'9' | b'-' | b'_' | b'.');
817        if !ok {
818            // Render the offending byte: printable ASCII as itself,
819            // otherwise as a `\xNN` escape so newlines/control bytes
820            // don't smuggle themselves into the response body.
821            let rendered = if b.is_ascii_graphic() {
822                format!("'{}'", *b as char)
823            } else {
824                format!("\\x{b:02x}")
825            };
826            return Err(AppError::bad_request(format!(
827                "formation name contains disallowed character {rendered} at byte offset {idx} \
828                 (allowed: A-Z a-z 0-9 . - _)"
829            )));
830        }
831    }
832
833    // Edges. Safe to index by byte: ASCII-only at this point.
834    let first = name.as_bytes()[0];
835    let last = name.as_bytes()[name.len() - 1];
836    if first == b'-' || first == b'.' {
837        return Err(AppError::bad_request(format!(
838            "formation name '{name}' must not start with '-' or '.'"
839        )));
840    }
841    if last == b'-' || last == b'.' {
842        return Err(AppError::bad_request(format!(
843            "formation name '{name}' must not end with '-' or '.'"
844        )));
845    }
846
847    Ok(())
848}
849
850/// Apply the structural admission-gate checks ADR-0010 §Enforcement
851/// requires the server to re-run regardless of client behaviour:
852///
853/// 1. **noCoordinator** — the coordinator named in `coordinator` MUST
854///    appear in `members`.
855/// 2. **multipleCoordinators** — every `members[*].id` MUST be unique.
856///    The JSON schema declares `uniqueItems`; we re-enforce because
857///    the server cannot assume schema validation ran on the client.
858/// 3. **authorityNotNarrowing** — the coordinator MUST NOT carry
859///    `authorizedBy`; every non-coordinator MUST carry it AND the
860///    referenced parent MUST exist in `members` (an orphan parent is
861///    an unbounded A₀ — exactly the failure mode ADR-0010 §Proof
862///    forbids).
863/// 4. **cycle** — the `authorizedBy` edges MUST form a DAG. A cycle
864///    (including the self-edge `authorizedBy: self`) breaks the
865///    induction that proves every member's authority chains back to
866///    the coordinator.
867///
868/// The per-edge authority-subset check (`A_c ⊆ A_p`) lives in the
869/// supervisor today because the `formation-v1` document parsed here
870/// does not yet carry per-member declared authority sets; that is the
871/// only ADR-0010 check the server still defers.
872fn validate_formation(doc: &FormationDocument) -> Result<(), AppError> {
873    use std::collections::{HashMap, HashSet};
874
875    // 1. noCoordinator.
876    let coord_present = doc.members.iter().any(|m| m.id == doc.coordinator);
877    if !coord_present {
878        return Err(AppError::new(
879            AppErrorKind::FormationNoCoordinator,
880            format!(
881                "coordinator '{}' must appear in members list",
882                doc.coordinator
883            ),
884        ));
885    }
886
887    // 2. duplicateMemberId — `members[*].id` uniqueness. FUZZ-MED-4:
888    // historically this case re-used the `multiple-coordinators`
889    // discriminant because ADR-0010 §Consequences narrated the same
890    // scenario ("two members both named `coord`"). Operators reading
891    // the problem-type couldn't tell "manifest declares two
892    // coordinators" (structural design error) from "manifest declares
893    // two members with the same id" (typo). We now emit a dedicated
894    // discriminant so cellctl can render a typo-specific hint. The
895    // legacy `multiple-coordinators` discriminant is preserved on the
896    // AppErrorKind enum so callers who explicitly construct it still
897    // see the old wire shape — only the admission path here switched.
898    let mut seen: HashSet<&str> = HashSet::new();
899    for m in &doc.members {
900        if !seen.insert(m.id.as_str()) {
901            return Err(AppError::new(
902                AppErrorKind::FormationDuplicateMemberId,
903                format!("duplicate member id '{}'", m.id),
904            ));
905        }
906    }
907
908    // 3. authorityNotNarrowing — coord-forbid, non-coord require, plus
909    //    orphan-parent rejection. An `authorizedBy` reference that has
910    //    no member entry has no parent edge → no narrowing → admission
911    //    fails.
912    for m in &doc.members {
913        let is_coord = m.id == doc.coordinator;
914        match (is_coord, &m.authorized_by) {
915            (true, Some(_)) => {
916                return Err(AppError::new(
917                    AppErrorKind::FormationAuthorityNotNarrowing,
918                    format!("coordinator '{}' must not declare authorizedBy", m.id),
919                ));
920            }
921            (false, None) => {
922                return Err(AppError::new(
923                    AppErrorKind::FormationAuthorityNotNarrowing,
924                    format!("non-coordinator member '{}' missing authorizedBy", m.id),
925                ));
926            }
927            (false, Some(parent)) => {
928                if !seen.contains(parent.as_str()) {
929                    return Err(AppError::new(
930                        AppErrorKind::FormationAuthorityNotNarrowing,
931                        format!("member '{}' references unknown parent '{}'", m.id, parent),
932                    ));
933                }
934            }
935            _ => {}
936        }
937    }
938
939    // 4. cycle — walk each non-coordinator's authorizedBy chain. In a
940    //    valid DAG with exactly one out-edge per non-root, the walk
941    //    terminates at the coordinator within strictly fewer hops than
942    //    members.len(). Self-loops are caught on the first hop.
943    let parent: HashMap<&str, &str> = doc
944        .members
945        .iter()
946        .filter_map(|m| m.authorized_by.as_deref().map(|p| (m.id.as_str(), p)))
947        .collect();
948
949    for m in &doc.members {
950        if m.id == doc.coordinator {
951            continue;
952        }
953        let mut cursor = m.id.as_str();
954        for _ in 0..doc.members.len() {
955            let Some(&p) = parent.get(cursor) else {
956                // No outgoing edge from cursor → cursor is the
957                // coordinator (proven in check 1 to be present). Done.
958                break;
959            };
960            if p == m.id {
961                return Err(AppError::new(
962                    AppErrorKind::FormationCycle,
963                    format!("authorizedBy cycle detected involving member '{}'", m.id),
964                ));
965            }
966            cursor = p;
967        }
968        if parent.contains_key(cursor) {
969            // Exhausted hop budget without reaching the coordinator —
970            // a cycle exists on the chain (not necessarily through
971            // `m.id` itself).
972            return Err(AppError::new(
973                AppErrorKind::FormationCycle,
974                format!(
975                    "authorizedBy cycle detected on chain starting at '{}'",
976                    m.id
977                ),
978            ));
979        }
980    }
981
982    Ok(())
983}
984
985#[cfg(test)]
986mod tests {
987    use super::*;
988    use crate::router;
989    use axum::body::Body;
990    use axum::http::{header, Request};
991    use http_body_util::BodyExt;
992    use tower::ServiceExt;
993
994    const TOKEN: &str = "test-token";
995
996    fn test_state() -> AppState {
997        AppState::new(None, TOKEN)
998    }
999
1000    fn auth_req(method: &str, uri: &str, body: Option<&str>) -> Request<Body> {
1001        let mut b = Request::builder()
1002            .method(method)
1003            .uri(uri)
1004            .header(header::AUTHORIZATION, format!("Bearer {TOKEN}"));
1005        if body.is_some() {
1006            b = b.header(header::CONTENT_TYPE, "application/json");
1007        }
1008        b.body(
1009            body.map(|s| Body::from(s.to_owned()))
1010                .unwrap_or_else(Body::empty),
1011        )
1012        .expect("build request")
1013    }
1014
1015    #[tokio::test]
1016    async fn post_valid_formation_returns_201() {
1017        let app = router(test_state());
1018        let body = serde_json::json!({
1019            "name": "demo",
1020            "coordinator": "coord",
1021            "members": [
1022                { "id": "coord" },
1023                { "id": "worker-a", "authorizedBy": "coord" },
1024                { "id": "worker-b", "authorizedBy": "coord" }
1025            ]
1026        })
1027        .to_string();
1028
1029        let resp = app
1030            .oneshot(auth_req("POST", "/v1/formations", Some(&body)))
1031            .await
1032            .expect("router response");
1033        assert_eq!(resp.status(), StatusCode::CREATED);
1034
1035        let bytes = resp.into_body().collect().await.unwrap().to_bytes();
1036        let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
1037        assert_eq!(parsed["status"], "PENDING");
1038        assert_eq!(parsed["name"], "demo");
1039        assert!(parsed["id"].as_str().is_some());
1040    }
1041
1042    #[tokio::test]
1043    async fn post_formation_missing_coordinator_returns_400() {
1044        let app = router(test_state());
1045        // coordinator names "coord" but no such member exists.
1046        let body = serde_json::json!({
1047            "name": "demo",
1048            "coordinator": "coord",
1049            "members": [
1050                { "id": "worker-a", "authorizedBy": "coord" }
1051            ]
1052        })
1053        .to_string();
1054
1055        let resp = app
1056            .oneshot(auth_req("POST", "/v1/formations", Some(&body)))
1057            .await
1058            .expect("router response");
1059        assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
1060        let ct = resp
1061            .headers()
1062            .get(header::CONTENT_TYPE)
1063            .and_then(|v| v.to_str().ok())
1064            .unwrap_or_default()
1065            .to_owned();
1066        assert!(
1067            ct.starts_with("application/problem+json"),
1068            "expected RFC 9457 media type, got {ct:?}"
1069        );
1070        let bytes = resp.into_body().collect().await.unwrap().to_bytes();
1071        let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
1072        assert_eq!(parsed["type"], "/problems/formation/no-coordinator");
1073    }
1074
1075    #[tokio::test]
1076    async fn post_formation_member_missing_authorized_by_returns_400() {
1077        let app = router(test_state());
1078        let body = serde_json::json!({
1079            "name": "demo",
1080            "coordinator": "coord",
1081            "members": [
1082                { "id": "coord" },
1083                { "id": "worker-a" } // missing authorizedBy
1084            ]
1085        })
1086        .to_string();
1087
1088        let resp = app
1089            .oneshot(auth_req("POST", "/v1/formations", Some(&body)))
1090            .await
1091            .expect("router response");
1092        assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
1093        let bytes = resp.into_body().collect().await.unwrap().to_bytes();
1094        let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
1095        assert_eq!(
1096            parsed["type"], "/problems/formation/authority-not-narrowing",
1097            "expected authority-not-narrowing discriminant, got {parsed}"
1098        );
1099    }
1100
1101    #[tokio::test]
1102    async fn get_formations_returns_snapshot_with_cursor() {
1103        // ADR-0015 §D2: GET /v1/formations is `{ formations: [...], cursor: u64 }`.
1104        let app = router(test_state());
1105        let resp = app
1106            .oneshot(auth_req("GET", "/v1/formations", None))
1107            .await
1108            .expect("router response");
1109        assert_eq!(resp.status(), StatusCode::OK);
1110        let bytes = resp.into_body().collect().await.unwrap().to_bytes();
1111        let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
1112        assert!(parsed.is_object(), "expected snapshot object, got {parsed}");
1113        let arr = parsed["formations"].as_array().expect("formations array");
1114        assert_eq!(arr.len(), 0);
1115        assert!(
1116            parsed["cursor"].is_u64(),
1117            "cursor field must be an unsigned integer, got {}",
1118            parsed["cursor"]
1119        );
1120        assert_eq!(parsed["cursor"].as_u64(), Some(0));
1121    }
1122
1123    #[tokio::test]
1124    async fn snapshot_returns_cursor() {
1125        // ADR-0015 §D2 + §E: after POSTing a formation, the snapshot
1126        // response MUST carry a `cursor` field of integer type so the
1127        // client can hand it to `/ws/events?since=<cursor>`.
1128        let app = router(test_state());
1129        let body = serde_json::json!({
1130            "name": "with-cursor",
1131            "coordinator": "coord",
1132            "members": [
1133                { "id": "coord" },
1134                { "id": "worker-a", "authorizedBy": "coord" }
1135            ]
1136        })
1137        .to_string();
1138
1139        let resp = app
1140            .clone()
1141            .oneshot(auth_req("POST", "/v1/formations", Some(&body)))
1142            .await
1143            .expect("router response");
1144        assert_eq!(resp.status(), StatusCode::CREATED);
1145
1146        let resp = app
1147            .oneshot(auth_req("GET", "/v1/formations", None))
1148            .await
1149            .expect("router response");
1150        assert_eq!(resp.status(), StatusCode::OK);
1151        let bytes = resp.into_body().collect().await.unwrap().to_bytes();
1152        let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
1153        assert!(
1154            parsed["cursor"].is_u64(),
1155            "cursor must be unsigned integer; got {}",
1156            parsed["cursor"]
1157        );
1158        let formations = parsed["formations"].as_array().expect("formations array");
1159        assert_eq!(formations.len(), 1, "expected 1 formation after POST");
1160        assert_eq!(formations[0]["name"], "with-cursor");
1161    }
1162
1163    #[tokio::test]
1164    async fn missing_bearer_returns_401() {
1165        let app = router(test_state());
1166        let resp = app
1167            .oneshot(
1168                Request::builder()
1169                    .method("GET")
1170                    .uri("/v1/formations")
1171                    .body(Body::empty())
1172                    .unwrap(),
1173            )
1174            .await
1175            .expect("router response");
1176        assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
1177    }
1178
1179    /// FUZZ-MED-4: duplicate `members[*].id` now surfaces its own
1180    /// `/problems/formation/duplicate-member-id` discriminant rather
1181    /// than riding on `multiple-coordinators`. The JSON schema declares
1182    /// `uniqueItems`; we re-enforce because the server cannot assume
1183    /// schema validation ran on the client.
1184    ///
1185    /// Wire-compat note: the old discriminant
1186    /// `/problems/formation/multiple-coordinators` is preserved on the
1187    /// enum and continues to be emitted by other admission paths that
1188    /// legitimately mean "two coordinators". Clients pinning on the
1189    /// old type still see 400; they just won't trigger on this
1190    /// specific case anymore.
1191    #[tokio::test]
1192    async fn rejects_duplicate_member_ids_with_dedicated_type() {
1193        let app = router(test_state());
1194        let body = serde_json::json!({
1195            "name": "dup-ids",
1196            "coordinator": "coord",
1197            "members": [
1198                { "id": "coord" },
1199                { "id": "coord", "authorizedBy": "coord" }
1200            ]
1201        })
1202        .to_string();
1203        let resp = app
1204            .oneshot(auth_req("POST", "/v1/formations", Some(&body)))
1205            .await
1206            .expect("router response");
1207        assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
1208        let bytes = resp.into_body().collect().await.unwrap().to_bytes();
1209        let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
1210        assert_eq!(
1211            parsed["type"], "/problems/formation/duplicate-member-id",
1212            "duplicate member ids must surface the dedicated discriminant"
1213        );
1214        // Detail must name the offending id so operators can find the
1215        // typo without re-reading their whole manifest.
1216        let detail = parsed["detail"].as_str().unwrap_or("");
1217        assert!(
1218            detail.contains("coord"),
1219            "detail must name the duplicate id, got: {detail}",
1220        );
1221    }
1222
1223    /// ADR-0010 §Enforcement `cycle` discriminant. `authorizedBy: self`
1224    /// is the minimal cycle.
1225    #[tokio::test]
1226    async fn rejects_self_authorized_cycle() {
1227        let app = router(test_state());
1228        let body = serde_json::json!({
1229            "name": "self-cycle",
1230            "coordinator": "coord",
1231            "members": [
1232                { "id": "coord" },
1233                { "id": "worker-a", "authorizedBy": "worker-a" }
1234            ]
1235        })
1236        .to_string();
1237        let resp = app
1238            .oneshot(auth_req("POST", "/v1/formations", Some(&body)))
1239            .await
1240            .expect("router response");
1241        assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
1242        let bytes = resp.into_body().collect().await.unwrap().to_bytes();
1243        let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
1244        assert_eq!(parsed["type"], "/problems/formation/cycle");
1245    }
1246
1247    /// Two-node cycle a→b→a; neither chains back to coordinator.
1248    #[tokio::test]
1249    async fn rejects_two_node_cycle() {
1250        let app = router(test_state());
1251        let body = serde_json::json!({
1252            "name": "two-cycle",
1253            "coordinator": "coord",
1254            "members": [
1255                { "id": "coord" },
1256                { "id": "a", "authorizedBy": "b" },
1257                { "id": "b", "authorizedBy": "a" }
1258            ]
1259        })
1260        .to_string();
1261        let resp = app
1262            .oneshot(auth_req("POST", "/v1/formations", Some(&body)))
1263            .await
1264            .expect("router response");
1265        assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
1266        let bytes = resp.into_body().collect().await.unwrap().to_bytes();
1267        let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
1268        assert_eq!(parsed["type"], "/problems/formation/cycle");
1269    }
1270
1271    /// Orphan parent: `authorizedBy: ghost` where `ghost` is not in
1272    /// `members`. Without a parent edge the member has no narrowing
1273    /// path → `authorityNotNarrowing`.
1274    #[tokio::test]
1275    async fn rejects_orphan_parent_reference() {
1276        let app = router(test_state());
1277        let body = serde_json::json!({
1278            "name": "orphan-parent",
1279            "coordinator": "coord",
1280            "members": [
1281                { "id": "coord" },
1282                { "id": "worker-a", "authorizedBy": "ghost" }
1283            ]
1284        })
1285        .to_string();
1286        let resp = app
1287            .oneshot(auth_req("POST", "/v1/formations", Some(&body)))
1288            .await
1289            .expect("router response");
1290        assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
1291        let bytes = resp.into_body().collect().await.unwrap().to_bytes();
1292        let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
1293        assert_eq!(
1294            parsed["type"],
1295            "/problems/formation/authority-not-narrowing"
1296        );
1297    }
1298
1299    /// Red-team finding: POST /v1/formations was using axum's default
1300    /// 2 MiB body limit. We cap at 64 KiB so a >64 KiB payload returns
1301    /// 413 Payload Too Large rather than burning CPU on serde parsing.
1302    #[tokio::test]
1303    async fn post_formation_oversized_body_returns_413() {
1304        let app = router(test_state());
1305        // Build a JSON document larger than 64 KiB by stuffing a long
1306        // `name` field. The body-limit layer rejects before serde
1307        // parses, so the document does not need to be semantically
1308        // valid past the limit.
1309        let big = "x".repeat(70 * 1024);
1310        let body =
1311            format!(r#"{{"name":"{big}","coordinator":"coord","members":[{{"id":"coord"}}]}}"#,);
1312        let resp = app
1313            .oneshot(auth_req("POST", "/v1/formations", Some(&body)))
1314            .await
1315            .expect("router response");
1316        assert_eq!(
1317            resp.status(),
1318            StatusCode::PAYLOAD_TOO_LARGE,
1319            "oversized body must surface 413; got {:?}",
1320            resp.status(),
1321        );
1322    }
1323
1324    /// Sanity-probe: the parameterized route `/v1/formations/{id}`
1325    /// actually captures the path segment. Existing tests only hit the
1326    /// non-parameterized `/v1/formations`; if the route registration
1327    /// ever regresses to literal-brace matching (e.g. on a future axum
1328    /// upgrade that changes path syntax), this test fails loudly.
1329    ///
1330    /// We POST a real formation then GET it by id and check the
1331    /// returned record matches. A 404 with empty/non-problem+json
1332    /// content-type indicates router-level miss (literal route); a
1333    /// 404 with problem+json indicates handler-level not-found
1334    /// (route matched, formation absent). We expect 200.
1335    #[tokio::test]
1336    async fn get_formation_by_id_captures_path() {
1337        let state = test_state();
1338        let body = serde_json::json!({
1339            "name": "probe",
1340            "coordinator": "coord",
1341            "members": [
1342                { "id": "coord" },
1343                { "id": "worker-a", "authorizedBy": "coord" }
1344            ]
1345        })
1346        .to_string();
1347        let resp = router(state.clone())
1348            .oneshot(auth_req("POST", "/v1/formations", Some(&body)))
1349            .await
1350            .expect("router response");
1351        assert_eq!(resp.status(), StatusCode::CREATED);
1352        let bytes = resp.into_body().collect().await.unwrap().to_bytes();
1353        let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
1354        let id = parsed["id"].as_str().expect("uuid string");
1355
1356        let resp = router(state)
1357            .oneshot(auth_req("GET", &format!("/v1/formations/{id}"), None))
1358            .await
1359            .expect("router response");
1360        assert_eq!(
1361            resp.status(),
1362            StatusCode::OK,
1363            "GET /v1/formations/<id> must capture the path segment; got {:?}",
1364            resp.status(),
1365        );
1366        let bytes = resp.into_body().collect().await.unwrap().to_bytes();
1367        let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
1368        assert_eq!(parsed["name"], "probe");
1369    }
1370
1371    /// CTL-003 / SCHEMA-001 — kubectl-style happy path.
1372    /// `contracts/schemas/formation-v1.schema.json` documents the
1373    /// kubectl envelope; the server now accepts it via an admission
1374    /// adapter and normalizes to flat internally.
1375    #[tokio::test]
1376    async fn post_kubectl_style_formation_returns_201() {
1377        let app = router(test_state());
1378        let body = serde_json::json!({
1379            "apiVersion": "cellos.dev/v1",
1380            "kind": "Formation",
1381            "metadata": { "name": "kubectl-demo" },
1382            "spec": {
1383                "coordinator": "coord",
1384                "members": [
1385                    { "name": "coord" },
1386                    { "name": "worker-a", "authorizedBy": "coord" },
1387                    { "name": "worker-b", "authorizedBy": "coord" }
1388                ]
1389            }
1390        })
1391        .to_string();
1392
1393        let resp = app
1394            .oneshot(auth_req("POST", "/v1/formations", Some(&body)))
1395            .await
1396            .expect("router response");
1397        assert_eq!(resp.status(), StatusCode::CREATED);
1398
1399        let bytes = resp.into_body().collect().await.unwrap().to_bytes();
1400        let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
1401        assert_eq!(parsed["status"], "PENDING");
1402        assert_eq!(parsed["name"], "kubectl-demo");
1403        assert!(parsed["id"].as_str().is_some());
1404    }
1405
1406    /// Kubectl-style → ADR-0010 admission still fires on the
1407    /// normalized flat form. Missing-coordinator on a kubectl-shaped
1408    /// payload must surface the same `/problems/formation/no-coordinator`
1409    /// discriminant the flat path produces.
1410    #[tokio::test]
1411    async fn post_kubectl_style_missing_coordinator_returns_no_coordinator() {
1412        let app = router(test_state());
1413        let body = serde_json::json!({
1414            "apiVersion": "cellos.dev/v1",
1415            "kind": "Formation",
1416            "metadata": { "name": "missing-coord" },
1417            "spec": {
1418                "coordinator": "coord",
1419                "members": [
1420                    { "name": "worker-a", "authorizedBy": "coord" }
1421                ]
1422            }
1423        })
1424        .to_string();
1425
1426        let resp = app
1427            .oneshot(auth_req("POST", "/v1/formations", Some(&body)))
1428            .await
1429            .expect("router response");
1430        assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
1431        let bytes = resp.into_body().collect().await.unwrap().to_bytes();
1432        let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
1433        assert_eq!(parsed["type"], "/problems/formation/no-coordinator");
1434    }
1435
1436    /// Hybrid: top-level `name` + top-level `apiVersion`. Operator
1437    /// ambiguity — reject with 400 `/problems/bad-request` listing the
1438    /// conflicting fields.
1439    #[tokio::test]
1440    async fn post_hybrid_formation_returns_400_bad_request() {
1441        let app = router(test_state());
1442        let body = serde_json::json!({
1443            "apiVersion": "cellos.dev/v1",
1444            "kind": "Formation",
1445            "metadata": { "name": "hybrid" },
1446            "spec": {
1447                "coordinator": "coord",
1448                "members": [ { "name": "coord" } ]
1449            },
1450            // Stray flat-style field — operator confused two shapes.
1451            "name": "hybrid",
1452            "members": [ { "id": "coord" } ]
1453        })
1454        .to_string();
1455
1456        let resp = app
1457            .oneshot(auth_req("POST", "/v1/formations", Some(&body)))
1458            .await
1459            .expect("router response");
1460        assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
1461        let bytes = resp.into_body().collect().await.unwrap().to_bytes();
1462        let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
1463        assert_eq!(
1464            parsed["type"], "/problems/bad-request",
1465            "hybrid shape must surface a generic bad-request, not an admission discriminant"
1466        );
1467        let detail = parsed["detail"].as_str().unwrap_or_default();
1468        assert!(
1469            detail.contains("hybrid"),
1470            "detail must mention 'hybrid'; got {detail:?}"
1471        );
1472    }
1473
1474    /// Kubectl-style with wrong `apiVersion` is rejected as bad-request
1475    /// before admission runs.
1476    #[tokio::test]
1477    async fn post_kubectl_style_wrong_api_version_returns_400() {
1478        let app = router(test_state());
1479        let body = serde_json::json!({
1480            "apiVersion": "cellos.dev/v2",
1481            "kind": "Formation",
1482            "metadata": { "name": "wrong-api" },
1483            "spec": {
1484                "coordinator": "coord",
1485                "members": [ { "name": "coord" } ]
1486            }
1487        })
1488        .to_string();
1489
1490        let resp = app
1491            .oneshot(auth_req("POST", "/v1/formations", Some(&body)))
1492            .await
1493            .expect("router response");
1494        assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
1495        let bytes = resp.into_body().collect().await.unwrap().to_bytes();
1496        let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
1497        assert_eq!(parsed["type"], "/problems/bad-request");
1498        let detail = parsed["detail"].as_str().unwrap_or_default();
1499        assert!(
1500            detail.contains("apiVersion") && detail.contains("cellos.dev/v2"),
1501            "detail must name the bad apiVersion; got {detail:?}"
1502        );
1503    }
1504
1505    /// Kubectl-style with wrong `kind` is rejected as bad-request.
1506    #[tokio::test]
1507    async fn post_kubectl_style_wrong_kind_returns_400() {
1508        let app = router(test_state());
1509        let body = serde_json::json!({
1510            "apiVersion": "cellos.dev/v1",
1511            "kind": "Cell",
1512            "metadata": { "name": "wrong-kind" },
1513            "spec": {
1514                "coordinator": "coord",
1515                "members": [ { "name": "coord" } ]
1516            }
1517        })
1518        .to_string();
1519
1520        let resp = app
1521            .oneshot(auth_req("POST", "/v1/formations", Some(&body)))
1522            .await
1523            .expect("router response");
1524        assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
1525        let bytes = resp.into_body().collect().await.unwrap().to_bytes();
1526        let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
1527        assert_eq!(parsed["type"], "/problems/bad-request");
1528        let detail = parsed["detail"].as_str().unwrap_or_default();
1529        assert!(
1530            detail.contains("kind") && detail.contains("Cell"),
1531            "detail must name the bad kind; got {detail:?}"
1532        );
1533    }
1534
1535    /// After a kubectl-style POST, the GET round-trip MUST echo the
1536    /// normalized (flat) shape so downstream consumers see one stable
1537    /// document layout.
1538    #[tokio::test]
1539    async fn kubectl_style_post_then_get_returns_normalized_flat_document() {
1540        let state = test_state();
1541        let body = serde_json::json!({
1542            "apiVersion": "cellos.dev/v1",
1543            "kind": "Formation",
1544            "metadata": { "name": "roundtrip" },
1545            "spec": {
1546                "coordinator": "coord",
1547                "members": [
1548                    { "name": "coord" },
1549                    { "name": "worker-a", "authorizedBy": "coord" }
1550                ]
1551            }
1552        })
1553        .to_string();
1554
1555        let resp = router(state.clone())
1556            .oneshot(auth_req("POST", "/v1/formations", Some(&body)))
1557            .await
1558            .expect("router response");
1559        assert_eq!(resp.status(), StatusCode::CREATED);
1560        let bytes = resp.into_body().collect().await.unwrap().to_bytes();
1561        let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
1562        let id = parsed["id"].as_str().expect("uuid string");
1563
1564        let resp = router(state)
1565            .oneshot(auth_req("GET", &format!("/v1/formations/{id}"), None))
1566            .await
1567            .expect("router response");
1568        assert_eq!(resp.status(), StatusCode::OK);
1569        let bytes = resp.into_body().collect().await.unwrap().to_bytes();
1570        let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
1571        let doc = &parsed["document"];
1572        assert_eq!(doc["name"], "roundtrip", "flat 'name' present");
1573        assert_eq!(doc["coordinator"], "coord", "flat 'coordinator' present");
1574        let members = doc["members"]
1575            .as_array()
1576            .expect("members array on normalized doc");
1577        assert_eq!(members.len(), 2);
1578        assert_eq!(members[0]["id"], "coord");
1579        assert_eq!(members[1]["id"], "worker-a");
1580        assert_eq!(members[1]["authorizedBy"], "coord");
1581        // Envelope fields stripped on normalization.
1582        assert!(
1583            doc.get("apiVersion").is_none(),
1584            "kubectl envelope must not leak into normalized doc"
1585        );
1586        assert!(doc.get("kind").is_none());
1587        assert!(doc.get("metadata").is_none());
1588        assert!(doc.get("spec").is_none());
1589    }
1590
1591    /// RT3-HIGH-3 (CTL-003-A): a kubectl-style member that declares
1592    /// BOTH `name` and `id` is a manifest mistake — kubectl manifests
1593    /// address members by `name` only, and the previous normalization
1594    /// loop silently let the operator-supplied `id` win over the
1595    /// canonical name (`Map::insert` is last-write-wins). Admission is
1596    /// strict: reject with `/problems/bad-request` and name the conflict.
1597    #[tokio::test]
1598    async fn kubectl_member_with_explicit_id_returns_400() {
1599        let app = router(test_state());
1600        let body = serde_json::json!({
1601            "apiVersion": "cellos.dev/v1",
1602            "kind": "Formation",
1603            "metadata": { "name": "rt3-ctl-003-a" },
1604            "spec": {
1605                "coordinator": "alice",
1606                "members": [
1607                    // The mistake: declaring BOTH name AND id. The old
1608                    // code would set id := "name", then overwrite with
1609                    // id := "bob" from the spec field. New behaviour:
1610                    // reject the manifest outright.
1611                    { "name": "alice", "id": "bob" },
1612                    { "name": "worker-a", "authorizedBy": "alice" }
1613                ]
1614            }
1615        })
1616        .to_string();
1617
1618        let resp = app
1619            .oneshot(auth_req("POST", "/v1/formations", Some(&body)))
1620            .await
1621            .expect("router response");
1622        assert_eq!(
1623            resp.status(),
1624            StatusCode::BAD_REQUEST,
1625            "manifest with both name+id at member level must be rejected"
1626        );
1627        let ct = resp
1628            .headers()
1629            .get(header::CONTENT_TYPE)
1630            .and_then(|v| v.to_str().ok())
1631            .unwrap_or_default()
1632            .to_owned();
1633        assert!(
1634            ct.starts_with("application/problem+json"),
1635            "expected RFC 9457 media type, got {ct:?}"
1636        );
1637        let bytes = resp.into_body().collect().await.unwrap().to_bytes();
1638        let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
1639        assert_eq!(
1640            parsed["type"], "/problems/bad-request",
1641            "kubectl id-conflict is a generic bad-request, not an ADR-0010 discriminant"
1642        );
1643        let detail = parsed["detail"].as_str().unwrap_or_default();
1644        assert!(
1645            detail.contains("'name'") && detail.contains("'id'"),
1646            "detail must name both conflicting fields; got {detail:?}"
1647        );
1648    }
1649
1650    /// RT3-HIGH-3 (CTL-002-A): when two formations share a name,
1651    /// `DELETE /v1/formations/by-name/{name}` MUST refuse with 409
1652    /// Conflict instead of silently deleting the BTreeMap-first match.
1653    /// Silent wrong-deletion is the operator-trust failure mode the
1654    /// red-team flagged; admission-time uniqueness is being added in a
1655    /// sibling stream, but defense in depth keeps this guard in place.
1656    #[tokio::test]
1657    async fn delete_by_name_with_duplicates_returns_409() {
1658        let state = test_state();
1659
1660        // Inject two FormationRecords with the same name directly into
1661        // the projection. This bypasses admission (which is the whole
1662        // point of the test: admission may or may not catch this, but
1663        // DELETE must NOT silently pick one).
1664        let id_a = Uuid::new_v4();
1665        let id_b = Uuid::new_v4();
1666        {
1667            let mut map = state.formations.write().await;
1668            for id in [id_a, id_b] {
1669                map.insert(
1670                    id,
1671                    FormationRecord {
1672                        id,
1673                        name: "rt3-dup".to_string(),
1674                        status: FormationStatus::Pending,
1675                        document: serde_json::json!({"name": "rt3-dup"}),
1676                    },
1677                );
1678            }
1679        }
1680
1681        let resp = router(state.clone())
1682            .oneshot(auth_req("DELETE", "/v1/formations/by-name/rt3-dup", None))
1683            .await
1684            .expect("router response");
1685        assert_eq!(
1686            resp.status(),
1687            StatusCode::CONFLICT,
1688            "duplicate-name DELETE must surface 409, not silently delete"
1689        );
1690
1691        let ct = resp
1692            .headers()
1693            .get(header::CONTENT_TYPE)
1694            .and_then(|v| v.to_str().ok())
1695            .unwrap_or_default()
1696            .to_owned();
1697        assert!(
1698            ct.starts_with("application/problem+json"),
1699            "expected RFC 9457 media type, got {ct:?}"
1700        );
1701        let bytes = resp.into_body().collect().await.unwrap().to_bytes();
1702        let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
1703        assert_eq!(parsed["type"], "/problems/conflict");
1704        let detail = parsed["detail"].as_str().unwrap_or_default();
1705        assert!(
1706            detail.contains(&id_a.to_string()) && detail.contains(&id_b.to_string()),
1707            "detail must list BOTH conflicting UUIDs so the operator can disambiguate; \
1708             got {detail:?}"
1709        );
1710        assert!(
1711            detail.contains("rt3-dup"),
1712            "detail must name the conflicting formation name; got {detail:?}"
1713        );
1714
1715        // Defense in depth: neither formation should have been mutated.
1716        let map = state.formations.read().await;
1717        assert!(map.contains_key(&id_a), "id_a must still exist after 409");
1718        assert!(map.contains_key(&id_b), "id_b must still exist after 409");
1719        assert_eq!(map.get(&id_a).unwrap().status, FormationStatus::Pending);
1720        assert_eq!(map.get(&id_b).unwrap().status, FormationStatus::Pending);
1721    }
1722
1723    /// Red-team finding: `update_formation_status` previously returned
1724    /// the ADR-0010 `no-coordinator` discriminant for unknown state
1725    /// strings, hijacking a load-bearing admission-gate identifier.
1726    /// Unknown state is a generic bad-request.
1727    #[tokio::test]
1728    async fn unknown_state_returns_bad_request_problem_type() {
1729        let state = test_state();
1730        let body = serde_json::json!({
1731            "name": "demo",
1732            "coordinator": "coord",
1733            "members": [
1734                { "id": "coord" },
1735                { "id": "worker-a", "authorizedBy": "coord" }
1736            ]
1737        })
1738        .to_string();
1739        let resp = router(state.clone())
1740            .oneshot(auth_req("POST", "/v1/formations", Some(&body)))
1741            .await
1742            .expect("router response");
1743        assert_eq!(resp.status(), StatusCode::CREATED);
1744        let bytes = resp.into_body().collect().await.unwrap().to_bytes();
1745        let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
1746        let id = parsed["id"].as_str().expect("uuid string").to_owned();
1747
1748        let bad = serde_json::json!({ "state": "TELEPORTING" }).to_string();
1749        let resp = router(state)
1750            .oneshot(auth_req(
1751                "POST",
1752                &format!("/v1/formations/{id}/status"),
1753                Some(&bad),
1754            ))
1755            .await
1756            .expect("router response");
1757        assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
1758        let bytes = resp.into_body().collect().await.unwrap().to_bytes();
1759        let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
1760        assert_eq!(
1761            parsed["type"], "/problems/bad-request",
1762            "unknown state must surface generic bad-request, not an ADR-0010 discriminant"
1763        );
1764    }
1765
1766    // ----------------------------------------------------------------
1767    // FUZZ-HIGH-1 — name validation
1768    //
1769    // Fuzz wave 1 admitted hostile names (`""`, `"   "`, `"a\nb"`).
1770    // Every negative case below MUST return 400 problem+json with the
1771    // generic `/problems/bad-request` discriminant; every positive case
1772    // must reach the ADR-0010 admission gate and succeed with 201.
1773    // ----------------------------------------------------------------
1774
1775    /// Build a minimal valid POST body for a given name. Used by both
1776    /// name-validation and uniqueness tests below. Keeps the test cases
1777    /// focused on the field actually under test.
1778    fn minimal_body(name: &str) -> String {
1779        serde_json::json!({
1780            "name": name,
1781            "coordinator": "coord",
1782            "members": [
1783                { "id": "coord" },
1784                { "id": "worker-a", "authorizedBy": "coord" }
1785            ]
1786        })
1787        .to_string()
1788    }
1789
1790    /// Assert that POSTing `name` yields 400 `/problems/bad-request`.
1791    /// The `expect_in_detail` substring lets each case prove its OWN
1792    /// rule fired, not a different one — empty-name and 254-byte-name
1793    /// both produce bad-request, but for different reasons.
1794    async fn assert_name_rejected_bad_request(name: &str, expect_in_detail: &str) {
1795        let app = router(test_state());
1796        let body = minimal_body(name);
1797        let resp = app
1798            .oneshot(auth_req("POST", "/v1/formations", Some(&body)))
1799            .await
1800            .expect("router response");
1801        assert_eq!(
1802            resp.status(),
1803            StatusCode::BAD_REQUEST,
1804            "name {name:?} must be rejected with 400; got {:?}",
1805            resp.status()
1806        );
1807        let ct = resp
1808            .headers()
1809            .get(header::CONTENT_TYPE)
1810            .and_then(|v| v.to_str().ok())
1811            .unwrap_or_default()
1812            .to_owned();
1813        assert!(
1814            ct.starts_with("application/problem+json"),
1815            "name {name:?} must surface RFC 9457 media type; got {ct:?}"
1816        );
1817        let bytes = resp.into_body().collect().await.unwrap().to_bytes();
1818        let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
1819        assert_eq!(
1820            parsed["type"], "/problems/bad-request",
1821            "name {name:?} must surface generic bad-request, not an admission discriminant; got {parsed}"
1822        );
1823        let detail = parsed["detail"].as_str().unwrap_or_default();
1824        assert!(
1825            detail.contains(expect_in_detail),
1826            "detail for {name:?} must contain {expect_in_detail:?}; got {detail:?}"
1827        );
1828    }
1829
1830    /// FUZZ-HIGH-1 / F14 — admission accepted `""`. Reject as bad-request.
1831    #[tokio::test]
1832    async fn rejects_empty_name() {
1833        assert_name_rejected_bad_request("", "empty").await;
1834    }
1835
1836    /// FUZZ-HIGH-1 — whitespace-only name. Spaces are not in the
1837    /// allow-set, so the character-class check fires.
1838    #[tokio::test]
1839    async fn rejects_whitespace_only_name() {
1840        assert_name_rejected_bad_request("   ", "disallowed character").await;
1841    }
1842
1843    /// FUZZ-HIGH-1 / F44 — newline in name. The fuzz report flagged this
1844    /// because it corrupts log lines and breaks URL routing.
1845    #[tokio::test]
1846    async fn rejects_newline_in_name() {
1847        assert_name_rejected_bad_request("a\nb", "disallowed character").await;
1848    }
1849
1850    /// FUZZ-HIGH-1 — tab character. Same class as newline: control byte
1851    /// outside the allow-set.
1852    #[tokio::test]
1853    async fn rejects_tab_in_name() {
1854        assert_name_rejected_bad_request("a\tb", "disallowed character").await;
1855    }
1856
1857    /// FUZZ-HIGH-1 — embedded NUL byte. Breaks C-string boundaries in
1858    /// any downstream consumer that ever hands the name to a syscall.
1859    #[tokio::test]
1860    async fn rejects_nul_byte_in_name() {
1861        assert_name_rejected_bad_request("a\0b", "disallowed character").await;
1862    }
1863
1864    /// FUZZ-HIGH-1 — non-ASCII (UTF-8 multi-byte). The allow-set is
1865    /// pure ASCII; emoji and accented characters fail the byte-class
1866    /// check.
1867    #[tokio::test]
1868    async fn rejects_non_ascii_in_name() {
1869        assert_name_rejected_bad_request("café", "disallowed character").await;
1870    }
1871
1872    /// FUZZ-HIGH-1 — length cap. A 254-byte name (one over the DNS
1873    /// label limit) is rejected.
1874    #[tokio::test]
1875    async fn rejects_overlong_name() {
1876        let long = "a".repeat(254);
1877        assert_name_rejected_bad_request(&long, "exceeds maximum").await;
1878    }
1879
1880    /// FUZZ-HIGH-1 — leading `-`. Mirrors DNS label rules; many
1881    /// downstream tools special-case leading hyphens as flags.
1882    #[tokio::test]
1883    async fn rejects_leading_hyphen() {
1884        assert_name_rejected_bad_request("-leading", "start with").await;
1885    }
1886
1887    /// FUZZ-HIGH-1 — trailing `.`. Trailing dots collide with relative
1888    /// filesystem paths and DNS-style fully-qualified-name conventions.
1889    #[tokio::test]
1890    async fn rejects_trailing_dot() {
1891        assert_name_rejected_bad_request("trailing.", "end with").await;
1892    }
1893
1894    /// FUZZ-HIGH-1 — reserved name `.`. Would alias the current-directory
1895    /// path segment in cellctl rendering. The reserved-name rule runs
1896    /// before the edge rule, so the operator sees the precise reason.
1897    #[tokio::test]
1898    async fn rejects_single_dot_reserved_name() {
1899        assert_name_rejected_bad_request(".", "reserved").await;
1900    }
1901
1902    /// FUZZ-HIGH-1 — reserved name `..`. Same class as `.`; also begins
1903    /// with `.` so the edge rule fires first. We assert bad-request
1904    /// without pinning which rule trips — both are correct rejections.
1905    #[tokio::test]
1906    async fn rejects_double_dot_reserved_name() {
1907        let app = router(test_state());
1908        let body = minimal_body("..");
1909        let resp = app
1910            .oneshot(auth_req("POST", "/v1/formations", Some(&body)))
1911            .await
1912            .expect("router response");
1913        assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
1914        let bytes = resp.into_body().collect().await.unwrap().to_bytes();
1915        let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
1916        assert_eq!(parsed["type"], "/problems/bad-request");
1917    }
1918
1919    /// FUZZ-HIGH-1 positive — simple lowercase name.
1920    #[tokio::test]
1921    async fn accepts_simple_lowercase_name() {
1922        let app = router(test_state());
1923        let body = minimal_body("demo");
1924        let resp = app
1925            .oneshot(auth_req("POST", "/v1/formations", Some(&body)))
1926            .await
1927            .expect("router response");
1928        assert_eq!(resp.status(), StatusCode::CREATED);
1929    }
1930
1931    /// FUZZ-HIGH-1 positive — hyphenated name (the dominant convention
1932    /// in the existing test corpus: `demo`, `with-cursor`, `valid-dag`).
1933    #[tokio::test]
1934    async fn accepts_hyphenated_name() {
1935        let app = router(test_state());
1936        let body = minimal_body("my-formation-v2");
1937        let resp = app
1938            .oneshot(auth_req("POST", "/v1/formations", Some(&body)))
1939            .await
1940            .expect("router response");
1941        assert_eq!(resp.status(), StatusCode::CREATED);
1942    }
1943
1944    /// FUZZ-HIGH-1 positive — dotted name (mid-name dots and underscores
1945    /// are both allowed; only edge dots/hyphens are rejected).
1946    #[tokio::test]
1947    async fn accepts_dotted_and_underscored_name() {
1948        let app = router(test_state());
1949        let body = minimal_body("team.alpha_one");
1950        let resp = app
1951            .oneshot(auth_req("POST", "/v1/formations", Some(&body)))
1952            .await
1953            .expect("router response");
1954        assert_eq!(resp.status(), StatusCode::CREATED);
1955    }
1956
1957    // ----------------------------------------------------------------
1958    // FUZZ-HIGH-2 — name uniqueness
1959    //
1960    // Two formations sharing a name break `GET /v1/formations/by-name/{name}`
1961    // (it returns the first match and hides the rest). Admission must
1962    // enforce uniqueness so by-name lookup is total.
1963    // ----------------------------------------------------------------
1964
1965    /// FUZZ-HIGH-2 / F-dup-name — POST `name=demo` twice. The first
1966    /// request succeeds with 201; the second MUST return 409 with the
1967    /// `/problems/conflict` discriminant. The first formation must
1968    /// remain queryable by-name (proves we didn't accidentally evict it).
1969    #[tokio::test]
1970    async fn duplicate_name_returns_409() {
1971        let state = test_state();
1972        let body = minimal_body("demo");
1973
1974        // First POST — succeeds.
1975        let resp = router(state.clone())
1976            .oneshot(auth_req("POST", "/v1/formations", Some(&body)))
1977            .await
1978            .expect("router response");
1979        assert_eq!(
1980            resp.status(),
1981            StatusCode::CREATED,
1982            "first POST must succeed"
1983        );
1984        let bytes = resp.into_body().collect().await.unwrap().to_bytes();
1985        let first: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
1986        let first_id = first["id"]
1987            .as_str()
1988            .expect("first POST returned uuid")
1989            .to_owned();
1990
1991        // Second POST with same name — MUST conflict.
1992        let resp = router(state.clone())
1993            .oneshot(auth_req("POST", "/v1/formations", Some(&body)))
1994            .await
1995            .expect("router response");
1996        assert_eq!(
1997            resp.status(),
1998            StatusCode::CONFLICT,
1999            "duplicate name must surface 409; got {:?}",
2000            resp.status()
2001        );
2002        let ct = resp
2003            .headers()
2004            .get(header::CONTENT_TYPE)
2005            .and_then(|v| v.to_str().ok())
2006            .unwrap_or_default()
2007            .to_owned();
2008        assert!(
2009            ct.starts_with("application/problem+json"),
2010            "409 must use RFC 9457 media type; got {ct:?}"
2011        );
2012        let bytes = resp.into_body().collect().await.unwrap().to_bytes();
2013        let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
2014        assert_eq!(parsed["type"], "/problems/conflict");
2015        let detail = parsed["detail"].as_str().unwrap_or_default();
2016        assert!(
2017            detail.contains(&first_id),
2018            "conflict detail must name the existing UUID {first_id}; got {detail:?}"
2019        );
2020        assert!(
2021            detail.contains("demo"),
2022            "conflict detail must name the conflicting name; got {detail:?}"
2023        );
2024
2025        // First formation MUST still be queryable by-name (proves the
2026        // second request didn't clobber or shadow it).
2027        let resp = router(state)
2028            .oneshot(auth_req("GET", "/v1/formations/by-name/demo", None))
2029            .await
2030            .expect("router response");
2031        assert_eq!(resp.status(), StatusCode::OK);
2032        let bytes = resp.into_body().collect().await.unwrap().to_bytes();
2033        let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
2034        assert_eq!(
2035            parsed["id"].as_str(),
2036            Some(first_id.as_str()),
2037            "first formation must remain addressable by name"
2038        );
2039    }
2040
2041    /// CTL-003-B: a kubectl manifest with `metadata.labels` and
2042    /// `metadata.annotations` MUST round-trip through POST → GET. The
2043    /// kubectl ecosystem leans on these for selectors and tooling
2044    /// annotations; silently dropping them on admission would break
2045    /// the principle of least surprise for kubectl-style operators.
2046    #[tokio::test]
2047    async fn kubectl_metadata_labels_and_annotations_round_trip() {
2048        let state = test_state();
2049        let body = serde_json::json!({
2050            "apiVersion": "cellos.dev/v1",
2051            "kind": "Formation",
2052            "metadata": {
2053                "name": "labelled",
2054                "labels": {
2055                    "app": "celltest",
2056                    "tier": "frontend"
2057                },
2058                "annotations": {
2059                    "operator": "ryan@cellos.dev",
2060                    "audit/ticket": "OPS-1234"
2061                }
2062            },
2063            "spec": {
2064                "coordinator": "coord",
2065                "members": [
2066                    { "name": "coord" },
2067                    { "name": "worker-a", "authorizedBy": "coord" }
2068                ]
2069            }
2070        })
2071        .to_string();
2072        let resp = router(state.clone())
2073            .oneshot(auth_req("POST", "/v1/formations", Some(&body)))
2074            .await
2075            .expect("router response");
2076        assert_eq!(resp.status(), StatusCode::CREATED);
2077
2078        let resp = router(state)
2079            .oneshot(auth_req("GET", "/v1/formations/by-name/labelled", None))
2080            .await
2081            .expect("router response");
2082        assert_eq!(resp.status(), StatusCode::OK);
2083        let bytes = resp.into_body().collect().await.unwrap().to_bytes();
2084        let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
2085
2086        let labels = &parsed["document"]["metadata"]["labels"];
2087        assert_eq!(labels["app"], "celltest", "label 'app' lost in round-trip");
2088        assert_eq!(
2089            labels["tier"], "frontend",
2090            "label 'tier' lost in round-trip"
2091        );
2092
2093        let annotations = &parsed["document"]["metadata"]["annotations"];
2094        assert_eq!(
2095            annotations["operator"], "ryan@cellos.dev",
2096            "annotation 'operator' lost in round-trip"
2097        );
2098        assert_eq!(
2099            annotations["audit/ticket"], "OPS-1234",
2100            "annotation 'audit/ticket' lost in round-trip"
2101        );
2102    }
2103
2104    /// CTL-003-B: a kubectl manifest with a `status` block (or any
2105    /// other unknown top-level field) MUST be rejected with 400 + a
2106    /// detail naming the field, so an operator who copy-pasted a GET
2107    /// response back into a POST body sees a precise error rather than
2108    /// silently shipping a stripped manifest.
2109    #[tokio::test]
2110    async fn kubectl_unknown_top_level_field_returns_400() {
2111        let app = router(test_state());
2112        let body = serde_json::json!({
2113            "apiVersion": "cellos.dev/v1",
2114            "kind": "Formation",
2115            "metadata": { "name": "with-status" },
2116            "spec": {
2117                "coordinator": "coord",
2118                "members": [{ "name": "coord" }]
2119            },
2120            "status": {
2121                "phase": "Running",
2122                "observedGeneration": 42
2123            }
2124        })
2125        .to_string();
2126        let resp = app
2127            .oneshot(auth_req("POST", "/v1/formations", Some(&body)))
2128            .await
2129            .expect("router response");
2130        assert_eq!(
2131            resp.status(),
2132            StatusCode::BAD_REQUEST,
2133            "unknown top-level 'status' must surface 400; got {:?}",
2134            resp.status(),
2135        );
2136        let bytes = resp.into_body().collect().await.unwrap().to_bytes();
2137        let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
2138        assert_eq!(parsed["type"], "/problems/bad-request");
2139        let detail = parsed["detail"].as_str().unwrap_or_default();
2140        assert!(
2141            detail.contains("status"),
2142            "detail must name the offending field 'status'; got: {detail}",
2143        );
2144        assert!(
2145            detail.contains("unknown top-level"),
2146            "detail must say 'unknown top-level' so the operator knows the failure class; got: {detail}",
2147        );
2148    }
2149
2150    /// CTL-003-B: `metadata.labels` must be an object — non-object
2151    /// labels (e.g. a string or array) are rejected with a precise
2152    /// detail so the operator sees the shape mismatch immediately
2153    /// rather than discovering it via a downstream selector failure.
2154    #[tokio::test]
2155    async fn kubectl_non_object_labels_returns_400() {
2156        let app = router(test_state());
2157        let body = serde_json::json!({
2158            "apiVersion": "cellos.dev/v1",
2159            "kind": "Formation",
2160            "metadata": {
2161                "name": "bad-labels",
2162                "labels": "app=celltest"
2163            },
2164            "spec": {
2165                "coordinator": "coord",
2166                "members": [{ "name": "coord" }]
2167            }
2168        })
2169        .to_string();
2170        let resp = app
2171            .oneshot(auth_req("POST", "/v1/formations", Some(&body)))
2172            .await
2173            .expect("router response");
2174        assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
2175        let bytes = resp.into_body().collect().await.unwrap().to_bytes();
2176        let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
2177        let detail = parsed["detail"].as_str().unwrap_or_default();
2178        assert!(
2179            detail.contains("metadata.labels"),
2180            "detail must name 'metadata.labels'; got: {detail}",
2181        );
2182    }
2183
2184    /// EVT-CONTENT-001-C source-position test: `now_rfc3339` MUST be
2185    /// captured BEFORE `state.formations.write().await` is acquired so
2186    /// the CloudEvent `time` field reflects transition arrival, not
2187    /// post-commit time. We assert textually because the alternative —
2188    /// driving two concurrent /status updates and comparing wall-clock
2189    /// against event order — is non-deterministic and ties the unit
2190    /// test to a real broker. A textual ordering check is brittle to
2191    /// formatting changes but precise about the invariant: regressions
2192    /// that move the capture back below the lock fail loudly.
2193    #[test]
2194    fn update_formation_status_captures_time_before_lock() {
2195        let src = include_str!("formations.rs");
2196        // Locate the function body. The function signature is stable
2197        // (its arguments are part of the axum handler contract); we
2198        // anchor on the `pub async fn update_formation_status(` opener
2199        // and walk to the matching closing brace by depth-counting
2200        // outside string literals. For this assertion we only need a
2201        // window that is unambiguously inside the function — a window
2202        // bounded by the next sibling `fn`/`async fn` is sufficient
2203        // and avoids the full brace-matcher complexity.
2204        let fn_start = src
2205            .find("pub async fn update_formation_status(")
2206            .expect("update_formation_status not found");
2207        let after_fn = &src[fn_start..];
2208        // The next `async fn ` or `fn ` outside of this function bounds
2209        // the body. We look for one indented at 0 columns (top-level).
2210        let next_top_level_fn = after_fn[1..]
2211            .find("\nfn ")
2212            .map(|i| i + 1)
2213            .or_else(|| after_fn[1..].find("\nasync fn ").map(|i| i + 1))
2214            .or_else(|| after_fn[1..].find("\npub fn ").map(|i| i + 1))
2215            .or_else(|| after_fn[1..].find("\npub async fn ").map(|i| i + 1))
2216            .unwrap_or(after_fn.len() - 1);
2217        let body = &after_fn[..next_top_level_fn];
2218
2219        let time_capture_pos = body
2220            .find("let now_rfc3339 = chrono::Utc::now().to_rfc3339_opts(")
2221            .expect(
2222                "EVT-CONTENT-001-C regression: `let now_rfc3339 = chrono::Utc::now()` not found \
2223                 in update_formation_status",
2224            );
2225        let lock_pos = body.find("state.formations.write().await").expect(
2226            "update_formation_status no longer acquires a write lock — test must be updated",
2227        );
2228
2229        assert!(
2230            time_capture_pos < lock_pos,
2231            "EVT-CONTENT-001-C regression: `now_rfc3339` was captured at byte offset \
2232             {time_capture_pos} but the write lock was taken at offset {lock_pos}. \
2233             Move the `chrono::Utc::now().to_rfc3339_opts(...)` call ABOVE the \
2234             `state.formations.write().await` line so CloudEvent `time` reflects \
2235             transition arrival rather than post-commit wall-clock.",
2236        );
2237    }
2238}