Skip to main content

BuilderContext

Struct BuilderContext 

Source
pub struct BuilderContext { /* private fields */ }
Expand description

Builder-pattern context for durable Lambda operations.

Thin wrapper over DurableContext providing the builder-approach API. All operations delegate directly to the inner context — no replay logic, no checkpoint logic, just delegation.

Constructed internally by DurableHandlerBuilder::run — users never create this directly.

§Examples

use durable_lambda_builder::prelude::*;

#[tokio::main]
async fn main() -> Result<(), lambda_runtime::Error> {
    durable_lambda_builder::handler(|event: serde_json::Value, mut ctx: BuilderContext| async move {
        let result: Result<i32, String> = ctx.step("validate", || async {
            Ok(42)
        }).await?;
        Ok(serde_json::json!({"validated": result.unwrap()}))
    })
    .run()
    .await
}

Implementations§

Source§

impl BuilderContext

Source

pub async fn step<T, E, F, Fut>( &mut self, name: &str, f: F, ) -> Result<Result<T, E>, DurableError>
where T: Serialize + DeserializeOwned + Send + 'static, E: Serialize + DeserializeOwned + Send + 'static, F: FnOnce() -> Fut + Send + 'static, Fut: Future<Output = Result<T, E>> + Send + 'static,

Execute a named step with checkpointing.

During execution mode, runs the closure and checkpoints the result to AWS. During replay mode, returns the previously checkpointed result without executing the closure.

§Arguments
  • name — Human-readable step name, used as checkpoint metadata
  • f — Closure to execute (skipped during replay)
§Errors

Returns DurableError::Serialization if the result cannot be serialized. Returns DurableError::Deserialization if a cached result cannot be deserialized. Returns DurableError::CheckpointFailed if the AWS checkpoint API call fails.

§Examples
let result: Result<i32, String> = ctx.step("validate_order", || async {
    Ok(42)
}).await?;

match result {
    Ok(value) => println!("Step succeeded: {value}"),
    Err(e) => println!("Step failed: {e}"),
}
Source

pub async fn step_with_options<T, E, F, Fut>( &mut self, name: &str, options: StepOptions, f: F, ) -> Result<Result<T, E>, DurableError>
where T: Serialize + DeserializeOwned + Send + 'static, E: Serialize + DeserializeOwned + Send + 'static, F: FnOnce() -> Fut + Send + 'static, Fut: Future<Output = Result<T, E>> + Send + 'static,

Execute a named step with checkpointing and retry configuration.

If the closure fails and retries are configured, sends a RETRY checkpoint and returns DurableError::StepRetryScheduled to signal the function should exit.

§Arguments
  • name — Human-readable step name, used as checkpoint metadata
  • options — Retry configuration (see StepOptions)
  • f — Closure to execute (skipped during replay)
§Errors

Returns DurableError::StepRetryScheduled when a retry has been scheduled. Returns DurableError::Serialization if the result cannot be serialized. Returns DurableError::Deserialization if a cached result cannot be deserialized.

§Examples
use durable_lambda_builder::prelude::*;

let result: Result<i32, String> = ctx.step_with_options(
    "charge_payment",
    StepOptions::new().retries(3).backoff_seconds(5),
    || async { Ok(100) },
).await?;
Source

pub async fn wait( &mut self, name: &str, duration_secs: i32, ) -> Result<(), DurableError>

Suspend execution for the specified duration.

During execution mode, sends a START checkpoint and returns DurableError::WaitSuspended to signal the function should exit. The server re-invokes after the duration.

During replay mode, returns Ok(()) immediately if the wait has already completed.

§Arguments
  • name — Human-readable name for the wait operation
  • duration_secs — Duration to wait in seconds
§Errors

Returns DurableError::WaitSuspended when the wait has been checkpointed.

§Examples
ctx.wait("cooldown", 30).await?;
println!("Wait completed!");
Source

pub async fn create_callback( &mut self, name: &str, options: CallbackOptions, ) -> Result<CallbackHandle, DurableError>

Register a callback and return a handle with the server-generated callback ID.

During execution mode, sends a START checkpoint and returns a CallbackHandle containing the callback_id for external systems. During replay mode, extracts the cached callback_id from history.

