Skip to main content

durable_lambda_trait/
context.rs

//! Trait-specific context wrapper.
//!
//! Thin wrapper over [`DurableContext`](durable_lambda_core::DurableContext)
//! providing trait-approach ergonomics. All operations delegate directly
//! to the inner context with no additional logic.
6
7use std::future::Future;
8
9use durable_lambda_core::context::DurableContext;
10use durable_lambda_core::error::DurableError;
11use durable_lambda_core::ops_trait::DurableContextOps;
12use durable_lambda_core::types::{
13    BatchResult, CallbackHandle, CallbackOptions, CompensationResult, ExecutionMode, MapOptions,
14    ParallelOptions, StepOptions,
15};
16use serde::de::DeserializeOwned;
17use serde::Serialize;
18
19/// Trait-native context for durable Lambda operations.
20///
21/// Thin wrapper over [`DurableContext`] providing the trait-approach API.
22/// All operations delegate directly to the inner context — no replay logic,
23/// no checkpoint logic, just delegation.
24///
25/// Constructed internally by [`run`](crate::run) — users never create this
26/// directly.
27///
28/// # Examples
29///
30/// ```no_run
31/// use durable_lambda_trait::prelude::*;
32/// use async_trait::async_trait;
33///
34/// struct MyHandler;
35///
36/// #[async_trait]
37/// impl DurableHandler for MyHandler {
38///     async fn handle(
39///         &self,
40///         event: serde_json::Value,
41///         mut ctx: TraitContext,
42///     ) -> Result<serde_json::Value, DurableError> {
43///         let result: Result<i32, String> = ctx.step("validate", || async {
44///             Ok(42)
45///         }).await?;
46///         Ok(serde_json::json!({"validated": result.unwrap()}))
47///     }
48/// }
49/// ```
50pub struct TraitContext {
51    inner: DurableContext,
52}
53
54impl TraitContext {
55    /// Create from an existing `DurableContext`.
56    ///
57    /// This constructor is `pub(crate)` — only the [`run`](crate::run)
58    /// function creates `TraitContext` instances.
59    pub(crate) fn new(ctx: DurableContext) -> Self {
60        Self { inner: ctx }
61    }
62
63    /// Execute a named step with checkpointing.
64    ///
65    /// During execution mode, runs the closure and checkpoints the result to AWS.
66    /// During replay mode, returns the previously checkpointed result without
67    /// executing the closure.
68    ///
69    /// # Arguments
70    ///
71    /// * `name` — Human-readable step name, used as checkpoint metadata
72    /// * `f` — Closure to execute (skipped during replay)
73    ///
74    /// # Errors
75    ///
76    /// Returns [`DurableError::Serialization`] if the result cannot be serialized.
77    /// Returns [`DurableError::Deserialization`] if a cached result cannot be deserialized.
78    /// Returns [`DurableError::CheckpointFailed`] if the AWS checkpoint API call fails.
79    ///
80    /// # Examples
81    ///
82    /// ```no_run
83    /// # async fn example(ctx: &mut durable_lambda_trait::context::TraitContext) -> Result<(), durable_lambda_core::error::DurableError> {
84    /// let result: Result<i32, String> = ctx.step("validate_order", || async {
85    ///     Ok(42)
86    /// }).await?;
87    ///
88    /// match result {
89    ///     Ok(value) => println!("Step succeeded: {value}"),
90    ///     Err(e) => println!("Step failed: {e}"),
91    /// }
92    /// # Ok(())
93    /// # }
94    /// ```
95    pub async fn step<T, E, F, Fut>(
96        &mut self,
97        name: &str,
98        f: F,
99    ) -> Result<Result<T, E>, DurableError>
100    where
101        T: Serialize + DeserializeOwned + Send + 'static,
102        E: Serialize + DeserializeOwned + Send + 'static,
103        F: FnOnce() -> Fut + Send + 'static,
104        Fut: Future<Output = Result<T, E>> + Send + 'static,
105    {
106        self.inner.step(name, f).await
107    }
108
109    /// Execute a named step with checkpointing and retry configuration.
110    ///
111    /// If the closure fails and retries are configured, sends a RETRY checkpoint
112    /// and returns [`DurableError::StepRetryScheduled`] to signal the function
113    /// should exit.
114    ///
115    /// # Arguments
116    ///
117    /// * `name` — Human-readable step name, used as checkpoint metadata
118    /// * `options` — Retry configuration (see [`StepOptions`])
119    /// * `f` — Closure to execute (skipped during replay)
120    ///
121    /// # Errors
122    ///
123    /// Returns [`DurableError::StepRetryScheduled`] when a retry has been scheduled.
124    /// Returns [`DurableError::Serialization`] if the result cannot be serialized.
125    /// Returns [`DurableError::Deserialization`] if a cached result cannot be deserialized.
126    ///
127    /// # Examples
128    ///
129    /// ```no_run
130    /// # async fn example(ctx: &mut durable_lambda_trait::context::TraitContext) -> Result<(), durable_lambda_core::error::DurableError> {
131    /// use durable_lambda_trait::prelude::*;
132    ///
133    /// let result: Result<i32, String> = ctx.step_with_options(
134    ///     "charge_payment",
135    ///     StepOptions::new().retries(3).backoff_seconds(5),
136    ///     || async { Ok(100) },
137    /// ).await?;
138    /// # Ok(())
139    /// # }
140    /// ```
141    pub async fn step_with_options<T, E, F, Fut>(
142        &mut self,
143        name: &str,
144        options: StepOptions,
145        f: F,
146    ) -> Result<Result<T, E>, DurableError>
147    where
148        T: Serialize + DeserializeOwned + Send + 'static,
149        E: Serialize + DeserializeOwned + Send + 'static,
150        F: FnOnce() -> Fut + Send + 'static,
151        Fut: Future<Output = Result<T, E>> + Send + 'static,
152    {
153        self.inner.step_with_options(name, options, f).await
154    }
155
156    /// Suspend execution for the specified duration.
157    ///
158    /// During execution mode, sends a START checkpoint and returns
159    /// [`DurableError::WaitSuspended`] to signal the function should exit.
160    /// The server re-invokes after the duration.
161    ///
162    /// During replay mode, returns `Ok(())` immediately if the wait has
163    /// already completed.
164    ///
165    /// # Arguments
166    ///
167    /// * `name` — Human-readable name for the wait operation
168    /// * `duration_secs` — Duration to wait in seconds
169    ///
170    /// # Errors
171    ///
172    /// Returns [`DurableError::WaitSuspended`] when the wait has been checkpointed.
173    ///
174    /// # Examples
175    ///
176    /// ```no_run
177    /// # async fn example(ctx: &mut durable_lambda_trait::context::TraitContext) -> Result<(), durable_lambda_core::error::DurableError> {
178    /// ctx.wait("cooldown", 30).await?;
179    /// println!("Wait completed!");
180    /// # Ok(())
181    /// # }
182    /// ```
183    pub async fn wait(&mut self, name: &str, duration_secs: i32) -> Result<(), DurableError> {
184        self.inner.wait(name, duration_secs).await
185    }
186
187    /// Register a callback and return a handle with the server-generated callback ID.
188    ///
189    /// During execution mode, sends a START checkpoint and returns a
190    /// [`CallbackHandle`] containing the `callback_id` for external systems.
191    /// During replay mode, extracts the cached callback_id from history.
192    ///
193    /// This method NEVER suspends. Use [`callback_result`](Self::callback_result)
194    /// to check the callback outcome (which suspends if not yet signaled).
195    ///
196    /// # Arguments
197    ///
198    /// * `name` — Human-readable name for the callback operation
199    /// * `options` — Timeout configuration (see [`CallbackOptions`])
200    ///
201    /// # Errors
202    ///
203    /// Returns [`DurableError::CheckpointFailed`] if the AWS checkpoint API call fails.
204    ///
205    /// # Examples
206    ///
207    /// ```no_run
208    /// # async fn example(ctx: &mut durable_lambda_trait::context::TraitContext) -> Result<(), durable_lambda_core::error::DurableError> {
209    /// use durable_lambda_trait::prelude::*;
210    ///
211    /// let handle = ctx.create_callback("approval", CallbackOptions::new()).await?;
212    /// println!("Callback ID: {}", handle.callback_id);
213    /// # Ok(())
214    /// # }
215    /// ```
216    pub async fn create_callback(
217        &mut self,
218        name: &str,
219        options: CallbackOptions,
220    ) -> Result<CallbackHandle, DurableError> {
221        self.inner.create_callback(name, options).await
222    }
223
224    /// Check the result of a previously created callback.
225    ///
226    /// Return the deserialized success payload if the callback has been
227    /// signaled. Return an error if the callback failed, timed out, or
228    /// hasn't been signaled yet.
229    ///
230    /// # Arguments
231    ///
232    /// * `handle` — The [`CallbackHandle`] returned by [`create_callback`](Self::create_callback)
233    ///
234    /// # Errors
235    ///
236    /// Returns [`DurableError::CallbackSuspended`] if not yet signaled.
237    /// Returns [`DurableError::CallbackFailed`] if the callback failed or timed out.
238    ///
239    /// # Examples
240    ///
241    /// ```no_run
242    /// # async fn example(ctx: &mut durable_lambda_trait::context::TraitContext) -> Result<(), durable_lambda_core::error::DurableError> {
243    /// use durable_lambda_trait::prelude::*;
244    ///
245    /// let handle = ctx.create_callback("approval", CallbackOptions::new()).await?;
246    /// let result: String = ctx.callback_result(&handle)?;
247    /// # Ok(())
248    /// # }
249    /// ```
250    pub fn callback_result<T: DeserializeOwned>(
251        &self,
252        handle: &CallbackHandle,
253    ) -> Result<T, DurableError> {
254        self.inner.callback_result(handle)
255    }
256
257    /// Durably invoke another Lambda function and return its result.
258    ///
259    /// During execution mode, serializes the payload, sends a START checkpoint,
260    /// and returns [`DurableError::InvokeSuspended`] to signal exit. The server
261    /// invokes the target asynchronously and re-invokes this Lambda when done.
262    ///
263    /// During replay, returns the cached result without re-invoking.
264    ///
265    /// # Arguments
266    ///
267    /// * `name` — Human-readable name for the invoke operation
268    /// * `function_name` — Name or ARN of the target Lambda function
269    /// * `payload` — Input payload to send to the target function
270    ///
271    /// # Errors
272    ///
273    /// Returns [`DurableError::InvokeSuspended`] when the target is still executing.
274    /// Returns [`DurableError::InvokeFailed`] if the target failed or timed out.
275    ///
276    /// # Examples
277    ///
278    /// ```no_run
279    /// # async fn example(ctx: &mut durable_lambda_trait::context::TraitContext) -> Result<(), durable_lambda_core::error::DurableError> {
280    /// let result: String = ctx.invoke(
281    ///     "call_processor",
282    ///     "payment-processor-lambda",
283    ///     &serde_json::json!({"order_id": 123}),
284    /// ).await?;
285    /// # Ok(())
286    /// # }
287    /// ```
288    pub async fn invoke<T, P>(
289        &mut self,
290        name: &str,
291        function_name: &str,
292        payload: &P,
293    ) -> Result<T, DurableError>
294    where
295        T: DeserializeOwned,
296        P: Serialize,
297    {
298        self.inner.invoke(name, function_name, payload).await
299    }
300
301    /// Execute multiple branches concurrently and return their results.
302    ///
303    /// Each branch receives an owned child context with an isolated checkpoint
304    /// namespace. Branches satisfy `Send + 'static` via `tokio::spawn`.
305    ///
306    /// # Arguments
307    ///
308    /// * `name` — Human-readable name for the parallel operation
309    /// * `branches` — Collection of branch closures
310    /// * `options` — Parallel configuration
311    ///
312    /// # Errors
313    ///
314    /// Returns [`DurableError::ParallelFailed`] if the operation fails.
315    ///
316    /// # Examples
317    ///
318    /// ```no_run
319    /// # async fn example(ctx: &mut durable_lambda_trait::context::TraitContext) -> Result<(), durable_lambda_core::error::DurableError> {
320    /// use durable_lambda_trait::prelude::*;
321    /// use durable_lambda_core::context::DurableContext;
322    /// use std::pin::Pin;
323    /// use std::future::Future;
324    ///
325    /// type BranchFn = Box<dyn FnOnce(DurableContext) -> Pin<Box<dyn Future<Output = Result<i32, DurableError>> + Send>> + Send>;
326    ///
327    /// let branches: Vec<BranchFn> = vec![
328    ///     Box::new(|_ctx| Box::pin(async move { Ok(1) })),
329    ///     Box::new(|_ctx| Box::pin(async move { Ok(2) })),
330    /// ];
331    /// let result = ctx.parallel("fan_out", branches, ParallelOptions::new()).await?;
332    /// # Ok(())
333    /// # }
334    /// ```
335    pub async fn parallel<T, F, Fut>(
336        &mut self,
337        name: &str,
338        branches: Vec<F>,
339        options: ParallelOptions,
340    ) -> Result<BatchResult<T>, DurableError>
341    where
342        T: Serialize + DeserializeOwned + Send + 'static,
343        F: FnOnce(DurableContext) -> Fut + Send + 'static,
344        Fut: Future<Output = Result<T, DurableError>> + Send + 'static,
345    {
346        self.inner.parallel(name, branches, options).await
347    }
348
349    /// Execute an isolated subflow with its own checkpoint namespace.
350    ///
351    /// The closure receives an owned child [`DurableContext`] whose operations
352    /// are namespaced under this child context's operation ID, preventing
353    /// collisions with the parent or sibling child contexts.
354    ///
355    /// During replay mode, returns the cached result without re-executing
356    /// the closure.
357    ///
358    /// # Arguments
359    ///
360    /// * `name` — Human-readable name for the child context operation
361    /// * `f` — Closure receiving an owned `DurableContext` for the subflow
362    ///
363    /// # Errors
364    ///
365    /// Returns [`DurableError::ChildContextFailed`] if the child context
366    /// is found in a failed state during replay.
367    /// Returns [`DurableError::CheckpointFailed`] if checkpoint API calls fail.
368    ///
369    /// # Examples
370    ///
371    /// ```no_run
372    /// # async fn example(ctx: &mut durable_lambda_trait::context::TraitContext) -> Result<(), durable_lambda_core::error::DurableError> {
373    /// use durable_lambda_core::context::DurableContext;
374    ///
375    /// let result: i32 = ctx.child_context("sub_workflow", |mut child_ctx: DurableContext| async move {
376    ///     let r: Result<i32, String> = child_ctx.step("inner_step", || async { Ok(42) }).await?;
377    ///     Ok(r.unwrap())
378    /// }).await?;
379    /// assert_eq!(result, 42);
380    /// # Ok(())
381    /// # }
382    /// ```
383    pub async fn child_context<T, F, Fut>(&mut self, name: &str, f: F) -> Result<T, DurableError>
384    where
385        T: Serialize + DeserializeOwned + Send,
386        F: FnOnce(DurableContext) -> Fut + Send,
387        Fut: Future<Output = Result<T, DurableError>> + Send,
388    {
389        self.inner.child_context(name, f).await
390    }
391
392    /// Process a collection of items in parallel and return their results.
393    ///
394    /// Apply the closure `f` to each item concurrently. Each item receives an
395    /// owned child context with an isolated checkpoint namespace. Items satisfy
396    /// `Send + 'static` via `tokio::spawn`. The closure must be `Clone` since
397    /// it is applied to each item independently.
398    ///
399    /// When `batch_size` is configured, items process in sequential batches.
400    ///
401    /// # Arguments
402    ///
403    /// * `name` — Human-readable name for the map operation
404    /// * `items` — Collection of items to process
405    /// * `options` — Map configuration (batching)
406    /// * `f` — Closure applied to each item with an owned child context
407    ///
408    /// # Errors
409    ///
410    /// Returns [`DurableError::MapFailed`] if the operation fails.
411    ///
412    /// # Examples
413    ///
414    /// ```no_run
415    /// # async fn example(ctx: &mut durable_lambda_trait::context::TraitContext) -> Result<(), durable_lambda_core::error::DurableError> {
416    /// use durable_lambda_trait::prelude::*;
417    /// use durable_lambda_core::context::DurableContext;
418    ///
419    /// let items = vec![1, 2, 3];
420    /// let result = ctx.map(
421    ///     "process_items",
422    ///     items,
423    ///     MapOptions::new().batch_size(2),
424    ///     |item: i32, mut child_ctx: DurableContext| async move {
425    ///         let r: Result<i32, String> = child_ctx.step("double", move || async move { Ok(item * 2) }).await?;
426    ///         Ok(r.unwrap())
427    ///     },
428    /// ).await?;
429    /// # Ok(())
430    /// # }
431    /// ```
432    pub async fn map<T, I, F, Fut>(
433        &mut self,
434        name: &str,
435        items: Vec<I>,
436        options: MapOptions,
437        f: F,
438    ) -> Result<BatchResult<T>, DurableError>
439    where
440        T: Serialize + DeserializeOwned + Send + 'static,
441        I: Send + 'static,
442        F: FnOnce(I, DurableContext) -> Fut + Send + 'static + Clone,
443        Fut: Future<Output = Result<T, DurableError>> + Send + 'static,
444    {
445        self.inner.map(name, items, options, f).await
446    }
447
448    /// Register a compensatable step.
449    ///
450    /// Executes the forward step and, on success, registers the compensation
451    /// closure for later rollback via [`run_compensations`](Self::run_compensations).
452    ///
453    /// # Examples
454    ///
455    /// ```no_run
456    /// # async fn example(ctx: &mut durable_lambda_trait::context::TraitContext) -> Result<(), durable_lambda_core::error::DurableError> {
457    /// let result: Result<i32, String> = ctx.step_with_compensation(
458    ///     "charge",
459    ///     || async { Ok(100) },
460    ///     |amount| async move { println!("Refunding {amount}"); Ok(()) },
461    /// ).await?;
462    /// # Ok(())
463    /// # }
464    /// ```
465    pub async fn step_with_compensation<T, E, F, Fut, G, GFut>(
466        &mut self,
467        name: &str,
468        forward_fn: F,
469        compensate_fn: G,
470    ) -> Result<Result<T, E>, DurableError>
471    where
472        T: Serialize + DeserializeOwned + Send + 'static,
473        E: Serialize + DeserializeOwned + Send + 'static,
474        F: FnOnce() -> Fut + Send + 'static,
475        Fut: Future<Output = Result<T, E>> + Send + 'static,
476        G: FnOnce(T) -> GFut + Send + 'static,
477        GFut: Future<Output = Result<(), DurableError>> + Send + 'static,
478    {
479        self.inner
480            .step_with_compensation(name, forward_fn, compensate_fn)
481            .await
482    }
483
484    /// Register a compensatable step with options.
485    ///
486    /// Like [`step_with_compensation`](Self::step_with_compensation) but accepts
487    /// [`StepOptions`] for configuring retries, backoff, and timeouts on the forward step.
488    ///
489    /// # Examples
490    ///
491    /// ```no_run
492    /// # async fn example(ctx: &mut durable_lambda_trait::context::TraitContext) -> Result<(), durable_lambda_core::error::DurableError> {
493    /// use durable_lambda_trait::prelude::*;
494    ///
495    /// let result: Result<String, String> = ctx.step_with_compensation_opts(
496    ///     "book_hotel",
497    ///     StepOptions::new().retries(3),
498    ///     || async { Ok("BOOKING-123".to_string()) },
499    ///     |booking_id| async move { println!("Cancelling: {booking_id}"); Ok(()) },
500    /// ).await?;
501    /// # Ok(())
502    /// # }
503    /// ```
504    pub async fn step_with_compensation_opts<T, E, F, Fut, G, GFut>(
505        &mut self,
506        name: &str,
507        options: StepOptions,
508        forward_fn: F,
509        compensate_fn: G,
510    ) -> Result<Result<T, E>, DurableError>
511    where
512        T: Serialize + DeserializeOwned + Send + 'static,
513        E: Serialize + DeserializeOwned + Send + 'static,
514        F: FnOnce() -> Fut + Send + 'static,
515        Fut: Future<Output = Result<T, E>> + Send + 'static,
516        G: FnOnce(T) -> GFut + Send + 'static,
517        GFut: Future<Output = Result<(), DurableError>> + Send + 'static,
518    {
519        self.inner
520            .step_with_compensation_opts(name, options, forward_fn, compensate_fn)
521            .await
522    }
523
524    /// Execute all registered compensations in reverse registration order.
525    ///
526    /// # Examples
527    ///
528    /// ```no_run
529    /// # async fn example(ctx: &mut durable_lambda_trait::context::TraitContext) -> Result<(), durable_lambda_core::error::DurableError> {
530    /// let result = ctx.run_compensations().await?;
531    /// if !result.all_succeeded {
532    ///     eprintln!("Some compensations failed");
533    /// }
534    /// # Ok(())
535    /// # }
536    /// ```
537    pub async fn run_compensations(&mut self) -> Result<CompensationResult, DurableError> {
538        self.inner.run_compensations().await
539    }
540
541    /// Return the current execution mode (Replaying or Executing).
542    ///
543    /// # Examples
544    ///
545    /// ```no_run
546    /// # async fn example(ctx: &durable_lambda_trait::context::TraitContext) {
547    /// use durable_lambda_core::types::ExecutionMode;
548    /// match ctx.execution_mode() {
549    ///     ExecutionMode::Replaying => { /* returning cached results */ }
550    ///     ExecutionMode::Executing => { /* running new operations */ }
551    /// }
552    /// # }
553    /// ```
554    pub fn execution_mode(&self) -> ExecutionMode {
555        self.inner.execution_mode()
556    }
557
558    /// Return whether the context is currently replaying from history.
559    ///
560    /// # Examples
561    ///
562    /// ```no_run
563    /// # async fn example(ctx: &durable_lambda_trait::context::TraitContext) {
564    /// if ctx.is_replaying() {
565    ///     println!("Replaying cached operations");
566    /// }
567    /// # }
568    /// ```
569    pub fn is_replaying(&self) -> bool {
570        self.inner.is_replaying()
571    }
572
573    /// Return a reference to the durable execution ARN.
574    ///
575    /// # Examples
576    ///
577    /// ```no_run
578    /// # async fn example(ctx: &durable_lambda_trait::context::TraitContext) {
579    /// println!("Execution ARN: {}", ctx.arn());
580    /// # }
581    /// ```
582    pub fn arn(&self) -> &str {
583        self.inner.arn()
584    }
585
586    /// Return the current checkpoint token.
587    ///
588    /// # Examples
589    ///
590    /// ```no_run
591    /// # async fn example(ctx: &durable_lambda_trait::context::TraitContext) {
592    /// let token = ctx.checkpoint_token();
593    /// # }
594    /// ```
595    pub fn checkpoint_token(&self) -> &str {
596        self.inner.checkpoint_token()
597    }
598
599    /// Emit a replay-safe info-level log message.
600    ///
601    /// During execution mode, emits via `tracing::info!` with execution
602    /// context enrichment. During replay mode, the call is a no-op.
603    ///
604    /// # Examples
605    ///
606    /// ```no_run
607    /// # async fn example(ctx: &durable_lambda_trait::context::TraitContext) {
608    /// ctx.log("Order processing started");
609    /// # }
610    /// ```
611    pub fn log(&self, message: &str) {
612        self.inner.log(message);
613    }
614
615    /// Emit a replay-safe info-level log message with structured data.
616    ///
617    /// # Examples
618    ///
619    /// ```no_run
620    /// # async fn example(ctx: &durable_lambda_trait::context::TraitContext) {
621    /// ctx.log_with_data("Order processed", &serde_json::json!({"order_id": 42}));
622    /// # }
623    /// ```
624    pub fn log_with_data(&self, message: &str, data: &serde_json::Value) {
625        self.inner.log_with_data(message, data);
626    }
627
628    /// Emit a replay-safe debug-level log message.
629    ///
630    /// # Examples
631    ///
632    /// ```no_run
633    /// # async fn example(ctx: &durable_lambda_trait::context::TraitContext) {
634    /// ctx.log_debug("Validating order fields");
635    /// # }
636    /// ```
637    pub fn log_debug(&self, message: &str) {
638        self.inner.log_debug(message);
639    }
640
641    /// Emit a replay-safe warn-level log message.
642    ///
643    /// # Examples
644    ///
645    /// ```no_run
646    /// # async fn example(ctx: &durable_lambda_trait::context::TraitContext) {
647    /// ctx.log_warn("Inventory below threshold");
648    /// # }
649    /// ```
650    pub fn log_warn(&self, message: &str) {
651        self.inner.log_warn(message);
652    }
653
654    /// Emit a replay-safe error-level log message.
655    ///
656    /// # Examples
657    ///
658    /// ```no_run
659    /// # async fn example(ctx: &durable_lambda_trait::context::TraitContext) {
660    /// ctx.log_error("Payment gateway timeout");
661    /// # }
662    /// ```
663    pub fn log_error(&self, message: &str) {
664        self.inner.log_error(message);
665    }
666
667    /// Emit a replay-safe debug-level log message with structured data.
668    ///
669    /// # Examples
670    ///
671    /// ```no_run
672    /// # async fn example(ctx: &durable_lambda_trait::context::TraitContext) {
673    /// ctx.log_debug_with_data("Request details", &serde_json::json!({"method": "POST"}));
674    /// # }
675    /// ```
676    pub fn log_debug_with_data(&self, message: &str, data: &serde_json::Value) {
677        self.inner.log_debug_with_data(message, data);
678    }
679
680    /// Emit a replay-safe warn-level log message with structured data.
681    ///
682    /// # Examples
683    ///
684    /// ```no_run
685    /// # async fn example(ctx: &durable_lambda_trait::context::TraitContext) {
686    /// ctx.log_warn_with_data("Retry attempt", &serde_json::json!({"attempt": 3}));
687    /// # }
688    /// ```
689    pub fn log_warn_with_data(&self, message: &str, data: &serde_json::Value) {
690        self.inner.log_warn_with_data(message, data);
691    }
692
693    /// Emit a replay-safe error-level log message with structured data.
694    ///
695    /// # Examples
696    ///
697    /// ```no_run
698    /// # async fn example(ctx: &durable_lambda_trait::context::TraitContext) {
699    /// ctx.log_error_with_data("Payment failed", &serde_json::json!({"error": "timeout"}));
700    /// # }
701    /// ```
702    pub fn log_error_with_data(&self, message: &str, data: &serde_json::Value) {
703        self.inner.log_error_with_data(message, data);
704    }
705}
706
707impl DurableContextOps for TraitContext {
708    fn step<T, E, F, Fut>(
709        &mut self,
710        name: &str,
711        f: F,
712    ) -> impl Future<Output = Result<Result<T, E>, DurableError>> + Send
713    where
714        T: Serialize + DeserializeOwned + Send + 'static,
715        E: Serialize + DeserializeOwned + Send + 'static,
716        F: FnOnce() -> Fut + Send + 'static,
717        Fut: Future<Output = Result<T, E>> + Send + 'static,
718    {
719        self.inner.step(name, f)
720    }
721
722    fn step_with_options<T, E, F, Fut>(
723        &mut self,
724        name: &str,
725        options: StepOptions,
726        f: F,
727    ) -> impl Future<Output = Result<Result<T, E>, DurableError>> + Send
728    where
729        T: Serialize + DeserializeOwned + Send + 'static,
730        E: Serialize + DeserializeOwned + Send + 'static,
731        F: FnOnce() -> Fut + Send + 'static,
732        Fut: Future<Output = Result<T, E>> + Send + 'static,
733    {
734        self.inner.step_with_options(name, options, f)
735    }
736
737    fn wait(
738        &mut self,
739        name: &str,
740        duration_secs: i32,
741    ) -> impl Future<Output = Result<(), DurableError>> + Send {
742        self.inner.wait(name, duration_secs)
743    }
744
745    fn create_callback(
746        &mut self,
747        name: &str,
748        options: CallbackOptions,
749    ) -> impl Future<Output = Result<CallbackHandle, DurableError>> + Send {
750        self.inner.create_callback(name, options)
751    }
752
753    fn invoke<T, P>(
754        &mut self,
755        name: &str,
756        function_name: &str,
757        payload: &P,
758    ) -> impl Future<Output = Result<T, DurableError>> + Send
759    where
760        T: DeserializeOwned + Send,
761        P: Serialize + Sync,
762    {
763        self.inner.invoke(name, function_name, payload)
764    }
765
766    fn parallel<T, F, Fut>(
767        &mut self,
768        name: &str,
769        branches: Vec<F>,
770        options: ParallelOptions,
771    ) -> impl Future<Output = Result<BatchResult<T>, DurableError>> + Send
772    where
773        T: Serialize + DeserializeOwned + Send + 'static,
774        F: FnOnce(DurableContext) -> Fut + Send + 'static,
775        Fut: Future<Output = Result<T, DurableError>> + Send + 'static,
776    {
777        self.inner.parallel(name, branches, options)
778    }
779
780    fn child_context<T, F, Fut>(
781        &mut self,
782        name: &str,
783        f: F,
784    ) -> impl Future<Output = Result<T, DurableError>> + Send
785    where
786        T: Serialize + DeserializeOwned + Send,
787        F: FnOnce(DurableContext) -> Fut + Send,
788        Fut: Future<Output = Result<T, DurableError>> + Send,
789    {
790        self.inner.child_context(name, f)
791    }
792
793    fn map<T, I, F, Fut>(
794        &mut self,
795        name: &str,
796        items: Vec<I>,
797        options: MapOptions,
798        f: F,
799    ) -> impl Future<Output = Result<BatchResult<T>, DurableError>> + Send
800    where
801        T: Serialize + DeserializeOwned + Send + 'static,
802        I: Send + 'static,
803        F: FnOnce(I, DurableContext) -> Fut + Send + 'static + Clone,
804        Fut: Future<Output = Result<T, DurableError>> + Send + 'static,
805    {
806        self.inner.map(name, items, options, f)
807    }
808
809    fn step_with_compensation<T, E, F, Fut, G, GFut>(
810        &mut self,
811        name: &str,
812        forward_fn: F,
813        compensate_fn: G,
814    ) -> impl Future<Output = Result<Result<T, E>, DurableError>> + Send
815    where
816        T: Serialize + DeserializeOwned + Send + 'static,
817        E: Serialize + DeserializeOwned + Send + 'static,
818        F: FnOnce() -> Fut + Send + 'static,
819        Fut: Future<Output = Result<T, E>> + Send + 'static,
820        G: FnOnce(T) -> GFut + Send + 'static,
821        GFut: Future<Output = Result<(), DurableError>> + Send + 'static,
822    {
823        self.inner
824            .step_with_compensation(name, forward_fn, compensate_fn)
825    }
826
827    fn step_with_compensation_opts<T, E, F, Fut, G, GFut>(
828        &mut self,
829        name: &str,
830        options: StepOptions,
831        forward_fn: F,
832        compensate_fn: G,
833    ) -> impl Future<Output = Result<Result<T, E>, DurableError>> + Send
834    where
835        T: Serialize + DeserializeOwned + Send + 'static,
836        E: Serialize + DeserializeOwned + Send + 'static,
837        F: FnOnce() -> Fut + Send + 'static,
838        Fut: Future<Output = Result<T, E>> + Send + 'static,
839        G: FnOnce(T) -> GFut + Send + 'static,
840        GFut: Future<Output = Result<(), DurableError>> + Send + 'static,
841    {
842        self.inner
843            .step_with_compensation_opts(name, options, forward_fn, compensate_fn)
844    }
845
846    fn run_compensations(
847        &mut self,
848    ) -> impl Future<Output = Result<CompensationResult, DurableError>> + Send {
849        self.inner.run_compensations()
850    }
851
852    fn callback_result<T: DeserializeOwned>(
853        &self,
854        handle: &CallbackHandle,
855    ) -> Result<T, DurableError> {
856        self.inner.callback_result(handle)
857    }
858
859    fn execution_mode(&self) -> ExecutionMode {
860        self.inner.execution_mode()
861    }
862
863    fn is_replaying(&self) -> bool {
864        self.inner.is_replaying()
865    }
866
867    fn arn(&self) -> &str {
868        self.inner.arn()
869    }
870
871    fn checkpoint_token(&self) -> &str {
872        self.inner.checkpoint_token()
873    }
874
875    fn log(&self, message: &str) {
876        self.inner.log(message);
877    }
878
879    fn log_with_data(&self, message: &str, data: &serde_json::Value) {
880        self.inner.log_with_data(message, data);
881    }
882
883    fn log_debug(&self, message: &str) {
884        self.inner.log_debug(message);
885    }
886
887    fn log_warn(&self, message: &str) {
888        self.inner.log_warn(message);
889    }
890
891    fn log_error(&self, message: &str) {
892        self.inner.log_error(message);
893    }
894
895    fn log_debug_with_data(&self, message: &str, data: &serde_json::Value) {
896        self.inner.log_debug_with_data(message, data);
897    }
898
899    fn log_warn_with_data(&self, message: &str, data: &serde_json::Value) {
900        self.inner.log_warn_with_data(message, data);
901    }
902
903    fn log_error_with_data(&self, message: &str, data: &serde_json::Value) {
904        self.inner.log_error_with_data(message, data);
905    }
906
907    fn enable_batch_mode(&mut self) {
908        self.inner.enable_batch_mode();
909    }
910
911    fn flush_batch(&mut self) -> impl Future<Output = Result<(), DurableError>> + Send {
912        self.inner.flush_batch()
913    }
914}
915
#[cfg(test)]
mod tests {
    use super::*;
    use aws_sdk_lambda::operation::checkpoint_durable_execution::CheckpointDurableExecutionOutput;
    use aws_sdk_lambda::operation::get_durable_execution_state::GetDurableExecutionStateOutput;
    use aws_sdk_lambda::types::{
        Operation, OperationStatus, OperationType, OperationUpdate, StepDetails,
    };
    use std::collections::HashMap;
    use std::sync::{Arc, Mutex};

