use std::collections::BTreeSet;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use crate::{
CellosError, CloudEventV1, ExportReceiptTargetKind, IdentityFailureOperation, PlacementSpec,
};
// CloudEvents `type` strings that `CellStateProjection::apply` dispatches on.
// Any other event type is skipped by `apply` (it returns `Ok(false)`).

// Cell lifecycle events.
const STARTED: &str = "dev.cellos.events.cell.lifecycle.v1.started";
const DESTROYED: &str = "dev.cellos.events.cell.lifecycle.v1.destroyed";
// Identity events; the `failed` payload carries an `operation` of "materialize" or "revoke".
const IDENTITY_MATERIALIZED: &str = "dev.cellos.events.cell.identity.v1.materialized";
const IDENTITY_FAILED: &str = "dev.cellos.events.cell.identity.v1.failed";
const IDENTITY_REVOKED: &str = "dev.cellos.events.cell.identity.v1.revoked";
// Command event; the payload carries the `exitCode`.
const COMMAND_COMPLETED: &str = "dev.cellos.events.cell.command.v1.completed";
// Export events. Note the `v2` segment, unlike the `v1` used by every other type above.
const EXPORT_COMPLETED: &str = "dev.cellos.events.cell.export.v2.completed";
const EXPORT_FAILED: &str = "dev.cellos.events.cell.export.v2.failed";
/// Lifecycle position of a cell, as tracked by `CellStateProjection`.
///
/// Variant names are serialized in camelCase.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum ProjectionLifecycleStage {
/// Initial stage: no `started` event has been applied yet.
Pending,
/// A `started` event has been applied.
Started,
/// A `command.completed` event has been applied.
CommandCompleted,
/// A `destroyed` event has been applied.
Destroyed,
}
/// Identity position of a cell, as tracked by `CellStateProjection`.
///
/// Variant names are serialized in camelCase.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum ProjectionIdentityStage {
/// Initial stage: no identity event has been applied yet.
Unknown,
/// An `identity.materialized` event has been applied.
Materialized,
/// An `identity.failed` event with operation `"materialize"` has been applied.
MaterializeFailed,
/// An `identity.revoked` event has been applied.
Revoked,
/// An `identity.failed` event with operation `"revoke"` has been applied.
RevokeFailed,
}
/// Outcome of the most recently applied export event.
///
/// Variant names are serialized in camelCase.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum ProjectionExportStage {
/// Initial stage: no export event has been applied yet.
None,
/// The last applied export event was `export.completed`.
Completed,
/// The last applied export event was `export.failed`.
Failed,
}
/// Single summary state derived from the individual stages by
/// `CellStateProjection::current_state`.
///
/// Precedence when deriving, highest first: destroyed, export outcome, command exit code,
/// identity stage, lifecycle stage. Variant names are serialized in camelCase.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum ProjectionCurrentState {
/// Nothing more specific applies (for example, no event has been applied yet).
Pending,
/// The cell has started and identity is neither materialized nor failed to materialize.
Started,
/// Identity is materialized and no command result or export outcome is recorded yet.
IdentityReady,
/// Identity materialization failed.
IdentityFailed,
/// The command completed with exit code 0 and no export outcome is recorded yet.
CommandSucceeded,
/// The command completed with a non-zero exit code and no export outcome is recorded yet.
CommandFailed,
/// The export stage is `Completed`.
ExportSucceeded,
/// The export stage is `Failed`.
ExportFailed,
/// The cell was destroyed and the `destroyed` event carried no `reason`.
Destroyed,
/// The cell was destroyed and the `destroyed` event carried a `reason`.
DestroyFailed,
}
/// One completed export, captured from an `export.completed` event payload.
///
/// Every field is optional because the projection reads each of them leniently from the
/// payload. Field names are serialized in camelCase.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ExportProjectionRecord {
/// Parsed from the payload's `targetKind` field.
pub target_kind: Option<ExportReceiptTargetKind>,
/// Taken from the payload's `targetName` field.
pub target_name: Option<String>,
/// Taken from the payload's `destination` field.
pub destination: Option<String>,
/// Taken from the payload's `bytesWritten` field.
pub bytes_written: Option<u64>,
}
/// Mutable view of a single cell's state, built by feeding events to `apply`.
///
/// Field names are serialized in camelCase.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CellStateProjection {
/// `specId` bound from the first event that carried one; later events must agree.
pub spec_id: Option<String>,
/// `cellId` bound from the first event that carried one; later events must agree.
pub cell_id: Option<String>,
/// `runId` bound from the first event that carried one; later events must agree.
pub run_id: Option<String>,
/// `placement` bound from the first event that carried one; later events must agree.
pub placement: Option<PlacementSpec>,
/// Where the cell is in its lifecycle.
pub lifecycle_stage: ProjectionLifecycleStage,
/// Where the cell's identity is.
pub identity_stage: ProjectionIdentityStage,
/// Outcome of the most recently applied export event.
pub export_stage: ProjectionExportStage,
/// `exitCode` from the `command.completed` event, once one has been applied.
pub command_exit_code: Option<i32>,
/// `reason` from the `destroyed` event, if it carried one.
pub destroy_reason: Option<String>,
/// Most recent reason recorded from an identity-failed, export-failed or destroyed event.
pub last_error: Option<String>,
/// One record per applied `export.completed` event, in application order.
pub exports: Vec<ExportProjectionRecord>,
/// Number of events for which `apply` returned `Ok(true)`.
pub processed_events: u64,
/// Event ids recorded by `apply` so that a repeated id is skipped.
/// Excluded from (de)serialization via `#[serde(skip)]`.
#[serde(skip)]
applied_event_ids: BTreeSet<String>,
}
/// Read-only copy of a `CellStateProjection`, produced by `CellStateProjection::snapshot`.
///
/// Carries the same public fields as the projection plus the derived `current_state`; the
/// projection's private event-id set is not included. Field names are serialized in camelCase.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CellStateSnapshot {
/// Copied from the projection's `spec_id`.
pub spec_id: Option<String>,
/// Copied from the projection's `cell_id`.
pub cell_id: Option<String>,
/// Copied from the projection's `run_id`.
pub run_id: Option<String>,
/// Copied from the projection's `placement`.
pub placement: Option<PlacementSpec>,
/// Copied from the projection's `lifecycle_stage`.
pub lifecycle_stage: ProjectionLifecycleStage,
/// Copied from the projection's `identity_stage`.
pub identity_stage: ProjectionIdentityStage,
/// Copied from the projection's `export_stage`.
pub export_stage: ProjectionExportStage,
/// Result of `CellStateProjection::current_state` at the time the snapshot was taken.
pub current_state: ProjectionCurrentState,
/// Copied from the projection's `command_exit_code`.
pub command_exit_code: Option<i32>,
/// Copied from the projection's `destroy_reason`.
pub destroy_reason: Option<String>,
/// Copied from the projection's `last_error`.
pub last_error: Option<String>,
/// Copied from the projection's `exports`.
pub exports: Vec<ExportProjectionRecord>,
/// Copied from the projection's `processed_events`.
pub processed_events: u64,
}
impl Default for CellStateProjection {
    /// Builds an empty projection: nothing bound, every stage at its initial variant, and no
    /// events seen.
    fn default() -> Self {
        Self {
            // Event bookkeeping.
            applied_event_ids: BTreeSet::default(),
            processed_events: 0,
            // Stages start at their initial variants.
            lifecycle_stage: ProjectionLifecycleStage::Pending,
            identity_stage: ProjectionIdentityStage::Unknown,
            export_stage: ProjectionExportStage::None,
            // Identity is bound lazily from the first events that carry it.
            spec_id: None,
            cell_id: None,
            run_id: None,
            placement: None,
            // Outcomes are unknown until the matching events arrive.
            command_exit_code: None,
            destroy_reason: None,
            last_error: None,
            exports: Vec::default(),
        }
    }
}
impl CellStateProjection {
    /// Folds one event into the projection.
    ///
    /// Returns `Ok(true)` when the event was applied and `Ok(false)` when it was skipped:
    /// its id was already applied, its payload carries neither `specId` nor `cellId`, or its
    /// type is not one this projection handles.
    ///
    /// The call is all-or-nothing. On `Ok(false)` or `Err` the projection is left exactly as
    /// it was and the event id is not recorded, so a rejected event (for example one that
    /// arrived out of order) can be re-delivered later instead of being dropped as a duplicate.
    ///
    /// # Errors
    ///
    /// Returns `CellosError::Lifecycle` when the event has no data payload, when its
    /// `specId`/`cellId`/`runId`/`placement` conflict with values already bound, when
    /// `placement` cannot be parsed, when a required payload field is missing or malformed,
    /// or when the event is not a legal transition from the current stage.
    pub fn apply(&mut self, event: &CloudEventV1) -> Result<bool, CellosError> {
        if self.applied_event_ids.contains(&event.id) {
            return Ok(false);
        }
        let Some(data) = event.data.as_ref() else {
            return Err(CellosError::Lifecycle(format!(
                "event {:?} is missing data payload",
                event.ty
            )));
        };

        // Identity is validated up front (so conflicts are reported first) but only
        // committed once the transition itself has been accepted.
        let Some(staged) = self.stage_identity(data)? else {
            return Ok(false);
        };

        // Every handler validates before it mutates, so an `Err` here leaves `self` untouched.
        match event.ty.as_str() {
            STARTED => self.apply_started()?,
            IDENTITY_MATERIALIZED => self.apply_identity_materialized()?,
            IDENTITY_FAILED => self.apply_identity_failed(data)?,
            IDENTITY_REVOKED => self.apply_identity_revoked()?,
            COMMAND_COMPLETED => self.apply_command_completed(data)?,
            EXPORT_COMPLETED => self.apply_export_completed(data)?,
            EXPORT_FAILED => self.apply_export_failed(data)?,
            DESTROYED => self.apply_destroyed(data)?,
            _ => return Ok(false),
        }

        self.commit_identity(staged);
        self.applied_event_ids.insert(event.id.clone());
        self.processed_events += 1;
        Ok(true)
    }

