Skip to main content

fakecloud_stepfunctions/
interpreter.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use chrono::Utc;
5use serde_json::{json, Value};
6use tracing::{debug, warn};
7
8use fakecloud_aws::arn::Arn;
9use fakecloud_core::delivery::DeliveryBus;
10use fakecloud_dynamodb::state::SharedDynamoDbState;
11
12use crate::choice::evaluate_choice;
13use crate::error_handling::{find_catcher, should_retry};
14use crate::io_processing::{apply_input_path, apply_output_path, apply_result_path};
15use crate::state::{ExecutionStatus, HistoryEvent, SharedStepFunctionsState};
16
17/// Execute a state machine definition with the given input.
18/// Updates the execution record in shared state as it progresses.
19pub async fn execute_state_machine(
20    state: SharedStepFunctionsState,
21    execution_arn: String,
22    definition: String,
23    input: Option<String>,
24    delivery: Option<Arc<DeliveryBus>>,
25    dynamodb_state: Option<SharedDynamoDbState>,
26) {
27    let def: Value = match serde_json::from_str(&definition) {
28        Ok(v) => v,
29        Err(e) => {
30            fail_execution(
31                &state,
32                &execution_arn,
33                "States.Runtime",
34                &format!("Failed to parse definition: {e}"),
35            );
36            return;
37        }
38    };
39
40    let raw_input: Value = input
41        .as_deref()
42        .and_then(|s| serde_json::from_str(s).ok())
43        .unwrap_or(json!({}));
44
45    // Record ExecutionStarted event
46    add_event(
47        &state,
48        &execution_arn,
49        "ExecutionStarted",
50        0,
51        json!({
52            "input": serde_json::to_string(&raw_input).unwrap_or_default(),
53            "roleArn": "arn:aws:iam::123456789012:role/execution-role"
54        }),
55    );
56
57    match run_states(
58        &def,
59        raw_input,
60        &delivery,
61        &dynamodb_state,
62        &state,
63        &execution_arn,
64    )
65    .await
66    {
67        Ok(output) => {
68            succeed_execution(&state, &execution_arn, &output);
69        }
70        Err((error, cause)) => {
71            fail_execution(&state, &execution_arn, &error, &cause);
72        }
73    }
74}
75
/// Boxed, pinned, `Send` future returned by [`run_states`].
///
/// Resolves to the definition's output value, or to an `(error, cause)` pair.
/// `run_states` is re-entered from the Parallel and Map executors that it
/// drives itself, so the future is boxed to give it a nameable, sized type.
type StatesResult<'a> = std::pin::Pin<
    Box<dyn std::future::Future<Output = Result<Value, (String, String)>> + Send + 'a>,