    use crate::handler::DurableHandler;
    use durable_lambda_core::backend::DurableBackend;

    /// Stand-in backend used to exercise `TraitContext` delegation.
    struct MockBackend {
        /// Operations registered through `with_operation`, keyed by id.
        operations: HashMap<String, Operation>,
        /// Token echoed back in every checkpoint response.
        checkpoint_token: Mutex<String>,
    }

    impl MockBackend {
        /// Builds a backend with no registered operations and the fixed
        /// checkpoint token `"mock-token"`.
        fn new() -> Self {
            let checkpoint_token = Mutex::new(String::from("mock-token"));
            let operations = HashMap::new();
            Self {
                operations,
                checkpoint_token,
            }
        }

        /// Builder-style helper: stores `operation` under `id` and returns
        /// the backend.
        fn with_operation(mut self, id: &str, operation: Operation) -> Self {
            self.operations.insert(id.to_owned(), operation);
            self
        }
    }

    #[async_trait::async_trait]
    impl DurableBackend for MockBackend {
        /// Ignores every argument and answers with the stored token.
        async fn checkpoint(
            &self,
            _arn: &str,
            _checkpoint_token: &str,
            _updates: Vec<OperationUpdate>,
            _client_token: Option<&str>,
        ) -> Result<CheckpointDurableExecutionOutput, DurableError> {
            let stored_token = self.checkpoint_token.lock().unwrap().clone();
            let output = CheckpointDurableExecutionOutput::builder()
                .checkpoint_token(stored_token)
                .build();
            Ok(output)
        }