This method NEVER suspends. Use callback_result to check the callback outcome (which suspends if not yet signaled).

§Arguments
  • name — Human-readable name for the callback operation
  • options — Timeout configuration (see CallbackOptions)
§Errors

Returns DurableError::CheckpointFailed if the AWS checkpoint API call fails.

§Examples
use durable_lambda_builder::prelude::*;

let handle = ctx.create_callback("approval", CallbackOptions::new()).await?;
println!("Callback ID: {}", handle.callback_id);
Source

pub fn callback_result<T: DeserializeOwned>( &self, handle: &CallbackHandle, ) -> Result<T, DurableError>

Check the result of a previously created callback.

Return the deserialized success payload if the callback has been signaled. Return an error if the callback failed, timed out, or hasn’t been signaled yet.

§Arguments
§Errors

Returns DurableError::CallbackSuspended if not yet signaled. Returns DurableError::CallbackFailed if the callback failed or timed out.

§Examples
use durable_lambda_builder::prelude::*;

let handle = ctx.create_callback("approval", CallbackOptions::new()).await?;
let result: String = ctx.callback_result(&handle)?;
Source

pub async fn invoke<T, P>( &mut self, name: &str, function_name: &str, payload: &P, ) -> Result<T, DurableError>

Durably invoke another Lambda function and return its result.

During execution mode, serializes the payload, sends a START checkpoint, and returns DurableError::InvokeSuspended to signal exit. The server invokes the target asynchronously and re-invokes this Lambda when done.

During replay, returns the cached result without re-invoking.

§Arguments
  • name — Human-readable name for the invoke operation
  • function_name — Name or ARN of the target Lambda function
  • payload — Input payload to send to the target function
§Errors

Returns DurableError::InvokeSuspended when the target is still executing. Returns DurableError::InvokeFailed if the target failed or timed out.

§Examples
let result: String = ctx.invoke(
    "call_processor",
    "payment-processor-lambda",
    &serde_json::json!({"order_id": 123}),
).await?;
Source

pub async fn parallel<T, F, Fut>( &mut self, name: &str, branches: Vec<F>, options: ParallelOptions, ) -> Result<BatchResult<T>, DurableError>
where T: Serialize + DeserializeOwned + Send + 'static, F: FnOnce(DurableContext) -> Fut + Send + 'static, Fut: Future<Output = Result<T, DurableError>> + Send + 'static,

Execute multiple branches concurrently and return their results.

Each branch receives an owned child context with an isolated checkpoint namespace. Branches satisfy Send + 'static via tokio::spawn.

§Arguments
  • name — Human-readable name for the parallel operation
  • branches — Collection of branch closures
  • options — Parallel configuration
§Errors

Returns DurableError::ParallelFailed if the operation fails.

§Examples
use durable_lambda_builder::prelude::*;
use durable_lambda_core::context::DurableContext;
use std::pin::Pin;
use std::future::Future;

type BranchFn = Box<dyn FnOnce(DurableContext) -> Pin<Box<dyn Future<Output = Result<i32, DurableError>> + Send>> + Send>;

let branches: Vec<BranchFn> = vec![
    Box::new(|_ctx| Box::pin(async move { Ok(1) })),
    Box::new(|_ctx| Box::pin(async move { Ok(2) })),
];
let result = ctx.parallel("fan_out", branches, ParallelOptions::new()).await?;
Source

pub async fn child_context<T, F, Fut>( &mut self, name: &str, f: F, ) -> Result<T, DurableError>
where T: Serialize + DeserializeOwned + Send, F: FnOnce(DurableContext) -> Fut + Send, Fut: Future<Output = Result<T, DurableError>> + Send,

Execute an isolated subflow with its own checkpoint namespace.

The closure receives an owned child DurableContext whose operations are namespaced under this child context’s operation ID, preventing collisions with the parent or sibling child contexts.

During replay mode, returns the cached result without re-executing the closure.

§Arguments
  • name — Human-readable name for the child context operation
  • f — Closure receiving an owned DurableContext for the subflow
§Errors

Returns DurableError::ChildContextFailed if the child context is found in a failed state during replay. Returns DurableError::CheckpointFailed if checkpoint API calls fail.

§Examples
use durable_lambda_core::context::DurableContext;

