Skip to main content

shaperail_core/
saga.rs

1use serde::{Deserialize, Serialize};
2
3/// A distributed saga definition parsed from `sagas/<name>.saga.yaml`.
4///
5/// Sagas coordinate multi-service transactions with compensating actions.
6///
7/// ```yaml
8/// saga: create_order
9/// version: 1
10/// steps:
11///   - name: reserve_inventory
12///     service: inventory-api
13///     action: POST /v1/reservations
14///     compensate: DELETE /v1/reservations/:id
15///     timeout_secs: 5
16///   - name: charge_payment
17///     service: payments-api
18///     action: POST /v1/charges
19///     compensate: POST /v1/charges/:id/refund
20///     timeout_secs: 10
21/// ```
22#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
23#[serde(deny_unknown_fields)]
24pub struct SagaDefinition {
25    /// Saga name (unique within workspace).
26    pub saga: String,
27
28    /// Saga version.
29    #[serde(default = "default_saga_version")]
30    pub version: u32,
31
32    /// Ordered list of saga steps. Executed sequentially; on failure,
33    /// compensating actions run in reverse order.
34    pub steps: Vec<SagaStep>,
35}
36
37fn default_saga_version() -> u32 {
38    1
39}
40
41/// A single step within a saga.
42#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
43#[serde(deny_unknown_fields)]
44pub struct SagaStep {
45    /// Step name (unique within saga).
46    pub name: String,
47
48    /// Target service name (must exist in workspace).
49    pub service: String,
50
51    /// Forward action: HTTP method + path (e.g. "POST /v1/reservations").
52    pub action: String,
53
54    /// Compensating action: HTTP method + path, run on rollback.
55    pub compensate: String,
56
57    /// Optional input mapping (JSON template).
58    #[serde(default, skip_serializing_if = "Option::is_none")]
59    pub input: Option<serde_json::Value>,
60
61    /// Step timeout in seconds. Default: 30.
62    #[serde(default = "default_step_timeout")]
63    pub timeout_secs: u64,
64}
65
66fn default_step_timeout() -> u64 {
67    30
68}
69
70/// Runtime status of a saga execution.
71#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
72#[serde(rename_all = "lowercase")]
73pub enum SagaExecutionStatus {
74    /// Saga is running forward steps.
75    Running,
76    /// All steps completed successfully.
77    Completed,
78    /// A step failed; compensating actions are running.
79    Compensating,
80    /// All compensating actions finished (saga rolled back).
81    Compensated,
82    /// Compensating actions also failed — requires manual intervention.
83    Failed,
84}
85
86impl std::fmt::Display for SagaExecutionStatus {
87    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
88        match self {
89            Self::Running => write!(f, "running"),
90            Self::Completed => write!(f, "completed"),
91            Self::Compensating => write!(f, "compensating"),
92            Self::Compensated => write!(f, "compensated"),
93            Self::Failed => write!(f, "failed"),
94        }
95    }
96}
97
98#[cfg(test)]
99mod tests {
100    use super::*;
101
102    #[test]
103    fn saga_definition_minimal() {
104        let json = r#"{
105            "saga": "create_order",
106            "steps": [
107                {
108                    "name": "reserve_inventory",
109                    "service": "inventory-api",
110                    "action": "POST /v1/reservations",
111                    "compensate": "DELETE /v1/reservations/:id"
112                }
113            ]
114        }"#;
115        let saga: SagaDefinition = serde_json::from_str(json).unwrap();
116        assert_eq!(saga.saga, "create_order");
117        assert_eq!(saga.version, 1);
118        assert_eq!(saga.steps.len(), 1);
119        assert_eq!(saga.steps[0].timeout_secs, 30);
120    }
121
122    #[test]
123    fn saga_definition_full() {
124        let json = r#"{
125            "saga": "create_order",
126            "version": 2,
127            "steps": [
128                {
129                    "name": "reserve",
130                    "service": "inventory-api",
131                    "action": "POST /v1/reservations",
132                    "compensate": "DELETE /v1/reservations/:id",
133                    "input": {"product_id": "from:order.product_id"},
134                    "timeout_secs": 5
135                },
136                {
137                    "name": "charge",
138                    "service": "payments-api",
139                    "action": "POST /v1/charges",
140                    "compensate": "POST /v1/charges/:id/refund",
141                    "timeout_secs": 10
142                }
143            ]
144        }"#;
145        let saga: SagaDefinition = serde_json::from_str(json).unwrap();
146        assert_eq!(saga.version, 2);
147        assert_eq!(saga.steps.len(), 2);
148        assert_eq!(saga.steps[0].timeout_secs, 5);
149        assert!(saga.steps[0].input.is_some());
150        assert_eq!(saga.steps[1].service, "payments-api");
151    }
152
153    #[test]
154    fn saga_definition_unknown_field_fails() {
155        let json = r#"{
156            "saga": "test",
157            "steps": [],
158            "unknown": true
159        }"#;
160        let err = serde_json::from_str::<SagaDefinition>(json);
161        assert!(err.is_err());
162    }
163
164    #[test]
165    fn saga_definition_serde_roundtrip() {
166        let saga = SagaDefinition {
167            saga: "roundtrip".to_string(),
168            version: 1,
169            steps: vec![
170                SagaStep {
171                    name: "step1".to_string(),
172                    service: "svc-a".to_string(),
173                    action: "POST /v1/items".to_string(),
174                    compensate: "DELETE /v1/items/:id".to_string(),
175                    input: None,
176                    timeout_secs: 30,
177                },
178                SagaStep {
179                    name: "step2".to_string(),
180                    service: "svc-b".to_string(),
181                    action: "POST /v1/records".to_string(),
182                    compensate: "DELETE /v1/records/:id".to_string(),
183                    input: Some(serde_json::json!({"key": "value"})),
184                    timeout_secs: 15,
185                },
186            ],
187        };
188        let json = serde_json::to_string(&saga).unwrap();
189        let back: SagaDefinition = serde_json::from_str(&json).unwrap();
190        assert_eq!(saga, back);
191    }
192
193    #[test]
194    fn saga_execution_status_display() {
195        assert_eq!(SagaExecutionStatus::Running.to_string(), "running");
196        assert_eq!(SagaExecutionStatus::Completed.to_string(), "completed");
197        assert_eq!(
198            SagaExecutionStatus::Compensating.to_string(),
199            "compensating"
200        );
201        assert_eq!(SagaExecutionStatus::Compensated.to_string(), "compensated");
202        assert_eq!(SagaExecutionStatus::Failed.to_string(), "failed");
203    }
204
205    #[test]
206    fn saga_step_defaults() {
207        let json = r#"{
208            "name": "step",
209            "service": "svc",
210            "action": "POST /v1/x",
211            "compensate": "DELETE /v1/x/:id"
212        }"#;
213        let step: SagaStep = serde_json::from_str(json).unwrap();
214        assert_eq!(step.timeout_secs, 30);
215        assert!(step.input.is_none());
216    }
217}