1use axum::extract::{Path, State};
10use axum::http::{HeaderMap, StatusCode};
11use axum::response::IntoResponse;
12use axum::Json;
13use cellos_core::events::{
14 cloud_event_v1_formation_completed, cloud_event_v1_formation_created,
15 cloud_event_v1_formation_degraded, cloud_event_v1_formation_failed,
16 cloud_event_v1_formation_launching, cloud_event_v1_formation_running,
17};
18use serde::{Deserialize, Serialize};
19use uuid::Uuid;
20
21use crate::auth::require_bearer;
22use crate::error::{AppError, AppErrorKind};
23use crate::state::{AppState, FormationRecord, FormationStatus};
24
/// Flat formation document as deserialized by `create_formation` after
/// `normalize_formation_document` has run.
///
/// Structural rules (coordinator membership, unique member ids, the
/// `authorizedBy` chain) are enforced separately by `validate_formation`.
#[derive(Debug, Deserialize)]
pub struct FormationDocument {
    /// Formation name; stored on the record and used by the by-name handlers.
    pub name: String,
    /// Id of the coordinating member; `validate_formation` requires it to match
    /// the `id` of one entry in `members`.
    pub coordinator: String,
    /// Every member of the formation, coordinator included.
    pub members: Vec<FormationMember>,
}
47
48#[derive(Debug, Deserialize)]
49pub struct FormationMember {
50 pub id: String,
51 #[serde(rename = "authorizedBy")]
54 pub authorized_by: Option<String>,
55}
56
/// Response body returned by `create_formation` alongside `201 Created`.
#[derive(Debug, Serialize)]
pub struct FormationCreated {
    /// Identifier generated for the new formation.
    pub id: Uuid,
    /// Name taken from the submitted document.
    pub name: String,
    /// Status of the new record; `create_formation` always reports `Pending`.
    pub status: FormationStatus,
}
63
64pub async fn create_formation(
67 State(state): State<AppState>,
68 headers: HeaderMap,
69 body: axum::body::Bytes,
70) -> Result<impl IntoResponse, AppError> {
71 require_bearer(&headers, &state.api_token)?;
72
73 let raw: serde_json::Value = serde_json::from_slice(&body)?;
82 let normalized = normalize_formation_document(&raw)?;
83 let doc: FormationDocument = serde_json::from_value(normalized.clone())?;
84
85 validate_formation(&doc)?;
86
87 let id = Uuid::new_v4();
88 let record = FormationRecord {
89 id,
90 name: doc.name.clone(),
91 status: FormationStatus::Pending,
92 document: normalized,
96 };
97
98 state.formations.write().await.insert(id, record);
99
100 let cell_count = doc.members.len() as u32;
107 let no_failed: &[String] = &[];
108 let now_rfc3339 = chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true);
109 let event = cloud_event_v1_formation_created(
110 &id.to_string(),
111 &now_rfc3339,
112 &id.to_string(),
113 &doc.name,
114 cell_count,
115 no_failed,
116 None,
117 );
118 let subject = format!("cellos.events.formations.{id}.created");
119 publish_event(&state, &subject, event).await;
120
121 let body = FormationCreated {
122 id,
123 name: doc.name,
124 status: FormationStatus::Pending,
125 };
126 Ok((StatusCode::CREATED, Json(body)))
127}
128
/// Response body of `list_formations`: all stored records plus a cursor.
#[derive(Debug, Serialize)]
pub struct FormationsSnapshot {
    /// Clones of every record currently held in `AppState::formations`.
    pub formations: Vec<FormationRecord>,
    /// Value of `AppState::cursor()` read while the snapshot was being built.
    pub cursor: u64,
}
139
140pub async fn list_formations(
143 State(state): State<AppState>,
144 headers: HeaderMap,
145) -> Result<Json<FormationsSnapshot>, AppError> {
146 require_bearer(&headers, &state.api_token)?;
147 let map = state.formations.read().await;
148 Ok(Json(FormationsSnapshot {
149 formations: map.values().cloned().collect(),
150 cursor: state.cursor(),
151 }))
152}
153
154pub async fn get_formation(
156 State(state): State<AppState>,
157 headers: HeaderMap,
158 Path(id): Path<Uuid>,
159) -> Result<Json<FormationRecord>, AppError> {
160 require_bearer(&headers, &state.api_token)?;
161 let map = state.formations.read().await;
162 map.get(&id)
163 .cloned()
164 .map(Json)
165 .ok_or_else(|| AppError::not_found(format!("formation {id} not found")))
166}
167
168pub async fn get_formation_by_name(
182 State(state): State<AppState>,
183 headers: HeaderMap,
184 Path(name): Path<String>,
185) -> Result<Json<FormationRecord>, AppError> {
186 require_bearer(&headers, &state.api_token)?;
187 let map = state.formations.read().await;
188 map.values()
189 .find(|r| r.name == name)
190 .cloned()
191 .map(Json)
192 .ok_or_else(|| AppError::not_found(format!("formation '{name}' not found")))
193}
194
195pub async fn delete_formation_by_name(
200 State(state): State<AppState>,
201 headers: HeaderMap,
202 Path(name): Path<String>,
203) -> Result<StatusCode, AppError> {
204 require_bearer(&headers, &state.api_token)?;
205 let id = {
211 let map = state.formations.read().await;
212 map.values()
213 .find(|r| r.name == name)
214 .map(|r| r.id)
215 .ok_or_else(|| AppError::not_found(format!("formation '{name}' not found")))?
216 };
217 delete_formation(State(state), headers, Path(id)).await
218}
219
220pub async fn delete_formation(
225 State(state): State<AppState>,
226 headers: HeaderMap,
227 Path(id): Path<Uuid>,
228) -> Result<StatusCode, AppError> {
229 require_bearer(&headers, &state.api_token)?;
230 let mut map = state.formations.write().await;
231 let (name, cell_count) = {
232 let entry = map
233 .get_mut(&id)
234 .ok_or_else(|| AppError::not_found(format!("formation {id} not found")))?;
235 entry.status = FormationStatus::Cancelled;
236 let members = entry
237 .document
238 .get("members")
239 .and_then(|m| m.as_array())
240 .map(|a| a.len() as u32)
241 .unwrap_or(0);
242 (entry.name.clone(), members)
243 };
244 drop(map);
245
246 let no_failed: &[String] = &[];
247 let now_rfc3339 = chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true);
250 let event = cloud_event_v1_formation_failed(
251 &id.to_string(),
252 &now_rfc3339,
253 &id.to_string(),
254 &name,
255 cell_count,
256 no_failed,
257 Some("deleted by operator"),
258 );
259 let subject = format!("cellos.events.formations.{id}.failed");
260 publish_event(&state, &subject, event).await;
261
262 Ok(StatusCode::NO_CONTENT)
263}
264
/// Request body accepted by `update_formation_status`.
#[derive(Debug, Deserialize)]
pub struct StatusTransition {
    /// Target state; compared after upper-casing against `LAUNCHING`,
    /// `RUNNING`, `DEGRADED`, `COMPLETED` and `FAILED`.
    pub state: String,
    /// Optional free-text reason, forwarded into the published event.
    pub reason: Option<String>,
    /// Optional list forwarded into the "degraded" and "failed" events.
    /// No serde rename is applied, so the JSON key is `failed_cells`.
    pub failed_cells: Option<Vec<String>>,
}
275
276pub async fn update_formation_status(
277 State(state): State<AppState>,
278 headers: HeaderMap,
279 Path(id): Path<Uuid>,
280 Json(body): Json<StatusTransition>,
281) -> Result<StatusCode, AppError> {
282 require_bearer(&headers, &state.api_token)?;
283
284 let (new_status, name, cell_count, failed) = {
285 let mut map = state.formations.write().await;
286 let entry = map
287 .get_mut(&id)
288 .ok_or_else(|| AppError::not_found(format!("formation {id} not found")))?;
289
290 let new_status = match body.state.to_uppercase().as_str() {
291 "RUNNING" | "LAUNCHING" => FormationStatus::Running,
292 "DEGRADED" => FormationStatus::Running, "COMPLETED" => FormationStatus::Succeeded,
294 "FAILED" => FormationStatus::Failed,
295 other => {
296 return Err(AppError::new(
302 AppErrorKind::BadRequest,
303 format!("unknown state: {other}"),
304 ));
305 }
306 };
307 entry.status = new_status;
308
309 let members = entry
310 .document
311 .get("members")
312 .and_then(|m| m.as_array())
313 .map(|a| a.len() as u32)
314 .unwrap_or(0);
315 let failed = body.failed_cells.unwrap_or_default();
316 (new_status, entry.name.clone(), members, failed)
317 };
318
319 let sid = id.to_string();
320 let reason = body.reason.as_deref();
321 let empty: &[String] = &[];
322 let now_rfc3339 = chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true);
328
329 let (event, phase) = match body.state.to_uppercase().as_str() {
330 "LAUNCHING" => (
331 cloud_event_v1_formation_launching(
332 &sid,
333 &now_rfc3339,
334 &sid,
335 &name,
336 cell_count,
337 empty,
338 reason,
339 ),
340 "launching",
341 ),
342 "RUNNING" => (
343 cloud_event_v1_formation_running(
344 &sid,
345 &now_rfc3339,
346 &sid,
347 &name,
348 cell_count,
349 empty,
350 reason,
351 ),
352 "running",
353 ),
354 "DEGRADED" => (
355 cloud_event_v1_formation_degraded(
356 &sid,
357 &now_rfc3339,
358 &sid,
359 &name,
360 cell_count,
361 &failed,
362 reason,
363 ),
364 "degraded",
365 ),
366 "COMPLETED" => (
367 cloud_event_v1_formation_completed(
368 &sid,
369 &now_rfc3339,
370 &sid,
371 &name,
372 cell_count,
373 empty,
374 reason,
375 ),
376 "completed",
377 ),
378 _ => (
379 cloud_event_v1_formation_failed(
380 &sid,
381 &now_rfc3339,
382 &sid,
383 &name,
384 cell_count,
385 &failed,
386 reason,
387 ),
388 "failed",
389 ),
390 };
391
392 let subject = format!("cellos.events.formations.{id}.{phase}");
393 publish_event(&state, &subject, event).await;
394
395 let _ = new_status; Ok(StatusCode::NO_CONTENT)
397}
398
399async fn publish_event(state: &AppState, subject: &str, event: impl serde::Serialize) {
403 let Some(nats) = &state.nats else { return };
404 let payload = match serde_json::to_vec(&event) {
405 Ok(b) => b,
406 Err(e) => {
407 tracing::warn!(subject, error = %e, "failed to serialise formation CloudEvent");
408 return;
409 }
410 };
411 if let Err(e) = nats.publish(subject.to_owned(), payload.into()).await {
412 tracing::warn!(subject, error = %e, "failed to publish formation CloudEvent to NATS");
413 }
414}
415
416fn normalize_formation_document(raw: &serde_json::Value) -> Result<serde_json::Value, AppError> {
450 let Some(obj) = raw.as_object() else {
459 return Ok(raw.clone());
462 };
463
464 const FLAT_KEYS: &[&str] = &["name", "coordinator", "members"];
465 const KUBECTL_KEYS: &[&str] = &["apiVersion", "kind", "metadata", "spec"];
466
467 let flat_keys_present: Vec<&str> = FLAT_KEYS
468 .iter()
469 .copied()
470 .filter(|k| obj.contains_key(*k))
471 .collect();
472 let kubectl_keys_present: Vec<&str> = KUBECTL_KEYS
473 .iter()
474 .copied()
475 .filter(|k| obj.contains_key(*k))
476 .collect();
477
478 let has_flat = !flat_keys_present.is_empty();
479 let has_kubectl = !kubectl_keys_present.is_empty();
480
481 if has_flat && has_kubectl {
482 return Err(AppError::bad_request(format!(
483 "hybrid formation document: top-level flat field(s) {flat:?} \
484 conflict with kubectl-style envelope field(s) {kubectl:?}; \
485 pick exactly one shape (see contracts/schemas/formation-v1.schema.json)",
486 flat = flat_keys_present,
487 kubectl = kubectl_keys_present,
488 )));
489 }
490
491 if !has_kubectl {
492 return Ok(raw.clone());
495 }
496
497 let api_version = obj
499 .get("apiVersion")
500 .and_then(|v| v.as_str())
501 .ok_or_else(|| {
502 AppError::bad_request(
503 "kubectl-style formation: missing or non-string 'apiVersion' (expected \"cellos.dev/v1\")"
504 .to_string(),
505 )
506 })?;
507 if api_version != "cellos.dev/v1" {
508 return Err(AppError::bad_request(format!(
509 "kubectl-style formation: unsupported apiVersion '{api_version}' (expected \"cellos.dev/v1\")"
510 )));
511 }
512
513 let kind = obj.get("kind").and_then(|v| v.as_str()).ok_or_else(|| {
514 AppError::bad_request(
515 "kubectl-style formation: missing or non-string 'kind' (expected \"Formation\")"
516 .to_string(),
517 )
518 })?;
519 if kind != "Formation" {
520 return Err(AppError::bad_request(format!(
521 "kubectl-style formation: unsupported kind '{kind}' (expected \"Formation\")"
522 )));
523 }
524
525 let metadata = obj
526 .get("metadata")
527 .and_then(|v| v.as_object())
528 .ok_or_else(|| {
529 AppError::bad_request("kubectl-style formation: missing 'metadata' object".to_string())
530 })?;
531 let name = metadata
532 .get("name")
533 .and_then(|v| v.as_str())
534 .ok_or_else(|| {
535 AppError::bad_request("kubectl-style formation: missing 'metadata.name'".to_string())
536 })?;
537
538 let spec = obj.get("spec").and_then(|v| v.as_object()).ok_or_else(|| {
539 AppError::bad_request("kubectl-style formation: missing 'spec' object".to_string())
540 })?;
541
542 let coordinator = spec
543 .get("coordinator")
544 .and_then(|v| v.as_str())
545 .ok_or_else(|| {
546 AppError::bad_request("kubectl-style formation: missing 'spec.coordinator'".to_string())
547 })?;
548
549 let members_raw = spec
550 .get("members")
551 .and_then(|v| v.as_array())
552 .ok_or_else(|| {
553 AppError::bad_request(
554 "kubectl-style formation: missing or non-array 'spec.members'".to_string(),
555 )
556 })?;
557
558 let mut members_flat = Vec::with_capacity(members_raw.len());
564 for (idx, m) in members_raw.iter().enumerate() {
565 let m_obj = m.as_object().ok_or_else(|| {
566 AppError::bad_request(format!(
567 "kubectl-style formation: spec.members[{idx}] is not an object"
568 ))
569 })?;
570 let member_name = m_obj.get("name").and_then(|v| v.as_str()).ok_or_else(|| {
571 AppError::bad_request(format!(
572 "kubectl-style formation: spec.members[{idx}] missing 'name'"
573 ))
574 })?;
575
576 let mut rewritten = serde_json::Map::with_capacity(m_obj.len());
577 rewritten.insert(
578 "id".to_string(),
579 serde_json::Value::String(member_name.to_string()),
580 );
581 for (k, v) in m_obj.iter() {
582 if k == "name" {
583 continue; }
585 rewritten.insert(k.clone(), v.clone());
586 }
587 members_flat.push(serde_json::Value::Object(rewritten));
588 }
589
590 let mut flat = serde_json::Map::with_capacity(3);
591 flat.insert(
592 "name".to_string(),
593 serde_json::Value::String(name.to_string()),
594 );
595 flat.insert(
596 "coordinator".to_string(),
597 serde_json::Value::String(coordinator.to_string()),
598 );
599 flat.insert(
600 "members".to_string(),
601 serde_json::Value::Array(members_flat),
602 );
603
604 Ok(serde_json::Value::Object(flat))
605}
606
607fn validate_formation(doc: &FormationDocument) -> Result<(), AppError> {
630 use std::collections::{HashMap, HashSet};
631
632 let coord_present = doc.members.iter().any(|m| m.id == doc.coordinator);
634 if !coord_present {
635 return Err(AppError::new(
636 AppErrorKind::FormationNoCoordinator,
637 format!(
638 "coordinator '{}' must appear in members list",
639 doc.coordinator
640 ),
641 ));
642 }
643
644 let mut seen: HashSet<&str> = HashSet::new();
649 for m in &doc.members {
650 if !seen.insert(m.id.as_str()) {
651 return Err(AppError::new(
652 AppErrorKind::FormationMultipleCoordinators,
653 format!("duplicate member id '{}'", m.id),
654 ));
655 }
656 }
657
658 for m in &doc.members {
663 let is_coord = m.id == doc.coordinator;
664 match (is_coord, &m.authorized_by) {
665 (true, Some(_)) => {
666 return Err(AppError::new(
667 AppErrorKind::FormationAuthorityNotNarrowing,
668 format!("coordinator '{}' must not declare authorizedBy", m.id),
669 ));
670 }
671 (false, None) => {
672 return Err(AppError::new(
673 AppErrorKind::FormationAuthorityNotNarrowing,
674 format!("non-coordinator member '{}' missing authorizedBy", m.id),
675 ));
676 }
677 (false, Some(parent)) => {
678 if !seen.contains(parent.as_str()) {
679 return Err(AppError::new(
680 AppErrorKind::FormationAuthorityNotNarrowing,
681 format!("member '{}' references unknown parent '{}'", m.id, parent),
682 ));
683 }
684 }
685 _ => {}
686 }
687 }
688
689 let parent: HashMap<&str, &str> = doc
694 .members
695 .iter()
696 .filter_map(|m| m.authorized_by.as_deref().map(|p| (m.id.as_str(), p)))
697 .collect();
698
699 for m in &doc.members {
700 if m.id == doc.coordinator {
701 continue;
702 }
703 let mut cursor = m.id.as_str();
704 for _ in 0..doc.members.len() {
705 let Some(&p) = parent.get(cursor) else {
706 break;
709 };
710 if p == m.id {
711 return Err(AppError::new(
712 AppErrorKind::FormationCycle,
713 format!("authorizedBy cycle detected involving member '{}'", m.id),
714 ));
715 }
716 cursor = p;
717 }
718 if parent.contains_key(cursor) {
719 return Err(AppError::new(
723 AppErrorKind::FormationCycle,
724 format!(
725 "authorizedBy cycle detected on chain starting at '{}'",
726 m.id
727 ),
728 ));
729 }
730 }
731
732 Ok(())
733}
734
#[cfg(test)]
mod tests {
    //! Router-level tests for the formation handlers. Each test builds a fresh
    //! router (no NATS client) and drives it with `tower::ServiceExt::oneshot`.

