1use axum::extract::{Path, State};
10use axum::http::{HeaderMap, StatusCode};
11use axum::response::IntoResponse;
12use axum::Json;
13use cellos_core::events::{
14 cloud_event_v1_formation_completed, cloud_event_v1_formation_created,
15 cloud_event_v1_formation_degraded, cloud_event_v1_formation_failed,
16 cloud_event_v1_formation_launching, cloud_event_v1_formation_running,
17};
18use serde::{Deserialize, Serialize};
19use uuid::Uuid;
20
21use crate::auth::require_bearer;
22use crate::error::{AppError, AppErrorKind};
23use crate::state::{AppState, FormationRecord, FormationStatus};
24
25#[derive(Debug, Deserialize)]
29pub struct FormationDocument {
30 pub name: String,
31 pub coordinator: String,
32 pub members: Vec<FormationMember>,
33}
34
35#[derive(Debug, Deserialize)]
36pub struct FormationMember {
37 pub id: String,
38 #[serde(rename = "authorizedBy")]
41 pub authorized_by: Option<String>,
42}
43
44#[derive(Debug, Serialize)]
45pub struct FormationCreated {
46 pub id: Uuid,
47 pub name: String,
48 pub status: FormationStatus,
49}
50
51pub async fn create_formation(
54 State(state): State<AppState>,
55 headers: HeaderMap,
56 body: axum::body::Bytes,
57) -> Result<impl IntoResponse, AppError> {
58 require_bearer(&headers, &state.api_token)?;
59
60 let raw: serde_json::Value = serde_json::from_slice(&body)?;
65 let doc: FormationDocument = serde_json::from_value(raw.clone())?;
66
67 validate_formation(&doc)?;
68
69 let id = Uuid::new_v4();
70 let record = FormationRecord {
71 id,
72 name: doc.name.clone(),
73 status: FormationStatus::Pending,
74 document: raw,
75 };
76
77 state.formations.write().await.insert(id, record);
78
79 let cell_count = doc.members.len() as u32;
81 let no_failed: &[String] = &[];
82 let event = cloud_event_v1_formation_created(
83 &id.to_string(),
84 id.to_string().as_str(),
85 &id.to_string(),
86 &doc.name,
87 cell_count,
88 no_failed,
89 None,
90 );
91 let subject = format!("cellos.events.formations.{id}.created");
92 publish_event(&state, &subject, event).await;
93
94 let body = FormationCreated {
95 id,
96 name: doc.name,
97 status: FormationStatus::Pending,
98 };
99 Ok((StatusCode::CREATED, Json(body)))
100}
101
102#[derive(Debug, Serialize)]
108pub struct FormationsSnapshot {
109 pub formations: Vec<FormationRecord>,
110 pub cursor: u64,
111}
112
113pub async fn list_formations(
116 State(state): State<AppState>,
117 headers: HeaderMap,
118) -> Result<Json<FormationsSnapshot>, AppError> {
119 require_bearer(&headers, &state.api_token)?;
120 let map = state.formations.read().await;
121 Ok(Json(FormationsSnapshot {
122 formations: map.values().cloned().collect(),
123 cursor: state.cursor(),
124 }))
125}
126
127pub async fn get_formation(
129 State(state): State<AppState>,
130 headers: HeaderMap,
131 Path(id): Path<Uuid>,
132) -> Result<Json<FormationRecord>, AppError> {
133 require_bearer(&headers, &state.api_token)?;
134 let map = state.formations.read().await;
135 map.get(&id)
136 .cloned()
137 .map(Json)
138 .ok_or_else(|| AppError::not_found(format!("formation {id} not found")))
139}
140
141pub async fn delete_formation(
146 State(state): State<AppState>,
147 headers: HeaderMap,
148 Path(id): Path<Uuid>,
149) -> Result<StatusCode, AppError> {
150 require_bearer(&headers, &state.api_token)?;
151 let mut map = state.formations.write().await;
152 let (name, cell_count) = {
153 let entry = map
154 .get_mut(&id)
155 .ok_or_else(|| AppError::not_found(format!("formation {id} not found")))?;
156 entry.status = FormationStatus::Cancelled;
157 let members = entry
158 .document
159 .get("members")
160 .and_then(|m| m.as_array())
161 .map(|a| a.len() as u32)
162 .unwrap_or(0);
163 (entry.name.clone(), members)
164 };
165 drop(map);
166
167 let no_failed: &[String] = &[];
168 let event = cloud_event_v1_formation_failed(
169 &id.to_string(),
170 id.to_string().as_str(),
171 &id.to_string(),
172 &name,
173 cell_count,
174 no_failed,
175 Some("deleted by operator"),
176 );
177 let subject = format!("cellos.events.formations.{id}.failed");
178 publish_event(&state, &subject, event).await;
179
180 Ok(StatusCode::NO_CONTENT)
181}
182
183#[derive(Debug, Deserialize)]
188pub struct StatusTransition {
189 pub state: String,
190 pub reason: Option<String>,
191 pub failed_cells: Option<Vec<String>>,
192}
193
194pub async fn update_formation_status(
195 State(state): State<AppState>,
196 headers: HeaderMap,
197 Path(id): Path<Uuid>,
198 Json(body): Json<StatusTransition>,
199) -> Result<StatusCode, AppError> {
200 require_bearer(&headers, &state.api_token)?;
201
202 let (new_status, name, cell_count, failed) = {
203 let mut map = state.formations.write().await;
204 let entry = map
205 .get_mut(&id)
206 .ok_or_else(|| AppError::not_found(format!("formation {id} not found")))?;
207
208 let new_status = match body.state.to_uppercase().as_str() {
209 "RUNNING" | "LAUNCHING" => FormationStatus::Running,
210 "DEGRADED" => FormationStatus::Running, "COMPLETED" => FormationStatus::Succeeded,
212 "FAILED" => FormationStatus::Failed,
213 other => {
214 return Err(AppError::new(
220 AppErrorKind::BadRequest,
221 format!("unknown state: {other}"),
222 ));
223 }
224 };
225 entry.status = new_status;
226
227 let members = entry
228 .document
229 .get("members")
230 .and_then(|m| m.as_array())
231 .map(|a| a.len() as u32)
232 .unwrap_or(0);
233 let failed = body.failed_cells.unwrap_or_default();
234 (new_status, entry.name.clone(), members, failed)
235 };
236
237 let sid = id.to_string();
238 let reason = body.reason.as_deref();
239 let empty: &[String] = &[];
240
241 let (event, phase) = match body.state.to_uppercase().as_str() {
242 "LAUNCHING" => (
243 cloud_event_v1_formation_launching(&sid, &sid, &sid, &name, cell_count, empty, reason),
244 "launching",
245 ),
246 "RUNNING" => (
247 cloud_event_v1_formation_running(&sid, &sid, &sid, &name, cell_count, empty, reason),
248 "running",
249 ),
250 "DEGRADED" => (
251 cloud_event_v1_formation_degraded(&sid, &sid, &sid, &name, cell_count, &failed, reason),
252 "degraded",
253 ),
254 "COMPLETED" => (
255 cloud_event_v1_formation_completed(&sid, &sid, &sid, &name, cell_count, empty, reason),
256 "completed",
257 ),
258 _ => (
259 cloud_event_v1_formation_failed(&sid, &sid, &sid, &name, cell_count, &failed, reason),
260 "failed",
261 ),
262 };
263
264 let subject = format!("cellos.events.formations.{id}.{phase}");
265 publish_event(&state, &subject, event).await;
266
267 let _ = new_status; Ok(StatusCode::NO_CONTENT)
269}
270
271async fn publish_event(state: &AppState, subject: &str, event: impl serde::Serialize) {
275 let Some(nats) = &state.nats else { return };
276 let payload = match serde_json::to_vec(&event) {
277 Ok(b) => b,
278 Err(e) => {
279 tracing::warn!(subject, error = %e, "failed to serialise formation CloudEvent");
280 return;
281 }
282 };
283 if let Err(e) = nats.publish(subject.to_owned(), payload.into()).await {
284 tracing::warn!(subject, error = %e, "failed to publish formation CloudEvent to NATS");
285 }
286}
287
288fn validate_formation(doc: &FormationDocument) -> Result<(), AppError> {
311 use std::collections::{HashMap, HashSet};
312
313 let coord_present = doc.members.iter().any(|m| m.id == doc.coordinator);
315 if !coord_present {
316 return Err(AppError::new(
317 AppErrorKind::FormationNoCoordinator,
318 format!(
319 "coordinator '{}' must appear in members list",
320 doc.coordinator
321 ),
322 ));
323 }
324
325 let mut seen: HashSet<&str> = HashSet::new();
330 for m in &doc.members {
331 if !seen.insert(m.id.as_str()) {
332 return Err(AppError::new(
333 AppErrorKind::FormationMultipleCoordinators,
334 format!("duplicate member id '{}'", m.id),
335 ));
336 }
337 }
338
339 for m in &doc.members {
344 let is_coord = m.id == doc.coordinator;
345 match (is_coord, &m.authorized_by) {
346 (true, Some(_)) => {
347 return Err(AppError::new(
348 AppErrorKind::FormationAuthorityNotNarrowing,
349 format!("coordinator '{}' must not declare authorizedBy", m.id),
350 ));
351 }
352 (false, None) => {
353 return Err(AppError::new(
354 AppErrorKind::FormationAuthorityNotNarrowing,
355 format!("non-coordinator member '{}' missing authorizedBy", m.id),
356 ));
357 }
358 (false, Some(parent)) => {
359 if !seen.contains(parent.as_str()) {
360 return Err(AppError::new(
361 AppErrorKind::FormationAuthorityNotNarrowing,
362 format!("member '{}' references unknown parent '{}'", m.id, parent),
363 ));
364 }
365 }
366 _ => {}
367 }
368 }
369
370 let parent: HashMap<&str, &str> = doc
375 .members
376 .iter()
377 .filter_map(|m| m.authorized_by.as_deref().map(|p| (m.id.as_str(), p)))
378 .collect();
379
380 for m in &doc.members {
381 if m.id == doc.coordinator {
382 continue;
383 }
384 let mut cursor = m.id.as_str();
385 for _ in 0..doc.members.len() {
386 let Some(&p) = parent.get(cursor) else {
387 break;
390 };
391 if p == m.id {
392 return Err(AppError::new(
393 AppErrorKind::FormationCycle,
394 format!("authorizedBy cycle detected involving member '{}'", m.id),
395 ));
396 }
397 cursor = p;
398 }
399 if parent.contains_key(cursor) {
400 return Err(AppError::new(
404 AppErrorKind::FormationCycle,
405 format!(
406 "authorizedBy cycle detected on chain starting at '{}'",
407 m.id
408 ),
409 ));
410 }
411 }
412
413 Ok(())
414}
415
416#[cfg(test)]
417mod tests {
418 use super::*;
419 use crate::router;
420 use axum::body::Body;
421 use axum::http::{header, Request};
422 use http_body_util::BodyExt;
423 use tower::ServiceExt;
424
425 const TOKEN: &str = "test-token";
426
427 fn test_state() -> AppState {
428 AppState::new(None, TOKEN)
429 }
430
431 fn auth_req(method: &str, uri: &str, body: Option<&str>) -> Request<Body> {
432 let mut b = Request::builder()
433 .method(method)
434 .uri(uri)
435 .header(header::AUTHORIZATION, format!("Bearer {TOKEN}"));
436 if body.is_some() {
437 b = b.header(header::CONTENT_TYPE, "application/json");
438 }
439 b.body(
440 body.map(|s| Body::from(s.to_owned()))
441 .unwrap_or_else(Body::empty),
442 )
443 .expect("build request")
444 }
445
446 #[tokio::test]
447 async fn post_valid_formation_returns_201() {
448 let app = router(test_state());
449 let body = serde_json::json!({
450 "name": "demo",
451 "coordinator": "coord",
452 "members": [
453 { "id": "coord" },
454 { "id": "worker-a", "authorizedBy": "coord" },
455 { "id": "worker-b", "authorizedBy": "coord" }
456 ]
457 })
458 .to_string();
459
460 let resp = app
461 .oneshot(auth_req("POST", "/v1/formations", Some(&body)))
462 .await
463 .expect("router response");
464 assert_eq!(resp.status(), StatusCode::CREATED);
465
466 let bytes = resp.into_body().collect().await.unwrap().to_bytes();
467 let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
468 assert_eq!(parsed["status"], "PENDING");
469 assert_eq!(parsed["name"], "demo");
470 assert!(parsed["id"].as_str().is_some());
471 }
472
473 #[tokio::test]
474 async fn post_formation_missing_coordinator_returns_400() {
475 let app = router(test_state());
476 let body = serde_json::json!({
478 "name": "demo",
479 "coordinator": "coord",
480 "members": [
481 { "id": "worker-a", "authorizedBy": "coord" }
482 ]
483 })
484 .to_string();
485
486 let resp = app
487 .oneshot(auth_req("POST", "/v1/formations", Some(&body)))
488 .await
489 .expect("router response");
490 assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
491 let ct = resp
492 .headers()
493 .get(header::CONTENT_TYPE)
494 .and_then(|v| v.to_str().ok())
495 .unwrap_or_default()
496 .to_owned();
497 assert!(
498 ct.starts_with("application/problem+json"),
499 "expected RFC 9457 media type, got {ct:?}"
500 );
501 let bytes = resp.into_body().collect().await.unwrap().to_bytes();
502 let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
503 assert_eq!(parsed["type"], "/problems/formation/no-coordinator");
504 }
505
506 #[tokio::test]
507 async fn post_formation_member_missing_authorized_by_returns_400() {
508 let app = router(test_state());
509 let body = serde_json::json!({
510 "name": "demo",
511 "coordinator": "coord",
512 "members": [
513 { "id": "coord" },
514 { "id": "worker-a" } ]
516 })
517 .to_string();
518
519 let resp = app
520 .oneshot(auth_req("POST", "/v1/formations", Some(&body)))
521 .await
522 .expect("router response");
523 assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
524 let bytes = resp.into_body().collect().await.unwrap().to_bytes();
525 let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
526 assert_eq!(
527 parsed["type"], "/problems/formation/authority-not-narrowing",
528 "expected authority-not-narrowing discriminant, got {parsed}"
529 );
530 }
531
532 #[tokio::test]
533 async fn get_formations_returns_snapshot_with_cursor() {
534 let app = router(test_state());
536 let resp = app
537 .oneshot(auth_req("GET", "/v1/formations", None))
538 .await
539 .expect("router response");
540 assert_eq!(resp.status(), StatusCode::OK);
541 let bytes = resp.into_body().collect().await.unwrap().to_bytes();
542 let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
543 assert!(parsed.is_object(), "expected snapshot object, got {parsed}");
544 let arr = parsed["formations"].as_array().expect("formations array");
545 assert_eq!(arr.len(), 0);
546 assert!(
547 parsed["cursor"].is_u64(),
548 "cursor field must be an unsigned integer, got {}",
549 parsed["cursor"]
550 );
551 assert_eq!(parsed["cursor"].as_u64(), Some(0));
552 }
553
554 #[tokio::test]
555 async fn snapshot_returns_cursor() {
556 let app = router(test_state());
560 let body = serde_json::json!({
561 "name": "with-cursor",
562 "coordinator": "coord",
563 "members": [
564 { "id": "coord" },
565 { "id": "worker-a", "authorizedBy": "coord" }
566 ]
567 })
568 .to_string();
569
570 let resp = app
571 .clone()
572 .oneshot(auth_req("POST", "/v1/formations", Some(&body)))
573 .await
574 .expect("router response");
575 assert_eq!(resp.status(), StatusCode::CREATED);
576
577 let resp = app
578 .oneshot(auth_req("GET", "/v1/formations", None))
579 .await
580 .expect("router response");
581 assert_eq!(resp.status(), StatusCode::OK);
582 let bytes = resp.into_body().collect().await.unwrap().to_bytes();
583 let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
584 assert!(
585 parsed["cursor"].is_u64(),
586 "cursor must be unsigned integer; got {}",
587 parsed["cursor"]
588 );
589 let formations = parsed["formations"].as_array().expect("formations array");
590 assert_eq!(formations.len(), 1, "expected 1 formation after POST");
591 assert_eq!(formations[0]["name"], "with-cursor");
592 }
593
594 #[tokio::test]
595 async fn missing_bearer_returns_401() {
596 let app = router(test_state());
597 let resp = app
598 .oneshot(
599 Request::builder()
600 .method("GET")
601 .uri("/v1/formations")
602 .body(Body::empty())
603 .unwrap(),
604 )
605 .await
606 .expect("router response");
607 assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
608 }
609
610 #[tokio::test]
616 async fn rejects_duplicate_member_ids_with_multiple_coordinators_type() {
617 let app = router(test_state());
618 let body = serde_json::json!({
619 "name": "dup-ids",
620 "coordinator": "coord",
621 "members": [
622 { "id": "coord" },
623 { "id": "coord", "authorizedBy": "coord" }
624 ]
625 })
626 .to_string();
627 let resp = app
628 .oneshot(auth_req("POST", "/v1/formations", Some(&body)))
629 .await
630 .expect("router response");
631 assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
632 let bytes = resp.into_body().collect().await.unwrap().to_bytes();
633 let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
634 assert_eq!(
635 parsed["type"], "/problems/formation/multiple-coordinators",
636 "duplicate member ids must surface multipleCoordinators"
637 );
638 }
639
640 #[tokio::test]
643 async fn rejects_self_authorized_cycle() {
644 let app = router(test_state());
645 let body = serde_json::json!({
646 "name": "self-cycle",
647 "coordinator": "coord",
648 "members": [
649 { "id": "coord" },
650 { "id": "worker-a", "authorizedBy": "worker-a" }
651 ]
652 })
653 .to_string();
654 let resp = app
655 .oneshot(auth_req("POST", "/v1/formations", Some(&body)))
656 .await
657 .expect("router response");
658 assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
659 let bytes = resp.into_body().collect().await.unwrap().to_bytes();
660 let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
661 assert_eq!(parsed["type"], "/problems/formation/cycle");
662 }
663
664 #[tokio::test]
666 async fn rejects_two_node_cycle() {
667 let app = router(test_state());
668 let body = serde_json::json!({
669 "name": "two-cycle",
670 "coordinator": "coord",
671 "members": [
672 { "id": "coord" },
673 { "id": "a", "authorizedBy": "b" },
674 { "id": "b", "authorizedBy": "a" }
675 ]
676 })
677 .to_string();
678 let resp = app
679 .oneshot(auth_req("POST", "/v1/formations", Some(&body)))
680 .await
681 .expect("router response");
682 assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
683 let bytes = resp.into_body().collect().await.unwrap().to_bytes();
684 let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
685 assert_eq!(parsed["type"], "/problems/formation/cycle");
686 }
687
688 #[tokio::test]
692 async fn rejects_orphan_parent_reference() {
693 let app = router(test_state());
694 let body = serde_json::json!({
695 "name": "orphan-parent",
696 "coordinator": "coord",
697 "members": [
698 { "id": "coord" },
699 { "id": "worker-a", "authorizedBy": "ghost" }
700 ]
701 })
702 .to_string();
703 let resp = app
704 .oneshot(auth_req("POST", "/v1/formations", Some(&body)))
705 .await
706 .expect("router response");
707 assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
708 let bytes = resp.into_body().collect().await.unwrap().to_bytes();
709 let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
710 assert_eq!(
711 parsed["type"],
712 "/problems/formation/authority-not-narrowing"
713 );
714 }
715
716 #[tokio::test]
720 async fn post_formation_oversized_body_returns_413() {
721 let app = router(test_state());
722 let big = "x".repeat(70 * 1024);
727 let body =
728 format!(r#"{{"name":"{big}","coordinator":"coord","members":[{{"id":"coord"}}]}}"#,);
729 let resp = app
730 .oneshot(auth_req("POST", "/v1/formations", Some(&body)))
731 .await
732 .expect("router response");
733 assert_eq!(
734 resp.status(),
735 StatusCode::PAYLOAD_TOO_LARGE,
736 "oversized body must surface 413; got {:?}",
737 resp.status(),
738 );
739 }
740
741 #[tokio::test]
753 async fn get_formation_by_id_captures_path() {
754 let state = test_state();
755 let body = serde_json::json!({
756 "name": "probe",
757 "coordinator": "coord",
758 "members": [
759 { "id": "coord" },
760 { "id": "worker-a", "authorizedBy": "coord" }
761 ]
762 })
763 .to_string();
764 let resp = router(state.clone())
765 .oneshot(auth_req("POST", "/v1/formations", Some(&body)))
766 .await
767 .expect("router response");
768 assert_eq!(resp.status(), StatusCode::CREATED);
769 let bytes = resp.into_body().collect().await.unwrap().to_bytes();
770 let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
771 let id = parsed["id"].as_str().expect("uuid string");
772
773 let resp = router(state)
774 .oneshot(auth_req("GET", &format!("/v1/formations/{id}"), None))
775 .await
776 .expect("router response");
777 assert_eq!(
778 resp.status(),
779 StatusCode::OK,
780 "GET /v1/formations/<id> must capture the path segment; got {:?}",
781 resp.status(),
782 );
783 let bytes = resp.into_body().collect().await.unwrap().to_bytes();
784 let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
785 assert_eq!(parsed["name"], "probe");
786 }
787
788 #[tokio::test]
793 async fn unknown_state_returns_bad_request_problem_type() {
794 let state = test_state();
795 let body = serde_json::json!({
796 "name": "demo",
797 "coordinator": "coord",
798 "members": [
799 { "id": "coord" },
800 { "id": "worker-a", "authorizedBy": "coord" }
801 ]
802 })
803 .to_string();
804 let resp = router(state.clone())
805 .oneshot(auth_req("POST", "/v1/formations", Some(&body)))
806 .await
807 .expect("router response");
808 assert_eq!(resp.status(), StatusCode::CREATED);
809 let bytes = resp.into_body().collect().await.unwrap().to_bytes();
810 let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
811 let id = parsed["id"].as_str().expect("uuid string").to_owned();
812
813 let bad = serde_json::json!({ "state": "TELEPORTING" }).to_string();
814 let resp = router(state)
815 .oneshot(auth_req(
816 "POST",
817 &format!("/v1/formations/{id}/status"),
818 Some(&bad),
819 ))
820 .await
821 .expect("router response");
822 assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
823 let bytes = resp.into_body().collect().await.unwrap().to_bytes();
824 let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
825 assert_eq!(
826 parsed["type"], "/problems/bad-request",
827 "unknown state must surface generic bad-request, not an ADR-0010 discriminant"
828 );
829 }
830}