sayiir_persistence/lifecycle.rs
1//! Workflow-lifecycle primitives shared across runtimes.
2//!
3//! [`prepare_run`] handles the snapshot-existence check + conflict-policy
4//! resolution that every binding needs to do at the start of a `run()`.
5//! The runtime (`sayiir-runtime`) and the Cloudflare Workers binding
6//! (`sayiir-cloudflare`) both used to carry their own copies of this
7//! logic; this module is the de-duplicated home.
8//!
9//! The error type [`RunConflict`] carries the three reasons a run can be
10//! rejected (`AlreadyExists`, `DefinitionMismatch`, backend I/O). Callers
11//! convert it into their own error envelopes via `From<RunConflict>`.
12
13use bytes::Bytes;
14use sayiir_core::snapshot::{ExecutionPosition, SignalKind, TaskHint, WorkflowSnapshot};
15use sayiir_core::workflow::{ConflictPolicy, WorkflowStatus};
16
17use crate::{BackendError, SignalStore, SnapshotStore};
18
19/// Outcome of [`prepare_run`].
20#[derive(Debug)]
21pub enum PrepareRunOutcome {
22 /// Snapshot is fresh — execute the workflow from this state.
23 ///
24 /// Boxed so the enum's size is dominated by `ExistingStatus` (the
25 /// cheap variant). `WorkflowSnapshot` is large enough that clippy's
26 /// `large_enum_variant` lint trips otherwise; the caller unboxes once
27 /// at the match site so there's no per-task allocation overhead.
28 Fresh(Box<WorkflowSnapshot>),
29 /// Existing instance reused under `UseExisting`. Caller must return
30 /// the carried status without executing.
31 ExistingStatus(WorkflowStatus, Option<Bytes>),
32}
33
34/// Reasons [`prepare_run`] may reject a call.
35#[derive(Debug)]
36pub enum RunConflict {
37 /// The supplied `instance_id` failed the API-boundary length check.
38 InvalidInstanceId(sayiir_core::InvalidInstanceId),
39 /// `Fail` policy and the instance id is already in use.
40 AlreadyExists(String),
41 /// The existing snapshot was produced from a different workflow definition.
42 DefinitionMismatch {
43 /// Definition hash the caller expected.
44 expected: sayiir_core::DefinitionHash,
45 /// Definition hash actually stored.
46 found: sayiir_core::DefinitionHash,
47 },
48 /// Backend I/O error.
49 Backend(BackendError),
50}
51
52impl From<BackendError> for RunConflict {
53 fn from(e: BackendError) -> Self {
54 Self::Backend(e)
55 }
56}
57
58impl From<sayiir_core::InvalidInstanceId> for RunConflict {
59 fn from(e: sayiir_core::InvalidInstanceId) -> Self {
60 Self::InvalidInstanceId(e)
61 }
62}
63
64impl std::fmt::Display for RunConflict {
65 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
66 // Names are the canonical strum-serialized forms, which both the
67 // Rust `ConflictPolicy::*` variants and the JS `conflictPolicy:`
68 // engine option accept.
69 match self {
70 Self::InvalidInstanceId(e) => std::fmt::Display::fmt(e, f),
71 Self::AlreadyExists(id) => write!(
72 f,
73 "Workflow instance '{id}' already exists. Use conflict policy 'use_existing' or 'terminate_existing' to override, or resume() instead.",
74 ),
75 Self::DefinitionMismatch { expected, found } => write!(
76 f,
77 "Workflow definition mismatch: expected '{expected}', found '{found}'",
78 ),
79 Self::Backend(e) => std::fmt::Display::fmt(e, f),
80 }
81 }
82}
83
84impl std::error::Error for RunConflict {
85 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
86 match self {
87 Self::InvalidInstanceId(e) => Some(e),
88 Self::Backend(e) => Some(e),
89 _ => None,
90 }
91 }
92}
93
94/// Prepare a workflow run while honouring the configured [`ConflictPolicy`].
95///
96/// - **`Fail`** — returns [`RunConflict::AlreadyExists`] if a snapshot for
97/// `instance_id` already exists. Default; prevents the silent-overwrite
98/// footgun where `run()` is called twice with the same id on a parked
99/// workflow.
100/// - **`UseExisting`** — returns the existing instance's current status
101/// without re-executing; idempotent re-entry for clients that retry.
102/// - **`TerminateExisting`** — deletes the existing snapshot + clears
103/// cancel/pause signals, then starts fresh.
104///
105/// Definition-hash mismatches always abort regardless of policy.
106///
107/// On the `Fresh` path the function:
108/// 1. Builds a `WorkflowSnapshot::with_initial_input`,
109/// 2. Positions execution at the first task,
110/// 3. Stores `first_task`'s hint metadata,
111/// 4. Saves the snapshot,
112/// 5. Returns the boxed snapshot for the caller to execute.
113///
114/// # Errors
115///
116/// Returns [`RunConflict`] for policy rejections, definition mismatches,
117/// or backend I/O failures.
118pub async fn prepare_run<B>(
119 instance_id: &str,
120 definition_hash: sayiir_core::DefinitionHash,
121 input_bytes: Bytes,
122 first_task: TaskHint,
123 backend: &B,
124 conflict_policy: ConflictPolicy,
125) -> Result<PrepareRunOutcome, RunConflict>
126where
127 B: SnapshotStore + SignalStore,
128{
129 sayiir_core::validate_instance_id(instance_id)?;
130 match backend.load_snapshot(instance_id).await {
131 Ok(existing) => {
132 if existing.definition_hash != definition_hash {
133 return Err(RunConflict::DefinitionMismatch {
134 expected: definition_hash,
135 found: existing.definition_hash,
136 });
137 }
138 match conflict_policy {
139 ConflictPolicy::Fail => {
140 return Err(RunConflict::AlreadyExists(instance_id.to_string()));
141 }
142 ConflictPolicy::UseExisting => {
143 let output = existing.state.completed_output().cloned();
144 return Ok(PrepareRunOutcome::ExistingStatus(
145 existing.state.as_status(),
146 output,
147 ));
148 }
149 ConflictPolicy::TerminateExisting => {
150 backend.delete_snapshot(instance_id).await?;
151 backend
152 .clear_signal(instance_id, SignalKind::Cancel)
153 .await?;
154 backend.clear_signal(instance_id, SignalKind::Pause).await?;
155 }
156 }
157 }
158 Err(BackendError::NotFound(_)) => {}
159 Err(e) => return Err(e.into()),
160 }
161
162 let mut snapshot =
163 WorkflowSnapshot::with_initial_input(instance_id, definition_hash, input_bytes);
164 snapshot.update_position(ExecutionPosition::AtTask {
165 task_id: first_task.id,
166 });
167 snapshot.set_task_hint(&first_task);
168 backend.save_snapshot(&mut snapshot).await?;
169 Ok(PrepareRunOutcome::Fresh(Box::new(snapshot)))
170}