    /// Collapses the individual stages into one summary state.
    ///
    /// Precedence, highest first: destroyed, export outcome, command exit code, identity
    /// stage, lifecycle stage.
    pub fn current_state(&self) -> ProjectionCurrentState {
        if self.lifecycle_stage == ProjectionLifecycleStage::Destroyed {
            // A `destroyed` event that carried a `reason` is reported as a failed destroy.
            return if self.destroy_reason.is_some() {
                ProjectionCurrentState::DestroyFailed
            } else {
                ProjectionCurrentState::Destroyed
            };
        }
        match self.export_stage {
            ProjectionExportStage::Completed => return ProjectionCurrentState::ExportSucceeded,
            ProjectionExportStage::Failed => return ProjectionCurrentState::ExportFailed,
            ProjectionExportStage::None => {}
        }
        if let Some(exit_code) = self.command_exit_code {
            return if exit_code == 0 {
                ProjectionCurrentState::CommandSucceeded
            } else {
                ProjectionCurrentState::CommandFailed
            };
        }
        match self.identity_stage {
            ProjectionIdentityStage::Materialized => ProjectionCurrentState::IdentityReady,
            ProjectionIdentityStage::MaterializeFailed => ProjectionCurrentState::IdentityFailed,
            // These identity stages have no summary state of their own; fall back to lifecycle.
            ProjectionIdentityStage::Unknown
            | ProjectionIdentityStage::Revoked
            | ProjectionIdentityStage::RevokeFailed => {
                if self.lifecycle_stage == ProjectionLifecycleStage::Started {
                    ProjectionCurrentState::Started
                } else {
                    ProjectionCurrentState::Pending
                }
            }
        }
    }