    use super::*;
    use crate::router;
    use axum::body::Body;
    use axum::http::{header, Request};
    use http_body_util::BodyExt;
    use tower::ServiceExt;

    const TOKEN: &str = "test-token";
    const FORMATIONS_URI: &str = "/v1/formations";

    /// Fresh application state without a NATS client.
    fn test_state() -> AppState {
        AppState::new(None, TOKEN)
    }

    /// Builds an authenticated request; a JSON content type is set only when a
    /// body is supplied.
    fn auth_req(method: &str, uri: &str, body: Option<&str>) -> Request<Body> {
        let builder = Request::builder()
            .method(method)
            .uri(uri)
            .header(header::AUTHORIZATION, format!("Bearer {TOKEN}"));
        let request = match body {
            Some(payload) => builder
                .header(header::CONTENT_TYPE, "application/json")
                .body(Body::from(payload.to_owned())),
            None => builder.body(Body::empty()),
        };
        request.expect("build request")
    }

    /// Runs one request through the router and returns its response.
    async fn send(app: axum::Router, req: Request<Body>) -> axum::response::Response {
        app.oneshot(req).await.expect("router response")
    }

    /// POSTs `doc` (serialized to a string) to the formations collection.
    async fn post_formation(app: axum::Router, doc: &serde_json::Value) -> axum::response::Response {
        let payload = doc.to_string();
        send(app, auth_req("POST", FORMATIONS_URI, Some(&payload))).await
    }

