use serde::{Deserialize, Serialize};
use crate::error::ErrorObject;
use crate::operation::Operation;
#[derive(Debug, Clone, Deserialize)]
pub struct DurableExecutionInvocationInput {
#[serde(rename = "DurableExecutionArn")]
pub durable_execution_arn: String,
#[serde(rename = "CheckpointToken")]
pub checkpoint_token: String,
#[serde(rename = "InitialExecutionState")]
pub initial_execution_state: InitialExecutionState,
#[serde(rename = "Input", default)]
pub input: Option<serde_json::Value>,
}
#[derive(Debug, Clone, Default, Deserialize)]
pub struct InitialExecutionState {
#[serde(rename = "Operations", default)]
pub operations: Vec<Operation>,
#[serde(rename = "NextMarker", skip_serializing_if = "Option::is_none")]
pub next_marker: Option<String>,
}
impl InitialExecutionState {
pub fn new() -> Self {
Self::default()
}
pub fn with_operations(operations: Vec<Operation>) -> Self {
Self {
operations,
next_marker: None,
}
}
pub fn has_more(&self) -> bool {
self.next_marker.is_some()
}
}
#[derive(Debug, Clone, Serialize)]
pub struct DurableExecutionInvocationOutput {
#[serde(rename = "Status")]
pub status: InvocationStatus,
#[serde(rename = "Result", skip_serializing_if = "Option::is_none")]
pub result: Option<String>,
#[serde(rename = "Error", skip_serializing_if = "Option::is_none")]
pub error: Option<ErrorObject>,
}
impl DurableExecutionInvocationOutput {
pub const MAX_RESPONSE_SIZE: usize = 6 * 1024 * 1024;
pub fn succeeded(result: Option<String>) -> Self {
Self {
status: InvocationStatus::Succeeded,
result,
error: None,
}
}
pub fn failed(error: ErrorObject) -> Self {
Self {
status: InvocationStatus::Failed,
result: None,
error: Some(error),
}
}
pub fn pending() -> Self {
Self {
status: InvocationStatus::Pending,
result: None,
error: None,
}
}
pub fn is_succeeded(&self) -> bool {
matches!(self.status, InvocationStatus::Succeeded)
}
pub fn is_failed(&self) -> bool {
matches!(self.status, InvocationStatus::Failed)
}
pub fn is_pending(&self) -> bool {
matches!(self.status, InvocationStatus::Pending)
}
pub fn from_result<T: serde::Serialize>(result: &T) -> Self {
match serde_json::to_string(result) {
Ok(json) => {
if json.len() > Self::MAX_RESPONSE_SIZE {
Self::failed(ErrorObject::new(
"ResponseTooLarge",
format!(
"Response size {} bytes exceeds maximum {} bytes. Consider checkpointing large results.",
json.len(),
Self::MAX_RESPONSE_SIZE
)
))
} else {
Self::succeeded(Some(json))
}
}
Err(e) => Self::failed(ErrorObject::new(
"SerializationError",
format!("Failed to serialize result: {}", e),
)),
}
}
pub fn from_error(error: &crate::error::DurableError) -> Self {
use crate::error::DurableError;
match error {
DurableError::Suspend { .. } => Self::pending(),
_ => Self::failed(ErrorObject::from(error)),
}
}
pub fn would_exceed_max_size<T: serde::Serialize>(result: &T) -> bool {
match serde_json::to_string(result) {
Ok(json) => json.len() > Self::MAX_RESPONSE_SIZE,
Err(_) => false, }
}
pub fn checkpointed_result(checkpoint_id: &str, original_size: usize) -> Self {
Self::succeeded(Some(format!(
"{{\"__checkpointed_result__\":\"{}\",\"size\":{}}}",
checkpoint_id, original_size
)))
}
pub fn is_checkpointed_result(&self) -> bool {
self.result
.as_ref()
.map(|r| r.contains("__checkpointed_result__"))
.unwrap_or(false)
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
pub enum InvocationStatus {
Succeeded,
Failed,
Pending,
}
impl std::fmt::Display for InvocationStatus {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Succeeded => write!(f, "SUCCEEDED"),
Self::Failed => write!(f, "FAILED"),
Self::Pending => write!(f, "PENDING"),
}
}
}
impl From<Result<String, ErrorObject>> for DurableExecutionInvocationOutput {
fn from(result: Result<String, ErrorObject>) -> Self {
match result {
Ok(value) => Self::succeeded(Some(value)),
Err(error) => Self::failed(error),
}
}
}
impl From<Result<Option<String>, ErrorObject>> for DurableExecutionInvocationOutput {
fn from(result: Result<Option<String>, ErrorObject>) -> Self {
match result {
Ok(value) => Self::succeeded(value),
Err(error) => Self::failed(error),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::operation::OperationType;
#[test]
fn test_invocation_input_deserialization() {
let json = r#"{
"DurableExecutionArn": "arn:aws:lambda:us-east-1:123456789012:function:my-function:durable:abc123",
"CheckpointToken": "token-xyz",
"InitialExecutionState": {
"Operations": [
{
"Id": "op-1",
"Type": "STEP",
"Status": "SUCCEEDED",
"Result": "{\"value\": 42}"
}
],
"NextMarker": null
},
"Input": {"orderId": "order-123"}
}"#;
let input: DurableExecutionInvocationInput = serde_json::from_str(json).unwrap();
assert_eq!(
input.durable_execution_arn,
"arn:aws:lambda:us-east-1:123456789012:function:my-function:durable:abc123"
);
assert_eq!(input.checkpoint_token, "token-xyz");
assert_eq!(input.initial_execution_state.operations.len(), 1);
assert_eq!(
input.initial_execution_state.operations[0].operation_id,
"op-1"
);
assert!(input.input.is_some());
}
#[test]
fn test_invocation_input_without_input() {
let json = r#"{
"DurableExecutionArn": "arn:aws:lambda:us-east-1:123456789012:function:my-function:durable:abc123",
"CheckpointToken": "token-xyz",
"InitialExecutionState": {
"Operations": []
}
}"#;
let input: DurableExecutionInvocationInput = serde_json::from_str(json).unwrap();
assert!(input.input.is_none());
assert!(input.initial_execution_state.operations.is_empty());
}
#[test]
fn test_initial_execution_state_new() {
let state = InitialExecutionState::new();
assert!(state.operations.is_empty());
assert!(state.next_marker.is_none());
assert!(!state.has_more());
}
#[test]
fn test_initial_execution_state_with_operations() {
let ops = vec![
Operation::new("op-1", OperationType::Step),
Operation::new("op-2", OperationType::Wait),
];
let state = InitialExecutionState::with_operations(ops);
assert_eq!(state.operations.len(), 2);
assert!(!state.has_more());
}
#[test]
fn test_initial_execution_state_has_more() {
let mut state = InitialExecutionState::new();
assert!(!state.has_more());
state.next_marker = Some("marker-123".to_string());
assert!(state.has_more());
}
#[test]
fn test_invocation_output_succeeded() {
let output =
DurableExecutionInvocationOutput::succeeded(Some(r#"{"result": "ok"}"#.to_string()));
assert!(output.is_succeeded());
assert!(!output.is_failed());
assert!(!output.is_pending());
assert_eq!(output.result, Some(r#"{"result": "ok"}"#.to_string()));
assert!(output.error.is_none());
}
#[test]
fn test_invocation_output_succeeded_no_result() {
let output = DurableExecutionInvocationOutput::succeeded(None);
assert!(output.is_succeeded());
assert!(output.result.is_none());
}
#[test]
fn test_invocation_output_failed() {
let error = ErrorObject::new("TestError", "Something went wrong");
let output = DurableExecutionInvocationOutput::failed(error);
assert!(!output.is_succeeded());
assert!(output.is_failed());
assert!(!output.is_pending());
assert!(output.result.is_none());
assert!(output.error.is_some());
assert_eq!(output.error.as_ref().unwrap().error_type, "TestError");
}
#[test]
fn test_invocation_output_pending() {
let output = DurableExecutionInvocationOutput::pending();
assert!(!output.is_succeeded());
assert!(!output.is_failed());
assert!(output.is_pending());
assert!(output.result.is_none());
assert!(output.error.is_none());
}
#[test]
fn test_invocation_status_display() {
assert_eq!(InvocationStatus::Succeeded.to_string(), "SUCCEEDED");
assert_eq!(InvocationStatus::Failed.to_string(), "FAILED");
assert_eq!(InvocationStatus::Pending.to_string(), "PENDING");
}
#[test]
fn test_invocation_status_serialization() {
let json = serde_json::to_string(&InvocationStatus::Succeeded).unwrap();
assert_eq!(json, r#""SUCCEEDED""#);
let json = serde_json::to_string(&InvocationStatus::Failed).unwrap();
assert_eq!(json, r#""FAILED""#);
let json = serde_json::to_string(&InvocationStatus::Pending).unwrap();
assert_eq!(json, r#""PENDING""#);
}
#[test]
fn test_invocation_status_deserialization() {
let status: InvocationStatus = serde_json::from_str(r#""SUCCEEDED""#).unwrap();
assert_eq!(status, InvocationStatus::Succeeded);
let status: InvocationStatus = serde_json::from_str(r#""FAILED""#).unwrap();
assert_eq!(status, InvocationStatus::Failed);
let status: InvocationStatus = serde_json::from_str(r#""PENDING""#).unwrap();
assert_eq!(status, InvocationStatus::Pending);
}
#[test]
fn test_invocation_output_serialization() {
let output =
DurableExecutionInvocationOutput::succeeded(Some(r#"{"value": 42}"#.to_string()));
let json = serde_json::to_string(&output).unwrap();
assert!(json.contains(r#""Status":"SUCCEEDED""#));
assert!(json.contains(r#""Result":"{\"value\": 42}""#));
assert!(!json.contains("Error"));
}
#[test]
fn test_invocation_output_failed_serialization() {
let error = ErrorObject::new("TestError", "Something went wrong");
let output = DurableExecutionInvocationOutput::failed(error);
let json = serde_json::to_string(&output).unwrap();
assert!(json.contains(r#""Status":"FAILED""#));
assert!(json.contains(r#""ErrorType":"TestError""#));
assert!(!json.contains("Result"));
}
#[test]
fn test_invocation_output_pending_serialization() {
let output = DurableExecutionInvocationOutput::pending();
let json = serde_json::to_string(&output).unwrap();
assert!(json.contains(r#""Status":"PENDING""#));
assert!(!json.contains("Result"));
assert!(!json.contains("Error"));
}
#[test]
fn test_from_result_ok() {
let result: Result<String, ErrorObject> = Ok(r#"{"value": 42}"#.to_string());
let output: DurableExecutionInvocationOutput = result.into();
assert!(output.is_succeeded());
assert_eq!(output.result, Some(r#"{"value": 42}"#.to_string()));
}
#[test]
fn test_from_result_err() {
let result: Result<String, ErrorObject> = Err(ErrorObject::new("TestError", "Failed"));
let output: DurableExecutionInvocationOutput = result.into();
assert!(output.is_failed());
assert_eq!(output.error.as_ref().unwrap().error_type, "TestError");
}
#[test]
fn test_from_option_result_ok_some() {
let result: Result<Option<String>, ErrorObject> = Ok(Some(r#"{"value": 42}"#.to_string()));
let output: DurableExecutionInvocationOutput = result.into();
assert!(output.is_succeeded());
assert_eq!(output.result, Some(r#"{"value": 42}"#.to_string()));
}
#[test]
fn test_from_option_result_ok_none() {
let result: Result<Option<String>, ErrorObject> = Ok(None);
let output: DurableExecutionInvocationOutput = result.into();
assert!(output.is_succeeded());
assert!(output.result.is_none());
}
#[test]
fn test_from_result_serializable() {
#[derive(serde::Serialize)]
struct TestResult {
value: i32,
message: String,
}
let result = TestResult {
value: 42,
message: "success".to_string(),
};
let output = DurableExecutionInvocationOutput::from_result(&result);
assert!(output.is_succeeded());
assert!(output.result.is_some());
let json = output.result.unwrap();
assert!(json.contains("42"));
assert!(json.contains("success"));
}
#[test]
fn test_from_result_none() {
let result: Option<String> = None;
let output = DurableExecutionInvocationOutput::from_result(&result);
assert!(output.is_succeeded());
assert_eq!(output.result, Some("null".to_string()));
}
#[test]
fn test_from_error_suspend() {
use crate::error::DurableError;
let error = DurableError::Suspend {
scheduled_timestamp: None,
};
let output = DurableExecutionInvocationOutput::from_error(&error);
assert!(output.is_pending());
assert!(output.result.is_none());
assert!(output.error.is_none());
}
#[test]
fn test_from_error_execution() {
use crate::error::{DurableError, TerminationReason};
let error = DurableError::Execution {
message: "test error".to_string(),
termination_reason: TerminationReason::ExecutionError,
};
let output = DurableExecutionInvocationOutput::from_error(&error);
assert!(output.is_failed());
assert!(output.error.is_some());
assert_eq!(output.error.as_ref().unwrap().error_type, "ExecutionError");
}
#[test]
fn test_from_error_validation() {
use crate::error::DurableError;
let error = DurableError::Validation {
message: "invalid input".to_string(),
};
let output = DurableExecutionInvocationOutput::from_error(&error);
assert!(output.is_failed());
assert_eq!(output.error.as_ref().unwrap().error_type, "ValidationError");
}
#[test]
fn test_would_exceed_max_size_small() {
let small_data = "hello world";
assert!(!DurableExecutionInvocationOutput::would_exceed_max_size(
&small_data
));
}
#[test]
fn test_max_response_size_constant() {
assert_eq!(
DurableExecutionInvocationOutput::MAX_RESPONSE_SIZE,
6 * 1024 * 1024
);
}
#[test]
fn test_checkpointed_result() {
let output = DurableExecutionInvocationOutput::checkpointed_result("op-123", 7_000_000);
assert!(output.is_succeeded());
assert!(output.is_checkpointed_result());
let result = output.result.unwrap();
assert!(result.contains("__checkpointed_result__"));
assert!(result.contains("op-123"));
assert!(result.contains("7000000"));
}
#[test]
fn test_is_checkpointed_result_false() {
let output =
DurableExecutionInvocationOutput::succeeded(Some(r#"{"value": 42}"#.to_string()));
assert!(!output.is_checkpointed_result());
}
#[test]
fn test_is_checkpointed_result_none() {
let output = DurableExecutionInvocationOutput::succeeded(None);
assert!(!output.is_checkpointed_result());
}
#[test]
fn test_is_checkpointed_result_pending() {
let output = DurableExecutionInvocationOutput::pending();
assert!(!output.is_checkpointed_result());
}
}
#[cfg(test)]
mod property_tests {
use super::*;
use crate::error::{DurableError, TerminationReason};
use proptest::prelude::*;
proptest! {
#![proptest_config(ProptestConfig::with_cases(100))]
#[test]
fn prop_lambda_output_success_status(
value in any::<i64>(),
message in "[a-zA-Z0-9 ]{0,100}",
) {
#[derive(serde::Serialize)]
struct TestResult {
value: i64,
message: String,
}
let result = TestResult { value, message: message.clone() };
let output = DurableExecutionInvocationOutput::from_result(&result);
prop_assert!(output.is_succeeded(), "Successful result must produce SUCCEEDED status");
prop_assert!(!output.is_failed(), "Successful result must not be FAILED");
prop_assert!(!output.is_pending(), "Successful result must not be PENDING");
prop_assert!(output.result.is_some(), "Successful result must have result data");
let json = output.result.as_ref().unwrap();
prop_assert!(json.contains(&value.to_string()), "Result must contain the value");
prop_assert!(output.error.is_none(), "Successful result must not have error");
}
#[test]
fn prop_lambda_output_failure_status(
error_message in "[a-zA-Z0-9 ]{1,100}",
error_variant in 0u8..7u8,
) {
let error = match error_variant {
0 => DurableError::Execution {
message: error_message.clone(),
termination_reason: TerminationReason::ExecutionError,
},
1 => DurableError::Invocation {
message: error_message.clone(),
termination_reason: TerminationReason::InvocationError,
},
2 => DurableError::Checkpoint {
message: error_message.clone(),
is_retriable: false,
aws_error: None,
},
3 => DurableError::Callback {
message: error_message.clone(),
callback_id: None,
},
4 => DurableError::NonDeterministic {
message: error_message.clone(),
operation_id: None,
},
5 => DurableError::Validation {
message: error_message.clone(),
},
_ => DurableError::SerDes {
message: error_message.clone(),
},
};
let output = DurableExecutionInvocationOutput::from_error(&error);
prop_assert!(output.is_failed(), "Error must produce FAILED status");
prop_assert!(!output.is_succeeded(), "Error must not be SUCCEEDED");
prop_assert!(!output.is_pending(), "Error must not be PENDING");
prop_assert!(output.error.is_some(), "Failed output must have error details");
let error_obj = output.error.as_ref().unwrap();
prop_assert!(!error_obj.error_type.is_empty(), "Error type must not be empty");
prop_assert!(error_obj.error_message.contains(&error_message), "Error message must be preserved");
prop_assert!(output.result.is_none(), "Failed output must not have result");
}
#[test]
fn prop_lambda_output_suspend_status(
has_timestamp in any::<bool>(),
timestamp in any::<f64>(),
) {
let error = if has_timestamp {
DurableError::Suspend {
scheduled_timestamp: Some(timestamp),
}
} else {
DurableError::Suspend {
scheduled_timestamp: None,
}
};
let output = DurableExecutionInvocationOutput::from_error(&error);
prop_assert!(output.is_pending(), "Suspend must produce PENDING status");
prop_assert!(!output.is_succeeded(), "Suspend must not be SUCCEEDED");
prop_assert!(!output.is_failed(), "Suspend must not be FAILED");
prop_assert!(output.result.is_none(), "Pending output must not have result");
prop_assert!(output.error.is_none(), "Pending output must not have error");
}
#[test]
fn prop_lambda_output_serialization_preserves_status(
status_variant in 0u8..3u8,
result_value in any::<Option<i32>>(),
error_message in "[a-zA-Z0-9 ]{0,50}",
) {
let output = match status_variant {
0 => DurableExecutionInvocationOutput::succeeded(
result_value.map(|v| format!("{{\"value\":{}}}", v))
),
1 => DurableExecutionInvocationOutput::failed(
ErrorObject::new("TestError", &error_message)
),
_ => DurableExecutionInvocationOutput::pending(),
};
let json = serde_json::to_string(&output).expect("Serialization must succeed");
match status_variant {
0 => prop_assert!(json.contains("SUCCEEDED"), "JSON must contain SUCCEEDED"),
1 => prop_assert!(json.contains("FAILED"), "JSON must contain FAILED"),
_ => prop_assert!(json.contains("PENDING"), "JSON must contain PENDING"),
}
if output.result.is_some() {
prop_assert!(json.contains("Result"), "JSON must contain Result field");
}
if output.error.is_some() {
prop_assert!(json.contains("Error"), "JSON must contain Error field");
}
}
#[test]
fn prop_result_size_check_consistency(
data_size in 0usize..1000usize,
) {
let data: String = "x".repeat(data_size);
let would_exceed = DurableExecutionInvocationOutput::would_exceed_max_size(&data);
let output = DurableExecutionInvocationOutput::from_result(&data);
if !would_exceed {
prop_assert!(output.is_succeeded(),
"If size check passes, output must be SUCCEEDED");
}
if data_size < 1000 {
prop_assert!(!would_exceed, "Small data should not exceed max size");
prop_assert!(output.is_succeeded(), "Small data should produce SUCCEEDED");
}
}
}
}