use std::collections::HashMap;
use std::sync::Arc;
use chrono::Utc;
use serde_json::{json, Value};
use tracing::{debug, warn};
use fakecloud_aws::arn::Arn;
use fakecloud_core::delivery::DeliveryBus;
use fakecloud_dynamodb::state::SharedDynamoDbState;
use crate::choice::evaluate_choice;
use crate::error_handling::{find_catcher, should_retry};
use crate::io_processing::{apply_input_path, apply_output_path, apply_result_path};
use crate::state::{ExecutionStatus, HistoryEvent, SharedStepFunctionsState};
/// Runs one state machine execution to completion and records its final status.
///
/// The definition is parsed as JSON; a parse failure fails the execution with
/// `States.Runtime`. Input that is absent or is not valid JSON is replaced by `{}`.
/// The state loop runs on its own Tokio task so that a panic or cancellation inside
/// it is turned into a failed execution instead of propagating to the caller.
pub async fn execute_state_machine(
    state: SharedStepFunctionsState,
    execution_arn: String,
    definition: String,
    input: Option<String>,
    delivery: Option<Arc<DeliveryBus>>,
    dynamodb_state: Option<SharedDynamoDbState>,
) {
    let parsed_definition = match serde_json::from_str::<Value>(&definition) {
        Ok(parsed) => parsed,
        Err(e) => {
            let cause = format!("Failed to parse definition: {e}");
            fail_execution(&state, &execution_arn, "States.Runtime", &cause);
            return;
        }
    };

    // Missing or unparseable input silently becomes an empty object.
    let initial_input = match input
        .as_deref()
        .map(|raw| serde_json::from_str::<Value>(raw))
    {
        Some(Ok(parsed)) => parsed,
        _ => json!({}),
    };

    let started_details = json!({
        "input": serde_json::to_string(&initial_input).unwrap_or_default(),
        "roleArn": "arn:aws:iam::123456789012:role/execution-role"
    });
    add_event(&state, &execution_arn, "ExecutionStarted", 0, started_details);

    // Run the state loop in a separate task so a panic surfaces as a JoinError here.
    let worker = {
        let task_state = state.clone();
        let task_arn = execution_arn.clone();
        tokio::spawn(async move {
            run_states(
                &parsed_definition,
                initial_input,
                &delivery,
                &dynamodb_state,
                &task_state,
                &task_arn,
            )
            .await
        })
    };

    let join_err = match worker.await {
        Ok(Ok(output)) => {
            succeed_execution(&state, &execution_arn, &output);
            return;
        }
        Ok(Err((error, cause))) => {
            fail_execution(&state, &execution_arn, &error, &cause);
            return;
        }
        Err(join_err) => join_err,
    };

    // Panic payloads are usually a `String` or a `&'static str`; anything else gets a
    // generic message.
    let reason = if join_err.is_panic() {
        let payload = join_err.into_panic();
        payload
            .downcast_ref::<String>()
            .cloned()
            .or_else(|| {
                payload
                    .downcast_ref::<&'static str>()
                    .map(|text| (*text).to_string())
            })
            .unwrap_or_else(|| "execution task panicked".to_string())
    } else {
        format!("execution task cancelled: {join_err}")
    };
    tracing::error!(
        execution_arn = %execution_arn,
        panic = %reason,
        "Step Functions execution panicked"
    );
    fail_execution(&state, &execution_arn, "States.Runtime", &reason);
}
/// Boxed, `Send` future returned by `run_states`. It resolves to the final output
/// value on success or to an `(error, cause)` pair on failure.
type StatesResult<'a> = std::pin::Pin<
Box<dyn std::future::Future<Output = Result<Value, (String, String)>> + Send + 'a>,
>;
/// Walks a state machine definition from `StartAt` until a state ends the run.
///
/// `def` must contain `StartAt` (string) and `States` (object); otherwise the future
/// resolves to a `States.Runtime` error. Each iteration looks up the current state,
/// dispatches on its `Type`, and either moves to the next state with the produced
/// output as the new input, returns the final output, or returns `(error, cause)`.
/// The loop gives up with `States.Runtime` after 500 transitions.
///
/// The future is boxed because Parallel and Map states call back into this function
/// for their branches and iterations.
fn run_states<'a>(
    def: &'a Value,
    input: Value,
    delivery: &'a Option<Arc<DeliveryBus>>,
    dynamodb_state: &'a Option<SharedDynamoDbState>,
    shared_state: &'a SharedStepFunctionsState,
    execution_arn: &'a str,
) -> StatesResult<'a> {
    Box::pin(async move {
        let start_at = def["StartAt"]
            .as_str()
            .ok_or_else(|| {
                (
                    "States.Runtime".to_string(),
                    "Missing StartAt in definition".to_string(),
                )
            })?
            .to_string();
        let states = def.get("States").ok_or_else(|| {
            (
                "States.Runtime".to_string(),
                "Missing States in definition".to_string(),
            )
        })?;

        let mut current_state = start_at;
        let mut effective_input = input;
        let mut iteration = 0;
        let max_iterations = 500;

        loop {
            // Guard against definitions that loop forever.
            iteration += 1;
            if iteration > max_iterations {
                return Err((
                    "States.Runtime".to_string(),
                    "Maximum number of state transitions exceeded".to_string(),
                ));
            }

            // The state definition is borrowed from `def`, so no clone is needed.
            let state_def = states.get(&current_state).ok_or_else(|| {
                (
                    "States.Runtime".to_string(),
                    format!("State '{current_state}' not found in definition"),
                )
            })?;
            let state_type = state_def["Type"].as_str().ok_or_else(|| {
                (
                    "States.Runtime".to_string(),
                    format!("State '{current_state}' missing Type field"),
                )
            })?;

            debug!(
                execution_arn = %execution_arn,
                state = %current_state,
                state_type = %state_type,
                "Executing state"
            );

            let advance = match state_type {
                "Pass" => run_pass_state(
                    &current_state,
                    state_def,
                    effective_input,
                    shared_state,
                    execution_arn,
                ),
                "Succeed" => run_succeed_state(
                    &current_state,
                    state_def,
                    effective_input,
                    shared_state,
                    execution_arn,
                ),
                "Fail" => run_fail_state(
                    &current_state,
                    state_def,
                    effective_input,
                    shared_state,
                    execution_arn,
                ),
                "Choice" => run_choice_state(
                    &current_state,
                    state_def,
                    effective_input,
                    shared_state,
                    execution_arn,
                ),
                "Wait" => {
                    run_wait_state(
                        &current_state,
                        state_def,
                        effective_input,
                        shared_state,
                        execution_arn,
                    )
                    .await
                }
                "Task" => {
                    run_task_state(
                        &current_state,
                        state_def,
                        effective_input,
                        delivery,
                        dynamodb_state,
                        shared_state,
                        execution_arn,
                    )
                    .await
                }
                "Parallel" => {
                    run_parallel_state(
                        &current_state,
                        state_def,
                        effective_input,
                        delivery,
                        dynamodb_state,
                        shared_state,
                        execution_arn,
                    )
                    .await
                }
                "Map" => {
                    run_map_state(
                        &current_state,
                        state_def,
                        effective_input,
                        delivery,
                        dynamodb_state,
                        shared_state,
                        execution_arn,
                    )
                    .await
                }
                other => Advance::Fail(
                    "States.Runtime".to_string(),
                    format!("Unsupported state type: '{other}'"),
                ),
            };

            match advance {
                Advance::Next(next, new_input) => {
                    effective_input = new_input;
                    current_state = next;
                }
                Advance::End(output) => return Ok(output),
                Advance::Fail(error, cause) => return Err((error, cause)),
            }
        }
    })
}
/// Outcome of running a single state, consumed by the loop in `run_states`.
enum Advance {
/// Continue with the named state, using the carried value as its input.
Next(String, Value),
/// Stop successfully; the carried value is the output.
End(Value),
/// Stop with an `(error, cause)` pair.
Fail(String, String),
}
fn advance_from_next(state_def: &Value, input: Value) -> Advance {
match next_state(state_def) {
NextState::Name(next) => Advance::Next(next, input),
NextState::End => Advance::End(input),
NextState::Error(msg) => Advance::Fail("States.Runtime".to_string(), msg),
}
}
fn advance_from_error(state_def: &Value, input: &Value, error: String, cause: String) -> Advance {
match apply_state_catcher(state_def, input, &error, &cause) {
Some((next, new_input)) => Advance::Next(next, new_input),
None => Advance::Fail(error, cause),
}
}
/// Runs a `Pass` state: records the entered event, computes the state's output,
/// records the exited event linked to the entered one, then follows the transition.
fn run_pass_state(
    name: &str,
    state_def: &Value,
    input: Value,
    shared_state: &SharedStepFunctionsState,
    execution_arn: &str,
) -> Advance {
    let entered_details = json!({
        "name": name,
        "input": serde_json::to_string(&input).unwrap_or_default(),
    });
    let entered_event_id = add_event(
        shared_state,
        execution_arn,
        "PassStateEntered",
        0,
        entered_details,
    );

    let output = execute_pass_state(state_def, &input);

    let exited_details = json!({
        "name": name,
        "output": serde_json::to_string(&output).unwrap_or_default(),
    });
    add_event(
        shared_state,
        execution_arn,
        "PassStateExited",
        entered_event_id,
        exited_details,
    );

    advance_from_next(state_def, output)
}
/// Runs a `Succeed` state and ends the run with the filtered input as output.
///
/// `InputPath` and then `OutputPath` are applied to the input; a path given as the
/// string `"null"` yields `{}`. The exited event is linked to the entered event, as
/// the other state runners do (it was previously recorded with previous-event id 0).
fn run_succeed_state(
    name: &str,
    state_def: &Value,
    input: Value,
    shared_state: &SharedStepFunctionsState,
    execution_arn: &str,
) -> Advance {
    let entered_event_id = add_event(
        shared_state,
        execution_arn,
        "SucceedStateEntered",
        0,
        json!({
            "name": name,
            "input": serde_json::to_string(&input).unwrap_or_default(),
        }),
    );

    let input_path = state_def["InputPath"].as_str();
    let output_path = state_def["OutputPath"].as_str();

    let processed = if input_path == Some("null") {
        json!({})
    } else {
        apply_input_path(&input, input_path)
    };
    let output = if output_path == Some("null") {
        json!({})
    } else {
        apply_output_path(&processed, output_path)
    };

    add_event(
        shared_state,
        execution_arn,
        "SucceedStateExited",
        entered_event_id,
        json!({
            "name": name,
            "output": serde_json::to_string(&output).unwrap_or_default(),
        }),
    );

    Advance::End(output)
}
/// Runs a `Fail` state: records the entered event and stops the run with the
/// state's `Error` (default `States.Fail`) and `Cause` (default empty string).
fn run_fail_state(
    name: &str,
    state_def: &Value,
    input: Value,
    shared_state: &SharedStepFunctionsState,
    execution_arn: &str,
) -> Advance {
    let error = match state_def["Error"].as_str() {
        Some(declared) => declared.to_string(),
        None => "States.Fail".to_string(),
    };
    let cause = match state_def["Cause"].as_str() {
        Some(declared) => declared.to_string(),
        None => String::new(),
    };

    let entered_details = json!({
        "name": name,
        "input": serde_json::to_string(&input).unwrap_or_default(),
    });
    add_event(
        shared_state,
        execution_arn,
        "FailStateEntered",
        0,
        entered_details,
    );

    Advance::Fail(error, cause)
}
/// Runs a `Choice` state: evaluates the rules against the `InputPath`-filtered input
/// and moves to the selected state, passing the original (unfiltered) input along.
///
/// If no target is selected the run fails with `States.NoChoiceMatched`, and no
/// exited event is recorded.
fn run_choice_state(
    name: &str,
    state_def: &Value,
    input: Value,
    shared_state: &SharedStepFunctionsState,
    execution_arn: &str,
) -> Advance {
    let entered_details = json!({
        "name": name,
        "input": serde_json::to_string(&input).unwrap_or_default(),
    });
    let entered_event_id = add_event(
        shared_state,
        execution_arn,
        "ChoiceStateEntered",
        0,
        entered_details,
    );

    let choice_input = match state_def["InputPath"].as_str() {
        Some("null") => json!({}),
        path => apply_input_path(&input, path),
    };

    let Some(target) = evaluate_choice(state_def, &choice_input) else {
        return Advance::Fail(
            "States.NoChoiceMatched".to_string(),
            format!("No choice rule matched and no Default in state '{name}'"),
        );
    };

    let exited_details = json!({
        "name": name,
        "output": serde_json::to_string(&input).unwrap_or_default(),
    });
    add_event(
        shared_state,
        execution_arn,
        "ChoiceStateExited",
        entered_event_id,
        exited_details,
    );

    Advance::Next(target, input)
}
/// Runs a `Wait` state: records the entered event, waits as configured, records the
/// exited event, and follows the transition with the input passed through unchanged.
async fn run_wait_state(
    name: &str,
    state_def: &Value,
    input: Value,
    shared_state: &SharedStepFunctionsState,
    execution_arn: &str,
) -> Advance {
    let entered_details = json!({
        "name": name,
        "input": serde_json::to_string(&input).unwrap_or_default(),
    });
    let entered_event_id = add_event(
        shared_state,
        execution_arn,
        "WaitStateEntered",
        0,
        entered_details,
    );

    execute_wait_state(state_def, &input).await;

    let exited_details = json!({
        "name": name,
        "output": serde_json::to_string(&input).unwrap_or_default(),
    });
    add_event(
        shared_state,
        execution_arn,
        "WaitStateExited",
        entered_event_id,
        exited_details,
    );

    advance_from_next(state_def, input)
}
#[allow(clippy::too_many_arguments)]
async fn run_task_state(
name: &str,
state_def: &Value,
input: Value,
delivery: &Option<Arc<DeliveryBus>>,
dynamodb_state: &Option<SharedDynamoDbState>,
shared_state: &SharedStepFunctionsState,
execution_arn: &str,
) -> Advance {
let entered_event_id = add_event(
shared_state,
execution_arn,
"TaskStateEntered",
0,
json!({
"name": name,
"input": serde_json::to_string(&input).unwrap_or_default(),
}),
);
let result = execute_task_state(
state_def,
&input,
delivery,
dynamodb_state,
shared_state,
execution_arn,
entered_event_id,
)
.await;
match result {
Ok(output) => {
add_event(
shared_state,
execution_arn,
"TaskStateExited",
entered_event_id,
json!({
"name": name,
"output": serde_json::to_string(&output).unwrap_or_default(),
}),
);
advance_from_next(state_def, output)
}
Err((error, cause)) => advance_from_error(state_def, &input, error, cause),
}
}
#[allow(clippy::too_many_arguments)]
async fn run_parallel_state(
name: &str,
state_def: &Value,
input: Value,
delivery: &Option<Arc<DeliveryBus>>,
dynamodb_state: &Option<SharedDynamoDbState>,
shared_state: &SharedStepFunctionsState,
execution_arn: &str,
) -> Advance {
let entered_event_id = add_event(
shared_state,
execution_arn,
"ParallelStateEntered",
0,
json!({
"name": name,
"input": serde_json::to_string(&input).unwrap_or_default(),
}),
);
let result = execute_parallel_state(
state_def,
&input,
delivery,
dynamodb_state,
shared_state,
execution_arn,
)
.await;
match result {
Ok(output) => {
add_event(
shared_state,
execution_arn,
"ParallelStateExited",
entered_event_id,
json!({
"name": name,
"output": serde_json::to_string(&output).unwrap_or_default(),
}),
);
advance_from_next(state_def, output)
}
Err((error, cause)) => advance_from_error(state_def, &input, error, cause),
}
}
#[allow(clippy::too_many_arguments)]
async fn run_map_state(
name: &str,
state_def: &Value,
input: Value,
delivery: &Option<Arc<DeliveryBus>>,
dynamodb_state: &Option<SharedDynamoDbState>,
shared_state: &SharedStepFunctionsState,
execution_arn: &str,
) -> Advance {
let entered_event_id = add_event(
shared_state,
execution_arn,
"MapStateEntered",
0,
json!({
"name": name,
"input": serde_json::to_string(&input).unwrap_or_default(),
}),
);
let result = execute_map_state(
state_def,
&input,
delivery,
dynamodb_state,
shared_state,
execution_arn,
)
.await;
match result {
Ok(output) => {
add_event(
shared_state,
execution_arn,
"MapStateExited",
entered_event_id,
json!({
"name": name,
"output": serde_json::to_string(&output).unwrap_or_default(),
}),
);
advance_from_next(state_def, output)
}
Err((error, cause)) => advance_from_error(state_def, &input, error, cause),
}
}
/// Performs the delay of a `Wait` state.
///
/// The first field present, in the order `Seconds`, `SecondsPath`, `Timestamp`,
/// `TimestampPath`, decides the wait. A field that is present but unusable (a
/// path that does not resolve to the expected type, or an unparseable or past
/// timestamp) results in no wait. If none of the fields is present a warning is
/// logged and the function returns immediately.
async fn execute_wait_state(state_def: &Value, input: &Value) {
    /// Sleeps until the RFC 3339 instant in `ts`; does nothing if it cannot be
    /// parsed or is not in the future.
    async fn sleep_until_rfc3339(ts: &str) {
        let Ok(target) = chrono::DateTime::parse_from_rfc3339(ts) else {
            return;
        };
        let now = Utc::now();
        let target_utc = target.with_timezone(&chrono::Utc);
        if target_utc > now {
            let remaining = (target_utc - now).to_std().unwrap_or_default();
            tokio::time::sleep(remaining).await;
        }
    }

    if let Some(seconds) = state_def["Seconds"].as_u64() {
        tokio::time::sleep(tokio::time::Duration::from_secs(seconds)).await;
    } else if let Some(path) = state_def["SecondsPath"].as_str() {
        let resolved = crate::io_processing::resolve_path(input, path);
        if let Some(seconds) = resolved.as_u64() {
            tokio::time::sleep(tokio::time::Duration::from_secs(seconds)).await;
        }
    } else if let Some(ts_str) = state_def["Timestamp"].as_str() {
        sleep_until_rfc3339(ts_str).await;
    } else if let Some(path) = state_def["TimestampPath"].as_str() {
        let resolved = crate::io_processing::resolve_path(input, path);
        if let Some(ts_str) = resolved.as_str() {
            sleep_until_rfc3339(ts_str).await;
        }
    } else {
        warn!(
            "Wait state has no valid Seconds, SecondsPath, Timestamp, or TimestampPath — skipping wait"
        );
    }
}
fn execute_pass_state(state_def: &Value, input: &Value) -> Value {
let input_path = state_def["InputPath"].as_str();
let result_path = state_def["ResultPath"].as_str();
let output_path = state_def["OutputPath"].as_str();
let effective_input = if input_path == Some("null") {
json!({})
} else {
apply_input_path(input, input_path)
};
let result = if let Some(r) = state_def.get("Result") {
r.clone()
} else {
effective_input.clone()
};
let after_result = if result_path == Some("null") {
input.clone()
} else {
apply_result_path(input, &result, result_path)
};
if output_path == Some("null") {
json!({})
} else {
apply_output_path(&after_result, output_path)
}
}
/// Executes the work of a `Task` state, including retries, and returns the state's
/// output or the final `(error, cause)` pair.
///
/// Input processing: `InputPath` (the string `"null"` yields `{}`), then
/// `Parameters` if present. Each attempt records `TaskScheduled` and `TaskStarted`,
/// invokes the resource, and records `TaskSucceeded` or `TaskFailed`; all of these
/// events use `entered_event_id` as their previous-event id. On success the result
/// goes through `ResultSelector`, `ResultPath` (merged into the raw `input`, not the
/// filtered one) and `OutputPath`. On failure `should_retry` decides whether another
/// attempt is made.
async fn execute_task_state(
state_def: &Value,
input: &Value,
delivery: &Option<Arc<DeliveryBus>>,
dynamodb_state: &Option<SharedDynamoDbState>,
shared_state: &SharedStepFunctionsState,
execution_arn: &str,
entered_event_id: i64,
) -> Result<Value, (String, String)> {
// A missing Resource becomes "" and is later rejected by `invoke_resource` as unsupported.
let resource = state_def["Resource"].as_str().unwrap_or("").to_string();
let input_path = state_def["InputPath"].as_str();
let result_path = state_def["ResultPath"].as_str();
let output_path = state_def["OutputPath"].as_str();
let effective_input = if input_path == Some("null") {
json!({})
} else {
apply_input_path(input, input_path)
};
let task_input = if let Some(params) = state_def.get("Parameters") {
apply_parameters(params, &effective_input)
} else {
effective_input
};
let retriers = state_def["Retry"].as_array().cloned().unwrap_or_default();
let timeout_seconds = state_def["TimeoutSeconds"].as_u64();
let heartbeat_seconds = state_def["HeartbeatSeconds"].as_u64();
// Number of retries performed so far; passed to `should_retry`.
let mut attempt = 0u32;
loop {
// The region reported in the event is hard-coded.
add_event(
shared_state,
execution_arn,
"TaskScheduled",
entered_event_id,
json!({
"resource": resource,
"region": "us-east-1",
"parameters": serde_json::to_string(&task_input).unwrap_or_default(),
}),
);
add_event(
shared_state,
execution_arn,
"TaskStarted",
entered_event_id,
json!({ "resource": resource }),
);
let invoke_result = invoke_resource(
&resource,
&task_input,
delivery,
dynamodb_state,
timeout_seconds,
heartbeat_seconds,
shared_state,
)
.await;
match invoke_result {
Ok(result) => {
add_event(
shared_state,
execution_arn,
"TaskSucceeded",
entered_event_id,
json!({
"resource": resource,
"output": serde_json::to_string(&result).unwrap_or_default(),
}),
);
let selected = if let Some(selector) = state_def.get("ResultSelector") {
apply_parameters(selector, &result)
} else {
result
};
// ResultPath "null" discards the task result and keeps the raw state input.
let after_result = if result_path == Some("null") {
input.clone()
} else {
apply_result_path(input, &selected, result_path)
};
let output = if output_path == Some("null") {
json!({})
} else {
apply_output_path(&after_result, output_path)
};
return Ok(output);
}
Err((error, cause)) => {
add_event(
shared_state,
execution_arn,
"TaskFailed",
entered_event_id,
json!({ "error": error, "cause": cause }),
);
if let Some(delay_ms) = should_retry(&retriers, &error, attempt) {
attempt += 1;
// The retry delay is capped at 5000 ms regardless of what the retrier asks for.
let actual_delay = delay_ms.min(5000);
tokio::time::sleep(tokio::time::Duration::from_millis(actual_delay)).await;
continue;
}
return Err((error, cause));
}
}
}
}
/// Executes all branches of a `Parallel` state and returns the state's output.
///
/// Every branch is spawned as its own task with a copy of the `InputPath`-filtered
/// input. Results are gathered in branch order into an array, which then goes
/// through `ResultSelector`, `ResultPath` (merged into the raw `input`) and
/// `OutputPath`. A missing or empty `Branches` array, a branch failure, or a task
/// join failure produces an error; on the first failure the remaining branch
/// handles are dropped without being awaited.
async fn execute_parallel_state(
    state_def: &Value,
    input: &Value,
    delivery: &Option<Arc<DeliveryBus>>,
    dynamodb_state: &Option<SharedDynamoDbState>,
    shared_state: &SharedStepFunctionsState,
    execution_arn: &str,
) -> Result<Value, (String, String)> {
    let result_path = state_def["ResultPath"].as_str();
    let output_path = state_def["OutputPath"].as_str();
    let shared_branch_input = match state_def["InputPath"].as_str() {
        Some("null") => json!({}),
        path => apply_input_path(input, path),
    };

    let branches = match state_def["Branches"].as_array() {
        Some(list) if !list.is_empty() => list,
        _ => {
            return Err((
                "States.Runtime".to_string(),
                "Parallel state has no Branches".to_string(),
            ))
        }
    };

    // Spawn every branch up front so they run concurrently.
    let handles: Vec<_> = branches
        .iter()
        .map(|branch_def| {
            let branch = branch_def.clone();
            let branch_input = shared_branch_input.clone();
            let delivery = delivery.clone();
            let ddb = dynamodb_state.clone();
            let state = shared_state.clone();
            let arn = execution_arn.to_string();
            tokio::spawn(async move {
                run_states(&branch, branch_input, &delivery, &ddb, &state, &arn).await
            })
        })
        .collect();

    let mut branch_outputs = Vec::with_capacity(handles.len());
    for handle in handles {
        let joined = handle.await.map_err(|e| {
            (
                "States.Runtime".to_string(),
                format!("Branch execution panicked: {e}"),
            )
        })?;
        branch_outputs.push(joined?);
    }

    let combined = Value::Array(branch_outputs);
    let selected = match state_def.get("ResultSelector") {
        Some(selector) => apply_parameters(selector, &combined),
        None => combined,
    };
    let merged = match result_path {
        Some("null") => input.clone(),
        path => apply_result_path(input, &selected, path),
    };
    let output = match output_path {
        Some("null") => json!({}),
        path => apply_output_path(&merged, path),
    };
    Ok(output)
}
/// Executes the iterations of a `Map` state and returns the state's output.
///
/// Items are read from `ItemsPath` (default `"$"`) of the `InputPath`-filtered input;
/// a value that is not an array is treated as an empty list. Each item is run through
/// the `ItemProcessor` (or legacy `Iterator`) definition on its own spawned task,
/// with concurrency bounded by a semaphore. Outputs are collected in item order and
/// then go through `ResultSelector`, `ResultPath` (merged into the raw `input`) and
/// `OutputPath`. The first failed iteration aborts the state with that error.
async fn execute_map_state(
state_def: &Value,
input: &Value,
delivery: &Option<Arc<DeliveryBus>>,
dynamodb_state: &Option<SharedDynamoDbState>,
shared_state: &SharedStepFunctionsState,
execution_arn: &str,
) -> Result<Value, (String, String)> {
let input_path = state_def["InputPath"].as_str();
let result_path = state_def["ResultPath"].as_str();
let output_path = state_def["OutputPath"].as_str();
let effective_input = if input_path == Some("null") {
json!({})
} else {
apply_input_path(input, input_path)
};
let items_path = state_def["ItemsPath"].as_str().unwrap_or("$");
let items_value = crate::io_processing::resolve_path(&effective_input, items_path);
// A non-array value silently becomes an empty item list.
let items = items_value.as_array().cloned().unwrap_or_default();
// `ItemProcessor` takes precedence over `Iterator` when both are present.
let iterator_def = state_def
.get("ItemProcessor")
.or_else(|| state_def.get("Iterator"))
.cloned()
.ok_or_else(|| {
(
"States.Runtime".to_string(),
"Map state has no ItemProcessor or Iterator".to_string(),
)
})?;
// MaxConcurrency of 0 (or absent / not an unsigned integer) falls back to 40 permits.
let max_concurrency = state_def["MaxConcurrency"].as_u64().unwrap_or(0);
let effective_concurrency = if max_concurrency == 0 {
40
} else {
max_concurrency as usize
};
let semaphore = Arc::new(tokio::sync::Semaphore::new(effective_concurrency));
let mut handles = Vec::new();
for (index, item) in items.into_iter().enumerate() {
let iter_def = iterator_def.clone();
let delivery = delivery.clone();
let ddb = dynamodb_state.clone();
let state = shared_state.clone();
let arn = execution_arn.to_string();
let sem = semaphore.clone();
// With an ItemSelector, the selector is evaluated against an object holding the
// item under "value" and its position under "index".
let item_input = if let Some(selector) = state_def.get("ItemSelector") {
let mut ctx = serde_json::Map::new();
ctx.insert("value".to_string(), item.clone());
ctx.insert("index".to_string(), json!(index));
apply_parameters(selector, &Value::Object(ctx))
} else {
item
};
// Recorded when the task is spawned, i.e. before it has acquired a permit.
add_event(
shared_state,
execution_arn,
"MapIterationStarted",
0,
json!({ "index": index }),
);
handles.push(tokio::spawn(async move {
// The permit is held for the whole iteration and released when `_permit` drops.
let _permit = sem
.acquire()
.await
.map_err(|_| ("States.Runtime".to_string(), "Semaphore closed".to_string()))?;
let result = run_states(&iter_def, item_input, &delivery, &ddb, &state, &arn).await;
Ok::<(usize, Result<Value, (String, String)>), (String, String)>((index, result))
}));
}
let mut results: Vec<(usize, Value)> = Vec::with_capacity(handles.len());
// Handles are awaited in spawn order; an early return drops the remaining handles
// without awaiting them.
for handle in handles {
let (index, result) = handle.await.map_err(|e| {
(
"States.Runtime".to_string(),
format!("Map iteration panicked: {e}"),
)
})??;
match result {
Ok(output) => {
add_event(
shared_state,
execution_arn,
"MapIterationSucceeded",
0,
json!({ "index": index }),
);
results.push((index, output));
}
Err((error, cause)) => {
// The failure event carries the error name only; the cause is returned to the caller.
add_event(
shared_state,
execution_arn,
"MapIterationFailed",
0,
json!({ "index": index, "error": error }),
);
return Err((error, cause));
}
}
}
results.sort_by_key(|(i, _)| *i);
let map_output = Value::Array(results.into_iter().map(|(_, v)| v).collect());
let selected = if let Some(selector) = state_def.get("ResultSelector") {
apply_parameters(selector, &map_output)
} else {
map_output
};
// ResultPath "null" discards the iteration results and keeps the raw state input.
let after_result = if result_path == Some("null") {
input.clone()
} else {
apply_result_path(input, &selected, result_path)
};
let output = if output_path == Some("null") {
json!({})
} else {
apply_output_path(&after_result, output_path)
};
Ok(output)
}
/// Dispatches a Task state's `Resource` to the matching integration.
///
/// Checked in order: activity ARNs, Lambda function ARNs, then the
/// `arn:aws:states:::` service integrations (Lambda invoke, SQS sendMessage,
/// SNS publish, EventBridge putEvents, DynamoDB get/put/delete/updateItem).
/// Anything else fails with `States.TaskFailed`.
#[allow(clippy::too_many_arguments)]
async fn invoke_resource(
    resource: &str,
    input: &Value,
    delivery: &Option<Arc<DeliveryBus>>,
    dynamodb_state: &Option<SharedDynamoDbState>,
    timeout_seconds: Option<u64>,
    heartbeat_seconds: Option<u64>,
    shared_state: &SharedStepFunctionsState,
) -> Result<Value, (String, String)> {
    let unsupported = || {
        Err((
            "States.TaskFailed".to_string(),
            format!("Unsupported resource: {resource}"),
        ))
    };

    let is_activity_arn = resource.contains(":states:") && resource.contains(":activity:");
    if is_activity_arn {
        return invoke_activity(
            resource,
            input,
            shared_state,
            timeout_seconds,
            heartbeat_seconds,
        )
        .await;
    }

    let is_lambda_function_arn = resource.contains(":lambda:") && resource.contains(":function:");
    if is_lambda_function_arn {
        return invoke_lambda_direct(resource, input, delivery, timeout_seconds).await;
    }

    // Everything below is an optimized service integration of the form
    // `arn:aws:states:::<service>:<action>`.
    let Some(integration) = resource.strip_prefix("arn:aws:states:::") else {
        return unsupported();
    };

    if integration.starts_with("lambda:invoke") {
        let function_name = input["FunctionName"].as_str().unwrap_or("");
        // Without an explicit Payload the whole task input is sent to the function.
        let payload = match input.get("Payload") {
            Some(explicit) => explicit.clone(),
            None => input.clone(),
        };
        invoke_lambda_direct(function_name, &payload, delivery, timeout_seconds).await
    } else if integration.starts_with("sqs:sendMessage") {
        invoke_sqs_send_message(input, delivery)
    } else if integration.starts_with("sns:publish") {
        invoke_sns_publish(input, delivery)
    } else if integration.starts_with("events:putEvents") {
        invoke_eventbridge_put_events(input, delivery)
    } else if integration.starts_with("dynamodb:getItem") {
        invoke_dynamodb_get_item(input, dynamodb_state)
    } else if integration.starts_with("dynamodb:putItem") {
        invoke_dynamodb_put_item(input, dynamodb_state)
    } else if integration.starts_with("dynamodb:deleteItem") {
        invoke_dynamodb_delete_item(input, dynamodb_state)
    } else if integration.starts_with("dynamodb:updateItem") {
        invoke_dynamodb_update_item(input, dynamodb_state)
    } else {
        unsupported()
    }
}
fn invoke_sqs_send_message(
input: &Value,
delivery: &Option<Arc<DeliveryBus>>,
) -> Result<Value, (String, String)> {
let delivery = delivery.as_ref().ok_or_else(|| {
(
"States.TaskFailed".to_string(),
"No delivery bus configured for SQS".to_string(),
)
})?;
let queue_url = input["QueueUrl"].as_str().ok_or_else(|| {
(
"States.TaskFailed".to_string(),
"Missing QueueUrl in SQS sendMessage parameters".to_string(),
)
})?;
let message_body = input["MessageBody"]
.as_str()
.map(|s| s.to_string())
.unwrap_or_else(|| {
serde_json::to_string(&input["MessageBody"]).unwrap_or_default()
});
let queue_arn = queue_url_to_arn(queue_url);
delivery.send_to_sqs(&queue_arn, &message_body, &HashMap::new());
Ok(json!({
"MessageId": uuid::Uuid::new_v4().to_string(),
"MD5OfMessageBody": md5_hex(&message_body),
}))
}
fn invoke_sns_publish(
input: &Value,
delivery: &Option<Arc<DeliveryBus>>,
) -> Result<Value, (String, String)> {
let delivery = delivery.as_ref().ok_or_else(|| {
(
"States.TaskFailed".to_string(),
"No delivery bus configured for SNS".to_string(),
)
})?;
let topic_arn = input["TopicArn"].as_str().ok_or_else(|| {
(
"States.TaskFailed".to_string(),
"Missing TopicArn in SNS publish parameters".to_string(),
)
})?;
let message = input["Message"]
.as_str()
.map(|s| s.to_string())
.unwrap_or_else(|| serde_json::to_string(&input["Message"]).unwrap_or_default());
let subject = input["Subject"].as_str();
delivery.publish_to_sns(topic_arn, &message, subject);
Ok(json!({
"MessageId": uuid::Uuid::new_v4().to_string(),
}))
}
fn invoke_eventbridge_put_events(
input: &Value,
delivery: &Option<Arc<DeliveryBus>>,
) -> Result<Value, (String, String)> {
let delivery = delivery.as_ref().ok_or_else(|| {
(
"States.TaskFailed".to_string(),
"No delivery bus configured for EventBridge".to_string(),
)
})?;
let entries = input["Entries"]
.as_array()
.ok_or_else(|| {
(
"States.TaskFailed".to_string(),
"Missing Entries in EventBridge putEvents parameters".to_string(),
)
})?
.clone();
let mut event_ids = Vec::new();
for entry in &entries {
let source = entry["Source"].as_str().unwrap_or("aws.stepfunctions");
let detail_type = entry["DetailType"].as_str().unwrap_or("StepFunctionsEvent");
let detail = entry["Detail"]
.as_str()
.map(|s| s.to_string())
.unwrap_or_else(|| serde_json::to_string(&entry["Detail"]).unwrap_or("{}".to_string()));
let bus_name = entry["EventBusName"].as_str().unwrap_or("default");
delivery.put_event_to_eventbridge(source, detail_type, &detail, bus_name);
event_ids.push(uuid::Uuid::new_v4().to_string());
}
Ok(json!({
"Entries": event_ids.iter().map(|id| json!({"EventId": id})).collect::<Vec<_>>(),
"FailedEntryCount": 0,
}))
}
/// Handles the DynamoDB `getItem` integration: looks up the item matching `Key` in
/// `TableName` and returns `{"Item": ...}`, or `{}` when no item matches.
///
/// Fails with `States.TaskFailed` when DynamoDB state is not configured, when
/// `TableName` or `Key` is missing, or when the table does not exist.
fn invoke_dynamodb_get_item(
    input: &Value,
    dynamodb_state: &Option<SharedDynamoDbState>,
) -> Result<Value, (String, String)> {
    let Some(ddb) = dynamodb_state.as_ref() else {
        return Err((
            "States.TaskFailed".to_string(),
            "No DynamoDB state configured".to_string(),
        ));
    };
    let Some(table_name) = input["TableName"].as_str() else {
        return Err((
            "States.TaskFailed".to_string(),
            "Missing TableName in DynamoDB getItem parameters".to_string(),
        ));
    };
    let Some(key) = input.get("Key").and_then(|k| k.as_object()) else {
        return Err((
            "States.TaskFailed".to_string(),
            "Missing Key in DynamoDB getItem parameters".to_string(),
        ));
    };

    let mut key_map: HashMap<String, Value> = HashMap::with_capacity(key.len());
    for (attr, value) in key {
        key_map.insert(attr.clone(), value.clone());
    }

    // The read guard must stay alive while the table is borrowed.
    let guard = ddb.read();
    let state = guard.default_ref();
    let Some(table) = state.tables.get(table_name) else {
        return Err((
            "States.TaskFailed".to_string(),
            format!("Table '{table_name}' not found"),
        ));
    };

    let Some(idx) = table.find_item_index(&key_map) else {
        return Ok(json!({}));
    };
    let item_value: serde_json::Map<String, Value> = table.items[idx].clone().into_iter().collect();
    Ok(json!({ "Item": item_value }))
}
/// Handles the DynamoDB `putItem` integration: stores `Item` in `TableName`,
/// replacing the existing item that `find_item_index` matches, or appending it.
/// Always returns `{}` on success.
///
/// Fails with `States.TaskFailed` when DynamoDB state is not configured, when
/// `TableName` or `Item` is missing, or when the table does not exist.
fn invoke_dynamodb_put_item(
    input: &Value,
    dynamodb_state: &Option<SharedDynamoDbState>,
) -> Result<Value, (String, String)> {
    let Some(ddb) = dynamodb_state.as_ref() else {
        return Err((
            "States.TaskFailed".to_string(),
            "No DynamoDB state configured".to_string(),
        ));
    };
    let Some(table_name) = input["TableName"].as_str() else {
        return Err((
            "States.TaskFailed".to_string(),
            "Missing TableName in DynamoDB putItem parameters".to_string(),
        ));
    };
    let Some(item) = input.get("Item").and_then(|i| i.as_object()) else {
        return Err((
            "States.TaskFailed".to_string(),
            "Missing Item in DynamoDB putItem parameters".to_string(),
        ));
    };

    let mut item_map: HashMap<String, Value> = HashMap::with_capacity(item.len());
    for (attr, value) in item {
        item_map.insert(attr.clone(), value.clone());
    }

    // The write guard must stay alive while the table is borrowed.
    let mut guard = ddb.write();
    let state = guard.default_mut();
    let Some(table) = state.tables.get_mut(table_name) else {
        return Err((
            "States.TaskFailed".to_string(),
            format!("Table '{table_name}' not found"),
        ));
    };

    match table.find_item_index(&item_map) {
        Some(idx) => table.items[idx] = item_map,
        None => table.items.push(item_map),
    }
    Ok(json!({}))
}
/// Handles the DynamoDB `deleteItem` integration: removes the item matching `Key`
/// from `TableName` if one exists. Always returns `{}` on success.
///
/// Fails with `States.TaskFailed` when DynamoDB state is not configured, when
/// `TableName` or `Key` is missing, or when the table does not exist.
fn invoke_dynamodb_delete_item(
    input: &Value,
    dynamodb_state: &Option<SharedDynamoDbState>,
) -> Result<Value, (String, String)> {
    let Some(ddb) = dynamodb_state.as_ref() else {
        return Err((
            "States.TaskFailed".to_string(),
            "No DynamoDB state configured".to_string(),
        ));
    };
    let Some(table_name) = input["TableName"].as_str() else {
        return Err((
            "States.TaskFailed".to_string(),
            "Missing TableName in DynamoDB deleteItem parameters".to_string(),
        ));
    };
    let Some(key) = input.get("Key").and_then(|k| k.as_object()) else {
        return Err((
            "States.TaskFailed".to_string(),
            "Missing Key in DynamoDB deleteItem parameters".to_string(),
        ));
    };

    let mut key_map: HashMap<String, Value> = HashMap::with_capacity(key.len());
    for (attr, value) in key {
        key_map.insert(attr.clone(), value.clone());
    }

    // The write guard must stay alive while the table is borrowed.
    let mut guard = ddb.write();
    let state = guard.default_mut();
    let Some(table) = state.tables.get_mut(table_name) else {
        return Err((
            "States.TaskFailed".to_string(),
            format!("Table '{table_name}' not found"),
        ));
    };

    // Deleting a non-existent item is not an error.
    if let Some(idx) = table.find_item_index(&key_map) {
        table.items.remove(idx);
    }
    Ok(json!({}))
}
/// Handles the DynamoDB `updateItem` integration: applies `UpdateExpression` to the
/// item matching `Key` in `TableName`, creating the item from the key attributes
/// when it does not exist. Without an `UpdateExpression` nothing is changed.
/// Always returns `{}` on success.
///
/// Fails with `States.TaskFailed` when DynamoDB state is not configured, when
/// `TableName` or `Key` is missing, or when the table does not exist.
fn invoke_dynamodb_update_item(
    input: &Value,
    dynamodb_state: &Option<SharedDynamoDbState>,
) -> Result<Value, (String, String)> {
    let Some(ddb) = dynamodb_state.as_ref() else {
        return Err((
            "States.TaskFailed".to_string(),
            "No DynamoDB state configured".to_string(),
        ));
    };
    let Some(table_name) = input["TableName"].as_str() else {
        return Err((
            "States.TaskFailed".to_string(),
            "Missing TableName in DynamoDB updateItem parameters".to_string(),
        ));
    };
    let Some(key) = input.get("Key").and_then(|k| k.as_object()) else {
        return Err((
            "States.TaskFailed".to_string(),
            "Missing Key in DynamoDB updateItem parameters".to_string(),
        ));
    };

    let mut key_map: HashMap<String, Value> = HashMap::with_capacity(key.len());
    for (attr, value) in key {
        key_map.insert(attr.clone(), value.clone());
    }

    // The write guard must stay alive while the table is borrowed.
    let mut guard = ddb.write();
    let state = guard.default_mut();
    let Some(table) = state.tables.get_mut(table_name) else {
        return Err((
            "States.TaskFailed".to_string(),
            format!("Table '{table_name}' not found"),
        ));
    };

    let Some(update_expr) = input["UpdateExpression"].as_str() else {
        return Ok(json!({}));
    };

    let object_or_empty = |field: &str| {
        input
            .get(field)
            .and_then(|v| v.as_object())
            .cloned()
            .unwrap_or_default()
    };
    let attr_values = object_or_empty("ExpressionAttributeValues");
    let attr_names = object_or_empty("ExpressionAttributeNames");

    match table.find_item_index(&key_map) {
        Some(idx) => {
            apply_update_expression(
                &mut table.items[idx],
                update_expr,
                &attr_values,
                &attr_names,
            );
        }
        None => {
            // Upsert: start the new item from the key attributes.
            let mut new_item = key_map;
            apply_update_expression(&mut new_item, update_expr, &attr_values, &attr_names);
            table.items.push(new_item);
        }
    }
    Ok(json!({}))
}
/// Applies an update expression to `item` in place by splitting it into its
/// SET / REMOVE / ADD / DELETE clauses and running each clause's handler in order.
fn apply_update_expression(
    item: &mut HashMap<String, Value>,
    expr: &str,
    attr_values: &serde_json::Map<String, Value>,
    attr_names: &serde_json::Map<String, Value>,
) {
    split_update_clauses(expr)
        .into_iter()
        .for_each(|(kind, body)| match kind {
            UpdateClause::Set => apply_set(item, &body, attr_values, attr_names),
            UpdateClause::Remove => apply_remove(item, &body, attr_names),
            UpdateClause::Add => apply_add(item, &body, attr_values, attr_names),
            UpdateClause::Delete => apply_delete(item, &body, attr_values, attr_names),
        });
}
/// The four clause kinds of an update expression.
#[derive(Clone, Copy)]
enum UpdateClause {
    Set,
    Remove,
    Add,
    Delete,
}

