durable_lambda_closure/context.rs
1//! Closure-specific context wrapper.
2//!
3//! Thin wrapper over [`DurableContext`](durable_lambda_core::DurableContext)
4//! providing closure-approach ergonomics. All operations delegate directly
5//! to the inner context with no additional logic.
6
7use std::future::Future;
8
9use durable_lambda_core::context::DurableContext;
10use durable_lambda_core::error::DurableError;
11use durable_lambda_core::ops_trait::DurableContextOps;
12use durable_lambda_core::types::{
13 BatchResult, CallbackHandle, CallbackOptions, CompensationResult, ExecutionMode, MapOptions,
14 ParallelOptions, StepOptions,
15};
16use serde::de::DeserializeOwned;
17use serde::Serialize;
18
19/// Closure-native context for durable Lambda operations.
20///
21/// Thin wrapper over [`DurableContext`] providing the closure-approach API.
22/// All operations delegate directly to the inner context — no replay logic,
23/// no checkpoint logic, just delegation.
24///
25/// Constructed internally by [`run`](crate::run) — users never create this
26/// directly.
27///
28/// # Examples
29///
30/// ```no_run
31/// use durable_lambda_closure::prelude::*;
32///
33/// async fn handler(event: serde_json::Value, mut ctx: ClosureContext) -> Result<serde_json::Value, DurableError> {
34/// let result: Result<i32, String> = ctx.step("validate", || async {
35/// Ok(42)
36/// }).await?;
37/// Ok(serde_json::json!({"validated": result.unwrap()}))
38/// }
39/// ```
40pub struct ClosureContext {
41 inner: DurableContext,
42}
43
44impl ClosureContext {
45 /// Create from an existing `DurableContext`.
46 ///
47 /// This constructor is `pub(crate)` — only the [`run`](crate::run)
48 /// function creates `ClosureContext` instances.
49 pub(crate) fn new(ctx: DurableContext) -> Self {
50 Self { inner: ctx }
51 }
52
53 /// Execute a named step with checkpointing.
54 ///
55 /// During execution mode, runs the closure and checkpoints the result to AWS.
56 /// During replay mode, returns the previously checkpointed result without
57 /// executing the closure.
58 ///
59 /// # Arguments
60 ///
61 /// * `name` — Human-readable step name, used as checkpoint metadata
62 /// * `f` — Closure to execute (skipped during replay)
63 ///
64 /// # Errors
65 ///
66 /// Returns [`DurableError::Serialization`] if the result cannot be serialized.
67 /// Returns [`DurableError::Deserialization`] if a cached result cannot be deserialized.
68 /// Returns [`DurableError::CheckpointFailed`] if the AWS checkpoint API call fails.
69 ///
70 /// # Examples
71 ///
72 /// ```no_run
73 /// # async fn example(ctx: &mut durable_lambda_closure::context::ClosureContext) -> Result<(), durable_lambda_core::error::DurableError> {
74 /// let result: Result<i32, String> = ctx.step("validate_order", || async {
75 /// Ok(42)
76 /// }).await?;
77 ///
78 /// match result {
79 /// Ok(value) => println!("Step succeeded: {value}"),
80 /// Err(e) => println!("Step failed: {e}"),
81 /// }
82 /// # Ok(())
83 /// # }
84 /// ```
85 pub async fn step<T, E, F, Fut>(
86 &mut self,
87 name: &str,
88 f: F,
89 ) -> Result<Result<T, E>, DurableError>
90 where
91 T: Serialize + DeserializeOwned + Send + 'static,
92 E: Serialize + DeserializeOwned + Send + 'static,
93 F: FnOnce() -> Fut + Send + 'static,
94 Fut: Future<Output = Result<T, E>> + Send + 'static,
95 {
96 self.inner.step(name, f).await
97 }
98
99 /// Execute a named step with checkpointing and retry configuration.
100 ///
101 /// If the closure fails and retries are configured, sends a RETRY checkpoint
102 /// and returns [`DurableError::StepRetryScheduled`] to signal the function
103 /// should exit.
104 ///
105 /// # Arguments
106 ///
107 /// * `name` — Human-readable step name, used as checkpoint metadata
108 /// * `options` — Retry configuration (see [`StepOptions`])
109 /// * `f` — Closure to execute (skipped during replay)
110 ///
111 /// # Errors
112 ///
113 /// Returns [`DurableError::StepRetryScheduled`] when a retry has been scheduled.
114 /// Returns [`DurableError::Serialization`] if the result cannot be serialized.
115 /// Returns [`DurableError::Deserialization`] if a cached result cannot be deserialized.
116 ///
117 /// # Examples
118 ///
119 /// ```no_run
120 /// # async fn example(ctx: &mut durable_lambda_closure::context::ClosureContext) -> Result<(), durable_lambda_core::error::DurableError> {
121 /// use durable_lambda_closure::prelude::*;
122 ///
123 /// let result: Result<i32, String> = ctx.step_with_options(
124 /// "charge_payment",
125 /// StepOptions::new().retries(3).backoff_seconds(5),
126 /// || async { Ok(100) },
127 /// ).await?;
128 /// # Ok(())
129 /// # }
130 /// ```
131 pub async fn step_with_options<T, E, F, Fut>(
132 &mut self,
133 name: &str,
134 options: StepOptions,
135 f: F,
136 ) -> Result<Result<T, E>, DurableError>
137 where
138 T: Serialize + DeserializeOwned + Send + 'static,
139 E: Serialize + DeserializeOwned + Send + 'static,
140 F: FnOnce() -> Fut + Send + 'static,
141 Fut: Future<Output = Result<T, E>> + Send + 'static,
142 {
143 self.inner.step_with_options(name, options, f).await
144 }
145
146 /// Suspend execution for the specified duration.
147 ///
148 /// During execution mode, sends a START checkpoint and returns
149 /// [`DurableError::WaitSuspended`] to signal the function should exit.
150 /// The server re-invokes after the duration.
151 ///
152 /// During replay mode, returns `Ok(())` immediately if the wait has
153 /// already completed.
154 ///
155 /// # Arguments
156 ///
157 /// * `name` — Human-readable name for the wait operation
158 /// * `duration_secs` — Duration to wait in seconds
159 ///
160 /// # Errors
161 ///
162 /// Returns [`DurableError::WaitSuspended`] when the wait has been checkpointed.
163 ///
164 /// # Examples
165 ///
166 /// ```no_run
167 /// # async fn example(ctx: &mut durable_lambda_closure::context::ClosureContext) -> Result<(), durable_lambda_core::error::DurableError> {
168 /// ctx.wait("cooldown", 30).await?;
169 /// println!("Wait completed!");
170 /// # Ok(())
171 /// # }
172 /// ```
173 pub async fn wait(&mut self, name: &str, duration_secs: i32) -> Result<(), DurableError> {
174 self.inner.wait(name, duration_secs).await
175 }
176
177 /// Register a callback and return a handle with the server-generated callback ID.
178 ///
179 /// During execution mode, sends a START checkpoint and returns a
180 /// [`CallbackHandle`] containing the `callback_id` for external systems.
181 /// During replay mode, extracts the cached callback_id from history.
182 ///
183 /// This method NEVER suspends. Use [`callback_result`](Self::callback_result)
184 /// to check the callback outcome (which suspends if not yet signaled).
185 ///
186 /// # Arguments
187 ///
188 /// * `name` — Human-readable name for the callback operation
189 /// * `options` — Timeout configuration (see [`CallbackOptions`])
190 ///
191 /// # Errors
192 ///
193 /// Returns [`DurableError::CheckpointFailed`] if the AWS checkpoint API call fails.
194 ///
195 /// # Examples
196 ///
197 /// ```no_run
198 /// # async fn example(ctx: &mut durable_lambda_closure::context::ClosureContext) -> Result<(), durable_lambda_core::error::DurableError> {
199 /// use durable_lambda_closure::prelude::*;
200 ///
201 /// let handle = ctx.create_callback("approval", CallbackOptions::new()).await?;
202 /// println!("Callback ID: {}", handle.callback_id);
203 /// # Ok(())
204 /// # }
205 /// ```
206 pub async fn create_callback(
207 &mut self,
208 name: &str,
209 options: CallbackOptions,
210 ) -> Result<CallbackHandle, DurableError> {
211 self.inner.create_callback(name, options).await
212 }
213
214 /// Check the result of a previously created callback.
215 ///
216 /// Return the deserialized success payload if the callback has been
217 /// signaled. Return an error if the callback failed, timed out, or
218 /// hasn't been signaled yet.
219 ///
220 /// # Arguments
221 ///
222 /// * `handle` — The [`CallbackHandle`] returned by [`create_callback`](Self::create_callback)
223 ///
224 /// # Errors
225 ///
226 /// Returns [`DurableError::CallbackSuspended`] if not yet signaled.
227 /// Returns [`DurableError::CallbackFailed`] if the callback failed or timed out.
228 ///
229 /// # Examples
230 ///
231 /// ```no_run
232 /// # async fn example(ctx: &mut durable_lambda_closure::context::ClosureContext) -> Result<(), durable_lambda_core::error::DurableError> {
233 /// use durable_lambda_closure::prelude::*;
234 ///
235 /// let handle = ctx.create_callback("approval", CallbackOptions::new()).await?;
236 /// let result: String = ctx.callback_result(&handle)?;
237 /// # Ok(())
238 /// # }
239 /// ```
240 pub fn callback_result<T: DeserializeOwned>(
241 &self,
242 handle: &CallbackHandle,
243 ) -> Result<T, DurableError> {
244 self.inner.callback_result(handle)
245 }
246
247 /// Durably invoke another Lambda function and return its result.
248 ///
249 /// During execution mode, serializes the payload, sends a START checkpoint,
250 /// and returns [`DurableError::InvokeSuspended`] to signal exit. The server
251 /// invokes the target asynchronously and re-invokes this Lambda when done.
252 ///
253 /// During replay, returns the cached result without re-invoking.
254 ///
255 /// # Arguments
256 ///
257 /// * `name` — Human-readable name for the invoke operation
258 /// * `function_name` — Name or ARN of the target Lambda function
259 /// * `payload` — Input payload to send to the target function
260 ///
261 /// # Errors
262 ///
263 /// Returns [`DurableError::InvokeSuspended`] when the target is still executing.
264 /// Returns [`DurableError::InvokeFailed`] if the target failed or timed out.
265 ///
266 /// # Examples
267 ///
268 /// ```no_run
269 /// # async fn example(ctx: &mut durable_lambda_closure::context::ClosureContext) -> Result<(), durable_lambda_core::error::DurableError> {
270 /// let result: String = ctx.invoke(
271 /// "call_processor",
272 /// "payment-processor-lambda",
273 /// &serde_json::json!({"order_id": 123}),
274 /// ).await?;
275 /// # Ok(())
276 /// # }
277 /// ```
278 pub async fn invoke<T, P>(
279 &mut self,
280 name: &str,
281 function_name: &str,
282 payload: &P,
283 ) -> Result<T, DurableError>
284 where
285 T: DeserializeOwned,
286 P: Serialize,
287 {
288 self.inner.invoke(name, function_name, payload).await
289 }
290
291 /// Execute multiple branches concurrently and return their results.
292 ///
293 /// Each branch receives an owned child context with an isolated checkpoint
294 /// namespace. Branches satisfy `Send + 'static` via `tokio::spawn`.
295 ///
296 /// # Arguments
297 ///
298 /// * `name` — Human-readable name for the parallel operation
299 /// * `branches` — Collection of branch closures
300 /// * `options` — Parallel configuration
301 ///
302 /// # Errors
303 ///
304 /// Returns [`DurableError::ParallelFailed`] if the operation fails.
305 ///
306 /// # Examples
307 ///
308 /// ```no_run
309 /// # async fn example(ctx: &mut durable_lambda_closure::context::ClosureContext) -> Result<(), durable_lambda_core::error::DurableError> {
310 /// use durable_lambda_closure::prelude::*;
311 /// use durable_lambda_core::context::DurableContext;
312 /// use std::pin::Pin;
313 /// use std::future::Future;
314 ///
315 /// type BranchFn = Box<dyn FnOnce(DurableContext) -> Pin<Box<dyn Future<Output = Result<i32, DurableError>> + Send>> + Send>;
316 ///
317 /// let branches: Vec<BranchFn> = vec![
318 /// Box::new(|_ctx| Box::pin(async move { Ok(1) })),
319 /// Box::new(|_ctx| Box::pin(async move { Ok(2) })),
320 /// ];
321 /// let result = ctx.parallel("fan_out", branches, ParallelOptions::new()).await?;
322 /// # Ok(())
323 /// # }
324 /// ```
325 pub async fn parallel<T, F, Fut>(
326 &mut self,
327 name: &str,
328 branches: Vec<F>,
329 options: ParallelOptions,
330 ) -> Result<BatchResult<T>, DurableError>
331 where
332 T: Serialize + DeserializeOwned + Send + 'static,
333 F: FnOnce(DurableContext) -> Fut + Send + 'static,
334 Fut: Future<Output = Result<T, DurableError>> + Send + 'static,
335 {
336 self.inner.parallel(name, branches, options).await
337 }
338
339 /// Execute an isolated subflow with its own checkpoint namespace.
340 ///
341 /// The closure receives an owned child [`DurableContext`] whose operations
342 /// are namespaced under this child context's operation ID, preventing
343 /// collisions with the parent or sibling child contexts.
344 ///
345 /// During replay mode, returns the cached result without re-executing
346 /// the closure.
347 ///
348 /// # Arguments
349 ///
350 /// * `name` — Human-readable name for the child context operation
351 /// * `f` — Closure receiving an owned `DurableContext` for the subflow
352 ///
353 /// # Errors
354 ///
355 /// Returns [`DurableError::ChildContextFailed`] if the child context
356 /// is found in a failed state during replay.
357 /// Returns [`DurableError::CheckpointFailed`] if checkpoint API calls fail.
358 ///
359 /// # Examples
360 ///
361 /// ```no_run
362 /// # async fn example(ctx: &mut durable_lambda_closure::context::ClosureContext) -> Result<(), durable_lambda_core::error::DurableError> {
363 /// use durable_lambda_core::context::DurableContext;
364 ///
365 /// let result: i32 = ctx.child_context("sub_workflow", |mut child_ctx: DurableContext| async move {
366 /// let r: Result<i32, String> = child_ctx.step("inner_step", || async { Ok(42) }).await?;
367 /// Ok(r.unwrap())
368 /// }).await?;
369 /// assert_eq!(result, 42);
370 /// # Ok(())
371 /// # }
372 /// ```
373 pub async fn child_context<T, F, Fut>(&mut self, name: &str, f: F) -> Result<T, DurableError>
374 where
375 T: Serialize + DeserializeOwned + Send,
376 F: FnOnce(DurableContext) -> Fut + Send,
377 Fut: Future<Output = Result<T, DurableError>> + Send,
378 {
379 self.inner.child_context(name, f).await
380 }
381
382 /// Process a collection of items in parallel and return their results.
383 ///
384 /// Apply the closure `f` to each item concurrently. Each item receives an
385 /// owned child context with an isolated checkpoint namespace. Items satisfy
386 /// `Send + 'static` via `tokio::spawn`. The closure must be `Clone` since
387 /// it is applied to each item independently.
388 ///
389 /// When `batch_size` is configured, items process in sequential batches.
390 ///
391 /// # Arguments
392 ///
393 /// * `name` — Human-readable name for the map operation
394 /// * `items` — Collection of items to process
395 /// * `options` — Map configuration (batching)
396 /// * `f` — Closure applied to each item with an owned child context
397 ///
398 /// # Errors
399 ///
400 /// Returns [`DurableError::MapFailed`] if the operation fails.
401 ///
402 /// # Examples
403 ///
404 /// ```no_run
405 /// # async fn example(ctx: &mut durable_lambda_closure::context::ClosureContext) -> Result<(), durable_lambda_core::error::DurableError> {
406 /// use durable_lambda_closure::prelude::*;
407 /// use durable_lambda_core::context::DurableContext;
408 ///
409 /// let items = vec![1, 2, 3];
410 /// let result = ctx.map(
411 /// "process_items",
412 /// items,
413 /// MapOptions::new().batch_size(2),
414 /// |item: i32, mut child_ctx: DurableContext| async move {
415 /// let r: Result<i32, String> = child_ctx.step("double", move || async move { Ok(item * 2) }).await?;
416 /// Ok(r.unwrap())
417 /// },
418 /// ).await?;
419 /// # Ok(())
420 /// # }
421 /// ```
422 pub async fn map<T, I, F, Fut>(
423 &mut self,
424 name: &str,
425 items: Vec<I>,
426 options: MapOptions,
427 f: F,
428 ) -> Result<BatchResult<T>, DurableError>
429 where
430 T: Serialize + DeserializeOwned + Send + 'static,
431 I: Send + 'static,
432 F: FnOnce(I, DurableContext) -> Fut + Send + 'static + Clone,
433 Fut: Future<Output = Result<T, DurableError>> + Send + 'static,
434 {
435 self.inner.map(name, items, options, f).await
436 }
437
438 /// Register a compensatable step.
439 ///
440 /// Executes the forward step and, on success, registers the compensation
441 /// closure for later rollback via [`run_compensations`](Self::run_compensations).
442 ///
443 /// # Examples
444 ///
445 /// ```no_run
446 /// # async fn example(ctx: &mut durable_lambda_closure::context::ClosureContext) -> Result<(), durable_lambda_core::error::DurableError> {
447 /// let result: Result<i32, String> = ctx.step_with_compensation(
448 /// "charge",
449 /// || async { Ok(100) },
450 /// |amount| async move { println!("Refunding {amount}"); Ok(()) },
451 /// ).await?;
452 /// # Ok(())
453 /// # }
454 /// ```
455 pub async fn step_with_compensation<T, E, F, Fut, G, GFut>(
456 &mut self,
457 name: &str,
458 forward_fn: F,
459 compensate_fn: G,
460 ) -> Result<Result<T, E>, DurableError>
461 where
462 T: Serialize + DeserializeOwned + Send + 'static,
463 E: Serialize + DeserializeOwned + Send + 'static,
464 F: FnOnce() -> Fut + Send + 'static,
465 Fut: Future<Output = Result<T, E>> + Send + 'static,
466 G: FnOnce(T) -> GFut + Send + 'static,
467 GFut: Future<Output = Result<(), DurableError>> + Send + 'static,
468 {
469 self.inner
470 .step_with_compensation(name, forward_fn, compensate_fn)
471 .await
472 }
473
474 /// Register a compensatable step with options.
475 ///
476 /// Like [`step_with_compensation`](Self::step_with_compensation) but accepts
477 /// [`StepOptions`] for configuring retries, backoff, and timeouts on the forward step.
478 ///
479 /// # Examples
480 ///
481 /// ```no_run
482 /// # async fn example(ctx: &mut durable_lambda_closure::context::ClosureContext) -> Result<(), durable_lambda_core::error::DurableError> {
483 /// use durable_lambda_closure::prelude::*;
484 ///
485 /// let result: Result<String, String> = ctx.step_with_compensation_opts(
486 /// "book_hotel",
487 /// StepOptions::new().retries(3),
488 /// || async { Ok("BOOKING-123".to_string()) },
489 /// |booking_id| async move { println!("Cancelling: {booking_id}"); Ok(()) },
490 /// ).await?;
491 /// # Ok(())
492 /// # }
493 /// ```
494 pub async fn step_with_compensation_opts<T, E, F, Fut, G, GFut>(
495 &mut self,
496 name: &str,
497 options: StepOptions,
498 forward_fn: F,
499 compensate_fn: G,
500 ) -> Result<Result<T, E>, DurableError>
501 where
502 T: Serialize + DeserializeOwned + Send + 'static,
503 E: Serialize + DeserializeOwned + Send + 'static,
504 F: FnOnce() -> Fut + Send + 'static,
505 Fut: Future<Output = Result<T, E>> + Send + 'static,
506 G: FnOnce(T) -> GFut + Send + 'static,
507 GFut: Future<Output = Result<(), DurableError>> + Send + 'static,
508 {
509 self.inner
510 .step_with_compensation_opts(name, options, forward_fn, compensate_fn)
511 .await
512 }
513
514 /// Execute all registered compensations in reverse registration order.
515 ///
516 /// # Examples
517 ///
518 /// ```no_run
519 /// # async fn example(ctx: &mut durable_lambda_closure::context::ClosureContext) -> Result<(), durable_lambda_core::error::DurableError> {
520 /// let result = ctx.run_compensations().await?;
521 /// if !result.all_succeeded {
522 /// eprintln!("Some compensations failed");
523 /// }
524 /// # Ok(())
525 /// # }
526 /// ```
527 pub async fn run_compensations(&mut self) -> Result<CompensationResult, DurableError> {
528 self.inner.run_compensations().await
529 }
530
531 /// Return the current execution mode (Replaying or Executing).
532 ///
533 /// # Examples
534 ///
535 /// ```no_run
536 /// # async fn example(ctx: &durable_lambda_closure::context::ClosureContext) {
537 /// use durable_lambda_core::types::ExecutionMode;
538 /// match ctx.execution_mode() {
539 /// ExecutionMode::Replaying => { /* returning cached results */ }
540 /// ExecutionMode::Executing => { /* running new operations */ }
541 /// }
542 /// # }
543 /// ```
544 pub fn execution_mode(&self) -> ExecutionMode {
545 self.inner.execution_mode()
546 }
547
548 /// Return whether the context is currently replaying from history.
549 ///
550 /// # Examples
551 ///
552 /// ```no_run
553 /// # async fn example(ctx: &durable_lambda_closure::context::ClosureContext) {
554 /// if ctx.is_replaying() {
555 /// println!("Replaying cached operations");
556 /// }
557 /// # }
558 /// ```
559 pub fn is_replaying(&self) -> bool {
560 self.inner.is_replaying()
561 }
562
563 /// Return a reference to the durable execution ARN.
564 ///
565 /// # Examples
566 ///
567 /// ```no_run
568 /// # async fn example(ctx: &durable_lambda_closure::context::ClosureContext) {
569 /// println!("Execution ARN: {}", ctx.arn());
570 /// # }
571 /// ```
572 pub fn arn(&self) -> &str {
573 self.inner.arn()
574 }
575
576 /// Return the current checkpoint token.
577 ///
578 /// # Examples
579 ///
580 /// ```no_run
581 /// # async fn example(ctx: &durable_lambda_closure::context::ClosureContext) {
582 /// let token = ctx.checkpoint_token();
583 /// # }
584 /// ```
585 pub fn checkpoint_token(&self) -> &str {
586 self.inner.checkpoint_token()
587 }
588
589 /// Emit a replay-safe info-level log message.
590 ///
591 /// During execution mode, emits via `tracing::info!` with execution
592 /// context enrichment. During replay mode, the call is a no-op.
593 ///
594 /// # Examples
595 ///
596 /// ```no_run
597 /// # async fn example(ctx: &durable_lambda_closure::context::ClosureContext) {
598 /// ctx.log("Order processing started");
599 /// # }
600 /// ```
601 pub fn log(&self, message: &str) {
602 self.inner.log(message);
603 }
604
605 /// Emit a replay-safe info-level log message with structured data.
606 ///
607 /// # Examples
608 ///
609 /// ```no_run
610 /// # async fn example(ctx: &durable_lambda_closure::context::ClosureContext) {
611 /// ctx.log_with_data("Order processed", &serde_json::json!({"order_id": 42}));
612 /// # }
613 /// ```
614 pub fn log_with_data(&self, message: &str, data: &serde_json::Value) {
615 self.inner.log_with_data(message, data);
616 }
617
618 /// Emit a replay-safe debug-level log message.
619 ///
620 /// # Examples
621 ///
622 /// ```no_run
623 /// # async fn example(ctx: &durable_lambda_closure::context::ClosureContext) {
624 /// ctx.log_debug("Validating order fields");
625 /// # }
626 /// ```
627 pub fn log_debug(&self, message: &str) {
628 self.inner.log_debug(message);
629 }
630
631 /// Emit a replay-safe warn-level log message.
632 ///
633 /// # Examples
634 ///
635 /// ```no_run
636 /// # async fn example(ctx: &durable_lambda_closure::context::ClosureContext) {
637 /// ctx.log_warn("Inventory below threshold");
638 /// # }
639 /// ```
640 pub fn log_warn(&self, message: &str) {
641 self.inner.log_warn(message);
642 }
643
644 /// Emit a replay-safe error-level log message.
645 ///
646 /// # Examples
647 ///
648 /// ```no_run
649 /// # async fn example(ctx: &durable_lambda_closure::context::ClosureContext) {
650 /// ctx.log_error("Payment gateway timeout");
651 /// # }
652 /// ```
653 pub fn log_error(&self, message: &str) {
654 self.inner.log_error(message);
655 }
656
657 /// Emit a replay-safe debug-level log message with structured data.
658 ///
659 /// # Examples
660 ///
661 /// ```no_run
662 /// # async fn example(ctx: &durable_lambda_closure::context::ClosureContext) {
663 /// ctx.log_debug_with_data("Request details", &serde_json::json!({"method": "POST"}));
664 /// # }
665 /// ```
666 pub fn log_debug_with_data(&self, message: &str, data: &serde_json::Value) {
667 self.inner.log_debug_with_data(message, data);
668 }
669
670 /// Emit a replay-safe warn-level log message with structured data.
671 ///
672 /// # Examples
673 ///
674 /// ```no_run
675 /// # async fn example(ctx: &durable_lambda_closure::context::ClosureContext) {
676 /// ctx.log_warn_with_data("Retry attempt", &serde_json::json!({"attempt": 3}));
677 /// # }
678 /// ```
679 pub fn log_warn_with_data(&self, message: &str, data: &serde_json::Value) {
680 self.inner.log_warn_with_data(message, data);
681 }
682
683 /// Emit a replay-safe error-level log message with structured data.
684 ///
685 /// # Examples
686 ///
687 /// ```no_run
688 /// # async fn example(ctx: &durable_lambda_closure::context::ClosureContext) {
689 /// ctx.log_error_with_data("Payment failed", &serde_json::json!({"error": "timeout"}));
690 /// # }
691 /// ```
692 pub fn log_error_with_data(&self, message: &str, data: &serde_json::Value) {
693 self.inner.log_error_with_data(message, data);
694 }
695}
696
697impl DurableContextOps for ClosureContext {
698 fn step<T, E, F, Fut>(
699 &mut self,
700 name: &str,
701 f: F,
702 ) -> impl Future<Output = Result<Result<T, E>, DurableError>> + Send
703 where
704 T: Serialize + DeserializeOwned + Send + 'static,
705 E: Serialize + DeserializeOwned + Send + 'static,
706 F: FnOnce() -> Fut + Send + 'static,
707 Fut: Future<Output = Result<T, E>> + Send + 'static,
708 {
709 self.inner.step(name, f)
710 }
711
712 fn step_with_options<T, E, F, Fut>(
713 &mut self,
714 name: &str,
715 options: StepOptions,
716 f: F,
717 ) -> impl Future<Output = Result<Result<T, E>, DurableError>> + Send
718 where
719 T: Serialize + DeserializeOwned + Send + 'static,
720 E: Serialize + DeserializeOwned + Send + 'static,
721 F: FnOnce() -> Fut + Send + 'static,
722 Fut: Future<Output = Result<T, E>> + Send + 'static,
723 {
724 self.inner.step_with_options(name, options, f)
725 }
726
727 fn wait(
728 &mut self,
729 name: &str,
730 duration_secs: i32,
731 ) -> impl Future<Output = Result<(), DurableError>> + Send {
732 self.inner.wait(name, duration_secs)
733 }
734
735 fn create_callback(
736 &mut self,
737 name: &str,
738 options: CallbackOptions,
739 ) -> impl Future<Output = Result<CallbackHandle, DurableError>> + Send {
740 self.inner.create_callback(name, options)
741 }
742
743 fn invoke<T, P>(
744 &mut self,
745 name: &str,
746 function_name: &str,
747 payload: &P,
748 ) -> impl Future<Output = Result<T, DurableError>> + Send
749 where
750 T: DeserializeOwned + Send,
751 P: Serialize + Sync,
752 {
753 self.inner.invoke(name, function_name, payload)
754 }
755
756 fn parallel<T, F, Fut>(
757 &mut self,
758 name: &str,
759 branches: Vec<F>,
760 options: durable_lambda_core::types::ParallelOptions,
761 ) -> impl Future<Output = Result<BatchResult<T>, DurableError>> + Send
762 where
763 T: Serialize + DeserializeOwned + Send + 'static,
764 F: FnOnce(DurableContext) -> Fut + Send + 'static,
765 Fut: Future<Output = Result<T, DurableError>> + Send + 'static,
766 {
767 self.inner.parallel(name, branches, options)
768 }
769
770 fn child_context<T, F, Fut>(
771 &mut self,
772 name: &str,
773 f: F,
774 ) -> impl Future<Output = Result<T, DurableError>> + Send
775 where
776 T: Serialize + DeserializeOwned + Send,
777 F: FnOnce(DurableContext) -> Fut + Send,
778 Fut: Future<Output = Result<T, DurableError>> + Send,
779 {
780 self.inner.child_context(name, f)
781 }
782
783 fn map<T, I, F, Fut>(
784 &mut self,
785 name: &str,
786 items: Vec<I>,
787 options: MapOptions,
788 f: F,
789 ) -> impl Future<Output = Result<BatchResult<T>, DurableError>> + Send
790 where
791 T: Serialize + DeserializeOwned + Send + 'static,
792 I: Send + 'static,
793 F: FnOnce(I, DurableContext) -> Fut + Send + 'static + Clone,
794 Fut: Future<Output = Result<T, DurableError>> + Send + 'static,
795 {
796 self.inner.map(name, items, options, f)
797 }
798
799 fn step_with_compensation<T, E, F, Fut, G, GFut>(
800 &mut self,
801 name: &str,
802 forward_fn: F,
803 compensate_fn: G,
804 ) -> impl Future<Output = Result<Result<T, E>, DurableError>> + Send
805 where
806 T: Serialize + DeserializeOwned + Send + 'static,
807 E: Serialize + DeserializeOwned + Send + 'static,
808 F: FnOnce() -> Fut + Send + 'static,
809 Fut: Future<Output = Result<T, E>> + Send + 'static,
810 G: FnOnce(T) -> GFut + Send + 'static,
811 GFut: Future<Output = Result<(), DurableError>> + Send + 'static,
812 {
813 self.inner
814 .step_with_compensation(name, forward_fn, compensate_fn)
815 }
816
817 fn step_with_compensation_opts<T, E, F, Fut, G, GFut>(
818 &mut self,
819 name: &str,
820 options: StepOptions,
821 forward_fn: F,
822 compensate_fn: G,
823 ) -> impl Future<Output = Result<Result<T, E>, DurableError>> + Send
824 where
825 T: Serialize + DeserializeOwned + Send + 'static,
826 E: Serialize + DeserializeOwned + Send + 'static,
827 F: FnOnce() -> Fut + Send + 'static,
828 Fut: Future<Output = Result<T, E>> + Send + 'static,
829 G: FnOnce(T) -> GFut + Send + 'static,
830 GFut: Future<Output = Result<(), DurableError>> + Send + 'static,
831 {
832 self.inner
833 .step_with_compensation_opts(name, options, forward_fn, compensate_fn)
834 }
835
836 fn run_compensations(
837 &mut self,
838 ) -> impl Future<Output = Result<CompensationResult, DurableError>> + Send {
839 self.inner.run_compensations()
840 }
841
842 fn callback_result<T: DeserializeOwned>(
843 &self,
844 handle: &CallbackHandle,
845 ) -> Result<T, DurableError> {
846 self.inner.callback_result(handle)
847 }
848
849 fn execution_mode(&self) -> ExecutionMode {
850 self.inner.execution_mode()
851 }
852
853 fn is_replaying(&self) -> bool {
854 self.inner.is_replaying()
855 }
856
857 fn arn(&self) -> &str {
858 self.inner.arn()
859 }
860
861 fn checkpoint_token(&self) -> &str {
862 self.inner.checkpoint_token()
863 }
864
865 fn log(&self, message: &str) {
866 self.inner.log(message);
867 }
868
869 fn log_with_data(&self, message: &str, data: &serde_json::Value) {
870 self.inner.log_with_data(message, data);
871 }
872
873 fn log_debug(&self, message: &str) {
874 self.inner.log_debug(message);
875 }
876
877 fn log_warn(&self, message: &str) {
878 self.inner.log_warn(message);
879 }
880
881 fn log_error(&self, message: &str) {
882 self.inner.log_error(message);
883 }
884
885 fn log_debug_with_data(&self, message: &str, data: &serde_json::Value) {
886 self.inner.log_debug_with_data(message, data);
887 }
888
889 fn log_warn_with_data(&self, message: &str, data: &serde_json::Value) {
890 self.inner.log_warn_with_data(message, data);
891 }
892
893 fn log_error_with_data(&self, message: &str, data: &serde_json::Value) {
894 self.inner.log_error_with_data(message, data);
895 }
896
897 fn enable_batch_mode(&mut self) {
898 self.inner.enable_batch_mode();
899 }
900
901 fn flush_batch(&mut self) -> impl Future<Output = Result<(), DurableError>> + Send {
902 self.inner.flush_batch()
903 }
904}
905
#[cfg(test)]
mod tests {
    use super::*;
    use aws_sdk_lambda::operation::checkpoint_durable_execution::CheckpointDurableExecutionOutput;
    use aws_sdk_lambda::operation::get_durable_execution_state::GetDurableExecutionStateOutput;
    use aws_sdk_lambda::types::{
        Operation, OperationStatus, OperationType, OperationUpdate, StepDetails,
    };
    use std::collections::HashMap;
    use std::sync::{Arc, Mutex};

