durable_execution_sdk/context.rs
1//! DurableContext and operation identifier types for the AWS Durable Execution SDK.
2//!
3//! This module provides the main `DurableContext` interface that user code interacts with,
4//! as well as the `OperationIdentifier` type for deterministic operation ID generation.
5
6use std::sync::atomic::{AtomicU64, Ordering};
7use std::sync::{Arc, RwLock};
8
9use blake2::{Blake2b512, Digest};
10
11use crate::error::DurableResult;
12use crate::sealed::Sealed;
13use crate::state::ExecutionState;
14use crate::traits::{DurableValue, StepFn};
15use crate::types::OperationId;
16
/// Identifies an operation within a durable execution.
///
/// Each operation has a unique ID that is deterministically generated
/// based on the parent context and step counter. This ensures that
/// the same operation gets the same ID across replays.
///
/// The derived `PartialEq`, `Eq` and `Hash` implementations take all three
/// fields into account, so two identifiers that share an `operation_id` but
/// differ in `parent_id` or `name` are distinct values.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct OperationIdentifier {
    /// The unique operation ID (deterministically generated)
    pub operation_id: String,
    /// The parent operation ID (`None` for root operations)
    pub parent_id: Option<String>,
    /// Optional human-readable name for the operation
    pub name: Option<String>,
}
31
32impl OperationIdentifier {
33 /// Creates a new OperationIdentifier with the given values.
34 pub fn new(
35 operation_id: impl Into<String>,
36 parent_id: Option<String>,
37 name: Option<String>,
38 ) -> Self {
39 Self {
40 operation_id: operation_id.into(),
41 parent_id,
42 name,
43 }
44 }
45
46 /// Creates a root operation identifier (no parent).
47 pub fn root(operation_id: impl Into<String>) -> Self {
48 Self {
49 operation_id: operation_id.into(),
50 parent_id: None,
51 name: None,
52 }
53 }
54
55 /// Creates an operation identifier with a parent.
56 pub fn with_parent(operation_id: impl Into<String>, parent_id: impl Into<String>) -> Self {
57 Self {
58 operation_id: operation_id.into(),
59 parent_id: Some(parent_id.into()),
60 name: None,
61 }
62 }
63
64 /// Sets the name for this operation identifier.
65 pub fn with_name(mut self, name: impl Into<String>) -> Self {
66 self.name = Some(name.into());
67 self
68 }
69
70 /// Returns the operation ID as an `OperationId` newtype.
71 #[inline]
72 pub fn operation_id_typed(&self) -> OperationId {
73 OperationId::from(self.operation_id.clone())
74 }
75
76 /// Returns the parent ID as an `OperationId` newtype, if present.
77 #[inline]
78 pub fn parent_id_typed(&self) -> Option<OperationId> {
79 self.parent_id
80 .as_ref()
81 .map(|id| OperationId::from(id.clone()))
82 }
83}
84
85impl std::fmt::Display for OperationIdentifier {
86 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
87 if let Some(ref name) = self.name {
88 write!(f, "{}({})", name, self.operation_id)
89 } else {
90 write!(f, "{}", self.operation_id)
91 }
92 }
93}
94
/// Generates deterministic operation IDs using blake2b hashing.
///
/// The ID is generated by hashing the parent ID (or execution ARN for root)
/// combined with the step counter value. This ensures:
/// - Same inputs always produce the same ID
/// - Different step counts produce different IDs
/// - IDs are unique within an execution
///
/// The counter is an `AtomicU64`, so IDs can be requested through a shared
/// reference (`&self`).
#[derive(Debug)]
pub struct OperationIdGenerator {
    /// The base identifier (execution ARN or parent operation ID)
    base_id: String,
    /// Thread-safe step counter; holds the value the next generated ID will use
    step_counter: AtomicU64,
}
109
110impl OperationIdGenerator {
111 /// Creates a new OperationIdGenerator with the given base ID.
112 ///
113 /// # Arguments
114 ///
115 /// * `base_id` - The base identifier to use for hashing (typically execution ARN or parent ID)
116 pub fn new(base_id: impl Into<String>) -> Self {
117 Self {
118 base_id: base_id.into(),
119 step_counter: AtomicU64::new(0),
120 }
121 }
122
123 /// Creates a new OperationIdGenerator with a specific starting counter value.
124 ///
125 /// # Arguments
126 ///
127 /// * `base_id` - The base identifier to use for hashing
128 /// * `initial_counter` - The initial value for the step counter
129 pub fn with_counter(base_id: impl Into<String>, initial_counter: u64) -> Self {
130 Self {
131 base_id: base_id.into(),
132 step_counter: AtomicU64::new(initial_counter),
133 }
134 }
135
136 /// Generates the next operation ID.
137 ///
138 /// This method atomically increments the step counter and generates
139 /// a deterministic ID based on the base ID and counter value.
140 ///
141 /// # Returns
142 ///
143 /// A unique, deterministic operation ID string.
144 ///
145 /// # Memory Ordering
146 ///
147 /// Uses `Ordering::Relaxed` because the step counter only needs to provide
148 /// unique values - there's no synchronization requirement with other data.
149 /// Each call gets a unique counter value, and the hash function ensures
150 /// deterministic ID generation regardless of ordering between threads.
151 ///
152 /// Requirements: 4.1, 4.6
153 pub fn next_id(&self) -> String {
154 // Relaxed ordering is sufficient: we only need uniqueness, not synchronization.
155 // The counter value is combined with base_id in a hash, so the only requirement
156 // is that each call gets a different counter value, which fetch_add guarantees.
157 let counter = self.step_counter.fetch_add(1, Ordering::Relaxed);
158 generate_operation_id(&self.base_id, counter)
159 }
160
161 /// Generates an operation ID for a specific counter value without incrementing.
162 ///
163 /// This is useful for testing or when you need to generate an ID
164 /// for a known counter value.
165 ///
166 /// # Arguments
167 ///
168 /// * `counter` - The counter value to use for ID generation
169 ///
170 /// # Returns
171 ///
172 /// A deterministic operation ID string.
173 pub fn id_for_counter(&self, counter: u64) -> String {
174 generate_operation_id(&self.base_id, counter)
175 }
176
177 /// Returns the current counter value without incrementing.
178 ///
179 /// # Memory Ordering
180 ///
181 /// Uses `Ordering::Relaxed` because this is an informational read that doesn't
182 /// need to synchronize with other operations. The value may be slightly stale
183 /// in concurrent scenarios, but this is acceptable for debugging/monitoring use.
184 ///
185 /// Requirements: 4.1, 4.6
186 pub fn current_counter(&self) -> u64 {
187 // Relaxed ordering is sufficient: this is an informational read with no
188 // synchronization requirements. Callers should not rely on this value
189 // for correctness in concurrent scenarios.
190 self.step_counter.load(Ordering::Relaxed)
191 }
192
193 /// Returns the base ID used for generation.
194 pub fn base_id(&self) -> &str {
195 &self.base_id
196 }
197
198 /// Creates a child generator with a new base ID.
199 ///
200 /// The child generator starts with counter 0 and uses the provided
201 /// parent operation ID as its base.
202 ///
203 /// # Arguments
204 ///
205 /// * `parent_operation_id` - The operation ID to use as the base for the child
206 pub fn create_child(&self, parent_operation_id: impl Into<String>) -> Self {
207 Self::new(parent_operation_id)
208 }
209}
210
211impl Clone for OperationIdGenerator {
212 fn clone(&self) -> Self {
213 Self {
214 base_id: self.base_id.clone(),
215 // Relaxed ordering is sufficient: we're creating a snapshot of the counter
216 // for a new generator. The clone doesn't need to synchronize with the
217 // original - it just needs a reasonable starting point.
218 step_counter: AtomicU64::new(self.step_counter.load(Ordering::Relaxed)),
219 }
220 }
221}
222
223/// Generates a deterministic operation ID using blake2b hashing.
224///
225/// # Arguments
226///
227/// * `base_id` - The base identifier (execution ARN or parent operation ID)
228/// * `counter` - The step counter value
229///
230/// # Returns
231///
232/// A hex-encoded operation ID string (first 32 characters of the hash).
233pub fn generate_operation_id(base_id: &str, counter: u64) -> String {
234 let mut hasher = Blake2b512::new();
235 hasher.update(base_id.as_bytes());
236 hasher.update(counter.to_le_bytes());
237 let result = hasher.finalize();
238
239 // Take first 16 bytes (32 hex chars) for a reasonably short but unique ID
240 hex::encode(&result[..16])
241}
242
/// Logger trait for structured logging in durable executions.
///
/// This trait is compatible with the `tracing` crate and allows
/// custom logging implementations.
///
/// Every method receives the message text plus a [`LogInfo`] carrying the
/// execution context of the call.
///
/// # Sealed Trait
///
/// This trait is sealed and cannot be implemented outside of this crate.
/// This allows the SDK maintainers to evolve the logging interface without
/// breaking external code. If you need custom logging behavior, use the
/// provided factory functions or wrap one of the existing implementations.
///
/// # Requirements
///
/// - 3.1: THE SDK SHALL implement the sealed trait pattern for the `Logger` trait
/// - 3.5: THE SDK SHALL document that these traits are sealed and cannot be implemented externally
// The `Sealed` supertrait is imported from `crate::sealed`; the lint is silenced
// because that bound is deliberately not nameable by downstream crates
// (presumably `Sealed` is crate-private; confirm in `crate::sealed`).
#[allow(private_bounds)]
pub trait Logger: Sealed + Send + Sync {
    /// Logs a debug message.
    fn debug(&self, message: &str, info: &LogInfo);
    /// Logs an info message.
    fn info(&self, message: &str, info: &LogInfo);
    /// Logs a warning message.
    fn warn(&self, message: &str, info: &LogInfo);
    /// Logs an error message.
    fn error(&self, message: &str, info: &LogInfo);
}
270
/// Context information for log messages.
///
/// This struct provides context for log messages including execution ARN,
/// operation IDs, and replay status. The `is_replay` flag indicates whether
/// the current operation is being replayed from a checkpoint.
///
/// The derived `Default` yields `None` for every optional field, `false` for
/// `is_replay` and an empty `extra` list.
///
/// # Requirements
///
/// - 16.6: THE Logging_System SHALL support replay-aware logging that can suppress logs during replay
#[derive(Debug, Clone, Default)]
pub struct LogInfo {
    /// The durable execution ARN
    pub durable_execution_arn: Option<String>,
    /// The current operation ID
    pub operation_id: Option<String>,
    /// The parent operation ID
    pub parent_id: Option<String>,
    /// Whether the current operation is being replayed from a checkpoint
    ///
    /// When `true`, the operation is returning a previously checkpointed result
    /// without re-executing the operation's logic. Loggers can use this flag
    /// to suppress or annotate logs during replay.
    ///
    /// # Requirements
    ///
    /// - 16.6: THE Logging_System SHALL support replay-aware logging that can suppress logs during replay
    pub is_replay: bool,
    /// Additional key-value pairs, kept in insertion order; duplicate keys are
    /// not rejected by the type itself
    pub extra: Vec<(String, String)>,
}
301
302impl LogInfo {
303 /// Creates a new LogInfo with the given execution ARN.
304 pub fn new(durable_execution_arn: impl Into<String>) -> Self {
305 Self {
306 durable_execution_arn: Some(durable_execution_arn.into()),
307 ..Default::default()
308 }
309 }
310
311 /// Sets the operation ID.
312 pub fn with_operation_id(mut self, operation_id: impl Into<String>) -> Self {
313 self.operation_id = Some(operation_id.into());
314 self
315 }
316
317 /// Sets the parent ID.
318 pub fn with_parent_id(mut self, parent_id: impl Into<String>) -> Self {
319 self.parent_id = Some(parent_id.into());
320 self
321 }
322
323 /// Sets the replay flag.
324 ///
325 /// When set to `true`, indicates that the current operation is being
326 /// replayed from a checkpoint rather than executing fresh.
327 ///
328 /// # Arguments
329 ///
330 /// * `is_replay` - Whether the operation is being replayed
331 ///
332 /// # Requirements
333 ///
334 /// - 16.6: THE Logging_System SHALL support replay-aware logging that can suppress logs during replay
335 pub fn with_replay(mut self, is_replay: bool) -> Self {
336 self.is_replay = is_replay;
337 self
338 }
339
340 /// Adds an extra key-value pair.
341 pub fn with_extra(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
342 self.extra.push((key.into(), value.into()));
343 self
344 }
345}
346
347/// Creates a tracing span for a durable operation.
348///
349/// This function creates a structured tracing span that includes all relevant
350/// context for observability and debugging. The span can be used to trace
351/// execution flow and correlate logs across operations.
352///
353/// # Arguments
354///
355/// * `operation_type` - The type of operation (e.g., "step", "wait", "callback", "invoke", "map", "parallel")
356/// * `op_id` - The operation identifier containing operation_id, parent_id, and name
357/// * `durable_execution_arn` - The ARN of the durable execution
358///
359/// # Returns
360///
361/// A `tracing::Span` that can be entered during operation execution.
362///
363/// # Example
364///
365/// ```rust,ignore
366/// let span = create_operation_span("step", &op_id, state.durable_execution_arn());
367/// let _guard = span.enter();
368/// // ... operation execution ...
369/// ```
370///
371/// # Requirements
372///
373/// - 3.2: THE tracing span SHALL include the operation_id as a structured field
374/// - 3.3: THE tracing span SHALL include the operation_type as a structured field
375/// - 3.4: THE tracing span SHALL include the parent_id as a structured field when available
376/// - 3.5: THE tracing span SHALL include the durable_execution_arn as a structured field
377pub fn create_operation_span(
378 operation_type: &str,
379 op_id: &OperationIdentifier,
380 durable_execution_arn: &str,
381) -> tracing::Span {
382 tracing::info_span!(
383 "durable_operation",
384 operation_type = %operation_type,
385 operation_id = %op_id.operation_id,
386 parent_id = ?op_id.parent_id,
387 name = ?op_id.name,
388 durable_execution_arn = %durable_execution_arn,
389 status = tracing::field::Empty,
390 )
391}
392
/// Default logger implementation using the `tracing` crate.
///
/// This logger includes the `is_replay` flag in all log messages to help
/// distinguish between fresh executions and replayed operations.
///
/// Extra fields from `LogInfo` are included in the tracing output as a formatted
/// string of key-value pairs, making them queryable in log aggregation systems.
///
/// The type is a field-less unit struct, so it is constructed simply as
/// `TracingLogger`.
///
/// # Requirements
///
/// - 5.1: WHEN LogInfo contains extra fields, THE TracingLogger SHALL include them in the tracing output
/// - 5.2: THE extra fields SHALL be formatted as key-value pairs in the tracing event
/// - 16.6: THE Logging_System SHALL support replay-aware logging that can suppress logs during replay
#[derive(Debug, Clone, Default)]
pub struct TracingLogger;

// Implement Sealed for TracingLogger to allow it to implement Logger
// (`Logger` lists `Sealed` as a supertrait).
impl Sealed for TracingLogger {}
411
412impl TracingLogger {
413 /// Formats extra fields as a string of key-value pairs.
414 ///
415 /// Returns an empty string if there are no extra fields, otherwise returns
416 /// a comma-separated list of "key=value" pairs.
417 ///
418 /// # Requirements
419 ///
420 /// - 5.2: THE extra fields SHALL be formatted as key-value pairs in the tracing event
421 fn format_extra_fields(extra: &[(String, String)]) -> String {
422 if extra.is_empty() {
423 String::new()
424 } else {
425 extra
426 .iter()
427 .map(|(k, v)| format!("{}={}", k, v))
428 .collect::<Vec<_>>()
429 .join(", ")
430 }
431 }
432}
433
434impl Logger for TracingLogger {
435 fn debug(&self, message: &str, info: &LogInfo) {
436 let extra_fields = Self::format_extra_fields(&info.extra);
437 tracing::debug!(
438 durable_execution_arn = ?info.durable_execution_arn,
439 operation_id = ?info.operation_id,
440 parent_id = ?info.parent_id,
441 is_replay = info.is_replay,
442 extra = %extra_fields,
443 "{}",
444 message
445 );
446 }
447
448 fn info(&self, message: &str, info: &LogInfo) {
449 let extra_fields = Self::format_extra_fields(&info.extra);
450 tracing::info!(
451 durable_execution_arn = ?info.durable_execution_arn,
452 operation_id = ?info.operation_id,
453 parent_id = ?info.parent_id,
454 is_replay = info.is_replay,
455 extra = %extra_fields,
456 "{}",
457 message
458 );
459 }
460
461 fn warn(&self, message: &str, info: &LogInfo) {
462 let extra_fields = Self::format_extra_fields(&info.extra);
463 tracing::warn!(
464 durable_execution_arn = ?info.durable_execution_arn,
465 operation_id = ?info.operation_id,
466 parent_id = ?info.parent_id,
467 is_replay = info.is_replay,
468 extra = %extra_fields,
469 "{}",
470 message
471 );
472 }
473
474 fn error(&self, message: &str, info: &LogInfo) {
475 let extra_fields = Self::format_extra_fields(&info.extra);
476 tracing::error!(
477 durable_execution_arn = ?info.durable_execution_arn,
478 operation_id = ?info.operation_id,
479 parent_id = ?info.parent_id,
480 is_replay = info.is_replay,
481 extra = %extra_fields,
482 "{}",
483 message
484 );
485 }
486}
487
/// Configuration for replay-aware logging behavior.
///
/// This configuration controls how the `ReplayAwareLogger` handles log messages
/// during replay. Users can choose to suppress all logs during replay, allow
/// only certain log levels, or log all messages regardless of replay status.
///
/// The configuration only matters for messages whose `LogInfo::is_replay` flag
/// is `true`; `ReplayAwareLogger` always forwards non-replay messages.
///
/// # Requirements
///
/// - 16.6: THE Logging_System SHALL support replay-aware logging that can suppress logs during replay
/// - 16.7: THE Logging_System SHALL allow users to configure replay logging behavior
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ReplayLoggingConfig {
    /// Suppress all logs during replay (default).
    ///
    /// When in replay mode, no log messages will be emitted. This is useful
    /// to reduce noise in logs when replaying previously executed operations.
    #[default]
    SuppressAll,

    /// Allow all logs during replay.
    ///
    /// Log messages will be emitted regardless of replay status. The `is_replay`
    /// flag in `LogInfo` can still be used to distinguish replay logs.
    AllowAll,

    /// Allow only error logs during replay.
    ///
    /// Only error-level messages will be emitted during replay. This is useful
    /// when you want to see errors but suppress informational messages.
    ErrorsOnly,

    /// Allow error and warning logs during replay.
    ///
    /// Error and warning-level messages will be emitted during replay.
    /// Debug and info messages will be suppressed.
    WarningsAndErrors,
}
525
/// A logger wrapper that can suppress logs during replay based on configuration.
///
/// `ReplayAwareLogger` wraps any `Logger` implementation and adds replay-aware
/// behavior. When the `is_replay` flag in `LogInfo` is `true`, the logger will
/// suppress or allow logs based on the configured `ReplayLoggingConfig`.
/// Messages whose `is_replay` flag is `false` are always forwarded to the
/// wrapped logger.
///
/// # Example
///
/// ```rust
/// use durable_execution_sdk::{TracingLogger, ReplayAwareLogger, ReplayLoggingConfig};
/// use std::sync::Arc;
///
/// // Create a replay-aware logger that suppresses all logs during replay
/// let logger = ReplayAwareLogger::new(
///     Arc::new(TracingLogger),
///     ReplayLoggingConfig::SuppressAll,
/// );
///
/// // Create a replay-aware logger that allows errors during replay
/// let logger_with_errors = ReplayAwareLogger::new(
///     Arc::new(TracingLogger),
///     ReplayLoggingConfig::ErrorsOnly,
/// );
/// ```
///
/// # Requirements
///
/// - 16.6: THE Logging_System SHALL support replay-aware logging that can suppress logs during replay
/// - 16.7: THE Logging_System SHALL allow users to configure replay logging behavior
pub struct ReplayAwareLogger {
    /// The underlying logger to delegate to
    inner: Arc<dyn Logger>,
    /// Configuration for replay logging behavior; fixed at construction
    /// (no setter is defined on this type)
    config: ReplayLoggingConfig,
}

