sayiir_persistence/backend.rs
//! Persistent backend traits for storing and retrieving workflow snapshots.
//!
//! The trait hierarchy is decomposed into focused sub-traits:
//!
//! - [`SnapshotStore`]: Core CRUD for workflow snapshots (5 methods).
//! - [`SignalStore`]: Cancel/pause signal primitives and external-event buffering,
//!   plus default composite implementations (5 required + 3 default methods).
//! - [`TaskClaimStore`]: Distributed task claiming (4 methods, opt-in).
//! - [`PersistentBackend`]: Supertrait = `SnapshotStore + SignalStore`, blanket-implemented.
//!
//! A minimal backend only needs to implement `SnapshotStore` + the 5 required
//! `SignalStore` methods (10 methods total) to satisfy `PersistentBackend`.
13
14use chrono::Duration;
15use sayiir_core::snapshot::{
16 PauseRequest, SignalKind, SignalRequest, WorkflowSnapshot, WorkflowSnapshotState,
17};
18use sayiir_core::task_claim::{AvailableTask, TaskClaim};
19
20/// Error type for backend operations.
21#[derive(Debug, thiserror::Error)]
22pub enum BackendError {
23 /// Snapshot not found.
24 #[error("Snapshot not found: {0}")]
25 NotFound(String),
26 /// Serialization/deserialization error.
27 #[error("Serialization error: {0}")]
28 Serialization(String),
29 /// Backend-specific error.
30 #[error("Backend error: {0}")]
31 Backend(String),
32 /// Cannot cancel workflow in current state.
33 #[error("Cannot cancel workflow in state: {0}")]
34 CannotCancel(String),
35 /// Cannot pause workflow in current state.
36 #[error("Cannot pause workflow in state: {0}")]
37 CannotPause(String),
38}
39
40// ---------------------------------------------------------------------------
41// SnapshotStore — core CRUD, every backend implements this
42// ---------------------------------------------------------------------------
43
44/// Core snapshot CRUD operations.
45///
46/// Every persistent backend must implement these 5 methods.
47pub trait SnapshotStore: Send + Sync {
48 /// Save a workflow snapshot.
49 ///
50 /// If a snapshot with the same instance_id already exists, it should be overwritten.
51 fn save_snapshot(
52 &self,
53 snapshot: &WorkflowSnapshot,
54 ) -> impl Future<Output = Result<(), BackendError>> + Send;
55
56 /// Save a single task result atomically.
57 ///
58 /// This is more granular than `save_snapshot` and allows concurrent task
59 /// completions (e.g., in fork branches) without overwriting each other.
60 fn save_task_result(
61 &self,
62 instance_id: &str,
63 task_id: &str,
64 output: bytes::Bytes,
65 ) -> impl Future<Output = Result<(), BackendError>> + Send;
66
67 /// Load a workflow snapshot by instance ID.
68 fn load_snapshot(
69 &self,
70 instance_id: &str,
71 ) -> impl Future<Output = Result<WorkflowSnapshot, BackendError>> + Send;
72
73 /// Delete a workflow snapshot.
74 fn delete_snapshot(
75 &self,
76 instance_id: &str,
77 ) -> impl Future<Output = Result<(), BackendError>> + Send;
78
79 /// List all snapshot instance IDs.
80 fn list_snapshots(&self) -> impl Future<Output = Result<Vec<String>, BackendError>> + Send;
81}
82
83// ---------------------------------------------------------------------------
// SignalStore — cancel/pause signals via SignalKind, plus external events
85// ---------------------------------------------------------------------------
86
87/// Signal storage for cancel and pause workflows.
88///
89/// Backends implement the 3 primitives (`store_signal`, `get_signal`,
90/// `clear_signal`). The 3 composite methods (`check_and_cancel`,
91/// `check_and_pause`, `unpause`) have default implementations built from
92/// the primitives + `SnapshotStore`. Backends may override them for atomicity.
93pub trait SignalStore: SnapshotStore {
94 // --- 3 primitives (backend must implement) ---
95
96 /// Store a signal (cancel or pause) for a workflow instance.
97 fn store_signal(
98 &self,
99 instance_id: &str,
100 kind: SignalKind,
101 request: SignalRequest,
102 ) -> impl Future<Output = Result<(), BackendError>> + Send;
103
104 /// Get the pending signal of the given kind, if any.
105 fn get_signal(
106 &self,
107 instance_id: &str,
108 kind: SignalKind,
109 ) -> impl Future<Output = Result<Option<SignalRequest>, BackendError>> + Send;
110
111 /// Clear the signal of the given kind.
112 fn clear_signal(
113 &self,
114 instance_id: &str,
115 kind: SignalKind,
116 ) -> impl Future<Output = Result<(), BackendError>> + Send;
117
118 /// Send an external event to a workflow instance.
119 ///
120 /// Events are buffered per `(instance_id, signal_name)` in FIFO order.
121 /// Sending to a nonexistent or terminal instance silently stores the event.
122 fn send_event(
123 &self,
124 instance_id: &str,
125 signal_name: &str,
126 payload: bytes::Bytes,
127 ) -> impl Future<Output = Result<(), BackendError>> + Send;
128
129 /// Consume the oldest buffered event for the given signal name, if any.
130 ///
131 /// Returns `Some(payload)` if an event was consumed, `None` otherwise.
132 fn consume_event(
133 &self,
134 instance_id: &str,
135 signal_name: &str,
136 ) -> impl Future<Output = Result<Option<bytes::Bytes>, BackendError>> + Send;
137
138 // --- 3 composites with default impls (overridable for atomicity) ---
139
140 /// Atomically check for cancellation and transition to cancelled state.
141 ///
142 /// Returns `true` if the workflow was cancelled, `false` if no cancellation
143 /// was pending.
144 fn check_and_cancel(
145 &self,
146 instance_id: &str,
147 interrupted_at_task: Option<&str>,
148 ) -> impl Future<Output = Result<bool, BackendError>> + Send {
149 async move {
150 let Some(request) = self.get_signal(instance_id, SignalKind::Cancel).await? else {
151 return Ok(false);
152 };
153 let mut snapshot = self.load_snapshot(instance_id).await?;
154 if !snapshot.state.is_in_progress() {
155 return Ok(false);
156 }
157 snapshot.mark_cancelled(
158 request.reason,
159 request.requested_by,
160 interrupted_at_task.map(String::from),
161 );
162 self.save_snapshot(&snapshot).await?;
163 self.clear_signal(instance_id, SignalKind::Cancel).await?;
164 Ok(true)
165 }
166 }
167
168 /// Atomically check for a pause request and transition to paused state.
169 ///
170 /// Returns `true` if the workflow was paused, `false` if no pause was pending.
171 fn check_and_pause(
172 &self,
173 instance_id: &str,
174 ) -> impl Future<Output = Result<bool, BackendError>> + Send {
175 async move {
176 let Some(request) = self.get_signal(instance_id, SignalKind::Pause).await? else {
177 return Ok(false);
178 };
179 let mut snapshot = self.load_snapshot(instance_id).await?;
180 if !snapshot.state.is_in_progress() {
181 return Ok(false);
182 }
183 let pause_request: PauseRequest = request.into();
184 snapshot.mark_paused(&pause_request);
185 self.save_snapshot(&snapshot).await?;
186 self.clear_signal(instance_id, SignalKind::Pause).await?;
187 Ok(true)
188 }
189 }
190
191 /// Transition a paused workflow back to in-progress and return the updated snapshot.
192 fn unpause(
193 &self,
194 instance_id: &str,
195 ) -> impl Future<Output = Result<WorkflowSnapshot, BackendError>> + Send {
196 async move {
197 let mut snapshot = self.load_snapshot(instance_id).await?;
198 if !snapshot.state.is_paused() {
199 let state_name = match &snapshot.state {
200 WorkflowSnapshotState::InProgress { .. } => "InProgress",
201 WorkflowSnapshotState::Completed { .. } => "Completed",
202 WorkflowSnapshotState::Failed { .. } => "Failed",
203 WorkflowSnapshotState::Cancelled { .. } => "Cancelled",
204 WorkflowSnapshotState::Paused { .. } => "Paused",
205 };
206 return Err(BackendError::CannotPause(format!(
207 "Workflow is not paused (current state: {state_name:?})"
208 )));
209 }
210 snapshot.mark_unpaused();
211 self.save_snapshot(&snapshot).await?;
212 Ok(snapshot)
213 }
214 }
215}
216
217// ---------------------------------------------------------------------------
218// TaskClaimStore — only for distributed workers
219// ---------------------------------------------------------------------------
220
221/// Task claiming for distributed multi-worker execution.
222///
223/// Only needed when using [`PooledWorker`](crate). Single-process backends
224/// (used with `CheckpointingRunner`) do not need to implement this.
225pub trait TaskClaimStore: Send + Sync {
226 /// Claim a task for execution by a worker node.
227 ///
228 /// Returns `Ok(Some(claim))` if successful, `Ok(None)` if already claimed.
229 fn claim_task(
230 &self,
231 instance_id: &str,
232 task_id: &str,
233 worker_id: &str,
234 ttl: Option<Duration>,
235 ) -> impl Future<Output = Result<Option<TaskClaim>, BackendError>> + Send;
236
237 /// Release a task claim.
238 fn release_task_claim(
239 &self,
240 instance_id: &str,
241 task_id: &str,
242 worker_id: &str,
243 ) -> impl Future<Output = Result<(), BackendError>> + Send;
244
245 /// Extend a task claim's expiration time.
246 fn extend_task_claim(
247 &self,
248 instance_id: &str,
249 task_id: &str,
250 worker_id: &str,
251 additional_duration: Duration,
252 ) -> impl Future<Output = Result<(), BackendError>> + Send;
253
254 /// Find available tasks across all workflow instances.
255 fn find_available_tasks(
256 &self,
257 worker_id: &str,
258 limit: usize,
259 ) -> impl Future<Output = Result<Vec<AvailableTask>, BackendError>> + Send;
260}
261
262// ---------------------------------------------------------------------------
263// PersistentBackend — supertrait + blanket impl
264// ---------------------------------------------------------------------------
265
266/// Supertrait combining [`SnapshotStore`] and [`SignalStore`].
267///
268/// This is the bound used by `CheckpointingRunner` and most of the runtime.
269/// It is blanket-implemented for any type that implements both sub-traits,
270/// so backends never need to implement it directly.
271pub trait PersistentBackend: SnapshotStore + SignalStore {}
272
273impl<T: SnapshotStore + SignalStore> PersistentBackend for T {}
274
// Import (not re-export) `Future` so the `impl Future<...>` return types in the traits resolve.
276use std::future::Future;