let result: i32 = ctx.child_context("sub_workflow", |mut child_ctx: DurableContext| async move {
    let r: Result<i32, String> = child_ctx.step("inner_step", || async { Ok(42) }).await?;
    Ok(r.unwrap())
}).await?;
assert_eq!(result, 42);
Source

pub async fn map<T, I, F, Fut>( &mut self, name: &str, items: Vec<I>, options: MapOptions, f: F, ) -> Result<BatchResult<T>, DurableError>
where T: Serialize + DeserializeOwned + Send + 'static, I: Send + 'static, F: FnOnce(I, DurableContext) -> Fut + Send + 'static + Clone, Fut: Future<Output = Result<T, DurableError>> + Send + 'static,

Process a collection of items in parallel and return their results.

Apply the closure f to each item concurrently. Each item receives an owned child context with an isolated checkpoint namespace. Items satisfy Send + 'static via tokio::spawn. The closure must be Clone since it is applied to each item independently.

When batch_size is configured, items process in sequential batches.

§Arguments
  • name — Human-readable name for the map operation
  • items — Collection of items to process
  • options — Map configuration (batching)
  • f — Closure applied to each item with an owned child context
§Errors

Returns DurableError::MapFailed if the operation fails.

§Examples
use durable_lambda_builder::prelude::*;
use durable_lambda_core::context::DurableContext;

let items = vec![1, 2, 3];
let result = ctx.map(
    "process_items",
    items,
    MapOptions::new().batch_size(2),
    |item: i32, mut child_ctx: DurableContext| async move {
        let r: Result<i32, String> = child_ctx.step("double", move || async move { Ok(item * 2) }).await?;
        Ok(r.unwrap())
    },
).await?;
Source

pub async fn step_with_compensation<T, E, F, Fut, G, GFut>( &mut self, name: &str, forward_fn: F, compensate_fn: G, ) -> Result<Result<T, E>, DurableError>
where T: Serialize + DeserializeOwned + Send + 'static, E: Serialize + DeserializeOwned + Send + 'static, F: FnOnce() -> Fut + Send + 'static, Fut: Future<Output = Result<T, E>> + Send + 'static, G: FnOnce(T) -> GFut + Send + 'static, GFut: Future<Output = Result<(), DurableError>> + Send + 'static,

Register a compensatable step.

Executes the forward step and, on success, registers the compensation closure for later rollback via run_compensations.

§Examples
let result: Result<i32, String> = ctx.step_with_compensation(
    "charge",
    || async { Ok(100) },
    |amount| async move { println!("Refunding {amount}"); Ok(()) },
).await?;
Source

pub async fn step_with_compensation_opts<T, E, F, Fut, G, GFut>( &mut self, name: &str, options: StepOptions, forward_fn: F, compensate_fn: G, ) -> Result<Result<T, E>, DurableError>
where T: Serialize + DeserializeOwned + Send + 'static, E: Serialize + DeserializeOwned + Send + 'static, F: FnOnce() -> Fut + Send + 'static, Fut: Future<Output = Result<T, E>> + Send + 'static, G: FnOnce(T) -> GFut + Send + 'static, GFut: Future<Output = Result<(), DurableError>> + Send + 'static,

Register a compensatable step with options.

Like step_with_compensation but accepts StepOptions for configuring retries, backoff, and timeouts on the forward step.

§Examples
use durable_lambda_builder::prelude::*;

let result: Result<String, String> = ctx.step_with_compensation_opts(
    "book_hotel",
    StepOptions::new().retries(3),
    || async { Ok("BOOKING-123".to_string()) },
    |booking_id| async move { println!("Cancelling: {booking_id}"); Ok(()) },
).await?;
Source

pub async fn run_compensations( &mut self, ) -> Result<CompensationResult, DurableError>

Execute all registered compensations in reverse registration order.

§Examples
let result = ctx.run_compensations().await?;
if !result.all_succeeded {
    eprintln!("Some compensations failed");
}
Source

pub fn execution_mode(&self) -> ExecutionMode

Return the current execution mode (Replaying or Executing).

§Examples
use durable_lambda_core::types::ExecutionMode;
match ctx.execution_mode() {
    ExecutionMode::Replaying => { /* returning cached results */ }
    ExecutionMode::Executing => { /* running new operations */ }
}
Source