>;
79
80/// Core execution loop: runs through states in a definition and returns the output.
81/// Used by the top-level executor, Parallel branches, and Map iterations.
82fn run_states<'a>(
83    def: &'a Value,
84    input: Value,
85    delivery: &'a Option<Arc<DeliveryBus>>,
86    dynamodb_state: &'a Option<SharedDynamoDbState>,
87    shared_state: &'a SharedStepFunctionsState,
88    execution_arn: &'a str,
89) -> StatesResult<'a> {
90    Box::pin(async move {
91        let start_at = def["StartAt"]
92            .as_str()
93            .ok_or_else(|| {
94                (
95                    "States.Runtime".to_string(),
96                    "Missing StartAt in definition".to_string(),
97                )
98            })?
99            .to_string();
100
101        let states = def.get("States").ok_or_else(|| {
102            (
103                "States.Runtime".to_string(),
104                "Missing States in definition".to_string(),
105            )
106        })?;
107
108        let mut current_state = start_at;
109        let mut effective_input = input;
110        let mut iteration = 0;
111        let max_iterations = 500;
112
113        loop {
114            iteration += 1;
115            if iteration > max_iterations {
116                return Err((
117                    "States.Runtime".to_string(),
118                    "Maximum number of state transitions exceeded".to_string(),
119                ));
120            }
121
122            let state_def = states.get(&current_state).cloned().ok_or_else(|| {
123                (
124                    "States.Runtime".to_string(),
125                    format!("State '{current_state}' not found in definition"),
126                )
127            })?;
128
129            let state_type = state_def["Type"]
130                .as_str()
131                .ok_or_else(|| {
132                    (
133                        "States.Runtime".to_string(),
134                        format!("State '{current_state}' missing Type field"),
135                    )
136                })?
137                .to_string();
138
139            debug!(
140                execution_arn = %execution_arn,
141                state = %current_state,
142                state_type = %state_type,
143                "Executing state"
144            );
145
146            let advance = match state_type.as_str() {
147                "Pass" => run_pass_state(
148                    &current_state,
149                    &state_def,
150                    effective_input,
151                    shared_state,
152                    execution_arn,
153                ),
154                "Succeed" => run_succeed_state(
155                    &current_state,
156                    &state_def,
157                    effective_input,
158                    shared_state,
159                    execution_arn,
160                ),
161                "Fail" => run_fail_state(
162                    &current_state,
163                    &state_def,
164                    effective_input,
165                    shared_state,
166                    execution_arn,
167                ),
168                "Choice" => run_choice_state(
169                    &current_state,
170                    &state_def,
171                    effective_input,
172                    shared_state,
173                    execution_arn,
174                ),
175                "Wait" => {
176                    run_wait_state(
177                        &current_state,
178                        &state_def,
179                        effective_input,
180                        shared_state,
181                        execution_arn,
182                    )
183                    .await
184                }
185                "Task" => {
186                    run_task_state(
187                        &current_state,
188                        &state_def,
189                        effective_input,
190                        delivery,
191                        dynamodb_state,
192                        shared_state,
193                        execution_arn,
194                    )
195                    .await
196                }
197                "Parallel" => {
198                    run_parallel_state(
199                        &current_state,
200                        &state_def,
201                        effective_input,
202                        delivery,
203                        dynamodb_state,
204                        shared_state,
205                        execution_arn,
206                    )
207                    .await
208                }
209                "Map" => {
210                    run_map_state(
211                        &current_state,
212                        &state_def,
213                        effective_input,
214                        delivery,
215                        dynamodb_state,
216                        shared_state,
217                        execution_arn,
218                    )
219                    .await
220                }
221                other => Advance::Fail(
222                    "States.Runtime".to_string(),
223                    format!("Unsupported state type: '{other}'"),
224                ),
225            };
226
227            match advance {
228                Advance::Next(next, new_input) => {
229                    effective_input = new_input;
230                    current_state = next;
231                }
232                Advance::End(output) => return Ok(output),
233                Advance::Fail(error, cause) => return Err((error, cause)),
234            }
235        }
236    })
237}
238
/// Result of executing a single state in the state machine.
///
/// Produced by each `run_*_state` handler and consumed by the loop in
/// `run_states`, which decides whether to keep going or stop.
enum Advance {
    /// Continue to the given state with the given input.
    ///
    /// The `String` is the name of the next state; the `Value` becomes that
    /// state's input.
    Next(String, Value),
    /// Terminate the state machine with the given output.
    End(Value),
    /// Fail the state machine with the given error and cause.
    ///
    /// Field order is `(error, cause)`.
    Fail(String, String),
}
248
249fn advance_from_next(state_def: &Value, input: Value) -> Advance {
250    match next_state(state_def) {
251        NextState::Name(next) => Advance::Next(next, input),
252        NextState::End => Advance::End(input),
253        NextState::Error(msg) => Advance::Fail("States.Runtime".to_string(), msg),
254    }
255}
256
257fn advance_from_error(state_def: &Value, input: &Value, error: String, cause: String) -> Advance {
258    match apply_state_catcher(state_def, input, &error, &cause) {
259        Some((next, new_input)) => Advance::Next(next, new_input),
260        None => Advance::Fail(error, cause),
261    }
262}
263
264fn run_pass_state(
265    name: &str,
266    state_def: &Value,
267    input: Value,
268    shared_state: &SharedStepFunctionsState,
269    execution_arn: &str,
270) -> Advance {
271    let entered_event_id = add_event(
272        shared_state,
273        execution_arn,
274        "PassStateEntered",
275        0,
276        json!({
277            "name": name,
278            "input": serde_json::to_string(&input).unwrap_or_default(),
279        }),
280    );
281
282    let result = execute_pass_state(state_def, &input);
283
284    add_event(
285        shared_state,
286        execution_arn,
287        "PassStateExited",
288        entered_event_id,
289        json!({
290            "name": name,
291            "output": serde_json::to_string(&result).unwrap_or_default(),
292        }),
293    );
294
295    advance_from_next(state_def, result)
296}
297
298fn run_succeed_state(
299    name: &str,
300    state_def: &Value,
301    input: Value,
302    shared_state: &SharedStepFunctionsState,
303    execution_arn: &str,
304) -> Advance {
305    add_event(
306        shared_state,
307        execution_arn,
308        "SucceedStateEntered",
309        0,
310        json!({
311            "name": name,
312            "input": serde_json::to_string(&input).unwrap_or_default(),
313        }),
314    );
315
316    let input_path = state_def["InputPath"].as_str();
317    let output_path = state_def["OutputPath"].as_str();
318
319    let processed = if input_path == Some("null") {
320        json!({})
321    } else {
322        apply_input_path(&input, input_path)
323    };
324
325    let output = if output_path == Some("null") {
326        json!({})
327    } else {
328        apply_output_path(&processed, output_path)
329    };
330
331    add_event(
332        shared_state,
333        execution_arn,
334        "SucceedStateExited",
335        0,
336        json!({
337            "name": name,
338            "output": serde_json::to_string(&output).unwrap_or_default(),
339        }),
340    );
341
342    Advance::End(output)
343}
344
345fn run_fail_state(
346    name: &str,
347    state_def: &Value,
348    input: Value,
349    shared_state: &SharedStepFunctionsState,
350    execution_arn: &str,
351) -> Advance {
352    let error = state_def["Error"]
353        .as_str()
354        .unwrap_or("States.Fail")
355        .to_string();
356    let cause = state_def["Cause"].as_str().unwrap_or("").to_string();
357
358    add_event(
359        shared_state,
360        execution_arn,
361        "FailStateEntered",
362        0,
363        json!({
364            "name": name,
365            "input": serde_json::to_string(&input).unwrap_or_default(),
366        }),
367    );
368
369    Advance::Fail(error, cause)
370}
371
372fn run_choice_state(
373    name: &str,
374    state_def: &Value,
375    input: Value,
376    shared_state: &SharedStepFunctionsState,
377    execution_arn: &str,
378) -> Advance {
379    let entered_event_id = add_event(
380        shared_state,
381        execution_arn,
382        "ChoiceStateEntered",
383        0,
384        json!({
385            "name": name,
386            "input": serde_json::to_string(&input).unwrap_or_default(),
387        }),
388    );
389
390    let input_path = state_def["InputPath"].as_str();
391    let processed_input = if input_path == Some("null") {
392        json!({})
393    } else {
394        apply_input_path(&input, input_path)
395    };
396
397    match evaluate_choice(state_def, &processed_input) {
398        Some(next) => {
399            add_event(
400                shared_state,
401                execution_arn,
402                "ChoiceStateExited",
403                entered_event_id,
404                json!({
405                    "name": name,
406                    "output": serde_json::to_string(&input).unwrap_or_default(),
407                }),
408            );
409            Advance::Next(next, input)
410        }
411        None => Advance::Fail(
412            "States.NoChoiceMatched".to_string(),
413            format!("No choice rule matched and no Default in state '{name}'"),
414        ),
415    }
416}
417
418async fn run_wait_state(
419    name: &str,
420    state_def: &Value,
421    input: Value,
422    shared_state: &SharedStepFunctionsState,
423    execution_arn: &str,
424) -> Advance {
425    let entered_event_id = add_event(
426        shared_state,
427        execution_arn,
428        "WaitStateEntered",
429        0,
430        json!({
431            "name": name,
432            "input": serde_json::to_string(&input).unwrap_or_default(),
433        }),
434    );
435
436    execute_wait_state(state_def, &input).await;
437
438    add_event(
439        shared_state,
440        execution_arn,
441        "WaitStateExited",
442        entered_event_id,
443        json!({
444            "name": name,
445            "output": serde_json::to_string(&input).unwrap_or_default(),
446        }),
447    );
448
449    advance_from_next(state_def, input)
450}
451
452#[allow(clippy::too_many_arguments)]
453async fn run_task_state(
454    name: &str,
455    state_def: &Value,
456    input: Value,
457    delivery: &Option<Arc<DeliveryBus>>,
458    dynamodb_state: &Option<SharedDynamoDbState>,
459    shared_state: &SharedStepFunctionsState,
460    execution_arn: &str,
461) -> Advance {
462    let entered_event_id = add_event(
463        shared_state,
464        execution_arn,
465        "TaskStateEntered",
466        0,
467        json!({
468            "name": name,
469            "input": serde_json::to_string(&input).unwrap_or_default(),
470        }),
471    );
472
473    let result = execute_task_state(
474        state_def,
475        &input,
476        delivery,
477        dynamodb_state,
478        shared_state,
479        execution_arn,
480        entered_event_id,
481    )
482    .await;
483
484    match result {
485        Ok(output) => {
486            add_event(
487                shared_state,
488                execution_arn,
489                "TaskStateExited",
490                entered_event_id,
491                json!({
492                    "name": name,
493                    "output": serde_json::to_string(&output).unwrap_or_default(),
494                }),
495            );
496            advance_from_next(state_def, output)
497        }
498        Err((error, cause)) => advance_from_error(state_def, &input, error, cause),
499    }
500}
501
502#[allow(clippy::too_many_arguments)]
503async fn run_parallel_state(
504    name: &str,
505    state_def: &Value,
506    input: Value,
507    delivery: &Option<Arc<DeliveryBus>>,
508    dynamodb_state: &Option<SharedDynamoDbState>,
509    shared_state: &SharedStepFunctionsState,
510    execution_arn: &str,
511) -> Advance {
512    let entered_event_id = add_event(
513        shared_state,
514        execution_arn,
515        "ParallelStateEntered",
516        0,
517        json!({
518            "name": name,
519            "input": serde_json::to_string(&input).unwrap_or_default(),
520        }),
521    );
522
523    let result = execute_parallel_state(
524        state_def,
525        &input,
526        delivery,
527        dynamodb_state,
528        shared_state,
529        execution_arn,
530    )
531    .await;
532
533    match result {
534        Ok(output) => {
535            add_event(
536                shared_state,
537                execution_arn,
538                "ParallelStateExited",
539                entered_event_id,
540                json!({
541                    "name": name,
542                    "output": serde_json::to_string(&output).unwrap_or_default(),
543                }),
544            );
545            advance_from_next(state_def, output)
546        }
547        Err((error, cause)) => advance_from_error(state_def, &input, error, cause),
548    }
549}
550
551#[allow(clippy::too_many_arguments)]
552async fn run_map_state(
553    name: &str,
554    state_def: &Value,
555    input: Value,
556    delivery: &Option<Arc<DeliveryBus>>,
557    dynamodb_state: &Option<SharedDynamoDbState>,
558    shared_state: &SharedStepFunctionsState,
559    execution_arn: &str,
560) -> Advance {
561    let entered_event_id = add_event(
562        shared_state,
563        execution_arn,
564        "MapStateEntered",
565        0,
566        json!({
567            "name": name,
568            "input": serde_json::to_string(&input).unwrap_or_default(),
569        }),
570    );
571
572    let result = execute_map_state(
573        state_def,
574        &input,
575        delivery,
576        dynamodb_state,
577        shared_state,
578        execution_arn,
579    )
580    .await;
581
582    match result {
583        Ok(output) => {
584            add_event(
585                shared_state,
586                execution_arn,
587                "MapStateExited",
588                entered_event_id,
589                json!({
590                    "name": name,
591                    "output": serde_json::to_string(&output).unwrap_or_default(),
592                }),
593            );
594            advance_from_next(state_def, output)
595        }
596        Err((error, cause)) => advance_from_error(state_def, &input, error, cause),
597    }
598}
599
600/// Execute a Wait state: pause execution for a specified duration or until a timestamp.
601async fn execute_wait_state(state_def: &Value, input: &Value) {
602    if let Some(seconds) = state_def["Seconds"].as_u64() {
603        tokio::time::sleep(tokio::time::Duration::from_secs(seconds)).await;
604        return;
605    }
606
607    if let Some(path) = state_def["SecondsPath"].as_str() {
608        let val = crate::io_processing::resolve_path(input, path);
609        if let Some(seconds) = val.as_u64() {
610            tokio::time::sleep(tokio::time::Duration::from_secs(seconds)).await;
611        }
612        return;
613    }
614
615    if let Some(ts_str) = state_def["Timestamp"].as_str() {
616        if let Ok(target) = chrono::DateTime::parse_from_rfc3339(ts_str) {
617            let now = Utc::now();
618            let target_utc = target.with_timezone(&chrono::Utc);
619            if target_utc > now {
620                let duration = (target_utc - now).to_std().unwrap_or_default();
621                tokio::time::sleep(duration).await;
622            }
623        }
624        return;
625    }
626
627    if let Some(path) = state_def["TimestampPath"].as_str() {
628        let val = crate::io_processing::resolve_path(input, path);
629        if let Some(ts_str) = val.as_str() {
630            if let Ok(target) = chrono::DateTime::parse_from_rfc3339(ts_str) {
631                let now = Utc::now();
632                let target_utc = target.with_timezone(&chrono::Utc);
633                if target_utc > now {
634                    let duration = (target_utc - now).to_std().unwrap_or_default();
635                    tokio::time::sleep(duration).await;
636                }
637            }
638        }
639        return;
640    }
641
642    warn!(
643        "Wait state has no valid Seconds, SecondsPath, Timestamp, or TimestampPath — skipping wait"
644    );
645}
646
647/// Execute a Pass state: apply InputPath, use Result if present, apply ResultPath and OutputPath.
648fn execute_pass_state(state_def: &Value, input: &Value) -> Value {
649    let input_path = state_def["InputPath"].as_str();
650    let result_path = state_def["ResultPath"].as_str();
651    let output_path = state_def["OutputPath"].as_str();
652
653    let effective_input = if input_path == Some("null") {
654        json!({})
655    } else {
656        apply_input_path(input, input_path)
657    };
658
659    let result = if let Some(r) = state_def.get("Result") {
660        r.clone()
661    } else {
662        effective_input.clone()
663    };
664
665    let after_result = if result_path == Some("null") {
666        input.clone()
667    } else {
668        apply_result_path(input, &result, result_path)
669    };
670
671    if output_path == Some("null") {
672        json!({})
673    } else {
674        apply_output_path(&after_result, output_path)
675    }
676}
677
678/// Execute a Task state: invoke the resource (Lambda, SQS, SNS, EventBridge, DynamoDB),
679/// apply I/O processing, handle Retry.
680async fn execute_task_state(
681    state_def: &Value,
682    input: &Value,
683    delivery: &Option<Arc<DeliveryBus>>,
684    dynamodb_state: &Option<SharedDynamoDbState>,
685    shared_state: &SharedStepFunctionsState,
686    execution_arn: &str,
687    entered_event_id: i64,
688) -> Result<Value, (String, String)> {
689    let resource = state_def["Resource"].as_str().unwrap_or("").to_string();
690
691    let input_path = state_def["InputPath"].as_str();
692    let result_path = state_def["ResultPath"].as_str();
693    let output_path = state_def["OutputPath"].as_str();
694
695    let effective_input = if input_path == Some("null") {
696        json!({})
697    } else {
698        apply_input_path(input, input_path)
699    };
700
701    let task_input = if let Some(params) = state_def.get("Parameters") {
702        apply_parameters(params, &effective_input)
703    } else {
704        effective_input
705    };
706
707    let retriers = state_def["Retry"].as_array().cloned().unwrap_or_default();
708    let timeout_seconds = state_def["TimeoutSeconds"].as_u64();
709    let mut attempt = 0u32;
710
711    loop {
712        add_event(
713            shared_state,
714            execution_arn,
715            "TaskScheduled",
716            entered_event_id,
717            json!({
718                "resource": resource,
719                "region": "us-east-1",
720                "parameters": serde_json::to_string(&task_input).unwrap_or_default(),
721            }),
722        );
723
724        add_event(
725            shared_state,
726            execution_arn,
727            "TaskStarted",
728            entered_event_id,
729            json!({ "resource": resource }),
730        );
731
732        let invoke_result = invoke_resource(
733            &resource,
734            &task_input,
735            delivery,
736            dynamodb_state,
737            timeout_seconds,
738        )
739        .await;
740
741        match invoke_result {
742            Ok(result) => {
743                add_event(
744                    shared_state,
745                    execution_arn,
746                    "TaskSucceeded",
747                    entered_event_id,
748                    json!({
749                        "resource": resource,
750                        "output": serde_json::to_string(&result).unwrap_or_default(),
751                    }),
752                );
753
754                let selected = if let Some(selector) = state_def.get("ResultSelector") {
755                    apply_parameters(selector, &result)
756                } else {
757                    result
758                };
759
760                let after_result = if result_path == Some("null") {
761                    input.clone()
762                } else {
763                    apply_result_path(input, &selected, result_path)
764                };
765
766                let output = if output_path == Some("null") {
767                    json!({})
768                } else {
769                    apply_output_path(&after_result, output_path)
770                };
771
772                return Ok(output);
773            }
774            Err((error, cause)) => {
775                add_event(
776                    shared_state,
777                    execution_arn,
778                    "TaskFailed",
779                    entered_event_id,
780                    json!({ "error": error, "cause": cause }),
781                );
782
783                if let Some(delay_ms) = should_retry(&retriers, &error, attempt) {
784                    attempt += 1;
785                    let actual_delay = delay_ms.min(5000);
786                    tokio::time::sleep(tokio::time::Duration::from_millis(actual_delay)).await;
787                    continue;
788                }
789
790                return Err((error, cause));
791            }
792        }
793    }
794}
795
796/// Execute a Parallel state: run all branches concurrently, collect results into an array.
797async fn execute_parallel_state(
798    state_def: &Value,
799    input: &Value,
800    delivery: &Option<Arc<DeliveryBus>>,
801    dynamodb_state: &Option<SharedDynamoDbState>,
802    shared_state: &SharedStepFunctionsState,
803    execution_arn: &str,
804) -> Result<Value, (String, String)> {
805    let input_path = state_def["InputPath"].as_str();
806    let result_path = state_def["ResultPath"].as_str();
807    let output_path = state_def["OutputPath"].as_str();
808
809    let effective_input = if input_path == Some("null") {
810        json!({})
811    } else {
812        apply_input_path(input, input_path)
813    };
814
815    let branches = state_def["Branches"]
816        .as_array()
817        .cloned()
818        .unwrap_or_default();
819
820    if branches.is_empty() {
821        return Err((
822            "States.Runtime".to_string(),
823            "Parallel state has no Branches".to_string(),
824        ));
825    }
826
827    // Spawn all branches concurrently
828    let mut handles = Vec::new();
829    for branch_def in &branches {
830        let branch = branch_def.clone();
831        let branch_input = effective_input.clone();
832        let delivery = delivery.clone();
833        let ddb = dynamodb_state.clone();
834        let state = shared_state.clone();
835        let arn = execution_arn.to_string();
836
837        handles.push(tokio::spawn(async move {
838            run_states(&branch, branch_input, &delivery, &ddb, &state, &arn).await
839        }));
840    }
841
842    // Collect results in order
843    let mut results = Vec::with_capacity(handles.len());
844    for handle in handles {
845        let result = handle.await.map_err(|e| {
846            (
847                "States.Runtime".to_string(),
848                format!("Branch execution panicked: {e}"),
849            )
850        })??;
851        results.push(result);
852    }
853
854    let branch_output = Value::Array(results);
855
856    // Apply ResultSelector if present
857    let selected = if let Some(selector) = state_def.get("ResultSelector") {
858        apply_parameters(selector, &branch_output)
859    } else {
860        branch_output
861    };
862
863    // Apply ResultPath
864    let after_result = if result_path == Some("null") {
865        input.clone()
866    } else {
867        apply_result_path(input, &selected, result_path)
868    };
869
870    // Apply OutputPath
871    let output = if output_path == Some("null") {
872        json!({})
873    } else {
874        apply_output_path(&after_result, output_path)
875    };
876
877    Ok(output)
878}
879
880/// Execute a Map state: iterate over an array and run a sub-state machine per item.
881async fn execute_map_state(
882    state_def: &Value,
883    input: &Value,
884    delivery: &Option<Arc<DeliveryBus>>,
885    dynamodb_state: &Option<SharedDynamoDbState>,
886    shared_state: &SharedStepFunctionsState,
887    execution_arn: &str,
888) -> Result<Value, (String, String)> {
889    let input_path = state_def["InputPath"].as_str();
890    let result_path = state_def["ResultPath"].as_str();
891    let output_path = state_def["OutputPath"].as_str();
892
893    let effective_input = if input_path == Some("null") {
894        json!({})
895    } else {
896        apply_input_path(input, input_path)
897    };
898
899    // Get the items to iterate over
900    let items_path = state_def["ItemsPath"].as_str().unwrap_or("$");
901    let items_value = crate::io_processing::resolve_path(&effective_input, items_path);
902    let items = items_value.as_array().cloned().unwrap_or_default();
903
904    // Get the iterator definition (ItemProcessor or Iterator for backwards compat)
905    let iterator_def = state_def
906        .get("ItemProcessor")
907        .or_else(|| state_def.get("Iterator"))
908        .cloned()
909        .ok_or_else(|| {
910            (
911                "States.Runtime".to_string(),
912                "Map state has no ItemProcessor or Iterator".to_string(),
913            )
914        })?;
915
916    let max_concurrency = state_def["MaxConcurrency"].as_u64().unwrap_or(0);
917    let effective_concurrency = if max_concurrency == 0 {
918        40
919    } else {
920        max_concurrency as usize
921    };
922
923    let semaphore = Arc::new(tokio::sync::Semaphore::new(effective_concurrency));
924
925    // Process all items
926    let mut handles = Vec::new();
927    for (index, item) in items.into_iter().enumerate() {
928        let iter_def = iterator_def.clone();
929        let delivery = delivery.clone();
930        let ddb = dynamodb_state.clone();
931        let state = shared_state.clone();
932        let arn = execution_arn.to_string();
933        let sem = semaphore.clone();
934
935        // Apply ItemSelector if present
936        let item_input = if let Some(selector) = state_def.get("ItemSelector") {
937            let mut ctx = serde_json::Map::new();
938            ctx.insert("value".to_string(), item.clone());
939            ctx.insert("index".to_string(), json!(index));
940            apply_parameters(selector, &Value::Object(ctx))
941        } else {
942            item
943        };
944
945        add_event(
946            shared_state,
947            execution_arn,
948            "MapIterationStarted",
949            0,
950            json!({ "index": index }),
951        );
952
953        handles.push(tokio::spawn(async move {
954            let _permit = sem
955                .acquire()
956                .await
957                .map_err(|_| ("States.Runtime".to_string(), "Semaphore closed".to_string()))?;
958            let result = run_states(&iter_def, item_input, &delivery, &ddb, &state, &arn).await;
959            Ok::<(usize, Result<Value, (String, String)>), (String, String)>((index, result))
960        }));
961    }
962
963    // Collect results in order
964    let mut results: Vec<(usize, Value)> = Vec::with_capacity(handles.len());
965    for handle in handles {
966        let (index, result) = handle.await.map_err(|e| {
967            (
968                "States.Runtime".to_string(),
969                format!("Map iteration panicked: {e}"),
970            )
971        })??;
972
973        match result {
974            Ok(output) => {
975                add_event(
976                    shared_state,
977                    execution_arn,
978                    "MapIterationSucceeded",
979                    0,
980                    json!({ "index": index }),
981                );
982                results.push((index, output));
983            }
984            Err((error, cause)) => {
985                add_event(
986                    shared_state,
987                    execution_arn,
988                    "MapIterationFailed",
989                    0,
990                    json!({ "index": index, "error": error }),
991                );
992                return Err((error, cause));
993            }
994        }
995    }
996
997    // Sort by index to maintain order
998    results.sort_by_key(|(i, _)| *i);
999    let map_output = Value::Array(results.into_iter().map(|(_, v)| v).collect());
1000
1001    // Apply ResultSelector if present
1002    let selected = if let Some(selector) = state_def.get("ResultSelector") {
1003        apply_parameters(selector, &map_output)
1004    } else {
1005        map_output
1006    };
1007
1008    // Apply ResultPath
1009    let after_result = if result_path == Some("null") {
1010        input.clone()
1011    } else {
1012        apply_result_path(input, &selected, result_path)
1013    };
1014
1015    // Apply OutputPath
1016    let output = if output_path == Some("null") {
1017        json!({})
1018    } else {
1019        apply_output_path(&after_result, output_path)
1020    };
1021
1022    Ok(output)
1023}
1024
1025/// Invoke a resource (Lambda function or SDK integration).
1026async fn invoke_resource(
1027    resource: &str,
1028    input: &Value,
1029    delivery: &Option<Arc<DeliveryBus>>,
1030    dynamodb_state: &Option<SharedDynamoDbState>,
1031    timeout_seconds: Option<u64>,
1032) -> Result<Value, (String, String)> {
1033    // Direct Lambda ARN: arn:aws:lambda:<region>:<account>:function:<name>
1034    if resource.contains(":lambda:") && resource.contains(":function:") {
1035        return invoke_lambda_direct(resource, input, delivery, timeout_seconds).await;
1036    }
1037
1038    // SDK integration patterns: arn:aws:states:::<service>:<action>
1039    if resource.starts_with("arn:aws:states:::lambda:invoke") {
1040        let function_name = input["FunctionName"].as_str().unwrap_or("");
1041        let payload = if let Some(p) = input.get("Payload") {
1042            p.clone()
1043        } else {
1044            input.clone()
1045        };
1046        return invoke_lambda_direct(function_name, &payload, delivery, timeout_seconds).await;
1047    }
1048
1049    if resource.starts_with("arn:aws:states:::sqs:sendMessage") {
1050        return invoke_sqs_send_message(input, delivery);
1051    }
1052
1053    if resource.starts_with("arn:aws:states:::sns:publish") {
1054        return invoke_sns_publish(input, delivery);
1055    }
1056
1057    if resource.starts_with("arn:aws:states:::events:putEvents") {
1058        return invoke_eventbridge_put_events(input, delivery);
1059    }
1060
1061    if resource.starts_with("arn:aws:states:::dynamodb:getItem") {
1062        return invoke_dynamodb_get_item(input, dynamodb_state);
1063    }
1064
1065    if resource.starts_with("arn:aws:states:::dynamodb:putItem") {
1066        return invoke_dynamodb_put_item(input, dynamodb_state);
1067    }
1068
1069    if resource.starts_with("arn:aws:states:::dynamodb:deleteItem") {
1070        return invoke_dynamodb_delete_item(input, dynamodb_state);
1071    }
1072
1073    if resource.starts_with("arn:aws:states:::dynamodb:updateItem") {
1074        return invoke_dynamodb_update_item(input, dynamodb_state);
1075    }
1076
1077    Err((
1078        "States.TaskFailed".to_string(),
1079        format!("Unsupported resource: {resource}"),
1080    ))
1081}
1082
1083/// Send a message to an SQS queue via DeliveryBus.
1084fn invoke_sqs_send_message(
1085    input: &Value,
1086    delivery: &Option<Arc<DeliveryBus>>,
1087) -> Result<Value, (String, String)> {
1088    let delivery = delivery.as_ref().ok_or_else(|| {
1089        (
1090            "States.TaskFailed".to_string(),
1091            "No delivery bus configured for SQS".to_string(),
1092        )
1093    })?;
1094
1095    let queue_url = input["QueueUrl"].as_str().ok_or_else(|| {
1096        (
1097            "States.TaskFailed".to_string(),
1098            "Missing QueueUrl in SQS sendMessage parameters".to_string(),
1099        )
1100    })?;
1101
1102    let message_body = input["MessageBody"]
1103        .as_str()
1104        .map(|s| s.to_string())
1105        .unwrap_or_else(|| {
1106            // If MessageBody is not a string, serialize the value
1107            serde_json::to_string(&input["MessageBody"]).unwrap_or_default()
1108        });
1109
1110    // Convert QueueUrl to ARN format for the delivery bus
1111    // QueueUrl format: http://.../<account>/<queue-name>
1112    // ARN format: arn:aws:sqs:<region>:<account>:<queue-name>
1113    let queue_arn = queue_url_to_arn(queue_url);
1114
1115    delivery.send_to_sqs(&queue_arn, &message_body, &HashMap::new());
1116
1117    Ok(json!({
1118        "MessageId": uuid::Uuid::new_v4().to_string(),
1119        "MD5OfMessageBody": md5_hex(&message_body),
1120    }))
1121}
1122
1123/// Publish a message to an SNS topic via DeliveryBus.
1124fn invoke_sns_publish(
1125    input: &Value,
1126    delivery: &Option<Arc<DeliveryBus>>,
1127) -> Result<Value, (String, String)> {
1128    let delivery = delivery.as_ref().ok_or_else(|| {
1129        (
1130            "States.TaskFailed".to_string(),
1131            "No delivery bus configured for SNS".to_string(),
1132        )
1133    })?;
1134
1135    let topic_arn = input["TopicArn"].as_str().ok_or_else(|| {
1136        (
1137            "States.TaskFailed".to_string(),
1138            "Missing TopicArn in SNS publish parameters".to_string(),
1139        )
1140    })?;
1141
1142    let message = input["Message"]
1143        .as_str()
1144        .map(|s| s.to_string())
1145        .unwrap_or_else(|| serde_json::to_string(&input["Message"]).unwrap_or_default());
1146
1147    let subject = input["Subject"].as_str();
1148
1149    delivery.publish_to_sns(topic_arn, &message, subject);
1150
1151    Ok(json!({
1152        "MessageId": uuid::Uuid::new_v4().to_string(),
1153    }))
1154}
1155
1156/// Put events onto an EventBridge bus via DeliveryBus.
1157fn invoke_eventbridge_put_events(
1158    input: &Value,
1159    delivery: &Option<Arc<DeliveryBus>>,
1160) -> Result<Value, (String, String)> {
1161    let delivery = delivery.as_ref().ok_or_else(|| {
1162        (
1163            "States.TaskFailed".to_string(),
1164            "No delivery bus configured for EventBridge".to_string(),
1165        )
1166    })?;
1167
1168    let entries = input["Entries"]
1169        .as_array()
1170        .ok_or_else(|| {
1171            (
1172                "States.TaskFailed".to_string(),
1173                "Missing Entries in EventBridge putEvents parameters".to_string(),
1174            )
1175        })?
1176        .clone();
1177
1178    let mut event_ids = Vec::new();
1179    for entry in &entries {
1180        let source = entry["Source"].as_str().unwrap_or("aws.stepfunctions");
1181        let detail_type = entry["DetailType"].as_str().unwrap_or("StepFunctionsEvent");
1182        let detail = entry["Detail"]
1183            .as_str()
1184            .map(|s| s.to_string())
1185            .unwrap_or_else(|| serde_json::to_string(&entry["Detail"]).unwrap_or("{}".to_string()));
1186        let bus_name = entry["EventBusName"].as_str().unwrap_or("default");
1187
1188        delivery.put_event_to_eventbridge(source, detail_type, &detail, bus_name);
1189        event_ids.push(uuid::Uuid::new_v4().to_string());
1190    }
1191
1192    Ok(json!({
1193        "Entries": event_ids.iter().map(|id| json!({"EventId": id})).collect::<Vec<_>>(),
1194        "FailedEntryCount": 0,
1195    }))
1196}
1197
1198/// Get an item from DynamoDB via direct state access.
1199fn invoke_dynamodb_get_item(
1200    input: &Value,
1201    dynamodb_state: &Option<SharedDynamoDbState>,
1202) -> Result<Value, (String, String)> {
1203    let ddb = dynamodb_state.as_ref().ok_or_else(|| {
1204        (
1205            "States.TaskFailed".to_string(),
1206            "No DynamoDB state configured".to_string(),
1207        )
1208    })?;
1209
1210    let table_name = input["TableName"].as_str().ok_or_else(|| {
1211        (
1212            "States.TaskFailed".to_string(),
1213            "Missing TableName in DynamoDB getItem parameters".to_string(),
1214        )
1215    })?;
1216
1217    let key = input
1218        .get("Key")
1219        .and_then(|k| k.as_object())
1220        .ok_or_else(|| {
1221            (
1222                "States.TaskFailed".to_string(),
1223                "Missing Key in DynamoDB getItem parameters".to_string(),
1224            )
1225        })?;
1226
1227    let key_map: HashMap<String, Value> = key.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
1228
1229    let state = ddb.read();
1230    let table = state.tables.get(table_name).ok_or_else(|| {
1231        (
1232            "States.TaskFailed".to_string(),
1233            format!("Table '{table_name}' not found"),
1234        )
1235    })?;
1236
1237    let item = table
1238        .find_item_index(&key_map)
1239        .map(|idx| table.items[idx].clone());
1240
1241    match item {
1242        Some(item_map) => {
1243            let item_value: serde_json::Map<String, Value> = item_map.into_iter().collect();
1244            Ok(json!({ "Item": item_value }))
1245        }
1246        None => Ok(json!({})),
1247    }
1248}
1249
1250/// Put an item into DynamoDB via direct state access.
1251fn invoke_dynamodb_put_item(
1252    input: &Value,
1253    dynamodb_state: &Option<SharedDynamoDbState>,
1254) -> Result<Value, (String, String)> {
1255    let ddb = dynamodb_state.as_ref().ok_or_else(|| {
1256        (
1257            "States.TaskFailed".to_string(),
1258            "No DynamoDB state configured".to_string(),
1259        )
1260    })?;
1261
1262    let table_name = input["TableName"].as_str().ok_or_else(|| {
1263        (
1264            "States.TaskFailed".to_string(),
1265            "Missing TableName in DynamoDB putItem parameters".to_string(),
1266        )
1267    })?;
1268
1269    let item = input
1270        .get("Item")
1271        .and_then(|i| i.as_object())
1272        .ok_or_else(|| {
1273            (
1274                "States.TaskFailed".to_string(),
1275                "Missing Item in DynamoDB putItem parameters".to_string(),
1276            )
1277        })?;
1278
1279    let item_map: HashMap<String, Value> =
1280        item.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
1281
1282    let mut state = ddb.write();
1283    let table = state.tables.get_mut(table_name).ok_or_else(|| {
1284        (
1285            "States.TaskFailed".to_string(),
1286            format!("Table '{table_name}' not found"),
1287        )
1288    })?;
1289
1290    // Replace existing item with same key, or insert new
1291    if let Some(idx) = table.find_item_index(&item_map) {
1292        table.items[idx] = item_map;
1293    } else {
1294        table.items.push(item_map);
1295    }
1296
1297    Ok(json!({}))
1298}
1299
1300/// Delete an item from DynamoDB via direct state access.
1301fn invoke_dynamodb_delete_item(
1302    input: &Value,
1303    dynamodb_state: &Option<SharedDynamoDbState>,
1304) -> Result<Value, (String, String)> {
1305    let ddb = dynamodb_state.as_ref().ok_or_else(|| {
1306        (
1307            "States.TaskFailed".to_string(),
1308            "No DynamoDB state configured".to_string(),
1309        )
1310    })?;
1311
1312    let table_name = input["TableName"].as_str().ok_or_else(|| {
1313        (
1314            "States.TaskFailed".to_string(),
1315            "Missing TableName in DynamoDB deleteItem parameters".to_string(),
1316        )
1317    })?;
1318
1319    let key = input
1320        .get("Key")
1321        .and_then(|k| k.as_object())
1322        .ok_or_else(|| {
1323            (
1324                "States.TaskFailed".to_string(),
1325                "Missing Key in DynamoDB deleteItem parameters".to_string(),
1326            )
1327        })?;
1328
1329    let key_map: HashMap<String, Value> = key.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
1330
1331    let mut state = ddb.write();
1332    let table = state.tables.get_mut(table_name).ok_or_else(|| {
1333        (
1334            "States.TaskFailed".to_string(),
1335            format!("Table '{table_name}' not found"),
1336        )
1337    })?;
1338
1339    if let Some(idx) = table.find_item_index(&key_map) {
1340        table.items.remove(idx);
1341    }
1342
1343    Ok(json!({}))
1344}
1345
1346/// Update an item in DynamoDB via direct state access.
1347/// Simplified: merges Key+ExpressionAttributeValues into the existing item.
1348fn invoke_dynamodb_update_item(
1349    input: &Value,
1350    dynamodb_state: &Option<SharedDynamoDbState>,
1351) -> Result<Value, (String, String)> {
1352    let ddb = dynamodb_state.as_ref().ok_or_else(|| {
1353        (
1354            "States.TaskFailed".to_string(),
1355            "No DynamoDB state configured".to_string(),
1356        )
1357    })?;
1358
1359    let table_name = input["TableName"].as_str().ok_or_else(|| {
1360        (
1361            "States.TaskFailed".to_string(),
1362            "Missing TableName in DynamoDB updateItem parameters".to_string(),
1363        )
1364    })?;
1365
1366    let key = input
1367        .get("Key")
1368        .and_then(|k| k.as_object())
1369        .ok_or_else(|| {
1370            (
1371                "States.TaskFailed".to_string(),
1372                "Missing Key in DynamoDB updateItem parameters".to_string(),
1373            )
1374        })?;
1375
1376    let key_map: HashMap<String, Value> = key.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
1377
1378    let mut state = ddb.write();
1379    let table = state.tables.get_mut(table_name).ok_or_else(|| {
1380        (
1381            "States.TaskFailed".to_string(),
1382            format!("Table '{table_name}' not found"),
1383        )
1384    })?;
1385
1386    // Parse UpdateExpression to apply SET operations
1387    if let Some(update_expr) = input["UpdateExpression"].as_str() {
1388        let attr_values = input
1389            .get("ExpressionAttributeValues")
1390            .and_then(|v| v.as_object())
1391            .cloned()
1392            .unwrap_or_default();
1393        let attr_names = input
1394            .get("ExpressionAttributeNames")
1395            .and_then(|v| v.as_object())
1396            .cloned()
1397            .unwrap_or_default();
1398
1399        if let Some(idx) = table.find_item_index(&key_map) {
1400            apply_update_expression(
1401                &mut table.items[idx],
1402                update_expr,
1403                &attr_values,
1404                &attr_names,
1405            );
1406        } else {
1407            // Create new item with key + update expression values
1408            let mut new_item = key_map;
1409            apply_update_expression(&mut new_item, update_expr, &attr_values, &attr_names);
1410            table.items.push(new_item);
1411        }
1412    }
1413
1414    Ok(json!({}))
1415}
1416
1417/// Apply a simple SET UpdateExpression to an item.
1418fn apply_update_expression(
1419    item: &mut HashMap<String, Value>,
1420    expr: &str,
1421    attr_values: &serde_json::Map<String, Value>,
1422    attr_names: &serde_json::Map<String, Value>,
1423) {
1424    // Parse "SET #name1 = :val1, #name2 = :val2" or "SET field = :val"
1425    // DynamoDB keywords are case-insensitive
1426    let trimmed = expr.trim();
1427    let set_part = if trimmed.len() >= 4 && trimmed[..4].eq_ignore_ascii_case("SET ") {
1428        &trimmed[4..]
1429    } else {
1430        trimmed
1431    };
1432
1433    for assignment in set_part.split(',') {
1434        let parts: Vec<&str> = assignment.splitn(2, '=').collect();
1435        if parts.len() == 2 {
1436            let attr_ref = parts[0].trim();
1437            let val_ref = parts[1].trim();
1438
1439            // Resolve attribute name (could be #alias or direct name)
1440            let attr_name = if attr_ref.starts_with('#') {
1441                attr_names
1442                    .get(attr_ref)
1443                    .and_then(|v| v.as_str())
1444                    .unwrap_or(attr_ref)
1445                    .to_string()
1446            } else {
1447                attr_ref.to_string()
1448            };
1449
1450            // Resolve value
1451            if val_ref.starts_with(':') {
1452                if let Some(val) = attr_values.get(val_ref) {
1453                    item.insert(attr_name, val.clone());
1454                }
1455            }
1456        }
1457    }
1458}
1459
1460/// Convert an SQS queue URL to an ARN.
1461/// QueueUrl format: http://localhost:4566/123456789012/my-queue
1462fn queue_url_to_arn(url: &str) -> String {
1463    let parts: Vec<&str> = url.rsplitn(3, '/').collect();
1464    if parts.len() >= 2 {
1465        let queue_name = parts[0];
1466        let account_id = parts[1];
1467        Arn::new("sqs", "us-east-1", account_id, queue_name).to_string()
1468    } else {
1469        url.to_string()
1470    }
1471}
1472
1473/// Compute MD5 hex digest for SQS message response format.
1474fn md5_hex(data: &str) -> String {
1475    use md5::Digest;
1476    let result = md5::Md5::digest(data.as_bytes());
1477    format!("{result:032x}")
1478}
1479
1480/// Invoke a Lambda function directly via DeliveryBus.
1481async fn invoke_lambda_direct(
1482    function_arn: &str,
1483    input: &Value,
1484    delivery: &Option<Arc<DeliveryBus>>,
1485    timeout_seconds: Option<u64>,
1486) -> Result<Value, (String, String)> {
1487    let delivery = delivery.as_ref().ok_or_else(|| {
1488        (
1489            "States.TaskFailed".to_string(),
1490            "No delivery bus configured for Lambda invocation".to_string(),
1491        )
1492    })?;
1493
1494    let payload = serde_json::to_string(input).unwrap_or_default();
1495
1496    let invoke_future = delivery.invoke_lambda(function_arn, &payload);
1497
1498    let result = if let Some(timeout) = timeout_seconds {
1499        match tokio::time::timeout(tokio::time::Duration::from_secs(timeout), invoke_future).await {
1500            Ok(r) => r,
1501            Err(_) => {
1502                return Err((
1503                    "States.Timeout".to_string(),
1504                    format!("Task timed out after {timeout} seconds"),
1505                ));
1506            }
1507        }
1508    } else {
1509        invoke_future.await
1510    };
1511
1512    match result {
1513        Some(Ok(bytes)) => {
1514            let response_str = String::from_utf8_lossy(&bytes);
1515            let value: Value =
1516                serde_json::from_str(&response_str).unwrap_or(json!(response_str.to_string()));
1517            Ok(value)
1518        }
1519        Some(Err(e)) => Err(("States.TaskFailed".to_string(), e)),
1520        None => {
1521            // No runtime available — return empty result
1522            Ok(json!({}))
1523        }
1524    }
1525}
1526
1527/// Apply Parameters template: keys ending with .$ are treated as JsonPath references.
1528fn apply_parameters(template: &Value, input: &Value) -> Value {
1529    match template {
1530        Value::Object(map) => {
1531            let mut result = serde_json::Map::new();
1532            for (key, value) in map {
1533                if let Some(stripped) = key.strip_suffix(".$") {
1534                    if let Some(path) = value.as_str() {
1535                        result.insert(
1536                            stripped.to_string(),
1537                            crate::io_processing::resolve_path(input, path),
1538                        );
1539                    }
1540                } else {
1541                    result.insert(key.clone(), apply_parameters(value, input));
1542                }
1543            }
1544            Value::Object(result)
1545        }
1546        Value::Array(arr) => Value::Array(arr.iter().map(|v| apply_parameters(v, input)).collect()),
1547        other => other.clone(),
1548    }
1549}
1550
/// Where execution should go after a state finishes, as derived from the
/// state's `End` / `Next` fields by `next_state`.
enum NextState {
    /// Continue with the state of this name (the value of `Next`).
    Name(String),
    /// The state is terminal (`"End": true`).
    End,
    /// The state definition has neither `End` nor `Next`; carries the
    /// message describing the problem.
    Error(String),
}
1556
1557fn next_state(state_def: &Value) -> NextState {
1558    if state_def["End"].as_bool() == Some(true) {
1559        return NextState::End;
1560    }
1561    match state_def["Next"].as_str() {
1562        Some(next) => NextState::Name(next.to_string()),
1563        None => NextState::Error("State has neither 'End' nor 'Next' field".to_string()),
1564    }
1565}
1566
1567/// Find the first `Catch` clause on `state_def` that matches `error` and
1568/// apply its `ResultPath` to produce the state to transition to and the
1569/// new effective input. Returns None when no catcher applies, in which
1570/// case the error should propagate up.
1571fn apply_state_catcher(
1572    state_def: &Value,
1573    effective_input: &Value,
1574    error: &str,
1575    cause: &str,
1576) -> Option<(String, Value)> {
1577    let catchers = state_def["Catch"].as_array().cloned().unwrap_or_default();
1578    let (next, result_path) = find_catcher(&catchers, error)?;
1579    let error_output = json!({
1580        "Error": error,
1581        "Cause": cause,
1582    });
1583    let new_input = apply_result_path(effective_input, &error_output, result_path.as_deref());
1584    Some((next, new_input))
1585}
1586
1587fn add_event(
1588    state: &SharedStepFunctionsState,
1589    execution_arn: &str,
1590    event_type: &str,
1591    previous_event_id: i64,
1592    details: Value,
1593) -> i64 {
1594    let mut s = state.write();
1595    if let Some(exec) = s.executions.get_mut(execution_arn) {
1596        let id = exec.history_events.len() as i64 + 1;
1597        exec.history_events.push(HistoryEvent {
1598            id,
1599            event_type: event_type.to_string(),
1600            timestamp: Utc::now(),
1601            previous_event_id,
1602            details,
1603        });
1604        id
1605    } else {
1606        0
1607    }
1608}
1609
1610fn succeed_execution(state: &SharedStepFunctionsState, execution_arn: &str, output: &Value) {
1611    // Check terminal status before recording events to avoid inconsistent history
1612    {
1613        let s = state.read();
1614        if let Some(exec) = s.executions.get(execution_arn) {
1615            if exec.status != ExecutionStatus::Running {
1616                return;
1617            }
1618        }
1619    }
1620
1621    let output_str = serde_json::to_string(output).unwrap_or_default();
1622
1623    add_event(
1624        state,
1625        execution_arn,
1626        "ExecutionSucceeded",
1627        0,
1628        json!({ "output": output_str }),
1629    );
1630
1631    let mut s = state.write();
1632    if let Some(exec) = s.executions.get_mut(execution_arn) {
1633        exec.status = ExecutionStatus::Succeeded;
1634        exec.output = Some(output_str);
1635        exec.stop_date = Some(Utc::now());
1636    }
1637}
1638
1639fn fail_execution(state: &SharedStepFunctionsState, execution_arn: &str, error: &str, cause: &str) {
1640    // Check terminal status before recording events to avoid inconsistent history
1641    {
1642        let s = state.read();
1643        if let Some(exec) = s.executions.get(execution_arn) {
1644            if exec.status != ExecutionStatus::Running {
1645                return;
1646            }
1647        }
1648    }
1649
1650    add_event(
1651        state,
1652        execution_arn,
1653        "ExecutionFailed",
1654        0,
1655        json!({ "error": error, "cause": cause }),
1656    );
1657
1658    let mut s = state.write();
1659    if let Some(exec) = s.executions.get_mut(execution_arn) {
1660        exec.status = ExecutionStatus::Failed;
1661        exec.error = Some(error.to_string());
1662        exec.cause = Some(cause.to_string());
1663        exec.stop_date = Some(Utc::now());
1664    }
1665}
1666
#[cfg(test)]
mod tests {
    use super::*;
    use crate::state::{Execution, StepFunctionsState};
    use parking_lot::RwLock;
    use std::sync::Arc;