    /// Returns a read-only copy of the projection including the derived current state.
    pub fn snapshot(&self) -> CellStateSnapshot {
        CellStateSnapshot::from(self)
    }

    /// `started`: only legal from `Pending`.
    fn apply_started(&mut self) -> Result<(), CellosError> {
        if self.lifecycle_stage != ProjectionLifecycleStage::Pending {
            return Err(illegal_transition("started", self));
        }
        self.lifecycle_stage = ProjectionLifecycleStage::Started;
        Ok(())
    }

    /// `identity.materialized`: requires a started, not-destroyed cell and is rejected after
    /// a materialize failure.
    fn apply_identity_materialized(&mut self) -> Result<(), CellosError> {
        require_started(self, IDENTITY_MATERIALIZED)?;
        if self.identity_stage == ProjectionIdentityStage::MaterializeFailed {
            return Err(illegal_transition(
                "identity.materialized after materialize failure",
                self,
            ));
        }
        self.identity_stage = ProjectionIdentityStage::Materialized;
        Ok(())
    }

    /// `identity.failed`: records the `reason` and moves the identity stage according to the
    /// payload's `operation` (`"materialize"` or `"revoke"`). A materialize failure is
    /// rejected once a command result has been recorded.
    fn apply_identity_failed(&mut self, data: &Value) -> Result<(), CellosError> {
        require_started(self, IDENTITY_FAILED)?;
        let operation = match required_str(data, "operation", IDENTITY_FAILED)? {
            "materialize" => IdentityFailureOperation::Materialize,
            "revoke" => IdentityFailureOperation::Revoke,
            other => {
                return Err(CellosError::Lifecycle(format!(
                    "unknown identity failure operation {other:?}"
                )))
            }
        };
        let reason = required_str(data, "reason", IDENTITY_FAILED)?;
        // Decide the target stage before touching `self`, so a rejected event cannot leave
        // a stray `last_error` behind.
        let next_stage = match operation {
            IdentityFailureOperation::Materialize => {
                if self.command_exit_code.is_some() {
                    return Err(illegal_transition(
                        "identity.materialize failure after command completion",
                        self,
                    ));
                }
                ProjectionIdentityStage::MaterializeFailed
            }
            IdentityFailureOperation::Revoke => ProjectionIdentityStage::RevokeFailed,
        };
        self.last_error = Some(reason.to_string());
        self.identity_stage = next_stage;
        Ok(())
    }

