use serde::de::DeserializeOwned;
use crate::error::{DurableError, TerminationReason};
use crate::operation::OperationType;
use crate::serdes::{JsonSerDes, SerDes, SerDesContext};
use crate::state::CheckpointedResult;
/// Outcome of looking up an operation in the checkpoint during replay.
///
/// Produced by [`check_replay`] when the lookup itself did not yield an error.
#[derive(Debug)]
pub enum ReplayResult<T> {
    /// The operation succeeded previously and its stored result was
    /// deserialized into `T`.
    Replayed(T),
    /// No checkpoint entry exists for the operation.
    NotFound,
    /// A checkpoint entry exists, but it is neither a replayable success nor
    /// one of the failure/terminal states that are reported as errors.
    InProgress,
}
pub fn check_replay<T>(
checkpoint_result: &CheckpointedResult,
expected_type: OperationType,
operation_id: &str,
durable_execution_arn: &str,
) -> Result<ReplayResult<T>, DurableError>
where
T: serde::Serialize + DeserializeOwned,
{
if !checkpoint_result.is_existent() {
return Ok(ReplayResult::NotFound);
}
if let Some(op_type) = checkpoint_result.operation_type() {
if op_type != expected_type {
return Err(DurableError::NonDeterministic {
message: format!(
"Expected {:?} operation but found {:?} at operation_id {}",
expected_type, op_type, operation_id
),
operation_id: Some(operation_id.to_string()),
});
}
}
if checkpoint_result.is_succeeded() {
if let Some(result_str) = checkpoint_result.result() {
let serdes = JsonSerDes::<T>::new();
let serdes_ctx = SerDesContext::new(operation_id, durable_execution_arn);
let result =
serdes
.deserialize(result_str, &serdes_ctx)
.map_err(|e| DurableError::SerDes {
message: format!("Failed to deserialize checkpointed result: {}", e),
})?;
return Ok(ReplayResult::Replayed(result));
}
}
if checkpoint_result.is_failed() {
if let Some(error) = checkpoint_result.error() {
return Err(DurableError::UserCode {
message: error.error_message.clone(),
error_type: error.error_type.clone(),
stack_trace: error.stack_trace.clone(),
});
} else {
return Err(DurableError::execution(
"Operation failed with unknown error",
));
}
}
if checkpoint_result.is_terminal() {
let status = checkpoint_result.status().unwrap();
return Err(DurableError::Execution {
message: format!("Operation was {}", status),
termination_reason: TerminationReason::StepInterrupted,
});
}
Ok(ReplayResult::InProgress)
}
pub fn check_replay_status(
checkpoint_result: &CheckpointedResult,
expected_type: OperationType,
operation_id: &str,
) -> Result<bool, DurableError> {
if !checkpoint_result.is_existent() {
return Ok(false);
}
if let Some(op_type) = checkpoint_result.operation_type() {
if op_type != expected_type {
return Err(DurableError::NonDeterministic {
message: format!(
"Expected {:?} operation but found {:?} at operation_id {}",
expected_type, op_type, operation_id
),
operation_id: Some(operation_id.to_string()),
});
}
}
if checkpoint_result.is_succeeded() {
return Ok(true);
}
if checkpoint_result.is_failed() {
if let Some(error) = checkpoint_result.error() {
return Err(DurableError::UserCode {
message: error.error_message.clone(),
error_type: error.error_type.clone(),
stack_trace: error.stack_trace.clone(),
});
} else {
return Err(DurableError::execution(
"Operation failed with unknown error",
));
}
}
if checkpoint_result.is_terminal() {
let status = checkpoint_result.status().unwrap();
return Err(DurableError::Execution {
message: format!("Operation was {}", status),
termination_reason: TerminationReason::StepInterrupted,
});
}
Ok(false)
}
/// Example-based tests for `check_replay` and `check_replay_status`.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::error::ErrorObject;
    use crate::operation::{Operation, OperationStatus};

    /// Wraps a fresh `"test-op"` operation in a checkpoint after letting
    /// `configure` adjust it.
    fn checkpoint_with(
        op_type: OperationType,
        configure: impl FnOnce(&mut Operation),
    ) -> CheckpointedResult {
        let mut operation = Operation::new("test-op", op_type);
        configure(&mut operation);
        CheckpointedResult::new(Some(operation))
    }

    /// Checkpoint whose operation succeeded with the given serialized result.
    fn succeeded_checkpoint(op_type: OperationType, payload: &str) -> CheckpointedResult {
        checkpoint_with(op_type, |operation| {
            operation.status = OperationStatus::Succeeded;
            operation.result = Some(payload.to_string());
        })
    }

    /// Checkpoint whose operation failed with a `TestError` carrying `reason`.
    fn failed_checkpoint(op_type: OperationType, reason: &str) -> CheckpointedResult {
        checkpoint_with(op_type, |operation| {
            operation.status = OperationStatus::Failed;
            operation.error = Some(ErrorObject::new("TestError", reason));
        })
    }

    /// Checkpoint holding an operation exactly as `Operation::new` creates it.
    fn started_checkpoint(op_type: OperationType) -> CheckpointedResult {
        checkpoint_with(op_type, |_| {})
    }

    /// Checkpoint whose operation was cancelled.
    fn cancelled_checkpoint(op_type: OperationType) -> CheckpointedResult {
        checkpoint_with(op_type, |operation| {
            operation.status = OperationStatus::Cancelled;
        })
    }

    #[test]
    fn test_check_replay_not_found() {
        let checkpoint = CheckpointedResult::empty();
        let outcome = check_replay::<i32>(&checkpoint, OperationType::Step, "test-op", "arn:test");
        assert!(matches!(outcome, Ok(ReplayResult::NotFound)));
    }

    #[test]
    fn test_check_replay_succeeded() {
        let checkpoint = succeeded_checkpoint(OperationType::Step, "42");
        let outcome = check_replay::<i32>(&checkpoint, OperationType::Step, "test-op", "arn:test");
        match outcome {
            Ok(ReplayResult::Replayed(value)) => assert_eq!(value, 42),
            _ => panic!("Expected Replayed"),
        }
    }

    #[test]
    fn test_check_replay_failed() {
        let checkpoint = failed_checkpoint(OperationType::Step, "test error");
        let outcome = check_replay::<i32>(&checkpoint, OperationType::Step, "test-op", "arn:test");
        match outcome {
            Err(DurableError::UserCode { message, .. }) => {
                assert!(message.contains("test error"));
            }
            _ => panic!("Expected UserCode error"),
        }
    }

    #[test]
    fn test_check_replay_non_deterministic() {
        // The checkpoint recorded a Wait, but the caller now expects a Step.
        let checkpoint = succeeded_checkpoint(OperationType::Wait, "null");
        let outcome = check_replay::<i32>(&checkpoint, OperationType::Step, "test-op", "arn:test");
        match outcome {
            Err(DurableError::NonDeterministic {
                operation_id,
                message,
            }) => {
                assert_eq!(operation_id, Some("test-op".to_string()));
                assert!(message.contains("Expected Step"));
                assert!(message.contains("found Wait"));
            }
            _ => panic!("Expected NonDeterministic error"),
        }
    }

    #[test]
    fn test_check_replay_in_progress() {
        let checkpoint = started_checkpoint(OperationType::Step);
        let outcome = check_replay::<i32>(&checkpoint, OperationType::Step, "test-op", "arn:test");
        assert!(matches!(outcome, Ok(ReplayResult::InProgress)));
    }

    #[test]
    fn test_check_replay_cancelled() {
        let checkpoint = cancelled_checkpoint(OperationType::Step);
        let outcome = check_replay::<i32>(&checkpoint, OperationType::Step, "test-op", "arn:test");
        match outcome {
            Err(DurableError::Execution { message, .. }) => {
                assert!(message.contains("Cancelled"));
            }
            _ => panic!("Expected Execution error"),
        }
    }

    #[test]
    fn test_check_replay_status_not_found() {
        let checkpoint = CheckpointedResult::empty();
        let outcome = check_replay_status(&checkpoint, OperationType::Step, "test-op");
        assert!(matches!(outcome, Ok(false)));
    }

    #[test]
    fn test_check_replay_status_succeeded() {
        let checkpoint = succeeded_checkpoint(OperationType::Step, "42");
        let outcome = check_replay_status(&checkpoint, OperationType::Step, "test-op");
        assert!(matches!(outcome, Ok(true)));
    }

    #[test]
    fn test_check_replay_status_failed() {
        let checkpoint = failed_checkpoint(OperationType::Step, "test error");
        let outcome = check_replay_status(&checkpoint, OperationType::Step, "test-op");
        assert!(outcome.is_err());
    }

    #[test]
    fn test_check_replay_status_non_deterministic() {
        let checkpoint = succeeded_checkpoint(OperationType::Wait, "null");
        let outcome = check_replay_status(&checkpoint, OperationType::Step, "test-op");
        assert!(
            matches!(outcome, Err(DurableError::NonDeterministic { .. })),
            "Expected NonDeterministic error"
        );
    }
}
/// Property-based tests for `check_replay` and `check_replay_status`.
#[cfg(test)]
mod property_tests {
    use super::*;
    use crate::error::ErrorObject;
    use crate::operation::{Operation, OperationStatus};
    use proptest::prelude::*;

