Skip to main content

fakecloud_stepfunctions/
interpreter.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use chrono::Utc;
5use serde_json::{json, Value};
6use tracing::{debug, warn};
7
8use fakecloud_aws::arn::Arn;
9use fakecloud_core::delivery::DeliveryBus;
10use fakecloud_dynamodb::state::SharedDynamoDbState;
11
12use crate::choice::evaluate_choice;
13use crate::error_handling::{find_catcher, should_retry};
14use crate::io_processing::{apply_input_path, apply_output_path, apply_result_path};
15use crate::state::{ExecutionStatus, HistoryEvent, SharedStepFunctionsState};
16
17/// Execute a state machine definition with the given input.
18/// Updates the execution record in shared state as it progresses.
19pub async fn execute_state_machine(
20    state: SharedStepFunctionsState,
21    execution_arn: String,
22    definition: String,
23    input: Option<String>,
24    delivery: Option<Arc<DeliveryBus>>,
25    dynamodb_state: Option<SharedDynamoDbState>,
26) {
27    let def: Value = match serde_json::from_str(&definition) {
28        Ok(v) => v,
29        Err(e) => {
30            fail_execution(
31                &state,
32                &execution_arn,
33                "States.Runtime",
34                &format!("Failed to parse definition: {e}"),
35            );
36            return;
37        }
38    };
39
40    let raw_input: Value = input
41        .as_deref()
42        .and_then(|s| serde_json::from_str(s).ok())
43        .unwrap_or(json!({}));
44
45    // Record ExecutionStarted event
46    add_event(
47        &state,
48        &execution_arn,
49        "ExecutionStarted",
50        0,
51        json!({
52            "input": serde_json::to_string(&raw_input).unwrap_or_default(),
53            "roleArn": "arn:aws:iam::123456789012:role/execution-role"
54        }),
55    );
56
57    // Run the state machine inside an inner tokio::spawn so that any panic
58    // bubbles up as a JoinError instead of tearing down the caller. Without
59    // this the panic propagates through the outer spawn in `start_execution`
60    // which leaves the execution stuck in Running and leaks the panic to
61    // tokio's default hook.
62    let def_owned = def;
63    let state_clone = state.clone();
64    let execution_arn_clone = execution_arn.clone();
65    let delivery_clone = delivery.clone();
66    let dynamodb_state_clone = dynamodb_state.clone();
67    let handle = tokio::spawn(async move {
68        run_states(
69            &def_owned,
70            raw_input,
71            &delivery_clone,
72            &dynamodb_state_clone,
73            &state_clone,
74            &execution_arn_clone,
75        )
76        .await
77    });
78
79    match handle.await {
80        Ok(Ok(output)) => {
81            succeed_execution(&state, &execution_arn, &output);
82        }
83        Ok(Err((error, cause))) => {
84            fail_execution(&state, &execution_arn, &error, &cause);
85        }
86        Err(join_err) => {
87            let msg = if join_err.is_panic() {
88                let payload = join_err.into_panic();
89                if let Some(s) = payload.downcast_ref::<String>() {
90                    s.clone()
91                } else if let Some(s) = payload.downcast_ref::<&'static str>() {
92                    (*s).to_string()
93                } else {
94                    "execution task panicked".to_string()
95                }
96            } else {
97                format!("execution task cancelled: {join_err}")
98            };
99            tracing::error!(
100                execution_arn = %execution_arn,
101                panic = %msg,
102                "Step Functions execution panicked"
103            );
104            fail_execution(&state, &execution_arn, "States.Runtime", &msg);
105        }
106    }
107}
108
/// Boxed, `Send` future returned by `run_states`.
///
/// It resolves to the output value of the run on success, or to an
/// `(error, cause)` pair on failure. `run_states` is reached again from the Parallel
/// and Map handlers, so the future is returned boxed behind a `Pin` rather than as a
/// plain `async fn` future.
type StatesResult<'a> = std::pin::Pin<
    Box<dyn std::future::Future<Output = Result<Value, (String, String)>> + Send + 'a>,
