#![cfg(feature = "cqrs")]
use std::{collections::HashMap, sync::Arc};
use serde_json;
/// Describes a saga: its type tag, its steps, its starting data and an associated user.
///
/// Implementors must be `Send + Sync`.
#[async_trait::async_trait]
pub trait Saga: Send + Sync {
    /// Returns a static string identifying this kind of saga.
    fn saga_type(&self) -> &'static str;

    /// Returns the saga's steps as shared trait objects.
    ///
    /// NOTE(review): presumably listed in execution order — confirm against the orchestrator.
    fn steps(&self) -> Vec<Arc<dyn SagaStep>>;

    /// Returns the JSON value the saga starts from.
    fn initial_data(&self) -> serde_json::Value;

    /// Returns the user identifier tied to this saga.
    fn user_id(&self) -> &str;
}
/// One unit of saga work: a forward action plus a compensating action.
///
/// Implementors must be `Send + Sync`.
#[async_trait::async_trait]
pub trait SagaStep: Send + Sync {
    /// Runs the step against `ctx` and reports the outcome.
    async fn execute(&self, ctx: &SagaContext) -> StepExecutionResult;

    /// Runs the step's compensating action against `ctx` and reports the outcome.
    async fn compensate(&self, ctx: &SagaContext) -> CompensationResult;

    /// Returns the step's name.
    fn name(&self) -> &str;

    /// Returns the step timeout in seconds; the default is 30.
    ///
    /// NOTE(review): nothing visible here enforces the timeout — confirm where it is applied.
    fn timeout_seconds(&self) -> u64 {
        const DEFAULT_TIMEOUT_SECONDS: u64 = 30;
        DEFAULT_TIMEOUT_SECONDS
    }

    /// Reports whether the step requires compensation; the default is `true`.
    fn requires_compensation(&self) -> bool {
        const DEFAULT_REQUIRES_COMPENSATION: bool = true;
        DEFAULT_REQUIRES_COMPENSATION
    }
}
/// Mutable state passed to saga steps: the saga id plus two JSON key/value stores.
#[derive(Debug, Clone)]
pub struct SagaContext {
    /// Identifier of the saga this context belongs to.
    pub saga_id: String,

    /// JSON outputs recorded by steps, keyed by step name.
    pub step_outputs: HashMap<String, serde_json::Value>,

    /// Free-form JSON metadata, keyed by string.
    pub metadata: HashMap<String, serde_json::Value>,
}
impl SagaContext {
    /// Builds a context for `saga_id` with empty step outputs and empty metadata.
    pub fn new(saga_id: String) -> Self {
        let step_outputs = HashMap::new();
        let metadata = HashMap::new();
        Self {
            saga_id,
            step_outputs,
            metadata,
        }
    }

    /// Records `output` under `step_name`, replacing any value already stored for that name.
    pub fn set_step_output(&mut self, step_name: &str, output: serde_json::Value) {
        let key = String::from(step_name);
        self.step_outputs.insert(key, output);
    }

    /// Looks up the output recorded for `step_name`; `None` when nothing is stored.
    pub fn get_step_output(&self, step_name: &str) -> Option<&serde_json::Value> {
        let outputs = &self.step_outputs;
        outputs.get(step_name)
    }

    /// Stores `value` under `key` in the metadata map, replacing any earlier value.
    pub fn set_metadata(&mut self, key: &str, value: serde_json::Value) {
        let owned_key = String::from(key);
        self.metadata.insert(owned_key, value);
    }

    /// Looks up the metadata stored under `key`; `None` when nothing is stored.
    pub fn get_metadata(&self, key: &str) -> Option<&serde_json::Value> {
        let entries = &self.metadata;
        entries.get(key)
    }
}
/// Outcome of running a saga step.
#[derive(Debug, Clone)]
pub enum StepExecutionResult {
    /// The step succeeded, optionally producing a JSON output.
    Success {
        /// Output produced by the step, if any.
        output: Option<serde_json::Value>,
    },

    /// The step failed.
    Failure {
        /// Description of the failure.
        error: String,
    },
}
impl StepExecutionResult {
    /// Builds a success result that carries no output.
    pub fn success() -> Self {
        let output = None;
        Self::Success { output }
    }

