pub struct Simulator { /* private fields */ }
A running Lambda runtime simulator.
The simulator provides an HTTP server that implements both the Lambda Runtime API and Extensions API, allowing Lambda runtimes and extensions to be tested locally.
Implementations§
impl Simulator
pub fn builder() -> SimulatorBuilder
Creates a new simulator builder.
§Examples
use lambda_simulator::Simulator;
let simulator = Simulator::builder()
    .function_name("my-function")
    .build()
    .await?;
pub fn runtime_api_url(&self) -> String
Returns the base URL for the Runtime API.
This should be set as the AWS_LAMBDA_RUNTIME_API environment variable
for Lambda runtimes.
§Examples
use lambda_simulator::Simulator;
let simulator = Simulator::builder().build().await?;
let runtime_api_url = simulator.runtime_api_url();
println!("Set AWS_LAMBDA_RUNTIME_API={}", runtime_api_url);
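Other examples on this page strip the `http://` scheme before exporting the value, and the environment variable list describes AWS_LAMBDA_RUNTIME_API as host:port. The following is a minimal sketch of that conversion, assuming the base URL has the form `http://host:port`. The helper name `runtime_api_host_port` and the address used are hypothetical and not part of the crate.

```rust
/// Hypothetical helper (not part of lambda_simulator): reduces a base URL
/// such as `http://host:port` to the bare `host:port` form.
fn runtime_api_host_port(base_url: &str) -> &str {
    base_url
        .strip_prefix("http://")
        .unwrap_or(base_url)
        .trim_end_matches('/')
}

fn main() {
    // Placeholder address for illustration; not a simulator default.
    let value = runtime_api_host_port("http://127.0.0.1:9001");
    println!("AWS_LAMBDA_RUNTIME_API={value}");
}
```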
pub fn addr(&self) -> SocketAddr
Returns the socket address the simulator is listening on.
pub async fn enqueue(&self, invocation: Invocation) -> String
Enqueues an invocation for processing.
If process freezing is enabled and the process is currently frozen, this method will unfreeze (SIGCONT) the process before enqueuing the invocation.
§Arguments
- invocation - The invocation to enqueue
§Returns
The request ID of the enqueued invocation.
§Examples
use lambda_simulator::{Simulator, InvocationBuilder};
use serde_json::json;
let simulator = Simulator::builder().build().await?;
let invocation = InvocationBuilder::new()
    .payload(json!({"key": "value"}))
    .build()?;
let request_id = simulator.enqueue(invocation).await;
pub async fn enqueue_payload(&self, payload: Value) -> String
Enqueues a simple invocation with just a payload.
§Arguments
- payload - The JSON payload for the invocation
§Returns
The request ID of the enqueued invocation.
§Examples
use lambda_simulator::Simulator;
use serde_json::json;
let simulator = Simulator::builder().build().await?;
let request_id = simulator.enqueue_payload(json!({"key": "value"})).await;
pub async fn get_invocation_state( &self, request_id: &str, ) -> Option<InvocationState>
pub async fn get_all_invocation_states(&self) -> Vec<InvocationState>
Gets all invocation states.
pub async fn is_initialized(&self) -> bool
Checks if the runtime has been initialized.
pub async fn get_init_error(&self) -> Option<String>
Gets the initialization error if one occurred.
pub async fn get_registered_extensions(&self) -> Vec<RegisteredExtension>
Gets all registered extensions.
§Returns
A list of all extensions that have registered with the simulator.
pub async fn extension_count(&self) -> usize
Gets the number of registered extensions.
pub async fn shutdown(self)
Shuts down the simulator immediately without waiting for extensions.
This will unfreeze any frozen process, abort the HTTP server, clean up background telemetry tasks, and wait for everything to finish.
For graceful shutdown that allows extensions to complete cleanup work,
use graceful_shutdown instead.
pub async fn graceful_shutdown(self, reason: ShutdownReason)
Gracefully shuts down the simulator, allowing extensions to complete cleanup.
This method performs a graceful shutdown sequence that matches Lambda’s actual behavior:
- Unfreezes the runtime process (if frozen)
- Transitions to ShuttingDown phase
- Broadcasts a SHUTDOWN event to all subscribed extensions
- Waits for extensions to acknowledge by polling /next
- Cleans up resources and stops the HTTP server
§Arguments
- reason - The reason for shutdown (Spindown, Timeout, or Failure)
§Extension Behavior
Extensions subscribed to SHUTDOWN events will receive the event when they
poll /next. After receiving the SHUTDOWN event, extensions should perform
their cleanup work (e.g., flushing buffers, sending final telemetry batches)
and then poll /next again to signal completion.
If extensions don’t complete within the configured shutdown_timeout,
the simulator proceeds with shutdown anyway.
§Examples
use lambda_simulator::{Simulator, ShutdownReason};
let simulator = Simulator::builder().build().await?;
// ... run invocations ...
// Graceful shutdown with SPINDOWN reason
simulator.graceful_shutdown(ShutdownReason::Spindown).await;
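The wait-then-proceed behaviour of graceful shutdown (collect acknowledgements, but give up once the shutdown timeout elapses) can be sketched with a standard-library channel. This is an illustrative model only: `wait_for_acks` and the extension names are hypothetical, and the simulator itself is async and may be implemented quite differently.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::{Duration, Instant};

/// Waits until `expected` acknowledgements arrive or `timeout` elapses.
/// Returns how many acknowledgements were actually received.
fn wait_for_acks(rx: &mpsc::Receiver<String>, expected: usize, timeout: Duration) -> usize {
    let deadline = Instant::now() + timeout;
    let mut received = 0;
    while received < expected {
        let remaining = deadline.saturating_duration_since(Instant::now());
        match rx.recv_timeout(remaining) {
            Ok(_extension_name) => received += 1,
            // Timed out (or every sender is gone): proceed with shutdown anyway.
            Err(_) => break,
        }
    }
    received
}

fn main() {
    let (tx, rx) = mpsc::channel();
    // One extension finishes its cleanup and acknowledges...
    let fast = tx.clone();
    thread::spawn(move || fast.send("fast-extension".to_string()).unwrap());
    // ...while a second one never does (its sender stays alive but unused).
    let _slow = tx;
    let acked = wait_for_acks(&rx, 2, Duration::from_millis(200));
    println!("{acked} of 2 extensions acknowledged before the timeout");
}
```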
pub async fn wait_for_invocation_complete( &self, request_id: &str, timeout: Duration, ) -> SimulatorResult<InvocationState>
Waits for an invocation to reach a terminal state (Success, Error, or Timeout).
This method uses efficient event-driven waiting instead of polling, making tests more reliable and faster.
§Arguments
- request_id - The request ID to wait for
- timeout - Maximum duration to wait
§Returns
The final invocation state
§Errors
Returns SimulatorError::Timeout if the invocation doesn’t complete within the timeout,
or SimulatorError::InvocationNotFound if the request ID doesn’t exist.
§Examples
use lambda_simulator::Simulator;
use serde_json::json;
use std::time::Duration;
let simulator = Simulator::builder().build().await?;
let request_id = simulator.enqueue_payload(json!({"test": "data"})).await;
let state = simulator.wait_for_invocation_complete(&request_id, Duration::from_secs(5)).await?;
assert_eq!(state.status, lambda_simulator::InvocationStatus::Success);
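To illustrate what event-driven waiting means in contrast to polling, here is a minimal sketch built on a standard-library mutex and condition variable: the waiter sleeps until it is notified or the timeout expires, rather than re-checking state in a loop. The names `Completion`, `wait_until_done`, and `mark_done` are hypothetical, and the crate's async implementation presumably uses different primitives.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

/// Shared completion flag plus a condition variable used to wake waiters.
type Completion = Arc<(Mutex<bool>, Condvar)>;

/// Blocks until the flag is set or `timeout` elapses; returns whether it was set.
fn wait_until_done(state: &Completion, timeout: Duration) -> bool {
    let (lock, cvar) = &**state;
    let guard = lock.lock().unwrap();
    // Sleeps until notified (or timed out) instead of re-checking in a busy loop.
    let (guard, _timed_out) = cvar
        .wait_timeout_while(guard, timeout, |done| !*done)
        .unwrap();
    *guard
}

/// Marks the work as complete and wakes every waiter.
fn mark_done(state: &Completion) {
    let (lock, cvar) = &**state;
    *lock.lock().unwrap() = true;
    cvar.notify_all();
}

fn main() {
    let state: Completion = Arc::new((Mutex::new(false), Condvar::new()));
    let worker_state = Arc::clone(&state);
    thread::spawn(move || {
        thread::sleep(Duration::from_millis(20));
        mark_done(&worker_state);
    });
    let done = wait_until_done(&state, Duration::from_secs(1));
    println!("completed before timeout: {done}");
}
```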
pub async fn wait_for<F, Fut>( &self, condition: F, timeout: Duration, ) -> SimulatorResult<()>
Waits for a condition to become true.
This is a general-purpose helper that polls a condition function.
For common conditions, use specific helpers like wait_for_invocation_complete.
§Arguments
- condition - Async function that returns true when the condition is met
- timeout - Maximum duration to wait
§Errors
Returns SimulatorError::Timeout if the condition doesn’t become true within the timeout.
§Examples
use lambda_simulator::Simulator;
use std::time::Duration;
let simulator = Simulator::builder().build().await?;
// Wait for 3 extensions to register
simulator.wait_for(
    || async { simulator.extension_count().await >= 3 },
    Duration::from_secs(5),
).await?;
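The general shape of a polling helper with a deadline can be sketched synchronously with the standard library. This is not the crate's implementation: `poll_until`, `TimedOut`, and the 10 ms polling interval are assumptions chosen for illustration.

```rust
use std::thread;
use std::time::{Duration, Instant};

/// Hypothetical error type standing in for a timeout error.
#[derive(Debug, PartialEq)]
struct TimedOut;

/// Calls `condition` repeatedly until it returns true or `timeout` elapses.
/// The 10 ms sleep between attempts is an arbitrary choice for this sketch.
fn poll_until<F: FnMut() -> bool>(mut condition: F, timeout: Duration) -> Result<(), TimedOut> {
    let deadline = Instant::now() + timeout;
    loop {
        if condition() {
            return Ok(());
        }
        if Instant::now() >= deadline {
            return Err(TimedOut);
        }
        thread::sleep(Duration::from_millis(10));
    }
}

fn main() {
    let mut registered = 0;
    // Pretend one more extension registers each time the condition is checked.
    let result = poll_until(
        || {
            registered += 1;
            registered >= 3
        },
        Duration::from_secs(1),
    );
    println!("wait result: {result:?}");
}
```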
pub async fn enable_telemetry_capture(&self)
Enables test mode telemetry capture.
When enabled, all telemetry events are captured in memory, avoiding the need to set up HTTP servers in tests.
§Examples
use lambda_simulator::Simulator;
use serde_json::json;
let simulator = Simulator::builder().build().await?;
simulator.enable_telemetry_capture().await;
simulator.enqueue_payload(json!({"test": "data"})).await;
let events = simulator.get_telemetry_events().await;
assert!(!events.is_empty());
pub async fn get_telemetry_events(&self) -> Vec<TelemetryEvent>
Gets all captured telemetry events.
Returns an empty vector if telemetry capture is not enabled.
pub async fn get_telemetry_events_by_type( &self, event_type: &str, ) -> Vec<TelemetryEvent>
Gets captured telemetry events filtered by event type.
§Arguments
- event_type - The event type to filter by (e.g., “platform.start”)
§Examples
use lambda_simulator::Simulator;
use serde_json::json;
let simulator = Simulator::builder().build().await?;
simulator.enable_telemetry_capture().await;
simulator.enqueue_payload(json!({"test": "data"})).await;
let start_events = simulator.get_telemetry_events_by_type("platform.start").await;
assert_eq!(start_events.len(), 1);
pub async fn clear_telemetry_events(&self)
Clears all captured telemetry events.
pub async fn phase(&self) -> SimulatorPhase
Gets the current lifecycle phase of the simulator.
§Examples
use lambda_simulator::{Simulator, SimulatorPhase};
let simulator = Simulator::builder().build().await?;
assert_eq!(simulator.phase().await, SimulatorPhase::Initializing);
pub async fn wait_for_phase( &self, target_phase: SimulatorPhase, timeout: Duration, ) -> SimulatorResult<()>
Waits for the simulator to reach a specific phase.
§Arguments
- target_phase - The phase to wait for
- timeout - Maximum duration to wait
§Errors
Returns SimulatorError::Timeout if the phase is not reached within the timeout.
§Examples
use lambda_simulator::{Simulator, SimulatorPhase};
use std::time::Duration;
let simulator = Simulator::builder().build().await?;
// Wait for runtime to initialize
simulator.wait_for_phase(SimulatorPhase::Ready, Duration::from_secs(5)).await?;
pub async fn mark_initialized(&self)
Marks the runtime as initialized and transitions to Ready phase.
This is typically called internally the first time the runtime polls for an invocation, but can be called explicitly in tests.
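The lifecycle transitions mentioned on this page can be pictured as a small state machine. The sketch below defines its own `Phase` enum restricted to the three phase names that appear here (Initializing, Ready, ShuttingDown); the crate's SimulatorPhase may have other variants, and treating `mark_initialized` as a no-op outside the Initializing phase is an assumption, not documented behaviour.

```rust
/// Illustrative stand-in for the simulator's lifecycle phases.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Phase {
    Initializing,
    Ready,
    ShuttingDown,
}

/// Hypothetical lifecycle tracker, not the crate's internal type.
struct Lifecycle {
    phase: Phase,
}

impl Lifecycle {
    fn new() -> Self {
        Self { phase: Phase::Initializing }
    }

    /// Moves Initializing -> Ready. Assumed to be a no-op in any other
    /// phase, so calling it twice (or after shutdown began) is harmless.
    fn mark_initialized(&mut self) {
        if self.phase == Phase::Initializing {
            self.phase = Phase::Ready;
        }
    }

    /// Enters the shutdown phase from any phase.
    fn begin_shutdown(&mut self) {
        self.phase = Phase::ShuttingDown;
    }
}

fn main() {
    let mut lifecycle = Lifecycle::new();
    lifecycle.mark_initialized();
    println!("after init: {:?}", lifecycle.phase);
    lifecycle.begin_shutdown();
    println!("after shutdown request: {:?}", lifecycle.phase);
}
```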
pub fn is_frozen(&self) -> bool
Returns whether the runtime process is currently frozen.
This is useful in tests to verify freeze behaviour.
§Examples
use lambda_simulator::{Simulator, FreezeMode};
let simulator = Simulator::builder()
    .freeze_mode(FreezeMode::Process)
    .runtime_pid(12345)
    .build()
    .await?;
assert!(!simulator.is_frozen());
pub async fn wait_for_frozen(&self, timeout: Duration) -> SimulatorResult<()>
Waits for the process to become frozen.
This is useful in tests to verify freeze behaviour after sending an invocation response.
§Arguments
- timeout - Maximum duration to wait
§Errors
Returns SimulatorError::Timeout if the process doesn’t freeze within the timeout.
§Examples
use lambda_simulator::{Simulator, FreezeMode};
use std::time::Duration;
let simulator = Simulator::builder()
    .freeze_mode(FreezeMode::Process)
    .runtime_pid(12345)
    .build()
    .await?;
// After an invocation completes...
simulator.wait_for_frozen(Duration::from_secs(5)).await?;
assert!(simulator.is_frozen());
pub fn freeze_mode(&self) -> FreezeMode
Returns the current freeze mode.
pub fn freeze_epoch(&self) -> u64
Returns the current freeze epoch.
The epoch increments on each unfreeze operation. This can be used in tests to verify freeze/thaw cycles.
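As a rough model of how an epoch counter lets a test count freeze/thaw cycles, consider the sketch below. `FreezeState` is a hypothetical type, and incrementing only when the state was actually frozen is an assumption of this sketch; the crate may count every unfreeze call.

```rust
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};

/// Hypothetical freeze bookkeeping, not the crate's internal type.
#[derive(Default)]
struct FreezeState {
    frozen: AtomicBool,
    epoch: AtomicU64,
}

impl FreezeState {
    fn freeze(&self) {
        self.frozen.store(true, Ordering::SeqCst);
    }

    /// Thaws and bumps the epoch, but only if the state was frozen
    /// (an assumption made for this sketch).
    fn unfreeze(&self) {
        if self.frozen.swap(false, Ordering::SeqCst) {
            self.epoch.fetch_add(1, Ordering::SeqCst);
        }
    }

    fn is_frozen(&self) -> bool {
        self.frozen.load(Ordering::SeqCst)
    }

    fn epoch(&self) -> u64 {
        self.epoch.load(Ordering::SeqCst)
    }
}

fn main() {
    let state = FreezeState::default();
    for _ in 0..2 {
        state.freeze();
        state.unfreeze();
    }
    println!("frozen: {}, completed thaw cycles: {}", state.is_frozen(), state.epoch());
}
```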
pub fn register_freeze_pid(&self, pid: u32)
Registers a process ID for freeze/thaw operations.
In real AWS Lambda, the entire execution environment is frozen between invocations - this includes the runtime and all extension processes. Use this method to register PIDs after spawning processes, so they will be included in freeze/thaw cycles.
§Examples
use lambda_simulator::{Simulator, FreezeMode};
use std::process::Command;
let simulator = Simulator::builder()
    .freeze_mode(FreezeMode::Process)
    .build()
    .await?;
// Spawn runtime and extension processes
let runtime = Command::new("my-runtime")
    .env("AWS_LAMBDA_RUNTIME_API", simulator.runtime_api_url().replace("http://", ""))
    .spawn()?;
let extension = Command::new("my-extension")
    .env("AWS_LAMBDA_RUNTIME_API", simulator.runtime_api_url().replace("http://", ""))
    .spawn()?;
// Register PIDs for freezing
simulator.register_freeze_pid(runtime.id());
simulator.register_freeze_pid(extension.id());
pub fn spawn_process( &self, binary_path: impl Into<PathBuf>, role: ProcessRole, ) -> Result<ManagedProcess, ProcessError>
Spawns a process with Lambda environment variables and registers it for freeze/thaw.
This method:
- Injects all Lambda environment variables (from lambda_env_vars())
- Spawns the process with the given configuration
- Automatically registers the PID for freeze/thaw if FreezeMode::Process is enabled
The spawned process will be automatically terminated when the returned
ManagedProcess is dropped.
§Arguments
- binary_path - Path to the executable binary
- role - Whether this is a runtime or extension process
§Errors
Returns ProcessError::BinaryNotFound if the binary doesn’t exist.
Returns ProcessError::SpawnFailed if the process fails to start.
§Examples
use lambda_simulator::{Simulator, FreezeMode};
use lambda_simulator::process::ProcessRole;
let simulator = Simulator::builder()
    .freeze_mode(FreezeMode::Process)
    .build()
    .await?;
// Spawn a runtime process
// In tests, use env!("CARGO_BIN_EXE_<name>") for binary path resolution
let runtime = simulator.spawn_process(
    "/path/to/runtime",
    ProcessRole::Runtime,
)?;
println!("Runtime PID: {}", runtime.pid());
pub fn spawn_process_with_config( &self, config: ProcessConfig, ) -> Result<ManagedProcess, ProcessError>
Spawns a process with custom configuration.
This is like spawn_process but allows full control over the process
configuration, including additional environment variables and arguments.
§Examples
use lambda_simulator::Simulator;
use lambda_simulator::process::{ProcessConfig, ProcessRole};
let simulator = Simulator::builder().build().await?;
let config = ProcessConfig::new("/path/to/runtime", ProcessRole::Runtime)
    .env("CUSTOM_VAR", "value")
    .arg("--verbose")
    .inherit_stdio(false);
let runtime = simulator.spawn_process_with_config(config)?;
pub async fn wait_for_extensions_ready( &self, request_id: &str, timeout: Duration, ) -> SimulatorResult<()>
Waits for all extensions to signal readiness for a specific invocation.
Extensions signal readiness by polling the /next endpoint after
the runtime has submitted its response. This method blocks until
all extensions subscribed to INVOKE events have signalled readiness.
§Arguments
- request_id - The request ID to wait for
- timeout - Maximum duration to wait
§Errors
Returns SimulatorError::Timeout if not all extensions become ready within the timeout.
§Examples
use lambda_simulator::Simulator;
use serde_json::json;
use std::time::Duration;
let simulator = Simulator::builder().build().await?;
let request_id = simulator.enqueue_payload(json!({"test": "data"})).await;
// Wait for extensions to be ready
simulator.wait_for_extensions_ready(&request_id, Duration::from_secs(5)).await?;
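The readiness rule for an invocation (every extension subscribed to INVOKE events must have polled /next again) amounts to a set comparison. The sketch below models that bookkeeping with a hypothetical `ReadinessTracker`; the extension names are placeholders, and ignoring signals from unsubscribed extensions is an assumption.

```rust
use std::collections::HashSet;

/// Hypothetical per-invocation readiness bookkeeping.
struct ReadinessTracker {
    subscribed: HashSet<String>,
    ready: HashSet<String>,
}

impl ReadinessTracker {
    fn new(subscribed: &[&str]) -> Self {
        Self {
            subscribed: subscribed.iter().map(|s| s.to_string()).collect(),
            ready: HashSet::new(),
        }
    }

    /// Records a readiness signal; extensions that are not subscribed
    /// are ignored (an assumption made for this sketch).
    fn mark_ready(&mut self, name: &str) {
        if self.subscribed.contains(name) {
            self.ready.insert(name.to_string());
        }
    }

    /// True once every subscribed extension has signalled readiness.
    fn all_ready(&self) -> bool {
        self.subscribed.is_subset(&self.ready)
    }
}

fn main() {
    let mut tracker = ReadinessTracker::new(&["logs", "metrics"]);
    tracker.mark_ready("logs");
    println!("ready after one signal: {}", tracker.all_ready());
    tracker.mark_ready("metrics");
    println!("ready after both signals: {}", tracker.all_ready());
}
```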
pub async fn are_extensions_ready(&self, request_id: &str) -> bool
pub async fn get_extension_overhead_ms(&self, request_id: &str) -> Option<f64>
Gets the extension overhead time for an invocation.
The overhead is the time between when the runtime completed its response and when all extensions signalled readiness.
§Arguments
- request_id - The request ID to get overhead for
§Returns
The overhead in milliseconds, or None if the invocation is not complete
or extensions are not yet ready.
§Examples
use lambda_simulator::Simulator;
use serde_json::json;
use std::time::Duration;
let simulator = Simulator::builder().build().await?;
let request_id = simulator.enqueue_payload(json!({"test": "data"})).await;
// After invocation completes...
if let Some(overhead_ms) = simulator.get_extension_overhead_ms(&request_id).await {
    println!("Extension overhead: {:.2}ms", overhead_ms);
}
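The overhead definition above (time from runtime response to extension readiness, or None when either point is missing) reduces to a small calculation. The sketch below assumes both moments are recorded as optional `Instant` values; the function `extension_overhead_ms` and its parameters are hypothetical, not the crate's internals.

```rust
use std::time::{Duration, Instant};

/// Returns the gap in milliseconds between the runtime finishing its response
/// and all extensions becoming ready, or None if either moment is unknown.
fn extension_overhead_ms(
    runtime_done: Option<Instant>,
    extensions_ready: Option<Instant>,
) -> Option<f64> {
    let done = runtime_done?;
    let ready = extensions_ready?;
    // Saturates to zero if readiness was somehow recorded before completion.
    Some(ready.saturating_duration_since(done).as_secs_f64() * 1000.0)
}

fn main() {
    let done = Instant::now();
    let ready = done + Duration::from_millis(25);
    if let Some(ms) = extension_overhead_ms(Some(done), Some(ready)) {
        println!("Extension overhead: {ms:.2}ms");
    }
}
```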
pub fn lambda_env_vars(&self) -> HashMap<String, String>
Generates a map of standard AWS Lambda environment variables.
This method creates a HashMap containing all the environment variables
that AWS Lambda sets for function execution. These can be used when
spawning a Lambda runtime process to simulate the real Lambda environment.
§Environment Variables Included
- AWS_LAMBDA_FUNCTION_NAME - Function name
- AWS_LAMBDA_FUNCTION_VERSION - Function version
- AWS_LAMBDA_FUNCTION_MEMORY_SIZE - Memory allocation in MB
- AWS_LAMBDA_FUNCTION_ARN - Function ARN (if account_id configured)
- AWS_LAMBDA_LOG_GROUP_NAME - CloudWatch log group name
- AWS_LAMBDA_LOG_STREAM_NAME - CloudWatch log stream name
- AWS_LAMBDA_RUNTIME_API - Runtime API endpoint (host:port)
- AWS_LAMBDA_INITIALIZATION_TYPE - Always “on-demand” for simulator
- AWS_REGION / AWS_DEFAULT_REGION - AWS region
- AWS_ACCOUNT_ID - AWS account ID (if configured)
- AWS_EXECUTION_ENV - Runtime identifier
- LAMBDA_TASK_ROOT - Path to function code (/var/task)
- LAMBDA_RUNTIME_DIR - Path to runtime libraries (/var/runtime)
- TZ - Timezone (UTC)
- LANG - Locale (en_US.UTF-8)
- PATH - System path
- LD_LIBRARY_PATH - Library search path
- _HANDLER - Handler identifier (if configured)
§Examples
use lambda_simulator::Simulator;
use std::process::Command;
let simulator = Simulator::builder()
    .function_name("my-function")
    .handler("bootstrap")
    .build()
    .await?;
let env_vars = simulator.lambda_env_vars();
let mut cmd = Command::new("./bootstrap");
for (key, value) in &env_vars {
    cmd.env(key, value);
}
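Several entries in the list above are only present when the corresponding setting is configured. A minimal sketch of how such a map could be assembled, with optional keys omitted rather than set to empty strings, is shown below. `EnvConfig` and `build_env` are hypothetical, only a handful of the listed variables are covered, and the crate's actual construction is not shown on this page.

```rust
use std::collections::HashMap;

/// Hypothetical configuration subset used only for this sketch.
struct EnvConfig {
    function_name: String,
    runtime_api: String, // host:port form
    handler: Option<String>,
}

/// Builds a partial environment map; `_HANDLER` is added only when configured.
fn build_env(cfg: &EnvConfig) -> HashMap<String, String> {
    let mut env = HashMap::new();
    env.insert("AWS_LAMBDA_FUNCTION_NAME".to_string(), cfg.function_name.clone());
    env.insert("AWS_LAMBDA_RUNTIME_API".to_string(), cfg.runtime_api.clone());
    env.insert("LAMBDA_TASK_ROOT".to_string(), "/var/task".to_string());
    if let Some(handler) = &cfg.handler {
        env.insert("_HANDLER".to_string(), handler.clone());
    }
    env
}

fn main() {
    let cfg = EnvConfig {
        function_name: "my-function".to_string(),
        runtime_api: "127.0.0.1:9001".to_string(), // placeholder address
        handler: Some("bootstrap".to_string()),
    };
    let env = build_env(&cfg);
    println!("{} variables set, handler present: {}", env.len(), env.contains_key("_HANDLER"));
}
```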