    /// ARN shared by every test execution in this module.
    const EXECUTION_ARN: &str = "arn:aws:states:us-east-1:123456789012:execution:test:exec-1";

    /// Build an empty shared Step Functions state.
    fn make_state() -> SharedStepFunctionsState {
        let inner = StepFunctionsState::new("123456789012", "us-east-1");
        Arc::new(RwLock::new(inner))
    }

    /// Register a `Running` execution under `arn` with the given input.
    fn create_execution(state: &SharedStepFunctionsState, arn: &str, input: Option<String>) {
        let execution = Execution {
            execution_arn: arn.to_string(),
            state_machine_arn: "arn:aws:states:us-east-1:123456789012:stateMachine:test"
                .to_string(),
            state_machine_name: "test".to_string(),
            name: "exec-1".to_string(),
            status: ExecutionStatus::Running,
            input,
            output: None,
            start_date: Utc::now(),
            stop_date: None,
            error: None,
            cause: None,
            history_events: vec![],
        };
        state.write().executions.insert(arn.to_string(), execution);
    }

    /// Create a fresh state, register the execution, run `definition` against
    /// `input` with no delivery bus or DynamoDB state, and hand back the state
    /// for inspection.
    async fn run_definition(definition: Value, input: Option<&str>) -> SharedStepFunctionsState {
        let state = make_state();
        create_execution(&state, EXECUTION_ARN, input.map(str::to_string));

        execute_state_machine(
            state.clone(),
            EXECUTION_ARN.to_string(),
            definition.to_string(),
            input.map(str::to_string),
            None,
            None,
        )
        .await;

        state
    }