    /// Builds a success result that carries `output`.
    pub fn success_with_output(output: serde_json::Value) -> Self {
        let output = Some(output);
        Self::Success { output }
    }

    /// Builds a failure result carrying `error`.
    pub fn failure(error: String) -> Self {
        Self::Failure { error }
    }

    /// Returns `true` for the `Success` variant and `false` for `Failure`.
    pub fn is_success(&self) -> bool {
        match self {
            Self::Success { .. } => true,
            Self::Failure { .. } => false,
        }
    }

    /// Returns the output of a successful result; `None` for failures and output-less successes.
    pub fn output(&self) -> Option<&serde_json::Value> {
        if let Self::Success { output } = self {
            output.as_ref()
        } else {
            None
        }
    }

    /// Returns the error message of a failed result; `None` for successes.
    pub fn error(&self) -> Option<&str> {
        if let Self::Failure { error } = self {
            Some(error.as_str())
        } else {
            None
        }
    }
}
/// Outcome of running a step's compensating action.
#[derive(Debug, Clone)]
pub enum CompensationResult {
    /// Compensation ran and succeeded.
    Success,

    /// Compensation failed.
    Failure {
        /// Description of the failure.
        error: String,
    },

    /// No compensation was needed.
    NotNeeded,
}

impl CompensationResult {
    /// Builds the `Success` variant.
    pub fn success() -> Self {
        CompensationResult::Success
    }

    /// Builds the `Failure` variant carrying `error`.
    pub fn failure(error: String) -> Self {
        CompensationResult::Failure { error }
    }

    /// Builds the `NotNeeded` variant.
    pub fn not_needed() -> Self {
        CompensationResult::NotNeeded
    }

    /// Returns `true` only for `Success`; `NotNeeded` does not count as success here.
    pub fn is_success(&self) -> bool {
        match self {
            Self::Success => true,
            Self::Failure { .. } | Self::NotNeeded => false,
        }
    }

    /// Returns `true` only for `NotNeeded`.
    pub fn is_not_needed(&self) -> bool {
        match self {
            Self::NotNeeded => true,
            Self::Success | Self::Failure { .. } => false,
        }
    }
}
/// A serializable type that can be obtained from a [`SagaContext`] for a named step.
///
/// Implementors must also be `serde::Serialize` and `serde::de::DeserializeOwned`.
pub trait StepOutput: serde::de::DeserializeOwned + serde::Serialize {
    /// Produces a `Self` for `step_name` from `ctx`, or a [`SagaError`] on failure.
    ///
    /// NOTE(review): presumably reads the value stored under `step_name` in the context's step
    /// outputs and deserializes it — no implementation is visible here, confirm with implementors.
    fn from_context(ctx: &SagaContext, step_name: &str) -> Result<Self, SagaError>;
}
/// Errors reported by saga helpers.
#[derive(Debug, Clone)]
pub enum SagaError {
    /// No output is available for the named step.
    StepOutputNotFound {
        /// Name of the step whose output was requested.
        step_name: String,
    },

    /// The output of the named step could not be parsed.
    StepOutputParse {
        /// Name of the step whose output failed to parse.
        step_name: String,
        /// Description of the parse failure.
        error: String,
    },

    /// The named step failed while executing.
    StepExecutionFailed {
        /// Name of the failing step.
        step_name: String,
        /// Description of the failure.
        error: String,
    },

    /// Compensation of the named step failed.
    CompensationFailed {
        /// Name of the step whose compensation failed.
        step_name: String,
        /// Description of the failure.
        error: String,
    },

    /// No saga exists with the given id.
    SagaNotFound {
        /// Id that was looked up.
        saga_id: String,
    },

    /// The saga is in a state that does not allow the requested operation.
    InvalidState {
        /// Id of the affected saga.
        saga_id: String,
        /// Explanation of what is wrong with the state.
        message: String,
    },
}