/// Splits an update expression into `(clause, body)` pairs in source order.
///
/// The expression is tokenized on whitespace. A token equal (ASCII
/// case-insensitively) to `SET`, `REMOVE`, `ADD` or `DELETE` starts a new clause;
/// every other token is appended to the current clause's body, joined by single
/// spaces. Tokens before the first keyword are dropped.
fn split_update_clauses(expr: &str) -> Vec<(UpdateClause, String)> {
    const KEYWORDS: [(&str, UpdateClause); 4] = [
        ("SET", UpdateClause::Set),
        ("REMOVE", UpdateClause::Remove),
        ("ADD", UpdateClause::Add),
        ("DELETE", UpdateClause::Delete),
    ];

    let mut clauses: Vec<(UpdateClause, String)> = Vec::new();
    for word in expr.split_whitespace() {
        let keyword = KEYWORDS
            .iter()
            .find(|(text, _)| word.eq_ignore_ascii_case(text))
            .map(|(_, kind)| *kind);

        match (keyword, clauses.last_mut()) {
            // A keyword opens a fresh clause with an empty body.
            (Some(kind), _) => clauses.push((kind, String::new())),
            // Any other token extends the body of the clause currently open.
            (None, Some((_, body))) => {
                if !body.is_empty() {
                    body.push(' ');
                }
                body.push_str(word);
            }
            // Tokens ahead of the first keyword are ignored.
            (None, None) => {}
        }
    }
    clauses
}
fn resolve_attr_name(token: &str, attr_names: &serde_json::Map<String, Value>) -> String {
if token.starts_with('#') {
attr_names
.get(token)
.and_then(|v| v.as_str())
.unwrap_or(token)
.to_string()
} else {
token.to_string()
}
}
/// Applies the body of a SET clause to `item`.
///
/// The body is split into assignments at top-level commas. Each `target = expr`
/// assignment stores the evaluated expression under the resolved target name.
/// Assignments without `=` and expressions that cannot be evaluated are skipped.
fn apply_set(
    item: &mut HashMap<String, Value>,
    body: &str,
    attr_values: &serde_json::Map<String, Value>,
    attr_names: &serde_json::Map<String, Value>,
) {
    for assignment in split_top_commas(body) {
        if let Some((target, expression)) = assignment.split_once('=') {
            let attr_name = resolve_attr_name(target.trim(), attr_names);
            let evaluated =
                evaluate_set_rhs(expression.trim(), &attr_name, item, attr_values, attr_names);
            if let Some(new_value) = evaluated {
                item.insert(attr_name, new_value);
            }
        }
    }
}
/// Evaluates the right-hand side of a single `SET` assignment.
///
/// Supported forms:
/// * `operand`
/// * `operand + operand` / `operand - operand`
///
/// where each operand is a `:value` placeholder, an attribute path (bare or
/// `#alias`), or `if_not_exists(path, operand)`. This makes the common
/// atomic-counter idiom `if_not_exists(count, :zero) + :inc` work, and allows
/// copying one attribute to another with a bare path (`SET a = b`).
///
/// Returns `None` when the expression cannot be evaluated (unknown
/// placeholder, missing attribute, non-numeric arithmetic operand); the caller
/// then leaves the item unchanged for that assignment.
///
/// `attr_name` (the assignment target) is accepted for signature
/// compatibility but is not needed to evaluate the expression.
fn evaluate_set_rhs(
    rhs: &str,
    attr_name: &str,
    item: &HashMap<String, Value>,
    attr_values: &serde_json::Map<String, Value>,
    attr_names: &serde_json::Map<String, Value>,
) -> Option<Value> {
    let _ = attr_name;
    let rhs = rhs.trim();
    // Look for a top-level operator first so that function calls can appear
    // as operands; operators nested inside parentheses are ignored by
    // `split_top_op`.
    for op in ['+', '-'] {
        if let Some((left, right)) = split_top_op(rhs, op) {
            let left_val = evaluate_set_operand(left.trim(), item, attr_values, attr_names);
            let right_val = evaluate_set_operand(right.trim(), item, attr_values, attr_names);
            return arithmetic(left_val.as_ref(), op, right_val.as_ref());
        }
    }
    evaluate_set_operand(rhs, item, attr_values, attr_names)
}