        /// Ignores every argument and answers with a default-built state.
        async fn get_execution_state(
            &self,
            _arn: &str,
            _checkpoint_token: &str,
            _next_marker: &str,
            _max_items: i32,
        ) -> Result<GetDurableExecutionStateOutput, DurableError> {
            let empty_state = GetDurableExecutionStateOutput::builder()
                .build()
                .expect("test: empty execution state");
            Ok(empty_state)
        }
    }

    /// Builds a `DurableContext` over `backend` seeded with `history`,
    /// using the fixed ARN `"arn:test"` and token `"tok"`.
    async fn context_with_history(backend: MockBackend, history: Vec<Operation>) -> DurableContext {
        DurableContext::new(
            Arc::new(backend),
            "arn:test".to_string(),
            "tok".to_string(),
            history,
            None,
        )
        .await
        .unwrap()
    }

    /// Wraps a history-free `DurableContext` over `backend` in a `TraitContext`.
    async fn make_trait_context(backend: MockBackend) -> TraitContext {
        let durable_ctx = context_with_history(backend, vec![]).await;
        TraitContext::new(durable_ctx)
    }

    /// Builds a `Step` operation in `Succeeded` status whose step details
    /// carry `result_json` as the result.
    fn make_succeeded_op(id: &str, result_json: &str) -> Operation {
        let details = StepDetails::builder().result(result_json).build();
        let built = Operation::builder()
            .id(id)
            .r#type(OperationType::Step)
            .status(OperationStatus::Succeeded)
            .start_timestamp(aws_smithy_types::DateTime::from_secs(0))
            .step_details(details)
            .build();
        match built {
            Ok(op) => op,
            Err(e) => panic!("failed to build test Operation: {e}"),
        }
    }

