use axum::extract::{Path, State};
use axum::http::{HeaderMap, StatusCode};
use axum::response::IntoResponse;
use axum::Json;
use cellos_core::events::{
cloud_event_v1_formation_completed, cloud_event_v1_formation_created,
cloud_event_v1_formation_degraded, cloud_event_v1_formation_failed,
cloud_event_v1_formation_launching, cloud_event_v1_formation_running,
};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use crate::auth::require_bearer;
use crate::error::{AppError, AppErrorKind};
use crate::state::{AppState, FormationRecord, FormationStatus};
/// Flat formation document, deserialized by `create_formation` from the
/// request body after it has passed through `normalize_formation_document`.
#[derive(Debug, Deserialize)]
pub struct FormationDocument {
/// Formation name; echoed in the creation response and used for by-name lookups.
pub name: String,
/// Id of the coordinating member; `validate_formation` requires it to appear in `members`.
pub coordinator: String,
/// All members of the formation, coordinator included.
pub members: Vec<FormationMember>,
}
/// One member entry of a [`FormationDocument`].
#[derive(Debug, Deserialize)]
pub struct FormationMember {
/// Member identifier; `validate_formation` rejects documents with duplicate ids.
pub id: String,
/// Id of the member that authorizes this one (JSON key `authorizedBy`).
/// `validate_formation` requires it on every non-coordinator member and
/// forbids it on the coordinator.
#[serde(rename = "authorizedBy")]
pub authorized_by: Option<String>,
}
/// Response body returned by `create_formation` alongside `201 Created`.
#[derive(Debug, Serialize)]
pub struct FormationCreated {
/// Freshly generated (v4) identifier of the stored formation.
pub id: Uuid,
/// Name taken from the submitted document.
pub name: String,
/// Status of the new record; `create_formation` always reports `Pending`.
pub status: FormationStatus,
}
pub async fn create_formation(
State(state): State<AppState>,
headers: HeaderMap,
body: axum::body::Bytes,
) -> Result<impl IntoResponse, AppError> {
require_bearer(&headers, &state.api_token)?;
let raw: serde_json::Value = serde_json::from_slice(&body)?;
let normalized = normalize_formation_document(&raw)?;
let doc: FormationDocument = serde_json::from_value(normalized.clone())?;
validate_formation(&doc)?;
let id = Uuid::new_v4();
let record = FormationRecord {
id,
name: doc.name.clone(),
status: FormationStatus::Pending,
document: normalized,
};
state.formations.write().await.insert(id, record);
let cell_count = doc.members.len() as u32;
let no_failed: &[String] = &[];
let now_rfc3339 = chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true);
let event = cloud_event_v1_formation_created(
&id.to_string(),
&now_rfc3339,
&id.to_string(),
&doc.name,
cell_count,
no_failed,
None,
);
let subject = format!("cellos.events.formations.{id}.created");
publish_event(&state, &subject, event).await;
let body = FormationCreated {
id,
name: doc.name,
status: FormationStatus::Pending,
};
Ok((StatusCode::CREATED, Json(body)))
}
/// Response body of `list_formations`.
#[derive(Debug, Serialize)]
pub struct FormationsSnapshot {
/// Clones of every record currently held in the formations map.
pub formations: Vec<FormationRecord>,
/// Value of `AppState::cursor()` read while the formations map was locked for reading.
pub cursor: u64,
}
pub async fn list_formations(
State(state): State<AppState>,
headers: HeaderMap,
) -> Result<Json<FormationsSnapshot>, AppError> {
require_bearer(&headers, &state.api_token)?;
let map = state.formations.read().await;
Ok(Json(FormationsSnapshot {
formations: map.values().cloned().collect(),
cursor: state.cursor(),
}))
}
pub async fn get_formation(
State(state): State<AppState>,
headers: HeaderMap,
Path(id): Path<Uuid>,
) -> Result<Json<FormationRecord>, AppError> {
require_bearer(&headers, &state.api_token)?;
let map = state.formations.read().await;
map.get(&id)
.cloned()
.map(Json)
.ok_or_else(|| AppError::not_found(format!("formation {id} not found")))
}
pub async fn get_formation_by_name(
State(state): State<AppState>,
headers: HeaderMap,
Path(name): Path<String>,
) -> Result<Json<FormationRecord>, AppError> {
require_bearer(&headers, &state.api_token)?;
let map = state.formations.read().await;
map.values()
.find(|r| r.name == name)
.cloned()
.map(Json)
.ok_or_else(|| AppError::not_found(format!("formation '{name}' not found")))
}
/// Handler that resolves `name` to a formation id and delegates to
/// [`delete_formation`].
///
/// # Errors
///
/// Propagates the error from `require_bearer`; returns a not-found error when
/// no record carries that name; otherwise returns whatever
/// [`delete_formation`] returns.
pub async fn delete_formation_by_name(
    State(state): State<AppState>,
    headers: HeaderMap,
    Path(name): Path<String>,
) -> Result<StatusCode, AppError> {
    require_bearer(&headers, &state.api_token)?;

    let formations = state.formations.read().await;
    let resolved = formations
        .values()
        .find(|record| record.name == name)
        .map(|record| record.id);
    // Release the read lock before `delete_formation` asks for the write lock.
    drop(formations);

    let Some(id) = resolved else {
        return Err(AppError::not_found(format!(
            "formation '{name}' not found"
        )));
    };
    delete_formation(State(state), headers, Path(id)).await
}
/// Handler for deleting a formation by id.
///
/// The record is not removed from the map: its status is set to `Cancelled`,
/// and a "failed" CloudEvent with reason `"deleted by operator"` is published
/// best-effort after the write lock has been released. Responds with
/// `204 No Content`.
///
/// # Errors
///
/// Propagates the error from `require_bearer`; returns a not-found error when
/// no record exists for `id`.
pub async fn delete_formation(
    State(state): State<AppState>,
    headers: HeaderMap,
    Path(id): Path<Uuid>,
) -> Result<StatusCode, AppError> {
    require_bearer(&headers, &state.api_token)?;

    // The block scopes the write guard so it is gone before publishing.
    let (name, cell_count) = {
        let mut formations = state.formations.write().await;
        let Some(record) = formations.get_mut(&id) else {
            return Err(AppError::not_found(format!("formation {id} not found")));
        };
        record.status = FormationStatus::Cancelled;

        // Member count comes from the stored document; 0 when `members` is absent or not an array.
        let cell_count = record
            .document
            .get("members")
            .and_then(|members| members.as_array())
            .map_or(0, |members| members.len() as u32);
        (record.name.clone(), cell_count)
    };

    let formation_id = id.to_string();
    let no_failed: &[String] = &[];
    let now_rfc3339 = chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true);
    let event = cloud_event_v1_formation_failed(
        &formation_id,
        &now_rfc3339,
        &formation_id,
        &name,
        cell_count,
        no_failed,
        Some("deleted by operator"),
    );
    publish_event(
        &state,
        &format!("cellos.events.formations.{id}.failed"),
        event,
    )
    .await;

    Ok(StatusCode::NO_CONTENT)
}
/// Request body accepted by `update_formation_status`.
#[derive(Debug, Deserialize)]
pub struct StatusTransition {
/// Requested state, matched case-insensitively against
/// `LAUNCHING`, `RUNNING`, `DEGRADED`, `COMPLETED` and `FAILED`.
pub state: String,
/// Optional free-text reason, forwarded to the published CloudEvent.
pub reason: Option<String>,
/// Optional list of failed cells (JSON key `failed_cells`, no rename applied);
/// only forwarded for `DEGRADED` and `FAILED` events.
pub failed_cells: Option<Vec<String>>,
}
pub async fn update_formation_status(
State(state): State<AppState>,
headers: HeaderMap,
Path(id): Path<Uuid>,
Json(body): Json<StatusTransition>,
) -> Result<StatusCode, AppError> {
require_bearer(&headers, &state.api_token)?;
let (new_status, name, cell_count, failed) = {
let mut map = state.formations.write().await;
let entry = map
.get_mut(&id)
.ok_or_else(|| AppError::not_found(format!("formation {id} not found")))?;
let new_status = match body.state.to_uppercase().as_str() {
"RUNNING" | "LAUNCHING" => FormationStatus::Running,
"DEGRADED" => FormationStatus::Running, "COMPLETED" => FormationStatus::Succeeded,
"FAILED" => FormationStatus::Failed,
other => {
return Err(AppError::new(
AppErrorKind::BadRequest,
format!("unknown state: {other}"),
));
}
};
entry.status = new_status;
let members = entry
.document
.get("members")
.and_then(|m| m.as_array())
.map(|a| a.len() as u32)
.unwrap_or(0);
let failed = body.failed_cells.unwrap_or_default();
(new_status, entry.name.clone(), members, failed)
};
let sid = id.to_string();
let reason = body.reason.as_deref();
let empty: &[String] = &[];
let now_rfc3339 = chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true);
let (event, phase) = match body.state.to_uppercase().as_str() {
"LAUNCHING" => (
cloud_event_v1_formation_launching(
&sid,
&now_rfc3339,
&sid,
&name,
cell_count,
empty,
reason,
),
"launching",
),
"RUNNING" => (
cloud_event_v1_formation_running(
&sid,
&now_rfc3339,
&sid,
&name,
cell_count,
empty,
reason,
),
"running",
),
"DEGRADED" => (
cloud_event_v1_formation_degraded(
&sid,
&now_rfc3339,
&sid,
&name,
cell_count,
&failed,
reason,
),
"degraded",
),
"COMPLETED" => (
cloud_event_v1_formation_completed(
&sid,
&now_rfc3339,
&sid,
&name,
cell_count,
empty,
reason,
),
"completed",
),
_ => (
cloud_event_v1_formation_failed(
&sid,
&now_rfc3339,
&sid,
&name,
cell_count,
&failed,
reason,
),
"failed",
),
};
let subject = format!("cellos.events.formations.{id}.{phase}");
publish_event(&state, &subject, event).await;
let _ = new_status; Ok(StatusCode::NO_CONTENT)
}
async fn publish_event(state: &AppState, subject: &str, event: impl serde::Serialize) {
let Some(nats) = &state.nats else { return };
let payload = match serde_json::to_vec(&event) {
Ok(b) => b,
Err(e) => {
tracing::warn!(subject, error = %e, "failed to serialise formation CloudEvent");
return;
}
};
if let Err(e) = nats.publish(subject.to_owned(), payload.into()).await {
tracing::warn!(subject, error = %e, "failed to publish formation CloudEvent to NATS");
}
}
/// Normalizes a submitted formation document to the flat shape
/// `{ "name", "coordinator", "members": [{ "id", ... }] }`.
///
/// Two input shapes are recognised on a JSON object:
///
/// * **flat** — any of the top-level keys `name`, `coordinator`, `members`;
///   returned unchanged (as a clone).
/// * **kubectl-style** — any of `apiVersion`, `kind`, `metadata`, `spec`;
///   rewritten so that `metadata.name` becomes `name`, `spec.coordinator`
///   becomes `coordinator`, and each `spec.members[i].name` becomes
///   `members[i].id`. Every other member key is copied through untouched and
///   the envelope keys are dropped.
///
/// Inputs that are not JSON objects, or objects with none of the keys above,
/// are returned unchanged; the caller's typed deserialization reports their
/// shape errors.
///
/// # Errors
///
/// Returns a bad-request [`AppError`] when:
///
/// * flat and kubectl-style top-level keys are mixed (hybrid document);
/// * `apiVersion` is missing, not a string, or not `"cellos.dev/v1"`;
/// * `kind` is missing, not a string, or not `"Formation"`;
/// * `metadata`, `metadata.name`, `spec`, `spec.coordinator` or
///   `spec.members` is missing or of the wrong JSON type;
/// * a member is not an object or has no string `name`;
/// * a kubectl-style member also carries a flat-style `id` key. Such an `id`
///   used to overwrite the id derived from `name` silently; it is now
///   rejected, mirroring the top-level "pick exactly one shape" rule.
fn normalize_formation_document(raw: &serde_json::Value) -> Result<serde_json::Value, AppError> {
    let Some(obj) = raw.as_object() else {
        return Ok(raw.clone());
    };

    const FLAT_KEYS: &[&str] = &["name", "coordinator", "members"];
    const KUBECTL_KEYS: &[&str] = &["apiVersion", "kind", "metadata", "spec"];

    // Collect the keys actually present so the hybrid error can name them.
    let flat_keys_present: Vec<&str> = FLAT_KEYS
        .iter()
        .copied()
        .filter(|k| obj.contains_key(*k))
        .collect();
    let kubectl_keys_present: Vec<&str> = KUBECTL_KEYS
        .iter()
        .copied()
        .filter(|k| obj.contains_key(*k))
        .collect();
    let has_flat = !flat_keys_present.is_empty();
    let has_kubectl = !kubectl_keys_present.is_empty();

    if has_flat && has_kubectl {
        return Err(AppError::bad_request(format!(
            "hybrid formation document: top-level flat field(s) {flat:?} \
conflict with kubectl-style envelope field(s) {kubectl:?}; \
pick exactly one shape (see contracts/schemas/formation-v1.schema.json)",
            flat = flat_keys_present,
            kubectl = kubectl_keys_present,
        )));
    }
    if !has_kubectl {
        // Already flat (or unrecognised): nothing to rewrite.
        return Ok(raw.clone());
    }

    // --- kubectl-style envelope checks -------------------------------------
    let api_version = obj
        .get("apiVersion")
        .and_then(|v| v.as_str())
        .ok_or_else(|| {
            AppError::bad_request(
                "kubectl-style formation: missing or non-string 'apiVersion' (expected \"cellos.dev/v1\")"
                    .to_string(),
            )
        })?;
    if api_version != "cellos.dev/v1" {
        return Err(AppError::bad_request(format!(
            "kubectl-style formation: unsupported apiVersion '{api_version}' (expected \"cellos.dev/v1\")"
        )));
    }

    let kind = obj.get("kind").and_then(|v| v.as_str()).ok_or_else(|| {
        AppError::bad_request(
            "kubectl-style formation: missing or non-string 'kind' (expected \"Formation\")"
                .to_string(),
        )
    })?;
    if kind != "Formation" {
        return Err(AppError::bad_request(format!(
            "kubectl-style formation: unsupported kind '{kind}' (expected \"Formation\")"
        )));
    }

    let metadata = obj
        .get("metadata")
        .and_then(|v| v.as_object())
        .ok_or_else(|| {
            AppError::bad_request("kubectl-style formation: missing 'metadata' object".to_string())
        })?;
    let name = metadata
        .get("name")
        .and_then(|v| v.as_str())
        .ok_or_else(|| {
            AppError::bad_request("kubectl-style formation: missing 'metadata.name'".to_string())
        })?;

    let spec = obj.get("spec").and_then(|v| v.as_object()).ok_or_else(|| {
        AppError::bad_request("kubectl-style formation: missing 'spec' object".to_string())
    })?;
    let coordinator = spec
        .get("coordinator")
        .and_then(|v| v.as_str())
        .ok_or_else(|| {
            AppError::bad_request("kubectl-style formation: missing 'spec.coordinator'".to_string())
        })?;
    let members_raw = spec
        .get("members")
        .and_then(|v| v.as_array())
        .ok_or_else(|| {
            AppError::bad_request(
                "kubectl-style formation: missing or non-array 'spec.members'".to_string(),
            )
        })?;

    // --- member rewrite: `name` -> `id`, all other keys copied through ------
    let mut members_flat = Vec::with_capacity(members_raw.len());
    for (idx, m) in members_raw.iter().enumerate() {
        let m_obj = m.as_object().ok_or_else(|| {
            AppError::bad_request(format!(
                "kubectl-style formation: spec.members[{idx}] is not an object"
            ))
        })?;
        let member_name = m_obj.get("name").and_then(|v| v.as_str()).ok_or_else(|| {
            AppError::bad_request(format!(
                "kubectl-style formation: spec.members[{idx}] missing 'name'"
            ))
        })?;
        // A flat-style `id` next to `name` is ambiguous: copying it through
        // would silently replace the id derived from `name`. Refuse it, as the
        // top level refuses hybrid documents.
        if m_obj.contains_key("id") {
            return Err(AppError::bad_request(format!(
                "hybrid formation member: spec.members[{idx}] declares both 'name' and \
flat-style 'id'; kubectl-style members are identified by 'name' only"
            )));
        }

        let mut rewritten = serde_json::Map::with_capacity(m_obj.len());
        rewritten.insert(
            "id".to_string(),
            serde_json::Value::String(member_name.to_string()),
        );
        for (k, v) in m_obj.iter() {
            if k == "name" {
                // Already carried over as `id`.
                continue;
            }
            rewritten.insert(k.clone(), v.clone());
        }
        members_flat.push(serde_json::Value::Object(rewritten));
    }

    let mut flat = serde_json::Map::with_capacity(3);
    flat.insert(
        "name".to_string(),
        serde_json::Value::String(name.to_string()),
    );
    flat.insert(
        "coordinator".to_string(),
        serde_json::Value::String(coordinator.to_string()),
    );
    flat.insert(
        "members".to_string(),
        serde_json::Value::Array(members_flat),
    );
    Ok(serde_json::Value::Object(flat))
}
/// Checks the admission rules of a flat formation document.
///
/// Rules are evaluated in this order and the first violation is returned:
///
/// 1. the coordinator id appears in `members` (`FormationNoCoordinator`);
/// 2. member ids are unique (`FormationMultipleCoordinators`);
/// 3. the coordinator has no `authorizedBy`, every other member has one, and
///    it names a known member id (`FormationAuthorityNotNarrowing`);
/// 4. following `authorizedBy` links from any non-coordinator member never
///    loops (`FormationCycle`).
fn validate_formation(doc: &FormationDocument) -> Result<(), AppError> {
    use std::collections::{HashMap, HashSet};

    let members = &doc.members;

    // Rule 1: the declared coordinator must be one of the members.
    if !members.iter().any(|member| member.id == doc.coordinator) {
        return Err(AppError::new(
            AppErrorKind::FormationNoCoordinator,
            format!(
                "coordinator '{}' must appear in members list",
                doc.coordinator
            ),
        ));
    }

    // Rule 2: `insert` returns false on the first repeated id.
    let mut known_ids: HashSet<&str> = HashSet::new();
    let repeated = members
        .iter()
        .find(|member| !known_ids.insert(member.id.as_str()));
    if let Some(member) = repeated {
        return Err(AppError::new(
            AppErrorKind::FormationMultipleCoordinators,
            format!("duplicate member id '{}'", member.id),
        ));
    }

    // Rule 3: authority must flow from the coordinator through known members.
    for member in members {
        let violation = if member.id == doc.coordinator {
            member
                .authorized_by
                .as_ref()
                .map(|_| format!("coordinator '{}' must not declare authorizedBy", member.id))
        } else {
            match member.authorized_by.as_deref() {
                None => Some(format!(
                    "non-coordinator member '{}' missing authorizedBy",
                    member.id
                )),
                Some(parent) if !known_ids.contains(parent) => Some(format!(
                    "member '{}' references unknown parent '{}'",
                    member.id, parent
                )),
                Some(_) => None,
            }
        };
        if let Some(detail) = violation {
            return Err(AppError::new(
                AppErrorKind::FormationAuthorityNotNarrowing,
                detail,
            ));
        }
    }

    // Rule 4: walk each chain for at most `members.len()` hops.
    let parent_of: HashMap<&str, &str> = members
        .iter()
        .filter_map(|member| Some((member.id.as_str(), member.authorized_by.as_deref()?)))
        .collect();
    let hop_limit = members.len();

    for member in members.iter().filter(|m| m.id != doc.coordinator) {
        let start = member.id.as_str();
        let mut cursor = start;
        for _ in 0..hop_limit {
            match parent_of.get(cursor) {
                // Reached a member without a parent: the chain terminates.
                None => break,
                Some(&next) if next == start => {
                    return Err(AppError::new(
                        AppErrorKind::FormationCycle,
                        format!("authorizedBy cycle detected involving member '{}'", member.id),
                    ));
                }
                Some(&next) => cursor = next,
            }
        }
        // Still on a node with a parent after the hop budget: the chain
        // entered a loop that does not pass through `start`.
        if parent_of.contains_key(cursor) {
            return Err(AppError::new(
                AppErrorKind::FormationCycle,
                format!(
                    "authorizedBy cycle detected on chain starting at '{}'",
                    member.id
                ),
            ));
        }
    }

    Ok(())
}
/// Router-level tests for the formation endpoints.
///
/// Every test drives real requests through `crate::router` with an in-memory
/// `AppState` (no NATS client), so publishing is a no-op.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::router;
    use axum::body::Body;
    use axum::http::{header, Request};
    use http_body_util::BodyExt;
    use tower::ServiceExt;

    const TOKEN: &str = "test-token";

    /// Fresh application state without NATS, accepting `TOKEN` as bearer.
    fn test_state() -> AppState {
        AppState::new(None, TOKEN)
    }

    /// Builds an authenticated request; a JSON content type is set only when a body is given.
    fn auth_req(method: &str, uri: &str, body: Option<&str>) -> Request<Body> {
        let builder = Request::builder()
            .method(method)
            .uri(uri)
            .header(header::AUTHORIZATION, format!("Bearer {TOKEN}"));
        let (builder, payload) = match body {
            Some(json) => (
                builder.header(header::CONTENT_TYPE, "application/json"),
                Body::from(json.to_owned()),
            ),
            None => (builder, Body::empty()),
        };
        builder.body(payload).expect("build request")
    }

    /// Sends one request through a router built over (a clone of) `state`.
    async fn dispatch(state: &AppState, req: Request<Body>) -> axum::response::Response {
        router(state.clone())
            .oneshot(req)
            .await
            .expect("router response")
    }

    /// Collects the response body and parses it as JSON.
    async fn read_json(resp: axum::response::Response) -> serde_json::Value {
        let bytes = resp.into_body().collect().await.unwrap().to_bytes();
        serde_json::from_slice(&bytes).unwrap()
    }

    /// POSTs `doc` to `/v1/formations` with valid credentials.
    async fn post_formation(state: &AppState, doc: &serde_json::Value) -> axum::response::Response {
        dispatch(
            state,
            auth_req("POST", "/v1/formations", Some(&doc.to_string())),
        )
        .await
    }

    /// POSTs `doc` to a fresh state, asserts `400`, and returns the problem body.
    async fn rejected_problem(doc: &serde_json::Value) -> serde_json::Value {
        let resp = post_formation(&test_state(), doc).await;
        assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
        read_json(resp).await
    }

    /// POSTs `doc`, asserts `201`, and returns the id of the created formation.
    async fn create_and_get_id(state: &AppState, doc: &serde_json::Value) -> String {
        let resp = post_formation(state, doc).await;
        assert_eq!(resp.status(), StatusCode::CREATED);
        let created = read_json(resp).await;
        created["id"].as_str().expect("uuid string").to_owned()
    }

    #[tokio::test]
    async fn post_valid_formation_returns_201() {
        let doc = serde_json::json!({
            "name": "demo",
            "coordinator": "coord",
            "members": [
                { "id": "coord" },
                { "id": "worker-a", "authorizedBy": "coord" },
                { "id": "worker-b", "authorizedBy": "coord" }
            ]
        });
        let resp = post_formation(&test_state(), &doc).await;
        assert_eq!(resp.status(), StatusCode::CREATED);
        let created = read_json(resp).await;
        assert_eq!(created["status"], "PENDING");
        assert_eq!(created["name"], "demo");
        assert!(created["id"].as_str().is_some());
    }

    #[tokio::test]
    async fn post_formation_missing_coordinator_returns_400() {
        let doc = serde_json::json!({
            "name": "demo",
            "coordinator": "coord",
            "members": [
                { "id": "worker-a", "authorizedBy": "coord" }
            ]
        });
        let resp = post_formation(&test_state(), &doc).await;
        assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
        let ct = resp
            .headers()
            .get(header::CONTENT_TYPE)
            .and_then(|v| v.to_str().ok())
            .unwrap_or_default()
            .to_owned();
        assert!(
            ct.starts_with("application/problem+json"),
            "expected RFC 9457 media type, got {ct:?}"
        );
        let problem = read_json(resp).await;
        assert_eq!(problem["type"], "/problems/formation/no-coordinator");
    }

    #[tokio::test]
    async fn post_formation_member_missing_authorized_by_returns_400() {
        let doc = serde_json::json!({
            "name": "demo",
            "coordinator": "coord",
            "members": [
                { "id": "coord" },
                { "id": "worker-a" }
            ]
        });
        let parsed = rejected_problem(&doc).await;
        assert_eq!(
            parsed["type"], "/problems/formation/authority-not-narrowing",
            "expected authority-not-narrowing discriminant, got {parsed}"
        );
    }

    #[tokio::test]
    async fn get_formations_returns_snapshot_with_cursor() {
        let resp = dispatch(&test_state(), auth_req("GET", "/v1/formations", None)).await;
        assert_eq!(resp.status(), StatusCode::OK);
        let parsed = read_json(resp).await;
        assert!(parsed.is_object(), "expected snapshot object, got {parsed}");
        let listed = parsed["formations"].as_array().expect("formations array");
        assert_eq!(listed.len(), 0);
        assert!(
            parsed["cursor"].is_u64(),
            "cursor field must be an unsigned integer, got {}",
            parsed["cursor"]
        );
        assert_eq!(parsed["cursor"].as_u64(), Some(0));
    }

    #[tokio::test]
    async fn snapshot_returns_cursor() {
        let state = test_state();
        let doc = serde_json::json!({
            "name": "with-cursor",
            "coordinator": "coord",
            "members": [
                { "id": "coord" },
                { "id": "worker-a", "authorizedBy": "coord" }
            ]
        });
        let resp = post_formation(&state, &doc).await;
        assert_eq!(resp.status(), StatusCode::CREATED);

        let resp = dispatch(&state, auth_req("GET", "/v1/formations", None)).await;
        assert_eq!(resp.status(), StatusCode::OK);
        let snapshot = read_json(resp).await;
        assert!(
            snapshot["cursor"].is_u64(),
            "cursor must be unsigned integer; got {}",
            snapshot["cursor"]
        );
        let formations = snapshot["formations"].as_array().expect("formations array");
        assert_eq!(formations.len(), 1, "expected 1 formation after POST");
        assert_eq!(formations[0]["name"], "with-cursor");
    }

    #[tokio::test]
    async fn missing_bearer_returns_401() {
        let anonymous = Request::builder()
            .method("GET")
            .uri("/v1/formations")
            .body(Body::empty())
            .unwrap();
        let resp = dispatch(&test_state(), anonymous).await;
        assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
    }

    #[tokio::test]
    async fn rejects_duplicate_member_ids_with_multiple_coordinators_type() {
        let doc = serde_json::json!({
            "name": "dup-ids",
            "coordinator": "coord",
            "members": [
                { "id": "coord" },
                { "id": "coord", "authorizedBy": "coord" }
            ]
        });
        let problem = rejected_problem(&doc).await;
        assert_eq!(
            problem["type"], "/problems/formation/multiple-coordinators",
            "duplicate member ids must surface multipleCoordinators"
        );
    }

    #[tokio::test]
    async fn rejects_self_authorized_cycle() {
        let doc = serde_json::json!({
            "name": "self-cycle",
            "coordinator": "coord",
            "members": [
                { "id": "coord" },
                { "id": "worker-a", "authorizedBy": "worker-a" }
            ]
        });
        let problem = rejected_problem(&doc).await;
        assert_eq!(problem["type"], "/problems/formation/cycle");
    }

    #[tokio::test]
    async fn rejects_two_node_cycle() {
        let doc = serde_json::json!({
            "name": "two-cycle",
            "coordinator": "coord",
            "members": [
                { "id": "coord" },
                { "id": "a", "authorizedBy": "b" },
                { "id": "b", "authorizedBy": "a" }
            ]
        });
        let problem = rejected_problem(&doc).await;
        assert_eq!(problem["type"], "/problems/formation/cycle");
    }

    #[tokio::test]
    async fn rejects_orphan_parent_reference() {
        let doc = serde_json::json!({
            "name": "orphan-parent",
            "coordinator": "coord",
            "members": [
                { "id": "coord" },
                { "id": "worker-a", "authorizedBy": "ghost" }
            ]
        });
        let problem = rejected_problem(&doc).await;
        assert_eq!(
            problem["type"],
            "/problems/formation/authority-not-narrowing"
        );
    }

    #[tokio::test]
    async fn post_formation_oversized_body_returns_413() {
        let big = "x".repeat(70 * 1024);
        let body =
            format!(r#"{{"name":"{big}","coordinator":"coord","members":[{{"id":"coord"}}]}}"#,);
        let resp = dispatch(
            &test_state(),
            auth_req("POST", "/v1/formations", Some(&body)),
        )
        .await;
        assert_eq!(
            resp.status(),
            StatusCode::PAYLOAD_TOO_LARGE,
            "oversized body must surface 413; got {:?}",
            resp.status(),
        );
    }

    #[tokio::test]
    async fn get_formation_by_id_captures_path() {
        let state = test_state();
        let doc = serde_json::json!({
            "name": "probe",
            "coordinator": "coord",
            "members": [
                { "id": "coord" },
                { "id": "worker-a", "authorizedBy": "coord" }
            ]
        });
        let id = create_and_get_id(&state, &doc).await;

        let resp = dispatch(
            &state,
            auth_req("GET", &format!("/v1/formations/{id}"), None),
        )
        .await;
        assert_eq!(
            resp.status(),
            StatusCode::OK,
            "GET /v1/formations/<id> must capture the path segment; got {:?}",
            resp.status(),
        );
        let record = read_json(resp).await;
        assert_eq!(record["name"], "probe");
    }

    #[tokio::test]
    async fn post_kubectl_style_formation_returns_201() {
        let doc = serde_json::json!({
            "apiVersion": "cellos.dev/v1",
            "kind": "Formation",
            "metadata": { "name": "kubectl-demo" },
            "spec": {
                "coordinator": "coord",
                "members": [
                    { "name": "coord" },
                    { "name": "worker-a", "authorizedBy": "coord" },
                    { "name": "worker-b", "authorizedBy": "coord" }
                ]
            }
        });
        let resp = post_formation(&test_state(), &doc).await;
        assert_eq!(resp.status(), StatusCode::CREATED);
        let created = read_json(resp).await;
        assert_eq!(created["status"], "PENDING");
        assert_eq!(created["name"], "kubectl-demo");
        assert!(created["id"].as_str().is_some());
    }

    #[tokio::test]
    async fn post_kubectl_style_missing_coordinator_returns_no_coordinator() {
        let doc = serde_json::json!({
            "apiVersion": "cellos.dev/v1",
            "kind": "Formation",
            "metadata": { "name": "missing-coord" },
            "spec": {
                "coordinator": "coord",
                "members": [
                    { "name": "worker-a", "authorizedBy": "coord" }
                ]
            }
        });
        let problem = rejected_problem(&doc).await;
        assert_eq!(problem["type"], "/problems/formation/no-coordinator");
    }

    #[tokio::test]
    async fn post_hybrid_formation_returns_400_bad_request() {
        let doc = serde_json::json!({
            "apiVersion": "cellos.dev/v1",
            "kind": "Formation",
            "metadata": { "name": "hybrid" },
            "spec": {
                "coordinator": "coord",
                "members": [ { "name": "coord" } ]
            },
            "name": "hybrid",
            "members": [ { "id": "coord" } ]
        });
        let problem = rejected_problem(&doc).await;
        assert_eq!(
            problem["type"], "/problems/bad-request",
            "hybrid shape must surface a generic bad-request, not an admission discriminant"
        );
        let detail = problem["detail"].as_str().unwrap_or_default();
        assert!(
            detail.contains("hybrid"),
            "detail must mention 'hybrid'; got {detail:?}"
        );
    }

    #[tokio::test]
    async fn post_kubectl_style_wrong_api_version_returns_400() {
        let doc = serde_json::json!({
            "apiVersion": "cellos.dev/v2",
            "kind": "Formation",
            "metadata": { "name": "wrong-api" },
            "spec": {
                "coordinator": "coord",
                "members": [ { "name": "coord" } ]
            }
        });
        let problem = rejected_problem(&doc).await;
        assert_eq!(problem["type"], "/problems/bad-request");
        let detail = problem["detail"].as_str().unwrap_or_default();
        assert!(
            detail.contains("apiVersion") && detail.contains("cellos.dev/v2"),
            "detail must name the bad apiVersion; got {detail:?}"
        );
    }

    #[tokio::test]
    async fn post_kubectl_style_wrong_kind_returns_400() {
        let doc = serde_json::json!({
            "apiVersion": "cellos.dev/v1",
            "kind": "Cell",
            "metadata": { "name": "wrong-kind" },
            "spec": {
                "coordinator": "coord",
                "members": [ { "name": "coord" } ]
            }
        });
        let problem = rejected_problem(&doc).await;
        assert_eq!(problem["type"], "/problems/bad-request");
        let detail = problem["detail"].as_str().unwrap_or_default();
        assert!(
            detail.contains("kind") && detail.contains("Cell"),
            "detail must name the bad kind; got {detail:?}"
        );
    }

    #[tokio::test]
    async fn kubectl_style_post_then_get_returns_normalized_flat_document() {
        let state = test_state();
        let envelope = serde_json::json!({
            "apiVersion": "cellos.dev/v1",
            "kind": "Formation",
            "metadata": { "name": "roundtrip" },
            "spec": {
                "coordinator": "coord",
                "members": [
                    { "name": "coord" },
                    { "name": "worker-a", "authorizedBy": "coord" }
                ]
            }
        });
        let id = create_and_get_id(&state, &envelope).await;

        let resp = dispatch(
            &state,
            auth_req("GET", &format!("/v1/formations/{id}"), None),
        )
        .await;
        assert_eq!(resp.status(), StatusCode::OK);
        let record = read_json(resp).await;
        let doc = &record["document"];
        assert_eq!(doc["name"], "roundtrip", "flat 'name' present");
        assert_eq!(doc["coordinator"], "coord", "flat 'coordinator' present");
        let members = doc["members"]
            .as_array()
            .expect("members array on normalized doc");
        assert_eq!(members.len(), 2);
        assert_eq!(members[0]["id"], "coord");
        assert_eq!(members[1]["id"], "worker-a");
        assert_eq!(members[1]["authorizedBy"], "coord");
        assert!(
            doc.get("apiVersion").is_none(),
            "kubectl envelope must not leak into normalized doc"
        );
        assert!(doc.get("kind").is_none());
        assert!(doc.get("metadata").is_none());
        assert!(doc.get("spec").is_none());
    }

    #[tokio::test]
    async fn unknown_state_returns_bad_request_problem_type() {
        let state = test_state();
        let doc = serde_json::json!({
            "name": "demo",
            "coordinator": "coord",
            "members": [
                { "id": "coord" },
                { "id": "worker-a", "authorizedBy": "coord" }
            ]
        });
        let id = create_and_get_id(&state, &doc).await;

        let bad = serde_json::json!({ "state": "TELEPORTING" }).to_string();
        let resp = dispatch(
            &state,
            auth_req(
                "POST",
                &format!("/v1/formations/{id}/status"),
                Some(&bad),
            ),
        )
        .await;
        assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
        let problem = read_json(resp).await;
        assert_eq!(
            problem["type"], "/problems/bad-request",
            "unknown state must surface generic bad-request, not an ADR-0010 discriminant"
        );
    }
}