pub struct DurableContext { /* private fields */ }
Expand description
Main context for a durable execution invocation.
DurableContext is created at the start of each Lambda invocation. It loads
the complete operation state from AWS (paginating if necessary), initializes
the replay engine, and provides the interface for durable operations.
§Construction
Use DurableContext::new to create a context from the invocation payload.
The constructor paginates through all remaining operations automatically.
§Examples
use durable_lambda_core::context::DurableContext;
use durable_lambda_core::backend::RealBackend;
use durable_lambda_core::types::ExecutionMode;
use std::sync::Arc;
use std::collections::HashMap;
let config = aws_config::load_defaults(aws_config::BehaviorVersion::latest()).await;
let client = aws_sdk_lambda::Client::new(&config);
let backend = Arc::new(RealBackend::new(client));
let ctx = DurableContext::new(
backend,
"arn:aws:lambda:us-east-1:123456789:durable-execution/my-exec".to_string(),
"initial-token".to_string(),
vec![], // initial operations from invocation payload
None, // no more pages
).await?;
match ctx.execution_mode() {
ExecutionMode::Replaying => println!("Replaying from history"),
ExecutionMode::Executing => println!("Executing new operations"),
}
Implementations§
Source§impl DurableContext
impl DurableContext
Sourcepub async fn new(
backend: Arc<dyn DurableBackend>,
arn: String,
checkpoint_token: String,
initial_operations: Vec<Operation>,
next_marker: Option<String>,
) -> Result<Self, DurableError>
pub async fn new( backend: Arc<dyn DurableBackend>, arn: String, checkpoint_token: String, initial_operations: Vec<Operation>, next_marker: Option<String>, ) -> Result<Self, DurableError>
Create a new DurableContext from invocation parameters.
Loads the complete operation state by paginating through
get_execution_state until all pages are fetched. Initializes the
replay engine with the full operations map.
§Arguments
- backend — The durable execution backend (real or mock).
- arn — The durable execution ARN.
- checkpoint_token — The initial checkpoint token from the invocation payload.
- initial_operations — First page of operations from the invocation payload.
- next_marker — Pagination marker for additional pages (None if complete).
§Errors
Returns DurableError if paginating the execution state fails.
Sourcepub fn execution_mode(&self) -> ExecutionMode
pub fn execution_mode(&self) -> ExecutionMode
Return the current execution mode (Replaying or Executing).
§Examples
use durable_lambda_core::types::ExecutionMode;
match ctx.execution_mode() {
ExecutionMode::Replaying => { /* returning cached results */ }
ExecutionMode::Executing => { /* running new operations */ }
}
Sourcepub fn is_replaying(&self) -> bool
pub fn is_replaying(&self) -> bool
Return whether the context is currently replaying from history.
§Examples
if ctx.is_replaying() {
println!("Replaying cached operations");
}
Sourcepub fn arn(&self) -> &str
pub fn arn(&self) -> &str
Return a reference to the durable execution ARN.
§Examples
println!("Execution ARN: {}", ctx.arn());
Sourcepub fn checkpoint_token(&self) -> &str
pub fn checkpoint_token(&self) -> &str
Sourcepub fn set_checkpoint_token(&mut self, token: String)
pub fn set_checkpoint_token(&mut self, token: String)
Update the checkpoint token (called after a successful checkpoint).
§Examples
ctx.set_checkpoint_token("new-token-from-aws".to_string());
Sourcepub fn backend(&self) -> &Arc<dyn DurableBackend>
pub fn backend(&self) -> &Arc<dyn DurableBackend>
Sourcepub fn replay_engine_mut(&mut self) -> &mut ReplayEngine
pub fn replay_engine_mut(&mut self) -> &mut ReplayEngine
Sourcepub fn create_child_context(&self, parent_op_id: &str) -> DurableContext
pub fn create_child_context(&self, parent_op_id: &str) -> DurableContext
Create a child context for isolated operation ID namespacing.
The child context shares the same backend and ARN but gets its own
ReplayEngine with a parent-scoped OperationIdGenerator. Operations
within the child context produce deterministic IDs scoped under
parent_op_id, preventing collisions with the parent or sibling contexts.
Used internally by parallel and child_context operations.
§Arguments
- parent_op_id — The operation ID that scopes this child context
§Examples
let child = ctx.create_child_context("branch-op-id");
// child operations will have IDs scoped under "branch-op-id"
Sourcepub fn replay_engine(&self) -> &ReplayEngine
pub fn replay_engine(&self) -> &ReplayEngine
Return a reference to the replay engine.
§Examples
let engine = ctx.replay_engine();
assert!(!engine.operations().is_empty() || true);
Sourcepub fn parent_op_id(&self) -> Option<&str>
pub fn parent_op_id(&self) -> Option<&str>
Return the parent operation ID, if this is a child context.
Returns None for the root context. Returns the parent’s operation ID
for child contexts created via create_child_context.
Used by replay-safe logging for hierarchical tracing.
§Examples
if let Some(parent_id) = ctx.parent_op_id() {
println!("Child context under parent: {parent_id}");
}
Sourcepub fn enable_batch_mode(&mut self)
pub fn enable_batch_mode(&mut self)
Enable batch checkpoint mode.
When enabled, step operation checkpoints (START and SUCCEED/FAIL)
are accumulated in memory instead of being sent immediately.
Call flush_batch to send all accumulated
updates in a single AWS API call.
Batch mode applies only to step operations. wait, invoke,
and callback always send checkpoints immediately because they
produce suspension errors that require the checkpoint to be
persisted before the function exits.
§Examples
ctx.enable_batch_mode();
let _: Result<i32, String> = ctx.step("step1", || async { Ok(1) }).await?;
let _: Result<i32, String> = ctx.step("step2", || async { Ok(2) }).await?;
ctx.flush_batch().await?; // sends all updates in one call
Sourcepub fn is_batch_mode(&self) -> bool
pub fn is_batch_mode(&self) -> bool
Return whether batch checkpoint mode is active.
Sourcepub fn pending_update_count(&self) -> usize
pub fn pending_update_count(&self) -> usize
Return the number of pending (unflushed) updates.
Sourcepub fn compensation_count(&self) -> usize
pub fn compensation_count(&self) -> usize
Return the number of registered compensation closures.
Useful for asserting compensation registration in tests.
§Examples
assert_eq!(ctx.compensation_count(), 0);
Sourcepub async fn flush_batch(&mut self) -> Result<(), DurableError>
pub async fn flush_batch(&mut self) -> Result<(), DurableError>
Flush all accumulated checkpoint updates in a single AWS API call.
No-op if no updates are pending. After flushing, the checkpoint token is updated from the response.
§Errors
Returns DurableError if the batch checkpoint call fails.
§Examples
ctx.enable_batch_mode();
// ... run several steps ...
ctx.flush_batch().await?;
Source§impl DurableContext
impl DurableContext
Sourcepub async fn create_callback(
&mut self,
name: &str,
options: CallbackOptions,
) -> Result<CallbackHandle, DurableError>
pub async fn create_callback( &mut self, name: &str, options: CallbackOptions, ) -> Result<CallbackHandle, DurableError>
Register a callback and return a handle with the server-generated callback ID.
During execution mode, sends a START checkpoint with callback configuration
and returns a CallbackHandle containing the callback_id that external
systems use to signal completion via SendDurableExecutionCallbackSuccess,
SendDurableExecutionCallbackFailure, or SendDurableExecutionCallbackHeartbeat.
During replay mode, extracts the cached callback_id from history without
sending any checkpoint.
Important: This method NEVER suspends. Suspension happens in
callback_result when the callback hasn’t
been signaled yet.
§Arguments
- name — Human-readable name for the callback operation
- options — Timeout configuration (see CallbackOptions)
§Errors
Returns DurableError::CheckpointFailed if the AWS checkpoint API
call fails or if the callback_id cannot be extracted from the response.
§Examples
use durable_lambda_core::types::CallbackOptions;
let handle = ctx.create_callback("approval", CallbackOptions::new()).await?;
println!("Callback ID for external system: {}", handle.callback_id);
Sourcepub fn callback_result<T: DeserializeOwned>(
&self,
handle: &CallbackHandle,
) -> Result<T, DurableError>
pub fn callback_result<T: DeserializeOwned>( &self, handle: &CallbackHandle, ) -> Result<T, DurableError>
Check the result of a previously created callback.
Return the deserialized success payload if the callback has been signaled with success. Return an error if the callback failed, timed out, or hasn’t been signaled yet.
Important: This is NOT an async/durable operation — it only reads the current operation state. It does NOT generate an operation ID or create checkpoints.
§Arguments
- handle — The CallbackHandle returned by create_callback
§Errors
Returns DurableError::CallbackSuspended if the callback has not
been signaled yet (the handler should propagate this to exit).
Returns DurableError::CallbackFailed if the callback was signaled
with failure, was cancelled, or timed out.
Returns DurableError::Deserialization if the callback result
cannot be deserialized as type T.
§Examples
use durable_lambda_core::types::CallbackOptions;
let handle = ctx.create_callback("approval", CallbackOptions::new()).await?;
// ... pass handle.callback_id to external system ...
let result: String = ctx.callback_result(&handle)?;
Source§impl DurableContext
impl DurableContext
Sourcepub async fn child_context<T, F, Fut>(
&mut self,
name: &str,
f: F,
) -> Result<T, DurableError>where
T: Serialize + DeserializeOwned + Send,
F: FnOnce(DurableContext) -> Fut + Send,
Fut: Future<Output = Result<T, DurableError>> + Send,
pub async fn child_context<T, F, Fut>(
&mut self,
name: &str,
f: F,
) -> Result<T, DurableError>where
T: Serialize + DeserializeOwned + Send,
F: FnOnce(DurableContext) -> Fut + Send,
Fut: Future<Output = Result<T, DurableError>> + Send,
Execute an isolated subflow with its own checkpoint namespace.
The closure receives an owned child DurableContext whose operations
are namespaced under this child context’s operation ID, preventing
collisions with the parent or sibling child contexts.
During replay mode, returns the cached result without re-executing the closure.
§Arguments
- name — Human-readable name for the child context operation
- f — Closure receiving an owned DurableContext for the subflow
§Errors
Returns DurableError::ChildContextFailed if the child context
is found in a failed state during replay.
Returns DurableError::CheckpointFailed if checkpoint API calls fail.
§Examples
let result: i32 = ctx.child_context("sub_workflow", |mut child_ctx| async move {
let r: Result<i32, String> = child_ctx.step("inner_step", || async { Ok(42) }).await?;
Ok(r.unwrap())
}).await?;
assert_eq!(result, 42);
Source§impl DurableContext
impl DurableContext
Sourcepub async fn step_with_compensation<T, E, F, Fut, G, GFut>(
&mut self,
name: &str,
forward_fn: F,
compensate_fn: G,
) -> Result<Result<T, E>, DurableError>where
T: Serialize + DeserializeOwned + Send + 'static,
E: Serialize + DeserializeOwned + Send + 'static,
F: FnOnce() -> Fut + Send + 'static,
Fut: Future<Output = Result<T, E>> + Send + 'static,
G: FnOnce(T) -> GFut + Send + 'static,
GFut: Future<Output = Result<(), DurableError>> + Send + 'static,
pub async fn step_with_compensation<T, E, F, Fut, G, GFut>(
&mut self,
name: &str,
forward_fn: F,
compensate_fn: G,
) -> Result<Result<T, E>, DurableError>where
T: Serialize + DeserializeOwned + Send + 'static,
E: Serialize + DeserializeOwned + Send + 'static,
F: FnOnce() -> Fut + Send + 'static,
Fut: Future<Output = Result<T, E>> + Send + 'static,
G: FnOnce(T) -> GFut + Send + 'static,
GFut: Future<Output = Result<(), DurableError>> + Send + 'static,
Execute a forward step and register a compensation closure on success.
Delegates the forward execution to step. If the step
succeeds (returns Ok(Ok(value))), the compensate_fn closure is
registered and will be executed by run_compensations.
If the forward step fails (returns Ok(Err(e))), no compensation is
registered — only successful steps have compensations that need undoing.
§Arguments
- name — Human-readable name for the forward step operation.
- forward_fn — Closure to execute the forward step.
- compensate_fn — Closure to execute when rolling back; receives the forward step’s success value.
§Returns
- Ok(Ok(T)) — Forward step succeeded; compensation registered.
- Ok(Err(E)) — Forward step returned a user error; no compensation registered.
- Err(DurableError) — SDK-level failure (checkpoint, serialization).
§Examples
// Book a hotel room and register its cancellation as compensation
let booking_result: Result<String, String> = ctx.step_with_compensation(
"book_hotel",
|| async { Ok("BOOKING-123".to_string()) },
|booking_id| async move {
// Cancel the hotel booking
println!("Cancelling booking: {booking_id}");
Ok(())
},
).await?;
// Later, roll back all registered compensations
let comp_result = ctx.run_compensations().await?;
assert!(comp_result.all_succeeded);
Sourcepub async fn step_with_compensation_opts<T, E, F, Fut, G, GFut>(
&mut self,
name: &str,
options: StepOptions,
forward_fn: F,
compensate_fn: G,
) -> Result<Result<T, E>, DurableError>where
T: Serialize + DeserializeOwned + Send + 'static,
E: Serialize + DeserializeOwned + Send + 'static,
F: FnOnce() -> Fut + Send + 'static,
Fut: Future<Output = Result<T, E>> + Send + 'static,
G: FnOnce(T) -> GFut + Send + 'static,
GFut: Future<Output = Result<(), DurableError>> + Send + 'static,
pub async fn step_with_compensation_opts<T, E, F, Fut, G, GFut>(
&mut self,
name: &str,
options: StepOptions,
forward_fn: F,
compensate_fn: G,
) -> Result<Result<T, E>, DurableError>where
T: Serialize + DeserializeOwned + Send + 'static,
E: Serialize + DeserializeOwned + Send + 'static,
F: FnOnce() -> Fut + Send + 'static,
Fut: Future<Output = Result<T, E>> + Send + 'static,
G: FnOnce(T) -> GFut + Send + 'static,
GFut: Future<Output = Result<(), DurableError>> + Send + 'static,
Execute a forward step (with options) and register a compensation closure on success.
Like step_with_compensation but accepts
StepOptions for configuring retries, backoff, and timeouts on the
forward step.
§Arguments
- name — Human-readable name for the forward step operation.
- options — Step configuration (retries, backoff, timeout).
- forward_fn — Closure to execute the forward step.
- compensate_fn — Closure to execute when rolling back.
§Examples
use durable_lambda_core::types::StepOptions;
let result: Result<String, String> = ctx.step_with_compensation_opts(
"book_hotel",
StepOptions::new().retries(3),
|| async { Ok("BOOKING-123".to_string()) },
|booking_id| async move {
println!("Cancelling: {booking_id}");
Ok(())
},
).await?;
Sourcepub async fn run_compensations(
&mut self,
) -> Result<CompensationResult, DurableError>
pub async fn run_compensations( &mut self, ) -> Result<CompensationResult, DurableError>
Execute all registered compensations in reverse registration order.
Drains the registered compensations and executes them in LIFO order
(last registered runs first — stack semantics). Each compensation is
checkpointed with Context/START + Context/SUCCEED|FAIL using
sub_type = "Compensation".
All compensations are attempted even if earlier ones fail. The returned
CompensationResult captures the per-item outcomes.
During replay, completed compensations are skipped — their status is read from the execution history to support partial rollback resume.
§Returns
Returns Ok(CompensationResult) whenever the compensation checkpoints succeed;
individual compensation failures are captured in the result items, not
propagated as errors. Returns Err(DurableError) only on AWS checkpoint failures.
§Examples
// After some compensable steps fail:
let result = ctx.run_compensations().await?;
if !result.all_succeeded {
for item in &result.items {
if let Some(err) = &item.error {
eprintln!("Compensation {} failed: {}", item.name, err);
}
}
}
Source§impl DurableContext
impl DurableContext
Sourcepub async fn invoke<T, P>(
&mut self,
name: &str,
function_name: &str,
payload: &P,
) -> Result<T, DurableError>where
T: DeserializeOwned,
P: Serialize,
pub async fn invoke<T, P>(
&mut self,
name: &str,
function_name: &str,
payload: &P,
) -> Result<T, DurableError>where
T: DeserializeOwned,
P: Serialize,
Durably invoke another Lambda function and return its result.
During execution mode, serializes the payload, sends a START checkpoint
with the target function name, and returns DurableError::InvokeSuspended
to signal the function should exit. The server invokes the target function
asynchronously and re-invokes this Lambda when complete.
During replay mode, returns the cached result without re-invoking the target function.
If the target function completes immediately (detected via the double-check pattern), the result is returned directly without suspending.
§Arguments
- name — Human-readable name for the invoke operation
- function_name — Name or ARN of the target Lambda function
- payload — Input payload to send to the target function
§Errors
Returns DurableError::InvokeSuspended when the invoke has been
checkpointed and the target is still executing — the handler must
propagate this to exit.
Returns DurableError::InvokeFailed if the target function failed,
timed out, or was stopped.
Returns DurableError::Serialization if the payload cannot be
serialized.
Returns DurableError::Deserialization if the result cannot be
deserialized as type T.
Returns DurableError::CheckpointFailed if the AWS checkpoint API
call fails.
§Examples
let result: String = ctx.invoke(
"call_processor",
"payment-processor-lambda",
&serde_json::json!({"order_id": 123}),
).await?;
println!("Target returned: {result}");
Source§impl DurableContext
impl DurableContext
Sourcepub fn log(&self, message: &str)
pub fn log(&self, message: &str)
Emit a replay-safe info-level log message.
During execution mode, emits the message via tracing::info! with
execution context enrichment (execution ARN, parent ID for child
contexts). During replay mode, the call is a no-op — no log output
is produced.
§Arguments
- message — The log message to emit
§Examples
ctx.log("Order processing started");
// During replay, this produces no output.
Sourcepub fn log_with_data(&self, message: &str, data: &Value)
pub fn log_with_data(&self, message: &str, data: &Value)
Emit a replay-safe info-level log message with structured data.
During execution mode, emits the message and structured data via
tracing::info!. During replay mode, the call is a no-op.
§Arguments
- message — The log message to emit
- data — Structured data to include in the log event
§Examples
ctx.log_with_data("Order processed", &serde_json::json!({"order_id": 42}));
Sourcepub fn log_debug_with_data(&self, message: &str, data: &Value)
pub fn log_debug_with_data(&self, message: &str, data: &Value)
Emit a replay-safe debug-level log message with structured data.
During execution mode, emits via tracing::debug! with data field.
During replay mode, the call is a no-op.
§Arguments
- message — The log message to emit
- data — Structured data to include in the log event
§Examples
ctx.log_debug_with_data("Request details", &serde_json::json!({"method": "POST"}));
Sourcepub fn log_warn_with_data(&self, message: &str, data: &Value)
pub fn log_warn_with_data(&self, message: &str, data: &Value)
Emit a replay-safe warn-level log message with structured data.
During execution mode, emits via tracing::warn! with data field.
During replay mode, the call is a no-op.
§Arguments
- message — The log message to emit
- data — Structured data to include in the log event
§Examples
ctx.log_warn_with_data("Retry attempt", &serde_json::json!({"attempt": 3}));
Sourcepub fn log_error_with_data(&self, message: &str, data: &Value)
pub fn log_error_with_data(&self, message: &str, data: &Value)
Emit a replay-safe error-level log message with structured data.
During execution mode, emits via tracing::error! with data field.
During replay mode, the call is a no-op.
§Arguments
- message — The log message to emit
- data — Structured data to include in the log event
§Examples
ctx.log_error_with_data("Payment failed", &serde_json::json!({"error": "timeout"}));
Source§impl DurableContext
impl DurableContext
Sourcepub async fn map<T, I, F, Fut>(
&mut self,
name: &str,
items: Vec<I>,
options: MapOptions,
f: F,
) -> Result<BatchResult<T>, DurableError>where
T: Serialize + DeserializeOwned + Send + 'static,
I: Send + 'static,
F: FnOnce(I, DurableContext) -> Fut + Send + 'static + Clone,
Fut: Future<Output = Result<T, DurableError>> + Send + 'static,
pub async fn map<T, I, F, Fut>(
&mut self,
name: &str,
items: Vec<I>,
options: MapOptions,
f: F,
) -> Result<BatchResult<T>, DurableError>where
T: Serialize + DeserializeOwned + Send + 'static,
I: Send + 'static,
F: FnOnce(I, DurableContext) -> Fut + Send + 'static + Clone,
Fut: Future<Output = Result<T, DurableError>> + Send + 'static,
Process a collection of items in parallel and return their results.
Apply the closure f to each item concurrently. Each item receives an
owned child DurableContext with an isolated checkpoint namespace.
Items execute via tokio::spawn and must satisfy Send + 'static.
When batch_size is configured via MapOptions, items process in
sequential batches — each batch completes before the next begins.
During replay mode, returns the cached BatchResult without
re-executing any item closures.
§Arguments
- name — Human-readable name for the map operation
- items — Collection of items to process
- options — Map configuration (batching)
- f — Closure applied to each item with an owned child context
§Errors
Returns DurableError::MapFailed if the map operation itself fails
(e.g., checkpoint error, task panic). Individual item failures are
captured in the BatchResult rather than propagated as errors.
Returns DurableError::CheckpointFailed if checkpoint API calls fail.
§Examples
use durable_lambda_core::types::MapOptions;
use durable_lambda_core::context::DurableContext;
use durable_lambda_core::error::DurableError;
let items = vec![1, 2, 3];
let result = ctx.map(
"process_items",
items,
MapOptions::new(),
|item: i32, mut child_ctx: DurableContext| async move {
let r: Result<i32, String> = child_ctx.step("double", move || async move { Ok(item * 2) }).await?;
Ok(r.unwrap())
},
).await?;
assert_eq!(result.results.len(), 3);
Source§impl DurableContext
impl DurableContext
Sourcepub async fn parallel<T, F, Fut>(
&mut self,
name: &str,
branches: Vec<F>,
_options: ParallelOptions,
) -> Result<BatchResult<T>, DurableError>where
T: Serialize + DeserializeOwned + Send + 'static,
F: FnOnce(DurableContext) -> Fut + Send + 'static,
Fut: Future<Output = Result<T, DurableError>> + Send + 'static,
pub async fn parallel<T, F, Fut>(
&mut self,
name: &str,
branches: Vec<F>,
_options: ParallelOptions,
) -> Result<BatchResult<T>, DurableError>where
T: Serialize + DeserializeOwned + Send + 'static,
F: FnOnce(DurableContext) -> Fut + Send + 'static,
Fut: Future<Output = Result<T, DurableError>> + Send + 'static,
Execute multiple branches concurrently and return their results.
Each branch receives an owned child DurableContext with an isolated
checkpoint namespace. Branches execute concurrently via tokio::spawn
and must satisfy Send + 'static bounds.
During replay mode, returns the cached BatchResult without
re-executing any branches.
§Arguments
- name — Human-readable name for the parallel operation
- branches — Collection of branch closures, each receiving an owned DurableContext
- _options — Parallel configuration (reserved for future completion criteria)
§Errors
Returns DurableError::ParallelFailed if the parallel operation itself
fails (e.g., checkpoint error). Individual branch failures are captured
in the BatchResult rather than propagated as errors.
Returns DurableError::CheckpointFailed if checkpoint API calls fail.
§Examples
use durable_lambda_core::types::ParallelOptions;
use durable_lambda_core::context::DurableContext;
use durable_lambda_core::error::DurableError;
use std::pin::Pin;
use std::future::Future;
type BranchFn = Box<dyn FnOnce(DurableContext) -> Pin<Box<dyn Future<Output = Result<i32, DurableError>> + Send>> + Send>;
let branches: Vec<BranchFn> = vec![
Box::new(|mut ctx| Box::pin(async move {
let r: Result<i32, String> = ctx.step("validate", || async { Ok(1) }).await?;
Ok(r.unwrap())
})),
Box::new(|mut ctx| Box::pin(async move {
let r: Result<i32, String> = ctx.step("check", || async { Ok(2) }).await?;
Ok(r.unwrap())
})),
];
let result = ctx.parallel("fan_out", branches, ParallelOptions::new()).await?;
assert_eq!(result.results.len(), 2);
Source§impl DurableContext
impl DurableContext
Sourcepub async fn step<T, E, F, Fut>(
&mut self,
name: &str,
f: F,
) -> Result<Result<T, E>, DurableError>
pub async fn step<T, E, F, Fut>( &mut self, name: &str, f: F, ) -> Result<Result<T, E>, DurableError>
Execute a named step with checkpointing.
During execution mode, runs the closure and checkpoints the result to AWS. During replay mode, returns the previously checkpointed result without executing the closure.
This is a convenience wrapper around step_with_options
with default options (no retries).
§Arguments
- name — Human-readable step name, used as checkpoint metadata
- f — Closure to execute (skipped during replay)
§Returns
Returns Ok(Ok(T)) on successful step execution or replay.
Returns Ok(Err(E)) when the step closure returned an error (also checkpointed).
Returns Err(DurableError) on SDK-level failures (checkpoint, serialization).
§Errors
Returns DurableError::Serialization if the result cannot be serialized to JSON.
Returns DurableError::Deserialization if a cached result cannot be deserialized.
Returns DurableError::CheckpointFailed or DurableError::AwsSdkOperation
if the AWS checkpoint API call fails.
§Examples
let result: Result<i32, String> = ctx.step("validate_order", || async {
Ok(42)
}).await?;
match result {
Ok(value) => println!("Step succeeded: {value}"),
Err(e) => println!("Step failed: {e}"),
}
Sourcepub async fn step_with_options<T, E, F, Fut>(
&mut self,
name: &str,
options: StepOptions,
f: F,
) -> Result<Result<T, E>, DurableError>
pub async fn step_with_options<T, E, F, Fut>( &mut self, name: &str, options: StepOptions, f: F, ) -> Result<Result<T, E>, DurableError>
Execute a named step with checkpointing and retry configuration.
During execution mode, runs the closure and checkpoints the result.
If the closure fails and retries are configured, sends a RETRY checkpoint
and returns DurableError::StepRetryScheduled to signal the function
should exit. The server re-invokes the Lambda after the backoff delay.
During replay mode, returns the previously checkpointed result without executing the closure.
§Arguments
- name — Human-readable step name, used as checkpoint metadata
- options — Retry configuration (see StepOptions)
- f — Closure to execute (skipped during replay)
§Errors
Returns DurableError::StepRetryScheduled when a retry has been
scheduled — the handler must propagate this to exit the function.
Returns DurableError::Serialization if the result cannot be serialized.
Returns DurableError::Deserialization if a cached result cannot be deserialized.
§Examples
use durable_lambda_core::types::StepOptions;
let result: Result<i32, String> = ctx.step_with_options(
"charge_payment",
StepOptions::new().retries(3).backoff_seconds(5),
|| async { Ok(100) },
).await?;
Source§impl DurableContext
impl DurableContext
Sourcepub async fn wait(
&mut self,
name: &str,
duration_secs: i32,
) -> Result<(), DurableError>
pub async fn wait( &mut self, name: &str, duration_secs: i32, ) -> Result<(), DurableError>
Suspend execution for the specified duration.
During execution mode, sends a START checkpoint with the wait duration
and returns DurableError::WaitSuspended to signal the function
should exit. The durable execution server re-invokes the Lambda after
the duration elapses.
During replay mode, returns Ok(()) immediately if the wait has
already completed (status SUCCEEDED in history).
§Arguments
- name — Human-readable name for the wait operation
- duration_secs — Duration to wait in seconds (1 to 31,622,400)
§Errors
Returns DurableError::WaitSuspended when the wait has been
checkpointed — the handler must propagate this to exit the function.
Returns DurableError::CheckpointFailed if the AWS checkpoint
API call fails.
§Examples
// Wait 30 seconds before continuing.
ctx.wait("cooldown", 30).await?;
// Execution continues here after the wait completes.
println!("Wait completed!");
Trait Implementations§
Source§impl DurableContextOps for DurableContext
impl DurableContextOps for DurableContext
Source§fn step<T, E, F, Fut>(
&mut self,
name: &str,
f: F,
) -> impl Future<Output = Result<Result<T, E>, DurableError>> + Send
fn step<T, E, F, Fut>( &mut self, name: &str, f: F, ) -> impl Future<Output = Result<Result<T, E>, DurableError>> + Send
Source§fn step_with_options<T, E, F, Fut>(
&mut self,
name: &str,
options: StepOptions,
f: F,
) -> impl Future<Output = Result<Result<T, E>, DurableError>> + Send
fn step_with_options<T, E, F, Fut>( &mut self, name: &str, options: StepOptions, f: F, ) -> impl Future<Output = Result<Result<T, E>, DurableError>> + Send
Source§fn wait(
&mut self,
name: &str,
duration_secs: i32,
) -> impl Future<Output = Result<(), DurableError>> + Send
fn wait( &mut self, name: &str, duration_secs: i32, ) -> impl Future<Output = Result<(), DurableError>> + Send
Source§fn create_callback(
&mut self,
name: &str,
options: CallbackOptions,
) -> impl Future<Output = Result<CallbackHandle, DurableError>> + Send
fn create_callback( &mut self, name: &str, options: CallbackOptions, ) -> impl Future<Output = Result<CallbackHandle, DurableError>> + Send
Source§fn invoke<T, P>(
&mut self,
name: &str,
function_name: &str,
payload: &P,
) -> impl Future<Output = Result<T, DurableError>> + Send
fn invoke<T, P>( &mut self, name: &str, function_name: &str, payload: &P, ) -> impl Future<Output = Result<T, DurableError>> + Send
Source§fn parallel<T, F, Fut>(
&mut self,
name: &str,
branches: Vec<F>,
options: ParallelOptions,
) -> impl Future<Output = Result<BatchResult<T>, DurableError>> + Sendwhere
T: Serialize + DeserializeOwned + Send + 'static,
F: FnOnce(DurableContext) -> Fut + Send + 'static,
Fut: Future<Output = Result<T, DurableError>> + Send + 'static,
fn parallel<T, F, Fut>(
&mut self,
name: &str,
branches: Vec<F>,
options: ParallelOptions,
) -> impl Future<Output = Result<BatchResult<T>, DurableError>> + Sendwhere
T: Serialize + DeserializeOwned + Send + 'static,
F: FnOnce(DurableContext) -> Fut + Send + 'static,
Fut: Future<Output = Result<T, DurableError>> + Send + 'static,
Source§fn child_context<T, F, Fut>(
&mut self,
name: &str,
f: F,
) -> impl Future<Output = Result<T, DurableError>> + Sendwhere
T: Serialize + DeserializeOwned + Send,
F: FnOnce(DurableContext) -> Fut + Send,
Fut: Future<Output = Result<T, DurableError>> + Send,
fn child_context<T, F, Fut>(
&mut self,
name: &str,
f: F,
) -> impl Future<Output = Result<T, DurableError>> + Sendwhere
T: Serialize + DeserializeOwned + Send,
F: FnOnce(DurableContext) -> Fut + Send,
Fut: Future<Output = Result<T, DurableError>> + Send,
Source§fn map<T, I, F, Fut>(
&mut self,
name: &str,
items: Vec<I>,
options: MapOptions,
f: F,
) -> impl Future<Output = Result<BatchResult<T>, DurableError>> + Sendwhere
T: Serialize + DeserializeOwned + Send + 'static,
I: Send + 'static,
F: FnOnce(I, DurableContext) -> Fut + Send + 'static + Clone,
Fut: Future<Output = Result<T, DurableError>> + Send + 'static,
fn map<T, I, F, Fut>(
&mut self,
name: &str,
items: Vec<I>,
options: MapOptions,
f: F,
) -> impl Future<Output = Result<BatchResult<T>, DurableError>> + Sendwhere
T: Serialize + DeserializeOwned + Send + 'static,
I: Send + 'static,
F: FnOnce(I, DurableContext) -> Fut + Send + 'static + Clone,
Fut: Future<Output = Result<T, DurableError>> + Send + 'static,
Source§fn step_with_compensation<T, E, F, Fut, G, GFut>(
&mut self,
name: &str,
forward_fn: F,
compensate_fn: G,
) -> impl Future<Output = Result<Result<T, E>, DurableError>> + Sendwhere
T: Serialize + DeserializeOwned + Send + 'static,
E: Serialize + DeserializeOwned + Send + 'static,
F: FnOnce() -> Fut + Send + 'static,
Fut: Future<Output = Result<T, E>> + Send + 'static,
G: FnOnce(T) -> GFut + Send + 'static,
GFut: Future<Output = Result<(), DurableError>> + Send + 'static,
fn step_with_compensation<T, E, F, Fut, G, GFut>(
&mut self,
name: &str,
forward_fn: F,
compensate_fn: G,
) -> impl Future<Output = Result<Result<T, E>, DurableError>> + Send
where
T: Serialize + DeserializeOwned + Send + 'static,
E: Serialize + DeserializeOwned + Send + 'static,
F: FnOnce() -> Fut + Send + 'static,
Fut: Future<Output = Result<T, E>> + Send + 'static,
G: FnOnce(T) -> GFut + Send + 'static,
GFut: Future<Output = Result<(), DurableError>> + Send + 'static,
Source§fn step_with_compensation_opts<T, E, F, Fut, G, GFut>(
&mut self,
name: &str,
options: StepOptions,
forward_fn: F,
compensate_fn: G,
) -> impl Future<Output = Result<Result<T, E>, DurableError>> + Send
where
T: Serialize + DeserializeOwned + Send + 'static,
E: Serialize + DeserializeOwned + Send + 'static,
F: FnOnce() -> Fut + Send + 'static,
Fut: Future<Output = Result<T, E>> + Send + 'static,
G: FnOnce(T) -> GFut + Send + 'static,
GFut: Future<Output = Result<(), DurableError>> + Send + 'static,
fn step_with_compensation_opts<T, E, F, Fut, G, GFut>(
&mut self,
name: &str,
options: StepOptions,
forward_fn: F,
compensate_fn: G,
) -> impl Future<Output = Result<Result<T, E>, DurableError>> + Send
where
T: Serialize + DeserializeOwned + Send + 'static,
E: Serialize + DeserializeOwned + Send + 'static,
F: FnOnce() -> Fut + Send + 'static,
Fut: Future<Output = Result<T, E>> + Send + 'static,
G: FnOnce(T) -> GFut + Send + 'static,
GFut: Future<Output = Result<(), DurableError>> + Send + 'static,
Source§fn run_compensations(
&mut self,
) -> impl Future<Output = Result<CompensationResult, DurableError>> + Send
fn run_compensations( &mut self, ) -> impl Future<Output = Result<CompensationResult, DurableError>> + Send
Source§fn callback_result<T: DeserializeOwned>(
&self,
handle: &CallbackHandle,
) -> Result<T, DurableError>
fn callback_result<T: DeserializeOwned>( &self, handle: &CallbackHandle, ) -> Result<T, DurableError>
Source§fn execution_mode(&self) -> ExecutionMode
fn execution_mode(&self) -> ExecutionMode
Source§fn is_replaying(&self) -> bool
fn is_replaying(&self) -> bool
Source§fn checkpoint_token(&self) -> &str
fn checkpoint_token(&self) -> &str
Source§fn log_with_data(&self, message: &str, data: &Value)
fn log_with_data(&self, message: &str, data: &Value)
Source§fn log_debug_with_data(&self, message: &str, data: &Value)
fn log_debug_with_data(&self, message: &str, data: &Value)
Source§fn log_warn_with_data(&self, message: &str, data: &Value)
fn log_warn_with_data(&self, message: &str, data: &Value)
Source§fn log_error_with_data(&self, message: &str, data: &Value)
fn log_error_with_data(&self, message: &str, data: &Value)
Source§fn enable_batch_mode(&mut self)
fn enable_batch_mode(&mut self)
Source§fn flush_batch(
&mut self,
) -> impl Future<Output = Result<(), DurableError>> + Send
fn flush_batch( &mut self, ) -> impl Future<Output = Result<(), DurableError>> + Send
Auto Trait Implementations§
impl Freeze for DurableContext
impl !RefUnwindSafe for DurableContext
impl Send for DurableContext
impl !Sync for DurableContext
impl Unpin for DurableContext
impl UnsafeUnpin for DurableContext
impl !UnwindSafe for DurableContext
Blanket Implementations§
Source§impl<T> BorrowMut<T> for T
where
T: ?Sized,
impl<T> BorrowMut<T> for T
where
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Source§impl<T> Instrument for T
impl<T> Instrument for T
Source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
Source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> IntoEither for T
impl<T> IntoEither for T
Source§fn into_either(self, into_left: bool) -> Either<Self, Self>
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self>
if into_left is true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read more
Source§fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self>
if into_left(&self) returns true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read more