    use durable_lambda_core::backend::DurableBackend;

    /// In-memory backend used to exercise `ClosureContext` delegation.
    struct MockBackend {
        /// Operations registered through `with_operation`, keyed by ID.
        operations: HashMap<String, Operation>,
        /// Token handed back from every `checkpoint` call.
        checkpoint_token: Mutex<String>,
    }

    impl MockBackend {
        /// Backend with no operations and the token `"mock-token"`.
        fn new() -> Self {
            MockBackend {
                operations: HashMap::new(),
                checkpoint_token: Mutex::new(String::from("mock-token")),
            }
        }

        /// Builder-style helper that records `operation` under `id`.
        fn with_operation(mut self, id: &str, operation: Operation) -> Self {
            self.operations.insert(id.to_owned(), operation);
            self
        }
    }

    #[async_trait::async_trait]
    impl DurableBackend for MockBackend {
        /// Ignore the updates and answer with the stored checkpoint token.
        async fn checkpoint(
            &self,
            _arn: &str,
            _checkpoint_token: &str,
            _updates: Vec<OperationUpdate>,
            _client_token: Option<&str>,
        ) -> Result<CheckpointDurableExecutionOutput, DurableError> {
            let current_token = self.checkpoint_token.lock().unwrap().clone();
            let output = CheckpointDurableExecutionOutput::builder()
                .checkpoint_token(current_token)
                .build();
            Ok(output)
        }

