AWS Durable Execution SDK Testing Utilities
Testing utilities for the AWS Durable Execution SDK for Rust. This crate provides tools to test durable functions locally and against deployed AWS Lambda functions.
Features
- LocalDurableTestRunner - Execute and test durable functions in-process with a simulated checkpoint backend
- CloudDurableTestRunner - Test against deployed Lambda functions in AWS
- MockDurableServiceClient - Mock checkpoint client for unit testing
- Time Control - Skip wait operations for faster test execution
- Operation Inspection - Inspect individual operations by name, index, or ID
Installation
Add the testing crate as a dev dependency in your Cargo.toml:
[dev-dependencies]
aws-durable-execution-sdk-testing = "0.1.0-alpha1"
Quick Start
Local Testing
use aws_durable_execution_sdk::{durable_execution, DurableContext, DurableError, Duration};
use aws_durable_execution_sdk_testing::{
LocalDurableTestRunner, TestEnvironmentConfig, ExecutionStatus,
};
/// Example workflow: runs a single step that formats the input, waits, and
/// then returns the value produced by the step.
async fn my_workflow(input: String, ctx: DurableContext) -> Result<String, DurableError> {
    // The step closure ignores its argument and yields the formatted string.
    let processed = ctx
        .step(|_| Ok(format!("processed: {}", input)), None)
        .await?;

    // Named wait operation ("delay").
    let pause = Duration::from_seconds(5);
    ctx.wait(pause, Some("delay")).await?;

    Ok(processed)
}
#[tokio::test]
async fn test_workflow() {
    // Configure the test environment before creating any runner.
    let env_config = TestEnvironmentConfig {
        skip_time: true,
        checkpoint_delay: None,
    };
    LocalDurableTestRunner::<String, String>::setup_test_environment(env_config)
        .await
        .unwrap();

    // Run the workflow in-process.
    let mut runner = LocalDurableTestRunner::new(my_workflow);
    let outcome = runner.run("hello".to_string()).await.unwrap();

    // Check the final status and the returned value.
    assert_eq!(outcome.get_status(), ExecutionStatus::Succeeded);
    assert_eq!(outcome.get_result().unwrap(), "processed: hello");

    // `my_workflow` issues one step and one wait.
    let operations = outcome.get_operations();
    assert_eq!(operations.len(), 2);

    LocalDurableTestRunner::<String, String>::teardown_test_environment()
        .await
        .unwrap();
}
Cloud Testing
use aws_durable_execution_sdk_testing::{
CloudDurableTestRunner, CloudTestRunnerConfig, ExecutionStatus,
};
use std::time::Duration;
#[tokio::test]
async fn test_deployed_workflow() {
    // Polling behaviour used while waiting for the deployed execution.
    let config = CloudTestRunnerConfig {
        poll_interval: Duration::from_millis(500),
        timeout: Duration::from_secs(60),
    };

    let mut runner = CloudDurableTestRunner::<String>::new("my-function-name")
        .await
        .unwrap()
        .with_config(config);

    let outcome = runner.run("input".to_string()).await.unwrap();
    assert_eq!(outcome.get_status(), ExecutionStatus::Succeeded);
}
API Reference
LocalDurableTestRunner
The primary component for local testing. Executes durable handlers in-process with a simulated checkpoint backend.
use aws_durable_execution_sdk_testing::{
    LocalDurableTestRunner, MockDurableServiceClient, TestEnvironmentConfig,
};

// Create a runner for a handler.
let mut runner = LocalDurableTestRunner::new(my_workflow);

// Alternatively, build the runner around a pre-configured mock checkpoint client.
// NOTE(review): `with_checkpoint_responses(10)` presumably queues ten default
// responses; confirm against the crate docs.
let mock_client = MockDurableServiceClient::new()
    .with_checkpoint_responses(10);
let mut runner = LocalDurableTestRunner::with_mock_client(my_workflow, mock_client);

// Execute the handler with an input value.
let result = runner.run(input).await?;

// Reset the runner before reusing it.
runner.reset().await;
Test Environment Setup
LocalDurableTestRunner::<I, O>::setup_test_environment(TestEnvironmentConfig {
skip_time: true, checkpoint_delay: None, }).await?;
LocalDurableTestRunner::<I, O>::teardown_test_environment().await?;
Operation Lookup
// Look up by name.
let by_name = runner.get_operation("my-step").await;

// Look up by index.
let by_index = runner.get_operation_by_index(0).await;

// Look up by name together with an index.
let by_name_and_index = runner.get_operation_by_name_and_index("my-step", 1).await;

// Look up by operation ID.
let by_id = runner.get_operation_by_id("op-123").await;

// Fetch every operation.
let all_operations = runner.get_all_operations().await;
Function Registration (for Chained Invokes)
// Register a durable function; its handler receives an input and a context.
runner
    .register_durable_function("helper", |_input, _ctx| async move {
        Ok(serde_json::json!({"result": "done"}))
    })
    .await;

