use bytes::Bytes;
use sayiir_core::snapshot::{ExecutionPosition, SignalKind, TaskHint, WorkflowSnapshot};
use sayiir_core::workflow::{ConflictPolicy, WorkflowStatus};
use crate::{BackendError, SignalStore, SnapshotStore};
/// Successful result of [`prepare_run`].
#[derive(Debug)]
pub enum PrepareRunOutcome {
/// A new snapshot was created and saved to the backend; the workflow
/// starts from this snapshot.
Fresh(Box<WorkflowSnapshot>),
/// An instance with the same id already existed and the conflict policy was
/// `ConflictPolicy::UseExisting`. Carries the existing instance's status and
/// the value of its state's `completed_output()`, if any.
ExistingStatus(WorkflowStatus, Option<Bytes>),
}
/// Reasons why [`prepare_run`] could not produce a [`PrepareRunOutcome`].
#[derive(Debug)]
pub enum RunConflict {
/// The supplied instance id was rejected by `sayiir_core::validate_instance_id`.
InvalidInstanceId(sayiir_core::InvalidInstanceId),
/// A snapshot for this instance id already exists and the conflict policy
/// was `ConflictPolicy::Fail`. Holds the instance id.
AlreadyExists(String),
/// A snapshot for this instance id exists but was stored with a different
/// definition hash than the one passed by the caller.
DefinitionMismatch {
/// Definition hash supplied by the caller.
expected: sayiir_core::DefinitionHash,
/// Definition hash recorded in the existing snapshot.
found: sayiir_core::DefinitionHash,
},
/// A backend operation failed.
Backend(BackendError),
}
impl From<BackendError> for RunConflict {
fn from(e: BackendError) -> Self {
Self::Backend(e)
}
}
impl From<sayiir_core::InvalidInstanceId> for RunConflict {
fn from(e: sayiir_core::InvalidInstanceId) -> Self {
Self::InvalidInstanceId(e)
}
}
/// Human-readable rendering of a [`RunConflict`].
///
/// Wrapped errors are rendered by delegating to their own `Display`
/// implementation with the caller's formatter; the remaining variants produce
/// fixed messages that embed their payload.
impl std::fmt::Display for RunConflict {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        use std::fmt::Display;

        match self {
            // Delegating variants: hand the formatter straight to the inner error.
            Self::InvalidInstanceId(inner) => Display::fmt(inner, f),
            Self::Backend(inner) => Display::fmt(inner, f),

            // Variants with their own message text.
            Self::AlreadyExists(id) => f.write_fmt(format_args!(
                "Workflow instance '{id}' already exists. Use conflict policy 'use_existing' or 'terminate_existing' to override, or resume() instead.",
            )),
            Self::DefinitionMismatch { expected, found } => f.write_fmt(format_args!(
                "Workflow definition mismatch: expected '{expected}', found '{found}'",
            )),
        }
    }
}
impl std::error::Error for RunConflict {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
Self::InvalidInstanceId(e) => Some(e),
Self::Backend(e) => Some(e),
_ => None,
}
}
}
/// Prepares a workflow instance for a new run.
///
/// Steps, in order:
/// 1. Validates `instance_id`.
/// 2. Looks up an existing snapshot for `instance_id` in `backend`.
/// 3. If one exists, checks that its definition hash equals
///    `definition_hash`, then applies `conflict_policy`.
/// 4. Otherwise (or after terminating the existing instance) builds a
///    snapshot from `input_bytes`, positions it at `first_task`, records the
///    task hint, and saves it.
///
/// # Returns
///
/// * [`PrepareRunOutcome::Fresh`] with the newly saved snapshot.
/// * [`PrepareRunOutcome::ExistingStatus`] when a snapshot already exists and
///   the policy is `ConflictPolicy::UseExisting`.
///
/// # Errors
///
/// * [`RunConflict::InvalidInstanceId`] if validation of `instance_id` fails.
/// * [`RunConflict::DefinitionMismatch`] if an existing snapshot has a
///   different definition hash. This is checked before the conflict policy is
///   consulted, so it is returned for every policy.
/// * [`RunConflict::AlreadyExists`] if a snapshot exists and the policy is
///   `ConflictPolicy::Fail`.
/// * [`RunConflict::Backend`] for any backend error other than
///   `BackendError::NotFound` on lookup, or any error from deleting the
///   snapshot, clearing signals, or saving the new snapshot.
pub async fn prepare_run<B>(
    instance_id: &str,
    definition_hash: sayiir_core::DefinitionHash,
    input_bytes: Bytes,
    first_task: TaskHint,
    backend: &B,
    conflict_policy: ConflictPolicy,
) -> Result<PrepareRunOutcome, RunConflict>
where
    B: SnapshotStore + SignalStore,
{
    sayiir_core::validate_instance_id(instance_id)?;

    // A `NotFound` lookup error simply means there is nothing to conflict with.
    let existing = match backend.load_snapshot(instance_id).await {
        Ok(snapshot) => Some(snapshot),
        Err(BackendError::NotFound(_)) => None,
        Err(other) => return Err(RunConflict::Backend(other)),
    };

    if let Some(prior) = existing {
        if prior.definition_hash != definition_hash {
            return Err(RunConflict::DefinitionMismatch {
                expected: definition_hash,
                found: prior.definition_hash,
            });
        }

        match conflict_policy {
            ConflictPolicy::Fail => {
                return Err(RunConflict::AlreadyExists(instance_id.to_string()));
            }
            ConflictPolicy::UseExisting => {
                let output = prior.state.completed_output().cloned();
                let status = prior.state.as_status();
                return Ok(PrepareRunOutcome::ExistingStatus(status, output));
            }
            ConflictPolicy::TerminateExisting => {
                // Remove the old snapshot, then clear the Cancel and Pause
                // signals (in that order) before falling through to create
                // the fresh snapshot below.
                backend.delete_snapshot(instance_id).await?;
                for kind in [SignalKind::Cancel, SignalKind::Pause] {
                    backend.clear_signal(instance_id, kind).await?;
                }
            }
        }
    }

    let mut fresh =
        WorkflowSnapshot::with_initial_input(instance_id, definition_hash, input_bytes);
    fresh.update_position(ExecutionPosition::AtTask {
        task_id: first_task.id,
    });
    fresh.set_task_hint(&first_task);
    backend.save_snapshot(&mut fresh).await?;

    Ok(PrepareRunOutcome::Fresh(Box::new(fresh)))
}