    /// Strategy picking one of the operation types exercised by the properties.
    fn operation_type_strategy() -> impl Strategy<Value = OperationType> {
        prop_oneof![
            Just(OperationType::Step),
            Just(OperationType::Wait),
            Just(OperationType::Callback),
            Just(OperationType::Invoke),
            Just(OperationType::Context),
        ]
    }

    /// Wraps a fresh `"test-op"` operation in a checkpoint after letting
    /// `configure` adjust it.
    fn checkpoint_from(
        op_type: OperationType,
        configure: impl FnOnce(&mut Operation),
    ) -> CheckpointedResult {
        let mut operation = Operation::new("test-op", op_type);
        configure(&mut operation);
        CheckpointedResult::new(Some(operation))
    }

    /// Checkpoint whose operation succeeded with the given serialized result.
    fn succeeded_with(op_type: OperationType, payload: String) -> CheckpointedResult {
        checkpoint_from(op_type, |operation| {
            operation.status = OperationStatus::Succeeded;
            operation.result = Some(payload);
        })
    }

    /// Checkpoint whose operation failed with a `TestError` carrying `error_msg`.
    fn failed_with(op_type: OperationType, error_msg: &str) -> CheckpointedResult {
        checkpoint_from(op_type, |operation| {
            operation.status = OperationStatus::Failed;
            operation.error = Some(ErrorObject::new("TestError", error_msg));
        })
    }

