use bytes::Bytes;
use sayiir_core::snapshot::{ExecutionPosition, SignalKind, TaskHint, WorkflowSnapshot};
use sayiir_core::workflow::{ConflictPolicy, WorkflowStatus};
use crate::{BackendError, SignalStore, SnapshotStore};
#[derive(Debug)]
pub enum PrepareRunOutcome {
Fresh(Box<WorkflowSnapshot>),
ExistingStatus(WorkflowStatus, Option<Bytes>),
}
#[derive(Debug)]
pub enum RunConflict {
AlreadyExists(String),
DefinitionMismatch {
expected: String,
found: String,
},
Backend(BackendError),
}
impl From<BackendError> for RunConflict {
fn from(e: BackendError) -> Self {
Self::Backend(e)
}
}
impl std::fmt::Display for RunConflict {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::AlreadyExists(id) => write!(
f,
"Workflow instance '{id}' already exists. Use conflict policy 'use_existing' or 'terminate_existing' to override, or resume() instead.",
),
Self::DefinitionMismatch { expected, found } => write!(
f,
"Workflow definition mismatch: expected '{expected}', found '{found}'",
),
Self::Backend(e) => std::fmt::Display::fmt(e, f),
}
}
}
impl std::error::Error for RunConflict {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
Self::Backend(e) => Some(e),
_ => None,
}
}
}
pub async fn prepare_run<B>(
instance_id: String,
definition_hash: String,
input_bytes: Bytes,
first_task: TaskHint,
backend: &B,
conflict_policy: ConflictPolicy,
) -> Result<PrepareRunOutcome, RunConflict>
where
B: SnapshotStore + SignalStore,
{
match backend.load_snapshot(&instance_id).await {
Ok(existing) => {
if existing.definition_hash != definition_hash {
return Err(RunConflict::DefinitionMismatch {
expected: definition_hash,
found: existing.definition_hash,
});
}
match conflict_policy {
ConflictPolicy::Fail => return Err(RunConflict::AlreadyExists(instance_id)),
ConflictPolicy::UseExisting => {
let output = existing.state.completed_output().cloned();
return Ok(PrepareRunOutcome::ExistingStatus(
existing.state.as_status(),
output,
));
}
ConflictPolicy::TerminateExisting => {
backend.delete_snapshot(&instance_id).await?;
backend
.clear_signal(&instance_id, SignalKind::Cancel)
.await?;
backend
.clear_signal(&instance_id, SignalKind::Pause)
.await?;
}
}
}
Err(BackendError::NotFound(_)) => {}
Err(e) => return Err(e.into()),
}
let mut snapshot =
WorkflowSnapshot::with_initial_input(instance_id, definition_hash, input_bytes);
snapshot.update_position(ExecutionPosition::AtTask {
task_id: first_task.id.clone(),
});
snapshot.set_task_hint(&first_task);
backend.save_snapshot(&snapshot).await?;
Ok(PrepareRunOutcome::Fresh(Box::new(snapshot)))
}