    /// Collects the response body and parses it as JSON.
    async fn json_body(resp: axum::response::Response) -> serde_json::Value {
        let bytes = resp.into_body().collect().await.unwrap().to_bytes();
        serde_json::from_slice(&bytes).unwrap()
    }

    /// POSTs `doc` to a fresh router, asserts `400`, and returns the problem body.
    async fn rejected_problem(doc: &serde_json::Value) -> serde_json::Value {
        let resp = post_formation(router(test_state()), doc).await;
        assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
        json_body(resp).await
    }

    /// POSTs `doc` against `state`, asserts `201`, and returns the new id.
    async fn created_id(state: &AppState, doc: &serde_json::Value) -> String {
        let resp = post_formation(router(state.clone()), doc).await;
        assert_eq!(resp.status(), StatusCode::CREATED);
        let parsed = json_body(resp).await;
        parsed["id"].as_str().expect("uuid string").to_owned()
    }

    #[tokio::test]
    async fn post_valid_formation_returns_201() {
        let doc = serde_json::json!({
            "name": "demo",
            "coordinator": "coord",
            "members": [
                { "id": "coord" },
                { "id": "worker-a", "authorizedBy": "coord" },
                { "id": "worker-b", "authorizedBy": "coord" }
            ]
        });

        let resp = post_formation(router(test_state()), &doc).await;
        assert_eq!(resp.status(), StatusCode::CREATED);

        let parsed = json_body(resp).await;
        assert_eq!(parsed["status"], "PENDING");
        assert_eq!(parsed["name"], "demo");
        assert!(parsed["id"].as_str().is_some());
    }

