use serde::{Deserialize, Serialize};
use crate::Event;
#[derive(Serialize, Deserialize, ts_rs::TS, Clone, Copy, Debug, PartialEq, Eq)]
pub enum WorkflowStatus {
Running,
Completed,
Failed,
Cancelled,
TimedOut,
ContinuedAsNew,
}
impl WorkflowStatus {
#[must_use]
pub const fn is_terminal(self) -> bool {
match self {
Self::Running => false,
Self::Completed
| Self::Failed
| Self::Cancelled
| Self::TimedOut
| Self::ContinuedAsNew => true,
}
}
}
#[must_use]
pub fn status_from_events(events: &[Event]) -> WorkflowStatus {
events
.iter()
.rev()
.find_map(|event| match event {
Event::WorkflowStarted { .. } => Some(WorkflowStatus::Running),
Event::WorkflowCompleted { .. } => Some(WorkflowStatus::Completed),
Event::WorkflowFailed { .. } => Some(WorkflowStatus::Failed),
Event::WorkflowCancelled { .. } => Some(WorkflowStatus::Cancelled),
Event::WorkflowTimedOut { .. } => Some(WorkflowStatus::TimedOut),
Event::WorkflowContinuedAsNew { .. } => Some(WorkflowStatus::ContinuedAsNew),
Event::SearchAttributesUpdated { .. }
| Event::ActivityScheduled { .. }
| Event::ActivityStarted { .. }
| Event::ActivityCompleted { .. }
| Event::ActivityFailed { .. }
| Event::ActivityCancelled { .. }
| Event::TimerStarted { .. }
| Event::TimerFired { .. }
| Event::TimerCancelled { .. }
| Event::WithTimeoutCompleted { .. }
| Event::SignalReceived { .. }
| Event::SignalSent { .. }
| Event::ChildWorkflowStarted { .. }
| Event::ChildWorkflowCompleted { .. }
| Event::ChildWorkflowFailed { .. }
| Event::ChildWorkflowCancelled { .. }
| Event::ScheduleCreated { .. }
| Event::ScheduleUpdated { .. }
| Event::SchedulePaused { .. }
| Event::ScheduleResumed { .. }
| Event::ScheduleDeleted { .. }
| Event::ScheduleTriggered { .. } => None,
})
.unwrap_or(WorkflowStatus::Running)
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use chrono::{DateTime, Utc};
use serde_json::json;
use super::{WorkflowStatus, status_from_events};
use crate::{
ActivityId, Event, EventEnvelope, Payload, RunId, ScheduleId, SearchAttributeValue,
WorkflowError, WorkflowId,
};
fn recorded_at(offset: i64) -> DateTime<Utc> {
DateTime::from_timestamp(1_700_000_000 + offset, 0).unwrap_or_default()
}
fn envelope(seq: u64) -> EventEnvelope {
EventEnvelope {
seq,
recorded_at: recorded_at(i64::try_from(seq).unwrap_or(0)),
workflow_id: WorkflowId::new(uuid::Uuid::nil()),
}
}
fn payload(label: &str) -> Result<Payload, crate::PayloadError> {
Payload::from_json(&json!({ "label": label }))
}
fn workflow_started(seq: u64) -> Result<Event, crate::PayloadError> {
Ok(Event::WorkflowStarted {
envelope: envelope(seq),
workflow_type: String::from("checkout"),
input: payload("input")?,
run_id: RunId::new(uuid::Uuid::from_u128(1)),
parent_run_id: None,
package_version: crate::PackageVersion::new("a".repeat(64)),
})
}
fn workflow_error(message: &str) -> WorkflowError {
WorkflowError {
message: String::from(message),
details: None,
}
}
#[test]
fn empty_history_projects_to_running() {
assert_eq!(status_from_events(&[]), WorkflowStatus::Running);
}
#[test]
fn replacement_start_projects_continue_as_new_chain_running() -> Result<(), crate::PayloadError>
{
let parent_run_id = RunId::new(uuid::Uuid::from_u128(7));
let events = vec![
workflow_started(1)?,
Event::WorkflowContinuedAsNew {
envelope: envelope(2),
input: payload("replacement")?,
workflow_type: None,
parent_run_id: parent_run_id.clone(),
},
Event::WorkflowStarted {
envelope: envelope(3),
workflow_type: String::from("checkout"),
input: payload("replacement")?,
run_id: RunId::new(uuid::Uuid::from_u128(1)),
parent_run_id: Some(parent_run_id),
package_version: crate::PackageVersion::new("a".repeat(64)),
},
];
assert_eq!(status_from_events(&events), WorkflowStatus::Running);
Ok(())
}
#[test]
fn completed_terminal_event_projects_to_completed() -> Result<(), Box<dyn std::error::Error>> {
let events = vec![
workflow_started(1)?,
Event::WorkflowCompleted {
envelope: envelope(2),
result: payload("result")?,
},
];
assert_eq!(status_from_events(&events), WorkflowStatus::Completed);
Ok(())
}
#[test]
fn failed_terminal_event_projects_to_failed() -> Result<(), Box<dyn std::error::Error>> {
let events = vec![
workflow_started(1)?,
Event::WorkflowFailed {
envelope: envelope(2),
error: workflow_error("failed"),
},
];
assert_eq!(status_from_events(&events), WorkflowStatus::Failed);
Ok(())
}
#[test]
fn cancelled_terminal_event_projects_to_cancelled() -> Result<(), Box<dyn std::error::Error>> {
let events = vec![
workflow_started(1)?,
Event::WorkflowCancelled {
envelope: envelope(2),
reason: String::from("caller requested cancellation"),
},
];
assert_eq!(status_from_events(&events), WorkflowStatus::Cancelled);
Ok(())
}
#[test]
fn timed_out_terminal_event_projects_to_timed_out() -> Result<(), Box<dyn std::error::Error>> {
let events = vec![
workflow_started(1)?,
Event::WorkflowTimedOut {
envelope: envelope(2),
timeout: String::from("execution"),
},
];
assert_eq!(status_from_events(&events), WorkflowStatus::TimedOut);
Ok(())
}
#[test]
fn continued_as_new_projects_status() -> Result<(), Box<dyn std::error::Error>> {
let events = vec![
workflow_started(1)?,
Event::WorkflowContinuedAsNew {
envelope: envelope(2),
input: payload("continued-input")?,
workflow_type: Some(String::from("checkout-v2")),
parent_run_id: RunId::new(uuid::Uuid::from_u128(2)),
},
];
assert_eq!(status_from_events(&events), WorkflowStatus::ContinuedAsNew);
Ok(())
}
#[test]
fn workflow_status_terminality_classifies_running_and_terminal_statuses() {
assert!(!WorkflowStatus::Running.is_terminal());
assert!(WorkflowStatus::Completed.is_terminal());
assert!(WorkflowStatus::Failed.is_terminal());
assert!(WorkflowStatus::Cancelled.is_terminal());
assert!(WorkflowStatus::TimedOut.is_terminal());
assert!(WorkflowStatus::ContinuedAsNew.is_terminal());
}
#[test]
fn started_then_continued_as_new_projects_status() -> Result<(), Box<dyn std::error::Error>> {
let events = vec![
workflow_started(1)?,
Event::WorkflowContinuedAsNew {
envelope: envelope(2),
input: payload("continued-input")?,
workflow_type: None,
parent_run_id: RunId::new(uuid::Uuid::from_u128(3)),
},
];
assert_eq!(status_from_events(&events), WorkflowStatus::ContinuedAsNew);
Ok(())
}
#[test]
fn non_terminal_history_projects_to_running() -> Result<(), Box<dyn std::error::Error>> {
let events = vec![
workflow_started(1)?,
Event::SearchAttributesUpdated {
envelope: envelope(2),
workflow_id: WorkflowId::new(uuid::Uuid::nil()),
attributes: HashMap::from([(
String::from("customer_id"),
SearchAttributeValue::String(String::from("customer-123")),
)]),
},
Event::ActivityScheduled {
envelope: envelope(3),
activity_id: ActivityId::from_sequence_position(3),
activity_type: String::from("charge-card"),
input: payload("activity-input")?,
},
];
assert_eq!(status_from_events(&events), WorkflowStatus::Running);
Ok(())
}
#[test]
fn schedule_events_do_not_change_workflow_status() -> Result<(), Box<dyn std::error::Error>> {
let events = vec![
workflow_started(1)?,
Event::SchedulePaused {
envelope: envelope(2),
schedule_id: ScheduleId::new(uuid::Uuid::from_u128(2)),
},
];
assert_eq!(status_from_events(&events), WorkflowStatus::Running);
Ok(())
}
#[test]
fn projection_is_deterministic() -> Result<(), Box<dyn std::error::Error>> {
let events = vec![
workflow_started(1)?,
Event::WorkflowCompleted {
envelope: envelope(2),
result: payload("result")?,
},
];
let first = status_from_events(&events);
let second = status_from_events(&events);
assert_eq!(first, second);
Ok(())
}
#[test]
fn last_terminal_lifecycle_event_determines_status() -> Result<(), Box<dyn std::error::Error>> {
let events = vec![
workflow_started(1)?,
Event::WorkflowCompleted {
envelope: envelope(2),
result: payload("result")?,
},
Event::WorkflowTimedOut {
envelope: envelope(3),
timeout: String::from("execution"),
},
];
assert_eq!(status_from_events(&events), WorkflowStatus::TimedOut);
Ok(())
}
}