pub fn is_replaying(&self) -> bool

Return whether the context is currently replaying from history.

§Examples
if ctx.is_replaying() {
    println!("Replaying cached operations");
}
Source

pub fn arn(&self) -> &str

Return a reference to the durable execution ARN.

§Examples
println!("Execution ARN: {}", ctx.arn());
Source

pub fn checkpoint_token(&self) -> &str

Return the current checkpoint token.

§Examples
let token = ctx.checkpoint_token();
Source

pub fn log(&self, message: &str)

Emit a replay-safe info-level log message.

During execution mode, emits via tracing::info! with execution context enrichment. During replay mode, the call is a no-op.

§Examples
ctx.log("Order processing started");
Source

pub fn log_with_data(&self, message: &str, data: &Value)

Emit a replay-safe info-level log message with structured data.

§Examples
ctx.log_with_data("Order processed", &serde_json::json!({"order_id": 42}));
Source

pub fn log_debug(&self, message: &str)

Emit a replay-safe debug-level log message.

§Examples
ctx.log_debug("Validating order fields");
Source

pub fn log_warn(&self, message: &str)

Emit a replay-safe warn-level log message.

§Examples
ctx.log_warn("Inventory below threshold");
Source

pub fn log_error(&self, message: &str)

Emit a replay-safe error-level log message.

§Examples
ctx.log_error("Payment gateway timeout");
Source

pub fn log_debug_with_data(&self, message: &str, data: &Value)

Emit a replay-safe debug-level log message with structured data.

§Examples
ctx.log_debug_with_data("Request details", &serde_json::json!({"method": "POST"}));
Source

pub fn log_warn_with_data(&self, message: &str, data: &Value)

Emit a replay-safe warn-level log message with structured data.

§Examples
ctx.log_warn_with_data("Retry attempt", &serde_json::json!({"attempt": 3}));
Source

pub fn log_error_with_data(&self, message: &str, data: &Value)

Emit a replay-safe error-level log message with structured data.

§Examples
ctx.log_error_with_data("Payment failed", &serde_json::json!({"error": "timeout"}));

Trait Implementations§

Source§

impl DurableContextOps for BuilderContext

Source§

fn step<T, E, F, Fut>( &mut self, name: &str, f: F, ) -> impl Future<Output = Result<Result<T, E>, DurableError>> + Send
where T: Serialize + DeserializeOwned + Send + 'static, E: Serialize + DeserializeOwned + Send + 'static, F: FnOnce() -> Fut + Send + 'static, Fut: Future<Output = Result<T, E>> + Send + 'static,

Execute a named step with checkpointing. Read more
Source§

fn step_with_options<T, E, F, Fut>( &mut self, name: &str, options: StepOptions, f: F, ) -> impl Future<Output = Result<Result<T, E>, DurableError>> + Send
where T: Serialize + DeserializeOwned + Send + 'static, E: Serialize + DeserializeOwned + Send + 'static, F: FnOnce() -> Fut + Send + 'static, Fut: Future<Output = Result<T, E>> + Send + 'static,

Execute a named step with checkpointing and retry configuration. Read more
Source§

fn wait( &mut self, name: &str, duration_secs: i32, ) -> impl Future<Output = Result<(), DurableError>> + Send

Suspend execution for the specified duration. Read more
Source§

fn create_callback( &mut self, name: &str, options: CallbackOptions, ) -> impl Future<Output = Result<CallbackHandle, DurableError>> + Send

Register a callback and return a handle with the server-generated callback ID. Read more
Source§

fn invoke<T, P>( &mut self, name: &str, function_name: &str, payload: &P, ) -> impl Future<Output = Result<T, DurableError>> + Send

Durably invoke another Lambda function and return its result. Read more
Source§

fn parallel<T, F, Fut>( &mut self, name: &str, branches: Vec<F>, options: ParallelOptions, ) -> impl Future<Output = Result<BatchResult<T>, DurableError>> + Send
where T: Serialize + DeserializeOwned + Send + 'static, F: FnOnce(DurableContext) -> Fut + Send + 'static, Fut: Future<Output = Result<T, DurableError>> + Send + 'static,

Execute multiple branches concurrently and return their results. Read more
Source§

