1#![cfg(feature = "cqrs")]
8
9use std::{collections::HashMap, sync::Arc};
10
11use serde_json;
12
13#[async_trait::async_trait]
15pub trait Saga: Send + Sync {
16 fn saga_type(&self) -> &'static str;
18
19 fn steps(&self) -> Vec<Arc<dyn SagaStep>>;
21
22 fn initial_data(&self) -> serde_json::Value;
24
25 fn user_id(&self) -> &str;
27}
28
29#[async_trait::async_trait]
31pub trait SagaStep: Send + Sync {
32 async fn execute(&self, ctx: &SagaContext) -> StepExecutionResult;
34
35 async fn compensate(&self, ctx: &SagaContext) -> CompensationResult;
37
38 fn name(&self) -> &str;
40
41 fn timeout_seconds(&self) -> u64 {
43 30 }
45
46 fn requires_compensation(&self) -> bool {
48 true }
50}
51
52#[derive(Debug, Clone)]
54pub struct SagaContext {
55 pub saga_id: String,
57 pub step_outputs: HashMap<String, serde_json::Value>,
59 pub metadata: HashMap<String, serde_json::Value>,
61}
62
63impl SagaContext {
64 pub fn new(saga_id: String) -> Self {
66 Self {
67 saga_id,
68 step_outputs: HashMap::new(),
69 metadata: HashMap::new(),
70 }
71 }
72
73 pub fn set_step_output(&mut self, step_name: &str, output: serde_json::Value) {
75 self.step_outputs.insert(step_name.to_string(), output);
76 }
77
78 pub fn get_step_output(&self, step_name: &str) -> Option<&serde_json::Value> {
80 self.step_outputs.get(step_name)
81 }
82
83 pub fn set_metadata(&mut self, key: &str, value: serde_json::Value) {
85 self.metadata.insert(key.to_string(), value);
86 }
87
88 pub fn get_metadata(&self, key: &str) -> Option<&serde_json::Value> {
90 self.metadata.get(key)
91 }
92}
93
94#[derive(Debug, Clone)]
96pub enum StepExecutionResult {
97 Success {
99 output: Option<serde_json::Value>,
101 },
102 Failure {
104 error: String,
106 },
107}
108
109impl StepExecutionResult {
110 pub fn success() -> Self {
112 Self::Success { output: None }
113 }
114
115 pub fn success_with_output(output: serde_json::Value) -> Self {
117 Self::Success {
118 output: Some(output),
119 }
120 }
121
122 pub fn failure(error: String) -> Self {
124 Self::Failure { error }
125 }
126
127 pub fn is_success(&self) -> bool {
129 matches!(self, Self::Success { .. })
130 }
131
132 pub fn output(&self) -> Option<&serde_json::Value> {
134 match self {
135 Self::Success { output } => output.as_ref(),
136 Self::Failure { .. } => None,
137 }
138 }
139
140 pub fn error(&self) -> Option<&str> {
142 match self {
143 Self::Success { .. } => None,
144 Self::Failure { error } => Some(error),
145 }
146 }
147}
148
149#[derive(Debug, Clone)]
151pub enum CompensationResult {
152 Success,
154 Failure {
156 error: String,
158 },
159 NotNeeded,
161}
162
163impl CompensationResult {
164 pub fn success() -> Self {
166 Self::Success
167 }
168
169 pub fn failure(error: String) -> Self {
171 Self::Failure { error }
172 }
173
174 pub fn not_needed() -> Self {
176 Self::NotNeeded
177 }
178
179 pub fn is_success(&self) -> bool {
181 matches!(self, Self::Success)
182 }
183
184 pub fn is_not_needed(&self) -> bool {
186 matches!(self, Self::NotNeeded)
187 }
188}
189
190pub trait StepOutput: serde::de::DeserializeOwned + serde::Serialize {
192 fn from_context(ctx: &SagaContext, step_name: &str) -> Result<Self, SagaError>;
194}
195
196#[derive(Debug, Clone)]
198pub enum SagaError {
199 StepOutputNotFound {
201 step_name: String,
203 },
204 StepOutputParse {
206 step_name: String,
208 error: String,
210 },
211 StepExecutionFailed {
213 step_name: String,
215 error: String,
217 },
218 CompensationFailed {
220 step_name: String,
222 error: String,
224 },
225 SagaNotFound {
227 saga_id: String,
229 },
230 InvalidState {
232 saga_id: String,
234 message: String,
236 },
237}
238
239impl std::fmt::Display for SagaError {
240 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
241 match self {
242 SagaError::StepOutputNotFound { step_name } => {
243 write!(f, "Step output not found for step: {}", step_name)
244 }
245 SagaError::StepOutputParse { step_name, error } => {
246 write!(
247 f,
248 "Failed to parse output for step {}: {}",
249 step_name, error
250 )
251 }
252 SagaError::StepExecutionFailed { step_name, error } => {
253 write!(f, "Step {} execution failed: {}", step_name, error)
254 }
255 SagaError::CompensationFailed { step_name, error } => {
256 write!(f, "Compensation failed for step {}: {}", step_name, error)
257 }
258 SagaError::SagaNotFound { saga_id } => {
259 write!(f, "Saga not found: {}", saga_id)
260 }
261 SagaError::InvalidState { saga_id, message } => {
262 write!(f, "Invalid saga state for {}: {}", saga_id, message)
263 }
264 }
265 }
266}
267
268impl std::error::Error for SagaError {}
269
270impl<E> From<SagaError> for Result<E, SagaError> {
271 fn from(error: SagaError) -> Self {
272 Err(error)
273 }
274}
275
276pub struct MacroSagaOrchestrator {
278 }
280
281impl Default for MacroSagaOrchestrator {
282 fn default() -> Self {
283 Self::new()
284 }
285}
286
287impl MacroSagaOrchestrator {
288 pub fn new() -> Self {
290 Self {}
291 }
292
293 pub async fn execute(&self, _saga: Arc<dyn Saga>) -> Result<(), SagaError> {
295 Ok(())
298 }
299}
300
301#[derive(Debug, Clone)]
307pub struct FileSnapshot {
308 pub path: std::path::PathBuf,
310 pub content: Vec<u8>,
312}
313
314impl FileSnapshot {
315 pub async fn capture(path: &std::path::Path) -> Result<Self, String> {
317 let path_buf = path.to_path_buf();
318 let content = tokio::fs::read(&path_buf)
319 .await
320 .map_err(|e| format!("FileSnapshot capture: {}", e))?;
321 Ok(Self {
322 path: path_buf,
323 content,
324 })
325 }
326
327 pub async fn restore(&self) -> Result<(), String> {
329 tokio::fs::write(&self.path, &self.content)
330 .await
331 .map_err(|e| format!("FileSnapshot restore: {}", e))
332 }
333}
334
335#[derive(Debug, Clone, PartialEq, Eq)]
337pub enum CompensationStrategy {
338 LocalRollback,
340}
341
342pub struct WriteFileStep {
344 pub path: std::path::PathBuf,
346 pub content: String,
348 snapshot: tokio::sync::Mutex<Option<FileSnapshot>>,
350}
351
352impl WriteFileStep {
353 pub fn new(path: std::path::PathBuf, content: String) -> Self {
355 Self {
356 path,
357 content,
358 snapshot: tokio::sync::Mutex::new(None),
359 }
360 }
361}
362
363#[async_trait::async_trait]
364impl<E: super::Event> super::saga_orchestrator::SagaStep<E> for WriteFileStep {
365 async fn execute(&self) -> Result<Vec<E>, String> {
366 if self.path.exists() {
368 let snap = FileSnapshot::capture(&self.path).await?;
369 *self.snapshot.lock().await = Some(snap);
370 }
371 tokio::fs::write(&self.path, &self.content)
372 .await
373 .map_err(|e| format!("WriteFileStep: {}", e))?;
374 Ok(vec![])
375 }
376
377 async fn compensate(&self) -> Result<Vec<E>, String> {
378 if let Some(snap) = self.snapshot.lock().await.as_ref() {
379 snap.restore().await?;
380 }
381 Ok(vec![])
382 }
383
384 fn name(&self) -> &str {
385 "WriteFileStep"
386 }
387}
388
389#[cfg(feature = "cqrs-sqlite")]
394pub struct SqliteSavepoint<'conn> {
395 conn: &'conn rusqlite::Connection,
396 name: String,
397}
398
399#[cfg(feature = "cqrs-sqlite")]
400impl<'conn> SqliteSavepoint<'conn> {
401 pub fn create(conn: &'conn rusqlite::Connection, name: &str) -> Result<Self, String> {
403 conn.execute_batch(&format!("SAVEPOINT {}", name))
404 .map_err(|e| format!("Savepoint create: {}", e))?;
405 Ok(Self {
406 conn,
407 name: name.to_string(),
408 })
409 }
410
411 pub fn rollback(&self) -> Result<(), String> {
413 self.conn
414 .execute_batch(&format!("ROLLBACK TO SAVEPOINT {}", self.name))
415 .map_err(|e| format!("Savepoint rollback: {}", e))
416 }
417}