use axum::extract::{Path, State};
use axum::http::{HeaderMap, StatusCode};
use axum::response::IntoResponse;
use axum::Json;
use cellos_core::events::{
cloud_event_v1_formation_completed, cloud_event_v1_formation_created,
cloud_event_v1_formation_degraded, cloud_event_v1_formation_failed,
cloud_event_v1_formation_launching, cloud_event_v1_formation_running,
};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use crate::auth::require_bearer;
use crate::error::{AppError, AppErrorKind};
use crate::state::{AppState, FormationRecord, FormationStatus};
/// Flat-shape formation document.
///
/// `create_formation` deserializes this from the value returned by
/// `normalize_formation_document`, so kubectl-style submissions have already
/// been rewritten into this shape by the time it is built.
#[derive(Debug, Deserialize)]
pub struct FormationDocument {
/// Formation name; checked by `validate_formation_name`.
pub name: String,
/// Id of the coordinator member; `validate_formation` requires it to appear in `members`.
pub coordinator: String,
/// Members of the formation, checked by `validate_formation`.
pub members: Vec<FormationMember>,
}
/// One entry of `FormationDocument::members`.
#[derive(Debug, Deserialize)]
pub struct FormationMember {
/// Member identifier; `validate_formation` rejects duplicates.
pub id: String,
/// Id of the member that authorizes this one (JSON key `authorizedBy`).
/// `validate_formation` requires it on every non-coordinator member and
/// forbids it on the coordinator.
#[serde(rename = "authorizedBy")]
pub authorized_by: Option<String>,
}
/// Response body that `create_formation` returns alongside `201 Created`.
#[derive(Debug, Serialize)]
pub struct FormationCreated {
/// Identifier generated for the new formation.
pub id: Uuid,
/// Name taken from the submitted document.
pub name: String,
/// Status of the new record; `create_formation` always sets `Pending`.
pub status: FormationStatus,
}
pub async fn create_formation(
State(state): State<AppState>,
headers: HeaderMap,
body: axum::body::Bytes,
) -> Result<impl IntoResponse, AppError> {
require_bearer(&headers, &state.api_token)?;
let raw: serde_json::Value = serde_json::from_slice(&body)?;
let normalized = normalize_formation_document(&raw)?;
let doc: FormationDocument = serde_json::from_value(normalized.clone())?;
validate_formation_name(&doc.name)?;
validate_formation(&doc)?;
let id = Uuid::new_v4();
let record = FormationRecord {
id,
name: doc.name.clone(),
status: FormationStatus::Pending,
document: normalized,
};
{
let mut map = state.formations.write().await;
if let Some(existing) = map.values().find(|r| r.name == doc.name) {
return Err(AppError::new(
AppErrorKind::Conflict,
format!(
"formation name '{}' already in use by {}",
doc.name, existing.id
),
));
}
map.insert(id, record);
}
let cell_count = doc.members.len() as u32;
let no_failed: &[String] = &[];
let now_rfc3339 = chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true);
let event = cloud_event_v1_formation_created(
&id.to_string(),
&now_rfc3339,
&id.to_string(),
&doc.name,
cell_count,
no_failed,
None,
);
let subject = format!("cellos.events.formations.{id}.created");
publish_event(&state, &subject, event).await;
let body = FormationCreated {
id,
name: doc.name,
status: FormationStatus::Pending,
};
Ok((StatusCode::CREATED, Json(body)))
}
/// Response body of `list_formations`.
#[derive(Debug, Serialize)]
pub struct FormationsSnapshot {
/// Copies of every record held in the formations map when the request ran.
pub formations: Vec<FormationRecord>,
/// Value of `AppState::cursor()` read while the formations read lock was held.
pub cursor: u64,
}
pub async fn list_formations(
State(state): State<AppState>,
headers: HeaderMap,
) -> Result<Json<FormationsSnapshot>, AppError> {
require_bearer(&headers, &state.api_token)?;
let map = state.formations.read().await;
Ok(Json(FormationsSnapshot {
formations: map.values().cloned().collect(),
cursor: state.cursor(),
}))
}
pub async fn get_formation(
State(state): State<AppState>,
headers: HeaderMap,
Path(id): Path<Uuid>,
) -> Result<Json<FormationRecord>, AppError> {
require_bearer(&headers, &state.api_token)?;
let map = state.formations.read().await;
map.get(&id)
.cloned()
.map(Json)
.ok_or_else(|| AppError::not_found(format!("formation {id} not found")))
}
pub async fn get_formation_by_name(
State(state): State<AppState>,
headers: HeaderMap,
Path(name): Path<String>,
) -> Result<Json<FormationRecord>, AppError> {
require_bearer(&headers, &state.api_token)?;
let map = state.formations.read().await;
map.values()
.find(|r| r.name == name)
.cloned()
.map(Json)
.ok_or_else(|| AppError::not_found(format!("formation '{name}' not found")))
}
/// Deletes the formation with the given name by delegating to
/// `delete_formation` once the name has been resolved to a single UUID.
///
/// # Errors
///
/// * not-found when no record carries the name;
/// * `AppErrorKind::Conflict` when several records share the name; the detail
///   lists their UUIDs in sorted order so the caller can delete by id instead;
/// * anything `delete_formation` returns.
pub async fn delete_formation_by_name(
    State(state): State<AppState>,
    headers: HeaderMap,
    Path(name): Path<String>,
) -> Result<StatusCode, AppError> {
    require_bearer(&headers, &state.api_token)?;

    // Only a read guard is needed to resolve the name; it is released before
    // delegating, because `delete_formation` takes the write guard itself.
    let mut ids: Vec<Uuid> = {
        let guard = state.formations.read().await;
        let found: Vec<Uuid> = guard
            .values()
            .filter(|record| record.name == name)
            .map(|record| record.id)
            .collect();
        found
    };
    ids.sort();

    let id = match ids.as_slice() {
        [] => return Err(AppError::not_found(format!("formation '{name}' not found"))),
        [only] => *only,
        several => {
            let id_list = several
                .iter()
                .map(Uuid::to_string)
                .collect::<Vec<_>>()
                .join(", ");
            return Err(AppError::new(
                AppErrorKind::Conflict,
                format!(
                    "multiple formations share name '{name}': [{id_list}]; \
                     delete by UUID via /v1/formations/{{id}} to disambiguate"
                ),
            ));
        }
    };

    delete_formation(State(state), headers, Path(id)).await
}
/// Handles deletion of a formation by UUID.
///
/// The record is not removed from the map: its status is set to `Cancelled`,
/// and a "failed" CloudEvent with the reason `"deleted by operator"` is
/// published via `publish_event`.
///
/// # Errors
///
/// Fails when the bearer token is rejected or when no record has the id.
///
/// # Returns
///
/// `204 No Content` on success.
pub async fn delete_formation(
    State(state): State<AppState>,
    headers: HeaderMap,
    Path(id): Path<Uuid>,
) -> Result<StatusCode, AppError> {
    require_bearer(&headers, &state.api_token)?;

    // Scope the write guard so it is released before the event is published.
    let (name, cell_count) = {
        let mut guard = state.formations.write().await;
        let Some(entry) = guard.get_mut(&id) else {
            return Err(AppError::not_found(format!("formation {id} not found")));
        };
        entry.status = FormationStatus::Cancelled;
        // Member count comes from the stored document; a missing or non-array
        // `members` field counts as zero.
        let count = match entry.document.get("members").and_then(|m| m.as_array()) {
            Some(list) => list.len() as u32,
            None => 0,
        };
        (entry.name.clone(), count)
    };

    let sid = id.to_string();
    let no_failed: &[String] = &[];
    let now_rfc3339 = chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true);
    let event = cloud_event_v1_formation_failed(
        &sid,
        &now_rfc3339,
        &sid,
        &name,
        cell_count,
        no_failed,
        Some("deleted by operator"),
    );
    let subject = format!("cellos.events.formations.{id}.failed");
    publish_event(&state, &subject, event).await;

    Ok(StatusCode::NO_CONTENT)
}
/// Request body accepted by `update_formation_status`.
#[derive(Debug, Deserialize)]
pub struct StatusTransition {
/// Requested state. Upper-cased before matching, so case is ignored; the
/// handler accepts LAUNCHING, RUNNING, DEGRADED, COMPLETED and FAILED.
pub state: String,
/// Optional reason forwarded to the published CloudEvent.
pub reason: Option<String>,
/// Optional list of failed cells; the handler passes it only to the
/// "degraded" and "failed" events.
pub failed_cells: Option<Vec<String>>,
}
/// Applies a status transition to a formation and publishes the matching
/// CloudEvent.
///
/// The requested state is upper-cased and mapped as follows:
///
/// | requested   | stored status | event phase |
/// |-------------|---------------|-------------|
/// | `LAUNCHING` | `Running`     | `launching` |
/// | `RUNNING`   | `Running`     | `running`   |
/// | `DEGRADED`  | `Running`     | `degraded`  |
/// | `COMPLETED` | `Succeeded`   | `completed` |
/// | `FAILED`    | `Failed`      | `failed`    |
///
/// `failed_cells` from the body is forwarded only to the `degraded` and
/// `failed` events; the other events carry an empty list.
///
/// # Errors
///
/// * bearer-token rejection from `require_bearer`;
/// * not-found when no record has the id (checked before the state string);
/// * `AppErrorKind::BadRequest` for any other state string.
///
/// # Returns
///
/// `204 No Content` on success.
pub async fn update_formation_status(
    State(state): State<AppState>,
    headers: HeaderMap,
    Path(id): Path<Uuid>,
    Json(body): Json<StatusTransition>,
) -> Result<StatusCode, AppError> {
    // The requested state is parsed once into this enum, so the status update
    // and the event selection below are exhaustive matches that cannot drift
    // apart.
    #[derive(Clone, Copy)]
    enum Phase {
        Launching,
        Running,
        Degraded,
        Completed,
        Failed,
    }

    require_bearer(&headers, &state.api_token)?;
    let now_rfc3339 = chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true);
    let requested = body.state.to_uppercase();

    let (phase, name, cell_count) = {
        let mut map = state.formations.write().await;
        // The lookup comes first so an unknown id wins over an unknown state.
        let entry = map
            .get_mut(&id)
            .ok_or_else(|| AppError::not_found(format!("formation {id} not found")))?;
        let phase = match requested.as_str() {
            "LAUNCHING" => Phase::Launching,
            "RUNNING" => Phase::Running,
            "DEGRADED" => Phase::Degraded,
            "COMPLETED" => Phase::Completed,
            "FAILED" => Phase::Failed,
            other => {
                return Err(AppError::new(
                    AppErrorKind::BadRequest,
                    format!("unknown state: {other}"),
                ));
            }
        };
        entry.status = match phase {
            Phase::Launching | Phase::Running | Phase::Degraded => FormationStatus::Running,
            Phase::Completed => FormationStatus::Succeeded,
            Phase::Failed => FormationStatus::Failed,
        };
        // A missing or non-array `members` field counts as zero cells.
        let members = entry
            .document
            .get("members")
            .and_then(|m| m.as_array())
            .map(|a| a.len() as u32)
            .unwrap_or(0);
        (phase, entry.name.clone(), members)
    };

    let failed = body.failed_cells.unwrap_or_default();
    let reason = body.reason.as_deref();
    let sid = id.to_string();
    let empty: &[String] = &[];

    let (event, phase_name) = match phase {
        Phase::Launching => (
            cloud_event_v1_formation_launching(
                &sid,
                &now_rfc3339,
                &sid,
                &name,
                cell_count,
                empty,
                reason,
            ),
            "launching",
        ),
        Phase::Running => (
            cloud_event_v1_formation_running(
                &sid,
                &now_rfc3339,
                &sid,
                &name,
                cell_count,
                empty,
                reason,
            ),
            "running",
        ),
        Phase::Degraded => (
            cloud_event_v1_formation_degraded(
                &sid,
                &now_rfc3339,
                &sid,
                &name,
                cell_count,
                &failed,
                reason,
            ),
            "degraded",
        ),
        Phase::Completed => (
            cloud_event_v1_formation_completed(
                &sid,
                &now_rfc3339,
                &sid,
                &name,
                cell_count,
                empty,
                reason,
            ),
            "completed",
        ),
        Phase::Failed => (
            cloud_event_v1_formation_failed(
                &sid,
                &now_rfc3339,
                &sid,
                &name,
                cell_count,
                &failed,
                reason,
            ),
            "failed",
        ),
    };

    let subject = format!("cellos.events.formations.{id}.{phase_name}");
    publish_event(&state, &subject, event).await;
    Ok(StatusCode::NO_CONTENT)
}
/// Best-effort publication of `event` as JSON on `subject`.
///
/// Does nothing when `state.nats` is `None`. Serialization and publish
/// failures are logged with `tracing::warn!` and otherwise ignored, so callers
/// never see an error from this function.
async fn publish_event(state: &AppState, subject: &str, event: impl serde::Serialize) {
    let nats = match &state.nats {
        Some(client) => client,
        None => return,
    };

    match serde_json::to_vec(&event) {
        Err(e) => {
            tracing::warn!(subject, error = %e, "failed to serialise formation CloudEvent");
        }
        Ok(payload) => {
            if let Err(e) = nats.publish(subject.to_owned(), payload.into()).await {
                tracing::warn!(subject, error = %e, "failed to publish formation CloudEvent to NATS");
            }
        }
    }
}
fn normalize_formation_document(raw: &serde_json::Value) -> Result<serde_json::Value, AppError> {
let Some(obj) = raw.as_object() else {
return Ok(raw.clone());
};
const FLAT_KEYS: &[&str] = &["name", "coordinator", "members"];
const KUBECTL_KEYS: &[&str] = &["apiVersion", "kind", "metadata", "spec"];
const KUBECTL_ALLOWED: &[&str] = &["apiVersion", "kind", "metadata", "spec"];
let flat_keys_present: Vec<&str> = FLAT_KEYS
.iter()
.copied()
.filter(|k| obj.contains_key(*k))
.collect();
let kubectl_keys_present: Vec<&str> = KUBECTL_KEYS
.iter()
.copied()
.filter(|k| obj.contains_key(*k))
.collect();
let has_flat = !flat_keys_present.is_empty();
let has_kubectl = !kubectl_keys_present.is_empty();
if has_flat && has_kubectl {
return Err(AppError::bad_request(format!(
"hybrid formation document: top-level flat field(s) {flat:?} \
conflict with kubectl-style envelope field(s) {kubectl:?}; \
pick exactly one shape (see contracts/schemas/formation-v1.schema.json)",
flat = flat_keys_present,
kubectl = kubectl_keys_present,
)));
}
if !has_kubectl {
return Ok(raw.clone());
}
let unknown_top_level: Vec<&str> = obj
.keys()
.map(|s| s.as_str())
.filter(|k| !KUBECTL_ALLOWED.contains(k))
.collect();
if !unknown_top_level.is_empty() {
return Err(AppError::bad_request(format!(
"kubectl-style formation: unknown top-level field(s) {unknown_top_level:?}; \
allowed: {KUBECTL_ALLOWED:?}",
)));
}
let api_version = obj
.get("apiVersion")
.and_then(|v| v.as_str())
.ok_or_else(|| {
AppError::bad_request(
"kubectl-style formation: missing or non-string 'apiVersion' (expected \"cellos.dev/v1\")"
.to_string(),
)
})?;
if api_version != "cellos.dev/v1" {
return Err(AppError::bad_request(format!(
"kubectl-style formation: unsupported apiVersion '{api_version}' (expected \"cellos.dev/v1\")"
)));
}
let kind = obj.get("kind").and_then(|v| v.as_str()).ok_or_else(|| {
AppError::bad_request(
"kubectl-style formation: missing or non-string 'kind' (expected \"Formation\")"
.to_string(),
)
})?;
if kind != "Formation" {
return Err(AppError::bad_request(format!(
"kubectl-style formation: unsupported kind '{kind}' (expected \"Formation\")"
)));
}
let metadata = obj
.get("metadata")
.and_then(|v| v.as_object())
.ok_or_else(|| {
AppError::bad_request("kubectl-style formation: missing 'metadata' object".to_string())
})?;
let name = metadata
.get("name")
.and_then(|v| v.as_str())
.ok_or_else(|| {
AppError::bad_request("kubectl-style formation: missing 'metadata.name'".to_string())
})?;
let spec = obj.get("spec").and_then(|v| v.as_object()).ok_or_else(|| {
AppError::bad_request("kubectl-style formation: missing 'spec' object".to_string())
})?;
let coordinator = spec
.get("coordinator")
.and_then(|v| v.as_str())
.ok_or_else(|| {
AppError::bad_request("kubectl-style formation: missing 'spec.coordinator'".to_string())
})?;
let members_raw = spec
.get("members")
.and_then(|v| v.as_array())
.ok_or_else(|| {
AppError::bad_request(
"kubectl-style formation: missing or non-array 'spec.members'".to_string(),
)
})?;
let mut members_flat = Vec::with_capacity(members_raw.len());
for (idx, m) in members_raw.iter().enumerate() {
let m_obj = m.as_object().ok_or_else(|| {
AppError::bad_request(format!(
"kubectl-style formation: spec.members[{idx}] is not an object"
))
})?;
let member_name = m_obj.get("name").and_then(|v| v.as_str()).ok_or_else(|| {
AppError::bad_request(format!(
"kubectl-style formation: spec.members[{idx}] missing 'name'"
))
})?;
if m_obj.contains_key("id") {
return Err(AppError::bad_request(format!(
"kubectl-style formation: spec.members[{idx}] declares both 'name' \
and 'id'; kubectl manifests address members by 'name' only — \
remove the 'id' field"
)));
}
let mut rewritten = serde_json::Map::with_capacity(m_obj.len());
rewritten.insert(
"id".to_string(),
serde_json::Value::String(member_name.to_string()),
);
for (k, v) in m_obj.iter() {
if k == "name" {
continue; }
rewritten.insert(k.clone(), v.clone());
}
members_flat.push(serde_json::Value::Object(rewritten));
}
let mut flat_metadata = serde_json::Map::new();
if let Some(labels) = metadata.get("labels") {
if !labels.is_object() {
return Err(AppError::bad_request(
"kubectl-style formation: 'metadata.labels' must be an object",
));
}
flat_metadata.insert("labels".to_string(), labels.clone());
}
if let Some(annotations) = metadata.get("annotations") {
if !annotations.is_object() {
return Err(AppError::bad_request(
"kubectl-style formation: 'metadata.annotations' must be an object",
));
}
flat_metadata.insert("annotations".to_string(), annotations.clone());
}
let mut flat = serde_json::Map::with_capacity(4);
flat.insert(
"name".to_string(),
serde_json::Value::String(name.to_string()),
);
flat.insert(
"coordinator".to_string(),
serde_json::Value::String(coordinator.to_string()),
);
flat.insert(
"members".to_string(),
serde_json::Value::Array(members_flat),
);
if !flat_metadata.is_empty() {
flat.insert(
"metadata".to_string(),
serde_json::Value::Object(flat_metadata),
);
}
Ok(serde_json::Value::Object(flat))
}
/// Checks that `name` is acceptable as a formation name.
///
/// Rules, applied in this order (the first violation is reported):
/// 1. not empty;
/// 2. at most 253 bytes;
/// 3. not `.` or `..`;
/// 4. only bytes from `A-Z a-z 0-9 . - _`;
/// 5. does not start with `-` or `.`;
/// 6. does not end with `-` or `.`.
///
/// # Errors
///
/// Returns `AppError::bad_request` describing the violated rule.
fn validate_formation_name(name: &str) -> Result<(), AppError> {
    const MAX_LEN: usize = 253;

    let bytes = name.as_bytes();
    let (Some(&first), Some(&last)) = (bytes.first(), bytes.last()) else {
        return Err(AppError::bad_request(
            "formation name must not be empty".to_string(),
        ));
    };

    if bytes.len() > MAX_LEN {
        return Err(AppError::bad_request(format!(
            "formation name length {} exceeds maximum of 253 bytes",
            bytes.len()
        )));
    }

    if matches!(name, "." | "..") {
        return Err(AppError::bad_request(format!(
            "formation name '{name}' is reserved"
        )));
    }

    let offending = bytes
        .iter()
        .enumerate()
        .find(|(_, b)| !(b.is_ascii_alphanumeric() || matches!(**b, b'-' | b'_' | b'.')));
    if let Some((idx, b)) = offending {
        // Printable bytes are quoted as-is; everything else is shown as a hex escape.
        let rendered = if b.is_ascii_graphic() {
            format!("'{}'", *b as char)
        } else {
            format!("\\x{b:02x}")
        };
        return Err(AppError::bad_request(format!(
            "formation name contains disallowed character {rendered} at byte offset {idx} \
             (allowed: A-Z a-z 0-9 . - _)"
        )));
    }

    if matches!(first, b'-' | b'.') {
        return Err(AppError::bad_request(format!(
            "formation name '{name}' must not start with '-' or '.'"
        )));
    }
    if matches!(last, b'-' | b'.') {
        return Err(AppError::bad_request(format!(
            "formation name '{name}' must not end with '-' or '.'"
        )));
    }
    Ok(())
}
/// Validates the authority structure of a formation document.
///
/// Checks, in this order (the first violation is reported):
/// 1. the coordinator id appears among the members (`FormationNoCoordinator`);
/// 2. member ids are unique (`FormationDuplicateMemberId`);
/// 3. the coordinator declares no `authorizedBy`, and every other member
///    declares one that names an existing member
///    (`FormationAuthorityNotNarrowing`);
/// 4. following `authorizedBy` links from any non-coordinator member never
///    loops (`FormationCycle`).
fn validate_formation(doc: &FormationDocument) -> Result<(), AppError> {
    use std::collections::{HashMap, HashSet};

    let coordinator = doc.coordinator.as_str();
    let member_count = doc.members.len();

    if !doc.members.iter().any(|m| m.id == coordinator) {
        return Err(AppError::new(
            AppErrorKind::FormationNoCoordinator,
            format!(
                "coordinator '{}' must appear in members list",
                doc.coordinator
            ),
        ));
    }

    let mut known_ids: HashSet<&str> = HashSet::new();
    for member in &doc.members {
        let newly_added = known_ids.insert(member.id.as_str());
        if !newly_added {
            return Err(AppError::new(
                AppErrorKind::FormationDuplicateMemberId,
                format!("duplicate member id '{}'", member.id),
            ));
        }
    }

    for member in &doc.members {
        let is_coordinator = member.id == coordinator;
        match member.authorized_by.as_deref() {
            Some(_) if is_coordinator => {
                return Err(AppError::new(
                    AppErrorKind::FormationAuthorityNotNarrowing,
                    format!("coordinator '{}' must not declare authorizedBy", member.id),
                ));
            }
            None if !is_coordinator => {
                return Err(AppError::new(
                    AppErrorKind::FormationAuthorityNotNarrowing,
                    format!(
                        "non-coordinator member '{}' missing authorizedBy",
                        member.id
                    ),
                ));
            }
            // Only non-coordinators reach this arm; the coordinator case with
            // `Some` was handled above.
            Some(parent) if !known_ids.contains(parent) => {
                return Err(AppError::new(
                    AppErrorKind::FormationAuthorityNotNarrowing,
                    format!(
                        "member '{}' references unknown parent '{}'",
                        member.id, parent
                    ),
                ));
            }
            _ => {}
        }
    }

    // child id -> id of the member that authorizes it
    let parent_of: HashMap<&str, &str> = doc
        .members
        .iter()
        .filter_map(|m| Some((m.id.as_str(), m.authorized_by.as_deref()?)))
        .collect();

    for member in doc.members.iter().filter(|m| m.id != coordinator) {
        let start = member.id.as_str();
        let mut cursor = start;
        let mut steps = 0;
        // Follow at most `member_count` links. The walk either runs out of
        // parents, comes back to `start`, or is still on a linked node once
        // the step budget is spent.
        while steps < member_count {
            match parent_of.get(cursor) {
                None => break,
                Some(&p) if p == start => {
                    return Err(AppError::new(
                        AppErrorKind::FormationCycle,
                        format!(
                            "authorizedBy cycle detected involving member '{}'",
                            member.id
                        ),
                    ));
                }
                Some(&p) => cursor = p,
            }
            steps += 1;
        }
        // Still having a parent here means the walk entered a loop that does
        // not pass through `start`.
        if parent_of.contains_key(cursor) {
            return Err(AppError::new(
                AppErrorKind::FormationCycle,
                format!(
                    "authorizedBy cycle detected on chain starting at '{}'",
                    member.id
                ),
            ));
        }
    }

    Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use crate::router;
use axum::body::Body;
use axum::http::{header, Request};
use http_body_util::BodyExt;
use tower::ServiceExt;
const TOKEN: &str = "test-token";
fn test_state() -> AppState {
AppState::new(None, TOKEN)
}
fn auth_req(method: &str, uri: &str, body: Option<&str>) -> Request<Body> {
let mut b = Request::builder()
.method(method)
.uri(uri)
.header(header::AUTHORIZATION, format!("Bearer {TOKEN}"));
if body.is_some() {
b = b.header(header::CONTENT_TYPE, "application/json");
}
b.body(
body.map(|s| Body::from(s.to_owned()))
.unwrap_or_else(Body::empty),
)
.expect("build request")
}
#[tokio::test]
async fn post_valid_formation_returns_201() {
let app = router(test_state());
let body = serde_json::json!({
"name": "demo",
"coordinator": "coord",
"members": [
{ "id": "coord" },
{ "id": "worker-a", "authorizedBy": "coord" },
{ "id": "worker-b", "authorizedBy": "coord" }
]
})
.to_string();
let resp = app
.oneshot(auth_req("POST", "/v1/formations", Some(&body)))
.await
.expect("router response");
assert_eq!(resp.status(), StatusCode::CREATED);
let bytes = resp.into_body().collect().await.unwrap().to_bytes();
let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
assert_eq!(parsed["status"], "PENDING");
assert_eq!(parsed["name"], "demo");
assert!(parsed["id"].as_str().is_some());
}
#[tokio::test]
async fn post_formation_missing_coordinator_returns_400() {
let app = router(test_state());
let body = serde_json::json!({
"name": "demo",
"coordinator": "coord",
"members": [
{ "id": "worker-a", "authorizedBy": "coord" }
]
})
.to_string();
let resp = app
.oneshot(auth_req("POST", "/v1/formations", Some(&body)))
.await
.expect("router response");
assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
let ct = resp
.headers()
.get(header::CONTENT_TYPE)
.and_then(|v| v.to_str().ok())
.unwrap_or_default()
.to_owned();
assert!(
ct.starts_with("application/problem+json"),
"expected RFC 9457 media type, got {ct:?}"
);
let bytes = resp.into_body().collect().await.unwrap().to_bytes();
let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
assert_eq!(parsed["type"], "/problems/formation/no-coordinator");
}
#[tokio::test]
async fn post_formation_member_missing_authorized_by_returns_400() {
let app = router(test_state());
let body = serde_json::json!({
"name": "demo",
"coordinator": "coord",
"members": [
{ "id": "coord" },
{ "id": "worker-a" } ]
})
.to_string();
let resp = app
.oneshot(auth_req("POST", "/v1/formations", Some(&body)))
.await
.expect("router response");
assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
let bytes = resp.into_body().collect().await.unwrap().to_bytes();
let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
assert_eq!(
parsed["type"], "/problems/formation/authority-not-narrowing",
"expected authority-not-narrowing discriminant, got {parsed}"
);
}
#[tokio::test]
async fn get_formations_returns_snapshot_with_cursor() {
let app = router(test_state());
let resp = app
.oneshot(auth_req("GET", "/v1/formations", None))
.await
.expect("router response");
assert_eq!(resp.status(), StatusCode::OK);
let bytes = resp.into_body().collect().await.unwrap().to_bytes();
let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
assert!(parsed.is_object(), "expected snapshot object, got {parsed}");
let arr = parsed["formations"].as_array().expect("formations array");
assert_eq!(arr.len(), 0);
assert!(
parsed["cursor"].is_u64(),
"cursor field must be an unsigned integer, got {}",
parsed["cursor"]
);
assert_eq!(parsed["cursor"].as_u64(), Some(0));
}
#[tokio::test]
async fn snapshot_returns_cursor() {
let app = router(test_state());
let body = serde_json::json!({
"name": "with-cursor",
"coordinator": "coord",
"members": [
{ "id": "coord" },
{ "id": "worker-a", "authorizedBy": "coord" }
]
})
.to_string();
let resp = app
.clone()
.oneshot(auth_req("POST", "/v1/formations", Some(&body)))
.await
.expect("router response");
assert_eq!(resp.status(), StatusCode::CREATED);
let resp = app
.oneshot(auth_req("GET", "/v1/formations", None))
.await
.expect("router response");
assert_eq!(resp.status(), StatusCode::OK);
let bytes = resp.into_body().collect().await.unwrap().to_bytes();
let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
assert!(
parsed["cursor"].is_u64(),
"cursor must be unsigned integer; got {}",
parsed["cursor"]
);
let formations = parsed["formations"].as_array().expect("formations array");
assert_eq!(formations.len(), 1, "expected 1 formation after POST");
assert_eq!(formations[0]["name"], "with-cursor");
}
#[tokio::test]
async fn missing_bearer_returns_401() {
let app = router(test_state());
let resp = app
.oneshot(
Request::builder()
.method("GET")
.uri("/v1/formations")
.body(Body::empty())
.unwrap(),
)
.await
.expect("router response");
assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
}
#[tokio::test]
async fn rejects_duplicate_member_ids_with_dedicated_type() {
let app = router(test_state());
let body = serde_json::json!({
"name": "dup-ids",
"coordinator": "coord",
"members": [
{ "id": "coord" },
{ "id": "coord", "authorizedBy": "coord" }
]
})
.to_string();
let resp = app
.oneshot(auth_req("POST", "/v1/formations", Some(&body)))
.await
.expect("router response");
assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
let bytes = resp.into_body().collect().await.unwrap().to_bytes();
let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
assert_eq!(
parsed["type"], "/problems/formation/duplicate-member-id",
"duplicate member ids must surface the dedicated discriminant"
);
let detail = parsed["detail"].as_str().unwrap_or("");
assert!(
detail.contains("coord"),
"detail must name the duplicate id, got: {detail}",
);
}
#[tokio::test]
async fn rejects_self_authorized_cycle() {
let app = router(test_state());
let body = serde_json::json!({
"name": "self-cycle",
"coordinator": "coord",
"members": [
{ "id": "coord" },
{ "id": "worker-a", "authorizedBy": "worker-a" }
]
})
.to_string();
let resp = app
.oneshot(auth_req("POST", "/v1/formations", Some(&body)))
.await
.expect("router response");
assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
let bytes = resp.into_body().collect().await.unwrap().to_bytes();
let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
assert_eq!(parsed["type"], "/problems/formation/cycle");
}
#[tokio::test]
async fn rejects_two_node_cycle() {
let app = router(test_state());
let body = serde_json::json!({
"name": "two-cycle",
"coordinator": "coord",
"members": [
{ "id": "coord" },
{ "id": "a", "authorizedBy": "b" },
{ "id": "b", "authorizedBy": "a" }
]
})
.to_string();
let resp = app
.oneshot(auth_req("POST", "/v1/formations", Some(&body)))
.await
.expect("router response");
assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
let bytes = resp.into_body().collect().await.unwrap().to_bytes();
let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
assert_eq!(parsed["type"], "/problems/formation/cycle");
}
#[tokio::test]
async fn rejects_orphan_parent_reference() {
let app = router(test_state());
let body = serde_json::json!({
"name": "orphan-parent",
"coordinator": "coord",
"members": [
{ "id": "coord" },
{ "id": "worker-a", "authorizedBy": "ghost" }
]
})
.to_string();
let resp = app
.oneshot(auth_req("POST", "/v1/formations", Some(&body)))
.await
.expect("router response");
assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
let bytes = resp.into_body().collect().await.unwrap().to_bytes();
let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
assert_eq!(
parsed["type"],
"/problems/formation/authority-not-narrowing"
);
}
#[tokio::test]
async fn post_formation_oversized_body_returns_413() {
let app = router(test_state());
let big = "x".repeat(70 * 1024);
let body =
format!(r#"{{"name":"{big}","coordinator":"coord","members":[{{"id":"coord"}}]}}"#,);
let resp = app
.oneshot(auth_req("POST", "/v1/formations", Some(&body)))
.await
.expect("router response");
assert_eq!(
resp.status(),
StatusCode::PAYLOAD_TOO_LARGE,
"oversized body must surface 413; got {:?}",
resp.status(),
);
}
#[tokio::test]
async fn get_formation_by_id_captures_path() {
let state = test_state();
let body = serde_json::json!({
"name": "probe",
"coordinator": "coord",
"members": [
{ "id": "coord" },
{ "id": "worker-a", "authorizedBy": "coord" }
]
})
.to_string();
let resp = router(state.clone())
.oneshot(auth_req("POST", "/v1/formations", Some(&body)))
.await
.expect("router response");
assert_eq!(resp.status(), StatusCode::CREATED);
let bytes = resp.into_body().collect().await.unwrap().to_bytes();
let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
let id = parsed["id"].as_str().expect("uuid string");
let resp = router(state)
.oneshot(auth_req("GET", &format!("/v1/formations/{id}"), None))
.await
.expect("router response");
assert_eq!(
resp.status(),
StatusCode::OK,
"GET /v1/formations/<id> must capture the path segment; got {:?}",
resp.status(),
);
let bytes = resp.into_body().collect().await.unwrap().to_bytes();
let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
assert_eq!(parsed["name"], "probe");
}
#[tokio::test]
async fn post_kubectl_style_formation_returns_201() {
let app = router(test_state());
let body = serde_json::json!({
"apiVersion": "cellos.dev/v1",
"kind": "Formation",
"metadata": { "name": "kubectl-demo" },
"spec": {
"coordinator": "coord",
"members": [
{ "name": "coord" },
{ "name": "worker-a", "authorizedBy": "coord" },
{ "name": "worker-b", "authorizedBy": "coord" }
]
}
})
.to_string();
let resp = app
.oneshot(auth_req("POST", "/v1/formations", Some(&body)))
.await
.expect("router response");
assert_eq!(resp.status(), StatusCode::CREATED);
let bytes = resp.into_body().collect().await.unwrap().to_bytes();
let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
assert_eq!(parsed["status"], "PENDING");
assert_eq!(parsed["name"], "kubectl-demo");
assert!(parsed["id"].as_str().is_some());
}
#[tokio::test]
async fn post_kubectl_style_missing_coordinator_returns_no_coordinator() {
let app = router(test_state());
let body = serde_json::json!({
"apiVersion": "cellos.dev/v1",
"kind": "Formation",
"metadata": { "name": "missing-coord" },
"spec": {
"coordinator": "coord",
"members": [
{ "name": "worker-a", "authorizedBy": "coord" }
]
}
})
.to_string();
let resp = app
.oneshot(auth_req("POST", "/v1/formations", Some(&body)))
.await
.expect("router response");
assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
let bytes = resp.into_body().collect().await.unwrap().to_bytes();
let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
assert_eq!(parsed["type"], "/problems/formation/no-coordinator");
}
#[tokio::test]
async fn post_hybrid_formation_returns_400_bad_request() {
let app = router(test_state());
let body = serde_json::json!({
"apiVersion": "cellos.dev/v1",
"kind": "Formation",
"metadata": { "name": "hybrid" },
"spec": {
"coordinator": "coord",
"members": [ { "name": "coord" } ]
},
"name": "hybrid",
"members": [ { "id": "coord" } ]
})
.to_string();
let resp = app
.oneshot(auth_req("POST", "/v1/formations", Some(&body)))
.await
.expect("router response");
assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
let bytes = resp.into_body().collect().await.unwrap().to_bytes();
let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
assert_eq!(
parsed["type"], "/problems/bad-request",
"hybrid shape must surface a generic bad-request, not an admission discriminant"
);
let detail = parsed["detail"].as_str().unwrap_or_default();
assert!(
detail.contains("hybrid"),
"detail must mention 'hybrid'; got {detail:?}"
);
}
#[tokio::test]
async fn post_kubectl_style_wrong_api_version_returns_400() {
let app = router(test_state());
let body = serde_json::json!({
"apiVersion": "cellos.dev/v2",
"kind": "Formation",
"metadata": { "name": "wrong-api" },
"spec": {
"coordinator": "coord",
"members": [ { "name": "coord" } ]
}
})
.to_string();
let resp = app
.oneshot(auth_req("POST", "/v1/formations", Some(&body)))
.await
.expect("router response");
assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
let bytes = resp.into_body().collect().await.unwrap().to_bytes();
let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
assert_eq!(parsed["type"], "/problems/bad-request");
let detail = parsed["detail"].as_str().unwrap_or_default();
assert!(
detail.contains("apiVersion") && detail.contains("cellos.dev/v2"),
"detail must name the bad apiVersion; got {detail:?}"
);
}
#[tokio::test]
async fn post_kubectl_style_wrong_kind_returns_400() {
let app = router(test_state());
let body = serde_json::json!({
"apiVersion": "cellos.dev/v1",
"kind": "Cell",
"metadata": { "name": "wrong-kind" },
"spec": {
"coordinator": "coord",
"members": [ { "name": "coord" } ]
}
})
.to_string();
let resp = app
.oneshot(auth_req("POST", "/v1/formations", Some(&body)))
.await
.expect("router response");
assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
let bytes = resp.into_body().collect().await.unwrap().to_bytes();
let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
assert_eq!(parsed["type"], "/problems/bad-request");
let detail = parsed["detail"].as_str().unwrap_or_default();
assert!(
detail.contains("kind") && detail.contains("Cell"),
"detail must name the bad kind; got {detail:?}"
);
}
#[tokio::test]
async fn kubectl_style_post_then_get_returns_normalized_flat_document() {
let state = test_state();
let body = serde_json::json!({
"apiVersion": "cellos.dev/v1",
"kind": "Formation",
"metadata": { "name": "roundtrip" },
"spec": {
"coordinator": "coord",
"members": [
{ "name": "coord" },
{ "name": "worker-a", "authorizedBy": "coord" }
]
}
})
.to_string();
let resp = router(state.clone())
.oneshot(auth_req("POST", "/v1/formations", Some(&body)))
.await
.expect("router response");
assert_eq!(resp.status(), StatusCode::CREATED);
let bytes = resp.into_body().collect().await.unwrap().to_bytes();
let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
let id = parsed["id"].as_str().expect("uuid string");
let resp = router(state)
.oneshot(auth_req("GET", &format!("/v1/formations/{id}"), None))
.await
.expect("router response");
assert_eq!(resp.status(), StatusCode::OK);
let bytes = resp.into_body().collect().await.unwrap().to_bytes();
let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
let doc = &parsed["document"];
assert_eq!(doc["name"], "roundtrip", "flat 'name' present");
assert_eq!(doc["coordinator"], "coord", "flat 'coordinator' present");
let members = doc["members"]
.as_array()
.expect("members array on normalized doc");
assert_eq!(members.len(), 2);
assert_eq!(members[0]["id"], "coord");
assert_eq!(members[1]["id"], "worker-a");
assert_eq!(members[1]["authorizedBy"], "coord");
assert!(
doc.get("apiVersion").is_none(),
"kubectl envelope must not leak into normalized doc"
);
assert!(doc.get("kind").is_none());
assert!(doc.get("metadata").is_none());
assert!(doc.get("spec").is_none());
}
/// A kubectl-style manifest whose member sets both `name` and an explicit `id`
/// must be refused: 400, RFC 9457 media type, generic bad-request problem type,
/// and a detail string that quotes both conflicting field names.
#[tokio::test]
async fn kubectl_member_with_explicit_id_returns_400() {
    let service = router(test_state());
    // The first member carries `name` AND `id`, which is the conflict under test.
    let members = serde_json::json!([
        { "name": "alice", "id": "bob" },
        { "name": "worker-a", "authorizedBy": "alice" }
    ]);
    let manifest = serde_json::json!({
        "apiVersion": "cellos.dev/v1",
        "kind": "Formation",
        "metadata": { "name": "rt3-ctl-003-a" },
        "spec": {
            "coordinator": "alice",
            "members": members
        }
    })
    .to_string();
    let request = auth_req("POST", "/v1/formations", Some(&manifest));
    let resp = service.oneshot(request).await.expect("router response");

    let status = resp.status();
    assert_eq!(
        status,
        StatusCode::BAD_REQUEST,
        "manifest with both name+id at member level must be rejected"
    );

    // A missing or non-UTF-8 Content-Type header collapses to an empty string.
    let ct = resp
        .headers()
        .get(header::CONTENT_TYPE)
        .and_then(|value| value.to_str().ok())
        .map(str::to_owned)
        .unwrap_or_default();
    assert!(
        ct.starts_with("application/problem+json"),
        "expected RFC 9457 media type, got {ct:?}"
    );

    let payload = resp.into_body().collect().await.unwrap().to_bytes();
    let parsed = serde_json::from_slice::<serde_json::Value>(&payload).unwrap();
    assert_eq!(
        parsed["type"], "/problems/bad-request",
        "kubectl id-conflict is a generic bad-request, not an ADR-0010 discriminant"
    );

    let detail = parsed["detail"].as_str().unwrap_or_default();
    let quotes_name = detail.contains("'name'");
    let quotes_id = detail.contains("'id'");
    assert!(
        quotes_name && quotes_id,
        "detail must name both conflicting fields; got {detail:?}"
    );
}
/// DELETE by name must answer 409 when two stored formations share the name.
/// The problem detail has to list both UUIDs plus the name, and both records
/// must remain in the store with their `Pending` status.
#[tokio::test]
async fn delete_by_name_with_duplicates_returns_409() {
    let state = test_state();
    let id_a = Uuid::new_v4();
    let id_b = Uuid::new_v4();

    // Seed the store directly with two records that share one name.
    let seed_record = |id| FormationRecord {
        id,
        name: "rt3-dup".to_string(),
        status: FormationStatus::Pending,
        document: serde_json::json!({"name": "rt3-dup"}),
    };
    {
        let mut formations = state.formations.write().await;
        formations.insert(id_a, seed_record(id_a));
        formations.insert(id_b, seed_record(id_b));
    }

    let request = auth_req("DELETE", "/v1/formations/by-name/rt3-dup", None);
    let resp = router(state.clone())
        .oneshot(request)
        .await
        .expect("router response");
    let status = resp.status();
    assert_eq!(
        status,
        StatusCode::CONFLICT,
        "duplicate-name DELETE must surface 409, not silently delete"
    );

    // A missing or non-UTF-8 Content-Type header collapses to an empty string.
    let ct = resp
        .headers()
        .get(header::CONTENT_TYPE)
        .and_then(|value| value.to_str().ok())
        .map(str::to_owned)
        .unwrap_or_default();
    assert!(
        ct.starts_with("application/problem+json"),
        "expected RFC 9457 media type, got {ct:?}"
    );

    let payload = resp.into_body().collect().await.unwrap().to_bytes();
    let parsed = serde_json::from_slice::<serde_json::Value>(&payload).unwrap();
    assert_eq!(parsed["type"], "/problems/conflict");

    let detail = parsed["detail"].as_str().unwrap_or_default();
    let uuid_a = id_a.to_string();
    let uuid_b = id_b.to_string();
    assert!(
        detail.contains(&uuid_a) && detail.contains(&uuid_b),
        "detail must list BOTH conflicting UUIDs so the operator can disambiguate; \
         got {detail:?}"
    );
    assert!(
        detail.contains("rt3-dup"),
        "detail must name the conflicting formation name; got {detail:?}"
    );

    // The rejected DELETE must not have touched either record.
    let formations = state.formations.read().await;
    assert!(
        formations.contains_key(&id_a),
        "id_a must still exist after 409"
    );
    assert!(
        formations.contains_key(&id_b),
        "id_b must still exist after 409"
    );
    let kept_a = formations.get(&id_a).unwrap();
    assert_eq!(kept_a.status, FormationStatus::Pending);
    let kept_b = formations.get(&id_b).unwrap();
    assert_eq!(kept_b.status, FormationStatus::Pending);
}
/// Posting an unrecognised `state` value to a formation's status endpoint must
/// yield 400 with the generic bad-request problem type.
#[tokio::test]
async fn unknown_state_returns_bad_request_problem_type() {
    let state = test_state();

    // Step 1: create a formation so there is an id to address.
    let members = serde_json::json!([
        { "id": "coord" },
        { "id": "worker-a", "authorizedBy": "coord" }
    ]);
    let create_body = serde_json::json!({
        "name": "demo",
        "coordinator": "coord",
        "members": members
    })
    .to_string();
    let created = router(state.clone())
        .oneshot(auth_req("POST", "/v1/formations", Some(&create_body)))
        .await
        .expect("router response");
    assert_eq!(created.status(), StatusCode::CREATED);
    let created_bytes = created.into_body().collect().await.unwrap().to_bytes();
    let created_json = serde_json::from_slice::<serde_json::Value>(&created_bytes).unwrap();
    let id = created_json["id"]
        .as_str()
        .map(str::to_owned)
        .expect("uuid string");

    // Step 2: submit a state value the API does not recognise.
    let bad = serde_json::json!({ "state": "TELEPORTING" }).to_string();
    let status_path = format!("/v1/formations/{id}/status");
    let rejected = router(state)
        .oneshot(auth_req("POST", &status_path, Some(&bad)))
        .await
        .expect("router response");
    assert_eq!(rejected.status(), StatusCode::BAD_REQUEST);

    let rejected_bytes = rejected.into_body().collect().await.unwrap().to_bytes();
    let parsed = serde_json::from_slice::<serde_json::Value>(&rejected_bytes).unwrap();
    assert_eq!(
        parsed["type"], "/problems/bad-request",
        "unknown state must surface generic bad-request, not an ADR-0010 discriminant"
    );
}
fn minimal_body(name: &str) -> String {
serde_json::json!({
"name": name,
"coordinator": "coord",
"members": [
{ "id": "coord" },
{ "id": "worker-a", "authorizedBy": "coord" }
]
})
.to_string()
}
/// Shared assertion for the name-validation tests.
///
/// POSTs a minimal formation using `name` and checks that the reply is a 400
/// with the RFC 9457 media type, the generic bad-request problem type, and a
/// `detail` string containing `expect_in_detail`.
async fn assert_name_rejected_bad_request(name: &str, expect_in_detail: &str) {
    let service = router(test_state());
    let payload = minimal_body(name);
    let request = auth_req("POST", "/v1/formations", Some(&payload));
    let response = service.oneshot(request).await.expect("router response");

    assert_eq!(
        response.status(),
        StatusCode::BAD_REQUEST,
        "name {name:?} must be rejected with 400; got {:?}",
        response.status()
    );

    // A missing or non-UTF-8 Content-Type header collapses to an empty string.
    let ct = response
        .headers()
        .get(header::CONTENT_TYPE)
        .and_then(|value| value.to_str().ok())
        .map(str::to_owned)
        .unwrap_or_default();
    assert!(
        ct.starts_with("application/problem+json"),
        "name {name:?} must surface RFC 9457 media type; got {ct:?}"
    );

    let raw = response.into_body().collect().await.unwrap().to_bytes();
    let parsed = serde_json::from_slice::<serde_json::Value>(&raw).unwrap();
    assert_eq!(
        parsed["type"], "/problems/bad-request",
        "name {name:?} must surface generic bad-request, not an admission discriminant; got {parsed}"
    );

    let detail = parsed["detail"].as_str().unwrap_or_default();
    let detail_matches = detail.contains(expect_in_detail);
    assert!(
        detail_matches,
        "detail for {name:?} must contain {expect_in_detail:?}; got {detail:?}"
    );
}
/// An empty name must be rejected with a detail mentioning "empty".
#[tokio::test]
async fn rejects_empty_name() {
    let (candidate, expected_fragment) = ("", "empty");
    assert_name_rejected_bad_request(candidate, expected_fragment).await;
}
/// A name consisting of a single space must be rejected as containing a
/// disallowed character.
#[tokio::test]
async fn rejects_whitespace_only_name() {
    let (candidate, expected_fragment) = (" ", "disallowed character");
    assert_name_rejected_bad_request(candidate, expected_fragment).await;
}
/// A name with an embedded newline must be rejected as containing a
/// disallowed character.
#[tokio::test]
async fn rejects_newline_in_name() {
    let (candidate, expected_fragment) = ("a\nb", "disallowed character");
    assert_name_rejected_bad_request(candidate, expected_fragment).await;
}
/// A name with an embedded tab must be rejected as containing a disallowed
/// character.
#[tokio::test]
async fn rejects_tab_in_name() {
    let (candidate, expected_fragment) = ("a\tb", "disallowed character");
    assert_name_rejected_bad_request(candidate, expected_fragment).await;
}
/// A name with an embedded NUL byte must be rejected as containing a
/// disallowed character.
#[tokio::test]
async fn rejects_nul_byte_in_name() {
    let (candidate, expected_fragment) = ("a\0b", "disallowed character");
    assert_name_rejected_bad_request(candidate, expected_fragment).await;
}
/// A name containing a non-ASCII letter must be rejected as containing a
/// disallowed character.
#[tokio::test]
async fn rejects_non_ascii_in_name() {
    let (candidate, expected_fragment) = ("café", "disallowed character");
    assert_name_rejected_bad_request(candidate, expected_fragment).await;
}
/// A 254-character name must be rejected with a detail mentioning
/// "exceeds maximum".
// NOTE(review): 254 is presumably one past the allowed length — confirm against
// the name validator.
#[tokio::test]
async fn rejects_overlong_name() {
    let long: String = std::iter::repeat('a').take(254).collect();
    assert_name_rejected_bad_request(long.as_str(), "exceeds maximum").await;
}
/// A name beginning with a hyphen must be rejected with a detail mentioning
/// "start with".
#[tokio::test]
async fn rejects_leading_hyphen() {
    let (candidate, expected_fragment) = ("-leading", "start with");
    assert_name_rejected_bad_request(candidate, expected_fragment).await;
}
/// A name ending with a dot must be rejected with a detail mentioning
/// "end with".
#[tokio::test]
async fn rejects_trailing_dot() {
    let (candidate, expected_fragment) = ("trailing.", "end with");
    assert_name_rejected_bad_request(candidate, expected_fragment).await;
}
/// The name "." must be rejected with a detail mentioning "reserved".
#[tokio::test]
async fn rejects_single_dot_reserved_name() {
    let (candidate, expected_fragment) = (".", "reserved");
    assert_name_rejected_bad_request(candidate, expected_fragment).await;
}
/// The name ".." must be rejected with 400 and the generic bad-request problem
/// type. Only the status and problem `type` are checked here; the detail text
/// is not inspected.
#[tokio::test]
async fn rejects_double_dot_reserved_name() {
    let service = router(test_state());
    let payload = minimal_body("..");
    let request = auth_req("POST", "/v1/formations", Some(&payload));
    let response = service.oneshot(request).await.expect("router response");
    assert_eq!(response.status(), StatusCode::BAD_REQUEST);

    let raw = response.into_body().collect().await.unwrap().to_bytes();
    let problem = serde_json::from_slice::<serde_json::Value>(&raw).unwrap();
    assert_eq!(problem["type"], "/problems/bad-request");
}
/// A plain lowercase name ("demo") must be accepted with 201 Created.
#[tokio::test]
async fn accepts_simple_lowercase_name() {
    let service = router(test_state());
    let payload = minimal_body("demo");
    let request = auth_req("POST", "/v1/formations", Some(&payload));
    let response = service.oneshot(request).await.expect("router response");
    let status = response.status();
    assert_eq!(status, StatusCode::CREATED);
}
/// A name with interior hyphens and a digit ("my-formation-v2") must be
/// accepted with 201 Created.
#[tokio::test]
async fn accepts_hyphenated_name() {
    let service = router(test_state());
    let payload = minimal_body("my-formation-v2");
    let request = auth_req("POST", "/v1/formations", Some(&payload));
    let response = service.oneshot(request).await.expect("router response");
    let status = response.status();
    assert_eq!(status, StatusCode::CREATED);
}
/// A name with an interior dot and underscore ("team.alpha_one") must be
/// accepted with 201 Created.
#[tokio::test]
async fn accepts_dotted_and_underscored_name() {
    let service = router(test_state());
    let payload = minimal_body("team.alpha_one");
    let request = auth_req("POST", "/v1/formations", Some(&payload));
    let response = service.oneshot(request).await.expect("router response");
    let status = response.status();
    assert_eq!(status, StatusCode::CREATED);
}
/// Posting the same formation name twice must give 201 then 409. The conflict
/// detail has to mention the first formation's UUID and the name, and the
/// first formation must stay reachable through the by-name lookup.
#[tokio::test]
async fn duplicate_name_returns_409() {
    let state = test_state();
    let body = minimal_body("demo");

    // First submission claims the name.
    let created = router(state.clone())
        .oneshot(auth_req("POST", "/v1/formations", Some(&body)))
        .await
        .expect("router response");
    assert_eq!(
        created.status(),
        StatusCode::CREATED,
        "first POST must succeed"
    );
    let created_bytes = created.into_body().collect().await.unwrap().to_bytes();
    let first = serde_json::from_slice::<serde_json::Value>(&created_bytes).unwrap();
    let first_id = first["id"]
        .as_str()
        .map(str::to_owned)
        .expect("first POST returned uuid");

    // Second submission with the identical body must collide.
    let resp = router(state.clone())
        .oneshot(auth_req("POST", "/v1/formations", Some(&body)))
        .await
        .expect("router response");
    assert_eq!(
        resp.status(),
        StatusCode::CONFLICT,
        "duplicate name must surface 409; got {:?}",
        resp.status()
    );

    // A missing or non-UTF-8 Content-Type header collapses to an empty string.
    let ct = resp
        .headers()
        .get(header::CONTENT_TYPE)
        .and_then(|value| value.to_str().ok())
        .map(str::to_owned)
        .unwrap_or_default();
    assert!(
        ct.starts_with("application/problem+json"),
        "409 must use RFC 9457 media type; got {ct:?}"
    );

    let conflict_bytes = resp.into_body().collect().await.unwrap().to_bytes();
    let parsed = serde_json::from_slice::<serde_json::Value>(&conflict_bytes).unwrap();
    assert_eq!(parsed["type"], "/problems/conflict");
    let detail = parsed["detail"].as_str().unwrap_or_default();
    assert!(
        detail.contains(first_id.as_str()),
        "conflict detail must name the existing UUID {first_id}; got {detail:?}"
    );
    assert!(
        detail.contains("demo"),
        "conflict detail must name the conflicting name; got {detail:?}"
    );

    // The original formation must still resolve by name.
    let lookup = router(state)
        .oneshot(auth_req("GET", "/v1/formations/by-name/demo", None))
        .await
        .expect("router response");
    assert_eq!(lookup.status(), StatusCode::OK);
    let lookup_bytes = lookup.into_body().collect().await.unwrap().to_bytes();
    let found = serde_json::from_slice::<serde_json::Value>(&lookup_bytes).unwrap();
    assert_eq!(
        found["id"].as_str(),
        Some(first_id.as_str()),
        "first formation must remain addressable by name"
    );
}
/// `metadata.labels` and `metadata.annotations` supplied in a kubectl-style
/// manifest must appear unchanged under `document.metadata` when the formation
/// is fetched back by name.
#[tokio::test]
async fn kubectl_metadata_labels_and_annotations_round_trip() {
    let state = test_state();
    let labels_in = serde_json::json!({
        "app": "celltest",
        "tier": "frontend"
    });
    let annotations_in = serde_json::json!({
        "operator": "ryan@cellos.dev",
        "audit/ticket": "OPS-1234"
    });
    let manifest = serde_json::json!({
        "apiVersion": "cellos.dev/v1",
        "kind": "Formation",
        "metadata": {
            "name": "labelled",
            "labels": labels_in,
            "annotations": annotations_in
        },
        "spec": {
            "coordinator": "coord",
            "members": [
                { "name": "coord" },
                { "name": "worker-a", "authorizedBy": "coord" }
            ]
        }
    })
    .to_string();

    let created = router(state.clone())
        .oneshot(auth_req("POST", "/v1/formations", Some(&manifest)))
        .await
        .expect("router response");
    assert_eq!(created.status(), StatusCode::CREATED);

    let fetched = router(state)
        .oneshot(auth_req("GET", "/v1/formations/by-name/labelled", None))
        .await
        .expect("router response");
    assert_eq!(fetched.status(), StatusCode::OK);
    let fetched_bytes = fetched.into_body().collect().await.unwrap().to_bytes();
    let parsed = serde_json::from_slice::<serde_json::Value>(&fetched_bytes).unwrap();
    let stored_metadata = &parsed["document"]["metadata"];

    let labels = &stored_metadata["labels"];
    assert_eq!(labels["app"], "celltest", "label 'app' lost in round-trip");
    assert_eq!(
        labels["tier"], "frontend",
        "label 'tier' lost in round-trip"
    );

    let annotations = &stored_metadata["annotations"];
    assert_eq!(
        annotations["operator"], "ryan@cellos.dev",
        "annotation 'operator' lost in round-trip"
    );
    assert_eq!(
        annotations["audit/ticket"], "OPS-1234",
        "annotation 'audit/ticket' lost in round-trip"
    );
}
/// A kubectl-style manifest carrying an extra top-level `status` object must be
/// refused with 400; the detail must mention the field and the phrase
/// "unknown top-level".
#[tokio::test]
async fn kubectl_unknown_top_level_field_returns_400() {
    let service = router(test_state());
    // `status` is the unexpected top-level key under test.
    let unexpected_status = serde_json::json!({
        "phase": "Running",
        "observedGeneration": 42
    });
    let manifest = serde_json::json!({
        "apiVersion": "cellos.dev/v1",
        "kind": "Formation",
        "metadata": { "name": "with-status" },
        "spec": {
            "coordinator": "coord",
            "members": [{ "name": "coord" }]
        },
        "status": unexpected_status
    })
    .to_string();
    let request = auth_req("POST", "/v1/formations", Some(&manifest));
    let resp = service.oneshot(request).await.expect("router response");
    assert_eq!(
        resp.status(),
        StatusCode::BAD_REQUEST,
        "unknown top-level 'status' must surface 400; got {:?}",
        resp.status(),
    );

    let raw = resp.into_body().collect().await.unwrap().to_bytes();
    let parsed = serde_json::from_slice::<serde_json::Value>(&raw).unwrap();
    assert_eq!(parsed["type"], "/problems/bad-request");

    let detail = parsed["detail"].as_str().unwrap_or_default();
    let names_field = detail.contains("status");
    assert!(
        names_field,
        "detail must name the offending field 'status'; got: {detail}",
    );
    let names_failure_class = detail.contains("unknown top-level");
    assert!(
        names_failure_class,
        "detail must say 'unknown top-level' so the operator knows the failure class; got: {detail}",
    );
}
/// A kubectl-style manifest whose `metadata.labels` is a string rather than an
/// object must be refused with 400, and the detail must mention
/// `metadata.labels`.
#[tokio::test]
async fn kubectl_non_object_labels_returns_400() {
    let service = router(test_state());
    // `labels` is given as a plain string instead of a key/value object.
    let metadata = serde_json::json!({
        "name": "bad-labels",
        "labels": "app=celltest"
    });
    let manifest = serde_json::json!({
        "apiVersion": "cellos.dev/v1",
        "kind": "Formation",
        "metadata": metadata,
        "spec": {
            "coordinator": "coord",
            "members": [{ "name": "coord" }]
        }
    })
    .to_string();
    let request = auth_req("POST", "/v1/formations", Some(&manifest));
    let response = service.oneshot(request).await.expect("router response");
    assert_eq!(response.status(), StatusCode::BAD_REQUEST);

    let raw = response.into_body().collect().await.unwrap().to_bytes();
    let parsed = serde_json::from_slice::<serde_json::Value>(&raw).unwrap();
    let detail = parsed["detail"].as_str().unwrap_or_default();
    let names_labels_path = detail.contains("metadata.labels");
    assert!(
        names_labels_path,
        "detail must name 'metadata.labels'; got: {detail}",
    );
}
/// Source-level regression guard (EVT-CONTENT-001-C) for
/// `update_formation_status`.
///
/// Reads the text of `formations.rs` via `include_str!`, isolates the text of
/// `update_formation_status`, and asserts that the `now_rfc3339` timestamp
/// capture appears textually before the `state.formations.write().await` lock
/// acquisition.
///
/// # Panics
///
/// Panics (fails the test) when the function, the timestamp capture, or the
/// write-lock acquisition cannot be found, or when the capture comes after the
/// lock.
#[test]
fn update_formation_status_captures_time_before_lock() {
    let src = include_str!("formations.rs");
    let fn_start = src
        .find("pub async fn update_formation_status(")
        .expect("update_formation_status not found");
    let after_fn = &src[fn_start..];
    // The function's text ends where the NEAREST following column-0 function
    // begins. Trying the markers one after another (first marker that matches
    // anywhere wins) can select a distant `fn` while a closer `pub async fn`
    // exists, letting text from later functions leak into `body`; taking the
    // minimum offset across all markers prevents that. The search starts at
    // index 1 so the signature just matched is not found again. When no later
    // function exists, the slice runs to the end of the file.
    let next_top_level_fn = ["\nfn ", "\nasync fn ", "\npub fn ", "\npub async fn "]
        .iter()
        .filter_map(|&marker| after_fn[1..].find(marker))
        .min()
        .map_or(after_fn.len(), |offset| offset + 1);
    let body = &after_fn[..next_top_level_fn];
    let time_capture_pos = body
        .find("let now_rfc3339 = chrono::Utc::now().to_rfc3339_opts(")
        .expect(
            "EVT-CONTENT-001-C regression: `let now_rfc3339 = chrono::Utc::now()` not found \
             in update_formation_status",
        );
    let lock_pos = body.find("state.formations.write().await").expect(
        "update_formation_status no longer acquires a write lock — test must be updated",
    );
    assert!(
        time_capture_pos < lock_pos,
        "EVT-CONTENT-001-C regression: `now_rfc3339` was captured at byte offset \
         {time_capture_pos} but the write lock was taken at offset {lock_pos}. \
         Move the `chrono::Utc::now().to_rfc3339_opts(...)` call ABOVE the \
         `state.formations.write().await` line so CloudEvent `time` reflects \
         transition arrival rather than post-commit wall-clock.",
    );
}
}