durable_lambda_core/replay.rs
1//! Replay engine — operation-keyed state with visited tracking.
2//!
//! Implements FR1-FR5: history loading, replay/execute mode detection,
//! cached-result return, checkpoint execution, and replay-status transitions.
5//!
//! The replay engine uses a `HashMap<String, Operation>` keyed by operation ID
//! (matching the Python SDK's approach) and tracks which operations have been
//! visited. The replay status transitions from `Replaying` to `Executing` once
//! every completed operation in the history (other than the root `Execution`
//! operation) has been visited.
10
11use std::collections::{HashMap, HashSet};
12
13use aws_sdk_lambda::types::{Operation, OperationStatus};
14
15use crate::operation_id::OperationIdGenerator;
16use crate::types::ExecutionMode;
17
18/// Manage replay state for a durable execution.
19///
20/// The engine holds the complete operation state loaded from AWS, tracks which
21/// operations have been visited during the current invocation, and determines
22/// whether the execution is replaying cached results or executing new work.
23///
24/// # Replay Status Transitions
25///
26/// - Starts in [`ExecutionMode::Replaying`] if completed operations exist in history.
27/// - Starts in [`ExecutionMode::Executing`] if history is empty or has no completed operations.
28/// - Transitions from `Replaying` to `Executing` when all completed operations
29/// have been visited via [`track_replay`](Self::track_replay).
30///
31/// # Examples
32///
33/// ```
34/// use durable_lambda_core::replay::ReplayEngine;
35/// use durable_lambda_core::types::ExecutionMode;
36/// use std::collections::HashMap;
37///
38/// // Empty history → starts in Executing mode.
39/// let engine = ReplayEngine::new(HashMap::new(), None);
40/// assert_eq!(engine.execution_mode(), ExecutionMode::Executing);
41/// ```
42pub struct ReplayEngine {
43 /// All operations from the durable execution state, keyed by operation ID.
44 operations: HashMap<String, Operation>,
45 /// Operation IDs that have been visited during the current invocation.
46 visited: HashSet<String>,
47 /// IDs of operations with completed statuses (cached at init for perf).
48 completed_ids: HashSet<String>,
49 /// Current replay/execute mode.
50 mode: ExecutionMode,
51 /// Deterministic operation ID generator.
52 id_generator: OperationIdGenerator,
53}
54
55/// Check whether an operation status represents a completed state.
56///
57/// Completed statuses: `Succeeded`, `Failed`, `Cancelled`, `TimedOut`, `Stopped`.
58fn is_completed_status(status: &OperationStatus) -> bool {
59 matches!(
60 status,
61 OperationStatus::Succeeded
62 | OperationStatus::Failed
63 | OperationStatus::Cancelled
64 | OperationStatus::TimedOut
65 | OperationStatus::Stopped
66 )
67}
68
69impl ReplayEngine {
70 /// Create a new replay engine from loaded operations.
71 ///
72 /// Sets the initial [`ExecutionMode`] based on whether completed operations
73 /// exist in the history. Operations with type `Execution` are excluded from
74 /// replay tracking (they represent the root invocation, not user operations).
75 ///
76 /// # Arguments
77 ///
78 /// * `operations` — All operations from the durable execution state, keyed by ID.
79 /// * `parent_id` — Parent operation ID for child context scoping (`None` for root).
80 ///
81 /// # Examples
82 ///
83 /// ```
84 /// use durable_lambda_core::replay::ReplayEngine;
85 /// use durable_lambda_core::types::ExecutionMode;
86 /// use std::collections::HashMap;
87 ///
88 /// let engine = ReplayEngine::new(HashMap::new(), None);
89 /// assert_eq!(engine.execution_mode(), ExecutionMode::Executing);
90 /// ```
91 pub fn new(operations: HashMap<String, Operation>, parent_id: Option<String>) -> Self {
92 let completed_ids: HashSet<String> = operations
93 .iter()
94 .filter(|(_, op)| {
95 is_completed_status(&op.status)
96 && op.r#type != aws_sdk_lambda::types::OperationType::Execution
97 })
98 .map(|(id, _)| id.clone())
99 .collect();
100
101 let mode = if completed_ids.is_empty() {
102 ExecutionMode::Executing
103 } else {
104 ExecutionMode::Replaying
105 };
106
107 Self {
108 operations,
109 visited: HashSet::new(),
110 completed_ids,
111 mode,
112 id_generator: OperationIdGenerator::new(parent_id),
113 }
114 }
115
116 /// Look up an operation by ID, returning it if it exists with a completed status.
117 ///
118 /// Returns `None` if the operation doesn't exist or is not in a completed state.
119 ///
120 /// # Examples
121 ///
122 /// ```
123 /// use durable_lambda_core::replay::ReplayEngine;
124 /// use std::collections::HashMap;
125 ///
126 /// let engine = ReplayEngine::new(HashMap::new(), None);
127 /// assert!(engine.check_result("nonexistent").is_none());
128 /// ```
129 pub fn check_result(&self, operation_id: &str) -> Option<&Operation> {
130 self.operations
131 .get(operation_id)
132 .filter(|op| is_completed_status(&op.status))
133 }
134
135 /// Mark an operation as visited and update replay status.
136 ///
137 /// After visiting, checks whether all completed operations have been visited.
138 /// If so, transitions the mode from [`ExecutionMode::Replaying`] to
139 /// [`ExecutionMode::Executing`].
140 ///
141 /// # Examples
142 ///
143 /// ```
144 /// use durable_lambda_core::replay::ReplayEngine;
145 /// use std::collections::HashMap;
146 ///
147 /// let mut engine = ReplayEngine::new(HashMap::new(), None);
148 /// engine.track_replay("some-op-id");
149 /// ```
150 pub fn track_replay(&mut self, operation_id: &str) {
151 self.visited.insert(operation_id.to_string());
152
153 if self.mode == ExecutionMode::Replaying && self.completed_ids.is_subset(&self.visited) {
154 self.mode = ExecutionMode::Executing;
155 }
156 }
157
158 /// Return whether the engine is currently in replay mode.
159 ///
160 /// # Examples
161 ///
162 /// ```
163 /// use durable_lambda_core::replay::ReplayEngine;
164 /// use std::collections::HashMap;
165 ///
166 /// let engine = ReplayEngine::new(HashMap::new(), None);
167 /// assert!(!engine.is_replaying());
168 /// ```
169 pub fn is_replaying(&self) -> bool {
170 self.mode == ExecutionMode::Replaying
171 }
172
173 /// Return the current execution mode.
174 ///
175 /// # Examples
176 ///
177 /// ```
178 /// use durable_lambda_core::replay::ReplayEngine;
179 /// use durable_lambda_core::types::ExecutionMode;
180 /// use std::collections::HashMap;
181 ///
182 /// let engine = ReplayEngine::new(HashMap::new(), None);
183 /// assert_eq!(engine.execution_mode(), ExecutionMode::Executing);
184 /// ```
185 pub fn execution_mode(&self) -> ExecutionMode {
186 self.mode.clone()
187 }
188
189 /// Generate the next deterministic operation ID.
190 ///
191 /// Delegates to the internal [`OperationIdGenerator`].
192 ///
193 /// # Examples
194 ///
195 /// ```
196 /// use durable_lambda_core::replay::ReplayEngine;
197 /// use std::collections::HashMap;
198 ///
199 /// let mut engine = ReplayEngine::new(HashMap::new(), None);
200 /// let id = engine.generate_operation_id();
201 /// assert_eq!(id.len(), 64);
202 /// ```
203 pub fn generate_operation_id(&mut self) -> String {
204 self.id_generator.next_id()
205 }
206
207 /// Look up an operation by ID, returning it regardless of status.
208 ///
209 /// Unlike [`check_result`](Self::check_result) which only returns
210 /// operations in a completed status, this returns the operation in
211 /// any status (Started, Pending, Succeeded, etc.). Used by callback
212 /// operations that need to extract the server-generated `callback_id`
213 /// from operations that may still be in a non-completed state.
214 ///
215 /// # Examples
216 ///
217 /// ```
218 /// use durable_lambda_core::replay::ReplayEngine;
219 /// use std::collections::HashMap;
220 ///
221 /// let engine = ReplayEngine::new(HashMap::new(), None);
222 /// assert!(engine.get_operation("nonexistent").is_none());
223 /// ```
224 pub fn get_operation(&self, operation_id: &str) -> Option<&Operation> {
225 self.operations.get(operation_id)
226 }
227
228 /// Return a reference to the operations map.
229 ///
230 /// # Examples
231 ///
232 /// ```
233 /// use durable_lambda_core::replay::ReplayEngine;
234 /// use std::collections::HashMap;
235 ///
236 /// let engine = ReplayEngine::new(HashMap::new(), None);
237 /// assert!(engine.operations().is_empty());
238 /// ```
239 pub fn operations(&self) -> &HashMap<String, Operation> {
240 &self.operations
241 }
242
243 /// Insert or update an operation in the state.
244 ///
245 /// If the operation has a completed status (and is not the root `Execution`
246 /// type), it is added to the completed set for replay tracking.
247 ///
248 /// # Examples
249 ///
250 /// ```
251 /// use durable_lambda_core::replay::ReplayEngine;
252 /// use aws_sdk_lambda::types::{Operation, OperationType, OperationStatus};
253 /// use std::collections::HashMap;
254 ///
255 /// let mut engine = ReplayEngine::new(HashMap::new(), None);
256 /// assert!(engine.operations().is_empty());
257 ///
258 /// let op = Operation::builder()
259 /// .id("op-1")
260 /// .r#type(OperationType::Step)
261 /// .status(OperationStatus::Succeeded)
262 /// .start_timestamp(aws_smithy_types::DateTime::from_secs(0))
263 /// .build()
264 /// .unwrap();
265 /// engine.insert_operation("op-1".to_string(), op);
266 /// assert_eq!(engine.operations().len(), 1);
267 /// ```
268 pub fn insert_operation(&mut self, id: String, operation: Operation) {
269 if is_completed_status(&operation.status)
270 && operation.r#type != aws_sdk_lambda::types::OperationType::Execution
271 {
272 self.completed_ids.insert(id.clone());
273 }
274 self.operations.insert(id, operation);
275 }
276}
277
#[cfg(test)]
mod tests {
    use super::*;
    use aws_sdk_lambda::types::{Operation, OperationStatus, OperationType};