        /// Always answer with an empty execution state.
        async fn get_execution_state(
            &self,
            _arn: &str,
            _checkpoint_token: &str,
            _next_marker: &str,
            _max_items: i32,
        ) -> Result<GetDurableExecutionStateOutput, DurableError> {
            let empty_state = GetDurableExecutionStateOutput::builder()
                .build()
                .expect("test: empty execution state");
            Ok(empty_state)
        }
    }

    /// Build a `ClosureContext` over `backend`, seeded with `history`.
    async fn make_closure_context_with_history(
        backend: MockBackend,
        history: Vec<Operation>,
    ) -> ClosureContext {
        let core = DurableContext::new(
            Arc::new(backend),
            String::from("arn:test"),
            String::from("tok"),
            history,
            None,
        )
        .await
        .unwrap();
        ClosureContext::new(core)
    }

    /// Build a `ClosureContext` over `backend` with an empty history.
    async fn make_closure_context(backend: MockBackend) -> ClosureContext {
        make_closure_context_with_history(backend, Vec::new()).await
    }

    /// Build a succeeded step operation carrying `result_json` as its result.
    fn make_succeeded_op(id: &str, result_json: &str) -> Operation {
        let details = StepDetails::builder().result(result_json).build();
        Operation::builder()
            .id(id)
            .r#type(OperationType::Step)
            .status(OperationStatus::Succeeded)
            .start_timestamp(aws_smithy_types::DateTime::from_secs(0))
            .step_details(details)
            .build()
            .unwrap_or_else(|e| panic!("failed to build test Operation: {e}"))
    }