    #[tokio::test]
    async fn post_formation_missing_coordinator_returns_400() {
        let doc = serde_json::json!({
            "name": "demo",
            "coordinator": "coord",
            "members": [
                { "id": "worker-a", "authorizedBy": "coord" }
            ]
        });

        let resp = post_formation(router(test_state()), &doc).await;
        assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
        let ct = resp
            .headers()
            .get(header::CONTENT_TYPE)
            .and_then(|v| v.to_str().ok())
            .unwrap_or_default()
            .to_owned();
        assert!(
            ct.starts_with("application/problem+json"),
            "expected RFC 9457 media type, got {ct:?}"
        );
        let parsed = json_body(resp).await;
        assert_eq!(parsed["type"], "/problems/formation/no-coordinator");
    }

    #[tokio::test]
    async fn post_formation_member_missing_authorized_by_returns_400() {
        let parsed = rejected_problem(&serde_json::json!({
            "name": "demo",
            "coordinator": "coord",
            "members": [
                { "id": "coord" },
                { "id": "worker-a" }
            ]
        }))
        .await;
        assert_eq!(
            parsed["type"], "/problems/formation/authority-not-narrowing",
            "expected authority-not-narrowing discriminant, got {parsed}"
        );
    }

    #[tokio::test]
    async fn get_formations_returns_snapshot_with_cursor() {
        let resp = send(router(test_state()), auth_req("GET", FORMATIONS_URI, None)).await;
        assert_eq!(resp.status(), StatusCode::OK);

        let parsed = json_body(resp).await;
        assert!(parsed.is_object(), "expected snapshot object, got {parsed}");
        let arr = parsed["formations"].as_array().expect("formations array");
        assert_eq!(arr.len(), 0);
        assert!(
            parsed["cursor"].is_u64(),
            "cursor field must be an unsigned integer, got {}",
            parsed["cursor"]
        );
        assert_eq!(parsed["cursor"].as_u64(), Some(0));
    }

