Skip to main content

durable_execution_sdk/
context.rs

1//! DurableContext and operation identifier types for the AWS Durable Execution SDK.
2//!
3//! This module provides the main `DurableContext` interface that user code interacts with,
4//! as well as the `OperationIdentifier` type for deterministic operation ID generation.
5
6use std::sync::atomic::{AtomicU64, Ordering};
7use std::sync::{Arc, RwLock};
8
9use blake2::{Blake2b512, Digest};
10
11use crate::error::DurableResult;
12use crate::sealed::Sealed;
13use crate::state::ExecutionState;
14use crate::traits::{DurableValue, StepFn};
15use crate::types::OperationId;
16
/// Identifies a single operation within a durable execution.
///
/// The `operation_id` is expected to be generated deterministically from the
/// parent context and a step counter (see `OperationIdGenerator`), so that the
/// same operation receives the same ID across replays.
///
/// The derived `PartialEq`/`Eq`/`Hash` cover all three fields, so two
/// identifiers with the same ID but different names compare as unequal.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct OperationIdentifier {
    /// The unique operation ID (deterministically generated)
    pub operation_id: String,
    /// The parent operation ID (`None` for root operations)
    pub parent_id: Option<String>,
    /// Optional human-readable name for the operation
    pub name: Option<String>,
}
31
32impl OperationIdentifier {
33    /// Creates a new OperationIdentifier with the given values.
34    pub fn new(
35        operation_id: impl Into<String>,
36        parent_id: Option<String>,
37        name: Option<String>,
38    ) -> Self {
39        Self {
40            operation_id: operation_id.into(),
41            parent_id,
42            name,
43        }
44    }
45
46    /// Creates a root operation identifier (no parent).
47    pub fn root(operation_id: impl Into<String>) -> Self {
48        Self {
49            operation_id: operation_id.into(),
50            parent_id: None,
51            name: None,
52        }
53    }
54
55    /// Creates an operation identifier with a parent.
56    pub fn with_parent(operation_id: impl Into<String>, parent_id: impl Into<String>) -> Self {
57        Self {
58            operation_id: operation_id.into(),
59            parent_id: Some(parent_id.into()),
60            name: None,
61        }
62    }
63
64    /// Sets the name for this operation identifier.
65    pub fn with_name(mut self, name: impl Into<String>) -> Self {
66        self.name = Some(name.into());
67        self
68    }
69
70    /// Returns the operation ID as an `OperationId` newtype.
71    #[inline]
72    pub fn operation_id_typed(&self) -> OperationId {
73        OperationId::from(self.operation_id.clone())
74    }
75
76    /// Returns the parent ID as an `OperationId` newtype, if present.
77    #[inline]
78    pub fn parent_id_typed(&self) -> Option<OperationId> {
79        self.parent_id
80            .as_ref()
81            .map(|id| OperationId::from(id.clone()))
82    }
83
84    /// Applies this identifier's parent_id and name to an `OperationUpdate`.
85    ///
86    /// This is a convenience method to avoid repeated boilerplate when building
87    /// operation updates across handlers.
88    pub fn apply_to(
89        &self,
90        mut update: crate::operation::OperationUpdate,
91    ) -> crate::operation::OperationUpdate {
92        if let Some(ref parent_id) = self.parent_id {
93            update = update.with_parent_id(parent_id);
94        }
95        if let Some(ref name) = self.name {
96            update = update.with_name(name);
97        }
98        update
99    }
100}
101
102impl std::fmt::Display for OperationIdentifier {
103    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
104        if let Some(ref name) = self.name {
105            write!(f, "{}({})", name, self.operation_id)
106        } else {
107            write!(f, "{}", self.operation_id)
108        }
109    }
110}
111
/// Generates deterministic operation IDs using blake2b hashing.
///
/// Each ID is derived by hashing the base ID (the execution ARN for a root
/// context, or the parent operation ID for a child) together with the current
/// step counter value. This ensures:
/// - Same inputs always produce the same ID
/// - Different step counts produce different IDs
/// - IDs are unique within an execution
///
/// `Clone` is implemented manually and snapshots the counter, so a clone
/// continues from the original's current position and will produce the same
/// IDs as the original from that point on.
#[derive(Debug)]
pub struct OperationIdGenerator {
    /// The base identifier (execution ARN or parent operation ID)
    base_id: String,
    /// Thread-safe step counter; holds the value the next generated ID will use
    step_counter: AtomicU64,
}
126
127impl OperationIdGenerator {
128    /// Creates a new OperationIdGenerator with the given base ID.
129    ///
130    /// # Arguments
131    ///
132    /// * `base_id` - The base identifier to use for hashing (typically execution ARN or parent ID)
133    pub fn new(base_id: impl Into<String>) -> Self {
134        Self {
135            base_id: base_id.into(),
136            step_counter: AtomicU64::new(0),
137        }
138    }
139
140    /// Creates a new OperationIdGenerator with a specific starting counter value.
141    ///
142    /// # Arguments
143    ///
144    /// * `base_id` - The base identifier to use for hashing
145    /// * `initial_counter` - The initial value for the step counter
146    pub fn with_counter(base_id: impl Into<String>, initial_counter: u64) -> Self {
147        Self {
148            base_id: base_id.into(),
149            step_counter: AtomicU64::new(initial_counter),
150        }
151    }
152
153    /// Generates the next operation ID.
154    ///
155    /// This method atomically increments the step counter and generates
156    /// a deterministic ID based on the base ID and counter value.
157    ///
158    /// # Returns
159    ///
160    /// A unique, deterministic operation ID string.
161    ///
162    /// # Memory Ordering
163    ///
164    /// Uses `Ordering::Relaxed` because the step counter only needs to provide
165    /// unique values - there's no synchronization requirement with other data.
166    /// Each call gets a unique counter value, and the hash function ensures
167    /// deterministic ID generation regardless of ordering between threads.
168    ///
169    /// Requirements: 4.1, 4.6
170    pub fn next_id(&self) -> String {
171        // Relaxed ordering is sufficient: we only need uniqueness, not synchronization.
172        // The counter value is combined with base_id in a hash, so the only requirement
173        // is that each call gets a different counter value, which fetch_add guarantees.
174        let counter = self.step_counter.fetch_add(1, Ordering::Relaxed);
175        generate_operation_id(&self.base_id, counter)
176    }
177
178    /// Generates an operation ID for a specific counter value without incrementing.
179    ///
180    /// This is useful for testing or when you need to generate an ID
181    /// for a known counter value.
182    ///
183    /// # Arguments
184    ///
185    /// * `counter` - The counter value to use for ID generation
186    ///
187    /// # Returns
188    ///
189    /// A deterministic operation ID string.
190    pub fn id_for_counter(&self, counter: u64) -> String {
191        generate_operation_id(&self.base_id, counter)
192    }
193
194    /// Returns the current counter value without incrementing.
195    ///
196    /// # Memory Ordering
197    ///
198    /// Uses `Ordering::Relaxed` because this is an informational read that doesn't
199    /// need to synchronize with other operations. The value may be slightly stale
200    /// in concurrent scenarios, but this is acceptable for debugging/monitoring use.
201    ///
202    /// Requirements: 4.1, 4.6
203    pub fn current_counter(&self) -> u64 {
204        // Relaxed ordering is sufficient: this is an informational read with no
205        // synchronization requirements. Callers should not rely on this value
206        // for correctness in concurrent scenarios.
207        self.step_counter.load(Ordering::Relaxed)
208    }
209
210    /// Returns the base ID used for generation.
211    pub fn base_id(&self) -> &str {
212        &self.base_id
213    }
214
215    /// Creates a child generator with a new base ID.
216    ///
217    /// The child generator starts with counter 0 and uses the provided
218    /// parent operation ID as its base.
219    ///
220    /// # Arguments
221    ///
222    /// * `parent_operation_id` - The operation ID to use as the base for the child
223    pub fn create_child(&self, parent_operation_id: impl Into<String>) -> Self {
224        Self::new(parent_operation_id)
225    }
226}
227
228impl Clone for OperationIdGenerator {
229    fn clone(&self) -> Self {
230        Self {
231            base_id: self.base_id.clone(),
232            // Relaxed ordering is sufficient: we're creating a snapshot of the counter
233            // for a new generator. The clone doesn't need to synchronize with the
234            // original - it just needs a reasonable starting point.
235            step_counter: AtomicU64::new(self.step_counter.load(Ordering::Relaxed)),
236        }
237    }
238}
239
240/// Generates a deterministic operation ID using blake2b hashing.
241///
242/// # Arguments
243///
244/// * `base_id` - The base identifier (execution ARN or parent operation ID)
245/// * `counter` - The step counter value
246///
247/// # Returns
248///
249/// A hex-encoded operation ID string (first 32 characters of the hash).
250pub fn generate_operation_id(base_id: &str, counter: u64) -> String {
251    let mut hasher = Blake2b512::new();
252    hasher.update(base_id.as_bytes());
253    hasher.update(counter.to_le_bytes());
254    let result = hasher.finalize();
255
256    // Take first 16 bytes (32 hex chars) for a reasonably short but unique ID
257    hex::encode(&result[..16])
258}
259
/// Logger trait for structured logging in durable executions.
///
/// This trait is compatible with the `tracing` crate and allows
/// custom logging implementations. Every method receives the message text
/// together with a `LogInfo` describing the execution context.
///
/// Implementors must be `Send + Sync` so a logger can be shared across threads.
///
/// # Sealed Trait
///
/// This trait is sealed and cannot be implemented outside of this crate.
/// This allows the SDK maintainers to evolve the logging interface without
/// breaking external code. If you need custom logging behavior, use the
/// provided factory functions or wrap one of the existing implementations.
// NOTE(review): the lint is presumably silenced because `Sealed` is not
// nameable outside this crate, which is what makes the trait sealed — confirm.
#[allow(private_bounds)]
pub trait Logger: Sealed + Send + Sync {
    /// Logs a debug message.
    fn debug(&self, message: &str, info: &LogInfo);
    /// Logs an info message.
    fn info(&self, message: &str, info: &LogInfo);
    /// Logs a warning message.
    fn warn(&self, message: &str, info: &LogInfo);
    /// Logs an error message.
    fn error(&self, message: &str, info: &LogInfo);
}
282
/// Context information for log messages.
///
/// This struct provides context for log messages including execution ARN,
/// operation IDs, and replay status. The `is_replay` flag indicates whether
/// the current operation is being replayed from a checkpoint.
///
/// The derived `Default` leaves every optional field as `None`, `is_replay`
/// as `false`, and `extra` empty.
#[derive(Debug, Clone, Default)]
pub struct LogInfo {
    /// The durable execution ARN
    pub durable_execution_arn: Option<String>,
    /// The current operation ID
    pub operation_id: Option<String>,
    /// The parent operation ID
    pub parent_id: Option<String>,
    /// Whether the current operation is being replayed from a checkpoint
    ///
    /// When `true`, the operation is returning a previously checkpointed result
    /// without re-executing the operation's logic. Loggers can use this flag
    /// to suppress or annotate logs during replay.
    pub is_replay: bool,
    /// Additional key-value pairs, kept in insertion order (duplicate keys are
    /// not merged)
    pub extra: Vec<(String, String)>,
}
305
306impl LogInfo {
307    /// Creates a new LogInfo with the given execution ARN.
308    pub fn new(durable_execution_arn: impl Into<String>) -> Self {
309        Self {
310            durable_execution_arn: Some(durable_execution_arn.into()),
311            ..Default::default()
312        }
313    }
314
315    /// Sets the operation ID.
316    pub fn with_operation_id(mut self, operation_id: impl Into<String>) -> Self {
317        self.operation_id = Some(operation_id.into());
318        self
319    }
320
321    /// Sets the parent ID.
322    pub fn with_parent_id(mut self, parent_id: impl Into<String>) -> Self {
323        self.parent_id = Some(parent_id.into());
324        self
325    }
326
327    /// Sets the replay flag.
328    ///
329    /// When set to `true`, indicates that the current operation is being
330    /// replayed from a checkpoint rather than executing fresh.
331    ///
332    /// # Arguments
333    ///
334    /// * `is_replay` - Whether the operation is being replayed
335    pub fn with_replay(mut self, is_replay: bool) -> Self {
336        self.is_replay = is_replay;
337        self
338    }
339
340    /// Adds an extra key-value pair.
341    pub fn with_extra(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
342        self.extra.push((key.into(), value.into()));
343        self
344    }
345}
346
347/// Creates a tracing span for a durable operation.
348///
349/// This function creates a structured tracing span that includes all relevant
350/// context for observability and debugging. The span can be used to trace
351/// execution flow and correlate logs across operations.
352///
353/// # Arguments
354///
355/// * `operation_type` - The type of operation (e.g., "step", "wait", "callback", "invoke", "map", "parallel")
356/// * `op_id` - The operation identifier containing operation_id, parent_id, and name
357/// * `durable_execution_arn` - The ARN of the durable execution
358///
359/// # Returns
360///
361/// A `tracing::Span` that can be entered during operation execution.
362///
363/// # Example
364///
365/// ```rust,ignore
366/// let span = create_operation_span("step", &op_id, state.durable_execution_arn());
367/// let _guard = span.enter();
368/// // ... operation execution ...
369/// ```
370pub fn create_operation_span(
371    operation_type: &str,
372    op_id: &OperationIdentifier,
373    durable_execution_arn: &str,
374) -> tracing::Span {
375    tracing::info_span!(
376        "durable_operation",
377        operation_type = %operation_type,
378        operation_id = %op_id.operation_id,
379        parent_id = ?op_id.parent_id,
380        name = ?op_id.name,
381        durable_execution_arn = %durable_execution_arn,
382        status = tracing::field::Empty,
383    )
384}
385
/// Default logger implementation using the `tracing` crate.
///
/// This logger includes the `is_replay` flag in all log messages to help
/// distinguish between fresh executions and replayed operations.
///
/// Extra fields from `LogInfo` are included in the tracing output as a single
/// formatted string of `key=value` pairs under the `extra` field.
///
/// The type is a stateless unit struct, so it can be copied or default-constructed
/// freely.
#[derive(Debug, Clone, Default)]
pub struct TracingLogger;

// `Logger` has `Sealed` as a supertrait, so this impl is what permits
// `TracingLogger` to implement `Logger`.
impl Sealed for TracingLogger {}
398
399impl TracingLogger {
400    /// Formats extra fields as a string of key-value pairs.
401    ///
402    /// Returns an empty string if there are no extra fields, otherwise returns
403    /// a comma-separated list of "key=value" pairs.
404    fn format_extra_fields(extra: &[(String, String)]) -> String {
405        if extra.is_empty() {
406            String::new()
407        } else {
408            extra
409                .iter()
410                .map(|(k, v)| format!("{}={}", k, v))
411                .collect::<Vec<_>>()
412                .join(", ")
413        }
414    }
415}
416
417impl Logger for TracingLogger {
418    fn debug(&self, message: &str, info: &LogInfo) {
419        let extra_fields = Self::format_extra_fields(&info.extra);
420        tracing::debug!(
421            durable_execution_arn = ?info.durable_execution_arn,
422            operation_id = ?info.operation_id,
423            parent_id = ?info.parent_id,
424            is_replay = info.is_replay,
425            extra = %extra_fields,
426            "{}",
427            message
428        );
429    }
430
431    fn info(&self, message: &str, info: &LogInfo) {
432        let extra_fields = Self::format_extra_fields(&info.extra);
433        tracing::info!(
434            durable_execution_arn = ?info.durable_execution_arn,
435            operation_id = ?info.operation_id,
436            parent_id = ?info.parent_id,
437            is_replay = info.is_replay,
438            extra = %extra_fields,
439            "{}",
440            message
441        );
442    }
443
444    fn warn(&self, message: &str, info: &LogInfo) {
445        let extra_fields = Self::format_extra_fields(&info.extra);
446        tracing::warn!(
447            durable_execution_arn = ?info.durable_execution_arn,
448            operation_id = ?info.operation_id,
449            parent_id = ?info.parent_id,
450            is_replay = info.is_replay,
451            extra = %extra_fields,
452            "{}",
453            message
454        );
455    }
456
457    fn error(&self, message: &str, info: &LogInfo) {
458        let extra_fields = Self::format_extra_fields(&info.extra);
459        tracing::error!(
460            durable_execution_arn = ?info.durable_execution_arn,
461            operation_id = ?info.operation_id,
462            parent_id = ?info.parent_id,
463            is_replay = info.is_replay,
464            extra = %extra_fields,
465            "{}",
466            message
467        );
468    }
469}
470
/// Configuration for replay-aware logging behavior.
///
/// This configuration controls how the `ReplayAwareLogger` handles log messages
/// during replay. Users can choose to suppress all logs during replay, allow
/// only certain log levels, or log all messages regardless of replay status.
///
/// The configuration only matters for messages whose `LogInfo::is_replay` is
/// `true`; messages outside replay are always passed through.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ReplayLoggingConfig {
    /// Suppress all logs during replay (default).
    ///
    /// When in replay mode, no log messages will be emitted. This is useful
    /// to reduce noise in logs when replaying previously executed operations.
    #[default]
    SuppressAll,

    /// Allow all logs during replay.
    ///
    /// Log messages will be emitted regardless of replay status. The `is_replay`
    /// flag in `LogInfo` can still be used to distinguish replay logs.
    AllowAll,

    /// Allow only error logs during replay.
    ///
    /// Only error-level messages will be emitted during replay. This is useful
    /// when you want to see errors but suppress informational messages.
    ErrorsOnly,

    /// Allow error and warning logs during replay.
    ///
    /// Error and warning-level messages will be emitted during replay.
    /// Debug and info messages will be suppressed.
    WarningsAndErrors,
}
503
/// A logger wrapper that can suppress logs during replay based on configuration.
///
/// `ReplayAwareLogger` wraps any `Logger` implementation and adds replay-aware
/// behavior. When the `is_replay` flag in `LogInfo` is `true`, the logger will
/// suppress or allow logs based on the configured `ReplayLoggingConfig`.
/// Messages whose `is_replay` flag is `false` are always forwarded to the
/// wrapped logger.
///
/// # Example
///
/// ```rust
/// use durable_execution_sdk::{TracingLogger, ReplayAwareLogger, ReplayLoggingConfig};
/// use std::sync::Arc;
///
/// // Create a replay-aware logger that suppresses all logs during replay
/// let logger = ReplayAwareLogger::new(
///     Arc::new(TracingLogger),
///     ReplayLoggingConfig::SuppressAll,
/// );
///
/// // Create a replay-aware logger that allows errors during replay
/// let logger_with_errors = ReplayAwareLogger::new(
///     Arc::new(TracingLogger),
///     ReplayLoggingConfig::ErrorsOnly,
/// );
/// ```
pub struct ReplayAwareLogger {
    /// The underlying logger that non-suppressed messages are delegated to
    inner: Arc<dyn Logger>,
    /// Configuration deciding which levels pass through during replay
    config: ReplayLoggingConfig,
}