    /// Checkpoint holding an operation exactly as `Operation::new` creates it.
    fn untouched(op_type: OperationType) -> CheckpointedResult {
        checkpoint_from(op_type, |_| {})
    }

    /// Properties about replaying stored outcomes when the operation type matches.
    mod replay_round_trip_tests {
        use super::*;

        proptest! {
            #![proptest_config(ProptestConfig::with_cases(100))]

            #[test]
            fn prop_replay_returns_checkpointed_result_for_success(
                result_value in any::<i32>(),
                op_type in operation_type_strategy(),
            ) {
                let checkpoint = succeeded_with(op_type, result_value.to_string());
                let outcome = check_replay::<i32>(&checkpoint, op_type, "test-op", "arn:test");
                prop_assert!(outcome.is_ok(), "Replay should succeed");
                if let ReplayResult::Replayed(value) = outcome.unwrap() {
                    prop_assert_eq!(value, result_value, "Replayed value should match original");
                } else {
                    prop_assert!(false, "Expected Replayed result");
                }
            }

            #[test]
            fn prop_replay_returns_error_for_failed_operations(
                error_msg in "[a-zA-Z0-9 ]{1,50}",
                op_type in operation_type_strategy(),
            ) {
                let checkpoint = failed_with(op_type, &error_msg);
                let outcome = check_replay::<i32>(&checkpoint, op_type, "test-op", "arn:test");
                prop_assert!(outcome.is_err(), "Replay should return error");
                if let DurableError::UserCode { message, error_type, .. } = outcome.unwrap_err() {
                    prop_assert!(message.contains(&error_msg), "Error message should match");
                    prop_assert_eq!(error_type, "TestError", "Error type should match");
                } else {
                    prop_assert!(false, "Expected UserCode error");
                }
            }

            #[test]
            fn prop_no_checkpoint_returns_not_found(
                op_type in operation_type_strategy(),
            ) {
                let checkpoint = CheckpointedResult::empty();
                let outcome = check_replay::<i32>(&checkpoint, op_type, "test-op", "arn:test");
                prop_assert!(outcome.is_ok(), "Should succeed");
                prop_assert!(matches!(outcome.unwrap(), ReplayResult::NotFound), "Should return NotFound");
            }

            #[test]
            fn prop_round_trip_consistency_string(
                value in "[a-zA-Z0-9]{1,100}",
                op_type in operation_type_strategy(),
            ) {
                let checkpoint = succeeded_with(op_type, serde_json::to_string(&value).unwrap());
                let outcome = check_replay::<String>(&checkpoint, op_type, "test-op", "arn:test");
                prop_assert!(outcome.is_ok(), "Replay should succeed");
                if let ReplayResult::Replayed(replayed_value) = outcome.unwrap() {
                    prop_assert_eq!(replayed_value, value, "Round-trip should preserve value");
                } else {
                    prop_assert!(false, "Expected Replayed result");
                }
            }

            #[test]
            fn prop_round_trip_consistency_complex(
                field1 in any::<i32>(),
                field2 in "[a-zA-Z0-9]{1,50}",
                field3 in any::<bool>(),
                op_type in operation_type_strategy(),
            ) {
                #[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
                struct TestStruct {
                    field1: i32,
                    field2: String,
                    field3: bool,
                }

                let value = TestStruct {
                    field1,
                    field2: field2.clone(),
                    field3,
                };
                let checkpoint = succeeded_with(op_type, serde_json::to_string(&value).unwrap());
                let outcome = check_replay::<TestStruct>(&checkpoint, op_type, "test-op", "arn:test");
                prop_assert!(outcome.is_ok(), "Replay should succeed");
                if let ReplayResult::Replayed(replayed_value) = outcome.unwrap() {
                    prop_assert_eq!(replayed_value, value, "Round-trip should preserve complex value");
                } else {
                    prop_assert!(false, "Expected Replayed result");
                }
            }

            #[test]
            fn prop_in_progress_returns_in_progress(
                op_type in operation_type_strategy(),
            ) {
                let checkpoint = untouched(op_type);
                let outcome = check_replay::<i32>(&checkpoint, op_type, "test-op", "arn:test");
                prop_assert!(outcome.is_ok(), "Should succeed");
                prop_assert!(matches!(outcome.unwrap(), ReplayResult::InProgress), "Should return InProgress");
            }
        }
    }

