Skip to main content

durable_execution_sdk_testing/checkpoint_server/
orchestrator.rs

1//! Test execution orchestrator for managing the full execution lifecycle.
2//!
3//! This module implements the TestExecutionOrchestrator which orchestrates
4//! the full test execution lifecycle, including polling for checkpoint updates,
5//! processing operations, and scheduling handler re-invocations.
6//!
7//! This matches the Node.js SDK's `TestExecutionOrchestrator` pattern.
8//!
9//! # Requirements
10//!
11//! - 16.1: WHEN a wait operation is encountered, THE Test_Execution_Orchestrator
12//!   SHALL track the wait's scheduled end timestamp
13//! - 16.2: WHEN time skipping is enabled and a wait's scheduled end time is reached,
14//!   THE Test_Execution_Orchestrator SHALL mark the wait as SUCCEEDED and schedule
15//!   handler re-invocation
16//! - 16.3: WHEN time skipping is enabled, THE Test_Execution_Orchestrator SHALL use
17//!   tokio::time::advance() to skip wait durations instantly
18//! - 16.4: WHEN a handler invocation returns PENDING status, THE Test_Execution_Orchestrator
19//!   SHALL continue polling for operation updates and re-invoke the handler when
20//!   operations complete
21//! - 16.5: WHEN a handler invocation returns SUCCEEDED or FAILED status,
22//!   THE Test_Execution_Orchestrator SHALL resolve the execution and stop polling
23//! - 16.6: WHEN multiple operations are pending (waits, callbacks, steps with retries),
24//!   THE Test_Execution_Orchestrator SHALL process them in scheduled order
25
26use std::collections::HashSet;
27use std::future::Future;
28use std::pin::Pin;
29use std::sync::atomic::{AtomicBool, Ordering};
30use std::sync::Arc;
31
32use serde::de::DeserializeOwned;
33use serde::Serialize;
34use tokio::sync::{Mutex, RwLock};
35
36use durable_execution_sdk::{
37    DurableContext, DurableError, DurableServiceClient, Operation, OperationStatus, OperationType,
38};
39
40use super::scheduler::{QueueScheduler, Scheduler};
41use super::types::ExecutionId;
42use super::worker_manager::CheckpointWorkerManager;
43use crate::operation::CallbackSender;
44use crate::operation_handle::{OperationHandle, OperationMatcher};
45use crate::types::{ExecutionStatus, Invocation, TestResultError};
46
/// Settings that control whether the orchestrator skips over wait durations.
///
/// The `Default` value has time skipping turned off.
#[derive(Debug, Clone, Default)]
pub struct SkipTimeConfig {
    /// `true` when time skipping is turned on.
    pub enabled: bool,
}
53
54/// Result of a test execution.
55#[derive(Debug)]
56pub struct TestExecutionResult<T> {
57    /// The execution status
58    pub status: ExecutionStatus,
59    /// The result value (if succeeded)
60    pub result: Option<T>,
61    /// The error (if failed)
62    pub error: Option<TestResultError>,
63    /// All operations from the execution
64    pub operations: Vec<Operation>,
65    /// Handler invocations
66    pub invocations: Vec<Invocation>,
67    /// The execution ID
68    pub execution_id: String,
69}
70
71impl<T> TestExecutionResult<T> {
72    /// Create a successful result.
73    pub fn success(result: T, operations: Vec<Operation>, execution_id: String) -> Self {
74        Self {
75            status: ExecutionStatus::Succeeded,
76            result: Some(result),
77            error: None,
78            operations,
79            invocations: Vec::new(),
80            execution_id,
81        }
82    }
83
84    /// Create a failed result.
85    pub fn failure(
86        error: TestResultError,
87        operations: Vec<Operation>,
88        execution_id: String,
89    ) -> Self {
90        Self {
91            status: ExecutionStatus::Failed,
92            result: None,
93            error: Some(error),
94            operations,
95            invocations: Vec::new(),
96            execution_id,
97        }
98    }
99
100    /// Create a running/pending result.
101    pub fn running(operations: Vec<Operation>, execution_id: String) -> Self {
102        Self {
103            status: ExecutionStatus::Running,
104            result: None,
105            error: None,
106            operations,
107            invocations: Vec::new(),
108            execution_id,
109        }
110    }
111
112    /// Add an invocation to the result.
113    pub fn with_invocation(mut self, invocation: Invocation) -> Self {
114        self.invocations.push(invocation);
115        self
116    }
117}
118
119/// Internal storage for operations during test execution.
120#[derive(Debug, Default)]
121pub struct OperationStorage {
122    /// All operations in execution order
123    operations: Vec<Operation>,
124    /// Map from operation ID to index in operations vec
125    operations_by_id: std::collections::HashMap<String, usize>,
126    /// Map from operation name to indices in operations vec
127    operations_by_name: std::collections::HashMap<String, Vec<usize>>,
128}
129
130impl OperationStorage {
131    /// Create a new operation storage.
132    pub fn new() -> Self {
133        Self::default()
134    }
135
136    /// Add an operation to storage.
137    pub fn add_operation(&mut self, operation: Operation) {
138        let index = self.operations.len();
139        let id = operation.operation_id.clone();
140        let name = operation.name.clone();
141
142        self.operations.push(operation);
143        self.operations_by_id.insert(id, index);
144
145        if let Some(name) = name {
146            self.operations_by_name.entry(name).or_default().push(index);
147        }
148    }
149
150    /// Update an existing operation or add if not exists.
151    pub fn update_operation(&mut self, operation: Operation) {
152        let id = operation.operation_id.clone();
153        if let Some(&index) = self.operations_by_id.get(&id) {
154            self.operations[index] = operation;
155        } else {
156            self.add_operation(operation);
157        }
158    }
159
160    /// Get an operation by ID.
161    pub fn get_by_id(&self, id: &str) -> Option<&Operation> {
162        self.operations_by_id
163            .get(id)
164            .and_then(|&idx| self.operations.get(idx))
165    }
166
167    /// Get all operations.
168    pub fn get_all(&self) -> &[Operation] {
169        &self.operations
170    }
171
172    /// Clear all operations.
173    pub fn clear(&mut self) {
174        self.operations.clear();
175        self.operations_by_id.clear();
176        self.operations_by_name.clear();
177    }
178
179    /// Get the number of operations.
180    pub fn len(&self) -> usize {
181        self.operations.len()
182    }
183
184    /// Check if storage is empty.
185    pub fn is_empty(&self) -> bool {
186        self.operations.is_empty()
187    }
188}
189
190/// Type alias for a boxed handler function.
191pub type BoxedHandler<I, O> = Box<
192    dyn Fn(I, DurableContext) -> Pin<Box<dyn Future<Output = Result<O, DurableError>> + Send>>
193        + Send
194        + Sync,
195>;
196
197/// Orchestrates test execution lifecycle, polling, and handler invocation.
198///
199/// This struct manages the full execution lifecycle including:
200/// - Starting executions via checkpoint API
201/// - Polling for checkpoint updates
202/// - Processing operation updates (waits, callbacks, retries)
203/// - Scheduling handler re-invocations
204/// - Resolving execution when complete
205pub struct TestExecutionOrchestrator<I, O>
206where
207    I: DeserializeOwned + Send + Serialize + 'static,
208    O: Serialize + DeserializeOwned + Send + 'static,
209{
210    /// The handler function to execute
211    handler: BoxedHandler<I, O>,
212    /// Storage for operations
213    operation_storage: Arc<RwLock<OperationStorage>>,
214    /// The checkpoint API client
215    checkpoint_api: Arc<CheckpointWorkerManager>,
216    /// Time skipping configuration
217    skip_time_config: SkipTimeConfig,
218    /// The scheduler for handler invocations
219    scheduler: Box<dyn Scheduler>,
220    /// Set of pending operation IDs
221    pending_operations: HashSet<String>,
222    /// Flag indicating if an invocation is active
223    invocation_active: Arc<AtomicBool>,
224    /// Current execution ID
225    execution_id: Option<ExecutionId>,
226    /// Current checkpoint token
227    checkpoint_token: Option<String>,
228    /// Flag indicating if execution is complete
229    execution_complete: Arc<AtomicBool>,
230    /// The final result (if any)
231    final_result: Arc<Mutex<Option<Result<O, DurableError>>>>,
232    /// Pre-registered operation handles for lazy population during execution
233    registered_handles: Vec<OperationHandle>,
234    /// Shared operations list for child operation enumeration across handles
235    shared_operations: Arc<RwLock<Vec<Operation>>>,
236    /// Callback sender for populating callback handles
237    callback_sender: Option<Arc<dyn CallbackSender>>,
238}
239
240impl<I, O> TestExecutionOrchestrator<I, O>
241where
242    I: DeserializeOwned + Send + Serialize + Clone + 'static,
243    O: Serialize + DeserializeOwned + Send + 'static,
244{
245    /// Create a new orchestrator.
246    ///
247    /// # Arguments
248    ///
249    /// * `handler` - The handler function to execute
250    /// * `operation_storage` - Shared storage for operations
251    /// * `checkpoint_api` - The checkpoint API client
252    /// * `skip_time_config` - Configuration for time skipping
253    pub fn new<F, Fut>(
254        handler: F,
255        operation_storage: Arc<RwLock<OperationStorage>>,
256        checkpoint_api: Arc<CheckpointWorkerManager>,
257        skip_time_config: SkipTimeConfig,
258    ) -> Self
259    where
260        F: Fn(I, DurableContext) -> Fut + Send + Sync + 'static,
261        Fut: Future<Output = Result<O, DurableError>> + Send + 'static,
262    {
263        let boxed_handler = Box::new(move |input: I, ctx: DurableContext| {
264            let fut = handler(input, ctx);
265            Box::pin(fut) as Pin<Box<dyn Future<Output = Result<O, DurableError>> + Send>>
266        });
267
268        // Use QueueScheduler for time-skipping mode (FIFO order)
269        let scheduler: Box<dyn Scheduler> = Box::new(QueueScheduler::new());
270
271        Self {
272            handler: boxed_handler,
273            operation_storage,
274            checkpoint_api,
275            skip_time_config,
276            scheduler,
277            pending_operations: HashSet::new(),
278            invocation_active: Arc::new(AtomicBool::new(false)),
279            execution_id: None,
280            checkpoint_token: None,
281            execution_complete: Arc::new(AtomicBool::new(false)),
282            final_result: Arc::new(Mutex::new(None)),
283            registered_handles: Vec::new(),
284            shared_operations: Arc::new(RwLock::new(Vec::new())),
285            callback_sender: None,
286        }
287    }
288
289    /// Set the pre-registered operation handles for lazy population during execution.
290    ///
291    /// # Arguments
292    ///
293    /// * `handles` - The pre-registered operation handles
294    /// * `shared_operations` - Shared operations list for child enumeration
295    /// * `callback_sender` - Callback sender for populating callback handles
296    pub fn with_handles(
297        mut self,
298        handles: Vec<OperationHandle>,
299        shared_operations: Arc<RwLock<Vec<Operation>>>,
300        callback_sender: Option<Arc<dyn CallbackSender>>,
301    ) -> Self {
302        // Set the callback_sender on each handle so they can send callback responses.
303        // Because callback_sender is behind Arc<RwLock<>>, this update is visible
304        // to all clones of the handle (including the one the test code holds).
305        if let Some(ref sender) = callback_sender {
306            for handle in &handles {
307                let sender_clone = Arc::clone(sender);
308                if let Ok(mut guard) = handle.callback_sender.try_write() {
309                    *guard = Some(sender_clone);
310                }
311            }
312        }
313        self.registered_handles = handles;
314        self.shared_operations = shared_operations;
315        self.callback_sender = callback_sender;
316        self
317    }
318
319    /// Check if time skipping is enabled.
320    pub fn is_time_skipping_enabled(&self) -> bool {
321        self.skip_time_config.enabled
322    }
323
324    /// Get the current execution ID.
325    pub fn execution_id(&self) -> Option<&str> {
326        self.execution_id.as_deref()
327    }
328
329    /// Get the current checkpoint token.
330    pub fn checkpoint_token(&self) -> Option<&str> {
331        self.checkpoint_token.as_deref()
332    }
333
334    /// Check if execution is complete.
335    pub fn is_execution_complete(&self) -> bool {
336        self.execution_complete.load(Ordering::SeqCst)
337    }
338
339    /// Check if an invocation is currently active.
340    pub fn is_invocation_active(&self) -> bool {
341        self.invocation_active.load(Ordering::SeqCst)
342    }
343}
344
345impl<I, O> TestExecutionOrchestrator<I, O>
346where
347    I: DeserializeOwned + Send + Serialize + Clone + 'static,
348    O: Serialize + DeserializeOwned + Send + 'static,
349{
350    /// Execute the handler and return the result.
351    ///
352    /// This method orchestrates the full execution lifecycle:
353    /// 1. Start execution via checkpoint API
354    /// 2. Begin polling for checkpoint updates
355    /// 3. Invoke handler
356    /// 4. Process operation updates (waits, callbacks, retries)
357    /// 5. Schedule re-invocations as needed
358    /// 6. Return result when execution completes
359    ///
360    /// # Arguments
361    ///
362    /// * `payload` - The input payload to pass to the handler
363    ///
364    /// # Requirements
365    ///
366    /// - 16.4: WHEN a handler invocation returns PENDING status,
367    ///   THE Test_Execution_Orchestrator SHALL continue polling for operation
368    ///   updates and re-invoke the handler when operations complete
369    /// - 16.5: WHEN a handler invocation returns SUCCEEDED or FAILED status,
370    ///   THE Test_Execution_Orchestrator SHALL resolve the execution and stop polling
371    /// - 18.1: WHEN an execution starts, THE Test_Execution_Orchestrator SHALL
372    ///   begin polling for checkpoint data
373    pub async fn execute_handler(
374        &mut self,
375        payload: I,
376    ) -> Result<TestExecutionResult<O>, crate::error::TestError> {
377        use super::types::{ApiType, StartDurableExecutionRequest};
378        use durable_execution_sdk::lambda::InitialExecutionState;
379        use durable_execution_sdk::state::ExecutionState;
380
381        // Clear previous state
382        self.operation_storage.write().await.clear();
383        self.pending_operations.clear();
384        self.execution_complete.store(false, Ordering::SeqCst);
385        *self.final_result.lock().await = None;
386
387        // Serialize the payload for the checkpoint server
388        let payload_json = serde_json::to_string(&payload)?;
389
390        // Start execution with the checkpoint server
391        let invocation_id = uuid::Uuid::new_v4().to_string();
392        let start_request = StartDurableExecutionRequest {
393            invocation_id: invocation_id.clone(),
394            payload: Some(payload_json),
395        };
396        let start_payload = serde_json::to_string(&start_request)?;
397
398        let start_response = self
399            .checkpoint_api
400            .send_api_request(ApiType::StartDurableExecution, start_payload)
401            .await?;
402
403        if let Some(error) = start_response.error {
404            return Err(crate::error::TestError::CheckpointServerError(error));
405        }
406
407        let invocation_result: super::InvocationResult =
408            serde_json::from_str(&start_response.payload.ok_or_else(|| {
409                crate::error::TestError::CheckpointServerError(
410                    "Empty response from checkpoint server".to_string(),
411                )
412            })?)?;
413
414        self.execution_id = Some(invocation_result.execution_id.clone());
415        self.checkpoint_token = Some(invocation_result.checkpoint_token.clone());
416
417        let execution_arn = invocation_result.execution_id.clone();
418        let checkpoint_token = invocation_result.checkpoint_token.clone();
419
420        // Create initial execution state
421        let initial_state = InitialExecutionState::new();
422
423        // Create the execution state with the checkpoint worker manager
424        let execution_state = Arc::new(ExecutionState::new(
425            &execution_arn,
426            &checkpoint_token,
427            initial_state,
428            self.checkpoint_api.clone(),
429        ));
430
431        // Create the durable context
432        let ctx = DurableContext::new(execution_state.clone());
433
434        // Record invocation start
435        let start_time = chrono::Utc::now();
436        let mut invocation = Invocation::with_start(start_time);
437
438        // Execute the handler
439        self.invocation_active.store(true, Ordering::SeqCst);
440        let handler_result = (self.handler)(payload.clone(), ctx).await;
441        self.invocation_active.store(false, Ordering::SeqCst);
442
443        // Record invocation end
444        let end_time = chrono::Utc::now();
445        invocation = invocation.with_end(end_time);
446
447        // Retrieve operations from the checkpoint server
448        let operations = match self.checkpoint_api.get_operations(&execution_arn, "").await {
449            Ok(response) => {
450                let mut storage = self.operation_storage.write().await;
451                for op in &response.operations {
452                    storage.update_operation(op.clone());
453                }
454                response.operations
455            }
456            Err(_) => Vec::new(),
457        };
458
459        // Populate pre-registered handles with matching operations
460        self.populate_handles(&operations).await;
461
462        // Build the test result based on handler outcome
463        match handler_result {
464            Ok(result) => {
465                self.execution_complete.store(true, Ordering::SeqCst);
466                let mut test_result =
467                    TestExecutionResult::success(result, operations, execution_arn);
468                test_result.invocations.push(invocation);
469                Ok(test_result)
470            }
471            Err(error) => {
472                // Check if this is a suspend error (which means pending, not failed)
473                if error.is_suspend() {
474                    // Handler suspended - need to process pending operations
475                    let test_result = self
476                        .handle_pending_execution(payload, execution_arn, invocation)
477                        .await?;
478                    Ok(test_result)
479                } else {
480                    self.execution_complete.store(true, Ordering::SeqCst);
481                    let error_obj = durable_execution_sdk::ErrorObject::from(&error);
482                    let test_error = TestResultError::new(error_obj.error_type, error.to_string());
483                    let mut test_result =
484                        TestExecutionResult::failure(test_error.clone(), operations, execution_arn);
485                    test_result
486                        .invocations
487                        .push(invocation.with_error(test_error));
488                    Ok(test_result)
489                }
490            }
491        }
492    }
493
494    /// Handle a pending execution by processing operations and re-invoking as needed.
495    async fn handle_pending_execution(
496        &mut self,
497        payload: I,
498        execution_arn: String,
499        initial_invocation: Invocation,
500    ) -> Result<TestExecutionResult<O>, crate::error::TestError> {
501        let mut invocations = vec![initial_invocation];
502        let mut iteration_count = 0;
503        const MAX_ITERATIONS: usize = 100; // Safety limit
504
505        loop {
506            iteration_count += 1;
507            if iteration_count > MAX_ITERATIONS {
508                return Err(crate::error::TestError::CheckpointServerError(
509                    "Maximum iteration count exceeded".to_string(),
510                ));
511            }
512
513            // Get current operations
514            let mut operations = match self.checkpoint_api.get_operations(&execution_arn, "").await
515            {
516                Ok(response) => {
517                    let mut storage = self.operation_storage.write().await;
518                    for op in &response.operations {
519                        storage.update_operation(op.clone());
520                    }
521                    response.operations
522                }
523                Err(_) => Vec::new(),
524            };
525
526            // Populate pre-registered handles with matching operations
527            self.populate_handles(&operations).await;
528
529            // Process operations and check for execution completion
530            let process_result = self.process_operations(&operations, &execution_arn);
531
532            match process_result {
533                ProcessOperationsResult::ExecutionSucceeded(result_str) => {
534                    self.execution_complete.store(true, Ordering::SeqCst);
535                    if let Ok(result) = serde_json::from_str::<O>(&result_str) {
536                        let mut test_result =
537                            TestExecutionResult::success(result, operations, execution_arn);
538                        test_result.invocations = invocations;
539                        return Ok(test_result);
540                    }
541                    // If we can't parse the result, return running status
542                    let mut test_result = TestExecutionResult::running(operations, execution_arn);
543                    test_result.invocations = invocations;
544                    return Ok(test_result);
545                }
546                ProcessOperationsResult::ExecutionFailed(error) => {
547                    self.execution_complete.store(true, Ordering::SeqCst);
548                    let mut test_result =
549                        TestExecutionResult::failure(error, operations, execution_arn);
550                    test_result.invocations = invocations;
551                    return Ok(test_result);
552                }
553                ProcessOperationsResult::NoPendingOperations => {
554                    // No more pending operations we can handle
555                    let mut test_result = TestExecutionResult::running(operations, execution_arn);
556                    test_result.invocations = invocations;
557                    return Ok(test_result);
558                }
559                ProcessOperationsResult::ShouldReinvoke(advance_time_ms) => {
560                    // If advance_time_ms is None, it means there are pending operations
561                    // but none with a scheduled time (e.g., only callbacks are pending).
562                    // Callbacks need external signals to complete, not re-invocation.
563                    if advance_time_ms.is_none() {
564                        // When there are registered handles, the test code will send
565                        // callback responses via OperationHandle. Poll the checkpoint
566                        // server until the callback completes, then re-invoke.
567                        if !self.registered_handles.is_empty() {
568                            loop {
569                                tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
570
571                                // Re-fetch operations to check for callback completion
572                                let poll_operations = match self
573                                    .checkpoint_api
574                                    .get_operations(&execution_arn, "")
575                                    .await
576                                {
577                                    Ok(response) => response.operations,
578                                    Err(_) => continue,
579                                };
580
581                                // Populate handles with updated operations
582                                self.populate_handles(&poll_operations).await;
583
584                                // Check if any previously pending callback has completed
585                                let all_callbacks_done =
586                                    self.pending_operations.iter().all(|op_id| {
587                                        poll_operations.iter().any(|op| {
588                                            &op.operation_id == op_id
589                                                && op.operation_type == OperationType::Callback
590                                                && matches!(
591                                                    op.status,
592                                                    OperationStatus::Succeeded
593                                                        | OperationStatus::Failed
594                                                        | OperationStatus::Cancelled
595                                                )
596                                        })
597                                    });
598
599                                if all_callbacks_done {
600                                    operations = poll_operations;
601                                    break;
602                                }
603                            }
604                            // Fall through to re-invoke the handler
605                        } else {
606                            // No registered handles — return Running as before
607                            let mut test_result =
608                                TestExecutionResult::running(operations, execution_arn);
609                            test_result.invocations = invocations;
610                            return Ok(test_result);
611                        }
612                    }
613
614                    // Advance time if needed (for time skipping mode)
615                    if let Some(advance_ms) = advance_time_ms {
616                        if advance_ms > 0 {
617                            tokio::time::advance(tokio::time::Duration::from_millis(advance_ms))
618                                .await;
619                        }
620                    }
621
622                    // Mark all pending wait operations as SUCCEEDED after time advancement
623                    // This is critical for the handler to see the waits as completed
624                    // In time-skip mode, we complete waits immediately since we've already
625                    // "advanced" time conceptually
626                    if self.skip_time_config.enabled {
627                        for op in &operations {
628                            if op.operation_type == OperationType::Wait
629                                && op.status == OperationStatus::Started
630                            {
631                                // In time-skip mode, complete all pending waits immediately
632                                // We don't check the timestamp because tokio::time::advance
633                                // doesn't affect chrono::Utc::now()
634
635                                // Update the wait operation to SUCCEEDED
636                                let mut updated_operation = op.clone();
637                                updated_operation.status = OperationStatus::Succeeded;
638                                updated_operation.end_timestamp =
639                                    Some(chrono::Utc::now().timestamp_millis());
640
641                                let update_request = super::types::UpdateCheckpointDataRequest {
642                                    execution_id: execution_arn.clone(),
643                                    operation_id: op.operation_id.clone(),
644                                    operation_data: updated_operation,
645                                    payload: None,
646                                    error: None,
647                                };
648
649                                let payload = serde_json::to_string(&update_request)?;
650                                let _ = self
651                                    .checkpoint_api
652                                    .send_api_request(
653                                        super::types::ApiType::UpdateCheckpointData,
654                                        payload,
655                                    )
656                                    .await;
657                            }
658                        }
659                    }
660                }
661            }
662
663            // Re-invoke the handler
664            let new_invocation_id = uuid::Uuid::new_v4().to_string();
665            let start_invocation_request = super::types::StartInvocationRequest {
666                execution_id: execution_arn.clone(),
667                invocation_id: new_invocation_id.clone(),
668            };
669            let start_payload = serde_json::to_string(&start_invocation_request)?;
670
671            let start_response = self
672                .checkpoint_api
673                .send_api_request(super::types::ApiType::StartInvocation, start_payload)
674                .await?;
675
676            if let Some(error) = start_response.error {
677                return Err(crate::error::TestError::CheckpointServerError(error));
678            }
679
680            let invocation_result: super::InvocationResult =
681                serde_json::from_str(&start_response.payload.ok_or_else(|| {
682                    crate::error::TestError::CheckpointServerError(
683                        "Empty response from checkpoint server".to_string(),
684                    )
685                })?)?;
686
687            self.checkpoint_token = Some(invocation_result.checkpoint_token.clone());
688
689            // Create new execution state for re-invocation with the current operations
690            // This is critical for the handler to see the updated operation states
691            use durable_execution_sdk::lambda::InitialExecutionState;
692            use durable_execution_sdk::state::ExecutionState;
693
694            // Convert operation_events to Operations for the initial state
695            let current_operations: Vec<Operation> = invocation_result
696                .operation_events
697                .iter()
698                .map(|e| e.operation.clone())
699                .collect();
700            let initial_state = InitialExecutionState::with_operations(current_operations);
701            let execution_state = Arc::new(ExecutionState::new(
702                &execution_arn,
703                &invocation_result.checkpoint_token,
704                initial_state,
705                self.checkpoint_api.clone(),
706            ));
707
708            let ctx = DurableContext::new(execution_state);
709
710            // Record invocation
711            let start_time = chrono::Utc::now();
712            let mut invocation = Invocation::with_start(start_time);
713
714            // Execute handler
715            self.invocation_active.store(true, Ordering::SeqCst);
716            let handler_result = (self.handler)(payload.clone(), ctx).await;
717            self.invocation_active.store(false, Ordering::SeqCst);
718
719            let end_time = chrono::Utc::now();
720            invocation = invocation.with_end(end_time);
721
722            match handler_result {
723                Ok(result) => {
724                    self.execution_complete.store(true, Ordering::SeqCst);
725                    invocations.push(invocation);
726
727                    // Get final operations
728                    let final_operations =
729                        match self.checkpoint_api.get_operations(&execution_arn, "").await {
730                            Ok(response) => response.operations,
731                            Err(_) => Vec::new(),
732                        };
733
734                    // Populate handles with final operation state
735                    self.populate_handles(&final_operations).await;
736
737                    let mut test_result =
738                        TestExecutionResult::success(result, final_operations, execution_arn);
739                    test_result.invocations = invocations;
740                    return Ok(test_result);
741                }
742                Err(error) => {
743                    if error.is_suspend() {
744                        // Still pending, continue loop
745                        invocations.push(invocation);
746                        continue;
747                    } else {
748                        self.execution_complete.store(true, Ordering::SeqCst);
749                        let error_obj = durable_execution_sdk::ErrorObject::from(&error);
750                        let test_error =
751                            TestResultError::new(error_obj.error_type, error.to_string());
752                        invocations.push(invocation.with_error(test_error.clone()));
753
754                        let final_operations =
755                            match self.checkpoint_api.get_operations(&execution_arn, "").await {
756                                Ok(response) => response.operations,
757                                Err(_) => Vec::new(),
758                            };
759
760                        // Populate handles with final operation state
761                        self.populate_handles(&final_operations).await;
762
763                        let mut test_result = TestExecutionResult::failure(
764                            test_error,
765                            final_operations,
766                            execution_arn,
767                        );
768                        test_result.invocations = invocations;
769                        return Ok(test_result);
770                    }
771                }
772            }
773        }
774    }
775
776    /// Process a batch of operations from checkpoint polling.
777    ///
778    /// This method iterates over all operations and dispatches each one
779    /// to the appropriate handler based on its type.
780    ///
781    /// # Arguments
782    ///
783    /// * `operations` - The list of operations to process
784    /// * `execution_id` - The execution ID
785    ///
786    /// # Returns
787    ///
788    /// A `ProcessOperationsResult` indicating what action should be taken next.
789    ///
790    /// # Requirements
791    ///
792    /// - 18.2: WHEN checkpoint polling receives operation updates,
793    ///   THE Test_Execution_Orchestrator SHALL process each operation based on its type
794    fn process_operations(
795        &mut self,
796        operations: &[Operation],
797        execution_id: &str,
798    ) -> ProcessOperationsResult {
799        // First, check for execution completion
800        if let Some(exec_result) = self.handle_execution_update(operations) {
801            return exec_result;
802        }
803
804        // Track pending operations and earliest scheduled time
805        let mut has_pending_operations = false;
806        let mut earliest_scheduled_time: Option<i64> = None;
807
808        // Process each operation
809        for operation in operations {
810            let result = self.process_operation(operation, execution_id);
811
812            match result {
813                OperationProcessResult::Pending(scheduled_time) => {
814                    has_pending_operations = true;
815                    if let Some(time) = scheduled_time {
816                        match earliest_scheduled_time {
817                            None => earliest_scheduled_time = Some(time),
818                            Some(current) if time < current => earliest_scheduled_time = Some(time),
819                            _ => {}
820                        }
821                    }
822                }
823                OperationProcessResult::Completed => {
824                    // Operation is complete, nothing to do
825                }
826                OperationProcessResult::NotApplicable => {
827                    // Operation type not handled
828                }
829            }
830        }
831
832        if !has_pending_operations {
833            return ProcessOperationsResult::NoPendingOperations;
834        }
835
836        // Calculate time to advance if time skipping is enabled
837        let advance_time_ms = if self.skip_time_config.enabled {
838            if let Some(end_ts) = earliest_scheduled_time {
839                let now_ms = chrono::Utc::now().timestamp_millis();
840                if end_ts > now_ms {
841                    Some((end_ts - now_ms) as u64)
842                } else {
843                    Some(0)
844                }
845            } else {
846                None
847            }
848        } else {
849            None
850        };
851
852        ProcessOperationsResult::ShouldReinvoke(advance_time_ms)
853    }
854
855    /// Process a single operation based on its type.
856    ///
857    /// This method dispatches to the appropriate handler based on the operation type.
858    ///
859    /// # Arguments
860    ///
861    /// * `operation` - The operation to process
862    /// * `execution_id` - The execution ID
863    ///
864    /// # Returns
865    ///
866    /// An `OperationProcessResult` indicating the operation's status.
867    ///
868    /// # Requirements
869    ///
870    /// - 18.2: WHEN checkpoint polling receives operation updates,
871    ///   THE Test_Execution_Orchestrator SHALL process each operation based on its type
872    fn process_operation(
873        &mut self,
874        operation: &Operation,
875        execution_id: &str,
876    ) -> OperationProcessResult {
877        // Skip completed operations
878        if operation.status.is_terminal() {
879            return OperationProcessResult::Completed;
880        }
881
882        match operation.operation_type {
883            OperationType::Wait => self.handle_wait_update(operation, execution_id),
884            OperationType::Step => self.handle_step_update(operation, execution_id),
885            OperationType::Callback => self.handle_callback_update(operation, execution_id),
886            OperationType::Execution => {
887                // Execution operations are handled separately in handle_execution_update
888                OperationProcessResult::NotApplicable
889            }
890            OperationType::Invoke | OperationType::Context => {
891                // These operation types are handled by the SDK internally
892                if operation.status == OperationStatus::Started {
893                    OperationProcessResult::Pending(None)
894                } else {
895                    OperationProcessResult::Completed
896                }
897            }
898        }
899    }
900
901    /// Handle WAIT operation - schedule re-invocation at wait end time.
902    ///
903    /// When a wait operation is encountered, this method extracts the scheduled
904    /// end timestamp and returns it so the orchestrator can schedule re-invocation.
905    ///
906    /// # Arguments
907    ///
908    /// * `operation` - The wait operation to handle
909    /// * `execution_id` - The execution ID
910    ///
911    /// # Returns
912    ///
913    /// An `OperationProcessResult` with the scheduled end timestamp if available.
914    ///
915    /// # Requirements
916    ///
917    /// - 16.1: WHEN a wait operation is encountered, THE Test_Execution_Orchestrator
918    ///   SHALL track the wait's scheduled end timestamp
919    /// - 18.3: WHEN a WAIT operation update is received with START action,
920    ///   THE Test_Execution_Orchestrator SHALL schedule re-invocation at the
921    ///   wait's scheduled end timestamp
922    fn handle_wait_update(
923        &mut self,
924        operation: &Operation,
925        _execution_id: &str,
926    ) -> OperationProcessResult {
927        // Only process started wait operations
928        if operation.status != OperationStatus::Started {
929            return OperationProcessResult::Completed;
930        }
931
932        // Track this as a pending operation
933        self.pending_operations
934            .insert(operation.operation_id.clone());
935
936        // Extract scheduled end timestamp from wait details
937        let scheduled_end_timestamp = operation
938            .wait_details
939            .as_ref()
940            .and_then(|details| details.scheduled_end_timestamp);
941
942        OperationProcessResult::Pending(scheduled_end_timestamp)
943    }
944
945    /// Handle STEP operation - schedule retry at next attempt time.
946    ///
947    /// When a step operation is pending retry, this method extracts the next
948    /// attempt timestamp and returns it so the orchestrator can schedule re-invocation.
949    ///
950    /// # Arguments
951    ///
952    /// * `operation` - The step operation to handle
953    /// * `execution_id` - The execution ID
954    ///
955    /// # Returns
956    ///
957    /// An `OperationProcessResult` with the next attempt timestamp if available.
958    ///
959    /// # Requirements
960    ///
961    /// - 18.4: WHEN a STEP operation update is received with RETRY action,
962    ///   THE Test_Execution_Orchestrator SHALL schedule re-invocation at the
963    ///   step's next attempt timestamp
964    fn handle_step_update(
965        &mut self,
966        operation: &Operation,
967        _execution_id: &str,
968    ) -> OperationProcessResult {
969        // Check if step is pending retry
970        if operation.status != OperationStatus::Pending
971            && operation.status != OperationStatus::Started
972        {
973            return OperationProcessResult::Completed;
974        }
975
976        // Track this as a pending operation
977        self.pending_operations
978            .insert(operation.operation_id.clone());
979
980        // Extract next attempt timestamp from step details
981        let next_attempt_timestamp = operation
982            .step_details
983            .as_ref()
984            .and_then(|details| details.next_attempt_timestamp);
985
986        OperationProcessResult::Pending(next_attempt_timestamp)
987    }
988
989    /// Handle CALLBACK operation - schedule re-invocation when callback completes.
990    ///
991    /// When a callback operation is started, this method tracks it as pending.
992    /// The callback will complete when an external system sends a success/failure response.
993    ///
994    /// # Arguments
995    ///
996    /// * `operation` - The callback operation to handle
997    /// * `execution_id` - The execution ID
998    ///
999    /// # Returns
1000    ///
1001    /// An `OperationProcessResult` indicating the callback is pending.
1002    ///
1003    /// # Requirements
1004    ///
1005    /// - 18.5: WHEN a CALLBACK operation status changes to SUCCEEDED or FAILED,
1006    ///   THE Test_Execution_Orchestrator SHALL schedule handler re-invocation
1007    fn handle_callback_update(
1008        &mut self,
1009        operation: &Operation,
1010        _execution_id: &str,
1011    ) -> OperationProcessResult {
1012        // Check if callback is still pending
1013        if operation.status != OperationStatus::Started {
1014            // Callback has completed (succeeded or failed)
1015            self.pending_operations.remove(&operation.operation_id);
1016            return OperationProcessResult::Completed;
1017        }
1018
1019        // Track this as a pending operation
1020        self.pending_operations
1021            .insert(operation.operation_id.clone());
1022
1023        // Callbacks don't have a scheduled time - they complete when external
1024        // system sends a response
1025        OperationProcessResult::Pending(None)
1026    }
1027
1028    /// Populate pre-registered operation handles with matching operations.
1029    ///
1030    /// For each operation in the list, checks all registered handles for a match
1031    /// based on the handle's matcher (by name, by index, or by ID). When a match
1032    /// is found, writes the `Operation` into the handle's `inner` and sends a
1033    /// status update via the handle's `status_tx` watch channel.
1034    ///
1035    /// Also updates the shared operations list for child operation enumeration.
1036    ///
1037    /// # Arguments
1038    ///
1039    /// * `operations` - The current list of operations from the checkpoint server
1040    ///
1041    /// # Requirements
1042    ///
1043    /// - 1.2: WHEN the handler executes and produces an operation matching the
1044    ///   registered name, THE Local_Test_Runner SHALL populate the Operation_Handle
1045    ///   with the operation data
1046    /// - 2.2: WHEN `run()` is executing, THE Local_Test_Runner SHALL allow concurrent
1047    ///   calls to `wait_for_data()` on pre-registered Operation_Handles
1048    async fn populate_handles(&self, operations: &[Operation]) {
1049        // Update the shared operations list for child enumeration
1050        {
1051            let mut shared_ops = self.shared_operations.write().await;
1052            shared_ops.clear();
1053            shared_ops.extend(operations.iter().cloned());
1054        }
1055
1056        // Check each registered handle against the operations
1057        for handle in &self.registered_handles {
1058            let matched_op = match &handle.matcher {
1059                OperationMatcher::ByName(name) => operations
1060                    .iter()
1061                    .find(|op| op.name.as_deref() == Some(name)),
1062                OperationMatcher::ByIndex(index) => operations.get(*index),
1063                OperationMatcher::ById(id) => operations.iter().find(|op| op.operation_id == *id),
1064                OperationMatcher::ByNameAndIndex(name, index) => operations
1065                    .iter()
1066                    .filter(|op| op.name.as_deref() == Some(name))
1067                    .nth(*index),
1068            };
1069
1070            if let Some(op) = matched_op {
1071                // Write the operation into the handle's inner
1072                {
1073                    let mut inner = handle.inner.write().await;
1074                    *inner = Some(op.clone());
1075                }
1076
1077                // Send status update via the watch channel
1078                let _ = handle.status_tx.send(Some(op.status));
1079            }
1080        }
1081    }
1082
1083    /// Schedule handler re-invocation at a specific timestamp.
1084    ///
1085    /// This method schedules a handler re-invocation via the scheduler. When time
1086    /// skipping is enabled, it advances tokio time before invocation. It also
1087    /// updates checkpoint data (marks the operation as SUCCEEDED) before re-invoking.
1088    ///
1089    /// # Arguments
1090    ///
1091    /// * `timestamp_ms` - The timestamp in milliseconds since epoch when to invoke
1092    /// * `execution_id` - The execution ID
1093    /// * `operation_id` - The operation ID to mark as SUCCEEDED before re-invoking
1094    ///
1095    /// # Requirements
1096    ///
1097    /// - 16.2: WHEN time skipping is enabled and a wait's scheduled end time is reached,
1098    ///   THE Test_Execution_Orchestrator SHALL mark the wait as SUCCEEDED and schedule
1099    ///   handler re-invocation
1100    /// - 16.3: WHEN time skipping is enabled, THE Test_Execution_Orchestrator SHALL use
1101    ///   tokio::time::advance() to skip wait durations instantly
1102    /// - 17.3: WHEN a function is scheduled, THE Scheduler SHALL execute any checkpoint
1103    ///   updates before invoking the handler
1104    pub fn schedule_invocation_at_timestamp(
1105        &mut self,
1106        timestamp_ms: i64,
1107        execution_id: &str,
1108        operation_id: &str,
1109    ) {
1110        let checkpoint_api = Arc::clone(&self.checkpoint_api);
1111        let execution_id_owned = execution_id.to_string();
1112        let operation_id_owned = operation_id.to_string();
1113        let skip_time_enabled = self.skip_time_config.enabled;
1114
1115        // Calculate the timestamp as DateTime<Utc>
1116        let timestamp = chrono::DateTime::from_timestamp_millis(timestamp_ms)
1117            .map(|dt| dt.with_timezone(&chrono::Utc));
1118
1119        // Create the checkpoint update function that marks the operation as SUCCEEDED
1120        let update_checkpoint: super::scheduler::CheckpointUpdateFn = Box::new(move || {
1121            let checkpoint_api = checkpoint_api;
1122            let execution_id = execution_id_owned;
1123            let operation_id = operation_id_owned;
1124
1125            Box::pin(async move {
1126                // If time skipping is enabled, advance tokio time to the scheduled timestamp
1127                if skip_time_enabled {
1128                    let now_ms = chrono::Utc::now().timestamp_millis();
1129                    let target_ms = timestamp_ms;
1130                    if target_ms > now_ms {
1131                        let advance_duration =
1132                            tokio::time::Duration::from_millis((target_ms - now_ms) as u64);
1133                        tokio::time::advance(advance_duration).await;
1134                    }
1135                }
1136
1137                // Update checkpoint data to mark the operation as SUCCEEDED
1138                let mut updated_operation = Operation::new(&operation_id, OperationType::Wait);
1139                updated_operation.status = OperationStatus::Succeeded;
1140                updated_operation.end_timestamp = Some(chrono::Utc::now().timestamp_millis());
1141
1142                let update_request = super::types::UpdateCheckpointDataRequest {
1143                    execution_id: execution_id.clone(),
1144                    operation_id: operation_id.clone(),
1145                    operation_data: updated_operation,
1146                    payload: None,
1147                    error: None,
1148                };
1149
1150                let payload = serde_json::to_string(&update_request)
1151                    .map_err(crate::error::TestError::SerializationError)?;
1152
1153                let response = checkpoint_api
1154                    .send_api_request(super::types::ApiType::UpdateCheckpointData, payload)
1155                    .await?;
1156
1157                if let Some(error) = response.error {
1158                    return Err(crate::error::TestError::CheckpointServerError(error));
1159                }
1160
1161                Ok(())
1162            })
1163        });
1164
1165        // Create the invocation function (empty for now - actual invocation happens in the main loop)
1166        let start_invocation: super::scheduler::BoxedAsyncFn = Box::new(|| {
1167            Box::pin(async {
1168                // The actual handler invocation is managed by the main execution loop
1169                // This function just signals that the scheduled time has been reached
1170            })
1171        });
1172
1173        // Create the error handler
1174        let on_error: super::scheduler::ErrorHandler = Box::new(|error| {
1175            tracing::error!("Error during scheduled invocation: {:?}", error);
1176        });
1177
1178        // Schedule the function via the scheduler
1179        self.scheduler.schedule_function(
1180            start_invocation,
1181            on_error,
1182            timestamp,
1183            Some(update_checkpoint),
1184        );
1185    }
1186
1187    /// Schedule handler re-invocation at a specific timestamp with a custom checkpoint update.
1188    ///
1189    /// This is a more flexible version that allows specifying a custom checkpoint update function.
1190    ///
1191    /// # Arguments
1192    ///
1193    /// * `timestamp` - The timestamp when to invoke (optional, None for immediate)
1194    /// * `update_checkpoint` - Optional function to update checkpoint data before invocation
1195    ///
1196    /// # Requirements
1197    ///
1198    /// - 17.3: WHEN a function is scheduled, THE Scheduler SHALL execute any checkpoint
1199    ///   updates before invoking the handler
1200    pub fn schedule_invocation_with_update(
1201        &mut self,
1202        timestamp: Option<chrono::DateTime<chrono::Utc>>,
1203        update_checkpoint: Option<super::scheduler::CheckpointUpdateFn>,
1204    ) {
1205        let skip_time_enabled = self.skip_time_config.enabled;
1206
1207        // Wrap the checkpoint update to include time advancement if needed
1208        let wrapped_update: Option<super::scheduler::CheckpointUpdateFn> = if skip_time_enabled {
1209            if let Some(ts) = timestamp {
1210                let original_update = update_checkpoint;
1211                Some(Box::new(move || {
1212                    Box::pin(async move {
1213                        // Advance tokio time to the scheduled timestamp
1214                        let now = chrono::Utc::now();
1215                        if ts > now {
1216                            let duration = (ts - now).to_std().unwrap_or_default();
1217                            tokio::time::advance(duration).await;
1218                        }
1219
1220                        // Execute the original checkpoint update if provided
1221                        if let Some(update_fn) = original_update {
1222                            update_fn().await?;
1223                        }
1224
1225                        Ok(())
1226                    })
1227                }))
1228            } else {
1229                update_checkpoint
1230            }
1231        } else {
1232            update_checkpoint
1233        };
1234
1235        // Create the invocation function
1236        let start_invocation: super::scheduler::BoxedAsyncFn = Box::new(|| {
1237            Box::pin(async {
1238                // The actual handler invocation is managed by the main execution loop
1239            })
1240        });
1241
1242        // Create the error handler
1243        let on_error: super::scheduler::ErrorHandler = Box::new(|error| {
1244            tracing::error!("Error during scheduled invocation: {:?}", error);
1245        });
1246
1247        // Schedule the function
1248        self.scheduler
1249            .schedule_function(start_invocation, on_error, timestamp, wrapped_update);
1250    }
1251
1252    /// Check if there are scheduled functions pending.
1253    ///
1254    /// # Returns
1255    ///
1256    /// `true` if there are scheduled functions waiting to be executed.
1257    ///
1258    /// # Requirements
1259    ///
1260    /// - 17.4: WHEN the scheduler has pending functions, THE Scheduler SHALL report
1261    ///   that scheduled functions exist via has_scheduled_function()
1262    pub fn has_scheduled_functions(&self) -> bool {
1263        self.scheduler.has_scheduled_function()
1264    }
1265
1266    /// Invoke the handler and process the result.
1267    ///
1268    /// This method handles a single handler invocation, including:
1269    /// - Checking for active invocations (prevents concurrent invocations in time-skip mode)
1270    /// - Starting invocation via checkpoint API
1271    /// - Invoking handler with checkpoint token and operations
1272    /// - Processing handler result (PENDING, SUCCEEDED, FAILED)
1273    /// - Scheduling re-invocation if dirty operations exist
1274    ///
1275    /// # Arguments
1276    ///
1277    /// * `payload` - The input payload to pass to the handler
1278    /// * `execution_id` - The execution ID
1279    /// * `is_initial` - Whether this is the initial invocation (vs a re-invocation)
1280    ///
1281    /// # Returns
1282    ///
1283    /// An `InvokeHandlerResult` indicating the outcome of the invocation.
1284    ///
1285    /// # Requirements
1286    ///
1287    /// - 16.4: WHEN a handler invocation returns PENDING status,
1288    ///   THE Test_Execution_Orchestrator SHALL continue polling for operation
1289    ///   updates and re-invoke the handler when operations complete
1290    /// - 16.5: WHEN a handler invocation returns SUCCEEDED or FAILED status,
1291    ///   THE Test_Execution_Orchestrator SHALL resolve the execution and stop polling
1292    /// - 16.6: WHEN multiple operations are pending (waits, callbacks, steps with retries),
1293    ///   THE Test_Execution_Orchestrator SHALL process them in scheduled order
1294    pub async fn invoke_handler(
1295        &mut self,
1296        payload: I,
1297        execution_id: &str,
1298        is_initial: bool,
1299    ) -> Result<InvokeHandlerResult<O>, crate::error::TestError> {
1300        use super::types::{ApiType, StartDurableExecutionRequest, StartInvocationRequest};
1301        use durable_execution_sdk::lambda::InitialExecutionState;
1302        use durable_execution_sdk::state::ExecutionState;
1303
1304        // Check for active invocations (prevent concurrent invocations in time-skip mode)
1305        if self.skip_time_config.enabled && self.invocation_active.load(Ordering::SeqCst) {
1306            return Err(crate::error::TestError::CheckpointServerError(
1307                "Concurrent invocation detected in time-skip mode. Only one invocation can be active at a time.".to_string(),
1308            ));
1309        }
1310
1311        // Start invocation via checkpoint API
1312        let invocation_id = uuid::Uuid::new_v4().to_string();
1313        let checkpoint_token = if is_initial {
1314            // For initial invocation, start a new durable execution
1315            let payload_json = serde_json::to_string(&payload)?;
1316            let start_request = StartDurableExecutionRequest {
1317                invocation_id: invocation_id.clone(),
1318                payload: Some(payload_json),
1319            };
1320            let start_payload = serde_json::to_string(&start_request)?;
1321
1322            let start_response = self
1323                .checkpoint_api
1324                .send_api_request(ApiType::StartDurableExecution, start_payload)
1325                .await?;
1326
1327            if let Some(error) = start_response.error {
1328                return Err(crate::error::TestError::CheckpointServerError(error));
1329            }
1330
1331            let invocation_result: super::InvocationResult =
1332                serde_json::from_str(&start_response.payload.ok_or_else(|| {
1333                    crate::error::TestError::CheckpointServerError(
1334                        "Empty response from checkpoint server".to_string(),
1335                    )
1336                })?)?;
1337
1338            // Update orchestrator state
1339            self.execution_id = Some(invocation_result.execution_id.clone());
1340            self.checkpoint_token = Some(invocation_result.checkpoint_token.clone());
1341
1342            invocation_result.checkpoint_token
1343        } else {
1344            // For re-invocation, start a new invocation for existing execution
1345            let start_invocation_request = StartInvocationRequest {
1346                execution_id: execution_id.to_string(),
1347                invocation_id: invocation_id.clone(),
1348            };
1349            let start_payload = serde_json::to_string(&start_invocation_request)?;
1350
1351            let start_response = self
1352                .checkpoint_api
1353                .send_api_request(ApiType::StartInvocation, start_payload)
1354                .await?;
1355
1356            if let Some(error) = start_response.error {
1357                return Err(crate::error::TestError::CheckpointServerError(error));
1358            }
1359
1360            let invocation_result: super::InvocationResult =
1361                serde_json::from_str(&start_response.payload.ok_or_else(|| {
1362                    crate::error::TestError::CheckpointServerError(
1363                        "Empty response from checkpoint server".to_string(),
1364                    )
1365                })?)?;
1366
1367            // Update checkpoint token
1368            self.checkpoint_token = Some(invocation_result.checkpoint_token.clone());
1369
1370            invocation_result.checkpoint_token
1371        };
1372
1373        // Create execution state with the checkpoint worker manager
1374        let initial_state = InitialExecutionState::new();
1375        let execution_state = Arc::new(ExecutionState::new(
1376            execution_id,
1377            &checkpoint_token,
1378            initial_state,
1379            self.checkpoint_api.clone(),
1380        ));
1381
1382        // Create the durable context
1383        let ctx = DurableContext::new(execution_state.clone());
1384
1385        // Record invocation start
1386        let start_time = chrono::Utc::now();
1387        let mut invocation = Invocation::with_start(start_time);
1388
1389        // Mark invocation as active
1390        self.invocation_active.store(true, Ordering::SeqCst);
1391
1392        // Invoke handler with checkpoint token and operations
1393        let handler_result = (self.handler)(payload.clone(), ctx).await;
1394
1395        // Mark invocation as inactive
1396        self.invocation_active.store(false, Ordering::SeqCst);
1397
1398        // Record invocation end
1399        let end_time = chrono::Utc::now();
1400        invocation = invocation.with_end(end_time);
1401
1402        // Retrieve operations from the checkpoint server
1403        let operations = match self.checkpoint_api.get_operations(execution_id, "").await {
1404            Ok(response) => {
1405                let mut storage = self.operation_storage.write().await;
1406                for op in &response.operations {
1407                    storage.update_operation(op.clone());
1408                }
1409                response.operations
1410            }
1411            Err(_) => Vec::new(),
1412        };
1413
1414        // Process handler result (PENDING, SUCCEEDED, FAILED)
1415        match handler_result {
1416            Ok(result) => {
1417                // Handler completed successfully
1418                self.execution_complete.store(true, Ordering::SeqCst);
1419                Ok(InvokeHandlerResult::Succeeded {
1420                    result,
1421                    operations,
1422                    invocation,
1423                })
1424            }
1425            Err(error) => {
1426                if error.is_suspend() {
1427                    // Handler suspended - check for dirty operations and schedule re-invocation
1428                    let process_result = self.process_operations(&operations, execution_id);
1429
1430                    match process_result {
1431                        ProcessOperationsResult::ExecutionSucceeded(result_str) => {
1432                            self.execution_complete.store(true, Ordering::SeqCst);
1433                            if let Ok(result) = serde_json::from_str::<O>(&result_str) {
1434                                Ok(InvokeHandlerResult::Succeeded {
1435                                    result,
1436                                    operations,
1437                                    invocation,
1438                                })
1439                            } else {
1440                                Ok(InvokeHandlerResult::Pending {
1441                                    operations,
1442                                    invocation,
1443                                    should_reinvoke: false,
1444                                    advance_time_ms: None,
1445                                })
1446                            }
1447                        }
1448                        ProcessOperationsResult::ExecutionFailed(test_error) => {
1449                            self.execution_complete.store(true, Ordering::SeqCst);
1450                            Ok(InvokeHandlerResult::Failed {
1451                                error: test_error,
1452                                operations,
1453                                invocation,
1454                            })
1455                        }
1456                        ProcessOperationsResult::NoPendingOperations => {
1457                            // No pending operations - execution is stuck
1458                            Ok(InvokeHandlerResult::Pending {
1459                                operations,
1460                                invocation,
1461                                should_reinvoke: false,
1462                                advance_time_ms: None,
1463                            })
1464                        }
1465                        ProcessOperationsResult::ShouldReinvoke(advance_time_ms) => {
1466                            // Schedule re-invocation if dirty operations exist
1467                            Ok(InvokeHandlerResult::Pending {
1468                                operations,
1469                                invocation,
1470                                should_reinvoke: true,
1471                                advance_time_ms,
1472                            })
1473                        }
1474                    }
1475                } else {
1476                    // Handler failed with an actual error
1477                    self.execution_complete.store(true, Ordering::SeqCst);
1478                    let error_obj = durable_execution_sdk::ErrorObject::from(&error);
1479                    let test_error = TestResultError::new(error_obj.error_type, error.to_string());
1480                    let invocation_with_error = invocation.with_error(test_error.clone());
1481                    Ok(InvokeHandlerResult::Failed {
1482                        error: test_error,
1483                        operations,
1484                        invocation: invocation_with_error,
1485                    })
1486                }
1487            }
1488        }
1489    }
1490
1491    /// Flush all scheduled functions without executing them.
1492    ///
1493    /// This is useful for cleanup when execution completes or is cancelled.
1494    ///
1495    /// # Requirements
1496    ///
1497    /// - 17.5: WHEN execution completes, THE Scheduler SHALL flush any remaining
1498    ///   scheduled functions
1499    pub fn flush_scheduled_functions(&mut self) {
1500        self.scheduler.flush_timers();
1501    }
1502
1503    /// Process the next scheduled function.
1504    ///
1505    /// # Returns
1506    ///
1507    /// `true` if a function was processed, `false` if the queue is empty.
1508    pub async fn process_next_scheduled(&mut self) -> bool {
1509        self.scheduler.process_next().await
1510    }
1511
1512    /// Handle EXECUTION operation - resolve execution when complete.
1513    ///
1514    /// This method checks if the execution operation has completed (succeeded or failed)
1515    /// and returns the appropriate result.
1516    ///
1517    /// # Arguments
1518    ///
1519    /// * `operations` - All operations to search for the execution operation
1520    ///
1521    /// # Returns
1522    ///
1523    /// `Some(ProcessOperationsResult)` if execution is complete, `None` otherwise.
1524    ///
1525    /// # Requirements
1526    ///
1527    /// - 16.5: WHEN a handler invocation returns SUCCEEDED or FAILED status,
1528    ///   THE Test_Execution_Orchestrator SHALL resolve the execution and stop polling
1529    fn handle_execution_update(&self, operations: &[Operation]) -> Option<ProcessOperationsResult> {
1530        // Find the execution operation
1531        let execution_op = operations
1532            .iter()
1533            .find(|op| op.operation_type == OperationType::Execution)?;
1534
1535        match execution_op.status {
1536            OperationStatus::Succeeded => {
1537                let result_str = execution_op.result.clone().unwrap_or_default();
1538                Some(ProcessOperationsResult::ExecutionSucceeded(result_str))
1539            }
1540            OperationStatus::Failed => {
1541                let error = if let Some(err) = &execution_op.error {
1542                    TestResultError::new(err.error_type.clone(), err.error_message.clone())
1543                } else {
1544                    TestResultError::new("ExecutionFailed", "Execution failed")
1545                };
1546                Some(ProcessOperationsResult::ExecutionFailed(error))
1547            }
1548            _ => None,
1549        }
1550    }
1551}
1552
1553/// Result of processing operations.
1554///
1555/// This enum represents the possible outcomes of processing a batch of operations.
1556#[derive(Debug)]
1557pub enum ProcessOperationsResult {
1558    /// Execution completed successfully with the given result string
1559    ExecutionSucceeded(String),
1560    /// Execution failed with the given error
1561    ExecutionFailed(TestResultError),
1562    /// No pending operations that can be advanced
1563    NoPendingOperations,
1564    /// Should re-invoke the handler, optionally advancing time by the given milliseconds
1565    ShouldReinvoke(Option<u64>),
1566}
1567
/// Result of processing a single operation.
///
/// This enum represents the possible outcomes of processing a single operation.
/// It holds only plain values, so it is cheap to copy and can be compared
/// directly with `==` / `assert_eq!`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OperationProcessResult {
    /// Operation is pending with an optional scheduled timestamp (milliseconds since epoch)
    Pending(Option<i64>),
    /// Operation has completed
    Completed,
    /// Operation type is not applicable for processing
    NotApplicable,
}
1580
1581/// Result of invoking the handler.
1582///
1583/// This enum represents the possible outcomes of a single handler invocation.
1584#[derive(Debug)]
1585pub enum InvokeHandlerResult<T> {
1586    /// Handler completed successfully with a result
1587    Succeeded {
1588        /// The result value
1589        result: T,
1590        /// All operations from the execution
1591        operations: Vec<Operation>,
1592        /// The invocation details
1593        invocation: Invocation,
1594    },
1595    /// Handler failed with an error
1596    Failed {
1597        /// The error that occurred
1598        error: TestResultError,
1599        /// All operations from the execution
1600        operations: Vec<Operation>,
1601        /// The invocation details
1602        invocation: Invocation,
1603    },
1604    /// Handler is pending (suspended) and may need re-invocation
1605    Pending {
1606        /// All operations from the execution
1607        operations: Vec<Operation>,
1608        /// The invocation details
1609        invocation: Invocation,
1610        /// Whether the handler should be re-invoked
1611        should_reinvoke: bool,
1612        /// Optional time to advance before re-invocation (milliseconds)
1613        advance_time_ms: Option<u64>,
1614    },
1615}
1616
1617#[cfg(test)]
1618mod tests {
1619    use super::*;
1620    use durable_execution_sdk::{ErrorObject, StepDetails, WaitDetails};
1621
1622    #[test]
1623    fn test_skip_time_config_default() {
1624        let config = SkipTimeConfig::default();
1625        assert!(!config.enabled);
1626    }
1627
1628    #[test]
1629    fn test_operation_storage_new() {
1630        let storage = OperationStorage::new();
1631        assert!(storage.is_empty());
1632        assert_eq!(storage.len(), 0);
1633    }
1634
1635    #[test]
1636    fn test_operation_storage_add_and_get() {
1637        let mut storage = OperationStorage::new();
1638
1639        let op = Operation::new("op-1", durable_execution_sdk::OperationType::Step);
1640        storage.add_operation(op);
1641
1642        assert_eq!(storage.len(), 1);
1643        assert!(storage.get_by_id("op-1").is_some());
1644    }
1645
1646    #[test]
1647    fn test_operation_storage_update() {
1648        let mut storage = OperationStorage::new();
1649
1650        let mut op = Operation::new("op-1", durable_execution_sdk::OperationType::Step);
1651        op.status = durable_execution_sdk::OperationStatus::Started;
1652        storage.add_operation(op);
1653
1654        let mut updated_op = Operation::new("op-1", durable_execution_sdk::OperationType::Step);
1655        updated_op.status = durable_execution_sdk::OperationStatus::Succeeded;
1656        storage.update_operation(updated_op);
1657
1658        assert_eq!(storage.len(), 1);
1659        let retrieved = storage.get_by_id("op-1").unwrap();
1660        assert_eq!(
1661            retrieved.status,
1662            durable_execution_sdk::OperationStatus::Succeeded
1663        );
1664    }
1665
1666    #[test]
1667    fn test_test_execution_result_success() {
1668        let result: TestExecutionResult<String> =
1669            TestExecutionResult::success("test".to_string(), vec![], "exec-1".to_string());
1670        assert_eq!(result.status, ExecutionStatus::Succeeded);
1671        assert_eq!(result.result, Some("test".to_string()));
1672        assert!(result.error.is_none());
1673    }
1674
1675    #[test]
1676    fn test_test_execution_result_failure() {
1677        let error = TestResultError::new("TestError", "test error");
1678        let result: TestExecutionResult<String> =
1679            TestExecutionResult::failure(error, vec![], "exec-1".to_string());
1680        assert_eq!(result.status, ExecutionStatus::Failed);
1681        assert!(result.result.is_none());
1682        assert!(result.error.is_some());
1683    }
1684
1685    #[test]
1686    fn test_test_execution_result_running() {
1687        let result: TestExecutionResult<String> =
1688            TestExecutionResult::running(vec![], "exec-1".to_string());
1689        assert_eq!(result.status, ExecutionStatus::Running);
1690        assert!(result.result.is_none());
1691        assert!(result.error.is_none());
1692    }
1693
1694    // Tests for ProcessOperationsResult
1695    #[test]
1696    fn test_process_operations_result_execution_succeeded() {
1697        let result = ProcessOperationsResult::ExecutionSucceeded("test result".to_string());
1698        match result {
1699            ProcessOperationsResult::ExecutionSucceeded(s) => assert_eq!(s, "test result"),
1700            _ => panic!("Expected ExecutionSucceeded"),
1701        }
1702    }
1703
1704    #[test]
1705    fn test_process_operations_result_execution_failed() {
1706        let error = TestResultError::new("TestError", "test error");
1707        let result = ProcessOperationsResult::ExecutionFailed(error);
1708        match result {
1709            ProcessOperationsResult::ExecutionFailed(e) => {
1710                assert_eq!(e.error_type, Some("TestError".to_string()));
1711            }
1712            _ => panic!("Expected ExecutionFailed"),
1713        }
1714    }
1715
1716    #[test]
1717    fn test_process_operations_result_no_pending() {
1718        let result = ProcessOperationsResult::NoPendingOperations;
1719        assert!(matches!(
1720            result,
1721            ProcessOperationsResult::NoPendingOperations
1722        ));
1723    }
1724
1725    #[test]
1726    fn test_process_operations_result_should_reinvoke() {
1727        let result = ProcessOperationsResult::ShouldReinvoke(Some(1000));
1728        match result {
1729            ProcessOperationsResult::ShouldReinvoke(Some(ms)) => assert_eq!(ms, 1000),
1730            _ => panic!("Expected ShouldReinvoke with time"),
1731        }
1732    }
1733
1734    // Tests for OperationProcessResult
1735    #[test]
1736    fn test_operation_process_result_pending_with_timestamp() {
1737        let result = OperationProcessResult::Pending(Some(1234567890));
1738        match result {
1739            OperationProcessResult::Pending(Some(ts)) => assert_eq!(ts, 1234567890),
1740            _ => panic!("Expected Pending with timestamp"),
1741        }
1742    }
1743
1744    #[test]
1745    fn test_operation_process_result_pending_without_timestamp() {
1746        let result = OperationProcessResult::Pending(None);
1747        match result {
1748            OperationProcessResult::Pending(None) => {}
1749            _ => panic!("Expected Pending without timestamp"),
1750        }
1751    }
1752
1753    #[test]
1754    fn test_operation_process_result_completed() {
1755        let result = OperationProcessResult::Completed;
1756        assert!(matches!(result, OperationProcessResult::Completed));
1757    }
1758
1759    #[test]
1760    fn test_operation_process_result_not_applicable() {
1761        let result = OperationProcessResult::NotApplicable;
1762        assert!(matches!(result, OperationProcessResult::NotApplicable));
1763    }
1764
1765    // Tests for handle_execution_update
1766    #[test]
1767    fn test_handle_execution_update_succeeded() {
1768        // Create a mock orchestrator for testing
1769        // We'll test the logic directly by creating operations
1770        let mut exec_op = Operation::new("exec-1", OperationType::Execution);
1771        exec_op.status = OperationStatus::Succeeded;
1772        exec_op.result = Some("\"success\"".to_string());
1773
1774        let operations = vec![exec_op];
1775
1776        // Find execution operation and check status
1777        let execution_op = operations
1778            .iter()
1779            .find(|op| op.operation_type == OperationType::Execution);
1780
1781        assert!(execution_op.is_some());
1782        let exec = execution_op.unwrap();
1783        assert_eq!(exec.status, OperationStatus::Succeeded);
1784        assert_eq!(exec.result, Some("\"success\"".to_string()));
1785    }
1786
1787    #[test]
1788    fn test_handle_execution_update_failed() {
1789        let mut exec_op = Operation::new("exec-1", OperationType::Execution);
1790        exec_op.status = OperationStatus::Failed;
1791        exec_op.error = Some(ErrorObject {
1792            error_type: "TestError".to_string(),
1793            error_message: "Test error message".to_string(),
1794            stack_trace: None,
1795        });
1796
1797        let operations = vec![exec_op];
1798
1799        let execution_op = operations
1800            .iter()
1801            .find(|op| op.operation_type == OperationType::Execution);
1802
1803        assert!(execution_op.is_some());
1804        let exec = execution_op.unwrap();
1805        assert_eq!(exec.status, OperationStatus::Failed);
1806        assert!(exec.error.is_some());
1807    }
1808
1809    #[test]
1810    fn test_handle_execution_update_still_running() {
1811        let mut exec_op = Operation::new("exec-1", OperationType::Execution);
1812        exec_op.status = OperationStatus::Started;
1813
1814        let operations = vec![exec_op];
1815
1816        let execution_op = operations
1817            .iter()
1818            .find(|op| op.operation_type == OperationType::Execution);
1819
1820        assert!(execution_op.is_some());
1821        let exec = execution_op.unwrap();
1822        assert_eq!(exec.status, OperationStatus::Started);
1823    }
1824
1825    // Tests for wait operation handling
1826    #[test]
1827    fn test_wait_operation_started_with_timestamp() {
1828        let mut wait_op = Operation::new("wait-1", OperationType::Wait);
1829        wait_op.status = OperationStatus::Started;
1830        wait_op.wait_details = Some(WaitDetails {
1831            scheduled_end_timestamp: Some(1234567890000),
1832        });
1833
1834        // Verify the wait details are accessible
1835        assert!(wait_op.wait_details.is_some());
1836        let details = wait_op.wait_details.as_ref().unwrap();
1837        assert_eq!(details.scheduled_end_timestamp, Some(1234567890000));
1838    }
1839
1840    #[test]
1841    fn test_wait_operation_completed() {
1842        let mut wait_op = Operation::new("wait-1", OperationType::Wait);
1843        wait_op.status = OperationStatus::Succeeded;
1844
1845        // Completed operations should be skipped
1846        assert!(wait_op.status.is_terminal());
1847    }
1848
1849    // Tests for step operation handling
1850    #[test]
1851    fn test_step_operation_pending_retry() {
1852        let mut step_op = Operation::new("step-1", OperationType::Step);
1853        step_op.status = OperationStatus::Pending;
1854        step_op.step_details = Some(StepDetails {
1855            result: None,
1856            attempt: Some(1),
1857            next_attempt_timestamp: Some(1234567890000),
1858            error: None,
1859            payload: None,
1860        });
1861
1862        // Verify the step details are accessible
1863        assert!(step_op.step_details.is_some());
1864        let details = step_op.step_details.as_ref().unwrap();
1865        assert_eq!(details.next_attempt_timestamp, Some(1234567890000));
1866        assert_eq!(details.attempt, Some(1));
1867    }
1868
1869    #[test]
1870    fn test_step_operation_succeeded() {
1871        let mut step_op = Operation::new("step-1", OperationType::Step);
1872        step_op.status = OperationStatus::Succeeded;
1873        step_op.step_details = Some(StepDetails {
1874            result: Some("\"result\"".to_string()),
1875            attempt: Some(0),
1876            next_attempt_timestamp: None,
1877            error: None,
1878            payload: None,
1879        });
1880
1881        // Completed operations should be skipped
1882        assert!(step_op.status.is_terminal());
1883    }
1884
1885    // Tests for callback operation handling
1886    #[test]
1887    fn test_callback_operation_started() {
1888        let mut callback_op = Operation::new("callback-1", OperationType::Callback);
1889        callback_op.status = OperationStatus::Started;
1890
1891        // Started callbacks are pending
1892        assert_eq!(callback_op.status, OperationStatus::Started);
1893        assert!(!callback_op.status.is_terminal());
1894    }
1895
1896    #[test]
1897    fn test_callback_operation_succeeded() {
1898        let mut callback_op = Operation::new("callback-1", OperationType::Callback);
1899        callback_op.status = OperationStatus::Succeeded;
1900
1901        // Completed callbacks should trigger re-invocation
1902        assert!(callback_op.status.is_terminal());
1903    }
1904
1905    // Tests for operation type dispatch
1906    #[test]
1907    fn test_operation_type_dispatch_wait() {
1908        let op = Operation::new("op-1", OperationType::Wait);
1909        assert_eq!(op.operation_type, OperationType::Wait);
1910    }
1911
1912    #[test]
1913    fn test_operation_type_dispatch_step() {
1914        let op = Operation::new("op-1", OperationType::Step);
1915        assert_eq!(op.operation_type, OperationType::Step);
1916    }
1917
1918    #[test]
1919    fn test_operation_type_dispatch_callback() {
1920        let op = Operation::new("op-1", OperationType::Callback);
1921        assert_eq!(op.operation_type, OperationType::Callback);
1922    }
1923
1924    #[test]
1925    fn test_operation_type_dispatch_execution() {
1926        let op = Operation::new("op-1", OperationType::Execution);
1927        assert_eq!(op.operation_type, OperationType::Execution);
1928    }
1929
1930    #[test]
1931    fn test_operation_type_dispatch_invoke() {
1932        let op = Operation::new("op-1", OperationType::Invoke);
1933        assert_eq!(op.operation_type, OperationType::Invoke);
1934    }
1935
1936    #[test]
1937    fn test_operation_type_dispatch_context() {
1938        let op = Operation::new("op-1", OperationType::Context);
1939        assert_eq!(op.operation_type, OperationType::Context);
1940    }
1941
1942    // Test earliest scheduled time calculation
1943    #[test]
1944    fn test_earliest_scheduled_time_single_wait() {
1945        let mut wait_op = Operation::new("wait-1", OperationType::Wait);
1946        wait_op.status = OperationStatus::Started;
1947        wait_op.wait_details = Some(WaitDetails {
1948            scheduled_end_timestamp: Some(1000),
1949        });
1950
1951        let operations = vec![wait_op];
1952
1953        let mut earliest: Option<i64> = None;
1954        for op in &operations {
1955            if op.operation_type == OperationType::Wait && op.status == OperationStatus::Started {
1956                if let Some(details) = &op.wait_details {
1957                    if let Some(end_ts) = details.scheduled_end_timestamp {
1958                        match earliest {
1959                            None => earliest = Some(end_ts),
1960                            Some(current) if end_ts < current => earliest = Some(end_ts),
1961                            _ => {}
1962                        }
1963                    }
1964                }
1965            }
1966        }
1967
1968        assert_eq!(earliest, Some(1000));
1969    }
1970
1971    #[test]
1972    fn test_earliest_scheduled_time_multiple_waits() {
1973        let mut wait_op1 = Operation::new("wait-1", OperationType::Wait);
1974        wait_op1.status = OperationStatus::Started;
1975        wait_op1.wait_details = Some(WaitDetails {
1976            scheduled_end_timestamp: Some(2000),
1977        });
1978
1979        let mut wait_op2 = Operation::new("wait-2", OperationType::Wait);
1980        wait_op2.status = OperationStatus::Started;
1981        wait_op2.wait_details = Some(WaitDetails {
1982            scheduled_end_timestamp: Some(1000),
1983        });
1984
1985        let mut wait_op3 = Operation::new("wait-3", OperationType::Wait);
1986        wait_op3.status = OperationStatus::Started;
1987        wait_op3.wait_details = Some(WaitDetails {
1988            scheduled_end_timestamp: Some(3000),
1989        });
1990
1991        let operations = vec![wait_op1, wait_op2, wait_op3];
1992
1993        let mut earliest: Option<i64> = None;
1994        for op in &operations {
1995            if op.operation_type == OperationType::Wait && op.status == OperationStatus::Started {
1996                if let Some(details) = &op.wait_details {
1997                    if let Some(end_ts) = details.scheduled_end_timestamp {
1998                        match earliest {
1999                            None => earliest = Some(end_ts),
2000                            Some(current) if end_ts < current => earliest = Some(end_ts),
2001                            _ => {}
2002                        }
2003                    }
2004                }
2005            }
2006        }
2007
2008        assert_eq!(earliest, Some(1000)); // Should be the earliest
2009    }
2010
2011    // Tests for schedule_invocation_at_timestamp
2012    #[tokio::test]
2013    async fn test_schedule_invocation_at_timestamp_schedules_function() {
2014        use super::*;
2015        use std::sync::Arc;
2016        use tokio::sync::RwLock;
2017
2018        // Create a mock checkpoint worker manager
2019        let checkpoint_api = CheckpointWorkerManager::get_instance(None).unwrap();
2020
2021        // Create an orchestrator with time skipping disabled
2022        let handler =
2023            |_input: String, _ctx: DurableContext| async move { Ok("result".to_string()) };
2024
2025        let operation_storage = Arc::new(RwLock::new(OperationStorage::new()));
2026        let mut orchestrator = TestExecutionOrchestrator::new(
2027            handler,
2028            operation_storage,
2029            checkpoint_api,
2030            SkipTimeConfig { enabled: false },
2031        );
2032
2033        // Schedule an invocation at a future timestamp
2034        let future_timestamp = chrono::Utc::now().timestamp_millis() + 1000;
2035        orchestrator.schedule_invocation_at_timestamp(future_timestamp, "exec-1", "wait-1");
2036
2037        // Verify that a function was scheduled
2038        assert!(orchestrator.has_scheduled_functions());
2039    }
2040
2041    #[tokio::test]
2042    async fn test_schedule_invocation_with_update_schedules_function() {
2043        use super::*;
2044        use std::sync::Arc;
2045        use tokio::sync::RwLock;
2046
2047        // Create a mock checkpoint worker manager
2048        let checkpoint_api = CheckpointWorkerManager::get_instance(None).unwrap();
2049
2050        // Create an orchestrator
2051        let handler =
2052            |_input: String, _ctx: DurableContext| async move { Ok("result".to_string()) };
2053
2054        let operation_storage = Arc::new(RwLock::new(OperationStorage::new()));
2055        let mut orchestrator = TestExecutionOrchestrator::new(
2056            handler,
2057            operation_storage,
2058            checkpoint_api,
2059            SkipTimeConfig { enabled: false },
2060        );
2061
2062        // Schedule an invocation with no timestamp (immediate)
2063        orchestrator.schedule_invocation_with_update(None, None);
2064
2065        // Verify that a function was scheduled
2066        assert!(orchestrator.has_scheduled_functions());
2067    }
2068
2069    #[tokio::test]
2070    async fn test_flush_scheduled_functions_clears_queue() {
2071        use super::*;
2072        use std::sync::Arc;
2073        use tokio::sync::RwLock;
2074
2075        // Create a mock checkpoint worker manager
2076        let checkpoint_api = CheckpointWorkerManager::get_instance(None).unwrap();
2077
2078        // Create an orchestrator
2079        let handler =
2080            |_input: String, _ctx: DurableContext| async move { Ok("result".to_string()) };
2081
2082        let operation_storage = Arc::new(RwLock::new(OperationStorage::new()));
2083        let mut orchestrator = TestExecutionOrchestrator::new(
2084            handler,
2085            operation_storage,
2086            checkpoint_api,
2087            SkipTimeConfig { enabled: false },
2088        );
2089
2090        // Schedule multiple invocations
2091        let future_timestamp = chrono::Utc::now().timestamp_millis() + 1000;
2092        orchestrator.schedule_invocation_at_timestamp(future_timestamp, "exec-1", "wait-1");
2093        orchestrator.schedule_invocation_at_timestamp(future_timestamp + 1000, "exec-1", "wait-2");
2094
2095        // Verify functions are scheduled
2096        assert!(orchestrator.has_scheduled_functions());
2097
2098        // Flush all scheduled functions
2099        orchestrator.flush_scheduled_functions();
2100
2101        // Verify queue is empty
2102        assert!(!orchestrator.has_scheduled_functions());
2103    }
2104
2105    #[tokio::test]
2106    async fn test_process_next_scheduled_processes_function() {
2107        use super::*;
2108        use std::sync::Arc;
2109        use tokio::sync::RwLock;
2110
2111        // Create a mock checkpoint worker manager
2112        let checkpoint_api = CheckpointWorkerManager::get_instance(None).unwrap();
2113
2114        // Create an orchestrator
2115        let handler =
2116            |_input: String, _ctx: DurableContext| async move { Ok("result".to_string()) };
2117
2118        let operation_storage = Arc::new(RwLock::new(OperationStorage::new()));
2119        let mut orchestrator = TestExecutionOrchestrator::new(
2120            handler,
2121            operation_storage,
2122            checkpoint_api,
2123            SkipTimeConfig { enabled: false },
2124        );
2125
2126        // Schedule an immediate invocation
2127        orchestrator.schedule_invocation_with_update(None, None);
2128
2129        // Process the scheduled function
2130        let processed = orchestrator.process_next_scheduled().await;
2131        assert!(processed);
2132
2133        // Queue should now be empty
2134        assert!(!orchestrator.has_scheduled_functions());
2135    }
2136
2137    #[tokio::test]
2138    async fn test_schedule_invocation_with_time_skipping_enabled() {
2139        use super::*;
2140        use std::sync::Arc;
2141        use tokio::sync::RwLock;
2142
2143        // Create a mock checkpoint worker manager
2144        let checkpoint_api = CheckpointWorkerManager::get_instance(None).unwrap();
2145
2146        // Create an orchestrator with time skipping enabled
2147        let handler =
2148            |_input: String, _ctx: DurableContext| async move { Ok("result".to_string()) };
2149
2150        let operation_storage = Arc::new(RwLock::new(OperationStorage::new()));
2151        let mut orchestrator = TestExecutionOrchestrator::new(
2152            handler,
2153            operation_storage,
2154            checkpoint_api,
2155            SkipTimeConfig { enabled: true },
2156        );
2157
2158        // Verify time skipping is enabled
2159        assert!(orchestrator.is_time_skipping_enabled());
2160
2161        // Schedule an invocation at a future timestamp
2162        let future_timestamp = chrono::Utc::now().timestamp_millis() + 5000;
2163        orchestrator.schedule_invocation_at_timestamp(future_timestamp, "exec-1", "wait-1");
2164
2165        // Verify that a function was scheduled
2166        assert!(orchestrator.has_scheduled_functions());
2167    }
2168
2169    // Tests for InvokeHandlerResult
2170    #[test]
2171    fn test_invoke_handler_result_succeeded() {
2172        let invocation = Invocation::with_start(chrono::Utc::now());
2173        let result: InvokeHandlerResult<String> = InvokeHandlerResult::Succeeded {
2174            result: "test result".to_string(),
2175            operations: vec![],
2176            invocation,
2177        };
2178
2179        match result {
2180            InvokeHandlerResult::Succeeded {
2181                result, operations, ..
2182            } => {
2183                assert_eq!(result, "test result");
2184                assert!(operations.is_empty());
2185            }
2186            _ => panic!("Expected Succeeded variant"),
2187        }
2188    }
2189
2190    #[test]
2191    fn test_invoke_handler_result_failed() {
2192        let invocation = Invocation::with_start(chrono::Utc::now());
2193        let error = TestResultError::new("TestError", "test error message");
2194        let result: InvokeHandlerResult<String> = InvokeHandlerResult::Failed {
2195            error,
2196            operations: vec![],
2197            invocation,
2198        };
2199
2200        match result {
2201            InvokeHandlerResult::Failed {
2202                error, operations, ..
2203            } => {
2204                assert_eq!(error.error_type, Some("TestError".to_string()));
2205                assert!(operations.is_empty());
2206            }
2207            _ => panic!("Expected Failed variant"),
2208        }
2209    }
2210
2211    #[test]
2212    fn test_invoke_handler_result_pending_with_reinvoke() {
2213        let invocation = Invocation::with_start(chrono::Utc::now());
2214        let result: InvokeHandlerResult<String> = InvokeHandlerResult::Pending {
2215            operations: vec![],
2216            invocation,
2217            should_reinvoke: true,
2218            advance_time_ms: Some(5000),
2219        };
2220
2221        match result {
2222            InvokeHandlerResult::Pending {
2223                should_reinvoke,
2224                advance_time_ms,
2225                ..
2226            } => {
2227                assert!(should_reinvoke);
2228                assert_eq!(advance_time_ms, Some(5000));
2229            }
2230            _ => panic!("Expected Pending variant"),
2231        }
2232    }
2233
2234    #[test]
2235    fn test_invoke_handler_result_pending_without_reinvoke() {
2236        let invocation = Invocation::with_start(chrono::Utc::now());
2237        let result: InvokeHandlerResult<String> = InvokeHandlerResult::Pending {
2238            operations: vec![],
2239            invocation,
2240            should_reinvoke: false,
2241            advance_time_ms: None,
2242        };
2243
2244        match result {
2245            InvokeHandlerResult::Pending {
2246                should_reinvoke,
2247                advance_time_ms,
2248                ..
2249            } => {
2250                assert!(!should_reinvoke);
2251                assert_eq!(advance_time_ms, None);
2252            }
2253            _ => panic!("Expected Pending variant"),
2254        }
2255    }
2256
2257    // Tests for invoke_handler method behavior
2258    #[tokio::test]
2259    async fn test_invoke_handler_creates_orchestrator_state() {
2260        use super::*;
2261        use std::sync::Arc;
2262        use tokio::sync::RwLock;
2263
2264        // Create a mock checkpoint worker manager
2265        let checkpoint_api = CheckpointWorkerManager::get_instance(None).unwrap();
2266
2267        // Create an orchestrator
2268        let handler =
2269            |_input: String, _ctx: DurableContext| async move { Ok("result".to_string()) };
2270
2271        let operation_storage = Arc::new(RwLock::new(OperationStorage::new()));
2272        let orchestrator = TestExecutionOrchestrator::new(
2273            handler,
2274            operation_storage,
2275            checkpoint_api,
2276            SkipTimeConfig { enabled: false },
2277        );
2278
2279        // Verify initial state
2280        assert!(orchestrator.execution_id().is_none());
2281        assert!(orchestrator.checkpoint_token().is_none());
2282        assert!(!orchestrator.is_execution_complete());
2283        assert!(!orchestrator.is_invocation_active());
2284    }
2285
2286    #[tokio::test]
2287    async fn test_invoke_handler_time_skip_mode_prevents_concurrent() {
2288        use super::*;
2289        use std::sync::Arc;
2290        use tokio::sync::RwLock;
2291
2292        // Create a mock checkpoint worker manager
2293        let checkpoint_api = CheckpointWorkerManager::get_instance(None).unwrap();
2294
2295        // Create an orchestrator with time skipping enabled
2296        let handler =
2297            |_input: String, _ctx: DurableContext| async move { Ok("result".to_string()) };
2298
2299        let operation_storage = Arc::new(RwLock::new(OperationStorage::new()));
2300        let orchestrator = TestExecutionOrchestrator::new(
2301            handler,
2302            operation_storage,
2303            checkpoint_api,
2304            SkipTimeConfig { enabled: true },
2305        );
2306
2307        // Verify time skipping is enabled
2308        assert!(orchestrator.is_time_skipping_enabled());
2309
2310        // Initially no invocation is active
2311        assert!(!orchestrator.is_invocation_active());
2312    }
2313
2314    #[tokio::test]
2315    async fn test_invoke_handler_tracks_invocation_active_state() {
2316        use super::*;
2317        use std::sync::atomic::{AtomicBool, Ordering};
2318        use std::sync::Arc;
2319        use tokio::sync::RwLock;
2320
2321        // Create a mock checkpoint worker manager
2322        let checkpoint_api = CheckpointWorkerManager::get_instance(None).unwrap();
2323
2324        // Track whether we observed the invocation as active
2325        let was_active = Arc::new(AtomicBool::new(false));
2326        let was_active_clone = Arc::clone(&was_active);
2327
2328        // Create an orchestrator
2329        let operation_storage = Arc::new(RwLock::new(OperationStorage::new()));
2330        let orchestrator = TestExecutionOrchestrator::new(
2331            move |_input: String, _ctx: DurableContext| {
2332                let was_active = Arc::clone(&was_active_clone);
2333                async move {
2334                    // This would be where we'd check if invocation is active
2335                    // but we can't access orchestrator from inside the handler
2336                    was_active.store(true, Ordering::SeqCst);
2337                    Ok("result".to_string())
2338                }
2339            },
2340            operation_storage,
2341            checkpoint_api,
2342            SkipTimeConfig { enabled: false },
2343        );
2344
2345        // Initially no invocation is active
2346        assert!(!orchestrator.is_invocation_active());
2347    }
2348}
2349
2350/// Property-based tests for TestExecutionOrchestrator
2351///
2352/// These tests verify the correctness properties defined in the design document.
2353#[cfg(test)]
2354mod property_tests {
2355    use super::*;
2356    use durable_execution_sdk::{OperationType, WaitDetails};
2357    use proptest::prelude::*;
2358
2359    /// Strategy for generating wait durations in seconds (1 to 60 seconds)
2360    fn wait_duration_strategy() -> impl Strategy<Value = u64> {
2361        1u64..=60
2362    }
2363
2364    /// Strategy for generating multiple wait durations
2365    fn multiple_wait_durations_strategy() -> impl Strategy<Value = Vec<u64>> {
2366        prop::collection::vec(wait_duration_strategy(), 1..=3)
2367    }
2368
2369    proptest! {
2370        /// **Feature: rust-testing-utilities, Property 19: Wait Operation Completion (Orchestrator)**
2371        ///
2372        /// *For any* wait operation with scheduled end timestamp T, when time skipping is enabled
2373        /// and time advances past T, the orchestrator SHALL mark the wait as SUCCEEDED and
2374        /// re-invoke the handler.
2375        ///
2376        /// This test verifies that:
2377        /// 1. Wait operations are tracked with their scheduled end timestamps (Req 16.1)
2378        /// 2. When time skipping is enabled and time advances past T, waits are marked SUCCEEDED (Req 16.2)
2379        /// 3. Time skipping uses tokio::time::advance() to skip wait durations instantly (Req 16.3)
2380        ///
2381        /// **Validates: Requirements 16.1, 16.2, 16.3**
2382        #[test]
2383        fn prop_wait_operation_completion(wait_seconds in wait_duration_strategy()) {
2384            // Use current_thread runtime which is required for tokio::time::pause()
2385            let rt = tokio::runtime::Builder::new_current_thread()
2386                .enable_all()
2387                .build()
2388                .unwrap();
2389
2390            rt.block_on(async {
2391                // Calculate the scheduled end timestamp
2392                let now_ms = chrono::Utc::now().timestamp_millis();
2393                let scheduled_end_ms = now_ms + (wait_seconds as i64 * 1000);
2394
2395                // Create a wait operation with the scheduled end timestamp
2396                let mut wait_op = Operation::new("wait-test", OperationType::Wait);
2397                wait_op.status = OperationStatus::Started;
2398                wait_op.wait_details = Some(WaitDetails {
2399                    scheduled_end_timestamp: Some(scheduled_end_ms),
2400                });
2401
2402                // Property 16.1: Wait operation should have scheduled end timestamp tracked
2403                prop_assert!(wait_op.wait_details.is_some());
2404                let details = wait_op.wait_details.as_ref().unwrap();
2405                prop_assert_eq!(details.scheduled_end_timestamp, Some(scheduled_end_ms));
2406
2407                // Create an orchestrator with time skipping enabled
2408                let checkpoint_api = CheckpointWorkerManager::get_instance(None).unwrap();
2409                let operation_storage = Arc::new(RwLock::new(OperationStorage::new()));
2410
2411                let handler = |_input: String, _ctx: DurableContext| async move {
2412                    Ok("result".to_string())
2413                };
2414
2415                let mut orchestrator = TestExecutionOrchestrator::new(
2416                    handler,
2417                    operation_storage.clone(),
2418                    checkpoint_api,
2419                    SkipTimeConfig { enabled: true },
2420                );
2421
2422                // Property 16.3: Verify time skipping is enabled
2423                prop_assert!(orchestrator.is_time_skipping_enabled());
2424
2425                // Process the wait operation
2426                let operations = vec![wait_op.clone()];
2427                let result = orchestrator.process_operations(&operations, "exec-test");
2428
2429                // Property 16.1: Wait operation should be tracked as pending with scheduled time
2430                match result {
2431                    ProcessOperationsResult::ShouldReinvoke(advance_time_ms) => {
2432                        // When time skipping is enabled, we should get the time to advance
2433                        prop_assert!(
2434                            advance_time_ms.is_some(),
2435                            "Should have advance time when time skipping is enabled"
2436                        );
2437
2438                        // The advance time should be approximately the wait duration
2439                        // (may be slightly less due to time elapsed during test)
2440                        if let Some(advance_ms) = advance_time_ms {
2441                            // Allow some tolerance for test execution time
2442                            let expected_min = (wait_seconds as u64).saturating_sub(1) * 1000;
2443                            let expected_max = (wait_seconds as u64 + 1) * 1000;
2444                            prop_assert!(
2445                                advance_ms >= expected_min && advance_ms <= expected_max,
2446                                "Advance time {} should be approximately {} seconds ({}ms - {}ms)",
2447                                advance_ms, wait_seconds, expected_min, expected_max
2448                            );
2449                        }
2450                    }
2451                    ProcessOperationsResult::NoPendingOperations => {
2452                        // This is also acceptable if the wait was already processed
2453                        // (e.g., if the scheduled time has already passed)
2454                    }
2455                    other => {
2456                        prop_assert!(
2457                            false,
2458                            "Expected ShouldReinvoke or NoPendingOperations, got {:?}",
2459                            other
2460                        );
2461                    }
2462                }
2463
2464                // Verify the wait operation was tracked as pending
2465                prop_assert!(
2466                    orchestrator.pending_operations.contains("wait-test"),
2467                    "Wait operation should be tracked as pending"
2468                );
2469
2470                Ok(())
2471            })?;
2472        }
2473
2474        /// **Feature: rust-testing-utilities, Property 19: Wait Operation Completion (Multiple Waits)**
2475        ///
2476        /// *For any* set of wait operations with different scheduled end timestamps,
2477        /// the orchestrator SHALL process them in order of their scheduled times and
2478        /// return the earliest scheduled time for advancement.
2479        ///
2480        /// **Validates: Requirements 16.1, 16.2, 16.3**
2481        #[test]
2482        fn prop_wait_operation_completion_multiple_waits(
2483            wait_durations in multiple_wait_durations_strategy()
2484        ) {
2485            let rt = tokio::runtime::Builder::new_current_thread()
2486                .enable_all()
2487                .build()
2488                .unwrap();
2489
2490            rt.block_on(async {
2491                let now_ms = chrono::Utc::now().timestamp_millis();
2492
2493                // Create multiple wait operations with different scheduled end timestamps
2494                let mut operations = Vec::new();
2495                for (i, &duration) in wait_durations.iter().enumerate() {
2496                    let scheduled_end_ms = now_ms + (duration as i64 * 1000);
2497                    let mut wait_op = Operation::new(&format!("wait-{}", i), OperationType::Wait);
2498                    wait_op.status = OperationStatus::Started;
2499                    wait_op.wait_details = Some(WaitDetails {
2500                        scheduled_end_timestamp: Some(scheduled_end_ms),
2501                    });
2502                    operations.push(wait_op);
2503                }
2504
2505                // Create an orchestrator with time skipping enabled
2506                let checkpoint_api = CheckpointWorkerManager::get_instance(None).unwrap();
2507                let operation_storage = Arc::new(RwLock::new(OperationStorage::new()));
2508
2509                let handler = |_input: String, _ctx: DurableContext| async move {
2510                    Ok("result".to_string())
2511                };
2512
2513                let mut orchestrator = TestExecutionOrchestrator::new(
2514                    handler,
2515                    operation_storage,
2516                    checkpoint_api,
2517                    SkipTimeConfig { enabled: true },
2518                );
2519
2520                // Process all wait operations
2521                let result = orchestrator.process_operations(&operations, "exec-test");
2522
2523                // Find the minimum wait duration (earliest scheduled time)
2524                let min_duration = wait_durations.iter().min().copied().unwrap_or(0);
2525
2526                match result {
2527                    ProcessOperationsResult::ShouldReinvoke(advance_time_ms) => {
2528                        // Property: Should return the earliest scheduled time
2529                        if let Some(advance_ms) = advance_time_ms {
2530                            // The advance time should be approximately the minimum wait duration
2531                            let expected_min = min_duration.saturating_sub(1) * 1000;
2532                            let expected_max = (min_duration + 1) * 1000;
2533                            prop_assert!(
2534                                advance_ms >= expected_min && advance_ms <= expected_max,
2535                                "Advance time {} should be approximately {} seconds (min duration)",
2536                                advance_ms, min_duration
2537                            );
2538                        }
2539                    }
2540                    ProcessOperationsResult::NoPendingOperations => {
2541                        // Acceptable if all waits were already processed
2542                    }
2543                    other => {
2544                        prop_assert!(
2545                            false,
2546                            "Expected ShouldReinvoke or NoPendingOperations, got {:?}",
2547                            other
2548                        );
2549                    }
2550                }
2551
2552                // Property: All wait operations should be tracked as pending
2553                for (i, _) in wait_durations.iter().enumerate() {
2554                    let op_id = format!("wait-{}", i);
2555                    prop_assert!(
2556                        orchestrator.pending_operations.contains(&op_id),
2557                        "Wait operation {} should be tracked as pending",
2558                        op_id
2559                    );
2560                }
2561
2562                Ok(())
2563            })?;
2564        }
2565
2566        /// **Feature: rust-testing-utilities, Property 19: Wait Operation Completion (Completed Waits)**
2567        ///
2568        /// *For any* wait operation that has already completed (status is terminal),
2569        /// the orchestrator SHALL NOT track it as pending and SHALL NOT schedule re-invocation.
2570        ///
2571        /// **Validates: Requirements 16.1, 16.2, 16.3**
2572        #[test]
2573        fn prop_wait_operation_completion_already_completed(wait_seconds in wait_duration_strategy()) {
2574            let rt = tokio::runtime::Builder::new_current_thread()
2575                .enable_all()
2576                .build()
2577                .unwrap();
2578
2579            rt.block_on(async {
2580                let now_ms = chrono::Utc::now().timestamp_millis();
2581                let scheduled_end_ms = now_ms + (wait_seconds as i64 * 1000);
2582
2583                // Create a wait operation that has already completed
2584                let mut wait_op = Operation::new("wait-completed", OperationType::Wait);
2585                wait_op.status = OperationStatus::Succeeded; // Already completed
2586                wait_op.wait_details = Some(WaitDetails {
2587                    scheduled_end_timestamp: Some(scheduled_end_ms),
2588                });
2589                wait_op.end_timestamp = Some(now_ms);
2590
2591                // Create an orchestrator with time skipping enabled
2592                let checkpoint_api = CheckpointWorkerManager::get_instance(None).unwrap();
2593                let operation_storage = Arc::new(RwLock::new(OperationStorage::new()));
2594
2595                let handler = |_input: String, _ctx: DurableContext| async move {
2596                    Ok("result".to_string())
2597                };
2598
2599                let mut orchestrator = TestExecutionOrchestrator::new(
2600                    handler,
2601                    operation_storage,
2602                    checkpoint_api,
2603                    SkipTimeConfig { enabled: true },
2604                );
2605
2606                // Process the completed wait operation
2607                let operations = vec![wait_op];
2608                let result = orchestrator.process_operations(&operations, "exec-test");
2609
2610                // Property: Completed wait should not trigger re-invocation
2611                match result {
2612                    ProcessOperationsResult::NoPendingOperations => {
2613                        // Expected: no pending operations since the wait is already completed
2614                    }
2615                    ProcessOperationsResult::ShouldReinvoke(_) => {
2616                        prop_assert!(
2617                            false,
2618                            "Completed wait operation should not trigger re-invocation"
2619                        );
2620                    }
2621                    other => {
2622                        prop_assert!(
2623                            false,
2624                            "Expected NoPendingOperations for completed wait, got {:?}",
2625                            other
2626                        );
2627                    }
2628                }
2629
2630                // Property: Completed wait should not be tracked as pending
2631                prop_assert!(
2632                    !orchestrator.pending_operations.contains("wait-completed"),
2633                    "Completed wait operation should not be tracked as pending"
2634                );
2635
2636                Ok(())
2637            })?;
2638        }
2639    }
2640}