Skip to main content

fakecloud_stepfunctions/
interpreter.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use chrono::Utc;
5use serde_json::{json, Value};
6use tracing::debug;
7
8use fakecloud_core::delivery::DeliveryBus;
9use fakecloud_dynamodb::state::SharedDynamoDbState;
10
11use crate::choice::evaluate_choice;
12use crate::error_handling::{find_catcher, should_retry};
13use crate::io_processing::{apply_input_path, apply_output_path, apply_result_path};
14use crate::state::{ExecutionStatus, HistoryEvent, SharedStepFunctionsState};
15
16/// Execute a state machine definition with the given input.
17/// Updates the execution record in shared state as it progresses.
18pub async fn execute_state_machine(
19    state: SharedStepFunctionsState,
20    execution_arn: String,
21    definition: String,
22    input: Option<String>,
23    delivery: Option<Arc<DeliveryBus>>,
24    dynamodb_state: Option<SharedDynamoDbState>,
25) {
26    let def: Value = match serde_json::from_str(&definition) {
27        Ok(v) => v,
28        Err(e) => {
29            fail_execution(
30                &state,
31                &execution_arn,
32                "States.Runtime",
33                &format!("Failed to parse definition: {e}"),
34            );
35            return;
36        }
37    };
38
39    let raw_input: Value = input
40        .as_deref()
41        .and_then(|s| serde_json::from_str(s).ok())
42        .unwrap_or(json!({}));
43
44    // Record ExecutionStarted event
45    add_event(
46        &state,
47        &execution_arn,
48        "ExecutionStarted",
49        0,
50        json!({
51            "input": serde_json::to_string(&raw_input).unwrap_or_default(),
52            "roleArn": "arn:aws:iam::123456789012:role/execution-role"
53        }),
54    );
55
56    match run_states(
57        &def,
58        raw_input,
59        &delivery,
60        &dynamodb_state,
61        &state,
62        &execution_arn,
63    )
64    .await
65    {
66        Ok(output) => {
67            succeed_execution(&state, &execution_arn, &output);
68        }
69        Err((error, cause)) => {
70            fail_execution(&state, &execution_arn, &error, &cause);
71        }
72    }
73}
74
75type StatesResult<'a> = std::pin::Pin<
76    Box<dyn std::future::Future<Output = Result<Value, (String, String)>> + Send + 'a>,
77>;
78
79/// Core execution loop: runs through states in a definition and returns the output.
80/// Used by the top-level executor, Parallel branches, and Map iterations.
81fn run_states<'a>(
82    def: &'a Value,
83    input: Value,
84    delivery: &'a Option<Arc<DeliveryBus>>,
85    dynamodb_state: &'a Option<SharedDynamoDbState>,
86    shared_state: &'a SharedStepFunctionsState,
87    execution_arn: &'a str,
88) -> StatesResult<'a> {
89    Box::pin(async move {
90        let start_at = def["StartAt"]
91            .as_str()
92            .ok_or_else(|| {
93                (
94                    "States.Runtime".to_string(),
95                    "Missing StartAt in definition".to_string(),
96                )
97            })?
98            .to_string();
99
100        let states = def.get("States").ok_or_else(|| {
101            (
102                "States.Runtime".to_string(),
103                "Missing States in definition".to_string(),
104            )
105        })?;
106
107        let mut current_state = start_at;
108        let mut effective_input = input;
109        let mut iteration = 0;
110        let max_iterations = 500;
111
112        loop {
113            iteration += 1;
114            if iteration > max_iterations {
115                return Err((
116                    "States.Runtime".to_string(),
117                    "Maximum number of state transitions exceeded".to_string(),
118                ));
119            }
120
121            let state_def = states.get(&current_state).cloned().ok_or_else(|| {
122                (
123                    "States.Runtime".to_string(),
124                    format!("State '{current_state}' not found in definition"),
125                )
126            })?;
127
128            let state_type = state_def["Type"]
129                .as_str()
130                .ok_or_else(|| {
131                    (
132                        "States.Runtime".to_string(),
133                        format!("State '{current_state}' missing Type field"),
134                    )
135                })?
136                .to_string();
137
138            debug!(
139                execution_arn = %execution_arn,
140                state = %current_state,
141                state_type = %state_type,
142                "Executing state"
143            );
144
145            match state_type.as_str() {
146                "Pass" => {
147                    let entered_event_id = add_event(
148                        shared_state,
149                        execution_arn,
150                        "PassStateEntered",
151                        0,
152                        json!({
153                            "name": current_state,
154                            "input": serde_json::to_string(&effective_input).unwrap_or_default(),
155                        }),
156                    );
157
158                    let result = execute_pass_state(&state_def, &effective_input);
159
160                    add_event(
161                        shared_state,
162                        execution_arn,
163                        "PassStateExited",
164                        entered_event_id,
165                        json!({
166                            "name": current_state,
167                            "output": serde_json::to_string(&result).unwrap_or_default(),
168                        }),
169                    );
170
171                    effective_input = result;
172
173                    match next_state(&state_def) {
174                        NextState::Name(next) => current_state = next,
175                        NextState::End => return Ok(effective_input),
176                        NextState::Error(msg) => {
177                            return Err(("States.Runtime".to_string(), msg));
178                        }
179                    }
180                }
181
182                "Succeed" => {
183                    add_event(
184                        shared_state,
185                        execution_arn,
186                        "SucceedStateEntered",
187                        0,
188                        json!({
189                            "name": current_state,
190                            "input": serde_json::to_string(&effective_input).unwrap_or_default(),
191                        }),
192                    );
193
194                    let input_path = state_def["InputPath"].as_str();
195                    let output_path = state_def["OutputPath"].as_str();
196
197                    let processed = if input_path == Some("null") {
198                        json!({})
199                    } else {
200                        apply_input_path(&effective_input, input_path)
201                    };
202
203                    let output = if output_path == Some("null") {
204                        json!({})
205                    } else {
206                        apply_output_path(&processed, output_path)
207                    };
208
209                    add_event(
210                        shared_state,
211                        execution_arn,
212                        "SucceedStateExited",
213                        0,
214                        json!({
215                            "name": current_state,
216                            "output": serde_json::to_string(&output).unwrap_or_default(),
217                        }),
218                    );
219
220                    return Ok(output);
221                }
222
223                "Fail" => {
224                    let error = state_def["Error"]
225                        .as_str()
226                        .unwrap_or("States.Fail")
227                        .to_string();
228                    let cause = state_def["Cause"].as_str().unwrap_or("").to_string();
229
230                    add_event(
231                        shared_state,
232                        execution_arn,
233                        "FailStateEntered",
234                        0,
235                        json!({
236                            "name": current_state,
237                            "input": serde_json::to_string(&effective_input).unwrap_or_default(),
238                        }),
239                    );
240
241                    return Err((error, cause));
242                }
243
244                "Task" => {
245                    let entered_event_id = add_event(
246                        shared_state,
247                        execution_arn,
248                        "TaskStateEntered",
249                        0,
250                        json!({
251                            "name": current_state,
252                            "input": serde_json::to_string(&effective_input).unwrap_or_default(),
253                        }),
254                    );
255
256                    let result = execute_task_state(
257                        &state_def,
258                        &effective_input,
259                        delivery,
260                        dynamodb_state,
261                        shared_state,
262                        execution_arn,
263                        entered_event_id,
264                    )
265                    .await;
266
267                    match result {
268                        Ok(output) => {
269                            add_event(
270                                shared_state,
271                                execution_arn,
272                                "TaskStateExited",
273                                entered_event_id,
274                                json!({
275                                    "name": current_state,
276                                    "output": serde_json::to_string(&output).unwrap_or_default(),
277                                }),
278                            );
279
280                            effective_input = output;
281
282                            match next_state(&state_def) {
283                                NextState::Name(next) => current_state = next,
284                                NextState::End => return Ok(effective_input),
285                                NextState::Error(msg) => {
286                                    return Err(("States.Runtime".to_string(), msg));
287                                }
288                            }
289                        }
290                        Err((error, cause)) => {
291                            let catchers =
292                                state_def["Catch"].as_array().cloned().unwrap_or_default();
293
294                            if let Some((next, result_path)) = find_catcher(&catchers, &error) {
295                                let error_output = json!({
296                                    "Error": error,
297                                    "Cause": cause,
298                                });
299                                effective_input = apply_result_path(
300                                    &effective_input,
301                                    &error_output,
302                                    result_path.as_deref(),
303                                );
304                                current_state = next;
305                            } else {
306                                return Err((error, cause));
307                            }
308                        }
309                    }
310                }
311
312                "Choice" => {
313                    let entered_event_id = add_event(
314                        shared_state,
315                        execution_arn,
316                        "ChoiceStateEntered",
317                        0,
318                        json!({
319                            "name": current_state,
320                            "input": serde_json::to_string(&effective_input).unwrap_or_default(),
321                        }),
322                    );
323
324                    let input_path = state_def["InputPath"].as_str();
325                    let processed_input = if input_path == Some("null") {
326                        json!({})
327                    } else {
328                        apply_input_path(&effective_input, input_path)
329                    };
330
331                    match evaluate_choice(&state_def, &processed_input) {
332                        Some(next) => {
333                            add_event(
334                                shared_state,
335                                execution_arn,
336                                "ChoiceStateExited",
337                                entered_event_id,
338                                json!({
339                                    "name": current_state,
340                                    "output": serde_json::to_string(&effective_input).unwrap_or_default(),
341                                }),
342                            );
343                            current_state = next;
344                        }
345                        None => {
346                            return Err((
347                                "States.NoChoiceMatched".to_string(),
348                                format!(
349                                    "No choice rule matched and no Default in state '{current_state}'"
350                                ),
351                            ));
352                        }
353                    }
354                }
355
356                "Wait" => {
357                    let entered_event_id = add_event(
358                        shared_state,
359                        execution_arn,
360                        "WaitStateEntered",
361                        0,
362                        json!({
363                            "name": current_state,
364                            "input": serde_json::to_string(&effective_input).unwrap_or_default(),
365                        }),
366                    );
367
368                    execute_wait_state(&state_def, &effective_input).await;
369
370                    add_event(
371                        shared_state,
372                        execution_arn,
373                        "WaitStateExited",
374                        entered_event_id,
375                        json!({
376                            "name": current_state,
377                            "output": serde_json::to_string(&effective_input).unwrap_or_default(),
378                        }),
379                    );
380
381                    match next_state(&state_def) {
382                        NextState::Name(next) => current_state = next,
383                        NextState::End => return Ok(effective_input),
384                        NextState::Error(msg) => {
385                            return Err(("States.Runtime".to_string(), msg));
386                        }
387                    }
388                }
389
390                "Parallel" => {
391                    let entered_event_id = add_event(
392                        shared_state,
393                        execution_arn,
394                        "ParallelStateEntered",
395                        0,
396                        json!({
397                            "name": current_state,
398                            "input": serde_json::to_string(&effective_input).unwrap_or_default(),
399                        }),
400                    );
401
402                    let result = execute_parallel_state(
403                        &state_def,
404                        &effective_input,
405                        delivery,
406                        dynamodb_state,
407                        shared_state,
408                        execution_arn,
409                    )
410                    .await;
411
412                    match result {
413                        Ok(output) => {
414                            add_event(
415                                shared_state,
416                                execution_arn,
417                                "ParallelStateExited",
418                                entered_event_id,
419                                json!({
420                                    "name": current_state,
421                                    "output": serde_json::to_string(&output).unwrap_or_default(),
422                                }),
423                            );
424
425                            effective_input = output;
426
427                            match next_state(&state_def) {
428                                NextState::Name(next) => current_state = next,
429                                NextState::End => return Ok(effective_input),
430                                NextState::Error(msg) => {
431                                    return Err(("States.Runtime".to_string(), msg));
432                                }
433                            }
434                        }
435                        Err((error, cause)) => {
436                            let catchers =
437                                state_def["Catch"].as_array().cloned().unwrap_or_default();
438
439                            if let Some((next, result_path)) = find_catcher(&catchers, &error) {
440                                let error_output = json!({
441                                    "Error": error,
442                                    "Cause": cause,
443                                });
444                                effective_input = apply_result_path(
445                                    &effective_input,
446                                    &error_output,
447                                    result_path.as_deref(),
448                                );
449                                current_state = next;
450                            } else {
451                                return Err((error, cause));
452                            }
453                        }
454                    }
455                }
456
457                "Map" => {
458                    let entered_event_id = add_event(
459                        shared_state,
460                        execution_arn,
461                        "MapStateEntered",
462                        0,
463                        json!({
464                            "name": current_state,
465                            "input": serde_json::to_string(&effective_input).unwrap_or_default(),
466                        }),
467                    );
468
469                    let result = execute_map_state(
470                        &state_def,
471                        &effective_input,
472                        delivery,
473                        dynamodb_state,
474                        shared_state,
475                        execution_arn,
476                    )
477                    .await;
478
479                    match result {
480                        Ok(output) => {
481                            add_event(
482                                shared_state,
483                                execution_arn,
484                                "MapStateExited",
485                                entered_event_id,
486                                json!({
487                                    "name": current_state,
488                                    "output": serde_json::to_string(&output).unwrap_or_default(),
489                                }),
490                            );
491
492                            effective_input = output;
493
494                            match next_state(&state_def) {
495                                NextState::Name(next) => current_state = next,
496                                NextState::End => return Ok(effective_input),
497                                NextState::Error(msg) => {
498                                    return Err(("States.Runtime".to_string(), msg));
499                                }
500                            }
501                        }
502                        Err((error, cause)) => {
503                            let catchers =
504                                state_def["Catch"].as_array().cloned().unwrap_or_default();
505
506                            if let Some((next, result_path)) = find_catcher(&catchers, &error) {
507                                let error_output = json!({
508                                    "Error": error,
509                                    "Cause": cause,
510                                });
511                                effective_input = apply_result_path(
512                                    &effective_input,
513                                    &error_output,
514                                    result_path.as_deref(),
515                                );
516                                current_state = next;
517                            } else {
518                                return Err((error, cause));
519                            }
520                        }
521                    }
522                }
523
524                other => {
525                    return Err((
526                        "States.Runtime".to_string(),
527                        format!("Unsupported state type: '{other}'"),
528                    ));
529                }
530            }
531        }
532    })
533}
534
535/// Execute a Wait state: pause execution for a specified duration or until a timestamp.
536async fn execute_wait_state(state_def: &Value, input: &Value) {
537    if let Some(seconds) = state_def["Seconds"].as_u64() {
538        tokio::time::sleep(tokio::time::Duration::from_secs(seconds)).await;
539        return;
540    }
541
542    if let Some(path) = state_def["SecondsPath"].as_str() {
543        let val = crate::io_processing::resolve_path(input, path);
544        if let Some(seconds) = val.as_u64() {
545            tokio::time::sleep(tokio::time::Duration::from_secs(seconds)).await;
546        }
547        return;
548    }
549
550    if let Some(ts_str) = state_def["Timestamp"].as_str() {
551        if let Ok(target) = chrono::DateTime::parse_from_rfc3339(ts_str) {
552            let now = Utc::now();
553            let target_utc = target.with_timezone(&chrono::Utc);
554            if target_utc > now {
555                let duration = (target_utc - now).to_std().unwrap_or_default();
556                tokio::time::sleep(duration).await;
557            }
558        }
559        return;
560    }
561
562    if let Some(path) = state_def["TimestampPath"].as_str() {
563        let val = crate::io_processing::resolve_path(input, path);
564        if let Some(ts_str) = val.as_str() {
565            if let Ok(target) = chrono::DateTime::parse_from_rfc3339(ts_str) {
566                let now = Utc::now();
567                let target_utc = target.with_timezone(&chrono::Utc);
568                if target_utc > now {
569                    let duration = (target_utc - now).to_std().unwrap_or_default();
570                    tokio::time::sleep(duration).await;
571                }
572            }
573        }
574    }
575}
576
577/// Execute a Pass state: apply InputPath, use Result if present, apply ResultPath and OutputPath.
578fn execute_pass_state(state_def: &Value, input: &Value) -> Value {
579    let input_path = state_def["InputPath"].as_str();
580    let result_path = state_def["ResultPath"].as_str();
581    let output_path = state_def["OutputPath"].as_str();
582
583    let effective_input = if input_path == Some("null") {
584        json!({})
585    } else {
586        apply_input_path(input, input_path)
587    };
588
589    let result = if let Some(r) = state_def.get("Result") {
590        r.clone()
591    } else {
592        effective_input.clone()
593    };
594
595    let after_result = if result_path == Some("null") {
596        input.clone()
597    } else {
598        apply_result_path(input, &result, result_path)
599    };
600
601    if output_path == Some("null") {
602        json!({})
603    } else {
604        apply_output_path(&after_result, output_path)
605    }
606}
607
608/// Execute a Task state: invoke the resource (Lambda, SQS, SNS, EventBridge, DynamoDB),
609/// apply I/O processing, handle Retry.
610async fn execute_task_state(
611    state_def: &Value,
612    input: &Value,
613    delivery: &Option<Arc<DeliveryBus>>,
614    dynamodb_state: &Option<SharedDynamoDbState>,
615    shared_state: &SharedStepFunctionsState,
616    execution_arn: &str,
617    entered_event_id: i64,
618) -> Result<Value, (String, String)> {
619    let resource = state_def["Resource"].as_str().unwrap_or("").to_string();
620
621    let input_path = state_def["InputPath"].as_str();
622    let result_path = state_def["ResultPath"].as_str();
623    let output_path = state_def["OutputPath"].as_str();
624
625    let effective_input = if input_path == Some("null") {
626        json!({})
627    } else {
628        apply_input_path(input, input_path)
629    };
630
631    let task_input = if let Some(params) = state_def.get("Parameters") {
632        apply_parameters(params, &effective_input)
633    } else {
634        effective_input
635    };
636
637    let retriers = state_def["Retry"].as_array().cloned().unwrap_or_default();
638    let timeout_seconds = state_def["TimeoutSeconds"].as_u64();
639    let mut attempt = 0u32;
640
641    loop {
642        add_event(
643            shared_state,
644            execution_arn,
645            "TaskScheduled",
646            entered_event_id,
647            json!({
648                "resource": resource,
649                "region": "us-east-1",
650                "parameters": serde_json::to_string(&task_input).unwrap_or_default(),
651            }),
652        );
653
654        add_event(
655            shared_state,
656            execution_arn,
657            "TaskStarted",
658            entered_event_id,
659            json!({ "resource": resource }),
660        );
661
662        let invoke_result = invoke_resource(
663            &resource,
664            &task_input,
665            delivery,
666            dynamodb_state,
667            timeout_seconds,
668        )
669        .await;
670
671        match invoke_result {
672            Ok(result) => {
673                add_event(
674                    shared_state,
675                    execution_arn,
676                    "TaskSucceeded",
677                    entered_event_id,
678                    json!({
679                        "resource": resource,
680                        "output": serde_json::to_string(&result).unwrap_or_default(),
681                    }),
682                );
683
684                let selected = if let Some(selector) = state_def.get("ResultSelector") {
685                    apply_parameters(selector, &result)
686                } else {
687                    result
688                };
689
690                let after_result = if result_path == Some("null") {
691                    input.clone()
692                } else {
693                    apply_result_path(input, &selected, result_path)
694                };
695
696                let output = if output_path == Some("null") {
697                    json!({})
698                } else {
699                    apply_output_path(&after_result, output_path)
700                };
701
702                return Ok(output);
703            }
704            Err((error, cause)) => {
705                add_event(
706                    shared_state,
707                    execution_arn,
708                    "TaskFailed",
709                    entered_event_id,
710                    json!({ "error": error, "cause": cause }),
711                );
712
713                if let Some(delay_ms) = should_retry(&retriers, &error, attempt) {
714                    attempt += 1;
715                    let actual_delay = delay_ms.min(5000);
716                    tokio::time::sleep(tokio::time::Duration::from_millis(actual_delay)).await;
717                    continue;
718                }
719
720                return Err((error, cause));
721            }
722        }
723    }
724}
725
726/// Execute a Parallel state: run all branches concurrently, collect results into an array.
727async fn execute_parallel_state(
728    state_def: &Value,
729    input: &Value,
730    delivery: &Option<Arc<DeliveryBus>>,
731    dynamodb_state: &Option<SharedDynamoDbState>,
732    shared_state: &SharedStepFunctionsState,
733    execution_arn: &str,
734) -> Result<Value, (String, String)> {
735    let input_path = state_def["InputPath"].as_str();
736    let result_path = state_def["ResultPath"].as_str();
737    let output_path = state_def["OutputPath"].as_str();
738
739    let effective_input = if input_path == Some("null") {
740        json!({})
741    } else {
742        apply_input_path(input, input_path)
743    };
744
745    let branches = state_def["Branches"]
746        .as_array()
747        .cloned()
748        .unwrap_or_default();
749
750    if branches.is_empty() {
751        return Err((
752            "States.Runtime".to_string(),
753            "Parallel state has no Branches".to_string(),
754        ));
755    }
756
757    // Spawn all branches concurrently
758    let mut handles = Vec::new();
759    for branch_def in &branches {
760        let branch = branch_def.clone();
761        let branch_input = effective_input.clone();
762        let delivery = delivery.clone();
763        let ddb = dynamodb_state.clone();
764        let state = shared_state.clone();
765        let arn = execution_arn.to_string();
766
767        handles.push(tokio::spawn(async move {
768            run_states(&branch, branch_input, &delivery, &ddb, &state, &arn).await
769        }));
770    }
771
772    // Collect results in order
773    let mut results = Vec::with_capacity(handles.len());
774    for handle in handles {
775        let result = handle.await.map_err(|e| {
776            (
777                "States.Runtime".to_string(),
778                format!("Branch execution panicked: {e}"),
779            )
780        })??;
781        results.push(result);
782    }
783
784    let branch_output = Value::Array(results);
785
786    // Apply ResultSelector if present
787    let selected = if let Some(selector) = state_def.get("ResultSelector") {
788        apply_parameters(selector, &branch_output)
789    } else {
790        branch_output
791    };
792
793    // Apply ResultPath
794    let after_result = if result_path == Some("null") {
795        input.clone()
796    } else {
797        apply_result_path(input, &selected, result_path)
798    };
799
800    // Apply OutputPath
801    let output = if output_path == Some("null") {
802        json!({})
803    } else {
804        apply_output_path(&after_result, output_path)
805    };
806
807    Ok(output)
808}
809
810/// Execute a Map state: iterate over an array and run a sub-state machine per item.
811async fn execute_map_state(
812    state_def: &Value,
813    input: &Value,
814    delivery: &Option<Arc<DeliveryBus>>,
815    dynamodb_state: &Option<SharedDynamoDbState>,
816    shared_state: &SharedStepFunctionsState,
817    execution_arn: &str,
818) -> Result<Value, (String, String)> {
819    let input_path = state_def["InputPath"].as_str();
820    let result_path = state_def["ResultPath"].as_str();
821    let output_path = state_def["OutputPath"].as_str();
822
823    let effective_input = if input_path == Some("null") {
824        json!({})
825    } else {
826        apply_input_path(input, input_path)
827    };
828
829    // Get the items to iterate over
830    let items_path = state_def["ItemsPath"].as_str().unwrap_or("$");
831    let items_value = crate::io_processing::resolve_path(&effective_input, items_path);
832    let items = items_value.as_array().cloned().unwrap_or_default();
833
834    // Get the iterator definition (ItemProcessor or Iterator for backwards compat)
835    let iterator_def = state_def
836        .get("ItemProcessor")
837        .or_else(|| state_def.get("Iterator"))
838        .cloned()
839        .ok_or_else(|| {
840            (
841                "States.Runtime".to_string(),
842                "Map state has no ItemProcessor or Iterator".to_string(),
843            )
844        })?;
845
846    let max_concurrency = state_def["MaxConcurrency"].as_u64().unwrap_or(0);
847    let effective_concurrency = if max_concurrency == 0 {
848        40
849    } else {
850        max_concurrency as usize
851    };
852
853    let semaphore = Arc::new(tokio::sync::Semaphore::new(effective_concurrency));
854
855    // Process all items
856    let mut handles = Vec::new();
857    for (index, item) in items.into_iter().enumerate() {
858        let iter_def = iterator_def.clone();
859        let delivery = delivery.clone();
860        let ddb = dynamodb_state.clone();
861        let state = shared_state.clone();
862        let arn = execution_arn.to_string();
863        let sem = semaphore.clone();
864
865        // Apply ItemSelector if present
866        let item_input = if let Some(selector) = state_def.get("ItemSelector") {
867            let mut ctx = serde_json::Map::new();
868            ctx.insert("value".to_string(), item.clone());
869            ctx.insert("index".to_string(), json!(index));
870            apply_parameters(selector, &Value::Object(ctx))
871        } else {
872            item
873        };
874
875        add_event(
876            shared_state,
877            execution_arn,
878            "MapIterationStarted",
879            0,
880            json!({ "index": index }),
881        );
882
883        handles.push(tokio::spawn(async move {
884            let _permit = sem
885                .acquire()
886                .await
887                .map_err(|_| ("States.Runtime".to_string(), "Semaphore closed".to_string()))?;
888            let result = run_states(&iter_def, item_input, &delivery, &ddb, &state, &arn).await;
889            Ok::<(usize, Result<Value, (String, String)>), (String, String)>((index, result))
890        }));
891    }
892
893    // Collect results in order
894    let mut results: Vec<(usize, Value)> = Vec::with_capacity(handles.len());
895    for handle in handles {
896        let (index, result) = handle.await.map_err(|e| {
897            (
898                "States.Runtime".to_string(),
899                format!("Map iteration panicked: {e}"),
900            )
901        })??;
902
903        match result {
904            Ok(output) => {
905                add_event(
906                    shared_state,
907                    execution_arn,
908                    "MapIterationSucceeded",
909                    0,
910                    json!({ "index": index }),
911                );
912                results.push((index, output));
913            }
914            Err((error, cause)) => {
915                add_event(
916                    shared_state,
917                    execution_arn,
918                    "MapIterationFailed",
919                    0,
920                    json!({ "index": index, "error": error }),
921                );
922                return Err((error, cause));
923            }
924        }
925    }
926
927    // Sort by index to maintain order
928    results.sort_by_key(|(i, _)| *i);
929    let map_output = Value::Array(results.into_iter().map(|(_, v)| v).collect());
930
931    // Apply ResultSelector if present
932    let selected = if let Some(selector) = state_def.get("ResultSelector") {
933        apply_parameters(selector, &map_output)
934    } else {
935        map_output
936    };
937
938    // Apply ResultPath
939    let after_result = if result_path == Some("null") {
940        input.clone()
941    } else {
942        apply_result_path(input, &selected, result_path)
943    };
944
945    // Apply OutputPath
946    let output = if output_path == Some("null") {
947        json!({})
948    } else {
949        apply_output_path(&after_result, output_path)
950    };
951
952    Ok(output)
953}
954
955/// Invoke a resource (Lambda function or SDK integration).
956async fn invoke_resource(
957    resource: &str,
958    input: &Value,
959    delivery: &Option<Arc<DeliveryBus>>,
960    dynamodb_state: &Option<SharedDynamoDbState>,
961    timeout_seconds: Option<u64>,
962) -> Result<Value, (String, String)> {
963    // Direct Lambda ARN: arn:aws:lambda:<region>:<account>:function:<name>
964    if resource.contains(":lambda:") && resource.contains(":function:") {
965        return invoke_lambda_direct(resource, input, delivery, timeout_seconds).await;
966    }
967
968    // SDK integration patterns: arn:aws:states:::<service>:<action>
969    if resource.starts_with("arn:aws:states:::lambda:invoke") {
970        let function_name = input["FunctionName"].as_str().unwrap_or("");
971        let payload = if let Some(p) = input.get("Payload") {
972            p.clone()
973        } else {
974            input.clone()
975        };
976        return invoke_lambda_direct(function_name, &payload, delivery, timeout_seconds).await;
977    }
978
979    if resource.starts_with("arn:aws:states:::sqs:sendMessage") {
980        return invoke_sqs_send_message(input, delivery);
981    }
982
983    if resource.starts_with("arn:aws:states:::sns:publish") {
984        return invoke_sns_publish(input, delivery);
985    }
986
987    if resource.starts_with("arn:aws:states:::events:putEvents") {
988        return invoke_eventbridge_put_events(input, delivery);
989    }
990
991    if resource.starts_with("arn:aws:states:::dynamodb:getItem") {
992        return invoke_dynamodb_get_item(input, dynamodb_state);
993    }
994
995    if resource.starts_with("arn:aws:states:::dynamodb:putItem") {
996        return invoke_dynamodb_put_item(input, dynamodb_state);
997    }
998
999    if resource.starts_with("arn:aws:states:::dynamodb:deleteItem") {
1000        return invoke_dynamodb_delete_item(input, dynamodb_state);
1001    }
1002
1003    if resource.starts_with("arn:aws:states:::dynamodb:updateItem") {
1004        return invoke_dynamodb_update_item(input, dynamodb_state);
1005    }
1006
1007    Err((
1008        "States.TaskFailed".to_string(),
1009        format!("Unsupported resource: {resource}"),
1010    ))
1011}
1012
1013/// Send a message to an SQS queue via DeliveryBus.
1014fn invoke_sqs_send_message(
1015    input: &Value,
1016    delivery: &Option<Arc<DeliveryBus>>,
1017) -> Result<Value, (String, String)> {
1018    let delivery = delivery.as_ref().ok_or_else(|| {
1019        (
1020            "States.TaskFailed".to_string(),
1021            "No delivery bus configured for SQS".to_string(),
1022        )
1023    })?;
1024
1025    let queue_url = input["QueueUrl"].as_str().ok_or_else(|| {
1026        (
1027            "States.TaskFailed".to_string(),
1028            "Missing QueueUrl in SQS sendMessage parameters".to_string(),
1029        )
1030    })?;
1031
1032    let message_body = input["MessageBody"]
1033        .as_str()
1034        .map(|s| s.to_string())
1035        .unwrap_or_else(|| {
1036            // If MessageBody is not a string, serialize the value
1037            serde_json::to_string(&input["MessageBody"]).unwrap_or_default()
1038        });
1039
1040    // Convert QueueUrl to ARN format for the delivery bus
1041    // QueueUrl format: http://.../<account>/<queue-name>
1042    // ARN format: arn:aws:sqs:<region>:<account>:<queue-name>
1043    let queue_arn = queue_url_to_arn(queue_url);
1044
1045    delivery.send_to_sqs(&queue_arn, &message_body, &HashMap::new());
1046
1047    Ok(json!({
1048        "MessageId": uuid::Uuid::new_v4().to_string(),
1049        "MD5OfMessageBody": md5_hex(&message_body),
1050    }))
1051}
1052
1053/// Publish a message to an SNS topic via DeliveryBus.
1054fn invoke_sns_publish(
1055    input: &Value,
1056    delivery: &Option<Arc<DeliveryBus>>,
1057) -> Result<Value, (String, String)> {
1058    let delivery = delivery.as_ref().ok_or_else(|| {
1059        (
1060            "States.TaskFailed".to_string(),
1061            "No delivery bus configured for SNS".to_string(),
1062        )
1063    })?;
1064
1065    let topic_arn = input["TopicArn"].as_str().ok_or_else(|| {
1066        (
1067            "States.TaskFailed".to_string(),
1068            "Missing TopicArn in SNS publish parameters".to_string(),
1069        )
1070    })?;
1071
1072    let message = input["Message"]
1073        .as_str()
1074        .map(|s| s.to_string())
1075        .unwrap_or_else(|| serde_json::to_string(&input["Message"]).unwrap_or_default());
1076
1077    let subject = input["Subject"].as_str();
1078
1079    delivery.publish_to_sns(topic_arn, &message, subject);
1080
1081    Ok(json!({
1082        "MessageId": uuid::Uuid::new_v4().to_string(),
1083    }))
1084}
1085
1086/// Put events onto an EventBridge bus via DeliveryBus.
1087fn invoke_eventbridge_put_events(
1088    input: &Value,
1089    delivery: &Option<Arc<DeliveryBus>>,
1090) -> Result<Value, (String, String)> {
1091    let delivery = delivery.as_ref().ok_or_else(|| {
1092        (
1093            "States.TaskFailed".to_string(),
1094            "No delivery bus configured for EventBridge".to_string(),
1095        )
1096    })?;
1097
1098    let entries = input["Entries"]
1099        .as_array()
1100        .ok_or_else(|| {
1101            (
1102                "States.TaskFailed".to_string(),
1103                "Missing Entries in EventBridge putEvents parameters".to_string(),
1104            )
1105        })?
1106        .clone();
1107
1108    let mut event_ids = Vec::new();
1109    for entry in &entries {
1110        let source = entry["Source"].as_str().unwrap_or("aws.stepfunctions");
1111        let detail_type = entry["DetailType"].as_str().unwrap_or("StepFunctionsEvent");
1112        let detail = entry["Detail"]
1113            .as_str()
1114            .map(|s| s.to_string())
1115            .unwrap_or_else(|| serde_json::to_string(&entry["Detail"]).unwrap_or("{}".to_string()));
1116        let bus_name = entry["EventBusName"].as_str().unwrap_or("default");
1117
1118        delivery.put_event_to_eventbridge(source, detail_type, &detail, bus_name);
1119        event_ids.push(uuid::Uuid::new_v4().to_string());
1120    }
1121
1122    Ok(json!({
1123        "Entries": event_ids.iter().map(|id| json!({"EventId": id})).collect::<Vec<_>>(),
1124        "FailedEntryCount": 0,
1125    }))
1126}
1127
1128/// Get an item from DynamoDB via direct state access.
1129fn invoke_dynamodb_get_item(
1130    input: &Value,
1131    dynamodb_state: &Option<SharedDynamoDbState>,
1132) -> Result<Value, (String, String)> {
1133    let ddb = dynamodb_state.as_ref().ok_or_else(|| {
1134        (
1135            "States.TaskFailed".to_string(),
1136            "No DynamoDB state configured".to_string(),
1137        )
1138    })?;
1139
1140    let table_name = input["TableName"].as_str().ok_or_else(|| {
1141        (
1142            "States.TaskFailed".to_string(),
1143            "Missing TableName in DynamoDB getItem parameters".to_string(),
1144        )
1145    })?;
1146
1147    let key = input
1148        .get("Key")
1149        .and_then(|k| k.as_object())
1150        .ok_or_else(|| {
1151            (
1152                "States.TaskFailed".to_string(),
1153                "Missing Key in DynamoDB getItem parameters".to_string(),
1154            )
1155        })?;
1156
1157    let key_map: HashMap<String, Value> = key.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
1158
1159    let state = ddb.read();
1160    let table = state.tables.get(table_name).ok_or_else(|| {
1161        (
1162            "States.TaskFailed".to_string(),
1163            format!("Table '{table_name}' not found"),
1164        )
1165    })?;
1166
1167    let item = table
1168        .find_item_index(&key_map)
1169        .map(|idx| table.items[idx].clone());
1170
1171    match item {
1172        Some(item_map) => {
1173            let item_value: serde_json::Map<String, Value> = item_map.into_iter().collect();
1174            Ok(json!({ "Item": item_value }))
1175        }
1176        None => Ok(json!({})),
1177    }
1178}
1179
1180/// Put an item into DynamoDB via direct state access.
1181fn invoke_dynamodb_put_item(
1182    input: &Value,
1183    dynamodb_state: &Option<SharedDynamoDbState>,
1184) -> Result<Value, (String, String)> {
1185    let ddb = dynamodb_state.as_ref().ok_or_else(|| {
1186        (
1187            "States.TaskFailed".to_string(),
1188            "No DynamoDB state configured".to_string(),
1189        )
1190    })?;
1191
1192    let table_name = input["TableName"].as_str().ok_or_else(|| {
1193        (
1194            "States.TaskFailed".to_string(),
1195            "Missing TableName in DynamoDB putItem parameters".to_string(),
1196        )
1197    })?;
1198
1199    let item = input
1200        .get("Item")
1201        .and_then(|i| i.as_object())
1202        .ok_or_else(|| {
1203            (
1204                "States.TaskFailed".to_string(),
1205                "Missing Item in DynamoDB putItem parameters".to_string(),
1206            )
1207        })?;
1208
1209    let item_map: HashMap<String, Value> =
1210        item.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
1211
1212    let mut state = ddb.write();
1213    let table = state.tables.get_mut(table_name).ok_or_else(|| {
1214        (
1215            "States.TaskFailed".to_string(),
1216            format!("Table '{table_name}' not found"),
1217        )
1218    })?;
1219
1220    // Replace existing item with same key, or insert new
1221    if let Some(idx) = table.find_item_index(&item_map) {
1222        table.items[idx] = item_map;
1223    } else {
1224        table.items.push(item_map);
1225    }
1226
1227    Ok(json!({}))
1228}
1229
1230/// Delete an item from DynamoDB via direct state access.
1231fn invoke_dynamodb_delete_item(
1232    input: &Value,
1233    dynamodb_state: &Option<SharedDynamoDbState>,
1234) -> Result<Value, (String, String)> {
1235    let ddb = dynamodb_state.as_ref().ok_or_else(|| {
1236        (
1237            "States.TaskFailed".to_string(),
1238            "No DynamoDB state configured".to_string(),
1239        )
1240    })?;
1241
1242    let table_name = input["TableName"].as_str().ok_or_else(|| {
1243        (
1244            "States.TaskFailed".to_string(),
1245            "Missing TableName in DynamoDB deleteItem parameters".to_string(),
1246        )
1247    })?;
1248
1249    let key = input
1250        .get("Key")
1251        .and_then(|k| k.as_object())
1252        .ok_or_else(|| {
1253            (
1254                "States.TaskFailed".to_string(),
1255                "Missing Key in DynamoDB deleteItem parameters".to_string(),
1256            )
1257        })?;
1258
1259    let key_map: HashMap<String, Value> = key.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
1260
1261    let mut state = ddb.write();
1262    let table = state.tables.get_mut(table_name).ok_or_else(|| {
1263        (
1264            "States.TaskFailed".to_string(),
1265            format!("Table '{table_name}' not found"),
1266        )
1267    })?;
1268
1269    if let Some(idx) = table.find_item_index(&key_map) {
1270        table.items.remove(idx);
1271    }
1272
1273    Ok(json!({}))
1274}
1275
1276/// Update an item in DynamoDB via direct state access.
1277/// Simplified: merges Key+ExpressionAttributeValues into the existing item.
1278fn invoke_dynamodb_update_item(
1279    input: &Value,
1280    dynamodb_state: &Option<SharedDynamoDbState>,
1281) -> Result<Value, (String, String)> {
1282    let ddb = dynamodb_state.as_ref().ok_or_else(|| {
1283        (
1284            "States.TaskFailed".to_string(),
1285            "No DynamoDB state configured".to_string(),
1286        )
1287    })?;
1288
1289    let table_name = input["TableName"].as_str().ok_or_else(|| {
1290        (
1291            "States.TaskFailed".to_string(),
1292            "Missing TableName in DynamoDB updateItem parameters".to_string(),
1293        )
1294    })?;
1295
1296    let key = input
1297        .get("Key")
1298        .and_then(|k| k.as_object())
1299        .ok_or_else(|| {
1300            (
1301                "States.TaskFailed".to_string(),
1302                "Missing Key in DynamoDB updateItem parameters".to_string(),
1303            )
1304        })?;
1305
1306    let key_map: HashMap<String, Value> = key.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
1307
1308    let mut state = ddb.write();
1309    let table = state.tables.get_mut(table_name).ok_or_else(|| {
1310        (
1311            "States.TaskFailed".to_string(),
1312            format!("Table '{table_name}' not found"),
1313        )
1314    })?;
1315
1316    // Parse UpdateExpression to apply SET operations
1317    if let Some(update_expr) = input["UpdateExpression"].as_str() {
1318        let attr_values = input
1319            .get("ExpressionAttributeValues")
1320            .and_then(|v| v.as_object())
1321            .cloned()
1322            .unwrap_or_default();
1323        let attr_names = input
1324            .get("ExpressionAttributeNames")
1325            .and_then(|v| v.as_object())
1326            .cloned()
1327            .unwrap_or_default();
1328
1329        if let Some(idx) = table.find_item_index(&key_map) {
1330            apply_update_expression(
1331                &mut table.items[idx],
1332                update_expr,
1333                &attr_values,
1334                &attr_names,
1335            );
1336        } else {
1337            // Create new item with key + update expression values
1338            let mut new_item = key_map;
1339            apply_update_expression(&mut new_item, update_expr, &attr_values, &attr_names);
1340            table.items.push(new_item);
1341        }
1342    }
1343
1344    Ok(json!({}))
1345}
1346
1347/// Apply a simple SET UpdateExpression to an item.
1348fn apply_update_expression(
1349    item: &mut HashMap<String, Value>,
1350    expr: &str,
1351    attr_values: &serde_json::Map<String, Value>,
1352    attr_names: &serde_json::Map<String, Value>,
1353) {
1354    // Parse "SET #name1 = :val1, #name2 = :val2" or "SET field = :val"
1355    // DynamoDB keywords are case-insensitive
1356    let trimmed = expr.trim();
1357    let set_part = if trimmed.len() >= 4 && trimmed[..4].eq_ignore_ascii_case("SET ") {
1358        &trimmed[4..]
1359    } else {
1360        trimmed
1361    };
1362
1363    for assignment in set_part.split(',') {
1364        let parts: Vec<&str> = assignment.splitn(2, '=').collect();
1365        if parts.len() == 2 {
1366            let attr_ref = parts[0].trim();
1367            let val_ref = parts[1].trim();
1368
1369            // Resolve attribute name (could be #alias or direct name)
1370            let attr_name = if attr_ref.starts_with('#') {
1371                attr_names
1372                    .get(attr_ref)
1373                    .and_then(|v| v.as_str())
1374                    .unwrap_or(attr_ref)
1375                    .to_string()
1376            } else {
1377                attr_ref.to_string()
1378            };
1379
1380            // Resolve value
1381            if val_ref.starts_with(':') {
1382                if let Some(val) = attr_values.get(val_ref) {
1383                    item.insert(attr_name, val.clone());
1384                }
1385            }
1386        }
1387    }
1388}
1389
/// Derive an SQS queue ARN from a queue URL.
///
/// The last two `/`-separated segments of the URL are taken as the account id and queue
/// name, e.g. `http://localhost:4566/123456789012/my-queue` becomes
/// `arn:aws:sqs:us-east-1:123456789012:my-queue`. The region is always `us-east-1`.
/// A string without any `/` is returned unchanged.
fn queue_url_to_arn(url: &str) -> String {
    // Walk the segments from the right: queue name first, then account id.
    let mut segments = url.rsplit('/');
    match (segments.next(), segments.next()) {
        (Some(queue_name), Some(account_id)) => {
            format!("arn:aws:sqs:us-east-1:{account_id}:{queue_name}")
        }
        _ => url.to_string(),
    }
}
1402
1403/// Compute MD5 hex digest for SQS message response format.
1404fn md5_hex(data: &str) -> String {
1405    use md5::Digest;
1406    let result = md5::Md5::digest(data.as_bytes());
1407    format!("{result:032x}")
1408}
1409
1410/// Invoke a Lambda function directly via DeliveryBus.
1411async fn invoke_lambda_direct(
1412    function_arn: &str,
1413    input: &Value,
1414    delivery: &Option<Arc<DeliveryBus>>,
1415    timeout_seconds: Option<u64>,
1416) -> Result<Value, (String, String)> {
1417    let delivery = delivery.as_ref().ok_or_else(|| {
1418        (
1419            "States.TaskFailed".to_string(),
1420            "No delivery bus configured for Lambda invocation".to_string(),
1421        )
1422    })?;
1423
1424    let payload = serde_json::to_string(input).unwrap_or_default();
1425
1426    let invoke_future = delivery.invoke_lambda(function_arn, &payload);
1427
1428    let result = if let Some(timeout) = timeout_seconds {
1429        match tokio::time::timeout(tokio::time::Duration::from_secs(timeout), invoke_future).await {
1430            Ok(r) => r,
1431            Err(_) => {
1432                return Err((
1433                    "States.Timeout".to_string(),
1434                    format!("Task timed out after {timeout} seconds"),
1435                ));
1436            }
1437        }
1438    } else {
1439        invoke_future.await
1440    };
1441
1442    match result {
1443        Some(Ok(bytes)) => {
1444            let response_str = String::from_utf8_lossy(&bytes);
1445            let value: Value =
1446                serde_json::from_str(&response_str).unwrap_or(json!(response_str.to_string()));
1447            Ok(value)
1448        }
1449        Some(Err(e)) => Err(("States.TaskFailed".to_string(), e)),
1450        None => {
1451            // No runtime available — return empty result
1452            Ok(json!({}))
1453        }
1454    }
1455}
1456
1457/// Apply Parameters template: keys ending with .$ are treated as JsonPath references.
1458fn apply_parameters(template: &Value, input: &Value) -> Value {
1459    match template {
1460        Value::Object(map) => {
1461            let mut result = serde_json::Map::new();
1462            for (key, value) in map {
1463                if let Some(stripped) = key.strip_suffix(".$") {
1464                    if let Some(path) = value.as_str() {
1465                        result.insert(
1466                            stripped.to_string(),
1467                            crate::io_processing::resolve_path(input, path),
1468                        );
1469                    }
1470                } else {
1471                    result.insert(key.clone(), apply_parameters(value, input));
1472                }
1473            }
1474            Value::Object(result)
1475        }
1476        Value::Array(arr) => Value::Array(arr.iter().map(|v| apply_parameters(v, input)).collect()),
1477        other => other.clone(),
1478    }
1479}
1480
/// Where execution goes after a state finishes, as decided by `next_state`.
#[derive(Debug, Clone, PartialEq, Eq)]
enum NextState {
    /// Continue with the state of this name (taken from the `Next` field).
    Name(String),
    /// The state is terminal (`"End": true`).
    End,
    /// The state definition is invalid; the payload is a human-readable explanation.
    Error(String),
}
1486
1487fn next_state(state_def: &Value) -> NextState {
1488    if state_def["End"].as_bool() == Some(true) {
1489        return NextState::End;
1490    }
1491    match state_def["Next"].as_str() {
1492        Some(next) => NextState::Name(next.to_string()),
1493        None => NextState::Error("State has neither 'End' nor 'Next' field".to_string()),
1494    }
1495}
1496
1497fn add_event(
1498    state: &SharedStepFunctionsState,
1499    execution_arn: &str,
1500    event_type: &str,
1501    previous_event_id: i64,
1502    details: Value,
1503) -> i64 {
1504    let mut s = state.write();
1505    if let Some(exec) = s.executions.get_mut(execution_arn) {
1506        let id = exec.history_events.len() as i64 + 1;
1507        exec.history_events.push(HistoryEvent {
1508            id,
1509            event_type: event_type.to_string(),
1510            timestamp: Utc::now(),
1511            previous_event_id,
1512            details,
1513        });
1514        id
1515    } else {
1516        0
1517    }
1518}
1519
1520fn succeed_execution(state: &SharedStepFunctionsState, execution_arn: &str, output: &Value) {
1521    let output_str = serde_json::to_string(output).unwrap_or_default();
1522
1523    add_event(
1524        state,
1525        execution_arn,
1526        "ExecutionSucceeded",
1527        0,
1528        json!({ "output": output_str }),
1529    );
1530
1531    let mut s = state.write();
1532    if let Some(exec) = s.executions.get_mut(execution_arn) {
1533        exec.status = ExecutionStatus::Succeeded;
1534        exec.output = Some(output_str);
1535        exec.stop_date = Some(Utc::now());
1536    }
1537}
1538
1539fn fail_execution(state: &SharedStepFunctionsState, execution_arn: &str, error: &str, cause: &str) {
1540    add_event(
1541        state,
1542        execution_arn,
1543        "ExecutionFailed",
1544        0,
1545        json!({ "error": error, "cause": cause }),
1546    );
1547
1548    let mut s = state.write();
1549    if let Some(exec) = s.executions.get_mut(execution_arn) {
1550        exec.status = ExecutionStatus::Failed;
1551        exec.error = Some(error.to_string());
1552        exec.cause = Some(cause.to_string());
1553        exec.stop_date = Some(Utc::now());
1554    }
1555}
1556
#[cfg(test)]
mod tests {
    use super::*;
    use crate::state::{Execution, StepFunctionsState};
    use parking_lot::RwLock;
    use std::sync::Arc;

