Skip to main content

sayiir_runtime/execution/
lifecycle.rs

1//! Workflow lifecycle: prepare, resume, and finalize.
2
3use bytes::Bytes;
4use sayiir_core::error::WorkflowError;
5use sayiir_core::snapshot::{
6    ExecutionPosition, SignalKind, TaskHint, WorkflowSnapshot, WorkflowSnapshotState,
7};
8use sayiir_core::workflow::{ConflictPolicy, WorkflowStatus};
9use sayiir_persistence::{SignalStore, SnapshotStore};
10
11use super::helpers::ResumeParkedPosition;
12use crate::error::RuntimeError;
13
/// Outcome of [`prepare_run`]: either a freshly created snapshot or the
/// status of an instance that already existed.
#[derive(Debug)]
pub enum PrepareRunOutcome {
    /// A fresh snapshot was created and saved — proceed with execution.
    Fresh(Box<WorkflowSnapshot>),
    /// The instance already exists and the policy says to reuse it.
    ///
    /// Carries the existing instance's current status and, when available,
    /// a copy of its completed output.
    ExistingStatus(WorkflowStatus, Option<Bytes>),
}
22
23/// Check for an existing instance before encoding input.
24///
25/// For **`Fail`** and **`UseExisting`** policies, this avoids unnecessary codec
26/// work by checking the backend before the caller serialises the workflow input.
27///
28/// Returns `Ok(Some((status, output)))` when the caller should return early
29/// (instance exists and the policy says to reuse it, or the policy rejects the
30/// duplicate).  Returns `Ok(None)` when the caller should proceed to encode
31/// input and call [`prepare_run`].
32///
33/// **`TerminateExisting`** always returns `Ok(None)` — the actual cleanup is
34/// deferred to [`prepare_run`].
35///
36/// # Errors
37/// Returns [`RuntimeError::InstanceAlreadyExists`] for `Fail` when the
38/// instance already exists, [`WorkflowError::DefinitionMismatch`] when the
39/// existing snapshot has a different definition hash, or propagates backend
40/// I/O errors.
41pub async fn check_existing_instance<B>(
42    instance_id: &str,
43    definition_hash: &str,
44    backend: &B,
45    conflict_policy: ConflictPolicy,
46) -> Result<Option<(WorkflowStatus, Option<Bytes>)>, RuntimeError>
47where
48    B: SnapshotStore,
49{
50    if matches!(conflict_policy, ConflictPolicy::TerminateExisting) {
51        return Ok(None);
52    }
53    match backend.load_snapshot(instance_id).await {
54        Ok(existing) => {
55            if existing.definition_hash != definition_hash {
56                return Err(WorkflowError::DefinitionMismatch {
57                    expected: definition_hash.to_string(),
58                    found: existing.definition_hash.clone(),
59                }
60                .into());
61            }
62            match conflict_policy {
63                ConflictPolicy::Fail => {
64                    Err(RuntimeError::InstanceAlreadyExists(instance_id.to_string()))
65                }
66                ConflictPolicy::UseExisting => {
67                    let output = existing.state.completed_output().cloned();
68                    let status = existing.state.as_status();
69                    Ok(Some((status, output)))
70                }
71                ConflictPolicy::TerminateExisting => unreachable!(),
72            }
73        }
74        Err(sayiir_persistence::BackendError::NotFound(_)) => Ok(None),
75        Err(e) => Err(e.into()),
76    }
77}
78
79/// Prepare a fresh workflow run: create initial snapshot and save it.
80///
81/// When called after [`check_existing_instance`] (the recommended path), set
82/// `prechecked` to `true` to skip the redundant `load_snapshot`.  The only
83/// policy that still needs a backend read is **`TerminateExisting`** (cleanup
84/// of the old snapshot), which is handled regardless of the flag.
85///
86/// When `prechecked` is `false` the function performs the full existence check
87/// internally as a safety net — this is useful for callers that cannot
88/// guarantee a prior call to `check_existing_instance`.
89///
90/// # Errors
91/// Returns an error if saving the initial snapshot fails or the conflict policy
92/// rejects the duplicate.
93#[tracing::instrument(
94    name = "lifecycle.prepare_run",
95    skip(input_bytes, backend),
96    fields(%instance_id),
97)]
98pub async fn prepare_run<B>(
99    instance_id: String,
100    definition_hash: String,
101    input_bytes: Bytes,
102    first_task: TaskHint,
103    backend: &B,
104    conflict_policy: ConflictPolicy,
105    prechecked: bool,
106) -> Result<PrepareRunOutcome, RuntimeError>
107where
108    B: SnapshotStore + SignalStore,
109{
110    tracing::debug!("preparing fresh workflow run");
111
112    // When prechecked == true the caller already verified via
113    // check_existing_instance that the instance doesn't exist (Fail /
114    // UseExisting) or that cleanup is needed (TerminateExisting).  We only
115    // need the backend round-trip for TerminateExisting cleanup.
116    if prechecked {
117        if matches!(conflict_policy, ConflictPolicy::TerminateExisting) {
118            // Best-effort cleanup — if nothing exists the deletes are no-ops.
119            match backend.load_snapshot(&instance_id).await {
120                Ok(_existing) => {
121                    tracing::info!("terminating existing instance before restart");
122                    backend.delete_snapshot(&instance_id).await?;
123                    backend
124                        .clear_signal(&instance_id, SignalKind::Cancel)
125                        .await?;
126                    backend
127                        .clear_signal(&instance_id, SignalKind::Pause)
128                        .await?;
129                }
130                Err(sayiir_persistence::BackendError::NotFound(_)) => {}
131                Err(e) => return Err(e.into()),
132            }
133        }
134    } else {
135        // Full safety-net path — check for an existing snapshot.
136        match backend.load_snapshot(&instance_id).await {
137            Ok(existing) => {
138                if existing.definition_hash != definition_hash {
139                    return Err(WorkflowError::DefinitionMismatch {
140                        expected: definition_hash,
141                        found: existing.definition_hash.clone(),
142                    }
143                    .into());
144                }
145                match conflict_policy {
146                    ConflictPolicy::Fail => {
147                        return Err(RuntimeError::InstanceAlreadyExists(instance_id));
148                    }
149                    ConflictPolicy::UseExisting => {
150                        let output = existing.state.completed_output().cloned();
151                        let status = existing.state.as_status();
152                        return Ok(PrepareRunOutcome::ExistingStatus(status, output));
153                    }
154                    ConflictPolicy::TerminateExisting => {
155                        tracing::info!("terminating existing instance before restart");
156                        backend.delete_snapshot(&instance_id).await?;
157                        backend
158                            .clear_signal(&instance_id, SignalKind::Cancel)
159                            .await?;
160                        backend
161                            .clear_signal(&instance_id, SignalKind::Pause)
162                            .await?;
163                    }
164                }
165            }
166            Err(sayiir_persistence::BackendError::NotFound(_)) => {
167                // No existing snapshot — proceed normally
168            }
169            Err(e) => return Err(e.into()),
170        }
171    }
172
173    let mut snapshot =
174        WorkflowSnapshot::with_initial_input(instance_id, definition_hash, input_bytes);
175    #[cfg(feature = "otel")]
176    {
177        snapshot.trace_parent = crate::trace_context::current_trace_parent();
178    }
179    snapshot.update_position(ExecutionPosition::AtTask {
180        task_id: first_task.id.clone(),
181    });
182    snapshot.set_task_hint(&first_task);
183    backend.save_snapshot(&snapshot).await?;
184    Ok(PrepareRunOutcome::Fresh(Box::new(snapshot)))
185}
186
187/// Prepare to resume a workflow from a saved snapshot.
188///
189/// Loads the snapshot, validates the definition hash, checks for terminal states,
190/// and determines the correct resume input.
191///
192/// Returns `Ok(Some((snapshot, input)))` if the workflow can be resumed,
193/// or `Ok(None)` with the terminal status if the workflow is already done.
194///
195/// # Errors
196/// Returns an error if the snapshot cannot be loaded or the definition hash mismatches.
197#[tracing::instrument(
198    name = "lifecycle.prepare_resume",
199    skip(backend),
200    fields(%instance_id),
201)]
202pub async fn prepare_resume<B>(
203    instance_id: &str,
204    definition_hash: &str,
205    backend: &B,
206) -> Result<ResumeOutcome, RuntimeError>
207where
208    B: SignalStore,
209{
210    tracing::debug!("preparing workflow resume");
211    let mut snapshot = backend.load_snapshot(instance_id).await?;
212
213    // Validate definition hash
214    if snapshot.definition_hash != definition_hash {
215        return Err(WorkflowError::DefinitionMismatch {
216            expected: definition_hash.to_string(),
217            found: snapshot.definition_hash.clone(),
218        }
219        .into());
220    }
221
222    // Check if already in terminal state
223    if let Some(status) = snapshot.state.as_terminal_status() {
224        if snapshot.state.is_paused() {
225            return Ok(ResumeOutcome::Paused(status));
226        }
227        return Ok(ResumeOutcome::AlreadyTerminal(status));
228    }
229
230    // Resolve any parked position (delay / signal / fork) before resuming.
231    // This consumes buffered signals, checks delay expiry, etc. and updates
232    // the snapshot so get_resume_input picks up the correct value.
233    let parked = ResumeParkedPosition::extract(&snapshot);
234    if let Some(status) = parked.resolve(&mut snapshot, instance_id, backend).await? {
235        return Ok(ResumeOutcome::NotReady(status));
236    }
237
238    // Determine resume input (after resolve, so signal payloads are reflected)
239    let input_bytes = get_resume_input(&snapshot)?;
240    Ok(ResumeOutcome::Ready {
241        snapshot: Box::new(snapshot),
242        input_bytes,
243    })
244}
245
/// Outcome of [`prepare_resume`].
///
/// Only [`ResumeOutcome::Ready`] means execution should continue; every
/// other variant carries the status to report back to the caller.
#[derive(Debug)]
pub enum ResumeOutcome {
    /// Workflow can be resumed with this snapshot and input.
    Ready {
        /// The loaded snapshot (in-progress state), after any parked position
        /// has been resolved.
        snapshot: Box<WorkflowSnapshot>,
        /// The input bytes for the next task.
        input_bytes: Bytes,
    },
    /// Workflow is already in a terminal state.
    AlreadyTerminal(WorkflowStatus),
    /// Workflow is paused (cannot execute until unpaused).
    Paused(WorkflowStatus),
    /// Parked position not yet ready (delay not expired, signal not arrived, etc.).
    NotReady(WorkflowStatus),
}
263
264/// Get the input for resuming execution from a snapshot.
265///
266/// Uses the last completed task's output, or the initial input if no tasks
267/// have completed yet.
268///
269/// # Errors
270/// Returns an error if no resume input can be determined.
271pub fn get_resume_input(snapshot: &WorkflowSnapshot) -> Result<Bytes, RuntimeError> {
272    match &snapshot.state {
273        WorkflowSnapshotState::InProgress {
274            completed_tasks, ..
275        } => {
276            if completed_tasks.is_empty() {
277                snapshot.initial_input_bytes().ok_or_else(|| {
278                    WorkflowError::ResumeError(
279                        "no completed tasks and initial input not stored".into(),
280                    )
281                    .into()
282                })
283            } else {
284                snapshot.get_last_task_output().ok_or_else(|| {
285                    WorkflowError::ResumeError("no task results available".into()).into()
286                })
287            }
288        }
289        _ => Err(WorkflowError::ResumeError("workflow not in progress".into()).into()),
290    }
291}
292
293/// Finalize a workflow execution, converting the result to a [`WorkflowStatus`].
294///
295/// On success, marks the workflow as completed in the snapshot and returns the
296/// output bytes alongside the status.
297/// On cancellation error, returns `Cancelled` status with details from the backend.
298/// On other errors, marks the workflow as failed.
299///
300/// This mirrors `CheckpointingRunner::handle_execution_result`.
301///
302/// # Errors
303/// Returns an error if saving the snapshot to the backend fails.
304#[tracing::instrument(
305    name = "lifecycle.finalize",
306    skip_all,
307    fields(instance_id = %snapshot.instance_id),
308)]
309pub async fn finalize_execution<B>(
310    result: Result<Bytes, RuntimeError>,
311    snapshot: &mut WorkflowSnapshot,
312    backend: &B,
313) -> Result<(WorkflowStatus, Option<Bytes>), RuntimeError>
314where
315    B: SnapshotStore,
316{
317    tracing::debug!("finalizing workflow execution");
318    match result {
319        Ok(output) => {
320            tracing::info!(instance_id = %snapshot.instance_id, "workflow completed");
321            snapshot.mark_completed(output.clone());
322            backend.save_snapshot(snapshot).await?;
323            Ok((WorkflowStatus::Completed, Some(output)))
324        }
325        Err(RuntimeError::Workflow(WorkflowError::Waiting { wake_at })) => {
326            let delay_id = match &snapshot.state {
327                WorkflowSnapshotState::InProgress {
328                    position: ExecutionPosition::AtDelay { delay_id, .. },
329                    ..
330                } => delay_id.clone(),
331                WorkflowSnapshotState::InProgress {
332                    position: ExecutionPosition::AtFork { fork_id, .. },
333                    ..
334                } => fork_id.clone(),
335                _ => String::new(),
336            };
337            tracing::info!(
338                instance_id = %snapshot.instance_id,
339                %delay_id,
340                %wake_at,
341                "workflow parked at delay"
342            );
343            Ok((WorkflowStatus::Waiting { wake_at, delay_id }, None))
344        }
345        Err(RuntimeError::Workflow(WorkflowError::AwaitingSignal {
346            signal_id,
347            signal_name,
348            wake_at,
349        })) => {
350            tracing::info!(
351                instance_id = %snapshot.instance_id,
352                %signal_id,
353                %signal_name,
354                ?wake_at,
355                "workflow parked at signal"
356            );
357            Ok((
358                WorkflowStatus::AwaitingSignal {
359                    signal_id,
360                    signal_name,
361                    wake_at,
362                },
363                None,
364            ))
365        }
366        Err(RuntimeError::Workflow(WorkflowError::Cancelled { .. })) => {
367            tracing::info!(instance_id = %snapshot.instance_id, "workflow cancelled");
368            // Reload snapshot to get cancellation details (set by check_and_cancel)
369            if let Ok(cancelled_snapshot) = backend.load_snapshot(&snapshot.instance_id).await
370                && let Some((reason, cancelled_by)) =
371                    cancelled_snapshot.state.cancellation_details()
372            {
373                return Ok((
374                    WorkflowStatus::Cancelled {
375                        reason,
376                        cancelled_by,
377                    },
378                    None,
379                ));
380            }
381            // Fallback if we couldn't get details
382            Ok((
383                WorkflowStatus::Cancelled {
384                    reason: None,
385                    cancelled_by: None,
386                },
387                None,
388            ))
389        }
390        Err(RuntimeError::Workflow(WorkflowError::Paused { .. })) => {
391            tracing::info!(instance_id = %snapshot.instance_id, "workflow paused");
392            // Reload snapshot to get pause details (set by check_and_pause)
393            if let Ok(paused_snapshot) = backend.load_snapshot(&snapshot.instance_id).await
394                && let Some((reason, paused_by)) = paused_snapshot.state.pause_details()
395            {
396                return Ok((WorkflowStatus::Paused { reason, paused_by }, None));
397            }
398            Ok((
399                WorkflowStatus::Paused {
400                    reason: None,
401                    paused_by: None,
402                },
403                None,
404            ))
405        }
406        Err(e) => {
407            tracing::error!(instance_id = %snapshot.instance_id, error = %e, "workflow failed");
408            snapshot.mark_failed(e.to_string());
409            let _ = backend.save_snapshot(snapshot).await;
410            Ok((WorkflowStatus::Failed(e.to_string()), None))
411        }
412    }
413}