1use serde::{Deserialize, Serialize};
7
8use crate::error::ErrorObject;
9use crate::operation::Operation;
10
11#[derive(Debug, Clone, Deserialize)]
16pub struct DurableExecutionInvocationInput {
17 #[serde(rename = "DurableExecutionArn")]
19 pub durable_execution_arn: String,
20
21 #[serde(rename = "CheckpointToken")]
23 pub checkpoint_token: String,
24
25 #[serde(rename = "InitialExecutionState")]
27 pub initial_execution_state: InitialExecutionState,
28
29 #[serde(rename = "Input", default)]
31 pub input: Option<serde_json::Value>,
32}
33
34#[derive(Debug, Clone, Default, Deserialize)]
39pub struct InitialExecutionState {
40 #[serde(rename = "Operations", default)]
42 pub operations: Vec<Operation>,
43
44 #[serde(rename = "NextMarker", skip_serializing_if = "Option::is_none")]
46 pub next_marker: Option<String>,
47}
48
49impl InitialExecutionState {
50 pub fn new() -> Self {
52 Self::default()
53 }
54
55 pub fn with_operations(operations: Vec<Operation>) -> Self {
57 Self {
58 operations,
59 next_marker: None,
60 }
61 }
62
63 pub fn has_more(&self) -> bool {
65 self.next_marker.is_some()
66 }
67}
68
69#[derive(Debug, Clone, Serialize)]
74pub struct DurableExecutionInvocationOutput {
75 #[serde(rename = "Status")]
77 pub status: InvocationStatus,
78
79 #[serde(rename = "Result", skip_serializing_if = "Option::is_none")]
81 pub result: Option<String>,
82
83 #[serde(rename = "Error", skip_serializing_if = "Option::is_none")]
85 pub error: Option<ErrorObject>,
86}
87
88impl DurableExecutionInvocationOutput {
89 pub const MAX_RESPONSE_SIZE: usize = 6 * 1024 * 1024;
91
92 pub fn succeeded(result: Option<String>) -> Self {
94 Self {
95 status: InvocationStatus::Succeeded,
96 result,
97 error: None,
98 }
99 }
100
101 pub fn failed(error: ErrorObject) -> Self {
103 Self {
104 status: InvocationStatus::Failed,
105 result: None,
106 error: Some(error),
107 }
108 }
109
110 pub fn pending() -> Self {
112 Self {
113 status: InvocationStatus::Pending,
114 result: None,
115 error: None,
116 }
117 }
118
119 pub fn is_succeeded(&self) -> bool {
121 matches!(self.status, InvocationStatus::Succeeded)
122 }
123
124 pub fn is_failed(&self) -> bool {
126 matches!(self.status, InvocationStatus::Failed)
127 }
128
129 pub fn is_pending(&self) -> bool {
131 matches!(self.status, InvocationStatus::Pending)
132 }
133
134 pub fn from_result<T: serde::Serialize>(result: &T) -> Self {
156 match serde_json::to_string(result) {
157 Ok(json) => {
158 if json.len() > Self::MAX_RESPONSE_SIZE {
159 Self::failed(ErrorObject::new(
160 "ResponseTooLarge",
161 format!(
162 "Response size {} bytes exceeds maximum {} bytes. Consider checkpointing large results.",
163 json.len(),
164 Self::MAX_RESPONSE_SIZE
165 )
166 ))
167 } else {
168 Self::succeeded(Some(json))
169 }
170 }
171 Err(e) => Self::failed(ErrorObject::new(
172 "SerializationError",
173 format!("Failed to serialize result: {}", e),
174 )),
175 }
176 }
177
178 pub fn from_error(error: &crate::error::DurableError) -> Self {
197 use crate::error::DurableError;
198
199 match error {
200 DurableError::Suspend { .. } => Self::pending(),
201 _ => Self::failed(ErrorObject::from(error)),
202 }
203 }
204
205 pub fn would_exceed_max_size<T: serde::Serialize>(result: &T) -> bool {
218 match serde_json::to_string(result) {
219 Ok(json) => json.len() > Self::MAX_RESPONSE_SIZE,
220 Err(_) => false, }
222 }
223
224 pub fn checkpointed_result(checkpoint_id: &str, original_size: usize) -> Self {
243 Self::succeeded(Some(format!(
244 "{{\"__checkpointed_result__\":\"{}\",\"size\":{}}}",
245 checkpoint_id, original_size
246 )))
247 }
248
249 pub fn is_checkpointed_result(&self) -> bool {
255 self.result
256 .as_ref()
257 .map(|r| r.contains("__checkpointed_result__"))
258 .unwrap_or(false)
259 }
260}
261
262#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
264#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
265pub enum InvocationStatus {
266 Succeeded,
268 Failed,
270 Pending,
272}
273
274impl std::fmt::Display for InvocationStatus {
275 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
276 match self {
277 Self::Succeeded => write!(f, "SUCCEEDED"),
278 Self::Failed => write!(f, "FAILED"),
279 Self::Pending => write!(f, "PENDING"),
280 }
281 }
282}
283
284impl From<Result<String, ErrorObject>> for DurableExecutionInvocationOutput {
287 fn from(result: Result<String, ErrorObject>) -> Self {
288 match result {
289 Ok(value) => Self::succeeded(Some(value)),
290 Err(error) => Self::failed(error),
291 }
292 }
293}
294
295impl From<Result<Option<String>, ErrorObject>> for DurableExecutionInvocationOutput {
296 fn from(result: Result<Option<String>, ErrorObject>) -> Self {
297 match result {
298 Ok(value) => Self::succeeded(value),
299 Err(error) => Self::failed(error),
300 }
301 }
302}
303
#[cfg(test)]
mod tests {
    use super::*;
    use crate::operation::OperationType;

