Skip to main content

durable_lambda_core/
event.rs

1//! Event parsing helpers for durable execution Lambda payloads.
2//!
3//! Parse the `InitialExecutionState` envelope that AWS sends to durable Lambda
4//! handlers. These helpers are shared across all API approach crates (closure,
5//! macro, trait, builder) so event parsing logic stays in one place.
6
7use aws_sdk_lambda::types::{Operation, OperationStatus, OperationType, StepDetails};
8
9/// Parse operations from the `InitialExecutionState` JSON payload.
10///
11/// Constructs [`Operation`] objects from the JSON `"Operations"` array using
12/// the builder pattern. Operations that cannot be fully parsed are silently
13/// skipped.
14///
15/// # Examples
16///
17/// ```
18/// let state = serde_json::json!({
19///     "Operations": [{
20///         "Id": "op-1",
21///         "Type": "Step",
22///         "Status": "Succeeded",
23///         "StartTimestamp": 1700000000.0,
24///         "StepDetails": { "Result": "{\"ok\":true}" }
25///     }]
26/// });
27/// let ops = durable_lambda_core::event::parse_operations(&state);
28/// assert_eq!(ops.len(), 1);
29/// ```
30pub fn parse_operations(initial_state: &serde_json::Value) -> Vec<Operation> {
31    let Some(ops_array) = initial_state["Operations"].as_array() else {
32        return vec![];
33    };
34
35    ops_array
36        .iter()
37        .filter_map(|op_json| {
38            let id = op_json["Id"].as_str()?;
39            let op_type = parse_operation_type(op_json["Type"].as_str()?)?;
40            let status = parse_operation_status(op_json["Status"].as_str()?)?;
41
42            let timestamp = op_json["StartTimestamp"]
43                .as_f64()
44                .map(aws_smithy_types::DateTime::from_secs_f64)
45                .unwrap_or_else(|| aws_smithy_types::DateTime::from_secs(0));
46
47            let mut builder = Operation::builder()
48                .id(id)
49                .r#type(op_type)
50                .status(status)
51                .start_timestamp(timestamp);
52
53            // Parse step details if present.
54            if let Some(step_details_json) = op_json.get("StepDetails") {
55                let mut sd_builder = StepDetails::builder();
56
57                if let Some(result) = step_details_json["Result"].as_str() {
58                    sd_builder = sd_builder.result(result);
59                }
60
61                if let Some(error_json) = step_details_json.get("Error") {
62                    if let (Some(error_type), Some(error_data)) = (
63                        error_json["ErrorType"].as_str(),
64                        error_json["ErrorData"].as_str(),
65                    ) {
66                        sd_builder = sd_builder.error(
67                            aws_sdk_lambda::types::ErrorObject::builder()
68                                .error_type(error_type)
69                                .error_data(error_data)
70                                .build(),
71                        );
72                    }
73                }
74
75                if let Some(attempt) = step_details_json["Attempt"].as_i64() {
76                    sd_builder = sd_builder.attempt(attempt as i32);
77                }
78
79                builder = builder.step_details(sd_builder.build());
80            }
81
82            // Parse execution details if present.
83            if let Some(exec_json) = op_json.get("ExecutionDetails") {
84                let mut ed_builder = aws_sdk_lambda::types::ExecutionDetails::builder();
85                if let Some(input) = exec_json["InputPayload"].as_str() {
86                    ed_builder = ed_builder.input_payload(input);
87                }
88                builder = builder.execution_details(ed_builder.build());
89            }
90
91            builder.build().ok()
92        })
93        .collect()
94}
95
96/// Parse an operation type string into the AWS SDK enum.
97///
98/// Accepts both PascalCase (`"Step"`) and UPPER_CASE (`"STEP"`) variants.
99///
100/// # Examples
101///
102/// ```
103/// use durable_lambda_core::event::parse_operation_type;
104/// use aws_sdk_lambda::types::OperationType;
105///
106/// assert_eq!(parse_operation_type("Step"), Some(OperationType::Step));
107/// assert_eq!(parse_operation_type("EXECUTION"), Some(OperationType::Execution));
108/// assert_eq!(parse_operation_type("unknown"), None);
109/// ```
110pub fn parse_operation_type(s: &str) -> Option<OperationType> {
111    match s {
112        "Step" | "STEP" => Some(OperationType::Step),
113        "Execution" | "EXECUTION" => Some(OperationType::Execution),
114        "Wait" | "WAIT" => Some(OperationType::Wait),
115        "Callback" | "CALLBACK" => Some(OperationType::Callback),
116        "ChainedInvoke" | "CHAINED_INVOKE" => Some(OperationType::ChainedInvoke),
117        _ => None,
118    }
119}
120
121/// Parse an operation status string into the AWS SDK enum.
122///
123/// Accepts both PascalCase (`"Succeeded"`) and UPPER_CASE (`"SUCCEEDED"`) variants.
124///
125/// # Examples
126///
127/// ```
128/// use durable_lambda_core::event::parse_operation_status;
129/// use aws_sdk_lambda::types::OperationStatus;
130///
131/// assert_eq!(parse_operation_status("Succeeded"), Some(OperationStatus::Succeeded));
132/// assert_eq!(parse_operation_status("PENDING"), Some(OperationStatus::Pending));
133/// assert_eq!(parse_operation_status("unknown"), None);
134/// ```
135pub fn parse_operation_status(s: &str) -> Option<OperationStatus> {
136    match s {
137        "Succeeded" | "SUCCEEDED" => Some(OperationStatus::Succeeded),
138        "Failed" | "FAILED" => Some(OperationStatus::Failed),
139        "Pending" | "PENDING" => Some(OperationStatus::Pending),
140        "Ready" | "READY" => Some(OperationStatus::Ready),
141        "Started" | "STARTED" => Some(OperationStatus::Started),
142        _ => None,
143    }
144}
145
146/// Structured data extracted from a durable Lambda invocation payload.
147///
148/// Contains all fields needed to construct a [`DurableContext`](crate::context::DurableContext):
149/// the execution ARN, checkpoint token, initial operations, pagination marker,
150/// and the user's original event payload.
151///
152/// # Examples
153///
154/// ```
155/// let payload = serde_json::json!({
156///     "DurableExecutionArn": "arn:aws:lambda:us-east-1:123:durable-execution/test",
157///     "CheckpointToken": "tok-1",
158///     "InitialExecutionState": {
159///         "Operations": [],
160///         "NextMarker": ""
161///     }
162/// });
163/// let data = durable_lambda_core::event::parse_invocation(&payload).unwrap();
164/// assert_eq!(data.durable_execution_arn, "arn:aws:lambda:us-east-1:123:durable-execution/test");
165/// ```
166#[derive(Debug)]
167pub struct InvocationData {
168    /// The durable execution ARN from the event envelope.
169    pub durable_execution_arn: String,
170    /// The initial checkpoint token from the event envelope.
171    pub checkpoint_token: String,
172    /// Parsed operations from InitialExecutionState.
173    pub operations: Vec<aws_sdk_lambda::types::Operation>,
174    /// Pagination marker for additional operations pages (None if all loaded).
175    pub next_marker: Option<String>,
176    /// The user's original event payload extracted from the Execution operation.
177    pub user_event: serde_json::Value,
178}
179
180/// Parse all durable execution fields from a Lambda event payload.
181///
182/// Extracts ARN, checkpoint token, initial operations, pagination marker,
183/// and user event from the standard durable Lambda event envelope.
184///
185/// # Arguments
186///
187/// * `payload` - The raw Lambda event payload as JSON
188///
189/// # Errors
190///
191/// Returns `Err(&'static str)` if `DurableExecutionArn` or `CheckpointToken`
192/// is missing from the payload.
193///
194/// # Examples
195///
196/// ```
197/// let payload = serde_json::json!({
198///     "DurableExecutionArn": "arn:aws:lambda:us-east-1:123:durable-execution/test",
199///     "CheckpointToken": "tok-1",
200///     "InitialExecutionState": {
201///         "Operations": [{
202///             "Id": "exec-1",
203///             "Type": "Execution",
204///             "Status": "Started",
205///             "ExecutionDetails": { "InputPayload": "{\"order_id\": 42}" }
206///         }]
207///     }
208/// });
209/// let data = durable_lambda_core::event::parse_invocation(&payload).unwrap();
210/// assert_eq!(data.checkpoint_token, "tok-1");
211/// assert_eq!(data.user_event["order_id"], 42);
212/// ```
213pub fn parse_invocation(payload: &serde_json::Value) -> Result<InvocationData, &'static str> {
214    let durable_execution_arn = payload["DurableExecutionArn"]
215        .as_str()
216        .ok_or("missing DurableExecutionArn in event")?
217        .to_string();
218
219    let checkpoint_token = payload["CheckpointToken"]
220        .as_str()
221        .ok_or("missing CheckpointToken in event")?
222        .to_string();
223
224    let initial_state = &payload["InitialExecutionState"];
225    let operations = parse_operations(initial_state);
226
227    let next_marker = initial_state["NextMarker"]
228        .as_str()
229        .filter(|s| !s.is_empty())
230        .map(|s| s.to_string());
231
232    let user_event = extract_user_event(initial_state);
233
234    Ok(InvocationData {
235        durable_execution_arn,
236        checkpoint_token,
237        operations,
238        next_marker,
239        user_event,
240    })
241}
242
243/// Extract the user's original event payload from the `InitialExecutionState`.
244///
245/// The first operation with type `EXECUTION` contains the user's input payload
246/// in its `ExecutionDetails.InputPayload` field. If not found or unparsable,
247/// returns an empty JSON object.
248///
249/// # Examples
250///
251/// ```
252/// let state = serde_json::json!({
253///     "Operations": [{
254///         "Id": "exec-1",
255///         "Type": "Execution",
256///         "Status": "Started",
257///         "ExecutionDetails": {
258///             "InputPayload": "{\"order_id\": 42}"
259///         }
260///     }]
261/// });
262/// let event = durable_lambda_core::event::extract_user_event(&state);
263/// assert_eq!(event["order_id"], 42);
264/// ```
265pub fn extract_user_event(initial_state: &serde_json::Value) -> serde_json::Value {
266    if let Some(ops) = initial_state["Operations"].as_array() {
267        for op in ops {
268            if op["Type"].as_str() == Some("Execution") || op["Type"].as_str() == Some("EXECUTION")
269            {
270                if let Some(input) = op
271                    .get("ExecutionDetails")
272                    .and_then(|ed| ed["InputPayload"].as_str())
273                {
274                    if let Ok(parsed) = serde_json::from_str(input) {
275                        return parsed;
276                    }
277                }
278            }
279        }
280    }
281    serde_json::Value::Object(serde_json::Map::new())
282}
283
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// ARN shared by every `parse_invocation` test.
    const TEST_ARN: &str = "arn:aws:lambda:us-east-1:123:durable-execution/test";

    #[test]
    fn parse_empty_operations() {
        assert!(parse_operations(&json!({})).is_empty());
    }

    #[test]
    fn parse_operations_with_step() {
        let state = json!({
            "Operations": [{
                "Id": "step-1",
                "Type": "Step",
                "Status": "Succeeded",
                "StartTimestamp": 1700000000.0,
                "StepDetails": {
                    "Result": "{\"value\": 42}",
                    "Attempt": 1
                }
            }]
        });

        let parsed = parse_operations(&state);

        assert_eq!(parsed.len(), 1);
        assert_eq!(parsed[0].id(), "step-1");
    }

    #[test]
    fn parse_operations_skips_invalid() {
        let state = json!({
            "Operations": [
                { "Id": "good", "Type": "Step", "Status": "Succeeded" },
                { "Id": "bad", "Type": "Unknown", "Status": "Succeeded" }
            ]
        });

        let parsed = parse_operations(&state);

        assert_eq!(parsed.len(), 1);
        assert_eq!(parsed[0].id(), "good");
    }

    #[test]
    fn parse_operation_type_all_variants() {
        let cases = [
            ("Step", OperationType::Step),
            ("STEP", OperationType::Step),
            ("Execution", OperationType::Execution),
            ("EXECUTION", OperationType::Execution),
            ("Wait", OperationType::Wait),
            ("WAIT", OperationType::Wait),
            ("Callback", OperationType::Callback),
            ("CALLBACK", OperationType::Callback),
            ("ChainedInvoke", OperationType::ChainedInvoke),
            ("CHAINED_INVOKE", OperationType::ChainedInvoke),
        ];
        for (raw, expected) in cases {
            assert_eq!(parse_operation_type(raw), Some(expected), "input: {raw}");
        }
        assert_eq!(parse_operation_type("bogus"), None);
    }

    #[test]
    fn parse_operation_status_all_variants() {
        // Both spellings of every status, so the UPPER_CASE arms are covered
        // as well as the PascalCase ones.
        let cases = [
            ("Succeeded", OperationStatus::Succeeded),
            ("SUCCEEDED", OperationStatus::Succeeded),
            ("Failed", OperationStatus::Failed),
            ("FAILED", OperationStatus::Failed),
            ("Pending", OperationStatus::Pending),
            ("PENDING", OperationStatus::Pending),
            ("Ready", OperationStatus::Ready),
            ("READY", OperationStatus::Ready),
            ("Started", OperationStatus::Started),
            ("STARTED", OperationStatus::Started),
        ];
        for (raw, expected) in cases {
            assert_eq!(parse_operation_status(raw), Some(expected), "input: {raw}");
        }
        assert_eq!(parse_operation_status("bogus"), None);
    }

    #[test]
    fn extract_user_event_from_execution_op() {
        let state = json!({
            "Operations": [{
                "Id": "exec-1",
                "Type": "Execution",
                "Status": "Started",
                "ExecutionDetails": {
                    "InputPayload": "{\"order_id\": 42}"
                }
            }]
        });

        assert_eq!(extract_user_event(&state)["order_id"], 42);
    }

    #[test]
    fn extract_user_event_returns_empty_when_missing() {
        let event = extract_user_event(&json!({ "Operations": [] }));
        assert!(event.as_object().unwrap().is_empty());
    }

    #[test]
    fn extract_user_event_handles_uppercase_type() {
        let state = json!({
            "Operations": [{
                "Id": "exec-1",
                "Type": "EXECUTION",
                "Status": "STARTED",
                "ExecutionDetails": {
                    "InputPayload": "{\"key\": \"value\"}"
                }
            }]
        });

        assert_eq!(extract_user_event(&state)["key"], "value");
    }

    #[test]
    fn parse_invocation_valid_complete_payload() {
        let payload = json!({
            "DurableExecutionArn": TEST_ARN,
            "CheckpointToken": "tok-abc",
            "InitialExecutionState": {
                "Operations": [{
                    "Id": "exec-1",
                    "Type": "Execution",
                    "Status": "Started",
                    "ExecutionDetails": { "InputPayload": "{\"order_id\": 99}" }
                }],
                "NextMarker": "page-2"
            }
        });

        let data = parse_invocation(&payload).unwrap();

        assert_eq!(data.durable_execution_arn, TEST_ARN);
        assert_eq!(data.checkpoint_token, "tok-abc");
        assert_eq!(data.operations.len(), 1);
        assert_eq!(data.next_marker, Some("page-2".to_string()));
        assert_eq!(data.user_event["order_id"], 99);
    }

    #[test]
    fn parse_invocation_missing_arn_returns_error() {
        let payload = json!({
            "CheckpointToken": "tok-1",
            "InitialExecutionState": { "Operations": [] }
        });

        let result = parse_invocation(&payload);

        assert!(result.is_err());
        assert_eq!(result.unwrap_err(), "missing DurableExecutionArn in event");
    }

    #[test]
    fn parse_invocation_missing_token_returns_error() {
        let payload = json!({
            "DurableExecutionArn": TEST_ARN,
            "InitialExecutionState": { "Operations": [] }
        });

        let result = parse_invocation(&payload);

        assert!(result.is_err());
        assert_eq!(result.unwrap_err(), "missing CheckpointToken in event");
    }

    #[test]
    fn parse_invocation_empty_next_marker_produces_none() {
        let payload = json!({
            "DurableExecutionArn": TEST_ARN,
            "CheckpointToken": "tok-1",
            "InitialExecutionState": {
                "Operations": [],
                "NextMarker": ""
            }
        });

        let data = parse_invocation(&payload).unwrap();

        assert_eq!(data.next_marker, None);
    }

    #[test]
    fn parse_invocation_nonempty_next_marker_produces_some() {
        let payload = json!({
            "DurableExecutionArn": TEST_ARN,
            "CheckpointToken": "tok-1",
            "InitialExecutionState": {
                "Operations": [],
                "NextMarker": "cursor-xyz"
            }
        });

        let data = parse_invocation(&payload).unwrap();

        assert_eq!(data.next_marker, Some("cursor-xyz".to_string()));
    }
}