Skip to main content

fakecloud_stepfunctions/
interpreter.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use chrono::Utc;
5use serde_json::{json, Value};
6use tracing::{debug, warn};
7
8use fakecloud_core::delivery::DeliveryBus;
9use fakecloud_dynamodb::state::SharedDynamoDbState;
10
11use crate::choice::evaluate_choice;
12use crate::error_handling::{find_catcher, should_retry};
13use crate::io_processing::{apply_input_path, apply_output_path, apply_result_path};
14use crate::state::{ExecutionStatus, HistoryEvent, SharedStepFunctionsState};
15
16/// Execute a state machine definition with the given input.
17/// Updates the execution record in shared state as it progresses.
18pub async fn execute_state_machine(
19    state: SharedStepFunctionsState,
20    execution_arn: String,
21    definition: String,
22    input: Option<String>,
23    delivery: Option<Arc<DeliveryBus>>,
24    dynamodb_state: Option<SharedDynamoDbState>,
25) {
26    let def: Value = match serde_json::from_str(&definition) {
27        Ok(v) => v,
28        Err(e) => {
29            fail_execution(
30                &state,
31                &execution_arn,
32                "States.Runtime",
33                &format!("Failed to parse definition: {e}"),
34            );
35            return;
36        }
37    };
38
39    let raw_input: Value = input
40        .as_deref()
41        .and_then(|s| serde_json::from_str(s).ok())
42        .unwrap_or(json!({}));
43
44    // Record ExecutionStarted event
45    add_event(
46        &state,
47        &execution_arn,
48        "ExecutionStarted",
49        0,
50        json!({
51            "input": serde_json::to_string(&raw_input).unwrap_or_default(),
52            "roleArn": "arn:aws:iam::123456789012:role/execution-role"
53        }),
54    );
55
56    match run_states(
57        &def,
58        raw_input,
59        &delivery,
60        &dynamodb_state,
61        &state,
62        &execution_arn,
63    )
64    .await
65    {
66        Ok(output) => {
67            succeed_execution(&state, &execution_arn, &output);
68        }
69        Err((error, cause)) => {
70            fail_execution(&state, &execution_arn, &error, &cause);
71        }
72    }
73}
74
75type StatesResult<'a> = std::pin::Pin<
76    Box<dyn std::future::Future<Output = Result<Value, (String, String)>> + Send + 'a>,
77>;
78
79/// Core execution loop: runs through states in a definition and returns the output.
80/// Used by the top-level executor, Parallel branches, and Map iterations.
81fn run_states<'a>(
82    def: &'a Value,
83    input: Value,
84    delivery: &'a Option<Arc<DeliveryBus>>,
85    dynamodb_state: &'a Option<SharedDynamoDbState>,
86    shared_state: &'a SharedStepFunctionsState,
87    execution_arn: &'a str,
88) -> StatesResult<'a> {
89    Box::pin(async move {
90        let start_at = def["StartAt"]
91            .as_str()
92            .ok_or_else(|| {
93                (
94                    "States.Runtime".to_string(),
95                    "Missing StartAt in definition".to_string(),
96                )
97            })?
98            .to_string();
99
100        let states = def.get("States").ok_or_else(|| {
101            (
102                "States.Runtime".to_string(),
103                "Missing States in definition".to_string(),
104            )
105        })?;
106
107        let mut current_state = start_at;
108        let mut effective_input = input;
109        let mut iteration = 0;
110        let max_iterations = 500;
111
112        loop {
113            iteration += 1;
114            if iteration > max_iterations {
115                return Err((
116                    "States.Runtime".to_string(),
117                    "Maximum number of state transitions exceeded".to_string(),
118                ));
119            }
120
121            let state_def = states.get(&current_state).cloned().ok_or_else(|| {
122                (
123                    "States.Runtime".to_string(),
124                    format!("State '{current_state}' not found in definition"),
125                )
126            })?;
127
128            let state_type = state_def["Type"]
129                .as_str()
130                .ok_or_else(|| {
131                    (
132                        "States.Runtime".to_string(),
133                        format!("State '{current_state}' missing Type field"),
134                    )
135                })?
136                .to_string();
137
138            debug!(
139                execution_arn = %execution_arn,
140                state = %current_state,
141                state_type = %state_type,
142                "Executing state"
143            );
144
145            match state_type.as_str() {
146                "Pass" => {
147                    let entered_event_id = add_event(
148                        shared_state,
149                        execution_arn,
150                        "PassStateEntered",
151                        0,
152                        json!({
153                            "name": current_state,
154                            "input": serde_json::to_string(&effective_input).unwrap_or_default(),
155                        }),
156                    );
157
158                    let result = execute_pass_state(&state_def, &effective_input);
159
160                    add_event(
161                        shared_state,
162                        execution_arn,
163                        "PassStateExited",
164                        entered_event_id,
165                        json!({
166                            "name": current_state,
167                            "output": serde_json::to_string(&result).unwrap_or_default(),
168                        }),
169                    );
170
171                    effective_input = result;
172
173                    match next_state(&state_def) {
174                        NextState::Name(next) => current_state = next,
175                        NextState::End => return Ok(effective_input),
176                        NextState::Error(msg) => {
177                            return Err(("States.Runtime".to_string(), msg));
178                        }
179                    }
180                }
181
182                "Succeed" => {
183                    add_event(
184                        shared_state,
185                        execution_arn,
186                        "SucceedStateEntered",
187                        0,
188                        json!({
189                            "name": current_state,
190                            "input": serde_json::to_string(&effective_input).unwrap_or_default(),
191                        }),
192                    );
193
194                    let input_path = state_def["InputPath"].as_str();
195                    let output_path = state_def["OutputPath"].as_str();
196
197                    let processed = if input_path == Some("null") {
198                        json!({})
199                    } else {
200                        apply_input_path(&effective_input, input_path)
201                    };
202
203                    let output = if output_path == Some("null") {
204                        json!({})
205                    } else {
206                        apply_output_path(&processed, output_path)
207                    };
208
209                    add_event(
210                        shared_state,
211                        execution_arn,
212                        "SucceedStateExited",
213                        0,
214                        json!({
215                            "name": current_state,
216                            "output": serde_json::to_string(&output).unwrap_or_default(),
217                        }),
218                    );
219
220                    return Ok(output);
221                }
222
223                "Fail" => {
224                    let error = state_def["Error"]
225                        .as_str()
226                        .unwrap_or("States.Fail")
227                        .to_string();
228                    let cause = state_def["Cause"].as_str().unwrap_or("").to_string();
229
230                    add_event(
231                        shared_state,
232                        execution_arn,
233                        "FailStateEntered",
234                        0,
235                        json!({
236                            "name": current_state,
237                            "input": serde_json::to_string(&effective_input).unwrap_or_default(),
238                        }),
239                    );
240
241                    return Err((error, cause));
242                }
243
244                "Task" => {
245                    let entered_event_id = add_event(
246                        shared_state,
247                        execution_arn,
248                        "TaskStateEntered",
249                        0,
250                        json!({
251                            "name": current_state,
252                            "input": serde_json::to_string(&effective_input).unwrap_or_default(),
253                        }),
254                    );
255
256                    let result = execute_task_state(
257                        &state_def,
258                        &effective_input,
259                        delivery,
260                        dynamodb_state,
261                        shared_state,
262                        execution_arn,
263                        entered_event_id,
264                    )
265                    .await;
266
267                    match result {
268                        Ok(output) => {
269                            add_event(
270                                shared_state,
271                                execution_arn,
272                                "TaskStateExited",
273                                entered_event_id,
274                                json!({
275                                    "name": current_state,
276                                    "output": serde_json::to_string(&output).unwrap_or_default(),
277                                }),
278                            );
279
280                            effective_input = output;
281
282                            match next_state(&state_def) {
283                                NextState::Name(next) => current_state = next,
284                                NextState::End => return Ok(effective_input),
285                                NextState::Error(msg) => {
286                                    return Err(("States.Runtime".to_string(), msg));
287                                }
288                            }
289                        }
290                        Err((error, cause)) => {
291                            let catchers =
292                                state_def["Catch"].as_array().cloned().unwrap_or_default();
293
294                            if let Some((next, result_path)) = find_catcher(&catchers, &error) {
295                                let error_output = json!({
296                                    "Error": error,
297                                    "Cause": cause,
298                                });
299                                effective_input = apply_result_path(
300                                    &effective_input,
301                                    &error_output,
302                                    result_path.as_deref(),
303                                );
304                                current_state = next;
305                            } else {
306                                return Err((error, cause));
307                            }
308                        }
309                    }
310                }
311
312                "Choice" => {
313                    let entered_event_id = add_event(
314                        shared_state,
315                        execution_arn,
316                        "ChoiceStateEntered",
317                        0,
318                        json!({
319                            "name": current_state,
320                            "input": serde_json::to_string(&effective_input).unwrap_or_default(),
321                        }),
322                    );
323
324                    let input_path = state_def["InputPath"].as_str();
325                    let processed_input = if input_path == Some("null") {
326                        json!({})
327                    } else {
328                        apply_input_path(&effective_input, input_path)
329                    };
330
331                    match evaluate_choice(&state_def, &processed_input) {
332                        Some(next) => {
333                            add_event(
334                                shared_state,
335                                execution_arn,
336                                "ChoiceStateExited",
337                                entered_event_id,
338                                json!({
339                                    "name": current_state,
340                                    "output": serde_json::to_string(&effective_input).unwrap_or_default(),
341                                }),
342                            );
343                            current_state = next;
344                        }
345                        None => {
346                            return Err((
347                                "States.NoChoiceMatched".to_string(),
348                                format!(
349                                    "No choice rule matched and no Default in state '{current_state}'"
350                                ),
351                            ));
352                        }
353                    }
354                }
355
356                "Wait" => {
357                    let entered_event_id = add_event(
358                        shared_state,
359                        execution_arn,
360                        "WaitStateEntered",
361                        0,
362                        json!({
363                            "name": current_state,
364                            "input": serde_json::to_string(&effective_input).unwrap_or_default(),
365                        }),
366                    );
367
368                    execute_wait_state(&state_def, &effective_input).await;
369
370                    add_event(
371                        shared_state,
372                        execution_arn,
373                        "WaitStateExited",
374                        entered_event_id,
375                        json!({
376                            "name": current_state,
377                            "output": serde_json::to_string(&effective_input).unwrap_or_default(),
378                        }),
379                    );
380
381                    match next_state(&state_def) {
382                        NextState::Name(next) => current_state = next,
383                        NextState::End => return Ok(effective_input),
384                        NextState::Error(msg) => {
385                            return Err(("States.Runtime".to_string(), msg));
386                        }
387                    }
388                }
389
390                "Parallel" => {
391                    let entered_event_id = add_event(
392                        shared_state,
393                        execution_arn,
394                        "ParallelStateEntered",
395                        0,
396                        json!({
397                            "name": current_state,
398                            "input": serde_json::to_string(&effective_input).unwrap_or_default(),
399                        }),
400                    );
401
402                    let result = execute_parallel_state(
403                        &state_def,
404                        &effective_input,
405                        delivery,
406                        dynamodb_state,
407                        shared_state,
408                        execution_arn,
409                    )
410                    .await;
411
412                    match result {
413                        Ok(output) => {
414                            add_event(
415                                shared_state,
416                                execution_arn,
417                                "ParallelStateExited",
418                                entered_event_id,
419                                json!({
420                                    "name": current_state,
421                                    "output": serde_json::to_string(&output).unwrap_or_default(),
422                                }),
423                            );
424
425                            effective_input = output;
426
427                            match next_state(&state_def) {
428                                NextState::Name(next) => current_state = next,
429                                NextState::End => return Ok(effective_input),
430                                NextState::Error(msg) => {
431                                    return Err(("States.Runtime".to_string(), msg));
432                                }
433                            }
434                        }
435                        Err((error, cause)) => {
436                            let catchers =
437                                state_def["Catch"].as_array().cloned().unwrap_or_default();
438
439                            if let Some((next, result_path)) = find_catcher(&catchers, &error) {
440                                let error_output = json!({
441                                    "Error": error,
442                                    "Cause": cause,
443                                });
444                                effective_input = apply_result_path(
445                                    &effective_input,
446                                    &error_output,
447                                    result_path.as_deref(),
448                                );
449                                current_state = next;
450                            } else {
451                                return Err((error, cause));
452                            }
453                        }
454                    }
455                }
456
457                "Map" => {
458                    let entered_event_id = add_event(
459                        shared_state,
460                        execution_arn,
461                        "MapStateEntered",
462                        0,
463                        json!({
464                            "name": current_state,
465                            "input": serde_json::to_string(&effective_input).unwrap_or_default(),
466                        }),
467                    );
468
469                    let result = execute_map_state(
470                        &state_def,
471                        &effective_input,
472                        delivery,
473                        dynamodb_state,
474                        shared_state,
475                        execution_arn,
476                    )
477                    .await;
478
479                    match result {
480                        Ok(output) => {
481                            add_event(
482                                shared_state,
483                                execution_arn,
484                                "MapStateExited",
485                                entered_event_id,
486                                json!({
487                                    "name": current_state,
488                                    "output": serde_json::to_string(&output).unwrap_or_default(),
489                                }),
490                            );
491
492                            effective_input = output;
493
494                            match next_state(&state_def) {
495                                NextState::Name(next) => current_state = next,
496                                NextState::End => return Ok(effective_input),
497                                NextState::Error(msg) => {
498                                    return Err(("States.Runtime".to_string(), msg));
499                                }
500                            }
501                        }
502                        Err((error, cause)) => {
503                            let catchers =
504                                state_def["Catch"].as_array().cloned().unwrap_or_default();
505
506                            if let Some((next, result_path)) = find_catcher(&catchers, &error) {
507                                let error_output = json!({
508                                    "Error": error,
509                                    "Cause": cause,
510                                });
511                                effective_input = apply_result_path(
512                                    &effective_input,
513                                    &error_output,
514                                    result_path.as_deref(),
515                                );
516                                current_state = next;
517                            } else {
518                                return Err((error, cause));
519                            }
520                        }
521                    }
522                }
523
524                other => {
525                    return Err((
526                        "States.Runtime".to_string(),
527                        format!("Unsupported state type: '{other}'"),
528                    ));
529                }
530            }
531        }
532    })
533}
534
535/// Execute a Wait state: pause execution for a specified duration or until a timestamp.
536async fn execute_wait_state(state_def: &Value, input: &Value) {
537    if let Some(seconds) = state_def["Seconds"].as_u64() {
538        tokio::time::sleep(tokio::time::Duration::from_secs(seconds)).await;
539        return;
540    }
541
542    if let Some(path) = state_def["SecondsPath"].as_str() {
543        let val = crate::io_processing::resolve_path(input, path);
544        if let Some(seconds) = val.as_u64() {
545            tokio::time::sleep(tokio::time::Duration::from_secs(seconds)).await;
546        }
547        return;
548    }
549
550    if let Some(ts_str) = state_def["Timestamp"].as_str() {
551        if let Ok(target) = chrono::DateTime::parse_from_rfc3339(ts_str) {
552            let now = Utc::now();
553            let target_utc = target.with_timezone(&chrono::Utc);
554            if target_utc > now {
555                let duration = (target_utc - now).to_std().unwrap_or_default();
556                tokio::time::sleep(duration).await;
557            }
558        }
559        return;
560    }
561
562    if let Some(path) = state_def["TimestampPath"].as_str() {
563        let val = crate::io_processing::resolve_path(input, path);
564        if let Some(ts_str) = val.as_str() {
565            if let Ok(target) = chrono::DateTime::parse_from_rfc3339(ts_str) {
566                let now = Utc::now();
567                let target_utc = target.with_timezone(&chrono::Utc);
568                if target_utc > now {
569                    let duration = (target_utc - now).to_std().unwrap_or_default();
570                    tokio::time::sleep(duration).await;
571                }
572            }
573        }
574        return;
575    }
576
577    warn!(
578        "Wait state has no valid Seconds, SecondsPath, Timestamp, or TimestampPath — skipping wait"
579    );
580}
581
582/// Execute a Pass state: apply InputPath, use Result if present, apply ResultPath and OutputPath.
583fn execute_pass_state(state_def: &Value, input: &Value) -> Value {
584    let input_path = state_def["InputPath"].as_str();
585    let result_path = state_def["ResultPath"].as_str();
586    let output_path = state_def["OutputPath"].as_str();
587
588    let effective_input = if input_path == Some("null") {
589        json!({})
590    } else {
591        apply_input_path(input, input_path)
592    };
593
594    let result = if let Some(r) = state_def.get("Result") {
595        r.clone()
596    } else {
597        effective_input.clone()
598    };
599
600    let after_result = if result_path == Some("null") {
601        input.clone()
602    } else {
603        apply_result_path(input, &result, result_path)
604    };
605
606    if output_path == Some("null") {
607        json!({})
608    } else {
609        apply_output_path(&after_result, output_path)
610    }
611}
612
613/// Execute a Task state: invoke the resource (Lambda, SQS, SNS, EventBridge, DynamoDB),
614/// apply I/O processing, handle Retry.
615async fn execute_task_state(
616    state_def: &Value,
617    input: &Value,
618    delivery: &Option<Arc<DeliveryBus>>,
619    dynamodb_state: &Option<SharedDynamoDbState>,
620    shared_state: &SharedStepFunctionsState,
621    execution_arn: &str,
622    entered_event_id: i64,
623) -> Result<Value, (String, String)> {
624    let resource = state_def["Resource"].as_str().unwrap_or("").to_string();
625
626    let input_path = state_def["InputPath"].as_str();
627    let result_path = state_def["ResultPath"].as_str();
628    let output_path = state_def["OutputPath"].as_str();
629
630    let effective_input = if input_path == Some("null") {
631        json!({})
632    } else {
633        apply_input_path(input, input_path)
634    };
635
636    let task_input = if let Some(params) = state_def.get("Parameters") {
637        apply_parameters(params, &effective_input)
638    } else {
639        effective_input
640    };
641
642    let retriers = state_def["Retry"].as_array().cloned().unwrap_or_default();
643    let timeout_seconds = state_def["TimeoutSeconds"].as_u64();
644    let mut attempt = 0u32;
645
646    loop {
647        add_event(
648            shared_state,
649            execution_arn,
650            "TaskScheduled",
651            entered_event_id,
652            json!({
653                "resource": resource,
654                "region": "us-east-1",
655                "parameters": serde_json::to_string(&task_input).unwrap_or_default(),
656            }),
657        );
658
659        add_event(
660            shared_state,
661            execution_arn,
662            "TaskStarted",
663            entered_event_id,
664            json!({ "resource": resource }),
665        );
666
667        let invoke_result = invoke_resource(
668            &resource,
669            &task_input,
670            delivery,
671            dynamodb_state,
672            timeout_seconds,
673        )
674        .await;
675
676        match invoke_result {
677            Ok(result) => {
678                add_event(
679                    shared_state,
680                    execution_arn,
681                    "TaskSucceeded",
682                    entered_event_id,
683                    json!({
684                        "resource": resource,
685                        "output": serde_json::to_string(&result).unwrap_or_default(),
686                    }),
687                );
688
689                let selected = if let Some(selector) = state_def.get("ResultSelector") {
690                    apply_parameters(selector, &result)
691                } else {
692                    result
693                };
694
695                let after_result = if result_path == Some("null") {
696                    input.clone()
697                } else {
698                    apply_result_path(input, &selected, result_path)
699                };
700
701                let output = if output_path == Some("null") {
702                    json!({})
703                } else {
704                    apply_output_path(&after_result, output_path)
705                };
706
707                return Ok(output);
708            }
709            Err((error, cause)) => {
710                add_event(
711                    shared_state,
712                    execution_arn,
713                    "TaskFailed",
714                    entered_event_id,
715                    json!({ "error": error, "cause": cause }),
716                );
717
718                if let Some(delay_ms) = should_retry(&retriers, &error, attempt) {
719                    attempt += 1;
720                    let actual_delay = delay_ms.min(5000);
721                    tokio::time::sleep(tokio::time::Duration::from_millis(actual_delay)).await;
722                    continue;
723                }
724
725                return Err((error, cause));
726            }
727        }
728    }
729}
730
731/// Execute a Parallel state: run all branches concurrently, collect results into an array.
732async fn execute_parallel_state(
733    state_def: &Value,
734    input: &Value,
735    delivery: &Option<Arc<DeliveryBus>>,
736    dynamodb_state: &Option<SharedDynamoDbState>,
737    shared_state: &SharedStepFunctionsState,
738    execution_arn: &str,
739) -> Result<Value, (String, String)> {
740    let input_path = state_def["InputPath"].as_str();
741    let result_path = state_def["ResultPath"].as_str();
742    let output_path = state_def["OutputPath"].as_str();
743
744    let effective_input = if input_path == Some("null") {
745        json!({})
746    } else {
747        apply_input_path(input, input_path)
748    };
749
750    let branches = state_def["Branches"]
751        .as_array()
752        .cloned()
753        .unwrap_or_default();
754
755    if branches.is_empty() {
756        return Err((
757            "States.Runtime".to_string(),
758            "Parallel state has no Branches".to_string(),
759        ));
760    }
761
762    // Spawn all branches concurrently
763    let mut handles = Vec::new();
764    for branch_def in &branches {
765        let branch = branch_def.clone();
766        let branch_input = effective_input.clone();
767        let delivery = delivery.clone();
768        let ddb = dynamodb_state.clone();
769        let state = shared_state.clone();
770        let arn = execution_arn.to_string();
771
772        handles.push(tokio::spawn(async move {
773            run_states(&branch, branch_input, &delivery, &ddb, &state, &arn).await
774        }));
775    }
776
777    // Collect results in order
778    let mut results = Vec::with_capacity(handles.len());
779    for handle in handles {
780        let result = handle.await.map_err(|e| {
781            (
782                "States.Runtime".to_string(),
783                format!("Branch execution panicked: {e}"),
784            )
785        })??;
786        results.push(result);
787    }
788
789    let branch_output = Value::Array(results);
790
791    // Apply ResultSelector if present
792    let selected = if let Some(selector) = state_def.get("ResultSelector") {
793        apply_parameters(selector, &branch_output)
794    } else {
795        branch_output
796    };
797
798    // Apply ResultPath
799    let after_result = if result_path == Some("null") {
800        input.clone()
801    } else {
802        apply_result_path(input, &selected, result_path)
803    };
804
805    // Apply OutputPath
806    let output = if output_path == Some("null") {
807        json!({})
808    } else {
809        apply_output_path(&after_result, output_path)
810    };
811
812    Ok(output)
813}
814
815/// Execute a Map state: iterate over an array and run a sub-state machine per item.
816async fn execute_map_state(
817    state_def: &Value,
818    input: &Value,
819    delivery: &Option<Arc<DeliveryBus>>,
820    dynamodb_state: &Option<SharedDynamoDbState>,
821    shared_state: &SharedStepFunctionsState,
822    execution_arn: &str,
823) -> Result<Value, (String, String)> {
824    let input_path = state_def["InputPath"].as_str();
825    let result_path = state_def["ResultPath"].as_str();
826    let output_path = state_def["OutputPath"].as_str();
827
828    let effective_input = if input_path == Some("null") {
829        json!({})
830    } else {
831        apply_input_path(input, input_path)
832    };
833
834    // Get the items to iterate over
835    let items_path = state_def["ItemsPath"].as_str().unwrap_or("$");
836    let items_value = crate::io_processing::resolve_path(&effective_input, items_path);
837    let items = items_value.as_array().cloned().unwrap_or_default();
838
839    // Get the iterator definition (ItemProcessor or Iterator for backwards compat)
840    let iterator_def = state_def
841        .get("ItemProcessor")
842        .or_else(|| state_def.get("Iterator"))
843        .cloned()
844        .ok_or_else(|| {
845            (
846                "States.Runtime".to_string(),
847                "Map state has no ItemProcessor or Iterator".to_string(),
848            )
849        })?;
850
851    let max_concurrency = state_def["MaxConcurrency"].as_u64().unwrap_or(0);
852    let effective_concurrency = if max_concurrency == 0 {
853        40
854    } else {
855        max_concurrency as usize
856    };
857
858    let semaphore = Arc::new(tokio::sync::Semaphore::new(effective_concurrency));
859
860    // Process all items
861    let mut handles = Vec::new();
862    for (index, item) in items.into_iter().enumerate() {
863        let iter_def = iterator_def.clone();
864        let delivery = delivery.clone();
865        let ddb = dynamodb_state.clone();
866        let state = shared_state.clone();
867        let arn = execution_arn.to_string();
868        let sem = semaphore.clone();
869
870        // Apply ItemSelector if present
871        let item_input = if let Some(selector) = state_def.get("ItemSelector") {
872            let mut ctx = serde_json::Map::new();
873            ctx.insert("value".to_string(), item.clone());
874            ctx.insert("index".to_string(), json!(index));
875            apply_parameters(selector, &Value::Object(ctx))
876        } else {
877            item
878        };
879
880        add_event(
881            shared_state,
882            execution_arn,
883            "MapIterationStarted",
884            0,
885            json!({ "index": index }),
886        );
887
888        handles.push(tokio::spawn(async move {
889            let _permit = sem
890                .acquire()
891                .await
892                .map_err(|_| ("States.Runtime".to_string(), "Semaphore closed".to_string()))?;
893            let result = run_states(&iter_def, item_input, &delivery, &ddb, &state, &arn).await;
894            Ok::<(usize, Result<Value, (String, String)>), (String, String)>((index, result))
895        }));
896    }
897
898    // Collect results in order
899    let mut results: Vec<(usize, Value)> = Vec::with_capacity(handles.len());
900    for handle in handles {
901        let (index, result) = handle.await.map_err(|e| {
902            (
903                "States.Runtime".to_string(),
904                format!("Map iteration panicked: {e}"),
905            )
906        })??;
907
908        match result {
909            Ok(output) => {
910                add_event(
911                    shared_state,
912                    execution_arn,
913                    "MapIterationSucceeded",
914                    0,
915                    json!({ "index": index }),
916                );
917                results.push((index, output));
918            }
919            Err((error, cause)) => {
920                add_event(
921                    shared_state,
922                    execution_arn,
923                    "MapIterationFailed",
924                    0,
925                    json!({ "index": index, "error": error }),
926                );
927                return Err((error, cause));
928            }
929        }
930    }
931
932    // Sort by index to maintain order
933    results.sort_by_key(|(i, _)| *i);
934    let map_output = Value::Array(results.into_iter().map(|(_, v)| v).collect());
935
936    // Apply ResultSelector if present
937    let selected = if let Some(selector) = state_def.get("ResultSelector") {
938        apply_parameters(selector, &map_output)
939    } else {
940        map_output
941    };
942
943    // Apply ResultPath
944    let after_result = if result_path == Some("null") {
945        input.clone()
946    } else {
947        apply_result_path(input, &selected, result_path)
948    };
949
950    // Apply OutputPath
951    let output = if output_path == Some("null") {
952        json!({})
953    } else {
954        apply_output_path(&after_result, output_path)
955    };
956
957    Ok(output)
958}
959
960/// Invoke a resource (Lambda function or SDK integration).
961async fn invoke_resource(
962    resource: &str,
963    input: &Value,
964    delivery: &Option<Arc<DeliveryBus>>,
965    dynamodb_state: &Option<SharedDynamoDbState>,
966    timeout_seconds: Option<u64>,
967) -> Result<Value, (String, String)> {
968    // Direct Lambda ARN: arn:aws:lambda:<region>:<account>:function:<name>
969    if resource.contains(":lambda:") && resource.contains(":function:") {
970        return invoke_lambda_direct(resource, input, delivery, timeout_seconds).await;
971    }
972
973    // SDK integration patterns: arn:aws:states:::<service>:<action>
974    if resource.starts_with("arn:aws:states:::lambda:invoke") {
975        let function_name = input["FunctionName"].as_str().unwrap_or("");
976        let payload = if let Some(p) = input.get("Payload") {
977            p.clone()
978        } else {
979            input.clone()
980        };
981        return invoke_lambda_direct(function_name, &payload, delivery, timeout_seconds).await;
982    }
983
984    if resource.starts_with("arn:aws:states:::sqs:sendMessage") {
985        return invoke_sqs_send_message(input, delivery);
986    }
987
988    if resource.starts_with("arn:aws:states:::sns:publish") {
989        return invoke_sns_publish(input, delivery);
990    }
991
992    if resource.starts_with("arn:aws:states:::events:putEvents") {
993        return invoke_eventbridge_put_events(input, delivery);
994    }
995
996    if resource.starts_with("arn:aws:states:::dynamodb:getItem") {
997        return invoke_dynamodb_get_item(input, dynamodb_state);
998    }
999
1000    if resource.starts_with("arn:aws:states:::dynamodb:putItem") {
1001        return invoke_dynamodb_put_item(input, dynamodb_state);
1002    }
1003
1004    if resource.starts_with("arn:aws:states:::dynamodb:deleteItem") {
1005        return invoke_dynamodb_delete_item(input, dynamodb_state);
1006    }
1007
1008    if resource.starts_with("arn:aws:states:::dynamodb:updateItem") {
1009        return invoke_dynamodb_update_item(input, dynamodb_state);
1010    }
1011
1012    Err((
1013        "States.TaskFailed".to_string(),
1014        format!("Unsupported resource: {resource}"),
1015    ))
1016}
1017
1018/// Send a message to an SQS queue via DeliveryBus.
1019fn invoke_sqs_send_message(
1020    input: &Value,
1021    delivery: &Option<Arc<DeliveryBus>>,
1022) -> Result<Value, (String, String)> {
1023    let delivery = delivery.as_ref().ok_or_else(|| {
1024        (
1025            "States.TaskFailed".to_string(),
1026            "No delivery bus configured for SQS".to_string(),
1027        )
1028    })?;
1029
1030    let queue_url = input["QueueUrl"].as_str().ok_or_else(|| {
1031        (
1032            "States.TaskFailed".to_string(),
1033            "Missing QueueUrl in SQS sendMessage parameters".to_string(),
1034        )
1035    })?;
1036
1037    let message_body = input["MessageBody"]
1038        .as_str()
1039        .map(|s| s.to_string())
1040        .unwrap_or_else(|| {
1041            // If MessageBody is not a string, serialize the value
1042            serde_json::to_string(&input["MessageBody"]).unwrap_or_default()
1043        });
1044
1045    // Convert QueueUrl to ARN format for the delivery bus
1046    // QueueUrl format: http://.../<account>/<queue-name>
1047    // ARN format: arn:aws:sqs:<region>:<account>:<queue-name>
1048    let queue_arn = queue_url_to_arn(queue_url);
1049
1050    delivery.send_to_sqs(&queue_arn, &message_body, &HashMap::new());
1051
1052    Ok(json!({
1053        "MessageId": uuid::Uuid::new_v4().to_string(),
1054        "MD5OfMessageBody": md5_hex(&message_body),
1055    }))
1056}
1057
1058/// Publish a message to an SNS topic via DeliveryBus.
1059fn invoke_sns_publish(
1060    input: &Value,
1061    delivery: &Option<Arc<DeliveryBus>>,
1062) -> Result<Value, (String, String)> {
1063    let delivery = delivery.as_ref().ok_or_else(|| {
1064        (
1065            "States.TaskFailed".to_string(),
1066            "No delivery bus configured for SNS".to_string(),
1067        )
1068    })?;
1069
1070    let topic_arn = input["TopicArn"].as_str().ok_or_else(|| {
1071        (
1072            "States.TaskFailed".to_string(),
1073            "Missing TopicArn in SNS publish parameters".to_string(),
1074        )
1075    })?;
1076
1077    let message = input["Message"]
1078        .as_str()
1079        .map(|s| s.to_string())
1080        .unwrap_or_else(|| serde_json::to_string(&input["Message"]).unwrap_or_default());
1081
1082    let subject = input["Subject"].as_str();
1083
1084    delivery.publish_to_sns(topic_arn, &message, subject);
1085
1086    Ok(json!({
1087        "MessageId": uuid::Uuid::new_v4().to_string(),
1088    }))
1089}
1090
1091/// Put events onto an EventBridge bus via DeliveryBus.
1092fn invoke_eventbridge_put_events(
1093    input: &Value,
1094    delivery: &Option<Arc<DeliveryBus>>,
1095) -> Result<Value, (String, String)> {
1096    let delivery = delivery.as_ref().ok_or_else(|| {
1097        (
1098            "States.TaskFailed".to_string(),
1099            "No delivery bus configured for EventBridge".to_string(),
1100        )
1101    })?;
1102
1103    let entries = input["Entries"]
1104        .as_array()
1105        .ok_or_else(|| {
1106            (
1107                "States.TaskFailed".to_string(),
1108                "Missing Entries in EventBridge putEvents parameters".to_string(),
1109            )
1110        })?
1111        .clone();
1112
1113    let mut event_ids = Vec::new();
1114    for entry in &entries {
1115        let source = entry["Source"].as_str().unwrap_or("aws.stepfunctions");
1116        let detail_type = entry["DetailType"].as_str().unwrap_or("StepFunctionsEvent");
1117        let detail = entry["Detail"]
1118            .as_str()
1119            .map(|s| s.to_string())
1120            .unwrap_or_else(|| serde_json::to_string(&entry["Detail"]).unwrap_or("{}".to_string()));
1121        let bus_name = entry["EventBusName"].as_str().unwrap_or("default");
1122
1123        delivery.put_event_to_eventbridge(source, detail_type, &detail, bus_name);
1124        event_ids.push(uuid::Uuid::new_v4().to_string());
1125    }
1126
1127    Ok(json!({
1128        "Entries": event_ids.iter().map(|id| json!({"EventId": id})).collect::<Vec<_>>(),
1129        "FailedEntryCount": 0,
1130    }))
1131}
1132
1133/// Get an item from DynamoDB via direct state access.
1134fn invoke_dynamodb_get_item(
1135    input: &Value,
1136    dynamodb_state: &Option<SharedDynamoDbState>,
1137) -> Result<Value, (String, String)> {
1138    let ddb = dynamodb_state.as_ref().ok_or_else(|| {
1139        (
1140            "States.TaskFailed".to_string(),
1141            "No DynamoDB state configured".to_string(),
1142        )
1143    })?;
1144
1145    let table_name = input["TableName"].as_str().ok_or_else(|| {
1146        (
1147            "States.TaskFailed".to_string(),
1148            "Missing TableName in DynamoDB getItem parameters".to_string(),
1149        )
1150    })?;
1151
1152    let key = input
1153        .get("Key")
1154        .and_then(|k| k.as_object())
1155        .ok_or_else(|| {
1156            (
1157                "States.TaskFailed".to_string(),
1158                "Missing Key in DynamoDB getItem parameters".to_string(),
1159            )
1160        })?;
1161
1162    let key_map: HashMap<String, Value> = key.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
1163
1164    let state = ddb.read();
1165    let table = state.tables.get(table_name).ok_or_else(|| {
1166        (
1167            "States.TaskFailed".to_string(),
1168            format!("Table '{table_name}' not found"),
1169        )
1170    })?;
1171
1172    let item = table
1173        .find_item_index(&key_map)
1174        .map(|idx| table.items[idx].clone());
1175
1176    match item {
1177        Some(item_map) => {
1178            let item_value: serde_json::Map<String, Value> = item_map.into_iter().collect();
1179            Ok(json!({ "Item": item_value }))
1180        }
1181        None => Ok(json!({})),
1182    }
1183}
1184
1185/// Put an item into DynamoDB via direct state access.
1186fn invoke_dynamodb_put_item(
1187    input: &Value,
1188    dynamodb_state: &Option<SharedDynamoDbState>,
1189) -> Result<Value, (String, String)> {
1190    let ddb = dynamodb_state.as_ref().ok_or_else(|| {
1191        (
1192            "States.TaskFailed".to_string(),
1193            "No DynamoDB state configured".to_string(),
1194        )
1195    })?;
1196
1197    let table_name = input["TableName"].as_str().ok_or_else(|| {
1198        (
1199            "States.TaskFailed".to_string(),
1200            "Missing TableName in DynamoDB putItem parameters".to_string(),
1201        )
1202    })?;
1203
1204    let item = input
1205        .get("Item")
1206        .and_then(|i| i.as_object())
1207        .ok_or_else(|| {
1208            (
1209                "States.TaskFailed".to_string(),
1210                "Missing Item in DynamoDB putItem parameters".to_string(),
1211            )
1212        })?;
1213
1214    let item_map: HashMap<String, Value> =
1215        item.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
1216
1217    let mut state = ddb.write();
1218    let table = state.tables.get_mut(table_name).ok_or_else(|| {
1219        (
1220            "States.TaskFailed".to_string(),
1221            format!("Table '{table_name}' not found"),
1222        )
1223    })?;
1224
1225    // Replace existing item with same key, or insert new
1226    if let Some(idx) = table.find_item_index(&item_map) {
1227        table.items[idx] = item_map;
1228    } else {
1229        table.items.push(item_map);
1230    }
1231
1232    Ok(json!({}))
1233}
1234
1235/// Delete an item from DynamoDB via direct state access.
1236fn invoke_dynamodb_delete_item(
1237    input: &Value,
1238    dynamodb_state: &Option<SharedDynamoDbState>,
1239) -> Result<Value, (String, String)> {
1240    let ddb = dynamodb_state.as_ref().ok_or_else(|| {
1241        (
1242            "States.TaskFailed".to_string(),
1243            "No DynamoDB state configured".to_string(),
1244        )
1245    })?;
1246
1247    let table_name = input["TableName"].as_str().ok_or_else(|| {
1248        (
1249            "States.TaskFailed".to_string(),
1250            "Missing TableName in DynamoDB deleteItem parameters".to_string(),
1251        )
1252    })?;
1253
1254    let key = input
1255        .get("Key")
1256        .and_then(|k| k.as_object())
1257        .ok_or_else(|| {
1258            (
1259                "States.TaskFailed".to_string(),
1260                "Missing Key in DynamoDB deleteItem parameters".to_string(),
1261            )
1262        })?;
1263
1264    let key_map: HashMap<String, Value> = key.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
1265
1266    let mut state = ddb.write();
1267    let table = state.tables.get_mut(table_name).ok_or_else(|| {
1268        (
1269            "States.TaskFailed".to_string(),
1270            format!("Table '{table_name}' not found"),
1271        )
1272    })?;
1273
1274    if let Some(idx) = table.find_item_index(&key_map) {
1275        table.items.remove(idx);
1276    }
1277
1278    Ok(json!({}))
1279}
1280
1281/// Update an item in DynamoDB via direct state access.
1282/// Simplified: merges Key+ExpressionAttributeValues into the existing item.
1283fn invoke_dynamodb_update_item(
1284    input: &Value,
1285    dynamodb_state: &Option<SharedDynamoDbState>,
1286) -> Result<Value, (String, String)> {
1287    let ddb = dynamodb_state.as_ref().ok_or_else(|| {
1288        (
1289            "States.TaskFailed".to_string(),
1290            "No DynamoDB state configured".to_string(),
1291        )
1292    })?;
1293
1294    let table_name = input["TableName"].as_str().ok_or_else(|| {
1295        (
1296            "States.TaskFailed".to_string(),
1297            "Missing TableName in DynamoDB updateItem parameters".to_string(),
1298        )
1299    })?;
1300
1301    let key = input
1302        .get("Key")
1303        .and_then(|k| k.as_object())
1304        .ok_or_else(|| {
1305            (
1306                "States.TaskFailed".to_string(),
1307                "Missing Key in DynamoDB updateItem parameters".to_string(),
1308            )
1309        })?;
1310
1311    let key_map: HashMap<String, Value> = key.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
1312
1313    let mut state = ddb.write();
1314    let table = state.tables.get_mut(table_name).ok_or_else(|| {
1315        (
1316            "States.TaskFailed".to_string(),
1317            format!("Table '{table_name}' not found"),
1318        )
1319    })?;
1320
1321    // Parse UpdateExpression to apply SET operations
1322    if let Some(update_expr) = input["UpdateExpression"].as_str() {
1323        let attr_values = input
1324            .get("ExpressionAttributeValues")
1325            .and_then(|v| v.as_object())
1326            .cloned()
1327            .unwrap_or_default();
1328        let attr_names = input
1329            .get("ExpressionAttributeNames")
1330            .and_then(|v| v.as_object())
1331            .cloned()
1332            .unwrap_or_default();
1333
1334        if let Some(idx) = table.find_item_index(&key_map) {
1335            apply_update_expression(
1336                &mut table.items[idx],
1337                update_expr,
1338                &attr_values,
1339                &attr_names,
1340            );
1341        } else {
1342            // Create new item with key + update expression values
1343            let mut new_item = key_map;
1344            apply_update_expression(&mut new_item, update_expr, &attr_values, &attr_names);
1345            table.items.push(new_item);
1346        }
1347    }
1348
1349    Ok(json!({}))
1350}
1351
1352/// Apply a simple SET UpdateExpression to an item.
1353fn apply_update_expression(
1354    item: &mut HashMap<String, Value>,
1355    expr: &str,
1356    attr_values: &serde_json::Map<String, Value>,
1357    attr_names: &serde_json::Map<String, Value>,
1358) {
1359    // Parse "SET #name1 = :val1, #name2 = :val2" or "SET field = :val"
1360    // DynamoDB keywords are case-insensitive
1361    let trimmed = expr.trim();
1362    let set_part = if trimmed.len() >= 4 && trimmed[..4].eq_ignore_ascii_case("SET ") {
1363        &trimmed[4..]
1364    } else {
1365        trimmed
1366    };
1367
1368    for assignment in set_part.split(',') {
1369        let parts: Vec<&str> = assignment.splitn(2, '=').collect();
1370        if parts.len() == 2 {
1371            let attr_ref = parts[0].trim();
1372            let val_ref = parts[1].trim();
1373
1374            // Resolve attribute name (could be #alias or direct name)
1375            let attr_name = if attr_ref.starts_with('#') {
1376                attr_names
1377                    .get(attr_ref)
1378                    .and_then(|v| v.as_str())
1379                    .unwrap_or(attr_ref)
1380                    .to_string()
1381            } else {
1382                attr_ref.to_string()
1383            };
1384
1385            // Resolve value
1386            if val_ref.starts_with(':') {
1387                if let Some(val) = attr_values.get(val_ref) {
1388                    item.insert(attr_name, val.clone());
1389                }
1390            }
1391        }
1392    }
1393}
1394
/// Derive an SQS queue ARN from a queue URL.
///
/// The last two `/`-separated segments of the URL are taken as the account id
/// and the queue name, e.g. `http://localhost:4566/123456789012/my-queue`
/// becomes `arn:aws:sqs:us-east-1:123456789012:my-queue`. The region is always
/// `us-east-1`. Input without any `/` is returned unchanged.
fn queue_url_to_arn(url: &str) -> String {
    // Splitting from the right yields the queue name first, then the account id
    let mut segments = url.rsplitn(3, '/');
    match (segments.next(), segments.next()) {
        (Some(queue_name), Some(account_id)) => {
            format!("arn:aws:sqs:us-east-1:{account_id}:{queue_name}")
        }
        _ => url.to_string(),
    }
}
1407
1408/// Compute MD5 hex digest for SQS message response format.
1409fn md5_hex(data: &str) -> String {
1410    use md5::Digest;
1411    let result = md5::Md5::digest(data.as_bytes());
1412    format!("{result:032x}")
1413}
1414
1415/// Invoke a Lambda function directly via DeliveryBus.
1416async fn invoke_lambda_direct(
1417    function_arn: &str,
1418    input: &Value,
1419    delivery: &Option<Arc<DeliveryBus>>,
1420    timeout_seconds: Option<u64>,
1421) -> Result<Value, (String, String)> {
1422    let delivery = delivery.as_ref().ok_or_else(|| {
1423        (
1424            "States.TaskFailed".to_string(),
1425            "No delivery bus configured for Lambda invocation".to_string(),
1426        )
1427    })?;
1428
1429    let payload = serde_json::to_string(input).unwrap_or_default();
1430
1431    let invoke_future = delivery.invoke_lambda(function_arn, &payload);
1432
1433    let result = if let Some(timeout) = timeout_seconds {
1434        match tokio::time::timeout(tokio::time::Duration::from_secs(timeout), invoke_future).await {
1435            Ok(r) => r,
1436            Err(_) => {
1437                return Err((
1438                    "States.Timeout".to_string(),
1439                    format!("Task timed out after {timeout} seconds"),
1440                ));
1441            }
1442        }
1443    } else {
1444        invoke_future.await
1445    };
1446
1447    match result {
1448        Some(Ok(bytes)) => {
1449            let response_str = String::from_utf8_lossy(&bytes);
1450            let value: Value =
1451                serde_json::from_str(&response_str).unwrap_or(json!(response_str.to_string()));
1452            Ok(value)
1453        }
1454        Some(Err(e)) => Err(("States.TaskFailed".to_string(), e)),
1455        None => {
1456            // No runtime available — return empty result
1457            Ok(json!({}))
1458        }
1459    }
1460}
1461
1462/// Apply Parameters template: keys ending with .$ are treated as JsonPath references.
1463fn apply_parameters(template: &Value, input: &Value) -> Value {
1464    match template {
1465        Value::Object(map) => {
1466            let mut result = serde_json::Map::new();
1467            for (key, value) in map {
1468                if let Some(stripped) = key.strip_suffix(".$") {
1469                    if let Some(path) = value.as_str() {
1470                        result.insert(
1471                            stripped.to_string(),
1472                            crate::io_processing::resolve_path(input, path),
1473                        );
1474                    }
1475                } else {
1476                    result.insert(key.clone(), apply_parameters(value, input));
1477                }
1478            }
1479            Value::Object(result)
1480        }
1481        Value::Array(arr) => Value::Array(arr.iter().map(|v| apply_parameters(v, input)).collect()),
1482        other => other.clone(),
1483    }
1484}
1485
/// Transition decided for a state definition by `next_state`.
enum NextState {
    /// Continue with the state of this name (taken from the `Next` field).
    Name(String),
    /// The state is terminal (`"End": true`).
    End,
    /// The definition is invalid; the payload is the error message.
    Error(String),
}
1491
1492fn next_state(state_def: &Value) -> NextState {
1493    if state_def["End"].as_bool() == Some(true) {
1494        return NextState::End;
1495    }
1496    match state_def["Next"].as_str() {
1497        Some(next) => NextState::Name(next.to_string()),
1498        None => NextState::Error("State has neither 'End' nor 'Next' field".to_string()),
1499    }
1500}
1501
1502fn add_event(
1503    state: &SharedStepFunctionsState,
1504    execution_arn: &str,
1505    event_type: &str,
1506    previous_event_id: i64,
1507    details: Value,
1508) -> i64 {
1509    let mut s = state.write();
1510    if let Some(exec) = s.executions.get_mut(execution_arn) {
1511        let id = exec.history_events.len() as i64 + 1;
1512        exec.history_events.push(HistoryEvent {
1513            id,
1514            event_type: event_type.to_string(),
1515            timestamp: Utc::now(),
1516            previous_event_id,
1517            details,
1518        });
1519        id
1520    } else {
1521        0
1522    }
1523}
1524
1525fn succeed_execution(state: &SharedStepFunctionsState, execution_arn: &str, output: &Value) {
1526    // Check terminal status before recording events to avoid inconsistent history
1527    {
1528        let s = state.read();
1529        if let Some(exec) = s.executions.get(execution_arn) {
1530            if exec.status != ExecutionStatus::Running {
1531                return;
1532            }
1533        }
1534    }
1535
1536    let output_str = serde_json::to_string(output).unwrap_or_default();
1537
1538    add_event(
1539        state,
1540        execution_arn,
1541        "ExecutionSucceeded",
1542        0,
1543        json!({ "output": output_str }),
1544    );
1545
1546    let mut s = state.write();
1547    if let Some(exec) = s.executions.get_mut(execution_arn) {
1548        exec.status = ExecutionStatus::Succeeded;
1549        exec.output = Some(output_str);
1550        exec.stop_date = Some(Utc::now());
1551    }
1552}
1553
1554fn fail_execution(state: &SharedStepFunctionsState, execution_arn: &str, error: &str, cause: &str) {
1555    // Check terminal status before recording events to avoid inconsistent history
1556    {
1557        let s = state.read();
1558        if let Some(exec) = s.executions.get(execution_arn) {
1559            if exec.status != ExecutionStatus::Running {
1560                return;
1561            }
1562        }
1563    }
1564
1565    add_event(
1566        state,
1567        execution_arn,
1568        "ExecutionFailed",
1569        0,
1570        json!({ "error": error, "cause": cause }),
1571    );
1572
1573    let mut s = state.write();
1574    if let Some(exec) = s.executions.get_mut(execution_arn) {
1575        exec.status = ExecutionStatus::Failed;
1576        exec.error = Some(error.to_string());
1577        exec.cause = Some(cause.to_string());
1578        exec.stop_date = Some(Utc::now());
1579    }
1580}
1581
#[cfg(test)]
mod tests {
    use super::*;
    use crate::state::{Execution, StepFunctionsState};
    use parking_lot::RwLock;
    use std::sync::Arc;

