Skip to main content

sayiir_persistence/
backend.rs

//! Persistent backend traits for storing and retrieving workflow snapshots.
//!
//! The trait hierarchy is decomposed into focused sub-traits:
//!
//! - [`SnapshotStore`]: Core CRUD for workflow snapshots (5 methods).
//! - [`SignalStore`]: Cancel/pause signal primitives and buffered external events, plus
//!   composite cancel/pause/unpause helpers (5 required + 3 default methods).
//! - [`TaskClaimStore`]: Distributed task claiming (4 methods, opt-in).
//! - [`PersistentBackend`]: Supertrait = `SnapshotStore + SignalStore`, blanket-implemented.
//!
//! A minimal backend only needs to implement `SnapshotStore` plus the 5 required
//! `SignalStore` methods (10 methods total) to satisfy `PersistentBackend`.
13
14use chrono::Duration;
15use sayiir_core::snapshot::{
16    PauseRequest, SignalKind, SignalRequest, WorkflowSnapshot, WorkflowSnapshotState,
17};
18use sayiir_core::task_claim::{AvailableTask, TaskClaim};
19
20/// Error type for backend operations.
21#[derive(Debug, thiserror::Error)]
22pub enum BackendError {
23    /// Snapshot not found.
24    #[error("Snapshot not found: {0}")]
25    NotFound(String),
26    /// Serialization/deserialization error.
27    #[error("Serialization error: {0}")]
28    Serialization(String),
29    /// Backend-specific error.
30    #[error("Backend error: {0}")]
31    Backend(String),
32    /// Cannot cancel workflow in current state.
33    #[error("Cannot cancel workflow in state: {0}")]
34    CannotCancel(String),
35    /// Cannot pause workflow in current state.
36    #[error("Cannot pause workflow in state: {0}")]
37    CannotPause(String),
38}
39
40// ---------------------------------------------------------------------------
41// SnapshotStore — core CRUD, every backend implements this
42// ---------------------------------------------------------------------------
43
44/// Core snapshot CRUD operations.
45///
46/// Every persistent backend must implement these 5 methods.
47pub trait SnapshotStore: Send + Sync {
48    /// Save a workflow snapshot.
49    ///
50    /// If a snapshot with the same instance_id already exists, it should be overwritten.
51    fn save_snapshot(
52        &self,
53        snapshot: &WorkflowSnapshot,
54    ) -> impl Future<Output = Result<(), BackendError>> + Send;
55
56    /// Save a single task result atomically.
57    ///
58    /// This is more granular than `save_snapshot` and allows concurrent task
59    /// completions (e.g., in fork branches) without overwriting each other.
60    fn save_task_result(
61        &self,
62        instance_id: &str,
63        task_id: &str,
64        output: bytes::Bytes,
65    ) -> impl Future<Output = Result<(), BackendError>> + Send;
66
67    /// Load a workflow snapshot by instance ID.
68    fn load_snapshot(
69        &self,
70        instance_id: &str,
71    ) -> impl Future<Output = Result<WorkflowSnapshot, BackendError>> + Send;
72
73    /// Delete a workflow snapshot.
74    fn delete_snapshot(
75        &self,
76        instance_id: &str,
77    ) -> impl Future<Output = Result<(), BackendError>> + Send;
78
79    /// List all snapshot instance IDs.
80    fn list_snapshots(&self) -> impl Future<Output = Result<Vec<String>, BackendError>> + Send;
81}
82
83// ---------------------------------------------------------------------------
// SignalStore — cancel/pause signals (via SignalKind) and buffered external events
85// ---------------------------------------------------------------------------
86
87/// Signal storage for cancel and pause workflows.
88///
89/// Backends implement the 3 primitives (`store_signal`, `get_signal`,
90/// `clear_signal`). The 3 composite methods (`check_and_cancel`,
91/// `check_and_pause`, `unpause`) have default implementations built from
92/// the primitives + `SnapshotStore`. Backends may override them for atomicity.
93pub trait SignalStore: SnapshotStore {
94    // --- 3 primitives (backend must implement) ---
95
96    /// Store a signal (cancel or pause) for a workflow instance.
97    fn store_signal(
98        &self,
99        instance_id: &str,
100        kind: SignalKind,
101        request: SignalRequest,
102    ) -> impl Future<Output = Result<(), BackendError>> + Send;
103
104    /// Get the pending signal of the given kind, if any.
105    fn get_signal(
106        &self,
107        instance_id: &str,
108        kind: SignalKind,
109    ) -> impl Future<Output = Result<Option<SignalRequest>, BackendError>> + Send;
110
111    /// Clear the signal of the given kind.
112    fn clear_signal(
113        &self,
114        instance_id: &str,
115        kind: SignalKind,
116    ) -> impl Future<Output = Result<(), BackendError>> + Send;
117
118    /// Send an external event to a workflow instance.
119    ///
120    /// Events are buffered per `(instance_id, signal_name)` in FIFO order.
121    /// Sending to a nonexistent or terminal instance silently stores the event.
122    fn send_event(
123        &self,
124        instance_id: &str,
125        signal_name: &str,
126        payload: bytes::Bytes,
127    ) -> impl Future<Output = Result<(), BackendError>> + Send;
128
129    /// Consume the oldest buffered event for the given signal name, if any.
130    ///
131    /// Returns `Some(payload)` if an event was consumed, `None` otherwise.
132    fn consume_event(
133        &self,
134        instance_id: &str,
135        signal_name: &str,
136    ) -> impl Future<Output = Result<Option<bytes::Bytes>, BackendError>> + Send;
137
138    // --- 3 composites with default impls (overridable for atomicity) ---
139
140    /// Atomically check for cancellation and transition to cancelled state.
141    ///
142    /// Returns `true` if the workflow was cancelled, `false` if no cancellation
143    /// was pending.
144    fn check_and_cancel(
145        &self,
146        instance_id: &str,
147        interrupted_at_task: Option<&str>,
148    ) -> impl Future<Output = Result<bool, BackendError>> + Send {
149        async move {
150            let Some(request) = self.get_signal(instance_id, SignalKind::Cancel).await? else {
151                return Ok(false);
152            };
153            let mut snapshot = self.load_snapshot(instance_id).await?;
154            if !snapshot.state.is_in_progress() {
155                return Ok(false);
156            }
157            snapshot.mark_cancelled(
158                request.reason,
159                request.requested_by,
160                interrupted_at_task.map(String::from),
161            );
162            self.save_snapshot(&snapshot).await?;
163            self.clear_signal(instance_id, SignalKind::Cancel).await?;
164            Ok(true)
165        }
166    }
167
168    /// Atomically check for a pause request and transition to paused state.
169    ///
170    /// Returns `true` if the workflow was paused, `false` if no pause was pending.
171    fn check_and_pause(
172        &self,
173        instance_id: &str,
174    ) -> impl Future<Output = Result<bool, BackendError>> + Send {
175        async move {
176            let Some(request) = self.get_signal(instance_id, SignalKind::Pause).await? else {
177                return Ok(false);
178            };
179            let mut snapshot = self.load_snapshot(instance_id).await?;
180            if !snapshot.state.is_in_progress() {
181                return Ok(false);
182            }
183            let pause_request: PauseRequest = request.into();
184            snapshot.mark_paused(&pause_request);
185            self.save_snapshot(&snapshot).await?;
186            self.clear_signal(instance_id, SignalKind::Pause).await?;
187            Ok(true)
188        }
189    }
190
191    /// Transition a paused workflow back to in-progress and return the updated snapshot.
192    fn unpause(
193        &self,
194        instance_id: &str,
195    ) -> impl Future<Output = Result<WorkflowSnapshot, BackendError>> + Send {
196        async move {
197            let mut snapshot = self.load_snapshot(instance_id).await?;
198            if !snapshot.state.is_paused() {
199                let state_name = match &snapshot.state {
200                    WorkflowSnapshotState::InProgress { .. } => "InProgress",
201                    WorkflowSnapshotState::Completed { .. } => "Completed",
202                    WorkflowSnapshotState::Failed { .. } => "Failed",
203                    WorkflowSnapshotState::Cancelled { .. } => "Cancelled",
204                    WorkflowSnapshotState::Paused { .. } => "Paused",
205                };
206                return Err(BackendError::CannotPause(format!(
207                    "Workflow is not paused (current state: {state_name:?})"
208                )));
209            }
210            snapshot.mark_unpaused();
211            self.save_snapshot(&snapshot).await?;
212            Ok(snapshot)
213        }
214    }
215}
216
217// ---------------------------------------------------------------------------
218// TaskClaimStore — only for distributed workers
219// ---------------------------------------------------------------------------
220
221/// Task claiming for distributed multi-worker execution.
222///
223/// Only needed when using [`PooledWorker`](crate). Single-process backends
224/// (used with `CheckpointingRunner`) do not need to implement this.
225pub trait TaskClaimStore: Send + Sync {
226    /// Claim a task for execution by a worker node.
227    ///
228    /// Returns `Ok(Some(claim))` if successful, `Ok(None)` if already claimed.
229    fn claim_task(
230        &self,
231        instance_id: &str,
232        task_id: &str,
233        worker_id: &str,
234        ttl: Option<Duration>,
235    ) -> impl Future<Output = Result<Option<TaskClaim>, BackendError>> + Send;
236
237    /// Release a task claim.
238    fn release_task_claim(
239        &self,
240        instance_id: &str,
241        task_id: &str,
242        worker_id: &str,
243    ) -> impl Future<Output = Result<(), BackendError>> + Send;
244
245    /// Extend a task claim's expiration time.
246    fn extend_task_claim(
247        &self,
248        instance_id: &str,
249        task_id: &str,
250        worker_id: &str,
251        additional_duration: Duration,
252    ) -> impl Future<Output = Result<(), BackendError>> + Send;
253
254    /// Find available tasks across all workflow instances.
255    fn find_available_tasks(
256        &self,
257        worker_id: &str,
258        limit: usize,
259    ) -> impl Future<Output = Result<Vec<AvailableTask>, BackendError>> + Send;
260}
261
262// ---------------------------------------------------------------------------
263// PersistentBackend — supertrait + blanket impl
264// ---------------------------------------------------------------------------
265
266/// Supertrait combining [`SnapshotStore`] and [`SignalStore`].
267///
268/// This is the bound used by `CheckpointingRunner` and most of the runtime.
269/// It is blanket-implemented for any type that implements both sub-traits,
270/// so backends never need to implement it directly.
271pub trait PersistentBackend: SnapshotStore + SignalStore {}
272
273impl<T: SnapshotStore + SignalStore> PersistentBackend for T {}
274
275// Re-export Future so the trait method return types resolve.
276use std::future::Future;