use bytes::Bytes;
use sayiir_core::error::WorkflowError;
use sayiir_core::snapshot::{
ExecutionPosition, SignalKind, TaskHint, WorkflowSnapshot, WorkflowSnapshotState,
};
use sayiir_core::workflow::{ConflictPolicy, WorkflowStatus};
pub use sayiir_persistence::PrepareRunOutcome;
use sayiir_persistence::{SignalStore, SnapshotStore};
use super::helpers::ResumeParkedPosition;
use crate::error::RuntimeError;
/// Looks up an already-persisted instance and applies `conflict_policy` to it.
///
/// Returns `Ok(None)` when the caller should go ahead and start a run: either
/// the policy is [`ConflictPolicy::TerminateExisting`] (no lookup is performed
/// in that case) or the backend reports that no snapshot exists.
/// Returns `Ok(Some((status, output)))` under [`ConflictPolicy::UseExisting`],
/// carrying the stored status and the completed output, if any.
///
/// # Errors
///
/// - Whatever `sayiir_core::validate_instance_id` rejects.
/// - [`WorkflowError::DefinitionMismatch`] when the stored snapshot was created
///   with a different definition hash (checked before the policy is applied).
/// - [`RuntimeError::InstanceAlreadyExists`] under [`ConflictPolicy::Fail`].
/// - Any backend error other than `NotFound`.
pub async fn check_existing_instance<B>(
    instance_id: &str,
    definition_hash: &sayiir_core::DefinitionHash,
    backend: &B,
    conflict_policy: ConflictPolicy,
) -> Result<Option<(WorkflowStatus, Option<Bytes>)>, RuntimeError>
where
    B: SnapshotStore,
{
    sayiir_core::validate_instance_id(instance_id)?;

    // Termination is handled by the run-preparation step, so nothing to report here.
    if let ConflictPolicy::TerminateExisting = conflict_policy {
        return Ok(None);
    }

    let stored = match backend.load_snapshot(instance_id).await {
        Ok(snapshot) => snapshot,
        Err(sayiir_persistence::BackendError::NotFound(_)) => return Ok(None),
        Err(other) => return Err(other.into()),
    };

    if stored.definition_hash != *definition_hash {
        let mismatch = WorkflowError::DefinitionMismatch {
            expected: *definition_hash,
            found: stored.definition_hash,
        };
        return Err(mismatch.into());
    }

    match conflict_policy {
        ConflictPolicy::UseExisting => {
            let output = stored.state.completed_output().cloned();
            Ok(Some((stored.state.as_status(), output)))
        }
        ConflictPolicy::Fail => Err(RuntimeError::InstanceAlreadyExists(
            instance_id.to_string(),
        )),
        // Already returned above before touching the backend.
        ConflictPolicy::TerminateExisting => unreachable!(),
    }
}
/// Creates and persists the initial snapshot for a new workflow run.
///
/// Under [`ConflictPolicy::TerminateExisting`], an existing snapshot for
/// `instance_id` is deleted first, together with its `Cancel` and `Pause`
/// signals; a missing snapshot is not an error. Other policies are not
/// inspected by this function.
///
/// The new snapshot is seeded with `input_bytes`, positioned at `first_task`,
/// saved through `backend`, and returned as [`PrepareRunOutcome::Fresh`].
///
/// # Errors
///
/// Propagates instance-id validation failures and any backend error raised
/// while loading, deleting, clearing signals, or saving (except `NotFound` on
/// the initial load).
#[tracing::instrument(
    name = "lifecycle.prepare_run",
    skip(input_bytes, backend),
    fields(%instance_id),
)]
pub async fn prepare_run<B>(
    instance_id: &str,
    definition_hash: sayiir_core::DefinitionHash,
    input_bytes: Bytes,
    first_task: TaskHint,
    backend: &B,
    conflict_policy: ConflictPolicy,
) -> Result<PrepareRunOutcome, RuntimeError>
where
    B: SnapshotStore + SignalStore,
{
    sayiir_core::validate_instance_id(instance_id)?;

    if let ConflictPolicy::TerminateExisting = conflict_policy {
        match backend.load_snapshot(instance_id).await {
            // Nothing stored under this id: there is nothing to terminate.
            Err(sayiir_persistence::BackendError::NotFound(_)) => {}
            Err(other) => return Err(other.into()),
            Ok(_stale) => {
                tracing::info!("terminating existing instance before restart");
                backend.delete_snapshot(instance_id).await?;
                // Drop pending control signals so they cannot hit the new run.
                for kind in [SignalKind::Cancel, SignalKind::Pause] {
                    backend.clear_signal(instance_id, kind).await?;
                }
            }
        }
    }

    let mut fresh = WorkflowSnapshot::with_initial_input(instance_id, definition_hash, input_bytes);

    #[cfg(feature = "otel")]
    {
        fresh.trace_parent = crate::trace_context::current_trace_parent();
    }

    let start_position = ExecutionPosition::AtTask {
        task_id: first_task.id,
    };
    fresh.update_position(start_position);
    fresh.set_task_hint(&first_task);

    backend.save_snapshot(&mut fresh).await?;

    Ok(PrepareRunOutcome::Fresh(Box::new(fresh)))
}
/// Loads a persisted workflow and decides whether it can be resumed.
///
/// Outcomes, in the order they are checked:
/// - [`ResumeOutcome::Paused`] / [`ResumeOutcome::AlreadyTerminal`] when the
///   snapshot reports a terminal status (paused is distinguished via
///   `is_paused`).
/// - [`ResumeOutcome::NotReady`] when resolving the parked position yields a
///   status.
/// - [`ResumeOutcome::Ready`] otherwise, with the snapshot and the bytes
///   returned by [`get_resume_input`].
///
/// # Errors
///
/// Propagates instance-id validation failures, backend load errors, errors
/// from resolving the parked position, errors from [`get_resume_input`], and
/// returns [`WorkflowError::DefinitionMismatch`] when the stored definition
/// hash differs from `definition_hash`.
#[tracing::instrument(
    name = "lifecycle.prepare_resume",
    skip(backend),
    fields(%instance_id),
)]
pub async fn prepare_resume<B>(
    instance_id: &str,
    definition_hash: &sayiir_core::DefinitionHash,
    backend: &B,
) -> Result<ResumeOutcome, RuntimeError>
where
    B: SignalStore,
{
    sayiir_core::validate_instance_id(instance_id)?;
    tracing::debug!("preparing workflow resume");

    let mut snapshot = backend.load_snapshot(instance_id).await?;

    if snapshot.definition_hash != *definition_hash {
        let mismatch = WorkflowError::DefinitionMismatch {
            expected: *definition_hash,
            found: snapshot.definition_hash,
        };
        return Err(mismatch.into());
    }

    if let Some(status) = snapshot.state.as_terminal_status() {
        let outcome = if snapshot.state.is_paused() {
            ResumeOutcome::Paused(status)
        } else {
            ResumeOutcome::AlreadyTerminal(status)
        };
        return Ok(outcome);
    }

    let parked = ResumeParkedPosition::extract(&snapshot);
    let still_parked = parked.resolve(&mut snapshot, instance_id, backend).await?;

    match still_parked {
        Some(status) => Ok(ResumeOutcome::NotReady(status)),
        None => {
            let input_bytes = get_resume_input(&snapshot)?;
            Ok(ResumeOutcome::Ready {
                snapshot: Box::new(snapshot),
                input_bytes,
            })
        }
    }
}
/// Result of [`prepare_resume`]: either the workflow can continue, or the
/// reason it cannot, expressed as a [`WorkflowStatus`].
#[derive(Debug)]
pub enum ResumeOutcome {
    /// The workflow can continue executing.
    Ready {
        /// The loaded snapshot (boxed).
        snapshot: Box<WorkflowSnapshot>,
        /// Input for the next step, as produced by [`get_resume_input`].
        input_bytes: Bytes,
    },
    /// The snapshot reported a terminal status and is not paused.
    AlreadyTerminal(WorkflowStatus),
    /// The snapshot reported a terminal status and is paused.
    Paused(WorkflowStatus),
    /// Resolving the parked position yielded a status instead of clearing it.
    NotReady(WorkflowStatus),
}
/// Selects the bytes to feed into the next step of an in-progress workflow.
///
/// With no completed tasks the stored initial input is used; otherwise the
/// output of the last completed task is used.
///
/// # Errors
///
/// Returns [`WorkflowError::ResumeError`] when the snapshot is not in
/// progress, or when the selected source has no bytes available.
pub fn get_resume_input(snapshot: &WorkflowSnapshot) -> Result<Bytes, RuntimeError> {
    let WorkflowSnapshotState::InProgress {
        completed_tasks, ..
    } = &snapshot.state
    else {
        return Err(WorkflowError::ResumeError("workflow not in progress".into()).into());
    };

    // Pick the source together with the message reported if it turns out empty.
    let (candidate, missing_msg) = if completed_tasks.is_empty() {
        (
            snapshot.initial_input_bytes(),
            "no completed tasks and initial input not stored",
        )
    } else {
        (
            snapshot.get_last_task_output(),
            "no task results available",
        )
    };

    candidate.ok_or_else(|| WorkflowError::ResumeError(missing_msg.into()).into())
}
#[tracing::instrument(
name = "lifecycle.finalize",
skip_all,
fields(instance_id = %snapshot.instance_id),
)]
pub async fn finalize_execution<B>(
result: Result<Bytes, RuntimeError>,
snapshot: &mut WorkflowSnapshot,
backend: &B,
) -> Result<(WorkflowStatus, Option<Bytes>), RuntimeError>
where
B: SnapshotStore,
{
tracing::debug!("finalizing workflow execution");
match result {
Ok(output) => {
tracing::info!(instance_id = %snapshot.instance_id, "workflow completed");
snapshot.mark_completed(output.clone());
backend.save_snapshot(snapshot).await?;
Ok((WorkflowStatus::Completed, Some(output)))
}
Err(RuntimeError::Workflow(WorkflowError::Waiting { wake_at })) => {
let delay_id = match &snapshot.state {
WorkflowSnapshotState::InProgress {
position: ExecutionPosition::AtDelay { delay_id, .. },
..
} => *delay_id,
WorkflowSnapshotState::InProgress {
position: ExecutionPosition::AtFork { fork_id, .. },
..
} => *fork_id,
_ => sayiir_core::TaskId::default(),
};
tracing::info!(
instance_id = %snapshot.instance_id,
%delay_id,
%wake_at,
"workflow parked at delay"
);
Ok((WorkflowStatus::Waiting { wake_at, delay_id }, None))
}
Err(RuntimeError::Workflow(WorkflowError::AwaitingSignal {
signal_id,
signal_name,
wake_at,
})) => {
tracing::info!(
instance_id = %snapshot.instance_id,
%signal_id,
%signal_name,
?wake_at,
"workflow parked at signal"
);
Ok((
WorkflowStatus::AwaitingSignal {
signal_id,
signal_name,
wake_at,
},
None,
))
}
Err(RuntimeError::Workflow(WorkflowError::Cancelled { .. })) => {
tracing::info!(instance_id = %snapshot.instance_id, "workflow cancelled");
if let Ok(cancelled_snapshot) = backend.load_snapshot(&snapshot.instance_id).await
&& let Some((reason, cancelled_by)) =
cancelled_snapshot.state.cancellation_details()
{
return Ok((
WorkflowStatus::Cancelled {
reason,
cancelled_by,
},
None,
));
}
Ok((
WorkflowStatus::Cancelled {
reason: None,
cancelled_by: None,
},
None,
))
}
Err(RuntimeError::Workflow(WorkflowError::Paused { .. })) => {
tracing::info!(instance_id = %snapshot.instance_id, "workflow paused");
if let Ok(paused_snapshot) = backend.load_snapshot(&snapshot.instance_id).await
&& let Some((reason, paused_by)) = paused_snapshot.state.pause_details()
{
return Ok((WorkflowStatus::Paused { reason, paused_by }, None));
}
Ok((
WorkflowStatus::Paused {
reason: None,
paused_by: None,
},
None,
))
}
Err(e) => {
tracing::error!(instance_id = %snapshot.instance_id, error = %e, "workflow failed");
snapshot.mark_failed(e.to_string());
let _ = backend.save_snapshot(snapshot).await;
Ok((WorkflowStatus::Failed(e.to_string()), None))
}
}
}