    /// ARN shared by every test execution in this module.
    const EXECUTION_ARN: &str = "arn:aws:states:us-east-1:123456789012:execution:test:exec-1";

    /// Build an empty shared Step Functions state.
    fn make_state() -> SharedStepFunctionsState {
        let inner = StepFunctionsState::new("123456789012", "us-east-1");
        Arc::new(RwLock::new(inner))
    }

    /// Register a `Running` execution with no history under `arn`.
    fn create_execution(state: &SharedStepFunctionsState, arn: &str, input: Option<String>) {
        let execution = Execution {
            execution_arn: arn.to_string(),
            state_machine_arn: "arn:aws:states:us-east-1:123456789012:stateMachine:test"
                .to_string(),
            state_machine_name: "test".to_string(),
            name: "exec-1".to_string(),
            status: ExecutionStatus::Running,
            input,
            output: None,
            start_date: Utc::now(),
            stop_date: None,
            error: None,
            cause: None,
            history_events: vec![],
        };
        state.write().executions.insert(arn.to_string(), execution);
    }

    /// Register an execution for `EXECUTION_ARN`, run `definition` against it
    /// without a delivery bus or DynamoDB state, and hand back the shared
    /// state for inspection.
    async fn run_definition(definition: Value, input: Option<&str>) -> SharedStepFunctionsState {
        let state = make_state();
        let input = input.map(str::to_string);
        create_execution(&state, EXECUTION_ARN, input.clone());

        execute_state_machine(
            state.clone(),
            EXECUTION_ARN.to_string(),
            definition.to_string(),
            input,
            None,
            None,
        )
        .await;

        state
    }

