Skip to main content

aion_core/
status.rs

//! Workflow status projection from authoritative event history.

use serde::{Deserialize, Serialize};

use crate::Event;

7/// Projected lifecycle status for a workflow execution.
8///
9/// Status must be obtained only by projecting from event history with
10/// [`status_from_events`], never assigned directly or stored as an independent
11/// mutable field. Event history remains authoritative for every workflow state.
12#[derive(Serialize, Deserialize, ts_rs::TS, Clone, Copy, Debug, PartialEq, Eq)]
13pub enum WorkflowStatus {
14    /// The workflow has not recorded a terminal lifecycle event.
15    Running,
16    /// The workflow recorded a [`Event::WorkflowCompleted`] terminal event.
17    Completed,
18    /// The workflow recorded a [`Event::WorkflowFailed`] terminal event.
19    Failed,
20    /// The workflow recorded a [`Event::WorkflowCancelled`] terminal event.
21    Cancelled,
22    /// The workflow recorded a [`Event::WorkflowTimedOut`] terminal event.
23    TimedOut,
24    /// The workflow recorded a [`Event::WorkflowContinuedAsNew`] terminal event.
25    ContinuedAsNew,
26}
27
28impl WorkflowStatus {
29    /// Returns whether this status represents a terminal workflow execution state.
30    #[must_use]
31    pub const fn is_terminal(self) -> bool {
32        match self {
33            Self::Running => false,
34            Self::Completed
35            | Self::Failed
36            | Self::Cancelled
37            | Self::TimedOut
38            | Self::ContinuedAsNew => true,
39        }
40    }
41}
42
43/// Projects workflow status from an event history.
44///
45/// The last terminal workflow lifecycle event determines the projected status.
46/// Histories without a terminal workflow event are considered running.
47/// When a history contains multiple runs for continue-as-new, a later
48/// [`Event::WorkflowStarted`] begins the current run and supersedes earlier
49/// terminal events from the previous run.
50#[must_use]
51pub fn status_from_events(events: &[Event]) -> WorkflowStatus {
52    events
53        .iter()
54        .rev()
55        .find_map(|event| match event {
56            Event::WorkflowStarted { .. } => Some(WorkflowStatus::Running),
57            Event::WorkflowCompleted { .. } => Some(WorkflowStatus::Completed),
58            Event::WorkflowFailed { .. } => Some(WorkflowStatus::Failed),
59            Event::WorkflowCancelled { .. } => Some(WorkflowStatus::Cancelled),
60            Event::WorkflowTimedOut { .. } => Some(WorkflowStatus::TimedOut),
61            Event::WorkflowContinuedAsNew { .. } => Some(WorkflowStatus::ContinuedAsNew),
62            Event::SearchAttributesUpdated { .. }
63            | Event::ActivityScheduled { .. }
64            | Event::ActivityStarted { .. }
65            | Event::ActivityCompleted { .. }
66            | Event::ActivityFailed { .. }
67            | Event::ActivityCancelled { .. }
68            | Event::TimerStarted { .. }
69            | Event::TimerFired { .. }
70            | Event::TimerCancelled { .. }
71            | Event::WithTimeoutCompleted { .. }
72            | Event::SignalReceived { .. }
73            | Event::SignalSent { .. }
74            | Event::ChildWorkflowStarted { .. }
75            | Event::ChildWorkflowCompleted { .. }
76            | Event::ChildWorkflowFailed { .. }
77            | Event::ChildWorkflowCancelled { .. }
78            | Event::ScheduleCreated { .. }
79            | Event::ScheduleUpdated { .. }
80            | Event::SchedulePaused { .. }
81            | Event::ScheduleResumed { .. }
82            | Event::ScheduleDeleted { .. }
83            | Event::ScheduleTriggered { .. } => None,
84        })
85        .unwrap_or(WorkflowStatus::Running)
86}
87
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use chrono::{DateTime, Utc};
    use serde_json::json;

    use super::{WorkflowStatus, status_from_events};
    use crate::{
        ActivityId, Event, EventEnvelope, Payload, RunId, ScheduleId, SearchAttributeValue,
        WorkflowError, WorkflowId,
    };

    /// Builds a timestamp `offset` seconds after a fixed base instant, falling
    /// back to the default `DateTime` when the value is out of range.
    fn recorded_at(offset: i64) -> DateTime<Utc> {
        DateTime::from_timestamp(1_700_000_000 + offset, 0).unwrap_or_default()
    }

    /// Builds an envelope for sequence number `seq` on the nil workflow id,
    /// with a timestamp derived from `seq`.
    fn envelope(seq: u64) -> EventEnvelope {
        EventEnvelope {
            seq,
            recorded_at: recorded_at(i64::try_from(seq).unwrap_or(0)),
            workflow_id: WorkflowId::new(uuid::Uuid::nil()),
        }
    }

    /// Builds a JSON payload of the form `{ "label": <label> }`.
    fn payload(label: &str) -> Result<Payload, crate::PayloadError> {
        Payload::from_json(&json!({ "label": label }))
    }

    /// Builds a `WorkflowStarted` event for the "checkout" workflow type at
    /// sequence number `seq`.
    fn workflow_started(seq: u64) -> Result<Event, crate::PayloadError> {
        Ok(Event::WorkflowStarted {
            envelope: envelope(seq),
            workflow_type: String::from("checkout"),
            input: payload("input")?,
            run_id: RunId::new(uuid::Uuid::from_u128(1)),
            parent_run_id: None,
            package_version: crate::PackageVersion::new("a".repeat(64)),
        })
    }

    /// Builds a workflow error carrying only `message`.
    fn workflow_error(message: &str) -> WorkflowError {
        WorkflowError {
            message: String::from(message),
            details: None,
        }
    }

    #[test]
    fn empty_history_projects_to_running() {
        assert_eq!(status_from_events(&[]), WorkflowStatus::Running);
    }

    #[test]
    fn replacement_start_projects_continue_as_new_chain_running() -> Result<(), crate::PayloadError>
    {
        let parent_run_id = RunId::new(uuid::Uuid::from_u128(7));
        let events = vec![
            workflow_started(1)?,
            Event::WorkflowContinuedAsNew {
                envelope: envelope(2),
                input: payload("replacement")?,
                workflow_type: None,
                parent_run_id: parent_run_id.clone(),
            },
            Event::WorkflowStarted {
                envelope: envelope(3),
                workflow_type: String::from("checkout"),
                input: payload("replacement")?,
                run_id: RunId::new(uuid::Uuid::from_u128(1)),
                parent_run_id: Some(parent_run_id),
                package_version: crate::PackageVersion::new("a".repeat(64)),
            },
        ];

        assert_eq!(status_from_events(&events), WorkflowStatus::Running);
        Ok(())
    }

    #[test]
    fn completed_terminal_event_projects_to_completed() -> Result<(), Box<dyn std::error::Error>> {
        let events = vec![
            workflow_started(1)?,
            Event::WorkflowCompleted {
                envelope: envelope(2),
                result: payload("result")?,
            },
        ];

        assert_eq!(status_from_events(&events), WorkflowStatus::Completed);
        Ok(())
    }

    #[test]
    fn failed_terminal_event_projects_to_failed() -> Result<(), Box<dyn std::error::Error>> {
        let events = vec![
            workflow_started(1)?,
            Event::WorkflowFailed {
                envelope: envelope(2),
                error: workflow_error("failed"),
            },
        ];

        assert_eq!(status_from_events(&events), WorkflowStatus::Failed);
        Ok(())
    }

    #[test]
    fn cancelled_terminal_event_projects_to_cancelled() -> Result<(), Box<dyn std::error::Error>> {
        let events = vec![
            workflow_started(1)?,
            Event::WorkflowCancelled {
                envelope: envelope(2),
                reason: String::from("caller requested cancellation"),
            },
        ];

        assert_eq!(status_from_events(&events), WorkflowStatus::Cancelled);
        Ok(())
    }

    #[test]
    fn timed_out_terminal_event_projects_to_timed_out() -> Result<(), Box<dyn std::error::Error>> {
        let events = vec![
            workflow_started(1)?,
            Event::WorkflowTimedOut {
                envelope: envelope(2),
                timeout: String::from("execution"),
            },
        ];

        assert_eq!(status_from_events(&events), WorkflowStatus::TimedOut);
        Ok(())
    }

    #[test]
    fn continued_as_new_projects_status() -> Result<(), Box<dyn std::error::Error>> {
        let events = vec![
            workflow_started(1)?,
            Event::WorkflowContinuedAsNew {
                envelope: envelope(2),
                input: payload("continued-input")?,
                workflow_type: Some(String::from("checkout-v2")),
                parent_run_id: RunId::new(uuid::Uuid::from_u128(2)),
            },
        ];

        assert_eq!(status_from_events(&events), WorkflowStatus::ContinuedAsNew);
        Ok(())
    }

    #[test]
    fn workflow_status_terminality_classifies_running_and_terminal_statuses() {
        assert!(!WorkflowStatus::Running.is_terminal());
        assert!(WorkflowStatus::Completed.is_terminal());
        assert!(WorkflowStatus::Failed.is_terminal());
        assert!(WorkflowStatus::Cancelled.is_terminal());
        assert!(WorkflowStatus::TimedOut.is_terminal());
        assert!(WorkflowStatus::ContinuedAsNew.is_terminal());
    }

    #[test]
    fn started_then_continued_as_new_projects_status() -> Result<(), Box<dyn std::error::Error>> {
        let events = vec![
            workflow_started(1)?,
            Event::WorkflowContinuedAsNew {
                envelope: envelope(2),
                input: payload("continued-input")?,
                workflow_type: None,
                parent_run_id: RunId::new(uuid::Uuid::from_u128(3)),
            },
        ];

        assert_eq!(status_from_events(&events), WorkflowStatus::ContinuedAsNew);
        Ok(())
    }

    #[test]
    fn non_terminal_history_projects_to_running() -> Result<(), Box<dyn std::error::Error>> {
        let events = vec![
            workflow_started(1)?,
            Event::SearchAttributesUpdated {
                envelope: envelope(2),
                workflow_id: WorkflowId::new(uuid::Uuid::nil()),
                attributes: HashMap::from([(
                    String::from("customer_id"),
                    SearchAttributeValue::String(String::from("customer-123")),
                )]),
            },
            Event::ActivityScheduled {
                envelope: envelope(3),
                activity_id: ActivityId::from_sequence_position(3),
                activity_type: String::from("charge-card"),
                input: payload("activity-input")?,
            },
        ];

        assert_eq!(status_from_events(&events), WorkflowStatus::Running);
        Ok(())
    }

    #[test]
    fn schedule_events_do_not_change_workflow_status() -> Result<(), Box<dyn std::error::Error>> {
        let events = vec![
            workflow_started(1)?,
            Event::SchedulePaused {
                envelope: envelope(2),
                schedule_id: ScheduleId::new(uuid::Uuid::from_u128(2)),
            },
        ];

        assert_eq!(status_from_events(&events), WorkflowStatus::Running);
        Ok(())
    }

    // `Running` is also the fallback result, so the tests above cannot tell
    // "event skipped" apart from "event mapped to Running". A terminal event
    // followed by a non-lifecycle event pins the skip behavior.
    #[test]
    fn non_lifecycle_events_after_terminal_event_keep_terminal_status()
    -> Result<(), Box<dyn std::error::Error>> {
        let events = vec![
            workflow_started(1)?,
            Event::WorkflowCompleted {
                envelope: envelope(2),
                result: payload("result")?,
            },
            Event::SchedulePaused {
                envelope: envelope(3),
                schedule_id: ScheduleId::new(uuid::Uuid::from_u128(2)),
            },
        ];

        assert_eq!(status_from_events(&events), WorkflowStatus::Completed);
        Ok(())
    }

    #[test]
    fn projection_is_deterministic() -> Result<(), Box<dyn std::error::Error>> {
        let events = vec![
            workflow_started(1)?,
            Event::WorkflowCompleted {
                envelope: envelope(2),
                result: payload("result")?,
            },
        ];

        let first = status_from_events(&events);
        let second = status_from_events(&events);

        assert_eq!(first, second);
        // Two equal results could still both be wrong; pin the value too.
        assert_eq!(first, WorkflowStatus::Completed);
        Ok(())
    }

    #[test]
    fn last_terminal_lifecycle_event_determines_status() -> Result<(), Box<dyn std::error::Error>> {
        let events = vec![
            workflow_started(1)?,
            Event::WorkflowCompleted {
                envelope: envelope(2),
                result: payload("result")?,
            },
            Event::WorkflowTimedOut {
                envelope: envelope(3),
                timeout: String::from("execution"),
            },
        ];

        assert_eq!(status_from_events(&events), WorkflowStatus::TimedOut);
        Ok(())
    }
}