Skip to main content

cellos_core/
state_projection.rs

1//! Event-driven cell state projection over versioned CloudEvents.
2//!
3//! This module keeps the state-machine logic in `cellos-core` so future fleet
4//! and control-plane code can share one canonical reducer instead of inferring
5//! current state independently.
6
7use std::collections::BTreeSet;
8
9use serde::{Deserialize, Serialize};
10use serde_json::Value;
11
12use crate::{
13    CellosError, CloudEventV1, ExportReceiptTargetKind, IdentityFailureOperation, PlacementSpec,
14};
15
// CloudEvent `type` strings the projection reducer recognises. Events whose type is not
// listed here are not applied by `CellStateProjection::apply`.

// Cell lifecycle.
const STARTED: &str = "dev.cellos.events.cell.lifecycle.v1.started";
const DESTROYED: &str = "dev.cellos.events.cell.lifecycle.v1.destroyed";

// Workload identity.
const IDENTITY_MATERIALIZED: &str = "dev.cellos.events.cell.identity.v1.materialized";
const IDENTITY_FAILED: &str = "dev.cellos.events.cell.identity.v1.failed";
const IDENTITY_REVOKED: &str = "dev.cellos.events.cell.identity.v1.revoked";

// Command execution.
const COMMAND_COMPLETED: &str = "dev.cellos.events.cell.command.v1.completed";

