Skip to main content

fakecloud_stepfunctions/
interpreter.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use chrono::Utc;
5use serde_json::{json, Value};
6use tracing::{debug, warn};
7
8use fakecloud_aws::arn::Arn;
9use fakecloud_core::delivery::DeliveryBus;
10use fakecloud_dynamodb::state::SharedDynamoDbState;
11
12use crate::choice::evaluate_choice;
13use crate::error_handling::{find_catcher, should_retry};
14use crate::io_processing::{apply_input_path, apply_output_path, apply_result_path};
15use crate::state::{ExecutionStatus, HistoryEvent, SharedStepFunctionsState};
16
17/// Execute a state machine definition with the given input.
18/// Updates the execution record in shared state as it progresses.
19pub async fn execute_state_machine(
20    state: SharedStepFunctionsState,
21    execution_arn: String,
22    definition: String,
23    input: Option<String>,
24    delivery: Option<Arc<DeliveryBus>>,
25    dynamodb_state: Option<SharedDynamoDbState>,
26) {
27    let def: Value = match serde_json::from_str(&definition) {
28        Ok(v) => v,
29        Err(e) => {
30            fail_execution(
31                &state,
32                &execution_arn,
33                "States.Runtime",
34                &format!("Failed to parse definition: {e}"),
35            );
36            return;
37        }
38    };
39
40    let raw_input: Value = input
41        .as_deref()
42        .and_then(|s| serde_json::from_str(s).ok())
43        .unwrap_or(json!({}));
44
45    // Record ExecutionStarted event
46    add_event(
47        &state,
48        &execution_arn,
49        "ExecutionStarted",
50        0,
51        json!({
52            "input": serde_json::to_string(&raw_input).unwrap_or_default(),
53            "roleArn": "arn:aws:iam::123456789012:role/execution-role"
54        }),
55    );
56
57    // Run the state machine inside an inner tokio::spawn so that any panic
58    // bubbles up as a JoinError instead of tearing down the caller. Without
59    // this the panic propagates through the outer spawn in `start_execution`
60    // which leaves the execution stuck in Running and leaks the panic to
61    // tokio's default hook.
62    let def_owned = def;
63    let state_clone = state.clone();
64    let execution_arn_clone = execution_arn.clone();
65    let delivery_clone = delivery.clone();
66    let dynamodb_state_clone = dynamodb_state.clone();
67    let handle = tokio::spawn(async move {
68        run_states(
69            &def_owned,
70            raw_input,
71            &delivery_clone,
72            &dynamodb_state_clone,
73            &state_clone,
74            &execution_arn_clone,
75        )
76        .await
77    });
78
79    match handle.await {
80        Ok(Ok(output)) => {
81            succeed_execution(&state, &execution_arn, &output);
82        }
83        Ok(Err((error, cause))) => {
84            fail_execution(&state, &execution_arn, &error, &cause);
85        }
86        Err(join_err) => {
87            let msg = if join_err.is_panic() {
88                let payload = join_err.into_panic();
89                if let Some(s) = payload.downcast_ref::<String>() {
90                    s.clone()
91                } else if let Some(s) = payload.downcast_ref::<&'static str>() {
92                    (*s).to_string()
93                } else {
94                    "execution task panicked".to_string()
95                }
96            } else {
97                format!("execution task cancelled: {join_err}")
98            };
99            tracing::error!(
100                execution_arn = %execution_arn,
101                panic = %msg,
102                "Step Functions execution panicked"
103            );
104            fail_execution(&state, &execution_arn, "States.Runtime", &msg);
105        }
106    }
107}
108
109type StatesResult<'a> = std::pin::Pin<
110    Box<dyn std::future::Future<Output = Result<Value, (String, String)>> + Send + 'a>,
111>;
112
113/// Core execution loop: runs through states in a definition and returns the output.
114/// Used by the top-level executor, Parallel branches, and Map iterations.
115fn run_states<'a>(
116    def: &'a Value,
117    input: Value,
118    delivery: &'a Option<Arc<DeliveryBus>>,
119    dynamodb_state: &'a Option<SharedDynamoDbState>,
120    shared_state: &'a SharedStepFunctionsState,
121    execution_arn: &'a str,
122) -> StatesResult<'a> {
123    Box::pin(async move {
124        let start_at = def["StartAt"]
125            .as_str()
126            .ok_or_else(|| {
127                (
128                    "States.Runtime".to_string(),
129                    "Missing StartAt in definition".to_string(),
130                )
131            })?
132            .to_string();
133
134        let states = def.get("States").ok_or_else(|| {
135            (
136                "States.Runtime".to_string(),
137                "Missing States in definition".to_string(),
138            )
139        })?;
140
141        let mut current_state = start_at;
142        let mut effective_input = input;
143        let mut iteration = 0;
144        let max_iterations = 500;
145
146        loop {
147            iteration += 1;
148            if iteration > max_iterations {
149                return Err((
150                    "States.Runtime".to_string(),
151                    "Maximum number of state transitions exceeded".to_string(),
152                ));
153            }
154
155            let state_def = states.get(&current_state).cloned().ok_or_else(|| {
156                (
157                    "States.Runtime".to_string(),
158                    format!("State '{current_state}' not found in definition"),
159                )
160            })?;
161
162            let state_type = state_def["Type"]
163                .as_str()
164                .ok_or_else(|| {
165                    (
166                        "States.Runtime".to_string(),
167                        format!("State '{current_state}' missing Type field"),
168                    )
169                })?
170                .to_string();
171
172            debug!(
173                execution_arn = %execution_arn,
174                state = %current_state,
175                state_type = %state_type,
176                "Executing state"
177            );
178
179            let advance = match state_type.as_str() {
180                "Pass" => run_pass_state(
181                    &current_state,
182                    &state_def,
183                    effective_input,
184                    shared_state,
185                    execution_arn,
186                ),
187                "Succeed" => run_succeed_state(
188                    &current_state,
189                    &state_def,
190                    effective_input,
191                    shared_state,
192                    execution_arn,
193                ),
194                "Fail" => run_fail_state(
195                    &current_state,
196                    &state_def,
197                    effective_input,
198                    shared_state,
199                    execution_arn,
200                ),
201                "Choice" => run_choice_state(
202                    &current_state,
203                    &state_def,
204                    effective_input,
205                    shared_state,
206                    execution_arn,
207                ),
208                "Wait" => {
209                    run_wait_state(
210                        &current_state,
211                        &state_def,
212                        effective_input,
213                        shared_state,
214                        execution_arn,
215                    )
216                    .await
217                }
218                "Task" => {
219                    run_task_state(
220                        &current_state,
221                        &state_def,
222                        effective_input,
223                        delivery,
224                        dynamodb_state,
225                        shared_state,
226                        execution_arn,
227                    )
228                    .await
229                }
230                "Parallel" => {
231                    run_parallel_state(
232                        &current_state,
233                        &state_def,
234                        effective_input,
235                        delivery,
236                        dynamodb_state,
237                        shared_state,
238                        execution_arn,
239                    )
240                    .await
241                }
242                "Map" => {
243                    run_map_state(
244                        &current_state,
245                        &state_def,
246                        effective_input,
247                        delivery,
248                        dynamodb_state,
249                        shared_state,
250                        execution_arn,
251                    )
252                    .await
253                }
254                other => Advance::Fail(
255                    "States.Runtime".to_string(),
256                    format!("Unsupported state type: '{other}'"),
257                ),
258            };
259
260            match advance {
261                Advance::Next(next, new_input) => {
262                    effective_input = new_input;
263                    current_state = next;
264                }
265                Advance::End(output) => return Ok(output),
266                Advance::Fail(error, cause) => return Err((error, cause)),
267            }
268        }
269    })
270}
271
272/// Result of executing a single state in the state machine.
273enum Advance {
274    /// Continue to the given state with the given input.
275    Next(String, Value),
276    /// Terminate the state machine with the given output.
277    End(Value),
278    /// Fail the state machine with the given error and cause.
279    Fail(String, String),
280}
281
282fn advance_from_next(state_def: &Value, input: Value) -> Advance {
283    match next_state(state_def) {
284        NextState::Name(next) => Advance::Next(next, input),
285        NextState::End => Advance::End(input),
286        NextState::Error(msg) => Advance::Fail("States.Runtime".to_string(), msg),
287    }
288}
289
290fn advance_from_error(state_def: &Value, input: &Value, error: String, cause: String) -> Advance {
291    match apply_state_catcher(state_def, input, &error, &cause) {
292        Some((next, new_input)) => Advance::Next(next, new_input),
293        None => Advance::Fail(error, cause),
294    }
295}
296
297fn run_pass_state(
298    name: &str,
299    state_def: &Value,
300    input: Value,
301    shared_state: &SharedStepFunctionsState,
302    execution_arn: &str,
303) -> Advance {
304    let entered_event_id = add_event(
305        shared_state,
306        execution_arn,
307        "PassStateEntered",
308        0,
309        json!({
310            "name": name,
311            "input": serde_json::to_string(&input).unwrap_or_default(),
312        }),
313    );
314
315    let result = execute_pass_state(state_def, &input);
316
317    add_event(
318        shared_state,
319        execution_arn,
320        "PassStateExited",
321        entered_event_id,
322        json!({
323            "name": name,
324            "output": serde_json::to_string(&result).unwrap_or_default(),
325        }),
326    );
327
328    advance_from_next(state_def, result)
329}
330
331fn run_succeed_state(
332    name: &str,
333    state_def: &Value,
334    input: Value,
335    shared_state: &SharedStepFunctionsState,
336    execution_arn: &str,
337) -> Advance {
338    add_event(
339        shared_state,
340        execution_arn,
341        "SucceedStateEntered",
342        0,
343        json!({
344            "name": name,
345            "input": serde_json::to_string(&input).unwrap_or_default(),
346        }),
347    );
348
349    let input_path = state_def["InputPath"].as_str();
350    let output_path = state_def["OutputPath"].as_str();
351
352    let processed = if input_path == Some("null") {
353        json!({})
354    } else {
355        apply_input_path(&input, input_path)
356    };
357
358    let output = if output_path == Some("null") {
359        json!({})
360    } else {
361        apply_output_path(&processed, output_path)
362    };
363
364    add_event(
365        shared_state,
366        execution_arn,
367        "SucceedStateExited",
368        0,
369        json!({
370            "name": name,
371            "output": serde_json::to_string(&output).unwrap_or_default(),
372        }),
373    );
374
375    Advance::End(output)
376}
377
378fn run_fail_state(
379    name: &str,
380    state_def: &Value,
381    input: Value,
382    shared_state: &SharedStepFunctionsState,
383    execution_arn: &str,
384) -> Advance {
385    let error = state_def["Error"]
386        .as_str()
387        .unwrap_or("States.Fail")
388        .to_string();
389    let cause = state_def["Cause"].as_str().unwrap_or("").to_string();
390
391    add_event(
392        shared_state,
393        execution_arn,
394        "FailStateEntered",
395        0,
396        json!({
397            "name": name,
398            "input": serde_json::to_string(&input).unwrap_or_default(),
399        }),
400    );
401
402    Advance::Fail(error, cause)
403}
404
405fn run_choice_state(
406    name: &str,
407    state_def: &Value,
408    input: Value,
409    shared_state: &SharedStepFunctionsState,
410    execution_arn: &str,
411) -> Advance {
412    let entered_event_id = add_event(
413        shared_state,
414        execution_arn,
415        "ChoiceStateEntered",
416        0,
417        json!({
418            "name": name,
419            "input": serde_json::to_string(&input).unwrap_or_default(),
420        }),
421    );
422
423    let input_path = state_def["InputPath"].as_str();
424    let processed_input = if input_path == Some("null") {
425        json!({})
426    } else {
427        apply_input_path(&input, input_path)
428    };
429
430    match evaluate_choice(state_def, &processed_input) {
431        Some(next) => {
432            add_event(
433                shared_state,
434                execution_arn,
435                "ChoiceStateExited",
436                entered_event_id,
437                json!({
438                    "name": name,
439                    "output": serde_json::to_string(&input).unwrap_or_default(),
440                }),
441            );
442            Advance::Next(next, input)
443        }
444        None => Advance::Fail(
445            "States.NoChoiceMatched".to_string(),
446            format!("No choice rule matched and no Default in state '{name}'"),
447        ),
448    }
449}
450
451async fn run_wait_state(
452    name: &str,
453    state_def: &Value,
454    input: Value,
455    shared_state: &SharedStepFunctionsState,
456    execution_arn: &str,
457) -> Advance {
458    let entered_event_id = add_event(
459        shared_state,
460        execution_arn,
461        "WaitStateEntered",
462        0,
463        json!({
464            "name": name,
465            "input": serde_json::to_string(&input).unwrap_or_default(),
466        }),
467    );
468
469    execute_wait_state(state_def, &input).await;
470
471    add_event(
472        shared_state,
473        execution_arn,
474        "WaitStateExited",
475        entered_event_id,
476        json!({
477            "name": name,
478            "output": serde_json::to_string(&input).unwrap_or_default(),
479        }),
480    );
481
482    advance_from_next(state_def, input)
483}
484
485#[allow(clippy::too_many_arguments)]
486async fn run_task_state(
487    name: &str,
488    state_def: &Value,
489    input: Value,
490    delivery: &Option<Arc<DeliveryBus>>,
491    dynamodb_state: &Option<SharedDynamoDbState>,
492    shared_state: &SharedStepFunctionsState,
493    execution_arn: &str,
494) -> Advance {
495    let entered_event_id = add_event(
496        shared_state,
497        execution_arn,
498        "TaskStateEntered",
499        0,
500        json!({
501            "name": name,
502            "input": serde_json::to_string(&input).unwrap_or_default(),
503        }),
504    );
505
506    let result = execute_task_state(
507        state_def,
508        &input,
509        delivery,
510        dynamodb_state,
511        shared_state,
512        execution_arn,
513        entered_event_id,
514    )
515    .await;
516
517    match result {
518        Ok(output) => {
519            add_event(
520                shared_state,
521                execution_arn,
522                "TaskStateExited",
523                entered_event_id,
524                json!({
525                    "name": name,
526                    "output": serde_json::to_string(&output).unwrap_or_default(),
527                }),
528            );
529            advance_from_next(state_def, output)
530        }
531        Err((error, cause)) => advance_from_error(state_def, &input, error, cause),
532    }
533}
534
535#[allow(clippy::too_many_arguments)]
536async fn run_parallel_state(
537    name: &str,
538    state_def: &Value,
539    input: Value,
540    delivery: &Option<Arc<DeliveryBus>>,
541    dynamodb_state: &Option<SharedDynamoDbState>,
542    shared_state: &SharedStepFunctionsState,
543    execution_arn: &str,
544) -> Advance {
545    let entered_event_id = add_event(
546        shared_state,
547        execution_arn,
548        "ParallelStateEntered",
549        0,
550        json!({
551            "name": name,
552            "input": serde_json::to_string(&input).unwrap_or_default(),
553        }),
554    );
555
556    let result = execute_parallel_state(
557        state_def,
558        &input,
559        delivery,
560        dynamodb_state,
561        shared_state,
562        execution_arn,
563    )
564    .await;
565
566    match result {
567        Ok(output) => {
568            add_event(
569                shared_state,
570                execution_arn,
571                "ParallelStateExited",
572                entered_event_id,
573                json!({
574                    "name": name,
575                    "output": serde_json::to_string(&output).unwrap_or_default(),
576                }),
577            );
578            advance_from_next(state_def, output)
579        }
580        Err((error, cause)) => advance_from_error(state_def, &input, error, cause),
581    }
582}
583
584#[allow(clippy::too_many_arguments)]
585async fn run_map_state(
586    name: &str,
587    state_def: &Value,
588    input: Value,
589    delivery: &Option<Arc<DeliveryBus>>,
590    dynamodb_state: &Option<SharedDynamoDbState>,
591    shared_state: &SharedStepFunctionsState,
592    execution_arn: &str,
593) -> Advance {
594    let entered_event_id = add_event(
595        shared_state,
596        execution_arn,
597        "MapStateEntered",
598        0,
599        json!({
600            "name": name,
601            "input": serde_json::to_string(&input).unwrap_or_default(),
602        }),
603    );
604
605    let result = execute_map_state(
606        state_def,
607        &input,
608        delivery,
609        dynamodb_state,
610        shared_state,
611        execution_arn,
612    )
613    .await;
614
615    match result {
616        Ok(output) => {
617            add_event(
618                shared_state,
619                execution_arn,
620                "MapStateExited",
621                entered_event_id,
622                json!({
623                    "name": name,
624                    "output": serde_json::to_string(&output).unwrap_or_default(),
625                }),
626            );
627            advance_from_next(state_def, output)
628        }
629        Err((error, cause)) => advance_from_error(state_def, &input, error, cause),
630    }
631}
632
633/// Execute a Wait state: pause execution for a specified duration or until a timestamp.
634async fn execute_wait_state(state_def: &Value, input: &Value) {
635    if let Some(seconds) = state_def["Seconds"].as_u64() {
636        tokio::time::sleep(tokio::time::Duration::from_secs(seconds)).await;
637        return;
638    }
639
640    if let Some(path) = state_def["SecondsPath"].as_str() {
641        let val = crate::io_processing::resolve_path(input, path);
642        if let Some(seconds) = val.as_u64() {
643            tokio::time::sleep(tokio::time::Duration::from_secs(seconds)).await;
644        }
645        return;
646    }
647
648    if let Some(ts_str) = state_def["Timestamp"].as_str() {
649        if let Ok(target) = chrono::DateTime::parse_from_rfc3339(ts_str) {
650            let now = Utc::now();
651            let target_utc = target.with_timezone(&chrono::Utc);
652            if target_utc > now {
653                let duration = (target_utc - now).to_std().unwrap_or_default();
654                tokio::time::sleep(duration).await;
655            }
656        }
657        return;
658    }
659
660    if let Some(path) = state_def["TimestampPath"].as_str() {
661        let val = crate::io_processing::resolve_path(input, path);
662        if let Some(ts_str) = val.as_str() {
663            if let Ok(target) = chrono::DateTime::parse_from_rfc3339(ts_str) {
664                let now = Utc::now();
665                let target_utc = target.with_timezone(&chrono::Utc);
666                if target_utc > now {
667                    let duration = (target_utc - now).to_std().unwrap_or_default();
668                    tokio::time::sleep(duration).await;
669                }
670            }
671        }
672        return;
673    }
674
675    warn!(
676        "Wait state has no valid Seconds, SecondsPath, Timestamp, or TimestampPath — skipping wait"
677    );
678}
679
680/// Execute a Pass state: apply InputPath, use Result if present, apply ResultPath and OutputPath.
681fn execute_pass_state(state_def: &Value, input: &Value) -> Value {
682    let input_path = state_def["InputPath"].as_str();
683    let result_path = state_def["ResultPath"].as_str();
684    let output_path = state_def["OutputPath"].as_str();
685
686    let effective_input = if input_path == Some("null") {
687        json!({})
688    } else {
689        apply_input_path(input, input_path)
690    };
691
692    let result = if let Some(r) = state_def.get("Result") {
693        r.clone()
694    } else {
695        effective_input.clone()
696    };
697
698    let after_result = if result_path == Some("null") {
699        input.clone()
700    } else {
701        apply_result_path(input, &result, result_path)
702    };
703
704    if output_path == Some("null") {
705        json!({})
706    } else {
707        apply_output_path(&after_result, output_path)
708    }
709}
710
711/// Execute a Task state: invoke the resource (Lambda, SQS, SNS, EventBridge, DynamoDB),
712/// apply I/O processing, handle Retry.
713async fn execute_task_state(
714    state_def: &Value,
715    input: &Value,
716    delivery: &Option<Arc<DeliveryBus>>,
717    dynamodb_state: &Option<SharedDynamoDbState>,
718    shared_state: &SharedStepFunctionsState,
719    execution_arn: &str,
720    entered_event_id: i64,
721) -> Result<Value, (String, String)> {
722    let resource = state_def["Resource"].as_str().unwrap_or("").to_string();
723
724    let input_path = state_def["InputPath"].as_str();
725    let result_path = state_def["ResultPath"].as_str();
726    let output_path = state_def["OutputPath"].as_str();
727
728    let effective_input = if input_path == Some("null") {
729        json!({})
730    } else {
731        apply_input_path(input, input_path)
732    };
733
734    let task_input = if let Some(params) = state_def.get("Parameters") {
735        apply_parameters(params, &effective_input)
736    } else {
737        effective_input
738    };
739
740    let retriers = state_def["Retry"].as_array().cloned().unwrap_or_default();
741    let timeout_seconds = state_def["TimeoutSeconds"].as_u64();
742    let heartbeat_seconds = state_def["HeartbeatSeconds"].as_u64();
743    let mut attempt = 0u32;
744
745    loop {
746        add_event(
747            shared_state,
748            execution_arn,
749            "TaskScheduled",
750            entered_event_id,
751            json!({
752                "resource": resource,
753                "region": "us-east-1",
754                "parameters": serde_json::to_string(&task_input).unwrap_or_default(),
755            }),
756        );
757
758        add_event(
759            shared_state,
760            execution_arn,
761            "TaskStarted",
762            entered_event_id,
763            json!({ "resource": resource }),
764        );
765
766        let invoke_result = invoke_resource(
767            &resource,
768            &task_input,
769            delivery,
770            dynamodb_state,
771            timeout_seconds,
772            heartbeat_seconds,
773            shared_state,
774        )
775        .await;
776
777        match invoke_result {
778            Ok(result) => {
779                add_event(
780                    shared_state,
781                    execution_arn,
782                    "TaskSucceeded",
783                    entered_event_id,
784                    json!({
785                        "resource": resource,
786                        "output": serde_json::to_string(&result).unwrap_or_default(),
787                    }),
788                );
789
790                let selected = if let Some(selector) = state_def.get("ResultSelector") {
791                    apply_parameters(selector, &result)
792                } else {
793                    result
794                };
795
796                let after_result = if result_path == Some("null") {
797                    input.clone()
798                } else {
799                    apply_result_path(input, &selected, result_path)
800                };
801
802                let output = if output_path == Some("null") {
803                    json!({})
804                } else {
805                    apply_output_path(&after_result, output_path)
806                };
807
808                return Ok(output);
809            }
810            Err((error, cause)) => {
811                add_event(
812                    shared_state,
813                    execution_arn,
814                    "TaskFailed",
815                    entered_event_id,
816                    json!({ "error": error, "cause": cause }),
817                );
818
819                if let Some(delay_ms) = should_retry(&retriers, &error, attempt) {
820                    attempt += 1;
821                    let actual_delay = delay_ms.min(5000);
822                    tokio::time::sleep(tokio::time::Duration::from_millis(actual_delay)).await;
823                    continue;
824                }
825
826                return Err((error, cause));
827            }
828        }
829    }
830}
831
832/// Execute a Parallel state: run all branches concurrently, collect results into an array.
833async fn execute_parallel_state(
834    state_def: &Value,
835    input: &Value,
836    delivery: &Option<Arc<DeliveryBus>>,
837    dynamodb_state: &Option<SharedDynamoDbState>,
838    shared_state: &SharedStepFunctionsState,
839    execution_arn: &str,
840) -> Result<Value, (String, String)> {
841    let input_path = state_def["InputPath"].as_str();
842    let result_path = state_def["ResultPath"].as_str();
843    let output_path = state_def["OutputPath"].as_str();
844
845    let effective_input = if input_path == Some("null") {
846        json!({})
847    } else {
848        apply_input_path(input, input_path)
849    };
850
851    let branches = state_def["Branches"]
852        .as_array()
853        .cloned()
854        .unwrap_or_default();
855
856    if branches.is_empty() {
857        return Err((
858            "States.Runtime".to_string(),
859            "Parallel state has no Branches".to_string(),
860        ));
861    }
862
863    // Spawn all branches concurrently
864    let mut handles = Vec::new();
865    for branch_def in &branches {
866        let branch = branch_def.clone();
867        let branch_input = effective_input.clone();
868        let delivery = delivery.clone();
869        let ddb = dynamodb_state.clone();
870        let state = shared_state.clone();
871        let arn = execution_arn.to_string();
872
873        handles.push(tokio::spawn(async move {
874            run_states(&branch, branch_input, &delivery, &ddb, &state, &arn).await
875        }));
876    }
877
878    // Collect results in order
879    let mut results = Vec::with_capacity(handles.len());
880    for handle in handles {
881        let result = handle.await.map_err(|e| {
882            (
883                "States.Runtime".to_string(),
884                format!("Branch execution panicked: {e}"),
885            )
886        })??;
887        results.push(result);
888    }
889
890    let branch_output = Value::Array(results);
891
892    // Apply ResultSelector if present
893    let selected = if let Some(selector) = state_def.get("ResultSelector") {
894        apply_parameters(selector, &branch_output)
895    } else {
896        branch_output
897    };
898
899    // Apply ResultPath
900    let after_result = if result_path == Some("null") {
901        input.clone()
902    } else {
903        apply_result_path(input, &selected, result_path)
904    };
905
906    // Apply OutputPath
907    let output = if output_path == Some("null") {
908        json!({})
909    } else {
910        apply_output_path(&after_result, output_path)
911    };
912
913    Ok(output)
914}
915
916/// Execute a Map state: iterate over an array and run a sub-state machine per item.
917async fn execute_map_state(
918    state_def: &Value,
919    input: &Value,
920    delivery: &Option<Arc<DeliveryBus>>,
921    dynamodb_state: &Option<SharedDynamoDbState>,
922    shared_state: &SharedStepFunctionsState,
923    execution_arn: &str,
924) -> Result<Value, (String, String)> {
925    let input_path = state_def["InputPath"].as_str();
926    let result_path = state_def["ResultPath"].as_str();
927    let output_path = state_def["OutputPath"].as_str();
928
929    let effective_input = if input_path == Some("null") {
930        json!({})
931    } else {
932        apply_input_path(input, input_path)
933    };
934
935    // Get the items to iterate over
936    let items_path = state_def["ItemsPath"].as_str().unwrap_or("$");
937    let items_value = crate::io_processing::resolve_path(&effective_input, items_path);
938    let items = items_value.as_array().cloned().unwrap_or_default();
939
940    // Get the iterator definition (ItemProcessor or Iterator for backwards compat)
941    let iterator_def = state_def
942        .get("ItemProcessor")
943        .or_else(|| state_def.get("Iterator"))
944        .cloned()
945        .ok_or_else(|| {
946            (
947                "States.Runtime".to_string(),
948                "Map state has no ItemProcessor or Iterator".to_string(),
949            )
950        })?;
951
952    let max_concurrency = state_def["MaxConcurrency"].as_u64().unwrap_or(0);
953    let effective_concurrency = if max_concurrency == 0 {
954        40
955    } else {
956        max_concurrency as usize
957    };
958
959    let semaphore = Arc::new(tokio::sync::Semaphore::new(effective_concurrency));
960
961    // Process all items
962    let mut handles = Vec::new();
963    for (index, item) in items.into_iter().enumerate() {
964        let iter_def = iterator_def.clone();
965        let delivery = delivery.clone();
966        let ddb = dynamodb_state.clone();
967        let state = shared_state.clone();
968        let arn = execution_arn.to_string();
969        let sem = semaphore.clone();
970
971        // Apply ItemSelector if present
972        let item_input = if let Some(selector) = state_def.get("ItemSelector") {
973            let mut ctx = serde_json::Map::new();
974            ctx.insert("value".to_string(), item.clone());
975            ctx.insert("index".to_string(), json!(index));
976            apply_parameters(selector, &Value::Object(ctx))
977        } else {
978            item
979        };
980
981        add_event(
982            shared_state,
983            execution_arn,
984            "MapIterationStarted",
985            0,
986            json!({ "index": index }),
987        );
988
989        handles.push(tokio::spawn(async move {
990            let _permit = sem
991                .acquire()
992                .await
993                .map_err(|_| ("States.Runtime".to_string(), "Semaphore closed".to_string()))?;
994            let result = run_states(&iter_def, item_input, &delivery, &ddb, &state, &arn).await;
995            Ok::<(usize, Result<Value, (String, String)>), (String, String)>((index, result))
996        }));
997    }
998
999    // Collect results in order
1000    let mut results: Vec<(usize, Value)> = Vec::with_capacity(handles.len());
1001    for handle in handles {
1002        let (index, result) = handle.await.map_err(|e| {
1003            (
1004                "States.Runtime".to_string(),
1005                format!("Map iteration panicked: {e}"),
1006            )
1007        })??;
1008
1009        match result {
1010            Ok(output) => {
1011                add_event(
1012                    shared_state,
1013                    execution_arn,
1014                    "MapIterationSucceeded",
1015                    0,
1016                    json!({ "index": index }),
1017                );
1018                results.push((index, output));
1019            }
1020            Err((error, cause)) => {
1021                add_event(
1022                    shared_state,
1023                    execution_arn,
1024                    "MapIterationFailed",
1025                    0,
1026                    json!({ "index": index, "error": error }),
1027                );
1028                return Err((error, cause));
1029            }
1030        }
1031    }
1032
1033    // Sort by index to maintain order
1034    results.sort_by_key(|(i, _)| *i);
1035    let map_output = Value::Array(results.into_iter().map(|(_, v)| v).collect());
1036
1037    // Apply ResultSelector if present
1038    let selected = if let Some(selector) = state_def.get("ResultSelector") {
1039        apply_parameters(selector, &map_output)
1040    } else {
1041        map_output
1042    };
1043
1044    // Apply ResultPath
1045    let after_result = if result_path == Some("null") {
1046        input.clone()
1047    } else {
1048        apply_result_path(input, &selected, result_path)
1049    };
1050
1051    // Apply OutputPath
1052    let output = if output_path == Some("null") {
1053        json!({})
1054    } else {
1055        apply_output_path(&after_result, output_path)
1056    };
1057
1058    Ok(output)
1059}
1060
1061/// Invoke a resource (Lambda function or SDK integration).
1062#[allow(clippy::too_many_arguments)]
1063async fn invoke_resource(
1064    resource: &str,
1065    input: &Value,
1066    delivery: &Option<Arc<DeliveryBus>>,
1067    dynamodb_state: &Option<SharedDynamoDbState>,
1068    timeout_seconds: Option<u64>,
1069    heartbeat_seconds: Option<u64>,
1070    shared_state: &SharedStepFunctionsState,
1071) -> Result<Value, (String, String)> {
1072    // Direct activity ARN: arn:aws:states:<region>:<account>:activity:<name>
1073    if resource.contains(":states:") && resource.contains(":activity:") {
1074        return invoke_activity(
1075            resource,
1076            input,
1077            shared_state,
1078            timeout_seconds,
1079            heartbeat_seconds,
1080        )
1081        .await;
1082    }
1083
1084    // Direct Lambda ARN: arn:aws:lambda:<region>:<account>:function:<name>
1085    if resource.contains(":lambda:") && resource.contains(":function:") {
1086        return invoke_lambda_direct(resource, input, delivery, timeout_seconds).await;
1087    }
1088
1089    // SDK integration patterns: arn:aws:states:::<service>:<action>
1090    if resource.starts_with("arn:aws:states:::lambda:invoke") {
1091        let function_name = input["FunctionName"].as_str().unwrap_or("");
1092        let payload = if let Some(p) = input.get("Payload") {
1093            p.clone()
1094        } else {
1095            input.clone()
1096        };
1097        return invoke_lambda_direct(function_name, &payload, delivery, timeout_seconds).await;
1098    }
1099
1100    if resource.starts_with("arn:aws:states:::sqs:sendMessage") {
1101        return invoke_sqs_send_message(input, delivery);
1102    }
1103
1104    if resource.starts_with("arn:aws:states:::sns:publish") {
1105        return invoke_sns_publish(input, delivery);
1106    }
1107
1108    if resource.starts_with("arn:aws:states:::events:putEvents") {
1109        return invoke_eventbridge_put_events(input, delivery);
1110    }
1111
1112    if resource.starts_with("arn:aws:states:::dynamodb:getItem") {
1113        return invoke_dynamodb_get_item(input, dynamodb_state);
1114    }
1115
1116    if resource.starts_with("arn:aws:states:::dynamodb:putItem") {
1117        return invoke_dynamodb_put_item(input, dynamodb_state);
1118    }
1119
1120    if resource.starts_with("arn:aws:states:::dynamodb:deleteItem") {
1121        return invoke_dynamodb_delete_item(input, dynamodb_state);
1122    }
1123
1124    if resource.starts_with("arn:aws:states:::dynamodb:updateItem") {
1125        return invoke_dynamodb_update_item(input, dynamodb_state);
1126    }
1127
1128    Err((
1129        "States.TaskFailed".to_string(),
1130        format!("Unsupported resource: {resource}"),
1131    ))
1132}
1133
1134/// Send a message to an SQS queue via DeliveryBus.
1135fn invoke_sqs_send_message(
1136    input: &Value,
1137    delivery: &Option<Arc<DeliveryBus>>,
1138) -> Result<Value, (String, String)> {
1139    let delivery = delivery.as_ref().ok_or_else(|| {
1140        (
1141            "States.TaskFailed".to_string(),
1142            "No delivery bus configured for SQS".to_string(),
1143        )
1144    })?;
1145
1146    let queue_url = input["QueueUrl"].as_str().ok_or_else(|| {
1147        (
1148            "States.TaskFailed".to_string(),
1149            "Missing QueueUrl in SQS sendMessage parameters".to_string(),
1150        )
1151    })?;
1152
1153    let message_body = input["MessageBody"]
1154        .as_str()
1155        .map(|s| s.to_string())
1156        .unwrap_or_else(|| {
1157            // If MessageBody is not a string, serialize the value
1158            serde_json::to_string(&input["MessageBody"]).unwrap_or_default()
1159        });
1160
1161    // Convert QueueUrl to ARN format for the delivery bus
1162    // QueueUrl format: http://.../<account>/<queue-name>
1163    // ARN format: arn:aws:sqs:<region>:<account>:<queue-name>
1164    let queue_arn = queue_url_to_arn(queue_url);
1165
1166    delivery.send_to_sqs(&queue_arn, &message_body, &HashMap::new());
1167
1168    Ok(json!({
1169        "MessageId": uuid::Uuid::new_v4().to_string(),
1170        "MD5OfMessageBody": md5_hex(&message_body),
1171    }))
1172}
1173
1174/// Publish a message to an SNS topic via DeliveryBus.
1175fn invoke_sns_publish(
1176    input: &Value,
1177    delivery: &Option<Arc<DeliveryBus>>,
1178) -> Result<Value, (String, String)> {
1179    let delivery = delivery.as_ref().ok_or_else(|| {
1180        (
1181            "States.TaskFailed".to_string(),
1182            "No delivery bus configured for SNS".to_string(),
1183        )
1184    })?;
1185
1186    let topic_arn = input["TopicArn"].as_str().ok_or_else(|| {
1187        (
1188            "States.TaskFailed".to_string(),
1189            "Missing TopicArn in SNS publish parameters".to_string(),
1190        )
1191    })?;
1192
1193    let message = input["Message"]
1194        .as_str()
1195        .map(|s| s.to_string())
1196        .unwrap_or_else(|| serde_json::to_string(&input["Message"]).unwrap_or_default());
1197
1198    let subject = input["Subject"].as_str();
1199
1200    delivery.publish_to_sns(topic_arn, &message, subject);
1201
1202    Ok(json!({
1203        "MessageId": uuid::Uuid::new_v4().to_string(),
1204    }))
1205}
1206
1207/// Put events onto an EventBridge bus via DeliveryBus.
1208fn invoke_eventbridge_put_events(
1209    input: &Value,
1210    delivery: &Option<Arc<DeliveryBus>>,
1211) -> Result<Value, (String, String)> {
1212    let delivery = delivery.as_ref().ok_or_else(|| {
1213        (
1214            "States.TaskFailed".to_string(),
1215            "No delivery bus configured for EventBridge".to_string(),
1216        )
1217    })?;
1218
1219    let entries = input["Entries"]
1220        .as_array()
1221        .ok_or_else(|| {
1222            (
1223                "States.TaskFailed".to_string(),
1224                "Missing Entries in EventBridge putEvents parameters".to_string(),
1225            )
1226        })?
1227        .clone();
1228
1229    let mut event_ids = Vec::new();
1230    for entry in &entries {
1231        let source = entry["Source"].as_str().unwrap_or("aws.stepfunctions");
1232        let detail_type = entry["DetailType"].as_str().unwrap_or("StepFunctionsEvent");
1233        let detail = entry["Detail"]
1234            .as_str()
1235            .map(|s| s.to_string())
1236            .unwrap_or_else(|| serde_json::to_string(&entry["Detail"]).unwrap_or("{}".to_string()));
1237        let bus_name = entry["EventBusName"].as_str().unwrap_or("default");
1238
1239        delivery.put_event_to_eventbridge(source, detail_type, &detail, bus_name);
1240        event_ids.push(uuid::Uuid::new_v4().to_string());
1241    }
1242
1243    Ok(json!({
1244        "Entries": event_ids.iter().map(|id| json!({"EventId": id})).collect::<Vec<_>>(),
1245        "FailedEntryCount": 0,
1246    }))
1247}
1248
1249/// Get an item from DynamoDB via direct state access.
1250fn invoke_dynamodb_get_item(
1251    input: &Value,
1252    dynamodb_state: &Option<SharedDynamoDbState>,
1253) -> Result<Value, (String, String)> {
1254    let ddb = dynamodb_state.as_ref().ok_or_else(|| {
1255        (
1256            "States.TaskFailed".to_string(),
1257            "No DynamoDB state configured".to_string(),
1258        )
1259    })?;
1260
1261    let table_name = input["TableName"].as_str().ok_or_else(|| {
1262        (
1263            "States.TaskFailed".to_string(),
1264            "Missing TableName in DynamoDB getItem parameters".to_string(),
1265        )
1266    })?;
1267
1268    let key = input
1269        .get("Key")
1270        .and_then(|k| k.as_object())
1271        .ok_or_else(|| {
1272            (
1273                "States.TaskFailed".to_string(),
1274                "Missing Key in DynamoDB getItem parameters".to_string(),
1275            )
1276        })?;
1277
1278    let key_map: HashMap<String, Value> = key.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
1279
1280    let __mas = ddb.read();
1281    let state = __mas.default_ref();
1282    let table = state.tables.get(table_name).ok_or_else(|| {
1283        (
1284            "States.TaskFailed".to_string(),
1285            format!("Table '{table_name}' not found"),
1286        )
1287    })?;
1288
1289    let item = table
1290        .find_item_index(&key_map)
1291        .map(|idx| table.items[idx].clone());
1292
1293    match item {
1294        Some(item_map) => {
1295            let item_value: serde_json::Map<String, Value> = item_map.into_iter().collect();
1296            Ok(json!({ "Item": item_value }))
1297        }
1298        None => Ok(json!({})),
1299    }
1300}
1301
1302/// Put an item into DynamoDB via direct state access.
1303fn invoke_dynamodb_put_item(
1304    input: &Value,
1305    dynamodb_state: &Option<SharedDynamoDbState>,
1306) -> Result<Value, (String, String)> {
1307    let ddb = dynamodb_state.as_ref().ok_or_else(|| {
1308        (
1309            "States.TaskFailed".to_string(),
1310            "No DynamoDB state configured".to_string(),
1311        )
1312    })?;
1313
1314    let table_name = input["TableName"].as_str().ok_or_else(|| {
1315        (
1316            "States.TaskFailed".to_string(),
1317            "Missing TableName in DynamoDB putItem parameters".to_string(),
1318        )
1319    })?;
1320
1321    let item = input
1322        .get("Item")
1323        .and_then(|i| i.as_object())
1324        .ok_or_else(|| {
1325            (
1326                "States.TaskFailed".to_string(),
1327                "Missing Item in DynamoDB putItem parameters".to_string(),
1328            )
1329        })?;
1330
1331    let item_map: HashMap<String, Value> =
1332        item.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
1333
1334    let mut __mas = ddb.write();
1335    let state = __mas.default_mut();
1336    let table = state.tables.get_mut(table_name).ok_or_else(|| {
1337        (
1338            "States.TaskFailed".to_string(),
1339            format!("Table '{table_name}' not found"),
1340        )
1341    })?;
1342
1343    // Replace existing item with same key, or insert new
1344    if let Some(idx) = table.find_item_index(&item_map) {
1345        table.items[idx] = item_map;
1346    } else {
1347        table.items.push(item_map);
1348    }
1349
1350    Ok(json!({}))
1351}
1352
1353/// Delete an item from DynamoDB via direct state access.
1354fn invoke_dynamodb_delete_item(
1355    input: &Value,
1356    dynamodb_state: &Option<SharedDynamoDbState>,
1357) -> Result<Value, (String, String)> {
1358    let ddb = dynamodb_state.as_ref().ok_or_else(|| {
1359        (
1360            "States.TaskFailed".to_string(),
1361            "No DynamoDB state configured".to_string(),
1362        )
1363    })?;
1364
1365    let table_name = input["TableName"].as_str().ok_or_else(|| {
1366        (
1367            "States.TaskFailed".to_string(),
1368            "Missing TableName in DynamoDB deleteItem parameters".to_string(),
1369        )
1370    })?;
1371
1372    let key = input
1373        .get("Key")
1374        .and_then(|k| k.as_object())
1375        .ok_or_else(|| {
1376            (
1377                "States.TaskFailed".to_string(),
1378                "Missing Key in DynamoDB deleteItem parameters".to_string(),
1379            )
1380        })?;
1381
1382    let key_map: HashMap<String, Value> = key.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
1383
1384    let mut __mas = ddb.write();
1385    let state = __mas.default_mut();
1386    let table = state.tables.get_mut(table_name).ok_or_else(|| {
1387        (
1388            "States.TaskFailed".to_string(),
1389            format!("Table '{table_name}' not found"),
1390        )
1391    })?;
1392
1393    if let Some(idx) = table.find_item_index(&key_map) {
1394        table.items.remove(idx);
1395    }
1396
1397    Ok(json!({}))
1398}
1399
1400/// Update an item in DynamoDB via direct state access. Honors UpdateExpression
1401/// SET (with `=`, `+`, `-`, `if_not_exists`), REMOVE, ADD (numeric), and
1402/// DELETE (set elements). Creates the item from `Key` plus the expression
1403/// when no matching item exists, mirroring DynamoDB upsert semantics.
1404fn invoke_dynamodb_update_item(
1405    input: &Value,
1406    dynamodb_state: &Option<SharedDynamoDbState>,
1407) -> Result<Value, (String, String)> {
1408    let ddb = dynamodb_state.as_ref().ok_or_else(|| {
1409        (
1410            "States.TaskFailed".to_string(),
1411            "No DynamoDB state configured".to_string(),
1412        )
1413    })?;
1414
1415    let table_name = input["TableName"].as_str().ok_or_else(|| {
1416        (
1417            "States.TaskFailed".to_string(),
1418            "Missing TableName in DynamoDB updateItem parameters".to_string(),
1419        )
1420    })?;
1421
1422    let key = input
1423        .get("Key")
1424        .and_then(|k| k.as_object())
1425        .ok_or_else(|| {
1426            (
1427                "States.TaskFailed".to_string(),
1428                "Missing Key in DynamoDB updateItem parameters".to_string(),
1429            )
1430        })?;
1431
1432    let key_map: HashMap<String, Value> = key.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
1433
1434    let mut __mas = ddb.write();
1435    let state = __mas.default_mut();
1436    let table = state.tables.get_mut(table_name).ok_or_else(|| {
1437        (
1438            "States.TaskFailed".to_string(),
1439            format!("Table '{table_name}' not found"),
1440        )
1441    })?;
1442
1443    // Parse UpdateExpression to apply SET operations
1444    if let Some(update_expr) = input["UpdateExpression"].as_str() {
1445        let attr_values = input
1446            .get("ExpressionAttributeValues")
1447            .and_then(|v| v.as_object())
1448            .cloned()
1449            .unwrap_or_default();
1450        let attr_names = input
1451            .get("ExpressionAttributeNames")
1452            .and_then(|v| v.as_object())
1453            .cloned()
1454            .unwrap_or_default();
1455
1456        if let Some(idx) = table.find_item_index(&key_map) {
1457            apply_update_expression(
1458                &mut table.items[idx],
1459                update_expr,
1460                &attr_values,
1461                &attr_names,
1462            );
1463        } else {
1464            // Create new item with key + update expression values
1465            let mut new_item = key_map;
1466            apply_update_expression(&mut new_item, update_expr, &attr_values, &attr_names);
1467            table.items.push(new_item);
1468        }
1469    }
1470
1471    Ok(json!({}))
1472}
1473
1474/// Apply a simple SET UpdateExpression to an item.
1475fn apply_update_expression(
1476    item: &mut HashMap<String, Value>,
1477    expr: &str,
1478    attr_values: &serde_json::Map<String, Value>,
1479    attr_names: &serde_json::Map<String, Value>,
1480) {
1481    // DynamoDB UpdateExpression has up to four clauses: SET, REMOVE, ADD, DELETE.
1482    // The clauses are separated by whitespace; we tokenize by walking the string
1483    // and switching mode whenever we hit a keyword, then split the body of each
1484    // clause on commas.
1485    let clauses = split_update_clauses(expr);
1486    for (clause, body) in clauses {
1487        match clause {
1488            UpdateClause::Set => apply_set(item, &body, attr_values, attr_names),
1489            UpdateClause::Remove => apply_remove(item, &body, attr_names),
1490            UpdateClause::Add => apply_add(item, &body, attr_values, attr_names),
1491            UpdateClause::Delete => apply_delete(item, &body, attr_values, attr_names),
1492        }
1493    }
1494}
1495
/// The clause keywords recognized inside an `UpdateExpression`.
#[derive(Clone, Copy)]
enum UpdateClause {
    /// `SET` clause.
    Set,
    /// `REMOVE` clause.
    Remove,
    /// `ADD` clause.
    Add,
    /// `DELETE` clause.
    Delete,
}

