Skip to main content

durable_lambda_core/
replay.rs

//! Replay engine — operation-keyed state with visited tracking.
//!
//! Implements FR1–FR5: history loading, replay/execute mode detection,
//! cached-result return, checkpoint execution, and replay status transitions.
//!
//! The replay engine uses a `HashMap<String, Operation>` keyed by operation ID
//! (matching the Python SDK's approach) and tracks which operations have been
//! visited. The replay status transitions from `Replaying` to `Executing` once
//! every completed operation in history has been visited.
10
11use std::collections::{HashMap, HashSet};
12
13use aws_sdk_lambda::types::{Operation, OperationStatus};
14
15use crate::operation_id::OperationIdGenerator;
16use crate::types::ExecutionMode;
17
18/// Manage replay state for a durable execution.
19///
20/// The engine holds the complete operation state loaded from AWS, tracks which
21/// operations have been visited during the current invocation, and determines
22/// whether the execution is replaying cached results or executing new work.
23///
24/// # Replay Status Transitions
25///
26/// - Starts in [`ExecutionMode::Replaying`] if completed operations exist in history.
27/// - Starts in [`ExecutionMode::Executing`] if history is empty or has no completed operations.
28/// - Transitions from `Replaying` to `Executing` when all completed operations
29///   have been visited via [`track_replay`](Self::track_replay).
30///
31/// # Examples
32///
33/// ```
34/// use durable_lambda_core::replay::ReplayEngine;
35/// use durable_lambda_core::types::ExecutionMode;
36/// use std::collections::HashMap;
37///
38/// // Empty history → starts in Executing mode.
39/// let engine = ReplayEngine::new(HashMap::new(), None);
40/// assert_eq!(engine.execution_mode(), ExecutionMode::Executing);
41/// ```
42pub struct ReplayEngine {
43    /// All operations from the durable execution state, keyed by operation ID.
44    operations: HashMap<String, Operation>,
45    /// Operation IDs that have been visited during the current invocation.
46    visited: HashSet<String>,
47    /// IDs of operations with completed statuses (cached at init for perf).
48    completed_ids: HashSet<String>,
49    /// Current replay/execute mode.
50    mode: ExecutionMode,
51    /// Deterministic operation ID generator.
52    id_generator: OperationIdGenerator,
53}
54
55/// Check whether an operation status represents a completed state.
56///
57/// Completed statuses: `Succeeded`, `Failed`, `Cancelled`, `TimedOut`, `Stopped`.
58fn is_completed_status(status: &OperationStatus) -> bool {
59    matches!(
60        status,
61        OperationStatus::Succeeded
62            | OperationStatus::Failed
63            | OperationStatus::Cancelled
64            | OperationStatus::TimedOut
65            | OperationStatus::Stopped
66    )
67}
68
69impl ReplayEngine {
70    /// Create a new replay engine from loaded operations.
71    ///
72    /// Sets the initial [`ExecutionMode`] based on whether completed operations
73    /// exist in the history. Operations with type `Execution` are excluded from
74    /// replay tracking (they represent the root invocation, not user operations).
75    ///
76    /// # Arguments
77    ///
78    /// * `operations` — All operations from the durable execution state, keyed by ID.
79    /// * `parent_id` — Parent operation ID for child context scoping (`None` for root).
80    ///
81    /// # Examples
82    ///
83    /// ```
84    /// use durable_lambda_core::replay::ReplayEngine;
85    /// use durable_lambda_core::types::ExecutionMode;
86    /// use std::collections::HashMap;
87    ///
88    /// let engine = ReplayEngine::new(HashMap::new(), None);
89    /// assert_eq!(engine.execution_mode(), ExecutionMode::Executing);
90    /// ```
91    pub fn new(operations: HashMap<String, Operation>, parent_id: Option<String>) -> Self {
92        let completed_ids: HashSet<String> = operations
93            .iter()
94            .filter(|(_, op)| {
95                is_completed_status(&op.status)
96                    && op.r#type != aws_sdk_lambda::types::OperationType::Execution
97            })
98            .map(|(id, _)| id.clone())
99            .collect();
100
101        let mode = if completed_ids.is_empty() {
102            ExecutionMode::Executing
103        } else {
104            ExecutionMode::Replaying
105        };
106
107        Self {
108            operations,
109            visited: HashSet::new(),
110            completed_ids,
111            mode,
112            id_generator: OperationIdGenerator::new(parent_id),
113        }
114    }
115
116    /// Look up an operation by ID, returning it if it exists with a completed status.
117    ///
118    /// Returns `None` if the operation doesn't exist or is not in a completed state.
119    ///
120    /// # Examples
121    ///
122    /// ```
123    /// use durable_lambda_core::replay::ReplayEngine;
124    /// use std::collections::HashMap;
125    ///
126    /// let engine = ReplayEngine::new(HashMap::new(), None);
127    /// assert!(engine.check_result("nonexistent").is_none());
128    /// ```
129    pub fn check_result(&self, operation_id: &str) -> Option<&Operation> {
130        self.operations
131            .get(operation_id)
132            .filter(|op| is_completed_status(&op.status))
133    }
134
135    /// Mark an operation as visited and update replay status.
136    ///
137    /// After visiting, checks whether all completed operations have been visited.
138    /// If so, transitions the mode from [`ExecutionMode::Replaying`] to
139    /// [`ExecutionMode::Executing`].
140    ///
141    /// # Examples
142    ///
143    /// ```
144    /// use durable_lambda_core::replay::ReplayEngine;
145    /// use std::collections::HashMap;
146    ///
147    /// let mut engine = ReplayEngine::new(HashMap::new(), None);
148    /// engine.track_replay("some-op-id");
149    /// ```
150    pub fn track_replay(&mut self, operation_id: &str) {
151        self.visited.insert(operation_id.to_string());
152
153        if self.mode == ExecutionMode::Replaying && self.completed_ids.is_subset(&self.visited) {
154            self.mode = ExecutionMode::Executing;
155        }
156    }
157
158    /// Return whether the engine is currently in replay mode.
159    ///
160    /// # Examples
161    ///
162    /// ```
163    /// use durable_lambda_core::replay::ReplayEngine;
164    /// use std::collections::HashMap;
165    ///
166    /// let engine = ReplayEngine::new(HashMap::new(), None);
167    /// assert!(!engine.is_replaying());
168    /// ```
169    pub fn is_replaying(&self) -> bool {
170        self.mode == ExecutionMode::Replaying
171    }
172
173    /// Return the current execution mode.
174    ///
175    /// # Examples
176    ///
177    /// ```
178    /// use durable_lambda_core::replay::ReplayEngine;
179    /// use durable_lambda_core::types::ExecutionMode;
180    /// use std::collections::HashMap;
181    ///
182    /// let engine = ReplayEngine::new(HashMap::new(), None);
183    /// assert_eq!(engine.execution_mode(), ExecutionMode::Executing);
184    /// ```
185    pub fn execution_mode(&self) -> ExecutionMode {
186        self.mode.clone()
187    }
188
189    /// Generate the next deterministic operation ID.
190    ///
191    /// Delegates to the internal [`OperationIdGenerator`].
192    ///
193    /// # Examples
194    ///
195    /// ```
196    /// use durable_lambda_core::replay::ReplayEngine;
197    /// use std::collections::HashMap;
198    ///
199    /// let mut engine = ReplayEngine::new(HashMap::new(), None);
200    /// let id = engine.generate_operation_id();
201    /// assert_eq!(id.len(), 64);
202    /// ```
203    pub fn generate_operation_id(&mut self) -> String {
204        self.id_generator.next_id()
205    }
206
207    /// Look up an operation by ID, returning it regardless of status.
208    ///
209    /// Unlike [`check_result`](Self::check_result) which only returns
210    /// operations in a completed status, this returns the operation in
211    /// any status (Started, Pending, Succeeded, etc.). Used by callback
212    /// operations that need to extract the server-generated `callback_id`
213    /// from operations that may still be in a non-completed state.
214    ///
215    /// # Examples
216    ///
217    /// ```
218    /// use durable_lambda_core::replay::ReplayEngine;
219    /// use std::collections::HashMap;
220    ///
221    /// let engine = ReplayEngine::new(HashMap::new(), None);
222    /// assert!(engine.get_operation("nonexistent").is_none());
223    /// ```
224    pub fn get_operation(&self, operation_id: &str) -> Option<&Operation> {
225        self.operations.get(operation_id)
226    }
227
228    /// Return a reference to the operations map.
229    ///
230    /// # Examples
231    ///
232    /// ```
233    /// use durable_lambda_core::replay::ReplayEngine;
234    /// use std::collections::HashMap;
235    ///
236    /// let engine = ReplayEngine::new(HashMap::new(), None);
237    /// assert!(engine.operations().is_empty());
238    /// ```
239    pub fn operations(&self) -> &HashMap<String, Operation> {
240        &self.operations
241    }
242
243    /// Insert or update an operation in the state.
244    ///
245    /// If the operation has a completed status (and is not the root `Execution`
246    /// type), it is added to the completed set for replay tracking.
247    ///
248    /// # Examples
249    ///
250    /// ```
251    /// use durable_lambda_core::replay::ReplayEngine;
252    /// use aws_sdk_lambda::types::{Operation, OperationType, OperationStatus};
253    /// use std::collections::HashMap;
254    ///
255    /// let mut engine = ReplayEngine::new(HashMap::new(), None);
256    /// assert!(engine.operations().is_empty());
257    ///
258    /// let op = Operation::builder()
259    ///     .id("op-1")
260    ///     .r#type(OperationType::Step)
261    ///     .status(OperationStatus::Succeeded)
262    ///     .start_timestamp(aws_smithy_types::DateTime::from_secs(0))
263    ///     .build()
264    ///     .unwrap();
265    /// engine.insert_operation("op-1".to_string(), op);
266    /// assert_eq!(engine.operations().len(), 1);
267    /// ```
268    pub fn insert_operation(&mut self, id: String, operation: Operation) {
269        if is_completed_status(&operation.status)
270            && operation.r#type != aws_sdk_lambda::types::OperationType::Execution
271        {
272            self.completed_ids.insert(id.clone());
273        }
274        self.operations.insert(id, operation);
275    }
276}
277
#[cfg(test)]
mod tests {
    use super::*;
    use aws_sdk_lambda::types::{Operation, OperationStatus, OperationType};

