1use serde::{Deserialize, Serialize};
4
5use crate::Event;
6
7#[derive(Serialize, Deserialize, ts_rs::TS, Clone, Copy, Debug, PartialEq, Eq)]
13pub enum WorkflowStatus {
14 Running,
16 Completed,
18 Failed,
20 Cancelled,
22 TimedOut,
24 ContinuedAsNew,
26}
27
28impl WorkflowStatus {
29 #[must_use]
31 pub const fn is_terminal(self) -> bool {
32 match self {
33 Self::Running => false,
34 Self::Completed
35 | Self::Failed
36 | Self::Cancelled
37 | Self::TimedOut
38 | Self::ContinuedAsNew => true,
39 }
40 }
41}
42
43#[must_use]
51pub fn status_from_events(events: &[Event]) -> WorkflowStatus {
52 events
53 .iter()
54 .rev()
55 .find_map(|event| match event {
56 Event::WorkflowStarted { .. } => Some(WorkflowStatus::Running),
57 Event::WorkflowCompleted { .. } => Some(WorkflowStatus::Completed),
58 Event::WorkflowFailed { .. } => Some(WorkflowStatus::Failed),
59 Event::WorkflowCancelled { .. } => Some(WorkflowStatus::Cancelled),
60 Event::WorkflowTimedOut { .. } => Some(WorkflowStatus::TimedOut),
61 Event::WorkflowContinuedAsNew { .. } => Some(WorkflowStatus::ContinuedAsNew),
62 Event::SearchAttributesUpdated { .. }
63 | Event::ActivityScheduled { .. }
64 | Event::ActivityStarted { .. }
65 | Event::ActivityCompleted { .. }
66 | Event::ActivityFailed { .. }
67 | Event::ActivityCancelled { .. }
68 | Event::TimerStarted { .. }
69 | Event::TimerFired { .. }
70 | Event::TimerCancelled { .. }
71 | Event::WithTimeoutCompleted { .. }
72 | Event::SignalReceived { .. }
73 | Event::SignalSent { .. }
74 | Event::ChildWorkflowStarted { .. }
75 | Event::ChildWorkflowCompleted { .. }
76 | Event::ChildWorkflowFailed { .. }
77 | Event::ChildWorkflowCancelled { .. }
78 | Event::ScheduleCreated { .. }
79 | Event::ScheduleUpdated { .. }
80 | Event::SchedulePaused { .. }
81 | Event::ScheduleResumed { .. }
82 | Event::ScheduleDeleted { .. }
83 | Event::ScheduleTriggered { .. } => None,
84 })
85 .unwrap_or(WorkflowStatus::Running)
86}
87
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use chrono::{DateTime, Utc};
    use serde_json::json;

    use super::{WorkflowStatus, status_from_events};
    use crate::{
        ActivityId, Event, EventEnvelope, Payload, RunId, ScheduleId, SearchAttributeValue,
        WorkflowError, WorkflowId,
    };

    /// Return type of the tests that surface fixture errors as boxed errors.
    type TestResult = Result<(), Box<dyn std::error::Error>>;

    /// Builds a timestamp `offset` seconds after a fixed base instant, falling
    /// back to `DateTime::default()` if the value cannot be represented.
    fn recorded_at(offset: i64) -> DateTime<Utc> {
        DateTime::from_timestamp(1_700_000_000 + offset, 0).unwrap_or_default()
    }

    /// Builds an envelope for sequence number `seq` on the nil workflow id,
    /// using `seq` as the timestamp offset as well.
    fn envelope(seq: u64) -> EventEnvelope {
        let offset_secs = i64::try_from(seq).unwrap_or(0);
        EventEnvelope {
            seq,
            recorded_at: recorded_at(offset_secs),
            workflow_id: WorkflowId::new(uuid::Uuid::nil()),
        }
    }

    /// Builds a payload from the JSON object `{ "label": label }`.
    fn payload(label: &str) -> Result<Payload, crate::PayloadError> {
        let body = json!({ "label": label });
        Payload::from_json(&body)
    }

    /// Builds a `WorkflowStarted` event at `seq` for the "checkout" workflow
    /// type with no parent run.
    fn workflow_started(seq: u64) -> Result<Event, crate::PayloadError> {
        let input = payload("input")?;
        Ok(Event::WorkflowStarted {
            envelope: envelope(seq),
            workflow_type: String::from("checkout"),
            input,
            run_id: RunId::new(uuid::Uuid::from_u128(1)),
            parent_run_id: None,
            package_version: crate::PackageVersion::new("a".repeat(64)),
        })
    }

    /// Builds a workflow error carrying only `message`.
    fn workflow_error(message: &str) -> WorkflowError {
        WorkflowError {
            details: None,
            message: String::from(message),
        }
    }

    /// Projects the status of a history made of `workflow_started(1)` followed
    /// by `later_events`, in order.
    fn status_after_start(
        later_events: impl IntoIterator<Item = Event>,
    ) -> Result<WorkflowStatus, crate::PayloadError> {
        let mut history = vec![workflow_started(1)?];
        history.extend(later_events);
        Ok(status_from_events(&history))
    }

    #[test]
    fn empty_history_projects_to_running() {
        let history: &[Event] = &[];
        assert_eq!(status_from_events(history), WorkflowStatus::Running);
    }

    #[test]
    fn replacement_start_projects_continue_as_new_chain_running() -> Result<(), crate::PayloadError>
    {
        let parent_run_id = RunId::new(uuid::Uuid::from_u128(7));
        let continued = Event::WorkflowContinuedAsNew {
            envelope: envelope(2),
            input: payload("replacement")?,
            workflow_type: None,
            parent_run_id: parent_run_id.clone(),
        };
        // A fresh start recorded after the continue-as-new marker.
        let replacement_start = Event::WorkflowStarted {
            envelope: envelope(3),
            workflow_type: String::from("checkout"),
            input: payload("replacement")?,
            run_id: RunId::new(uuid::Uuid::from_u128(1)),
            parent_run_id: Some(parent_run_id),
            package_version: crate::PackageVersion::new("a".repeat(64)),
        };

        let status = status_after_start([continued, replacement_start])?;

        assert_eq!(status, WorkflowStatus::Running);
        Ok(())
    }

    #[test]
    fn completed_terminal_event_projects_to_completed() -> TestResult {
        let completed = Event::WorkflowCompleted {
            envelope: envelope(2),
            result: payload("result")?,
        };

        let status = status_after_start([completed])?;

        assert_eq!(status, WorkflowStatus::Completed);
        Ok(())
    }

    #[test]
    fn failed_terminal_event_projects_to_failed() -> TestResult {
        let failed = Event::WorkflowFailed {
            envelope: envelope(2),
            error: workflow_error("failed"),
        };

        let status = status_after_start([failed])?;

        assert_eq!(status, WorkflowStatus::Failed);
        Ok(())
    }

    #[test]
    fn cancelled_terminal_event_projects_to_cancelled() -> TestResult {
        let cancelled = Event::WorkflowCancelled {
            envelope: envelope(2),
            reason: String::from("caller requested cancellation"),
        };

        let status = status_after_start([cancelled])?;

        assert_eq!(status, WorkflowStatus::Cancelled);
        Ok(())
    }

    #[test]
    fn timed_out_terminal_event_projects_to_timed_out() -> TestResult {
        let timed_out = Event::WorkflowTimedOut {
            envelope: envelope(2),
            timeout: String::from("execution"),
        };

        let status = status_after_start([timed_out])?;

        assert_eq!(status, WorkflowStatus::TimedOut);
        Ok(())
    }

    #[test]
    fn continued_as_new_projects_status() -> TestResult {
        let continued = Event::WorkflowContinuedAsNew {
            envelope: envelope(2),
            input: payload("continued-input")?,
            workflow_type: Some(String::from("checkout-v2")),
            parent_run_id: RunId::new(uuid::Uuid::from_u128(2)),
        };

        let status = status_after_start([continued])?;

        assert_eq!(status, WorkflowStatus::ContinuedAsNew);
        Ok(())
    }

    #[test]
    fn workflow_status_terminality_classifies_running_and_terminal_statuses() {
        assert!(!WorkflowStatus::Running.is_terminal());

        let terminal_statuses = [
            WorkflowStatus::Completed,
            WorkflowStatus::Failed,
            WorkflowStatus::Cancelled,
            WorkflowStatus::TimedOut,
            WorkflowStatus::ContinuedAsNew,
        ];
        for status in terminal_statuses {
            assert!(status.is_terminal());
        }
    }

    #[test]
    fn started_then_continued_as_new_projects_status() -> TestResult {
        let continued = Event::WorkflowContinuedAsNew {
            envelope: envelope(2),
            input: payload("continued-input")?,
            workflow_type: None,
            parent_run_id: RunId::new(uuid::Uuid::from_u128(3)),
        };

        let status = status_after_start([continued])?;

        assert_eq!(status, WorkflowStatus::ContinuedAsNew);
        Ok(())
    }

    #[test]
    fn non_terminal_history_projects_to_running() -> TestResult {
        let attributes = HashMap::from([(
            String::from("customer_id"),
            SearchAttributeValue::String(String::from("customer-123")),
        )]);
        let attributes_updated = Event::SearchAttributesUpdated {
            envelope: envelope(2),
            workflow_id: WorkflowId::new(uuid::Uuid::nil()),
            attributes,
        };
        let activity_scheduled = Event::ActivityScheduled {
            envelope: envelope(3),
            activity_id: ActivityId::from_sequence_position(3),
            activity_type: String::from("charge-card"),
            input: payload("activity-input")?,
        };

        let status = status_after_start([attributes_updated, activity_scheduled])?;

        assert_eq!(status, WorkflowStatus::Running);
        Ok(())
    }

    #[test]
    fn schedule_events_do_not_change_workflow_status() -> TestResult {
        let paused = Event::SchedulePaused {
            envelope: envelope(2),
            schedule_id: ScheduleId::new(uuid::Uuid::from_u128(2)),
        };

        let status = status_after_start([paused])?;

        assert_eq!(status, WorkflowStatus::Running);
        Ok(())
    }

    #[test]
    fn projection_is_deterministic() -> TestResult {
        let history = [
            workflow_started(1)?,
            Event::WorkflowCompleted {
                envelope: envelope(2),
                result: payload("result")?,
            },
        ];

        // Projecting the same history twice must give the same answer.
        let first = status_from_events(&history);
        let second = status_from_events(&history);

        assert_eq!(first, second);
        Ok(())
    }

    #[test]
    fn last_terminal_lifecycle_event_determines_status() -> TestResult {
        let completed = Event::WorkflowCompleted {
            envelope: envelope(2),
            result: payload("result")?,
        };
        let timed_out = Event::WorkflowTimedOut {
            envelope: envelope(3),
            timeout: String::from("execution"),
        };

        // Two terminal events in a row: the later one is what gets reported.
        let status = status_after_start([completed, timed_out])?;

        assert_eq!(status, WorkflowStatus::TimedOut);
        Ok(())
    }
}