Skip to main content

durable_execution_sdk/
context.rs

1//! DurableContext and operation identifier types for the AWS Durable Execution SDK.
2//!
3//! This module provides the main `DurableContext` interface that user code interacts with,
4//! as well as the `OperationIdentifier` type for deterministic operation ID generation.
5
6use std::sync::atomic::{AtomicU64, Ordering};
7use std::sync::{Arc, RwLock};
8
9use blake2::{Blake2b512, Digest};
10
11use crate::error::DurableResult;
12use crate::handlers::StepContext;
13use crate::sealed::Sealed;
14use crate::state::ExecutionState;
15use crate::traits::DurableValue;
16use crate::types::OperationId;
17
18/// Identifies an operation within a durable execution.
19///
20/// Each operation has a unique ID that is deterministically generated
21/// based on the parent context and step counter. This ensures that
22/// the same operation gets the same ID across replays.
23#[derive(Debug, Clone, PartialEq, Eq, Hash)]
24pub struct OperationIdentifier {
25    /// The unique operation ID (deterministically generated)
26    pub operation_id: String,
27    /// The parent operation ID (None for root operations)
28    pub parent_id: Option<String>,
29    /// Optional human-readable name for the operation
30    pub name: Option<String>,
31}
32
33impl OperationIdentifier {
34    /// Creates a new OperationIdentifier with the given values.
35    pub fn new(
36        operation_id: impl Into<String>,
37        parent_id: Option<String>,
38        name: Option<String>,
39    ) -> Self {
40        Self {
41            operation_id: operation_id.into(),
42            parent_id,
43            name,
44        }
45    }
46
47    /// Creates a root operation identifier (no parent).
48    pub fn root(operation_id: impl Into<String>) -> Self {
49        Self {
50            operation_id: operation_id.into(),
51            parent_id: None,
52            name: None,
53        }
54    }
55
56    /// Creates an operation identifier with a parent.
57    pub fn with_parent(operation_id: impl Into<String>, parent_id: impl Into<String>) -> Self {
58        Self {
59            operation_id: operation_id.into(),
60            parent_id: Some(parent_id.into()),
61            name: None,
62        }
63    }
64
65    /// Sets the name for this operation identifier.
66    pub fn with_name(mut self, name: impl Into<String>) -> Self {
67        self.name = Some(name.into());
68        self
69    }
70
71    /// Returns the operation ID as an `OperationId` newtype.
72    #[inline]
73    pub fn operation_id_typed(&self) -> OperationId {
74        OperationId::from(self.operation_id.clone())
75    }
76
77    /// Returns the parent ID as an `OperationId` newtype, if present.
78    #[inline]
79    pub fn parent_id_typed(&self) -> Option<OperationId> {
80        self.parent_id
81            .as_ref()
82            .map(|id| OperationId::from(id.clone()))
83    }
84
85    /// Applies this identifier's parent_id and name to an `OperationUpdate`.
86    ///
87    /// This is a convenience method to avoid repeated boilerplate when building
88    /// operation updates across handlers.
89    pub fn apply_to(
90        &self,
91        mut update: crate::operation::OperationUpdate,
92    ) -> crate::operation::OperationUpdate {
93        if let Some(ref parent_id) = self.parent_id {
94            update = update.with_parent_id(parent_id);
95        }
96        if let Some(ref name) = self.name {
97            update = update.with_name(name);
98        }
99        update
100    }
101}
102
103impl std::fmt::Display for OperationIdentifier {
104    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
105        if let Some(ref name) = self.name {
106            write!(f, "{}({})", name, self.operation_id)
107        } else {
108            write!(f, "{}", self.operation_id)
109        }
110    }
111}
112
113/// Generates deterministic operation IDs using blake2b hashing.
114///
115/// The ID is generated by hashing the parent ID (or execution ARN for root)
116/// combined with the step counter value. This ensures:
117/// - Same inputs always produce the same ID
118/// - Different step counts produce different IDs
119/// - IDs are unique within an execution
120#[derive(Debug)]
121pub struct OperationIdGenerator {
122    /// The base identifier (execution ARN or parent operation ID)
123    base_id: String,
124    /// Thread-safe step counter
125    step_counter: AtomicU64,
126}
127
128impl OperationIdGenerator {
129    /// Creates a new OperationIdGenerator with the given base ID.
130    ///
131    /// # Arguments
132    ///
133    /// * `base_id` - The base identifier to use for hashing (typically execution ARN or parent ID)
134    pub fn new(base_id: impl Into<String>) -> Self {
135        Self {
136            base_id: base_id.into(),
137            step_counter: AtomicU64::new(0),
138        }
139    }
140
141    /// Creates a new OperationIdGenerator with a specific starting counter value.
142    ///
143    /// # Arguments
144    ///
145    /// * `base_id` - The base identifier to use for hashing
146    /// * `initial_counter` - The initial value for the step counter
147    pub fn with_counter(base_id: impl Into<String>, initial_counter: u64) -> Self {
148        Self {
149            base_id: base_id.into(),
150            step_counter: AtomicU64::new(initial_counter),
151        }
152    }
153
154    /// Generates the next operation ID.
155    ///
156    /// This method atomically increments the step counter and generates
157    /// a deterministic ID based on the base ID and counter value.
158    ///
159    /// # Returns
160    ///
161    /// A unique, deterministic operation ID string.
162    ///
163    /// # Memory Ordering
164    ///
165    /// Uses `Ordering::Relaxed` because the step counter only needs to provide
166    /// unique values - there's no synchronization requirement with other data.
167    /// Each call gets a unique counter value, and the hash function ensures
168    /// deterministic ID generation regardless of ordering between threads.
169    ///
170    /// Requirements: 4.1, 4.6
171    pub fn next_id(&self) -> String {
172        // Relaxed ordering is sufficient: we only need uniqueness, not synchronization.
173        // The counter value is combined with base_id in a hash, so the only requirement
174        // is that each call gets a different counter value, which fetch_add guarantees.
175        let counter = self.step_counter.fetch_add(1, Ordering::Relaxed);
176        generate_operation_id(&self.base_id, counter)
177    }
178
179    /// Generates an operation ID for a specific counter value without incrementing.
180    ///
181    /// This is useful for testing or when you need to generate an ID
182    /// for a known counter value.
183    ///
184    /// # Arguments
185    ///
186    /// * `counter` - The counter value to use for ID generation
187    ///
188    /// # Returns
189    ///
190    /// A deterministic operation ID string.
191    pub fn id_for_counter(&self, counter: u64) -> String {
192        generate_operation_id(&self.base_id, counter)
193    }
194
195    /// Returns the current counter value without incrementing.
196    ///
197    /// # Memory Ordering
198    ///
199    /// Uses `Ordering::Relaxed` because this is an informational read that doesn't
200    /// need to synchronize with other operations. The value may be slightly stale
201    /// in concurrent scenarios, but this is acceptable for debugging/monitoring use.
202    ///
203    /// Requirements: 4.1, 4.6
204    pub fn current_counter(&self) -> u64 {
205        // Relaxed ordering is sufficient: this is an informational read with no
206        // synchronization requirements. Callers should not rely on this value
207        // for correctness in concurrent scenarios.
208        self.step_counter.load(Ordering::Relaxed)
209    }
210
211    /// Returns the base ID used for generation.
212    pub fn base_id(&self) -> &str {
213        &self.base_id
214    }
215
216    /// Creates a child generator with a new base ID.
217    ///
218    /// The child generator starts with counter 0 and uses the provided
219    /// parent operation ID as its base.
220    ///
221    /// # Arguments
222    ///
223    /// * `parent_operation_id` - The operation ID to use as the base for the child
224    pub fn create_child(&self, parent_operation_id: impl Into<String>) -> Self {
225        Self::new(parent_operation_id)
226    }
227}
228
229impl Clone for OperationIdGenerator {
230    fn clone(&self) -> Self {
231        Self {
232            base_id: self.base_id.clone(),
233            // Relaxed ordering is sufficient: we're creating a snapshot of the counter
234            // for a new generator. The clone doesn't need to synchronize with the
235            // original - it just needs a reasonable starting point.
236            step_counter: AtomicU64::new(self.step_counter.load(Ordering::Relaxed)),
237        }
238    }
239}
240
241/// Generates a deterministic operation ID using blake2b hashing.
242///
243/// # Arguments
244///
245/// * `base_id` - The base identifier (execution ARN or parent operation ID)
246/// * `counter` - The step counter value
247///
248/// # Returns
249///
250/// A hex-encoded operation ID string (first 32 characters of the hash).
251pub fn generate_operation_id(base_id: &str, counter: u64) -> String {
252    let mut hasher = Blake2b512::new();
253    hasher.update(base_id.as_bytes());
254    hasher.update(counter.to_le_bytes());
255    let result = hasher.finalize();
256
257    // Take first 16 bytes (32 hex chars) for a reasonably short but unique ID
258    hex::encode(&result[..16])
259}
260
261/// Logger trait for structured logging in durable executions.
262///
263/// This trait is compatible with the `tracing` crate and allows
264/// custom logging implementations.
265///
266/// # Sealed Trait
267///
268/// This trait is sealed and cannot be implemented outside of this crate.
269/// This allows the SDK maintainers to evolve the logging interface without
270/// breaking external code. If you need custom logging behavior, use the
271/// provided factory functions or wrap one of the existing implementations.
272#[allow(private_bounds)]
273pub trait Logger: Sealed + Send + Sync {
274    /// Logs a debug message.
275    fn debug(&self, message: &str, info: &LogInfo);
276    /// Logs an info message.
277    fn info(&self, message: &str, info: &LogInfo);
278    /// Logs a warning message.
279    fn warn(&self, message: &str, info: &LogInfo);
280    /// Logs an error message.
281    fn error(&self, message: &str, info: &LogInfo);
282}
283
284/// Context information for log messages.
285///
286/// This struct provides context for log messages including execution ARN,
287/// operation IDs, and replay status. The `is_replay` flag indicates whether
288/// the current operation is being replayed from a checkpoint.
289#[derive(Debug, Clone, Default)]
290pub struct LogInfo {
291    /// The durable execution ARN
292    pub durable_execution_arn: Option<String>,
293    /// The current operation ID
294    pub operation_id: Option<String>,
295    /// The parent operation ID
296    pub parent_id: Option<String>,
297    /// Whether the current operation is being replayed from a checkpoint
298    ///
299    /// When `true`, the operation is returning a previously checkpointed result
300    /// without re-executing the operation's logic. Loggers can use this flag
301    /// to suppress or annotate logs during replay.
302    pub is_replay: bool,
303    /// Additional key-value pairs
304    pub extra: Vec<(String, String)>,
305}
306
307impl LogInfo {
308    /// Creates a new LogInfo with the given execution ARN.
309    pub fn new(durable_execution_arn: impl Into<String>) -> Self {
310        Self {
311            durable_execution_arn: Some(durable_execution_arn.into()),
312            ..Default::default()
313        }
314    }
315
316    /// Sets the operation ID.
317    pub fn with_operation_id(mut self, operation_id: impl Into<String>) -> Self {
318        self.operation_id = Some(operation_id.into());
319        self
320    }
321
322    /// Sets the parent ID.
323    pub fn with_parent_id(mut self, parent_id: impl Into<String>) -> Self {
324        self.parent_id = Some(parent_id.into());
325        self
326    }
327
328    /// Sets the replay flag.
329    ///
330    /// When set to `true`, indicates that the current operation is being
331    /// replayed from a checkpoint rather than executing fresh.
332    ///
333    /// # Arguments
334    ///
335    /// * `is_replay` - Whether the operation is being replayed
336    pub fn with_replay(mut self, is_replay: bool) -> Self {
337        self.is_replay = is_replay;
338        self
339    }
340
341    /// Adds an extra key-value pair.
342    pub fn with_extra(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
343        self.extra.push((key.into(), value.into()));
344        self
345    }
346}
347
348/// Creates a tracing span for a durable operation.
349///
350/// This function creates a structured tracing span that includes all relevant
351/// context for observability and debugging. The span can be used to trace
352/// execution flow and correlate logs across operations.
353///
354/// # Arguments
355///
356/// * `operation_type` - The type of operation (e.g., "step", "wait", "callback", "invoke", "map", "parallel")
357/// * `op_id` - The operation identifier containing operation_id, parent_id, and name
358/// * `durable_execution_arn` - The ARN of the durable execution
359///
360/// # Returns
361///
362/// A `tracing::Span` that can be entered during operation execution.
363///
364/// # Example
365///
366/// ```rust,ignore
367/// let span = create_operation_span("step", &op_id, state.durable_execution_arn());
368/// let _guard = span.enter();
369/// // ... operation execution ...
370/// ```
371pub fn create_operation_span(
372    operation_type: &str,
373    op_id: &OperationIdentifier,
374    durable_execution_arn: &str,
375) -> tracing::Span {
376    tracing::info_span!(
377        "durable_operation",
378        operation_type = %operation_type,
379        operation_id = %op_id.operation_id,
380        parent_id = ?op_id.parent_id,
381        name = ?op_id.name,
382        durable_execution_arn = %durable_execution_arn,
383        status = tracing::field::Empty,
384    )
385}
386
387/// Default logger implementation using the `tracing` crate.
388///
389/// This logger includes the `is_replay` flag in all log messages to help
390/// distinguish between fresh executions and replayed operations.
391///
392/// Extra fields from `LogInfo` are included in the tracing output as a formatted
393/// string of key-value pairs, making them queryable in log aggregation systems.
394#[derive(Debug, Clone, Default)]
395pub struct TracingLogger;
396
397// Implement Sealed for TracingLogger to allow it to implement Logger
398impl Sealed for TracingLogger {}
399
400impl TracingLogger {
401    /// Formats extra fields as a string of key-value pairs.
402    ///
403    /// Returns an empty string if there are no extra fields, otherwise returns
404    /// a comma-separated list of "key=value" pairs.
405    fn format_extra_fields(extra: &[(String, String)]) -> String {
406        if extra.is_empty() {
407            String::new()
408        } else {
409            extra
410                .iter()
411                .map(|(k, v)| format!("{}={}", k, v))
412                .collect::<Vec<_>>()
413                .join(", ")
414        }
415    }
416}
417
418impl Logger for TracingLogger {
419    fn debug(&self, message: &str, info: &LogInfo) {
420        let extra_fields = Self::format_extra_fields(&info.extra);
421        tracing::debug!(
422            durable_execution_arn = ?info.durable_execution_arn,
423            operation_id = ?info.operation_id,
424            parent_id = ?info.parent_id,
425            is_replay = info.is_replay,
426            extra = %extra_fields,
427            "{}",
428            message
429        );
430    }
431
432    fn info(&self, message: &str, info: &LogInfo) {
433        let extra_fields = Self::format_extra_fields(&info.extra);
434        tracing::info!(
435            durable_execution_arn = ?info.durable_execution_arn,
436            operation_id = ?info.operation_id,
437            parent_id = ?info.parent_id,
438            is_replay = info.is_replay,
439            extra = %extra_fields,
440            "{}",
441            message
442        );
443    }
444
445    fn warn(&self, message: &str, info: &LogInfo) {
446        let extra_fields = Self::format_extra_fields(&info.extra);
447        tracing::warn!(
448            durable_execution_arn = ?info.durable_execution_arn,
449            operation_id = ?info.operation_id,
450            parent_id = ?info.parent_id,
451            is_replay = info.is_replay,
452            extra = %extra_fields,
453            "{}",
454            message
455        );
456    }
457
458    fn error(&self, message: &str, info: &LogInfo) {
459        let extra_fields = Self::format_extra_fields(&info.extra);
460        tracing::error!(
461            durable_execution_arn = ?info.durable_execution_arn,
462            operation_id = ?info.operation_id,
463            parent_id = ?info.parent_id,
464            is_replay = info.is_replay,
465            extra = %extra_fields,
466            "{}",
467            message
468        );
469    }
470}
471
472/// Configuration for replay-aware logging behavior.
473///
474/// This configuration controls how the `ReplayAwareLogger` handles log messages
475/// during replay. Users can choose to suppress all logs during replay, allow
476/// only certain log levels, or log all messages regardless of replay status.
477#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
478pub enum ReplayLoggingConfig {
479    /// Suppress all logs during replay (default).
480    ///
481    /// When in replay mode, no log messages will be emitted. This is useful
482    /// to reduce noise in logs when replaying previously executed operations.
483    #[default]
484    SuppressAll,
485
486    /// Allow all logs during replay.
487    ///
488    /// Log messages will be emitted regardless of replay status. The `is_replay`
489    /// flag in `LogInfo` can still be used to distinguish replay logs.
490    AllowAll,
491
492    /// Allow only error logs during replay.
493    ///
494    /// Only error-level messages will be emitted during replay. This is useful
495    /// when you want to see errors but suppress informational messages.
496    ErrorsOnly,
497
498    /// Allow error and warning logs during replay.
499    ///
500    /// Error and warning-level messages will be emitted during replay.
501    /// Debug and info messages will be suppressed.
502    WarningsAndErrors,
503}
504
505/// A logger wrapper that can suppress logs during replay based on configuration.
506///
507/// `ReplayAwareLogger` wraps any `Logger` implementation and adds replay-aware
508/// behavior. When the `is_replay` flag in `LogInfo` is `true`, the logger will
509/// suppress or allow logs based on the configured `ReplayLoggingConfig`.
510///
511/// # Example
512///
513/// ```rust
514/// use durable_execution_sdk::{TracingLogger, ReplayAwareLogger, ReplayLoggingConfig};
515/// use std::sync::Arc;
516///
517/// // Create a replay-aware logger that suppresses all logs during replay
518/// let logger = ReplayAwareLogger::new(
519///     Arc::new(TracingLogger),
520///     ReplayLoggingConfig::SuppressAll,
521/// );
522///
523/// // Create a replay-aware logger that allows errors during replay
524/// let logger_with_errors = ReplayAwareLogger::new(
525///     Arc::new(TracingLogger),
526///     ReplayLoggingConfig::ErrorsOnly,
527/// );
528/// ```
529pub struct ReplayAwareLogger {
530    /// The underlying logger to delegate to
531    inner: Arc<dyn Logger>,
532    /// Configuration for replay logging behavior
533    config: ReplayLoggingConfig,
534}
535
536// Implement Sealed for ReplayAwareLogger to allow it to implement Logger
537impl Sealed for ReplayAwareLogger {}
538
539impl ReplayAwareLogger {
540    /// Creates a new `ReplayAwareLogger` with the specified configuration.
541    ///
542    /// # Arguments
543    ///
544    /// * `inner` - The underlying logger to delegate to
545    /// * `config` - Configuration for replay logging behavior
546    ///
547    /// # Example
548    ///
549    /// ```rust
550    /// use durable_execution_sdk::{TracingLogger, ReplayAwareLogger, ReplayLoggingConfig};
551    /// use std::sync::Arc;
552    ///
553    /// let logger = ReplayAwareLogger::new(
554    ///     Arc::new(TracingLogger),
555    ///     ReplayLoggingConfig::SuppressAll,
556    /// );
557    /// ```
558    pub fn new(inner: Arc<dyn Logger>, config: ReplayLoggingConfig) -> Self {
559        Self { inner, config }
560    }
561
562    /// Creates a new `ReplayAwareLogger` that suppresses all logs during replay.
563    ///
564    /// This is a convenience constructor for the most common use case.
565    ///
566    /// # Arguments
567    ///
568    /// * `inner` - The underlying logger to delegate to
569    pub fn suppress_replay(inner: Arc<dyn Logger>) -> Self {
570        Self::new(inner, ReplayLoggingConfig::SuppressAll)
571    }
572
573    /// Creates a new `ReplayAwareLogger` that allows all logs during replay.
574    ///
575    /// # Arguments
576    ///
577    /// * `inner` - The underlying logger to delegate to
578    pub fn allow_all(inner: Arc<dyn Logger>) -> Self {
579        Self::new(inner, ReplayLoggingConfig::AllowAll)
580    }
581
582    /// Returns the current replay logging configuration.
583    pub fn config(&self) -> ReplayLoggingConfig {
584        self.config
585    }
586
587    /// Returns a reference to the underlying logger.
588    pub fn inner(&self) -> &Arc<dyn Logger> {
589        &self.inner
590    }
591
592    /// Checks if a log at the given level should be suppressed during replay.
593    fn should_suppress(&self, info: &LogInfo, level: LogLevel) -> bool {
594        if !info.is_replay {
595            return false;
596        }
597
598        match self.config {
599            ReplayLoggingConfig::SuppressAll => true,
600            ReplayLoggingConfig::AllowAll => false,
601            ReplayLoggingConfig::ErrorsOnly => level != LogLevel::Error,
602            ReplayLoggingConfig::WarningsAndErrors => {
603                level != LogLevel::Error && level != LogLevel::Warn
604            }
605        }
606    }
607}
608
609/// Internal enum for log levels used by ReplayAwareLogger.
610#[derive(Debug, Clone, Copy, PartialEq, Eq)]
611enum LogLevel {
612    Debug,
613    Info,
614    Warn,
615    Error,
616}
617
618impl std::fmt::Debug for ReplayAwareLogger {
619    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
620        f.debug_struct("ReplayAwareLogger")
621            .field("config", &self.config)
622            .finish()
623    }
624}
625
626impl Logger for ReplayAwareLogger {
627    fn debug(&self, message: &str, info: &LogInfo) {
628        if !self.should_suppress(info, LogLevel::Debug) {
629            self.inner.debug(message, info);
630        }
631    }
632
633    fn info(&self, message: &str, info: &LogInfo) {
634        if !self.should_suppress(info, LogLevel::Info) {
635            self.inner.info(message, info);
636        }
637    }
638
639    fn warn(&self, message: &str, info: &LogInfo) {
640        if !self.should_suppress(info, LogLevel::Warn) {
641            self.inner.warn(message, info);
642        }
643    }
644
645    fn error(&self, message: &str, info: &LogInfo) {
646        if !self.should_suppress(info, LogLevel::Error) {
647            self.inner.error(message, info);
648        }
649    }
650}
651
652/// A custom logger that delegates to user-provided closures.
653///
654/// This struct allows users to provide custom logging behavior without
655/// implementing the sealed `Logger` trait directly. Each log level can
656/// have its own closure for handling log messages.
657///
658/// # Example
659///
660/// ```rust
661/// use durable_execution_sdk::context::{custom_logger, LogInfo};
662/// use std::sync::Arc;
663///
664/// // Create a custom logger that prints to stdout
665/// let logger = custom_logger(
666///     |msg, info| println!("[DEBUG] {}: {:?}", msg, info),
667///     |msg, info| println!("[INFO] {}: {:?}", msg, info),
668///     |msg, info| println!("[WARN] {}: {:?}", msg, info),
669///     |msg, info| println!("[ERROR] {}: {:?}", msg, info),
670/// );
671/// ```
672pub struct CustomLogger<D, I, W, E>
673where
674    D: Fn(&str, &LogInfo) + Send + Sync,
675    I: Fn(&str, &LogInfo) + Send + Sync,
676    W: Fn(&str, &LogInfo) + Send + Sync,
677    E: Fn(&str, &LogInfo) + Send + Sync,
678{
679    debug_fn: D,
680    info_fn: I,
681    warn_fn: W,
682    error_fn: E,
683}
684
685// Implement Sealed for CustomLogger to allow it to implement Logger
686impl<D, I, W, E> Sealed for CustomLogger<D, I, W, E>
687where
688    D: Fn(&str, &LogInfo) + Send + Sync,
689    I: Fn(&str, &LogInfo) + Send + Sync,
690    W: Fn(&str, &LogInfo) + Send + Sync,
691    E: Fn(&str, &LogInfo) + Send + Sync,
692{
693}
694
695impl<D, I, W, E> Logger for CustomLogger<D, I, W, E>
696where
697    D: Fn(&str, &LogInfo) + Send + Sync,
698    I: Fn(&str, &LogInfo) + Send + Sync,
699    W: Fn(&str, &LogInfo) + Send + Sync,
700    E: Fn(&str, &LogInfo) + Send + Sync,
701{
702    fn debug(&self, message: &str, info: &LogInfo) {
703        (self.debug_fn)(message, info);
704    }
705
706    fn info(&self, message: &str, info: &LogInfo) {
707        (self.info_fn)(message, info);
708    }
709
710    fn warn(&self, message: &str, info: &LogInfo) {
711        (self.warn_fn)(message, info);
712    }
713
714    fn error(&self, message: &str, info: &LogInfo) {
715        (self.error_fn)(message, info);
716    }
717}
718
719impl<D, I, W, E> std::fmt::Debug for CustomLogger<D, I, W, E>
720where
721    D: Fn(&str, &LogInfo) + Send + Sync,
722    I: Fn(&str, &LogInfo) + Send + Sync,
723    W: Fn(&str, &LogInfo) + Send + Sync,
724    E: Fn(&str, &LogInfo) + Send + Sync,
725{
726    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
727        f.debug_struct("CustomLogger").finish()
728    }
729}
730
731/// Creates a custom logger with user-provided closures for each log level.
732///
733/// This factory function allows users to create custom logging behavior without
734/// implementing the sealed `Logger` trait directly. Each log level has its own
735/// closure that receives the log message and context information.
736///
737/// # Arguments
738///
739/// * `debug_fn` - Closure called for debug-level messages
740/// * `info_fn` - Closure called for info-level messages
741/// * `warn_fn` - Closure called for warning-level messages
742/// * `error_fn` - Closure called for error-level messages
743///
744/// # Example
745///
746/// ```rust
747/// use durable_execution_sdk::context::{custom_logger, LogInfo};
748/// use std::sync::Arc;
749///
750/// // Create a custom logger that prints to stdout
751/// let logger = custom_logger(
752///     |msg, info| println!("[DEBUG] {}: {:?}", msg, info),
753///     |msg, info| println!("[INFO] {}: {:?}", msg, info),
754///     |msg, info| println!("[WARN] {}: {:?}", msg, info),
755///     |msg, info| println!("[ERROR] {}: {:?}", msg, info),
756/// );
757///
758/// // Use with Arc for sharing across contexts
759/// let shared_logger = Arc::new(logger);
760/// ```
761pub fn custom_logger<D, I, W, E>(
762    debug_fn: D,
763    info_fn: I,
764    warn_fn: W,
765    error_fn: E,
766) -> CustomLogger<D, I, W, E>
767where
768    D: Fn(&str, &LogInfo) + Send + Sync,
769    I: Fn(&str, &LogInfo) + Send + Sync,
770    W: Fn(&str, &LogInfo) + Send + Sync,
771    E: Fn(&str, &LogInfo) + Send + Sync,
772{
773    CustomLogger {
774        debug_fn,
775        info_fn,
776        warn_fn,
777        error_fn,
778    }
779}
780
781/// Creates a simple custom logger that uses a single closure for all log levels.
782///
783/// This is a convenience function for cases where you want the same handling
784/// for all log levels. The closure receives the log level as a string, the
785/// message, and the log info.
786///
787/// # Arguments
788///
789/// * `log_fn` - Closure called for all log messages. Receives (level, message, info).
790///
791/// # Example
792///
793/// ```rust
794/// use durable_execution_sdk::context::{simple_custom_logger, LogInfo};
795/// use std::sync::Arc;
796///
797/// // Create a simple logger that prints all messages with their level
798/// let logger = simple_custom_logger(|level, msg, info| {
799///     println!("[{}] {}: {:?}", level, msg, info);
800/// });
801///
802/// // Use with Arc for sharing across contexts
803/// let shared_logger = Arc::new(logger);
804/// ```
805pub fn simple_custom_logger<F>(log_fn: F) -> impl Logger
806where
807    F: Fn(&str, &str, &LogInfo) + Send + Sync + Clone + 'static,
808{
809    let debug_fn = log_fn.clone();
810    let info_fn = log_fn.clone();
811    let warn_fn = log_fn.clone();
812    let error_fn = log_fn;
813
814    custom_logger(
815        move |msg, info| debug_fn("DEBUG", msg, info),
816        move |msg, info| info_fn("INFO", msg, info),
817        move |msg, info| warn_fn("WARN", msg, info),
818        move |msg, info| error_fn("ERROR", msg, info),
819    )
820}
821
822/// The main context for durable execution operations.
823///
824/// `DurableContext` provides all the durable operations that user code
825/// can use to build reliable workflows. It handles checkpointing, replay,
826/// and state management automatically.
827///
828/// # Thread Safety
829///
830/// `DurableContext` is `Send + Sync` and can be safely shared across
831/// async tasks. The internal state uses appropriate synchronization
832/// primitives for concurrent access.
833///
834/// # Example
835///
836/// ```rust,ignore
837/// async fn my_workflow(ctx: DurableContext) -> Result<String, DurableError> {
838///     // Execute a step (automatically checkpointed)
839///     let result = ctx.step(|_| async move { Ok("hello".to_string()) }, None).await?;
840///     
841///     // Wait for 5 seconds (suspends Lambda, resumes later)
842///     ctx.wait(Duration::from_seconds(5), None).await?;
843///     
844///     Ok(result)
845/// }
846/// ```
847pub struct DurableContext {
848    /// The execution state manager
849    state: Arc<ExecutionState>,
850    /// The Lambda context (if running in Lambda)
851    lambda_context: Option<lambda_runtime::Context>,
852    /// The parent operation ID (None for root context)
853    parent_id: Option<String>,
854    /// Operation ID generator for this context
855    id_generator: Arc<OperationIdGenerator>,
856    /// Logger for structured logging (wrapped in RwLock for runtime reconfiguration)
857    logger: Arc<RwLock<Arc<dyn Logger>>>,
858}
859
860// Ensure DurableContext is Send + Sync
861static_assertions::assert_impl_all!(DurableContext: Send, Sync);
862
863impl DurableContext {
864    /// Creates a new DurableContext with the given state.
865    ///
866    /// # Arguments
867    ///
868    /// * `state` - The execution state manager
869    pub fn new(state: Arc<ExecutionState>) -> Self {
870        let base_id = state.durable_execution_arn().to_string();
871        Self {
872            state,
873            lambda_context: None,
874            parent_id: None,
875            id_generator: Arc::new(OperationIdGenerator::new(base_id)),
876            logger: Arc::new(RwLock::new(Arc::new(TracingLogger))),
877        }
878    }
879
880    /// Creates a DurableContext from a Lambda context.
881    ///
882    /// This is the primary factory method used when running in AWS Lambda.
883    ///
884    /// # Arguments
885    ///
886    /// * `state` - The execution state manager
887    /// * `lambda_context` - The Lambda runtime context
888    pub fn from_lambda_context(
889        state: Arc<ExecutionState>,
890        lambda_context: lambda_runtime::Context,
891    ) -> Self {
892        let base_id = state.durable_execution_arn().to_string();
893        Self {
894            state,
895            lambda_context: Some(lambda_context),
896            parent_id: None,
897            id_generator: Arc::new(OperationIdGenerator::new(base_id)),
898            logger: Arc::new(RwLock::new(Arc::new(TracingLogger))),
899        }
900    }
901
902    /// Creates a child context for nested operations.
903    ///
904    /// Child contexts have their own step counter but share the same
905    /// execution state. Operations in a child context are tracked
906    /// with the parent's operation ID.
907    ///
908    /// # Arguments
909    ///
910    /// * `parent_operation_id` - The operation ID of the parent operation
911    pub fn create_child_context(&self, parent_operation_id: impl Into<String>) -> Self {
912        let parent_id = parent_operation_id.into();
913        Self {
914            state: self.state.clone(),
915            lambda_context: self.lambda_context.clone(),
916            parent_id: Some(parent_id.clone()),
917            id_generator: Arc::new(OperationIdGenerator::new(parent_id)),
918            logger: self.logger.clone(),
919        }
920    }
921
922    /// Sets a custom logger for this context.
923    ///
924    /// # Arguments
925    ///
926    /// * `logger` - The logger implementation to use
927    pub fn set_logger(&mut self, logger: Arc<dyn Logger>) {
928        *self.logger.write().unwrap() = logger;
929    }
930
931    /// Returns a new context with the specified logger.
932    ///
933    /// # Arguments
934    ///
935    /// * `logger` - The logger implementation to use
936    pub fn with_logger(self, logger: Arc<dyn Logger>) -> Self {
937        *self.logger.write().unwrap() = logger;
938        self
939    }
940
941    /// Reconfigures the logger for this context at runtime.
942    ///
943    /// All subsequent log calls on this context (and any clones sharing the
944    /// same underlying `RwLock`) will use the new logger.
945    ///
946    /// # Arguments
947    ///
948    /// * `logger` - The new logger implementation to use
949    pub fn configure_logger(&self, logger: Arc<dyn Logger>) {
950        *self.logger.write().unwrap() = logger;
951    }
952
953    /// Returns a reference to the execution state.
954    pub fn state(&self) -> &Arc<ExecutionState> {
955        &self.state
956    }
957
958    /// Returns the durable execution ARN.
959    pub fn durable_execution_arn(&self) -> &str {
960        self.state.durable_execution_arn()
961    }
962
963    /// Returns the parent operation ID, if any.
964    pub fn parent_id(&self) -> Option<&str> {
965        self.parent_id.as_deref()
966    }
967
968    /// Returns a reference to the Lambda context, if available.
969    pub fn lambda_context(&self) -> Option<&lambda_runtime::Context> {
970        self.lambda_context.as_ref()
971    }
972
973    /// Returns a reference to the logger.
974    pub fn logger(&self) -> Arc<dyn Logger> {
975        self.logger.read().unwrap().clone()
976    }
977
978    /// Generates the next operation ID for this context.
979    ///
980    /// This method is thread-safe and will generate unique, deterministic
981    /// IDs based on the context's base ID and step counter.
982    pub fn next_operation_id(&self) -> String {
983        self.id_generator.next_id()
984    }
985
986    /// Creates an OperationIdentifier for the next operation.
987    ///
988    /// # Arguments
989    ///
990    /// * `name` - Optional human-readable name for the operation
991    pub fn next_operation_identifier(&self, name: Option<String>) -> OperationIdentifier {
992        OperationIdentifier::new(self.next_operation_id(), self.parent_id.clone(), name)
993    }
994
995    /// Returns the current step counter value without incrementing.
996    pub fn current_step_counter(&self) -> u64 {
997        self.id_generator.current_counter()
998    }
999
1000    /// Creates log info for the current context.
1001    ///
1002    /// The returned `LogInfo` includes the current replay status from the
1003    /// execution state, allowing loggers to distinguish between fresh
1004    /// executions and replayed operations.
1005    pub fn create_log_info(&self) -> LogInfo {
1006        let mut info = LogInfo::new(self.durable_execution_arn());
1007        if let Some(ref parent_id) = self.parent_id {
1008            info = info.with_parent_id(parent_id);
1009        }
1010        // Include replay status from execution state
1011        info = info.with_replay(self.state.is_replay());
1012        info
1013    }
1014
1015    /// Creates log info with an operation ID.
1016    ///
1017    /// The returned `LogInfo` includes the current replay status from the
1018    /// execution state, allowing loggers to distinguish between fresh
1019    /// executions and replayed operations.
1020    pub fn create_log_info_with_operation(&self, operation_id: &str) -> LogInfo {
1021        self.create_log_info().with_operation_id(operation_id)
1022    }
1023
1024    /// Creates log info with explicit replay status.
1025    ///
1026    /// This method allows callers to explicitly set the replay status,
1027    /// which is useful when the operation-specific replay status differs
1028    /// from the global execution state replay status.
1029    ///
1030    /// # Arguments
1031    ///
1032    /// * `operation_id` - The operation ID to include in the log info
1033    /// * `is_replay` - Whether this specific operation is being replayed
1034    pub fn create_log_info_with_replay(&self, operation_id: &str, is_replay: bool) -> LogInfo {
1035        let mut info = LogInfo::new(self.durable_execution_arn());
1036        if let Some(ref parent_id) = self.parent_id {
1037            info = info.with_parent_id(parent_id);
1038        }
1039        info.with_operation_id(operation_id).with_replay(is_replay)
1040    }
1041
1042    // ========================================================================
1043    // Simplified Logging API
1044    // ========================================================================
1045
1046    /// Logs a message at INFO level with automatic context.
1047    ///
1048    /// This method automatically includes the durable_execution_arn and parent_id
1049    /// in the log output without requiring the caller to specify them.
1050    ///
1051    /// # Arguments
1052    ///
1053    /// * `message` - The message to log
1054    ///
1055    /// # Example
1056    ///
1057    /// ```rust,ignore
1058    /// ctx.log_info("Processing order started");
1059    /// ```
1060    pub fn log_info(&self, message: &str) {
1061        self.log_with_level(LogLevel::Info, message, &[]);
1062    }
1063
1064    /// Logs a message at INFO level with extra fields.
1065    ///
1066    /// This method automatically includes the durable_execution_arn and parent_id
1067    /// in the log output, plus any additional fields specified.
1068    ///
1069    /// # Arguments
1070    ///
1071    /// * `message` - The message to log
1072    /// * `fields` - Additional key-value pairs to include in the log
1073    ///
1074    /// # Example
1075    ///
1076    /// ```rust,ignore
1077    /// ctx.log_info_with("Processing order", &[("order_id", "123"), ("amount", "99.99")]);
1078    /// ```
1079    pub fn log_info_with(&self, message: &str, fields: &[(&str, &str)]) {
1080        self.log_with_level(LogLevel::Info, message, fields);
1081    }
1082
1083    /// Logs a message at DEBUG level with automatic context.
1084    ///
1085    /// This method automatically includes the durable_execution_arn and parent_id
1086    /// in the log output without requiring the caller to specify them.
1087    ///
1088    /// # Arguments
1089    ///
1090    /// * `message` - The message to log
1091    ///
1092    /// # Example
1093    ///
1094    /// ```rust,ignore
1095    /// ctx.log_debug("Entering validation step");
1096    /// ```
1097    pub fn log_debug(&self, message: &str) {
1098        self.log_with_level(LogLevel::Debug, message, &[]);
1099    }
1100
1101    /// Logs a message at DEBUG level with extra fields.
1102    ///
1103    /// This method automatically includes the durable_execution_arn and parent_id
1104    /// in the log output, plus any additional fields specified.
1105    ///
1106    /// # Arguments
1107    ///
1108    /// * `message` - The message to log
1109    /// * `fields` - Additional key-value pairs to include in the log
1110    ///
1111    /// # Example
1112    ///
1113    /// ```rust,ignore
1114    /// ctx.log_debug_with("Variable state", &[("x", "42"), ("y", "100")]);
1115    /// ```
1116    pub fn log_debug_with(&self, message: &str, fields: &[(&str, &str)]) {
1117        self.log_with_level(LogLevel::Debug, message, fields);
1118    }
1119
1120    /// Logs a message at WARN level with automatic context.
1121    ///
1122    /// This method automatically includes the durable_execution_arn and parent_id
1123    /// in the log output without requiring the caller to specify them.
1124    ///
1125    /// # Arguments
1126    ///
1127    /// * `message` - The message to log
1128    ///
1129    /// # Example
1130    ///
1131    /// ```rust,ignore
1132    /// ctx.log_warn("Retry attempt 3 of 5");
1133    /// ```
1134    pub fn log_warn(&self, message: &str) {
1135        self.log_with_level(LogLevel::Warn, message, &[]);
1136    }
1137
1138    /// Logs a message at WARN level with extra fields.
1139    ///
1140    /// This method automatically includes the durable_execution_arn and parent_id
1141    /// in the log output, plus any additional fields specified.
1142    ///
1143    /// # Arguments
1144    ///
1145    /// * `message` - The message to log
1146    /// * `fields` - Additional key-value pairs to include in the log
1147    ///
1148    /// # Example
1149    ///
1150    /// ```rust,ignore
1151    /// ctx.log_warn_with("Rate limit approaching", &[("current", "95"), ("limit", "100")]);
1152    /// ```
1153    pub fn log_warn_with(&self, message: &str, fields: &[(&str, &str)]) {
1154        self.log_with_level(LogLevel::Warn, message, fields);
1155    }
1156
1157    /// Logs a message at ERROR level with automatic context.
1158    ///
1159    /// This method automatically includes the durable_execution_arn and parent_id
1160    /// in the log output without requiring the caller to specify them.
1161    ///
1162    /// # Arguments
1163    ///
1164    /// * `message` - The message to log
1165    ///
1166    /// # Example
1167    ///
1168    /// ```rust,ignore
1169    /// ctx.log_error("Failed to process payment");
1170    /// ```
1171    pub fn log_error(&self, message: &str) {
1172        self.log_with_level(LogLevel::Error, message, &[]);
1173    }
1174
1175    /// Logs a message at ERROR level with extra fields.
1176    ///
1177    /// This method automatically includes the durable_execution_arn and parent_id
1178    /// in the log output, plus any additional fields specified.
1179    ///
1180    /// # Arguments
1181    ///
1182    /// * `message` - The message to log
1183    /// * `fields` - Additional key-value pairs to include in the log
1184    ///
1185    /// # Example
1186    ///
1187    /// ```rust,ignore
1188    /// ctx.log_error_with("Payment failed", &[("error_code", "INSUFFICIENT_FUNDS"), ("amount", "150.00")]);
1189    /// ```
1190    pub fn log_error_with(&self, message: &str, fields: &[(&str, &str)]) {
1191        self.log_with_level(LogLevel::Error, message, fields);
1192    }
1193
1194    /// Internal helper method to log at a specific level with optional extra fields.
1195    ///
1196    /// This method creates a LogInfo with automatic context (durable_execution_arn, parent_id)
1197    /// and any additional fields, then delegates to the configured logger.
1198    ///
1199    /// # Arguments
1200    ///
1201    /// * `level` - The log level
1202    /// * `message` - The message to log
1203    /// * `extra` - Additional key-value pairs to include
1204    fn log_with_level(&self, level: LogLevel, message: &str, extra: &[(&str, &str)]) {
1205        let mut log_info = self.create_log_info();
1206
1207        for (key, value) in extra {
1208            log_info = log_info.with_extra(*key, *value);
1209        }
1210
1211        let logger = self.logger.read().unwrap();
1212        match level {
1213            LogLevel::Debug => logger.debug(message, &log_info),
1214            LogLevel::Info => logger.info(message, &log_info),
1215            LogLevel::Warn => logger.warn(message, &log_info),
1216            LogLevel::Error => logger.error(message, &log_info),
1217        }
1218    }
1219
1220    /// Returns the original user input from the EXECUTION operation.
1221    ///
1222    /// This method deserializes the input payload from the EXECUTION operation's
1223    /// ExecutionDetails.InputPayload field into the requested type.
1224    ///
1225    /// # Type Parameters
1226    ///
1227    /// * `T` - The type to deserialize the input into. Must implement `DeserializeOwned`.
1228    ///
1229    /// # Returns
1230    ///
1231    /// `Ok(T)` if the input exists and can be deserialized, or a `DurableError` if:
1232    /// - No EXECUTION operation exists
1233    /// - No input payload is available
1234    /// - Deserialization fails
1235    ///
1236    /// # Example
1237    ///
1238    /// ```rust,ignore
1239    /// #[derive(Deserialize)]
1240    /// struct OrderEvent {
1241    ///     order_id: String,
1242    ///     amount: f64,
1243    /// }
1244    ///
1245    /// async fn my_workflow(ctx: DurableContext) -> Result<(), DurableError> {
1246    ///     // Get the original input that started this execution
1247    ///     let event: OrderEvent = ctx.get_original_input()?;
1248    ///     println!("Processing order: {}", event.order_id);
1249    ///     Ok(())
1250    /// }
1251    /// ```
1252    pub fn get_original_input<T>(&self) -> DurableResult<T>
1253    where
1254        T: serde::de::DeserializeOwned,
1255    {
1256        // Get the raw input payload from the execution state
1257        let input_payload = self.state.get_original_input_raw().ok_or_else(|| {
1258            crate::error::DurableError::Validation {
1259                message: "No original input available. The EXECUTION operation may not exist or has no input payload.".to_string(),
1260            }
1261        })?;
1262
1263        // Deserialize the input payload to the requested type
1264        serde_json::from_str(input_payload).map_err(|e| crate::error::DurableError::SerDes {
1265            message: format!("Failed to deserialize original input: {}", e),
1266        })
1267    }
1268
1269    /// Returns the raw original user input as a string, if available.
1270    ///
1271    /// This method returns the raw JSON string from the EXECUTION operation's
1272    /// ExecutionDetails.InputPayload field without deserializing it.
1273    ///
1274    /// # Returns
1275    ///
1276    /// `Some(&str)` if the input exists, `None` otherwise.
1277    pub fn get_original_input_raw(&self) -> Option<&str> {
1278        self.state.get_original_input_raw()
1279    }
1280
1281    /// Completes the execution with a successful result via checkpointing.
1282    ///
1283    /// This method checkpoints a SUCCEED action on the EXECUTION operation,
1284    /// which is useful for large results that exceed the Lambda response size limit (6MB).
1285    /// After calling this method, the Lambda function should return an empty result.
1286    ///
1287    /// # Arguments
1288    ///
1289    /// * `result` - The result to checkpoint. Must implement `Serialize`.
1290    ///
1291    /// # Returns
1292    ///
1293    /// `Ok(())` on success, or a `DurableError` if:
1294    /// - No EXECUTION operation exists
1295    /// - Serialization fails
1296    /// - The checkpoint fails
1297    ///
1298    /// # Example
1299    ///
1300    /// ```rust,ignore
1301    /// async fn my_workflow(ctx: DurableContext) -> Result<(), DurableError> {
1302    ///     let large_result = compute_large_result().await?;
1303    ///     
1304    ///     // Check if result would exceed Lambda response limit
1305    ///     if DurableExecutionInvocationOutput::would_exceed_max_size(&large_result) {
1306    ///         // Checkpoint the result instead of returning it
1307    ///         ctx.complete_execution_success(&large_result).await?;
1308    ///         // Return empty result - the actual result is checkpointed
1309    ///         return Ok(());
1310    ///     }
1311    ///     
1312    ///     Ok(())
1313    /// }
1314    /// ```
1315    pub async fn complete_execution_success<T>(&self, result: &T) -> DurableResult<()>
1316    where
1317        T: serde::Serialize,
1318    {
1319        let serialized =
1320            serde_json::to_string(result).map_err(|e| crate::error::DurableError::SerDes {
1321                message: format!("Failed to serialize execution result: {}", e),
1322            })?;
1323
1324        self.state
1325            .complete_execution_success(Some(serialized))
1326            .await
1327    }
1328
1329    /// Completes the execution with a failure via checkpointing.
1330    ///
1331    /// This method checkpoints a FAIL action on the EXECUTION operation.
1332    /// After calling this method, the Lambda function should return a FAILED status.
1333    ///
1334    /// # Arguments
1335    ///
1336    /// * `error` - The error details to checkpoint
1337    ///
1338    /// # Returns
1339    ///
1340    /// `Ok(())` on success, or a `DurableError` if:
1341    /// - No EXECUTION operation exists
1342    /// - The checkpoint fails
1343    ///
1344    /// # Example
1345    ///
1346    /// ```rust,ignore
1347    /// async fn my_workflow(ctx: DurableContext) -> Result<(), DurableError> {
1348    ///     if let Err(e) = process_order().await {
1349    ///         // Checkpoint the failure
1350    ///         ctx.complete_execution_failure(ErrorObject::new("ProcessingError", &e.to_string())).await?;
1351    ///         return Err(DurableError::execution(&e.to_string()));
1352    ///     }
1353    ///     Ok(())
1354    /// }
1355    /// ```
1356    pub async fn complete_execution_failure(
1357        &self,
1358        error: crate::error::ErrorObject,
1359    ) -> DurableResult<()> {
1360        self.state.complete_execution_failure(error).await
1361    }
1362
1363    /// Completes the execution with a successful result, automatically handling large results.
1364    ///
1365    /// This method checks if the result would exceed the Lambda response size limit (6MB).
1366    /// If so, it checkpoints the result via the EXECUTION operation and returns `true`.
1367    /// If the result fits within the limit, it returns `false` and the caller should
1368    /// return the result normally.
1369    ///
1370    /// # Arguments
1371    ///
1372    /// * `result` - The result to potentially checkpoint. Must implement `Serialize`.
1373    ///
1374    /// # Returns
1375    ///
1376    /// `Ok(true)` if the result was checkpointed (caller should return empty result),
1377    /// `Ok(false)` if the result fits within limits (caller should return it normally),
1378    /// or a `DurableError` if checkpointing fails.
1379    ///
1380    /// # Example
1381    ///
1382    /// ```rust,ignore
1383    /// async fn my_workflow(ctx: DurableContext) -> Result<LargeResult, DurableError> {
1384    ///     let result = compute_result().await?;
1385    ///     
1386    ///     // Automatically handle large results
1387    ///     if ctx.complete_execution_if_large(&result).await? {
1388    ///         // Result was checkpointed, return a placeholder
1389    ///         // The actual result is stored in the EXECUTION operation
1390    ///         return Ok(LargeResult::default());
1391    ///     }
1392    ///     
1393    ///     // Result fits within limits, return normally
1394    ///     Ok(result)
1395    /// }
1396    /// ```
1397    pub async fn complete_execution_if_large<T>(&self, result: &T) -> DurableResult<bool>
1398    where
1399        T: serde::Serialize,
1400    {
1401        if crate::lambda::DurableExecutionInvocationOutput::would_exceed_max_size(result) {
1402            self.complete_execution_success(result).await?;
1403            Ok(true)
1404        } else {
1405            Ok(false)
1406        }
1407    }
1408
1409    // ========================================================================
1410    // Durable Operations
1411    // ========================================================================
1412
1413    /// Executes a step operation with automatic checkpointing.
1414    ///
1415    /// Steps are the fundamental unit of work in durable executions. Each step
1416    /// is checkpointed, allowing the workflow to resume from the last completed
1417    /// step after interruptions.
1418    ///
1419    /// # Arguments
1420    ///
1421    /// * `func` - The function to execute
1422    /// * `config` - Optional step configuration (retry strategy, semantics, serdes)
1423    ///
1424    /// # Returns
1425    ///
1426    /// The result of the step function, or an error if execution fails.
1427    ///
1428    /// # Example
1429    ///
1430    /// ```rust,ignore
1431    /// let result: i32 = ctx.step(|_step_ctx| async move {
1432    ///     Ok(42)
1433    /// }, None).await?;
1434    /// ```
1435    pub async fn step<T, F, Fut>(
1436        &self,
1437        func: F,
1438        config: Option<crate::config::StepConfig>,
1439    ) -> DurableResult<T>
1440    where
1441        T: DurableValue,
1442        F: FnOnce(StepContext) -> Fut + Send,
1443        Fut: std::future::Future<Output = Result<T, Box<dyn std::error::Error + Send + Sync>>>
1444            + Send,
1445    {
1446        let op_id = self.next_operation_identifier(None);
1447        let config = config.unwrap_or_default();
1448
1449        let logger = self.logger.read().unwrap().clone();
1450        let result =
1451            crate::handlers::step_handler(func, &self.state, &op_id, &config, &logger).await;
1452
1453        // Track replay after completion
1454        if result.is_ok() {
1455            self.state.track_replay(&op_id.operation_id).await;
1456        }
1457
1458        result
1459    }
1460
1461    /// Executes a named step operation with automatic checkpointing.
1462    ///
1463    /// Same as `step`, but allows specifying a human-readable name for the operation.
1464    ///
1465    /// # Arguments
1466    ///
1467    /// * `name` - Human-readable name for the step
1468    /// * `func` - The function to execute
1469    /// * `config` - Optional step configuration
1470    ///
1471    /// # Example
1472    ///
1473    /// ```rust,ignore
1474    /// let result: i32 = ctx.step_named("validate_input", |_step_ctx| async move {
1475    ///     Ok(42)
1476    /// }, None).await?;
1477    /// ```
1478    pub async fn step_named<T, F, Fut>(
1479        &self,
1480        name: &str,
1481        func: F,
1482        config: Option<crate::config::StepConfig>,
1483    ) -> DurableResult<T>
1484    where
1485        T: DurableValue,
1486        F: FnOnce(StepContext) -> Fut + Send,
1487        Fut: std::future::Future<Output = Result<T, Box<dyn std::error::Error + Send + Sync>>>
1488            + Send,
1489    {
1490        let op_id = self.next_operation_identifier(Some(name.to_string()));
1491        let config = config.unwrap_or_default();
1492
1493        let logger = self.logger.read().unwrap().clone();
1494        let result =
1495            crate::handlers::step_handler(func, &self.state, &op_id, &config, &logger).await;
1496
1497        // Track replay after completion
1498        if result.is_ok() {
1499            self.state.track_replay(&op_id.operation_id).await;
1500        }
1501
1502        result
1503    }
1504
1505    /// Pauses execution for a specified duration.
1506    ///
1507    /// Wait operations suspend the Lambda execution and resume after the
1508    /// specified duration has elapsed. This is efficient because it doesn't
1509    /// block Lambda resources during the wait.
1510    ///
1511    /// # Arguments
1512    ///
1513    /// * `duration` - The duration to wait (must be at least 1 second)
1514    /// * `name` - Optional human-readable name for the operation
1515    ///
1516    /// # Returns
1517    ///
1518    /// Ok(()) when the wait has elapsed, or an error if validation fails.
1519    ///
1520    /// # Example
1521    ///
1522    /// ```rust,ignore
1523    /// // Wait for 5 seconds
1524    /// ctx.wait(Duration::from_seconds(5), None).await?;
1525    ///
1526    /// // Wait with a name
1527    /// ctx.wait(Duration::from_minutes(1), Some("wait_for_processing")).await?;
1528    /// ```
1529    pub async fn wait(
1530        &self,
1531        duration: crate::duration::Duration,
1532        name: Option<&str>,
1533    ) -> DurableResult<()> {
1534        let op_id = self.next_operation_identifier(name.map(|s| s.to_string()));
1535
1536        let logger = self.logger.read().unwrap().clone();
1537        let result = crate::handlers::wait_handler(duration, &self.state, &op_id, &logger).await;
1538
1539        // Track replay after completion (only if not suspended)
1540        if result.is_ok() {
1541            self.state.track_replay(&op_id.operation_id).await;
1542        }
1543
1544        result
1545    }
1546
1547    /// Cancels an active wait operation.
1548    ///
1549    /// This method allows cancelling a wait operation that was previously started.
1550    /// If the wait has already completed (succeeded, failed, or timed out), this
1551    /// method will return Ok(()) without making any changes.
1552    ///
1553    /// # Arguments
1554    ///
1555    /// * `operation_id` - The operation ID of the wait to cancel
1556    ///
1557    /// # Returns
1558    ///
1559    /// Ok(()) if the wait was cancelled or was already completed, or an error if:
1560    /// - The operation doesn't exist
1561    /// - The operation is not a WAIT operation
1562    /// - The checkpoint fails
1563    ///
1564    /// # Example
1565    ///
1566    /// ```rust,ignore
1567    /// // Start a wait in a parallel branch
1568    /// let wait_op_id = ctx.next_operation_id();
1569    ///
1570    /// // Later, cancel the wait from another branch
1571    /// ctx.cancel_wait(&wait_op_id).await?;
1572    /// ```
1573    pub async fn cancel_wait(&self, operation_id: &str) -> DurableResult<()> {
1574        let logger = self.logger.read().unwrap().clone();
1575        crate::handlers::wait_cancel_handler(&self.state, operation_id, &logger).await
1576    }
1577
1578    /// Creates a callback and returns a handle to wait for the result.
1579    ///
1580    /// Callbacks allow external systems to signal completion of asynchronous
1581    /// operations. The callback ID can be shared with external systems, which
1582    /// can then call the Lambda durable execution callback API.
1583    ///
1584    /// # Arguments
1585    ///
1586    /// * `config` - Optional callback configuration (timeout, heartbeat)
1587    ///
1588    /// # Returns
1589    ///
1590    /// A `Callback<T>` handle that can be used to wait for the result.
1591    ///
1592    /// # Example
1593    ///
1594    /// ```rust,ignore
1595    /// let callback = ctx.create_callback::<ApprovalResponse>(None).await?;
1596    ///
1597    /// // Share callback.callback_id with external system
1598    /// notify_approver(&callback.callback_id).await?;
1599    ///
1600    /// // Wait for the callback result
1601    /// let approval = callback.result().await?;
1602    /// ```
1603    pub async fn create_callback<T>(
1604        &self,
1605        config: Option<crate::config::CallbackConfig>,
1606    ) -> DurableResult<crate::handlers::Callback<T>>
1607    where
1608        T: serde::Serialize + serde::de::DeserializeOwned,
1609    {
1610        let op_id = self.next_operation_identifier(None);
1611        let config = config.unwrap_or_default();
1612
1613        let logger = self.logger.read().unwrap().clone();
1614        let result = crate::handlers::callback_handler(&self.state, &op_id, &config, &logger).await;
1615
1616        // Track replay after completion
1617        if result.is_ok() {
1618            self.state.track_replay(&op_id.operation_id).await;
1619        }
1620
1621        result
1622    }
1623
1624    /// Creates a named callback and returns a handle to wait for the result.
1625    ///
1626    /// Same as `create_callback`, but allows specifying a human-readable name.
1627    pub async fn create_callback_named<T>(
1628        &self,
1629        name: &str,
1630        config: Option<crate::config::CallbackConfig>,
1631    ) -> DurableResult<crate::handlers::Callback<T>>
1632    where
1633        T: serde::Serialize + serde::de::DeserializeOwned,
1634    {
1635        let op_id = self.next_operation_identifier(Some(name.to_string()));
1636        let config = config.unwrap_or_default();
1637
1638        let logger = self.logger.read().unwrap().clone();
1639        let result = crate::handlers::callback_handler(&self.state, &op_id, &config, &logger).await;
1640
1641        // Track replay after completion
1642        if result.is_ok() {
1643            self.state.track_replay(&op_id.operation_id).await;
1644        }
1645
1646        result
1647    }
1648
1649    /// Invokes another durable Lambda function.
1650    ///
1651    /// This method calls another Lambda function and waits for its result.
1652    /// The invocation is checkpointed, so if the workflow is interrupted,
1653    /// it will resume with the result of the invocation.
1654    ///
1655    /// # Arguments
1656    ///
1657    /// * `function_name` - The name or ARN of the Lambda function to invoke
1658    /// * `payload` - The payload to send to the function
1659    /// * `config` - Optional invoke configuration (timeout, serdes)
1660    ///
1661    /// # Returns
1662    ///
1663    /// The result from the invoked function, or an error if invocation fails.
1664    ///
1665    /// # Example
1666    ///
1667    /// ```rust,ignore
1668    /// let result: ProcessingResult = ctx.invoke(
1669    ///     "process-order-function",
1670    ///     OrderPayload { order_id: "123".to_string() },
1671    ///     None,
1672    /// ).await?;
1673    /// ```
1674    pub async fn invoke<P, R>(
1675        &self,
1676        function_name: &str,
1677        payload: P,
1678        config: Option<crate::config::InvokeConfig<P, R>>,
1679    ) -> DurableResult<R>
1680    where
1681        P: serde::Serialize + serde::de::DeserializeOwned + Send,
1682        R: serde::Serialize + serde::de::DeserializeOwned + Send,
1683    {
1684        let op_id = self.next_operation_identifier(Some(format!("invoke:{}", function_name)));
1685        let config = config.unwrap_or_default();
1686
1687        let logger = self.logger.read().unwrap().clone();
1688        let result = crate::handlers::invoke_handler(
1689            function_name,
1690            payload,
1691            &self.state,
1692            &op_id,
1693            &config,
1694            &logger,
1695        )
1696        .await;
1697
1698        // Track replay after completion
1699        if result.is_ok() {
1700            self.state.track_replay(&op_id.operation_id).await;
1701        }
1702
1703        result
1704    }
1705
1706    /// Processes a collection in parallel with configurable concurrency.
1707    ///
1708    /// Map operations execute a function for each item in the collection,
1709    /// with configurable concurrency limits and failure tolerance.
1710    ///
1711    /// # Arguments
1712    ///
1713    /// * `items` - The collection of items to process
1714    /// * `func` - The function to apply to each item
1715    /// * `config` - Optional map configuration (concurrency, completion criteria)
1716    ///
1717    /// # Returns
1718    ///
1719    /// A `BatchResult<U>` containing results for all items.
1720    ///
1721    /// # Example
1722    ///
1723    /// ```rust,ignore
1724    /// let results = ctx.map(
1725    ///     vec![1, 2, 3, 4, 5],
1726    ///     |child_ctx, item, index| async move {
1727    ///         Ok(item * 2)
1728    ///     },
1729    ///     Some(MapConfig {
1730    ///         max_concurrency: Some(3),
1731    ///         ..Default::default()
1732    ///     }),
1733    /// ).await?;
1734    /// ```
1735    pub async fn map<T, U, F, Fut>(
1736        &self,
1737        items: Vec<T>,
1738        func: F,
1739        config: Option<crate::config::MapConfig>,
1740    ) -> DurableResult<crate::concurrency::BatchResult<U>>
1741    where
1742        T: serde::Serialize + serde::de::DeserializeOwned + Send + Sync + Clone + 'static,
1743        U: serde::Serialize + serde::de::DeserializeOwned + Send + 'static,
1744        F: Fn(DurableContext, T, usize) -> Fut + Send + Sync + Clone + 'static,
1745        Fut: std::future::Future<Output = DurableResult<U>> + Send + 'static,
1746    {
1747        let op_id = self.next_operation_identifier(Some("map".to_string()));
1748        let config = config.unwrap_or_default();
1749
1750        let logger = self.logger.read().unwrap().clone();
1751        let result =
1752            crate::handlers::map_handler(items, func, &self.state, &op_id, self, &config, &logger)
1753                .await;
1754
1755        // Track replay after completion
1756        if result.is_ok() {
1757            self.state.track_replay(&op_id.operation_id).await;
1758        }
1759
1760        result
1761    }
1762
1763    /// Executes multiple operations in parallel.
1764    ///
1765    /// Parallel operations execute multiple independent functions concurrently,
1766    /// with configurable concurrency limits and completion criteria.
1767    ///
1768    /// # Arguments
1769    ///
1770    /// * `branches` - The list of functions to execute in parallel
1771    /// * `config` - Optional parallel configuration (concurrency, completion criteria)
1772    ///
1773    /// # Returns
1774    ///
1775    /// A `BatchResult<T>` containing results for all branches.
1776    ///
1777    /// # Example
1778    ///
1779    /// ```rust,ignore
1780    /// let results = ctx.parallel(
1781    ///     vec![
1782    ///         |ctx| async move { Ok(fetch_data_a(&ctx).await?) },
1783    ///         |ctx| async move { Ok(fetch_data_b(&ctx).await?) },
1784    ///         |ctx| async move { Ok(fetch_data_c(&ctx).await?) },
1785    ///     ],
1786    ///     None,
1787    /// ).await?;
1788    /// ```
1789    pub async fn parallel<T, F, Fut>(
1790        &self,
1791        branches: Vec<F>,
1792        config: Option<crate::config::ParallelConfig>,
1793    ) -> DurableResult<crate::concurrency::BatchResult<T>>
1794    where
1795        T: serde::Serialize + serde::de::DeserializeOwned + Send + 'static,
1796        F: FnOnce(DurableContext) -> Fut + Send + 'static,
1797        Fut: std::future::Future<Output = DurableResult<T>> + Send + 'static,
1798    {
1799        let op_id = self.next_operation_identifier(Some("parallel".to_string()));
1800        let config = config.unwrap_or_default();
1801
1802        let logger = self.logger.read().unwrap().clone();
1803        let result = crate::handlers::parallel_handler(
1804            branches,
1805            &self.state,
1806            &op_id,
1807            self,
1808            &config,
1809            &logger,
1810        )
1811        .await;
1812
1813        // Track replay after completion
1814        if result.is_ok() {
1815            self.state.track_replay(&op_id.operation_id).await;
1816        }
1817
1818        result
1819    }
1820
1821    /// Executes a function in a child context.
1822    ///
1823    /// Child contexts provide isolation for nested workflows. Operations in
1824    /// a child context are tracked separately and can be checkpointed as a unit.
1825    ///
1826    /// # Arguments
1827    ///
1828    /// * `func` - The function to execute in the child context
1829    /// * `config` - Optional child context configuration
1830    ///
1831    /// # Returns
1832    ///
1833    /// The result of the child function, or an error if execution fails.
1834    ///
1835    /// # Example
1836    ///
1837    /// ```rust,ignore
1838    /// let result = ctx.run_in_child_context(|child_ctx| async move {
1839    ///     let step1 = child_ctx.step(|_| Ok(1), None).await?;
1840    ///     let step2 = child_ctx.step(|_| Ok(2), None).await?;
1841    ///     Ok(step1 + step2)
1842    /// }, None).await?;
1843    /// ```
1844    pub async fn run_in_child_context<T, F, Fut>(
1845        &self,
1846        func: F,
1847        config: Option<crate::config::ChildConfig>,
1848    ) -> DurableResult<T>
1849    where
1850        T: serde::Serialize + serde::de::DeserializeOwned + Send,
1851        F: FnOnce(DurableContext) -> Fut + Send,
1852        Fut: std::future::Future<Output = DurableResult<T>> + Send,
1853    {
1854        let op_id = self.next_operation_identifier(Some("child_context".to_string()));
1855        let config = config.unwrap_or_default();
1856
1857        let logger = self.logger.read().unwrap().clone();
1858        let result =
1859            crate::handlers::child_handler(func, &self.state, &op_id, self, &config, &logger).await;
1860
1861        // Track replay after completion
1862        if result.is_ok() {
1863            self.state.track_replay(&op_id.operation_id).await;
1864        }
1865
1866        result
1867    }
1868
1869    /// Executes a named function in a child context.
1870    ///
1871    /// Same as `run_in_child_context`, but allows specifying a human-readable name.
1872    pub async fn run_in_child_context_named<T, F, Fut>(
1873        &self,
1874        name: &str,
1875        func: F,
1876        config: Option<crate::config::ChildConfig>,
1877    ) -> DurableResult<T>
1878    where
1879        T: serde::Serialize + serde::de::DeserializeOwned + Send,
1880        F: FnOnce(DurableContext) -> Fut + Send,
1881        Fut: std::future::Future<Output = DurableResult<T>> + Send,
1882    {
1883        let op_id = self.next_operation_identifier(Some(name.to_string()));
1884        let config = config.unwrap_or_default();
1885
1886        let logger = self.logger.read().unwrap().clone();
1887        let result =
1888            crate::handlers::child_handler(func, &self.state, &op_id, self, &config, &logger).await;
1889
1890        // Track replay after completion
1891        if result.is_ok() {
1892            self.state.track_replay(&op_id.operation_id).await;
1893        }
1894
1895        result
1896    }
1897
1898    /// Polls until a condition is met.
1899    ///
1900    /// This method repeatedly checks a condition until it returns a successful
1901    /// result. Between checks, it waits for a configurable duration using the
1902    /// RETRY mechanism with NextAttemptDelaySeconds.
1903    ///
1904    /// # Implementation
1905    ///
1906    /// This method is implemented as a single STEP operation with RETRY mechanism,
1907    /// which is more efficient than using multiple steps and waits. The state is
1908    /// passed as Payload on retry (not Error), and the attempt number is tracked
1909    /// in StepDetails.Attempt.
1910    ///
1911    /// # Arguments
1912    ///
1913    /// * `check` - The function to check the condition
1914    /// * `config` - Configuration for the wait (interval, max attempts, timeout)
1915    ///
1916    /// # Returns
1917    ///
1918    /// The result when the condition is met, or an error if timeout/max attempts exceeded.
1919    ///
1920    /// # Example
1921    ///
1922    /// ```rust,ignore
1923    /// let result = ctx.wait_for_condition(
1924    ///     |state, ctx| {
1925    ///         let order_id = state.order_id.clone();
1926    ///         async move {
1927    ///             // Check if order is ready
1928    ///             let status = check_order_status(&order_id).await?;
1929    ///             if status == "ready" {
1930    ///                 Ok(OrderReady { order_id })
1931    ///             } else {
1932    ///                 Err("Order not ready yet".into())
1933    ///             }
1934    ///         }
1935    ///     },
1936    ///     WaitForConditionConfig::from_interval(
1937    ///         OrderState { order_id: "123".to_string() },
1938    ///         Duration::from_seconds(5),
1939    ///         Some(10),
1940    ///     ),
1941    /// ).await?;
1942    /// ```
1943    pub async fn wait_for_condition<T, S, F, Fut>(
1944        &self,
1945        check: F,
1946        config: WaitForConditionConfig<S>,
1947    ) -> DurableResult<T>
1948    where
1949        T: serde::Serialize + serde::de::DeserializeOwned + Send,
1950        S: serde::Serialize + serde::de::DeserializeOwned + Clone + Send + Sync,
1951        F: Fn(&S, &WaitForConditionContext) -> Fut + Send + Sync,
1952        Fut: std::future::Future<Output = Result<T, Box<dyn std::error::Error + Send + Sync>>>
1953            + Send,
1954    {
1955        let op_id = self.next_operation_identifier(Some("wait_for_condition".to_string()));
1956
1957        let logger = self.logger.read().unwrap().clone();
1958        // Use the new wait_for_condition_handler which implements the single STEP with RETRY pattern
1959        // This is more efficient than the previous child context + multiple steps approach
1960        let result = crate::handlers::wait_for_condition_handler(
1961            check,
1962            config,
1963            &self.state,
1964            &op_id,
1965            &logger,
1966        )
1967        .await;
1968
1969        // Track replay after completion (only if not suspended)
1970        if result.is_ok() {
1971            self.state.track_replay(&op_id.operation_id).await;
1972        }
1973
1974        result
1975    }
1976
1977    /// Creates a callback and waits for the result with a submitter function.
1978    ///
1979    /// This is a convenience method that combines callback creation with
1980    /// a submitter function that sends the callback ID to an external system.
1981    /// The submitter execution is checkpointed within a child context to ensure
1982    /// replay safety - the submitter will not be re-executed during replay.
1983    ///
1984    /// # Arguments
1985    ///
1986    /// * `submitter` - Function that receives the callback ID and submits it to external system
1987    /// * `config` - Optional callback configuration (timeout, heartbeat)
1988    ///
1989    /// # Returns
1990    ///
1991    /// The callback result from the external system.
1992    ///
1993    /// # Example
1994    ///
1995    /// ```rust,ignore
1996    /// let approval = ctx.wait_for_callback(
1997    ///     |callback_id| async move {
1998    ///         // Send callback ID to approval system
1999    ///         send_approval_request(&callback_id, &request).await
2000    ///     },
2001    ///     Some(CallbackConfig {
2002    ///         timeout: Duration::from_hours(24),
2003    ///         ..Default::default()
2004    ///     }),
2005    /// ).await?;
2006    /// ```
2007    pub async fn wait_for_callback<T, F, Fut>(
2008        &self,
2009        submitter: F,
2010        config: Option<crate::config::CallbackConfig>,
2011    ) -> DurableResult<T>
2012    where
2013        T: serde::Serialize + serde::de::DeserializeOwned + Send + Sync,
2014        F: FnOnce(String) -> Fut + Send + 'static,
2015        Fut: std::future::Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>>
2016            + Send
2017            + 'static,
2018    {
2019        // Create the callback first with the provided configuration (Requirement 1.4)
2020        let callback: crate::handlers::Callback<T> = self.create_callback(config).await?;
2021        let callback_id = callback.callback_id.clone();
2022
2023        // Generate operation ID for the child context that will execute the submitter
2024        let op_id = self.next_operation_identifier(Some("wait_for_callback_submitter".to_string()));
2025
2026        // Execute the submitter within a child context for proper checkpointing (Requirements 1.1, 1.2)
2027        // The child context ensures the submitter execution is tracked and not re-executed during replay
2028        let child_config = crate::config::ChildConfig::default();
2029
2030        let logger = self.logger.read().unwrap().clone();
2031        crate::handlers::child_handler(
2032            |child_ctx| {
2033                let callback_id = callback_id.clone();
2034                async move {
2035                    // Use a step to checkpoint that we're about to execute the submitter
2036                    // This step returns () on success, indicating submission completed
2037                    child_ctx
2038                        .step_named(
2039                            "execute_submitter",
2040                            move |_| async move {
2041                                // The step just marks that we're executing the submitter
2042                                // The actual async submitter call happens after this checkpoint
2043                                Ok(())
2044                            },
2045                            None,
2046                        )
2047                        .await?;
2048
2049                    // Now execute the actual submitter
2050                    // If we're replaying and the step above succeeded, we won't reach here
2051                    // because the child context will return the checkpointed result
2052                    submitter(callback_id).await.map_err(|e| {
2053                        crate::error::DurableError::UserCode {
2054                            message: e.to_string(),
2055                            error_type: "SubmitterError".to_string(),
2056                            stack_trace: None,
2057                        }
2058                    })?;
2059
2060                    Ok(())
2061                }
2062            },
2063            &self.state,
2064            &op_id,
2065            self,
2066            &child_config,
2067            &logger,
2068        )
2069        .await?;
2070
2071        // Track replay after child context completion
2072        self.state.track_replay(&op_id.operation_id).await;
2073
2074        // Wait for the callback result
2075        callback.result().await
2076    }
2077
2078    // ========================================================================
2079    // Promise Combinators
2080    // ========================================================================
2081
2082    /// Waits for all futures to complete successfully.
2083    ///
2084    /// Returns all results if all futures succeed, or returns the first error encountered.
2085    /// This is implemented within a STEP operation for durability.
2086    ///
2087    /// # Arguments
2088    ///
2089    /// * `futures` - Vector of futures to execute
2090    ///
2091    /// # Returns
2092    ///
2093    /// `Ok(Vec<T>)` if all futures succeed, or `Err` with the first error.
2094    ///
2095    /// # Example
2096    ///
2097    /// ```rust,ignore
2098    /// let results = ctx.all(vec![
2099    ///     ctx.step(|_| Ok(1), None),
2100    ///     ctx.step(|_| Ok(2), None),
2101    ///     ctx.step(|_| Ok(3), None),
2102    /// ]).await?;
2103    /// assert_eq!(results, vec![1, 2, 3]);
2104    /// ```
2105    pub async fn all<T, Fut>(&self, futures: Vec<Fut>) -> DurableResult<Vec<T>>
2106    where
2107        T: serde::Serialize + serde::de::DeserializeOwned + Send + Clone + 'static,
2108        Fut: std::future::Future<Output = DurableResult<T>> + Send + 'static,
2109    {
2110        let op_id = self.next_operation_identifier(Some("all".to_string()));
2111
2112        let logger = self.logger.read().unwrap().clone();
2113        let result = crate::handlers::all_handler(futures, &self.state, &op_id, &logger).await;
2114
2115        // Track replay after completion
2116        if result.is_ok() {
2117            self.state.track_replay(&op_id.operation_id).await;
2118        }
2119
2120        result
2121    }
2122
2123    /// Waits for all futures to settle (success or failure).
2124    ///
2125    /// Returns a BatchResult containing outcomes for all futures, regardless of success or failure.
2126    /// This is implemented within a STEP operation for durability.
2127    ///
2128    /// # Arguments
2129    ///
2130    /// * `futures` - Vector of futures to execute
2131    ///
2132    /// # Returns
2133    ///
2134    /// `BatchResult<T>` containing results for all futures.
2135    ///
2136    /// # Example
2137    ///
2138    /// ```rust,ignore
2139    /// let results = ctx.all_settled(vec![
2140    ///     ctx.step(|_| Ok(1), None),
2141    ///     ctx.step(|_| Err("error".into()), None),
2142    ///     ctx.step(|_| Ok(3), None),
2143    /// ]).await?;
2144    /// assert_eq!(results.success_count(), 2);
2145    /// assert_eq!(results.failure_count(), 1);
2146    /// ```
2147    pub async fn all_settled<T, Fut>(
2148        &self,
2149        futures: Vec<Fut>,
2150    ) -> DurableResult<crate::concurrency::BatchResult<T>>
2151    where
2152        T: serde::Serialize + serde::de::DeserializeOwned + Send + Clone + 'static,
2153        Fut: std::future::Future<Output = DurableResult<T>> + Send + 'static,
2154    {
2155        let op_id = self.next_operation_identifier(Some("all_settled".to_string()));
2156
2157        let logger = self.logger.read().unwrap().clone();
2158        let result =
2159            crate::handlers::all_settled_handler(futures, &self.state, &op_id, &logger).await;
2160
2161        // Track replay after completion
2162        if result.is_ok() {
2163            self.state.track_replay(&op_id.operation_id).await;
2164        }
2165
2166        result
2167    }
2168
2169    /// Returns the result of the first future to settle.
2170    ///
2171    /// Returns the result (success or failure) of whichever future completes first.
2172    /// This is implemented within a STEP operation for durability.
2173    ///
2174    /// # Arguments
2175    ///
2176    /// * `futures` - Vector of futures to execute
2177    ///
2178    /// # Returns
2179    ///
2180    /// The result of the first future to settle.
2181    ///
2182    /// # Example
2183    ///
2184    /// ```rust,ignore
2185    /// let result = ctx.race(vec![
2186    ///     ctx.step(|_| Ok(1), None),
2187    ///     ctx.step(|_| Ok(2), None),
2188    /// ]).await?;
2189    /// // result is either 1 or 2, whichever completed first
2190    /// ```
2191    pub async fn race<T, Fut>(&self, futures: Vec<Fut>) -> DurableResult<T>
2192    where
2193        T: serde::Serialize + serde::de::DeserializeOwned + Send + Clone + 'static,
2194        Fut: std::future::Future<Output = DurableResult<T>> + Send + 'static,
2195    {
2196        let op_id = self.next_operation_identifier(Some("race".to_string()));
2197
2198        let logger = self.logger.read().unwrap().clone();
2199        let result = crate::handlers::race_handler(futures, &self.state, &op_id, &logger).await;
2200
2201        // Track replay after completion
2202        if result.is_ok() {
2203            self.state.track_replay(&op_id.operation_id).await;
2204        }
2205
2206        result
2207    }
2208
2209    /// Returns the result of the first future to succeed.
2210    ///
2211    /// Returns the result of the first future to succeed. If all futures fail,
2212    /// returns an error containing all the failures.
2213    /// This is implemented within a STEP operation for durability.
2214    ///
2215    /// # Arguments
2216    ///
2217    /// * `futures` - Vector of futures to execute
2218    ///
2219    /// # Returns
2220    ///
2221    /// The result of the first future to succeed, or an error if all fail.
2222    ///
2223    /// # Example
2224    ///
2225    /// ```rust,ignore
2226    /// let result = ctx.any(vec![
2227    ///     ctx.step(|_| Err("error".into()), None),
2228    ///     ctx.step(|_| Ok(2), None),
2229    ///     ctx.step(|_| Ok(3), None),
2230    /// ]).await?;
2231    /// // result is either 2 or 3, whichever succeeded first
2232    /// ```
2233    pub async fn any<T, Fut>(&self, futures: Vec<Fut>) -> DurableResult<T>
2234    where
2235        T: serde::Serialize + serde::de::DeserializeOwned + Send + Clone + 'static,
2236        Fut: std::future::Future<Output = DurableResult<T>> + Send + 'static,
2237    {
2238        let op_id = self.next_operation_identifier(Some("any".to_string()));
2239
2240        let logger = self.logger.read().unwrap().clone();
2241        let result = crate::handlers::any_handler(futures, &self.state, &op_id, &logger).await;
2242
2243        // Track replay after completion
2244        if result.is_ok() {
2245            self.state.track_replay(&op_id.operation_id).await;
2246        }
2247
2248        result
2249    }
2250}
2251
2252/// Configuration for wait_for_condition operations.
2253#[allow(clippy::type_complexity)]
2254pub struct WaitForConditionConfig<S> {
2255    /// Initial state to pass to the check function.
2256    pub initial_state: S,
2257    /// Wait strategy function that determines polling behavior.
2258    ///
2259    /// Takes a reference to the current state and the attempt number (1-indexed),
2260    /// and returns a [`WaitDecision`](crate::config::WaitDecision).
2261    pub wait_strategy: Box<dyn Fn(&S, usize) -> crate::config::WaitDecision + Send + Sync>,
2262    /// Overall timeout for the operation.
2263    pub timeout: Option<crate::duration::Duration>,
2264    /// Optional custom serializer/deserializer.
2265    pub serdes: Option<std::sync::Arc<dyn crate::config::SerDesAny>>,
2266}
2267
2268impl<S> std::fmt::Debug for WaitForConditionConfig<S>
2269where
2270    S: std::fmt::Debug,
2271{
2272    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2273        f.debug_struct("WaitForConditionConfig")
2274            .field("initial_state", &self.initial_state)
2275            .field("wait_strategy", &"<fn>")
2276            .field("timeout", &self.timeout)
2277            .field("serdes", &self.serdes.is_some())
2278            .finish()
2279    }
2280}
2281
2282impl<S> WaitForConditionConfig<S> {
2283    /// Creates a backward-compatible `WaitForConditionConfig` from interval and max_attempts.
2284    ///
2285    /// This constructor converts the old-style `interval` + `max_attempts` parameters
2286    /// into a wait strategy that uses a fixed delay (no backoff, no jitter).
2287    ///
2288    /// # Arguments
2289    ///
2290    /// * `initial_state` - Initial state to pass to the check function.
2291    /// * `interval` - Fixed interval between condition checks.
2292    /// * `max_attempts` - Maximum number of attempts (`None` for unlimited).
2293    ///
2294    /// # Example
2295    ///
2296    /// ```rust,ignore
2297    /// use durable_execution_sdk::{WaitForConditionConfig, Duration};
2298    ///
2299    /// let config = WaitForConditionConfig::from_interval(
2300    ///     my_initial_state,
2301    ///     Duration::from_seconds(5),
2302    ///     Some(10),
2303    /// );
2304    /// ```
2305    pub fn from_interval(
2306        initial_state: S,
2307        interval: crate::duration::Duration,
2308        max_attempts: Option<usize>,
2309    ) -> Self
2310    where
2311        S: Send + Sync + 'static,
2312    {
2313        let interval_secs = interval.to_seconds();
2314        let max = max_attempts.unwrap_or(usize::MAX);
2315
2316        Self {
2317            initial_state,
2318            wait_strategy: Box::new(move |_state: &S, attempts_made: usize| {
2319                // In the backward-compatible mode, the check function's Ok/Err
2320                // determines whether polling is done. The strategy only provides
2321                // the delay and enforces max_attempts.
2322                // Return Done when max_attempts exceeded (handler will checkpoint failure
2323                // if check returned Err, or success if check returned Ok).
2324                if attempts_made >= max {
2325                    return crate::config::WaitDecision::Done;
2326                }
2327                crate::config::WaitDecision::Continue {
2328                    delay: crate::duration::Duration::from_seconds(interval_secs),
2329                }
2330            }),
2331            timeout: None,
2332            serdes: None,
2333        }
2334    }
2335}
2336
2337impl<S: Default + Send + Sync + 'static> Default for WaitForConditionConfig<S> {
2338    fn default() -> Self {
2339        Self::from_interval(
2340            S::default(),
2341            crate::duration::Duration::from_seconds(5),
2342            None,
2343        )
2344    }
2345}
2346
2347/// Context provided to wait_for_condition check functions.
2348#[derive(Debug, Clone)]
2349pub struct WaitForConditionContext {
2350    /// Current attempt number (1-indexed).
2351    pub attempt: usize,
2352    /// Maximum number of attempts (None for unlimited).
2353    pub max_attempts: Option<usize>,
2354}
2355
2356impl Clone for DurableContext {
2357    fn clone(&self) -> Self {
2358        Self {
2359            state: self.state.clone(),
2360            lambda_context: self.lambda_context.clone(),
2361            parent_id: self.parent_id.clone(),
2362            id_generator: self.id_generator.clone(),
2363            logger: self.logger.clone(),
2364        }
2365    }
2366}
2367
2368impl std::fmt::Debug for DurableContext {
2369    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2370        f.debug_struct("DurableContext")
2371            .field("durable_execution_arn", &self.durable_execution_arn())
2372            .field("parent_id", &self.parent_id)
2373            .field("step_counter", &self.current_step_counter())
2374            .finish_non_exhaustive()
2375    }
2376}
2377
2378// Add hex encoding dependency
2379mod hex {
2380    const HEX_CHARS: &[u8; 16] = b"0123456789abcdef";
2381
2382    pub fn encode(bytes: &[u8]) -> String {
2383        let mut result = String::with_capacity(bytes.len() * 2);
2384        for &byte in bytes {
2385            result.push(HEX_CHARS[(byte >> 4) as usize] as char);
2386            result.push(HEX_CHARS[(byte & 0x0f) as usize] as char);
2387        }
2388        result
2389    }
2390}
2391
2392#[cfg(test)]
2393mod tests {
2394    use super::*;
2395
2396    #[test]
2397    fn test_operation_identifier_new() {
2398        let id = OperationIdentifier::new(
2399            "op-123",
2400            Some("parent-456".to_string()),
2401            Some("my-step".to_string()),
2402        );
2403        assert_eq!(id.operation_id, "op-123");
2404        assert_eq!(id.parent_id, Some("parent-456".to_string()));
2405        assert_eq!(id.name, Some("my-step".to_string()));
2406    }
2407
2408    #[test]
2409    fn test_operation_identifier_root() {
2410        let id = OperationIdentifier::root("op-123");
2411        assert_eq!(id.operation_id, "op-123");
2412        assert!(id.parent_id.is_none());
2413        assert!(id.name.is_none());
2414    }
2415
2416    #[test]
2417    fn test_operation_identifier_with_parent() {
2418        let id = OperationIdentifier::with_parent("op-123", "parent-456");
2419        assert_eq!(id.operation_id, "op-123");
2420        assert_eq!(id.parent_id, Some("parent-456".to_string()));
2421        assert!(id.name.is_none());
2422    }
2423
2424    #[test]
2425    fn test_operation_identifier_with_name() {
2426        let id = OperationIdentifier::root("op-123").with_name("my-step");
2427        assert_eq!(id.name, Some("my-step".to_string()));
2428    }
2429
2430    #[test]
2431    fn test_operation_identifier_display() {
2432        let id = OperationIdentifier::root("op-123");
2433        assert_eq!(format!("{}", id), "op-123");
2434
2435        let id_with_name = OperationIdentifier::root("op-123").with_name("my-step");
2436        assert_eq!(format!("{}", id_with_name), "my-step(op-123)");
2437    }
2438
2439    #[test]
2440    fn test_generate_operation_id_deterministic() {
2441        let id1 = generate_operation_id("base-123", 0);
2442        let id2 = generate_operation_id("base-123", 0);
2443        assert_eq!(id1, id2);
2444    }
2445
2446    #[test]
2447    fn test_generate_operation_id_different_counters() {
2448        let id1 = generate_operation_id("base-123", 0);
2449        let id2 = generate_operation_id("base-123", 1);
2450        assert_ne!(id1, id2);
2451    }
2452
2453    #[test]
2454    fn test_generate_operation_id_different_bases() {
2455        let id1 = generate_operation_id("base-123", 0);
2456        let id2 = generate_operation_id("base-456", 0);
2457        assert_ne!(id1, id2);
2458    }
2459
2460    #[test]
2461    fn test_generate_operation_id_length() {
2462        let id = generate_operation_id("base-123", 0);
2463        assert_eq!(id.len(), 32); // 16 bytes = 32 hex chars
2464    }
2465
2466    #[test]
2467    fn test_operation_id_generator_new() {
2468        let gen = OperationIdGenerator::new("base-123");
2469        assert_eq!(gen.base_id(), "base-123");
2470        assert_eq!(gen.current_counter(), 0);
2471    }
2472
2473    #[test]
2474    fn test_operation_id_generator_with_counter() {
2475        let gen = OperationIdGenerator::with_counter("base-123", 10);
2476        assert_eq!(gen.current_counter(), 10);
2477    }
2478
2479    #[test]
2480    fn test_operation_id_generator_next_id() {
2481        let gen = OperationIdGenerator::new("base-123");
2482
2483        let id1 = gen.next_id();
2484        assert_eq!(gen.current_counter(), 1);
2485
2486        let id2 = gen.next_id();
2487        assert_eq!(gen.current_counter(), 2);
2488
2489        assert_ne!(id1, id2);
2490    }
2491
2492    #[test]
2493    fn test_operation_id_generator_id_for_counter() {
2494        let gen = OperationIdGenerator::new("base-123");
2495
2496        let id_0 = gen.id_for_counter(0);
2497        let id_1 = gen.id_for_counter(1);
2498
2499        // id_for_counter should not increment the counter
2500        assert_eq!(gen.current_counter(), 0);
2501
2502        // Should match what next_id would produce
2503        let next = gen.next_id();
2504        assert_eq!(next, id_0);
2505
2506        let next = gen.next_id();
2507        assert_eq!(next, id_1);
2508    }
2509
2510    #[test]
2511    fn test_operation_id_generator_create_child() {
2512        let gen = OperationIdGenerator::new("base-123");
2513        gen.next_id(); // Increment parent counter
2514
2515        let child = gen.create_child("child-op-id");
2516        assert_eq!(child.base_id(), "child-op-id");
2517        assert_eq!(child.current_counter(), 0);
2518    }
2519
2520    #[test]
2521    fn test_operation_id_generator_clone() {
2522        let gen = OperationIdGenerator::new("base-123");
2523        gen.next_id();
2524        gen.next_id();
2525
2526        let cloned = gen.clone();
2527        assert_eq!(cloned.base_id(), gen.base_id());
2528        assert_eq!(cloned.current_counter(), gen.current_counter());
2529    }
2530
2531    #[test]
2532    fn test_operation_id_generator_thread_safety() {
2533        use std::thread;
2534
2535        let gen = Arc::new(OperationIdGenerator::new("base-123"));
2536        let mut handles = vec![];
2537
2538        for _ in 0..10 {
2539            let gen_clone = gen.clone();
2540            handles.push(thread::spawn(move || {
2541                let mut ids = vec![];
2542                for _ in 0..100 {
2543                    ids.push(gen_clone.next_id());
2544                }
2545                ids
2546            }));
2547        }
2548
2549        let mut all_ids = vec![];
2550        for handle in handles {
2551            all_ids.extend(handle.join().unwrap());
2552        }
2553
2554        // All IDs should be unique
2555        let unique_count = {
2556            let mut set = std::collections::HashSet::new();
2557            for id in &all_ids {
2558                set.insert(id.clone());
2559            }
2560            set.len()
2561        };
2562
2563        assert_eq!(unique_count, 1000);
2564        assert_eq!(gen.current_counter(), 1000);
2565    }
2566
2567    #[test]
2568    fn test_log_info_new() {
2569        let info = LogInfo::new("arn:test");
2570        assert_eq!(info.durable_execution_arn, Some("arn:test".to_string()));
2571        assert!(info.operation_id.is_none());
2572        assert!(info.parent_id.is_none());
2573    }
2574
2575    #[test]
2576    fn test_log_info_with_operation_id() {
2577        let info = LogInfo::new("arn:test").with_operation_id("op-123");
2578        assert_eq!(info.operation_id, Some("op-123".to_string()));
2579    }
2580
2581    #[test]
2582    fn test_log_info_with_parent_id() {
2583        let info = LogInfo::new("arn:test").with_parent_id("parent-456");
2584        assert_eq!(info.parent_id, Some("parent-456".to_string()));
2585    }
2586
2587    #[test]
2588    fn test_log_info_with_extra() {
2589        let info = LogInfo::new("arn:test")
2590            .with_extra("key1", "value1")
2591            .with_extra("key2", "value2");
2592        assert_eq!(info.extra.len(), 2);
2593        assert_eq!(info.extra[0], ("key1".to_string(), "value1".to_string()));
2594    }
2595
2596    #[test]
2597    fn test_hex_encode() {
2598        assert_eq!(hex::encode(&[0x00]), "00");
2599        assert_eq!(hex::encode(&[0xff]), "ff");
2600        assert_eq!(hex::encode(&[0x12, 0x34, 0xab, 0xcd]), "1234abcd");
2601    }
2602}
2603
2604#[cfg(test)]
2605mod durable_context_tests {
2606    use super::*;
2607    use crate::client::SharedDurableServiceClient;
2608    use crate::lambda::InitialExecutionState;
2609    use std::sync::Arc;
2610
2611    #[cfg(test)]
2612    fn create_mock_client() -> SharedDurableServiceClient {
2613        use crate::client::MockDurableServiceClient;
2614        Arc::new(MockDurableServiceClient::new())
2615    }
2616
2617    fn create_test_state() -> Arc<ExecutionState> {
2618        let client = create_mock_client();
2619        Arc::new(ExecutionState::new(
2620            "arn:aws:lambda:us-east-1:123456789012:function:test:durable:abc123",
2621            "token-123",
2622            InitialExecutionState::new(),
2623            client,
2624        ))
2625    }
2626
2627    #[test]
2628    fn test_durable_context_new() {
2629        let state = create_test_state();
2630        let ctx = DurableContext::new(state.clone());
2631
2632        assert_eq!(
2633            ctx.durable_execution_arn(),
2634            "arn:aws:lambda:us-east-1:123456789012:function:test:durable:abc123"
2635        );
2636        assert!(ctx.parent_id().is_none());
2637        assert!(ctx.lambda_context().is_none());
2638        assert_eq!(ctx.current_step_counter(), 0);
2639    }
2640
2641    #[test]
2642    fn test_durable_context_next_operation_id() {
2643        let state = create_test_state();
2644        let ctx = DurableContext::new(state);
2645
2646        let id1 = ctx.next_operation_id();
2647        let id2 = ctx.next_operation_id();
2648
2649        assert_ne!(id1, id2);
2650        assert_eq!(ctx.current_step_counter(), 2);
2651    }
2652
2653    #[test]
2654    fn test_durable_context_next_operation_identifier() {
2655        let state = create_test_state();
2656        let ctx = DurableContext::new(state);
2657
2658        let op_id = ctx.next_operation_identifier(Some("my-step".to_string()));
2659
2660        assert!(op_id.parent_id.is_none());
2661        assert_eq!(op_id.name, Some("my-step".to_string()));
2662        assert!(!op_id.operation_id.is_empty());
2663    }
2664
2665    #[test]
2666    fn test_durable_context_create_child_context() {
2667        let state = create_test_state();
2668        let ctx = DurableContext::new(state);
2669
2670        // Generate a parent operation ID
2671        let parent_op_id = ctx.next_operation_id();
2672
2673        // Create child context
2674        let child_ctx = ctx.create_child_context(&parent_op_id);
2675
2676        assert_eq!(child_ctx.parent_id(), Some(parent_op_id.as_str()));
2677        assert_eq!(child_ctx.current_step_counter(), 0);
2678        assert_eq!(
2679            child_ctx.durable_execution_arn(),
2680            ctx.durable_execution_arn()
2681        );
2682    }
2683
2684    #[test]
2685    fn test_durable_context_child_generates_different_ids() {
2686        let state = create_test_state();
2687        let ctx = DurableContext::new(state);
2688
2689        let parent_op_id = ctx.next_operation_id();
2690        let child_ctx = ctx.create_child_context(&parent_op_id);
2691
2692        // Child should generate different IDs than parent
2693        let child_id = child_ctx.next_operation_id();
2694        let parent_id = ctx.next_operation_id();
2695
2696        assert_ne!(child_id, parent_id);
2697    }
2698
2699    #[test]
2700    fn test_durable_context_child_operation_identifier_has_parent() {
2701        let state = create_test_state();
2702        let ctx = DurableContext::new(state);
2703
2704        let parent_op_id = ctx.next_operation_id();
2705        let child_ctx = ctx.create_child_context(&parent_op_id);
2706
2707        let child_op_id = child_ctx.next_operation_identifier(None);
2708
2709        assert_eq!(child_op_id.parent_id, Some(parent_op_id));
2710    }
2711
2712    #[test]
2713    fn test_durable_context_set_logger() {
2714        let state = create_test_state();
2715        let mut ctx = DurableContext::new(state);
2716
2717        // Create a custom logger
2718        let custom_logger: Arc<dyn Logger> = Arc::new(TracingLogger);
2719        ctx.set_logger(custom_logger);
2720
2721        // Just verify it doesn't panic - the logger is set
2722        let _ = ctx.logger();
2723    }
2724
2725    #[test]
2726    fn test_durable_context_with_logger() {
2727        let state = create_test_state();
2728        let ctx = DurableContext::new(state);
2729
2730        let custom_logger: Arc<dyn Logger> = Arc::new(TracingLogger);
2731        let ctx_with_logger = ctx.with_logger(custom_logger);
2732
2733        // Just verify it doesn't panic - the logger is set
2734        let _ = ctx_with_logger.logger();
2735    }
2736
2737    #[test]
2738    fn test_durable_context_create_log_info() {
2739        let state = create_test_state();
2740        let ctx = DurableContext::new(state);
2741
2742        let info = ctx.create_log_info();
2743
2744        assert_eq!(
2745            info.durable_execution_arn,
2746            Some("arn:aws:lambda:us-east-1:123456789012:function:test:durable:abc123".to_string())
2747        );
2748        assert!(info.parent_id.is_none());
2749    }
2750
2751    #[test]
2752    fn test_durable_context_create_log_info_with_parent() {
2753        let state = create_test_state();
2754        let ctx = DurableContext::new(state);
2755
2756        let parent_op_id = ctx.next_operation_id();
2757        let child_ctx = ctx.create_child_context(&parent_op_id);
2758
2759        let info = child_ctx.create_log_info();
2760
2761        assert_eq!(info.parent_id, Some(parent_op_id));
2762    }
2763
2764    #[test]
2765    fn test_durable_context_create_log_info_with_operation() {
2766        let state = create_test_state();
2767        let ctx = DurableContext::new(state);
2768
2769        let info = ctx.create_log_info_with_operation("op-123");
2770
2771        assert_eq!(info.operation_id, Some("op-123".to_string()));
2772    }
2773
2774    #[test]
2775    fn test_log_info_with_replay() {
2776        let info = LogInfo::new("arn:test")
2777            .with_operation_id("op-123")
2778            .with_replay(true);
2779
2780        assert!(info.is_replay);
2781        assert_eq!(info.operation_id, Some("op-123".to_string()));
2782    }
2783
2784    #[test]
2785    fn test_log_info_default_not_replay() {
2786        let info = LogInfo::default();
2787        assert!(!info.is_replay);
2788    }
2789
2790    #[test]
2791    fn test_replay_logging_config_default() {
2792        let config = ReplayLoggingConfig::default();
2793        assert_eq!(config, ReplayLoggingConfig::SuppressAll);
2794    }
2795
2796    #[test]
2797    fn test_replay_aware_logger_suppress_all() {
2798        use std::sync::atomic::{AtomicUsize, Ordering};
2799
2800        // Create counters for each log level
2801        let debug_count = Arc::new(AtomicUsize::new(0));
2802        let info_count = Arc::new(AtomicUsize::new(0));
2803        let warn_count = Arc::new(AtomicUsize::new(0));
2804        let error_count = Arc::new(AtomicUsize::new(0));
2805
2806        let inner = Arc::new(custom_logger(
2807            {
2808                let count = debug_count.clone();
2809                move |_, _| {
2810                    count.fetch_add(1, Ordering::SeqCst);
2811                }
2812            },
2813            {
2814                let count = info_count.clone();
2815                move |_, _| {
2816                    count.fetch_add(1, Ordering::SeqCst);
2817                }
2818            },
2819            {
2820                let count = warn_count.clone();
2821                move |_, _| {
2822                    count.fetch_add(1, Ordering::SeqCst);
2823                }
2824            },
2825            {
2826                let count = error_count.clone();
2827                move |_, _| {
2828                    count.fetch_add(1, Ordering::SeqCst);
2829                }
2830            },
2831        ));
2832
2833        let logger = ReplayAwareLogger::new(inner, ReplayLoggingConfig::SuppressAll);
2834
2835        // Non-replay logs should pass through
2836        let non_replay_info = LogInfo::new("arn:test").with_replay(false);
2837        logger.debug("test", &non_replay_info);
2838        logger.info("test", &non_replay_info);
2839        logger.warn("test", &non_replay_info);
2840        logger.error("test", &non_replay_info);
2841
2842        assert_eq!(debug_count.load(Ordering::SeqCst), 1);
2843        assert_eq!(info_count.load(Ordering::SeqCst), 1);
2844        assert_eq!(warn_count.load(Ordering::SeqCst), 1);
2845        assert_eq!(error_count.load(Ordering::SeqCst), 1);
2846
2847        // Replay logs should be suppressed
2848        let replay_info = LogInfo::new("arn:test").with_replay(true);
2849        logger.debug("test", &replay_info);
2850        logger.info("test", &replay_info);
2851        logger.warn("test", &replay_info);
2852        logger.error("test", &replay_info);
2853
2854        // Counts should not have increased
2855        assert_eq!(debug_count.load(Ordering::SeqCst), 1);
2856        assert_eq!(info_count.load(Ordering::SeqCst), 1);
2857        assert_eq!(warn_count.load(Ordering::SeqCst), 1);
2858        assert_eq!(error_count.load(Ordering::SeqCst), 1);
2859    }
2860
2861    #[test]
2862    fn test_replay_aware_logger_allow_all() {
2863        use std::sync::atomic::{AtomicUsize, Ordering};
2864
2865        let call_count = Arc::new(AtomicUsize::new(0));
2866
2867        let inner = Arc::new(custom_logger(
2868            {
2869                let count = call_count.clone();
2870                move |_, _| {
2871                    count.fetch_add(1, Ordering::SeqCst);
2872                }
2873            },
2874            {
2875                let count = call_count.clone();
2876                move |_, _| {
2877                    count.fetch_add(1, Ordering::SeqCst);
2878                }
2879            },
2880            {
2881                let count = call_count.clone();
2882                move |_, _| {
2883                    count.fetch_add(1, Ordering::SeqCst);
2884                }
2885            },
2886            {
2887                let count = call_count.clone();
2888                move |_, _| {
2889                    count.fetch_add(1, Ordering::SeqCst);
2890                }
2891            },
2892        ));
2893
2894        let logger = ReplayAwareLogger::allow_all(inner);
2895
2896        // All logs should pass through even during replay
2897        let replay_info = LogInfo::new("arn:test").with_replay(true);
2898        logger.debug("test", &replay_info);
2899        logger.info("test", &replay_info);
2900        logger.warn("test", &replay_info);
2901        logger.error("test", &replay_info);
2902
2903        assert_eq!(call_count.load(Ordering::SeqCst), 4);
2904    }
2905
2906    #[test]
2907    fn test_replay_aware_logger_errors_only() {
2908        use std::sync::atomic::{AtomicUsize, Ordering};
2909
2910        let debug_count = Arc::new(AtomicUsize::new(0));
2911        let info_count = Arc::new(AtomicUsize::new(0));
2912        let warn_count = Arc::new(AtomicUsize::new(0));
2913        let error_count = Arc::new(AtomicUsize::new(0));
2914
2915        let inner = Arc::new(custom_logger(
2916            {
2917                let count = debug_count.clone();
2918                move |_, _| {
2919                    count.fetch_add(1, Ordering::SeqCst);
2920                }
2921            },
2922            {
2923                let count = info_count.clone();
2924                move |_, _| {
2925                    count.fetch_add(1, Ordering::SeqCst);
2926                }
2927            },
2928            {
2929                let count = warn_count.clone();
2930                move |_, _| {
2931                    count.fetch_add(1, Ordering::SeqCst);
2932                }
2933            },
2934            {
2935                let count = error_count.clone();
2936                move |_, _| {
2937                    count.fetch_add(1, Ordering::SeqCst);
2938                }
2939            },
2940        ));
2941
2942        let logger = ReplayAwareLogger::new(inner, ReplayLoggingConfig::ErrorsOnly);
2943
2944        // During replay, only errors should pass through
2945        let replay_info = LogInfo::new("arn:test").with_replay(true);
2946        logger.debug("test", &replay_info);
2947        logger.info("test", &replay_info);
2948        logger.warn("test", &replay_info);
2949        logger.error("test", &replay_info);
2950
2951        assert_eq!(debug_count.load(Ordering::SeqCst), 0);
2952        assert_eq!(info_count.load(Ordering::SeqCst), 0);
2953        assert_eq!(warn_count.load(Ordering::SeqCst), 0);
2954        assert_eq!(error_count.load(Ordering::SeqCst), 1);
2955    }
2956
2957    #[test]
2958    fn test_replay_aware_logger_warnings_and_errors() {
2959        use std::sync::atomic::{AtomicUsize, Ordering};
2960
2961        let debug_count = Arc::new(AtomicUsize::new(0));
2962        let info_count = Arc::new(AtomicUsize::new(0));
2963        let warn_count = Arc::new(AtomicUsize::new(0));
2964        let error_count = Arc::new(AtomicUsize::new(0));
2965
2966        let inner = Arc::new(custom_logger(
2967            {
2968                let count = debug_count.clone();
2969                move |_, _| {
2970                    count.fetch_add(1, Ordering::SeqCst);
2971                }
2972            },
2973            {
2974                let count = info_count.clone();
2975                move |_, _| {
2976                    count.fetch_add(1, Ordering::SeqCst);
2977                }
2978            },
2979            {
2980                let count = warn_count.clone();
2981                move |_, _| {
2982                    count.fetch_add(1, Ordering::SeqCst);
2983                }
2984            },
2985            {
2986                let count = error_count.clone();
2987                move |_, _| {
2988                    count.fetch_add(1, Ordering::SeqCst);
2989                }
2990            },
2991        ));
2992
2993        let logger = ReplayAwareLogger::new(inner, ReplayLoggingConfig::WarningsAndErrors);
2994
2995        // During replay, only warnings and errors should pass through
2996        let replay_info = LogInfo::new("arn:test").with_replay(true);
2997        logger.debug("test", &replay_info);
2998        logger.info("test", &replay_info);
2999        logger.warn("test", &replay_info);
3000        logger.error("test", &replay_info);
3001
3002        assert_eq!(debug_count.load(Ordering::SeqCst), 0);
3003        assert_eq!(info_count.load(Ordering::SeqCst), 0);
3004        assert_eq!(warn_count.load(Ordering::SeqCst), 1);
3005        assert_eq!(error_count.load(Ordering::SeqCst), 1);
3006    }
3007
3008    #[test]
3009    fn test_replay_aware_logger_suppress_replay_constructor() {
3010        let inner: Arc<dyn Logger> = Arc::new(TracingLogger);
3011        let logger = ReplayAwareLogger::suppress_replay(inner);
3012
3013        assert_eq!(logger.config(), ReplayLoggingConfig::SuppressAll);
3014    }
3015
3016    #[test]
3017    fn test_replay_aware_logger_debug() {
3018        let inner: Arc<dyn Logger> = Arc::new(TracingLogger);
3019        let logger = ReplayAwareLogger::new(inner, ReplayLoggingConfig::SuppressAll);
3020
3021        let debug_str = format!("{:?}", logger);
3022        assert!(debug_str.contains("ReplayAwareLogger"));
3023        assert!(debug_str.contains("SuppressAll"));
3024    }
3025
3026    #[test]
3027    fn test_durable_context_create_log_info_with_replay_method() {
3028        let state = create_test_state();
3029        let ctx = DurableContext::new(state);
3030
3031        let info = ctx.create_log_info_with_replay("op-123", true);
3032
3033        assert_eq!(info.operation_id, Some("op-123".to_string()));
3034        assert!(info.is_replay);
3035    }
3036
3037    #[test]
3038    fn test_durable_context_clone() {
3039        let state = create_test_state();
3040        let ctx = DurableContext::new(state);
3041
3042        ctx.next_operation_id();
3043        ctx.next_operation_id();
3044
3045        let cloned = ctx.clone();
3046
3047        assert_eq!(cloned.durable_execution_arn(), ctx.durable_execution_arn());
3048        assert_eq!(cloned.current_step_counter(), ctx.current_step_counter());
3049    }
3050
3051    #[test]
3052    fn test_durable_context_debug() {
3053        let state = create_test_state();
3054        let ctx = DurableContext::new(state);
3055
3056        let debug_str = format!("{:?}", ctx);
3057
3058        assert!(debug_str.contains("DurableContext"));
3059        assert!(debug_str.contains("durable_execution_arn"));
3060    }
3061
3062    #[test]
3063    fn test_durable_context_state_access() {
3064        let state = create_test_state();
3065        let ctx = DurableContext::new(state.clone());
3066
3067        // Verify we can access the state
3068        assert!(Arc::ptr_eq(ctx.state(), &state));
3069    }
3070
3071    #[test]
3072    fn test_durable_context_send_sync() {
3073        // This test verifies at compile time that DurableContext is Send + Sync
3074        fn assert_send_sync<T: Send + Sync>() {}
3075        assert_send_sync::<DurableContext>();
3076    }
3077
3078    // ========================================================================
3079    // Simplified Logging API Tests
3080    // ========================================================================
3081
3082    #[test]
3083    fn test_log_info_method() {
3084        use std::sync::atomic::{AtomicUsize, Ordering};
3085        use std::sync::Mutex;
3086
3087        let info_count = Arc::new(AtomicUsize::new(0));
3088        let captured_info = Arc::new(Mutex::new(None::<LogInfo>));
3089
3090        let captured_info_clone = captured_info.clone();
3091        let inner = Arc::new(custom_logger(
3092            |_, _| {},
3093            {
3094                let count = info_count.clone();
3095                move |_, info: &LogInfo| {
3096                    count.fetch_add(1, Ordering::SeqCst);
3097                    *captured_info_clone.lock().unwrap() = Some(info.clone());
3098                }
3099            },
3100            |_, _| {},
3101            |_, _| {},
3102        ));
3103
3104        let state = create_test_state();
3105        let ctx = DurableContext::new(state).with_logger(inner);
3106
3107        ctx.log_info("Test message");
3108
3109        assert_eq!(info_count.load(Ordering::SeqCst), 1);
3110
3111        let captured = captured_info.lock().unwrap();
3112        let info = captured.as_ref().unwrap();
3113        assert_eq!(
3114            info.durable_execution_arn,
3115            Some("arn:aws:lambda:us-east-1:123456789012:function:test:durable:abc123".to_string())
3116        );
3117    }
3118
3119    #[test]
3120    fn test_log_debug_method() {
3121        use std::sync::atomic::{AtomicUsize, Ordering};
3122
3123        let debug_count = Arc::new(AtomicUsize::new(0));
3124
3125        let inner = Arc::new(custom_logger(
3126            {
3127                let count = debug_count.clone();
3128                move |_, _| {
3129                    count.fetch_add(1, Ordering::SeqCst);
3130                }
3131            },
3132            |_, _| {},
3133            |_, _| {},
3134            |_, _| {},
3135        ));
3136
3137        let state = create_test_state();
3138        let ctx = DurableContext::new(state).with_logger(inner);
3139
3140        ctx.log_debug("Debug message");
3141
3142        assert_eq!(debug_count.load(Ordering::SeqCst), 1);
3143    }
3144
3145    #[test]
3146    fn test_log_warn_method() {
3147        use std::sync::atomic::{AtomicUsize, Ordering};
3148
3149        let warn_count = Arc::new(AtomicUsize::new(0));
3150
3151        let inner = Arc::new(custom_logger(
3152            |_, _| {},
3153            |_, _| {},
3154            {
3155                let count = warn_count.clone();
3156                move |_, _| {
3157                    count.fetch_add(1, Ordering::SeqCst);
3158                }
3159            },
3160            |_, _| {},
3161        ));
3162
3163        let state = create_test_state();
3164        let ctx = DurableContext::new(state).with_logger(inner);
3165
3166        ctx.log_warn("Warning message");
3167
3168        assert_eq!(warn_count.load(Ordering::SeqCst), 1);
3169    }
3170
3171    #[test]
3172    fn test_log_error_method() {
3173        use std::sync::atomic::{AtomicUsize, Ordering};
3174
3175        let error_count = Arc::new(AtomicUsize::new(0));
3176
3177        let inner = Arc::new(custom_logger(|_, _| {}, |_, _| {}, |_, _| {}, {
3178            let count = error_count.clone();
3179            move |_, _| {
3180                count.fetch_add(1, Ordering::SeqCst);
3181            }
3182        }));
3183
3184        let state = create_test_state();
3185        let ctx = DurableContext::new(state).with_logger(inner);
3186
3187        ctx.log_error("Error message");
3188
3189        assert_eq!(error_count.load(Ordering::SeqCst), 1);
3190    }
3191
3192    #[test]
3193    fn test_log_info_with_extra_fields() {
3194        use std::sync::Mutex;
3195
3196        let captured_info = Arc::new(Mutex::new(None::<LogInfo>));
3197
3198        let captured_info_clone = captured_info.clone();
3199        let inner = Arc::new(custom_logger(
3200            |_, _| {},
3201            move |_, info: &LogInfo| {
3202                *captured_info_clone.lock().unwrap() = Some(info.clone());
3203            },
3204            |_, _| {},
3205            |_, _| {},
3206        ));
3207
3208        let state = create_test_state();
3209        let ctx = DurableContext::new(state).with_logger(inner);
3210
3211        ctx.log_info_with("Test message", &[("order_id", "123"), ("amount", "99.99")]);
3212
3213        let captured = captured_info.lock().unwrap();
3214        let info = captured.as_ref().unwrap();
3215
3216        // Verify extra fields are present
3217        assert_eq!(info.extra.len(), 2);
3218        assert!(info
3219            .extra
3220            .contains(&("order_id".to_string(), "123".to_string())));
3221        assert!(info
3222            .extra
3223            .contains(&("amount".to_string(), "99.99".to_string())));
3224    }
3225
3226    #[test]
3227    fn test_log_debug_with_extra_fields() {
3228        use std::sync::Mutex;
3229
3230        let captured_info = Arc::new(Mutex::new(None::<LogInfo>));
3231
3232        let captured_info_clone = captured_info.clone();
3233        let inner = Arc::new(custom_logger(
3234            move |_, info: &LogInfo| {
3235                *captured_info_clone.lock().unwrap() = Some(info.clone());
3236            },
3237            |_, _| {},
3238            |_, _| {},
3239            |_, _| {},
3240        ));
3241
3242        let state = create_test_state();
3243        let ctx = DurableContext::new(state).with_logger(inner);
3244
3245        ctx.log_debug_with("Debug message", &[("key", "value")]);
3246
3247        let captured = captured_info.lock().unwrap();
3248        let info = captured.as_ref().unwrap();
3249
3250        assert_eq!(info.extra.len(), 1);
3251        assert!(info
3252            .extra
3253            .contains(&("key".to_string(), "value".to_string())));
3254    }
3255
3256    #[test]
3257    fn test_log_warn_with_extra_fields() {
3258        use std::sync::Mutex;
3259
3260        let captured_info = Arc::new(Mutex::new(None::<LogInfo>));
3261
3262        let captured_info_clone = captured_info.clone();
3263        let inner = Arc::new(custom_logger(
3264            |_, _| {},
3265            |_, _| {},
3266            move |_, info: &LogInfo| {
3267                *captured_info_clone.lock().unwrap() = Some(info.clone());
3268            },
3269            |_, _| {},
3270        ));
3271
3272        let state = create_test_state();
3273        let ctx = DurableContext::new(state).with_logger(inner);
3274
3275        ctx.log_warn_with("Warning message", &[("retry", "3")]);
3276
3277        let captured = captured_info.lock().unwrap();
3278        let info = captured.as_ref().unwrap();
3279
3280        assert_eq!(info.extra.len(), 1);
3281        assert!(info.extra.contains(&("retry".to_string(), "3".to_string())));
3282    }
3283
3284    #[test]
3285    fn test_log_error_with_extra_fields() {
3286        use std::sync::Mutex;
3287
3288        let captured_info = Arc::new(Mutex::new(None::<LogInfo>));
3289
3290        let captured_info_clone = captured_info.clone();
3291        let inner = Arc::new(custom_logger(
3292            |_, _| {},
3293            |_, _| {},
3294            |_, _| {},
3295            move |_, info: &LogInfo| {
3296                *captured_info_clone.lock().unwrap() = Some(info.clone());
3297            },
3298        ));
3299
3300        let state = create_test_state();
3301        let ctx = DurableContext::new(state).with_logger(inner);
3302
3303        ctx.log_error_with(
3304            "Error message",
3305            &[("error_code", "E001"), ("details", "Something went wrong")],
3306        );
3307
3308        let captured = captured_info.lock().unwrap();
3309        let info = captured.as_ref().unwrap();
3310
3311        assert_eq!(info.extra.len(), 2);
3312        assert!(info
3313            .extra
3314            .contains(&("error_code".to_string(), "E001".to_string())));
3315        assert!(info
3316            .extra
3317            .contains(&("details".to_string(), "Something went wrong".to_string())));
3318    }
3319
3320    #[test]
3321    fn test_log_methods_include_parent_id_in_child_context() {
3322        use std::sync::Mutex;
3323
3324        let captured_info = Arc::new(Mutex::new(None::<LogInfo>));
3325
3326        let captured_info_clone = captured_info.clone();
3327        let inner: Arc<dyn Logger> = Arc::new(custom_logger(
3328            |_, _| {},
3329            move |_, info: &LogInfo| {
3330                *captured_info_clone.lock().unwrap() = Some(info.clone());
3331            },
3332            |_, _| {},
3333            |_, _| {},
3334        ));
3335
3336        let state = create_test_state();
3337        let ctx = DurableContext::new(state).with_logger(inner.clone());
3338
3339        let parent_op_id = ctx.next_operation_id();
3340        let child_ctx = ctx.create_child_context(&parent_op_id).with_logger(inner);
3341
3342        child_ctx.log_info("Child context message");
3343
3344        let captured = captured_info.lock().unwrap();
3345        let info = captured.as_ref().unwrap();
3346
3347        // Verify parent_id is included
3348        assert_eq!(info.parent_id, Some(parent_op_id));
3349    }
3350
3351    // ========================================================================
3352    // Runtime Logger Reconfiguration Tests (Req 13.1, 13.2)
3353    // ========================================================================
3354
3355    #[test]
3356    fn test_configure_logger_swaps_logger() {
3357        // Req 13.1: configure_logger swaps the logger, subsequent calls use new logger
3358        use std::sync::atomic::{AtomicUsize, Ordering};
3359
3360        let original_count = Arc::new(AtomicUsize::new(0));
3361        let new_count = Arc::new(AtomicUsize::new(0));
3362
3363        let original_logger: Arc<dyn Logger> = Arc::new(custom_logger(
3364            |_, _| {},
3365            {
3366                let count = original_count.clone();
3367                move |_, _| {
3368                    count.fetch_add(1, Ordering::SeqCst);
3369                }
3370            },
3371            |_, _| {},
3372            |_, _| {},
3373        ));
3374
3375        let new_logger: Arc<dyn Logger> = Arc::new(custom_logger(
3376            |_, _| {},
3377            {
3378                let count = new_count.clone();
3379                move |_, _| {
3380                    count.fetch_add(1, Ordering::SeqCst);
3381                }
3382            },
3383            |_, _| {},
3384            |_, _| {},
3385        ));
3386
3387        let state = create_test_state();
3388        let ctx = DurableContext::new(state).with_logger(original_logger);
3389
3390        // Log with original logger
3391        ctx.log_info("before swap");
3392        assert_eq!(original_count.load(Ordering::SeqCst), 1);
3393        assert_eq!(new_count.load(Ordering::SeqCst), 0);
3394
3395        // Swap logger at runtime (note: &self, not &mut self)
3396        ctx.configure_logger(new_logger);
3397
3398        // Subsequent calls use the new logger
3399        ctx.log_info("after swap");
3400        assert_eq!(original_count.load(Ordering::SeqCst), 1); // unchanged
3401        assert_eq!(new_count.load(Ordering::SeqCst), 1);
3402    }
3403
3404    #[test]
3405    fn test_original_logger_used_when_configure_logger_not_called() {
3406        // Req 13.2: Original logger used when configure_logger not called
3407        use std::sync::atomic::{AtomicUsize, Ordering};
3408
3409        let original_count = Arc::new(AtomicUsize::new(0));
3410
3411        let original_logger: Arc<dyn Logger> = Arc::new(custom_logger(
3412            |_, _| {},
3413            {
3414                let count = original_count.clone();
3415                move |_, _| {
3416                    count.fetch_add(1, Ordering::SeqCst);
3417                }
3418            },
3419            |_, _| {},
3420            |_, _| {},
3421        ));
3422
3423        let state = create_test_state();
3424        let ctx = DurableContext::new(state).with_logger(original_logger);
3425
3426        // Log multiple times without calling configure_logger
3427        ctx.log_info("message 1");
3428        ctx.log_info("message 2");
3429        ctx.log_info("message 3");
3430
3431        assert_eq!(original_count.load(Ordering::SeqCst), 3);
3432    }
3433
3434    #[test]
3435    fn test_configure_logger_affects_child_contexts() {
3436        // Verify that child contexts sharing the same RwLock see the swapped logger
3437        use std::sync::atomic::{AtomicUsize, Ordering};
3438
3439        let original_count = Arc::new(AtomicUsize::new(0));
3440        let new_count = Arc::new(AtomicUsize::new(0));
3441
3442        let original_logger: Arc<dyn Logger> = Arc::new(custom_logger(
3443            |_, _| {},
3444            {
3445                let count = original_count.clone();
3446                move |_, _| {
3447                    count.fetch_add(1, Ordering::SeqCst);
3448                }
3449            },
3450            |_, _| {},
3451            |_, _| {},
3452        ));
3453
3454        let new_logger: Arc<dyn Logger> = Arc::new(custom_logger(
3455            |_, _| {},
3456            {
3457                let count = new_count.clone();
3458                move |_, _| {
3459                    count.fetch_add(1, Ordering::SeqCst);
3460                }
3461            },
3462            |_, _| {},
3463            |_, _| {},
3464        ));
3465
3466        let state = create_test_state();
3467        let ctx = DurableContext::new(state).with_logger(original_logger);
3468        let parent_op_id = ctx.next_operation_id();
3469        let child_ctx = ctx.create_child_context(&parent_op_id);
3470
3471        // Child uses original logger
3472        child_ctx.log_info("child before swap");
3473        assert_eq!(original_count.load(Ordering::SeqCst), 1);
3474
3475        // Swap on parent
3476        ctx.configure_logger(new_logger);
3477
3478        // Child now uses the new logger (shared RwLock)
3479        child_ctx.log_info("child after swap");
3480        assert_eq!(new_count.load(Ordering::SeqCst), 1);
3481        assert_eq!(original_count.load(Ordering::SeqCst), 1); // unchanged
3482    }
3483}
3484
3485#[cfg(test)]
3486mod property_tests {
3487    use super::*;
3488    use proptest::prelude::*;
3489
3490    // Property 3: Operation ID Determinism
3491    // *For any* DurableContext with a given parent_id and step counter state,
3492    // calling create_operation_id() SHALL produce the same ID when called
3493    // in the same sequence position across multiple executions.
3494    // **Validates: Requirements 1.10**
3495    proptest! {
3496        #![proptest_config(ProptestConfig::with_cases(100))]
3497
3498        /// Feature: durable-execution-rust-sdk, Property 3: Operation ID Determinism
3499        /// Validates: Requirements 1.10
3500        #[test]
3501        fn prop_operation_id_determinism(
3502            base_id in "[a-zA-Z0-9:/-]{1,100}",
3503            counter in 0u64..10000u64,
3504        ) {
3505            // Generate the same ID twice with the same inputs
3506            let id1 = generate_operation_id(&base_id, counter);
3507            let id2 = generate_operation_id(&base_id, counter);
3508
3509            // IDs must be identical for the same inputs
3510            prop_assert_eq!(&id1, &id2, "Same base_id and counter must produce identical IDs");
3511
3512            // ID must be a valid hex string of expected length
3513            prop_assert_eq!(id1.len(), 32, "ID must be 32 hex characters");
3514            prop_assert!(id1.chars().all(|c| c.is_ascii_hexdigit()), "ID must be valid hex");
3515        }
3516
3517        /// Feature: durable-execution-rust-sdk, Property 3: Operation ID Determinism (Generator)
3518        /// Validates: Requirements 1.10
3519        #[test]
3520        fn prop_operation_id_generator_determinism(
3521            base_id in "[a-zA-Z0-9:/-]{1,100}",
3522            num_ids in 1usize..50usize,
3523        ) {
3524            // Create two generators with the same base ID
3525            let gen1 = OperationIdGenerator::new(&base_id);
3526            let gen2 = OperationIdGenerator::new(&base_id);
3527
3528            // Generate the same sequence of IDs from both
3529            let ids1: Vec<String> = (0..num_ids).map(|_| gen1.next_id()).collect();
3530            let ids2: Vec<String> = (0..num_ids).map(|_| gen2.next_id()).collect();
3531
3532            // Sequences must be identical
3533            prop_assert_eq!(&ids1, &ids2, "Same generator sequence must produce identical IDs");
3534
3535            // All IDs in a sequence must be unique
3536            let unique_count = {
3537                let mut set = std::collections::HashSet::new();
3538                for id in &ids1 {
3539                    set.insert(id.clone());
3540                }
3541                set.len()
3542            };
3543            prop_assert_eq!(unique_count, num_ids, "All IDs in sequence must be unique");
3544        }
3545
3546        /// Feature: durable-execution-rust-sdk, Property 3: Operation ID Determinism (Replay Simulation)
3547        /// Validates: Requirements 1.10
3548        #[test]
3549        fn prop_operation_id_replay_determinism(
3550            base_id in "[a-zA-Z0-9:/-]{1,100}",
3551            operations in prop::collection::vec(0u32..3u32, 1..20),
3552        ) {
3553            // Simulate two executions with the same sequence of operations
3554            // Each operation type increments the counter
3555
3556            let gen1 = OperationIdGenerator::new(&base_id);
3557            let gen2 = OperationIdGenerator::new(&base_id);
3558
3559            let mut ids1 = Vec::new();
3560            let mut ids2 = Vec::new();
3561
3562            // First "execution"
3563            for op_type in &operations {
3564                // Each operation generates an ID
3565                ids1.push(gen1.next_id());
3566
3567                // Some operations might generate additional child IDs
3568                if *op_type == 2 {
3569                    let parent_id = ids1.last().unwrap().clone();
3570                    let child_gen = gen1.create_child(parent_id);
3571                    ids1.push(child_gen.next_id());
3572                }
3573            }
3574
3575            // Second "execution" (replay) - must produce same IDs
3576            for op_type in &operations {
3577                ids2.push(gen2.next_id());
3578
3579                if *op_type == 2 {
3580                    let parent_id = ids2.last().unwrap().clone();
3581                    let child_gen = gen2.create_child(parent_id);
3582                    ids2.push(child_gen.next_id());
3583                }
3584            }
3585
3586            // Both executions must produce identical ID sequences
3587            prop_assert_eq!(&ids1, &ids2, "Replay must produce identical operation IDs");
3588        }
3589    }
3590
3591    // Property 5: Concurrent ID Generation Uniqueness
3592    // *For any* number of concurrent tasks generating operation IDs from the same
3593    // DurableContext, all generated IDs SHALL be unique.
3594    // **Validates: Requirements 17.3**
3595    mod concurrent_id_tests {
3596        use super::*;
3597        use std::sync::Arc;
3598        use std::thread;
3599
3600        proptest! {
3601            #![proptest_config(ProptestConfig::with_cases(100))]
3602
3603            /// Feature: durable-execution-rust-sdk, Property 5: Concurrent ID Generation Uniqueness
3604            /// Validates: Requirements 17.3
3605            #[test]
3606            fn prop_concurrent_id_uniqueness(
3607                base_id in "[a-zA-Z0-9:/-]{1,100}",
3608                num_threads in 2usize..10usize,
3609                ids_per_thread in 10usize..100usize,
3610            ) {
3611                let gen = Arc::new(OperationIdGenerator::new(&base_id));
3612                let mut handles = vec![];
3613
3614                // Spawn multiple threads that concurrently generate IDs
3615                for _ in 0..num_threads {
3616                    let gen_clone = gen.clone();
3617                    let count = ids_per_thread;
3618                    handles.push(thread::spawn(move || {
3619                        let mut ids = Vec::with_capacity(count);
3620                        for _ in 0..count {
3621                            ids.push(gen_clone.next_id());
3622                        }
3623                        ids
3624                    }));
3625                }
3626
3627                // Collect all IDs from all threads
3628                let mut all_ids = Vec::new();
3629                for handle in handles {
3630                    all_ids.extend(handle.join().unwrap());
3631                }
3632
3633                let total_expected = num_threads * ids_per_thread;
3634
3635                // Verify we got the expected number of IDs
3636                prop_assert_eq!(all_ids.len(), total_expected, "Should have generated {} IDs", total_expected);
3637
3638                // Verify all IDs are unique
3639                let unique_count = {
3640                    let mut set = std::collections::HashSet::new();
3641                    for id in &all_ids {
3642                        set.insert(id.clone());
3643                    }
3644                    set.len()
3645                };
3646
3647                prop_assert_eq!(
3648                    unique_count,
3649                    total_expected,
3650                    "All {} IDs must be unique, but only {} were unique",
3651                    total_expected,
3652                    unique_count
3653                );
3654
3655                // Verify the counter was incremented correctly
3656                prop_assert_eq!(
3657                    gen.current_counter() as usize,
3658                    total_expected,
3659                    "Counter should equal total IDs generated"
3660                );
3661            }
3662
3663            /// Feature: durable-execution-rust-sdk, Property 5: Concurrent ID Generation Uniqueness (Stress)
3664            /// Validates: Requirements 17.3
3665            #[test]
3666            fn prop_concurrent_id_uniqueness_stress(
3667                base_id in "[a-zA-Z0-9:/-]{1,50}",
3668            ) {
3669                // Fixed high-concurrency stress test
3670                let num_threads = 20;
3671                let ids_per_thread = 500;
3672
3673                let gen = Arc::new(OperationIdGenerator::new(&base_id));
3674                let mut handles = vec![];
3675
3676                for _ in 0..num_threads {
3677                    let gen_clone = gen.clone();
3678                    handles.push(thread::spawn(move || {
3679                        let mut ids = Vec::with_capacity(ids_per_thread);
3680                        for _ in 0..ids_per_thread {
3681                            ids.push(gen_clone.next_id());
3682                        }
3683                        ids
3684                    }));
3685                }
3686
3687                let mut all_ids = Vec::new();
3688                for handle in handles {
3689                    all_ids.extend(handle.join().unwrap());
3690                }
3691
3692                let total_expected = num_threads * ids_per_thread;
3693
3694                // Verify all IDs are unique
3695                let unique_count = {
3696                    let mut set = std::collections::HashSet::new();
3697                    for id in &all_ids {
3698                        set.insert(id.clone());
3699                    }
3700                    set.len()
3701                };
3702
3703                prop_assert_eq!(
3704                    unique_count,
3705                    total_expected,
3706                    "All {} IDs must be unique under high concurrency",
3707                    total_expected
3708                );
3709            }
3710        }
3711    }
3712
3713    // Property 9: Logging Methods Automatic Context
3714    // *For any* call to log_info, log_debug, log_warn, or log_error on DurableContext,
3715    // the resulting log message SHALL include durable_execution_arn and parent_id
3716    // (when available) without the caller needing to specify them.
3717    // **Validates: Requirements 4.5**
3718    mod logging_automatic_context_tests {
3719        use super::*;
3720        use std::sync::{Arc, Mutex};
3721
3722        proptest! {
3723            #![proptest_config(ProptestConfig::with_cases(100))]
3724
3725            /// Feature: sdk-ergonomics-improvements, Property 9: Logging Methods Automatic Context
3726            /// Validates: Requirements 4.5
3727            #[test]
3728            fn prop_logging_automatic_context(
3729                message in "[a-zA-Z0-9 ]{1,100}",
3730                log_level in 0u8..4u8,
3731            ) {
3732                use crate::client::MockDurableServiceClient;
3733                use crate::lambda::InitialExecutionState;
3734
3735                let captured_info = Arc::new(Mutex::new(None::<LogInfo>));
3736                let captured_info_clone = captured_info.clone();
3737
3738                // Create a logger that captures the LogInfo
3739                let inner = Arc::new(custom_logger(
3740                    {
3741                        let captured = captured_info_clone.clone();
3742                        move |_, info: &LogInfo| {
3743                            *captured.lock().unwrap() = Some(info.clone());
3744                        }
3745                    },
3746                    {
3747                        let captured = captured_info_clone.clone();
3748                        move |_, info: &LogInfo| {
3749                            *captured.lock().unwrap() = Some(info.clone());
3750                        }
3751                    },
3752                    {
3753                        let captured = captured_info_clone.clone();
3754                        move |_, info: &LogInfo| {
3755                            *captured.lock().unwrap() = Some(info.clone());
3756                        }
3757                    },
3758                    {
3759                        let captured = captured_info_clone.clone();
3760                        move |_, info: &LogInfo| {
3761                            *captured.lock().unwrap() = Some(info.clone());
3762                        }
3763                    },
3764                ));
3765
3766                let client: crate::client::SharedDurableServiceClient = Arc::new(MockDurableServiceClient::new());
3767                let state = Arc::new(ExecutionState::new(
3768                    "arn:aws:lambda:us-east-1:123456789012:function:test:durable:abc123",
3769                    "token-123",
3770                    InitialExecutionState::new(),
3771                    client,
3772                ));
3773                let ctx = DurableContext::new(state).with_logger(inner);
3774
3775                // Call the appropriate log method based on log_level
3776                match log_level {
3777                    0 => ctx.log_debug(&message),
3778                    1 => ctx.log_info(&message),
3779                    2 => ctx.log_warn(&message),
3780                    _ => ctx.log_error(&message),
3781                }
3782
3783                // Verify the captured LogInfo has the automatic context
3784                let captured = captured_info.lock().unwrap();
3785                let info = captured.as_ref().expect("LogInfo should be captured");
3786
3787                // durable_execution_arn must always be present
3788                prop_assert!(
3789                    info.durable_execution_arn.is_some(),
3790                    "durable_execution_arn must be automatically included"
3791                );
3792                prop_assert_eq!(
3793                    info.durable_execution_arn.as_ref().unwrap(),
3794                    "arn:aws:lambda:us-east-1:123456789012:function:test:durable:abc123",
3795                    "durable_execution_arn must match the context's ARN"
3796                );
3797            }
3798
3799            /// Feature: sdk-ergonomics-improvements, Property 9: Logging Methods Automatic Context (Child Context)
3800            /// Validates: Requirements 4.5
3801            #[test]
3802            fn prop_logging_automatic_context_child(
3803                message in "[a-zA-Z0-9 ]{1,100}",
3804                log_level in 0u8..4u8,
3805            ) {
3806                use crate::client::MockDurableServiceClient;
3807                use crate::lambda::InitialExecutionState;
3808
3809                let captured_info = Arc::new(Mutex::new(None::<LogInfo>));
3810                let captured_info_clone = captured_info.clone();
3811
3812                // Create a logger that captures the LogInfo
3813                let inner: Arc<dyn Logger> = Arc::new(custom_logger(
3814                    {
3815                        let captured = captured_info_clone.clone();
3816                        move |_, info: &LogInfo| {
3817                            *captured.lock().unwrap() = Some(info.clone());
3818                        }
3819                    },
3820                    {
3821                        let captured = captured_info_clone.clone();
3822                        move |_, info: &LogInfo| {
3823                            *captured.lock().unwrap() = Some(info.clone());
3824                        }
3825                    },
3826                    {
3827                        let captured = captured_info_clone.clone();
3828                        move |_, info: &LogInfo| {
3829                            *captured.lock().unwrap() = Some(info.clone());
3830                        }
3831                    },
3832                    {
3833                        let captured = captured_info_clone.clone();
3834                        move |_, info: &LogInfo| {
3835                            *captured.lock().unwrap() = Some(info.clone());
3836                        }
3837                    },
3838                ));
3839
3840                let client: crate::client::SharedDurableServiceClient = Arc::new(MockDurableServiceClient::new());
3841                let state = Arc::new(ExecutionState::new(
3842                    "arn:aws:lambda:us-east-1:123456789012:function:test:durable:abc123",
3843                    "token-123",
3844                    InitialExecutionState::new(),
3845                    client,
3846                ));
3847                let ctx = DurableContext::new(state).with_logger(inner.clone());
3848
3849                // Create a child context
3850                let parent_op_id = ctx.next_operation_id();
3851                let child_ctx = ctx.create_child_context(&parent_op_id).with_logger(inner);
3852
3853                // Call the appropriate log method based on log_level
3854                match log_level {
3855                    0 => child_ctx.log_debug(&message),
3856                    1 => child_ctx.log_info(&message),
3857                    2 => child_ctx.log_warn(&message),
3858                    _ => child_ctx.log_error(&message),
3859                }
3860
3861                // Verify the captured LogInfo has the automatic context including parent_id
3862                let captured = captured_info.lock().unwrap();
3863                let info = captured.as_ref().expect("LogInfo should be captured");
3864
3865                // durable_execution_arn must always be present
3866                prop_assert!(
3867                    info.durable_execution_arn.is_some(),
3868                    "durable_execution_arn must be automatically included in child context"
3869                );
3870
3871                // parent_id must be present in child context
3872                prop_assert!(
3873                    info.parent_id.is_some(),
3874                    "parent_id must be automatically included in child context"
3875                );
3876                prop_assert_eq!(
3877                    info.parent_id.as_ref().unwrap(),
3878                    &parent_op_id,
3879                    "parent_id must match the parent operation ID"
3880                );
3881            }
3882        }
3883    }
3884
3885    // Property 10: Logging Methods Extra Fields
3886    // *For any* call to log_info_with, log_debug_with, log_warn_with, or log_error_with
3887    // with extra fields, those fields SHALL appear in the resulting log output.
3888    // **Validates: Requirements 4.6**
3889    mod logging_extra_fields_tests {
3890        use super::*;
3891        use std::sync::{Arc, Mutex};
3892
3893        proptest! {
3894            #![proptest_config(ProptestConfig::with_cases(100))]
3895
3896            /// Feature: sdk-ergonomics-improvements, Property 10: Logging Methods Extra Fields
3897            /// Validates: Requirements 4.6
3898            #[test]
3899            fn prop_logging_extra_fields(
3900                message in "[a-zA-Z0-9 ]{1,100}",
3901                log_level in 0u8..4u8,
3902                key1 in "[a-zA-Z_][a-zA-Z0-9_]{0,20}",
3903                value1 in "[a-zA-Z0-9]{1,50}",
3904                key2 in "[a-zA-Z_][a-zA-Z0-9_]{0,20}",
3905                value2 in "[a-zA-Z0-9]{1,50}",
3906            ) {
3907                use crate::client::MockDurableServiceClient;
3908                use crate::lambda::InitialExecutionState;
3909
3910                let captured_info = Arc::new(Mutex::new(None::<LogInfo>));
3911                let captured_info_clone = captured_info.clone();
3912
3913                // Create a logger that captures the LogInfo
3914                let inner = Arc::new(custom_logger(
3915                    {
3916                        let captured = captured_info_clone.clone();
3917                        move |_, info: &LogInfo| {
3918                            *captured.lock().unwrap() = Some(info.clone());
3919                        }
3920                    },
3921                    {
3922                        let captured = captured_info_clone.clone();
3923                        move |_, info: &LogInfo| {
3924                            *captured.lock().unwrap() = Some(info.clone());
3925                        }
3926                    },
3927                    {
3928                        let captured = captured_info_clone.clone();
3929                        move |_, info: &LogInfo| {
3930                            *captured.lock().unwrap() = Some(info.clone());
3931                        }
3932                    },
3933                    {
3934                        let captured = captured_info_clone.clone();
3935                        move |_, info: &LogInfo| {
3936                            *captured.lock().unwrap() = Some(info.clone());
3937                        }
3938                    },
3939                ));
3940
3941                let client: crate::client::SharedDurableServiceClient = Arc::new(MockDurableServiceClient::new());
3942                let state = Arc::new(ExecutionState::new(
3943                    "arn:aws:lambda:us-east-1:123456789012:function:test:durable:abc123",
3944                    "token-123",
3945                    InitialExecutionState::new(),
3946                    client,
3947                ));
3948                let ctx = DurableContext::new(state).with_logger(inner);
3949
3950                // Create extra fields
3951                let fields: Vec<(&str, &str)> = vec![(&key1, &value1), (&key2, &value2)];
3952
3953                // Call the appropriate log method based on log_level
3954                match log_level {
3955                    0 => ctx.log_debug_with(&message, &fields),
3956                    1 => ctx.log_info_with(&message, &fields),
3957                    2 => ctx.log_warn_with(&message, &fields),
3958                    _ => ctx.log_error_with(&message, &fields),
3959                }
3960
3961                // Verify the captured LogInfo has the extra fields
3962                let captured = captured_info.lock().unwrap();
3963                let info = captured.as_ref().expect("LogInfo should be captured");
3964
3965                // Extra fields must be present
3966                prop_assert_eq!(
3967                    info.extra.len(),
3968                    2,
3969                    "Extra fields must be included in the log output"
3970                );
3971
3972                // Verify each field is present
3973                prop_assert!(
3974                    info.extra.contains(&(key1.clone(), value1.clone())),
3975                    "First extra field must be present: {}={}", key1, value1
3976                );
3977                prop_assert!(
3978                    info.extra.contains(&(key2.clone(), value2.clone())),
3979                    "Second extra field must be present: {}={}", key2, value2
3980                );
3981            }
3982
3983            /// Feature: sdk-ergonomics-improvements, Property 10: Logging Methods Extra Fields (Empty)
3984            /// Validates: Requirements 4.6
3985            #[test]
3986            fn prop_logging_extra_fields_empty(
3987                message in "[a-zA-Z0-9 ]{1,100}",
3988                log_level in 0u8..4u8,
3989            ) {
3990                use crate::client::MockDurableServiceClient;
3991                use crate::lambda::InitialExecutionState;
3992
3993                let captured_info = Arc::new(Mutex::new(None::<LogInfo>));
3994                let captured_info_clone = captured_info.clone();
3995
3996                // Create a logger that captures the LogInfo
3997                let inner = Arc::new(custom_logger(
3998                    {
3999                        let captured = captured_info_clone.clone();
4000                        move |_, info: &LogInfo| {
4001                            *captured.lock().unwrap() = Some(info.clone());
4002                        }
4003                    },
4004                    {
4005                        let captured = captured_info_clone.clone();
4006                        move |_, info: &LogInfo| {
4007                            *captured.lock().unwrap() = Some(info.clone());
4008                        }
4009                    },
4010                    {
4011                        let captured = captured_info_clone.clone();
4012                        move |_, info: &LogInfo| {
4013                            *captured.lock().unwrap() = Some(info.clone());
4014                        }
4015                    },
4016                    {
4017                        let captured = captured_info_clone.clone();
4018                        move |_, info: &LogInfo| {
4019                            *captured.lock().unwrap() = Some(info.clone());
4020                        }
4021                    },
4022                ));
4023
4024                let client: crate::client::SharedDurableServiceClient = Arc::new(MockDurableServiceClient::new());
4025                let state = Arc::new(ExecutionState::new(
4026                    "arn:aws:lambda:us-east-1:123456789012:function:test:durable:abc123",
4027                    "token-123",
4028                    InitialExecutionState::new(),
4029                    client,
4030                ));
4031                let ctx = DurableContext::new(state).with_logger(inner);
4032
4033                // Call the appropriate log method with empty fields
4034                let empty_fields: &[(&str, &str)] = &[];
4035                match log_level {
4036                    0 => ctx.log_debug_with(&message, empty_fields),
4037                    1 => ctx.log_info_with(&message, empty_fields),
4038                    2 => ctx.log_warn_with(&message, empty_fields),
4039                    _ => ctx.log_error_with(&message, empty_fields),
4040                }
4041
4042                // Verify the captured LogInfo has no extra fields
4043                let captured = captured_info.lock().unwrap();
4044                let info = captured.as_ref().expect("LogInfo should be captured");
4045
4046                prop_assert!(
4047                    info.extra.is_empty(),
4048                    "Extra fields should be empty when none are provided"
4049                );
4050
4051                // But automatic context should still be present
4052                prop_assert!(
4053                    info.durable_execution_arn.is_some(),
4054                    "durable_execution_arn must still be present even with empty extra fields"
4055                );
4056            }
4057        }
4058    }
4059}
4060
4061#[cfg(test)]
4062mod sealed_trait_tests {
4063    use super::*;
4064    use std::sync::atomic::{AtomicUsize, Ordering};
4065
4066    /// Tests for sealed Logger trait implementations.
4067    ///
4068    /// The Logger trait is sealed, meaning it cannot be implemented outside this crate.
4069    /// This is enforced at compile time by requiring the private `Sealed` supertrait.
4070    /// External crates attempting to implement Logger will get a compile error:
4071    /// "the trait bound `MyType: Sealed` is not satisfied"
4072    ///
4073    /// These tests verify that the internal implementations work correctly.
4074    mod logger_tests {
4075        use super::*;
4076
4077        #[test]
4078        fn test_tracing_logger_implements_logger() {
4079            // TracingLogger should implement Logger (compile-time check)
4080            let logger: &dyn Logger = &TracingLogger;
4081            let info = LogInfo::default();
4082
4083            // These calls should not panic
4084            logger.debug("test debug", &info);
4085            logger.info("test info", &info);
4086            logger.warn("test warn", &info);
4087            logger.error("test error", &info);
4088        }
4089
4090        #[test]
4091        fn test_replay_aware_logger_implements_logger() {
4092            // ReplayAwareLogger should implement Logger (compile-time check)
4093            let inner = Arc::new(TracingLogger);
4094            let logger = ReplayAwareLogger::new(inner, ReplayLoggingConfig::AllowAll);
4095            let logger_ref: &dyn Logger = &logger;
4096            let info = LogInfo::default();
4097
4098            // These calls should not panic
4099            logger_ref.debug("test debug", &info);
4100            logger_ref.info("test info", &info);
4101            logger_ref.warn("test warn", &info);
4102            logger_ref.error("test error", &info);
4103        }
4104
4105        #[test]
4106        fn test_custom_logger_implements_logger() {
4107            // CustomLogger should implement Logger (compile-time check)
4108            let call_count = Arc::new(AtomicUsize::new(0));
4109            let count_clone = call_count.clone();
4110
4111            let logger = custom_logger(
4112                {
4113                    let count = count_clone.clone();
4114                    move |_msg, _info| {
4115                        count.fetch_add(1, Ordering::SeqCst);
4116                    }
4117                },
4118                {
4119                    let count = count_clone.clone();
4120                    move |_msg, _info| {
4121                        count.fetch_add(1, Ordering::SeqCst);
4122                    }
4123                },
4124                {
4125                    let count = count_clone.clone();
4126                    move |_msg, _info| {
4127                        count.fetch_add(1, Ordering::SeqCst);
4128                    }
4129                },
4130                {
4131                    let count = count_clone.clone();
4132                    move |_msg, _info| {
4133                        count.fetch_add(1, Ordering::SeqCst);
4134                    }
4135                },
4136            );
4137
4138            let logger_ref: &dyn Logger = &logger;
4139            let info = LogInfo::default();
4140
4141            logger_ref.debug("test", &info);
4142            logger_ref.info("test", &info);
4143            logger_ref.warn("test", &info);
4144            logger_ref.error("test", &info);
4145
4146            assert_eq!(call_count.load(Ordering::SeqCst), 4);
4147        }
4148
4149        #[test]
4150        fn test_simple_custom_logger() {
4151            let call_count = Arc::new(AtomicUsize::new(0));
4152            let count_clone = call_count.clone();
4153
4154            let logger = simple_custom_logger(move |_level, _msg, _info| {
4155                count_clone.fetch_add(1, Ordering::SeqCst);
4156            });
4157
4158            let info = LogInfo::default();
4159
4160            logger.debug("test", &info);
4161            logger.info("test", &info);
4162            logger.warn("test", &info);
4163            logger.error("test", &info);
4164
4165            assert_eq!(call_count.load(Ordering::SeqCst), 4);
4166        }
4167
4168        #[test]
4169        fn test_custom_logger_receives_correct_messages() {
4170            let messages = Arc::new(std::sync::Mutex::new(Vec::new()));
4171            let messages_clone = messages.clone();
4172
4173            let logger = simple_custom_logger(move |level, msg, _info| {
4174                messages_clone
4175                    .lock()
4176                    .unwrap()
4177                    .push(format!("[{}] {}", level, msg));
4178            });
4179
4180            let info = LogInfo::default();
4181
4182            logger.debug("debug message", &info);
4183            logger.info("info message", &info);
4184            logger.warn("warn message", &info);
4185            logger.error("error message", &info);
4186
4187            let logged = messages.lock().unwrap();
4188            assert_eq!(logged.len(), 4);
4189            assert_eq!(logged[0], "[DEBUG] debug message");
4190            assert_eq!(logged[1], "[INFO] info message");
4191            assert_eq!(logged[2], "[WARN] warn message");
4192            assert_eq!(logged[3], "[ERROR] error message");
4193        }
4194
4195        #[test]
4196        fn test_custom_logger_receives_log_info() {
4197            let received_info = Arc::new(std::sync::Mutex::new(None));
4198            let info_clone = received_info.clone();
4199
4200            let logger = simple_custom_logger(move |_level, _msg, info| {
4201                *info_clone.lock().unwrap() = Some(info.clone());
4202            });
4203
4204            let info = LogInfo::new("arn:aws:test")
4205                .with_operation_id("op-123")
4206                .with_parent_id("parent-456")
4207                .with_replay(true);
4208
4209            logger.info("test", &info);
4210
4211            let received = received_info.lock().unwrap().clone().unwrap();
4212            assert_eq!(
4213                received.durable_execution_arn,
4214                Some("arn:aws:test".to_string())
4215            );
4216            assert_eq!(received.operation_id, Some("op-123".to_string()));
4217            assert_eq!(received.parent_id, Some("parent-456".to_string()));
4218            assert!(received.is_replay);
4219        }
4220
4221        #[test]
4222        fn test_replay_aware_logger_suppresses_during_replay() {
4223            let call_count = Arc::new(AtomicUsize::new(0));
4224            let count_clone = call_count.clone();
4225
4226            let inner_logger = Arc::new(custom_logger(
4227                {
4228                    let count = count_clone.clone();
4229                    move |_msg, _info| {
4230                        count.fetch_add(1, Ordering::SeqCst);
4231                    }
4232                },
4233                {
4234                    let count = count_clone.clone();
4235                    move |_msg, _info| {
4236                        count.fetch_add(1, Ordering::SeqCst);
4237                    }
4238                },
4239                {
4240                    let count = count_clone.clone();
4241                    move |_msg, _info| {
4242                        count.fetch_add(1, Ordering::SeqCst);
4243                    }
4244                },
4245                {
4246                    let count = count_clone.clone();
4247                    move |_msg, _info| {
4248                        count.fetch_add(1, Ordering::SeqCst);
4249                    }
4250                },
4251            ));
4252
4253            let logger = ReplayAwareLogger::new(inner_logger, ReplayLoggingConfig::SuppressAll);
4254
4255            // Non-replay logs should pass through
4256            let non_replay_info = LogInfo::default().with_replay(false);
4257            logger.debug("test", &non_replay_info);
4258            logger.info("test", &non_replay_info);
4259            logger.warn("test", &non_replay_info);
4260            logger.error("test", &non_replay_info);
4261            assert_eq!(call_count.load(Ordering::SeqCst), 4);
4262
4263            // Replay logs should be suppressed
4264            let replay_info = LogInfo::default().with_replay(true);
4265            logger.debug("test", &replay_info);
4266            logger.info("test", &replay_info);
4267            logger.warn("test", &replay_info);
4268            logger.error("test", &replay_info);
4269            assert_eq!(call_count.load(Ordering::SeqCst), 4); // Still 4, no new calls
4270        }
4271
4272        #[test]
4273        fn test_replay_aware_logger_errors_only_during_replay() {
4274            let call_count = Arc::new(AtomicUsize::new(0));
4275            let count_clone = call_count.clone();
4276
4277            let inner_logger = Arc::new(custom_logger(
4278                {
4279                    let count = count_clone.clone();
4280                    move |_msg, _info| {
4281                        count.fetch_add(1, Ordering::SeqCst);
4282                    }
4283                },
4284                {
4285                    let count = count_clone.clone();
4286                    move |_msg, _info| {
4287                        count.fetch_add(1, Ordering::SeqCst);
4288                    }
4289                },
4290                {
4291                    let count = count_clone.clone();
4292                    move |_msg, _info| {
4293                        count.fetch_add(1, Ordering::SeqCst);
4294                    }
4295                },
4296                {
4297                    let count = count_clone.clone();
4298                    move |_msg, _info| {
4299                        count.fetch_add(1, Ordering::SeqCst);
4300                    }
4301                },
4302            ));
4303
4304            let logger = ReplayAwareLogger::new(inner_logger, ReplayLoggingConfig::ErrorsOnly);
4305
4306            let replay_info = LogInfo::default().with_replay(true);
4307            logger.debug("test", &replay_info);
4308            logger.info("test", &replay_info);
4309            logger.warn("test", &replay_info);
4310            logger.error("test", &replay_info);
4311
4312            // Only error should pass through during replay
4313            assert_eq!(call_count.load(Ordering::SeqCst), 1);
4314        }
4315    }
4316}