1use serde::{Deserialize, Serialize};
4
5use crate::Event;
6
7#[derive(Serialize, Deserialize, ts_rs::TS, Clone, Copy, Debug, PartialEq, Eq)]
13pub enum WorkflowStatus {
14 Running,
16 Completed,
18 Failed,
20 Cancelled,
22 TimedOut,
24 ContinuedAsNew,
26}
27
28impl WorkflowStatus {
29 #[must_use]
31 pub const fn is_terminal(self) -> bool {
32 match self {
33 Self::Running => false,
34 Self::Completed
35 | Self::Failed
36 | Self::Cancelled
37 | Self::TimedOut
38 | Self::ContinuedAsNew => true,
39 }
40 }
41}
42
43#[must_use]
51pub fn status_from_events(events: &[Event]) -> WorkflowStatus {
52 events
53 .iter()
54 .rev()
55 .find_map(|event| match event {
56 Event::WorkflowStarted { .. } => Some(WorkflowStatus::Running),
57 Event::WorkflowCompleted { .. } => Some(WorkflowStatus::Completed),
58 Event::WorkflowFailed { .. } => Some(WorkflowStatus::Failed),
59 Event::WorkflowCancelled { .. } => Some(WorkflowStatus::Cancelled),
60 Event::WorkflowTimedOut { .. } => Some(WorkflowStatus::TimedOut),
61 Event::WorkflowContinuedAsNew { .. } => Some(WorkflowStatus::ContinuedAsNew),
62 Event::SearchAttributesUpdated { .. }
63 | Event::ActivityScheduled { .. }
64 | Event::ActivityStarted { .. }
65 | Event::ActivityCompleted { .. }
66 | Event::ActivityFailed { .. }
67 | Event::ActivityCancelled { .. }
68 | Event::TimerStarted { .. }
69 | Event::TimerFired { .. }
70 | Event::TimerCancelled { .. }
71 | Event::WithTimeoutCompleted { .. }
72 | Event::SignalReceived { .. }
73 | Event::SignalSent { .. }
74 | Event::ChildWorkflowStarted { .. }
75 | Event::ChildWorkflowCompleted { .. }
76 | Event::ChildWorkflowFailed { .. }
77 | Event::ChildWorkflowCancelled { .. }
78 | Event::ScheduleCreated { .. }
79 | Event::ScheduleUpdated { .. }
80 | Event::SchedulePaused { .. }
81 | Event::ScheduleResumed { .. }
82 | Event::ScheduleDeleted { .. }
83 | Event::ScheduleTriggered { .. } => None,
84 })
85 .unwrap_or(WorkflowStatus::Running)
86}
87
88#[cfg(test)]
89mod tests {
90 use std::collections::HashMap;
91
92 use chrono::{DateTime, Utc};
93 use serde_json::json;
94
95 use super::{WorkflowStatus, status_from_events};
96 use crate::{
97 ActivityId, Event, EventEnvelope, Payload, RunId, ScheduleId, SearchAttributeValue,
98 WorkflowError, WorkflowId,
99 };
100
101 fn recorded_at(offset: i64) -> DateTime<Utc> {
102 DateTime::from_timestamp(1_700_000_000 + offset, 0).unwrap_or_default()
103 }
104
105 fn envelope(seq: u64) -> EventEnvelope {
106 EventEnvelope {
107 seq,
108 recorded_at: recorded_at(i64::try_from(seq).unwrap_or(0)),
109 workflow_id: WorkflowId::new(uuid::Uuid::nil()),
110 }
111 }
112
113 fn payload(label: &str) -> Result<Payload, crate::PayloadError> {
114 Payload::from_json(&json!({ "label": label }))
115 }
116
117 fn workflow_started(seq: u64) -> Result<Event, crate::PayloadError> {
118 Ok(Event::WorkflowStarted {
119 envelope: envelope(seq),
120 workflow_type: String::from("checkout"),
121 input: payload("input")?,
122 run_id: RunId::new(uuid::Uuid::from_u128(1)),
123 parent_run_id: None,
124 })
125 }
126
127 fn workflow_error(message: &str) -> WorkflowError {
128 WorkflowError {
129 message: String::from(message),
130 details: None,
131 }
132 }
133
134 #[test]
135 fn empty_history_projects_to_running() {
136 assert_eq!(status_from_events(&[]), WorkflowStatus::Running);
137 }
138
139 #[test]
140 fn replacement_start_projects_continue_as_new_chain_running() -> Result<(), crate::PayloadError>
141 {
142 let parent_run_id = RunId::new(uuid::Uuid::from_u128(7));
143 let events = vec![
144 workflow_started(1)?,
145 Event::WorkflowContinuedAsNew {
146 envelope: envelope(2),
147 input: payload("replacement")?,
148 workflow_type: None,
149 parent_run_id: parent_run_id.clone(),
150 },
151 Event::WorkflowStarted {
152 envelope: envelope(3),
153 workflow_type: String::from("checkout"),
154 input: payload("replacement")?,
155 run_id: RunId::new(uuid::Uuid::from_u128(1)),
156 parent_run_id: Some(parent_run_id),
157 },
158 ];
159
160 assert_eq!(status_from_events(&events), WorkflowStatus::Running);
161 Ok(())
162 }
163
164 #[test]
165 fn completed_terminal_event_projects_to_completed() -> Result<(), Box<dyn std::error::Error>> {
166 let events = vec![
167 workflow_started(1)?,
168 Event::WorkflowCompleted {
169 envelope: envelope(2),
170 result: payload("result")?,
171 },
172 ];
173
174 assert_eq!(status_from_events(&events), WorkflowStatus::Completed);
175 Ok(())
176 }
177
178 #[test]
179 fn failed_terminal_event_projects_to_failed() -> Result<(), Box<dyn std::error::Error>> {
180 let events = vec![
181 workflow_started(1)?,
182 Event::WorkflowFailed {
183 envelope: envelope(2),
184 error: workflow_error("failed"),
185 },
186 ];
187
188 assert_eq!(status_from_events(&events), WorkflowStatus::Failed);
189 Ok(())
190 }
191
192 #[test]
193 fn cancelled_terminal_event_projects_to_cancelled() -> Result<(), Box<dyn std::error::Error>> {
194 let events = vec![
195 workflow_started(1)?,
196 Event::WorkflowCancelled {
197 envelope: envelope(2),
198 reason: String::from("caller requested cancellation"),
199 },
200 ];
201
202 assert_eq!(status_from_events(&events), WorkflowStatus::Cancelled);
203 Ok(())
204 }
205
206 #[test]
207 fn timed_out_terminal_event_projects_to_timed_out() -> Result<(), Box<dyn std::error::Error>> {
208 let events = vec![
209 workflow_started(1)?,
210 Event::WorkflowTimedOut {
211 envelope: envelope(2),
212 timeout: String::from("execution"),
213 },
214 ];
215
216 assert_eq!(status_from_events(&events), WorkflowStatus::TimedOut);
217 Ok(())
218 }
219
220 #[test]
221 fn continued_as_new_projects_status() -> Result<(), Box<dyn std::error::Error>> {
222 let events = vec![
223 workflow_started(1)?,
224 Event::WorkflowContinuedAsNew {
225 envelope: envelope(2),
226 input: payload("continued-input")?,
227 workflow_type: Some(String::from("checkout-v2")),
228 parent_run_id: RunId::new(uuid::Uuid::from_u128(2)),
229 },
230 ];
231
232 assert_eq!(status_from_events(&events), WorkflowStatus::ContinuedAsNew);
233 Ok(())
234 }
235
236 #[test]
237 fn workflow_status_terminality_classifies_running_and_terminal_statuses() {
238 assert!(!WorkflowStatus::Running.is_terminal());
239 assert!(WorkflowStatus::Completed.is_terminal());
240 assert!(WorkflowStatus::Failed.is_terminal());
241 assert!(WorkflowStatus::Cancelled.is_terminal());
242 assert!(WorkflowStatus::TimedOut.is_terminal());
243 assert!(WorkflowStatus::ContinuedAsNew.is_terminal());
244 }
245
246 #[test]
247 fn started_then_continued_as_new_projects_status() -> Result<(), Box<dyn std::error::Error>> {
248 let events = vec![
249 workflow_started(1)?,
250 Event::WorkflowContinuedAsNew {
251 envelope: envelope(2),
252 input: payload("continued-input")?,
253 workflow_type: None,
254 parent_run_id: RunId::new(uuid::Uuid::from_u128(3)),
255 },
256 ];
257
258 assert_eq!(status_from_events(&events), WorkflowStatus::ContinuedAsNew);
259 Ok(())
260 }
261
262 #[test]
263 fn non_terminal_history_projects_to_running() -> Result<(), Box<dyn std::error::Error>> {
264 let events = vec![
265 workflow_started(1)?,
266 Event::SearchAttributesUpdated {
267 envelope: envelope(2),
268 workflow_id: WorkflowId::new(uuid::Uuid::nil()),
269 attributes: HashMap::from([(
270 String::from("customer_id"),
271 SearchAttributeValue::String(String::from("customer-123")),
272 )]),
273 },
274 Event::ActivityScheduled {
275 envelope: envelope(3),
276 activity_id: ActivityId::from_sequence_position(3),
277 activity_type: String::from("charge-card"),
278 input: payload("activity-input")?,
279 },
280 ];
281
282 assert_eq!(status_from_events(&events), WorkflowStatus::Running);
283 Ok(())
284 }
285
286 #[test]
287 fn schedule_events_do_not_change_workflow_status() -> Result<(), Box<dyn std::error::Error>> {
288 let events = vec![
289 workflow_started(1)?,
290 Event::SchedulePaused {
291 envelope: envelope(2),
292 schedule_id: ScheduleId::new(uuid::Uuid::from_u128(2)),
293 },
294 ];
295
296 assert_eq!(status_from_events(&events), WorkflowStatus::Running);
297 Ok(())
298 }
299
300 #[test]
301 fn projection_is_deterministic() -> Result<(), Box<dyn std::error::Error>> {
302 let events = vec![
303 workflow_started(1)?,
304 Event::WorkflowCompleted {
305 envelope: envelope(2),
306 result: payload("result")?,
307 },
308 ];
309
310 let first = status_from_events(&events);
311 let second = status_from_events(&events);
312
313 assert_eq!(first, second);
314 Ok(())
315 }
316
317 #[test]
318 fn last_terminal_lifecycle_event_determines_status() -> Result<(), Box<dyn std::error::Error>> {
319 let events = vec![
320 workflow_started(1)?,
321 Event::WorkflowCompleted {
322 envelope: envelope(2),
323 result: payload("result")?,
324 },
325 Event::WorkflowTimedOut {
326 envelope: envelope(3),
327 timeout: String::from("execution"),
328 },
329 ];
330
331 assert_eq!(status_from_events(&events), WorkflowStatus::TimedOut);
332 Ok(())
333 }
334}