Skip to main content

durable_execution_sdk_testing/
local_runner.rs

1//! Local test runner for durable executions.
2//!
3//! This module provides the `LocalDurableTestRunner` for executing and testing
4//! durable functions in-process with a simulated checkpoint backend.
5//!
6//! # Examples
7//!
8//! ```ignore
9//! use durable_execution_sdk_testing::{
10//!     LocalDurableTestRunner, TestEnvironmentConfig, ExecutionStatus,
11//! };
12//!
13//! #[tokio::test]
14//! async fn test_workflow() {
15//!     LocalDurableTestRunner::setup_test_environment(TestEnvironmentConfig {
16//!         skip_time: true,
17//!         checkpoint_delay: None,
18//!     }).await.unwrap();
19//!
20//!     let mut runner = LocalDurableTestRunner::new(my_workflow);
21//!     let result = runner.run("input".to_string()).await.unwrap();
22//!
23//!     assert_eq!(result.get_status(), ExecutionStatus::Succeeded);
24//!
25//!     LocalDurableTestRunner::teardown_test_environment().await.unwrap();
26//! }
27//! ```
28
29use std::collections::HashMap;
30use std::future::Future;
31use std::marker::PhantomData;
32use std::pin::Pin;
33use std::sync::atomic::{AtomicBool, Ordering};
34use std::sync::Arc;
35
36use serde::de::DeserializeOwned;
37use serde::Serialize;
38use tokio::sync::RwLock;
39
40use crate::checkpoint_server::{
41    ApiType, CheckpointWorkerManager, CheckpointWorkerParams, InvocationResult, SkipTimeConfig,
42    StartDurableExecutionRequest, TestExecutionOrchestrator,
43};
44use crate::error::TestError;
45use crate::mock_client::MockDurableServiceClient;
46use crate::operation::CallbackSender;
47use crate::operation_handle::{OperationHandle, OperationMatcher};
48use crate::test_result::TestResult;
49use crate::types::{ExecutionStatus, Invocation, TestResultError};
50use durable_execution_sdk::{
51    DurableContext, DurableError, DurableServiceClient, ErrorObject, Operation,
52};
53
54/// Global flag indicating whether the test environment has been set up.
55static TEST_ENVIRONMENT_SETUP: AtomicBool = AtomicBool::new(false);
56
57/// Global flag indicating whether time skipping is enabled.
58static TIME_SKIPPING_ENABLED: AtomicBool = AtomicBool::new(false);
59
60/// A `CallbackSender` implementation that delegates to `CheckpointWorkerManager`.
61///
62/// This bridges the `CallbackSender` trait (used by `OperationHandle`) with the
63/// checkpoint server's callback API, enabling handles to send callback responses
64/// during mid-execution interaction.
65struct CheckpointCallbackSender {
66    checkpoint_worker: Arc<CheckpointWorkerManager>,
67}
68
69impl CheckpointCallbackSender {
70    fn new(checkpoint_worker: Arc<CheckpointWorkerManager>) -> Self {
71        Self { checkpoint_worker }
72    }
73}
74
75#[async_trait::async_trait]
76impl CallbackSender for CheckpointCallbackSender {
77    async fn send_success(&self, callback_id: &str, result: &str) -> Result<(), TestError> {
78        self.checkpoint_worker
79            .send_callback_success(callback_id, result)
80            .await
81            .map_err(|e| TestError::CheckpointServerError(e.to_string()))
82    }
83
84    async fn send_failure(
85        &self,
86        callback_id: &str,
87        error: &TestResultError,
88    ) -> Result<(), TestError> {
89        let error_obj = ErrorObject::new(
90            error.error_type.clone().unwrap_or_default(),
91            error.error_message.clone().unwrap_or_default(),
92        );
93        self.checkpoint_worker
94            .send_callback_failure(callback_id, &error_obj)
95            .await
96            .map_err(|e| TestError::CheckpointServerError(e.to_string()))
97    }
98
99    async fn send_heartbeat(&self, callback_id: &str) -> Result<(), TestError> {
100        self.checkpoint_worker
101            .send_callback_heartbeat(callback_id)
102            .await
103            .map_err(|e| TestError::CheckpointServerError(e.to_string()))
104    }
105}
106
/// Options accepted by `LocalDurableTestRunner::setup_test_environment`.
///
/// The type is comparable (`PartialEq`/`Eq`) so tests can assert on a whole
/// configuration value at once.
///
/// # Examples
///
/// ```
/// use durable_execution_sdk_testing::TestEnvironmentConfig;
///
/// let config = TestEnvironmentConfig {
///     skip_time: true,
///     checkpoint_delay: None,
/// };
/// assert_eq!(config, TestEnvironmentConfig::default());
/// ```
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TestEnvironmentConfig {
    /// Enable time skipping for faster test execution.
    ///
    /// When enabled, wait operations complete instantly without blocking.
    pub skip_time: bool,

    /// Optional simulated checkpoint delay in milliseconds.
    ///
    /// If set, checkpoint operations will be delayed by this amount.
    pub checkpoint_delay: Option<u64>,
}