impl std::fmt::Display for SagaError {
    /// Writes a one-line, human-readable message for each variant.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::StepOutputNotFound { step_name: step } => {
                write!(f, "Step output not found for step: {}", step)
            }
            Self::StepOutputParse {
                step_name: step,
                error: cause,
            } => write!(f, "Failed to parse output for step {}: {}", step, cause),
            Self::StepExecutionFailed {
                step_name: step,
                error: cause,
            } => write!(f, "Step {} execution failed: {}", step, cause),
            Self::CompensationFailed {
                step_name: step,
                error: cause,
            } => write!(f, "Compensation failed for step {}: {}", step, cause),
            Self::SagaNotFound { saga_id: id } => write!(f, "Saga not found: {}", id),
            Self::InvalidState {
                saga_id: id,
                message: detail,
            } => write!(f, "Invalid saga state for {}: {}", id, detail),
        }
    }
}

impl std::error::Error for SagaError {}

/// Lets a `SagaError` be turned into an `Err` of any `Result<_, SagaError>` via `.into()`.
impl<E> From<SagaError> for Result<E, SagaError> {
    fn from(error: SagaError) -> Self {
        Result::Err(error)
    }
}
/// Saga orchestrator handle; it currently holds no state.
pub struct MacroSagaOrchestrator {}

impl Default for MacroSagaOrchestrator {
    /// Equivalent to [`MacroSagaOrchestrator::new`].
    fn default() -> Self {
        MacroSagaOrchestrator::new()
    }
}

impl MacroSagaOrchestrator {
    /// Creates an orchestrator.
    pub fn new() -> Self {
        MacroSagaOrchestrator {}
    }

    /// Accepts a saga and returns `Ok(())`.
    ///
    /// NOTE(review): the argument is unused and no step is executed or compensated here, so this
    /// looks like a placeholder — confirm whether the real orchestration lives elsewhere.
    pub async fn execute(&self, _saga: Arc<dyn Saga>) -> Result<(), SagaError> {
        Result::Ok(())
    }
}
/// The bytes of a file together with the path they were read from.
#[derive(Debug, Clone)]
pub struct FileSnapshot {
    /// Path the content belongs to.
    pub path: std::path::PathBuf,

    /// Raw file content.
    pub content: Vec<u8>,
}
impl FileSnapshot {
    /// Reads the whole file at `path` and returns it as a snapshot.
    ///
    /// # Errors
    ///
    /// Returns `"FileSnapshot capture: <io error>"` when the file cannot be read.
    pub async fn capture(path: &std::path::Path) -> Result<Self, String> {
        let owned_path = std::path::PathBuf::from(path);
        match tokio::fs::read(&owned_path).await {
            Ok(content) => Ok(Self {
                path: owned_path,
                content,
            }),
            Err(e) => Err(format!("FileSnapshot capture: {}", e)),
        }
    }

    /// Writes the captured content back to the captured path.
    ///
    /// # Errors
    ///
    /// Returns `"FileSnapshot restore: <io error>"` when the write fails.
    pub async fn restore(&self) -> Result<(), String> {
        let written = tokio::fs::write(&self.path, &self.content).await;
        written.map_err(|e| format!("FileSnapshot restore: {}", e))
    }
}
/// Names a way of compensating a step.
///
/// NOTE(review): only one variant exists and no use of this enum is visible here — confirm
/// how callers select on it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CompensationStrategy {
    /// Compensate by rolling back locally.
    LocalRollback,
}
/// Saga step that writes text to a file and remembers the file's previous bytes.
pub struct WriteFileStep {
    /// File to write.
    pub path: std::path::PathBuf,

    /// Text written to `path` when the step executes.
    pub content: String,

