use chrono::Duration;
use sayiir_core::snapshot::{
PauseRequest, SignalKind, SignalRequest, WorkflowSnapshot, WorkflowSnapshotState,
};
use sayiir_core::task_claim::{AvailableTask, TaskClaim};
#[derive(Debug, thiserror::Error)]
pub enum BackendError {
#[error("Snapshot not found: {0}")]
NotFound(String),
#[error("Serialization error: {0}")]
Serialization(String),
#[error("Backend error: {0}")]
Backend(String),
#[error("Cannot cancel workflow in state: {0}")]
CannotCancel(String),
#[error("Cannot pause workflow in state: {0}")]
CannotPause(String),
}
pub trait SnapshotStore: Send + Sync {
fn save_snapshot(
&self,
snapshot: &WorkflowSnapshot,
) -> impl Future<Output = Result<(), BackendError>> + Send;
fn save_task_result(
&self,
instance_id: &str,
task_id: &str,
output: bytes::Bytes,
) -> impl Future<Output = Result<(), BackendError>> + Send;
fn load_snapshot(
&self,
instance_id: &str,
) -> impl Future<Output = Result<WorkflowSnapshot, BackendError>> + Send;
fn delete_snapshot(
&self,
instance_id: &str,
) -> impl Future<Output = Result<(), BackendError>> + Send;
fn list_snapshots(&self) -> impl Future<Output = Result<Vec<String>, BackendError>> + Send;
}
pub trait SignalStore: SnapshotStore {
fn store_signal(
&self,
instance_id: &str,
kind: SignalKind,
request: SignalRequest,
) -> impl Future<Output = Result<(), BackendError>> + Send;
fn get_signal(
&self,
instance_id: &str,
kind: SignalKind,
) -> impl Future<Output = Result<Option<SignalRequest>, BackendError>> + Send;
fn clear_signal(
&self,
instance_id: &str,
kind: SignalKind,
) -> impl Future<Output = Result<(), BackendError>> + Send;
fn send_event(
&self,
instance_id: &str,
signal_name: &str,
payload: bytes::Bytes,
) -> impl Future<Output = Result<(), BackendError>> + Send;
fn consume_event(
&self,
instance_id: &str,
signal_name: &str,
) -> impl Future<Output = Result<Option<bytes::Bytes>, BackendError>> + Send;
fn check_and_cancel(
&self,
instance_id: &str,
interrupted_at_task: Option<&str>,
) -> impl Future<Output = Result<bool, BackendError>> + Send {
async move {
let Some(request) = self.get_signal(instance_id, SignalKind::Cancel).await? else {
return Ok(false);
};
let mut snapshot = self.load_snapshot(instance_id).await?;
if !snapshot.state.is_in_progress() {
return Ok(false);
}
snapshot.mark_cancelled(
request.reason,
request.requested_by,
interrupted_at_task.map(String::from),
);
self.save_snapshot(&snapshot).await?;
self.clear_signal(instance_id, SignalKind::Cancel).await?;
Ok(true)
}
}
fn check_and_pause(
&self,
instance_id: &str,
) -> impl Future<Output = Result<bool, BackendError>> + Send {
async move {
let Some(request) = self.get_signal(instance_id, SignalKind::Pause).await? else {
return Ok(false);
};
let mut snapshot = self.load_snapshot(instance_id).await?;
if !snapshot.state.is_in_progress() {
return Ok(false);
}
let pause_request: PauseRequest = request.into();
snapshot.mark_paused(&pause_request);
self.save_snapshot(&snapshot).await?;
self.clear_signal(instance_id, SignalKind::Pause).await?;
Ok(true)
}
}
fn unpause(
&self,
instance_id: &str,
) -> impl Future<Output = Result<WorkflowSnapshot, BackendError>> + Send {
async move {
let mut snapshot = self.load_snapshot(instance_id).await?;
if !snapshot.state.is_paused() {
let state_name = match &snapshot.state {
WorkflowSnapshotState::InProgress { .. } => "InProgress",
WorkflowSnapshotState::Completed { .. } => "Completed",
WorkflowSnapshotState::Failed { .. } => "Failed",
WorkflowSnapshotState::Cancelled { .. } => "Cancelled",
WorkflowSnapshotState::Paused { .. } => "Paused",
};
return Err(BackendError::CannotPause(format!(
"Workflow is not paused (current state: {state_name:?})"
)));
}
snapshot.mark_unpaused();
self.save_snapshot(&snapshot).await?;
Ok(snapshot)
}
}
}
pub trait TaskClaimStore: Send + Sync {
fn claim_task(
&self,
instance_id: &str,
task_id: &str,
worker_id: &str,
ttl: Option<Duration>,
) -> impl Future<Output = Result<Option<TaskClaim>, BackendError>> + Send;
fn release_task_claim(
&self,
instance_id: &str,
task_id: &str,
worker_id: &str,
) -> impl Future<Output = Result<(), BackendError>> + Send;
fn extend_task_claim(
&self,
instance_id: &str,
task_id: &str,
worker_id: &str,
additional_duration: Duration,
) -> impl Future<Output = Result<(), BackendError>> + Send;
fn find_available_tasks(
&self,
worker_id: &str,
limit: usize,
aging_interval: Duration,
worker_tags: &[String],
) -> impl Future<Output = Result<Vec<AvailableTask>, BackendError>> + Send;
}
pub trait TaskResultStore: SnapshotStore {
fn load_task_result(
&self,
instance_id: &str,
task_id: &str,
) -> impl Future<Output = Result<Option<bytes::Bytes>, BackendError>> + Send;
}
pub trait PersistentBackend: SnapshotStore + SignalStore {}
impl<T: SnapshotStore + SignalStore> PersistentBackend for T {}
use std::future::Future;