// `Logger` has `Sealed` as a supertrait, so this impl is what permits
// `ReplayAwareLogger` to implement `Logger`.
impl Sealed for ReplayAwareLogger {}
537
538impl ReplayAwareLogger {
539    /// Creates a new `ReplayAwareLogger` with the specified configuration.
540    ///
541    /// # Arguments
542    ///
543    /// * `inner` - The underlying logger to delegate to
544    /// * `config` - Configuration for replay logging behavior
545    ///
546    /// # Example
547    ///
548    /// ```rust
549    /// use durable_execution_sdk::{TracingLogger, ReplayAwareLogger, ReplayLoggingConfig};
550    /// use std::sync::Arc;
551    ///
552    /// let logger = ReplayAwareLogger::new(
553    ///     Arc::new(TracingLogger),
554    ///     ReplayLoggingConfig::SuppressAll,
555    /// );
556    /// ```
557    pub fn new(inner: Arc<dyn Logger>, config: ReplayLoggingConfig) -> Self {
558        Self { inner, config }
559    }
560
561    /// Creates a new `ReplayAwareLogger` that suppresses all logs during replay.
562    ///
563    /// This is a convenience constructor for the most common use case.
564    ///
565    /// # Arguments
566    ///
567    /// * `inner` - The underlying logger to delegate to
568    pub fn suppress_replay(inner: Arc<dyn Logger>) -> Self {
569        Self::new(inner, ReplayLoggingConfig::SuppressAll)
570    }
571
572    /// Creates a new `ReplayAwareLogger` that allows all logs during replay.
573    ///
574    /// # Arguments
575    ///
576    /// * `inner` - The underlying logger to delegate to
577    pub fn allow_all(inner: Arc<dyn Logger>) -> Self {
578        Self::new(inner, ReplayLoggingConfig::AllowAll)
579    }
580
581    /// Returns the current replay logging configuration.
582    pub fn config(&self) -> ReplayLoggingConfig {
583        self.config
584    }
585
586    /// Returns a reference to the underlying logger.
587    pub fn inner(&self) -> &Arc<dyn Logger> {
588        &self.inner
589    }
590
591    /// Checks if a log at the given level should be suppressed during replay.
592    fn should_suppress(&self, info: &LogInfo, level: LogLevel) -> bool {
593        if !info.is_replay {
594            return false;
595        }
596
597        match self.config {
598            ReplayLoggingConfig::SuppressAll => true,
599            ReplayLoggingConfig::AllowAll => false,
600            ReplayLoggingConfig::ErrorsOnly => level != LogLevel::Error,
601            ReplayLoggingConfig::WarningsAndErrors => {
602                level != LogLevel::Error && level != LogLevel::Warn
603            }
604        }
605    }
606}
607
/// Internal enum for log levels used by ReplayAwareLogger.
///
/// It has one variant per `Logger` method and is used only to tell
/// `ReplayAwareLogger::should_suppress` which level is being logged.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum LogLevel {
    /// Corresponds to `Logger::debug`.
    Debug,
    /// Corresponds to `Logger::info`.
    Info,
    /// Corresponds to `Logger::warn`.
    Warn,
    /// Corresponds to `Logger::error`.
    Error,
}
616
617impl std::fmt::Debug for ReplayAwareLogger {
618    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
619        f.debug_struct("ReplayAwareLogger")
620            .field("config", &self.config)
621            .finish()
622    }
623}
624
625impl Logger for ReplayAwareLogger {
626    fn debug(&self, message: &str, info: &LogInfo) {
627        if !self.should_suppress(info, LogLevel::Debug) {
628            self.inner.debug(message, info);
629        }
630    }
631
632    fn info(&self, message: &str, info: &LogInfo) {
633        if !self.should_suppress(info, LogLevel::Info) {
634            self.inner.info(message, info);
635        }
636    }
637
638    fn warn(&self, message: &str, info: &LogInfo) {
639        if !self.should_suppress(info, LogLevel::Warn) {
640            self.inner.warn(message, info);
641        }
642    }
643
644    fn error(&self, message: &str, info: &LogInfo) {
645        if !self.should_suppress(info, LogLevel::Error) {
646            self.inner.error(message, info);
647        }
648    }
649}
650
/// A custom logger that delegates to user-provided closures.
///
/// This struct allows users to provide custom logging behavior without
/// implementing the sealed `Logger` trait directly. Each log level can
/// have its own closure for handling log messages.
///
/// The fields are private; instances are built with the `custom_logger`
/// factory function. Each closure must be `Send + Sync` because `Logger`
/// requires it of its implementors.
///
/// # Example
///
/// ```rust
/// use durable_execution_sdk::context::{custom_logger, LogInfo};
/// use std::sync::Arc;
///
/// // Create a custom logger that prints to stdout
/// let logger = custom_logger(
///     |msg, info| println!("[DEBUG] {}: {:?}", msg, info),
///     |msg, info| println!("[INFO] {}: {:?}", msg, info),
///     |msg, info| println!("[WARN] {}: {:?}", msg, info),
///     |msg, info| println!("[ERROR] {}: {:?}", msg, info),
/// );
/// ```
pub struct CustomLogger<D, I, W, E>
where
    D: Fn(&str, &LogInfo) + Send + Sync,
    I: Fn(&str, &LogInfo) + Send + Sync,
    W: Fn(&str, &LogInfo) + Send + Sync,
    E: Fn(&str, &LogInfo) + Send + Sync,
{
    /// Handler invoked for debug-level messages
    debug_fn: D,
    /// Handler invoked for info-level messages
    info_fn: I,
    /// Handler invoked for warning-level messages
    warn_fn: W,
    /// Handler invoked for error-level messages
    error_fn: E,
}

// `Logger` has `Sealed` as a supertrait, so this impl is what permits
// `CustomLogger` to implement `Logger`.
impl<D, I, W, E> Sealed for CustomLogger<D, I, W, E>
where
    D: Fn(&str, &LogInfo) + Send + Sync,
    I: Fn(&str, &LogInfo) + Send + Sync,
    W: Fn(&str, &LogInfo) + Send + Sync,
    E: Fn(&str, &LogInfo) + Send + Sync,
{
}
693
694impl<D, I, W, E> Logger for CustomLogger<D, I, W, E>
695where
696    D: Fn(&str, &LogInfo) + Send + Sync,
697    I: Fn(&str, &LogInfo) + Send + Sync,
698    W: Fn(&str, &LogInfo) + Send + Sync,
699    E: Fn(&str, &LogInfo) + Send + Sync,
700{
701    fn debug(&self, message: &str, info: &LogInfo) {
702        (self.debug_fn)(message, info);
703    }
704
705    fn info(&self, message: &str, info: &LogInfo) {
706        (self.info_fn)(message, info);
707    }
708
709    fn warn(&self, message: &str, info: &LogInfo) {
710        (self.warn_fn)(message, info);
711    }
712
713    fn error(&self, message: &str, info: &LogInfo) {
714        (self.error_fn)(message, info);
715    }
716}
717
718impl<D, I, W, E> std::fmt::Debug for CustomLogger<D, I, W, E>
719where
720    D: Fn(&str, &LogInfo) + Send + Sync,
721    I: Fn(&str, &LogInfo) + Send + Sync,
722    W: Fn(&str, &LogInfo) + Send + Sync,
723    E: Fn(&str, &LogInfo) + Send + Sync,
724{
725    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
726        f.debug_struct("CustomLogger").finish()
727    }
728}
729
/// Creates a custom logger with user-provided closures for each log level.
///
/// This factory function allows users to create custom logging behavior without
/// implementing the sealed `Logger` trait directly. Each log level has its own
/// closure that receives the log message and context information.
///
/// The closures are stored as given and invoked on every matching log call;
/// no filtering or formatting is applied by the returned logger.
///
/// # Arguments
///
/// * `debug_fn` - Closure called for debug-level messages
/// * `info_fn` - Closure called for info-level messages
/// * `warn_fn` - Closure called for warning-level messages
/// * `error_fn` - Closure called for error-level messages
///
/// # Returns
///
/// A `CustomLogger` owning the four closures.
///
/// # Example
///
/// ```rust
/// use durable_execution_sdk::context::{custom_logger, LogInfo};
/// use std::sync::Arc;
///
/// // Create a custom logger that prints to stdout
/// let logger = custom_logger(
///     |msg, info| println!("[DEBUG] {}: {:?}", msg, info),
///     |msg, info| println!("[INFO] {}: {:?}", msg, info),
///     |msg, info| println!("[WARN] {}: {:?}", msg, info),
///     |msg, info| println!("[ERROR] {}: {:?}", msg, info),
/// );
///
/// // Use with Arc for sharing across contexts
/// let shared_logger = Arc::new(logger);
/// ```
pub fn custom_logger<D, I, W, E>(
    debug_fn: D,
    info_fn: I,
    warn_fn: W,
    error_fn: E,
) -> CustomLogger<D, I, W, E>
where
    D: Fn(&str, &LogInfo) + Send + Sync,
    I: Fn(&str, &LogInfo) + Send + Sync,
    W: Fn(&str, &LogInfo) + Send + Sync,
    E: Fn(&str, &LogInfo) + Send + Sync,
{
    CustomLogger {
        debug_fn,
        info_fn,
        warn_fn,
        error_fn,
    }
}
779
780/// Creates a simple custom logger that uses a single closure for all log levels.
781///
782/// This is a convenience function for cases where you want the same handling
783/// for all log levels. The closure receives the log level as a string, the
784/// message, and the log info.
785///
786/// # Arguments
787///
788/// * `log_fn` - Closure called for all log messages. Receives (level, message, info).
789///
790/// # Example
791///
792/// ```rust
793/// use durable_execution_sdk::context::{simple_custom_logger, LogInfo};
794/// use std::sync::Arc;
795///
796/// // Create a simple logger that prints all messages with their level
797/// let logger = simple_custom_logger(|level, msg, info| {
798///     println!("[{}] {}: {:?}", level, msg, info);
799/// });
800///
801/// // Use with Arc for sharing across contexts
802/// let shared_logger = Arc::new(logger);
803/// ```
804pub fn simple_custom_logger<F>(log_fn: F) -> impl Logger
805where
806    F: Fn(&str, &str, &LogInfo) + Send + Sync + Clone + 'static,
807{
808    let debug_fn = log_fn.clone();
809    let info_fn = log_fn.clone();
810    let warn_fn = log_fn.clone();
811    let error_fn = log_fn;
812
813    custom_logger(
814        move |msg, info| debug_fn("DEBUG", msg, info),
815        move |msg, info| info_fn("INFO", msg, info),
816        move |msg, info| warn_fn("WARN", msg, info),
817        move |msg, info| error_fn("ERROR", msg, info),
818    )
819}
820
/// The main context for durable execution operations.
///
/// `DurableContext` provides all the durable operations that user code
/// can use to build reliable workflows. It handles checkpointing, replay,
/// and state management automatically.
///
/// # Thread Safety
///
/// `DurableContext` is `Send + Sync` (enforced by a compile-time assertion
/// below) and can be safely shared across async tasks. The internal state
/// uses appropriate synchronization primitives for concurrent access.
///
/// # Example
///
/// ```rust,ignore
/// async fn my_workflow(ctx: DurableContext) -> Result<String, DurableError> {
///     // Execute a step (automatically checkpointed)
///     let result = ctx.step(|_| Ok("hello".to_string()), None).await?;
///
///     // Wait for 5 seconds (suspends Lambda, resumes later)
///     ctx.wait(Duration::from_seconds(5), None).await?;
///
///     Ok(result)
/// }
/// ```
pub struct DurableContext {
    /// The execution state manager (shared with child contexts)
    state: Arc<ExecutionState>,
    /// The Lambda context (if running in Lambda)
    lambda_context: Option<lambda_runtime::Context>,
    /// The parent operation ID (None for root context)
    parent_id: Option<String>,
    /// Operation ID generator for this context
    id_generator: Arc<OperationIdGenerator>,
    /// Logger for structured logging (wrapped in RwLock for runtime reconfiguration).
    /// Child contexts clone the outer `Arc`, so they share the same logger slot.
    logger: Arc<RwLock<Arc<dyn Logger>>>,
}