    // A complete payload (one operation, explicit null marker, an `Input`
    // object) deserializes into the expected fields.
    #[test]
    fn test_invocation_input_deserialization() {
        let json = r#"{
            "DurableExecutionArn": "arn:aws:lambda:us-east-1:123456789012:function:my-function:durable:abc123",
            "CheckpointToken": "token-xyz",
            "InitialExecutionState": {
                "Operations": [
                    {
                        "Id": "op-1",
                        "Type": "STEP",
                        "Status": "SUCCEEDED",
                        "Result": "{\"value\": 42}"
                    }
                ],
                "NextMarker": null
            },
            "Input": {"orderId": "order-123"}
        }"#;

        let input: DurableExecutionInvocationInput = serde_json::from_str(json).unwrap();
        assert_eq!(
            input.durable_execution_arn,
            "arn:aws:lambda:us-east-1:123456789012:function:my-function:durable:abc123"
        );
        assert_eq!(input.checkpoint_token, "token-xyz");
        assert_eq!(input.initial_execution_state.operations.len(), 1);
        assert_eq!(
            input.initial_execution_state.operations[0].operation_id,
            "op-1"
        );
        assert!(input.input.is_some());
    }

    // `Input` and `NextMarker` are optional keys.
    #[test]
    fn test_invocation_input_without_input() {
        let json = r#"{
            "DurableExecutionArn": "arn:aws:lambda:us-east-1:123456789012:function:my-function:durable:abc123",
            "CheckpointToken": "token-xyz",
            "InitialExecutionState": {
                "Operations": []
            }
        }"#;

        let input: DurableExecutionInvocationInput = serde_json::from_str(json).unwrap();
        assert!(input.input.is_none());
        assert!(input.initial_execution_state.operations.is_empty());
    }

    // `new()` yields an empty state with no marker.
    #[test]
    fn test_initial_execution_state_new() {
        let state = InitialExecutionState::new();
        assert!(state.operations.is_empty());
        assert!(state.next_marker.is_none());
        assert!(!state.has_more());
    }

    // `with_operations` stores the given operations and sets no marker.
    #[test]
    fn test_initial_execution_state_with_operations() {
        let ops = vec![
            Operation::new("op-1", OperationType::Step),
            Operation::new("op-2", OperationType::Wait),
        ];
        let state = InitialExecutionState::with_operations(ops);
        assert_eq!(state.operations.len(), 2);
        assert!(!state.has_more());
    }

    // `has_more` tracks the presence of `next_marker`.
    #[test]
    fn test_initial_execution_state_has_more() {
        let mut state = InitialExecutionState::new();
        assert!(!state.has_more());

        state.next_marker = Some("marker-123".to_string());
        assert!(state.has_more());
    }

    // `succeeded` sets the status, keeps the result and leaves the error empty.
    #[test]
    fn test_invocation_output_succeeded() {
        let output =
            DurableExecutionInvocationOutput::succeeded(Some(r#"{"result": "ok"}"#.to_string()));
        assert!(output.is_succeeded());
        assert!(!output.is_failed());
        assert!(!output.is_pending());
        assert_eq!(output.result, Some(r#"{"result": "ok"}"#.to_string()));
        assert!(output.error.is_none());
    }

    // A succeeded output may carry no result at all.
    #[test]
    fn test_invocation_output_succeeded_no_result() {
        let output = DurableExecutionInvocationOutput::succeeded(None);
        assert!(output.is_succeeded());
        assert!(output.result.is_none());
    }

    // `failed` sets the status, keeps the error and leaves the result empty.
    #[test]
    fn test_invocation_output_failed() {
        let error = ErrorObject::new("TestError", "Something went wrong");
        let output = DurableExecutionInvocationOutput::failed(error);
        assert!(!output.is_succeeded());
        assert!(output.is_failed());
        assert!(!output.is_pending());
        assert!(output.result.is_none());
        assert!(output.error.is_some());
        assert_eq!(output.error.as_ref().unwrap().error_type, "TestError");
    }

    // `pending` carries neither a result nor an error.
    #[test]
    fn test_invocation_output_pending() {
        let output = DurableExecutionInvocationOutput::pending();
        assert!(!output.is_succeeded());
        assert!(!output.is_failed());
        assert!(output.is_pending());
        assert!(output.result.is_none());
        assert!(output.error.is_none());
    }

    // `Display` prints the upper-case status name.
    #[test]
    fn test_invocation_status_display() {
        assert_eq!(InvocationStatus::Succeeded.to_string(), "SUCCEEDED");
        assert_eq!(InvocationStatus::Failed.to_string(), "FAILED");
        assert_eq!(InvocationStatus::Pending.to_string(), "PENDING");
    }

    // Each status serializes to its upper-case name as a JSON string.
    #[test]
    fn test_invocation_status_serialization() {
        let json = serde_json::to_string(&InvocationStatus::Succeeded).unwrap();
        assert_eq!(json, r#""SUCCEEDED""#);

        let json = serde_json::to_string(&InvocationStatus::Failed).unwrap();
        assert_eq!(json, r#""FAILED""#);

        let json = serde_json::to_string(&InvocationStatus::Pending).unwrap();
        assert_eq!(json, r#""PENDING""#);
    }

    // Each upper-case name deserializes back to its status.
    #[test]
    fn test_invocation_status_deserialization() {
        let status: InvocationStatus = serde_json::from_str(r#""SUCCEEDED""#).unwrap();
        assert_eq!(status, InvocationStatus::Succeeded);

        let status: InvocationStatus = serde_json::from_str(r#""FAILED""#).unwrap();
        assert_eq!(status, InvocationStatus::Failed);

        let status: InvocationStatus = serde_json::from_str(r#""PENDING""#).unwrap();
        assert_eq!(status, InvocationStatus::Pending);
    }

    // A succeeded output serializes `Status` and `Result` and omits `Error`.
    #[test]
    fn test_invocation_output_serialization() {
        let output =
            DurableExecutionInvocationOutput::succeeded(Some(r#"{"value": 42}"#.to_string()));
        let json = serde_json::to_string(&output).unwrap();
        assert!(json.contains(r#""Status":"SUCCEEDED""#));
        assert!(json.contains(r#""Result":"{\"value\": 42}""#));
        assert!(!json.contains("Error"));
    }

    // A failed output serializes `Status` and the error and omits `Result`.
    #[test]
    fn test_invocation_output_failed_serialization() {
        let error = ErrorObject::new("TestError", "Something went wrong");
        let output = DurableExecutionInvocationOutput::failed(error);
        let json = serde_json::to_string(&output).unwrap();
        assert!(json.contains(r#""Status":"FAILED""#));
        assert!(json.contains(r#""ErrorType":"TestError""#));
        assert!(!json.contains("Result"));
    }

    // A pending output serializes only `Status`.
    #[test]
    fn test_invocation_output_pending_serialization() {
        let output = DurableExecutionInvocationOutput::pending();
        let json = serde_json::to_string(&output).unwrap();
        assert!(json.contains(r#""Status":"PENDING""#));
        assert!(!json.contains("Result"));
        assert!(!json.contains("Error"));
    }

    // `From<Result<String, _>>`: `Ok` maps to a succeeded output.
    #[test]
    fn test_from_result_ok() {
        let result: Result<String, ErrorObject> = Ok(r#"{"value": 42}"#.to_string());
        let output: DurableExecutionInvocationOutput = result.into();
        assert!(output.is_succeeded());
        assert_eq!(output.result, Some(r#"{"value": 42}"#.to_string()));
    }

    // `From<Result<String, _>>`: `Err` maps to a failed output.
    #[test]
    fn test_from_result_err() {
        let result: Result<String, ErrorObject> = Err(ErrorObject::new("TestError", "Failed"));
        let output: DurableExecutionInvocationOutput = result.into();
        assert!(output.is_failed());
        assert_eq!(output.error.as_ref().unwrap().error_type, "TestError");
    }

    // `From<Result<Option<String>, _>>`: `Ok(Some(_))` keeps the payload.
    #[test]
    fn test_from_option_result_ok_some() {
        let result: Result<Option<String>, ErrorObject> = Ok(Some(r#"{"value": 42}"#.to_string()));
        let output: DurableExecutionInvocationOutput = result.into();
        assert!(output.is_succeeded());
        assert_eq!(output.result, Some(r#"{"value": 42}"#.to_string()));
    }

    // `From<Result<Option<String>, _>>`: `Ok(None)` succeeds without a payload.
    #[test]
    fn test_from_option_result_ok_none() {
        let result: Result<Option<String>, ErrorObject> = Ok(None);
        let output: DurableExecutionInvocationOutput = result.into();
        assert!(output.is_succeeded());
        assert!(output.result.is_none());
    }

    // `from_result` serializes an arbitrary `Serialize` value into the result.
    #[test]
    fn test_from_result_serializable() {
        #[derive(serde::Serialize)]
        struct TestResult {
            value: i32,
            message: String,
        }

        let result = TestResult {
            value: 42,
            message: "success".to_string(),
        };

        let output = DurableExecutionInvocationOutput::from_result(&result);
        assert!(output.is_succeeded());
        assert!(output.result.is_some());
        let json = output.result.unwrap();
        assert!(json.contains("42"));
        assert!(json.contains("success"));
    }

    // `None` serializes to the JSON literal `null`, which is still a result.
    #[test]
    fn test_from_result_none() {
        let result: Option<String> = None;
        let output = DurableExecutionInvocationOutput::from_result(&result);
        assert!(output.is_succeeded());
        assert_eq!(output.result, Some("null".to_string()));
    }

    // A `Suspend` error maps to a pending output.
    #[test]
    fn test_from_error_suspend() {
        use crate::error::DurableError;

        let error = DurableError::Suspend {
            scheduled_timestamp: None,
        };
        let output = DurableExecutionInvocationOutput::from_error(&error);
        assert!(output.is_pending());
        assert!(output.result.is_none());
        assert!(output.error.is_none());
    }

    // An `Execution` error maps to a failed output with its error type.
    #[test]
    fn test_from_error_execution() {
        use crate::error::{DurableError, TerminationReason};

        let error = DurableError::Execution {
            message: "test error".to_string(),
            termination_reason: TerminationReason::ExecutionError,
        };
        let output = DurableExecutionInvocationOutput::from_error(&error);
        assert!(output.is_failed());
        assert!(output.error.is_some());
        assert_eq!(output.error.as_ref().unwrap().error_type, "ExecutionError");
    }

    // A `Validation` error maps to a failed output with its error type.
    #[test]
    fn test_from_error_validation() {
        use crate::error::DurableError;

        let error = DurableError::Validation {
            message: "invalid input".to_string(),
        };
        let output = DurableExecutionInvocationOutput::from_error(&error);
        assert!(output.is_failed());
        assert_eq!(output.error.as_ref().unwrap().error_type, "ValidationError");
    }

    // A tiny payload is nowhere near the limit.
    #[test]
    fn test_would_exceed_max_size_small() {
        let small_data = "hello world";
        assert!(!DurableExecutionInvocationOutput::would_exceed_max_size(
            &small_data
        ));
    }

    // The limit is exactly 6 MiB.
    #[test]
    fn test_max_response_size_constant() {
        assert_eq!(
            DurableExecutionInvocationOutput::MAX_RESPONSE_SIZE,
            6 * 1024 * 1024
        );
    }

    // A plain ASCII string of `len` bytes serializes to `len + 2` bytes (the
    // surrounding quotes), which lets the limit be hit exactly: a payload of
    // exactly MAX bytes fits, one byte more does not.
    #[test]
    fn test_would_exceed_max_size_boundary() {
        let max = DurableExecutionInvocationOutput::MAX_RESPONSE_SIZE;

        let at_limit = "x".repeat(max - 2);
        assert!(!DurableExecutionInvocationOutput::would_exceed_max_size(
            &at_limit
        ));

        let over_limit = "x".repeat(max - 1);
        assert!(DurableExecutionInvocationOutput::would_exceed_max_size(
            &over_limit
        ));
    }

    // A result whose JSON is exactly MAX bytes is still accepted.
    #[test]
    fn test_from_result_at_size_limit() {
        let at_limit = "x".repeat(DurableExecutionInvocationOutput::MAX_RESPONSE_SIZE - 2);
        let output = DurableExecutionInvocationOutput::from_result(&at_limit);
        assert!(output.is_succeeded());
        assert_eq!(
            output.result.as_ref().map(String::len),
            Some(DurableExecutionInvocationOutput::MAX_RESPONSE_SIZE)
        );
    }

    // A result whose JSON is one byte over the limit is rejected as too large.
    #[test]
    fn test_from_result_too_large() {
        let over_limit = "x".repeat(DurableExecutionInvocationOutput::MAX_RESPONSE_SIZE - 1);
        let output = DurableExecutionInvocationOutput::from_result(&over_limit);
        assert!(output.is_failed());
        assert!(output.result.is_none());
        assert_eq!(
            output.error.as_ref().unwrap().error_type,
            "ResponseTooLarge"
        );
    }

    // The checkpoint marker is a succeeded output mentioning the id and size.
    #[test]
    fn test_checkpointed_result() {
        let output = DurableExecutionInvocationOutput::checkpointed_result("op-123", 7_000_000);
        assert!(output.is_succeeded());
        assert!(output.is_checkpointed_result());
        let result = output.result.unwrap();
        assert!(result.contains("__checkpointed_result__"));
        assert!(result.contains("op-123"));
        assert!(result.contains("7000000"));
    }

    // The checkpoint marker parses as JSON and exposes both fields.
    #[test]
    fn test_checkpointed_result_is_valid_json() {
        let output = DurableExecutionInvocationOutput::checkpointed_result("op-123", 7_000_000);
        let marker: serde_json::Value =
            serde_json::from_str(output.result.as_deref().unwrap()).unwrap();
        assert_eq!(marker["__checkpointed_result__"].as_str(), Some("op-123"));
        assert_eq!(marker["size"].as_u64(), Some(7_000_000));
    }

    // An ordinary result is not a checkpoint marker.
    #[test]
    fn test_is_checkpointed_result_false() {
        let output =
            DurableExecutionInvocationOutput::succeeded(Some(r#"{"value": 42}"#.to_string()));
        assert!(!output.is_checkpointed_result());
    }

    // A succeeded output without a result is not a checkpoint marker.
    #[test]
    fn test_is_checkpointed_result_none() {
        let output = DurableExecutionInvocationOutput::succeeded(None);
        assert!(!output.is_checkpointed_result());
    }

    // A pending output is not a checkpoint marker.
    #[test]
    fn test_is_checkpointed_result_pending() {
        let output = DurableExecutionInvocationOutput::pending();
        assert!(!output.is_checkpointed_result());
    }
}
632
#[cfg(test)]
mod property_tests {
    use super::*;
    use crate::error::{DurableError, TerminationReason};
    use proptest::prelude::*;

    proptest! {
        // Each property below runs against 100 generated cases.
        #![proptest_config(ProptestConfig::with_cases(100))]

        // Any serializable value that fits the size limit yields a SUCCEEDED
        // output that carries the serialized value and no error.
        #[test]
        fn prop_lambda_output_success_status(
            value in any::<i64>(),
            message in "[a-zA-Z0-9 ]{0,100}",
        ) {
            #[derive(serde::Serialize)]
            struct TestResult {
                value: i64,
                message: String,
            }

            let result = TestResult { value, message };
            let output = DurableExecutionInvocationOutput::from_result(&result);

            prop_assert!(output.is_succeeded(), "Successful result must produce SUCCEEDED status");
            prop_assert!(!output.is_failed(), "Successful result must not be FAILED");
            prop_assert!(!output.is_pending(), "Successful result must not be PENDING");

            prop_assert!(output.result.is_some(), "Successful result must have result data");
            let json = output.result.as_ref().unwrap();
            prop_assert!(json.contains(&value.to_string()), "Result must contain the value");

            prop_assert!(output.error.is_none(), "Successful result must not have error");
        }

        // Every non-suspend error variant yields a FAILED output that keeps the
        // error message and carries no result.
        #[test]
        fn prop_lambda_output_failure_status(
            error_message in "[a-zA-Z0-9 ]{1,100}",
            error_variant in 0u8..7u8,
        ) {
            let error = match error_variant {
                0 => DurableError::Execution {
                    message: error_message.clone(),
                    termination_reason: TerminationReason::ExecutionError,
                },
                1 => DurableError::Invocation {
                    message: error_message.clone(),
                    termination_reason: TerminationReason::InvocationError,
                },
                2 => DurableError::Checkpoint {
                    message: error_message.clone(),
                    is_retriable: false,
                    aws_error: None,
                },
                3 => DurableError::Callback {
                    message: error_message.clone(),
                    callback_id: None,
                },
                4 => DurableError::NonDeterministic {
                    message: error_message.clone(),
                    operation_id: None,
                },
                5 => DurableError::Validation {
                    message: error_message.clone(),
                },
                _ => DurableError::SerDes {
                    message: error_message.clone(),
                },
            };

            let output = DurableExecutionInvocationOutput::from_error(&error);

            prop_assert!(output.is_failed(), "Error must produce FAILED status");
            prop_assert!(!output.is_succeeded(), "Error must not be SUCCEEDED");
            prop_assert!(!output.is_pending(), "Error must not be PENDING");

            prop_assert!(output.error.is_some(), "Failed output must have error details");
            let error_obj = output.error.as_ref().unwrap();
            prop_assert!(!error_obj.error_type.is_empty(), "Error type must not be empty");
            prop_assert!(error_obj.error_message.contains(&error_message), "Error message must be preserved");

            prop_assert!(output.result.is_none(), "Failed output must not have result");
        }

        // A suspend error, with or without a timestamp, yields a PENDING output
        // with neither result nor error.
        #[test]
        fn prop_lambda_output_suspend_status(
            has_timestamp in any::<bool>(),
            timestamp in any::<f64>(),
        ) {
            let error = if has_timestamp {
                DurableError::Suspend {
                    scheduled_timestamp: Some(timestamp),
                }
            } else {
                DurableError::Suspend {
                    scheduled_timestamp: None,
                }
            };

            let output = DurableExecutionInvocationOutput::from_error(&error);

            prop_assert!(output.is_pending(), "Suspend must produce PENDING status");
            prop_assert!(!output.is_succeeded(), "Suspend must not be SUCCEEDED");
            prop_assert!(!output.is_failed(), "Suspend must not be FAILED");

            prop_assert!(output.result.is_none(), "Pending output must not have result");
            prop_assert!(output.error.is_none(), "Pending output must not have error");
        }

        // Serializing an output of any status keeps the status name and the
        // fields that are present.
        #[test]
        fn prop_lambda_output_serialization_preserves_status(
            status_variant in 0u8..3u8,
            result_value in any::<Option<i32>>(),
            error_message in "[a-zA-Z0-9 ]{0,50}",
        ) {
            let output = match status_variant {
                0 => DurableExecutionInvocationOutput::succeeded(
                    result_value.map(|v| format!("{{\"value\":{}}}", v))
                ),
                1 => DurableExecutionInvocationOutput::failed(
                    ErrorObject::new("TestError", &error_message)
                ),
                _ => DurableExecutionInvocationOutput::pending(),
            };

            let json = serde_json::to_string(&output).expect("Serialization must succeed");

            match status_variant {
                0 => prop_assert!(json.contains("SUCCEEDED"), "JSON must contain SUCCEEDED"),
                1 => prop_assert!(json.contains("FAILED"), "JSON must contain FAILED"),
                _ => prop_assert!(json.contains("PENDING"), "JSON must contain PENDING"),
            }

            if output.result.is_some() {
                prop_assert!(json.contains("Result"), "JSON must contain Result field");
            }
            if output.error.is_some() {
                prop_assert!(json.contains("Error"), "JSON must contain Error field");
            }
        }

        // `would_exceed_max_size` and `from_result` must agree, and payloads
        // this small must always fit.
        #[test]
        fn prop_result_size_check_consistency(
            data_size in 0usize..1000usize,
        ) {
            let data: String = "x".repeat(data_size);

            let would_exceed = DurableExecutionInvocationOutput::would_exceed_max_size(&data);
            let output = DurableExecutionInvocationOutput::from_result(&data);

            // A string always serializes, so success is decided by size alone.
            prop_assert_eq!(
                output.is_succeeded(),
                !would_exceed,
                "If size check passes, output must be SUCCEEDED"
            );

            // The strategy only generates payloads under 1000 bytes, far below
            // the limit, so these hold for every case.
            prop_assert!(!would_exceed, "Small data should not exceed max size");
            prop_assert!(output.is_succeeded(), "Small data should produce SUCCEEDED");
        }
    }
}