durable_lambda_builder/context.rs
1//! Builder-specific context wrapper.
2//!
3//! Thin wrapper over [`DurableContext`](durable_lambda_core::DurableContext)
4//! providing builder-approach ergonomics. All operations delegate directly
5//! to the inner context with no additional logic.
6
7use std::future::Future;
8
9use durable_lambda_core::context::DurableContext;
10use durable_lambda_core::error::DurableError;
11use durable_lambda_core::ops_trait::DurableContextOps;
12use durable_lambda_core::types::{
13 BatchResult, CallbackHandle, CallbackOptions, CompensationResult, ExecutionMode, MapOptions,
14 ParallelOptions, StepOptions,
15};
16use serde::de::DeserializeOwned;
17use serde::Serialize;
18
19/// Builder-pattern context for durable Lambda operations.
20///
21/// Thin wrapper over [`DurableContext`] providing the builder-approach API.
22/// All operations delegate directly to the inner context — no replay logic,
23/// no checkpoint logic, just delegation.
24///
25/// Constructed internally by [`DurableHandlerBuilder::run`](crate::DurableHandlerBuilder::run)
26/// — users never create this directly.
27///
28/// # Examples
29///
30/// ```no_run
31/// use durable_lambda_builder::prelude::*;
32///
33/// #[tokio::main]
34/// async fn main() -> Result<(), lambda_runtime::Error> {
35/// durable_lambda_builder::handler(|event: serde_json::Value, mut ctx: BuilderContext| async move {
36/// let result: Result<i32, String> = ctx.step("validate", || async {
37/// Ok(42)
38/// }).await?;
39/// Ok(serde_json::json!({"validated": result.unwrap()}))
40/// })
41/// .run()
42/// .await
43/// }
44/// ```
45pub struct BuilderContext {
46 inner: DurableContext,
47}
48
49impl BuilderContext {
50 /// Create from an existing `DurableContext`.
51 ///
52 /// This constructor is `pub(crate)` — only the
53 /// [`DurableHandlerBuilder::run`](crate::DurableHandlerBuilder::run)
54 /// method creates `BuilderContext` instances.
55 pub(crate) fn new(ctx: DurableContext) -> Self {
56 Self { inner: ctx }
57 }
58
59 /// Execute a named step with checkpointing.
60 ///
61 /// During execution mode, runs the closure and checkpoints the result to AWS.
62 /// During replay mode, returns the previously checkpointed result without
63 /// executing the closure.
64 ///
65 /// # Arguments
66 ///
67 /// * `name` — Human-readable step name, used as checkpoint metadata
68 /// * `f` — Closure to execute (skipped during replay)
69 ///
70 /// # Errors
71 ///
72 /// Returns [`DurableError::Serialization`] if the result cannot be serialized.
73 /// Returns [`DurableError::Deserialization`] if a cached result cannot be deserialized.
74 /// Returns [`DurableError::CheckpointFailed`] if the AWS checkpoint API call fails.
75 ///
76 /// # Examples
77 ///
78 /// ```no_run
79 /// # async fn example(ctx: &mut durable_lambda_builder::context::BuilderContext) -> Result<(), durable_lambda_core::error::DurableError> {
80 /// let result: Result<i32, String> = ctx.step("validate_order", || async {
81 /// Ok(42)
82 /// }).await?;
83 ///
84 /// match result {
85 /// Ok(value) => println!("Step succeeded: {value}"),
86 /// Err(e) => println!("Step failed: {e}"),
87 /// }
88 /// # Ok(())
89 /// # }
90 /// ```
91 pub async fn step<T, E, F, Fut>(
92 &mut self,
93 name: &str,
94 f: F,
95 ) -> Result<Result<T, E>, DurableError>
96 where
97 T: Serialize + DeserializeOwned + Send + 'static,
98 E: Serialize + DeserializeOwned + Send + 'static,
99 F: FnOnce() -> Fut + Send + 'static,
100 Fut: Future<Output = Result<T, E>> + Send + 'static,
101 {
102 self.inner.step(name, f).await
103 }
104
105 /// Execute a named step with checkpointing and retry configuration.
106 ///
107 /// If the closure fails and retries are configured, sends a RETRY checkpoint
108 /// and returns [`DurableError::StepRetryScheduled`] to signal the function
109 /// should exit.
110 ///
111 /// # Arguments
112 ///
113 /// * `name` — Human-readable step name, used as checkpoint metadata
114 /// * `options` — Retry configuration (see [`StepOptions`])
115 /// * `f` — Closure to execute (skipped during replay)
116 ///
117 /// # Errors
118 ///
119 /// Returns [`DurableError::StepRetryScheduled`] when a retry has been scheduled.
120 /// Returns [`DurableError::Serialization`] if the result cannot be serialized.
121 /// Returns [`DurableError::Deserialization`] if a cached result cannot be deserialized.
122 ///
123 /// # Examples
124 ///
125 /// ```no_run
126 /// # async fn example(ctx: &mut durable_lambda_builder::context::BuilderContext) -> Result<(), durable_lambda_core::error::DurableError> {
127 /// use durable_lambda_builder::prelude::*;
128 ///
129 /// let result: Result<i32, String> = ctx.step_with_options(
130 /// "charge_payment",
131 /// StepOptions::new().retries(3).backoff_seconds(5),
132 /// || async { Ok(100) },
133 /// ).await?;
134 /// # Ok(())
135 /// # }
136 /// ```
137 pub async fn step_with_options<T, E, F, Fut>(
138 &mut self,
139 name: &str,
140 options: StepOptions,
141 f: F,
142 ) -> Result<Result<T, E>, DurableError>
143 where
144 T: Serialize + DeserializeOwned + Send + 'static,
145 E: Serialize + DeserializeOwned + Send + 'static,
146 F: FnOnce() -> Fut + Send + 'static,
147 Fut: Future<Output = Result<T, E>> + Send + 'static,
148 {
149 self.inner.step_with_options(name, options, f).await
150 }
151
152 /// Suspend execution for the specified duration.
153 ///
154 /// During execution mode, sends a START checkpoint and returns
155 /// [`DurableError::WaitSuspended`] to signal the function should exit.
156 /// The server re-invokes after the duration.
157 ///
158 /// During replay mode, returns `Ok(())` immediately if the wait has
159 /// already completed.
160 ///
161 /// # Arguments
162 ///
163 /// * `name` — Human-readable name for the wait operation
164 /// * `duration_secs` — Duration to wait in seconds
165 ///
166 /// # Errors
167 ///
168 /// Returns [`DurableError::WaitSuspended`] when the wait has been checkpointed.
169 ///
170 /// # Examples
171 ///
172 /// ```no_run
173 /// # async fn example(ctx: &mut durable_lambda_builder::context::BuilderContext) -> Result<(), durable_lambda_core::error::DurableError> {
174 /// ctx.wait("cooldown", 30).await?;
175 /// println!("Wait completed!");
176 /// # Ok(())
177 /// # }
178 /// ```
179 pub async fn wait(&mut self, name: &str, duration_secs: i32) -> Result<(), DurableError> {
180 self.inner.wait(name, duration_secs).await
181 }
182
183 /// Register a callback and return a handle with the server-generated callback ID.
184 ///
185 /// During execution mode, sends a START checkpoint and returns a
186 /// [`CallbackHandle`] containing the `callback_id` for external systems.
187 /// During replay mode, extracts the cached callback_id from history.
188 ///
189 /// This method NEVER suspends. Use [`callback_result`](Self::callback_result)
190 /// to check the callback outcome (which suspends if not yet signaled).
191 ///
192 /// # Arguments
193 ///
194 /// * `name` — Human-readable name for the callback operation
195 /// * `options` — Timeout configuration (see [`CallbackOptions`])
196 ///
197 /// # Errors
198 ///
199 /// Returns [`DurableError::CheckpointFailed`] if the AWS checkpoint API call fails.
200 ///
201 /// # Examples
202 ///
203 /// ```no_run
204 /// # async fn example(ctx: &mut durable_lambda_builder::context::BuilderContext) -> Result<(), durable_lambda_core::error::DurableError> {
205 /// use durable_lambda_builder::prelude::*;
206 ///
207 /// let handle = ctx.create_callback("approval", CallbackOptions::new()).await?;
208 /// println!("Callback ID: {}", handle.callback_id);
209 /// # Ok(())
210 /// # }
211 /// ```
212 pub async fn create_callback(
213 &mut self,
214 name: &str,
215 options: CallbackOptions,
216 ) -> Result<CallbackHandle, DurableError> {
217 self.inner.create_callback(name, options).await
218 }
219
220 /// Check the result of a previously created callback.
221 ///
222 /// Return the deserialized success payload if the callback has been
223 /// signaled. Return an error if the callback failed, timed out, or
224 /// hasn't been signaled yet.
225 ///
226 /// # Arguments
227 ///
228 /// * `handle` — The [`CallbackHandle`] returned by [`create_callback`](Self::create_callback)
229 ///
230 /// # Errors
231 ///
232 /// Returns [`DurableError::CallbackSuspended`] if not yet signaled.
233 /// Returns [`DurableError::CallbackFailed`] if the callback failed or timed out.
234 ///
235 /// # Examples
236 ///
237 /// ```no_run
238 /// # async fn example(ctx: &mut durable_lambda_builder::context::BuilderContext) -> Result<(), durable_lambda_core::error::DurableError> {
239 /// use durable_lambda_builder::prelude::*;
240 ///
241 /// let handle = ctx.create_callback("approval", CallbackOptions::new()).await?;
242 /// let result: String = ctx.callback_result(&handle)?;
243 /// # Ok(())
244 /// # }
245 /// ```
246 pub fn callback_result<T: DeserializeOwned>(
247 &self,
248 handle: &CallbackHandle,
249 ) -> Result<T, DurableError> {
250 self.inner.callback_result(handle)
251 }
252
253 /// Durably invoke another Lambda function and return its result.
254 ///
255 /// During execution mode, serializes the payload, sends a START checkpoint,
256 /// and returns [`DurableError::InvokeSuspended`] to signal exit. The server
257 /// invokes the target asynchronously and re-invokes this Lambda when done.
258 ///
259 /// During replay, returns the cached result without re-invoking.
260 ///
261 /// # Arguments
262 ///
263 /// * `name` — Human-readable name for the invoke operation
264 /// * `function_name` — Name or ARN of the target Lambda function
265 /// * `payload` — Input payload to send to the target function
266 ///
267 /// # Errors
268 ///
269 /// Returns [`DurableError::InvokeSuspended`] when the target is still executing.
270 /// Returns [`DurableError::InvokeFailed`] if the target failed or timed out.
271 ///
272 /// # Examples
273 ///
274 /// ```no_run
275 /// # async fn example(ctx: &mut durable_lambda_builder::context::BuilderContext) -> Result<(), durable_lambda_core::error::DurableError> {
276 /// let result: String = ctx.invoke(
277 /// "call_processor",
278 /// "payment-processor-lambda",
279 /// &serde_json::json!({"order_id": 123}),
280 /// ).await?;
281 /// # Ok(())
282 /// # }
283 /// ```
284 pub async fn invoke<T, P>(
285 &mut self,
286 name: &str,
287 function_name: &str,
288 payload: &P,
289 ) -> Result<T, DurableError>
290 where
291 T: DeserializeOwned,
292 P: Serialize,
293 {
294 self.inner.invoke(name, function_name, payload).await
295 }
296
297 /// Execute multiple branches concurrently and return their results.
298 ///
299 /// Each branch receives an owned child context with an isolated checkpoint
300 /// namespace. Branches satisfy `Send + 'static` via `tokio::spawn`.
301 ///
302 /// # Arguments
303 ///
304 /// * `name` — Human-readable name for the parallel operation
305 /// * `branches` — Collection of branch closures
306 /// * `options` — Parallel configuration
307 ///
308 /// # Errors
309 ///
310 /// Returns [`DurableError::ParallelFailed`] if the operation fails.
311 ///
312 /// # Examples
313 ///
314 /// ```no_run
315 /// # async fn example(ctx: &mut durable_lambda_builder::context::BuilderContext) -> Result<(), durable_lambda_core::error::DurableError> {
316 /// use durable_lambda_builder::prelude::*;
317 /// use durable_lambda_core::context::DurableContext;
318 /// use std::pin::Pin;
319 /// use std::future::Future;
320 ///
321 /// type BranchFn = Box<dyn FnOnce(DurableContext) -> Pin<Box<dyn Future<Output = Result<i32, DurableError>> + Send>> + Send>;
322 ///
323 /// let branches: Vec<BranchFn> = vec![
324 /// Box::new(|_ctx| Box::pin(async move { Ok(1) })),
325 /// Box::new(|_ctx| Box::pin(async move { Ok(2) })),
326 /// ];
327 /// let result = ctx.parallel("fan_out", branches, ParallelOptions::new()).await?;
328 /// # Ok(())
329 /// # }
330 /// ```
331 pub async fn parallel<T, F, Fut>(
332 &mut self,
333 name: &str,
334 branches: Vec<F>,
335 options: ParallelOptions,
336 ) -> Result<BatchResult<T>, DurableError>
337 where
338 T: Serialize + DeserializeOwned + Send + 'static,
339 F: FnOnce(DurableContext) -> Fut + Send + 'static,
340 Fut: Future<Output = Result<T, DurableError>> + Send + 'static,
341 {
342 self.inner.parallel(name, branches, options).await
343 }
344
345 /// Execute an isolated subflow with its own checkpoint namespace.
346 ///
347 /// The closure receives an owned child [`DurableContext`] whose operations
348 /// are namespaced under this child context's operation ID, preventing
349 /// collisions with the parent or sibling child contexts.
350 ///
351 /// During replay mode, returns the cached result without re-executing
352 /// the closure.
353 ///
354 /// # Arguments
355 ///
356 /// * `name` — Human-readable name for the child context operation
357 /// * `f` — Closure receiving an owned `DurableContext` for the subflow
358 ///
359 /// # Errors
360 ///
361 /// Returns [`DurableError::ChildContextFailed`] if the child context
362 /// is found in a failed state during replay.
363 /// Returns [`DurableError::CheckpointFailed`] if checkpoint API calls fail.
364 ///
365 /// # Examples
366 ///
367 /// ```no_run
368 /// # async fn example(ctx: &mut durable_lambda_builder::context::BuilderContext) -> Result<(), durable_lambda_core::error::DurableError> {
369 /// use durable_lambda_core::context::DurableContext;
370 ///
371 /// let result: i32 = ctx.child_context("sub_workflow", |mut child_ctx: DurableContext| async move {
372 /// let r: Result<i32, String> = child_ctx.step("inner_step", || async { Ok(42) }).await?;
373 /// Ok(r.unwrap())
374 /// }).await?;
375 /// assert_eq!(result, 42);
376 /// # Ok(())
377 /// # }
378 /// ```
379 pub async fn child_context<T, F, Fut>(&mut self, name: &str, f: F) -> Result<T, DurableError>
380 where
381 T: Serialize + DeserializeOwned + Send,
382 F: FnOnce(DurableContext) -> Fut + Send,
383 Fut: Future<Output = Result<T, DurableError>> + Send,
384 {
385 self.inner.child_context(name, f).await
386 }
387
388 /// Process a collection of items in parallel and return their results.
389 ///
390 /// Apply the closure `f` to each item concurrently. Each item receives an
391 /// owned child context with an isolated checkpoint namespace. Items satisfy
392 /// `Send + 'static` via `tokio::spawn`. The closure must be `Clone` since
393 /// it is applied to each item independently.
394 ///
395 /// When `batch_size` is configured, items process in sequential batches.
396 ///
397 /// # Arguments
398 ///
399 /// * `name` — Human-readable name for the map operation
400 /// * `items` — Collection of items to process
401 /// * `options` — Map configuration (batching)
402 /// * `f` — Closure applied to each item with an owned child context
403 ///
404 /// # Errors
405 ///
406 /// Returns [`DurableError::MapFailed`] if the operation fails.
407 ///
408 /// # Examples
409 ///
410 /// ```no_run
411 /// # async fn example(ctx: &mut durable_lambda_builder::context::BuilderContext) -> Result<(), durable_lambda_core::error::DurableError> {
412 /// use durable_lambda_builder::prelude::*;
413 /// use durable_lambda_core::context::DurableContext;
414 ///
415 /// let items = vec![1, 2, 3];
416 /// let result = ctx.map(
417 /// "process_items",
418 /// items,
419 /// MapOptions::new().batch_size(2),
420 /// |item: i32, mut child_ctx: DurableContext| async move {
421 /// let r: Result<i32, String> = child_ctx.step("double", move || async move { Ok(item * 2) }).await?;
422 /// Ok(r.unwrap())
423 /// },
424 /// ).await?;
425 /// # Ok(())
426 /// # }
427 /// ```
428 pub async fn map<T, I, F, Fut>(
429 &mut self,
430 name: &str,
431 items: Vec<I>,
432 options: MapOptions,
433 f: F,
434 ) -> Result<BatchResult<T>, DurableError>
435 where
436 T: Serialize + DeserializeOwned + Send + 'static,
437 I: Send + 'static,
438 F: FnOnce(I, DurableContext) -> Fut + Send + 'static + Clone,
439 Fut: Future<Output = Result<T, DurableError>> + Send + 'static,
440 {
441 self.inner.map(name, items, options, f).await
442 }
443
444 /// Register a compensatable step.
445 ///
446 /// Executes the forward step and, on success, registers the compensation
447 /// closure for later rollback via [`run_compensations`](Self::run_compensations).
448 ///
449 /// # Examples
450 ///
451 /// ```no_run
452 /// # async fn example(ctx: &mut durable_lambda_builder::context::BuilderContext) -> Result<(), durable_lambda_core::error::DurableError> {
453 /// let result: Result<i32, String> = ctx.step_with_compensation(
454 /// "charge",
455 /// || async { Ok(100) },
456 /// |amount| async move { println!("Refunding {amount}"); Ok(()) },
457 /// ).await?;
458 /// # Ok(())
459 /// # }
460 /// ```
461 pub async fn step_with_compensation<T, E, F, Fut, G, GFut>(
462 &mut self,
463 name: &str,
464 forward_fn: F,
465 compensate_fn: G,
466 ) -> Result<Result<T, E>, DurableError>
467 where
468 T: Serialize + DeserializeOwned + Send + 'static,
469 E: Serialize + DeserializeOwned + Send + 'static,
470 F: FnOnce() -> Fut + Send + 'static,
471 Fut: Future<Output = Result<T, E>> + Send + 'static,
472 G: FnOnce(T) -> GFut + Send + 'static,
473 GFut: Future<Output = Result<(), DurableError>> + Send + 'static,
474 {
475 self.inner
476 .step_with_compensation(name, forward_fn, compensate_fn)
477 .await
478 }
479
480 /// Register a compensatable step with options.
481 ///
482 /// Like [`step_with_compensation`](Self::step_with_compensation) but accepts
483 /// [`StepOptions`] for configuring retries, backoff, and timeouts on the forward step.
484 ///
485 /// # Examples
486 ///
487 /// ```no_run
488 /// # async fn example(ctx: &mut durable_lambda_builder::context::BuilderContext) -> Result<(), durable_lambda_core::error::DurableError> {
489 /// use durable_lambda_builder::prelude::*;
490 ///
491 /// let result: Result<String, String> = ctx.step_with_compensation_opts(
492 /// "book_hotel",
493 /// StepOptions::new().retries(3),
494 /// || async { Ok("BOOKING-123".to_string()) },
495 /// |booking_id| async move { println!("Cancelling: {booking_id}"); Ok(()) },
496 /// ).await?;
497 /// # Ok(())
498 /// # }
499 /// ```
500 pub async fn step_with_compensation_opts<T, E, F, Fut, G, GFut>(
501 &mut self,
502 name: &str,
503 options: StepOptions,
504 forward_fn: F,
505 compensate_fn: G,
506 ) -> Result<Result<T, E>, DurableError>
507 where
508 T: Serialize + DeserializeOwned + Send + 'static,
509 E: Serialize + DeserializeOwned + Send + 'static,
510 F: FnOnce() -> Fut + Send + 'static,
511 Fut: Future<Output = Result<T, E>> + Send + 'static,
512 G: FnOnce(T) -> GFut + Send + 'static,
513 GFut: Future<Output = Result<(), DurableError>> + Send + 'static,
514 {
515 self.inner
516 .step_with_compensation_opts(name, options, forward_fn, compensate_fn)
517 .await
518 }
519
520 /// Execute all registered compensations in reverse registration order.
521 ///
522 /// # Examples
523 ///
524 /// ```no_run
525 /// # async fn example(ctx: &mut durable_lambda_builder::context::BuilderContext) -> Result<(), durable_lambda_core::error::DurableError> {
526 /// let result = ctx.run_compensations().await?;
527 /// if !result.all_succeeded {
528 /// eprintln!("Some compensations failed");
529 /// }
530 /// # Ok(())
531 /// # }
532 /// ```
533 pub async fn run_compensations(&mut self) -> Result<CompensationResult, DurableError> {
534 self.inner.run_compensations().await
535 }
536
537 /// Return the current execution mode (Replaying or Executing).
538 ///
539 /// # Examples
540 ///
541 /// ```no_run
542 /// # async fn example(ctx: &durable_lambda_builder::context::BuilderContext) {
543 /// use durable_lambda_core::types::ExecutionMode;
544 /// match ctx.execution_mode() {
545 /// ExecutionMode::Replaying => { /* returning cached results */ }
546 /// ExecutionMode::Executing => { /* running new operations */ }
547 /// }
548 /// # }
549 /// ```
550 pub fn execution_mode(&self) -> ExecutionMode {
551 self.inner.execution_mode()
552 }
553
554 /// Return whether the context is currently replaying from history.
555 ///
556 /// # Examples
557 ///
558 /// ```no_run
559 /// # async fn example(ctx: &durable_lambda_builder::context::BuilderContext) {
560 /// if ctx.is_replaying() {
561 /// println!("Replaying cached operations");
562 /// }
563 /// # }
564 /// ```
565 pub fn is_replaying(&self) -> bool {
566 self.inner.is_replaying()
567 }
568
569 /// Return a reference to the durable execution ARN.
570 ///
571 /// # Examples
572 ///
573 /// ```no_run
574 /// # async fn example(ctx: &durable_lambda_builder::context::BuilderContext) {
575 /// println!("Execution ARN: {}", ctx.arn());
576 /// # }
577 /// ```
578 pub fn arn(&self) -> &str {
579 self.inner.arn()
580 }
581
582 /// Return the current checkpoint token.
583 ///
584 /// # Examples
585 ///
586 /// ```no_run
587 /// # async fn example(ctx: &durable_lambda_builder::context::BuilderContext) {
588 /// let token = ctx.checkpoint_token();
589 /// # }
590 /// ```
591 pub fn checkpoint_token(&self) -> &str {
592 self.inner.checkpoint_token()
593 }
594
595 /// Emit a replay-safe info-level log message.
596 ///
597 /// During execution mode, emits via `tracing::info!` with execution
598 /// context enrichment. During replay mode, the call is a no-op.
599 ///
600 /// # Examples
601 ///
602 /// ```no_run
603 /// # async fn example(ctx: &durable_lambda_builder::context::BuilderContext) {
604 /// ctx.log("Order processing started");
605 /// # }
606 /// ```
607 pub fn log(&self, message: &str) {
608 self.inner.log(message);
609 }
610
611 /// Emit a replay-safe info-level log message with structured data.
612 ///
613 /// # Examples
614 ///
615 /// ```no_run
616 /// # async fn example(ctx: &durable_lambda_builder::context::BuilderContext) {
617 /// ctx.log_with_data("Order processed", &serde_json::json!({"order_id": 42}));
618 /// # }
619 /// ```
620 pub fn log_with_data(&self, message: &str, data: &serde_json::Value) {
621 self.inner.log_with_data(message, data);
622 }
623
624 /// Emit a replay-safe debug-level log message.
625 ///
626 /// # Examples
627 ///
628 /// ```no_run
629 /// # async fn example(ctx: &durable_lambda_builder::context::BuilderContext) {
630 /// ctx.log_debug("Validating order fields");
631 /// # }
632 /// ```
633 pub fn log_debug(&self, message: &str) {
634 self.inner.log_debug(message);
635 }
636
637 /// Emit a replay-safe warn-level log message.
638 ///
639 /// # Examples
640 ///
641 /// ```no_run
642 /// # async fn example(ctx: &durable_lambda_builder::context::BuilderContext) {
643 /// ctx.log_warn("Inventory below threshold");
644 /// # }
645 /// ```
646 pub fn log_warn(&self, message: &str) {
647 self.inner.log_warn(message);
648 }
649
650 /// Emit a replay-safe error-level log message.
651 ///
652 /// # Examples
653 ///
654 /// ```no_run
655 /// # async fn example(ctx: &durable_lambda_builder::context::BuilderContext) {
656 /// ctx.log_error("Payment gateway timeout");
657 /// # }
658 /// ```
659 pub fn log_error(&self, message: &str) {
660 self.inner.log_error(message);
661 }
662
663 /// Emit a replay-safe debug-level log message with structured data.
664 ///
665 /// # Examples
666 ///
667 /// ```no_run
668 /// # async fn example(ctx: &durable_lambda_builder::context::BuilderContext) {
669 /// ctx.log_debug_with_data("Request details", &serde_json::json!({"method": "POST"}));
670 /// # }
671 /// ```
672 pub fn log_debug_with_data(&self, message: &str, data: &serde_json::Value) {
673 self.inner.log_debug_with_data(message, data);
674 }
675
676 /// Emit a replay-safe warn-level log message with structured data.
677 ///
678 /// # Examples
679 ///
680 /// ```no_run
681 /// # async fn example(ctx: &durable_lambda_builder::context::BuilderContext) {
682 /// ctx.log_warn_with_data("Retry attempt", &serde_json::json!({"attempt": 3}));
683 /// # }
684 /// ```
685 pub fn log_warn_with_data(&self, message: &str, data: &serde_json::Value) {
686 self.inner.log_warn_with_data(message, data);
687 }
688
689 /// Emit a replay-safe error-level log message with structured data.
690 ///
691 /// # Examples
692 ///
693 /// ```no_run
694 /// # async fn example(ctx: &durable_lambda_builder::context::BuilderContext) {
695 /// ctx.log_error_with_data("Payment failed", &serde_json::json!({"error": "timeout"}));
696 /// # }
697 /// ```
698 pub fn log_error_with_data(&self, message: &str, data: &serde_json::Value) {
699 self.inner.log_error_with_data(message, data);
700 }
701}
702
703impl DurableContextOps for BuilderContext {
704 fn step<T, E, F, Fut>(
705 &mut self,
706 name: &str,
707 f: F,
708 ) -> impl Future<Output = Result<Result<T, E>, DurableError>> + Send
709 where
710 T: Serialize + DeserializeOwned + Send + 'static,
711 E: Serialize + DeserializeOwned + Send + 'static,
712 F: FnOnce() -> Fut + Send + 'static,
713 Fut: Future<Output = Result<T, E>> + Send + 'static,
714 {
715 self.inner.step(name, f)
716 }
717
718 fn step_with_options<T, E, F, Fut>(
719 &mut self,
720 name: &str,
721 options: StepOptions,
722 f: F,
723 ) -> impl Future<Output = Result<Result<T, E>, DurableError>> + Send
724 where
725 T: Serialize + DeserializeOwned + Send + 'static,
726 E: Serialize + DeserializeOwned + Send + 'static,
727 F: FnOnce() -> Fut + Send + 'static,
728 Fut: Future<Output = Result<T, E>> + Send + 'static,
729 {
730 self.inner.step_with_options(name, options, f)
731 }
732
733 fn wait(
734 &mut self,
735 name: &str,
736 duration_secs: i32,
737 ) -> impl Future<Output = Result<(), DurableError>> + Send {
738 self.inner.wait(name, duration_secs)
739 }
740
741 fn create_callback(
742 &mut self,
743 name: &str,
744 options: CallbackOptions,
745 ) -> impl Future<Output = Result<CallbackHandle, DurableError>> + Send {
746 self.inner.create_callback(name, options)
747 }
748
749 fn invoke<T, P>(
750 &mut self,
751 name: &str,
752 function_name: &str,
753 payload: &P,
754 ) -> impl Future<Output = Result<T, DurableError>> + Send
755 where
756 T: DeserializeOwned + Send,
757 P: Serialize + Sync,
758 {
759 self.inner.invoke(name, function_name, payload)
760 }
761
762 fn parallel<T, F, Fut>(
763 &mut self,
764 name: &str,
765 branches: Vec<F>,
766 options: ParallelOptions,
767 ) -> impl Future<Output = Result<BatchResult<T>, DurableError>> + Send
768 where
769 T: Serialize + DeserializeOwned + Send + 'static,
770 F: FnOnce(DurableContext) -> Fut + Send + 'static,
771 Fut: Future<Output = Result<T, DurableError>> + Send + 'static,
772 {
773 self.inner.parallel(name, branches, options)
774 }
775
776 fn child_context<T, F, Fut>(
777 &mut self,
778 name: &str,
779 f: F,
780 ) -> impl Future<Output = Result<T, DurableError>> + Send
781 where
782 T: Serialize + DeserializeOwned + Send,
783 F: FnOnce(DurableContext) -> Fut + Send,
784 Fut: Future<Output = Result<T, DurableError>> + Send,
785 {
786 self.inner.child_context(name, f)
787 }
788
789 fn map<T, I, F, Fut>(
790 &mut self,
791 name: &str,
792 items: Vec<I>,
793 options: MapOptions,
794 f: F,
795 ) -> impl Future<Output = Result<BatchResult<T>, DurableError>> + Send
796 where
797 T: Serialize + DeserializeOwned + Send + 'static,
798 I: Send + 'static,
799 F: FnOnce(I, DurableContext) -> Fut + Send + 'static + Clone,
800 Fut: Future<Output = Result<T, DurableError>> + Send + 'static,
801 {
802 self.inner.map(name, items, options, f)
803 }
804
805 fn step_with_compensation<T, E, F, Fut, G, GFut>(
806 &mut self,
807 name: &str,
808 forward_fn: F,
809 compensate_fn: G,
810 ) -> impl Future<Output = Result<Result<T, E>, DurableError>> + Send
811 where
812 T: Serialize + DeserializeOwned + Send + 'static,
813 E: Serialize + DeserializeOwned + Send + 'static,
814 F: FnOnce() -> Fut + Send + 'static,
815 Fut: Future<Output = Result<T, E>> + Send + 'static,
816 G: FnOnce(T) -> GFut + Send + 'static,
817 GFut: Future<Output = Result<(), DurableError>> + Send + 'static,
818 {
819 self.inner
820 .step_with_compensation(name, forward_fn, compensate_fn)
821 }
822
823 fn step_with_compensation_opts<T, E, F, Fut, G, GFut>(
824 &mut self,
825 name: &str,
826 options: StepOptions,
827 forward_fn: F,
828 compensate_fn: G,
829 ) -> impl Future<Output = Result<Result<T, E>, DurableError>> + Send
830 where
831 T: Serialize + DeserializeOwned + Send + 'static,
832 E: Serialize + DeserializeOwned + Send + 'static,
833 F: FnOnce() -> Fut + Send + 'static,
834 Fut: Future<Output = Result<T, E>> + Send + 'static,
835 G: FnOnce(T) -> GFut + Send + 'static,
836 GFut: Future<Output = Result<(), DurableError>> + Send + 'static,
837 {
838 self.inner
839 .step_with_compensation_opts(name, options, forward_fn, compensate_fn)
840 }
841
842 fn run_compensations(
843 &mut self,
844 ) -> impl Future<Output = Result<CompensationResult, DurableError>> + Send {
845 self.inner.run_compensations()
846 }
847
848 fn callback_result<T: DeserializeOwned>(
849 &self,
850 handle: &CallbackHandle,
851 ) -> Result<T, DurableError> {
852 self.inner.callback_result(handle)
853 }
854
855 fn execution_mode(&self) -> ExecutionMode {
856 self.inner.execution_mode()
857 }
858
859 fn is_replaying(&self) -> bool {
860 self.inner.is_replaying()
861 }
862
863 fn arn(&self) -> &str {
864 self.inner.arn()
865 }
866
867 fn checkpoint_token(&self) -> &str {
868 self.inner.checkpoint_token()
869 }
870
871 fn log(&self, message: &str) {
872 self.inner.log(message);
873 }
874
875 fn log_with_data(&self, message: &str, data: &serde_json::Value) {
876 self.inner.log_with_data(message, data);
877 }
878
879 fn log_debug(&self, message: &str) {
880 self.inner.log_debug(message);
881 }
882
883 fn log_warn(&self, message: &str) {
884 self.inner.log_warn(message);
885 }
886
887 fn log_error(&self, message: &str) {
888 self.inner.log_error(message);
889 }
890
891 fn log_debug_with_data(&self, message: &str, data: &serde_json::Value) {
892 self.inner.log_debug_with_data(message, data);
893 }
894
895 fn log_warn_with_data(&self, message: &str, data: &serde_json::Value) {
896 self.inner.log_warn_with_data(message, data);
897 }
898
899 fn log_error_with_data(&self, message: &str, data: &serde_json::Value) {
900 self.inner.log_error_with_data(message, data);
901 }
902
903 fn enable_batch_mode(&mut self) {
904 self.inner.enable_batch_mode();
905 }
906
907 fn flush_batch(&mut self) -> impl Future<Output = Result<(), DurableError>> + Send {
908 self.inner.flush_batch()
909 }
910}
911
912#[cfg(test)]
913mod tests {
914 use super::*;
915 use aws_sdk_lambda::operation::checkpoint_durable_execution::CheckpointDurableExecutionOutput;
916 use aws_sdk_lambda::operation::get_durable_execution_state::GetDurableExecutionStateOutput;
917 use aws_sdk_lambda::types::{
918 Operation, OperationStatus, OperationType, OperationUpdate, StepDetails,
919 };
920 use std::collections::HashMap;
921 use std::sync::{Arc, Mutex};
922
923 use durable_lambda_core::backend::DurableBackend;
924
925 /// Mock backend for testing BuilderContext delegation.
926 struct MockBackend {
927 operations: HashMap<String, Operation>,
928 checkpoint_token: Mutex<String>,
929 }
930
931 impl MockBackend {
932 fn new() -> Self {
933 Self {
934 operations: HashMap::new(),
935 checkpoint_token: Mutex::new("mock-token".to_string()),
936 }
937 }
938
939 fn with_operation(mut self, id: &str, operation: Operation) -> Self {
940 self.operations.insert(id.to_string(), operation);
941 self
942 }
943 }
944
945 #[async_trait::async_trait]
946 impl DurableBackend for MockBackend {
947 async fn checkpoint(
948 &self,
949 _arn: &str,
950 _checkpoint_token: &str,
951 _updates: Vec<OperationUpdate>,
952 _client_token: Option<&str>,
953 ) -> Result<CheckpointDurableExecutionOutput, DurableError> {
954 let token = self.checkpoint_token.lock().unwrap().clone();
955 Ok(CheckpointDurableExecutionOutput::builder()
956 .checkpoint_token(token)
957 .build())
958 }
959
960 async fn get_execution_state(
961 &self,
962 _arn: &str,
963 _checkpoint_token: &str,
964 _next_marker: &str,
965 _max_items: i32,
966 ) -> Result<GetDurableExecutionStateOutput, DurableError> {
967 Ok(GetDurableExecutionStateOutput::builder()
968 .build()
969 .expect("test: empty execution state"))
970 }
971 }
972
973 async fn make_builder_context(backend: MockBackend) -> BuilderContext {
974 let ctx = DurableContext::new(
975 Arc::new(backend),
976 "arn:test".to_string(),
977 "tok".to_string(),
978 vec![],
979 None,
980 )
981 .await
982 .unwrap();
983 BuilderContext::new(ctx)
984 }
985
986 fn make_succeeded_op(id: &str, result_json: &str) -> Operation {
987 Operation::builder()
988 .id(id)
989 .r#type(OperationType::Step)
990 .status(OperationStatus::Succeeded)
991 .start_timestamp(aws_smithy_types::DateTime::from_secs(0))
992 .step_details(StepDetails::builder().result(result_json).build())
993 .build()
994 .unwrap_or_else(|e| panic!("failed to build test Operation: {e}"))
995 }
996
997 #[tokio::test]
998 async fn test_builder_context_step_delegates_to_core() {
999 let mut ctx = make_builder_context(MockBackend::new()).await;
1000
1001 let result: Result<i32, String> = ctx.step("validate", || async { Ok(42) }).await.unwrap();
1002
1003 assert_eq!(result, Ok(42));
1004 }
1005
1006 #[tokio::test]
1007 async fn test_builder_context_step_with_options_delegates_to_core() {
1008 let mut ctx = make_builder_context(MockBackend::new()).await;
1009
1010 let result: Result<i32, String> = ctx
1011 .step_with_options("charge", StepOptions::new().retries(3), || async {
1012 Ok(100)
1013 })
1014 .await
1015 .unwrap();
1016
1017 assert_eq!(result, Ok(100));
1018 }
1019
1020 #[tokio::test]
1021 async fn test_builder_context_execution_mode_executing() {
1022 let ctx = make_builder_context(MockBackend::new()).await;
1023 assert_eq!(ctx.execution_mode(), ExecutionMode::Executing);
1024 assert!(!ctx.is_replaying());
1025 }
1026
1027 #[tokio::test]
1028 async fn test_builder_context_execution_mode_replaying() {
1029 let op = make_succeeded_op("op-1", "42");
1030 let backend = MockBackend::new().with_operation("op-1", op.clone());
1031
1032 let durable_ctx = DurableContext::new(
1033 Arc::new(backend),
1034 "arn:test".to_string(),
1035 "tok".to_string(),
1036 vec![op],
1037 None,
1038 )
1039 .await
1040 .unwrap();
1041
1042 let ctx = BuilderContext::new(durable_ctx);
1043 assert_eq!(ctx.execution_mode(), ExecutionMode::Replaying);
1044 assert!(ctx.is_replaying());
1045 }
1046
1047 #[tokio::test]
1048 async fn test_builder_context_arn() {
1049 let ctx = make_builder_context(MockBackend::new()).await;
1050 assert_eq!(ctx.arn(), "arn:test");
1051 }
1052
1053 #[tokio::test]
1054 async fn test_builder_context_checkpoint_token() {
1055 let ctx = make_builder_context(MockBackend::new()).await;
1056 assert_eq!(ctx.checkpoint_token(), "tok");
1057 }
1058
1059 #[tokio::test]
1060 async fn test_builder_context_child_context_delegates_to_core() {
1061 let mut ctx = make_builder_context(MockBackend::new()).await;
1062
1063 let result: i32 = ctx
1064 .child_context("sub_workflow", |mut child_ctx: DurableContext| async move {
1065 let r: Result<i32, String> =
1066 child_ctx.step("inner_step", || async { Ok(42) }).await?;
1067 Ok(r.unwrap())
1068 })
1069 .await
1070 .unwrap();
1071
1072 assert_eq!(result, 42);
1073 }
1074
1075 #[tokio::test]
1076 async fn test_builder_context_log_delegates_to_core() {
1077 let ctx = make_builder_context(MockBackend::new()).await;
1078 // Verify log methods are callable and don't panic.
1079 ctx.log("test message");
1080 ctx.log_with_data("test data", &serde_json::json!({"key": "val"}));
1081 ctx.log_debug("test debug");
1082 ctx.log_warn("test warn");
1083 ctx.log_error("test error");
1084 ctx.log_debug_with_data("debug data", &serde_json::json!({"k": "v"}));
1085 ctx.log_warn_with_data("warn data", &serde_json::json!({"k": "v"}));
1086 ctx.log_error_with_data("error data", &serde_json::json!({"k": "v"}));
1087 }
1088}