use cellos_core::events::{
cloud_event_v1_formation_completed, cloud_event_v1_formation_created,
cloud_event_v1_formation_degraded, cloud_event_v1_formation_failed,
cloud_event_v1_formation_launching, cloud_event_v1_formation_running,
};
use cellos_server::{AppState, FormationStatus};
use uuid::Uuid;
fn ce_bytes(ev: cellos_core::CloudEventV1) -> Vec<u8> {
serde_json::to_vec(&ev).expect("CloudEventV1 serialises")
}
type PhaseCase = (
&'static str,
&'static dyn Fn(&str, &str) -> cellos_core::CloudEventV1,
FormationStatus,
);
#[tokio::test]
async fn canonical_formation_emitters_drive_projection_to_expected_status() {
let cases: &[PhaseCase] = &[
(
"created",
&|id, name| {
cloud_event_v1_formation_created(id, "1970-01-01T00:00:00Z", id, name, 0, &[], None)
},
FormationStatus::Pending,
),
(
"launching",
&|id, name| {
cloud_event_v1_formation_launching(
id,
"1970-01-01T00:00:00Z",
id,
name,
0,
&[],
None,
)
},
FormationStatus::Running,
),
(
"running",
&|id, name| {
cloud_event_v1_formation_running(id, "1970-01-01T00:00:00Z", id, name, 0, &[], None)
},
FormationStatus::Running,
),
(
"degraded",
&|id, name| {
cloud_event_v1_formation_degraded(
id,
"1970-01-01T00:00:00Z",
id,
name,
1,
&["cell-a".to_string()],
Some("critical cell down"),
)
},
FormationStatus::Running,
),
(
"completed",
&|id, name| {
cloud_event_v1_formation_completed(
id,
"1970-01-01T00:00:00Z",
id,
name,
0,
&[],
None,
)
},
FormationStatus::Succeeded,
),
(
"failed",
&|id, name| {
cloud_event_v1_formation_failed(
id,
"1970-01-01T00:00:00Z",
id,
name,
1,
&["cell-a".to_string()],
Some("admission gate"),
)
},
FormationStatus::Failed,
),
];
let state = AppState::new(None, "test");
for (phase, build, expected) in cases {
let id = Uuid::new_v4();
let id_str = id.to_string();
let name = format!("wave2-{phase}-formation");
let ev = (build)(id_str.as_str(), name.as_str());
let payload = ce_bytes(ev);
let outcome = state
.apply_event_payload(&payload)
.await
.unwrap_or_else(|e| panic!("apply_event_payload failed for phase {phase}: {e}"));
assert!(
matches!(outcome, cellos_server::state::ApplyOutcome::Applied),
"phase {phase}: expected ApplyOutcome::Applied, got {outcome:?}"
);
let map = state.formations.read().await;
let rec = map.get(&id).unwrap_or_else(|| {
panic!("phase {phase}: formation {id_str} not present in projection")
});
assert_eq!(
rec.status, *expected,
"phase {phase}: status drift — got {:?}, want {:?}",
rec.status, expected
);
assert_eq!(
rec.name, name,
"phase {phase}: name drift — got {:?}, want {:?}",
rec.name, name
);
drop(map);
}
}
#[tokio::test]
async fn legacy_io_cellos_formation_prefix_still_replays() {
let state = AppState::new(None, "test");
let id = Uuid::new_v4();
let payload = serde_json::to_vec(&serde_json::json!({
"specversion": "1.0",
"id": "legacy-evt-1",
"source": format!("/formations/{id}"),
"type": "io.cellos.formation.v1.created",
"data": {
"formation_id": id.to_string(),
"name": "legacy-shape",
},
}))
.unwrap();
let outcome = state
.apply_event_payload(&payload)
.await
.expect("legacy shape must apply");
assert!(matches!(
outcome,
cellos_server::state::ApplyOutcome::Applied
));
let map = state.formations.read().await;
let rec = map.get(&id).expect("legacy formation present");
assert_eq!(rec.status, FormationStatus::Pending);
assert_eq!(rec.name, "legacy-shape");
}
#[test]
fn formation_record_serialises_status_field_in_uppercase() {
let rec = cellos_server::FormationRecord {
id: Uuid::nil(),
name: "ctr".into(),
status: FormationStatus::Running,
document: serde_json::Value::Null,
};
let body = serde_json::to_value(&rec).expect("serialise");
assert!(
body.get("status").is_some(),
"FormationRecord wire shape must keep `status` (cellos-events depends on it): {body}"
);
assert_eq!(
body.get("status").and_then(|v| v.as_str()),
Some("RUNNING"),
"FormationStatus must serialise UPPERCASE: {body}"
);
}