    /// Build a minimal operation with the given ID, status, and type.
    fn make_operation(id: &str, status: OperationStatus, op_type: OperationType) -> Operation {
        Operation::builder()
            .id(id)
            .r#type(op_type)
            .status(status)
            .start_timestamp(aws_smithy_types::DateTime::from_secs(0))
            .build()
            .unwrap()
    }

    #[test]
    fn empty_history_starts_executing() {
        let engine = ReplayEngine::new(HashMap::new(), None);
        assert_eq!(engine.execution_mode(), ExecutionMode::Executing);
        assert!(!engine.is_replaying());
    }

    #[test]
    fn completed_operations_start_replaying() {
        let mut ops = HashMap::new();
        ops.insert(
            "op1".to_string(),
            make_operation("op1", OperationStatus::Succeeded, OperationType::Step),
        );

        let engine = ReplayEngine::new(ops, None);
        assert_eq!(engine.execution_mode(), ExecutionMode::Replaying);
        assert!(engine.is_replaying());
    }

    #[test]
    fn only_pending_operations_start_executing() {
        let mut ops = HashMap::new();
        ops.insert(
            "op1".to_string(),
            make_operation("op1", OperationStatus::Pending, OperationType::Step),
        );

        let engine = ReplayEngine::new(ops, None);
        assert_eq!(engine.execution_mode(), ExecutionMode::Executing);
    }