    #[tokio::test]
    async fn test_simple_pass_state() {
        let definition = json!({
            "StartAt": "PassState",
            "States": {
                "PassState": {
                    "Type": "Pass",
                    "Result": {"processed": true},
                    "End": true
                }
            }
        });

        let state = run_definition(definition, Some(r#"{"hello":"world"}"#)).await;

        let guard = state.read();
        let exec = guard.executions.get(EXECUTION_ARN).unwrap();
        assert_eq!(exec.status, ExecutionStatus::Succeeded);
        assert!(exec.output.is_some());
        let output: Value = serde_json::from_str(exec.output.as_ref().unwrap()).unwrap();
        assert_eq!(output, json!({"processed": true}));
    }

    #[tokio::test]
    async fn test_pass_chain() {
        let definition = json!({
            "StartAt": "First",
            "States": {
                "First": {
                    "Type": "Pass",
                    "Result": "step1",
                    "ResultPath": "$.first",
                    "Next": "Second"
                },
                "Second": {
                    "Type": "Pass",
                    "Result": "step2",
                    "ResultPath": "$.second",
                    "End": true
                }
            }
        });

        let state = run_definition(definition, Some("{}")).await;

        let guard = state.read();
        let exec = guard.executions.get(EXECUTION_ARN).unwrap();
        assert_eq!(exec.status, ExecutionStatus::Succeeded);
        let output: Value = serde_json::from_str(exec.output.as_ref().unwrap()).unwrap();
        assert_eq!(output["first"], json!("step1"));
        assert_eq!(output["second"], json!("step2"));
    }

    #[tokio::test]
    async fn test_succeed_state() {
        let definition = json!({
            "StartAt": "Done",
            "States": {
                "Done": {
                    "Type": "Succeed"
                }
            }
        });

        let state = run_definition(definition, Some(r#"{"data": "value"}"#)).await;

        let guard = state.read();
        let exec = guard.executions.get(EXECUTION_ARN).unwrap();
        assert_eq!(exec.status, ExecutionStatus::Succeeded);
    }

    #[tokio::test]
    async fn test_fail_state() {
        let definition = json!({
            "StartAt": "FailState",
            "States": {
                "FailState": {
                    "Type": "Fail",
                    "Error": "CustomError",
                    "Cause": "Something went wrong"
                }
            }
        });

        let state = run_definition(definition, None).await;

        let guard = state.read();
        let exec = guard.executions.get(EXECUTION_ARN).unwrap();
        assert_eq!(exec.status, ExecutionStatus::Failed);
        assert_eq!(exec.error.as_deref(), Some("CustomError"));
        assert_eq!(exec.cause.as_deref(), Some("Something went wrong"));
    }

    #[tokio::test]
    async fn test_history_events_recorded() {
        let definition = json!({
            "StartAt": "PassState",
            "States": {
                "PassState": {
                    "Type": "Pass",
                    "End": true
                }
            }
        });

        let state = run_definition(definition, Some("{}")).await;

        let guard = state.read();
        let exec = guard.executions.get(EXECUTION_ARN).unwrap();
        let mut event_types: Vec<&str> = Vec::with_capacity(exec.history_events.len());
        for event in &exec.history_events {
            event_types.push(event.event_type.as_str());
        }
        assert_eq!(
            event_types,
            vec![
                "ExecutionStarted",
                "PassStateEntered",
                "PassStateExited",
                "ExecutionSucceeded"
            ]
        );
    }
}