// Implement Sealed for ReplayAwareLogger to allow it to implement Logger
// (`Logger` lists `Sealed` as a supertrait).
impl Sealed for ReplayAwareLogger {}
564
565impl ReplayAwareLogger {
566 /// Creates a new `ReplayAwareLogger` with the specified configuration.
567 ///
568 /// # Arguments
569 ///
570 /// * `inner` - The underlying logger to delegate to
571 /// * `config` - Configuration for replay logging behavior
572 ///
573 /// # Example
574 ///
575 /// ```rust
576 /// use durable_execution_sdk::{TracingLogger, ReplayAwareLogger, ReplayLoggingConfig};
577 /// use std::sync::Arc;
578 ///
579 /// let logger = ReplayAwareLogger::new(
580 /// Arc::new(TracingLogger),
581 /// ReplayLoggingConfig::SuppressAll,
582 /// );
583 /// ```
584 pub fn new(inner: Arc<dyn Logger>, config: ReplayLoggingConfig) -> Self {
585 Self { inner, config }
586 }
587
588 /// Creates a new `ReplayAwareLogger` that suppresses all logs during replay.
589 ///
590 /// This is a convenience constructor for the most common use case.
591 ///
592 /// # Arguments
593 ///
594 /// * `inner` - The underlying logger to delegate to
595 pub fn suppress_replay(inner: Arc<dyn Logger>) -> Self {
596 Self::new(inner, ReplayLoggingConfig::SuppressAll)
597 }
598
599 /// Creates a new `ReplayAwareLogger` that allows all logs during replay.
600 ///
601 /// # Arguments
602 ///
603 /// * `inner` - The underlying logger to delegate to
604 pub fn allow_all(inner: Arc<dyn Logger>) -> Self {
605 Self::new(inner, ReplayLoggingConfig::AllowAll)
606 }
607
608 /// Returns the current replay logging configuration.
609 pub fn config(&self) -> ReplayLoggingConfig {
610 self.config
611 }
612
613 /// Returns a reference to the underlying logger.
614 pub fn inner(&self) -> &Arc<dyn Logger> {
615 &self.inner
616 }
617
618 /// Checks if a log at the given level should be suppressed during replay.
619 fn should_suppress(&self, info: &LogInfo, level: LogLevel) -> bool {
620 if !info.is_replay {
621 return false;
622 }
623
624 match self.config {
625 ReplayLoggingConfig::SuppressAll => true,
626 ReplayLoggingConfig::AllowAll => false,
627 ReplayLoggingConfig::ErrorsOnly => level != LogLevel::Error,
628 ReplayLoggingConfig::WarningsAndErrors => {
629 level != LogLevel::Error && level != LogLevel::Warn
630 }
631 }
632 }
633}
634
/// Internal enum for log levels used by ReplayAwareLogger.
///
/// Identifies which `Logger` method a message arrived through so that the
/// replay suppression check can compare it against the configured policy.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum LogLevel {
    /// Message passed to `Logger::debug`
    Debug,
    /// Message passed to `Logger::info`
    Info,
    /// Message passed to `Logger::warn`
    Warn,
    /// Message passed to `Logger::error`
    Error,
}
643
644impl std::fmt::Debug for ReplayAwareLogger {
645 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
646 f.debug_struct("ReplayAwareLogger")
647 .field("config", &self.config)
648 .finish()
649 }
650}
651
652impl Logger for ReplayAwareLogger {
653 fn debug(&self, message: &str, info: &LogInfo) {
654 if !self.should_suppress(info, LogLevel::Debug) {
655 self.inner.debug(message, info);
656 }
657 }
658
659 fn info(&self, message: &str, info: &LogInfo) {
660 if !self.should_suppress(info, LogLevel::Info) {
661 self.inner.info(message, info);
662 }
663 }
664
665 fn warn(&self, message: &str, info: &LogInfo) {
666 if !self.should_suppress(info, LogLevel::Warn) {
667 self.inner.warn(message, info);
668 }
669 }
670
671 fn error(&self, message: &str, info: &LogInfo) {
672 if !self.should_suppress(info, LogLevel::Error) {
673 self.inner.error(message, info);
674 }
675 }
676}
677
/// A custom logger that delegates to user-provided closures.
///
/// This struct allows users to provide custom logging behavior without
/// implementing the sealed `Logger` trait directly. Each log level can
/// have its own closure for handling log messages.
///
/// The fields are private; values are created through the `custom_logger`
/// factory function.
///
/// # Example
///
/// ```rust
/// use durable_execution_sdk::context::{custom_logger, LogInfo};
/// use std::sync::Arc;
///
/// // Create a custom logger that prints to stdout
/// let logger = custom_logger(
///     |msg, info| println!("[DEBUG] {}: {:?}", msg, info),
///     |msg, info| println!("[INFO] {}: {:?}", msg, info),
///     |msg, info| println!("[WARN] {}: {:?}", msg, info),
///     |msg, info| println!("[ERROR] {}: {:?}", msg, info),
/// );
/// ```
///
/// # Requirements
///
/// - 3.6: THE SDK SHALL provide factory functions or builders for users who need custom behavior
pub struct CustomLogger<D, I, W, E>
where
    D: Fn(&str, &LogInfo) + Send + Sync,
    I: Fn(&str, &LogInfo) + Send + Sync,
    W: Fn(&str, &LogInfo) + Send + Sync,
    E: Fn(&str, &LogInfo) + Send + Sync,
{
    /// Handler invoked for debug-level messages
    debug_fn: D,
    /// Handler invoked for info-level messages
    info_fn: I,
    /// Handler invoked for warning-level messages
    warn_fn: W,
    /// Handler invoked for error-level messages
    error_fn: E,
}

// Implement Sealed for CustomLogger to allow it to implement Logger
// (`Logger` lists `Sealed` as a supertrait).
impl<D, I, W, E> Sealed for CustomLogger<D, I, W, E>
where
    D: Fn(&str, &LogInfo) + Send + Sync,
    I: Fn(&str, &LogInfo) + Send + Sync,
    W: Fn(&str, &LogInfo) + Send + Sync,
    E: Fn(&str, &LogInfo) + Send + Sync,
{
}
724
725impl<D, I, W, E> Logger for CustomLogger<D, I, W, E>
726where
727 D: Fn(&str, &LogInfo) + Send + Sync,
728 I: Fn(&str, &LogInfo) + Send + Sync,
729 W: Fn(&str, &LogInfo) + Send + Sync,
730 E: Fn(&str, &LogInfo) + Send + Sync,
731{
732 fn debug(&self, message: &str, info: &LogInfo) {
733 (self.debug_fn)(message, info);
734 }
735
736 fn info(&self, message: &str, info: &LogInfo) {
737 (self.info_fn)(message, info);
738 }
739
740 fn warn(&self, message: &str, info: &LogInfo) {
741 (self.warn_fn)(message, info);
742 }
743
744 fn error(&self, message: &str, info: &LogInfo) {
745 (self.error_fn)(message, info);
746 }
747}
748
749impl<D, I, W, E> std::fmt::Debug for CustomLogger<D, I, W, E>
750where
751 D: Fn(&str, &LogInfo) + Send + Sync,
752 I: Fn(&str, &LogInfo) + Send + Sync,
753 W: Fn(&str, &LogInfo) + Send + Sync,
754 E: Fn(&str, &LogInfo) + Send + Sync,
755{
756 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
757 f.debug_struct("CustomLogger").finish()
758 }
759}
760
/// Creates a custom logger with user-provided closures for each log level.
///
/// This factory function allows users to create custom logging behavior without
/// implementing the sealed `Logger` trait directly. Each log level has its own
/// closure that receives the log message and context information.
///
/// The closures are stored as given and invoked through a shared reference,
/// hence the `Fn` (not `FnMut`) bound together with `Send + Sync`.
///
/// # Arguments
///
/// * `debug_fn` - Closure called for debug-level messages
/// * `info_fn` - Closure called for info-level messages
/// * `warn_fn` - Closure called for warning-level messages
/// * `error_fn` - Closure called for error-level messages
///
/// # Returns
///
/// A [`CustomLogger`] that owns the four closures.
///
/// # Example
///
/// ```rust
/// use durable_execution_sdk::context::{custom_logger, LogInfo};
/// use std::sync::Arc;
///
/// // Create a custom logger that prints to stdout
/// let logger = custom_logger(
///     |msg, info| println!("[DEBUG] {}: {:?}", msg, info),
///     |msg, info| println!("[INFO] {}: {:?}", msg, info),
///     |msg, info| println!("[WARN] {}: {:?}", msg, info),
///     |msg, info| println!("[ERROR] {}: {:?}", msg, info),
/// );
///
/// // Use with Arc for sharing across contexts
/// let shared_logger = Arc::new(logger);
/// ```
///
/// # Requirements
///
/// - 3.6: THE SDK SHALL provide factory functions or builders for users who need custom behavior
pub fn custom_logger<D, I, W, E>(
    debug_fn: D,
    info_fn: I,
    warn_fn: W,
    error_fn: E,
) -> CustomLogger<D, I, W, E>
where
    D: Fn(&str, &LogInfo) + Send + Sync,
    I: Fn(&str, &LogInfo) + Send + Sync,
    W: Fn(&str, &LogInfo) + Send + Sync,
    E: Fn(&str, &LogInfo) + Send + Sync,
{
    CustomLogger {
        debug_fn,
        info_fn,
        warn_fn,
        error_fn,
    }
}
814
815/// Creates a simple custom logger that uses a single closure for all log levels.
816///
817/// This is a convenience function for cases where you want the same handling
818/// for all log levels. The closure receives the log level as a string, the
819/// message, and the log info.
820///
821/// # Arguments
822///
823/// * `log_fn` - Closure called for all log messages. Receives (level, message, info).
824///
825/// # Example
826///
827/// ```rust
828/// use durable_execution_sdk::context::{simple_custom_logger, LogInfo};
829/// use std::sync::Arc;
830///
831/// // Create a simple logger that prints all messages with their level
832/// let logger = simple_custom_logger(|level, msg, info| {
833/// println!("[{}] {}: {:?}", level, msg, info);
834/// });
835///
836/// // Use with Arc for sharing across contexts
837/// let shared_logger = Arc::new(logger);
838/// ```
839///
840/// # Requirements
841///
842/// - 3.6: THE SDK SHALL provide factory functions or builders for users who need custom behavior
843pub fn simple_custom_logger<F>(log_fn: F) -> impl Logger
844where
845 F: Fn(&str, &str, &LogInfo) + Send + Sync + Clone + 'static,
846{
847 let debug_fn = log_fn.clone();
848 let info_fn = log_fn.clone();
849 let warn_fn = log_fn.clone();
850 let error_fn = log_fn;
851
852 custom_logger(
853 move |msg, info| debug_fn("DEBUG", msg, info),
854 move |msg, info| info_fn("INFO", msg, info),
855 move |msg, info| warn_fn("WARN", msg, info),
856 move |msg, info| error_fn("ERROR", msg, info),
857 )
858}
859
/// The main context for durable execution operations.
///
/// `DurableContext` provides all the durable operations that user code
/// can use to build reliable workflows. It handles checkpointing, replay,
/// and state management automatically.
///
/// # Thread Safety
///
/// `DurableContext` is `Send + Sync` and can be safely shared across
/// async tasks. The internal state uses appropriate synchronization
/// primitives for concurrent access.
///
/// # Example
///
/// ```rust,ignore
/// async fn my_workflow(ctx: DurableContext) -> Result<String, DurableError> {
///     // Execute a step (automatically checkpointed)
///     let result = ctx.step(|_| Ok("hello".to_string()), None).await?;
///
///     // Wait for 5 seconds (suspends Lambda, resumes later)
///     ctx.wait(Duration::from_seconds(5), None).await?;
///
///     Ok(result)
/// }
/// ```
pub struct DurableContext {
    /// The execution state manager
    state: Arc<ExecutionState>,
    /// The Lambda context (if running in Lambda)
    lambda_context: Option<lambda_runtime::Context>,
    /// The parent operation ID (None for root context)
    parent_id: Option<String>,
    /// Operation ID generator for this context
    id_generator: Arc<OperationIdGenerator>,
    /// Logger for structured logging (wrapped in RwLock for runtime reconfiguration).
    /// The outer `Arc` lets several contexts share one lock, so a logger swap
    /// made through any of them is seen by all holders of the same handle.
    logger: Arc<RwLock<Arc<dyn Logger>>>,
}

