1use serde::{Deserialize, Serialize};
7
8use crate::error::ErrorObject;
9use crate::operation::Operation;
10
11#[derive(Debug, Clone, Deserialize)]
16pub struct DurableExecutionInvocationInput {
17 #[serde(rename = "DurableExecutionArn")]
19 pub durable_execution_arn: String,
20
21 #[serde(rename = "CheckpointToken")]
23 pub checkpoint_token: String,
24
25 #[serde(rename = "InitialExecutionState")]
27 pub initial_execution_state: InitialExecutionState,
28
29 #[serde(rename = "Input", default)]
31 pub input: Option<serde_json::Value>,
32}
33
34#[derive(Debug, Clone, Default, Deserialize)]
39pub struct InitialExecutionState {
40 #[serde(rename = "Operations", default)]
42 pub operations: Vec<Operation>,
43
44 #[serde(rename = "NextMarker", skip_serializing_if = "Option::is_none")]
46 pub next_marker: Option<String>,
47}
48
49impl InitialExecutionState {
50 pub fn new() -> Self {
52 Self::default()
53 }
54
55 pub fn with_operations(operations: Vec<Operation>) -> Self {
57 Self {
58 operations,
59 next_marker: None,
60 }
61 }
62
63 pub fn has_more(&self) -> bool {
65 self.next_marker.is_some()
66 }
67}
68
69#[derive(Debug, Clone, Serialize)]
74pub struct DurableExecutionInvocationOutput {
75 #[serde(rename = "Status")]
77 pub status: InvocationStatus,
78
79 #[serde(rename = "Result", skip_serializing_if = "Option::is_none")]
81 pub result: Option<String>,
82
83 #[serde(rename = "Error", skip_serializing_if = "Option::is_none")]
85 pub error: Option<ErrorObject>,
86}
87
88impl DurableExecutionInvocationOutput {
89 pub const MAX_RESPONSE_SIZE: usize = 6 * 1024 * 1024;
91
92 pub fn succeeded(result: Option<String>) -> Self {
94 Self {
95 status: InvocationStatus::Succeeded,
96 result,
97 error: None,
98 }
99 }
100
101 pub fn failed(error: ErrorObject) -> Self {
103 Self {
104 status: InvocationStatus::Failed,
105 result: None,
106 error: Some(error),
107 }
108 }
109
110 pub fn pending() -> Self {
112 Self {
113 status: InvocationStatus::Pending,
114 result: None,
115 error: None,
116 }
117 }
118
119 pub fn is_succeeded(&self) -> bool {
121 matches!(self.status, InvocationStatus::Succeeded)
122 }
123
124 pub fn is_failed(&self) -> bool {
126 matches!(self.status, InvocationStatus::Failed)
127 }
128
129 pub fn is_pending(&self) -> bool {
131 matches!(self.status, InvocationStatus::Pending)
132 }
133
134 pub fn from_result<T: serde::Serialize>(result: &T) -> Self {
151 match serde_json::to_string(result) {
152 Ok(json) => {
153 if json.len() > Self::MAX_RESPONSE_SIZE {
154 Self::failed(ErrorObject::new(
155 "ResponseTooLarge",
156 format!(
157 "Response size {} bytes exceeds maximum {} bytes. Consider checkpointing large results.",
158 json.len(),
159 Self::MAX_RESPONSE_SIZE
160 )
161 ))
162 } else {
163 Self::succeeded(Some(json))
164 }
165 }
166 Err(e) => Self::failed(ErrorObject::new(
167 "SerializationError",
168 format!("Failed to serialize result: {}", e),
169 )),
170 }
171 }
172
173 pub fn from_error(error: &crate::error::DurableError) -> Self {
187 use crate::error::DurableError;
188
189 match error {
190 DurableError::Suspend { .. } => Self::pending(),
191 _ => Self::failed(ErrorObject::from(error)),
192 }
193 }
194
195 pub fn would_exceed_max_size<T: serde::Serialize>(result: &T) -> bool {
208 match serde_json::to_string(result) {
209 Ok(json) => json.len() > Self::MAX_RESPONSE_SIZE,
210 Err(_) => false, }
212 }
213
214 pub fn checkpointed_result(checkpoint_id: &str, original_size: usize) -> Self {
229 Self::succeeded(Some(format!(
230 "{{\"__checkpointed_result__\":\"{}\",\"size\":{}}}",
231 checkpoint_id, original_size
232 )))
233 }
234
235 pub fn is_checkpointed_result(&self) -> bool {
241 self.result
242 .as_ref()
243 .map(|r| r.contains("__checkpointed_result__"))
244 .unwrap_or(false)
245 }
246}
247
248#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
250#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
251pub enum InvocationStatus {
252 Succeeded,
254 Failed,
256 Pending,
258}
259
260impl std::fmt::Display for InvocationStatus {
261 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
262 match self {
263 Self::Succeeded => write!(f, "SUCCEEDED"),
264 Self::Failed => write!(f, "FAILED"),
265 Self::Pending => write!(f, "PENDING"),
266 }
267 }
268}
269
270impl From<Result<String, ErrorObject>> for DurableExecutionInvocationOutput {
273 fn from(result: Result<String, ErrorObject>) -> Self {
274 match result {
275 Ok(value) => Self::succeeded(Some(value)),
276 Err(error) => Self::failed(error),
277 }
278 }
279}
280
281impl From<Result<Option<String>, ErrorObject>> for DurableExecutionInvocationOutput {
282 fn from(result: Result<Option<String>, ErrorObject>) -> Self {
283 match result {
284 Ok(value) => Self::succeeded(value),
285 Err(error) => Self::failed(error),
286 }
287 }
288}
289
290#[cfg(test)]
291mod tests {
292 use super::*;
293 use crate::operation::OperationType;
294
295 #[test]
296 fn test_invocation_input_deserialization() {
297 let json = r#"{
298 "DurableExecutionArn": "arn:aws:lambda:us-east-1:123456789012:function:my-function:durable:abc123",
299 "CheckpointToken": "token-xyz",
300 "InitialExecutionState": {
301 "Operations": [
302 {
303 "Id": "op-1",
304 "Type": "STEP",
305 "Status": "SUCCEEDED",
306 "Result": "{\"value\": 42}"
307 }
308 ],
309 "NextMarker": null
310 },
311 "Input": {"orderId": "order-123"}
312 }"#;
313
314 let input: DurableExecutionInvocationInput = serde_json::from_str(json).unwrap();
315 assert_eq!(
316 input.durable_execution_arn,
317 "arn:aws:lambda:us-east-1:123456789012:function:my-function:durable:abc123"
318 );
319 assert_eq!(input.checkpoint_token, "token-xyz");
320 assert_eq!(input.initial_execution_state.operations.len(), 1);
321 assert_eq!(
322 input.initial_execution_state.operations[0].operation_id,
323 "op-1"
324 );
325 assert!(input.input.is_some());
326 }
327
328 #[test]
329 fn test_invocation_input_without_input() {
330 let json = r#"{
331 "DurableExecutionArn": "arn:aws:lambda:us-east-1:123456789012:function:my-function:durable:abc123",
332 "CheckpointToken": "token-xyz",
333 "InitialExecutionState": {
334 "Operations": []
335 }
336 }"#;
337
338 let input: DurableExecutionInvocationInput = serde_json::from_str(json).unwrap();
339 assert!(input.input.is_none());
340 assert!(input.initial_execution_state.operations.is_empty());
341 }
342
343 #[test]
344 fn test_initial_execution_state_new() {
345 let state = InitialExecutionState::new();
346 assert!(state.operations.is_empty());
347 assert!(state.next_marker.is_none());
348 assert!(!state.has_more());
349 }
350
351 #[test]
352 fn test_initial_execution_state_with_operations() {
353 let ops = vec![
354 Operation::new("op-1", OperationType::Step),
355 Operation::new("op-2", OperationType::Wait),
356 ];
357 let state = InitialExecutionState::with_operations(ops);
358 assert_eq!(state.operations.len(), 2);
359 assert!(!state.has_more());
360 }
361
362 #[test]
363 fn test_initial_execution_state_has_more() {
364 let mut state = InitialExecutionState::new();
365 assert!(!state.has_more());
366
367 state.next_marker = Some("marker-123".to_string());
368 assert!(state.has_more());
369 }
370
371 #[test]
372 fn test_invocation_output_succeeded() {
373 let output =
374 DurableExecutionInvocationOutput::succeeded(Some(r#"{"result": "ok"}"#.to_string()));
375 assert!(output.is_succeeded());
376 assert!(!output.is_failed());
377 assert!(!output.is_pending());
378 assert_eq!(output.result, Some(r#"{"result": "ok"}"#.to_string()));
379 assert!(output.error.is_none());
380 }
381
382 #[test]
383 fn test_invocation_output_succeeded_no_result() {
384 let output = DurableExecutionInvocationOutput::succeeded(None);
385 assert!(output.is_succeeded());
386 assert!(output.result.is_none());
387 }
388
389 #[test]
390 fn test_invocation_output_failed() {
391 let error = ErrorObject::new("TestError", "Something went wrong");
392 let output = DurableExecutionInvocationOutput::failed(error);
393 assert!(!output.is_succeeded());
394 assert!(output.is_failed());
395 assert!(!output.is_pending());
396 assert!(output.result.is_none());
397 assert!(output.error.is_some());
398 assert_eq!(output.error.as_ref().unwrap().error_type, "TestError");
399 }
400
401 #[test]
402 fn test_invocation_output_pending() {
403 let output = DurableExecutionInvocationOutput::pending();
404 assert!(!output.is_succeeded());
405 assert!(!output.is_failed());
406 assert!(output.is_pending());
407 assert!(output.result.is_none());
408 assert!(output.error.is_none());
409 }
410
411 #[test]
412 fn test_invocation_status_display() {
413 assert_eq!(InvocationStatus::Succeeded.to_string(), "SUCCEEDED");
414 assert_eq!(InvocationStatus::Failed.to_string(), "FAILED");
415 assert_eq!(InvocationStatus::Pending.to_string(), "PENDING");
416 }
417
418 #[test]
419 fn test_invocation_status_serialization() {
420 let json = serde_json::to_string(&InvocationStatus::Succeeded).unwrap();
421 assert_eq!(json, r#""SUCCEEDED""#);
422
423 let json = serde_json::to_string(&InvocationStatus::Failed).unwrap();
424 assert_eq!(json, r#""FAILED""#);
425
426 let json = serde_json::to_string(&InvocationStatus::Pending).unwrap();
427 assert_eq!(json, r#""PENDING""#);
428 }
429
430 #[test]
431 fn test_invocation_status_deserialization() {
432 let status: InvocationStatus = serde_json::from_str(r#""SUCCEEDED""#).unwrap();
433 assert_eq!(status, InvocationStatus::Succeeded);
434
435 let status: InvocationStatus = serde_json::from_str(r#""FAILED""#).unwrap();
436 assert_eq!(status, InvocationStatus::Failed);
437
438 let status: InvocationStatus = serde_json::from_str(r#""PENDING""#).unwrap();
439 assert_eq!(status, InvocationStatus::Pending);
440 }
441
442 #[test]
443 fn test_invocation_output_serialization() {
444 let output =
445 DurableExecutionInvocationOutput::succeeded(Some(r#"{"value": 42}"#.to_string()));
446 let json = serde_json::to_string(&output).unwrap();
447 assert!(json.contains(r#""Status":"SUCCEEDED""#));
448 assert!(json.contains(r#""Result":"{\"value\": 42}""#));
449 assert!(!json.contains("Error"));
450 }
451
452 #[test]
453 fn test_invocation_output_failed_serialization() {
454 let error = ErrorObject::new("TestError", "Something went wrong");
455 let output = DurableExecutionInvocationOutput::failed(error);
456 let json = serde_json::to_string(&output).unwrap();
457 assert!(json.contains(r#""Status":"FAILED""#));
458 assert!(json.contains(r#""ErrorType":"TestError""#));
459 assert!(!json.contains("Result"));
460 }
461
462 #[test]
463 fn test_invocation_output_pending_serialization() {
464 let output = DurableExecutionInvocationOutput::pending();
465 let json = serde_json::to_string(&output).unwrap();
466 assert!(json.contains(r#""Status":"PENDING""#));
467 assert!(!json.contains("Result"));
468 assert!(!json.contains("Error"));
469 }
470
471 #[test]
472 fn test_from_result_ok() {
473 let result: Result<String, ErrorObject> = Ok(r#"{"value": 42}"#.to_string());
474 let output: DurableExecutionInvocationOutput = result.into();
475 assert!(output.is_succeeded());
476 assert_eq!(output.result, Some(r#"{"value": 42}"#.to_string()));
477 }
478
479 #[test]
480 fn test_from_result_err() {
481 let result: Result<String, ErrorObject> = Err(ErrorObject::new("TestError", "Failed"));
482 let output: DurableExecutionInvocationOutput = result.into();
483 assert!(output.is_failed());
484 assert_eq!(output.error.as_ref().unwrap().error_type, "TestError");
485 }
486
487 #[test]
488 fn test_from_option_result_ok_some() {
489 let result: Result<Option<String>, ErrorObject> = Ok(Some(r#"{"value": 42}"#.to_string()));
490 let output: DurableExecutionInvocationOutput = result.into();
491 assert!(output.is_succeeded());
492 assert_eq!(output.result, Some(r#"{"value": 42}"#.to_string()));
493 }
494
495 #[test]
496 fn test_from_option_result_ok_none() {
497 let result: Result<Option<String>, ErrorObject> = Ok(None);
498 let output: DurableExecutionInvocationOutput = result.into();
499 assert!(output.is_succeeded());
500 assert!(output.result.is_none());
501 }
502
503 #[test]
504 fn test_from_result_serializable() {
505 #[derive(serde::Serialize)]
506 struct TestResult {
507 value: i32,
508 message: String,
509 }
510
511 let result = TestResult {
512 value: 42,
513 message: "success".to_string(),
514 };
515
516 let output = DurableExecutionInvocationOutput::from_result(&result);
517 assert!(output.is_succeeded());
518 assert!(output.result.is_some());
519 let json = output.result.unwrap();
520 assert!(json.contains("42"));
521 assert!(json.contains("success"));
522 }
523
524 #[test]
525 fn test_from_result_none() {
526 let result: Option<String> = None;
527 let output = DurableExecutionInvocationOutput::from_result(&result);
528 assert!(output.is_succeeded());
529 assert_eq!(output.result, Some("null".to_string()));
530 }
531
532 #[test]
533 fn test_from_error_suspend() {
534 use crate::error::DurableError;
535
536 let error = DurableError::Suspend {
537 scheduled_timestamp: None,
538 };
539 let output = DurableExecutionInvocationOutput::from_error(&error);
540 assert!(output.is_pending());
541 assert!(output.result.is_none());
542 assert!(output.error.is_none());
543 }
544
545 #[test]
546 fn test_from_error_execution() {
547 use crate::error::{DurableError, TerminationReason};
548
549 let error = DurableError::Execution {
550 message: "test error".to_string(),
551 termination_reason: TerminationReason::ExecutionError,
552 };
553 let output = DurableExecutionInvocationOutput::from_error(&error);
554 assert!(output.is_failed());
555 assert!(output.error.is_some());
556 assert_eq!(output.error.as_ref().unwrap().error_type, "ExecutionError");
557 }
558
559 #[test]
560 fn test_from_error_validation() {
561 use crate::error::DurableError;
562
563 let error = DurableError::Validation {
564 message: "invalid input".to_string(),
565 };
566 let output = DurableExecutionInvocationOutput::from_error(&error);
567 assert!(output.is_failed());
568 assert_eq!(output.error.as_ref().unwrap().error_type, "ValidationError");
569 }
570
571 #[test]
572 fn test_would_exceed_max_size_small() {
573 let small_data = "hello world";
574 assert!(!DurableExecutionInvocationOutput::would_exceed_max_size(
575 &small_data
576 ));
577 }
578
579 #[test]
580 fn test_max_response_size_constant() {
581 assert_eq!(
583 DurableExecutionInvocationOutput::MAX_RESPONSE_SIZE,
584 6 * 1024 * 1024
585 );
586 }
587
588 #[test]
589 fn test_checkpointed_result() {
590 let output = DurableExecutionInvocationOutput::checkpointed_result("op-123", 7_000_000);
591 assert!(output.is_succeeded());
592 assert!(output.is_checkpointed_result());
593 let result = output.result.unwrap();
594 assert!(result.contains("__checkpointed_result__"));
595 assert!(result.contains("op-123"));
596 assert!(result.contains("7000000"));
597 }
598
599 #[test]
600 fn test_is_checkpointed_result_false() {
601 let output =
602 DurableExecutionInvocationOutput::succeeded(Some(r#"{"value": 42}"#.to_string()));
603 assert!(!output.is_checkpointed_result());
604 }
605
606 #[test]
607 fn test_is_checkpointed_result_none() {
608 let output = DurableExecutionInvocationOutput::succeeded(None);
609 assert!(!output.is_checkpointed_result());
610 }
611
612 #[test]
613 fn test_is_checkpointed_result_pending() {
614 let output = DurableExecutionInvocationOutput::pending();
615 assert!(!output.is_checkpointed_result());
616 }
617}
618
619#[cfg(test)]
620mod property_tests {
621 use super::*;
622 use crate::error::{DurableError, TerminationReason};
623 use proptest::prelude::*;
624
625 proptest! {
631 #![proptest_config(ProptestConfig::with_cases(100))]
632
633 #[test]
639 fn prop_lambda_output_success_status(
640 value in any::<i64>(),
641 message in "[a-zA-Z0-9 ]{0,100}",
642 ) {
643 #[derive(serde::Serialize)]
644 struct TestResult {
645 value: i64,
646 message: String,
647 }
648
649 let result = TestResult { value, message: message.clone() };
650 let output = DurableExecutionInvocationOutput::from_result(&result);
651
652 prop_assert!(output.is_succeeded(), "Successful result must produce SUCCEEDED status");
654 prop_assert!(!output.is_failed(), "Successful result must not be FAILED");
655 prop_assert!(!output.is_pending(), "Successful result must not be PENDING");
656
657 prop_assert!(output.result.is_some(), "Successful result must have result data");
659 let json = output.result.as_ref().unwrap();
660 prop_assert!(json.contains(&value.to_string()), "Result must contain the value");
661
662 prop_assert!(output.error.is_none(), "Successful result must not have error");
664 }
665
666 #[test]
672 fn prop_lambda_output_failure_status(
673 error_message in "[a-zA-Z0-9 ]{1,100}",
674 error_variant in 0u8..7u8,
675 ) {
676 let error = match error_variant {
677 0 => DurableError::Execution {
678 message: error_message.clone(),
679 termination_reason: TerminationReason::ExecutionError,
680 },
681 1 => DurableError::Invocation {
682 message: error_message.clone(),
683 termination_reason: TerminationReason::InvocationError,
684 },
685 2 => DurableError::Checkpoint {
686 message: error_message.clone(),
687 is_retriable: false,
688 aws_error: None,
689 },
690 3 => DurableError::Callback {
691 message: error_message.clone(),
692 callback_id: None,
693 },
694 4 => DurableError::NonDeterministic {
695 message: error_message.clone(),
696 operation_id: None,
697 },
698 5 => DurableError::Validation {
699 message: error_message.clone(),
700 },
701 _ => DurableError::SerDes {
702 message: error_message.clone(),
703 },
704 };
705
706 let output = DurableExecutionInvocationOutput::from_error(&error);
707
708 prop_assert!(output.is_failed(), "Error must produce FAILED status");
710 prop_assert!(!output.is_succeeded(), "Error must not be SUCCEEDED");
711 prop_assert!(!output.is_pending(), "Error must not be PENDING");
712
713 prop_assert!(output.error.is_some(), "Failed output must have error details");
715 let error_obj = output.error.as_ref().unwrap();
716 prop_assert!(!error_obj.error_type.is_empty(), "Error type must not be empty");
717 prop_assert!(error_obj.error_message.contains(&error_message), "Error message must be preserved");
718
719 prop_assert!(output.result.is_none(), "Failed output must not have result");
721 }
722
723 #[test]
728 fn prop_lambda_output_suspend_status(
729 has_timestamp in any::<bool>(),
730 timestamp in any::<f64>(),
731 ) {
732 let error = if has_timestamp {
733 DurableError::Suspend {
734 scheduled_timestamp: Some(timestamp),
735 }
736 } else {
737 DurableError::Suspend {
738 scheduled_timestamp: None,
739 }
740 };
741
742 let output = DurableExecutionInvocationOutput::from_error(&error);
743
744 prop_assert!(output.is_pending(), "Suspend must produce PENDING status");
746 prop_assert!(!output.is_succeeded(), "Suspend must not be SUCCEEDED");
747 prop_assert!(!output.is_failed(), "Suspend must not be FAILED");
748
749 prop_assert!(output.result.is_none(), "Pending output must not have result");
751 prop_assert!(output.error.is_none(), "Pending output must not have error");
752 }
753
754 #[test]
759 fn prop_lambda_output_serialization_preserves_status(
760 status_variant in 0u8..3u8,
761 result_value in any::<Option<i32>>(),
762 error_message in "[a-zA-Z0-9 ]{0,50}",
763 ) {
764 let output = match status_variant {
765 0 => DurableExecutionInvocationOutput::succeeded(
766 result_value.map(|v| format!("{{\"value\":{}}}", v))
767 ),
768 1 => DurableExecutionInvocationOutput::failed(
769 ErrorObject::new("TestError", &error_message)
770 ),
771 _ => DurableExecutionInvocationOutput::pending(),
772 };
773
774 let json = serde_json::to_string(&output).expect("Serialization must succeed");
776
777 match status_variant {
779 0 => prop_assert!(json.contains("SUCCEEDED"), "JSON must contain SUCCEEDED"),
780 1 => prop_assert!(json.contains("FAILED"), "JSON must contain FAILED"),
781 _ => prop_assert!(json.contains("PENDING"), "JSON must contain PENDING"),
782 }
783
784 if output.result.is_some() {
786 prop_assert!(json.contains("Result"), "JSON must contain Result field");
787 }
788 if output.error.is_some() {
789 prop_assert!(json.contains("Error"), "JSON must contain Error field");
790 }
791 }
792
793 #[test]
798 fn prop_result_size_check_consistency(
799 data_size in 0usize..1000usize,
800 ) {
801 let data: String = "x".repeat(data_size);
803
804 let would_exceed = DurableExecutionInvocationOutput::would_exceed_max_size(&data);
805 let output = DurableExecutionInvocationOutput::from_result(&data);
806
807 if !would_exceed {
810 prop_assert!(output.is_succeeded(),
811 "If size check passes, output must be SUCCEEDED");
812 }
813
814 if data_size < 1000 {
816 prop_assert!(!would_exceed, "Small data should not exceed max size");
817 prop_assert!(output.is_succeeded(), "Small data should produce SUCCEEDED");
818 }
819 }
820 }
821}