    // --- Task 4.1: Test DurableHandler trait compilation and callability ---

    /// Minimal handler that runs one step and reports its value.
    struct TestHandler;

    #[async_trait::async_trait]
    impl DurableHandler for TestHandler {
        async fn handle(
            &self,
            _event: serde_json::Value,
            mut ctx: TraitContext,
        ) -> Result<serde_json::Value, DurableError> {
            let step_outcome: Result<i32, String> =
                ctx.step("test_step", || async { Ok(42) }).await?;
            let value = step_outcome.unwrap();
            Ok(serde_json::json!({"value": value}))
        }
    }

    #[tokio::test]
    async fn test_durable_handler_trait_compiles_and_is_callable() {
        let ctx = make_trait_context(MockBackend::new()).await;
        let event = serde_json::json!({});

        let response = TestHandler.handle(event, ctx).await.unwrap();

        assert_eq!(response, serde_json::json!({"value": 42}));
    }

    // --- Task 4.2: Test TraitContext delegation ---

    #[tokio::test]
    async fn test_trait_context_step_delegates_to_core() {
        let mut ctx = make_trait_context(MockBackend::new()).await;

        let outcome: Result<i32, String> =
            ctx.step("validate", || async { Ok(42) }).await.unwrap();

        assert_eq!(outcome, Ok(42));
    }

