use bytes::Bytes;
use sayiir_core::error::WorkflowError;
use sayiir_core::snapshot::{ExecutionPosition, WorkflowSnapshot, WorkflowSnapshotState};
use sayiir_core::workflow::WorkflowStatus;
use sayiir_persistence::{SignalStore, SnapshotStore};
use super::helpers::ResumeParkedPosition;
use crate::error::RuntimeError;
/// Creates and persists the initial snapshot for a brand-new workflow run.
///
/// The snapshot is built from the instance id, definition hash and initial
/// input, positioned at the task identified by `first_task_id`, and saved to
/// `backend` before being handed back to the caller.
///
/// # Errors
///
/// Returns the converted backend error if saving the snapshot fails.
pub async fn prepare_run<B>(
    instance_id: String,
    definition_hash: String,
    input_bytes: Bytes,
    first_task_id: String,
    backend: &B,
) -> Result<WorkflowSnapshot, RuntimeError>
where
    B: SnapshotStore,
{
    let start_position = ExecutionPosition::AtTask {
        task_id: first_task_id,
    };

    let mut fresh = WorkflowSnapshot::with_initial_input(instance_id, definition_hash, input_bytes);
    fresh.update_position(start_position);

    // Persist before returning so the caller never holds an unsaved snapshot.
    backend.save_snapshot(&fresh).await?;
    Ok(fresh)
}
/// Loads a stored workflow snapshot and decides whether it can be resumed.
///
/// The checks run in this order:
/// 1. the stored definition hash must equal `definition_hash`;
/// 2. if the state reports a terminal status, the workflow is not resumed
///    ([`ResumeOutcome::Paused`] when the state is paused, otherwise
///    [`ResumeOutcome::AlreadyTerminal`]);
/// 3. the parked position is resolved against `backend`; a returned status
///    yields [`ResumeOutcome::NotReady`];
/// 4. otherwise the resume input is computed and [`ResumeOutcome::Ready`] is
///    returned.
///
/// # Errors
///
/// Returns an error if the snapshot cannot be loaded, if the definition hash
/// does not match (`WorkflowError::DefinitionMismatch`), if resolving the
/// parked position fails, or if no resume input can be determined.
pub async fn prepare_resume<B>(
    instance_id: &str,
    definition_hash: &str,
    backend: &B,
) -> Result<ResumeOutcome, RuntimeError>
where
    B: SignalStore,
{
    let mut snapshot = backend.load_snapshot(instance_id).await?;

    // Refuse to resume a snapshot recorded under a different definition hash.
    if snapshot.definition_hash != definition_hash {
        let mismatch = WorkflowError::DefinitionMismatch {
            expected: definition_hash.to_string(),
            found: snapshot.definition_hash.clone(),
        };
        return Err(mismatch.into());
    }

    if let Some(status) = snapshot.state.as_terminal_status() {
        let outcome = if snapshot.state.is_paused() {
            ResumeOutcome::Paused(status)
        } else {
            ResumeOutcome::AlreadyTerminal(status)
        };
        return Ok(outcome);
    }

    // `resolve` receives the snapshot mutably and may report that the
    // workflow has to stay parked.
    let parked_at = ResumeParkedPosition::extract(&snapshot);
    let still_parked = parked_at
        .resolve(&mut snapshot, instance_id, backend)
        .await?;
    if let Some(status) = still_parked {
        return Ok(ResumeOutcome::NotReady(status));
    }

    let input_bytes = get_resume_input(&snapshot)?;
    let snapshot = Box::new(snapshot);
    Ok(ResumeOutcome::Ready {
        snapshot,
        input_bytes,
    })
}
/// Outcome of [`prepare_resume`]: either the workflow can continue, or the
/// status explaining why it cannot.
#[derive(Debug)]
pub enum ResumeOutcome {
/// The workflow can continue executing.
Ready {
/// The loaded snapshot (boxed), after the parked position was resolved.
snapshot: Box<WorkflowSnapshot>,
/// Input for the resumed execution, as computed by [`get_resume_input`].
input_bytes: Bytes,
},
/// The snapshot state reported a terminal status and is not paused.
AlreadyTerminal(WorkflowStatus),
/// The snapshot state reported a terminal status and is paused.
Paused(WorkflowStatus),
/// Resolving the parked position returned a status, so the workflow
/// is not resumed yet.
NotReady(WorkflowStatus),
}
/// Determines the bytes a resumed workflow should continue with.
///
/// For an in-progress snapshot with no completed tasks this is the stored
/// initial input; otherwise it is the output of the last task.
///
/// # Errors
///
/// Returns `WorkflowError::ResumeError` (converted into [`RuntimeError`]) when
/// the snapshot is not in progress, or when the selected input is unavailable.
pub fn get_resume_input(snapshot: &WorkflowSnapshot) -> Result<Bytes, RuntimeError> {
    let WorkflowSnapshotState::InProgress {
        completed_tasks, ..
    } = &snapshot.state
    else {
        return Err(WorkflowError::ResumeError("workflow not in progress".into()).into());
    };

    // Choose the input source together with the message used if it is absent.
    let (candidate, missing_reason) = if completed_tasks.is_empty() {
        (
            snapshot.initial_input_bytes(),
            "no completed tasks and initial input not stored",
        )
    } else {
        (
            snapshot.get_last_task_output(),
            "no task results available",
        )
    };

    candidate.ok_or_else(|| WorkflowError::ResumeError(missing_reason.into()).into())
}
/// Translates the raw result of a workflow execution into a
/// [`WorkflowStatus`] plus optional output.
///
/// * `Ok(output)`: marks the snapshot completed, saves it, and returns the
///   output alongside `WorkflowStatus::Completed`.
/// * `Waiting` / `AwaitingSignal`: logs the parking event and returns the
///   matching status. The snapshot is not modified or saved in these arms.
/// * `Cancelled` / `Paused`: re-loads the stored snapshot to pick up the
///   reason and actor; if loading fails or no details are present, the status
///   is returned with both set to `None`.
/// * Any other error: marks the snapshot failed, attempts to save it
///   (best-effort), and returns `WorkflowStatus::Failed` with the error text.
///
/// # Errors
///
/// Only fails when saving the *completed* snapshot fails. A save failure on
/// the failure path is logged as a warning and does not turn into an `Err`.
pub async fn finalize_execution<B>(
    result: Result<Bytes, RuntimeError>,
    snapshot: &mut WorkflowSnapshot,
    backend: &B,
) -> Result<(WorkflowStatus, Option<Bytes>), RuntimeError>
where
    B: SnapshotStore,
{
    match result {
        Ok(output) => {
            snapshot.mark_completed(output.clone());
            backend.save_snapshot(snapshot).await?;
            Ok((WorkflowStatus::Completed, Some(output)))
        }
        Err(RuntimeError::Workflow(WorkflowError::Waiting { wake_at })) => {
            // The identifier reported to the caller is taken from the current
            // position: the delay id, or the fork id when parked at a fork.
            // Any other position yields an empty id.
            let delay_id = match &snapshot.state {
                WorkflowSnapshotState::InProgress {
                    position: ExecutionPosition::AtDelay { delay_id, .. },
                    ..
                } => delay_id.clone(),
                WorkflowSnapshotState::InProgress {
                    position: ExecutionPosition::AtFork { fork_id, .. },
                    ..
                } => fork_id.clone(),
                _ => String::new(),
            };
            tracing::info!(
                instance_id = %snapshot.instance_id,
                %delay_id,
                %wake_at,
                "workflow parked at delay"
            );
            Ok((WorkflowStatus::Waiting { wake_at, delay_id }, None))
        }
        Err(RuntimeError::Workflow(WorkflowError::AwaitingSignal {
            signal_id,
            signal_name,
            wake_at,
        })) => {
            tracing::info!(
                instance_id = %snapshot.instance_id,
                %signal_id,
                %signal_name,
                ?wake_at,
                "workflow parked at signal"
            );
            Ok((
                WorkflowStatus::AwaitingSignal {
                    signal_id,
                    signal_name,
                    wake_at,
                },
                None,
            ))
        }
        Err(RuntimeError::Workflow(WorkflowError::Cancelled { .. })) => {
            // The cancellation details are read from the stored snapshot, not
            // from the in-memory one passed in.
            if let Ok(cancelled_snapshot) = backend.load_snapshot(&snapshot.instance_id).await
                && let Some((reason, cancelled_by)) =
                    cancelled_snapshot.state.cancellation_details()
            {
                return Ok((
                    WorkflowStatus::Cancelled {
                        reason,
                        cancelled_by,
                    },
                    None,
                ));
            }
            // Load failed or no details stored: still report the cancellation.
            Ok((
                WorkflowStatus::Cancelled {
                    reason: None,
                    cancelled_by: None,
                },
                None,
            ))
        }
        Err(RuntimeError::Workflow(WorkflowError::Paused { .. })) => {
            // Same approach as cancellation: details come from the stored snapshot.
            if let Ok(paused_snapshot) = backend.load_snapshot(&snapshot.instance_id).await
                && let Some((reason, paused_by)) = paused_snapshot.state.pause_details()
            {
                return Ok((WorkflowStatus::Paused { reason, paused_by }, None));
            }
            Ok((
                WorkflowStatus::Paused {
                    reason: None,
                    paused_by: None,
                },
                None,
            ))
        }
        Err(e) => {
            // Format the error once and reuse it for the snapshot and the status.
            let message = e.to_string();
            snapshot.mark_failed(message.clone());

            // Persisting the failure stays best-effort (the caller still gets
            // `Failed`), but a save error is logged instead of being dropped.
            if let Err(save_err) = backend.save_snapshot(snapshot).await {
                let save_err: RuntimeError = save_err.into();
                tracing::warn!(
                    instance_id = %snapshot.instance_id,
                    error = %save_err,
                    "failed to persist failed workflow snapshot"
                );
            }

            Ok((WorkflowStatus::Failed(message), None))
        }
    }
}