// Artifact export (v2 schema).
const EXPORT_COMPLETED: &str = "dev.cellos.events.cell.export.v2.completed";
const EXPORT_FAILED: &str = "dev.cellos.events.cell.export.v2.failed";
24
25#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
26#[serde(rename_all = "camelCase")]
27pub enum ProjectionLifecycleStage {
28    Pending,
29    Started,
30    CommandCompleted,
31    Destroyed,
32}
33
34#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
35#[serde(rename_all = "camelCase")]
36pub enum ProjectionIdentityStage {
37    Unknown,
38    Materialized,
39    MaterializeFailed,
40    Revoked,
41    RevokeFailed,
42}
43
44#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
45#[serde(rename_all = "camelCase")]
46pub enum ProjectionExportStage {
47    None,
48    Completed,
49    Failed,
50}
51
52#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
53#[serde(rename_all = "camelCase")]
54pub enum ProjectionCurrentState {
55    Pending,
56    Started,
57    IdentityReady,
58    IdentityFailed,
59    CommandSucceeded,
60    CommandFailed,
61    ExportSucceeded,
62    ExportFailed,
63    Destroyed,
64    DestroyFailed,
65}
66
67#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
68#[serde(rename_all = "camelCase")]
69pub struct ExportProjectionRecord {
70    pub target_kind: Option<ExportReceiptTargetKind>,
71    pub target_name: Option<String>,
72    pub destination: Option<String>,
73    pub bytes_written: Option<u64>,
74}
75
76#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
77#[serde(rename_all = "camelCase")]
78pub struct CellStateProjection {
79    pub spec_id: Option<String>,
80    pub cell_id: Option<String>,
81    pub run_id: Option<String>,
82    pub placement: Option<PlacementSpec>,
83    pub lifecycle_stage: ProjectionLifecycleStage,
84    pub identity_stage: ProjectionIdentityStage,
85    pub export_stage: ProjectionExportStage,
86    pub command_exit_code: Option<i32>,
87    pub destroy_reason: Option<String>,
88    pub last_error: Option<String>,
89    pub exports: Vec<ExportProjectionRecord>,
90    pub processed_events: u64,
91    #[serde(skip)]
92    applied_event_ids: BTreeSet<String>,
93}
94
95#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
96#[serde(rename_all = "camelCase")]
97pub struct CellStateSnapshot {
98    pub spec_id: Option<String>,
99    pub cell_id: Option<String>,
100    pub run_id: Option<String>,
101    pub placement: Option<PlacementSpec>,
102    pub lifecycle_stage: ProjectionLifecycleStage,
103    pub identity_stage: ProjectionIdentityStage,
104    pub export_stage: ProjectionExportStage,
105    pub current_state: ProjectionCurrentState,
106    pub command_exit_code: Option<i32>,
107    pub destroy_reason: Option<String>,
108    pub last_error: Option<String>,
109    pub exports: Vec<ExportProjectionRecord>,
110    pub processed_events: u64,
111}
112
113impl Default for CellStateProjection {
114    fn default() -> Self {
115        Self {
116            spec_id: None,
117            cell_id: None,
118            run_id: None,
119            placement: None,
120            lifecycle_stage: ProjectionLifecycleStage::Pending,
121            identity_stage: ProjectionIdentityStage::Unknown,
122            export_stage: ProjectionExportStage::None,
123            command_exit_code: None,
124            destroy_reason: None,
125            last_error: None,
126            exports: Vec::new(),
127            processed_events: 0,
128            applied_event_ids: BTreeSet::new(),
129        }
130    }
131}
132
133impl CellStateProjection {
134    pub fn apply(&mut self, event: &CloudEventV1) -> Result<bool, CellosError> {
135        if !self.applied_event_ids.insert(event.id.clone()) {
136            return Ok(false);
137        }
138
139        let Some(data) = event.data.as_ref() else {
140            self.applied_event_ids.remove(&event.id);
141            return Err(CellosError::Lifecycle(format!(
142                "event {:?} is missing data payload",
143                event.ty
144            )));
145        };
146
147        let is_cell_event = self.try_bind_identity(data)?;
148        if !is_cell_event {
149            self.applied_event_ids.remove(&event.id);
150            return Ok(false);
151        }
152
153        match event.ty.as_str() {
154            STARTED => self.apply_started(),
155            IDENTITY_MATERIALIZED => self.apply_identity_materialized(),
156            IDENTITY_FAILED => self.apply_identity_failed(data),
157            IDENTITY_REVOKED => self.apply_identity_revoked(),
158            COMMAND_COMPLETED => self.apply_command_completed(data),
159            EXPORT_COMPLETED => self.apply_export_completed(data),
160            EXPORT_FAILED => self.apply_export_failed(data),
161            DESTROYED => self.apply_destroyed(data),
162            _ => {
163                self.applied_event_ids.remove(&event.id);
164                return Ok(false);
165            }
166        }?;
167
168        self.processed_events += 1;
169        Ok(true)
170    }
171
172    pub fn current_state(&self) -> ProjectionCurrentState {
173        if self.lifecycle_stage == ProjectionLifecycleStage::Destroyed {
174            if self.destroy_reason.is_some() {
175                return ProjectionCurrentState::DestroyFailed;
176            }
177            return ProjectionCurrentState::Destroyed;
178        }
179
180        match self.export_stage {
181            ProjectionExportStage::Completed => return ProjectionCurrentState::ExportSucceeded,
182            ProjectionExportStage::Failed => return ProjectionCurrentState::ExportFailed,
183            ProjectionExportStage::None => {}
184        }
185
186        if let Some(exit_code) = self.command_exit_code {
187            return if exit_code == 0 {
188                ProjectionCurrentState::CommandSucceeded
189            } else {
190                ProjectionCurrentState::CommandFailed
191            };
192        }
193
194        match self.identity_stage {
195            ProjectionIdentityStage::Materialized => ProjectionCurrentState::IdentityReady,
196            ProjectionIdentityStage::MaterializeFailed => ProjectionCurrentState::IdentityFailed,
197            _ if self.lifecycle_stage == ProjectionLifecycleStage::Started => {
198                ProjectionCurrentState::Started
199            }
200            _ => ProjectionCurrentState::Pending,
201        }
202    }
203
204    pub fn snapshot(&self) -> CellStateSnapshot {
205        CellStateSnapshot::from(self)
206    }
207
208    fn apply_started(&mut self) -> Result<(), CellosError> {
209        if self.lifecycle_stage != ProjectionLifecycleStage::Pending {
210            return Err(illegal_transition("started", self));
211        }
212        self.lifecycle_stage = ProjectionLifecycleStage::Started;
213        Ok(())
214    }
215
216    fn apply_identity_materialized(&mut self) -> Result<(), CellosError> {
217        require_started(self, IDENTITY_MATERIALIZED)?;
218        if self.identity_stage == ProjectionIdentityStage::MaterializeFailed {
219            return Err(illegal_transition(
220                "identity.materialized after materialize failure",
221                self,
222            ));
223        }
224        self.identity_stage = ProjectionIdentityStage::Materialized;
225        Ok(())
226    }
227
228    fn apply_identity_failed(&mut self, data: &Value) -> Result<(), CellosError> {
229        require_started(self, IDENTITY_FAILED)?;
230        let operation = match required_str(data, "operation", IDENTITY_FAILED)? {
231            "materialize" => IdentityFailureOperation::Materialize,
232            "revoke" => IdentityFailureOperation::Revoke,
233            other => {
234                return Err(CellosError::Lifecycle(format!(
235                    "unknown identity failure operation {other:?}"
236                )))
237            }
238        };
239        let reason = required_str(data, "reason", IDENTITY_FAILED)?;
240        self.last_error = Some(reason.to_string());
241        match operation {
242            IdentityFailureOperation::Materialize => {
243                if self.command_exit_code.is_some() {
244                    return Err(illegal_transition(
245                        "identity.materialize failure after command completion",
246                        self,
247                    ));
248                }
249                self.identity_stage = ProjectionIdentityStage::MaterializeFailed;
250            }
251            IdentityFailureOperation::Revoke => {
252                self.identity_stage = ProjectionIdentityStage::RevokeFailed;
253            }
254        }
255        Ok(())
256    }
257
258    fn apply_identity_revoked(&mut self) -> Result<(), CellosError> {
259        require_started(self, IDENTITY_REVOKED)?;
260        self.identity_stage = ProjectionIdentityStage::Revoked;
261        Ok(())
262    }
263
264    fn apply_command_completed(&mut self, data: &Value) -> Result<(), CellosError> {
265        require_started(self, COMMAND_COMPLETED)?;
266        if self.command_exit_code.is_some() {
267            return Err(illegal_transition("command.completed twice", self));
268        }
269        if self.identity_stage == ProjectionIdentityStage::MaterializeFailed {
270            return Err(illegal_transition(
271                "command.completed after identity materialize failure",
272                self,
273            ));
274        }
275        self.command_exit_code = Some(required_i32(data, "exitCode", COMMAND_COMPLETED)?);
276        self.lifecycle_stage = ProjectionLifecycleStage::CommandCompleted;
277        Ok(())
278    }
279
280    fn apply_export_completed(&mut self, data: &Value) -> Result<(), CellosError> {
281        require_command_completed(self, EXPORT_COMPLETED)?;
282        self.export_stage = ProjectionExportStage::Completed;
283        self.exports.push(ExportProjectionRecord {
284            target_kind: optional_enum(data, "targetKind")?,
285            target_name: optional_string(data, "targetName"),
286            destination: optional_string(data, "destination"),
287            bytes_written: optional_u64(data, "bytesWritten"),
288        });
289        Ok(())
290    }
291
292    fn apply_export_failed(&mut self, data: &Value) -> Result<(), CellosError> {
293        require_command_completed(self, EXPORT_FAILED)?;
294        self.export_stage = ProjectionExportStage::Failed;
295        self.last_error = Some(required_str(data, "reason", EXPORT_FAILED)?.to_string());
296        Ok(())
297    }
298
299    fn apply_destroyed(&mut self, data: &Value) -> Result<(), CellosError> {
300        if self.lifecycle_stage == ProjectionLifecycleStage::Pending {
301            return Err(illegal_transition("destroyed before started", self));
302        }
303        self.lifecycle_stage = ProjectionLifecycleStage::Destroyed;
304        self.destroy_reason = optional_string(data, "reason");
305        if let Some(reason) = self.destroy_reason.clone() {
306            self.last_error = Some(reason);
307        }
308        Ok(())
309    }
310
311    fn try_bind_identity(&mut self, data: &Value) -> Result<bool, CellosError> {
312        let spec_id = optional_string(data, "specId");
313        let cell_id = optional_string(data, "cellId");
314        if spec_id.is_none() && cell_id.is_none() {
315            return Ok(false);
316        }
317        bind_optional("specId", &mut self.spec_id, spec_id.as_deref())?;
318        bind_optional("cellId", &mut self.cell_id, cell_id.as_deref())?;
319        bind_optional(
320            "runId",
321            &mut self.run_id,
322            optional_string(data, "runId").as_deref(),
323        )?;
324        bind_optional_placement(&mut self.placement, optional_placement(data)?)?;
325        Ok(true)
326    }
327}
328
329impl From<&CellStateProjection> for CellStateSnapshot {
330    fn from(value: &CellStateProjection) -> Self {
331        Self {
332            spec_id: value.spec_id.clone(),
333            cell_id: value.cell_id.clone(),
334            run_id: value.run_id.clone(),
335            placement: value.placement.clone(),
336            lifecycle_stage: value.lifecycle_stage,
337            identity_stage: value.identity_stage,
338            export_stage: value.export_stage,
339            current_state: value.current_state(),
340            command_exit_code: value.command_exit_code,
341            destroy_reason: value.destroy_reason.clone(),
342            last_error: value.last_error.clone(),
343            exports: value.exports.clone(),
344            processed_events: value.processed_events,
345        }
346    }
347}
348
349fn bind_optional(
350    field: &str,
351    slot: &mut Option<String>,
352    observed: Option<&str>,
353) -> Result<(), CellosError> {
354    let Some(observed) = observed else {
355        return Ok(());
356    };
357    match slot {
358        Some(existing) if existing != observed => Err(CellosError::Lifecycle(format!(
359            "projection saw mismatched {field}: existing={existing:?}, observed={observed:?}"
360        ))),
361        Some(_) => Ok(()),
362        None => {
363            *slot = Some(observed.to_string());
364            Ok(())
365        }
366    }
367}
368
369fn bind_optional_placement(
370    slot: &mut Option<PlacementSpec>,
371    observed: Option<PlacementSpec>,
372) -> Result<(), CellosError> {
373    let Some(observed) = observed else {
374        return Ok(());
375    };
376    match slot {
377        Some(existing) if existing != &observed => Err(CellosError::Lifecycle(format!(
378            "projection saw mismatched placement: existing={existing:?}, observed={observed:?}"
379        ))),
380        Some(_) => Ok(()),
381        None => {
382            *slot = Some(observed);
383            Ok(())
384        }
385    }
386}
387
388fn require_started(projection: &CellStateProjection, event_type: &str) -> Result<(), CellosError> {
389    if projection.lifecycle_stage == ProjectionLifecycleStage::Pending
390        || projection.lifecycle_stage == ProjectionLifecycleStage::Destroyed
391    {
392        return Err(CellosError::Lifecycle(format!(
393            "illegal transition for {event_type}: lifecycle stage is {:?}",
394            projection.lifecycle_stage
395        )));
396    }
397    Ok(())
398}
399
400fn require_command_completed(
401    projection: &CellStateProjection,
402    event_type: &str,
403) -> Result<(), CellosError> {
404    if projection.lifecycle_stage != ProjectionLifecycleStage::CommandCompleted {
405        return Err(CellosError::Lifecycle(format!(
406            "illegal transition for {event_type}: command has not completed"
407        )));
408    }
409    Ok(())
410}
411
412fn illegal_transition(label: &str, projection: &CellStateProjection) -> CellosError {
413    CellosError::Lifecycle(format!(
414        "illegal projection transition: {label}; lifecycle={:?}, identity={:?}, export={:?}, exit_code={:?}",
415        projection.lifecycle_stage,
416        projection.identity_stage,
417        projection.export_stage,
418        projection.command_exit_code,
419    ))
420}
421
422fn required_field<'a>(
423    data: &'a Value,
424    field: &str,
425    event_type: &str,
426) -> Result<&'a Value, CellosError> {
427    data.get(field).ok_or_else(|| {
428        CellosError::Lifecycle(format!(
429            "event {event_type:?} is missing required field {field:?}"
430        ))
431    })
432}
433
434fn required_str<'a>(
435    data: &'a Value,
436    field: &str,
437    event_type: &str,
438) -> Result<&'a str, CellosError> {
439    required_field(data, field, event_type)?
440        .as_str()
441        .ok_or_else(|| {
442            CellosError::Lifecycle(format!(
443                "event {event_type:?} field {field:?} must be a string"
444            ))
445        })
446}
447
448fn required_i32(data: &Value, field: &str, event_type: &str) -> Result<i32, CellosError> {
449    let raw = required_field(data, field, event_type)?;
450    let value = raw.as_i64().ok_or_else(|| {
451        CellosError::Lifecycle(format!(
452            "event {event_type:?} field {field:?} must be an integer"
453        ))
454    })?;
455    i32::try_from(value).map_err(|_| {
456        CellosError::Lifecycle(format!(
457            "event {event_type:?} field {field:?} is out of range for i32"
458        ))
459    })
460}
461
462fn optional_string(data: &Value, field: &str) -> Option<String> {
463    data.get(field).and_then(Value::as_str).map(str::to_string)
464}
465
466fn optional_u64(data: &Value, field: &str) -> Option<u64> {
467    data.get(field).and_then(Value::as_u64)
468}
469
470fn optional_placement(data: &Value) -> Result<Option<PlacementSpec>, CellosError> {
471    let Some(value) = data.get("placement") else {
472        return Ok(None);
473    };
474    serde_json::from_value(value.clone())
475        .map(Some)
476        .map_err(|e| CellosError::Lifecycle(format!("parse field \"placement\": {e}")))
477}
478
479fn optional_enum<T>(data: &Value, field: &str) -> Result<Option<T>, CellosError>
480where
481    T: serde::de::DeserializeOwned,
482{
483    let Some(value) = data.get(field) else {
484        return Ok(None);
485    };
486    serde_json::from_value(value.clone())
487        .map(Some)
488        .map_err(|e| CellosError::Lifecycle(format!("parse field {field:?}: {e}")))
489}
490
#[cfg(test)]
mod tests {
    use serde_json::json;