// Register a plain function; its handler receives only an input.
runner
    .register_function("simple_helper", |_input| Ok(serde_json::json!({"result": "done"})))
    .await;
CloudDurableTestRunner
For integration testing against deployed Lambda functions.
use aws_durable_execution_sdk_testing::{
    CloudDurableTestRunner, CloudTestRunnerConfig,
};
use aws_sdk_lambda::Client as LambdaClient;
// Needed for the `poll_interval` / `timeout` values below.
use std::time::Duration;

// Create a runner for a deployed function by name.
let runner = CloudDurableTestRunner::<String>::new("function-name").await?;

// Or supply your own Lambda client.
// NOTE(review): `aws_config` stands for an SDK configuration loaded elsewhere.
let custom_client = LambdaClient::new(&aws_config);
let runner = CloudDurableTestRunner::<String>::with_client("function-name", custom_client);

// Configure polling. The binding is `mut` to match the Quick Start example,
// which declares the runner `mut` before calling `run`.
let mut runner = runner.with_config(CloudTestRunnerConfig {
    poll_interval: Duration::from_millis(500),
    timeout: Duration::from_secs(60),
});

// Invoke the deployed function with a payload.
let result = runner.run(payload).await?;
TestResult
Contains execution results and provides inspection methods.
use aws_durable_execution_sdk_testing::{TestResult, ExecutionStatus, PrintConfig};
use aws_durable_execution_sdk::OperationStatus;
// Execution status.
let execution_status = result.get_status();
assert_eq!(execution_status, ExecutionStatus::Succeeded);

// Result value and error; both accessors are fallible.
let output = result.get_result()?;
let failure = result.get_error()?;

// Operations, optionally filtered by status.
let operations = result.get_operations();
let succeeded_ops = result.get_operations_by_status(OperationStatus::Succeeded);
let failed_ops = result.get_operations_by_status(OperationStatus::Failed);

// Invocations and history events.
let invocation_list = result.get_invocations();
let history = result.get_history_events();

// Printing, with the default, full and minimal configurations.
result.print();
result.print_with_config(PrintConfig::all());
result.print_with_config(PrintConfig::minimal());
ExecutionStatus
/// Status of a test execution, as returned by `TestResult::get_status`.
pub enum ExecutionStatus {
    Running,
    Succeeded,
    Failed,
    Cancelled,
    TimedOut,
}
DurableOperation
Represents a single operation with type-specific inspection methods.
use aws_durable_execution_sdk_testing::{DurableOperation, WaitingOperationStatus};
use aws_durable_execution_sdk::OperationType;
// Identity, type and status.
let op_id = op.get_id();
let op_name = op.get_name();
let kind = op.get_type();
let state = op.get_status();

// Start and end timestamps.
let started_at = op.get_start_timestamp();
let ended_at = op.get_end_timestamp();

