sayiir_persistence/backend.rs
1//! Persistent backend traits for storing and retrieving workflow snapshots.
2//!
3//! The trait hierarchy is decomposed into focused sub-traits:
4//!
5//! - [`SnapshotStore`]: Core CRUD for workflow snapshots (5 methods).
6//! - [`SignalStore`]: Cancel + pause signal primitives with default composite
7//! implementations (3 required + 3 default methods).
8//! - [`TaskClaimStore`]: Distributed task claiming (4 methods, opt-in).
9//! - [`PersistentBackend`]: Supertrait = `SnapshotStore + SignalStore`, blanket-implemented.
10//!
11//! A minimal backend only needs to implement `SnapshotStore` + 3 `SignalStore` primitives
12//! (8 methods total) to satisfy `PersistentBackend`.
13
14use chrono::Duration;
15use sayiir_core::snapshot::{
16 PauseRequest, SignalKind, SignalRequest, WorkflowSnapshot, WorkflowSnapshotState,
17};
18use sayiir_core::task_claim::{AvailableTask, TaskClaim};
19
20/// Error type for backend operations.
21#[derive(Debug, thiserror::Error)]
22pub enum BackendError {
23 /// Snapshot not found.
24 #[error("Snapshot not found: {0}")]
25 NotFound(String),
26 /// Serialization/deserialization error.
27 #[error("Serialization error: {0}")]
28 Serialization(String),
29 /// Backend-specific error.
30 #[error("Backend error: {0}")]
31 Backend(String),
32 /// Cannot cancel workflow in current state.
33 #[error("Cannot cancel workflow in state: {0}")]
34 CannotCancel(String),
35 /// Cannot pause workflow in current state.
36 #[error("Cannot pause workflow in state: {0}")]
37 CannotPause(String),
38}
39
40// ---------------------------------------------------------------------------
41// SnapshotStore — core CRUD, every backend implements this
42// ---------------------------------------------------------------------------
43
44/// Core snapshot CRUD operations.
45///
46/// Every persistent backend must implement these 5 methods.
47pub trait SnapshotStore: Send + Sync {
48 /// Save a workflow snapshot.
49 ///
50 /// If a snapshot with the same instance_id already exists, it should be overwritten.
51 fn save_snapshot(
52 &self,
53 snapshot: &WorkflowSnapshot,
54 ) -> impl Future<Output = Result<(), BackendError>> + Send;
55
56 /// Save a single task result atomically.
57 ///
58 /// This is more granular than `save_snapshot` and allows concurrent task
59 /// completions (e.g., in fork branches) without overwriting each other.
60 fn save_task_result(
61 &self,
62 instance_id: &str,
63 task_id: &str,
64 output: bytes::Bytes,
65 ) -> impl Future<Output = Result<(), BackendError>> + Send;
66
67 /// Load a workflow snapshot by instance ID.
68 fn load_snapshot(
69 &self,
70 instance_id: &str,
71 ) -> impl Future<Output = Result<WorkflowSnapshot, BackendError>> + Send;
72
73 /// Delete a workflow snapshot.
74 fn delete_snapshot(
75 &self,
76 instance_id: &str,
77 ) -> impl Future<Output = Result<(), BackendError>> + Send;
78
79 /// List all snapshot instance IDs.
80 fn list_snapshots(&self) -> impl Future<Output = Result<Vec<String>, BackendError>> + Send;
81}
82
83// ---------------------------------------------------------------------------
84// SignalStore — cancel + pause via SignalKind
85// ---------------------------------------------------------------------------
86
87/// Signal storage for cancel and pause workflows.
88///
89/// Backends implement the 3 primitives (`store_signal`, `get_signal`,
90/// `clear_signal`). The 3 composite methods (`check_and_cancel`,
91/// `check_and_pause`, `unpause`) have default implementations built from
92/// the primitives + `SnapshotStore`. Backends may override them for atomicity.
93pub trait SignalStore: SnapshotStore {
94 // --- 3 primitives (backend must implement) ---
95
96 /// Store a signal (cancel or pause) for a workflow instance.
97 fn store_signal(
98 &self,
99 instance_id: &str,
100 kind: SignalKind,
101 request: SignalRequest,
102 ) -> impl Future<Output = Result<(), BackendError>> + Send;
103
104 /// Get the pending signal of the given kind, if any.
105 fn get_signal(
106 &self,
107 instance_id: &str,
108 kind: SignalKind,
109 ) -> impl Future<Output = Result<Option<SignalRequest>, BackendError>> + Send;
110
111 /// Clear the signal of the given kind.
112 fn clear_signal(
113 &self,
114 instance_id: &str,
115 kind: SignalKind,
116 ) -> impl Future<Output = Result<(), BackendError>> + Send;
117
118 // --- 3 composites with default impls (overridable for atomicity) ---
119
120 /// Atomically check for cancellation and transition to cancelled state.
121 ///
122 /// Returns `true` if the workflow was cancelled, `false` if no cancellation
123 /// was pending.
124 fn check_and_cancel(
125 &self,
126 instance_id: &str,
127 interrupted_at_task: Option<&str>,
128 ) -> impl Future<Output = Result<bool, BackendError>> + Send {
129 async move {
130 let Some(request) = self.get_signal(instance_id, SignalKind::Cancel).await? else {
131 return Ok(false);
132 };
133 let mut snapshot = self.load_snapshot(instance_id).await?;
134 if !snapshot.state.is_in_progress() {
135 return Ok(false);
136 }
137 snapshot.mark_cancelled(
138 request.reason,
139 request.requested_by,
140 interrupted_at_task.map(String::from),
141 );
142 self.save_snapshot(&snapshot).await?;
143 self.clear_signal(instance_id, SignalKind::Cancel).await?;
144 Ok(true)
145 }
146 }
147
148 /// Atomically check for a pause request and transition to paused state.
149 ///
150 /// Returns `true` if the workflow was paused, `false` if no pause was pending.
151 fn check_and_pause(
152 &self,
153 instance_id: &str,
154 ) -> impl Future<Output = Result<bool, BackendError>> + Send {
155 async move {
156 let Some(request) = self.get_signal(instance_id, SignalKind::Pause).await? else {
157 return Ok(false);
158 };
159 let mut snapshot = self.load_snapshot(instance_id).await?;
160 if !snapshot.state.is_in_progress() {
161 return Ok(false);
162 }
163 let pause_request: PauseRequest = request.into();
164 snapshot.mark_paused(&pause_request);
165 self.save_snapshot(&snapshot).await?;
166 self.clear_signal(instance_id, SignalKind::Pause).await?;
167 Ok(true)
168 }
169 }
170
171 /// Transition a paused workflow back to in-progress and return the updated snapshot.
172 fn unpause(
173 &self,
174 instance_id: &str,
175 ) -> impl Future<Output = Result<WorkflowSnapshot, BackendError>> + Send {
176 async move {
177 let mut snapshot = self.load_snapshot(instance_id).await?;
178 if !snapshot.state.is_paused() {
179 let state_name = match &snapshot.state {
180 WorkflowSnapshotState::InProgress { .. } => "InProgress",
181 WorkflowSnapshotState::Completed { .. } => "Completed",
182 WorkflowSnapshotState::Failed { .. } => "Failed",
183 WorkflowSnapshotState::Cancelled { .. } => "Cancelled",
184 WorkflowSnapshotState::Paused { .. } => "Paused",
185 };
186 return Err(BackendError::CannotPause(format!(
187 "Workflow is not paused (current state: {state_name:?})"
188 )));
189 }
190 snapshot.mark_unpaused();
191 self.save_snapshot(&snapshot).await?;
192 Ok(snapshot)
193 }
194 }
195}
196
197// ---------------------------------------------------------------------------
198// TaskClaimStore — only for distributed workers
199// ---------------------------------------------------------------------------
200
201/// Task claiming for distributed multi-worker execution.
202///
203/// Only needed when using [`PooledWorker`](crate). Single-process backends
204/// (used with `CheckpointingRunner`) do not need to implement this.
205pub trait TaskClaimStore: Send + Sync {
206 /// Claim a task for execution by a worker node.
207 ///
208 /// Returns `Ok(Some(claim))` if successful, `Ok(None)` if already claimed.
209 fn claim_task(
210 &self,
211 instance_id: &str,
212 task_id: &str,
213 worker_id: &str,
214 ttl: Option<Duration>,
215 ) -> impl Future<Output = Result<Option<TaskClaim>, BackendError>> + Send;
216
217 /// Release a task claim.
218 fn release_task_claim(
219 &self,
220 instance_id: &str,
221 task_id: &str,
222 worker_id: &str,
223 ) -> impl Future<Output = Result<(), BackendError>> + Send;
224
225 /// Extend a task claim's expiration time.
226 fn extend_task_claim(
227 &self,
228 instance_id: &str,
229 task_id: &str,
230 worker_id: &str,
231 additional_duration: Duration,
232 ) -> impl Future<Output = Result<(), BackendError>> + Send;
233
234 /// Find available tasks across all workflow instances.
235 fn find_available_tasks(
236 &self,
237 worker_id: &str,
238 limit: usize,
239 ) -> impl Future<Output = Result<Vec<AvailableTask>, BackendError>> + Send;
240}
241
242// ---------------------------------------------------------------------------
243// PersistentBackend — supertrait + blanket impl
244// ---------------------------------------------------------------------------
245
246/// Supertrait combining [`SnapshotStore`] and [`SignalStore`].
247///
248/// This is the bound used by `CheckpointingRunner` and most of the runtime.
249/// It is blanket-implemented for any type that implements both sub-traits,
250/// so backends never need to implement it directly.
251pub trait PersistentBackend: SnapshotStore + SignalStore {}
252
253impl<T: SnapshotStore + SignalStore> PersistentBackend for T {}
254
// Import `Future` (a private `use`, not a re-export) so the `impl Future` return types above resolve.
256use std::future::Future;