pub struct LocalDurableTestRunner<I, O>{ /* private fields */ }Expand description
Local test runner for durable executions.
Executes durable handlers in-process with a checkpoint server running in a separate thread, allowing rapid iteration during development without AWS deployment.
The runner uses a CheckpointWorkerManager to manage the checkpoint server thread, matching the Node.js SDK’s architecture for consistent cross-SDK behavior.
§Type Parameters
H- The handler function typeI- The input type (must be deserializable)O- The output type (must be serializable)
§Examples
use durable_execution_sdk_testing::LocalDurableTestRunner;
async fn my_workflow(input: String, ctx: DurableContext) -> Result<String, DurableError> {
let result = ctx.step(|_| Ok(format!("processed: {}", input)), None).await?;
Ok(result)
}
let mut runner = LocalDurableTestRunner::new(my_workflow);
let result = runner.run("hello".to_string()).await.unwrap();
assert_eq!(result.get_result().unwrap(), "processed: hello");Implementations§
Source§impl<I, O> LocalDurableTestRunner<I, O>where
I: DeserializeOwned + Send + Serialize + 'static,
O: Serialize + DeserializeOwned + Send + 'static,
impl<I, O> LocalDurableTestRunner<I, O>where
I: DeserializeOwned + Send + Serialize + 'static,
O: Serialize + DeserializeOwned + Send + 'static,
Sourcepub async fn setup_test_environment(
config: TestEnvironmentConfig,
) -> Result<(), TestError>
pub async fn setup_test_environment( config: TestEnvironmentConfig, ) -> Result<(), TestError>
Sets up the test environment for durable execution testing.
This method should be called once before running tests. It configures time control and other test infrastructure.
§Arguments
config- Configuration for the test environment
§Examples
use durable_execution_sdk_testing::{LocalDurableTestRunner, TestEnvironmentConfig};
LocalDurableTestRunner::<String, String>::setup_test_environment(TestEnvironmentConfig {
skip_time: true,
checkpoint_delay: None,
}).await.unwrap();Sourcepub async fn teardown_test_environment() -> Result<(), TestError>
pub async fn teardown_test_environment() -> Result<(), TestError>
Tears down the test environment.
This method should be called after tests complete to restore normal time behavior and clean up test infrastructure.
§Examples
use durable_execution_sdk_testing::LocalDurableTestRunner;
LocalDurableTestRunner::<String, String>::teardown_test_environment().await.unwrap();Sourcepub fn is_environment_setup() -> bool
pub fn is_environment_setup() -> bool
Returns whether the test environment has been set up.
Sourcepub fn is_time_skipping_enabled() -> bool
pub fn is_time_skipping_enabled() -> bool
Returns whether time skipping is enabled.
Sourcepub fn new<F, Fut>(handler: F) -> Selfwhere
F: Fn(I, DurableContext) -> Fut + Send + Sync + 'static,
Fut: Future<Output = Result<O, DurableError>> + Send + 'static,
pub fn new<F, Fut>(handler: F) -> Selfwhere
F: Fn(I, DurableContext) -> Fut + Send + Sync + 'static,
Fut: Future<Output = Result<O, DurableError>> + Send + 'static,
Creates a new local test runner with the given handler.
§Arguments
handler- An async function that takes input and DurableContext
§Examples
use durable_execution_sdk_testing::LocalDurableTestRunner;
async fn my_workflow(input: String, ctx: DurableContext) -> Result<String, DurableError> {
Ok(input)
}
let runner = LocalDurableTestRunner::new(my_workflow);Sourcepub fn with_mock_client<F, Fut>(
handler: F,
mock_client: MockDurableServiceClient,
) -> Selfwhere
F: Fn(I, DurableContext) -> Fut + Send + Sync + 'static,
Fut: Future<Output = Result<O, DurableError>> + Send + 'static,
👎Deprecated: Use new() instead. The checkpoint worker manager is now the default.
pub fn with_mock_client<F, Fut>(
handler: F,
mock_client: MockDurableServiceClient,
) -> Selfwhere
F: Fn(I, DurableContext) -> Fut + Send + Sync + 'static,
Fut: Future<Output = Result<O, DurableError>> + Send + 'static,
Creates a new local test runner with a custom mock client.
§Arguments
handler- An async function that takes input and DurableContextmock_client- A pre-configured mock client (deprecated, use checkpoint_worker)
Sourcepub fn with_checkpoint_params<F, Fut>(
handler: F,
params: CheckpointWorkerParams,
) -> Selfwhere
F: Fn(I, DurableContext) -> Fut + Send + Sync + 'static,
Fut: Future<Output = Result<O, DurableError>> + Send + 'static,
pub fn with_checkpoint_params<F, Fut>(
handler: F,
params: CheckpointWorkerParams,
) -> Selfwhere
F: Fn(I, DurableContext) -> Fut + Send + Sync + 'static,
Fut: Future<Output = Result<O, DurableError>> + Send + 'static,
Creates a new local test runner with custom checkpoint worker parameters.
§Arguments
handler- An async function that takes input and DurableContextparams- Configuration parameters for the checkpoint worker
Sourcepub fn checkpoint_worker(&self) -> &Arc<CheckpointWorkerManager>
pub fn checkpoint_worker(&self) -> &Arc<CheckpointWorkerManager>
Returns a reference to the checkpoint worker manager.
Sourcepub fn mock_client(&self) -> &Arc<MockDurableServiceClient>
👎Deprecated: Use checkpoint_worker() instead.
pub fn mock_client(&self) -> &Arc<MockDurableServiceClient>
Returns a reference to the mock client (deprecated).
Sourcepub async fn operation_count(&self) -> usize
pub async fn operation_count(&self) -> usize
Returns the number of captured operations.
Sourcepub fn run(&mut self, input: impl Into<InvokeRequest<I>>) -> RunFuture<O> ⓘ
pub fn run(&mut self, input: impl Into<InvokeRequest<I>>) -> RunFuture<O> ⓘ
Executes the handler and returns a RunFuture that resolves to the test result.
Accepts either a raw payload or an InvokeRequest wrapper (via impl Into<InvokeRequest<I>>).
The execution is spawned as a tokio task so callers can await the result
concurrently with OperationHandle interactions (e.g., mid-execution callbacks).
§Arguments
input- A payload orInvokeRequest<I>to pass to the handler. Raw payloads are automatically wrapped viaFrom<I> for InvokeRequest<I>.
§Examples
use durable_execution_sdk_testing::{LocalDurableTestRunner, InvokeRequest};
let mut runner = LocalDurableTestRunner::new(my_workflow);
// With a raw payload (backward compatible)
let result = runner.run("hello".to_string()).await.unwrap();
// With an InvokeRequest
let result = runner.run(InvokeRequest::with_payload("hello".to_string())).await.unwrap();
// Concurrent interaction with operation handles
let handle = runner.get_operation_handle("my-callback");
let run_future = runner.run("input".to_string());
handle.wait_for_data(WaitingOperationStatus::Submitted).await.unwrap();
handle.send_callback_success("result").await.unwrap();
let result = run_future.await.unwrap();Sourcepub async fn run_single_invocation(
&mut self,
payload: I,
) -> Result<TestResult<O>, TestError>
pub async fn run_single_invocation( &mut self, payload: I, ) -> Result<TestResult<O>, TestError>
Executes the handler with a single invocation (no re-invocation on suspend).
This method performs a single handler invocation without using the orchestrator.
If the handler suspends (e.g., due to a wait operation), the result will have
status Running and the execution will not be automatically resumed.
Use this method when you want to test the initial invocation behavior without automatic wait completion and re-invocation.
§Arguments
payload- The input payload to pass to the handler
§Examples
use durable_execution_sdk_testing::LocalDurableTestRunner;
let mut runner = LocalDurableTestRunner::new(my_workflow_with_wait);
let result = runner.run_single_invocation("hello".to_string()).await.unwrap();
// Handler suspended on wait, execution is still running
assert_eq!(result.get_status(), ExecutionStatus::Running);Sourcepub async fn run_with_orchestrator(
&mut self,
payload: I,
) -> Result<TestResult<O>, TestError>where
I: Clone,
pub async fn run_with_orchestrator(
&mut self,
payload: I,
) -> Result<TestResult<O>, TestError>where
I: Clone,
Executes the handler with the given payload using the TestExecutionOrchestrator.
This method uses the TestExecutionOrchestrator to manage the full execution lifecycle, including:
- Polling for checkpoint updates
- Processing wait operations and scheduling re-invocations
- Handling time skipping for wait operations
- Managing callback completions
This is the recommended method for testing workflows with wait operations, as it properly handles the full execution lifecycle including re-invocations.
§Arguments
payload- The input payload to pass to the handler
§Examples
use durable_execution_sdk_testing::LocalDurableTestRunner;
let mut runner = LocalDurableTestRunner::new(my_workflow_with_waits);
let result = runner.run_with_orchestrator("hello".to_string()).await.unwrap();
// Wait operations are automatically completed with time skipping
assert_eq!(result.get_status(), ExecutionStatus::Succeeded);Sourcepub fn get_operation_handle(&mut self, name: &str) -> OperationHandle
pub fn get_operation_handle(&mut self, name: &str) -> OperationHandle
Returns a lazy OperationHandle that populates when an operation
matching the given name is created during execution.
§Arguments
name- The operation name to match against
§Examples
let handle = runner.get_operation_handle("my-callback");
// handle is unpopulated until run() executes and produces a matching operationSourcepub fn get_operation_handle_by_index(&mut self, index: usize) -> OperationHandle
pub fn get_operation_handle_by_index(&mut self, index: usize) -> OperationHandle
Sourcepub fn get_operation_handle_by_id(&mut self, id: &str) -> OperationHandle
pub fn get_operation_handle_by_id(&mut self, id: &str) -> OperationHandle
Sourcepub async fn reset(&mut self)
pub async fn reset(&mut self)
Resets the test runner state for a fresh test run.
This method clears all captured operations and resets the checkpoint server state, allowing the runner to be reused for multiple test scenarios.
§Examples
use durable_execution_sdk_testing::LocalDurableTestRunner;
let mut runner = LocalDurableTestRunner::new(my_workflow);
// First test run
let result1 = runner.run("input1".to_string()).await.unwrap();
// Reset for fresh state
runner.reset().await;
// Second test run with clean state
let result2 = runner.run("input2".to_string()).await.unwrap();Sourcepub async fn get_operation_by_id(&self, id: &str) -> Option<Operation>
pub async fn get_operation_by_id(&self, id: &str) -> Option<Operation>
Gets an operation by its unique ID.
§Arguments
id- The unique operation ID
§Returns
The operation if found, or None if no operation with that ID exists.
§Examples
use durable_execution_sdk_testing::LocalDurableTestRunner;
let mut runner = LocalDurableTestRunner::new(my_workflow);
let _ = runner.run("input".to_string()).await.unwrap();
if let Some(op) = runner.get_operation_by_id("op-123").await {
println!("Found operation: {:?}", op);
}Sourcepub async fn get_operation(&self, name: &str) -> Option<Operation>
pub async fn get_operation(&self, name: &str) -> Option<Operation>
Gets the first operation with the given name.
§Arguments
name- The operation name to search for
§Returns
The first operation with that name, or None if no operation with that name exists.
§Examples
use durable_execution_sdk_testing::LocalDurableTestRunner;
let mut runner = LocalDurableTestRunner::new(my_workflow);
let _ = runner.run("input".to_string()).await.unwrap();
if let Some(op) = runner.get_operation("process_data").await {
println!("Found operation: {:?}", op);
}Sourcepub async fn get_operation_by_index(&self, index: usize) -> Option<Operation>
pub async fn get_operation_by_index(&self, index: usize) -> Option<Operation>
Gets an operation by its index in the execution order.
§Arguments
index- The zero-based index of the operation
§Returns
The operation at that index, or None if the index is out of bounds.
§Examples
use durable_execution_sdk_testing::LocalDurableTestRunner;
let mut runner = LocalDurableTestRunner::new(my_workflow);
let _ = runner.run("input".to_string()).await.unwrap();
// Get the first operation
if let Some(op) = runner.get_operation_by_index(0).await {
println!("First operation: {:?}", op);
}Sourcepub async fn get_operation_by_name_and_index(
&self,
name: &str,
index: usize,
) -> Option<Operation>
pub async fn get_operation_by_name_and_index( &self, name: &str, index: usize, ) -> Option<Operation>
Gets an operation by name and occurrence index.
This is useful when multiple operations have the same name and you need to access a specific occurrence.
§Arguments
name- The operation name to search forindex- The zero-based index among operations with that name
§Returns
The operation at that name/index combination, or None if not found.
§Examples
use durable_execution_sdk_testing::LocalDurableTestRunner;
let mut runner = LocalDurableTestRunner::new(my_workflow);
let _ = runner.run("input".to_string()).await.unwrap();
// Get the second "process" operation
if let Some(op) = runner.get_operation_by_name_and_index("process", 1).await {
println!("Second process operation: {:?}", op);
}Sourcepub async fn get_all_operations(&self) -> Vec<Operation>
pub async fn get_all_operations(&self) -> Vec<Operation>
Gets all captured operations.
§Returns
A vector of all operations in execution order.
§Examples
use durable_execution_sdk_testing::LocalDurableTestRunner;
let mut runner = LocalDurableTestRunner::new(my_workflow);
let _ = runner.run("input".to_string()).await.unwrap();
let all_ops = runner.get_all_operations().await;
println!("Total operations: {}", all_ops.len());Sourcepub async fn register_durable_function<F, Fut>(
&self,
name: impl Into<String>,
func: F,
)where
F: Fn(Value, DurableContext) -> Fut + Send + Sync + 'static,
Fut: Future<Output = Result<Value, DurableError>> + Send + 'static,
pub async fn register_durable_function<F, Fut>(
&self,
name: impl Into<String>,
func: F,
)where
F: Fn(Value, DurableContext) -> Fut + Send + Sync + 'static,
Fut: Future<Output = Result<Value, DurableError>> + Send + 'static,
Registers a durable function for invoke testing.
Durable functions receive a DurableContext and can perform durable operations.
When the main handler invokes a function by name, the registered function
will be called.
§Arguments
name- The name to register the function underfunc- The durable function to register
§Examples
use durable_execution_sdk_testing::LocalDurableTestRunner;
async fn helper_workflow(input: serde_json::Value, ctx: DurableContext) -> Result<serde_json::Value, DurableError> {
Ok(serde_json::json!({"processed": true}))
}
let mut runner = LocalDurableTestRunner::new(main_workflow);
runner.register_durable_function("helper", helper_workflow).await;Sourcepub async fn register_function<F>(&self, name: impl Into<String>, func: F)
pub async fn register_function<F>(&self, name: impl Into<String>, func: F)
Registers a regular (non-durable) function for invoke testing.
Regular functions do not receive a DurableContext and cannot perform
durable operations. They are useful for testing simple helper functions.
§Arguments
name- The name to register the function underfunc- The regular function to register
§Examples
use durable_execution_sdk_testing::LocalDurableTestRunner;
fn simple_helper(input: serde_json::Value) -> Result<serde_json::Value, DurableError> {
Ok(serde_json::json!({"result": "done"}))
}
let mut runner = LocalDurableTestRunner::new(main_workflow);
runner.register_function("simple_helper", simple_helper).await;Sourcepub async fn has_registered_function(&self, name: &str) -> bool
pub async fn has_registered_function(&self, name: &str) -> bool
Gets a registered function by name.
§Arguments
name- The name of the function to retrieve
§Returns
true if a function with that name is registered, false otherwise.
§Examples
use durable_execution_sdk_testing::LocalDurableTestRunner;
let runner = LocalDurableTestRunner::new(main_workflow);
runner.register_function("helper", |_| Ok(serde_json::json!({}))).await;
assert!(runner.has_registered_function("helper").await);
assert!(!runner.has_registered_function("nonexistent").await);Sourcepub async fn registered_function_count(&self) -> usize
pub async fn registered_function_count(&self) -> usize
Sourcepub async fn clear_registered_functions(&mut self)
pub async fn clear_registered_functions(&mut self)
Clears all registered functions.
§Examples
use durable_execution_sdk_testing::LocalDurableTestRunner;
let mut runner = LocalDurableTestRunner::new(main_workflow);
runner.register_function("helper", |_| Ok(serde_json::json!({}))).await;
assert_eq!(runner.registered_function_count().await, 1);
runner.clear_registered_functions().await;
assert_eq!(runner.registered_function_count().await, 0);Trait Implementations§
Source§impl<I, O> Debug for LocalDurableTestRunner<I, O>
impl<I, O> Debug for LocalDurableTestRunner<I, O>
Auto Trait Implementations§
impl<I, O> Freeze for LocalDurableTestRunner<I, O>
impl<I, O> !RefUnwindSafe for LocalDurableTestRunner<I, O>
impl<I, O> Send for LocalDurableTestRunner<I, O>
impl<I, O> Sync for LocalDurableTestRunner<I, O>
impl<I, O> Unpin for LocalDurableTestRunner<I, O>
impl<I, O> UnsafeUnpin for LocalDurableTestRunner<I, O>
impl<I, O> !UnwindSafe for LocalDurableTestRunner<I, O>
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Source§impl<T> Instrument for T
impl<T> Instrument for T
Source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
Source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> IntoEither for T
impl<T> IntoEither for T
Source§fn into_either(self, into_left: bool) -> Either<Self, Self>
fn into_either(self, into_left: bool) -> Either<Self, Self>
self into a Left variant of Either<Self, Self>
if into_left is true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read moreSource§fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
self into a Left variant of Either<Self, Self>
if into_left(&self) returns true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read more