    #[tokio::test]
    async fn test_closure_context_step_delegates_to_core() {
        let mut closure_ctx = make_closure_context(MockBackend::new()).await;

        // With an empty history the closure itself must run and be checkpointed.
        let outcome: Result<i32, String> = closure_ctx
            .step("validate", || async { Ok(42) })
            .await
            .unwrap();

        assert_eq!(outcome, Ok(42));
    }

    #[tokio::test]
    async fn test_closure_context_step_with_options_delegates_to_core() {
        let mut closure_ctx = make_closure_context(MockBackend::new()).await;
        let retry_options = StepOptions::new().retries(3);

        let outcome: Result<i32, String> = closure_ctx
            .step_with_options("charge", retry_options, || async { Ok(100) })
            .await
            .unwrap();

        assert_eq!(outcome, Ok(100));
    }

    #[tokio::test]
    async fn test_closure_context_execution_mode_executing() {
        let closure_ctx = make_closure_context(MockBackend::new()).await;

        assert!(!closure_ctx.is_replaying());
        assert_eq!(closure_ctx.execution_mode(), ExecutionMode::Executing);
    }

    #[tokio::test]
    async fn test_closure_context_execution_mode_replaying() {
        // A completed operation in the history is what puts the context into replay mode.
        let completed = make_succeeded_op("op-1", "42");
        let backend = MockBackend::new().with_operation("op-1", completed.clone());

        let closure_ctx = make_closure_context_with_history(backend, vec![completed]).await;

        assert!(closure_ctx.is_replaying());
        assert_eq!(closure_ctx.execution_mode(), ExecutionMode::Replaying);
    }