    #[tokio::test]
    async fn test_trait_context_step_with_options_delegates_to_core() {
        let mut ctx = make_trait_context(MockBackend::new()).await;

        let outcome: Result<i32, String> = ctx
            .step_with_options("charge", StepOptions::new().retries(3), || async {
                Ok(100)
            })
            .await
            .unwrap();

        assert_eq!(outcome, Ok(100));
    }

    #[tokio::test]
    async fn test_trait_context_execution_mode_executing() {
        let ctx = make_trait_context(MockBackend::new()).await;

        assert!(!ctx.is_replaying());
        assert_eq!(ctx.execution_mode(), ExecutionMode::Executing);
    }

    #[tokio::test]
    async fn test_trait_context_execution_mode_replaying() {
        // A context seeded with one completed operation starts in replay mode.
        let replayed = make_succeeded_op("op-1", "42");
        let backend = MockBackend::new().with_operation("op-1", replayed.clone());

        let durable_ctx = context_with_history(backend, vec![replayed]).await;
        let ctx = TraitContext::new(durable_ctx);

        assert!(ctx.is_replaying());
        assert_eq!(ctx.execution_mode(), ExecutionMode::Replaying);
    }

    #[tokio::test]
    async fn test_trait_context_arn() {
        let ctx = make_trait_context(MockBackend::new()).await;
        let arn = ctx.arn();
        assert_eq!(arn, "arn:test");
    }

