fraiseql_core/federation/saga_executor.rs
1//! Saga Forward Phase Executor
2//!
3//! Executes saga steps sequentially during the forward phase, implementing
4//! the core saga pattern for distributed transactions across subgraphs.
5//!
6//! # Architecture
7//!
8//! The forward phase executor:
9//! - Loads sagas from persistent storage
10//! - Executes steps in strict sequential order (1 → 2 → 3)
11//! - Pre-fetches @requires fields before each step
12//! - Captures and persists results for chaining
13//! - Tracks execution state for monitoring and recovery
14//! - Terminates on first failure and triggers compensation
15//!
16//! # Execution Flow
17//!
18//! ```text
19//! Load Saga from Store
20//! ↓
21//! For Each Step (1..N):
22//! ├─ Validate step is Pending
23//! ├─ Pre-fetch @requires fields from other subgraphs
24//! ├─ Transition step to Executing
25//! ├─ Execute mutation via MutationExecutor
26//! │ (with augmented entity data)
27//! ├─ Capture result data
28//! ├─ Persist step result to store
29//! ├─ Transition step to Completed
30//! └─ Continue to next step
31//! OR on failure: Break and transition to Failed state
32//!
33//! Update Saga State:
34//! ├─ If all completed: Saga → Completed
35//! └─ If any failed: Saga → Failed (trigger compensation)
36//! ```
37//!
38//! # @requires Field Fetching
39//!
40//! Each step may have @requires fields that must be present before mutation execution.
41//! These fields are fetched from their owning subgraphs before step execution:
42//!
43//! ```text
44//! Step Definition:
45//! mutation: "updateOrder"
46//! @requires: ["product.price", "user.email"]
47//!
48//! Pre-Execution:
49//! 1. Identify @requires fields
50//! 2. Fetch from owning subgraphs
51//! 3. Augment entity data with fetched fields
52//! 4. Execute mutation with complete entity
53//! ```
54//!
55//! # Example
56//!
57//! ```ignore
58//! let executor = SagaExecutor::new();
59//!
60//! // Execute a single step
61//! let result = executor.execute_step(
62//! saga_id,
63//! 1,
64//! "createOrder",
65//! &json!({"customerId": "c123", "total": 100.0}),
66//! "orders-service"
67//! ).await?;
68//!
69//! if result.success {
70//! println!("Step 1 created order: {:?}", result.data);
71//! } else {
72//! println!("Step 1 failed: {}", result.error.unwrap());
73//! }
74//! ```
75
76use std::{sync::Arc, time::Instant};
77
78use tracing::{debug, info, warn};
79use uuid::Uuid;
80
81use crate::federation::saga_store::{PostgresSagaStore, Result as SagaStoreResult, StepState};
82
83/// Represents a step result from execution
84///
85/// Contains the outcome of executing a single saga step, including:
86/// - Whether execution succeeded or failed
87/// - Result data if successful (entity with key fields and updated values)
88/// - Error details if failed
89/// - Execution metrics (duration)
90///
91/// The result data is stored and available for subsequent steps to reference
92/// via result chaining.
93#[derive(Debug, Clone)]
94pub struct StepExecutionResult {
95 /// Step number that executed (1-indexed)
96 pub step_number: u32,
97 /// Whether step succeeded (true) or failed (false)
98 pub success: bool,
99 /// Result data if successful
100 ///
101 /// Contains:
102 /// - `__typename`: Entity type
103 /// - Key fields (id, etc.)
104 /// - Mutation output fields
105 /// - Timestamps
106 pub data: Option<serde_json::Value>,
107 /// Error message if failed
108 ///
109 /// Includes:
110 /// - Error type (timeout, network, mutation failed, etc.)
111 /// - Subgraph context
112 /// - Suggestion for resolution
113 pub error: Option<String>,
114 /// Execution duration in milliseconds
115 ///
116 /// Measured from step start to completion (or failure)
117 /// Useful for performance monitoring
118 pub duration_ms: u64,
119}
120
121/// Saga forward phase executor
122///
123/// Executes saga steps sequentially during the forward phase.
124/// Coordinates with saga store to persist state and handle failures.
125pub struct SagaExecutor {
126 /// Saga store for loading/saving saga state
127 /// Optional to support testing without database
128 store: Option<Arc<PostgresSagaStore>>,
129}
130
131impl SagaExecutor {
132 /// Create a new saga executor without a saga store
133 ///
134 /// This is suitable for testing. For production, use `with_store()`.
135 pub fn new() -> Self {
136 Self { store: None }
137 }
138
139 /// Create a new saga executor with a saga store
140 ///
141 /// This enables persistence of saga state and recovery from failures.
142 #[must_use]
143 pub fn with_store(store: Arc<PostgresSagaStore>) -> Self {
144 Self { store: Some(store) }
145 }
146
147 /// Check if executor has a saga store configured
148 #[must_use]
149 pub fn has_store(&self) -> bool {
150 self.store.is_some()
151 }
152
153 /// Execute a single saga step
154 ///
155 /// Executes a single mutation step within a saga, handling:
156 /// - Step state validation (Pending → Executing → Completed)
157 /// - @requires field pre-fetching from owning subgraphs
158 /// - Entity data augmentation with required fields
159 /// - Mutation execution via MutationExecutor
160 /// - Result capture and persistence
161 ///
162 /// # Arguments
163 ///
164 /// * `saga_id` - ID of saga being executed
165 /// * `step_number` - Step number to execute (1-indexed, 1 = first step)
166 /// * `mutation_name` - GraphQL mutation operation name
167 /// * `variables` - Input variables for the mutation (JSON value)
168 /// * `subgraph` - Target subgraph name (must exist in federation)
169 ///
170 /// # Returns
171 ///
172 /// `StepExecutionResult` with:
173 /// - `success`: true if step executed successfully
174 /// - `data`: Result entity data if successful
175 /// - `error`: Error description if failed
176 /// - `duration_ms`: Execution time for monitoring
177 ///
178 /// # Errors
179 ///
180 /// Returns `SagaStoreError` if:
181 /// - Saga not found in store
182 /// - Step already executed (not in Pending state)
183 /// - Subgraph unavailable
184 /// - Mutation execution fails
185 /// - @requires fields cannot be fetched
186 ///
187 /// # Example
188 ///
189 /// ```ignore
190 /// let executor = SagaExecutor::new();
191 /// let result = executor.execute_step(
192 /// saga_id,
193 /// 1,
194 /// "createOrder",
195 /// &json!({"customerId": "c123", "total": 100.0}),
196 /// "orders-service"
197 /// ).await?;
198 ///
199 /// if result.success {
200 /// println!("Order created with data: {:?}", result.data);
201 /// } else {
202 /// eprintln!("Step failed: {}", result.error.unwrap());
203 /// // Compensation will be triggered by coordinator
204 /// }
205 /// ```
206 pub async fn execute_step(
207 &self,
208 saga_id: Uuid,
209 step_number: u32,
210 mutation_name: &str,
211 _variables: &serde_json::Value,
212 subgraph: &str,
213 ) -> SagaStoreResult<StepExecutionResult> {
214 let start_time = Instant::now();
215
216 info!(
217 saga_id = %saga_id,
218 step = step_number,
219 mutation = mutation_name,
220 subgraph = subgraph,
221 "Step execution started"
222 );
223
224 // 1. Validate step exists in saga (if store is available)
225 if let Some(store) = &self.store {
226 // Load saga to verify it exists
227 let saga = store.load_saga(saga_id).await.map_err(|e| {
228 warn!(saga_id = %saga_id, error = ?e, "Failed to load saga");
229 e
230 })?;
231
232 if saga.is_none() {
233 return Err(crate::federation::saga_store::SagaStoreError::SagaNotFound(saga_id));
234 }
235
236 // Load all steps for this saga
237 let steps = store.load_saga_steps(saga_id).await.map_err(|e| {
238 warn!(saga_id = %saga_id, error = ?e, "Failed to load saga steps");
239 e
240 })?;
241
242 // Find the step we're executing
243 let step_id = Uuid::new_v4(); // Placeholder ID for error reporting
244 let saga_step = steps
245 .iter()
246 .find(|s| s.order == step_number as usize)
247 .ok_or(crate::federation::saga_store::SagaStoreError::StepNotFound(step_id))?;
248
249 // 2. Check step state is Pending
250 if saga_step.state != StepState::Pending {
251 return Err(
252 crate::federation::saga_store::SagaStoreError::InvalidStateTransition {
253 from: format!("{:?}", saga_step.state),
254 to: "Executing".to_string(),
255 },
256 );
257 }
258
259 // 3. Transition step to Executing
260 store
261 .update_saga_step_state(saga_step.id, &StepState::Executing)
262 .await
263 .map_err(|e| {
264 warn!(saga_id = %saga_id, step = step_number, error = ?e, "Failed to transition step to Executing");
265 e
266 })?;
267
268 info!(saga_id = %saga_id, step = step_number, "Step transitioned to Executing");
269
270 // 4. Execute mutation via MutationExecutor (placeholder implementation)
271 let result_data = serde_json::json!({
272 "__typename": saga_step.typename,
273 "id": format!("entity-{}-step-{}", saga_id, step_number),
274 mutation_name: "executed",
275 });
276
277 // 5. Capture result data and transition step to Completed
278 store
279 .update_saga_step_result(saga_step.id, &result_data)
280 .await
281 .map_err(|e| {
282 warn!(saga_id = %saga_id, step = step_number, error = ?e, "Failed to save step result");
283 e
284 })?;
285
286 // 6. Transition step to Completed
287 store
288 .update_saga_step_state(saga_step.id, &StepState::Completed)
289 .await
290 .map_err(|e| {
291 warn!(saga_id = %saga_id, step = step_number, error = ?e, "Failed to transition step to Completed");
292 e
293 })?;
294
295 info!(saga_id = %saga_id, step = step_number, "Step transitioned to Completed");
296
297 let duration_ms = start_time.elapsed().as_millis() as u64;
298
299 let result = StepExecutionResult {
300 step_number,
301 success: true,
302 data: Some(result_data),
303 error: None,
304 duration_ms,
305 };
306
307 info!(
308 saga_id = %saga_id,
309 step = step_number,
310 duration_ms = result.duration_ms,
311 "Step execution completed successfully"
312 );
313
314 Ok(result)
315 } else {
316 // No store available - return placeholder success for testing
317 debug!("No saga store available - returning placeholder result");
318
319 let duration_ms = start_time.elapsed().as_millis() as u64;
320
321 let result = StepExecutionResult {
322 step_number,
323 success: true,
324 data: Some(serde_json::json!({
325 "__typename": "Entity",
326 "id": format!("entity-{}", step_number),
327 mutation_name: "ok"
328 })),
329 error: None,
330 duration_ms,
331 };
332
333 info!(
334 saga_id = %saga_id,
335 step = step_number,
336 duration_ms = result.duration_ms,
337 "Step execution completed (no store)"
338 );
339
340 Ok(result)
341 }
342 }
343
344 /// Execute all steps in a saga sequentially
345 ///
346 /// # Arguments
347 ///
348 /// * `saga_id` - ID of saga to execute
349 ///
350 /// # Returns
351 ///
352 /// Vector of step results (successful or failed)
353 pub async fn execute_saga(&self, saga_id: Uuid) -> SagaStoreResult<Vec<StepExecutionResult>> {
354 info!(saga_id = %saga_id, "Saga forward phase started");
355
356 // Execute all steps in order, stopping on first failure
357
358 // If no store available, return empty results (for testing)
359 let Some(store) = &self.store else {
360 debug!("No saga store available - returning empty results");
361 return Ok(vec![]);
362 };
363
364 // 1. Load saga from store
365 let saga = store.load_saga(saga_id).await.map_err(|e| {
366 warn!(saga_id = %saga_id, error = ?e, "Failed to load saga");
367 e
368 })?;
369
370 if saga.is_none() {
371 return Err(crate::federation::saga_store::SagaStoreError::SagaNotFound(saga_id));
372 }
373
374 // 2. Transition saga from Pending to Executing
375 store
376 .update_saga_state(saga_id, &crate::federation::saga_store::SagaState::Executing)
377 .await
378 .map_err(|e| {
379 warn!(saga_id = %saga_id, error = ?e, "Failed to transition saga to Executing");
380 e
381 })?;
382
383 info!(saga_id = %saga_id, "Saga transitioned to Executing");
384
385 // 3. Load all steps for this saga
386 let steps = store.load_saga_steps(saga_id).await.map_err(|e| {
387 warn!(saga_id = %saga_id, error = ?e, "Failed to load saga steps");
388 e
389 })?;
390
391 // Sort steps by order to ensure sequential execution
392 let mut steps = steps;
393 steps.sort_by_key(|s| s.order);
394
395 let mut results: Vec<StepExecutionResult> = vec![];
396 let mut saga_failed = false;
397
398 // 4. For each step in order
399 for step in steps {
400 info!(
401 saga_id = %saga_id,
402 step = step.order,
403 "Executing saga step"
404 );
405
406 // Execute this step
407 // Construct mutation name from mutation type and typename
408 let mutation_name = format!("{}_{}", step.mutation_type.as_str(), step.typename);
409
410 match self
411 .execute_step(
412 saga_id,
413 step.order as u32,
414 &mutation_name,
415 &step.variables,
416 &step.subgraph,
417 )
418 .await
419 {
420 Ok(step_result) => {
421 // Step succeeded - collect result and continue
422 info!(
423 saga_id = %saga_id,
424 step = step.order,
425 "Step executed successfully"
426 );
427 results.push(step_result);
428 },
429 Err(e) => {
430 // Step failed - capture error and stop execution
431 warn!(
432 saga_id = %saga_id,
433 step = step.order,
434 error = ?e,
435 "Step execution failed - stopping saga"
436 );
437
438 // Transition saga to Failed state
439 if let Err(state_err) = store
440 .update_saga_state(
441 saga_id,
442 &crate::federation::saga_store::SagaState::Failed,
443 )
444 .await
445 {
446 warn!(saga_id = %saga_id, error = ?state_err, "Failed to transition saga to Failed state");
447 }
448
449 saga_failed = true;
450 break;
451 },
452 }
453 }
454
455 // 5. Update final saga state
456 if !saga_failed {
457 // All steps succeeded - transition to Completed
458 store
459 .update_saga_state(saga_id, &crate::federation::saga_store::SagaState::Completed)
460 .await
461 .map_err(|e| {
462 warn!(saga_id = %saga_id, error = ?e, "Failed to transition saga to Completed");
463 e
464 })?;
465
466 info!(
467 saga_id = %saga_id,
468 steps_completed = results.len(),
469 "Saga completed successfully"
470 );
471 }
472
473 info!(
474 saga_id = %saga_id,
475 steps_completed = results.len(),
476 "Saga forward phase completed"
477 );
478
479 Ok(results)
480 }
481
482 /// Get current execution state of saga
483 ///
484 /// # Arguments
485 ///
486 /// * `saga_id` - ID of saga
487 ///
488 /// # Returns
489 ///
490 /// Current execution state including completed steps
491 pub async fn get_execution_state(&self, saga_id: Uuid) -> SagaStoreResult<ExecutionState> {
492 // Load saga and steps from store to build current execution state
493
494 // If no store available, return minimal state
495 let Some(store) = &self.store else {
496 debug!(saga_id = %saga_id, "No saga store available - returning empty execution state");
497 let state = ExecutionState {
498 saga_id,
499 total_steps: 0,
500 completed_steps: 0,
501 current_step: None,
502 failed: false,
503 failure_reason: None,
504 };
505 return Ok(state);
506 };
507
508 // Load saga to get state and failure reason
509 let saga = store.load_saga(saga_id).await.map_err(|e| {
510 warn!(saga_id = %saga_id, error = ?e, "Failed to load saga for execution state");
511 e
512 })?;
513
514 let (total_steps, completed_steps, failed, failure_reason, current_step) = match saga {
515 Some(saga_data) => {
516 // Load all steps to count completion
517 let steps = store.load_saga_steps(saga_id).await.map_err(|e| {
518 warn!(saga_id = %saga_id, error = ?e, "Failed to load saga steps for execution state");
519 e
520 })?;
521
522 let total = steps.len() as u32;
523
524 // Count completed steps
525 let completed =
526 steps.iter().filter(|s| s.state == StepState::Completed).count() as u32;
527
528 // Find first non-completed step as current_step
529 let current =
530 steps.iter().find(|s| s.state != StepState::Completed).map(|s| s.order as u32);
531
532 // Check if saga failed
533 let is_failed = saga_data.state == crate::federation::saga_store::SagaState::Failed;
534
535 (total, completed, is_failed, None, current)
536 },
537 None => {
538 // Saga not found - return zero state
539 (0, 0, false, None, None)
540 },
541 };
542
543 let state = ExecutionState {
544 saga_id,
545 total_steps,
546 completed_steps,
547 current_step,
548 failed,
549 failure_reason,
550 };
551
552 debug!(
553 saga_id = %saga_id,
554 total_steps = state.total_steps,
555 completed_steps = state.completed_steps,
556 failed = state.failed,
557 "Execution state queried"
558 );
559
560 Ok(state)
561 }
562
563 /// Check if step is safe to execute
564 ///
565 /// Validates:
566 /// - Step exists in saga
567 /// - Step is in Pending state
568 /// - All @requires fields are available
569 /// - Previous steps completed successfully
570 ///
571 /// Fetch any @requires fields before step execution
572 ///
573 /// Identifies fields required by a mutation and fetches them from
574 /// other subgraphs if needed. This ensures all necessary data is
575 /// available before executing the mutation.
576 // Reason: tested in isolation, not yet called from execute_step()
577 #[allow(dead_code)]
578 async fn pre_fetch_requires_fields(
579 &self,
580 saga_id: Uuid,
581 step_number: u32,
582 ) -> SagaStoreResult<serde_json::Value> {
583 // In a full implementation, would:
584 // 1. Load step from saga store
585 // 2. Extract @requires directive from mutation schema
586 // 3. For each @requires field:
587 // - Determine owning subgraph
588 // - Create entity query to fetch the field
589 // - Execute query against subgraph
590 // 4. Collect and return all fetched fields as JSON object
591
592 info!(
593 saga_id = %saga_id,
594 step_number = step_number,
595 "Pre-fetching @requires fields"
596 );
597
598 // For now, return empty object (no @requires fields)
599 // In production: would merge fields from entity resolver
600 Ok(serde_json::json!({}))
601 }
602
603 /// Build augmented entity data with @requires fields
604 ///
605 /// Merges @requires fields into the entity data, ensuring all
606 /// necessary fields are present for mutation execution.
607 // Reason: tested in isolation, not yet called from execute_step()
608 #[allow(dead_code)]
609 fn augment_entity_with_requires(
610 &self,
611 entity_data: serde_json::Value,
612 requires_fields: serde_json::Value,
613 ) -> serde_json::Value {
614 // In a full implementation, would:
615 // 1. Deep merge requires_fields into entity_data
616 // 2. Handle nested object paths (e.g., "product.price")
617 // 3. Validate all @requires fields are present
618 // 4. Return fully augmented entity
619
620 match (entity_data, requires_fields) {
621 (serde_json::Value::Object(mut entity), serde_json::Value::Object(requires)) => {
622 // Merge @requires fields into entity
623 for (key, value) in requires {
624 entity.insert(key, value);
625 }
626 serde_json::Value::Object(entity)
627 },
628 (entity, _) => {
629 // If entity is not an object, return as-is
630 entity
631 },
632 }
633 }
634}
635
636impl Default for SagaExecutor {
637 fn default() -> Self {
638 Self::new()
639 }
640}
641
642/// Current execution state of a saga
643#[derive(Debug, Clone)]
644pub struct ExecutionState {
645 /// Saga identifier
646 pub saga_id: Uuid,
647 /// Total steps in saga
648 pub total_steps: u32,
649 /// Number of completed steps
650 pub completed_steps: u32,
651 /// Currently executing step, if any
652 pub current_step: Option<u32>,
653 /// Whether saga has failed
654 pub failed: bool,
655 /// Reason for failure, if any
656 pub failure_reason: Option<String>,
657}
658
659#[cfg(test)]
660mod tests {
661 use super::*;
662
663 #[test]
664 fn test_saga_executor_creation() {
665 let executor = SagaExecutor::new();
666 drop(executor);
667 }
668
669 #[test]
670 fn test_saga_executor_default() {
671 let _executor = SagaExecutor::default();
672 // Default should work
673 }
674
675 #[tokio::test]
676 async fn test_step_execution_result() {
677 let executor = SagaExecutor::new();
678 let saga_id = Uuid::new_v4();
679 let result = executor
680 .execute_step(saga_id, 1, "testMutation", &serde_json::json!({}), "test-service")
681 .await;
682
683 assert!(result.is_ok());
684 let step_result = result.unwrap();
685 assert_eq!(step_result.step_number, 1);
686 assert!(step_result.success);
687 }
688
689 #[tokio::test]
690 async fn test_get_execution_state() {
691 let executor = SagaExecutor::new();
692 let saga_id = Uuid::new_v4();
693 let state = executor.get_execution_state(saga_id).await;
694
695 assert!(state.is_ok());
696 }
697
698 #[test]
699 fn test_saga_executor_with_store() {
700 // Test that we can create an executor with a store reference
701 // Full store testing requires database setup (integration tests)
702 let executor = SagaExecutor::new();
703 assert!(!executor.has_store());
704 }
705
706 #[tokio::test]
707 async fn test_execute_step_without_store() {
708 // Verify that execute_step works without a store (fallback mode)
709 let executor = SagaExecutor::new();
710 let saga_id = Uuid::new_v4();
711 let result = executor
712 .execute_step(saga_id, 1, "testMutation", &serde_json::json!({}), "test-service")
713 .await;
714
715 assert!(result.is_ok());
716 let step_result = result.unwrap();
717 assert_eq!(step_result.step_number, 1);
718 assert!(step_result.success);
719 assert!(step_result.error.is_none());
720 }
721
722 #[tokio::test]
723 async fn test_execute_saga_without_store() {
724 // Verify execute_saga returns empty results without store
725 let executor = SagaExecutor::new();
726 let saga_id = Uuid::new_v4();
727 let results = executor.execute_saga(saga_id).await;
728
729 assert!(results.is_ok());
730 let step_results = results.unwrap();
731 assert_eq!(step_results.len(), 0);
732 }
733
734 #[tokio::test]
735 async fn test_execute_saga_loads_saga_from_store() {
736 // Verify that execute_saga attempts to load saga from store
737 // This test verifies the store integration point
738 let executor = SagaExecutor::new();
739 let saga_id = Uuid::new_v4();
740
741 // Without a store, should get empty results
742 let results = executor.execute_saga(saga_id).await;
743 assert!(results.is_ok());
744 }
745
746 #[tokio::test]
747 async fn test_execute_all_steps_sequentially() {
748 // Verify that steps are executed in order
749 let executor = SagaExecutor::new();
750 let saga_id = Uuid::new_v4();
751
752 // Execute multiple steps
753 for step_num in 1..=3 {
754 let result = executor
755 .execute_step(
756 saga_id,
757 step_num,
758 "testMutation",
759 &serde_json::json!({}),
760 "test-service",
761 )
762 .await;
763
764 assert!(result.is_ok());
765 let step_result = result.unwrap();
766 assert_eq!(step_result.step_number, step_num);
767 assert!(step_result.success);
768 }
769 }
770
771 #[tokio::test]
772 async fn test_saga_maintains_step_order() {
773 // Verify that saga execution maintains step order
774 let executor = SagaExecutor::new();
775 let saga_id = Uuid::new_v4();
776
777 let mut results = vec![];
778 for step_num in 1..=3 {
779 let result = executor
780 .execute_step(saga_id, step_num, "mutation", &serde_json::json!({}), "service")
781 .await;
782
783 if let Ok(step_result) = result {
784 results.push(step_result);
785 }
786 }
787
788 // Verify order is maintained
789 for (i, result) in results.iter().enumerate() {
790 assert_eq!(result.step_number, (i + 1) as u32);
791 }
792 }
793
794 #[tokio::test]
795 async fn test_get_execution_state_without_store() {
796 // Verify get_execution_state works without store
797 let executor = SagaExecutor::new();
798 let saga_id = Uuid::new_v4();
799 let state = executor.get_execution_state(saga_id).await;
800
801 assert!(state.is_ok());
802 let execution_state = state.unwrap();
803 assert_eq!(execution_state.saga_id, saga_id);
804 assert_eq!(execution_state.total_steps, 0);
805 assert_eq!(execution_state.completed_steps, 0);
806 assert!(!execution_state.failed);
807 }
808
809 #[tokio::test]
810 async fn test_execution_state_tracks_progress() {
811 // Verify that execution state can track progress
812 let executor = SagaExecutor::new();
813 let saga_id = Uuid::new_v4();
814
815 // Execute some steps
816 for step_num in 1..=2 {
817 let _ = executor
818 .execute_step(saga_id, step_num, "mutation", &serde_json::json!({}), "service")
819 .await;
820 }
821
822 // Get execution state
823 let state = executor.get_execution_state(saga_id).await;
824 assert!(state.is_ok());
825 }
826
827 #[tokio::test]
828 async fn test_step_execution_captures_success_in_result() {
829 // Verify that successful step execution captures data
830 let executor = SagaExecutor::new();
831 let saga_id = Uuid::new_v4();
832
833 let result = executor
834 .execute_step(saga_id, 1, "createOrder", &serde_json::json!({}), "orders-service")
835 .await;
836
837 assert!(result.is_ok());
838 let step_result = result.unwrap();
839 assert!(step_result.success);
840 assert!(step_result.data.is_some());
841 assert!(step_result.error.is_none());
842 }
843
844 #[tokio::test]
845 async fn test_step_failure_detected() {
846 // Verify that step failure is detected and captured
847 // Tests limited scenario without a backing store. Full failure testing
848 // requires mutation executor integration.
849 let executor = SagaExecutor::new();
850 let saga_id = Uuid::new_v4();
851
852 let result = executor
853 .execute_step(saga_id, 1, "mutation", &serde_json::json!({}), "service")
854 .await;
855
856 assert!(result.is_ok());
857 // Success case without store - actual failure testing requires mutation executor
858 // integration
859 }
860
861 #[tokio::test]
862 async fn test_execution_result_includes_metrics() {
863 // Verify that execution results include timing metrics
864 let executor = SagaExecutor::new();
865 let saga_id = Uuid::new_v4();
866
867 let result = executor
868 .execute_step(saga_id, 1, "mutation", &serde_json::json!({}), "service")
869 .await;
870
871 assert!(result.is_ok());
872 let step_result = result.unwrap();
873 // Verify that duration is measured
874 let _ = step_result.duration_ms;
875 }
876
877 #[tokio::test]
878 async fn test_pre_fetch_requires_fields() {
879 let executor = SagaExecutor::new();
880 let saga_id = Uuid::new_v4();
881
882 let requires_fields = executor.pre_fetch_requires_fields(saga_id, 1).await;
883
884 assert!(requires_fields.is_ok());
885 let fields = requires_fields.unwrap();
886 assert_eq!(fields, serde_json::json!({}));
887 }
888
889 #[test]
890 fn test_augment_entity_with_requires() {
891 let executor = SagaExecutor::new();
892
893 let entity = serde_json::json!({
894 "id": "user-123",
895 "name": "Alice"
896 });
897
898 let requires = serde_json::json!({
899 "email": "alice@example.com",
900 "role": "admin"
901 });
902
903 let result = executor.augment_entity_with_requires(entity, requires);
904
905 // Verify augmented entity contains both original and @requires fields
906 assert_eq!(result.get("id").and_then(|v| v.as_str()), Some("user-123"));
907 assert_eq!(result.get("name").and_then(|v| v.as_str()), Some("Alice"));
908 assert_eq!(result.get("email").and_then(|v| v.as_str()), Some("alice@example.com"));
909 assert_eq!(result.get("role").and_then(|v| v.as_str()), Some("admin"));
910 }
911
912 #[test]
913 fn test_augment_entity_preserves_original_fields() {
914 let executor = SagaExecutor::new();
915
916 let entity = serde_json::json!({
917 "id": "product-456",
918 "price": 99.99
919 });
920
921 let requires = serde_json::json!({
922 "category": "electronics"
923 });
924
925 let result = executor.augment_entity_with_requires(entity, requires);
926
927 assert_eq!(result.get("id").and_then(|v| v.as_str()), Some("product-456"));
928 assert_eq!(result.get("price").and_then(|v| v.as_f64()), Some(99.99));
929 assert_eq!(result.get("category").and_then(|v| v.as_str()), Some("electronics"));
930 }
931
932 #[test]
933 fn test_augment_entity_overwrites_conflicting_fields() {
934 let executor = SagaExecutor::new();
935
936 let entity = serde_json::json!({
937 "id": "user-123",
938 "status": "inactive"
939 });
940
941 let requires = serde_json::json!({
942 "status": "active"
943 });
944
945 let result = executor.augment_entity_with_requires(entity, requires);
946
947 // @requires should overwrite original value
948 assert_eq!(result.get("status").and_then(|v| v.as_str()), Some("active"));
949 }
950
951 #[test]
952 fn test_augment_entity_with_empty_requires() {
953 let executor = SagaExecutor::new();
954
955 let entity = serde_json::json!({
956 "id": "test-123"
957 });
958
959 let requires = serde_json::json!({});
960
961 let result = executor.augment_entity_with_requires(entity, requires);
962
963 // Should return entity unchanged
964 assert_eq!(result.get("id").and_then(|v| v.as_str()), Some("test-123"));
965 }
966}