    #[tokio::test]
    async fn test_closure_context_arn() {
        let closure_ctx = make_closure_context(MockBackend::new()).await;
        assert_eq!(closure_ctx.arn(), "arn:test");
    }

    #[tokio::test]
    async fn test_closure_context_checkpoint_token() {
        let closure_ctx = make_closure_context(MockBackend::new()).await;
        assert_eq!(closure_ctx.checkpoint_token(), "tok");
    }

    #[tokio::test]
    async fn test_closure_context_child_context_delegates_to_core() {
        let mut closure_ctx = make_closure_context(MockBackend::new()).await;

        let subflow = |mut child_ctx: DurableContext| async move {
            let inner_outcome: Result<i32, String> =
                child_ctx.step("inner_step", || async { Ok(42) }).await?;
            Ok(inner_outcome.unwrap())
        };
        let value: i32 = closure_ctx
            .child_context("sub_workflow", subflow)
            .await
            .unwrap();

        assert_eq!(value, 42);
    }

    #[tokio::test]
    async fn test_closure_context_log_delegates_to_core() {
        let closure_ctx = make_closure_context(MockBackend::new()).await;
        let payload = serde_json::json!({"k": "v"});

        // Smoke test: each logging method must be callable without panicking.
        closure_ctx.log("test message");
        closure_ctx.log_with_data("test data", &serde_json::json!({"key": "val"}));
        closure_ctx.log_debug("test debug");
        closure_ctx.log_warn("test warn");
        closure_ctx.log_error("test error");
        closure_ctx.log_debug_with_data("debug data", &payload);
        closure_ctx.log_warn_with_data("warn data", &payload);
        closure_ctx.log_error_with_data("error data", &payload);
    }
}
1085}