    /// Properties about detecting a checkpointed operation type that differs
    /// from the one the caller expects.
    mod non_deterministic_detection_tests {
        use super::*;

        /// Strategy yielding `(checkpointed, expected)` pairs that always differ.
        fn different_operation_types_strategy(
        ) -> impl Strategy<Value = (OperationType, OperationType)> {
            (operation_type_strategy(), operation_type_strategy())
                .prop_filter("Types must be different", |(a, b)| a != b)
        }

        proptest! {
            #![proptest_config(ProptestConfig::with_cases(100))]

            #[test]
            fn prop_mismatched_types_raise_non_deterministic_error(
                (checkpointed_type, expected_type) in different_operation_types_strategy(),
                result_value in any::<i32>(),
            ) {
                let checkpoint = succeeded_with(checkpointed_type, result_value.to_string());
                let outcome = check_replay::<i32>(&checkpoint, expected_type, "test-op", "arn:test");
                prop_assert!(outcome.is_err(), "Should return error for mismatched types");
                match outcome.unwrap_err() {
                    DurableError::NonDeterministic { operation_id, message } => {
                        let expected_label = format!("{:?}", expected_type);
                        let found_label = format!("{:?}", checkpointed_type);
                        prop_assert_eq!(operation_id, Some("test-op".to_string()), "Operation ID should match");
                        prop_assert!(message.contains(&expected_label), "Message should mention expected type");
                        prop_assert!(message.contains(&found_label), "Message should mention found type");
                    }
                    other => prop_assert!(false, "Expected NonDeterministic error, got {:?}", other),
                }
            }

            #[test]
            fn prop_matching_types_do_not_raise_error(
                op_type in operation_type_strategy(),
                result_value in any::<i32>(),
            ) {
                let checkpoint = succeeded_with(op_type, result_value.to_string());
                let outcome = check_replay::<i32>(&checkpoint, op_type, "test-op", "arn:test");
                prop_assert!(
                    !matches!(&outcome, Err(DurableError::NonDeterministic { .. })),
                    "Should not return NonDeterministic error for matching types"
                );
                prop_assert!(outcome.is_ok(), "Should succeed for matching types");
                if let ReplayResult::Replayed(value) = outcome.unwrap() {
                    prop_assert_eq!(value, result_value, "Should return correct value");
                } else {
                    prop_assert!(false, "Expected Replayed result");
                }
            }

            #[test]
            fn prop_non_deterministic_detected_before_error_returned(
                (checkpointed_type, expected_type) in different_operation_types_strategy(),
                error_msg in "[a-zA-Z0-9 ]{1,50}",
            ) {
                let checkpoint = failed_with(checkpointed_type, &error_msg);
                let outcome = check_replay::<i32>(&checkpoint, expected_type, "test-op", "arn:test");
                prop_assert!(outcome.is_err(), "Should return error");
                match outcome.unwrap_err() {
                    // The type mismatch must win over the stored failure.
                    DurableError::NonDeterministic { .. } => {}
                    DurableError::UserCode { .. } => {
                        prop_assert!(false, "Should detect non-determinism before returning stored error");
                    }
                    other => prop_assert!(false, "Expected NonDeterministic error, got {:?}", other),
                }
            }

            #[test]
            fn prop_non_deterministic_detected_for_in_progress(
                (checkpointed_type, expected_type) in different_operation_types_strategy(),
            ) {
                let checkpoint = untouched(checkpointed_type);
                let outcome = check_replay::<i32>(&checkpoint, expected_type, "test-op", "arn:test");
                prop_assert!(outcome.is_err(), "Should return error for mismatched types");
                match outcome.unwrap_err() {
                    DurableError::NonDeterministic { operation_id, .. } => {
                        prop_assert_eq!(operation_id, Some("test-op".to_string()), "Operation ID should match");
                    }
                    other => prop_assert!(false, "Expected NonDeterministic error, got {:?}", other),
                }
            }

            #[test]
            fn prop_check_replay_status_detects_non_determinism(
                (checkpointed_type, expected_type) in different_operation_types_strategy(),
            ) {
                // Succeeded status only; no result is stored for this case.
                let checkpoint = checkpoint_from(checkpointed_type, |operation| {
                    operation.status = OperationStatus::Succeeded;
                });
                let outcome = check_replay_status(&checkpoint, expected_type, "test-op");
                prop_assert!(outcome.is_err(), "Should return error for mismatched types");
                match outcome.unwrap_err() {
                    DurableError::NonDeterministic { operation_id, .. } => {
                        prop_assert_eq!(operation_id, Some("test-op".to_string()), "Operation ID should match");
                    }
                    other => prop_assert!(false, "Expected NonDeterministic error, got {:?}", other),
                }
            }
        }
    }
}