Skip to main content

durable_lambda_builder/
context.rs

1//! Builder-specific context wrapper.
2//!
3//! Thin wrapper over [`DurableContext`](durable_lambda_core::DurableContext)
4//! providing builder-approach ergonomics. All operations delegate directly
5//! to the inner context with no additional logic.
6
7use std::future::Future;
8
9use durable_lambda_core::context::DurableContext;
10use durable_lambda_core::error::DurableError;
11use durable_lambda_core::ops_trait::DurableContextOps;
12use durable_lambda_core::types::{
13    BatchResult, CallbackHandle, CallbackOptions, CompensationResult, ExecutionMode, MapOptions,
14    ParallelOptions, StepOptions,
15};
16use serde::de::DeserializeOwned;
17use serde::Serialize;
18
/// Builder-pattern context for durable Lambda operations.
///
/// Thin wrapper over [`DurableContext`] providing the builder-approach API.
/// Every operation is forwarded to the wrapped context: this type adds no
/// replay logic and no checkpoint logic of its own, just delegation.
///
/// Constructed internally by [`DurableHandlerBuilder::run`](crate::DurableHandlerBuilder::run)
/// — users never create this directly.
///
/// # Examples
///
/// ```no_run
/// use durable_lambda_builder::prelude::*;
///
/// #[tokio::main]
/// async fn main() -> Result<(), lambda_runtime::Error> {
///     durable_lambda_builder::handler(|event: serde_json::Value, mut ctx: BuilderContext| async move {
///         let result: Result<i32, String> = ctx.step("validate", || async {
///             Ok(42)
///         }).await?;
///         Ok(serde_json::json!({"validated": result.unwrap()}))
///     })
///     .run()
///     .await
/// }
/// ```
pub struct BuilderContext {
    // The wrapped core context; every method on this type delegates to it.
    inner: DurableContext,
}
48
49impl BuilderContext {
50    /// Create from an existing `DurableContext`.
51    ///
52    /// This constructor is `pub(crate)` — only the
53    /// [`DurableHandlerBuilder::run`](crate::DurableHandlerBuilder::run)
54    /// method creates `BuilderContext` instances.
55    pub(crate) fn new(ctx: DurableContext) -> Self {
56        Self { inner: ctx }
57    }
58
59    /// Execute a named step with checkpointing.
60    ///
61    /// During execution mode, runs the closure and checkpoints the result to AWS.
62    /// During replay mode, returns the previously checkpointed result without
63    /// executing the closure.
64    ///
65    /// # Arguments
66    ///
67    /// * `name` — Human-readable step name, used as checkpoint metadata
68    /// * `f` — Closure to execute (skipped during replay)
69    ///
70    /// # Errors
71    ///
72    /// Returns [`DurableError::Serialization`] if the result cannot be serialized.
73    /// Returns [`DurableError::Deserialization`] if a cached result cannot be deserialized.
74    /// Returns [`DurableError::CheckpointFailed`] if the AWS checkpoint API call fails.
75    ///
76    /// # Examples
77    ///
78    /// ```no_run
79    /// # async fn example(ctx: &mut durable_lambda_builder::context::BuilderContext) -> Result<(), durable_lambda_core::error::DurableError> {
80    /// let result: Result<i32, String> = ctx.step("validate_order", || async {
81    ///     Ok(42)
82    /// }).await?;
83    ///
84    /// match result {
85    ///     Ok(value) => println!("Step succeeded: {value}"),
86    ///     Err(e) => println!("Step failed: {e}"),
87    /// }
88    /// # Ok(())
89    /// # }
90    /// ```
91    pub async fn step<T, E, F, Fut>(
92        &mut self,
93        name: &str,
94        f: F,
95    ) -> Result<Result<T, E>, DurableError>
96    where
97        T: Serialize + DeserializeOwned + Send + 'static,
98        E: Serialize + DeserializeOwned + Send + 'static,
99        F: FnOnce() -> Fut + Send + 'static,
100        Fut: Future<Output = Result<T, E>> + Send + 'static,
101    {
102        self.inner.step(name, f).await
103    }
104
105    /// Execute a named step with checkpointing and retry configuration.
106    ///
107    /// If the closure fails and retries are configured, sends a RETRY checkpoint
108    /// and returns [`DurableError::StepRetryScheduled`] to signal the function
109    /// should exit.
110    ///
111    /// # Arguments
112    ///
113    /// * `name` — Human-readable step name, used as checkpoint metadata
114    /// * `options` — Retry configuration (see [`StepOptions`])
115    /// * `f` — Closure to execute (skipped during replay)
116    ///
117    /// # Errors
118    ///
119    /// Returns [`DurableError::StepRetryScheduled`] when a retry has been scheduled.
120    /// Returns [`DurableError::Serialization`] if the result cannot be serialized.
121    /// Returns [`DurableError::Deserialization`] if a cached result cannot be deserialized.
122    ///
123    /// # Examples
124    ///
125    /// ```no_run
126    /// # async fn example(ctx: &mut durable_lambda_builder::context::BuilderContext) -> Result<(), durable_lambda_core::error::DurableError> {
127    /// use durable_lambda_builder::prelude::*;
128    ///
129    /// let result: Result<i32, String> = ctx.step_with_options(
130    ///     "charge_payment",
131    ///     StepOptions::new().retries(3).backoff_seconds(5),
132    ///     || async { Ok(100) },
133    /// ).await?;
134    /// # Ok(())
135    /// # }
136    /// ```
137    pub async fn step_with_options<T, E, F, Fut>(
138        &mut self,
139        name: &str,
140        options: StepOptions,
141        f: F,
142    ) -> Result<Result<T, E>, DurableError>
143    where
144        T: Serialize + DeserializeOwned + Send + 'static,
145        E: Serialize + DeserializeOwned + Send + 'static,
146        F: FnOnce() -> Fut + Send + 'static,
147        Fut: Future<Output = Result<T, E>> + Send + 'static,
148    {
149        self.inner.step_with_options(name, options, f).await
150    }
151
152    /// Suspend execution for the specified duration.
153    ///
154    /// During execution mode, sends a START checkpoint and returns
155    /// [`DurableError::WaitSuspended`] to signal the function should exit.
156    /// The server re-invokes after the duration.
157    ///
158    /// During replay mode, returns `Ok(())` immediately if the wait has
159    /// already completed.
160    ///
161    /// # Arguments
162    ///
163    /// * `name` — Human-readable name for the wait operation
164    /// * `duration_secs` — Duration to wait in seconds
165    ///
166    /// # Errors
167    ///
168    /// Returns [`DurableError::WaitSuspended`] when the wait has been checkpointed.
169    ///
170    /// # Examples
171    ///
172    /// ```no_run
173    /// # async fn example(ctx: &mut durable_lambda_builder::context::BuilderContext) -> Result<(), durable_lambda_core::error::DurableError> {
174    /// ctx.wait("cooldown", 30).await?;
175    /// println!("Wait completed!");
176    /// # Ok(())
177    /// # }
178    /// ```
179    pub async fn wait(&mut self, name: &str, duration_secs: i32) -> Result<(), DurableError> {
180        self.inner.wait(name, duration_secs).await
181    }
182
183    /// Register a callback and return a handle with the server-generated callback ID.
184    ///
185    /// During execution mode, sends a START checkpoint and returns a
186    /// [`CallbackHandle`] containing the `callback_id` for external systems.
187    /// During replay mode, extracts the cached callback_id from history.
188    ///
189    /// This method NEVER suspends. Use [`callback_result`](Self::callback_result)
190    /// to check the callback outcome (which suspends if not yet signaled).
191    ///
192    /// # Arguments
193    ///
194    /// * `name` — Human-readable name for the callback operation
195    /// * `options` — Timeout configuration (see [`CallbackOptions`])
196    ///
197    /// # Errors
198    ///
199    /// Returns [`DurableError::CheckpointFailed`] if the AWS checkpoint API call fails.
200    ///
201    /// # Examples
202    ///
203    /// ```no_run
204    /// # async fn example(ctx: &mut durable_lambda_builder::context::BuilderContext) -> Result<(), durable_lambda_core::error::DurableError> {
205    /// use durable_lambda_builder::prelude::*;
206    ///
207    /// let handle = ctx.create_callback("approval", CallbackOptions::new()).await?;
208    /// println!("Callback ID: {}", handle.callback_id);
209    /// # Ok(())
210    /// # }
211    /// ```
212    pub async fn create_callback(
213        &mut self,
214        name: &str,
215        options: CallbackOptions,
216    ) -> Result<CallbackHandle, DurableError> {
217        self.inner.create_callback(name, options).await
218    }
219
220    /// Check the result of a previously created callback.
221    ///
222    /// Return the deserialized success payload if the callback has been
223    /// signaled. Return an error if the callback failed, timed out, or
224    /// hasn't been signaled yet.
225    ///
226    /// # Arguments
227    ///
228    /// * `handle` — The [`CallbackHandle`] returned by [`create_callback`](Self::create_callback)
229    ///
230    /// # Errors
231    ///
232    /// Returns [`DurableError::CallbackSuspended`] if not yet signaled.
233    /// Returns [`DurableError::CallbackFailed`] if the callback failed or timed out.
234    ///
235    /// # Examples
236    ///
237    /// ```no_run
238    /// # async fn example(ctx: &mut durable_lambda_builder::context::BuilderContext) -> Result<(), durable_lambda_core::error::DurableError> {
239    /// use durable_lambda_builder::prelude::*;
240    ///
241    /// let handle = ctx.create_callback("approval", CallbackOptions::new()).await?;
242    /// let result: String = ctx.callback_result(&handle)?;
243    /// # Ok(())
244    /// # }
245    /// ```
246    pub fn callback_result<T: DeserializeOwned>(
247        &self,
248        handle: &CallbackHandle,
249    ) -> Result<T, DurableError> {
250        self.inner.callback_result(handle)
251    }
252
253    /// Durably invoke another Lambda function and return its result.
254    ///
255    /// During execution mode, serializes the payload, sends a START checkpoint,
256    /// and returns [`DurableError::InvokeSuspended`] to signal exit. The server
257    /// invokes the target asynchronously and re-invokes this Lambda when done.
258    ///
259    /// During replay, returns the cached result without re-invoking.
260    ///
261    /// # Arguments
262    ///
263    /// * `name` — Human-readable name for the invoke operation
264    /// * `function_name` — Name or ARN of the target Lambda function
265    /// * `payload` — Input payload to send to the target function
266    ///
267    /// # Errors
268    ///
269    /// Returns [`DurableError::InvokeSuspended`] when the target is still executing.
270    /// Returns [`DurableError::InvokeFailed`] if the target failed or timed out.
271    ///
272    /// # Examples
273    ///
274    /// ```no_run
275    /// # async fn example(ctx: &mut durable_lambda_builder::context::BuilderContext) -> Result<(), durable_lambda_core::error::DurableError> {
276    /// let result: String = ctx.invoke(
277    ///     "call_processor",
278    ///     "payment-processor-lambda",
279    ///     &serde_json::json!({"order_id": 123}),
280    /// ).await?;
281    /// # Ok(())
282    /// # }
283    /// ```
284    pub async fn invoke<T, P>(
285        &mut self,
286        name: &str,
287        function_name: &str,
288        payload: &P,
289    ) -> Result<T, DurableError>
290    where
291        T: DeserializeOwned,
292        P: Serialize,
293    {
294        self.inner.invoke(name, function_name, payload).await
295    }
296
297    /// Execute multiple branches concurrently and return their results.
298    ///
299    /// Each branch receives an owned child context with an isolated checkpoint
300    /// namespace. Branches satisfy `Send + 'static` via `tokio::spawn`.
301    ///
302    /// # Arguments
303    ///
304    /// * `name` — Human-readable name for the parallel operation
305    /// * `branches` — Collection of branch closures
306    /// * `options` — Parallel configuration
307    ///
308    /// # Errors
309    ///
310    /// Returns [`DurableError::ParallelFailed`] if the operation fails.
311    ///
312    /// # Examples
313    ///
314    /// ```no_run
315    /// # async fn example(ctx: &mut durable_lambda_builder::context::BuilderContext) -> Result<(), durable_lambda_core::error::DurableError> {
316    /// use durable_lambda_builder::prelude::*;
317    /// use durable_lambda_core::context::DurableContext;
318    /// use std::pin::Pin;
319    /// use std::future::Future;
320    ///
321    /// type BranchFn = Box<dyn FnOnce(DurableContext) -> Pin<Box<dyn Future<Output = Result<i32, DurableError>> + Send>> + Send>;
322    ///
323    /// let branches: Vec<BranchFn> = vec![
324    ///     Box::new(|_ctx| Box::pin(async move { Ok(1) })),
325    ///     Box::new(|_ctx| Box::pin(async move { Ok(2) })),
326    /// ];
327    /// let result = ctx.parallel("fan_out", branches, ParallelOptions::new()).await?;
328    /// # Ok(())
329    /// # }
330    /// ```
331    pub async fn parallel<T, F, Fut>(
332        &mut self,
333        name: &str,
334        branches: Vec<F>,
335        options: ParallelOptions,
336    ) -> Result<BatchResult<T>, DurableError>
337    where
338        T: Serialize + DeserializeOwned + Send + 'static,
339        F: FnOnce(DurableContext) -> Fut + Send + 'static,
340        Fut: Future<Output = Result<T, DurableError>> + Send + 'static,
341    {
342        self.inner.parallel(name, branches, options).await
343    }
344
345    /// Execute an isolated subflow with its own checkpoint namespace.
346    ///
347    /// The closure receives an owned child [`DurableContext`] whose operations
348    /// are namespaced under this child context's operation ID, preventing
349    /// collisions with the parent or sibling child contexts.
350    ///
351    /// During replay mode, returns the cached result without re-executing
352    /// the closure.
353    ///
354    /// # Arguments
355    ///
356    /// * `name` — Human-readable name for the child context operation
357    /// * `f` — Closure receiving an owned `DurableContext` for the subflow
358    ///
359    /// # Errors
360    ///
361    /// Returns [`DurableError::ChildContextFailed`] if the child context
362    /// is found in a failed state during replay.
363    /// Returns [`DurableError::CheckpointFailed`] if checkpoint API calls fail.
364    ///
365    /// # Examples
366    ///
367    /// ```no_run
368    /// # async fn example(ctx: &mut durable_lambda_builder::context::BuilderContext) -> Result<(), durable_lambda_core::error::DurableError> {
369    /// use durable_lambda_core::context::DurableContext;
370    ///
371    /// let result: i32 = ctx.child_context("sub_workflow", |mut child_ctx: DurableContext| async move {
372    ///     let r: Result<i32, String> = child_ctx.step("inner_step", || async { Ok(42) }).await?;
373    ///     Ok(r.unwrap())
374    /// }).await?;
375    /// assert_eq!(result, 42);
376    /// # Ok(())
377    /// # }
378    /// ```
379    pub async fn child_context<T, F, Fut>(&mut self, name: &str, f: F) -> Result<T, DurableError>
380    where
381        T: Serialize + DeserializeOwned + Send,
382        F: FnOnce(DurableContext) -> Fut + Send,
383        Fut: Future<Output = Result<T, DurableError>> + Send,
384    {
385        self.inner.child_context(name, f).await
386    }
387
388    /// Process a collection of items in parallel and return their results.
389    ///
390    /// Apply the closure `f` to each item concurrently. Each item receives an
391    /// owned child context with an isolated checkpoint namespace. Items satisfy
392    /// `Send + 'static` via `tokio::spawn`. The closure must be `Clone` since
393    /// it is applied to each item independently.
394    ///
395    /// When `batch_size` is configured, items process in sequential batches.
396    ///
397    /// # Arguments
398    ///
399    /// * `name` — Human-readable name for the map operation
400    /// * `items` — Collection of items to process
401    /// * `options` — Map configuration (batching)
402    /// * `f` — Closure applied to each item with an owned child context
403    ///
404    /// # Errors
405    ///
406    /// Returns [`DurableError::MapFailed`] if the operation fails.
407    ///
408    /// # Examples
409    ///
410    /// ```no_run
411    /// # async fn example(ctx: &mut durable_lambda_builder::context::BuilderContext) -> Result<(), durable_lambda_core::error::DurableError> {
412    /// use durable_lambda_builder::prelude::*;
413    /// use durable_lambda_core::context::DurableContext;
414    ///
415    /// let items = vec![1, 2, 3];
416    /// let result = ctx.map(
417    ///     "process_items",
418    ///     items,
419    ///     MapOptions::new().batch_size(2),
420    ///     |item: i32, mut child_ctx: DurableContext| async move {
421    ///         let r: Result<i32, String> = child_ctx.step("double", move || async move { Ok(item * 2) }).await?;
422    ///         Ok(r.unwrap())
423    ///     },
424    /// ).await?;
425    /// # Ok(())
426    /// # }
427    /// ```
428    pub async fn map<T, I, F, Fut>(
429        &mut self,
430        name: &str,
431        items: Vec<I>,
432        options: MapOptions,
433        f: F,
434    ) -> Result<BatchResult<T>, DurableError>
435    where
436        T: Serialize + DeserializeOwned + Send + 'static,
437        I: Send + 'static,
438        F: FnOnce(I, DurableContext) -> Fut + Send + 'static + Clone,
439        Fut: Future<Output = Result<T, DurableError>> + Send + 'static,
440    {
441        self.inner.map(name, items, options, f).await
442    }
443
444    /// Register a compensatable step.
445    ///
446    /// Executes the forward step and, on success, registers the compensation
447    /// closure for later rollback via [`run_compensations`](Self::run_compensations).
448    ///
449    /// # Examples
450    ///
451    /// ```no_run
452    /// # async fn example(ctx: &mut durable_lambda_builder::context::BuilderContext) -> Result<(), durable_lambda_core::error::DurableError> {
453    /// let result: Result<i32, String> = ctx.step_with_compensation(
454    ///     "charge",
455    ///     || async { Ok(100) },
456    ///     |amount| async move { println!("Refunding {amount}"); Ok(()) },
457    /// ).await?;
458    /// # Ok(())
459    /// # }
460    /// ```
461    pub async fn step_with_compensation<T, E, F, Fut, G, GFut>(
462        &mut self,
463        name: &str,
464        forward_fn: F,
465        compensate_fn: G,
466    ) -> Result<Result<T, E>, DurableError>
467    where
468        T: Serialize + DeserializeOwned + Send + 'static,
469        E: Serialize + DeserializeOwned + Send + 'static,
470        F: FnOnce() -> Fut + Send + 'static,
471        Fut: Future<Output = Result<T, E>> + Send + 'static,
472        G: FnOnce(T) -> GFut + Send + 'static,
473        GFut: Future<Output = Result<(), DurableError>> + Send + 'static,
474    {
475        self.inner
476            .step_with_compensation(name, forward_fn, compensate_fn)
477            .await
478    }
479
480    /// Register a compensatable step with options.
481    ///
482    /// Like [`step_with_compensation`](Self::step_with_compensation) but accepts
483    /// [`StepOptions`] for configuring retries, backoff, and timeouts on the forward step.
484    ///
485    /// # Examples
486    ///
487    /// ```no_run
488    /// # async fn example(ctx: &mut durable_lambda_builder::context::BuilderContext) -> Result<(), durable_lambda_core::error::DurableError> {
489    /// use durable_lambda_builder::prelude::*;
490    ///
491    /// let result: Result<String, String> = ctx.step_with_compensation_opts(
492    ///     "book_hotel",
493    ///     StepOptions::new().retries(3),
494    ///     || async { Ok("BOOKING-123".to_string()) },
495    ///     |booking_id| async move { println!("Cancelling: {booking_id}"); Ok(()) },
496    /// ).await?;
497    /// # Ok(())
498    /// # }
499    /// ```
500    pub async fn step_with_compensation_opts<T, E, F, Fut, G, GFut>(
501        &mut self,
502        name: &str,
503        options: StepOptions,
504        forward_fn: F,
505        compensate_fn: G,
506    ) -> Result<Result<T, E>, DurableError>
507    where
508        T: Serialize + DeserializeOwned + Send + 'static,
509        E: Serialize + DeserializeOwned + Send + 'static,
510        F: FnOnce() -> Fut + Send + 'static,
511        Fut: Future<Output = Result<T, E>> + Send + 'static,
512        G: FnOnce(T) -> GFut + Send + 'static,
513        GFut: Future<Output = Result<(), DurableError>> + Send + 'static,
514    {
515        self.inner
516            .step_with_compensation_opts(name, options, forward_fn, compensate_fn)
517            .await
518    }
519
520    /// Execute all registered compensations in reverse registration order.
521    ///
522    /// # Examples
523    ///
524    /// ```no_run
525    /// # async fn example(ctx: &mut durable_lambda_builder::context::BuilderContext) -> Result<(), durable_lambda_core::error::DurableError> {
526    /// let result = ctx.run_compensations().await?;
527    /// if !result.all_succeeded {
528    ///     eprintln!("Some compensations failed");
529    /// }
530    /// # Ok(())
531    /// # }
532    /// ```
533    pub async fn run_compensations(&mut self) -> Result<CompensationResult, DurableError> {
534        self.inner.run_compensations().await
535    }
536
537    /// Return the current execution mode (Replaying or Executing).
538    ///
539    /// # Examples
540    ///
541    /// ```no_run
542    /// # async fn example(ctx: &durable_lambda_builder::context::BuilderContext) {
543    /// use durable_lambda_core::types::ExecutionMode;
544    /// match ctx.execution_mode() {
545    ///     ExecutionMode::Replaying => { /* returning cached results */ }
546    ///     ExecutionMode::Executing => { /* running new operations */ }
547    /// }
548    /// # }
549    /// ```
550    pub fn execution_mode(&self) -> ExecutionMode {
551        self.inner.execution_mode()
552    }
553
554    /// Return whether the context is currently replaying from history.
555    ///
556    /// # Examples
557    ///
558    /// ```no_run
559    /// # async fn example(ctx: &durable_lambda_builder::context::BuilderContext) {
560    /// if ctx.is_replaying() {
561    ///     println!("Replaying cached operations");
562    /// }
563    /// # }
564    /// ```
565    pub fn is_replaying(&self) -> bool {
566        self.inner.is_replaying()
567    }
568
569    /// Return a reference to the durable execution ARN.
570    ///
571    /// # Examples
572    ///
573    /// ```no_run
574    /// # async fn example(ctx: &durable_lambda_builder::context::BuilderContext) {
575    /// println!("Execution ARN: {}", ctx.arn());
576    /// # }
577    /// ```
578    pub fn arn(&self) -> &str {
579        self.inner.arn()
580    }
581
582    /// Return the current checkpoint token.
583    ///
584    /// # Examples
585    ///
586    /// ```no_run
587    /// # async fn example(ctx: &durable_lambda_builder::context::BuilderContext) {
588    /// let token = ctx.checkpoint_token();
589    /// # }
590    /// ```
591    pub fn checkpoint_token(&self) -> &str {
592        self.inner.checkpoint_token()
593    }
594
595    /// Emit a replay-safe info-level log message.
596    ///
597    /// During execution mode, emits via `tracing::info!` with execution
598    /// context enrichment. During replay mode, the call is a no-op.
599    ///
600    /// # Examples
601    ///
602    /// ```no_run
603    /// # async fn example(ctx: &durable_lambda_builder::context::BuilderContext) {
604    /// ctx.log("Order processing started");
605    /// # }
606    /// ```
607    pub fn log(&self, message: &str) {
608        self.inner.log(message);
609    }
610
611    /// Emit a replay-safe info-level log message with structured data.
612    ///
613    /// # Examples
614    ///
615    /// ```no_run
616    /// # async fn example(ctx: &durable_lambda_builder::context::BuilderContext) {
617    /// ctx.log_with_data("Order processed", &serde_json::json!({"order_id": 42}));
618    /// # }
619    /// ```
620    pub fn log_with_data(&self, message: &str, data: &serde_json::Value) {
621        self.inner.log_with_data(message, data);
622    }
623
624    /// Emit a replay-safe debug-level log message.
625    ///
626    /// # Examples
627    ///
628    /// ```no_run
629    /// # async fn example(ctx: &durable_lambda_builder::context::BuilderContext) {
630    /// ctx.log_debug("Validating order fields");
631    /// # }
632    /// ```
633    pub fn log_debug(&self, message: &str) {
634        self.inner.log_debug(message);
635    }
636
637    /// Emit a replay-safe warn-level log message.
638    ///
639    /// # Examples
640    ///
641    /// ```no_run
642    /// # async fn example(ctx: &durable_lambda_builder::context::BuilderContext) {
643    /// ctx.log_warn("Inventory below threshold");
644    /// # }
645    /// ```
646    pub fn log_warn(&self, message: &str) {
647        self.inner.log_warn(message);
648    }
649
650    /// Emit a replay-safe error-level log message.
651    ///
652    /// # Examples
653    ///
654    /// ```no_run
655    /// # async fn example(ctx: &durable_lambda_builder::context::BuilderContext) {
656    /// ctx.log_error("Payment gateway timeout");
657    /// # }
658    /// ```
659    pub fn log_error(&self, message: &str) {
660        self.inner.log_error(message);
661    }
662
663    /// Emit a replay-safe debug-level log message with structured data.
664    ///
665    /// # Examples
666    ///
667    /// ```no_run
668    /// # async fn example(ctx: &durable_lambda_builder::context::BuilderContext) {
669    /// ctx.log_debug_with_data("Request details", &serde_json::json!({"method": "POST"}));
670    /// # }
671    /// ```
672    pub fn log_debug_with_data(&self, message: &str, data: &serde_json::Value) {
673        self.inner.log_debug_with_data(message, data);
674    }
675
676    /// Emit a replay-safe warn-level log message with structured data.
677    ///
678    /// # Examples
679    ///
680    /// ```no_run
681    /// # async fn example(ctx: &durable_lambda_builder::context::BuilderContext) {
682    /// ctx.log_warn_with_data("Retry attempt", &serde_json::json!({"attempt": 3}));
683    /// # }
684    /// ```
685    pub fn log_warn_with_data(&self, message: &str, data: &serde_json::Value) {
686        self.inner.log_warn_with_data(message, data);
687    }
688
689    /// Emit a replay-safe error-level log message with structured data.
690    ///
691    /// # Examples
692    ///
693    /// ```no_run
694    /// # async fn example(ctx: &durable_lambda_builder::context::BuilderContext) {
695    /// ctx.log_error_with_data("Payment failed", &serde_json::json!({"error": "timeout"}));
696    /// # }
697    /// ```
698    pub fn log_error_with_data(&self, message: &str, data: &serde_json::Value) {
699        self.inner.log_error_with_data(message, data);
700    }
701}
702
703impl DurableContextOps for BuilderContext {
704    fn step<T, E, F, Fut>(
705        &mut self,
706        name: &str,
707        f: F,
708    ) -> impl Future<Output = Result<Result<T, E>, DurableError>> + Send
709    where
710        T: Serialize + DeserializeOwned + Send + 'static,
711        E: Serialize + DeserializeOwned + Send + 'static,
712        F: FnOnce() -> Fut + Send + 'static,
713        Fut: Future<Output = Result<T, E>> + Send + 'static,
714    {
715        self.inner.step(name, f)
716    }
717
718    fn step_with_options<T, E, F, Fut>(
719        &mut self,
720        name: &str,
721        options: StepOptions,
722        f: F,
723    ) -> impl Future<Output = Result<Result<T, E>, DurableError>> + Send
724    where
725        T: Serialize + DeserializeOwned + Send + 'static,
726        E: Serialize + DeserializeOwned + Send + 'static,
727        F: FnOnce() -> Fut + Send + 'static,
728        Fut: Future<Output = Result<T, E>> + Send + 'static,
729    {
730        self.inner.step_with_options(name, options, f)
731    }
732
733    fn wait(
734        &mut self,
735        name: &str,
736        duration_secs: i32,
737    ) -> impl Future<Output = Result<(), DurableError>> + Send {
738        self.inner.wait(name, duration_secs)
739    }
740
741    fn create_callback(
742        &mut self,
743        name: &str,
744        options: CallbackOptions,
745    ) -> impl Future<Output = Result<CallbackHandle, DurableError>> + Send {
746        self.inner.create_callback(name, options)
747    }
748
749    fn invoke<T, P>(
750        &mut self,
751        name: &str,
752        function_name: &str,
753        payload: &P,
754    ) -> impl Future<Output = Result<T, DurableError>> + Send
755    where
756        T: DeserializeOwned + Send,
757        P: Serialize + Sync,
758    {
759        self.inner.invoke(name, function_name, payload)
760    }
761
762    fn parallel<T, F, Fut>(
763        &mut self,
764        name: &str,
765        branches: Vec<F>,
766        options: ParallelOptions,
767    ) -> impl Future<Output = Result<BatchResult<T>, DurableError>> + Send
768    where
769        T: Serialize + DeserializeOwned + Send + 'static,
770        F: FnOnce(DurableContext) -> Fut + Send + 'static,
771        Fut: Future<Output = Result<T, DurableError>> + Send + 'static,
772    {
773        self.inner.parallel(name, branches, options)
774    }
775
776    fn child_context<T, F, Fut>(
777        &mut self,
778        name: &str,
779        f: F,
780    ) -> impl Future<Output = Result<T, DurableError>> + Send
781    where
782        T: Serialize + DeserializeOwned + Send,
783        F: FnOnce(DurableContext) -> Fut + Send,
784        Fut: Future<Output = Result<T, DurableError>> + Send,
785    {
786        self.inner.child_context(name, f)
787    }
788
789    fn map<T, I, F, Fut>(
790        &mut self,
791        name: &str,
792        items: Vec<I>,
793        options: MapOptions,
794        f: F,
795    ) -> impl Future<Output = Result<BatchResult<T>, DurableError>> + Send
796    where
797        T: Serialize + DeserializeOwned + Send + 'static,
798        I: Send + 'static,
799        F: FnOnce(I, DurableContext) -> Fut + Send + 'static + Clone,
800        Fut: Future<Output = Result<T, DurableError>> + Send + 'static,
801    {
802        self.inner.map(name, items, options, f)
803    }
804
805    fn step_with_compensation<T, E, F, Fut, G, GFut>(
806        &mut self,
807        name: &str,
808        forward_fn: F,
809        compensate_fn: G,
810    ) -> impl Future<Output = Result<Result<T, E>, DurableError>> + Send
811    where
812        T: Serialize + DeserializeOwned + Send + 'static,
813        E: Serialize + DeserializeOwned + Send + 'static,
814        F: FnOnce() -> Fut + Send + 'static,
815        Fut: Future<Output = Result<T, E>> + Send + 'static,
816        G: FnOnce(T) -> GFut + Send + 'static,
817        GFut: Future<Output = Result<(), DurableError>> + Send + 'static,
818    {
819        self.inner
820            .step_with_compensation(name, forward_fn, compensate_fn)
821    }
822
823    fn step_with_compensation_opts<T, E, F, Fut, G, GFut>(
824        &mut self,
825        name: &str,
826        options: StepOptions,
827        forward_fn: F,
828        compensate_fn: G,
829    ) -> impl Future<Output = Result<Result<T, E>, DurableError>> + Send
830    where
831        T: Serialize + DeserializeOwned + Send + 'static,
832        E: Serialize + DeserializeOwned + Send + 'static,
833        F: FnOnce() -> Fut + Send + 'static,
834        Fut: Future<Output = Result<T, E>> + Send + 'static,
835        G: FnOnce(T) -> GFut + Send + 'static,
836        GFut: Future<Output = Result<(), DurableError>> + Send + 'static,
837    {
838        self.inner
839            .step_with_compensation_opts(name, options, forward_fn, compensate_fn)
840    }
841
842    fn run_compensations(
843        &mut self,
844    ) -> impl Future<Output = Result<CompensationResult, DurableError>> + Send {
845        self.inner.run_compensations()
846    }
847
848    fn callback_result<T: DeserializeOwned>(
849        &self,
850        handle: &CallbackHandle,
851    ) -> Result<T, DurableError> {
852        self.inner.callback_result(handle)
853    }
854
855    fn execution_mode(&self) -> ExecutionMode {
856        self.inner.execution_mode()
857    }
858
859    fn is_replaying(&self) -> bool {
860        self.inner.is_replaying()
861    }
862
863    fn arn(&self) -> &str {
864        self.inner.arn()
865    }
866
867    fn checkpoint_token(&self) -> &str {
868        self.inner.checkpoint_token()
869    }
870
871    fn log(&self, message: &str) {
872        self.inner.log(message);
873    }
874
875    fn log_with_data(&self, message: &str, data: &serde_json::Value) {
876        self.inner.log_with_data(message, data);
877    }
878
879    fn log_debug(&self, message: &str) {
880        self.inner.log_debug(message);
881    }
882
883    fn log_warn(&self, message: &str) {
884        self.inner.log_warn(message);
885    }
886
887    fn log_error(&self, message: &str) {
888        self.inner.log_error(message);
889    }
890
891    fn log_debug_with_data(&self, message: &str, data: &serde_json::Value) {
892        self.inner.log_debug_with_data(message, data);
893    }
894
895    fn log_warn_with_data(&self, message: &str, data: &serde_json::Value) {
896        self.inner.log_warn_with_data(message, data);
897    }
898
899    fn log_error_with_data(&self, message: &str, data: &serde_json::Value) {
900        self.inner.log_error_with_data(message, data);
901    }
902
903    fn enable_batch_mode(&mut self) {
904        self.inner.enable_batch_mode();
905    }
906
907    fn flush_batch(&mut self) -> impl Future<Output = Result<(), DurableError>> + Send {
908        self.inner.flush_batch()
909    }
910}
911
#[cfg(test)]
mod tests {
    use super::*;
    use aws_sdk_lambda::operation::checkpoint_durable_execution::CheckpointDurableExecutionOutput;
    use aws_sdk_lambda::operation::get_durable_execution_state::GetDurableExecutionStateOutput;
    use aws_sdk_lambda::types::{
        Operation, OperationStatus, OperationType, OperationUpdate, StepDetails,
    };
    use std::collections::HashMap;
    use std::sync::{Arc, Mutex};