    #[tokio::test]
    async fn test_trait_context_checkpoint_token() {
        let ctx = make_trait_context(MockBackend::new()).await;
        let token = ctx.checkpoint_token();
        assert_eq!(token, "tok");
    }

    #[tokio::test]
    async fn test_trait_context_child_context_delegates_to_core() {
        let mut ctx = make_trait_context(MockBackend::new()).await;

        let value: i32 = ctx
            .child_context("sub_workflow", |mut child: DurableContext| async move {
                let inner: Result<i32, String> =
                    child.step("inner_step", || async { Ok(42) }).await?;
                Ok(inner.unwrap())
            })
            .await
            .unwrap();

        assert_eq!(value, 42);
    }

    #[tokio::test]
    async fn test_trait_context_log_delegates_to_core() {
        let ctx = make_trait_context(MockBackend::new()).await;
        let key_val = serde_json::json!({"key": "val"});
        let k_v = serde_json::json!({"k": "v"});

        // Smoke test: every log method must be callable without panicking.
        ctx.log("test message");
        ctx.log_with_data("test data", &key_val);
        ctx.log_debug("test debug");
        ctx.log_warn("test warn");
        ctx.log_error("test error");
        ctx.log_debug_with_data("debug data", &k_v);
        ctx.log_warn_with_data("warn data", &k_v);
        ctx.log_error_with_data("error data", &k_v);
    }

    // --- Task 4.3: Test run() function type signature ---

    #[test]
    fn test_run_function_accepts_durable_handler_implementors() {
        // run() starts the Lambda runtime and so cannot be invoked here;
        // instead, check at compile time that TestHandler satisfies the
        // DurableHandler bound.
        fn assert_handler_accepted<H>(_h: H)
        where
            H: DurableHandler,
        {
        }

        assert_handler_accepted(TestHandler);
    }
}