Skip to main content

sayiir_persistence/
backend.rs

//! Persistent backend traits for storing and retrieving workflow snapshots.
//!
//! The trait hierarchy is decomposed into focused sub-traits:
//!
//! - [`SnapshotStore`]: Core CRUD for workflow snapshots (5 methods).
//! - [`SignalStore`]: Cancel + pause signal primitives with default composite
//!   implementations (3 required + 3 default methods). The default composites are
//!   not atomic; backends may override them to make the transitions atomic.
//! - [`TaskClaimStore`]: Distributed task claiming (4 methods, opt-in).
//! - [`PersistentBackend`]: Supertrait = `SnapshotStore + SignalStore`, blanket-implemented.
//!
//! A minimal backend only needs to implement `SnapshotStore` + 3 `SignalStore` primitives
//! (8 methods total) to satisfy `PersistentBackend`.
13
14use chrono::Duration;
15use sayiir_core::snapshot::{
16    PauseRequest, SignalKind, SignalRequest, WorkflowSnapshot, WorkflowSnapshotState,
17};
18use sayiir_core::task_claim::{AvailableTask, TaskClaim};
19
20/// Error type for backend operations.
21#[derive(Debug, thiserror::Error)]
22pub enum BackendError {
23    /// Snapshot not found.
24    #[error("Snapshot not found: {0}")]
25    NotFound(String),
26    /// Serialization/deserialization error.
27    #[error("Serialization error: {0}")]
28    Serialization(String),
29    /// Backend-specific error.
30    #[error("Backend error: {0}")]
31    Backend(String),
32    /// Cannot cancel workflow in current state.
33    #[error("Cannot cancel workflow in state: {0}")]
34    CannotCancel(String),
35    /// Cannot pause workflow in current state.
36    #[error("Cannot pause workflow in state: {0}")]
37    CannotPause(String),
38}
39
40// ---------------------------------------------------------------------------
41// SnapshotStore — core CRUD, every backend implements this
42// ---------------------------------------------------------------------------
43
44/// Core snapshot CRUD operations.
45///
46/// Every persistent backend must implement these 5 methods.
47pub trait SnapshotStore: Send + Sync {
48    /// Save a workflow snapshot.
49    ///
50    /// If a snapshot with the same instance_id already exists, it should be overwritten.
51    fn save_snapshot(
52        &self,
53        snapshot: &WorkflowSnapshot,
54    ) -> impl Future<Output = Result<(), BackendError>> + Send;
55
56    /// Save a single task result atomically.
57    ///
58    /// This is more granular than `save_snapshot` and allows concurrent task
59    /// completions (e.g., in fork branches) without overwriting each other.
60    fn save_task_result(
61        &self,
62        instance_id: &str,
63        task_id: &str,
64        output: bytes::Bytes,
65    ) -> impl Future<Output = Result<(), BackendError>> + Send;
66
67    /// Load a workflow snapshot by instance ID.
68    fn load_snapshot(
69        &self,
70        instance_id: &str,
71    ) -> impl Future<Output = Result<WorkflowSnapshot, BackendError>> + Send;
72
73    /// Delete a workflow snapshot.
74    fn delete_snapshot(
75        &self,
76        instance_id: &str,
77    ) -> impl Future<Output = Result<(), BackendError>> + Send;
78
79    /// List all snapshot instance IDs.
80    fn list_snapshots(&self) -> impl Future<Output = Result<Vec<String>, BackendError>> + Send;
81}
82
83// ---------------------------------------------------------------------------
84// SignalStore — cancel + pause via SignalKind
85// ---------------------------------------------------------------------------
86
87/// Signal storage for cancel and pause workflows.
88///
89/// Backends implement the 3 primitives (`store_signal`, `get_signal`,
90/// `clear_signal`). The 3 composite methods (`check_and_cancel`,
91/// `check_and_pause`, `unpause`) have default implementations built from
92/// the primitives + `SnapshotStore`. Backends may override them for atomicity.
93pub trait SignalStore: SnapshotStore {
94    // --- 3 primitives (backend must implement) ---
95
96    /// Store a signal (cancel or pause) for a workflow instance.
97    fn store_signal(
98        &self,
99        instance_id: &str,
100        kind: SignalKind,
101        request: SignalRequest,
102    ) -> impl Future<Output = Result<(), BackendError>> + Send;
103
104    /// Get the pending signal of the given kind, if any.
105    fn get_signal(
106        &self,
107        instance_id: &str,
108        kind: SignalKind,
109    ) -> impl Future<Output = Result<Option<SignalRequest>, BackendError>> + Send;
110
111    /// Clear the signal of the given kind.
112    fn clear_signal(
113        &self,
114        instance_id: &str,
115        kind: SignalKind,
116    ) -> impl Future<Output = Result<(), BackendError>> + Send;
117
118    // --- 3 composites with default impls (overridable for atomicity) ---
119
120    /// Atomically check for cancellation and transition to cancelled state.
121    ///
122    /// Returns `true` if the workflow was cancelled, `false` if no cancellation
123    /// was pending.
124    fn check_and_cancel(
125        &self,
126        instance_id: &str,
127        interrupted_at_task: Option<&str>,
128    ) -> impl Future<Output = Result<bool, BackendError>> + Send {
129        async move {
130            let Some(request) = self.get_signal(instance_id, SignalKind::Cancel).await? else {
131                return Ok(false);
132            };
133            let mut snapshot = self.load_snapshot(instance_id).await?;
134            if !snapshot.state.is_in_progress() {
135                return Ok(false);
136            }
137            snapshot.mark_cancelled(
138                request.reason,
139                request.requested_by,
140                interrupted_at_task.map(String::from),
141            );
142            self.save_snapshot(&snapshot).await?;
143            self.clear_signal(instance_id, SignalKind::Cancel).await?;
144            Ok(true)
145        }
146    }
147
148    /// Atomically check for a pause request and transition to paused state.
149    ///
150    /// Returns `true` if the workflow was paused, `false` if no pause was pending.
151    fn check_and_pause(
152        &self,
153        instance_id: &str,
154    ) -> impl Future<Output = Result<bool, BackendError>> + Send {
155        async move {
156            let Some(request) = self.get_signal(instance_id, SignalKind::Pause).await? else {
157                return Ok(false);
158            };
159            let mut snapshot = self.load_snapshot(instance_id).await?;
160            if !snapshot.state.is_in_progress() {
161                return Ok(false);
162            }
163            let pause_request: PauseRequest = request.into();
164            snapshot.mark_paused(&pause_request);
165            self.save_snapshot(&snapshot).await?;
166            self.clear_signal(instance_id, SignalKind::Pause).await?;
167            Ok(true)
168        }
169    }
170
171    /// Transition a paused workflow back to in-progress and return the updated snapshot.
172    fn unpause(
173        &self,
174        instance_id: &str,
175    ) -> impl Future<Output = Result<WorkflowSnapshot, BackendError>> + Send {
176        async move {
177            let mut snapshot = self.load_snapshot(instance_id).await?;
178            if !snapshot.state.is_paused() {
179                let state_name = match &snapshot.state {
180                    WorkflowSnapshotState::InProgress { .. } => "InProgress",
181                    WorkflowSnapshotState::Completed { .. } => "Completed",
182                    WorkflowSnapshotState::Failed { .. } => "Failed",
183                    WorkflowSnapshotState::Cancelled { .. } => "Cancelled",
184                    WorkflowSnapshotState::Paused { .. } => "Paused",
185                };
186                return Err(BackendError::CannotPause(format!(
187                    "Workflow is not paused (current state: {state_name:?})"
188                )));
189            }
190            snapshot.mark_unpaused();
191            self.save_snapshot(&snapshot).await?;
192            Ok(snapshot)
193        }
194    }
195}
196
197// ---------------------------------------------------------------------------
198// TaskClaimStore — only for distributed workers
199// ---------------------------------------------------------------------------
200
201/// Task claiming for distributed multi-worker execution.
202///
203/// Only needed when using [`PooledWorker`](crate). Single-process backends
204/// (used with `CheckpointingRunner`) do not need to implement this.
205pub trait TaskClaimStore: Send + Sync {
206    /// Claim a task for execution by a worker node.
207    ///
208    /// Returns `Ok(Some(claim))` if successful, `Ok(None)` if already claimed.
209    fn claim_task(
210        &self,
211        instance_id: &str,
212        task_id: &str,
213        worker_id: &str,
214        ttl: Option<Duration>,
215    ) -> impl Future<Output = Result<Option<TaskClaim>, BackendError>> + Send;
216
217    /// Release a task claim.
218    fn release_task_claim(
219        &self,
220        instance_id: &str,
221        task_id: &str,
222        worker_id: &str,
223    ) -> impl Future<Output = Result<(), BackendError>> + Send;
224
225    /// Extend a task claim's expiration time.
226    fn extend_task_claim(
227        &self,
228        instance_id: &str,
229        task_id: &str,
230        worker_id: &str,
231        additional_duration: Duration,
232    ) -> impl Future<Output = Result<(), BackendError>> + Send;
233
234    /// Find available tasks across all workflow instances.
235    fn find_available_tasks(
236        &self,
237        worker_id: &str,
238        limit: usize,
239    ) -> impl Future<Output = Result<Vec<AvailableTask>, BackendError>> + Send;
240}
241
242// ---------------------------------------------------------------------------
243// PersistentBackend — supertrait + blanket impl
244// ---------------------------------------------------------------------------
245
246/// Supertrait combining [`SnapshotStore`] and [`SignalStore`].
247///
248/// This is the bound used by `CheckpointingRunner` and most of the runtime.
249/// It is blanket-implemented for any type that implements both sub-traits,
250/// so backends never need to implement it directly.
251pub trait PersistentBackend: SnapshotStore + SignalStore {}
252
253impl<T: SnapshotStore + SignalStore> PersistentBackend for T {}
254
// Bring `Future` into scope (a private `use`, not a re-export) so the trait method return types resolve.
256use std::future::Future;