// Compile-time check: the build fails if DurableContext ever stops being Send + Sync.
static_assertions::assert_impl_all!(DurableContext: Send, Sync);
861
862impl DurableContext {
863    /// Creates a new DurableContext with the given state.
864    ///
865    /// # Arguments
866    ///
867    /// * `state` - The execution state manager
868    pub fn new(state: Arc<ExecutionState>) -> Self {
869        let base_id = state.durable_execution_arn().to_string();
870        Self {
871            state,
872            lambda_context: None,
873            parent_id: None,
874            id_generator: Arc::new(OperationIdGenerator::new(base_id)),
875            logger: Arc::new(RwLock::new(Arc::new(TracingLogger))),
876        }
877    }
878
879    /// Creates a DurableContext from a Lambda context.
880    ///
881    /// This is the primary factory method used when running in AWS Lambda.
882    ///
883    /// # Arguments
884    ///
885    /// * `state` - The execution state manager
886    /// * `lambda_context` - The Lambda runtime context
887    pub fn from_lambda_context(
888        state: Arc<ExecutionState>,
889        lambda_context: lambda_runtime::Context,
890    ) -> Self {
891        let base_id = state.durable_execution_arn().to_string();
892        Self {
893            state,
894            lambda_context: Some(lambda_context),
895            parent_id: None,
896            id_generator: Arc::new(OperationIdGenerator::new(base_id)),
897            logger: Arc::new(RwLock::new(Arc::new(TracingLogger))),
898        }
899    }
900
901    /// Creates a child context for nested operations.
902    ///
903    /// Child contexts have their own step counter but share the same
904    /// execution state. Operations in a child context are tracked
905    /// with the parent's operation ID.
906    ///
907    /// # Arguments
908    ///
909    /// * `parent_operation_id` - The operation ID of the parent operation
910    pub fn create_child_context(&self, parent_operation_id: impl Into<String>) -> Self {
911        let parent_id = parent_operation_id.into();
912        Self {
913            state: self.state.clone(),
914            lambda_context: self.lambda_context.clone(),
915            parent_id: Some(parent_id.clone()),
916            id_generator: Arc::new(OperationIdGenerator::new(parent_id)),
917            logger: self.logger.clone(),
918        }
919    }
920
921    /// Sets a custom logger for this context.
922    ///
923    /// # Arguments
924    ///
925    /// * `logger` - The logger implementation to use
926    pub fn set_logger(&mut self, logger: Arc<dyn Logger>) {
927        *self.logger.write().unwrap() = logger;
928    }
929
930    /// Returns a new context with the specified logger.
931    ///
932    /// # Arguments
933    ///
934    /// * `logger` - The logger implementation to use
935    pub fn with_logger(self, logger: Arc<dyn Logger>) -> Self {
936        *self.logger.write().unwrap() = logger;
937        self
938    }
939
940    /// Reconfigures the logger for this context at runtime.
941    ///
942    /// All subsequent log calls on this context (and any clones sharing the
943    /// same underlying `RwLock`) will use the new logger.
944    ///
945    /// # Arguments
946    ///
947    /// * `logger` - The new logger implementation to use
948    pub fn configure_logger(&self, logger: Arc<dyn Logger>) {
949        *self.logger.write().unwrap() = logger;
950    }
951
952    /// Returns a reference to the execution state.
953    pub fn state(&self) -> &Arc<ExecutionState> {
954        &self.state
955    }
956
957    /// Returns the durable execution ARN.
958    pub fn durable_execution_arn(&self) -> &str {
959        self.state.durable_execution_arn()
960    }
961
962    /// Returns the parent operation ID, if any.
963    pub fn parent_id(&self) -> Option<&str> {
964        self.parent_id.as_deref()
965    }
966
967    /// Returns a reference to the Lambda context, if available.
968    pub fn lambda_context(&self) -> Option<&lambda_runtime::Context> {
969        self.lambda_context.as_ref()
970    }
971
972    /// Returns a reference to the logger.
973    pub fn logger(&self) -> Arc<dyn Logger> {
974        self.logger.read().unwrap().clone()
975    }
976
977    /// Generates the next operation ID for this context.
978    ///
979    /// This method is thread-safe and will generate unique, deterministic
980    /// IDs based on the context's base ID and step counter.
981    pub fn next_operation_id(&self) -> String {
982        self.id_generator.next_id()
983    }
984
985    /// Creates an OperationIdentifier for the next operation.
986    ///
987    /// # Arguments
988    ///
989    /// * `name` - Optional human-readable name for the operation
990    pub fn next_operation_identifier(&self, name: Option<String>) -> OperationIdentifier {
991        OperationIdentifier::new(self.next_operation_id(), self.parent_id.clone(), name)
992    }
993
994    /// Returns the current step counter value without incrementing.
995    pub fn current_step_counter(&self) -> u64 {
996        self.id_generator.current_counter()
997    }
998
999    /// Creates log info for the current context.
1000    ///
1001    /// The returned `LogInfo` includes the current replay status from the
1002    /// execution state, allowing loggers to distinguish between fresh
1003    /// executions and replayed operations.
1004    pub fn create_log_info(&self) -> LogInfo {
1005        let mut info = LogInfo::new(self.durable_execution_arn());
1006        if let Some(ref parent_id) = self.parent_id {
1007            info = info.with_parent_id(parent_id);
1008        }
1009        // Include replay status from execution state
1010        info = info.with_replay(self.state.is_replay());
1011        info
1012    }
1013
1014    /// Creates log info with an operation ID.
1015    ///
1016    /// The returned `LogInfo` includes the current replay status from the
1017    /// execution state, allowing loggers to distinguish between fresh
1018    /// executions and replayed operations.
1019    pub fn create_log_info_with_operation(&self, operation_id: &str) -> LogInfo {
1020        self.create_log_info().with_operation_id(operation_id)
1021    }
1022
1023    /// Creates log info with explicit replay status.
1024    ///
1025    /// This method allows callers to explicitly set the replay status,
1026    /// which is useful when the operation-specific replay status differs
1027    /// from the global execution state replay status.
1028    ///
1029    /// # Arguments
1030    ///
1031    /// * `operation_id` - The operation ID to include in the log info
1032    /// * `is_replay` - Whether this specific operation is being replayed
1033    pub fn create_log_info_with_replay(&self, operation_id: &str, is_replay: bool) -> LogInfo {
1034        let mut info = LogInfo::new(self.durable_execution_arn());
1035        if let Some(ref parent_id) = self.parent_id {
1036            info = info.with_parent_id(parent_id);
1037        }
1038        info.with_operation_id(operation_id).with_replay(is_replay)
1039    }
1040
1041    // ========================================================================
1042    // Simplified Logging API
1043    // ========================================================================
1044
1045    /// Logs a message at INFO level with automatic context.
1046    ///
1047    /// This method automatically includes the durable_execution_arn and parent_id
1048    /// in the log output without requiring the caller to specify them.
1049    ///
1050    /// # Arguments
1051    ///
1052    /// * `message` - The message to log
1053    ///
1054    /// # Example
1055    ///
1056    /// ```rust,ignore
1057    /// ctx.log_info("Processing order started");
1058    /// ```
1059    pub fn log_info(&self, message: &str) {
1060        self.log_with_level(LogLevel::Info, message, &[]);
1061    }
1062
1063    /// Logs a message at INFO level with extra fields.
1064    ///
1065    /// This method automatically includes the durable_execution_arn and parent_id
1066    /// in the log output, plus any additional fields specified.
1067    ///
1068    /// # Arguments
1069    ///
1070    /// * `message` - The message to log
1071    /// * `fields` - Additional key-value pairs to include in the log
1072    ///
1073    /// # Example
1074    ///
1075    /// ```rust,ignore
1076    /// ctx.log_info_with("Processing order", &[("order_id", "123"), ("amount", "99.99")]);
1077    /// ```
1078    pub fn log_info_with(&self, message: &str, fields: &[(&str, &str)]) {
1079        self.log_with_level(LogLevel::Info, message, fields);
1080    }
1081
1082    /// Logs a message at DEBUG level with automatic context.
1083    ///
1084    /// This method automatically includes the durable_execution_arn and parent_id
1085    /// in the log output without requiring the caller to specify them.
1086    ///
1087    /// # Arguments
1088    ///
1089    /// * `message` - The message to log
1090    ///
1091    /// # Example
1092    ///
1093    /// ```rust,ignore
1094    /// ctx.log_debug("Entering validation step");
1095    /// ```
1096    pub fn log_debug(&self, message: &str) {
1097        self.log_with_level(LogLevel::Debug, message, &[]);
1098    }
1099
1100    /// Logs a message at DEBUG level with extra fields.
1101    ///
1102    /// This method automatically includes the durable_execution_arn and parent_id
1103    /// in the log output, plus any additional fields specified.
1104    ///
1105    /// # Arguments
1106    ///
1107    /// * `message` - The message to log
1108    /// * `fields` - Additional key-value pairs to include in the log
1109    ///
1110    /// # Example
1111    ///
1112    /// ```rust,ignore
1113    /// ctx.log_debug_with("Variable state", &[("x", "42"), ("y", "100")]);
1114    /// ```
1115    pub fn log_debug_with(&self, message: &str, fields: &[(&str, &str)]) {
1116        self.log_with_level(LogLevel::Debug, message, fields);
1117    }
1118
1119    /// Logs a message at WARN level with automatic context.
1120    ///
1121    /// This method automatically includes the durable_execution_arn and parent_id
1122    /// in the log output without requiring the caller to specify them.
1123    ///
1124    /// # Arguments
1125    ///
1126    /// * `message` - The message to log
1127    ///
1128    /// # Example
1129    ///
1130    /// ```rust,ignore
1131    /// ctx.log_warn("Retry attempt 3 of 5");
1132    /// ```
1133    pub fn log_warn(&self, message: &str) {
1134        self.log_with_level(LogLevel::Warn, message, &[]);
1135    }
1136
1137    /// Logs a message at WARN level with extra fields.
1138    ///
1139    /// This method automatically includes the durable_execution_arn and parent_id
1140    /// in the log output, plus any additional fields specified.
1141    ///
1142    /// # Arguments
1143    ///
1144    /// * `message` - The message to log
1145    /// * `fields` - Additional key-value pairs to include in the log
1146    ///
1147    /// # Example
1148    ///
1149    /// ```rust,ignore
1150    /// ctx.log_warn_with("Rate limit approaching", &[("current", "95"), ("limit", "100")]);
1151    /// ```
1152    pub fn log_warn_with(&self, message: &str, fields: &[(&str, &str)]) {
1153        self.log_with_level(LogLevel::Warn, message, fields);
1154    }
1155
1156    /// Logs a message at ERROR level with automatic context.
1157    ///
1158    /// This method automatically includes the durable_execution_arn and parent_id
1159    /// in the log output without requiring the caller to specify them.
1160    ///
1161    /// # Arguments
1162    ///
1163    /// * `message` - The message to log
1164    ///
1165    /// # Example
1166    ///
1167    /// ```rust,ignore
1168    /// ctx.log_error("Failed to process payment");
1169    /// ```
1170    pub fn log_error(&self, message: &str) {
1171        self.log_with_level(LogLevel::Error, message, &[]);
1172    }
1173
1174    /// Logs a message at ERROR level with extra fields.
1175    ///
1176    /// This method automatically includes the durable_execution_arn and parent_id
1177    /// in the log output, plus any additional fields specified.
1178    ///
1179    /// # Arguments
1180    ///
1181    /// * `message` - The message to log
1182    /// * `fields` - Additional key-value pairs to include in the log
1183    ///
1184    /// # Example
1185    ///
1186    /// ```rust,ignore
1187    /// ctx.log_error_with("Payment failed", &[("error_code", "INSUFFICIENT_FUNDS"), ("amount", "150.00")]);
1188    /// ```
1189    pub fn log_error_with(&self, message: &str, fields: &[(&str, &str)]) {
1190        self.log_with_level(LogLevel::Error, message, fields);
1191    }
1192
1193    /// Internal helper method to log at a specific level with optional extra fields.
1194    ///
1195    /// This method creates a LogInfo with automatic context (durable_execution_arn, parent_id)
1196    /// and any additional fields, then delegates to the configured logger.
1197    ///
1198    /// # Arguments
1199    ///
1200    /// * `level` - The log level
1201    /// * `message` - The message to log
1202    /// * `extra` - Additional key-value pairs to include
1203    fn log_with_level(&self, level: LogLevel, message: &str, extra: &[(&str, &str)]) {
1204        let mut log_info = self.create_log_info();
1205
1206        for (key, value) in extra {
1207            log_info = log_info.with_extra(*key, *value);
1208        }
1209
1210        let logger = self.logger.read().unwrap();
1211        match level {
1212            LogLevel::Debug => logger.debug(message, &log_info),
1213            LogLevel::Info => logger.info(message, &log_info),
1214            LogLevel::Warn => logger.warn(message, &log_info),
1215            LogLevel::Error => logger.error(message, &log_info),
1216        }
1217    }
1218
1219    /// Returns the original user input from the EXECUTION operation.
1220    ///
1221    /// This method deserializes the input payload from the EXECUTION operation's
1222    /// ExecutionDetails.InputPayload field into the requested type.
1223    ///
1224    /// # Type Parameters
1225    ///
1226    /// * `T` - The type to deserialize the input into. Must implement `DeserializeOwned`.
1227    ///
1228    /// # Returns
1229    ///
1230    /// `Ok(T)` if the input exists and can be deserialized, or a `DurableError` if:
1231    /// - No EXECUTION operation exists
1232    /// - No input payload is available
1233    /// - Deserialization fails
1234    ///
1235    /// # Example
1236    ///
1237    /// ```rust,ignore
1238    /// #[derive(Deserialize)]
1239    /// struct OrderEvent {
1240    ///     order_id: String,
1241    ///     amount: f64,
1242    /// }
1243    ///
1244    /// async fn my_workflow(ctx: DurableContext) -> Result<(), DurableError> {
1245    ///     // Get the original input that started this execution
1246    ///     let event: OrderEvent = ctx.get_original_input()?;
1247    ///     println!("Processing order: {}", event.order_id);
1248    ///     Ok(())
1249    /// }
1250    /// ```
1251    pub fn get_original_input<T>(&self) -> DurableResult<T>
1252    where
1253        T: serde::de::DeserializeOwned,
1254    {
1255        // Get the raw input payload from the execution state
1256        let input_payload = self.state.get_original_input_raw().ok_or_else(|| {
1257            crate::error::DurableError::Validation {
1258                message: "No original input available. The EXECUTION operation may not exist or has no input payload.".to_string(),
1259            }
1260        })?;
1261
1262        // Deserialize the input payload to the requested type
1263        serde_json::from_str(input_payload).map_err(|e| crate::error::DurableError::SerDes {
1264            message: format!("Failed to deserialize original input: {}", e),
1265        })
1266    }
1267
1268    /// Returns the raw original user input as a string, if available.
1269    ///
1270    /// This method returns the raw JSON string from the EXECUTION operation's
1271    /// ExecutionDetails.InputPayload field without deserializing it.
1272    ///
1273    /// # Returns
1274    ///
1275    /// `Some(&str)` if the input exists, `None` otherwise.
1276    pub fn get_original_input_raw(&self) -> Option<&str> {
1277        self.state.get_original_input_raw()
1278    }
1279
1280    /// Completes the execution with a successful result via checkpointing.
1281    ///
1282    /// This method checkpoints a SUCCEED action on the EXECUTION operation,
1283    /// which is useful for large results that exceed the Lambda response size limit (6MB).
1284    /// After calling this method, the Lambda function should return an empty result.
1285    ///
1286    /// # Arguments
1287    ///
1288    /// * `result` - The result to checkpoint. Must implement `Serialize`.
1289    ///
1290    /// # Returns
1291    ///
1292    /// `Ok(())` on success, or a `DurableError` if:
1293    /// - No EXECUTION operation exists
1294    /// - Serialization fails
1295    /// - The checkpoint fails
1296    ///
1297    /// # Example
1298    ///
1299    /// ```rust,ignore
1300    /// async fn my_workflow(ctx: DurableContext) -> Result<(), DurableError> {
1301    ///     let large_result = compute_large_result().await?;
1302    ///     
1303    ///     // Check if result would exceed Lambda response limit
1304    ///     if DurableExecutionInvocationOutput::would_exceed_max_size(&large_result) {
1305    ///         // Checkpoint the result instead of returning it
1306    ///         ctx.complete_execution_success(&large_result).await?;
1307    ///         // Return empty result - the actual result is checkpointed
1308    ///         return Ok(());
1309    ///     }
1310    ///     
1311    ///     Ok(())
1312    /// }
1313    /// ```
1314    pub async fn complete_execution_success<T>(&self, result: &T) -> DurableResult<()>
1315    where
1316        T: serde::Serialize,
1317    {
1318        let serialized =
1319            serde_json::to_string(result).map_err(|e| crate::error::DurableError::SerDes {
1320                message: format!("Failed to serialize execution result: {}", e),
1321            })?;
1322
1323        self.state
1324            .complete_execution_success(Some(serialized))
1325            .await
1326    }
1327
1328    /// Completes the execution with a failure via checkpointing.
1329    ///
1330    /// This method checkpoints a FAIL action on the EXECUTION operation.
1331    /// After calling this method, the Lambda function should return a FAILED status.
1332    ///
1333    /// # Arguments
1334    ///
1335    /// * `error` - The error details to checkpoint
1336    ///
1337    /// # Returns
1338    ///
1339    /// `Ok(())` on success, or a `DurableError` if:
1340    /// - No EXECUTION operation exists
1341    /// - The checkpoint fails
1342    ///
1343    /// # Example
1344    ///
1345    /// ```rust,ignore
1346    /// async fn my_workflow(ctx: DurableContext) -> Result<(), DurableError> {
1347    ///     if let Err(e) = process_order().await {
1348    ///         // Checkpoint the failure
1349    ///         ctx.complete_execution_failure(ErrorObject::new("ProcessingError", &e.to_string())).await?;
1350    ///         return Err(DurableError::execution(&e.to_string()));
1351    ///     }
1352    ///     Ok(())
1353    /// }
1354    /// ```
1355    pub async fn complete_execution_failure(
1356        &self,
1357        error: crate::error::ErrorObject,
1358    ) -> DurableResult<()> {
1359        self.state.complete_execution_failure(error).await
1360    }
1361
1362    /// Completes the execution with a successful result, automatically handling large results.
1363    ///
1364    /// This method checks if the result would exceed the Lambda response size limit (6MB).
1365    /// If so, it checkpoints the result via the EXECUTION operation and returns `true`.
1366    /// If the result fits within the limit, it returns `false` and the caller should
1367    /// return the result normally.
1368    ///
1369    /// # Arguments
1370    ///
1371    /// * `result` - The result to potentially checkpoint. Must implement `Serialize`.
1372    ///
1373    /// # Returns
1374    ///
1375    /// `Ok(true)` if the result was checkpointed (caller should return empty result),
1376    /// `Ok(false)` if the result fits within limits (caller should return it normally),
1377    /// or a `DurableError` if checkpointing fails.
1378    ///
1379    /// # Example
1380    ///
1381    /// ```rust,ignore
1382    /// async fn my_workflow(ctx: DurableContext) -> Result<LargeResult, DurableError> {
1383    ///     let result = compute_result().await?;
1384    ///     
1385    ///     // Automatically handle large results
1386    ///     if ctx.complete_execution_if_large(&result).await? {
1387    ///         // Result was checkpointed, return a placeholder
1388    ///         // The actual result is stored in the EXECUTION operation
1389    ///         return Ok(LargeResult::default());
1390    ///     }
1391    ///     
1392    ///     // Result fits within limits, return normally
1393    ///     Ok(result)
1394    /// }
1395    /// ```
1396    pub async fn complete_execution_if_large<T>(&self, result: &T) -> DurableResult<bool>
1397    where
1398        T: serde::Serialize,
1399    {
1400        if crate::lambda::DurableExecutionInvocationOutput::would_exceed_max_size(result) {
1401            self.complete_execution_success(result).await?;
1402            Ok(true)
1403        } else {
1404            Ok(false)
1405        }
1406    }
1407
1408    // ========================================================================
1409    // Durable Operations
1410    // ========================================================================
1411
1412    /// Executes a step operation with automatic checkpointing.
1413    ///
1414    /// Steps are the fundamental unit of work in durable executions. Each step
1415    /// is checkpointed, allowing the workflow to resume from the last completed
1416    /// step after interruptions.
1417    ///
1418    /// # Arguments
1419    ///
1420    /// * `func` - The function to execute
1421    /// * `config` - Optional step configuration (retry strategy, semantics, serdes)
1422    ///
1423    /// # Returns
1424    ///
1425    /// The result of the step function, or an error if execution fails.
1426    ///
1427    /// # Example
1428    ///
1429    /// ```rust,ignore
1430    /// let result: i32 = ctx.step(|_step_ctx| {
1431    ///     Ok(42)
1432    /// }, None).await?;
1433    /// ```
1434    pub async fn step<T, F>(
1435        &self,
1436        func: F,
1437        config: Option<crate::config::StepConfig>,
1438    ) -> DurableResult<T>
1439    where
1440        T: DurableValue,
1441        F: StepFn<T>,
1442    {
1443        let op_id = self.next_operation_identifier(None);
1444        let config = config.unwrap_or_default();
1445
1446        let logger = self.logger.read().unwrap().clone();
1447        let result =
1448            crate::handlers::step_handler(func, &self.state, &op_id, &config, &logger).await;
1449
1450        // Track replay after completion
1451        if result.is_ok() {
1452            self.state.track_replay(&op_id.operation_id).await;
1453        }
1454
1455        result
1456    }
1457
1458    /// Executes a named step operation with automatic checkpointing.
1459    ///
1460    /// Same as `step`, but allows specifying a human-readable name for the operation.
1461    ///
1462    /// # Arguments
1463    ///
1464    /// * `name` - Human-readable name for the step
1465    /// * `func` - The function to execute
1466    /// * `config` - Optional step configuration
1467    ///
1468    /// # Example
1469    ///
1470    /// ```rust,ignore
1471    /// let result: i32 = ctx.step_named("validate_input", |_step_ctx| {
1472    ///     Ok(42)
1473    /// }, None).await?;
1474    /// ```
1475    pub async fn step_named<T, F>(
1476        &self,
1477        name: &str,
1478        func: F,
1479        config: Option<crate::config::StepConfig>,
1480    ) -> DurableResult<T>
1481    where
1482        T: DurableValue,
1483        F: StepFn<T>,
1484    {
1485        let op_id = self.next_operation_identifier(Some(name.to_string()));
1486        let config = config.unwrap_or_default();
1487
1488        let logger = self.logger.read().unwrap().clone();
1489        let result =
1490            crate::handlers::step_handler(func, &self.state, &op_id, &config, &logger).await;
1491
1492        // Track replay after completion
1493        if result.is_ok() {
1494            self.state.track_replay(&op_id.operation_id).await;
1495        }
1496
1497        result
1498    }
1499
1500    /// Pauses execution for a specified duration.
1501    ///
1502    /// Wait operations suspend the Lambda execution and resume after the
1503    /// specified duration has elapsed. This is efficient because it doesn't
1504    /// block Lambda resources during the wait.
1505    ///
1506    /// # Arguments
1507    ///
1508    /// * `duration` - The duration to wait (must be at least 1 second)
1509    /// * `name` - Optional human-readable name for the operation
1510    ///
1511    /// # Returns
1512    ///
1513    /// Ok(()) when the wait has elapsed, or an error if validation fails.
1514    ///
1515    /// # Example
1516    ///
1517    /// ```rust,ignore
1518    /// // Wait for 5 seconds
1519    /// ctx.wait(Duration::from_seconds(5), None).await?;
1520    ///
1521    /// // Wait with a name
1522    /// ctx.wait(Duration::from_minutes(1), Some("wait_for_processing")).await?;
1523    /// ```
1524    pub async fn wait(
1525        &self,
1526        duration: crate::duration::Duration,
1527        name: Option<&str>,
1528    ) -> DurableResult<()> {
1529        let op_id = self.next_operation_identifier(name.map(|s| s.to_string()));
1530
1531        let logger = self.logger.read().unwrap().clone();
1532        let result = crate::handlers::wait_handler(duration, &self.state, &op_id, &logger).await;
1533
1534        // Track replay after completion (only if not suspended)
1535        if result.is_ok() {
1536            self.state.track_replay(&op_id.operation_id).await;
1537        }
1538
1539        result
1540    }
1541
1542    /// Cancels an active wait operation.
1543    ///
1544    /// This method allows cancelling a wait operation that was previously started.
1545    /// If the wait has already completed (succeeded, failed, or timed out), this
1546    /// method will return Ok(()) without making any changes.
1547    ///
1548    /// # Arguments
1549    ///
1550    /// * `operation_id` - The operation ID of the wait to cancel
1551    ///
1552    /// # Returns
1553    ///
1554    /// Ok(()) if the wait was cancelled or was already completed, or an error if:
1555    /// - The operation doesn't exist
1556    /// - The operation is not a WAIT operation
1557    /// - The checkpoint fails
1558    ///
1559    /// # Example
1560    ///
1561    /// ```rust,ignore
1562    /// // Start a wait in a parallel branch
1563    /// let wait_op_id = ctx.next_operation_id();
1564    ///
1565    /// // Later, cancel the wait from another branch
1566    /// ctx.cancel_wait(&wait_op_id).await?;
1567    /// ```
1568    pub async fn cancel_wait(&self, operation_id: &str) -> DurableResult<()> {
1569        let logger = self.logger.read().unwrap().clone();
1570        crate::handlers::wait_cancel_handler(&self.state, operation_id, &logger).await
1571    }
1572
1573    /// Creates a callback and returns a handle to wait for the result.
1574    ///
1575    /// Callbacks allow external systems to signal completion of asynchronous
1576    /// operations. The callback ID can be shared with external systems, which
1577    /// can then call the Lambda durable execution callback API.
1578    ///
1579    /// # Arguments
1580    ///
1581    /// * `config` - Optional callback configuration (timeout, heartbeat)
1582    ///
1583    /// # Returns
1584    ///
1585    /// A `Callback<T>` handle that can be used to wait for the result.
1586    ///
1587    /// # Example
1588    ///
1589    /// ```rust,ignore
1590    /// let callback = ctx.create_callback::<ApprovalResponse>(None).await?;
1591    ///
1592    /// // Share callback.callback_id with external system
1593    /// notify_approver(&callback.callback_id).await?;
1594    ///
1595    /// // Wait for the callback result
1596    /// let approval = callback.result().await?;
1597    /// ```
1598    pub async fn create_callback<T>(
1599        &self,
1600        config: Option<crate::config::CallbackConfig>,
1601    ) -> DurableResult<crate::handlers::Callback<T>>
1602    where
1603        T: serde::Serialize + serde::de::DeserializeOwned,
1604    {
1605        let op_id = self.next_operation_identifier(None);
1606        let config = config.unwrap_or_default();
1607
1608        let logger = self.logger.read().unwrap().clone();
1609        let result = crate::handlers::callback_handler(&self.state, &op_id, &config, &logger).await;
1610
1611        // Track replay after completion
1612        if result.is_ok() {
1613            self.state.track_replay(&op_id.operation_id).await;
1614        }
1615
1616        result
1617    }
1618
1619    /// Creates a named callback and returns a handle to wait for the result.
1620    ///
1621    /// Same as `create_callback`, but allows specifying a human-readable name.
1622    pub async fn create_callback_named<T>(
1623        &self,
1624        name: &str,
1625        config: Option<crate::config::CallbackConfig>,
1626    ) -> DurableResult<crate::handlers::Callback<T>>
1627    where
1628        T: serde::Serialize + serde::de::DeserializeOwned,
1629    {
1630        let op_id = self.next_operation_identifier(Some(name.to_string()));
1631        let config = config.unwrap_or_default();
1632
1633        let logger = self.logger.read().unwrap().clone();
1634        let result = crate::handlers::callback_handler(&self.state, &op_id, &config, &logger).await;
1635
1636        // Track replay after completion
1637        if result.is_ok() {
1638            self.state.track_replay(&op_id.operation_id).await;
1639        }
1640
1641        result
1642    }
1643
1644    /// Invokes another durable Lambda function.
1645    ///
1646    /// This method calls another Lambda function and waits for its result.
1647    /// The invocation is checkpointed, so if the workflow is interrupted,
1648    /// it will resume with the result of the invocation.
1649    ///
1650    /// # Arguments
1651    ///
1652    /// * `function_name` - The name or ARN of the Lambda function to invoke
1653    /// * `payload` - The payload to send to the function
1654    /// * `config` - Optional invoke configuration (timeout, serdes)
1655    ///
1656    /// # Returns
1657    ///
1658    /// The result from the invoked function, or an error if invocation fails.
1659    ///
1660    /// # Example
1661    ///
1662    /// ```rust,ignore
1663    /// let result: ProcessingResult = ctx.invoke(
1664    ///     "process-order-function",
1665    ///     OrderPayload { order_id: "123".to_string() },
1666    ///     None,
1667    /// ).await?;
1668    /// ```
1669    pub async fn invoke<P, R>(
1670        &self,
1671        function_name: &str,
1672        payload: P,
1673        config: Option<crate::config::InvokeConfig<P, R>>,
1674    ) -> DurableResult<R>
1675    where
1676        P: serde::Serialize + serde::de::DeserializeOwned + Send,
1677        R: serde::Serialize + serde::de::DeserializeOwned + Send,
1678    {
1679        let op_id = self.next_operation_identifier(Some(format!("invoke:{}", function_name)));
1680        let config = config.unwrap_or_default();
1681
1682        let logger = self.logger.read().unwrap().clone();
1683        let result = crate::handlers::invoke_handler(
1684            function_name,
1685            payload,
1686            &self.state,
1687            &op_id,
1688            &config,
1689            &logger,
1690        )
1691        .await;
1692
1693        // Track replay after completion
1694        if result.is_ok() {
1695            self.state.track_replay(&op_id.operation_id).await;
1696        }
1697
1698        result
1699    }
1700
1701    /// Processes a collection in parallel with configurable concurrency.
1702    ///
1703    /// Map operations execute a function for each item in the collection,
1704    /// with configurable concurrency limits and failure tolerance.
1705    ///
1706    /// # Arguments
1707    ///
1708    /// * `items` - The collection of items to process
1709    /// * `func` - The function to apply to each item
1710    /// * `config` - Optional map configuration (concurrency, completion criteria)
1711    ///
1712    /// # Returns
1713    ///
1714    /// A `BatchResult<U>` containing results for all items.
1715    ///
1716    /// # Example
1717    ///
1718    /// ```rust,ignore
1719    /// let results = ctx.map(
1720    ///     vec![1, 2, 3, 4, 5],
1721    ///     |child_ctx, item, index| async move {
1722    ///         Ok(item * 2)
1723    ///     },
1724    ///     Some(MapConfig {
1725    ///         max_concurrency: Some(3),
1726    ///         ..Default::default()
1727    ///     }),
1728    /// ).await?;
1729    /// ```
1730    pub async fn map<T, U, F, Fut>(
1731        &self,
1732        items: Vec<T>,
1733        func: F,
1734        config: Option<crate::config::MapConfig>,
1735    ) -> DurableResult<crate::concurrency::BatchResult<U>>
1736    where
1737        T: serde::Serialize + serde::de::DeserializeOwned + Send + Sync + Clone + 'static,
1738        U: serde::Serialize + serde::de::DeserializeOwned + Send + 'static,
1739        F: Fn(DurableContext, T, usize) -> Fut + Send + Sync + Clone + 'static,
1740        Fut: std::future::Future<Output = DurableResult<U>> + Send + 'static,
1741    {
1742        let op_id = self.next_operation_identifier(Some("map".to_string()));
1743        let config = config.unwrap_or_default();
1744
1745        let logger = self.logger.read().unwrap().clone();
1746        let result =
1747            crate::handlers::map_handler(items, func, &self.state, &op_id, self, &config, &logger)
1748                .await;
1749
1750        // Track replay after completion
1751        if result.is_ok() {
1752            self.state.track_replay(&op_id.operation_id).await;
1753        }
1754
1755        result
1756    }
1757
1758    /// Executes multiple operations in parallel.
1759    ///
1760    /// Parallel operations execute multiple independent functions concurrently,
1761    /// with configurable concurrency limits and completion criteria.
1762    ///
1763    /// # Arguments
1764    ///
1765    /// * `branches` - The list of functions to execute in parallel
1766    /// * `config` - Optional parallel configuration (concurrency, completion criteria)
1767    ///
1768    /// # Returns
1769    ///
1770    /// A `BatchResult<T>` containing results for all branches.
1771    ///
1772    /// # Example
1773    ///
1774    /// ```rust,ignore
1775    /// let results = ctx.parallel(
1776    ///     vec![
1777    ///         |ctx| async move { Ok(fetch_data_a(&ctx).await?) },
1778    ///         |ctx| async move { Ok(fetch_data_b(&ctx).await?) },
1779    ///         |ctx| async move { Ok(fetch_data_c(&ctx).await?) },
1780    ///     ],
1781    ///     None,
1782    /// ).await?;
1783    /// ```
1784    pub async fn parallel<T, F, Fut>(
1785        &self,
1786        branches: Vec<F>,
1787        config: Option<crate::config::ParallelConfig>,
1788    ) -> DurableResult<crate::concurrency::BatchResult<T>>
1789    where
1790        T: serde::Serialize + serde::de::DeserializeOwned + Send + 'static,
1791        F: FnOnce(DurableContext) -> Fut + Send + 'static,
1792        Fut: std::future::Future<Output = DurableResult<T>> + Send + 'static,
1793    {
1794        let op_id = self.next_operation_identifier(Some("parallel".to_string()));
1795        let config = config.unwrap_or_default();
1796
1797        let logger = self.logger.read().unwrap().clone();
1798        let result = crate::handlers::parallel_handler(
1799            branches,
1800            &self.state,
1801            &op_id,
1802            self,
1803            &config,
1804            &logger,
1805        )
1806        .await;
1807
1808        // Track replay after completion
1809        if result.is_ok() {
1810            self.state.track_replay(&op_id.operation_id).await;
1811        }
1812
1813        result
1814    }
1815
1816    /// Executes a function in a child context.
1817    ///
1818    /// Child contexts provide isolation for nested workflows. Operations in
1819    /// a child context are tracked separately and can be checkpointed as a unit.
1820    ///
1821    /// # Arguments
1822    ///
1823    /// * `func` - The function to execute in the child context
1824    /// * `config` - Optional child context configuration
1825    ///
1826    /// # Returns
1827    ///
1828    /// The result of the child function, or an error if execution fails.
1829    ///
1830    /// # Example
1831    ///
1832    /// ```rust,ignore
1833    /// let result = ctx.run_in_child_context(|child_ctx| async move {
1834    ///     let step1 = child_ctx.step(|_| Ok(1), None).await?;
1835    ///     let step2 = child_ctx.step(|_| Ok(2), None).await?;
1836    ///     Ok(step1 + step2)
1837    /// }, None).await?;
1838    /// ```
1839    pub async fn run_in_child_context<T, F, Fut>(
1840        &self,
1841        func: F,
1842        config: Option<crate::config::ChildConfig>,
1843    ) -> DurableResult<T>
1844    where
1845        T: serde::Serialize + serde::de::DeserializeOwned + Send,
1846        F: FnOnce(DurableContext) -> Fut + Send,
1847        Fut: std::future::Future<Output = DurableResult<T>> + Send,
1848    {
1849        let op_id = self.next_operation_identifier(Some("child_context".to_string()));
1850        let config = config.unwrap_or_default();
1851
1852        let logger = self.logger.read().unwrap().clone();
1853        let result =
1854            crate::handlers::child_handler(func, &self.state, &op_id, self, &config, &logger).await;
1855
1856        // Track replay after completion
1857        if result.is_ok() {
1858            self.state.track_replay(&op_id.operation_id).await;
1859        }
1860
1861        result
1862    }
1863
1864    /// Executes a named function in a child context.
1865    ///
1866    /// Same as `run_in_child_context`, but allows specifying a human-readable name.
1867    pub async fn run_in_child_context_named<T, F, Fut>(
1868        &self,
1869        name: &str,
1870        func: F,
1871        config: Option<crate::config::ChildConfig>,
1872    ) -> DurableResult<T>
1873    where
1874        T: serde::Serialize + serde::de::DeserializeOwned + Send,
1875        F: FnOnce(DurableContext) -> Fut + Send,
1876        Fut: std::future::Future<Output = DurableResult<T>> + Send,
1877    {
1878        let op_id = self.next_operation_identifier(Some(name.to_string()));
1879        let config = config.unwrap_or_default();
1880
1881        let logger = self.logger.read().unwrap().clone();
1882        let result =
1883            crate::handlers::child_handler(func, &self.state, &op_id, self, &config, &logger).await;
1884
1885        // Track replay after completion
1886        if result.is_ok() {
1887            self.state.track_replay(&op_id.operation_id).await;
1888        }
1889
1890        result
1891    }
1892
1893    /// Polls until a condition is met.
1894    ///
1895    /// This method repeatedly checks a condition until it returns a successful
1896    /// result. Between checks, it waits for a configurable duration using the
1897    /// RETRY mechanism with NextAttemptDelaySeconds.
1898    ///
1899    /// # Implementation
1900    ///
1901    /// This method is implemented as a single STEP operation with RETRY mechanism,
1902    /// which is more efficient than using multiple steps and waits. The state is
1903    /// passed as Payload on retry (not Error), and the attempt number is tracked
1904    /// in StepDetails.Attempt.
1905    ///
1906    /// # Arguments
1907    ///
1908    /// * `check` - The function to check the condition
1909    /// * `config` - Configuration for the wait (interval, max attempts, timeout)
1910    ///
1911    /// # Returns
1912    ///
1913    /// The result when the condition is met, or an error if timeout/max attempts exceeded.
1914    ///
1915    /// # Example
1916    ///
1917    /// ```rust,ignore
1918    /// let result = ctx.wait_for_condition(
1919    ///     |state, ctx| {
1920    ///         // Check if order is ready
1921    ///         let status = check_order_status(&state.order_id)?;
1922    ///         if status == "ready" {
1923    ///             Ok(OrderReady { order_id: state.order_id.clone() })
1924    ///         } else {
1925    ///             Err("Order not ready yet".into())
1926    ///         }
1927    ///     },
1928    ///     WaitForConditionConfig::from_interval(
1929    ///         OrderState { order_id: "123".to_string() },
1930    ///         Duration::from_seconds(5),
1931    ///         Some(10),
1932    ///     ),
1933    /// ).await?;
1934    /// ```
1935    pub async fn wait_for_condition<T, S, F>(
1936        &self,
1937        check: F,
1938        config: WaitForConditionConfig<S>,
1939    ) -> DurableResult<T>
1940    where
1941        T: serde::Serialize + serde::de::DeserializeOwned + Send,
1942        S: serde::Serialize + serde::de::DeserializeOwned + Clone + Send + Sync,
1943        F: Fn(&S, &WaitForConditionContext) -> Result<T, Box<dyn std::error::Error + Send + Sync>>
1944            + Send
1945            + Sync,
1946    {
1947        let op_id = self.next_operation_identifier(Some("wait_for_condition".to_string()));
1948
1949        let logger = self.logger.read().unwrap().clone();
1950        // Use the new wait_for_condition_handler which implements the single STEP with RETRY pattern
1951        // This is more efficient than the previous child context + multiple steps approach
1952        let result = crate::handlers::wait_for_condition_handler(
1953            check,
1954            config,
1955            &self.state,
1956            &op_id,
1957            &logger,
1958        )
1959        .await;
1960
1961        // Track replay after completion (only if not suspended)
1962        if result.is_ok() {
1963            self.state.track_replay(&op_id.operation_id).await;
1964        }
1965
1966        result
1967    }
1968
1969    /// Creates a callback and waits for the result with a submitter function.
1970    ///
1971    /// This is a convenience method that combines callback creation with
1972    /// a submitter function that sends the callback ID to an external system.
1973    /// The submitter execution is checkpointed within a child context to ensure
1974    /// replay safety - the submitter will not be re-executed during replay.
1975    ///
1976    /// # Arguments
1977    ///
1978    /// * `submitter` - Function that receives the callback ID and submits it to external system
1979    /// * `config` - Optional callback configuration (timeout, heartbeat)
1980    ///
1981    /// # Returns
1982    ///
1983    /// The callback result from the external system.
1984    ///
1985    /// # Example
1986    ///
1987    /// ```rust,ignore
1988    /// let approval = ctx.wait_for_callback(
1989    ///     |callback_id| async move {
1990    ///         // Send callback ID to approval system
1991    ///         send_approval_request(&callback_id, &request).await
1992    ///     },
1993    ///     Some(CallbackConfig {
1994    ///         timeout: Duration::from_hours(24),
1995    ///         ..Default::default()
1996    ///     }),
1997    /// ).await?;
1998    /// ```
1999    pub async fn wait_for_callback<T, F, Fut>(
2000        &self,
2001        submitter: F,
2002        config: Option<crate::config::CallbackConfig>,
2003    ) -> DurableResult<T>
2004    where
2005        T: serde::Serialize + serde::de::DeserializeOwned + Send + Sync,
2006        F: FnOnce(String) -> Fut + Send + 'static,
2007        Fut: std::future::Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>>
2008            + Send
2009            + 'static,
2010    {
2011        // Create the callback first with the provided configuration (Requirement 1.4)
2012        let callback: crate::handlers::Callback<T> = self.create_callback(config).await?;
2013        let callback_id = callback.callback_id.clone();
2014
2015        // Generate operation ID for the child context that will execute the submitter
2016        let op_id = self.next_operation_identifier(Some("wait_for_callback_submitter".to_string()));
2017
2018        // Execute the submitter within a child context for proper checkpointing (Requirements 1.1, 1.2)
2019        // The child context ensures the submitter execution is tracked and not re-executed during replay
2020        let child_config = crate::config::ChildConfig::default();
2021
2022        let logger = self.logger.read().unwrap().clone();
2023        crate::handlers::child_handler(
2024            |child_ctx| {
2025                let callback_id = callback_id.clone();
2026                async move {
2027                    // Use a step to checkpoint that we're about to execute the submitter
2028                    // This step returns () on success, indicating submission completed
2029                    child_ctx
2030                        .step_named(
2031                            "execute_submitter",
2032                            move |_| {
2033                                // The step just marks that we're executing the submitter
2034                                // The actual async submitter call happens after this checkpoint
2035                                Ok(())
2036                            },
2037                            None,
2038                        )
2039                        .await?;
2040
2041                    // Now execute the actual submitter
2042                    // If we're replaying and the step above succeeded, we won't reach here
2043                    // because the child context will return the checkpointed result
2044                    submitter(callback_id).await.map_err(|e| {
2045                        crate::error::DurableError::UserCode {
2046                            message: e.to_string(),
2047                            error_type: "SubmitterError".to_string(),
2048                            stack_trace: None,
2049                        }
2050                    })?;
2051
2052                    Ok(())
2053                }
2054            },
2055            &self.state,
2056            &op_id,
2057            self,
2058            &child_config,
2059            &logger,
2060        )
2061        .await?;
2062
2063        // Track replay after child context completion
2064        self.state.track_replay(&op_id.operation_id).await;
2065
2066        // Wait for the callback result
2067        callback.result().await
2068    }
2069
2070    // ========================================================================
2071    // Promise Combinators
2072    // ========================================================================
2073
2074    /// Waits for all futures to complete successfully.
2075    ///
2076    /// Returns all results if all futures succeed, or returns the first error encountered.
2077    /// This is implemented within a STEP operation for durability.
2078    ///
2079    /// # Arguments
2080    ///
2081    /// * `futures` - Vector of futures to execute
2082    ///
2083    /// # Returns
2084    ///
2085    /// `Ok(Vec<T>)` if all futures succeed, or `Err` with the first error.
2086    ///
2087    /// # Example
2088    ///
2089    /// ```rust,ignore
2090    /// let results = ctx.all(vec![
2091    ///     ctx.step(|_| Ok(1), None),
2092    ///     ctx.step(|_| Ok(2), None),
2093    ///     ctx.step(|_| Ok(3), None),
2094    /// ]).await?;
2095    /// assert_eq!(results, vec![1, 2, 3]);
2096    /// ```
2097    pub async fn all<T, Fut>(&self, futures: Vec<Fut>) -> DurableResult<Vec<T>>
2098    where
2099        T: serde::Serialize + serde::de::DeserializeOwned + Send + Clone + 'static,
2100        Fut: std::future::Future<Output = DurableResult<T>> + Send + 'static,
2101    {
2102        let op_id = self.next_operation_identifier(Some("all".to_string()));
2103
2104        let logger = self.logger.read().unwrap().clone();
2105        let result = crate::handlers::all_handler(futures, &self.state, &op_id, &logger).await;
2106
2107        // Track replay after completion
2108        if result.is_ok() {
2109            self.state.track_replay(&op_id.operation_id).await;
2110        }
2111
2112        result
2113    }
2114
2115    /// Waits for all futures to settle (success or failure).
2116    ///
2117    /// Returns a BatchResult containing outcomes for all futures, regardless of success or failure.
2118    /// This is implemented within a STEP operation for durability.
2119    ///
2120    /// # Arguments
2121    ///
2122    /// * `futures` - Vector of futures to execute
2123    ///
2124    /// # Returns
2125    ///
2126    /// `BatchResult<T>` containing results for all futures.
2127    ///
2128    /// # Example
2129    ///
2130    /// ```rust,ignore
2131    /// let results = ctx.all_settled(vec![
2132    ///     ctx.step(|_| Ok(1), None),
2133    ///     ctx.step(|_| Err("error".into()), None),
2134    ///     ctx.step(|_| Ok(3), None),
2135    /// ]).await?;
2136    /// assert_eq!(results.success_count(), 2);
2137    /// assert_eq!(results.failure_count(), 1);
2138    /// ```
2139    pub async fn all_settled<T, Fut>(
2140        &self,
2141        futures: Vec<Fut>,
2142    ) -> DurableResult<crate::concurrency::BatchResult<T>>
2143    where
2144        T: serde::Serialize + serde::de::DeserializeOwned + Send + Clone + 'static,
2145        Fut: std::future::Future<Output = DurableResult<T>> + Send + 'static,
2146    {
2147        let op_id = self.next_operation_identifier(Some("all_settled".to_string()));
2148
2149        let logger = self.logger.read().unwrap().clone();
2150        let result =
2151            crate::handlers::all_settled_handler(futures, &self.state, &op_id, &logger).await;
2152
2153        // Track replay after completion
2154        if result.is_ok() {
2155            self.state.track_replay(&op_id.operation_id).await;
2156        }
2157
2158        result
2159    }
2160
2161    /// Returns the result of the first future to settle.
2162    ///
2163    /// Returns the result (success or failure) of whichever future completes first.
2164    /// This is implemented within a STEP operation for durability.
2165    ///
2166    /// # Arguments
2167    ///
2168    /// * `futures` - Vector of futures to execute
2169    ///
2170    /// # Returns
2171    ///
2172    /// The result of the first future to settle.
2173    ///
2174    /// # Example
2175    ///
2176    /// ```rust,ignore
2177    /// let result = ctx.race(vec![
2178    ///     ctx.step(|_| Ok(1), None),
2179    ///     ctx.step(|_| Ok(2), None),
2180    /// ]).await?;
2181    /// // result is either 1 or 2, whichever completed first
2182    /// ```
2183    pub async fn race<T, Fut>(&self, futures: Vec<Fut>) -> DurableResult<T>
2184    where
2185        T: serde::Serialize + serde::de::DeserializeOwned + Send + Clone + 'static,
2186        Fut: std::future::Future<Output = DurableResult<T>> + Send + 'static,
2187    {
2188        let op_id = self.next_operation_identifier(Some("race".to_string()));
2189
2190        let logger = self.logger.read().unwrap().clone();
2191        let result = crate::handlers::race_handler(futures, &self.state, &op_id, &logger).await;
2192
2193        // Track replay after completion
2194        if result.is_ok() {
2195            self.state.track_replay(&op_id.operation_id).await;
2196        }
2197
2198        result
2199    }
2200
2201    /// Returns the result of the first future to succeed.
2202    ///
2203    /// Returns the result of the first future to succeed. If all futures fail,
2204    /// returns an error containing all the failures.
2205    /// This is implemented within a STEP operation for durability.
2206    ///
2207    /// # Arguments
2208    ///
2209    /// * `futures` - Vector of futures to execute
2210    ///
2211    /// # Returns
2212    ///
2213    /// The result of the first future to succeed, or an error if all fail.
2214    ///
2215    /// # Example
2216    ///
2217    /// ```rust,ignore
2218    /// let result = ctx.any(vec![
2219    ///     ctx.step(|_| Err("error".into()), None),
2220    ///     ctx.step(|_| Ok(2), None),
2221    ///     ctx.step(|_| Ok(3), None),
2222    /// ]).await?;
2223    /// // result is either 2 or 3, whichever succeeded first
2224    /// ```
2225    pub async fn any<T, Fut>(&self, futures: Vec<Fut>) -> DurableResult<T>
2226    where
2227        T: serde::Serialize + serde::de::DeserializeOwned + Send + Clone + 'static,
2228        Fut: std::future::Future<Output = DurableResult<T>> + Send + 'static,
2229    {
2230        let op_id = self.next_operation_identifier(Some("any".to_string()));
2231
2232        let logger = self.logger.read().unwrap().clone();
2233        let result = crate::handlers::any_handler(futures, &self.state, &op_id, &logger).await;
2234
2235        // Track replay after completion
2236        if result.is_ok() {
2237            self.state.track_replay(&op_id.operation_id).await;
2238        }
2239
2240        result
2241    }
2242}
2243
2244/// Configuration for wait_for_condition operations.
2245#[allow(clippy::type_complexity)]
2246pub struct WaitForConditionConfig<S> {
2247    /// Initial state to pass to the check function.
2248    pub initial_state: S,
2249    /// Wait strategy function that determines polling behavior.
2250    ///
2251    /// Takes a reference to the current state and the attempt number (1-indexed),
2252    /// and returns a [`WaitDecision`](crate::config::WaitDecision).
2253    pub wait_strategy: Box<dyn Fn(&S, usize) -> crate::config::WaitDecision + Send + Sync>,
2254    /// Overall timeout for the operation.
2255    pub timeout: Option<crate::duration::Duration>,
2256    /// Optional custom serializer/deserializer.
2257    pub serdes: Option<std::sync::Arc<dyn crate::config::SerDesAny>>,
2258}
2259
2260impl<S> std::fmt::Debug for WaitForConditionConfig<S>
2261where
2262    S: std::fmt::Debug,
2263{
2264    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2265        f.debug_struct("WaitForConditionConfig")
2266            .field("initial_state", &self.initial_state)
2267            .field("wait_strategy", &"<fn>")
2268            .field("timeout", &self.timeout)
2269            .field("serdes", &self.serdes.is_some())
2270            .finish()
2271    }
2272}
2273
2274impl<S> WaitForConditionConfig<S> {
2275    /// Creates a backward-compatible `WaitForConditionConfig` from interval and max_attempts.
2276    ///
2277    /// This constructor converts the old-style `interval` + `max_attempts` parameters
2278    /// into a wait strategy that uses a fixed delay (no backoff, no jitter).
2279    ///
2280    /// # Arguments
2281    ///
2282    /// * `initial_state` - Initial state to pass to the check function.
2283    /// * `interval` - Fixed interval between condition checks.
2284    /// * `max_attempts` - Maximum number of attempts (`None` for unlimited).
2285    ///
2286    /// # Example
2287    ///
2288    /// ```rust,ignore
2289    /// use durable_execution_sdk::{WaitForConditionConfig, Duration};
2290    ///
2291    /// let config = WaitForConditionConfig::from_interval(
2292    ///     my_initial_state,
2293    ///     Duration::from_seconds(5),
2294    ///     Some(10),
2295    /// );
2296    /// ```
2297    pub fn from_interval(
2298        initial_state: S,
2299        interval: crate::duration::Duration,
2300        max_attempts: Option<usize>,
2301    ) -> Self
2302    where
2303        S: Send + Sync + 'static,
2304    {
2305        let interval_secs = interval.to_seconds();
2306        let max = max_attempts.unwrap_or(usize::MAX);
2307
2308        Self {
2309            initial_state,
2310            wait_strategy: Box::new(move |_state: &S, attempts_made: usize| {
2311                // In the backward-compatible mode, the check function's Ok/Err
2312                // determines whether polling is done. The strategy only provides
2313                // the delay and enforces max_attempts.
2314                // Return Done when max_attempts exceeded (handler will checkpoint failure
2315                // if check returned Err, or success if check returned Ok).
2316                if attempts_made >= max {
2317                    return crate::config::WaitDecision::Done;
2318                }
2319                crate::config::WaitDecision::Continue {
2320                    delay: crate::duration::Duration::from_seconds(interval_secs),
2321                }
2322            }),
2323            timeout: None,
2324            serdes: None,
2325        }
2326    }
2327}
2328
2329impl<S: Default + Send + Sync + 'static> Default for WaitForConditionConfig<S> {
2330    fn default() -> Self {
2331        Self::from_interval(
2332            S::default(),
2333            crate::duration::Duration::from_seconds(5),
2334            None,
2335        )
2336    }
2337}
2338
/// Per-attempt information handed to a `wait_for_condition` check function.
#[derive(Clone, Debug)]
pub struct WaitForConditionContext {
    /// Number of the attempt currently being evaluated (1-indexed).
    pub attempt: usize,
    /// Upper bound on the number of attempts, or `None` when unlimited.
    pub max_attempts: Option<usize>,
}
2347
2348impl Clone for DurableContext {
2349    fn clone(&self) -> Self {
2350        Self {
2351            state: self.state.clone(),
2352            lambda_context: self.lambda_context.clone(),
2353            parent_id: self.parent_id.clone(),
2354            id_generator: self.id_generator.clone(),
2355            logger: self.logger.clone(),
2356        }
2357    }
2358}
2359
2360impl std::fmt::Debug for DurableContext {
2361    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2362        f.debug_struct("DurableContext")
2363            .field("durable_execution_arn", &self.durable_execution_arn())
2364            .field("parent_id", &self.parent_id)
2365            .field("step_counter", &self.current_step_counter())
2366            .finish_non_exhaustive()
2367    }
2368}
2369
// Minimal in-file hex encoder (lowercase), defined here as a private module.
mod hex {
    /// Lookup table mapping a 4-bit value to its lowercase hex digit.
    const LOWER_HEX_DIGITS: &[u8; 16] = b"0123456789abcdef";