// Predicates.
if op.is_callback() {
    // Callback-specific handling goes here.
}
if op.is_completed() {
    // Handling for a completed operation goes here.
}
Type-Specific Details
if op.get_type() == OperationType::Step {
let details = op.get_step_details::<MyResultType>()?;
println!("Attempt: {:?}", details.attempt);
println!("Result: {:?}", details.result);
println!("Error: {:?}", details.error);
}
if op.get_type() == OperationType::Wait {
let details = op.get_wait_details()?;
println!("Wait seconds: {:?}", details.wait_seconds);
println!("Scheduled end: {:?}", details.scheduled_end_timestamp);
}
if op.get_type() == OperationType::Callback {
let details = op.get_callback_details::<MyResultType>()?;
println!("Callback ID: {:?}", details.callback_id);
println!("Result: {:?}", details.result);
}
if op.get_type() == OperationType::Invoke {
let details = op.get_invoke_details::<MyResultType>()?;
println!("Result: {:?}", details.result);
}
if op.get_type() == OperationType::Context {
let details = op.get_context_details::<MyResultType>()?;
println!("Result: {:?}", details.result);
}
Callback Interaction
op.send_callback_success(r#"{"status": "approved"}"#).await?;
op.send_callback_failure(&TestResultError::new("Rejected", "Request denied")).await?;
op.send_callback_heartbeat().await?;
Async Waiting
// Each call awaits the operation for the given waiting status.
// NOTE(review): presumably a test uses whichever one it needs rather than all three in sequence; confirm.
// Wait for `Started`.
op.wait_for_data(WaitingOperationStatus::Started).await?;
// Wait for `Completed`.
op.wait_for_data(WaitingOperationStatus::Completed).await?;
// Wait for `Submitted`.
op.wait_for_data(WaitingOperationStatus::Submitted).await?;
MockDurableServiceClient
A mock implementation of the checkpoint client for unit testing.
use aws_durable_execution_sdk_testing::{MockDurableServiceClient, DurableServiceClient};
use aws_durable_execution_sdk::{CheckpointResponse, Operation, OperationType, DurableError};
// A mock client with no configured responses.
let plain_client = MockDurableServiceClient::new();

// A mock client with explicit checkpoint responses, added one at a time.
let first_ok = Ok(CheckpointResponse::new("token-1"));
let second_ok = Ok(CheckpointResponse::new("token-2"));
let retriable_failure = Err(DurableError::checkpoint_retriable("Temp error"));
let scripted_client = MockDurableServiceClient::new()
    .with_checkpoint_response(first_ok)
    .with_checkpoint_response(second_ok)
    .with_checkpoint_response(retriable_failure);

// A mock client configured through the bulk helper.
let bulk_client = MockDurableServiceClient::new().with_checkpoint_responses(10);

// A mock client whose checkpoint response carries operations.
let callback_op = Operation::new("callback-1", OperationType::Callback);
let returned_operations = vec![callback_op];
let client = MockDurableServiceClient::new()
    .with_checkpoint_response_with_operations("token-1", returned_operations);

// Inspect the checkpoint calls the mock recorded.
let recorded = client.get_checkpoint_calls();
assert_eq!(recorded.len(), 1);
assert_eq!(recorded[0].checkpoint_token, "token-123");

// Clear the recorded calls.
client.clear_checkpoint_calls();
client.clear_all_calls();
Error Handling
use aws_durable_execution_sdk_testing::TestError;
// One arm per `TestError` variant; put the handling for each case in its arm.
match result {
    // Execution failed.
    Err(TestError::ExecutionFailed(cause)) => {}
    // No operation found for the given name.
    Err(TestError::OperationNotFound(op_name)) => {}
    // The operation's type differs from the requested one.
    Err(TestError::OperationTypeMismatch { expected, found }) => {}
    // A callback method was used on a non-callback operation.
    Err(TestError::NotCallbackOperation) => {}
    // No function is registered under the given name.
    Err(TestError::FunctionNotRegistered(fn_name)) => {}
    // Waiting on an operation timed out.
    Err(TestError::WaitTimeout(waited_op)) => {}
    // The execution completed before the awaited operation state was reached.
    Err(TestError::ExecutionCompletedEarly(waited_op, final_status)) => {}
    // Serialization failed.
    Err(TestError::SerializationError(cause)) => {}
    // An AWS call reported an error.
    Err(TestError::AwsError(message)) => {}
    // The test environment has not been set up.
    Err(TestError::EnvironmentNotSetUp) => {}
    // Everything else, including success.
    _ => {}
}
Testing Patterns
Testing Workflows with Steps
#[tokio::test]
async fn test_multi_step_workflow() {
LocalDurableTestRunner::<_, _>::setup_test_environment(
TestEnvironmentConfig::default()
).await.unwrap();
let mut runner = LocalDurableTestRunner::new(my_workflow);
let result = runner.run(input).await.unwrap();
let ops = result.get_operations();
let succeeded = result.get_operations_by_status(OperationStatus::Succeeded);
assert_eq!(succeeded.len(), ops.len());
if let Some(op) = runner.get_operation("process-data").await {
let details = DurableOperation::new(op).get_step_details::<MyResult>().unwrap();
assert!(details.result.is_some());
}
LocalDurableTestRunner::<_, _>::teardown_test_environment().await.unwrap();
}
Testing Callbacks
#[tokio::test]
async fn test_callback_workflow() {
LocalDurableTestRunner::<_, _>::setup_test_environment(
TestEnvironmentConfig::default()
).await.unwrap();
let mut runner = LocalDurableTestRunner::new(approval_workflow);
let result = runner.run(request).await.unwrap();
if let Some(op) = runner.get_operation("approval-callback").await {
let durable_op = DurableOperation::new(op);
durable_op.send_callback_success(r#"{"approved": true}"#).await.unwrap();
}
LocalDurableTestRunner::<_, _>::teardown_test_environment().await.unwrap();
}
Testing Error Handling
#[tokio::test]
async fn test_error_handling() {
LocalDurableTestRunner::<_, _>::setup_test_environment(
TestEnvironmentConfig::default()
).await.unwrap();
let mut runner = LocalDurableTestRunner::new(failing_workflow);
let result = runner.run(bad_input).await.unwrap();
assert_eq!(result.get_status(), ExecutionStatus::Failed);
let error = result.get_error().unwrap();
assert_eq!(error.error_type, Some("ValidationError".to_string()));
LocalDurableTestRunner::<_, _>::teardown_test_environment().await.unwrap();
}
Testing with Chained Invokes
#[tokio::test]
async fn test_chained_invokes() {
LocalDurableTestRunner::<_, _>::setup_test_environment(
TestEnvironmentConfig::default()
).await.unwrap();
let mut runner = LocalDurableTestRunner::new(orchestrator_workflow);
runner.register_durable_function("process-item", process_item_workflow).await;
runner.register_function("validate", validate_fn).await;
let result = runner.run(items).await.unwrap();
assert_eq!(result.get_status(), ExecutionStatus::Succeeded);
LocalDurableTestRunner::<_, _>::teardown_test_environment().await.unwrap();
}
License
This project is licensed under the Apache-2.0 License.