    use durable_lambda_core::backend::DurableBackend;

    /// In-memory backend double used to exercise `BuilderContext` delegation.
    ///
    /// `checkpoint` always succeeds and echoes the stored token, and
    /// `get_execution_state` always returns an empty output. The `operations`
    /// map is filled by [`MockBackend::with_operation`] but is not consulted
    /// by either backend method in this module.
    struct MockBackend {
        operations: HashMap<String, Operation>,
        checkpoint_token: Mutex<String>,
    }

    impl MockBackend {
        /// Builds a backend with no operations and the token `"mock-token"`.
        fn new() -> Self {
            let operations = HashMap::new();
            let checkpoint_token = Mutex::new(String::from("mock-token"));
            Self {
                operations,
                checkpoint_token,
            }
        }

        /// Registers `operation` under `id` and hands the backend back for
        /// chaining.
        fn with_operation(mut self, id: &str, operation: Operation) -> Self {
            self.operations.insert(id.to_owned(), operation);
            self
        }
    }

    #[async_trait::async_trait]
    impl DurableBackend for MockBackend {
        /// Ignores every argument and answers with the stored token.
        async fn checkpoint(
            &self,
            _arn: &str,
            _checkpoint_token: &str,
            _updates: Vec<OperationUpdate>,
            _client_token: Option<&str>,
        ) -> Result<CheckpointDurableExecutionOutput, DurableError> {
            // Clone inside the same statement so the lock guard is released
            // immediately.
            let stored_token = self.checkpoint_token.lock().unwrap().clone();
            let output = CheckpointDurableExecutionOutput::builder()
                .checkpoint_token(stored_token)
                .build();
            Ok(output)
        }