    /// Encodes `bytes` as a lowercase hexadecimal string, two characters per byte.
    ///
    /// An empty slice yields an empty string.
    pub fn encode(bytes: &[u8]) -> String {
        let mut encoded = String::with_capacity(bytes.len() * 2);
        for byte in bytes.iter().copied() {
            // High nibble first, then low nibble.
            let high = usize::from(byte >> 4);
            let low = usize::from(byte & 0x0f);
            encoded.push(char::from(LOWER_HEX_DIGITS[high]));
            encoded.push(char::from(LOWER_HEX_DIGITS[low]));
        }
        encoded
    }
}
2383
// Unit tests for the identifier types, the operation ID generator, `LogInfo`
// and the in-file hex encoder.
//
// Local bindings are deliberately not named `gen`: that identifier is a
// reserved keyword starting with the Rust 2024 edition.
#[cfg(test)]
mod tests {
    use super::*;

    // ---- OperationIdentifier ------------------------------------------------

    #[test]
    fn test_operation_identifier_new() {
        let id = OperationIdentifier::new(
            "op-123",
            Some("parent-456".to_string()),
            Some("my-step".to_string()),
        );
        assert_eq!(id.operation_id, "op-123");
        assert_eq!(id.parent_id.as_deref(), Some("parent-456"));
        assert_eq!(id.name.as_deref(), Some("my-step"));
    }

