durable_execution_sdk_testing/local_runner.rs
1//! Local test runner for durable executions.
2//!
3//! This module provides the `LocalDurableTestRunner` for executing and testing
4//! durable functions in-process with a simulated checkpoint backend.
5//!
6//! # Examples
7//!
8//! ```ignore
9//! use durable_execution_sdk_testing::{
10//! LocalDurableTestRunner, TestEnvironmentConfig, ExecutionStatus,
11//! };
12//!
13//! #[tokio::test]
14//! async fn test_workflow() {
15//! LocalDurableTestRunner::setup_test_environment(TestEnvironmentConfig {
16//! skip_time: true,
17//! checkpoint_delay: None,
18//! }).await.unwrap();
19//!
20//! let mut runner = LocalDurableTestRunner::new(my_workflow);
21//! let result = runner.run("input".to_string()).await.unwrap();
22//!
23//! assert_eq!(result.get_status(), ExecutionStatus::Succeeded);
24//!
25//! LocalDurableTestRunner::teardown_test_environment().await.unwrap();
26//! }
27//! ```
28
29use std::collections::HashMap;
30use std::future::Future;
31use std::marker::PhantomData;
32use std::pin::Pin;
33use std::sync::atomic::{AtomicBool, Ordering};
34use std::sync::Arc;
35
36use serde::de::DeserializeOwned;
37use serde::Serialize;
38use tokio::sync::RwLock;
39
40use crate::checkpoint_server::{
41 ApiType, CheckpointWorkerManager, CheckpointWorkerParams, InvocationResult, SkipTimeConfig,
42 StartDurableExecutionRequest, TestExecutionOrchestrator,
43};
44use crate::error::TestError;
45use crate::mock_client::MockDurableServiceClient;
46use crate::operation::CallbackSender;
47use crate::operation_handle::{OperationHandle, OperationMatcher};
48use crate::test_result::TestResult;
49use crate::types::{ExecutionStatus, Invocation, TestResultError};
50use durable_execution_sdk::{
51 DurableContext, DurableError, DurableServiceClient, ErrorObject, Operation,
52};
53
54/// Global flag indicating whether the test environment has been set up.
55static TEST_ENVIRONMENT_SETUP: AtomicBool = AtomicBool::new(false);
56
57/// Global flag indicating whether time skipping is enabled.
58static TIME_SKIPPING_ENABLED: AtomicBool = AtomicBool::new(false);
59
60/// A `CallbackSender` implementation that delegates to `CheckpointWorkerManager`.
61///
62/// This bridges the `CallbackSender` trait (used by `OperationHandle`) with the
63/// checkpoint server's callback API, enabling handles to send callback responses
64/// during mid-execution interaction.
65struct CheckpointCallbackSender {
66 checkpoint_worker: Arc<CheckpointWorkerManager>,
67}
68
69impl CheckpointCallbackSender {
70 fn new(checkpoint_worker: Arc<CheckpointWorkerManager>) -> Self {
71 Self { checkpoint_worker }
72 }
73}
74
75#[async_trait::async_trait]
76impl CallbackSender for CheckpointCallbackSender {
77 async fn send_success(&self, callback_id: &str, result: &str) -> Result<(), TestError> {
78 self.checkpoint_worker
79 .send_callback_success(callback_id, result)
80 .await
81 .map_err(|e| TestError::CheckpointServerError(e.to_string()))
82 }
83
84 async fn send_failure(
85 &self,
86 callback_id: &str,
87 error: &TestResultError,
88 ) -> Result<(), TestError> {
89 let error_obj = ErrorObject::new(
90 error.error_type.clone().unwrap_or_default(),
91 error.error_message.clone().unwrap_or_default(),
92 );
93 self.checkpoint_worker
94 .send_callback_failure(callback_id, &error_obj)
95 .await
96 .map_err(|e| TestError::CheckpointServerError(e.to_string()))
97 }
98
99 async fn send_heartbeat(&self, callback_id: &str) -> Result<(), TestError> {
100 self.checkpoint_worker
101 .send_callback_heartbeat(callback_id)
102 .await
103 .map_err(|e| TestError::CheckpointServerError(e.to_string()))
104 }
105}
106
107/// Configuration for setting up the test environment.
108///
109/// # Examples
110///
111/// ```
112/// use durable_execution_sdk_testing::TestEnvironmentConfig;
113///
114/// let config = TestEnvironmentConfig {
115/// skip_time: true,
116/// checkpoint_delay: None,
117/// };
118/// ```
119#[derive(Debug, Clone)]
120pub struct TestEnvironmentConfig {
121 /// Enable time skipping for faster test execution.
122 ///
123 /// When enabled, wait operations complete instantly without blocking.
124 pub skip_time: bool,
125
126 /// Optional simulated checkpoint delay in milliseconds.
127 ///
128 /// If set, checkpoint operations will be delayed by this amount.
129 pub checkpoint_delay: Option<u64>,
130}
131
132impl Default for TestEnvironmentConfig {
133 fn default() -> Self {
134 Self {
135 skip_time: true,
136 checkpoint_delay: None,
137 }
138 }
139}
140
141/// Internal storage for operations during test execution.
142#[derive(Debug, Default)]
143struct OperationStorage {
144 /// All operations in execution order
145 operations: Vec<Operation>,
146 /// Map from operation ID to index in operations vec
147 operations_by_id: HashMap<String, usize>,
148 /// Map from operation name to indices in operations vec
149 operations_by_name: HashMap<String, Vec<usize>>,
150}
151
152impl OperationStorage {
153 fn new() -> Self {
154 Self::default()
155 }
156
157 fn add_operation(&mut self, operation: Operation) {
158 let index = self.operations.len();
159 let id = operation.operation_id.clone();
160 let name = operation.name.clone();
161
162 self.operations.push(operation);
163 self.operations_by_id.insert(id, index);
164
165 if let Some(name) = name {
166 self.operations_by_name.entry(name).or_default().push(index);
167 }
168 }
169
170 fn get_by_id(&self, id: &str) -> Option<&Operation> {
171 self.operations_by_id
172 .get(id)
173 .and_then(|&idx| self.operations.get(idx))
174 }
175
176 fn get_by_name(&self, name: &str) -> Option<&Operation> {
177 self.operations_by_name
178 .get(name)
179 .and_then(|indices| indices.first())
180 .and_then(|&idx| self.operations.get(idx))
181 }
182
183 fn get_by_name_and_index(&self, name: &str, index: usize) -> Option<&Operation> {
184 self.operations_by_name
185 .get(name)
186 .and_then(|indices| indices.get(index))
187 .and_then(|&idx| self.operations.get(idx))
188 }
189
190 fn get_by_index(&self, index: usize) -> Option<&Operation> {
191 self.operations.get(index)
192 }
193
194 fn get_all(&self) -> &[Operation] {
195 &self.operations
196 }
197
198 fn clear(&mut self) {
199 self.operations.clear();
200 self.operations_by_id.clear();
201 self.operations_by_name.clear();
202 }
203
204 #[allow(dead_code)]
205 fn len(&self) -> usize {
206 self.operations.len()
207 }
208
209 #[allow(dead_code)]
210 fn is_empty(&self) -> bool {
211 self.operations.is_empty()
212 }
213}
214
215/// Type alias for a shared async function that can be cloned.
216type SharedAsyncFn<I, O> = Arc<
217 dyn Fn(I, DurableContext) -> Pin<Box<dyn Future<Output = Result<O, DurableError>> + Send>>
218 + Send
219 + Sync,
220>;
221
222/// Type alias for a boxed durable function to reduce type complexity.
223type BoxedDurableFn = Box<
224 dyn Fn(
225 serde_json::Value,
226 DurableContext,
227 ) -> Pin<Box<dyn Future<Output = Result<serde_json::Value, DurableError>> + Send>>
228 + Send
229 + Sync,
230>;
231
232/// A registered function for invoke testing.
233/// Note: The function fields are stored for future use when invoke operations
234/// are fully implemented. Currently marked with allow(dead_code).
235#[allow(dead_code)]
236enum RegisteredFunction {
237 /// A durable function that takes a DurableContext
238 Durable(BoxedDurableFn),
239 /// A regular function that doesn't need a DurableContext
240 Regular(
241 Box<dyn Fn(serde_json::Value) -> Result<serde_json::Value, DurableError> + Send + Sync>,
242 ),
243}
244
245/// Local test runner for durable executions.
246///
247/// Executes durable handlers in-process with a checkpoint server running in a
248/// separate thread, allowing rapid iteration during development without AWS deployment.
249///
250/// The runner uses a CheckpointWorkerManager to manage the checkpoint server thread,
251/// matching the Node.js SDK's architecture for consistent cross-SDK behavior.
252///
253/// # Type Parameters
254///
255/// * `H` - The handler function type
256/// * `I` - The input type (must be deserializable)
257/// * `O` - The output type (must be serializable)
258///
259/// # Examples
260///
261/// ```ignore
262/// use durable_execution_sdk_testing::LocalDurableTestRunner;
263///
264/// async fn my_workflow(input: String, ctx: DurableContext) -> Result<String, DurableError> {
265/// let result = ctx.step(|_| Ok(format!("processed: {}", input)), None).await?;
266/// Ok(result)
267/// }
268///
269/// let mut runner = LocalDurableTestRunner::new(my_workflow);
270/// let result = runner.run("hello".to_string()).await.unwrap();
271/// assert_eq!(result.get_result().unwrap(), "processed: hello");
272/// ```
273pub struct LocalDurableTestRunner<I, O>
274where
275 I: DeserializeOwned + Send + 'static,
276 O: Serialize + DeserializeOwned + Send + 'static,
277{
278 /// The handler function to execute (shared for use with orchestrator)
279 handler: SharedAsyncFn<I, O>,
280 /// The checkpoint worker manager (manages checkpoint server thread)
281 checkpoint_worker: Arc<CheckpointWorkerManager>,
282 /// Legacy mock client for backward compatibility (deprecated)
283 #[deprecated(note = "Use checkpoint_worker instead. Retained for backward compatibility.")]
284 mock_client: Arc<MockDurableServiceClient>,
285 /// Storage for captured operations
286 operation_storage: Arc<RwLock<OperationStorage>>,
287 /// Registered functions for chained invoke testing
288 registered_functions: Arc<RwLock<HashMap<String, RegisteredFunction>>>,
289 /// Pre-registered operation handles for lazy population during execution
290 registered_handles: Vec<OperationHandle>,
291 /// Shared operations list for child operation enumeration across handles
292 shared_operations: Arc<RwLock<Vec<Operation>>>,
293 /// Phantom data for type parameters
294 _phantom: PhantomData<(I, O)>,
295}
296
297impl<I, O> LocalDurableTestRunner<I, O>
298where
299 I: DeserializeOwned + Send + Serialize + 'static,
300 O: Serialize + DeserializeOwned + Send + 'static,
301{
302 /// Sets up the test environment for durable execution testing.
303 ///
304 /// This method should be called once before running tests. It configures
305 /// time control and other test infrastructure.
306 ///
307 /// # Arguments
308 ///
309 /// * `config` - Configuration for the test environment
310 ///
311 /// # Requirements
312 ///
313 /// - 2.1: WHEN time skipping is enabled via setup_test_environment(),
314 /// THE Local_Test_Runner SHALL use Tokio's time manipulation to skip wait durations
315 /// - 2.3: WHEN time skipping is disabled, THE Local_Test_Runner SHALL execute
316 /// wait operations with real timing
317 ///
318 /// # Examples
319 ///
320 /// ```ignore
321 /// use durable_execution_sdk_testing::{LocalDurableTestRunner, TestEnvironmentConfig};
322 ///
323 /// LocalDurableTestRunner::<String, String>::setup_test_environment(TestEnvironmentConfig {
324 /// skip_time: true,
325 /// checkpoint_delay: None,
326 /// }).await.unwrap();
327 /// ```
328 pub async fn setup_test_environment(config: TestEnvironmentConfig) -> Result<(), TestError> {
329 // Enable time skipping if configured
330 // Note: Each test with #[tokio::test(flavor = "current_thread")] has its own runtime,
331 // so we always need to pause time in the current runtime, regardless of global state.
332 if config.skip_time {
333 // Pause tokio time to enable instant time advancement
334 // Note: tokio::time::pause() requires current_thread runtime
335 // We use catch_unwind to handle the case where we're in a multi-threaded runtime
336 // or if time is already paused in this runtime
337 use std::panic;
338 let result = panic::catch_unwind(panic::AssertUnwindSafe(|| {
339 tokio::time::pause();
340 }));
341
342 if let Err(panic_info) = result {
343 // Check if the panic was because we're not in a current_thread runtime
344 let is_runtime_error = panic_info
345 .downcast_ref::<&str>()
346 .map(|msg| msg.contains("current_thread"))
347 .unwrap_or(false)
348 || panic_info
349 .downcast_ref::<String>()
350 .map(|msg| msg.contains("current_thread"))
351 .unwrap_or(false);
352
353 // Check if time is already frozen (this is fine, just continue)
354 let is_already_frozen = panic_info
355 .downcast_ref::<&str>()
356 .map(|msg| msg.contains("already frozen") || msg.contains("already paused"))
357 .unwrap_or(false)
358 || panic_info
359 .downcast_ref::<String>()
360 .map(|msg| msg.contains("already frozen") || msg.contains("already paused"))
361 .unwrap_or(false);
362
363 if is_runtime_error {
364 // We're not in a current_thread runtime, so time control won't work
365 tracing::warn!(
366 "Time control requires current_thread Tokio runtime. \
367 Time skipping may not work correctly."
368 );
369 } else if is_already_frozen {
370 // Time is already frozen in this runtime, which is fine
371 // This can happen if setup is called multiple times
372 } else {
373 // Re-panic for other errors
374 panic::resume_unwind(panic_info);
375 }
376 }
377 TIME_SKIPPING_ENABLED.store(true, Ordering::SeqCst);
378 }
379
380 TEST_ENVIRONMENT_SETUP.store(true, Ordering::SeqCst);
381 Ok(())
382 }
383
384 /// Tears down the test environment.
385 ///
386 /// This method should be called after tests complete to restore normal
387 /// time behavior and clean up test infrastructure.
388 ///
389 /// # Requirements
390 ///
391 /// - 2.4: WHEN teardown_test_environment() is called, THE Local_Test_Runner
392 /// SHALL restore normal time behavior
393 ///
394 /// # Examples
395 ///
396 /// ```ignore
397 /// use durable_execution_sdk_testing::LocalDurableTestRunner;
398 ///
399 /// LocalDurableTestRunner::<String, String>::teardown_test_environment().await.unwrap();
400 /// ```
401 pub async fn teardown_test_environment() -> Result<(), TestError> {
402 // Check if set up
403 if !TEST_ENVIRONMENT_SETUP.load(Ordering::SeqCst) {
404 return Ok(());
405 }
406
407 // Resume tokio time if it was paused
408 // Note: tokio::time::resume() panics if time was never paused,
409 // so we need to check if time is actually paused before resuming
410 if TIME_SKIPPING_ENABLED.load(Ordering::SeqCst) {
411 // Only resume if time is actually paused (check using is_time_paused helper)
412 // Use catch_unwind to handle the case where we're in a multi-threaded runtime
413 use std::panic;
414 let _ = panic::catch_unwind(panic::AssertUnwindSafe(|| {
415 if crate::time_control::is_time_paused() {
416 tokio::time::resume();
417 }
418 }));
419 TIME_SKIPPING_ENABLED.store(false, Ordering::SeqCst);
420 }
421
422 TEST_ENVIRONMENT_SETUP.store(false, Ordering::SeqCst);
423 Ok(())
424 }
425
426 /// Returns whether the test environment has been set up.
427 pub fn is_environment_setup() -> bool {
428 TEST_ENVIRONMENT_SETUP.load(Ordering::SeqCst)
429 }
430
431 /// Returns whether time skipping is enabled.
432 pub fn is_time_skipping_enabled() -> bool {
433 TIME_SKIPPING_ENABLED.load(Ordering::SeqCst)
434 }
435
436 /// Creates a new local test runner with the given handler.
437 ///
438 /// # Arguments
439 ///
440 /// * `handler` - An async function that takes input and DurableContext
441 ///
442 /// # Requirements
443 ///
444 /// - 1.1: WHEN a developer creates a Local_Test_Runner with a handler function,
445 /// THE Local_Test_Runner SHALL accept any async function that takes a payload and DurableContext
446 /// - 9.1: WHEN a developer creates a Local_Test_Runner, THE Local_Test_Runner
447 /// SHALL spawn a checkpoint server in a separate thread
448 ///
449 /// # Examples
450 ///
451 /// ```ignore
452 /// use durable_execution_sdk_testing::LocalDurableTestRunner;
453 ///
454 /// async fn my_workflow(input: String, ctx: DurableContext) -> Result<String, DurableError> {
455 /// Ok(input)
456 /// }
457 ///
458 /// let runner = LocalDurableTestRunner::new(my_workflow);
459 /// ```
460 pub fn new<F, Fut>(handler: F) -> Self
461 where
462 F: Fn(I, DurableContext) -> Fut + Send + Sync + 'static,
463 Fut: Future<Output = Result<O, DurableError>> + Send + 'static,
464 {
465 // Wrap handler in Arc for sharing with orchestrator
466 let handler: SharedAsyncFn<I, O> = Arc::new(move |input: I, ctx: DurableContext| {
467 let fut = handler(input, ctx);
468 Box::pin(fut) as Pin<Box<dyn Future<Output = Result<O, DurableError>> + Send>>
469 });
470
471 // Get or create the checkpoint worker manager singleton
472 // If it fails (e.g., in test scenarios with poisoned locks), create a fresh instance
473 let checkpoint_worker = match CheckpointWorkerManager::get_instance(None) {
474 Ok(worker) => worker,
475 Err(_) => {
476 // Reset and try again
477 CheckpointWorkerManager::reset_instance_for_testing();
478 CheckpointWorkerManager::get_instance(None)
479 .expect("Failed to create CheckpointWorkerManager after reset")
480 }
481 };
482
483 #[allow(deprecated)]
484 Self {
485 handler,
486 checkpoint_worker,
487 mock_client: Arc::new(MockDurableServiceClient::new().with_checkpoint_responses(100)),
488 operation_storage: Arc::new(RwLock::new(OperationStorage::new())),
489 registered_functions: Arc::new(RwLock::new(HashMap::new())),
490 registered_handles: Vec::new(),
491 shared_operations: Arc::new(RwLock::new(Vec::new())),
492 _phantom: PhantomData,
493 }
494 }
495
496 /// Creates a new local test runner with a custom mock client.
497 ///
498 /// # Arguments
499 ///
500 /// * `handler` - An async function that takes input and DurableContext
501 /// * `mock_client` - A pre-configured mock client (deprecated, use checkpoint_worker)
502 #[deprecated(note = "Use new() instead. The checkpoint worker manager is now the default.")]
503 pub fn with_mock_client<F, Fut>(handler: F, mock_client: MockDurableServiceClient) -> Self
504 where
505 F: Fn(I, DurableContext) -> Fut + Send + Sync + 'static,
506 Fut: Future<Output = Result<O, DurableError>> + Send + 'static,
507 {
508 // Wrap handler in Arc for sharing with orchestrator
509 let handler: SharedAsyncFn<I, O> = Arc::new(move |input: I, ctx: DurableContext| {
510 let fut = handler(input, ctx);
511 Box::pin(fut) as Pin<Box<dyn Future<Output = Result<O, DurableError>> + Send>>
512 });
513
514 // Get or create the checkpoint worker manager singleton
515 let checkpoint_worker = CheckpointWorkerManager::get_instance(None)
516 .expect("Failed to create CheckpointWorkerManager");
517
518 #[allow(deprecated)]
519 Self {
520 handler,
521 checkpoint_worker,
522 mock_client: Arc::new(mock_client),
523 operation_storage: Arc::new(RwLock::new(OperationStorage::new())),
524 registered_functions: Arc::new(RwLock::new(HashMap::new())),
525 registered_handles: Vec::new(),
526 shared_operations: Arc::new(RwLock::new(Vec::new())),
527 _phantom: PhantomData,
528 }
529 }
530
531 /// Creates a new local test runner with custom checkpoint worker parameters.
532 ///
533 /// # Arguments
534 ///
535 /// * `handler` - An async function that takes input and DurableContext
536 /// * `params` - Configuration parameters for the checkpoint worker
537 ///
538 /// # Requirements
539 ///
540 /// - 9.2: WHEN the checkpoint server is running, THE Checkpoint_Worker_Manager
541 /// SHALL manage the lifecycle of the server thread
542 pub fn with_checkpoint_params<F, Fut>(handler: F, params: CheckpointWorkerParams) -> Self
543 where
544 F: Fn(I, DurableContext) -> Fut + Send + Sync + 'static,
545 Fut: Future<Output = Result<O, DurableError>> + Send + 'static,
546 {
547 // Wrap handler in Arc for sharing with orchestrator
548 let handler: SharedAsyncFn<I, O> = Arc::new(move |input: I, ctx: DurableContext| {
549 let fut = handler(input, ctx);
550 Box::pin(fut) as Pin<Box<dyn Future<Output = Result<O, DurableError>> + Send>>
551 });
552
553 // Get or create the checkpoint worker manager singleton with custom params
554 let checkpoint_worker = CheckpointWorkerManager::get_instance(Some(params))
555 .expect("Failed to create CheckpointWorkerManager");
556
557 #[allow(deprecated)]
558 Self {
559 handler,
560 checkpoint_worker,
561 mock_client: Arc::new(MockDurableServiceClient::new().with_checkpoint_responses(100)),
562 operation_storage: Arc::new(RwLock::new(OperationStorage::new())),
563 registered_functions: Arc::new(RwLock::new(HashMap::new())),
564 registered_handles: Vec::new(),
565 shared_operations: Arc::new(RwLock::new(Vec::new())),
566 _phantom: PhantomData,
567 }
568 }
569
570 /// Returns a reference to the checkpoint worker manager.
571 pub fn checkpoint_worker(&self) -> &Arc<CheckpointWorkerManager> {
572 &self.checkpoint_worker
573 }
574
575 /// Returns a reference to the mock client (deprecated).
576 #[deprecated(note = "Use checkpoint_worker() instead.")]
577 #[allow(deprecated)]
578 pub fn mock_client(&self) -> &Arc<MockDurableServiceClient> {
579 &self.mock_client
580 }
581
582 /// Returns the number of captured operations.
583 pub async fn operation_count(&self) -> usize {
584 self.operation_storage.read().await.len()
585 }
586
587 /// Executes the handler and returns a `RunFuture` that resolves to the test result.
588 ///
589 /// Accepts either a raw payload or an `InvokeRequest` wrapper (via `impl Into<InvokeRequest<I>>`).
590 /// The execution is spawned as a tokio task so callers can `await` the result
591 /// concurrently with `OperationHandle` interactions (e.g., mid-execution callbacks).
592 ///
593 /// # Arguments
594 ///
595 /// * `input` - A payload or `InvokeRequest<I>` to pass to the handler.
596 /// Raw payloads are automatically wrapped via `From<I> for InvokeRequest<I>`.
597 ///
598 /// # Requirements
599 ///
600 /// - 1.2: WHEN a developer calls run() with a payload, THE Local_Test_Runner
601 /// SHALL execute the handler function and return a Test_Result
602 /// - 1.3: WHEN the handler function completes successfully, THE Test_Result
603 /// SHALL contain the execution result and status SUCCEEDED
604 /// - 1.4: WHEN the handler function fails with an error, THE Test_Result
605 /// SHALL contain the error details and status FAILED
606 /// - 1.5: WHEN the handler function performs durable operations, THE Local_Test_Runner
607 /// SHALL capture all operations in the Test_Result
608 /// - 2.1: WHEN a developer calls run(), THE Local_Test_Runner SHALL return a future
609 /// that resolves to a Test_Result when the handler completes
610 /// - 9.1: THE Local_Test_Runner SHALL accept an InvokeRequest struct with an optional
611 /// payload field for the run() method
612 /// - 9.2: WHEN run() is called with an InvokeRequest containing a payload,
613 /// THE Local_Test_Runner SHALL pass the payload to the handler function
614 /// - 9.3: WHEN run() is called with an InvokeRequest containing no payload,
615 /// THE Local_Test_Runner SHALL use a default empty payload
616 ///
617 /// # Examples
618 ///
619 /// ```ignore
620 /// use durable_execution_sdk_testing::{LocalDurableTestRunner, InvokeRequest};
621 ///
622 /// let mut runner = LocalDurableTestRunner::new(my_workflow);
623 ///
624 /// // With a raw payload (backward compatible)
625 /// let result = runner.run("hello".to_string()).await.unwrap();
626 ///
627 /// // With an InvokeRequest
628 /// let result = runner.run(InvokeRequest::with_payload("hello".to_string())).await.unwrap();
629 ///
630 /// // Concurrent interaction with operation handles
631 /// let handle = runner.get_operation_handle("my-callback");
632 /// let run_future = runner.run("input".to_string());
633 /// handle.wait_for_data(WaitingOperationStatus::Submitted).await.unwrap();
634 /// handle.send_callback_success("result").await.unwrap();
635 /// let result = run_future.await.unwrap();
636 /// ```
637 pub fn run(
638 &mut self,
639 input: impl Into<crate::types::InvokeRequest<I>>,
640 ) -> crate::run_future::RunFuture<O>
641 where
642 I: Clone + Send + Sync + 'static,
643 O: Send + 'static,
644 {
645 let invoke_request: crate::types::InvokeRequest<I> = input.into();
646
647 // Extract payload from InvokeRequest.
648 // When payload is None (InvokeRequest::new()), attempt to create a default
649 // by deserializing from JSON null. This works for types like String (""),
650 // serde_json::Value (Null), Option<T> (None), etc.
651 let payload_result: Result<I, _> = match invoke_request.payload {
652 Some(p) => Ok(p),
653 None => serde_json::from_value(serde_json::Value::Null),
654 };
655
656 // If we can't create a default payload, return an error future immediately
657 let payload = match payload_result {
658 Ok(p) => p,
659 Err(e) => {
660 return crate::run_future::RunFuture::from_future(Box::pin(async move {
661 Err(TestError::InvalidConfiguration(format!(
662 "InvokeRequest has no payload and the input type cannot be \
663 deserialized from null: {}. Use InvokeRequest::with_payload() \
664 to provide a value.",
665 e
666 )))
667 }));
668 }
669 };
670
671 // Clear previous state before starting execution
672 // We use try_write to avoid blocking; the async block will also clear.
673 if let Ok(mut storage) = self.operation_storage.try_write() {
674 storage.clear();
675 }
676 #[allow(deprecated)]
677 self.mock_client.clear_all_calls();
678
679 // Capture all the state we need for the async execution
680 let handler = Arc::clone(&self.handler);
681 let checkpoint_worker = self.checkpoint_worker.clone();
682 let operation_storage = self.operation_storage.clone();
683 let registered_handles = self.registered_handles.clone();
684 let shared_operations = self.shared_operations.clone();
685
686 crate::run_future::RunFuture::from_future(Box::pin(async move {
687 use crate::checkpoint_server::OperationStorage as OrchestratorOperationStorage;
688
689 // Ensure operation storage is cleared
690 operation_storage.write().await.clear();
691
692 // Configure time skipping based on test environment settings
693 let skip_time_config = SkipTimeConfig {
694 enabled: LocalDurableTestRunner::<I, O>::is_time_skipping_enabled(),
695 };
696
697 // Create shared operation storage for the orchestrator
698 let orchestrator_storage =
699 Arc::new(tokio::sync::RwLock::new(OrchestratorOperationStorage::new()));
700
701 // Create the orchestrator with the shared handler
702 let handler_clone = Arc::clone(&handler);
703 let mut orchestrator = TestExecutionOrchestrator::new(
704 move |input: I, ctx: DurableContext| {
705 let handler = Arc::clone(&handler_clone);
706 async move { handler(input, ctx).await }
707 },
708 orchestrator_storage.clone(),
709 checkpoint_worker.clone(),
710 skip_time_config,
711 );
712
713 // Pass pre-registered operation handles to the orchestrator
714 if !registered_handles.is_empty() {
715 let callback_sender: Option<Arc<dyn CallbackSender>> = Some(Arc::new(
716 CheckpointCallbackSender::new(checkpoint_worker.clone()),
717 ));
718 orchestrator = orchestrator.with_handles(
719 registered_handles,
720 shared_operations,
721 callback_sender,
722 );
723 }
724
725 // Execute using the orchestrator
726 let execution_result = orchestrator.execute_handler(payload).await?;
727
728 // Copy operations from orchestrator storage to our storage
729 {
730 let orch_storage = orchestrator_storage.read().await;
731 let mut our_storage = operation_storage.write().await;
732 for op in orch_storage.get_all() {
733 our_storage.add_operation(op.clone());
734 }
735 }
736
737 // Convert TestExecutionResult to TestResult
738 let mut test_result = match execution_result.status {
739 ExecutionStatus::Succeeded => {
740 if let Some(result) = execution_result.result {
741 TestResult::success(result, execution_result.operations)
742 } else {
743 TestResult::with_status(
744 ExecutionStatus::Succeeded,
745 execution_result.operations,
746 )
747 }
748 }
749 ExecutionStatus::Failed => {
750 if let Some(error) = execution_result.error {
751 TestResult::failure(error, execution_result.operations)
752 } else {
753 TestResult::with_status(
754 ExecutionStatus::Failed,
755 execution_result.operations,
756 )
757 }
758 }
759 ExecutionStatus::Running => {
760 TestResult::with_status(ExecutionStatus::Running, execution_result.operations)
761 }
762 _ => TestResult::with_status(execution_result.status, execution_result.operations),
763 };
764
765 // Add invocations
766 for invocation in execution_result.invocations {
767 test_result.add_invocation(invocation);
768 }
769
770 // Retrieve and add Node.js-compatible history events
771 if let Ok(nodejs_events) = checkpoint_worker
772 .get_nodejs_history_events(&execution_result.execution_id)
773 .await
774 {
775 test_result.set_nodejs_history_events(nodejs_events);
776 }
777
778 Ok(test_result)
779 }))
780 }
781
782 /// Executes the handler with a single invocation (no re-invocation on suspend).
783 ///
784 /// This method performs a single handler invocation without using the orchestrator.
785 /// If the handler suspends (e.g., due to a wait operation), the result will have
786 /// status `Running` and the execution will not be automatically resumed.
787 ///
788 /// Use this method when you want to test the initial invocation behavior without
789 /// automatic wait completion and re-invocation.
790 ///
791 /// # Arguments
792 ///
793 /// * `payload` - The input payload to pass to the handler
794 ///
795 /// # Examples
796 ///
797 /// ```ignore
798 /// use durable_execution_sdk_testing::LocalDurableTestRunner;
799 ///
800 /// let mut runner = LocalDurableTestRunner::new(my_workflow_with_wait);
801 /// let result = runner.run_single_invocation("hello".to_string()).await.unwrap();
802 ///
803 /// // Handler suspended on wait, execution is still running
804 /// assert_eq!(result.get_status(), ExecutionStatus::Running);
805 /// ```
806 pub async fn run_single_invocation(&mut self, payload: I) -> Result<TestResult<O>, TestError> {
807 use durable_execution_sdk::lambda::InitialExecutionState;
808 use durable_execution_sdk::state::ExecutionState;
809
810 // Clear previous operations
811 self.operation_storage.write().await.clear();
812
813 // Also clear mock client state for backward compatibility
814 #[allow(deprecated)]
815 self.mock_client.clear_all_calls();
816
817 // Serialize the payload for the checkpoint server
818 let payload_json = serde_json::to_string(&payload)?;
819
820 // Start execution with the checkpoint server
821 // This registers the execution and returns a checkpoint token
822 let invocation_id = uuid::Uuid::new_v4().to_string();
823 let start_request = StartDurableExecutionRequest {
824 invocation_id: invocation_id.clone(),
825 payload: Some(payload_json),
826 };
827 let start_payload = serde_json::to_string(&start_request)?;
828
829 let start_response = self
830 .checkpoint_worker
831 .send_api_request(ApiType::StartDurableExecution, start_payload)
832 .await?;
833
834 if let Some(error) = start_response.error {
835 return Err(TestError::CheckpointServerError(error));
836 }
837
838 let invocation_result: InvocationResult =
839 serde_json::from_str(&start_response.payload.ok_or_else(|| {
840 TestError::CheckpointServerError(
841 "Empty response from checkpoint server".to_string(),
842 )
843 })?)?;
844
845 let execution_arn = invocation_result.execution_id;
846 let checkpoint_token = invocation_result.checkpoint_token;
847
848 // Create initial execution state (empty for new execution)
849 let initial_state = InitialExecutionState::new();
850
851 // Create the execution state with the checkpoint worker manager
852 // The checkpoint worker implements DurableServiceClient trait
853 let execution_state = Arc::new(ExecutionState::new(
854 &execution_arn,
855 &checkpoint_token,
856 initial_state,
857 self.checkpoint_worker.clone(),
858 ));
859
860 // Create the durable context
861 let ctx = DurableContext::new(execution_state.clone());
862
863 // Record invocation start
864 let start_time = chrono::Utc::now();
865 let mut invocation = Invocation::with_start(start_time);
866
867 // Execute the handler
868 let handler_result = (self.handler)(payload, ctx).await;
869
870 // Record invocation end
871 let end_time = chrono::Utc::now();
872 invocation = invocation.with_end(end_time);
873
874 // Retrieve operations from the checkpoint server
875 let operations = match self
876 .checkpoint_worker
877 .get_operations(&execution_arn, "")
878 .await
879 {
880 Ok(response) => {
881 let mut storage = self.operation_storage.write().await;
882 for op in &response.operations {
883 storage.add_operation(op.clone());
884 }
885 response.operations
886 }
887 Err(_) => {
888 // If we can't get operations from checkpoint server, return empty list
889 Vec::new()
890 }
891 };
892
893 // Build the test result based on handler outcome
894 match handler_result {
895 Ok(result) => {
896 let mut test_result = TestResult::success(result, operations);
897 test_result.add_invocation(invocation);
898 Ok(test_result)
899 }
900 Err(error) => {
901 // Check if this is a suspend error (which means pending, not failed)
902 if error.is_suspend() {
903 let mut test_result =
904 TestResult::with_status(ExecutionStatus::Running, operations);
905 test_result.add_invocation(invocation);
906 Ok(test_result)
907 } else {
908 // Convert DurableError to ErrorObject to get the error type
909 let error_obj = durable_execution_sdk::ErrorObject::from(&error);
910 let test_error = TestResultError::new(error_obj.error_type, error.to_string());
911 invocation = invocation.with_error(test_error.clone());
912 let mut test_result = TestResult::failure(test_error, operations);
913 test_result.add_invocation(invocation);
914 Ok(test_result)
915 }
916 }
917 }
918 }
919
920 /// Executes the handler with the given payload using the TestExecutionOrchestrator.
921 ///
922 /// This method uses the TestExecutionOrchestrator to manage the full execution
923 /// lifecycle, including:
924 /// - Polling for checkpoint updates
925 /// - Processing wait operations and scheduling re-invocations
926 /// - Handling time skipping for wait operations
927 /// - Managing callback completions
928 ///
929 /// This is the recommended method for testing workflows with wait operations,
930 /// as it properly handles the full execution lifecycle including re-invocations.
931 ///
932 /// # Arguments
933 ///
934 /// * `payload` - The input payload to pass to the handler
935 ///
936 /// # Requirements
937 ///
938 /// - 16.1: WHEN a wait operation is encountered, THE Test_Execution_Orchestrator
939 /// SHALL track the wait's scheduled end timestamp
940 /// - 16.2: WHEN time skipping is enabled and a wait's scheduled end time is reached,
941 /// THE Test_Execution_Orchestrator SHALL mark the wait as SUCCEEDED and schedule
942 /// handler re-invocation
943 /// - 16.3: WHEN time skipping is enabled, THE Test_Execution_Orchestrator SHALL use
944 /// tokio::time::advance() to skip wait durations instantly
945 /// - 16.4: WHEN a handler invocation returns PENDING status, THE Test_Execution_Orchestrator
946 /// SHALL continue polling for operation updates and re-invoke the handler when
947 /// operations complete
948 /// - 16.5: WHEN a handler invocation returns SUCCEEDED or FAILED status,
949 /// THE Test_Execution_Orchestrator SHALL resolve the execution and stop polling
950 ///
951 /// # Examples
952 ///
953 /// ```ignore
954 /// use durable_execution_sdk_testing::LocalDurableTestRunner;
955 ///
956 /// let mut runner = LocalDurableTestRunner::new(my_workflow_with_waits);
957 /// let result = runner.run_with_orchestrator("hello".to_string()).await.unwrap();
958 ///
959 /// // Wait operations are automatically completed with time skipping
960 /// assert_eq!(result.get_status(), ExecutionStatus::Succeeded);
961 /// ```
962 pub async fn run_with_orchestrator(&mut self, payload: I) -> Result<TestResult<O>, TestError>
963 where
964 I: Clone,
965 {
966 use crate::checkpoint_server::OperationStorage as OrchestratorOperationStorage;
967
968 // Clear previous operations
969 self.operation_storage.write().await.clear();
970
971 // Also clear mock client state for backward compatibility
972 #[allow(deprecated)]
973 self.mock_client.clear_all_calls();
974
975 // Configure time skipping based on test environment settings
976 let skip_time_config = SkipTimeConfig {
977 enabled: Self::is_time_skipping_enabled(),
978 };
979
980 // Create shared operation storage for the orchestrator
981 let orchestrator_storage =
982 Arc::new(tokio::sync::RwLock::new(OrchestratorOperationStorage::new()));
983
984 // Clone the handler Arc for use in the orchestrator
985 let handler = Arc::clone(&self.handler);
986
987 // Create the orchestrator with the shared handler
988 // The orchestrator will manage the execution lifecycle
989 let mut orchestrator = TestExecutionOrchestrator::new(
990 move |input: I, ctx: DurableContext| {
991 let handler = Arc::clone(&handler);
992 async move { handler(input, ctx).await }
993 },
994 orchestrator_storage.clone(),
995 self.checkpoint_worker.clone(),
996 skip_time_config,
997 );
998
999 // Pass pre-registered operation handles to the orchestrator so it can
1000 // populate them as operations are created/updated during execution.
1001 if !self.registered_handles.is_empty() {
1002 let callback_sender: Option<Arc<dyn CallbackSender>> = Some(Arc::new(
1003 CheckpointCallbackSender::new(self.checkpoint_worker.clone()),
1004 ));
1005 orchestrator = orchestrator.with_handles(
1006 self.registered_handles.clone(),
1007 self.shared_operations.clone(),
1008 callback_sender,
1009 );
1010 }
1011
1012 // Execute using the orchestrator
1013 let execution_result = orchestrator.execute_handler(payload.clone()).await?;
1014
1015 // Copy operations from orchestrator storage to our storage
1016 {
1017 let orch_storage = orchestrator_storage.read().await;
1018 let mut our_storage = self.operation_storage.write().await;
1019 for op in orch_storage.get_all() {
1020 our_storage.add_operation(op.clone());
1021 }
1022 }
1023
1024 // Convert TestExecutionResult to TestResult
1025 let mut test_result = match execution_result.status {
1026 ExecutionStatus::Succeeded => {
1027 if let Some(result) = execution_result.result {
1028 TestResult::success(result, execution_result.operations)
1029 } else {
1030 TestResult::with_status(ExecutionStatus::Succeeded, execution_result.operations)
1031 }
1032 }
1033 ExecutionStatus::Failed => {
1034 if let Some(error) = execution_result.error {
1035 TestResult::failure(error, execution_result.operations)
1036 } else {
1037 TestResult::with_status(ExecutionStatus::Failed, execution_result.operations)
1038 }
1039 }
1040 ExecutionStatus::Running => {
1041 TestResult::with_status(ExecutionStatus::Running, execution_result.operations)
1042 }
1043 _ => TestResult::with_status(execution_result.status, execution_result.operations),
1044 };
1045
1046 // Add invocations
1047 for invocation in execution_result.invocations {
1048 test_result.add_invocation(invocation);
1049 }
1050
1051 // Retrieve and add Node.js-compatible history events
1052 if let Ok(nodejs_events) = self
1053 .checkpoint_worker
1054 .get_nodejs_history_events(&execution_result.execution_id)
1055 .await
1056 {
1057 test_result.set_nodejs_history_events(nodejs_events);
1058 }
1059
1060 Ok(test_result)
1061 }
1062
1063 /// Returns a lazy `OperationHandle` that populates when an operation
1064 /// matching the given name is created during execution.
1065 ///
1066 /// # Arguments
1067 ///
1068 /// * `name` - The operation name to match against
1069 ///
1070 /// # Requirements
1071 ///
1072 /// - 1.1: WHEN a developer calls `get_operation_handle(name)` on the Local_Test_Runner
1073 /// before calling `run()`, THE Local_Test_Runner SHALL return an Operation_Handle
1074 /// that is initially unpopulated
1075 ///
1076 /// # Examples
1077 ///
1078 /// ```ignore
1079 /// let handle = runner.get_operation_handle("my-callback");
1080 /// // handle is unpopulated until run() executes and produces a matching operation
1081 /// ```
1082 pub fn get_operation_handle(&mut self, name: &str) -> OperationHandle {
1083 let handle = OperationHandle::new(
1084 OperationMatcher::ByName(name.to_string()),
1085 self.shared_operations.clone(),
1086 );
1087 self.registered_handles.push(handle.clone());
1088 handle
1089 }
1090
1091 /// Returns a lazy `OperationHandle` that populates with the operation
1092 /// at the given execution order index.
1093 ///
1094 /// # Arguments
1095 ///
1096 /// * `index` - The zero-based execution order index
1097 ///
1098 /// # Requirements
1099 ///
1100 /// - 1.7: WHEN `get_operation_handle_by_index(index)` is called before `run()`,
1101 /// THE Local_Test_Runner SHALL return an Operation_Handle that populates with
1102 /// the operation at that execution order index
1103 ///
1104 /// # Examples
1105 ///
1106 /// ```ignore
1107 /// let handle = runner.get_operation_handle_by_index(0);
1108 /// // handle populates with the first operation created during execution
1109 /// ```
1110 pub fn get_operation_handle_by_index(&mut self, index: usize) -> OperationHandle {
1111 let handle = OperationHandle::new(
1112 OperationMatcher::ByIndex(index),
1113 self.shared_operations.clone(),
1114 );
1115 self.registered_handles.push(handle.clone());
1116 handle
1117 }
1118
1119 /// Returns a lazy `OperationHandle` that populates with the operation
1120 /// matching the given unique ID.
1121 ///
1122 /// # Arguments
1123 ///
1124 /// * `id` - The unique operation ID to match against
1125 ///
1126 /// # Requirements
1127 ///
1128 /// - 1.8: WHEN `get_operation_handle_by_id(id)` is called before `run()`,
1129 /// THE Local_Test_Runner SHALL return an Operation_Handle that populates with
1130 /// the operation matching that unique ID
1131 ///
1132 /// # Examples
1133 ///
1134 /// ```ignore
1135 /// let handle = runner.get_operation_handle_by_id("op-abc-123");
1136 /// // handle populates with the operation whose ID matches during execution
1137 /// ```
1138 pub fn get_operation_handle_by_id(&mut self, id: &str) -> OperationHandle {
1139 let handle = OperationHandle::new(
1140 OperationMatcher::ById(id.to_string()),
1141 self.shared_operations.clone(),
1142 );
1143 self.registered_handles.push(handle.clone());
1144 handle
1145 }
1146
1147 /// Resets the test runner state for a fresh test run.
1148 ///
1149 /// This method clears all captured operations and resets the checkpoint server
1150 /// state, allowing the runner to be reused for multiple test scenarios.
1151 ///
1152 /// # Requirements
1153 ///
1154 /// - 1.6: WHEN reset() is called, THE Local_Test_Runner SHALL clear all
1155 /// captured operations and reset checkpoint server state
1156 ///
1157 /// # Examples
1158 ///
1159 /// ```ignore
1160 /// use durable_execution_sdk_testing::LocalDurableTestRunner;
1161 ///
1162 /// let mut runner = LocalDurableTestRunner::new(my_workflow);
1163 ///
1164 /// // First test run
1165 /// let result1 = runner.run("input1".to_string()).await.unwrap();
1166 ///
1167 /// // Reset for fresh state
1168 /// runner.reset().await;
1169 ///
1170 /// // Second test run with clean state
1171 /// let result2 = runner.run("input2".to_string()).await.unwrap();
1172 /// ```
1173 pub async fn reset(&mut self) {
1174 // Clear operation storage
1175 self.operation_storage.write().await.clear();
1176
1177 // Reset checkpoint worker manager singleton for fresh state
1178 CheckpointWorkerManager::reset_instance_for_testing();
1179
1180 // Re-acquire the checkpoint worker manager
1181 self.checkpoint_worker = CheckpointWorkerManager::get_instance(None)
1182 .expect("Failed to create CheckpointWorkerManager after reset");
1183
1184 // Clear registered operation handles and shared operations
1185 self.registered_handles.clear();
1186 self.shared_operations.write().await.clear();
1187
1188 // Also clear mock client state for backward compatibility
1189 #[allow(deprecated)]
1190 self.mock_client.clear_all_calls();
1191 }
1192
1193 /// Gets an operation by its unique ID.
1194 ///
1195 /// # Arguments
1196 ///
1197 /// * `id` - The unique operation ID
1198 ///
1199 /// # Returns
1200 ///
1201 /// The operation if found, or `None` if no operation with that ID exists.
1202 ///
1203 /// # Requirements
1204 ///
1205 /// - 4.1: WHEN get_operation(name) is called, THE Test_Result SHALL return
1206 /// the first operation with that name
1207 /// - 4.4: WHEN get_operation_by_id(id) is called, THE Test_Result SHALL return
1208 /// the operation with that unique ID
1209 ///
1210 /// # Examples
1211 ///
1212 /// ```ignore
1213 /// use durable_execution_sdk_testing::LocalDurableTestRunner;
1214 ///
1215 /// let mut runner = LocalDurableTestRunner::new(my_workflow);
1216 /// let _ = runner.run("input".to_string()).await.unwrap();
1217 ///
1218 /// if let Some(op) = runner.get_operation_by_id("op-123").await {
1219 /// println!("Found operation: {:?}", op);
1220 /// }
1221 /// ```
1222 pub async fn get_operation_by_id(&self, id: &str) -> Option<Operation> {
1223 self.operation_storage.read().await.get_by_id(id).cloned()
1224 }
1225
1226 /// Gets the first operation with the given name.
1227 ///
1228 /// # Arguments
1229 ///
1230 /// * `name` - The operation name to search for
1231 ///
1232 /// # Returns
1233 ///
1234 /// The first operation with that name, or `None` if no operation with that name exists.
1235 ///
1236 /// # Requirements
1237 ///
1238 /// - 4.1: WHEN get_operation(name) is called, THE Test_Result SHALL return
1239 /// the first operation with that name
1240 ///
1241 /// # Examples
1242 ///
1243 /// ```ignore
1244 /// use durable_execution_sdk_testing::LocalDurableTestRunner;
1245 ///
1246 /// let mut runner = LocalDurableTestRunner::new(my_workflow);
1247 /// let _ = runner.run("input".to_string()).await.unwrap();
1248 ///
1249 /// if let Some(op) = runner.get_operation("process_data").await {
1250 /// println!("Found operation: {:?}", op);
1251 /// }
1252 /// ```
1253 pub async fn get_operation(&self, name: &str) -> Option<Operation> {
1254 self.operation_storage
1255 .read()
1256 .await
1257 .get_by_name(name)
1258 .cloned()
1259 }
1260
1261 /// Gets an operation by its index in the execution order.
1262 ///
1263 /// # Arguments
1264 ///
1265 /// * `index` - The zero-based index of the operation
1266 ///
1267 /// # Returns
1268 ///
1269 /// The operation at that index, or `None` if the index is out of bounds.
1270 ///
1271 /// # Requirements
1272 ///
1273 /// - 4.2: WHEN get_operation_by_index(index) is called, THE Test_Result SHALL return
1274 /// the operation at that execution order index
1275 ///
1276 /// # Examples
1277 ///
1278 /// ```ignore
1279 /// use durable_execution_sdk_testing::LocalDurableTestRunner;
1280 ///
1281 /// let mut runner = LocalDurableTestRunner::new(my_workflow);
1282 /// let _ = runner.run("input".to_string()).await.unwrap();
1283 ///
1284 /// // Get the first operation
1285 /// if let Some(op) = runner.get_operation_by_index(0).await {
1286 /// println!("First operation: {:?}", op);
1287 /// }
1288 /// ```
1289 pub async fn get_operation_by_index(&self, index: usize) -> Option<Operation> {
1290 self.operation_storage
1291 .read()
1292 .await
1293 .get_by_index(index)
1294 .cloned()
1295 }
1296
1297 /// Gets an operation by name and occurrence index.
1298 ///
1299 /// This is useful when multiple operations have the same name and you need
1300 /// to access a specific occurrence.
1301 ///
1302 /// # Arguments
1303 ///
1304 /// * `name` - The operation name to search for
1305 /// * `index` - The zero-based index among operations with that name
1306 ///
1307 /// # Returns
1308 ///
1309 /// The operation at that name/index combination, or `None` if not found.
1310 ///
1311 /// # Requirements
1312 ///
1313 /// - 4.3: WHEN get_operation_by_name_and_index(name, index) is called,
1314 /// THE Test_Result SHALL return the Nth operation with that name
1315 ///
1316 /// # Examples
1317 ///
1318 /// ```ignore
1319 /// use durable_execution_sdk_testing::LocalDurableTestRunner;
1320 ///
1321 /// let mut runner = LocalDurableTestRunner::new(my_workflow);
1322 /// let _ = runner.run("input".to_string()).await.unwrap();
1323 ///
1324 /// // Get the second "process" operation
1325 /// if let Some(op) = runner.get_operation_by_name_and_index("process", 1).await {
1326 /// println!("Second process operation: {:?}", op);
1327 /// }
1328 /// ```
1329 pub async fn get_operation_by_name_and_index(
1330 &self,
1331 name: &str,
1332 index: usize,
1333 ) -> Option<Operation> {
1334 self.operation_storage
1335 .read()
1336 .await
1337 .get_by_name_and_index(name, index)
1338 .cloned()
1339 }
1340
1341 /// Gets all captured operations.
1342 ///
1343 /// # Returns
1344 ///
1345 /// A vector of all operations in execution order.
1346 ///
1347 /// # Requirements
1348 ///
1349 /// - 4.5: WHEN get_all_operations() is called, THE Test_Result SHALL return
1350 /// all operations in execution order
1351 ///
1352 /// # Examples
1353 ///
1354 /// ```ignore
1355 /// use durable_execution_sdk_testing::LocalDurableTestRunner;
1356 ///
1357 /// let mut runner = LocalDurableTestRunner::new(my_workflow);
1358 /// let _ = runner.run("input".to_string()).await.unwrap();
1359 ///
1360 /// let all_ops = runner.get_all_operations().await;
1361 /// println!("Total operations: {}", all_ops.len());
1362 /// ```
1363 pub async fn get_all_operations(&self) -> Vec<Operation> {
1364 self.operation_storage.read().await.get_all().to_vec()
1365 }
1366
1367 /// Registers a durable function for invoke testing.
1368 ///
1369 /// Durable functions receive a `DurableContext` and can perform durable operations.
1370 /// When the main handler invokes a function by name, the registered function
1371 /// will be called.
1372 ///
1373 /// # Arguments
1374 ///
1375 /// * `name` - The name to register the function under
1376 /// * `func` - The durable function to register
1377 ///
1378 /// # Requirements
1379 ///
1380 /// - 7.1: WHEN register_durable_function(name, func) is called, THE Local_Test_Runner
1381 /// SHALL store the function for invoke handling
1382 /// - 7.2: WHEN a registered durable function is invoked, THE Local_Test_Runner
1383 /// SHALL execute it with a DurableContext
1384 ///
1385 /// # Examples
1386 ///
1387 /// ```ignore
1388 /// use durable_execution_sdk_testing::LocalDurableTestRunner;
1389 ///
1390 /// async fn helper_workflow(input: serde_json::Value, ctx: DurableContext) -> Result<serde_json::Value, DurableError> {
1391 /// Ok(serde_json::json!({"processed": true}))
1392 /// }
1393 ///
1394 /// let mut runner = LocalDurableTestRunner::new(main_workflow);
1395 /// runner.register_durable_function("helper", helper_workflow).await;
1396 /// ```
1397 pub async fn register_durable_function<F, Fut>(&self, name: impl Into<String>, func: F)
1398 where
1399 F: Fn(serde_json::Value, DurableContext) -> Fut + Send + Sync + 'static,
1400 Fut: Future<Output = Result<serde_json::Value, DurableError>> + Send + 'static,
1401 {
1402 let boxed_func = Box::new(move |input: serde_json::Value, ctx: DurableContext| {
1403 let fut = func(input, ctx);
1404 Box::pin(fut)
1405 as Pin<Box<dyn Future<Output = Result<serde_json::Value, DurableError>> + Send>>
1406 });
1407
1408 self.registered_functions
1409 .write()
1410 .await
1411 .insert(name.into(), RegisteredFunction::Durable(boxed_func));
1412 }
1413
1414 /// Registers a regular (non-durable) function for invoke testing.
1415 ///
1416 /// Regular functions do not receive a `DurableContext` and cannot perform
1417 /// durable operations. They are useful for testing simple helper functions.
1418 ///
1419 /// # Arguments
1420 ///
1421 /// * `name` - The name to register the function under
1422 /// * `func` - The regular function to register
1423 ///
1424 /// # Requirements
1425 ///
1426 /// - 7.3: WHEN register_function(name, func) is called, THE Local_Test_Runner
1427 /// SHALL store the function for invoke handling
1428 /// - 7.4: WHEN a registered regular function is invoked, THE Local_Test_Runner
1429 /// SHALL execute it without a DurableContext
1430 ///
1431 /// # Examples
1432 ///
1433 /// ```ignore
1434 /// use durable_execution_sdk_testing::LocalDurableTestRunner;
1435 ///
1436 /// fn simple_helper(input: serde_json::Value) -> Result<serde_json::Value, DurableError> {
1437 /// Ok(serde_json::json!({"result": "done"}))
1438 /// }
1439 ///
1440 /// let mut runner = LocalDurableTestRunner::new(main_workflow);
1441 /// runner.register_function("simple_helper", simple_helper).await;
1442 /// ```
1443 pub async fn register_function<F>(&self, name: impl Into<String>, func: F)
1444 where
1445 F: Fn(serde_json::Value) -> Result<serde_json::Value, DurableError> + Send + Sync + 'static,
1446 {
1447 self.registered_functions
1448 .write()
1449 .await
1450 .insert(name.into(), RegisteredFunction::Regular(Box::new(func)));
1451 }
1452
1453 /// Gets a registered function by name.
1454 ///
1455 /// # Arguments
1456 ///
1457 /// * `name` - The name of the function to retrieve
1458 ///
1459 /// # Returns
1460 ///
1461 /// `true` if a function with that name is registered, `false` otherwise.
1462 ///
1463 /// # Examples
1464 ///
1465 /// ```ignore
1466 /// use durable_execution_sdk_testing::LocalDurableTestRunner;
1467 ///
1468 /// let runner = LocalDurableTestRunner::new(main_workflow);
1469 /// runner.register_function("helper", |_| Ok(serde_json::json!({}))).await;
1470 ///
1471 /// assert!(runner.has_registered_function("helper").await);
1472 /// assert!(!runner.has_registered_function("nonexistent").await);
1473 /// ```
1474 pub async fn has_registered_function(&self, name: &str) -> bool {
1475 self.registered_functions.read().await.contains_key(name)
1476 }
1477
1478 /// Gets the count of registered functions.
1479 ///
1480 /// # Returns
1481 ///
1482 /// The number of registered functions.
1483 pub async fn registered_function_count(&self) -> usize {
1484 self.registered_functions.read().await.len()
1485 }
1486
1487 /// Clears all registered functions.
1488 ///
1489 /// # Examples
1490 ///
1491 /// ```ignore
1492 /// use durable_execution_sdk_testing::LocalDurableTestRunner;
1493 ///
1494 /// let mut runner = LocalDurableTestRunner::new(main_workflow);
1495 /// runner.register_function("helper", |_| Ok(serde_json::json!({}))).await;
1496 /// assert_eq!(runner.registered_function_count().await, 1);
1497 ///
1498 /// runner.clear_registered_functions().await;
1499 /// assert_eq!(runner.registered_function_count().await, 0);
1500 /// ```
1501 pub async fn clear_registered_functions(&mut self) {
1502 self.registered_functions.write().await.clear();
1503 }
1504}
1505
1506impl<I, O> std::fmt::Debug for LocalDurableTestRunner<I, O>
1507where
1508 I: DeserializeOwned + Send + 'static,
1509 O: Serialize + DeserializeOwned + Send + 'static,
1510{
1511 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1512 f.debug_struct("LocalDurableTestRunner")
1513 .field("checkpoint_worker", &"CheckpointWorkerManager")
1514 .finish()
1515 }
1516}
1517
1518#[cfg(test)]
1519mod tests {
1520 use super::*;
1521 use durable_execution_sdk::OperationType;
1522
1523 async fn simple_handler(input: String, _ctx: DurableContext) -> Result<String, DurableError> {
1524 Ok(format!("processed: {}", input))
1525 }
1526
1527 #[test]
1528 fn test_test_environment_config_default() {
1529 let config = TestEnvironmentConfig::default();
1530 assert!(config.skip_time);
1531 assert!(config.checkpoint_delay.is_none());
1532 }
1533
1534 #[test]
1535 fn test_operation_storage_new() {
1536 let storage = OperationStorage::new();
1537 assert!(storage.is_empty());
1538 assert_eq!(storage.len(), 0);
1539 }
1540
1541 #[test]
1542 fn test_operation_storage_add_and_get() {
1543 let mut storage = OperationStorage::new();
1544
1545 let mut op1 = Operation::new("op-1", OperationType::Step);
1546 op1.name = Some("step1".to_string());
1547 storage.add_operation(op1);
1548
1549 let mut op2 = Operation::new("op-2", OperationType::Wait);
1550 op2.name = Some("wait1".to_string());
1551 storage.add_operation(op2);
1552
1553 assert_eq!(storage.len(), 2);
1554
1555 // Get by ID
1556 let found = storage.get_by_id("op-1");
1557 assert!(found.is_some());
1558 assert_eq!(found.unwrap().operation_id, "op-1");
1559
1560 // Get by name
1561 let found = storage.get_by_name("step1");
1562 assert!(found.is_some());
1563 assert_eq!(found.unwrap().operation_id, "op-1");
1564
1565 // Get by index
1566 let found = storage.get_by_index(1);
1567 assert!(found.is_some());
1568 assert_eq!(found.unwrap().operation_id, "op-2");
1569
1570 // Get all
1571 let all = storage.get_all();
1572 assert_eq!(all.len(), 2);
1573 }
1574
1575 #[test]
1576 fn test_operation_storage_get_by_name_and_index() {
1577 let mut storage = OperationStorage::new();
1578
1579 // Add multiple operations with the same name
1580 let mut op1 = Operation::new("op-1", OperationType::Step);
1581 op1.name = Some("step".to_string());
1582 storage.add_operation(op1);
1583
1584 let mut op2 = Operation::new("op-2", OperationType::Step);
1585 op2.name = Some("step".to_string());
1586 storage.add_operation(op2);
1587
1588 let mut op3 = Operation::new("op-3", OperationType::Step);
1589 op3.name = Some("step".to_string());
1590 storage.add_operation(op3);
1591
1592 // Get first occurrence
1593 let found = storage.get_by_name_and_index("step", 0);
1594 assert!(found.is_some());
1595 assert_eq!(found.unwrap().operation_id, "op-1");
1596
1597 // Get second occurrence
1598 let found = storage.get_by_name_and_index("step", 1);
1599 assert!(found.is_some());
1600 assert_eq!(found.unwrap().operation_id, "op-2");
1601
1602 // Get third occurrence
1603 let found = storage.get_by_name_and_index("step", 2);
1604 assert!(found.is_some());
1605 assert_eq!(found.unwrap().operation_id, "op-3");
1606
1607 // Out of bounds
1608 let found = storage.get_by_name_and_index("step", 3);
1609 assert!(found.is_none());
1610 }
1611
1612 #[test]
1613 fn test_operation_storage_clear() {
1614 let mut storage = OperationStorage::new();
1615
1616 let op = Operation::new("op-1", OperationType::Step);
1617 storage.add_operation(op);
1618 assert_eq!(storage.len(), 1);
1619
1620 storage.clear();
1621 assert!(storage.is_empty());
1622 assert!(storage.get_by_id("op-1").is_none());
1623 }
1624
1625 #[tokio::test]
1626 async fn test_local_runner_creation() {
1627 let runner = LocalDurableTestRunner::new(simple_handler);
1628 assert_eq!(runner.operation_count().await, 0);
1629 }
1630
1631 #[tokio::test]
1632 #[allow(deprecated)]
1633 async fn test_local_runner_with_mock_client() {
1634 let mock_client = MockDurableServiceClient::new().with_checkpoint_responses(10);
1635 let runner = LocalDurableTestRunner::with_mock_client(simple_handler, mock_client);
1636 assert_eq!(runner.operation_count().await, 0);
1637 }
1638
1639 #[tokio::test]
1640 async fn test_setup_teardown_environment() {
1641 // Ensure clean state
1642 LocalDurableTestRunner::<String, String>::teardown_test_environment()
1643 .await
1644 .unwrap();
1645
1646 assert!(!LocalDurableTestRunner::<String, String>::is_environment_setup());
1647 assert!(!LocalDurableTestRunner::<String, String>::is_time_skipping_enabled());
1648
1649 // Setup with time skipping
1650 LocalDurableTestRunner::<String, String>::setup_test_environment(TestEnvironmentConfig {
1651 skip_time: true,
1652 checkpoint_delay: None,
1653 })
1654 .await
1655 .unwrap();
1656
1657 assert!(LocalDurableTestRunner::<String, String>::is_environment_setup());
1658 assert!(LocalDurableTestRunner::<String, String>::is_time_skipping_enabled());
1659
1660 // Teardown
1661 LocalDurableTestRunner::<String, String>::teardown_test_environment()
1662 .await
1663 .unwrap();
1664
1665 assert!(!LocalDurableTestRunner::<String, String>::is_environment_setup());
1666 assert!(!LocalDurableTestRunner::<String, String>::is_time_skipping_enabled());
1667 }
1668
1669 #[tokio::test]
1670 async fn test_setup_without_time_skipping() {
1671 // Ensure clean state
1672 LocalDurableTestRunner::<String, String>::teardown_test_environment()
1673 .await
1674 .unwrap();
1675
1676 // Setup without time skipping
1677 LocalDurableTestRunner::<String, String>::setup_test_environment(TestEnvironmentConfig {
1678 skip_time: false,
1679 checkpoint_delay: None,
1680 })
1681 .await
1682 .unwrap();
1683
1684 assert!(LocalDurableTestRunner::<String, String>::is_environment_setup());
1685 assert!(!LocalDurableTestRunner::<String, String>::is_time_skipping_enabled());
1686
1687 // Teardown
1688 LocalDurableTestRunner::<String, String>::teardown_test_environment()
1689 .await
1690 .unwrap();
1691 }
1692
1693 #[tokio::test]
1694 async fn test_double_setup_is_idempotent() {
1695 // Ensure clean state
1696 LocalDurableTestRunner::<String, String>::teardown_test_environment()
1697 .await
1698 .unwrap();
1699
1700 // First setup
1701 LocalDurableTestRunner::<String, String>::setup_test_environment(
1702 TestEnvironmentConfig::default(),
1703 )
1704 .await
1705 .unwrap();
1706
1707 // Second setup should be idempotent
1708 LocalDurableTestRunner::<String, String>::setup_test_environment(
1709 TestEnvironmentConfig::default(),
1710 )
1711 .await
1712 .unwrap();
1713
1714 assert!(LocalDurableTestRunner::<String, String>::is_environment_setup());
1715
1716 // Teardown
1717 LocalDurableTestRunner::<String, String>::teardown_test_environment()
1718 .await
1719 .unwrap();
1720 }
1721
1722 #[tokio::test]
1723 async fn test_double_teardown_is_idempotent() {
1724 // Ensure clean state
1725 LocalDurableTestRunner::<String, String>::teardown_test_environment()
1726 .await
1727 .unwrap();
1728
1729 // Setup
1730 LocalDurableTestRunner::<String, String>::setup_test_environment(
1731 TestEnvironmentConfig::default(),
1732 )
1733 .await
1734 .unwrap();
1735
1736 // First teardown
1737 LocalDurableTestRunner::<String, String>::teardown_test_environment()
1738 .await
1739 .unwrap();
1740
1741 // Second teardown should be idempotent
1742 LocalDurableTestRunner::<String, String>::teardown_test_environment()
1743 .await
1744 .unwrap();
1745
1746 assert!(!LocalDurableTestRunner::<String, String>::is_environment_setup());
1747 }
1748
1749 // Tests for run() method - Subtask 8.3
1750
1751 #[tokio::test]
1752 async fn test_run_successful_execution() {
1753 // Ensure clean state
1754 LocalDurableTestRunner::<String, String>::teardown_test_environment()
1755 .await
1756 .unwrap();
1757
1758 let mut runner = LocalDurableTestRunner::new(simple_handler);
1759 let result = runner.run("hello".to_string()).await.unwrap();
1760
1761 // Verify successful execution status
1762 assert_eq!(result.get_status(), ExecutionStatus::Succeeded);
1763
1764 // Verify result value
1765 let output = result.get_result().unwrap();
1766 assert_eq!(output, "processed: hello");
1767
1768 // Verify invocation was recorded
1769 assert_eq!(result.get_invocations().len(), 1);
1770 }
1771
1772 #[tokio::test]
1773 async fn test_run_failed_execution() {
1774 async fn failing_handler(
1775 _input: String,
1776 _ctx: DurableContext,
1777 ) -> Result<String, DurableError> {
1778 Err(DurableError::execution("Test failure"))
1779 }
1780
1781 // Ensure clean state
1782 LocalDurableTestRunner::<String, String>::teardown_test_environment()
1783 .await
1784 .unwrap();
1785
1786 let mut runner = LocalDurableTestRunner::new(failing_handler);
1787 let result = runner.run("hello".to_string()).await.unwrap();
1788
1789 // Verify failed execution status
1790 assert_eq!(result.get_status(), ExecutionStatus::Failed);
1791
1792 // Verify error is captured
1793 let error = result.get_error().unwrap();
1794 assert!(error
1795 .error_message
1796 .as_ref()
1797 .unwrap()
1798 .contains("Test failure"));
1799
1800 // Verify invocation was recorded with error
1801 let invocations = result.get_invocations();
1802 assert_eq!(invocations.len(), 1);
1803 assert!(invocations[0].error.is_some());
1804 }
1805
1806 #[tokio::test]
1807 async fn test_run_multiple_times_clears_previous_operations() {
1808 // Ensure clean state
1809 LocalDurableTestRunner::<String, String>::teardown_test_environment()
1810 .await
1811 .unwrap();
1812
1813 let mut runner = LocalDurableTestRunner::new(simple_handler);
1814
1815 // First run
1816 let result1 = runner.run("first".to_string()).await.unwrap();
1817 assert_eq!(result1.get_status(), ExecutionStatus::Succeeded);
1818 assert_eq!(result1.get_result().unwrap(), "processed: first");
1819
1820 // Second run - should have fresh state
1821 let result2 = runner.run("second".to_string()).await.unwrap();
1822 assert_eq!(result2.get_status(), ExecutionStatus::Succeeded);
1823 assert_eq!(result2.get_result().unwrap(), "processed: second");
1824 }
1825
1826 // Tests for reset() method - Subtask 8.4
1827
1828 #[tokio::test]
1829 async fn test_reset_clears_operation_storage() {
1830 // Ensure clean state
1831 LocalDurableTestRunner::<String, String>::teardown_test_environment()
1832 .await
1833 .unwrap();
1834
1835 // Reset checkpoint worker manager for clean state
1836 CheckpointWorkerManager::reset_instance_for_testing();
1837
1838 let mut runner = LocalDurableTestRunner::new(simple_handler);
1839
1840 // Run to generate some state
1841 let _ = runner.run("hello".to_string()).await.unwrap();
1842
1843 // Reset the runner
1844 runner.reset().await;
1845
1846 // Verify operation storage is cleared
1847 assert_eq!(runner.operation_count().await, 0);
1848 }
1849
1850 #[tokio::test]
1851 async fn test_reset_allows_fresh_run() {
1852 // Ensure clean state
1853 LocalDurableTestRunner::<String, String>::teardown_test_environment()
1854 .await
1855 .unwrap();
1856
1857 let mut runner = LocalDurableTestRunner::new(simple_handler);
1858
1859 // First run
1860 let result1 = runner.run("first".to_string()).await.unwrap();
1861 assert_eq!(result1.get_result().unwrap(), "processed: first");
1862
1863 // Reset
1864 runner.reset().await;
1865
1866 // Second run after reset
1867 let result2 = runner.run("second".to_string()).await.unwrap();
1868 assert_eq!(result2.get_result().unwrap(), "processed: second");
1869 assert_eq!(result2.get_status(), ExecutionStatus::Succeeded);
1870 }
1871
1872 // Tests for operation lookup methods - Subtask 8.5
1873
1874 #[tokio::test]
1875 async fn test_get_operation_by_id() {
1876 let runner: LocalDurableTestRunner<String, String> =
1877 LocalDurableTestRunner::new(simple_handler);
1878
1879 // Manually add an operation to storage for testing
1880 {
1881 let mut storage = runner.operation_storage.write().await;
1882 let mut op = Operation::new("test-op-id", OperationType::Step);
1883 op.name = Some("test_step".to_string());
1884 storage.add_operation(op);
1885 }
1886
1887 // Test get_operation_by_id
1888 let found = runner.get_operation_by_id("test-op-id").await;
1889 assert!(found.is_some());
1890 assert_eq!(found.unwrap().operation_id, "test-op-id");
1891
1892 // Test not found
1893 let not_found = runner.get_operation_by_id("nonexistent").await;
1894 assert!(not_found.is_none());
1895 }
1896
1897 #[tokio::test]
1898 async fn test_get_operation_by_name() {
1899 let runner: LocalDurableTestRunner<String, String> =
1900 LocalDurableTestRunner::new(simple_handler);
1901
1902 // Manually add operations to storage for testing
1903 {
1904 let mut storage = runner.operation_storage.write().await;
1905 let mut op1 = Operation::new("op-1", OperationType::Step);
1906 op1.name = Some("process".to_string());
1907 storage.add_operation(op1);
1908
1909 let mut op2 = Operation::new("op-2", OperationType::Step);
1910 op2.name = Some("validate".to_string());
1911 storage.add_operation(op2);
1912 }
1913
1914 // Test get_operation (by name)
1915 let found = runner.get_operation("process").await;
1916 assert!(found.is_some());
1917 assert_eq!(found.unwrap().operation_id, "op-1");
1918
1919 // Test not found
1920 let not_found = runner.get_operation("nonexistent").await;
1921 assert!(not_found.is_none());
1922 }
1923
1924 #[tokio::test]
1925 async fn test_get_operation_by_index() {
1926 let runner: LocalDurableTestRunner<String, String> =
1927 LocalDurableTestRunner::new(simple_handler);
1928
1929 // Manually add operations to storage for testing
1930 {
1931 let mut storage = runner.operation_storage.write().await;
1932 storage.add_operation(Operation::new("op-0", OperationType::Step));
1933 storage.add_operation(Operation::new("op-1", OperationType::Wait));
1934 storage.add_operation(Operation::new("op-2", OperationType::Callback));
1935 }
1936
1937 // Test get_operation_by_index
1938 let op0 = runner.get_operation_by_index(0).await;
1939 assert!(op0.is_some());
1940 assert_eq!(op0.unwrap().operation_id, "op-0");
1941
1942 let op1 = runner.get_operation_by_index(1).await;
1943 assert!(op1.is_some());
1944 assert_eq!(op1.unwrap().operation_id, "op-1");
1945
1946 let op2 = runner.get_operation_by_index(2).await;
1947 assert!(op2.is_some());
1948 assert_eq!(op2.unwrap().operation_id, "op-2");
1949
1950 // Test out of bounds
1951 let out_of_bounds = runner.get_operation_by_index(3).await;
1952 assert!(out_of_bounds.is_none());
1953 }
1954
1955 #[tokio::test]
1956 async fn test_get_operation_by_name_and_index() {
1957 let runner: LocalDurableTestRunner<String, String> =
1958 LocalDurableTestRunner::new(simple_handler);
1959
1960 // Manually add operations with same name to storage for testing
1961 {
1962 let mut storage = runner.operation_storage.write().await;
1963 let mut op1 = Operation::new("op-1", OperationType::Step);
1964 op1.name = Some("process".to_string());
1965 storage.add_operation(op1);
1966
1967 let mut op2 = Operation::new("op-2", OperationType::Step);
1968 op2.name = Some("process".to_string());
1969 storage.add_operation(op2);
1970
1971 let mut op3 = Operation::new("op-3", OperationType::Step);
1972 op3.name = Some("process".to_string());
1973 storage.add_operation(op3);
1974 }
1975
1976 // Test get_operation_by_name_and_index
1977 let first = runner.get_operation_by_name_and_index("process", 0).await;
1978 assert!(first.is_some());
1979 assert_eq!(first.unwrap().operation_id, "op-1");
1980
1981 let second = runner.get_operation_by_name_and_index("process", 1).await;
1982 assert!(second.is_some());
1983 assert_eq!(second.unwrap().operation_id, "op-2");
1984
1985 let third = runner.get_operation_by_name_and_index("process", 2).await;
1986 assert!(third.is_some());
1987 assert_eq!(third.unwrap().operation_id, "op-3");
1988
1989 // Test out of bounds
1990 let out_of_bounds = runner.get_operation_by_name_and_index("process", 3).await;
1991 assert!(out_of_bounds.is_none());
1992 }
1993
1994 #[tokio::test]
1995 async fn test_get_all_operations() {
1996 let runner: LocalDurableTestRunner<String, String> =
1997 LocalDurableTestRunner::new(simple_handler);
1998
1999 // Manually add operations to storage for testing
2000 {
2001 let mut storage = runner.operation_storage.write().await;
2002 storage.add_operation(Operation::new("op-0", OperationType::Step));
2003 storage.add_operation(Operation::new("op-1", OperationType::Wait));
2004 storage.add_operation(Operation::new("op-2", OperationType::Callback));
2005 }
2006
2007 // Test get_all_operations
2008 let all_ops = runner.get_all_operations().await;
2009 assert_eq!(all_ops.len(), 3);
2010 assert_eq!(all_ops[0].operation_id, "op-0");
2011 assert_eq!(all_ops[1].operation_id, "op-1");
2012 assert_eq!(all_ops[2].operation_id, "op-2");
2013 }
2014
2015 // Tests for function registration - Subtask 8.6
2016
2017 #[tokio::test]
2018 async fn test_register_durable_function() {
2019 async fn helper_func(
2020 _input: serde_json::Value,
2021 _ctx: DurableContext,
2022 ) -> Result<serde_json::Value, DurableError> {
2023 Ok(serde_json::json!({"result": "ok"}))
2024 }
2025
2026 let runner: LocalDurableTestRunner<String, String> =
2027 LocalDurableTestRunner::new(simple_handler);
2028
2029 // Register a durable function
2030 runner
2031 .register_durable_function("helper", helper_func)
2032 .await;
2033
2034 // Verify it's registered
2035 assert!(runner.has_registered_function("helper").await);
2036 assert_eq!(runner.registered_function_count().await, 1);
2037 }
2038
2039 #[tokio::test]
2040 async fn test_register_regular_function() {
2041 fn simple_func(_input: serde_json::Value) -> Result<serde_json::Value, DurableError> {
2042 Ok(serde_json::json!({"result": "ok"}))
2043 }
2044
2045 let runner: LocalDurableTestRunner<String, String> =
2046 LocalDurableTestRunner::new(simple_handler);
2047
2048 // Register a regular function
2049 runner.register_function("simple", simple_func).await;
2050
2051 // Verify it's registered
2052 assert!(runner.has_registered_function("simple").await);
2053 assert_eq!(runner.registered_function_count().await, 1);
2054 }
2055
2056 #[tokio::test]
2057 async fn test_register_multiple_functions() {
2058 async fn durable_func(
2059 _input: serde_json::Value,
2060 _ctx: DurableContext,
2061 ) -> Result<serde_json::Value, DurableError> {
2062 Ok(serde_json::json!({}))
2063 }
2064
2065 fn regular_func(_input: serde_json::Value) -> Result<serde_json::Value, DurableError> {
2066 Ok(serde_json::json!({}))
2067 }
2068
2069 let runner: LocalDurableTestRunner<String, String> =
2070 LocalDurableTestRunner::new(simple_handler);
2071
2072 // Register multiple functions
2073 runner
2074 .register_durable_function("durable1", durable_func)
2075 .await;
2076 runner.register_function("regular1", regular_func).await;
2077 runner
2078 .register_durable_function("durable2", durable_func)
2079 .await;
2080
2081 // Verify all are registered
2082 assert!(runner.has_registered_function("durable1").await);
2083 assert!(runner.has_registered_function("regular1").await);
2084 assert!(runner.has_registered_function("durable2").await);
2085 assert!(!runner.has_registered_function("nonexistent").await);
2086 assert_eq!(runner.registered_function_count().await, 3);
2087 }
2088
2089 #[tokio::test]
2090 async fn test_clear_registered_functions() {
2091 fn simple_func(_input: serde_json::Value) -> Result<serde_json::Value, DurableError> {
2092 Ok(serde_json::json!({}))
2093 }
2094
2095 let mut runner: LocalDurableTestRunner<String, String> =
2096 LocalDurableTestRunner::new(simple_handler);
2097
2098 // Register functions
2099 runner.register_function("func1", simple_func).await;
2100 runner.register_function("func2", simple_func).await;
2101 assert_eq!(runner.registered_function_count().await, 2);
2102
2103 // Clear all registered functions
2104 runner.clear_registered_functions().await;
2105 assert_eq!(runner.registered_function_count().await, 0);
2106 assert!(!runner.has_registered_function("func1").await);
2107 assert!(!runner.has_registered_function("func2").await);
2108 }
2109
2110 #[tokio::test]
2111 async fn test_register_function_overwrites_existing() {
2112 fn func_v1(_input: serde_json::Value) -> Result<serde_json::Value, DurableError> {
2113 Ok(serde_json::json!({"version": 1}))
2114 }
2115
2116 fn func_v2(_input: serde_json::Value) -> Result<serde_json::Value, DurableError> {
2117 Ok(serde_json::json!({"version": 2}))
2118 }
2119
2120 let runner: LocalDurableTestRunner<String, String> =
2121 LocalDurableTestRunner::new(simple_handler);
2122
2123 // Register first version
2124 runner.register_function("func", func_v1).await;
2125 assert_eq!(runner.registered_function_count().await, 1);
2126
2127 // Register second version with same name
2128 runner.register_function("func", func_v2).await;
2129
2130 // Should still have only one function (overwritten)
2131 assert_eq!(runner.registered_function_count().await, 1);
2132 assert!(runner.has_registered_function("func").await);
2133 }
2134
2135 // Tests for run_with_orchestrator() method - Subtask 8.1.2
2136
2137 #[tokio::test]
2138 async fn test_run_with_orchestrator_successful_execution() {
2139 // Ensure clean state
2140 LocalDurableTestRunner::<String, String>::teardown_test_environment()
2141 .await
2142 .unwrap();
2143
2144 let mut runner = LocalDurableTestRunner::new(simple_handler);
2145 let result = runner
2146 .run_with_orchestrator("hello".to_string())
2147 .await
2148 .unwrap();
2149
2150 // Verify successful execution status
2151 assert_eq!(result.get_status(), ExecutionStatus::Succeeded);
2152
2153 // Verify result value
2154 let output = result.get_result().unwrap();
2155 assert_eq!(output, "processed: hello");
2156
2157 // Verify invocation was recorded
2158 assert!(!result.get_invocations().is_empty());
2159 }
2160
2161 #[tokio::test]
2162 async fn test_run_with_orchestrator_failed_execution() {
2163 async fn failing_handler(
2164 _input: String,
2165 _ctx: DurableContext,
2166 ) -> Result<String, DurableError> {
2167 Err(DurableError::execution("Test failure"))
2168 }
2169
2170 // Ensure clean state
2171 LocalDurableTestRunner::<String, String>::teardown_test_environment()
2172 .await
2173 .unwrap();
2174
2175 let mut runner = LocalDurableTestRunner::new(failing_handler);
2176 let result = runner
2177 .run_with_orchestrator("hello".to_string())
2178 .await
2179 .unwrap();
2180
2181 // Verify failed execution status
2182 assert_eq!(result.get_status(), ExecutionStatus::Failed);
2183
2184 // Verify error is captured
2185 let error = result.get_error().unwrap();
2186 assert!(error
2187 .error_message
2188 .as_ref()
2189 .unwrap()
2190 .contains("Test failure"));
2191 }
2192
2193 #[tokio::test]
2194 async fn test_run_with_orchestrator_with_time_skipping() {
2195 // Ensure clean state
2196 LocalDurableTestRunner::<String, String>::teardown_test_environment()
2197 .await
2198 .unwrap();
2199
2200 // Setup with time skipping enabled
2201 LocalDurableTestRunner::<String, String>::setup_test_environment(TestEnvironmentConfig {
2202 skip_time: true,
2203 checkpoint_delay: None,
2204 })
2205 .await
2206 .unwrap();
2207
2208 let mut runner = LocalDurableTestRunner::new(simple_handler);
2209 let result = runner
2210 .run_with_orchestrator("hello".to_string())
2211 .await
2212 .unwrap();
2213
2214 // Verify successful execution status
2215 assert_eq!(result.get_status(), ExecutionStatus::Succeeded);
2216
2217 // Teardown
2218 LocalDurableTestRunner::<String, String>::teardown_test_environment()
2219 .await
2220 .unwrap();
2221 }
2222
2223 #[tokio::test]
2224 async fn test_run_with_orchestrator_populates_nodejs_history_events() {
2225 use crate::checkpoint_server::NodeJsEventType;
2226
2227 // Ensure clean state
2228 LocalDurableTestRunner::<String, String>::teardown_test_environment()
2229 .await
2230 .unwrap();
2231
2232 let mut runner = LocalDurableTestRunner::new(simple_handler);
2233 let result = runner
2234 .run_with_orchestrator("hello".to_string())
2235 .await
2236 .unwrap();
2237
2238 // Verify successful execution status
2239 assert_eq!(result.get_status(), ExecutionStatus::Succeeded);
2240
2241 // Verify Node.js history events are populated
2242 let nodejs_events = result.get_nodejs_history_events();
2243 assert!(
2244 !nodejs_events.is_empty(),
2245 "Node.js history events should be populated"
2246 );
2247
2248 // Verify the first event is ExecutionStarted
2249 assert_eq!(
2250 nodejs_events[0].event_type,
2251 NodeJsEventType::ExecutionStarted,
2252 "First event should be ExecutionStarted"
2253 );
2254
2255 // Verify event IDs are sequential starting from 1
2256 for (i, event) in nodejs_events.iter().enumerate() {
2257 assert_eq!(
2258 event.event_id,
2259 (i + 1) as u64,
2260 "Event IDs should be sequential starting from 1"
2261 );
2262 }
2263
2264 // Verify timestamps are in ISO 8601 format
2265 for event in nodejs_events {
2266 assert!(
2267 event.event_timestamp.contains('T') && event.event_timestamp.contains('Z'),
2268 "Timestamps should be in ISO 8601 format"
2269 );
2270 }
2271 }
2272}
2273
2274/// Property-based tests for LocalDurableTestRunner
2275///
2276/// These tests verify the correctness properties defined in the design document.
2277#[cfg(test)]
2278mod property_tests {
2279 use super::*;
2280 use durable_execution_sdk::OperationType;
2281 use proptest::prelude::*;
2282
2283 /// Strategy for generating non-empty strings (for inputs)
2284 fn non_empty_string_strategy() -> impl Strategy<Value = String> {
2285 "[a-zA-Z0-9_ ]{1,32}".prop_map(|s| s)
2286 }
2287
2288 /// Strategy for generating operation names
2289 fn operation_name_strategy() -> impl Strategy<Value = String> {
2290 "[a-zA-Z_][a-zA-Z0-9_]{0,15}".prop_map(|s| s)
2291 }
2292
2293 /// Strategy for generating function names
2294 fn function_name_strategy() -> impl Strategy<Value = String> {
2295 "[a-zA-Z_][a-zA-Z0-9_]{0,15}".prop_map(|s| s)
2296 }
2297
2298 proptest! {
2299 /// **Feature: rust-testing-utilities, Property 1: Execution Status Consistency**
2300 ///
2301 /// *For any* handler execution, if the handler returns `Ok(value)`, the TestResult
2302 /// status SHALL be `Succeeded`, and if the handler returns `Err(error)`, the TestResult
2303 /// status SHALL be `Failed`.
2304 ///
2305 /// **Validates: Requirements 1.3, 1.4**
2306 #[test]
2307 fn prop_execution_status_consistency_success(input in non_empty_string_strategy()) {
2308 // Use current_thread runtime which is required for tokio::time::pause()
2309 let rt = tokio::runtime::Builder::new_current_thread()
2310 .enable_all()
2311 .build()
2312 .unwrap();
2313 rt.block_on(async {
2314 // Ensure clean state
2315 LocalDurableTestRunner::<String, String>::teardown_test_environment()
2316 .await
2317 .unwrap();
2318
2319 // Handler that always succeeds
2320 async fn success_handler(
2321 input: String,
2322 _ctx: DurableContext,
2323 ) -> Result<String, DurableError> {
2324 Ok(format!("success: {}", input))
2325 }
2326
2327 let mut runner = LocalDurableTestRunner::new(success_handler);
2328 let result = runner.run(input.clone()).await.unwrap();
2329
2330 // Property: successful handler -> Succeeded status
2331 prop_assert_eq!(result.get_status(), ExecutionStatus::Succeeded);
2332 prop_assert!(result.get_result().is_ok());
2333 let expected = format!("success: {}", input);
2334 prop_assert_eq!(result.get_result().unwrap(), &expected);
2335
2336 Ok(())
2337 })?;
2338 }
2339
2340 /// **Feature: rust-testing-utilities, Property 1: Execution Status Consistency (Failure)**
2341 ///
2342 /// *For any* handler execution that returns an error, the TestResult status SHALL be `Failed`.
2343 ///
2344 /// **Validates: Requirements 1.3, 1.4**
2345 #[test]
2346 fn prop_execution_status_consistency_failure(error_msg in non_empty_string_strategy()) {
2347 // Use current_thread runtime which is required for tokio::time::pause()
2348 let rt = tokio::runtime::Builder::new_current_thread()
2349 .enable_all()
2350 .build()
2351 .unwrap();
2352 rt.block_on(async {
2353 // Ensure clean state
2354 LocalDurableTestRunner::<String, String>::teardown_test_environment()
2355 .await
2356 .unwrap();
2357
2358 // Create a handler that fails with the given error message
2359 let error_msg_clone = error_msg.clone();
2360 let failing_handler = move |_input: String, _ctx: DurableContext| {
2361 let msg = error_msg_clone.clone();
2362 async move { Err::<String, DurableError>(DurableError::execution(msg)) }
2363 };
2364
2365 let mut runner = LocalDurableTestRunner::new(failing_handler);
2366 let result = runner.run("test".to_string()).await.unwrap();
2367
2368 // Property: failed handler -> Failed status
2369 prop_assert_eq!(result.get_status(), ExecutionStatus::Failed);
2370 prop_assert!(result.get_error().is_ok());
2371
2372 Ok(())
2373 })?;
2374 }
2375
2376 /// **Feature: rust-testing-utilities, Property 3: Reset Clears State**
2377 ///
2378 /// *For any* LocalDurableTestRunner state, after calling `reset()`, the operations
2379 /// list SHALL be empty and the runner SHALL be ready for a new execution.
2380 ///
2381 /// **Validates: Requirements 1.6**
2382 #[test]
2383 fn prop_reset_clears_state(
2384 input1 in non_empty_string_strategy(),
2385 input2 in non_empty_string_strategy()
2386 ) {
2387 // Use current_thread runtime which is required for tokio::time::pause()
2388 let rt = tokio::runtime::Builder::new_current_thread()
2389 .enable_all()
2390 .build()
2391 .unwrap();
2392 rt.block_on(async {
2393 // Ensure clean state
2394 LocalDurableTestRunner::<String, String>::teardown_test_environment()
2395 .await
2396 .unwrap();
2397
2398 // Reset checkpoint worker manager for clean state
2399 CheckpointWorkerManager::reset_instance_for_testing();
2400
2401 async fn simple_handler(
2402 input: String,
2403 _ctx: DurableContext,
2404 ) -> Result<String, DurableError> {
2405 Ok(format!("processed: {}", input))
2406 }
2407
2408 let mut runner = LocalDurableTestRunner::new(simple_handler);
2409
2410 // First run
2411 let _ = runner.run(input1).await.unwrap();
2412
2413 // Reset
2414 runner.reset().await;
2415
2416 // Property: after reset, operation count is 0
2417 prop_assert_eq!(runner.operation_count().await, 0);
2418
2419 // Property: runner is ready for new execution
2420 let result2 = runner.run(input2.clone()).await.unwrap();
2421 prop_assert_eq!(result2.get_status(), ExecutionStatus::Succeeded);
2422 let expected = format!("processed: {}", input2);
2423 prop_assert_eq!(result2.get_result().unwrap(), &expected);
2424
2425 Ok(())
2426 })?;
2427 }
2428
2429 /// **Feature: rust-testing-utilities, Property 7: Operation Lookup Consistency**
2430 ///
2431 /// *For any* operation with name N at index I in the operations list, `get_operation(N)`
2432 /// SHALL return the first operation with name N, and `get_operation_by_index(I)` SHALL
2433 /// return the same operation as direct index access.
2434 ///
2435 /// **Validates: Requirements 4.1, 4.2, 4.3, 4.4**
2436 #[test]
2437 fn prop_operation_lookup_consistency(
2438 names in prop::collection::vec(operation_name_strategy(), 1..=5)
2439 ) {
2440 let rt = tokio::runtime::Runtime::new().unwrap();
2441 rt.block_on(async {
2442 let runner: LocalDurableTestRunner<String, String> =
2443 LocalDurableTestRunner::new(|input: String, _ctx: DurableContext| async move {
2444 Ok(input)
2445 });
2446
2447 // Add operations with the given names
2448 {
2449 let mut storage = runner.operation_storage.write().await;
2450 for (i, name) in names.iter().enumerate() {
2451 let mut op = Operation::new(&format!("op-{}", i), OperationType::Step);
2452 op.name = Some(name.clone());
2453 storage.add_operation(op);
2454 }
2455 }
2456
2457 // Property: get_operation_by_index returns correct operation
2458 for (i, _name) in names.iter().enumerate() {
2459 let op = runner.get_operation_by_index(i).await;
2460 prop_assert!(op.is_some());
2461 let expected_id = format!("op-{}", i);
2462 prop_assert_eq!(&op.unwrap().operation_id, &expected_id);
2463 }
2464
2465 // Property: get_operation returns first operation with that name
2466 for name in &names {
2467 let op = runner.get_operation(name).await;
2468 prop_assert!(op.is_some());
2469 prop_assert_eq!(op.as_ref().unwrap().name.as_ref().unwrap(), name);
2470 }
2471
2472 // Property: get_operation_by_id returns correct operation
2473 for i in 0..names.len() {
2474 let op = runner.get_operation_by_id(&format!("op-{}", i)).await;
2475 prop_assert!(op.is_some());
2476 let expected_id = format!("op-{}", i);
2477 prop_assert_eq!(&op.unwrap().operation_id, &expected_id);
2478 }
2479
2480 // Property: get_all_operations returns all operations in order
2481 let all_ops = runner.get_all_operations().await;
2482 prop_assert_eq!(all_ops.len(), names.len());
2483 for (i, op) in all_ops.iter().enumerate() {
2484 let expected_id = format!("op-{}", i);
2485 prop_assert_eq!(&op.operation_id, &expected_id);
2486 }
2487
2488 Ok(())
2489 })?;
2490 }
2491
2492 /// **Feature: rust-testing-utilities, Property 10: Function Registration Retrieval**
2493 ///
2494 /// *For any* function registered with name N, the function SHALL be retrievable
2495 /// by that name.
2496 ///
2497 /// **Validates: Requirements 7.1, 7.2, 7.3**
2498 #[test]
2499 fn prop_function_registration_retrieval(
2500 func_names in prop::collection::vec(function_name_strategy(), 1..=5)
2501 ) {
2502 let rt = tokio::runtime::Runtime::new().unwrap();
2503 rt.block_on(async {
2504 let runner: LocalDurableTestRunner<String, String> =
2505 LocalDurableTestRunner::new(|input: String, _ctx: DurableContext| async move {
2506 Ok(input)
2507 });
2508
2509 // Register functions with the given names
2510 for name in &func_names {
2511 runner.register_function(name.clone(), |_input: serde_json::Value| {
2512 Ok(serde_json::json!({}))
2513 }).await;
2514 }
2515
2516 // Property: all registered functions are retrievable
2517 for name in &func_names {
2518 prop_assert!(
2519 runner.has_registered_function(name).await,
2520 "Function '{}' should be registered",
2521 name
2522 );
2523 }
2524
2525 // Property: count matches number of unique names
2526 let unique_names: std::collections::HashSet<_> = func_names.iter().collect();
2527 prop_assert_eq!(
2528 runner.registered_function_count().await,
2529 unique_names.len()
2530 );
2531
2532 // Property: non-registered functions are not found
2533 prop_assert!(!runner.has_registered_function("__nonexistent__").await);
2534
2535 Ok(())
2536 })?;
2537 }
2538
2539 /// **Feature: rust-testing-utilities, Property 2: Operation Capture Completeness**
2540 ///
2541 /// *For any* sequence of durable operations performed by a handler, all operations
2542 /// SHALL be captured in the TestResult's operations list in execution order.
2543 ///
2544 /// **Validates: Requirements 1.5, 3.5**
2545 #[test]
2546 fn prop_operation_capture_completeness(
2547 num_steps in 1usize..=5,
2548 step_values in prop::collection::vec(1i32..100, 1..=5)
2549 ) {
2550 // Use current_thread runtime which is required for tokio::time::pause()
2551 let rt = tokio::runtime::Builder::new_current_thread()
2552 .enable_all()
2553 .build()
2554 .unwrap();
2555 rt.block_on(async {
2556 // Ensure clean state
2557 LocalDurableTestRunner::<String, String>::teardown_test_environment()
2558 .await
2559 .unwrap();
2560
2561 // Create a handler that performs multiple step operations
2562 let num_steps_to_perform = num_steps.min(step_values.len());
2563 let values = step_values.clone();
2564
2565 let multi_step_handler = move |_input: String, ctx: DurableContext| {
2566 let values = values.clone();
2567 let num = num_steps_to_perform;
2568 async move {
2569 let mut results = Vec::new();
2570 for i in 0..num {
2571 let value = values.get(i).copied().unwrap_or(0);
2572 let step_name = format!("step_{}", i);
2573 // Use step_named to provide a name for each step
2574 let result = ctx.step_named(
2575 &step_name,
2576 |_| Ok(value * 2),
2577 None
2578 ).await?;
2579 results.push(result);
2580 }
2581 Ok::<String, DurableError>(format!("completed {} steps", results.len()))
2582 }
2583 };
2584
2585 let mut runner = LocalDurableTestRunner::new(multi_step_handler);
2586 let result = runner.run("test".to_string()).await.unwrap();
2587
2588 // Property: All operations should be captured
2589 // Each step creates at least one checkpoint call (START and SUCCEED)
2590 // The operations list should contain entries for each step
2591 let operations = result.get_operations();
2592
2593 // We should have captured operations for each step
2594 // Note: The exact count depends on how operations are recorded
2595 // (START + SUCCEED = 2 per step, or just final state = 1 per step)
2596 // The key property is that we have operations for all steps performed
2597 prop_assert!(
2598 !operations.is_empty() || num_steps_to_perform == 0,
2599 "Operations should be captured when steps are performed"
2600 );
2601
2602 // Verify the execution completed successfully
2603 prop_assert_eq!(result.get_status(), ExecutionStatus::Succeeded);
2604
2605 Ok(())
2606 })?;
2607 }
2608
2609 /// **Feature: rust-testing-utilities, Property 4: Time Skipping Acceleration**
2610 ///
2611 /// *For any* wait operation with duration D seconds when time skipping is enabled,
2612 /// the actual wall-clock time to complete the wait SHALL be less than D seconds.
2613 ///
2614 /// Note: This test verifies that time skipping is properly configured and that
2615 /// wait operations don't block for their full duration. Due to the nature of
2616 /// durable execution (waits suspend and resume), we verify that the test
2617 /// infrastructure properly handles time advancement.
2618 ///
2619 /// **Validates: Requirements 2.1, 2.2**
2620 #[test]
2621 fn prop_time_skipping_acceleration(
2622 wait_seconds in 5u64..=60
2623 ) {
2624 // Use current_thread runtime which is required for tokio::time::pause()
2625 let rt = tokio::runtime::Builder::new_current_thread()
2626 .enable_all()
2627 .build()
2628 .unwrap();
2629
2630 rt.block_on(async {
2631 // Ensure clean state - teardown first in case previous test left state
2632 LocalDurableTestRunner::<String, String>::teardown_test_environment()
2633 .await
2634 .unwrap();
2635
2636 LocalDurableTestRunner::<String, String>::setup_test_environment(
2637 TestEnvironmentConfig {
2638 skip_time: true,
2639 checkpoint_delay: None,
2640 }
2641 ).await.unwrap();
2642
2643 // Verify time skipping is enabled
2644 prop_assert!(
2645 LocalDurableTestRunner::<String, String>::is_time_skipping_enabled(),
2646 "Time skipping should be enabled after setup"
2647 );
2648
2649 // Create a handler that performs a wait operation
2650 let wait_duration = wait_seconds;
2651 let wait_handler = move |_input: String, ctx: DurableContext| {
2652 async move {
2653 // Perform a wait operation
2654 ctx.wait(
2655 durable_execution_sdk::Duration::from_seconds(wait_duration),
2656 Some("test_wait")
2657 ).await?;
2658 Ok::<String, DurableError>("wait completed".to_string())
2659 }
2660 };
2661
2662 let mut runner = LocalDurableTestRunner::new(wait_handler);
2663
2664 // Measure wall-clock time for the operation
2665 let start_time = std::time::Instant::now();
2666 let result = runner.run("test".to_string()).await.unwrap();
2667 let elapsed = start_time.elapsed();
2668
2669 // The execution should either:
2670 // 1. Complete quickly (if time skipping works perfectly)
2671 // 2. Return Running status (if wait suspends execution)
2672 // Either way, wall-clock time should be much less than wait_seconds
2673
2674 // Property: Wall-clock time should be significantly less than wait duration
2675 // We use a generous threshold (wait_seconds - 1) to account for test overhead
2676 // but the key is that we're not actually waiting the full duration
2677 let max_allowed_seconds = wait_seconds.saturating_sub(1).max(1);
2678 prop_assert!(
2679 elapsed.as_secs() < max_allowed_seconds,
2680 "Wall-clock time ({:?}) should be less than wait duration ({} seconds). \
2681 Time skipping should prevent actual waiting.",
2682 elapsed,
2683 wait_seconds
2684 );
2685
2686 // The result should either be Running (suspended on wait) or Succeeded
2687 // Both are valid outcomes depending on how the mock handles waits
2688 prop_assert!(
2689 result.get_status() == ExecutionStatus::Running ||
2690 result.get_status() == ExecutionStatus::Succeeded,
2691 "Execution should be Running (suspended) or Succeeded, got {:?}",
2692 result.get_status()
2693 );
2694
2695 // Teardown
2696 LocalDurableTestRunner::<String, String>::teardown_test_environment()
2697 .await
2698 .unwrap();
2699
2700 Ok(())
2701 })?;
2702 }
2703 }
2704}