    /// `identity.revoked`: requires a started, not-destroyed cell.
    fn apply_identity_revoked(&mut self) -> Result<(), CellosError> {
        require_started(self, IDENTITY_REVOKED)?;
        self.identity_stage = ProjectionIdentityStage::Revoked;
        Ok(())
    }

    /// `command.completed`: records `exitCode`. Rejected when a result is already recorded
    /// or when identity materialization failed.
    fn apply_command_completed(&mut self, data: &Value) -> Result<(), CellosError> {
        require_started(self, COMMAND_COMPLETED)?;
        if self.command_exit_code.is_some() {
            return Err(illegal_transition("command.completed twice", self));
        }
        if self.identity_stage == ProjectionIdentityStage::MaterializeFailed {
            return Err(illegal_transition(
                "command.completed after identity materialize failure",
                self,
            ));
        }
        let exit_code = required_i32(data, "exitCode", COMMAND_COMPLETED)?;
        self.command_exit_code = Some(exit_code);
        self.lifecycle_stage = ProjectionLifecycleStage::CommandCompleted;
        Ok(())
    }

    /// `export.completed`: appends an export record. Only legal while the lifecycle stage is
    /// `CommandCompleted`.
    fn apply_export_completed(&mut self, data: &Value) -> Result<(), CellosError> {
        require_command_completed(self, EXPORT_COMPLETED)?;
        // Parse the whole record first: `targetKind` can fail, and the stage must not flip
        // to `Completed` for an event that ends up rejected.
        let record = ExportProjectionRecord {
            target_kind: optional_enum(data, "targetKind")?,
            target_name: optional_string(data, "targetName"),
            destination: optional_string(data, "destination"),
            bytes_written: optional_u64(data, "bytesWritten"),
        };
        self.export_stage = ProjectionExportStage::Completed;
        self.exports.push(record);
        Ok(())
    }