/// Evaluates one `SET` operand: `if_not_exists(path, operand)`, a `:value`
/// placeholder, or an attribute path read from the current item.
fn evaluate_set_operand(
    operand: &str,
    item: &HashMap<String, Value>,
    attr_values: &serde_json::Map<String, Value>,
    attr_names: &serde_json::Map<String, Value>,
) -> Option<Value> {
    if let Some(args) = operand
        .strip_prefix("if_not_exists(")
        .and_then(|s| s.strip_suffix(')'))
    {
        // Malformed call (no comma) evaluates to nothing.
        let (path, fallback) = args.split_once(',')?;
        let path = resolve_attr_name(path.trim(), attr_names);
        return match item.get(&path) {
            Some(existing) => Some(existing.clone()),
            None => evaluate_set_operand(fallback.trim(), item, attr_values, attr_names),
        };
    }
    if operand.starts_with(':') {
        return resolve_value(operand, attr_values);
    }
    if operand.is_empty() {
        return None;
    }
    // Anything else is treated as an attribute path (bare or `#alias`).
    let name = resolve_attr_name(operand, attr_names);
    item.get(&name).cloned()
}
/// Adds or subtracts two DynamoDB number values (`{"N": "<digits>"}`).
///
/// When both operands are integers the result is computed exactly with
/// `i128`, so large counters do not lose precision the way they would through
/// `f64` (which is exact only up to 2^53). Non-integer operands, or an `i128`
/// overflow, fall back to `f64` arithmetic.
///
/// Returns `None` if either operand is absent or not a parseable number, or
/// if `op` is neither `'+'` nor `'-'`.
fn arithmetic(left: Option<&Value>, op: char, right: Option<&Value>) -> Option<Value> {
    let (left, right) = (left?, right?);

    // Exact integer path.
    let as_integer = |v: &Value| -> Option<i128> { v.get("N")?.as_str()?.parse().ok() };
    if let (Some(l), Some(r)) = (as_integer(left), as_integer(right)) {
        let exact = match op {
            '+' => l.checked_add(r),
            '-' => l.checked_sub(r),
            _ => return None,
        };
        if let Some(n) = exact {
            return Some(json!({ "N": n.to_string() }));
        }
    }

    // Floating-point fallback for decimals / exponents / overflow.
    let lf = number_from_dynamo(left)?;
    let rf = number_from_dynamo(right)?;
    let out = match op {
        '+' => lf + rf,
        '-' => lf - rf,
        _ => return None,
    };
    Some(json!({ "N": format_number(out) }))
}
/// Reads the `"N"` member of a DynamoDB attribute value and parses it as
/// `f64`. Returns `None` when the member is missing, not a string, or not a
/// valid float literal.
fn number_from_dynamo(v: &Value) -> Option<f64> {
    let digits = v.get("N").and_then(|n| n.as_str())?;
    digits.parse::<f64>().ok()
}
/// Renders a float as a DynamoDB number string.
///
/// Finite whole values inside the `i64` range are printed without a
/// fractional part (`13`, not `13.0`); everything else uses the default `f64`
/// display form.
fn format_number(n: f64) -> String {
    let is_whole = n.is_finite() && n.fract() == 0.0;
    let fits_i64 = n >= i64::MIN as f64 && n < i64::MAX as f64;
    match (is_whole, fits_i64) {
        (true, true) => (n as i64).to_string(),
        _ => n.to_string(),
    }
}
/// Looks up an expression attribute value placeholder (e.g. `:inc`) and
/// returns a copy of its value, or `None` if the placeholder is unknown.
fn resolve_value(token: &str, attr_values: &serde_json::Map<String, Value>) -> Option<Value> {
    let found = attr_values.get(token)?;
    Some(found.clone())
}
/// Applies the body of a `REMOVE` clause: every comma-separated path is
/// resolved (through `attr_names` for `#alias` tokens) and deleted from
/// `item`. Paths that do not exist are ignored.
fn apply_remove(
    item: &mut HashMap<String, Value>,
    body: &str,
    attr_names: &serde_json::Map<String, Value>,
) {
    split_top_commas(body)
        .iter()
        .map(|path| resolve_attr_name(path.trim(), attr_names))
        .for_each(|name| {
            item.remove(&name);
        });
}
fn apply_add(
item: &mut HashMap<String, Value>,
body: &str,
attr_values: &serde_json::Map<String, Value>,
attr_names: &serde_json::Map<String, Value>,
) {
for clause in split_top_commas(body) {
let mut parts = clause.split_whitespace();
let Some(path) = parts.next() else { continue };
let Some(value_ref) = parts.next() else {
continue;
};
let attr_name = resolve_attr_name(path, attr_names);
let Some(delta) = resolve_value(value_ref, attr_values) else {
continue;
};
let current = item.get(&attr_name).cloned();
let next = match (current.as_ref(), &delta) {
(None, _) => delta.clone(),
(Some(cur), _) => arithmetic(Some(cur), '+', Some(&delta)).unwrap_or(delta.clone()),
};
item.insert(attr_name, next);
}
}
/// Applies the body of a `DELETE` clause (`path :value, ...`) to `item`.
///
/// For the first set kind (`SS`, `NS`, `BS`) present as an array in both the
/// current attribute and the operand, every string element listed in the
/// operand is removed from the attribute's set. If the set becomes empty the
/// attribute itself is removed. Clauses with missing tokens, an unknown
/// placeholder, or a missing attribute are skipped.
fn apply_delete(
    item: &mut HashMap<String, Value>,
    body: &str,
    attr_values: &serde_json::Map<String, Value>,
    attr_names: &serde_json::Map<String, Value>,
) {
    for clause in split_top_commas(body) {
        let mut tokens = clause.split_whitespace();
        let (Some(path), Some(value_ref)) = (tokens.next(), tokens.next()) else {
            continue;
        };
        let attr_name = resolve_attr_name(path, attr_names);
        let Some(elements) = resolve_value(value_ref, attr_values) else {
            continue;
        };
        let Some(current) = item.get_mut(&attr_name) else {
            continue;
        };

        let mut emptied = false;
        for set_kind in ["SS", "NS", "BS"] {
            let Some(cur_arr) = current.get_mut(set_kind).and_then(|v| v.as_array_mut()) else {
                continue;
            };
            let Some(rem_arr) = elements.get(set_kind).and_then(|v| v.as_array()) else {
                continue;
            };
            let doomed: std::collections::HashSet<&str> =
                rem_arr.iter().filter_map(|v| v.as_str()).collect();
            cur_arr.retain(|v| !v.as_str().is_some_and(|s| doomed.contains(s)));
            emptied = cur_arr.is_empty();
            // Only the first matching set kind is processed.
            break;
        }
        if emptied {
            item.remove(&attr_name);
        }
    }
}
/// Splits `s` on commas that are not nested inside parentheses.
///
/// Each piece is trimmed. Empty pieces between two commas are kept, but an
/// empty (or whitespace-only) trailing piece is dropped.
fn split_top_commas(s: &str) -> Vec<String> {
    let mut pieces = Vec::new();
    let mut nesting = 0i32;
    let mut start = 0usize;
    for (idx, ch) in s.char_indices() {
        match ch {
            '(' => nesting += 1,
            ')' => nesting -= 1,
            ',' if nesting == 0 => {
                pieces.push(s[start..idx].trim().to_string());
                // ',' is one byte wide, so the next piece starts right after it.
                start = idx + 1;
            }
            _ => {}
        }
    }
    let tail = s[start..].trim();
    if !tail.is_empty() {
        pieces.push(tail.to_string());
    }
    pieces
}
/// Splits `s` at the first occurrence of `op` that is outside parentheses and
/// not at index 0 (so a leading sign is not treated as an operator).
///
/// Returns the text before and after the operator, untrimmed, or `None` when
/// no such operator exists.
fn split_top_op(s: &str, op: char) -> Option<(&str, &str)> {
    let mut nesting = 0i32;
    let position = s.char_indices().find_map(|(idx, ch)| {
        if ch == '(' {
            nesting += 1;
            None
        } else if ch == ')' {
            nesting -= 1;
            None
        } else if ch == op && nesting == 0 && idx > 0 {
            Some(idx)
        } else {
            None
        }
    })?;
    let (before, rest) = s.split_at(position);
    Some((before, &rest[op.len_utf8()..]))
}
/// Converts an SQS queue URL into a queue ARN.
///
/// The last two `/`-separated segments are taken as the account id and queue
/// name; the region is always `us-east-1`. A URL without any `/` is returned
/// unchanged.
fn queue_url_to_arn(url: &str) -> String {
    let mut segments = url.rsplitn(3, '/');
    match (segments.next(), segments.next()) {
        (Some(queue_name), Some(account_id)) => {
            Arn::new("sqs", "us-east-1", account_id, queue_name).to_string()
        }
        _ => url.to_string(),
    }
}
fn md5_hex(data: &str) -> String {
use md5::Digest;
let result = md5::Md5::digest(data.as_bytes());
format!("{result:032x}")
}
/// Invokes a Lambda function through the delivery bus and returns its result.
///
/// * No delivery bus configured: `States.TaskFailed`.
/// * `timeout_seconds` elapses before the invocation completes:
///   `States.Timeout`.
/// * The bus reports an error: `States.TaskFailed` with that message.
/// * The bus returns nothing (`None`): an empty JSON object.
///
/// A successful response is parsed as JSON; if it is not valid JSON it is
/// returned as a JSON string (decoded lossily as UTF-8).
async fn invoke_lambda_direct(
    function_arn: &str,
    input: &Value,
    delivery: &Option<Arc<DeliveryBus>>,
    timeout_seconds: Option<u64>,
) -> Result<Value, (String, String)> {
    let Some(bus) = delivery.as_ref() else {
        return Err((
            "States.TaskFailed".to_string(),
            "No delivery bus configured for Lambda invocation".to_string(),
        ));
    };

    let payload = serde_json::to_string(input).unwrap_or_default();
    let pending = bus.invoke_lambda(function_arn, &payload);

    let outcome = match timeout_seconds {
        None => pending.await,
        Some(limit) => {
            let bounded = tokio::time::timeout(tokio::time::Duration::from_secs(limit), pending);
            match bounded.await {
                Ok(inner) => inner,
                Err(_) => {
                    return Err((
                        "States.Timeout".to_string(),
                        format!("Task timed out after {limit} seconds"),
                    ));
                }
            }
        }
    };

    match outcome {
        Some(Ok(bytes)) => {
            let text = String::from_utf8_lossy(&bytes);
            let parsed: Value =
                serde_json::from_str(&text).unwrap_or_else(|_| json!(text.to_string()));
            Ok(parsed)
        }
        Some(Err(message)) => Err(("States.TaskFailed".to_string(), message)),
        None => Ok(json!({})),
    }
}
/// Runs an Activity task: registers a task token for `activity_arn` and polls
/// shared state until the token reaches a terminal status or a timeout hits.
///
/// Returns the token's output (parsed as JSON, or wrapped as a JSON string if
/// it is not valid JSON) when the token status becomes `SUCCEEDED`.
///
/// Errors, as `(error, cause)`:
/// * `States.TaskFailed` — the activity is not registered, or the token entry
///   vanished from shared state while polling.
/// * the token's own error/cause — when its status becomes `FAILED`
///   (error defaults to `States.TaskFailed`).
/// * `States.HeartbeatTimeout` — status is `IN_PROGRESS` and more than
///   `heartbeat_seconds` elapsed since the last heartbeat (or creation).
/// * `States.Timeout` — `timeout_seconds` (default 3600) elapsed.
async fn invoke_activity(
activity_arn: &str,
input: &Value,
shared_state: &SharedStepFunctionsState,
timeout_seconds: Option<u64>,
heartbeat_seconds: Option<u64>,
) -> Result<Value, (String, String)> {
use crate::state::TaskTokenState;
// The account id is the fifth `:`-separated ARN segment; empty if absent.
let activity_account = activity_arn.split(':').nth(4).unwrap_or("").to_string();
// Fail fast if the activity is unknown. The read lock is scoped to this block.
{
let accounts = shared_state.read();
let exists = accounts
.get(&activity_account)
.map(|s| s.activities.contains_key(activity_arn))
.unwrap_or(false);
if !exists {
return Err((
"States.TaskFailed".to_string(),
format!("Activity does not exist: {activity_arn}"),
));
}
}
// Token combines a nanosecond timestamp and a random UUID.
let token = format!(
"FCToken-{}-{}",
chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0),
uuid::Uuid::new_v4().simple(),
);
let now = chrono::Utc::now();
let input_str = serde_json::to_string(input).unwrap_or_else(|_| "{}".to_string());
// Register the token as PENDING; the write lock is released before polling.
{
let mut accounts = shared_state.write();
let state = accounts.get_or_create(&activity_account);
state.task_tokens.insert(
token.clone(),
TaskTokenState {
activity_arn: activity_arn.to_string(),
status: "PENDING".to_string(),
output: None,
error: None,
cause: None,
input: Some(input_str),
created_at: now,
last_heartbeat_at: None,
heartbeat_seconds: heartbeat_seconds.map(|s| s as i64),
timeout_seconds: timeout_seconds.map(|s| s as i64),
},
);
}
// Overall deadline; defaults to one hour when no timeout was given.
let absolute_deadline =
std::time::Instant::now() + std::time::Duration::from_secs(timeout_seconds.unwrap_or(3600));
// Poll loop. NOTE(review): status transitions are presumably written by the
// activity worker API handlers elsewhere in the crate — not visible here.
loop {
let now_ts = chrono::Utc::now();
// Clone the entry so no lock is held across the `.await` below.
let snapshot = {
let accounts = shared_state.read();
accounts
.get(&activity_account)
.and_then(|s| s.task_tokens.get(&token).cloned())
};
let Some(entry) = snapshot else {
return Err((
"States.TaskFailed".to_string(),
"Activity task token disappeared".to_string(),
));
};
match entry.status.as_str() {
"SUCCEEDED" => {
cleanup_token(shared_state, &activity_account, &token);
let output = entry.output.unwrap_or_else(|| "{}".to_string());
// Non-JSON output is passed through as a JSON string.
let value: Value = serde_json::from_str(&output).unwrap_or(Value::String(output));
return Ok(value);
}
"FAILED" => {
cleanup_token(shared_state, &activity_account, &token);
return Err((
entry
.error
.unwrap_or_else(|| "States.TaskFailed".to_string()),
entry.cause.unwrap_or_default(),
));
}
_ => {}
}
// Heartbeat is enforced only while the status is IN_PROGRESS.
if entry.status == "IN_PROGRESS" {
if let Some(hb) = entry.heartbeat_seconds {
let last = entry.last_heartbeat_at.unwrap_or(entry.created_at);
if (now_ts - last).num_seconds() > hb {
cleanup_token(shared_state, &activity_account, &token);
return Err((
"States.HeartbeatTimeout".to_string(),
format!("Activity worker missed heartbeat ({hb}s window)"),
));
}
}
}
if std::time::Instant::now() >= absolute_deadline {
cleanup_token(shared_state, &activity_account, &token);
let secs = timeout_seconds.unwrap_or(3600);
return Err((
"States.Timeout".to_string(),
format!("Activity timed out after {secs} seconds"),
));
}
// Poll interval.
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
}
}
/// Removes `token` from the task-token table of `account_id`, if that account
/// exists in the shared state. Does nothing otherwise.
fn cleanup_token(shared_state: &SharedStepFunctionsState, account_id: &str, token: &str) {
    let mut guard = shared_state.write();
    let Some(account_state) = guard.get_mut(account_id) else {
        return;
    };
    account_state.task_tokens.remove(token);
}
/// Instantiates a `Parameters` template against `input`.
///
/// * Object keys ending in `.$` with a string value are replaced by the
///   result of resolving that string as a path against `input`; the `.$`
///   suffix is stripped from the key. A `.$` key whose value is not a string
///   is dropped.
/// * Other object members and array elements are processed recursively.
/// * Scalars are copied as-is.
fn apply_parameters(template: &Value, input: &Value) -> Value {
    match template {
        Value::Object(fields) => {
            let resolved: serde_json::Map<String, Value> = fields
                .iter()
                .filter_map(|(key, value)| match key.strip_suffix(".$") {
                    Some(target) => value.as_str().map(|path| {
                        (
                            target.to_string(),
                            crate::io_processing::resolve_path(input, path),
                        )
                    }),
                    None => Some((key.clone(), apply_parameters(value, input))),
                })
                .collect();
            Value::Object(resolved)
        }
        Value::Array(elements) => {
            let mut expanded = Vec::with_capacity(elements.len());
            for element in elements {
                expanded.push(apply_parameters(element, input));
            }
            Value::Array(expanded)
        }
        scalar => scalar.clone(),
    }
}
/// Outcome of resolving a state's transition fields.
///
/// Standard traits are derived so values can be compared and printed in
/// assertions and log output.
#[derive(Debug, Clone, PartialEq, Eq)]
enum NextState {
    /// Transition to the named state.
    Name(String),
    /// The state is terminal (`"End": true`).
    End,
    /// The state definition has no usable transition; carries a message.
    Error(String),
}
fn next_state(state_def: &Value) -> NextState {
if state_def["End"].as_bool() == Some(true) {
return NextState::End;
}
match state_def["Next"].as_str() {
Some(next) => NextState::Name(next.to_string()),
None => NextState::Error("State has neither 'End' nor 'Next' field".to_string()),
}
}
fn apply_state_catcher(
state_def: &Value,
effective_input: &Value,
error: &str,
cause: &str,
) -> Option<(String, Value)> {
let catchers = state_def["Catch"].as_array().cloned().unwrap_or_default();
let (next, result_path) = find_catcher(&catchers, error)?;
let error_output = json!({
"Error": error,
"Cause": cause,
});
let new_input = apply_result_path(effective_input, &error_output, result_path.as_deref());
Some((next, new_input))
}
/// Extracts the account id (fifth `:`-separated segment) from an ARN, falling
/// back to `"000000000000"` when the ARN has fewer than five segments.
fn account_id_from_arn(arn: &str) -> &str {
    const FALLBACK_ACCOUNT: &str = "000000000000";
    match arn.splitn(6, ':').nth(4) {
        Some(account) => account,
        None => FALLBACK_ACCOUNT,
    }
}
/// Appends a history event to the execution identified by `execution_arn`.
///
/// The event id is the 1-based position in the execution's history. Returns
/// the new id, or `0` when the execution does not exist. The account entry is
/// created in shared state if it is missing.
fn add_event(
    state: &SharedStepFunctionsState,
    execution_arn: &str,
    event_type: &str,
    previous_event_id: i64,
    details: Value,
) -> i64 {
    let account_id = account_id_from_arn(execution_arn).to_string();
    let mut guard = state.write();
    let account_state = guard.get_or_create(&account_id);
    let Some(execution) = account_state.executions.get_mut(execution_arn) else {
        return 0;
    };
    let id = execution.history_events.len() as i64 + 1;
    let event = HistoryEvent {
        id,
        event_type: event_type.to_string(),
        timestamp: Utc::now(),
        previous_event_id,
        details,
    };
    execution.history_events.push(event);
    id
}
/// Marks an execution as succeeded and records an `ExecutionSucceeded` event.
///
/// Does nothing if the execution exists and is no longer `Running`. Otherwise
/// the serialized `output` is stored on the execution together with the stop
/// date.
///
/// NOTE(review): the status check, the event append and the status update
/// each take the lock separately, so a concurrent status change between them
/// is not excluded — confirm whether that matters for callers.
fn succeed_execution(state: &SharedStepFunctionsState, execution_arn: &str, output: &Value) {
    let account_id = account_id_from_arn(execution_arn).to_string();

    let already_finished = {
        let accounts = state.read();
        accounts
            .get(&account_id)
            .and_then(|account| account.executions.get(execution_arn))
            .is_some_and(|execution| execution.status != ExecutionStatus::Running)
    };
    if already_finished {
        return;
    }

    let output_str = serde_json::to_string(output).unwrap_or_default();
    add_event(
        state,
        execution_arn,
        "ExecutionSucceeded",
        0,
        json!({ "output": output_str }),
    );

    let mut guard = state.write();
    let account_state = guard.get_or_create(&account_id);
    if let Some(execution) = account_state.executions.get_mut(execution_arn) {
        execution.status = ExecutionStatus::Succeeded;
        execution.output = Some(output_str);
        execution.stop_date = Some(Utc::now());
    }
}
/// Marks an execution as failed and records an `ExecutionFailed` event.
///
/// Does nothing if the execution exists and is no longer `Running`. Otherwise
/// `error` and `cause` are stored on the execution together with the stop
/// date.
///
/// NOTE(review): the status check, the event append and the status update
/// each take the lock separately, so a concurrent status change between them
/// is not excluded — confirm whether that matters for callers.
fn fail_execution(state: &SharedStepFunctionsState, execution_arn: &str, error: &str, cause: &str) {
    let account_id = account_id_from_arn(execution_arn).to_string();

    let already_finished = {
        let accounts = state.read();
        accounts
            .get(&account_id)
            .and_then(|account| account.executions.get(execution_arn))
            .is_some_and(|execution| execution.status != ExecutionStatus::Running)
    };
    if already_finished {
        return;
    }

    add_event(
        state,
        execution_arn,
        "ExecutionFailed",
        0,
        json!({ "error": error, "cause": cause }),
    );

    let mut guard = state.write();
    let account_state = guard.get_or_create(&account_id);
    if let Some(execution) = account_state.executions.get_mut(execution_arn) {
        execution.status = ExecutionStatus::Failed;
        execution.error = Some(error.to_string());
        execution.cause = Some(cause.to_string());
        execution.stop_date = Some(Utc::now());
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::state::Execution;
use parking_lot::RwLock;
use std::sync::Arc;
/// Builds a fresh shared state constructed with account `123456789012` and
/// region `us-east-1`.
fn make_state() -> SharedStepFunctionsState {
    let accounts =
        fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", "");
    Arc::new(RwLock::new(accounts))
}
/// Registers a `Running` execution named `exec-1` under `arn` in account
/// `123456789012`, with the given raw input and an empty history.
fn create_execution(state: &SharedStepFunctionsState, arn: &str, input: Option<String>) {
    let mut guard = state.write();
    let account_state = guard.get_or_create("123456789012");
    let execution = Execution {
        execution_arn: arn.to_string(),
        state_machine_arn: "arn:aws:states:us-east-1:123456789012:stateMachine:test".to_string(),
        state_machine_name: "test".to_string(),
        name: "exec-1".to_string(),
        status: ExecutionStatus::Running,
        input,
        output: None,
        start_date: Utc::now(),
        stop_date: None,
        error: None,
        cause: None,
        history_events: vec![],
    };
    account_state.executions.insert(arn.to_string(), execution);
}
/// A single `Pass` state with a literal `Result` succeeds and outputs that
/// result.
#[tokio::test]
async fn test_simple_pass_state() {
    let state = make_state();
    let arn = "arn:aws:states:us-east-1:123456789012:execution:test:exec-1";
    let raw_input = r#"{"hello":"world"}"#.to_string();
    create_execution(&state, arn, Some(raw_input.clone()));

    let definition = json!({
        "StartAt": "PassState",
        "States": {
            "PassState": {
                "Type": "Pass",
                "Result": {"processed": true},
                "End": true
            }
        }
    })
    .to_string();

    execute_state_machine(
        state.clone(),
        arn.to_string(),
        definition,
        Some(raw_input),
        None,
        None,
    )
    .await;

    let accounts = state.read();
    let sfn = accounts.default_ref();
    let execution = sfn.executions.get(arn).unwrap();
    assert_eq!(execution.status, ExecutionStatus::Succeeded);
    assert!(execution.output.is_some());
    let output: Value = serde_json::from_str(execution.output.as_ref().unwrap()).unwrap();
    assert_eq!(output, json!({"processed": true}));
}
/// Two chained `Pass` states each write their result under a distinct
/// `ResultPath`, and both values appear in the final output.
#[tokio::test]
async fn test_pass_chain() {
    let state = make_state();
    let arn = "arn:aws:states:us-east-1:123456789012:execution:test:exec-1";
    create_execution(&state, arn, Some(r#"{}"#.to_string()));

    let definition = json!({
        "StartAt": "First",
        "States": {
            "First": {
                "Type": "Pass",
                "Result": "step1",
                "ResultPath": "$.first",
                "Next": "Second"
            },
            "Second": {
                "Type": "Pass",
                "Result": "step2",
                "ResultPath": "$.second",
                "End": true
            }
        }
    })
    .to_string();

    execute_state_machine(
        state.clone(),
        arn.to_string(),
        definition,
        Some("{}".to_string()),
        None,
        None,
    )
    .await;

    let accounts = state.read();
    let sfn = accounts.default_ref();
    let execution = sfn.executions.get(arn).unwrap();
    assert_eq!(execution.status, ExecutionStatus::Succeeded);
    let output: Value = serde_json::from_str(execution.output.as_ref().unwrap()).unwrap();
    assert_eq!(output["first"], json!("step1"));
    assert_eq!(output["second"], json!("step2"));
}
/// A lone `Succeed` state finishes the execution as `Succeeded`.
#[tokio::test]
async fn test_succeed_state() {
    let state = make_state();
    let arn = "arn:aws:states:us-east-1:123456789012:execution:test:exec-1";
    let raw_input = r#"{"data": "value"}"#.to_string();
    create_execution(&state, arn, Some(raw_input.clone()));

    let definition = json!({
        "StartAt": "Done",
        "States": {
            "Done": {
                "Type": "Succeed"
            }
        }
    })
    .to_string();

    execute_state_machine(
        state.clone(),
        arn.to_string(),
        definition,
        Some(raw_input),
        None,
        None,
    )
    .await;

    let accounts = state.read();
    let sfn = accounts.default_ref();
    let execution = sfn.executions.get(arn).unwrap();
    assert_eq!(execution.status, ExecutionStatus::Succeeded);
}
/// A `Fail` state ends the execution as `Failed` with its `Error` and
/// `Cause` recorded.
#[tokio::test]
async fn test_fail_state() {
    let state = make_state();
    let arn = "arn:aws:states:us-east-1:123456789012:execution:test:exec-1";
    create_execution(&state, arn, None);

    let definition = json!({
        "StartAt": "FailState",
        "States": {
            "FailState": {
                "Type": "Fail",
                "Error": "CustomError",
                "Cause": "Something went wrong"
            }
        }
    })
    .to_string();

    execute_state_machine(state.clone(), arn.to_string(), definition, None, None, None).await;

    let accounts = state.read();
    let sfn = accounts.default_ref();
    let execution = sfn.executions.get(arn).unwrap();
    assert_eq!(execution.status, ExecutionStatus::Failed);
    assert_eq!(execution.error.as_deref(), Some("CustomError"));
    assert_eq!(execution.cause.as_deref(), Some("Something went wrong"));
}
/// Running a single `Pass` state records the expected sequence of history
/// event types.
#[tokio::test]
async fn test_history_events_recorded() {
    let state = make_state();
    let arn = "arn:aws:states:us-east-1:123456789012:execution:test:exec-1";
    create_execution(&state, arn, Some("{}".to_string()));

    let definition = json!({
        "StartAt": "PassState",
        "States": {
            "PassState": {
                "Type": "Pass",
                "End": true
            }
        }
    })
    .to_string();

    execute_state_machine(
        state.clone(),
        arn.to_string(),
        definition,
        Some("{}".to_string()),
        None,
        None,
    )
    .await;

    let accounts = state.read();
    let sfn = accounts.default_ref();
    let execution = sfn.executions.get(arn).unwrap();
    let mut event_types: Vec<&str> = Vec::new();
    for event in &execution.history_events {
        event_types.push(event.event_type.as_str());
    }
    assert_eq!(
        event_types,
        vec![
            "ExecutionStarted",
            "PassStateEntered",
            "PassStateExited",
            "ExecutionSucceeded"
        ]
    );
}
/// Registers an execution under `arn` and runs `def` to completion on a
/// dedicated current-thread Tokio runtime, with no delivery bus or DynamoDB
/// state attached.
fn drive(state: &SharedStepFunctionsState, arn: &str, def: Value, input: Option<&str>) {
    let owned_input = input.map(|s| s.to_string());
    create_execution(state, arn, owned_input.clone());
    let run = execute_state_machine(
        state.clone(),
        arn.to_string(),
        def.to_string(),
        owned_input,
        None,
        None,
    );
    let runtime = tokio::runtime::Builder::new_current_thread()
        .enable_time()
        .build()
        .unwrap();
    runtime.block_on(run);
}
/// Runs `f` against the execution stored under `arn` while holding the read
/// lock, and returns its result. Panics if the execution is missing.
fn read_exec<R>(
    state: &SharedStepFunctionsState,
    arn: &str,
    f: impl FnOnce(&Execution) -> R,
) -> R {
    let accounts = state.read();
    let sfn = accounts.default_ref();
    let execution = sfn.executions.get(arn).expect("execution missing");
    f(execution)
}
/// Builds an execution ARN for the `test` state machine with the given
/// execution name.
fn arn_for(name: &str) -> String {
    let mut arn = String::from("arn:aws:states:us-east-1:123456789012:execution:test:");
    arn.push_str(name);
    arn
}
/// `InputPath` narrows the input to `$.inner`, then `OutputPath` selects
/// `$.kept` from it.
#[test]
fn pass_state_input_output_path_select_fields() {
    let state = make_state();
    let arn = arn_for("pass-paths");
    let definition = json!({
        "StartAt": "P",
        "States": {
            "P": {
                "Type": "Pass",
                "InputPath": "$.inner",
                "OutputPath": "$.kept",
                "End": true
            }
        }
    });
    let input = r#"{"inner":{"kept":"yes","dropped":true},"sibling":1}"#;
    drive(&state, &arn, definition, Some(input));
    read_exec(&state, &arn, |execution| {
        assert_eq!(execution.status, ExecutionStatus::Succeeded);
        let output: Value = serde_json::from_str(execution.output.as_ref().unwrap()).unwrap();
        assert_eq!(output, json!("yes"));
    });
}
/// A `Succeed` state whose `InputPath` is the string `"null"` outputs an
/// empty object.
#[test]
fn succeed_state_honors_input_path_null() {
    let state = make_state();
    let arn = arn_for("succeed-null");
    let definition = json!({
        "StartAt": "S",
        "States": { "S": { "Type": "Succeed", "InputPath": "null" } }
    });
    drive(&state, &arn, definition, Some(r#"{"a":1}"#));
    read_exec(&state, &arn, |execution| {
        assert_eq!(execution.status, ExecutionStatus::Succeeded);
        let output: Value = serde_json::from_str(execution.output.as_ref().unwrap()).unwrap();
        assert_eq!(output, json!({}));
    });
}
/// A `Fail` state without `Error`/`Cause` reports `States.Fail` and an empty
/// cause.
#[test]
fn fail_state_defaults_when_fields_missing() {
    let state = make_state();
    let arn = arn_for("fail-default");
    let definition = json!({
        "StartAt": "F",
        "States": { "F": { "Type": "Fail" } }
    });
    drive(&state, &arn, definition, None);
    read_exec(&state, &arn, |execution| {
        assert_eq!(execution.status, ExecutionStatus::Failed);
        assert_eq!(execution.error.as_deref(), Some("States.Fail"));
        assert_eq!(execution.cause.as_deref(), Some(""));
    });
}
/// State machine with one `Choice` state: `$.n > 10` routes to `Big`,
/// anything else to the default `Small`. Both targets are `Pass` states with
/// a literal result.
fn choice_def() -> Value {
    let big_rule = json!({
        "Variable": "$.n",
        "NumericGreaterThan": 10,
        "Next": "Big"
    });
    json!({
        "StartAt": "C",
        "States": {
            "C": {
                "Type": "Choice",
                "Choices": [big_rule],
                "Default": "Small"
            },
            "Big": { "Type": "Pass", "Result": "big", "End": true },
            "Small": { "Type": "Pass", "Result": "small", "End": true }
        }
    })
}
/// With `n = 42` the `Choice` state takes the `Big` branch.
#[test]
fn choice_routes_to_matching_branch() {
    let state = make_state();
    let arn = arn_for("choice-big");
    drive(&state, &arn, choice_def(), Some(r#"{"n":42}"#));
    read_exec(&state, &arn, |execution| {
        assert_eq!(execution.status, ExecutionStatus::Succeeded);
        let output = serde_json::from_str::<Value>(execution.output.as_ref().unwrap()).unwrap();
        assert_eq!(output, json!("big"));
    });
}
/// With `n = 3` no rule matches and the `Default` branch (`Small`) runs.
#[test]
fn choice_falls_through_to_default() {
    let state = make_state();
    let arn = arn_for("choice-default");
    drive(&state, &arn, choice_def(), Some(r#"{"n":3}"#));
    read_exec(&state, &arn, |execution| {
        assert_eq!(execution.status, ExecutionStatus::Succeeded);
        let output = serde_json::from_str::<Value>(execution.output.as_ref().unwrap()).unwrap();
        assert_eq!(output, json!("small"));
    });
}
/// A `Choice` state with no matching rule and no `Default` fails with
/// `States.NoChoiceMatched`.
#[test]
fn choice_no_match_and_no_default_fails() {
    let state = make_state();
    let arn = arn_for("choice-nomatch");
    let definition = json!({
        "StartAt": "C",
        "States": {
            "C": {
                "Type": "Choice",
                "Choices": [
                    { "Variable": "$.n", "NumericEquals": 1, "Next": "End1" }
                ]
            },
            "End1": { "Type": "Pass", "End": true }
        }
    });
    drive(&state, &arn, definition, Some(r#"{"n":99}"#));
    read_exec(&state, &arn, |execution| {
        assert_eq!(execution.status, ExecutionStatus::Failed);
        assert_eq!(execution.error.as_deref(), Some("States.NoChoiceMatched"));
    });
}
/// A `Wait` state with `Seconds: 0` proceeds to its `Next` state.
#[test]
fn wait_seconds_then_advances() {
    let state = make_state();
    let arn = arn_for("wait-secs");
    let definition = json!({
        "StartAt": "W",
        "States": {
            "W": { "Type": "Wait", "Seconds": 0, "Next": "Done" },
            "Done": { "Type": "Succeed" }
        }
    });
    drive(&state, &arn, definition, Some(r#"{"k":1}"#));
    read_exec(&state, &arn, |execution| {
        assert_eq!(execution.status, ExecutionStatus::Succeeded);
    });
}
/// A `Wait` state whose `Timestamp` is already in the past completes and the
/// execution succeeds.
#[test]
fn wait_timestamp_in_past_is_noop() {
    let state = make_state();
    let arn = arn_for("wait-past");
    let definition = json!({
        "StartAt": "W",
        "States": {
            "W": {
                "Type": "Wait",
                "Timestamp": "2000-01-01T00:00:00Z",
                "End": true
            }
        }
    });
    drive(&state, &arn, definition, Some(r#"{"k":1}"#));
    read_exec(&state, &arn, |execution| {
        assert_eq!(execution.status, ExecutionStatus::Succeeded);
    });
}
/// A `Wait` state with no duration field still completes successfully.
#[test]
fn wait_without_any_duration_falls_through() {
    let state = make_state();
    let arn = arn_for("wait-none");
    let definition = json!({
        "StartAt": "W",
        "States": { "W": { "Type": "Wait", "End": true } }
    });
    drive(&state, &arn, definition, None);
    read_exec(&state, &arn, |execution| {
        assert_eq!(execution.status, ExecutionStatus::Succeeded);
    });
}
/// A `Parallel` state with two `Pass` branches outputs both branch results
/// as an array in branch order.
#[test]
fn parallel_runs_two_pass_branches_and_collects_results() {
    let state = make_state();
    let arn = arn_for("parallel-ok");
    let definition = json!({
        "StartAt": "P",
        "States": {
            "P": {
                "Type": "Parallel",
                "End": true,
                "Branches": [
                    {
                        "StartAt": "A",
                        "States": { "A": { "Type": "Pass", "Result": "a", "End": true } }
                    },
                    {
                        "StartAt": "B",
                        "States": { "B": { "Type": "Pass", "Result": "b", "End": true } }
                    }
                ]
            }
        }
    });
    drive(&state, &arn, definition, Some(r#"{}"#));
    read_exec(&state, &arn, |execution| {
        assert_eq!(execution.status, ExecutionStatus::Succeeded);
        let output: Value = serde_json::from_str(execution.output.as_ref().unwrap()).unwrap();
        assert_eq!(output, json!(["a", "b"]));
    });
}
/// A `Parallel` state with an empty `Branches` array fails with
/// `States.Runtime`.
#[test]
fn parallel_empty_branches_fails() {
    let state = make_state();
    let arn = arn_for("parallel-empty");
    let definition = json!({
        "StartAt": "P",
        "States": { "P": { "Type": "Parallel", "Branches": [], "End": true } }
    });
    drive(&state, &arn, definition, None);
    read_exec(&state, &arn, |execution| {
        assert_eq!(execution.status, ExecutionStatus::Failed);
        assert_eq!(execution.error.as_deref(), Some("States.Runtime"));
    });
}
/// A `Map` state over `$.items` with an identity `Pass` iterator outputs the
/// items unchanged.
#[test]
fn map_iterates_pass_state_over_items_path() {
    let state = make_state();
    let arn = arn_for("map-ok");
    let definition = json!({
        "StartAt": "M",
        "States": {
            "M": {
                "Type": "Map",
                "ItemsPath": "$.items",
                "Iterator": {
                    "StartAt": "Item",
                    "States": { "Item": { "Type": "Pass", "End": true } }
                },
                "End": true
            }
        }
    });
    drive(&state, &arn, definition, Some(r#"{"items":[1,2,3]}"#));
    read_exec(&state, &arn, |execution| {
        assert_eq!(execution.status, ExecutionStatus::Succeeded);
        let output: Value = serde_json::from_str(execution.output.as_ref().unwrap()).unwrap();
        assert_eq!(output, json!([1, 2, 3]));
    });
}
/// A `Task` with an unrecognized resource fails with `States.TaskFailed` and
/// a cause mentioning "Unsupported".
#[test]
fn task_unsupported_resource_propagates_failure() {
    let state = make_state();
    let arn = arn_for("task-unsupported");
    let definition = json!({
        "StartAt": "T",
        "States": {
            "T": {
                "Type": "Task",
                "Resource": "arn:aws:states:::nothing:here",
                "End": true
            }
        }
    });
    drive(&state, &arn, definition, None);
    read_exec(&state, &arn, |execution| {
        assert_eq!(execution.status, ExecutionStatus::Failed);
        assert_eq!(execution.error.as_deref(), Some("States.TaskFailed"));
        let cause = execution.cause.as_deref().unwrap();
        assert!(cause.contains("Unsupported"));
    });
}
/// An SQS `sendMessage` task run without a delivery bus fails with a cause
/// mentioning "delivery bus".
#[test]
fn task_sqs_send_without_delivery_fails() {
    let state = make_state();
    let arn = arn_for("task-sqs-nodelivery");
    let definition = json!({
        "StartAt": "T",
        "States": {
            "T": {
                "Type": "Task",
                "Resource": "arn:aws:states:::sqs:sendMessage",
                "Parameters": { "QueueUrl": "http://localhost/123/q", "MessageBody": "hi" },
                "End": true
            }
        }
    });
    drive(&state, &arn, definition, Some("{}"));
    read_exec(&state, &arn, |execution| {
        assert_eq!(execution.status, ExecutionStatus::Failed);
        let cause = execution.cause.as_deref().unwrap();
        assert!(cause.contains("delivery bus"));
    });
}
/// A failing `Task` with a `States.ALL` catcher routes to the handler state,
/// keeping the original input and storing the error under `$.err`.
#[test]
fn task_catch_routes_error_into_handler() {
    let state = make_state();
    let arn = arn_for("task-catch");
    let definition = json!({
        "StartAt": "T",
        "States": {
            "T": {
                "Type": "Task",
                "Resource": "arn:aws:states:::nothing:here",
                "Catch": [
                    { "ErrorEquals": ["States.ALL"], "Next": "Handler", "ResultPath": "$.err" }
                ],
                "End": true
            },
            "Handler": { "Type": "Pass", "End": true }
        }
    });
    drive(&state, &arn, definition, Some(r#"{"orig":"v"}"#));
    read_exec(&state, &arn, |execution| {
        assert_eq!(execution.status, ExecutionStatus::Succeeded);
        let output: Value = serde_json::from_str(execution.output.as_ref().unwrap()).unwrap();
        assert_eq!(output["orig"], json!("v"));
        assert_eq!(output["err"]["Error"], json!("States.TaskFailed"));
    });
}
/// A definition that is not valid JSON fails the execution with
/// `States.Runtime` and a "Failed to parse" cause.
#[test]
fn invalid_definition_json_fails_execution() {
    let state = make_state();
    let arn = arn_for("bad-json");
    create_execution(&state, &arn, None);

    let runtime = tokio::runtime::Builder::new_current_thread()
        .enable_time()
        .build()
        .unwrap();
    let run = execute_state_machine(
        state.clone(),
        arn.clone(),
        "not json{".to_string(),
        None,
        None,
        None,
    );
    runtime.block_on(run);

    read_exec(&state, &arn, |execution| {
        assert_eq!(execution.status, ExecutionStatus::Failed);
        assert_eq!(execution.error.as_deref(), Some("States.Runtime"));
        let cause = execution.cause.as_deref().unwrap();
        assert!(cause.contains("Failed to parse"));
    });
}
/// A definition without `StartAt` fails with a cause mentioning "StartAt".
#[test]
fn missing_start_at_fails_execution() {
    let state = make_state();
    let arn = arn_for("no-startat");
    let definition = json!({ "States": { "X": { "Type": "Succeed" } } });
    drive(&state, &arn, definition, None);
    read_exec(&state, &arn, |execution| {
        assert_eq!(execution.status, ExecutionStatus::Failed);
        let cause = execution.cause.as_deref().unwrap();
        assert!(cause.contains("StartAt"));
    });
}
/// A `Next` that names a non-existent state fails with a "not found" cause.
#[test]
fn next_state_not_found_fails_execution() {
    let state = make_state();
    let arn = arn_for("dangling-next");
    let definition = json!({
        "StartAt": "A",
        "States": { "A": { "Type": "Pass", "Next": "DoesNotExist" } }
    });
    drive(&state, &arn, definition, None);
    read_exec(&state, &arn, |execution| {
        assert_eq!(execution.status, ExecutionStatus::Failed);
        let cause = execution.cause.as_deref().unwrap();
        assert!(cause.contains("not found"));
    });
}
/// An unknown state `Type` fails with an "Unsupported state type" cause.
#[test]
fn unsupported_state_type_fails_execution() {
    let state = make_state();
    let arn = arn_for("bad-type");
    let definition = json!({
        "StartAt": "X",
        "States": { "X": { "Type": "WatChoo", "End": true } }
    });
    drive(&state, &arn, definition, None);
    read_exec(&state, &arn, |execution| {
        assert_eq!(execution.status, ExecutionStatus::Failed);
        let cause = execution.cause.as_deref().unwrap();
        assert!(cause.contains("Unsupported state type"));
    });
}
/// `.$` keys are resolved against the input at the top level, in nested
/// objects, and inside arrays; literal values pass through.
#[test]
fn apply_parameters_substitutes_json_path_refs() {
    let input = json!({ "user": { "id": 42, "name": "zoe" } });
    let template = json!({
        "literal": "constant",
        "ref.$": "$.user.id",
        "nested": { "inner.$": "$.user.name" },
        "list": [ { "x.$": "$.user.id" } ]
    });
    let resolved = apply_parameters(&template, &input);
    assert_eq!(resolved["literal"], json!("constant"));
    assert_eq!(resolved["ref"], json!(42));
    assert_eq!(resolved["nested"]["inner"], json!("zoe"));
    assert_eq!(resolved["list"][0]["x"], json!(42));
}
/// `next_state` yields `End`, `Name`, or `Error` depending on which
/// transition fields are present.
#[test]
fn next_state_returns_end_name_or_error() {
    if !matches!(next_state(&json!({ "End": true })), NextState::End) {
        panic!("expected End");
    }
    if let NextState::Name(target) = next_state(&json!({ "Next": "A" })) {
        assert_eq!(target, "A");
    } else {
        panic!("expected Name");
    }
    if !matches!(next_state(&json!({})), NextState::Error(_)) {
        panic!("expected Error");
    }
}
/// A `States.ALL` catcher matches any error and stores `Error`/`Cause` at
/// the catcher's `ResultPath`, preserving the rest of the input.
#[test]
fn apply_state_catcher_matches_wildcard_and_stashes_error() {
    let input = json!({ "a": 1 });
    let state_def = json!({
        "Catch": [
            { "ErrorEquals": ["States.ALL"], "Next": "H", "ResultPath": "$.caught" }
        ]
    });
    let caught = apply_state_catcher(&state_def, &input, "Boom", "it exploded");
    let (target, new_input) = caught.unwrap();
    assert_eq!(target, "H");
    assert_eq!(new_input["a"], json!(1));
    assert_eq!(new_input["caught"]["Error"], json!("Boom"));
    assert_eq!(new_input["caught"]["Cause"], json!("it exploded"));
}
/// A catcher listing only a different error name does not match.
#[test]
fn apply_state_catcher_returns_none_without_match() {
    let input = json!({});
    let state_def = json!({
        "Catch": [
            { "ErrorEquals": ["Specific.Error"], "Next": "H" }
        ]
    });
    let caught = apply_state_catcher(&state_def, &input, "Other", "why");
    assert!(caught.is_none());
}
/// The account id and queue name are taken from the last two URL segments.
#[test]
fn queue_url_to_arn_parses_account_and_name() {
    let arn = queue_url_to_arn("http://sqs.local:4566/123456789012/my-queue");
    assert_eq!(arn, "arn:aws:sqs:us-east-1:123456789012:my-queue");
}
/// Input without any `/` is returned unchanged.
#[test]
fn queue_url_to_arn_falls_back_for_unparseable_input() {
    let passthrough = queue_url_to_arn("bad");
    assert_eq!(passthrough, "bad");
}
/// `md5_hex` is stable for equal input, 32 characters long, and differs for
/// different input.
#[test]
fn md5_hex_is_deterministic_and_32_chars() {
    let first = md5_hex("hello");
    let second = md5_hex("hello");
    assert_eq!(first, second);
    assert_eq!(first.len(), 32);
    assert_ne!(first, md5_hex("world"));
}
/// `SET` assigns both a `#alias` target and a bare attribute name, leaving
/// unrelated attributes untouched.
#[test]
fn apply_update_expression_sets_direct_and_aliased_attrs() {
    let mut attr_values = serde_json::Map::new();
    attr_values.insert(":n".to_string(), json!({"S": "Alice"}));
    attr_values.insert(":c".to_string(), json!({"N": "5"}));
    let mut attr_names = serde_json::Map::new();
    attr_names.insert("#name".to_string(), json!("name"));

    let mut item: HashMap<String, Value> = HashMap::new();
    item.insert("id".to_string(), json!({"S": "1"}));
    apply_update_expression(
        &mut item,
        "SET #name = :n, count = :c",
        &attr_values,
        &attr_names,
    );

    assert_eq!(item.get("name").unwrap(), &json!({"S": "Alice"}));
    assert_eq!(item.get("count").unwrap(), &json!({"N": "5"}));
    assert_eq!(item.get("id").unwrap(), &json!({"S": "1"}));
}
/// Clause keywords are case-insensitive: lowercase `set` works.
#[test]
fn apply_update_expression_accepts_lowercase_set_keyword() {
    let mut attr_values = serde_json::Map::new();
    attr_values.insert(":v".to_string(), json!({"S": "x"}));
    let no_aliases = serde_json::Map::new();

    let mut item: HashMap<String, Value> = HashMap::new();
    apply_update_expression(&mut item, "set field = :v", &attr_values, &no_aliases);

    assert_eq!(item.get("field").unwrap(), &json!({"S": "x"}));
}
/// `SET count = count + :inc` adds the placeholder to the current value.
#[test]
fn apply_update_expression_set_arithmetic_increments_counter() {
    let mut attr_values = serde_json::Map::new();
    attr_values.insert(":inc".to_string(), json!({"N": "3"}));
    let no_aliases = serde_json::Map::new();

    let mut item: HashMap<String, Value> = HashMap::new();
    item.insert("count".to_string(), json!({"N": "10"}));
    apply_update_expression(
        &mut item,
        "SET count = count + :inc",
        &attr_values,
        &no_aliases,
    );

    assert_eq!(item.get("count").unwrap(), &json!({"N": "13"}));
}
/// `SET count = count - :d` subtracts the placeholder from the current
/// value.
#[test]
fn apply_update_expression_set_decrement() {
    let mut attr_values = serde_json::Map::new();
    attr_values.insert(":d".to_string(), json!({"N": "4"}));
    let no_aliases = serde_json::Map::new();

    let mut item: HashMap<String, Value> = HashMap::new();
    item.insert("count".to_string(), json!({"N": "10"}));
    apply_update_expression(
        &mut item,
        "SET count = count - :d",
        &attr_values,
        &no_aliases,
    );

    assert_eq!(item.get("count").unwrap(), &json!({"N": "6"}));
}
/// `REMOVE a, c` deletes exactly the listed attributes.
#[test]
fn apply_update_expression_remove_drops_attributes() {
    let mut item: HashMap<String, Value> = HashMap::new();
    for (name, text) in [("a", "x"), ("b", "y"), ("c", "z")] {
        item.insert(name.to_string(), json!({"S": text}));
    }
    let empty = serde_json::Map::new();

    apply_update_expression(&mut item, "REMOVE a, c", &empty, &empty);

    assert!(!item.contains_key("a"));
    assert!(item.contains_key("b"));
    assert!(!item.contains_key("c"));
}
/// `ADD` increments an existing number and initializes a missing attribute
/// with the operand.
#[test]
fn apply_update_expression_add_increments_existing_or_initializes() {
    let mut attr_values = serde_json::Map::new();
    attr_values.insert(":inc".to_string(), json!({"N": "2"}));
    let no_aliases = serde_json::Map::new();

    let mut existing: HashMap<String, Value> = HashMap::new();
    existing.insert("count".to_string(), json!({"N": "5"}));
    apply_update_expression(&mut existing, "ADD count :inc", &attr_values, &no_aliases);
    assert_eq!(existing.get("count").unwrap(), &json!({"N": "7"}));

    let mut fresh: HashMap<String, Value> = HashMap::new();
    apply_update_expression(&mut fresh, "ADD count :inc", &attr_values, &no_aliases);
    assert_eq!(fresh.get("count").unwrap(), &json!({"N": "2"}));
}
/// `DELETE tags :rm` must remove the members of `:rm` from the stored string
/// set, keeping the remaining members.
#[test]
fn apply_update_expression_delete_removes_set_elements() {
    let stored_tags = vec!["a", "b", "c"];
    let mut item: HashMap<String, Value> =
        HashMap::from([("tags".to_string(), json!({"SS": stored_tags}))]);
    let mut values = serde_json::Map::new();
    values.insert(":rm".to_string(), json!({"SS": ["b"]}));
    let empty = serde_json::Map::new();

    apply_update_expression(&mut item, "DELETE tags :rm", &values, &empty);

    let remaining = item.get("tags").unwrap();
    assert_eq!(remaining, &json!({"SS": ["a", "c"]}));
}
/// `if_not_exists(count, :zero)` must write `:zero` only when `count` is
/// missing and otherwise keep the stored value.
#[test]
fn apply_update_expression_if_not_exists_initializes_only_when_absent() {
    let expr = "SET count = if_not_exists(count, :zero)";
    let mut values = serde_json::Map::new();
    values.insert(":zero".to_string(), json!({"N": "0"}));
    let empty = serde_json::Map::new();

    // (initial `count`, expected `count` after the update)
    let cases: [(Option<&str>, &str); 2] = [(None, "0"), (Some("42"), "42")];
    for (initial, expected) in cases {
        let mut item: HashMap<String, Value> = HashMap::new();
        if let Some(n) = initial {
            item.insert("count".to_string(), json!({"N": n}));
        }
        apply_update_expression(&mut item, expr, &values, &empty);
        assert_eq!(item.get("count").unwrap(), &json!({"N": expected}));
    }
}
/// A single expression containing `SET`, `ADD` and `REMOVE` clauses must apply
/// all three to the item.
#[test]
fn apply_update_expression_combines_clauses() {
    let mut item: HashMap<String, Value> = HashMap::from([
        ("a".to_string(), json!({"S": "old"})),
        ("b".to_string(), json!({"N": "1"})),
        ("c".to_string(), json!({"S": "drop"})),
    ]);
    let values: serde_json::Map<String, Value> = vec![
        (":new".to_string(), json!({"S": "new"})),
        (":one".to_string(), json!({"N": "1"})),
    ]
    .into_iter()
    .collect();
    let empty = serde_json::Map::new();

    apply_update_expression(&mut item, "SET a = :new ADD b :one REMOVE c", &values, &empty);

    assert_eq!(item.get("a"), Some(&json!({"S": "new"})));
    assert_eq!(item.get("b"), Some(&json!({"N": "2"})));
    assert!(!item.contains_key("c"));
}
/// Driving a `dynamodb:getItem` task through the test harness must end in a
/// failed execution whose cause mentions "DynamoDB".
#[test]
fn task_dynamodb_get_item_without_state_fails() {
    let state = make_state();
    let arn = arn_for("ddb-get-nostate");
    let task = json!({
        "Type": "Task",
        "Resource": "arn:aws:states:::dynamodb:getItem",
        "Parameters": { "TableName": "t", "Key": { "id": { "S": "1" } } },
        "End": true
    });
    let def = json!({
        "StartAt": "T",
        "States": { "T": task }
    });

    drive(&state, &arn, def, Some("{}"));

    let (status, cause) = read_exec(&state, &arn, |exec| (exec.status, exec.cause.clone()));
    assert_eq!(status, ExecutionStatus::Failed);
    assert!(cause.as_deref().unwrap().contains("DynamoDB"));
}
/// `succeed_execution` must not touch an execution whose status was already
/// set to `Failed`: the status stays `Failed` and no output is recorded.
#[test]
fn succeed_execution_is_noop_when_already_terminal() {
    let state = make_state();
    let arn = "arn:aws:states:us-east-1:123456789012:execution:test:already";
    create_execution(&state, arn, None);

    // Force the execution into the Failed status; the write guard is a
    // temporary and is released at the end of this statement.
    state
        .write()
        .default_mut()
        .executions
        .get_mut(arn)
        .unwrap()
        .status = ExecutionStatus::Failed;

    succeed_execution(&state, arn, &json!({"x":1}));

    let guard = state.read();
    let view = guard.default_ref();
    let record = view.executions.get(arn).unwrap();
    assert_eq!(record.status, ExecutionStatus::Failed);
    assert!(record.output.is_none());
}
/// `fail_execution` must not touch an execution whose status was already set
/// to `Succeeded`: the status stays `Succeeded` and no error is recorded.
#[test]
fn fail_execution_is_noop_when_already_terminal() {
    let state = make_state();
    let arn = "arn:aws:states:us-east-1:123456789012:execution:test:already2";
    create_execution(&state, arn, None);

    // Force the execution into the Succeeded status, then release the write
    // guard before calling the function under test.
    let mut writer = state.write();
    writer.default_mut().executions.get_mut(arn).unwrap().status = ExecutionStatus::Succeeded;
    drop(writer);

    fail_execution(&state, arn, "Oops", "nope");

    let reader = state.read();
    let view = reader.default_ref();
    let record = view.executions.get(arn).unwrap();
    assert_eq!(record.status, ExecutionStatus::Succeeded);
    assert!(record.error.is_none());
}
/// A `Pass` state with `Result` and `ResultPath: "$.data"` must keep the input
/// field `a` and add the result under `data`.
#[test]
fn pass_state_result_path_merges_into_input() {
    let state = make_state();
    let arn = arn_for("result-path");
    let pass = json!({"Type": "Pass", "Result": {"x": 2}, "ResultPath": "$.data", "End": true});
    let def = json!({
        "StartAt": "P",
        "States": { "P": pass }
    });
    let input = r#"{"a":1}"#;

    drive(&state, &arn, def, Some(input));

    let raw_output = read_exec(&state, &arn, |e| e.output.clone().unwrap_or_default());
    let parsed: Value = serde_json::from_str(&raw_output).unwrap();
    assert_eq!(parsed.get("a").and_then(Value::as_i64), Some(1));
    assert_eq!(parsed.pointer("/data/x").and_then(Value::as_i64), Some(2));
}
/// With `$.val = "banana"`, a `StringGreaterThanEquals: "apple"` rule is
/// expected to route to the `End` state, so the execution succeeds.
#[test]
fn choice_string_greater_than_equals() {
    let state = make_state();
    let arn = arn_for("choice-sgte");
    let rule = json!({"Variable": "$.val", "StringGreaterThanEquals": "apple", "Next": "End"});
    let def = json!({
        "StartAt": "C",
        "States": {
            "C": {"Type": "Choice", "Choices": [rule], "Default": "Fail"},
            "End": {"Type": "Pass", "End": true},
            "Fail": {"Type": "Fail"}
        }
    });

    drive(&state, &arn, def, Some(r#"{"val":"banana"}"#));

    assert_eq!(
        read_exec(&state, &arn, |e| e.status),
        ExecutionStatus::Succeeded
    );
}
/// With `$.foo` set to JSON `null` in the input, an `IsPresent: true` rule is
/// expected to match and route to `End`.
///
/// Only `IsPresent` is exercised here; the definition contains no `IsNull` rule.
#[test]
fn choice_is_present_and_is_null() {
    let state = make_state();
    let arn = arn_for("choice-ispres");
    let rule = json!({"Variable": "$.foo", "IsPresent": true, "Next": "End"});
    let def = json!({
        "StartAt": "C",
        "States": {
            "C": {"Type": "Choice", "Choices": [rule], "Default": "Fail"},
            "End": {"Type": "Pass", "End": true},
            "Fail": {"Type": "Fail"}
        }
    });

    drive(&state, &arn, def, Some(r#"{"foo":null}"#));

    let status = read_exec(&state, &arn, |e| e.status);
    assert_eq!(status, ExecutionStatus::Succeeded);
}
/// An `Or` rule must match when at least one operand matches: for the input
/// `x = 1, y = "b"` the first operand (`$.x == 99`) is false and the second
/// (`$.y == "b"`) is true, so the execution is expected to succeed.
#[test]
fn choice_or_short_circuits() {
    let state = make_state();
    let arn = arn_for("choice-or");
    let operands = json!([
        {"Variable": "$.x", "NumericEquals": 99},
        {"Variable": "$.y", "StringEquals": "b"}
    ]);
    let def = json!({
        "StartAt": "C",
        "States": {
            "C": {
                "Type": "Choice",
                "Choices": [{"Or": operands, "Next": "End"}],
                "Default": "Fail"
            },
            "End": {"Type": "Pass", "End": true},
            "Fail": {"Type": "Fail"}
        }
    });

    drive(&state, &arn, def, Some(r#"{"x":1,"y":"b"}"#));

    let status = read_exec(&state, &arn, |e| e.status);
    assert_eq!(status, ExecutionStatus::Succeeded);
}
/// A `Not` rule wrapping `$.x == 99` is expected to match for `x = 1`, routing
/// to `End` and succeeding.
#[test]
fn choice_not_negates() {
    let state = make_state();
    let arn = arn_for("choice-not");
    let negated = json!({"Variable": "$.x", "NumericEquals": 99});
    let def = json!({
        "StartAt": "C",
        "States": {
            "C": {
                "Type": "Choice",
                "Choices": [{"Not": negated, "Next": "End"}],
                "Default": "Fail"
            },
            "End": {"Type": "Pass", "End": true},
            "Fail": {"Type": "Fail"}
        }
    });

    drive(&state, &arn, def, Some(r#"{"x":1}"#));

    let status = read_exec(&state, &arn, |e| e.status);
    assert_eq!(status, ExecutionStatus::Succeeded);
}
/// With `$.ok = true`, a `BooleanEquals: true` rule is expected to route to
/// `End`, so the execution succeeds.
#[test]
fn choice_boolean_equals() {
    let state = make_state();
    let arn = arn_for("choice-bool");
    let rule = json!({"Variable": "$.ok", "BooleanEquals": true, "Next": "End"});
    let def = json!({
        "StartAt": "C",
        "States": {
            "C": {"Type": "Choice", "Choices": [rule], "Default": "Fail"},
            "End": {"Type": "Pass", "End": true},
            "Fail": {"Type": "Fail"}
        }
    });
    let input = r#"{"ok":true}"#;

    drive(&state, &arn, def, Some(input));

    let status = read_exec(&state, &arn, |e| e.status);
    assert_eq!(status, ExecutionStatus::Succeeded);
}
/// A `Wait` state whose `SecondsPath` points at `$.wait` (0 in the input) must
/// complete and let the execution succeed.
#[test]
fn wait_seconds_path_uses_input_value() {
    let state = make_state();
    let arn = arn_for("wait-sp");
    let wait = json!({"Type": "Wait", "SecondsPath": "$.wait", "End": true});
    let def = json!({
        "StartAt": "W",
        "States": { "W": wait }
    });

    drive(&state, &arn, def, Some(r#"{"wait":0}"#));

    let status = read_exec(&state, &arn, |e| e.status);
    assert_eq!(status, ExecutionStatus::Succeeded);
}
/// A `Map` state whose `ItemsPath` resolves to an empty array must still let
/// the execution succeed.
#[test]
fn map_state_empty_array_succeeds() {
    let state = make_state();
    let arn = arn_for("map-empty");
    let processor = json!({
        "StartAt": "P",
        "States": {
            "P": {"Type": "Pass", "End": true}
        }
    });
    let def = json!({
        "StartAt": "M",
        "States": {
            "M": {
                "Type": "Map",
                "ItemsPath": "$.items",
                "ItemProcessor": processor,
                "End": true
            }
        }
    });

    drive(&state, &arn, def, Some(r#"{"items":[]}"#));

    let status = read_exec(&state, &arn, |e| e.status);
    assert_eq!(status, ExecutionStatus::Succeeded);
}
/// A `Fail` state carrying explicit `Error` and `Cause` fields must fail the
/// execution and surface both values on the execution record.
///
/// The cause is now asserted as well as the error, since the test name
/// promises both.
#[test]
fn fail_state_with_explicit_error_and_cause() {
    let state = make_state();
    let arn = arn_for("fail-fields");
    // NOTE(review): sibling tests call `drive` without creating the execution
    // first; kept as-is because `drive`'s setup is not visible here.
    create_execution(&state, &arn, None);
    let def = json!({
        "StartAt": "F",
        "States": {
            "F": {"Type": "Fail", "Error": "MyError", "Cause": "my cause"}
        }
    });
    drive(&state, &arn, def, None);
    let status = read_exec(&state, &arn, |e| e.status);
    assert_eq!(status, ExecutionStatus::Failed);
    let err = read_exec(&state, &arn, |e| e.error.clone().unwrap_or_default());
    assert_eq!(err, "MyError");
    let cause = read_exec(&state, &arn, |e| e.cause.clone().unwrap_or_default());
    assert_eq!(cause, "my cause");
}
}