    #[test]
    fn execution_type_excluded_from_replay_tracking() {
        let mut ops = HashMap::new();
        // Only an EXECUTION-type completed op — should NOT count for replay.
        ops.insert(
            "exec".to_string(),
            make_operation("exec", OperationStatus::Succeeded, OperationType::Execution),
        );

        let engine = ReplayEngine::new(ops, None);
        assert_eq!(engine.execution_mode(), ExecutionMode::Executing);
    }

    #[test]
    fn execution_type_does_not_block_transition() {
        let mut ops = HashMap::new();
        ops.insert(
            "exec".to_string(),
            make_operation("exec", OperationStatus::Succeeded, OperationType::Execution),
        );
        ops.insert(
            "op1".to_string(),
            make_operation("op1", OperationStatus::Succeeded, OperationType::Step),
        );

        let mut engine = ReplayEngine::new(ops, None);
        assert!(engine.is_replaying());

        // The root EXECUTION op never needs to be visited.
        engine.track_replay("op1");
        assert!(!engine.is_replaying());
    }

    #[test]
    fn transitions_to_executing_after_all_visited() {
        let mut ops = HashMap::new();
        ops.insert(
            "op1".to_string(),
            make_operation("op1", OperationStatus::Succeeded, OperationType::Step),
        );
        ops.insert(
            "op2".to_string(),
            make_operation("op2", OperationStatus::Failed, OperationType::Step),
        );

        let mut engine = ReplayEngine::new(ops, None);
        assert!(engine.is_replaying());

        engine.track_replay("op1");
        assert!(engine.is_replaying()); // Still replaying — op2 not visited.

        engine.track_replay("op2");
        assert!(!engine.is_replaying()); // All completed ops visited → Executing.
        assert_eq!(engine.execution_mode(), ExecutionMode::Executing);
    }

    #[test]
    fn repeated_visits_do_not_count_twice() {
        let mut ops = HashMap::new();
        ops.insert(
            "op1".to_string(),
            make_operation("op1", OperationStatus::Succeeded, OperationType::Step),
        );
        ops.insert(
            "op2".to_string(),
            make_operation("op2", OperationStatus::Succeeded, OperationType::Step),
        );

        let mut engine = ReplayEngine::new(ops, None);
        engine.track_replay("op1");
        engine.track_replay("op1");
        assert!(engine.is_replaying()); // op2 still unvisited.

        engine.track_replay("op2");
        assert!(!engine.is_replaying());
    }