        /// Ignores every argument and answers with an empty state output.
        async fn get_execution_state(
            &self,
            _arn: &str,
            _checkpoint_token: &str,
            _next_marker: &str,
            _max_items: i32,
        ) -> Result<GetDurableExecutionStateOutput, DurableError> {
            let empty_state = GetDurableExecutionStateOutput::builder()
                .build()
                .expect("test: empty execution state");
            Ok(empty_state)
        }
    }

    /// Wraps `backend` in a `DurableContext` with no prior operations and
    /// returns the `BuilderContext` built on top of it.
    async fn make_builder_context(backend: MockBackend) -> BuilderContext {
        let core = DurableContext::new(
            Arc::new(backend),
            String::from("arn:test"),
            String::from("tok"),
            Vec::new(),
            None,
        )
        .await
        .unwrap();
        BuilderContext::new(core)
    }

    /// Builds a `Step` operation in the `Succeeded` state whose step details
    /// carry `result_json`. Panics if the SDK builder rejects the operation.
    fn make_succeeded_op(id: &str, result_json: &str) -> Operation {
        let details = StepDetails::builder().result(result_json).build();
        let built = Operation::builder()
            .id(id)
            .r#type(OperationType::Step)
            .status(OperationStatus::Succeeded)
            .start_timestamp(aws_smithy_types::DateTime::from_secs(0))
            .step_details(details)
            .build();
        match built {
            Ok(operation) => operation,
            Err(e) => panic!("failed to build test Operation: {e}"),
        }
    }

