Skip to main content

durable_execution_sdk/state/
execution_state.rs

1//! Main execution state management.
2//!
3//! This module provides the [`ExecutionState`] struct which manages the execution
4//! state for a durable execution, including checkpoint tracking, replay logic,
5//! and operation state management.
6
7use std::collections::{HashMap, HashSet};
8use std::sync::atomic::{AtomicU8, Ordering};
9use std::sync::Arc;
10use std::time::Duration as StdDuration;
11
12use tokio::sync::{Mutex, RwLock};
13
14use crate::client::SharedDurableServiceClient;
15use crate::error::{DurableError, ErrorObject};
16use crate::operation::{Operation, OperationStatus, OperationType, OperationUpdate};
17use crate::types::ExecutionArn;
18
19use super::batcher::{
20    create_checkpoint_queue, CheckpointBatcher, CheckpointBatcherConfig, CheckpointSender,
21};
22use super::checkpoint_result::CheckpointedResult;
23use super::replay_status::ReplayStatus;
24
25/// Manages the execution state for a durable execution.
26///
27/// This struct tracks all checkpointed operations, handles replay logic,
28/// and manages communication with the Lambda durable execution service.
29pub struct ExecutionState {
30    /// The ARN of the durable execution
31    durable_execution_arn: String,
32
33    /// The current checkpoint token (updated after each checkpoint batch)
34    checkpoint_token: Arc<RwLock<String>>,
35
36    /// Map of operation_id to Operation for quick lookup during replay
37    operations: RwLock<HashMap<String, Operation>>,
38
39    /// The service client for communicating with Lambda
40    service_client: SharedDurableServiceClient,
41
42    /// Current replay status (Replay or New)
43    replay_status: AtomicU8,
44
45    /// Set of operation IDs that have been replayed (for tracking replay progress)
46    replayed_operations: RwLock<HashSet<String>>,
47
48    /// Marker for pagination when loading additional operations
49    next_marker: RwLock<Option<String>>,
50
51    /// Set of parent operation IDs that have completed (for orphan prevention)
52    parent_done_lock: Mutex<HashSet<String>>,
53
54    /// Optional checkpoint sender for batched checkpointing
55    checkpoint_sender: Option<CheckpointSender>,
56
57    /// The EXECUTION operation (first operation in state) - provides access to original input
58    /// Requirements: 19.1, 19.2
59    execution_operation: Option<Operation>,
60
61    /// The checkpointing mode that controls the trade-off between durability and performance.
62    checkpointing_mode: crate::config::CheckpointingMode,
63}
64
65impl ExecutionState {
66    /// Creates a new ExecutionState from the Lambda invocation input.
67    ///
68    /// # Arguments
69    ///
70    /// * `durable_execution_arn` - The ARN of the durable execution
71    /// * `checkpoint_token` - The initial checkpoint token
72    /// * `initial_state` - The initial execution state with operations
73    /// * `service_client` - The service client for Lambda communication
74    pub fn new(
75        durable_execution_arn: impl Into<String>,
76        checkpoint_token: impl Into<String>,
77        initial_state: crate::lambda::InitialExecutionState,
78        service_client: SharedDurableServiceClient,
79    ) -> Self {
80        Self::with_checkpointing_mode(
81            durable_execution_arn,
82            checkpoint_token,
83            initial_state,
84            service_client,
85            crate::config::CheckpointingMode::default(),
86        )
87    }
88
89    /// Creates a new ExecutionState with a specific checkpointing mode.
90    pub fn with_checkpointing_mode(
91        durable_execution_arn: impl Into<String>,
92        checkpoint_token: impl Into<String>,
93        initial_state: crate::lambda::InitialExecutionState,
94        service_client: SharedDurableServiceClient,
95        checkpointing_mode: crate::config::CheckpointingMode,
96    ) -> Self {
97        // Find and extract the EXECUTION operation (first operation of type EXECUTION)
98        let execution_operation = initial_state
99            .operations
100            .iter()
101            .find(|op| op.operation_type == OperationType::Execution)
102            .cloned();
103
104        // Build the operations map from the initial state
105        let operations: HashMap<String, Operation> = initial_state
106            .operations
107            .into_iter()
108            .map(|op| (op.operation_id.clone(), op))
109            .collect();
110
111        // Determine initial replay status based on whether we have operations
112        let replay_status = if operations.is_empty() {
113            ReplayStatus::New
114        } else {
115            ReplayStatus::Replay
116        };
117
118        Self {
119            durable_execution_arn: durable_execution_arn.into(),
120            checkpoint_token: Arc::new(RwLock::new(checkpoint_token.into())),
121            operations: RwLock::new(operations),
122            service_client,
123            replay_status: AtomicU8::new(replay_status as u8),
124            replayed_operations: RwLock::new(HashSet::new()),
125            next_marker: RwLock::new(initial_state.next_marker),
126            parent_done_lock: Mutex::new(HashSet::new()),
127            checkpoint_sender: None,
128            execution_operation,
129            checkpointing_mode,
130        }
131    }
132
133    /// Creates a new ExecutionState with a checkpoint batcher.
134    ///
135    /// This method sets up the checkpoint queue and batcher for efficient
136    /// batched checkpointing. Returns the ExecutionState and a handle to
137    /// the batcher that should be run in a background task.
138    pub fn with_batcher(
139        durable_execution_arn: impl Into<String>,
140        checkpoint_token: impl Into<String>,
141        initial_state: crate::lambda::InitialExecutionState,
142        service_client: SharedDurableServiceClient,
143        batcher_config: CheckpointBatcherConfig,
144        queue_buffer_size: usize,
145    ) -> (Self, CheckpointBatcher) {
146        Self::with_batcher_and_mode(
147            durable_execution_arn,
148            checkpoint_token,
149            initial_state,
150            service_client,
151            batcher_config,
152            queue_buffer_size,
153            crate::config::CheckpointingMode::default(),
154        )
155    }
156
157    /// Creates a new ExecutionState with a checkpoint batcher and specific checkpointing mode.
158    pub fn with_batcher_and_mode(
159        durable_execution_arn: impl Into<String>,
160        checkpoint_token: impl Into<String>,
161        initial_state: crate::lambda::InitialExecutionState,
162        service_client: SharedDurableServiceClient,
163        batcher_config: CheckpointBatcherConfig,
164        queue_buffer_size: usize,
165        checkpointing_mode: crate::config::CheckpointingMode,
166    ) -> (Self, CheckpointBatcher) {
167        let arn: String = durable_execution_arn.into();
168        let token: String = checkpoint_token.into();
169
170        // Find and extract the EXECUTION operation
171        let execution_operation = initial_state
172            .operations
173            .iter()
174            .find(|op| op.operation_type == OperationType::Execution)
175            .cloned();
176
177        // Build the operations map from the initial state
178        let operations: HashMap<String, Operation> = initial_state
179            .operations
180            .into_iter()
181            .map(|op| (op.operation_id.clone(), op))
182            .collect();
183
184        // Determine initial replay status based on whether we have operations
185        let replay_status = if operations.is_empty() {
186            ReplayStatus::New
187        } else {
188            ReplayStatus::Replay
189        };
190
191        // Create the checkpoint queue
192        let (sender, rx) = create_checkpoint_queue(queue_buffer_size);
193        let checkpoint_token = Arc::new(RwLock::new(token));
194
195        // Create the batcher
196        let batcher = CheckpointBatcher::new(
197            batcher_config,
198            rx,
199            service_client.clone(),
200            arn.clone(),
201            checkpoint_token.clone(),
202        );
203
204        let state = Self {
205            durable_execution_arn: arn,
206            checkpoint_token,
207            operations: RwLock::new(operations),
208            service_client,
209            replay_status: AtomicU8::new(replay_status as u8),
210            replayed_operations: RwLock::new(HashSet::new()),
211            next_marker: RwLock::new(initial_state.next_marker),
212            parent_done_lock: Mutex::new(HashSet::new()),
213            checkpoint_sender: Some(sender),
214            execution_operation,
215            checkpointing_mode,
216        };
217
218        (state, batcher)
219    }
220
221    /// Returns the durable execution ARN.
222    pub fn durable_execution_arn(&self) -> &str {
223        &self.durable_execution_arn
224    }
225
226    /// Returns the durable execution ARN as an `ExecutionArn` newtype.
227    #[inline]
228    pub fn durable_execution_arn_typed(&self) -> ExecutionArn {
229        ExecutionArn::from(self.durable_execution_arn.clone())
230    }
231
232    /// Returns the current checkpoint token.
233    pub async fn checkpoint_token(&self) -> String {
234        self.checkpoint_token.read().await.clone()
235    }
236
237    /// Updates the checkpoint token after a successful checkpoint.
238    pub async fn set_checkpoint_token(&self, token: impl Into<String>) {
239        let mut guard = self.checkpoint_token.write().await;
240        *guard = token.into();
241    }
242
243    /// Returns the current replay status.
244    ///
245    /// # Memory Ordering
246    ///
247    /// Uses `Ordering::Acquire` to ensure that when we read the replay status,
248    /// we also see all the operations that were replayed before the status was
249    /// set to `New`. This creates a happens-before relationship with the
250    /// `Release` store in `track_replay`.
251    ///
252    /// Requirements: 4.2, 4.3, 4.6
253    pub fn replay_status(&self) -> ReplayStatus {
254        // Acquire ordering ensures we see all writes that happened before
255        // the corresponding Release store that set this value.
256        ReplayStatus::from(self.replay_status.load(Ordering::Acquire))
257    }
258
259    /// Returns true if currently in replay mode.
260    pub fn is_replay(&self) -> bool {
261        self.replay_status().is_replay()
262    }
263
264    /// Returns true if executing new operations.
265    pub fn is_new(&self) -> bool {
266        self.replay_status().is_new()
267    }
268
269    /// Returns the current checkpointing mode.
270    pub fn checkpointing_mode(&self) -> crate::config::CheckpointingMode {
271        self.checkpointing_mode
272    }
273
274    /// Returns true if eager checkpointing mode is enabled.
275    pub fn is_eager_checkpointing(&self) -> bool {
276        self.checkpointing_mode.is_eager()
277    }
278
279    /// Returns true if batched checkpointing mode is enabled.
280    pub fn is_batched_checkpointing(&self) -> bool {
281        self.checkpointing_mode.is_batched()
282    }
283
284    /// Returns true if optimistic checkpointing mode is enabled.
285    pub fn is_optimistic_checkpointing(&self) -> bool {
286        self.checkpointing_mode.is_optimistic()
287    }
288
289    /// Returns a reference to the EXECUTION operation if it exists.
290    pub fn execution_operation(&self) -> Option<&Operation> {
291        self.execution_operation.as_ref()
292    }
293
294    /// Returns the original user input from the EXECUTION operation.
295    pub fn get_original_input_raw(&self) -> Option<&str> {
296        self.execution_operation
297            .as_ref()
298            .and_then(|op| op.execution_details.as_ref())
299            .and_then(|details| details.input_payload.as_deref())
300    }
301
302    /// Returns the EXECUTION operation's ID if it exists.
303    pub fn execution_operation_id(&self) -> Option<&str> {
304        self.execution_operation
305            .as_ref()
306            .map(|op| op.operation_id.as_str())
307    }
308
309    /// Completes the execution with a successful result via checkpointing.
310    pub async fn complete_execution_success(
311        &self,
312        result: Option<String>,
313    ) -> Result<(), DurableError> {
314        let execution_id =
315            self.execution_operation_id()
316                .ok_or_else(|| DurableError::Validation {
317                    message: "Cannot complete execution: no EXECUTION operation exists".to_string(),
318                })?;
319
320        let update = OperationUpdate::succeed(execution_id, OperationType::Execution, result);
321
322        self.create_checkpoint(update, true).await
323    }
324
325    /// Completes the execution with a failure via checkpointing.
326    pub async fn complete_execution_failure(&self, error: ErrorObject) -> Result<(), DurableError> {
327        let execution_id =
328            self.execution_operation_id()
329                .ok_or_else(|| DurableError::Validation {
330                    message: "Cannot complete execution: no EXECUTION operation exists".to_string(),
331                })?;
332
333        let update = OperationUpdate::fail(execution_id, OperationType::Execution, error);
334
335        self.create_checkpoint(update, true).await
336    }
337
338    /// Gets the checkpoint result for an operation.
339    pub async fn get_checkpoint_result(&self, operation_id: &str) -> CheckpointedResult {
340        let operations = self.operations.read().await;
341        CheckpointedResult::new(operations.get(operation_id).cloned())
342    }
343
344    /// Tracks that an operation has been replayed.
345    pub async fn track_replay(&self, operation_id: &str) {
346        {
347            let mut replayed = self.replayed_operations.write().await;
348            replayed.insert(operation_id.to_string());
349        }
350
351        let (replayed_count, total_count) = {
352            let replayed = self.replayed_operations.read().await;
353            let operations = self.operations.read().await;
354            (replayed.len(), operations.len())
355        };
356
357        if replayed_count >= total_count {
358            let has_more = self.next_marker.read().await.is_some();
359            if !has_more {
360                // Release ordering ensures that all the replay tracking writes
361                // (replayed_operations updates) are visible to any thread that
362                // subsequently reads the replay_status with Acquire ordering.
363                // This establishes a happens-before relationship.
364                //
365                // Requirements: 4.2, 4.3, 4.6
366                self.replay_status
367                    .store(ReplayStatus::New as u8, Ordering::Release);
368            }
369        }
370    }
371
372    /// Loads additional operations from the service for pagination.
373    pub async fn load_more_operations(&self) -> Result<bool, DurableError> {
374        let marker = {
375            let guard = self.next_marker.read().await;
376            match guard.as_ref() {
377                Some(m) => m.clone(),
378                None => return Ok(false),
379            }
380        };
381
382        let response = self.get_operations_with_retry(&marker).await?;
383
384        {
385            let mut operations = self.operations.write().await;
386            for op in response.operations {
387                operations.insert(op.operation_id.clone(), op);
388            }
389        }
390
391        {
392            let mut next_marker = self.next_marker.write().await;
393            *next_marker = response.next_marker;
394        }
395
396        Ok(true)
397    }
398
399    /// Fetches operations from the service with retry for throttling errors.
400    async fn get_operations_with_retry(
401        &self,
402        marker: &str,
403    ) -> Result<crate::client::GetOperationsResponse, DurableError> {
404        const MAX_RETRIES: u32 = 5;
405        const INITIAL_DELAY_MS: u64 = 100;
406        const MAX_DELAY_MS: u64 = 10_000;
407        const BACKOFF_MULTIPLIER: u64 = 2;
408
409        let mut attempt = 0;
410        let mut delay_ms = INITIAL_DELAY_MS;
411
412        loop {
413            let result = self
414                .service_client
415                .get_operations(&self.durable_execution_arn, marker)
416                .await;
417
418            match result {
419                Ok(response) => return Ok(response),
420                Err(error) if error.is_throttling() => {
421                    attempt += 1;
422                    if attempt > MAX_RETRIES {
423                        tracing::warn!(
424                            attempt = attempt,
425                            "GetOperations throttling: max retries exceeded"
426                        );
427                        return Err(error);
428                    }
429
430                    let actual_delay = error.get_retry_after_ms().unwrap_or(delay_ms);
431                    tracing::debug!(
432                        attempt = attempt,
433                        delay_ms = actual_delay,
434                        "GetOperations throttled, retrying"
435                    );
436                    tokio::time::sleep(StdDuration::from_millis(actual_delay)).await;
437                    delay_ms = (delay_ms * BACKOFF_MULTIPLIER).min(MAX_DELAY_MS);
438                }
439                Err(error) => return Err(error),
440            }
441        }
442    }
443
444    /// Loads all remaining operations from the service.
445    pub async fn load_all_operations(&self) -> Result<(), DurableError> {
446        while self.load_more_operations().await? {}
447        Ok(())
448    }
449
450    /// Returns true if there are more operations to load.
451    pub async fn has_more_operations(&self) -> bool {
452        self.next_marker.read().await.is_some()
453    }
454
455    /// Returns the number of loaded operations.
456    pub async fn operation_count(&self) -> usize {
457        self.operations.read().await.len()
458    }
459
460    /// Returns a reference to the service client.
461    pub fn service_client(&self) -> &SharedDurableServiceClient {
462        &self.service_client
463    }
464
465    /// Adds an operation to the local cache.
466    pub async fn add_operation(&self, operation: Operation) {
467        let mut operations = self.operations.write().await;
468        operations.insert(operation.operation_id.clone(), operation);
469    }
470
471    /// Updates an existing operation in the local cache.
472    pub async fn update_operation(
473        &self,
474        operation_id: &str,
475        update_fn: impl FnOnce(&mut Operation),
476    ) {
477        let mut operations = self.operations.write().await;
478        if let Some(op) = operations.get_mut(operation_id) {
479            update_fn(op);
480        }
481    }
482
483    /// Checks if an operation exists in the local cache.
484    pub async fn has_operation(&self, operation_id: &str) -> bool {
485        self.operations.read().await.contains_key(operation_id)
486    }
487
488    /// Gets a clone of an operation from the local cache.
489    pub async fn get_operation(&self, operation_id: &str) -> Option<Operation> {
490        self.operations.read().await.get(operation_id).cloned()
491    }
492
493    /// Marks a parent operation as done.
494    pub async fn mark_parent_done(&self, parent_id: &str) {
495        let mut done_parents = self.parent_done_lock.lock().await;
496        done_parents.insert(parent_id.to_string());
497    }
498
499    /// Checks if a parent operation has been marked as done.
500    pub async fn is_parent_done(&self, parent_id: &str) -> bool {
501        let done_parents = self.parent_done_lock.lock().await;
502        done_parents.contains(parent_id)
503    }
504
505    /// Checks if an operation would be orphaned.
506    pub async fn is_orphaned(&self, parent_id: Option<&str>) -> bool {
507        match parent_id {
508            Some(pid) => self.is_parent_done(pid).await,
509            None => false,
510        }
511    }
512
513    /// Creates a checkpoint for an operation.
514    pub async fn create_checkpoint(
515        &self,
516        operation: OperationUpdate,
517        is_sync: bool,
518    ) -> Result<(), DurableError> {
519        // Check for orphaned child
520        if let Some(ref parent_id) = operation.parent_id {
521            if self.is_parent_done(parent_id).await {
522                return Err(DurableError::OrphanedChild {
523                    message: format!(
524                        "Cannot checkpoint operation {} - parent {} has completed",
525                        operation.operation_id, parent_id
526                    ),
527                    operation_id: operation.operation_id.clone(),
528                });
529            }
530        }
531
532        // Determine effective sync behavior based on checkpointing mode
533        let effective_is_sync = match self.checkpointing_mode {
534            crate::config::CheckpointingMode::Eager => true,
535            crate::config::CheckpointingMode::Batched => is_sync,
536            crate::config::CheckpointingMode::Optimistic => is_sync,
537        };
538
539        // In Eager mode, bypass the batcher and send directly
540        if self.checkpointing_mode.is_eager() {
541            return self.checkpoint_direct(operation, effective_is_sync).await;
542        }
543
544        // Use the checkpoint sender if available (batched mode)
545        if let Some(ref sender) = self.checkpoint_sender {
546            let result = sender
547                .checkpoint(operation.clone(), effective_is_sync)
548                .await;
549            if result.is_ok() {
550                self.update_local_cache_from_update(&operation).await;
551            }
552            return result;
553        }
554
555        // Direct checkpoint (non-batched mode)
556        self.checkpoint_direct(operation, effective_is_sync).await
557    }
558
559    /// Sends a checkpoint directly to the service, bypassing the batcher.
560    async fn checkpoint_direct(
561        &self,
562        operation: OperationUpdate,
563        _is_sync: bool,
564    ) -> Result<(), DurableError> {
565        let token = self.checkpoint_token.read().await.clone();
566        let response = self
567            .service_client
568            .checkpoint(&self.durable_execution_arn, &token, vec![operation.clone()])
569            .await?;
570
571        {
572            let mut token_guard = self.checkpoint_token.write().await;
573            *token_guard = response.checkpoint_token;
574        }
575
576        // Update local cache from the response's NewExecutionState if available
577        if let Some(ref new_state) = response.new_execution_state {
578            self.update_local_cache_from_response(new_state).await;
579        } else {
580            self.update_local_cache_from_update(&operation).await;
581        }
582        Ok(())
583    }
584
585    /// Creates a checkpoint and returns the full response including NewExecutionState.
586    /// This is useful for operations like CALLBACK that need service-generated values.
587    pub async fn create_checkpoint_with_response(
588        &self,
589        operation: OperationUpdate,
590    ) -> Result<crate::client::CheckpointResponse, DurableError> {
591        // Check for orphaned child
592        if let Some(ref parent_id) = operation.parent_id {
593            if self.is_parent_done(parent_id).await {
594                return Err(DurableError::OrphanedChild {
595                    message: format!(
596                        "Cannot checkpoint operation {} - parent {} has completed",
597                        operation.operation_id, parent_id
598                    ),
599                    operation_id: operation.operation_id.clone(),
600                });
601            }
602        }
603
604        let token = self.checkpoint_token.read().await.clone();
605        let response = self
606            .service_client
607            .checkpoint(&self.durable_execution_arn, &token, vec![operation.clone()])
608            .await?;
609
610        tracing::debug!(
611            has_new_state = response.new_execution_state.is_some(),
612            num_operations = response
613                .new_execution_state
614                .as_ref()
615                .map(|s| s.operations.len())
616                .unwrap_or(0),
617            "Checkpoint response received"
618        );
619
620        {
621            let mut token_guard = self.checkpoint_token.write().await;
622            *token_guard = response.checkpoint_token.clone();
623        }
624
625        // Update local cache from the response's NewExecutionState if available
626        if let Some(ref new_state) = response.new_execution_state {
627            self.update_local_cache_from_response(new_state).await;
628        } else {
629            self.update_local_cache_from_update(&operation).await;
630        }
631
632        Ok(response)
633    }
634
635    /// Updates the local operation cache from a checkpoint response's NewExecutionState.
636    async fn update_local_cache_from_response(&self, new_state: &crate::client::NewExecutionState) {
637        let mut operations = self.operations.write().await;
638        for op in &new_state.operations {
639            operations.insert(op.operation_id.clone(), op.clone());
640        }
641    }
642
643    /// Creates a synchronous checkpoint (waits for confirmation).
644    pub async fn checkpoint_sync(&self, operation: OperationUpdate) -> Result<(), DurableError> {
645        self.create_checkpoint(operation, true).await
646    }
647
648    /// Creates an asynchronous checkpoint (fire-and-forget).
649    pub async fn checkpoint_async(&self, operation: OperationUpdate) -> Result<(), DurableError> {
650        self.create_checkpoint(operation, false).await
651    }
652
653    /// Creates a checkpoint using the optimal sync behavior for the current mode.
654    pub async fn checkpoint_optimal(
655        &self,
656        operation: OperationUpdate,
657        prefer_sync: bool,
658    ) -> Result<(), DurableError> {
659        let is_sync = match self.checkpointing_mode {
660            crate::config::CheckpointingMode::Eager => true,
661            crate::config::CheckpointingMode::Batched => prefer_sync,
662            crate::config::CheckpointingMode::Optimistic => prefer_sync,
663        };
664        self.create_checkpoint(operation, is_sync).await
665    }
666
667    /// Returns whether async checkpointing is recommended for the current mode.
668    pub fn should_use_async_checkpoint(&self) -> bool {
669        match self.checkpointing_mode {
670            crate::config::CheckpointingMode::Eager => false,
671            crate::config::CheckpointingMode::Batched => true,
672            crate::config::CheckpointingMode::Optimistic => true,
673        }
674    }
675
676    /// Returns whether the current mode prioritizes performance over durability.
677    pub fn prioritizes_performance(&self) -> bool {
678        self.checkpointing_mode.is_optimistic()
679    }
680
681    /// Returns whether the current mode prioritizes durability over performance.
682    pub fn prioritizes_durability(&self) -> bool {
683        self.checkpointing_mode.is_eager()
684    }
685
686    /// Updates the local operation cache based on an operation update.
687    async fn update_local_cache_from_update(&self, update: &OperationUpdate) {
688        let mut operations = self.operations.write().await;
689
690        match update.action {
691            crate::operation::OperationAction::Start => {
692                let mut op = Operation::new(&update.operation_id, update.operation_type);
693                op.parent_id = update.parent_id.clone();
694                op.name = update.name.clone();
695                operations.insert(update.operation_id.clone(), op);
696            }
697            crate::operation::OperationAction::Succeed => {
698                if let Some(op) = operations.get_mut(&update.operation_id) {
699                    op.status = OperationStatus::Succeeded;
700                    op.result = update.result.clone();
701                } else {
702                    let mut op = Operation::new(&update.operation_id, update.operation_type);
703                    op.status = OperationStatus::Succeeded;
704                    op.result = update.result.clone();
705                    op.parent_id = update.parent_id.clone();
706                    op.name = update.name.clone();
707                    operations.insert(update.operation_id.clone(), op);
708                }
709            }
710            crate::operation::OperationAction::Fail => {
711                if let Some(op) = operations.get_mut(&update.operation_id) {
712                    op.status = OperationStatus::Failed;
713                    op.error = update.error.clone();
714                } else {
715                    let mut op = Operation::new(&update.operation_id, update.operation_type);
716                    op.status = OperationStatus::Failed;
717                    op.error = update.error.clone();
718                    op.parent_id = update.parent_id.clone();
719                    op.name = update.name.clone();
720                    operations.insert(update.operation_id.clone(), op);
721                }
722            }
723            crate::operation::OperationAction::Cancel => {
724                if let Some(op) = operations.get_mut(&update.operation_id) {
725                    op.status = OperationStatus::Cancelled;
726                } else {
727                    let mut op = Operation::new(&update.operation_id, update.operation_type);
728                    op.status = OperationStatus::Cancelled;
729                    op.parent_id = update.parent_id.clone();
730                    op.name = update.name.clone();
731                    operations.insert(update.operation_id.clone(), op);
732                }
733            }
734            crate::operation::OperationAction::Retry => {
735                if let Some(op) = operations.get_mut(&update.operation_id) {
736                    op.status = OperationStatus::Pending;
737                    if update.result.is_some() || update.step_options.is_some() {
738                        let step_details =
739                            op.step_details
740                                .get_or_insert(crate::operation::StepDetails {
741                                    result: None,
742                                    attempt: None,
743                                    next_attempt_timestamp: None,
744                                    error: None,
745                                    payload: None,
746                                });
747                        if update.result.is_some() {
748                            step_details.payload = update.result.clone();
749                        }
750                        step_details.attempt = Some(step_details.attempt.unwrap_or(0) + 1);
751                    }
752                    if update.error.is_some() {
753                        op.error = update.error.clone();
754                    }
755                } else {
756                    let mut op = Operation::new(&update.operation_id, update.operation_type);
757                    op.status = OperationStatus::Pending;
758                    op.parent_id = update.parent_id.clone();
759                    op.name = update.name.clone();
760                    op.error = update.error.clone();
761                    if update.result.is_some() || update.step_options.is_some() {
762                        op.step_details = Some(crate::operation::StepDetails {
763                            result: None,
764                            attempt: Some(1),
765                            next_attempt_timestamp: None,
766                            error: None,
767                            payload: update.result.clone(),
768                        });
769                    }
770                    operations.insert(update.operation_id.clone(), op);
771                }
772            }
773        }
774    }
775
776    /// Returns a reference to the shared checkpoint token.
777    pub fn shared_checkpoint_token(&self) -> Arc<RwLock<String>> {
778        self.checkpoint_token.clone()
779    }
780
781    /// Returns true if this ExecutionState has a checkpoint sender configured.
782    pub fn has_checkpoint_sender(&self) -> bool {
783        self.checkpoint_sender.is_some()
784    }
785
786    /// Loads child operations for a specific parent operation.
787    pub async fn load_child_operations(
788        &self,
789        parent_id: &str,
790    ) -> Result<Vec<Operation>, DurableError> {
791        let operations = self.operations.read().await;
792        let children: Vec<Operation> = operations
793            .values()
794            .filter(|op| op.parent_id.as_deref() == Some(parent_id))
795            .cloned()
796            .collect();
797
798        Ok(children)
799    }
800
801    /// Gets all child operations for a specific parent from the local cache.
802    pub async fn get_child_operations(&self, parent_id: &str) -> Vec<Operation> {
803        let operations = self.operations.read().await;
804        operations
805            .values()
806            .filter(|op| op.parent_id.as_deref() == Some(parent_id))
807            .cloned()
808            .collect()
809    }
810
811    /// Checks if a CONTEXT operation has ReplayChildren enabled.
812    pub async fn has_replay_children(&self, operation_id: &str) -> bool {
813        let operations = self.operations.read().await;
814        operations
815            .get(operation_id)
816            .filter(|op| op.operation_type == OperationType::Context)
817            .and_then(|op| op.context_details.as_ref())
818            .and_then(|details| details.replay_children)
819            .unwrap_or(false)
820    }
821}
822
823// Implement Debug manually since we can't derive it with async locks
824impl std::fmt::Debug for ExecutionState {
825    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
826        f.debug_struct("ExecutionState")
827            .field("durable_execution_arn", &self.durable_execution_arn)
828            .field("replay_status", &self.replay_status())
829            .finish_non_exhaustive()
830    }
831}
832
833#[cfg(test)]
834mod tests {
835    use super::*;
836    use std::sync::Arc;
837
838    use crate::client::{
839        CheckpointResponse, GetOperationsResponse, MockDurableServiceClient,
840        SharedDurableServiceClient,
841    };
842    use crate::error::ErrorObject;
843    use crate::lambda::InitialExecutionState;
844    use crate::operation::{
845        ContextDetails, ExecutionDetails, Operation, OperationStatus, OperationType,
846        OperationUpdate,
847    };
848
849    fn create_mock_client() -> SharedDurableServiceClient {
850        Arc::new(MockDurableServiceClient::new())
851    }
852
853    fn create_test_operation(id: &str, status: OperationStatus) -> Operation {
854        let mut op = Operation::new(id, OperationType::Step);
855        op.status = status;
856        op
857    }
858
859    fn create_execution_operation(input_payload: Option<&str>) -> Operation {
860        let mut op = Operation::new("exec-123", OperationType::Execution);
861        op.status = OperationStatus::Started;
862        op.execution_details = Some(ExecutionDetails {
863            input_payload: input_payload.map(|s| s.to_string()),
864        });
865        op
866    }
867
868    // Basic ExecutionState tests
869
870    #[tokio::test]
871    async fn test_execution_state_new_empty() {
872        let client = create_mock_client();
873        let state = ExecutionState::new(
874            "arn:test",
875            "token-123",
876            InitialExecutionState::new(),
877            client,
878        );
879
880        assert_eq!(state.durable_execution_arn(), "arn:test");
881        assert_eq!(state.checkpoint_token().await, "token-123");
882        assert!(state.is_new());
883        assert!(!state.is_replay());
884        assert_eq!(state.operation_count().await, 0);
885        assert!(!state.has_more_operations().await);
886    }
887
888    #[tokio::test]
889    async fn test_execution_state_new_with_operations() {
890        let client = create_mock_client();
891        let ops = vec![
892            create_test_operation("op-1", OperationStatus::Succeeded),
893            create_test_operation("op-2", OperationStatus::Succeeded),
894        ];
895        let initial_state = InitialExecutionState::with_operations(ops);
896
897        let state = ExecutionState::new("arn:test", "token-123", initial_state, client);
898
899        assert!(state.is_replay());
900        assert!(!state.is_new());
901        assert_eq!(state.operation_count().await, 2);
902    }
903
904    #[tokio::test]
905    async fn test_get_checkpoint_result_exists() {
906        let client = create_mock_client();
907        let mut op = create_test_operation("op-1", OperationStatus::Succeeded);
908        op.result = Some(r#"{"value": 42}"#.to_string());
909        let initial_state = InitialExecutionState::with_operations(vec![op]);
910
911        let state = ExecutionState::new("arn:test", "token-123", initial_state, client);
912
913        let result = state.get_checkpoint_result("op-1").await;
914        assert!(result.is_existent());
915        assert!(result.is_succeeded());
916        assert_eq!(result.result(), Some(r#"{"value": 42}"#));
917    }
918
919    #[tokio::test]
920    async fn test_get_checkpoint_result_not_exists() {
921        let client = create_mock_client();
922        let state = ExecutionState::new(
923            "arn:test",
924            "token-123",
925            InitialExecutionState::new(),
926            client,
927        );
928
929        let result = state.get_checkpoint_result("non-existent").await;
930        assert!(!result.is_existent());
931    }
932
933    #[tokio::test]
934    async fn test_track_replay_transitions_to_new() {
935        let client = create_mock_client();
936        let ops = vec![
937            create_test_operation("op-1", OperationStatus::Succeeded),
938            create_test_operation("op-2", OperationStatus::Succeeded),
939        ];
940        let initial_state = InitialExecutionState::with_operations(ops);
941
942        let state = ExecutionState::new("arn:test", "token-123", initial_state, client);
943
944        assert!(state.is_replay());
945        state.track_replay("op-1").await;
946        assert!(state.is_replay());
947        state.track_replay("op-2").await;
948        assert!(state.is_new());
949    }
950
951    #[tokio::test]
952    async fn test_track_replay_with_pagination() {
953        let client = Arc::new(
954            MockDurableServiceClient::new().with_get_operations_response(Ok(
955                GetOperationsResponse {
956                    operations: vec![create_test_operation("op-3", OperationStatus::Succeeded)],
957                    next_marker: None,
958                },
959            )),
960        );
961
962        let ops = vec![create_test_operation("op-1", OperationStatus::Succeeded)];
963        let mut initial_state = InitialExecutionState::with_operations(ops);
964        initial_state.next_marker = Some("marker-1".to_string());
965
966        let state = ExecutionState::new("arn:test", "token-123", initial_state, client);
967
968        state.track_replay("op-1").await;
969        assert!(state.is_replay());
970    }
971
972    #[tokio::test]
973    async fn test_set_checkpoint_token() {
974        let client = create_mock_client();
975        let state = ExecutionState::new(
976            "arn:test",
977            "token-123",
978            InitialExecutionState::new(),
979            client,
980        );
981
982        assert_eq!(state.checkpoint_token().await, "token-123");
983        state.set_checkpoint_token("token-456").await;
984        assert_eq!(state.checkpoint_token().await, "token-456");
985    }
986
987    #[tokio::test]
988    async fn test_add_operation() {
989        let client = create_mock_client();
990        let state = ExecutionState::new(
991            "arn:test",
992            "token-123",
993            InitialExecutionState::new(),
994            client,
995        );
996
997        assert!(!state.has_operation("op-1").await);
998
999        let op = create_test_operation("op-1", OperationStatus::Succeeded);
1000        state.add_operation(op).await;
1001
1002        assert!(state.has_operation("op-1").await);
1003        assert_eq!(state.operation_count().await, 1);
1004    }
1005
1006    #[tokio::test]
1007    async fn test_update_operation() {
1008        let client = create_mock_client();
1009        let ops = vec![create_test_operation("op-1", OperationStatus::Started)];
1010        let initial_state = InitialExecutionState::with_operations(ops);
1011
1012        let state = ExecutionState::new("arn:test", "token-123", initial_state, client);
1013
1014        let op = state.get_operation("op-1").await.unwrap();
1015        assert_eq!(op.status, OperationStatus::Started);
1016
1017        state
1018            .update_operation("op-1", |op| {
1019                op.status = OperationStatus::Succeeded;
1020                op.result = Some("done".to_string());
1021            })
1022            .await;
1023
1024        let op = state.get_operation("op-1").await.unwrap();
1025        assert_eq!(op.status, OperationStatus::Succeeded);
1026        assert_eq!(op.result, Some("done".to_string()));
1027    }
1028
1029    #[tokio::test]
1030    async fn test_load_more_operations() {
1031        let client = Arc::new(
1032            MockDurableServiceClient::new().with_get_operations_response(Ok(
1033                GetOperationsResponse {
1034                    operations: vec![
1035                        create_test_operation("op-2", OperationStatus::Succeeded),
1036                        create_test_operation("op-3", OperationStatus::Succeeded),
1037                    ],
1038                    next_marker: None,
1039                },
1040            )),
1041        );
1042
1043        let ops = vec![create_test_operation("op-1", OperationStatus::Succeeded)];
1044        let mut initial_state = InitialExecutionState::with_operations(ops);
1045        initial_state.next_marker = Some("marker-1".to_string());
1046
1047        let state = ExecutionState::new("arn:test", "token-123", initial_state, client);
1048
1049        assert_eq!(state.operation_count().await, 1);
1050        assert!(state.has_more_operations().await);
1051
1052        let loaded = state.load_more_operations().await.unwrap();
1053        assert!(loaded);
1054
1055        assert_eq!(state.operation_count().await, 3);
1056        assert!(!state.has_more_operations().await);
1057    }
1058
1059    #[tokio::test]
1060    async fn test_load_more_operations_no_more() {
1061        let client = create_mock_client();
1062        let state = ExecutionState::new(
1063            "arn:test",
1064            "token-123",
1065            InitialExecutionState::new(),
1066            client,
1067        );
1068
1069        let loaded = state.load_more_operations().await.unwrap();
1070        assert!(!loaded);
1071    }
1072
1073    #[tokio::test]
1074    async fn test_load_all_operations() {
1075        let client = Arc::new(
1076            MockDurableServiceClient::new()
1077                .with_get_operations_response(Ok(GetOperationsResponse {
1078                    operations: vec![create_test_operation("op-2", OperationStatus::Succeeded)],
1079                    next_marker: Some("marker-2".to_string()),
1080                }))
1081                .with_get_operations_response(Ok(GetOperationsResponse {
1082                    operations: vec![create_test_operation("op-3", OperationStatus::Succeeded)],
1083                    next_marker: None,
1084                })),
1085        );
1086
1087        let ops = vec![create_test_operation("op-1", OperationStatus::Succeeded)];
1088        let mut initial_state = InitialExecutionState::with_operations(ops);
1089        initial_state.next_marker = Some("marker-1".to_string());
1090
1091        let state = ExecutionState::new("arn:test", "token-123", initial_state, client);
1092
1093        assert_eq!(state.operation_count().await, 1);
1094        state.load_all_operations().await.unwrap();
1095
1096        assert_eq!(state.operation_count().await, 3);
1097        assert!(!state.has_more_operations().await);
1098    }
1099
1100    #[tokio::test]
1101    async fn test_mark_parent_done() {
1102        let client = create_mock_client();
1103        let state = ExecutionState::new(
1104            "arn:test",
1105            "token-123",
1106            InitialExecutionState::new(),
1107            client,
1108        );
1109
1110        assert!(!state.is_parent_done("parent-1").await);
1111        state.mark_parent_done("parent-1").await;
1112        assert!(state.is_parent_done("parent-1").await);
1113        assert!(!state.is_parent_done("parent-2").await);
1114    }
1115
1116    #[tokio::test]
1117    async fn test_is_orphaned() {
1118        let client = create_mock_client();
1119        let state = ExecutionState::new(
1120            "arn:test",
1121            "token-123",
1122            InitialExecutionState::new(),
1123            client,
1124        );
1125
1126        assert!(!state.is_orphaned(None).await);
1127        assert!(!state.is_orphaned(Some("parent-1")).await);
1128
1129        state.mark_parent_done("parent-1").await;
1130
1131        assert!(state.is_orphaned(Some("parent-1")).await);
1132        assert!(!state.is_orphaned(Some("parent-2")).await);
1133    }
1134
1135    #[tokio::test]
1136    async fn test_debug_impl() {
1137        let client = create_mock_client();
1138        let state = ExecutionState::new(
1139            "arn:test",
1140            "token-123",
1141            InitialExecutionState::new(),
1142            client,
1143        );
1144
1145        let debug_str = format!("{:?}", state);
1146        assert!(debug_str.contains("ExecutionState"));
1147        assert!(debug_str.contains("arn:test"));
1148    }
1149
1150    #[tokio::test]
1151    async fn test_create_checkpoint_direct() {
1152        let client = Arc::new(
1153            MockDurableServiceClient::new()
1154                .with_checkpoint_response(Ok(CheckpointResponse::new("new-token"))),
1155        );
1156
1157        let state = ExecutionState::new(
1158            "arn:test",
1159            "token-123",
1160            InitialExecutionState::new(),
1161            client,
1162        );
1163
1164        let update = OperationUpdate::start("op-1", OperationType::Step);
1165        let result = state.create_checkpoint(update, true).await;
1166
1167        assert!(result.is_ok());
1168        assert_eq!(state.checkpoint_token().await, "new-token");
1169        assert!(state.has_operation("op-1").await);
1170    }
1171
1172    #[tokio::test]
1173    async fn test_create_checkpoint_updates_local_cache_on_succeed() {
1174        let client = Arc::new(
1175            MockDurableServiceClient::new()
1176                .with_checkpoint_response(Ok(CheckpointResponse::new("token-1")))
1177                .with_checkpoint_response(Ok(CheckpointResponse::new("token-2"))),
1178        );
1179
1180        let state = ExecutionState::new(
1181            "arn:test",
1182            "token-123",
1183            InitialExecutionState::new(),
1184            client,
1185        );
1186
1187        let update = OperationUpdate::start("op-1", OperationType::Step);
1188        state.create_checkpoint(update, true).await.unwrap();
1189
1190        let op = state.get_operation("op-1").await.unwrap();
1191        assert_eq!(op.status, OperationStatus::Started);
1192
1193        let update = OperationUpdate::succeed(
1194            "op-1",
1195            OperationType::Step,
1196            Some(r#"{"result": "ok"}"#.to_string()),
1197        );
1198        state.create_checkpoint(update, true).await.unwrap();
1199
1200        let op = state.get_operation("op-1").await.unwrap();
1201        assert_eq!(op.status, OperationStatus::Succeeded);
1202        assert_eq!(op.result, Some(r#"{"result": "ok"}"#.to_string()));
1203    }
1204
1205    #[tokio::test]
1206    async fn test_create_checkpoint_rejects_orphaned_child() {
1207        let client = create_mock_client();
1208        let state = ExecutionState::new(
1209            "arn:test",
1210            "token-123",
1211            InitialExecutionState::new(),
1212            client,
1213        );
1214
1215        state.mark_parent_done("parent-1").await;
1216
1217        let update =
1218            OperationUpdate::start("child-1", OperationType::Step).with_parent_id("parent-1");
1219        let result = state.create_checkpoint(update, true).await;
1220
1221        assert!(result.is_err());
1222        match result.unwrap_err() {
1223            crate::error::DurableError::OrphanedChild { operation_id, .. } => {
1224                assert_eq!(operation_id, "child-1");
1225            }
1226            _ => panic!("Expected OrphanedChild error"),
1227        }
1228    }
1229
1230    #[tokio::test]
1231    async fn test_with_batcher_creates_state_and_batcher() {
1232        let client = Arc::new(
1233            MockDurableServiceClient::new()
1234                .with_checkpoint_response(Ok(CheckpointResponse::new("new-token"))),
1235        );
1236
1237        let (state, mut batcher) = ExecutionState::with_batcher(
1238            "arn:test",
1239            "token-123",
1240            InitialExecutionState::new(),
1241            client,
1242            CheckpointBatcherConfig {
1243                max_batch_time_ms: 10,
1244                ..Default::default()
1245            },
1246            100,
1247        );
1248
1249        assert!(state.has_checkpoint_sender());
1250        assert_eq!(state.durable_execution_arn(), "arn:test");
1251
1252        let batcher_handle = tokio::spawn(async move {
1253            batcher.run().await;
1254        });
1255
1256        let update = OperationUpdate::start("op-1", OperationType::Step);
1257        let result = state.create_checkpoint(update, true).await;
1258
1259        drop(state);
1260        batcher_handle.await.unwrap();
1261
1262        assert!(result.is_ok());
1263    }
1264
1265    #[tokio::test]
1266    async fn test_checkpoint_sync_convenience_method() {
1267        let client = Arc::new(
1268            MockDurableServiceClient::new()
1269                .with_checkpoint_response(Ok(CheckpointResponse::new("new-token"))),
1270        );
1271
1272        let state = ExecutionState::new(
1273            "arn:test",
1274            "token-123",
1275            InitialExecutionState::new(),
1276            client,
1277        );
1278
1279        let update = OperationUpdate::start("op-1", OperationType::Step);
1280        let result = state.checkpoint_sync(update).await;
1281
1282        assert!(result.is_ok());
1283    }
1284
1285    #[tokio::test]
1286    async fn test_checkpoint_async_convenience_method() {
1287        let client = Arc::new(
1288            MockDurableServiceClient::new()
1289                .with_checkpoint_response(Ok(CheckpointResponse::new("new-token"))),
1290        );
1291
1292        let state = ExecutionState::new(
1293            "arn:test",
1294            "token-123",
1295            InitialExecutionState::new(),
1296            client,
1297        );
1298
1299        let update = OperationUpdate::start("op-1", OperationType::Step);
1300        let result = state.checkpoint_async(update).await;
1301
1302        assert!(result.is_ok());
1303    }
1304
1305    #[tokio::test]
1306    async fn test_shared_checkpoint_token() {
1307        let client = create_mock_client();
1308        let state = ExecutionState::new(
1309            "arn:test",
1310            "token-123",
1311            InitialExecutionState::new(),
1312            client,
1313        );
1314
1315        let shared_token = state.shared_checkpoint_token();
1316        assert_eq!(*shared_token.read().await, "token-123");
1317
1318        {
1319            let mut guard = shared_token.write().await;
1320            *guard = "modified-token".to_string();
1321        }
1322
1323        assert_eq!(state.checkpoint_token().await, "modified-token");
1324    }
1325
1326    // Tests for ReplayChildren support (Requirements 10.5, 10.6)
1327
1328    #[tokio::test]
1329    async fn test_load_child_operations_returns_children() {
1330        let client = create_mock_client();
1331
1332        let mut parent_op = Operation::new("parent-ctx", OperationType::Context);
1333        parent_op.status = OperationStatus::Succeeded;
1334
1335        let mut child1 = create_test_operation("child-1", OperationStatus::Succeeded);
1336        child1.parent_id = Some("parent-ctx".to_string());
1337
1338        let mut child2 = create_test_operation("child-2", OperationStatus::Succeeded);
1339        child2.parent_id = Some("parent-ctx".to_string());
1340
1341        let mut other_child = create_test_operation("other-child", OperationStatus::Succeeded);
1342        other_child.parent_id = Some("other-parent".to_string());
1343
1344        let initial_state =
1345            InitialExecutionState::with_operations(vec![parent_op, child1, child2, other_child]);
1346
1347        let state = ExecutionState::new("arn:test", "token-123", initial_state, client);
1348
1349        let children = state.load_child_operations("parent-ctx").await.unwrap();
1350
1351        assert_eq!(children.len(), 2);
1352        let child_ids: Vec<&str> = children.iter().map(|c| c.operation_id.as_str()).collect();
1353        assert!(child_ids.contains(&"child-1"));
1354        assert!(child_ids.contains(&"child-2"));
1355    }
1356
1357    #[tokio::test]
1358    async fn test_load_child_operations_no_children() {
1359        let client = create_mock_client();
1360
1361        let parent_op = Operation::new("parent-ctx", OperationType::Context);
1362        let initial_state = InitialExecutionState::with_operations(vec![parent_op]);
1363
1364        let state = ExecutionState::new("arn:test", "token-123", initial_state, client);
1365
1366        let children = state.load_child_operations("parent-ctx").await.unwrap();
1367
1368        assert!(children.is_empty());
1369    }
1370
1371    #[tokio::test]
1372    async fn test_get_child_operations_returns_cached_children() {
1373        let client = create_mock_client();
1374
1375        let mut parent_op = Operation::new("parent-ctx", OperationType::Context);
1376        parent_op.status = OperationStatus::Succeeded;
1377
1378        let mut child1 = create_test_operation("child-1", OperationStatus::Succeeded);
1379        child1.parent_id = Some("parent-ctx".to_string());
1380
1381        let mut child2 = create_test_operation("child-2", OperationStatus::Succeeded);
1382        child2.parent_id = Some("parent-ctx".to_string());
1383
1384        let initial_state = InitialExecutionState::with_operations(vec![parent_op, child1, child2]);
1385
1386        let state = ExecutionState::new("arn:test", "token-123", initial_state, client);
1387
1388        let children = state.get_child_operations("parent-ctx").await;
1389
1390        assert_eq!(children.len(), 2);
1391    }
1392
1393    #[tokio::test]
1394    async fn test_get_child_operations_empty_for_nonexistent_parent() {
1395        let client = create_mock_client();
1396        let state = ExecutionState::new(
1397            "arn:test",
1398            "token-123",
1399            InitialExecutionState::new(),
1400            client,
1401        );
1402
1403        let children = state.get_child_operations("nonexistent-parent").await;
1404        assert!(children.is_empty());
1405    }
1406
1407    #[tokio::test]
1408    async fn test_has_replay_children_true() {
1409        let client = create_mock_client();
1410
1411        let mut ctx_op = Operation::new("ctx-with-replay", OperationType::Context);
1412        ctx_op.status = OperationStatus::Succeeded;
1413        ctx_op.context_details = Some(ContextDetails {
1414            result: None,
1415            replay_children: Some(true),
1416            error: None,
1417        });
1418
1419        let initial_state = InitialExecutionState::with_operations(vec![ctx_op]);
1420        let state = ExecutionState::new("arn:test", "token-123", initial_state, client);
1421
1422        assert!(state.has_replay_children("ctx-with-replay").await);
1423    }
1424
1425    #[tokio::test]
1426    async fn test_has_replay_children_false_when_not_set() {
1427        let client = create_mock_client();
1428
1429        let mut ctx_op = Operation::new("ctx-no-replay", OperationType::Context);
1430        ctx_op.status = OperationStatus::Succeeded;
1431        ctx_op.context_details = Some(ContextDetails {
1432            result: None,
1433            replay_children: None,
1434            error: None,
1435        });
1436
1437        let initial_state = InitialExecutionState::with_operations(vec![ctx_op]);
1438        let state = ExecutionState::new("arn:test", "token-123", initial_state, client);
1439
1440        assert!(!state.has_replay_children("ctx-no-replay").await);
1441    }
1442
1443    // Tests for CheckpointingMode (Requirements 24.1, 24.2, 24.3, 24.4)
1444
1445    #[tokio::test]
1446    async fn test_checkpointing_mode_default_is_batched() {
1447        let client = create_mock_client();
1448        let state = ExecutionState::new(
1449            "arn:test",
1450            "token-123",
1451            InitialExecutionState::new(),
1452            client,
1453        );
1454
1455        assert_eq!(
1456            state.checkpointing_mode(),
1457            crate::config::CheckpointingMode::Batched
1458        );
1459        assert!(state.is_batched_checkpointing());
1460        assert!(!state.is_eager_checkpointing());
1461        assert!(!state.is_optimistic_checkpointing());
1462    }
1463
1464    #[tokio::test]
1465    async fn test_checkpointing_mode_eager() {
1466        let client = create_mock_client();
1467        let state = ExecutionState::with_checkpointing_mode(
1468            "arn:test",
1469            "token-123",
1470            InitialExecutionState::new(),
1471            client,
1472            crate::config::CheckpointingMode::Eager,
1473        );
1474
1475        assert_eq!(
1476            state.checkpointing_mode(),
1477            crate::config::CheckpointingMode::Eager
1478        );
1479        assert!(state.is_eager_checkpointing());
1480        assert!(!state.is_batched_checkpointing());
1481        assert!(!state.is_optimistic_checkpointing());
1482        assert!(state.prioritizes_durability());
1483        assert!(!state.prioritizes_performance());
1484        assert!(!state.should_use_async_checkpoint());
1485    }
1486
1487    #[tokio::test]
1488    async fn test_checkpointing_mode_optimistic() {
1489        let client = create_mock_client();
1490        let state = ExecutionState::with_checkpointing_mode(
1491            "arn:test",
1492            "token-123",
1493            InitialExecutionState::new(),
1494            client,
1495            crate::config::CheckpointingMode::Optimistic,
1496        );
1497
1498        assert_eq!(
1499            state.checkpointing_mode(),
1500            crate::config::CheckpointingMode::Optimistic
1501        );
1502        assert!(state.is_optimistic_checkpointing());
1503        assert!(!state.is_eager_checkpointing());
1504        assert!(!state.is_batched_checkpointing());
1505        assert!(state.prioritizes_performance());
1506        assert!(!state.prioritizes_durability());
1507        assert!(state.should_use_async_checkpoint());
1508    }
1509
1510    #[tokio::test]
1511    async fn test_checkpointing_mode_batched_helpers() {
1512        let client = create_mock_client();
1513        let state = ExecutionState::with_checkpointing_mode(
1514            "arn:test",
1515            "token-123",
1516            InitialExecutionState::new(),
1517            client,
1518            crate::config::CheckpointingMode::Batched,
1519        );
1520
1521        assert!(!state.prioritizes_durability());
1522        assert!(!state.prioritizes_performance());
1523        assert!(state.should_use_async_checkpoint());
1524    }
1525
1526    #[tokio::test]
1527    async fn test_checkpoint_optimal_eager_mode() {
1528        let client = Arc::new(
1529            MockDurableServiceClient::new()
1530                .with_checkpoint_response(Ok(CheckpointResponse::new("new-token"))),
1531        );
1532
1533        let state = ExecutionState::with_checkpointing_mode(
1534            "arn:test",
1535            "token-123",
1536            InitialExecutionState::new(),
1537            client,
1538            crate::config::CheckpointingMode::Eager,
1539        );
1540
1541        let update = OperationUpdate::start("op-1", OperationType::Step);
1542        let result = state.checkpoint_optimal(update, false).await;
1543
1544        assert!(result.is_ok());
1545    }
1546
1547    #[tokio::test]
1548    async fn test_eager_mode_bypasses_batcher() {
1549        let client = Arc::new(
1550            MockDurableServiceClient::new()
1551                .with_checkpoint_response(Ok(CheckpointResponse::new("new-token"))),
1552        );
1553
1554        let (state, mut batcher) = ExecutionState::with_batcher_and_mode(
1555            "arn:test",
1556            "token-123",
1557            InitialExecutionState::new(),
1558            client,
1559            CheckpointBatcherConfig {
1560                max_batch_time_ms: 10,
1561                ..Default::default()
1562            },
1563            100,
1564            crate::config::CheckpointingMode::Eager,
1565        );
1566
1567        let batcher_handle = tokio::spawn(async move {
1568            batcher.run().await;
1569        });
1570
1571        let update = OperationUpdate::start("op-1", OperationType::Step);
1572        let result = state.create_checkpoint(update, false).await;
1573
1574        drop(state);
1575        batcher_handle.await.unwrap();
1576
1577        assert!(result.is_ok());
1578    }
1579
1580    // Tests for EXECUTION operation handling (Requirements 19.1-19.5)
1581
1582    #[tokio::test]
1583    async fn test_execution_operation_recognized() {
1584        let client = create_mock_client();
1585        let exec_op = create_execution_operation(Some(r#"{"order_id": "123"}"#));
1586        let step_op = Operation::new("step-1", OperationType::Step);
1587
1588        let initial_state = InitialExecutionState::with_operations(vec![exec_op, step_op]);
1589        let state = ExecutionState::new("arn:test", "token-123", initial_state, client);
1590
1591        assert!(state.execution_operation().is_some());
1592        let exec = state.execution_operation().unwrap();
1593        assert_eq!(exec.operation_type, OperationType::Execution);
1594        assert_eq!(exec.operation_id, "exec-123");
1595    }
1596
1597    #[tokio::test]
1598    async fn test_execution_operation_not_present() {
1599        let client = create_mock_client();
1600        let step_op = Operation::new("step-1", OperationType::Step);
1601
1602        let initial_state = InitialExecutionState::with_operations(vec![step_op]);
1603        let state = ExecutionState::new("arn:test", "token-123", initial_state, client);
1604
1605        assert!(state.execution_operation().is_none());
1606    }
1607
1608    #[tokio::test]
1609    async fn test_get_original_input_raw() {
1610        let client = create_mock_client();
1611        let exec_op = create_execution_operation(Some(r#"{"order_id": "123"}"#));
1612
1613        let initial_state = InitialExecutionState::with_operations(vec![exec_op]);
1614        let state = ExecutionState::new("arn:test", "token-123", initial_state, client);
1615
1616        let input = state.get_original_input_raw();
1617        assert!(input.is_some());
1618        assert_eq!(input.unwrap(), r#"{"order_id": "123"}"#);
1619    }
1620
1621    #[tokio::test]
1622    async fn test_get_original_input_raw_no_payload() {
1623        let client = create_mock_client();
1624        let exec_op = create_execution_operation(None);
1625
1626        let initial_state = InitialExecutionState::with_operations(vec![exec_op]);
1627        let state = ExecutionState::new("arn:test", "token-123", initial_state, client);
1628
1629        let input = state.get_original_input_raw();
1630        assert!(input.is_none());
1631    }
1632
1633    #[tokio::test]
1634    async fn test_get_original_input_raw_no_execution_operation() {
1635        let client = create_mock_client();
1636        let step_op = Operation::new("step-1", OperationType::Step);
1637
1638        let initial_state = InitialExecutionState::with_operations(vec![step_op]);
1639        let state = ExecutionState::new("arn:test", "token-123", initial_state, client);
1640
1641        let input = state.get_original_input_raw();
1642        assert!(input.is_none());
1643    }
1644
1645    #[tokio::test]
1646    async fn test_execution_operation_id() {
1647        let client = create_mock_client();
1648        let exec_op = create_execution_operation(Some(r#"{"order_id": "123"}"#));
1649
1650        let initial_state = InitialExecutionState::with_operations(vec![exec_op]);
1651        let state = ExecutionState::new("arn:test", "token-123", initial_state, client);
1652
1653        let id = state.execution_operation_id();
1654        assert!(id.is_some());
1655        assert_eq!(id.unwrap(), "exec-123");
1656    }
1657
1658    #[tokio::test]
1659    async fn test_complete_execution_success() {
1660        let client = Arc::new(
1661            MockDurableServiceClient::new()
1662                .with_checkpoint_response(Ok(CheckpointResponse::new("new-token"))),
1663        );
1664
1665        let exec_op = create_execution_operation(Some(r#"{"order_id": "123"}"#));
1666        let initial_state = InitialExecutionState::with_operations(vec![exec_op]);
1667        let state = ExecutionState::new("arn:test", "token-123", initial_state, client);
1668
1669        let result = state
1670            .complete_execution_success(Some(r#"{"status": "completed"}"#.to_string()))
1671            .await;
1672        assert!(result.is_ok());
1673
1674        let op = state.get_operation("exec-123").await.unwrap();
1675        assert_eq!(op.status, OperationStatus::Succeeded);
1676        assert_eq!(op.result, Some(r#"{"status": "completed"}"#.to_string()));
1677    }
1678
1679    #[tokio::test]
1680    async fn test_complete_execution_success_no_execution_operation() {
1681        let client = create_mock_client();
1682        let step_op = Operation::new("step-1", OperationType::Step);
1683
1684        let initial_state = InitialExecutionState::with_operations(vec![step_op]);
1685        let state = ExecutionState::new("arn:test", "token-123", initial_state, client);
1686
1687        let result = state
1688            .complete_execution_success(Some("result".to_string()))
1689            .await;
1690        assert!(result.is_err());
1691        match result.unwrap_err() {
1692            crate::error::DurableError::Validation { message } => {
1693                assert!(message.contains("no EXECUTION operation"));
1694            }
1695            _ => panic!("Expected Validation error"),
1696        }
1697    }
1698
1699    #[tokio::test]
1700    async fn test_complete_execution_failure() {
1701        let client = Arc::new(
1702            MockDurableServiceClient::new()
1703                .with_checkpoint_response(Ok(CheckpointResponse::new("new-token"))),
1704        );
1705
1706        let exec_op = create_execution_operation(Some(r#"{"order_id": "123"}"#));
1707        let initial_state = InitialExecutionState::with_operations(vec![exec_op]);
1708        let state = ExecutionState::new("arn:test", "token-123", initial_state, client);
1709
1710        let error = ErrorObject::new("ProcessingError", "Order processing failed");
1711        let result = state.complete_execution_failure(error).await;
1712        assert!(result.is_ok());
1713
1714        let op = state.get_operation("exec-123").await.unwrap();
1715        assert_eq!(op.status, OperationStatus::Failed);
1716        assert!(op.error.is_some());
1717        assert_eq!(op.error.as_ref().unwrap().error_type, "ProcessingError");
1718    }
1719
1720    #[tokio::test]
1721    async fn test_complete_execution_failure_no_execution_operation() {
1722        let client = create_mock_client();
1723        let step_op = Operation::new("step-1", OperationType::Step);
1724
1725        let initial_state = InitialExecutionState::with_operations(vec![step_op]);
1726        let state = ExecutionState::new("arn:test", "token-123", initial_state, client);
1727
1728        let error = ErrorObject::new("TestError", "Test message");
1729        let result = state.complete_execution_failure(error).await;
1730        assert!(result.is_err());
1731        match result.unwrap_err() {
1732            crate::error::DurableError::Validation { message } => {
1733                assert!(message.contains("no EXECUTION operation"));
1734            }
1735            _ => panic!("Expected Validation error"),
1736        }
1737    }
1738
1739    #[tokio::test]
1740    async fn test_with_batcher_recognizes_execution_operation() {
1741        let client = Arc::new(MockDurableServiceClient::new());
1742        let exec_op = create_execution_operation(Some(r#"{"order_id": "123"}"#));
1743
1744        let initial_state = InitialExecutionState::with_operations(vec![exec_op]);
1745        let (state, _batcher) = ExecutionState::with_batcher(
1746            "arn:test",
1747            "token-123",
1748            initial_state,
1749            client,
1750            CheckpointBatcherConfig::default(),
1751            100,
1752        );
1753
1754        assert!(state.execution_operation().is_some());
1755        let exec = state.execution_operation().unwrap();
1756        assert_eq!(exec.operation_type, OperationType::Execution);
1757        assert_eq!(
1758            state.get_original_input_raw(),
1759            Some(r#"{"order_id": "123"}"#)
1760        );
1761    }
1762
1763    // Property-based tests for orphan prevention
1764
1765    mod property_tests {
1766        use super::*;
1767        use proptest::prelude::*;
1768
1769        proptest! {
1770            #![proptest_config(ProptestConfig::with_cases(100))]
1771
1772            #[test]
1773            fn prop_orphaned_child_checkpoint_fails(
1774                parent_id in "[a-z]{5,10}",
1775                child_id in "[a-z]{5,10}",
1776            ) {
1777                let rt = tokio::runtime::Runtime::new().unwrap();
1778                let result: Result<(), TestCaseError> = rt.block_on(async {
1779                    let client = Arc::new(
1780                        MockDurableServiceClient::new()
1781                            .with_checkpoint_response(Ok(CheckpointResponse::new("new-token")))
1782                    );
1783
1784                    let state = ExecutionState::new(
1785                        "arn:test",
1786                        "token-123",
1787                        InitialExecutionState::new(),
1788                        client,
1789                    );
1790
1791                    state.mark_parent_done(&parent_id).await;
1792
1793                    let update = OperationUpdate::start(&child_id, OperationType::Step)
1794                        .with_parent_id(&parent_id);
1795                    let checkpoint_result = state.create_checkpoint(update, true).await;
1796
1797                    match checkpoint_result {
1798                        Err(DurableError::OrphanedChild { operation_id, .. }) => {
1799                            if operation_id != child_id {
1800                                return Err(TestCaseError::fail(format!(
1801                                    "Expected operation_id '{}' in OrphanedChild error, got '{}'",
1802                                    child_id, operation_id
1803                                )));
1804                            }
1805                        }
1806                        Ok(_) => {
1807                            return Err(TestCaseError::fail(
1808                                "Expected OrphanedChild error, but checkpoint succeeded"
1809                            ));
1810                        }
1811                        Err(other) => {
1812                            return Err(TestCaseError::fail(format!(
1813                                "Expected OrphanedChild error, got {:?}",
1814                                other
1815                            )));
1816                        }
1817                    }
1818
1819                    Ok(())
1820                });
1821                result?;
1822            }
1823
1824            #[test]
1825            fn prop_non_orphaned_child_checkpoint_succeeds(
1826                parent_id in "[a-z]{5,10}",
1827                child_id in "[a-z]{5,10}",
1828            ) {
1829                let rt = tokio::runtime::Runtime::new().unwrap();
1830                let result: Result<(), TestCaseError> = rt.block_on(async {
1831                    let client = Arc::new(
1832                        MockDurableServiceClient::new()
1833                            .with_checkpoint_response(Ok(CheckpointResponse::new("new-token")))
1834                    );
1835
1836                    let state = ExecutionState::new(
1837                        "arn:test",
1838                        "token-123",
1839                        InitialExecutionState::new(),
1840                        client,
1841                    );
1842
1843                    let update = OperationUpdate::start(&child_id, OperationType::Step)
1844                        .with_parent_id(&parent_id);
1845                    let checkpoint_result = state.create_checkpoint(update, true).await;
1846
1847                    if let Err(e) = checkpoint_result {
1848                        return Err(TestCaseError::fail(format!(
1849                            "Expected checkpoint to succeed for non-orphaned child, got error: {:?}",
1850                            e
1851                        )));
1852                    }
1853
1854                    Ok(())
1855                });
1856                result?;
1857            }
1858
1859            #[test]
1860            fn prop_root_operation_never_orphaned(
1861                operation_id in "[a-z]{5,10}",
1862            ) {
1863                let rt = tokio::runtime::Runtime::new().unwrap();
1864                let result: Result<(), TestCaseError> = rt.block_on(async {
1865                    let client = Arc::new(
1866                        MockDurableServiceClient::new()
1867                            .with_checkpoint_response(Ok(CheckpointResponse::new("new-token")))
1868                    );
1869
1870                    let state = ExecutionState::new(
1871                        "arn:test",
1872                        "token-123",
1873                        InitialExecutionState::new(),
1874                        client,
1875                    );
1876
1877                    let update = OperationUpdate::start(&operation_id, OperationType::Step);
1878                    let checkpoint_result = state.create_checkpoint(update, true).await;
1879
1880                    if let Err(e) = checkpoint_result {
1881                        return Err(TestCaseError::fail(format!(
1882                            "Expected checkpoint to succeed for root operation, got error: {:?}",
1883                            e
1884                        )));
1885                    }
1886
1887                    Ok(())
1888                });
1889                result?;
1890            }
1891
1892            #[test]
1893            fn prop_marking_parent_done_affects_future_checkpoints(
1894                parent_id in "[a-z]{5,10}",
1895                child_id_before in "[a-z]{5,10}",
1896                child_id_after in "[a-z]{5,10}",
1897            ) {
1898                let rt = tokio::runtime::Runtime::new().unwrap();
1899                let result: Result<(), TestCaseError> = rt.block_on(async {
1900                    let client = Arc::new(
1901                        MockDurableServiceClient::new()
1902                            .with_checkpoint_response(Ok(CheckpointResponse::new("token-1")))
1903                            .with_checkpoint_response(Ok(CheckpointResponse::new("token-2")))
1904                    );
1905
1906                    let state = ExecutionState::new(
1907                        "arn:test",
1908                        "token-123",
1909                        InitialExecutionState::new(),
1910                        client,
1911                    );
1912
1913                    let update_before = OperationUpdate::start(&child_id_before, OperationType::Step)
1914                        .with_parent_id(&parent_id);
1915                    let result_before = state.create_checkpoint(update_before, true).await;
1916
1917                    if let Err(e) = result_before {
1918                        return Err(TestCaseError::fail(format!(
1919                            "Expected first checkpoint to succeed, got error: {:?}",
1920                            e
1921                        )));
1922                    }
1923
1924                    state.mark_parent_done(&parent_id).await;
1925
1926                    let update_after = OperationUpdate::start(&child_id_after, OperationType::Step)
1927                        .with_parent_id(&parent_id);
1928                    let result_after = state.create_checkpoint(update_after, true).await;
1929
1930                    match result_after {
1931                        Err(DurableError::OrphanedChild { .. }) => {
1932                            // Expected
1933                        }
1934                        Ok(_) => {
1935                            return Err(TestCaseError::fail(
1936                                "Expected OrphanedChild error after marking parent done"
1937                            ));
1938                        }
1939                        Err(other) => {
1940                            return Err(TestCaseError::fail(format!(
1941                                "Expected OrphanedChild error, got {:?}",
1942                                other
1943                            )));
1944                        }
1945                    }
1946
1947                    Ok(())
1948                });
1949                result?;
1950            }
1951        }
1952    }
1953}