sayiir_runtime/execution/
lifecycle.rs1use bytes::Bytes;
4use sayiir_core::error::WorkflowError;
5use sayiir_core::snapshot::{
6 ExecutionPosition, SignalKind, TaskHint, WorkflowSnapshot, WorkflowSnapshotState,
7};
8use sayiir_core::workflow::{ConflictPolicy, WorkflowStatus};
9use sayiir_persistence::{SignalStore, SnapshotStore};
10
11use super::helpers::ResumeParkedPosition;
12use crate::error::RuntimeError;
13
/// Result of [`prepare_run`]: either a newly created snapshot or the status
/// of an instance that already exists.
#[derive(Debug)]
pub enum PrepareRunOutcome {
    /// A new snapshot was created and saved to the backend; execution can
    /// start from it.
    Fresh(Box<WorkflowSnapshot>),
    /// An instance with the same id already exists and the conflict policy is
    /// `UseExisting`. Carries that instance's current status and, if its
    /// state exposes one, its completed output.
    ExistingStatus(WorkflowStatus, Option<Bytes>),
}
22
23pub async fn check_existing_instance<B>(
42 instance_id: &str,
43 definition_hash: &str,
44 backend: &B,
45 conflict_policy: ConflictPolicy,
46) -> Result<Option<(WorkflowStatus, Option<Bytes>)>, RuntimeError>
47where
48 B: SnapshotStore,
49{
50 if matches!(conflict_policy, ConflictPolicy::TerminateExisting) {
51 return Ok(None);
52 }
53 match backend.load_snapshot(instance_id).await {
54 Ok(existing) => {
55 if existing.definition_hash != definition_hash {
56 return Err(WorkflowError::DefinitionMismatch {
57 expected: definition_hash.to_string(),
58 found: existing.definition_hash.clone(),
59 }
60 .into());
61 }
62 match conflict_policy {
63 ConflictPolicy::Fail => {
64 Err(RuntimeError::InstanceAlreadyExists(instance_id.to_string()))
65 }
66 ConflictPolicy::UseExisting => {
67 let output = existing.state.completed_output().cloned();
68 let status = existing.state.as_status();
69 Ok(Some((status, output)))
70 }
71 ConflictPolicy::TerminateExisting => unreachable!(),
72 }
73 }
74 Err(sayiir_persistence::BackendError::NotFound(_)) => Ok(None),
75 Err(e) => Err(e.into()),
76 }
77}
78
79#[tracing::instrument(
94 name = "lifecycle.prepare_run",
95 skip(input_bytes, backend),
96 fields(%instance_id),
97)]
98pub async fn prepare_run<B>(
99 instance_id: String,
100 definition_hash: String,
101 input_bytes: Bytes,
102 first_task: TaskHint,
103 backend: &B,
104 conflict_policy: ConflictPolicy,
105 prechecked: bool,
106) -> Result<PrepareRunOutcome, RuntimeError>
107where
108 B: SnapshotStore + SignalStore,
109{
110 tracing::debug!("preparing fresh workflow run");
111
112 if prechecked {
117 if matches!(conflict_policy, ConflictPolicy::TerminateExisting) {
118 match backend.load_snapshot(&instance_id).await {
120 Ok(_existing) => {
121 tracing::info!("terminating existing instance before restart");
122 backend.delete_snapshot(&instance_id).await?;
123 backend
124 .clear_signal(&instance_id, SignalKind::Cancel)
125 .await?;
126 backend
127 .clear_signal(&instance_id, SignalKind::Pause)
128 .await?;
129 }
130 Err(sayiir_persistence::BackendError::NotFound(_)) => {}
131 Err(e) => return Err(e.into()),
132 }
133 }
134 } else {
135 match backend.load_snapshot(&instance_id).await {
137 Ok(existing) => {
138 if existing.definition_hash != definition_hash {
139 return Err(WorkflowError::DefinitionMismatch {
140 expected: definition_hash,
141 found: existing.definition_hash.clone(),
142 }
143 .into());
144 }
145 match conflict_policy {
146 ConflictPolicy::Fail => {
147 return Err(RuntimeError::InstanceAlreadyExists(instance_id));
148 }
149 ConflictPolicy::UseExisting => {
150 let output = existing.state.completed_output().cloned();
151 let status = existing.state.as_status();
152 return Ok(PrepareRunOutcome::ExistingStatus(status, output));
153 }
154 ConflictPolicy::TerminateExisting => {
155 tracing::info!("terminating existing instance before restart");
156 backend.delete_snapshot(&instance_id).await?;
157 backend
158 .clear_signal(&instance_id, SignalKind::Cancel)
159 .await?;
160 backend
161 .clear_signal(&instance_id, SignalKind::Pause)
162 .await?;
163 }
164 }
165 }
166 Err(sayiir_persistence::BackendError::NotFound(_)) => {
167 }
169 Err(e) => return Err(e.into()),
170 }
171 }
172
173 let mut snapshot =
174 WorkflowSnapshot::with_initial_input(instance_id, definition_hash, input_bytes);
175 #[cfg(feature = "otel")]
176 {
177 snapshot.trace_parent = crate::trace_context::current_trace_parent();
178 }
179 snapshot.update_position(ExecutionPosition::AtTask {
180 task_id: first_task.id.clone(),
181 });
182 snapshot.set_task_hint(&first_task);
183 backend.save_snapshot(&snapshot).await?;
184 Ok(PrepareRunOutcome::Fresh(Box::new(snapshot)))
185}
186
187#[tracing::instrument(
198 name = "lifecycle.prepare_resume",
199 skip(backend),
200 fields(%instance_id),
201)]
202pub async fn prepare_resume<B>(
203 instance_id: &str,
204 definition_hash: &str,
205 backend: &B,
206) -> Result<ResumeOutcome, RuntimeError>
207where
208 B: SignalStore,
209{
210 tracing::debug!("preparing workflow resume");
211 let mut snapshot = backend.load_snapshot(instance_id).await?;
212
213 if snapshot.definition_hash != definition_hash {
215 return Err(WorkflowError::DefinitionMismatch {
216 expected: definition_hash.to_string(),
217 found: snapshot.definition_hash.clone(),
218 }
219 .into());
220 }
221
222 if let Some(status) = snapshot.state.as_terminal_status() {
224 if snapshot.state.is_paused() {
225 return Ok(ResumeOutcome::Paused(status));
226 }
227 return Ok(ResumeOutcome::AlreadyTerminal(status));
228 }
229
230 let parked = ResumeParkedPosition::extract(&snapshot);
234 if let Some(status) = parked.resolve(&mut snapshot, instance_id, backend).await? {
235 return Ok(ResumeOutcome::NotReady(status));
236 }
237
238 let input_bytes = get_resume_input(&snapshot)?;
240 Ok(ResumeOutcome::Ready {
241 snapshot: Box::new(snapshot),
242 input_bytes,
243 })
244}
245
/// Result of [`prepare_resume`].
#[derive(Debug)]
pub enum ResumeOutcome {
    /// The workflow can continue executing.
    Ready {
        /// The loaded snapshot, after the parked position was resolved.
        snapshot: Box<WorkflowSnapshot>,
        /// Input for the next step, as computed by [`get_resume_input`].
        input_bytes: Bytes,
    },
    /// The snapshot state reports a terminal status and is not paused.
    AlreadyTerminal(WorkflowStatus),
    /// The snapshot state is paused.
    Paused(WorkflowStatus),
    /// Resolving the parked position returned a status instead of letting the
    /// workflow proceed (presumably a delay or signal that is still pending —
    /// confirm against `ResumeParkedPosition::resolve`).
    NotReady(WorkflowStatus),
}
263
264pub fn get_resume_input(snapshot: &WorkflowSnapshot) -> Result<Bytes, RuntimeError> {
272 match &snapshot.state {
273 WorkflowSnapshotState::InProgress {
274 completed_tasks, ..
275 } => {
276 if completed_tasks.is_empty() {
277 snapshot.initial_input_bytes().ok_or_else(|| {
278 WorkflowError::ResumeError(
279 "no completed tasks and initial input not stored".into(),
280 )
281 .into()
282 })
283 } else {
284 snapshot.get_last_task_output().ok_or_else(|| {
285 WorkflowError::ResumeError("no task results available".into()).into()
286 })
287 }
288 }
289 _ => Err(WorkflowError::ResumeError("workflow not in progress".into()).into()),
290 }
291}
292
293#[tracing::instrument(
305 name = "lifecycle.finalize",
306 skip_all,
307 fields(instance_id = %snapshot.instance_id),
308)]
309pub async fn finalize_execution<B>(
310 result: Result<Bytes, RuntimeError>,
311 snapshot: &mut WorkflowSnapshot,
312 backend: &B,
313) -> Result<(WorkflowStatus, Option<Bytes>), RuntimeError>
314where
315 B: SnapshotStore,
316{
317 tracing::debug!("finalizing workflow execution");
318 match result {
319 Ok(output) => {
320 tracing::info!(instance_id = %snapshot.instance_id, "workflow completed");
321 snapshot.mark_completed(output.clone());
322 backend.save_snapshot(snapshot).await?;
323 Ok((WorkflowStatus::Completed, Some(output)))
324 }
325 Err(RuntimeError::Workflow(WorkflowError::Waiting { wake_at })) => {
326 let delay_id = match &snapshot.state {
327 WorkflowSnapshotState::InProgress {
328 position: ExecutionPosition::AtDelay { delay_id, .. },
329 ..
330 } => delay_id.clone(),
331 WorkflowSnapshotState::InProgress {
332 position: ExecutionPosition::AtFork { fork_id, .. },
333 ..
334 } => fork_id.clone(),
335 _ => String::new(),
336 };
337 tracing::info!(
338 instance_id = %snapshot.instance_id,
339 %delay_id,
340 %wake_at,
341 "workflow parked at delay"
342 );
343 Ok((WorkflowStatus::Waiting { wake_at, delay_id }, None))
344 }
345 Err(RuntimeError::Workflow(WorkflowError::AwaitingSignal {
346 signal_id,
347 signal_name,
348 wake_at,
349 })) => {
350 tracing::info!(
351 instance_id = %snapshot.instance_id,
352 %signal_id,
353 %signal_name,
354 ?wake_at,
355 "workflow parked at signal"
356 );
357 Ok((
358 WorkflowStatus::AwaitingSignal {
359 signal_id,
360 signal_name,
361 wake_at,
362 },
363 None,
364 ))
365 }
366 Err(RuntimeError::Workflow(WorkflowError::Cancelled { .. })) => {
367 tracing::info!(instance_id = %snapshot.instance_id, "workflow cancelled");
368 if let Ok(cancelled_snapshot) = backend.load_snapshot(&snapshot.instance_id).await
370 && let Some((reason, cancelled_by)) =
371 cancelled_snapshot.state.cancellation_details()
372 {
373 return Ok((
374 WorkflowStatus::Cancelled {
375 reason,
376 cancelled_by,
377 },
378 None,
379 ));
380 }
381 Ok((
383 WorkflowStatus::Cancelled {
384 reason: None,
385 cancelled_by: None,
386 },
387 None,
388 ))
389 }
390 Err(RuntimeError::Workflow(WorkflowError::Paused { .. })) => {
391 tracing::info!(instance_id = %snapshot.instance_id, "workflow paused");
392 if let Ok(paused_snapshot) = backend.load_snapshot(&snapshot.instance_id).await
394 && let Some((reason, paused_by)) = paused_snapshot.state.pause_details()
395 {
396 return Ok((WorkflowStatus::Paused { reason, paused_by }, None));
397 }
398 Ok((
399 WorkflowStatus::Paused {
400 reason: None,
401 paused_by: None,
402 },
403 None,
404 ))
405 }
406 Err(e) => {
407 tracing::error!(instance_id = %snapshot.instance_id, error = %e, "workflow failed");
408 snapshot.mark_failed(e.to_string());
409 let _ = backend.save_snapshot(snapshot).await;
410 Ok((WorkflowStatus::Failed(e.to_string()), None))
411 }
412 }
413}