    #[test]
    fn visiting_unknown_operation_does_not_transition() {
        let mut ops = HashMap::new();
        ops.insert(
            "op1".to_string(),
            make_operation("op1", OperationStatus::Succeeded, OperationType::Step),
        );

        let mut engine = ReplayEngine::new(ops, None);
        engine.track_replay("not-in-history");
        assert!(engine.is_replaying());

        engine.track_replay("op1");
        assert!(!engine.is_replaying());
    }

    #[test]
    fn check_result_returns_completed_operations() {
        let mut ops = HashMap::new();
        ops.insert(
            "op1".to_string(),
            make_operation("op1", OperationStatus::Succeeded, OperationType::Step),
        );
        ops.insert(
            "op2".to_string(),
            make_operation("op2", OperationStatus::Pending, OperationType::Step),
        );

        let engine = ReplayEngine::new(ops, None);
        assert!(engine.check_result("op1").is_some());
        assert!(engine.check_result("op2").is_none()); // Pending, not completed.
        assert!(engine.check_result("op3").is_none()); // Doesn't exist.
    }

    #[test]
    fn get_operation_ignores_status() {
        let mut ops = HashMap::new();
        ops.insert(
            "pending".to_string(),
            make_operation("pending", OperationStatus::Pending, OperationType::Step),
        );

        let engine = ReplayEngine::new(ops, None);
        assert!(engine.get_operation("pending").is_some());
        assert!(engine.check_result("pending").is_none());
        assert!(engine.get_operation("missing").is_none());
    }

    #[test]
    fn generate_operation_id_is_deterministic() {
        let mut engine1 = ReplayEngine::new(HashMap::new(), None);
        let mut engine2 = ReplayEngine::new(HashMap::new(), None);

        let id1a = engine1.generate_operation_id();
        let id1b = engine2.generate_operation_id();
        assert_eq!(id1a, id1b);

        let id2a = engine1.generate_operation_id();
        let id2b = engine2.generate_operation_id();
        assert_eq!(id2a, id2b);
        assert_ne!(id1a, id2a);
    }

    #[test]
    fn mixed_statuses_only_track_completed() {
        let mut ops = HashMap::new();
        ops.insert(
            "done".to_string(),
            make_operation("done", OperationStatus::Succeeded, OperationType::Step),
        );
        ops.insert(
            "pending".to_string(),
            make_operation("pending", OperationStatus::Pending, OperationType::Wait),
        );
        ops.insert(
            "started".to_string(),
            make_operation("started", OperationStatus::Started, OperationType::Step),
        );

        let mut engine = ReplayEngine::new(ops, None);
        assert!(engine.is_replaying());

        // Only need to visit the one completed op to transition.
        engine.track_replay("done");
        assert!(!engine.is_replaying());
    }

    #[test]
    fn all_completed_statuses_are_tracked() {
        for status in [
            OperationStatus::Succeeded,
            OperationStatus::Failed,
            OperationStatus::Cancelled,
            OperationStatus::TimedOut,
            OperationStatus::Stopped,
        ] {
            let mut ops = HashMap::new();
            ops.insert(
                "op".to_string(),
                make_operation("op", status.clone(), OperationType::Step),
            );
            let engine = ReplayEngine::new(ops, None);
            assert!(
                engine.is_replaying(),
                "Should replay for completed status {status:?}"
            );
        }
    }

    #[test]
    fn insert_operation_updates_state() {
        let mut engine = ReplayEngine::new(HashMap::new(), None);
        assert!(!engine.is_replaying());

        let op = make_operation("new_op", OperationStatus::Succeeded, OperationType::Step);
        engine.insert_operation("new_op".to_string(), op);

        assert!(engine.check_result("new_op").is_some());
        assert_eq!(engine.operations().len(), 1);
        // Mode is decided at construction; inserting never re-enters replay.
        assert!(!engine.is_replaying());
    }

    #[test]
    fn insert_operation_during_replay_must_be_visited() {
        let mut ops = HashMap::new();
        ops.insert(
            "op1".to_string(),
            make_operation("op1", OperationStatus::Succeeded, OperationType::Step),
        );

        let mut engine = ReplayEngine::new(ops, None);
        assert!(engine.is_replaying());

        let op2 = make_operation("op2", OperationStatus::Succeeded, OperationType::Step);
        engine.insert_operation("op2".to_string(), op2);

        engine.track_replay("op1");
        assert!(engine.is_replaying()); // op2 joined the completed set.

        engine.track_replay("op2");
        assert!(!engine.is_replaying());
    }
}