    #[test]
    fn test_operation_identifier_root() {
        let id = OperationIdentifier::root("op-123");
        assert_eq!(id.operation_id, "op-123");
        assert!(id.parent_id.is_none());
        assert!(id.name.is_none());
    }

    #[test]
    fn test_operation_identifier_with_parent() {
        let id = OperationIdentifier::with_parent("op-123", "parent-456");
        assert_eq!(id.operation_id, "op-123");
        assert_eq!(id.parent_id.as_deref(), Some("parent-456"));
        assert!(id.name.is_none());
    }

    #[test]
    fn test_operation_identifier_with_name() {
        let id = OperationIdentifier::root("op-123").with_name("my-step");
        assert_eq!(id.name.as_deref(), Some("my-step"));
    }

    #[test]
    fn test_operation_identifier_display() {
        let id = OperationIdentifier::root("op-123");
        assert_eq!(id.to_string(), "op-123");

        let id_with_name = OperationIdentifier::root("op-123").with_name("my-step");
        assert_eq!(id_with_name.to_string(), "my-step(op-123)");
    }

    // ---- generate_operation_id ----------------------------------------------

    #[test]
    fn test_generate_operation_id_deterministic() {
        let first = generate_operation_id("base-123", 0);
        let second = generate_operation_id("base-123", 0);
        assert_eq!(first, second);
    }

    #[test]
    fn test_generate_operation_id_different_counters() {
        let first = generate_operation_id("base-123", 0);
        let second = generate_operation_id("base-123", 1);
        assert_ne!(first, second);
    }

    #[test]
    fn test_generate_operation_id_different_bases() {
        let first = generate_operation_id("base-123", 0);
        let second = generate_operation_id("base-456", 0);
        assert_ne!(first, second);
    }

    #[test]
    fn test_generate_operation_id_length() {
        let id = generate_operation_id("base-123", 0);
        assert_eq!(id.len(), 32); // 16 bytes = 32 hex chars
    }

    // ---- OperationIdGenerator -----------------------------------------------

    #[test]
    fn test_operation_id_generator_new() {
        let generator = OperationIdGenerator::new("base-123");
        assert_eq!(generator.base_id(), "base-123");
        assert_eq!(generator.current_counter(), 0);
    }

    #[test]
    fn test_operation_id_generator_with_counter() {
        let generator = OperationIdGenerator::with_counter("base-123", 10);
        assert_eq!(generator.current_counter(), 10);
    }

    #[test]
    fn test_operation_id_generator_next_id() {
        let generator = OperationIdGenerator::new("base-123");

        let first = generator.next_id();
        assert_eq!(generator.current_counter(), 1);

        let second = generator.next_id();
        assert_eq!(generator.current_counter(), 2);

        assert_ne!(first, second);
    }

    #[test]
    fn test_operation_id_generator_id_for_counter() {
        let generator = OperationIdGenerator::new("base-123");

        let id_0 = generator.id_for_counter(0);
        let id_1 = generator.id_for_counter(1);

        // id_for_counter should not increment the counter
        assert_eq!(generator.current_counter(), 0);

        // Should match what next_id would produce
        assert_eq!(generator.next_id(), id_0);
        assert_eq!(generator.next_id(), id_1);
    }

    #[test]
    fn test_operation_id_generator_create_child() {
        let generator = OperationIdGenerator::new("base-123");
        generator.next_id(); // Increment parent counter

        // The child carries its own base ID and starts counting from zero.
        let child = generator.create_child("child-op-id");
        assert_eq!(child.base_id(), "child-op-id");
        assert_eq!(child.current_counter(), 0);
    }

    #[test]
    fn test_operation_id_generator_clone() {
        let generator = OperationIdGenerator::new("base-123");
        generator.next_id();
        generator.next_id();

        let cloned = generator.clone();
        assert_eq!(cloned.base_id(), generator.base_id());
        assert_eq!(cloned.current_counter(), generator.current_counter());
    }

    #[test]
    fn test_operation_id_generator_thread_safety() {
        use std::collections::HashSet;
        use std::thread;

        const THREADS: usize = 10;
        const IDS_PER_THREAD: usize = 100;

        let generator = Arc::new(OperationIdGenerator::new("base-123"));

        // Spawn every worker before joining any of them (`collect` forces the
        // lazy `map`), so the workers really do run concurrently.
        let handles: Vec<_> = (0..THREADS)
            .map(|_| {
                let generator = Arc::clone(&generator);
                thread::spawn(move || {
                    (0..IDS_PER_THREAD)
                        .map(|_| generator.next_id())
                        .collect::<Vec<_>>()
                })
            })
            .collect();

        let all_ids: Vec<_> = handles
            .into_iter()
            .flat_map(|handle| handle.join().unwrap())
            .collect();

        // All IDs should be unique
        let unique: HashSet<_> = all_ids.iter().collect();
        assert_eq!(unique.len(), THREADS * IDS_PER_THREAD);
        assert_eq!(generator.current_counter(), 1000);
    }

    // ---- LogInfo --------------------------------------------------------------

    #[test]
    fn test_log_info_new() {
        let info = LogInfo::new("arn:test");
        assert_eq!(info.durable_execution_arn, Some("arn:test".to_string()));
        assert!(info.operation_id.is_none());
        assert!(info.parent_id.is_none());
    }

    #[test]
    fn test_log_info_with_operation_id() {
        let info = LogInfo::new("arn:test").with_operation_id("op-123");
        assert_eq!(info.operation_id, Some("op-123".to_string()));
    }

    #[test]
    fn test_log_info_with_parent_id() {
        let info = LogInfo::new("arn:test").with_parent_id("parent-456");
        assert_eq!(info.parent_id, Some("parent-456".to_string()));
    }

    #[test]
    fn test_log_info_with_extra() {
        let info = LogInfo::new("arn:test")
            .with_extra("key1", "value1")
            .with_extra("key2", "value2");
        assert_eq!(info.extra.len(), 2);
        assert_eq!(info.extra[0], ("key1".to_string(), "value1".to_string()));
    }

    // ---- hex ------------------------------------------------------------------