    use super::*;

    /// Wraps `data` in a minimal CloudEvents 1.0 envelope with the given id and type.
    fn event(id: &str, ty: &str, data: Value) -> CloudEventV1 {
        CloudEventV1 {
            specversion: "1.0".into(),
            id: id.into(),
            source: "urn:test".into(),
            ty: ty.into(),
            datacontenttype: Some("application/json".into()),
            data: Some(data),
            time: None,
            traceparent: None,
        }
    }

    /// Applies an event the test expects to be accepted, returning `apply`'s flag.
    fn apply_ok(projection: &mut CellStateProjection, id: &str, ty: &str, data: Value) -> bool {
        projection.apply(&event(id, ty, data)).unwrap()
    }

    /// A projection that has already applied `started` (event id "1") for cell-1/spec-1.
    fn started_projection() -> CellStateProjection {
        let mut projection = CellStateProjection::default();
        apply_ok(
            &mut projection,
            "1",
            STARTED,
            json!({ "cellId": "cell-1", "specId": "spec-1" }),
        );
        projection
    }

    #[test]
    fn projects_happy_path_to_destroyed() {
        let mut projection = CellStateProjection::default();
        let steps = [
            (
                "1",
                STARTED,
                json!({ "cellId": "cell-1", "specId": "spec-1", "runId": "run-1" }),
            ),
            (
                "2",
                IDENTITY_MATERIALIZED,
                json!({ "cellId": "cell-1", "specId": "spec-1", "runId": "run-1" }),
            ),
            (
                "3",
                COMMAND_COMPLETED,
                json!({
                    "cellId": "cell-1",
                    "specId": "spec-1",
                    "runId": "run-1",
                    "exitCode": 0
                }),
            ),
            (
                "4",
                EXPORT_COMPLETED,
                json!({
                    "cellId": "cell-1",
                    "specId": "spec-1",
                    "runId": "run-1",
                    "targetKind": "http",
                    "targetName": "artifact-bucket",
                    "destination": "https://example.test/a.txt",
                    "bytesWritten": 42
                }),
            ),
            (
                "5",
                DESTROYED,
                json!({
                    "cellId": "cell-1",
                    "specId": "spec-1",
                    "runId": "run-1",
                    "outcome": "succeeded"
                }),
            ),
        ];

        for (id, ty, data) in steps {
            assert!(
                apply_ok(&mut projection, id, ty, data),
                "event {id} should be applied"
            );
        }

        assert_eq!(
            projection.current_state(),
            ProjectionCurrentState::Destroyed
        );
        assert_eq!(projection.processed_events, 5);
        assert_eq!(projection.exports.len(), 1);
        assert_eq!(projection.exports[0].bytes_written, Some(42));
    }