    /// ARN shared by every test execution in this module.
    const EXECUTION_ARN: &str = "arn:aws:states:us-east-1:123456789012:execution:test:exec-1";

    /// Build an empty shared Step Functions state.
    fn make_state() -> SharedStepFunctionsState {
        Arc::new(RwLock::new(StepFunctionsState::new(
            "123456789012",
            "us-east-1",
        )))
    }

    /// Register a running execution under `arn` so the interpreter has a record to update.
    fn create_execution(state: &SharedStepFunctionsState, arn: &str, input: Option<String>) {
        let execution = Execution {
            execution_arn: arn.to_string(),
            state_machine_arn: "arn:aws:states:us-east-1:123456789012:stateMachine:test"
                .to_string(),
            state_machine_name: "test".to_string(),
            name: "exec-1".to_string(),
            status: ExecutionStatus::Running,
            input,
            output: None,
            start_date: Utc::now(),
            stop_date: None,
            error: None,
            cause: None,
            history_events: vec![],
        };
        state.write().executions.insert(arn.to_string(), execution);
    }

    /// Register an execution with `input`, run `definition` to completion without a
    /// delivery bus or DynamoDB state, and hand back the shared state for inspection.
    async fn run_definition(definition: Value, input: Option<&str>) -> SharedStepFunctionsState {
        let state = make_state();
        create_execution(&state, EXECUTION_ARN, input.map(str::to_string));

        execute_state_machine(
            state.clone(),
            EXECUTION_ARN.to_string(),
            definition.to_string(),
            input.map(str::to_string),
            None,
            None,
        )
        .await;

        state
    }

