1use aws_sdk_lambda::types::{Operation, OperationStatus, OperationType, StepDetails};
8
9pub fn parse_operations(initial_state: &serde_json::Value) -> Vec<Operation> {
31 let Some(ops_array) = initial_state["Operations"].as_array() else {
32 return vec![];
33 };
34
35 ops_array
36 .iter()
37 .filter_map(|op_json| {
38 let id = op_json["Id"].as_str()?;
39 let op_type = parse_operation_type(op_json["Type"].as_str()?)?;
40 let status = parse_operation_status(op_json["Status"].as_str()?)?;
41
42 let timestamp = op_json["StartTimestamp"]
43 .as_f64()
44 .map(aws_smithy_types::DateTime::from_secs_f64)
45 .unwrap_or_else(|| aws_smithy_types::DateTime::from_secs(0));
46
47 let mut builder = Operation::builder()
48 .id(id)
49 .r#type(op_type)
50 .status(status)
51 .start_timestamp(timestamp);
52
53 if let Some(step_details_json) = op_json.get("StepDetails") {
55 let mut sd_builder = StepDetails::builder();
56
57 if let Some(result) = step_details_json["Result"].as_str() {
58 sd_builder = sd_builder.result(result);
59 }
60
61 if let Some(error_json) = step_details_json.get("Error") {
62 if let (Some(error_type), Some(error_data)) = (
63 error_json["ErrorType"].as_str(),
64 error_json["ErrorData"].as_str(),
65 ) {
66 sd_builder = sd_builder.error(
67 aws_sdk_lambda::types::ErrorObject::builder()
68 .error_type(error_type)
69 .error_data(error_data)
70 .build(),
71 );
72 }
73 }
74
75 if let Some(attempt) = step_details_json["Attempt"].as_i64() {
76 sd_builder = sd_builder.attempt(attempt as i32);
77 }
78
79 builder = builder.step_details(sd_builder.build());
80 }
81
82 if let Some(exec_json) = op_json.get("ExecutionDetails") {
84 let mut ed_builder = aws_sdk_lambda::types::ExecutionDetails::builder();
85 if let Some(input) = exec_json["InputPayload"].as_str() {
86 ed_builder = ed_builder.input_payload(input);
87 }
88 builder = builder.execution_details(ed_builder.build());
89 }
90
91 builder.build().ok()
92 })
93 .collect()
94}
95
96pub fn parse_operation_type(s: &str) -> Option<OperationType> {
111 match s {
112 "Step" | "STEP" => Some(OperationType::Step),
113 "Execution" | "EXECUTION" => Some(OperationType::Execution),
114 "Wait" | "WAIT" => Some(OperationType::Wait),
115 "Callback" | "CALLBACK" => Some(OperationType::Callback),
116 "ChainedInvoke" | "CHAINED_INVOKE" => Some(OperationType::ChainedInvoke),
117 _ => None,
118 }
119}
120
121pub fn parse_operation_status(s: &str) -> Option<OperationStatus> {
136 match s {
137 "Succeeded" | "SUCCEEDED" => Some(OperationStatus::Succeeded),
138 "Failed" | "FAILED" => Some(OperationStatus::Failed),
139 "Pending" | "PENDING" => Some(OperationStatus::Pending),
140 "Ready" | "READY" => Some(OperationStatus::Ready),
141 "Started" | "STARTED" => Some(OperationStatus::Started),
142 _ => None,
143 }
144}
145
146#[derive(Debug)]
167pub struct InvocationData {
168 pub durable_execution_arn: String,
170 pub checkpoint_token: String,
172 pub operations: Vec<aws_sdk_lambda::types::Operation>,
174 pub next_marker: Option<String>,
176 pub user_event: serde_json::Value,
178}
179
180pub fn parse_invocation(payload: &serde_json::Value) -> Result<InvocationData, &'static str> {
214 let durable_execution_arn = payload["DurableExecutionArn"]
215 .as_str()
216 .ok_or("missing DurableExecutionArn in event")?
217 .to_string();
218
219 let checkpoint_token = payload["CheckpointToken"]
220 .as_str()
221 .ok_or("missing CheckpointToken in event")?
222 .to_string();
223
224 let initial_state = &payload["InitialExecutionState"];
225 let operations = parse_operations(initial_state);
226
227 let next_marker = initial_state["NextMarker"]
228 .as_str()
229 .filter(|s| !s.is_empty())
230 .map(|s| s.to_string());
231
232 let user_event = extract_user_event(initial_state);
233
234 Ok(InvocationData {
235 durable_execution_arn,
236 checkpoint_token,
237 operations,
238 next_marker,
239 user_event,
240 })
241}
242
243pub fn extract_user_event(initial_state: &serde_json::Value) -> serde_json::Value {
266 if let Some(ops) = initial_state["Operations"].as_array() {
267 for op in ops {
268 if op["Type"].as_str() == Some("Execution") || op["Type"].as_str() == Some("EXECUTION")
269 {
270 if let Some(input) = op
271 .get("ExecutionDetails")
272 .and_then(|ed| ed["InputPayload"].as_str())
273 {
274 if let Ok(parsed) = serde_json::from_str(input) {
275 return parsed;
276 }
277 }
278 }
279 }
280 }
281 serde_json::Value::Object(serde_json::Map::new())
282}
283
#[cfg(test)]
mod tests {
    use super::*;

