1use std::collections::BTreeSet;
8
9use serde::{Deserialize, Serialize};
10use serde_json::Value;
11
12use crate::{
13 CellosError, CloudEventV1, ExportReceiptTargetKind, IdentityFailureOperation, PlacementSpec,
14};
15
16const STARTED: &str = "dev.cellos.events.cell.lifecycle.v1.started";
17const DESTROYED: &str = "dev.cellos.events.cell.lifecycle.v1.destroyed";
18const IDENTITY_MATERIALIZED: &str = "dev.cellos.events.cell.identity.v1.materialized";
19const IDENTITY_FAILED: &str = "dev.cellos.events.cell.identity.v1.failed";
20const IDENTITY_REVOKED: &str = "dev.cellos.events.cell.identity.v1.revoked";
21const COMMAND_COMPLETED: &str = "dev.cellos.events.cell.command.v1.completed";
22const EXPORT_COMPLETED: &str = "dev.cellos.events.cell.export.v2.completed";
23const EXPORT_FAILED: &str = "dev.cellos.events.cell.export.v2.failed";
24
25#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
26#[serde(rename_all = "camelCase")]
27pub enum ProjectionLifecycleStage {
28 Pending,
29 Started,
30 CommandCompleted,
31 Destroyed,
32}
33
34#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
35#[serde(rename_all = "camelCase")]
36pub enum ProjectionIdentityStage {
37 Unknown,
38 Materialized,
39 MaterializeFailed,
40 Revoked,
41 RevokeFailed,
42}
43
44#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
45#[serde(rename_all = "camelCase")]
46pub enum ProjectionExportStage {
47 None,
48 Completed,
49 Failed,
50}
51
52#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
53#[serde(rename_all = "camelCase")]
54pub enum ProjectionCurrentState {
55 Pending,
56 Started,
57 IdentityReady,
58 IdentityFailed,
59 CommandSucceeded,
60 CommandFailed,
61 ExportSucceeded,
62 ExportFailed,
63 Destroyed,
64 DestroyFailed,
65}
66
67#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
68#[serde(rename_all = "camelCase")]
69pub struct ExportProjectionRecord {
70 pub target_kind: Option<ExportReceiptTargetKind>,
71 pub target_name: Option<String>,
72 pub destination: Option<String>,
73 pub bytes_written: Option<u64>,
74}
75
76#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
77#[serde(rename_all = "camelCase")]
78pub struct CellStateProjection {
79 pub spec_id: Option<String>,
80 pub cell_id: Option<String>,
81 pub run_id: Option<String>,
82 pub placement: Option<PlacementSpec>,
83 pub lifecycle_stage: ProjectionLifecycleStage,
84 pub identity_stage: ProjectionIdentityStage,
85 pub export_stage: ProjectionExportStage,
86 pub command_exit_code: Option<i32>,
87 pub destroy_reason: Option<String>,
88 pub last_error: Option<String>,
89 pub exports: Vec<ExportProjectionRecord>,
90 pub processed_events: u64,
91 #[serde(skip)]
92 applied_event_ids: BTreeSet<String>,
93}
94
95#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
96#[serde(rename_all = "camelCase")]
97pub struct CellStateSnapshot {
98 pub spec_id: Option<String>,
99 pub cell_id: Option<String>,
100 pub run_id: Option<String>,
101 pub placement: Option<PlacementSpec>,
102 pub lifecycle_stage: ProjectionLifecycleStage,
103 pub identity_stage: ProjectionIdentityStage,
104 pub export_stage: ProjectionExportStage,
105 pub current_state: ProjectionCurrentState,
106 pub command_exit_code: Option<i32>,
107 pub destroy_reason: Option<String>,
108 pub last_error: Option<String>,
109 pub exports: Vec<ExportProjectionRecord>,
110 pub processed_events: u64,
111}
112
113impl Default for CellStateProjection {
114 fn default() -> Self {
115 Self {
116 spec_id: None,
117 cell_id: None,
118 run_id: None,
119 placement: None,
120 lifecycle_stage: ProjectionLifecycleStage::Pending,
121 identity_stage: ProjectionIdentityStage::Unknown,
122 export_stage: ProjectionExportStage::None,
123 command_exit_code: None,
124 destroy_reason: None,
125 last_error: None,
126 exports: Vec::new(),
127 processed_events: 0,
128 applied_event_ids: BTreeSet::new(),
129 }
130 }
131}
132
133impl CellStateProjection {
134 pub fn apply(&mut self, event: &CloudEventV1) -> Result<bool, CellosError> {
135 if !self.applied_event_ids.insert(event.id.clone()) {
136 return Ok(false);
137 }
138
139 let Some(data) = event.data.as_ref() else {
140 self.applied_event_ids.remove(&event.id);
141 return Err(CellosError::Lifecycle(format!(
142 "event {:?} is missing data payload",
143 event.ty
144 )));
145 };
146
147 let is_cell_event = self.try_bind_identity(data)?;
148 if !is_cell_event {
149 self.applied_event_ids.remove(&event.id);
150 return Ok(false);
151 }
152
153 match event.ty.as_str() {
154 STARTED => self.apply_started(),
155 IDENTITY_MATERIALIZED => self.apply_identity_materialized(),
156 IDENTITY_FAILED => self.apply_identity_failed(data),
157 IDENTITY_REVOKED => self.apply_identity_revoked(),
158 COMMAND_COMPLETED => self.apply_command_completed(data),
159 EXPORT_COMPLETED => self.apply_export_completed(data),
160 EXPORT_FAILED => self.apply_export_failed(data),
161 DESTROYED => self.apply_destroyed(data),
162 _ => {
163 self.applied_event_ids.remove(&event.id);
164 return Ok(false);
165 }
166 }?;
167
168 self.processed_events += 1;
169 Ok(true)
170 }
171
172 pub fn current_state(&self) -> ProjectionCurrentState {
173 if self.lifecycle_stage == ProjectionLifecycleStage::Destroyed {
174 if self.destroy_reason.is_some() {
175 return ProjectionCurrentState::DestroyFailed;
176 }
177 return ProjectionCurrentState::Destroyed;
178 }
179
180 match self.export_stage {
181 ProjectionExportStage::Completed => return ProjectionCurrentState::ExportSucceeded,
182 ProjectionExportStage::Failed => return ProjectionCurrentState::ExportFailed,
183 ProjectionExportStage::None => {}
184 }
185
186 if let Some(exit_code) = self.command_exit_code {
187 return if exit_code == 0 {
188 ProjectionCurrentState::CommandSucceeded
189 } else {
190 ProjectionCurrentState::CommandFailed
191 };
192 }
193
194 match self.identity_stage {
195 ProjectionIdentityStage::Materialized => ProjectionCurrentState::IdentityReady,
196 ProjectionIdentityStage::MaterializeFailed => ProjectionCurrentState::IdentityFailed,
197 _ if self.lifecycle_stage == ProjectionLifecycleStage::Started => {
198 ProjectionCurrentState::Started
199 }
200 _ => ProjectionCurrentState::Pending,
201 }
202 }
203
204 pub fn snapshot(&self) -> CellStateSnapshot {
205 CellStateSnapshot::from(self)
206 }
207
208 fn apply_started(&mut self) -> Result<(), CellosError> {
209 if self.lifecycle_stage != ProjectionLifecycleStage::Pending {
210 return Err(illegal_transition("started", self));
211 }
212 self.lifecycle_stage = ProjectionLifecycleStage::Started;
213 Ok(())
214 }
215
216 fn apply_identity_materialized(&mut self) -> Result<(), CellosError> {
217 require_started(self, IDENTITY_MATERIALIZED)?;
218 if self.identity_stage == ProjectionIdentityStage::MaterializeFailed {
219 return Err(illegal_transition(
220 "identity.materialized after materialize failure",
221 self,
222 ));
223 }
224 self.identity_stage = ProjectionIdentityStage::Materialized;
225 Ok(())
226 }
227
228 fn apply_identity_failed(&mut self, data: &Value) -> Result<(), CellosError> {
229 require_started(self, IDENTITY_FAILED)?;
230 let operation = match required_str(data, "operation", IDENTITY_FAILED)? {
231 "materialize" => IdentityFailureOperation::Materialize,
232 "revoke" => IdentityFailureOperation::Revoke,
233 other => {
234 return Err(CellosError::Lifecycle(format!(
235 "unknown identity failure operation {other:?}"
236 )))
237 }
238 };
239 let reason = required_str(data, "reason", IDENTITY_FAILED)?;
240 self.last_error = Some(reason.to_string());
241 match operation {
242 IdentityFailureOperation::Materialize => {
243 if self.command_exit_code.is_some() {
244 return Err(illegal_transition(
245 "identity.materialize failure after command completion",
246 self,
247 ));
248 }
249 self.identity_stage = ProjectionIdentityStage::MaterializeFailed;
250 }
251 IdentityFailureOperation::Revoke => {
252 self.identity_stage = ProjectionIdentityStage::RevokeFailed;
253 }
254 }
255 Ok(())
256 }
257
258 fn apply_identity_revoked(&mut self) -> Result<(), CellosError> {
259 require_started(self, IDENTITY_REVOKED)?;
260 self.identity_stage = ProjectionIdentityStage::Revoked;
261 Ok(())
262 }
263
264 fn apply_command_completed(&mut self, data: &Value) -> Result<(), CellosError> {
265 require_started(self, COMMAND_COMPLETED)?;
266 if self.command_exit_code.is_some() {
267 return Err(illegal_transition("command.completed twice", self));
268 }
269 if self.identity_stage == ProjectionIdentityStage::MaterializeFailed {
270 return Err(illegal_transition(
271 "command.completed after identity materialize failure",
272 self,
273 ));
274 }
275 self.command_exit_code = Some(required_i32(data, "exitCode", COMMAND_COMPLETED)?);
276 self.lifecycle_stage = ProjectionLifecycleStage::CommandCompleted;
277 Ok(())
278 }
279
280 fn apply_export_completed(&mut self, data: &Value) -> Result<(), CellosError> {
281 require_command_completed(self, EXPORT_COMPLETED)?;
282 self.export_stage = ProjectionExportStage::Completed;
283 self.exports.push(ExportProjectionRecord {
284 target_kind: optional_enum(data, "targetKind")?,
285 target_name: optional_string(data, "targetName"),
286 destination: optional_string(data, "destination"),
287 bytes_written: optional_u64(data, "bytesWritten"),
288 });
289 Ok(())
290 }
291
292 fn apply_export_failed(&mut self, data: &Value) -> Result<(), CellosError> {
293 require_command_completed(self, EXPORT_FAILED)?;
294 self.export_stage = ProjectionExportStage::Failed;
295 self.last_error = Some(required_str(data, "reason", EXPORT_FAILED)?.to_string());
296 Ok(())
297 }
298
299 fn apply_destroyed(&mut self, data: &Value) -> Result<(), CellosError> {
300 if self.lifecycle_stage == ProjectionLifecycleStage::Pending {
301 return Err(illegal_transition("destroyed before started", self));
302 }
303 self.lifecycle_stage = ProjectionLifecycleStage::Destroyed;
304 self.destroy_reason = optional_string(data, "reason");
305 if let Some(reason) = self.destroy_reason.clone() {
306 self.last_error = Some(reason);
307 }
308 Ok(())
309 }
310
311 fn try_bind_identity(&mut self, data: &Value) -> Result<bool, CellosError> {
312 let spec_id = optional_string(data, "specId");
313 let cell_id = optional_string(data, "cellId");
314 if spec_id.is_none() && cell_id.is_none() {
315 return Ok(false);
316 }
317 bind_optional("specId", &mut self.spec_id, spec_id.as_deref())?;
318 bind_optional("cellId", &mut self.cell_id, cell_id.as_deref())?;
319 bind_optional(
320 "runId",
321 &mut self.run_id,
322 optional_string(data, "runId").as_deref(),
323 )?;
324 bind_optional_placement(&mut self.placement, optional_placement(data)?)?;
325 Ok(true)
326 }
327}
328
329impl From<&CellStateProjection> for CellStateSnapshot {
330 fn from(value: &CellStateProjection) -> Self {
331 Self {
332 spec_id: value.spec_id.clone(),
333 cell_id: value.cell_id.clone(),
334 run_id: value.run_id.clone(),
335 placement: value.placement.clone(),
336 lifecycle_stage: value.lifecycle_stage,
337 identity_stage: value.identity_stage,
338 export_stage: value.export_stage,
339 current_state: value.current_state(),
340 command_exit_code: value.command_exit_code,
341 destroy_reason: value.destroy_reason.clone(),
342 last_error: value.last_error.clone(),
343 exports: value.exports.clone(),
344 processed_events: value.processed_events,
345 }
346 }
347}
348
349fn bind_optional(
350 field: &str,
351 slot: &mut Option<String>,
352 observed: Option<&str>,
353) -> Result<(), CellosError> {
354 let Some(observed) = observed else {
355 return Ok(());
356 };
357 match slot {
358 Some(existing) if existing != observed => Err(CellosError::Lifecycle(format!(
359 "projection saw mismatched {field}: existing={existing:?}, observed={observed:?}"
360 ))),
361 Some(_) => Ok(()),
362 None => {
363 *slot = Some(observed.to_string());
364 Ok(())
365 }
366 }
367}
368
369fn bind_optional_placement(
370 slot: &mut Option<PlacementSpec>,
371 observed: Option<PlacementSpec>,
372) -> Result<(), CellosError> {
373 let Some(observed) = observed else {
374 return Ok(());
375 };
376 match slot {
377 Some(existing) if existing != &observed => Err(CellosError::Lifecycle(format!(
378 "projection saw mismatched placement: existing={existing:?}, observed={observed:?}"
379 ))),
380 Some(_) => Ok(()),
381 None => {
382 *slot = Some(observed);
383 Ok(())
384 }
385 }
386}
387
388fn require_started(projection: &CellStateProjection, event_type: &str) -> Result<(), CellosError> {
389 if projection.lifecycle_stage == ProjectionLifecycleStage::Pending
390 || projection.lifecycle_stage == ProjectionLifecycleStage::Destroyed
391 {
392 return Err(CellosError::Lifecycle(format!(
393 "illegal transition for {event_type}: lifecycle stage is {:?}",
394 projection.lifecycle_stage
395 )));
396 }
397 Ok(())
398}
399
400fn require_command_completed(
401 projection: &CellStateProjection,
402 event_type: &str,
403) -> Result<(), CellosError> {
404 if projection.lifecycle_stage != ProjectionLifecycleStage::CommandCompleted {
405 return Err(CellosError::Lifecycle(format!(
406 "illegal transition for {event_type}: command has not completed"
407 )));
408 }
409 Ok(())
410}
411
412fn illegal_transition(label: &str, projection: &CellStateProjection) -> CellosError {
413 CellosError::Lifecycle(format!(
414 "illegal projection transition: {label}; lifecycle={:?}, identity={:?}, export={:?}, exit_code={:?}",
415 projection.lifecycle_stage,
416 projection.identity_stage,
417 projection.export_stage,
418 projection.command_exit_code,
419 ))
420}
421
422fn required_field<'a>(
423 data: &'a Value,
424 field: &str,
425 event_type: &str,
426) -> Result<&'a Value, CellosError> {
427 data.get(field).ok_or_else(|| {
428 CellosError::Lifecycle(format!(
429 "event {event_type:?} is missing required field {field:?}"
430 ))
431 })
432}
433
434fn required_str<'a>(
435 data: &'a Value,
436 field: &str,
437 event_type: &str,
438) -> Result<&'a str, CellosError> {
439 required_field(data, field, event_type)?
440 .as_str()
441 .ok_or_else(|| {
442 CellosError::Lifecycle(format!(
443 "event {event_type:?} field {field:?} must be a string"
444 ))
445 })
446}
447
448fn required_i32(data: &Value, field: &str, event_type: &str) -> Result<i32, CellosError> {
449 let raw = required_field(data, field, event_type)?;
450 let value = raw.as_i64().ok_or_else(|| {
451 CellosError::Lifecycle(format!(
452 "event {event_type:?} field {field:?} must be an integer"
453 ))
454 })?;
455 i32::try_from(value).map_err(|_| {
456 CellosError::Lifecycle(format!(
457 "event {event_type:?} field {field:?} is out of range for i32"
458 ))
459 })
460}
461
462fn optional_string(data: &Value, field: &str) -> Option<String> {
463 data.get(field).and_then(Value::as_str).map(str::to_string)
464}
465
466fn optional_u64(data: &Value, field: &str) -> Option<u64> {
467 data.get(field).and_then(Value::as_u64)
468}
469
470fn optional_placement(data: &Value) -> Result<Option<PlacementSpec>, CellosError> {
471 let Some(value) = data.get("placement") else {
472 return Ok(None);
473 };
474 serde_json::from_value(value.clone())
475 .map(Some)
476 .map_err(|e| CellosError::Lifecycle(format!("parse field \"placement\": {e}")))
477}
478
479fn optional_enum<T>(data: &Value, field: &str) -> Result<Option<T>, CellosError>
480where
481 T: serde::de::DeserializeOwned,
482{
483 let Some(value) = data.get(field) else {
484 return Ok(None);
485 };
486 serde_json::from_value(value.clone())
487 .map(Some)
488 .map_err(|e| CellosError::Lifecycle(format!("parse field {field:?}: {e}")))
489}
490
491#[cfg(test)]
492mod tests {
493 use serde_json::json;
494
495 use super::*;
496
497 #[test]
498 fn projects_happy_path_to_destroyed() {
499 let mut projection = CellStateProjection::default();
500
501 assert!(projection
502 .apply(&event(
503 "1",
504 STARTED,
505 json!({
506 "cellId": "cell-1",
507 "specId": "spec-1",
508 "runId": "run-1"
509 })
510 ))
511 .unwrap());
512 assert!(projection
513 .apply(&event(
514 "2",
515 IDENTITY_MATERIALIZED,
516 json!({
517 "cellId": "cell-1",
518 "specId": "spec-1",
519 "runId": "run-1"
520 })
521 ))
522 .unwrap());
523 assert!(projection
524 .apply(&event(
525 "3",
526 COMMAND_COMPLETED,
527 json!({
528 "cellId": "cell-1",
529 "specId": "spec-1",
530 "runId": "run-1",
531 "exitCode": 0
532 })
533 ))
534 .unwrap());
535 assert!(projection
536 .apply(&event(
537 "4",
538 EXPORT_COMPLETED,
539 json!({
540 "cellId": "cell-1",
541 "specId": "spec-1",
542 "runId": "run-1",
543 "targetKind": "http",
544 "targetName": "artifact-bucket",
545 "destination": "https://example.test/a.txt",
546 "bytesWritten": 42
547 })
548 ))
549 .unwrap());
550 assert!(projection
551 .apply(&event(
552 "5",
553 DESTROYED,
554 json!({
555 "cellId": "cell-1",
556 "specId": "spec-1",
557 "runId": "run-1",
558 "outcome": "succeeded"
559 })
560 ))
561 .unwrap());
562
563 assert_eq!(
564 projection.current_state(),
565 ProjectionCurrentState::Destroyed
566 );
567 assert_eq!(projection.processed_events, 5);
568 assert_eq!(projection.exports.len(), 1);
569 assert_eq!(projection.exports[0].bytes_written, Some(42));
570 }
571
572 #[test]
573 fn duplicate_event_id_is_ignored() {
574 let mut projection = CellStateProjection::default();
575 let started = event(
576 "1",
577 STARTED,
578 json!({
579 "cellId": "cell-1",
580 "specId": "spec-1"
581 }),
582 );
583
584 assert!(projection.apply(&started).unwrap());
585 assert!(!projection.apply(&started).unwrap());
586 assert_eq!(projection.processed_events, 1);
587 assert_eq!(projection.current_state(), ProjectionCurrentState::Started);
588 }
589
590 #[test]
591 fn export_before_command_is_rejected() {
592 let mut projection = CellStateProjection::default();
593 projection
594 .apply(&event(
595 "1",
596 STARTED,
597 json!({
598 "cellId": "cell-1",
599 "specId": "spec-1"
600 }),
601 ))
602 .unwrap();
603
604 let err = projection
605 .apply(&event(
606 "2",
607 EXPORT_COMPLETED,
608 json!({
609 "cellId": "cell-1",
610 "specId": "spec-1",
611 "targetKind": "s3"
612 }),
613 ))
614 .unwrap_err();
615
616 assert!(err.to_string().contains("command has not completed"));
617 }
618
619 #[test]
620 fn identity_materialize_failure_becomes_identity_failed_state() {
621 let mut projection = CellStateProjection::default();
622 projection
623 .apply(&event(
624 "1",
625 STARTED,
626 json!({
627 "cellId": "cell-1",
628 "specId": "spec-1"
629 }),
630 ))
631 .unwrap();
632 projection
633 .apply(&event(
634 "2",
635 IDENTITY_FAILED,
636 json!({
637 "cellId": "cell-1",
638 "specId": "spec-1",
639 "operation": "materialize",
640 "reason": "missing oidc token"
641 }),
642 ))
643 .unwrap();
644
645 assert_eq!(
646 projection.current_state(),
647 ProjectionCurrentState::IdentityFailed
648 );
649 assert_eq!(projection.last_error.as_deref(), Some("missing oidc token"));
650 }
651
652 #[test]
653 fn snapshot_includes_current_state() {
654 let mut projection = CellStateProjection::default();
655 projection
656 .apply(&event(
657 "1",
658 STARTED,
659 json!({
660 "cellId": "cell-1",
661 "specId": "spec-1"
662 }),
663 ))
664 .unwrap();
665
666 let snapshot = projection.snapshot();
667 assert_eq!(snapshot.current_state, ProjectionCurrentState::Started);
668 assert_eq!(snapshot.cell_id.as_deref(), Some("cell-1"));
669 }
670
671 #[test]
672 fn snapshot_carries_placement_from_started_event() {
673 let mut projection = CellStateProjection::default();
674 projection
675 .apply(&event(
676 "1",
677 STARTED,
678 json!({
679 "cellId": "cell-1",
680 "specId": "spec-1",
681 "placement": {
682 "poolId": "runner-pool-amd64",
683 "queueName": "ci-high"
684 }
685 }),
686 ))
687 .unwrap();
688
689 let snapshot = projection.snapshot();
690 assert_eq!(
691 snapshot
692 .placement
693 .as_ref()
694 .and_then(|placement| placement.pool_id.as_deref()),
695 Some("runner-pool-amd64")
696 );
697 assert_eq!(
698 snapshot
699 .placement
700 .as_ref()
701 .and_then(|placement| placement.queue_name.as_deref()),
702 Some("ci-high")
703 );
704 }
705
706 fn event(id: &str, ty: &str, data: Value) -> CloudEventV1 {
707 CloudEventV1 {
708 specversion: "1.0".into(),
709 id: id.into(),
710 source: "urn:test".into(),
711 ty: ty.into(),
712 datacontenttype: Some("application/json".into()),
713 data: Some(data),
714 time: None,
715 traceparent: None,
716 }
717 }
718}