    /// `export.failed`: records the `reason`. Only legal while the lifecycle stage is
    /// `CommandCompleted`.
    fn apply_export_failed(&mut self, data: &Value) -> Result<(), CellosError> {
        require_command_completed(self, EXPORT_FAILED)?;
        let reason = required_str(data, "reason", EXPORT_FAILED)?.to_string();
        self.export_stage = ProjectionExportStage::Failed;
        self.last_error = Some(reason);
        Ok(())
    }

    /// `destroyed`: legal from any stage except `Pending`. An optional `reason` becomes both
    /// `destroy_reason` and `last_error`.
    ///
    /// NOTE(review): a second `destroyed` event (with a different id) is accepted and
    /// overwrites `destroy_reason` — confirm whether that is intended.
    fn apply_destroyed(&mut self, data: &Value) -> Result<(), CellosError> {
        if self.lifecycle_stage == ProjectionLifecycleStage::Pending {
            return Err(illegal_transition("destroyed before started", self));
        }
        let reason = optional_string(data, "reason");
        if let Some(reason) = &reason {
            self.last_error = Some(reason.clone());
        }
        self.lifecycle_stage = ProjectionLifecycleStage::Destroyed;
        self.destroy_reason = reason;
        Ok(())
    }

    /// Checks the payload's identity fields against what is already bound, without
    /// modifying `self`.
    ///
    /// Returns `Ok(None)` when the payload carries neither `specId` nor `cellId`. Otherwise
    /// returns the identity as it would look after binding, or an error on the first
    /// conflict (checked in the order `specId`, `cellId`, `runId`, `placement`).
    fn stage_identity(&self, data: &Value) -> Result<Option<StagedIdentity>, CellosError> {
        let spec_id = optional_string(data, "specId");
        let cell_id = optional_string(data, "cellId");
        if spec_id.is_none() && cell_id.is_none() {
            return Ok(None);
        }
        let mut staged = StagedIdentity {
            spec_id: self.spec_id.clone(),
            cell_id: self.cell_id.clone(),
            run_id: self.run_id.clone(),
            placement: self.placement.clone(),
        };
        bind_optional("specId", &mut staged.spec_id, spec_id.as_deref())?;
        bind_optional("cellId", &mut staged.cell_id, cell_id.as_deref())?;
        bind_optional(
            "runId",
            &mut staged.run_id,
            optional_string(data, "runId").as_deref(),
        )?;
        bind_optional_placement(&mut staged.placement, optional_placement(data)?)?;
        Ok(Some(staged))
    }

    /// Installs an identity previously produced by `stage_identity`.
    fn commit_identity(&mut self, staged: StagedIdentity) {
        self.spec_id = staged.spec_id;
        self.cell_id = staged.cell_id;
        self.run_id = staged.run_id;
        self.placement = staged.placement;
    }
}