    /// Build a minimal operation with a fixed start timestamp.
    fn make_operation(id: &str, status: OperationStatus, op_type: OperationType) -> Operation {
        Operation::builder()
            .id(id)
            .r#type(op_type)
            .status(status)
            .start_timestamp(aws_smithy_types::DateTime::from_secs(0))
            .build()
            .unwrap()
    }

    /// Build an operations map keyed by ID from `(id, status, type)` entries.
    fn history(entries: &[(&str, OperationStatus, OperationType)]) -> HashMap<String, Operation> {
        entries
            .iter()
            .map(|(id, status, op_type)| {
                (id.to_string(), make_operation(id, status.clone(), op_type.clone()))
            })
            .collect()
    }

    #[test]
    fn empty_history_starts_executing() {
        let engine = ReplayEngine::new(HashMap::new(), None);
        assert_eq!(engine.execution_mode(), ExecutionMode::Executing);
        assert!(!engine.is_replaying());
    }

    #[test]
    fn completed_operations_start_replaying() {
        let engine = ReplayEngine::new(
            history(&[("op1", OperationStatus::Succeeded, OperationType::Step)]),
            None,
        );
        assert_eq!(engine.execution_mode(), ExecutionMode::Replaying);
        assert!(engine.is_replaying());
    }

    #[test]
    fn only_pending_operations_start_executing() {
        for status in [OperationStatus::Pending, OperationStatus::Started] {
            let engine = ReplayEngine::new(
                history(&[("op1", status.clone(), OperationType::Step)]),
                None,
            );
            assert_eq!(
                engine.execution_mode(),
                ExecutionMode::Executing,
                "status {:?} must not trigger replay",
                status
            );
        }
    }

