Skip to main content

durable_execution_sdk/
context.rs

1//! DurableContext and operation identifier types for the AWS Durable Execution SDK.
2//!
3//! This module provides the main `DurableContext` interface that user code interacts with,
4//! as well as the `OperationIdentifier` type for deterministic operation ID generation.
5
6use std::sync::atomic::{AtomicU64, Ordering};
7use std::sync::{Arc, RwLock};
8
9use blake2::{Blake2b512, Digest};
10
11use crate::error::DurableResult;
12use crate::sealed::Sealed;
13use crate::state::ExecutionState;
14use crate::traits::{DurableValue, StepFn};
15use crate::types::OperationId;
16
17/// Identifies an operation within a durable execution.
18///
19/// Each operation has a unique ID that is deterministically generated
20/// based on the parent context and step counter. This ensures that
21/// the same operation gets the same ID across replays.
22#[derive(Debug, Clone, PartialEq, Eq, Hash)]
23pub struct OperationIdentifier {
24    /// The unique operation ID (deterministically generated)
25    pub operation_id: String,
26    /// The parent operation ID (None for root operations)
27    pub parent_id: Option<String>,
28    /// Optional human-readable name for the operation
29    pub name: Option<String>,
30}
31
32impl OperationIdentifier {
33    /// Creates a new OperationIdentifier with the given values.
34    pub fn new(
35        operation_id: impl Into<String>,
36        parent_id: Option<String>,
37        name: Option<String>,
38    ) -> Self {
39        Self {
40            operation_id: operation_id.into(),
41            parent_id,
42            name,
43        }
44    }
45
46    /// Creates a root operation identifier (no parent).
47    pub fn root(operation_id: impl Into<String>) -> Self {
48        Self {
49            operation_id: operation_id.into(),
50            parent_id: None,
51            name: None,
52        }
53    }
54
55    /// Creates an operation identifier with a parent.
56    pub fn with_parent(operation_id: impl Into<String>, parent_id: impl Into<String>) -> Self {
57        Self {
58            operation_id: operation_id.into(),
59            parent_id: Some(parent_id.into()),
60            name: None,
61        }
62    }
63
64    /// Sets the name for this operation identifier.
65    pub fn with_name(mut self, name: impl Into<String>) -> Self {
66        self.name = Some(name.into());
67        self
68    }
69
70    /// Returns the operation ID as an `OperationId` newtype.
71    #[inline]
72    pub fn operation_id_typed(&self) -> OperationId {
73        OperationId::from(self.operation_id.clone())
74    }
75
76    /// Returns the parent ID as an `OperationId` newtype, if present.
77    #[inline]
78    pub fn parent_id_typed(&self) -> Option<OperationId> {
79        self.parent_id
80            .as_ref()
81            .map(|id| OperationId::from(id.clone()))
82    }
83}
84
85impl std::fmt::Display for OperationIdentifier {
86    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
87        if let Some(ref name) = self.name {
88            write!(f, "{}({})", name, self.operation_id)
89        } else {
90            write!(f, "{}", self.operation_id)
91        }
92    }
93}
94
95/// Generates deterministic operation IDs using blake2b hashing.
96///
97/// The ID is generated by hashing the parent ID (or execution ARN for root)
98/// combined with the step counter value. This ensures:
99/// - Same inputs always produce the same ID
100/// - Different step counts produce different IDs
101/// - IDs are unique within an execution
102#[derive(Debug)]
103pub struct OperationIdGenerator {
104    /// The base identifier (execution ARN or parent operation ID)
105    base_id: String,
106    /// Thread-safe step counter
107    step_counter: AtomicU64,
108}
109
110impl OperationIdGenerator {
111    /// Creates a new OperationIdGenerator with the given base ID.
112    ///
113    /// # Arguments
114    ///
115    /// * `base_id` - The base identifier to use for hashing (typically execution ARN or parent ID)
116    pub fn new(base_id: impl Into<String>) -> Self {
117        Self {
118            base_id: base_id.into(),
119            step_counter: AtomicU64::new(0),
120        }
121    }
122
123    /// Creates a new OperationIdGenerator with a specific starting counter value.
124    ///
125    /// # Arguments
126    ///
127    /// * `base_id` - The base identifier to use for hashing
128    /// * `initial_counter` - The initial value for the step counter
129    pub fn with_counter(base_id: impl Into<String>, initial_counter: u64) -> Self {
130        Self {
131            base_id: base_id.into(),
132            step_counter: AtomicU64::new(initial_counter),
133        }
134    }
135
136    /// Generates the next operation ID.
137    ///
138    /// This method atomically increments the step counter and generates
139    /// a deterministic ID based on the base ID and counter value.
140    ///
141    /// # Returns
142    ///
143    /// A unique, deterministic operation ID string.
144    ///
145    /// # Memory Ordering
146    ///
147    /// Uses `Ordering::Relaxed` because the step counter only needs to provide
148    /// unique values - there's no synchronization requirement with other data.
149    /// Each call gets a unique counter value, and the hash function ensures
150    /// deterministic ID generation regardless of ordering between threads.
151    ///
152    /// Requirements: 4.1, 4.6
153    pub fn next_id(&self) -> String {
154        // Relaxed ordering is sufficient: we only need uniqueness, not synchronization.
155        // The counter value is combined with base_id in a hash, so the only requirement
156        // is that each call gets a different counter value, which fetch_add guarantees.
157        let counter = self.step_counter.fetch_add(1, Ordering::Relaxed);
158        generate_operation_id(&self.base_id, counter)
159    }
160
161    /// Generates an operation ID for a specific counter value without incrementing.
162    ///
163    /// This is useful for testing or when you need to generate an ID
164    /// for a known counter value.
165    ///
166    /// # Arguments
167    ///
168    /// * `counter` - The counter value to use for ID generation
169    ///
170    /// # Returns
171    ///
172    /// A deterministic operation ID string.
173    pub fn id_for_counter(&self, counter: u64) -> String {
174        generate_operation_id(&self.base_id, counter)
175    }
176
177    /// Returns the current counter value without incrementing.
178    ///
179    /// # Memory Ordering
180    ///
181    /// Uses `Ordering::Relaxed` because this is an informational read that doesn't
182    /// need to synchronize with other operations. The value may be slightly stale
183    /// in concurrent scenarios, but this is acceptable for debugging/monitoring use.
184    ///
185    /// Requirements: 4.1, 4.6
186    pub fn current_counter(&self) -> u64 {
187        // Relaxed ordering is sufficient: this is an informational read with no
188        // synchronization requirements. Callers should not rely on this value
189        // for correctness in concurrent scenarios.
190        self.step_counter.load(Ordering::Relaxed)
191    }
192
193    /// Returns the base ID used for generation.
194    pub fn base_id(&self) -> &str {
195        &self.base_id
196    }
197
198    /// Creates a child generator with a new base ID.
199    ///
200    /// The child generator starts with counter 0 and uses the provided
201    /// parent operation ID as its base.
202    ///
203    /// # Arguments
204    ///
205    /// * `parent_operation_id` - The operation ID to use as the base for the child
206    pub fn create_child(&self, parent_operation_id: impl Into<String>) -> Self {
207        Self::new(parent_operation_id)
208    }
209}
210
211impl Clone for OperationIdGenerator {
212    fn clone(&self) -> Self {
213        Self {
214            base_id: self.base_id.clone(),
215            // Relaxed ordering is sufficient: we're creating a snapshot of the counter
216            // for a new generator. The clone doesn't need to synchronize with the
217            // original - it just needs a reasonable starting point.
218            step_counter: AtomicU64::new(self.step_counter.load(Ordering::Relaxed)),
219        }
220    }
221}
222
223/// Generates a deterministic operation ID using blake2b hashing.
224///
225/// # Arguments
226///
227/// * `base_id` - The base identifier (execution ARN or parent operation ID)
228/// * `counter` - The step counter value
229///
230/// # Returns
231///
232/// A hex-encoded operation ID string (first 32 characters of the hash).
233pub fn generate_operation_id(base_id: &str, counter: u64) -> String {
234    let mut hasher = Blake2b512::new();
235    hasher.update(base_id.as_bytes());
236    hasher.update(counter.to_le_bytes());
237    let result = hasher.finalize();
238
239    // Take first 16 bytes (32 hex chars) for a reasonably short but unique ID
240    hex::encode(&result[..16])
241}
242
243/// Logger trait for structured logging in durable executions.
244///
245/// This trait is compatible with the `tracing` crate and allows
246/// custom logging implementations.
247///
248/// # Sealed Trait
249///
250/// This trait is sealed and cannot be implemented outside of this crate.
251/// This allows the SDK maintainers to evolve the logging interface without
252/// breaking external code. If you need custom logging behavior, use the
253/// provided factory functions or wrap one of the existing implementations.
254///
255/// # Requirements
256///
257/// - 3.1: THE SDK SHALL implement the sealed trait pattern for the `Logger` trait
258/// - 3.5: THE SDK SHALL document that these traits are sealed and cannot be implemented externally
259#[allow(private_bounds)]
260pub trait Logger: Sealed + Send + Sync {
261    /// Logs a debug message.
262    fn debug(&self, message: &str, info: &LogInfo);
263    /// Logs an info message.
264    fn info(&self, message: &str, info: &LogInfo);
265    /// Logs a warning message.
266    fn warn(&self, message: &str, info: &LogInfo);
267    /// Logs an error message.
268    fn error(&self, message: &str, info: &LogInfo);
269}
270
271/// Context information for log messages.
272///
273/// This struct provides context for log messages including execution ARN,
274/// operation IDs, and replay status. The `is_replay` flag indicates whether
275/// the current operation is being replayed from a checkpoint.
276///
277/// # Requirements
278///
279/// - 16.6: THE Logging_System SHALL support replay-aware logging that can suppress logs during replay
280#[derive(Debug, Clone, Default)]
281pub struct LogInfo {
282    /// The durable execution ARN
283    pub durable_execution_arn: Option<String>,
284    /// The current operation ID
285    pub operation_id: Option<String>,
286    /// The parent operation ID
287    pub parent_id: Option<String>,
288    /// Whether the current operation is being replayed from a checkpoint
289    ///
290    /// When `true`, the operation is returning a previously checkpointed result
291    /// without re-executing the operation's logic. Loggers can use this flag
292    /// to suppress or annotate logs during replay.
293    ///
294    /// # Requirements
295    ///
296    /// - 16.6: THE Logging_System SHALL support replay-aware logging that can suppress logs during replay
297    pub is_replay: bool,
298    /// Additional key-value pairs
299    pub extra: Vec<(String, String)>,
300}
301
302impl LogInfo {
303    /// Creates a new LogInfo with the given execution ARN.
304    pub fn new(durable_execution_arn: impl Into<String>) -> Self {
305        Self {
306            durable_execution_arn: Some(durable_execution_arn.into()),
307            ..Default::default()
308        }
309    }
310
311    /// Sets the operation ID.
312    pub fn with_operation_id(mut self, operation_id: impl Into<String>) -> Self {
313        self.operation_id = Some(operation_id.into());
314        self
315    }
316
317    /// Sets the parent ID.
318    pub fn with_parent_id(mut self, parent_id: impl Into<String>) -> Self {
319        self.parent_id = Some(parent_id.into());
320        self
321    }
322
323    /// Sets the replay flag.
324    ///
325    /// When set to `true`, indicates that the current operation is being
326    /// replayed from a checkpoint rather than executing fresh.
327    ///
328    /// # Arguments
329    ///
330    /// * `is_replay` - Whether the operation is being replayed
331    ///
332    /// # Requirements
333    ///
334    /// - 16.6: THE Logging_System SHALL support replay-aware logging that can suppress logs during replay
335    pub fn with_replay(mut self, is_replay: bool) -> Self {
336        self.is_replay = is_replay;
337        self
338    }
339
340    /// Adds an extra key-value pair.
341    pub fn with_extra(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
342        self.extra.push((key.into(), value.into()));
343        self
344    }
345}
346
347/// Creates a tracing span for a durable operation.
348///
349/// This function creates a structured tracing span that includes all relevant
350/// context for observability and debugging. The span can be used to trace
351/// execution flow and correlate logs across operations.
352///
353/// # Arguments
354///
355/// * `operation_type` - The type of operation (e.g., "step", "wait", "callback", "invoke", "map", "parallel")
356/// * `op_id` - The operation identifier containing operation_id, parent_id, and name
357/// * `durable_execution_arn` - The ARN of the durable execution
358///
359/// # Returns
360///
361/// A `tracing::Span` that can be entered during operation execution.
362///
363/// # Example
364///
365/// ```rust,ignore
366/// let span = create_operation_span("step", &op_id, state.durable_execution_arn());
367/// let _guard = span.enter();
368/// // ... operation execution ...
369/// ```
370///
371/// # Requirements
372///
373/// - 3.2: THE tracing span SHALL include the operation_id as a structured field
374/// - 3.3: THE tracing span SHALL include the operation_type as a structured field
375/// - 3.4: THE tracing span SHALL include the parent_id as a structured field when available
376/// - 3.5: THE tracing span SHALL include the durable_execution_arn as a structured field
377pub fn create_operation_span(
378    operation_type: &str,
379    op_id: &OperationIdentifier,
380    durable_execution_arn: &str,
381) -> tracing::Span {
382    tracing::info_span!(
383        "durable_operation",
384        operation_type = %operation_type,
385        operation_id = %op_id.operation_id,
386        parent_id = ?op_id.parent_id,
387        name = ?op_id.name,
388        durable_execution_arn = %durable_execution_arn,
389        status = tracing::field::Empty,
390    )
391}
392
393/// Default logger implementation using the `tracing` crate.
394///
395/// This logger includes the `is_replay` flag in all log messages to help
396/// distinguish between fresh executions and replayed operations.
397///
398/// Extra fields from `LogInfo` are included in the tracing output as a formatted
399/// string of key-value pairs, making them queryable in log aggregation systems.
400///
401/// # Requirements
402///
403/// - 5.1: WHEN LogInfo contains extra fields, THE TracingLogger SHALL include them in the tracing output
404/// - 5.2: THE extra fields SHALL be formatted as key-value pairs in the tracing event
405/// - 16.6: THE Logging_System SHALL support replay-aware logging that can suppress logs during replay
406#[derive(Debug, Clone, Default)]
407pub struct TracingLogger;
408
409// Implement Sealed for TracingLogger to allow it to implement Logger
410impl Sealed for TracingLogger {}
411
412impl TracingLogger {
413    /// Formats extra fields as a string of key-value pairs.
414    ///
415    /// Returns an empty string if there are no extra fields, otherwise returns
416    /// a comma-separated list of "key=value" pairs.
417    ///
418    /// # Requirements
419    ///
420    /// - 5.2: THE extra fields SHALL be formatted as key-value pairs in the tracing event
421    fn format_extra_fields(extra: &[(String, String)]) -> String {
422        if extra.is_empty() {
423            String::new()
424        } else {
425            extra
426                .iter()
427                .map(|(k, v)| format!("{}={}", k, v))
428                .collect::<Vec<_>>()
429                .join(", ")
430        }
431    }
432}
433
434impl Logger for TracingLogger {
435    fn debug(&self, message: &str, info: &LogInfo) {
436        let extra_fields = Self::format_extra_fields(&info.extra);
437        tracing::debug!(
438            durable_execution_arn = ?info.durable_execution_arn,
439            operation_id = ?info.operation_id,
440            parent_id = ?info.parent_id,
441            is_replay = info.is_replay,
442            extra = %extra_fields,
443            "{}",
444            message
445        );
446    }
447
448    fn info(&self, message: &str, info: &LogInfo) {
449        let extra_fields = Self::format_extra_fields(&info.extra);
450        tracing::info!(
451            durable_execution_arn = ?info.durable_execution_arn,
452            operation_id = ?info.operation_id,
453            parent_id = ?info.parent_id,
454            is_replay = info.is_replay,
455            extra = %extra_fields,
456            "{}",
457            message
458        );
459    }
460
461    fn warn(&self, message: &str, info: &LogInfo) {
462        let extra_fields = Self::format_extra_fields(&info.extra);
463        tracing::warn!(
464            durable_execution_arn = ?info.durable_execution_arn,
465            operation_id = ?info.operation_id,
466            parent_id = ?info.parent_id,
467            is_replay = info.is_replay,
468            extra = %extra_fields,
469            "{}",
470            message
471        );
472    }
473
474    fn error(&self, message: &str, info: &LogInfo) {
475        let extra_fields = Self::format_extra_fields(&info.extra);
476        tracing::error!(
477            durable_execution_arn = ?info.durable_execution_arn,
478            operation_id = ?info.operation_id,
479            parent_id = ?info.parent_id,
480            is_replay = info.is_replay,
481            extra = %extra_fields,
482            "{}",
483            message
484        );
485    }
486}
487
488/// Configuration for replay-aware logging behavior.
489///
490/// This configuration controls how the `ReplayAwareLogger` handles log messages
491/// during replay. Users can choose to suppress all logs during replay, allow
492/// only certain log levels, or log all messages regardless of replay status.
493///
494/// # Requirements
495///
496/// - 16.6: THE Logging_System SHALL support replay-aware logging that can suppress logs during replay
497/// - 16.7: THE Logging_System SHALL allow users to configure replay logging behavior
498#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
499pub enum ReplayLoggingConfig {
500    /// Suppress all logs during replay (default).
501    ///
502    /// When in replay mode, no log messages will be emitted. This is useful
503    /// to reduce noise in logs when replaying previously executed operations.
504    #[default]
505    SuppressAll,
506
507    /// Allow all logs during replay.
508    ///
509    /// Log messages will be emitted regardless of replay status. The `is_replay`
510    /// flag in `LogInfo` can still be used to distinguish replay logs.
511    AllowAll,
512
513    /// Allow only error logs during replay.
514    ///
515    /// Only error-level messages will be emitted during replay. This is useful
516    /// when you want to see errors but suppress informational messages.
517    ErrorsOnly,
518
519    /// Allow error and warning logs during replay.
520    ///
521    /// Error and warning-level messages will be emitted during replay.
522    /// Debug and info messages will be suppressed.
523    WarningsAndErrors,
524}
525
526/// A logger wrapper that can suppress logs during replay based on configuration.
527///
528/// `ReplayAwareLogger` wraps any `Logger` implementation and adds replay-aware
529/// behavior. When the `is_replay` flag in `LogInfo` is `true`, the logger will
530/// suppress or allow logs based on the configured `ReplayLoggingConfig`.
531///
532/// # Example
533///
534/// ```rust
535/// use durable_execution_sdk::{TracingLogger, ReplayAwareLogger, ReplayLoggingConfig};
536/// use std::sync::Arc;
537///
538/// // Create a replay-aware logger that suppresses all logs during replay
539/// let logger = ReplayAwareLogger::new(
540///     Arc::new(TracingLogger),
541///     ReplayLoggingConfig::SuppressAll,
542/// );
543///
544/// // Create a replay-aware logger that allows errors during replay
545/// let logger_with_errors = ReplayAwareLogger::new(
546///     Arc::new(TracingLogger),
547///     ReplayLoggingConfig::ErrorsOnly,
548/// );
549/// ```
550///
551/// # Requirements
552///
553/// - 16.6: THE Logging_System SHALL support replay-aware logging that can suppress logs during replay
554/// - 16.7: THE Logging_System SHALL allow users to configure replay logging behavior
555pub struct ReplayAwareLogger {
556    /// The underlying logger to delegate to
557    inner: Arc<dyn Logger>,
558    /// Configuration for replay logging behavior
559    config: ReplayLoggingConfig,
560}
561
562// Implement Sealed for ReplayAwareLogger to allow it to implement Logger
563impl Sealed for ReplayAwareLogger {}
564
565impl ReplayAwareLogger {
566    /// Creates a new `ReplayAwareLogger` with the specified configuration.
567    ///
568    /// # Arguments
569    ///
570    /// * `inner` - The underlying logger to delegate to
571    /// * `config` - Configuration for replay logging behavior
572    ///
573    /// # Example
574    ///
575    /// ```rust
576    /// use durable_execution_sdk::{TracingLogger, ReplayAwareLogger, ReplayLoggingConfig};
577    /// use std::sync::Arc;
578    ///
579    /// let logger = ReplayAwareLogger::new(
580    ///     Arc::new(TracingLogger),
581    ///     ReplayLoggingConfig::SuppressAll,
582    /// );
583    /// ```
584    pub fn new(inner: Arc<dyn Logger>, config: ReplayLoggingConfig) -> Self {
585        Self { inner, config }
586    }
587
588    /// Creates a new `ReplayAwareLogger` that suppresses all logs during replay.
589    ///
590    /// This is a convenience constructor for the most common use case.
591    ///
592    /// # Arguments
593    ///
594    /// * `inner` - The underlying logger to delegate to
595    pub fn suppress_replay(inner: Arc<dyn Logger>) -> Self {
596        Self::new(inner, ReplayLoggingConfig::SuppressAll)
597    }
598
599    /// Creates a new `ReplayAwareLogger` that allows all logs during replay.
600    ///
601    /// # Arguments
602    ///
603    /// * `inner` - The underlying logger to delegate to
604    pub fn allow_all(inner: Arc<dyn Logger>) -> Self {
605        Self::new(inner, ReplayLoggingConfig::AllowAll)
606    }
607
608    /// Returns the current replay logging configuration.
609    pub fn config(&self) -> ReplayLoggingConfig {
610        self.config
611    }
612
613    /// Returns a reference to the underlying logger.
614    pub fn inner(&self) -> &Arc<dyn Logger> {
615        &self.inner
616    }
617
618    /// Checks if a log at the given level should be suppressed during replay.
619    fn should_suppress(&self, info: &LogInfo, level: LogLevel) -> bool {
620        if !info.is_replay {
621            return false;
622        }
623
624        match self.config {
625            ReplayLoggingConfig::SuppressAll => true,
626            ReplayLoggingConfig::AllowAll => false,
627            ReplayLoggingConfig::ErrorsOnly => level != LogLevel::Error,
628            ReplayLoggingConfig::WarningsAndErrors => {
629                level != LogLevel::Error && level != LogLevel::Warn
630            }
631        }
632    }
633}
634
635/// Internal enum for log levels used by ReplayAwareLogger.
636#[derive(Debug, Clone, Copy, PartialEq, Eq)]
637enum LogLevel {
638    Debug,
639    Info,
640    Warn,
641    Error,
642}
643
644impl std::fmt::Debug for ReplayAwareLogger {
645    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
646        f.debug_struct("ReplayAwareLogger")
647            .field("config", &self.config)
648            .finish()
649    }
650}
651
652impl Logger for ReplayAwareLogger {
653    fn debug(&self, message: &str, info: &LogInfo) {
654        if !self.should_suppress(info, LogLevel::Debug) {
655            self.inner.debug(message, info);
656        }
657    }
658
659    fn info(&self, message: &str, info: &LogInfo) {
660        if !self.should_suppress(info, LogLevel::Info) {
661            self.inner.info(message, info);
662        }
663    }
664
665    fn warn(&self, message: &str, info: &LogInfo) {
666        if !self.should_suppress(info, LogLevel::Warn) {
667            self.inner.warn(message, info);
668        }
669    }
670
671    fn error(&self, message: &str, info: &LogInfo) {
672        if !self.should_suppress(info, LogLevel::Error) {
673            self.inner.error(message, info);
674        }
675    }
676}
677
678/// A custom logger that delegates to user-provided closures.
679///
680/// This struct allows users to provide custom logging behavior without
681/// implementing the sealed `Logger` trait directly. Each log level can
682/// have its own closure for handling log messages.
683///
684/// # Example
685///
686/// ```rust
687/// use durable_execution_sdk::context::{custom_logger, LogInfo};
688/// use std::sync::Arc;
689///
690/// // Create a custom logger that prints to stdout
691/// let logger = custom_logger(
692///     |msg, info| println!("[DEBUG] {}: {:?}", msg, info),
693///     |msg, info| println!("[INFO] {}: {:?}", msg, info),
694///     |msg, info| println!("[WARN] {}: {:?}", msg, info),
695///     |msg, info| println!("[ERROR] {}: {:?}", msg, info),
696/// );
697/// ```
698///
699/// # Requirements
700///
701/// - 3.6: THE SDK SHALL provide factory functions or builders for users who need custom behavior
702pub struct CustomLogger<D, I, W, E>
703where
704    D: Fn(&str, &LogInfo) + Send + Sync,
705    I: Fn(&str, &LogInfo) + Send + Sync,
706    W: Fn(&str, &LogInfo) + Send + Sync,
707    E: Fn(&str, &LogInfo) + Send + Sync,
708{
709    debug_fn: D,
710    info_fn: I,
711    warn_fn: W,
712    error_fn: E,
713}
714
715// Implement Sealed for CustomLogger to allow it to implement Logger
716impl<D, I, W, E> Sealed for CustomLogger<D, I, W, E>
717where
718    D: Fn(&str, &LogInfo) + Send + Sync,
719    I: Fn(&str, &LogInfo) + Send + Sync,
720    W: Fn(&str, &LogInfo) + Send + Sync,
721    E: Fn(&str, &LogInfo) + Send + Sync,
722{
723}
724
725impl<D, I, W, E> Logger for CustomLogger<D, I, W, E>
726where
727    D: Fn(&str, &LogInfo) + Send + Sync,
728    I: Fn(&str, &LogInfo) + Send + Sync,
729    W: Fn(&str, &LogInfo) + Send + Sync,
730    E: Fn(&str, &LogInfo) + Send + Sync,
731{
732    fn debug(&self, message: &str, info: &LogInfo) {
733        (self.debug_fn)(message, info);
734    }
735
736    fn info(&self, message: &str, info: &LogInfo) {
737        (self.info_fn)(message, info);
738    }
739
740    fn warn(&self, message: &str, info: &LogInfo) {
741        (self.warn_fn)(message, info);
742    }
743
744    fn error(&self, message: &str, info: &LogInfo) {
745        (self.error_fn)(message, info);
746    }
747}
748
749impl<D, I, W, E> std::fmt::Debug for CustomLogger<D, I, W, E>
750where
751    D: Fn(&str, &LogInfo) + Send + Sync,
752    I: Fn(&str, &LogInfo) + Send + Sync,
753    W: Fn(&str, &LogInfo) + Send + Sync,
754    E: Fn(&str, &LogInfo) + Send + Sync,
755{
756    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
757        f.debug_struct("CustomLogger").finish()
758    }
759}
760
761/// Creates a custom logger with user-provided closures for each log level.
762///
763/// This factory function allows users to create custom logging behavior without
764/// implementing the sealed `Logger` trait directly. Each log level has its own
765/// closure that receives the log message and context information.
766///
767/// # Arguments
768///
769/// * `debug_fn` - Closure called for debug-level messages
770/// * `info_fn` - Closure called for info-level messages
771/// * `warn_fn` - Closure called for warning-level messages
772/// * `error_fn` - Closure called for error-level messages
773///
774/// # Example
775///
776/// ```rust
777/// use durable_execution_sdk::context::{custom_logger, LogInfo};
778/// use std::sync::Arc;
779///
780/// // Create a custom logger that prints to stdout
781/// let logger = custom_logger(
782///     |msg, info| println!("[DEBUG] {}: {:?}", msg, info),
783///     |msg, info| println!("[INFO] {}: {:?}", msg, info),
784///     |msg, info| println!("[WARN] {}: {:?}", msg, info),
785///     |msg, info| println!("[ERROR] {}: {:?}", msg, info),
786/// );
787///
788/// // Use with Arc for sharing across contexts
789/// let shared_logger = Arc::new(logger);
790/// ```
791///
792/// # Requirements
793///
794/// - 3.6: THE SDK SHALL provide factory functions or builders for users who need custom behavior
795pub fn custom_logger<D, I, W, E>(
796    debug_fn: D,
797    info_fn: I,
798    warn_fn: W,
799    error_fn: E,
800) -> CustomLogger<D, I, W, E>
801where
802    D: Fn(&str, &LogInfo) + Send + Sync,
803    I: Fn(&str, &LogInfo) + Send + Sync,
804    W: Fn(&str, &LogInfo) + Send + Sync,
805    E: Fn(&str, &LogInfo) + Send + Sync,
806{
807    CustomLogger {
808        debug_fn,
809        info_fn,
810        warn_fn,
811        error_fn,
812    }
813}
814
815/// Creates a simple custom logger that uses a single closure for all log levels.
816///
817/// This is a convenience function for cases where you want the same handling
818/// for all log levels. The closure receives the log level as a string, the
819/// message, and the log info.
820///
821/// # Arguments
822///
823/// * `log_fn` - Closure called for all log messages. Receives (level, message, info).
824///
825/// # Example
826///
827/// ```rust
828/// use durable_execution_sdk::context::{simple_custom_logger, LogInfo};
829/// use std::sync::Arc;
830///
831/// // Create a simple logger that prints all messages with their level
832/// let logger = simple_custom_logger(|level, msg, info| {
833///     println!("[{}] {}: {:?}", level, msg, info);
834/// });
835///
836/// // Use with Arc for sharing across contexts
837/// let shared_logger = Arc::new(logger);
838/// ```
839///
840/// # Requirements
841///
842/// - 3.6: THE SDK SHALL provide factory functions or builders for users who need custom behavior
843pub fn simple_custom_logger<F>(log_fn: F) -> impl Logger
844where
845    F: Fn(&str, &str, &LogInfo) + Send + Sync + Clone + 'static,
846{
847    let debug_fn = log_fn.clone();
848    let info_fn = log_fn.clone();
849    let warn_fn = log_fn.clone();
850    let error_fn = log_fn;
851
852    custom_logger(
853        move |msg, info| debug_fn("DEBUG", msg, info),
854        move |msg, info| info_fn("INFO", msg, info),
855        move |msg, info| warn_fn("WARN", msg, info),
856        move |msg, info| error_fn("ERROR", msg, info),
857    )
858}
859
860/// The main context for durable execution operations.
861///
862/// `DurableContext` provides all the durable operations that user code
863/// can use to build reliable workflows. It handles checkpointing, replay,
864/// and state management automatically.
865///
866/// # Thread Safety
867///
868/// `DurableContext` is `Send + Sync` and can be safely shared across
869/// async tasks. The internal state uses appropriate synchronization
870/// primitives for concurrent access.
871///
872/// # Example
873///
874/// ```rust,ignore
875/// async fn my_workflow(ctx: DurableContext) -> Result<String, DurableError> {
876///     // Execute a step (automatically checkpointed)
877///     let result = ctx.step(|_| Ok("hello".to_string()), None).await?;
878///     
879///     // Wait for 5 seconds (suspends Lambda, resumes later)
880///     ctx.wait(Duration::from_seconds(5), None).await?;
881///     
882///     Ok(result)
883/// }
884/// ```
885pub struct DurableContext {
886    /// The execution state manager
887    state: Arc<ExecutionState>,
888    /// The Lambda context (if running in Lambda)
889    lambda_context: Option<lambda_runtime::Context>,
890    /// The parent operation ID (None for root context)
891    parent_id: Option<String>,
892    /// Operation ID generator for this context
893    id_generator: Arc<OperationIdGenerator>,
894    /// Logger for structured logging (wrapped in RwLock for runtime reconfiguration)
895    logger: Arc<RwLock<Arc<dyn Logger>>>,
896}
897
898// Ensure DurableContext is Send + Sync
899static_assertions::assert_impl_all!(DurableContext: Send, Sync);
900
901impl DurableContext {
902    /// Creates a new DurableContext with the given state.
903    ///
904    /// # Arguments
905    ///
906    /// * `state` - The execution state manager
907    pub fn new(state: Arc<ExecutionState>) -> Self {
908        let base_id = state.durable_execution_arn().to_string();
909        Self {
910            state,
911            lambda_context: None,
912            parent_id: None,
913            id_generator: Arc::new(OperationIdGenerator::new(base_id)),
914            logger: Arc::new(RwLock::new(Arc::new(TracingLogger))),
915        }
916    }
917
918    /// Creates a DurableContext from a Lambda context.
919    ///
920    /// This is the primary factory method used when running in AWS Lambda.
921    ///
922    /// # Arguments
923    ///
924    /// * `state` - The execution state manager
925    /// * `lambda_context` - The Lambda runtime context
926    pub fn from_lambda_context(
927        state: Arc<ExecutionState>,
928        lambda_context: lambda_runtime::Context,
929    ) -> Self {
930        let base_id = state.durable_execution_arn().to_string();
931        Self {
932            state,
933            lambda_context: Some(lambda_context),
934            parent_id: None,
935            id_generator: Arc::new(OperationIdGenerator::new(base_id)),
936            logger: Arc::new(RwLock::new(Arc::new(TracingLogger))),
937        }
938    }
939
940    /// Creates a child context for nested operations.
941    ///
942    /// Child contexts have their own step counter but share the same
943    /// execution state. Operations in a child context are tracked
944    /// with the parent's operation ID.
945    ///
946    /// # Arguments
947    ///
948    /// * `parent_operation_id` - The operation ID of the parent operation
949    pub fn create_child_context(&self, parent_operation_id: impl Into<String>) -> Self {
950        let parent_id = parent_operation_id.into();
951        Self {
952            state: self.state.clone(),
953            lambda_context: self.lambda_context.clone(),
954            parent_id: Some(parent_id.clone()),
955            id_generator: Arc::new(OperationIdGenerator::new(parent_id)),
956            logger: self.logger.clone(),
957        }
958    }
959
960    /// Sets a custom logger for this context.
961    ///
962    /// # Arguments
963    ///
964    /// * `logger` - The logger implementation to use
965    pub fn set_logger(&mut self, logger: Arc<dyn Logger>) {
966        *self.logger.write().unwrap() = logger;
967    }
968
969    /// Returns a new context with the specified logger.
970    ///
971    /// # Arguments
972    ///
973    /// * `logger` - The logger implementation to use
974    pub fn with_logger(self, logger: Arc<dyn Logger>) -> Self {
975        *self.logger.write().unwrap() = logger;
976        self
977    }
978
979    /// Reconfigures the logger for this context at runtime.
980    ///
981    /// All subsequent log calls on this context (and any clones sharing the
982    /// same underlying `RwLock`) will use the new logger.
983    ///
984    /// # Arguments
985    ///
986    /// * `logger` - The new logger implementation to use
987    ///
988    /// # Requirements
989    ///
990    /// - 13.1: configure_logger swaps the logger, subsequent calls use new logger
991    /// - 13.2: Original logger used when configure_logger not called
992    pub fn configure_logger(&self, logger: Arc<dyn Logger>) {
993        *self.logger.write().unwrap() = logger;
994    }
995
996    /// Returns a reference to the execution state.
997    pub fn state(&self) -> &Arc<ExecutionState> {
998        &self.state
999    }
1000
1001    /// Returns the durable execution ARN.
1002    pub fn durable_execution_arn(&self) -> &str {
1003        self.state.durable_execution_arn()
1004    }
1005
1006    /// Returns the parent operation ID, if any.
1007    pub fn parent_id(&self) -> Option<&str> {
1008        self.parent_id.as_deref()
1009    }
1010
1011    /// Returns a reference to the Lambda context, if available.
1012    pub fn lambda_context(&self) -> Option<&lambda_runtime::Context> {
1013        self.lambda_context.as_ref()
1014    }
1015
1016    /// Returns a reference to the logger.
1017    pub fn logger(&self) -> Arc<dyn Logger> {
1018        self.logger.read().unwrap().clone()
1019    }
1020
1021    /// Generates the next operation ID for this context.
1022    ///
1023    /// This method is thread-safe and will generate unique, deterministic
1024    /// IDs based on the context's base ID and step counter.
1025    pub fn next_operation_id(&self) -> String {
1026        self.id_generator.next_id()
1027    }
1028
1029    /// Creates an OperationIdentifier for the next operation.
1030    ///
1031    /// # Arguments
1032    ///
1033    /// * `name` - Optional human-readable name for the operation
1034    pub fn next_operation_identifier(&self, name: Option<String>) -> OperationIdentifier {
1035        OperationIdentifier::new(self.next_operation_id(), self.parent_id.clone(), name)
1036    }
1037
1038    /// Returns the current step counter value without incrementing.
1039    pub fn current_step_counter(&self) -> u64 {
1040        self.id_generator.current_counter()
1041    }
1042
1043    /// Creates log info for the current context.
1044    ///
1045    /// The returned `LogInfo` includes the current replay status from the
1046    /// execution state, allowing loggers to distinguish between fresh
1047    /// executions and replayed operations.
1048    ///
1049    /// # Requirements
1050    ///
1051    /// - 16.6: THE Logging_System SHALL support replay-aware logging that can suppress logs during replay
1052    pub fn create_log_info(&self) -> LogInfo {
1053        let mut info = LogInfo::new(self.durable_execution_arn());
1054        if let Some(ref parent_id) = self.parent_id {
1055            info = info.with_parent_id(parent_id);
1056        }
1057        // Include replay status from execution state
1058        info = info.with_replay(self.state.is_replay());
1059        info
1060    }
1061
1062    /// Creates log info with an operation ID.
1063    ///
1064    /// The returned `LogInfo` includes the current replay status from the
1065    /// execution state, allowing loggers to distinguish between fresh
1066    /// executions and replayed operations.
1067    ///
1068    /// # Requirements
1069    ///
1070    /// - 16.6: THE Logging_System SHALL support replay-aware logging that can suppress logs during replay
1071    pub fn create_log_info_with_operation(&self, operation_id: &str) -> LogInfo {
1072        self.create_log_info().with_operation_id(operation_id)
1073    }
1074
1075    /// Creates log info with explicit replay status.
1076    ///
1077    /// This method allows callers to explicitly set the replay status,
1078    /// which is useful when the operation-specific replay status differs
1079    /// from the global execution state replay status.
1080    ///
1081    /// # Arguments
1082    ///
1083    /// * `operation_id` - The operation ID to include in the log info
1084    /// * `is_replay` - Whether this specific operation is being replayed
1085    ///
1086    /// # Requirements
1087    ///
1088    /// - 16.6: THE Logging_System SHALL support replay-aware logging that can suppress logs during replay
1089    pub fn create_log_info_with_replay(&self, operation_id: &str, is_replay: bool) -> LogInfo {
1090        let mut info = LogInfo::new(self.durable_execution_arn());
1091        if let Some(ref parent_id) = self.parent_id {
1092            info = info.with_parent_id(parent_id);
1093        }
1094        info.with_operation_id(operation_id).with_replay(is_replay)
1095    }
1096
1097    // ========================================================================
1098    // Simplified Logging API
1099    // ========================================================================
1100
1101    /// Logs a message at INFO level with automatic context.
1102    ///
1103    /// This method automatically includes the durable_execution_arn and parent_id
1104    /// in the log output without requiring the caller to specify them.
1105    ///
1106    /// # Arguments
1107    ///
1108    /// * `message` - The message to log
1109    ///
1110    /// # Example
1111    ///
1112    /// ```rust,ignore
1113    /// ctx.log_info("Processing order started");
1114    /// ```
1115    ///
1116    /// # Requirements
1117    ///
1118    /// - 4.1: THE DurableContext SHALL provide a log_info method that logs at INFO level with automatic context
1119    /// - 4.5: THE logging methods SHALL automatically include durable_execution_arn, operation_id, and parent_id
1120    pub fn log_info(&self, message: &str) {
1121        self.log_with_level(LogLevel::Info, message, &[]);
1122    }
1123
1124    /// Logs a message at INFO level with extra fields.
1125    ///
1126    /// This method automatically includes the durable_execution_arn and parent_id
1127    /// in the log output, plus any additional fields specified.
1128    ///
1129    /// # Arguments
1130    ///
1131    /// * `message` - The message to log
1132    /// * `fields` - Additional key-value pairs to include in the log
1133    ///
1134    /// # Example
1135    ///
1136    /// ```rust,ignore
1137    /// ctx.log_info_with("Processing order", &[("order_id", "123"), ("amount", "99.99")]);
1138    /// ```
1139    ///
1140    /// # Requirements
1141    ///
1142    /// - 4.1: THE DurableContext SHALL provide a log_info method that logs at INFO level with automatic context
1143    /// - 4.6: THE logging methods SHALL support additional structured fields via a builder pattern or variadic arguments
1144    pub fn log_info_with(&self, message: &str, fields: &[(&str, &str)]) {
1145        self.log_with_level(LogLevel::Info, message, fields);
1146    }
1147
1148    /// Logs a message at DEBUG level with automatic context.
1149    ///
1150    /// This method automatically includes the durable_execution_arn and parent_id
1151    /// in the log output without requiring the caller to specify them.
1152    ///
1153    /// # Arguments
1154    ///
1155    /// * `message` - The message to log
1156    ///
1157    /// # Example
1158    ///
1159    /// ```rust,ignore
1160    /// ctx.log_debug("Entering validation step");
1161    /// ```
1162    ///
1163    /// # Requirements
1164    ///
1165    /// - 4.2: THE DurableContext SHALL provide a log_debug method that logs at DEBUG level with automatic context
1166    /// - 4.5: THE logging methods SHALL automatically include durable_execution_arn, operation_id, and parent_id
1167    pub fn log_debug(&self, message: &str) {
1168        self.log_with_level(LogLevel::Debug, message, &[]);
1169    }
1170
1171    /// Logs a message at DEBUG level with extra fields.
1172    ///
1173    /// This method automatically includes the durable_execution_arn and parent_id
1174    /// in the log output, plus any additional fields specified.
1175    ///
1176    /// # Arguments
1177    ///
1178    /// * `message` - The message to log
1179    /// * `fields` - Additional key-value pairs to include in the log
1180    ///
1181    /// # Example
1182    ///
1183    /// ```rust,ignore
1184    /// ctx.log_debug_with("Variable state", &[("x", "42"), ("y", "100")]);
1185    /// ```
1186    ///
1187    /// # Requirements
1188    ///
1189    /// - 4.2: THE DurableContext SHALL provide a log_debug method that logs at DEBUG level with automatic context
1190    /// - 4.6: THE logging methods SHALL support additional structured fields via a builder pattern or variadic arguments
1191    pub fn log_debug_with(&self, message: &str, fields: &[(&str, &str)]) {
1192        self.log_with_level(LogLevel::Debug, message, fields);
1193    }
1194
1195    /// Logs a message at WARN level with automatic context.
1196    ///
1197    /// This method automatically includes the durable_execution_arn and parent_id
1198    /// in the log output without requiring the caller to specify them.
1199    ///
1200    /// # Arguments
1201    ///
1202    /// * `message` - The message to log
1203    ///
1204    /// # Example
1205    ///
1206    /// ```rust,ignore
1207    /// ctx.log_warn("Retry attempt 3 of 5");
1208    /// ```
1209    ///
1210    /// # Requirements
1211    ///
1212    /// - 4.3: THE DurableContext SHALL provide a log_warn method that logs at WARN level with automatic context
1213    /// - 4.5: THE logging methods SHALL automatically include durable_execution_arn, operation_id, and parent_id
1214    pub fn log_warn(&self, message: &str) {
1215        self.log_with_level(LogLevel::Warn, message, &[]);
1216    }
1217
1218    /// Logs a message at WARN level with extra fields.
1219    ///
1220    /// This method automatically includes the durable_execution_arn and parent_id
1221    /// in the log output, plus any additional fields specified.
1222    ///
1223    /// # Arguments
1224    ///
1225    /// * `message` - The message to log
1226    /// * `fields` - Additional key-value pairs to include in the log
1227    ///
1228    /// # Example
1229    ///
1230    /// ```rust,ignore
1231    /// ctx.log_warn_with("Rate limit approaching", &[("current", "95"), ("limit", "100")]);
1232    /// ```
1233    ///
1234    /// # Requirements
1235    ///
1236    /// - 4.3: THE DurableContext SHALL provide a log_warn method that logs at WARN level with automatic context
1237    /// - 4.6: THE logging methods SHALL support additional structured fields via a builder pattern or variadic arguments
1238    pub fn log_warn_with(&self, message: &str, fields: &[(&str, &str)]) {
1239        self.log_with_level(LogLevel::Warn, message, fields);
1240    }
1241
1242    /// Logs a message at ERROR level with automatic context.
1243    ///
1244    /// This method automatically includes the durable_execution_arn and parent_id
1245    /// in the log output without requiring the caller to specify them.
1246    ///
1247    /// # Arguments
1248    ///
1249    /// * `message` - The message to log
1250    ///
1251    /// # Example
1252    ///
1253    /// ```rust,ignore
1254    /// ctx.log_error("Failed to process payment");
1255    /// ```
1256    ///
1257    /// # Requirements
1258    ///
1259    /// - 4.4: THE DurableContext SHALL provide a log_error method that logs at ERROR level with automatic context
1260    /// - 4.5: THE logging methods SHALL automatically include durable_execution_arn, operation_id, and parent_id
1261    pub fn log_error(&self, message: &str) {
1262        self.log_with_level(LogLevel::Error, message, &[]);
1263    }
1264
1265    /// Logs a message at ERROR level with extra fields.
1266    ///
1267    /// This method automatically includes the durable_execution_arn and parent_id
1268    /// in the log output, plus any additional fields specified.
1269    ///
1270    /// # Arguments
1271    ///
1272    /// * `message` - The message to log
1273    /// * `fields` - Additional key-value pairs to include in the log
1274    ///
1275    /// # Example
1276    ///
1277    /// ```rust,ignore
1278    /// ctx.log_error_with("Payment failed", &[("error_code", "INSUFFICIENT_FUNDS"), ("amount", "150.00")]);
1279    /// ```
1280    ///
1281    /// # Requirements
1282    ///
1283    /// - 4.4: THE DurableContext SHALL provide a log_error method that logs at ERROR level with automatic context
1284    /// - 4.6: THE logging methods SHALL support additional structured fields via a builder pattern or variadic arguments
1285    pub fn log_error_with(&self, message: &str, fields: &[(&str, &str)]) {
1286        self.log_with_level(LogLevel::Error, message, fields);
1287    }
1288
1289    /// Internal helper method to log at a specific level with optional extra fields.
1290    ///
1291    /// This method creates a LogInfo with automatic context (durable_execution_arn, parent_id)
1292    /// and any additional fields, then delegates to the configured logger.
1293    ///
1294    /// # Arguments
1295    ///
1296    /// * `level` - The log level
1297    /// * `message` - The message to log
1298    /// * `extra` - Additional key-value pairs to include
1299    fn log_with_level(&self, level: LogLevel, message: &str, extra: &[(&str, &str)]) {
1300        let mut log_info = self.create_log_info();
1301
1302        for (key, value) in extra {
1303            log_info = log_info.with_extra(*key, *value);
1304        }
1305
1306        let logger = self.logger.read().unwrap();
1307        match level {
1308            LogLevel::Debug => logger.debug(message, &log_info),
1309            LogLevel::Info => logger.info(message, &log_info),
1310            LogLevel::Warn => logger.warn(message, &log_info),
1311            LogLevel::Error => logger.error(message, &log_info),
1312        }
1313    }
1314
1315    /// Returns the original user input from the EXECUTION operation.
1316    ///
1317    /// This method deserializes the input payload from the EXECUTION operation's
1318    /// ExecutionDetails.InputPayload field into the requested type.
1319    ///
1320    /// # Type Parameters
1321    ///
1322    /// * `T` - The type to deserialize the input into. Must implement `DeserializeOwned`.
1323    ///
1324    /// # Returns
1325    ///
1326    /// `Ok(T)` if the input exists and can be deserialized, or a `DurableError` if:
1327    /// - No EXECUTION operation exists
1328    /// - No input payload is available
1329    /// - Deserialization fails
1330    ///
1331    /// # Example
1332    ///
1333    /// ```rust,ignore
1334    /// #[derive(Deserialize)]
1335    /// struct OrderEvent {
1336    ///     order_id: String,
1337    ///     amount: f64,
1338    /// }
1339    ///
1340    /// async fn my_workflow(ctx: DurableContext) -> Result<(), DurableError> {
1341    ///     // Get the original input that started this execution
1342    ///     let event: OrderEvent = ctx.get_original_input()?;
1343    ///     println!("Processing order: {}", event.order_id);
1344    ///     Ok(())
1345    /// }
1346    /// ```
1347    ///
1348    /// # Requirements
1349    ///
1350    /// - 1.11: THE DurableContext SHALL provide access to the original user input from the EXECUTION operation
1351    /// - 19.2: THE EXECUTION_Operation SHALL provide access to original user input from ExecutionDetails.InputPayload
1352    /// - 5.1: THE SDK SHALL provide a `DurableResult<T>` type alias for `Result<T, DurableError>`
1353    pub fn get_original_input<T>(&self) -> DurableResult<T>
1354    where
1355        T: serde::de::DeserializeOwned,
1356    {
1357        // Get the raw input payload from the execution state
1358        let input_payload = self.state.get_original_input_raw().ok_or_else(|| {
1359            crate::error::DurableError::Validation {
1360                message: "No original input available. The EXECUTION operation may not exist or has no input payload.".to_string(),
1361            }
1362        })?;
1363
1364        // Deserialize the input payload to the requested type
1365        serde_json::from_str(input_payload).map_err(|e| crate::error::DurableError::SerDes {
1366            message: format!("Failed to deserialize original input: {}", e),
1367        })
1368    }
1369
1370    /// Returns the raw original user input as a string, if available.
1371    ///
1372    /// This method returns the raw JSON string from the EXECUTION operation's
1373    /// ExecutionDetails.InputPayload field without deserializing it.
1374    ///
1375    /// # Returns
1376    ///
1377    /// `Some(&str)` if the input exists, `None` otherwise.
1378    ///
1379    /// # Requirements
1380    ///
1381    /// - 19.2: THE EXECUTION_Operation SHALL provide access to original user input from ExecutionDetails.InputPayload
1382    pub fn get_original_input_raw(&self) -> Option<&str> {
1383        self.state.get_original_input_raw()
1384    }
1385
1386    /// Completes the execution with a successful result via checkpointing.
1387    ///
1388    /// This method checkpoints a SUCCEED action on the EXECUTION operation,
1389    /// which is useful for large results that exceed the Lambda response size limit (6MB).
1390    /// After calling this method, the Lambda function should return an empty result.
1391    ///
1392    /// # Arguments
1393    ///
1394    /// * `result` - The result to checkpoint. Must implement `Serialize`.
1395    ///
1396    /// # Returns
1397    ///
1398    /// `Ok(())` on success, or a `DurableError` if:
1399    /// - No EXECUTION operation exists
1400    /// - Serialization fails
1401    /// - The checkpoint fails
1402    ///
1403    /// # Example
1404    ///
1405    /// ```rust,ignore
1406    /// async fn my_workflow(ctx: DurableContext) -> Result<(), DurableError> {
1407    ///     let large_result = compute_large_result().await?;
1408    ///     
1409    ///     // Check if result would exceed Lambda response limit
1410    ///     if DurableExecutionInvocationOutput::would_exceed_max_size(&large_result) {
1411    ///         // Checkpoint the result instead of returning it
1412    ///         ctx.complete_execution_success(&large_result).await?;
1413    ///         // Return empty result - the actual result is checkpointed
1414    ///         return Ok(());
1415    ///     }
1416    ///     
1417    ///     Ok(())
1418    /// }
1419    /// ```
1420    ///
1421    /// # Requirements
1422    ///
1423    /// - 19.3: THE EXECUTION_Operation SHALL support completing execution via SUCCEED action with result
1424    /// - 19.5: WHEN execution result exceeds response size limits, THE EXECUTION_Operation SHALL checkpoint the result and return empty Result field
1425    /// - 5.1: THE SDK SHALL provide a `DurableResult<T>` type alias for `Result<T, DurableError>`
1426    pub async fn complete_execution_success<T>(&self, result: &T) -> DurableResult<()>
1427    where
1428        T: serde::Serialize,
1429    {
1430        let serialized =
1431            serde_json::to_string(result).map_err(|e| crate::error::DurableError::SerDes {
1432                message: format!("Failed to serialize execution result: {}", e),
1433            })?;
1434
1435        self.state
1436            .complete_execution_success(Some(serialized))
1437            .await
1438    }
1439
1440    /// Completes the execution with a failure via checkpointing.
1441    ///
1442    /// This method checkpoints a FAIL action on the EXECUTION operation.
1443    /// After calling this method, the Lambda function should return a FAILED status.
1444    ///
1445    /// # Arguments
1446    ///
1447    /// * `error` - The error details to checkpoint
1448    ///
1449    /// # Returns
1450    ///
1451    /// `Ok(())` on success, or a `DurableError` if:
1452    /// - No EXECUTION operation exists
1453    /// - The checkpoint fails
1454    ///
1455    /// # Example
1456    ///
1457    /// ```rust,ignore
1458    /// async fn my_workflow(ctx: DurableContext) -> Result<(), DurableError> {
1459    ///     if let Err(e) = process_order().await {
1460    ///         // Checkpoint the failure
1461    ///         ctx.complete_execution_failure(ErrorObject::new("ProcessingError", &e.to_string())).await?;
1462    ///         return Err(DurableError::execution(&e.to_string()));
1463    ///     }
1464    ///     Ok(())
1465    /// }
1466    /// ```
1467    ///
1468    /// # Requirements
1469    ///
1470    /// - 19.4: THE EXECUTION_Operation SHALL support completing execution via FAIL action with error
1471    /// - 5.1: THE SDK SHALL provide a `DurableResult<T>` type alias for `Result<T, DurableError>`
1472    pub async fn complete_execution_failure(
1473        &self,
1474        error: crate::error::ErrorObject,
1475    ) -> DurableResult<()> {
1476        self.state.complete_execution_failure(error).await
1477    }
1478
1479    /// Completes the execution with a successful result, automatically handling large results.
1480    ///
1481    /// This method checks if the result would exceed the Lambda response size limit (6MB).
1482    /// If so, it checkpoints the result via the EXECUTION operation and returns `true`.
1483    /// If the result fits within the limit, it returns `false` and the caller should
1484    /// return the result normally.
1485    ///
1486    /// # Arguments
1487    ///
1488    /// * `result` - The result to potentially checkpoint. Must implement `Serialize`.
1489    ///
1490    /// # Returns
1491    ///
1492    /// `Ok(true)` if the result was checkpointed (caller should return empty result),
1493    /// `Ok(false)` if the result fits within limits (caller should return it normally),
1494    /// or a `DurableError` if checkpointing fails.
1495    ///
1496    /// # Example
1497    ///
1498    /// ```rust,ignore
1499    /// async fn my_workflow(ctx: DurableContext) -> Result<LargeResult, DurableError> {
1500    ///     let result = compute_result().await?;
1501    ///     
1502    ///     // Automatically handle large results
1503    ///     if ctx.complete_execution_if_large(&result).await? {
1504    ///         // Result was checkpointed, return a placeholder
1505    ///         // The actual result is stored in the EXECUTION operation
1506    ///         return Ok(LargeResult::default());
1507    ///     }
1508    ///     
1509    ///     // Result fits within limits, return normally
1510    ///     Ok(result)
1511    /// }
1512    /// ```
1513    ///
1514    /// # Requirements
1515    ///
1516    /// - 19.5: WHEN execution result exceeds response size limits, THE EXECUTION_Operation SHALL checkpoint the result and return empty Result field
1517    /// - 5.1: THE SDK SHALL provide a `DurableResult<T>` type alias for `Result<T, DurableError>`
1518    pub async fn complete_execution_if_large<T>(&self, result: &T) -> DurableResult<bool>
1519    where
1520        T: serde::Serialize,
1521    {
1522        if crate::lambda::DurableExecutionInvocationOutput::would_exceed_max_size(result) {
1523            self.complete_execution_success(result).await?;
1524            Ok(true)
1525        } else {
1526            Ok(false)
1527        }
1528    }
1529
1530    // ========================================================================
1531    // Durable Operations
1532    // ========================================================================
1533
1534    /// Executes a step operation with automatic checkpointing.
1535    ///
1536    /// Steps are the fundamental unit of work in durable executions. Each step
1537    /// is checkpointed, allowing the workflow to resume from the last completed
1538    /// step after interruptions.
1539    ///
1540    /// # Arguments
1541    ///
1542    /// * `func` - The function to execute
1543    /// * `config` - Optional step configuration (retry strategy, semantics, serdes)
1544    ///
1545    /// # Returns
1546    ///
1547    /// The result of the step function, or an error if execution fails.
1548    ///
1549    /// # Example
1550    ///
1551    /// ```rust,ignore
1552    /// let result: i32 = ctx.step(|_step_ctx| {
1553    ///     Ok(42)
1554    /// }, None).await?;
1555    /// ```
1556    ///
1557    /// # Requirements
1558    ///
1559    /// - 1.1: THE DurableContext SHALL provide a `step` method that executes a closure and checkpoints the result
1560    /// - 2.3: WHEN defining public APIs, THE SDK SHALL use trait aliases instead of repeated bound combinations
1561    /// - 5.1: THE SDK SHALL provide a `DurableResult<T>` type alias for `Result<T, DurableError>`
1562    pub async fn step<T, F>(
1563        &self,
1564        func: F,
1565        config: Option<crate::config::StepConfig>,
1566    ) -> DurableResult<T>
1567    where
1568        T: DurableValue,
1569        F: StepFn<T>,
1570    {
1571        let op_id = self.next_operation_identifier(None);
1572        let config = config.unwrap_or_default();
1573
1574        let logger = self.logger.read().unwrap().clone();
1575        let result =
1576            crate::handlers::step_handler(func, &self.state, &op_id, &config, &logger).await;
1577
1578        // Track replay after completion
1579        if result.is_ok() {
1580            self.state.track_replay(&op_id.operation_id).await;
1581        }
1582
1583        result
1584    }
1585
1586    /// Executes a named step operation with automatic checkpointing.
1587    ///
1588    /// Same as `step`, but allows specifying a human-readable name for the operation.
1589    ///
1590    /// # Arguments
1591    ///
1592    /// * `name` - Human-readable name for the step
1593    /// * `func` - The function to execute
1594    /// * `config` - Optional step configuration
1595    ///
1596    /// # Example
1597    ///
1598    /// ```rust,ignore
1599    /// let result: i32 = ctx.step_named("validate_input", |_step_ctx| {
1600    ///     Ok(42)
1601    /// }, None).await?;
1602    /// ```
1603    ///
1604    /// # Requirements
1605    ///
1606    /// - 2.3: WHEN defining public APIs, THE SDK SHALL use trait aliases instead of repeated bound combinations
1607    /// - 5.1: THE SDK SHALL provide a `DurableResult<T>` type alias for `Result<T, DurableError>`
1608    pub async fn step_named<T, F>(
1609        &self,
1610        name: &str,
1611        func: F,
1612        config: Option<crate::config::StepConfig>,
1613    ) -> DurableResult<T>
1614    where
1615        T: DurableValue,
1616        F: StepFn<T>,
1617    {
1618        let op_id = self.next_operation_identifier(Some(name.to_string()));
1619        let config = config.unwrap_or_default();
1620
1621        let logger = self.logger.read().unwrap().clone();
1622        let result =
1623            crate::handlers::step_handler(func, &self.state, &op_id, &config, &logger).await;
1624
1625        // Track replay after completion
1626        if result.is_ok() {
1627            self.state.track_replay(&op_id.operation_id).await;
1628        }
1629
1630        result
1631    }
1632
1633    /// Pauses execution for a specified duration.
1634    ///
1635    /// Wait operations suspend the Lambda execution and resume after the
1636    /// specified duration has elapsed. This is efficient because it doesn't
1637    /// block Lambda resources during the wait.
1638    ///
1639    /// # Arguments
1640    ///
1641    /// * `duration` - The duration to wait (must be at least 1 second)
1642    /// * `name` - Optional human-readable name for the operation
1643    ///
1644    /// # Returns
1645    ///
1646    /// Ok(()) when the wait has elapsed, or an error if validation fails.
1647    ///
1648    /// # Example
1649    ///
1650    /// ```rust,ignore
1651    /// // Wait for 5 seconds
1652    /// ctx.wait(Duration::from_seconds(5), None).await?;
1653    ///
1654    /// // Wait with a name
1655    /// ctx.wait(Duration::from_minutes(1), Some("wait_for_processing")).await?;
1656    /// ```
1657    ///
1658    /// # Requirements
1659    ///
1660    /// - 1.2: THE DurableContext SHALL provide a `wait` method that pauses execution for a specified Duration
1661    /// - 5.1: THE SDK SHALL provide a `DurableResult<T>` type alias for `Result<T, DurableError>`
1662    pub async fn wait(
1663        &self,
1664        duration: crate::duration::Duration,
1665        name: Option<&str>,
1666    ) -> DurableResult<()> {
1667        let op_id = self.next_operation_identifier(name.map(|s| s.to_string()));
1668
1669        let logger = self.logger.read().unwrap().clone();
1670        let result = crate::handlers::wait_handler(duration, &self.state, &op_id, &logger).await;
1671
1672        // Track replay after completion (only if not suspended)
1673        if result.is_ok() {
1674            self.state.track_replay(&op_id.operation_id).await;
1675        }
1676
1677        result
1678    }
1679
1680    /// Cancels an active wait operation.
1681    ///
1682    /// This method allows cancelling a wait operation that was previously started.
1683    /// If the wait has already completed (succeeded, failed, or timed out), this
1684    /// method will return Ok(()) without making any changes.
1685    ///
1686    /// # Arguments
1687    ///
1688    /// * `operation_id` - The operation ID of the wait to cancel
1689    ///
1690    /// # Returns
1691    ///
1692    /// Ok(()) if the wait was cancelled or was already completed, or an error if:
1693    /// - The operation doesn't exist
1694    /// - The operation is not a WAIT operation
1695    /// - The checkpoint fails
1696    ///
1697    /// # Example
1698    ///
1699    /// ```rust,ignore
1700    /// // Start a wait in a parallel branch
1701    /// let wait_op_id = ctx.next_operation_id();
1702    ///
1703    /// // Later, cancel the wait from another branch
1704    /// ctx.cancel_wait(&wait_op_id).await?;
1705    /// ```
1706    ///
1707    /// # Requirements
1708    ///
1709    /// - 5.5: THE Wait_Operation SHALL support cancellation of active waits via CANCEL action
1710    /// - 5.1: THE SDK SHALL provide a `DurableResult<T>` type alias for `Result<T, DurableError>`
1711    pub async fn cancel_wait(&self, operation_id: &str) -> DurableResult<()> {
1712        let logger = self.logger.read().unwrap().clone();
1713        crate::handlers::wait_cancel_handler(&self.state, operation_id, &logger).await
1714    }
1715
1716    /// Creates a callback and returns a handle to wait for the result.
1717    ///
1718    /// Callbacks allow external systems to signal completion of asynchronous
1719    /// operations. The callback ID can be shared with external systems, which
1720    /// can then call the Lambda durable execution callback API.
1721    ///
1722    /// # Arguments
1723    ///
1724    /// * `config` - Optional callback configuration (timeout, heartbeat)
1725    ///
1726    /// # Returns
1727    ///
1728    /// A `Callback<T>` handle that can be used to wait for the result.
1729    ///
1730    /// # Example
1731    ///
1732    /// ```rust,ignore
1733    /// let callback = ctx.create_callback::<ApprovalResponse>(None).await?;
1734    ///
1735    /// // Share callback.callback_id with external system
1736    /// notify_approver(&callback.callback_id).await?;
1737    ///
1738    /// // Wait for the callback result
1739    /// let approval = callback.result().await?;
1740    /// ```
1741    ///
1742    /// # Requirements
1743    ///
1744    /// - 1.3: THE DurableContext SHALL provide a `create_callback` method that returns a Callback with a unique callback_id
1745    /// - 5.1: THE SDK SHALL provide a `DurableResult<T>` type alias for `Result<T, DurableError>`
1746    pub async fn create_callback<T>(
1747        &self,
1748        config: Option<crate::config::CallbackConfig>,
1749    ) -> DurableResult<crate::handlers::Callback<T>>
1750    where
1751        T: serde::Serialize + serde::de::DeserializeOwned,
1752    {
1753        let op_id = self.next_operation_identifier(None);
1754        let config = config.unwrap_or_default();
1755
1756        let logger = self.logger.read().unwrap().clone();
1757        let result = crate::handlers::callback_handler(&self.state, &op_id, &config, &logger).await;
1758
1759        // Track replay after completion
1760        if result.is_ok() {
1761            self.state.track_replay(&op_id.operation_id).await;
1762        }
1763
1764        result
1765    }
1766
1767    /// Creates a named callback and returns a handle to wait for the result.
1768    ///
1769    /// Same as `create_callback`, but allows specifying a human-readable name.
1770    ///
1771    /// # Requirements
1772    ///
1773    /// - 5.1: THE SDK SHALL provide a `DurableResult<T>` type alias for `Result<T, DurableError>`
1774    pub async fn create_callback_named<T>(
1775        &self,
1776        name: &str,
1777        config: Option<crate::config::CallbackConfig>,
1778    ) -> DurableResult<crate::handlers::Callback<T>>
1779    where
1780        T: serde::Serialize + serde::de::DeserializeOwned,
1781    {
1782        let op_id = self.next_operation_identifier(Some(name.to_string()));
1783        let config = config.unwrap_or_default();
1784
1785        let logger = self.logger.read().unwrap().clone();
1786        let result = crate::handlers::callback_handler(&self.state, &op_id, &config, &logger).await;
1787
1788        // Track replay after completion
1789        if result.is_ok() {
1790            self.state.track_replay(&op_id.operation_id).await;
1791        }
1792
1793        result
1794    }
1795
1796    /// Invokes another durable Lambda function.
1797    ///
1798    /// This method calls another Lambda function and waits for its result.
1799    /// The invocation is checkpointed, so if the workflow is interrupted,
1800    /// it will resume with the result of the invocation.
1801    ///
1802    /// # Arguments
1803    ///
1804    /// * `function_name` - The name or ARN of the Lambda function to invoke
1805    /// * `payload` - The payload to send to the function
1806    /// * `config` - Optional invoke configuration (timeout, serdes)
1807    ///
1808    /// # Returns
1809    ///
1810    /// The result from the invoked function, or an error if invocation fails.
1811    ///
1812    /// # Example
1813    ///
1814    /// ```rust,ignore
1815    /// let result: ProcessingResult = ctx.invoke(
1816    ///     "process-order-function",
1817    ///     OrderPayload { order_id: "123".to_string() },
1818    ///     None,
1819    /// ).await?;
1820    /// ```
1821    ///
1822    /// # Requirements
1823    ///
1824    /// - 1.4: THE DurableContext SHALL provide an `invoke` method that calls other durable functions
1825    /// - 5.1: THE SDK SHALL provide a `DurableResult<T>` type alias for `Result<T, DurableError>`
1826    pub async fn invoke<P, R>(
1827        &self,
1828        function_name: &str,
1829        payload: P,
1830        config: Option<crate::config::InvokeConfig<P, R>>,
1831    ) -> DurableResult<R>
1832    where
1833        P: serde::Serialize + serde::de::DeserializeOwned + Send,
1834        R: serde::Serialize + serde::de::DeserializeOwned + Send,
1835    {
1836        let op_id = self.next_operation_identifier(Some(format!("invoke:{}", function_name)));
1837        let config = config.unwrap_or_default();
1838
1839        let logger = self.logger.read().unwrap().clone();
1840        let result = crate::handlers::invoke_handler(
1841            function_name,
1842            payload,
1843            &self.state,
1844            &op_id,
1845            &config,
1846            &logger,
1847        )
1848        .await;
1849
1850        // Track replay after completion
1851        if result.is_ok() {
1852            self.state.track_replay(&op_id.operation_id).await;
1853        }
1854
1855        result
1856    }
1857
1858    /// Processes a collection in parallel with configurable concurrency.
1859    ///
1860    /// Map operations execute a function for each item in the collection,
1861    /// with configurable concurrency limits and failure tolerance.
1862    ///
1863    /// # Arguments
1864    ///
1865    /// * `items` - The collection of items to process
1866    /// * `func` - The function to apply to each item
1867    /// * `config` - Optional map configuration (concurrency, completion criteria)
1868    ///
1869    /// # Returns
1870    ///
1871    /// A `BatchResult<U>` containing results for all items.
1872    ///
1873    /// # Example
1874    ///
1875    /// ```rust,ignore
1876    /// let results = ctx.map(
1877    ///     vec![1, 2, 3, 4, 5],
1878    ///     |child_ctx, item, index| async move {
1879    ///         Ok(item * 2)
1880    ///     },
1881    ///     Some(MapConfig {
1882    ///         max_concurrency: Some(3),
1883    ///         ..Default::default()
1884    ///     }),
1885    /// ).await?;
1886    /// ```
1887    ///
1888    /// # Requirements
1889    ///
1890    /// - 1.5: THE DurableContext SHALL provide a `map` method that processes a collection in parallel with configurable concurrency
1891    /// - 5.1: THE SDK SHALL provide a `DurableResult<T>` type alias for `Result<T, DurableError>`
1892    pub async fn map<T, U, F, Fut>(
1893        &self,
1894        items: Vec<T>,
1895        func: F,
1896        config: Option<crate::config::MapConfig>,
1897    ) -> DurableResult<crate::concurrency::BatchResult<U>>
1898    where
1899        T: serde::Serialize + serde::de::DeserializeOwned + Send + Sync + Clone + 'static,
1900        U: serde::Serialize + serde::de::DeserializeOwned + Send + 'static,
1901        F: Fn(DurableContext, T, usize) -> Fut + Send + Sync + Clone + 'static,
1902        Fut: std::future::Future<Output = DurableResult<U>> + Send + 'static,
1903    {
1904        let op_id = self.next_operation_identifier(Some("map".to_string()));
1905        let config = config.unwrap_or_default();
1906
1907        let logger = self.logger.read().unwrap().clone();
1908        let result =
1909            crate::handlers::map_handler(items, func, &self.state, &op_id, self, &config, &logger)
1910                .await;
1911
1912        // Track replay after completion
1913        if result.is_ok() {
1914            self.state.track_replay(&op_id.operation_id).await;
1915        }
1916
1917        result
1918    }
1919
1920    /// Executes multiple operations in parallel.
1921    ///
1922    /// Parallel operations execute multiple independent functions concurrently,
1923    /// with configurable concurrency limits and completion criteria.
1924    ///
1925    /// # Arguments
1926    ///
1927    /// * `branches` - The list of functions to execute in parallel
1928    /// * `config` - Optional parallel configuration (concurrency, completion criteria)
1929    ///
1930    /// # Returns
1931    ///
1932    /// A `BatchResult<T>` containing results for all branches.
1933    ///
1934    /// # Example
1935    ///
1936    /// ```rust,ignore
1937    /// let results = ctx.parallel(
1938    ///     vec![
1939    ///         |ctx| async move { Ok(fetch_data_a(&ctx).await?) },
1940    ///         |ctx| async move { Ok(fetch_data_b(&ctx).await?) },
1941    ///         |ctx| async move { Ok(fetch_data_c(&ctx).await?) },
1942    ///     ],
1943    ///     None,
1944    /// ).await?;
1945    /// ```
1946    ///
1947    /// # Requirements
1948    ///
1949    /// - 1.6: THE DurableContext SHALL provide a `parallel` method that executes multiple closures concurrently
1950    /// - 5.1: THE SDK SHALL provide a `DurableResult<T>` type alias for `Result<T, DurableError>`
1951    pub async fn parallel<T, F, Fut>(
1952        &self,
1953        branches: Vec<F>,
1954        config: Option<crate::config::ParallelConfig>,
1955    ) -> DurableResult<crate::concurrency::BatchResult<T>>
1956    where
1957        T: serde::Serialize + serde::de::DeserializeOwned + Send + 'static,
1958        F: FnOnce(DurableContext) -> Fut + Send + 'static,
1959        Fut: std::future::Future<Output = DurableResult<T>> + Send + 'static,
1960    {
1961        let op_id = self.next_operation_identifier(Some("parallel".to_string()));
1962        let config = config.unwrap_or_default();
1963
1964        let logger = self.logger.read().unwrap().clone();
1965        let result = crate::handlers::parallel_handler(
1966            branches,
1967            &self.state,
1968            &op_id,
1969            self,
1970            &config,
1971            &logger,
1972        )
1973        .await;
1974
1975        // Track replay after completion
1976        if result.is_ok() {
1977            self.state.track_replay(&op_id.operation_id).await;
1978        }
1979
1980        result
1981    }
1982
1983    /// Executes a function in a child context.
1984    ///
1985    /// Child contexts provide isolation for nested workflows. Operations in
1986    /// a child context are tracked separately and can be checkpointed as a unit.
1987    ///
1988    /// # Arguments
1989    ///
1990    /// * `func` - The function to execute in the child context
1991    /// * `config` - Optional child context configuration
1992    ///
1993    /// # Returns
1994    ///
1995    /// The result of the child function, or an error if execution fails.
1996    ///
1997    /// # Example
1998    ///
1999    /// ```rust,ignore
2000    /// let result = ctx.run_in_child_context(|child_ctx| async move {
2001    ///     let step1 = child_ctx.step(|_| Ok(1), None).await?;
2002    ///     let step2 = child_ctx.step(|_| Ok(2), None).await?;
2003    ///     Ok(step1 + step2)
2004    /// }, None).await?;
2005    /// ```
2006    ///
2007    /// # Requirements
2008    ///
2009    /// - 1.7: THE DurableContext SHALL provide a `run_in_child_context` method that creates isolated nested workflows
2010    /// - 5.1: THE SDK SHALL provide a `DurableResult<T>` type alias for `Result<T, DurableError>`
2011    pub async fn run_in_child_context<T, F, Fut>(
2012        &self,
2013        func: F,
2014        config: Option<crate::config::ChildConfig>,
2015    ) -> DurableResult<T>
2016    where
2017        T: serde::Serialize + serde::de::DeserializeOwned + Send,
2018        F: FnOnce(DurableContext) -> Fut + Send,
2019        Fut: std::future::Future<Output = DurableResult<T>> + Send,
2020    {
2021        let op_id = self.next_operation_identifier(Some("child_context".to_string()));
2022        let config = config.unwrap_or_default();
2023
2024        let logger = self.logger.read().unwrap().clone();
2025        let result =
2026            crate::handlers::child_handler(func, &self.state, &op_id, self, &config, &logger).await;
2027
2028        // Track replay after completion
2029        if result.is_ok() {
2030            self.state.track_replay(&op_id.operation_id).await;
2031        }
2032
2033        result
2034    }
2035
2036    /// Executes a named function in a child context.
2037    ///
2038    /// Same as `run_in_child_context`, but allows specifying a human-readable name.
2039    ///
2040    /// # Requirements
2041    ///
2042    /// - 5.1: THE SDK SHALL provide a `DurableResult<T>` type alias for `Result<T, DurableError>`
2043    pub async fn run_in_child_context_named<T, F, Fut>(
2044        &self,
2045        name: &str,
2046        func: F,
2047        config: Option<crate::config::ChildConfig>,
2048    ) -> DurableResult<T>
2049    where
2050        T: serde::Serialize + serde::de::DeserializeOwned + Send,
2051        F: FnOnce(DurableContext) -> Fut + Send,
2052        Fut: std::future::Future<Output = DurableResult<T>> + Send,
2053    {
2054        let op_id = self.next_operation_identifier(Some(name.to_string()));
2055        let config = config.unwrap_or_default();
2056
2057        let logger = self.logger.read().unwrap().clone();
2058        let result =
2059            crate::handlers::child_handler(func, &self.state, &op_id, self, &config, &logger).await;
2060
2061        // Track replay after completion
2062        if result.is_ok() {
2063            self.state.track_replay(&op_id.operation_id).await;
2064        }
2065
2066        result
2067    }
2068
2069    /// Polls until a condition is met.
2070    ///
2071    /// This method repeatedly checks a condition until it returns a successful
2072    /// result. Between checks, it waits for a configurable duration using the
2073    /// RETRY mechanism with NextAttemptDelaySeconds.
2074    ///
2075    /// # Implementation
2076    ///
2077    /// This method is implemented as a single STEP operation with RETRY mechanism,
2078    /// which is more efficient than using multiple steps and waits. The state is
2079    /// passed as Payload on retry (not Error), and the attempt number is tracked
2080    /// in StepDetails.Attempt.
2081    ///
2082    /// # Arguments
2083    ///
2084    /// * `check` - The function to check the condition
2085    /// * `config` - Configuration for the wait (interval, max attempts, timeout)
2086    ///
2087    /// # Returns
2088    ///
2089    /// The result when the condition is met, or an error if timeout/max attempts exceeded.
2090    ///
2091    /// # Example
2092    ///
2093    /// ```rust,ignore
2094    /// let result = ctx.wait_for_condition(
2095    ///     |state, ctx| {
2096    ///         // Check if order is ready
2097    ///         let status = check_order_status(&state.order_id)?;
2098    ///         if status == "ready" {
2099    ///             Ok(OrderReady { order_id: state.order_id.clone() })
2100    ///         } else {
2101    ///             Err("Order not ready yet".into())
2102    ///         }
2103    ///     },
2104    ///     WaitForConditionConfig::from_interval(
2105    ///         OrderState { order_id: "123".to_string() },
2106    ///         Duration::from_seconds(5),
2107    ///         Some(10),
2108    ///     ),
2109    /// ).await?;
2110    /// ```
2111    ///
2112    /// # Requirements
2113    ///
2114    /// - 1.8: THE DurableContext SHALL provide a `wait_for_condition` method that polls until a condition is met
2115    /// - 4.9: THE Step_Operation SHALL support RETRY action with Payload for wait-for-condition pattern
2116    /// - 5.1: THE SDK SHALL provide a `DurableResult<T>` type alias for `Result<T, DurableError>`
2117    pub async fn wait_for_condition<T, S, F>(
2118        &self,
2119        check: F,
2120        config: WaitForConditionConfig<S>,
2121    ) -> DurableResult<T>
2122    where
2123        T: serde::Serialize + serde::de::DeserializeOwned + Send,
2124        S: serde::Serialize + serde::de::DeserializeOwned + Clone + Send + Sync,
2125        F: Fn(&S, &WaitForConditionContext) -> Result<T, Box<dyn std::error::Error + Send + Sync>>
2126            + Send
2127            + Sync,
2128    {
2129        let op_id = self.next_operation_identifier(Some("wait_for_condition".to_string()));
2130
2131        let logger = self.logger.read().unwrap().clone();
2132        // Use the new wait_for_condition_handler which implements the single STEP with RETRY pattern
2133        // This is more efficient than the previous child context + multiple steps approach
2134        let result = crate::handlers::wait_for_condition_handler(
2135            check,
2136            config,
2137            &self.state,
2138            &op_id,
2139            &logger,
2140        )
2141        .await;
2142
2143        // Track replay after completion (only if not suspended)
2144        if result.is_ok() {
2145            self.state.track_replay(&op_id.operation_id).await;
2146        }
2147
2148        result
2149    }
2150
2151    /// Creates a callback and waits for the result with a submitter function.
2152    ///
2153    /// This is a convenience method that combines callback creation with
2154    /// a submitter function that sends the callback ID to an external system.
2155    /// The submitter execution is checkpointed within a child context to ensure
2156    /// replay safety - the submitter will not be re-executed during replay.
2157    ///
2158    /// # Arguments
2159    ///
2160    /// * `submitter` - Function that receives the callback ID and submits it to external system
2161    /// * `config` - Optional callback configuration (timeout, heartbeat)
2162    ///
2163    /// # Returns
2164    ///
2165    /// The callback result from the external system.
2166    ///
2167    /// # Example
2168    ///
2169    /// ```rust,ignore
2170    /// let approval = ctx.wait_for_callback(
2171    ///     |callback_id| async move {
2172    ///         // Send callback ID to approval system
2173    ///         send_approval_request(&callback_id, &request).await
2174    ///     },
2175    ///     Some(CallbackConfig {
2176    ///         timeout: Duration::from_hours(24),
2177    ///         ..Default::default()
2178    ///     }),
2179    /// ).await?;
2180    /// ```
2181    ///
2182    /// # Requirements
2183    ///
2184    /// - 1.1: WHEN wait_for_callback is called, THE SDK SHALL create a callback and execute the submitter function within a durable step
2185    /// - 1.2: THE wait_for_callback method SHALL checkpoint the submitter execution to ensure replay safety
2186    /// - 1.3: IF the submitter function fails, THEN THE SDK SHALL propagate the error with appropriate context
2187    /// - 1.4: THE wait_for_callback method SHALL support configurable callback timeout and heartbeat timeout
2188    /// - 1.9: THE DurableContext SHALL provide a `wait_for_callback` method that combines callback creation with a submitter function
2189    /// - 5.1: THE SDK SHALL provide a `DurableResult<T>` type alias for `Result<T, DurableError>`
2190    pub async fn wait_for_callback<T, F, Fut>(
2191        &self,
2192        submitter: F,
2193        config: Option<crate::config::CallbackConfig>,
2194    ) -> DurableResult<T>
2195    where
2196        T: serde::Serialize + serde::de::DeserializeOwned + Send + Sync,
2197        F: FnOnce(String) -> Fut + Send + 'static,
2198        Fut: std::future::Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>>
2199            + Send
2200            + 'static,
2201    {
2202        // Create the callback first with the provided configuration (Requirement 1.4)
2203        let callback: crate::handlers::Callback<T> = self.create_callback(config).await?;
2204        let callback_id = callback.callback_id.clone();
2205
2206        // Generate operation ID for the child context that will execute the submitter
2207        let op_id = self.next_operation_identifier(Some("wait_for_callback_submitter".to_string()));
2208
2209        // Execute the submitter within a child context for proper checkpointing (Requirements 1.1, 1.2)
2210        // The child context ensures the submitter execution is tracked and not re-executed during replay
2211        let child_config = crate::config::ChildConfig::default();
2212
2213        let logger = self.logger.read().unwrap().clone();
2214        crate::handlers::child_handler(
2215            |child_ctx| {
2216                let callback_id = callback_id.clone();
2217                async move {
2218                    // Use a step to checkpoint that we're about to execute the submitter
2219                    // This step returns () on success, indicating submission completed
2220                    child_ctx
2221                        .step_named(
2222                            "execute_submitter",
2223                            move |_| {
2224                                // The step just marks that we're executing the submitter
2225                                // The actual async submitter call happens after this checkpoint
2226                                Ok(())
2227                            },
2228                            None,
2229                        )
2230                        .await?;
2231
2232                    // Now execute the actual submitter
2233                    // If we're replaying and the step above succeeded, we won't reach here
2234                    // because the child context will return the checkpointed result
2235                    submitter(callback_id).await.map_err(|e| {
2236                        crate::error::DurableError::UserCode {
2237                            message: e.to_string(),
2238                            error_type: "SubmitterError".to_string(),
2239                            stack_trace: None,
2240                        }
2241                    })?;
2242
2243                    Ok(())
2244                }
2245            },
2246            &self.state,
2247            &op_id,
2248            self,
2249            &child_config,
2250            &logger,
2251        )
2252        .await?;
2253
2254        // Track replay after child context completion
2255        self.state.track_replay(&op_id.operation_id).await;
2256
2257        // Wait for the callback result
2258        callback.result().await
2259    }
2260
2261    // ========================================================================
2262    // Promise Combinators
2263    // ========================================================================
2264
2265    /// Waits for all futures to complete successfully.
2266    ///
2267    /// Returns all results if all futures succeed, or returns the first error encountered.
2268    /// This is implemented within a STEP operation for durability.
2269    ///
2270    /// # Arguments
2271    ///
2272    /// * `futures` - Vector of futures to execute
2273    ///
2274    /// # Returns
2275    ///
2276    /// `Ok(Vec<T>)` if all futures succeed, or `Err` with the first error.
2277    ///
2278    /// # Example
2279    ///
2280    /// ```rust,ignore
2281    /// let results = ctx.all(vec![
2282    ///     ctx.step(|_| Ok(1), None),
2283    ///     ctx.step(|_| Ok(2), None),
2284    ///     ctx.step(|_| Ok(3), None),
2285    /// ]).await?;
2286    /// assert_eq!(results, vec![1, 2, 3]);
2287    /// ```
2288    ///
2289    /// # Requirements
2290    ///
2291    /// - 20.1: Wait for all promises to complete successfully, return error on first failure
2292    /// - 20.5: Implement within a STEP operation for durability
2293    /// - 5.1: THE SDK SHALL provide a `DurableResult<T>` type alias for `Result<T, DurableError>`
2294    pub async fn all<T, Fut>(&self, futures: Vec<Fut>) -> DurableResult<Vec<T>>
2295    where
2296        T: serde::Serialize + serde::de::DeserializeOwned + Send + Clone + 'static,
2297        Fut: std::future::Future<Output = DurableResult<T>> + Send + 'static,
2298    {
2299        let op_id = self.next_operation_identifier(Some("all".to_string()));
2300
2301        let logger = self.logger.read().unwrap().clone();
2302        let result = crate::handlers::all_handler(futures, &self.state, &op_id, &logger).await;
2303
2304        // Track replay after completion
2305        if result.is_ok() {
2306            self.state.track_replay(&op_id.operation_id).await;
2307        }
2308
2309        result
2310    }
2311
2312    /// Waits for all futures to settle (success or failure).
2313    ///
2314    /// Returns a BatchResult containing outcomes for all futures, regardless of success or failure.
2315    /// This is implemented within a STEP operation for durability.
2316    ///
2317    /// # Arguments
2318    ///
2319    /// * `futures` - Vector of futures to execute
2320    ///
2321    /// # Returns
2322    ///
2323    /// `BatchResult<T>` containing results for all futures.
2324    ///
2325    /// # Example
2326    ///
2327    /// ```rust,ignore
2328    /// let results = ctx.all_settled(vec![
2329    ///     ctx.step(|_| Ok(1), None),
2330    ///     ctx.step(|_| Err("error".into()), None),
2331    ///     ctx.step(|_| Ok(3), None),
2332    /// ]).await?;
2333    /// assert_eq!(results.success_count(), 2);
2334    /// assert_eq!(results.failure_count(), 1);
2335    /// ```
2336    ///
2337    /// # Requirements
2338    ///
2339    /// - 20.2: Wait for all promises to settle, return BatchResult with all outcomes
2340    /// - 20.5: Implement within a STEP operation for durability
2341    /// - 5.1: THE SDK SHALL provide a `DurableResult<T>` type alias for `Result<T, DurableError>`
2342    pub async fn all_settled<T, Fut>(
2343        &self,
2344        futures: Vec<Fut>,
2345    ) -> DurableResult<crate::concurrency::BatchResult<T>>
2346    where
2347        T: serde::Serialize + serde::de::DeserializeOwned + Send + Clone + 'static,
2348        Fut: std::future::Future<Output = DurableResult<T>> + Send + 'static,
2349    {
2350        let op_id = self.next_operation_identifier(Some("all_settled".to_string()));
2351
2352        let logger = self.logger.read().unwrap().clone();
2353        let result =
2354            crate::handlers::all_settled_handler(futures, &self.state, &op_id, &logger).await;
2355
2356        // Track replay after completion
2357        if result.is_ok() {
2358            self.state.track_replay(&op_id.operation_id).await;
2359        }
2360
2361        result
2362    }
2363
2364    /// Returns the result of the first future to settle.
2365    ///
2366    /// Returns the result (success or failure) of whichever future completes first.
2367    /// This is implemented within a STEP operation for durability.
2368    ///
2369    /// # Arguments
2370    ///
2371    /// * `futures` - Vector of futures to execute
2372    ///
2373    /// # Returns
2374    ///
2375    /// The result of the first future to settle.
2376    ///
2377    /// # Example
2378    ///
2379    /// ```rust,ignore
2380    /// let result = ctx.race(vec![
2381    ///     ctx.step(|_| Ok(1), None),
2382    ///     ctx.step(|_| Ok(2), None),
2383    /// ]).await?;
2384    /// // result is either 1 or 2, whichever completed first
2385    /// ```
2386    ///
2387    /// # Requirements
2388    ///
2389    /// - 20.3: Return result of first promise to settle
2390    /// - 20.5: Implement within a STEP operation for durability
2391    /// - 5.1: THE SDK SHALL provide a `DurableResult<T>` type alias for `Result<T, DurableError>`
2392    pub async fn race<T, Fut>(&self, futures: Vec<Fut>) -> DurableResult<T>
2393    where
2394        T: serde::Serialize + serde::de::DeserializeOwned + Send + Clone + 'static,
2395        Fut: std::future::Future<Output = DurableResult<T>> + Send + 'static,
2396    {
2397        let op_id = self.next_operation_identifier(Some("race".to_string()));
2398
2399        let logger = self.logger.read().unwrap().clone();
2400        let result = crate::handlers::race_handler(futures, &self.state, &op_id, &logger).await;
2401
2402        // Track replay after completion
2403        if result.is_ok() {
2404            self.state.track_replay(&op_id.operation_id).await;
2405        }
2406
2407        result
2408    }
2409
2410    /// Returns the result of the first future to succeed.
2411    ///
2412    /// Returns the result of the first future to succeed. If all futures fail,
2413    /// returns an error containing all the failures.
2414    /// This is implemented within a STEP operation for durability.
2415    ///
2416    /// # Arguments
2417    ///
2418    /// * `futures` - Vector of futures to execute
2419    ///
2420    /// # Returns
2421    ///
2422    /// The result of the first future to succeed, or an error if all fail.
2423    ///
2424    /// # Example
2425    ///
2426    /// ```rust,ignore
2427    /// let result = ctx.any(vec![
2428    ///     ctx.step(|_| Err("error".into()), None),
2429    ///     ctx.step(|_| Ok(2), None),
2430    ///     ctx.step(|_| Ok(3), None),
2431    /// ]).await?;
2432    /// // result is either 2 or 3, whichever succeeded first
2433    /// ```
2434    ///
2435    /// # Requirements
2436    ///
2437    /// - 20.4: Return result of first promise to succeed, return error only if all fail
2438    /// - 20.5: Implement within a STEP operation for durability
2439    /// - 5.1: THE SDK SHALL provide a `DurableResult<T>` type alias for `Result<T, DurableError>`
2440    pub async fn any<T, Fut>(&self, futures: Vec<Fut>) -> DurableResult<T>
2441    where
2442        T: serde::Serialize + serde::de::DeserializeOwned + Send + Clone + 'static,
2443        Fut: std::future::Future<Output = DurableResult<T>> + Send + 'static,
2444    {
2445        let op_id = self.next_operation_identifier(Some("any".to_string()));
2446
2447        let logger = self.logger.read().unwrap().clone();
2448        let result = crate::handlers::any_handler(futures, &self.state, &op_id, &logger).await;
2449
2450        // Track replay after completion
2451        if result.is_ok() {
2452            self.state.track_replay(&op_id.operation_id).await;
2453        }
2454
2455        result
2456    }
2457}
2458
2459/// Configuration for wait_for_condition operations.
2460///
2461/// # Requirements
2462///
2463/// - 4.7: wait_for_condition uses wait_strategy to determine polling delay
2464/// - 4.8: Backward-compatible constructor converts interval + max_attempts to a WaitStrategy
2465#[allow(clippy::type_complexity)]
2466pub struct WaitForConditionConfig<S> {
2467    /// Initial state to pass to the check function.
2468    pub initial_state: S,
2469    /// Wait strategy function that determines polling behavior.
2470    ///
2471    /// Takes a reference to the current state and the attempt number (1-indexed),
2472    /// and returns a [`WaitDecision`](crate::config::WaitDecision).
2473    pub wait_strategy: Box<dyn Fn(&S, usize) -> crate::config::WaitDecision + Send + Sync>,
2474    /// Overall timeout for the operation.
2475    pub timeout: Option<crate::duration::Duration>,
2476    /// Optional custom serializer/deserializer.
2477    pub serdes: Option<std::sync::Arc<dyn crate::config::SerDesAny>>,
2478}
2479
2480impl<S> std::fmt::Debug for WaitForConditionConfig<S>
2481where
2482    S: std::fmt::Debug,
2483{
2484    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2485        f.debug_struct("WaitForConditionConfig")
2486            .field("initial_state", &self.initial_state)
2487            .field("wait_strategy", &"<fn>")
2488            .field("timeout", &self.timeout)
2489            .field("serdes", &self.serdes.is_some())
2490            .finish()
2491    }
2492}
2493
2494impl<S> WaitForConditionConfig<S> {
2495    /// Creates a backward-compatible `WaitForConditionConfig` from interval and max_attempts.
2496    ///
2497    /// This constructor converts the old-style `interval` + `max_attempts` parameters
2498    /// into a wait strategy that uses a fixed delay (no backoff, no jitter).
2499    ///
2500    /// # Arguments
2501    ///
2502    /// * `initial_state` - Initial state to pass to the check function.
2503    /// * `interval` - Fixed interval between condition checks.
2504    /// * `max_attempts` - Maximum number of attempts (`None` for unlimited).
2505    ///
2506    /// # Example
2507    ///
2508    /// ```rust,ignore
2509    /// use durable_execution_sdk::{WaitForConditionConfig, Duration};
2510    ///
2511    /// let config = WaitForConditionConfig::from_interval(
2512    ///     my_initial_state,
2513    ///     Duration::from_seconds(5),
2514    ///     Some(10),
2515    /// );
2516    /// ```
2517    ///
2518    /// # Requirements
2519    ///
2520    /// - 4.8: Backward-compatible constructor converts interval + max_attempts to a WaitStrategy
2521    pub fn from_interval(
2522        initial_state: S,
2523        interval: crate::duration::Duration,
2524        max_attempts: Option<usize>,
2525    ) -> Self
2526    where
2527        S: Send + Sync + 'static,
2528    {
2529        let interval_secs = interval.to_seconds();
2530        let max = max_attempts.unwrap_or(usize::MAX);
2531
2532        Self {
2533            initial_state,
2534            wait_strategy: Box::new(move |_state: &S, attempts_made: usize| {
2535                // In the backward-compatible mode, the check function's Ok/Err
2536                // determines whether polling is done. The strategy only provides
2537                // the delay and enforces max_attempts.
2538                // Return Done when max_attempts exceeded (handler will checkpoint failure
2539                // if check returned Err, or success if check returned Ok).
2540                if attempts_made >= max {
2541                    return crate::config::WaitDecision::Done;
2542                }
2543                crate::config::WaitDecision::Continue {
2544                    delay: crate::duration::Duration::from_seconds(interval_secs),
2545                }
2546            }),
2547            timeout: None,
2548            serdes: None,
2549        }
2550    }
2551}
2552
2553impl<S: Default + Send + Sync + 'static> Default for WaitForConditionConfig<S> {
2554    fn default() -> Self {
2555        Self::from_interval(
2556            S::default(),
2557            crate::duration::Duration::from_seconds(5),
2558            None,
2559        )
2560    }
2561}
2562
2563/// Context provided to wait_for_condition check functions.
2564#[derive(Debug, Clone)]
2565pub struct WaitForConditionContext {
2566    /// Current attempt number (1-indexed).
2567    pub attempt: usize,
2568    /// Maximum number of attempts (None for unlimited).
2569    pub max_attempts: Option<usize>,
2570}
2571
2572impl Clone for DurableContext {
2573    fn clone(&self) -> Self {
2574        Self {
2575            state: self.state.clone(),
2576            lambda_context: self.lambda_context.clone(),
2577            parent_id: self.parent_id.clone(),
2578            id_generator: self.id_generator.clone(),
2579            logger: self.logger.clone(),
2580        }
2581    }
2582}
2583
2584impl std::fmt::Debug for DurableContext {
2585    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2586        f.debug_struct("DurableContext")
2587            .field("durable_execution_arn", &self.durable_execution_arn())
2588            .field("parent_id", &self.parent_id)
2589            .field("step_counter", &self.current_step_counter())
2590            .finish_non_exhaustive()
2591    }
2592}
2593
2594// Add hex encoding dependency
2595mod hex {
2596    const HEX_CHARS: &[u8; 16] = b"0123456789abcdef";
2597
2598    pub fn encode(bytes: &[u8]) -> String {
2599        let mut result = String::with_capacity(bytes.len() * 2);
2600        for &byte in bytes {
2601            result.push(HEX_CHARS[(byte >> 4) as usize] as char);
2602            result.push(HEX_CHARS[(byte & 0x0f) as usize] as char);
2603        }
2604        result
2605    }
2606}
2607
2608#[cfg(test)]
2609mod tests {
2610    use super::*;
2611
2612    #[test]
2613    fn test_operation_identifier_new() {
2614        let id = OperationIdentifier::new(
2615            "op-123",
2616            Some("parent-456".to_string()),
2617            Some("my-step".to_string()),
2618        );
2619        assert_eq!(id.operation_id, "op-123");
2620        assert_eq!(id.parent_id, Some("parent-456".to_string()));
2621        assert_eq!(id.name, Some("my-step".to_string()));
2622    }
2623
2624    #[test]
2625    fn test_operation_identifier_root() {
2626        let id = OperationIdentifier::root("op-123");
2627        assert_eq!(id.operation_id, "op-123");
2628        assert!(id.parent_id.is_none());
2629        assert!(id.name.is_none());
2630    }
2631
2632    #[test]
2633    fn test_operation_identifier_with_parent() {
2634        let id = OperationIdentifier::with_parent("op-123", "parent-456");
2635        assert_eq!(id.operation_id, "op-123");
2636        assert_eq!(id.parent_id, Some("parent-456".to_string()));
2637        assert!(id.name.is_none());
2638    }
2639
2640    #[test]
2641    fn test_operation_identifier_with_name() {
2642        let id = OperationIdentifier::root("op-123").with_name("my-step");
2643        assert_eq!(id.name, Some("my-step".to_string()));
2644    }
2645
2646    #[test]
2647    fn test_operation_identifier_display() {
2648        let id = OperationIdentifier::root("op-123");
2649        assert_eq!(format!("{}", id), "op-123");
2650
2651        let id_with_name = OperationIdentifier::root("op-123").with_name("my-step");
2652        assert_eq!(format!("{}", id_with_name), "my-step(op-123)");
2653    }
2654
2655    #[test]
2656    fn test_generate_operation_id_deterministic() {
2657        let id1 = generate_operation_id("base-123", 0);
2658        let id2 = generate_operation_id("base-123", 0);
2659        assert_eq!(id1, id2);
2660    }
2661
2662    #[test]
2663    fn test_generate_operation_id_different_counters() {
2664        let id1 = generate_operation_id("base-123", 0);
2665        let id2 = generate_operation_id("base-123", 1);
2666        assert_ne!(id1, id2);
2667    }
2668
2669    #[test]
2670    fn test_generate_operation_id_different_bases() {
2671        let id1 = generate_operation_id("base-123", 0);
2672        let id2 = generate_operation_id("base-456", 0);
2673        assert_ne!(id1, id2);
2674    }
2675
2676    #[test]
2677    fn test_generate_operation_id_length() {
2678        let id = generate_operation_id("base-123", 0);
2679        assert_eq!(id.len(), 32); // 16 bytes = 32 hex chars
2680    }
2681
2682    #[test]
2683    fn test_operation_id_generator_new() {
2684        let gen = OperationIdGenerator::new("base-123");
2685        assert_eq!(gen.base_id(), "base-123");
2686        assert_eq!(gen.current_counter(), 0);
2687    }
2688
2689    #[test]
2690    fn test_operation_id_generator_with_counter() {
2691        let gen = OperationIdGenerator::with_counter("base-123", 10);
2692        assert_eq!(gen.current_counter(), 10);
2693    }
2694
2695    #[test]
2696    fn test_operation_id_generator_next_id() {
2697        let gen = OperationIdGenerator::new("base-123");
2698
2699        let id1 = gen.next_id();
2700        assert_eq!(gen.current_counter(), 1);
2701
2702        let id2 = gen.next_id();
2703        assert_eq!(gen.current_counter(), 2);
2704
2705        assert_ne!(id1, id2);
2706    }
2707
2708    #[test]
2709    fn test_operation_id_generator_id_for_counter() {
2710        let gen = OperationIdGenerator::new("base-123");
2711
2712        let id_0 = gen.id_for_counter(0);
2713        let id_1 = gen.id_for_counter(1);
2714
2715        // id_for_counter should not increment the counter
2716        assert_eq!(gen.current_counter(), 0);
2717
2718        // Should match what next_id would produce
2719        let next = gen.next_id();
2720        assert_eq!(next, id_0);
2721
2722        let next = gen.next_id();
2723        assert_eq!(next, id_1);
2724    }
2725
2726    #[test]
2727    fn test_operation_id_generator_create_child() {
2728        let gen = OperationIdGenerator::new("base-123");
2729        gen.next_id(); // Increment parent counter
2730
2731        let child = gen.create_child("child-op-id");
2732        assert_eq!(child.base_id(), "child-op-id");
2733        assert_eq!(child.current_counter(), 0);
2734    }
2735
2736    #[test]
2737    fn test_operation_id_generator_clone() {
2738        let gen = OperationIdGenerator::new("base-123");
2739        gen.next_id();
2740        gen.next_id();
2741
2742        let cloned = gen.clone();
2743        assert_eq!(cloned.base_id(), gen.base_id());
2744        assert_eq!(cloned.current_counter(), gen.current_counter());
2745    }
2746
2747    #[test]
2748    fn test_operation_id_generator_thread_safety() {
2749        use std::thread;
2750
2751        let gen = Arc::new(OperationIdGenerator::new("base-123"));
2752        let mut handles = vec![];
2753
2754        for _ in 0..10 {
2755            let gen_clone = gen.clone();
2756            handles.push(thread::spawn(move || {
2757                let mut ids = vec![];
2758                for _ in 0..100 {
2759                    ids.push(gen_clone.next_id());
2760                }
2761                ids
2762            }));
2763        }
2764
2765        let mut all_ids = vec![];
2766        for handle in handles {
2767            all_ids.extend(handle.join().unwrap());
2768        }
2769
2770        // All IDs should be unique
2771        let unique_count = {
2772            let mut set = std::collections::HashSet::new();
2773            for id in &all_ids {
2774                set.insert(id.clone());
2775            }
2776            set.len()
2777        };
2778
2779        assert_eq!(unique_count, 1000);
2780        assert_eq!(gen.current_counter(), 1000);
2781    }
2782
2783    #[test]
2784    fn test_log_info_new() {
2785        let info = LogInfo::new("arn:test");
2786        assert_eq!(info.durable_execution_arn, Some("arn:test".to_string()));
2787        assert!(info.operation_id.is_none());
2788        assert!(info.parent_id.is_none());
2789    }
2790
2791    #[test]
2792    fn test_log_info_with_operation_id() {
2793        let info = LogInfo::new("arn:test").with_operation_id("op-123");
2794        assert_eq!(info.operation_id, Some("op-123".to_string()));
2795    }
2796
2797    #[test]
2798    fn test_log_info_with_parent_id() {
2799        let info = LogInfo::new("arn:test").with_parent_id("parent-456");
2800        assert_eq!(info.parent_id, Some("parent-456".to_string()));
2801    }
2802
2803    #[test]
2804    fn test_log_info_with_extra() {
2805        let info = LogInfo::new("arn:test")
2806            .with_extra("key1", "value1")
2807            .with_extra("key2", "value2");
2808        assert_eq!(info.extra.len(), 2);
2809        assert_eq!(info.extra[0], ("key1".to_string(), "value1".to_string()));
2810    }
2811
2812    #[test]
2813    fn test_hex_encode() {
2814        assert_eq!(hex::encode(&[0x00]), "00");
2815        assert_eq!(hex::encode(&[0xff]), "ff");
2816        assert_eq!(hex::encode(&[0x12, 0x34, 0xab, 0xcd]), "1234abcd");
2817    }
2818}
2819
2820#[cfg(test)]
2821mod durable_context_tests {
2822    use super::*;
2823    use crate::client::SharedDurableServiceClient;
2824    use crate::lambda::InitialExecutionState;
2825    use std::sync::Arc;
2826
2827    #[cfg(test)]
2828    fn create_mock_client() -> SharedDurableServiceClient {
2829        use crate::client::MockDurableServiceClient;
2830        Arc::new(MockDurableServiceClient::new())
2831    }
2832
2833    fn create_test_state() -> Arc<ExecutionState> {
2834        let client = create_mock_client();
2835        Arc::new(ExecutionState::new(
2836            "arn:aws:lambda:us-east-1:123456789012:function:test:durable:abc123",
2837            "token-123",
2838            InitialExecutionState::new(),
2839            client,
2840        ))
2841    }
2842
2843    #[test]
2844    fn test_durable_context_new() {
2845        let state = create_test_state();
2846        let ctx = DurableContext::new(state.clone());
2847
2848        assert_eq!(
2849            ctx.durable_execution_arn(),
2850            "arn:aws:lambda:us-east-1:123456789012:function:test:durable:abc123"
2851        );
2852        assert!(ctx.parent_id().is_none());
2853        assert!(ctx.lambda_context().is_none());
2854        assert_eq!(ctx.current_step_counter(), 0);
2855    }
2856
2857    #[test]
2858    fn test_durable_context_next_operation_id() {
2859        let state = create_test_state();
2860        let ctx = DurableContext::new(state);
2861
2862        let id1 = ctx.next_operation_id();
2863        let id2 = ctx.next_operation_id();
2864
2865        assert_ne!(id1, id2);
2866        assert_eq!(ctx.current_step_counter(), 2);
2867    }
2868
2869    #[test]
2870    fn test_durable_context_next_operation_identifier() {
2871        let state = create_test_state();
2872        let ctx = DurableContext::new(state);
2873
2874        let op_id = ctx.next_operation_identifier(Some("my-step".to_string()));
2875
2876        assert!(op_id.parent_id.is_none());
2877        assert_eq!(op_id.name, Some("my-step".to_string()));
2878        assert!(!op_id.operation_id.is_empty());
2879    }
2880
2881    #[test]
2882    fn test_durable_context_create_child_context() {
2883        let state = create_test_state();
2884        let ctx = DurableContext::new(state);
2885
2886        // Generate a parent operation ID
2887        let parent_op_id = ctx.next_operation_id();
2888
2889        // Create child context
2890        let child_ctx = ctx.create_child_context(&parent_op_id);
2891
2892        assert_eq!(child_ctx.parent_id(), Some(parent_op_id.as_str()));
2893        assert_eq!(child_ctx.current_step_counter(), 0);
2894        assert_eq!(
2895            child_ctx.durable_execution_arn(),
2896            ctx.durable_execution_arn()
2897        );
2898    }
2899
2900    #[test]
2901    fn test_durable_context_child_generates_different_ids() {
2902        let state = create_test_state();
2903        let ctx = DurableContext::new(state);
2904
2905        let parent_op_id = ctx.next_operation_id();
2906        let child_ctx = ctx.create_child_context(&parent_op_id);
2907
2908        // Child should generate different IDs than parent
2909        let child_id = child_ctx.next_operation_id();
2910        let parent_id = ctx.next_operation_id();
2911
2912        assert_ne!(child_id, parent_id);
2913    }
2914
2915    #[test]
2916    fn test_durable_context_child_operation_identifier_has_parent() {
2917        let state = create_test_state();
2918        let ctx = DurableContext::new(state);
2919
2920        let parent_op_id = ctx.next_operation_id();
2921        let child_ctx = ctx.create_child_context(&parent_op_id);
2922
2923        let child_op_id = child_ctx.next_operation_identifier(None);
2924
2925        assert_eq!(child_op_id.parent_id, Some(parent_op_id));
2926    }
2927
2928    #[test]
2929    fn test_durable_context_set_logger() {
2930        let state = create_test_state();
2931        let mut ctx = DurableContext::new(state);
2932
2933        // Create a custom logger
2934        let custom_logger: Arc<dyn Logger> = Arc::new(TracingLogger);
2935        ctx.set_logger(custom_logger);
2936
2937        // Just verify it doesn't panic - the logger is set
2938        let _ = ctx.logger();
2939    }
2940
2941    #[test]
2942    fn test_durable_context_with_logger() {
2943        let state = create_test_state();
2944        let ctx = DurableContext::new(state);
2945
2946        let custom_logger: Arc<dyn Logger> = Arc::new(TracingLogger);
2947        let ctx_with_logger = ctx.with_logger(custom_logger);
2948
2949        // Just verify it doesn't panic - the logger is set
2950        let _ = ctx_with_logger.logger();
2951    }
2952
2953    #[test]
2954    fn test_durable_context_create_log_info() {
2955        let state = create_test_state();
2956        let ctx = DurableContext::new(state);
2957
2958        let info = ctx.create_log_info();
2959
2960        assert_eq!(
2961            info.durable_execution_arn,
2962            Some("arn:aws:lambda:us-east-1:123456789012:function:test:durable:abc123".to_string())
2963        );
2964        assert!(info.parent_id.is_none());
2965    }
2966
2967    #[test]
2968    fn test_durable_context_create_log_info_with_parent() {
2969        let state = create_test_state();
2970        let ctx = DurableContext::new(state);
2971
2972        let parent_op_id = ctx.next_operation_id();
2973        let child_ctx = ctx.create_child_context(&parent_op_id);
2974
2975        let info = child_ctx.create_log_info();
2976
2977        assert_eq!(info.parent_id, Some(parent_op_id));
2978    }
2979
2980    #[test]
2981    fn test_durable_context_create_log_info_with_operation() {
2982        let state = create_test_state();
2983        let ctx = DurableContext::new(state);
2984
2985        let info = ctx.create_log_info_with_operation("op-123");
2986
2987        assert_eq!(info.operation_id, Some("op-123".to_string()));
2988    }
2989
2990    #[test]
2991    fn test_log_info_with_replay() {
2992        let info = LogInfo::new("arn:test")
2993            .with_operation_id("op-123")
2994            .with_replay(true);
2995
2996        assert!(info.is_replay);
2997        assert_eq!(info.operation_id, Some("op-123".to_string()));
2998    }
2999
3000    #[test]
3001    fn test_log_info_default_not_replay() {
3002        let info = LogInfo::default();
3003        assert!(!info.is_replay);
3004    }
3005
3006    #[test]
3007    fn test_replay_logging_config_default() {
3008        let config = ReplayLoggingConfig::default();
3009        assert_eq!(config, ReplayLoggingConfig::SuppressAll);
3010    }
3011
3012    #[test]
3013    fn test_replay_aware_logger_suppress_all() {
3014        use std::sync::atomic::{AtomicUsize, Ordering};
3015
3016        // Create counters for each log level
3017        let debug_count = Arc::new(AtomicUsize::new(0));
3018        let info_count = Arc::new(AtomicUsize::new(0));
3019        let warn_count = Arc::new(AtomicUsize::new(0));
3020        let error_count = Arc::new(AtomicUsize::new(0));
3021
3022        let inner = Arc::new(custom_logger(
3023            {
3024                let count = debug_count.clone();
3025                move |_, _| {
3026                    count.fetch_add(1, Ordering::SeqCst);
3027                }
3028            },
3029            {
3030                let count = info_count.clone();
3031                move |_, _| {
3032                    count.fetch_add(1, Ordering::SeqCst);
3033                }
3034            },
3035            {
3036                let count = warn_count.clone();
3037                move |_, _| {
3038                    count.fetch_add(1, Ordering::SeqCst);
3039                }
3040            },
3041            {
3042                let count = error_count.clone();
3043                move |_, _| {
3044                    count.fetch_add(1, Ordering::SeqCst);
3045                }
3046            },
3047        ));
3048
3049        let logger = ReplayAwareLogger::new(inner, ReplayLoggingConfig::SuppressAll);
3050
3051        // Non-replay logs should pass through
3052        let non_replay_info = LogInfo::new("arn:test").with_replay(false);
3053        logger.debug("test", &non_replay_info);
3054        logger.info("test", &non_replay_info);
3055        logger.warn("test", &non_replay_info);
3056        logger.error("test", &non_replay_info);
3057
3058        assert_eq!(debug_count.load(Ordering::SeqCst), 1);
3059        assert_eq!(info_count.load(Ordering::SeqCst), 1);
3060        assert_eq!(warn_count.load(Ordering::SeqCst), 1);
3061        assert_eq!(error_count.load(Ordering::SeqCst), 1);
3062
3063        // Replay logs should be suppressed
3064        let replay_info = LogInfo::new("arn:test").with_replay(true);
3065        logger.debug("test", &replay_info);
3066        logger.info("test", &replay_info);
3067        logger.warn("test", &replay_info);
3068        logger.error("test", &replay_info);
3069
3070        // Counts should not have increased
3071        assert_eq!(debug_count.load(Ordering::SeqCst), 1);
3072        assert_eq!(info_count.load(Ordering::SeqCst), 1);
3073        assert_eq!(warn_count.load(Ordering::SeqCst), 1);
3074        assert_eq!(error_count.load(Ordering::SeqCst), 1);
3075    }
3076
3077    #[test]
3078    fn test_replay_aware_logger_allow_all() {
3079        use std::sync::atomic::{AtomicUsize, Ordering};
3080
3081        let call_count = Arc::new(AtomicUsize::new(0));
3082
3083        let inner = Arc::new(custom_logger(
3084            {
3085                let count = call_count.clone();
3086                move |_, _| {
3087                    count.fetch_add(1, Ordering::SeqCst);
3088                }
3089            },
3090            {
3091                let count = call_count.clone();
3092                move |_, _| {
3093                    count.fetch_add(1, Ordering::SeqCst);
3094                }
3095            },
3096            {
3097                let count = call_count.clone();
3098                move |_, _| {
3099                    count.fetch_add(1, Ordering::SeqCst);
3100                }
3101            },
3102            {
3103                let count = call_count.clone();
3104                move |_, _| {
3105                    count.fetch_add(1, Ordering::SeqCst);
3106                }
3107            },
3108        ));
3109
3110        let logger = ReplayAwareLogger::allow_all(inner);
3111
3112        // All logs should pass through even during replay
3113        let replay_info = LogInfo::new("arn:test").with_replay(true);
3114        logger.debug("test", &replay_info);
3115        logger.info("test", &replay_info);
3116        logger.warn("test", &replay_info);
3117        logger.error("test", &replay_info);
3118
3119        assert_eq!(call_count.load(Ordering::SeqCst), 4);
3120    }
3121
3122    #[test]
3123    fn test_replay_aware_logger_errors_only() {
3124        use std::sync::atomic::{AtomicUsize, Ordering};
3125
3126        let debug_count = Arc::new(AtomicUsize::new(0));
3127        let info_count = Arc::new(AtomicUsize::new(0));
3128        let warn_count = Arc::new(AtomicUsize::new(0));
3129        let error_count = Arc::new(AtomicUsize::new(0));
3130
3131        let inner = Arc::new(custom_logger(
3132            {
3133                let count = debug_count.clone();
3134                move |_, _| {
3135                    count.fetch_add(1, Ordering::SeqCst);
3136                }
3137            },
3138            {
3139                let count = info_count.clone();
3140                move |_, _| {
3141                    count.fetch_add(1, Ordering::SeqCst);
3142                }
3143            },
3144            {
3145                let count = warn_count.clone();
3146                move |_, _| {
3147                    count.fetch_add(1, Ordering::SeqCst);
3148                }
3149            },
3150            {
3151                let count = error_count.clone();
3152                move |_, _| {
3153                    count.fetch_add(1, Ordering::SeqCst);
3154                }
3155            },
3156        ));
3157
3158        let logger = ReplayAwareLogger::new(inner, ReplayLoggingConfig::ErrorsOnly);
3159
3160        // During replay, only errors should pass through
3161        let replay_info = LogInfo::new("arn:test").with_replay(true);
3162        logger.debug("test", &replay_info);
3163        logger.info("test", &replay_info);
3164        logger.warn("test", &replay_info);
3165        logger.error("test", &replay_info);
3166
3167        assert_eq!(debug_count.load(Ordering::SeqCst), 0);
3168        assert_eq!(info_count.load(Ordering::SeqCst), 0);
3169        assert_eq!(warn_count.load(Ordering::SeqCst), 0);
3170        assert_eq!(error_count.load(Ordering::SeqCst), 1);
3171    }
3172
3173    #[test]
3174    fn test_replay_aware_logger_warnings_and_errors() {
3175        use std::sync::atomic::{AtomicUsize, Ordering};
3176
3177        let debug_count = Arc::new(AtomicUsize::new(0));
3178        let info_count = Arc::new(AtomicUsize::new(0));
3179        let warn_count = Arc::new(AtomicUsize::new(0));
3180        let error_count = Arc::new(AtomicUsize::new(0));
3181
3182        let inner = Arc::new(custom_logger(
3183            {
3184                let count = debug_count.clone();
3185                move |_, _| {
3186                    count.fetch_add(1, Ordering::SeqCst);
3187                }
3188            },
3189            {
3190                let count = info_count.clone();
3191                move |_, _| {
3192                    count.fetch_add(1, Ordering::SeqCst);
3193                }
3194            },
3195            {
3196                let count = warn_count.clone();
3197                move |_, _| {
3198                    count.fetch_add(1, Ordering::SeqCst);
3199                }
3200            },
3201            {
3202                let count = error_count.clone();
3203                move |_, _| {
3204                    count.fetch_add(1, Ordering::SeqCst);
3205                }
3206            },
3207        ));
3208
3209        let logger = ReplayAwareLogger::new(inner, ReplayLoggingConfig::WarningsAndErrors);
3210
3211        // During replay, only warnings and errors should pass through
3212        let replay_info = LogInfo::new("arn:test").with_replay(true);
3213        logger.debug("test", &replay_info);
3214        logger.info("test", &replay_info);
3215        logger.warn("test", &replay_info);
3216        logger.error("test", &replay_info);
3217
3218        assert_eq!(debug_count.load(Ordering::SeqCst), 0);
3219        assert_eq!(info_count.load(Ordering::SeqCst), 0);
3220        assert_eq!(warn_count.load(Ordering::SeqCst), 1);
3221        assert_eq!(error_count.load(Ordering::SeqCst), 1);
3222    }
3223
3224    #[test]
3225    fn test_replay_aware_logger_suppress_replay_constructor() {
3226        let inner: Arc<dyn Logger> = Arc::new(TracingLogger);
3227        let logger = ReplayAwareLogger::suppress_replay(inner);
3228
3229        assert_eq!(logger.config(), ReplayLoggingConfig::SuppressAll);
3230    }
3231
3232    #[test]
3233    fn test_replay_aware_logger_debug() {
3234        let inner: Arc<dyn Logger> = Arc::new(TracingLogger);
3235        let logger = ReplayAwareLogger::new(inner, ReplayLoggingConfig::SuppressAll);
3236
3237        let debug_str = format!("{:?}", logger);
3238        assert!(debug_str.contains("ReplayAwareLogger"));
3239        assert!(debug_str.contains("SuppressAll"));
3240    }
3241
3242    #[test]
3243    fn test_durable_context_create_log_info_with_replay_method() {
3244        let state = create_test_state();
3245        let ctx = DurableContext::new(state);
3246
3247        let info = ctx.create_log_info_with_replay("op-123", true);
3248
3249        assert_eq!(info.operation_id, Some("op-123".to_string()));
3250        assert!(info.is_replay);
3251    }
3252
3253    #[test]
3254    fn test_durable_context_clone() {
3255        let state = create_test_state();
3256        let ctx = DurableContext::new(state);
3257
3258        ctx.next_operation_id();
3259        ctx.next_operation_id();
3260
3261        let cloned = ctx.clone();
3262
3263        assert_eq!(cloned.durable_execution_arn(), ctx.durable_execution_arn());
3264        assert_eq!(cloned.current_step_counter(), ctx.current_step_counter());
3265    }
3266
3267    #[test]
3268    fn test_durable_context_debug() {
3269        let state = create_test_state();
3270        let ctx = DurableContext::new(state);
3271
3272        let debug_str = format!("{:?}", ctx);
3273
3274        assert!(debug_str.contains("DurableContext"));
3275        assert!(debug_str.contains("durable_execution_arn"));
3276    }
3277
3278    #[test]
3279    fn test_durable_context_state_access() {
3280        let state = create_test_state();
3281        let ctx = DurableContext::new(state.clone());
3282
3283        // Verify we can access the state
3284        assert!(Arc::ptr_eq(&ctx.state(), &state));
3285    }
3286
3287    #[test]
3288    fn test_durable_context_send_sync() {
3289        // This test verifies at compile time that DurableContext is Send + Sync
3290        fn assert_send_sync<T: Send + Sync>() {}
3291        assert_send_sync::<DurableContext>();
3292    }
3293
3294    // ========================================================================
3295    // Simplified Logging API Tests
3296    // ========================================================================
3297
3298    #[test]
3299    fn test_log_info_method() {
3300        use std::sync::atomic::{AtomicUsize, Ordering};
3301        use std::sync::Mutex;
3302
3303        let info_count = Arc::new(AtomicUsize::new(0));
3304        let captured_info = Arc::new(Mutex::new(None::<LogInfo>));
3305
3306        let captured_info_clone = captured_info.clone();
3307        let inner = Arc::new(custom_logger(
3308            |_, _| {},
3309            {
3310                let count = info_count.clone();
3311                move |_, info: &LogInfo| {
3312                    count.fetch_add(1, Ordering::SeqCst);
3313                    *captured_info_clone.lock().unwrap() = Some(info.clone());
3314                }
3315            },
3316            |_, _| {},
3317            |_, _| {},
3318        ));
3319
3320        let state = create_test_state();
3321        let ctx = DurableContext::new(state).with_logger(inner);
3322
3323        ctx.log_info("Test message");
3324
3325        assert_eq!(info_count.load(Ordering::SeqCst), 1);
3326
3327        let captured = captured_info.lock().unwrap();
3328        let info = captured.as_ref().unwrap();
3329        assert_eq!(
3330            info.durable_execution_arn,
3331            Some("arn:aws:lambda:us-east-1:123456789012:function:test:durable:abc123".to_string())
3332        );
3333    }
3334
3335    #[test]
3336    fn test_log_debug_method() {
3337        use std::sync::atomic::{AtomicUsize, Ordering};
3338
3339        let debug_count = Arc::new(AtomicUsize::new(0));
3340
3341        let inner = Arc::new(custom_logger(
3342            {
3343                let count = debug_count.clone();
3344                move |_, _| {
3345                    count.fetch_add(1, Ordering::SeqCst);
3346                }
3347            },
3348            |_, _| {},
3349            |_, _| {},
3350            |_, _| {},
3351        ));
3352
3353        let state = create_test_state();
3354        let ctx = DurableContext::new(state).with_logger(inner);
3355
3356        ctx.log_debug("Debug message");
3357
3358        assert_eq!(debug_count.load(Ordering::SeqCst), 1);
3359    }
3360
3361    #[test]
3362    fn test_log_warn_method() {
3363        use std::sync::atomic::{AtomicUsize, Ordering};
3364
3365        let warn_count = Arc::new(AtomicUsize::new(0));
3366
3367        let inner = Arc::new(custom_logger(
3368            |_, _| {},
3369            |_, _| {},
3370            {
3371                let count = warn_count.clone();
3372                move |_, _| {
3373                    count.fetch_add(1, Ordering::SeqCst);
3374                }
3375            },
3376            |_, _| {},
3377        ));
3378
3379        let state = create_test_state();
3380        let ctx = DurableContext::new(state).with_logger(inner);
3381
3382        ctx.log_warn("Warning message");
3383
3384        assert_eq!(warn_count.load(Ordering::SeqCst), 1);
3385    }
3386
3387    #[test]
3388    fn test_log_error_method() {
3389        use std::sync::atomic::{AtomicUsize, Ordering};
3390
3391        let error_count = Arc::new(AtomicUsize::new(0));
3392
3393        let inner = Arc::new(custom_logger(|_, _| {}, |_, _| {}, |_, _| {}, {
3394            let count = error_count.clone();
3395            move |_, _| {
3396                count.fetch_add(1, Ordering::SeqCst);
3397            }
3398        }));
3399
3400        let state = create_test_state();
3401        let ctx = DurableContext::new(state).with_logger(inner);
3402
3403        ctx.log_error("Error message");
3404
3405        assert_eq!(error_count.load(Ordering::SeqCst), 1);
3406    }
3407
3408    #[test]
3409    fn test_log_info_with_extra_fields() {
3410        use std::sync::Mutex;
3411
3412        let captured_info = Arc::new(Mutex::new(None::<LogInfo>));
3413
3414        let captured_info_clone = captured_info.clone();
3415        let inner = Arc::new(custom_logger(
3416            |_, _| {},
3417            move |_, info: &LogInfo| {
3418                *captured_info_clone.lock().unwrap() = Some(info.clone());
3419            },
3420            |_, _| {},
3421            |_, _| {},
3422        ));
3423
3424        let state = create_test_state();
3425        let ctx = DurableContext::new(state).with_logger(inner);
3426
3427        ctx.log_info_with("Test message", &[("order_id", "123"), ("amount", "99.99")]);
3428
3429        let captured = captured_info.lock().unwrap();
3430        let info = captured.as_ref().unwrap();
3431
3432        // Verify extra fields are present
3433        assert_eq!(info.extra.len(), 2);
3434        assert!(info
3435            .extra
3436            .contains(&("order_id".to_string(), "123".to_string())));
3437        assert!(info
3438            .extra
3439            .contains(&("amount".to_string(), "99.99".to_string())));
3440    }
3441
3442    #[test]
3443    fn test_log_debug_with_extra_fields() {
3444        use std::sync::Mutex;
3445
3446        let captured_info = Arc::new(Mutex::new(None::<LogInfo>));
3447
3448        let captured_info_clone = captured_info.clone();
3449        let inner = Arc::new(custom_logger(
3450            move |_, info: &LogInfo| {
3451                *captured_info_clone.lock().unwrap() = Some(info.clone());
3452            },
3453            |_, _| {},
3454            |_, _| {},
3455            |_, _| {},
3456        ));
3457
3458        let state = create_test_state();
3459        let ctx = DurableContext::new(state).with_logger(inner);
3460
3461        ctx.log_debug_with("Debug message", &[("key", "value")]);
3462
3463        let captured = captured_info.lock().unwrap();
3464        let info = captured.as_ref().unwrap();
3465
3466        assert_eq!(info.extra.len(), 1);
3467        assert!(info
3468            .extra
3469            .contains(&("key".to_string(), "value".to_string())));
3470    }
3471
3472    #[test]
3473    fn test_log_warn_with_extra_fields() {
3474        use std::sync::Mutex;
3475
3476        let captured_info = Arc::new(Mutex::new(None::<LogInfo>));
3477
3478        let captured_info_clone = captured_info.clone();
3479        let inner = Arc::new(custom_logger(
3480            |_, _| {},
3481            |_, _| {},
3482            move |_, info: &LogInfo| {
3483                *captured_info_clone.lock().unwrap() = Some(info.clone());
3484            },
3485            |_, _| {},
3486        ));
3487
3488        let state = create_test_state();
3489        let ctx = DurableContext::new(state).with_logger(inner);
3490
3491        ctx.log_warn_with("Warning message", &[("retry", "3")]);
3492
3493        let captured = captured_info.lock().unwrap();
3494        let info = captured.as_ref().unwrap();
3495
3496        assert_eq!(info.extra.len(), 1);
3497        assert!(info.extra.contains(&("retry".to_string(), "3".to_string())));
3498    }
3499
3500    #[test]
3501    fn test_log_error_with_extra_fields() {
3502        use std::sync::Mutex;
3503
3504        let captured_info = Arc::new(Mutex::new(None::<LogInfo>));
3505
3506        let captured_info_clone = captured_info.clone();
3507        let inner = Arc::new(custom_logger(
3508            |_, _| {},
3509            |_, _| {},
3510            |_, _| {},
3511            move |_, info: &LogInfo| {
3512                *captured_info_clone.lock().unwrap() = Some(info.clone());
3513            },
3514        ));
3515
3516        let state = create_test_state();
3517        let ctx = DurableContext::new(state).with_logger(inner);
3518
3519        ctx.log_error_with(
3520            "Error message",
3521            &[("error_code", "E001"), ("details", "Something went wrong")],
3522        );
3523
3524        let captured = captured_info.lock().unwrap();
3525        let info = captured.as_ref().unwrap();
3526
3527        assert_eq!(info.extra.len(), 2);
3528        assert!(info
3529            .extra
3530            .contains(&("error_code".to_string(), "E001".to_string())));
3531        assert!(info
3532            .extra
3533            .contains(&("details".to_string(), "Something went wrong".to_string())));
3534    }
3535
3536    #[test]
3537    fn test_log_methods_include_parent_id_in_child_context() {
3538        use std::sync::Mutex;
3539
3540        let captured_info = Arc::new(Mutex::new(None::<LogInfo>));
3541
3542        let captured_info_clone = captured_info.clone();
3543        let inner: Arc<dyn Logger> = Arc::new(custom_logger(
3544            |_, _| {},
3545            move |_, info: &LogInfo| {
3546                *captured_info_clone.lock().unwrap() = Some(info.clone());
3547            },
3548            |_, _| {},
3549            |_, _| {},
3550        ));
3551
3552        let state = create_test_state();
3553        let ctx = DurableContext::new(state).with_logger(inner.clone());
3554
3555        let parent_op_id = ctx.next_operation_id();
3556        let child_ctx = ctx.create_child_context(&parent_op_id).with_logger(inner);
3557
3558        child_ctx.log_info("Child context message");
3559
3560        let captured = captured_info.lock().unwrap();
3561        let info = captured.as_ref().unwrap();
3562
3563        // Verify parent_id is included
3564        assert_eq!(info.parent_id, Some(parent_op_id));
3565    }
3566
3567    // ========================================================================
3568    // Runtime Logger Reconfiguration Tests (Req 13.1, 13.2)
3569    // ========================================================================
3570
3571    #[test]
3572    fn test_configure_logger_swaps_logger() {
3573        // Req 13.1: configure_logger swaps the logger, subsequent calls use new logger
3574        use std::sync::atomic::{AtomicUsize, Ordering};
3575
3576        let original_count = Arc::new(AtomicUsize::new(0));
3577        let new_count = Arc::new(AtomicUsize::new(0));
3578
3579        let original_logger: Arc<dyn Logger> = Arc::new(custom_logger(
3580            |_, _| {},
3581            {
3582                let count = original_count.clone();
3583                move |_, _| {
3584                    count.fetch_add(1, Ordering::SeqCst);
3585                }
3586            },
3587            |_, _| {},
3588            |_, _| {},
3589        ));
3590
3591        let new_logger: Arc<dyn Logger> = Arc::new(custom_logger(
3592            |_, _| {},
3593            {
3594                let count = new_count.clone();
3595                move |_, _| {
3596                    count.fetch_add(1, Ordering::SeqCst);
3597                }
3598            },
3599            |_, _| {},
3600            |_, _| {},
3601        ));
3602
3603        let state = create_test_state();
3604        let ctx = DurableContext::new(state).with_logger(original_logger);
3605
3606        // Log with original logger
3607        ctx.log_info("before swap");
3608        assert_eq!(original_count.load(Ordering::SeqCst), 1);
3609        assert_eq!(new_count.load(Ordering::SeqCst), 0);
3610
3611        // Swap logger at runtime (note: &self, not &mut self)
3612        ctx.configure_logger(new_logger);
3613
3614        // Subsequent calls use the new logger
3615        ctx.log_info("after swap");
3616        assert_eq!(original_count.load(Ordering::SeqCst), 1); // unchanged
3617        assert_eq!(new_count.load(Ordering::SeqCst), 1);
3618    }
3619
3620    #[test]
3621    fn test_original_logger_used_when_configure_logger_not_called() {
3622        // Req 13.2: Original logger used when configure_logger not called
3623        use std::sync::atomic::{AtomicUsize, Ordering};
3624
3625        let original_count = Arc::new(AtomicUsize::new(0));
3626
3627        let original_logger: Arc<dyn Logger> = Arc::new(custom_logger(
3628            |_, _| {},
3629            {
3630                let count = original_count.clone();
3631                move |_, _| {
3632                    count.fetch_add(1, Ordering::SeqCst);
3633                }
3634            },
3635            |_, _| {},
3636            |_, _| {},
3637        ));
3638
3639        let state = create_test_state();
3640        let ctx = DurableContext::new(state).with_logger(original_logger);
3641
3642        // Log multiple times without calling configure_logger
3643        ctx.log_info("message 1");
3644        ctx.log_info("message 2");
3645        ctx.log_info("message 3");
3646
3647        assert_eq!(original_count.load(Ordering::SeqCst), 3);
3648    }
3649
3650    #[test]
3651    fn test_configure_logger_affects_child_contexts() {
3652        // Verify that child contexts sharing the same RwLock see the swapped logger
3653        use std::sync::atomic::{AtomicUsize, Ordering};
3654
3655        let original_count = Arc::new(AtomicUsize::new(0));
3656        let new_count = Arc::new(AtomicUsize::new(0));
3657
3658        let original_logger: Arc<dyn Logger> = Arc::new(custom_logger(
3659            |_, _| {},
3660            {
3661                let count = original_count.clone();
3662                move |_, _| {
3663                    count.fetch_add(1, Ordering::SeqCst);
3664                }
3665            },
3666            |_, _| {},
3667            |_, _| {},
3668        ));
3669
3670        let new_logger: Arc<dyn Logger> = Arc::new(custom_logger(
3671            |_, _| {},
3672            {
3673                let count = new_count.clone();
3674                move |_, _| {
3675                    count.fetch_add(1, Ordering::SeqCst);
3676                }
3677            },
3678            |_, _| {},
3679            |_, _| {},
3680        ));
3681
3682        let state = create_test_state();
3683        let ctx = DurableContext::new(state).with_logger(original_logger);
3684        let parent_op_id = ctx.next_operation_id();
3685        let child_ctx = ctx.create_child_context(&parent_op_id);
3686
3687        // Child uses original logger
3688        child_ctx.log_info("child before swap");
3689        assert_eq!(original_count.load(Ordering::SeqCst), 1);
3690
3691        // Swap on parent
3692        ctx.configure_logger(new_logger);
3693
3694        // Child now uses the new logger (shared RwLock)
3695        child_ctx.log_info("child after swap");
3696        assert_eq!(new_count.load(Ordering::SeqCst), 1);
3697        assert_eq!(original_count.load(Ordering::SeqCst), 1); // unchanged
3698    }
3699}
3700
3701#[cfg(test)]
3702mod property_tests {
3703    use super::*;
3704    use proptest::prelude::*;
3705
3706    // Property 3: Operation ID Determinism
3707    // *For any* DurableContext with a given parent_id and step counter state,
3708    // calling create_operation_id() SHALL produce the same ID when called
3709    // in the same sequence position across multiple executions.
3710    // **Validates: Requirements 1.10**
3711    proptest! {
3712        #![proptest_config(ProptestConfig::with_cases(100))]
3713
3714        /// Feature: durable-execution-rust-sdk, Property 3: Operation ID Determinism
3715        /// Validates: Requirements 1.10
3716        #[test]
3717        fn prop_operation_id_determinism(
3718            base_id in "[a-zA-Z0-9:/-]{1,100}",
3719            counter in 0u64..10000u64,
3720        ) {
3721            // Generate the same ID twice with the same inputs
3722            let id1 = generate_operation_id(&base_id, counter);
3723            let id2 = generate_operation_id(&base_id, counter);
3724
3725            // IDs must be identical for the same inputs
3726            prop_assert_eq!(&id1, &id2, "Same base_id and counter must produce identical IDs");
3727
3728            // ID must be a valid hex string of expected length
3729            prop_assert_eq!(id1.len(), 32, "ID must be 32 hex characters");
3730            prop_assert!(id1.chars().all(|c| c.is_ascii_hexdigit()), "ID must be valid hex");
3731        }
3732
3733        /// Feature: durable-execution-rust-sdk, Property 3: Operation ID Determinism (Generator)
3734        /// Validates: Requirements 1.10
3735        #[test]
3736        fn prop_operation_id_generator_determinism(
3737            base_id in "[a-zA-Z0-9:/-]{1,100}",
3738            num_ids in 1usize..50usize,
3739        ) {
3740            // Create two generators with the same base ID
3741            let gen1 = OperationIdGenerator::new(&base_id);
3742            let gen2 = OperationIdGenerator::new(&base_id);
3743
3744            // Generate the same sequence of IDs from both
3745            let ids1: Vec<String> = (0..num_ids).map(|_| gen1.next_id()).collect();
3746            let ids2: Vec<String> = (0..num_ids).map(|_| gen2.next_id()).collect();
3747
3748            // Sequences must be identical
3749            prop_assert_eq!(&ids1, &ids2, "Same generator sequence must produce identical IDs");
3750
3751            // All IDs in a sequence must be unique
3752            let unique_count = {
3753                let mut set = std::collections::HashSet::new();
3754                for id in &ids1 {
3755                    set.insert(id.clone());
3756                }
3757                set.len()
3758            };
3759            prop_assert_eq!(unique_count, num_ids, "All IDs in sequence must be unique");
3760        }
3761
3762        /// Feature: durable-execution-rust-sdk, Property 3: Operation ID Determinism (Replay Simulation)
3763        /// Validates: Requirements 1.10
3764        #[test]
3765        fn prop_operation_id_replay_determinism(
3766            base_id in "[a-zA-Z0-9:/-]{1,100}",
3767            operations in prop::collection::vec(0u32..3u32, 1..20),
3768        ) {
3769            // Simulate two executions with the same sequence of operations
3770            // Each operation type increments the counter
3771
3772            let gen1 = OperationIdGenerator::new(&base_id);
3773            let gen2 = OperationIdGenerator::new(&base_id);
3774
3775            let mut ids1 = Vec::new();
3776            let mut ids2 = Vec::new();
3777
3778            // First "execution"
3779            for op_type in &operations {
3780                // Each operation generates an ID
3781                ids1.push(gen1.next_id());
3782
3783                // Some operations might generate additional child IDs
3784                if *op_type == 2 {
3785                    let parent_id = ids1.last().unwrap().clone();
3786                    let child_gen = gen1.create_child(parent_id);
3787                    ids1.push(child_gen.next_id());
3788                }
3789            }
3790
3791            // Second "execution" (replay) - must produce same IDs
3792            for op_type in &operations {
3793                ids2.push(gen2.next_id());
3794
3795                if *op_type == 2 {
3796                    let parent_id = ids2.last().unwrap().clone();
3797                    let child_gen = gen2.create_child(parent_id);
3798                    ids2.push(child_gen.next_id());
3799                }
3800            }
3801
3802            // Both executions must produce identical ID sequences
3803            prop_assert_eq!(&ids1, &ids2, "Replay must produce identical operation IDs");
3804        }
3805    }
3806
3807    // Property 5: Concurrent ID Generation Uniqueness
3808    // *For any* number of concurrent tasks generating operation IDs from the same
3809    // DurableContext, all generated IDs SHALL be unique.
3810    // **Validates: Requirements 17.3**
3811    mod concurrent_id_tests {
3812        use super::*;
3813        use std::sync::Arc;
3814        use std::thread;
3815
3816        proptest! {
3817            #![proptest_config(ProptestConfig::with_cases(100))]
3818
3819            /// Feature: durable-execution-rust-sdk, Property 5: Concurrent ID Generation Uniqueness
3820            /// Validates: Requirements 17.3
3821            #[test]
3822            fn prop_concurrent_id_uniqueness(
3823                base_id in "[a-zA-Z0-9:/-]{1,100}",
3824                num_threads in 2usize..10usize,
3825                ids_per_thread in 10usize..100usize,
3826            ) {
3827                let gen = Arc::new(OperationIdGenerator::new(&base_id));
3828                let mut handles = vec![];
3829
3830                // Spawn multiple threads that concurrently generate IDs
3831                for _ in 0..num_threads {
3832                    let gen_clone = gen.clone();
3833                    let count = ids_per_thread;
3834                    handles.push(thread::spawn(move || {
3835                        let mut ids = Vec::with_capacity(count);
3836                        for _ in 0..count {
3837                            ids.push(gen_clone.next_id());
3838                        }
3839                        ids
3840                    }));
3841                }
3842
3843                // Collect all IDs from all threads
3844                let mut all_ids = Vec::new();
3845                for handle in handles {
3846                    all_ids.extend(handle.join().unwrap());
3847                }
3848
3849                let total_expected = num_threads * ids_per_thread;
3850
3851                // Verify we got the expected number of IDs
3852                prop_assert_eq!(all_ids.len(), total_expected, "Should have generated {} IDs", total_expected);
3853
3854                // Verify all IDs are unique
3855                let unique_count = {
3856                    let mut set = std::collections::HashSet::new();
3857                    for id in &all_ids {
3858                        set.insert(id.clone());
3859                    }
3860                    set.len()
3861                };
3862
3863                prop_assert_eq!(
3864                    unique_count,
3865                    total_expected,
3866                    "All {} IDs must be unique, but only {} were unique",
3867                    total_expected,
3868                    unique_count
3869                );
3870
3871                // Verify the counter was incremented correctly
3872                prop_assert_eq!(
3873                    gen.current_counter() as usize,
3874                    total_expected,
3875                    "Counter should equal total IDs generated"
3876                );
3877            }
3878
3879            /// Feature: durable-execution-rust-sdk, Property 5: Concurrent ID Generation Uniqueness (Stress)
3880            /// Validates: Requirements 17.3
3881            #[test]
3882            fn prop_concurrent_id_uniqueness_stress(
3883                base_id in "[a-zA-Z0-9:/-]{1,50}",
3884            ) {
3885                // Fixed high-concurrency stress test
3886                let num_threads = 20;
3887                let ids_per_thread = 500;
3888
3889                let gen = Arc::new(OperationIdGenerator::new(&base_id));
3890                let mut handles = vec![];
3891
3892                for _ in 0..num_threads {
3893                    let gen_clone = gen.clone();
3894                    handles.push(thread::spawn(move || {
3895                        let mut ids = Vec::with_capacity(ids_per_thread);
3896                        for _ in 0..ids_per_thread {
3897                            ids.push(gen_clone.next_id());
3898                        }
3899                        ids
3900                    }));
3901                }
3902
3903                let mut all_ids = Vec::new();
3904                for handle in handles {
3905                    all_ids.extend(handle.join().unwrap());
3906                }
3907
3908                let total_expected = num_threads * ids_per_thread;
3909
3910                // Verify all IDs are unique
3911                let unique_count = {
3912                    let mut set = std::collections::HashSet::new();
3913                    for id in &all_ids {
3914                        set.insert(id.clone());
3915                    }
3916                    set.len()
3917                };
3918
3919                prop_assert_eq!(
3920                    unique_count,
3921                    total_expected,
3922                    "All {} IDs must be unique under high concurrency",
3923                    total_expected
3924                );
3925            }
3926        }
3927    }
3928
3929    // Property 9: Logging Methods Automatic Context
3930    // *For any* call to log_info, log_debug, log_warn, or log_error on DurableContext,
3931    // the resulting log message SHALL include durable_execution_arn and parent_id
3932    // (when available) without the caller needing to specify them.
3933    // **Validates: Requirements 4.5**
3934    mod logging_automatic_context_tests {
3935        use super::*;
3936        use std::sync::{Arc, Mutex};
3937
3938        proptest! {
3939            #![proptest_config(ProptestConfig::with_cases(100))]
3940
3941            /// Feature: sdk-ergonomics-improvements, Property 9: Logging Methods Automatic Context
3942            /// Validates: Requirements 4.5
3943            #[test]
3944            fn prop_logging_automatic_context(
3945                message in "[a-zA-Z0-9 ]{1,100}",
3946                log_level in 0u8..4u8,
3947            ) {
3948                use crate::client::MockDurableServiceClient;
3949                use crate::lambda::InitialExecutionState;
3950
3951                let captured_info = Arc::new(Mutex::new(None::<LogInfo>));
3952                let captured_info_clone = captured_info.clone();
3953
3954                // Create a logger that captures the LogInfo
3955                let inner = Arc::new(custom_logger(
3956                    {
3957                        let captured = captured_info_clone.clone();
3958                        move |_, info: &LogInfo| {
3959                            *captured.lock().unwrap() = Some(info.clone());
3960                        }
3961                    },
3962                    {
3963                        let captured = captured_info_clone.clone();
3964                        move |_, info: &LogInfo| {
3965                            *captured.lock().unwrap() = Some(info.clone());
3966                        }
3967                    },
3968                    {
3969                        let captured = captured_info_clone.clone();
3970                        move |_, info: &LogInfo| {
3971                            *captured.lock().unwrap() = Some(info.clone());
3972                        }
3973                    },
3974                    {
3975                        let captured = captured_info_clone.clone();
3976                        move |_, info: &LogInfo| {
3977                            *captured.lock().unwrap() = Some(info.clone());
3978                        }
3979                    },
3980                ));
3981
3982                let client: crate::client::SharedDurableServiceClient = Arc::new(MockDurableServiceClient::new());
3983                let state = Arc::new(ExecutionState::new(
3984                    "arn:aws:lambda:us-east-1:123456789012:function:test:durable:abc123",
3985                    "token-123",
3986                    InitialExecutionState::new(),
3987                    client,
3988                ));
3989                let ctx = DurableContext::new(state).with_logger(inner);
3990
3991                // Call the appropriate log method based on log_level
3992                match log_level {
3993                    0 => ctx.log_debug(&message),
3994                    1 => ctx.log_info(&message),
3995                    2 => ctx.log_warn(&message),
3996                    _ => ctx.log_error(&message),
3997                }
3998
3999                // Verify the captured LogInfo has the automatic context
4000                let captured = captured_info.lock().unwrap();
4001                let info = captured.as_ref().expect("LogInfo should be captured");
4002
4003                // durable_execution_arn must always be present
4004                prop_assert!(
4005                    info.durable_execution_arn.is_some(),
4006                    "durable_execution_arn must be automatically included"
4007                );
4008                prop_assert_eq!(
4009                    info.durable_execution_arn.as_ref().unwrap(),
4010                    "arn:aws:lambda:us-east-1:123456789012:function:test:durable:abc123",
4011                    "durable_execution_arn must match the context's ARN"
4012                );
4013            }
4014
4015            /// Feature: sdk-ergonomics-improvements, Property 9: Logging Methods Automatic Context (Child Context)
4016            /// Validates: Requirements 4.5
4017            #[test]
4018            fn prop_logging_automatic_context_child(
4019                message in "[a-zA-Z0-9 ]{1,100}",
4020                log_level in 0u8..4u8,
4021            ) {
4022                use crate::client::MockDurableServiceClient;
4023                use crate::lambda::InitialExecutionState;
4024
4025                let captured_info = Arc::new(Mutex::new(None::<LogInfo>));
4026                let captured_info_clone = captured_info.clone();
4027
4028                // Create a logger that captures the LogInfo
4029                let inner: Arc<dyn Logger> = Arc::new(custom_logger(
4030                    {
4031                        let captured = captured_info_clone.clone();
4032                        move |_, info: &LogInfo| {
4033                            *captured.lock().unwrap() = Some(info.clone());
4034                        }
4035                    },
4036                    {
4037                        let captured = captured_info_clone.clone();
4038                        move |_, info: &LogInfo| {
4039                            *captured.lock().unwrap() = Some(info.clone());
4040                        }
4041                    },
4042                    {
4043                        let captured = captured_info_clone.clone();
4044                        move |_, info: &LogInfo| {
4045                            *captured.lock().unwrap() = Some(info.clone());
4046                        }
4047                    },
4048                    {
4049                        let captured = captured_info_clone.clone();
4050                        move |_, info: &LogInfo| {
4051                            *captured.lock().unwrap() = Some(info.clone());
4052                        }
4053                    },
4054                ));
4055
4056                let client: crate::client::SharedDurableServiceClient = Arc::new(MockDurableServiceClient::new());
4057                let state = Arc::new(ExecutionState::new(
4058                    "arn:aws:lambda:us-east-1:123456789012:function:test:durable:abc123",
4059                    "token-123",
4060                    InitialExecutionState::new(),
4061                    client,
4062                ));
4063                let ctx = DurableContext::new(state).with_logger(inner.clone());
4064
4065                // Create a child context
4066                let parent_op_id = ctx.next_operation_id();
4067                let child_ctx = ctx.create_child_context(&parent_op_id).with_logger(inner);
4068
4069                // Call the appropriate log method based on log_level
4070                match log_level {
4071                    0 => child_ctx.log_debug(&message),
4072                    1 => child_ctx.log_info(&message),
4073                    2 => child_ctx.log_warn(&message),
4074                    _ => child_ctx.log_error(&message),
4075                }
4076
4077                // Verify the captured LogInfo has the automatic context including parent_id
4078                let captured = captured_info.lock().unwrap();
4079                let info = captured.as_ref().expect("LogInfo should be captured");
4080
4081                // durable_execution_arn must always be present
4082                prop_assert!(
4083                    info.durable_execution_arn.is_some(),
4084                    "durable_execution_arn must be automatically included in child context"
4085                );
4086
4087                // parent_id must be present in child context
4088                prop_assert!(
4089                    info.parent_id.is_some(),
4090                    "parent_id must be automatically included in child context"
4091                );
4092                prop_assert_eq!(
4093                    info.parent_id.as_ref().unwrap(),
4094                    &parent_op_id,
4095                    "parent_id must match the parent operation ID"
4096                );
4097            }
4098        }
4099    }
4100
4101    // Property 10: Logging Methods Extra Fields
4102    // *For any* call to log_info_with, log_debug_with, log_warn_with, or log_error_with
4103    // with extra fields, those fields SHALL appear in the resulting log output.
4104    // **Validates: Requirements 4.6**
4105    mod logging_extra_fields_tests {
4106        use super::*;
4107        use std::sync::{Arc, Mutex};
4108
4109        proptest! {
4110            #![proptest_config(ProptestConfig::with_cases(100))]
4111
4112            /// Feature: sdk-ergonomics-improvements, Property 10: Logging Methods Extra Fields
4113            /// Validates: Requirements 4.6
4114            #[test]
4115            fn prop_logging_extra_fields(
4116                message in "[a-zA-Z0-9 ]{1,100}",
4117                log_level in 0u8..4u8,
4118                key1 in "[a-zA-Z_][a-zA-Z0-9_]{0,20}",
4119                value1 in "[a-zA-Z0-9]{1,50}",
4120                key2 in "[a-zA-Z_][a-zA-Z0-9_]{0,20}",
4121                value2 in "[a-zA-Z0-9]{1,50}",
4122            ) {
4123                use crate::client::MockDurableServiceClient;
4124                use crate::lambda::InitialExecutionState;
4125
4126                let captured_info = Arc::new(Mutex::new(None::<LogInfo>));
4127                let captured_info_clone = captured_info.clone();
4128
4129                // Create a logger that captures the LogInfo
4130                let inner = Arc::new(custom_logger(
4131                    {
4132                        let captured = captured_info_clone.clone();
4133                        move |_, info: &LogInfo| {
4134                            *captured.lock().unwrap() = Some(info.clone());
4135                        }
4136                    },
4137                    {
4138                        let captured = captured_info_clone.clone();
4139                        move |_, info: &LogInfo| {
4140                            *captured.lock().unwrap() = Some(info.clone());
4141                        }
4142                    },
4143                    {
4144                        let captured = captured_info_clone.clone();
4145                        move |_, info: &LogInfo| {
4146                            *captured.lock().unwrap() = Some(info.clone());
4147                        }
4148                    },
4149                    {
4150                        let captured = captured_info_clone.clone();
4151                        move |_, info: &LogInfo| {
4152                            *captured.lock().unwrap() = Some(info.clone());
4153                        }
4154                    },
4155                ));
4156
4157                let client: crate::client::SharedDurableServiceClient = Arc::new(MockDurableServiceClient::new());
4158                let state = Arc::new(ExecutionState::new(
4159                    "arn:aws:lambda:us-east-1:123456789012:function:test:durable:abc123",
4160                    "token-123",
4161                    InitialExecutionState::new(),
4162                    client,
4163                ));
4164                let ctx = DurableContext::new(state).with_logger(inner);
4165
4166                // Create extra fields
4167                let fields: Vec<(&str, &str)> = vec![(&key1, &value1), (&key2, &value2)];
4168
4169                // Call the appropriate log method based on log_level
4170                match log_level {
4171                    0 => ctx.log_debug_with(&message, &fields),
4172                    1 => ctx.log_info_with(&message, &fields),
4173                    2 => ctx.log_warn_with(&message, &fields),
4174                    _ => ctx.log_error_with(&message, &fields),
4175                }
4176
4177                // Verify the captured LogInfo has the extra fields
4178                let captured = captured_info.lock().unwrap();
4179                let info = captured.as_ref().expect("LogInfo should be captured");
4180
4181                // Extra fields must be present
4182                prop_assert_eq!(
4183                    info.extra.len(),
4184                    2,
4185                    "Extra fields must be included in the log output"
4186                );
4187
4188                // Verify each field is present
4189                prop_assert!(
4190                    info.extra.contains(&(key1.clone(), value1.clone())),
4191                    "First extra field must be present: {}={}", key1, value1
4192                );
4193                prop_assert!(
4194                    info.extra.contains(&(key2.clone(), value2.clone())),
4195                    "Second extra field must be present: {}={}", key2, value2
4196                );
4197            }
4198
4199            /// Feature: sdk-ergonomics-improvements, Property 10: Logging Methods Extra Fields (Empty)
4200            /// Validates: Requirements 4.6
4201            #[test]
4202            fn prop_logging_extra_fields_empty(
4203                message in "[a-zA-Z0-9 ]{1,100}",
4204                log_level in 0u8..4u8,
4205            ) {
4206                use crate::client::MockDurableServiceClient;
4207                use crate::lambda::InitialExecutionState;
4208
4209                let captured_info = Arc::new(Mutex::new(None::<LogInfo>));
4210                let captured_info_clone = captured_info.clone();
4211
4212                // Create a logger that captures the LogInfo
4213                let inner = Arc::new(custom_logger(
4214                    {
4215                        let captured = captured_info_clone.clone();
4216                        move |_, info: &LogInfo| {
4217                            *captured.lock().unwrap() = Some(info.clone());
4218                        }
4219                    },
4220                    {
4221                        let captured = captured_info_clone.clone();
4222                        move |_, info: &LogInfo| {
4223                            *captured.lock().unwrap() = Some(info.clone());
4224                        }
4225                    },
4226                    {
4227                        let captured = captured_info_clone.clone();
4228                        move |_, info: &LogInfo| {
4229                            *captured.lock().unwrap() = Some(info.clone());
4230                        }
4231                    },
4232                    {
4233                        let captured = captured_info_clone.clone();
4234                        move |_, info: &LogInfo| {
4235                            *captured.lock().unwrap() = Some(info.clone());
4236                        }
4237                    },
4238                ));
4239
4240                let client: crate::client::SharedDurableServiceClient = Arc::new(MockDurableServiceClient::new());
4241                let state = Arc::new(ExecutionState::new(
4242                    "arn:aws:lambda:us-east-1:123456789012:function:test:durable:abc123",
4243                    "token-123",
4244                    InitialExecutionState::new(),
4245                    client,
4246                ));
4247                let ctx = DurableContext::new(state).with_logger(inner);
4248
4249                // Call the appropriate log method with empty fields
4250                let empty_fields: &[(&str, &str)] = &[];
4251                match log_level {
4252                    0 => ctx.log_debug_with(&message, empty_fields),
4253                    1 => ctx.log_info_with(&message, empty_fields),
4254                    2 => ctx.log_warn_with(&message, empty_fields),
4255                    _ => ctx.log_error_with(&message, empty_fields),
4256                }
4257
4258                // Verify the captured LogInfo has no extra fields
4259                let captured = captured_info.lock().unwrap();
4260                let info = captured.as_ref().expect("LogInfo should be captured");
4261
4262                prop_assert!(
4263                    info.extra.is_empty(),
4264                    "Extra fields should be empty when none are provided"
4265                );
4266
4267                // But automatic context should still be present
4268                prop_assert!(
4269                    info.durable_execution_arn.is_some(),
4270                    "durable_execution_arn must still be present even with empty extra fields"
4271                );
4272            }
4273        }
4274    }
4275}
4276
4277#[cfg(test)]
4278mod sealed_trait_tests {
4279    use super::*;
4280    use std::sync::atomic::{AtomicUsize, Ordering};
4281
4282    /// Tests for sealed Logger trait implementations.
4283    ///
4284    /// The Logger trait is sealed, meaning it cannot be implemented outside this crate.
4285    /// This is enforced at compile time by requiring the private `Sealed` supertrait.
4286    /// External crates attempting to implement Logger will get a compile error:
4287    /// "the trait bound `MyType: Sealed` is not satisfied"
4288    ///
4289    /// These tests verify that the internal implementations work correctly.
4290    mod logger_tests {
4291        use super::*;
4292
4293        #[test]
4294        fn test_tracing_logger_implements_logger() {
4295            // TracingLogger should implement Logger (compile-time check)
4296            let logger: &dyn Logger = &TracingLogger;
4297            let info = LogInfo::default();
4298
4299            // These calls should not panic
4300            logger.debug("test debug", &info);
4301            logger.info("test info", &info);
4302            logger.warn("test warn", &info);
4303            logger.error("test error", &info);
4304        }
4305
4306        #[test]
4307        fn test_replay_aware_logger_implements_logger() {
4308            // ReplayAwareLogger should implement Logger (compile-time check)
4309            let inner = Arc::new(TracingLogger);
4310            let logger = ReplayAwareLogger::new(inner, ReplayLoggingConfig::AllowAll);
4311            let logger_ref: &dyn Logger = &logger;
4312            let info = LogInfo::default();
4313
4314            // These calls should not panic
4315            logger_ref.debug("test debug", &info);
4316            logger_ref.info("test info", &info);
4317            logger_ref.warn("test warn", &info);
4318            logger_ref.error("test error", &info);
4319        }
4320
4321        #[test]
4322        fn test_custom_logger_implements_logger() {
4323            // CustomLogger should implement Logger (compile-time check)
4324            let call_count = Arc::new(AtomicUsize::new(0));
4325            let count_clone = call_count.clone();
4326
4327            let logger = custom_logger(
4328                {
4329                    let count = count_clone.clone();
4330                    move |_msg, _info| {
4331                        count.fetch_add(1, Ordering::SeqCst);
4332                    }
4333                },
4334                {
4335                    let count = count_clone.clone();
4336                    move |_msg, _info| {
4337                        count.fetch_add(1, Ordering::SeqCst);
4338                    }
4339                },
4340                {
4341                    let count = count_clone.clone();
4342                    move |_msg, _info| {
4343                        count.fetch_add(1, Ordering::SeqCst);
4344                    }
4345                },
4346                {
4347                    let count = count_clone.clone();
4348                    move |_msg, _info| {
4349                        count.fetch_add(1, Ordering::SeqCst);
4350                    }
4351                },
4352            );
4353
4354            let logger_ref: &dyn Logger = &logger;
4355            let info = LogInfo::default();
4356
4357            logger_ref.debug("test", &info);
4358            logger_ref.info("test", &info);
4359            logger_ref.warn("test", &info);
4360            logger_ref.error("test", &info);
4361
4362            assert_eq!(call_count.load(Ordering::SeqCst), 4);
4363        }
4364
4365        #[test]
4366        fn test_simple_custom_logger() {
4367            let call_count = Arc::new(AtomicUsize::new(0));
4368            let count_clone = call_count.clone();
4369
4370            let logger = simple_custom_logger(move |_level, _msg, _info| {
4371                count_clone.fetch_add(1, Ordering::SeqCst);
4372            });
4373
4374            let info = LogInfo::default();
4375
4376            logger.debug("test", &info);
4377            logger.info("test", &info);
4378            logger.warn("test", &info);
4379            logger.error("test", &info);
4380
4381            assert_eq!(call_count.load(Ordering::SeqCst), 4);
4382        }
4383
4384        #[test]
4385        fn test_custom_logger_receives_correct_messages() {
4386            let messages = Arc::new(std::sync::Mutex::new(Vec::new()));
4387            let messages_clone = messages.clone();
4388
4389            let logger = simple_custom_logger(move |level, msg, _info| {
4390                messages_clone
4391                    .lock()
4392                    .unwrap()
4393                    .push(format!("[{}] {}", level, msg));
4394            });
4395
4396            let info = LogInfo::default();
4397
4398            logger.debug("debug message", &info);
4399            logger.info("info message", &info);
4400            logger.warn("warn message", &info);
4401            logger.error("error message", &info);
4402
4403            let logged = messages.lock().unwrap();
4404            assert_eq!(logged.len(), 4);
4405            assert_eq!(logged[0], "[DEBUG] debug message");
4406            assert_eq!(logged[1], "[INFO] info message");
4407            assert_eq!(logged[2], "[WARN] warn message");
4408            assert_eq!(logged[3], "[ERROR] error message");
4409        }
4410
4411        #[test]
4412        fn test_custom_logger_receives_log_info() {
4413            let received_info = Arc::new(std::sync::Mutex::new(None));
4414            let info_clone = received_info.clone();
4415
4416            let logger = simple_custom_logger(move |_level, _msg, info| {
4417                *info_clone.lock().unwrap() = Some(info.clone());
4418            });
4419
4420            let info = LogInfo::new("arn:aws:test")
4421                .with_operation_id("op-123")
4422                .with_parent_id("parent-456")
4423                .with_replay(true);
4424
4425            logger.info("test", &info);
4426
4427            let received = received_info.lock().unwrap().clone().unwrap();
4428            assert_eq!(
4429                received.durable_execution_arn,
4430                Some("arn:aws:test".to_string())
4431            );
4432            assert_eq!(received.operation_id, Some("op-123".to_string()));
4433            assert_eq!(received.parent_id, Some("parent-456".to_string()));
4434            assert!(received.is_replay);
4435        }
4436
4437        #[test]
4438        fn test_replay_aware_logger_suppresses_during_replay() {
4439            let call_count = Arc::new(AtomicUsize::new(0));
4440            let count_clone = call_count.clone();
4441
4442            let inner_logger = Arc::new(custom_logger(
4443                {
4444                    let count = count_clone.clone();
4445                    move |_msg, _info| {
4446                        count.fetch_add(1, Ordering::SeqCst);
4447                    }
4448                },
4449                {
4450                    let count = count_clone.clone();
4451                    move |_msg, _info| {
4452                        count.fetch_add(1, Ordering::SeqCst);
4453                    }
4454                },
4455                {
4456                    let count = count_clone.clone();
4457                    move |_msg, _info| {
4458                        count.fetch_add(1, Ordering::SeqCst);
4459                    }
4460                },
4461                {
4462                    let count = count_clone.clone();
4463                    move |_msg, _info| {
4464                        count.fetch_add(1, Ordering::SeqCst);
4465                    }
4466                },
4467            ));
4468
4469            let logger = ReplayAwareLogger::new(inner_logger, ReplayLoggingConfig::SuppressAll);
4470
4471            // Non-replay logs should pass through
4472            let non_replay_info = LogInfo::default().with_replay(false);
4473            logger.debug("test", &non_replay_info);
4474            logger.info("test", &non_replay_info);
4475            logger.warn("test", &non_replay_info);
4476            logger.error("test", &non_replay_info);
4477            assert_eq!(call_count.load(Ordering::SeqCst), 4);
4478
4479            // Replay logs should be suppressed
4480            let replay_info = LogInfo::default().with_replay(true);
4481            logger.debug("test", &replay_info);
4482            logger.info("test", &replay_info);
4483            logger.warn("test", &replay_info);
4484            logger.error("test", &replay_info);
4485            assert_eq!(call_count.load(Ordering::SeqCst), 4); // Still 4, no new calls
4486        }
4487
4488        #[test]
4489        fn test_replay_aware_logger_errors_only_during_replay() {
4490            let call_count = Arc::new(AtomicUsize::new(0));
4491            let count_clone = call_count.clone();
4492
4493            let inner_logger = Arc::new(custom_logger(
4494                {
4495                    let count = count_clone.clone();
4496                    move |_msg, _info| {
4497                        count.fetch_add(1, Ordering::SeqCst);
4498                    }
4499                },
4500                {
4501                    let count = count_clone.clone();
4502                    move |_msg, _info| {
4503                        count.fetch_add(1, Ordering::SeqCst);
4504                    }
4505                },
4506                {
4507                    let count = count_clone.clone();
4508                    move |_msg, _info| {
4509                        count.fetch_add(1, Ordering::SeqCst);
4510                    }
4511                },
4512                {
4513                    let count = count_clone.clone();
4514                    move |_msg, _info| {
4515                        count.fetch_add(1, Ordering::SeqCst);
4516                    }
4517                },
4518            ));
4519
4520            let logger = ReplayAwareLogger::new(inner_logger, ReplayLoggingConfig::ErrorsOnly);
4521
4522            let replay_info = LogInfo::default().with_replay(true);
4523            logger.debug("test", &replay_info);
4524            logger.info("test", &replay_info);
4525            logger.warn("test", &replay_info);
4526            logger.error("test", &replay_info);
4527
4528            // Only error should pass through during replay
4529            assert_eq!(call_count.load(Ordering::SeqCst), 1);
4530        }
4531    }
4532}