/// Identity fields validated against the projection but not yet written back to it.
struct StagedIdentity {
    spec_id: Option<String>,
    cell_id: Option<String>,
    run_id: Option<String>,
    placement: Option<PlacementSpec>,
}
impl From<&CellStateProjection> for CellStateSnapshot {
fn from(value: &CellStateProjection) -> Self {
Self {
spec_id: value.spec_id.clone(),
cell_id: value.cell_id.clone(),
run_id: value.run_id.clone(),
placement: value.placement.clone(),
lifecycle_stage: value.lifecycle_stage,
identity_stage: value.identity_stage,
export_stage: value.export_stage,
current_state: value.current_state(),
command_exit_code: value.command_exit_code,
destroy_reason: value.destroy_reason.clone(),
last_error: value.last_error.clone(),
exports: value.exports.clone(),
processed_events: value.processed_events,
}
}
}
/// Binds `observed` into `slot` the first time a value is seen and insists that every later
/// observation matches it.
///
/// A missing observation (`None`) is always accepted and leaves `slot` untouched.
///
/// # Errors
///
/// Returns `CellosError::Lifecycle` naming `field` when `slot` already holds a different
/// value; `slot` is not modified in that case.
fn bind_optional(
    field: &str,
    slot: &mut Option<String>,
    observed: Option<&str>,
) -> Result<(), CellosError> {
    let Some(observed) = observed else {
        return Ok(());
    };
    if let Some(existing) = slot.as_deref() {
        if existing != observed {
            return Err(CellosError::Lifecycle(format!(
                "projection saw mismatched {field}: existing={existing:?}, observed={observed:?}"
            )));
        }
        return Ok(());
    }
    *slot = Some(observed.to_owned());
    Ok(())
}
/// Placement counterpart of `bind_optional`: stores the first observed placement and
/// requires later observations to be equal to it.
///
/// A missing observation (`None`) is always accepted and leaves `slot` untouched.
///
/// # Errors
///
/// Returns `CellosError::Lifecycle` when `slot` already holds a different placement; `slot`
/// is not modified in that case.
fn bind_optional_placement(
    slot: &mut Option<PlacementSpec>,
    observed: Option<PlacementSpec>,
) -> Result<(), CellosError> {
    let Some(observed) = observed else {
        return Ok(());
    };
    if let Some(existing) = slot.as_ref() {
        if *existing != observed {
            return Err(CellosError::Lifecycle(format!(
                "projection saw mismatched placement: existing={existing:?}, observed={observed:?}"
            )));
        }
        return Ok(());
    }
    *slot = Some(observed);
    Ok(())
}
/// Guard for events that need a running cell: passes in the `Started` and `CommandCompleted`
/// stages, fails in `Pending` and `Destroyed`.
///
/// # Errors
///
/// Returns `CellosError::Lifecycle` naming `event_type` and the offending stage.
fn require_started(projection: &CellStateProjection, event_type: &str) -> Result<(), CellosError> {
    match projection.lifecycle_stage {
        ProjectionLifecycleStage::Started | ProjectionLifecycleStage::CommandCompleted => Ok(()),
        stage @ (ProjectionLifecycleStage::Pending | ProjectionLifecycleStage::Destroyed) => {
            Err(CellosError::Lifecycle(format!(
                "illegal transition for {event_type}: lifecycle stage is {:?}",
                stage
            )))
        }
    }
}
/// Guard for events that are only legal while the lifecycle stage is exactly
/// `CommandCompleted`.
///
/// # Errors
///
/// Returns `CellosError::Lifecycle` naming `event_type` in any other stage.
fn require_command_completed(
    projection: &CellStateProjection,
    event_type: &str,
) -> Result<(), CellosError> {
    if matches!(
        projection.lifecycle_stage,
        ProjectionLifecycleStage::CommandCompleted
    ) {
        return Ok(());
    }
    Err(CellosError::Lifecycle(format!(
        "illegal transition for {event_type}: command has not completed"
    )))
}
fn illegal_transition(label: &str, projection: &CellStateProjection) -> CellosError {
CellosError::Lifecycle(format!(
"illegal projection transition: {label}; lifecycle={:?}, identity={:?}, export={:?}, exit_code={:?}",
projection.lifecycle_stage,
projection.identity_stage,
projection.export_stage,
projection.command_exit_code,
))
}
/// Looks up `field` in the event payload.
///
/// # Errors
///
/// Returns `CellosError::Lifecycle` naming `event_type` and `field` when the field is absent.
fn required_field<'a>(
    data: &'a Value,
    field: &str,
    event_type: &str,
) -> Result<&'a Value, CellosError> {
    match data.get(field) {
        Some(value) => Ok(value),
        None => Err(CellosError::Lifecycle(format!(
            "event {event_type:?} is missing required field {field:?}"
        ))),
    }
}
/// Reads `field` from the event payload as a string slice borrowed from `data`.
///
/// # Errors
///
/// Returns `CellosError::Lifecycle` when the field is absent or is not a JSON string.
fn required_str<'a>(
    data: &'a Value,
    field: &str,
    event_type: &str,
) -> Result<&'a str, CellosError> {
    let raw = required_field(data, field, event_type)?;
    let Some(text) = raw.as_str() else {
        return Err(CellosError::Lifecycle(format!(
            "event {event_type:?} field {field:?} must be a string"
        )));
    };
    Ok(text)
}
/// Reads `field` from the event payload as an `i32`.
///
/// # Errors
///
/// Returns `CellosError::Lifecycle` when the field is absent, is not representable as an
/// `i64` integer, or does not fit in an `i32`.
fn required_i32(data: &Value, field: &str, event_type: &str) -> Result<i32, CellosError> {
    let Some(wide) = required_field(data, field, event_type)?.as_i64() else {
        return Err(CellosError::Lifecycle(format!(
            "event {event_type:?} field {field:?} must be an integer"
        )));
    };
    // Narrow with a checked conversion rather than `as`, which would silently wrap.
    match i32::try_from(wide) {
        Ok(narrow) => Ok(narrow),
        Err(_) => Err(CellosError::Lifecycle(format!(
            "event {event_type:?} field {field:?} is out of range for i32"
        ))),
    }
}
fn optional_string(data: &Value, field: &str) -> Option<String> {
data.get(field).and_then(Value::as_str).map(str::to_string)
}
fn optional_u64(data: &Value, field: &str) -> Option<u64> {
data.get(field).and_then(Value::as_u64)
}
fn optional_placement(data: &Value) -> Result<Option<PlacementSpec>, CellosError> {
let Some(value) = data.get("placement") else {
return Ok(None);
};
serde_json::from_value(value.clone())
.map(Some)
.map_err(|e| CellosError::Lifecycle(format!("parse field \"placement\": {e}")))
}
fn optional_enum<T>(data: &Value, field: &str) -> Result<Option<T>, CellosError>
where
T: serde::de::DeserializeOwned,
{
let Some(value) = data.get(field) else {
return Ok(None);
};
serde_json::from_value(value.clone())
.map(Some)
.map_err(|e| CellosError::Lifecycle(format!("parse field {field:?}: {e}")))
}
#[cfg(test)]
mod tests {
    use serde_json::json;