    #[tokio::test]
    async fn test_builder_context_step_delegates_to_core() {
        let mut builder_ctx = make_builder_context(MockBackend::new()).await;

        let outcome: Result<i32, String> = builder_ctx
            .step("validate", || async { Ok(42) })
            .await
            .unwrap();

        assert_eq!(outcome, Ok(42));
    }

    #[tokio::test]
    async fn test_builder_context_step_with_options_delegates_to_core() {
        let mut builder_ctx = make_builder_context(MockBackend::new()).await;
        let options = StepOptions::new().retries(3);

        let outcome: Result<i32, String> = builder_ctx
            .step_with_options("charge", options, || async { Ok(100) })
            .await
            .unwrap();

        assert_eq!(outcome, Ok(100));
    }

    #[tokio::test]
    async fn test_builder_context_execution_mode_executing() {
        // No prior operations were supplied, so the context is expected to
        // be executing rather than replaying.
        let builder_ctx = make_builder_context(MockBackend::new()).await;

        assert_eq!(builder_ctx.execution_mode(), ExecutionMode::Executing);
        assert!(!builder_ctx.is_replaying());
    }

    #[tokio::test]
    async fn test_builder_context_execution_mode_replaying() {
        // A completed operation is supplied up front, so the context is
        // expected to start in replay mode.
        let completed = make_succeeded_op("op-1", "42");
        let backend = MockBackend::new().with_operation("op-1", completed.clone());

        let core = DurableContext::new(
            Arc::new(backend),
            String::from("arn:test"),
            String::from("tok"),
            vec![completed],
            None,
        )
        .await
        .unwrap();
        let builder_ctx = BuilderContext::new(core);

        assert_eq!(builder_ctx.execution_mode(), ExecutionMode::Replaying);
        assert!(builder_ctx.is_replaying());
    }

