Skip to main content

durable_execution_sdk/state/
execution_state.rs

1//! Main execution state management.
2//!
3//! This module provides the [`ExecutionState`] struct which manages the execution
4//! state for a durable execution, including checkpoint tracking, replay logic,
5//! and operation state management.
6
7use std::collections::{HashMap, HashSet};
8use std::sync::atomic::{AtomicU8, Ordering};
9use std::sync::Arc;
10use std::time::Duration as StdDuration;
11
12use tokio::sync::{Mutex, RwLock};
13
14use crate::client::SharedDurableServiceClient;
15use crate::error::{DurableError, ErrorObject};
16use crate::operation::{Operation, OperationStatus, OperationType, OperationUpdate};
17use crate::types::ExecutionArn;
18
19use super::batcher::{
20    create_checkpoint_queue, CheckpointBatcher, CheckpointBatcherConfig, CheckpointSender,
21};
22use super::checkpoint_result::CheckpointedResult;
23use super::replay_status::ReplayStatus;
24
25/// Manages the execution state for a durable execution.
26///
27/// This struct tracks all checkpointed operations, handles replay logic,
28/// and manages communication with the Lambda durable execution service.
29pub struct ExecutionState {
30    /// The ARN of the durable execution
31    durable_execution_arn: String,
32
33    /// The current checkpoint token (updated after each checkpoint batch)
34    checkpoint_token: Arc<RwLock<String>>,
35
36    /// Map of operation_id to Operation for quick lookup during replay
37    operations: RwLock<HashMap<String, Operation>>,
38
39    /// The service client for communicating with Lambda
40    service_client: SharedDurableServiceClient,
41
42    /// Current replay status (Replay or New)
43    replay_status: AtomicU8,
44
45    /// Set of operation IDs that have been replayed (for tracking replay progress)
46    replayed_operations: RwLock<HashSet<String>>,
47
48    /// Marker for pagination when loading additional operations
49    next_marker: RwLock<Option<String>>,
50
51    /// Set of parent operation IDs that have completed (for orphan prevention)
52    parent_done_lock: Mutex<HashSet<String>>,
53
54    /// Optional checkpoint sender for batched checkpointing
55    checkpoint_sender: Option<CheckpointSender>,
56
57    /// The EXECUTION operation (first operation in state) - provides access to original input
58    /// Requirements: 19.1, 19.2
59    execution_operation: Option<Operation>,
60
61    /// The checkpointing mode that controls the trade-off between durability and performance.
62    ///
63    /// # Requirements
64    ///
65    /// - 24.1: THE Performance_Configuration SHALL support eager checkpointing mode
66    /// - 24.2: THE Performance_Configuration SHALL support batched checkpointing mode
67    /// - 24.3: THE Performance_Configuration SHALL support optimistic execution mode
68    /// - 24.4: THE Performance_Configuration SHALL document the default behavior and trade-offs
69    checkpointing_mode: crate::config::CheckpointingMode,
70}
71
72impl ExecutionState {
73    /// Creates a new ExecutionState from the Lambda invocation input.
74    ///
75    /// # Arguments
76    ///
77    /// * `durable_execution_arn` - The ARN of the durable execution
78    /// * `checkpoint_token` - The initial checkpoint token
79    /// * `initial_state` - The initial execution state with operations
80    /// * `service_client` - The service client for Lambda communication
81    ///
82    /// # Requirements
83    ///
84    /// - 19.1: THE EXECUTION_Operation SHALL be recognized as the first operation in state
85    /// - 19.2: THE EXECUTION_Operation SHALL provide access to original user input from ExecutionDetails.InputPayload
86    pub fn new(
87        durable_execution_arn: impl Into<String>,
88        checkpoint_token: impl Into<String>,
89        initial_state: crate::lambda::InitialExecutionState,
90        service_client: SharedDurableServiceClient,
91    ) -> Self {
92        Self::with_checkpointing_mode(
93            durable_execution_arn,
94            checkpoint_token,
95            initial_state,
96            service_client,
97            crate::config::CheckpointingMode::default(),
98        )
99    }
100
101    /// Creates a new ExecutionState with a specific checkpointing mode.
102    ///
103    /// # Requirements
104    ///
105    /// - 19.1: THE EXECUTION_Operation SHALL be recognized as the first operation in state
106    /// - 19.2: THE EXECUTION_Operation SHALL provide access to original user input from ExecutionDetails.InputPayload
107    /// - 24.1: THE Performance_Configuration SHALL support eager checkpointing mode
108    /// - 24.2: THE Performance_Configuration SHALL support batched checkpointing mode
109    /// - 24.3: THE Performance_Configuration SHALL support optimistic execution mode
110    pub fn with_checkpointing_mode(
111        durable_execution_arn: impl Into<String>,
112        checkpoint_token: impl Into<String>,
113        initial_state: crate::lambda::InitialExecutionState,
114        service_client: SharedDurableServiceClient,
115        checkpointing_mode: crate::config::CheckpointingMode,
116    ) -> Self {
117        // Find and extract the EXECUTION operation (first operation of type EXECUTION)
118        let execution_operation = initial_state
119            .operations
120            .iter()
121            .find(|op| op.operation_type == OperationType::Execution)
122            .cloned();
123
124        // Build the operations map from the initial state
125        let operations: HashMap<String, Operation> = initial_state
126            .operations
127            .into_iter()
128            .map(|op| (op.operation_id.clone(), op))
129            .collect();
130
131        // Determine initial replay status based on whether we have operations
132        let replay_status = if operations.is_empty() {
133            ReplayStatus::New
134        } else {
135            ReplayStatus::Replay
136        };
137
138        Self {
139            durable_execution_arn: durable_execution_arn.into(),
140            checkpoint_token: Arc::new(RwLock::new(checkpoint_token.into())),
141            operations: RwLock::new(operations),
142            service_client,
143            replay_status: AtomicU8::new(replay_status as u8),
144            replayed_operations: RwLock::new(HashSet::new()),
145            next_marker: RwLock::new(initial_state.next_marker),
146            parent_done_lock: Mutex::new(HashSet::new()),
147            checkpoint_sender: None,
148            execution_operation,
149            checkpointing_mode,
150        }
151    }
152
153    /// Creates a new ExecutionState with a checkpoint batcher.
154    ///
155    /// This method sets up the checkpoint queue and batcher for efficient
156    /// batched checkpointing. Returns the ExecutionState and a handle to
157    /// the batcher that should be run in a background task.
158    ///
159    /// # Requirements
160    ///
161    /// - 19.1: THE EXECUTION_Operation SHALL be recognized as the first operation in state
162    /// - 19.2: THE EXECUTION_Operation SHALL provide access to original user input from ExecutionDetails.InputPayload
163    pub fn with_batcher(
164        durable_execution_arn: impl Into<String>,
165        checkpoint_token: impl Into<String>,
166        initial_state: crate::lambda::InitialExecutionState,
167        service_client: SharedDurableServiceClient,
168        batcher_config: CheckpointBatcherConfig,
169        queue_buffer_size: usize,
170    ) -> (Self, CheckpointBatcher) {
171        Self::with_batcher_and_mode(
172            durable_execution_arn,
173            checkpoint_token,
174            initial_state,
175            service_client,
176            batcher_config,
177            queue_buffer_size,
178            crate::config::CheckpointingMode::default(),
179        )
180    }
181
182    /// Creates a new ExecutionState with a checkpoint batcher and specific checkpointing mode.
183    ///
184    /// # Requirements
185    ///
186    /// - 19.1: THE EXECUTION_Operation SHALL be recognized as the first operation in state
187    /// - 19.2: THE EXECUTION_Operation SHALL provide access to original user input from ExecutionDetails.InputPayload
188    /// - 24.1: THE Performance_Configuration SHALL support eager checkpointing mode
189    /// - 24.2: THE Performance_Configuration SHALL support batched checkpointing mode
190    /// - 24.3: THE Performance_Configuration SHALL support optimistic execution mode
191    pub fn with_batcher_and_mode(
192        durable_execution_arn: impl Into<String>,
193        checkpoint_token: impl Into<String>,
194        initial_state: crate::lambda::InitialExecutionState,
195        service_client: SharedDurableServiceClient,
196        batcher_config: CheckpointBatcherConfig,
197        queue_buffer_size: usize,
198        checkpointing_mode: crate::config::CheckpointingMode,
199    ) -> (Self, CheckpointBatcher) {
200        let arn: String = durable_execution_arn.into();
201        let token: String = checkpoint_token.into();
202
203        // Find and extract the EXECUTION operation
204        let execution_operation = initial_state
205            .operations
206            .iter()
207            .find(|op| op.operation_type == OperationType::Execution)
208            .cloned();
209
210        // Build the operations map from the initial state
211        let operations: HashMap<String, Operation> = initial_state
212            .operations
213            .into_iter()
214            .map(|op| (op.operation_id.clone(), op))
215            .collect();
216
217        // Determine initial replay status based on whether we have operations
218        let replay_status = if operations.is_empty() {
219            ReplayStatus::New
220        } else {
221            ReplayStatus::Replay
222        };
223
224        // Create the checkpoint queue
225        let (sender, rx) = create_checkpoint_queue(queue_buffer_size);
226        let checkpoint_token = Arc::new(RwLock::new(token));
227
228        // Create the batcher
229        let batcher = CheckpointBatcher::new(
230            batcher_config,
231            rx,
232            service_client.clone(),
233            arn.clone(),
234            checkpoint_token.clone(),
235        );
236
237        let state = Self {
238            durable_execution_arn: arn,
239            checkpoint_token,
240            operations: RwLock::new(operations),
241            service_client,
242            replay_status: AtomicU8::new(replay_status as u8),
243            replayed_operations: RwLock::new(HashSet::new()),
244            next_marker: RwLock::new(initial_state.next_marker),
245            parent_done_lock: Mutex::new(HashSet::new()),
246            checkpoint_sender: Some(sender),
247            execution_operation,
248            checkpointing_mode,
249        };
250
251        (state, batcher)
252    }
253
254    /// Returns the durable execution ARN.
255    pub fn durable_execution_arn(&self) -> &str {
256        &self.durable_execution_arn
257    }
258
259    /// Returns the durable execution ARN as an `ExecutionArn` newtype.
260    #[inline]
261    pub fn durable_execution_arn_typed(&self) -> ExecutionArn {
262        ExecutionArn::from(self.durable_execution_arn.clone())
263    }
264
265    /// Returns the current checkpoint token.
266    pub async fn checkpoint_token(&self) -> String {
267        self.checkpoint_token.read().await.clone()
268    }
269
270    /// Updates the checkpoint token after a successful checkpoint.
271    pub async fn set_checkpoint_token(&self, token: impl Into<String>) {
272        let mut guard = self.checkpoint_token.write().await;
273        *guard = token.into();
274    }
275
276    /// Returns the current replay status.
277    ///
278    /// # Memory Ordering
279    ///
280    /// Uses `Ordering::Acquire` to ensure that when we read the replay status,
281    /// we also see all the operations that were replayed before the status was
282    /// set to `New`. This creates a happens-before relationship with the
283    /// `Release` store in `track_replay`.
284    ///
285    /// Requirements: 4.2, 4.3, 4.6
286    pub fn replay_status(&self) -> ReplayStatus {
287        // Acquire ordering ensures we see all writes that happened before
288        // the corresponding Release store that set this value.
289        ReplayStatus::from(self.replay_status.load(Ordering::Acquire))
290    }
291
292    /// Returns true if currently in replay mode.
293    pub fn is_replay(&self) -> bool {
294        self.replay_status().is_replay()
295    }
296
297    /// Returns true if executing new operations.
298    pub fn is_new(&self) -> bool {
299        self.replay_status().is_new()
300    }
301
302    /// Returns the current checkpointing mode.
303    pub fn checkpointing_mode(&self) -> crate::config::CheckpointingMode {
304        self.checkpointing_mode
305    }
306
307    /// Returns true if eager checkpointing mode is enabled.
308    pub fn is_eager_checkpointing(&self) -> bool {
309        self.checkpointing_mode.is_eager()
310    }
311
312    /// Returns true if batched checkpointing mode is enabled.
313    pub fn is_batched_checkpointing(&self) -> bool {
314        self.checkpointing_mode.is_batched()
315    }
316
317    /// Returns true if optimistic checkpointing mode is enabled.
318    pub fn is_optimistic_checkpointing(&self) -> bool {
319        self.checkpointing_mode.is_optimistic()
320    }
321
322    /// Returns a reference to the EXECUTION operation if it exists.
323    ///
324    /// # Requirements
325    ///
326    /// - 19.1: THE EXECUTION_Operation SHALL be recognized as the first operation in state
327    pub fn execution_operation(&self) -> Option<&Operation> {
328        self.execution_operation.as_ref()
329    }
330
331    /// Returns the original user input from the EXECUTION operation.
332    ///
333    /// # Requirements
334    ///
335    /// - 19.2: THE EXECUTION_Operation SHALL provide access to original user input from ExecutionDetails.InputPayload
336    pub fn get_original_input_raw(&self) -> Option<&str> {
337        self.execution_operation
338            .as_ref()
339            .and_then(|op| op.execution_details.as_ref())
340            .and_then(|details| details.input_payload.as_deref())
341    }
342
343    /// Returns the EXECUTION operation's ID if it exists.
344    pub fn execution_operation_id(&self) -> Option<&str> {
345        self.execution_operation
346            .as_ref()
347            .map(|op| op.operation_id.as_str())
348    }
349
350    /// Completes the execution with a successful result via checkpointing.
351    ///
352    /// # Requirements
353    ///
354    /// - 19.3: THE EXECUTION_Operation SHALL support completing execution via SUCCEED action with result
355    /// - 19.5: WHEN execution result exceeds response size limits, THE EXECUTION_Operation SHALL checkpoint the result and return empty Result field
356    pub async fn complete_execution_success(
357        &self,
358        result: Option<String>,
359    ) -> Result<(), DurableError> {
360        let execution_id =
361            self.execution_operation_id()
362                .ok_or_else(|| DurableError::Validation {
363                    message: "Cannot complete execution: no EXECUTION operation exists".to_string(),
364                })?;
365
366        let update = OperationUpdate::succeed(execution_id, OperationType::Execution, result);
367
368        self.create_checkpoint(update, true).await
369    }
370
371    /// Completes the execution with a failure via checkpointing.
372    ///
373    /// # Requirements
374    ///
375    /// - 19.4: THE EXECUTION_Operation SHALL support completing execution via FAIL action with error
376    pub async fn complete_execution_failure(&self, error: ErrorObject) -> Result<(), DurableError> {
377        let execution_id =
378            self.execution_operation_id()
379                .ok_or_else(|| DurableError::Validation {
380                    message: "Cannot complete execution: no EXECUTION operation exists".to_string(),
381                })?;
382
383        let update = OperationUpdate::fail(execution_id, OperationType::Execution, error);
384
385        self.create_checkpoint(update, true).await
386    }
387
388    /// Gets the checkpoint result for an operation.
389    pub async fn get_checkpoint_result(&self, operation_id: &str) -> CheckpointedResult {
390        let operations = self.operations.read().await;
391        CheckpointedResult::new(operations.get(operation_id).cloned())
392    }
393
394    /// Tracks that an operation has been replayed.
395    pub async fn track_replay(&self, operation_id: &str) {
396        {
397            let mut replayed = self.replayed_operations.write().await;
398            replayed.insert(operation_id.to_string());
399        }
400
401        let (replayed_count, total_count) = {
402            let replayed = self.replayed_operations.read().await;
403            let operations = self.operations.read().await;
404            (replayed.len(), operations.len())
405        };
406
407        if replayed_count >= total_count {
408            let has_more = self.next_marker.read().await.is_some();
409            if !has_more {
410                // Release ordering ensures that all the replay tracking writes
411                // (replayed_operations updates) are visible to any thread that
412                // subsequently reads the replay_status with Acquire ordering.
413                // This establishes a happens-before relationship.
414                //
415                // Requirements: 4.2, 4.3, 4.6
416                self.replay_status
417                    .store(ReplayStatus::New as u8, Ordering::Release);
418            }
419        }
420    }
421
422    /// Loads additional operations from the service for pagination.
423    ///
424    /// # Requirements
425    ///
426    /// - 18.5: THE AWS_Integration SHALL handle ThrottlingException with appropriate retry behavior
427    pub async fn load_more_operations(&self) -> Result<bool, DurableError> {
428        let marker = {
429            let guard = self.next_marker.read().await;
430            match guard.as_ref() {
431                Some(m) => m.clone(),
432                None => return Ok(false),
433            }
434        };
435
436        let response = self.get_operations_with_retry(&marker).await?;
437
438        {
439            let mut operations = self.operations.write().await;
440            for op in response.operations {
441                operations.insert(op.operation_id.clone(), op);
442            }
443        }
444
445        {
446            let mut next_marker = self.next_marker.write().await;
447            *next_marker = response.next_marker;
448        }
449
450        Ok(true)
451    }
452
453    /// Fetches operations from the service with retry for throttling errors.
454    async fn get_operations_with_retry(
455        &self,
456        marker: &str,
457    ) -> Result<crate::client::GetOperationsResponse, DurableError> {
458        const MAX_RETRIES: u32 = 5;
459        const INITIAL_DELAY_MS: u64 = 100;
460        const MAX_DELAY_MS: u64 = 10_000;
461        const BACKOFF_MULTIPLIER: u64 = 2;
462
463        let mut attempt = 0;
464        let mut delay_ms = INITIAL_DELAY_MS;
465
466        loop {
467            let result = self
468                .service_client
469                .get_operations(&self.durable_execution_arn, marker)
470                .await;
471
472            match result {
473                Ok(response) => return Ok(response),
474                Err(error) if error.is_throttling() => {
475                    attempt += 1;
476                    if attempt > MAX_RETRIES {
477                        tracing::warn!(
478                            attempt = attempt,
479                            "GetOperations throttling: max retries exceeded"
480                        );
481                        return Err(error);
482                    }
483
484                    let actual_delay = error.get_retry_after_ms().unwrap_or(delay_ms);
485                    tracing::debug!(
486                        attempt = attempt,
487                        delay_ms = actual_delay,
488                        "GetOperations throttled, retrying"
489                    );
490                    tokio::time::sleep(StdDuration::from_millis(actual_delay)).await;
491                    delay_ms = (delay_ms * BACKOFF_MULTIPLIER).min(MAX_DELAY_MS);
492                }
493                Err(error) => return Err(error),
494            }
495        }
496    }
497
498    /// Loads all remaining operations from the service.
499    pub async fn load_all_operations(&self) -> Result<(), DurableError> {
500        while self.load_more_operations().await? {}
501        Ok(())
502    }
503
504    /// Returns true if there are more operations to load.
505    pub async fn has_more_operations(&self) -> bool {
506        self.next_marker.read().await.is_some()
507    }
508
509    /// Returns the number of loaded operations.
510    pub async fn operation_count(&self) -> usize {
511        self.operations.read().await.len()
512    }
513
514    /// Returns a reference to the service client.
515    pub fn service_client(&self) -> &SharedDurableServiceClient {
516        &self.service_client
517    }
518
519    /// Adds an operation to the local cache.
520    pub async fn add_operation(&self, operation: Operation) {
521        let mut operations = self.operations.write().await;
522        operations.insert(operation.operation_id.clone(), operation);
523    }
524
525    /// Updates an existing operation in the local cache.
526    pub async fn update_operation(
527        &self,
528        operation_id: &str,
529        update_fn: impl FnOnce(&mut Operation),
530    ) {
531        let mut operations = self.operations.write().await;
532        if let Some(op) = operations.get_mut(operation_id) {
533            update_fn(op);
534        }
535    }
536
537    /// Checks if an operation exists in the local cache.
538    pub async fn has_operation(&self, operation_id: &str) -> bool {
539        self.operations.read().await.contains_key(operation_id)
540    }
541
542    /// Gets a clone of an operation from the local cache.
543    pub async fn get_operation(&self, operation_id: &str) -> Option<Operation> {
544        self.operations.read().await.get(operation_id).cloned()
545    }
546
547    /// Marks a parent operation as done.
548    pub async fn mark_parent_done(&self, parent_id: &str) {
549        let mut done_parents = self.parent_done_lock.lock().await;
550        done_parents.insert(parent_id.to_string());
551    }
552
553    /// Checks if a parent operation has been marked as done.
554    pub async fn is_parent_done(&self, parent_id: &str) -> bool {
555        let done_parents = self.parent_done_lock.lock().await;
556        done_parents.contains(parent_id)
557    }
558
559    /// Checks if an operation would be orphaned.
560    pub async fn is_orphaned(&self, parent_id: Option<&str>) -> bool {
561        match parent_id {
562            Some(pid) => self.is_parent_done(pid).await,
563            None => false,
564        }
565    }
566
567    /// Creates a checkpoint for an operation.
568    ///
569    /// # Requirements
570    ///
571    /// - 24.1: THE Performance_Configuration SHALL support eager checkpointing mode
572    /// - 24.2: THE Performance_Configuration SHALL support batched checkpointing mode
573    /// - 24.3: THE Performance_Configuration SHALL support optimistic execution mode
574    pub async fn create_checkpoint(
575        &self,
576        operation: OperationUpdate,
577        is_sync: bool,
578    ) -> Result<(), DurableError> {
579        // Check for orphaned child
580        if let Some(ref parent_id) = operation.parent_id {
581            if self.is_parent_done(parent_id).await {
582                return Err(DurableError::OrphanedChild {
583                    message: format!(
584                        "Cannot checkpoint operation {} - parent {} has completed",
585                        operation.operation_id, parent_id
586                    ),
587                    operation_id: operation.operation_id.clone(),
588                });
589            }
590        }
591
592        // Determine effective sync behavior based on checkpointing mode
593        let effective_is_sync = match self.checkpointing_mode {
594            crate::config::CheckpointingMode::Eager => true,
595            crate::config::CheckpointingMode::Batched => is_sync,
596            crate::config::CheckpointingMode::Optimistic => is_sync,
597        };
598
599        // In Eager mode, bypass the batcher and send directly
600        if self.checkpointing_mode.is_eager() {
601            return self.checkpoint_direct(operation, effective_is_sync).await;
602        }
603
604        // Use the checkpoint sender if available (batched mode)
605        if let Some(ref sender) = self.checkpoint_sender {
606            let result = sender
607                .checkpoint(operation.clone(), effective_is_sync)
608                .await;
609            if result.is_ok() {
610                self.update_local_cache_from_update(&operation).await;
611            }
612            return result;
613        }
614
615        // Direct checkpoint (non-batched mode)
616        self.checkpoint_direct(operation, effective_is_sync).await
617    }
618
619    /// Sends a checkpoint directly to the service, bypassing the batcher.
620    async fn checkpoint_direct(
621        &self,
622        operation: OperationUpdate,
623        _is_sync: bool,
624    ) -> Result<(), DurableError> {
625        let token = self.checkpoint_token.read().await.clone();
626        let response = self
627            .service_client
628            .checkpoint(&self.durable_execution_arn, &token, vec![operation.clone()])
629            .await?;
630
631        {
632            let mut token_guard = self.checkpoint_token.write().await;
633            *token_guard = response.checkpoint_token;
634        }
635
636        // Update local cache from the response's NewExecutionState if available
637        if let Some(ref new_state) = response.new_execution_state {
638            self.update_local_cache_from_response(new_state).await;
639        } else {
640            self.update_local_cache_from_update(&operation).await;
641        }
642        Ok(())
643    }
644
645    /// Creates a checkpoint and returns the full response including NewExecutionState.
646    /// This is useful for operations like CALLBACK that need service-generated values.
647    pub async fn create_checkpoint_with_response(
648        &self,
649        operation: OperationUpdate,
650    ) -> Result<crate::client::CheckpointResponse, DurableError> {
651        // Check for orphaned child
652        if let Some(ref parent_id) = operation.parent_id {
653            if self.is_parent_done(parent_id).await {
654                return Err(DurableError::OrphanedChild {
655                    message: format!(
656                        "Cannot checkpoint operation {} - parent {} has completed",
657                        operation.operation_id, parent_id
658                    ),
659                    operation_id: operation.operation_id.clone(),
660                });
661            }
662        }
663
664        let token = self.checkpoint_token.read().await.clone();
665        let response = self
666            .service_client
667            .checkpoint(&self.durable_execution_arn, &token, vec![operation.clone()])
668            .await?;
669
670        tracing::debug!(
671            has_new_state = response.new_execution_state.is_some(),
672            num_operations = response
673                .new_execution_state
674                .as_ref()
675                .map(|s| s.operations.len())
676                .unwrap_or(0),
677            "Checkpoint response received"
678        );
679
680        {
681            let mut token_guard = self.checkpoint_token.write().await;
682            *token_guard = response.checkpoint_token.clone();
683        }
684
685        // Update local cache from the response's NewExecutionState if available
686        if let Some(ref new_state) = response.new_execution_state {
687            self.update_local_cache_from_response(new_state).await;
688        } else {
689            self.update_local_cache_from_update(&operation).await;
690        }
691
692        Ok(response)
693    }
694
695    /// Updates the local operation cache from a checkpoint response's NewExecutionState.
696    async fn update_local_cache_from_response(&self, new_state: &crate::client::NewExecutionState) {
697        let mut operations = self.operations.write().await;
698        for op in &new_state.operations {
699            operations.insert(op.operation_id.clone(), op.clone());
700        }
701    }
702
703    /// Creates a synchronous checkpoint (waits for confirmation).
704    pub async fn checkpoint_sync(&self, operation: OperationUpdate) -> Result<(), DurableError> {
705        self.create_checkpoint(operation, true).await
706    }
707
708    /// Creates an asynchronous checkpoint (fire-and-forget).
709    pub async fn checkpoint_async(&self, operation: OperationUpdate) -> Result<(), DurableError> {
710        self.create_checkpoint(operation, false).await
711    }
712
713    /// Creates a checkpoint using the optimal sync behavior for the current mode.
714    pub async fn checkpoint_optimal(
715        &self,
716        operation: OperationUpdate,
717        prefer_sync: bool,
718    ) -> Result<(), DurableError> {
719        let is_sync = match self.checkpointing_mode {
720            crate::config::CheckpointingMode::Eager => true,
721            crate::config::CheckpointingMode::Batched => prefer_sync,
722            crate::config::CheckpointingMode::Optimistic => prefer_sync,
723        };
724        self.create_checkpoint(operation, is_sync).await
725    }
726
727    /// Returns whether async checkpointing is recommended for the current mode.
728    pub fn should_use_async_checkpoint(&self) -> bool {
729        match self.checkpointing_mode {
730            crate::config::CheckpointingMode::Eager => false,
731            crate::config::CheckpointingMode::Batched => true,
732            crate::config::CheckpointingMode::Optimistic => true,
733        }
734    }
735
736    /// Returns whether the current mode prioritizes performance over durability.
737    pub fn prioritizes_performance(&self) -> bool {
738        self.checkpointing_mode.is_optimistic()
739    }
740
741    /// Returns whether the current mode prioritizes durability over performance.
742    pub fn prioritizes_durability(&self) -> bool {
743        self.checkpointing_mode.is_eager()
744    }
745
746    /// Updates the local operation cache based on an operation update.
747    async fn update_local_cache_from_update(&self, update: &OperationUpdate) {
748        let mut operations = self.operations.write().await;
749
750        match update.action {
751            crate::operation::OperationAction::Start => {
752                let mut op = Operation::new(&update.operation_id, update.operation_type);
753                op.parent_id = update.parent_id.clone();
754                op.name = update.name.clone();
755                operations.insert(update.operation_id.clone(), op);
756            }
757            crate::operation::OperationAction::Succeed => {
758                if let Some(op) = operations.get_mut(&update.operation_id) {
759                    op.status = OperationStatus::Succeeded;
760                    op.result = update.result.clone();
761                } else {
762                    let mut op = Operation::new(&update.operation_id, update.operation_type);
763                    op.status = OperationStatus::Succeeded;
764                    op.result = update.result.clone();
765                    op.parent_id = update.parent_id.clone();
766                    op.name = update.name.clone();
767                    operations.insert(update.operation_id.clone(), op);
768                }
769            }
770            crate::operation::OperationAction::Fail => {
771                if let Some(op) = operations.get_mut(&update.operation_id) {
772                    op.status = OperationStatus::Failed;
773                    op.error = update.error.clone();
774                } else {
775                    let mut op = Operation::new(&update.operation_id, update.operation_type);
776                    op.status = OperationStatus::Failed;
777                    op.error = update.error.clone();
778                    op.parent_id = update.parent_id.clone();
779                    op.name = update.name.clone();
780                    operations.insert(update.operation_id.clone(), op);
781                }
782            }
783            crate::operation::OperationAction::Cancel => {
784                if let Some(op) = operations.get_mut(&update.operation_id) {
785                    op.status = OperationStatus::Cancelled;
786                } else {
787                    let mut op = Operation::new(&update.operation_id, update.operation_type);
788                    op.status = OperationStatus::Cancelled;
789                    op.parent_id = update.parent_id.clone();
790                    op.name = update.name.clone();
791                    operations.insert(update.operation_id.clone(), op);
792                }
793            }
794            crate::operation::OperationAction::Retry => {
795                if let Some(op) = operations.get_mut(&update.operation_id) {
796                    op.status = OperationStatus::Pending;
797                    if update.result.is_some() || update.step_options.is_some() {
798                        let step_details =
799                            op.step_details
800                                .get_or_insert(crate::operation::StepDetails {
801                                    result: None,
802                                    attempt: None,
803                                    next_attempt_timestamp: None,
804                                    error: None,
805                                    payload: None,
806                                });
807                        if update.result.is_some() {
808                            step_details.payload = update.result.clone();
809                        }
810                        step_details.attempt = Some(step_details.attempt.unwrap_or(0) + 1);
811                    }
812                    if update.error.is_some() {
813                        op.error = update.error.clone();
814                    }
815                } else {
816                    let mut op = Operation::new(&update.operation_id, update.operation_type);
817                    op.status = OperationStatus::Pending;
818                    op.parent_id = update.parent_id.clone();
819                    op.name = update.name.clone();
820                    op.error = update.error.clone();
821                    if update.result.is_some() || update.step_options.is_some() {
822                        op.step_details = Some(crate::operation::StepDetails {
823                            result: None,
824                            attempt: Some(1),
825                            next_attempt_timestamp: None,
826                            error: None,
827                            payload: update.result.clone(),
828                        });
829                    }
830                    operations.insert(update.operation_id.clone(), op);
831                }
832            }
833        }
834    }
835
836    /// Returns a reference to the shared checkpoint token.
837    pub fn shared_checkpoint_token(&self) -> Arc<RwLock<String>> {
838        self.checkpoint_token.clone()
839    }
840
841    /// Returns true if this ExecutionState has a checkpoint sender configured.
842    pub fn has_checkpoint_sender(&self) -> bool {
843        self.checkpoint_sender.is_some()
844    }
845
846    /// Loads child operations for a specific parent operation.
847    ///
848    /// # Requirements
849    ///
850    /// - 10.5: THE Child_Context_Operation SHALL support ReplayChildren option for large parallel operations
851    /// - 10.6: WHEN ReplayChildren is true, THE Child_Context_Operation SHALL include child operations in state loads for replay
852    pub async fn load_child_operations(
853        &self,
854        parent_id: &str,
855    ) -> Result<Vec<Operation>, DurableError> {
856        let operations = self.operations.read().await;
857        let children: Vec<Operation> = operations
858            .values()
859            .filter(|op| op.parent_id.as_deref() == Some(parent_id))
860            .cloned()
861            .collect();
862
863        Ok(children)
864    }
865
866    /// Gets all child operations for a specific parent from the local cache.
867    pub async fn get_child_operations(&self, parent_id: &str) -> Vec<Operation> {
868        let operations = self.operations.read().await;
869        operations
870            .values()
871            .filter(|op| op.parent_id.as_deref() == Some(parent_id))
872            .cloned()
873            .collect()
874    }
875
876    /// Checks if a CONTEXT operation has ReplayChildren enabled.
877    ///
878    /// # Requirements
879    ///
880    /// - 10.5: THE Child_Context_Operation SHALL support ReplayChildren option for large parallel operations
881    pub async fn has_replay_children(&self, operation_id: &str) -> bool {
882        let operations = self.operations.read().await;
883        operations
884            .get(operation_id)
885            .filter(|op| op.operation_type == OperationType::Context)
886            .and_then(|op| op.context_details.as_ref())
887            .and_then(|details| details.replay_children)
888            .unwrap_or(false)
889    }
890}
891
892// Implement Debug manually since we can't derive it with async locks
893impl std::fmt::Debug for ExecutionState {
894    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
895        f.debug_struct("ExecutionState")
896            .field("durable_execution_arn", &self.durable_execution_arn)
897            .field("replay_status", &self.replay_status())
898            .finish_non_exhaustive()
899    }
900}
901
902#[cfg(test)]
903mod tests {
904    use super::*;
905    use std::sync::Arc;
906
907    use crate::client::{
908        CheckpointResponse, GetOperationsResponse, MockDurableServiceClient,
909        SharedDurableServiceClient,
910    };
911    use crate::error::ErrorObject;
912    use crate::lambda::InitialExecutionState;
913    use crate::operation::{
914        ContextDetails, ExecutionDetails, Operation, OperationStatus, OperationType,
915        OperationUpdate,
916    };
917
918    fn create_mock_client() -> SharedDurableServiceClient {
919        Arc::new(MockDurableServiceClient::new())
920    }
921
922    fn create_test_operation(id: &str, status: OperationStatus) -> Operation {
923        let mut op = Operation::new(id, OperationType::Step);
924        op.status = status;
925        op
926    }
927
928    fn create_execution_operation(input_payload: Option<&str>) -> Operation {
929        let mut op = Operation::new("exec-123", OperationType::Execution);
930        op.status = OperationStatus::Started;
931        op.execution_details = Some(ExecutionDetails {
932            input_payload: input_payload.map(|s| s.to_string()),
933        });
934        op
935    }
936
937    // Basic ExecutionState tests
938
939    #[tokio::test]
940    async fn test_execution_state_new_empty() {
941        let client = create_mock_client();
942        let state = ExecutionState::new(
943            "arn:test",
944            "token-123",
945            InitialExecutionState::new(),
946            client,
947        );
948
949        assert_eq!(state.durable_execution_arn(), "arn:test");
950        assert_eq!(state.checkpoint_token().await, "token-123");
951        assert!(state.is_new());
952        assert!(!state.is_replay());
953        assert_eq!(state.operation_count().await, 0);
954        assert!(!state.has_more_operations().await);
955    }
956
957    #[tokio::test]
958    async fn test_execution_state_new_with_operations() {
959        let client = create_mock_client();
960        let ops = vec![
961            create_test_operation("op-1", OperationStatus::Succeeded),
962            create_test_operation("op-2", OperationStatus::Succeeded),
963        ];
964        let initial_state = InitialExecutionState::with_operations(ops);
965
966        let state = ExecutionState::new("arn:test", "token-123", initial_state, client);
967
968        assert!(state.is_replay());
969        assert!(!state.is_new());
970        assert_eq!(state.operation_count().await, 2);
971    }
972
973    #[tokio::test]
974    async fn test_get_checkpoint_result_exists() {
975        let client = create_mock_client();
976        let mut op = create_test_operation("op-1", OperationStatus::Succeeded);
977        op.result = Some(r#"{"value": 42}"#.to_string());
978        let initial_state = InitialExecutionState::with_operations(vec![op]);
979
980        let state = ExecutionState::new("arn:test", "token-123", initial_state, client);
981
982        let result = state.get_checkpoint_result("op-1").await;
983        assert!(result.is_existent());
984        assert!(result.is_succeeded());
985        assert_eq!(result.result(), Some(r#"{"value": 42}"#));
986    }
987
988    #[tokio::test]
989    async fn test_get_checkpoint_result_not_exists() {
990        let client = create_mock_client();
991        let state = ExecutionState::new(
992            "arn:test",
993            "token-123",
994            InitialExecutionState::new(),
995            client,
996        );
997
998        let result = state.get_checkpoint_result("non-existent").await;
999        assert!(!result.is_existent());
1000    }
1001
1002    #[tokio::test]
1003    async fn test_track_replay_transitions_to_new() {
1004        let client = create_mock_client();
1005        let ops = vec![
1006            create_test_operation("op-1", OperationStatus::Succeeded),
1007            create_test_operation("op-2", OperationStatus::Succeeded),
1008        ];
1009        let initial_state = InitialExecutionState::with_operations(ops);
1010
1011        let state = ExecutionState::new("arn:test", "token-123", initial_state, client);
1012
1013        assert!(state.is_replay());
1014        state.track_replay("op-1").await;
1015        assert!(state.is_replay());
1016        state.track_replay("op-2").await;
1017        assert!(state.is_new());
1018    }
1019
1020    #[tokio::test]
1021    async fn test_track_replay_with_pagination() {
1022        let client = Arc::new(
1023            MockDurableServiceClient::new().with_get_operations_response(Ok(
1024                GetOperationsResponse {
1025                    operations: vec![create_test_operation("op-3", OperationStatus::Succeeded)],
1026                    next_marker: None,
1027                },
1028            )),
1029        );
1030
1031        let ops = vec![create_test_operation("op-1", OperationStatus::Succeeded)];
1032        let mut initial_state = InitialExecutionState::with_operations(ops);
1033        initial_state.next_marker = Some("marker-1".to_string());
1034
1035        let state = ExecutionState::new("arn:test", "token-123", initial_state, client);
1036
1037        state.track_replay("op-1").await;
1038        assert!(state.is_replay());
1039    }
1040
1041    #[tokio::test]
1042    async fn test_set_checkpoint_token() {
1043        let client = create_mock_client();
1044        let state = ExecutionState::new(
1045            "arn:test",
1046            "token-123",
1047            InitialExecutionState::new(),
1048            client,
1049        );
1050
1051        assert_eq!(state.checkpoint_token().await, "token-123");
1052        state.set_checkpoint_token("token-456").await;
1053        assert_eq!(state.checkpoint_token().await, "token-456");
1054    }
1055
1056    #[tokio::test]
1057    async fn test_add_operation() {
1058        let client = create_mock_client();
1059        let state = ExecutionState::new(
1060            "arn:test",
1061            "token-123",
1062            InitialExecutionState::new(),
1063            client,
1064        );
1065
1066        assert!(!state.has_operation("op-1").await);
1067
1068        let op = create_test_operation("op-1", OperationStatus::Succeeded);
1069        state.add_operation(op).await;
1070
1071        assert!(state.has_operation("op-1").await);
1072        assert_eq!(state.operation_count().await, 1);
1073    }
1074
1075    #[tokio::test]
1076    async fn test_update_operation() {
1077        let client = create_mock_client();
1078        let ops = vec![create_test_operation("op-1", OperationStatus::Started)];
1079        let initial_state = InitialExecutionState::with_operations(ops);
1080
1081        let state = ExecutionState::new("arn:test", "token-123", initial_state, client);
1082
1083        let op = state.get_operation("op-1").await.unwrap();
1084        assert_eq!(op.status, OperationStatus::Started);
1085
1086        state
1087            .update_operation("op-1", |op| {
1088                op.status = OperationStatus::Succeeded;
1089                op.result = Some("done".to_string());
1090            })
1091            .await;
1092
1093        let op = state.get_operation("op-1").await.unwrap();
1094        assert_eq!(op.status, OperationStatus::Succeeded);
1095        assert_eq!(op.result, Some("done".to_string()));
1096    }
1097
1098    #[tokio::test]
1099    async fn test_load_more_operations() {
1100        let client = Arc::new(
1101            MockDurableServiceClient::new().with_get_operations_response(Ok(
1102                GetOperationsResponse {
1103                    operations: vec![
1104                        create_test_operation("op-2", OperationStatus::Succeeded),
1105                        create_test_operation("op-3", OperationStatus::Succeeded),
1106                    ],
1107                    next_marker: None,
1108                },
1109            )),
1110        );
1111
1112        let ops = vec![create_test_operation("op-1", OperationStatus::Succeeded)];
1113        let mut initial_state = InitialExecutionState::with_operations(ops);
1114        initial_state.next_marker = Some("marker-1".to_string());
1115
1116        let state = ExecutionState::new("arn:test", "token-123", initial_state, client);
1117
1118        assert_eq!(state.operation_count().await, 1);
1119        assert!(state.has_more_operations().await);
1120
1121        let loaded = state.load_more_operations().await.unwrap();
1122        assert!(loaded);
1123
1124        assert_eq!(state.operation_count().await, 3);
1125        assert!(!state.has_more_operations().await);
1126    }
1127
1128    #[tokio::test]
1129    async fn test_load_more_operations_no_more() {
1130        let client = create_mock_client();
1131        let state = ExecutionState::new(
1132            "arn:test",
1133            "token-123",
1134            InitialExecutionState::new(),
1135            client,
1136        );
1137
1138        let loaded = state.load_more_operations().await.unwrap();
1139        assert!(!loaded);
1140    }
1141
1142    #[tokio::test]
1143    async fn test_load_all_operations() {
1144        let client = Arc::new(
1145            MockDurableServiceClient::new()
1146                .with_get_operations_response(Ok(GetOperationsResponse {
1147                    operations: vec![create_test_operation("op-2", OperationStatus::Succeeded)],
1148                    next_marker: Some("marker-2".to_string()),
1149                }))
1150                .with_get_operations_response(Ok(GetOperationsResponse {
1151                    operations: vec![create_test_operation("op-3", OperationStatus::Succeeded)],
1152                    next_marker: None,
1153                })),
1154        );
1155
1156        let ops = vec![create_test_operation("op-1", OperationStatus::Succeeded)];
1157        let mut initial_state = InitialExecutionState::with_operations(ops);
1158        initial_state.next_marker = Some("marker-1".to_string());
1159
1160        let state = ExecutionState::new("arn:test", "token-123", initial_state, client);
1161
1162        assert_eq!(state.operation_count().await, 1);
1163        state.load_all_operations().await.unwrap();
1164
1165        assert_eq!(state.operation_count().await, 3);
1166        assert!(!state.has_more_operations().await);
1167    }
1168
1169    #[tokio::test]
1170    async fn test_mark_parent_done() {
1171        let client = create_mock_client();
1172        let state = ExecutionState::new(
1173            "arn:test",
1174            "token-123",
1175            InitialExecutionState::new(),
1176            client,
1177        );
1178
1179        assert!(!state.is_parent_done("parent-1").await);
1180        state.mark_parent_done("parent-1").await;
1181        assert!(state.is_parent_done("parent-1").await);
1182        assert!(!state.is_parent_done("parent-2").await);
1183    }
1184
1185    #[tokio::test]
1186    async fn test_is_orphaned() {
1187        let client = create_mock_client();
1188        let state = ExecutionState::new(
1189            "arn:test",
1190            "token-123",
1191            InitialExecutionState::new(),
1192            client,
1193        );
1194
1195        assert!(!state.is_orphaned(None).await);
1196        assert!(!state.is_orphaned(Some("parent-1")).await);
1197
1198        state.mark_parent_done("parent-1").await;
1199
1200        assert!(state.is_orphaned(Some("parent-1")).await);
1201        assert!(!state.is_orphaned(Some("parent-2")).await);
1202    }
1203
1204    #[tokio::test]
1205    async fn test_debug_impl() {
1206        let client = create_mock_client();
1207        let state = ExecutionState::new(
1208            "arn:test",
1209            "token-123",
1210            InitialExecutionState::new(),
1211            client,
1212        );
1213
1214        let debug_str = format!("{:?}", state);
1215        assert!(debug_str.contains("ExecutionState"));
1216        assert!(debug_str.contains("arn:test"));
1217    }
1218
1219    #[tokio::test]
1220    async fn test_create_checkpoint_direct() {
1221        let client = Arc::new(
1222            MockDurableServiceClient::new()
1223                .with_checkpoint_response(Ok(CheckpointResponse::new("new-token"))),
1224        );
1225
1226        let state = ExecutionState::new(
1227            "arn:test",
1228            "token-123",
1229            InitialExecutionState::new(),
1230            client,
1231        );
1232
1233        let update = OperationUpdate::start("op-1", OperationType::Step);
1234        let result = state.create_checkpoint(update, true).await;
1235
1236        assert!(result.is_ok());
1237        assert_eq!(state.checkpoint_token().await, "new-token");
1238        assert!(state.has_operation("op-1").await);
1239    }
1240
1241    #[tokio::test]
1242    async fn test_create_checkpoint_updates_local_cache_on_succeed() {
1243        let client = Arc::new(
1244            MockDurableServiceClient::new()
1245                .with_checkpoint_response(Ok(CheckpointResponse::new("token-1")))
1246                .with_checkpoint_response(Ok(CheckpointResponse::new("token-2"))),
1247        );
1248
1249        let state = ExecutionState::new(
1250            "arn:test",
1251            "token-123",
1252            InitialExecutionState::new(),
1253            client,
1254        );
1255
1256        let update = OperationUpdate::start("op-1", OperationType::Step);
1257        state.create_checkpoint(update, true).await.unwrap();
1258
1259        let op = state.get_operation("op-1").await.unwrap();
1260        assert_eq!(op.status, OperationStatus::Started);
1261
1262        let update = OperationUpdate::succeed(
1263            "op-1",
1264            OperationType::Step,
1265            Some(r#"{"result": "ok"}"#.to_string()),
1266        );
1267        state.create_checkpoint(update, true).await.unwrap();
1268
1269        let op = state.get_operation("op-1").await.unwrap();
1270        assert_eq!(op.status, OperationStatus::Succeeded);
1271        assert_eq!(op.result, Some(r#"{"result": "ok"}"#.to_string()));
1272    }
1273
1274    #[tokio::test]
1275    async fn test_create_checkpoint_rejects_orphaned_child() {
1276        let client = create_mock_client();
1277        let state = ExecutionState::new(
1278            "arn:test",
1279            "token-123",
1280            InitialExecutionState::new(),
1281            client,
1282        );
1283
1284        state.mark_parent_done("parent-1").await;
1285
1286        let update =
1287            OperationUpdate::start("child-1", OperationType::Step).with_parent_id("parent-1");
1288        let result = state.create_checkpoint(update, true).await;
1289
1290        assert!(result.is_err());
1291        match result.unwrap_err() {
1292            crate::error::DurableError::OrphanedChild { operation_id, .. } => {
1293                assert_eq!(operation_id, "child-1");
1294            }
1295            _ => panic!("Expected OrphanedChild error"),
1296        }
1297    }
1298
1299    #[tokio::test]
1300    async fn test_with_batcher_creates_state_and_batcher() {
1301        let client = Arc::new(
1302            MockDurableServiceClient::new()
1303                .with_checkpoint_response(Ok(CheckpointResponse::new("new-token"))),
1304        );
1305
1306        let (state, mut batcher) = ExecutionState::with_batcher(
1307            "arn:test",
1308            "token-123",
1309            InitialExecutionState::new(),
1310            client,
1311            CheckpointBatcherConfig {
1312                max_batch_time_ms: 10,
1313                ..Default::default()
1314            },
1315            100,
1316        );
1317
1318        assert!(state.has_checkpoint_sender());
1319        assert_eq!(state.durable_execution_arn(), "arn:test");
1320
1321        let batcher_handle = tokio::spawn(async move {
1322            batcher.run().await;
1323        });
1324
1325        let update = OperationUpdate::start("op-1", OperationType::Step);
1326        let result = state.create_checkpoint(update, true).await;
1327
1328        drop(state);
1329        batcher_handle.await.unwrap();
1330
1331        assert!(result.is_ok());
1332    }
1333
1334    #[tokio::test]
1335    async fn test_checkpoint_sync_convenience_method() {
1336        let client = Arc::new(
1337            MockDurableServiceClient::new()
1338                .with_checkpoint_response(Ok(CheckpointResponse::new("new-token"))),
1339        );
1340
1341        let state = ExecutionState::new(
1342            "arn:test",
1343            "token-123",
1344            InitialExecutionState::new(),
1345            client,
1346        );
1347
1348        let update = OperationUpdate::start("op-1", OperationType::Step);
1349        let result = state.checkpoint_sync(update).await;
1350
1351        assert!(result.is_ok());
1352    }
1353
1354    #[tokio::test]
1355    async fn test_checkpoint_async_convenience_method() {
1356        let client = Arc::new(
1357            MockDurableServiceClient::new()
1358                .with_checkpoint_response(Ok(CheckpointResponse::new("new-token"))),
1359        );
1360
1361        let state = ExecutionState::new(
1362            "arn:test",
1363            "token-123",
1364            InitialExecutionState::new(),
1365            client,
1366        );
1367
1368        let update = OperationUpdate::start("op-1", OperationType::Step);
1369        let result = state.checkpoint_async(update).await;
1370
1371        assert!(result.is_ok());
1372    }
1373
1374    #[tokio::test]
1375    async fn test_shared_checkpoint_token() {
1376        let client = create_mock_client();
1377        let state = ExecutionState::new(
1378            "arn:test",
1379            "token-123",
1380            InitialExecutionState::new(),
1381            client,
1382        );
1383
1384        let shared_token = state.shared_checkpoint_token();
1385        assert_eq!(*shared_token.read().await, "token-123");
1386
1387        {
1388            let mut guard = shared_token.write().await;
1389            *guard = "modified-token".to_string();
1390        }
1391
1392        assert_eq!(state.checkpoint_token().await, "modified-token");
1393    }
1394
1395    // Tests for ReplayChildren support (Requirements 10.5, 10.6)
1396
1397    #[tokio::test]
1398    async fn test_load_child_operations_returns_children() {
1399        let client = create_mock_client();
1400
1401        let mut parent_op = Operation::new("parent-ctx", OperationType::Context);
1402        parent_op.status = OperationStatus::Succeeded;
1403
1404        let mut child1 = create_test_operation("child-1", OperationStatus::Succeeded);
1405        child1.parent_id = Some("parent-ctx".to_string());
1406
1407        let mut child2 = create_test_operation("child-2", OperationStatus::Succeeded);
1408        child2.parent_id = Some("parent-ctx".to_string());
1409
1410        let mut other_child = create_test_operation("other-child", OperationStatus::Succeeded);
1411        other_child.parent_id = Some("other-parent".to_string());
1412
1413        let initial_state =
1414            InitialExecutionState::with_operations(vec![parent_op, child1, child2, other_child]);
1415
1416        let state = ExecutionState::new("arn:test", "token-123", initial_state, client);
1417
1418        let children = state.load_child_operations("parent-ctx").await.unwrap();
1419
1420        assert_eq!(children.len(), 2);
1421        let child_ids: Vec<&str> = children.iter().map(|c| c.operation_id.as_str()).collect();
1422        assert!(child_ids.contains(&"child-1"));
1423        assert!(child_ids.contains(&"child-2"));
1424    }
1425
1426    #[tokio::test]
1427    async fn test_load_child_operations_no_children() {
1428        let client = create_mock_client();
1429
1430        let parent_op = Operation::new("parent-ctx", OperationType::Context);
1431        let initial_state = InitialExecutionState::with_operations(vec![parent_op]);
1432
1433        let state = ExecutionState::new("arn:test", "token-123", initial_state, client);
1434
1435        let children = state.load_child_operations("parent-ctx").await.unwrap();
1436
1437        assert!(children.is_empty());
1438    }
1439
1440    #[tokio::test]
1441    async fn test_get_child_operations_returns_cached_children() {
1442        let client = create_mock_client();
1443
1444        let mut parent_op = Operation::new("parent-ctx", OperationType::Context);
1445        parent_op.status = OperationStatus::Succeeded;
1446
1447        let mut child1 = create_test_operation("child-1", OperationStatus::Succeeded);
1448        child1.parent_id = Some("parent-ctx".to_string());
1449
1450        let mut child2 = create_test_operation("child-2", OperationStatus::Succeeded);
1451        child2.parent_id = Some("parent-ctx".to_string());
1452
1453        let initial_state = InitialExecutionState::with_operations(vec![parent_op, child1, child2]);
1454
1455        let state = ExecutionState::new("arn:test", "token-123", initial_state, client);
1456
1457        let children = state.get_child_operations("parent-ctx").await;
1458
1459        assert_eq!(children.len(), 2);
1460    }
1461
1462    #[tokio::test]
1463    async fn test_get_child_operations_empty_for_nonexistent_parent() {
1464        let client = create_mock_client();
1465        let state = ExecutionState::new(
1466            "arn:test",
1467            "token-123",
1468            InitialExecutionState::new(),
1469            client,
1470        );
1471
1472        let children = state.get_child_operations("nonexistent-parent").await;
1473        assert!(children.is_empty());
1474    }
1475
1476    #[tokio::test]
1477    async fn test_has_replay_children_true() {
1478        let client = create_mock_client();
1479
1480        let mut ctx_op = Operation::new("ctx-with-replay", OperationType::Context);
1481        ctx_op.status = OperationStatus::Succeeded;
1482        ctx_op.context_details = Some(ContextDetails {
1483            result: None,
1484            replay_children: Some(true),
1485            error: None,
1486        });
1487
1488        let initial_state = InitialExecutionState::with_operations(vec![ctx_op]);
1489        let state = ExecutionState::new("arn:test", "token-123", initial_state, client);
1490
1491        assert!(state.has_replay_children("ctx-with-replay").await);
1492    }
1493
1494    #[tokio::test]
1495    async fn test_has_replay_children_false_when_not_set() {
1496        let client = create_mock_client();
1497
1498        let mut ctx_op = Operation::new("ctx-no-replay", OperationType::Context);
1499        ctx_op.status = OperationStatus::Succeeded;
1500        ctx_op.context_details = Some(ContextDetails {
1501            result: None,
1502            replay_children: None,
1503            error: None,
1504        });
1505
1506        let initial_state = InitialExecutionState::with_operations(vec![ctx_op]);
1507        let state = ExecutionState::new("arn:test", "token-123", initial_state, client);
1508
1509        assert!(!state.has_replay_children("ctx-no-replay").await);
1510    }
1511
1512    // Tests for CheckpointingMode (Requirements 24.1, 24.2, 24.3, 24.4)
1513
1514    #[tokio::test]
1515    async fn test_checkpointing_mode_default_is_batched() {
1516        let client = create_mock_client();
1517        let state = ExecutionState::new(
1518            "arn:test",
1519            "token-123",
1520            InitialExecutionState::new(),
1521            client,
1522        );
1523
1524        assert_eq!(
1525            state.checkpointing_mode(),
1526            crate::config::CheckpointingMode::Batched
1527        );
1528        assert!(state.is_batched_checkpointing());
1529        assert!(!state.is_eager_checkpointing());
1530        assert!(!state.is_optimistic_checkpointing());
1531    }
1532
1533    #[tokio::test]
1534    async fn test_checkpointing_mode_eager() {
1535        let client = create_mock_client();
1536        let state = ExecutionState::with_checkpointing_mode(
1537            "arn:test",
1538            "token-123",
1539            InitialExecutionState::new(),
1540            client,
1541            crate::config::CheckpointingMode::Eager,
1542        );
1543
1544        assert_eq!(
1545            state.checkpointing_mode(),
1546            crate::config::CheckpointingMode::Eager
1547        );
1548        assert!(state.is_eager_checkpointing());
1549        assert!(!state.is_batched_checkpointing());
1550        assert!(!state.is_optimistic_checkpointing());
1551        assert!(state.prioritizes_durability());
1552        assert!(!state.prioritizes_performance());
1553        assert!(!state.should_use_async_checkpoint());
1554    }
1555
1556    #[tokio::test]
1557    async fn test_checkpointing_mode_optimistic() {
1558        let client = create_mock_client();
1559        let state = ExecutionState::with_checkpointing_mode(
1560            "arn:test",
1561            "token-123",
1562            InitialExecutionState::new(),
1563            client,
1564            crate::config::CheckpointingMode::Optimistic,
1565        );
1566
1567        assert_eq!(
1568            state.checkpointing_mode(),
1569            crate::config::CheckpointingMode::Optimistic
1570        );
1571        assert!(state.is_optimistic_checkpointing());
1572        assert!(!state.is_eager_checkpointing());
1573        assert!(!state.is_batched_checkpointing());
1574        assert!(state.prioritizes_performance());
1575        assert!(!state.prioritizes_durability());
1576        assert!(state.should_use_async_checkpoint());
1577    }
1578
1579    #[tokio::test]
1580    async fn test_checkpointing_mode_batched_helpers() {
1581        let client = create_mock_client();
1582        let state = ExecutionState::with_checkpointing_mode(
1583            "arn:test",
1584            "token-123",
1585            InitialExecutionState::new(),
1586            client,
1587            crate::config::CheckpointingMode::Batched,
1588        );
1589
1590        assert!(!state.prioritizes_durability());
1591        assert!(!state.prioritizes_performance());
1592        assert!(state.should_use_async_checkpoint());
1593    }
1594
1595    #[tokio::test]
1596    async fn test_checkpoint_optimal_eager_mode() {
1597        let client = Arc::new(
1598            MockDurableServiceClient::new()
1599                .with_checkpoint_response(Ok(CheckpointResponse::new("new-token"))),
1600        );
1601
1602        let state = ExecutionState::with_checkpointing_mode(
1603            "arn:test",
1604            "token-123",
1605            InitialExecutionState::new(),
1606            client,
1607            crate::config::CheckpointingMode::Eager,
1608        );
1609
1610        let update = OperationUpdate::start("op-1", OperationType::Step);
1611        let result = state.checkpoint_optimal(update, false).await;
1612
1613        assert!(result.is_ok());
1614    }
1615
1616    #[tokio::test]
1617    async fn test_eager_mode_bypasses_batcher() {
1618        let client = Arc::new(
1619            MockDurableServiceClient::new()
1620                .with_checkpoint_response(Ok(CheckpointResponse::new("new-token"))),
1621        );
1622
1623        let (state, mut batcher) = ExecutionState::with_batcher_and_mode(
1624            "arn:test",
1625            "token-123",
1626            InitialExecutionState::new(),
1627            client,
1628            CheckpointBatcherConfig {
1629                max_batch_time_ms: 10,
1630                ..Default::default()
1631            },
1632            100,
1633            crate::config::CheckpointingMode::Eager,
1634        );
1635
1636        let batcher_handle = tokio::spawn(async move {
1637            batcher.run().await;
1638        });
1639
1640        let update = OperationUpdate::start("op-1", OperationType::Step);
1641        let result = state.create_checkpoint(update, false).await;
1642
1643        drop(state);
1644        batcher_handle.await.unwrap();
1645
1646        assert!(result.is_ok());
1647    }
1648
1649    // Tests for EXECUTION operation handling (Requirements 19.1-19.5)
1650
1651    #[tokio::test]
1652    async fn test_execution_operation_recognized() {
1653        let client = create_mock_client();
1654        let exec_op = create_execution_operation(Some(r#"{"order_id": "123"}"#));
1655        let step_op = Operation::new("step-1", OperationType::Step);
1656
1657        let initial_state = InitialExecutionState::with_operations(vec![exec_op, step_op]);
1658        let state = ExecutionState::new("arn:test", "token-123", initial_state, client);
1659
1660        assert!(state.execution_operation().is_some());
1661        let exec = state.execution_operation().unwrap();
1662        assert_eq!(exec.operation_type, OperationType::Execution);
1663        assert_eq!(exec.operation_id, "exec-123");
1664    }
1665
1666    #[tokio::test]
1667    async fn test_execution_operation_not_present() {
1668        let client = create_mock_client();
1669        let step_op = Operation::new("step-1", OperationType::Step);
1670
1671        let initial_state = InitialExecutionState::with_operations(vec![step_op]);
1672        let state = ExecutionState::new("arn:test", "token-123", initial_state, client);
1673
1674        assert!(state.execution_operation().is_none());
1675    }
1676
1677    #[tokio::test]
1678    async fn test_get_original_input_raw() {
1679        let client = create_mock_client();
1680        let exec_op = create_execution_operation(Some(r#"{"order_id": "123"}"#));
1681
1682        let initial_state = InitialExecutionState::with_operations(vec![exec_op]);
1683        let state = ExecutionState::new("arn:test", "token-123", initial_state, client);
1684
1685        let input = state.get_original_input_raw();
1686        assert!(input.is_some());
1687        assert_eq!(input.unwrap(), r#"{"order_id": "123"}"#);
1688    }
1689
1690    #[tokio::test]
1691    async fn test_get_original_input_raw_no_payload() {
1692        let client = create_mock_client();
1693        let exec_op = create_execution_operation(None);
1694
1695        let initial_state = InitialExecutionState::with_operations(vec![exec_op]);
1696        let state = ExecutionState::new("arn:test", "token-123", initial_state, client);
1697
1698        let input = state.get_original_input_raw();
1699        assert!(input.is_none());
1700    }
1701
1702    #[tokio::test]
1703    async fn test_get_original_input_raw_no_execution_operation() {
1704        let client = create_mock_client();
1705        let step_op = Operation::new("step-1", OperationType::Step);
1706
1707        let initial_state = InitialExecutionState::with_operations(vec![step_op]);
1708        let state = ExecutionState::new("arn:test", "token-123", initial_state, client);
1709
1710        let input = state.get_original_input_raw();
1711        assert!(input.is_none());
1712    }
1713
1714    #[tokio::test]
1715    async fn test_execution_operation_id() {
1716        let client = create_mock_client();
1717        let exec_op = create_execution_operation(Some(r#"{"order_id": "123"}"#));
1718
1719        let initial_state = InitialExecutionState::with_operations(vec![exec_op]);
1720        let state = ExecutionState::new("arn:test", "token-123", initial_state, client);
1721
1722        let id = state.execution_operation_id();
1723        assert!(id.is_some());
1724        assert_eq!(id.unwrap(), "exec-123");
1725    }
1726
1727    #[tokio::test]
1728    async fn test_complete_execution_success() {
1729        let client = Arc::new(
1730            MockDurableServiceClient::new()
1731                .with_checkpoint_response(Ok(CheckpointResponse::new("new-token"))),
1732        );
1733
1734        let exec_op = create_execution_operation(Some(r#"{"order_id": "123"}"#));
1735        let initial_state = InitialExecutionState::with_operations(vec![exec_op]);
1736        let state = ExecutionState::new("arn:test", "token-123", initial_state, client);
1737
1738        let result = state
1739            .complete_execution_success(Some(r#"{"status": "completed"}"#.to_string()))
1740            .await;
1741        assert!(result.is_ok());
1742
1743        let op = state.get_operation("exec-123").await.unwrap();
1744        assert_eq!(op.status, OperationStatus::Succeeded);
1745        assert_eq!(op.result, Some(r#"{"status": "completed"}"#.to_string()));
1746    }
1747
1748    #[tokio::test]
1749    async fn test_complete_execution_success_no_execution_operation() {
1750        let client = create_mock_client();
1751        let step_op = Operation::new("step-1", OperationType::Step);
1752
1753        let initial_state = InitialExecutionState::with_operations(vec![step_op]);
1754        let state = ExecutionState::new("arn:test", "token-123", initial_state, client);
1755
1756        let result = state
1757            .complete_execution_success(Some("result".to_string()))
1758            .await;
1759        assert!(result.is_err());
1760        match result.unwrap_err() {
1761            crate::error::DurableError::Validation { message } => {
1762                assert!(message.contains("no EXECUTION operation"));
1763            }
1764            _ => panic!("Expected Validation error"),
1765        }
1766    }
1767
1768    #[tokio::test]
1769    async fn test_complete_execution_failure() {
1770        let client = Arc::new(
1771            MockDurableServiceClient::new()
1772                .with_checkpoint_response(Ok(CheckpointResponse::new("new-token"))),
1773        );
1774
1775        let exec_op = create_execution_operation(Some(r#"{"order_id": "123"}"#));
1776        let initial_state = InitialExecutionState::with_operations(vec![exec_op]);
1777        let state = ExecutionState::new("arn:test", "token-123", initial_state, client);
1778
1779        let error = ErrorObject::new("ProcessingError", "Order processing failed");
1780        let result = state.complete_execution_failure(error).await;
1781        assert!(result.is_ok());
1782
1783        let op = state.get_operation("exec-123").await.unwrap();
1784        assert_eq!(op.status, OperationStatus::Failed);
1785        assert!(op.error.is_some());
1786        assert_eq!(op.error.as_ref().unwrap().error_type, "ProcessingError");
1787    }
1788
1789    #[tokio::test]
1790    async fn test_complete_execution_failure_no_execution_operation() {
1791        let client = create_mock_client();
1792        let step_op = Operation::new("step-1", OperationType::Step);
1793
1794        let initial_state = InitialExecutionState::with_operations(vec![step_op]);
1795        let state = ExecutionState::new("arn:test", "token-123", initial_state, client);
1796
1797        let error = ErrorObject::new("TestError", "Test message");
1798        let result = state.complete_execution_failure(error).await;
1799        assert!(result.is_err());
1800        match result.unwrap_err() {
1801            crate::error::DurableError::Validation { message } => {
1802                assert!(message.contains("no EXECUTION operation"));
1803            }
1804            _ => panic!("Expected Validation error"),
1805        }
1806    }
1807
1808    #[tokio::test]
1809    async fn test_with_batcher_recognizes_execution_operation() {
1810        let client = Arc::new(MockDurableServiceClient::new());
1811        let exec_op = create_execution_operation(Some(r#"{"order_id": "123"}"#));
1812
1813        let initial_state = InitialExecutionState::with_operations(vec![exec_op]);
1814        let (state, _batcher) = ExecutionState::with_batcher(
1815            "arn:test",
1816            "token-123",
1817            initial_state,
1818            client,
1819            CheckpointBatcherConfig::default(),
1820            100,
1821        );
1822
1823        assert!(state.execution_operation().is_some());
1824        let exec = state.execution_operation().unwrap();
1825        assert_eq!(exec.operation_type, OperationType::Execution);
1826        assert_eq!(
1827            state.get_original_input_raw(),
1828            Some(r#"{"order_id": "123"}"#)
1829        );
1830    }
1831
1832    // Property-based tests for orphan prevention
1833
1834    mod property_tests {
1835        use super::*;
1836        use proptest::prelude::*;
1837
1838        proptest! {
1839            #![proptest_config(ProptestConfig::with_cases(100))]
1840
1841            #[test]
1842            fn prop_orphaned_child_checkpoint_fails(
1843                parent_id in "[a-z]{5,10}",
1844                child_id in "[a-z]{5,10}",
1845            ) {
1846                let rt = tokio::runtime::Runtime::new().unwrap();
1847                let result: Result<(), TestCaseError> = rt.block_on(async {
1848                    let client = Arc::new(
1849                        MockDurableServiceClient::new()
1850                            .with_checkpoint_response(Ok(CheckpointResponse::new("new-token")))
1851                    );
1852
1853                    let state = ExecutionState::new(
1854                        "arn:test",
1855                        "token-123",
1856                        InitialExecutionState::new(),
1857                        client,
1858                    );
1859
1860                    state.mark_parent_done(&parent_id).await;
1861
1862                    let update = OperationUpdate::start(&child_id, OperationType::Step)
1863                        .with_parent_id(&parent_id);
1864                    let checkpoint_result = state.create_checkpoint(update, true).await;
1865
1866                    match checkpoint_result {
1867                        Err(DurableError::OrphanedChild { operation_id, .. }) => {
1868                            if operation_id != child_id {
1869                                return Err(TestCaseError::fail(format!(
1870                                    "Expected operation_id '{}' in OrphanedChild error, got '{}'",
1871                                    child_id, operation_id
1872                                )));
1873                            }
1874                        }
1875                        Ok(_) => {
1876                            return Err(TestCaseError::fail(
1877                                "Expected OrphanedChild error, but checkpoint succeeded"
1878                            ));
1879                        }
1880                        Err(other) => {
1881                            return Err(TestCaseError::fail(format!(
1882                                "Expected OrphanedChild error, got {:?}",
1883                                other
1884                            )));
1885                        }
1886                    }
1887
1888                    Ok(())
1889                });
1890                result?;
1891            }
1892
1893            #[test]
1894            fn prop_non_orphaned_child_checkpoint_succeeds(
1895                parent_id in "[a-z]{5,10}",
1896                child_id in "[a-z]{5,10}",
1897            ) {
1898                let rt = tokio::runtime::Runtime::new().unwrap();
1899                let result: Result<(), TestCaseError> = rt.block_on(async {
1900                    let client = Arc::new(
1901                        MockDurableServiceClient::new()
1902                            .with_checkpoint_response(Ok(CheckpointResponse::new("new-token")))
1903                    );
1904
1905                    let state = ExecutionState::new(
1906                        "arn:test",
1907                        "token-123",
1908                        InitialExecutionState::new(),
1909                        client,
1910                    );
1911
1912                    let update = OperationUpdate::start(&child_id, OperationType::Step)
1913                        .with_parent_id(&parent_id);
1914                    let checkpoint_result = state.create_checkpoint(update, true).await;
1915
1916                    if let Err(e) = checkpoint_result {
1917                        return Err(TestCaseError::fail(format!(
1918                            "Expected checkpoint to succeed for non-orphaned child, got error: {:?}",
1919                            e
1920                        )));
1921                    }
1922
1923                    Ok(())
1924                });
1925                result?;
1926            }
1927
1928            #[test]
1929            fn prop_root_operation_never_orphaned(
1930                operation_id in "[a-z]{5,10}",
1931            ) {
1932                let rt = tokio::runtime::Runtime::new().unwrap();
1933                let result: Result<(), TestCaseError> = rt.block_on(async {
1934                    let client = Arc::new(
1935                        MockDurableServiceClient::new()
1936                            .with_checkpoint_response(Ok(CheckpointResponse::new("new-token")))
1937                    );
1938
1939                    let state = ExecutionState::new(
1940                        "arn:test",
1941                        "token-123",
1942                        InitialExecutionState::new(),
1943                        client,
1944                    );
1945
1946                    let update = OperationUpdate::start(&operation_id, OperationType::Step);
1947                    let checkpoint_result = state.create_checkpoint(update, true).await;
1948
1949                    if let Err(e) = checkpoint_result {
1950                        return Err(TestCaseError::fail(format!(
1951                            "Expected checkpoint to succeed for root operation, got error: {:?}",
1952                            e
1953                        )));
1954                    }
1955
1956                    Ok(())
1957                });
1958                result?;
1959            }
1960
1961            #[test]
1962            fn prop_marking_parent_done_affects_future_checkpoints(
1963                parent_id in "[a-z]{5,10}",
1964                child_id_before in "[a-z]{5,10}",
1965                child_id_after in "[a-z]{5,10}",
1966            ) {
1967                let rt = tokio::runtime::Runtime::new().unwrap();
1968                let result: Result<(), TestCaseError> = rt.block_on(async {
1969                    let client = Arc::new(
1970                        MockDurableServiceClient::new()
1971                            .with_checkpoint_response(Ok(CheckpointResponse::new("token-1")))
1972                            .with_checkpoint_response(Ok(CheckpointResponse::new("token-2")))
1973                    );
1974
1975                    let state = ExecutionState::new(
1976                        "arn:test",
1977                        "token-123",
1978                        InitialExecutionState::new(),
1979                        client,
1980                    );
1981
1982                    let update_before = OperationUpdate::start(&child_id_before, OperationType::Step)
1983                        .with_parent_id(&parent_id);
1984                    let result_before = state.create_checkpoint(update_before, true).await;
1985
1986                    if let Err(e) = result_before {
1987                        return Err(TestCaseError::fail(format!(
1988                            "Expected first checkpoint to succeed, got error: {:?}",
1989                            e
1990                        )));
1991                    }
1992
1993                    state.mark_parent_done(&parent_id).await;
1994
1995                    let update_after = OperationUpdate::start(&child_id_after, OperationType::Step)
1996                        .with_parent_id(&parent_id);
1997                    let result_after = state.create_checkpoint(update_after, true).await;
1998
1999                    match result_after {
2000                        Err(DurableError::OrphanedChild { .. }) => {
2001                            // Expected
2002                        }
2003                        Ok(_) => {
2004                            return Err(TestCaseError::fail(
2005                                "Expected OrphanedChild error after marking parent done"
2006                            ));
2007                        }
2008                        Err(other) => {
2009                            return Err(TestCaseError::fail(format!(
2010                                "Expected OrphanedChild error, got {:?}",
2011                                other
2012                            )));
2013                        }
2014                    }
2015
2016                    Ok(())
2017                });
2018                result?;
2019            }
2020        }
2021    }
2022}