    /// Snapshot of the file taken before it is overwritten; stays `None` until the step
    /// executes against a path that already exists.
    snapshot: tokio::sync::Mutex<Option<FileSnapshot>>,
}
impl WriteFileStep {
    /// Creates a step that will write `content` to `path`; no snapshot is held yet.
    pub fn new(path: std::path::PathBuf, content: String) -> Self {
        let snapshot = tokio::sync::Mutex::new(None);
        Self {
            path,
            content,
            snapshot,
        }
    }
}
#[async_trait::async_trait]
impl<E: super::Event> super::saga_orchestrator::SagaStep<E> for WriteFileStep {
    /// Snapshots the target file if it already exists, then overwrites it with `self.content`.
    ///
    /// Always yields an empty event list on success.
    ///
    /// # Errors
    ///
    /// Propagates the snapshot error from [`FileSnapshot::capture`], or returns
    /// `"WriteFileStep: <io error>"` when the write fails.
    async fn execute(&self) -> Result<Vec<E>, String> {
        // `Path::exists` performs a synchronous stat, which stalls the async executor thread.
        // It is defined as `fs::metadata(path).is_ok()`, so the async metadata call gives the
        // same answer (including `false` on permission errors and broken symlinks).
        let already_present = tokio::fs::metadata(&self.path).await.is_ok();
        if already_present {
            let snap = FileSnapshot::capture(&self.path).await?;
            *self.snapshot.lock().await = Some(snap);
        }

        tokio::fs::write(&self.path, &self.content)
            .await
            .map_err(|e| format!("WriteFileStep: {}", e))?;
        Ok(Vec::new())
    }

    /// Restores the snapshot taken by `execute`, if one was taken.
    ///
    /// Always yields an empty event list on success.
    ///
    /// NOTE(review): when the file did not exist before `execute`, no snapshot is stored and the
    /// newly created file is left in place. Removing it would need an extra "was absent" state
    /// on the struct — confirm the intended rollback semantics.
    ///
    /// # Errors
    ///
    /// Propagates the error from [`FileSnapshot::restore`].
    async fn compensate(&self) -> Result<Vec<E>, String> {
        // The guard is held for the duration of the restore, as before.
        let guard = self.snapshot.lock().await;
        if let Some(snap) = guard.as_ref() {
            snap.restore().await?;
        }
        Ok(Vec::new())
    }

    /// Returns the fixed step name `"WriteFileStep"`.
    fn name(&self) -> &str {
        "WriteFileStep"
    }
}
/// A named SQLite savepoint bound to a borrowed connection.
#[cfg(feature = "cqrs-sqlite")]
pub struct SqliteSavepoint<'conn> {
    /// Connection the savepoint was created on.
    conn: &'conn rusqlite::Connection,

    /// Savepoint name as supplied by the caller.
    name: String,
}
#[cfg(feature = "cqrs-sqlite")]
impl<'conn> SqliteSavepoint<'conn> {
    /// Renders `name` as a double-quoted SQL identifier, doubling any embedded `"`.
    ///
    /// Quoting keeps names containing hyphens, spaces or keywords valid and stops a name from
    /// smuggling extra statements into the batch. SQLite strips the quotes when it reads the
    /// name, so `"foo"` and `foo` refer to the same savepoint.
    fn quoted_identifier(name: &str) -> String {
        format!("\"{}\"", name.replace('"', "\"\""))
    }

    /// Opens a savepoint called `name` on `conn`.
    ///
    /// # Errors
    ///
    /// Returns `"Savepoint create: <sqlite error>"` when the statement fails.
    pub fn create(conn: &'conn rusqlite::Connection, name: &str) -> Result<Self, String> {
        let sql = format!("SAVEPOINT {}", Self::quoted_identifier(name));
        conn.execute_batch(&sql)
            .map_err(|e| format!("Savepoint create: {}", e))?;
        Ok(Self {
            conn,
            name: name.to_string(),
        })
    }

    /// Rolls the connection back to this savepoint.
    ///
    /// NOTE(review): only `ROLLBACK TO` is issued; no `RELEASE` appears in this type, so the
    /// savepoint presumably stays open until the enclosing transaction ends — confirm.
    ///
    /// # Errors
    ///
    /// Returns `"Savepoint rollback: <sqlite error>"` when the statement fails.
    pub fn rollback(&self) -> Result<(), String> {
        let sql = format!(
            "ROLLBACK TO SAVEPOINT {}",
            Self::quoted_identifier(&self.name)
        );
        self.conn
            .execute_batch(&sql)
            .map_err(|e| format!("Savepoint rollback: {}", e))
    }
}