Skip to main content

allframe_core/cqrs/
saga.rs

1//! Saga pattern implementation with macro support
2//!
3//! This module provides the trait definitions and types used by the saga macros
4//! in allframe-macros. It provides a higher-level, macro-driven approach
5//! compared to the lower-level saga_orchestrator.
6
7#![cfg(feature = "cqrs")]
8
9use std::{collections::HashMap, sync::Arc};
10
11use serde_json;
12
13/// A saga that coordinates multiple steps with automatic compensation
14#[async_trait::async_trait]
15pub trait Saga: Send + Sync {
16    /// Get the saga type name for identification
17    fn saga_type(&self) -> &'static str;
18
19    /// Get all steps in execution order
20    fn steps(&self) -> Vec<Arc<dyn SagaStep>>;
21
22    /// Get the initial saga data as JSON
23    fn initial_data(&self) -> serde_json::Value;
24
25    /// Get the user ID associated with this saga
26    fn user_id(&self) -> &str;
27}
28
29/// A single step in a saga
30#[async_trait::async_trait]
31pub trait SagaStep: Send + Sync {
32    /// Execute the step
33    async fn execute(&self, ctx: &SagaContext) -> StepExecutionResult;
34
35    /// Compensate for this step (rollback)
36    async fn compensate(&self, ctx: &SagaContext) -> CompensationResult;
37
38    /// Get the step name for logging/debugging
39    fn name(&self) -> &str;
40
41    /// Get the timeout for this step in seconds
42    fn timeout_seconds(&self) -> u64 {
43        30 // Default 30 seconds
44    }
45
46    /// Whether this step requires compensation on failure
47    fn requires_compensation(&self) -> bool {
48        true // Default to requiring compensation
49    }
50}
51
52/// Context passed to saga steps during execution
53#[derive(Debug, Clone)]
54pub struct SagaContext {
55    /// Unique saga instance ID
56    pub saga_id: String,
57    /// Step outputs from previously executed steps
58    pub step_outputs: HashMap<String, serde_json::Value>,
59    /// Saga metadata
60    pub metadata: HashMap<String, serde_json::Value>,
61}
62
63impl SagaContext {
64    /// Create a new saga context
65    pub fn new(saga_id: String) -> Self {
66        Self {
67            saga_id,
68            step_outputs: HashMap::new(),
69            metadata: HashMap::new(),
70        }
71    }
72
73    /// Store output from a completed step
74    pub fn set_step_output(&mut self, step_name: &str, output: serde_json::Value) {
75        self.step_outputs.insert(step_name.to_string(), output);
76    }
77
78    /// Get output from a previously executed step
79    pub fn get_step_output(&self, step_name: &str) -> Option<&serde_json::Value> {
80        self.step_outputs.get(step_name)
81    }
82
83    /// Store arbitrary metadata
84    pub fn set_metadata(&mut self, key: &str, value: serde_json::Value) {
85        self.metadata.insert(key.to_string(), value);
86    }
87
88    /// Get metadata value
89    pub fn get_metadata(&self, key: &str) -> Option<&serde_json::Value> {
90        self.metadata.get(key)
91    }
92}
93
94/// Result of a step execution
95#[derive(Debug, Clone)]
96pub enum StepExecutionResult {
97    /// Step completed successfully
98    Success {
99        /// Optional output from the step
100        output: Option<serde_json::Value>,
101    },
102    /// Step failed
103    Failure {
104        /// Error message describing the failure
105        error: String,
106    },
107}
108
109impl StepExecutionResult {
110    /// Create a success result with no output
111    pub fn success() -> Self {
112        Self::Success { output: None }
113    }
114
115    /// Create a success result with output
116    pub fn success_with_output(output: serde_json::Value) -> Self {
117        Self::Success {
118            output: Some(output),
119        }
120    }
121
122    /// Create a failure result
123    pub fn failure(error: String) -> Self {
124        Self::Failure { error }
125    }
126
127    /// Check if the result is successful
128    pub fn is_success(&self) -> bool {
129        matches!(self, Self::Success { .. })
130    }
131
132    /// Get the output if successful
133    pub fn output(&self) -> Option<&serde_json::Value> {
134        match self {
135            Self::Success { output } => output.as_ref(),
136            Self::Failure { .. } => None,
137        }
138    }
139
140    /// Get the error if failed
141    pub fn error(&self) -> Option<&str> {
142        match self {
143            Self::Success { .. } => None,
144            Self::Failure { error } => Some(error),
145        }
146    }
147}
148
149/// Result of a compensation operation
150#[derive(Debug, Clone)]
151pub enum CompensationResult {
152    /// Compensation completed successfully
153    Success,
154    /// Compensation failed
155    Failure {
156        /// Error message describing the failure
157        error: String,
158    },
159    /// No compensation needed
160    NotNeeded,
161}
162
163impl CompensationResult {
164    /// Create a success result
165    pub fn success() -> Self {
166        Self::Success
167    }
168
169    /// Create a failure result
170    pub fn failure(error: String) -> Self {
171        Self::Failure { error }
172    }
173
174    /// Create a not needed result
175    pub fn not_needed() -> Self {
176        Self::NotNeeded
177    }
178
179    /// Check if compensation was successful
180    pub fn is_success(&self) -> bool {
181        matches!(self, Self::Success)
182    }
183
184    /// Check if compensation is not needed
185    pub fn is_not_needed(&self) -> bool {
186        matches!(self, Self::NotNeeded)
187    }
188}
189
190/// Trait for types that can be used as step outputs
191pub trait StepOutput: serde::de::DeserializeOwned + serde::Serialize {
192    /// Extract this type from saga context
193    fn from_context(ctx: &SagaContext, step_name: &str) -> Result<Self, SagaError>;
194}
195
196/// Errors that can occur during saga operations
197#[derive(Debug, Clone)]
198pub enum SagaError {
199    /// Step output not found
200    StepOutputNotFound {
201        /// Name of the step whose output was not found
202        step_name: String,
203    },
204    /// Failed to parse step output
205    StepOutputParse {
206        /// Name of the step whose output failed to parse
207        step_name: String,
208        /// Parse error message
209        error: String,
210    },
211    /// Step execution failed
212    StepExecutionFailed {
213        /// Name of the step that failed
214        step_name: String,
215        /// Execution error message
216        error: String,
217    },
218    /// Compensation failed
219    CompensationFailed {
220        /// Name of the step whose compensation failed
221        step_name: String,
222        /// Compensation error message
223        error: String,
224    },
225    /// Saga not found
226    SagaNotFound {
227        /// ID of the saga that was not found
228        saga_id: String,
229    },
230    /// Invalid saga state
231    InvalidState {
232        /// ID of the saga with invalid state
233        saga_id: String,
234        /// Description of the invalid state
235        message: String,
236    },
237}
238
239impl std::fmt::Display for SagaError {
240    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
241        match self {
242            SagaError::StepOutputNotFound { step_name } => {
243                write!(f, "Step output not found for step: {}", step_name)
244            }
245            SagaError::StepOutputParse { step_name, error } => {
246                write!(
247                    f,
248                    "Failed to parse output for step {}: {}",
249                    step_name, error
250                )
251            }
252            SagaError::StepExecutionFailed { step_name, error } => {
253                write!(f, "Step {} execution failed: {}", step_name, error)
254            }
255            SagaError::CompensationFailed { step_name, error } => {
256                write!(f, "Compensation failed for step {}: {}", step_name, error)
257            }
258            SagaError::SagaNotFound { saga_id } => {
259                write!(f, "Saga not found: {}", saga_id)
260            }
261            SagaError::InvalidState { saga_id, message } => {
262                write!(f, "Invalid saga state for {}: {}", saga_id, message)
263            }
264        }
265    }
266}
267
268impl std::error::Error for SagaError {}
269
270impl<E> From<SagaError> for Result<E, SagaError> {
271    fn from(error: SagaError) -> Self {
272        Err(error)
273    }
274}
275
276/// Saga orchestrator for executing macro-generated sagas
277pub struct MacroSagaOrchestrator {
278    // Future: Add persistence, monitoring, etc.
279}
280
281impl Default for MacroSagaOrchestrator {
282    fn default() -> Self {
283        Self::new()
284    }
285}
286
287impl MacroSagaOrchestrator {
288    /// Create a new orchestrator
289    pub fn new() -> Self {
290        Self {}
291    }
292
293    /// Execute a saga (placeholder implementation)
294    pub async fn execute(&self, _saga: Arc<dyn Saga>) -> Result<(), SagaError> {
295        // TODO: Implement saga execution logic
296        // This is a placeholder until the full orchestrator is implemented
297        Ok(())
298    }
299}
300
301// ============================================================================
302// UC-036.7: Saga Compensation Primitives
303// ============================================================================
304
305/// A snapshot of a file's contents, used for rollback compensation.
306#[derive(Debug, Clone)]
307pub struct FileSnapshot {
308    /// Path to the file
309    pub path: std::path::PathBuf,
310    /// Original content
311    pub content: Vec<u8>,
312}
313
314impl FileSnapshot {
315    /// Capture the current content of a file.
316    pub async fn capture(path: &std::path::Path) -> Result<Self, String> {
317        let path_buf = path.to_path_buf();
318        let content = tokio::fs::read(&path_buf)
319            .await
320            .map_err(|e| format!("FileSnapshot capture: {}", e))?;
321        Ok(Self {
322            path: path_buf,
323            content,
324        })
325    }
326
327    /// Restore the file to its captured content.
328    pub async fn restore(&self) -> Result<(), String> {
329        tokio::fs::write(&self.path, &self.content)
330            .await
331            .map_err(|e| format!("FileSnapshot restore: {}", e))
332    }
333}
334
335/// Strategy for compensating saga steps on failure.
336#[derive(Debug, Clone, PartialEq, Eq)]
337pub enum CompensationStrategy {
338    /// Roll back using local file snapshots / savepoints.
339    LocalRollback,
340}
341
342/// A saga step that writes content to a file, with automatic snapshot for compensation.
343pub struct WriteFileStep {
344    /// Path to write to.
345    pub path: std::path::PathBuf,
346    /// Content to write.
347    pub content: String,
348    /// Internal snapshot taken before execution.
349    snapshot: tokio::sync::Mutex<Option<FileSnapshot>>,
350}
351
352impl WriteFileStep {
353    /// Create a new WriteFileStep (convenience for tests that set path/content directly).
354    pub fn new(path: std::path::PathBuf, content: String) -> Self {
355        Self {
356            path,
357            content,
358            snapshot: tokio::sync::Mutex::new(None),
359        }
360    }
361}
362
363#[async_trait::async_trait]
364impl<E: super::Event> super::saga_orchestrator::SagaStep<E> for WriteFileStep {
365    async fn execute(&self) -> Result<Vec<E>, String> {
366        // Capture snapshot before writing
367        if self.path.exists() {
368            let snap = FileSnapshot::capture(&self.path).await?;
369            *self.snapshot.lock().await = Some(snap);
370        }
371        tokio::fs::write(&self.path, &self.content)
372            .await
373            .map_err(|e| format!("WriteFileStep: {}", e))?;
374        Ok(vec![])
375    }
376
377    async fn compensate(&self) -> Result<Vec<E>, String> {
378        if let Some(snap) = self.snapshot.lock().await.as_ref() {
379            snap.restore().await?;
380        }
381        Ok(vec![])
382    }
383
384    fn name(&self) -> &str {
385        "WriteFileStep"
386    }
387}
388
389/// SQLite savepoint for transactional rollback within a saga step.
390///
391/// Holds a reference to the connection and the savepoint name.
392/// The caller must ensure the `SqliteSavepoint` does not outlive the connection.
393#[cfg(feature = "cqrs-sqlite")]
394pub struct SqliteSavepoint<'conn> {
395    conn: &'conn rusqlite::Connection,
396    name: String,
397}
398
399#[cfg(feature = "cqrs-sqlite")]
400impl<'conn> SqliteSavepoint<'conn> {
401    /// Create a savepoint on the given connection.
402    pub fn create(conn: &'conn rusqlite::Connection, name: &str) -> Result<Self, String> {
403        conn.execute_batch(&format!("SAVEPOINT {}", name))
404            .map_err(|e| format!("Savepoint create: {}", e))?;
405        Ok(Self {
406            conn,
407            name: name.to_string(),
408        })
409    }
410
411    /// Rollback to this savepoint.
412    pub fn rollback(&self) -> Result<(), String> {
413        self.conn
414            .execute_batch(&format!("ROLLBACK TO SAVEPOINT {}", self.name))
415            .map_err(|e| format!("Savepoint rollback: {}", e))
416    }
417}