    #[tokio::test]
    async fn test_simple_pass_state() {
        let definition = json!({
            "StartAt": "PassState",
            "States": {
                "PassState": {
                    "Type": "Pass",
                    "Result": {"processed": true},
                    "End": true
                }
            }
        });

        let state = run_definition(definition, Some(r#"{"hello":"world"}"#)).await;

        let guard = state.read();
        let exec = guard.executions.get(EXECUTION_ARN).unwrap();
        assert_eq!(exec.status, ExecutionStatus::Succeeded);
        assert!(exec.output.is_some());
        let output: Value = serde_json::from_str(exec.output.as_ref().unwrap()).unwrap();
        assert_eq!(output, json!({"processed": true}));
    }

    #[tokio::test]
    async fn test_pass_chain() {
        let definition = json!({
            "StartAt": "First",
            "States": {
                "First": {
                    "Type": "Pass",
                    "Result": "step1",
                    "ResultPath": "$.first",
                    "Next": "Second"
                },
                "Second": {
                    "Type": "Pass",
                    "Result": "step2",
                    "ResultPath": "$.second",
                    "End": true
                }
            }
        });

        let state = run_definition(definition, Some("{}")).await;

        let guard = state.read();
        let exec = guard.executions.get(EXECUTION_ARN).unwrap();
        assert_eq!(exec.status, ExecutionStatus::Succeeded);
        let output: Value = serde_json::from_str(exec.output.as_ref().unwrap()).unwrap();
        assert_eq!(output["first"], json!("step1"));
        assert_eq!(output["second"], json!("step2"));
    }

    #[tokio::test]
    async fn test_succeed_state() {
        let definition = json!({
            "StartAt": "Done",
            "States": {
                "Done": {
                    "Type": "Succeed"
                }
            }
        });

        let state = run_definition(definition, Some(r#"{"data": "value"}"#)).await;

        let guard = state.read();
        let exec = guard.executions.get(EXECUTION_ARN).unwrap();
        assert_eq!(exec.status, ExecutionStatus::Succeeded);
    }

    #[tokio::test]
    async fn test_fail_state() {
        let definition = json!({
            "StartAt": "FailState",
            "States": {
                "FailState": {
                    "Type": "Fail",
                    "Error": "CustomError",
                    "Cause": "Something went wrong"
                }
            }
        });

        let state = run_definition(definition, None).await;

        let guard = state.read();
        let exec = guard.executions.get(EXECUTION_ARN).unwrap();
        assert_eq!(exec.status, ExecutionStatus::Failed);
        assert_eq!(exec.error.as_deref(), Some("CustomError"));
        assert_eq!(exec.cause.as_deref(), Some("Something went wrong"));
    }

    #[tokio::test]
    async fn test_history_events_recorded() {
        let definition = json!({
            "StartAt": "PassState",
            "States": {
                "PassState": {
                    "Type": "Pass",
                    "End": true
                }
            }
        });

        let state = run_definition(definition, Some("{}")).await;

        let guard = state.read();
        let exec = guard.executions.get(EXECUTION_ARN).unwrap();
        let event_types: Vec<&str> = exec
            .history_events
            .iter()
            .map(|event| event.event_type.as_str())
            .collect();
        assert_eq!(
            event_types,
            vec![
                "ExecutionStarted",
                "PassStateEntered",
                "PassStateExited",
                "ExecutionSucceeded"
            ]
        );
    }
}