    use super::*;

    /// Builds a CloudEvent with the given id, type and JSON payload.
    fn event(id: &str, ty: &str, data: Value) -> CloudEventV1 {
        CloudEventV1 {
            specversion: "1.0".into(),
            id: id.into(),
            source: "urn:test".into(),
            ty: ty.into(),
            datacontenttype: Some("application/json".into()),
            data: Some(data),
            time: None,
            traceparent: None,
        }
    }

    /// Returns a projection that has already applied a `started` event (id "1") for
    /// cell-1 / spec-1.
    fn started_projection() -> CellStateProjection {
        let mut projection = CellStateProjection::default();
        let started = event(
            "1",
            STARTED,
            json!({
                "cellId": "cell-1",
                "specId": "spec-1"
            }),
        );
        projection.apply(&started).unwrap();
        projection
    }

    #[test]
    fn projects_happy_path_to_destroyed() {
        let timeline = [
            (
                "1",
                STARTED,
                json!({
                    "cellId": "cell-1",
                    "specId": "spec-1",
                    "runId": "run-1"
                }),
            ),
            (
                "2",
                IDENTITY_MATERIALIZED,
                json!({
                    "cellId": "cell-1",
                    "specId": "spec-1",
                    "runId": "run-1"
                }),
            ),
            (
                "3",
                COMMAND_COMPLETED,
                json!({
                    "cellId": "cell-1",
                    "specId": "spec-1",
                    "runId": "run-1",
                    "exitCode": 0
                }),
            ),
            (
                "4",
                EXPORT_COMPLETED,
                json!({
                    "cellId": "cell-1",
                    "specId": "spec-1",
                    "runId": "run-1",
                    "targetKind": "http",
                    "targetName": "artifact-bucket",
                    "destination": "https://example.test/a.txt",
                    "bytesWritten": 42
                }),
            ),
            (
                "5",
                DESTROYED,
                json!({
                    "cellId": "cell-1",
                    "specId": "spec-1",
                    "runId": "run-1",
                    "outcome": "succeeded"
                }),
            ),
        ];

        let mut projection = CellStateProjection::default();
        for (id, ty, payload) in timeline {
            // Every step of the happy path must be accepted as a new event.
            assert!(projection.apply(&event(id, ty, payload)).unwrap());
        }

        assert_eq!(
            projection.current_state(),
            ProjectionCurrentState::Destroyed
        );
        assert_eq!(projection.processed_events, 5);
        assert_eq!(projection.exports.len(), 1);
        assert_eq!(projection.exports[0].bytes_written, Some(42));
    }

