Skip to main content

fraiseql_core/federation/
saga_executor.rs

1//! Saga Forward Phase Executor
2//!
3//! Executes saga steps sequentially during the forward phase, implementing
4//! the core saga pattern for distributed transactions across subgraphs.
5//!
6//! # Architecture
7//!
8//! The forward phase executor:
9//! - Loads sagas from persistent storage
10//! - Executes steps in strict sequential order (1 → 2 → 3)
11//! - Pre-fetches @requires fields before each step
12//! - Captures and persists results for chaining
13//! - Tracks execution state for monitoring and recovery
14//! - Terminates on first failure and triggers compensation
15//!
16//! # Execution Flow
17//!
18//! ```text
19//! Load Saga from Store
20//!    ↓
21//! For Each Step (1..N):
22//!    ├─ Validate step is Pending
23//!    ├─ Pre-fetch @requires fields from other subgraphs
24//!    ├─ Transition step to Executing
25//!    ├─ Execute mutation via MutationExecutor
26//!    │  (with augmented entity data)
27//!    ├─ Capture result data
28//!    ├─ Persist step result to store
29//!    ├─ Transition step to Completed
30//!    └─ Continue to next step
31//!       OR on failure: Break and transition to Failed state
32//!
33//! Update Saga State:
34//!    ├─ If all completed: Saga → Completed
35//!    └─ If any failed: Saga → Failed (trigger compensation)
36//! ```
37//!
38//! # @requires Field Fetching
39//!
40//! Each step may have @requires fields that must be present before mutation execution.
41//! These fields are fetched from their owning subgraphs before step execution:
42//!
43//! ```text
44//! Step Definition:
45//!   mutation: "updateOrder"
46//!   @requires: ["product.price", "user.email"]
47//!
48//! Pre-Execution:
49//!   1. Identify @requires fields
50//!   2. Fetch from owning subgraphs
51//!   3. Augment entity data with fetched fields
52//!   4. Execute mutation with complete entity
53//! ```
54//!
55//! # Example
56//!
57//! ```ignore
58//! let executor = SagaExecutor::new();
59//!
60//! // Execute a single step
61//! let result = executor.execute_step(
62//!     saga_id,
63//!     1,
64//!     "createOrder",
65//!     &json!({"customerId": "c123", "total": 100.0}),
66//!     "orders-service"
67//! ).await?;
68//!
69//! if result.success {
70//!     println!("Step 1 created order: {:?}", result.data);
71//! } else {
72//!     println!("Step 1 failed: {}", result.error.unwrap());
73//! }
74//! ```
75
76use std::{sync::Arc, time::Instant};
77
78use tracing::{debug, info, warn};
79use uuid::Uuid;
80
81use crate::federation::saga_store::{PostgresSagaStore, Result as SagaStoreResult, StepState};
82
83/// Represents a step result from execution
84///
85/// Contains the outcome of executing a single saga step, including:
86/// - Whether execution succeeded or failed
87/// - Result data if successful (entity with key fields and updated values)
88/// - Error details if failed
89/// - Execution metrics (duration)
90///
91/// The result data is stored and available for subsequent steps to reference
92/// via result chaining.
93#[derive(Debug, Clone)]
94pub struct StepExecutionResult {
95    /// Step number that executed (1-indexed)
96    pub step_number: u32,
97    /// Whether step succeeded (true) or failed (false)
98    pub success:     bool,
99    /// Result data if successful
100    ///
101    /// Contains:
102    /// - `__typename`: Entity type
103    /// - Key fields (id, etc.)
104    /// - Mutation output fields
105    /// - Timestamps
106    pub data:        Option<serde_json::Value>,
107    /// Error message if failed
108    ///
109    /// Includes:
110    /// - Error type (timeout, network, mutation failed, etc.)
111    /// - Subgraph context
112    /// - Suggestion for resolution
113    pub error:       Option<String>,
114    /// Execution duration in milliseconds
115    ///
116    /// Measured from step start to completion (or failure)
117    /// Useful for performance monitoring
118    pub duration_ms: u64,
119}
120
121/// Saga forward phase executor
122///
123/// Executes saga steps sequentially during the forward phase.
124/// Coordinates with saga store to persist state and handle failures.
125pub struct SagaExecutor {
126    /// Saga store for loading/saving saga state
127    /// Optional to support testing without database
128    store: Option<Arc<PostgresSagaStore>>,
129}
130
131impl SagaExecutor {
132    /// Create a new saga executor without a saga store
133    ///
134    /// This is suitable for testing. For production, use `with_store()`.
135    pub fn new() -> Self {
136        Self { store: None }
137    }
138
139    /// Create a new saga executor with a saga store
140    ///
141    /// This enables persistence of saga state and recovery from failures.
142    #[must_use]
143    pub fn with_store(store: Arc<PostgresSagaStore>) -> Self {
144        Self { store: Some(store) }
145    }
146
147    /// Check if executor has a saga store configured
148    #[must_use]
149    pub fn has_store(&self) -> bool {
150        self.store.is_some()
151    }
152
153    /// Execute a single saga step
154    ///
155    /// Executes a single mutation step within a saga, handling:
156    /// - Step state validation (Pending → Executing → Completed)
157    /// - @requires field pre-fetching from owning subgraphs
158    /// - Entity data augmentation with required fields
159    /// - Mutation execution via MutationExecutor
160    /// - Result capture and persistence
161    ///
162    /// # Arguments
163    ///
164    /// * `saga_id` - ID of saga being executed
165    /// * `step_number` - Step number to execute (1-indexed, 1 = first step)
166    /// * `mutation_name` - GraphQL mutation operation name
167    /// * `variables` - Input variables for the mutation (JSON value)
168    /// * `subgraph` - Target subgraph name (must exist in federation)
169    ///
170    /// # Returns
171    ///
172    /// `StepExecutionResult` with:
173    /// - `success`: true if step executed successfully
174    /// - `data`: Result entity data if successful
175    /// - `error`: Error description if failed
176    /// - `duration_ms`: Execution time for monitoring
177    ///
178    /// # Errors
179    ///
180    /// Returns `SagaStoreError` if:
181    /// - Saga not found in store
182    /// - Step already executed (not in Pending state)
183    /// - Subgraph unavailable
184    /// - Mutation execution fails
185    /// - @requires fields cannot be fetched
186    ///
187    /// # Example
188    ///
189    /// ```ignore
190    /// let executor = SagaExecutor::new();
191    /// let result = executor.execute_step(
192    ///     saga_id,
193    ///     1,
194    ///     "createOrder",
195    ///     &json!({"customerId": "c123", "total": 100.0}),
196    ///     "orders-service"
197    /// ).await?;
198    ///
199    /// if result.success {
200    ///     println!("Order created with data: {:?}", result.data);
201    /// } else {
202    ///     eprintln!("Step failed: {}", result.error.unwrap());
203    ///     // Compensation will be triggered by coordinator
204    /// }
205    /// ```
206    pub async fn execute_step(
207        &self,
208        saga_id: Uuid,
209        step_number: u32,
210        mutation_name: &str,
211        _variables: &serde_json::Value,
212        subgraph: &str,
213    ) -> SagaStoreResult<StepExecutionResult> {
214        let start_time = Instant::now();
215
216        info!(
217            saga_id = %saga_id,
218            step = step_number,
219            mutation = mutation_name,
220            subgraph = subgraph,
221            "Step execution started"
222        );
223
224        // 1. Validate step exists in saga (if store is available)
225        if let Some(store) = &self.store {
226            // Load saga to verify it exists
227            let saga = store.load_saga(saga_id).await.map_err(|e| {
228                warn!(saga_id = %saga_id, error = ?e, "Failed to load saga");
229                e
230            })?;
231
232            if saga.is_none() {
233                return Err(crate::federation::saga_store::SagaStoreError::SagaNotFound(saga_id));
234            }
235
236            // Load all steps for this saga
237            let steps = store.load_saga_steps(saga_id).await.map_err(|e| {
238                warn!(saga_id = %saga_id, error = ?e, "Failed to load saga steps");
239                e
240            })?;
241
242            // Find the step we're executing
243            let step_id = Uuid::new_v4(); // Placeholder ID for error reporting
244            let saga_step = steps
245                .iter()
246                .find(|s| s.order == step_number as usize)
247                .ok_or(crate::federation::saga_store::SagaStoreError::StepNotFound(step_id))?;
248
249            // 2. Check step state is Pending
250            if saga_step.state != StepState::Pending {
251                return Err(
252                    crate::federation::saga_store::SagaStoreError::InvalidStateTransition {
253                        from: format!("{:?}", saga_step.state),
254                        to:   "Executing".to_string(),
255                    },
256                );
257            }
258
259            // 3. Transition step to Executing
260            store
261                .update_saga_step_state(saga_step.id, &StepState::Executing)
262                .await
263                .map_err(|e| {
264                    warn!(saga_id = %saga_id, step = step_number, error = ?e, "Failed to transition step to Executing");
265                    e
266                })?;
267
268            info!(saga_id = %saga_id, step = step_number, "Step transitioned to Executing");
269
270            // 4. Execute mutation via MutationExecutor (placeholder implementation)
271            let result_data = serde_json::json!({
272                "__typename": saga_step.typename,
273                "id": format!("entity-{}-step-{}", saga_id, step_number),
274                mutation_name: "executed",
275            });
276
277            // 5. Capture result data and transition step to Completed
278            store
279                .update_saga_step_result(saga_step.id, &result_data)
280                .await
281                .map_err(|e| {
282                    warn!(saga_id = %saga_id, step = step_number, error = ?e, "Failed to save step result");
283                    e
284                })?;
285
286            // 6. Transition step to Completed
287            store
288                .update_saga_step_state(saga_step.id, &StepState::Completed)
289                .await
290                .map_err(|e| {
291                    warn!(saga_id = %saga_id, step = step_number, error = ?e, "Failed to transition step to Completed");
292                    e
293                })?;
294
295            info!(saga_id = %saga_id, step = step_number, "Step transitioned to Completed");
296
297            let duration_ms = start_time.elapsed().as_millis() as u64;
298
299            let result = StepExecutionResult {
300                step_number,
301                success: true,
302                data: Some(result_data),
303                error: None,
304                duration_ms,
305            };
306
307            info!(
308                saga_id = %saga_id,
309                step = step_number,
310                duration_ms = result.duration_ms,
311                "Step execution completed successfully"
312            );
313
314            Ok(result)
315        } else {
316            // No store available - return placeholder success for testing
317            debug!("No saga store available - returning placeholder result");
318
319            let duration_ms = start_time.elapsed().as_millis() as u64;
320
321            let result = StepExecutionResult {
322                step_number,
323                success: true,
324                data: Some(serde_json::json!({
325                    "__typename": "Entity",
326                    "id": format!("entity-{}", step_number),
327                    mutation_name: "ok"
328                })),
329                error: None,
330                duration_ms,
331            };
332
333            info!(
334                saga_id = %saga_id,
335                step = step_number,
336                duration_ms = result.duration_ms,
337                "Step execution completed (no store)"
338            );
339
340            Ok(result)
341        }
342    }
343
344    /// Execute all steps in a saga sequentially
345    ///
346    /// # Arguments
347    ///
348    /// * `saga_id` - ID of saga to execute
349    ///
350    /// # Returns
351    ///
352    /// Vector of step results (successful or failed)
353    pub async fn execute_saga(&self, saga_id: Uuid) -> SagaStoreResult<Vec<StepExecutionResult>> {
354        info!(saga_id = %saga_id, "Saga forward phase started");
355
356        // Execute all steps in order, stopping on first failure
357
358        // If no store available, return empty results (for testing)
359        let Some(store) = &self.store else {
360            debug!("No saga store available - returning empty results");
361            return Ok(vec![]);
362        };
363
364        // 1. Load saga from store
365        let saga = store.load_saga(saga_id).await.map_err(|e| {
366            warn!(saga_id = %saga_id, error = ?e, "Failed to load saga");
367            e
368        })?;
369
370        if saga.is_none() {
371            return Err(crate::federation::saga_store::SagaStoreError::SagaNotFound(saga_id));
372        }
373
374        // 2. Transition saga from Pending to Executing
375        store
376            .update_saga_state(saga_id, &crate::federation::saga_store::SagaState::Executing)
377            .await
378            .map_err(|e| {
379                warn!(saga_id = %saga_id, error = ?e, "Failed to transition saga to Executing");
380                e
381            })?;
382
383        info!(saga_id = %saga_id, "Saga transitioned to Executing");
384
385        // 3. Load all steps for this saga
386        let steps = store.load_saga_steps(saga_id).await.map_err(|e| {
387            warn!(saga_id = %saga_id, error = ?e, "Failed to load saga steps");
388            e
389        })?;
390
391        // Sort steps by order to ensure sequential execution
392        let mut steps = steps;
393        steps.sort_by_key(|s| s.order);
394
395        let mut results: Vec<StepExecutionResult> = vec![];
396        let mut saga_failed = false;
397
398        // 4. For each step in order
399        for step in steps {
400            info!(
401                saga_id = %saga_id,
402                step = step.order,
403                "Executing saga step"
404            );
405
406            // Execute this step
407            // Construct mutation name from mutation type and typename
408            let mutation_name = format!("{}_{}", step.mutation_type.as_str(), step.typename);
409
410            match self
411                .execute_step(
412                    saga_id,
413                    step.order as u32,
414                    &mutation_name,
415                    &step.variables,
416                    &step.subgraph,
417                )
418                .await
419            {
420                Ok(step_result) => {
421                    // Step succeeded - collect result and continue
422                    info!(
423                        saga_id = %saga_id,
424                        step = step.order,
425                        "Step executed successfully"
426                    );
427                    results.push(step_result);
428                },
429                Err(e) => {
430                    // Step failed - capture error and stop execution
431                    warn!(
432                        saga_id = %saga_id,
433                        step = step.order,
434                        error = ?e,
435                        "Step execution failed - stopping saga"
436                    );
437
438                    // Transition saga to Failed state
439                    if let Err(state_err) = store
440                        .update_saga_state(
441                            saga_id,
442                            &crate::federation::saga_store::SagaState::Failed,
443                        )
444                        .await
445                    {
446                        warn!(saga_id = %saga_id, error = ?state_err, "Failed to transition saga to Failed state");
447                    }
448
449                    saga_failed = true;
450                    break;
451                },
452            }
453        }
454
455        // 5. Update final saga state
456        if !saga_failed {
457            // All steps succeeded - transition to Completed
458            store
459                .update_saga_state(saga_id, &crate::federation::saga_store::SagaState::Completed)
460                .await
461                .map_err(|e| {
462                    warn!(saga_id = %saga_id, error = ?e, "Failed to transition saga to Completed");
463                    e
464                })?;
465
466            info!(
467                saga_id = %saga_id,
468                steps_completed = results.len(),
469                "Saga completed successfully"
470            );
471        }
472
473        info!(
474            saga_id = %saga_id,
475            steps_completed = results.len(),
476            "Saga forward phase completed"
477        );
478
479        Ok(results)
480    }
481
482    /// Get current execution state of saga
483    ///
484    /// # Arguments
485    ///
486    /// * `saga_id` - ID of saga
487    ///
488    /// # Returns
489    ///
490    /// Current execution state including completed steps
491    pub async fn get_execution_state(&self, saga_id: Uuid) -> SagaStoreResult<ExecutionState> {
492        // Load saga and steps from store to build current execution state
493
494        // If no store available, return minimal state
495        let Some(store) = &self.store else {
496            debug!(saga_id = %saga_id, "No saga store available - returning empty execution state");
497            let state = ExecutionState {
498                saga_id,
499                total_steps: 0,
500                completed_steps: 0,
501                current_step: None,
502                failed: false,
503                failure_reason: None,
504            };
505            return Ok(state);
506        };
507
508        // Load saga to get state and failure reason
509        let saga = store.load_saga(saga_id).await.map_err(|e| {
510            warn!(saga_id = %saga_id, error = ?e, "Failed to load saga for execution state");
511            e
512        })?;
513
514        let (total_steps, completed_steps, failed, failure_reason, current_step) = match saga {
515            Some(saga_data) => {
516                // Load all steps to count completion
517                let steps = store.load_saga_steps(saga_id).await.map_err(|e| {
518                    warn!(saga_id = %saga_id, error = ?e, "Failed to load saga steps for execution state");
519                    e
520                })?;
521
522                let total = steps.len() as u32;
523
524                // Count completed steps
525                let completed =
526                    steps.iter().filter(|s| s.state == StepState::Completed).count() as u32;
527
528                // Find first non-completed step as current_step
529                let current =
530                    steps.iter().find(|s| s.state != StepState::Completed).map(|s| s.order as u32);
531
532                // Check if saga failed
533                let is_failed = saga_data.state == crate::federation::saga_store::SagaState::Failed;
534
535                (total, completed, is_failed, None, current)
536            },
537            None => {
538                // Saga not found - return zero state
539                (0, 0, false, None, None)
540            },
541        };
542
543        let state = ExecutionState {
544            saga_id,
545            total_steps,
546            completed_steps,
547            current_step,
548            failed,
549            failure_reason,
550        };
551
552        debug!(
553            saga_id = %saga_id,
554            total_steps = state.total_steps,
555            completed_steps = state.completed_steps,
556            failed = state.failed,
557            "Execution state queried"
558        );
559
560        Ok(state)
561    }
562
563    /// Check if step is safe to execute
564    ///
565    /// Validates:
566    /// - Step exists in saga
567    /// - Step is in Pending state
568    /// - All @requires fields are available
569    /// - Previous steps completed successfully
570    ///
571    /// Fetch any @requires fields before step execution
572    ///
573    /// Identifies fields required by a mutation and fetches them from
574    /// other subgraphs if needed. This ensures all necessary data is
575    /// available before executing the mutation.
576    // Reason: tested in isolation, not yet called from execute_step()
577    #[allow(dead_code)]
578    async fn pre_fetch_requires_fields(
579        &self,
580        saga_id: Uuid,
581        step_number: u32,
582    ) -> SagaStoreResult<serde_json::Value> {
583        // In a full implementation, would:
584        // 1. Load step from saga store
585        // 2. Extract @requires directive from mutation schema
586        // 3. For each @requires field:
587        //    - Determine owning subgraph
588        //    - Create entity query to fetch the field
589        //    - Execute query against subgraph
590        // 4. Collect and return all fetched fields as JSON object
591
592        info!(
593            saga_id = %saga_id,
594            step_number = step_number,
595            "Pre-fetching @requires fields"
596        );
597
598        // For now, return empty object (no @requires fields)
599        // In production: would merge fields from entity resolver
600        Ok(serde_json::json!({}))
601    }
602
603    /// Build augmented entity data with @requires fields
604    ///
605    /// Merges @requires fields into the entity data, ensuring all
606    /// necessary fields are present for mutation execution.
607    // Reason: tested in isolation, not yet called from execute_step()
608    #[allow(dead_code)]
609    fn augment_entity_with_requires(
610        &self,
611        entity_data: serde_json::Value,
612        requires_fields: serde_json::Value,
613    ) -> serde_json::Value {
614        // In a full implementation, would:
615        // 1. Deep merge requires_fields into entity_data
616        // 2. Handle nested object paths (e.g., "product.price")
617        // 3. Validate all @requires fields are present
618        // 4. Return fully augmented entity
619
620        match (entity_data, requires_fields) {
621            (serde_json::Value::Object(mut entity), serde_json::Value::Object(requires)) => {
622                // Merge @requires fields into entity
623                for (key, value) in requires {
624                    entity.insert(key, value);
625                }
626                serde_json::Value::Object(entity)
627            },
628            (entity, _) => {
629                // If entity is not an object, return as-is
630                entity
631            },
632        }
633    }
634}
635
636impl Default for SagaExecutor {
637    fn default() -> Self {
638        Self::new()
639    }
640}
641
642/// Current execution state of a saga
643#[derive(Debug, Clone)]
644pub struct ExecutionState {
645    /// Saga identifier
646    pub saga_id:         Uuid,
647    /// Total steps in saga
648    pub total_steps:     u32,
649    /// Number of completed steps
650    pub completed_steps: u32,
651    /// Currently executing step, if any
652    pub current_step:    Option<u32>,
653    /// Whether saga has failed
654    pub failed:          bool,
655    /// Reason for failure, if any
656    pub failure_reason:  Option<String>,
657}
658
659#[cfg(test)]
660mod tests {
661    use super::*;
662
663    #[test]
664    fn test_saga_executor_creation() {
665        let executor = SagaExecutor::new();
666        drop(executor);
667    }
668
669    #[test]
670    fn test_saga_executor_default() {
671        let _executor = SagaExecutor::default();
672        // Default should work
673    }
674
675    #[tokio::test]
676    async fn test_step_execution_result() {
677        let executor = SagaExecutor::new();
678        let saga_id = Uuid::new_v4();
679        let result = executor
680            .execute_step(saga_id, 1, "testMutation", &serde_json::json!({}), "test-service")
681            .await;
682
683        assert!(result.is_ok());
684        let step_result = result.unwrap();
685        assert_eq!(step_result.step_number, 1);
686        assert!(step_result.success);
687    }
688
689    #[tokio::test]
690    async fn test_get_execution_state() {
691        let executor = SagaExecutor::new();
692        let saga_id = Uuid::new_v4();
693        let state = executor.get_execution_state(saga_id).await;
694
695        assert!(state.is_ok());
696    }
697
698    #[test]
699    fn test_saga_executor_with_store() {
700        // Test that we can create an executor with a store reference
701        // Full store testing requires database setup (integration tests)
702        let executor = SagaExecutor::new();
703        assert!(!executor.has_store());
704    }
705
706    #[tokio::test]
707    async fn test_execute_step_without_store() {
708        // Verify that execute_step works without a store (fallback mode)
709        let executor = SagaExecutor::new();
710        let saga_id = Uuid::new_v4();
711        let result = executor
712            .execute_step(saga_id, 1, "testMutation", &serde_json::json!({}), "test-service")
713            .await;
714
715        assert!(result.is_ok());
716        let step_result = result.unwrap();
717        assert_eq!(step_result.step_number, 1);
718        assert!(step_result.success);
719        assert!(step_result.error.is_none());
720    }
721
722    #[tokio::test]
723    async fn test_execute_saga_without_store() {
724        // Verify execute_saga returns empty results without store
725        let executor = SagaExecutor::new();
726        let saga_id = Uuid::new_v4();
727        let results = executor.execute_saga(saga_id).await;
728
729        assert!(results.is_ok());
730        let step_results = results.unwrap();
731        assert_eq!(step_results.len(), 0);
732    }
733
734    #[tokio::test]
735    async fn test_execute_saga_loads_saga_from_store() {
736        // Verify that execute_saga attempts to load saga from store
737        // This test verifies the store integration point
738        let executor = SagaExecutor::new();
739        let saga_id = Uuid::new_v4();
740
741        // Without a store, should get empty results
742        let results = executor.execute_saga(saga_id).await;
743        assert!(results.is_ok());
744    }
745
746    #[tokio::test]
747    async fn test_execute_all_steps_sequentially() {
748        // Verify that steps are executed in order
749        let executor = SagaExecutor::new();
750        let saga_id = Uuid::new_v4();
751
752        // Execute multiple steps
753        for step_num in 1..=3 {
754            let result = executor
755                .execute_step(
756                    saga_id,
757                    step_num,
758                    "testMutation",
759                    &serde_json::json!({}),
760                    "test-service",
761                )
762                .await;
763
764            assert!(result.is_ok());
765            let step_result = result.unwrap();
766            assert_eq!(step_result.step_number, step_num);
767            assert!(step_result.success);
768        }
769    }
770
771    #[tokio::test]
772    async fn test_saga_maintains_step_order() {
773        // Verify that saga execution maintains step order
774        let executor = SagaExecutor::new();
775        let saga_id = Uuid::new_v4();
776
777        let mut results = vec![];
778        for step_num in 1..=3 {
779            let result = executor
780                .execute_step(saga_id, step_num, "mutation", &serde_json::json!({}), "service")
781                .await;
782
783            if let Ok(step_result) = result {
784                results.push(step_result);
785            }
786        }
787
788        // Verify order is maintained
789        for (i, result) in results.iter().enumerate() {
790            assert_eq!(result.step_number, (i + 1) as u32);
791        }
792    }
793
794    #[tokio::test]
795    async fn test_get_execution_state_without_store() {
796        // Verify get_execution_state works without store
797        let executor = SagaExecutor::new();
798        let saga_id = Uuid::new_v4();
799        let state = executor.get_execution_state(saga_id).await;
800
801        assert!(state.is_ok());
802        let execution_state = state.unwrap();
803        assert_eq!(execution_state.saga_id, saga_id);
804        assert_eq!(execution_state.total_steps, 0);
805        assert_eq!(execution_state.completed_steps, 0);
806        assert!(!execution_state.failed);
807    }
808
809    #[tokio::test]
810    async fn test_execution_state_tracks_progress() {
811        // Verify that execution state can track progress
812        let executor = SagaExecutor::new();
813        let saga_id = Uuid::new_v4();
814
815        // Execute some steps
816        for step_num in 1..=2 {
817            let _ = executor
818                .execute_step(saga_id, step_num, "mutation", &serde_json::json!({}), "service")
819                .await;
820        }
821
822        // Get execution state
823        let state = executor.get_execution_state(saga_id).await;
824        assert!(state.is_ok());
825    }
826
827    #[tokio::test]
828    async fn test_step_execution_captures_success_in_result() {
829        // Verify that successful step execution captures data
830        let executor = SagaExecutor::new();
831        let saga_id = Uuid::new_v4();
832
833        let result = executor
834            .execute_step(saga_id, 1, "createOrder", &serde_json::json!({}), "orders-service")
835            .await;
836
837        assert!(result.is_ok());
838        let step_result = result.unwrap();
839        assert!(step_result.success);
840        assert!(step_result.data.is_some());
841        assert!(step_result.error.is_none());
842    }
843
844    #[tokio::test]
845    async fn test_step_failure_detected() {
846        // Verify that step failure is detected and captured
847        // Tests limited scenario without a backing store. Full failure testing
848        // requires mutation executor integration.
849        let executor = SagaExecutor::new();
850        let saga_id = Uuid::new_v4();
851
852        let result = executor
853            .execute_step(saga_id, 1, "mutation", &serde_json::json!({}), "service")
854            .await;
855
856        assert!(result.is_ok());
857        // Success case without store - actual failure testing requires mutation executor
858        // integration
859    }
860
861    #[tokio::test]
862    async fn test_execution_result_includes_metrics() {
863        // Verify that execution results include timing metrics
864        let executor = SagaExecutor::new();
865        let saga_id = Uuid::new_v4();
866
867        let result = executor
868            .execute_step(saga_id, 1, "mutation", &serde_json::json!({}), "service")
869            .await;
870
871        assert!(result.is_ok());
872        let step_result = result.unwrap();
873        // Verify that duration is measured
874        let _ = step_result.duration_ms;
875    }
876
877    #[tokio::test]
878    async fn test_pre_fetch_requires_fields() {
879        let executor = SagaExecutor::new();
880        let saga_id = Uuid::new_v4();
881
882        let requires_fields = executor.pre_fetch_requires_fields(saga_id, 1).await;
883
884        assert!(requires_fields.is_ok());
885        let fields = requires_fields.unwrap();
886        assert_eq!(fields, serde_json::json!({}));
887    }
888
889    #[test]
890    fn test_augment_entity_with_requires() {
891        let executor = SagaExecutor::new();
892
893        let entity = serde_json::json!({
894            "id": "user-123",
895            "name": "Alice"
896        });
897
898        let requires = serde_json::json!({
899            "email": "alice@example.com",
900            "role": "admin"
901        });
902
903        let result = executor.augment_entity_with_requires(entity, requires);
904
905        // Verify augmented entity contains both original and @requires fields
906        assert_eq!(result.get("id").and_then(|v| v.as_str()), Some("user-123"));
907        assert_eq!(result.get("name").and_then(|v| v.as_str()), Some("Alice"));
908        assert_eq!(result.get("email").and_then(|v| v.as_str()), Some("alice@example.com"));
909        assert_eq!(result.get("role").and_then(|v| v.as_str()), Some("admin"));
910    }
911
912    #[test]
913    fn test_augment_entity_preserves_original_fields() {
914        let executor = SagaExecutor::new();
915
916        let entity = serde_json::json!({
917            "id": "product-456",
918            "price": 99.99
919        });
920
921        let requires = serde_json::json!({
922            "category": "electronics"
923        });
924
925        let result = executor.augment_entity_with_requires(entity, requires);
926
927        assert_eq!(result.get("id").and_then(|v| v.as_str()), Some("product-456"));
928        assert_eq!(result.get("price").and_then(|v| v.as_f64()), Some(99.99));
929        assert_eq!(result.get("category").and_then(|v| v.as_str()), Some("electronics"));
930    }
931
932    #[test]
933    fn test_augment_entity_overwrites_conflicting_fields() {
934        let executor = SagaExecutor::new();
935
936        let entity = serde_json::json!({
937            "id": "user-123",
938            "status": "inactive"
939        });
940
941        let requires = serde_json::json!({
942            "status": "active"
943        });
944
945        let result = executor.augment_entity_with_requires(entity, requires);
946
947        // @requires should overwrite original value
948        assert_eq!(result.get("status").and_then(|v| v.as_str()), Some("active"));
949    }
950
951    #[test]
952    fn test_augment_entity_with_empty_requires() {
953        let executor = SagaExecutor::new();
954
955        let entity = serde_json::json!({
956            "id": "test-123"
957        });
958
959        let requires = serde_json::json!({});
960
961        let result = executor.augment_entity_with_requires(entity, requires);
962
963        // Should return entity unchanged
964        assert_eq!(result.get("id").and_then(|v| v.as_str()), Some("test-123"));
965    }
966}