// Ensure DurableContext is Send + Sync
// (compile-time check: the build fails if a field ever breaks either bound).
static_assertions::assert_impl_all!(DurableContext: Send, Sync);
900
901impl DurableContext {
902 /// Creates a new DurableContext with the given state.
903 ///
904 /// # Arguments
905 ///
906 /// * `state` - The execution state manager
907 pub fn new(state: Arc<ExecutionState>) -> Self {
908 let base_id = state.durable_execution_arn().to_string();
909 Self {
910 state,
911 lambda_context: None,
912 parent_id: None,
913 id_generator: Arc::new(OperationIdGenerator::new(base_id)),
914 logger: Arc::new(RwLock::new(Arc::new(TracingLogger))),
915 }
916 }
917
918 /// Creates a DurableContext from a Lambda context.
919 ///
920 /// This is the primary factory method used when running in AWS Lambda.
921 ///
922 /// # Arguments
923 ///
924 /// * `state` - The execution state manager
925 /// * `lambda_context` - The Lambda runtime context
926 pub fn from_lambda_context(
927 state: Arc<ExecutionState>,
928 lambda_context: lambda_runtime::Context,
929 ) -> Self {
930 let base_id = state.durable_execution_arn().to_string();
931 Self {
932 state,
933 lambda_context: Some(lambda_context),
934 parent_id: None,
935 id_generator: Arc::new(OperationIdGenerator::new(base_id)),
936 logger: Arc::new(RwLock::new(Arc::new(TracingLogger))),
937 }
938 }
939
940 /// Creates a child context for nested operations.
941 ///
942 /// Child contexts have their own step counter but share the same
943 /// execution state. Operations in a child context are tracked
944 /// with the parent's operation ID.
945 ///
946 /// # Arguments
947 ///
948 /// * `parent_operation_id` - The operation ID of the parent operation
949 pub fn create_child_context(&self, parent_operation_id: impl Into<String>) -> Self {
950 let parent_id = parent_operation_id.into();
951 Self {
952 state: self.state.clone(),
953 lambda_context: self.lambda_context.clone(),
954 parent_id: Some(parent_id.clone()),
955 id_generator: Arc::new(OperationIdGenerator::new(parent_id)),
956 logger: self.logger.clone(),
957 }
958 }
959
960 /// Sets a custom logger for this context.
961 ///
962 /// # Arguments
963 ///
964 /// * `logger` - The logger implementation to use
965 pub fn set_logger(&mut self, logger: Arc<dyn Logger>) {
966 *self.logger.write().unwrap() = logger;
967 }
968
969 /// Returns a new context with the specified logger.
970 ///
971 /// # Arguments
972 ///
973 /// * `logger` - The logger implementation to use
974 pub fn with_logger(self, logger: Arc<dyn Logger>) -> Self {
975 *self.logger.write().unwrap() = logger;
976 self
977 }
978
979 /// Reconfigures the logger for this context at runtime.
980 ///
981 /// All subsequent log calls on this context (and any clones sharing the
982 /// same underlying `RwLock`) will use the new logger.
983 ///
984 /// # Arguments
985 ///
986 /// * `logger` - The new logger implementation to use
987 ///
988 /// # Requirements
989 ///
990 /// - 13.1: configure_logger swaps the logger, subsequent calls use new logger
991 /// - 13.2: Original logger used when configure_logger not called
992 pub fn configure_logger(&self, logger: Arc<dyn Logger>) {
993 *self.logger.write().unwrap() = logger;
994 }
995
996 /// Returns a reference to the execution state.
997 pub fn state(&self) -> &Arc<ExecutionState> {
998 &self.state
999 }
1000
1001 /// Returns the durable execution ARN.
1002 pub fn durable_execution_arn(&self) -> &str {
1003 self.state.durable_execution_arn()
1004 }
1005
1006 /// Returns the parent operation ID, if any.
1007 pub fn parent_id(&self) -> Option<&str> {
1008 self.parent_id.as_deref()
1009 }
1010
1011 /// Returns a reference to the Lambda context, if available.
1012 pub fn lambda_context(&self) -> Option<&lambda_runtime::Context> {
1013 self.lambda_context.as_ref()
1014 }
1015
1016 /// Returns a reference to the logger.
1017 pub fn logger(&self) -> Arc<dyn Logger> {
1018 self.logger.read().unwrap().clone()
1019 }
1020
1021 /// Generates the next operation ID for this context.
1022 ///
1023 /// This method is thread-safe and will generate unique, deterministic
1024 /// IDs based on the context's base ID and step counter.
1025 pub fn next_operation_id(&self) -> String {
1026 self.id_generator.next_id()
1027 }
1028
1029 /// Creates an OperationIdentifier for the next operation.
1030 ///
1031 /// # Arguments
1032 ///
1033 /// * `name` - Optional human-readable name for the operation
1034 pub fn next_operation_identifier(&self, name: Option<String>) -> OperationIdentifier {
1035 OperationIdentifier::new(self.next_operation_id(), self.parent_id.clone(), name)
1036 }
1037
1038 /// Returns the current step counter value without incrementing.
1039 pub fn current_step_counter(&self) -> u64 {
1040 self.id_generator.current_counter()
1041 }
1042
1043 /// Creates log info for the current context.
1044 ///
1045 /// The returned `LogInfo` includes the current replay status from the
1046 /// execution state, allowing loggers to distinguish between fresh
1047 /// executions and replayed operations.
1048 ///
1049 /// # Requirements
1050 ///
1051 /// - 16.6: THE Logging_System SHALL support replay-aware logging that can suppress logs during replay
1052 pub fn create_log_info(&self) -> LogInfo {
1053 let mut info = LogInfo::new(self.durable_execution_arn());
1054 if let Some(ref parent_id) = self.parent_id {
1055 info = info.with_parent_id(parent_id);
1056 }
1057 // Include replay status from execution state
1058 info = info.with_replay(self.state.is_replay());
1059 info
1060 }
1061
1062 /// Creates log info with an operation ID.
1063 ///
1064 /// The returned `LogInfo` includes the current replay status from the
1065 /// execution state, allowing loggers to distinguish between fresh
1066 /// executions and replayed operations.
1067 ///
1068 /// # Requirements
1069 ///
1070 /// - 16.6: THE Logging_System SHALL support replay-aware logging that can suppress logs during replay
1071 pub fn create_log_info_with_operation(&self, operation_id: &str) -> LogInfo {
1072 self.create_log_info().with_operation_id(operation_id)
1073 }
1074
1075 /// Creates log info with explicit replay status.
1076 ///
1077 /// This method allows callers to explicitly set the replay status,
1078 /// which is useful when the operation-specific replay status differs
1079 /// from the global execution state replay status.
1080 ///
1081 /// # Arguments
1082 ///
1083 /// * `operation_id` - The operation ID to include in the log info
1084 /// * `is_replay` - Whether this specific operation is being replayed
1085 ///
1086 /// # Requirements
1087 ///
1088 /// - 16.6: THE Logging_System SHALL support replay-aware logging that can suppress logs during replay
1089 pub fn create_log_info_with_replay(&self, operation_id: &str, is_replay: bool) -> LogInfo {
1090 let mut info = LogInfo::new(self.durable_execution_arn());
1091 if let Some(ref parent_id) = self.parent_id {
1092 info = info.with_parent_id(parent_id);
1093 }
1094 info.with_operation_id(operation_id).with_replay(is_replay)
1095 }
1096
1097 // ========================================================================
1098 // Simplified Logging API
1099 // ========================================================================
1100
1101 /// Logs a message at INFO level with automatic context.
1102 ///
1103 /// This method automatically includes the durable_execution_arn and parent_id
1104 /// in the log output without requiring the caller to specify them.
1105 ///
1106 /// # Arguments
1107 ///
1108 /// * `message` - The message to log
1109 ///
1110 /// # Example
1111 ///
1112 /// ```rust,ignore
1113 /// ctx.log_info("Processing order started");
1114 /// ```
1115 ///
1116 /// # Requirements
1117 ///
1118 /// - 4.1: THE DurableContext SHALL provide a log_info method that logs at INFO level with automatic context
1119 /// - 4.5: THE logging methods SHALL automatically include durable_execution_arn, operation_id, and parent_id
1120 pub fn log_info(&self, message: &str) {
1121 self.log_with_level(LogLevel::Info, message, &[]);
1122 }
1123
1124 /// Logs a message at INFO level with extra fields.
1125 ///
1126 /// This method automatically includes the durable_execution_arn and parent_id
1127 /// in the log output, plus any additional fields specified.
1128 ///
1129 /// # Arguments
1130 ///
1131 /// * `message` - The message to log
1132 /// * `fields` - Additional key-value pairs to include in the log
1133 ///
1134 /// # Example
1135 ///
1136 /// ```rust,ignore
1137 /// ctx.log_info_with("Processing order", &[("order_id", "123"), ("amount", "99.99")]);
1138 /// ```
1139 ///
1140 /// # Requirements
1141 ///
1142 /// - 4.1: THE DurableContext SHALL provide a log_info method that logs at INFO level with automatic context
1143 /// - 4.6: THE logging methods SHALL support additional structured fields via a builder pattern or variadic arguments
1144 pub fn log_info_with(&self, message: &str, fields: &[(&str, &str)]) {
1145 self.log_with_level(LogLevel::Info, message, fields);
1146 }
1147
1148 /// Logs a message at DEBUG level with automatic context.
1149 ///
1150 /// This method automatically includes the durable_execution_arn and parent_id
1151 /// in the log output without requiring the caller to specify them.
1152 ///
1153 /// # Arguments
1154 ///
1155 /// * `message` - The message to log
1156 ///
1157 /// # Example
1158 ///
1159 /// ```rust,ignore
1160 /// ctx.log_debug("Entering validation step");
1161 /// ```
1162 ///
1163 /// # Requirements
1164 ///
1165 /// - 4.2: THE DurableContext SHALL provide a log_debug method that logs at DEBUG level with automatic context
1166 /// - 4.5: THE logging methods SHALL automatically include durable_execution_arn, operation_id, and parent_id
1167 pub fn log_debug(&self, message: &str) {
1168 self.log_with_level(LogLevel::Debug, message, &[]);
1169 }
1170
1171 /// Logs a message at DEBUG level with extra fields.
1172 ///
1173 /// This method automatically includes the durable_execution_arn and parent_id
1174 /// in the log output, plus any additional fields specified.
1175 ///
1176 /// # Arguments
1177 ///
1178 /// * `message` - The message to log
1179 /// * `fields` - Additional key-value pairs to include in the log
1180 ///
1181 /// # Example
1182 ///
1183 /// ```rust,ignore
1184 /// ctx.log_debug_with("Variable state", &[("x", "42"), ("y", "100")]);
1185 /// ```
1186 ///
1187 /// # Requirements
1188 ///
1189 /// - 4.2: THE DurableContext SHALL provide a log_debug method that logs at DEBUG level with automatic context
1190 /// - 4.6: THE logging methods SHALL support additional structured fields via a builder pattern or variadic arguments
1191 pub fn log_debug_with(&self, message: &str, fields: &[(&str, &str)]) {
1192 self.log_with_level(LogLevel::Debug, message, fields);
1193 }
1194
1195 /// Logs a message at WARN level with automatic context.
1196 ///
1197 /// This method automatically includes the durable_execution_arn and parent_id
1198 /// in the log output without requiring the caller to specify them.
1199 ///
1200 /// # Arguments
1201 ///
1202 /// * `message` - The message to log
1203 ///
1204 /// # Example
1205 ///
1206 /// ```rust,ignore
1207 /// ctx.log_warn("Retry attempt 3 of 5");
1208 /// ```
1209 ///
1210 /// # Requirements
1211 ///
1212 /// - 4.3: THE DurableContext SHALL provide a log_warn method that logs at WARN level with automatic context
1213 /// - 4.5: THE logging methods SHALL automatically include durable_execution_arn, operation_id, and parent_id
1214 pub fn log_warn(&self, message: &str) {
1215 self.log_with_level(LogLevel::Warn, message, &[]);
1216 }
1217
1218 /// Logs a message at WARN level with extra fields.
1219 ///
1220 /// This method automatically includes the durable_execution_arn and parent_id
1221 /// in the log output, plus any additional fields specified.
1222 ///
1223 /// # Arguments
1224 ///
1225 /// * `message` - The message to log
1226 /// * `fields` - Additional key-value pairs to include in the log
1227 ///
1228 /// # Example
1229 ///
1230 /// ```rust,ignore
1231 /// ctx.log_warn_with("Rate limit approaching", &[("current", "95"), ("limit", "100")]);
1232 /// ```
1233 ///
1234 /// # Requirements
1235 ///
1236 /// - 4.3: THE DurableContext SHALL provide a log_warn method that logs at WARN level with automatic context
1237 /// - 4.6: THE logging methods SHALL support additional structured fields via a builder pattern or variadic arguments
1238 pub fn log_warn_with(&self, message: &str, fields: &[(&str, &str)]) {
1239 self.log_with_level(LogLevel::Warn, message, fields);
1240 }
1241
1242 /// Logs a message at ERROR level with automatic context.
1243 ///
1244 /// This method automatically includes the durable_execution_arn and parent_id
1245 /// in the log output without requiring the caller to specify them.
1246 ///
1247 /// # Arguments
1248 ///
1249 /// * `message` - The message to log
1250 ///
1251 /// # Example
1252 ///
1253 /// ```rust,ignore
1254 /// ctx.log_error("Failed to process payment");
1255 /// ```
1256 ///
1257 /// # Requirements
1258 ///
1259 /// - 4.4: THE DurableContext SHALL provide a log_error method that logs at ERROR level with automatic context
1260 /// - 4.5: THE logging methods SHALL automatically include durable_execution_arn, operation_id, and parent_id
1261 pub fn log_error(&self, message: &str) {
1262 self.log_with_level(LogLevel::Error, message, &[]);
1263 }
1264
1265 /// Logs a message at ERROR level with extra fields.
1266 ///
1267 /// This method automatically includes the durable_execution_arn and parent_id
1268 /// in the log output, plus any additional fields specified.
1269 ///
1270 /// # Arguments
1271 ///
1272 /// * `message` - The message to log
1273 /// * `fields` - Additional key-value pairs to include in the log
1274 ///
1275 /// # Example
1276 ///
1277 /// ```rust,ignore
1278 /// ctx.log_error_with("Payment failed", &[("error_code", "INSUFFICIENT_FUNDS"), ("amount", "150.00")]);
1279 /// ```
1280 ///
1281 /// # Requirements
1282 ///
1283 /// - 4.4: THE DurableContext SHALL provide a log_error method that logs at ERROR level with automatic context
1284 /// - 4.6: THE logging methods SHALL support additional structured fields via a builder pattern or variadic arguments
1285 pub fn log_error_with(&self, message: &str, fields: &[(&str, &str)]) {
1286 self.log_with_level(LogLevel::Error, message, fields);
1287 }
1288
1289 /// Internal helper method to log at a specific level with optional extra fields.
1290 ///
1291 /// This method creates a LogInfo with automatic context (durable_execution_arn, parent_id)
1292 /// and any additional fields, then delegates to the configured logger.
1293 ///
1294 /// # Arguments
1295 ///
1296 /// * `level` - The log level
1297 /// * `message` - The message to log
1298 /// * `extra` - Additional key-value pairs to include
1299 fn log_with_level(&self, level: LogLevel, message: &str, extra: &[(&str, &str)]) {
1300 let mut log_info = self.create_log_info();
1301
1302 for (key, value) in extra {
1303 log_info = log_info.with_extra(*key, *value);
1304 }
1305
1306 let logger = self.logger.read().unwrap();
1307 match level {
1308 LogLevel::Debug => logger.debug(message, &log_info),
1309 LogLevel::Info => logger.info(message, &log_info),
1310 LogLevel::Warn => logger.warn(message, &log_info),
1311 LogLevel::Error => logger.error(message, &log_info),
1312 }
1313 }
1314
1315 /// Returns the original user input from the EXECUTION operation.
1316 ///
1317 /// This method deserializes the input payload from the EXECUTION operation's
1318 /// ExecutionDetails.InputPayload field into the requested type.
1319 ///
1320 /// # Type Parameters
1321 ///
1322 /// * `T` - The type to deserialize the input into. Must implement `DeserializeOwned`.
1323 ///
1324 /// # Returns
1325 ///
1326 /// `Ok(T)` if the input exists and can be deserialized, or a `DurableError` if:
1327 /// - No EXECUTION operation exists
1328 /// - No input payload is available
1329 /// - Deserialization fails
1330 ///
1331 /// # Example
1332 ///
1333 /// ```rust,ignore
1334 /// #[derive(Deserialize)]
1335 /// struct OrderEvent {
1336 /// order_id: String,
1337 /// amount: f64,
1338 /// }
1339 ///
1340 /// async fn my_workflow(ctx: DurableContext) -> Result<(), DurableError> {
1341 /// // Get the original input that started this execution
1342 /// let event: OrderEvent = ctx.get_original_input()?;
1343 /// println!("Processing order: {}", event.order_id);
1344 /// Ok(())
1345 /// }
1346 /// ```
1347 ///
1348 /// # Requirements
1349 ///
1350 /// - 1.11: THE DurableContext SHALL provide access to the original user input from the EXECUTION operation
1351 /// - 19.2: THE EXECUTION_Operation SHALL provide access to original user input from ExecutionDetails.InputPayload
1352 /// - 5.1: THE SDK SHALL provide a `DurableResult<T>` type alias for `Result<T, DurableError>`
1353 pub fn get_original_input<T>(&self) -> DurableResult<T>
1354 where
1355 T: serde::de::DeserializeOwned,
1356 {
1357 // Get the raw input payload from the execution state
1358 let input_payload = self.state.get_original_input_raw().ok_or_else(|| {
1359 crate::error::DurableError::Validation {
1360 message: "No original input available. The EXECUTION operation may not exist or has no input payload.".to_string(),
1361 }
1362 })?;
1363
1364 // Deserialize the input payload to the requested type
1365 serde_json::from_str(input_payload).map_err(|e| crate::error::DurableError::SerDes {
1366 message: format!("Failed to deserialize original input: {}", e),
1367 })
1368 }
1369
1370 /// Returns the raw original user input as a string, if available.
1371 ///
1372 /// This method returns the raw JSON string from the EXECUTION operation's
1373 /// ExecutionDetails.InputPayload field without deserializing it.
1374 ///
1375 /// # Returns
1376 ///
1377 /// `Some(&str)` if the input exists, `None` otherwise.
1378 ///
1379 /// # Requirements
1380 ///
1381 /// - 19.2: THE EXECUTION_Operation SHALL provide access to original user input from ExecutionDetails.InputPayload
1382 pub fn get_original_input_raw(&self) -> Option<&str> {
1383 self.state.get_original_input_raw()
1384 }
1385
1386 /// Completes the execution with a successful result via checkpointing.
1387 ///
1388 /// This method checkpoints a SUCCEED action on the EXECUTION operation,
1389 /// which is useful for large results that exceed the Lambda response size limit (6MB).
1390 /// After calling this method, the Lambda function should return an empty result.
1391 ///
1392 /// # Arguments
1393 ///
1394 /// * `result` - The result to checkpoint. Must implement `Serialize`.
1395 ///
1396 /// # Returns
1397 ///
1398 /// `Ok(())` on success, or a `DurableError` if:
1399 /// - No EXECUTION operation exists
1400 /// - Serialization fails
1401 /// - The checkpoint fails
1402 ///
1403 /// # Example
1404 ///
1405 /// ```rust,ignore
1406 /// async fn my_workflow(ctx: DurableContext) -> Result<(), DurableError> {
1407 /// let large_result = compute_large_result().await?;
1408 ///
1409 /// // Check if result would exceed Lambda response limit
1410 /// if DurableExecutionInvocationOutput::would_exceed_max_size(&large_result) {
1411 /// // Checkpoint the result instead of returning it
1412 /// ctx.complete_execution_success(&large_result).await?;
1413 /// // Return empty result - the actual result is checkpointed
1414 /// return Ok(());
1415 /// }
1416 ///
1417 /// Ok(())
1418 /// }
1419 /// ```
1420 ///
1421 /// # Requirements
1422 ///
1423 /// - 19.3: THE EXECUTION_Operation SHALL support completing execution via SUCCEED action with result
1424 /// - 19.5: WHEN execution result exceeds response size limits, THE EXECUTION_Operation SHALL checkpoint the result and return empty Result field
1425 /// - 5.1: THE SDK SHALL provide a `DurableResult<T>` type alias for `Result<T, DurableError>`
1426 pub async fn complete_execution_success<T>(&self, result: &T) -> DurableResult<()>
1427 where
1428 T: serde::Serialize,
1429 {
1430 let serialized =
1431 serde_json::to_string(result).map_err(|e| crate::error::DurableError::SerDes {
1432 message: format!("Failed to serialize execution result: {}", e),
1433 })?;
1434
1435 self.state
1436 .complete_execution_success(Some(serialized))
1437 .await
1438 }
1439
1440 /// Completes the execution with a failure via checkpointing.
1441 ///
1442 /// This method checkpoints a FAIL action on the EXECUTION operation.
1443 /// After calling this method, the Lambda function should return a FAILED status.
1444 ///
1445 /// # Arguments
1446 ///
1447 /// * `error` - The error details to checkpoint
1448 ///
1449 /// # Returns
1450 ///
1451 /// `Ok(())` on success, or a `DurableError` if:
1452 /// - No EXECUTION operation exists
1453 /// - The checkpoint fails
1454 ///
1455 /// # Example
1456 ///
1457 /// ```rust,ignore
1458 /// async fn my_workflow(ctx: DurableContext) -> Result<(), DurableError> {
1459 /// if let Err(e) = process_order().await {
1460 /// // Checkpoint the failure
1461 /// ctx.complete_execution_failure(ErrorObject::new("ProcessingError", &e.to_string())).await?;
1462 /// return Err(DurableError::execution(&e.to_string()));
1463 /// }
1464 /// Ok(())
1465 /// }
1466 /// ```
1467 ///
1468 /// # Requirements
1469 ///
1470 /// - 19.4: THE EXECUTION_Operation SHALL support completing execution via FAIL action with error
1471 /// - 5.1: THE SDK SHALL provide a `DurableResult<T>` type alias for `Result<T, DurableError>`
1472 pub async fn complete_execution_failure(
1473 &self,
1474 error: crate::error::ErrorObject,
1475 ) -> DurableResult<()> {
1476 self.state.complete_execution_failure(error).await
1477 }
1478
1479 /// Completes the execution with a successful result, automatically handling large results.
1480 ///
1481 /// This method checks if the result would exceed the Lambda response size limit (6MB).
1482 /// If so, it checkpoints the result via the EXECUTION operation and returns `true`.
1483 /// If the result fits within the limit, it returns `false` and the caller should
1484 /// return the result normally.
1485 ///
1486 /// # Arguments
1487 ///
1488 /// * `result` - The result to potentially checkpoint. Must implement `Serialize`.
1489 ///
1490 /// # Returns
1491 ///
1492 /// `Ok(true)` if the result was checkpointed (caller should return empty result),
1493 /// `Ok(false)` if the result fits within limits (caller should return it normally),
1494 /// or a `DurableError` if checkpointing fails.
1495 ///
1496 /// # Example
1497 ///
1498 /// ```rust,ignore
1499 /// async fn my_workflow(ctx: DurableContext) -> Result<LargeResult, DurableError> {
1500 /// let result = compute_result().await?;
1501 ///
1502 /// // Automatically handle large results
1503 /// if ctx.complete_execution_if_large(&result).await? {
1504 /// // Result was checkpointed, return a placeholder
1505 /// // The actual result is stored in the EXECUTION operation
1506 /// return Ok(LargeResult::default());
1507 /// }
1508 ///
1509 /// // Result fits within limits, return normally
1510 /// Ok(result)
1511 /// }
1512 /// ```
1513 ///
1514 /// # Requirements
1515 ///
1516 /// - 19.5: WHEN execution result exceeds response size limits, THE EXECUTION_Operation SHALL checkpoint the result and return empty Result field
1517 /// - 5.1: THE SDK SHALL provide a `DurableResult<T>` type alias for `Result<T, DurableError>`
1518 pub async fn complete_execution_if_large<T>(&self, result: &T) -> DurableResult<bool>
1519 where
1520 T: serde::Serialize,
1521 {
1522 if crate::lambda::DurableExecutionInvocationOutput::would_exceed_max_size(result) {
1523 self.complete_execution_success(result).await?;
1524 Ok(true)
1525 } else {
1526 Ok(false)
1527 }
1528 }
1529
1530 // ========================================================================
1531 // Durable Operations
1532 // ========================================================================
1533
1534 /// Executes a step operation with automatic checkpointing.
1535 ///
1536 /// Steps are the fundamental unit of work in durable executions. Each step
1537 /// is checkpointed, allowing the workflow to resume from the last completed
1538 /// step after interruptions.
1539 ///
1540 /// # Arguments
1541 ///
1542 /// * `func` - The function to execute
1543 /// * `config` - Optional step configuration (retry strategy, semantics, serdes)
1544 ///
1545 /// # Returns
1546 ///
1547 /// The result of the step function, or an error if execution fails.
1548 ///
1549 /// # Example
1550 ///
1551 /// ```rust,ignore
1552 /// let result: i32 = ctx.step(|_step_ctx| {
1553 /// Ok(42)
1554 /// }, None).await?;
1555 /// ```
1556 ///
1557 /// # Requirements
1558 ///
1559 /// - 1.1: THE DurableContext SHALL provide a `step` method that executes a closure and checkpoints the result
1560 /// - 2.3: WHEN defining public APIs, THE SDK SHALL use trait aliases instead of repeated bound combinations
1561 /// - 5.1: THE SDK SHALL provide a `DurableResult<T>` type alias for `Result<T, DurableError>`
1562 pub async fn step<T, F>(
1563 &self,
1564 func: F,
1565 config: Option<crate::config::StepConfig>,
1566 ) -> DurableResult<T>
1567 where
1568 T: DurableValue,
1569 F: StepFn<T>,
1570 {
1571 let op_id = self.next_operation_identifier(None);
1572 let config = config.unwrap_or_default();
1573
1574 let logger = self.logger.read().unwrap().clone();
1575 let result =
1576 crate::handlers::step_handler(func, &self.state, &op_id, &config, &logger).await;
1577
1578 // Track replay after completion
1579 if result.is_ok() {
1580 self.state.track_replay(&op_id.operation_id).await;
1581 }
1582
1583 result
1584 }
1585
1586 /// Executes a named step operation with automatic checkpointing.
1587 ///
1588 /// Same as `step`, but allows specifying a human-readable name for the operation.
1589 ///
1590 /// # Arguments
1591 ///
1592 /// * `name` - Human-readable name for the step
1593 /// * `func` - The function to execute
1594 /// * `config` - Optional step configuration
1595 ///
1596 /// # Example
1597 ///
1598 /// ```rust,ignore
1599 /// let result: i32 = ctx.step_named("validate_input", |_step_ctx| {
1600 /// Ok(42)
1601 /// }, None).await?;
1602 /// ```
1603 ///
1604 /// # Requirements
1605 ///
1606 /// - 2.3: WHEN defining public APIs, THE SDK SHALL use trait aliases instead of repeated bound combinations
1607 /// - 5.1: THE SDK SHALL provide a `DurableResult<T>` type alias for `Result<T, DurableError>`
1608 pub async fn step_named<T, F>(
1609 &self,
1610 name: &str,
1611 func: F,
1612 config: Option<crate::config::StepConfig>,
1613 ) -> DurableResult<T>
1614 where
1615 T: DurableValue,
1616 F: StepFn<T>,
1617 {
1618 let op_id = self.next_operation_identifier(Some(name.to_string()));
1619 let config = config.unwrap_or_default();
1620
1621 let logger = self.logger.read().unwrap().clone();
1622 let result =
1623 crate::handlers::step_handler(func, &self.state, &op_id, &config, &logger).await;
1624
1625 // Track replay after completion
1626 if result.is_ok() {
1627 self.state.track_replay(&op_id.operation_id).await;
1628 }
1629
1630 result
1631 }
1632
1633 /// Pauses execution for a specified duration.
1634 ///
1635 /// Wait operations suspend the Lambda execution and resume after the
1636 /// specified duration has elapsed. This is efficient because it doesn't
1637 /// block Lambda resources during the wait.
1638 ///
1639 /// # Arguments
1640 ///
1641 /// * `duration` - The duration to wait (must be at least 1 second)
1642 /// * `name` - Optional human-readable name for the operation
1643 ///
1644 /// # Returns
1645 ///
1646 /// Ok(()) when the wait has elapsed, or an error if validation fails.
1647 ///
1648 /// # Example
1649 ///
1650 /// ```rust,ignore
1651 /// // Wait for 5 seconds
1652 /// ctx.wait(Duration::from_seconds(5), None).await?;
1653 ///
1654 /// // Wait with a name
1655 /// ctx.wait(Duration::from_minutes(1), Some("wait_for_processing")).await?;
1656 /// ```
1657 ///
1658 /// # Requirements
1659 ///
1660 /// - 1.2: THE DurableContext SHALL provide a `wait` method that pauses execution for a specified Duration
1661 /// - 5.1: THE SDK SHALL provide a `DurableResult<T>` type alias for `Result<T, DurableError>`
1662 pub async fn wait(
1663 &self,
1664 duration: crate::duration::Duration,
1665 name: Option<&str>,
1666 ) -> DurableResult<()> {
1667 let op_id = self.next_operation_identifier(name.map(|s| s.to_string()));
1668
1669 let logger = self.logger.read().unwrap().clone();
1670 let result = crate::handlers::wait_handler(duration, &self.state, &op_id, &logger).await;
1671
1672 // Track replay after completion (only if not suspended)
1673 if result.is_ok() {
1674 self.state.track_replay(&op_id.operation_id).await;
1675 }
1676
1677 result
1678 }
1679
1680 /// Cancels an active wait operation.
1681 ///
1682 /// This method allows cancelling a wait operation that was previously started.
1683 /// If the wait has already completed (succeeded, failed, or timed out), this
1684 /// method will return Ok(()) without making any changes.
1685 ///
1686 /// # Arguments
1687 ///
1688 /// * `operation_id` - The operation ID of the wait to cancel
1689 ///
1690 /// # Returns
1691 ///
1692 /// Ok(()) if the wait was cancelled or was already completed, or an error if:
1693 /// - The operation doesn't exist
1694 /// - The operation is not a WAIT operation
1695 /// - The checkpoint fails
1696 ///
1697 /// # Example
1698 ///
1699 /// ```rust,ignore
1700 /// // Start a wait in a parallel branch
1701 /// let wait_op_id = ctx.next_operation_id();
1702 ///
1703 /// // Later, cancel the wait from another branch
1704 /// ctx.cancel_wait(&wait_op_id).await?;
1705 /// ```
1706 ///
1707 /// # Requirements
1708 ///
1709 /// - 5.5: THE Wait_Operation SHALL support cancellation of active waits via CANCEL action
1710 /// - 5.1: THE SDK SHALL provide a `DurableResult<T>` type alias for `Result<T, DurableError>`
1711 pub async fn cancel_wait(&self, operation_id: &str) -> DurableResult<()> {
1712 let logger = self.logger.read().unwrap().clone();
1713 crate::handlers::wait_cancel_handler(&self.state, operation_id, &logger).await
1714 }
1715
1716 /// Creates a callback and returns a handle to wait for the result.
1717 ///
1718 /// Callbacks allow external systems to signal completion of asynchronous
1719 /// operations. The callback ID can be shared with external systems, which
1720 /// can then call the Lambda durable execution callback API.
1721 ///
1722 /// # Arguments
1723 ///
1724 /// * `config` - Optional callback configuration (timeout, heartbeat)
1725 ///
1726 /// # Returns
1727 ///
1728 /// A `Callback<T>` handle that can be used to wait for the result.
1729 ///
1730 /// # Example
1731 ///
1732 /// ```rust,ignore
1733 /// let callback = ctx.create_callback::<ApprovalResponse>(None).await?;
1734 ///
1735 /// // Share callback.callback_id with external system
1736 /// notify_approver(&callback.callback_id).await?;
1737 ///
1738 /// // Wait for the callback result
1739 /// let approval = callback.result().await?;
1740 /// ```
1741 ///
1742 /// # Requirements
1743 ///
1744 /// - 1.3: THE DurableContext SHALL provide a `create_callback` method that returns a Callback with a unique callback_id
1745 /// - 5.1: THE SDK SHALL provide a `DurableResult<T>` type alias for `Result<T, DurableError>`
1746 pub async fn create_callback<T>(
1747 &self,
1748 config: Option<crate::config::CallbackConfig>,
1749 ) -> DurableResult<crate::handlers::Callback<T>>
1750 where
1751 T: serde::Serialize + serde::de::DeserializeOwned,
1752 {
1753 let op_id = self.next_operation_identifier(None);
1754 let config = config.unwrap_or_default();
1755
1756 let logger = self.logger.read().unwrap().clone();
1757 let result = crate::handlers::callback_handler(&self.state, &op_id, &config, &logger).await;
1758
1759 // Track replay after completion
1760 if result.is_ok() {
1761 self.state.track_replay(&op_id.operation_id).await;
1762 }
1763
1764 result
1765 }
1766
1767 /// Creates a named callback and returns a handle to wait for the result.
1768 ///
1769 /// Same as `create_callback`, but allows specifying a human-readable name.
1770 ///
1771 /// # Requirements
1772 ///
1773 /// - 5.1: THE SDK SHALL provide a `DurableResult<T>` type alias for `Result<T, DurableError>`
1774 pub async fn create_callback_named<T>(
1775 &self,
1776 name: &str,
1777 config: Option<crate::config::CallbackConfig>,
1778 ) -> DurableResult<crate::handlers::Callback<T>>
1779 where
1780 T: serde::Serialize + serde::de::DeserializeOwned,
1781 {
1782 let op_id = self.next_operation_identifier(Some(name.to_string()));
1783 let config = config.unwrap_or_default();
1784
1785 let logger = self.logger.read().unwrap().clone();
1786 let result = crate::handlers::callback_handler(&self.state, &op_id, &config, &logger).await;
1787
1788 // Track replay after completion
1789 if result.is_ok() {
1790 self.state.track_replay(&op_id.operation_id).await;
1791 }
1792
1793 result
1794 }
1795
1796 /// Invokes another durable Lambda function.
1797 ///
1798 /// This method calls another Lambda function and waits for its result.
1799 /// The invocation is checkpointed, so if the workflow is interrupted,
1800 /// it will resume with the result of the invocation.
1801 ///
1802 /// # Arguments
1803 ///
1804 /// * `function_name` - The name or ARN of the Lambda function to invoke
1805 /// * `payload` - The payload to send to the function
1806 /// * `config` - Optional invoke configuration (timeout, serdes)
1807 ///
1808 /// # Returns
1809 ///
1810 /// The result from the invoked function, or an error if invocation fails.
1811 ///
1812 /// # Example
1813 ///
1814 /// ```rust,ignore
1815 /// let result: ProcessingResult = ctx.invoke(
1816 /// "process-order-function",
1817 /// OrderPayload { order_id: "123".to_string() },
1818 /// None,
1819 /// ).await?;
1820 /// ```
1821 ///
1822 /// # Requirements
1823 ///
1824 /// - 1.4: THE DurableContext SHALL provide an `invoke` method that calls other durable functions
1825 /// - 5.1: THE SDK SHALL provide a `DurableResult<T>` type alias for `Result<T, DurableError>`
1826 pub async fn invoke<P, R>(
1827 &self,
1828 function_name: &str,
1829 payload: P,
1830 config: Option<crate::config::InvokeConfig<P, R>>,
1831 ) -> DurableResult<R>
1832 where
1833 P: serde::Serialize + serde::de::DeserializeOwned + Send,
1834 R: serde::Serialize + serde::de::DeserializeOwned + Send,
1835 {
1836 let op_id = self.next_operation_identifier(Some(format!("invoke:{}", function_name)));
1837 let config = config.unwrap_or_default();
1838
1839 let logger = self.logger.read().unwrap().clone();
1840 let result = crate::handlers::invoke_handler(
1841 function_name,
1842 payload,
1843 &self.state,
1844 &op_id,
1845 &config,
1846 &logger,
1847 )
1848 .await;
1849
1850 // Track replay after completion
1851 if result.is_ok() {
1852 self.state.track_replay(&op_id.operation_id).await;
1853 }
1854
1855 result
1856 }
1857
1858 /// Processes a collection in parallel with configurable concurrency.
1859 ///
1860 /// Map operations execute a function for each item in the collection,
1861 /// with configurable concurrency limits and failure tolerance.
1862 ///
1863 /// # Arguments
1864 ///
1865 /// * `items` - The collection of items to process
1866 /// * `func` - The function to apply to each item
1867 /// * `config` - Optional map configuration (concurrency, completion criteria)
1868 ///
1869 /// # Returns
1870 ///
1871 /// A `BatchResult<U>` containing results for all items.
1872 ///
1873 /// # Example
1874 ///
1875 /// ```rust,ignore
1876 /// let results = ctx.map(
1877 /// vec![1, 2, 3, 4, 5],
1878 /// |child_ctx, item, index| async move {
1879 /// Ok(item * 2)
1880 /// },
1881 /// Some(MapConfig {
1882 /// max_concurrency: Some(3),
1883 /// ..Default::default()
1884 /// }),
1885 /// ).await?;
1886 /// ```
1887 ///
1888 /// # Requirements
1889 ///
1890 /// - 1.5: THE DurableContext SHALL provide a `map` method that processes a collection in parallel with configurable concurrency
1891 /// - 5.1: THE SDK SHALL provide a `DurableResult<T>` type alias for `Result<T, DurableError>`
1892 pub async fn map<T, U, F, Fut>(
1893 &self,
1894 items: Vec<T>,
1895 func: F,
1896 config: Option<crate::config::MapConfig>,
1897 ) -> DurableResult<crate::concurrency::BatchResult<U>>
1898 where
1899 T: serde::Serialize + serde::de::DeserializeOwned + Send + Sync + Clone + 'static,
1900 U: serde::Serialize + serde::de::DeserializeOwned + Send + 'static,
1901 F: Fn(DurableContext, T, usize) -> Fut + Send + Sync + Clone + 'static,
1902 Fut: std::future::Future<Output = DurableResult<U>> + Send + 'static,
1903 {
1904 let op_id = self.next_operation_identifier(Some("map".to_string()));
1905 let config = config.unwrap_or_default();
1906
1907 let logger = self.logger.read().unwrap().clone();
1908 let result =
1909 crate::handlers::map_handler(items, func, &self.state, &op_id, self, &config, &logger)
1910 .await;
1911
1912 // Track replay after completion
1913 if result.is_ok() {
1914 self.state.track_replay(&op_id.operation_id).await;
1915 }
1916
1917 result
1918 }
1919
1920 /// Executes multiple operations in parallel.
1921 ///
1922 /// Parallel operations execute multiple independent functions concurrently,
1923 /// with configurable concurrency limits and completion criteria.
1924 ///
1925 /// # Arguments
1926 ///
1927 /// * `branches` - The list of functions to execute in parallel
1928 /// * `config` - Optional parallel configuration (concurrency, completion criteria)
1929 ///
1930 /// # Returns
1931 ///
1932 /// A `BatchResult<T>` containing results for all branches.
1933 ///
1934 /// # Example
1935 ///
1936 /// ```rust,ignore
1937 /// let results = ctx.parallel(
1938 /// vec![
1939 /// |ctx| async move { Ok(fetch_data_a(&ctx).await?) },
1940 /// |ctx| async move { Ok(fetch_data_b(&ctx).await?) },
1941 /// |ctx| async move { Ok(fetch_data_c(&ctx).await?) },
1942 /// ],
1943 /// None,
1944 /// ).await?;
1945 /// ```
1946 ///
1947 /// # Requirements
1948 ///
1949 /// - 1.6: THE DurableContext SHALL provide a `parallel` method that executes multiple closures concurrently
1950 /// - 5.1: THE SDK SHALL provide a `DurableResult<T>` type alias for `Result<T, DurableError>`
1951 pub async fn parallel<T, F, Fut>(
1952 &self,
1953 branches: Vec<F>,
1954 config: Option<crate::config::ParallelConfig>,
1955 ) -> DurableResult<crate::concurrency::BatchResult<T>>
1956 where
1957 T: serde::Serialize + serde::de::DeserializeOwned + Send + 'static,
1958 F: FnOnce(DurableContext) -> Fut + Send + 'static,
1959 Fut: std::future::Future<Output = DurableResult<T>> + Send + 'static,
1960 {
1961 let op_id = self.next_operation_identifier(Some("parallel".to_string()));
1962 let config = config.unwrap_or_default();
1963
1964 let logger = self.logger.read().unwrap().clone();
1965 let result = crate::handlers::parallel_handler(
1966 branches,
1967 &self.state,
1968 &op_id,
1969 self,
1970 &config,
1971 &logger,
1972 )
1973 .await;
1974
1975 // Track replay after completion
1976 if result.is_ok() {
1977 self.state.track_replay(&op_id.operation_id).await;
1978 }
1979
1980 result
1981 }
1982
1983 /// Executes a function in a child context.
1984 ///
1985 /// Child contexts provide isolation for nested workflows. Operations in
1986 /// a child context are tracked separately and can be checkpointed as a unit.
1987 ///
1988 /// # Arguments
1989 ///
1990 /// * `func` - The function to execute in the child context
1991 /// * `config` - Optional child context configuration
1992 ///
1993 /// # Returns
1994 ///
1995 /// The result of the child function, or an error if execution fails.
1996 ///
1997 /// # Example
1998 ///
1999 /// ```rust,ignore
2000 /// let result = ctx.run_in_child_context(|child_ctx| async move {
2001 /// let step1 = child_ctx.step(|_| Ok(1), None).await?;
2002 /// let step2 = child_ctx.step(|_| Ok(2), None).await?;
2003 /// Ok(step1 + step2)
2004 /// }, None).await?;
2005 /// ```
2006 ///
2007 /// # Requirements
2008 ///
2009 /// - 1.7: THE DurableContext SHALL provide a `run_in_child_context` method that creates isolated nested workflows
2010 /// - 5.1: THE SDK SHALL provide a `DurableResult<T>` type alias for `Result<T, DurableError>`
2011 pub async fn run_in_child_context<T, F, Fut>(
2012 &self,
2013 func: F,
2014 config: Option<crate::config::ChildConfig>,
2015 ) -> DurableResult<T>
2016 where
2017 T: serde::Serialize + serde::de::DeserializeOwned + Send,
2018 F: FnOnce(DurableContext) -> Fut + Send,
2019 Fut: std::future::Future<Output = DurableResult<T>> + Send,
2020 {
2021 let op_id = self.next_operation_identifier(Some("child_context".to_string()));
2022 let config = config.unwrap_or_default();
2023
2024 let logger = self.logger.read().unwrap().clone();
2025 let result =
2026 crate::handlers::child_handler(func, &self.state, &op_id, self, &config, &logger).await;
2027
2028 // Track replay after completion
2029 if result.is_ok() {
2030 self.state.track_replay(&op_id.operation_id).await;
2031 }
2032
2033 result
2034 }
2035
2036 /// Executes a named function in a child context.
2037 ///
2038 /// Same as `run_in_child_context`, but allows specifying a human-readable name.
2039 ///
2040 /// # Requirements
2041 ///
2042 /// - 5.1: THE SDK SHALL provide a `DurableResult<T>` type alias for `Result<T, DurableError>`
2043 pub async fn run_in_child_context_named<T, F, Fut>(
2044 &self,
2045 name: &str,
2046 func: F,
2047 config: Option<crate::config::ChildConfig>,
2048 ) -> DurableResult<T>
2049 where
2050 T: serde::Serialize + serde::de::DeserializeOwned + Send,
2051 F: FnOnce(DurableContext) -> Fut + Send,
2052 Fut: std::future::Future<Output = DurableResult<T>> + Send,
2053 {
2054 let op_id = self.next_operation_identifier(Some(name.to_string()));
2055 let config = config.unwrap_or_default();
2056
2057 let logger = self.logger.read().unwrap().clone();
2058 let result =
2059 crate::handlers::child_handler(func, &self.state, &op_id, self, &config, &logger).await;
2060
2061 // Track replay after completion
2062 if result.is_ok() {
2063 self.state.track_replay(&op_id.operation_id).await;
2064 }
2065
2066 result
2067 }
2068
2069 /// Polls until a condition is met.
2070 ///
2071 /// This method repeatedly checks a condition until it returns a successful
2072 /// result. Between checks, it waits for a configurable duration using the
2073 /// RETRY mechanism with NextAttemptDelaySeconds.
2074 ///
2075 /// # Implementation
2076 ///
2077 /// This method is implemented as a single STEP operation with RETRY mechanism,
2078 /// which is more efficient than using multiple steps and waits. The state is
2079 /// passed as Payload on retry (not Error), and the attempt number is tracked
2080 /// in StepDetails.Attempt.
2081 ///
2082 /// # Arguments
2083 ///
2084 /// * `check` - The function to check the condition
2085 /// * `config` - Configuration for the wait (interval, max attempts, timeout)
2086 ///
2087 /// # Returns
2088 ///
2089 /// The result when the condition is met, or an error if timeout/max attempts exceeded.
2090 ///
2091 /// # Example
2092 ///
2093 /// ```rust,ignore
2094 /// let result = ctx.wait_for_condition(
2095 /// |state, ctx| {
2096 /// // Check if order is ready
2097 /// let status = check_order_status(&state.order_id)?;
2098 /// if status == "ready" {
2099 /// Ok(OrderReady { order_id: state.order_id.clone() })
2100 /// } else {
2101 /// Err("Order not ready yet".into())
2102 /// }
2103 /// },
2104 /// WaitForConditionConfig::from_interval(
2105 /// OrderState { order_id: "123".to_string() },
2106 /// Duration::from_seconds(5),
2107 /// Some(10),
2108 /// ),
2109 /// ).await?;
2110 /// ```
2111 ///
2112 /// # Requirements
2113 ///
2114 /// - 1.8: THE DurableContext SHALL provide a `wait_for_condition` method that polls until a condition is met
2115 /// - 4.9: THE Step_Operation SHALL support RETRY action with Payload for wait-for-condition pattern
2116 /// - 5.1: THE SDK SHALL provide a `DurableResult<T>` type alias for `Result<T, DurableError>`
2117 pub async fn wait_for_condition<T, S, F>(
2118 &self,
2119 check: F,
2120 config: WaitForConditionConfig<S>,
2121 ) -> DurableResult<T>
2122 where
2123 T: serde::Serialize + serde::de::DeserializeOwned + Send,
2124 S: serde::Serialize + serde::de::DeserializeOwned + Clone + Send + Sync,
2125 F: Fn(&S, &WaitForConditionContext) -> Result<T, Box<dyn std::error::Error + Send + Sync>>
2126 + Send
2127 + Sync,
2128 {
2129 let op_id = self.next_operation_identifier(Some("wait_for_condition".to_string()));
2130
2131 let logger = self.logger.read().unwrap().clone();
2132 // Use the new wait_for_condition_handler which implements the single STEP with RETRY pattern
2133 // This is more efficient than the previous child context + multiple steps approach
2134 let result = crate::handlers::wait_for_condition_handler(
2135 check,
2136 config,
2137 &self.state,
2138 &op_id,
2139 &logger,
2140 )
2141 .await;
2142
2143 // Track replay after completion (only if not suspended)
2144 if result.is_ok() {
2145 self.state.track_replay(&op_id.operation_id).await;
2146 }
2147
2148 result
2149 }
2150
2151 /// Creates a callback and waits for the result with a submitter function.
2152 ///
2153 /// This is a convenience method that combines callback creation with
2154 /// a submitter function that sends the callback ID to an external system.
2155 /// The submitter execution is checkpointed within a child context to ensure
2156 /// replay safety - the submitter will not be re-executed during replay.
2157 ///
2158 /// # Arguments
2159 ///
2160 /// * `submitter` - Function that receives the callback ID and submits it to external system
2161 /// * `config` - Optional callback configuration (timeout, heartbeat)
2162 ///
2163 /// # Returns
2164 ///
2165 /// The callback result from the external system.
2166 ///
2167 /// # Example
2168 ///
2169 /// ```rust,ignore
2170 /// let approval = ctx.wait_for_callback(
2171 /// |callback_id| async move {
2172 /// // Send callback ID to approval system
2173 /// send_approval_request(&callback_id, &request).await
2174 /// },
2175 /// Some(CallbackConfig {
2176 /// timeout: Duration::from_hours(24),
2177 /// ..Default::default()
2178 /// }),
2179 /// ).await?;
2180 /// ```
2181 ///
2182 /// # Requirements
2183 ///
2184 /// - 1.1: WHEN wait_for_callback is called, THE SDK SHALL create a callback and execute the submitter function within a durable step
2185 /// - 1.2: THE wait_for_callback method SHALL checkpoint the submitter execution to ensure replay safety
2186 /// - 1.3: IF the submitter function fails, THEN THE SDK SHALL propagate the error with appropriate context
2187 /// - 1.4: THE wait_for_callback method SHALL support configurable callback timeout and heartbeat timeout
2188 /// - 1.9: THE DurableContext SHALL provide a `wait_for_callback` method that combines callback creation with a submitter function
2189 /// - 5.1: THE SDK SHALL provide a `DurableResult<T>` type alias for `Result<T, DurableError>`
2190 pub async fn wait_for_callback<T, F, Fut>(
2191 &self,
2192 submitter: F,
2193 config: Option<crate::config::CallbackConfig>,
2194 ) -> DurableResult<T>
2195 where
2196 T: serde::Serialize + serde::de::DeserializeOwned + Send + Sync,
2197 F: FnOnce(String) -> Fut + Send + 'static,
2198 Fut: std::future::Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>>
2199 + Send
2200 + 'static,
2201 {
2202 // Create the callback first with the provided configuration (Requirement 1.4)
2203 let callback: crate::handlers::Callback<T> = self.create_callback(config).await?;
2204 let callback_id = callback.callback_id.clone();
2205
2206 // Generate operation ID for the child context that will execute the submitter
2207 let op_id = self.next_operation_identifier(Some("wait_for_callback_submitter".to_string()));
2208
2209 // Execute the submitter within a child context for proper checkpointing (Requirements 1.1, 1.2)
2210 // The child context ensures the submitter execution is tracked and not re-executed during replay
2211 let child_config = crate::config::ChildConfig::default();
2212
2213 let logger = self.logger.read().unwrap().clone();
2214 crate::handlers::child_handler(
2215 |child_ctx| {
2216 let callback_id = callback_id.clone();
2217 async move {
2218 // Use a step to checkpoint that we're about to execute the submitter
2219 // This step returns () on success, indicating submission completed
2220 child_ctx
2221 .step_named(
2222 "execute_submitter",
2223 move |_| {
2224 // The step just marks that we're executing the submitter
2225 // The actual async submitter call happens after this checkpoint
2226 Ok(())
2227 },
2228 None,
2229 )
2230 .await?;
2231
2232 // Now execute the actual submitter
2233 // If we're replaying and the step above succeeded, we won't reach here
2234 // because the child context will return the checkpointed result
2235 submitter(callback_id).await.map_err(|e| {
2236 crate::error::DurableError::UserCode {
2237 message: e.to_string(),
2238 error_type: "SubmitterError".to_string(),
2239 stack_trace: None,
2240 }
2241 })?;
2242
2243 Ok(())
2244 }
2245 },
2246 &self.state,
2247 &op_id,
2248 self,
2249 &child_config,
2250 &logger,
2251 )
2252 .await?;
2253
2254 // Track replay after child context completion
2255 self.state.track_replay(&op_id.operation_id).await;
2256
2257 // Wait for the callback result
2258 callback.result().await
2259 }
2260
2261 // ========================================================================
2262 // Promise Combinators
2263 // ========================================================================
2264
2265 /// Waits for all futures to complete successfully.
2266 ///
2267 /// Returns all results if all futures succeed, or returns the first error encountered.
2268 /// This is implemented within a STEP operation for durability.
2269 ///
2270 /// # Arguments
2271 ///
2272 /// * `futures` - Vector of futures to execute
2273 ///
2274 /// # Returns
2275 ///
2276 /// `Ok(Vec<T>)` if all futures succeed, or `Err` with the first error.
2277 ///
2278 /// # Example
2279 ///
2280 /// ```rust,ignore
2281 /// let results = ctx.all(vec![
2282 /// ctx.step(|_| Ok(1), None),
2283 /// ctx.step(|_| Ok(2), None),
2284 /// ctx.step(|_| Ok(3), None),
2285 /// ]).await?;
2286 /// assert_eq!(results, vec![1, 2, 3]);
2287 /// ```
2288 ///
2289 /// # Requirements
2290 ///
2291 /// - 20.1: Wait for all promises to complete successfully, return error on first failure
2292 /// - 20.5: Implement within a STEP operation for durability
2293 /// - 5.1: THE SDK SHALL provide a `DurableResult<T>` type alias for `Result<T, DurableError>`
2294 pub async fn all<T, Fut>(&self, futures: Vec<Fut>) -> DurableResult<Vec<T>>
2295 where
2296 T: serde::Serialize + serde::de::DeserializeOwned + Send + Clone + 'static,
2297 Fut: std::future::Future<Output = DurableResult<T>> + Send + 'static,
2298 {
2299 let op_id = self.next_operation_identifier(Some("all".to_string()));
2300
2301 let logger = self.logger.read().unwrap().clone();
2302 let result = crate::handlers::all_handler(futures, &self.state, &op_id, &logger).await;
2303
2304 // Track replay after completion
2305 if result.is_ok() {
2306 self.state.track_replay(&op_id.operation_id).await;
2307 }
2308
2309 result
2310 }
2311
2312 /// Waits for all futures to settle (success or failure).
2313 ///
2314 /// Returns a BatchResult containing outcomes for all futures, regardless of success or failure.
2315 /// This is implemented within a STEP operation for durability.
2316 ///
2317 /// # Arguments
2318 ///
2319 /// * `futures` - Vector of futures to execute
2320 ///
2321 /// # Returns
2322 ///
2323 /// `BatchResult<T>` containing results for all futures.
2324 ///
2325 /// # Example
2326 ///
2327 /// ```rust,ignore
2328 /// let results = ctx.all_settled(vec![
2329 /// ctx.step(|_| Ok(1), None),
2330 /// ctx.step(|_| Err("error".into()), None),
2331 /// ctx.step(|_| Ok(3), None),
2332 /// ]).await?;
2333 /// assert_eq!(results.success_count(), 2);
2334 /// assert_eq!(results.failure_count(), 1);
2335 /// ```
2336 ///
2337 /// # Requirements
2338 ///
2339 /// - 20.2: Wait for all promises to settle, return BatchResult with all outcomes
2340 /// - 20.5: Implement within a STEP operation for durability
2341 /// - 5.1: THE SDK SHALL provide a `DurableResult<T>` type alias for `Result<T, DurableError>`
2342 pub async fn all_settled<T, Fut>(
2343 &self,
2344 futures: Vec<Fut>,
2345 ) -> DurableResult<crate::concurrency::BatchResult<T>>
2346 where
2347 T: serde::Serialize + serde::de::DeserializeOwned + Send + Clone + 'static,
2348 Fut: std::future::Future<Output = DurableResult<T>> + Send + 'static,
2349 {
2350 let op_id = self.next_operation_identifier(Some("all_settled".to_string()));
2351
2352 let logger = self.logger.read().unwrap().clone();
2353 let result =
2354 crate::handlers::all_settled_handler(futures, &self.state, &op_id, &logger).await;
2355
2356 // Track replay after completion
2357 if result.is_ok() {
2358 self.state.track_replay(&op_id.operation_id).await;
2359 }
2360
2361 result
2362 }
2363
2364 /// Returns the result of the first future to settle.
2365 ///
2366 /// Returns the result (success or failure) of whichever future completes first.
2367 /// This is implemented within a STEP operation for durability.
2368 ///
2369 /// # Arguments
2370 ///
2371 /// * `futures` - Vector of futures to execute
2372 ///
2373 /// # Returns
2374 ///
2375 /// The result of the first future to settle.
2376 ///
2377 /// # Example
2378 ///
2379 /// ```rust,ignore
2380 /// let result = ctx.race(vec![
2381 /// ctx.step(|_| Ok(1), None),
2382 /// ctx.step(|_| Ok(2), None),
2383 /// ]).await?;
2384 /// // result is either 1 or 2, whichever completed first
2385 /// ```
2386 ///
2387 /// # Requirements
2388 ///
2389 /// - 20.3: Return result of first promise to settle
2390 /// - 20.5: Implement within a STEP operation for durability
2391 /// - 5.1: THE SDK SHALL provide a `DurableResult<T>` type alias for `Result<T, DurableError>`
2392 pub async fn race<T, Fut>(&self, futures: Vec<Fut>) -> DurableResult<T>
2393 where
2394 T: serde::Serialize + serde::de::DeserializeOwned + Send + Clone + 'static,
2395 Fut: std::future::Future<Output = DurableResult<T>> + Send + 'static,
2396 {
2397 let op_id = self.next_operation_identifier(Some("race".to_string()));
2398
2399 let logger = self.logger.read().unwrap().clone();
2400 let result = crate::handlers::race_handler(futures, &self.state, &op_id, &logger).await;
2401
2402 // Track replay after completion
2403 if result.is_ok() {
2404 self.state.track_replay(&op_id.operation_id).await;
2405 }
2406
2407 result
2408 }
2409
2410 /// Returns the result of the first future to succeed.
2411 ///
2412 /// Returns the result of the first future to succeed. If all futures fail,
2413 /// returns an error containing all the failures.
2414 /// This is implemented within a STEP operation for durability.
2415 ///
2416 /// # Arguments
2417 ///
2418 /// * `futures` - Vector of futures to execute
2419 ///
2420 /// # Returns
2421 ///
2422 /// The result of the first future to succeed, or an error if all fail.
2423 ///
2424 /// # Example
2425 ///
2426 /// ```rust,ignore
2427 /// let result = ctx.any(vec![
2428 /// ctx.step(|_| Err("error".into()), None),
2429 /// ctx.step(|_| Ok(2), None),
2430 /// ctx.step(|_| Ok(3), None),
2431 /// ]).await?;
2432 /// // result is either 2 or 3, whichever succeeded first
2433 /// ```
2434 ///
2435 /// # Requirements
2436 ///
2437 /// - 20.4: Return result of first promise to succeed, return error only if all fail
2438 /// - 20.5: Implement within a STEP operation for durability
2439 /// - 5.1: THE SDK SHALL provide a `DurableResult<T>` type alias for `Result<T, DurableError>`
2440 pub async fn any<T, Fut>(&self, futures: Vec<Fut>) -> DurableResult<T>
2441 where
2442 T: serde::Serialize + serde::de::DeserializeOwned + Send + Clone + 'static,
2443 Fut: std::future::Future<Output = DurableResult<T>> + Send + 'static,
2444 {
2445 let op_id = self.next_operation_identifier(Some("any".to_string()));
2446
2447 let logger = self.logger.read().unwrap().clone();
2448 let result = crate::handlers::any_handler(futures, &self.state, &op_id, &logger).await;
2449
2450 // Track replay after completion
2451 if result.is_ok() {
2452 self.state.track_replay(&op_id.operation_id).await;
2453 }
2454
2455 result
2456 }
2457}
2458
2459/// Configuration for wait_for_condition operations.
2460///
2461/// # Requirements
2462///
2463/// - 4.7: wait_for_condition uses wait_strategy to determine polling delay
2464/// - 4.8: Backward-compatible constructor converts interval + max_attempts to a WaitStrategy
2465#[allow(clippy::type_complexity)]
2466pub struct WaitForConditionConfig<S> {
2467 /// Initial state to pass to the check function.
2468 pub initial_state: S,
2469 /// Wait strategy function that determines polling behavior.
2470 ///
2471 /// Takes a reference to the current state and the attempt number (1-indexed),
2472 /// and returns a [`WaitDecision`](crate::config::WaitDecision).
2473 pub wait_strategy: Box<dyn Fn(&S, usize) -> crate::config::WaitDecision + Send + Sync>,
2474 /// Overall timeout for the operation.
2475 pub timeout: Option<crate::duration::Duration>,
2476 /// Optional custom serializer/deserializer.
2477 pub serdes: Option<std::sync::Arc<dyn crate::config::SerDesAny>>,
2478}
2479
2480impl<S> std::fmt::Debug for WaitForConditionConfig<S>
2481where
2482 S: std::fmt::Debug,
2483{
2484 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2485 f.debug_struct("WaitForConditionConfig")
2486 .field("initial_state", &self.initial_state)
2487 .field("wait_strategy", &"<fn>")
2488 .field("timeout", &self.timeout)
2489 .field("serdes", &self.serdes.is_some())
2490 .finish()
2491 }
2492}
2493
2494impl<S> WaitForConditionConfig<S> {
2495 /// Creates a backward-compatible `WaitForConditionConfig` from interval and max_attempts.
2496 ///
2497 /// This constructor converts the old-style `interval` + `max_attempts` parameters
2498 /// into a wait strategy that uses a fixed delay (no backoff, no jitter).
2499 ///
2500 /// # Arguments
2501 ///
2502 /// * `initial_state` - Initial state to pass to the check function.
2503 /// * `interval` - Fixed interval between condition checks.
2504 /// * `max_attempts` - Maximum number of attempts (`None` for unlimited).
2505 ///
2506 /// # Example
2507 ///
2508 /// ```rust,ignore
2509 /// use durable_execution_sdk::{WaitForConditionConfig, Duration};
2510 ///
2511 /// let config = WaitForConditionConfig::from_interval(
2512 /// my_initial_state,
2513 /// Duration::from_seconds(5),
2514 /// Some(10),
2515 /// );
2516 /// ```
2517 ///
2518 /// # Requirements
2519 ///
2520 /// - 4.8: Backward-compatible constructor converts interval + max_attempts to a WaitStrategy
2521 pub fn from_interval(
2522 initial_state: S,
2523 interval: crate::duration::Duration,
2524 max_attempts: Option<usize>,
2525 ) -> Self
2526 where
2527 S: Send + Sync + 'static,
2528 {
2529 let interval_secs = interval.to_seconds();
2530 let max = max_attempts.unwrap_or(usize::MAX);
2531
2532 Self {
2533 initial_state,
2534 wait_strategy: Box::new(move |_state: &S, attempts_made: usize| {
2535 // In the backward-compatible mode, the check function's Ok/Err
2536 // determines whether polling is done. The strategy only provides
2537 // the delay and enforces max_attempts.
2538 // Return Done when max_attempts exceeded (handler will checkpoint failure
2539 // if check returned Err, or success if check returned Ok).
2540 if attempts_made >= max {
2541 return crate::config::WaitDecision::Done;
2542 }
2543 crate::config::WaitDecision::Continue {
2544 delay: crate::duration::Duration::from_seconds(interval_secs),
2545 }
2546 }),
2547 timeout: None,
2548 serdes: None,
2549 }
2550 }
2551}
2552
2553impl<S: Default + Send + Sync + 'static> Default for WaitForConditionConfig<S> {
2554 fn default() -> Self {
2555 Self::from_interval(
2556 S::default(),
2557 crate::duration::Duration::from_seconds(5),
2558 None,
2559 )
2560 }
2561}
2562
/// Per-attempt information handed to `wait_for_condition` check functions.
#[derive(Clone, Debug)]
pub struct WaitForConditionContext {
    /// Number of the attempt currently being made, counted from 1.
    pub attempt: usize,
    /// Upper bound on the number of attempts; `None` means no limit.
    pub max_attempts: Option<usize>,
}
2571
2572impl Clone for DurableContext {
2573 fn clone(&self) -> Self {
2574 Self {
2575 state: self.state.clone(),
2576 lambda_context: self.lambda_context.clone(),
2577 parent_id: self.parent_id.clone(),
2578 id_generator: self.id_generator.clone(),
2579 logger: self.logger.clone(),
2580 }
2581 }
2582}
2583
2584impl std::fmt::Debug for DurableContext {
2585 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2586 f.debug_struct("DurableContext")
2587 .field("durable_execution_arn", &self.durable_execution_arn())
2588 .field("parent_id", &self.parent_id)
2589 .field("step_counter", &self.current_step_counter())
2590 .finish_non_exhaustive()
2591 }
2592}
2593
// Minimal lowercase hex encoding helper kept local to this file.
mod hex {
    /// Lookup table mapping a 4-bit value to its lowercase hex digit.
    const HEX_CHARS: &[u8; 16] = b"0123456789abcdef";