impl Default for TestEnvironmentConfig {
    /// Returns the default configuration: time skipping on, no checkpoint delay.
    fn default() -> Self {
        Self {
            skip_time: true,
            checkpoint_delay: None,
        }
    }
}
140
141/// Internal storage for operations during test execution.
142#[derive(Debug, Default)]
143struct OperationStorage {
144    /// All operations in execution order
145    operations: Vec<Operation>,
146    /// Map from operation ID to index in operations vec
147    operations_by_id: HashMap<String, usize>,
148    /// Map from operation name to indices in operations vec
149    operations_by_name: HashMap<String, Vec<usize>>,
150}
151
152impl OperationStorage {
153    fn new() -> Self {
154        Self::default()
155    }
156
157    fn add_operation(&mut self, operation: Operation) {
158        let index = self.operations.len();
159        let id = operation.operation_id.clone();
160        let name = operation.name.clone();
161
162        self.operations.push(operation);
163        self.operations_by_id.insert(id, index);
164
165        if let Some(name) = name {
166            self.operations_by_name.entry(name).or_default().push(index);
167        }
168    }
169
170    fn get_by_id(&self, id: &str) -> Option<&Operation> {
171        self.operations_by_id
172            .get(id)
173            .and_then(|&idx| self.operations.get(idx))
174    }
175
176    fn get_by_name(&self, name: &str) -> Option<&Operation> {
177        self.operations_by_name
178            .get(name)
179            .and_then(|indices| indices.first())
180            .and_then(|&idx| self.operations.get(idx))
181    }
182
183    fn get_by_name_and_index(&self, name: &str, index: usize) -> Option<&Operation> {
184        self.operations_by_name
185            .get(name)
186            .and_then(|indices| indices.get(index))
187            .and_then(|&idx| self.operations.get(idx))
188    }
189
190    fn get_by_index(&self, index: usize) -> Option<&Operation> {
191        self.operations.get(index)
192    }
193
194    fn get_all(&self) -> &[Operation] {
195        &self.operations
196    }
197
198    fn clear(&mut self) {
199        self.operations.clear();
200        self.operations_by_id.clear();
201        self.operations_by_name.clear();
202    }
203
204    #[allow(dead_code)]
205    fn len(&self) -> usize {
206        self.operations.len()
207    }
208
209    #[allow(dead_code)]
210    fn is_empty(&self) -> bool {
211        self.operations.is_empty()
212    }
213}
214
215/// Type alias for a shared async function that can be cloned.
216type SharedAsyncFn<I, O> = Arc<
217    dyn Fn(I, DurableContext) -> Pin<Box<dyn Future<Output = Result<O, DurableError>> + Send>>
218        + Send
219        + Sync,
220>;
221
222/// Type alias for a boxed durable function to reduce type complexity.
223type BoxedDurableFn = Box<
224    dyn Fn(
225            serde_json::Value,
226            DurableContext,
227        ) -> Pin<Box<dyn Future<Output = Result<serde_json::Value, DurableError>> + Send>>
228        + Send
229        + Sync,
230>;
231
232/// A registered function for invoke testing.
233/// Note: The function fields are stored for future use when invoke operations
234/// are fully implemented. Currently marked with allow(dead_code).
235#[allow(dead_code)]
236enum RegisteredFunction {
237    /// A durable function that takes a DurableContext
238    Durable(BoxedDurableFn),
239    /// A regular function that doesn't need a DurableContext
240    Regular(
241        Box<dyn Fn(serde_json::Value) -> Result<serde_json::Value, DurableError> + Send + Sync>,
242    ),
243}
244
245/// Local test runner for durable executions.
246///
247/// Executes durable handlers in-process with a checkpoint server running in a
248/// separate thread, allowing rapid iteration during development without AWS deployment.
249///
250/// The runner uses a CheckpointWorkerManager to manage the checkpoint server thread,
251/// matching the Node.js SDK's architecture for consistent cross-SDK behavior.
252///
253/// # Type Parameters
254///
255/// * `H` - The handler function type
256/// * `I` - The input type (must be deserializable)
257/// * `O` - The output type (must be serializable)
258///
259/// # Examples
260///
261/// ```ignore
262/// use durable_execution_sdk_testing::LocalDurableTestRunner;
263///
264/// async fn my_workflow(input: String, ctx: DurableContext) -> Result<String, DurableError> {
265///     let result = ctx.step(|_| Ok(format!("processed: {}", input)), None).await?;
266///     Ok(result)
267/// }
268///
269/// let mut runner = LocalDurableTestRunner::new(my_workflow);
270/// let result = runner.run("hello".to_string()).await.unwrap();
271/// assert_eq!(result.get_result().unwrap(), "processed: hello");
272/// ```
273pub struct LocalDurableTestRunner<I, O>
274where
275    I: DeserializeOwned + Send + 'static,
276    O: Serialize + DeserializeOwned + Send + 'static,
277{
278    /// The handler function to execute (shared for use with orchestrator)
279    handler: SharedAsyncFn<I, O>,
280    /// The checkpoint worker manager (manages checkpoint server thread)
281    checkpoint_worker: Arc<CheckpointWorkerManager>,
282    /// Legacy mock client for backward compatibility (deprecated)
283    #[deprecated(note = "Use checkpoint_worker instead. Retained for backward compatibility.")]
284    mock_client: Arc<MockDurableServiceClient>,
285    /// Storage for captured operations
286    operation_storage: Arc<RwLock<OperationStorage>>,
287    /// Registered functions for chained invoke testing
288    registered_functions: Arc<RwLock<HashMap<String, RegisteredFunction>>>,
289    /// Pre-registered operation handles for lazy population during execution
290    registered_handles: Vec<OperationHandle>,
291    /// Shared operations list for child operation enumeration across handles
292    shared_operations: Arc<RwLock<Vec<Operation>>>,
293    /// Phantom data for type parameters
294    _phantom: PhantomData<(I, O)>,
295}
296
297impl<I, O> LocalDurableTestRunner<I, O>
298where
299    I: DeserializeOwned + Send + Serialize + 'static,
300    O: Serialize + DeserializeOwned + Send + 'static,
301{
302    /// Sets up the test environment for durable execution testing.
303    ///
304    /// This method should be called once before running tests. It configures
305    /// time control and other test infrastructure.
306    ///
307    /// # Arguments
308    ///
309    /// * `config` - Configuration for the test environment
310    ///
311    /// # Requirements
312    ///
313    /// - 2.1: WHEN time skipping is enabled via setup_test_environment(),
314    ///   THE Local_Test_Runner SHALL use Tokio's time manipulation to skip wait durations
315    /// - 2.3: WHEN time skipping is disabled, THE Local_Test_Runner SHALL execute
316    ///   wait operations with real timing
317    ///
318    /// # Examples
319    ///
320    /// ```ignore
321    /// use durable_execution_sdk_testing::{LocalDurableTestRunner, TestEnvironmentConfig};
322    ///
323    /// LocalDurableTestRunner::<String, String>::setup_test_environment(TestEnvironmentConfig {
324    ///     skip_time: true,
325    ///     checkpoint_delay: None,
326    /// }).await.unwrap();
327    /// ```
328    pub async fn setup_test_environment(config: TestEnvironmentConfig) -> Result<(), TestError> {
329        // Enable time skipping if configured
330        // Note: Each test with #[tokio::test(flavor = "current_thread")] has its own runtime,
331        // so we always need to pause time in the current runtime, regardless of global state.
332        if config.skip_time {
333            // Pause tokio time to enable instant time advancement
334            // Note: tokio::time::pause() requires current_thread runtime
335            // We use catch_unwind to handle the case where we're in a multi-threaded runtime
336            // or if time is already paused in this runtime
337            use std::panic;
338            let result = panic::catch_unwind(panic::AssertUnwindSafe(|| {
339                tokio::time::pause();
340            }));
341
342            if let Err(panic_info) = result {
343                // Check if the panic was because we're not in a current_thread runtime
344                let is_runtime_error = panic_info
345                    .downcast_ref::<&str>()
346                    .map(|msg| msg.contains("current_thread"))
347                    .unwrap_or(false)
348                    || panic_info
349                        .downcast_ref::<String>()
350                        .map(|msg| msg.contains("current_thread"))
351                        .unwrap_or(false);
352
353                // Check if time is already frozen (this is fine, just continue)
354                let is_already_frozen = panic_info
355                    .downcast_ref::<&str>()
356                    .map(|msg| msg.contains("already frozen") || msg.contains("already paused"))
357                    .unwrap_or(false)
358                    || panic_info
359                        .downcast_ref::<String>()
360                        .map(|msg| msg.contains("already frozen") || msg.contains("already paused"))
361                        .unwrap_or(false);
362
363                if is_runtime_error {
364                    // We're not in a current_thread runtime, so time control won't work
365                    tracing::warn!(
366                        "Time control requires current_thread Tokio runtime. \
367                         Time skipping may not work correctly."
368                    );
369                } else if is_already_frozen {
370                    // Time is already frozen in this runtime, which is fine
371                    // This can happen if setup is called multiple times
372                } else {
373                    // Re-panic for other errors
374                    panic::resume_unwind(panic_info);
375                }
376            }
377            TIME_SKIPPING_ENABLED.store(true, Ordering::SeqCst);
378        }
379
380        TEST_ENVIRONMENT_SETUP.store(true, Ordering::SeqCst);
381        Ok(())
382    }
383
384    /// Tears down the test environment.
385    ///
386    /// This method should be called after tests complete to restore normal
387    /// time behavior and clean up test infrastructure.
388    ///
389    /// # Requirements
390    ///
391    /// - 2.4: WHEN teardown_test_environment() is called, THE Local_Test_Runner
392    ///   SHALL restore normal time behavior
393    ///
394    /// # Examples
395    ///
396    /// ```ignore
397    /// use durable_execution_sdk_testing::LocalDurableTestRunner;
398    ///
399    /// LocalDurableTestRunner::<String, String>::teardown_test_environment().await.unwrap();
400    /// ```
401    pub async fn teardown_test_environment() -> Result<(), TestError> {
402        // Check if set up
403        if !TEST_ENVIRONMENT_SETUP.load(Ordering::SeqCst) {
404            return Ok(());
405        }
406
407        // Resume tokio time if it was paused
408        // Note: tokio::time::resume() panics if time was never paused,
409        // so we need to check if time is actually paused before resuming
410        if TIME_SKIPPING_ENABLED.load(Ordering::SeqCst) {
411            // Only resume if time is actually paused (check using is_time_paused helper)
412            // Use catch_unwind to handle the case where we're in a multi-threaded runtime
413            use std::panic;
414            let _ = panic::catch_unwind(panic::AssertUnwindSafe(|| {
415                if crate::time_control::is_time_paused() {
416                    tokio::time::resume();
417                }
418            }));
419            TIME_SKIPPING_ENABLED.store(false, Ordering::SeqCst);
420        }
421
422        TEST_ENVIRONMENT_SETUP.store(false, Ordering::SeqCst);
423        Ok(())
424    }
425
426    /// Returns whether the test environment has been set up.
427    pub fn is_environment_setup() -> bool {
428        TEST_ENVIRONMENT_SETUP.load(Ordering::SeqCst)
429    }
430
431    /// Returns whether time skipping is enabled.
432    pub fn is_time_skipping_enabled() -> bool {
433        TIME_SKIPPING_ENABLED.load(Ordering::SeqCst)
434    }
435
436    /// Creates a new local test runner with the given handler.
437    ///
438    /// # Arguments
439    ///
440    /// * `handler` - An async function that takes input and DurableContext
441    ///
442    /// # Requirements
443    ///
444    /// - 1.1: WHEN a developer creates a Local_Test_Runner with a handler function,
445    ///   THE Local_Test_Runner SHALL accept any async function that takes a payload and DurableContext
446    /// - 9.1: WHEN a developer creates a Local_Test_Runner, THE Local_Test_Runner
447    ///   SHALL spawn a checkpoint server in a separate thread
448    ///
449    /// # Examples
450    ///
451    /// ```ignore
452    /// use durable_execution_sdk_testing::LocalDurableTestRunner;
453    ///
454    /// async fn my_workflow(input: String, ctx: DurableContext) -> Result<String, DurableError> {
455    ///     Ok(input)
456    /// }
457    ///
458    /// let runner = LocalDurableTestRunner::new(my_workflow);
459    /// ```
460    pub fn new<F, Fut>(handler: F) -> Self
461    where
462        F: Fn(I, DurableContext) -> Fut + Send + Sync + 'static,
463        Fut: Future<Output = Result<O, DurableError>> + Send + 'static,
464    {
465        // Wrap handler in Arc for sharing with orchestrator
466        let handler: SharedAsyncFn<I, O> = Arc::new(move |input: I, ctx: DurableContext| {
467            let fut = handler(input, ctx);
468            Box::pin(fut) as Pin<Box<dyn Future<Output = Result<O, DurableError>> + Send>>
469        });
470
471        // Get or create the checkpoint worker manager singleton
472        // If it fails (e.g., in test scenarios with poisoned locks), create a fresh instance
473        let checkpoint_worker = match CheckpointWorkerManager::get_instance(None) {
474            Ok(worker) => worker,
475            Err(_) => {
476                // Reset and try again
477                CheckpointWorkerManager::reset_instance_for_testing();
478                CheckpointWorkerManager::get_instance(None)
479                    .expect("Failed to create CheckpointWorkerManager after reset")
480            }
481        };
482
483        #[allow(deprecated)]
484        Self {
485            handler,
486            checkpoint_worker,
487            mock_client: Arc::new(MockDurableServiceClient::new().with_checkpoint_responses(100)),
488            operation_storage: Arc::new(RwLock::new(OperationStorage::new())),
489            registered_functions: Arc::new(RwLock::new(HashMap::new())),
490            registered_handles: Vec::new(),
491            shared_operations: Arc::new(RwLock::new(Vec::new())),
492            _phantom: PhantomData,
493        }
494    }
495
496    /// Creates a new local test runner with a custom mock client.
497    ///
498    /// # Arguments
499    ///
500    /// * `handler` - An async function that takes input and DurableContext
501    /// * `mock_client` - A pre-configured mock client (deprecated, use checkpoint_worker)
502    #[deprecated(note = "Use new() instead. The checkpoint worker manager is now the default.")]
503    pub fn with_mock_client<F, Fut>(handler: F, mock_client: MockDurableServiceClient) -> Self
504    where
505        F: Fn(I, DurableContext) -> Fut + Send + Sync + 'static,
506        Fut: Future<Output = Result<O, DurableError>> + Send + 'static,
507    {
508        // Wrap handler in Arc for sharing with orchestrator
509        let handler: SharedAsyncFn<I, O> = Arc::new(move |input: I, ctx: DurableContext| {
510            let fut = handler(input, ctx);
511            Box::pin(fut) as Pin<Box<dyn Future<Output = Result<O, DurableError>> + Send>>
512        });
513
514        // Get or create the checkpoint worker manager singleton
515        let checkpoint_worker = CheckpointWorkerManager::get_instance(None)
516            .expect("Failed to create CheckpointWorkerManager");
517
518        #[allow(deprecated)]
519        Self {
520            handler,
521            checkpoint_worker,
522            mock_client: Arc::new(mock_client),
523            operation_storage: Arc::new(RwLock::new(OperationStorage::new())),
524            registered_functions: Arc::new(RwLock::new(HashMap::new())),
525            registered_handles: Vec::new(),
526            shared_operations: Arc::new(RwLock::new(Vec::new())),
527            _phantom: PhantomData,
528        }
529    }
530
531    /// Creates a new local test runner with custom checkpoint worker parameters.
532    ///
533    /// # Arguments
534    ///
535    /// * `handler` - An async function that takes input and DurableContext
536    /// * `params` - Configuration parameters for the checkpoint worker
537    ///
538    /// # Requirements
539    ///
540    /// - 9.2: WHEN the checkpoint server is running, THE Checkpoint_Worker_Manager
541    ///   SHALL manage the lifecycle of the server thread
542    pub fn with_checkpoint_params<F, Fut>(handler: F, params: CheckpointWorkerParams) -> Self
543    where
544        F: Fn(I, DurableContext) -> Fut + Send + Sync + 'static,
545        Fut: Future<Output = Result<O, DurableError>> + Send + 'static,
546    {
547        // Wrap handler in Arc for sharing with orchestrator
548        let handler: SharedAsyncFn<I, O> = Arc::new(move |input: I, ctx: DurableContext| {
549            let fut = handler(input, ctx);
550            Box::pin(fut) as Pin<Box<dyn Future<Output = Result<O, DurableError>> + Send>>
551        });
552
553        // Get or create the checkpoint worker manager singleton with custom params
554        let checkpoint_worker = CheckpointWorkerManager::get_instance(Some(params))
555            .expect("Failed to create CheckpointWorkerManager");
556
557        #[allow(deprecated)]
558        Self {
559            handler,
560            checkpoint_worker,
561            mock_client: Arc::new(MockDurableServiceClient::new().with_checkpoint_responses(100)),
562            operation_storage: Arc::new(RwLock::new(OperationStorage::new())),
563            registered_functions: Arc::new(RwLock::new(HashMap::new())),
564            registered_handles: Vec::new(),
565            shared_operations: Arc::new(RwLock::new(Vec::new())),
566            _phantom: PhantomData,
567        }
568    }
569
570    /// Returns a reference to the checkpoint worker manager.
571    pub fn checkpoint_worker(&self) -> &Arc<CheckpointWorkerManager> {
572        &self.checkpoint_worker
573    }
574
575    /// Returns a reference to the mock client (deprecated).
576    #[deprecated(note = "Use checkpoint_worker() instead.")]
577    #[allow(deprecated)]
578    pub fn mock_client(&self) -> &Arc<MockDurableServiceClient> {
579        &self.mock_client
580    }
581
582    /// Returns the number of captured operations.
583    pub async fn operation_count(&self) -> usize {
584        self.operation_storage.read().await.len()
585    }
586
587    /// Executes the handler and returns a `RunFuture` that resolves to the test result.
588    ///
589    /// Accepts either a raw payload or an `InvokeRequest` wrapper (via `impl Into<InvokeRequest<I>>`).
590    /// The execution is spawned as a tokio task so callers can `await` the result
591    /// concurrently with `OperationHandle` interactions (e.g., mid-execution callbacks).
592    ///
593    /// # Arguments
594    ///
595    /// * `input` - A payload or `InvokeRequest<I>` to pass to the handler.
596    ///   Raw payloads are automatically wrapped via `From<I> for InvokeRequest<I>`.
597    ///
598    /// # Requirements
599    ///
600    /// - 1.2: WHEN a developer calls run() with a payload, THE Local_Test_Runner
601    ///   SHALL execute the handler function and return a Test_Result
602    /// - 1.3: WHEN the handler function completes successfully, THE Test_Result
603    ///   SHALL contain the execution result and status SUCCEEDED
604    /// - 1.4: WHEN the handler function fails with an error, THE Test_Result
605    ///   SHALL contain the error details and status FAILED
606    /// - 1.5: WHEN the handler function performs durable operations, THE Local_Test_Runner
607    ///   SHALL capture all operations in the Test_Result
608    /// - 2.1: WHEN a developer calls run(), THE Local_Test_Runner SHALL return a future
609    ///   that resolves to a Test_Result when the handler completes
610    /// - 9.1: THE Local_Test_Runner SHALL accept an InvokeRequest struct with an optional
611    ///   payload field for the run() method
612    /// - 9.2: WHEN run() is called with an InvokeRequest containing a payload,
613    ///   THE Local_Test_Runner SHALL pass the payload to the handler function
614    /// - 9.3: WHEN run() is called with an InvokeRequest containing no payload,
615    ///   THE Local_Test_Runner SHALL use a default empty payload
616    ///
617    /// # Examples
618    ///
619    /// ```ignore
620    /// use durable_execution_sdk_testing::{LocalDurableTestRunner, InvokeRequest};
621    ///
622    /// let mut runner = LocalDurableTestRunner::new(my_workflow);
623    ///
624    /// // With a raw payload (backward compatible)
625    /// let result = runner.run("hello".to_string()).await.unwrap();
626    ///
627    /// // With an InvokeRequest
628    /// let result = runner.run(InvokeRequest::with_payload("hello".to_string())).await.unwrap();
629    ///
630    /// // Concurrent interaction with operation handles
631    /// let handle = runner.get_operation_handle("my-callback");
632    /// let run_future = runner.run("input".to_string());
633    /// handle.wait_for_data(WaitingOperationStatus::Submitted).await.unwrap();
634    /// handle.send_callback_success("result").await.unwrap();
635    /// let result = run_future.await.unwrap();
636    /// ```
637    pub fn run(
638        &mut self,
639        input: impl Into<crate::types::InvokeRequest<I>>,
640    ) -> crate::run_future::RunFuture<O>
641    where
642        I: Clone + Send + Sync + 'static,
643        O: Send + 'static,
644    {
645        let invoke_request: crate::types::InvokeRequest<I> = input.into();
646
647        // Extract payload from InvokeRequest.
648        // When payload is None (InvokeRequest::new()), attempt to create a default
649        // by deserializing from JSON null. This works for types like String (""),
650        // serde_json::Value (Null), Option<T> (None), etc.
651        let payload_result: Result<I, _> = match invoke_request.payload {
652            Some(p) => Ok(p),
653            None => serde_json::from_value(serde_json::Value::Null),
654        };
655
656        // If we can't create a default payload, return an error future immediately
657        let payload = match payload_result {
658            Ok(p) => p,
659            Err(e) => {
660                return crate::run_future::RunFuture::from_future(Box::pin(async move {
661                    Err(TestError::InvalidConfiguration(format!(
662                        "InvokeRequest has no payload and the input type cannot be \
663                         deserialized from null: {}. Use InvokeRequest::with_payload() \
664                         to provide a value.",
665                        e
666                    )))
667                }));
668            }
669        };
670
671        // Clear previous state before starting execution
672        // We use try_write to avoid blocking; the async block will also clear.
673        if let Ok(mut storage) = self.operation_storage.try_write() {
674            storage.clear();
675        }
676        #[allow(deprecated)]
677        self.mock_client.clear_all_calls();
678
679        // Capture all the state we need for the async execution
680        let handler = Arc::clone(&self.handler);
681        let checkpoint_worker = self.checkpoint_worker.clone();
682        let operation_storage = self.operation_storage.clone();
683        let registered_handles = self.registered_handles.clone();
684        let shared_operations = self.shared_operations.clone();
685
686        crate::run_future::RunFuture::from_future(Box::pin(async move {
687            use crate::checkpoint_server::OperationStorage as OrchestratorOperationStorage;
688
689            // Ensure operation storage is cleared
690            operation_storage.write().await.clear();
691
692            // Configure time skipping based on test environment settings
693            let skip_time_config = SkipTimeConfig {
694                enabled: LocalDurableTestRunner::<I, O>::is_time_skipping_enabled(),
695            };
696
697            // Create shared operation storage for the orchestrator
698            let orchestrator_storage =
699                Arc::new(tokio::sync::RwLock::new(OrchestratorOperationStorage::new()));
700
701            // Create the orchestrator with the shared handler
702            let handler_clone = Arc::clone(&handler);
703            let mut orchestrator = TestExecutionOrchestrator::new(
704                move |input: I, ctx: DurableContext| {
705                    let handler = Arc::clone(&handler_clone);
706                    async move { handler(input, ctx).await }
707                },
708                orchestrator_storage.clone(),
709                checkpoint_worker.clone(),
710                skip_time_config,
711            );
712
713            // Pass pre-registered operation handles to the orchestrator
714            if !registered_handles.is_empty() {
715                let callback_sender: Option<Arc<dyn CallbackSender>> = Some(Arc::new(
716                    CheckpointCallbackSender::new(checkpoint_worker.clone()),
717                ));
718                orchestrator = orchestrator.with_handles(
719                    registered_handles,
720                    shared_operations,
721                    callback_sender,
722                );
723            }
724
725            // Execute using the orchestrator
726            let execution_result = orchestrator.execute_handler(payload).await?;
727
728            // Copy operations from orchestrator storage to our storage
729            {
730                let orch_storage = orchestrator_storage.read().await;
731                let mut our_storage = operation_storage.write().await;
732                for op in orch_storage.get_all() {
733                    our_storage.add_operation(op.clone());
734                }
735            }
736
737            // Convert TestExecutionResult to TestResult
738            let mut test_result = match execution_result.status {
739                ExecutionStatus::Succeeded => {
740                    if let Some(result) = execution_result.result {
741                        TestResult::success(result, execution_result.operations)
742                    } else {
743                        TestResult::with_status(
744                            ExecutionStatus::Succeeded,
745                            execution_result.operations,
746                        )
747                    }
748                }
749                ExecutionStatus::Failed => {
750                    if let Some(error) = execution_result.error {
751                        TestResult::failure(error, execution_result.operations)
752                    } else {
753                        TestResult::with_status(
754                            ExecutionStatus::Failed,
755                            execution_result.operations,
756                        )
757                    }
758                }
759                ExecutionStatus::Running => {
760                    TestResult::with_status(ExecutionStatus::Running, execution_result.operations)
761                }
762                _ => TestResult::with_status(execution_result.status, execution_result.operations),
763            };
764
765            // Add invocations
766            for invocation in execution_result.invocations {
767                test_result.add_invocation(invocation);
768            }
769
770            // Retrieve and add Node.js-compatible history events
771            if let Ok(nodejs_events) = checkpoint_worker
772                .get_nodejs_history_events(&execution_result.execution_id)
773                .await
774            {
775                test_result.set_nodejs_history_events(nodejs_events);
776            }
777
778            Ok(test_result)
779        }))
780    }
781
782    /// Executes the handler with a single invocation (no re-invocation on suspend).
783    ///
784    /// This method performs a single handler invocation without using the orchestrator.
785    /// If the handler suspends (e.g., due to a wait operation), the result will have
786    /// status `Running` and the execution will not be automatically resumed.
787    ///
788    /// Use this method when you want to test the initial invocation behavior without
789    /// automatic wait completion and re-invocation.
790    ///
791    /// # Arguments
792    ///
793    /// * `payload` - The input payload to pass to the handler
794    ///
795    /// # Examples
796    ///
797    /// ```ignore
798    /// use durable_execution_sdk_testing::LocalDurableTestRunner;
799    ///
800    /// let mut runner = LocalDurableTestRunner::new(my_workflow_with_wait);
801    /// let result = runner.run_single_invocation("hello".to_string()).await.unwrap();
802    ///
803    /// // Handler suspended on wait, execution is still running
804    /// assert_eq!(result.get_status(), ExecutionStatus::Running);
805    /// ```
806    pub async fn run_single_invocation(&mut self, payload: I) -> Result<TestResult<O>, TestError> {
807        use durable_execution_sdk::lambda::InitialExecutionState;
808        use durable_execution_sdk::state::ExecutionState;
809
810        // Clear previous operations
811        self.operation_storage.write().await.clear();
812
813        // Also clear mock client state for backward compatibility
814        #[allow(deprecated)]
815        self.mock_client.clear_all_calls();
816
817        // Serialize the payload for the checkpoint server
818        let payload_json = serde_json::to_string(&payload)?;
819
820        // Start execution with the checkpoint server
821        // This registers the execution and returns a checkpoint token
822        let invocation_id = uuid::Uuid::new_v4().to_string();
823        let start_request = StartDurableExecutionRequest {
824            invocation_id: invocation_id.clone(),
825            payload: Some(payload_json),
826        };
827        let start_payload = serde_json::to_string(&start_request)?;
828
829        let start_response = self
830            .checkpoint_worker
831            .send_api_request(ApiType::StartDurableExecution, start_payload)
832            .await?;
833
834        if let Some(error) = start_response.error {
835            return Err(TestError::CheckpointServerError(error));
836        }
837
838        let invocation_result: InvocationResult =
839            serde_json::from_str(&start_response.payload.ok_or_else(|| {
840                TestError::CheckpointServerError(
841                    "Empty response from checkpoint server".to_string(),
842                )
843            })?)?;
844
845        let execution_arn = invocation_result.execution_id;
846        let checkpoint_token = invocation_result.checkpoint_token;
847
848        // Create initial execution state (empty for new execution)
849        let initial_state = InitialExecutionState::new();
850
851        // Create the execution state with the checkpoint worker manager
852        // The checkpoint worker implements DurableServiceClient trait
853        let execution_state = Arc::new(ExecutionState::new(
854            &execution_arn,
855            &checkpoint_token,
856            initial_state,
857            self.checkpoint_worker.clone(),
858        ));
859
860        // Create the durable context
861        let ctx = DurableContext::new(execution_state.clone());
862
863        // Record invocation start
864        let start_time = chrono::Utc::now();
865        let mut invocation = Invocation::with_start(start_time);
866
867        // Execute the handler
868        let handler_result = (self.handler)(payload, ctx).await;
869
870        // Record invocation end
871        let end_time = chrono::Utc::now();
872        invocation = invocation.with_end(end_time);
873
874        // Retrieve operations from the checkpoint server
875        let operations = match self
876            .checkpoint_worker
877            .get_operations(&execution_arn, "")
878            .await
879        {
880            Ok(response) => {
881                let mut storage = self.operation_storage.write().await;
882                for op in &response.operations {
883                    storage.add_operation(op.clone());
884                }
885                response.operations
886            }
887            Err(_) => {
888                // If we can't get operations from checkpoint server, return empty list
889                Vec::new()
890            }
891        };
892
893        // Build the test result based on handler outcome
894        match handler_result {
895            Ok(result) => {
896                let mut test_result = TestResult::success(result, operations);
897                test_result.add_invocation(invocation);
898                Ok(test_result)
899            }
900            Err(error) => {
901                // Check if this is a suspend error (which means pending, not failed)
902                if error.is_suspend() {
903                    let mut test_result =
904                        TestResult::with_status(ExecutionStatus::Running, operations);
905                    test_result.add_invocation(invocation);
906                    Ok(test_result)
907                } else {
908                    // Convert DurableError to ErrorObject to get the error type
909                    let error_obj = durable_execution_sdk::ErrorObject::from(&error);
910                    let test_error = TestResultError::new(error_obj.error_type, error.to_string());
911                    invocation = invocation.with_error(test_error.clone());
912                    let mut test_result = TestResult::failure(test_error, operations);
913                    test_result.add_invocation(invocation);
914                    Ok(test_result)
915                }
916            }
917        }
918    }
919
920    /// Executes the handler with the given payload using the TestExecutionOrchestrator.
921    ///
922    /// This method uses the TestExecutionOrchestrator to manage the full execution
923    /// lifecycle, including:
924    /// - Polling for checkpoint updates
925    /// - Processing wait operations and scheduling re-invocations
926    /// - Handling time skipping for wait operations
927    /// - Managing callback completions
928    ///
929    /// This is the recommended method for testing workflows with wait operations,
930    /// as it properly handles the full execution lifecycle including re-invocations.
931    ///
932    /// # Arguments
933    ///
934    /// * `payload` - The input payload to pass to the handler
935    ///
936    /// # Requirements
937    ///
938    /// - 16.1: WHEN a wait operation is encountered, THE Test_Execution_Orchestrator
939    ///   SHALL track the wait's scheduled end timestamp
940    /// - 16.2: WHEN time skipping is enabled and a wait's scheduled end time is reached,
941    ///   THE Test_Execution_Orchestrator SHALL mark the wait as SUCCEEDED and schedule
942    ///   handler re-invocation
943    /// - 16.3: WHEN time skipping is enabled, THE Test_Execution_Orchestrator SHALL use
944    ///   tokio::time::advance() to skip wait durations instantly
945    /// - 16.4: WHEN a handler invocation returns PENDING status, THE Test_Execution_Orchestrator
946    ///   SHALL continue polling for operation updates and re-invoke the handler when
947    ///   operations complete
948    /// - 16.5: WHEN a handler invocation returns SUCCEEDED or FAILED status,
949    ///   THE Test_Execution_Orchestrator SHALL resolve the execution and stop polling
950    ///
951    /// # Examples
952    ///
953    /// ```ignore
954    /// use durable_execution_sdk_testing::LocalDurableTestRunner;
955    ///
956    /// let mut runner = LocalDurableTestRunner::new(my_workflow_with_waits);
957    /// let result = runner.run_with_orchestrator("hello".to_string()).await.unwrap();
958    ///
959    /// // Wait operations are automatically completed with time skipping
960    /// assert_eq!(result.get_status(), ExecutionStatus::Succeeded);
961    /// ```
962    pub async fn run_with_orchestrator(&mut self, payload: I) -> Result<TestResult<O>, TestError>
963    where
964        I: Clone,
965    {
966        use crate::checkpoint_server::OperationStorage as OrchestratorOperationStorage;
967
968        // Clear previous operations
969        self.operation_storage.write().await.clear();
970
971        // Also clear mock client state for backward compatibility
972        #[allow(deprecated)]
973        self.mock_client.clear_all_calls();
974
975        // Configure time skipping based on test environment settings
976        let skip_time_config = SkipTimeConfig {
977            enabled: Self::is_time_skipping_enabled(),
978        };
979
980        // Create shared operation storage for the orchestrator
981        let orchestrator_storage =
982            Arc::new(tokio::sync::RwLock::new(OrchestratorOperationStorage::new()));
983
984        // Clone the handler Arc for use in the orchestrator
985        let handler = Arc::clone(&self.handler);
986
987        // Create the orchestrator with the shared handler
988        // The orchestrator will manage the execution lifecycle
989        let mut orchestrator = TestExecutionOrchestrator::new(
990            move |input: I, ctx: DurableContext| {
991                let handler = Arc::clone(&handler);
992                async move { handler(input, ctx).await }
993            },
994            orchestrator_storage.clone(),
995            self.checkpoint_worker.clone(),
996            skip_time_config,
997        );
998
999        // Pass pre-registered operation handles to the orchestrator so it can
1000        // populate them as operations are created/updated during execution.
1001        if !self.registered_handles.is_empty() {
1002            let callback_sender: Option<Arc<dyn CallbackSender>> = Some(Arc::new(
1003                CheckpointCallbackSender::new(self.checkpoint_worker.clone()),
1004            ));
1005            orchestrator = orchestrator.with_handles(
1006                self.registered_handles.clone(),
1007                self.shared_operations.clone(),
1008                callback_sender,
1009            );
1010        }
1011
1012        // Execute using the orchestrator
1013        let execution_result = orchestrator.execute_handler(payload.clone()).await?;
1014
1015        // Copy operations from orchestrator storage to our storage
1016        {
1017            let orch_storage = orchestrator_storage.read().await;
1018            let mut our_storage = self.operation_storage.write().await;
1019            for op in orch_storage.get_all() {
1020                our_storage.add_operation(op.clone());
1021            }
1022        }
1023
1024        // Convert TestExecutionResult to TestResult
1025        let mut test_result = match execution_result.status {
1026            ExecutionStatus::Succeeded => {
1027                if let Some(result) = execution_result.result {
1028                    TestResult::success(result, execution_result.operations)
1029                } else {
1030                    TestResult::with_status(ExecutionStatus::Succeeded, execution_result.operations)
1031                }
1032            }
1033            ExecutionStatus::Failed => {
1034                if let Some(error) = execution_result.error {
1035                    TestResult::failure(error, execution_result.operations)
1036                } else {
1037                    TestResult::with_status(ExecutionStatus::Failed, execution_result.operations)
1038                }
1039            }
1040            ExecutionStatus::Running => {
1041                TestResult::with_status(ExecutionStatus::Running, execution_result.operations)
1042            }
1043            _ => TestResult::with_status(execution_result.status, execution_result.operations),
1044        };
1045
1046        // Add invocations
1047        for invocation in execution_result.invocations {
1048            test_result.add_invocation(invocation);
1049        }
1050
1051        // Retrieve and add Node.js-compatible history events
1052        if let Ok(nodejs_events) = self
1053            .checkpoint_worker
1054            .get_nodejs_history_events(&execution_result.execution_id)
1055            .await
1056        {
1057            test_result.set_nodejs_history_events(nodejs_events);
1058        }
1059
1060        Ok(test_result)
1061    }
1062
1063    /// Returns a lazy `OperationHandle` that populates when an operation
1064    /// matching the given name is created during execution.
1065    ///
1066    /// # Arguments
1067    ///
1068    /// * `name` - The operation name to match against
1069    ///
1070    /// # Requirements
1071    ///
1072    /// - 1.1: WHEN a developer calls `get_operation_handle(name)` on the Local_Test_Runner
1073    ///   before calling `run()`, THE Local_Test_Runner SHALL return an Operation_Handle
1074    ///   that is initially unpopulated
1075    ///
1076    /// # Examples
1077    ///
1078    /// ```ignore
1079    /// let handle = runner.get_operation_handle("my-callback");
1080    /// // handle is unpopulated until run() executes and produces a matching operation
1081    /// ```
1082    pub fn get_operation_handle(&mut self, name: &str) -> OperationHandle {
1083        let handle = OperationHandle::new(
1084            OperationMatcher::ByName(name.to_string()),
1085            self.shared_operations.clone(),
1086        );
1087        self.registered_handles.push(handle.clone());
1088        handle
1089    }
1090
1091    /// Returns a lazy `OperationHandle` that populates with the operation
1092    /// at the given execution order index.
1093    ///
1094    /// # Arguments
1095    ///
1096    /// * `index` - The zero-based execution order index
1097    ///
1098    /// # Requirements
1099    ///
1100    /// - 1.7: WHEN `get_operation_handle_by_index(index)` is called before `run()`,
1101    ///   THE Local_Test_Runner SHALL return an Operation_Handle that populates with
1102    ///   the operation at that execution order index
1103    ///
1104    /// # Examples
1105    ///
1106    /// ```ignore
1107    /// let handle = runner.get_operation_handle_by_index(0);
1108    /// // handle populates with the first operation created during execution
1109    /// ```
1110    pub fn get_operation_handle_by_index(&mut self, index: usize) -> OperationHandle {
1111        let handle = OperationHandle::new(
1112            OperationMatcher::ByIndex(index),
1113            self.shared_operations.clone(),
1114        );
1115        self.registered_handles.push(handle.clone());
1116        handle
1117    }
1118
1119    /// Returns a lazy `OperationHandle` that populates with the operation
1120    /// matching the given unique ID.
1121    ///
1122    /// # Arguments
1123    ///
1124    /// * `id` - The unique operation ID to match against
1125    ///
1126    /// # Requirements
1127    ///
1128    /// - 1.8: WHEN `get_operation_handle_by_id(id)` is called before `run()`,
1129    ///   THE Local_Test_Runner SHALL return an Operation_Handle that populates with
1130    ///   the operation matching that unique ID
1131    ///
1132    /// # Examples
1133    ///
1134    /// ```ignore
1135    /// let handle = runner.get_operation_handle_by_id("op-abc-123");
1136    /// // handle populates with the operation whose ID matches during execution
1137    /// ```
1138    pub fn get_operation_handle_by_id(&mut self, id: &str) -> OperationHandle {
1139        let handle = OperationHandle::new(
1140            OperationMatcher::ById(id.to_string()),
1141            self.shared_operations.clone(),
1142        );
1143        self.registered_handles.push(handle.clone());
1144        handle
1145    }
1146
1147    /// Resets the test runner state for a fresh test run.
1148    ///
1149    /// This method clears all captured operations and resets the checkpoint server
1150    /// state, allowing the runner to be reused for multiple test scenarios.
1151    ///
1152    /// # Requirements
1153    ///
1154    /// - 1.6: WHEN reset() is called, THE Local_Test_Runner SHALL clear all
1155    ///   captured operations and reset checkpoint server state
1156    ///
1157    /// # Examples
1158    ///
1159    /// ```ignore
1160    /// use durable_execution_sdk_testing::LocalDurableTestRunner;
1161    ///
1162    /// let mut runner = LocalDurableTestRunner::new(my_workflow);
1163    ///
1164    /// // First test run
1165    /// let result1 = runner.run("input1".to_string()).await.unwrap();
1166    ///
1167    /// // Reset for fresh state
1168    /// runner.reset().await;
1169    ///
1170    /// // Second test run with clean state
1171    /// let result2 = runner.run("input2".to_string()).await.unwrap();
1172    /// ```
1173    pub async fn reset(&mut self) {
1174        // Clear operation storage
1175        self.operation_storage.write().await.clear();
1176
1177        // Reset checkpoint worker manager singleton for fresh state
1178        CheckpointWorkerManager::reset_instance_for_testing();
1179
1180        // Re-acquire the checkpoint worker manager
1181        self.checkpoint_worker = CheckpointWorkerManager::get_instance(None)
1182            .expect("Failed to create CheckpointWorkerManager after reset");
1183
1184        // Clear registered operation handles and shared operations
1185        self.registered_handles.clear();
1186        self.shared_operations.write().await.clear();
1187
1188        // Also clear mock client state for backward compatibility
1189        #[allow(deprecated)]
1190        self.mock_client.clear_all_calls();
1191    }
1192
1193    /// Gets an operation by its unique ID.
1194    ///
1195    /// # Arguments
1196    ///
1197    /// * `id` - The unique operation ID
1198    ///
1199    /// # Returns
1200    ///
1201    /// The operation if found, or `None` if no operation with that ID exists.
1202    ///
1203    /// # Requirements
1204    ///
1205    /// - 4.1: WHEN get_operation(name) is called, THE Test_Result SHALL return
1206    ///   the first operation with that name
1207    /// - 4.4: WHEN get_operation_by_id(id) is called, THE Test_Result SHALL return
1208    ///   the operation with that unique ID
1209    ///
1210    /// # Examples
1211    ///
1212    /// ```ignore
1213    /// use durable_execution_sdk_testing::LocalDurableTestRunner;
1214    ///
1215    /// let mut runner = LocalDurableTestRunner::new(my_workflow);
1216    /// let _ = runner.run("input".to_string()).await.unwrap();
1217    ///
1218    /// if let Some(op) = runner.get_operation_by_id("op-123").await {
1219    ///     println!("Found operation: {:?}", op);
1220    /// }
1221    /// ```
1222    pub async fn get_operation_by_id(&self, id: &str) -> Option<Operation> {
1223        self.operation_storage.read().await.get_by_id(id).cloned()
1224    }
1225
1226    /// Gets the first operation with the given name.
1227    ///
1228    /// # Arguments
1229    ///
1230    /// * `name` - The operation name to search for
1231    ///
1232    /// # Returns
1233    ///
1234    /// The first operation with that name, or `None` if no operation with that name exists.
1235    ///
1236    /// # Requirements
1237    ///
1238    /// - 4.1: WHEN get_operation(name) is called, THE Test_Result SHALL return
1239    ///   the first operation with that name
1240    ///
1241    /// # Examples
1242    ///
1243    /// ```ignore
1244    /// use durable_execution_sdk_testing::LocalDurableTestRunner;
1245    ///
1246    /// let mut runner = LocalDurableTestRunner::new(my_workflow);
1247    /// let _ = runner.run("input".to_string()).await.unwrap();
1248    ///
1249    /// if let Some(op) = runner.get_operation("process_data").await {
1250    ///     println!("Found operation: {:?}", op);
1251    /// }
1252    /// ```
1253    pub async fn get_operation(&self, name: &str) -> Option<Operation> {
1254        self.operation_storage
1255            .read()
1256            .await
1257            .get_by_name(name)
1258            .cloned()
1259    }
1260
1261    /// Gets an operation by its index in the execution order.
1262    ///
1263    /// # Arguments
1264    ///
1265    /// * `index` - The zero-based index of the operation
1266    ///
1267    /// # Returns
1268    ///
1269    /// The operation at that index, or `None` if the index is out of bounds.
1270    ///
1271    /// # Requirements
1272    ///
1273    /// - 4.2: WHEN get_operation_by_index(index) is called, THE Test_Result SHALL return
1274    ///   the operation at that execution order index
1275    ///
1276    /// # Examples
1277    ///
1278    /// ```ignore
1279    /// use durable_execution_sdk_testing::LocalDurableTestRunner;
1280    ///
1281    /// let mut runner = LocalDurableTestRunner::new(my_workflow);
1282    /// let _ = runner.run("input".to_string()).await.unwrap();
1283    ///
1284    /// // Get the first operation
1285    /// if let Some(op) = runner.get_operation_by_index(0).await {
1286    ///     println!("First operation: {:?}", op);
1287    /// }
1288    /// ```
1289    pub async fn get_operation_by_index(&self, index: usize) -> Option<Operation> {
1290        self.operation_storage
1291            .read()
1292            .await
1293            .get_by_index(index)
1294            .cloned()
1295    }
1296
1297    /// Gets an operation by name and occurrence index.
1298    ///
1299    /// This is useful when multiple operations have the same name and you need
1300    /// to access a specific occurrence.
1301    ///
1302    /// # Arguments
1303    ///
1304    /// * `name` - The operation name to search for
1305    /// * `index` - The zero-based index among operations with that name
1306    ///
1307    /// # Returns
1308    ///
1309    /// The operation at that name/index combination, or `None` if not found.
1310    ///
1311    /// # Requirements
1312    ///
1313    /// - 4.3: WHEN get_operation_by_name_and_index(name, index) is called,
1314    ///   THE Test_Result SHALL return the Nth operation with that name
1315    ///
1316    /// # Examples
1317    ///
1318    /// ```ignore
1319    /// use durable_execution_sdk_testing::LocalDurableTestRunner;
1320    ///
1321    /// let mut runner = LocalDurableTestRunner::new(my_workflow);
1322    /// let _ = runner.run("input".to_string()).await.unwrap();
1323    ///
1324    /// // Get the second "process" operation
1325    /// if let Some(op) = runner.get_operation_by_name_and_index("process", 1).await {
1326    ///     println!("Second process operation: {:?}", op);
1327    /// }
1328    /// ```
1329    pub async fn get_operation_by_name_and_index(
1330        &self,
1331        name: &str,
1332        index: usize,
1333    ) -> Option<Operation> {
1334        self.operation_storage
1335            .read()
1336            .await
1337            .get_by_name_and_index(name, index)
1338            .cloned()
1339    }
1340
1341    /// Gets all captured operations.
1342    ///
1343    /// # Returns
1344    ///
1345    /// A vector of all operations in execution order.
1346    ///
1347    /// # Requirements
1348    ///
1349    /// - 4.5: WHEN get_all_operations() is called, THE Test_Result SHALL return
1350    ///   all operations in execution order
1351    ///
1352    /// # Examples
1353    ///
1354    /// ```ignore
1355    /// use durable_execution_sdk_testing::LocalDurableTestRunner;
1356    ///
1357    /// let mut runner = LocalDurableTestRunner::new(my_workflow);
1358    /// let _ = runner.run("input".to_string()).await.unwrap();
1359    ///
1360    /// let all_ops = runner.get_all_operations().await;
1361    /// println!("Total operations: {}", all_ops.len());
1362    /// ```
1363    pub async fn get_all_operations(&self) -> Vec<Operation> {
1364        self.operation_storage.read().await.get_all().to_vec()
1365    }
1366
1367    /// Registers a durable function for invoke testing.
1368    ///
1369    /// Durable functions receive a `DurableContext` and can perform durable operations.
1370    /// When the main handler invokes a function by name, the registered function
1371    /// will be called.
1372    ///
1373    /// # Arguments
1374    ///
1375    /// * `name` - The name to register the function under
1376    /// * `func` - The durable function to register
1377    ///
1378    /// # Requirements
1379    ///
1380    /// - 7.1: WHEN register_durable_function(name, func) is called, THE Local_Test_Runner
1381    ///   SHALL store the function for invoke handling
1382    /// - 7.2: WHEN a registered durable function is invoked, THE Local_Test_Runner
1383    ///   SHALL execute it with a DurableContext
1384    ///
1385    /// # Examples
1386    ///
1387    /// ```ignore
1388    /// use durable_execution_sdk_testing::LocalDurableTestRunner;
1389    ///
1390    /// async fn helper_workflow(input: serde_json::Value, ctx: DurableContext) -> Result<serde_json::Value, DurableError> {
1391    ///     Ok(serde_json::json!({"processed": true}))
1392    /// }
1393    ///
1394    /// let mut runner = LocalDurableTestRunner::new(main_workflow);
1395    /// runner.register_durable_function("helper", helper_workflow).await;
1396    /// ```
1397    pub async fn register_durable_function<F, Fut>(&self, name: impl Into<String>, func: F)
1398    where
1399        F: Fn(serde_json::Value, DurableContext) -> Fut + Send + Sync + 'static,
1400        Fut: Future<Output = Result<serde_json::Value, DurableError>> + Send + 'static,
1401    {
1402        let boxed_func = Box::new(move |input: serde_json::Value, ctx: DurableContext| {
1403            let fut = func(input, ctx);
1404            Box::pin(fut)
1405                as Pin<Box<dyn Future<Output = Result<serde_json::Value, DurableError>> + Send>>
1406        });
1407
1408        self.registered_functions
1409            .write()
1410            .await
1411            .insert(name.into(), RegisteredFunction::Durable(boxed_func));
1412    }
1413
1414    /// Registers a regular (non-durable) function for invoke testing.
1415    ///
1416    /// Regular functions do not receive a `DurableContext` and cannot perform
1417    /// durable operations. They are useful for testing simple helper functions.
1418    ///
1419    /// # Arguments
1420    ///
1421    /// * `name` - The name to register the function under
1422    /// * `func` - The regular function to register
1423    ///
1424    /// # Requirements
1425    ///
1426    /// - 7.3: WHEN register_function(name, func) is called, THE Local_Test_Runner
1427    ///   SHALL store the function for invoke handling
1428    /// - 7.4: WHEN a registered regular function is invoked, THE Local_Test_Runner
1429    ///   SHALL execute it without a DurableContext
1430    ///
1431    /// # Examples
1432    ///
1433    /// ```ignore
1434    /// use durable_execution_sdk_testing::LocalDurableTestRunner;
1435    ///
1436    /// fn simple_helper(input: serde_json::Value) -> Result<serde_json::Value, DurableError> {
1437    ///     Ok(serde_json::json!({"result": "done"}))
1438    /// }
1439    ///
1440    /// let mut runner = LocalDurableTestRunner::new(main_workflow);
1441    /// runner.register_function("simple_helper", simple_helper).await;
1442    /// ```
1443    pub async fn register_function<F>(&self, name: impl Into<String>, func: F)
1444    where
1445        F: Fn(serde_json::Value) -> Result<serde_json::Value, DurableError> + Send + Sync + 'static,
1446    {
1447        self.registered_functions
1448            .write()
1449            .await
1450            .insert(name.into(), RegisteredFunction::Regular(Box::new(func)));
1451    }
1452
1453    /// Gets a registered function by name.
1454    ///
1455    /// # Arguments
1456    ///
1457    /// * `name` - The name of the function to retrieve
1458    ///
1459    /// # Returns
1460    ///
1461    /// `true` if a function with that name is registered, `false` otherwise.
1462    ///
1463    /// # Examples
1464    ///
1465    /// ```ignore
1466    /// use durable_execution_sdk_testing::LocalDurableTestRunner;
1467    ///
1468    /// let runner = LocalDurableTestRunner::new(main_workflow);
1469    /// runner.register_function("helper", |_| Ok(serde_json::json!({}))).await;
1470    ///
1471    /// assert!(runner.has_registered_function("helper").await);
1472    /// assert!(!runner.has_registered_function("nonexistent").await);
1473    /// ```
1474    pub async fn has_registered_function(&self, name: &str) -> bool {
1475        self.registered_functions.read().await.contains_key(name)
1476    }
1477
1478    /// Gets the count of registered functions.
1479    ///
1480    /// # Returns
1481    ///
1482    /// The number of registered functions.
1483    pub async fn registered_function_count(&self) -> usize {
1484        self.registered_functions.read().await.len()
1485    }
1486
1487    /// Clears all registered functions.
1488    ///
1489    /// # Examples
1490    ///
1491    /// ```ignore
1492    /// use durable_execution_sdk_testing::LocalDurableTestRunner;
1493    ///
1494    /// let mut runner = LocalDurableTestRunner::new(main_workflow);
1495    /// runner.register_function("helper", |_| Ok(serde_json::json!({}))).await;
1496    /// assert_eq!(runner.registered_function_count().await, 1);
1497    ///
1498    /// runner.clear_registered_functions().await;
1499    /// assert_eq!(runner.registered_function_count().await, 0);
1500    /// ```
1501    pub async fn clear_registered_functions(&mut self) {
1502        self.registered_functions.write().await.clear();
1503    }
1504}
1505
1506impl<I, O> std::fmt::Debug for LocalDurableTestRunner<I, O>
1507where
1508    I: DeserializeOwned + Send + 'static,
1509    O: Serialize + DeserializeOwned + Send + 'static,
1510{
1511    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1512        f.debug_struct("LocalDurableTestRunner")
1513            .field("checkpoint_worker", &"CheckpointWorkerManager")
1514            .finish()
1515    }
1516}
1517
1518#[cfg(test)]
1519mod tests {
1520    use super::*;
1521    use durable_execution_sdk::OperationType;
1522
1523    async fn simple_handler(input: String, _ctx: DurableContext) -> Result<String, DurableError> {
1524        Ok(format!("processed: {}", input))
1525    }
1526
1527    #[test]
1528    fn test_test_environment_config_default() {
1529        let config = TestEnvironmentConfig::default();
1530        assert!(config.skip_time);
1531        assert!(config.checkpoint_delay.is_none());
1532    }
1533
1534    #[test]
1535    fn test_operation_storage_new() {
1536        let storage = OperationStorage::new();
1537        assert!(storage.is_empty());
1538        assert_eq!(storage.len(), 0);
1539    }
1540
1541    #[test]
1542    fn test_operation_storage_add_and_get() {
1543        let mut storage = OperationStorage::new();
1544
1545        let mut op1 = Operation::new("op-1", OperationType::Step);
1546        op1.name = Some("step1".to_string());
1547        storage.add_operation(op1);
1548
1549        let mut op2 = Operation::new("op-2", OperationType::Wait);
1550        op2.name = Some("wait1".to_string());
1551        storage.add_operation(op2);
1552
1553        assert_eq!(storage.len(), 2);
1554
1555        // Get by ID
1556        let found = storage.get_by_id("op-1");
1557        assert!(found.is_some());
1558        assert_eq!(found.unwrap().operation_id, "op-1");
1559
1560        // Get by name
1561        let found = storage.get_by_name("step1");
1562        assert!(found.is_some());
1563        assert_eq!(found.unwrap().operation_id, "op-1");
1564
1565        // Get by index
1566        let found = storage.get_by_index(1);
1567        assert!(found.is_some());
1568        assert_eq!(found.unwrap().operation_id, "op-2");
1569
1570        // Get all
1571        let all = storage.get_all();
1572        assert_eq!(all.len(), 2);
1573    }
1574
1575    #[test]
1576    fn test_operation_storage_get_by_name_and_index() {
1577        let mut storage = OperationStorage::new();
1578
1579        // Add multiple operations with the same name
1580        let mut op1 = Operation::new("op-1", OperationType::Step);
1581        op1.name = Some("step".to_string());
1582        storage.add_operation(op1);
1583
1584        let mut op2 = Operation::new("op-2", OperationType::Step);
1585        op2.name = Some("step".to_string());
1586        storage.add_operation(op2);
1587
1588        let mut op3 = Operation::new("op-3", OperationType::Step);
1589        op3.name = Some("step".to_string());
1590        storage.add_operation(op3);
1591
1592        // Get first occurrence
1593        let found = storage.get_by_name_and_index("step", 0);
1594        assert!(found.is_some());
1595        assert_eq!(found.unwrap().operation_id, "op-1");
1596
1597        // Get second occurrence
1598        let found = storage.get_by_name_and_index("step", 1);
1599        assert!(found.is_some());
1600        assert_eq!(found.unwrap().operation_id, "op-2");
1601
1602        // Get third occurrence
1603        let found = storage.get_by_name_and_index("step", 2);
1604        assert!(found.is_some());
1605        assert_eq!(found.unwrap().operation_id, "op-3");
1606
1607        // Out of bounds
1608        let found = storage.get_by_name_and_index("step", 3);
1609        assert!(found.is_none());
1610    }
1611
1612    #[test]
1613    fn test_operation_storage_clear() {
1614        let mut storage = OperationStorage::new();
1615
1616        let op = Operation::new("op-1", OperationType::Step);
1617        storage.add_operation(op);
1618        assert_eq!(storage.len(), 1);
1619
1620        storage.clear();
1621        assert!(storage.is_empty());
1622        assert!(storage.get_by_id("op-1").is_none());
1623    }
1624
1625    #[tokio::test]
1626    async fn test_local_runner_creation() {
1627        let runner = LocalDurableTestRunner::new(simple_handler);
1628        assert_eq!(runner.operation_count().await, 0);
1629    }
1630
1631    #[tokio::test]
1632    #[allow(deprecated)]
1633    async fn test_local_runner_with_mock_client() {
1634        let mock_client = MockDurableServiceClient::new().with_checkpoint_responses(10);
1635        let runner = LocalDurableTestRunner::with_mock_client(simple_handler, mock_client);
1636        assert_eq!(runner.operation_count().await, 0);
1637    }
1638
1639    #[tokio::test]
1640    async fn test_setup_teardown_environment() {
1641        // Ensure clean state
1642        LocalDurableTestRunner::<String, String>::teardown_test_environment()
1643            .await
1644            .unwrap();
1645
1646        assert!(!LocalDurableTestRunner::<String, String>::is_environment_setup());
1647        assert!(!LocalDurableTestRunner::<String, String>::is_time_skipping_enabled());
1648
1649        // Setup with time skipping
1650        LocalDurableTestRunner::<String, String>::setup_test_environment(TestEnvironmentConfig {
1651            skip_time: true,
1652            checkpoint_delay: None,
1653        })
1654        .await
1655        .unwrap();
1656
1657        assert!(LocalDurableTestRunner::<String, String>::is_environment_setup());
1658        assert!(LocalDurableTestRunner::<String, String>::is_time_skipping_enabled());
1659
1660        // Teardown
1661        LocalDurableTestRunner::<String, String>::teardown_test_environment()
1662            .await
1663            .unwrap();
1664
1665        assert!(!LocalDurableTestRunner::<String, String>::is_environment_setup());
1666        assert!(!LocalDurableTestRunner::<String, String>::is_time_skipping_enabled());
1667    }
1668
1669    #[tokio::test]
1670    async fn test_setup_without_time_skipping() {
1671        // Ensure clean state
1672        LocalDurableTestRunner::<String, String>::teardown_test_environment()
1673            .await
1674            .unwrap();
1675
1676        // Setup without time skipping
1677        LocalDurableTestRunner::<String, String>::setup_test_environment(TestEnvironmentConfig {
1678            skip_time: false,
1679            checkpoint_delay: None,
1680        })
1681        .await
1682        .unwrap();
1683
1684        assert!(LocalDurableTestRunner::<String, String>::is_environment_setup());
1685        assert!(!LocalDurableTestRunner::<String, String>::is_time_skipping_enabled());
1686
1687        // Teardown
1688        LocalDurableTestRunner::<String, String>::teardown_test_environment()
1689            .await
1690            .unwrap();
1691    }
1692
1693    #[tokio::test]
1694    async fn test_double_setup_is_idempotent() {
1695        // Ensure clean state
1696        LocalDurableTestRunner::<String, String>::teardown_test_environment()
1697            .await
1698            .unwrap();
1699
1700        // First setup
1701        LocalDurableTestRunner::<String, String>::setup_test_environment(
1702            TestEnvironmentConfig::default(),
1703        )
1704        .await
1705        .unwrap();
1706
1707        // Second setup should be idempotent
1708        LocalDurableTestRunner::<String, String>::setup_test_environment(
1709            TestEnvironmentConfig::default(),
1710        )
1711        .await
1712        .unwrap();
1713
1714        assert!(LocalDurableTestRunner::<String, String>::is_environment_setup());
1715
1716        // Teardown
1717        LocalDurableTestRunner::<String, String>::teardown_test_environment()
1718            .await
1719            .unwrap();
1720    }
1721
1722    #[tokio::test]
1723    async fn test_double_teardown_is_idempotent() {
1724        // Ensure clean state
1725        LocalDurableTestRunner::<String, String>::teardown_test_environment()
1726            .await
1727            .unwrap();
1728
1729        // Setup
1730        LocalDurableTestRunner::<String, String>::setup_test_environment(
1731            TestEnvironmentConfig::default(),
1732        )
1733        .await
1734        .unwrap();
1735
1736        // First teardown
1737        LocalDurableTestRunner::<String, String>::teardown_test_environment()
1738            .await
1739            .unwrap();
1740
1741        // Second teardown should be idempotent
1742        LocalDurableTestRunner::<String, String>::teardown_test_environment()
1743            .await
1744            .unwrap();
1745
1746        assert!(!LocalDurableTestRunner::<String, String>::is_environment_setup());
1747    }
1748
1749    // Tests for run() method - Subtask 8.3
1750
1751    #[tokio::test]
1752    async fn test_run_successful_execution() {
1753        // Ensure clean state
1754        LocalDurableTestRunner::<String, String>::teardown_test_environment()
1755            .await
1756            .unwrap();
1757
1758        let mut runner = LocalDurableTestRunner::new(simple_handler);
1759        let result = runner.run("hello".to_string()).await.unwrap();
1760
1761        // Verify successful execution status
1762        assert_eq!(result.get_status(), ExecutionStatus::Succeeded);
1763
1764        // Verify result value
1765        let output = result.get_result().unwrap();
1766        assert_eq!(output, "processed: hello");
1767
1768        // Verify invocation was recorded
1769        assert_eq!(result.get_invocations().len(), 1);
1770    }
1771
1772    #[tokio::test]
1773    async fn test_run_failed_execution() {
1774        async fn failing_handler(
1775            _input: String,
1776            _ctx: DurableContext,
1777        ) -> Result<String, DurableError> {
1778            Err(DurableError::execution("Test failure"))
1779        }
1780
1781        // Ensure clean state
1782        LocalDurableTestRunner::<String, String>::teardown_test_environment()
1783            .await
1784            .unwrap();
1785
1786        let mut runner = LocalDurableTestRunner::new(failing_handler);
1787        let result = runner.run("hello".to_string()).await.unwrap();
1788
1789        // Verify failed execution status
1790        assert_eq!(result.get_status(), ExecutionStatus::Failed);
1791
1792        // Verify error is captured
1793        let error = result.get_error().unwrap();
1794        assert!(error
1795            .error_message
1796            .as_ref()
1797            .unwrap()
1798            .contains("Test failure"));
1799
1800        // Verify invocation was recorded with error
1801        let invocations = result.get_invocations();
1802        assert_eq!(invocations.len(), 1);
1803        assert!(invocations[0].error.is_some());
1804    }
1805
1806    #[tokio::test]
1807    async fn test_run_multiple_times_clears_previous_operations() {
1808        // Ensure clean state
1809        LocalDurableTestRunner::<String, String>::teardown_test_environment()
1810            .await
1811            .unwrap();
1812
1813        let mut runner = LocalDurableTestRunner::new(simple_handler);
1814
1815        // First run
1816        let result1 = runner.run("first".to_string()).await.unwrap();
1817        assert_eq!(result1.get_status(), ExecutionStatus::Succeeded);
1818        assert_eq!(result1.get_result().unwrap(), "processed: first");
1819
1820        // Second run - should have fresh state
1821        let result2 = runner.run("second".to_string()).await.unwrap();
1822        assert_eq!(result2.get_status(), ExecutionStatus::Succeeded);
1823        assert_eq!(result2.get_result().unwrap(), "processed: second");
1824    }
1825
1826    // Tests for reset() method - Subtask 8.4
1827
1828    #[tokio::test]
1829    async fn test_reset_clears_operation_storage() {
1830        // Ensure clean state
1831        LocalDurableTestRunner::<String, String>::teardown_test_environment()
1832            .await
1833            .unwrap();
1834
1835        // Reset checkpoint worker manager for clean state
1836        CheckpointWorkerManager::reset_instance_for_testing();
1837
1838        let mut runner = LocalDurableTestRunner::new(simple_handler);
1839
1840        // Run to generate some state
1841        let _ = runner.run("hello".to_string()).await.unwrap();
1842
1843        // Reset the runner
1844        runner.reset().await;
1845
1846        // Verify operation storage is cleared
1847        assert_eq!(runner.operation_count().await, 0);
1848    }
1849
1850    #[tokio::test]
1851    async fn test_reset_allows_fresh_run() {
1852        // Ensure clean state
1853        LocalDurableTestRunner::<String, String>::teardown_test_environment()
1854            .await
1855            .unwrap();
1856
1857        let mut runner = LocalDurableTestRunner::new(simple_handler);
1858
1859        // First run
1860        let result1 = runner.run("first".to_string()).await.unwrap();
1861        assert_eq!(result1.get_result().unwrap(), "processed: first");
1862
1863        // Reset
1864        runner.reset().await;
1865
1866        // Second run after reset
1867        let result2 = runner.run("second".to_string()).await.unwrap();
1868        assert_eq!(result2.get_result().unwrap(), "processed: second");
1869        assert_eq!(result2.get_status(), ExecutionStatus::Succeeded);
1870    }
1871
1872    // Tests for operation lookup methods - Subtask 8.5
1873
1874    #[tokio::test]
1875    async fn test_get_operation_by_id() {
1876        let runner: LocalDurableTestRunner<String, String> =
1877            LocalDurableTestRunner::new(simple_handler);
1878
1879        // Manually add an operation to storage for testing
1880        {
1881            let mut storage = runner.operation_storage.write().await;
1882            let mut op = Operation::new("test-op-id", OperationType::Step);
1883            op.name = Some("test_step".to_string());
1884            storage.add_operation(op);
1885        }
1886
1887        // Test get_operation_by_id
1888        let found = runner.get_operation_by_id("test-op-id").await;
1889        assert!(found.is_some());
1890        assert_eq!(found.unwrap().operation_id, "test-op-id");
1891
1892        // Test not found
1893        let not_found = runner.get_operation_by_id("nonexistent").await;
1894        assert!(not_found.is_none());
1895    }
1896
1897    #[tokio::test]
1898    async fn test_get_operation_by_name() {
1899        let runner: LocalDurableTestRunner<String, String> =
1900            LocalDurableTestRunner::new(simple_handler);
1901
1902        // Manually add operations to storage for testing
1903        {
1904            let mut storage = runner.operation_storage.write().await;
1905            let mut op1 = Operation::new("op-1", OperationType::Step);
1906            op1.name = Some("process".to_string());
1907            storage.add_operation(op1);
1908
1909            let mut op2 = Operation::new("op-2", OperationType::Step);
1910            op2.name = Some("validate".to_string());
1911            storage.add_operation(op2);
1912        }
1913
1914        // Test get_operation (by name)
1915        let found = runner.get_operation("process").await;
1916        assert!(found.is_some());
1917        assert_eq!(found.unwrap().operation_id, "op-1");
1918
1919        // Test not found
1920        let not_found = runner.get_operation("nonexistent").await;
1921        assert!(not_found.is_none());
1922    }
1923
1924    #[tokio::test]
1925    async fn test_get_operation_by_index() {
1926        let runner: LocalDurableTestRunner<String, String> =
1927            LocalDurableTestRunner::new(simple_handler);
1928
1929        // Manually add operations to storage for testing
1930        {
1931            let mut storage = runner.operation_storage.write().await;
1932            storage.add_operation(Operation::new("op-0", OperationType::Step));
1933            storage.add_operation(Operation::new("op-1", OperationType::Wait));
1934            storage.add_operation(Operation::new("op-2", OperationType::Callback));
1935        }
1936
1937        // Test get_operation_by_index
1938        let op0 = runner.get_operation_by_index(0).await;
1939        assert!(op0.is_some());
1940        assert_eq!(op0.unwrap().operation_id, "op-0");
1941
1942        let op1 = runner.get_operation_by_index(1).await;
1943        assert!(op1.is_some());
1944        assert_eq!(op1.unwrap().operation_id, "op-1");
1945
1946        let op2 = runner.get_operation_by_index(2).await;
1947        assert!(op2.is_some());
1948        assert_eq!(op2.unwrap().operation_id, "op-2");
1949
1950        // Test out of bounds
1951        let out_of_bounds = runner.get_operation_by_index(3).await;
1952        assert!(out_of_bounds.is_none());
1953    }
1954
1955    #[tokio::test]
1956    async fn test_get_operation_by_name_and_index() {
1957        let runner: LocalDurableTestRunner<String, String> =
1958            LocalDurableTestRunner::new(simple_handler);
1959
1960        // Manually add operations with same name to storage for testing
1961        {
1962            let mut storage = runner.operation_storage.write().await;
1963            let mut op1 = Operation::new("op-1", OperationType::Step);
1964            op1.name = Some("process".to_string());
1965            storage.add_operation(op1);
1966
1967            let mut op2 = Operation::new("op-2", OperationType::Step);
1968            op2.name = Some("process".to_string());
1969            storage.add_operation(op2);
1970
1971            let mut op3 = Operation::new("op-3", OperationType::Step);
1972            op3.name = Some("process".to_string());
1973            storage.add_operation(op3);
1974        }
1975
1976        // Test get_operation_by_name_and_index
1977        let first = runner.get_operation_by_name_and_index("process", 0).await;
1978        assert!(first.is_some());
1979        assert_eq!(first.unwrap().operation_id, "op-1");
1980
1981        let second = runner.get_operation_by_name_and_index("process", 1).await;
1982        assert!(second.is_some());
1983        assert_eq!(second.unwrap().operation_id, "op-2");
1984
1985        let third = runner.get_operation_by_name_and_index("process", 2).await;
1986        assert!(third.is_some());
1987        assert_eq!(third.unwrap().operation_id, "op-3");
1988
1989        // Test out of bounds
1990        let out_of_bounds = runner.get_operation_by_name_and_index("process", 3).await;
1991        assert!(out_of_bounds.is_none());
1992    }
1993
1994    #[tokio::test]
1995    async fn test_get_all_operations() {
1996        let runner: LocalDurableTestRunner<String, String> =
1997            LocalDurableTestRunner::new(simple_handler);
1998
1999        // Manually add operations to storage for testing
2000        {
2001            let mut storage = runner.operation_storage.write().await;
2002            storage.add_operation(Operation::new("op-0", OperationType::Step));
2003            storage.add_operation(Operation::new("op-1", OperationType::Wait));
2004            storage.add_operation(Operation::new("op-2", OperationType::Callback));
2005        }
2006
2007        // Test get_all_operations
2008        let all_ops = runner.get_all_operations().await;
2009        assert_eq!(all_ops.len(), 3);
2010        assert_eq!(all_ops[0].operation_id, "op-0");
2011        assert_eq!(all_ops[1].operation_id, "op-1");
2012        assert_eq!(all_ops[2].operation_id, "op-2");
2013    }
2014
2015    // Tests for function registration - Subtask 8.6
2016
2017    #[tokio::test]
2018    async fn test_register_durable_function() {
2019        async fn helper_func(
2020            _input: serde_json::Value,
2021            _ctx: DurableContext,
2022        ) -> Result<serde_json::Value, DurableError> {
2023            Ok(serde_json::json!({"result": "ok"}))
2024        }
2025
2026        let runner: LocalDurableTestRunner<String, String> =
2027            LocalDurableTestRunner::new(simple_handler);
2028
2029        // Register a durable function
2030        runner
2031            .register_durable_function("helper", helper_func)
2032            .await;
2033
2034        // Verify it's registered
2035        assert!(runner.has_registered_function("helper").await);
2036        assert_eq!(runner.registered_function_count().await, 1);
2037    }
2038
2039    #[tokio::test]
2040    async fn test_register_regular_function() {
2041        fn simple_func(_input: serde_json::Value) -> Result<serde_json::Value, DurableError> {
2042            Ok(serde_json::json!({"result": "ok"}))
2043        }
2044
2045        let runner: LocalDurableTestRunner<String, String> =
2046            LocalDurableTestRunner::new(simple_handler);
2047
2048        // Register a regular function
2049        runner.register_function("simple", simple_func).await;
2050
2051        // Verify it's registered
2052        assert!(runner.has_registered_function("simple").await);
2053        assert_eq!(runner.registered_function_count().await, 1);
2054    }
2055
2056    #[tokio::test]
2057    async fn test_register_multiple_functions() {
2058        async fn durable_func(
2059            _input: serde_json::Value,
2060            _ctx: DurableContext,
2061        ) -> Result<serde_json::Value, DurableError> {
2062            Ok(serde_json::json!({}))
2063        }
2064
2065        fn regular_func(_input: serde_json::Value) -> Result<serde_json::Value, DurableError> {
2066            Ok(serde_json::json!({}))
2067        }
2068
2069        let runner: LocalDurableTestRunner<String, String> =
2070            LocalDurableTestRunner::new(simple_handler);
2071
2072        // Register multiple functions
2073        runner
2074            .register_durable_function("durable1", durable_func)
2075            .await;
2076        runner.register_function("regular1", regular_func).await;
2077        runner
2078            .register_durable_function("durable2", durable_func)
2079            .await;
2080
2081        // Verify all are registered
2082        assert!(runner.has_registered_function("durable1").await);
2083        assert!(runner.has_registered_function("regular1").await);
2084        assert!(runner.has_registered_function("durable2").await);
2085        assert!(!runner.has_registered_function("nonexistent").await);
2086        assert_eq!(runner.registered_function_count().await, 3);
2087    }
2088
2089    #[tokio::test]
2090    async fn test_clear_registered_functions() {
2091        fn simple_func(_input: serde_json::Value) -> Result<serde_json::Value, DurableError> {
2092            Ok(serde_json::json!({}))
2093        }
2094
2095        let mut runner: LocalDurableTestRunner<String, String> =
2096            LocalDurableTestRunner::new(simple_handler);
2097
2098        // Register functions
2099        runner.register_function("func1", simple_func).await;
2100        runner.register_function("func2", simple_func).await;
2101        assert_eq!(runner.registered_function_count().await, 2);
2102
2103        // Clear all registered functions
2104        runner.clear_registered_functions().await;
2105        assert_eq!(runner.registered_function_count().await, 0);
2106        assert!(!runner.has_registered_function("func1").await);
2107        assert!(!runner.has_registered_function("func2").await);
2108    }
2109
2110    #[tokio::test]
2111    async fn test_register_function_overwrites_existing() {
2112        fn func_v1(_input: serde_json::Value) -> Result<serde_json::Value, DurableError> {
2113            Ok(serde_json::json!({"version": 1}))
2114        }
2115
2116        fn func_v2(_input: serde_json::Value) -> Result<serde_json::Value, DurableError> {
2117            Ok(serde_json::json!({"version": 2}))
2118        }
2119
2120        let runner: LocalDurableTestRunner<String, String> =
2121            LocalDurableTestRunner::new(simple_handler);
2122
2123        // Register first version
2124        runner.register_function("func", func_v1).await;
2125        assert_eq!(runner.registered_function_count().await, 1);
2126
2127        // Register second version with same name
2128        runner.register_function("func", func_v2).await;
2129
2130        // Should still have only one function (overwritten)
2131        assert_eq!(runner.registered_function_count().await, 1);
2132        assert!(runner.has_registered_function("func").await);
2133    }
2134
2135    // Tests for run_with_orchestrator() method - Subtask 8.1.2
2136
2137    #[tokio::test]
2138    async fn test_run_with_orchestrator_successful_execution() {
2139        // Ensure clean state
2140        LocalDurableTestRunner::<String, String>::teardown_test_environment()
2141            .await
2142            .unwrap();
2143
2144        let mut runner = LocalDurableTestRunner::new(simple_handler);
2145        let result = runner
2146            .run_with_orchestrator("hello".to_string())
2147            .await
2148            .unwrap();
2149
2150        // Verify successful execution status
2151        assert_eq!(result.get_status(), ExecutionStatus::Succeeded);
2152
2153        // Verify result value
2154        let output = result.get_result().unwrap();
2155        assert_eq!(output, "processed: hello");
2156
2157        // Verify invocation was recorded
2158        assert!(!result.get_invocations().is_empty());
2159    }
2160
2161    #[tokio::test]
2162    async fn test_run_with_orchestrator_failed_execution() {
2163        async fn failing_handler(
2164            _input: String,
2165            _ctx: DurableContext,
2166        ) -> Result<String, DurableError> {
2167            Err(DurableError::execution("Test failure"))
2168        }
2169
2170        // Ensure clean state
2171        LocalDurableTestRunner::<String, String>::teardown_test_environment()
2172            .await
2173            .unwrap();
2174
2175        let mut runner = LocalDurableTestRunner::new(failing_handler);
2176        let result = runner
2177            .run_with_orchestrator("hello".to_string())
2178            .await
2179            .unwrap();
2180
2181        // Verify failed execution status
2182        assert_eq!(result.get_status(), ExecutionStatus::Failed);
2183
2184        // Verify error is captured
2185        let error = result.get_error().unwrap();
2186        assert!(error
2187            .error_message
2188            .as_ref()
2189            .unwrap()
2190            .contains("Test failure"));
2191    }
2192
2193    #[tokio::test]
2194    async fn test_run_with_orchestrator_with_time_skipping() {
2195        // Ensure clean state
2196        LocalDurableTestRunner::<String, String>::teardown_test_environment()
2197            .await
2198            .unwrap();
2199
2200        // Setup with time skipping enabled
2201        LocalDurableTestRunner::<String, String>::setup_test_environment(TestEnvironmentConfig {
2202            skip_time: true,
2203            checkpoint_delay: None,
2204        })
2205        .await
2206        .unwrap();
2207
2208        let mut runner = LocalDurableTestRunner::new(simple_handler);
2209        let result = runner
2210            .run_with_orchestrator("hello".to_string())
2211            .await
2212            .unwrap();
2213
2214        // Verify successful execution status
2215        assert_eq!(result.get_status(), ExecutionStatus::Succeeded);
2216
2217        // Teardown
2218        LocalDurableTestRunner::<String, String>::teardown_test_environment()
2219            .await
2220            .unwrap();
2221    }
2222
2223    #[tokio::test]
2224    async fn test_run_with_orchestrator_populates_nodejs_history_events() {
2225        use crate::checkpoint_server::NodeJsEventType;
2226
2227        // Ensure clean state
2228        LocalDurableTestRunner::<String, String>::teardown_test_environment()
2229            .await
2230            .unwrap();
2231
2232        let mut runner = LocalDurableTestRunner::new(simple_handler);
2233        let result = runner
2234            .run_with_orchestrator("hello".to_string())
2235            .await
2236            .unwrap();
2237
2238        // Verify successful execution status
2239        assert_eq!(result.get_status(), ExecutionStatus::Succeeded);
2240
2241        // Verify Node.js history events are populated
2242        let nodejs_events = result.get_nodejs_history_events();
2243        assert!(
2244            !nodejs_events.is_empty(),
2245            "Node.js history events should be populated"
2246        );
2247
2248        // Verify the first event is ExecutionStarted
2249        assert_eq!(
2250            nodejs_events[0].event_type,
2251            NodeJsEventType::ExecutionStarted,
2252            "First event should be ExecutionStarted"
2253        );
2254
2255        // Verify event IDs are sequential starting from 1
2256        for (i, event) in nodejs_events.iter().enumerate() {
2257            assert_eq!(
2258                event.event_id,
2259                (i + 1) as u64,
2260                "Event IDs should be sequential starting from 1"
2261            );
2262        }
2263
2264        // Verify timestamps are in ISO 8601 format
2265        for event in nodejs_events {
2266            assert!(
2267                event.event_timestamp.contains('T') && event.event_timestamp.contains('Z'),
2268                "Timestamps should be in ISO 8601 format"
2269            );
2270        }
2271    }
2272}
2273
2274/// Property-based tests for LocalDurableTestRunner
2275///
2276/// These tests verify the correctness properties defined in the design document.
2277#[cfg(test)]
2278mod property_tests {
2279    use super::*;
2280    use durable_execution_sdk::OperationType;
2281    use proptest::prelude::*;
2282
2283    /// Strategy for generating non-empty strings (for inputs)
2284    fn non_empty_string_strategy() -> impl Strategy<Value = String> {
2285        "[a-zA-Z0-9_ ]{1,32}".prop_map(|s| s)
2286    }
2287
2288    /// Strategy for generating operation names
2289    fn operation_name_strategy() -> impl Strategy<Value = String> {
2290        "[a-zA-Z_][a-zA-Z0-9_]{0,15}".prop_map(|s| s)
2291    }
2292
2293    /// Strategy for generating function names
2294    fn function_name_strategy() -> impl Strategy<Value = String> {
2295        "[a-zA-Z_][a-zA-Z0-9_]{0,15}".prop_map(|s| s)
2296    }
2297
2298    proptest! {
2299        /// **Feature: rust-testing-utilities, Property 1: Execution Status Consistency**
2300        ///
2301        /// *For any* handler execution, if the handler returns `Ok(value)`, the TestResult
2302        /// status SHALL be `Succeeded`, and if the handler returns `Err(error)`, the TestResult
2303        /// status SHALL be `Failed`.
2304        ///
2305        /// **Validates: Requirements 1.3, 1.4**
2306        #[test]
2307        fn prop_execution_status_consistency_success(input in non_empty_string_strategy()) {
2308            // Use current_thread runtime which is required for tokio::time::pause()
2309            let rt = tokio::runtime::Builder::new_current_thread()
2310                .enable_all()
2311                .build()
2312                .unwrap();
2313            rt.block_on(async {
2314                // Ensure clean state
2315                LocalDurableTestRunner::<String, String>::teardown_test_environment()
2316                    .await
2317                    .unwrap();
2318
2319                // Handler that always succeeds
2320                async fn success_handler(
2321                    input: String,
2322                    _ctx: DurableContext,
2323                ) -> Result<String, DurableError> {
2324                    Ok(format!("success: {}", input))
2325                }
2326
2327                let mut runner = LocalDurableTestRunner::new(success_handler);
2328                let result = runner.run(input.clone()).await.unwrap();
2329
2330                // Property: successful handler -> Succeeded status
2331                prop_assert_eq!(result.get_status(), ExecutionStatus::Succeeded);
2332                prop_assert!(result.get_result().is_ok());
2333                let expected = format!("success: {}", input);
2334                prop_assert_eq!(result.get_result().unwrap(), &expected);
2335
2336                Ok(())
2337            })?;
2338        }
2339
2340        /// **Feature: rust-testing-utilities, Property 1: Execution Status Consistency (Failure)**
2341        ///
2342        /// *For any* handler execution that returns an error, the TestResult status SHALL be `Failed`.
2343        ///
2344        /// **Validates: Requirements 1.3, 1.4**
2345        #[test]
2346        fn prop_execution_status_consistency_failure(error_msg in non_empty_string_strategy()) {
2347            // Use current_thread runtime which is required for tokio::time::pause()
2348            let rt = tokio::runtime::Builder::new_current_thread()
2349                .enable_all()
2350                .build()
2351                .unwrap();
2352            rt.block_on(async {
2353                // Ensure clean state
2354                LocalDurableTestRunner::<String, String>::teardown_test_environment()
2355                    .await
2356                    .unwrap();
2357
2358                // Create a handler that fails with the given error message
2359                let error_msg_clone = error_msg.clone();
2360                let failing_handler = move |_input: String, _ctx: DurableContext| {
2361                    let msg = error_msg_clone.clone();
2362                    async move { Err::<String, DurableError>(DurableError::execution(msg)) }
2363                };
2364
2365                let mut runner = LocalDurableTestRunner::new(failing_handler);
2366                let result = runner.run("test".to_string()).await.unwrap();
2367
2368                // Property: failed handler -> Failed status
2369                prop_assert_eq!(result.get_status(), ExecutionStatus::Failed);
2370                prop_assert!(result.get_error().is_ok());
2371
2372                Ok(())
2373            })?;
2374        }
2375
2376        /// **Feature: rust-testing-utilities, Property 3: Reset Clears State**
2377        ///
2378        /// *For any* LocalDurableTestRunner state, after calling `reset()`, the operations
2379        /// list SHALL be empty and the runner SHALL be ready for a new execution.
2380        ///
2381        /// **Validates: Requirements 1.6**
2382        #[test]
2383        fn prop_reset_clears_state(
2384            input1 in non_empty_string_strategy(),
2385            input2 in non_empty_string_strategy()
2386        ) {
2387            // Use current_thread runtime which is required for tokio::time::pause()
2388            let rt = tokio::runtime::Builder::new_current_thread()
2389                .enable_all()
2390                .build()
2391                .unwrap();
2392            rt.block_on(async {
2393                // Ensure clean state
2394                LocalDurableTestRunner::<String, String>::teardown_test_environment()
2395                    .await
2396                    .unwrap();
2397
2398                // Reset checkpoint worker manager for clean state
2399                CheckpointWorkerManager::reset_instance_for_testing();
2400
2401                async fn simple_handler(
2402                    input: String,
2403                    _ctx: DurableContext,
2404                ) -> Result<String, DurableError> {
2405                    Ok(format!("processed: {}", input))
2406                }
2407
2408                let mut runner = LocalDurableTestRunner::new(simple_handler);
2409
2410                // First run
2411                let _ = runner.run(input1).await.unwrap();
2412
2413                // Reset
2414                runner.reset().await;
2415
2416                // Property: after reset, operation count is 0
2417                prop_assert_eq!(runner.operation_count().await, 0);
2418
2419                // Property: runner is ready for new execution
2420                let result2 = runner.run(input2.clone()).await.unwrap();
2421                prop_assert_eq!(result2.get_status(), ExecutionStatus::Succeeded);
2422                let expected = format!("processed: {}", input2);
2423                prop_assert_eq!(result2.get_result().unwrap(), &expected);
2424
2425                Ok(())
2426            })?;
2427        }
2428
2429        /// **Feature: rust-testing-utilities, Property 7: Operation Lookup Consistency**
2430        ///
2431        /// *For any* operation with name N at index I in the operations list, `get_operation(N)`
2432        /// SHALL return the first operation with name N, and `get_operation_by_index(I)` SHALL
2433        /// return the same operation as direct index access.
2434        ///
2435        /// **Validates: Requirements 4.1, 4.2, 4.3, 4.4**
2436        #[test]
2437        fn prop_operation_lookup_consistency(
2438            names in prop::collection::vec(operation_name_strategy(), 1..=5)
2439        ) {
2440            let rt = tokio::runtime::Runtime::new().unwrap();
2441            rt.block_on(async {
2442                let runner: LocalDurableTestRunner<String, String> =
2443                    LocalDurableTestRunner::new(|input: String, _ctx: DurableContext| async move {
2444                        Ok(input)
2445                    });
2446
2447                // Add operations with the given names
2448                {
2449                    let mut storage = runner.operation_storage.write().await;
2450                    for (i, name) in names.iter().enumerate() {
2451                        let mut op = Operation::new(&format!("op-{}", i), OperationType::Step);
2452                        op.name = Some(name.clone());
2453                        storage.add_operation(op);
2454                    }
2455                }
2456
2457                // Property: get_operation_by_index returns correct operation
2458                for (i, _name) in names.iter().enumerate() {
2459                    let op = runner.get_operation_by_index(i).await;
2460                    prop_assert!(op.is_some());
2461                    let expected_id = format!("op-{}", i);
2462                    prop_assert_eq!(&op.unwrap().operation_id, &expected_id);
2463                }
2464
2465                // Property: get_operation returns first operation with that name
2466                for name in &names {
2467                    let op = runner.get_operation(name).await;
2468                    prop_assert!(op.is_some());
2469                    prop_assert_eq!(op.as_ref().unwrap().name.as_ref().unwrap(), name);
2470                }
2471
2472                // Property: get_operation_by_id returns correct operation
2473                for i in 0..names.len() {
2474                    let op = runner.get_operation_by_id(&format!("op-{}", i)).await;
2475                    prop_assert!(op.is_some());
2476                    let expected_id = format!("op-{}", i);
2477                    prop_assert_eq!(&op.unwrap().operation_id, &expected_id);
2478                }
2479
2480                // Property: get_all_operations returns all operations in order
2481                let all_ops = runner.get_all_operations().await;
2482                prop_assert_eq!(all_ops.len(), names.len());
2483                for (i, op) in all_ops.iter().enumerate() {
2484                    let expected_id = format!("op-{}", i);
2485                    prop_assert_eq!(&op.operation_id, &expected_id);
2486                }
2487
2488                Ok(())
2489            })?;
2490        }
2491
2492        /// **Feature: rust-testing-utilities, Property 10: Function Registration Retrieval**
2493        ///
2494        /// *For any* function registered with name N, the function SHALL be retrievable
2495        /// by that name.
2496        ///
2497        /// **Validates: Requirements 7.1, 7.2, 7.3**
2498        #[test]
2499        fn prop_function_registration_retrieval(
2500            func_names in prop::collection::vec(function_name_strategy(), 1..=5)
2501        ) {
2502            let rt = tokio::runtime::Runtime::new().unwrap();
2503            rt.block_on(async {
2504                let runner: LocalDurableTestRunner<String, String> =
2505                    LocalDurableTestRunner::new(|input: String, _ctx: DurableContext| async move {
2506                        Ok(input)
2507                    });
2508
2509                // Register functions with the given names
2510                for name in &func_names {
2511                    runner.register_function(name.clone(), |_input: serde_json::Value| {
2512                        Ok(serde_json::json!({}))
2513                    }).await;
2514                }
2515
2516                // Property: all registered functions are retrievable
2517                for name in &func_names {
2518                    prop_assert!(
2519                        runner.has_registered_function(name).await,
2520                        "Function '{}' should be registered",
2521                        name
2522                    );
2523                }
2524
2525                // Property: count matches number of unique names
2526                let unique_names: std::collections::HashSet<_> = func_names.iter().collect();
2527                prop_assert_eq!(
2528                    runner.registered_function_count().await,
2529                    unique_names.len()
2530                );
2531
2532                // Property: non-registered functions are not found
2533                prop_assert!(!runner.has_registered_function("__nonexistent__").await);
2534
2535                Ok(())
2536            })?;
2537        }
2538
2539        /// **Feature: rust-testing-utilities, Property 2: Operation Capture Completeness**
2540        ///
2541        /// *For any* sequence of durable operations performed by a handler, all operations
2542        /// SHALL be captured in the TestResult's operations list in execution order.
2543        ///
2544        /// **Validates: Requirements 1.5, 3.5**
2545        #[test]
2546        fn prop_operation_capture_completeness(
2547            num_steps in 1usize..=5,
2548            step_values in prop::collection::vec(1i32..100, 1..=5)
2549        ) {
2550            // Use current_thread runtime which is required for tokio::time::pause()
2551            let rt = tokio::runtime::Builder::new_current_thread()
2552                .enable_all()
2553                .build()
2554                .unwrap();
2555            rt.block_on(async {
2556                // Ensure clean state
2557                LocalDurableTestRunner::<String, String>::teardown_test_environment()
2558                    .await
2559                    .unwrap();
2560
2561                // Create a handler that performs multiple step operations
2562                let num_steps_to_perform = num_steps.min(step_values.len());
2563                let values = step_values.clone();
2564
2565                let multi_step_handler = move |_input: String, ctx: DurableContext| {
2566                    let values = values.clone();
2567                    let num = num_steps_to_perform;
2568                    async move {
2569                        let mut results = Vec::new();
2570                        for i in 0..num {
2571                            let value = values.get(i).copied().unwrap_or(0);
2572                            let step_name = format!("step_{}", i);
2573                            // Use step_named to provide a name for each step
2574                            let result = ctx.step_named(
2575                                &step_name,
2576                                |_| Ok(value * 2),
2577                                None
2578                            ).await?;
2579                            results.push(result);
2580                        }
2581                        Ok::<String, DurableError>(format!("completed {} steps", results.len()))
2582                    }
2583                };
2584
2585                let mut runner = LocalDurableTestRunner::new(multi_step_handler);
2586                let result = runner.run("test".to_string()).await.unwrap();
2587
2588                // Property: All operations should be captured
2589                // Each step creates at least one checkpoint call (START and SUCCEED)
2590                // The operations list should contain entries for each step
2591                let operations = result.get_operations();
2592
2593                // We should have captured operations for each step
2594                // Note: The exact count depends on how operations are recorded
2595                // (START + SUCCEED = 2 per step, or just final state = 1 per step)
2596                // The key property is that we have operations for all steps performed
2597                prop_assert!(
2598                    !operations.is_empty() || num_steps_to_perform == 0,
2599                    "Operations should be captured when steps are performed"
2600                );
2601
2602                // Verify the execution completed successfully
2603                prop_assert_eq!(result.get_status(), ExecutionStatus::Succeeded);
2604
2605                Ok(())
2606            })?;
2607        }
2608
2609        /// **Feature: rust-testing-utilities, Property 4: Time Skipping Acceleration**
2610        ///
2611        /// *For any* wait operation with duration D seconds when time skipping is enabled,
2612        /// the actual wall-clock time to complete the wait SHALL be less than D seconds.
2613        ///
2614        /// Note: This test verifies that time skipping is properly configured and that
2615        /// wait operations don't block for their full duration. Due to the nature of
2616        /// durable execution (waits suspend and resume), we verify that the test
2617        /// infrastructure properly handles time advancement.
2618        ///
2619        /// **Validates: Requirements 2.1, 2.2**
2620        #[test]
2621        fn prop_time_skipping_acceleration(
2622            wait_seconds in 5u64..=60
2623        ) {
2624            // Use current_thread runtime which is required for tokio::time::pause()
2625            let rt = tokio::runtime::Builder::new_current_thread()
2626                .enable_all()
2627                .build()
2628                .unwrap();
2629
2630            rt.block_on(async {
2631                // Ensure clean state - teardown first in case previous test left state
2632                LocalDurableTestRunner::<String, String>::teardown_test_environment()
2633                    .await
2634                    .unwrap();
2635
2636                LocalDurableTestRunner::<String, String>::setup_test_environment(
2637                    TestEnvironmentConfig {
2638                        skip_time: true,
2639                        checkpoint_delay: None,
2640                    }
2641                ).await.unwrap();
2642
2643                // Verify time skipping is enabled
2644                prop_assert!(
2645                    LocalDurableTestRunner::<String, String>::is_time_skipping_enabled(),
2646                    "Time skipping should be enabled after setup"
2647                );
2648
2649                // Create a handler that performs a wait operation
2650                let wait_duration = wait_seconds;
2651                let wait_handler = move |_input: String, ctx: DurableContext| {
2652                    async move {
2653                        // Perform a wait operation
2654                        ctx.wait(
2655                            durable_execution_sdk::Duration::from_seconds(wait_duration),
2656                            Some("test_wait")
2657                        ).await?;
2658                        Ok::<String, DurableError>("wait completed".to_string())
2659                    }
2660                };
2661
2662                let mut runner = LocalDurableTestRunner::new(wait_handler);
2663
2664                // Measure wall-clock time for the operation
2665                let start_time = std::time::Instant::now();
2666                let result = runner.run("test".to_string()).await.unwrap();
2667                let elapsed = start_time.elapsed();
2668
2669                // The execution should either:
2670                // 1. Complete quickly (if time skipping works perfectly)
2671                // 2. Return Running status (if wait suspends execution)
2672                // Either way, wall-clock time should be much less than wait_seconds
2673
2674                // Property: Wall-clock time should be significantly less than wait duration
2675                // We use a generous threshold (wait_seconds - 1) to account for test overhead
2676                // but the key is that we're not actually waiting the full duration
2677                let max_allowed_seconds = wait_seconds.saturating_sub(1).max(1);
2678                prop_assert!(
2679                    elapsed.as_secs() < max_allowed_seconds,
2680                    "Wall-clock time ({:?}) should be less than wait duration ({} seconds). \
2681                     Time skipping should prevent actual waiting.",
2682                    elapsed,
2683                    wait_seconds
2684                );
2685
2686                // The result should either be Running (suspended on wait) or Succeeded
2687                // Both are valid outcomes depending on how the mock handles waits
2688                prop_assert!(
2689                    result.get_status() == ExecutionStatus::Running ||
2690                    result.get_status() == ExecutionStatus::Succeeded,
2691                    "Execution should be Running (suspended) or Succeeded, got {:?}",
2692                    result.get_status()
2693                );
2694
2695                // Teardown
2696                LocalDurableTestRunner::<String, String>::teardown_test_environment()
2697                    .await
2698                    .unwrap();
2699
2700                Ok(())
2701            })?;
2702        }
2703    }
2704}