    /// ARN shared by the `parse_invocation` tests.
    const TEST_ARN: &str = "arn:aws:lambda:us-east-1:123:durable-execution/test";

    #[test]
    fn parse_empty_operations() {
        let state = serde_json::json!({});
        assert!(parse_operations(&state).is_empty());
    }

    #[test]
    fn parse_operations_non_array_is_empty() {
        let state = serde_json::json!({ "Operations": "not-an-array" });
        assert!(parse_operations(&state).is_empty());
    }

    #[test]
    fn parse_operations_with_step() {
        let state = serde_json::json!({
            "Operations": [{
                "Id": "step-1",
                "Type": "Step",
                "Status": "Succeeded",
                "StartTimestamp": 1700000000.0,
                "StepDetails": {
                    "Result": "{\"value\": 42}",
                    "Attempt": 1
                }
            }]
        });
        let ops = parse_operations(&state);
        assert_eq!(ops.len(), 1);
        assert_eq!(ops[0].id(), "step-1");
    }

    #[test]
    fn parse_operations_skips_invalid() {
        let state = serde_json::json!({
            "Operations": [
                { "Id": "good", "Type": "Step", "Status": "Succeeded" },
                { "Id": "bad", "Type": "Unknown", "Status": "Succeeded" }
            ]
        });
        let ops = parse_operations(&state);
        assert_eq!(ops.len(), 1);
        assert_eq!(ops[0].id(), "good");
    }

    #[test]
    fn parse_operation_type_all_variants() {
        let cases = [
            ("Step", OperationType::Step),
            ("STEP", OperationType::Step),
            ("Execution", OperationType::Execution),
            ("EXECUTION", OperationType::Execution),
            ("Wait", OperationType::Wait),
            ("WAIT", OperationType::Wait),
            ("Callback", OperationType::Callback),
            ("CALLBACK", OperationType::Callback),
            ("ChainedInvoke", OperationType::ChainedInvoke),
            ("CHAINED_INVOKE", OperationType::ChainedInvoke),
        ];
        for (input, expected) in cases {
            assert_eq!(parse_operation_type(input), Some(expected), "input: {input}");
        }
        assert_eq!(parse_operation_type("bogus"), None);
    }

    #[test]
    fn parse_operation_status_all_variants() {
        // Covers both accepted spellings of every status, including the upper-case forms.
        let cases = [
            ("Succeeded", OperationStatus::Succeeded),
            ("SUCCEEDED", OperationStatus::Succeeded),
            ("Failed", OperationStatus::Failed),
            ("FAILED", OperationStatus::Failed),
            ("Pending", OperationStatus::Pending),
            ("PENDING", OperationStatus::Pending),
            ("Ready", OperationStatus::Ready),
            ("READY", OperationStatus::Ready),
            ("Started", OperationStatus::Started),
            ("STARTED", OperationStatus::Started),
        ];
        for (input, expected) in cases {
            assert_eq!(parse_operation_status(input), Some(expected), "input: {input}");
        }
        assert_eq!(parse_operation_status("bogus"), None);
    }