    /// Encodes `bytes` as a lowercase hex string, two characters per input byte
    /// (high nibble first).
    pub fn encode(bytes: &[u8]) -> String {
        let mut encoded = String::with_capacity(bytes.len() * 2);
        encoded.extend(
            bytes
                .iter()
                // Split every byte into its high and low nibble, in that order.
                .flat_map(|&byte| [byte >> 4, byte & 0x0f])
                .map(|nibble| char::from(HEX_CHARS[usize::from(nibble)])),
        );
        encoded
    }
}
2607
2608#[cfg(test)]
2609mod tests {
2610 use super::*;
2611
2612 #[test]
2613 fn test_operation_identifier_new() {
2614 let id = OperationIdentifier::new(
2615 "op-123",
2616 Some("parent-456".to_string()),
2617 Some("my-step".to_string()),
2618 );
2619 assert_eq!(id.operation_id, "op-123");
2620 assert_eq!(id.parent_id, Some("parent-456".to_string()));
2621 assert_eq!(id.name, Some("my-step".to_string()));
2622 }
2623
2624 #[test]
2625 fn test_operation_identifier_root() {
2626 let id = OperationIdentifier::root("op-123");
2627 assert_eq!(id.operation_id, "op-123");
2628 assert!(id.parent_id.is_none());
2629 assert!(id.name.is_none());
2630 }
2631
2632 #[test]
2633 fn test_operation_identifier_with_parent() {
2634 let id = OperationIdentifier::with_parent("op-123", "parent-456");
2635 assert_eq!(id.operation_id, "op-123");
2636 assert_eq!(id.parent_id, Some("parent-456".to_string()));
2637 assert!(id.name.is_none());
2638 }
2639
2640 #[test]
2641 fn test_operation_identifier_with_name() {
2642 let id = OperationIdentifier::root("op-123").with_name("my-step");
2643 assert_eq!(id.name, Some("my-step".to_string()));
2644 }
2645
2646 #[test]
2647 fn test_operation_identifier_display() {
2648 let id = OperationIdentifier::root("op-123");
2649 assert_eq!(format!("{}", id), "op-123");
2650
2651 let id_with_name = OperationIdentifier::root("op-123").with_name("my-step");
2652 assert_eq!(format!("{}", id_with_name), "my-step(op-123)");
2653 }
2654
2655 #[test]
2656 fn test_generate_operation_id_deterministic() {
2657 let id1 = generate_operation_id("base-123", 0);
2658 let id2 = generate_operation_id("base-123", 0);
2659 assert_eq!(id1, id2);
2660 }
2661
2662 #[test]
2663 fn test_generate_operation_id_different_counters() {
2664 let id1 = generate_operation_id("base-123", 0);
2665 let id2 = generate_operation_id("base-123", 1);
2666 assert_ne!(id1, id2);
2667 }
2668
2669 #[test]
2670 fn test_generate_operation_id_different_bases() {
2671 let id1 = generate_operation_id("base-123", 0);
2672 let id2 = generate_operation_id("base-456", 0);
2673 assert_ne!(id1, id2);
2674 }
2675
2676 #[test]
2677 fn test_generate_operation_id_length() {
2678 let id = generate_operation_id("base-123", 0);
2679 assert_eq!(id.len(), 32); // 16 bytes = 32 hex chars
2680 }
2681
2682 #[test]
2683 fn test_operation_id_generator_new() {
2684 let gen = OperationIdGenerator::new("base-123");
2685 assert_eq!(gen.base_id(), "base-123");
2686 assert_eq!(gen.current_counter(), 0);
2687 }
2688
2689 #[test]
2690 fn test_operation_id_generator_with_counter() {
2691 let gen = OperationIdGenerator::with_counter("base-123", 10);
2692 assert_eq!(gen.current_counter(), 10);
2693 }
2694
2695 #[test]
2696 fn test_operation_id_generator_next_id() {
2697 let gen = OperationIdGenerator::new("base-123");
2698
2699 let id1 = gen.next_id();
2700 assert_eq!(gen.current_counter(), 1);
2701
2702 let id2 = gen.next_id();
2703 assert_eq!(gen.current_counter(), 2);
2704
2705 assert_ne!(id1, id2);
2706 }
2707
2708 #[test]
2709 fn test_operation_id_generator_id_for_counter() {
2710 let gen = OperationIdGenerator::new("base-123");
2711
2712 let id_0 = gen.id_for_counter(0);
2713 let id_1 = gen.id_for_counter(1);
2714
2715 // id_for_counter should not increment the counter
2716 assert_eq!(gen.current_counter(), 0);
2717
2718 // Should match what next_id would produce
2719 let next = gen.next_id();
2720 assert_eq!(next, id_0);
2721
2722 let next = gen.next_id();
2723 assert_eq!(next, id_1);
2724 }
2725
2726 #[test]
2727 fn test_operation_id_generator_create_child() {
2728 let gen = OperationIdGenerator::new("base-123");
2729 gen.next_id(); // Increment parent counter
2730
2731 let child = gen.create_child("child-op-id");
2732 assert_eq!(child.base_id(), "child-op-id");
2733 assert_eq!(child.current_counter(), 0);
2734 }
2735
2736 #[test]
2737 fn test_operation_id_generator_clone() {
2738 let gen = OperationIdGenerator::new("base-123");
2739 gen.next_id();
2740 gen.next_id();
2741
2742 let cloned = gen.clone();
2743 assert_eq!(cloned.base_id(), gen.base_id());
2744 assert_eq!(cloned.current_counter(), gen.current_counter());
2745 }
2746
/// Ten threads each draw 100 IDs from one shared generator; every ID must be
/// distinct and the counter must end at exactly 1000.
#[test]
fn test_operation_id_generator_thread_safety() {
    use std::thread;

    let generator = Arc::new(OperationIdGenerator::new("base-123"));

    // Spawn every worker before joining any of them so they run concurrently.
    let workers: Vec<_> = (0..10)
        .map(|_| {
            let shared = Arc::clone(&generator);
            thread::spawn(move || (0..100).map(|_| shared.next_id()).collect::<Vec<_>>())
        })
        .collect();

    let all_ids: Vec<_> = workers
        .into_iter()
        .flat_map(|worker| worker.join().unwrap())
        .collect();

    // Collapsing into a set removes duplicates, so its size is the number of
    // distinct IDs that were produced.
    let distinct: std::collections::HashSet<_> = all_ids.iter().collect();

    assert_eq!(distinct.len(), 1000);
    assert_eq!(generator.current_counter(), 1000);
}
2782
/// `LogInfo::new` records the ARN and leaves the operation and parent IDs unset.
#[test]
fn test_log_info_new() {
    let info = LogInfo::new("arn:test");

    assert_eq!(info.durable_execution_arn.as_deref(), Some("arn:test"));
    assert_eq!(info.operation_id, None);
    assert_eq!(info.parent_id, None);
}
2790
/// `with_operation_id` stores the given operation ID on the `LogInfo`.
#[test]
fn test_log_info_with_operation_id() {
    let base = LogInfo::new("arn:test");
    let info = base.with_operation_id("op-123");

    assert_eq!(info.operation_id.as_deref(), Some("op-123"));
}
2796
/// `with_parent_id` stores the given parent ID on the `LogInfo`.
#[test]
fn test_log_info_with_parent_id() {
    let base = LogInfo::new("arn:test");
    let info = base.with_parent_id("parent-456");

    assert_eq!(info.parent_id.as_deref(), Some("parent-456"));
}
2802
/// Chained `with_extra` calls accumulate entries, with the first call's pair
/// stored at index 0.
#[test]
fn test_log_info_with_extra() {
    let mut info = LogInfo::new("arn:test");
    info = info.with_extra("key1", "value1");
    info = info.with_extra("key2", "value2");

    assert_eq!(info.extra.len(), 2);

    let (first_key, first_value) = &info.extra[0];
    assert_eq!(
        (first_key.as_str(), first_value.as_str()),
        ("key1", "value1")
    );
}
2811
/// `hex::encode` renders bytes as lowercase, zero-padded, two-digit hex pairs.
#[test]
fn test_hex_encode() {
    let lowest = hex::encode(&[0x00]);
    let highest = hex::encode(&[0xff]);
    let mixed = hex::encode(&[0x12, 0x34, 0xab, 0xcd]);

    assert_eq!(lowest, "00");
    assert_eq!(highest, "ff");
    assert_eq!(mixed, "1234abcd");
}
2818}
2819
2820#[cfg(test)]
2821mod durable_context_tests {
2822 use super::*;
2823 use crate::client::SharedDurableServiceClient;
2824 use crate::lambda::InitialExecutionState;
2825 use std::sync::Arc;
2826
2827 #[cfg(test)]
2828 fn create_mock_client() -> SharedDurableServiceClient {
2829 use crate::client::MockDurableServiceClient;
2830 Arc::new(MockDurableServiceClient::new())
2831 }
2832
2833 fn create_test_state() -> Arc<ExecutionState> {
2834 let client = create_mock_client();
2835 Arc::new(ExecutionState::new(
2836 "arn:aws:lambda:us-east-1:123456789012:function:test:durable:abc123",
2837 "token-123",
2838 InitialExecutionState::new(),
2839 client,
2840 ))
2841 }
2842
/// A freshly built root context exposes the state's ARN, has no parent and no
/// Lambda context, and starts with a step counter of zero.
#[test]
fn test_durable_context_new() {
    let shared_state = create_test_state();
    let ctx = DurableContext::new(Arc::clone(&shared_state));

    assert_eq!(ctx.current_step_counter(), 0);
    assert!(ctx.lambda_context().is_none());
    assert!(ctx.parent_id().is_none());
    assert_eq!(
        ctx.durable_execution_arn(),
        "arn:aws:lambda:us-east-1:123456789012:function:test:durable:abc123"
    );
}
2856
/// Two consecutive `next_operation_id` calls return different IDs and leave
/// the step counter at two.
#[test]
fn test_durable_context_next_operation_id() {
    let ctx = DurableContext::new(create_test_state());

    let first = ctx.next_operation_id();
    let second = ctx.next_operation_id();

    assert_eq!(ctx.current_step_counter(), 2);
    assert_ne!(first, second);
}
2868
/// On a root context, `next_operation_identifier` returns an identifier with
/// the requested name, no parent, and a non-empty operation ID.
#[test]
fn test_durable_context_next_operation_identifier() {
    let ctx = DurableContext::new(create_test_state());

    let identifier = ctx.next_operation_identifier(Some(String::from("my-step")));

    assert!(!identifier.operation_id.is_empty());
    assert_eq!(identifier.name.as_deref(), Some("my-step"));
    assert_eq!(identifier.parent_id, None);
}
2880
/// A child context reports the parent operation ID, starts with a zeroed step
/// counter, and shares the root's execution ARN.
#[test]
fn test_durable_context_create_child_context() {
    let root = DurableContext::new(create_test_state());

    // The operation that will own the child context.
    let parent_op_id = root.next_operation_id();
    let child = root.create_child_context(&parent_op_id);

    assert_eq!(child.durable_execution_arn(), root.durable_execution_arn());
    assert_eq!(child.current_step_counter(), 0);
    assert_eq!(child.parent_id(), Some(parent_op_id.as_str()));
}
2899
/// The next ID drawn from a child context differs from the next ID drawn from
/// its parent.
#[test]
fn test_durable_context_child_generates_different_ids() {
    let root = DurableContext::new(create_test_state());
    let owner_op_id = root.next_operation_id();
    let child = root.create_child_context(&owner_op_id);

    // Draw from the child first, then from the root.
    let from_child = child.next_operation_id();
    let from_root = root.next_operation_id();

    assert_ne!(from_child, from_root);
}
2914
/// Identifiers produced by a child context carry the owning operation's ID as
/// their `parent_id`.
#[test]
fn test_durable_context_child_operation_identifier_has_parent() {
    let root = DurableContext::new(create_test_state());
    let owner_op_id = root.next_operation_id();

    let identifier = root
        .create_child_context(&owner_op_id)
        .next_operation_identifier(None);

    assert_eq!(identifier.parent_id.as_deref(), Some(owner_op_id.as_str()));
}
2927
/// Smoke test: installing a logger through `set_logger` and reading it back
/// must not panic.
#[test]
fn test_durable_context_set_logger() {
    let mut ctx = DurableContext::new(create_test_state());

    let replacement: Arc<dyn Logger> = Arc::new(TracingLogger);
    ctx.set_logger(replacement);

    // Only the absence of a panic is checked; the returned logger is unused.
    let _ = ctx.logger();
}
2940
/// Smoke test: the builder-style `with_logger` returns a context whose logger
/// can be read without panicking.
#[test]
fn test_durable_context_with_logger() {
    let replacement: Arc<dyn Logger> = Arc::new(TracingLogger);

    let configured = DurableContext::new(create_test_state()).with_logger(replacement);

    // Only the absence of a panic is checked; the returned logger is unused.
    let _ = configured.logger();
}
2952
/// `create_log_info` on a root context carries the execution ARN and no
/// parent ID.
#[test]
fn test_durable_context_create_log_info() {
    let ctx = DurableContext::new(create_test_state());

    let info = ctx.create_log_info();

    assert_eq!(info.parent_id, None);
    assert_eq!(
        info.durable_execution_arn.as_deref(),
        Some("arn:aws:lambda:us-east-1:123456789012:function:test:durable:abc123")
    );
}
2966
/// `create_log_info` on a child context reports the owning operation's ID as
/// the parent ID.
#[test]
fn test_durable_context_create_log_info_with_parent() {
    let root = DurableContext::new(create_test_state());
    let owner_op_id = root.next_operation_id();

    let info = root.create_child_context(&owner_op_id).create_log_info();

    assert_eq!(info.parent_id.as_deref(), Some(owner_op_id.as_str()));
}
2979
/// `create_log_info_with_operation` stamps the supplied operation ID onto the
/// returned `LogInfo`.
#[test]
fn test_durable_context_create_log_info_with_operation() {
    let ctx = DurableContext::new(create_test_state());

    let info = ctx.create_log_info_with_operation("op-123");

    assert_eq!(info.operation_id.as_deref(), Some("op-123"));
}
2989
/// `with_replay(true)` sets the replay flag without disturbing a previously
/// set operation ID.
#[test]
fn test_log_info_with_replay() {
    let with_operation = LogInfo::new("arn:test").with_operation_id("op-123");
    let info = with_operation.with_replay(true);

    assert_eq!(info.operation_id.as_deref(), Some("op-123"));
    assert!(info.is_replay);
}
2999
/// A default-constructed `LogInfo` is not flagged as replay.
#[test]
fn test_log_info_default_not_replay() {
    let is_replay = LogInfo::default().is_replay;
    assert!(!is_replay);
}
3005
/// The default `ReplayLoggingConfig` is `SuppressAll`.
#[test]
fn test_replay_logging_config_default() {
    let default_config: ReplayLoggingConfig = Default::default();
    assert_eq!(default_config, ReplayLoggingConfig::SuppressAll);
}
3011
/// With `SuppressAll`, non-replay messages reach the inner logger at every
/// level, while replay messages are dropped at every level.
#[test]
fn test_replay_aware_logger_suppress_all() {
    use std::sync::atomic::{AtomicUsize, Ordering};

    // One hit counter per level.
    let debug_hits = Arc::new(AtomicUsize::new(0));
    let info_hits = Arc::new(AtomicUsize::new(0));
    let warn_hits = Arc::new(AtomicUsize::new(0));
    let error_hits = Arc::new(AtomicUsize::new(0));

    // Closures stay inline so their argument types are inferred from
    // `custom_logger`'s signature.
    let wrapped = Arc::new(custom_logger(
        {
            let hits = Arc::clone(&debug_hits);
            move |_, _| {
                hits.fetch_add(1, Ordering::SeqCst);
            }
        },
        {
            let hits = Arc::clone(&info_hits);
            move |_, _| {
                hits.fetch_add(1, Ordering::SeqCst);
            }
        },
        {
            let hits = Arc::clone(&warn_hits);
            move |_, _| {
                hits.fetch_add(1, Ordering::SeqCst);
            }
        },
        {
            let hits = Arc::clone(&error_hits);
            move |_, _| {
                hits.fetch_add(1, Ordering::SeqCst);
            }
        },
    ));

    let logger = ReplayAwareLogger::new(wrapped, ReplayLoggingConfig::SuppressAll);

    // Emits one message at each level, in debug/info/warn/error order.
    let emit_each_level = |info: &LogInfo| {
        logger.debug("test", info);
        logger.info("test", info);
        logger.warn("test", info);
        logger.error("test", info);
    };
    // Snapshot of the counters in debug/info/warn/error order.
    let totals = || {
        [
            debug_hits.load(Ordering::SeqCst),
            info_hits.load(Ordering::SeqCst),
            warn_hits.load(Ordering::SeqCst),
            error_hits.load(Ordering::SeqCst),
        ]
    };

    // Outside of replay, everything passes through once.
    emit_each_level(&LogInfo::new("arn:test").with_replay(false));
    assert_eq!(totals(), [1, 1, 1, 1]);

    // During replay nothing passes through, so the totals stay put.
    emit_each_level(&LogInfo::new("arn:test").with_replay(true));
    assert_eq!(totals(), [1, 1, 1, 1]);
}
3076
/// With the `allow_all` constructor, replay messages at all four levels reach
/// the inner logger.
#[test]
fn test_replay_aware_logger_allow_all() {
    use std::sync::atomic::{AtomicUsize, Ordering};

    // A single counter shared by all four level callbacks.
    let total_hits = Arc::new(AtomicUsize::new(0));

    let wrapped = Arc::new(custom_logger(
        {
            let hits = Arc::clone(&total_hits);
            move |_, _| {
                hits.fetch_add(1, Ordering::SeqCst);
            }
        },
        {
            let hits = Arc::clone(&total_hits);
            move |_, _| {
                hits.fetch_add(1, Ordering::SeqCst);
            }
        },
        {
            let hits = Arc::clone(&total_hits);
            move |_, _| {
                hits.fetch_add(1, Ordering::SeqCst);
            }
        },
        {
            let hits = Arc::clone(&total_hits);
            move |_, _| {
                hits.fetch_add(1, Ordering::SeqCst);
            }
        },
    ));

    let logger = ReplayAwareLogger::allow_all(wrapped);

    // One replay-flagged message per level.
    let during_replay = LogInfo::new("arn:test").with_replay(true);
    logger.debug("test", &during_replay);
    logger.info("test", &during_replay);
    logger.warn("test", &during_replay);
    logger.error("test", &during_replay);

    let observed = total_hits.load(Ordering::SeqCst);
    assert_eq!(observed, 4);
}
3121
/// With `ErrorsOnly`, the error level is the only one that reaches the inner
/// logger during replay.
#[test]
fn test_replay_aware_logger_errors_only() {
    use std::sync::atomic::{AtomicUsize, Ordering};

    let debug_hits = Arc::new(AtomicUsize::new(0));
    let info_hits = Arc::new(AtomicUsize::new(0));
    let warn_hits = Arc::new(AtomicUsize::new(0));
    let error_hits = Arc::new(AtomicUsize::new(0));

    let wrapped = Arc::new(custom_logger(
        {
            let hits = Arc::clone(&debug_hits);
            move |_, _| {
                hits.fetch_add(1, Ordering::SeqCst);
            }
        },
        {
            let hits = Arc::clone(&info_hits);
            move |_, _| {
                hits.fetch_add(1, Ordering::SeqCst);
            }
        },
        {
            let hits = Arc::clone(&warn_hits);
            move |_, _| {
                hits.fetch_add(1, Ordering::SeqCst);
            }
        },
        {
            let hits = Arc::clone(&error_hits);
            move |_, _| {
                hits.fetch_add(1, Ordering::SeqCst);
            }
        },
    ));

    let logger = ReplayAwareLogger::new(wrapped, ReplayLoggingConfig::ErrorsOnly);

    // One replay-flagged message per level.
    let during_replay = LogInfo::new("arn:test").with_replay(true);
    logger.debug("test", &during_replay);
    logger.info("test", &during_replay);
    logger.warn("test", &during_replay);
    logger.error("test", &during_replay);

    // Counters in debug/info/warn/error order: only the error got through.
    let totals = [
        debug_hits.load(Ordering::SeqCst),
        info_hits.load(Ordering::SeqCst),
        warn_hits.load(Ordering::SeqCst),
        error_hits.load(Ordering::SeqCst),
    ];
    assert_eq!(totals, [0, 0, 0, 1]);
}
3172
/// With `WarningsAndErrors`, the warn and error levels reach the inner logger
/// during replay; debug and info are dropped.
#[test]
fn test_replay_aware_logger_warnings_and_errors() {
    use std::sync::atomic::{AtomicUsize, Ordering};

    let debug_hits = Arc::new(AtomicUsize::new(0));
    let info_hits = Arc::new(AtomicUsize::new(0));
    let warn_hits = Arc::new(AtomicUsize::new(0));
    let error_hits = Arc::new(AtomicUsize::new(0));

    let wrapped = Arc::new(custom_logger(
        {
            let hits = Arc::clone(&debug_hits);
            move |_, _| {
                hits.fetch_add(1, Ordering::SeqCst);
            }
        },
        {
            let hits = Arc::clone(&info_hits);
            move |_, _| {
                hits.fetch_add(1, Ordering::SeqCst);
            }
        },
        {
            let hits = Arc::clone(&warn_hits);
            move |_, _| {
                hits.fetch_add(1, Ordering::SeqCst);
            }
        },
        {
            let hits = Arc::clone(&error_hits);
            move |_, _| {
                hits.fetch_add(1, Ordering::SeqCst);
            }
        },
    ));

    let logger = ReplayAwareLogger::new(wrapped, ReplayLoggingConfig::WarningsAndErrors);

    // One replay-flagged message per level.
    let during_replay = LogInfo::new("arn:test").with_replay(true);
    logger.debug("test", &during_replay);
    logger.info("test", &during_replay);
    logger.warn("test", &during_replay);
    logger.error("test", &during_replay);

    // Counters in debug/info/warn/error order.
    let totals = [
        debug_hits.load(Ordering::SeqCst),
        info_hits.load(Ordering::SeqCst),
        warn_hits.load(Ordering::SeqCst),
        error_hits.load(Ordering::SeqCst),
    ];
    assert_eq!(totals, [0, 0, 1, 1]);
}
3223
/// The `suppress_replay` constructor yields a logger configured with
/// `SuppressAll`.
#[test]
fn test_replay_aware_logger_suppress_replay_constructor() {
    let wrapped: Arc<dyn Logger> = Arc::new(TracingLogger);

    let config = ReplayAwareLogger::suppress_replay(wrapped).config();

    assert_eq!(config, ReplayLoggingConfig::SuppressAll);
}
3231
/// The `Debug` rendering of a `ReplayAwareLogger` mentions both the type name
/// and the configured mode.
#[test]
fn test_replay_aware_logger_debug() {
    let wrapped: Arc<dyn Logger> = Arc::new(TracingLogger);
    let logger = ReplayAwareLogger::new(wrapped, ReplayLoggingConfig::SuppressAll);

    let rendered = format!("{:?}", logger);

    for fragment in ["ReplayAwareLogger", "SuppressAll"] {
        assert!(rendered.contains(fragment));
    }
}
3241
/// `create_log_info_with_replay` sets both the operation ID and the replay
/// flag on the returned `LogInfo`.
#[test]
fn test_durable_context_create_log_info_with_replay_method() {
    let ctx = DurableContext::new(create_test_state());

    let info = ctx.create_log_info_with_replay("op-123", true);

    assert!(info.is_replay);
    assert_eq!(info.operation_id.as_deref(), Some("op-123"));
}
3252
/// A cloned context reports the same ARN and the same step counter as the
/// context it was cloned from.
#[test]
fn test_durable_context_clone() {
    let original = DurableContext::new(create_test_state());

    // Advance the counter so the comparison is not trivially zero == zero.
    for _ in 0..2 {
        original.next_operation_id();
    }

    let copy = original.clone();

    assert_eq!(copy.current_step_counter(), original.current_step_counter());
    assert_eq!(copy.durable_execution_arn(), original.durable_execution_arn());
}
3266
/// The `Debug` rendering of a `DurableContext` names the type and its
/// `durable_execution_arn` field.
#[test]
fn test_durable_context_debug() {
    let ctx = DurableContext::new(create_test_state());

    let rendered = format!("{:?}", ctx);

    for fragment in ["DurableContext", "durable_execution_arn"] {
        assert!(rendered.contains(fragment));
    }
}
3277
/// `state()` hands back the very same `Arc` allocation the context was built
/// with.
#[test]
fn test_durable_context_state_access() {
    let shared_state = create_test_state();
    let ctx = DurableContext::new(Arc::clone(&shared_state));

    // Pointer equality, not value equality: both must refer to one allocation.
    let same_allocation = Arc::ptr_eq(&ctx.state(), &shared_state);
    assert!(same_allocation);
}
3286
/// Compile-time check that `DurableContext` is both `Send` and `Sync`; the
/// test body does nothing at runtime.
#[test]
fn test_durable_context_send_sync() {
    fn require_send<T: Send>() {}
    fn require_sync<T: Sync>() {}

    require_send::<DurableContext>();
    require_sync::<DurableContext>();
}
3293
3294 // ========================================================================
3295 // Simplified Logging API Tests
3296 // ========================================================================
3297
/// `log_info` invokes the info callback exactly once and passes a `LogInfo`
/// carrying the context's execution ARN.
#[test]
fn test_log_info_method() {
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Mutex;

    let last_seen = Arc::new(Mutex::new(None::<LogInfo>));
    let info_calls = Arc::new(AtomicUsize::new(0));

    let recorder = Arc::new(custom_logger(
        |_, _| {},
        {
            let calls = Arc::clone(&info_calls);
            let sink = Arc::clone(&last_seen);
            move |_, info: &LogInfo| {
                calls.fetch_add(1, Ordering::SeqCst);
                *sink.lock().unwrap() = Some(info.clone());
            }
        },
        |_, _| {},
        |_, _| {},
    ));

    let ctx = DurableContext::new(create_test_state()).with_logger(recorder);

    ctx.log_info("Test message");

    assert_eq!(info_calls.load(Ordering::SeqCst), 1);

    let guard = last_seen.lock().unwrap();
    let recorded = guard.as_ref().unwrap();
    assert_eq!(
        recorded.durable_execution_arn.as_deref(),
        Some("arn:aws:lambda:us-east-1:123456789012:function:test:durable:abc123")
    );
}
3334
/// `log_debug` invokes the debug callback of the configured logger exactly
/// once.
#[test]
fn test_log_debug_method() {
    use std::sync::atomic::{AtomicUsize, Ordering};

    let debug_calls = Arc::new(AtomicUsize::new(0));

    let recorder = Arc::new(custom_logger(
        {
            let calls = Arc::clone(&debug_calls);
            move |_, _| {
                calls.fetch_add(1, Ordering::SeqCst);
            }
        },
        |_, _| {},
        |_, _| {},
        |_, _| {},
    ));

    let ctx = DurableContext::new(create_test_state()).with_logger(recorder);
    ctx.log_debug("Debug message");

    let observed = debug_calls.load(Ordering::SeqCst);
    assert_eq!(observed, 1);
}
3360
/// `log_warn` invokes the warn callback of the configured logger exactly once.
#[test]
fn test_log_warn_method() {
    use std::sync::atomic::{AtomicUsize, Ordering};

    let warn_calls = Arc::new(AtomicUsize::new(0));

    let recorder = Arc::new(custom_logger(
        |_, _| {},
        |_, _| {},
        {
            let calls = Arc::clone(&warn_calls);
            move |_, _| {
                calls.fetch_add(1, Ordering::SeqCst);
            }
        },
        |_, _| {},
    ));

    let ctx = DurableContext::new(create_test_state()).with_logger(recorder);
    ctx.log_warn("Warning message");

    let observed = warn_calls.load(Ordering::SeqCst);
    assert_eq!(observed, 1);
}
3386
/// `log_error` invokes the error callback of the configured logger exactly
/// once.
#[test]
fn test_log_error_method() {
    use std::sync::atomic::{AtomicUsize, Ordering};

    let error_calls = Arc::new(AtomicUsize::new(0));

    let recorder = Arc::new(custom_logger(
        |_, _| {},
        |_, _| {},
        |_, _| {},
        {
            let calls = Arc::clone(&error_calls);
            move |_, _| {
                calls.fetch_add(1, Ordering::SeqCst);
            }
        },
    ));

    let ctx = DurableContext::new(create_test_state()).with_logger(recorder);
    ctx.log_error("Error message");

    let observed = error_calls.load(Ordering::SeqCst);
    assert_eq!(observed, 1);
}
3407
/// `log_info_with` forwards every extra key/value pair to the info callback.
#[test]
fn test_log_info_with_extra_fields() {
    use std::sync::Mutex;

    let last_seen = Arc::new(Mutex::new(None::<LogInfo>));

    let sink = Arc::clone(&last_seen);
    let recorder = Arc::new(custom_logger(
        |_, _| {},
        move |_, info: &LogInfo| {
            *sink.lock().unwrap() = Some(info.clone());
        },
        |_, _| {},
        |_, _| {},
    ));

    let ctx = DurableContext::new(create_test_state()).with_logger(recorder);
    ctx.log_info_with("Test message", &[("order_id", "123"), ("amount", "99.99")]);

    let guard = last_seen.lock().unwrap();
    let recorded = guard.as_ref().unwrap();

    // Both pairs must be present; their order is not asserted.
    let expected = [("order_id", "123"), ("amount", "99.99")];
    assert_eq!(recorded.extra.len(), 2);
    for (key, value) in expected {
        assert!(recorded
            .extra
            .contains(&(key.to_string(), value.to_string())));
    }
}
3441
/// `log_debug_with` forwards its extra key/value pair to the debug callback.
#[test]
fn test_log_debug_with_extra_fields() {
    use std::sync::Mutex;

    let last_seen = Arc::new(Mutex::new(None::<LogInfo>));

    let sink = Arc::clone(&last_seen);
    let recorder = Arc::new(custom_logger(
        move |_, info: &LogInfo| {
            *sink.lock().unwrap() = Some(info.clone());
        },
        |_, _| {},
        |_, _| {},
        |_, _| {},
    ));

    let ctx = DurableContext::new(create_test_state()).with_logger(recorder);
    ctx.log_debug_with("Debug message", &[("key", "value")]);

    let guard = last_seen.lock().unwrap();
    let recorded = guard.as_ref().unwrap();

    let expected_pair = ("key".to_string(), "value".to_string());
    assert_eq!(recorded.extra.len(), 1);
    assert!(recorded.extra.contains(&expected_pair));
}
3471
/// `log_warn_with` forwards its extra key/value pair to the warn callback.
#[test]
fn test_log_warn_with_extra_fields() {
    use std::sync::Mutex;

    let last_seen = Arc::new(Mutex::new(None::<LogInfo>));

    let sink = Arc::clone(&last_seen);
    let recorder = Arc::new(custom_logger(
        |_, _| {},
        |_, _| {},
        move |_, info: &LogInfo| {
            *sink.lock().unwrap() = Some(info.clone());
        },
        |_, _| {},
    ));

    let ctx = DurableContext::new(create_test_state()).with_logger(recorder);
    ctx.log_warn_with("Warning message", &[("retry", "3")]);

    let guard = last_seen.lock().unwrap();
    let recorded = guard.as_ref().unwrap();

    let expected_pair = ("retry".to_string(), "3".to_string());
    assert_eq!(recorded.extra.len(), 1);
    assert!(recorded.extra.contains(&expected_pair));
}
3499
/// `log_error_with` forwards every extra key/value pair to the error callback.
#[test]
fn test_log_error_with_extra_fields() {
    use std::sync::Mutex;

    let last_seen = Arc::new(Mutex::new(None::<LogInfo>));

    let sink = Arc::clone(&last_seen);
    let recorder = Arc::new(custom_logger(
        |_, _| {},
        |_, _| {},
        |_, _| {},
        move |_, info: &LogInfo| {
            *sink.lock().unwrap() = Some(info.clone());
        },
    ));

    let ctx = DurableContext::new(create_test_state()).with_logger(recorder);
    ctx.log_error_with(
        "Error message",
        &[("error_code", "E001"), ("details", "Something went wrong")],
    );

    let guard = last_seen.lock().unwrap();
    let recorded = guard.as_ref().unwrap();

    // Both pairs must be present; their order is not asserted.
    let expected = [("error_code", "E001"), ("details", "Something went wrong")];
    assert_eq!(recorded.extra.len(), 2);
    for (key, value) in expected {
        assert!(recorded
            .extra
            .contains(&(key.to_string(), value.to_string())));
    }
}
3535
/// Messages logged through a child context carry the owning operation's ID as
/// `parent_id`.
#[test]
fn test_log_methods_include_parent_id_in_child_context() {
    use std::sync::Mutex;

    let last_seen = Arc::new(Mutex::new(None::<LogInfo>));

    let sink = Arc::clone(&last_seen);
    let recorder: Arc<dyn Logger> = Arc::new(custom_logger(
        |_, _| {},
        move |_, info: &LogInfo| {
            *sink.lock().unwrap() = Some(info.clone());
        },
        |_, _| {},
        |_, _| {},
    ));

    let root = DurableContext::new(create_test_state()).with_logger(recorder.clone());
    let owner_op_id = root.next_operation_id();

    // The child is given the same recording logger explicitly.
    let child = root.create_child_context(&owner_op_id).with_logger(recorder);
    child.log_info("Child context message");

    let guard = last_seen.lock().unwrap();
    let recorded = guard.as_ref().unwrap();

    assert_eq!(recorded.parent_id.as_deref(), Some(owner_op_id.as_str()));
}
3566
3567 // ========================================================================
3568 // Runtime Logger Reconfiguration Tests (Req 13.1, 13.2)
3569 // ========================================================================
3570
/// Req 13.1: after `configure_logger`, subsequent log calls go to the new
/// logger and the previous logger receives nothing further.
#[test]
fn test_configure_logger_swaps_logger() {
    use std::sync::atomic::{AtomicUsize, Ordering};

    let before_hits = Arc::new(AtomicUsize::new(0));
    let after_hits = Arc::new(AtomicUsize::new(0));

    let first_logger: Arc<dyn Logger> = Arc::new(custom_logger(
        |_, _| {},
        {
            let hits = Arc::clone(&before_hits);
            move |_, _| {
                hits.fetch_add(1, Ordering::SeqCst);
            }
        },
        |_, _| {},
        |_, _| {},
    ));

    let second_logger: Arc<dyn Logger> = Arc::new(custom_logger(
        |_, _| {},
        {
            let hits = Arc::clone(&after_hits);
            move |_, _| {
                hits.fetch_add(1, Ordering::SeqCst);
            }
        },
        |_, _| {},
        |_, _| {},
    ));

    // Info-call totals as (first logger, second logger).
    let totals = || {
        (
            before_hits.load(Ordering::SeqCst),
            after_hits.load(Ordering::SeqCst),
        )
    };

    let ctx = DurableContext::new(create_test_state()).with_logger(first_logger);

    // Before the swap only the first logger is hit.
    ctx.log_info("before swap");
    assert_eq!(totals(), (1, 0));

    // The swap goes through a non-`mut` binding, so it needs only `&self`.
    ctx.configure_logger(second_logger);

    // After the swap only the second logger's count moves.
    ctx.log_info("after swap");
    assert_eq!(totals(), (1, 1));
}
3619
/// Req 13.2: without a `configure_logger` call, every message goes to the
/// logger the context was built with.
#[test]
fn test_original_logger_used_when_configure_logger_not_called() {
    use std::sync::atomic::{AtomicUsize, Ordering};

    let info_hits = Arc::new(AtomicUsize::new(0));

    let only_logger: Arc<dyn Logger> = Arc::new(custom_logger(
        |_, _| {},
        {
            let hits = Arc::clone(&info_hits);
            move |_, _| {
                hits.fetch_add(1, Ordering::SeqCst);
            }
        },
        |_, _| {},
        |_, _| {},
    ));

    let ctx = DurableContext::new(create_test_state()).with_logger(only_logger);

    // Three messages, no reconfiguration in between.
    for message in ["message 1", "message 2", "message 3"] {
        ctx.log_info(message);
    }

    let observed = info_hits.load(Ordering::SeqCst);
    assert_eq!(observed, 3);
}
3649
/// A logger swap performed on the parent is observed by a child context that
/// was created before the swap.
#[test]
fn test_configure_logger_affects_child_contexts() {
    use std::sync::atomic::{AtomicUsize, Ordering};

    let before_hits = Arc::new(AtomicUsize::new(0));
    let after_hits = Arc::new(AtomicUsize::new(0));

    let first_logger: Arc<dyn Logger> = Arc::new(custom_logger(
        |_, _| {},
        {
            let hits = Arc::clone(&before_hits);
            move |_, _| {
                hits.fetch_add(1, Ordering::SeqCst);
            }
        },
        |_, _| {},
        |_, _| {},
    ));

    let second_logger: Arc<dyn Logger> = Arc::new(custom_logger(
        |_, _| {},
        {
            let hits = Arc::clone(&after_hits);
            move |_, _| {
                hits.fetch_add(1, Ordering::SeqCst);
            }
        },
        |_, _| {},
        |_, _| {},
    ));

    let parent = DurableContext::new(create_test_state()).with_logger(first_logger);
    let owner_op_id = parent.next_operation_id();
    let child = parent.create_child_context(&owner_op_id);

    // Before the swap the child logs through the first logger.
    child.log_info("child before swap");
    assert_eq!(before_hits.load(Ordering::SeqCst), 1);

    // Swap on the parent only.
    parent.configure_logger(second_logger);

    // The child's next message lands on the second logger; the first is untouched.
    child.log_info("child after swap");
    assert_eq!(after_hits.load(Ordering::SeqCst), 1);
    assert_eq!(before_hits.load(Ordering::SeqCst), 1);
}
3699}
3700
3701#[cfg(test)]
3702mod property_tests {
3703 use super::*;
3704 use proptest::prelude::*;
3705
3706 // Property 3: Operation ID Determinism
3707 // *For any* DurableContext with a given parent_id and step counter state,
3708 // calling create_operation_id() SHALL produce the same ID when called
3709 // in the same sequence position across multiple executions.
3710 // **Validates: Requirements 1.10**
3711 proptest! {
3712 #![proptest_config(ProptestConfig::with_cases(100))]
3713
3714 /// Feature: durable-execution-rust-sdk, Property 3: Operation ID Determinism
3715 /// Validates: Requirements 1.10
3716 #[test]
3717 fn prop_operation_id_determinism(
3718 base_id in "[a-zA-Z0-9:/-]{1,100}",
3719 counter in 0u64..10000u64,
3720 ) {
3721 // Generate the same ID twice with the same inputs
3722 let id1 = generate_operation_id(&base_id, counter);
3723 let id2 = generate_operation_id(&base_id, counter);
3724
3725 // IDs must be identical for the same inputs
3726 prop_assert_eq!(&id1, &id2, "Same base_id and counter must produce identical IDs");
3727
3728 // ID must be a valid hex string of expected length
3729 prop_assert_eq!(id1.len(), 32, "ID must be 32 hex characters");
3730 prop_assert!(id1.chars().all(|c| c.is_ascii_hexdigit()), "ID must be valid hex");
3731 }
3732
3733 /// Feature: durable-execution-rust-sdk, Property 3: Operation ID Determinism (Generator)
3734 /// Validates: Requirements 1.10
3735 #[test]
3736 fn prop_operation_id_generator_determinism(
3737 base_id in "[a-zA-Z0-9:/-]{1,100}",
3738 num_ids in 1usize..50usize,
3739 ) {
3740 // Create two generators with the same base ID
3741 let gen1 = OperationIdGenerator::new(&base_id);
3742 let gen2 = OperationIdGenerator::new(&base_id);
3743
3744 // Generate the same sequence of IDs from both
3745 let ids1: Vec<String> = (0..num_ids).map(|_| gen1.next_id()).collect();
3746 let ids2: Vec<String> = (0..num_ids).map(|_| gen2.next_id()).collect();
3747
3748 // Sequences must be identical
3749 prop_assert_eq!(&ids1, &ids2, "Same generator sequence must produce identical IDs");
3750
3751 // All IDs in a sequence must be unique
3752 let unique_count = {
3753 let mut set = std::collections::HashSet::new();
3754 for id in &ids1 {
3755 set.insert(id.clone());
3756 }
3757 set.len()
3758 };
3759 prop_assert_eq!(unique_count, num_ids, "All IDs in sequence must be unique");
3760 }
3761
3762 /// Feature: durable-execution-rust-sdk, Property 3: Operation ID Determinism (Replay Simulation)
3763 /// Validates: Requirements 1.10
3764 #[test]
3765 fn prop_operation_id_replay_determinism(
3766 base_id in "[a-zA-Z0-9:/-]{1,100}",
3767 operations in prop::collection::vec(0u32..3u32, 1..20),
3768 ) {
3769 // Simulate two executions with the same sequence of operations
3770 // Each operation type increments the counter
3771
3772 let gen1 = OperationIdGenerator::new(&base_id);
3773 let gen2 = OperationIdGenerator::new(&base_id);
3774
3775 let mut ids1 = Vec::new();
3776 let mut ids2 = Vec::new();
3777
3778 // First "execution"
3779 for op_type in &operations {
3780 // Each operation generates an ID
3781 ids1.push(gen1.next_id());
3782
3783 // Some operations might generate additional child IDs
3784 if *op_type == 2 {
3785 let parent_id = ids1.last().unwrap().clone();
3786 let child_gen = gen1.create_child(parent_id);
3787 ids1.push(child_gen.next_id());
3788 }
3789 }
3790
3791 // Second "execution" (replay) - must produce same IDs
3792 for op_type in &operations {
3793 ids2.push(gen2.next_id());
3794
3795 if *op_type == 2 {
3796 let parent_id = ids2.last().unwrap().clone();
3797 let child_gen = gen2.create_child(parent_id);
3798 ids2.push(child_gen.next_id());
3799 }
3800 }
3801
3802 // Both executions must produce identical ID sequences
3803 prop_assert_eq!(&ids1, &ids2, "Replay must produce identical operation IDs");
3804 }
3805 }
3806
3807 // Property 5: Concurrent ID Generation Uniqueness
3808 // *For any* number of concurrent tasks generating operation IDs from the same
3809 // DurableContext, all generated IDs SHALL be unique.
3810 // **Validates: Requirements 17.3**
3811 mod concurrent_id_tests {
3812 use super::*;
3813 use std::sync::Arc;
3814 use std::thread;
3815
3816 proptest! {
3817 #![proptest_config(ProptestConfig::with_cases(100))]
3818
3819 /// Feature: durable-execution-rust-sdk, Property 5: Concurrent ID Generation Uniqueness
3820 /// Validates: Requirements 17.3
3821 #[test]
3822 fn prop_concurrent_id_uniqueness(
3823 base_id in "[a-zA-Z0-9:/-]{1,100}",
3824 num_threads in 2usize..10usize,
3825 ids_per_thread in 10usize..100usize,
3826 ) {
3827 let gen = Arc::new(OperationIdGenerator::new(&base_id));
3828 let mut handles = vec![];
3829
3830 // Spawn multiple threads that concurrently generate IDs
3831 for _ in 0..num_threads {
3832 let gen_clone = gen.clone();
3833 let count = ids_per_thread;
3834 handles.push(thread::spawn(move || {
3835 let mut ids = Vec::with_capacity(count);
3836 for _ in 0..count {
3837 ids.push(gen_clone.next_id());
3838 }
3839 ids
3840 }));
3841 }
3842
3843 // Collect all IDs from all threads
3844 let mut all_ids = Vec::new();
3845 for handle in handles {
3846 all_ids.extend(handle.join().unwrap());
3847 }
3848
3849 let total_expected = num_threads * ids_per_thread;
3850
3851 // Verify we got the expected number of IDs
3852 prop_assert_eq!(all_ids.len(), total_expected, "Should have generated {} IDs", total_expected);
3853
3854 // Verify all IDs are unique
3855 let unique_count = {
3856 let mut set = std::collections::HashSet::new();
3857 for id in &all_ids {
3858 set.insert(id.clone());
3859 }
3860 set.len()
3861 };
3862
3863 prop_assert_eq!(
3864 unique_count,
3865 total_expected,
3866 "All {} IDs must be unique, but only {} were unique",
3867 total_expected,
3868 unique_count
3869 );
3870
3871 // Verify the counter was incremented correctly
3872 prop_assert_eq!(
3873 gen.current_counter() as usize,
3874 total_expected,
3875 "Counter should equal total IDs generated"
3876 );
3877 }
3878
3879 /// Feature: durable-execution-rust-sdk, Property 5: Concurrent ID Generation Uniqueness (Stress)
3880 /// Validates: Requirements 17.3
3881 #[test]
3882 fn prop_concurrent_id_uniqueness_stress(
3883 base_id in "[a-zA-Z0-9:/-]{1,50}",
3884 ) {
3885 // Fixed high-concurrency stress test
3886 let num_threads = 20;
3887 let ids_per_thread = 500;
3888
3889 let gen = Arc::new(OperationIdGenerator::new(&base_id));
3890 let mut handles = vec![];
3891
3892 for _ in 0..num_threads {
3893 let gen_clone = gen.clone();
3894 handles.push(thread::spawn(move || {
3895 let mut ids = Vec::with_capacity(ids_per_thread);
3896 for _ in 0..ids_per_thread {
3897 ids.push(gen_clone.next_id());
3898 }
3899 ids
3900 }));
3901 }
3902
3903 let mut all_ids = Vec::new();
3904 for handle in handles {
3905 all_ids.extend(handle.join().unwrap());
3906 }
3907
3908 let total_expected = num_threads * ids_per_thread;
3909
3910 // Verify all IDs are unique
3911 let unique_count = {
3912 let mut set = std::collections::HashSet::new();
3913 for id in &all_ids {
3914 set.insert(id.clone());
3915 }
3916 set.len()
3917 };
3918
3919 prop_assert_eq!(
3920 unique_count,
3921 total_expected,
3922 "All {} IDs must be unique under high concurrency",
3923 total_expected
3924 );
3925 }
3926 }
3927 }
3928
3929 // Property 9: Logging Methods Automatic Context
3930 // *For any* call to log_info, log_debug, log_warn, or log_error on DurableContext,
3931 // the resulting log message SHALL include durable_execution_arn and parent_id
3932 // (when available) without the caller needing to specify them.
3933 // **Validates: Requirements 4.5**
3934 mod logging_automatic_context_tests {
3935 use super::*;
3936 use std::sync::{Arc, Mutex};
3937
3938 proptest! {
3939 #![proptest_config(ProptestConfig::with_cases(100))]
3940
3941 /// Feature: sdk-ergonomics-improvements, Property 9: Logging Methods Automatic Context
3942 /// Validates: Requirements 4.5
3943 #[test]
3944 fn prop_logging_automatic_context(
3945 message in "[a-zA-Z0-9 ]{1,100}",
3946 log_level in 0u8..4u8,
3947 ) {
3948 use crate::client::MockDurableServiceClient;
3949 use crate::lambda::InitialExecutionState;
3950
3951 let captured_info = Arc::new(Mutex::new(None::<LogInfo>));
3952 let captured_info_clone = captured_info.clone();
3953
3954 // Create a logger that captures the LogInfo
3955 let inner = Arc::new(custom_logger(
3956 {
3957 let captured = captured_info_clone.clone();
3958 move |_, info: &LogInfo| {
3959 *captured.lock().unwrap() = Some(info.clone());
3960 }
3961 },
3962 {
3963 let captured = captured_info_clone.clone();
3964 move |_, info: &LogInfo| {
3965 *captured.lock().unwrap() = Some(info.clone());
3966 }
3967 },
3968 {
3969 let captured = captured_info_clone.clone();
3970 move |_, info: &LogInfo| {
3971 *captured.lock().unwrap() = Some(info.clone());
3972 }
3973 },
3974 {
3975 let captured = captured_info_clone.clone();
3976 move |_, info: &LogInfo| {
3977 *captured.lock().unwrap() = Some(info.clone());
3978 }
3979 },
3980 ));
3981
3982 let client: crate::client::SharedDurableServiceClient = Arc::new(MockDurableServiceClient::new());
3983 let state = Arc::new(ExecutionState::new(
3984 "arn:aws:lambda:us-east-1:123456789012:function:test:durable:abc123",
3985 "token-123",
3986 InitialExecutionState::new(),
3987 client,
3988 ));
3989 let ctx = DurableContext::new(state).with_logger(inner);
3990
3991 // Call the appropriate log method based on log_level
3992 match log_level {
3993 0 => ctx.log_debug(&message),
3994 1 => ctx.log_info(&message),
3995 2 => ctx.log_warn(&message),
3996 _ => ctx.log_error(&message),
3997 }
3998
3999 // Verify the captured LogInfo has the automatic context
4000 let captured = captured_info.lock().unwrap();
4001 let info = captured.as_ref().expect("LogInfo should be captured");
4002
4003 // durable_execution_arn must always be present
4004 prop_assert!(
4005 info.durable_execution_arn.is_some(),
4006 "durable_execution_arn must be automatically included"
4007 );
4008 prop_assert_eq!(
4009 info.durable_execution_arn.as_ref().unwrap(),
4010 "arn:aws:lambda:us-east-1:123456789012:function:test:durable:abc123",
4011 "durable_execution_arn must match the context's ARN"
4012 );
4013 }
4014
4015 /// Feature: sdk-ergonomics-improvements, Property 9: Logging Methods Automatic Context (Child Context)
4016 /// Validates: Requirements 4.5
4017 #[test]
4018 fn prop_logging_automatic_context_child(
4019 message in "[a-zA-Z0-9 ]{1,100}",
4020 log_level in 0u8..4u8,
4021 ) {
4022 use crate::client::MockDurableServiceClient;
4023 use crate::lambda::InitialExecutionState;
4024
4025 let captured_info = Arc::new(Mutex::new(None::<LogInfo>));
4026 let captured_info_clone = captured_info.clone();
4027
4028 // Create a logger that captures the LogInfo
4029 let inner: Arc<dyn Logger> = Arc::new(custom_logger(
4030 {
4031 let captured = captured_info_clone.clone();
4032 move |_, info: &LogInfo| {
4033 *captured.lock().unwrap() = Some(info.clone());
4034 }
4035 },
4036 {
4037 let captured = captured_info_clone.clone();
4038 move |_, info: &LogInfo| {
4039 *captured.lock().unwrap() = Some(info.clone());
4040 }
4041 },
4042 {
4043 let captured = captured_info_clone.clone();
4044 move |_, info: &LogInfo| {
4045 *captured.lock().unwrap() = Some(info.clone());
4046 }
4047 },
4048 {
4049 let captured = captured_info_clone.clone();
4050 move |_, info: &LogInfo| {
4051 *captured.lock().unwrap() = Some(info.clone());
4052 }
4053 },
4054 ));
4055
4056 let client: crate::client::SharedDurableServiceClient = Arc::new(MockDurableServiceClient::new());
4057 let state = Arc::new(ExecutionState::new(
4058 "arn:aws:lambda:us-east-1:123456789012:function:test:durable:abc123",
4059 "token-123",
4060 InitialExecutionState::new(),
4061 client,
4062 ));
4063 let ctx = DurableContext::new(state).with_logger(inner.clone());
4064
4065 // Create a child context
4066 let parent_op_id = ctx.next_operation_id();
4067 let child_ctx = ctx.create_child_context(&parent_op_id).with_logger(inner);
4068
4069 // Call the appropriate log method based on log_level
4070 match log_level {
4071 0 => child_ctx.log_debug(&message),
4072 1 => child_ctx.log_info(&message),
4073 2 => child_ctx.log_warn(&message),
4074 _ => child_ctx.log_error(&message),
4075 }
4076
4077 // Verify the captured LogInfo has the automatic context including parent_id
4078 let captured = captured_info.lock().unwrap();
4079 let info = captured.as_ref().expect("LogInfo should be captured");
4080
4081 // durable_execution_arn must always be present
4082 prop_assert!(
4083 info.durable_execution_arn.is_some(),
4084 "durable_execution_arn must be automatically included in child context"
4085 );
4086
4087 // parent_id must be present in child context
4088 prop_assert!(
4089 info.parent_id.is_some(),
4090 "parent_id must be automatically included in child context"
4091 );
4092 prop_assert_eq!(
4093 info.parent_id.as_ref().unwrap(),
4094 &parent_op_id,
4095 "parent_id must match the parent operation ID"
4096 );
4097 }
4098 }
4099 }
4100
4101 // Property 10: Logging Methods Extra Fields
4102 // *For any* call to log_info_with, log_debug_with, log_warn_with, or log_error_with
4103 // with extra fields, those fields SHALL appear in the resulting log output.
4104 // **Validates: Requirements 4.6**
4105 mod logging_extra_fields_tests {
4106 use super::*;
4107 use std::sync::{Arc, Mutex};
4108
4109 proptest! {
4110 #![proptest_config(ProptestConfig::with_cases(100))]
4111
4112 /// Feature: sdk-ergonomics-improvements, Property 10: Logging Methods Extra Fields
4113 /// Validates: Requirements 4.6
4114 #[test]
4115 fn prop_logging_extra_fields(
4116 message in "[a-zA-Z0-9 ]{1,100}",
4117 log_level in 0u8..4u8,
4118 key1 in "[a-zA-Z_][a-zA-Z0-9_]{0,20}",
4119 value1 in "[a-zA-Z0-9]{1,50}",
4120 key2 in "[a-zA-Z_][a-zA-Z0-9_]{0,20}",
4121 value2 in "[a-zA-Z0-9]{1,50}",
4122 ) {
4123 use crate::client::MockDurableServiceClient;
4124 use crate::lambda::InitialExecutionState;
4125
4126 let captured_info = Arc::new(Mutex::new(None::<LogInfo>));
4127 let captured_info_clone = captured_info.clone();
4128
4129 // Create a logger that captures the LogInfo
4130 let inner = Arc::new(custom_logger(
4131 {
4132 let captured = captured_info_clone.clone();
4133 move |_, info: &LogInfo| {
4134 *captured.lock().unwrap() = Some(info.clone());
4135 }
4136 },
4137 {
4138 let captured = captured_info_clone.clone();
4139 move |_, info: &LogInfo| {
4140 *captured.lock().unwrap() = Some(info.clone());
4141 }
4142 },
4143 {
4144 let captured = captured_info_clone.clone();
4145 move |_, info: &LogInfo| {
4146 *captured.lock().unwrap() = Some(info.clone());
4147 }
4148 },
4149 {
4150 let captured = captured_info_clone.clone();
4151 move |_, info: &LogInfo| {
4152 *captured.lock().unwrap() = Some(info.clone());
4153 }
4154 },
4155 ));
4156
4157 let client: crate::client::SharedDurableServiceClient = Arc::new(MockDurableServiceClient::new());
4158 let state = Arc::new(ExecutionState::new(
4159 "arn:aws:lambda:us-east-1:123456789012:function:test:durable:abc123",
4160 "token-123",
4161 InitialExecutionState::new(),
4162 client,
4163 ));
4164 let ctx = DurableContext::new(state).with_logger(inner);
4165
4166 // Create extra fields
4167 let fields: Vec<(&str, &str)> = vec![(&key1, &value1), (&key2, &value2)];
4168
4169 // Call the appropriate log method based on log_level
4170 match log_level {
4171 0 => ctx.log_debug_with(&message, &fields),
4172 1 => ctx.log_info_with(&message, &fields),
4173 2 => ctx.log_warn_with(&message, &fields),
4174 _ => ctx.log_error_with(&message, &fields),
4175 }
4176
4177 // Verify the captured LogInfo has the extra fields
4178 let captured = captured_info.lock().unwrap();
4179 let info = captured.as_ref().expect("LogInfo should be captured");
4180
4181 // Extra fields must be present
4182 prop_assert_eq!(
4183 info.extra.len(),
4184 2,
4185 "Extra fields must be included in the log output"
4186 );
4187
4188 // Verify each field is present
4189 prop_assert!(
4190 info.extra.contains(&(key1.clone(), value1.clone())),
4191 "First extra field must be present: {}={}", key1, value1
4192 );
4193 prop_assert!(
4194 info.extra.contains(&(key2.clone(), value2.clone())),
4195 "Second extra field must be present: {}={}", key2, value2
4196 );
4197 }
4198
4199 /// Feature: sdk-ergonomics-improvements, Property 10: Logging Methods Extra Fields (Empty)
4200 /// Validates: Requirements 4.6
4201 #[test]
4202 fn prop_logging_extra_fields_empty(
4203 message in "[a-zA-Z0-9 ]{1,100}",
4204 log_level in 0u8..4u8,
4205 ) {
4206 use crate::client::MockDurableServiceClient;
4207 use crate::lambda::InitialExecutionState;
4208
4209 let captured_info = Arc::new(Mutex::new(None::<LogInfo>));
4210 let captured_info_clone = captured_info.clone();
4211
4212 // Create a logger that captures the LogInfo
4213 let inner = Arc::new(custom_logger(
4214 {
4215 let captured = captured_info_clone.clone();
4216 move |_, info: &LogInfo| {
4217 *captured.lock().unwrap() = Some(info.clone());
4218 }
4219 },
4220 {
4221 let captured = captured_info_clone.clone();
4222 move |_, info: &LogInfo| {
4223 *captured.lock().unwrap() = Some(info.clone());
4224 }
4225 },
4226 {
4227 let captured = captured_info_clone.clone();
4228 move |_, info: &LogInfo| {
4229 *captured.lock().unwrap() = Some(info.clone());
4230 }
4231 },
4232 {
4233 let captured = captured_info_clone.clone();
4234 move |_, info: &LogInfo| {
4235 *captured.lock().unwrap() = Some(info.clone());
4236 }
4237 },
4238 ));
4239
4240 let client: crate::client::SharedDurableServiceClient = Arc::new(MockDurableServiceClient::new());
4241 let state = Arc::new(ExecutionState::new(
4242 "arn:aws:lambda:us-east-1:123456789012:function:test:durable:abc123",
4243 "token-123",
4244 InitialExecutionState::new(),
4245 client,
4246 ));
4247 let ctx = DurableContext::new(state).with_logger(inner);
4248
4249 // Call the appropriate log method with empty fields
4250 let empty_fields: &[(&str, &str)] = &[];
4251 match log_level {
4252 0 => ctx.log_debug_with(&message, empty_fields),
4253 1 => ctx.log_info_with(&message, empty_fields),
4254 2 => ctx.log_warn_with(&message, empty_fields),
4255 _ => ctx.log_error_with(&message, empty_fields),
4256 }
4257
4258 // Verify the captured LogInfo has no extra fields
4259 let captured = captured_info.lock().unwrap();
4260 let info = captured.as_ref().expect("LogInfo should be captured");
4261
4262 prop_assert!(
4263 info.extra.is_empty(),
4264 "Extra fields should be empty when none are provided"
4265 );
4266
4267 // But automatic context should still be present
4268 prop_assert!(
4269 info.durable_execution_arn.is_some(),
4270 "durable_execution_arn must still be present even with empty extra fields"
4271 );
4272 }
4273 }
4274 }
4275}
4276
4277#[cfg(test)]
4278mod sealed_trait_tests {
4279 use super::*;
4280 use std::sync::atomic::{AtomicUsize, Ordering};
4281
4282 /// Tests for sealed Logger trait implementations.
4283 ///
4284 /// The Logger trait is sealed, meaning it cannot be implemented outside this crate.
4285 /// This is enforced at compile time by requiring the private `Sealed` supertrait.
4286 /// External crates attempting to implement Logger will get a compile error:
4287 /// "the trait bound `MyType: Sealed` is not satisfied"
4288 ///
4289 /// These tests verify that the internal implementations work correctly.
4290 mod logger_tests {
4291 use super::*;
4292
#[test]
fn test_tracing_logger_implements_logger() {
    let log_info = LogInfo::default();

    // The coercion to `&dyn Logger` only compiles if TracingLogger implements Logger.
    let tracing = TracingLogger;
    let as_logger: &dyn Logger = &tracing;

    // Exercise every level once; none of these calls may panic.
    as_logger.debug("test debug", &log_info);
    as_logger.info("test info", &log_info);
    as_logger.warn("test warn", &log_info);
    as_logger.error("test error", &log_info);
}
4305
#[test]
fn test_replay_aware_logger_implements_logger() {
    let log_info = LogInfo::default();

    // The coercion to `&dyn Logger` only compiles if ReplayAwareLogger implements Logger.
    let replay_aware =
        ReplayAwareLogger::new(Arc::new(TracingLogger), ReplayLoggingConfig::AllowAll);
    let as_logger: &dyn Logger = &replay_aware;

    // Exercise every level once; none of these calls may panic.
    as_logger.debug("test debug", &log_info);
    as_logger.info("test info", &log_info);
    as_logger.warn("test warn", &log_info);
    as_logger.error("test error", &log_info);
}
4320
#[test]
fn test_custom_logger_implements_logger() {
    // All four callbacks bump this one counter, so it totals the calls made.
    let invocations = Arc::new(AtomicUsize::new(0));
    let (tally_a, tally_b, tally_c, tally_d) = (
        Arc::clone(&invocations),
        Arc::clone(&invocations),
        Arc::clone(&invocations),
        Arc::clone(&invocations),
    );

    let logger = custom_logger(
        move |_msg, _info| {
            tally_a.fetch_add(1, Ordering::SeqCst);
        },
        move |_msg, _info| {
            tally_b.fetch_add(1, Ordering::SeqCst);
        },
        move |_msg, _info| {
            tally_c.fetch_add(1, Ordering::SeqCst);
        },
        move |_msg, _info| {
            tally_d.fetch_add(1, Ordering::SeqCst);
        },
    );

    // The coercion to `&dyn Logger` only compiles if the custom logger implements Logger.
    let as_logger: &dyn Logger = &logger;
    let log_info = LogInfo::default();

    as_logger.debug("test", &log_info);
    as_logger.info("test", &log_info);
    as_logger.warn("test", &log_info);
    as_logger.error("test", &log_info);

    // One call per level was made above.
    assert_eq!(invocations.load(Ordering::SeqCst), 4);
}
4364
#[test]
fn test_simple_custom_logger() {
    let invocations = Arc::new(AtomicUsize::new(0));

    // A single callback serves every level and counts each call it receives.
    let logger = {
        let tally = Arc::clone(&invocations);
        simple_custom_logger(move |_level, _msg, _info| {
            tally.fetch_add(1, Ordering::SeqCst);
        })
    };

    let log_info = LogInfo::default();

    logger.debug("test", &log_info);
    logger.info("test", &log_info);
    logger.warn("test", &log_info);
    logger.error("test", &log_info);

    // One call per level was made above.
    assert_eq!(invocations.load(Ordering::SeqCst), 4);
}
4383
#[test]
fn test_custom_logger_receives_correct_messages() {
    let recorded = Arc::new(std::sync::Mutex::new(Vec::new()));

    // The callback renders "[LEVEL] message" and appends it to the shared list.
    let logger = {
        let sink = Arc::clone(&recorded);
        simple_custom_logger(move |level, msg, _info| {
            let line = format!("[{}] {}", level, msg);
            sink.lock().unwrap().push(line);
        })
    };

    let log_info = LogInfo::default();

    logger.debug("debug message", &log_info);
    logger.info("info message", &log_info);
    logger.warn("warn message", &log_info);
    logger.error("error message", &log_info);

    // Lines are expected in call order.
    let expected = [
        "[DEBUG] debug message",
        "[INFO] info message",
        "[WARN] warn message",
        "[ERROR] error message",
    ];

    let logged = recorded.lock().unwrap();
    assert_eq!(logged.len(), 4);
    for (actual, wanted) in logged.iter().zip(expected.iter()) {
        assert_eq!(actual.as_str(), *wanted);
    }
}
4410
#[test]
fn test_custom_logger_receives_log_info() {
    let slot = Arc::new(std::sync::Mutex::new(None));

    // The callback keeps a copy of whatever LogInfo it is handed.
    let logger = {
        let writer = Arc::clone(&slot);
        simple_custom_logger(move |_level, _msg, info| {
            *writer.lock().unwrap() = Some(info.clone());
        })
    };

    let sent = LogInfo::new("arn:aws:test")
        .with_operation_id("op-123")
        .with_parent_id("parent-456")
        .with_replay(true);

    logger.info("test", &sent);

    // Every field set on the builder must arrive unchanged at the callback.
    let received = slot.lock().unwrap().clone().unwrap();
    assert_eq!(
        received.durable_execution_arn.as_deref(),
        Some("arn:aws:test")
    );
    assert_eq!(received.operation_id.as_deref(), Some("op-123"));
    assert_eq!(received.parent_id.as_deref(), Some("parent-456"));
    assert!(received.is_replay);
}
4436
#[test]
fn test_replay_aware_logger_suppresses_during_replay() {
    // All four callbacks bump this one counter, so it totals the calls that
    // reach the inner logger.
    let forwarded = Arc::new(AtomicUsize::new(0));
    let (tally_a, tally_b, tally_c, tally_d) = (
        Arc::clone(&forwarded),
        Arc::clone(&forwarded),
        Arc::clone(&forwarded),
        Arc::clone(&forwarded),
    );

    let inner_logger = Arc::new(custom_logger(
        move |_msg, _info| {
            tally_a.fetch_add(1, Ordering::SeqCst);
        },
        move |_msg, _info| {
            tally_b.fetch_add(1, Ordering::SeqCst);
        },
        move |_msg, _info| {
            tally_c.fetch_add(1, Ordering::SeqCst);
        },
        move |_msg, _info| {
            tally_d.fetch_add(1, Ordering::SeqCst);
        },
    ));

    let logger = ReplayAwareLogger::new(inner_logger, ReplayLoggingConfig::SuppressAll);

    // Outside of replay, all four levels must reach the inner logger.
    let live = LogInfo::default().with_replay(false);
    logger.debug("test", &live);
    logger.info("test", &live);
    logger.warn("test", &live);
    logger.error("test", &live);
    assert_eq!(forwarded.load(Ordering::SeqCst), 4);

    // During replay nothing may be forwarded, so the total stays at 4.
    let replayed = LogInfo::default().with_replay(true);
    logger.debug("test", &replayed);
    logger.info("test", &replayed);
    logger.warn("test", &replayed);
    logger.error("test", &replayed);
    assert_eq!(forwarded.load(Ordering::SeqCst), 4);
}
4487
#[test]
fn test_replay_aware_logger_errors_only_during_replay() {
    // Record the forwarded messages instead of only counting calls: a bare
    // count of 1 cannot tell a forwarded error from, say, a forwarded debug.
    let forwarded = Arc::new(std::sync::Mutex::new(Vec::<String>::new()));

    // All four callbacks record into the same list, so the check below does
    // not depend on the order in which custom_logger takes its callbacks.
    let inner_logger = Arc::new(custom_logger(
        {
            let sink = Arc::clone(&forwarded);
            move |msg: &str, _info: &LogInfo| {
                sink.lock().unwrap().push(msg.to_string());
            }
        },
        {
            let sink = Arc::clone(&forwarded);
            move |msg: &str, _info: &LogInfo| {
                sink.lock().unwrap().push(msg.to_string());
            }
        },
        {
            let sink = Arc::clone(&forwarded);
            move |msg: &str, _info: &LogInfo| {
                sink.lock().unwrap().push(msg.to_string());
            }
        },
        {
            let sink = Arc::clone(&forwarded);
            move |msg: &str, _info: &LogInfo| {
                sink.lock().unwrap().push(msg.to_string());
            }
        },
    ));

    let logger = ReplayAwareLogger::new(inner_logger, ReplayLoggingConfig::ErrorsOnly);

    // A distinct message per level makes the forwarded call identifiable.
    let replay_info = LogInfo::default().with_replay(true);
    logger.debug("debug message", &replay_info);
    logger.info("info message", &replay_info);
    logger.warn("warn message", &replay_info);
    logger.error("error message", &replay_info);

    // Only error should pass through during replay
    let forwarded = forwarded.lock().unwrap();
    assert_eq!(
        forwarded.len(),
        1,
        "exactly one call should be forwarded during replay, got {:?}",
        *forwarded
    );
    // `contains` rather than equality, in case the wrapper decorates the message.
    assert!(
        forwarded[0].contains("error message"),
        "the forwarded call must be the error-level one, got {:?}",
        forwarded[0]
    );
}
4531 }
4532}