    #[test]
    fn duplicate_event_id_is_ignored() {
        let mut projection = CellStateProjection::default();
        let started = event(
            "1",
            STARTED,
            json!({ "cellId": "cell-1", "specId": "spec-1" }),
        );

        let first = projection.apply(&started).unwrap();
        let second = projection.apply(&started).unwrap();

        assert!(first);
        assert!(!second);
        assert_eq!(projection.processed_events, 1);
        assert_eq!(projection.current_state(), ProjectionCurrentState::Started);
    }

    #[test]
    fn export_before_command_is_rejected() {
        let mut projection = started_projection();

        let premature_export = event(
            "2",
            EXPORT_COMPLETED,
            json!({ "cellId": "cell-1", "specId": "spec-1", "targetKind": "s3" }),
        );
        let err = projection.apply(&premature_export).unwrap_err();

        assert!(err.to_string().contains("command has not completed"));
    }

    #[test]
    fn identity_materialize_failure_becomes_identity_failed_state() {
        let mut projection = started_projection();
        apply_ok(
            &mut projection,
            "2",
            IDENTITY_FAILED,
            json!({
                "cellId": "cell-1",
                "specId": "spec-1",
                "operation": "materialize",
                "reason": "missing oidc token"
            }),
        );

        assert_eq!(
            projection.current_state(),
            ProjectionCurrentState::IdentityFailed
        );
        assert_eq!(projection.last_error.as_deref(), Some("missing oidc token"));
    }

    #[test]
    fn snapshot_includes_current_state() {
        let snapshot = started_projection().snapshot();

        assert_eq!(snapshot.current_state, ProjectionCurrentState::Started);
        assert_eq!(snapshot.cell_id.as_deref(), Some("cell-1"));
    }

    #[test]
    fn snapshot_carries_placement_from_started_event() {
        let mut projection = CellStateProjection::default();
        apply_ok(
            &mut projection,
            "1",
            STARTED,
            json!({
                "cellId": "cell-1",
                "specId": "spec-1",
                "placement": {
                    "poolId": "runner-pool-amd64",
                    "queueName": "ci-high"
                }
            }),
        );

        let snapshot = projection.snapshot();
        let placement = snapshot.placement.as_ref();

        assert_eq!(
            placement.and_then(|p| p.pool_id.as_deref()),
            Some("runner-pool-amd64")
        );
        assert_eq!(
            placement.and_then(|p| p.queue_name.as_deref()),
            Some("ci-high")
        );
    }
}
718}