    #[test]
    fn extract_user_event_from_execution_op() {
        let state = serde_json::json!({
            "Operations": [{
                "Id": "exec-1",
                "Type": "Execution",
                "Status": "Started",
                "ExecutionDetails": {
                    "InputPayload": "{\"order_id\": 42}"
                }
            }]
        });
        let event = extract_user_event(&state);
        assert_eq!(event["order_id"], 42);
    }

    #[test]
    fn extract_user_event_returns_empty_when_missing() {
        let state = serde_json::json!({ "Operations": [] });
        let event = extract_user_event(&state);
        assert!(event.as_object().unwrap().is_empty());
    }

    #[test]
    fn extract_user_event_returns_empty_for_unparseable_payload() {
        let state = serde_json::json!({
            "Operations": [{
                "Id": "exec-1",
                "Type": "Execution",
                "Status": "Started",
                "ExecutionDetails": {
                    "InputPayload": "not json"
                }
            }]
        });
        let event = extract_user_event(&state);
        assert!(event.as_object().unwrap().is_empty());
    }

    #[test]
    fn extract_user_event_handles_uppercase_type() {
        let state = serde_json::json!({
            "Operations": [{
                "Id": "exec-1",
                "Type": "EXECUTION",
                "Status": "STARTED",
                "ExecutionDetails": {
                    "InputPayload": "{\"key\": \"value\"}"
                }
            }]
        });
        let event = extract_user_event(&state);
        assert_eq!(event["key"], "value");
    }

    #[test]
    fn parse_invocation_valid_complete_payload() {
        let payload = serde_json::json!({
            "DurableExecutionArn": TEST_ARN,
            "CheckpointToken": "tok-abc",
            "InitialExecutionState": {
                "Operations": [{
                    "Id": "exec-1",
                    "Type": "Execution",
                    "Status": "Started",
                    "ExecutionDetails": { "InputPayload": "{\"order_id\": 99}" }
                }],
                "NextMarker": "page-2"
            }
        });
        let data = parse_invocation(&payload).unwrap();
        assert_eq!(data.durable_execution_arn, TEST_ARN);
        assert_eq!(data.checkpoint_token, "tok-abc");
        assert_eq!(data.operations.len(), 1);
        assert_eq!(data.next_marker, Some("page-2".to_string()));
        assert_eq!(data.user_event["order_id"], 99);
    }

    #[test]
    fn parse_invocation_missing_arn_returns_error() {
        let payload = serde_json::json!({
            "CheckpointToken": "tok-1",
            "InitialExecutionState": { "Operations": [] }
        });
        let result = parse_invocation(&payload);
        assert!(result.is_err());
        assert_eq!(result.unwrap_err(), "missing DurableExecutionArn in event");
    }

    #[test]
    fn parse_invocation_missing_token_returns_error() {
        let payload = serde_json::json!({
            "DurableExecutionArn": TEST_ARN,
            "InitialExecutionState": { "Operations": [] }
        });
        let result = parse_invocation(&payload);
        assert!(result.is_err());
        assert_eq!(result.unwrap_err(), "missing CheckpointToken in event");
    }

    #[test]
    fn parse_invocation_empty_next_marker_produces_none() {
        let payload = serde_json::json!({
            "DurableExecutionArn": TEST_ARN,
            "CheckpointToken": "tok-1",
            "InitialExecutionState": {
                "Operations": [],
                "NextMarker": ""
            }
        });
        let data = parse_invocation(&payload).unwrap();
        assert_eq!(data.next_marker, None);
    }

    #[test]
    fn parse_invocation_nonempty_next_marker_produces_some() {
        let payload = serde_json::json!({
            "DurableExecutionArn": TEST_ARN,
            "CheckpointToken": "tok-1",
            "InitialExecutionState": {
                "Operations": [],
                "NextMarker": "cursor-xyz"
            }
        });
        let data = parse_invocation(&payload).unwrap();
        assert_eq!(data.next_marker, Some("cursor-xyz".to_string()));
    }
}