use axum::extract::{Path, State};
use axum::http::{HeaderMap, StatusCode};
use axum::response::IntoResponse;
use axum::Json;
use cellos_core::events::{
cloud_event_v1_formation_completed, cloud_event_v1_formation_created,
cloud_event_v1_formation_degraded, cloud_event_v1_formation_failed,
cloud_event_v1_formation_launching, cloud_event_v1_formation_running,
};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use crate::auth::require_bearer;
use crate::error::{AppError, AppErrorKind};
use crate::state::{AppState, FormationRecord, FormationStatus};
#[derive(Debug, Deserialize)]
pub struct FormationDocument {
pub name: String,
pub coordinator: String,
pub members: Vec<FormationMember>,
}
#[derive(Debug, Deserialize)]
pub struct FormationMember {
pub id: String,
#[serde(rename = "authorizedBy")]
pub authorized_by: Option<String>,
}
#[derive(Debug, Serialize)]
pub struct FormationCreated {
pub id: Uuid,
pub name: String,
pub status: FormationStatus,
}
pub async fn create_formation(
State(state): State<AppState>,
headers: HeaderMap,
body: axum::body::Bytes,
) -> Result<impl IntoResponse, AppError> {
require_bearer(&headers, &state.api_token)?;
let raw: serde_json::Value = serde_json::from_slice(&body)?;
let doc: FormationDocument = serde_json::from_value(raw.clone())?;
validate_formation(&doc)?;
let id = Uuid::new_v4();
let record = FormationRecord {
id,
name: doc.name.clone(),
status: FormationStatus::Pending,
document: raw,
};
state.formations.write().await.insert(id, record);
let cell_count = doc.members.len() as u32;
let no_failed: &[String] = &[];
let event = cloud_event_v1_formation_created(
&id.to_string(),
id.to_string().as_str(),
&id.to_string(),
&doc.name,
cell_count,
no_failed,
None,
);
let subject = format!("cellos.events.formations.{id}.created");
publish_event(&state, &subject, event).await;
let body = FormationCreated {
id,
name: doc.name,
status: FormationStatus::Pending,
};
Ok((StatusCode::CREATED, Json(body)))
}
#[derive(Debug, Serialize)]
pub struct FormationsSnapshot {
pub formations: Vec<FormationRecord>,
pub cursor: u64,
}
pub async fn list_formations(
State(state): State<AppState>,
headers: HeaderMap,
) -> Result<Json<FormationsSnapshot>, AppError> {
require_bearer(&headers, &state.api_token)?;
let map = state.formations.read().await;
Ok(Json(FormationsSnapshot {
formations: map.values().cloned().collect(),
cursor: state.cursor(),
}))
}
pub async fn get_formation(
State(state): State<AppState>,
headers: HeaderMap,
Path(id): Path<Uuid>,
) -> Result<Json<FormationRecord>, AppError> {
require_bearer(&headers, &state.api_token)?;
let map = state.formations.read().await;
map.get(&id)
.cloned()
.map(Json)
.ok_or_else(|| AppError::not_found(format!("formation {id} not found")))
}
pub async fn delete_formation(
State(state): State<AppState>,
headers: HeaderMap,
Path(id): Path<Uuid>,
) -> Result<StatusCode, AppError> {
require_bearer(&headers, &state.api_token)?;
let mut map = state.formations.write().await;
let (name, cell_count) = {
let entry = map
.get_mut(&id)
.ok_or_else(|| AppError::not_found(format!("formation {id} not found")))?;
entry.status = FormationStatus::Cancelled;
let members = entry
.document
.get("members")
.and_then(|m| m.as_array())
.map(|a| a.len() as u32)
.unwrap_or(0);
(entry.name.clone(), members)
};
drop(map);
let no_failed: &[String] = &[];
let event = cloud_event_v1_formation_failed(
&id.to_string(),
id.to_string().as_str(),
&id.to_string(),
&name,
cell_count,
no_failed,
Some("deleted by operator"),
);
let subject = format!("cellos.events.formations.{id}.failed");
publish_event(&state, &subject, event).await;
Ok(StatusCode::NO_CONTENT)
}
#[derive(Debug, Deserialize)]
pub struct StatusTransition {
pub state: String,
pub reason: Option<String>,
pub failed_cells: Option<Vec<String>>,
}
pub async fn update_formation_status(
State(state): State<AppState>,
headers: HeaderMap,
Path(id): Path<Uuid>,
Json(body): Json<StatusTransition>,
) -> Result<StatusCode, AppError> {
require_bearer(&headers, &state.api_token)?;
let (new_status, name, cell_count, failed) = {
let mut map = state.formations.write().await;
let entry = map
.get_mut(&id)
.ok_or_else(|| AppError::not_found(format!("formation {id} not found")))?;
let new_status = match body.state.to_uppercase().as_str() {
"RUNNING" | "LAUNCHING" => FormationStatus::Running,
"DEGRADED" => FormationStatus::Running, "COMPLETED" => FormationStatus::Succeeded,
"FAILED" => FormationStatus::Failed,
other => {
return Err(AppError::new(
AppErrorKind::BadRequest,
format!("unknown state: {other}"),
));
}
};
entry.status = new_status;
let members = entry
.document
.get("members")
.and_then(|m| m.as_array())
.map(|a| a.len() as u32)
.unwrap_or(0);
let failed = body.failed_cells.unwrap_or_default();
(new_status, entry.name.clone(), members, failed)
};
let sid = id.to_string();
let reason = body.reason.as_deref();
let empty: &[String] = &[];
let (event, phase) = match body.state.to_uppercase().as_str() {
"LAUNCHING" => (
cloud_event_v1_formation_launching(&sid, &sid, &sid, &name, cell_count, empty, reason),
"launching",
),
"RUNNING" => (
cloud_event_v1_formation_running(&sid, &sid, &sid, &name, cell_count, empty, reason),
"running",
),
"DEGRADED" => (
cloud_event_v1_formation_degraded(&sid, &sid, &sid, &name, cell_count, &failed, reason),
"degraded",
),
"COMPLETED" => (
cloud_event_v1_formation_completed(&sid, &sid, &sid, &name, cell_count, empty, reason),
"completed",
),
_ => (
cloud_event_v1_formation_failed(&sid, &sid, &sid, &name, cell_count, &failed, reason),
"failed",
),
};
let subject = format!("cellos.events.formations.{id}.{phase}");
publish_event(&state, &subject, event).await;
let _ = new_status; Ok(StatusCode::NO_CONTENT)
}
async fn publish_event(state: &AppState, subject: &str, event: impl serde::Serialize) {
let Some(nats) = &state.nats else { return };
let payload = match serde_json::to_vec(&event) {
Ok(b) => b,
Err(e) => {
tracing::warn!(subject, error = %e, "failed to serialise formation CloudEvent");
return;
}
};
if let Err(e) = nats.publish(subject.to_owned(), payload.into()).await {
tracing::warn!(subject, error = %e, "failed to publish formation CloudEvent to NATS");
}
}
fn validate_formation(doc: &FormationDocument) -> Result<(), AppError> {
use std::collections::{HashMap, HashSet};
let coord_present = doc.members.iter().any(|m| m.id == doc.coordinator);
if !coord_present {
return Err(AppError::new(
AppErrorKind::FormationNoCoordinator,
format!(
"coordinator '{}' must appear in members list",
doc.coordinator
),
));
}
let mut seen: HashSet<&str> = HashSet::new();
for m in &doc.members {
if !seen.insert(m.id.as_str()) {
return Err(AppError::new(
AppErrorKind::FormationMultipleCoordinators,
format!("duplicate member id '{}'", m.id),
));
}
}
for m in &doc.members {
let is_coord = m.id == doc.coordinator;
match (is_coord, &m.authorized_by) {
(true, Some(_)) => {
return Err(AppError::new(
AppErrorKind::FormationAuthorityNotNarrowing,
format!("coordinator '{}' must not declare authorizedBy", m.id),
));
}
(false, None) => {
return Err(AppError::new(
AppErrorKind::FormationAuthorityNotNarrowing,
format!("non-coordinator member '{}' missing authorizedBy", m.id),
));
}
(false, Some(parent)) => {
if !seen.contains(parent.as_str()) {
return Err(AppError::new(
AppErrorKind::FormationAuthorityNotNarrowing,
format!("member '{}' references unknown parent '{}'", m.id, parent),
));
}
}
_ => {}
}
}
let parent: HashMap<&str, &str> = doc
.members
.iter()
.filter_map(|m| m.authorized_by.as_deref().map(|p| (m.id.as_str(), p)))
.collect();
for m in &doc.members {
if m.id == doc.coordinator {
continue;
}
let mut cursor = m.id.as_str();
for _ in 0..doc.members.len() {
let Some(&p) = parent.get(cursor) else {
break;
};
if p == m.id {
return Err(AppError::new(
AppErrorKind::FormationCycle,
format!("authorizedBy cycle detected involving member '{}'", m.id),
));
}
cursor = p;
}
if parent.contains_key(cursor) {
return Err(AppError::new(
AppErrorKind::FormationCycle,
format!(
"authorizedBy cycle detected on chain starting at '{}'",
m.id
),
));
}
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use crate::router;
use axum::body::Body;
use axum::http::{header, Request};
use http_body_util::BodyExt;
use tower::ServiceExt;
const TOKEN: &str = "test-token";
fn test_state() -> AppState {
AppState::new(None, TOKEN)
}
fn auth_req(method: &str, uri: &str, body: Option<&str>) -> Request<Body> {
let mut b = Request::builder()
.method(method)
.uri(uri)
.header(header::AUTHORIZATION, format!("Bearer {TOKEN}"));
if body.is_some() {
b = b.header(header::CONTENT_TYPE, "application/json");
}
b.body(
body.map(|s| Body::from(s.to_owned()))
.unwrap_or_else(Body::empty),
)
.expect("build request")
}
#[tokio::test]
async fn post_valid_formation_returns_201() {
let app = router(test_state());
let body = serde_json::json!({
"name": "demo",
"coordinator": "coord",
"members": [
{ "id": "coord" },
{ "id": "worker-a", "authorizedBy": "coord" },
{ "id": "worker-b", "authorizedBy": "coord" }
]
})
.to_string();
let resp = app
.oneshot(auth_req("POST", "/v1/formations", Some(&body)))
.await
.expect("router response");
assert_eq!(resp.status(), StatusCode::CREATED);
let bytes = resp.into_body().collect().await.unwrap().to_bytes();
let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
assert_eq!(parsed["status"], "PENDING");
assert_eq!(parsed["name"], "demo");
assert!(parsed["id"].as_str().is_some());
}
#[tokio::test]
async fn post_formation_missing_coordinator_returns_400() {
let app = router(test_state());
let body = serde_json::json!({
"name": "demo",
"coordinator": "coord",
"members": [
{ "id": "worker-a", "authorizedBy": "coord" }
]
})
.to_string();
let resp = app
.oneshot(auth_req("POST", "/v1/formations", Some(&body)))
.await
.expect("router response");
assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
let ct = resp
.headers()
.get(header::CONTENT_TYPE)
.and_then(|v| v.to_str().ok())
.unwrap_or_default()
.to_owned();
assert!(
ct.starts_with("application/problem+json"),
"expected RFC 9457 media type, got {ct:?}"
);
let bytes = resp.into_body().collect().await.unwrap().to_bytes();
let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
assert_eq!(parsed["type"], "/problems/formation/no-coordinator");
}
#[tokio::test]
async fn post_formation_member_missing_authorized_by_returns_400() {
let app = router(test_state());
let body = serde_json::json!({
"name": "demo",
"coordinator": "coord",
"members": [
{ "id": "coord" },
{ "id": "worker-a" } ]
})
.to_string();
let resp = app
.oneshot(auth_req("POST", "/v1/formations", Some(&body)))
.await
.expect("router response");
assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
let bytes = resp.into_body().collect().await.unwrap().to_bytes();
let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
assert_eq!(
parsed["type"], "/problems/formation/authority-not-narrowing",
"expected authority-not-narrowing discriminant, got {parsed}"
);
}
#[tokio::test]
async fn get_formations_returns_snapshot_with_cursor() {
let app = router(test_state());
let resp = app
.oneshot(auth_req("GET", "/v1/formations", None))
.await
.expect("router response");
assert_eq!(resp.status(), StatusCode::OK);
let bytes = resp.into_body().collect().await.unwrap().to_bytes();
let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
assert!(parsed.is_object(), "expected snapshot object, got {parsed}");
let arr = parsed["formations"].as_array().expect("formations array");
assert_eq!(arr.len(), 0);
assert!(
parsed["cursor"].is_u64(),
"cursor field must be an unsigned integer, got {}",
parsed["cursor"]
);
assert_eq!(parsed["cursor"].as_u64(), Some(0));
}
#[tokio::test]
async fn snapshot_returns_cursor() {
let app = router(test_state());
let body = serde_json::json!({
"name": "with-cursor",
"coordinator": "coord",
"members": [
{ "id": "coord" },
{ "id": "worker-a", "authorizedBy": "coord" }
]
})
.to_string();
let resp = app
.clone()
.oneshot(auth_req("POST", "/v1/formations", Some(&body)))
.await
.expect("router response");
assert_eq!(resp.status(), StatusCode::CREATED);
let resp = app
.oneshot(auth_req("GET", "/v1/formations", None))
.await
.expect("router response");
assert_eq!(resp.status(), StatusCode::OK);
let bytes = resp.into_body().collect().await.unwrap().to_bytes();
let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
assert!(
parsed["cursor"].is_u64(),
"cursor must be unsigned integer; got {}",
parsed["cursor"]
);
let formations = parsed["formations"].as_array().expect("formations array");
assert_eq!(formations.len(), 1, "expected 1 formation after POST");
assert_eq!(formations[0]["name"], "with-cursor");
}
#[tokio::test]
async fn missing_bearer_returns_401() {
let app = router(test_state());
let resp = app
.oneshot(
Request::builder()
.method("GET")
.uri("/v1/formations")
.body(Body::empty())
.unwrap(),
)
.await
.expect("router response");
assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
}
#[tokio::test]
async fn rejects_duplicate_member_ids_with_multiple_coordinators_type() {
let app = router(test_state());
let body = serde_json::json!({
"name": "dup-ids",
"coordinator": "coord",
"members": [
{ "id": "coord" },
{ "id": "coord", "authorizedBy": "coord" }
]
})
.to_string();
let resp = app
.oneshot(auth_req("POST", "/v1/formations", Some(&body)))
.await
.expect("router response");
assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
let bytes = resp.into_body().collect().await.unwrap().to_bytes();
let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
assert_eq!(
parsed["type"], "/problems/formation/multiple-coordinators",
"duplicate member ids must surface multipleCoordinators"
);
}
#[tokio::test]
async fn rejects_self_authorized_cycle() {
let app = router(test_state());
let body = serde_json::json!({
"name": "self-cycle",
"coordinator": "coord",
"members": [
{ "id": "coord" },
{ "id": "worker-a", "authorizedBy": "worker-a" }
]
})
.to_string();
let resp = app
.oneshot(auth_req("POST", "/v1/formations", Some(&body)))
.await
.expect("router response");
assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
let bytes = resp.into_body().collect().await.unwrap().to_bytes();
let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
assert_eq!(parsed["type"], "/problems/formation/cycle");
}
#[tokio::test]
async fn rejects_two_node_cycle() {
let app = router(test_state());
let body = serde_json::json!({
"name": "two-cycle",
"coordinator": "coord",
"members": [
{ "id": "coord" },
{ "id": "a", "authorizedBy": "b" },
{ "id": "b", "authorizedBy": "a" }
]
})
.to_string();
let resp = app
.oneshot(auth_req("POST", "/v1/formations", Some(&body)))
.await
.expect("router response");
assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
let bytes = resp.into_body().collect().await.unwrap().to_bytes();
let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
assert_eq!(parsed["type"], "/problems/formation/cycle");
}
#[tokio::test]
async fn rejects_orphan_parent_reference() {
let app = router(test_state());
let body = serde_json::json!({
"name": "orphan-parent",
"coordinator": "coord",
"members": [
{ "id": "coord" },
{ "id": "worker-a", "authorizedBy": "ghost" }
]
})
.to_string();
let resp = app
.oneshot(auth_req("POST", "/v1/formations", Some(&body)))
.await
.expect("router response");
assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
let bytes = resp.into_body().collect().await.unwrap().to_bytes();
let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
assert_eq!(
parsed["type"],
"/problems/formation/authority-not-narrowing"
);
}
#[tokio::test]
async fn post_formation_oversized_body_returns_413() {
let app = router(test_state());
let big = "x".repeat(70 * 1024);
let body =
format!(r#"{{"name":"{big}","coordinator":"coord","members":[{{"id":"coord"}}]}}"#,);
let resp = app
.oneshot(auth_req("POST", "/v1/formations", Some(&body)))
.await
.expect("router response");
assert_eq!(
resp.status(),
StatusCode::PAYLOAD_TOO_LARGE,
"oversized body must surface 413; got {:?}",
resp.status(),
);
}
#[tokio::test]
async fn get_formation_by_id_captures_path() {
let state = test_state();
let body = serde_json::json!({
"name": "probe",
"coordinator": "coord",
"members": [
{ "id": "coord" },
{ "id": "worker-a", "authorizedBy": "coord" }
]
})
.to_string();
let resp = router(state.clone())
.oneshot(auth_req("POST", "/v1/formations", Some(&body)))
.await
.expect("router response");
assert_eq!(resp.status(), StatusCode::CREATED);
let bytes = resp.into_body().collect().await.unwrap().to_bytes();
let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
let id = parsed["id"].as_str().expect("uuid string");
let resp = router(state)
.oneshot(auth_req("GET", &format!("/v1/formations/{id}"), None))
.await
.expect("router response");
assert_eq!(
resp.status(),
StatusCode::OK,
"GET /v1/formations/<id> must capture the path segment; got {:?}",
resp.status(),
);
let bytes = resp.into_body().collect().await.unwrap().to_bytes();
let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
assert_eq!(parsed["name"], "probe");
}
#[tokio::test]
async fn unknown_state_returns_bad_request_problem_type() {
let state = test_state();
let body = serde_json::json!({
"name": "demo",
"coordinator": "coord",
"members": [
{ "id": "coord" },
{ "id": "worker-a", "authorizedBy": "coord" }
]
})
.to_string();
let resp = router(state.clone())
.oneshot(auth_req("POST", "/v1/formations", Some(&body)))
.await
.expect("router response");
assert_eq!(resp.status(), StatusCode::CREATED);
let bytes = resp.into_body().collect().await.unwrap().to_bytes();
let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
let id = parsed["id"].as_str().expect("uuid string").to_owned();
let bad = serde_json::json!({ "state": "TELEPORTING" }).to_string();
let resp = router(state)
.oneshot(auth_req(
"POST",
&format!("/v1/formations/{id}/status"),
Some(&bad),
))
.await
.expect("router response");
assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
let bytes = resp.into_body().collect().await.unwrap().to_bytes();
let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
assert_eq!(
parsed["type"], "/problems/bad-request",
"unknown state must surface generic bad-request, not an ADR-0010 discriminant"
);
}
}