1use axum::extract::{Path, State};
10use axum::http::{HeaderMap, StatusCode};
11use axum::response::IntoResponse;
12use axum::Json;
13use cellos_core::events::{
14 cloud_event_v1_formation_completed, cloud_event_v1_formation_created,
15 cloud_event_v1_formation_degraded, cloud_event_v1_formation_failed,
16 cloud_event_v1_formation_launching, cloud_event_v1_formation_running,
17};
18use serde::{Deserialize, Serialize};
19use uuid::Uuid;
20
21use crate::auth::require_bearer;
22use crate::error::{AppError, AppErrorKind};
23use crate::state::{AppState, FormationRecord, FormationStatus};
24
25#[derive(Debug, Deserialize)]
42pub struct FormationDocument {
43 pub name: String,
44 pub coordinator: String,
45 pub members: Vec<FormationMember>,
46}
47
48#[derive(Debug, Deserialize)]
49pub struct FormationMember {
50 pub id: String,
51 #[serde(rename = "authorizedBy")]
54 pub authorized_by: Option<String>,
55}
56
57#[derive(Debug, Serialize)]
58pub struct FormationCreated {
59 pub id: Uuid,
60 pub name: String,
61 pub status: FormationStatus,
62}
63
64pub async fn create_formation(
67 State(state): State<AppState>,
68 headers: HeaderMap,
69 body: axum::body::Bytes,
70) -> Result<impl IntoResponse, AppError> {
71 require_bearer(&headers, &state.api_token)?;
72
73 let raw: serde_json::Value = serde_json::from_slice(&body)?;
82 let normalized = normalize_formation_document(&raw)?;
83 let doc: FormationDocument = serde_json::from_value(normalized.clone())?;
84
85 validate_formation_name(&doc.name)?;
92
93 validate_formation(&doc)?;
94
95 let id = Uuid::new_v4();
107 let record = FormationRecord {
108 id,
109 name: doc.name.clone(),
110 status: FormationStatus::Pending,
111 document: normalized,
115 };
116
117 {
118 let mut map = state.formations.write().await;
119 if let Some(existing) = map.values().find(|r| r.name == doc.name) {
120 return Err(AppError::new(
121 AppErrorKind::Conflict,
122 format!(
123 "formation name '{}' already in use by {}",
124 doc.name, existing.id
125 ),
126 ));
127 }
128 map.insert(id, record);
129 }
130
131 let cell_count = doc.members.len() as u32;
138 let no_failed: &[String] = &[];
139 let now_rfc3339 = chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true);
140 let event = cloud_event_v1_formation_created(
141 &id.to_string(),
142 &now_rfc3339,
143 &id.to_string(),
144 &doc.name,
145 cell_count,
146 no_failed,
147 None,
148 );
149 let subject = format!("cellos.events.formations.{id}.created");
150 publish_event(&state, &subject, event).await;
151
152 let body = FormationCreated {
153 id,
154 name: doc.name,
155 status: FormationStatus::Pending,
156 };
157 Ok((StatusCode::CREATED, Json(body)))
158}
159
160#[derive(Debug, Serialize)]
166pub struct FormationsSnapshot {
167 pub formations: Vec<FormationRecord>,
168 pub cursor: u64,
169}
170
171pub async fn list_formations(
174 State(state): State<AppState>,
175 headers: HeaderMap,
176) -> Result<Json<FormationsSnapshot>, AppError> {
177 require_bearer(&headers, &state.api_token)?;
178 let map = state.formations.read().await;
179 Ok(Json(FormationsSnapshot {
180 formations: map.values().cloned().collect(),
181 cursor: state.cursor(),
182 }))
183}
184
185pub async fn get_formation(
187 State(state): State<AppState>,
188 headers: HeaderMap,
189 Path(id): Path<Uuid>,
190) -> Result<Json<FormationRecord>, AppError> {
191 require_bearer(&headers, &state.api_token)?;
192 let map = state.formations.read().await;
193 map.get(&id)
194 .cloned()
195 .map(Json)
196 .ok_or_else(|| AppError::not_found(format!("formation {id} not found")))
197}
198
199pub async fn get_formation_by_name(
213 State(state): State<AppState>,
214 headers: HeaderMap,
215 Path(name): Path<String>,
216) -> Result<Json<FormationRecord>, AppError> {
217 require_bearer(&headers, &state.api_token)?;
218 let map = state.formations.read().await;
219 map.values()
220 .find(|r| r.name == name)
221 .cloned()
222 .map(Json)
223 .ok_or_else(|| AppError::not_found(format!("formation '{name}' not found")))
224}
225
226pub async fn delete_formation_by_name(
231 State(state): State<AppState>,
232 headers: HeaderMap,
233 Path(name): Path<String>,
234) -> Result<StatusCode, AppError> {
235 require_bearer(&headers, &state.api_token)?;
236 let id = {
251 let map = state.formations.read().await;
252 let matches: Vec<Uuid> = map
253 .values()
254 .filter(|r| r.name == name)
255 .map(|r| r.id)
256 .collect();
257 match matches.len() {
258 0 => return Err(AppError::not_found(format!("formation '{name}' not found"))),
259 1 => matches[0],
260 _ => {
261 let mut ids = matches;
264 ids.sort();
265 let id_list = ids
266 .iter()
267 .map(Uuid::to_string)
268 .collect::<Vec<_>>()
269 .join(", ");
270 return Err(AppError::new(
271 AppErrorKind::Conflict,
272 format!(
273 "multiple formations share name '{name}': [{id_list}]; \
274 delete by UUID via /v1/formations/{{id}} to disambiguate"
275 ),
276 ));
277 }
278 }
279 };
280 delete_formation(State(state), headers, Path(id)).await
281}
282
283pub async fn delete_formation(
288 State(state): State<AppState>,
289 headers: HeaderMap,
290 Path(id): Path<Uuid>,
291) -> Result<StatusCode, AppError> {
292 require_bearer(&headers, &state.api_token)?;
293 let mut map = state.formations.write().await;
294 let (name, cell_count) = {
295 let entry = map
296 .get_mut(&id)
297 .ok_or_else(|| AppError::not_found(format!("formation {id} not found")))?;
298 entry.status = FormationStatus::Cancelled;
299 let members = entry
300 .document
301 .get("members")
302 .and_then(|m| m.as_array())
303 .map(|a| a.len() as u32)
304 .unwrap_or(0);
305 (entry.name.clone(), members)
306 };
307 drop(map);
308
309 let no_failed: &[String] = &[];
310 let now_rfc3339 = chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true);
313 let event = cloud_event_v1_formation_failed(
314 &id.to_string(),
315 &now_rfc3339,
316 &id.to_string(),
317 &name,
318 cell_count,
319 no_failed,
320 Some("deleted by operator"),
321 );
322 let subject = format!("cellos.events.formations.{id}.failed");
323 publish_event(&state, &subject, event).await;
324
325 Ok(StatusCode::NO_CONTENT)
326}
327
328#[derive(Debug, Deserialize)]
333pub struct StatusTransition {
334 pub state: String,
335 pub reason: Option<String>,
336 pub failed_cells: Option<Vec<String>>,
337}
338
339pub async fn update_formation_status(
340 State(state): State<AppState>,
341 headers: HeaderMap,
342 Path(id): Path<Uuid>,
343 Json(body): Json<StatusTransition>,
344) -> Result<StatusCode, AppError> {
345 require_bearer(&headers, &state.api_token)?;
346
347 let now_rfc3339 = chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true);
361
362 let (new_status, name, cell_count, failed) = {
363 let mut map = state.formations.write().await;
364 let entry = map
365 .get_mut(&id)
366 .ok_or_else(|| AppError::not_found(format!("formation {id} not found")))?;
367
368 let new_status = match body.state.to_uppercase().as_str() {
369 "RUNNING" | "LAUNCHING" => FormationStatus::Running,
370 "DEGRADED" => FormationStatus::Running, "COMPLETED" => FormationStatus::Succeeded,
372 "FAILED" => FormationStatus::Failed,
373 other => {
374 return Err(AppError::new(
380 AppErrorKind::BadRequest,
381 format!("unknown state: {other}"),
382 ));
383 }
384 };
385 entry.status = new_status;
386
387 let members = entry
388 .document
389 .get("members")
390 .and_then(|m| m.as_array())
391 .map(|a| a.len() as u32)
392 .unwrap_or(0);
393 let failed = body.failed_cells.unwrap_or_default();
394 (new_status, entry.name.clone(), members, failed)
395 };
396
397 let sid = id.to_string();
398 let reason = body.reason.as_deref();
399 let empty: &[String] = &[];
400 let (event, phase) = match body.state.to_uppercase().as_str() {
409 "LAUNCHING" => (
410 cloud_event_v1_formation_launching(
411 &sid,
412 &now_rfc3339,
413 &sid,
414 &name,
415 cell_count,
416 empty,
417 reason,
418 ),
419 "launching",
420 ),
421 "RUNNING" => (
422 cloud_event_v1_formation_running(
423 &sid,
424 &now_rfc3339,
425 &sid,
426 &name,
427 cell_count,
428 empty,
429 reason,
430 ),
431 "running",
432 ),
433 "DEGRADED" => (
434 cloud_event_v1_formation_degraded(
435 &sid,
436 &now_rfc3339,
437 &sid,
438 &name,
439 cell_count,
440 &failed,
441 reason,
442 ),
443 "degraded",
444 ),
445 "COMPLETED" => (
446 cloud_event_v1_formation_completed(
447 &sid,
448 &now_rfc3339,
449 &sid,
450 &name,
451 cell_count,
452 empty,
453 reason,
454 ),
455 "completed",
456 ),
457 _ => (
458 cloud_event_v1_formation_failed(
459 &sid,
460 &now_rfc3339,
461 &sid,
462 &name,
463 cell_count,
464 &failed,
465 reason,
466 ),
467 "failed",
468 ),
469 };
470
471 let subject = format!("cellos.events.formations.{id}.{phase}");
472 publish_event(&state, &subject, event).await;
473
474 let _ = new_status; Ok(StatusCode::NO_CONTENT)
476}
477
478async fn publish_event(state: &AppState, subject: &str, event: impl serde::Serialize) {
482 let Some(nats) = &state.nats else { return };
483 let payload = match serde_json::to_vec(&event) {
484 Ok(b) => b,
485 Err(e) => {
486 tracing::warn!(subject, error = %e, "failed to serialise formation CloudEvent");
487 return;
488 }
489 };
490 if let Err(e) = nats.publish(subject.to_owned(), payload.into()).await {
491 tracing::warn!(subject, error = %e, "failed to publish formation CloudEvent to NATS");
492 }
493}
494
495fn normalize_formation_document(raw: &serde_json::Value) -> Result<serde_json::Value, AppError> {
529 let Some(obj) = raw.as_object() else {
538 return Ok(raw.clone());
541 };
542
543 const FLAT_KEYS: &[&str] = &["name", "coordinator", "members"];
544 const KUBECTL_KEYS: &[&str] = &["apiVersion", "kind", "metadata", "spec"];
545
546 const KUBECTL_ALLOWED: &[&str] = &["apiVersion", "kind", "metadata", "spec"];
557
558 let flat_keys_present: Vec<&str> = FLAT_KEYS
559 .iter()
560 .copied()
561 .filter(|k| obj.contains_key(*k))
562 .collect();
563 let kubectl_keys_present: Vec<&str> = KUBECTL_KEYS
564 .iter()
565 .copied()
566 .filter(|k| obj.contains_key(*k))
567 .collect();
568
569 let has_flat = !flat_keys_present.is_empty();
570 let has_kubectl = !kubectl_keys_present.is_empty();
571
572 if has_flat && has_kubectl {
573 return Err(AppError::bad_request(format!(
574 "hybrid formation document: top-level flat field(s) {flat:?} \
575 conflict with kubectl-style envelope field(s) {kubectl:?}; \
576 pick exactly one shape (see contracts/schemas/formation-v1.schema.json)",
577 flat = flat_keys_present,
578 kubectl = kubectl_keys_present,
579 )));
580 }
581
582 if !has_kubectl {
583 return Ok(raw.clone());
586 }
587
588 let unknown_top_level: Vec<&str> = obj
592 .keys()
593 .map(|s| s.as_str())
594 .filter(|k| !KUBECTL_ALLOWED.contains(k))
595 .collect();
596 if !unknown_top_level.is_empty() {
597 return Err(AppError::bad_request(format!(
598 "kubectl-style formation: unknown top-level field(s) {unknown_top_level:?}; \
599 allowed: {KUBECTL_ALLOWED:?}",
600 )));
601 }
602
603 let api_version = obj
605 .get("apiVersion")
606 .and_then(|v| v.as_str())
607 .ok_or_else(|| {
608 AppError::bad_request(
609 "kubectl-style formation: missing or non-string 'apiVersion' (expected \"cellos.dev/v1\")"
610 .to_string(),
611 )
612 })?;
613 if api_version != "cellos.dev/v1" {
614 return Err(AppError::bad_request(format!(
615 "kubectl-style formation: unsupported apiVersion '{api_version}' (expected \"cellos.dev/v1\")"
616 )));
617 }
618
619 let kind = obj.get("kind").and_then(|v| v.as_str()).ok_or_else(|| {
620 AppError::bad_request(
621 "kubectl-style formation: missing or non-string 'kind' (expected \"Formation\")"
622 .to_string(),
623 )
624 })?;
625 if kind != "Formation" {
626 return Err(AppError::bad_request(format!(
627 "kubectl-style formation: unsupported kind '{kind}' (expected \"Formation\")"
628 )));
629 }
630
631 let metadata = obj
632 .get("metadata")
633 .and_then(|v| v.as_object())
634 .ok_or_else(|| {
635 AppError::bad_request("kubectl-style formation: missing 'metadata' object".to_string())
636 })?;
637 let name = metadata
638 .get("name")
639 .and_then(|v| v.as_str())
640 .ok_or_else(|| {
641 AppError::bad_request("kubectl-style formation: missing 'metadata.name'".to_string())
642 })?;
643
644 let spec = obj.get("spec").and_then(|v| v.as_object()).ok_or_else(|| {
645 AppError::bad_request("kubectl-style formation: missing 'spec' object".to_string())
646 })?;
647
648 let coordinator = spec
649 .get("coordinator")
650 .and_then(|v| v.as_str())
651 .ok_or_else(|| {
652 AppError::bad_request("kubectl-style formation: missing 'spec.coordinator'".to_string())
653 })?;
654
655 let members_raw = spec
656 .get("members")
657 .and_then(|v| v.as_array())
658 .ok_or_else(|| {
659 AppError::bad_request(
660 "kubectl-style formation: missing or non-array 'spec.members'".to_string(),
661 )
662 })?;
663
664 let mut members_flat = Vec::with_capacity(members_raw.len());
670 for (idx, m) in members_raw.iter().enumerate() {
671 let m_obj = m.as_object().ok_or_else(|| {
672 AppError::bad_request(format!(
673 "kubectl-style formation: spec.members[{idx}] is not an object"
674 ))
675 })?;
676 let member_name = m_obj.get("name").and_then(|v| v.as_str()).ok_or_else(|| {
677 AppError::bad_request(format!(
678 "kubectl-style formation: spec.members[{idx}] missing 'name'"
679 ))
680 })?;
681
682 if m_obj.contains_key("id") {
692 return Err(AppError::bad_request(format!(
693 "kubectl-style formation: spec.members[{idx}] declares both 'name' \
694 and 'id'; kubectl manifests address members by 'name' only — \
695 remove the 'id' field"
696 )));
697 }
698
699 let mut rewritten = serde_json::Map::with_capacity(m_obj.len());
700 rewritten.insert(
701 "id".to_string(),
702 serde_json::Value::String(member_name.to_string()),
703 );
704 for (k, v) in m_obj.iter() {
705 if k == "name" {
706 continue; }
708 rewritten.insert(k.clone(), v.clone());
711 }
712 members_flat.push(serde_json::Value::Object(rewritten));
713 }
714
715 let mut flat_metadata = serde_json::Map::new();
724 if let Some(labels) = metadata.get("labels") {
725 if !labels.is_object() {
731 return Err(AppError::bad_request(
732 "kubectl-style formation: 'metadata.labels' must be an object",
733 ));
734 }
735 flat_metadata.insert("labels".to_string(), labels.clone());
736 }
737 if let Some(annotations) = metadata.get("annotations") {
738 if !annotations.is_object() {
739 return Err(AppError::bad_request(
740 "kubectl-style formation: 'metadata.annotations' must be an object",
741 ));
742 }
743 flat_metadata.insert("annotations".to_string(), annotations.clone());
744 }
745
746 let mut flat = serde_json::Map::with_capacity(4);
747 flat.insert(
748 "name".to_string(),
749 serde_json::Value::String(name.to_string()),
750 );
751 flat.insert(
752 "coordinator".to_string(),
753 serde_json::Value::String(coordinator.to_string()),
754 );
755 flat.insert(
756 "members".to_string(),
757 serde_json::Value::Array(members_flat),
758 );
759 if !flat_metadata.is_empty() {
760 flat.insert(
761 "metadata".to_string(),
762 serde_json::Value::Object(flat_metadata),
763 );
764 }
765
766 Ok(serde_json::Value::Object(flat))
767}
768
769fn validate_formation_name(name: &str) -> Result<(), AppError> {
792 if name.is_empty() {
794 return Err(AppError::bad_request(
795 "formation name must not be empty".to_string(),
796 ));
797 }
798 if name.len() > 253 {
799 return Err(AppError::bad_request(format!(
800 "formation name length {} exceeds maximum of 253 bytes",
801 name.len()
802 )));
803 }
804
805 if name == "." || name == ".." {
807 return Err(AppError::bad_request(format!(
808 "formation name '{name}' is reserved"
809 )));
810 }
811
812 for (idx, b) in name.as_bytes().iter().enumerate() {
816 let ok = matches!(b, b'a'..=b'z' | b'A'..=b'Z' | b'0'..=b'9' | b'-' | b'_' | b'.');
817 if !ok {
818 let rendered = if b.is_ascii_graphic() {
822 format!("'{}'", *b as char)
823 } else {
824 format!("\\x{b:02x}")
825 };
826 return Err(AppError::bad_request(format!(
827 "formation name contains disallowed character {rendered} at byte offset {idx} \
828 (allowed: A-Z a-z 0-9 . - _)"
829 )));
830 }
831 }
832
833 let first = name.as_bytes()[0];
835 let last = name.as_bytes()[name.len() - 1];
836 if first == b'-' || first == b'.' {
837 return Err(AppError::bad_request(format!(
838 "formation name '{name}' must not start with '-' or '.'"
839 )));
840 }
841 if last == b'-' || last == b'.' {
842 return Err(AppError::bad_request(format!(
843 "formation name '{name}' must not end with '-' or '.'"
844 )));
845 }
846
847 Ok(())
848}
849
850fn validate_formation(doc: &FormationDocument) -> Result<(), AppError> {
873 use std::collections::{HashMap, HashSet};
874
875 let coord_present = doc.members.iter().any(|m| m.id == doc.coordinator);
877 if !coord_present {
878 return Err(AppError::new(
879 AppErrorKind::FormationNoCoordinator,
880 format!(
881 "coordinator '{}' must appear in members list",
882 doc.coordinator
883 ),
884 ));
885 }
886
887 let mut seen: HashSet<&str> = HashSet::new();
899 for m in &doc.members {
900 if !seen.insert(m.id.as_str()) {
901 return Err(AppError::new(
902 AppErrorKind::FormationDuplicateMemberId,
903 format!("duplicate member id '{}'", m.id),
904 ));
905 }
906 }
907
908 for m in &doc.members {
913 let is_coord = m.id == doc.coordinator;
914 match (is_coord, &m.authorized_by) {
915 (true, Some(_)) => {
916 return Err(AppError::new(
917 AppErrorKind::FormationAuthorityNotNarrowing,
918 format!("coordinator '{}' must not declare authorizedBy", m.id),
919 ));
920 }
921 (false, None) => {
922 return Err(AppError::new(
923 AppErrorKind::FormationAuthorityNotNarrowing,
924 format!("non-coordinator member '{}' missing authorizedBy", m.id),
925 ));
926 }
927 (false, Some(parent)) => {
928 if !seen.contains(parent.as_str()) {
929 return Err(AppError::new(
930 AppErrorKind::FormationAuthorityNotNarrowing,
931 format!("member '{}' references unknown parent '{}'", m.id, parent),
932 ));
933 }
934 }
935 _ => {}
936 }
937 }
938
939 let parent: HashMap<&str, &str> = doc
944 .members
945 .iter()
946 .filter_map(|m| m.authorized_by.as_deref().map(|p| (m.id.as_str(), p)))
947 .collect();
948
949 for m in &doc.members {
950 if m.id == doc.coordinator {
951 continue;
952 }
953 let mut cursor = m.id.as_str();
954 for _ in 0..doc.members.len() {
955 let Some(&p) = parent.get(cursor) else {
956 break;
959 };
960 if p == m.id {
961 return Err(AppError::new(
962 AppErrorKind::FormationCycle,
963 format!("authorizedBy cycle detected involving member '{}'", m.id),
964 ));
965 }
966 cursor = p;
967 }
968 if parent.contains_key(cursor) {
969 return Err(AppError::new(
973 AppErrorKind::FormationCycle,
974 format!(
975 "authorizedBy cycle detected on chain starting at '{}'",
976 m.id
977 ),
978 ));
979 }
980 }
981
982 Ok(())
983}
984
985#[cfg(test)]
986mod tests {
987 use super::*;
988 use crate::router;
989 use axum::body::Body;
990 use axum::http::{header, Request};
991 use http_body_util::BodyExt;
992 use tower::ServiceExt;
993
994 const TOKEN: &str = "test-token";
995
996 fn test_state() -> AppState {
997 AppState::new(None, TOKEN)
998 }
999
1000 fn auth_req(method: &str, uri: &str, body: Option<&str>) -> Request<Body> {
1001 let mut b = Request::builder()
1002 .method(method)
1003 .uri(uri)
1004 .header(header::AUTHORIZATION, format!("Bearer {TOKEN}"));
1005 if body.is_some() {
1006 b = b.header(header::CONTENT_TYPE, "application/json");
1007 }
1008 b.body(
1009 body.map(|s| Body::from(s.to_owned()))
1010 .unwrap_or_else(Body::empty),
1011 )
1012 .expect("build request")
1013 }
1014
1015 #[tokio::test]
1016 async fn post_valid_formation_returns_201() {
1017 let app = router(test_state());
1018 let body = serde_json::json!({
1019 "name": "demo",
1020 "coordinator": "coord",
1021 "members": [
1022 { "id": "coord" },
1023 { "id": "worker-a", "authorizedBy": "coord" },
1024 { "id": "worker-b", "authorizedBy": "coord" }
1025 ]
1026 })
1027 .to_string();
1028
1029 let resp = app
1030 .oneshot(auth_req("POST", "/v1/formations", Some(&body)))
1031 .await
1032 .expect("router response");
1033 assert_eq!(resp.status(), StatusCode::CREATED);
1034
1035 let bytes = resp.into_body().collect().await.unwrap().to_bytes();
1036 let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
1037 assert_eq!(parsed["status"], "PENDING");
1038 assert_eq!(parsed["name"], "demo");
1039 assert!(parsed["id"].as_str().is_some());
1040 }
1041
1042 #[tokio::test]
1043 async fn post_formation_missing_coordinator_returns_400() {
1044 let app = router(test_state());
1045 let body = serde_json::json!({
1047 "name": "demo",
1048 "coordinator": "coord",
1049 "members": [
1050 { "id": "worker-a", "authorizedBy": "coord" }
1051 ]
1052 })
1053 .to_string();
1054
1055 let resp = app
1056 .oneshot(auth_req("POST", "/v1/formations", Some(&body)))
1057 .await
1058 .expect("router response");
1059 assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
1060 let ct = resp
1061 .headers()
1062 .get(header::CONTENT_TYPE)
1063 .and_then(|v| v.to_str().ok())
1064 .unwrap_or_default()
1065 .to_owned();
1066 assert!(
1067 ct.starts_with("application/problem+json"),
1068 "expected RFC 9457 media type, got {ct:?}"
1069 );
1070 let bytes = resp.into_body().collect().await.unwrap().to_bytes();
1071 let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
1072 assert_eq!(parsed["type"], "/problems/formation/no-coordinator");
1073 }
1074
1075 #[tokio::test]
1076 async fn post_formation_member_missing_authorized_by_returns_400() {
1077 let app = router(test_state());
1078 let body = serde_json::json!({
1079 "name": "demo",
1080 "coordinator": "coord",
1081 "members": [
1082 { "id": "coord" },
1083 { "id": "worker-a" } ]
1085 })
1086 .to_string();
1087
1088 let resp = app
1089 .oneshot(auth_req("POST", "/v1/formations", Some(&body)))
1090 .await
1091 .expect("router response");
1092 assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
1093 let bytes = resp.into_body().collect().await.unwrap().to_bytes();
1094 let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
1095 assert_eq!(
1096 parsed["type"], "/problems/formation/authority-not-narrowing",
1097 "expected authority-not-narrowing discriminant, got {parsed}"
1098 );
1099 }
1100
1101 #[tokio::test]
1102 async fn get_formations_returns_snapshot_with_cursor() {
1103 let app = router(test_state());
1105 let resp = app
1106 .oneshot(auth_req("GET", "/v1/formations", None))
1107 .await
1108 .expect("router response");
1109 assert_eq!(resp.status(), StatusCode::OK);
1110 let bytes = resp.into_body().collect().await.unwrap().to_bytes();
1111 let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
1112 assert!(parsed.is_object(), "expected snapshot object, got {parsed}");
1113 let arr = parsed["formations"].as_array().expect("formations array");
1114 assert_eq!(arr.len(), 0);
1115 assert!(
1116 parsed["cursor"].is_u64(),
1117 "cursor field must be an unsigned integer, got {}",
1118 parsed["cursor"]
1119 );
1120 assert_eq!(parsed["cursor"].as_u64(), Some(0));
1121 }
1122
1123 #[tokio::test]
1124 async fn snapshot_returns_cursor() {
1125 let app = router(test_state());
1129 let body = serde_json::json!({
1130 "name": "with-cursor",
1131 "coordinator": "coord",
1132 "members": [
1133 { "id": "coord" },
1134 { "id": "worker-a", "authorizedBy": "coord" }
1135 ]
1136 })
1137 .to_string();
1138
1139 let resp = app
1140 .clone()
1141 .oneshot(auth_req("POST", "/v1/formations", Some(&body)))
1142 .await
1143 .expect("router response");
1144 assert_eq!(resp.status(), StatusCode::CREATED);
1145
1146 let resp = app
1147 .oneshot(auth_req("GET", "/v1/formations", None))
1148 .await
1149 .expect("router response");
1150 assert_eq!(resp.status(), StatusCode::OK);
1151 let bytes = resp.into_body().collect().await.unwrap().to_bytes();
1152 let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
1153 assert!(
1154 parsed["cursor"].is_u64(),
1155 "cursor must be unsigned integer; got {}",
1156 parsed["cursor"]
1157 );
1158 let formations = parsed["formations"].as_array().expect("formations array");
1159 assert_eq!(formations.len(), 1, "expected 1 formation after POST");
1160 assert_eq!(formations[0]["name"], "with-cursor");
1161 }
1162
1163 #[tokio::test]
1164 async fn missing_bearer_returns_401() {
1165 let app = router(test_state());
1166 let resp = app
1167 .oneshot(
1168 Request::builder()
1169 .method("GET")
1170 .uri("/v1/formations")
1171 .body(Body::empty())
1172 .unwrap(),
1173 )
1174 .await
1175 .expect("router response");
1176 assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
1177 }
1178
1179 #[tokio::test]
1192 async fn rejects_duplicate_member_ids_with_dedicated_type() {
1193 let app = router(test_state());
1194 let body = serde_json::json!({
1195 "name": "dup-ids",
1196 "coordinator": "coord",
1197 "members": [
1198 { "id": "coord" },
1199 { "id": "coord", "authorizedBy": "coord" }
1200 ]
1201 })
1202 .to_string();
1203 let resp = app
1204 .oneshot(auth_req("POST", "/v1/formations", Some(&body)))
1205 .await
1206 .expect("router response");
1207 assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
1208 let bytes = resp.into_body().collect().await.unwrap().to_bytes();
1209 let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
1210 assert_eq!(
1211 parsed["type"], "/problems/formation/duplicate-member-id",
1212 "duplicate member ids must surface the dedicated discriminant"
1213 );
1214 let detail = parsed["detail"].as_str().unwrap_or("");
1217 assert!(
1218 detail.contains("coord"),
1219 "detail must name the duplicate id, got: {detail}",
1220 );
1221 }
1222
1223 #[tokio::test]
1226 async fn rejects_self_authorized_cycle() {
1227 let app = router(test_state());
1228 let body = serde_json::json!({
1229 "name": "self-cycle",
1230 "coordinator": "coord",
1231 "members": [
1232 { "id": "coord" },
1233 { "id": "worker-a", "authorizedBy": "worker-a" }
1234 ]
1235 })
1236 .to_string();
1237 let resp = app
1238 .oneshot(auth_req("POST", "/v1/formations", Some(&body)))
1239 .await
1240 .expect("router response");
1241 assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
1242 let bytes = resp.into_body().collect().await.unwrap().to_bytes();
1243 let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
1244 assert_eq!(parsed["type"], "/problems/formation/cycle");
1245 }
1246
1247 #[tokio::test]
1249 async fn rejects_two_node_cycle() {
1250 let app = router(test_state());
1251 let body = serde_json::json!({
1252 "name": "two-cycle",
1253 "coordinator": "coord",
1254 "members": [
1255 { "id": "coord" },
1256 { "id": "a", "authorizedBy": "b" },
1257 { "id": "b", "authorizedBy": "a" }
1258 ]
1259 })
1260 .to_string();
1261 let resp = app
1262 .oneshot(auth_req("POST", "/v1/formations", Some(&body)))
1263 .await
1264 .expect("router response");
1265 assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
1266 let bytes = resp.into_body().collect().await.unwrap().to_bytes();
1267 let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
1268 assert_eq!(parsed["type"], "/problems/formation/cycle");
1269 }
1270
1271 #[tokio::test]
1275 async fn rejects_orphan_parent_reference() {
1276 let app = router(test_state());
1277 let body = serde_json::json!({
1278 "name": "orphan-parent",
1279 "coordinator": "coord",
1280 "members": [
1281 { "id": "coord" },
1282 { "id": "worker-a", "authorizedBy": "ghost" }
1283 ]
1284 })
1285 .to_string();
1286 let resp = app
1287 .oneshot(auth_req("POST", "/v1/formations", Some(&body)))
1288 .await
1289 .expect("router response");
1290 assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
1291 let bytes = resp.into_body().collect().await.unwrap().to_bytes();
1292 let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
1293 assert_eq!(
1294 parsed["type"],
1295 "/problems/formation/authority-not-narrowing"
1296 );
1297 }
1298
1299 #[tokio::test]
1303 async fn post_formation_oversized_body_returns_413() {
1304 let app = router(test_state());
1305 let big = "x".repeat(70 * 1024);
1310 let body =
1311 format!(r#"{{"name":"{big}","coordinator":"coord","members":[{{"id":"coord"}}]}}"#,);
1312 let resp = app
1313 .oneshot(auth_req("POST", "/v1/formations", Some(&body)))
1314 .await
1315 .expect("router response");
1316 assert_eq!(
1317 resp.status(),
1318 StatusCode::PAYLOAD_TOO_LARGE,
1319 "oversized body must surface 413; got {:?}",
1320 resp.status(),
1321 );
1322 }
1323
1324 #[tokio::test]
1336 async fn get_formation_by_id_captures_path() {
1337 let state = test_state();
1338 let body = serde_json::json!({
1339 "name": "probe",
1340 "coordinator": "coord",
1341 "members": [
1342 { "id": "coord" },
1343 { "id": "worker-a", "authorizedBy": "coord" }
1344 ]
1345 })
1346 .to_string();
1347 let resp = router(state.clone())
1348 .oneshot(auth_req("POST", "/v1/formations", Some(&body)))
1349 .await
1350 .expect("router response");
1351 assert_eq!(resp.status(), StatusCode::CREATED);
1352 let bytes = resp.into_body().collect().await.unwrap().to_bytes();
1353 let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
1354 let id = parsed["id"].as_str().expect("uuid string");
1355
1356 let resp = router(state)
1357 .oneshot(auth_req("GET", &format!("/v1/formations/{id}"), None))
1358 .await
1359 .expect("router response");
1360 assert_eq!(
1361 resp.status(),
1362 StatusCode::OK,
1363 "GET /v1/formations/<id> must capture the path segment; got {:?}",
1364 resp.status(),
1365 );
1366 let bytes = resp.into_body().collect().await.unwrap().to_bytes();
1367 let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
1368 assert_eq!(parsed["name"], "probe");
1369 }
1370
1371 #[tokio::test]
1376 async fn post_kubectl_style_formation_returns_201() {
1377 let app = router(test_state());
1378 let body = serde_json::json!({
1379 "apiVersion": "cellos.dev/v1",
1380 "kind": "Formation",
1381 "metadata": { "name": "kubectl-demo" },
1382 "spec": {
1383 "coordinator": "coord",
1384 "members": [
1385 { "name": "coord" },
1386 { "name": "worker-a", "authorizedBy": "coord" },
1387 { "name": "worker-b", "authorizedBy": "coord" }
1388 ]
1389 }
1390 })
1391 .to_string();
1392
1393 let resp = app
1394 .oneshot(auth_req("POST", "/v1/formations", Some(&body)))
1395 .await
1396 .expect("router response");
1397 assert_eq!(resp.status(), StatusCode::CREATED);
1398
1399 let bytes = resp.into_body().collect().await.unwrap().to_bytes();
1400 let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
1401 assert_eq!(parsed["status"], "PENDING");
1402 assert_eq!(parsed["name"], "kubectl-demo");
1403 assert!(parsed["id"].as_str().is_some());
1404 }
1405
1406 #[tokio::test]
1411 async fn post_kubectl_style_missing_coordinator_returns_no_coordinator() {
1412 let app = router(test_state());
1413 let body = serde_json::json!({
1414 "apiVersion": "cellos.dev/v1",
1415 "kind": "Formation",
1416 "metadata": { "name": "missing-coord" },
1417 "spec": {
1418 "coordinator": "coord",
1419 "members": [
1420 { "name": "worker-a", "authorizedBy": "coord" }
1421 ]
1422 }
1423 })
1424 .to_string();
1425
1426 let resp = app
1427 .oneshot(auth_req("POST", "/v1/formations", Some(&body)))
1428 .await
1429 .expect("router response");
1430 assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
1431 let bytes = resp.into_body().collect().await.unwrap().to_bytes();
1432 let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
1433 assert_eq!(parsed["type"], "/problems/formation/no-coordinator");
1434 }
1435
1436 #[tokio::test]
1440 async fn post_hybrid_formation_returns_400_bad_request() {
1441 let app = router(test_state());
1442 let body = serde_json::json!({
1443 "apiVersion": "cellos.dev/v1",
1444 "kind": "Formation",
1445 "metadata": { "name": "hybrid" },
1446 "spec": {
1447 "coordinator": "coord",
1448 "members": [ { "name": "coord" } ]
1449 },
1450 "name": "hybrid",
1452 "members": [ { "id": "coord" } ]
1453 })
1454 .to_string();
1455
1456 let resp = app
1457 .oneshot(auth_req("POST", "/v1/formations", Some(&body)))
1458 .await
1459 .expect("router response");
1460 assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
1461 let bytes = resp.into_body().collect().await.unwrap().to_bytes();
1462 let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
1463 assert_eq!(
1464 parsed["type"], "/problems/bad-request",
1465 "hybrid shape must surface a generic bad-request, not an admission discriminant"
1466 );
1467 let detail = parsed["detail"].as_str().unwrap_or_default();
1468 assert!(
1469 detail.contains("hybrid"),
1470 "detail must mention 'hybrid'; got {detail:?}"
1471 );
1472 }
1473
1474 #[tokio::test]
1477 async fn post_kubectl_style_wrong_api_version_returns_400() {
1478 let app = router(test_state());
1479 let body = serde_json::json!({
1480 "apiVersion": "cellos.dev/v2",
1481 "kind": "Formation",
1482 "metadata": { "name": "wrong-api" },
1483 "spec": {
1484 "coordinator": "coord",
1485 "members": [ { "name": "coord" } ]
1486 }
1487 })
1488 .to_string();
1489
1490 let resp = app
1491 .oneshot(auth_req("POST", "/v1/formations", Some(&body)))
1492 .await
1493 .expect("router response");
1494 assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
1495 let bytes = resp.into_body().collect().await.unwrap().to_bytes();
1496 let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
1497 assert_eq!(parsed["type"], "/problems/bad-request");
1498 let detail = parsed["detail"].as_str().unwrap_or_default();
1499 assert!(
1500 detail.contains("apiVersion") && detail.contains("cellos.dev/v2"),
1501 "detail must name the bad apiVersion; got {detail:?}"
1502 );
1503 }
1504
1505 #[tokio::test]
1507 async fn post_kubectl_style_wrong_kind_returns_400() {
1508 let app = router(test_state());
1509 let body = serde_json::json!({
1510 "apiVersion": "cellos.dev/v1",
1511 "kind": "Cell",
1512 "metadata": { "name": "wrong-kind" },
1513 "spec": {
1514 "coordinator": "coord",
1515 "members": [ { "name": "coord" } ]
1516 }
1517 })
1518 .to_string();
1519
1520 let resp = app
1521 .oneshot(auth_req("POST", "/v1/formations", Some(&body)))
1522 .await
1523 .expect("router response");
1524 assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
1525 let bytes = resp.into_body().collect().await.unwrap().to_bytes();
1526 let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
1527 assert_eq!(parsed["type"], "/problems/bad-request");
1528 let detail = parsed["detail"].as_str().unwrap_or_default();
1529 assert!(
1530 detail.contains("kind") && detail.contains("Cell"),
1531 "detail must name the bad kind; got {detail:?}"
1532 );
1533 }
1534
1535 #[tokio::test]
1539 async fn kubectl_style_post_then_get_returns_normalized_flat_document() {
1540 let state = test_state();
1541 let body = serde_json::json!({
1542 "apiVersion": "cellos.dev/v1",
1543 "kind": "Formation",
1544 "metadata": { "name": "roundtrip" },
1545 "spec": {
1546 "coordinator": "coord",
1547 "members": [
1548 { "name": "coord" },
1549 { "name": "worker-a", "authorizedBy": "coord" }
1550 ]
1551 }
1552 })
1553 .to_string();
1554
1555 let resp = router(state.clone())
1556 .oneshot(auth_req("POST", "/v1/formations", Some(&body)))
1557 .await
1558 .expect("router response");
1559 assert_eq!(resp.status(), StatusCode::CREATED);
1560 let bytes = resp.into_body().collect().await.unwrap().to_bytes();
1561 let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
1562 let id = parsed["id"].as_str().expect("uuid string");
1563
1564 let resp = router(state)
1565 .oneshot(auth_req("GET", &format!("/v1/formations/{id}"), None))
1566 .await
1567 .expect("router response");
1568 assert_eq!(resp.status(), StatusCode::OK);
1569 let bytes = resp.into_body().collect().await.unwrap().to_bytes();
1570 let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
1571 let doc = &parsed["document"];
1572 assert_eq!(doc["name"], "roundtrip", "flat 'name' present");
1573 assert_eq!(doc["coordinator"], "coord", "flat 'coordinator' present");
1574 let members = doc["members"]
1575 .as_array()
1576 .expect("members array on normalized doc");
1577 assert_eq!(members.len(), 2);
1578 assert_eq!(members[0]["id"], "coord");
1579 assert_eq!(members[1]["id"], "worker-a");
1580 assert_eq!(members[1]["authorizedBy"], "coord");
1581 assert!(
1583 doc.get("apiVersion").is_none(),
1584 "kubectl envelope must not leak into normalized doc"
1585 );
1586 assert!(doc.get("kind").is_none());
1587 assert!(doc.get("metadata").is_none());
1588 assert!(doc.get("spec").is_none());
1589 }
1590
1591 #[tokio::test]
1598 async fn kubectl_member_with_explicit_id_returns_400() {
1599 let app = router(test_state());
1600 let body = serde_json::json!({
1601 "apiVersion": "cellos.dev/v1",
1602 "kind": "Formation",
1603 "metadata": { "name": "rt3-ctl-003-a" },
1604 "spec": {
1605 "coordinator": "alice",
1606 "members": [
1607 { "name": "alice", "id": "bob" },
1612 { "name": "worker-a", "authorizedBy": "alice" }
1613 ]
1614 }
1615 })
1616 .to_string();
1617
1618 let resp = app
1619 .oneshot(auth_req("POST", "/v1/formations", Some(&body)))
1620 .await
1621 .expect("router response");
1622 assert_eq!(
1623 resp.status(),
1624 StatusCode::BAD_REQUEST,
1625 "manifest with both name+id at member level must be rejected"
1626 );
1627 let ct = resp
1628 .headers()
1629 .get(header::CONTENT_TYPE)
1630 .and_then(|v| v.to_str().ok())
1631 .unwrap_or_default()
1632 .to_owned();
1633 assert!(
1634 ct.starts_with("application/problem+json"),
1635 "expected RFC 9457 media type, got {ct:?}"
1636 );
1637 let bytes = resp.into_body().collect().await.unwrap().to_bytes();
1638 let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
1639 assert_eq!(
1640 parsed["type"], "/problems/bad-request",
1641 "kubectl id-conflict is a generic bad-request, not an ADR-0010 discriminant"
1642 );
1643 let detail = parsed["detail"].as_str().unwrap_or_default();
1644 assert!(
1645 detail.contains("'name'") && detail.contains("'id'"),
1646 "detail must name both conflicting fields; got {detail:?}"
1647 );
1648 }
1649
1650 #[tokio::test]
1657 async fn delete_by_name_with_duplicates_returns_409() {
1658 let state = test_state();
1659
1660 let id_a = Uuid::new_v4();
1665 let id_b = Uuid::new_v4();
1666 {
1667 let mut map = state.formations.write().await;
1668 for id in [id_a, id_b] {
1669 map.insert(
1670 id,
1671 FormationRecord {
1672 id,
1673 name: "rt3-dup".to_string(),
1674 status: FormationStatus::Pending,
1675 document: serde_json::json!({"name": "rt3-dup"}),
1676 },
1677 );
1678 }
1679 }
1680
1681 let resp = router(state.clone())
1682 .oneshot(auth_req("DELETE", "/v1/formations/by-name/rt3-dup", None))
1683 .await
1684 .expect("router response");
1685 assert_eq!(
1686 resp.status(),
1687 StatusCode::CONFLICT,
1688 "duplicate-name DELETE must surface 409, not silently delete"
1689 );
1690
1691 let ct = resp
1692 .headers()
1693 .get(header::CONTENT_TYPE)
1694 .and_then(|v| v.to_str().ok())
1695 .unwrap_or_default()
1696 .to_owned();
1697 assert!(
1698 ct.starts_with("application/problem+json"),
1699 "expected RFC 9457 media type, got {ct:?}"
1700 );
1701 let bytes = resp.into_body().collect().await.unwrap().to_bytes();
1702 let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
1703 assert_eq!(parsed["type"], "/problems/conflict");
1704 let detail = parsed["detail"].as_str().unwrap_or_default();
1705 assert!(
1706 detail.contains(&id_a.to_string()) && detail.contains(&id_b.to_string()),
1707 "detail must list BOTH conflicting UUIDs so the operator can disambiguate; \
1708 got {detail:?}"
1709 );
1710 assert!(
1711 detail.contains("rt3-dup"),
1712 "detail must name the conflicting formation name; got {detail:?}"
1713 );
1714
1715 let map = state.formations.read().await;
1717 assert!(map.contains_key(&id_a), "id_a must still exist after 409");
1718 assert!(map.contains_key(&id_b), "id_b must still exist after 409");
1719 assert_eq!(map.get(&id_a).unwrap().status, FormationStatus::Pending);
1720 assert_eq!(map.get(&id_b).unwrap().status, FormationStatus::Pending);
1721 }
1722
1723 #[tokio::test]
1728 async fn unknown_state_returns_bad_request_problem_type() {
1729 let state = test_state();
1730 let body = serde_json::json!({
1731 "name": "demo",
1732 "coordinator": "coord",
1733 "members": [
1734 { "id": "coord" },
1735 { "id": "worker-a", "authorizedBy": "coord" }
1736 ]
1737 })
1738 .to_string();
1739 let resp = router(state.clone())
1740 .oneshot(auth_req("POST", "/v1/formations", Some(&body)))
1741 .await
1742 .expect("router response");
1743 assert_eq!(resp.status(), StatusCode::CREATED);
1744 let bytes = resp.into_body().collect().await.unwrap().to_bytes();
1745 let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
1746 let id = parsed["id"].as_str().expect("uuid string").to_owned();
1747
1748 let bad = serde_json::json!({ "state": "TELEPORTING" }).to_string();
1749 let resp = router(state)
1750 .oneshot(auth_req(
1751 "POST",
1752 &format!("/v1/formations/{id}/status"),
1753 Some(&bad),
1754 ))
1755 .await
1756 .expect("router response");
1757 assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
1758 let bytes = resp.into_body().collect().await.unwrap().to_bytes();
1759 let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
1760 assert_eq!(
1761 parsed["type"], "/problems/bad-request",
1762 "unknown state must surface generic bad-request, not an ADR-0010 discriminant"
1763 );
1764 }
1765
1766 fn minimal_body(name: &str) -> String {
1779 serde_json::json!({
1780 "name": name,
1781 "coordinator": "coord",
1782 "members": [
1783 { "id": "coord" },
1784 { "id": "worker-a", "authorizedBy": "coord" }
1785 ]
1786 })
1787 .to_string()
1788 }
1789
1790 async fn assert_name_rejected_bad_request(name: &str, expect_in_detail: &str) {
1795 let app = router(test_state());
1796 let body = minimal_body(name);
1797 let resp = app
1798 .oneshot(auth_req("POST", "/v1/formations", Some(&body)))
1799 .await
1800 .expect("router response");
1801 assert_eq!(
1802 resp.status(),
1803 StatusCode::BAD_REQUEST,
1804 "name {name:?} must be rejected with 400; got {:?}",
1805 resp.status()
1806 );
1807 let ct = resp
1808 .headers()
1809 .get(header::CONTENT_TYPE)
1810 .and_then(|v| v.to_str().ok())
1811 .unwrap_or_default()
1812 .to_owned();
1813 assert!(
1814 ct.starts_with("application/problem+json"),
1815 "name {name:?} must surface RFC 9457 media type; got {ct:?}"
1816 );
1817 let bytes = resp.into_body().collect().await.unwrap().to_bytes();
1818 let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
1819 assert_eq!(
1820 parsed["type"], "/problems/bad-request",
1821 "name {name:?} must surface generic bad-request, not an admission discriminant; got {parsed}"
1822 );
1823 let detail = parsed["detail"].as_str().unwrap_or_default();
1824 assert!(
1825 detail.contains(expect_in_detail),
1826 "detail for {name:?} must contain {expect_in_detail:?}; got {detail:?}"
1827 );
1828 }
1829
1830 #[tokio::test]
1832 async fn rejects_empty_name() {
1833 assert_name_rejected_bad_request("", "empty").await;
1834 }
1835
1836 #[tokio::test]
1839 async fn rejects_whitespace_only_name() {
1840 assert_name_rejected_bad_request(" ", "disallowed character").await;
1841 }
1842
1843 #[tokio::test]
1846 async fn rejects_newline_in_name() {
1847 assert_name_rejected_bad_request("a\nb", "disallowed character").await;
1848 }
1849
1850 #[tokio::test]
1853 async fn rejects_tab_in_name() {
1854 assert_name_rejected_bad_request("a\tb", "disallowed character").await;
1855 }
1856
1857 #[tokio::test]
1860 async fn rejects_nul_byte_in_name() {
1861 assert_name_rejected_bad_request("a\0b", "disallowed character").await;
1862 }
1863
1864 #[tokio::test]
1868 async fn rejects_non_ascii_in_name() {
1869 assert_name_rejected_bad_request("café", "disallowed character").await;
1870 }
1871
1872 #[tokio::test]
1875 async fn rejects_overlong_name() {
1876 let long = "a".repeat(254);
1877 assert_name_rejected_bad_request(&long, "exceeds maximum").await;
1878 }
1879
1880 #[tokio::test]
1883 async fn rejects_leading_hyphen() {
1884 assert_name_rejected_bad_request("-leading", "start with").await;
1885 }
1886
1887 #[tokio::test]
1890 async fn rejects_trailing_dot() {
1891 assert_name_rejected_bad_request("trailing.", "end with").await;
1892 }
1893
1894 #[tokio::test]
1898 async fn rejects_single_dot_reserved_name() {
1899 assert_name_rejected_bad_request(".", "reserved").await;
1900 }
1901
1902 #[tokio::test]
1906 async fn rejects_double_dot_reserved_name() {
1907 let app = router(test_state());
1908 let body = minimal_body("..");
1909 let resp = app
1910 .oneshot(auth_req("POST", "/v1/formations", Some(&body)))
1911 .await
1912 .expect("router response");
1913 assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
1914 let bytes = resp.into_body().collect().await.unwrap().to_bytes();
1915 let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
1916 assert_eq!(parsed["type"], "/problems/bad-request");
1917 }
1918
1919 #[tokio::test]
1921 async fn accepts_simple_lowercase_name() {
1922 let app = router(test_state());
1923 let body = minimal_body("demo");
1924 let resp = app
1925 .oneshot(auth_req("POST", "/v1/formations", Some(&body)))
1926 .await
1927 .expect("router response");
1928 assert_eq!(resp.status(), StatusCode::CREATED);
1929 }
1930
1931 #[tokio::test]
1934 async fn accepts_hyphenated_name() {
1935 let app = router(test_state());
1936 let body = minimal_body("my-formation-v2");
1937 let resp = app
1938 .oneshot(auth_req("POST", "/v1/formations", Some(&body)))
1939 .await
1940 .expect("router response");
1941 assert_eq!(resp.status(), StatusCode::CREATED);
1942 }
1943
1944 #[tokio::test]
1947 async fn accepts_dotted_and_underscored_name() {
1948 let app = router(test_state());
1949 let body = minimal_body("team.alpha_one");
1950 let resp = app
1951 .oneshot(auth_req("POST", "/v1/formations", Some(&body)))
1952 .await
1953 .expect("router response");
1954 assert_eq!(resp.status(), StatusCode::CREATED);
1955 }
1956
1957 #[tokio::test]
1970 async fn duplicate_name_returns_409() {
1971 let state = test_state();
1972 let body = minimal_body("demo");
1973
1974 let resp = router(state.clone())
1976 .oneshot(auth_req("POST", "/v1/formations", Some(&body)))
1977 .await
1978 .expect("router response");
1979 assert_eq!(
1980 resp.status(),
1981 StatusCode::CREATED,
1982 "first POST must succeed"
1983 );
1984 let bytes = resp.into_body().collect().await.unwrap().to_bytes();
1985 let first: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
1986 let first_id = first["id"]
1987 .as_str()
1988 .expect("first POST returned uuid")
1989 .to_owned();
1990
1991 let resp = router(state.clone())
1993 .oneshot(auth_req("POST", "/v1/formations", Some(&body)))
1994 .await
1995 .expect("router response");
1996 assert_eq!(
1997 resp.status(),
1998 StatusCode::CONFLICT,
1999 "duplicate name must surface 409; got {:?}",
2000 resp.status()
2001 );
2002 let ct = resp
2003 .headers()
2004 .get(header::CONTENT_TYPE)
2005 .and_then(|v| v.to_str().ok())
2006 .unwrap_or_default()
2007 .to_owned();
2008 assert!(
2009 ct.starts_with("application/problem+json"),
2010 "409 must use RFC 9457 media type; got {ct:?}"
2011 );
2012 let bytes = resp.into_body().collect().await.unwrap().to_bytes();
2013 let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
2014 assert_eq!(parsed["type"], "/problems/conflict");
2015 let detail = parsed["detail"].as_str().unwrap_or_default();
2016 assert!(
2017 detail.contains(&first_id),
2018 "conflict detail must name the existing UUID {first_id}; got {detail:?}"
2019 );
2020 assert!(
2021 detail.contains("demo"),
2022 "conflict detail must name the conflicting name; got {detail:?}"
2023 );
2024
2025 let resp = router(state)
2028 .oneshot(auth_req("GET", "/v1/formations/by-name/demo", None))
2029 .await
2030 .expect("router response");
2031 assert_eq!(resp.status(), StatusCode::OK);
2032 let bytes = resp.into_body().collect().await.unwrap().to_bytes();
2033 let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
2034 assert_eq!(
2035 parsed["id"].as_str(),
2036 Some(first_id.as_str()),
2037 "first formation must remain addressable by name"
2038 );
2039 }
2040
2041 #[tokio::test]
2047 async fn kubectl_metadata_labels_and_annotations_round_trip() {
2048 let state = test_state();
2049 let body = serde_json::json!({
2050 "apiVersion": "cellos.dev/v1",
2051 "kind": "Formation",
2052 "metadata": {
2053 "name": "labelled",
2054 "labels": {
2055 "app": "celltest",
2056 "tier": "frontend"
2057 },
2058 "annotations": {
2059 "operator": "ryan@cellos.dev",
2060 "audit/ticket": "OPS-1234"
2061 }
2062 },
2063 "spec": {
2064 "coordinator": "coord",
2065 "members": [
2066 { "name": "coord" },
2067 { "name": "worker-a", "authorizedBy": "coord" }
2068 ]
2069 }
2070 })
2071 .to_string();
2072 let resp = router(state.clone())
2073 .oneshot(auth_req("POST", "/v1/formations", Some(&body)))
2074 .await
2075 .expect("router response");
2076 assert_eq!(resp.status(), StatusCode::CREATED);
2077
2078 let resp = router(state)
2079 .oneshot(auth_req("GET", "/v1/formations/by-name/labelled", None))
2080 .await
2081 .expect("router response");
2082 assert_eq!(resp.status(), StatusCode::OK);
2083 let bytes = resp.into_body().collect().await.unwrap().to_bytes();
2084 let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
2085
2086 let labels = &parsed["document"]["metadata"]["labels"];
2087 assert_eq!(labels["app"], "celltest", "label 'app' lost in round-trip");
2088 assert_eq!(
2089 labels["tier"], "frontend",
2090 "label 'tier' lost in round-trip"
2091 );
2092
2093 let annotations = &parsed["document"]["metadata"]["annotations"];
2094 assert_eq!(
2095 annotations["operator"], "ryan@cellos.dev",
2096 "annotation 'operator' lost in round-trip"
2097 );
2098 assert_eq!(
2099 annotations["audit/ticket"], "OPS-1234",
2100 "annotation 'audit/ticket' lost in round-trip"
2101 );
2102 }
2103
2104 #[tokio::test]
2110 async fn kubectl_unknown_top_level_field_returns_400() {
2111 let app = router(test_state());
2112 let body = serde_json::json!({
2113 "apiVersion": "cellos.dev/v1",
2114 "kind": "Formation",
2115 "metadata": { "name": "with-status" },
2116 "spec": {
2117 "coordinator": "coord",
2118 "members": [{ "name": "coord" }]
2119 },
2120 "status": {
2121 "phase": "Running",
2122 "observedGeneration": 42
2123 }
2124 })
2125 .to_string();
2126 let resp = app
2127 .oneshot(auth_req("POST", "/v1/formations", Some(&body)))
2128 .await
2129 .expect("router response");
2130 assert_eq!(
2131 resp.status(),
2132 StatusCode::BAD_REQUEST,
2133 "unknown top-level 'status' must surface 400; got {:?}",
2134 resp.status(),
2135 );
2136 let bytes = resp.into_body().collect().await.unwrap().to_bytes();
2137 let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
2138 assert_eq!(parsed["type"], "/problems/bad-request");
2139 let detail = parsed["detail"].as_str().unwrap_or_default();
2140 assert!(
2141 detail.contains("status"),
2142 "detail must name the offending field 'status'; got: {detail}",
2143 );
2144 assert!(
2145 detail.contains("unknown top-level"),
2146 "detail must say 'unknown top-level' so the operator knows the failure class; got: {detail}",
2147 );
2148 }
2149
2150 #[tokio::test]
2155 async fn kubectl_non_object_labels_returns_400() {
2156 let app = router(test_state());
2157 let body = serde_json::json!({
2158 "apiVersion": "cellos.dev/v1",
2159 "kind": "Formation",
2160 "metadata": {
2161 "name": "bad-labels",
2162 "labels": "app=celltest"
2163 },
2164 "spec": {
2165 "coordinator": "coord",
2166 "members": [{ "name": "coord" }]
2167 }
2168 })
2169 .to_string();
2170 let resp = app
2171 .oneshot(auth_req("POST", "/v1/formations", Some(&body)))
2172 .await
2173 .expect("router response");
2174 assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
2175 let bytes = resp.into_body().collect().await.unwrap().to_bytes();
2176 let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
2177 let detail = parsed["detail"].as_str().unwrap_or_default();
2178 assert!(
2179 detail.contains("metadata.labels"),
2180 "detail must name 'metadata.labels'; got: {detail}",
2181 );
2182 }
2183
/// Source-level regression guard for EVT-CONTENT-001-C.
///
/// Reads this module's own source text, isolates the text of
/// `update_formation_status`, and asserts that the `now_rfc3339` timestamp
/// capture appears textually before the `state.formations.write().await`
/// lock acquisition.
///
/// # Panics
///
/// Panics (failing the test) if the function, the timestamp capture, or the
/// write-lock acquisition cannot be found, or if the capture comes after the
/// lock.
#[test]
fn update_formation_status_captures_time_before_lock() {
    let src = include_str!("formations.rs");
    let fn_start = src
        .find("pub async fn update_formation_status(")
        .expect("update_formation_status not found");
    let after_fn = &src[fn_start..];

    // The function ends where the NEAREST following column-0 `fn` header
    // begins. Take the minimum offset across every header spelling: picking
    // the first spelling that matches anywhere could select a far-away
    // header and let the scanned "body" swallow unrelated functions.
    // Skipping the first byte keeps the search strictly after the start of
    // this function's own header; the `+ 1` converts back to an `after_fn`
    // offset.
    let rest = &after_fn[1..];
    let next_top_level_fn = ["\nfn ", "\nasync fn ", "\npub fn ", "\npub async fn "]
        .iter()
        .filter_map(|&marker| rest.find(marker))
        .min()
        // No later header: the function runs to the end of the file.
        .map_or(after_fn.len(), |offset| offset + 1);
    let body = &after_fn[..next_top_level_fn];

    let time_capture_pos = body
        .find("let now_rfc3339 = chrono::Utc::now().to_rfc3339_opts(")
        .expect(
            "EVT-CONTENT-001-C regression: `let now_rfc3339 = chrono::Utc::now()` not found \
             in update_formation_status",
        );
    let lock_pos = body.find("state.formations.write().await").expect(
        "update_formation_status no longer acquires a write lock — test must be updated",
    );

    assert!(
        time_capture_pos < lock_pos,
        "EVT-CONTENT-001-C regression: `now_rfc3339` was captured at byte offset \
         {time_capture_pos} but the write lock was taken at offset {lock_pos}. \
         Move the `chrono::Utc::now().to_rfc3339_opts(...)` call ABOVE the \
         `state.formations.write().await` line so CloudEvent `time` reflects \
         transition arrival rather than post-commit wall-clock.",
    );
}
2238}