    #[tokio::test]
    async fn snapshot_returns_cursor() {
        let app = router(test_state());
        let doc = serde_json::json!({
            "name": "with-cursor",
            "coordinator": "coord",
            "members": [
                { "id": "coord" },
                { "id": "worker-a", "authorizedBy": "coord" }
            ]
        });

        let resp = post_formation(app.clone(), &doc).await;
        assert_eq!(resp.status(), StatusCode::CREATED);

        let resp = send(app, auth_req("GET", FORMATIONS_URI, None)).await;
        assert_eq!(resp.status(), StatusCode::OK);
        let parsed = json_body(resp).await;
        assert!(
            parsed["cursor"].is_u64(),
            "cursor must be unsigned integer; got {}",
            parsed["cursor"]
        );
        let formations = parsed["formations"].as_array().expect("formations array");
        assert_eq!(formations.len(), 1, "expected 1 formation after POST");
        assert_eq!(formations[0]["name"], "with-cursor");
    }

    #[tokio::test]
    async fn missing_bearer_returns_401() {
        let unauthenticated = Request::builder()
            .method("GET")
            .uri("/v1/formations")
            .body(Body::empty())
            .unwrap();
        let resp = send(router(test_state()), unauthenticated).await;
        assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
    }

    #[tokio::test]
    async fn rejects_duplicate_member_ids_with_multiple_coordinators_type() {
        let parsed = rejected_problem(&serde_json::json!({
            "name": "dup-ids",
            "coordinator": "coord",
            "members": [
                { "id": "coord" },
                { "id": "coord", "authorizedBy": "coord" }
            ]
        }))
        .await;
        assert_eq!(
            parsed["type"], "/problems/formation/multiple-coordinators",
            "duplicate member ids must surface multipleCoordinators"
        );
    }