    #[test]
    fn test_hex_encode() {
        assert_eq!(hex::encode(&[0x00]), "00");
        assert_eq!(hex::encode(&[0xff]), "ff");
        assert_eq!(hex::encode(&[0x12, 0x34, 0xab, 0xcd]), "1234abcd");
    }
}
2595
2596#[cfg(test)]
2597mod durable_context_tests {
2598    use super::*;
2599    use crate::client::SharedDurableServiceClient;
2600    use crate::lambda::InitialExecutionState;
2601    use std::sync::Arc;
2602
2603    #[cfg(test)]
2604    fn create_mock_client() -> SharedDurableServiceClient {
2605        use crate::client::MockDurableServiceClient;
2606        Arc::new(MockDurableServiceClient::new())
2607    }
2608
2609    fn create_test_state() -> Arc<ExecutionState> {
2610        let client = create_mock_client();
2611        Arc::new(ExecutionState::new(
2612            "arn:aws:lambda:us-east-1:123456789012:function:test:durable:abc123",
2613            "token-123",
2614            InitialExecutionState::new(),
2615            client,
2616        ))
2617    }
2618
2619    #[test]
2620    fn test_durable_context_new() {
2621        let state = create_test_state();
2622        let ctx = DurableContext::new(state.clone());
2623
2624        assert_eq!(
2625            ctx.durable_execution_arn(),
2626            "arn:aws:lambda:us-east-1:123456789012:function:test:durable:abc123"
2627        );
2628        assert!(ctx.parent_id().is_none());
2629        assert!(ctx.lambda_context().is_none());
2630        assert_eq!(ctx.current_step_counter(), 0);
2631    }
2632
2633    #[test]
2634    fn test_durable_context_next_operation_id() {
2635        let state = create_test_state();
2636        let ctx = DurableContext::new(state);
2637
2638        let id1 = ctx.next_operation_id();
2639        let id2 = ctx.next_operation_id();
2640
2641        assert_ne!(id1, id2);
2642        assert_eq!(ctx.current_step_counter(), 2);
2643    }
2644
2645    #[test]
2646    fn test_durable_context_next_operation_identifier() {
2647        let state = create_test_state();
2648        let ctx = DurableContext::new(state);
2649
2650        let op_id = ctx.next_operation_identifier(Some("my-step".to_string()));
2651
2652        assert!(op_id.parent_id.is_none());
2653        assert_eq!(op_id.name, Some("my-step".to_string()));
2654        assert!(!op_id.operation_id.is_empty());
2655    }
2656
2657    #[test]
2658    fn test_durable_context_create_child_context() {
2659        let state = create_test_state();
2660        let ctx = DurableContext::new(state);
2661
2662        // Generate a parent operation ID
2663        let parent_op_id = ctx.next_operation_id();
2664
2665        // Create child context
2666        let child_ctx = ctx.create_child_context(&parent_op_id);
2667
2668        assert_eq!(child_ctx.parent_id(), Some(parent_op_id.as_str()));
2669        assert_eq!(child_ctx.current_step_counter(), 0);
2670        assert_eq!(
2671            child_ctx.durable_execution_arn(),
2672            ctx.durable_execution_arn()
2673        );
2674    }
2675
2676    #[test]
2677    fn test_durable_context_child_generates_different_ids() {
2678        let state = create_test_state();
2679        let ctx = DurableContext::new(state);
2680
2681        let parent_op_id = ctx.next_operation_id();
2682        let child_ctx = ctx.create_child_context(&parent_op_id);
2683
2684        // Child should generate different IDs than parent
2685        let child_id = child_ctx.next_operation_id();
2686        let parent_id = ctx.next_operation_id();
2687
2688        assert_ne!(child_id, parent_id);
2689    }
2690
2691    #[test]
2692    fn test_durable_context_child_operation_identifier_has_parent() {
2693        let state = create_test_state();
2694        let ctx = DurableContext::new(state);
2695
2696        let parent_op_id = ctx.next_operation_id();
2697        let child_ctx = ctx.create_child_context(&parent_op_id);
2698
2699        let child_op_id = child_ctx.next_operation_identifier(None);
2700
2701        assert_eq!(child_op_id.parent_id, Some(parent_op_id));
2702    }
2703
2704    #[test]
2705    fn test_durable_context_set_logger() {
2706        let state = create_test_state();
2707        let mut ctx = DurableContext::new(state);
2708
2709        // Create a custom logger
2710        let custom_logger: Arc<dyn Logger> = Arc::new(TracingLogger);
2711        ctx.set_logger(custom_logger);
2712
2713        // Just verify it doesn't panic - the logger is set
2714        let _ = ctx.logger();
2715    }
2716
2717    #[test]
2718    fn test_durable_context_with_logger() {
2719        let state = create_test_state();
2720        let ctx = DurableContext::new(state);
2721
2722        let custom_logger: Arc<dyn Logger> = Arc::new(TracingLogger);
2723        let ctx_with_logger = ctx.with_logger(custom_logger);
2724
2725        // Just verify it doesn't panic - the logger is set
2726        let _ = ctx_with_logger.logger();
2727    }
2728
2729    #[test]
2730    fn test_durable_context_create_log_info() {
2731        let state = create_test_state();
2732        let ctx = DurableContext::new(state);
2733
2734        let info = ctx.create_log_info();
2735
2736        assert_eq!(
2737            info.durable_execution_arn,
2738            Some("arn:aws:lambda:us-east-1:123456789012:function:test:durable:abc123".to_string())
2739        );
2740        assert!(info.parent_id.is_none());
2741    }
2742
2743    #[test]
2744    fn test_durable_context_create_log_info_with_parent() {
2745        let state = create_test_state();
2746        let ctx = DurableContext::new(state);
2747
2748        let parent_op_id = ctx.next_operation_id();
2749        let child_ctx = ctx.create_child_context(&parent_op_id);
2750
2751        let info = child_ctx.create_log_info();
2752
2753        assert_eq!(info.parent_id, Some(parent_op_id));
2754    }
2755
2756    #[test]
2757    fn test_durable_context_create_log_info_with_operation() {
2758        let state = create_test_state();
2759        let ctx = DurableContext::new(state);
2760
2761        let info = ctx.create_log_info_with_operation("op-123");
2762
2763        assert_eq!(info.operation_id, Some("op-123".to_string()));
2764    }
2765
2766    #[test]
2767    fn test_log_info_with_replay() {
2768        let info = LogInfo::new("arn:test")
2769            .with_operation_id("op-123")
2770            .with_replay(true);
2771
2772        assert!(info.is_replay);
2773        assert_eq!(info.operation_id, Some("op-123".to_string()));
2774    }
2775
2776    #[test]
2777    fn test_log_info_default_not_replay() {
2778        let info = LogInfo::default();
2779        assert!(!info.is_replay);
2780    }
2781
2782    #[test]
2783    fn test_replay_logging_config_default() {
2784        let config = ReplayLoggingConfig::default();
2785        assert_eq!(config, ReplayLoggingConfig::SuppressAll);
2786    }
2787
2788    #[test]
2789    fn test_replay_aware_logger_suppress_all() {
2790        use std::sync::atomic::{AtomicUsize, Ordering};
2791
2792        // Create counters for each log level
2793        let debug_count = Arc::new(AtomicUsize::new(0));
2794        let info_count = Arc::new(AtomicUsize::new(0));
2795        let warn_count = Arc::new(AtomicUsize::new(0));
2796        let error_count = Arc::new(AtomicUsize::new(0));
2797
2798        let inner = Arc::new(custom_logger(
2799            {
2800                let count = debug_count.clone();
2801                move |_, _| {
2802                    count.fetch_add(1, Ordering::SeqCst);
2803                }
2804            },
2805            {
2806                let count = info_count.clone();
2807                move |_, _| {
2808                    count.fetch_add(1, Ordering::SeqCst);
2809                }
2810            },
2811            {
2812                let count = warn_count.clone();
2813                move |_, _| {
2814                    count.fetch_add(1, Ordering::SeqCst);
2815                }
2816            },
2817            {
2818                let count = error_count.clone();
2819                move |_, _| {
2820                    count.fetch_add(1, Ordering::SeqCst);
2821                }
2822            },
2823        ));
2824
2825        let logger = ReplayAwareLogger::new(inner, ReplayLoggingConfig::SuppressAll);
2826
2827        // Non-replay logs should pass through
2828        let non_replay_info = LogInfo::new("arn:test").with_replay(false);
2829        logger.debug("test", &non_replay_info);
2830        logger.info("test", &non_replay_info);
2831        logger.warn("test", &non_replay_info);
2832        logger.error("test", &non_replay_info);
2833
2834        assert_eq!(debug_count.load(Ordering::SeqCst), 1);
2835        assert_eq!(info_count.load(Ordering::SeqCst), 1);
2836        assert_eq!(warn_count.load(Ordering::SeqCst), 1);
2837        assert_eq!(error_count.load(Ordering::SeqCst), 1);
2838
2839        // Replay logs should be suppressed
2840        let replay_info = LogInfo::new("arn:test").with_replay(true);
2841        logger.debug("test", &replay_info);
2842        logger.info("test", &replay_info);
2843        logger.warn("test", &replay_info);
2844        logger.error("test", &replay_info);
2845
2846        // Counts should not have increased
2847        assert_eq!(debug_count.load(Ordering::SeqCst), 1);
2848        assert_eq!(info_count.load(Ordering::SeqCst), 1);
2849        assert_eq!(warn_count.load(Ordering::SeqCst), 1);
2850        assert_eq!(error_count.load(Ordering::SeqCst), 1);
2851    }
2852
2853    #[test]
2854    fn test_replay_aware_logger_allow_all() {
2855        use std::sync::atomic::{AtomicUsize, Ordering};
2856
2857        let call_count = Arc::new(AtomicUsize::new(0));
2858
2859        let inner = Arc::new(custom_logger(
2860            {
2861                let count = call_count.clone();
2862                move |_, _| {
2863                    count.fetch_add(1, Ordering::SeqCst);
2864                }
2865            },
2866            {
2867                let count = call_count.clone();
2868                move |_, _| {
2869                    count.fetch_add(1, Ordering::SeqCst);
2870                }
2871            },
2872            {
2873                let count = call_count.clone();
2874                move |_, _| {
2875                    count.fetch_add(1, Ordering::SeqCst);
2876                }
2877            },
2878            {
2879                let count = call_count.clone();
2880                move |_, _| {
2881                    count.fetch_add(1, Ordering::SeqCst);
2882                }
2883            },
2884        ));
2885
2886        let logger = ReplayAwareLogger::allow_all(inner);
2887
2888        // All logs should pass through even during replay
2889        let replay_info = LogInfo::new("arn:test").with_replay(true);
2890        logger.debug("test", &replay_info);
2891        logger.info("test", &replay_info);
2892        logger.warn("test", &replay_info);
2893        logger.error("test", &replay_info);
2894
2895        assert_eq!(call_count.load(Ordering::SeqCst), 4);
2896    }
2897
2898    #[test]
2899    fn test_replay_aware_logger_errors_only() {
2900        use std::sync::atomic::{AtomicUsize, Ordering};
2901
2902        let debug_count = Arc::new(AtomicUsize::new(0));
2903        let info_count = Arc::new(AtomicUsize::new(0));
2904        let warn_count = Arc::new(AtomicUsize::new(0));
2905        let error_count = Arc::new(AtomicUsize::new(0));
2906
2907        let inner = Arc::new(custom_logger(
2908            {
2909                let count = debug_count.clone();
2910                move |_, _| {
2911                    count.fetch_add(1, Ordering::SeqCst);
2912                }
2913            },
2914            {
2915                let count = info_count.clone();
2916                move |_, _| {
2917                    count.fetch_add(1, Ordering::SeqCst);
2918                }
2919            },
2920            {
2921                let count = warn_count.clone();
2922                move |_, _| {
2923                    count.fetch_add(1, Ordering::SeqCst);
2924                }
2925            },
2926            {
2927                let count = error_count.clone();
2928                move |_, _| {
2929                    count.fetch_add(1, Ordering::SeqCst);
2930                }
2931            },
2932        ));
2933
2934        let logger = ReplayAwareLogger::new(inner, ReplayLoggingConfig::ErrorsOnly);
2935
2936        // During replay, only errors should pass through
2937        let replay_info = LogInfo::new("arn:test").with_replay(true);
2938        logger.debug("test", &replay_info);
2939        logger.info("test", &replay_info);
2940        logger.warn("test", &replay_info);
2941        logger.error("test", &replay_info);
2942
2943        assert_eq!(debug_count.load(Ordering::SeqCst), 0);
2944        assert_eq!(info_count.load(Ordering::SeqCst), 0);
2945        assert_eq!(warn_count.load(Ordering::SeqCst), 0);
2946        assert_eq!(error_count.load(Ordering::SeqCst), 1);
2947    }
2948
2949    #[test]
2950    fn test_replay_aware_logger_warnings_and_errors() {
2951        use std::sync::atomic::{AtomicUsize, Ordering};
2952
2953        let debug_count = Arc::new(AtomicUsize::new(0));
2954        let info_count = Arc::new(AtomicUsize::new(0));
2955        let warn_count = Arc::new(AtomicUsize::new(0));
2956        let error_count = Arc::new(AtomicUsize::new(0));
2957
2958        let inner = Arc::new(custom_logger(
2959            {
2960                let count = debug_count.clone();
2961                move |_, _| {
2962                    count.fetch_add(1, Ordering::SeqCst);
2963                }
2964            },
2965            {
2966                let count = info_count.clone();
2967                move |_, _| {
2968                    count.fetch_add(1, Ordering::SeqCst);
2969                }
2970            },
2971            {
2972                let count = warn_count.clone();
2973                move |_, _| {
2974                    count.fetch_add(1, Ordering::SeqCst);
2975                }
2976            },
2977            {
2978                let count = error_count.clone();
2979                move |_, _| {
2980                    count.fetch_add(1, Ordering::SeqCst);
2981                }
2982            },
2983        ));
2984
2985        let logger = ReplayAwareLogger::new(inner, ReplayLoggingConfig::WarningsAndErrors);
2986
2987        // During replay, only warnings and errors should pass through
2988        let replay_info = LogInfo::new("arn:test").with_replay(true);
2989        logger.debug("test", &replay_info);
2990        logger.info("test", &replay_info);
2991        logger.warn("test", &replay_info);
2992        logger.error("test", &replay_info);
2993
2994        assert_eq!(debug_count.load(Ordering::SeqCst), 0);
2995        assert_eq!(info_count.load(Ordering::SeqCst), 0);
2996        assert_eq!(warn_count.load(Ordering::SeqCst), 1);
2997        assert_eq!(error_count.load(Ordering::SeqCst), 1);
2998    }
2999
3000    #[test]
3001    fn test_replay_aware_logger_suppress_replay_constructor() {
3002        let inner: Arc<dyn Logger> = Arc::new(TracingLogger);
3003        let logger = ReplayAwareLogger::suppress_replay(inner);
3004
3005        assert_eq!(logger.config(), ReplayLoggingConfig::SuppressAll);
3006    }
3007
3008    #[test]
3009    fn test_replay_aware_logger_debug() {
3010        let inner: Arc<dyn Logger> = Arc::new(TracingLogger);
3011        let logger = ReplayAwareLogger::new(inner, ReplayLoggingConfig::SuppressAll);
3012
3013        let debug_str = format!("{:?}", logger);
3014        assert!(debug_str.contains("ReplayAwareLogger"));
3015        assert!(debug_str.contains("SuppressAll"));
3016    }
3017
3018    #[test]
3019    fn test_durable_context_create_log_info_with_replay_method() {
3020        let state = create_test_state();
3021        let ctx = DurableContext::new(state);
3022
3023        let info = ctx.create_log_info_with_replay("op-123", true);
3024
3025        assert_eq!(info.operation_id, Some("op-123".to_string()));
3026        assert!(info.is_replay);
3027    }
3028
3029    #[test]
3030    fn test_durable_context_clone() {
3031        let state = create_test_state();
3032        let ctx = DurableContext::new(state);
3033
3034        ctx.next_operation_id();
3035        ctx.next_operation_id();
3036
3037        let cloned = ctx.clone();
3038
3039        assert_eq!(cloned.durable_execution_arn(), ctx.durable_execution_arn());
3040        assert_eq!(cloned.current_step_counter(), ctx.current_step_counter());
3041    }
3042
3043    #[test]
3044    fn test_durable_context_debug() {
3045        let state = create_test_state();
3046        let ctx = DurableContext::new(state);
3047
3048        let debug_str = format!("{:?}", ctx);
3049
3050        assert!(debug_str.contains("DurableContext"));
3051        assert!(debug_str.contains("durable_execution_arn"));
3052    }
3053
3054    #[test]
3055    fn test_durable_context_state_access() {
3056        let state = create_test_state();
3057        let ctx = DurableContext::new(state.clone());
3058
3059        // Verify we can access the state
3060        assert!(Arc::ptr_eq(&ctx.state(), &state));
3061    }
3062
3063    #[test]
3064    fn test_durable_context_send_sync() {
3065        // This test verifies at compile time that DurableContext is Send + Sync
3066        fn assert_send_sync<T: Send + Sync>() {}
3067        assert_send_sync::<DurableContext>();
3068    }
3069
3070    // ========================================================================
3071    // Simplified Logging API Tests
3072    // ========================================================================
3073
3074    #[test]
3075    fn test_log_info_method() {
3076        use std::sync::atomic::{AtomicUsize, Ordering};
3077        use std::sync::Mutex;
3078
3079        let info_count = Arc::new(AtomicUsize::new(0));
3080        let captured_info = Arc::new(Mutex::new(None::<LogInfo>));
3081
3082        let captured_info_clone = captured_info.clone();
3083        let inner = Arc::new(custom_logger(
3084            |_, _| {},
3085            {
3086                let count = info_count.clone();
3087                move |_, info: &LogInfo| {
3088                    count.fetch_add(1, Ordering::SeqCst);
3089                    *captured_info_clone.lock().unwrap() = Some(info.clone());
3090                }
3091            },
3092            |_, _| {},
3093            |_, _| {},
3094        ));
3095
3096        let state = create_test_state();
3097        let ctx = DurableContext::new(state).with_logger(inner);
3098
3099        ctx.log_info("Test message");
3100
3101        assert_eq!(info_count.load(Ordering::SeqCst), 1);
3102
3103        let captured = captured_info.lock().unwrap();
3104        let info = captured.as_ref().unwrap();
3105        assert_eq!(
3106            info.durable_execution_arn,
3107            Some("arn:aws:lambda:us-east-1:123456789012:function:test:durable:abc123".to_string())
3108        );
3109    }
3110
3111    #[test]
3112    fn test_log_debug_method() {
3113        use std::sync::atomic::{AtomicUsize, Ordering};
3114
3115        let debug_count = Arc::new(AtomicUsize::new(0));
3116
3117        let inner = Arc::new(custom_logger(
3118            {
3119                let count = debug_count.clone();
3120                move |_, _| {
3121                    count.fetch_add(1, Ordering::SeqCst);
3122                }
3123            },
3124            |_, _| {},
3125            |_, _| {},
3126            |_, _| {},
3127        ));
3128
3129        let state = create_test_state();
3130        let ctx = DurableContext::new(state).with_logger(inner);
3131
3132        ctx.log_debug("Debug message");
3133
3134        assert_eq!(debug_count.load(Ordering::SeqCst), 1);
3135    }
3136
3137    #[test]
3138    fn test_log_warn_method() {
3139        use std::sync::atomic::{AtomicUsize, Ordering};
3140
3141        let warn_count = Arc::new(AtomicUsize::new(0));
3142
3143        let inner = Arc::new(custom_logger(
3144            |_, _| {},
3145            |_, _| {},
3146            {
3147                let count = warn_count.clone();
3148                move |_, _| {
3149                    count.fetch_add(1, Ordering::SeqCst);
3150                }
3151            },
3152            |_, _| {},
3153        ));
3154
3155        let state = create_test_state();
3156        let ctx = DurableContext::new(state).with_logger(inner);
3157
3158        ctx.log_warn("Warning message");
3159
3160        assert_eq!(warn_count.load(Ordering::SeqCst), 1);
3161    }
3162
3163    #[test]
3164    fn test_log_error_method() {
3165        use std::sync::atomic::{AtomicUsize, Ordering};
3166
3167        let error_count = Arc::new(AtomicUsize::new(0));
3168
3169        let inner = Arc::new(custom_logger(|_, _| {}, |_, _| {}, |_, _| {}, {
3170            let count = error_count.clone();
3171            move |_, _| {
3172                count.fetch_add(1, Ordering::SeqCst);
3173            }
3174        }));
3175
3176        let state = create_test_state();
3177        let ctx = DurableContext::new(state).with_logger(inner);
3178
3179        ctx.log_error("Error message");
3180
3181        assert_eq!(error_count.load(Ordering::SeqCst), 1);
3182    }
3183
3184    #[test]
3185    fn test_log_info_with_extra_fields() {
3186        use std::sync::Mutex;
3187
3188        let captured_info = Arc::new(Mutex::new(None::<LogInfo>));
3189
3190        let captured_info_clone = captured_info.clone();
3191        let inner = Arc::new(custom_logger(
3192            |_, _| {},
3193            move |_, info: &LogInfo| {
3194                *captured_info_clone.lock().unwrap() = Some(info.clone());
3195            },
3196            |_, _| {},
3197            |_, _| {},
3198        ));
3199
3200        let state = create_test_state();
3201        let ctx = DurableContext::new(state).with_logger(inner);
3202
3203        ctx.log_info_with("Test message", &[("order_id", "123"), ("amount", "99.99")]);
3204
3205        let captured = captured_info.lock().unwrap();
3206        let info = captured.as_ref().unwrap();
3207
3208        // Verify extra fields are present
3209        assert_eq!(info.extra.len(), 2);
3210        assert!(info
3211            .extra
3212            .contains(&("order_id".to_string(), "123".to_string())));
3213        assert!(info
3214            .extra
3215            .contains(&("amount".to_string(), "99.99".to_string())));
3216    }
3217
3218    #[test]
3219    fn test_log_debug_with_extra_fields() {
3220        use std::sync::Mutex;
3221
3222        let captured_info = Arc::new(Mutex::new(None::<LogInfo>));
3223
3224        let captured_info_clone = captured_info.clone();
3225        let inner = Arc::new(custom_logger(
3226            move |_, info: &LogInfo| {
3227                *captured_info_clone.lock().unwrap() = Some(info.clone());
3228            },
3229            |_, _| {},
3230            |_, _| {},
3231            |_, _| {},
3232        ));
3233
3234        let state = create_test_state();
3235        let ctx = DurableContext::new(state).with_logger(inner);
3236
3237        ctx.log_debug_with("Debug message", &[("key", "value")]);
3238
3239        let captured = captured_info.lock().unwrap();
3240        let info = captured.as_ref().unwrap();
3241
3242        assert_eq!(info.extra.len(), 1);
3243        assert!(info
3244            .extra
3245            .contains(&("key".to_string(), "value".to_string())));
3246    }
3247
3248    #[test]
3249    fn test_log_warn_with_extra_fields() {
3250        use std::sync::Mutex;
3251
3252        let captured_info = Arc::new(Mutex::new(None::<LogInfo>));
3253
3254        let captured_info_clone = captured_info.clone();
3255        let inner = Arc::new(custom_logger(
3256            |_, _| {},
3257            |_, _| {},
3258            move |_, info: &LogInfo| {
3259                *captured_info_clone.lock().unwrap() = Some(info.clone());
3260            },
3261            |_, _| {},
3262        ));
3263
3264        let state = create_test_state();
3265        let ctx = DurableContext::new(state).with_logger(inner);
3266
3267        ctx.log_warn_with("Warning message", &[("retry", "3")]);
3268
3269        let captured = captured_info.lock().unwrap();
3270        let info = captured.as_ref().unwrap();
3271
3272        assert_eq!(info.extra.len(), 1);
3273        assert!(info.extra.contains(&("retry".to_string(), "3".to_string())));
3274    }
3275
3276    #[test]
3277    fn test_log_error_with_extra_fields() {
3278        use std::sync::Mutex;
3279
3280        let captured_info = Arc::new(Mutex::new(None::<LogInfo>));
3281
3282        let captured_info_clone = captured_info.clone();
3283        let inner = Arc::new(custom_logger(
3284            |_, _| {},
3285            |_, _| {},
3286            |_, _| {},
3287            move |_, info: &LogInfo| {
3288                *captured_info_clone.lock().unwrap() = Some(info.clone());
3289            },
3290        ));
3291
3292        let state = create_test_state();
3293        let ctx = DurableContext::new(state).with_logger(inner);
3294
3295        ctx.log_error_with(
3296            "Error message",
3297            &[("error_code", "E001"), ("details", "Something went wrong")],
3298        );
3299
3300        let captured = captured_info.lock().unwrap();
3301        let info = captured.as_ref().unwrap();
3302
3303        assert_eq!(info.extra.len(), 2);
3304        assert!(info
3305            .extra
3306            .contains(&("error_code".to_string(), "E001".to_string())));
3307        assert!(info
3308            .extra
3309            .contains(&("details".to_string(), "Something went wrong".to_string())));
3310    }
3311
3312    #[test]
3313    fn test_log_methods_include_parent_id_in_child_context() {
3314        use std::sync::Mutex;
3315
3316        let captured_info = Arc::new(Mutex::new(None::<LogInfo>));
3317
3318        let captured_info_clone = captured_info.clone();
3319        let inner: Arc<dyn Logger> = Arc::new(custom_logger(
3320            |_, _| {},
3321            move |_, info: &LogInfo| {
3322                *captured_info_clone.lock().unwrap() = Some(info.clone());
3323            },
3324            |_, _| {},
3325            |_, _| {},
3326        ));
3327
3328        let state = create_test_state();
3329        let ctx = DurableContext::new(state).with_logger(inner.clone());
3330
3331        let parent_op_id = ctx.next_operation_id();
3332        let child_ctx = ctx.create_child_context(&parent_op_id).with_logger(inner);
3333
3334        child_ctx.log_info("Child context message");
3335
3336        let captured = captured_info.lock().unwrap();
3337        let info = captured.as_ref().unwrap();
3338
3339        // Verify parent_id is included
3340        assert_eq!(info.parent_id, Some(parent_op_id));
3341    }
3342
3343    // ========================================================================
3344    // Runtime Logger Reconfiguration Tests (Req 13.1, 13.2)
3345    // ========================================================================
3346
3347    #[test]
3348    fn test_configure_logger_swaps_logger() {
3349        // Req 13.1: configure_logger swaps the logger, subsequent calls use new logger
3350        use std::sync::atomic::{AtomicUsize, Ordering};
3351
3352        let original_count = Arc::new(AtomicUsize::new(0));
3353        let new_count = Arc::new(AtomicUsize::new(0));
3354
3355        let original_logger: Arc<dyn Logger> = Arc::new(custom_logger(
3356            |_, _| {},
3357            {
3358                let count = original_count.clone();
3359                move |_, _| {
3360                    count.fetch_add(1, Ordering::SeqCst);
3361                }
3362            },
3363            |_, _| {},
3364            |_, _| {},
3365        ));
3366
3367        let new_logger: Arc<dyn Logger> = Arc::new(custom_logger(
3368            |_, _| {},
3369            {
3370                let count = new_count.clone();
3371                move |_, _| {
3372                    count.fetch_add(1, Ordering::SeqCst);
3373                }
3374            },
3375            |_, _| {},
3376            |_, _| {},
3377        ));
3378
3379        let state = create_test_state();
3380        let ctx = DurableContext::new(state).with_logger(original_logger);
3381
3382        // Log with original logger
3383        ctx.log_info("before swap");
3384        assert_eq!(original_count.load(Ordering::SeqCst), 1);
3385        assert_eq!(new_count.load(Ordering::SeqCst), 0);
3386
3387        // Swap logger at runtime (note: &self, not &mut self)
3388        ctx.configure_logger(new_logger);
3389
3390        // Subsequent calls use the new logger
3391        ctx.log_info("after swap");
3392        assert_eq!(original_count.load(Ordering::SeqCst), 1); // unchanged
3393        assert_eq!(new_count.load(Ordering::SeqCst), 1);
3394    }
3395
3396    #[test]
3397    fn test_original_logger_used_when_configure_logger_not_called() {
3398        // Req 13.2: Original logger used when configure_logger not called
3399        use std::sync::atomic::{AtomicUsize, Ordering};
3400
3401        let original_count = Arc::new(AtomicUsize::new(0));
3402
3403        let original_logger: Arc<dyn Logger> = Arc::new(custom_logger(
3404            |_, _| {},
3405            {
3406                let count = original_count.clone();
3407                move |_, _| {
3408                    count.fetch_add(1, Ordering::SeqCst);
3409                }
3410            },
3411            |_, _| {},
3412            |_, _| {},
3413        ));
3414
3415        let state = create_test_state();
3416        let ctx = DurableContext::new(state).with_logger(original_logger);
3417
3418        // Log multiple times without calling configure_logger
3419        ctx.log_info("message 1");
3420        ctx.log_info("message 2");
3421        ctx.log_info("message 3");
3422
3423        assert_eq!(original_count.load(Ordering::SeqCst), 3);
3424    }
3425
3426    #[test]
3427    fn test_configure_logger_affects_child_contexts() {
3428        // Verify that child contexts sharing the same RwLock see the swapped logger
3429        use std::sync::atomic::{AtomicUsize, Ordering};
3430
3431        let original_count = Arc::new(AtomicUsize::new(0));
3432        let new_count = Arc::new(AtomicUsize::new(0));
3433
3434        let original_logger: Arc<dyn Logger> = Arc::new(custom_logger(
3435            |_, _| {},
3436            {
3437                let count = original_count.clone();
3438                move |_, _| {
3439                    count.fetch_add(1, Ordering::SeqCst);
3440                }
3441            },
3442            |_, _| {},
3443            |_, _| {},
3444        ));
3445
3446        let new_logger: Arc<dyn Logger> = Arc::new(custom_logger(
3447            |_, _| {},
3448            {
3449                let count = new_count.clone();
3450                move |_, _| {
3451                    count.fetch_add(1, Ordering::SeqCst);
3452                }
3453            },
3454            |_, _| {},
3455            |_, _| {},
3456        ));
3457
3458        let state = create_test_state();
3459        let ctx = DurableContext::new(state).with_logger(original_logger);
3460        let parent_op_id = ctx.next_operation_id();
3461        let child_ctx = ctx.create_child_context(&parent_op_id);
3462
3463        // Child uses original logger
3464        child_ctx.log_info("child before swap");
3465        assert_eq!(original_count.load(Ordering::SeqCst), 1);
3466
3467        // Swap on parent
3468        ctx.configure_logger(new_logger);
3469
3470        // Child now uses the new logger (shared RwLock)
3471        child_ctx.log_info("child after swap");
3472        assert_eq!(new_count.load(Ordering::SeqCst), 1);
3473        assert_eq!(original_count.load(Ordering::SeqCst), 1); // unchanged
3474    }
3475}
3476
3477#[cfg(test)]
3478mod property_tests {
3479    use super::*;
3480    use proptest::prelude::*;
3481
3482    // Property 3: Operation ID Determinism
3483    // *For any* DurableContext with a given parent_id and step counter state,
3484    // calling create_operation_id() SHALL produce the same ID when called
3485    // in the same sequence position across multiple executions.
3486    // **Validates: Requirements 1.10**
3487    proptest! {
3488        #![proptest_config(ProptestConfig::with_cases(100))]
3489
3490        /// Feature: durable-execution-rust-sdk, Property 3: Operation ID Determinism
3491        /// Validates: Requirements 1.10
3492        #[test]
3493        fn prop_operation_id_determinism(
3494            base_id in "[a-zA-Z0-9:/-]{1,100}",
3495            counter in 0u64..10000u64,
3496        ) {
3497            // Generate the same ID twice with the same inputs
3498            let id1 = generate_operation_id(&base_id, counter);
3499            let id2 = generate_operation_id(&base_id, counter);
3500
3501            // IDs must be identical for the same inputs
3502            prop_assert_eq!(&id1, &id2, "Same base_id and counter must produce identical IDs");
3503
3504            // ID must be a valid hex string of expected length
3505            prop_assert_eq!(id1.len(), 32, "ID must be 32 hex characters");
3506            prop_assert!(id1.chars().all(|c| c.is_ascii_hexdigit()), "ID must be valid hex");
3507        }
3508
3509        /// Feature: durable-execution-rust-sdk, Property 3: Operation ID Determinism (Generator)
3510        /// Validates: Requirements 1.10
3511        #[test]
3512        fn prop_operation_id_generator_determinism(
3513            base_id in "[a-zA-Z0-9:/-]{1,100}",
3514            num_ids in 1usize..50usize,
3515        ) {
3516            // Create two generators with the same base ID
3517            let gen1 = OperationIdGenerator::new(&base_id);
3518            let gen2 = OperationIdGenerator::new(&base_id);
3519
3520            // Generate the same sequence of IDs from both
3521            let ids1: Vec<String> = (0..num_ids).map(|_| gen1.next_id()).collect();
3522            let ids2: Vec<String> = (0..num_ids).map(|_| gen2.next_id()).collect();
3523
3524            // Sequences must be identical
3525            prop_assert_eq!(&ids1, &ids2, "Same generator sequence must produce identical IDs");
3526
3527            // All IDs in a sequence must be unique
3528            let unique_count = {
3529                let mut set = std::collections::HashSet::new();
3530                for id in &ids1 {
3531                    set.insert(id.clone());
3532                }
3533                set.len()
3534            };
3535            prop_assert_eq!(unique_count, num_ids, "All IDs in sequence must be unique");
3536        }
3537
3538        /// Feature: durable-execution-rust-sdk, Property 3: Operation ID Determinism (Replay Simulation)
3539        /// Validates: Requirements 1.10
3540        #[test]
3541        fn prop_operation_id_replay_determinism(
3542            base_id in "[a-zA-Z0-9:/-]{1,100}",
3543            operations in prop::collection::vec(0u32..3u32, 1..20),
3544        ) {
3545            // Simulate two executions with the same sequence of operations
3546            // Each operation type increments the counter
3547
3548            let gen1 = OperationIdGenerator::new(&base_id);
3549            let gen2 = OperationIdGenerator::new(&base_id);
3550
3551            let mut ids1 = Vec::new();
3552            let mut ids2 = Vec::new();
3553
3554            // First "execution"
3555            for op_type in &operations {
3556                // Each operation generates an ID
3557                ids1.push(gen1.next_id());
3558
3559                // Some operations might generate additional child IDs
3560                if *op_type == 2 {
3561                    let parent_id = ids1.last().unwrap().clone();
3562                    let child_gen = gen1.create_child(parent_id);
3563                    ids1.push(child_gen.next_id());
3564                }
3565            }
3566
3567            // Second "execution" (replay) - must produce same IDs
3568            for op_type in &operations {
3569                ids2.push(gen2.next_id());
3570
3571                if *op_type == 2 {
3572                    let parent_id = ids2.last().unwrap().clone();
3573                    let child_gen = gen2.create_child(parent_id);
3574                    ids2.push(child_gen.next_id());
3575                }
3576            }
3577
3578            // Both executions must produce identical ID sequences
3579            prop_assert_eq!(&ids1, &ids2, "Replay must produce identical operation IDs");
3580        }
3581    }
3582
3583    // Property 5: Concurrent ID Generation Uniqueness
3584    // *For any* number of concurrent tasks generating operation IDs from the same
3585    // DurableContext, all generated IDs SHALL be unique.
3586    // **Validates: Requirements 17.3**
3587    mod concurrent_id_tests {
3588        use super::*;
3589        use std::sync::Arc;
3590        use std::thread;
3591
3592        proptest! {
3593            #![proptest_config(ProptestConfig::with_cases(100))]
3594
3595            /// Feature: durable-execution-rust-sdk, Property 5: Concurrent ID Generation Uniqueness
3596            /// Validates: Requirements 17.3
3597            #[test]
3598            fn prop_concurrent_id_uniqueness(
3599                base_id in "[a-zA-Z0-9:/-]{1,100}",
3600                num_threads in 2usize..10usize,
3601                ids_per_thread in 10usize..100usize,
3602            ) {
3603                let gen = Arc::new(OperationIdGenerator::new(&base_id));
3604                let mut handles = vec![];
3605
3606                // Spawn multiple threads that concurrently generate IDs
3607                for _ in 0..num_threads {
3608                    let gen_clone = gen.clone();
3609                    let count = ids_per_thread;
3610                    handles.push(thread::spawn(move || {
3611                        let mut ids = Vec::with_capacity(count);
3612                        for _ in 0..count {
3613                            ids.push(gen_clone.next_id());
3614                        }
3615                        ids
3616                    }));
3617                }
3618
3619                // Collect all IDs from all threads
3620                let mut all_ids = Vec::new();
3621                for handle in handles {
3622                    all_ids.extend(handle.join().unwrap());
3623                }
3624
3625                let total_expected = num_threads * ids_per_thread;
3626
3627                // Verify we got the expected number of IDs
3628                prop_assert_eq!(all_ids.len(), total_expected, "Should have generated {} IDs", total_expected);
3629
3630                // Verify all IDs are unique
3631                let unique_count = {
3632                    let mut set = std::collections::HashSet::new();
3633                    for id in &all_ids {
3634                        set.insert(id.clone());
3635                    }
3636                    set.len()
3637                };
3638
3639                prop_assert_eq!(
3640                    unique_count,
3641                    total_expected,
3642                    "All {} IDs must be unique, but only {} were unique",
3643                    total_expected,
3644                    unique_count
3645                );
3646
3647                // Verify the counter was incremented correctly
3648                prop_assert_eq!(
3649                    gen.current_counter() as usize,
3650                    total_expected,
3651                    "Counter should equal total IDs generated"
3652                );
3653            }
3654
3655            /// Feature: durable-execution-rust-sdk, Property 5: Concurrent ID Generation Uniqueness (Stress)
3656            /// Validates: Requirements 17.3
3657            #[test]
3658            fn prop_concurrent_id_uniqueness_stress(
3659                base_id in "[a-zA-Z0-9:/-]{1,50}",
3660            ) {
3661                // Fixed high-concurrency stress test
3662                let num_threads = 20;
3663                let ids_per_thread = 500;
3664
3665                let gen = Arc::new(OperationIdGenerator::new(&base_id));
3666                let mut handles = vec![];
3667
3668                for _ in 0..num_threads {
3669                    let gen_clone = gen.clone();
3670                    handles.push(thread::spawn(move || {
3671                        let mut ids = Vec::with_capacity(ids_per_thread);
3672                        for _ in 0..ids_per_thread {
3673                            ids.push(gen_clone.next_id());
3674                        }
3675                        ids
3676                    }));
3677                }
3678
3679                let mut all_ids = Vec::new();
3680                for handle in handles {
3681                    all_ids.extend(handle.join().unwrap());
3682                }
3683
3684                let total_expected = num_threads * ids_per_thread;
3685
3686                // Verify all IDs are unique
3687                let unique_count = {
3688                    let mut set = std::collections::HashSet::new();
3689                    for id in &all_ids {
3690                        set.insert(id.clone());
3691                    }
3692                    set.len()
3693                };
3694
3695                prop_assert_eq!(
3696                    unique_count,
3697                    total_expected,
3698                    "All {} IDs must be unique under high concurrency",
3699                    total_expected
3700                );
3701            }
3702        }
3703    }
3704
3705    // Property 9: Logging Methods Automatic Context
3706    // *For any* call to log_info, log_debug, log_warn, or log_error on DurableContext,
3707    // the resulting log message SHALL include durable_execution_arn and parent_id
3708    // (when available) without the caller needing to specify them.
3709    // **Validates: Requirements 4.5**
3710    mod logging_automatic_context_tests {
3711        use super::*;
3712        use std::sync::{Arc, Mutex};
3713
3714        proptest! {
3715            #![proptest_config(ProptestConfig::with_cases(100))]
3716
3717            /// Feature: sdk-ergonomics-improvements, Property 9: Logging Methods Automatic Context
3718            /// Validates: Requirements 4.5
3719            #[test]
3720            fn prop_logging_automatic_context(
3721                message in "[a-zA-Z0-9 ]{1,100}",
3722                log_level in 0u8..4u8,
3723            ) {
3724                use crate::client::MockDurableServiceClient;
3725                use crate::lambda::InitialExecutionState;
3726
3727                let captured_info = Arc::new(Mutex::new(None::<LogInfo>));
3728                let captured_info_clone = captured_info.clone();
3729
3730                // Create a logger that captures the LogInfo
3731                let inner = Arc::new(custom_logger(
3732                    {
3733                        let captured = captured_info_clone.clone();
3734                        move |_, info: &LogInfo| {
3735                            *captured.lock().unwrap() = Some(info.clone());
3736                        }
3737                    },
3738                    {
3739                        let captured = captured_info_clone.clone();
3740                        move |_, info: &LogInfo| {
3741                            *captured.lock().unwrap() = Some(info.clone());
3742                        }
3743                    },
3744                    {
3745                        let captured = captured_info_clone.clone();
3746                        move |_, info: &LogInfo| {
3747                            *captured.lock().unwrap() = Some(info.clone());
3748                        }
3749                    },
3750                    {
3751                        let captured = captured_info_clone.clone();
3752                        move |_, info: &LogInfo| {
3753                            *captured.lock().unwrap() = Some(info.clone());
3754                        }
3755                    },
3756                ));
3757
3758                let client: crate::client::SharedDurableServiceClient = Arc::new(MockDurableServiceClient::new());
3759                let state = Arc::new(ExecutionState::new(
3760                    "arn:aws:lambda:us-east-1:123456789012:function:test:durable:abc123",
3761                    "token-123",
3762                    InitialExecutionState::new(),
3763                    client,
3764                ));
3765                let ctx = DurableContext::new(state).with_logger(inner);
3766
3767                // Call the appropriate log method based on log_level
3768                match log_level {
3769                    0 => ctx.log_debug(&message),
3770                    1 => ctx.log_info(&message),
3771                    2 => ctx.log_warn(&message),
3772                    _ => ctx.log_error(&message),
3773                }
3774
3775                // Verify the captured LogInfo has the automatic context
3776                let captured = captured_info.lock().unwrap();
3777                let info = captured.as_ref().expect("LogInfo should be captured");
3778
3779                // durable_execution_arn must always be present
3780                prop_assert!(
3781                    info.durable_execution_arn.is_some(),
3782                    "durable_execution_arn must be automatically included"
3783                );
3784                prop_assert_eq!(
3785                    info.durable_execution_arn.as_ref().unwrap(),
3786                    "arn:aws:lambda:us-east-1:123456789012:function:test:durable:abc123",
3787                    "durable_execution_arn must match the context's ARN"
3788                );
3789            }
3790
3791            /// Feature: sdk-ergonomics-improvements, Property 9: Logging Methods Automatic Context (Child Context)
3792            /// Validates: Requirements 4.5
3793            #[test]
3794            fn prop_logging_automatic_context_child(
3795                message in "[a-zA-Z0-9 ]{1,100}",
3796                log_level in 0u8..4u8,
3797            ) {
3798                use crate::client::MockDurableServiceClient;
3799                use crate::lambda::InitialExecutionState;
3800
3801                let captured_info = Arc::new(Mutex::new(None::<LogInfo>));
3802                let captured_info_clone = captured_info.clone();
3803
3804                // Create a logger that captures the LogInfo
3805                let inner: Arc<dyn Logger> = Arc::new(custom_logger(
3806                    {
3807                        let captured = captured_info_clone.clone();
3808                        move |_, info: &LogInfo| {
3809                            *captured.lock().unwrap() = Some(info.clone());
3810                        }
3811                    },
3812                    {
3813                        let captured = captured_info_clone.clone();
3814                        move |_, info: &LogInfo| {
3815                            *captured.lock().unwrap() = Some(info.clone());
3816                        }
3817                    },
3818                    {
3819                        let captured = captured_info_clone.clone();
3820                        move |_, info: &LogInfo| {
3821                            *captured.lock().unwrap() = Some(info.clone());
3822                        }
3823                    },
3824                    {
3825                        let captured = captured_info_clone.clone();
3826                        move |_, info: &LogInfo| {
3827                            *captured.lock().unwrap() = Some(info.clone());
3828                        }
3829                    },
3830                ));
3831
3832                let client: crate::client::SharedDurableServiceClient = Arc::new(MockDurableServiceClient::new());
3833                let state = Arc::new(ExecutionState::new(
3834                    "arn:aws:lambda:us-east-1:123456789012:function:test:durable:abc123",
3835                    "token-123",
3836                    InitialExecutionState::new(),
3837                    client,
3838                ));
3839                let ctx = DurableContext::new(state).with_logger(inner.clone());
3840
3841                // Create a child context
3842                let parent_op_id = ctx.next_operation_id();
3843                let child_ctx = ctx.create_child_context(&parent_op_id).with_logger(inner);
3844
3845                // Call the appropriate log method based on log_level
3846                match log_level {
3847                    0 => child_ctx.log_debug(&message),
3848                    1 => child_ctx.log_info(&message),
3849                    2 => child_ctx.log_warn(&message),
3850                    _ => child_ctx.log_error(&message),
3851                }
3852
3853                // Verify the captured LogInfo has the automatic context including parent_id
3854                let captured = captured_info.lock().unwrap();
3855                let info = captured.as_ref().expect("LogInfo should be captured");
3856
3857                // durable_execution_arn must always be present
3858                prop_assert!(
3859                    info.durable_execution_arn.is_some(),
3860                    "durable_execution_arn must be automatically included in child context"
3861                );
3862
3863                // parent_id must be present in child context
3864                prop_assert!(
3865                    info.parent_id.is_some(),
3866                    "parent_id must be automatically included in child context"
3867                );
3868                prop_assert_eq!(
3869                    info.parent_id.as_ref().unwrap(),
3870                    &parent_op_id,
3871                    "parent_id must match the parent operation ID"
3872                );
3873            }
3874        }
3875    }
3876
3877    // Property 10: Logging Methods Extra Fields
3878    // *For any* call to log_info_with, log_debug_with, log_warn_with, or log_error_with
3879    // with extra fields, those fields SHALL appear in the resulting log output.
3880    // **Validates: Requirements 4.6**
3881    mod logging_extra_fields_tests {
3882        use super::*;
3883        use std::sync::{Arc, Mutex};
3884
3885        proptest! {
3886            #![proptest_config(ProptestConfig::with_cases(100))]
3887
3888            /// Feature: sdk-ergonomics-improvements, Property 10: Logging Methods Extra Fields
3889            /// Validates: Requirements 4.6
3890            #[test]
3891            fn prop_logging_extra_fields(
3892                message in "[a-zA-Z0-9 ]{1,100}",
3893                log_level in 0u8..4u8,
3894                key1 in "[a-zA-Z_][a-zA-Z0-9_]{0,20}",
3895                value1 in "[a-zA-Z0-9]{1,50}",
3896                key2 in "[a-zA-Z_][a-zA-Z0-9_]{0,20}",
3897                value2 in "[a-zA-Z0-9]{1,50}",
3898            ) {
3899                use crate::client::MockDurableServiceClient;
3900                use crate::lambda::InitialExecutionState;
3901
3902                let captured_info = Arc::new(Mutex::new(None::<LogInfo>));
3903                let captured_info_clone = captured_info.clone();
3904
3905                // Create a logger that captures the LogInfo
3906                let inner = Arc::new(custom_logger(
3907                    {
3908                        let captured = captured_info_clone.clone();
3909                        move |_, info: &LogInfo| {
3910                            *captured.lock().unwrap() = Some(info.clone());
3911                        }
3912                    },
3913                    {
3914                        let captured = captured_info_clone.clone();
3915                        move |_, info: &LogInfo| {
3916                            *captured.lock().unwrap() = Some(info.clone());
3917                        }
3918                    },
3919                    {
3920                        let captured = captured_info_clone.clone();
3921                        move |_, info: &LogInfo| {
3922                            *captured.lock().unwrap() = Some(info.clone());
3923                        }
3924                    },
3925                    {
3926                        let captured = captured_info_clone.clone();
3927                        move |_, info: &LogInfo| {
3928                            *captured.lock().unwrap() = Some(info.clone());
3929                        }
3930                    },
3931                ));
3932
3933                let client: crate::client::SharedDurableServiceClient = Arc::new(MockDurableServiceClient::new());
3934                let state = Arc::new(ExecutionState::new(
3935                    "arn:aws:lambda:us-east-1:123456789012:function:test:durable:abc123",
3936                    "token-123",
3937                    InitialExecutionState::new(),
3938                    client,
3939                ));
3940                let ctx = DurableContext::new(state).with_logger(inner);
3941
3942                // Create extra fields
3943                let fields: Vec<(&str, &str)> = vec![(&key1, &value1), (&key2, &value2)];
3944
3945                // Call the appropriate log method based on log_level
3946                match log_level {
3947                    0 => ctx.log_debug_with(&message, &fields),
3948                    1 => ctx.log_info_with(&message, &fields),
3949                    2 => ctx.log_warn_with(&message, &fields),
3950                    _ => ctx.log_error_with(&message, &fields),
3951                }
3952
3953                // Verify the captured LogInfo has the extra fields
3954                let captured = captured_info.lock().unwrap();
3955                let info = captured.as_ref().expect("LogInfo should be captured");
3956
3957                // Extra fields must be present
3958                prop_assert_eq!(
3959                    info.extra.len(),
3960                    2,
3961                    "Extra fields must be included in the log output"
3962                );
3963
3964                // Verify each field is present
3965                prop_assert!(
3966                    info.extra.contains(&(key1.clone(), value1.clone())),
3967                    "First extra field must be present: {}={}", key1, value1
3968                );
3969                prop_assert!(
3970                    info.extra.contains(&(key2.clone(), value2.clone())),
3971                    "Second extra field must be present: {}={}", key2, value2
3972                );
3973            }
3974
3975            /// Feature: sdk-ergonomics-improvements, Property 10: Logging Methods Extra Fields (Empty)
3976            /// Validates: Requirements 4.6
3977            #[test]
3978            fn prop_logging_extra_fields_empty(
3979                message in "[a-zA-Z0-9 ]{1,100}",
3980                log_level in 0u8..4u8,
3981            ) {
3982                use crate::client::MockDurableServiceClient;
3983                use crate::lambda::InitialExecutionState;
3984
3985                let captured_info = Arc::new(Mutex::new(None::<LogInfo>));
3986                let captured_info_clone = captured_info.clone();
3987
3988                // Create a logger that captures the LogInfo
3989                let inner = Arc::new(custom_logger(
3990                    {
3991                        let captured = captured_info_clone.clone();
3992                        move |_, info: &LogInfo| {
3993                            *captured.lock().unwrap() = Some(info.clone());
3994                        }
3995                    },
3996                    {
3997                        let captured = captured_info_clone.clone();
3998                        move |_, info: &LogInfo| {
3999                            *captured.lock().unwrap() = Some(info.clone());
4000                        }
4001                    },
4002                    {
4003                        let captured = captured_info_clone.clone();
4004                        move |_, info: &LogInfo| {
4005                            *captured.lock().unwrap() = Some(info.clone());
4006                        }
4007                    },
4008                    {
4009                        let captured = captured_info_clone.clone();
4010                        move |_, info: &LogInfo| {
4011                            *captured.lock().unwrap() = Some(info.clone());
4012                        }
4013                    },
4014                ));
4015
4016                let client: crate::client::SharedDurableServiceClient = Arc::new(MockDurableServiceClient::new());
4017                let state = Arc::new(ExecutionState::new(
4018                    "arn:aws:lambda:us-east-1:123456789012:function:test:durable:abc123",
4019                    "token-123",
4020                    InitialExecutionState::new(),
4021                    client,
4022                ));
4023                let ctx = DurableContext::new(state).with_logger(inner);
4024
4025                // Call the appropriate log method with empty fields
4026                let empty_fields: &[(&str, &str)] = &[];
4027                match log_level {
4028                    0 => ctx.log_debug_with(&message, empty_fields),
4029                    1 => ctx.log_info_with(&message, empty_fields),
4030                    2 => ctx.log_warn_with(&message, empty_fields),
4031                    _ => ctx.log_error_with(&message, empty_fields),
4032                }
4033
4034                // Verify the captured LogInfo has no extra fields
4035                let captured = captured_info.lock().unwrap();
4036                let info = captured.as_ref().expect("LogInfo should be captured");
4037
4038                prop_assert!(
4039                    info.extra.is_empty(),
4040                    "Extra fields should be empty when none are provided"
4041                );
4042
4043                // But automatic context should still be present
4044                prop_assert!(
4045                    info.durable_execution_arn.is_some(),
4046                    "durable_execution_arn must still be present even with empty extra fields"
4047                );
4048            }
4049        }
4050    }
4051}
4052
4053#[cfg(test)]
4054mod sealed_trait_tests {
4055    use super::*;
4056    use std::sync::atomic::{AtomicUsize, Ordering};
4057
4058    /// Tests for sealed Logger trait implementations.
4059    ///
4060    /// The Logger trait is sealed, meaning it cannot be implemented outside this crate.
4061    /// This is enforced at compile time by requiring the private `Sealed` supertrait.
4062    /// External crates attempting to implement Logger will get a compile error:
4063    /// "the trait bound `MyType: Sealed` is not satisfied"
4064    ///
4065    /// These tests verify that the internal implementations work correctly.
4066    mod logger_tests {
4067        use super::*;
4068
4069        #[test]
4070        fn test_tracing_logger_implements_logger() {
4071            // TracingLogger should implement Logger (compile-time check)
4072            let logger: &dyn Logger = &TracingLogger;
4073            let info = LogInfo::default();
4074
4075            // These calls should not panic
4076            logger.debug("test debug", &info);
4077            logger.info("test info", &info);
4078            logger.warn("test warn", &info);
4079            logger.error("test error", &info);
4080        }
4081
4082        #[test]
4083        fn test_replay_aware_logger_implements_logger() {
4084            // ReplayAwareLogger should implement Logger (compile-time check)
4085            let inner = Arc::new(TracingLogger);
4086            let logger = ReplayAwareLogger::new(inner, ReplayLoggingConfig::AllowAll);
4087            let logger_ref: &dyn Logger = &logger;
4088            let info = LogInfo::default();
4089
4090            // These calls should not panic
4091            logger_ref.debug("test debug", &info);
4092            logger_ref.info("test info", &info);
4093            logger_ref.warn("test warn", &info);
4094            logger_ref.error("test error", &info);
4095        }
4096
4097        #[test]
4098        fn test_custom_logger_implements_logger() {
4099            // CustomLogger should implement Logger (compile-time check)
4100            let call_count = Arc::new(AtomicUsize::new(0));
4101            let count_clone = call_count.clone();
4102
4103            let logger = custom_logger(
4104                {
4105                    let count = count_clone.clone();
4106                    move |_msg, _info| {
4107                        count.fetch_add(1, Ordering::SeqCst);
4108                    }
4109                },
4110                {
4111                    let count = count_clone.clone();
4112                    move |_msg, _info| {
4113                        count.fetch_add(1, Ordering::SeqCst);
4114                    }
4115                },
4116                {
4117                    let count = count_clone.clone();
4118                    move |_msg, _info| {
4119                        count.fetch_add(1, Ordering::SeqCst);
4120                    }
4121                },
4122                {
4123                    let count = count_clone.clone();
4124                    move |_msg, _info| {
4125                        count.fetch_add(1, Ordering::SeqCst);
4126                    }
4127                },
4128            );
4129
4130            let logger_ref: &dyn Logger = &logger;
4131            let info = LogInfo::default();
4132
4133            logger_ref.debug("test", &info);
4134            logger_ref.info("test", &info);
4135            logger_ref.warn("test", &info);
4136            logger_ref.error("test", &info);
4137
4138            assert_eq!(call_count.load(Ordering::SeqCst), 4);
4139        }
4140
4141        #[test]
4142        fn test_simple_custom_logger() {
4143            let call_count = Arc::new(AtomicUsize::new(0));
4144            let count_clone = call_count.clone();
4145
4146            let logger = simple_custom_logger(move |_level, _msg, _info| {
4147                count_clone.fetch_add(1, Ordering::SeqCst);
4148            });
4149
4150            let info = LogInfo::default();
4151
4152            logger.debug("test", &info);
4153            logger.info("test", &info);
4154            logger.warn("test", &info);
4155            logger.error("test", &info);
4156
4157            assert_eq!(call_count.load(Ordering::SeqCst), 4);
4158        }
4159
4160        #[test]
4161        fn test_custom_logger_receives_correct_messages() {
4162            let messages = Arc::new(std::sync::Mutex::new(Vec::new()));
4163            let messages_clone = messages.clone();
4164
4165            let logger = simple_custom_logger(move |level, msg, _info| {
4166                messages_clone
4167                    .lock()
4168                    .unwrap()
4169                    .push(format!("[{}] {}", level, msg));
4170            });
4171
4172            let info = LogInfo::default();
4173
4174            logger.debug("debug message", &info);
4175            logger.info("info message", &info);
4176            logger.warn("warn message", &info);
4177            logger.error("error message", &info);
4178
4179            let logged = messages.lock().unwrap();
4180            assert_eq!(logged.len(), 4);
4181            assert_eq!(logged[0], "[DEBUG] debug message");
4182            assert_eq!(logged[1], "[INFO] info message");
4183            assert_eq!(logged[2], "[WARN] warn message");
4184            assert_eq!(logged[3], "[ERROR] error message");
4185        }
4186
4187        #[test]
4188        fn test_custom_logger_receives_log_info() {
4189            let received_info = Arc::new(std::sync::Mutex::new(None));
4190            let info_clone = received_info.clone();
4191
4192            let logger = simple_custom_logger(move |_level, _msg, info| {
4193                *info_clone.lock().unwrap() = Some(info.clone());
4194            });
4195
4196            let info = LogInfo::new("arn:aws:test")
4197                .with_operation_id("op-123")
4198                .with_parent_id("parent-456")
4199                .with_replay(true);
4200
4201            logger.info("test", &info);
4202
4203            let received = received_info.lock().unwrap().clone().unwrap();
4204            assert_eq!(
4205                received.durable_execution_arn,
4206                Some("arn:aws:test".to_string())
4207            );
4208            assert_eq!(received.operation_id, Some("op-123".to_string()));
4209            assert_eq!(received.parent_id, Some("parent-456".to_string()));
4210            assert!(received.is_replay);
4211        }
4212
4213        #[test]
4214        fn test_replay_aware_logger_suppresses_during_replay() {
4215            let call_count = Arc::new(AtomicUsize::new(0));
4216            let count_clone = call_count.clone();
4217
4218            let inner_logger = Arc::new(custom_logger(
4219                {
4220                    let count = count_clone.clone();
4221                    move |_msg, _info| {
4222                        count.fetch_add(1, Ordering::SeqCst);
4223                    }
4224                },
4225                {
4226                    let count = count_clone.clone();
4227                    move |_msg, _info| {
4228                        count.fetch_add(1, Ordering::SeqCst);
4229                    }
4230                },
4231                {
4232                    let count = count_clone.clone();
4233                    move |_msg, _info| {
4234                        count.fetch_add(1, Ordering::SeqCst);
4235                    }
4236                },
4237                {
4238                    let count = count_clone.clone();
4239                    move |_msg, _info| {
4240                        count.fetch_add(1, Ordering::SeqCst);
4241                    }
4242                },
4243            ));
4244
4245            let logger = ReplayAwareLogger::new(inner_logger, ReplayLoggingConfig::SuppressAll);
4246
4247            // Non-replay logs should pass through
4248            let non_replay_info = LogInfo::default().with_replay(false);
4249            logger.debug("test", &non_replay_info);
4250            logger.info("test", &non_replay_info);
4251            logger.warn("test", &non_replay_info);
4252            logger.error("test", &non_replay_info);
4253            assert_eq!(call_count.load(Ordering::SeqCst), 4);
4254
4255            // Replay logs should be suppressed
4256            let replay_info = LogInfo::default().with_replay(true);
4257            logger.debug("test", &replay_info);
4258            logger.info("test", &replay_info);
4259            logger.warn("test", &replay_info);
4260            logger.error("test", &replay_info);
4261            assert_eq!(call_count.load(Ordering::SeqCst), 4); // Still 4, no new calls
4262        }
4263
4264        #[test]
4265        fn test_replay_aware_logger_errors_only_during_replay() {
4266            let call_count = Arc::new(AtomicUsize::new(0));
4267            let count_clone = call_count.clone();
4268
4269            let inner_logger = Arc::new(custom_logger(
4270                {
4271                    let count = count_clone.clone();
4272                    move |_msg, _info| {
4273                        count.fetch_add(1, Ordering::SeqCst);
4274                    }
4275                },
4276                {
4277                    let count = count_clone.clone();
4278                    move |_msg, _info| {
4279                        count.fetch_add(1, Ordering::SeqCst);
4280                    }
4281                },
4282                {
4283                    let count = count_clone.clone();
4284                    move |_msg, _info| {
4285                        count.fetch_add(1, Ordering::SeqCst);
4286                    }
4287                },
4288                {
4289                    let count = count_clone.clone();
4290                    move |_msg, _info| {
4291                        count.fetch_add(1, Ordering::SeqCst);
4292                    }
4293                },
4294            ));
4295
4296            let logger = ReplayAwareLogger::new(inner_logger, ReplayLoggingConfig::ErrorsOnly);
4297
4298            let replay_info = LogInfo::default().with_replay(true);
4299            logger.debug("test", &replay_info);
4300            logger.info("test", &replay_info);
4301            logger.warn("test", &replay_info);
4302            logger.error("test", &replay_info);
4303
4304            // Only error should pass through during replay
4305            assert_eq!(call_count.load(Ordering::SeqCst), 1);
4306        }
4307    }
4308}