fn child_context<T, F, Fut>( &mut self, name: &str, f: F, ) -> impl Future<Output = Result<T, DurableError>> + Send
where T: Serialize + DeserializeOwned + Send, F: FnOnce(DurableContext) -> Fut + Send, Fut: Future<Output = Result<T, DurableError>> + Send,

Execute an isolated subflow with its own checkpoint namespace. Read more
Source§

fn map<T, I, F, Fut>( &mut self, name: &str, items: Vec<I>, options: MapOptions, f: F, ) -> impl Future<Output = Result<BatchResult<T>, DurableError>> + Send
where T: Serialize + DeserializeOwned + Send + 'static, I: Send + 'static, F: FnOnce(I, DurableContext) -> Fut + Send + 'static + Clone, Fut: Future<Output = Result<T, DurableError>> + Send + 'static,

Process a collection of items in parallel and return their results. Read more
Source§

fn step_with_compensation<T, E, F, Fut, G, GFut>( &mut self, name: &str, forward_fn: F, compensate_fn: G, ) -> impl Future<Output = Result<Result<T, E>, DurableError>> + Send
where T: Serialize + DeserializeOwned + Send + 'static, E: Serialize + DeserializeOwned + Send + 'static, F: FnOnce() -> Fut + Send + 'static, Fut: Future<Output = Result<T, E>> + Send + 'static, G: FnOnce(T) -> GFut + Send + 'static, GFut: Future<Output = Result<(), DurableError>> + Send + 'static,

Execute a forward step and register a compensation closure on success. Read more
Source§

fn step_with_compensation_opts<T, E, F, Fut, G, GFut>( &mut self, name: &str, options: StepOptions, forward_fn: F, compensate_fn: G, ) -> impl Future<Output = Result<Result<T, E>, DurableError>> + Send
where T: Serialize + DeserializeOwned + Send + 'static, E: Serialize + DeserializeOwned + Send + 'static, F: FnOnce() -> Fut + Send + 'static, Fut: Future<Output = Result<T, E>> + Send + 'static, G: FnOnce(T) -> GFut + Send + 'static, GFut: Future<Output = Result<(), DurableError>> + Send + 'static,

Execute a forward step (with options) and register a compensation closure on success. Read more
Source§

fn run_compensations( &mut self, ) -> impl Future<Output = Result<CompensationResult, DurableError>> + Send

Execute all registered compensations in reverse registration order. Read more
Source§

fn callback_result<T: DeserializeOwned>( &self, handle: &CallbackHandle, ) -> Result<T, DurableError>

Check the result of a previously created callback. Read more
Source§

fn execution_mode(&self) -> ExecutionMode

Return the current execution mode (Replaying or Executing).
Source§

fn is_replaying(&self) -> bool

Return whether the context is currently replaying from history.
Source§

fn arn(&self) -> &str

Return a reference to the durable execution ARN.
Source§

fn checkpoint_token(&self) -> &str

Return the current checkpoint token.
Source§

fn log(&self, message: &str)

Emit a replay-safe info-level log message.
Source§

fn log_with_data(&self, message: &str, data: &Value)

Emit a replay-safe info-level log message with structured data.
Source§

fn log_debug(&self, message: &str)

Emit a replay-safe debug-level log message.
Source§

fn log_warn(&self, message: &str)

Emit a replay-safe warn-level log message.
Source§

fn log_error(&self, message: &str)

Emit a replay-safe error-level log message.
Source§

fn log_debug_with_data(&self, message: &str, data: &Value)

Emit a replay-safe debug-level log message with structured data.
Source§

fn log_warn_with_data(&self, message: &str, data: &Value)

Emit a replay-safe warn-level log message with structured data.
Source§

fn log_error_with_data(&self, message: &str, data: &Value)

Emit a replay-safe error-level log message with structured data.
Source§

fn enable_batch_mode(&mut self)

Enable batch checkpoint mode. Read more
Source§

fn flush_batch( &mut self, ) -> impl Future<Output = Result<(), DurableError>> + Send

Flush accumulated batch checkpoint updates. Read more

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> IntoEither for T

Source§

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

impl<Unshared, Shared> IntoShared<Shared> for Unshared
where Shared: FromUnshared<Unshared>,

Source§

fn into_shared(self) -> Shared

Creates a shared type from an unshared type.
Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more