    #[tokio::test]
    async fn rejects_self_authorized_cycle() {
        let parsed = rejected_problem(&serde_json::json!({
            "name": "self-cycle",
            "coordinator": "coord",
            "members": [
                { "id": "coord" },
                { "id": "worker-a", "authorizedBy": "worker-a" }
            ]
        }))
        .await;
        assert_eq!(parsed["type"], "/problems/formation/cycle");
    }

    #[tokio::test]
    async fn rejects_two_node_cycle() {
        let parsed = rejected_problem(&serde_json::json!({
            "name": "two-cycle",
            "coordinator": "coord",
            "members": [
                { "id": "coord" },
                { "id": "a", "authorizedBy": "b" },
                { "id": "b", "authorizedBy": "a" }
            ]
        }))
        .await;
        assert_eq!(parsed["type"], "/problems/formation/cycle");
    }

    #[tokio::test]
    async fn rejects_orphan_parent_reference() {
        let parsed = rejected_problem(&serde_json::json!({
            "name": "orphan-parent",
            "coordinator": "coord",
            "members": [
                { "id": "coord" },
                { "id": "worker-a", "authorizedBy": "ghost" }
            ]
        }))
        .await;
        assert_eq!(
            parsed["type"],
            "/problems/formation/authority-not-narrowing"
        );
    }

