Skip to main content

sayiir_persistence/
lifecycle.rs

1//! Workflow-lifecycle primitives shared across runtimes.
2//!
3//! [`prepare_run`] handles the snapshot-existence check + conflict-policy
4//! resolution that every binding needs to do at the start of a `run()`.
5//! The runtime (`sayiir-runtime`) and the Cloudflare Workers binding
6//! (`sayiir-cloudflare`) both used to carry their own copies of this
7//! logic; this module is the de-duplicated home.
8//!
9//! The error type [`RunConflict`] carries the three reasons a run can be
10//! rejected (`AlreadyExists`, `DefinitionMismatch`, backend I/O). Callers
11//! convert it into their own error envelopes via `From<RunConflict>`.
12
13use bytes::Bytes;
14use sayiir_core::snapshot::{ExecutionPosition, SignalKind, TaskHint, WorkflowSnapshot};
15use sayiir_core::workflow::{ConflictPolicy, WorkflowStatus};
16
17use crate::{BackendError, SignalStore, SnapshotStore};
18
19/// Outcome of [`prepare_run`].
20#[derive(Debug)]
21pub enum PrepareRunOutcome {
22    /// Snapshot is fresh — execute the workflow from this state.
23    ///
24    /// Boxed so the enum's size is dominated by `ExistingStatus` (the
25    /// cheap variant). `WorkflowSnapshot` is large enough that clippy's
26    /// `large_enum_variant` lint trips otherwise; the caller unboxes once
27    /// at the match site so there's no per-task allocation overhead.
28    Fresh(Box<WorkflowSnapshot>),
29    /// Existing instance reused under `UseExisting`. Caller must return
30    /// the carried status without executing.
31    ExistingStatus(WorkflowStatus, Option<Bytes>),
32}
33
34/// Reasons [`prepare_run`] may reject a call.
35#[derive(Debug)]
36pub enum RunConflict {
37    /// The supplied `instance_id` failed the API-boundary length check.
38    InvalidInstanceId(sayiir_core::InvalidInstanceId),
39    /// `Fail` policy and the instance id is already in use.
40    AlreadyExists(String),
41    /// The existing snapshot was produced from a different workflow definition.
42    DefinitionMismatch {
43        /// Definition hash the caller expected.
44        expected: sayiir_core::DefinitionHash,
45        /// Definition hash actually stored.
46        found: sayiir_core::DefinitionHash,
47    },
48    /// Backend I/O error.
49    Backend(BackendError),
50}
51
52impl From<BackendError> for RunConflict {
53    fn from(e: BackendError) -> Self {
54        Self::Backend(e)
55    }
56}
57
58impl From<sayiir_core::InvalidInstanceId> for RunConflict {
59    fn from(e: sayiir_core::InvalidInstanceId) -> Self {
60        Self::InvalidInstanceId(e)
61    }
62}
63
64impl std::fmt::Display for RunConflict {
65    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
66        // Names are the canonical strum-serialized forms, which both the
67        // Rust `ConflictPolicy::*` variants and the JS `conflictPolicy:`
68        // engine option accept.
69        match self {
70            Self::InvalidInstanceId(e) => std::fmt::Display::fmt(e, f),
71            Self::AlreadyExists(id) => write!(
72                f,
73                "Workflow instance '{id}' already exists. Use conflict policy 'use_existing' or 'terminate_existing' to override, or resume() instead.",
74            ),
75            Self::DefinitionMismatch { expected, found } => write!(
76                f,
77                "Workflow definition mismatch: expected '{expected}', found '{found}'",
78            ),
79            Self::Backend(e) => std::fmt::Display::fmt(e, f),
80        }
81    }
82}
83
84impl std::error::Error for RunConflict {
85    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
86        match self {
87            Self::InvalidInstanceId(e) => Some(e),
88            Self::Backend(e) => Some(e),
89            _ => None,
90        }
91    }
92}
93
94/// Prepare a workflow run while honouring the configured [`ConflictPolicy`].
95///
96/// - **`Fail`** — returns [`RunConflict::AlreadyExists`] if a snapshot for
97///   `instance_id` already exists. Default; prevents the silent-overwrite
98///   footgun where `run()` is called twice with the same id on a parked
99///   workflow.
100/// - **`UseExisting`** — returns the existing instance's current status
101///   without re-executing; idempotent re-entry for clients that retry.
102/// - **`TerminateExisting`** — deletes the existing snapshot + clears
103///   cancel/pause signals, then starts fresh.
104///
105/// Definition-hash mismatches always abort regardless of policy.
106///
107/// On the `Fresh` path the function:
108/// 1. Builds a `WorkflowSnapshot::with_initial_input`,
109/// 2. Positions execution at the first task,
110/// 3. Stores `first_task`'s hint metadata,
111/// 4. Saves the snapshot,
112/// 5. Returns the boxed snapshot for the caller to execute.
113///
114/// # Errors
115///
116/// Returns [`RunConflict`] for policy rejections, definition mismatches,
117/// or backend I/O failures.
118pub async fn prepare_run<B>(
119    instance_id: &str,
120    definition_hash: sayiir_core::DefinitionHash,
121    input_bytes: Bytes,
122    first_task: TaskHint,
123    backend: &B,
124    conflict_policy: ConflictPolicy,
125) -> Result<PrepareRunOutcome, RunConflict>
126where
127    B: SnapshotStore + SignalStore,
128{
129    sayiir_core::validate_instance_id(instance_id)?;
130    match backend.load_snapshot(instance_id).await {
131        Ok(existing) => {
132            if existing.definition_hash != definition_hash {
133                return Err(RunConflict::DefinitionMismatch {
134                    expected: definition_hash,
135                    found: existing.definition_hash,
136                });
137            }
138            match conflict_policy {
139                ConflictPolicy::Fail => {
140                    return Err(RunConflict::AlreadyExists(instance_id.to_string()));
141                }
142                ConflictPolicy::UseExisting => {
143                    let output = existing.state.completed_output().cloned();
144                    return Ok(PrepareRunOutcome::ExistingStatus(
145                        existing.state.as_status(),
146                        output,
147                    ));
148                }
149                ConflictPolicy::TerminateExisting => {
150                    backend.delete_snapshot(instance_id).await?;
151                    backend
152                        .clear_signal(instance_id, SignalKind::Cancel)
153                        .await?;
154                    backend.clear_signal(instance_id, SignalKind::Pause).await?;
155                }
156            }
157        }
158        Err(BackendError::NotFound(_)) => {}
159        Err(e) => return Err(e.into()),
160    }
161
162    let mut snapshot =
163        WorkflowSnapshot::with_initial_input(instance_id, definition_hash, input_bytes);
164    snapshot.update_position(ExecutionPosition::AtTask {
165        task_id: first_task.id,
166    });
167    snapshot.set_task_hint(&first_task);
168    backend.save_snapshot(&mut snapshot).await?;
169    Ok(PrepareRunOutcome::Fresh(Box::new(snapshot)))
170}