Struct LocalDurableTestRunner 

Source
pub struct LocalDurableTestRunner<I, O>
where I: DeserializeOwned + Send + 'static, O: Serialize + DeserializeOwned + Send + 'static,
{ /* private fields */ }

Local test runner for durable executions.

Executes durable handlers in-process with a checkpoint server running in a separate thread, allowing rapid iteration during development without AWS deployment.

The runner uses a CheckpointWorkerManager to manage the checkpoint server thread, matching the Node.js SDK’s architecture for consistent cross-SDK behavior.

§Type Parameters

  • I - The input type (must be deserializable; the methods in the main impl block additionally require Serialize)
  • O - The output type (must be serializable and deserializable)

§Examples

use durable_execution_sdk_testing::LocalDurableTestRunner;

async fn my_workflow(input: String, ctx: DurableContext) -> Result<String, DurableError> {
    let result = ctx.step(|_| Ok(format!("processed: {}", input)), None).await?;
    Ok(result)
}

let mut runner = LocalDurableTestRunner::new(my_workflow);
let result = runner.run("hello".to_string()).await.unwrap();
assert_eq!(result.get_result().unwrap(), "processed: hello");

Implementations§

Source§

impl<I, O> LocalDurableTestRunner<I, O>
where I: DeserializeOwned + Send + Serialize + 'static, O: Serialize + DeserializeOwned + Send + 'static,

Source

pub async fn setup_test_environment( config: TestEnvironmentConfig, ) -> Result<(), TestError>

Sets up the test environment for durable execution testing.

This method should be called once before running tests. It configures time control and other test infrastructure.

§Arguments
  • config - Configuration for the test environment
§Examples
use durable_execution_sdk_testing::{LocalDurableTestRunner, TestEnvironmentConfig};

LocalDurableTestRunner::<String, String>::setup_test_environment(TestEnvironmentConfig {
    skip_time: true,
    checkpoint_delay: None,
}).await.unwrap();
Source

pub async fn teardown_test_environment() -> Result<(), TestError>

Tears down the test environment.

This method should be called after tests complete to restore normal time behavior and clean up test infrastructure.

§Examples
use durable_execution_sdk_testing::LocalDurableTestRunner;

LocalDurableTestRunner::<String, String>::teardown_test_environment().await.unwrap();
Source

pub fn is_environment_setup() -> bool

Returns whether the test environment has been set up.

Source

pub fn is_time_skipping_enabled() -> bool

Returns whether time skipping is enabled.
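
The setup, teardown, and query functions above take no self, which suggests process-wide state. The following is a minimal std-only sketch of that idea, not the crate's implementation: the statics and the function names setup, teardown, is_setup, and is_time_skipping are hypothetical stand-ins.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

// Hypothetical process-wide flags; the real crate may store its state differently.
static ENV_SETUP: AtomicBool = AtomicBool::new(false);
static SKIP_TIME: AtomicBool = AtomicBool::new(false);

/// Records the requested time-skipping mode and marks the environment as set up.
pub fn setup(skip_time: bool) {
    SKIP_TIME.store(skip_time, Ordering::SeqCst);
    ENV_SETUP.store(true, Ordering::SeqCst);
}

/// Restores the defaults: no time skipping, environment not set up.
pub fn teardown() {
    SKIP_TIME.store(false, Ordering::SeqCst);
    ENV_SETUP.store(false, Ordering::SeqCst);
}

/// Stand-in for `is_environment_setup`.
pub fn is_setup() -> bool {
    ENV_SETUP.load(Ordering::SeqCst)
}

/// Stand-in for `is_time_skipping_enabled`.
pub fn is_time_skipping() -> bool {
    SKIP_TIME.load(Ordering::SeqCst)
}
```

Because such flags would be shared by every test in the process, pairing each setup call with a teardown keeps one test from leaking time-skipping behavior into another.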

Source

pub fn new<F, Fut>(handler: F) -> Self
where F: Fn(I, DurableContext) -> Fut + Send + Sync + 'static, Fut: Future<Output = Result<O, DurableError>> + Send + 'static,

Creates a new local test runner with the given handler.

§Arguments
  • handler - An async function that takes input and DurableContext
§Examples
use durable_execution_sdk_testing::LocalDurableTestRunner;

async fn my_workflow(input: String, ctx: DurableContext) -> Result<String, DurableError> {
    Ok(input)
}

let runner = LocalDurableTestRunner::new(my_workflow);
Source

pub fn with_mock_client<F, Fut>( handler: F, mock_client: MockDurableServiceClient, ) -> Self
where F: Fn(I, DurableContext) -> Fut + Send + Sync + 'static, Fut: Future<Output = Result<O, DurableError>> + Send + 'static,

👎Deprecated: Use new() instead. The checkpoint worker manager is now the default.

Creates a new local test runner with a custom mock client.

§Arguments
  • handler - An async function that takes input and DurableContext
  • mock_client - A pre-configured mock client (deprecated, use checkpoint_worker)
Source

pub fn with_checkpoint_params<F, Fut>( handler: F, params: CheckpointWorkerParams, ) -> Self
where F: Fn(I, DurableContext) -> Fut + Send + Sync + 'static, Fut: Future<Output = Result<O, DurableError>> + Send + 'static,

Creates a new local test runner with custom checkpoint worker parameters.

§Arguments
  • handler - An async function that takes input and DurableContext
  • params - Configuration parameters for the checkpoint worker
Source

pub fn checkpoint_worker(&self) -> &Arc<CheckpointWorkerManager>

Returns a reference to the checkpoint worker manager.

Source

pub fn mock_client(&self) -> &Arc<MockDurableServiceClient>

👎Deprecated: Use checkpoint_worker() instead.

Returns a reference to the mock client (deprecated).

Source

pub async fn operation_count(&self) -> usize

Returns the number of captured operations.

Source

pub fn run(&mut self, input: impl Into<InvokeRequest<I>>) -> RunFuture<O>
where I: Clone + Send + Sync + 'static, O: Send + 'static,

Executes the handler and returns a RunFuture that resolves to the test result.

Accepts either a raw payload or an InvokeRequest wrapper (via impl Into<InvokeRequest<I>>). The execution is spawned as a tokio task so callers can await the result concurrently with OperationHandle interactions (e.g., mid-execution callbacks).

§Arguments
  • input - A payload or InvokeRequest<I> to pass to the handler. Raw payloads are automatically wrapped via From<I> for InvokeRequest<I>.
§Examples
use durable_execution_sdk_testing::{LocalDurableTestRunner, InvokeRequest};

let mut runner = LocalDurableTestRunner::new(my_workflow);

// With a raw payload (backward compatible)
let result = runner.run("hello".to_string()).await.unwrap();

// With an InvokeRequest
let result = runner.run(InvokeRequest::with_payload("hello".to_string())).await.unwrap();

// Concurrent interaction with operation handles
let handle = runner.get_operation_handle("my-callback");
let run_future = runner.run("input".to_string());
handle.wait_for_data(WaitingOperationStatus::Submitted).await.unwrap();
handle.send_callback_success("result").await.unwrap();
let result = run_future.await.unwrap();
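
The impl Into<InvokeRequest<I>> parameter can be illustrated with a small std-only sketch. The InvokeRequest struct below is a hypothetical stand-in that models only the payload, and accept is an invented function with the same parameter shape as run; neither reflects the crate's actual definitions.

```rust
/// Hypothetical stand-in for the crate's `InvokeRequest`; only the payload is modelled.
#[derive(Debug, Clone, PartialEq)]
pub struct InvokeRequest<I> {
    pub payload: I,
}

impl<I> InvokeRequest<I> {
    /// Mirrors the `with_payload` constructor used in the example above.
    pub fn with_payload(payload: I) -> Self {
        Self { payload }
    }
}

/// Lets a bare payload be passed wherever an `InvokeRequest<I>` is expected.
impl<I> From<I> for InvokeRequest<I> {
    fn from(payload: I) -> Self {
        Self { payload }
    }
}

/// Hypothetical entry point: accepts either a raw `String` or a prepared request.
pub fn accept(input: impl Into<InvokeRequest<String>>) -> InvokeRequest<String> {
    input.into()
}
```

A raw String goes through the From<I> impl, while an existing InvokeRequest<String> goes through the standard library's reflexive From<T> for T, so both call styles resolve to the same type.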
Source

pub async fn run_single_invocation( &mut self, payload: I, ) -> Result<TestResult<O>, TestError>

Executes the handler with a single invocation (no re-invocation on suspend).

This method performs a single handler invocation without using the orchestrator. If the handler suspends (e.g., due to a wait operation), the result will have status Running and the execution will not be automatically resumed.

Use this method when you want to test the initial invocation behavior without automatic wait completion and re-invocation.

§Arguments
  • payload - The input payload to pass to the handler
§Examples
use durable_execution_sdk_testing::LocalDurableTestRunner;

let mut runner = LocalDurableTestRunner::new(my_workflow_with_wait);
let result = runner.run_single_invocation("hello".to_string()).await.unwrap();

// Handler suspended on wait, execution is still running
assert_eq!(result.get_status(), ExecutionStatus::Running);
Source

pub async fn run_with_orchestrator( &mut self, payload: I, ) -> Result<TestResult<O>, TestError>
where I: Clone,

Executes the handler with the given payload using the TestExecutionOrchestrator.

This method uses the TestExecutionOrchestrator to manage the full execution lifecycle, including:

  • Polling for checkpoint updates
  • Processing wait operations and scheduling re-invocations
  • Handling time skipping for wait operations
  • Managing callback completions

This is the recommended method for testing workflows with wait operations, as it properly handles the full execution lifecycle including re-invocations.

§Arguments
  • payload - The input payload to pass to the handler
§Examples
use durable_execution_sdk_testing::LocalDurableTestRunner;

let mut runner = LocalDurableTestRunner::new(my_workflow_with_waits);
let result = runner.run_with_orchestrator("hello".to_string()).await.unwrap();

// Wait operations are automatically completed with time skipping
assert_eq!(result.get_status(), ExecutionStatus::Succeeded);
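
The difference between a single invocation and an orchestrated run can be sketched with a toy model. Everything here is hypothetical (Outcome, handler, run_single, run_orchestrated) and uses only std; it illustrates the re-invocation loop, not how TestExecutionOrchestrator is actually built.

```rust
/// Hypothetical outcome of one handler invocation.
#[derive(Debug, Clone, PartialEq)]
pub enum Outcome {
    /// The handler finished and produced a value.
    Succeeded(String),
    /// The handler suspended on a wait and must be invoked again later.
    Running,
}

/// Toy handler: it needs `waits_needed` completed waits before it can finish.
/// `completed_waits` plays the role of checkpointed state replayed on each invocation.
pub fn handler(input: &str, completed_waits: u32, waits_needed: u32) -> Outcome {
    if completed_waits < waits_needed {
        Outcome::Running
    } else {
        Outcome::Succeeded(format!("processed: {}", input))
    }
}

/// Single invocation: report whatever the first call returns, with no re-invocation.
pub fn run_single(input: &str, waits_needed: u32) -> Outcome {
    handler(input, 0, waits_needed)
}

/// Orchestrated run: complete the pending wait immediately (time skipping) and
/// re-invoke until the handler stops suspending. Returns the outcome and the
/// number of invocations it took.
pub fn run_orchestrated(input: &str, waits_needed: u32) -> (Outcome, u32) {
    let mut completed_waits = 0;
    let mut invocations = 0;
    loop {
        invocations += 1;
        match handler(input, completed_waits, waits_needed) {
            Outcome::Running => completed_waits += 1,
            done => return (done, invocations),
        }
    }
}
```

In this model a handler with two waits reports Running after a single invocation but succeeds after three orchestrated invocations, which is the behavior the two methods above are described as exposing.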
Source

pub fn get_operation_handle(&mut self, name: &str) -> OperationHandle

Returns a lazy OperationHandle that populates when an operation matching the given name is created during execution.

§Arguments
  • name - The operation name to match against
§Examples
let handle = runner.get_operation_handle("my-callback");
// handle is unpopulated until run() executes and produces a matching operation
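
One way to picture a lazily populated handle is a shared slot that starts empty and is filled when a matching operation appears. The sketch below uses only std; LazyHandle, notify, get, and the simplified Operation struct are hypothetical names, and the real OperationHandle may work differently.

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical, simplified operation record.
#[derive(Debug, Clone, PartialEq)]
pub struct Operation {
    pub name: String,
}

/// A handle that starts empty and is filled in once a matching operation appears.
#[derive(Clone)]
pub struct LazyHandle {
    name: String,
    slot: Arc<Mutex<Option<Operation>>>,
}

impl LazyHandle {
    /// Creates an unpopulated handle that will match operations named `name`.
    pub fn new(name: &str) -> Self {
        Self {
            name: name.to_string(),
            slot: Arc::new(Mutex::new(None)),
        }
    }

    /// Called by the (hypothetical) runner whenever an operation is created.
    /// Only the first operation with a matching name is stored.
    pub fn notify(&self, op: &Operation) {
        let mut slot = self.slot.lock().unwrap();
        if slot.is_none() && op.name == self.name {
            *slot = Some(op.clone());
        }
    }

    /// Returns the matched operation, or `None` while the handle is unpopulated.
    pub fn get(&self) -> Option<Operation> {
        self.slot.lock().unwrap().clone()
    }
}
```

Sharing the slot through an Arc means a clone held by the test and a clone held by the runner observe the same state, so the test can obtain the handle before execution starts.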
Source

pub fn get_operation_handle_by_index(&mut self, index: usize) -> OperationHandle

Returns a lazy OperationHandle that populates with the operation at the given execution order index.

§Arguments
  • index - The zero-based execution order index
§Examples
let handle = runner.get_operation_handle_by_index(0);
// handle populates with the first operation created during execution
Source

pub fn get_operation_handle_by_id(&mut self, id: &str) -> OperationHandle

Returns a lazy OperationHandle that populates with the operation matching the given unique ID.

§Arguments
  • id - The unique operation ID to match against
§Examples
let handle = runner.get_operation_handle_by_id("op-abc-123");
// handle populates with the operation whose ID matches during execution
Source

pub async fn reset(&mut self)

Resets the test runner state for a fresh test run.

This method clears all captured operations and resets the checkpoint server state, allowing the runner to be reused for multiple test scenarios.

§Examples
use durable_execution_sdk_testing::LocalDurableTestRunner;

let mut runner = LocalDurableTestRunner::new(my_workflow);

// First test run
let result1 = runner.run("input1".to_string()).await.unwrap();

// Reset for fresh state
runner.reset().await;

// Second test run with clean state
let result2 = runner.run("input2".to_string()).await.unwrap();
Source

pub async fn get_operation_by_id(&self, id: &str) -> Option<Operation>

Gets an operation by its unique ID.

§Arguments
  • id - The unique operation ID
§Returns

The operation if found, or None if no operation with that ID exists.

§Examples
use durable_execution_sdk_testing::LocalDurableTestRunner;

let mut runner = LocalDurableTestRunner::new(my_workflow);
let _ = runner.run("input".to_string()).await.unwrap();

if let Some(op) = runner.get_operation_by_id("op-123").await {
    println!("Found operation: {:?}", op);
}
Source

pub async fn get_operation(&self, name: &str) -> Option<Operation>

Gets the first operation with the given name.

§Arguments
  • name - The operation name to search for
§Returns

The first operation with that name, or None if no operation with that name exists.

§Examples
use durable_execution_sdk_testing::LocalDurableTestRunner;

let mut runner = LocalDurableTestRunner::new(my_workflow);
let _ = runner.run("input".to_string()).await.unwrap();

if let Some(op) = runner.get_operation("process_data").await {
    println!("Found operation: {:?}", op);
}
Source

pub async fn get_operation_by_index(&self, index: usize) -> Option<Operation>

Gets an operation by its index in the execution order.

§Arguments
  • index - The zero-based index of the operation
§Returns

The operation at that index, or None if the index is out of bounds.

§Examples
use durable_execution_sdk_testing::LocalDurableTestRunner;

let mut runner = LocalDurableTestRunner::new(my_workflow);
let _ = runner.run("input".to_string()).await.unwrap();

// Get the first operation
if let Some(op) = runner.get_operation_by_index(0).await {
    println!("First operation: {:?}", op);
}
Source

pub async fn get_operation_by_name_and_index( &self, name: &str, index: usize, ) -> Option<Operation>

Gets an operation by name and occurrence index.

This is useful when multiple operations have the same name and you need to access a specific occurrence.

§Arguments
  • name - The operation name to search for
  • index - The zero-based index among operations with that name
§Returns

The operation at that name/index combination, or None if not found.

§Examples
use durable_execution_sdk_testing::LocalDurableTestRunner;

let mut runner = LocalDurableTestRunner::new(my_workflow);
let _ = runner.run("input".to_string()).await.unwrap();

// Get the second "process" operation
if let Some(op) = runner.get_operation_by_name_and_index("process", 1).await {
    println!("Second process operation: {:?}", op);
}
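
The name-and-occurrence lookup semantics can be sketched over a plain slice. RecordedOp and by_name_and_index are hypothetical names used for illustration only; the sketch assumes operations are stored in execution order, as the documentation above states for get_all_operations.

```rust
/// Hypothetical, simplified operation record.
#[derive(Debug, Clone, PartialEq)]
pub struct RecordedOp {
    pub id: String,
    pub name: String,
}

/// Returns the `index`-th (zero-based) operation among those named `name`,
/// preserving execution order, or `None` if there are not enough matches.
pub fn by_name_and_index<'a>(
    ops: &'a [RecordedOp],
    name: &str,
    index: usize,
) -> Option<&'a RecordedOp> {
    ops.iter().filter(|op| op.name == name).nth(index)
}
```

Note that the index counts only operations with the requested name, so index 1 means the second match and not the second operation overall.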
Source

pub async fn get_all_operations(&self) -> Vec<Operation>

Gets all captured operations.

§Returns

A vector of all operations in execution order.

§Examples
use durable_execution_sdk_testing::LocalDurableTestRunner;

let mut runner = LocalDurableTestRunner::new(my_workflow);
let _ = runner.run("input".to_string()).await.unwrap();

let all_ops = runner.get_all_operations().await;
println!("Total operations: {}", all_ops.len());
Source

pub async fn register_durable_function<F, Fut>( &self, name: impl Into<String>, func: F, )
where F: Fn(Value, DurableContext) -> Fut + Send + Sync + 'static, Fut: Future<Output = Result<Value, DurableError>> + Send + 'static,

Registers a durable function for invoke testing.

Durable functions receive a DurableContext and can perform durable operations. When the main handler invokes a function by name, the registered function will be called.

§Arguments
  • name - The name to register the function under
  • func - The durable function to register
§Examples
use durable_execution_sdk_testing::LocalDurableTestRunner;

async fn helper_workflow(_input: serde_json::Value, _ctx: DurableContext) -> Result<serde_json::Value, DurableError> {
    Ok(serde_json::json!({"processed": true}))
}

// register_durable_function takes &self, so the runner need not be mutable
let runner = LocalDurableTestRunner::new(main_workflow);
runner.register_durable_function("helper", helper_workflow).await;
Source

pub async fn register_function<F>(&self, name: impl Into<String>, func: F)
where F: Fn(Value) -> Result<Value, DurableError> + Send + Sync + 'static,

Registers a regular (non-durable) function for invoke testing.

Regular functions do not receive a DurableContext and cannot perform durable operations. They are useful for testing simple helper functions.

§Arguments
  • name - The name to register the function under
  • func - The regular function to register
§Examples
use durable_execution_sdk_testing::LocalDurableTestRunner;

fn simple_helper(_input: serde_json::Value) -> Result<serde_json::Value, DurableError> {
    Ok(serde_json::json!({"result": "done"}))
}

// register_function takes &self, so the runner need not be mutable
let runner = LocalDurableTestRunner::new(main_workflow);
runner.register_function("simple_helper", simple_helper).await;
Source

pub async fn has_registered_function(&self, name: &str) -> bool

Checks whether a function is registered under the given name.

§Arguments
  • name - The name of the function to look up
§Returns

true if a function with that name is registered, false otherwise.

§Examples
use durable_execution_sdk_testing::LocalDurableTestRunner;

let runner = LocalDurableTestRunner::new(main_workflow);
runner.register_function("helper", |_| Ok(serde_json::json!({}))).await;

assert!(runner.has_registered_function("helper").await);
assert!(!runner.has_registered_function("nonexistent").await);
Source

pub async fn registered_function_count(&self) -> usize

Gets the count of registered functions.

§Returns

The number of registered functions.

Source

pub async fn clear_registered_functions(&mut self)

Clears all registered functions.

§Examples
use durable_execution_sdk_testing::LocalDurableTestRunner;

let mut runner = LocalDurableTestRunner::new(main_workflow);
runner.register_function("helper", |_| Ok(serde_json::json!({}))).await;
assert_eq!(runner.registered_function_count().await, 1);

runner.clear_registered_functions().await;
assert_eq!(runner.registered_function_count().await, 0);
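
The register, query, count, and clear operations above behave like a name-keyed registry. The following std-only sketch shows that shape under stated assumptions: FunctionRegistry and its methods are hypothetical, values are plain String instead of serde_json::Value, and the methods are synchronous and take &mut self where the real ones are async and mostly take &self.

```rust
use std::collections::HashMap;

/// Boxed plain function over strings (a stand-in for `Value -> Result<Value, DurableError>`).
type Handler = Box<dyn Fn(String) -> Result<String, String> + Send + Sync>;

/// Hypothetical registry mapping a name to a callable.
#[derive(Default)]
pub struct FunctionRegistry {
    functions: HashMap<String, Handler>,
}

impl FunctionRegistry {
    /// Registers `func` under `name`, replacing any earlier entry with that name.
    pub fn register<F>(&mut self, name: impl Into<String>, func: F)
    where
        F: Fn(String) -> Result<String, String> + Send + Sync + 'static,
    {
        self.functions.insert(name.into(), Box::new(func));
    }

    /// Returns true if a function is registered under `name`.
    pub fn has(&self, name: &str) -> bool {
        self.functions.contains_key(name)
    }

    /// Returns the number of registered functions.
    pub fn count(&self) -> usize {
        self.functions.len()
    }

    /// Removes every registered function.
    pub fn clear(&mut self) {
        self.functions.clear();
    }

    /// Calls the function registered under `name`, or returns `None` if absent.
    pub fn invoke(&self, name: &str, input: String) -> Option<Result<String, String>> {
        self.functions.get(name).map(|f| f(input))
    }
}
```

Keying by name mirrors how the main handler is described as invoking a function by name, with the registry resolving that name to the registered callable.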

Trait Implementations§

Source§

impl<I, O> Debug for LocalDurableTestRunner<I, O>
where I: DeserializeOwned + Send + 'static, O: Serialize + DeserializeOwned + Send + 'static,

Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter.

Auto Trait Implementations§

§

impl<I, O> Freeze for LocalDurableTestRunner<I, O>

§

impl<I, O> !RefUnwindSafe for LocalDurableTestRunner<I, O>

§

impl<I, O> Send for LocalDurableTestRunner<I, O>

§

impl<I, O> Sync for LocalDurableTestRunner<I, O>
where I: Sync, O: Sync,

§

impl<I, O> Unpin for LocalDurableTestRunner<I, O>
where I: Unpin, O: Unpin,

§

impl<I, O> UnsafeUnpin for LocalDurableTestRunner<I, O>

§

impl<I, O> !UnwindSafe for LocalDurableTestRunner<I, O>

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self.
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value.
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> IntoEither for T

Source§

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.
Source§

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.
Source§

impl<Unshared, Shared> IntoShared<Shared> for Unshared
where Shared: FromUnshared<Unshared>,

Source§

fn into_shared(self) -> Shared

Creates a shared type from an unshared type.
Source§

impl<T> PolicyExt for T
where T: ?Sized,

Source§

fn and<P, B, E>(self, other: P) -> And<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow only if self and other return Action::Follow.
Source§

fn or<P, B, E>(self, other: P) -> Or<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow if either self or other returns Action::Follow.
Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.