/// Split an `UpdateExpression` into `(clause, body)` pairs in source order.
///
/// The expression is tokenized on whitespace. A token equal to `SET`,
/// `REMOVE`, `ADD` or `DELETE` (ASCII case-insensitive) opens a new clause;
/// every other token is appended to the current clause body, with tokens
/// joined by a single space. Tokens that precede the first keyword are
/// discarded, and a keyword with nothing after it yields an empty body.
fn split_update_clauses(expr: &str) -> Vec<(UpdateClause, String)> {
    let mut sections: Vec<(UpdateClause, Vec<&str>)> = Vec::new();

    for word in expr.split_whitespace() {
        let keyword = if word.eq_ignore_ascii_case("SET") {
            Some(UpdateClause::Set)
        } else if word.eq_ignore_ascii_case("REMOVE") {
            Some(UpdateClause::Remove)
        } else if word.eq_ignore_ascii_case("ADD") {
            Some(UpdateClause::Add)
        } else if word.eq_ignore_ascii_case("DELETE") {
            Some(UpdateClause::Delete)
        } else {
            None
        };

        match keyword {
            Some(clause) => sections.push((clause, Vec::new())),
            None => {
                // Body tokens only count once a clause has been opened.
                if let Some((_, words)) = sections.last_mut() {
                    words.push(word);
                }
            }
        }
    }

    sections
        .into_iter()
        .map(|(clause, words)| (clause, words.join(" ")))
        .collect()
}
1535
1536fn resolve_attr_name(token: &str, attr_names: &serde_json::Map<String, Value>) -> String {
1537    if token.starts_with('#') {
1538        attr_names
1539            .get(token)
1540            .and_then(|v| v.as_str())
1541            .unwrap_or(token)
1542            .to_string()
1543    } else {
1544        token.to_string()
1545    }
1546}
1547
1548fn apply_set(
1549    item: &mut HashMap<String, Value>,
1550    body: &str,
1551    attr_values: &serde_json::Map<String, Value>,
1552    attr_names: &serde_json::Map<String, Value>,
1553) {
1554    for assignment in split_top_commas(body) {
1555        let Some((lhs, rhs)) = assignment.split_once('=') else {
1556            continue;
1557        };
1558        let attr_name = resolve_attr_name(lhs.trim(), attr_names);
1559        let value = evaluate_set_rhs(rhs.trim(), &attr_name, item, attr_values, attr_names);
1560        if let Some(v) = value {
1561            item.insert(attr_name, v);
1562        }
1563    }
1564}
1565
1566fn evaluate_set_rhs(
1567    rhs: &str,
1568    attr_name: &str,
1569    item: &HashMap<String, Value>,
1570    attr_values: &serde_json::Map<String, Value>,
1571    attr_names: &serde_json::Map<String, Value>,
1572) -> Option<Value> {
1573    // if_not_exists(path, :val)
1574    if let Some(args) = rhs
1575        .strip_prefix("if_not_exists(")
1576        .and_then(|s| s.strip_suffix(')'))
1577    {
1578        let parts: Vec<&str> = args.splitn(2, ',').collect();
1579        if parts.len() == 2 {
1580            let path = resolve_attr_name(parts[0].trim(), attr_names);
1581            if item.contains_key(&path) {
1582                return item.get(&path).cloned();
1583            }
1584            return resolve_value(parts[1].trim(), attr_values);
1585        }
1586        return None;
1587    }
1588    // path + :inc / path - :dec — DynamoDB stores numbers as {"N":"<str>"}.
1589    for op in ['+', '-'] {
1590        if let Some((left, right)) = split_top_op(rhs, op) {
1591            let left = left.trim();
1592            let right = right.trim();
1593            let left_val = if left.starts_with(':') {
1594                resolve_value(left, attr_values)
1595            } else {
1596                let name = resolve_attr_name(left, attr_names);
1597                item.get(&name).cloned()
1598            };
1599            let right_val = if right.starts_with(':') {
1600                resolve_value(right, attr_values)
1601            } else {
1602                let name = resolve_attr_name(right, attr_names);
1603                item.get(&name).cloned()
1604            };
1605            return arithmetic(left_val.as_ref(), op, right_val.as_ref());
1606        }
1607    }
1608    // bare value or attribute reference
1609    if rhs.starts_with(':') {
1610        return resolve_value(rhs, attr_values);
1611    }
1612    if rhs.starts_with('#') {
1613        let _ = attr_name;
1614        let name = resolve_attr_name(rhs, attr_names);
1615        return item.get(&name).cloned();
1616    }
1617    None
1618}
1619
1620fn arithmetic(left: Option<&Value>, op: char, right: Option<&Value>) -> Option<Value> {
1621    let lf = number_from_dynamo(left?)?;
1622    let rf = number_from_dynamo(right?)?;
1623    let out = match op {
1624        '+' => lf + rf,
1625        '-' => lf - rf,
1626        _ => return None,
1627    };
1628    Some(json!({ "N": format_number(out) }))
1629}
1630
1631fn number_from_dynamo(v: &Value) -> Option<f64> {
1632    v.get("N")?.as_str()?.parse().ok()
1633}
1634
1635fn format_number(n: f64) -> String {
1636    // i64::MAX is 2^63-1 which is not exactly representable in f64; `i64::MAX as f64`
1637    // rounds up to 2^63, and casting 2^63 back to i64 saturates. Use an exclusive upper
1638    // bound so we never hand `n as i64` a value it can't faithfully represent.
1639    if n.fract() == 0.0 && n.is_finite() && n >= i64::MIN as f64 && n < i64::MAX as f64 {
1640        format!("{}", n as i64)
1641    } else {
1642        format!("{n}")
1643    }
1644}
1645
1646fn resolve_value(token: &str, attr_values: &serde_json::Map<String, Value>) -> Option<Value> {
1647    attr_values.get(token).cloned()
1648}
1649
1650fn apply_remove(
1651    item: &mut HashMap<String, Value>,
1652    body: &str,
1653    attr_names: &serde_json::Map<String, Value>,
1654) {
1655    for path in split_top_commas(body) {
1656        let name = resolve_attr_name(path.trim(), attr_names);
1657        item.remove(&name);
1658    }
1659}
1660
1661fn apply_add(
1662    item: &mut HashMap<String, Value>,
1663    body: &str,
1664    attr_values: &serde_json::Map<String, Value>,
1665    attr_names: &serde_json::Map<String, Value>,
1666) {
1667    // ADD #path :inc — numeric increment, with the value initialized to :inc when
1668    // the attribute is absent. Set union (NS/SS/BS) is not implemented; ADD on a
1669    // non-numeric attribute is a no-op.
1670    for clause in split_top_commas(body) {
1671        let mut parts = clause.split_whitespace();
1672        let Some(path) = parts.next() else { continue };
1673        let Some(value_ref) = parts.next() else {
1674            continue;
1675        };
1676        let attr_name = resolve_attr_name(path, attr_names);
1677        let Some(delta) = resolve_value(value_ref, attr_values) else {
1678            continue;
1679        };
1680        let current = item.get(&attr_name).cloned();
1681        let next = match (current.as_ref(), &delta) {
1682            (None, _) => delta.clone(),
1683            (Some(cur), _) => arithmetic(Some(cur), '+', Some(&delta)).unwrap_or(delta.clone()),
1684        };
1685        item.insert(attr_name, next);
1686    }
1687}
1688
1689fn apply_delete(
1690    item: &mut HashMap<String, Value>,
1691    body: &str,
1692    attr_values: &serde_json::Map<String, Value>,
1693    attr_names: &serde_json::Map<String, Value>,
1694) {
1695    // DELETE #path :elements — remove each element of the set value from the
1696    // attribute's set. Drops the attribute when the resulting set is empty.
1697    for clause in split_top_commas(body) {
1698        let mut parts = clause.split_whitespace();
1699        let Some(path) = parts.next() else { continue };
1700        let Some(value_ref) = parts.next() else {
1701            continue;
1702        };
1703        let attr_name = resolve_attr_name(path, attr_names);
1704        let Some(elements) = resolve_value(value_ref, attr_values) else {
1705            continue;
1706        };
1707        let Some(current) = item.get_mut(&attr_name) else {
1708            continue;
1709        };
1710        for set_kind in ["SS", "NS", "BS"] {
1711            if let (Some(cur_arr), Some(rem_arr)) = (
1712                current.get_mut(set_kind).and_then(|v| v.as_array_mut()),
1713                elements.get(set_kind).and_then(|v| v.as_array()),
1714            ) {
1715                let to_remove: std::collections::HashSet<String> = rem_arr
1716                    .iter()
1717                    .filter_map(|v| v.as_str().map(String::from))
1718                    .collect();
1719                cur_arr.retain(|v| !v.as_str().is_some_and(|s| to_remove.contains(s)));
1720                if cur_arr.is_empty() {
1721                    item.remove(&attr_name);
1722                }
1723                break;
1724            }
1725        }
1726    }
1727}
1728
/// Split `s` on commas that sit outside parentheses, trimming each piece.
///
/// Commas inside a call such as `if_not_exists(a, :b)` do not split. Empty
/// pieces between two commas are kept, but an empty (or whitespace-only)
/// final piece is dropped.
fn split_top_commas(s: &str) -> Vec<String> {
    let mut pieces = Vec::new();
    let mut nesting = 0i32;
    let mut start = 0usize;

    for (pos, ch) in s.char_indices() {
        match ch {
            '(' => nesting += 1,
            ')' => nesting -= 1,
            ',' if nesting == 0 => {
                pieces.push(s[start..pos].trim().to_string());
                // ',' is a single byte, so the next piece starts right after it.
                start = pos + 1;
            }
            _ => {}
        }
    }

    let tail = s[start..].trim();
    if !tail.is_empty() {
        pieces.push(tail.to_string());
    }
    pieces
}
1756
/// Split `s` at the first occurrence of `op` that is outside parentheses
/// and not at the very start of the string.
///
/// Returns the text before and after the operator (untrimmed), or `None`
/// when no such occurrence exists. Skipping position 0 keeps a leading sign
/// from being read as a binary operator.
fn split_top_op(s: &str, op: char) -> Option<(&str, &str)> {
    let mut nesting = 0i32;
    let (at, _) = s.char_indices().find(|&(pos, ch)| {
        if ch == '(' {
            nesting += 1;
            false
        } else if ch == ')' {
            nesting -= 1;
            false
        } else {
            ch == op && nesting == 0 && pos > 0
        }
    })?;

    let (before, rest) = s.split_at(at);
    Some((before, &rest[op.len_utf8()..]))
}
1771
1772/// Convert an SQS queue URL to an ARN.
1773/// QueueUrl format: http://localhost:4566/123456789012/my-queue
1774fn queue_url_to_arn(url: &str) -> String {
1775    let parts: Vec<&str> = url.rsplitn(3, '/').collect();
1776    if parts.len() >= 2 {
1777        let queue_name = parts[0];
1778        let account_id = parts[1];
1779        Arn::new("sqs", "us-east-1", account_id, queue_name).to_string()
1780    } else {
1781        url.to_string()
1782    }
1783}
1784
1785/// Compute MD5 hex digest for SQS message response format.
1786fn md5_hex(data: &str) -> String {
1787    use md5::Digest;
1788    let result = md5::Md5::digest(data.as_bytes());
1789    format!("{result:032x}")
1790}
1791
1792/// Invoke a Lambda function directly via DeliveryBus.
1793async fn invoke_lambda_direct(
1794    function_arn: &str,
1795    input: &Value,
1796    delivery: &Option<Arc<DeliveryBus>>,
1797    timeout_seconds: Option<u64>,
1798) -> Result<Value, (String, String)> {
1799    let delivery = delivery.as_ref().ok_or_else(|| {
1800        (
1801            "States.TaskFailed".to_string(),
1802            "No delivery bus configured for Lambda invocation".to_string(),
1803        )
1804    })?;
1805
1806    let payload = serde_json::to_string(input).unwrap_or_default();
1807
1808    let invoke_future = delivery.invoke_lambda(function_arn, &payload);
1809
1810    let result = if let Some(timeout) = timeout_seconds {
1811        match tokio::time::timeout(tokio::time::Duration::from_secs(timeout), invoke_future).await {
1812            Ok(r) => r,
1813            Err(_) => {
1814                return Err((
1815                    "States.Timeout".to_string(),
1816                    format!("Task timed out after {timeout} seconds"),
1817                ));
1818            }
1819        }
1820    } else {
1821        invoke_future.await
1822    };
1823
1824    match result {
1825        Some(Ok(bytes)) => {
1826            let response_str = String::from_utf8_lossy(&bytes);
1827            let value: Value =
1828                serde_json::from_str(&response_str).unwrap_or(json!(response_str.to_string()));
1829            Ok(value)
1830        }
1831        Some(Err(e)) => Err(("States.TaskFailed".to_string(), e)),
1832        None => {
1833            // No runtime available — return empty result
1834            Ok(json!({}))
1835        }
1836    }
1837}
1838
1839/// Invoke an activity worker. Inserts a `PENDING` token into shared state
1840/// so a worker can claim it via `GetActivityTask`, then polls until the
1841/// worker calls `SendTaskSuccess` / `SendTaskFailure` or the heartbeat /
1842/// timeout windows expire.
1843async fn invoke_activity(
1844    activity_arn: &str,
1845    input: &Value,
1846    shared_state: &SharedStepFunctionsState,
1847    timeout_seconds: Option<u64>,
1848    heartbeat_seconds: Option<u64>,
1849) -> Result<Value, (String, String)> {
1850    use crate::state::TaskTokenState;
1851
1852    // Activity must exist (look up across accounts via ARN segment).
1853    let activity_account = activity_arn.split(':').nth(4).unwrap_or("").to_string();
1854    {
1855        let accounts = shared_state.read();
1856        let exists = accounts
1857            .get(&activity_account)
1858            .map(|s| s.activities.contains_key(activity_arn))
1859            .unwrap_or(false);
1860        if !exists {
1861            return Err((
1862                "States.TaskFailed".to_string(),
1863                format!("Activity does not exist: {activity_arn}"),
1864            ));
1865        }
1866    }
1867
1868    let token = format!(
1869        "FCToken-{}-{}",
1870        chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0),
1871        uuid::Uuid::new_v4().simple(),
1872    );
1873    let now = chrono::Utc::now();
1874    let input_str = serde_json::to_string(input).unwrap_or_else(|_| "{}".to_string());
1875    {
1876        let mut accounts = shared_state.write();
1877        let state = accounts.get_or_create(&activity_account);
1878        state.task_tokens.insert(
1879            token.clone(),
1880            TaskTokenState {
1881                activity_arn: activity_arn.to_string(),
1882                status: "PENDING".to_string(),
1883                output: None,
1884                error: None,
1885                cause: None,
1886                input: Some(input_str),
1887                created_at: now,
1888                last_heartbeat_at: None,
1889                heartbeat_seconds: heartbeat_seconds.map(|s| s as i64),
1890                timeout_seconds: timeout_seconds.map(|s| s as i64),
1891            },
1892        );
1893    }
1894
1895    // Poll for completion. Default poll cadence 200ms; 1 hour absolute
1896    // ceiling so a stuck activity can't block the interpreter forever
1897    // when no TimeoutSeconds is set on the Task state.
1898    let absolute_deadline =
1899        std::time::Instant::now() + std::time::Duration::from_secs(timeout_seconds.unwrap_or(3600));
1900    loop {
1901        let now_ts = chrono::Utc::now();
1902        let snapshot = {
1903            let accounts = shared_state.read();
1904            accounts
1905                .get(&activity_account)
1906                .and_then(|s| s.task_tokens.get(&token).cloned())
1907        };
1908        let Some(entry) = snapshot else {
1909            return Err((
1910                "States.TaskFailed".to_string(),
1911                "Activity task token disappeared".to_string(),
1912            ));
1913        };
1914        match entry.status.as_str() {
1915            "SUCCEEDED" => {
1916                cleanup_token(shared_state, &activity_account, &token);
1917                let output = entry.output.unwrap_or_else(|| "{}".to_string());
1918                let value: Value = serde_json::from_str(&output).unwrap_or(Value::String(output));
1919                return Ok(value);
1920            }
1921            "FAILED" => {
1922                cleanup_token(shared_state, &activity_account, &token);
1923                return Err((
1924                    entry
1925                        .error
1926                        .unwrap_or_else(|| "States.TaskFailed".to_string()),
1927                    entry.cause.unwrap_or_default(),
1928                ));
1929            }
1930            _ => {}
1931        }
1932        // Heartbeat timeout: only enforced once a worker has picked up the
1933        // task (status == IN_PROGRESS) and a heartbeat window is set.
1934        if entry.status == "IN_PROGRESS" {
1935            if let Some(hb) = entry.heartbeat_seconds {
1936                let last = entry.last_heartbeat_at.unwrap_or(entry.created_at);
1937                if (now_ts - last).num_seconds() > hb {
1938                    cleanup_token(shared_state, &activity_account, &token);
1939                    return Err((
1940                        "States.HeartbeatTimeout".to_string(),
1941                        format!("Activity worker missed heartbeat ({hb}s window)"),
1942                    ));
1943                }
1944            }
1945        }
1946        if std::time::Instant::now() >= absolute_deadline {
1947            cleanup_token(shared_state, &activity_account, &token);
1948            let secs = timeout_seconds.unwrap_or(3600);
1949            return Err((
1950                "States.Timeout".to_string(),
1951                format!("Activity timed out after {secs} seconds"),
1952            ));
1953        }
1954        tokio::time::sleep(std::time::Duration::from_millis(200)).await;
1955    }
1956}
1957
1958fn cleanup_token(shared_state: &SharedStepFunctionsState, account_id: &str, token: &str) {
1959    let mut accounts = shared_state.write();
1960    if let Some(state) = accounts.get_mut(account_id) {
1961        state.task_tokens.remove(token);
1962    }
1963}
1964
1965/// Apply Parameters template: keys ending with .$ are treated as JsonPath references.
1966fn apply_parameters(template: &Value, input: &Value) -> Value {
1967    match template {
1968        Value::Object(map) => {
1969            let mut result = serde_json::Map::new();
1970            for (key, value) in map {
1971                if let Some(stripped) = key.strip_suffix(".$") {
1972                    if let Some(path) = value.as_str() {
1973                        result.insert(
1974                            stripped.to_string(),
1975                            crate::io_processing::resolve_path(input, path),
1976                        );
1977                    }
1978                } else {
1979                    result.insert(key.clone(), apply_parameters(value, input));
1980                }
1981            }
1982            Value::Object(result)
1983        }
1984        Value::Array(arr) => Value::Array(arr.iter().map(|v| apply_parameters(v, input)).collect()),
1985        other => other.clone(),
1986    }
1987}
1988
/// Transition computed by `next_state` from a state's definition.
enum NextState {
    /// Continue with the state named by the definition's `Next` field.
    Name(String),
    /// The definition has `"End": true`.
    End,
    /// The definition has neither `"End": true` nor a string `Next`; carries
    /// the error message.
    Error(String),
}
1994
1995fn next_state(state_def: &Value) -> NextState {
1996    if state_def["End"].as_bool() == Some(true) {
1997        return NextState::End;
1998    }
1999    match state_def["Next"].as_str() {
2000        Some(next) => NextState::Name(next.to_string()),
2001        None => NextState::Error("State has neither 'End' nor 'Next' field".to_string()),
2002    }
2003}
2004
2005/// Find the first `Catch` clause on `state_def` that matches `error` and
2006/// apply its `ResultPath` to produce the state to transition to and the
2007/// new effective input. Returns None when no catcher applies, in which
2008/// case the error should propagate up.
2009fn apply_state_catcher(
2010    state_def: &Value,
2011    effective_input: &Value,
2012    error: &str,
2013    cause: &str,
2014) -> Option<(String, Value)> {
2015    let catchers = state_def["Catch"].as_array().cloned().unwrap_or_default();
2016    let (next, result_path) = find_catcher(&catchers, error)?;
2017    let error_output = json!({
2018        "Error": error,
2019        "Cause": cause,
2020    });
2021    let new_input = apply_result_path(effective_input, &error_output, result_path.as_deref());
2022    Some((next, new_input))
2023}
2024
/// Return the account-id segment (the fifth `:`-separated field) of an ARN
/// shaped like `arn:aws:states:region:account_id:...`.
///
/// Falls back to `"000000000000"` when the string has fewer than five fields.
fn account_id_from_arn(arn: &str) -> &str {
    // Six pieces at most: fields 0..=4 stay individual, the rest is lumped
    // into the sixth, so index 4 is still exactly the account field.
    match arn.splitn(6, ':').nth(4) {
        Some(account_id) => account_id,
        None => "000000000000",
    }
}
2029
2030fn add_event(
2031    state: &SharedStepFunctionsState,
2032    execution_arn: &str,
2033    event_type: &str,
2034    previous_event_id: i64,
2035    details: Value,
2036) -> i64 {
2037    let account_id = account_id_from_arn(execution_arn).to_string();
2038    let mut accounts = state.write();
2039    let s = accounts.get_or_create(&account_id);
2040    if let Some(exec) = s.executions.get_mut(execution_arn) {
2041        let id = exec.history_events.len() as i64 + 1;
2042        exec.history_events.push(HistoryEvent {
2043            id,
2044            event_type: event_type.to_string(),
2045            timestamp: Utc::now(),
2046            previous_event_id,
2047            details,
2048        });
2049        id
2050    } else {
2051        0
2052    }
2053}
2054
2055fn succeed_execution(state: &SharedStepFunctionsState, execution_arn: &str, output: &Value) {
2056    let account_id = account_id_from_arn(execution_arn).to_string();
2057    // Check terminal status before recording events to avoid inconsistent history
2058    {
2059        let accounts = state.read();
2060        if let Some(s) = accounts.get(&account_id) {
2061            if let Some(exec) = s.executions.get(execution_arn) {
2062                if exec.status != ExecutionStatus::Running {
2063                    return;
2064                }
2065            }
2066        }
2067    }
2068
2069    let output_str = serde_json::to_string(output).unwrap_or_default();
2070
2071    add_event(
2072        state,
2073        execution_arn,
2074        "ExecutionSucceeded",
2075        0,
2076        json!({ "output": output_str }),
2077    );
2078
2079    let mut accounts = state.write();
2080    let s = accounts.get_or_create(&account_id);
2081    if let Some(exec) = s.executions.get_mut(execution_arn) {
2082        exec.status = ExecutionStatus::Succeeded;
2083        exec.output = Some(output_str);
2084        exec.stop_date = Some(Utc::now());
2085    }
2086}
2087
2088fn fail_execution(state: &SharedStepFunctionsState, execution_arn: &str, error: &str, cause: &str) {
2089    let account_id = account_id_from_arn(execution_arn).to_string();
2090    // Check terminal status before recording events to avoid inconsistent history
2091    {
2092        let accounts = state.read();
2093        if let Some(s) = accounts.get(&account_id) {
2094            if let Some(exec) = s.executions.get(execution_arn) {
2095                if exec.status != ExecutionStatus::Running {
2096                    return;
2097                }
2098            }
2099        }
2100    }
2101
2102    add_event(
2103        state,
2104        execution_arn,
2105        "ExecutionFailed",
2106        0,
2107        json!({ "error": error, "cause": cause }),
2108    );
2109
2110    let mut accounts = state.write();
2111    let s = accounts.get_or_create(&account_id);
2112    if let Some(exec) = s.executions.get_mut(execution_arn) {
2113        exec.status = ExecutionStatus::Failed;
2114        exec.error = Some(error.to_string());
2115        exec.cause = Some(cause.to_string());
2116        exec.stop_date = Some(Utc::now());
2117    }
2118}
2119
2120#[cfg(test)]
2121mod tests {
2122    use super::*;
2123    use crate::state::Execution;
2124    use parking_lot::RwLock;
2125    use std::sync::Arc;
2126
2127    fn make_state() -> SharedStepFunctionsState {
2128        Arc::new(RwLock::new(
2129            fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", ""),
2130        ))
2131    }
2132
2133    fn create_execution(state: &SharedStepFunctionsState, arn: &str, input: Option<String>) {
2134        let mut accounts = state.write();
2135        let s = accounts.get_or_create("123456789012");
2136        s.executions.insert(
2137            arn.to_string(),
2138            Execution {
2139                execution_arn: arn.to_string(),
2140                state_machine_arn: "arn:aws:states:us-east-1:123456789012:stateMachine:test"
2141                    .to_string(),
2142                state_machine_name: "test".to_string(),
2143                name: "exec-1".to_string(),
2144                status: ExecutionStatus::Running,
2145                input,
2146                output: None,
2147                start_date: Utc::now(),
2148                stop_date: None,
2149                error: None,
2150                cause: None,
2151                history_events: vec![],
2152            },
2153        );
2154    }
2155
2156    #[tokio::test]
2157    async fn test_simple_pass_state() {
2158        let state = make_state();
2159        let arn = "arn:aws:states:us-east-1:123456789012:execution:test:exec-1";
2160        create_execution(&state, arn, Some(r#"{"hello":"world"}"#.to_string()));
2161
2162        let definition = json!({
2163            "StartAt": "PassState",
2164            "States": {
2165                "PassState": {
2166                    "Type": "Pass",
2167                    "Result": {"processed": true},
2168                    "End": true
2169                }
2170            }
2171        })
2172        .to_string();
2173
2174        execute_state_machine(
2175            state.clone(),
2176            arn.to_string(),
2177            definition,
2178            Some(r#"{"hello":"world"}"#.to_string()),
2179            None,
2180            None,
2181        )
2182        .await;
2183
2184        let __a = state.read();
2185        let s = __a.default_ref();
2186        let exec = s.executions.get(arn).unwrap();
2187        assert_eq!(exec.status, ExecutionStatus::Succeeded);
2188        assert!(exec.output.is_some());
2189        let output: Value = serde_json::from_str(exec.output.as_ref().unwrap()).unwrap();
2190        assert_eq!(output, json!({"processed": true}));
2191    }
2192
2193    #[tokio::test]
2194    async fn test_pass_chain() {
2195        let state = make_state();
2196        let arn = "arn:aws:states:us-east-1:123456789012:execution:test:exec-1";
2197        create_execution(&state, arn, Some(r#"{}"#.to_string()));
2198
2199        let definition = json!({
2200            "StartAt": "First",
2201            "States": {
2202                "First": {
2203                    "Type": "Pass",
2204                    "Result": "step1",
2205                    "ResultPath": "$.first",
2206                    "Next": "Second"
2207                },
2208                "Second": {
2209                    "Type": "Pass",
2210                    "Result": "step2",
2211                    "ResultPath": "$.second",
2212                    "End": true
2213                }
2214            }
2215        })
2216        .to_string();
2217
2218        execute_state_machine(
2219            state.clone(),
2220            arn.to_string(),
2221            definition,
2222            Some("{}".to_string()),
2223            None,
2224            None,
2225        )
2226        .await;
2227
2228        let __a = state.read();
2229        let s = __a.default_ref();
2230        let exec = s.executions.get(arn).unwrap();
2231        assert_eq!(exec.status, ExecutionStatus::Succeeded);
2232        let output: Value = serde_json::from_str(exec.output.as_ref().unwrap()).unwrap();
2233        assert_eq!(output["first"], json!("step1"));
2234        assert_eq!(output["second"], json!("step2"));
2235    }
2236
2237    #[tokio::test]
2238    async fn test_succeed_state() {
2239        let state = make_state();
2240        let arn = "arn:aws:states:us-east-1:123456789012:execution:test:exec-1";
2241        create_execution(&state, arn, Some(r#"{"data": "value"}"#.to_string()));
2242
2243        let definition = json!({
2244            "StartAt": "Done",
2245            "States": {
2246                "Done": {
2247                    "Type": "Succeed"
2248                }
2249            }
2250        })
2251        .to_string();
2252
2253        execute_state_machine(
2254            state.clone(),
2255            arn.to_string(),
2256            definition,
2257            Some(r#"{"data": "value"}"#.to_string()),
2258            None,
2259            None,
2260        )
2261        .await;
2262
2263        let __a = state.read();
2264        let s = __a.default_ref();
2265        let exec = s.executions.get(arn).unwrap();
2266        assert_eq!(exec.status, ExecutionStatus::Succeeded);
2267    }
2268
2269    #[tokio::test]
2270    async fn test_fail_state() {
2271        let state = make_state();
2272        let arn = "arn:aws:states:us-east-1:123456789012:execution:test:exec-1";
2273        create_execution(&state, arn, None);
2274
2275        let definition = json!({
2276            "StartAt": "FailState",
2277            "States": {
2278                "FailState": {
2279                    "Type": "Fail",
2280                    "Error": "CustomError",
2281                    "Cause": "Something went wrong"
2282                }
2283            }
2284        })
2285        .to_string();
2286
2287        execute_state_machine(state.clone(), arn.to_string(), definition, None, None, None).await;
2288
2289        let __a = state.read();
2290        let s = __a.default_ref();
2291        let exec = s.executions.get(arn).unwrap();
2292        assert_eq!(exec.status, ExecutionStatus::Failed);
2293        assert_eq!(exec.error.as_deref(), Some("CustomError"));
2294        assert_eq!(exec.cause.as_deref(), Some("Something went wrong"));
2295    }
2296
2297    #[tokio::test]
2298    async fn test_history_events_recorded() {
2299        let state = make_state();
2300        let arn = "arn:aws:states:us-east-1:123456789012:execution:test:exec-1";
2301        create_execution(&state, arn, Some("{}".to_string()));
2302
2303        let definition = json!({
2304            "StartAt": "PassState",
2305            "States": {
2306                "PassState": {
2307                    "Type": "Pass",
2308                    "End": true
2309                }
2310            }
2311        })
2312        .to_string();
2313
2314        execute_state_machine(
2315            state.clone(),
2316            arn.to_string(),
2317            definition,
2318            Some("{}".to_string()),
2319            None,
2320            None,
2321        )
2322        .await;
2323
2324        let __a = state.read();
2325        let s = __a.default_ref();
2326        let exec = s.executions.get(arn).unwrap();
2327        let event_types: Vec<&str> = exec
2328            .history_events
2329            .iter()
2330            .map(|e| e.event_type.as_str())
2331            .collect();
2332        assert_eq!(
2333            event_types,
2334            vec![
2335                "ExecutionStarted",
2336                "PassStateEntered",
2337                "PassStateExited",
2338                "ExecutionSucceeded"
2339            ]
2340        );
2341    }
2342
2343    fn drive(state: &SharedStepFunctionsState, arn: &str, def: Value, input: Option<&str>) {
2344        create_execution(state, arn, input.map(|s| s.to_string()));
2345        let fut = execute_state_machine(
2346            state.clone(),
2347            arn.to_string(),
2348            def.to_string(),
2349            input.map(|s| s.to_string()),
2350            None,
2351            None,
2352        );
2353        let rt = tokio::runtime::Builder::new_current_thread()
2354            .enable_time()
2355            .build()
2356            .unwrap();
2357        rt.block_on(fut);
2358    }
2359
2360    fn read_exec<R>(
2361        state: &SharedStepFunctionsState,
2362        arn: &str,
2363        f: impl FnOnce(&Execution) -> R,
2364    ) -> R {
2365        let __a = state.read();
2366        let s = __a.default_ref();
2367        f(s.executions.get(arn).expect("execution missing"))
2368    }
2369
2370    fn arn_for(name: &str) -> String {
2371        format!("arn:aws:states:us-east-1:123456789012:execution:test:{name}")
2372    }
2373
2374    // ── Pass state: InputPath / OutputPath ───────────────────────────
2375
2376    #[test]
2377    fn pass_state_input_output_path_select_fields() {
2378        let state = make_state();
2379        let arn = arn_for("pass-paths");
2380        let def = json!({
2381            "StartAt": "P",
2382            "States": {
2383                "P": {
2384                    "Type": "Pass",
2385                    "InputPath": "$.inner",
2386                    "OutputPath": "$.kept",
2387                    "End": true
2388                }
2389            }
2390        });
2391        drive(
2392            &state,
2393            &arn,
2394            def,
2395            Some(r#"{"inner":{"kept":"yes","dropped":true},"sibling":1}"#),
2396        );
2397
2398        read_exec(&state, &arn, |exec| {
2399            assert_eq!(exec.status, ExecutionStatus::Succeeded);
2400            let output: Value = serde_json::from_str(exec.output.as_ref().unwrap()).unwrap();
2401            assert_eq!(output, json!("yes"));
2402        });
2403    }
2404
2405    // ── Succeed / Fail variants ──────────────────────────────────────
2406
2407    #[test]
2408    fn succeed_state_honors_input_path_null() {
2409        let state = make_state();
2410        let arn = arn_for("succeed-null");
2411        let def = json!({
2412            "StartAt": "S",
2413            "States": { "S": { "Type": "Succeed", "InputPath": "null" } }
2414        });
2415        drive(&state, &arn, def, Some(r#"{"a":1}"#));
2416
2417        read_exec(&state, &arn, |exec| {
2418            assert_eq!(exec.status, ExecutionStatus::Succeeded);
2419            let output: Value = serde_json::from_str(exec.output.as_ref().unwrap()).unwrap();
2420            assert_eq!(output, json!({}));
2421        });
2422    }
2423
2424    #[test]
2425    fn fail_state_defaults_when_fields_missing() {
2426        let state = make_state();
2427        let arn = arn_for("fail-default");
2428        let def = json!({
2429            "StartAt": "F",
2430            "States": { "F": { "Type": "Fail" } }
2431        });
2432        drive(&state, &arn, def, None);
2433
2434        read_exec(&state, &arn, |exec| {
2435            assert_eq!(exec.status, ExecutionStatus::Failed);
2436            assert_eq!(exec.error.as_deref(), Some("States.Fail"));
2437            assert_eq!(exec.cause.as_deref(), Some(""));
2438        });
2439    }
2440
2441    // ── Choice ───────────────────────────────────────────────────────
2442
2443    fn choice_def() -> Value {
2444        json!({
2445            "StartAt": "C",
2446            "States": {
2447                "C": {
2448                    "Type": "Choice",
2449                    "Choices": [
2450                        {
2451                            "Variable": "$.n",
2452                            "NumericGreaterThan": 10,
2453                            "Next": "Big"
2454                        }
2455                    ],
2456                    "Default": "Small"
2457                },
2458                "Big":   { "Type": "Pass", "Result": "big",   "End": true },
2459                "Small": { "Type": "Pass", "Result": "small", "End": true }
2460            }
2461        })
2462    }
2463
2464    #[test]
2465    fn choice_routes_to_matching_branch() {
2466        let state = make_state();
2467        let arn = arn_for("choice-big");
2468        drive(&state, &arn, choice_def(), Some(r#"{"n":42}"#));
2469
2470        read_exec(&state, &arn, |exec| {
2471            assert_eq!(exec.status, ExecutionStatus::Succeeded);
2472            assert_eq!(
2473                serde_json::from_str::<Value>(exec.output.as_ref().unwrap()).unwrap(),
2474                json!("big")
2475            );
2476        });
2477    }
2478
2479    #[test]
2480    fn choice_falls_through_to_default() {
2481        let state = make_state();
2482        let arn = arn_for("choice-default");
2483        drive(&state, &arn, choice_def(), Some(r#"{"n":3}"#));
2484
2485        read_exec(&state, &arn, |exec| {
2486            assert_eq!(exec.status, ExecutionStatus::Succeeded);
2487            assert_eq!(
2488                serde_json::from_str::<Value>(exec.output.as_ref().unwrap()).unwrap(),
2489                json!("small")
2490            );
2491        });
2492    }
2493
2494    #[test]
2495    fn choice_no_match_and_no_default_fails() {
2496        let state = make_state();
2497        let arn = arn_for("choice-nomatch");
2498        let def = json!({
2499            "StartAt": "C",
2500            "States": {
2501                "C": {
2502                    "Type": "Choice",
2503                    "Choices": [
2504                        { "Variable": "$.n", "NumericEquals": 1, "Next": "End1" }
2505                    ]
2506                },
2507                "End1": { "Type": "Pass", "End": true }
2508            }
2509        });
2510        drive(&state, &arn, def, Some(r#"{"n":99}"#));
2511
2512        read_exec(&state, &arn, |exec| {
2513            assert_eq!(exec.status, ExecutionStatus::Failed);
2514            assert_eq!(exec.error.as_deref(), Some("States.NoChoiceMatched"));
2515        });
2516    }
2517
2518    // ── Wait ─────────────────────────────────────────────────────────
2519
2520    #[test]
2521    fn wait_seconds_then_advances() {
2522        let state = make_state();
2523        let arn = arn_for("wait-secs");
2524        let def = json!({
2525            "StartAt": "W",
2526            "States": {
2527                "W": { "Type": "Wait", "Seconds": 0, "Next": "Done" },
2528                "Done": { "Type": "Succeed" }
2529            }
2530        });
2531        drive(&state, &arn, def, Some(r#"{"k":1}"#));
2532
2533        read_exec(&state, &arn, |exec| {
2534            assert_eq!(exec.status, ExecutionStatus::Succeeded);
2535        });
2536    }
2537
2538    #[test]
2539    fn wait_timestamp_in_past_is_noop() {
2540        let state = make_state();
2541        let arn = arn_for("wait-past");
2542        let def = json!({
2543            "StartAt": "W",
2544            "States": {
2545                "W": {
2546                    "Type": "Wait",
2547                    "Timestamp": "2000-01-01T00:00:00Z",
2548                    "End": true
2549                }
2550            }
2551        });
2552        drive(&state, &arn, def, Some(r#"{"k":1}"#));
2553
2554        read_exec(&state, &arn, |exec| {
2555            assert_eq!(exec.status, ExecutionStatus::Succeeded);
2556        });
2557    }
2558
2559    #[test]
2560    fn wait_without_any_duration_falls_through() {
2561        let state = make_state();
2562        let arn = arn_for("wait-none");
2563        let def = json!({
2564            "StartAt": "W",
2565            "States": { "W": { "Type": "Wait", "End": true } }
2566        });
2567        drive(&state, &arn, def, None);
2568
2569        read_exec(&state, &arn, |exec| {
2570            assert_eq!(exec.status, ExecutionStatus::Succeeded);
2571        });
2572    }
2573
2574    // ── Parallel ─────────────────────────────────────────────────────
2575
2576    #[test]
2577    fn parallel_runs_two_pass_branches_and_collects_results() {
2578        let state = make_state();
2579        let arn = arn_for("parallel-ok");
2580        let def = json!({
2581            "StartAt": "P",
2582            "States": {
2583                "P": {
2584                    "Type": "Parallel",
2585                    "End": true,
2586                    "Branches": [
2587                        {
2588                            "StartAt": "A",
2589                            "States": { "A": { "Type": "Pass", "Result": "a", "End": true } }
2590                        },
2591                        {
2592                            "StartAt": "B",
2593                            "States": { "B": { "Type": "Pass", "Result": "b", "End": true } }
2594                        }
2595                    ]
2596                }
2597            }
2598        });
2599        drive(&state, &arn, def, Some(r#"{}"#));
2600
2601        read_exec(&state, &arn, |exec| {
2602            assert_eq!(exec.status, ExecutionStatus::Succeeded);
2603            let output: Value = serde_json::from_str(exec.output.as_ref().unwrap()).unwrap();
2604            assert_eq!(output, json!(["a", "b"]));
2605        });
2606    }
2607
2608    #[test]
2609    fn parallel_empty_branches_fails() {
2610        let state = make_state();
2611        let arn = arn_for("parallel-empty");
2612        let def = json!({
2613            "StartAt": "P",
2614            "States": { "P": { "Type": "Parallel", "Branches": [], "End": true } }
2615        });
2616        drive(&state, &arn, def, None);
2617
2618        read_exec(&state, &arn, |exec| {
2619            assert_eq!(exec.status, ExecutionStatus::Failed);
2620            assert_eq!(exec.error.as_deref(), Some("States.Runtime"));
2621        });
2622    }
2623
2624    // ── Map ──────────────────────────────────────────────────────────
2625
2626    #[test]
2627    fn map_iterates_pass_state_over_items_path() {
2628        let state = make_state();
2629        let arn = arn_for("map-ok");
2630        let def = json!({
2631            "StartAt": "M",
2632            "States": {
2633                "M": {
2634                    "Type": "Map",
2635                    "ItemsPath": "$.items",
2636                    "Iterator": {
2637                        "StartAt": "Item",
2638                        "States": { "Item": { "Type": "Pass", "End": true } }
2639                    },
2640                    "End": true
2641                }
2642            }
2643        });
2644        drive(&state, &arn, def, Some(r#"{"items":[1,2,3]}"#));
2645
2646        read_exec(&state, &arn, |exec| {
2647            assert_eq!(exec.status, ExecutionStatus::Succeeded);
2648            let output: Value = serde_json::from_str(exec.output.as_ref().unwrap()).unwrap();
2649            assert_eq!(output, json!([1, 2, 3]));
2650        });
2651    }
2652
2653    // ── Task: unsupported resources / delivery == None ───────────────
2654
2655    #[test]
2656    fn task_unsupported_resource_propagates_failure() {
2657        let state = make_state();
2658        let arn = arn_for("task-unsupported");
2659        let def = json!({
2660            "StartAt": "T",
2661            "States": {
2662                "T": {
2663                    "Type": "Task",
2664                    "Resource": "arn:aws:states:::nothing:here",
2665                    "End": true
2666                }
2667            }
2668        });
2669        drive(&state, &arn, def, None);
2670
2671        read_exec(&state, &arn, |exec| {
2672            assert_eq!(exec.status, ExecutionStatus::Failed);
2673            assert_eq!(exec.error.as_deref(), Some("States.TaskFailed"));
2674            assert!(exec.cause.as_deref().unwrap().contains("Unsupported"));
2675        });
2676    }
2677
2678    #[test]
2679    fn task_sqs_send_without_delivery_fails() {
2680        let state = make_state();
2681        let arn = arn_for("task-sqs-nodelivery");
2682        let def = json!({
2683            "StartAt": "T",
2684            "States": {
2685                "T": {
2686                    "Type": "Task",
2687                    "Resource": "arn:aws:states:::sqs:sendMessage",
2688                    "Parameters": { "QueueUrl": "http://localhost/123/q", "MessageBody": "hi" },
2689                    "End": true
2690                }
2691            }
2692        });
2693        drive(&state, &arn, def, Some("{}"));
2694
2695        read_exec(&state, &arn, |exec| {
2696            assert_eq!(exec.status, ExecutionStatus::Failed);
2697            assert!(exec.cause.as_deref().unwrap().contains("delivery bus"));
2698        });
2699    }
2700
2701    // ── Task: Catch clause ───────────────────────────────────────────
2702
2703    #[test]
2704    fn task_catch_routes_error_into_handler() {
2705        let state = make_state();
2706        let arn = arn_for("task-catch");
2707        let def = json!({
2708            "StartAt": "T",
2709            "States": {
2710                "T": {
2711                    "Type": "Task",
2712                    "Resource": "arn:aws:states:::nothing:here",
2713                    "Catch": [
2714                        { "ErrorEquals": ["States.ALL"], "Next": "Handler", "ResultPath": "$.err" }
2715                    ],
2716                    "End": true
2717                },
2718                "Handler": { "Type": "Pass", "End": true }
2719            }
2720        });
2721        drive(&state, &arn, def, Some(r#"{"orig":"v"}"#));
2722
2723        read_exec(&state, &arn, |exec| {
2724            assert_eq!(exec.status, ExecutionStatus::Succeeded);
2725            let output: Value = serde_json::from_str(exec.output.as_ref().unwrap()).unwrap();
2726            // Handler is Pass with no Result — effective input flows through.
2727            assert_eq!(output["orig"], json!("v"));
2728            assert_eq!(output["err"]["Error"], json!("States.TaskFailed"));
2729        });
2730    }
2731
2732    // ── Top-level errors: definition / start-at / missing states ─────
2733
2734    #[test]
2735    fn invalid_definition_json_fails_execution() {
2736        let state = make_state();
2737        let arn = arn_for("bad-json");
2738        create_execution(&state, &arn, None);
2739        let rt = tokio::runtime::Builder::new_current_thread()
2740            .enable_time()
2741            .build()
2742            .unwrap();
2743        rt.block_on(execute_state_machine(
2744            state.clone(),
2745            arn.clone(),
2746            "not json{".to_string(),
2747            None,
2748            None,
2749            None,
2750        ));
2751
2752        read_exec(&state, &arn, |exec| {
2753            assert_eq!(exec.status, ExecutionStatus::Failed);
2754            assert_eq!(exec.error.as_deref(), Some("States.Runtime"));
2755            assert!(exec.cause.as_deref().unwrap().contains("Failed to parse"));
2756        });
2757    }
2758
2759    #[test]
2760    fn missing_start_at_fails_execution() {
2761        let state = make_state();
2762        let arn = arn_for("no-startat");
2763        let def = json!({ "States": { "X": { "Type": "Succeed" } } });
2764        drive(&state, &arn, def, None);
2765
2766        read_exec(&state, &arn, |exec| {
2767            assert_eq!(exec.status, ExecutionStatus::Failed);
2768            assert!(exec.cause.as_deref().unwrap().contains("StartAt"));
2769        });
2770    }
2771
2772    #[test]
2773    fn next_state_not_found_fails_execution() {
2774        let state = make_state();
2775        let arn = arn_for("dangling-next");
2776        let def = json!({
2777            "StartAt": "A",
2778            "States": { "A": { "Type": "Pass", "Next": "DoesNotExist" } }
2779        });
2780        drive(&state, &arn, def, None);
2781
2782        read_exec(&state, &arn, |exec| {
2783            assert_eq!(exec.status, ExecutionStatus::Failed);
2784            assert!(exec.cause.as_deref().unwrap().contains("not found"));
2785        });
2786    }
2787
2788    #[test]
2789    fn unsupported_state_type_fails_execution() {
2790        let state = make_state();
2791        let arn = arn_for("bad-type");
2792        let def = json!({
2793            "StartAt": "X",
2794            "States": { "X": { "Type": "WatChoo", "End": true } }
2795        });
2796        drive(&state, &arn, def, None);
2797
2798        read_exec(&state, &arn, |exec| {
2799            assert_eq!(exec.status, ExecutionStatus::Failed);
2800            assert!(exec
2801                .cause
2802                .as_deref()
2803                .unwrap()
2804                .contains("Unsupported state type"));
2805        });
2806    }
2807
2808    // ── Pure helpers ─────────────────────────────────────────────────
2809
2810    #[test]
2811    fn apply_parameters_substitutes_json_path_refs() {
2812        let template = json!({
2813            "literal": "constant",
2814            "ref.$": "$.user.id",
2815            "nested": { "inner.$": "$.user.name" },
2816            "list": [ { "x.$": "$.user.id" } ]
2817        });
2818        let input = json!({ "user": { "id": 42, "name": "zoe" } });
2819        let out = apply_parameters(&template, &input);
2820        assert_eq!(out["literal"], json!("constant"));
2821        assert_eq!(out["ref"], json!(42));
2822        assert_eq!(out["nested"]["inner"], json!("zoe"));
2823        assert_eq!(out["list"][0]["x"], json!(42));
2824    }
2825
2826    #[test]
2827    fn next_state_returns_end_name_or_error() {
2828        match next_state(&json!({ "End": true })) {
2829            NextState::End => {}
2830            _ => panic!("expected End"),
2831        }
2832        match next_state(&json!({ "Next": "A" })) {
2833            NextState::Name(n) => assert_eq!(n, "A"),
2834            _ => panic!("expected Name"),
2835        }
2836        match next_state(&json!({})) {
2837            NextState::Error(_) => {}
2838            _ => panic!("expected Error"),
2839        }
2840    }
2841
2842    #[test]
2843    fn apply_state_catcher_matches_wildcard_and_stashes_error() {
2844        let state_def = json!({
2845            "Catch": [
2846                { "ErrorEquals": ["States.ALL"], "Next": "H", "ResultPath": "$.caught" }
2847            ]
2848        });
2849        let input = json!({ "a": 1 });
2850        let (next, new_input) =
2851            apply_state_catcher(&state_def, &input, "Boom", "it exploded").unwrap();
2852        assert_eq!(next, "H");
2853        assert_eq!(new_input["a"], json!(1));
2854        assert_eq!(new_input["caught"]["Error"], json!("Boom"));
2855        assert_eq!(new_input["caught"]["Cause"], json!("it exploded"));
2856    }
2857
2858    #[test]
2859    fn apply_state_catcher_returns_none_without_match() {
2860        let state_def = json!({
2861            "Catch": [
2862                { "ErrorEquals": ["Specific.Error"], "Next": "H" }
2863            ]
2864        });
2865        let input = json!({});
2866        assert!(apply_state_catcher(&state_def, &input, "Other", "why").is_none());
2867    }
2868
2869    #[test]
2870    fn queue_url_to_arn_parses_account_and_name() {
2871        assert_eq!(
2872            queue_url_to_arn("http://sqs.local:4566/123456789012/my-queue"),
2873            "arn:aws:sqs:us-east-1:123456789012:my-queue"
2874        );
2875    }
2876
2877    #[test]
2878    fn queue_url_to_arn_falls_back_for_unparseable_input() {
2879        assert_eq!(queue_url_to_arn("bad"), "bad");
2880    }
2881
2882    #[test]
2883    fn md5_hex_is_deterministic_and_32_chars() {
2884        let a = md5_hex("hello");
2885        let b = md5_hex("hello");
2886        assert_eq!(a, b);
2887        assert_eq!(a.len(), 32);
2888        assert_ne!(a, md5_hex("world"));
2889    }
2890
2891    #[test]
2892    fn apply_update_expression_sets_direct_and_aliased_attrs() {
2893        let mut item: HashMap<String, Value> = HashMap::new();
2894        item.insert("id".to_string(), json!({"S": "1"}));
2895
2896        let mut attr_values = serde_json::Map::new();
2897        attr_values.insert(":n".to_string(), json!({"S": "Alice"}));
2898        attr_values.insert(":c".to_string(), json!({"N": "5"}));
2899
2900        let mut attr_names = serde_json::Map::new();
2901        attr_names.insert("#name".to_string(), json!("name"));
2902
2903        apply_update_expression(
2904            &mut item,
2905            "SET #name = :n, count = :c",
2906            &attr_values,
2907            &attr_names,
2908        );
2909
2910        assert_eq!(item.get("name").unwrap(), &json!({"S": "Alice"}));
2911        assert_eq!(item.get("count").unwrap(), &json!({"N": "5"}));
2912        assert_eq!(item.get("id").unwrap(), &json!({"S": "1"}));
2913    }
2914
2915    #[test]
2916    fn apply_update_expression_accepts_lowercase_set_keyword() {
2917        let mut item: HashMap<String, Value> = HashMap::new();
2918        let mut attr_values = serde_json::Map::new();
2919        attr_values.insert(":v".to_string(), json!({"S": "x"}));
2920        apply_update_expression(
2921            &mut item,
2922            "set field = :v",
2923            &attr_values,
2924            &serde_json::Map::new(),
2925        );
2926        assert_eq!(item.get("field").unwrap(), &json!({"S": "x"}));
2927    }
2928
2929    #[test]
2930    fn apply_update_expression_set_arithmetic_increments_counter() {
2931        let mut item: HashMap<String, Value> = HashMap::new();
2932        item.insert("count".to_string(), json!({"N": "10"}));
2933        let mut attr_values = serde_json::Map::new();
2934        attr_values.insert(":inc".to_string(), json!({"N": "3"}));
2935        apply_update_expression(
2936            &mut item,
2937            "SET count = count + :inc",
2938            &attr_values,
2939            &serde_json::Map::new(),
2940        );
2941        assert_eq!(item.get("count").unwrap(), &json!({"N": "13"}));
2942    }
2943
2944    #[test]
2945    fn apply_update_expression_set_decrement() {
2946        let mut item: HashMap<String, Value> = HashMap::new();
2947        item.insert("count".to_string(), json!({"N": "10"}));
2948        let mut attr_values = serde_json::Map::new();
2949        attr_values.insert(":d".to_string(), json!({"N": "4"}));
2950        apply_update_expression(
2951            &mut item,
2952            "SET count = count - :d",
2953            &attr_values,
2954            &serde_json::Map::new(),
2955        );
2956        assert_eq!(item.get("count").unwrap(), &json!({"N": "6"}));
2957    }
2958
2959    #[test]
2960    fn apply_update_expression_remove_drops_attributes() {
2961        let mut item: HashMap<String, Value> = HashMap::new();
2962        item.insert("a".to_string(), json!({"S": "x"}));
2963        item.insert("b".to_string(), json!({"S": "y"}));
2964        item.insert("c".to_string(), json!({"S": "z"}));
2965        apply_update_expression(
2966            &mut item,
2967            "REMOVE a, c",
2968            &serde_json::Map::new(),
2969            &serde_json::Map::new(),
2970        );
2971        assert!(!item.contains_key("a"));
2972        assert!(item.contains_key("b"));
2973        assert!(!item.contains_key("c"));
2974    }
2975
2976    #[test]
2977    fn apply_update_expression_add_increments_existing_or_initializes() {
2978        // Existing attribute -> sum
2979        let mut item: HashMap<String, Value> = HashMap::new();
2980        item.insert("count".to_string(), json!({"N": "5"}));
2981        let mut attr_values = serde_json::Map::new();
2982        attr_values.insert(":inc".to_string(), json!({"N": "2"}));
2983        apply_update_expression(
2984            &mut item,
2985            "ADD count :inc",
2986            &attr_values,
2987            &serde_json::Map::new(),
2988        );
2989        assert_eq!(item.get("count").unwrap(), &json!({"N": "7"}));
2990
2991        // Absent attribute -> initialized to value
2992        let mut item2: HashMap<String, Value> = HashMap::new();
2993        apply_update_expression(
2994            &mut item2,
2995            "ADD count :inc",
2996            &attr_values,
2997            &serde_json::Map::new(),
2998        );
2999        assert_eq!(item2.get("count").unwrap(), &json!({"N": "2"}));
3000    }
3001
3002    #[test]
3003    fn apply_update_expression_delete_removes_set_elements() {
3004        let mut item: HashMap<String, Value> = HashMap::new();
3005        item.insert("tags".to_string(), json!({"SS": ["a", "b", "c"]}));
3006        let mut attr_values = serde_json::Map::new();
3007        attr_values.insert(":rm".to_string(), json!({"SS": ["b"]}));
3008        apply_update_expression(
3009            &mut item,
3010            "DELETE tags :rm",
3011            &attr_values,
3012            &serde_json::Map::new(),
3013        );
3014        assert_eq!(item.get("tags").unwrap(), &json!({"SS": ["a", "c"]}));
3015    }
3016
3017    #[test]
3018    fn apply_update_expression_if_not_exists_initializes_only_when_absent() {
3019        // Absent -> initialize.
3020        let mut item: HashMap<String, Value> = HashMap::new();
3021        let mut attr_values = serde_json::Map::new();
3022        attr_values.insert(":zero".to_string(), json!({"N": "0"}));
3023        apply_update_expression(
3024            &mut item,
3025            "SET count = if_not_exists(count, :zero)",
3026            &attr_values,
3027            &serde_json::Map::new(),
3028        );
3029        assert_eq!(item.get("count").unwrap(), &json!({"N": "0"}));
3030
3031        // Present -> preserve existing.
3032        let mut item2: HashMap<String, Value> = HashMap::new();
3033        item2.insert("count".to_string(), json!({"N": "42"}));
3034        apply_update_expression(
3035            &mut item2,
3036            "SET count = if_not_exists(count, :zero)",
3037            &attr_values,
3038            &serde_json::Map::new(),
3039        );
3040        assert_eq!(item2.get("count").unwrap(), &json!({"N": "42"}));
3041    }
3042
3043    #[test]
3044    fn apply_update_expression_combines_clauses() {
3045        let mut item: HashMap<String, Value> = HashMap::new();
3046        item.insert("a".to_string(), json!({"S": "old"}));
3047        item.insert("b".to_string(), json!({"N": "1"}));
3048        item.insert("c".to_string(), json!({"S": "drop"}));
3049        let mut attr_values = serde_json::Map::new();
3050        attr_values.insert(":new".to_string(), json!({"S": "new"}));
3051        attr_values.insert(":one".to_string(), json!({"N": "1"}));
3052        apply_update_expression(
3053            &mut item,
3054            "SET a = :new ADD b :one REMOVE c",
3055            &attr_values,
3056            &serde_json::Map::new(),
3057        );
3058        assert_eq!(item.get("a").unwrap(), &json!({"S": "new"}));
3059        assert_eq!(item.get("b").unwrap(), &json!({"N": "2"}));
3060        assert!(!item.contains_key("c"));
3061    }
3062
3063    // ── DynamoDB invoke: error paths without delivery bus ────────────
3064
3065    #[test]
3066    fn task_dynamodb_get_item_without_state_fails() {
3067        let state = make_state();
3068        let arn = arn_for("ddb-get-nostate");
3069        let def = json!({
3070            "StartAt": "T",
3071            "States": {
3072                "T": {
3073                    "Type": "Task",
3074                    "Resource": "arn:aws:states:::dynamodb:getItem",
3075                    "Parameters": { "TableName": "t", "Key": { "id": { "S": "1" } } },
3076                    "End": true
3077                }
3078            }
3079        });
3080        drive(&state, &arn, def, Some("{}"));
3081        read_exec(&state, &arn, |exec| {
3082            assert_eq!(exec.status, ExecutionStatus::Failed);
3083            assert!(exec.cause.as_deref().unwrap().contains("DynamoDB"));
3084        });
3085    }
3086
3087    // ── Terminal guards on succeed/fail helpers ──────────────────────
3088
3089    #[test]
3090    fn succeed_execution_is_noop_when_already_terminal() {
3091        let state = make_state();
3092        let arn = "arn:aws:states:us-east-1:123456789012:execution:test:already";
3093        create_execution(&state, arn, None);
3094        {
3095            let mut __a = state.write();
3096            let s = __a.default_mut();
3097            s.executions.get_mut(arn).unwrap().status = ExecutionStatus::Failed;
3098        }
3099        succeed_execution(&state, arn, &json!({"x":1}));
3100        let __a = state.read();
3101        let s = __a.default_ref();
3102        let exec = s.executions.get(arn).unwrap();
3103        assert_eq!(exec.status, ExecutionStatus::Failed);
3104        assert!(exec.output.is_none());
3105    }
3106
3107    #[test]
3108    fn fail_execution_is_noop_when_already_terminal() {
3109        let state = make_state();
3110        let arn = "arn:aws:states:us-east-1:123456789012:execution:test:already2";
3111        create_execution(&state, arn, None);
3112        {
3113            let mut __a = state.write();
3114            let s = __a.default_mut();
3115            s.executions.get_mut(arn).unwrap().status = ExecutionStatus::Succeeded;
3116        }
3117        fail_execution(&state, arn, "Oops", "nope");
3118        let __a = state.read();
3119        let s = __a.default_ref();
3120        let exec = s.executions.get(arn).unwrap();
3121        assert_eq!(exec.status, ExecutionStatus::Succeeded);
3122        assert!(exec.error.is_none());
3123    }
3124
3125    // ── Pass state with ResultPath ──
3126
3127    #[test]
3128    fn pass_state_result_path_merges_into_input() {
3129        let state = make_state();
3130        let arn = arn_for("result-path");
3131        let def = json!({
3132            "StartAt": "P",
3133            "States": {
3134                "P": {"Type": "Pass", "Result": {"x": 2}, "ResultPath": "$.data", "End": true}
3135            }
3136        });
3137        drive(&state, &arn, def, Some(r#"{"a":1}"#));
3138        let output = read_exec(&state, &arn, |e| e.output.clone().unwrap_or_default());
3139        let v: Value = serde_json::from_str(&output).unwrap();
3140        assert_eq!(v["a"], 1);
3141        assert_eq!(v["data"]["x"], 2);
3142    }
3143
3144    // ── Choice with many operators ──
3145
3146    #[test]
3147    fn choice_string_greater_than_equals() {
3148        let state = make_state();
3149        let arn = arn_for("choice-sgte");
3150        let def = json!({
3151            "StartAt": "C",
3152            "States": {
3153                "C": {
3154                    "Type": "Choice",
3155                    "Choices": [
3156                        {"Variable": "$.val", "StringGreaterThanEquals": "apple", "Next": "End"}
3157                    ],
3158                    "Default": "Fail"
3159                },
3160                "End": {"Type": "Pass", "End": true},
3161                "Fail": {"Type": "Fail"}
3162            }
3163        });
3164        drive(&state, &arn, def, Some(r#"{"val":"banana"}"#));
3165        let status = read_exec(&state, &arn, |e| e.status);
3166        assert_eq!(status, ExecutionStatus::Succeeded);
3167    }
3168
3169    #[test]
3170    fn choice_is_present_and_is_null() {
3171        let state = make_state();
3172        let arn = arn_for("choice-ispres");
3173        let def = json!({
3174            "StartAt": "C",
3175            "States": {
3176                "C": {
3177                    "Type": "Choice",
3178                    "Choices": [
3179                        {"Variable": "$.foo", "IsPresent": true, "Next": "End"}
3180                    ],
3181                    "Default": "Fail"
3182                },
3183                "End": {"Type": "Pass", "End": true},
3184                "Fail": {"Type": "Fail"}
3185            }
3186        });
3187        drive(&state, &arn, def, Some(r#"{"foo":null}"#));
3188        assert_eq!(
3189            read_exec(&state, &arn, |e| e.status),
3190            ExecutionStatus::Succeeded
3191        );
3192    }
3193
3194    #[test]
3195    fn choice_or_short_circuits() {
3196        let state = make_state();
3197        let arn = arn_for("choice-or");
3198        let def = json!({
3199            "StartAt": "C",
3200            "States": {
3201                "C": {
3202                    "Type": "Choice",
3203                    "Choices": [{
3204                        "Or": [
3205                            {"Variable": "$.x", "NumericEquals": 99},
3206                            {"Variable": "$.y", "StringEquals": "b"}
3207                        ],
3208                        "Next": "End"
3209                    }],
3210                    "Default": "Fail"
3211                },
3212                "End": {"Type": "Pass", "End": true},
3213                "Fail": {"Type": "Fail"}
3214            }
3215        });
3216        drive(&state, &arn, def, Some(r#"{"x":1,"y":"b"}"#));
3217        assert_eq!(
3218            read_exec(&state, &arn, |e| e.status),
3219            ExecutionStatus::Succeeded
3220        );
3221    }
3222
3223    #[test]
3224    fn choice_not_negates() {
3225        let state = make_state();
3226        let arn = arn_for("choice-not");
3227        let def = json!({
3228            "StartAt": "C",
3229            "States": {
3230                "C": {
3231                    "Type": "Choice",
3232                    "Choices": [{
3233                        "Not": {"Variable": "$.x", "NumericEquals": 99},
3234                        "Next": "End"
3235                    }],
3236                    "Default": "Fail"
3237                },
3238                "End": {"Type": "Pass", "End": true},
3239                "Fail": {"Type": "Fail"}
3240            }
3241        });
3242        drive(&state, &arn, def, Some(r#"{"x":1}"#));
3243        assert_eq!(
3244            read_exec(&state, &arn, |e| e.status),
3245            ExecutionStatus::Succeeded
3246        );
3247    }
3248
3249    #[test]
3250    fn choice_boolean_equals() {
3251        let state = make_state();
3252        let arn = arn_for("choice-bool");
3253        let def = json!({
3254            "StartAt": "C",
3255            "States": {
3256                "C": {
3257                    "Type": "Choice",
3258                    "Choices": [
3259                        {"Variable": "$.ok", "BooleanEquals": true, "Next": "End"}
3260                    ],
3261                    "Default": "Fail"
3262                },
3263                "End": {"Type": "Pass", "End": true},
3264                "Fail": {"Type": "Fail"}
3265            }
3266        });
3267        drive(&state, &arn, def, Some(r#"{"ok":true}"#));
3268        assert_eq!(
3269            read_exec(&state, &arn, |e| e.status),
3270            ExecutionStatus::Succeeded
3271        );
3272    }
3273
3274    // ── Wait with SecondsPath ──
3275
3276    #[test]
3277    fn wait_seconds_path_uses_input_value() {
3278        let state = make_state();
3279        let arn = arn_for("wait-sp");
3280        let def = json!({
3281            "StartAt": "W",
3282            "States": {
3283                "W": {"Type": "Wait", "SecondsPath": "$.wait", "End": true}
3284            }
3285        });
3286        drive(&state, &arn, def, Some(r#"{"wait":0}"#));
3287        assert_eq!(
3288            read_exec(&state, &arn, |e| e.status),
3289            ExecutionStatus::Succeeded
3290        );
3291    }
3292
3293    // ── Map state with empty input array ──
3294
3295    #[test]
3296    fn map_state_empty_array_succeeds() {
3297        let state = make_state();
3298        let arn = arn_for("map-empty");
3299        let def = json!({
3300            "StartAt": "M",
3301            "States": {
3302                "M": {
3303                    "Type": "Map",
3304                    "ItemsPath": "$.items",
3305                    "ItemProcessor": {
3306                        "StartAt": "P",
3307                        "States": {
3308                            "P": {"Type": "Pass", "End": true}
3309                        }
3310                    },
3311                    "End": true
3312                }
3313            }
3314        });
3315        drive(&state, &arn, def, Some(r#"{"items":[]}"#));
3316        assert_eq!(
3317            read_exec(&state, &arn, |e| e.status),
3318            ExecutionStatus::Succeeded
3319        );
3320    }
3321
3322    // ── Fail state with Error + Cause ──
3323
3324    #[test]
3325    fn fail_state_with_explicit_error_and_cause() {
3326        let state = make_state();
3327        let arn = arn_for("fail-fields");
3328        create_execution(&state, &arn, None);
3329        let def = json!({
3330            "StartAt": "F",
3331            "States": {
3332                "F": {"Type": "Fail", "Error": "MyError", "Cause": "my cause"}
3333            }
3334        });
3335        drive(&state, &arn, def, None);
3336        let status = read_exec(&state, &arn, |e| e.status);
3337        assert_eq!(status, ExecutionStatus::Failed);
3338        let err = read_exec(&state, &arn, |e| e.error.clone().unwrap_or_default());
3339        assert_eq!(err, "MyError");
3340    }
3341}