>;
112
113/// Core execution loop: runs through states in a definition and returns the output.
114/// Used by the top-level executor, Parallel branches, and Map iterations.
115fn run_states<'a>(
116    def: &'a Value,
117    input: Value,
118    delivery: &'a Option<Arc<DeliveryBus>>,
119    dynamodb_state: &'a Option<SharedDynamoDbState>,
120    shared_state: &'a SharedStepFunctionsState,
121    execution_arn: &'a str,
122) -> StatesResult<'a> {
123    Box::pin(async move {
124        let start_at = def["StartAt"]
125            .as_str()
126            .ok_or_else(|| {
127                (
128                    "States.Runtime".to_string(),
129                    "Missing StartAt in definition".to_string(),
130                )
131            })?
132            .to_string();
133
134        let states = def.get("States").ok_or_else(|| {
135            (
136                "States.Runtime".to_string(),
137                "Missing States in definition".to_string(),
138            )
139        })?;
140
141        let mut current_state = start_at;
142        let mut effective_input = input;
143        let mut iteration = 0;
144        let max_iterations = 500;
145
146        loop {
147            iteration += 1;
148            if iteration > max_iterations {
149                return Err((
150                    "States.Runtime".to_string(),
151                    "Maximum number of state transitions exceeded".to_string(),
152                ));
153            }
154
155            let state_def = states.get(&current_state).cloned().ok_or_else(|| {
156                (
157                    "States.Runtime".to_string(),
158                    format!("State '{current_state}' not found in definition"),
159                )
160            })?;
161
162            let state_type = state_def["Type"]
163                .as_str()
164                .ok_or_else(|| {
165                    (
166                        "States.Runtime".to_string(),
167                        format!("State '{current_state}' missing Type field"),
168                    )
169                })?
170                .to_string();
171
172            debug!(
173                execution_arn = %execution_arn,
174                state = %current_state,
175                state_type = %state_type,
176                "Executing state"
177            );
178
179            let advance = match state_type.as_str() {
180                "Pass" => run_pass_state(
181                    &current_state,
182                    &state_def,
183                    effective_input,
184                    shared_state,
185                    execution_arn,
186                ),
187                "Succeed" => run_succeed_state(
188                    &current_state,
189                    &state_def,
190                    effective_input,
191                    shared_state,
192                    execution_arn,
193                ),
194                "Fail" => run_fail_state(
195                    &current_state,
196                    &state_def,
197                    effective_input,
198                    shared_state,
199                    execution_arn,
200                ),
201                "Choice" => run_choice_state(
202                    &current_state,
203                    &state_def,
204                    effective_input,
205                    shared_state,
206                    execution_arn,
207                ),
208                "Wait" => {
209                    run_wait_state(
210                        &current_state,
211                        &state_def,
212                        effective_input,
213                        shared_state,
214                        execution_arn,
215                    )
216                    .await
217                }
218                "Task" => {
219                    run_task_state(
220                        &current_state,
221                        &state_def,
222                        effective_input,
223                        delivery,
224                        dynamodb_state,
225                        shared_state,
226                        execution_arn,
227                    )
228                    .await
229                }
230                "Parallel" => {
231                    run_parallel_state(
232                        &current_state,
233                        &state_def,
234                        effective_input,
235                        delivery,
236                        dynamodb_state,
237                        shared_state,
238                        execution_arn,
239                    )
240                    .await
241                }
242                "Map" => {
243                    run_map_state(
244                        &current_state,
245                        &state_def,
246                        effective_input,
247                        delivery,
248                        dynamodb_state,
249                        shared_state,
250                        execution_arn,
251                    )
252                    .await
253                }
254                other => Advance::Fail(
255                    "States.Runtime".to_string(),
256                    format!("Unsupported state type: '{other}'"),
257                ),
258            };
259
260            match advance {
261                Advance::Next(next, new_input) => {
262                    effective_input = new_input;
263                    current_state = next;
264                }
265                Advance::End(output) => return Ok(output),
266                Advance::Fail(error, cause) => return Err((error, cause)),
267            }
268        }
269    })
270}
271
/// Result of executing a single state in the state machine.
///
/// Every per-state handler returns one of these; the execution loop reads it to
/// decide whether to continue with another state, finish with an output, or fail.
enum Advance {
    /// Continue to the given state (its name) with the given input.
    Next(String, Value),
    /// Terminate the state machine with the given output.
    End(Value),
    /// Fail the state machine with the given error and cause, in that order.
    Fail(String, String),
}
281
282fn advance_from_next(state_def: &Value, input: Value) -> Advance {
283    match next_state(state_def) {
284        NextState::Name(next) => Advance::Next(next, input),
285        NextState::End => Advance::End(input),
286        NextState::Error(msg) => Advance::Fail("States.Runtime".to_string(), msg),
287    }
288}
289
290fn advance_from_error(state_def: &Value, input: &Value, error: String, cause: String) -> Advance {
291    match apply_state_catcher(state_def, input, &error, &cause) {
292        Some((next, new_input)) => Advance::Next(next, new_input),
293        None => Advance::Fail(error, cause),
294    }
295}
296
297fn run_pass_state(
298    name: &str,
299    state_def: &Value,
300    input: Value,
301    shared_state: &SharedStepFunctionsState,
302    execution_arn: &str,
303) -> Advance {
304    let entered_event_id = add_event(
305        shared_state,
306        execution_arn,
307        "PassStateEntered",
308        0,
309        json!({
310            "name": name,
311            "input": serde_json::to_string(&input).unwrap_or_default(),
312        }),
313    );
314
315    let result = execute_pass_state(state_def, &input);
316
317    add_event(
318        shared_state,
319        execution_arn,
320        "PassStateExited",
321        entered_event_id,
322        json!({
323            "name": name,
324            "output": serde_json::to_string(&result).unwrap_or_default(),
325        }),
326    );
327
328    advance_from_next(state_def, result)
329}
330
331fn run_succeed_state(
332    name: &str,
333    state_def: &Value,
334    input: Value,
335    shared_state: &SharedStepFunctionsState,
336    execution_arn: &str,
337) -> Advance {
338    add_event(
339        shared_state,
340        execution_arn,
341        "SucceedStateEntered",
342        0,
343        json!({
344            "name": name,
345            "input": serde_json::to_string(&input).unwrap_or_default(),
346        }),
347    );
348
349    let input_path = state_def["InputPath"].as_str();
350    let output_path = state_def["OutputPath"].as_str();
351
352    let processed = if input_path == Some("null") {
353        json!({})
354    } else {
355        apply_input_path(&input, input_path)
356    };
357
358    let output = if output_path == Some("null") {
359        json!({})
360    } else {
361        apply_output_path(&processed, output_path)
362    };
363
364    add_event(
365        shared_state,
366        execution_arn,
367        "SucceedStateExited",
368        0,
369        json!({
370            "name": name,
371            "output": serde_json::to_string(&output).unwrap_or_default(),
372        }),
373    );
374
375    Advance::End(output)
376}
377
378fn run_fail_state(
379    name: &str,
380    state_def: &Value,
381    input: Value,
382    shared_state: &SharedStepFunctionsState,
383    execution_arn: &str,
384) -> Advance {
385    let error = state_def["Error"]
386        .as_str()
387        .unwrap_or("States.Fail")
388        .to_string();
389    let cause = state_def["Cause"].as_str().unwrap_or("").to_string();
390
391    add_event(
392        shared_state,
393        execution_arn,
394        "FailStateEntered",
395        0,
396        json!({
397            "name": name,
398            "input": serde_json::to_string(&input).unwrap_or_default(),
399        }),
400    );
401
402    Advance::Fail(error, cause)
403}
404
405fn run_choice_state(
406    name: &str,
407    state_def: &Value,
408    input: Value,
409    shared_state: &SharedStepFunctionsState,
410    execution_arn: &str,
411) -> Advance {
412    let entered_event_id = add_event(
413        shared_state,
414        execution_arn,
415        "ChoiceStateEntered",
416        0,
417        json!({
418            "name": name,
419            "input": serde_json::to_string(&input).unwrap_or_default(),
420        }),
421    );
422
423    let input_path = state_def["InputPath"].as_str();
424    let processed_input = if input_path == Some("null") {
425        json!({})
426    } else {
427        apply_input_path(&input, input_path)
428    };
429
430    match evaluate_choice(state_def, &processed_input) {
431        Some(next) => {
432            add_event(
433                shared_state,
434                execution_arn,
435                "ChoiceStateExited",
436                entered_event_id,
437                json!({
438                    "name": name,
439                    "output": serde_json::to_string(&input).unwrap_or_default(),
440                }),
441            );
442            Advance::Next(next, input)
443        }
444        None => Advance::Fail(
445            "States.NoChoiceMatched".to_string(),
446            format!("No choice rule matched and no Default in state '{name}'"),
447        ),
448    }
449}
450
451async fn run_wait_state(
452    name: &str,
453    state_def: &Value,
454    input: Value,
455    shared_state: &SharedStepFunctionsState,
456    execution_arn: &str,
457) -> Advance {
458    let entered_event_id = add_event(
459        shared_state,
460        execution_arn,
461        "WaitStateEntered",
462        0,
463        json!({
464            "name": name,
465            "input": serde_json::to_string(&input).unwrap_or_default(),
466        }),
467    );
468
469    execute_wait_state(state_def, &input).await;
470
471    add_event(
472        shared_state,
473        execution_arn,
474        "WaitStateExited",
475        entered_event_id,
476        json!({
477            "name": name,
478            "output": serde_json::to_string(&input).unwrap_or_default(),
479        }),
480    );
481
482    advance_from_next(state_def, input)
483}
484
485#[allow(clippy::too_many_arguments)]
486async fn run_task_state(
487    name: &str,
488    state_def: &Value,
489    input: Value,
490    delivery: &Option<Arc<DeliveryBus>>,
491    dynamodb_state: &Option<SharedDynamoDbState>,
492    shared_state: &SharedStepFunctionsState,
493    execution_arn: &str,
494) -> Advance {
495    let entered_event_id = add_event(
496        shared_state,
497        execution_arn,
498        "TaskStateEntered",
499        0,
500        json!({
501            "name": name,
502            "input": serde_json::to_string(&input).unwrap_or_default(),
503        }),
504    );
505
506    let result = execute_task_state(
507        state_def,
508        &input,
509        delivery,
510        dynamodb_state,
511        shared_state,
512        execution_arn,
513        entered_event_id,
514    )
515    .await;
516
517    match result {
518        Ok(output) => {
519            add_event(
520                shared_state,
521                execution_arn,
522                "TaskStateExited",
523                entered_event_id,
524                json!({
525                    "name": name,
526                    "output": serde_json::to_string(&output).unwrap_or_default(),
527                }),
528            );
529            advance_from_next(state_def, output)
530        }
531        Err((error, cause)) => advance_from_error(state_def, &input, error, cause),
532    }
533}
534
535#[allow(clippy::too_many_arguments)]
536async fn run_parallel_state(
537    name: &str,
538    state_def: &Value,
539    input: Value,
540    delivery: &Option<Arc<DeliveryBus>>,
541    dynamodb_state: &Option<SharedDynamoDbState>,
542    shared_state: &SharedStepFunctionsState,
543    execution_arn: &str,
544) -> Advance {
545    let entered_event_id = add_event(
546        shared_state,
547        execution_arn,
548        "ParallelStateEntered",
549        0,
550        json!({
551            "name": name,
552            "input": serde_json::to_string(&input).unwrap_or_default(),
553        }),
554    );
555
556    let result = execute_parallel_state(
557        state_def,
558        &input,
559        delivery,
560        dynamodb_state,
561        shared_state,
562        execution_arn,
563    )
564    .await;
565
566    match result {
567        Ok(output) => {
568            add_event(
569                shared_state,
570                execution_arn,
571                "ParallelStateExited",
572                entered_event_id,
573                json!({
574                    "name": name,
575                    "output": serde_json::to_string(&output).unwrap_or_default(),
576                }),
577            );
578            advance_from_next(state_def, output)
579        }
580        Err((error, cause)) => advance_from_error(state_def, &input, error, cause),
581    }
582}
583
584#[allow(clippy::too_many_arguments)]
585async fn run_map_state(
586    name: &str,
587    state_def: &Value,
588    input: Value,
589    delivery: &Option<Arc<DeliveryBus>>,
590    dynamodb_state: &Option<SharedDynamoDbState>,
591    shared_state: &SharedStepFunctionsState,
592    execution_arn: &str,
593) -> Advance {
594    let entered_event_id = add_event(
595        shared_state,
596        execution_arn,
597        "MapStateEntered",
598        0,
599        json!({
600            "name": name,
601            "input": serde_json::to_string(&input).unwrap_or_default(),
602        }),
603    );
604
605    let result = execute_map_state(
606        state_def,
607        &input,
608        delivery,
609        dynamodb_state,
610        shared_state,
611        execution_arn,
612    )
613    .await;
614
615    match result {
616        Ok(output) => {
617            add_event(
618                shared_state,
619                execution_arn,
620                "MapStateExited",
621                entered_event_id,
622                json!({
623                    "name": name,
624                    "output": serde_json::to_string(&output).unwrap_or_default(),
625                }),
626            );
627            advance_from_next(state_def, output)
628        }
629        Err((error, cause)) => advance_from_error(state_def, &input, error, cause),
630    }
631}
632
633/// Execute a Wait state: pause execution for a specified duration or until a timestamp.
634async fn execute_wait_state(state_def: &Value, input: &Value) {
635    if let Some(seconds) = state_def["Seconds"].as_u64() {
636        tokio::time::sleep(tokio::time::Duration::from_secs(seconds)).await;
637        return;
638    }
639
640    if let Some(path) = state_def["SecondsPath"].as_str() {
641        let val = crate::io_processing::resolve_path(input, path);
642        if let Some(seconds) = val.as_u64() {
643            tokio::time::sleep(tokio::time::Duration::from_secs(seconds)).await;
644        }
645        return;
646    }
647
648    if let Some(ts_str) = state_def["Timestamp"].as_str() {
649        if let Ok(target) = chrono::DateTime::parse_from_rfc3339(ts_str) {
650            let now = Utc::now();
651            let target_utc = target.with_timezone(&chrono::Utc);
652            if target_utc > now {
653                let duration = (target_utc - now).to_std().unwrap_or_default();
654                tokio::time::sleep(duration).await;
655            }
656        }
657        return;
658    }
659
660    if let Some(path) = state_def["TimestampPath"].as_str() {
661        let val = crate::io_processing::resolve_path(input, path);
662        if let Some(ts_str) = val.as_str() {
663            if let Ok(target) = chrono::DateTime::parse_from_rfc3339(ts_str) {
664                let now = Utc::now();
665                let target_utc = target.with_timezone(&chrono::Utc);
666                if target_utc > now {
667                    let duration = (target_utc - now).to_std().unwrap_or_default();
668                    tokio::time::sleep(duration).await;
669                }
670            }
671        }
672        return;
673    }
674
675    warn!(
676        "Wait state has no valid Seconds, SecondsPath, Timestamp, or TimestampPath — skipping wait"
677    );
678}
679
680/// Execute a Pass state: apply InputPath, use Result if present, apply ResultPath and OutputPath.
681fn execute_pass_state(state_def: &Value, input: &Value) -> Value {
682    let input_path = state_def["InputPath"].as_str();
683    let result_path = state_def["ResultPath"].as_str();
684    let output_path = state_def["OutputPath"].as_str();
685
686    let effective_input = if input_path == Some("null") {
687        json!({})
688    } else {
689        apply_input_path(input, input_path)
690    };
691
692    let result = if let Some(r) = state_def.get("Result") {
693        r.clone()
694    } else {
695        effective_input.clone()
696    };
697
698    let after_result = if result_path == Some("null") {
699        input.clone()
700    } else {
701        apply_result_path(input, &result, result_path)
702    };
703
704    if output_path == Some("null") {
705        json!({})
706    } else {
707        apply_output_path(&after_result, output_path)
708    }
709}
710
711/// Execute a Task state: invoke the resource (Lambda, SQS, SNS, EventBridge, DynamoDB),
712/// apply I/O processing, handle Retry.
713async fn execute_task_state(
714    state_def: &Value,
715    input: &Value,
716    delivery: &Option<Arc<DeliveryBus>>,
717    dynamodb_state: &Option<SharedDynamoDbState>,
718    shared_state: &SharedStepFunctionsState,
719    execution_arn: &str,
720    entered_event_id: i64,
721) -> Result<Value, (String, String)> {
722    let resource = state_def["Resource"].as_str().unwrap_or("").to_string();
723
724    let input_path = state_def["InputPath"].as_str();
725    let result_path = state_def["ResultPath"].as_str();
726    let output_path = state_def["OutputPath"].as_str();
727
728    let effective_input = if input_path == Some("null") {
729        json!({})
730    } else {
731        apply_input_path(input, input_path)
732    };
733
734    let task_input = if let Some(params) = state_def.get("Parameters") {
735        apply_parameters(params, &effective_input)
736    } else {
737        effective_input
738    };
739
740    let retriers = state_def["Retry"].as_array().cloned().unwrap_or_default();
741    let timeout_seconds = state_def["TimeoutSeconds"].as_u64();
742    let mut attempt = 0u32;
743
744    loop {
745        add_event(
746            shared_state,
747            execution_arn,
748            "TaskScheduled",
749            entered_event_id,
750            json!({
751                "resource": resource,
752                "region": "us-east-1",
753                "parameters": serde_json::to_string(&task_input).unwrap_or_default(),
754            }),
755        );
756
757        add_event(
758            shared_state,
759            execution_arn,
760            "TaskStarted",
761            entered_event_id,
762            json!({ "resource": resource }),
763        );
764
765        let invoke_result = invoke_resource(
766            &resource,
767            &task_input,
768            delivery,
769            dynamodb_state,
770            timeout_seconds,
771        )
772        .await;
773
774        match invoke_result {
775            Ok(result) => {
776                add_event(
777                    shared_state,
778                    execution_arn,
779                    "TaskSucceeded",
780                    entered_event_id,
781                    json!({
782                        "resource": resource,
783                        "output": serde_json::to_string(&result).unwrap_or_default(),
784                    }),
785                );
786
787                let selected = if let Some(selector) = state_def.get("ResultSelector") {
788                    apply_parameters(selector, &result)
789                } else {
790                    result
791                };
792
793                let after_result = if result_path == Some("null") {
794                    input.clone()
795                } else {
796                    apply_result_path(input, &selected, result_path)
797                };
798
799                let output = if output_path == Some("null") {
800                    json!({})
801                } else {
802                    apply_output_path(&after_result, output_path)
803                };
804
805                return Ok(output);
806            }
807            Err((error, cause)) => {
808                add_event(
809                    shared_state,
810                    execution_arn,
811                    "TaskFailed",
812                    entered_event_id,
813                    json!({ "error": error, "cause": cause }),
814                );
815
816                if let Some(delay_ms) = should_retry(&retriers, &error, attempt) {
817                    attempt += 1;
818                    let actual_delay = delay_ms.min(5000);
819                    tokio::time::sleep(tokio::time::Duration::from_millis(actual_delay)).await;
820                    continue;
821                }
822
823                return Err((error, cause));
824            }
825        }
826    }
827}
828
829/// Execute a Parallel state: run all branches concurrently, collect results into an array.
830async fn execute_parallel_state(
831    state_def: &Value,
832    input: &Value,
833    delivery: &Option<Arc<DeliveryBus>>,
834    dynamodb_state: &Option<SharedDynamoDbState>,
835    shared_state: &SharedStepFunctionsState,
836    execution_arn: &str,
837) -> Result<Value, (String, String)> {
838    let input_path = state_def["InputPath"].as_str();
839    let result_path = state_def["ResultPath"].as_str();
840    let output_path = state_def["OutputPath"].as_str();
841
842    let effective_input = if input_path == Some("null") {
843        json!({})
844    } else {
845        apply_input_path(input, input_path)
846    };
847
848    let branches = state_def["Branches"]
849        .as_array()
850        .cloned()
851        .unwrap_or_default();
852
853    if branches.is_empty() {
854        return Err((
855            "States.Runtime".to_string(),
856            "Parallel state has no Branches".to_string(),
857        ));
858    }
859
860    // Spawn all branches concurrently
861    let mut handles = Vec::new();
862    for branch_def in &branches {
863        let branch = branch_def.clone();
864        let branch_input = effective_input.clone();
865        let delivery = delivery.clone();
866        let ddb = dynamodb_state.clone();
867        let state = shared_state.clone();
868        let arn = execution_arn.to_string();
869
870        handles.push(tokio::spawn(async move {
871            run_states(&branch, branch_input, &delivery, &ddb, &state, &arn).await
872        }));
873    }
874
875    // Collect results in order
876    let mut results = Vec::with_capacity(handles.len());
877    for handle in handles {
878        let result = handle.await.map_err(|e| {
879            (
880                "States.Runtime".to_string(),
881                format!("Branch execution panicked: {e}"),
882            )
883        })??;
884        results.push(result);
885    }
886
887    let branch_output = Value::Array(results);
888
889    // Apply ResultSelector if present
890    let selected = if let Some(selector) = state_def.get("ResultSelector") {
891        apply_parameters(selector, &branch_output)
892    } else {
893        branch_output
894    };
895
896    // Apply ResultPath
897    let after_result = if result_path == Some("null") {
898        input.clone()
899    } else {
900        apply_result_path(input, &selected, result_path)
901    };
902
903    // Apply OutputPath
904    let output = if output_path == Some("null") {
905        json!({})
906    } else {
907        apply_output_path(&after_result, output_path)
908    };
909
910    Ok(output)
911}
912
913/// Execute a Map state: iterate over an array and run a sub-state machine per item.
914async fn execute_map_state(
915    state_def: &Value,
916    input: &Value,
917    delivery: &Option<Arc<DeliveryBus>>,
918    dynamodb_state: &Option<SharedDynamoDbState>,
919    shared_state: &SharedStepFunctionsState,
920    execution_arn: &str,
921) -> Result<Value, (String, String)> {
922    let input_path = state_def["InputPath"].as_str();
923    let result_path = state_def["ResultPath"].as_str();
924    let output_path = state_def["OutputPath"].as_str();
925
926    let effective_input = if input_path == Some("null") {
927        json!({})
928    } else {
929        apply_input_path(input, input_path)
930    };
931
932    // Get the items to iterate over
933    let items_path = state_def["ItemsPath"].as_str().unwrap_or("$");
934    let items_value = crate::io_processing::resolve_path(&effective_input, items_path);
935    let items = items_value.as_array().cloned().unwrap_or_default();
936
937    // Get the iterator definition (ItemProcessor or Iterator for backwards compat)
938    let iterator_def = state_def
939        .get("ItemProcessor")
940        .or_else(|| state_def.get("Iterator"))
941        .cloned()
942        .ok_or_else(|| {
943            (
944                "States.Runtime".to_string(),
945                "Map state has no ItemProcessor or Iterator".to_string(),
946            )
947        })?;
948
949    let max_concurrency = state_def["MaxConcurrency"].as_u64().unwrap_or(0);
950    let effective_concurrency = if max_concurrency == 0 {
951        40
952    } else {
953        max_concurrency as usize
954    };
955
956    let semaphore = Arc::new(tokio::sync::Semaphore::new(effective_concurrency));
957
958    // Process all items
959    let mut handles = Vec::new();
960    for (index, item) in items.into_iter().enumerate() {
961        let iter_def = iterator_def.clone();
962        let delivery = delivery.clone();
963        let ddb = dynamodb_state.clone();
964        let state = shared_state.clone();
965        let arn = execution_arn.to_string();
966        let sem = semaphore.clone();
967
968        // Apply ItemSelector if present
969        let item_input = if let Some(selector) = state_def.get("ItemSelector") {
970            let mut ctx = serde_json::Map::new();
971            ctx.insert("value".to_string(), item.clone());
972            ctx.insert("index".to_string(), json!(index));
973            apply_parameters(selector, &Value::Object(ctx))
974        } else {
975            item
976        };
977
978        add_event(
979            shared_state,
980            execution_arn,
981            "MapIterationStarted",
982            0,
983            json!({ "index": index }),
984        );
985
986        handles.push(tokio::spawn(async move {
987            let _permit = sem
988                .acquire()
989                .await
990                .map_err(|_| ("States.Runtime".to_string(), "Semaphore closed".to_string()))?;
991            let result = run_states(&iter_def, item_input, &delivery, &ddb, &state, &arn).await;
992            Ok::<(usize, Result<Value, (String, String)>), (String, String)>((index, result))
993        }));
994    }
995
996    // Collect results in order
997    let mut results: Vec<(usize, Value)> = Vec::with_capacity(handles.len());
998    for handle in handles {
999        let (index, result) = handle.await.map_err(|e| {
1000            (
1001                "States.Runtime".to_string(),
1002                format!("Map iteration panicked: {e}"),
1003            )
1004        })??;
1005
1006        match result {
1007            Ok(output) => {
1008                add_event(
1009                    shared_state,
1010                    execution_arn,
1011                    "MapIterationSucceeded",
1012                    0,
1013                    json!({ "index": index }),
1014                );
1015                results.push((index, output));
1016            }
1017            Err((error, cause)) => {
1018                add_event(
1019                    shared_state,
1020                    execution_arn,
1021                    "MapIterationFailed",
1022                    0,
1023                    json!({ "index": index, "error": error }),
1024                );
1025                return Err((error, cause));
1026            }
1027        }
1028    }
1029
1030    // Sort by index to maintain order
1031    results.sort_by_key(|(i, _)| *i);
1032    let map_output = Value::Array(results.into_iter().map(|(_, v)| v).collect());
1033
1034    // Apply ResultSelector if present
1035    let selected = if let Some(selector) = state_def.get("ResultSelector") {
1036        apply_parameters(selector, &map_output)
1037    } else {
1038        map_output
1039    };
1040
1041    // Apply ResultPath
1042    let after_result = if result_path == Some("null") {
1043        input.clone()
1044    } else {
1045        apply_result_path(input, &selected, result_path)
1046    };
1047
1048    // Apply OutputPath
1049    let output = if output_path == Some("null") {
1050        json!({})
1051    } else {
1052        apply_output_path(&after_result, output_path)
1053    };
1054
1055    Ok(output)
1056}
1057
1058/// Invoke a resource (Lambda function or SDK integration).
1059async fn invoke_resource(
1060    resource: &str,
1061    input: &Value,
1062    delivery: &Option<Arc<DeliveryBus>>,
1063    dynamodb_state: &Option<SharedDynamoDbState>,
1064    timeout_seconds: Option<u64>,
1065) -> Result<Value, (String, String)> {
1066    // Direct Lambda ARN: arn:aws:lambda:<region>:<account>:function:<name>
1067    if resource.contains(":lambda:") && resource.contains(":function:") {
1068        return invoke_lambda_direct(resource, input, delivery, timeout_seconds).await;
1069    }
1070
1071    // SDK integration patterns: arn:aws:states:::<service>:<action>
1072    if resource.starts_with("arn:aws:states:::lambda:invoke") {
1073        let function_name = input["FunctionName"].as_str().unwrap_or("");
1074        let payload = if let Some(p) = input.get("Payload") {
1075            p.clone()
1076        } else {
1077            input.clone()
1078        };
1079        return invoke_lambda_direct(function_name, &payload, delivery, timeout_seconds).await;
1080    }
1081
1082    if resource.starts_with("arn:aws:states:::sqs:sendMessage") {
1083        return invoke_sqs_send_message(input, delivery);
1084    }
1085
1086    if resource.starts_with("arn:aws:states:::sns:publish") {
1087        return invoke_sns_publish(input, delivery);
1088    }
1089
1090    if resource.starts_with("arn:aws:states:::events:putEvents") {
1091        return invoke_eventbridge_put_events(input, delivery);
1092    }
1093
1094    if resource.starts_with("arn:aws:states:::dynamodb:getItem") {
1095        return invoke_dynamodb_get_item(input, dynamodb_state);
1096    }
1097
1098    if resource.starts_with("arn:aws:states:::dynamodb:putItem") {
1099        return invoke_dynamodb_put_item(input, dynamodb_state);
1100    }
1101
1102    if resource.starts_with("arn:aws:states:::dynamodb:deleteItem") {
1103        return invoke_dynamodb_delete_item(input, dynamodb_state);
1104    }
1105
1106    if resource.starts_with("arn:aws:states:::dynamodb:updateItem") {
1107        return invoke_dynamodb_update_item(input, dynamodb_state);
1108    }
1109
1110    Err((
1111        "States.TaskFailed".to_string(),
1112        format!("Unsupported resource: {resource}"),
1113    ))
1114}
1115
1116/// Send a message to an SQS queue via DeliveryBus.
1117fn invoke_sqs_send_message(
1118    input: &Value,
1119    delivery: &Option<Arc<DeliveryBus>>,
1120) -> Result<Value, (String, String)> {
1121    let delivery = delivery.as_ref().ok_or_else(|| {
1122        (
1123            "States.TaskFailed".to_string(),
1124            "No delivery bus configured for SQS".to_string(),
1125        )
1126    })?;
1127
1128    let queue_url = input["QueueUrl"].as_str().ok_or_else(|| {
1129        (
1130            "States.TaskFailed".to_string(),
1131            "Missing QueueUrl in SQS sendMessage parameters".to_string(),
1132        )
1133    })?;
1134
1135    let message_body = input["MessageBody"]
1136        .as_str()
1137        .map(|s| s.to_string())
1138        .unwrap_or_else(|| {
1139            // If MessageBody is not a string, serialize the value
1140            serde_json::to_string(&input["MessageBody"]).unwrap_or_default()
1141        });
1142
1143    // Convert QueueUrl to ARN format for the delivery bus
1144    // QueueUrl format: http://.../<account>/<queue-name>
1145    // ARN format: arn:aws:sqs:<region>:<account>:<queue-name>
1146    let queue_arn = queue_url_to_arn(queue_url);
1147
1148    delivery.send_to_sqs(&queue_arn, &message_body, &HashMap::new());
1149
1150    Ok(json!({
1151        "MessageId": uuid::Uuid::new_v4().to_string(),
1152        "MD5OfMessageBody": md5_hex(&message_body),
1153    }))
1154}
1155
1156/// Publish a message to an SNS topic via DeliveryBus.
1157fn invoke_sns_publish(
1158    input: &Value,
1159    delivery: &Option<Arc<DeliveryBus>>,
1160) -> Result<Value, (String, String)> {
1161    let delivery = delivery.as_ref().ok_or_else(|| {
1162        (
1163            "States.TaskFailed".to_string(),
1164            "No delivery bus configured for SNS".to_string(),
1165        )
1166    })?;
1167
1168    let topic_arn = input["TopicArn"].as_str().ok_or_else(|| {
1169        (
1170            "States.TaskFailed".to_string(),
1171            "Missing TopicArn in SNS publish parameters".to_string(),
1172        )
1173    })?;
1174
1175    let message = input["Message"]
1176        .as_str()
1177        .map(|s| s.to_string())
1178        .unwrap_or_else(|| serde_json::to_string(&input["Message"]).unwrap_or_default());
1179
1180    let subject = input["Subject"].as_str();
1181
1182    delivery.publish_to_sns(topic_arn, &message, subject);
1183
1184    Ok(json!({
1185        "MessageId": uuid::Uuid::new_v4().to_string(),
1186    }))
1187}
1188
1189/// Put events onto an EventBridge bus via DeliveryBus.
1190fn invoke_eventbridge_put_events(
1191    input: &Value,
1192    delivery: &Option<Arc<DeliveryBus>>,
1193) -> Result<Value, (String, String)> {
1194    let delivery = delivery.as_ref().ok_or_else(|| {
1195        (
1196            "States.TaskFailed".to_string(),
1197            "No delivery bus configured for EventBridge".to_string(),
1198        )
1199    })?;
1200
1201    let entries = input["Entries"]
1202        .as_array()
1203        .ok_or_else(|| {
1204            (
1205                "States.TaskFailed".to_string(),
1206                "Missing Entries in EventBridge putEvents parameters".to_string(),
1207            )
1208        })?
1209        .clone();
1210
1211    let mut event_ids = Vec::new();
1212    for entry in &entries {
1213        let source = entry["Source"].as_str().unwrap_or("aws.stepfunctions");
1214        let detail_type = entry["DetailType"].as_str().unwrap_or("StepFunctionsEvent");
1215        let detail = entry["Detail"]
1216            .as_str()
1217            .map(|s| s.to_string())
1218            .unwrap_or_else(|| serde_json::to_string(&entry["Detail"]).unwrap_or("{}".to_string()));
1219        let bus_name = entry["EventBusName"].as_str().unwrap_or("default");
1220
1221        delivery.put_event_to_eventbridge(source, detail_type, &detail, bus_name);
1222        event_ids.push(uuid::Uuid::new_v4().to_string());
1223    }
1224
1225    Ok(json!({
1226        "Entries": event_ids.iter().map(|id| json!({"EventId": id})).collect::<Vec<_>>(),
1227        "FailedEntryCount": 0,
1228    }))
1229}
1230
1231/// Get an item from DynamoDB via direct state access.
1232fn invoke_dynamodb_get_item(
1233    input: &Value,
1234    dynamodb_state: &Option<SharedDynamoDbState>,
1235) -> Result<Value, (String, String)> {
1236    let ddb = dynamodb_state.as_ref().ok_or_else(|| {
1237        (
1238            "States.TaskFailed".to_string(),
1239            "No DynamoDB state configured".to_string(),
1240        )
1241    })?;
1242
1243    let table_name = input["TableName"].as_str().ok_or_else(|| {
1244        (
1245            "States.TaskFailed".to_string(),
1246            "Missing TableName in DynamoDB getItem parameters".to_string(),
1247        )
1248    })?;
1249
1250    let key = input
1251        .get("Key")
1252        .and_then(|k| k.as_object())
1253        .ok_or_else(|| {
1254            (
1255                "States.TaskFailed".to_string(),
1256                "Missing Key in DynamoDB getItem parameters".to_string(),
1257            )
1258        })?;
1259
1260    let key_map: HashMap<String, Value> = key.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
1261
1262    let __mas = ddb.read();
1263    let state = __mas.default_ref();
1264    let table = state.tables.get(table_name).ok_or_else(|| {
1265        (
1266            "States.TaskFailed".to_string(),
1267            format!("Table '{table_name}' not found"),
1268        )
1269    })?;
1270
1271    let item = table
1272        .find_item_index(&key_map)
1273        .map(|idx| table.items[idx].clone());
1274
1275    match item {
1276        Some(item_map) => {
1277            let item_value: serde_json::Map<String, Value> = item_map.into_iter().collect();
1278            Ok(json!({ "Item": item_value }))
1279        }
1280        None => Ok(json!({})),
1281    }
1282}
1283
1284/// Put an item into DynamoDB via direct state access.
1285fn invoke_dynamodb_put_item(
1286    input: &Value,
1287    dynamodb_state: &Option<SharedDynamoDbState>,
1288) -> Result<Value, (String, String)> {
1289    let ddb = dynamodb_state.as_ref().ok_or_else(|| {
1290        (
1291            "States.TaskFailed".to_string(),
1292            "No DynamoDB state configured".to_string(),
1293        )
1294    })?;
1295
1296    let table_name = input["TableName"].as_str().ok_or_else(|| {
1297        (
1298            "States.TaskFailed".to_string(),
1299            "Missing TableName in DynamoDB putItem parameters".to_string(),
1300        )
1301    })?;
1302
1303    let item = input
1304        .get("Item")
1305        .and_then(|i| i.as_object())
1306        .ok_or_else(|| {
1307            (
1308                "States.TaskFailed".to_string(),
1309                "Missing Item in DynamoDB putItem parameters".to_string(),
1310            )
1311        })?;
1312
1313    let item_map: HashMap<String, Value> =
1314        item.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
1315
1316    let mut __mas = ddb.write();
1317    let state = __mas.default_mut();
1318    let table = state.tables.get_mut(table_name).ok_or_else(|| {
1319        (
1320            "States.TaskFailed".to_string(),
1321            format!("Table '{table_name}' not found"),
1322        )
1323    })?;
1324
1325    // Replace existing item with same key, or insert new
1326    if let Some(idx) = table.find_item_index(&item_map) {
1327        table.items[idx] = item_map;
1328    } else {
1329        table.items.push(item_map);
1330    }
1331
1332    Ok(json!({}))
1333}
1334
1335/// Delete an item from DynamoDB via direct state access.
1336fn invoke_dynamodb_delete_item(
1337    input: &Value,
1338    dynamodb_state: &Option<SharedDynamoDbState>,
1339) -> Result<Value, (String, String)> {
1340    let ddb = dynamodb_state.as_ref().ok_or_else(|| {
1341        (
1342            "States.TaskFailed".to_string(),
1343            "No DynamoDB state configured".to_string(),
1344        )
1345    })?;
1346
1347    let table_name = input["TableName"].as_str().ok_or_else(|| {
1348        (
1349            "States.TaskFailed".to_string(),
1350            "Missing TableName in DynamoDB deleteItem parameters".to_string(),
1351        )
1352    })?;
1353
1354    let key = input
1355        .get("Key")
1356        .and_then(|k| k.as_object())
1357        .ok_or_else(|| {
1358            (
1359                "States.TaskFailed".to_string(),
1360                "Missing Key in DynamoDB deleteItem parameters".to_string(),
1361            )
1362        })?;
1363
1364    let key_map: HashMap<String, Value> = key.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
1365
1366    let mut __mas = ddb.write();
1367    let state = __mas.default_mut();
1368    let table = state.tables.get_mut(table_name).ok_or_else(|| {
1369        (
1370            "States.TaskFailed".to_string(),
1371            format!("Table '{table_name}' not found"),
1372        )
1373    })?;
1374
1375    if let Some(idx) = table.find_item_index(&key_map) {
1376        table.items.remove(idx);
1377    }
1378
1379    Ok(json!({}))
1380}
1381
1382/// Update an item in DynamoDB via direct state access.
1383/// Simplified: merges Key+ExpressionAttributeValues into the existing item.
1384fn invoke_dynamodb_update_item(
1385    input: &Value,
1386    dynamodb_state: &Option<SharedDynamoDbState>,
1387) -> Result<Value, (String, String)> {
1388    let ddb = dynamodb_state.as_ref().ok_or_else(|| {
1389        (
1390            "States.TaskFailed".to_string(),
1391            "No DynamoDB state configured".to_string(),
1392        )
1393    })?;
1394
1395    let table_name = input["TableName"].as_str().ok_or_else(|| {
1396        (
1397            "States.TaskFailed".to_string(),
1398            "Missing TableName in DynamoDB updateItem parameters".to_string(),
1399        )
1400    })?;
1401
1402    let key = input
1403        .get("Key")
1404        .and_then(|k| k.as_object())
1405        .ok_or_else(|| {
1406            (
1407                "States.TaskFailed".to_string(),
1408                "Missing Key in DynamoDB updateItem parameters".to_string(),
1409            )
1410        })?;
1411
1412    let key_map: HashMap<String, Value> = key.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
1413
1414    let mut __mas = ddb.write();
1415    let state = __mas.default_mut();
1416    let table = state.tables.get_mut(table_name).ok_or_else(|| {
1417        (
1418            "States.TaskFailed".to_string(),
1419            format!("Table '{table_name}' not found"),
1420        )
1421    })?;
1422
1423    // Parse UpdateExpression to apply SET operations
1424    if let Some(update_expr) = input["UpdateExpression"].as_str() {
1425        let attr_values = input
1426            .get("ExpressionAttributeValues")
1427            .and_then(|v| v.as_object())
1428            .cloned()
1429            .unwrap_or_default();
1430        let attr_names = input
1431            .get("ExpressionAttributeNames")
1432            .and_then(|v| v.as_object())
1433            .cloned()
1434            .unwrap_or_default();
1435
1436        if let Some(idx) = table.find_item_index(&key_map) {
1437            apply_update_expression(
1438                &mut table.items[idx],
1439                update_expr,
1440                &attr_values,
1441                &attr_names,
1442            );
1443        } else {
1444            // Create new item with key + update expression values
1445            let mut new_item = key_map;
1446            apply_update_expression(&mut new_item, update_expr, &attr_values, &attr_names);
1447            table.items.push(new_item);
1448        }
1449    }
1450
1451    Ok(json!({}))
1452}
1453
1454/// Apply a simple SET UpdateExpression to an item.
1455fn apply_update_expression(
1456    item: &mut HashMap<String, Value>,
1457    expr: &str,
1458    attr_values: &serde_json::Map<String, Value>,
1459    attr_names: &serde_json::Map<String, Value>,
1460) {
1461    // Parse "SET #name1 = :val1, #name2 = :val2" or "SET field = :val"
1462    // DynamoDB keywords are case-insensitive
1463    let trimmed = expr.trim();
1464    let set_part = if trimmed.len() >= 4 && trimmed[..4].eq_ignore_ascii_case("SET ") {
1465        &trimmed[4..]
1466    } else {
1467        trimmed
1468    };
1469
1470    for assignment in set_part.split(',') {
1471        let parts: Vec<&str> = assignment.splitn(2, '=').collect();
1472        if parts.len() == 2 {
1473            let attr_ref = parts[0].trim();
1474            let val_ref = parts[1].trim();
1475
1476            // Resolve attribute name (could be #alias or direct name)
1477            let attr_name = if attr_ref.starts_with('#') {
1478                attr_names
1479                    .get(attr_ref)
1480                    .and_then(|v| v.as_str())
1481                    .unwrap_or(attr_ref)
1482                    .to_string()
1483            } else {
1484                attr_ref.to_string()
1485            };
1486
1487            // Resolve value
1488            if val_ref.starts_with(':') {
1489                if let Some(val) = attr_values.get(val_ref) {
1490                    item.insert(attr_name, val.clone());
1491                }
1492            }
1493        }
1494    }
1495}
1496
1497/// Convert an SQS queue URL to an ARN.
1498/// QueueUrl format: http://localhost:4566/123456789012/my-queue
1499fn queue_url_to_arn(url: &str) -> String {
1500    let parts: Vec<&str> = url.rsplitn(3, '/').collect();
1501    if parts.len() >= 2 {
1502        let queue_name = parts[0];
1503        let account_id = parts[1];
1504        Arn::new("sqs", "us-east-1", account_id, queue_name).to_string()
1505    } else {
1506        url.to_string()
1507    }
1508}
1509
1510/// Compute MD5 hex digest for SQS message response format.
1511fn md5_hex(data: &str) -> String {
1512    use md5::Digest;
1513    let result = md5::Md5::digest(data.as_bytes());
1514    format!("{result:032x}")
1515}
1516
1517/// Invoke a Lambda function directly via DeliveryBus.
1518async fn invoke_lambda_direct(
1519    function_arn: &str,
1520    input: &Value,
1521    delivery: &Option<Arc<DeliveryBus>>,
1522    timeout_seconds: Option<u64>,
1523) -> Result<Value, (String, String)> {
1524    let delivery = delivery.as_ref().ok_or_else(|| {
1525        (
1526            "States.TaskFailed".to_string(),
1527            "No delivery bus configured for Lambda invocation".to_string(),
1528        )
1529    })?;
1530
1531    let payload = serde_json::to_string(input).unwrap_or_default();
1532
1533    let invoke_future = delivery.invoke_lambda(function_arn, &payload);
1534
1535    let result = if let Some(timeout) = timeout_seconds {
1536        match tokio::time::timeout(tokio::time::Duration::from_secs(timeout), invoke_future).await {
1537            Ok(r) => r,
1538            Err(_) => {
1539                return Err((
1540                    "States.Timeout".to_string(),
1541                    format!("Task timed out after {timeout} seconds"),
1542                ));
1543            }
1544        }
1545    } else {
1546        invoke_future.await
1547    };
1548
1549    match result {
1550        Some(Ok(bytes)) => {
1551            let response_str = String::from_utf8_lossy(&bytes);
1552            let value: Value =
1553                serde_json::from_str(&response_str).unwrap_or(json!(response_str.to_string()));
1554            Ok(value)
1555        }
1556        Some(Err(e)) => Err(("States.TaskFailed".to_string(), e)),
1557        None => {
1558            // No runtime available — return empty result
1559            Ok(json!({}))
1560        }
1561    }
1562}
1563
1564/// Apply Parameters template: keys ending with .$ are treated as JsonPath references.
1565fn apply_parameters(template: &Value, input: &Value) -> Value {
1566    match template {
1567        Value::Object(map) => {
1568            let mut result = serde_json::Map::new();
1569            for (key, value) in map {
1570                if let Some(stripped) = key.strip_suffix(".$") {
1571                    if let Some(path) = value.as_str() {
1572                        result.insert(
1573                            stripped.to_string(),
1574                            crate::io_processing::resolve_path(input, path),
1575                        );
1576                    }
1577                } else {
1578                    result.insert(key.clone(), apply_parameters(value, input));
1579                }
1580            }
1581            Value::Object(result)
1582        }
1583        Value::Array(arr) => Value::Array(arr.iter().map(|v| apply_parameters(v, input)).collect()),
1584        other => other.clone(),
1585    }
1586}
1587
/// Outcome of inspecting a state's `End` / `Next` fields (see `next_state`).
enum NextState {
    /// The state names a successor via its `Next` field.
    Name(String),
    /// The state has `"End": true`.
    End,
    /// The state has neither a usable `End` nor `Next`; carries the message.
    Error(String),
}
1593
1594fn next_state(state_def: &Value) -> NextState {
1595    if state_def["End"].as_bool() == Some(true) {
1596        return NextState::End;
1597    }
1598    match state_def["Next"].as_str() {
1599        Some(next) => NextState::Name(next.to_string()),
1600        None => NextState::Error("State has neither 'End' nor 'Next' field".to_string()),
1601    }
1602}
1603
1604/// Find the first `Catch` clause on `state_def` that matches `error` and
1605/// apply its `ResultPath` to produce the state to transition to and the
1606/// new effective input. Returns None when no catcher applies, in which
1607/// case the error should propagate up.
1608fn apply_state_catcher(
1609    state_def: &Value,
1610    effective_input: &Value,
1611    error: &str,
1612    cause: &str,
1613) -> Option<(String, Value)> {
1614    let catchers = state_def["Catch"].as_array().cloned().unwrap_or_default();
1615    let (next, result_path) = find_catcher(&catchers, error)?;
1616    let error_output = json!({
1617        "Error": error,
1618        "Cause": cause,
1619    });
1620    let new_input = apply_result_path(effective_input, &error_output, result_path.as_deref());
1621    Some((next, new_input))
1622}
1623
/// Return the account-id field of an ARN (`arn:aws:states:region:account_id:...`),
/// i.e. the fifth `:`-separated field. Falls back to `"000000000000"` when the
/// string has fewer than five fields.
fn account_id_from_arn(arn: &str) -> &str {
    // Only the first five fields matter; everything after them stays unsplit.
    let mut fields = arn.splitn(6, ':');
    match fields.nth(4) {
        Some(account_id) => account_id,
        None => "000000000000",
    }
}
1628
1629fn add_event(
1630    state: &SharedStepFunctionsState,
1631    execution_arn: &str,
1632    event_type: &str,
1633    previous_event_id: i64,
1634    details: Value,
1635) -> i64 {
1636    let account_id = account_id_from_arn(execution_arn).to_string();
1637    let mut accounts = state.write();
1638    let s = accounts.get_or_create(&account_id);
1639    if let Some(exec) = s.executions.get_mut(execution_arn) {
1640        let id = exec.history_events.len() as i64 + 1;
1641        exec.history_events.push(HistoryEvent {
1642            id,
1643            event_type: event_type.to_string(),
1644            timestamp: Utc::now(),
1645            previous_event_id,
1646            details,
1647        });
1648        id
1649    } else {
1650        0
1651    }
1652}
1653
1654fn succeed_execution(state: &SharedStepFunctionsState, execution_arn: &str, output: &Value) {
1655    let account_id = account_id_from_arn(execution_arn).to_string();
1656    // Check terminal status before recording events to avoid inconsistent history
1657    {
1658        let accounts = state.read();
1659        if let Some(s) = accounts.get(&account_id) {
1660            if let Some(exec) = s.executions.get(execution_arn) {
1661                if exec.status != ExecutionStatus::Running {
1662                    return;
1663                }
1664            }
1665        }
1666    }
1667
1668    let output_str = serde_json::to_string(output).unwrap_or_default();
1669
1670    add_event(
1671        state,
1672        execution_arn,
1673        "ExecutionSucceeded",
1674        0,
1675        json!({ "output": output_str }),
1676    );
1677
1678    let mut accounts = state.write();
1679    let s = accounts.get_or_create(&account_id);
1680    if let Some(exec) = s.executions.get_mut(execution_arn) {
1681        exec.status = ExecutionStatus::Succeeded;
1682        exec.output = Some(output_str);
1683        exec.stop_date = Some(Utc::now());
1684    }
1685}
1686
1687fn fail_execution(state: &SharedStepFunctionsState, execution_arn: &str, error: &str, cause: &str) {
1688    let account_id = account_id_from_arn(execution_arn).to_string();
1689    // Check terminal status before recording events to avoid inconsistent history
1690    {
1691        let accounts = state.read();
1692        if let Some(s) = accounts.get(&account_id) {
1693            if let Some(exec) = s.executions.get(execution_arn) {
1694                if exec.status != ExecutionStatus::Running {
1695                    return;
1696                }
1697            }
1698        }
1699    }
1700
1701    add_event(
1702        state,
1703        execution_arn,
1704        "ExecutionFailed",
1705        0,
1706        json!({ "error": error, "cause": cause }),
1707    );
1708
1709    let mut accounts = state.write();
1710    let s = accounts.get_or_create(&account_id);
1711    if let Some(exec) = s.executions.get_mut(execution_arn) {
1712        exec.status = ExecutionStatus::Failed;
1713        exec.error = Some(error.to_string());
1714        exec.cause = Some(cause.to_string());
1715        exec.stop_date = Some(Utc::now());
1716    }
1717}
1718
1719#[cfg(test)]
1720mod tests {
1721    use super::*;
1722    use crate::state::Execution;
1723    use parking_lot::RwLock;
1724    use std::sync::Arc;
1725
1726    fn make_state() -> SharedStepFunctionsState {
1727        Arc::new(RwLock::new(
1728            fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", ""),
1729        ))
1730    }
1731
1732    fn create_execution(state: &SharedStepFunctionsState, arn: &str, input: Option<String>) {
1733        let mut accounts = state.write();
1734        let s = accounts.get_or_create("123456789012");
1735        s.executions.insert(
1736            arn.to_string(),
1737            Execution {
1738                execution_arn: arn.to_string(),
1739                state_machine_arn: "arn:aws:states:us-east-1:123456789012:stateMachine:test"
1740                    .to_string(),
1741                state_machine_name: "test".to_string(),
1742                name: "exec-1".to_string(),
1743                status: ExecutionStatus::Running,
1744                input,
1745                output: None,
1746                start_date: Utc::now(),
1747                stop_date: None,
1748                error: None,
1749                cause: None,
1750                history_events: vec![],
1751            },
1752        );
1753    }
1754
1755    #[tokio::test]
1756    async fn test_simple_pass_state() {
1757        let state = make_state();
1758        let arn = "arn:aws:states:us-east-1:123456789012:execution:test:exec-1";
1759        create_execution(&state, arn, Some(r#"{"hello":"world"}"#.to_string()));
1760
1761        let definition = json!({
1762            "StartAt": "PassState",
1763            "States": {
1764                "PassState": {
1765                    "Type": "Pass",
1766                    "Result": {"processed": true},
1767                    "End": true
1768                }
1769            }
1770        })
1771        .to_string();
1772
1773        execute_state_machine(
1774            state.clone(),
1775            arn.to_string(),
1776            definition,
1777            Some(r#"{"hello":"world"}"#.to_string()),
1778            None,
1779            None,
1780        )
1781        .await;
1782
1783        let __a = state.read();
1784        let s = __a.default_ref();
1785        let exec = s.executions.get(arn).unwrap();
1786        assert_eq!(exec.status, ExecutionStatus::Succeeded);
1787        assert!(exec.output.is_some());
1788        let output: Value = serde_json::from_str(exec.output.as_ref().unwrap()).unwrap();
1789        assert_eq!(output, json!({"processed": true}));
1790    }
1791
1792    #[tokio::test]
1793    async fn test_pass_chain() {
1794        let state = make_state();
1795        let arn = "arn:aws:states:us-east-1:123456789012:execution:test:exec-1";
1796        create_execution(&state, arn, Some(r#"{}"#.to_string()));
1797
1798        let definition = json!({
1799            "StartAt": "First",
1800            "States": {
1801                "First": {
1802                    "Type": "Pass",
1803                    "Result": "step1",
1804                    "ResultPath": "$.first",
1805                    "Next": "Second"
1806                },
1807                "Second": {
1808                    "Type": "Pass",
1809                    "Result": "step2",
1810                    "ResultPath": "$.second",
1811                    "End": true
1812                }
1813            }
1814        })
1815        .to_string();
1816
1817        execute_state_machine(
1818            state.clone(),
1819            arn.to_string(),
1820            definition,
1821            Some("{}".to_string()),
1822            None,
1823            None,
1824        )
1825        .await;
1826
1827        let __a = state.read();
1828        let s = __a.default_ref();
1829        let exec = s.executions.get(arn).unwrap();
1830        assert_eq!(exec.status, ExecutionStatus::Succeeded);
1831        let output: Value = serde_json::from_str(exec.output.as_ref().unwrap()).unwrap();
1832        assert_eq!(output["first"], json!("step1"));
1833        assert_eq!(output["second"], json!("step2"));
1834    }
1835
1836    #[tokio::test]
1837    async fn test_succeed_state() {
1838        let state = make_state();
1839        let arn = "arn:aws:states:us-east-1:123456789012:execution:test:exec-1";
1840        create_execution(&state, arn, Some(r#"{"data": "value"}"#.to_string()));
1841
1842        let definition = json!({
1843            "StartAt": "Done",
1844            "States": {
1845                "Done": {
1846                    "Type": "Succeed"
1847                }
1848            }
1849        })
1850        .to_string();
1851
1852        execute_state_machine(
1853            state.clone(),
1854            arn.to_string(),
1855            definition,
1856            Some(r#"{"data": "value"}"#.to_string()),
1857            None,
1858            None,
1859        )
1860        .await;
1861
1862        let __a = state.read();
1863        let s = __a.default_ref();
1864        let exec = s.executions.get(arn).unwrap();
1865        assert_eq!(exec.status, ExecutionStatus::Succeeded);
1866    }
1867
1868    #[tokio::test]
1869    async fn test_fail_state() {
1870        let state = make_state();
1871        let arn = "arn:aws:states:us-east-1:123456789012:execution:test:exec-1";
1872        create_execution(&state, arn, None);
1873
1874        let definition = json!({
1875            "StartAt": "FailState",
1876            "States": {
1877                "FailState": {
1878                    "Type": "Fail",
1879                    "Error": "CustomError",
1880                    "Cause": "Something went wrong"
1881                }
1882            }
1883        })
1884        .to_string();
1885
1886        execute_state_machine(state.clone(), arn.to_string(), definition, None, None, None).await;
1887
1888        let __a = state.read();
1889        let s = __a.default_ref();
1890        let exec = s.executions.get(arn).unwrap();
1891        assert_eq!(exec.status, ExecutionStatus::Failed);
1892        assert_eq!(exec.error.as_deref(), Some("CustomError"));
1893        assert_eq!(exec.cause.as_deref(), Some("Something went wrong"));
1894    }
1895
1896    #[tokio::test]
1897    async fn test_history_events_recorded() {
1898        let state = make_state();
1899        let arn = "arn:aws:states:us-east-1:123456789012:execution:test:exec-1";
1900        create_execution(&state, arn, Some("{}".to_string()));
1901
1902        let definition = json!({
1903            "StartAt": "PassState",
1904            "States": {
1905                "PassState": {
1906                    "Type": "Pass",
1907                    "End": true
1908                }
1909            }
1910        })
1911        .to_string();
1912
1913        execute_state_machine(
1914            state.clone(),
1915            arn.to_string(),
1916            definition,
1917            Some("{}".to_string()),
1918            None,
1919            None,
1920        )
1921        .await;
1922
1923        let __a = state.read();
1924        let s = __a.default_ref();
1925        let exec = s.executions.get(arn).unwrap();
1926        let event_types: Vec<&str> = exec
1927            .history_events
1928            .iter()
1929            .map(|e| e.event_type.as_str())
1930            .collect();
1931        assert_eq!(
1932            event_types,
1933            vec![
1934                "ExecutionStarted",
1935                "PassStateEntered",
1936                "PassStateExited",
1937                "ExecutionSucceeded"
1938            ]
1939        );
1940    }
1941
1942    fn drive(state: &SharedStepFunctionsState, arn: &str, def: Value, input: Option<&str>) {
1943        create_execution(state, arn, input.map(|s| s.to_string()));
1944        let fut = execute_state_machine(
1945            state.clone(),
1946            arn.to_string(),
1947            def.to_string(),
1948            input.map(|s| s.to_string()),
1949            None,
1950            None,
1951        );
1952        let rt = tokio::runtime::Builder::new_current_thread()
1953            .enable_time()
1954            .build()
1955            .unwrap();
1956        rt.block_on(fut);
1957    }
1958
1959    fn read_exec<R>(
1960        state: &SharedStepFunctionsState,
1961        arn: &str,
1962        f: impl FnOnce(&Execution) -> R,
1963    ) -> R {
1964        let __a = state.read();
1965        let s = __a.default_ref();
1966        f(s.executions.get(arn).expect("execution missing"))
1967    }
1968
1969    fn arn_for(name: &str) -> String {
1970        format!("arn:aws:states:us-east-1:123456789012:execution:test:{name}")
1971    }
1972
1973    // ── Pass state: InputPath / OutputPath ───────────────────────────
1974
1975    #[test]
1976    fn pass_state_input_output_path_select_fields() {
1977        let state = make_state();
1978        let arn = arn_for("pass-paths");
1979        let def = json!({
1980            "StartAt": "P",
1981            "States": {
1982                "P": {
1983                    "Type": "Pass",
1984                    "InputPath": "$.inner",
1985                    "OutputPath": "$.kept",
1986                    "End": true
1987                }
1988            }
1989        });
1990        drive(
1991            &state,
1992            &arn,
1993            def,
1994            Some(r#"{"inner":{"kept":"yes","dropped":true},"sibling":1}"#),
1995        );
1996
1997        read_exec(&state, &arn, |exec| {
1998            assert_eq!(exec.status, ExecutionStatus::Succeeded);
1999            let output: Value = serde_json::from_str(exec.output.as_ref().unwrap()).unwrap();
2000            assert_eq!(output, json!("yes"));
2001        });
2002    }
2003
2004    // ── Succeed / Fail variants ──────────────────────────────────────
2005
2006    #[test]
2007    fn succeed_state_honors_input_path_null() {
2008        let state = make_state();
2009        let arn = arn_for("succeed-null");
2010        let def = json!({
2011            "StartAt": "S",
2012            "States": { "S": { "Type": "Succeed", "InputPath": "null" } }
2013        });
2014        drive(&state, &arn, def, Some(r#"{"a":1}"#));
2015
2016        read_exec(&state, &arn, |exec| {
2017            assert_eq!(exec.status, ExecutionStatus::Succeeded);
2018            let output: Value = serde_json::from_str(exec.output.as_ref().unwrap()).unwrap();
2019            assert_eq!(output, json!({}));
2020        });
2021    }
2022
2023    #[test]
2024    fn fail_state_defaults_when_fields_missing() {
2025        let state = make_state();
2026        let arn = arn_for("fail-default");
2027        let def = json!({
2028            "StartAt": "F",
2029            "States": { "F": { "Type": "Fail" } }
2030        });
2031        drive(&state, &arn, def, None);
2032
2033        read_exec(&state, &arn, |exec| {
2034            assert_eq!(exec.status, ExecutionStatus::Failed);
2035            assert_eq!(exec.error.as_deref(), Some("States.Fail"));
2036            assert_eq!(exec.cause.as_deref(), Some(""));
2037        });
2038    }
2039
2040    // ── Choice ───────────────────────────────────────────────────────
2041
2042    fn choice_def() -> Value {
2043        json!({
2044            "StartAt": "C",
2045            "States": {
2046                "C": {
2047                    "Type": "Choice",
2048                    "Choices": [
2049                        {
2050                            "Variable": "$.n",
2051                            "NumericGreaterThan": 10,
2052                            "Next": "Big"
2053                        }
2054                    ],
2055                    "Default": "Small"
2056                },
2057                "Big":   { "Type": "Pass", "Result": "big",   "End": true },
2058                "Small": { "Type": "Pass", "Result": "small", "End": true }
2059            }
2060        })
2061    }
2062
2063    #[test]
2064    fn choice_routes_to_matching_branch() {
2065        let state = make_state();
2066        let arn = arn_for("choice-big");
2067        drive(&state, &arn, choice_def(), Some(r#"{"n":42}"#));
2068
2069        read_exec(&state, &arn, |exec| {
2070            assert_eq!(exec.status, ExecutionStatus::Succeeded);
2071            assert_eq!(
2072                serde_json::from_str::<Value>(exec.output.as_ref().unwrap()).unwrap(),
2073                json!("big")
2074            );
2075        });
2076    }
2077
2078    #[test]
2079    fn choice_falls_through_to_default() {
2080        let state = make_state();
2081        let arn = arn_for("choice-default");
2082        drive(&state, &arn, choice_def(), Some(r#"{"n":3}"#));
2083
2084        read_exec(&state, &arn, |exec| {
2085            assert_eq!(exec.status, ExecutionStatus::Succeeded);
2086            assert_eq!(
2087                serde_json::from_str::<Value>(exec.output.as_ref().unwrap()).unwrap(),
2088                json!("small")
2089            );
2090        });
2091    }
2092
2093    #[test]
2094    fn choice_no_match_and_no_default_fails() {
2095        let state = make_state();
2096        let arn = arn_for("choice-nomatch");
2097        let def = json!({
2098            "StartAt": "C",
2099            "States": {
2100                "C": {
2101                    "Type": "Choice",
2102                    "Choices": [
2103                        { "Variable": "$.n", "NumericEquals": 1, "Next": "End1" }
2104                    ]
2105                },
2106                "End1": { "Type": "Pass", "End": true }
2107            }
2108        });
2109        drive(&state, &arn, def, Some(r#"{"n":99}"#));
2110
2111        read_exec(&state, &arn, |exec| {
2112            assert_eq!(exec.status, ExecutionStatus::Failed);
2113            assert_eq!(exec.error.as_deref(), Some("States.NoChoiceMatched"));
2114        });
2115    }
2116
2117    // ── Wait ─────────────────────────────────────────────────────────
2118
2119    #[test]
2120    fn wait_seconds_then_advances() {
2121        let state = make_state();
2122        let arn = arn_for("wait-secs");
2123        let def = json!({
2124            "StartAt": "W",
2125            "States": {
2126                "W": { "Type": "Wait", "Seconds": 0, "Next": "Done" },
2127                "Done": { "Type": "Succeed" }
2128            }
2129        });
2130        drive(&state, &arn, def, Some(r#"{"k":1}"#));
2131
2132        read_exec(&state, &arn, |exec| {
2133            assert_eq!(exec.status, ExecutionStatus::Succeeded);
2134        });
2135    }
2136
2137    #[test]
2138    fn wait_timestamp_in_past_is_noop() {
2139        let state = make_state();
2140        let arn = arn_for("wait-past");
2141        let def = json!({
2142            "StartAt": "W",
2143            "States": {
2144                "W": {
2145                    "Type": "Wait",
2146                    "Timestamp": "2000-01-01T00:00:00Z",
2147                    "End": true
2148                }
2149            }
2150        });
2151        drive(&state, &arn, def, Some(r#"{"k":1}"#));
2152
2153        read_exec(&state, &arn, |exec| {
2154            assert_eq!(exec.status, ExecutionStatus::Succeeded);
2155        });
2156    }
2157
2158    #[test]
2159    fn wait_without_any_duration_falls_through() {
2160        let state = make_state();
2161        let arn = arn_for("wait-none");
2162        let def = json!({
2163            "StartAt": "W",
2164            "States": { "W": { "Type": "Wait", "End": true } }
2165        });
2166        drive(&state, &arn, def, None);
2167
2168        read_exec(&state, &arn, |exec| {
2169            assert_eq!(exec.status, ExecutionStatus::Succeeded);
2170        });
2171    }
2172
2173    // ── Parallel ─────────────────────────────────────────────────────
2174
2175    #[test]
2176    fn parallel_runs_two_pass_branches_and_collects_results() {
2177        let state = make_state();
2178        let arn = arn_for("parallel-ok");
2179        let def = json!({
2180            "StartAt": "P",
2181            "States": {
2182                "P": {
2183                    "Type": "Parallel",
2184                    "End": true,
2185                    "Branches": [
2186                        {
2187                            "StartAt": "A",
2188                            "States": { "A": { "Type": "Pass", "Result": "a", "End": true } }
2189                        },
2190                        {
2191                            "StartAt": "B",
2192                            "States": { "B": { "Type": "Pass", "Result": "b", "End": true } }
2193                        }
2194                    ]
2195                }
2196            }
2197        });
2198        drive(&state, &arn, def, Some(r#"{}"#));
2199
2200        read_exec(&state, &arn, |exec| {
2201            assert_eq!(exec.status, ExecutionStatus::Succeeded);
2202            let output: Value = serde_json::from_str(exec.output.as_ref().unwrap()).unwrap();
2203            assert_eq!(output, json!(["a", "b"]));
2204        });
2205    }
2206
2207    #[test]
2208    fn parallel_empty_branches_fails() {
2209        let state = make_state();
2210        let arn = arn_for("parallel-empty");
2211        let def = json!({
2212            "StartAt": "P",
2213            "States": { "P": { "Type": "Parallel", "Branches": [], "End": true } }
2214        });
2215        drive(&state, &arn, def, None);
2216
2217        read_exec(&state, &arn, |exec| {
2218            assert_eq!(exec.status, ExecutionStatus::Failed);
2219            assert_eq!(exec.error.as_deref(), Some("States.Runtime"));
2220        });
2221    }
2222
2223    // ── Map ──────────────────────────────────────────────────────────
2224
2225    #[test]
2226    fn map_iterates_pass_state_over_items_path() {
2227        let state = make_state();
2228        let arn = arn_for("map-ok");
2229        let def = json!({
2230            "StartAt": "M",
2231            "States": {
2232                "M": {
2233                    "Type": "Map",
2234                    "ItemsPath": "$.items",
2235                    "Iterator": {
2236                        "StartAt": "Item",
2237                        "States": { "Item": { "Type": "Pass", "End": true } }
2238                    },
2239                    "End": true
2240                }
2241            }
2242        });
2243        drive(&state, &arn, def, Some(r#"{"items":[1,2,3]}"#));
2244
2245        read_exec(&state, &arn, |exec| {
2246            assert_eq!(exec.status, ExecutionStatus::Succeeded);
2247            let output: Value = serde_json::from_str(exec.output.as_ref().unwrap()).unwrap();
2248            assert_eq!(output, json!([1, 2, 3]));
2249        });
2250    }
2251
2252    // ── Task: unsupported resources / delivery == None ───────────────
2253
2254    #[test]
2255    fn task_unsupported_resource_propagates_failure() {
2256        let state = make_state();
2257        let arn = arn_for("task-unsupported");
2258        let def = json!({
2259            "StartAt": "T",
2260            "States": {
2261                "T": {
2262                    "Type": "Task",
2263                    "Resource": "arn:aws:states:::nothing:here",
2264                    "End": true
2265                }
2266            }
2267        });
2268        drive(&state, &arn, def, None);
2269
2270        read_exec(&state, &arn, |exec| {
2271            assert_eq!(exec.status, ExecutionStatus::Failed);
2272            assert_eq!(exec.error.as_deref(), Some("States.TaskFailed"));
2273            assert!(exec.cause.as_deref().unwrap().contains("Unsupported"));
2274        });
2275    }
2276
2277    #[test]
2278    fn task_sqs_send_without_delivery_fails() {
2279        let state = make_state();
2280        let arn = arn_for("task-sqs-nodelivery");
2281        let def = json!({
2282            "StartAt": "T",
2283            "States": {
2284                "T": {
2285                    "Type": "Task",
2286                    "Resource": "arn:aws:states:::sqs:sendMessage",
2287                    "Parameters": { "QueueUrl": "http://localhost/123/q", "MessageBody": "hi" },
2288                    "End": true
2289                }
2290            }
2291        });
2292        drive(&state, &arn, def, Some("{}"));
2293
2294        read_exec(&state, &arn, |exec| {
2295            assert_eq!(exec.status, ExecutionStatus::Failed);
2296            assert!(exec.cause.as_deref().unwrap().contains("delivery bus"));
2297        });
2298    }
2299
2300    // ── Task: Catch clause ───────────────────────────────────────────
2301
2302    #[test]
2303    fn task_catch_routes_error_into_handler() {
2304        let state = make_state();
2305        let arn = arn_for("task-catch");
2306        let def = json!({
2307            "StartAt": "T",
2308            "States": {
2309                "T": {
2310                    "Type": "Task",
2311                    "Resource": "arn:aws:states:::nothing:here",
2312                    "Catch": [
2313                        { "ErrorEquals": ["States.ALL"], "Next": "Handler", "ResultPath": "$.err" }
2314                    ],
2315                    "End": true
2316                },
2317                "Handler": { "Type": "Pass", "End": true }
2318            }
2319        });
2320        drive(&state, &arn, def, Some(r#"{"orig":"v"}"#));
2321
2322        read_exec(&state, &arn, |exec| {
2323            assert_eq!(exec.status, ExecutionStatus::Succeeded);
2324            let output: Value = serde_json::from_str(exec.output.as_ref().unwrap()).unwrap();
2325            // Handler is Pass with no Result — effective input flows through.
2326            assert_eq!(output["orig"], json!("v"));
2327            assert_eq!(output["err"]["Error"], json!("States.TaskFailed"));
2328        });
2329    }
2330
2331    // ── Top-level errors: definition / start-at / missing states ─────
2332
2333    #[test]
2334    fn invalid_definition_json_fails_execution() {
2335        let state = make_state();
2336        let arn = arn_for("bad-json");
2337        create_execution(&state, &arn, None);
2338        let rt = tokio::runtime::Builder::new_current_thread()
2339            .enable_time()
2340            .build()
2341            .unwrap();
2342        rt.block_on(execute_state_machine(
2343            state.clone(),
2344            arn.clone(),
2345            "not json{".to_string(),
2346            None,
2347            None,
2348            None,
2349        ));
2350
2351        read_exec(&state, &arn, |exec| {
2352            assert_eq!(exec.status, ExecutionStatus::Failed);
2353            assert_eq!(exec.error.as_deref(), Some("States.Runtime"));
2354            assert!(exec.cause.as_deref().unwrap().contains("Failed to parse"));
2355        });
2356    }
2357
2358    #[test]
2359    fn missing_start_at_fails_execution() {
2360        let state = make_state();
2361        let arn = arn_for("no-startat");
2362        let def = json!({ "States": { "X": { "Type": "Succeed" } } });
2363        drive(&state, &arn, def, None);
2364
2365        read_exec(&state, &arn, |exec| {
2366            assert_eq!(exec.status, ExecutionStatus::Failed);
2367            assert!(exec.cause.as_deref().unwrap().contains("StartAt"));
2368        });
2369    }
2370
2371    #[test]
2372    fn next_state_not_found_fails_execution() {
2373        let state = make_state();
2374        let arn = arn_for("dangling-next");
2375        let def = json!({
2376            "StartAt": "A",
2377            "States": { "A": { "Type": "Pass", "Next": "DoesNotExist" } }
2378        });
2379        drive(&state, &arn, def, None);
2380
2381        read_exec(&state, &arn, |exec| {
2382            assert_eq!(exec.status, ExecutionStatus::Failed);
2383            assert!(exec.cause.as_deref().unwrap().contains("not found"));
2384        });
2385    }
2386
2387    #[test]
2388    fn unsupported_state_type_fails_execution() {
2389        let state = make_state();
2390        let arn = arn_for("bad-type");
2391        let def = json!({
2392            "StartAt": "X",
2393            "States": { "X": { "Type": "WatChoo", "End": true } }
2394        });
2395        drive(&state, &arn, def, None);
2396
2397        read_exec(&state, &arn, |exec| {
2398            assert_eq!(exec.status, ExecutionStatus::Failed);
2399            assert!(exec
2400                .cause
2401                .as_deref()
2402                .unwrap()
2403                .contains("Unsupported state type"));
2404        });
2405    }
2406
2407    // ── Pure helpers ─────────────────────────────────────────────────
2408
2409    #[test]
2410    fn apply_parameters_substitutes_json_path_refs() {
2411        let template = json!({
2412            "literal": "constant",
2413            "ref.$": "$.user.id",
2414            "nested": { "inner.$": "$.user.name" },
2415            "list": [ { "x.$": "$.user.id" } ]
2416        });
2417        let input = json!({ "user": { "id": 42, "name": "zoe" } });
2418        let out = apply_parameters(&template, &input);
2419        assert_eq!(out["literal"], json!("constant"));
2420        assert_eq!(out["ref"], json!(42));
2421        assert_eq!(out["nested"]["inner"], json!("zoe"));
2422        assert_eq!(out["list"][0]["x"], json!(42));
2423    }
2424
2425    #[test]
2426    fn next_state_returns_end_name_or_error() {
2427        match next_state(&json!({ "End": true })) {
2428            NextState::End => {}
2429            _ => panic!("expected End"),
2430        }
2431        match next_state(&json!({ "Next": "A" })) {
2432            NextState::Name(n) => assert_eq!(n, "A"),
2433            _ => panic!("expected Name"),
2434        }
2435        match next_state(&json!({})) {
2436            NextState::Error(_) => {}
2437            _ => panic!("expected Error"),
2438        }
2439    }
2440
2441    #[test]
2442    fn apply_state_catcher_matches_wildcard_and_stashes_error() {
2443        let state_def = json!({
2444            "Catch": [
2445                { "ErrorEquals": ["States.ALL"], "Next": "H", "ResultPath": "$.caught" }
2446            ]
2447        });
2448        let input = json!({ "a": 1 });
2449        let (next, new_input) =
2450            apply_state_catcher(&state_def, &input, "Boom", "it exploded").unwrap();
2451        assert_eq!(next, "H");
2452        assert_eq!(new_input["a"], json!(1));
2453        assert_eq!(new_input["caught"]["Error"], json!("Boom"));
2454        assert_eq!(new_input["caught"]["Cause"], json!("it exploded"));
2455    }
2456
2457    #[test]
2458    fn apply_state_catcher_returns_none_without_match() {
2459        let state_def = json!({
2460            "Catch": [
2461                { "ErrorEquals": ["Specific.Error"], "Next": "H" }
2462            ]
2463        });
2464        let input = json!({});
2465        assert!(apply_state_catcher(&state_def, &input, "Other", "why").is_none());
2466    }
2467
2468    #[test]
2469    fn queue_url_to_arn_parses_account_and_name() {
2470        assert_eq!(
2471            queue_url_to_arn("http://sqs.local:4566/123456789012/my-queue"),
2472            "arn:aws:sqs:us-east-1:123456789012:my-queue"
2473        );
2474    }
2475
2476    #[test]
2477    fn queue_url_to_arn_falls_back_for_unparseable_input() {
2478        assert_eq!(queue_url_to_arn("bad"), "bad");
2479    }
2480
2481    #[test]
2482    fn md5_hex_is_deterministic_and_32_chars() {
2483        let a = md5_hex("hello");
2484        let b = md5_hex("hello");
2485        assert_eq!(a, b);
2486        assert_eq!(a.len(), 32);
2487        assert_ne!(a, md5_hex("world"));
2488    }
2489
2490    #[test]
2491    fn apply_update_expression_sets_direct_and_aliased_attrs() {
2492        let mut item: HashMap<String, Value> = HashMap::new();
2493        item.insert("id".to_string(), json!({"S": "1"}));
2494
2495        let mut attr_values = serde_json::Map::new();
2496        attr_values.insert(":n".to_string(), json!({"S": "Alice"}));
2497        attr_values.insert(":c".to_string(), json!({"N": "5"}));
2498
2499        let mut attr_names = serde_json::Map::new();
2500        attr_names.insert("#name".to_string(), json!("name"));
2501
2502        apply_update_expression(
2503            &mut item,
2504            "SET #name = :n, count = :c",
2505            &attr_values,
2506            &attr_names,
2507        );
2508
2509        assert_eq!(item.get("name").unwrap(), &json!({"S": "Alice"}));
2510        assert_eq!(item.get("count").unwrap(), &json!({"N": "5"}));
2511        assert_eq!(item.get("id").unwrap(), &json!({"S": "1"}));
2512    }
2513
2514    #[test]
2515    fn apply_update_expression_accepts_lowercase_set_keyword() {
2516        let mut item: HashMap<String, Value> = HashMap::new();
2517        let mut attr_values = serde_json::Map::new();
2518        attr_values.insert(":v".to_string(), json!({"S": "x"}));
2519        apply_update_expression(
2520            &mut item,
2521            "set field = :v",
2522            &attr_values,
2523            &serde_json::Map::new(),
2524        );
2525        assert_eq!(item.get("field").unwrap(), &json!({"S": "x"}));
2526    }
2527
2528    // ── DynamoDB invoke: error paths without delivery bus ────────────
2529
2530    #[test]
2531    fn task_dynamodb_get_item_without_state_fails() {
2532        let state = make_state();
2533        let arn = arn_for("ddb-get-nostate");
2534        let def = json!({
2535            "StartAt": "T",
2536            "States": {
2537                "T": {
2538                    "Type": "Task",
2539                    "Resource": "arn:aws:states:::dynamodb:getItem",
2540                    "Parameters": { "TableName": "t", "Key": { "id": { "S": "1" } } },
2541                    "End": true
2542                }
2543            }
2544        });
2545        drive(&state, &arn, def, Some("{}"));
2546        read_exec(&state, &arn, |exec| {
2547            assert_eq!(exec.status, ExecutionStatus::Failed);
2548            assert!(exec.cause.as_deref().unwrap().contains("DynamoDB"));
2549        });
2550    }
2551
2552    // ── Terminal guards on succeed/fail helpers ──────────────────────
2553
2554    #[test]
2555    fn succeed_execution_is_noop_when_already_terminal() {
2556        let state = make_state();
2557        let arn = "arn:aws:states:us-east-1:123456789012:execution:test:already";
2558        create_execution(&state, arn, None);
2559        {
2560            let mut __a = state.write();
2561            let s = __a.default_mut();
2562            s.executions.get_mut(arn).unwrap().status = ExecutionStatus::Failed;
2563        }
2564        succeed_execution(&state, arn, &json!({"x":1}));
2565        let __a = state.read();
2566        let s = __a.default_ref();
2567        let exec = s.executions.get(arn).unwrap();
2568        assert_eq!(exec.status, ExecutionStatus::Failed);
2569        assert!(exec.output.is_none());
2570    }
2571
2572    #[test]
2573    fn fail_execution_is_noop_when_already_terminal() {
2574        let state = make_state();
2575        let arn = "arn:aws:states:us-east-1:123456789012:execution:test:already2";
2576        create_execution(&state, arn, None);
2577        {
2578            let mut __a = state.write();
2579            let s = __a.default_mut();
2580            s.executions.get_mut(arn).unwrap().status = ExecutionStatus::Succeeded;
2581        }
2582        fail_execution(&state, arn, "Oops", "nope");
2583        let __a = state.read();
2584        let s = __a.default_ref();
2585        let exec = s.executions.get(arn).unwrap();
2586        assert_eq!(exec.status, ExecutionStatus::Succeeded);
2587        assert!(exec.error.is_none());
2588    }
2589
2590    // ── Pass state with ResultPath ──
2591
2592    #[test]
2593    fn pass_state_result_path_merges_into_input() {
2594        let state = make_state();
2595        let arn = arn_for("result-path");
2596        let def = json!({
2597            "StartAt": "P",
2598            "States": {
2599                "P": {"Type": "Pass", "Result": {"x": 2}, "ResultPath": "$.data", "End": true}
2600            }
2601        });
2602        drive(&state, &arn, def, Some(r#"{"a":1}"#));
2603        let output = read_exec(&state, &arn, |e| e.output.clone().unwrap_or_default());
2604        let v: Value = serde_json::from_str(&output).unwrap();
2605        assert_eq!(v["a"], 1);
2606        assert_eq!(v["data"]["x"], 2);
2607    }
2608
2609    // ── Choice with many operators ──
2610
2611    #[test]
2612    fn choice_string_greater_than_equals() {
2613        let state = make_state();
2614        let arn = arn_for("choice-sgte");
2615        let def = json!({
2616            "StartAt": "C",
2617            "States": {
2618                "C": {
2619                    "Type": "Choice",
2620                    "Choices": [
2621                        {"Variable": "$.val", "StringGreaterThanEquals": "apple", "Next": "End"}
2622                    ],
2623                    "Default": "Fail"
2624                },
2625                "End": {"Type": "Pass", "End": true},
2626                "Fail": {"Type": "Fail"}
2627            }
2628        });
2629        drive(&state, &arn, def, Some(r#"{"val":"banana"}"#));
2630        let status = read_exec(&state, &arn, |e| e.status);
2631        assert_eq!(status, ExecutionStatus::Succeeded);
2632    }
2633
2634    #[test]
2635    fn choice_is_present_and_is_null() {
2636        let state = make_state();
2637        let arn = arn_for("choice-ispres");
2638        let def = json!({
2639            "StartAt": "C",
2640            "States": {
2641                "C": {
2642                    "Type": "Choice",
2643                    "Choices": [
2644                        {"Variable": "$.foo", "IsPresent": true, "Next": "End"}
2645                    ],
2646                    "Default": "Fail"
2647                },
2648                "End": {"Type": "Pass", "End": true},
2649                "Fail": {"Type": "Fail"}
2650            }
2651        });
2652        drive(&state, &arn, def, Some(r#"{"foo":null}"#));
2653        assert_eq!(
2654            read_exec(&state, &arn, |e| e.status),
2655            ExecutionStatus::Succeeded
2656        );
2657    }
2658
2659    #[test]
2660    fn choice_or_short_circuits() {
2661        let state = make_state();
2662        let arn = arn_for("choice-or");
2663        let def = json!({
2664            "StartAt": "C",
2665            "States": {
2666                "C": {
2667                    "Type": "Choice",
2668                    "Choices": [{
2669                        "Or": [
2670                            {"Variable": "$.x", "NumericEquals": 99},
2671                            {"Variable": "$.y", "StringEquals": "b"}
2672                        ],
2673                        "Next": "End"
2674                    }],
2675                    "Default": "Fail"
2676                },
2677                "End": {"Type": "Pass", "End": true},
2678                "Fail": {"Type": "Fail"}
2679            }
2680        });
2681        drive(&state, &arn, def, Some(r#"{"x":1,"y":"b"}"#));
2682        assert_eq!(
2683            read_exec(&state, &arn, |e| e.status),
2684            ExecutionStatus::Succeeded
2685        );
2686    }
2687
2688    #[test]
2689    fn choice_not_negates() {
2690        let state = make_state();
2691        let arn = arn_for("choice-not");
2692        let def = json!({
2693            "StartAt": "C",
2694            "States": {
2695                "C": {
2696                    "Type": "Choice",
2697                    "Choices": [{
2698                        "Not": {"Variable": "$.x", "NumericEquals": 99},
2699                        "Next": "End"
2700                    }],
2701                    "Default": "Fail"
2702                },
2703                "End": {"Type": "Pass", "End": true},
2704                "Fail": {"Type": "Fail"}
2705            }
2706        });
2707        drive(&state, &arn, def, Some(r#"{"x":1}"#));
2708        assert_eq!(
2709            read_exec(&state, &arn, |e| e.status),
2710            ExecutionStatus::Succeeded
2711        );
2712    }
2713
2714    #[test]
2715    fn choice_boolean_equals() {
2716        let state = make_state();
2717        let arn = arn_for("choice-bool");
2718        let def = json!({
2719            "StartAt": "C",
2720            "States": {
2721                "C": {
2722                    "Type": "Choice",
2723                    "Choices": [
2724                        {"Variable": "$.ok", "BooleanEquals": true, "Next": "End"}
2725                    ],
2726                    "Default": "Fail"
2727                },
2728                "End": {"Type": "Pass", "End": true},
2729                "Fail": {"Type": "Fail"}
2730            }
2731        });
2732        drive(&state, &arn, def, Some(r#"{"ok":true}"#));
2733        assert_eq!(
2734            read_exec(&state, &arn, |e| e.status),
2735            ExecutionStatus::Succeeded
2736        );
2737    }
2738
2739    // ── Wait with SecondsPath ──
2740
2741    #[test]
2742    fn wait_seconds_path_uses_input_value() {
2743        let state = make_state();
2744        let arn = arn_for("wait-sp");
2745        let def = json!({
2746            "StartAt": "W",
2747            "States": {
2748                "W": {"Type": "Wait", "SecondsPath": "$.wait", "End": true}
2749            }
2750        });
2751        drive(&state, &arn, def, Some(r#"{"wait":0}"#));
2752        assert_eq!(
2753            read_exec(&state, &arn, |e| e.status),
2754            ExecutionStatus::Succeeded
2755        );
2756    }
2757
2758    // ── Map state with empty input array ──
2759
2760    #[test]
2761    fn map_state_empty_array_succeeds() {
2762        let state = make_state();
2763        let arn = arn_for("map-empty");
2764        let def = json!({
2765            "StartAt": "M",
2766            "States": {
2767                "M": {
2768                    "Type": "Map",
2769                    "ItemsPath": "$.items",
2770                    "ItemProcessor": {
2771                        "StartAt": "P",
2772                        "States": {
2773                            "P": {"Type": "Pass", "End": true}
2774                        }
2775                    },
2776                    "End": true
2777                }
2778            }
2779        });
2780        drive(&state, &arn, def, Some(r#"{"items":[]}"#));
2781        assert_eq!(
2782            read_exec(&state, &arn, |e| e.status),
2783            ExecutionStatus::Succeeded
2784        );
2785    }
2786
2787    // ── Fail state with Error + Cause ──
2788
2789    #[test]
2790    fn fail_state_with_explicit_error_and_cause() {
2791        let state = make_state();
2792        let arn = arn_for("fail-fields");
2793        create_execution(&state, &arn, None);
2794        let def = json!({
2795            "StartAt": "F",
2796            "States": {
2797                "F": {"Type": "Fail", "Error": "MyError", "Cause": "my cause"}
2798            }
2799        });
2800        drive(&state, &arn, def, None);
2801        let status = read_exec(&state, &arn, |e| e.status);
2802        assert_eq!(status, ExecutionStatus::Failed);
2803        let err = read_exec(&state, &arn, |e| e.error.clone().unwrap_or_default());
2804        assert_eq!(err, "MyError");
2805    }
2806}