    #[tokio::test]
    async fn test_simple_pass_state() {
        let definition = json!({
            "StartAt": "PassState",
            "States": {
                "PassState": {
                    "Type": "Pass",
                    "Result": {"processed": true},
                    "End": true
                }
            }
        });

        let state = run_definition(definition, Some(r#"{"hello":"world"}"#)).await;

        let guard = state.read();
        let exec = guard.executions.get(EXECUTION_ARN).unwrap();
        assert_eq!(exec.status, ExecutionStatus::Succeeded);
        assert!(exec.output.is_some());
        let output: Value = serde_json::from_str(exec.output.as_ref().unwrap()).unwrap();
        assert_eq!(output, json!({"processed": true}));
    }

    #[tokio::test]
    async fn test_pass_chain() {
        let definition = json!({
            "StartAt": "First",
            "States": {
                "First": {
                    "Type": "Pass",
                    "Result": "step1",
                    "ResultPath": "$.first",
                    "Next": "Second"
                },
                "Second": {
                    "Type": "Pass",
                    "Result": "step2",
                    "ResultPath": "$.second",
                    "End": true
                }
            }
        });

        let state = run_definition(definition, Some("{}")).await;

        let guard = state.read();
        let exec = guard.executions.get(EXECUTION_ARN).unwrap();
        assert_eq!(exec.status, ExecutionStatus::Succeeded);
        let output: Value = serde_json::from_str(exec.output.as_ref().unwrap()).unwrap();
        assert_eq!(output["first"], json!("step1"));
        assert_eq!(output["second"], json!("step2"));
    }

    #[tokio::test]
    async fn test_succeed_state() {
        let definition = json!({
            "StartAt": "Done",
            "States": {
                "Done": {
                    "Type": "Succeed"
                }
            }
        });

        let state = run_definition(definition, Some(r#"{"data": "value"}"#)).await;

        let guard = state.read();
        let exec = guard.executions.get(EXECUTION_ARN).unwrap();
        assert_eq!(exec.status, ExecutionStatus::Succeeded);
    }

    #[tokio::test]
    async fn test_fail_state() {
        let definition = json!({
            "StartAt": "FailState",
            "States": {
                "FailState": {
                    "Type": "Fail",
                    "Error": "CustomError",
                    "Cause": "Something went wrong"
                }
            }
        });

        let state = run_definition(definition, None).await;

        let guard = state.read();
        let exec = guard.executions.get(EXECUTION_ARN).unwrap();
        assert_eq!(exec.status, ExecutionStatus::Failed);
        assert_eq!(exec.error.as_deref(), Some("CustomError"));
        assert_eq!(exec.cause.as_deref(), Some("Something went wrong"));
    }

    #[tokio::test]
    async fn test_history_events_recorded() {
        let definition = json!({
            "StartAt": "PassState",
            "States": {
                "PassState": {
                    "Type": "Pass",
                    "End": true
                }
            }
        });

        let state = run_definition(definition, Some("{}")).await;

        let guard = state.read();
        let exec = guard.executions.get(EXECUTION_ARN).unwrap();
        let event_types: Vec<&str> = exec
            .history_events
            .iter()
            .map(|event| event.event_type.as_str())
            .collect();
        assert_eq!(
            event_types,
            vec![
                "ExecutionStarted",
                "PassStateEntered",
                "PassStateExited",
                "ExecutionSucceeded"
            ]
        );
    }
}