    #[test]
    fn execution_type_excluded_from_replay_tracking() {
        // Only an EXECUTION-type completed op — should NOT count for replay.
        let engine = ReplayEngine::new(
            history(&[("exec", OperationStatus::Succeeded, OperationType::Execution)]),
            None,
        );
        assert_eq!(engine.execution_mode(), ExecutionMode::Executing);
    }

    #[test]
    fn execution_type_not_required_to_finish_replay() {
        let mut engine = ReplayEngine::new(
            history(&[
                ("exec", OperationStatus::Succeeded, OperationType::Execution),
                ("op1", OperationStatus::Succeeded, OperationType::Step),
            ]),
            None,
        );
        assert!(engine.is_replaying());

        // Visiting the only user operation is enough; the root op is never visited.
        engine.track_replay("op1");
        assert!(!engine.is_replaying());
    }

    #[test]
    fn transitions_to_executing_after_all_visited() {
        let mut engine = ReplayEngine::new(
            history(&[
                ("op1", OperationStatus::Succeeded, OperationType::Step),
                ("op2", OperationStatus::Failed, OperationType::Step),
            ]),
            None,
        );
        assert!(engine.is_replaying());

        engine.track_replay("op1");
        assert!(engine.is_replaying()); // Still replaying — op2 not visited.

        engine.track_replay("op2");
        assert!(!engine.is_replaying()); // All completed ops visited → Executing.
        assert_eq!(engine.execution_mode(), ExecutionMode::Executing);
    }