    #[tokio::test]
    async fn post_formation_oversized_body_returns_413() {
        // Built by hand so the payload size is driven purely by the name field.
        let big = "x".repeat(70 * 1024);
        let body =
            format!(r#"{{"name":"{big}","coordinator":"coord","members":[{{"id":"coord"}}]}}"#,);
        let resp = send(
            router(test_state()),
            auth_req("POST", FORMATIONS_URI, Some(&body)),
        )
        .await;
        assert_eq!(
            resp.status(),
            StatusCode::PAYLOAD_TOO_LARGE,
            "oversized body must surface 413; got {:?}",
            resp.status(),
        );
    }

    #[tokio::test]
    async fn get_formation_by_id_captures_path() {
        let state = test_state();
        let id = created_id(
            &state,
            &serde_json::json!({
                "name": "probe",
                "coordinator": "coord",
                "members": [
                    { "id": "coord" },
                    { "id": "worker-a", "authorizedBy": "coord" }
                ]
            }),
        )
        .await;

        let resp = send(
            router(state),
            auth_req("GET", &format!("/v1/formations/{id}"), None),
        )
        .await;
        assert_eq!(
            resp.status(),
            StatusCode::OK,
            "GET /v1/formations/<id> must capture the path segment; got {:?}",
            resp.status(),
        );
        let parsed = json_body(resp).await;
        assert_eq!(parsed["name"], "probe");
    }

    #[tokio::test]
    async fn post_kubectl_style_formation_returns_201() {
        let doc = serde_json::json!({
            "apiVersion": "cellos.dev/v1",
            "kind": "Formation",
            "metadata": { "name": "kubectl-demo" },
            "spec": {
                "coordinator": "coord",
                "members": [
                    { "name": "coord" },
                    { "name": "worker-a", "authorizedBy": "coord" },
                    { "name": "worker-b", "authorizedBy": "coord" }
                ]
            }
        });

        let resp = post_formation(router(test_state()), &doc).await;
        assert_eq!(resp.status(), StatusCode::CREATED);

        let parsed = json_body(resp).await;
        assert_eq!(parsed["status"], "PENDING");
        assert_eq!(parsed["name"], "kubectl-demo");
        assert!(parsed["id"].as_str().is_some());
    }

    #[tokio::test]
    async fn post_kubectl_style_missing_coordinator_returns_no_coordinator() {
        let parsed = rejected_problem(&serde_json::json!({
            "apiVersion": "cellos.dev/v1",
            "kind": "Formation",
            "metadata": { "name": "missing-coord" },
            "spec": {
                "coordinator": "coord",
                "members": [
                    { "name": "worker-a", "authorizedBy": "coord" }
                ]
            }
        }))
        .await;
        assert_eq!(parsed["type"], "/problems/formation/no-coordinator");
    }

    #[tokio::test]
    async fn post_hybrid_formation_returns_400_bad_request() {
        let parsed = rejected_problem(&serde_json::json!({
            "apiVersion": "cellos.dev/v1",
            "kind": "Formation",
            "metadata": { "name": "hybrid" },
            "spec": {
                "coordinator": "coord",
                "members": [ { "name": "coord" } ]
            },
            "name": "hybrid",
            "members": [ { "id": "coord" } ]
        }))
        .await;
        assert_eq!(
            parsed["type"], "/problems/bad-request",
            "hybrid shape must surface a generic bad-request, not an admission discriminant"
        );
        let detail = parsed["detail"].as_str().unwrap_or_default();
        assert!(
            detail.contains("hybrid"),
            "detail must mention 'hybrid'; got {detail:?}"
        );
    }

    #[tokio::test]
    async fn post_kubectl_style_wrong_api_version_returns_400() {
        let parsed = rejected_problem(&serde_json::json!({
            "apiVersion": "cellos.dev/v2",
            "kind": "Formation",
            "metadata": { "name": "wrong-api" },
            "spec": {
                "coordinator": "coord",
                "members": [ { "name": "coord" } ]
            }
        }))
        .await;
        assert_eq!(parsed["type"], "/problems/bad-request");
        let detail = parsed["detail"].as_str().unwrap_or_default();
        assert!(
            detail.contains("apiVersion") && detail.contains("cellos.dev/v2"),
            "detail must name the bad apiVersion; got {detail:?}"
        );
    }

    #[tokio::test]
    async fn post_kubectl_style_wrong_kind_returns_400() {
        let parsed = rejected_problem(&serde_json::json!({
            "apiVersion": "cellos.dev/v1",
            "kind": "Cell",
            "metadata": { "name": "wrong-kind" },
            "spec": {
                "coordinator": "coord",
                "members": [ { "name": "coord" } ]
            }
        }))
        .await;
        assert_eq!(parsed["type"], "/problems/bad-request");
        let detail = parsed["detail"].as_str().unwrap_or_default();
        assert!(
            detail.contains("kind") && detail.contains("Cell"),
            "detail must name the bad kind; got {detail:?}"
        );
    }

    #[tokio::test]
    async fn kubectl_style_post_then_get_returns_normalized_flat_document() {
        let state = test_state();
        let id = created_id(
            &state,
            &serde_json::json!({
                "apiVersion": "cellos.dev/v1",
                "kind": "Formation",
                "metadata": { "name": "roundtrip" },
                "spec": {
                    "coordinator": "coord",
                    "members": [
                        { "name": "coord" },
                        { "name": "worker-a", "authorizedBy": "coord" }
                    ]
                }
            }),
        )
        .await;

        let resp = send(
            router(state),
            auth_req("GET", &format!("/v1/formations/{id}"), None),
        )
        .await;
        assert_eq!(resp.status(), StatusCode::OK);
        let parsed = json_body(resp).await;
        let doc = &parsed["document"];
        assert_eq!(doc["name"], "roundtrip", "flat 'name' present");
        assert_eq!(doc["coordinator"], "coord", "flat 'coordinator' present");
        let members = doc["members"]
            .as_array()
            .expect("members array on normalized doc");
        assert_eq!(members.len(), 2);
        assert_eq!(members[0]["id"], "coord");
        assert_eq!(members[1]["id"], "worker-a");
        assert_eq!(members[1]["authorizedBy"], "coord");
        assert!(
            doc.get("apiVersion").is_none(),
            "kubectl envelope must not leak into normalized doc"
        );
        assert!(doc.get("kind").is_none());
        assert!(doc.get("metadata").is_none());
        assert!(doc.get("spec").is_none());
    }

    #[tokio::test]
    async fn unknown_state_returns_bad_request_problem_type() {
        let state = test_state();
        let id = created_id(
            &state,
            &serde_json::json!({
                "name": "demo",
                "coordinator": "coord",
                "members": [
                    { "id": "coord" },
                    { "id": "worker-a", "authorizedBy": "coord" }
                ]
            }),
        )
        .await;

        let bad = serde_json::json!({ "state": "TELEPORTING" }).to_string();
        let resp = send(
            router(state),
            auth_req(
                "POST",
                &format!("/v1/formations/{id}/status"),
                Some(&bad),
            ),
        )
        .await;
        assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
        let parsed = json_body(resp).await;
        assert_eq!(
            parsed["type"], "/problems/bad-request",
            "unknown state must surface generic bad-request, not an ADR-0010 discriminant"
        );
    }
}