Skip to main content

aion_core/
status.rs

1//! Workflow status projection from authoritative event history.
2
3use serde::{Deserialize, Serialize};
4
5use crate::Event;
6
7/// Projected lifecycle status for a workflow execution.
8///
9/// Status must be obtained only by projecting from event history with
10/// [`status_from_events`], never assigned directly or stored as an independent
11/// mutable field. Event history remains authoritative for every workflow state.
12#[derive(Serialize, Deserialize, ts_rs::TS, Clone, Copy, Debug, PartialEq, Eq)]
13pub enum WorkflowStatus {
14    /// The workflow has not recorded a terminal lifecycle event.
15    Running,
16    /// The workflow recorded a [`Event::WorkflowCompleted`] terminal event.
17    Completed,
18    /// The workflow recorded a [`Event::WorkflowFailed`] terminal event.
19    Failed,
20    /// The workflow recorded a [`Event::WorkflowCancelled`] terminal event.
21    Cancelled,
22    /// The workflow recorded a [`Event::WorkflowTimedOut`] terminal event.
23    TimedOut,
24    /// The workflow recorded a [`Event::WorkflowContinuedAsNew`] terminal event.
25    ContinuedAsNew,
26}
27
28impl WorkflowStatus {
29    /// Returns whether this status represents a terminal workflow execution state.
30    #[must_use]
31    pub const fn is_terminal(self) -> bool {
32        match self {
33            Self::Running => false,
34            Self::Completed
35            | Self::Failed
36            | Self::Cancelled
37            | Self::TimedOut
38            | Self::ContinuedAsNew => true,
39        }
40    }
41}
42
43/// Projects workflow status from an event history.
44///
45/// The last terminal workflow lifecycle event determines the projected status.
46/// Histories without a terminal workflow event are considered running.
47/// When a history contains multiple runs for continue-as-new, a later
48/// [`Event::WorkflowStarted`] begins the current run and supersedes earlier
49/// terminal events from the previous run.
50#[must_use]
51pub fn status_from_events(events: &[Event]) -> WorkflowStatus {
52    events
53        .iter()
54        .rev()
55        .find_map(|event| match event {
56            Event::WorkflowStarted { .. } => Some(WorkflowStatus::Running),
57            Event::WorkflowCompleted { .. } => Some(WorkflowStatus::Completed),
58            Event::WorkflowFailed { .. } => Some(WorkflowStatus::Failed),
59            Event::WorkflowCancelled { .. } => Some(WorkflowStatus::Cancelled),
60            Event::WorkflowTimedOut { .. } => Some(WorkflowStatus::TimedOut),
61            Event::WorkflowContinuedAsNew { .. } => Some(WorkflowStatus::ContinuedAsNew),
62            Event::SearchAttributesUpdated { .. }
63            | Event::ActivityScheduled { .. }
64            | Event::ActivityStarted { .. }
65            | Event::ActivityCompleted { .. }
66            | Event::ActivityFailed { .. }
67            | Event::ActivityCancelled { .. }
68            | Event::TimerStarted { .. }
69            | Event::TimerFired { .. }
70            | Event::TimerCancelled { .. }
71            | Event::WithTimeoutCompleted { .. }
72            | Event::SignalReceived { .. }
73            | Event::SignalSent { .. }
74            | Event::ChildWorkflowStarted { .. }
75            | Event::ChildWorkflowCompleted { .. }
76            | Event::ChildWorkflowFailed { .. }
77            | Event::ChildWorkflowCancelled { .. }
78            | Event::ScheduleCreated { .. }
79            | Event::ScheduleUpdated { .. }
80            | Event::SchedulePaused { .. }
81            | Event::ScheduleResumed { .. }
82            | Event::ScheduleDeleted { .. }
83            | Event::ScheduleTriggered { .. } => None,
84        })
85        .unwrap_or(WorkflowStatus::Running)
86}
87
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use chrono::{DateTime, Utc};
    use serde_json::json;

    use super::{WorkflowStatus, status_from_events};
    use crate::{
        ActivityId, Event, EventEnvelope, Payload, RunId, ScheduleId, SearchAttributeValue,
        WorkflowError, WorkflowId,
    };

    /// Return type shared by every fallible test so fixture construction can
    /// use `?` regardless of which error a helper produces.
    type TestResult = Result<(), Box<dyn std::error::Error>>;

    /// Builds a timestamp `offset` seconds after a fixed base instant, falling
    /// back to the default `DateTime` when the value cannot be represented.
    fn recorded_at(offset: i64) -> DateTime<Utc> {
        DateTime::from_timestamp(1_700_000_000 + offset, 0).unwrap_or_default()
    }

    /// Builds an envelope for sequence number `seq` on the nil workflow id,
    /// with a timestamp derived from `seq`.
    fn envelope(seq: u64) -> EventEnvelope {
        EventEnvelope {
            seq,
            recorded_at: recorded_at(i64::try_from(seq).unwrap_or(0)),
            workflow_id: WorkflowId::new(uuid::Uuid::nil()),
        }
    }

    /// Builds a small JSON payload tagged with `label`.
    fn payload(label: &str) -> Result<Payload, crate::PayloadError> {
        Payload::from_json(&json!({ "label": label }))
    }

    /// Builds a `WorkflowStarted` event for a "checkout" workflow at `seq`.
    fn workflow_started(seq: u64) -> Result<Event, crate::PayloadError> {
        Ok(Event::WorkflowStarted {
            envelope: envelope(seq),
            workflow_type: String::from("checkout"),
            input: payload("input")?,
            run_id: RunId::new(uuid::Uuid::from_u128(1)),
            parent_run_id: None,
        })
    }

    /// Builds a workflow error carrying only `message`.
    fn workflow_error(message: &str) -> WorkflowError {
        WorkflowError {
            message: String::from(message),
            details: None,
        }
    }

    #[test]
    fn empty_history_projects_to_running() {
        assert_eq!(status_from_events(&[]), WorkflowStatus::Running);
    }

    #[test]
    fn replacement_start_projects_continue_as_new_chain_running() -> TestResult {
        let parent_run_id = RunId::new(uuid::Uuid::from_u128(7));
        let events = vec![
            workflow_started(1)?,
            Event::WorkflowContinuedAsNew {
                envelope: envelope(2),
                input: payload("replacement")?,
                workflow_type: None,
                parent_run_id: parent_run_id.clone(),
            },
            // The later start must supersede the continue-as-new terminal event.
            Event::WorkflowStarted {
                envelope: envelope(3),
                workflow_type: String::from("checkout"),
                input: payload("replacement")?,
                run_id: RunId::new(uuid::Uuid::from_u128(1)),
                parent_run_id: Some(parent_run_id),
            },
        ];

        assert_eq!(status_from_events(&events), WorkflowStatus::Running);
        Ok(())
    }

    #[test]
    fn completed_terminal_event_projects_to_completed() -> TestResult {
        let events = vec![
            workflow_started(1)?,
            Event::WorkflowCompleted {
                envelope: envelope(2),
                result: payload("result")?,
            },
        ];

        assert_eq!(status_from_events(&events), WorkflowStatus::Completed);
        Ok(())
    }

    #[test]
    fn failed_terminal_event_projects_to_failed() -> TestResult {
        let events = vec![
            workflow_started(1)?,
            Event::WorkflowFailed {
                envelope: envelope(2),
                error: workflow_error("failed"),
            },
        ];

        assert_eq!(status_from_events(&events), WorkflowStatus::Failed);
        Ok(())
    }

    #[test]
    fn cancelled_terminal_event_projects_to_cancelled() -> TestResult {
        let events = vec![
            workflow_started(1)?,
            Event::WorkflowCancelled {
                envelope: envelope(2),
                reason: String::from("caller requested cancellation"),
            },
        ];

        assert_eq!(status_from_events(&events), WorkflowStatus::Cancelled);
        Ok(())
    }

    #[test]
    fn timed_out_terminal_event_projects_to_timed_out() -> TestResult {
        let events = vec![
            workflow_started(1)?,
            Event::WorkflowTimedOut {
                envelope: envelope(2),
                timeout: String::from("execution"),
            },
        ];

        assert_eq!(status_from_events(&events), WorkflowStatus::TimedOut);
        Ok(())
    }

    #[test]
    fn continued_as_new_projects_status() -> TestResult {
        let events = vec![
            workflow_started(1)?,
            Event::WorkflowContinuedAsNew {
                envelope: envelope(2),
                input: payload("continued-input")?,
                workflow_type: Some(String::from("checkout-v2")),
                parent_run_id: RunId::new(uuid::Uuid::from_u128(2)),
            },
        ];

        assert_eq!(status_from_events(&events), WorkflowStatus::ContinuedAsNew);
        Ok(())
    }

    #[test]
    fn workflow_status_terminality_classifies_running_and_terminal_statuses() {
        assert!(!WorkflowStatus::Running.is_terminal());
        assert!(WorkflowStatus::Completed.is_terminal());
        assert!(WorkflowStatus::Failed.is_terminal());
        assert!(WorkflowStatus::Cancelled.is_terminal());
        assert!(WorkflowStatus::TimedOut.is_terminal());
        assert!(WorkflowStatus::ContinuedAsNew.is_terminal());
    }

    #[test]
    fn started_then_continued_as_new_projects_status() -> TestResult {
        let events = vec![
            workflow_started(1)?,
            Event::WorkflowContinuedAsNew {
                envelope: envelope(2),
                input: payload("continued-input")?,
                workflow_type: None,
                parent_run_id: RunId::new(uuid::Uuid::from_u128(3)),
            },
        ];

        assert_eq!(status_from_events(&events), WorkflowStatus::ContinuedAsNew);
        Ok(())
    }

    #[test]
    fn non_terminal_history_projects_to_running() -> TestResult {
        let events = vec![
            workflow_started(1)?,
            Event::SearchAttributesUpdated {
                envelope: envelope(2),
                workflow_id: WorkflowId::new(uuid::Uuid::nil()),
                attributes: HashMap::from([(
                    String::from("customer_id"),
                    SearchAttributeValue::String(String::from("customer-123")),
                )]),
            },
            Event::ActivityScheduled {
                envelope: envelope(3),
                activity_id: ActivityId::from_sequence_position(3),
                activity_type: String::from("charge-card"),
                input: payload("activity-input")?,
            },
        ];

        assert_eq!(status_from_events(&events), WorkflowStatus::Running);
        Ok(())
    }

    #[test]
    fn schedule_events_do_not_change_workflow_status() -> TestResult {
        let events = vec![
            workflow_started(1)?,
            Event::SchedulePaused {
                envelope: envelope(2),
                schedule_id: ScheduleId::new(uuid::Uuid::from_u128(2)),
            },
        ];

        assert_eq!(status_from_events(&events), WorkflowStatus::Running);
        Ok(())
    }

    #[test]
    fn projection_is_deterministic() -> TestResult {
        let events = vec![
            workflow_started(1)?,
            Event::WorkflowCompleted {
                envelope: envelope(2),
                result: payload("result")?,
            },
        ];

        let first = status_from_events(&events);
        let second = status_from_events(&events);

        assert_eq!(first, second);
        Ok(())
    }

    #[test]
    fn last_terminal_lifecycle_event_determines_status() -> TestResult {
        let events = vec![
            workflow_started(1)?,
            Event::WorkflowCompleted {
                envelope: envelope(2),
                result: payload("result")?,
            },
            Event::WorkflowTimedOut {
                envelope: envelope(3),
                timeout: String::from("execution"),
            },
        ];

        assert_eq!(status_from_events(&events), WorkflowStatus::TimedOut);
        Ok(())
    }

    /// Non-lifecycle events appended after a terminal event must be skipped by
    /// the projection rather than resetting the status to running.
    #[test]
    fn trailing_non_lifecycle_events_do_not_mask_terminal_status() -> TestResult {
        let events = vec![
            workflow_started(1)?,
            Event::WorkflowFailed {
                envelope: envelope(2),
                error: workflow_error("failed"),
            },
            Event::SchedulePaused {
                envelope: envelope(3),
                schedule_id: ScheduleId::new(uuid::Uuid::from_u128(2)),
            },
            Event::ActivityScheduled {
                envelope: envelope(4),
                activity_id: ActivityId::from_sequence_position(4),
                activity_type: String::from("charge-card"),
                input: payload("activity-input")?,
            },
        ];

        assert_eq!(status_from_events(&events), WorkflowStatus::Failed);
        Ok(())
    }
}