    #[test]
    fn unknown_and_repeated_ids_do_not_end_replay() {
        let mut engine = ReplayEngine::new(
            history(&[
                ("op1", OperationStatus::Succeeded, OperationType::Step),
                ("op2", OperationStatus::Succeeded, OperationType::Step),
            ]),
            None,
        );

        engine.track_replay("not-in-history");
        engine.track_replay("op1");
        engine.track_replay("op1");
        assert!(engine.is_replaying());

        engine.track_replay("op2");
        assert!(!engine.is_replaying());
    }

    #[test]
    fn check_result_returns_completed_operations() {
        let engine = ReplayEngine::new(
            history(&[
                ("op1", OperationStatus::Succeeded, OperationType::Step),
                ("op2", OperationStatus::Pending, OperationType::Step),
            ]),
            None,
        );
        assert!(engine.check_result("op1").is_some());
        assert!(engine.check_result("op2").is_none()); // Pending, not completed.
        assert!(engine.check_result("op3").is_none()); // Doesn't exist.
    }

    #[test]
    fn get_operation_returns_any_status() {
        let engine = ReplayEngine::new(
            history(&[("op1", OperationStatus::Started, OperationType::Step)]),
            None,
        );
        assert!(engine.get_operation("op1").is_some());
        assert!(engine.check_result("op1").is_none());
        assert!(engine.get_operation("missing").is_none());
    }

    #[test]
    fn generate_operation_id_is_deterministic() {
        let mut engine1 = ReplayEngine::new(HashMap::new(), None);
        let mut engine2 = ReplayEngine::new(HashMap::new(), None);

        let id1a = engine1.generate_operation_id();
        let id1b = engine2.generate_operation_id();
        assert_eq!(id1a, id1b);

        let id2a = engine1.generate_operation_id();
        let id2b = engine2.generate_operation_id();
        assert_eq!(id2a, id2b);
        assert_ne!(id1a, id2a);
    }

    #[test]
    fn mixed_statuses_only_track_completed() {
        let mut engine = ReplayEngine::new(
            history(&[
                ("done", OperationStatus::Succeeded, OperationType::Step),
                ("pending", OperationStatus::Pending, OperationType::Wait),
                ("started", OperationStatus::Started, OperationType::Step),
            ]),
            None,
        );
        assert!(engine.is_replaying());

        // Only need to visit the one completed op to transition.
        engine.track_replay("done");
        assert!(!engine.is_replaying());
    }

    #[test]
    fn all_completed_statuses_are_tracked() {
        for status in [
            OperationStatus::Succeeded,
            OperationStatus::Failed,
            OperationStatus::Cancelled,
            OperationStatus::TimedOut,
            OperationStatus::Stopped,
        ] {
            let engine = ReplayEngine::new(
                history(&[("op", status.clone(), OperationType::Step)]),
                None,
            );
            assert!(
                engine.is_replaying(),
                "Should replay for completed status {:?}",
                status
            );
        }
    }

    #[test]
    fn insert_operation_updates_state() {
        let mut engine = ReplayEngine::new(HashMap::new(), None);
        assert!(!engine.is_replaying());

        let op = make_operation("new_op", OperationStatus::Succeeded, OperationType::Step);
        engine.insert_operation("new_op".to_string(), op);

        assert!(engine.check_result("new_op").is_some());
        assert_eq!(engine.operations().len(), 1);
        // Inserting after replay has finished must not re-enter replay mode.
        assert!(!engine.is_replaying());
    }
}