durable_lambda_trait/context.rs
1//! Trait-specific context wrapper.
2//!
3//! Thin wrapper over [`DurableContext`](durable_lambda_core::DurableContext)
4//! providing trait-approach ergonomics. All operations delegate directly
5//! to the inner context with no additional logic.
6
7use std::future::Future;
8
9use durable_lambda_core::context::DurableContext;
10use durable_lambda_core::error::DurableError;
11use durable_lambda_core::ops_trait::DurableContextOps;
12use durable_lambda_core::types::{
13 BatchResult, CallbackHandle, CallbackOptions, CompensationResult, ExecutionMode, MapOptions,
14 ParallelOptions, StepOptions,
15};
16use serde::de::DeserializeOwned;
17use serde::Serialize;
18
/// Trait-native context for durable Lambda operations.
///
/// Thin wrapper over [`DurableContext`] providing the trait-approach API.
/// All operations delegate directly to the inner context — no replay logic,
/// no checkpoint logic, just delegation.
///
/// Constructed internally by [`run`](crate::run) — users never create this
/// directly. The only constructor, `new`, is `pub(crate)`, and the wrapped
/// context is a private field, so code outside this crate can neither build
/// a `TraitContext` nor reach the inner context except through the methods
/// defined on this type.
///
/// # Examples
///
/// ```no_run
/// use durable_lambda_trait::prelude::*;
/// use async_trait::async_trait;
///
/// struct MyHandler;
///
/// #[async_trait]
/// impl DurableHandler for MyHandler {
///     async fn handle(
///         &self,
///         event: serde_json::Value,
///         mut ctx: TraitContext,
///     ) -> Result<serde_json::Value, DurableError> {
///         let result: Result<i32, String> = ctx.step("validate", || async {
///             Ok(42)
///         }).await?;
///         Ok(serde_json::json!({"validated": result.unwrap()}))
///     }
/// }
/// ```
pub struct TraitContext {
    /// Wrapped core context; every method on `TraitContext` forwards to it.
    inner: DurableContext,
}
53
54impl TraitContext {
55 /// Create from an existing `DurableContext`.
56 ///
57 /// This constructor is `pub(crate)` — only the [`run`](crate::run)
58 /// function creates `TraitContext` instances.
59 pub(crate) fn new(ctx: DurableContext) -> Self {
60 Self { inner: ctx }
61 }
62
63 /// Execute a named step with checkpointing.
64 ///
65 /// During execution mode, runs the closure and checkpoints the result to AWS.
66 /// During replay mode, returns the previously checkpointed result without
67 /// executing the closure.
68 ///
69 /// # Arguments
70 ///
71 /// * `name` — Human-readable step name, used as checkpoint metadata
72 /// * `f` — Closure to execute (skipped during replay)
73 ///
74 /// # Errors
75 ///
76 /// Returns [`DurableError::Serialization`] if the result cannot be serialized.
77 /// Returns [`DurableError::Deserialization`] if a cached result cannot be deserialized.
78 /// Returns [`DurableError::CheckpointFailed`] if the AWS checkpoint API call fails.
79 ///
80 /// # Examples
81 ///
82 /// ```no_run
83 /// # async fn example(ctx: &mut durable_lambda_trait::context::TraitContext) -> Result<(), durable_lambda_core::error::DurableError> {
84 /// let result: Result<i32, String> = ctx.step("validate_order", || async {
85 /// Ok(42)
86 /// }).await?;
87 ///
88 /// match result {
89 /// Ok(value) => println!("Step succeeded: {value}"),
90 /// Err(e) => println!("Step failed: {e}"),
91 /// }
92 /// # Ok(())
93 /// # }
94 /// ```
95 pub async fn step<T, E, F, Fut>(
96 &mut self,
97 name: &str,
98 f: F,
99 ) -> Result<Result<T, E>, DurableError>
100 where
101 T: Serialize + DeserializeOwned + Send + 'static,
102 E: Serialize + DeserializeOwned + Send + 'static,
103 F: FnOnce() -> Fut + Send + 'static,
104 Fut: Future<Output = Result<T, E>> + Send + 'static,
105 {
106 self.inner.step(name, f).await
107 }
108
109 /// Execute a named step with checkpointing and retry configuration.
110 ///
111 /// If the closure fails and retries are configured, sends a RETRY checkpoint
112 /// and returns [`DurableError::StepRetryScheduled`] to signal the function
113 /// should exit.
114 ///
115 /// # Arguments
116 ///
117 /// * `name` — Human-readable step name, used as checkpoint metadata
118 /// * `options` — Retry configuration (see [`StepOptions`])
119 /// * `f` — Closure to execute (skipped during replay)
120 ///
121 /// # Errors
122 ///
123 /// Returns [`DurableError::StepRetryScheduled`] when a retry has been scheduled.
124 /// Returns [`DurableError::Serialization`] if the result cannot be serialized.
125 /// Returns [`DurableError::Deserialization`] if a cached result cannot be deserialized.
126 ///
127 /// # Examples
128 ///
129 /// ```no_run
130 /// # async fn example(ctx: &mut durable_lambda_trait::context::TraitContext) -> Result<(), durable_lambda_core::error::DurableError> {
131 /// use durable_lambda_trait::prelude::*;
132 ///
133 /// let result: Result<i32, String> = ctx.step_with_options(
134 /// "charge_payment",
135 /// StepOptions::new().retries(3).backoff_seconds(5),
136 /// || async { Ok(100) },
137 /// ).await?;
138 /// # Ok(())
139 /// # }
140 /// ```
141 pub async fn step_with_options<T, E, F, Fut>(
142 &mut self,
143 name: &str,
144 options: StepOptions,
145 f: F,
146 ) -> Result<Result<T, E>, DurableError>
147 where
148 T: Serialize + DeserializeOwned + Send + 'static,
149 E: Serialize + DeserializeOwned + Send + 'static,
150 F: FnOnce() -> Fut + Send + 'static,
151 Fut: Future<Output = Result<T, E>> + Send + 'static,
152 {
153 self.inner.step_with_options(name, options, f).await
154 }
155
156 /// Suspend execution for the specified duration.
157 ///
158 /// During execution mode, sends a START checkpoint and returns
159 /// [`DurableError::WaitSuspended`] to signal the function should exit.
160 /// The server re-invokes after the duration.
161 ///
162 /// During replay mode, returns `Ok(())` immediately if the wait has
163 /// already completed.
164 ///
165 /// # Arguments
166 ///
167 /// * `name` — Human-readable name for the wait operation
168 /// * `duration_secs` — Duration to wait in seconds
169 ///
170 /// # Errors
171 ///
172 /// Returns [`DurableError::WaitSuspended`] when the wait has been checkpointed.
173 ///
174 /// # Examples
175 ///
176 /// ```no_run
177 /// # async fn example(ctx: &mut durable_lambda_trait::context::TraitContext) -> Result<(), durable_lambda_core::error::DurableError> {
178 /// ctx.wait("cooldown", 30).await?;
179 /// println!("Wait completed!");
180 /// # Ok(())
181 /// # }
182 /// ```
183 pub async fn wait(&mut self, name: &str, duration_secs: i32) -> Result<(), DurableError> {
184 self.inner.wait(name, duration_secs).await
185 }
186
187 /// Register a callback and return a handle with the server-generated callback ID.
188 ///
189 /// During execution mode, sends a START checkpoint and returns a
190 /// [`CallbackHandle`] containing the `callback_id` for external systems.
191 /// During replay mode, extracts the cached callback_id from history.
192 ///
193 /// This method NEVER suspends. Use [`callback_result`](Self::callback_result)
194 /// to check the callback outcome (which suspends if not yet signaled).
195 ///
196 /// # Arguments
197 ///
198 /// * `name` — Human-readable name for the callback operation
199 /// * `options` — Timeout configuration (see [`CallbackOptions`])
200 ///
201 /// # Errors
202 ///
203 /// Returns [`DurableError::CheckpointFailed`] if the AWS checkpoint API call fails.
204 ///
205 /// # Examples
206 ///
207 /// ```no_run
208 /// # async fn example(ctx: &mut durable_lambda_trait::context::TraitContext) -> Result<(), durable_lambda_core::error::DurableError> {
209 /// use durable_lambda_trait::prelude::*;
210 ///
211 /// let handle = ctx.create_callback("approval", CallbackOptions::new()).await?;
212 /// println!("Callback ID: {}", handle.callback_id);
213 /// # Ok(())
214 /// # }
215 /// ```
216 pub async fn create_callback(
217 &mut self,
218 name: &str,
219 options: CallbackOptions,
220 ) -> Result<CallbackHandle, DurableError> {
221 self.inner.create_callback(name, options).await
222 }
223
224 /// Check the result of a previously created callback.
225 ///
226 /// Return the deserialized success payload if the callback has been
227 /// signaled. Return an error if the callback failed, timed out, or
228 /// hasn't been signaled yet.
229 ///
230 /// # Arguments
231 ///
232 /// * `handle` — The [`CallbackHandle`] returned by [`create_callback`](Self::create_callback)
233 ///
234 /// # Errors
235 ///
236 /// Returns [`DurableError::CallbackSuspended`] if not yet signaled.
237 /// Returns [`DurableError::CallbackFailed`] if the callback failed or timed out.
238 ///
239 /// # Examples
240 ///
241 /// ```no_run
242 /// # async fn example(ctx: &mut durable_lambda_trait::context::TraitContext) -> Result<(), durable_lambda_core::error::DurableError> {
243 /// use durable_lambda_trait::prelude::*;
244 ///
245 /// let handle = ctx.create_callback("approval", CallbackOptions::new()).await?;
246 /// let result: String = ctx.callback_result(&handle)?;
247 /// # Ok(())
248 /// # }
249 /// ```
250 pub fn callback_result<T: DeserializeOwned>(
251 &self,
252 handle: &CallbackHandle,
253 ) -> Result<T, DurableError> {
254 self.inner.callback_result(handle)
255 }
256
257 /// Durably invoke another Lambda function and return its result.
258 ///
259 /// During execution mode, serializes the payload, sends a START checkpoint,
260 /// and returns [`DurableError::InvokeSuspended`] to signal exit. The server
261 /// invokes the target asynchronously and re-invokes this Lambda when done.
262 ///
263 /// During replay, returns the cached result without re-invoking.
264 ///
265 /// # Arguments
266 ///
267 /// * `name` — Human-readable name for the invoke operation
268 /// * `function_name` — Name or ARN of the target Lambda function
269 /// * `payload` — Input payload to send to the target function
270 ///
271 /// # Errors
272 ///
273 /// Returns [`DurableError::InvokeSuspended`] when the target is still executing.
274 /// Returns [`DurableError::InvokeFailed`] if the target failed or timed out.
275 ///
276 /// # Examples
277 ///
278 /// ```no_run
279 /// # async fn example(ctx: &mut durable_lambda_trait::context::TraitContext) -> Result<(), durable_lambda_core::error::DurableError> {
280 /// let result: String = ctx.invoke(
281 /// "call_processor",
282 /// "payment-processor-lambda",
283 /// &serde_json::json!({"order_id": 123}),
284 /// ).await?;
285 /// # Ok(())
286 /// # }
287 /// ```
288 pub async fn invoke<T, P>(
289 &mut self,
290 name: &str,
291 function_name: &str,
292 payload: &P,
293 ) -> Result<T, DurableError>
294 where
295 T: DeserializeOwned,
296 P: Serialize,
297 {
298 self.inner.invoke(name, function_name, payload).await
299 }
300
301 /// Execute multiple branches concurrently and return their results.
302 ///
303 /// Each branch receives an owned child context with an isolated checkpoint
304 /// namespace. Branches satisfy `Send + 'static` via `tokio::spawn`.
305 ///
306 /// # Arguments
307 ///
308 /// * `name` — Human-readable name for the parallel operation
309 /// * `branches` — Collection of branch closures
310 /// * `options` — Parallel configuration
311 ///
312 /// # Errors
313 ///
314 /// Returns [`DurableError::ParallelFailed`] if the operation fails.
315 ///
316 /// # Examples
317 ///
318 /// ```no_run
319 /// # async fn example(ctx: &mut durable_lambda_trait::context::TraitContext) -> Result<(), durable_lambda_core::error::DurableError> {
320 /// use durable_lambda_trait::prelude::*;
321 /// use durable_lambda_core::context::DurableContext;
322 /// use std::pin::Pin;
323 /// use std::future::Future;
324 ///
325 /// type BranchFn = Box<dyn FnOnce(DurableContext) -> Pin<Box<dyn Future<Output = Result<i32, DurableError>> + Send>> + Send>;
326 ///
327 /// let branches: Vec<BranchFn> = vec![
328 /// Box::new(|_ctx| Box::pin(async move { Ok(1) })),
329 /// Box::new(|_ctx| Box::pin(async move { Ok(2) })),
330 /// ];
331 /// let result = ctx.parallel("fan_out", branches, ParallelOptions::new()).await?;
332 /// # Ok(())
333 /// # }
334 /// ```
335 pub async fn parallel<T, F, Fut>(
336 &mut self,
337 name: &str,
338 branches: Vec<F>,
339 options: ParallelOptions,
340 ) -> Result<BatchResult<T>, DurableError>
341 where
342 T: Serialize + DeserializeOwned + Send + 'static,
343 F: FnOnce(DurableContext) -> Fut + Send + 'static,
344 Fut: Future<Output = Result<T, DurableError>> + Send + 'static,
345 {
346 self.inner.parallel(name, branches, options).await
347 }
348
349 /// Execute an isolated subflow with its own checkpoint namespace.
350 ///
351 /// The closure receives an owned child [`DurableContext`] whose operations
352 /// are namespaced under this child context's operation ID, preventing
353 /// collisions with the parent or sibling child contexts.
354 ///
355 /// During replay mode, returns the cached result without re-executing
356 /// the closure.
357 ///
358 /// # Arguments
359 ///
360 /// * `name` — Human-readable name for the child context operation
361 /// * `f` — Closure receiving an owned `DurableContext` for the subflow
362 ///
363 /// # Errors
364 ///
365 /// Returns [`DurableError::ChildContextFailed`] if the child context
366 /// is found in a failed state during replay.
367 /// Returns [`DurableError::CheckpointFailed`] if checkpoint API calls fail.
368 ///
369 /// # Examples
370 ///
371 /// ```no_run
372 /// # async fn example(ctx: &mut durable_lambda_trait::context::TraitContext) -> Result<(), durable_lambda_core::error::DurableError> {
373 /// use durable_lambda_core::context::DurableContext;
374 ///
375 /// let result: i32 = ctx.child_context("sub_workflow", |mut child_ctx: DurableContext| async move {
376 /// let r: Result<i32, String> = child_ctx.step("inner_step", || async { Ok(42) }).await?;
377 /// Ok(r.unwrap())
378 /// }).await?;
379 /// assert_eq!(result, 42);
380 /// # Ok(())
381 /// # }
382 /// ```
383 pub async fn child_context<T, F, Fut>(&mut self, name: &str, f: F) -> Result<T, DurableError>
384 where
385 T: Serialize + DeserializeOwned + Send,
386 F: FnOnce(DurableContext) -> Fut + Send,
387 Fut: Future<Output = Result<T, DurableError>> + Send,
388 {
389 self.inner.child_context(name, f).await
390 }
391
392 /// Process a collection of items in parallel and return their results.
393 ///
394 /// Apply the closure `f` to each item concurrently. Each item receives an
395 /// owned child context with an isolated checkpoint namespace. Items satisfy
396 /// `Send + 'static` via `tokio::spawn`. The closure must be `Clone` since
397 /// it is applied to each item independently.
398 ///
399 /// When `batch_size` is configured, items process in sequential batches.
400 ///
401 /// # Arguments
402 ///
403 /// * `name` — Human-readable name for the map operation
404 /// * `items` — Collection of items to process
405 /// * `options` — Map configuration (batching)
406 /// * `f` — Closure applied to each item with an owned child context
407 ///
408 /// # Errors
409 ///
410 /// Returns [`DurableError::MapFailed`] if the operation fails.
411 ///
412 /// # Examples
413 ///
414 /// ```no_run
415 /// # async fn example(ctx: &mut durable_lambda_trait::context::TraitContext) -> Result<(), durable_lambda_core::error::DurableError> {
416 /// use durable_lambda_trait::prelude::*;
417 /// use durable_lambda_core::context::DurableContext;
418 ///
419 /// let items = vec![1, 2, 3];
420 /// let result = ctx.map(
421 /// "process_items",
422 /// items,
423 /// MapOptions::new().batch_size(2),
424 /// |item: i32, mut child_ctx: DurableContext| async move {
425 /// let r: Result<i32, String> = child_ctx.step("double", move || async move { Ok(item * 2) }).await?;
426 /// Ok(r.unwrap())
427 /// },
428 /// ).await?;
429 /// # Ok(())
430 /// # }
431 /// ```
432 pub async fn map<T, I, F, Fut>(
433 &mut self,
434 name: &str,
435 items: Vec<I>,
436 options: MapOptions,
437 f: F,
438 ) -> Result<BatchResult<T>, DurableError>
439 where
440 T: Serialize + DeserializeOwned + Send + 'static,
441 I: Send + 'static,
442 F: FnOnce(I, DurableContext) -> Fut + Send + 'static + Clone,
443 Fut: Future<Output = Result<T, DurableError>> + Send + 'static,
444 {
445 self.inner.map(name, items, options, f).await
446 }
447
448 /// Register a compensatable step.
449 ///
450 /// Executes the forward step and, on success, registers the compensation
451 /// closure for later rollback via [`run_compensations`](Self::run_compensations).
452 ///
453 /// # Examples
454 ///
455 /// ```no_run
456 /// # async fn example(ctx: &mut durable_lambda_trait::context::TraitContext) -> Result<(), durable_lambda_core::error::DurableError> {
457 /// let result: Result<i32, String> = ctx.step_with_compensation(
458 /// "charge",
459 /// || async { Ok(100) },
460 /// |amount| async move { println!("Refunding {amount}"); Ok(()) },
461 /// ).await?;
462 /// # Ok(())
463 /// # }
464 /// ```
465 pub async fn step_with_compensation<T, E, F, Fut, G, GFut>(
466 &mut self,
467 name: &str,
468 forward_fn: F,
469 compensate_fn: G,
470 ) -> Result<Result<T, E>, DurableError>
471 where
472 T: Serialize + DeserializeOwned + Send + 'static,
473 E: Serialize + DeserializeOwned + Send + 'static,
474 F: FnOnce() -> Fut + Send + 'static,
475 Fut: Future<Output = Result<T, E>> + Send + 'static,
476 G: FnOnce(T) -> GFut + Send + 'static,
477 GFut: Future<Output = Result<(), DurableError>> + Send + 'static,
478 {
479 self.inner
480 .step_with_compensation(name, forward_fn, compensate_fn)
481 .await
482 }
483
484 /// Register a compensatable step with options.
485 ///
486 /// Like [`step_with_compensation`](Self::step_with_compensation) but accepts
487 /// [`StepOptions`] for configuring retries, backoff, and timeouts on the forward step.
488 ///
489 /// # Examples
490 ///
491 /// ```no_run
492 /// # async fn example(ctx: &mut durable_lambda_trait::context::TraitContext) -> Result<(), durable_lambda_core::error::DurableError> {
493 /// use durable_lambda_trait::prelude::*;
494 ///
495 /// let result: Result<String, String> = ctx.step_with_compensation_opts(
496 /// "book_hotel",
497 /// StepOptions::new().retries(3),
498 /// || async { Ok("BOOKING-123".to_string()) },
499 /// |booking_id| async move { println!("Cancelling: {booking_id}"); Ok(()) },
500 /// ).await?;
501 /// # Ok(())
502 /// # }
503 /// ```
504 pub async fn step_with_compensation_opts<T, E, F, Fut, G, GFut>(
505 &mut self,
506 name: &str,
507 options: StepOptions,
508 forward_fn: F,
509 compensate_fn: G,
510 ) -> Result<Result<T, E>, DurableError>
511 where
512 T: Serialize + DeserializeOwned + Send + 'static,
513 E: Serialize + DeserializeOwned + Send + 'static,
514 F: FnOnce() -> Fut + Send + 'static,
515 Fut: Future<Output = Result<T, E>> + Send + 'static,
516 G: FnOnce(T) -> GFut + Send + 'static,
517 GFut: Future<Output = Result<(), DurableError>> + Send + 'static,
518 {
519 self.inner
520 .step_with_compensation_opts(name, options, forward_fn, compensate_fn)
521 .await
522 }
523
524 /// Execute all registered compensations in reverse registration order.
525 ///
526 /// # Examples
527 ///
528 /// ```no_run
529 /// # async fn example(ctx: &mut durable_lambda_trait::context::TraitContext) -> Result<(), durable_lambda_core::error::DurableError> {
530 /// let result = ctx.run_compensations().await?;
531 /// if !result.all_succeeded {
532 /// eprintln!("Some compensations failed");
533 /// }
534 /// # Ok(())
535 /// # }
536 /// ```
537 pub async fn run_compensations(&mut self) -> Result<CompensationResult, DurableError> {
538 self.inner.run_compensations().await
539 }
540
541 /// Return the current execution mode (Replaying or Executing).
542 ///
543 /// # Examples
544 ///
545 /// ```no_run
546 /// # async fn example(ctx: &durable_lambda_trait::context::TraitContext) {
547 /// use durable_lambda_core::types::ExecutionMode;
548 /// match ctx.execution_mode() {
549 /// ExecutionMode::Replaying => { /* returning cached results */ }
550 /// ExecutionMode::Executing => { /* running new operations */ }
551 /// }
552 /// # }
553 /// ```
554 pub fn execution_mode(&self) -> ExecutionMode {
555 self.inner.execution_mode()
556 }
557
558 /// Return whether the context is currently replaying from history.
559 ///
560 /// # Examples
561 ///
562 /// ```no_run
563 /// # async fn example(ctx: &durable_lambda_trait::context::TraitContext) {
564 /// if ctx.is_replaying() {
565 /// println!("Replaying cached operations");
566 /// }
567 /// # }
568 /// ```
569 pub fn is_replaying(&self) -> bool {
570 self.inner.is_replaying()
571 }
572
573 /// Return a reference to the durable execution ARN.
574 ///
575 /// # Examples
576 ///
577 /// ```no_run
578 /// # async fn example(ctx: &durable_lambda_trait::context::TraitContext) {
579 /// println!("Execution ARN: {}", ctx.arn());
580 /// # }
581 /// ```
582 pub fn arn(&self) -> &str {
583 self.inner.arn()
584 }
585
586 /// Return the current checkpoint token.
587 ///
588 /// # Examples
589 ///
590 /// ```no_run
591 /// # async fn example(ctx: &durable_lambda_trait::context::TraitContext) {
592 /// let token = ctx.checkpoint_token();
593 /// # }
594 /// ```
595 pub fn checkpoint_token(&self) -> &str {
596 self.inner.checkpoint_token()
597 }
598
599 /// Emit a replay-safe info-level log message.
600 ///
601 /// During execution mode, emits via `tracing::info!` with execution
602 /// context enrichment. During replay mode, the call is a no-op.
603 ///
604 /// # Examples
605 ///
606 /// ```no_run
607 /// # async fn example(ctx: &durable_lambda_trait::context::TraitContext) {
608 /// ctx.log("Order processing started");
609 /// # }
610 /// ```
611 pub fn log(&self, message: &str) {
612 self.inner.log(message);
613 }
614
615 /// Emit a replay-safe info-level log message with structured data.
616 ///
617 /// # Examples
618 ///
619 /// ```no_run
620 /// # async fn example(ctx: &durable_lambda_trait::context::TraitContext) {
621 /// ctx.log_with_data("Order processed", &serde_json::json!({"order_id": 42}));
622 /// # }
623 /// ```
624 pub fn log_with_data(&self, message: &str, data: &serde_json::Value) {
625 self.inner.log_with_data(message, data);
626 }
627
628 /// Emit a replay-safe debug-level log message.
629 ///
630 /// # Examples
631 ///
632 /// ```no_run
633 /// # async fn example(ctx: &durable_lambda_trait::context::TraitContext) {
634 /// ctx.log_debug("Validating order fields");
635 /// # }
636 /// ```
637 pub fn log_debug(&self, message: &str) {
638 self.inner.log_debug(message);
639 }
640
641 /// Emit a replay-safe warn-level log message.
642 ///
643 /// # Examples
644 ///
645 /// ```no_run
646 /// # async fn example(ctx: &durable_lambda_trait::context::TraitContext) {
647 /// ctx.log_warn("Inventory below threshold");
648 /// # }
649 /// ```
650 pub fn log_warn(&self, message: &str) {
651 self.inner.log_warn(message);
652 }
653
654 /// Emit a replay-safe error-level log message.
655 ///
656 /// # Examples
657 ///
658 /// ```no_run
659 /// # async fn example(ctx: &durable_lambda_trait::context::TraitContext) {
660 /// ctx.log_error("Payment gateway timeout");
661 /// # }
662 /// ```
663 pub fn log_error(&self, message: &str) {
664 self.inner.log_error(message);
665 }
666
667 /// Emit a replay-safe debug-level log message with structured data.
668 ///
669 /// # Examples
670 ///
671 /// ```no_run
672 /// # async fn example(ctx: &durable_lambda_trait::context::TraitContext) {
673 /// ctx.log_debug_with_data("Request details", &serde_json::json!({"method": "POST"}));
674 /// # }
675 /// ```
676 pub fn log_debug_with_data(&self, message: &str, data: &serde_json::Value) {
677 self.inner.log_debug_with_data(message, data);
678 }
679
680 /// Emit a replay-safe warn-level log message with structured data.
681 ///
682 /// # Examples
683 ///
684 /// ```no_run
685 /// # async fn example(ctx: &durable_lambda_trait::context::TraitContext) {
686 /// ctx.log_warn_with_data("Retry attempt", &serde_json::json!({"attempt": 3}));
687 /// # }
688 /// ```
689 pub fn log_warn_with_data(&self, message: &str, data: &serde_json::Value) {
690 self.inner.log_warn_with_data(message, data);
691 }
692
693 /// Emit a replay-safe error-level log message with structured data.
694 ///
695 /// # Examples
696 ///
697 /// ```no_run
698 /// # async fn example(ctx: &durable_lambda_trait::context::TraitContext) {
699 /// ctx.log_error_with_data("Payment failed", &serde_json::json!({"error": "timeout"}));
700 /// # }
701 /// ```
702 pub fn log_error_with_data(&self, message: &str, data: &serde_json::Value) {
703 self.inner.log_error_with_data(message, data);
704 }
705}
706
707impl DurableContextOps for TraitContext {
708 fn step<T, E, F, Fut>(
709 &mut self,
710 name: &str,
711 f: F,
712 ) -> impl Future<Output = Result<Result<T, E>, DurableError>> + Send
713 where
714 T: Serialize + DeserializeOwned + Send + 'static,
715 E: Serialize + DeserializeOwned + Send + 'static,
716 F: FnOnce() -> Fut + Send + 'static,
717 Fut: Future<Output = Result<T, E>> + Send + 'static,
718 {
719 self.inner.step(name, f)
720 }
721
722 fn step_with_options<T, E, F, Fut>(
723 &mut self,
724 name: &str,
725 options: StepOptions,
726 f: F,
727 ) -> impl Future<Output = Result<Result<T, E>, DurableError>> + Send
728 where
729 T: Serialize + DeserializeOwned + Send + 'static,
730 E: Serialize + DeserializeOwned + Send + 'static,
731 F: FnOnce() -> Fut + Send + 'static,
732 Fut: Future<Output = Result<T, E>> + Send + 'static,
733 {
734 self.inner.step_with_options(name, options, f)
735 }
736
737 fn wait(
738 &mut self,
739 name: &str,
740 duration_secs: i32,
741 ) -> impl Future<Output = Result<(), DurableError>> + Send {
742 self.inner.wait(name, duration_secs)
743 }
744
745 fn create_callback(
746 &mut self,
747 name: &str,
748 options: CallbackOptions,
749 ) -> impl Future<Output = Result<CallbackHandle, DurableError>> + Send {
750 self.inner.create_callback(name, options)
751 }
752
753 fn invoke<T, P>(
754 &mut self,
755 name: &str,
756 function_name: &str,
757 payload: &P,
758 ) -> impl Future<Output = Result<T, DurableError>> + Send
759 where
760 T: DeserializeOwned + Send,
761 P: Serialize + Sync,
762 {
763 self.inner.invoke(name, function_name, payload)
764 }
765
766 fn parallel<T, F, Fut>(
767 &mut self,
768 name: &str,
769 branches: Vec<F>,
770 options: ParallelOptions,
771 ) -> impl Future<Output = Result<BatchResult<T>, DurableError>> + Send
772 where
773 T: Serialize + DeserializeOwned + Send + 'static,
774 F: FnOnce(DurableContext) -> Fut + Send + 'static,
775 Fut: Future<Output = Result<T, DurableError>> + Send + 'static,
776 {
777 self.inner.parallel(name, branches, options)
778 }
779
780 fn child_context<T, F, Fut>(
781 &mut self,
782 name: &str,
783 f: F,
784 ) -> impl Future<Output = Result<T, DurableError>> + Send
785 where
786 T: Serialize + DeserializeOwned + Send,
787 F: FnOnce(DurableContext) -> Fut + Send,
788 Fut: Future<Output = Result<T, DurableError>> + Send,
789 {
790 self.inner.child_context(name, f)
791 }
792
793 fn map<T, I, F, Fut>(
794 &mut self,
795 name: &str,
796 items: Vec<I>,
797 options: MapOptions,
798 f: F,
799 ) -> impl Future<Output = Result<BatchResult<T>, DurableError>> + Send
800 where
801 T: Serialize + DeserializeOwned + Send + 'static,
802 I: Send + 'static,
803 F: FnOnce(I, DurableContext) -> Fut + Send + 'static + Clone,
804 Fut: Future<Output = Result<T, DurableError>> + Send + 'static,
805 {
806 self.inner.map(name, items, options, f)
807 }
808
809 fn step_with_compensation<T, E, F, Fut, G, GFut>(
810 &mut self,
811 name: &str,
812 forward_fn: F,
813 compensate_fn: G,
814 ) -> impl Future<Output = Result<Result<T, E>, DurableError>> + Send
815 where
816 T: Serialize + DeserializeOwned + Send + 'static,
817 E: Serialize + DeserializeOwned + Send + 'static,
818 F: FnOnce() -> Fut + Send + 'static,
819 Fut: Future<Output = Result<T, E>> + Send + 'static,
820 G: FnOnce(T) -> GFut + Send + 'static,
821 GFut: Future<Output = Result<(), DurableError>> + Send + 'static,
822 {
823 self.inner
824 .step_with_compensation(name, forward_fn, compensate_fn)
825 }
826
827 fn step_with_compensation_opts<T, E, F, Fut, G, GFut>(
828 &mut self,
829 name: &str,
830 options: StepOptions,
831 forward_fn: F,
832 compensate_fn: G,
833 ) -> impl Future<Output = Result<Result<T, E>, DurableError>> + Send
834 where
835 T: Serialize + DeserializeOwned + Send + 'static,
836 E: Serialize + DeserializeOwned + Send + 'static,
837 F: FnOnce() -> Fut + Send + 'static,
838 Fut: Future<Output = Result<T, E>> + Send + 'static,
839 G: FnOnce(T) -> GFut + Send + 'static,
840 GFut: Future<Output = Result<(), DurableError>> + Send + 'static,
841 {
842 self.inner
843 .step_with_compensation_opts(name, options, forward_fn, compensate_fn)
844 }
845
846 fn run_compensations(
847 &mut self,
848 ) -> impl Future<Output = Result<CompensationResult, DurableError>> + Send {
849 self.inner.run_compensations()
850 }
851
852 fn callback_result<T: DeserializeOwned>(
853 &self,
854 handle: &CallbackHandle,
855 ) -> Result<T, DurableError> {
856 self.inner.callback_result(handle)
857 }
858
859 fn execution_mode(&self) -> ExecutionMode {
860 self.inner.execution_mode()
861 }
862
863 fn is_replaying(&self) -> bool {
864 self.inner.is_replaying()
865 }
866
867 fn arn(&self) -> &str {
868 self.inner.arn()
869 }
870
871 fn checkpoint_token(&self) -> &str {
872 self.inner.checkpoint_token()
873 }
874
875 fn log(&self, message: &str) {
876 self.inner.log(message);
877 }
878
879 fn log_with_data(&self, message: &str, data: &serde_json::Value) {
880 self.inner.log_with_data(message, data);
881 }
882
883 fn log_debug(&self, message: &str) {
884 self.inner.log_debug(message);
885 }
886
887 fn log_warn(&self, message: &str) {
888 self.inner.log_warn(message);
889 }
890
891 fn log_error(&self, message: &str) {
892 self.inner.log_error(message);
893 }
894
895 fn log_debug_with_data(&self, message: &str, data: &serde_json::Value) {
896 self.inner.log_debug_with_data(message, data);
897 }
898
899 fn log_warn_with_data(&self, message: &str, data: &serde_json::Value) {
900 self.inner.log_warn_with_data(message, data);
901 }
902
903 fn log_error_with_data(&self, message: &str, data: &serde_json::Value) {
904 self.inner.log_error_with_data(message, data);
905 }
906
907 fn enable_batch_mode(&mut self) {
908 self.inner.enable_batch_mode();
909 }
910
911 fn flush_batch(&mut self) -> impl Future<Output = Result<(), DurableError>> + Send {
912 self.inner.flush_batch()
913 }
914}
915
916#[cfg(test)]
917mod tests {
918 use super::*;
919 use aws_sdk_lambda::operation::checkpoint_durable_execution::CheckpointDurableExecutionOutput;
920 use aws_sdk_lambda::operation::get_durable_execution_state::GetDurableExecutionStateOutput;
921 use aws_sdk_lambda::types::{
922 Operation, OperationStatus, OperationType, OperationUpdate, StepDetails,
923 };
924 use std::collections::HashMap;
925 use std::sync::{Arc, Mutex};
926
927 use crate::handler::DurableHandler;
928 use durable_lambda_core::backend::DurableBackend;
929
    /// Mock backend for testing TraitContext delegation.
    ///
    /// Performs no I/O: its `DurableBackend` implementation ignores every
    /// argument and answers with canned values.
    struct MockBackend {
        // Operations registered through `with_operation`, keyed by the id given there.
        operations: HashMap<String, Operation>,
        // Token handed back in every `checkpoint` response (initialised to "mock-token").
        checkpoint_token: Mutex<String>,
    }
935
936 impl MockBackend {
937 fn new() -> Self {
938 Self {
939 operations: HashMap::new(),
940 checkpoint_token: Mutex::new("mock-token".to_string()),
941 }
942 }
943
944 fn with_operation(mut self, id: &str, operation: Operation) -> Self {
945 self.operations.insert(id.to_string(), operation);
946 self
947 }
948 }
949
950 #[async_trait::async_trait]
951 impl DurableBackend for MockBackend {
952 async fn checkpoint(
953 &self,
954 _arn: &str,
955 _checkpoint_token: &str,
956 _updates: Vec<OperationUpdate>,
957 _client_token: Option<&str>,
958 ) -> Result<CheckpointDurableExecutionOutput, DurableError> {
959 let token = self.checkpoint_token.lock().unwrap().clone();
960 Ok(CheckpointDurableExecutionOutput::builder()
961 .checkpoint_token(token)
962 .build())
963 }
964
965 async fn get_execution_state(
966 &self,
967 _arn: &str,
968 _checkpoint_token: &str,
969 _next_marker: &str,
970 _max_items: i32,
971 ) -> Result<GetDurableExecutionStateOutput, DurableError> {
972 Ok(GetDurableExecutionStateOutput::builder()
973 .build()
974 .expect("test: empty execution state"))
975 }
976 }
977
978 async fn make_trait_context(backend: MockBackend) -> TraitContext {
979 let ctx = DurableContext::new(
980 Arc::new(backend),
981 "arn:test".to_string(),
982 "tok".to_string(),
983 vec![],
984 None,
985 )
986 .await
987 .unwrap();
988 TraitContext::new(ctx)
989 }
990
991 fn make_succeeded_op(id: &str, result_json: &str) -> Operation {
992 Operation::builder()
993 .id(id)
994 .r#type(OperationType::Step)
995 .status(OperationStatus::Succeeded)
996 .start_timestamp(aws_smithy_types::DateTime::from_secs(0))
997 .step_details(StepDetails::builder().result(result_json).build())
998 .build()
999 .unwrap_or_else(|e| panic!("failed to build test Operation: {e}"))
1000 }
1001
1002 // --- Task 4.1: Test DurableHandler trait compilation and callability ---
1003
1004 struct TestHandler;
1005
1006 #[async_trait::async_trait]
1007 impl crate::handler::DurableHandler for TestHandler {
1008 async fn handle(
1009 &self,
1010 _event: serde_json::Value,
1011 mut ctx: TraitContext,
1012 ) -> Result<serde_json::Value, DurableError> {
1013 let result: Result<i32, String> = ctx.step("test_step", || async { Ok(42) }).await?;
1014 Ok(serde_json::json!({"value": result.unwrap()}))
1015 }
1016 }
1017
1018 #[tokio::test]
1019 async fn test_durable_handler_trait_compiles_and_is_callable() {
1020 let ctx = make_trait_context(MockBackend::new()).await;
1021 let handler = TestHandler;
1022 let result = handler.handle(serde_json::json!({}), ctx).await.unwrap();
1023 assert_eq!(result, serde_json::json!({"value": 42}));
1024 }
1025
1026 // --- Task 4.2: Test TraitContext delegation ---
1027
1028 #[tokio::test]
1029 async fn test_trait_context_step_delegates_to_core() {
1030 let mut ctx = make_trait_context(MockBackend::new()).await;
1031
1032 let result: Result<i32, String> = ctx.step("validate", || async { Ok(42) }).await.unwrap();
1033
1034 assert_eq!(result, Ok(42));
1035 }
1036
1037 #[tokio::test]
1038 async fn test_trait_context_step_with_options_delegates_to_core() {
1039 let mut ctx = make_trait_context(MockBackend::new()).await;
1040
1041 let result: Result<i32, String> = ctx
1042 .step_with_options("charge", StepOptions::new().retries(3), || async {
1043 Ok(100)
1044 })
1045 .await
1046 .unwrap();
1047
1048 assert_eq!(result, Ok(100));
1049 }
1050
1051 #[tokio::test]
1052 async fn test_trait_context_execution_mode_executing() {
1053 let ctx = make_trait_context(MockBackend::new()).await;
1054 assert_eq!(ctx.execution_mode(), ExecutionMode::Executing);
1055 assert!(!ctx.is_replaying());
1056 }
1057
1058 #[tokio::test]
1059 async fn test_trait_context_execution_mode_replaying() {
1060 let op = make_succeeded_op("op-1", "42");
1061 let backend = MockBackend::new().with_operation("op-1", op.clone());
1062
1063 let durable_ctx = DurableContext::new(
1064 Arc::new(backend),
1065 "arn:test".to_string(),
1066 "tok".to_string(),
1067 vec![op],
1068 None,
1069 )
1070 .await
1071 .unwrap();
1072
1073 let ctx = TraitContext::new(durable_ctx);
1074 assert_eq!(ctx.execution_mode(), ExecutionMode::Replaying);
1075 assert!(ctx.is_replaying());
1076 }
1077
1078 #[tokio::test]
1079 async fn test_trait_context_arn() {
1080 let ctx = make_trait_context(MockBackend::new()).await;
1081 assert_eq!(ctx.arn(), "arn:test");
1082 }
1083
1084 #[tokio::test]
1085 async fn test_trait_context_checkpoint_token() {
1086 let ctx = make_trait_context(MockBackend::new()).await;
1087 assert_eq!(ctx.checkpoint_token(), "tok");
1088 }
1089
1090 #[tokio::test]
1091 async fn test_trait_context_child_context_delegates_to_core() {
1092 let mut ctx = make_trait_context(MockBackend::new()).await;
1093
1094 let result: i32 = ctx
1095 .child_context("sub_workflow", |mut child_ctx: DurableContext| async move {
1096 let r: Result<i32, String> =
1097 child_ctx.step("inner_step", || async { Ok(42) }).await?;
1098 Ok(r.unwrap())
1099 })
1100 .await
1101 .unwrap();
1102
1103 assert_eq!(result, 42);
1104 }
1105
1106 #[tokio::test]
1107 async fn test_trait_context_log_delegates_to_core() {
1108 let ctx = make_trait_context(MockBackend::new()).await;
1109 // Verify log methods are callable and don't panic.
1110 ctx.log("test message");
1111 ctx.log_with_data("test data", &serde_json::json!({"key": "val"}));
1112 ctx.log_debug("test debug");
1113 ctx.log_warn("test warn");
1114 ctx.log_error("test error");
1115 ctx.log_debug_with_data("debug data", &serde_json::json!({"k": "v"}));
1116 ctx.log_warn_with_data("warn data", &serde_json::json!({"k": "v"}));
1117 ctx.log_error_with_data("error data", &serde_json::json!({"k": "v"}));
1118 }
1119
1120 // --- Task 4.3: Test run() function type signature ---
1121
1122 #[test]
1123 fn test_run_function_accepts_durable_handler_implementors() {
1124 // Verify the run() function signature compiles with a DurableHandler impl.
1125 // We cannot actually call run() since it starts the Lambda runtime,
1126 // but we can verify the type constraint is satisfied.
1127 fn assert_handler_accepted<H: crate::handler::DurableHandler>(_h: H) {}
1128 assert_handler_accepted(TestHandler);
1129 }
1130}