Skip to main content

durable_lambda_closure/
context.rs

1//! Closure-specific context wrapper.
2//!
3//! Thin wrapper over [`DurableContext`](durable_lambda_core::DurableContext)
4//! providing closure-approach ergonomics. All operations delegate directly
5//! to the inner context with no additional logic.
6
7use std::future::Future;
8
9use durable_lambda_core::context::DurableContext;
10use durable_lambda_core::error::DurableError;
11use durable_lambda_core::ops_trait::DurableContextOps;
12use durable_lambda_core::types::{
13    BatchResult, CallbackHandle, CallbackOptions, CompensationResult, ExecutionMode, MapOptions,
14    ParallelOptions, StepOptions,
15};
16use serde::de::DeserializeOwned;
17use serde::Serialize;
18
19/// Closure-native context for durable Lambda operations.
20///
21/// Thin wrapper over [`DurableContext`] providing the closure-approach API.
22/// All operations delegate directly to the inner context — no replay logic,
23/// no checkpoint logic, just delegation.
24///
25/// Constructed internally by [`run`](crate::run) — users never create this
26/// directly.
27///
28/// # Examples
29///
30/// ```no_run
31/// use durable_lambda_closure::prelude::*;
32///
33/// async fn handler(event: serde_json::Value, mut ctx: ClosureContext) -> Result<serde_json::Value, DurableError> {
34///     let result: Result<i32, String> = ctx.step("validate", || async {
35///         Ok(42)
36///     }).await?;
37///     Ok(serde_json::json!({"validated": result.unwrap()}))
38/// }
39/// ```
40pub struct ClosureContext {
41    inner: DurableContext,
42}
43
44impl ClosureContext {
45    /// Create from an existing `DurableContext`.
46    ///
47    /// This constructor is `pub(crate)` — only the [`run`](crate::run)
48    /// function creates `ClosureContext` instances.
49    pub(crate) fn new(ctx: DurableContext) -> Self {
50        Self { inner: ctx }
51    }
52
53    /// Execute a named step with checkpointing.
54    ///
55    /// During execution mode, runs the closure and checkpoints the result to AWS.
56    /// During replay mode, returns the previously checkpointed result without
57    /// executing the closure.
58    ///
59    /// # Arguments
60    ///
61    /// * `name` — Human-readable step name, used as checkpoint metadata
62    /// * `f` — Closure to execute (skipped during replay)
63    ///
64    /// # Errors
65    ///
66    /// Returns [`DurableError::Serialization`] if the result cannot be serialized.
67    /// Returns [`DurableError::Deserialization`] if a cached result cannot be deserialized.
68    /// Returns [`DurableError::CheckpointFailed`] if the AWS checkpoint API call fails.
69    ///
70    /// # Examples
71    ///
72    /// ```no_run
73    /// # async fn example(ctx: &mut durable_lambda_closure::context::ClosureContext) -> Result<(), durable_lambda_core::error::DurableError> {
74    /// let result: Result<i32, String> = ctx.step("validate_order", || async {
75    ///     Ok(42)
76    /// }).await?;
77    ///
78    /// match result {
79    ///     Ok(value) => println!("Step succeeded: {value}"),
80    ///     Err(e) => println!("Step failed: {e}"),
81    /// }
82    /// # Ok(())
83    /// # }
84    /// ```
85    pub async fn step<T, E, F, Fut>(
86        &mut self,
87        name: &str,
88        f: F,
89    ) -> Result<Result<T, E>, DurableError>
90    where
91        T: Serialize + DeserializeOwned + Send + 'static,
92        E: Serialize + DeserializeOwned + Send + 'static,
93        F: FnOnce() -> Fut + Send + 'static,
94        Fut: Future<Output = Result<T, E>> + Send + 'static,
95    {
96        self.inner.step(name, f).await
97    }
98
99    /// Execute a named step with checkpointing and retry configuration.
100    ///
101    /// If the closure fails and retries are configured, sends a RETRY checkpoint
102    /// and returns [`DurableError::StepRetryScheduled`] to signal the function
103    /// should exit.
104    ///
105    /// # Arguments
106    ///
107    /// * `name` — Human-readable step name, used as checkpoint metadata
108    /// * `options` — Retry configuration (see [`StepOptions`])
109    /// * `f` — Closure to execute (skipped during replay)
110    ///
111    /// # Errors
112    ///
113    /// Returns [`DurableError::StepRetryScheduled`] when a retry has been scheduled.
114    /// Returns [`DurableError::Serialization`] if the result cannot be serialized.
115    /// Returns [`DurableError::Deserialization`] if a cached result cannot be deserialized.
116    ///
117    /// # Examples
118    ///
119    /// ```no_run
120    /// # async fn example(ctx: &mut durable_lambda_closure::context::ClosureContext) -> Result<(), durable_lambda_core::error::DurableError> {
121    /// use durable_lambda_closure::prelude::*;
122    ///
123    /// let result: Result<i32, String> = ctx.step_with_options(
124    ///     "charge_payment",
125    ///     StepOptions::new().retries(3).backoff_seconds(5),
126    ///     || async { Ok(100) },
127    /// ).await?;
128    /// # Ok(())
129    /// # }
130    /// ```
131    pub async fn step_with_options<T, E, F, Fut>(
132        &mut self,
133        name: &str,
134        options: StepOptions,
135        f: F,
136    ) -> Result<Result<T, E>, DurableError>
137    where
138        T: Serialize + DeserializeOwned + Send + 'static,
139        E: Serialize + DeserializeOwned + Send + 'static,
140        F: FnOnce() -> Fut + Send + 'static,
141        Fut: Future<Output = Result<T, E>> + Send + 'static,
142    {
143        self.inner.step_with_options(name, options, f).await
144    }
145
146    /// Suspend execution for the specified duration.
147    ///
148    /// During execution mode, sends a START checkpoint and returns
149    /// [`DurableError::WaitSuspended`] to signal the function should exit.
150    /// The server re-invokes after the duration.
151    ///
152    /// During replay mode, returns `Ok(())` immediately if the wait has
153    /// already completed.
154    ///
155    /// # Arguments
156    ///
157    /// * `name` — Human-readable name for the wait operation
158    /// * `duration_secs` — Duration to wait in seconds
159    ///
160    /// # Errors
161    ///
162    /// Returns [`DurableError::WaitSuspended`] when the wait has been checkpointed.
163    ///
164    /// # Examples
165    ///
166    /// ```no_run
167    /// # async fn example(ctx: &mut durable_lambda_closure::context::ClosureContext) -> Result<(), durable_lambda_core::error::DurableError> {
168    /// ctx.wait("cooldown", 30).await?;
169    /// println!("Wait completed!");
170    /// # Ok(())
171    /// # }
172    /// ```
173    pub async fn wait(&mut self, name: &str, duration_secs: i32) -> Result<(), DurableError> {
174        self.inner.wait(name, duration_secs).await
175    }
176
177    /// Register a callback and return a handle with the server-generated callback ID.
178    ///
179    /// During execution mode, sends a START checkpoint and returns a
180    /// [`CallbackHandle`] containing the `callback_id` for external systems.
181    /// During replay mode, extracts the cached callback_id from history.
182    ///
183    /// This method NEVER suspends. Use [`callback_result`](Self::callback_result)
184    /// to check the callback outcome (which suspends if not yet signaled).
185    ///
186    /// # Arguments
187    ///
188    /// * `name` — Human-readable name for the callback operation
189    /// * `options` — Timeout configuration (see [`CallbackOptions`])
190    ///
191    /// # Errors
192    ///
193    /// Returns [`DurableError::CheckpointFailed`] if the AWS checkpoint API call fails.
194    ///
195    /// # Examples
196    ///
197    /// ```no_run
198    /// # async fn example(ctx: &mut durable_lambda_closure::context::ClosureContext) -> Result<(), durable_lambda_core::error::DurableError> {
199    /// use durable_lambda_closure::prelude::*;
200    ///
201    /// let handle = ctx.create_callback("approval", CallbackOptions::new()).await?;
202    /// println!("Callback ID: {}", handle.callback_id);
203    /// # Ok(())
204    /// # }
205    /// ```
206    pub async fn create_callback(
207        &mut self,
208        name: &str,
209        options: CallbackOptions,
210    ) -> Result<CallbackHandle, DurableError> {
211        self.inner.create_callback(name, options).await
212    }
213
214    /// Check the result of a previously created callback.
215    ///
216    /// Return the deserialized success payload if the callback has been
217    /// signaled. Return an error if the callback failed, timed out, or
218    /// hasn't been signaled yet.
219    ///
220    /// # Arguments
221    ///
222    /// * `handle` — The [`CallbackHandle`] returned by [`create_callback`](Self::create_callback)
223    ///
224    /// # Errors
225    ///
226    /// Returns [`DurableError::CallbackSuspended`] if not yet signaled.
227    /// Returns [`DurableError::CallbackFailed`] if the callback failed or timed out.
228    ///
229    /// # Examples
230    ///
231    /// ```no_run
232    /// # async fn example(ctx: &mut durable_lambda_closure::context::ClosureContext) -> Result<(), durable_lambda_core::error::DurableError> {
233    /// use durable_lambda_closure::prelude::*;
234    ///
235    /// let handle = ctx.create_callback("approval", CallbackOptions::new()).await?;
236    /// let result: String = ctx.callback_result(&handle)?;
237    /// # Ok(())
238    /// # }
239    /// ```
240    pub fn callback_result<T: DeserializeOwned>(
241        &self,
242        handle: &CallbackHandle,
243    ) -> Result<T, DurableError> {
244        self.inner.callback_result(handle)
245    }
246
247    /// Durably invoke another Lambda function and return its result.
248    ///
249    /// During execution mode, serializes the payload, sends a START checkpoint,
250    /// and returns [`DurableError::InvokeSuspended`] to signal exit. The server
251    /// invokes the target asynchronously and re-invokes this Lambda when done.
252    ///
253    /// During replay, returns the cached result without re-invoking.
254    ///
255    /// # Arguments
256    ///
257    /// * `name` — Human-readable name for the invoke operation
258    /// * `function_name` — Name or ARN of the target Lambda function
259    /// * `payload` — Input payload to send to the target function
260    ///
261    /// # Errors
262    ///
263    /// Returns [`DurableError::InvokeSuspended`] when the target is still executing.
264    /// Returns [`DurableError::InvokeFailed`] if the target failed or timed out.
265    ///
266    /// # Examples
267    ///
268    /// ```no_run
269    /// # async fn example(ctx: &mut durable_lambda_closure::context::ClosureContext) -> Result<(), durable_lambda_core::error::DurableError> {
270    /// let result: String = ctx.invoke(
271    ///     "call_processor",
272    ///     "payment-processor-lambda",
273    ///     &serde_json::json!({"order_id": 123}),
274    /// ).await?;
275    /// # Ok(())
276    /// # }
277    /// ```
278    pub async fn invoke<T, P>(
279        &mut self,
280        name: &str,
281        function_name: &str,
282        payload: &P,
283    ) -> Result<T, DurableError>
284    where
285        T: DeserializeOwned,
286        P: Serialize,
287    {
288        self.inner.invoke(name, function_name, payload).await
289    }
290
291    /// Execute multiple branches concurrently and return their results.
292    ///
293    /// Each branch receives an owned child context with an isolated checkpoint
294    /// namespace. Branches satisfy `Send + 'static` via `tokio::spawn`.
295    ///
296    /// # Arguments
297    ///
298    /// * `name` — Human-readable name for the parallel operation
299    /// * `branches` — Collection of branch closures
300    /// * `options` — Parallel configuration
301    ///
302    /// # Errors
303    ///
304    /// Returns [`DurableError::ParallelFailed`] if the operation fails.
305    ///
306    /// # Examples
307    ///
308    /// ```no_run
309    /// # async fn example(ctx: &mut durable_lambda_closure::context::ClosureContext) -> Result<(), durable_lambda_core::error::DurableError> {
310    /// use durable_lambda_closure::prelude::*;
311    /// use durable_lambda_core::context::DurableContext;
312    /// use std::pin::Pin;
313    /// use std::future::Future;
314    ///
315    /// type BranchFn = Box<dyn FnOnce(DurableContext) -> Pin<Box<dyn Future<Output = Result<i32, DurableError>> + Send>> + Send>;
316    ///
317    /// let branches: Vec<BranchFn> = vec![
318    ///     Box::new(|_ctx| Box::pin(async move { Ok(1) })),
319    ///     Box::new(|_ctx| Box::pin(async move { Ok(2) })),
320    /// ];
321    /// let result = ctx.parallel("fan_out", branches, ParallelOptions::new()).await?;
322    /// # Ok(())
323    /// # }
324    /// ```
325    pub async fn parallel<T, F, Fut>(
326        &mut self,
327        name: &str,
328        branches: Vec<F>,
329        options: ParallelOptions,
330    ) -> Result<BatchResult<T>, DurableError>
331    where
332        T: Serialize + DeserializeOwned + Send + 'static,
333        F: FnOnce(DurableContext) -> Fut + Send + 'static,
334        Fut: Future<Output = Result<T, DurableError>> + Send + 'static,
335    {
336        self.inner.parallel(name, branches, options).await
337    }
338
339    /// Execute an isolated subflow with its own checkpoint namespace.
340    ///
341    /// The closure receives an owned child [`DurableContext`] whose operations
342    /// are namespaced under this child context's operation ID, preventing
343    /// collisions with the parent or sibling child contexts.
344    ///
345    /// During replay mode, returns the cached result without re-executing
346    /// the closure.
347    ///
348    /// # Arguments
349    ///
350    /// * `name` — Human-readable name for the child context operation
351    /// * `f` — Closure receiving an owned `DurableContext` for the subflow
352    ///
353    /// # Errors
354    ///
355    /// Returns [`DurableError::ChildContextFailed`] if the child context
356    /// is found in a failed state during replay.
357    /// Returns [`DurableError::CheckpointFailed`] if checkpoint API calls fail.
358    ///
359    /// # Examples
360    ///
361    /// ```no_run
362    /// # async fn example(ctx: &mut durable_lambda_closure::context::ClosureContext) -> Result<(), durable_lambda_core::error::DurableError> {
363    /// use durable_lambda_core::context::DurableContext;
364    ///
365    /// let result: i32 = ctx.child_context("sub_workflow", |mut child_ctx: DurableContext| async move {
366    ///     let r: Result<i32, String> = child_ctx.step("inner_step", || async { Ok(42) }).await?;
367    ///     Ok(r.unwrap())
368    /// }).await?;
369    /// assert_eq!(result, 42);
370    /// # Ok(())
371    /// # }
372    /// ```
373    pub async fn child_context<T, F, Fut>(&mut self, name: &str, f: F) -> Result<T, DurableError>
374    where
375        T: Serialize + DeserializeOwned + Send,
376        F: FnOnce(DurableContext) -> Fut + Send,
377        Fut: Future<Output = Result<T, DurableError>> + Send,
378    {
379        self.inner.child_context(name, f).await
380    }
381
382    /// Process a collection of items in parallel and return their results.
383    ///
384    /// Apply the closure `f` to each item concurrently. Each item receives an
385    /// owned child context with an isolated checkpoint namespace. Items satisfy
386    /// `Send + 'static` via `tokio::spawn`. The closure must be `Clone` since
387    /// it is applied to each item independently.
388    ///
389    /// When `batch_size` is configured, items process in sequential batches.
390    ///
391    /// # Arguments
392    ///
393    /// * `name` — Human-readable name for the map operation
394    /// * `items` — Collection of items to process
395    /// * `options` — Map configuration (batching)
396    /// * `f` — Closure applied to each item with an owned child context
397    ///
398    /// # Errors
399    ///
400    /// Returns [`DurableError::MapFailed`] if the operation fails.
401    ///
402    /// # Examples
403    ///
404    /// ```no_run
405    /// # async fn example(ctx: &mut durable_lambda_closure::context::ClosureContext) -> Result<(), durable_lambda_core::error::DurableError> {
406    /// use durable_lambda_closure::prelude::*;
407    /// use durable_lambda_core::context::DurableContext;
408    ///
409    /// let items = vec![1, 2, 3];
410    /// let result = ctx.map(
411    ///     "process_items",
412    ///     items,
413    ///     MapOptions::new().batch_size(2),
414    ///     |item: i32, mut child_ctx: DurableContext| async move {
415    ///         let r: Result<i32, String> = child_ctx.step("double", move || async move { Ok(item * 2) }).await?;
416    ///         Ok(r.unwrap())
417    ///     },
418    /// ).await?;
419    /// # Ok(())
420    /// # }
421    /// ```
422    pub async fn map<T, I, F, Fut>(
423        &mut self,
424        name: &str,
425        items: Vec<I>,
426        options: MapOptions,
427        f: F,
428    ) -> Result<BatchResult<T>, DurableError>
429    where
430        T: Serialize + DeserializeOwned + Send + 'static,
431        I: Send + 'static,
432        F: FnOnce(I, DurableContext) -> Fut + Send + 'static + Clone,
433        Fut: Future<Output = Result<T, DurableError>> + Send + 'static,
434    {
435        self.inner.map(name, items, options, f).await
436    }
437
438    /// Register a compensatable step.
439    ///
440    /// Executes the forward step and, on success, registers the compensation
441    /// closure for later rollback via [`run_compensations`](Self::run_compensations).
442    ///
443    /// # Examples
444    ///
445    /// ```no_run
446    /// # async fn example(ctx: &mut durable_lambda_closure::context::ClosureContext) -> Result<(), durable_lambda_core::error::DurableError> {
447    /// let result: Result<i32, String> = ctx.step_with_compensation(
448    ///     "charge",
449    ///     || async { Ok(100) },
450    ///     |amount| async move { println!("Refunding {amount}"); Ok(()) },
451    /// ).await?;
452    /// # Ok(())
453    /// # }
454    /// ```
455    pub async fn step_with_compensation<T, E, F, Fut, G, GFut>(
456        &mut self,
457        name: &str,
458        forward_fn: F,
459        compensate_fn: G,
460    ) -> Result<Result<T, E>, DurableError>
461    where
462        T: Serialize + DeserializeOwned + Send + 'static,
463        E: Serialize + DeserializeOwned + Send + 'static,
464        F: FnOnce() -> Fut + Send + 'static,
465        Fut: Future<Output = Result<T, E>> + Send + 'static,
466        G: FnOnce(T) -> GFut + Send + 'static,
467        GFut: Future<Output = Result<(), DurableError>> + Send + 'static,
468    {
469        self.inner
470            .step_with_compensation(name, forward_fn, compensate_fn)
471            .await
472    }
473
474    /// Register a compensatable step with options.
475    ///
476    /// Like [`step_with_compensation`](Self::step_with_compensation) but accepts
477    /// [`StepOptions`] for configuring retries, backoff, and timeouts on the forward step.
478    ///
479    /// # Examples
480    ///
481    /// ```no_run
482    /// # async fn example(ctx: &mut durable_lambda_closure::context::ClosureContext) -> Result<(), durable_lambda_core::error::DurableError> {
483    /// use durable_lambda_closure::prelude::*;
484    ///
485    /// let result: Result<String, String> = ctx.step_with_compensation_opts(
486    ///     "book_hotel",
487    ///     StepOptions::new().retries(3),
488    ///     || async { Ok("BOOKING-123".to_string()) },
489    ///     |booking_id| async move { println!("Cancelling: {booking_id}"); Ok(()) },
490    /// ).await?;
491    /// # Ok(())
492    /// # }
493    /// ```
494    pub async fn step_with_compensation_opts<T, E, F, Fut, G, GFut>(
495        &mut self,
496        name: &str,
497        options: StepOptions,
498        forward_fn: F,
499        compensate_fn: G,
500    ) -> Result<Result<T, E>, DurableError>
501    where
502        T: Serialize + DeserializeOwned + Send + 'static,
503        E: Serialize + DeserializeOwned + Send + 'static,
504        F: FnOnce() -> Fut + Send + 'static,
505        Fut: Future<Output = Result<T, E>> + Send + 'static,
506        G: FnOnce(T) -> GFut + Send + 'static,
507        GFut: Future<Output = Result<(), DurableError>> + Send + 'static,
508    {
509        self.inner
510            .step_with_compensation_opts(name, options, forward_fn, compensate_fn)
511            .await
512    }
513
514    /// Execute all registered compensations in reverse registration order.
515    ///
516    /// # Examples
517    ///
518    /// ```no_run
519    /// # async fn example(ctx: &mut durable_lambda_closure::context::ClosureContext) -> Result<(), durable_lambda_core::error::DurableError> {
520    /// let result = ctx.run_compensations().await?;
521    /// if !result.all_succeeded {
522    ///     eprintln!("Some compensations failed");
523    /// }
524    /// # Ok(())
525    /// # }
526    /// ```
527    pub async fn run_compensations(&mut self) -> Result<CompensationResult, DurableError> {
528        self.inner.run_compensations().await
529    }
530
531    /// Return the current execution mode (Replaying or Executing).
532    ///
533    /// # Examples
534    ///
535    /// ```no_run
536    /// # async fn example(ctx: &durable_lambda_closure::context::ClosureContext) {
537    /// use durable_lambda_core::types::ExecutionMode;
538    /// match ctx.execution_mode() {
539    ///     ExecutionMode::Replaying => { /* returning cached results */ }
540    ///     ExecutionMode::Executing => { /* running new operations */ }
541    /// }
542    /// # }
543    /// ```
544    pub fn execution_mode(&self) -> ExecutionMode {
545        self.inner.execution_mode()
546    }
547
548    /// Return whether the context is currently replaying from history.
549    ///
550    /// # Examples
551    ///
552    /// ```no_run
553    /// # async fn example(ctx: &durable_lambda_closure::context::ClosureContext) {
554    /// if ctx.is_replaying() {
555    ///     println!("Replaying cached operations");
556    /// }
557    /// # }
558    /// ```
559    pub fn is_replaying(&self) -> bool {
560        self.inner.is_replaying()
561    }
562
563    /// Return a reference to the durable execution ARN.
564    ///
565    /// # Examples
566    ///
567    /// ```no_run
568    /// # async fn example(ctx: &durable_lambda_closure::context::ClosureContext) {
569    /// println!("Execution ARN: {}", ctx.arn());
570    /// # }
571    /// ```
572    pub fn arn(&self) -> &str {
573        self.inner.arn()
574    }
575
576    /// Return the current checkpoint token.
577    ///
578    /// # Examples
579    ///
580    /// ```no_run
581    /// # async fn example(ctx: &durable_lambda_closure::context::ClosureContext) {
582    /// let token = ctx.checkpoint_token();
583    /// # }
584    /// ```
585    pub fn checkpoint_token(&self) -> &str {
586        self.inner.checkpoint_token()
587    }
588
589    /// Emit a replay-safe info-level log message.
590    ///
591    /// During execution mode, emits via `tracing::info!` with execution
592    /// context enrichment. During replay mode, the call is a no-op.
593    ///
594    /// # Examples
595    ///
596    /// ```no_run
597    /// # async fn example(ctx: &durable_lambda_closure::context::ClosureContext) {
598    /// ctx.log("Order processing started");
599    /// # }
600    /// ```
601    pub fn log(&self, message: &str) {
602        self.inner.log(message);
603    }
604
605    /// Emit a replay-safe info-level log message with structured data.
606    ///
607    /// # Examples
608    ///
609    /// ```no_run
610    /// # async fn example(ctx: &durable_lambda_closure::context::ClosureContext) {
611    /// ctx.log_with_data("Order processed", &serde_json::json!({"order_id": 42}));
612    /// # }
613    /// ```
614    pub fn log_with_data(&self, message: &str, data: &serde_json::Value) {
615        self.inner.log_with_data(message, data);
616    }
617
618    /// Emit a replay-safe debug-level log message.
619    ///
620    /// # Examples
621    ///
622    /// ```no_run
623    /// # async fn example(ctx: &durable_lambda_closure::context::ClosureContext) {
624    /// ctx.log_debug("Validating order fields");
625    /// # }
626    /// ```
627    pub fn log_debug(&self, message: &str) {
628        self.inner.log_debug(message);
629    }
630
631    /// Emit a replay-safe warn-level log message.
632    ///
633    /// # Examples
634    ///
635    /// ```no_run
636    /// # async fn example(ctx: &durable_lambda_closure::context::ClosureContext) {
637    /// ctx.log_warn("Inventory below threshold");
638    /// # }
639    /// ```
640    pub fn log_warn(&self, message: &str) {
641        self.inner.log_warn(message);
642    }
643
644    /// Emit a replay-safe error-level log message.
645    ///
646    /// # Examples
647    ///
648    /// ```no_run
649    /// # async fn example(ctx: &durable_lambda_closure::context::ClosureContext) {
650    /// ctx.log_error("Payment gateway timeout");
651    /// # }
652    /// ```
653    pub fn log_error(&self, message: &str) {
654        self.inner.log_error(message);
655    }
656
657    /// Emit a replay-safe debug-level log message with structured data.
658    ///
659    /// # Examples
660    ///
661    /// ```no_run
662    /// # async fn example(ctx: &durable_lambda_closure::context::ClosureContext) {
663    /// ctx.log_debug_with_data("Request details", &serde_json::json!({"method": "POST"}));
664    /// # }
665    /// ```
666    pub fn log_debug_with_data(&self, message: &str, data: &serde_json::Value) {
667        self.inner.log_debug_with_data(message, data);
668    }
669
670    /// Emit a replay-safe warn-level log message with structured data.
671    ///
672    /// # Examples
673    ///
674    /// ```no_run
675    /// # async fn example(ctx: &durable_lambda_closure::context::ClosureContext) {
676    /// ctx.log_warn_with_data("Retry attempt", &serde_json::json!({"attempt": 3}));
677    /// # }
678    /// ```
679    pub fn log_warn_with_data(&self, message: &str, data: &serde_json::Value) {
680        self.inner.log_warn_with_data(message, data);
681    }
682
683    /// Emit a replay-safe error-level log message with structured data.
684    ///
685    /// # Examples
686    ///
687    /// ```no_run
688    /// # async fn example(ctx: &durable_lambda_closure::context::ClosureContext) {
689    /// ctx.log_error_with_data("Payment failed", &serde_json::json!({"error": "timeout"}));
690    /// # }
691    /// ```
692    pub fn log_error_with_data(&self, message: &str, data: &serde_json::Value) {
693        self.inner.log_error_with_data(message, data);
694    }
695}
696
697impl DurableContextOps for ClosureContext {
698    fn step<T, E, F, Fut>(
699        &mut self,
700        name: &str,
701        f: F,
702    ) -> impl Future<Output = Result<Result<T, E>, DurableError>> + Send
703    where
704        T: Serialize + DeserializeOwned + Send + 'static,
705        E: Serialize + DeserializeOwned + Send + 'static,
706        F: FnOnce() -> Fut + Send + 'static,
707        Fut: Future<Output = Result<T, E>> + Send + 'static,
708    {
709        self.inner.step(name, f)
710    }
711
712    fn step_with_options<T, E, F, Fut>(
713        &mut self,
714        name: &str,
715        options: StepOptions,
716        f: F,
717    ) -> impl Future<Output = Result<Result<T, E>, DurableError>> + Send
718    where
719        T: Serialize + DeserializeOwned + Send + 'static,
720        E: Serialize + DeserializeOwned + Send + 'static,
721        F: FnOnce() -> Fut + Send + 'static,
722        Fut: Future<Output = Result<T, E>> + Send + 'static,
723    {
724        self.inner.step_with_options(name, options, f)
725    }
726
727    fn wait(
728        &mut self,
729        name: &str,
730        duration_secs: i32,
731    ) -> impl Future<Output = Result<(), DurableError>> + Send {
732        self.inner.wait(name, duration_secs)
733    }
734
735    fn create_callback(
736        &mut self,
737        name: &str,
738        options: CallbackOptions,
739    ) -> impl Future<Output = Result<CallbackHandle, DurableError>> + Send {
740        self.inner.create_callback(name, options)
741    }
742
743    fn invoke<T, P>(
744        &mut self,
745        name: &str,
746        function_name: &str,
747        payload: &P,
748    ) -> impl Future<Output = Result<T, DurableError>> + Send
749    where
750        T: DeserializeOwned + Send,
751        P: Serialize + Sync,
752    {
753        self.inner.invoke(name, function_name, payload)
754    }
755
756    fn parallel<T, F, Fut>(
757        &mut self,
758        name: &str,
759        branches: Vec<F>,
760        options: durable_lambda_core::types::ParallelOptions,
761    ) -> impl Future<Output = Result<BatchResult<T>, DurableError>> + Send
762    where
763        T: Serialize + DeserializeOwned + Send + 'static,
764        F: FnOnce(DurableContext) -> Fut + Send + 'static,
765        Fut: Future<Output = Result<T, DurableError>> + Send + 'static,
766    {
767        self.inner.parallel(name, branches, options)
768    }
769
770    fn child_context<T, F, Fut>(
771        &mut self,
772        name: &str,
773        f: F,
774    ) -> impl Future<Output = Result<T, DurableError>> + Send
775    where
776        T: Serialize + DeserializeOwned + Send,
777        F: FnOnce(DurableContext) -> Fut + Send,
778        Fut: Future<Output = Result<T, DurableError>> + Send,
779    {
780        self.inner.child_context(name, f)
781    }
782
783    fn map<T, I, F, Fut>(
784        &mut self,
785        name: &str,
786        items: Vec<I>,
787        options: MapOptions,
788        f: F,
789    ) -> impl Future<Output = Result<BatchResult<T>, DurableError>> + Send
790    where
791        T: Serialize + DeserializeOwned + Send + 'static,
792        I: Send + 'static,
793        F: FnOnce(I, DurableContext) -> Fut + Send + 'static + Clone,
794        Fut: Future<Output = Result<T, DurableError>> + Send + 'static,
795    {
796        self.inner.map(name, items, options, f)
797    }
798
799    fn step_with_compensation<T, E, F, Fut, G, GFut>(
800        &mut self,
801        name: &str,
802        forward_fn: F,
803        compensate_fn: G,
804    ) -> impl Future<Output = Result<Result<T, E>, DurableError>> + Send
805    where
806        T: Serialize + DeserializeOwned + Send + 'static,
807        E: Serialize + DeserializeOwned + Send + 'static,
808        F: FnOnce() -> Fut + Send + 'static,
809        Fut: Future<Output = Result<T, E>> + Send + 'static,
810        G: FnOnce(T) -> GFut + Send + 'static,
811        GFut: Future<Output = Result<(), DurableError>> + Send + 'static,
812    {
813        self.inner
814            .step_with_compensation(name, forward_fn, compensate_fn)
815    }
816
817    fn step_with_compensation_opts<T, E, F, Fut, G, GFut>(
818        &mut self,
819        name: &str,
820        options: StepOptions,
821        forward_fn: F,
822        compensate_fn: G,
823    ) -> impl Future<Output = Result<Result<T, E>, DurableError>> + Send
824    where
825        T: Serialize + DeserializeOwned + Send + 'static,
826        E: Serialize + DeserializeOwned + Send + 'static,
827        F: FnOnce() -> Fut + Send + 'static,
828        Fut: Future<Output = Result<T, E>> + Send + 'static,
829        G: FnOnce(T) -> GFut + Send + 'static,
830        GFut: Future<Output = Result<(), DurableError>> + Send + 'static,
831    {
832        self.inner
833            .step_with_compensation_opts(name, options, forward_fn, compensate_fn)
834    }
835
836    fn run_compensations(
837        &mut self,
838    ) -> impl Future<Output = Result<CompensationResult, DurableError>> + Send {
839        self.inner.run_compensations()
840    }
841
842    fn callback_result<T: DeserializeOwned>(
843        &self,
844        handle: &CallbackHandle,
845    ) -> Result<T, DurableError> {
846        self.inner.callback_result(handle)
847    }
848
849    fn execution_mode(&self) -> ExecutionMode {
850        self.inner.execution_mode()
851    }
852
853    fn is_replaying(&self) -> bool {
854        self.inner.is_replaying()
855    }
856
857    fn arn(&self) -> &str {
858        self.inner.arn()
859    }
860
861    fn checkpoint_token(&self) -> &str {
862        self.inner.checkpoint_token()
863    }
864
865    fn log(&self, message: &str) {
866        self.inner.log(message);
867    }
868
869    fn log_with_data(&self, message: &str, data: &serde_json::Value) {
870        self.inner.log_with_data(message, data);
871    }
872
873    fn log_debug(&self, message: &str) {
874        self.inner.log_debug(message);
875    }
876
877    fn log_warn(&self, message: &str) {
878        self.inner.log_warn(message);
879    }
880
881    fn log_error(&self, message: &str) {
882        self.inner.log_error(message);
883    }
884
885    fn log_debug_with_data(&self, message: &str, data: &serde_json::Value) {
886        self.inner.log_debug_with_data(message, data);
887    }
888
889    fn log_warn_with_data(&self, message: &str, data: &serde_json::Value) {
890        self.inner.log_warn_with_data(message, data);
891    }
892
893    fn log_error_with_data(&self, message: &str, data: &serde_json::Value) {
894        self.inner.log_error_with_data(message, data);
895    }
896
897    fn enable_batch_mode(&mut self) {
898        self.inner.enable_batch_mode();
899    }
900
901    fn flush_batch(&mut self) -> impl Future<Output = Result<(), DurableError>> + Send {
902        self.inner.flush_batch()
903    }
904}
905
906#[cfg(test)]
907mod tests {
908    use super::*;
909    use aws_sdk_lambda::operation::checkpoint_durable_execution::CheckpointDurableExecutionOutput;
910    use aws_sdk_lambda::operation::get_durable_execution_state::GetDurableExecutionStateOutput;
911    use aws_sdk_lambda::types::{
912        Operation, OperationStatus, OperationType, OperationUpdate, StepDetails,
913    };
914    use std::collections::HashMap;
915    use std::sync::{Arc, Mutex};
916
917    use durable_lambda_core::backend::DurableBackend;
918
919    /// Mock backend for testing ClosureContext delegation.
920    struct MockBackend {
921        operations: HashMap<String, Operation>,
922        checkpoint_token: Mutex<String>,
923    }
924
925    impl MockBackend {
926        fn new() -> Self {
927            Self {
928                operations: HashMap::new(),
929                checkpoint_token: Mutex::new("mock-token".to_string()),
930            }
931        }
932
933        fn with_operation(mut self, id: &str, operation: Operation) -> Self {
934            self.operations.insert(id.to_string(), operation);
935            self
936        }
937    }
938
939    #[async_trait::async_trait]
940    impl DurableBackend for MockBackend {
941        async fn checkpoint(
942            &self,
943            _arn: &str,
944            _checkpoint_token: &str,
945            _updates: Vec<OperationUpdate>,
946            _client_token: Option<&str>,
947        ) -> Result<CheckpointDurableExecutionOutput, DurableError> {
948            let token = self.checkpoint_token.lock().unwrap().clone();
949            Ok(CheckpointDurableExecutionOutput::builder()
950                .checkpoint_token(token)
951                .build())
952        }
953
954        async fn get_execution_state(
955            &self,
956            _arn: &str,
957            _checkpoint_token: &str,
958            _next_marker: &str,
959            _max_items: i32,
960        ) -> Result<GetDurableExecutionStateOutput, DurableError> {
961            Ok(GetDurableExecutionStateOutput::builder()
962                .build()
963                .expect("test: empty execution state"))
964        }
965    }
966
967    async fn make_closure_context(backend: MockBackend) -> ClosureContext {
968        let ctx = DurableContext::new(
969            Arc::new(backend),
970            "arn:test".to_string(),
971            "tok".to_string(),
972            vec![],
973            None,
974        )
975        .await
976        .unwrap();
977        ClosureContext::new(ctx)
978    }
979
980    fn make_succeeded_op(id: &str, result_json: &str) -> Operation {
981        Operation::builder()
982            .id(id)
983            .r#type(OperationType::Step)
984            .status(OperationStatus::Succeeded)
985            .start_timestamp(aws_smithy_types::DateTime::from_secs(0))
986            .step_details(StepDetails::builder().result(result_json).build())
987            .build()
988            .unwrap_or_else(|e| panic!("failed to build test Operation: {e}"))
989    }
990
991    #[tokio::test]
992    async fn test_closure_context_step_delegates_to_core() {
993        let mut ctx = make_closure_context(MockBackend::new()).await;
994
995        // In executing mode, step should execute the closure and checkpoint
996        let result: Result<i32, String> = ctx.step("validate", || async { Ok(42) }).await.unwrap();
997
998        assert_eq!(result, Ok(42));
999    }
1000
1001    #[tokio::test]
1002    async fn test_closure_context_step_with_options_delegates_to_core() {
1003        let mut ctx = make_closure_context(MockBackend::new()).await;
1004
1005        let result: Result<i32, String> = ctx
1006            .step_with_options("charge", StepOptions::new().retries(3), || async {
1007                Ok(100)
1008            })
1009            .await
1010            .unwrap();
1011
1012        assert_eq!(result, Ok(100));
1013    }
1014
1015    #[tokio::test]
1016    async fn test_closure_context_execution_mode_executing() {
1017        let ctx = make_closure_context(MockBackend::new()).await;
1018        assert_eq!(ctx.execution_mode(), ExecutionMode::Executing);
1019        assert!(!ctx.is_replaying());
1020    }
1021
1022    #[tokio::test]
1023    async fn test_closure_context_execution_mode_replaying() {
1024        // Create a context with a completed operation to trigger replay mode.
1025        let op = make_succeeded_op("op-1", "42");
1026        let backend = MockBackend::new().with_operation("op-1", op.clone());
1027
1028        let durable_ctx = DurableContext::new(
1029            Arc::new(backend),
1030            "arn:test".to_string(),
1031            "tok".to_string(),
1032            vec![op],
1033            None,
1034        )
1035        .await
1036        .unwrap();
1037
1038        let ctx = ClosureContext::new(durable_ctx);
1039        assert_eq!(ctx.execution_mode(), ExecutionMode::Replaying);
1040        assert!(ctx.is_replaying());
1041    }
1042
1043    #[tokio::test]
1044    async fn test_closure_context_arn() {
1045        let ctx = make_closure_context(MockBackend::new()).await;
1046        assert_eq!(ctx.arn(), "arn:test");
1047    }
1048
1049    #[tokio::test]
1050    async fn test_closure_context_checkpoint_token() {
1051        let ctx = make_closure_context(MockBackend::new()).await;
1052        assert_eq!(ctx.checkpoint_token(), "tok");
1053    }
1054
1055    #[tokio::test]
1056    async fn test_closure_context_child_context_delegates_to_core() {
1057        let mut ctx = make_closure_context(MockBackend::new()).await;
1058
1059        let result: i32 = ctx
1060            .child_context("sub_workflow", |mut child_ctx: DurableContext| async move {
1061                let r: Result<i32, String> =
1062                    child_ctx.step("inner_step", || async { Ok(42) }).await?;
1063                Ok(r.unwrap())
1064            })
1065            .await
1066            .unwrap();
1067
1068        assert_eq!(result, 42);
1069    }
1070
1071    #[tokio::test]
1072    async fn test_closure_context_log_delegates_to_core() {
1073        let ctx = make_closure_context(MockBackend::new()).await;
1074        // Verify log methods are callable and don't panic.
1075        // In executing mode, these delegate to DurableContext which emits tracing events.
1076        ctx.log("test message");
1077        ctx.log_with_data("test data", &serde_json::json!({"key": "val"}));
1078        ctx.log_debug("test debug");
1079        ctx.log_warn("test warn");
1080        ctx.log_error("test error");
1081        ctx.log_debug_with_data("debug data", &serde_json::json!({"k": "v"}));
1082        ctx.log_warn_with_data("warn data", &serde_json::json!({"k": "v"}));
1083        ctx.log_error_with_data("error data", &serde_json::json!({"k": "v"}));
1084    }
1085}