    #[tokio::test]
    async fn test_builder_context_arn() {
        let builder_ctx = make_builder_context(MockBackend::new()).await;

        assert_eq!(builder_ctx.arn(), "arn:test");
    }

    #[tokio::test]
    async fn test_builder_context_checkpoint_token() {
        let builder_ctx = make_builder_context(MockBackend::new()).await;

        assert_eq!(builder_ctx.checkpoint_token(), "tok");
    }

    #[tokio::test]
    async fn test_builder_context_child_context_delegates_to_core() {
        let mut builder_ctx = make_builder_context(MockBackend::new()).await;

        let value: i32 = builder_ctx
            .child_context("sub_workflow", |mut child: DurableContext| async move {
                let inner: Result<i32, String> =
                    child.step("inner_step", || async { Ok(42) }).await?;
                Ok(inner.unwrap())
            })
            .await
            .unwrap();

        assert_eq!(value, 42);
    }

    #[tokio::test]
    async fn test_builder_context_log_delegates_to_core() {
        let builder_ctx = make_builder_context(MockBackend::new()).await;

        // Smoke test only: each logging method must be callable without
        // panicking; nothing is asserted about the emitted output.
        builder_ctx.log("test message");
        builder_ctx.log_with_data("test data", &serde_json::json!({"key": "val"}));
        builder_ctx.log_debug("test debug");
        builder_ctx.log_warn("test warn");
        builder_ctx.log_error("test error");
        builder_ctx.log_debug_with_data("debug data", &serde_json::json!({"k": "v"}));
        builder_ctx.log_warn_with_data("warn data", &serde_json::json!({"k": "v"}));
        builder_ctx.log_error_with_data("error data", &serde_json::json!({"k": "v"}));
    }
}