    #[test]
    fn duplicate_event_id_is_ignored() {
        let started = event(
            "1",
            STARTED,
            json!({
                "cellId": "cell-1",
                "specId": "spec-1"
            }),
        );
        let mut projection = CellStateProjection::default();

        let first = projection.apply(&started).unwrap();
        let second = projection.apply(&started).unwrap();

        assert!(first);
        assert!(!second);
        assert_eq!(projection.processed_events, 1);
        assert_eq!(projection.current_state(), ProjectionCurrentState::Started);
    }

    #[test]
    fn export_before_command_is_rejected() {
        let mut projection = started_projection();
        let premature_export = event(
            "2",
            EXPORT_COMPLETED,
            json!({
                "cellId": "cell-1",
                "specId": "spec-1",
                "targetKind": "s3"
            }),
        );

        let err = projection.apply(&premature_export).unwrap_err();

        assert!(err.to_string().contains("command has not completed"));
    }

    #[test]
    fn identity_materialize_failure_becomes_identity_failed_state() {
        let mut projection = started_projection();
        let failure = event(
            "2",
            IDENTITY_FAILED,
            json!({
                "cellId": "cell-1",
                "specId": "spec-1",
                "operation": "materialize",
                "reason": "missing oidc token"
            }),
        );

        projection.apply(&failure).unwrap();

        assert_eq!(
            projection.current_state(),
            ProjectionCurrentState::IdentityFailed
        );
        assert_eq!(projection.last_error.as_deref(), Some("missing oidc token"));
    }

    #[test]
    fn snapshot_includes_current_state() {
        let snapshot = started_projection().snapshot();

        assert_eq!(snapshot.current_state, ProjectionCurrentState::Started);
        assert_eq!(snapshot.cell_id.as_deref(), Some("cell-1"));
    }

    #[test]
    fn snapshot_carries_placement_from_started_event() {
        let started_with_placement = event(
            "1",
            STARTED,
            json!({
                "cellId": "cell-1",
                "specId": "spec-1",
                "placement": {
                    "poolId": "runner-pool-amd64",
                    "queueName": "ci-high"
                }
            }),
        );
        let mut projection = CellStateProjection::default();
        projection.apply(&started_with_placement).unwrap();

        let snapshot = projection.snapshot();
        let placement = snapshot.placement.as_ref();

        assert_eq!(
            placement.and_then(|p| p.pool_id.as_deref()),
            Some("runner-pool-amd64")
        );
        assert_eq!(
            placement.and_then(|p| p.queue_name.as_deref()),
            Some("ci-high")
        );
    }
}